lumina_node/
store.rs

1//! Primitives related to the [`ExtendedHeader`] storage.
2
3use std::convert::Infallible;
4use std::fmt::{Debug, Display};
5use std::io::Cursor;
6use std::ops::{Bound, RangeBounds, RangeInclusive};
7
8use async_trait::async_trait;
9use celestia_types::hash::Hash;
10use celestia_types::ExtendedHeader;
11use cid::Cid;
12use prost::Message;
13use serde::{Deserialize, Serialize};
14use tendermint_proto::Protobuf;
15use thiserror::Error;
16#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
17use wasm_bindgen::prelude::*;
18
19pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError};
20pub use crate::store::either_store::EitherStore;
21pub use crate::store::utils::VerifiedExtendedHeaders;
22
23pub use in_memory_store::InMemoryStore;
24#[cfg(target_arch = "wasm32")]
25pub use indexed_db_store::IndexedDbStore;
26#[cfg(not(target_arch = "wasm32"))]
27pub use redb_store::RedbStore;
28
29mod either_store;
30mod in_memory_store;
31#[cfg(target_arch = "wasm32")]
32mod indexed_db_store;
33#[cfg(not(target_arch = "wasm32"))]
34mod redb_store;
35
36pub(crate) mod utils;
37
38/// Sampling metadata for a block.
39///
40/// This struct persists DAS-ing information in a header store for future reference.
41#[derive(Debug, Default, Clone, Serialize, Deserialize)]
42#[cfg_attr(all(feature = "wasm-bindgen", target_arch = "wasm32"), wasm_bindgen)]
43pub struct SamplingMetadata {
44    /// List of CIDs used while sampling. Can be used to remove associated data
45    /// from Blockstore, when cleaning up the old ExtendedHeaders
46    #[cfg_attr(
47        all(feature = "wasm-bindgen", target_arch = "wasm32"),
48        wasm_bindgen(skip)
49    )]
50    pub cids: Vec<Cid>,
51}
52
53type Result<T, E = StoreError> = std::result::Result<T, E>;
54
55/// An asynchronous [`ExtendedHeader`] storage.
56///
57/// Currently it is required that all the headers are inserted to the storage
58/// in order, starting from the genesis.
59#[async_trait]
60pub trait Store: Send + Sync + Debug {
61    /// Returns the [`ExtendedHeader`] with the highest height.
62    async fn get_head(&self) -> Result<ExtendedHeader>;
63
64    /// Returns the header of a specific hash.
65    async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader>;
66
67    /// Returns the header of a specific height.
68    async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader>;
69
70    /// Returns when new head is available in the `Store`.
71    async fn wait_new_head(&self) -> u64;
72
73    /// Returns when `height` is available in the `Store`.
74    async fn wait_height(&self, height: u64) -> Result<()>;
75
76    /// Returns the headers from the given heights range.
77    ///
78    /// If start of the range is unbounded, the first returned header will be of height 1.
79    /// If end of the range is unbounded, the last returned header will be the last header in the
80    /// store.
81    ///
82    /// # Errors
83    ///
84    /// If range contains a height of a header that is not found in the store or [`RangeBounds`]
85    /// cannot be converted to a valid range.
86    async fn get_range<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
87    where
88        R: RangeBounds<u64> + Send,
89    {
90        let head_height = self.head_height().await?;
91        let range = to_headers_range(range, head_height)?;
92
93        let amount = if range.is_empty() {
94            0
95        } else {
96            range.end() - range.start() + 1 // add one as it's inclusive
97        };
98
99        let mut headers = Vec::with_capacity(amount.try_into().unwrap_or(usize::MAX));
100
101        for height in range {
102            let header = self.get_by_height(height).await?;
103            headers.push(header);
104        }
105
106        Ok(headers)
107    }
108
109    /// Returns the highest known height.
110    async fn head_height(&self) -> Result<u64>;
111
112    /// Returns true if hash exists in the store.
113    async fn has(&self, hash: &Hash) -> bool;
114
115    /// Returns true if height exists in the store.
116    async fn has_at(&self, height: u64) -> bool;
117
118    /// Sets or updates sampling metadata for the header.
119    ///
120    /// In case of update, provided CID list is appended onto the existing one, as not to lose
121    /// references to previously sampled blocks.
122    async fn update_sampling_metadata(&self, height: u64, cids: Vec<Cid>) -> Result<()>;
123
124    /// Gets the sampling metadata for the height.
125    ///
126    /// `Err(StoreError::NotFound)` indicates that both header **and** sampling metadata for the requested
127    /// height are not in the store.
128    ///
129    /// `Ok(None)` indicates that header is in the store but sampling metadata is not set yet.
130    async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>>;
131
132    /// Mark block as sampled.
133    async fn mark_as_sampled(&self, height: u64) -> Result<()>;
134
135    /// Insert a range of headers into the store.
136    ///
137    /// New insertion should pass all the constraints in [`BlockRanges::check_insertion_constraints`],
138    /// additionaly it should be [`ExtendedHeader::verify`]ed against neighbor headers.
139    async fn insert<R>(&self, headers: R) -> Result<()>
140    where
141        R: TryInto<VerifiedExtendedHeaders> + Send,
142        <R as TryInto<VerifiedExtendedHeaders>>::Error: Display;
143
144    /// Returns a list of header ranges currenty held in store.
145    async fn get_stored_header_ranges(&self) -> Result<BlockRanges>;
146
147    /// Returns a list of blocks that were sampled and their header is currenty held in store.
148    async fn get_sampled_ranges(&self) -> Result<BlockRanges>;
149
150    /// Returns a list of headers that were pruned until now.
151    async fn get_pruned_ranges(&self) -> Result<BlockRanges>;
152
153    /// Remove header with given height from the store.
154    async fn remove_height(&self, height: u64) -> Result<()>;
155
156    /// Close store.
157    async fn close(self) -> Result<()>;
158}
159
160/// Representation of all the errors that can occur when interacting with the [`Store`].
161#[derive(Error, Debug)]
162pub enum StoreError {
163    /// Header not found.
164    #[error("Header not found in store")]
165    NotFound,
166
167    /// Non-fatal error during insertion.
168    #[error("Insertion failed: {0}")]
169    InsertionFailed(#[from] StoreInsertionError),
170
171    /// Storage corrupted.
172    #[error("Stored data are inconsistent or invalid, try reseting the store: {0}")]
173    StoredDataError(String),
174
175    /// Unrecoverable error reported by the database.
176    #[error("Database reported unrecoverable error: {0}")]
177    FatalDatabaseError(String),
178
179    /// An error propagated from the async executor.
180    #[error("Received error from executor: {0}")]
181    ExecutorError(String),
182
183    /// Failed to open the store.
184    #[error("Error opening store: {0}")]
185    OpenFailed(String),
186}
187
188/// Store insersion non-fatal errors.
189#[derive(Error, Debug)]
190pub enum StoreInsertionError {
191    /// Provided headers failed verification.
192    #[error("Provided headers failed verification: {0}")]
193    HeadersVerificationFailed(String),
194
195    /// Provided headers cannot be appended on existing headers of the store.
196    #[error("Provided headers failed to be verified with existing neighbors: {0}")]
197    NeighborsVerificationFailed(String),
198
199    /// Store containts are not met.
200    #[error("Contraints not met: {0}")]
201    ContraintsNotMet(BlockRangesError),
202
203    // TODO: Same hash for two different heights is not really possible
204    // and `ExtendedHeader::validate` would return an error.
205    // Remove this when a type-safe validation is implemented.
206    /// Hash already exists in the store.
207    #[error("Hash {0} already exists in store")]
208    HashExists(Hash),
209}
210
211impl StoreError {
212    /// Returns `true` if an error is fatal.
213    pub(crate) fn is_fatal(&self) -> bool {
214        match self {
215            StoreError::StoredDataError(_)
216            | StoreError::FatalDatabaseError(_)
217            | StoreError::ExecutorError(_)
218            | StoreError::OpenFailed(_) => true,
219            StoreError::NotFound | StoreError::InsertionFailed(_) => false,
220        }
221    }
222}
223
224#[cfg(not(target_arch = "wasm32"))]
225impl From<tokio::task::JoinError> for StoreError {
226    fn from(error: tokio::task::JoinError) -> StoreError {
227        StoreError::ExecutorError(error.to_string())
228    }
229}
230
231// Needed for `Into<VerifiedExtendedHeaders>`
232impl From<Infallible> for StoreError {
233    fn from(_: Infallible) -> Self {
234        // Infallible should not be possible to construct
235        unreachable!("Infallible failed")
236    }
237}
238
#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
#[wasm_bindgen]
impl SamplingMetadata {
    /// Return Array of cids
    #[wasm_bindgen(getter)]
    pub fn cids(&self) -> Vec<js_sys::Uint8Array> {
        // Each CID is exposed as its byte encoding wrapped in a `Uint8Array`.
        let mut encoded = Vec::with_capacity(self.cids.len());
        for cid in &self.cids {
            let bytes = cid.to_bytes();
            encoded.push(js_sys::Uint8Array::from(bytes.as_slice()));
        }
        encoded
    }
}
251
252#[derive(Message)]
253struct RawSamplingMetadata {
254    // Tags 1 and 3 are reserved because they were used in previous versions
255    // of this struct.
256    #[prost(message, repeated, tag = "2")]
257    cids: Vec<Vec<u8>>,
258}
259
260impl Protobuf<RawSamplingMetadata> for SamplingMetadata {}
261
262impl TryFrom<RawSamplingMetadata> for SamplingMetadata {
263    type Error = cid::Error;
264
265    fn try_from(item: RawSamplingMetadata) -> Result<Self, Self::Error> {
266        let cids = item
267            .cids
268            .iter()
269            .map(|cid| {
270                let buffer = Cursor::new(cid);
271                Cid::read_bytes(buffer)
272            })
273            .collect::<Result<_, _>>()?;
274
275        Ok(SamplingMetadata { cids })
276    }
277}
278
279impl From<SamplingMetadata> for RawSamplingMetadata {
280    fn from(item: SamplingMetadata) -> Self {
281        let cids = item.cids.iter().map(|cid| cid.to_bytes()).collect();
282
283        RawSamplingMetadata { cids }
284    }
285}
286
287/// a helper function to convert any kind of range to the inclusive range of header heights.
288fn to_headers_range(bounds: impl RangeBounds<u64>, last_index: u64) -> Result<RangeInclusive<u64>> {
289    let start = match bounds.start_bound() {
290        // in case of unbounded, default to the first height
291        Bound::Unbounded => 1,
292        // range starts after the last index or before first height
293        Bound::Included(&x) if x > last_index || x == 0 => return Err(StoreError::NotFound),
294        Bound::Excluded(&x) if x >= last_index => return Err(StoreError::NotFound),
295        // valid start indexes
296        Bound::Included(&x) => x,
297        Bound::Excluded(&x) => x + 1, // can't overflow thanks to last_index check
298    };
299    let end = match bounds.end_bound() {
300        // in case of unbounded, default to the last index
301        Bound::Unbounded => last_index,
302        // range ends after the last index
303        Bound::Included(&x) if x > last_index => return Err(StoreError::NotFound),
304        Bound::Excluded(&x) if x > last_index + 1 => return Err(StoreError::NotFound),
305        // prevent the underflow later on
306        Bound::Excluded(&0) => 0,
307        // valid end indexes
308        Bound::Included(&x) => x,
309        Bound::Excluded(&x) => x - 1,
310    };
311
312    Ok(start..=end)
313}
314
315#[cfg(test)]
316mod tests {
317    use super::*;
318    use crate::test_utils::ExtendedHeaderGeneratorExt;
319    use celestia_types::test_utils::ExtendedHeaderGenerator;
320    use celestia_types::Height;
321    use rstest::rstest;
322    // rstest only supports attributes which last segment is `test`
323    // https://docs.rs/rstest/0.18.2/rstest/attr.rstest.html#inject-test-attribute
324    use lumina_utils::test_utils::async_test as test;
325
326    use crate::test_utils::new_block_ranges;
327
328    #[test]
329    async fn converts_bounded_ranges() {
330        assert_eq!(1..=15, to_headers_range(1..16, 100).unwrap());
331        assert_eq!(1..=15, to_headers_range(1..=15, 100).unwrap());
332        assert_eq!(300..=400, to_headers_range(300..401, 500).unwrap());
333        assert_eq!(300..=400, to_headers_range(300..=400, 500).unwrap());
334    }
335
336    #[test]
337    async fn starts_from_one_when_unbounded_start() {
338        assert_eq!(&1, to_headers_range(..=10, 100).unwrap().start());
339        assert_eq!(&1, to_headers_range(..10, 100).unwrap().start());
340        assert_eq!(&1, to_headers_range(.., 100).unwrap().start());
341    }
342
343    #[test]
344    async fn ends_on_last_index_when_unbounded_end() {
345        assert_eq!(&10, to_headers_range(1.., 10).unwrap().end());
346        assert_eq!(&11, to_headers_range(1.., 11).unwrap().end());
347        assert_eq!(&10, to_headers_range(.., 10).unwrap().end());
348    }
349
350    #[test]
351    async fn handle_ranges_ending_precisely_at_last_index() {
352        let last_index = 10;
353
354        let bounds_ending_at_last_index = [
355            (Bound::Unbounded, Bound::Included(last_index)),
356            (Bound::Unbounded, Bound::Excluded(last_index + 1)),
357        ];
358
359        for bound in bounds_ending_at_last_index {
360            let range = to_headers_range(bound, last_index).unwrap();
361            assert_eq!(*range.end(), last_index);
362        }
363    }
364
365    #[test]
366    async fn handle_ranges_ending_after_last_index() {
367        let last_index = 10;
368
369        let bounds_ending_after_last_index = [
370            (Bound::Unbounded, Bound::Included(last_index + 1)),
371            (Bound::Unbounded, Bound::Excluded(last_index + 2)),
372        ];
373
374        for bound in bounds_ending_after_last_index {
375            to_headers_range(bound, last_index).unwrap_err();
376        }
377    }
378
379    #[test]
380    async fn errors_if_zero_heigth_is_included() {
381        let includes_zero_height = 0..5;
382        to_headers_range(includes_zero_height, 10).unwrap_err();
383    }
384
385    #[test]
386    async fn handle_ranges_starting_precisely_at_last_index() {
387        let last_index = 10;
388
389        let bounds_starting_at_last_index = [
390            (Bound::Included(last_index), Bound::Unbounded),
391            (Bound::Excluded(last_index - 1), Bound::Unbounded),
392        ];
393
394        for bound in bounds_starting_at_last_index {
395            let range = to_headers_range(bound, last_index).unwrap();
396            assert_eq!(*range.start(), last_index);
397        }
398    }
399
400    #[test]
401    async fn handle_ranges_starting_after_last_index() {
402        let last_index = 10;
403
404        let bounds_starting_after_last_index = [
405            (Bound::Included(last_index + 1), Bound::Unbounded),
406            (Bound::Excluded(last_index), Bound::Unbounded),
407        ];
408
409        for bound in bounds_starting_after_last_index {
410            to_headers_range(bound, last_index).unwrap_err();
411        }
412    }
413
414    #[test]
415    async fn handle_ranges_that_lead_to_empty_ranges() {
416        let last_index = 10;
417
418        let bounds_leading_to_empty_range = [
419            (Bound::Unbounded, Bound::Excluded(0)),
420            (Bound::Included(3), Bound::Excluded(3)),
421            (Bound::Included(3), Bound::Included(2)),
422            (Bound::Excluded(2), Bound::Included(2)),
423        ];
424
425        for bound in bounds_leading_to_empty_range {
426            assert!(to_headers_range(bound, last_index).unwrap().is_empty());
427        }
428    }
429
430    #[rstest]
431    #[case::in_memory(new_in_memory_store())]
432    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
433    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
434    #[self::test]
435    async fn test_contains_height<S: Store>(
436        #[case]
437        #[future(awt)]
438        s: S,
439    ) {
440        let mut s = s;
441        fill_store(&mut s, 2).await;
442
443        assert!(!s.has_at(0).await);
444        assert!(s.has_at(1).await);
445        assert!(s.has_at(2).await);
446        assert!(!s.has_at(3).await);
447    }
448
449    #[rstest]
450    #[case::in_memory(new_in_memory_store())]
451    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
452    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
453    #[self::test]
454    async fn test_empty_store<S: Store>(
455        #[case]
456        #[future(awt)]
457        s: S,
458    ) {
459        assert!(matches!(s.head_height().await, Err(StoreError::NotFound)));
460        assert!(matches!(s.get_head().await, Err(StoreError::NotFound)));
461        assert!(matches!(
462            s.get_by_height(1).await,
463            Err(StoreError::NotFound)
464        ));
465        assert!(matches!(
466            s.get_by_hash(&Hash::Sha256([0; 32])).await,
467            Err(StoreError::NotFound)
468        ));
469    }
470
471    #[rstest]
472    #[case::in_memory(new_in_memory_store())]
473    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
474    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
475    #[self::test]
476    async fn test_read_write<S: Store>(
477        #[case]
478        #[future(awt)]
479        s: S,
480    ) {
481        let mut gen = ExtendedHeaderGenerator::new();
482
483        let header = gen.next();
484
485        s.insert(header.clone()).await.unwrap();
486        assert_eq!(s.head_height().await.unwrap(), 1);
487        assert_eq!(s.get_head().await.unwrap(), header);
488        assert_eq!(s.get_by_height(1).await.unwrap(), header);
489        assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
490    }
491
492    #[rstest]
493    #[case::in_memory(new_in_memory_store())]
494    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
495    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
496    #[self::test]
497    async fn test_pregenerated_data<S: Store>(
498        #[case]
499        #[future(awt)]
500        s: S,
501    ) {
502        let mut s = s;
503        fill_store(&mut s, 100).await;
504
505        assert_eq!(s.head_height().await.unwrap(), 100);
506        let head = s.get_head().await.unwrap();
507        assert_eq!(s.get_by_height(100).await.unwrap(), head);
508        assert!(matches!(
509            s.get_by_height(101).await,
510            Err(StoreError::NotFound)
511        ));
512
513        let header = s.get_by_height(54).await.unwrap();
514        assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
515    }
516
517    #[rstest]
518    #[case::in_memory(new_in_memory_store())]
519    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
520    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
521    #[self::test]
522    async fn test_duplicate_insert<S: Store>(
523        #[case]
524        #[future(awt)]
525        s: S,
526    ) {
527        let mut s = s;
528        let mut gen = fill_store(&mut s, 100).await;
529
530        let header101 = gen.next();
531        s.insert(header101.clone()).await.unwrap();
532
533        let error = match s.insert(header101).await {
534            Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
535            res => panic!("Invalid result: {res:?}"),
536        };
537
538        assert_eq!(
539            error,
540            BlockRangesError::BlockRangeOverlap(101..=101, 101..=101)
541        );
542    }
543
544    #[rstest]
545    #[case::in_memory(new_in_memory_store())]
546    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
547    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
548    #[self::test]
549    async fn test_overwrite_height<S: Store>(
550        #[case]
551        #[future(awt)]
552        s: S,
553    ) {
554        let mut s = s;
555        let gen = fill_store(&mut s, 100).await;
556
557        // Height 30 with different hash
558        let header29 = s.get_by_height(29).await.unwrap();
559        let header30 = gen.next_of(&header29);
560
561        let error = match s.insert(header30).await {
562            Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
563            res => panic!("Invalid result: {res:?}"),
564        };
565        assert_eq!(error, BlockRangesError::BlockRangeOverlap(30..=30, 30..=30));
566    }
567
568    #[rstest]
569    #[case::in_memory(new_in_memory_store())]
570    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
571    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
572    #[self::test]
573    async fn test_overwrite_hash<S: Store>(
574        #[case]
575        #[future(awt)]
576        s: S,
577    ) {
578        let mut s = s;
579        fill_store(&mut s, 100).await;
580
581        let mut dup_header = s.get_by_height(99).await.unwrap();
582        dup_header.header.height = Height::from(102u32);
583
584        assert!(matches!(
585            s.insert(dup_header).await,
586            Err(StoreError::InsertionFailed(
587                StoreInsertionError::HashExists(_)
588            ))
589        ));
590    }
591
592    #[rstest]
593    #[case::in_memory(new_in_memory_store())]
594    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
595    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
596    #[self::test]
597    async fn test_append_range<S: Store>(
598        #[case]
599        #[future(awt)]
600        s: S,
601    ) {
602        let mut s = s;
603        let mut gen = fill_store(&mut s, 10).await;
604
605        s.insert(gen.next_many_verified(4)).await.unwrap();
606        s.get_by_height(14).await.unwrap();
607    }
608
609    #[rstest]
610    #[case::in_memory(new_in_memory_store())]
611    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
612    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
613    #[self::test]
614    async fn test_fill_range_gap<S: Store>(
615        #[case]
616        #[future(awt)]
617        s: S,
618    ) {
619        let mut s = s;
620        let mut gen = fill_store(&mut s, 10).await;
621
622        // height 11
623        let skipped = gen.next();
624        // height 12
625        let upcoming_head = gen.next();
626
627        s.insert(upcoming_head).await.unwrap();
628        s.insert(skipped).await.unwrap();
629    }
630
631    #[rstest]
632    #[case::in_memory(new_in_memory_store())]
633    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
634    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
635    #[self::test]
636    async fn test_fill_range_gap_with_invalid_header<S: Store>(
637        #[case]
638        #[future(awt)]
639        s: S,
640    ) {
641        let mut s = s;
642        let mut gen = fill_store(&mut s, 10).await;
643
644        let mut gen_prime = gen.fork();
645        // height 11
646        let _skipped = gen.next();
647        let another_chain = gen_prime.next();
648        // height 12
649        let upcoming_head = gen.next();
650
651        s.insert(upcoming_head).await.unwrap();
652        assert!(matches!(
653            s.insert(another_chain).await,
654            Err(StoreError::InsertionFailed(
655                StoreInsertionError::NeighborsVerificationFailed(_)
656            ))
657        ));
658    }
659
660    #[rstest]
661    #[case::in_memory(new_in_memory_store())]
662    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
663    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
664    #[self::test]
665    async fn test_appends_with_gaps<S: Store>(
666        #[case]
667        #[future(awt)]
668        s: S,
669    ) {
670        let mut gen = ExtendedHeaderGenerator::new_from_height(5);
671        let header5 = gen.next();
672        gen.next_many(4);
673        let header10 = gen.next();
674        gen.next_many(4);
675        let header15 = gen.next();
676
677        s.insert(header5).await.unwrap();
678        s.insert(header15).await.unwrap();
679        s.insert(header10).await.unwrap_err();
680    }
681
682    #[rstest]
683    #[case::in_memory(new_in_memory_store())]
684    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
685    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
686    #[self::test]
687    async fn check_pruned_ranges<S: Store>(
688        #[case]
689        #[future(awt)]
690        s: S,
691    ) {
692        let store = s;
693        let headers = ExtendedHeaderGenerator::new().next_many(10);
694
695        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
696        let pruned_ranges = store.get_pruned_ranges().await.unwrap();
697        assert!(stored_ranges.is_empty());
698        assert!(pruned_ranges.is_empty());
699
700        store.insert(&headers[..]).await.unwrap();
701
702        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
703        let pruned_ranges = store.get_pruned_ranges().await.unwrap();
704        assert_eq!(stored_ranges, new_block_ranges([1..=10]));
705        assert!(pruned_ranges.is_empty());
706
707        store.remove_height(4).await.unwrap();
708        store.remove_height(9).await.unwrap();
709
710        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
711        let pruned_ranges = store.get_pruned_ranges().await.unwrap();
712        assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=8, 10..=10]));
713        assert_eq!(pruned_ranges, new_block_ranges([4..=4, 9..=9]));
714
715        // Put back height 9
716        store.insert(&headers[8]).await.unwrap();
717
718        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
719        let pruned_ranges = store.get_pruned_ranges().await.unwrap();
720        assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=10]));
721        assert_eq!(pruned_ranges, new_block_ranges([4..=4]));
722    }
723
724    #[rstest]
725    #[case::in_memory(new_in_memory_store())]
726    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
727    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
728    #[self::test]
729    async fn check_sampled_ranges<S: Store>(
730        #[case]
731        #[future(awt)]
732        s: S,
733    ) {
734        let store = s;
735        let headers = ExtendedHeaderGenerator::new().next_many(10);
736
737        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
738        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
739        assert!(stored_ranges.is_empty());
740        assert!(sampled_ranges.is_empty());
741
742        store.insert(&headers[..]).await.unwrap();
743
744        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
745        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
746        assert_eq!(stored_ranges, new_block_ranges([1..=10]));
747        assert!(sampled_ranges.is_empty());
748
749        store.mark_as_sampled(4).await.unwrap();
750        store.mark_as_sampled(9).await.unwrap();
751
752        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
753        assert_eq!(sampled_ranges, new_block_ranges([4..=4, 9..=9]));
754
755        // Remove header of sampled height
756        store.remove_height(4).await.unwrap();
757
758        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
759        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
760        assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=10]));
761        assert_eq!(sampled_ranges, new_block_ranges([9..=9]));
762
763        // We do not allow marking when header is missing
764        assert!(matches!(
765            store.mark_as_sampled(4).await,
766            Err(StoreError::NotFound)
767        ));
768    }
769
770    #[rstest]
771    #[case::in_memory(new_in_memory_store())]
772    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
773    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
774    #[self::test]
775    async fn test_sampling_height_empty_store<S: Store>(
776        #[case]
777        #[future(awt)]
778        store: S,
779    ) {
780        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
781        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
782
783        assert_eq!(stored_ranges.len(), 0);
784        assert_eq!(sampled_ranges.len(), 0);
785
786        assert!(matches!(
787            store.mark_as_sampled(0).await,
788            Err(StoreError::NotFound)
789        ));
790        assert!(matches!(
791            store.mark_as_sampled(1).await,
792            Err(StoreError::NotFound)
793        ));
794
795        assert!(matches!(
796            store.update_sampling_metadata(0, vec![]).await,
797            Err(StoreError::NotFound)
798        ));
799        assert!(matches!(
800            store.update_sampling_metadata(1, vec![]).await,
801            Err(StoreError::NotFound)
802        ));
803    }
804
805    #[rstest]
806    #[case::in_memory(new_in_memory_store())]
807    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
808    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
809    #[self::test]
810    async fn test_sampling_merge<S: Store>(
811        #[case]
812        #[future(awt)]
813        s: S,
814    ) {
815        let mut store = s;
816        fill_store(&mut store, 1).await;
817
818        let cid0 = "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ"
819            .parse()
820            .unwrap();
821        let cid1 = "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA"
822            .parse()
823            .unwrap();
824        let cid2 = "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq"
825            .parse()
826            .unwrap();
827
828        // Sampling metadata is not initialized
829        assert!(store.get_sampling_metadata(1).await.unwrap().is_none());
830
831        // Sampling metadata is initialized but empty
832        store.update_sampling_metadata(1, vec![]).await.unwrap();
833        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
834        assert_eq!(sampling_data.cids, vec![]);
835
836        store.update_sampling_metadata(1, vec![cid0]).await.unwrap();
837        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
838        assert_eq!(sampling_data.cids, vec![cid0]);
839
840        store.update_sampling_metadata(1, vec![cid1]).await.unwrap();
841        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
842        assert_eq!(sampling_data.cids, vec![cid0, cid1]);
843
844        store.update_sampling_metadata(1, vec![cid2]).await.unwrap();
845        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
846        assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
847
848        // Updating with empty new CIDs should not change saved CIDs
849        store.update_sampling_metadata(1, vec![]).await.unwrap();
850        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
851        assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
852
853        // Updating with an already existing CIDs should not change saved CIDs
854        store.update_sampling_metadata(1, vec![cid1]).await.unwrap();
855        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
856        assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
857
858        // Updating of sampling metadata should not mark height as sampled
859        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
860        assert!(!sampled_ranges.contains(1));
861    }
862
863    #[rstest]
864    #[case::in_memory(new_in_memory_store())]
865    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
866    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
867    #[self::test]
868    async fn test_sampled_cids<S: Store>(
869        #[case]
870        #[future(awt)]
871        s: S,
872    ) {
873        let mut store = s;
874        fill_store(&mut store, 5).await;
875
876        let cids: Vec<Cid> = [
877            "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq",
878            "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi",
879            "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ",
880            "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA",
881        ]
882        .iter()
883        .map(|s| s.parse().unwrap())
884        .collect();
885
886        store
887            .update_sampling_metadata(1, cids.clone())
888            .await
889            .unwrap();
890        store
891            .update_sampling_metadata(2, cids[0..1].to_vec())
892            .await
893            .unwrap();
894        store
895            .update_sampling_metadata(4, cids[3..].to_vec())
896            .await
897            .unwrap();
898        store.update_sampling_metadata(5, vec![]).await.unwrap();
899
900        let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
901        assert_eq!(sampling_data.cids, cids);
902
903        let sampling_data = store.get_sampling_metadata(2).await.unwrap().unwrap();
904        assert_eq!(sampling_data.cids, cids[0..1]);
905
906        assert!(store.get_sampling_metadata(3).await.unwrap().is_none());
907
908        let sampling_data = store.get_sampling_metadata(4).await.unwrap().unwrap();
909        assert_eq!(sampling_data.cids, cids[3..]);
910
911        let sampling_data = store.get_sampling_metadata(5).await.unwrap().unwrap();
912        assert_eq!(sampling_data.cids, vec![]);
913
914        assert!(matches!(
915            store.get_sampling_metadata(0).await,
916            Err(StoreError::NotFound)
917        ));
918        assert!(matches!(
919            store.get_sampling_metadata(6).await,
920            Err(StoreError::NotFound)
921        ));
922        assert!(matches!(
923            store.get_sampling_metadata(100).await,
924            Err(StoreError::NotFound)
925        ));
926    }
927
928    #[rstest]
929    #[case::in_memory(new_in_memory_store())]
930    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
931    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
932    #[self::test]
933    async fn test_empty_store_range<S: Store>(
934        #[case]
935        #[future(awt)]
936        s: S,
937    ) {
938        let store = s;
939
940        assert_eq!(
941            store.get_stored_header_ranges().await.unwrap().as_ref(),
942            &[]
943        );
944    }
945
946    #[rstest]
947    #[case::in_memory(new_in_memory_store())]
948    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
949    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
950    #[self::test]
951    async fn test_single_header_range<S: Store>(
952        #[case]
953        #[future(awt)]
954        s: S,
955    ) {
956        let store = s;
957        let mut gen = ExtendedHeaderGenerator::new();
958
959        gen.skip(19);
960
961        let prepend0 = gen.next();
962        let prepend1 = gen.next_many_verified(5);
963        store.insert(gen.next_many_verified(4)).await.unwrap();
964        store.insert(gen.next_many_verified(5)).await.unwrap();
965        store.insert(prepend1).await.unwrap();
966        store.insert(prepend0).await.unwrap();
967        store.insert(gen.next_many_verified(5)).await.unwrap();
968        store.insert(gen.next()).await.unwrap();
969
970        let final_ranges = store.get_stored_header_ranges().await.unwrap();
971        assert_eq!(final_ranges.as_ref(), &[20..=40]);
972    }
973
    // The tests below exercise stores that may hold multiple disjoint header ranges.
    // All store implementations, including the in-memory one, are covered by them.
976    #[rstest]
977    #[case::in_memory(new_in_memory_store())]
978    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
979    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
980    #[self::test]
981    async fn test_ranges_consolidation<S: Store>(
982        #[case]
983        #[future(awt)]
984        s: S,
985    ) {
986        let store = s;
987        let mut gen = ExtendedHeaderGenerator::new();
988
989        gen.skip(9);
990
991        let skip0 = gen.next_many_verified(5);
992        store.insert(gen.next_many_verified(2)).await.unwrap();
993        store.insert(gen.next_many_verified(3)).await.unwrap();
994
995        let skip1 = gen.next();
996        store.insert(gen.next()).await.unwrap();
997
998        let skip2 = gen.next_many_verified(5);
999
1000        store.insert(gen.next()).await.unwrap();
1001
1002        let skip3 = gen.next_many_verified(5);
1003        let skip4 = gen.next_many_verified(5);
1004        let skip5 = gen.next_many_verified(5);
1005
1006        store.insert(skip5).await.unwrap();
1007        store.insert(skip4).await.unwrap();
1008        store.insert(skip3).await.unwrap();
1009        store.insert(skip2).await.unwrap();
1010        store.insert(skip1).await.unwrap();
1011        store.insert(skip0).await.unwrap();
1012
1013        let final_ranges = store.get_stored_header_ranges().await.unwrap();
1014        assert_eq!(final_ranges.as_ref(), &[10..=42]);
1015    }
1016
1017    #[rstest]
1018    #[case::in_memory(new_in_memory_store())]
1019    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1020    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1021    #[self::test]
1022    async fn test_neighbour_validation<S: Store>(
1023        #[case]
1024        #[future(awt)]
1025        s: S,
1026    ) {
1027        let store = s;
1028        let mut gen = ExtendedHeaderGenerator::new();
1029
1030        store.insert(gen.next_many_verified(5)).await.unwrap();
1031        let mut fork = gen.fork();
1032        let _gap = gen.next();
1033        store.insert(gen.next_many_verified(4)).await.unwrap();
1034
1035        store.insert(fork.next()).await.unwrap_err();
1036    }
1037
1038    #[rstest]
1039    #[case::in_memory(new_in_memory_store())]
1040    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1041    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1042    #[self::test]
1043    async fn tail_removal_partial_range<S: Store>(
1044        #[case]
1045        #[future(awt)]
1046        s: S,
1047    ) {
1048        let store = s;
1049        let headers = ExtendedHeaderGenerator::new().next_many(128);
1050
1051        store.insert(&headers[0..64]).await.unwrap();
1052        store.insert(&headers[96..128]).await.unwrap();
1053        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1054
1055        store.remove_height(1).await.unwrap();
1056        assert_store(&store, &headers, new_block_ranges([2..=64, 97..=128])).await;
1057    }
1058
1059    #[rstest]
1060    #[case::in_memory(new_in_memory_store())]
1061    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1062    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1063    #[self::test]
1064    async fn tail_removal_full_range<S: Store>(
1065        #[case]
1066        #[future(awt)]
1067        s: S,
1068    ) {
1069        let store = s;
1070        let headers = ExtendedHeaderGenerator::new().next_many(128);
1071
1072        store.insert(&headers[0..1]).await.unwrap();
1073        store.insert(&headers[65..128]).await.unwrap();
1074        assert_store(&store, &headers, new_block_ranges([1..=1, 66..=128])).await;
1075
1076        store.remove_height(1).await.unwrap();
1077        assert_store(&store, &headers, new_block_ranges([66..=128])).await;
1078    }
1079
1080    #[rstest]
1081    #[case::in_memory(new_in_memory_store())]
1082    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1083    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1084    #[self::test]
1085    async fn tail_removal_remove_all<S: Store>(
1086        #[case]
1087        #[future(awt)]
1088        s: S,
1089    ) {
1090        let store = s;
1091        let headers = ExtendedHeaderGenerator::new().next_many(66);
1092
1093        store.insert(&headers[..]).await.unwrap();
1094        assert_store(&store, &headers, new_block_ranges([1..=66])).await;
1095
1096        for i in 1..=66 {
1097            store.remove_height(i).await.unwrap();
1098        }
1099
1100        let stored_ranges = store.get_stored_header_ranges().await.unwrap();
1101        assert!(stored_ranges.is_empty());
1102
1103        for h in 1..=66 {
1104            assert!(!store.has_at(h).await);
1105        }
1106    }
1107
1108    #[rstest]
1109    #[case::in_memory(new_in_memory_store())]
1110    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1111    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1112    #[self::test]
1113    async fn head_removal_partial_range<S: Store>(
1114        #[case]
1115        #[future(awt)]
1116        s: S,
1117    ) {
1118        let store = s;
1119        let headers = ExtendedHeaderGenerator::new().next_many(128);
1120
1121        store.insert(&headers[0..64]).await.unwrap();
1122        store.insert(&headers[96..128]).await.unwrap();
1123        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1124
1125        store.remove_height(128).await.unwrap();
1126        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=127])).await;
1127    }
1128
1129    #[rstest]
1130    #[case::in_memory(new_in_memory_store())]
1131    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1132    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1133    #[self::test]
1134    async fn head_removal_full_range<S: Store>(
1135        #[case]
1136        #[future(awt)]
1137        s: S,
1138    ) {
1139        let store = s;
1140        let headers = ExtendedHeaderGenerator::new().next_many(128);
1141
1142        store.insert(&headers[0..64]).await.unwrap();
1143        store.insert(&headers[127..128]).await.unwrap();
1144        assert_store(&store, &headers, new_block_ranges([1..=64, 128..=128])).await;
1145
1146        store.remove_height(128).await.unwrap();
1147        assert_store(&store, &headers, new_block_ranges([1..=64])).await;
1148    }
1149
1150    #[rstest]
1151    #[case::in_memory(new_in_memory_store())]
1152    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1153    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1154    #[self::test]
1155    async fn middle_removal<S: Store>(
1156        #[case]
1157        #[future(awt)]
1158        s: S,
1159    ) {
1160        let store = s;
1161        let headers = ExtendedHeaderGenerator::new().next_many(128);
1162
1163        store.insert(&headers[0..64]).await.unwrap();
1164        store.insert(&headers[96..128]).await.unwrap();
1165        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1166
1167        store.remove_height(62).await.unwrap();
1168        assert_store(
1169            &store,
1170            &headers,
1171            new_block_ranges([1..=61, 63..=64, 97..=128]),
1172        )
1173        .await;
1174
1175        store.remove_height(64).await.unwrap();
1176        assert_store(
1177            &store,
1178            &headers,
1179            new_block_ranges([1..=61, 63..=63, 97..=128]),
1180        )
1181        .await;
1182
1183        store.remove_height(63).await.unwrap();
1184        assert_store(&store, &headers, new_block_ranges([1..=61, 97..=128])).await;
1185    }
1186
1187    #[rstest]
1188    #[case::in_memory(new_in_memory_store())]
1189    #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1190    #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1191    #[self::test]
1192    async fn neighbor_removal<S: Store>(
1193        #[case]
1194        #[future(awt)]
1195        s: S,
1196    ) {
1197        let store = s;
1198        let headers = ExtendedHeaderGenerator::new().next_many(128);
1199
1200        store.insert(&headers[0..64]).await.unwrap();
1201        store.insert(&headers[96..128]).await.unwrap();
1202        assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1203
1204        store.remove_height(64).await.unwrap();
1205        assert_store(&store, &headers, new_block_ranges([1..=63, 97..=128])).await;
1206
1207        store.remove_height(97).await.unwrap();
1208        assert_store(&store, &headers, new_block_ranges([1..=63, 98..=128])).await;
1209    }
1210
1211    /// Fills an empty store
1212    async fn fill_store<S: Store>(store: &mut S, amount: u64) -> ExtendedHeaderGenerator {
1213        assert!(!store.has_at(1).await, "Store is not empty");
1214
1215        let mut gen = ExtendedHeaderGenerator::new();
1216
1217        store
1218            .insert(gen.next_many_verified(amount))
1219            .await
1220            .expect("inserting test data failed");
1221
1222        gen
1223    }
1224
    /// Builds a new [`InMemoryStore`] for the `in_memory` test cases.
    ///
    /// Declared `async` so that the returned future can be passed as an awaited
    /// (`#[future(awt)]`) `rstest` case value.
    async fn new_in_memory_store() -> InMemoryStore {
        InMemoryStore::new()
    }
1228
1229    pub(crate) async fn assert_store<S: Store>(
1230        store: &S,
1231        headers: &[ExtendedHeader],
1232        expected_ranges: BlockRanges,
1233    ) {
1234        assert_eq!(
1235            store.get_stored_header_ranges().await.unwrap(),
1236            expected_ranges
1237        );
1238        for header in headers {
1239            let height = header.height().value();
1240            if expected_ranges.contains(height) {
1241                assert_eq!(&store.get_by_height(height).await.unwrap(), header);
1242                assert_eq!(&store.get_by_hash(&header.hash()).await.unwrap(), header);
1243            } else {
1244                assert!(matches!(
1245                    store.get_by_height(height).await.unwrap_err(),
1246                    StoreError::NotFound
1247                ));
1248                assert!(matches!(
1249                    store.get_by_hash(&header.hash()).await.unwrap_err(),
1250                    StoreError::NotFound
1251                ));
1252            }
1253        }
1254    }
1255
    /// Builds a [`RedbStore`] with `RedbStore::in_memory` for the `redb` test cases.
    ///
    /// Only compiled for targets other than `wasm32`. Panics if creating
    /// the store fails.
    #[cfg(not(target_arch = "wasm32"))]
    async fn new_redb_store() -> RedbStore {
        RedbStore::in_memory().await.unwrap()
    }
1260
1261    #[cfg(target_arch = "wasm32")]
1262    async fn new_indexed_db_store() -> IndexedDbStore {
1263        let store_name = crate::test_utils::new_indexed_db_store_name().await;
1264
1265        IndexedDbStore::new(&store_name)
1266            .await
1267            .expect("creating test store failed")
1268    }
1269}