lumina_node/
store.rs

1//! Primitives related to the [`ExtendedHeader`] storage.
2
3use std::convert::Infallible;
4use std::fmt::{Debug, Display};
5use std::io::Cursor;
6use std::ops::{Bound, RangeBounds, RangeInclusive};
7
8use async_trait::async_trait;
9use celestia_types::hash::Hash;
10use celestia_types::ExtendedHeader;
11use cid::Cid;
12use prost::Message;
13use serde::{Deserialize, Serialize};
14use tendermint_proto::Protobuf;
15use thiserror::Error;
16#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
17use wasm_bindgen::prelude::*;
18
19pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError};
20pub use crate::store::either_store::EitherStore;
21pub use crate::store::utils::VerifiedExtendedHeaders;
22
23pub use in_memory_store::InMemoryStore;
24#[cfg(target_arch = "wasm32")]
25pub use indexed_db_store::IndexedDbStore;
26#[cfg(not(target_arch = "wasm32"))]
27pub use redb_store::RedbStore;
28
29mod either_store;
30mod in_memory_store;
31#[cfg(target_arch = "wasm32")]
32mod indexed_db_store;
33#[cfg(not(target_arch = "wasm32"))]
34mod redb_store;
35
36pub(crate) mod utils;
37
38/// Sampling metadata for a block.
39///
40/// This struct persists DAS-ing information in a header store for future reference.
41#[derive(Debug, Default, Clone, Serialize, Deserialize)]
42#[cfg_attr(all(feature = "wasm-bindgen", target_arch = "wasm32"), wasm_bindgen)]
43pub struct SamplingMetadata {
44    /// Indicates whether this node was able to successfuly sample the block
45    pub status: SamplingStatus,
46
47    /// List of CIDs used while sampling. Can be used to remove associated data
48    /// from Blockstore, when cleaning up the old ExtendedHeaders
49    #[cfg_attr(
50        all(feature = "wasm-bindgen", target_arch = "wasm32"),
51        wasm_bindgen(skip)
52    )]
53    pub cids: Vec<Cid>,
54}
55
56/// Sampling status for a block.
57#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
58#[cfg_attr(all(feature = "wasm-bindgen", target_arch = "wasm32"), wasm_bindgen)]
59pub enum SamplingStatus {
60    /// Sampling is not done.
61    #[default]
62    Unknown,
63    /// Sampling is done and block is accepted.
64    Accepted,
65    /// Sampling is done and block is rejected.
66    Rejected,
67}
68
69type Result<T, E = StoreError> = std::result::Result<T, E>;
70
71/// An asynchronous [`ExtendedHeader`] storage.
72///
73/// Currently it is required that all the headers are inserted to the storage
74/// in order, starting from the genesis.
75#[async_trait]
76pub trait Store: Send + Sync + Debug {
77    /// Returns the [`ExtendedHeader`] with the highest height.
78    async fn get_head(&self) -> Result<ExtendedHeader>;
79
80    /// Returns the header of a specific hash.
81    async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader>;
82
83    /// Returns the header of a specific height.
84    async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader>;
85
86    /// Returns when new head is available in the `Store`.
87    async fn wait_new_head(&self) -> u64;
88
89    /// Returns when `height` is available in the `Store`.
90    async fn wait_height(&self, height: u64) -> Result<()>;
91
92    /// Returns the headers from the given heights range.
93    ///
94    /// If start of the range is unbounded, the first returned header will be of height 1.
95    /// If end of the range is unbounded, the last returned header will be the last header in the
96    /// store.
97    ///
98    /// # Errors
99    ///
100    /// If range contains a height of a header that is not found in the store or [`RangeBounds`]
101    /// cannot be converted to a valid range.
102    async fn get_range<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
103    where
104        R: RangeBounds<u64> + Send,
105    {
106        let head_height = self.head_height().await?;
107        let range = to_headers_range(range, head_height)?;
108
109        let amount = if range.is_empty() {
110            0
111        } else {
112            range.end() - range.start() + 1 // add one as it's inclusive
113        };
114
115        let mut headers = Vec::with_capacity(amount.try_into().unwrap_or(usize::MAX));
116
117        for height in range {
118            let header = self.get_by_height(height).await?;
119            headers.push(header);
120        }
121
122        Ok(headers)
123    }
124
125    /// Returns the highest known height.
126    async fn head_height(&self) -> Result<u64>;
127
128    /// Returns true if hash exists in the store.
129    async fn has(&self, hash: &Hash) -> bool;
130
131    /// Returns true if height exists in the store.
132    async fn has_at(&self, height: u64) -> bool;
133
134    /// Sets or updates sampling result for the header.
135    ///
136    /// In case of update, provided CID list is appended onto the existing one, as not to lose
137    /// references to previously sampled blocks.
138    async fn update_sampling_metadata(
139        &self,
140        height: u64,
141        status: SamplingStatus,
142        cids: Vec<Cid>,
143    ) -> Result<()>;
144
145    /// Gets the sampling metadata for the height.
146    ///
147    /// `Err(StoreError::NotFound)` indicates that both header **and** sampling metadata for the requested
148    /// height are not in the store.
149    ///
150    /// `Ok(None)` indicates that header is in the store but sampling metadata is not set yet.
151    async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>>;
152
153    /// Insert a range of headers into the store.
154    ///
155    /// New insertion should pass all the constraints in [`BlockRanges::check_insertion_constraints`],
156    /// additionaly it should be [`ExtendedHeader::verify`]ed against neighbor headers.
157    async fn insert<R>(&self, headers: R) -> Result<()>
158    where
159        R: TryInto<VerifiedExtendedHeaders> + Send,
160        <R as TryInto<VerifiedExtendedHeaders>>::Error: Display;
161
162    /// Returns a list of header ranges currenty held in store.
163    async fn get_stored_header_ranges(&self) -> Result<BlockRanges>;
164
165    /// Returns a list of accepted sampling ranges currently held in store.
166    async fn get_accepted_sampling_ranges(&self) -> Result<BlockRanges>;
167
168    /// Remove header with given height from the store.
169    async fn remove_height(&self, height: u64) -> Result<()>;
170
171    /// Close store.
172    async fn close(self) -> Result<()>;
173}
174
175/// Representation of all the errors that can occur when interacting with the [`Store`].
176#[derive(Error, Debug)]
177pub enum StoreError {
178    /// Header not found.
179    #[error("Header not found in store")]
180    NotFound,
181
182    /// Non-fatal error during insertion.
183    #[error("Insertion failed: {0}")]
184    InsertionFailed(#[from] StoreInsertionError),
185
186    /// Storage corrupted.
187    #[error("Stored data are inconsistent or invalid, try reseting the store: {0}")]
188    StoredDataError(String),
189
190    /// Unrecoverable error reported by the database.
191    #[error("Database reported unrecoverable error: {0}")]
192    FatalDatabaseError(String),
193
194    /// An error propagated from the async executor.
195    #[error("Received error from executor: {0}")]
196    ExecutorError(String),
197
198    /// Failed to open the store.
199    #[error("Error opening store: {0}")]
200    OpenFailed(String),
201}
202
203/// Store insersion non-fatal errors.
204#[derive(Error, Debug)]
205pub enum StoreInsertionError {
206    /// Provided headers failed verification.
207    #[error("Provided headers failed verification: {0}")]
208    HeadersVerificationFailed(String),
209
210    /// Provided headers cannot be appended on existing headers of the store.
211    #[error("Provided headers failed to be verified with existing neighbors: {0}")]
212    NeighborsVerificationFailed(String),
213
214    /// Store containts are not met.
215    #[error("Contraints not met: {0}")]
216    ContraintsNotMet(BlockRangesError),
217
218    // TODO: Same hash for two different heights is not really possible
219    // and `ExtendedHeader::validate` would return an error.
220    // Remove this when a type-safe validation is implemented.
221    /// Hash already exists in the store.
222    #[error("Hash {0} already exists in store")]
223    HashExists(Hash),
224}
225
226impl StoreError {
227    /// Returns `true` if an error is fatal.
228    pub(crate) fn is_fatal(&self) -> bool {
229        match self {
230            StoreError::StoredDataError(_)
231            | StoreError::FatalDatabaseError(_)
232            | StoreError::ExecutorError(_)
233            | StoreError::OpenFailed(_) => true,
234            StoreError::NotFound | StoreError::InsertionFailed(_) => false,
235        }
236    }
237}
238
239#[cfg(not(target_arch = "wasm32"))]
240impl From<tokio::task::JoinError> for StoreError {
241    fn from(error: tokio::task::JoinError) -> StoreError {
242        StoreError::ExecutorError(error.to_string())
243    }
244}
245
246// Needed for `Into<VerifiedExtendedHeaders>`
247impl From<Infallible> for StoreError {
248    fn from(_: Infallible) -> Self {
249        // Infallible should not be possible to construct
250        unreachable!("Infallible failed")
251    }
252}
253
254#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
255#[wasm_bindgen]
256impl SamplingMetadata {
257    /// Return Array of cids
258    #[wasm_bindgen(getter)]
259    pub fn cids(&self) -> Vec<js_sys::Uint8Array> {
260        self.cids
261            .iter()
262            .map(|cid| js_sys::Uint8Array::from(cid.to_bytes().as_ref()))
263            .collect()
264    }
265}
266
267#[derive(Message)]
268struct RawSamplingMetadata {
269    #[prost(bool, tag = "1")]
270    accepted: bool,
271
272    #[prost(message, repeated, tag = "2")]
273    cids: Vec<Vec<u8>>,
274
275    #[prost(bool, tag = "3")]
276    unknown: bool,
277}
278
279impl Protobuf<RawSamplingMetadata> for SamplingMetadata {}
280
281impl TryFrom<RawSamplingMetadata> for SamplingMetadata {
282    type Error = cid::Error;
283
284    fn try_from(item: RawSamplingMetadata) -> Result<Self, Self::Error> {
285        let status = if item.unknown {
286            SamplingStatus::Unknown
287        } else if item.accepted {
288            SamplingStatus::Accepted
289        } else {
290            SamplingStatus::Rejected
291        };
292
293        let cids = item
294            .cids
295            .iter()
296            .map(|cid| {
297                let buffer = Cursor::new(cid);
298                Cid::read_bytes(buffer)
299            })
300            .collect::<Result<_, _>>()?;
301
302        Ok(SamplingMetadata { status, cids })
303    }
304}
305
306impl From<SamplingMetadata> for RawSamplingMetadata {
307    fn from(item: SamplingMetadata) -> Self {
308        let cids = item.cids.iter().map(|cid| cid.to_bytes()).collect();
309
310        let (accepted, unknown) = match item.status {
311            SamplingStatus::Unknown => (false, true),
312            SamplingStatus::Accepted => (true, false),
313            SamplingStatus::Rejected => (false, false),
314        };
315
316        RawSamplingMetadata {
317            accepted,
318            unknown,
319            cids,
320        }
321    }
322}
323
324/// a helper function to convert any kind of range to the inclusive range of header heights.
325fn to_headers_range(bounds: impl RangeBounds<u64>, last_index: u64) -> Result<RangeInclusive<u64>> {
326    let start = match bounds.start_bound() {
327        // in case of unbounded, default to the first height
328        Bound::Unbounded => 1,
329        // range starts after the last index or before first height
330        Bound::Included(&x) if x > last_index || x == 0 => return Err(StoreError::NotFound),
331        Bound::Excluded(&x) if x >= last_index => return Err(StoreError::NotFound),
332        // valid start indexes
333        Bound::Included(&x) => x,
334        Bound::Excluded(&x) => x + 1, // can't overflow thanks to last_index check
335    };
336    let end = match bounds.end_bound() {
337        // in case of unbounded, default to the last index
338        Bound::Unbounded => last_index,
339        // range ends after the last index
340        Bound::Included(&x) if x > last_index => return Err(StoreError::NotFound),
341        Bound::Excluded(&x) if x > last_index + 1 => return Err(StoreError::NotFound),
342        // prevent the underflow later on
343        Bound::Excluded(&0) => 0,
344        // valid end indexes
345        Bound::Included(&x) => x,
346        Bound::Excluded(&x) => x - 1,
347    };
348
349    Ok(start..=end)
350}
351
352#[cfg(test)]
353mod tests {
354    use super::*;
355    use crate::test_utils::ExtendedHeaderGeneratorExt;
356    use celestia_types::test_utils::ExtendedHeaderGenerator;
357    use celestia_types::Height;
358    use lumina_utils::test_utils::async_test as test;
359    use rstest::rstest;
360
361    // rstest only supports attributes which last segment is `test`
362    // https://docs.rs/rstest/0.18.2/rstest/attr.rstest.html#inject-test-attribute
363    use crate::test_utils::new_block_ranges;
364
365    #[test]
366    async fn converts_bounded_ranges() {
367        assert_eq!(1..=15, to_headers_range(1..16, 100).unwrap());
368        assert_eq!(1..=15, to_headers_range(1..=15, 100).unwrap());
369        assert_eq!(300..=400, to_headers_range(300..401, 500).unwrap());
370        assert_eq!(300..=400, to_headers_range(300..=400, 500).unwrap());
371    }
372
373    #[test]
374    async fn starts_from_one_when_unbounded_start() {
375        assert_eq!(&1, to_headers_range(..=10, 100).unwrap().start());
376        assert_eq!(&1, to_headers_range(..10, 100).unwrap().start());
377        assert_eq!(&1, to_headers_range(.., 100).unwrap().start());
378    }
379
380    #[test]
381    async fn ends_on_last_index_when_unbounded_end() {
382        assert_eq!(&10, to_headers_range(1.., 10).unwrap().end());
383        assert_eq!(&11, to_headers_range(1.., 11).unwrap().end());
384        assert_eq!(&10, to_headers_range(.., 10).unwrap().end());
385    }
386
387    #[test]
388    async fn handle_ranges_ending_precisely_at_last_index() {
389        let last_index = 10;
390
391        let bounds_ending_at_last_index = [
392            (Bound::Unbounded, Bound::Included(last_index)),
393            (Bound::Unbounded, Bound::Excluded(last_index + 1)),
394        ];
395
396        for bound in bounds_ending_at_last_index {
397            let range = to_headers_range(bound, last_index).unwrap();
398            assert_eq!(*range.end(), last_index);
399        }
400    }
401
402    #[test]
403    async fn handle_ranges_ending_after_last_index() {
404        let last_index = 10;
405
406        let bounds_ending_after_last_index = [
407            (Bound::Unbounded, Bound::Included(last_index + 1)),
408            (Bound::Unbounded, Bound::Excluded(last_index + 2)),
409        ];
410
411        for bound in bounds_ending_after_last_index {
412            to_headers_range(bound, last_index).unwrap_err();
413        }
414    }
415
416    #[test]
417    async fn errors_if_zero_heigth_is_included() {
418        let includes_zero_height = 0..5;
419        to_headers_range(includes_zero_height, 10).unwrap_err();
420    }
421
422    #[test]
423    async fn handle_ranges_starting_precisely_at_last_index() {
424        let last_index = 10;
425
426        let bounds_starting_at_last_index = [
427            (Bound::Included(last_index), Bound::Unbounded),
428            (Bound::Excluded(last_index - 1), Bound::Unbounded),
429        ];
430
431        for bound in bounds_starting_at_last_index {
432            let range = to_headers_range(bound, last_index).unwrap();
433            assert_eq!(*range.start(), last_index);
434        }
435    }
436
437    #[test]
438    async fn handle_ranges_starting_after_last_index() {
439        let last_index = 10;
440
441        let bounds_starting_after_last_index = [
442            (Bound::Included(last_index + 1), Bound::Unbounded),
443            (Bound::Excluded(last_index), Bound::Unbounded),
444        ];
445
446        for bound in bounds_starting_after_last_index {
447            to_headers_range(bound, last_index).unwrap_err();
448        }
449    }
450
451    #[test]
452    async fn handle_ranges_that_lead_to_empty_ranges() {
453        let last_index = 10;
454
455        let bounds_leading_to_empty_range = [
456            (Bound::Unbounded, Bound::Excluded(0)),
457            (Bound::Included(3), Bound::Excluded(3)),
458            (Bound::Included(3), Bound::Included(2)),
459            (Bound::Excluded(2), Bound::Included(2)),
460        ];
461
462        for bound in bounds_leading_to_empty_range {
463            assert!(to_headers_range(bound, last_index).unwrap().is_empty());
464        }
465    }
466
467    #[rstest]
468    #[case::in_memory(new_in_memory_store())]
469    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
470    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
471    #[self::test]
472    async fn test_contains_height<S: Store>(
473        #[case]
474        #[future(awt)]
475        s: S,
476    ) {
477        let mut s = s;
478        fill_store(&mut s, 2).await;
479
480        assert!(!s.has_at(0).await);
481        assert!(s.has_at(1).await);
482        assert!(s.has_at(2).await);
483        assert!(!s.has_at(3).await);
484    }
485
486    #[rstest]
487    #[case::in_memory(new_in_memory_store())]
488    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
489    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
490    #[self::test]
491    async fn test_empty_store<S: Store>(
492        #[case]
493        #[future(awt)]
494        s: S,
495    ) {
496        assert!(matches!(s.head_height().await, Err(StoreError::NotFound)));
497        assert!(matches!(s.get_head().await, Err(StoreError::NotFound)));
498        assert!(matches!(
499            s.get_by_height(1).await,
500            Err(StoreError::NotFound)
501        ));
502        assert!(matches!(
503            s.get_by_hash(&Hash::Sha256([0; 32])).await,
504            Err(StoreError::NotFound)
505        ));
506    }
507
508    #[rstest]
509    #[case::in_memory(new_in_memory_store())]
510    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
511    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
512    #[self::test]
513    async fn test_read_write<S: Store>(
514        #[case]
515        #[future(awt)]
516        s: S,
517    ) {
518        let mut gen = ExtendedHeaderGenerator::new();
519
520        let header = gen.next();
521
522        s.insert(header.clone()).await.unwrap();
523        assert_eq!(s.head_height().await.unwrap(), 1);
524        assert_eq!(s.get_head().await.unwrap(), header);
525        assert_eq!(s.get_by_height(1).await.unwrap(), header);
526        assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
527    }
528
529    #[rstest]
530    #[case::in_memory(new_in_memory_store())]
531    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
532    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
533    #[self::test]
534    async fn test_pregenerated_data<S: Store>(
535        #[case]
536        #[future(awt)]
537        s: S,
538    ) {
539        let mut s = s;
540        fill_store(&mut s, 100).await;
541
542        assert_eq!(s.head_height().await.unwrap(), 100);
543        let head = s.get_head().await.unwrap();
544        assert_eq!(s.get_by_height(100).await.unwrap(), head);
545        assert!(matches!(
546            s.get_by_height(101).await,
547            Err(StoreError::NotFound)
548        ));
549
550        let header = s.get_by_height(54).await.unwrap();
551        assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
552    }
553
554    #[rstest]
555    #[case::in_memory(new_in_memory_store())]
556    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
557    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
558    #[self::test]
559    async fn test_duplicate_insert<S: Store>(
560        #[case]
561        #[future(awt)]
562        s: S,
563    ) {
564        let mut s = s;
565        let mut gen = fill_store(&mut s, 100).await;
566
567        let header101 = gen.next();
568        s.insert(header101.clone()).await.unwrap();
569
570        let error = match s.insert(header101).await {
571            Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
572            res => panic!("Invalid result: {res:?}"),
573        };
574
575        assert_eq!(
576            error,
577            BlockRangesError::BlockRangeOverlap(101..=101, 101..=101)
578        );
579    }
580
581    #[rstest]
582    #[case::in_memory(new_in_memory_store())]
583    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
584    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
585    #[self::test]
586    async fn test_overwrite_height<S: Store>(
587        #[case]
588        #[future(awt)]
589        s: S,
590    ) {
591        let mut s = s;
592        let gen = fill_store(&mut s, 100).await;
593
594        // Height 30 with different hash
595        let header29 = s.get_by_height(29).await.unwrap();
596        let header30 = gen.next_of(&header29);
597
598        let error = match s.insert(header30).await {
599            Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
600            res => panic!("Invalid result: {res:?}"),
601        };
602        assert_eq!(error, BlockRangesError::BlockRangeOverlap(30..=30, 30..=30));
603    }
604
605    #[rstest]
606    #[case::in_memory(new_in_memory_store())]
607    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
608    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
609    #[self::test]
610    async fn test_overwrite_hash<S: Store>(
611        #[case]
612        #[future(awt)]
613        s: S,
614    ) {
615        let mut s = s;
616        fill_store(&mut s, 100).await;
617
618        let mut dup_header = s.get_by_height(99).await.unwrap();
619        dup_header.header.height = Height::from(102u32);
620
621        assert!(matches!(
622            s.insert(dup_header).await,
623            Err(StoreError::InsertionFailed(
624                StoreInsertionError::HashExists(_)
625            ))
626        ));
627    }
628
629    #[rstest]
630    #[case::in_memory(new_in_memory_store())]
631    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
632    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
633    #[self::test]
634    async fn test_append_range<S: Store>(
635        #[case]
636        #[future(awt)]
637        s: S,
638    ) {
639        let mut s = s;
640        let mut gen = fill_store(&mut s, 10).await;
641
642        s.insert(gen.next_many_verified(4)).await.unwrap();
643        s.get_by_height(14).await.unwrap();
644    }
645
646    #[rstest]
647    #[case::in_memory(new_in_memory_store())]
648    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
649    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
650    #[self::test]
651    async fn test_fill_range_gap<S: Store>(
652        #[case]
653        #[future(awt)]
654        s: S,
655    ) {
656        let mut s = s;
657        let mut gen = fill_store(&mut s, 10).await;
658
659        // height 11
660        let skipped = gen.next();
661        // height 12
662        let upcoming_head = gen.next();
663
664        s.insert(upcoming_head).await.unwrap();
665        s.insert(skipped).await.unwrap();
666    }
667
668    #[rstest]
669    #[case::in_memory(new_in_memory_store())]
670    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
671    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
672    #[self::test]
673    async fn test_fill_range_gap_with_invalid_header<S: Store>(
674        #[case]
675        #[future(awt)]
676        s: S,
677    ) {
678        let mut s = s;
679        let mut gen = fill_store(&mut s, 10).await;
680
681        let mut gen_prime = gen.fork();
682        // height 11
683        let _skipped = gen.next();
684        let another_chain = gen_prime.next();
685        // height 12
686        let upcoming_head = gen.next();
687
688        s.insert(upcoming_head).await.unwrap();
689        assert!(matches!(
690            s.insert(another_chain).await,
691            Err(StoreError::InsertionFailed(
692                StoreInsertionError::NeighborsVerificationFailed(_)
693            ))
694        ));
695    }
696
697    #[rstest]
698    #[case::in_memory(new_in_memory_store())]
699    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
700    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
701    #[self::test]
702    async fn test_appends_with_gaps<S: Store>(
703        #[case]
704        #[future(awt)]
705        s: S,
706    ) {
707        let mut gen = ExtendedHeaderGenerator::new_from_height(5);
708        let header5 = gen.next();
709        gen.next_many(4);
710        let header10 = gen.next();
711        gen.next_many(4);
712        let header15 = gen.next();
713
714        s.insert(header5).await.unwrap();
715        s.insert(header15).await.unwrap();
716        s.insert(header10).await.unwrap_err();
717    }
718
719    #[rstest]
720    #[case::in_memory(new_in_memory_store())]
721    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
722    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
723    #[self::test]
724    async fn test_sampling_height_empty_store<S: Store>(
725        #[case]
726        #[future(awt)]
727        store: S,
728    ) {
729        store
730            .update_sampling_metadata(0, SamplingStatus::Accepted, vec![])
731            .await
732            .unwrap_err();
733        store
734            .update_sampling_metadata(1, SamplingStatus::Accepted, vec![])
735            .await
736            .unwrap_err();
737    }
738
739    #[rstest]
740    #[case::in_memory(new_in_memory_store())]
741    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
742    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
743    #[self::test]
744    async fn test_sampling_height<S: Store>(
745        #[case]
746        #[future(awt)]
747        s: S,
748    ) {
749        let mut store = s;
750        fill_store(&mut store, 9).await;
751
752        store
753            .update_sampling_metadata(0, SamplingStatus::Accepted, vec![])
754            .await
755            .unwrap_err();
756        store
757            .update_sampling_metadata(1, SamplingStatus::Accepted, vec![])
758            .await
759            .unwrap();
760        store
761            .update_sampling_metadata(2, SamplingStatus::Accepted, vec![])
762            .await
763            .unwrap();
764        store
765            .update_sampling_metadata(3, SamplingStatus::Rejected, vec![])
766            .await
767            .unwrap();
768        store
769            .update_sampling_metadata(4, SamplingStatus::Accepted, vec![])
770            .await
771            .unwrap();
772        store
773            .update_sampling_metadata(5, SamplingStatus::Rejected, vec![])
774            .await
775            .unwrap();
776        store
777            .update_sampling_metadata(6, SamplingStatus::Rejected, vec![])
778            .await
779            .unwrap();
780
781        store
782            .update_sampling_metadata(8, SamplingStatus::Accepted, vec![])
783            .await
784            .unwrap();
785
786        store
787            .update_sampling_metadata(7, SamplingStatus::Accepted, vec![])
788            .await
789            .unwrap();
790
791        store
792            .update_sampling_metadata(9, SamplingStatus::Accepted, vec![])
793            .await
794            .unwrap();
795
796        store
797            .update_sampling_metadata(10, SamplingStatus::Accepted, vec![])
798            .await
799            .unwrap_err();
800        store
801            .update_sampling_metadata(10, SamplingStatus::Rejected, vec![])
802            .await
803            .unwrap_err();
804        store
805            .update_sampling_metadata(20, SamplingStatus::Accepted, vec![])
806            .await
807            .unwrap_err();
808    }
809
810    #[rstest]
811    #[case::in_memory(new_in_memory_store())]
812    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
813    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
814    #[self::test]
815    async fn test_sampling_merge<S: Store>(
816        #[case]
817        #[future(awt)]
818        s: S,
819    ) {
820        let mut store = s;
821        fill_store(&mut store, 1).await;
822
823        let cid0 = "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ"
824            .parse()
825            .unwrap();
826        let cid1 = "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA"
827            .parse()
828            .unwrap();
829        let cid2 = "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq"
830            .parse()
831            .unwrap();
832
833        store
834            .update_sampling_metadata(1, SamplingStatus::Rejected, vec![cid0])
835            .await
836            .unwrap();
837
838        store
839            .update_sampling_metadata(1, SamplingStatus::Rejected, vec![])
840            .await
841            .unwrap();
842
843        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
844        assert_eq!(sampling_data.status, SamplingStatus::Rejected);
845        assert_eq!(sampling_data.cids, vec![cid0]);
846
847        store
848            .update_sampling_metadata(1, SamplingStatus::Accepted, vec![cid1])
849            .await
850            .unwrap();
851
852        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
853        assert_eq!(sampling_data.status, SamplingStatus::Accepted);
854        assert_eq!(sampling_data.cids, vec![cid0, cid1]);
855
856        store
857            .update_sampling_metadata(1, SamplingStatus::Accepted, vec![cid0, cid2])
858            .await
859            .unwrap();
860
861        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
862        assert_eq!(sampling_data.status, SamplingStatus::Accepted);
863        assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
864    }
865
866    #[rstest]
867    #[case::in_memory(new_in_memory_store())]
868    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
869    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
870    #[self::test]
871    async fn test_sampled_cids<S: Store>(
872        #[case]
873        #[future(awt)]
874        s: S,
875    ) {
876        let mut store = s;
877        fill_store(&mut store, 5).await;
878
879        let cids: Vec<Cid> = [
880            "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq",
881            "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi",
882            "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ",
883            "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA",
884        ]
885        .iter()
886        .map(|s| s.parse().unwrap())
887        .collect();
888
889        store
890            .update_sampling_metadata(1, SamplingStatus::Accepted, cids.clone())
891            .await
892            .unwrap();
893        store
894            .update_sampling_metadata(2, SamplingStatus::Accepted, cids[0..1].to_vec())
895            .await
896            .unwrap();
897        store
898            .update_sampling_metadata(4, SamplingStatus::Rejected, cids[3..].to_vec())
899            .await
900            .unwrap();
901        store
902            .update_sampling_metadata(5, SamplingStatus::Rejected, vec![])
903            .await
904            .unwrap();
905
906        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
907        assert_eq!(sampling_data.cids, cids);
908        assert_eq!(sampling_data.status, SamplingStatus::Accepted);
909
910        let sampling_data = store.get_sampling_metadata(2).await.unwrap().unwrap();
911        assert_eq!(sampling_data.cids, cids[0..1]);
912        assert_eq!(sampling_data.status, SamplingStatus::Accepted);
913
914        assert!(store.get_sampling_metadata(3).await.unwrap().is_none());
915
916        let sampling_data = store.get_sampling_metadata(4).await.unwrap().unwrap();
917        assert_eq!(sampling_data.cids, cids[3..]);
918        assert_eq!(sampling_data.status, SamplingStatus::Rejected);
919
920        let sampling_data = store.get_sampling_metadata(5).await.unwrap().unwrap();
921        assert_eq!(sampling_data.cids, vec![]);
922        assert_eq!(sampling_data.status, SamplingStatus::Rejected);
923
924        assert!(matches!(
925            store.get_sampling_metadata(0).await,
926            Err(StoreError::NotFound)
927        ));
928        assert!(matches!(
929            store.get_sampling_metadata(6).await,
930            Err(StoreError::NotFound)
931        ));
932        assert!(matches!(
933            store.get_sampling_metadata(100).await,
934            Err(StoreError::NotFound)
935        ));
936    }
937
938    #[rstest]
939    #[case::in_memory(new_in_memory_store())]
940    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
941    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
942    #[self::test]
943    async fn test_empty_store_range<S: Store>(
944        #[case]
945        #[future(awt)]
946        s: S,
947    ) {
948        let store = s;
949
950        assert_eq!(
951            store.get_stored_header_ranges().await.unwrap().as_ref(),
952            &[]
953        );
954    }
955
956    #[rstest]
957    #[case::in_memory(new_in_memory_store())]
958    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
959    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
960    #[self::test]
961    async fn test_single_header_range<S: Store>(
962        #[case]
963        #[future(awt)]
964        s: S,
965    ) {
966        let store = s;
967        let mut gen = ExtendedHeaderGenerator::new();
968
969        gen.skip(19);
970
971        let prepend0 = gen.next();
972        let prepend1 = gen.next_many_verified(5);
973        store.insert(gen.next_many_verified(4)).await.unwrap();
974        store.insert(gen.next_many_verified(5)).await.unwrap();
975        store.insert(prepend1).await.unwrap();
976        store.insert(prepend0).await.unwrap();
977        store.insert(gen.next_many_verified(5)).await.unwrap();
978        store.insert(gen.next()).await.unwrap();
979
980        let final_ranges = store.get_stored_header_ranges().await.unwrap();
981        assert_eq!(final_ranges.as_ref(), &[20..=40]);
982    }
983
984    // no in-memory store for tests below. It doesn't expect to be resumed from disk,
985    // so it doesn't support multiple ranges.
986    #[rstest]
987    #[case::in_memory(new_in_memory_store())]
988    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
989    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
990    #[self::test]
991    async fn test_ranges_consolidation<S: Store>(
992        #[case]
993        #[future(awt)]
994        s: S,
995    ) {
996        let store = s;
997        let mut gen = ExtendedHeaderGenerator::new();
998
999        gen.skip(9);
1000
1001        let skip0 = gen.next_many_verified(5);
1002        store.insert(gen.next_many_verified(2)).await.unwrap();
1003        store.insert(gen.next_many_verified(3)).await.unwrap();
1004
1005        let skip1 = gen.next();
1006        store.insert(gen.next()).await.unwrap();
1007
1008        let skip2 = gen.next_many_verified(5);
1009
1010        store.insert(gen.next()).await.unwrap();
1011
1012        let skip3 = gen.next_many_verified(5);
1013        let skip4 = gen.next_many_verified(5);
1014        let skip5 = gen.next_many_verified(5);
1015
1016        store.insert(skip5).await.unwrap();
1017        store.insert(skip4).await.unwrap();
1018        store.insert(skip3).await.unwrap();
1019        store.insert(skip2).await.unwrap();
1020        store.insert(skip1).await.unwrap();
1021        store.insert(skip0).await.unwrap();
1022
1023        let final_ranges = store.get_stored_header_ranges().await.unwrap();
1024        assert_eq!(final_ranges.as_ref(), &[10..=42]);
1025    }
1026
1027    #[rstest]
1028    #[case::in_memory(new_in_memory_store())]
1029    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1030    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1031    #[self::test]
1032    async fn test_neighbour_validation<S: Store>(
1033        #[case]
1034        #[future(awt)]
1035        s: S,
1036    ) {
1037        let store = s;
1038        let mut gen = ExtendedHeaderGenerator::new();
1039
1040        store.insert(gen.next_many_verified(5)).await.unwrap();
1041        let mut fork = gen.fork();
1042        let _gap = gen.next();
1043        store.insert(gen.next_many_verified(4)).await.unwrap();
1044
1045        store.insert(fork.next()).await.unwrap_err();
1046    }
1047
1048    #[rstest]
1049    #[case::in_memory(new_in_memory_store())]
1050    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1051    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1052    #[self::test]
1053    async fn tail_removal_partial_range<S: Store>(
1054        #[case]
1055        #[future(awt)]
1056        s: S,
1057    ) {
1058        let store = s;
1059        let headers = ExtendedHeaderGenerator::new().next_many(128);
1060
1061        store.insert(&headers[0..64]).await.unwrap();
1062        store.insert(&headers[96..128]).await.unwrap();
1063        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1064
1065        store.remove_height(1).await.unwrap();
1066        assert_store(&store, &headers, new_block_ranges([2..=64, 97..=128])).await;
1067    }
1068
1069    #[rstest]
1070    #[case::in_memory(new_in_memory_store())]
1071    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1072    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1073    #[self::test]
1074    async fn tail_removal_full_range<S: Store>(
1075        #[case]
1076        #[future(awt)]
1077        s: S,
1078    ) {
1079        let store = s;
1080        let headers = ExtendedHeaderGenerator::new().next_many(128);
1081
1082        store.insert(&headers[0..1]).await.unwrap();
1083        store.insert(&headers[65..128]).await.unwrap();
1084        assert_store(&store, &headers, new_block_ranges([1..=1, 66..=128])).await;
1085
1086        store.remove_height(1).await.unwrap();
1087        assert_store(&store, &headers, new_block_ranges([66..=128])).await;
1088    }
1089
1090    #[rstest]
1091    #[case::in_memory(new_in_memory_store())]
1092    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1093    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1094    #[self::test]
1095    async fn tail_removal_remove_all<S: Store>(
1096        #[case]
1097        #[future(awt)]
1098        s: S,
1099    ) {
1100        let store = s;
1101        let headers = ExtendedHeaderGenerator::new().next_many(66);
1102
1103        store.insert(&headers[..]).await.unwrap();
1104        assert_store(&store, &headers, new_block_ranges([1..=66])).await;
1105
1106        for i in 1..=66 {
1107            store.remove_height(i).await.unwrap();
1108        }
1109
1110        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
1111        assert!(stored_ranges.is_empty());
1112
1113        for h in 1..=66 {
1114            assert!(!store.has_at(h).await);
1115        }
1116    }
1117
1118    #[rstest]
1119    #[case::in_memory(new_in_memory_store())]
1120    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1121    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1122    #[self::test]
1123    async fn head_removal_partial_range<S: Store>(
1124        #[case]
1125        #[future(awt)]
1126        s: S,
1127    ) {
1128        let store = s;
1129        let headers = ExtendedHeaderGenerator::new().next_many(128);
1130
1131        store.insert(&headers[0..64]).await.unwrap();
1132        store.insert(&headers[96..128]).await.unwrap();
1133        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1134
1135        store.remove_height(128).await.unwrap();
1136        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=127])).await;
1137    }
1138
1139    #[rstest]
1140    #[case::in_memory(new_in_memory_store())]
1141    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1142    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1143    #[self::test]
1144    async fn head_removal_full_range<S: Store>(
1145        #[case]
1146        #[future(awt)]
1147        s: S,
1148    ) {
1149        let store = s;
1150        let headers = ExtendedHeaderGenerator::new().next_many(128);
1151
1152        store.insert(&headers[0..64]).await.unwrap();
1153        store.insert(&headers[127..128]).await.unwrap();
1154        assert_store(&store, &headers, new_block_ranges([1..=64, 128..=128])).await;
1155
1156        store.remove_height(128).await.unwrap();
1157        assert_store(&store, &headers, new_block_ranges([1..=64])).await;
1158    }
1159
1160    #[rstest]
1161    #[case::in_memory(new_in_memory_store())]
1162    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1163    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1164    #[self::test]
1165    async fn middle_removal<S: Store>(
1166        #[case]
1167        #[future(awt)]
1168        s: S,
1169    ) {
1170        let store = s;
1171        let headers = ExtendedHeaderGenerator::new().next_many(128);
1172
1173        store.insert(&headers[0..64]).await.unwrap();
1174        store.insert(&headers[96..128]).await.unwrap();
1175        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1176
1177        store.remove_height(62).await.unwrap();
1178        assert_store(
1179            &store,
1180            &headers,
1181            new_block_ranges([1..=61, 63..=64, 97..=128]),
1182        )
1183        .await;
1184
1185        store.remove_height(64).await.unwrap();
1186        assert_store(
1187            &store,
1188            &headers,
1189            new_block_ranges([1..=61, 63..=63, 97..=128]),
1190        )
1191        .await;
1192
1193        store.remove_height(63).await.unwrap();
1194        assert_store(&store, &headers, new_block_ranges([1..=61, 97..=128])).await;
1195    }
1196
1197    #[rstest]
1198    #[case::in_memory(new_in_memory_store())]
1199    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1200    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1201    #[self::test]
1202    async fn neighbor_removal<S: Store>(
1203        #[case]
1204        #[future(awt)]
1205        s: S,
1206    ) {
1207        let store = s;
1208        let headers = ExtendedHeaderGenerator::new().next_many(128);
1209
1210        store.insert(&headers[0..64]).await.unwrap();
1211        store.insert(&headers[96..128]).await.unwrap();
1212        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1213
1214        store.remove_height(64).await.unwrap();
1215        assert_store(&store, &headers, new_block_ranges([1..=63, 97..=128])).await;
1216
1217        store.remove_height(97).await.unwrap();
1218        assert_store(&store, &headers, new_block_ranges([1..=63, 98..=128])).await;
1219    }
1220
1221    /// Fills an empty store
1222    async fn fill_store<S: Store>(store: &mut S, amount: u64) -> ExtendedHeaderGenerator {
1223        assert!(!store.has_at(1).await, "Store is not empty");
1224
1225        let mut gen = ExtendedHeaderGenerator::new();
1226
1227        store
1228            .insert(gen.next_many_verified(amount))
1229            .await
1230            .expect("inserting test data failed");
1231
1232        gen
1233    }
1234
1235    async fn new_in_memory_store() -> InMemoryStore {
1236        InMemoryStore::new()
1237    }
1238
1239    pub(crate) async fn assert_store<S: Store>(
1240        store: &S,
1241        headers: &[ExtendedHeader],
1242        expected_ranges: BlockRanges,
1243    ) {
1244        assert_eq!(
1245            store.get_stored_header_ranges().await.unwrap(),
1246            expected_ranges
1247        );
1248        for header in headers {
1249            let height = header.height().value();
1250            if expected_ranges.contains(height) {
1251                assert_eq!(&store.get_by_height(height).await.unwrap(), header);
1252                assert_eq!(&store.get_by_hash(&header.hash()).await.unwrap(), header);
1253            } else {
1254                assert!(matches!(
1255                    store.get_by_height(height).await.unwrap_err(),
1256                    StoreError::NotFound
1257                ));
1258                assert!(matches!(
1259                    store.get_by_hash(&header.hash()).await.unwrap_err(),
1260                    StoreError::NotFound
1261                ));
1262            }
1263        }
1264    }
1265
1266    #[cfg(not(target_arch = "wasm32"))]
1267    async fn new_redb_store() -> RedbStore {
1268        RedbStore::in_memory().await.unwrap()
1269    }
1270
1271    #[cfg(target_arch = "wasm32")]
1272    async fn new_indexed_db_store() -> IndexedDbStore {
1273        use std::sync::atomic::{AtomicU32, Ordering};
1274        static NEXT_ID: AtomicU32 = AtomicU32::new(0);
1275
1276        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
1277        let db_name = format!("indexeddb-lumina-node-store-test-{id}");
1278
1279        // DB can persist if test run within the browser
1280        rexie::Rexie::delete(&db_name).await.unwrap();
1281
1282        IndexedDbStore::new(&db_name)
1283            .await
1284            .expect("creating test store failed")
1285    }
1286}