lumina_node/store/
redb_store.rs

1use std::fmt::Display;
2use std::ops::RangeInclusive;
3use std::path::Path;
4use std::pin::pin;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use celestia_types::hash::Hash;
9use celestia_types::ExtendedHeader;
10use cid::Cid;
11use redb::{
12    CommitError, Database, ReadTransaction, ReadableTable, StorageError, Table, TableDefinition,
13    TableError, TransactionError, WriteTransaction,
14};
15use tendermint_proto::Protobuf;
16use tokio::sync::Notify;
17use tokio::task::spawn_blocking;
18use tracing::warn;
19use tracing::{debug, trace};
20
21use crate::block_ranges::BlockRanges;
22use crate::store::utils::VerifiedExtendedHeaders;
23use crate::store::{
24    Result, SamplingMetadata, SamplingStatus, Store, StoreError, StoreInsertionError,
25};
26use crate::utils::Counter;
27
28use super::utils::{deserialize_extended_header, deserialize_sampling_metadata};
29
/// Version of the on-disk schema; bump whenever the layout of any table changes.
const SCHEMA_VERSION: u64 = 2;

/// Maps a header hash (raw bytes) to the height it was stored at.
const HEIGHTS_TABLE: TableDefinition<'static, &[u8], u64> = TableDefinition::new("STORE.HEIGHTS");
/// Maps a height to the serialized [`ExtendedHeader`].
const HEADERS_TABLE: TableDefinition<'static, u64, &[u8]> = TableDefinition::new("STORE.HEADERS");
/// Maps a height to the serialized [`SamplingMetadata`] for that block.
const SAMPLING_METADATA_TABLE: TableDefinition<'static, u64, &[u8]> =
    TableDefinition::new("STORE.SAMPLING_METADATA");
/// Single-entry table (unit key) holding the database's schema version.
const SCHEMA_VERSION_TABLE: TableDefinition<'static, (), u64> =
    TableDefinition::new("STORE.SCHEMA_VERSION");
/// Maps a well-known key (see the `*_RANGES_KEY` constants below) to a list of
/// inclusive `(start, end)` block ranges.
const RANGES_TABLE: TableDefinition<'static, &str, Vec<(u64, u64)>> =
    TableDefinition::new("STORE.RANGES");

// NOTE(review): "SAMPING" is a typo, but the string is persisted as a key in
// existing databases, so it must stay as-is for backwards compatibility.
/// Key in [`RANGES_TABLE`] for the ranges of blocks that were sampled and accepted.
const ACCEPTED_SAMPING_RANGES_KEY: &str = "KEY.ACCEPTED_SAMPING_RANGES";
/// Key in [`RANGES_TABLE`] for the ranges of heights that have headers stored.
const HEADER_RANGES_KEY: &str = "KEY.HEADER_RANGES";
43
/// A [`Store`] implementation based on a [`redb`] database.
#[derive(Debug)]
pub struct RedbStore {
    /// State shared with the blocking tasks spawned for database access.
    inner: Arc<Inner>,
    /// Tracks in-flight `spawn_blocking` tasks so [`Store::close`] can wait for them.
    task_counter: Counter,
}
50
/// Shared state captured by every read/write transaction task.
#[derive(Debug)]
struct Inner {
    /// Reference to the entire redb database
    db: Arc<Database>,
    /// Notify when a new header is added
    header_added_notifier: Notify,
}
58
impl RedbStore {
    /// Open a persistent [`redb`] store.
    pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref().to_owned();

        // `Database::create` does blocking file I/O, so run it off the async runtime.
        let db = spawn_blocking(|| Database::create(path))
            .await?
            .map_err(|e| StoreError::OpenFailed(e.to_string()))?;

        RedbStore::new(Arc::new(db)).await
    }

    /// Open an in memory [`redb`] store.
    pub async fn in_memory() -> Result<Self> {
        let db = Database::builder()
            .create_with_backend(redb::backends::InMemoryBackend::new())
            .map_err(|e| StoreError::OpenFailed(e.to_string()))?;

        RedbStore::new(Arc::new(db)).await
    }

    /// Create new `RedbStore` with an already opened [`redb::Database`].
    ///
    /// Validates the stored schema version, runs pending migrations and
    /// pre-creates all tables. Any failure here is reported as
    /// [`StoreError::OpenFailed`].
    pub async fn new(db: Arc<Database>) -> Result<Self> {
        let store = RedbStore {
            inner: Arc::new(Inner {
                db,
                header_added_notifier: Notify::new(),
            }),
            task_counter: Counter::new(),
        };

        store
            .write_tx(|tx| {
                let mut schema_version_table = tx.open_table(SCHEMA_VERSION_TABLE)?;
                let schema_version = schema_version_table.get(())?.map(|guard| guard.value());

                match schema_version {
                    Some(schema_version) => {
                        // A newer schema than this build understands: refuse to touch it.
                        if schema_version > SCHEMA_VERSION {
                            let e = format!(
                                "Incompatible database schema; found {}, expected {}.",
                                schema_version, SCHEMA_VERSION
                            );
                            return Err(StoreError::OpenFailed(e));
                        }

                        // Do migrations
                        migrate_v1_to_v2(tx, &mut schema_version_table)?;
                    }
                    None => {
                        // New database
                        schema_version_table.insert((), SCHEMA_VERSION)?;
                    }
                }

                // Force us to write migrations!
                debug_assert_eq!(
                    schema_version_table.get(())?.map(|guard| guard.value()),
                    Some(SCHEMA_VERSION),
                    "Some migrations are missing"
                );

                // create tables, so that reads later don't complain
                let _heights_table = tx.open_table(HEIGHTS_TABLE)?;
                let _headers_table = tx.open_table(HEADERS_TABLE)?;
                let _ranges_table = tx.open_table(RANGES_TABLE)?;
                let _sampling_table = tx.open_table(SAMPLING_METADATA_TABLE)?;

                Ok(())
            })
            .await
            .map_err(|e| match e {
                e @ StoreError::OpenFailed(_) => e,
                e => StoreError::OpenFailed(e.to_string()),
            })?;

        Ok(store)
    }

    /// Returns the raw [`redb::Database`].
    ///
    /// This is useful if you want to pass the database handle to any other
    /// stores (e.g. [`blockstore`]).
    pub fn raw_db(&self) -> Arc<Database> {
        self.inner.db.clone()
    }

    /// Execute a read transaction.
    ///
    /// The closure runs on a blocking thread; the guard keeps `close` from
    /// returning while the task is still running.
    async fn read_tx<F, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce(&mut ReadTransaction) -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let inner = self.inner.clone();
        let guard = self.task_counter.guard();

        spawn_blocking(move || {
            let _guard = guard;

            {
                let mut tx = inner.db.begin_read()?;
                f(&mut tx)
            }
        })
        .await?
    }

    /// Execute a write transaction.
    ///
    /// If closure returns an error the transaction is aborted, otherwise commited.
    async fn write_tx<F, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce(&mut WriteTransaction) -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let inner = self.inner.clone();
        let guard = self.task_counter.guard();

        spawn_blocking(move || {
            let _guard = guard;

            {
                let mut tx = inner.db.begin_write()?;
                let res = f(&mut tx);

                if res.is_ok() {
                    tx.commit()?;
                } else {
                    tx.abort()?;
                }

                res
            }
        })
        .await?
    }

    /// Height of the highest stored header, or [`StoreError::NotFound`] when empty.
    async fn head_height(&self) -> Result<u64> {
        self.read_tx(|tx| {
            let table = tx.open_table(RANGES_TABLE)?;
            let header_ranges = get_ranges(&table, HEADER_RANGES_KEY)?;

            header_ranges.head().ok_or(StoreError::NotFound)
        })
        .await
    }

    /// Fetch a header by its hash, resolving hash -> height -> header.
    async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
        let hash = *hash;

        self.read_tx(move |tx| {
            let heights_table = tx.open_table(HEIGHTS_TABLE)?;
            let headers_table = tx.open_table(HEADERS_TABLE)?;

            let height = get_height(&heights_table, hash.as_bytes())?;
            get_header(&headers_table, height)
        })
        .await
    }

    /// Fetch a header by its height.
    async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
        self.read_tx(move |tx| {
            let table = tx.open_table(HEADERS_TABLE)?;
            get_header(&table, height)
        })
        .await
    }

    /// Fetch the highest stored header.
    async fn get_head(&self) -> Result<ExtendedHeader> {
        self.read_tx(|tx| {
            let ranges_table = tx.open_table(RANGES_TABLE)?;
            let headers_table = tx.open_table(HEADERS_TABLE)?;

            let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
            let head = header_ranges.head().ok_or(StoreError::NotFound)?;

            get_header(&headers_table, head)
        })
        .await
    }

    /// Whether a header with the given hash is stored.
    ///
    /// Any error (including a failing read transaction) is reported as `false`.
    async fn contains_hash(&self, hash: &Hash) -> bool {
        let hash = *hash;

        self.read_tx(move |tx| {
            let heights_table = tx.open_table(HEIGHTS_TABLE)?;
            let headers_table = tx.open_table(HEADERS_TABLE)?;

            let height = get_height(&heights_table, hash.as_bytes())?;
            Ok(headers_table.get(height)?.is_some())
        })
        .await
        .unwrap_or(false)
    }

    /// Whether a header exists at the given height; errors are reported as `false`.
    async fn contains_height(&self, height: u64) -> bool {
        self.read_tx(move |tx| {
            let headers_table = tx.open_table(HEADERS_TABLE)?;
            Ok(headers_table.get(height)?.is_some())
        })
        .await
        .unwrap_or(false)
    }

    /// Insert a contiguous, pre-verified batch of headers.
    ///
    /// The batch must satisfy the store's insertion constraints (checked
    /// against the stored ranges) and must verify against already stored
    /// neighbouring headers. All writes happen in a single transaction;
    /// waiters are notified only after a successful commit.
    async fn insert<R>(&self, headers: R) -> Result<()>
    where
        R: TryInto<VerifiedExtendedHeaders> + Send,
        <R as TryInto<VerifiedExtendedHeaders>>::Error: Display,
    {
        let headers = headers
            .try_into()
            .map_err(|e| StoreInsertionError::HeadersVerificationFailed(e.to_string()))?;

        self.write_tx(move |tx| {
            // Empty batch is a no-op.
            let (Some(head), Some(tail)) = (headers.as_ref().first(), headers.as_ref().last())
            else {
                return Ok(());
            };

            let mut heights_table = tx.open_table(HEIGHTS_TABLE)?;
            let mut headers_table = tx.open_table(HEADERS_TABLE)?;
            let mut ranges_table = tx.open_table(RANGES_TABLE)?;

            let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
            let headers_range = head.height().value()..=tail.height().value();

            // Tells us which neighbours (previous/next height) already exist
            // and therefore must be verified against.
            let (prev_exists, next_exists) = header_ranges
                .check_insertion_constraints(&headers_range)
                .map_err(StoreInsertionError::ContraintsNotMet)?;

            verify_against_neighbours(
                &headers_table,
                prev_exists.then_some(head),
                next_exists.then_some(tail),
            )?;

            for header in headers {
                let height = header.height().value();
                let hash = header.hash();
                let serialized_header = header.encode_vec();

                // Ranges said the height is free; finding an entry here means
                // the tables disagree.
                if headers_table
                    .insert(height, &serialized_header[..])?
                    .is_some()
                {
                    return Err(StoreError::StoredDataError(
                        "inconsistency between headers table and ranges table".into(),
                    ));
                }

                if heights_table.insert(hash.as_bytes(), height)?.is_some() {
                    // TODO: Replace this with `StoredDataError` when we implement
                    // type-safe validation on insertion.
                    return Err(StoreInsertionError::HashExists(hash).into());
                }

                trace!("Inserted header {hash} with height {height}");
            }

            header_ranges
                .insert_relaxed(&headers_range)
                .expect("invalid range");
            set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?;

            debug!("Inserted header range {headers_range:?}",);

            Ok(())
        })
        .await?;

        self.inner.header_added_notifier.notify_waiters();

        Ok(())
    }

    /// Store or merge sampling metadata for `height`.
    ///
    /// Existing CIDs are kept and new ones appended (no duplicates). The
    /// accepted-sampling ranges are updated to reflect `status`. Fails with
    /// [`StoreError::NotFound`] when no header is stored at `height`.
    async fn update_sampling_metadata(
        &self,
        height: u64,
        status: SamplingStatus,
        cids: Vec<Cid>,
    ) -> Result<()> {
        self.write_tx(move |tx| {
            let mut sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
            let mut ranges_table = tx.open_table(RANGES_TABLE)?;

            let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
            let mut sampling_ranges = get_ranges(&ranges_table, ACCEPTED_SAMPING_RANGES_KEY)?;

            if !header_ranges.contains(height) {
                return Err(StoreError::NotFound);
            }

            let previous = get_sampling_metadata(&sampling_metadata_table, height)?;

            let entry = match previous {
                Some(mut previous) => {
                    previous.status = status;

                    for cid in cids {
                        if !previous.cids.contains(&cid) {
                            previous.cids.push(cid);
                        }
                    }

                    previous
                }
                None => SamplingMetadata { status, cids },
            };

            // make sure Result is Infallible and unwrap it later
            let serialized = entry.encode_vec();

            sampling_metadata_table.insert(height, &serialized[..])?;

            match status {
                SamplingStatus::Accepted => sampling_ranges
                    .insert_relaxed(height..=height)
                    .expect("invalid height"),
                _ => sampling_ranges
                    .remove_relaxed(height..=height)
                    .expect("invalid height"),
            }

            set_ranges(
                &mut ranges_table,
                ACCEPTED_SAMPING_RANGES_KEY,
                &sampling_ranges,
            )?;

            Ok(())
        })
        .await
    }

    /// Fetch sampling metadata for `height`.
    ///
    /// Returns `Ok(None)` when the header exists but was never sampled, and
    /// [`StoreError::NotFound`] when there is no header at `height`.
    async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
        self.read_tx(move |tx| {
            let headers_table = tx.open_table(HEADERS_TABLE)?;
            let sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;

            if headers_table.get(height)?.is_none() {
                return Err(StoreError::NotFound);
            }

            get_sampling_metadata(&sampling_metadata_table, height)
        })
        .await
    }

    /// Ranges of heights that currently have headers stored.
    async fn get_stored_ranges(&self) -> Result<BlockRanges> {
        self.read_tx(|tx| {
            let table = tx.open_table(RANGES_TABLE)?;
            get_ranges(&table, HEADER_RANGES_KEY)
        })
        .await
    }

    /// Ranges of heights whose sampling was accepted.
    async fn get_sampling_ranges(&self) -> Result<BlockRanges> {
        self.read_tx(|tx| {
            let table = tx.open_table(RANGES_TABLE)?;
            get_ranges(&table, ACCEPTED_SAMPING_RANGES_KEY)
        })
        .await
    }

    /// Remove the header at `height` from all tables (ranges, headers, hashes).
    ///
    /// Fails with [`StoreError::NotFound`] when the height is not stored and
    /// with [`StoreError::StoredDataError`] when the tables are inconsistent.
    async fn remove_height(&self, height: u64) -> Result<()> {
        self.write_tx(move |tx| {
            let mut heights_table = tx.open_table(HEIGHTS_TABLE)?;
            let mut headers_table = tx.open_table(HEADERS_TABLE)?;
            let mut ranges_table = tx.open_table(RANGES_TABLE)?;

            let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;

            if !header_ranges.contains(height) {
                return Err(StoreError::NotFound);
            }

            header_ranges
                .remove_relaxed(height..=height)
                .expect("valid range never fails");

            set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?;

            let Some(header) = headers_table.remove(height)? else {
                return Err(StoreError::StoredDataError(format!(
                    "inconsistency between ranges and height_to_hash tables, height {height}"
                )));
            };

            // Decode the removed header to learn its hash, so the reverse
            // mapping can be removed as well.
            let hash = ExtendedHeader::decode(header.value())
                .map_err(|e| StoreError::StoredDataError(e.to_string()))?
                .hash();

            if heights_table.remove(hash.as_bytes())?.is_none() {
                return Err(StoreError::StoredDataError(format!(
                    "inconsistency between header and height_to_hash tables, hash {hash}"
                )));
            }

            Ok(())
        })
        .await
    }
}
462
463#[async_trait]
464impl Store for RedbStore {
465    async fn get_head(&self) -> Result<ExtendedHeader> {
466        self.get_head().await
467    }
468
469    async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
470        self.get_by_hash(hash).await
471    }
472
473    async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
474        self.get_by_height(height).await
475    }
476
477    async fn wait_new_head(&self) -> u64 {
478        let head = self.head_height().await.unwrap_or(0);
479        let mut notifier = pin!(self.inner.header_added_notifier.notified());
480
481        loop {
482            let new_head = self.head_height().await.unwrap_or(0);
483
484            if head != new_head {
485                return new_head;
486            }
487
488            // Await for a notification
489            notifier.as_mut().await;
490
491            // Reset notifier
492            notifier.set(self.inner.header_added_notifier.notified());
493        }
494    }
495
496    async fn wait_height(&self, height: u64) -> Result<()> {
497        let mut notifier = pin!(self.inner.header_added_notifier.notified());
498
499        loop {
500            if self.contains_height(height).await {
501                return Ok(());
502            }
503
504            // Await for a notification
505            notifier.as_mut().await;
506
507            // Reset notifier
508            notifier.set(self.inner.header_added_notifier.notified());
509        }
510    }
511
512    async fn head_height(&self) -> Result<u64> {
513        self.head_height().await
514    }
515
516    async fn has(&self, hash: &Hash) -> bool {
517        self.contains_hash(hash).await
518    }
519
520    async fn has_at(&self, height: u64) -> bool {
521        self.contains_height(height).await
522    }
523
524    async fn insert<R>(&self, headers: R) -> Result<()>
525    where
526        R: TryInto<VerifiedExtendedHeaders> + Send,
527        <R as TryInto<VerifiedExtendedHeaders>>::Error: Display,
528    {
529        self.insert(headers).await
530    }
531
532    async fn update_sampling_metadata(
533        &self,
534        height: u64,
535        status: SamplingStatus,
536        cids: Vec<Cid>,
537    ) -> Result<()> {
538        self.update_sampling_metadata(height, status, cids).await
539    }
540
541    async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
542        self.get_sampling_metadata(height).await
543    }
544
545    async fn get_stored_header_ranges(&self) -> Result<BlockRanges> {
546        Ok(self.get_stored_ranges().await?)
547    }
548
549    async fn get_accepted_sampling_ranges(&self) -> Result<BlockRanges> {
550        self.get_sampling_ranges().await
551    }
552
553    async fn remove_height(&self, height: u64) -> Result<()> {
554        self.remove_height(height).await
555    }
556
557    async fn close(mut self) -> Result<()> {
558        // Wait all ongoing `spawn_blocking` tasks to finish.
559        self.task_counter.wait_guards().await;
560        Ok(())
561    }
562}
563
564fn verify_against_neighbours<R>(
565    headers_table: &R,
566    lowest_header: Option<&ExtendedHeader>,
567    highest_header: Option<&ExtendedHeader>,
568) -> Result<()>
569where
570    R: ReadableTable<u64, &'static [u8]>,
571{
572    if let Some(lowest_header) = lowest_header {
573        let prev = get_header(headers_table, lowest_header.height().value() - 1).map_err(|e| {
574            if let StoreError::NotFound = e {
575                StoreError::StoredDataError("inconsistency between headers and ranges table".into())
576            } else {
577                e
578            }
579        })?;
580
581        prev.verify(lowest_header)
582            .map_err(|e| StoreInsertionError::NeighborsVerificationFailed(e.to_string()))?;
583    }
584
585    if let Some(highest_header) = highest_header {
586        let next = get_header(headers_table, highest_header.height().value() + 1).map_err(|e| {
587            if let StoreError::NotFound = e {
588                StoreError::StoredDataError("inconsistency between headers and ranges table".into())
589            } else {
590                e
591            }
592        })?;
593
594        highest_header
595            .verify(&next)
596            .map_err(|e| StoreInsertionError::NeighborsVerificationFailed(e.to_string()))?;
597    }
598
599    Ok(())
600}
601
602fn get_ranges<R>(ranges_table: &R, name: &str) -> Result<BlockRanges>
603where
604    R: ReadableTable<&'static str, Vec<(u64, u64)>>,
605{
606    let raw_ranges = ranges_table
607        .get(name)?
608        .map(|guard| {
609            guard
610                .value()
611                .iter()
612                .map(|(start, end)| *start..=*end)
613                .collect()
614        })
615        .unwrap_or_default();
616
617    BlockRanges::from_vec(raw_ranges).map_err(|e| {
618        let s = format!("Stored BlockRanges for {name} are invalid: {e}");
619        StoreError::StoredDataError(s)
620    })
621}
622
623fn set_ranges(
624    ranges_table: &mut Table<&str, Vec<(u64, u64)>>,
625    name: &str,
626    ranges: &BlockRanges,
627) -> Result<()> {
628    let raw_ranges: &[RangeInclusive<u64>] = ranges.as_ref();
629    let raw_ranges = raw_ranges
630        .iter()
631        .map(|range| (*range.start(), *range.end()))
632        .collect::<Vec<_>>();
633
634    ranges_table.insert(name, raw_ranges)?;
635
636    Ok(())
637}
638
639#[inline]
640fn get_height<R>(heights_table: &R, key: &[u8]) -> Result<u64>
641where
642    R: ReadableTable<&'static [u8], u64>,
643{
644    heights_table
645        .get(key)?
646        .map(|guard| guard.value())
647        .ok_or(StoreError::NotFound)
648}
649
650#[inline]
651fn get_header<R>(headers_table: &R, key: u64) -> Result<ExtendedHeader>
652where
653    R: ReadableTable<u64, &'static [u8]>,
654{
655    let serialized = headers_table.get(key)?.ok_or(StoreError::NotFound)?;
656    deserialize_extended_header(serialized.value())
657}
658
659#[inline]
660fn get_sampling_metadata<R>(
661    sampling_metadata_table: &R,
662    key: u64,
663) -> Result<Option<SamplingMetadata>>
664where
665    R: ReadableTable<u64, &'static [u8]>,
666{
667    sampling_metadata_table
668        .get(key)?
669        .map(|guard| deserialize_sampling_metadata(guard.value()))
670        .transpose()
671}
672
673impl From<TransactionError> for StoreError {
674    fn from(e: TransactionError) -> Self {
675        match e {
676            TransactionError::ReadTransactionStillInUse(_) => {
677                unreachable!("redb::ReadTransaction::close is never used")
678            }
679            e => StoreError::FatalDatabaseError(format!("TransactionError: {e}")),
680        }
681    }
682}
683
impl From<TableError> for StoreError {
    fn from(e: TableError) -> Self {
        match e {
            // Storage failures have their own dedicated conversion.
            TableError::Storage(e) => e.into(),
            // Both of these indicate a programming error in this module
            // (tables are opened once per transaction and pre-created in
            // `RedbStore::new`), hence the panics.
            TableError::TableAlreadyOpen(table, location) => {
                panic!("Table {table} already opened from: {location}")
            }
            TableError::TableDoesNotExist(table) => {
                panic!("Table {table} was not created on initialization")
            }
            e => StoreError::StoredDataError(format!("TableError: {e}")),
        }
    }
}
698
699impl From<StorageError> for StoreError {
700    fn from(e: StorageError) -> Self {
701        match e {
702            StorageError::ValueTooLarge(_) => {
703                unreachable!("redb::Table::insert_reserve is never used")
704            }
705            e => StoreError::FatalDatabaseError(format!("StorageError: {e}")),
706        }
707    }
708}
709
710impl From<CommitError> for StoreError {
711    fn from(e: CommitError) -> Self {
712        StoreError::FatalDatabaseError(format!("CommitError: {e}"))
713    }
714}
715
/// Migrate the database layout from schema v1 to v2.
///
/// v1 kept header ranges in a dedicated `STORE.HEIGHT_RANGES` table; v2
/// consolidates all ranges into [`RANGES_TABLE`] under string keys. This is
/// a no-op when the stored schema version is anything other than 1.
fn migrate_v1_to_v2(
    tx: &WriteTransaction,
    schema_version_table: &mut Table<(), u64>,
) -> Result<()> {
    // The v1-only table that gets dropped by this migration.
    const HEADER_HEIGHT_RANGES: TableDefinition<'static, u64, (u64, u64)> =
        TableDefinition::new("STORE.HEIGHT_RANGES");

    let schema_version = schema_version_table.get(())?.map(|guard| guard.value());

    // We only migrate from v1
    if schema_version != Some(1) {
        return Ok(());
    }

    warn!("Migrating DB schema from v1 to v2");

    let header_ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?;
    let mut ranges_table = tx.open_table(RANGES_TABLE)?;

    // Collect the old `(start, end)` pairs; redb iterates in key order.
    let raw_ranges = header_ranges_table
        .iter()?
        .map(|range_guard| {
            let range = range_guard?.1.value();
            Ok((range.0, range.1))
        })
        .collect::<Result<Vec<_>>>()?;

    tx.delete_table(header_ranges_table)?;
    ranges_table.insert(HEADER_RANGES_KEY, raw_ranges)?;

    // Migrated to v2
    schema_version_table.insert((), 2)?;

    Ok(())
}
751
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::test_utils::ExtendedHeaderGeneratorExt;
    use celestia_types::test_utils::ExtendedHeaderGenerator;
    use std::path::Path;
    use tempfile::TempDir;

    /// Data written by one store instance must be readable after reopening
    /// the same database file, including data appended across reopens.
    #[tokio::test]
    async fn test_store_persistence() {
        let db_dir = TempDir::with_prefix("lumina.store.test").unwrap();
        let db = db_dir.path().join("db");

        let (original_store, mut gen) = gen_filled_store(0, Some(&db)).await;
        let mut original_headers = gen.next_many(20);

        original_store
            .insert(original_headers.clone())
            .await
            .expect("inserting test data failed");
        drop(original_store);

        let reopened_store = create_store(Some(&db)).await;

        assert_eq!(
            original_headers.last().unwrap().height().value(),
            reopened_store.head_height().await.unwrap()
        );
        for original_header in &original_headers {
            let stored_header = reopened_store
                .get_by_height(original_header.height().value())
                .await
                .unwrap();
            assert_eq!(original_header, &stored_header);
        }

        // Appending more headers after a reopen must also persist.
        let mut new_headers = gen.next_many(10);
        reopened_store
            .insert(new_headers.clone())
            .await
            .expect("failed to insert data");
        drop(reopened_store);

        original_headers.append(&mut new_headers);

        let reopened_store = create_store(Some(&db)).await;
        assert_eq!(
            original_headers.last().unwrap().height().value(),
            reopened_store.head_height().await.unwrap()
        );
        for original_header in &original_headers {
            let stored_header = reopened_store
                .get_by_height(original_header.height().value())
                .await
                .unwrap();
            assert_eq!(original_header, &stored_header);
        }
    }

    /// Two independent stores must not share state: after forking the chain,
    /// each store follows its own branch.
    #[tokio::test]
    async fn test_separate_stores() {
        let (store0, mut gen0) = gen_filled_store(0, None).await;
        let store1 = create_store(None).await;

        let headers = gen0.next_many(10);
        store0.insert(headers.clone()).await.unwrap();
        store1.insert(headers).await.unwrap();

        let mut gen1 = gen0.fork();

        store0.insert(gen0.next_many_verified(5)).await.unwrap();
        store1.insert(gen1.next_many_verified(6)).await.unwrap();

        // Shared prefix is identical, diverged heights differ.
        assert_eq!(
            store0.get_by_height(10).await.unwrap(),
            store1.get_by_height(10).await.unwrap()
        );
        assert_ne!(
            store0.get_by_height(11).await.unwrap(),
            store1.get_by_height(11).await.unwrap()
        );

        assert_eq!(store0.head_height().await.unwrap(), 15);
        assert_eq!(store1.head_height().await.unwrap(), 16);
    }

    /// Create a persistent store at `path`, or an in-memory one when `None`.
    pub async fn create_store(path: Option<&Path>) -> RedbStore {
        match path {
            Some(path) => RedbStore::open(path).await.unwrap(),
            None => RedbStore::in_memory().await.unwrap(),
        }
    }

    /// Create a store pre-filled with `amount` generated headers, returning
    /// the store together with the generator for producing follow-up headers.
    pub async fn gen_filled_store(
        amount: u64,
        path: Option<&Path>,
    ) -> (RedbStore, ExtendedHeaderGenerator) {
        let s = create_store(path).await;
        let mut gen = ExtendedHeaderGenerator::new();
        let headers = gen.next_many(amount);

        s.insert(headers).await.expect("inserting test data failed");

        (s, gen)
    }
}