1use std::convert::Infallible;
4use std::fmt::{Debug, Display};
5use std::io::Cursor;
6use std::ops::{Bound, RangeBounds, RangeInclusive};
7
8use async_trait::async_trait;
9use celestia_types::hash::Hash;
10use celestia_types::ExtendedHeader;
11use cid::Cid;
12use prost::Message;
13use serde::{Deserialize, Serialize};
14use tendermint_proto::Protobuf;
15use thiserror::Error;
16#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
17use wasm_bindgen::prelude::*;
18
19pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError};
20pub use crate::store::either_store::EitherStore;
21pub use crate::store::utils::VerifiedExtendedHeaders;
22
23pub use in_memory_store::InMemoryStore;
24#[cfg(target_arch = "wasm32")]
25pub use indexed_db_store::IndexedDbStore;
26#[cfg(not(target_arch = "wasm32"))]
27pub use redb_store::RedbStore;
28
29mod either_store;
30mod in_memory_store;
31#[cfg(target_arch = "wasm32")]
32mod indexed_db_store;
33#[cfg(not(target_arch = "wasm32"))]
34mod redb_store;
35
36pub(crate) mod utils;
37
38#[derive(Debug, Default, Clone, Serialize, Deserialize)]
42#[cfg_attr(all(feature = "wasm-bindgen", target_arch = "wasm32"), wasm_bindgen)]
43pub struct SamplingMetadata {
44 #[cfg_attr(
47 all(feature = "wasm-bindgen", target_arch = "wasm32"),
48 wasm_bindgen(skip)
49 )]
50 pub cids: Vec<Cid>,
51}
52
53type Result<T, E = StoreError> = std::result::Result<T, E>;
54
55#[async_trait]
60pub trait Store: Send + Sync + Debug {
61 async fn get_head(&self) -> Result<ExtendedHeader>;
63
64 async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader>;
66
67 async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader>;
69
70 async fn wait_new_head(&self) -> u64;
72
73 async fn wait_height(&self, height: u64) -> Result<()>;
75
76 async fn get_range<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
87 where
88 R: RangeBounds<u64> + Send,
89 {
90 let head_height = self.head_height().await?;
91 let range = to_headers_range(range, head_height)?;
92
93 let amount = if range.is_empty() {
94 0
95 } else {
96 range.end() - range.start() + 1 };
98
99 let mut headers = Vec::with_capacity(amount.try_into().unwrap_or(usize::MAX));
100
101 for height in range {
102 let header = self.get_by_height(height).await?;
103 headers.push(header);
104 }
105
106 Ok(headers)
107 }
108
109 async fn head_height(&self) -> Result<u64>;
111
112 async fn has(&self, hash: &Hash) -> bool;
114
115 async fn has_at(&self, height: u64) -> bool;
117
118 async fn update_sampling_metadata(&self, height: u64, cids: Vec<Cid>) -> Result<()>;
123
124 async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>>;
131
132 async fn mark_as_sampled(&self, height: u64) -> Result<()>;
134
135 async fn insert<R>(&self, headers: R) -> Result<()>
140 where
141 R: TryInto<VerifiedExtendedHeaders> + Send,
142 <R as TryInto<VerifiedExtendedHeaders>>::Error: Display;
143
144 async fn get_stored_header_ranges(&self) -> Result<BlockRanges>;
146
147 async fn get_sampled_ranges(&self) -> Result<BlockRanges>;
149
150 async fn get_pruned_ranges(&self) -> Result<BlockRanges>;
152
153 async fn remove_height(&self, height: u64) -> Result<()>;
155
156 async fn close(self) -> Result<()>;
158}
159
160#[derive(Error, Debug)]
162pub enum StoreError {
163 #[error("Header not found in store")]
165 NotFound,
166
167 #[error("Insertion failed: {0}")]
169 InsertionFailed(#[from] StoreInsertionError),
170
171 #[error("Stored data are inconsistent or invalid, try reseting the store: {0}")]
173 StoredDataError(String),
174
175 #[error("Database reported unrecoverable error: {0}")]
177 FatalDatabaseError(String),
178
179 #[error("Received error from executor: {0}")]
181 ExecutorError(String),
182
183 #[error("Error opening store: {0}")]
185 OpenFailed(String),
186}
187
188#[derive(Error, Debug)]
190pub enum StoreInsertionError {
191 #[error("Provided headers failed verification: {0}")]
193 HeadersVerificationFailed(String),
194
195 #[error("Provided headers failed to be verified with existing neighbors: {0}")]
197 NeighborsVerificationFailed(String),
198
199 #[error("Contraints not met: {0}")]
201 ContraintsNotMet(BlockRangesError),
202
203 #[error("Hash {0} already exists in store")]
208 HashExists(Hash),
209}
210
211impl StoreError {
212 pub(crate) fn is_fatal(&self) -> bool {
214 match self {
215 StoreError::StoredDataError(_)
216 | StoreError::FatalDatabaseError(_)
217 | StoreError::ExecutorError(_)
218 | StoreError::OpenFailed(_) => true,
219 StoreError::NotFound | StoreError::InsertionFailed(_) => false,
220 }
221 }
222}
223
224#[cfg(not(target_arch = "wasm32"))]
225impl From<tokio::task::JoinError> for StoreError {
226 fn from(error: tokio::task::JoinError) -> StoreError {
227 StoreError::ExecutorError(error.to_string())
228 }
229}
230
231impl From<Infallible> for StoreError {
233 fn from(_: Infallible) -> Self {
234 unreachable!("Infallible failed")
236 }
237}
238
#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
#[wasm_bindgen]
impl SamplingMetadata {
    /// Returns the stored CIDs, each encoded to bytes and copied into a `Uint8Array`.
    #[wasm_bindgen(getter)]
    pub fn cids(&self) -> Vec<js_sys::Uint8Array> {
        let mut arrays = Vec::with_capacity(self.cids.len());

        for cid in &self.cids {
            let bytes = cid.to_bytes();
            arrays.push(js_sys::Uint8Array::from(bytes.as_slice()));
        }

        arrays
    }
}
251
252#[derive(Message)]
253struct RawSamplingMetadata {
254 #[prost(message, repeated, tag = "2")]
257 cids: Vec<Vec<u8>>,
258}
259
260impl Protobuf<RawSamplingMetadata> for SamplingMetadata {}
261
262impl TryFrom<RawSamplingMetadata> for SamplingMetadata {
263 type Error = cid::Error;
264
265 fn try_from(item: RawSamplingMetadata) -> Result<Self, Self::Error> {
266 let cids = item
267 .cids
268 .iter()
269 .map(|cid| {
270 let buffer = Cursor::new(cid);
271 Cid::read_bytes(buffer)
272 })
273 .collect::<Result<_, _>>()?;
274
275 Ok(SamplingMetadata { cids })
276 }
277}
278
279impl From<SamplingMetadata> for RawSamplingMetadata {
280 fn from(item: SamplingMetadata) -> Self {
281 let cids = item.cids.iter().map(|cid| cid.to_bytes()).collect();
282
283 RawSamplingMetadata { cids }
284 }
285}
286
287fn to_headers_range(bounds: impl RangeBounds<u64>, last_index: u64) -> Result<RangeInclusive<u64>> {
289 let start = match bounds.start_bound() {
290 Bound::Unbounded => 1,
292 Bound::Included(&x) if x > last_index || x == 0 => return Err(StoreError::NotFound),
294 Bound::Excluded(&x) if x >= last_index => return Err(StoreError::NotFound),
295 Bound::Included(&x) => x,
297 Bound::Excluded(&x) => x + 1, };
299 let end = match bounds.end_bound() {
300 Bound::Unbounded => last_index,
302 Bound::Included(&x) if x > last_index => return Err(StoreError::NotFound),
304 Bound::Excluded(&x) if x > last_index + 1 => return Err(StoreError::NotFound),
305 Bound::Excluded(&0) => 0,
307 Bound::Included(&x) => x,
309 Bound::Excluded(&x) => x - 1,
310 };
311
312 Ok(start..=end)
313}
314
315#[cfg(test)]
316mod tests {
317 use super::*;
318 use crate::test_utils::ExtendedHeaderGeneratorExt;
319 use celestia_types::test_utils::ExtendedHeaderGenerator;
320 use celestia_types::Height;
321 use rstest::rstest;
322 use lumina_utils::test_utils::async_test as test;
325
326 use crate::test_utils::new_block_ranges;
327
328 #[test]
329 async fn converts_bounded_ranges() {
330 assert_eq!(1..=15, to_headers_range(1..16, 100).unwrap());
331 assert_eq!(1..=15, to_headers_range(1..=15, 100).unwrap());
332 assert_eq!(300..=400, to_headers_range(300..401, 500).unwrap());
333 assert_eq!(300..=400, to_headers_range(300..=400, 500).unwrap());
334 }
335
336 #[test]
337 async fn starts_from_one_when_unbounded_start() {
338 assert_eq!(&1, to_headers_range(..=10, 100).unwrap().start());
339 assert_eq!(&1, to_headers_range(..10, 100).unwrap().start());
340 assert_eq!(&1, to_headers_range(.., 100).unwrap().start());
341 }
342
343 #[test]
344 async fn ends_on_last_index_when_unbounded_end() {
345 assert_eq!(&10, to_headers_range(1.., 10).unwrap().end());
346 assert_eq!(&11, to_headers_range(1.., 11).unwrap().end());
347 assert_eq!(&10, to_headers_range(.., 10).unwrap().end());
348 }
349
350 #[test]
351 async fn handle_ranges_ending_precisely_at_last_index() {
352 let last_index = 10;
353
354 let bounds_ending_at_last_index = [
355 (Bound::Unbounded, Bound::Included(last_index)),
356 (Bound::Unbounded, Bound::Excluded(last_index + 1)),
357 ];
358
359 for bound in bounds_ending_at_last_index {
360 let range = to_headers_range(bound, last_index).unwrap();
361 assert_eq!(*range.end(), last_index);
362 }
363 }
364
365 #[test]
366 async fn handle_ranges_ending_after_last_index() {
367 let last_index = 10;
368
369 let bounds_ending_after_last_index = [
370 (Bound::Unbounded, Bound::Included(last_index + 1)),
371 (Bound::Unbounded, Bound::Excluded(last_index + 2)),
372 ];
373
374 for bound in bounds_ending_after_last_index {
375 to_headers_range(bound, last_index).unwrap_err();
376 }
377 }
378
379 #[test]
380 async fn errors_if_zero_heigth_is_included() {
381 let includes_zero_height = 0..5;
382 to_headers_range(includes_zero_height, 10).unwrap_err();
383 }
384
385 #[test]
386 async fn handle_ranges_starting_precisely_at_last_index() {
387 let last_index = 10;
388
389 let bounds_starting_at_last_index = [
390 (Bound::Included(last_index), Bound::Unbounded),
391 (Bound::Excluded(last_index - 1), Bound::Unbounded),
392 ];
393
394 for bound in bounds_starting_at_last_index {
395 let range = to_headers_range(bound, last_index).unwrap();
396 assert_eq!(*range.start(), last_index);
397 }
398 }
399
400 #[test]
401 async fn handle_ranges_starting_after_last_index() {
402 let last_index = 10;
403
404 let bounds_starting_after_last_index = [
405 (Bound::Included(last_index + 1), Bound::Unbounded),
406 (Bound::Excluded(last_index), Bound::Unbounded),
407 ];
408
409 for bound in bounds_starting_after_last_index {
410 to_headers_range(bound, last_index).unwrap_err();
411 }
412 }
413
414 #[test]
415 async fn handle_ranges_that_lead_to_empty_ranges() {
416 let last_index = 10;
417
418 let bounds_leading_to_empty_range = [
419 (Bound::Unbounded, Bound::Excluded(0)),
420 (Bound::Included(3), Bound::Excluded(3)),
421 (Bound::Included(3), Bound::Included(2)),
422 (Bound::Excluded(2), Bound::Included(2)),
423 ];
424
425 for bound in bounds_leading_to_empty_range {
426 assert!(to_headers_range(bound, last_index).unwrap().is_empty());
427 }
428 }
429
430 #[rstest]
431 #[case::in_memory(new_in_memory_store())]
432 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
433 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
434 #[self::test]
435 async fn test_contains_height<S: Store>(
436 #[case]
437 #[future(awt)]
438 s: S,
439 ) {
440 let mut s = s;
441 fill_store(&mut s, 2).await;
442
443 assert!(!s.has_at(0).await);
444 assert!(s.has_at(1).await);
445 assert!(s.has_at(2).await);
446 assert!(!s.has_at(3).await);
447 }
448
449 #[rstest]
450 #[case::in_memory(new_in_memory_store())]
451 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
452 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
453 #[self::test]
454 async fn test_empty_store<S: Store>(
455 #[case]
456 #[future(awt)]
457 s: S,
458 ) {
459 assert!(matches!(s.head_height().await, Err(StoreError::NotFound)));
460 assert!(matches!(s.get_head().await, Err(StoreError::NotFound)));
461 assert!(matches!(
462 s.get_by_height(1).await,
463 Err(StoreError::NotFound)
464 ));
465 assert!(matches!(
466 s.get_by_hash(&Hash::Sha256([0; 32])).await,
467 Err(StoreError::NotFound)
468 ));
469 }
470
471 #[rstest]
472 #[case::in_memory(new_in_memory_store())]
473 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
474 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
475 #[self::test]
476 async fn test_read_write<S: Store>(
477 #[case]
478 #[future(awt)]
479 s: S,
480 ) {
481 let mut gen = ExtendedHeaderGenerator::new();
482
483 let header = gen.next();
484
485 s.insert(header.clone()).await.unwrap();
486 assert_eq!(s.head_height().await.unwrap(), 1);
487 assert_eq!(s.get_head().await.unwrap(), header);
488 assert_eq!(s.get_by_height(1).await.unwrap(), header);
489 assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
490 }
491
492 #[rstest]
493 #[case::in_memory(new_in_memory_store())]
494 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
495 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
496 #[self::test]
497 async fn test_pregenerated_data<S: Store>(
498 #[case]
499 #[future(awt)]
500 s: S,
501 ) {
502 let mut s = s;
503 fill_store(&mut s, 100).await;
504
505 assert_eq!(s.head_height().await.unwrap(), 100);
506 let head = s.get_head().await.unwrap();
507 assert_eq!(s.get_by_height(100).await.unwrap(), head);
508 assert!(matches!(
509 s.get_by_height(101).await,
510 Err(StoreError::NotFound)
511 ));
512
513 let header = s.get_by_height(54).await.unwrap();
514 assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
515 }
516
517 #[rstest]
518 #[case::in_memory(new_in_memory_store())]
519 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
520 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
521 #[self::test]
522 async fn test_duplicate_insert<S: Store>(
523 #[case]
524 #[future(awt)]
525 s: S,
526 ) {
527 let mut s = s;
528 let mut gen = fill_store(&mut s, 100).await;
529
530 let header101 = gen.next();
531 s.insert(header101.clone()).await.unwrap();
532
533 let error = match s.insert(header101).await {
534 Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
535 res => panic!("Invalid result: {res:?}"),
536 };
537
538 assert_eq!(
539 error,
540 BlockRangesError::BlockRangeOverlap(101..=101, 101..=101)
541 );
542 }
543
544 #[rstest]
545 #[case::in_memory(new_in_memory_store())]
546 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
547 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
548 #[self::test]
549 async fn test_overwrite_height<S: Store>(
550 #[case]
551 #[future(awt)]
552 s: S,
553 ) {
554 let mut s = s;
555 let gen = fill_store(&mut s, 100).await;
556
557 let header29 = s.get_by_height(29).await.unwrap();
559 let header30 = gen.next_of(&header29);
560
561 let error = match s.insert(header30).await {
562 Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
563 res => panic!("Invalid result: {res:?}"),
564 };
565 assert_eq!(error, BlockRangesError::BlockRangeOverlap(30..=30, 30..=30));
566 }
567
568 #[rstest]
569 #[case::in_memory(new_in_memory_store())]
570 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
571 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
572 #[self::test]
573 async fn test_overwrite_hash<S: Store>(
574 #[case]
575 #[future(awt)]
576 s: S,
577 ) {
578 let mut s = s;
579 fill_store(&mut s, 100).await;
580
581 let mut dup_header = s.get_by_height(99).await.unwrap();
582 dup_header.header.height = Height::from(102u32);
583
584 assert!(matches!(
585 s.insert(dup_header).await,
586 Err(StoreError::InsertionFailed(
587 StoreInsertionError::HashExists(_)
588 ))
589 ));
590 }
591
592 #[rstest]
593 #[case::in_memory(new_in_memory_store())]
594 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
595 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
596 #[self::test]
597 async fn test_append_range<S: Store>(
598 #[case]
599 #[future(awt)]
600 s: S,
601 ) {
602 let mut s = s;
603 let mut gen = fill_store(&mut s, 10).await;
604
605 s.insert(gen.next_many_verified(4)).await.unwrap();
606 s.get_by_height(14).await.unwrap();
607 }
608
609 #[rstest]
610 #[case::in_memory(new_in_memory_store())]
611 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
612 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
613 #[self::test]
614 async fn test_fill_range_gap<S: Store>(
615 #[case]
616 #[future(awt)]
617 s: S,
618 ) {
619 let mut s = s;
620 let mut gen = fill_store(&mut s, 10).await;
621
622 let skipped = gen.next();
624 let upcoming_head = gen.next();
626
627 s.insert(upcoming_head).await.unwrap();
628 s.insert(skipped).await.unwrap();
629 }
630
631 #[rstest]
632 #[case::in_memory(new_in_memory_store())]
633 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
634 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
635 #[self::test]
636 async fn test_fill_range_gap_with_invalid_header<S: Store>(
637 #[case]
638 #[future(awt)]
639 s: S,
640 ) {
641 let mut s = s;
642 let mut gen = fill_store(&mut s, 10).await;
643
644 let mut gen_prime = gen.fork();
645 let _skipped = gen.next();
647 let another_chain = gen_prime.next();
648 let upcoming_head = gen.next();
650
651 s.insert(upcoming_head).await.unwrap();
652 assert!(matches!(
653 s.insert(another_chain).await,
654 Err(StoreError::InsertionFailed(
655 StoreInsertionError::NeighborsVerificationFailed(_)
656 ))
657 ));
658 }
659
660 #[rstest]
661 #[case::in_memory(new_in_memory_store())]
662 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
663 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
664 #[self::test]
665 async fn test_appends_with_gaps<S: Store>(
666 #[case]
667 #[future(awt)]
668 s: S,
669 ) {
670 let mut gen = ExtendedHeaderGenerator::new_from_height(5);
671 let header5 = gen.next();
672 gen.next_many(4);
673 let header10 = gen.next();
674 gen.next_many(4);
675 let header15 = gen.next();
676
677 s.insert(header5).await.unwrap();
678 s.insert(header15).await.unwrap();
679 s.insert(header10).await.unwrap_err();
680 }
681
682 #[rstest]
683 #[case::in_memory(new_in_memory_store())]
684 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
685 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
686 #[self::test]
687 async fn check_pruned_ranges<S: Store>(
688 #[case]
689 #[future(awt)]
690 s: S,
691 ) {
692 let store = s;
693 let headers = ExtendedHeaderGenerator::new().next_many(10);
694
695 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
696 let pruned_ranges = store.get_pruned_ranges().await.unwrap();
697 assert!(stored_ranges.is_empty());
698 assert!(pruned_ranges.is_empty());
699
700 store.insert(&headers[..]).await.unwrap();
701
702 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
703 let pruned_ranges = store.get_pruned_ranges().await.unwrap();
704 assert_eq!(stored_ranges, new_block_ranges([1..=10]));
705 assert!(pruned_ranges.is_empty());
706
707 store.remove_height(4).await.unwrap();
708 store.remove_height(9).await.unwrap();
709
710 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
711 let pruned_ranges = store.get_pruned_ranges().await.unwrap();
712 assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=8, 10..=10]));
713 assert_eq!(pruned_ranges, new_block_ranges([4..=4, 9..=9]));
714
715 store.insert(&headers[8]).await.unwrap();
717
718 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
719 let pruned_ranges = store.get_pruned_ranges().await.unwrap();
720 assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=10]));
721 assert_eq!(pruned_ranges, new_block_ranges([4..=4]));
722 }
723
724 #[rstest]
725 #[case::in_memory(new_in_memory_store())]
726 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
727 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
728 #[self::test]
729 async fn check_sampled_ranges<S: Store>(
730 #[case]
731 #[future(awt)]
732 s: S,
733 ) {
734 let store = s;
735 let headers = ExtendedHeaderGenerator::new().next_many(10);
736
737 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
738 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
739 assert!(stored_ranges.is_empty());
740 assert!(sampled_ranges.is_empty());
741
742 store.insert(&headers[..]).await.unwrap();
743
744 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
745 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
746 assert_eq!(stored_ranges, new_block_ranges([1..=10]));
747 assert!(sampled_ranges.is_empty());
748
749 store.mark_as_sampled(4).await.unwrap();
750 store.mark_as_sampled(9).await.unwrap();
751
752 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
753 assert_eq!(sampled_ranges, new_block_ranges([4..=4, 9..=9]));
754
755 store.remove_height(4).await.unwrap();
757
758 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
759 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
760 assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=10]));
761 assert_eq!(sampled_ranges, new_block_ranges([9..=9]));
762
763 assert!(matches!(
765 store.mark_as_sampled(4).await,
766 Err(StoreError::NotFound)
767 ));
768 }
769
770 #[rstest]
771 #[case::in_memory(new_in_memory_store())]
772 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
773 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
774 #[self::test]
775 async fn test_sampling_height_empty_store<S: Store>(
776 #[case]
777 #[future(awt)]
778 store: S,
779 ) {
780 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
781 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
782
783 assert_eq!(stored_ranges.len(), 0);
784 assert_eq!(sampled_ranges.len(), 0);
785
786 assert!(matches!(
787 store.mark_as_sampled(0).await,
788 Err(StoreError::NotFound)
789 ));
790 assert!(matches!(
791 store.mark_as_sampled(1).await,
792 Err(StoreError::NotFound)
793 ));
794
795 assert!(matches!(
796 store.update_sampling_metadata(0, vec![]).await,
797 Err(StoreError::NotFound)
798 ));
799 assert!(matches!(
800 store.update_sampling_metadata(1, vec![]).await,
801 Err(StoreError::NotFound)
802 ));
803 }
804
805 #[rstest]
806 #[case::in_memory(new_in_memory_store())]
807 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
808 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
809 #[self::test]
810 async fn test_sampling_merge<S: Store>(
811 #[case]
812 #[future(awt)]
813 s: S,
814 ) {
815 let mut store = s;
816 fill_store(&mut store, 1).await;
817
818 let cid0 = "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ"
819 .parse()
820 .unwrap();
821 let cid1 = "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA"
822 .parse()
823 .unwrap();
824 let cid2 = "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq"
825 .parse()
826 .unwrap();
827
828 assert!(store.get_sampling_metadata(1).await.unwrap().is_none());
830
831 store.update_sampling_metadata(1, vec![]).await.unwrap();
833 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
834 assert_eq!(sampling_data.cids, vec![]);
835
836 store.update_sampling_metadata(1, vec![cid0]).await.unwrap();
837 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
838 assert_eq!(sampling_data.cids, vec![cid0]);
839
840 store.update_sampling_metadata(1, vec![cid1]).await.unwrap();
841 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
842 assert_eq!(sampling_data.cids, vec![cid0, cid1]);
843
844 store.update_sampling_metadata(1, vec![cid2]).await.unwrap();
845 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
846 assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
847
848 store.update_sampling_metadata(1, vec![]).await.unwrap();
850 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
851 assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
852
853 store.update_sampling_metadata(1, vec![cid1]).await.unwrap();
855 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
856 assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
857
858 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
860 assert!(!sampled_ranges.contains(1));
861 }
862
863 #[rstest]
864 #[case::in_memory(new_in_memory_store())]
865 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
866 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
867 #[self::test]
868 async fn test_sampled_cids<S: Store>(
869 #[case]
870 #[future(awt)]
871 s: S,
872 ) {
873 let mut store = s;
874 fill_store(&mut store, 5).await;
875
876 let cids: Vec<Cid> = [
877 "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq",
878 "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi",
879 "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ",
880 "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA",
881 ]
882 .iter()
883 .map(|s| s.parse().unwrap())
884 .collect();
885
886 store
887 .update_sampling_metadata(1, cids.clone())
888 .await
889 .unwrap();
890 store
891 .update_sampling_metadata(2, cids[0..1].to_vec())
892 .await
893 .unwrap();
894 store
895 .update_sampling_metadata(4, cids[3..].to_vec())
896 .await
897 .unwrap();
898 store.update_sampling_metadata(5, vec![]).await.unwrap();
899
900 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
901 assert_eq!(sampling_data.cids, cids);
902
903 let sampling_data = store.get_sampling_metadata(2).await.unwrap().unwrap();
904 assert_eq!(sampling_data.cids, cids[0..1]);
905
906 assert!(store.get_sampling_metadata(3).await.unwrap().is_none());
907
908 let sampling_data = store.get_sampling_metadata(4).await.unwrap().unwrap();
909 assert_eq!(sampling_data.cids, cids[3..]);
910
911 let sampling_data = store.get_sampling_metadata(5).await.unwrap().unwrap();
912 assert_eq!(sampling_data.cids, vec![]);
913
914 assert!(matches!(
915 store.get_sampling_metadata(0).await,
916 Err(StoreError::NotFound)
917 ));
918 assert!(matches!(
919 store.get_sampling_metadata(6).await,
920 Err(StoreError::NotFound)
921 ));
922 assert!(matches!(
923 store.get_sampling_metadata(100).await,
924 Err(StoreError::NotFound)
925 ));
926 }
927
928 #[rstest]
929 #[case::in_memory(new_in_memory_store())]
930 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
931 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
932 #[self::test]
933 async fn test_empty_store_range<S: Store>(
934 #[case]
935 #[future(awt)]
936 s: S,
937 ) {
938 let store = s;
939
940 assert_eq!(
941 store.get_stored_header_ranges().await.unwrap().as_ref(),
942 &[]
943 );
944 }
945
946 #[rstest]
947 #[case::in_memory(new_in_memory_store())]
948 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
949 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
950 #[self::test]
951 async fn test_single_header_range<S: Store>(
952 #[case]
953 #[future(awt)]
954 s: S,
955 ) {
956 let store = s;
957 let mut gen = ExtendedHeaderGenerator::new();
958
959 gen.skip(19);
960
961 let prepend0 = gen.next();
962 let prepend1 = gen.next_many_verified(5);
963 store.insert(gen.next_many_verified(4)).await.unwrap();
964 store.insert(gen.next_many_verified(5)).await.unwrap();
965 store.insert(prepend1).await.unwrap();
966 store.insert(prepend0).await.unwrap();
967 store.insert(gen.next_many_verified(5)).await.unwrap();
968 store.insert(gen.next()).await.unwrap();
969
970 let final_ranges = store.get_stored_header_ranges().await.unwrap();
971 assert_eq!(final_ranges.as_ref(), &[20..=40]);
972 }
973
974 #[rstest]
977 #[case::in_memory(new_in_memory_store())]
978 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
979 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
980 #[self::test]
981 async fn test_ranges_consolidation<S: Store>(
982 #[case]
983 #[future(awt)]
984 s: S,
985 ) {
986 let store = s;
987 let mut gen = ExtendedHeaderGenerator::new();
988
989 gen.skip(9);
990
991 let skip0 = gen.next_many_verified(5);
992 store.insert(gen.next_many_verified(2)).await.unwrap();
993 store.insert(gen.next_many_verified(3)).await.unwrap();
994
995 let skip1 = gen.next();
996 store.insert(gen.next()).await.unwrap();
997
998 let skip2 = gen.next_many_verified(5);
999
1000 store.insert(gen.next()).await.unwrap();
1001
1002 let skip3 = gen.next_many_verified(5);
1003 let skip4 = gen.next_many_verified(5);
1004 let skip5 = gen.next_many_verified(5);
1005
1006 store.insert(skip5).await.unwrap();
1007 store.insert(skip4).await.unwrap();
1008 store.insert(skip3).await.unwrap();
1009 store.insert(skip2).await.unwrap();
1010 store.insert(skip1).await.unwrap();
1011 store.insert(skip0).await.unwrap();
1012
1013 let final_ranges = store.get_stored_header_ranges().await.unwrap();
1014 assert_eq!(final_ranges.as_ref(), &[10..=42]);
1015 }
1016
1017 #[rstest]
1018 #[case::in_memory(new_in_memory_store())]
1019 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1020 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1021 #[self::test]
1022 async fn test_neighbour_validation<S: Store>(
1023 #[case]
1024 #[future(awt)]
1025 s: S,
1026 ) {
1027 let store = s;
1028 let mut gen = ExtendedHeaderGenerator::new();
1029
1030 store.insert(gen.next_many_verified(5)).await.unwrap();
1031 let mut fork = gen.fork();
1032 let _gap = gen.next();
1033 store.insert(gen.next_many_verified(4)).await.unwrap();
1034
1035 store.insert(fork.next()).await.unwrap_err();
1036 }
1037
1038 #[rstest]
1039 #[case::in_memory(new_in_memory_store())]
1040 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1041 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1042 #[self::test]
1043 async fn tail_removal_partial_range<S: Store>(
1044 #[case]
1045 #[future(awt)]
1046 s: S,
1047 ) {
1048 let store = s;
1049 let headers = ExtendedHeaderGenerator::new().next_many(128);
1050
1051 store.insert(&headers[0..64]).await.unwrap();
1052 store.insert(&headers[96..128]).await.unwrap();
1053 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1054
1055 store.remove_height(1).await.unwrap();
1056 assert_store(&store, &headers, new_block_ranges([2..=64, 97..=128])).await;
1057 }
1058
1059 #[rstest]
1060 #[case::in_memory(new_in_memory_store())]
1061 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1062 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1063 #[self::test]
1064 async fn tail_removal_full_range<S: Store>(
1065 #[case]
1066 #[future(awt)]
1067 s: S,
1068 ) {
1069 let store = s;
1070 let headers = ExtendedHeaderGenerator::new().next_many(128);
1071
1072 store.insert(&headers[0..1]).await.unwrap();
1073 store.insert(&headers[65..128]).await.unwrap();
1074 assert_store(&store, &headers, new_block_ranges([1..=1, 66..=128])).await;
1075
1076 store.remove_height(1).await.unwrap();
1077 assert_store(&store, &headers, new_block_ranges([66..=128])).await;
1078 }
1079
1080 #[rstest]
1081 #[case::in_memory(new_in_memory_store())]
1082 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1083 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1084 #[self::test]
1085 async fn tail_removal_remove_all<S: Store>(
1086 #[case]
1087 #[future(awt)]
1088 s: S,
1089 ) {
1090 let store = s;
1091 let headers = ExtendedHeaderGenerator::new().next_many(66);
1092
1093 store.insert(&headers[..]).await.unwrap();
1094 assert_store(&store, &headers, new_block_ranges([1..=66])).await;
1095
1096 for i in 1..=66 {
1097 store.remove_height(i).await.unwrap();
1098 }
1099
1100 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
1101 assert!(stored_ranges.is_empty());
1102
1103 for h in 1..=66 {
1104 assert!(!store.has_at(h).await);
1105 }
1106 }
1107
1108 #[rstest]
1109 #[case::in_memory(new_in_memory_store())]
1110 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1111 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1112 #[self::test]
1113 async fn head_removal_partial_range<S: Store>(
1114 #[case]
1115 #[future(awt)]
1116 s: S,
1117 ) {
1118 let store = s;
1119 let headers = ExtendedHeaderGenerator::new().next_many(128);
1120
1121 store.insert(&headers[0..64]).await.unwrap();
1122 store.insert(&headers[96..128]).await.unwrap();
1123 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1124
1125 store.remove_height(128).await.unwrap();
1126 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=127])).await;
1127 }
1128
1129 #[rstest]
1130 #[case::in_memory(new_in_memory_store())]
1131 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1132 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1133 #[self::test]
1134 async fn head_removal_full_range<S: Store>(
1135 #[case]
1136 #[future(awt)]
1137 s: S,
1138 ) {
1139 let store = s;
1140 let headers = ExtendedHeaderGenerator::new().next_many(128);
1141
1142 store.insert(&headers[0..64]).await.unwrap();
1143 store.insert(&headers[127..128]).await.unwrap();
1144 assert_store(&store, &headers, new_block_ranges([1..=64, 128..=128])).await;
1145
1146 store.remove_height(128).await.unwrap();
1147 assert_store(&store, &headers, new_block_ranges([1..=64])).await;
1148 }
1149
1150 #[rstest]
1151 #[case::in_memory(new_in_memory_store())]
1152 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1153 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1154 #[self::test]
1155 async fn middle_removal<S: Store>(
1156 #[case]
1157 #[future(awt)]
1158 s: S,
1159 ) {
1160 let store = s;
1161 let headers = ExtendedHeaderGenerator::new().next_many(128);
1162
1163 store.insert(&headers[0..64]).await.unwrap();
1164 store.insert(&headers[96..128]).await.unwrap();
1165 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1166
1167 store.remove_height(62).await.unwrap();
1168 assert_store(
1169 &store,
1170 &headers,
1171 new_block_ranges([1..=61, 63..=64, 97..=128]),
1172 )
1173 .await;
1174
1175 store.remove_height(64).await.unwrap();
1176 assert_store(
1177 &store,
1178 &headers,
1179 new_block_ranges([1..=61, 63..=63, 97..=128]),
1180 )
1181 .await;
1182
1183 store.remove_height(63).await.unwrap();
1184 assert_store(&store, &headers, new_block_ranges([1..=61, 97..=128])).await;
1185 }
1186
1187 #[rstest]
1188 #[case::in_memory(new_in_memory_store())]
1189 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1190 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1191 #[self::test]
1192 async fn neighbor_removal<S: Store>(
1193 #[case]
1194 #[future(awt)]
1195 s: S,
1196 ) {
1197 let store = s;
1198 let headers = ExtendedHeaderGenerator::new().next_many(128);
1199
1200 store.insert(&headers[0..64]).await.unwrap();
1201 store.insert(&headers[96..128]).await.unwrap();
1202 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1203
1204 store.remove_height(64).await.unwrap();
1205 assert_store(&store, &headers, new_block_ranges([1..=63, 97..=128])).await;
1206
1207 store.remove_height(97).await.unwrap();
1208 assert_store(&store, &headers, new_block_ranges([1..=63, 98..=128])).await;
1209 }
1210
1211 async fn fill_store<S: Store>(store: &mut S, amount: u64) -> ExtendedHeaderGenerator {
1213 assert!(!store.has_at(1).await, "Store is not empty");
1214
1215 let mut gen = ExtendedHeaderGenerator::new();
1216
1217 store
1218 .insert(gen.next_many_verified(amount))
1219 .await
1220 .expect("inserting test data failed");
1221
1222 gen
1223 }
1224
1225 async fn new_in_memory_store() -> InMemoryStore {
1226 InMemoryStore::new()
1227 }
1228
1229 pub(crate) async fn assert_store<S: Store>(
1230 store: &S,
1231 headers: &[ExtendedHeader],
1232 expected_ranges: BlockRanges,
1233 ) {
1234 assert_eq!(
1235 store.get_stored_header_ranges().await.unwrap(),
1236 expected_ranges
1237 );
1238 for header in headers {
1239 let height = header.height().value();
1240 if expected_ranges.contains(height) {
1241 assert_eq!(&store.get_by_height(height).await.unwrap(), header);
1242 assert_eq!(&store.get_by_hash(&header.hash()).await.unwrap(), header);
1243 } else {
1244 assert!(matches!(
1245 store.get_by_height(height).await.unwrap_err(),
1246 StoreError::NotFound
1247 ));
1248 assert!(matches!(
1249 store.get_by_hash(&header.hash()).await.unwrap_err(),
1250 StoreError::NotFound
1251 ));
1252 }
1253 }
1254 }
1255
1256 #[cfg(not(target_arch = "wasm32"))]
1257 async fn new_redb_store() -> RedbStore {
1258 RedbStore::in_memory().await.unwrap()
1259 }
1260
/// Creates an [`IndexedDbStore`] for the parameterised store tests (wasm
/// targets only), using a name obtained from
/// `crate::test_utils::new_indexed_db_store_name`.
///
/// # Panics
///
/// Panics if the store cannot be created.
#[cfg(target_arch = "wasm32")]
async fn new_indexed_db_store() -> IndexedDbStore {
    let name = crate::test_utils::new_indexed_db_store_name().await;
    let opened = IndexedDbStore::new(&name).await;

    opened.expect("creating test store failed")
}
1269}