lumina_node/store/
redb_store.rs

1use std::fmt::Display;
2use std::ops::RangeInclusive;
3use std::path::Path;
4use std::pin::pin;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use celestia_types::hash::Hash;
9use celestia_types::ExtendedHeader;
10use cid::Cid;
11use redb::{
12    CommitError, Database, ReadTransaction, ReadableTable, StorageError, Table, TableDefinition,
13    TableError, TransactionError, WriteTransaction,
14};
15use tendermint_proto::Protobuf;
16use tokio::sync::Notify;
17use tokio::task::spawn_blocking;
18use tracing::warn;
19use tracing::{debug, trace};
20
21use crate::block_ranges::BlockRanges;
22use crate::store::utils::VerifiedExtendedHeaders;
23use crate::store::{Result, SamplingMetadata, Store, StoreError, StoreInsertionError};
24use crate::utils::Counter;
25
26use super::utils::{deserialize_extended_header, deserialize_sampling_metadata};
27
28const SCHEMA_VERSION: u64 = 3;
29
30const HEIGHTS_TABLE: TableDefinition<'static, &[u8], u64> = TableDefinition::new("STORE.HEIGHTS");
31const HEADERS_TABLE: TableDefinition<'static, u64, &[u8]> = TableDefinition::new("STORE.HEADERS");
32const SAMPLING_METADATA_TABLE: TableDefinition<'static, u64, &[u8]> =
33    TableDefinition::new("STORE.SAMPLING_METADATA");
34const SCHEMA_VERSION_TABLE: TableDefinition<'static, (), u64> =
35    TableDefinition::new("STORE.SCHEMA_VERSION");
36const RANGES_TABLE: TableDefinition<'static, &str, Vec<(u64, u64)>> =
37    TableDefinition::new("STORE.RANGES");
38
39const SAMPLED_RANGES_KEY: &str = "KEY.SAMPLED_RANGES";
40const HEADER_RANGES_KEY: &str = "KEY.HEADER_RANGES";
41const PRUNED_RANGES_KEY: &str = "KEY.PRUNED_RANGES";
42
43/// A [`Store`] implementation based on a [`redb`] database.
44#[derive(Debug)]
45pub struct RedbStore {
46    inner: Arc<Inner>,
47    task_counter: Counter,
48}
49
50#[derive(Debug)]
51struct Inner {
52    /// Reference to the entire redb database
53    db: Arc<Database>,
54    /// Notify when a new header is added
55    header_added_notifier: Notify,
56}
57
58impl RedbStore {
59    /// Open a persistent [`redb`] store.
60    pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
61        let path = path.as_ref().to_owned();
62
63        let db = spawn_blocking(|| Database::create(path))
64            .await?
65            .map_err(|e| StoreError::OpenFailed(e.to_string()))?;
66
67        RedbStore::new(Arc::new(db)).await
68    }
69
70    /// Open an in memory [`redb`] store.
71    pub async fn in_memory() -> Result<Self> {
72        let db = Database::builder()
73            .create_with_backend(redb::backends::InMemoryBackend::new())
74            .map_err(|e| StoreError::OpenFailed(e.to_string()))?;
75
76        RedbStore::new(Arc::new(db)).await
77    }
78
79    /// Create new `RedbStore` with an already opened [`redb::Database`].
80    pub async fn new(db: Arc<Database>) -> Result<Self> {
81        let store = RedbStore {
82            inner: Arc::new(Inner {
83                db,
84                header_added_notifier: Notify::new(),
85            }),
86            task_counter: Counter::new(),
87        };
88
89        store
90            .write_tx(|tx| {
91                let mut schema_version_table = tx.open_table(SCHEMA_VERSION_TABLE)?;
92                let schema_version = schema_version_table.get(())?.map(|guard| guard.value());
93
94                match schema_version {
95                    Some(schema_version) => {
96                        if schema_version > SCHEMA_VERSION {
97                            let e = format!(
98                                "Incompatible database schema; found {schema_version}, expected {SCHEMA_VERSION}."
99                            );
100                            return Err(StoreError::OpenFailed(e));
101                        }
102
103                        // Do migrations
104                        migrate_v1_to_v2(tx, &mut schema_version_table)?;
105                        migrate_v2_to_v3(tx, &mut schema_version_table)?;
106                    }
107                    None => {
108                        // New database
109                        schema_version_table.insert((), SCHEMA_VERSION)?;
110                    }
111                }
112
113                // Force us to write migrations!
114                debug_assert_eq!(
115                    schema_version_table.get(())?.map(|guard| guard.value()),
116                    Some(SCHEMA_VERSION),
117                    "Some migrations are missing"
118                );
119
120                // create tables, so that reads later don't complain
121                let _heights_table = tx.open_table(HEIGHTS_TABLE)?;
122                let _headers_table = tx.open_table(HEADERS_TABLE)?;
123                let _ranges_table = tx.open_table(RANGES_TABLE)?;
124                let _sampling_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
125
126                Ok(())
127            })
128            .await
129            .map_err(|e| match e {
130                e @ StoreError::OpenFailed(_) => e,
131                e => StoreError::OpenFailed(e.to_string()),
132            })?;
133
134        Ok(store)
135    }
136
137    /// Returns the raw [`redb::Database`].
138    ///
139    /// This is useful if you want to pass the database handle to any other
140    /// stores (e.g. [`blockstore`]).
141    pub fn raw_db(&self) -> Arc<Database> {
142        self.inner.db.clone()
143    }
144
145    /// Execute a read transaction.
146    async fn read_tx<F, T>(&self, f: F) -> Result<T>
147    where
148        F: FnOnce(&mut ReadTransaction) -> Result<T> + Send + 'static,
149        T: Send + 'static,
150    {
151        let inner = self.inner.clone();
152        let guard = self.task_counter.guard();
153
154        spawn_blocking(move || {
155            let _guard = guard;
156
157            {
158                let mut tx = inner.db.begin_read()?;
159                f(&mut tx)
160            }
161        })
162        .await?
163    }
164
165    /// Execute a write transaction.
166    ///
167    /// If closure returns an error the transaction is aborted, otherwise commited.
168    async fn write_tx<F, T>(&self, f: F) -> Result<T>
169    where
170        F: FnOnce(&mut WriteTransaction) -> Result<T> + Send + 'static,
171        T: Send + 'static,
172    {
173        let inner = self.inner.clone();
174        let guard = self.task_counter.guard();
175
176        spawn_blocking(move || {
177            let _guard = guard;
178
179            {
180                let mut tx = inner.db.begin_write()?;
181                let res = f(&mut tx);
182
183                if res.is_ok() {
184                    tx.commit()?;
185                } else {
186                    tx.abort()?;
187                }
188
189                res
190            }
191        })
192        .await?
193    }
194
195    async fn head_height(&self) -> Result<u64> {
196        self.read_tx(|tx| {
197            let table = tx.open_table(RANGES_TABLE)?;
198            let header_ranges = get_ranges(&table, HEADER_RANGES_KEY)?;
199
200            header_ranges.head().ok_or(StoreError::NotFound)
201        })
202        .await
203    }
204
205    async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
206        let hash = *hash;
207
208        self.read_tx(move |tx| {
209            let heights_table = tx.open_table(HEIGHTS_TABLE)?;
210            let headers_table = tx.open_table(HEADERS_TABLE)?;
211
212            let height = get_height(&heights_table, hash.as_bytes())?;
213            get_header(&headers_table, height)
214        })
215        .await
216    }
217
218    async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
219        self.read_tx(move |tx| {
220            let table = tx.open_table(HEADERS_TABLE)?;
221            get_header(&table, height)
222        })
223        .await
224    }
225
226    async fn get_head(&self) -> Result<ExtendedHeader> {
227        self.read_tx(|tx| {
228            let ranges_table = tx.open_table(RANGES_TABLE)?;
229            let headers_table = tx.open_table(HEADERS_TABLE)?;
230
231            let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
232            let head = header_ranges.head().ok_or(StoreError::NotFound)?;
233
234            get_header(&headers_table, head)
235        })
236        .await
237    }
238
239    async fn contains_hash(&self, hash: &Hash) -> bool {
240        let hash = *hash;
241
242        self.read_tx(move |tx| {
243            let heights_table = tx.open_table(HEIGHTS_TABLE)?;
244            let headers_table = tx.open_table(HEADERS_TABLE)?;
245
246            let height = get_height(&heights_table, hash.as_bytes())?;
247            Ok(headers_table.get(height)?.is_some())
248        })
249        .await
250        .unwrap_or(false)
251    }
252
253    async fn contains_height(&self, height: u64) -> bool {
254        self.read_tx(move |tx| {
255            let headers_table = tx.open_table(HEADERS_TABLE)?;
256            Ok(headers_table.get(height)?.is_some())
257        })
258        .await
259        .unwrap_or(false)
260    }
261
262    async fn insert<R>(&self, headers: R) -> Result<()>
263    where
264        R: TryInto<VerifiedExtendedHeaders> + Send,
265        <R as TryInto<VerifiedExtendedHeaders>>::Error: Display,
266    {
267        let headers = headers
268            .try_into()
269            .map_err(|e| StoreInsertionError::HeadersVerificationFailed(e.to_string()))?;
270
271        self.write_tx(move |tx| {
272            let (Some(head), Some(tail)) = (headers.as_ref().first(), headers.as_ref().last())
273            else {
274                return Ok(());
275            };
276
277            let mut heights_table = tx.open_table(HEIGHTS_TABLE)?;
278            let mut headers_table = tx.open_table(HEADERS_TABLE)?;
279            let mut ranges_table = tx.open_table(RANGES_TABLE)?;
280
281            let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
282            let mut sampled_ranges = get_ranges(&ranges_table, SAMPLED_RANGES_KEY)?;
283            let mut pruned_ranges = get_ranges(&ranges_table, PRUNED_RANGES_KEY)?;
284
285            let headers_range = head.height().value()..=tail.height().value();
286
287            let (prev_exists, next_exists) = header_ranges
288                .check_insertion_constraints(&headers_range)
289                .map_err(StoreInsertionError::ContraintsNotMet)?;
290
291            verify_against_neighbours(
292                &headers_table,
293                prev_exists.then_some(head),
294                next_exists.then_some(tail),
295            )?;
296
297            for header in headers {
298                let height = header.height().value();
299                let hash = header.hash();
300                let serialized_header = header.encode_vec();
301
302                if headers_table
303                    .insert(height, &serialized_header[..])?
304                    .is_some()
305                {
306                    return Err(StoreError::StoredDataError(
307                        "inconsistency between headers table and ranges table".into(),
308                    ));
309                }
310
311                if heights_table.insert(hash.as_bytes(), height)?.is_some() {
312                    // TODO: Replace this with `StoredDataError` when we implement
313                    // type-safe validation on insertion.
314                    return Err(StoreInsertionError::HashExists(hash).into());
315                }
316
317                trace!("Inserted header {hash} with height {height}");
318            }
319
320            header_ranges
321                .insert_relaxed(&headers_range)
322                .expect("invalid range");
323            sampled_ranges
324                .remove_relaxed(&headers_range)
325                .expect("invalid range");
326            pruned_ranges
327                .remove_relaxed(&headers_range)
328                .expect("invalid range");
329
330            set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?;
331            set_ranges(&mut ranges_table, SAMPLED_RANGES_KEY, &sampled_ranges)?;
332            set_ranges(&mut ranges_table, PRUNED_RANGES_KEY, &pruned_ranges)?;
333
334            debug!("Inserted header range {headers_range:?}",);
335
336            Ok(())
337        })
338        .await?;
339
340        self.inner.header_added_notifier.notify_waiters();
341
342        Ok(())
343    }
344
345    async fn update_sampling_metadata(&self, height: u64, cids: Vec<Cid>) -> Result<()> {
346        self.write_tx(move |tx| {
347            let mut sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
348            let ranges_table = tx.open_table(RANGES_TABLE)?;
349
350            let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
351
352            if !header_ranges.contains(height) {
353                return Err(StoreError::NotFound);
354            }
355
356            let previous = get_sampling_metadata(&sampling_metadata_table, height)?;
357
358            let entry = match previous {
359                Some(mut previous) => {
360                    for cid in cids {
361                        if !previous.cids.contains(&cid) {
362                            previous.cids.push(cid);
363                        }
364                    }
365
366                    previous
367                }
368                None => SamplingMetadata { cids },
369            };
370
371            let serialized = entry.encode_vec();
372            sampling_metadata_table.insert(height, &serialized[..])?;
373
374            Ok(())
375        })
376        .await
377    }
378
379    async fn mark_as_sampled(&self, height: u64) -> Result<()> {
380        self.write_tx(move |tx| {
381            let mut ranges_table = tx.open_table(RANGES_TABLE)?;
382            let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
383            let mut sampled_ranges = get_ranges(&ranges_table, SAMPLED_RANGES_KEY)?;
384
385            if !header_ranges.contains(height) {
386                return Err(StoreError::NotFound);
387            }
388
389            sampled_ranges
390                .insert_relaxed(height..=height)
391                .expect("invalid height");
392
393            set_ranges(&mut ranges_table, SAMPLED_RANGES_KEY, &sampled_ranges)?;
394
395            Ok(())
396        })
397        .await
398    }
399
400    async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
401        self.read_tx(move |tx| {
402            let headers_table = tx.open_table(HEADERS_TABLE)?;
403            let sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
404
405            if headers_table.get(height)?.is_none() {
406                return Err(StoreError::NotFound);
407            }
408
409            get_sampling_metadata(&sampling_metadata_table, height)
410        })
411        .await
412    }
413
414    async fn get_stored_ranges(&self) -> Result<BlockRanges> {
415        self.read_tx(|tx| {
416            let table = tx.open_table(RANGES_TABLE)?;
417            get_ranges(&table, HEADER_RANGES_KEY)
418        })
419        .await
420    }
421
422    async fn get_sampled_ranges(&self) -> Result<BlockRanges> {
423        self.read_tx(|tx| {
424            let table = tx.open_table(RANGES_TABLE)?;
425            get_ranges(&table, SAMPLED_RANGES_KEY)
426        })
427        .await
428    }
429
430    async fn get_pruned_ranges(&self) -> Result<BlockRanges> {
431        self.read_tx(|tx| {
432            let table = tx.open_table(RANGES_TABLE)?;
433            get_ranges(&table, PRUNED_RANGES_KEY)
434        })
435        .await
436    }
437
438    async fn remove_height(&self, height: u64) -> Result<()> {
439        self.write_tx(move |tx| {
440            let mut heights_table = tx.open_table(HEIGHTS_TABLE)?;
441            let mut headers_table = tx.open_table(HEADERS_TABLE)?;
442            let mut ranges_table = tx.open_table(RANGES_TABLE)?;
443            let mut sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
444
445            let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
446            let mut sampled_ranges = get_ranges(&ranges_table, SAMPLED_RANGES_KEY)?;
447            let mut pruned_ranges = get_ranges(&ranges_table, PRUNED_RANGES_KEY)?;
448
449            if !header_ranges.contains(height) {
450                return Err(StoreError::NotFound);
451            }
452
453            let Some(header) = headers_table.remove(height)? else {
454                return Err(StoreError::StoredDataError(format!(
455                    "inconsistency between ranges and height_to_hash tables, height {height}"
456                )));
457            };
458
459            let hash = ExtendedHeader::decode(header.value())
460                .map_err(|e| StoreError::StoredDataError(e.to_string()))?
461                .hash();
462
463            if heights_table.remove(hash.as_bytes())?.is_none() {
464                return Err(StoreError::StoredDataError(format!(
465                    "inconsistency between header and height_to_hash tables, hash {hash}"
466                )));
467            }
468
469            sampling_metadata_table.remove(height)?;
470
471            header_ranges
472                .remove_relaxed(height..=height)
473                .expect("valid range never fails");
474            sampled_ranges
475                .remove_relaxed(height..=height)
476                .expect("valid range never fails");
477            pruned_ranges
478                .insert_relaxed(height..=height)
479                .expect("valid range never fails");
480
481            set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?;
482            set_ranges(&mut ranges_table, SAMPLED_RANGES_KEY, &sampled_ranges)?;
483            set_ranges(&mut ranges_table, PRUNED_RANGES_KEY, &pruned_ranges)?;
484
485            Ok(())
486        })
487        .await
488    }
489}
490
491#[async_trait]
492impl Store for RedbStore {
493    async fn get_head(&self) -> Result<ExtendedHeader> {
494        self.get_head().await
495    }
496
497    async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
498        self.get_by_hash(hash).await
499    }
500
501    async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
502        self.get_by_height(height).await
503    }
504
505    async fn wait_new_head(&self) -> u64 {
506        let head = self.head_height().await.unwrap_or(0);
507        let mut notifier = pin!(self.inner.header_added_notifier.notified());
508
509        loop {
510            let new_head = self.head_height().await.unwrap_or(0);
511
512            if head != new_head {
513                return new_head;
514            }
515
516            // Await for a notification
517            notifier.as_mut().await;
518
519            // Reset notifier
520            notifier.set(self.inner.header_added_notifier.notified());
521        }
522    }
523
524    async fn wait_height(&self, height: u64) -> Result<()> {
525        let mut notifier = pin!(self.inner.header_added_notifier.notified());
526
527        loop {
528            if self.contains_height(height).await {
529                return Ok(());
530            }
531
532            // Await for a notification
533            notifier.as_mut().await;
534
535            // Reset notifier
536            notifier.set(self.inner.header_added_notifier.notified());
537        }
538    }
539
540    async fn head_height(&self) -> Result<u64> {
541        self.head_height().await
542    }
543
544    async fn has(&self, hash: &Hash) -> bool {
545        self.contains_hash(hash).await
546    }
547
548    async fn has_at(&self, height: u64) -> bool {
549        self.contains_height(height).await
550    }
551
552    async fn insert<R>(&self, headers: R) -> Result<()>
553    where
554        R: TryInto<VerifiedExtendedHeaders> + Send,
555        <R as TryInto<VerifiedExtendedHeaders>>::Error: Display,
556    {
557        self.insert(headers).await
558    }
559
560    async fn update_sampling_metadata(&self, height: u64, cids: Vec<Cid>) -> Result<()> {
561        self.update_sampling_metadata(height, cids).await
562    }
563
564    async fn mark_as_sampled(&self, height: u64) -> Result<()> {
565        self.mark_as_sampled(height).await
566    }
567
568    async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
569        self.get_sampling_metadata(height).await
570    }
571
572    async fn get_stored_header_ranges(&self) -> Result<BlockRanges> {
573        Ok(self.get_stored_ranges().await?)
574    }
575
576    async fn get_sampled_ranges(&self) -> Result<BlockRanges> {
577        self.get_sampled_ranges().await
578    }
579
580    async fn get_pruned_ranges(&self) -> Result<BlockRanges> {
581        self.get_pruned_ranges().await
582    }
583
584    async fn remove_height(&self, height: u64) -> Result<()> {
585        self.remove_height(height).await
586    }
587
588    async fn close(mut self) -> Result<()> {
589        // Wait all ongoing `spawn_blocking` tasks to finish.
590        self.task_counter.wait_guards().await;
591        Ok(())
592    }
593}
594
595fn verify_against_neighbours<R>(
596    headers_table: &R,
597    lowest_header: Option<&ExtendedHeader>,
598    highest_header: Option<&ExtendedHeader>,
599) -> Result<()>
600where
601    R: ReadableTable<u64, &'static [u8]>,
602{
603    if let Some(lowest_header) = lowest_header {
604        let prev = get_header(headers_table, lowest_header.height().value() - 1).map_err(|e| {
605            if let StoreError::NotFound = e {
606                StoreError::StoredDataError("inconsistency between headers and ranges table".into())
607            } else {
608                e
609            }
610        })?;
611
612        prev.verify(lowest_header)
613            .map_err(|e| StoreInsertionError::NeighborsVerificationFailed(e.to_string()))?;
614    }
615
616    if let Some(highest_header) = highest_header {
617        let next = get_header(headers_table, highest_header.height().value() + 1).map_err(|e| {
618            if let StoreError::NotFound = e {
619                StoreError::StoredDataError("inconsistency between headers and ranges table".into())
620            } else {
621                e
622            }
623        })?;
624
625        highest_header
626            .verify(&next)
627            .map_err(|e| StoreInsertionError::NeighborsVerificationFailed(e.to_string()))?;
628    }
629
630    Ok(())
631}
632
633fn get_ranges<R>(ranges_table: &R, name: &str) -> Result<BlockRanges>
634where
635    R: ReadableTable<&'static str, Vec<(u64, u64)>>,
636{
637    let raw_ranges = ranges_table
638        .get(name)?
639        .map(|guard| {
640            guard
641                .value()
642                .iter()
643                .map(|(start, end)| *start..=*end)
644                .collect()
645        })
646        .unwrap_or_default();
647
648    BlockRanges::from_vec(raw_ranges).map_err(|e| {
649        let s = format!("Stored BlockRanges for {name} are invalid: {e}");
650        StoreError::StoredDataError(s)
651    })
652}
653
654fn set_ranges(
655    ranges_table: &mut Table<&str, Vec<(u64, u64)>>,
656    name: &str,
657    ranges: &BlockRanges,
658) -> Result<()> {
659    let raw_ranges: &[RangeInclusive<u64>] = ranges.as_ref();
660    let raw_ranges = raw_ranges
661        .iter()
662        .map(|range| (*range.start(), *range.end()))
663        .collect::<Vec<_>>();
664
665    ranges_table.insert(name, raw_ranges)?;
666
667    Ok(())
668}
669
670#[inline]
671fn get_height<R>(heights_table: &R, key: &[u8]) -> Result<u64>
672where
673    R: ReadableTable<&'static [u8], u64>,
674{
675    heights_table
676        .get(key)?
677        .map(|guard| guard.value())
678        .ok_or(StoreError::NotFound)
679}
680
681#[inline]
682fn get_header<R>(headers_table: &R, key: u64) -> Result<ExtendedHeader>
683where
684    R: ReadableTable<u64, &'static [u8]>,
685{
686    let serialized = headers_table.get(key)?.ok_or(StoreError::NotFound)?;
687    deserialize_extended_header(serialized.value())
688}
689
690#[inline]
691fn get_sampling_metadata<R>(
692    sampling_metadata_table: &R,
693    key: u64,
694) -> Result<Option<SamplingMetadata>>
695where
696    R: ReadableTable<u64, &'static [u8]>,
697{
698    sampling_metadata_table
699        .get(key)?
700        .map(|guard| deserialize_sampling_metadata(guard.value()))
701        .transpose()
702}
703
704impl From<TransactionError> for StoreError {
705    fn from(e: TransactionError) -> Self {
706        match e {
707            TransactionError::ReadTransactionStillInUse(_) => {
708                unreachable!("redb::ReadTransaction::close is never used")
709            }
710            e => StoreError::FatalDatabaseError(format!("TransactionError: {e}")),
711        }
712    }
713}
714
715impl From<TableError> for StoreError {
716    fn from(e: TableError) -> Self {
717        match e {
718            TableError::Storage(e) => e.into(),
719            TableError::TableAlreadyOpen(table, location) => {
720                panic!("Table {table} already opened from: {location}")
721            }
722            TableError::TableDoesNotExist(table) => {
723                panic!("Table {table} was not created on initialization")
724            }
725            e => StoreError::StoredDataError(format!("TableError: {e}")),
726        }
727    }
728}
729
730impl From<StorageError> for StoreError {
731    fn from(e: StorageError) -> Self {
732        match e {
733            StorageError::ValueTooLarge(_) => {
734                unreachable!("redb::Table::insert_reserve is never used")
735            }
736            e => StoreError::FatalDatabaseError(format!("StorageError: {e}")),
737        }
738    }
739}
740
741impl From<CommitError> for StoreError {
742    fn from(e: CommitError) -> Self {
743        StoreError::FatalDatabaseError(format!("CommitError: {e}"))
744    }
745}
746
747fn migrate_v1_to_v2(
748    tx: &WriteTransaction,
749    schema_version_table: &mut Table<(), u64>,
750) -> Result<()> {
751    const HEADER_HEIGHT_RANGES: TableDefinition<'static, u64, (u64, u64)> =
752        TableDefinition::new("STORE.HEIGHT_RANGES");
753
754    let version = schema_version_table
755        .get(())?
756        .map(|guard| guard.value())
757        .expect("migrations never run on new db");
758
759    if version >= 2 {
760        // Nothing to migrate.
761        return Ok(());
762    }
763
764    debug_assert_eq!(version, 1);
765    warn!("Migrating DB schema from v1 to v2");
766
767    let header_ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?;
768    let mut ranges_table = tx.open_table(RANGES_TABLE)?;
769
770    let raw_ranges = header_ranges_table
771        .iter()?
772        .map(|range_guard| {
773            let range = range_guard?.1.value();
774            Ok((range.0, range.1))
775        })
776        .collect::<Result<Vec<_>>>()?;
777
778    tx.delete_table(header_ranges_table)?;
779    ranges_table.insert(HEADER_RANGES_KEY, raw_ranges)?;
780
781    // Migrated to v2
782    schema_version_table.insert((), 2)?;
783
784    Ok(())
785}
786
787fn migrate_v2_to_v3(
788    tx: &WriteTransaction,
789    schema_version_table: &mut Table<(), u64>,
790) -> Result<()> {
791    let version = schema_version_table
792        .get(())?
793        .map(|guard| guard.value())
794        .expect("migrations never run on new db");
795
796    if version >= 3 {
797        // Nothing to migrate.
798        return Ok(());
799    }
800
801    debug_assert_eq!(version, 2);
802    warn!("Migrating DB schema from v2 to v3");
803
804    // There are two chages in v3:
805    //
806    // * Removal of `SamplingStatus` in `SamplingMetadata`
807    // * Rename of sampled ranges database key
808    //
809    // For the first one we don't need to take any actions because it will
810    // be ingored by the deserializer.
811    let mut ranges_table = tx.open_table(RANGES_TABLE)?;
812    let sampled_ranges = get_ranges(&ranges_table, v2::SAMPLED_RANGES_KEY)?;
813    set_ranges(&mut ranges_table, SAMPLED_RANGES_KEY, &sampled_ranges)?;
814    ranges_table.remove(v2::SAMPLED_RANGES_KEY)?;
815
816    // Migrated to v3
817    schema_version_table.insert((), 3)?;
818
819    Ok(())
820}
821
822mod v2 {
823    pub(super) const SAMPLED_RANGES_KEY: &str = "KEY.ACCEPTED_SAMPING_RANGES";
824}
825
826#[cfg(test)]
827pub mod tests {
828    use super::*;
829    use crate::test_utils::ExtendedHeaderGeneratorExt;
830    use celestia_types::test_utils::ExtendedHeaderGenerator;
831    use std::path::Path;
832    use tempfile::TempDir;
833
834    #[tokio::test]
835    async fn test_store_persistence() {
836        let db_dir = TempDir::with_prefix("lumina.store.test").unwrap();
837        let db = db_dir.path().join("db");
838
839        let (original_store, mut gen) = gen_filled_store(0, Some(&db)).await;
840        let mut original_headers = gen.next_many(20);
841
842        original_store
843            .insert(original_headers.clone())
844            .await
845            .expect("inserting test data failed");
846        drop(original_store);
847
848        let reopened_store = create_store(Some(&db)).await;
849
850        assert_eq!(
851            original_headers.last().unwrap().height().value(),
852            reopened_store.head_height().await.unwrap()
853        );
854        for original_header in &original_headers {
855            let stored_header = reopened_store
856                .get_by_height(original_header.height().value())
857                .await
858                .unwrap();
859            assert_eq!(original_header, &stored_header);
860        }
861
862        let mut new_headers = gen.next_many(10);
863        reopened_store
864            .insert(new_headers.clone())
865            .await
866            .expect("failed to insert data");
867        drop(reopened_store);
868
869        original_headers.append(&mut new_headers);
870
871        let reopened_store = create_store(Some(&db)).await;
872        assert_eq!(
873            original_headers.last().unwrap().height().value(),
874            reopened_store.head_height().await.unwrap()
875        );
876        for original_header in &original_headers {
877            let stored_header = reopened_store
878                .get_by_height(original_header.height().value())
879                .await
880                .unwrap();
881            assert_eq!(original_header, &stored_header);
882        }
883    }
884
885    #[tokio::test]
886    async fn test_separate_stores() {
887        let (store0, mut gen0) = gen_filled_store(0, None).await;
888        let store1 = create_store(None).await;
889
890        let headers = gen0.next_many(10);
891        store0.insert(headers.clone()).await.unwrap();
892        store1.insert(headers).await.unwrap();
893
894        let mut gen1 = gen0.fork();
895
896        store0.insert(gen0.next_many_verified(5)).await.unwrap();
897        store1.insert(gen1.next_many_verified(6)).await.unwrap();
898
899        assert_eq!(
900            store0.get_by_height(10).await.unwrap(),
901            store1.get_by_height(10).await.unwrap()
902        );
903        assert_ne!(
904            store0.get_by_height(11).await.unwrap(),
905            store1.get_by_height(11).await.unwrap()
906        );
907
908        assert_eq!(store0.head_height().await.unwrap(), 15);
909        assert_eq!(store1.head_height().await.unwrap(), 16);
910    }
911
912    #[tokio::test]
913    async fn migration_from_v2() {
914        const SCHEMA_VERSION_TABLE: TableDefinition<'static, (), u64> =
915            TableDefinition::new("STORE.SCHEMA_VERSION");
916        const RANGES_TABLE: TableDefinition<'static, &str, Vec<(u64, u64)>> =
917            TableDefinition::new("STORE.RANGES");
918
919        let db = Database::builder()
920            .create_with_backend(redb::backends::InMemoryBackend::new())
921            .unwrap();
922        let db = Arc::new(db);
923
924        // Prepare a v2 db
925        tokio::task::spawn_blocking({
926            let db = db.clone();
927            move || {
928                let tx = db.begin_write().unwrap();
929
930                {
931                    let mut schema_version_table = tx.open_table(SCHEMA_VERSION_TABLE).unwrap();
932                    schema_version_table.insert((), 2).unwrap();
933
934                    let mut ranges_table = tx.open_table(RANGES_TABLE).unwrap();
935                    ranges_table
936                        .insert(v2::SAMPLED_RANGES_KEY, vec![(123, 124)])
937                        .unwrap();
938                }
939
940                tx.commit().unwrap();
941            }
942        })
943        .await
944        .unwrap();
945
946        // Migrate and check store
947        let store = RedbStore::new(db.clone()).await.unwrap();
948        let ranges = store.get_sampled_ranges().await.unwrap();
949        assert_eq!(ranges, BlockRanges::try_from([123..=124]).unwrap());
950        store.close().await.unwrap();
951
952        // Check that old ranges were deleted
953        tokio::task::spawn_blocking({
954            let db = db.clone();
955            move || {
956                let tx = db.begin_read().unwrap();
957                let ranges_table = tx.open_table(RANGES_TABLE).unwrap();
958                assert!(ranges_table.get(v2::SAMPLED_RANGES_KEY).unwrap().is_none());
959            }
960        })
961        .await
962        .unwrap();
963    }
964
965    pub async fn create_store(path: Option<&Path>) -> RedbStore {
966        match path {
967            Some(path) => RedbStore::open(path).await.unwrap(),
968            None => RedbStore::in_memory().await.unwrap(),
969        }
970    }
971
972    pub async fn gen_filled_store(
973        amount: u64,
974        path: Option<&Path>,
975    ) -> (RedbStore, ExtendedHeaderGenerator) {
976        let s = create_store(path).await;
977        let mut gen = ExtendedHeaderGenerator::new();
978        let headers = gen.next_many(amount);
979
980        s.insert(headers).await.expect("inserting test data failed");
981
982        (s, gen)
983    }
984}