1use std::convert::Infallible;
4use std::fmt::{Debug, Display};
5use std::io::Cursor;
6use std::ops::{Bound, RangeBounds, RangeInclusive};
7
8#[cfg(target_arch = "wasm32")]
9use crate::utils::NamedLockError;
10use async_trait::async_trait;
11use celestia_types::ExtendedHeader;
12use celestia_types::hash::Hash;
13use cid::Cid;
14use libp2p::identity::Keypair;
15use prost::Message;
16use serde::{Deserialize, Serialize};
17use tendermint_proto::Protobuf;
18use thiserror::Error;
19#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
20use wasm_bindgen::prelude::*;
21
22pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError};
23pub use crate::store::either_store::EitherStore;
24pub use crate::store::utils::VerifiedExtendedHeaders;
25
26pub use in_memory_store::InMemoryStore;
27#[cfg(target_arch = "wasm32")]
28pub use indexed_db_store::IndexedDbStore;
29#[cfg(not(target_arch = "wasm32"))]
30pub use redb_store::RedbStore;
31
32mod either_store;
33mod in_memory_store;
34#[cfg(target_arch = "wasm32")]
35mod indexed_db_store;
36#[cfg(not(target_arch = "wasm32"))]
37mod redb_store;
38
39pub(crate) mod utils;
40
41#[derive(Debug, Default, Clone, Serialize, Deserialize)]
45#[cfg_attr(all(feature = "wasm-bindgen", target_arch = "wasm32"), wasm_bindgen)]
46pub struct SamplingMetadata {
47 #[cfg_attr(
50 all(feature = "wasm-bindgen", target_arch = "wasm32"),
51 wasm_bindgen(skip)
52 )]
53 pub cids: Vec<Cid>,
54}
55
56type Result<T, E = StoreError> = std::result::Result<T, E>;
57
58#[async_trait]
63pub trait Store: Send + Sync + Debug {
64 async fn get_head(&self) -> Result<ExtendedHeader>;
66
67 async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader>;
69
70 async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader>;
72
73 async fn wait_new_head(&self) -> u64;
75
76 async fn wait_height(&self, height: u64) -> Result<()>;
78
79 async fn get_range<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
90 where
91 R: RangeBounds<u64> + Send,
92 {
93 let head_height = self.head_height().await?;
94 let range = to_headers_range(range, head_height)?;
95
96 let amount = if range.is_empty() {
97 0
98 } else {
99 range.end() - range.start() + 1 };
101
102 let mut headers = Vec::with_capacity(amount.try_into().unwrap_or(usize::MAX));
103
104 for height in range {
105 let header = self.get_by_height(height).await?;
106 headers.push(header);
107 }
108
109 Ok(headers)
110 }
111
112 async fn head_height(&self) -> Result<u64>;
114
115 async fn has(&self, hash: &Hash) -> bool;
117
118 async fn has_at(&self, height: u64) -> bool;
120
121 async fn update_sampling_metadata(&self, height: u64, cids: Vec<Cid>) -> Result<()>;
126
127 async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>>;
134
135 async fn mark_as_sampled(&self, height: u64) -> Result<()>;
137
138 async fn insert<R>(&self, headers: R) -> Result<()>
143 where
144 R: TryInto<VerifiedExtendedHeaders> + Send,
145 <R as TryInto<VerifiedExtendedHeaders>>::Error: Display;
146
147 async fn get_stored_header_ranges(&self) -> Result<BlockRanges>;
149
150 async fn get_sampled_ranges(&self) -> Result<BlockRanges>;
152
153 async fn get_pruned_ranges(&self) -> Result<BlockRanges>;
155
156 async fn remove_height(&self, height: u64) -> Result<()>;
158
159 async fn get_identity(&self) -> Result<Keypair>;
161
162 async fn close(self) -> Result<()>;
164}
165
166#[derive(Error, Debug)]
168pub enum StoreError {
169 #[error("Header not found in store")]
171 NotFound,
172
173 #[error("Insertion failed: {0}")]
175 InsertionFailed(#[from] StoreInsertionError),
176
177 #[error("Stored data are inconsistent or invalid, try reseting the store: {0}")]
179 StoredDataError(String),
180
181 #[error("Database reported unrecoverable error: {0}")]
183 FatalDatabaseError(String),
184
185 #[error("Received error from executor: {0}")]
187 ExecutorError(String),
188
189 #[error("Error opening store: {0}")]
191 OpenFailed(String),
192
193 #[error("Error locking database instance: {0}")]
195 NamedLock(String),
196}
197
198#[derive(Error, Debug)]
200pub enum StoreInsertionError {
201 #[error("Provided headers failed verification: {0}")]
203 HeadersVerificationFailed(String),
204
205 #[error("Provided headers failed to be verified with existing neighbors: {0}")]
207 NeighborsVerificationFailed(String),
208
209 #[error("Contraints not met: {0}")]
211 ContraintsNotMet(BlockRangesError),
212
213 #[error("Hash {0} already exists in store")]
218 HashExists(Hash),
219}
220
221impl StoreError {
222 pub(crate) fn is_fatal(&self) -> bool {
224 match self {
225 StoreError::StoredDataError(_)
226 | StoreError::FatalDatabaseError(_)
227 | StoreError::ExecutorError(_)
228 | StoreError::NamedLock(_)
229 | StoreError::OpenFailed(_) => true,
230 StoreError::NotFound | StoreError::InsertionFailed(_) => false,
231 }
232 }
233}
234
235impl From<libp2p::identity::DecodingError> for StoreError {
236 fn from(error: libp2p::identity::DecodingError) -> Self {
237 StoreError::StoredDataError(format!(
238 "Could not deserialize stored libp2p identity: {error}"
239 ))
240 }
241}
242
243#[cfg(not(target_arch = "wasm32"))]
244impl From<tokio::task::JoinError> for StoreError {
245 fn from(error: tokio::task::JoinError) -> StoreError {
246 StoreError::ExecutorError(error.to_string())
247 }
248}
249
250#[cfg(target_arch = "wasm32")]
251impl From<NamedLockError> for StoreError {
252 fn from(value: NamedLockError) -> Self {
253 StoreError::NamedLock(value.to_string())
254 }
255}
256
257impl From<Infallible> for StoreError {
259 fn from(_: Infallible) -> Self {
260 unreachable!("Infallible failed")
262 }
263}
264
265#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
266#[wasm_bindgen]
267impl SamplingMetadata {
268 #[wasm_bindgen(getter)]
270 pub fn cids(&self) -> Vec<js_sys::Uint8Array> {
271 self.cids
272 .iter()
273 .map(|cid| js_sys::Uint8Array::from(cid.to_bytes().as_ref()))
274 .collect()
275 }
276}
277
278#[derive(Message)]
279struct RawSamplingMetadata {
280 #[prost(message, repeated, tag = "2")]
283 cids: Vec<Vec<u8>>,
284}
285
286impl Protobuf<RawSamplingMetadata> for SamplingMetadata {}
287
288impl TryFrom<RawSamplingMetadata> for SamplingMetadata {
289 type Error = cid::Error;
290
291 fn try_from(item: RawSamplingMetadata) -> Result<Self, Self::Error> {
292 let cids = item
293 .cids
294 .iter()
295 .map(|cid| {
296 let buffer = Cursor::new(cid);
297 Cid::read_bytes(buffer)
298 })
299 .collect::<Result<_, _>>()?;
300
301 Ok(SamplingMetadata { cids })
302 }
303}
304
305impl From<SamplingMetadata> for RawSamplingMetadata {
306 fn from(item: SamplingMetadata) -> Self {
307 let cids = item.cids.iter().map(|cid| cid.to_bytes()).collect();
308
309 RawSamplingMetadata { cids }
310 }
311}
312
313fn to_headers_range(bounds: impl RangeBounds<u64>, last_index: u64) -> Result<RangeInclusive<u64>> {
315 let start = match bounds.start_bound() {
316 Bound::Unbounded => 1,
318 Bound::Included(&x) if x > last_index || x == 0 => return Err(StoreError::NotFound),
320 Bound::Excluded(&x) if x >= last_index => return Err(StoreError::NotFound),
321 Bound::Included(&x) => x,
323 Bound::Excluded(&x) => x + 1, };
325 let end = match bounds.end_bound() {
326 Bound::Unbounded => last_index,
328 Bound::Included(&x) if x > last_index => return Err(StoreError::NotFound),
330 Bound::Excluded(&x) if x > last_index + 1 => return Err(StoreError::NotFound),
331 Bound::Excluded(&0) => 0,
333 Bound::Included(&x) => x,
335 Bound::Excluded(&x) => x - 1,
336 };
337
338 Ok(start..=end)
339}
340
341#[cfg(test)]
342mod tests {
343 use super::*;
344 use crate::test_utils::ExtendedHeaderGeneratorExt;
345 use celestia_types::Height;
346 use celestia_types::test_utils::ExtendedHeaderGenerator;
347 use rstest::rstest;
348 use lumina_utils::test_utils::async_test as test;
351
352 use crate::test_utils::new_block_ranges;
353
354 #[test]
355 async fn converts_bounded_ranges() {
356 assert_eq!(1..=15, to_headers_range(1..16, 100).unwrap());
357 assert_eq!(1..=15, to_headers_range(1..=15, 100).unwrap());
358 assert_eq!(300..=400, to_headers_range(300..401, 500).unwrap());
359 assert_eq!(300..=400, to_headers_range(300..=400, 500).unwrap());
360 }
361
362 #[test]
363 async fn starts_from_one_when_unbounded_start() {
364 assert_eq!(&1, to_headers_range(..=10, 100).unwrap().start());
365 assert_eq!(&1, to_headers_range(..10, 100).unwrap().start());
366 assert_eq!(&1, to_headers_range(.., 100).unwrap().start());
367 }
368
369 #[test]
370 async fn ends_on_last_index_when_unbounded_end() {
371 assert_eq!(&10, to_headers_range(1.., 10).unwrap().end());
372 assert_eq!(&11, to_headers_range(1.., 11).unwrap().end());
373 assert_eq!(&10, to_headers_range(.., 10).unwrap().end());
374 }
375
376 #[test]
377 async fn handle_ranges_ending_precisely_at_last_index() {
378 let last_index = 10;
379
380 let bounds_ending_at_last_index = [
381 (Bound::Unbounded, Bound::Included(last_index)),
382 (Bound::Unbounded, Bound::Excluded(last_index + 1)),
383 ];
384
385 for bound in bounds_ending_at_last_index {
386 let range = to_headers_range(bound, last_index).unwrap();
387 assert_eq!(*range.end(), last_index);
388 }
389 }
390
391 #[test]
392 async fn handle_ranges_ending_after_last_index() {
393 let last_index = 10;
394
395 let bounds_ending_after_last_index = [
396 (Bound::Unbounded, Bound::Included(last_index + 1)),
397 (Bound::Unbounded, Bound::Excluded(last_index + 2)),
398 ];
399
400 for bound in bounds_ending_after_last_index {
401 to_headers_range(bound, last_index).unwrap_err();
402 }
403 }
404
405 #[test]
406 async fn errors_if_zero_heigth_is_included() {
407 let includes_zero_height = 0..5;
408 to_headers_range(includes_zero_height, 10).unwrap_err();
409 }
410
411 #[test]
412 async fn handle_ranges_starting_precisely_at_last_index() {
413 let last_index = 10;
414
415 let bounds_starting_at_last_index = [
416 (Bound::Included(last_index), Bound::Unbounded),
417 (Bound::Excluded(last_index - 1), Bound::Unbounded),
418 ];
419
420 for bound in bounds_starting_at_last_index {
421 let range = to_headers_range(bound, last_index).unwrap();
422 assert_eq!(*range.start(), last_index);
423 }
424 }
425
426 #[test]
427 async fn handle_ranges_starting_after_last_index() {
428 let last_index = 10;
429
430 let bounds_starting_after_last_index = [
431 (Bound::Included(last_index + 1), Bound::Unbounded),
432 (Bound::Excluded(last_index), Bound::Unbounded),
433 ];
434
435 for bound in bounds_starting_after_last_index {
436 to_headers_range(bound, last_index).unwrap_err();
437 }
438 }
439
440 #[test]
441 async fn handle_ranges_that_lead_to_empty_ranges() {
442 let last_index = 10;
443
444 let bounds_leading_to_empty_range = [
445 (Bound::Unbounded, Bound::Excluded(0)),
446 (Bound::Included(3), Bound::Excluded(3)),
447 (Bound::Included(3), Bound::Included(2)),
448 (Bound::Excluded(2), Bound::Included(2)),
449 ];
450
451 for bound in bounds_leading_to_empty_range {
452 assert!(to_headers_range(bound, last_index).unwrap().is_empty());
453 }
454 }
455
456 #[rstest]
457 #[case::in_memory(new_in_memory_store())]
458 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
459 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
460 #[self::test]
461 async fn test_contains_height<S: Store>(
462 #[case]
463 #[future(awt)]
464 s: S,
465 ) {
466 let mut s = s;
467 fill_store(&mut s, 2).await;
468
469 assert!(!s.has_at(0).await);
470 assert!(s.has_at(1).await);
471 assert!(s.has_at(2).await);
472 assert!(!s.has_at(3).await);
473 }
474
475 #[rstest]
476 #[case::in_memory(new_in_memory_store())]
477 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
478 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
479 #[self::test]
480 async fn test_empty_store<S: Store>(
481 #[case]
482 #[future(awt)]
483 s: S,
484 ) {
485 assert!(matches!(s.head_height().await, Err(StoreError::NotFound)));
486 assert!(matches!(s.get_head().await, Err(StoreError::NotFound)));
487 assert!(matches!(
488 s.get_by_height(1).await,
489 Err(StoreError::NotFound)
490 ));
491 assert!(matches!(
492 s.get_by_hash(&Hash::Sha256([0; 32])).await,
493 Err(StoreError::NotFound)
494 ));
495 }
496
497 #[rstest]
498 #[case::in_memory(new_in_memory_store())]
499 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
500 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
501 #[self::test]
502 async fn test_read_write<S: Store>(
503 #[case]
504 #[future(awt)]
505 s: S,
506 ) {
507 let mut generator = ExtendedHeaderGenerator::new();
508
509 let header = generator.next();
510
511 s.insert(header.clone()).await.unwrap();
512 assert_eq!(s.head_height().await.unwrap(), 1);
513 assert_eq!(s.get_head().await.unwrap(), header);
514 assert_eq!(s.get_by_height(1).await.unwrap(), header);
515 assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
516 }
517
518 #[rstest]
519 #[case::in_memory(new_in_memory_store())]
520 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
521 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
522 #[self::test]
523 async fn test_pregenerated_data<S: Store>(
524 #[case]
525 #[future(awt)]
526 s: S,
527 ) {
528 let mut s = s;
529 fill_store(&mut s, 100).await;
530
531 assert_eq!(s.head_height().await.unwrap(), 100);
532 let head = s.get_head().await.unwrap();
533 assert_eq!(s.get_by_height(100).await.unwrap(), head);
534 assert!(matches!(
535 s.get_by_height(101).await,
536 Err(StoreError::NotFound)
537 ));
538
539 let header = s.get_by_height(54).await.unwrap();
540 assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
541 }
542
543 #[rstest]
544 #[case::in_memory(new_in_memory_store())]
545 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
546 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
547 #[self::test]
548 async fn test_duplicate_insert<S: Store>(
549 #[case]
550 #[future(awt)]
551 s: S,
552 ) {
553 let mut s = s;
554 let mut generator = fill_store(&mut s, 100).await;
555
556 let header101 = generator.next();
557 s.insert(header101.clone()).await.unwrap();
558
559 let error = match s.insert(header101).await {
560 Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
561 res => panic!("Invalid result: {res:?}"),
562 };
563
564 assert_eq!(
565 error,
566 BlockRangesError::BlockRangeOverlap(101..=101, 101..=101)
567 );
568 }
569
570 #[rstest]
571 #[case::in_memory(new_in_memory_store())]
572 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
573 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
574 #[self::test]
575 async fn test_overwrite_height<S: Store>(
576 #[case]
577 #[future(awt)]
578 s: S,
579 ) {
580 let mut s = s;
581 let generator = fill_store(&mut s, 100).await;
582
583 let header29 = s.get_by_height(29).await.unwrap();
585 let header30 = generator.next_of(&header29);
586
587 let error = match s.insert(header30).await {
588 Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
589 res => panic!("Invalid result: {res:?}"),
590 };
591 assert_eq!(error, BlockRangesError::BlockRangeOverlap(30..=30, 30..=30));
592 }
593
594 #[rstest]
595 #[case::in_memory(new_in_memory_store())]
596 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
597 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
598 #[self::test]
599 async fn test_overwrite_hash<S: Store>(
600 #[case]
601 #[future(awt)]
602 s: S,
603 ) {
604 let mut s = s;
605 fill_store(&mut s, 100).await;
606
607 let mut dup_header = s.get_by_height(99).await.unwrap();
608 dup_header.header.height = Height::from(102u32);
609
610 assert!(matches!(
611 s.insert(dup_header).await,
612 Err(StoreError::InsertionFailed(
613 StoreInsertionError::HashExists(_)
614 ))
615 ));
616 }
617
618 #[rstest]
619 #[case::in_memory(new_in_memory_store())]
620 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
621 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
622 #[self::test]
623 async fn test_append_range<S: Store>(
624 #[case]
625 #[future(awt)]
626 s: S,
627 ) {
628 let mut s = s;
629 let mut generator = fill_store(&mut s, 10).await;
630
631 s.insert(generator.next_many_verified(4)).await.unwrap();
632 s.get_by_height(14).await.unwrap();
633 }
634
635 #[rstest]
636 #[case::in_memory(new_in_memory_store())]
637 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
638 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
639 #[self::test]
640 async fn test_fill_range_gap<S: Store>(
641 #[case]
642 #[future(awt)]
643 s: S,
644 ) {
645 let mut s = s;
646 let mut generator = fill_store(&mut s, 10).await;
647
648 let skipped = generator.next();
650 let upcoming_head = generator.next();
652
653 s.insert(upcoming_head).await.unwrap();
654 s.insert(skipped).await.unwrap();
655 }
656
657 #[rstest]
658 #[case::in_memory(new_in_memory_store())]
659 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
660 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
661 #[self::test]
662 async fn test_fill_range_gap_with_invalid_header<S: Store>(
663 #[case]
664 #[future(awt)]
665 s: S,
666 ) {
667 let mut s = s;
668 let mut generator = fill_store(&mut s, 10).await;
669
670 let mut gen_prime = generator.fork();
671 let _skipped = generator.next();
673 let another_chain = gen_prime.next();
674 let upcoming_head = generator.next();
676
677 s.insert(upcoming_head).await.unwrap();
678 assert!(matches!(
679 s.insert(another_chain).await,
680 Err(StoreError::InsertionFailed(
681 StoreInsertionError::NeighborsVerificationFailed(_)
682 ))
683 ));
684 }
685
686 #[rstest]
687 #[case::in_memory(new_in_memory_store())]
688 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
689 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
690 #[self::test]
691 async fn test_appends_with_gaps<S: Store>(
692 #[case]
693 #[future(awt)]
694 s: S,
695 ) {
696 let mut generator = ExtendedHeaderGenerator::new_from_height(5);
697 let header5 = generator.next();
698 generator.next_many(4);
699 let header10 = generator.next();
700 generator.next_many(4);
701 let header15 = generator.next();
702
703 s.insert(header5).await.unwrap();
704 s.insert(header15).await.unwrap();
705 s.insert(header10).await.unwrap_err();
706 }
707
708 #[rstest]
709 #[case::in_memory(new_in_memory_store())]
710 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
711 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
712 #[self::test]
713 async fn check_pruned_ranges<S: Store>(
714 #[case]
715 #[future(awt)]
716 s: S,
717 ) {
718 let store = s;
719 let headers = ExtendedHeaderGenerator::new().next_many(10);
720
721 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
722 let pruned_ranges = store.get_pruned_ranges().await.unwrap();
723 assert!(stored_ranges.is_empty());
724 assert!(pruned_ranges.is_empty());
725
726 store.insert(&headers[..]).await.unwrap();
727
728 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
729 let pruned_ranges = store.get_pruned_ranges().await.unwrap();
730 assert_eq!(stored_ranges, new_block_ranges([1..=10]));
731 assert!(pruned_ranges.is_empty());
732
733 store.remove_height(4).await.unwrap();
734 store.remove_height(9).await.unwrap();
735
736 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
737 let pruned_ranges = store.get_pruned_ranges().await.unwrap();
738 assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=8, 10..=10]));
739 assert_eq!(pruned_ranges, new_block_ranges([4..=4, 9..=9]));
740
741 store.insert(&headers[8]).await.unwrap();
743
744 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
745 let pruned_ranges = store.get_pruned_ranges().await.unwrap();
746 assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=10]));
747 assert_eq!(pruned_ranges, new_block_ranges([4..=4]));
748 }
749
750 #[rstest]
751 #[case::in_memory(new_in_memory_store())]
752 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
753 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
754 #[self::test]
755 async fn check_sampled_ranges<S: Store>(
756 #[case]
757 #[future(awt)]
758 s: S,
759 ) {
760 let store = s;
761 let headers = ExtendedHeaderGenerator::new().next_many(10);
762
763 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
764 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
765 assert!(stored_ranges.is_empty());
766 assert!(sampled_ranges.is_empty());
767
768 store.insert(&headers[..]).await.unwrap();
769
770 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
771 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
772 assert_eq!(stored_ranges, new_block_ranges([1..=10]));
773 assert!(sampled_ranges.is_empty());
774
775 store.mark_as_sampled(4).await.unwrap();
776 store.mark_as_sampled(9).await.unwrap();
777
778 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
779 assert_eq!(sampled_ranges, new_block_ranges([4..=4, 9..=9]));
780
781 store.remove_height(4).await.unwrap();
783
784 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
785 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
786 assert_eq!(stored_ranges, new_block_ranges([1..=3, 5..=10]));
787 assert_eq!(sampled_ranges, new_block_ranges([9..=9]));
788
789 assert!(matches!(
791 store.mark_as_sampled(4).await,
792 Err(StoreError::NotFound)
793 ));
794 }
795
796 #[rstest]
797 #[case::in_memory(new_in_memory_store())]
798 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
799 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
800 #[self::test]
801 async fn test_sampling_height_empty_store<S: Store>(
802 #[case]
803 #[future(awt)]
804 store: S,
805 ) {
806 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
807 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
808
809 assert_eq!(stored_ranges.len(), 0);
810 assert_eq!(sampled_ranges.len(), 0);
811
812 assert!(matches!(
813 store.mark_as_sampled(0).await,
814 Err(StoreError::NotFound)
815 ));
816 assert!(matches!(
817 store.mark_as_sampled(1).await,
818 Err(StoreError::NotFound)
819 ));
820
821 assert!(matches!(
822 store.update_sampling_metadata(0, vec![]).await,
823 Err(StoreError::NotFound)
824 ));
825 assert!(matches!(
826 store.update_sampling_metadata(1, vec![]).await,
827 Err(StoreError::NotFound)
828 ));
829 }
830
831 #[rstest]
832 #[case::in_memory(new_in_memory_store())]
833 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
834 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
835 #[self::test]
836 async fn test_sampling_merge<S: Store>(
837 #[case]
838 #[future(awt)]
839 s: S,
840 ) {
841 let mut store = s;
842 fill_store(&mut store, 1).await;
843
844 let cid0 = "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ"
845 .parse()
846 .unwrap();
847 let cid1 = "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA"
848 .parse()
849 .unwrap();
850 let cid2 = "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq"
851 .parse()
852 .unwrap();
853
854 assert!(store.get_sampling_metadata(1).await.unwrap().is_none());
856
857 store.update_sampling_metadata(1, vec![]).await.unwrap();
859 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
860 assert_eq!(sampling_data.cids, vec![]);
861
862 store.update_sampling_metadata(1, vec![cid0]).await.unwrap();
863 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
864 assert_eq!(sampling_data.cids, vec![cid0]);
865
866 store.update_sampling_metadata(1, vec![cid1]).await.unwrap();
867 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
868 assert_eq!(sampling_data.cids, vec![cid0, cid1]);
869
870 store.update_sampling_metadata(1, vec![cid2]).await.unwrap();
871 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
872 assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
873
874 store.update_sampling_metadata(1, vec![]).await.unwrap();
876 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
877 assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
878
879 store.update_sampling_metadata(1, vec![cid1]).await.unwrap();
881 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
882 assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
883
884 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
886 assert!(!sampled_ranges.contains(1));
887 }
888
889 #[rstest]
890 #[case::in_memory(new_in_memory_store())]
891 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
892 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
893 #[self::test]
894 async fn test_sampled_cids<S: Store>(
895 #[case]
896 #[future(awt)]
897 s: S,
898 ) {
899 let mut store = s;
900 fill_store(&mut store, 5).await;
901
902 let cids: Vec<Cid> = [
903 "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq",
904 "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi",
905 "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ",
906 "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA",
907 ]
908 .iter()
909 .map(|s| s.parse().unwrap())
910 .collect();
911
912 store
913 .update_sampling_metadata(1, cids.clone())
914 .await
915 .unwrap();
916 store
917 .update_sampling_metadata(2, cids[0..1].to_vec())
918 .await
919 .unwrap();
920 store
921 .update_sampling_metadata(4, cids[3..].to_vec())
922 .await
923 .unwrap();
924 store.update_sampling_metadata(5, vec![]).await.unwrap();
925
926 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
927 assert_eq!(sampling_data.cids, cids);
928
929 let sampling_data = store.get_sampling_metadata(2).await.unwrap().unwrap();
930 assert_eq!(sampling_data.cids, cids[0..1]);
931
932 assert!(store.get_sampling_metadata(3).await.unwrap().is_none());
933
934 let sampling_data = store.get_sampling_metadata(4).await.unwrap().unwrap();
935 assert_eq!(sampling_data.cids, cids[3..]);
936
937 let sampling_data = store.get_sampling_metadata(5).await.unwrap().unwrap();
938 assert_eq!(sampling_data.cids, vec![]);
939
940 assert!(matches!(
941 store.get_sampling_metadata(0).await,
942 Err(StoreError::NotFound)
943 ));
944 assert!(matches!(
945 store.get_sampling_metadata(6).await,
946 Err(StoreError::NotFound)
947 ));
948 assert!(matches!(
949 store.get_sampling_metadata(100).await,
950 Err(StoreError::NotFound)
951 ));
952 }
953
954 #[rstest]
955 #[case::in_memory(new_in_memory_store())]
956 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
957 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
958 #[self::test]
959 async fn test_empty_store_range<S: Store>(
960 #[case]
961 #[future(awt)]
962 s: S,
963 ) {
964 let store = s;
965
966 assert_eq!(
967 store.get_stored_header_ranges().await.unwrap().as_ref(),
968 &[]
969 );
970 }
971
972 #[rstest]
973 #[case::in_memory(new_in_memory_store())]
974 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
975 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
976 #[self::test]
977 async fn test_single_header_range<S: Store>(
978 #[case]
979 #[future(awt)]
980 s: S,
981 ) {
982 let store = s;
983 let mut generator = ExtendedHeaderGenerator::new();
984
985 generator.skip(19);
986
987 let prepend0 = generator.next();
988 let prepend1 = generator.next_many_verified(5);
989 store.insert(generator.next_many_verified(4)).await.unwrap();
990 store.insert(generator.next_many_verified(5)).await.unwrap();
991 store.insert(prepend1).await.unwrap();
992 store.insert(prepend0).await.unwrap();
993 store.insert(generator.next_many_verified(5)).await.unwrap();
994 store.insert(generator.next()).await.unwrap();
995
996 let final_ranges = store.get_stored_header_ranges().await.unwrap();
997 assert_eq!(final_ranges.as_ref(), &[20..=40]);
998 }
999
1000 #[rstest]
1003 #[case::in_memory(new_in_memory_store())]
1004 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1005 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1006 #[self::test]
1007 async fn test_ranges_consolidation<S: Store>(
1008 #[case]
1009 #[future(awt)]
1010 s: S,
1011 ) {
1012 let store = s;
1013 let mut generator = ExtendedHeaderGenerator::new();
1014
1015 generator.skip(9);
1016
1017 let skip0 = generator.next_many_verified(5);
1018 store.insert(generator.next_many_verified(2)).await.unwrap();
1019 store.insert(generator.next_many_verified(3)).await.unwrap();
1020
1021 let skip1 = generator.next();
1022 store.insert(generator.next()).await.unwrap();
1023
1024 let skip2 = generator.next_many_verified(5);
1025
1026 store.insert(generator.next()).await.unwrap();
1027
1028 let skip3 = generator.next_many_verified(5);
1029 let skip4 = generator.next_many_verified(5);
1030 let skip5 = generator.next_many_verified(5);
1031
1032 store.insert(skip5).await.unwrap();
1033 store.insert(skip4).await.unwrap();
1034 store.insert(skip3).await.unwrap();
1035 store.insert(skip2).await.unwrap();
1036 store.insert(skip1).await.unwrap();
1037 store.insert(skip0).await.unwrap();
1038
1039 let final_ranges = store.get_stored_header_ranges().await.unwrap();
1040 assert_eq!(final_ranges.as_ref(), &[10..=42]);
1041 }
1042
1043 #[rstest]
1044 #[case::in_memory(new_in_memory_store())]
1045 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1046 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1047 #[self::test]
1048 async fn test_neighbour_validation<S: Store>(
1049 #[case]
1050 #[future(awt)]
1051 s: S,
1052 ) {
1053 let store = s;
1054 let mut generator = ExtendedHeaderGenerator::new();
1055
1056 store.insert(generator.next_many_verified(5)).await.unwrap();
1057 let mut fork = generator.fork();
1058 let _gap = generator.next();
1059 store.insert(generator.next_many_verified(4)).await.unwrap();
1060
1061 store.insert(fork.next()).await.unwrap_err();
1062 }
1063
1064 #[rstest]
1065 #[case::in_memory(new_in_memory_store())]
1066 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1067 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1068 #[self::test]
1069 async fn tail_removal_partial_range<S: Store>(
1070 #[case]
1071 #[future(awt)]
1072 s: S,
1073 ) {
1074 let store = s;
1075 let headers = ExtendedHeaderGenerator::new().next_many(128);
1076
1077 store.insert(&headers[0..64]).await.unwrap();
1078 store.insert(&headers[96..128]).await.unwrap();
1079 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1080
1081 store.remove_height(1).await.unwrap();
1082 assert_store(&store, &headers, new_block_ranges([2..=64, 97..=128])).await;
1083 }
1084
1085 #[rstest]
1086 #[case::in_memory(new_in_memory_store())]
1087 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1088 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1089 #[self::test]
1090 async fn tail_removal_full_range<S: Store>(
1091 #[case]
1092 #[future(awt)]
1093 s: S,
1094 ) {
1095 let store = s;
1096 let headers = ExtendedHeaderGenerator::new().next_many(128);
1097
1098 store.insert(&headers[0..1]).await.unwrap();
1099 store.insert(&headers[65..128]).await.unwrap();
1100 assert_store(&store, &headers, new_block_ranges([1..=1, 66..=128])).await;
1101
1102 store.remove_height(1).await.unwrap();
1103 assert_store(&store, &headers, new_block_ranges([66..=128])).await;
1104 }
1105
1106 #[rstest]
1107 #[case::in_memory(new_in_memory_store())]
1108 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1109 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1110 #[self::test]
1111 async fn tail_removal_remove_all<S: Store>(
1112 #[case]
1113 #[future(awt)]
1114 s: S,
1115 ) {
1116 let store = s;
1117 let headers = ExtendedHeaderGenerator::new().next_many(66);
1118
1119 store.insert(&headers[..]).await.unwrap();
1120 assert_store(&store, &headers, new_block_ranges([1..=66])).await;
1121
1122 for i in 1..=66 {
1123 store.remove_height(i).await.unwrap();
1124 }
1125
1126 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
1127 assert!(stored_ranges.is_empty());
1128
1129 for h in 1..=66 {
1130 assert!(!store.has_at(h).await);
1131 }
1132 }
1133
1134 #[rstest]
1135 #[case::in_memory(new_in_memory_store())]
1136 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1137 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1138 #[self::test]
1139 async fn head_removal_partial_range<S: Store>(
1140 #[case]
1141 #[future(awt)]
1142 s: S,
1143 ) {
1144 let store = s;
1145 let headers = ExtendedHeaderGenerator::new().next_many(128);
1146
1147 store.insert(&headers[0..64]).await.unwrap();
1148 store.insert(&headers[96..128]).await.unwrap();
1149 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1150
1151 store.remove_height(128).await.unwrap();
1152 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=127])).await;
1153 }
1154
1155 #[rstest]
1156 #[case::in_memory(new_in_memory_store())]
1157 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1158 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1159 #[self::test]
1160 async fn head_removal_full_range<S: Store>(
1161 #[case]
1162 #[future(awt)]
1163 s: S,
1164 ) {
1165 let store = s;
1166 let headers = ExtendedHeaderGenerator::new().next_many(128);
1167
1168 store.insert(&headers[0..64]).await.unwrap();
1169 store.insert(&headers[127..128]).await.unwrap();
1170 assert_store(&store, &headers, new_block_ranges([1..=64, 128..=128])).await;
1171
1172 store.remove_height(128).await.unwrap();
1173 assert_store(&store, &headers, new_block_ranges([1..=64])).await;
1174 }
1175
1176 #[rstest]
1177 #[case::in_memory(new_in_memory_store())]
1178 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1179 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1180 #[self::test]
1181 async fn middle_removal<S: Store>(
1182 #[case]
1183 #[future(awt)]
1184 s: S,
1185 ) {
1186 let store = s;
1187 let headers = ExtendedHeaderGenerator::new().next_many(128);
1188
1189 store.insert(&headers[0..64]).await.unwrap();
1190 store.insert(&headers[96..128]).await.unwrap();
1191 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1192
1193 store.remove_height(62).await.unwrap();
1194 assert_store(
1195 &store,
1196 &headers,
1197 new_block_ranges([1..=61, 63..=64, 97..=128]),
1198 )
1199 .await;
1200
1201 store.remove_height(64).await.unwrap();
1202 assert_store(
1203 &store,
1204 &headers,
1205 new_block_ranges([1..=61, 63..=63, 97..=128]),
1206 )
1207 .await;
1208
1209 store.remove_height(63).await.unwrap();
1210 assert_store(&store, &headers, new_block_ranges([1..=61, 97..=128])).await;
1211 }
1212
1213 #[rstest]
1214 #[case::in_memory(new_in_memory_store())]
1215 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1216 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1217 #[self::test]
1218 async fn neighbor_removal<S: Store>(
1219 #[case]
1220 #[future(awt)]
1221 s: S,
1222 ) {
1223 let store = s;
1224 let headers = ExtendedHeaderGenerator::new().next_many(128);
1225
1226 store.insert(&headers[0..64]).await.unwrap();
1227 store.insert(&headers[96..128]).await.unwrap();
1228 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1229
1230 store.remove_height(64).await.unwrap();
1231 assert_store(&store, &headers, new_block_ranges([1..=63, 97..=128])).await;
1232
1233 store.remove_height(97).await.unwrap();
1234 assert_store(&store, &headers, new_block_ranges([1..=63, 98..=128])).await;
1235 }
1236
1237 #[rstest]
1238 #[case::in_memory(new_in_memory_store())]
1239 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1240 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1241 #[self::test]
1242 async fn libp2p_identity_seeding<S: Store>(
1243 #[case]
1244 #[future(awt)]
1245 s: S,
1246 ) {
1247 let store = s;
1248 let generated_keypair = store.get_identity().await.unwrap();
1249 let persisted_keypair = store.get_identity().await.unwrap();
1250
1251 assert_eq!(generated_keypair.public(), persisted_keypair.public());
1252 }
1253
1254 async fn fill_store<S: Store>(store: &mut S, amount: u64) -> ExtendedHeaderGenerator {
1256 assert!(!store.has_at(1).await, "Store is not empty");
1257
1258 let mut generator = ExtendedHeaderGenerator::new();
1259
1260 store
1261 .insert(generator.next_many_verified(amount))
1262 .await
1263 .expect("inserting test data failed");
1264
1265 generator
1266 }
1267
1268 async fn new_in_memory_store() -> InMemoryStore {
1269 InMemoryStore::new()
1270 }
1271
1272 pub(crate) async fn assert_store<S: Store>(
1273 store: &S,
1274 headers: &[ExtendedHeader],
1275 expected_ranges: BlockRanges,
1276 ) {
1277 assert_eq!(
1278 store.get_stored_header_ranges().await.unwrap(),
1279 expected_ranges
1280 );
1281 for header in headers {
1282 let height = header.height().value();
1283 if expected_ranges.contains(height) {
1284 assert_eq!(&store.get_by_height(height).await.unwrap(), header);
1285 assert_eq!(&store.get_by_hash(&header.hash()).await.unwrap(), header);
1286 } else {
1287 assert!(matches!(
1288 store.get_by_height(height).await.unwrap_err(),
1289 StoreError::NotFound
1290 ));
1291 assert!(matches!(
1292 store.get_by_hash(&header.hash()).await.unwrap_err(),
1293 StoreError::NotFound
1294 ));
1295 }
1296 }
1297 }
1298
1299 #[cfg(not(target_arch = "wasm32"))]
1300 async fn new_redb_store() -> RedbStore {
1301 RedbStore::in_memory().await.unwrap()
1302 }
1303
1304 #[cfg(target_arch = "wasm32")]
1305 async fn new_indexed_db_store() -> IndexedDbStore {
1306 let store_name = crate::test_utils::new_indexed_db_store_name().await;
1307
1308 IndexedDbStore::new(&store_name)
1309 .await
1310 .expect("creating test store failed")
1311 }
1312}