1use std::fmt::Display;
2use std::ops::RangeInclusive;
3use std::path::Path;
4use std::pin::pin;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use celestia_types::hash::Hash;
9use celestia_types::ExtendedHeader;
10use cid::Cid;
11use redb::{
12 CommitError, Database, ReadTransaction, ReadableTable, StorageError, Table, TableDefinition,
13 TableError, TransactionError, WriteTransaction,
14};
15use tendermint_proto::Protobuf;
16use tokio::sync::Notify;
17use tokio::task::spawn_blocking;
18use tracing::warn;
19use tracing::{debug, trace};
20
21use crate::block_ranges::BlockRanges;
22use crate::store::utils::VerifiedExtendedHeaders;
23use crate::store::{
24 Result, SamplingMetadata, SamplingStatus, Store, StoreError, StoreInsertionError,
25};
26use crate::utils::Counter;
27
28use super::utils::{deserialize_extended_header, deserialize_sampling_metadata};
29
30const SCHEMA_VERSION: u64 = 2;
31
32const HEIGHTS_TABLE: TableDefinition<'static, &[u8], u64> = TableDefinition::new("STORE.HEIGHTS");
33const HEADERS_TABLE: TableDefinition<'static, u64, &[u8]> = TableDefinition::new("STORE.HEADERS");
34const SAMPLING_METADATA_TABLE: TableDefinition<'static, u64, &[u8]> =
35 TableDefinition::new("STORE.SAMPLING_METADATA");
36const SCHEMA_VERSION_TABLE: TableDefinition<'static, (), u64> =
37 TableDefinition::new("STORE.SCHEMA_VERSION");
38const RANGES_TABLE: TableDefinition<'static, &str, Vec<(u64, u64)>> =
39 TableDefinition::new("STORE.RANGES");
40
41const ACCEPTED_SAMPING_RANGES_KEY: &str = "KEY.ACCEPTED_SAMPING_RANGES";
42const HEADER_RANGES_KEY: &str = "KEY.HEADER_RANGES";
43
44#[derive(Debug)]
46pub struct RedbStore {
47 inner: Arc<Inner>,
48 task_counter: Counter,
49}
50
51#[derive(Debug)]
52struct Inner {
53 db: Arc<Database>,
55 header_added_notifier: Notify,
57}
58
59impl RedbStore {
60 pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
62 let path = path.as_ref().to_owned();
63
64 let db = spawn_blocking(|| Database::create(path))
65 .await?
66 .map_err(|e| StoreError::OpenFailed(e.to_string()))?;
67
68 RedbStore::new(Arc::new(db)).await
69 }
70
71 pub async fn in_memory() -> Result<Self> {
73 let db = Database::builder()
74 .create_with_backend(redb::backends::InMemoryBackend::new())
75 .map_err(|e| StoreError::OpenFailed(e.to_string()))?;
76
77 RedbStore::new(Arc::new(db)).await
78 }
79
80 pub async fn new(db: Arc<Database>) -> Result<Self> {
82 let store = RedbStore {
83 inner: Arc::new(Inner {
84 db,
85 header_added_notifier: Notify::new(),
86 }),
87 task_counter: Counter::new(),
88 };
89
90 store
91 .write_tx(|tx| {
92 let mut schema_version_table = tx.open_table(SCHEMA_VERSION_TABLE)?;
93 let schema_version = schema_version_table.get(())?.map(|guard| guard.value());
94
95 match schema_version {
96 Some(schema_version) => {
97 if schema_version > SCHEMA_VERSION {
98 let e = format!(
99 "Incompatible database schema; found {}, expected {}.",
100 schema_version, SCHEMA_VERSION
101 );
102 return Err(StoreError::OpenFailed(e));
103 }
104
105 migrate_v1_to_v2(tx, &mut schema_version_table)?;
107 }
108 None => {
109 schema_version_table.insert((), SCHEMA_VERSION)?;
111 }
112 }
113
114 debug_assert_eq!(
116 schema_version_table.get(())?.map(|guard| guard.value()),
117 Some(SCHEMA_VERSION),
118 "Some migrations are missing"
119 );
120
121 let _heights_table = tx.open_table(HEIGHTS_TABLE)?;
123 let _headers_table = tx.open_table(HEADERS_TABLE)?;
124 let _ranges_table = tx.open_table(RANGES_TABLE)?;
125 let _sampling_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
126
127 Ok(())
128 })
129 .await
130 .map_err(|e| match e {
131 e @ StoreError::OpenFailed(_) => e,
132 e => StoreError::OpenFailed(e.to_string()),
133 })?;
134
135 Ok(store)
136 }
137
138 pub fn raw_db(&self) -> Arc<Database> {
143 self.inner.db.clone()
144 }
145
146 async fn read_tx<F, T>(&self, f: F) -> Result<T>
148 where
149 F: FnOnce(&mut ReadTransaction) -> Result<T> + Send + 'static,
150 T: Send + 'static,
151 {
152 let inner = self.inner.clone();
153 let guard = self.task_counter.guard();
154
155 spawn_blocking(move || {
156 let _guard = guard;
157
158 {
159 let mut tx = inner.db.begin_read()?;
160 f(&mut tx)
161 }
162 })
163 .await?
164 }
165
166 async fn write_tx<F, T>(&self, f: F) -> Result<T>
170 where
171 F: FnOnce(&mut WriteTransaction) -> Result<T> + Send + 'static,
172 T: Send + 'static,
173 {
174 let inner = self.inner.clone();
175 let guard = self.task_counter.guard();
176
177 spawn_blocking(move || {
178 let _guard = guard;
179
180 {
181 let mut tx = inner.db.begin_write()?;
182 let res = f(&mut tx);
183
184 if res.is_ok() {
185 tx.commit()?;
186 } else {
187 tx.abort()?;
188 }
189
190 res
191 }
192 })
193 .await?
194 }
195
196 async fn head_height(&self) -> Result<u64> {
197 self.read_tx(|tx| {
198 let table = tx.open_table(RANGES_TABLE)?;
199 let header_ranges = get_ranges(&table, HEADER_RANGES_KEY)?;
200
201 header_ranges.head().ok_or(StoreError::NotFound)
202 })
203 .await
204 }
205
206 async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
207 let hash = *hash;
208
209 self.read_tx(move |tx| {
210 let heights_table = tx.open_table(HEIGHTS_TABLE)?;
211 let headers_table = tx.open_table(HEADERS_TABLE)?;
212
213 let height = get_height(&heights_table, hash.as_bytes())?;
214 get_header(&headers_table, height)
215 })
216 .await
217 }
218
219 async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
220 self.read_tx(move |tx| {
221 let table = tx.open_table(HEADERS_TABLE)?;
222 get_header(&table, height)
223 })
224 .await
225 }
226
227 async fn get_head(&self) -> Result<ExtendedHeader> {
228 self.read_tx(|tx| {
229 let ranges_table = tx.open_table(RANGES_TABLE)?;
230 let headers_table = tx.open_table(HEADERS_TABLE)?;
231
232 let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
233 let head = header_ranges.head().ok_or(StoreError::NotFound)?;
234
235 get_header(&headers_table, head)
236 })
237 .await
238 }
239
240 async fn contains_hash(&self, hash: &Hash) -> bool {
241 let hash = *hash;
242
243 self.read_tx(move |tx| {
244 let heights_table = tx.open_table(HEIGHTS_TABLE)?;
245 let headers_table = tx.open_table(HEADERS_TABLE)?;
246
247 let height = get_height(&heights_table, hash.as_bytes())?;
248 Ok(headers_table.get(height)?.is_some())
249 })
250 .await
251 .unwrap_or(false)
252 }
253
254 async fn contains_height(&self, height: u64) -> bool {
255 self.read_tx(move |tx| {
256 let headers_table = tx.open_table(HEADERS_TABLE)?;
257 Ok(headers_table.get(height)?.is_some())
258 })
259 .await
260 .unwrap_or(false)
261 }
262
263 async fn insert<R>(&self, headers: R) -> Result<()>
264 where
265 R: TryInto<VerifiedExtendedHeaders> + Send,
266 <R as TryInto<VerifiedExtendedHeaders>>::Error: Display,
267 {
268 let headers = headers
269 .try_into()
270 .map_err(|e| StoreInsertionError::HeadersVerificationFailed(e.to_string()))?;
271
272 self.write_tx(move |tx| {
273 let (Some(head), Some(tail)) = (headers.as_ref().first(), headers.as_ref().last())
274 else {
275 return Ok(());
276 };
277
278 let mut heights_table = tx.open_table(HEIGHTS_TABLE)?;
279 let mut headers_table = tx.open_table(HEADERS_TABLE)?;
280 let mut ranges_table = tx.open_table(RANGES_TABLE)?;
281
282 let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
283 let headers_range = head.height().value()..=tail.height().value();
284
285 let (prev_exists, next_exists) = header_ranges
286 .check_insertion_constraints(&headers_range)
287 .map_err(StoreInsertionError::ContraintsNotMet)?;
288
289 verify_against_neighbours(
290 &headers_table,
291 prev_exists.then_some(head),
292 next_exists.then_some(tail),
293 )?;
294
295 for header in headers {
296 let height = header.height().value();
297 let hash = header.hash();
298 let serialized_header = header.encode_vec();
299
300 if headers_table
301 .insert(height, &serialized_header[..])?
302 .is_some()
303 {
304 return Err(StoreError::StoredDataError(
305 "inconsistency between headers table and ranges table".into(),
306 ));
307 }
308
309 if heights_table.insert(hash.as_bytes(), height)?.is_some() {
310 return Err(StoreInsertionError::HashExists(hash).into());
313 }
314
315 trace!("Inserted header {hash} with height {height}");
316 }
317
318 header_ranges
319 .insert_relaxed(&headers_range)
320 .expect("invalid range");
321 set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?;
322
323 debug!("Inserted header range {headers_range:?}",);
324
325 Ok(())
326 })
327 .await?;
328
329 self.inner.header_added_notifier.notify_waiters();
330
331 Ok(())
332 }
333
334 async fn update_sampling_metadata(
335 &self,
336 height: u64,
337 status: SamplingStatus,
338 cids: Vec<Cid>,
339 ) -> Result<()> {
340 self.write_tx(move |tx| {
341 let mut sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
342 let mut ranges_table = tx.open_table(RANGES_TABLE)?;
343
344 let header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
345 let mut sampling_ranges = get_ranges(&ranges_table, ACCEPTED_SAMPING_RANGES_KEY)?;
346
347 if !header_ranges.contains(height) {
348 return Err(StoreError::NotFound);
349 }
350
351 let previous = get_sampling_metadata(&sampling_metadata_table, height)?;
352
353 let entry = match previous {
354 Some(mut previous) => {
355 previous.status = status;
356
357 for cid in cids {
358 if !previous.cids.contains(&cid) {
359 previous.cids.push(cid);
360 }
361 }
362
363 previous
364 }
365 None => SamplingMetadata { status, cids },
366 };
367
368 let serialized = entry.encode_vec();
370
371 sampling_metadata_table.insert(height, &serialized[..])?;
372
373 match status {
374 SamplingStatus::Accepted => sampling_ranges
375 .insert_relaxed(height..=height)
376 .expect("invalid height"),
377 _ => sampling_ranges
378 .remove_relaxed(height..=height)
379 .expect("invalid height"),
380 }
381
382 set_ranges(
383 &mut ranges_table,
384 ACCEPTED_SAMPING_RANGES_KEY,
385 &sampling_ranges,
386 )?;
387
388 Ok(())
389 })
390 .await
391 }
392
393 async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
394 self.read_tx(move |tx| {
395 let headers_table = tx.open_table(HEADERS_TABLE)?;
396 let sampling_metadata_table = tx.open_table(SAMPLING_METADATA_TABLE)?;
397
398 if headers_table.get(height)?.is_none() {
399 return Err(StoreError::NotFound);
400 }
401
402 get_sampling_metadata(&sampling_metadata_table, height)
403 })
404 .await
405 }
406
407 async fn get_stored_ranges(&self) -> Result<BlockRanges> {
408 self.read_tx(|tx| {
409 let table = tx.open_table(RANGES_TABLE)?;
410 get_ranges(&table, HEADER_RANGES_KEY)
411 })
412 .await
413 }
414
415 async fn get_sampling_ranges(&self) -> Result<BlockRanges> {
416 self.read_tx(|tx| {
417 let table = tx.open_table(RANGES_TABLE)?;
418 get_ranges(&table, ACCEPTED_SAMPING_RANGES_KEY)
419 })
420 .await
421 }
422
423 async fn remove_height(&self, height: u64) -> Result<()> {
424 self.write_tx(move |tx| {
425 let mut heights_table = tx.open_table(HEIGHTS_TABLE)?;
426 let mut headers_table = tx.open_table(HEADERS_TABLE)?;
427 let mut ranges_table = tx.open_table(RANGES_TABLE)?;
428
429 let mut header_ranges = get_ranges(&ranges_table, HEADER_RANGES_KEY)?;
430
431 if !header_ranges.contains(height) {
432 return Err(StoreError::NotFound);
433 }
434
435 header_ranges
436 .remove_relaxed(height..=height)
437 .expect("valid range never fails");
438
439 set_ranges(&mut ranges_table, HEADER_RANGES_KEY, &header_ranges)?;
440
441 let Some(header) = headers_table.remove(height)? else {
442 return Err(StoreError::StoredDataError(format!(
443 "inconsistency between ranges and height_to_hash tables, height {height}"
444 )));
445 };
446
447 let hash = ExtendedHeader::decode(header.value())
448 .map_err(|e| StoreError::StoredDataError(e.to_string()))?
449 .hash();
450
451 if heights_table.remove(hash.as_bytes())?.is_none() {
452 return Err(StoreError::StoredDataError(format!(
453 "inconsistency between header and height_to_hash tables, hash {hash}"
454 )));
455 }
456
457 Ok(())
458 })
459 .await
460 }
461}
462
463#[async_trait]
464impl Store for RedbStore {
465 async fn get_head(&self) -> Result<ExtendedHeader> {
466 self.get_head().await
467 }
468
469 async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
470 self.get_by_hash(hash).await
471 }
472
473 async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
474 self.get_by_height(height).await
475 }
476
477 async fn wait_new_head(&self) -> u64 {
478 let head = self.head_height().await.unwrap_or(0);
479 let mut notifier = pin!(self.inner.header_added_notifier.notified());
480
481 loop {
482 let new_head = self.head_height().await.unwrap_or(0);
483
484 if head != new_head {
485 return new_head;
486 }
487
488 notifier.as_mut().await;
490
491 notifier.set(self.inner.header_added_notifier.notified());
493 }
494 }
495
496 async fn wait_height(&self, height: u64) -> Result<()> {
497 let mut notifier = pin!(self.inner.header_added_notifier.notified());
498
499 loop {
500 if self.contains_height(height).await {
501 return Ok(());
502 }
503
504 notifier.as_mut().await;
506
507 notifier.set(self.inner.header_added_notifier.notified());
509 }
510 }
511
512 async fn head_height(&self) -> Result<u64> {
513 self.head_height().await
514 }
515
516 async fn has(&self, hash: &Hash) -> bool {
517 self.contains_hash(hash).await
518 }
519
520 async fn has_at(&self, height: u64) -> bool {
521 self.contains_height(height).await
522 }
523
524 async fn insert<R>(&self, headers: R) -> Result<()>
525 where
526 R: TryInto<VerifiedExtendedHeaders> + Send,
527 <R as TryInto<VerifiedExtendedHeaders>>::Error: Display,
528 {
529 self.insert(headers).await
530 }
531
532 async fn update_sampling_metadata(
533 &self,
534 height: u64,
535 status: SamplingStatus,
536 cids: Vec<Cid>,
537 ) -> Result<()> {
538 self.update_sampling_metadata(height, status, cids).await
539 }
540
541 async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
542 self.get_sampling_metadata(height).await
543 }
544
545 async fn get_stored_header_ranges(&self) -> Result<BlockRanges> {
546 Ok(self.get_stored_ranges().await?)
547 }
548
549 async fn get_accepted_sampling_ranges(&self) -> Result<BlockRanges> {
550 self.get_sampling_ranges().await
551 }
552
553 async fn remove_height(&self, height: u64) -> Result<()> {
554 self.remove_height(height).await
555 }
556
557 async fn close(mut self) -> Result<()> {
558 self.task_counter.wait_guards().await;
560 Ok(())
561 }
562}
563
564fn verify_against_neighbours<R>(
565 headers_table: &R,
566 lowest_header: Option<&ExtendedHeader>,
567 highest_header: Option<&ExtendedHeader>,
568) -> Result<()>
569where
570 R: ReadableTable<u64, &'static [u8]>,
571{
572 if let Some(lowest_header) = lowest_header {
573 let prev = get_header(headers_table, lowest_header.height().value() - 1).map_err(|e| {
574 if let StoreError::NotFound = e {
575 StoreError::StoredDataError("inconsistency between headers and ranges table".into())
576 } else {
577 e
578 }
579 })?;
580
581 prev.verify(lowest_header)
582 .map_err(|e| StoreInsertionError::NeighborsVerificationFailed(e.to_string()))?;
583 }
584
585 if let Some(highest_header) = highest_header {
586 let next = get_header(headers_table, highest_header.height().value() + 1).map_err(|e| {
587 if let StoreError::NotFound = e {
588 StoreError::StoredDataError("inconsistency between headers and ranges table".into())
589 } else {
590 e
591 }
592 })?;
593
594 highest_header
595 .verify(&next)
596 .map_err(|e| StoreInsertionError::NeighborsVerificationFailed(e.to_string()))?;
597 }
598
599 Ok(())
600}
601
602fn get_ranges<R>(ranges_table: &R, name: &str) -> Result<BlockRanges>
603where
604 R: ReadableTable<&'static str, Vec<(u64, u64)>>,
605{
606 let raw_ranges = ranges_table
607 .get(name)?
608 .map(|guard| {
609 guard
610 .value()
611 .iter()
612 .map(|(start, end)| *start..=*end)
613 .collect()
614 })
615 .unwrap_or_default();
616
617 BlockRanges::from_vec(raw_ranges).map_err(|e| {
618 let s = format!("Stored BlockRanges for {name} are invalid: {e}");
619 StoreError::StoredDataError(s)
620 })
621}
622
623fn set_ranges(
624 ranges_table: &mut Table<&str, Vec<(u64, u64)>>,
625 name: &str,
626 ranges: &BlockRanges,
627) -> Result<()> {
628 let raw_ranges: &[RangeInclusive<u64>] = ranges.as_ref();
629 let raw_ranges = raw_ranges
630 .iter()
631 .map(|range| (*range.start(), *range.end()))
632 .collect::<Vec<_>>();
633
634 ranges_table.insert(name, raw_ranges)?;
635
636 Ok(())
637}
638
639#[inline]
640fn get_height<R>(heights_table: &R, key: &[u8]) -> Result<u64>
641where
642 R: ReadableTable<&'static [u8], u64>,
643{
644 heights_table
645 .get(key)?
646 .map(|guard| guard.value())
647 .ok_or(StoreError::NotFound)
648}
649
650#[inline]
651fn get_header<R>(headers_table: &R, key: u64) -> Result<ExtendedHeader>
652where
653 R: ReadableTable<u64, &'static [u8]>,
654{
655 let serialized = headers_table.get(key)?.ok_or(StoreError::NotFound)?;
656 deserialize_extended_header(serialized.value())
657}
658
659#[inline]
660fn get_sampling_metadata<R>(
661 sampling_metadata_table: &R,
662 key: u64,
663) -> Result<Option<SamplingMetadata>>
664where
665 R: ReadableTable<u64, &'static [u8]>,
666{
667 sampling_metadata_table
668 .get(key)?
669 .map(|guard| deserialize_sampling_metadata(guard.value()))
670 .transpose()
671}
672
673impl From<TransactionError> for StoreError {
674 fn from(e: TransactionError) -> Self {
675 match e {
676 TransactionError::ReadTransactionStillInUse(_) => {
677 unreachable!("redb::ReadTransaction::close is never used")
678 }
679 e => StoreError::FatalDatabaseError(format!("TransactionError: {e}")),
680 }
681 }
682}
683
684impl From<TableError> for StoreError {
685 fn from(e: TableError) -> Self {
686 match e {
687 TableError::Storage(e) => e.into(),
688 TableError::TableAlreadyOpen(table, location) => {
689 panic!("Table {table} already opened from: {location}")
690 }
691 TableError::TableDoesNotExist(table) => {
692 panic!("Table {table} was not created on initialization")
693 }
694 e => StoreError::StoredDataError(format!("TableError: {e}")),
695 }
696 }
697}
698
699impl From<StorageError> for StoreError {
700 fn from(e: StorageError) -> Self {
701 match e {
702 StorageError::ValueTooLarge(_) => {
703 unreachable!("redb::Table::insert_reserve is never used")
704 }
705 e => StoreError::FatalDatabaseError(format!("StorageError: {e}")),
706 }
707 }
708}
709
710impl From<CommitError> for StoreError {
711 fn from(e: CommitError) -> Self {
712 StoreError::FatalDatabaseError(format!("CommitError: {e}"))
713 }
714}
715
716fn migrate_v1_to_v2(
717 tx: &WriteTransaction,
718 schema_version_table: &mut Table<(), u64>,
719) -> Result<()> {
720 const HEADER_HEIGHT_RANGES: TableDefinition<'static, u64, (u64, u64)> =
721 TableDefinition::new("STORE.HEIGHT_RANGES");
722
723 let schema_version = schema_version_table.get(())?.map(|guard| guard.value());
724
725 if schema_version != Some(1) {
727 return Ok(());
728 }
729
730 warn!("Migrating DB schema from v1 to v2");
731
732 let header_ranges_table = tx.open_table(HEADER_HEIGHT_RANGES)?;
733 let mut ranges_table = tx.open_table(RANGES_TABLE)?;
734
735 let raw_ranges = header_ranges_table
736 .iter()?
737 .map(|range_guard| {
738 let range = range_guard?.1.value();
739 Ok((range.0, range.1))
740 })
741 .collect::<Result<Vec<_>>>()?;
742
743 tx.delete_table(header_ranges_table)?;
744 ranges_table.insert(HEADER_RANGES_KEY, raw_ranges)?;
745
746 schema_version_table.insert((), 2)?;
748
749 Ok(())
750}
751
752#[cfg(test)]
753pub mod tests {
754 use super::*;
755 use crate::test_utils::ExtendedHeaderGeneratorExt;
756 use celestia_types::test_utils::ExtendedHeaderGenerator;
757 use std::path::Path;
758 use tempfile::TempDir;
759
760 #[tokio::test]
761 async fn test_store_persistence() {
762 let db_dir = TempDir::with_prefix("lumina.store.test").unwrap();
763 let db = db_dir.path().join("db");
764
765 let (original_store, mut gen) = gen_filled_store(0, Some(&db)).await;
766 let mut original_headers = gen.next_many(20);
767
768 original_store
769 .insert(original_headers.clone())
770 .await
771 .expect("inserting test data failed");
772 drop(original_store);
773
774 let reopened_store = create_store(Some(&db)).await;
775
776 assert_eq!(
777 original_headers.last().unwrap().height().value(),
778 reopened_store.head_height().await.unwrap()
779 );
780 for original_header in &original_headers {
781 let stored_header = reopened_store
782 .get_by_height(original_header.height().value())
783 .await
784 .unwrap();
785 assert_eq!(original_header, &stored_header);
786 }
787
788 let mut new_headers = gen.next_many(10);
789 reopened_store
790 .insert(new_headers.clone())
791 .await
792 .expect("failed to insert data");
793 drop(reopened_store);
794
795 original_headers.append(&mut new_headers);
796
797 let reopened_store = create_store(Some(&db)).await;
798 assert_eq!(
799 original_headers.last().unwrap().height().value(),
800 reopened_store.head_height().await.unwrap()
801 );
802 for original_header in &original_headers {
803 let stored_header = reopened_store
804 .get_by_height(original_header.height().value())
805 .await
806 .unwrap();
807 assert_eq!(original_header, &stored_header);
808 }
809 }
810
811 #[tokio::test]
812 async fn test_separate_stores() {
813 let (store0, mut gen0) = gen_filled_store(0, None).await;
814 let store1 = create_store(None).await;
815
816 let headers = gen0.next_many(10);
817 store0.insert(headers.clone()).await.unwrap();
818 store1.insert(headers).await.unwrap();
819
820 let mut gen1 = gen0.fork();
821
822 store0.insert(gen0.next_many_verified(5)).await.unwrap();
823 store1.insert(gen1.next_many_verified(6)).await.unwrap();
824
825 assert_eq!(
826 store0.get_by_height(10).await.unwrap(),
827 store1.get_by_height(10).await.unwrap()
828 );
829 assert_ne!(
830 store0.get_by_height(11).await.unwrap(),
831 store1.get_by_height(11).await.unwrap()
832 );
833
834 assert_eq!(store0.head_height().await.unwrap(), 15);
835 assert_eq!(store1.head_height().await.unwrap(), 16);
836 }
837
838 pub async fn create_store(path: Option<&Path>) -> RedbStore {
839 match path {
840 Some(path) => RedbStore::open(path).await.unwrap(),
841 None => RedbStore::in_memory().await.unwrap(),
842 }
843 }
844
845 pub async fn gen_filled_store(
846 amount: u64,
847 path: Option<&Path>,
848 ) -> (RedbStore, ExtendedHeaderGenerator) {
849 let s = create_store(path).await;
850 let mut gen = ExtendedHeaderGenerator::new();
851 let headers = gen.next_many(amount);
852
853 s.insert(headers).await.expect("inserting test data failed");
854
855 (s, gen)
856 }
857}