1use std::fmt::Display;
2use std::ops::RangeInclusive;
3use std::path::Path;
4use std::pin::pin;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use celestia_types::hash::Hash;
9use celestia_types::ExtendedHeader;
10use cid::Cid;
11use redb::{
12 CommitError, Database, ReadTransaction, ReadableTable, StorageError, Table, TableDefinition,
13 TableError, TransactionError, WriteTransaction,
14};
15use tendermint_proto::Protobuf;
16use tokio::sync::Notify;
17use tokio::task::spawn_blocking;
18use tracing::warn;
19use tracing::{debug, trace};
20
21use crate::block_ranges::BlockRanges;
22use crate::store::utils::VerifiedExtendedHeaders;
23use crate::store::{Result, SamplingMetadata, Store, StoreError, StoreInsertionError};
24use crate::utils::Counter;
25
26use super::utils::{deserialize_extended_header, deserialize_sampling_metadata};
27
/// Current on-disk schema version; bumped whenever the layout changes and a
/// matching migration is added (see `migrate_v1_to_v2` / `migrate_v2_to_v3`).
const SCHEMA_VERSION: u64 = 3;

/// Maps header hash bytes -> header height.
const HEIGHTS_TABLE: TableDefinition<'static, &[u8], u64> = TableDefinition::new("STORE.HEIGHTS");
/// Maps header height -> serialized `ExtendedHeader` (`encode_vec`).
const HEADERS_TABLE: TableDefinition<'static, u64, &[u8]> = TableDefinition::new("STORE.HEADERS");
/// Maps header height -> serialized `SamplingMetadata`.
const SAMPLING_METADATA_TABLE: TableDefinition<'static, u64, &[u8]> =
    TableDefinition::new("STORE.SAMPLING_METADATA");
/// Single-entry table holding the schema version under the unit key.
const SCHEMA_VERSION_TABLE: TableDefinition<'static, (), u64> =
    TableDefinition::new("STORE.SCHEMA_VERSION");
/// Named sets of block ranges, stored as `(start, end)` inclusive pairs.
const RANGES_TABLE: TableDefinition<'static, &str, Vec<(u64, u64)>> =
    TableDefinition::new("STORE.RANGES");

/// `RANGES_TABLE` key for heights that have been sampled.
const SAMPLED_RANGES_KEY: &str = "KEY.SAMPLED_RANGES";
/// `RANGES_TABLE` key for heights whose headers are currently stored.
const HEADER_RANGES_KEY: &str = "KEY.HEADER_RANGES";
/// `RANGES_TABLE` key for heights that were stored and later removed.
const PRUNED_RANGES_KEY: &str = "KEY.PRUNED_RANGES";
42
/// A header [`Store`] persisted in a [`redb`] database.
#[derive(Debug)]
pub struct RedbStore {
    // Shared with the blocking tasks spawned for each transaction.
    inner: Arc<Inner>,
    // Tracks in-flight blocking tasks so `close` can wait for them to finish.
    task_counter: Counter,
}
49
/// State shared between [`RedbStore`] and its spawned blocking tasks.
#[derive(Debug)]
struct Inner {
    // The underlying redb database handle.
    db: Arc<Database>,
    // Notified after every successful header insertion; used by
    // `wait_new_head` / `wait_height`.
    header_added_notifier: Notify,
}
57
impl RedbStore {
    /// Open a persistent store backed by a redb database file at `path`,
    /// creating the file if it does not exist.
    pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref().to_owned();

        // `Database::create` performs blocking file I/O, so run it off the
        // async executor.
        let db = spawn_blocking(|| Database::create(path))
            .await?
            .map_err(|e| StoreError::OpenFailed(e.to_string()))?;

        RedbStore::new(Arc::new(db)).await
    }

    /// Open a store backed by a transient in-memory database.
    pub async fn in_memory() -> Result<Self> {
        let db = Database::builder()
            .create_with_backend(redb::backends::InMemoryBackend::new())
            .map_err(|e| StoreError::OpenFailed(e.to_string()))?;

        RedbStore::new(Arc::new(db)).await
    }

    /// Wrap an already-opened [`Database`]: check the schema version, run any
    /// pending migrations, and open all tables once so they exist up front.
    pub async fn new(db: Arc<Database>) -> Result<Self> {
        let store = RedbStore {
            inner: Arc::new(Inner {
                db,
                header_added_notifier: Notify::new(),
            }),
            task_counter: Counter::new(),
        };

        store
            .write_tx(|tx| {
                let mut schema_version_table = tx.open_table(SCHEMA_VERSION_TABLE)?;
                let schema_version = schema_version_table.get(())?.map(|guard| guard.value());

                match schema_version {
                    Some(schema_version) => {
                        // A database written by a newer build is a hard error.
                        if schema_version > SCHEMA_VERSION {
                            let e = format!(
                                "Incompatible database schema; found {schema_version}, expected {SCHEMA_VERSION}."
                            );
                            return Err(StoreError::OpenFailed(e));
                        }

                        // Each migration is a no-op when already applied.
                        migrate_v1_to_v2(tx, &mut schema_version_table)?;
                        migrate_v2_to_v3(tx, &mut schema_version_table)?;
                    }
                    None => {
                        // Fresh database: stamp it with the current version.
                        schema_version_table.insert((), SCHEMA_VERSION)?;
                    }
                }

                debug_assert_eq!(
                    schema_version_table.get(())?.map(|guard| guard.value()),
                    Some(SCHEMA_VERSION),
                    "Some migrations are missing"
                );

                // Open (and thereby create) every table so later transactions
                // never hit `TableDoesNotExist`.
                let _heights_table = tx.open_table(HEIGHTS_TABLE)?;
                let _headers_table = tx.open_table(HEADERS_TABLE)?;
                let _ranges_table = tx.open_table(RANGES_TABLE)?;
                let _sampling_table = tx.open_table(SAMPLING_METADATA_TABLE)?;

                Ok(())
            })
            .await
            .map_err(|e| match e {
                // Preserve an `OpenFailed` as-is, wrap anything else in one.
                e @ StoreError::OpenFailed(_) => e,
                e => StoreError::OpenFailed(e.to_string()),
            })?;

        Ok(store)
    }

    /// Return a clone of the underlying database handle.
    pub fn raw_db(&self) -> Arc<Database> {
        self.inner.db.clone()
    }

    /// Run `f` inside a read transaction on a blocking thread.
    async fn read_tx<F, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce(&mut ReadTransaction) -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let inner = self.inner.clone();
        let guard = self.task_counter.guard();

        spawn_blocking(move || {
            // Keep the counter guard alive for the task's duration so
            // `close` can wait for it.
            let _guard = guard;

            {
                let mut tx = inner.db.begin_read()?;
                f(&mut tx)
            }
        })
        .await?
    }

    /// Run `f` inside a write transaction on a blocking thread, committing
    /// on `Ok` and aborting on `Err`.
    async fn write_tx<F, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce(&mut WriteTransaction) -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let inner = self.inner.clone();
        let guard = self.task_counter.guard();

        spawn_blocking(move || {
            let _guard = guard;

            {
                let mut tx = inner.db.begin_write()?;
                let res = f(&mut tx);

                if res.is_ok() {
                    tx.commit()?;
                } else {
                    tx.abort()?;
                }

                res
            }
        })
        .await?
    }

    /// Highest stored header height, or `NotFound` when the store is empty.
    async fn head_height(&self) -> Result<u64> {
        self.read_tx(|tx| {
            let table = tx.open_table(RANGES_TABLE)?;
            let header_ranges = get_ranges(&table, HEADER_RANGES_KEY)?;

            header_ranges.head().ok_or(StoreError::NotFound)
        })
        .await
    }

    /// Look up a header by its hash via the hash -> height index.
    async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
        // Copy the hash so the closure can be `'static`.
        let hash = *hash;

        self.read_tx(move |tx| {
            let heights_table = tx.open_table(HEIGHTS_TABLE)?;
            let headers_table = tx.open_table(HEADERS_TABLE)?;

            let height = get_height(&heights_table, hash.as_bytes())?;
            get_header(&headers_table, height)
        })
        .await
    }

    /// Look up a header by its height.
    async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
        self.read_tx(move |tx| {
            let table = tx.open_table(HEADERS_TABLE)?;
            get_header(&table, height)
        })
        .await
    }

    /// Return the header at the store's head height.
    async fn get_head(&self) -> Result<ExtendedHeader> {
        self.read_tx(|tx| {
            let ranges_table = tx.open_table(RANGES_TABLE)?;
            let headers_table = tx.open_table(HEADERS_TABLE)?;

            let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
            let head = header_ranges.head().ok_or(StoreError::NotFound)?;

            get_header(&headers_table, head)
        })
        .await
    }

    /// Whether a header with the given hash is stored.
    /// Any error (including `NotFound`) is reported as `false`.
    async fn contains_hash(&self, hash: &Hash) -> bool {
        let hash = *hash;

        self.read_tx(move |tx| {
            let heights_table = tx.open_table(HEIGHTS_TABLE)?;
            let headers_table = tx.open_table(HEADERS_TABLE)?;

            let height = get_height(&heights_table, hash.as_bytes())?;
            Ok(headers_table.get(height)?.is_some())
        })
        .await
        .unwrap_or(false)
    }

    /// Whether a header at the given height is stored.
    /// Any error is reported as `false`.
    async fn contains_height(&self, height: u64) -> bool {
        self.read_tx(move |tx| {
            let headers_table = tx.open_table(HEADERS_TABLE)?;
            Ok(headers_table.get(height)?.is_some())
        })
        .await
        .unwrap_or(false)
    }

    /// Verify and insert a contiguous batch of headers, updating the header,
    /// sampled and pruned range sets, then notify waiters.
    async fn insert<R>(&self, headers: R) -> Result<()>
    where
        R: TryInto<VerifiedExtendedHeaders> + Send,
        <R as TryInto<VerifiedExtendedHeaders>>::Error: Display,
    {
        let headers = headers
            .try_into()
            .map_err(|e| StoreInsertionError::HeadersVerificationFailed(e.to_string()))?;

        self.write_tx(move |tx| {
            // An empty batch is a successful no-op.
            let (Some(head), Some(tail)) = (headers.as_ref().first(), headers.as_ref().last())
            else {
                return Ok(());
            };

            let mut heights_table = tx.open_table(HEIGHTS_TABLE)?;
            let mut headers_table = tx.open_table(HEADERS_TABLE)?;
            let mut ranges_table = tx.open_table(RANGES_TABLE)?;

            let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
            let mut sampled_ranges = get_ranges(&ranges_table, SAMPLED_RANGES_KEY)?;
            let mut pruned_ranges = get_ranges(&ranges_table, PRUNED_RANGES_KEY)?;

            let headers_range = head.height().value()..=tail.height().value();

            // Tells us which stored neighbours the batch must chain onto.
            let (prev_exists, next_exists) = header_ranges
                .check_insertion_constraints(&headers_range)
                .map_err(StoreInsertionError::ContraintsNotMet)?;

            verify_against_neighbours(
                &headers_table,
                prev_exists.then_some(head),
                next_exists.then_some(tail),
            )?;

            for header in headers {
                let height = header.height().value();
                let hash = header.hash();
                let serialized_header = header.encode_vec();

                // The ranges table said this height was absent, so an
                // existing entry means the tables disagree.
                if headers_table
                    .insert(height, &serialized_header[..])?
                    .is_some()
                {
                    return Err(StoreError::StoredDataError(
                        "inconsistency between headers table and ranges table".into(),
                    ));
                }

                if heights_table.insert(hash.as_bytes(), height)?.is_some() {
                    return Err(StoreInsertionError::HashExists(hash).into());
                }

                trace!("Inserted header {hash} with height {height}");
            }

            // Newly inserted heights are no longer sampled nor pruned.
            header_ranges
                .insert_relaxed(&headers_range)
                .expect("invalid range");
            sampled_ranges
                .remove_relaxed(&headers_range)
                .expect("invalid range");
            pruned_ranges
                .remove_relaxed(&headers_range)
                .expect("invalid range");

            set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?;
            set_ranges(&mut ranges_table, SAMPLED_RANGES_KEY, &sampled_ranges)?;
            set_ranges(&mut ranges_table, PRUNED_RANGES_KEY, &pruned_ranges)?;

            debug!("Inserted header range {headers_range:?}",);

            Ok(())
        })
        .await?;

        // Wake anyone blocked in `wait_new_head` / `wait_height`.
        self.inner.header_added_notifier.notify_waiters();

        Ok(())
    }

    /// Append `cids` to the sampling metadata of `height`, deduplicating
    /// against the CIDs already recorded. Fails with `NotFound` when the
    /// height has no stored header.
    async fn update_sampling_metadata(&self, height: u64, cids: Vec<Cid>) -> Result<()> {
        self.write_tx(move |tx| {
            let mut sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
            let ranges_table = tx.open_table(RANGES_TABLE)?;

            let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;

            if !header_ranges.contains(height) {
                return Err(StoreError::NotFound);
            }

            let previous = get_sampling_metadata(&sampling_metadata_table, height)?;

            let entry = match previous {
                Some(mut previous) => {
                    // Merge, keeping each CID at most once.
                    for cid in cids {
                        if !previous.cids.contains(&cid) {
                            previous.cids.push(cid);
                        }
                    }

                    previous
                }
                None => SamplingMetadata { cids },
            };

            let serialized = entry.encode_vec();
            sampling_metadata_table.insert(height, &serialized[..])?;

            Ok(())
        })
        .await
    }

    /// Record `height` in the sampled ranges set. Fails with `NotFound`
    /// when the height has no stored header.
    async fn mark_as_sampled(&self, height: u64) -> Result<()> {
        self.write_tx(move |tx| {
            let mut ranges_table = tx.open_table(RANGES_TABLE)?;
            let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
            let mut sampled_ranges = get_ranges(&ranges_table, SAMPLED_RANGES_KEY)?;

            if !header_ranges.contains(height) {
                return Err(StoreError::NotFound);
            }

            sampled_ranges
                .insert_relaxed(height..=height)
                .expect("invalid height");

            set_ranges(&mut ranges_table, SAMPLED_RANGES_KEY, &sampled_ranges)?;

            Ok(())
        })
        .await
    }

    /// Return the sampling metadata for `height`, or `None` when none was
    /// recorded. Fails with `NotFound` when the header itself is missing.
    async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
        self.read_tx(move |tx| {
            let headers_table = tx.open_table(HEADERS_TABLE)?;
            let sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;

            if headers_table.get(height)?.is_none() {
                return Err(StoreError::NotFound);
            }

            get_sampling_metadata(&sampling_metadata_table, height)
        })
        .await
    }

    /// Ranges of heights whose headers are currently stored.
    async fn get_stored_ranges(&self) -> Result<BlockRanges> {
        self.read_tx(|tx| {
            let table = tx.open_table(RANGES_TABLE)?;
            get_ranges(&table, HEADER_RANGES_KEY)
        })
        .await
    }

    /// Ranges of heights that have been sampled.
    async fn get_sampled_ranges(&self) -> Result<BlockRanges> {
        self.read_tx(|tx| {
            let table = tx.open_table(RANGES_TABLE)?;
            get_ranges(&table, SAMPLED_RANGES_KEY)
        })
        .await
    }

    /// Ranges of heights that were stored and later pruned.
    async fn get_pruned_ranges(&self) -> Result<BlockRanges> {
        self.read_tx(|tx| {
            let table = tx.open_table(RANGES_TABLE)?;
            get_ranges(&table, PRUNED_RANGES_KEY)
        })
        .await
    }

    /// Remove the header (and any sampling metadata) at `height`, moving the
    /// height from the stored/sampled sets into the pruned set.
    async fn remove_height(&self, height: u64) -> Result<()> {
        self.write_tx(move |tx| {
            let mut heights_table = tx.open_table(HEIGHTS_TABLE)?;
            let mut headers_table = tx.open_table(HEADERS_TABLE)?;
            let mut ranges_table = tx.open_table(RANGES_TABLE)?;
            let mut sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;

            let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
            let mut sampled_ranges = get_ranges(&ranges_table, SAMPLED_RANGES_KEY)?;
            let mut pruned_ranges = get_ranges(&ranges_table, PRUNED_RANGES_KEY)?;

            if !header_ranges.contains(height) {
                return Err(StoreError::NotFound);
            }

            // The ranges table said this height exists, so the entry must
            // be present in the headers table too.
            let Some(header) = headers_table.remove(height)? else {
                return Err(StoreError::StoredDataError(format!(
                    "inconsistency between ranges and height_to_hash tables, height {height}"
                )));
            };

            // Decode just enough to learn the hash, so the hash -> height
            // index entry can be removed as well.
            let hash = ExtendedHeader::decode(header.value())
                .map_err(|e| StoreError::StoredDataError(e.to_string()))?
                .hash();

            if heights_table.remove(hash.as_bytes())?.is_none() {
                return Err(StoreError::StoredDataError(format!(
                    "inconsistency between header and height_to_hash tables, hash {hash}"
                )));
            }

            sampling_metadata_table.remove(height)?;

            header_ranges
                .remove_relaxed(height..=height)
                .expect("valid range never fails");
            sampled_ranges
                .remove_relaxed(height..=height)
                .expect("valid range never fails");
            pruned_ranges
                .insert_relaxed(height..=height)
                .expect("valid range never fails");

            set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?;
            set_ranges(&mut ranges_table, SAMPLED_RANGES_KEY, &sampled_ranges)?;
            set_ranges(&mut ranges_table, PRUNED_RANGES_KEY, &pruned_ranges)?;

            Ok(())
        })
        .await
    }
}
490
491#[async_trait]
492impl Store for RedbStore {
493 async fn get_head(&self) -> Result<ExtendedHeader> {
494 self.get_head().await
495 }
496
497 async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
498 self.get_by_hash(hash).await
499 }
500
501 async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
502 self.get_by_height(height).await
503 }
504
505 async fn wait_new_head(&self) -> u64 {
506 let head = self.head_height().await.unwrap_or(0);
507 let mut notifier = pin!(self.inner.header_added_notifier.notified());
508
509 loop {
510 let new_head = self.head_height().await.unwrap_or(0);
511
512 if head != new_head {
513 return new_head;
514 }
515
516 notifier.as_mut().await;
518
519 notifier.set(self.inner.header_added_notifier.notified());
521 }
522 }
523
524 async fn wait_height(&self, height: u64) -> Result<()> {
525 let mut notifier = pin!(self.inner.header_added_notifier.notified());
526
527 loop {
528 if self.contains_height(height).await {
529 return Ok(());
530 }
531
532 notifier.as_mut().await;
534
535 notifier.set(self.inner.header_added_notifier.notified());
537 }
538 }
539
540 async fn head_height(&self) -> Result<u64> {
541 self.head_height().await
542 }
543
544 async fn has(&self, hash: &Hash) -> bool {
545 self.contains_hash(hash).await
546 }
547
548 async fn has_at(&self, height: u64) -> bool {
549 self.contains_height(height).await
550 }
551
552 async fn insert<R>(&self, headers: R) -> Result<()>
553 where
554 R: TryInto<VerifiedExtendedHeaders> + Send,
555 <R as TryInto<VerifiedExtendedHeaders>>::Error: Display,
556 {
557 self.insert(headers).await
558 }
559
560 async fn update_sampling_metadata(&self, height: u64, cids: Vec<Cid>) -> Result<()> {
561 self.update_sampling_metadata(height, cids).await
562 }
563
564 async fn mark_as_sampled(&self, height: u64) -> Result<()> {
565 self.mark_as_sampled(height).await
566 }
567
568 async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
569 self.get_sampling_metadata(height).await
570 }
571
572 async fn get_stored_header_ranges(&self) -> Result<BlockRanges> {
573 Ok(self.get_stored_ranges().await?)
574 }
575
576 async fn get_sampled_ranges(&self) -> Result<BlockRanges> {
577 self.get_sampled_ranges().await
578 }
579
580 async fn get_pruned_ranges(&self) -> Result<BlockRanges> {
581 self.get_pruned_ranges().await
582 }
583
584 async fn remove_height(&self, height: u64) -> Result<()> {
585 self.remove_height(height).await
586 }
587
588 async fn close(mut self) -> Result<()> {
589 self.task_counter.wait_guards().await;
591 Ok(())
592 }
593}
594
/// Verify the edges of a header batch against its stored neighbours.
///
/// When `lowest_header` is given, the stored header directly below it must
/// successfully verify it; when `highest_header` is given, it must verify the
/// stored header directly above it. A neighbour that the ranges table claimed
/// exists but is missing from the headers table is a stored-data error.
fn verify_against_neighbours<R>(
    headers_table: &R,
    lowest_header: Option<&ExtendedHeader>,
    highest_header: Option<&ExtendedHeader>,
) -> Result<()>
where
    R: ReadableTable<u64, &'static [u8]>,
{
    if let Some(lowest_header) = lowest_header {
        // The caller only passes `lowest_header` when the ranges table says
        // the previous height exists, so `NotFound` means table inconsistency.
        let prev = get_header(headers_table, lowest_header.height().value() - 1).map_err(|e| {
            if let StoreError::NotFound = e {
                StoreError::StoredDataError("inconsistency between headers and ranges table".into())
            } else {
                e
            }
        })?;

        prev.verify(lowest_header)
            .map_err(|e| StoreInsertionError::NeighborsVerificationFailed(e.to_string()))?;
    }

    if let Some(highest_header) = highest_header {
        let next = get_header(headers_table, highest_header.height().value() + 1).map_err(|e| {
            if let StoreError::NotFound = e {
                StoreError::StoredDataError("inconsistency between headers and ranges table".into())
            } else {
                e
            }
        })?;

        // Note the direction: the new highest header vouches for the stored
        // successor, mirroring `prev.verify(lowest_header)` above.
        highest_header
            .verify(&next)
            .map_err(|e| StoreInsertionError::NeighborsVerificationFailed(e.to_string()))?;
    }

    Ok(())
}
632
633fn get_ranges<R>(ranges_table: &R, name: &str) -> Result<BlockRanges>
634where
635 R: ReadableTable<&'static str, Vec<(u64, u64)>>,
636{
637 let raw_ranges = ranges_table
638 .get(name)?
639 .map(|guard| {
640 guard
641 .value()
642 .iter()
643 .map(|(start, end)| *start..=*end)
644 .collect()
645 })
646 .unwrap_or_default();
647
648 BlockRanges::from_vec(raw_ranges).map_err(|e| {
649 let s = format!("Stored BlockRanges for {name} are invalid: {e}");
650 StoreError::StoredDataError(s)
651 })
652}
653
654fn set_ranges(
655 ranges_table: &mut Table<&str, Vec<(u64, u64)>>,
656 name: &str,
657 ranges: &BlockRanges,
658) -> Result<()> {
659 let raw_ranges: &[RangeInclusive<u64>] = ranges.as_ref();
660 let raw_ranges = raw_ranges
661 .iter()
662 .map(|range| (*range.start(), *range.end()))
663 .collect::<Vec<_>>();
664
665 ranges_table.insert(name, raw_ranges)?;
666
667 Ok(())
668}
669
670#[inline]
671fn get_height<R>(heights_table: &R, key: &[u8]) -> Result<u64>
672where
673 R: ReadableTable<&'static [u8], u64>,
674{
675 heights_table
676 .get(key)?
677 .map(|guard| guard.value())
678 .ok_or(StoreError::NotFound)
679}
680
681#[inline]
682fn get_header<R>(headers_table: &R, key: u64) -> Result<ExtendedHeader>
683where
684 R: ReadableTable<u64, &'static [u8]>,
685{
686 let serialized = headers_table.get(key)?.ok_or(StoreError::NotFound)?;
687 deserialize_extended_header(serialized.value())
688}
689
690#[inline]
691fn get_sampling_metadata<R>(
692 sampling_metadata_table: &R,
693 key: u64,
694) -> Result<Option<SamplingMetadata>>
695where
696 R: ReadableTable<u64, &'static [u8]>,
697{
698 sampling_metadata_table
699 .get(key)?
700 .map(|guard| deserialize_sampling_metadata(guard.value()))
701 .transpose()
702}
703
/// Map redb transaction errors to [`StoreError`].
impl From<TransactionError> for StoreError {
    fn from(e: TransactionError) -> Self {
        match e {
            TransactionError::ReadTransactionStillInUse(_) => {
                // This store never calls `ReadTransaction::close`, so this
                // variant cannot be produced.
                unreachable!("redb::ReadTransaction::close is never used")
            }
            e => StoreError::FatalDatabaseError(format!("TransactionError: {e}")),
        }
    }
}
714
/// Map redb table errors to [`StoreError`].
///
/// Variants that can only be caused by a programming error in this module
/// (a table opened twice in one transaction, or a table that was not created
/// in `RedbStore::new`) panic instead of surfacing as `StoreError`.
impl From<TableError> for StoreError {
    fn from(e: TableError) -> Self {
        match e {
            // Delegate to the `StorageError` conversion below.
            TableError::Storage(e) => e.into(),
            TableError::TableAlreadyOpen(table, location) => {
                panic!("Table {table} already opened from: {location}")
            }
            TableError::TableDoesNotExist(table) => {
                panic!("Table {table} was not created on initialization")
            }
            e => StoreError::StoredDataError(format!("TableError: {e}")),
        }
    }
}
729
/// Map redb storage errors to [`StoreError`].
impl From<StorageError> for StoreError {
    fn from(e: StorageError) -> Self {
        match e {
            StorageError::ValueTooLarge(_) => {
                // `ValueTooLarge` is only produced by `insert_reserve`,
                // which this store never calls.
                unreachable!("redb::Table::insert_reserve is never used")
            }
            e => StoreError::FatalDatabaseError(format!("StorageError: {e}")),
        }
    }
}
740
/// A failed commit is always treated as a fatal database error.
impl From<CommitError> for StoreError {
    fn from(e: CommitError) -> Self {
        StoreError::FatalDatabaseError(format!("CommitError: {e}"))
    }
}
746
/// Migrate the database schema from v1 to v2.
///
/// v1 kept header height ranges in a dedicated `STORE.HEIGHT_RANGES` table;
/// v2 moves them into `RANGES_TABLE` under `HEADER_RANGES_KEY`. No-op when
/// the schema is already at v2 or later.
fn migrate_v1_to_v2(
    tx: &WriteTransaction,
    schema_version_table: &mut Table<(), u64>,
) -> Result<()> {
    // The v1-only table this migration drains and then deletes.
    const HEADER_HEIGHT_RANGES: TableDefinition<'static, u64, (u64, u64)> =
        TableDefinition::new("STORE.HEIGHT_RANGES");

    let version = schema_version_table
        .get(())?
        .map(|guard| guard.value())
        .expect("migrations never run on new db");

    if version >= 2 {
        return Ok(());
    }

    debug_assert_eq!(version, 1);
    warn!("Migrating DB schema from v1 to v2");

    let header_ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?;
    let mut ranges_table = tx.open_table(RANGES_TABLE)?;

    // Collect every (start, end) pair from the old table before deleting it.
    let raw_ranges = header_ranges_table
        .iter()?
        .map(|range_guard| {
            let range = range_guard?.1.value();
            Ok((range.0, range.1))
        })
        .collect::<Result<Vec<_>>>()?;

    tx.delete_table(header_ranges_table)?;
    ranges_table.insert(HEADER_RANGES_KEY, raw_ranges)?;

    schema_version_table.insert((), 2)?;

    Ok(())
}
786
/// Migrate the database schema from v2 to v3.
///
/// v2 stored the sampled ranges under a misspelled key
/// (`KEY.ACCEPTED_SAMPING_RANGES`); v3 re-stores them under
/// `SAMPLED_RANGES_KEY` and removes the old entry. No-op when the schema is
/// already at v3 or later.
fn migrate_v2_to_v3(
    tx: &WriteTransaction,
    schema_version_table: &mut Table<(), u64>,
) -> Result<()> {
    let version = schema_version_table
        .get(())?
        .map(|guard| guard.value())
        .expect("migrations never run on new db");

    if version >= 3 {
        return Ok(());
    }

    debug_assert_eq!(version, 2);
    warn!("Migrating DB schema from v2 to v3");

    let mut ranges_table = tx.open_table(RANGES_TABLE)?;
    // Re-key the sampled ranges under the new name, then drop the old entry.
    let sampled_ranges = get_ranges(&ranges_table, v2::SAMPLED_RANGES_KEY)?;
    set_ranges(&mut ranges_table, SAMPLED_RANGES_KEY, &sampled_ranges)?;
    ranges_table.remove(v2::SAMPLED_RANGES_KEY)?;

    schema_version_table.insert((), 3)?;

    Ok(())
}
821
/// Keys used by the v2 schema, kept only for `migrate_v2_to_v3`.
mod v2 {
    // Note the historical typo "SAMPING" — it is part of the on-disk key
    // and must not be corrected.
    pub(super) const SAMPLED_RANGES_KEY: &str = "KEY.ACCEPTED_SAMPING_RANGES";
}
825
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::test_utils::ExtendedHeaderGeneratorExt;
    use celestia_types::test_utils::ExtendedHeaderGenerator;
    use std::path::Path;
    use tempfile::TempDir;

    // Headers inserted into an on-disk store must survive closing and
    // reopening it, across two reopen cycles.
    #[tokio::test]
    async fn test_store_persistence() {
        let db_dir = TempDir::with_prefix("lumina.store.test").unwrap();
        let db = db_dir.path().join("db");

        let (original_store, mut gen) = gen_filled_store(0, Some(&db)).await;
        let mut original_headers = gen.next_many(20);

        original_store
            .insert(original_headers.clone())
            .await
            .expect("inserting test data failed");
        drop(original_store);

        let reopened_store = create_store(Some(&db)).await;

        assert_eq!(
            original_headers.last().unwrap().height().value(),
            reopened_store.head_height().await.unwrap()
        );
        for original_header in &original_headers {
            let stored_header = reopened_store
                .get_by_height(original_header.height().value())
                .await
                .unwrap();
            assert_eq!(original_header, &stored_header);
        }

        // Appending to a reopened store must also persist.
        let mut new_headers = gen.next_many(10);
        reopened_store
            .insert(new_headers.clone())
            .await
            .expect("failed to insert data");
        drop(reopened_store);

        original_headers.append(&mut new_headers);

        let reopened_store = create_store(Some(&db)).await;
        assert_eq!(
            original_headers.last().unwrap().height().value(),
            reopened_store.head_height().await.unwrap()
        );
        for original_header in &original_headers {
            let stored_header = reopened_store
                .get_by_height(original_header.height().value())
                .await
                .unwrap();
            assert_eq!(original_header, &stored_header);
        }
    }

    // Two stores must be fully independent: after a fork they can hold
    // diverging chains of different lengths.
    #[tokio::test]
    async fn test_separate_stores() {
        let (store0, mut gen0) = gen_filled_store(0, None).await;
        let store1 = create_store(None).await;

        let headers = gen0.next_many(10);
        store0.insert(headers.clone()).await.unwrap();
        store1.insert(headers).await.unwrap();

        let mut gen1 = gen0.fork();

        store0.insert(gen0.next_many_verified(5)).await.unwrap();
        store1.insert(gen1.next_many_verified(6)).await.unwrap();

        // Shared prefix matches, forked suffix differs.
        assert_eq!(
            store0.get_by_height(10).await.unwrap(),
            store1.get_by_height(10).await.unwrap()
        );
        assert_ne!(
            store0.get_by_height(11).await.unwrap(),
            store1.get_by_height(11).await.unwrap()
        );

        assert_eq!(store0.head_height().await.unwrap(), 15);
        assert_eq!(store1.head_height().await.unwrap(), 16);
    }

    // Opening a v2 database must move the sampled ranges from the old
    // misspelled key to the new one and remove the old entry.
    #[tokio::test]
    async fn migration_from_v2() {
        const SCHEMA_VERSION_TABLE: TableDefinition<'static, (), u64> =
            TableDefinition::new("STORE.SCHEMA_VERSION");
        const RANGES_TABLE: TableDefinition<'static, &str, Vec<(u64, u64)>> =
            TableDefinition::new("STORE.RANGES");

        let db = Database::builder()
            .create_with_backend(redb::backends::InMemoryBackend::new())
            .unwrap();
        let db = Arc::new(db);

        // Hand-craft a v2 database: version stamp plus the old-style key.
        tokio::task::spawn_blocking({
            let db = db.clone();
            move || {
                let tx = db.begin_write().unwrap();

                {
                    let mut schema_version_table = tx.open_table(SCHEMA_VERSION_TABLE).unwrap();
                    schema_version_table.insert((), 2).unwrap();

                    let mut ranges_table = tx.open_table(RANGES_TABLE).unwrap();
                    ranges_table
                        .insert(v2::SAMPLED_RANGES_KEY, vec![(123, 124)])
                        .unwrap();
                }

                tx.commit().unwrap();
            }
        })
        .await
        .unwrap();

        // Opening the store runs the migration.
        let store = RedbStore::new(db.clone()).await.unwrap();
        let ranges = store.get_sampled_ranges().await.unwrap();
        assert_eq!(ranges, BlockRanges::try_from([123..=124]).unwrap());
        store.close().await.unwrap();

        // The old key must be gone after migration.
        tokio::task::spawn_blocking({
            let db = db.clone();
            move || {
                let tx = db.begin_read().unwrap();
                let ranges_table = tx.open_table(RANGES_TABLE).unwrap();
                assert!(ranges_table.get(v2::SAMPLED_RANGES_KEY).unwrap().is_none());
            }
        })
        .await
        .unwrap();
    }

    // Helper: on-disk store when a path is given, in-memory otherwise.
    pub async fn create_store(path: Option<&Path>) -> RedbStore {
        match path {
            Some(path) => RedbStore::open(path).await.unwrap(),
            None => RedbStore::in_memory().await.unwrap(),
        }
    }

    // Helper: store pre-filled with `amount` generated headers, plus the
    // generator so callers can extend the chain.
    pub async fn gen_filled_store(
        amount: u64,
        path: Option<&Path>,
    ) -> (RedbStore, ExtendedHeaderGenerator) {
        let s = create_store(path).await;
        let mut gen = ExtendedHeaderGenerator::new();
        let headers = gen.next_many(amount);

        s.insert(headers).await.expect("inserting test data failed");

        (s, gen)
    }
}