1use std::convert::Infallible;
4use std::fmt::{Debug, Display};
5use std::io::Cursor;
6use std::ops::{Bound, RangeBounds, RangeInclusive};
7
8use async_trait::async_trait;
9use celestia_types::hash::Hash;
10use celestia_types::ExtendedHeader;
11use cid::Cid;
12use prost::Message;
13use serde::{Deserialize, Serialize};
14use tendermint_proto::Protobuf;
15use thiserror::Error;
16#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
17use wasm_bindgen::prelude::*;
18
19pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError};
20pub use crate::store::either_store::EitherStore;
21pub use crate::store::utils::VerifiedExtendedHeaders;
22
23pub use in_memory_store::InMemoryStore;
24#[cfg(target_arch = "wasm32")]
25pub use indexed_db_store::IndexedDbStore;
26#[cfg(not(target_arch = "wasm32"))]
27pub use redb_store::RedbStore;
28
29mod either_store;
30mod in_memory_store;
31#[cfg(target_arch = "wasm32")]
32mod indexed_db_store;
33#[cfg(not(target_arch = "wasm32"))]
34mod redb_store;
35
36pub(crate) mod utils;
37
38#[derive(Debug, Default, Clone, Serialize, Deserialize)]
42#[cfg_attr(all(feature = "wasm-bindgen", target_arch = "wasm32"), wasm_bindgen)]
43pub struct SamplingMetadata {
44 pub status: SamplingStatus,
46
47 #[cfg_attr(
50 all(feature = "wasm-bindgen", target_arch = "wasm32"),
51 wasm_bindgen(skip)
52 )]
53 pub cids: Vec<Cid>,
54}
55
56#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
58#[cfg_attr(all(feature = "wasm-bindgen", target_arch = "wasm32"), wasm_bindgen)]
59pub enum SamplingStatus {
60 #[default]
62 Unknown,
63 Accepted,
65 Rejected,
67}
68
69type Result<T, E = StoreError> = std::result::Result<T, E>;
70
71#[async_trait]
76pub trait Store: Send + Sync + Debug {
77 async fn get_head(&self) -> Result<ExtendedHeader>;
79
80 async fn get_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader>;
82
83 async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader>;
85
86 async fn wait_new_head(&self) -> u64;
88
89 async fn wait_height(&self, height: u64) -> Result<()>;
91
92 async fn get_range<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
103 where
104 R: RangeBounds<u64> + Send,
105 {
106 let head_height = self.head_height().await?;
107 let range = to_headers_range(range, head_height)?;
108
109 let amount = if range.is_empty() {
110 0
111 } else {
112 range.end() - range.start() + 1 };
114
115 let mut headers = Vec::with_capacity(amount.try_into().unwrap_or(usize::MAX));
116
117 for height in range {
118 let header = self.get_by_height(height).await?;
119 headers.push(header);
120 }
121
122 Ok(headers)
123 }
124
125 async fn head_height(&self) -> Result<u64>;
127
128 async fn has(&self, hash: &Hash) -> bool;
130
131 async fn has_at(&self, height: u64) -> bool;
133
134 async fn update_sampling_metadata(
139 &self,
140 height: u64,
141 status: SamplingStatus,
142 cids: Vec<Cid>,
143 ) -> Result<()>;
144
145 async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>>;
152
153 async fn insert<R>(&self, headers: R) -> Result<()>
158 where
159 R: TryInto<VerifiedExtendedHeaders> + Send,
160 <R as TryInto<VerifiedExtendedHeaders>>::Error: Display;
161
162 async fn get_stored_header_ranges(&self) -> Result<BlockRanges>;
164
165 async fn get_accepted_sampling_ranges(&self) -> Result<BlockRanges>;
167
168 async fn remove_height(&self, height: u64) -> Result<()>;
170
171 async fn close(self) -> Result<()>;
173}
174
175#[derive(Error, Debug)]
177pub enum StoreError {
178 #[error("Header not found in store")]
180 NotFound,
181
182 #[error("Insertion failed: {0}")]
184 InsertionFailed(#[from] StoreInsertionError),
185
186 #[error("Stored data are inconsistent or invalid, try reseting the store: {0}")]
188 StoredDataError(String),
189
190 #[error("Database reported unrecoverable error: {0}")]
192 FatalDatabaseError(String),
193
194 #[error("Received error from executor: {0}")]
196 ExecutorError(String),
197
198 #[error("Error opening store: {0}")]
200 OpenFailed(String),
201}
202
203#[derive(Error, Debug)]
205pub enum StoreInsertionError {
206 #[error("Provided headers failed verification: {0}")]
208 HeadersVerificationFailed(String),
209
210 #[error("Provided headers failed to be verified with existing neighbors: {0}")]
212 NeighborsVerificationFailed(String),
213
214 #[error("Contraints not met: {0}")]
216 ContraintsNotMet(BlockRangesError),
217
218 #[error("Hash {0} already exists in store")]
223 HashExists(Hash),
224}
225
226impl StoreError {
227 pub(crate) fn is_fatal(&self) -> bool {
229 match self {
230 StoreError::StoredDataError(_)
231 | StoreError::FatalDatabaseError(_)
232 | StoreError::ExecutorError(_)
233 | StoreError::OpenFailed(_) => true,
234 StoreError::NotFound | StoreError::InsertionFailed(_) => false,
235 }
236 }
237}
238
239#[cfg(not(target_arch = "wasm32"))]
240impl From<tokio::task::JoinError> for StoreError {
241 fn from(error: tokio::task::JoinError) -> StoreError {
242 StoreError::ExecutorError(error.to_string())
243 }
244}
245
246impl From<Infallible> for StoreError {
248 fn from(_: Infallible) -> Self {
249 unreachable!("Infallible failed")
251 }
252}
253
254#[cfg(all(feature = "wasm-bindgen", target_arch = "wasm32"))]
255#[wasm_bindgen]
256impl SamplingMetadata {
257 #[wasm_bindgen(getter)]
259 pub fn cids(&self) -> Vec<js_sys::Uint8Array> {
260 self.cids
261 .iter()
262 .map(|cid| js_sys::Uint8Array::from(cid.to_bytes().as_ref()))
263 .collect()
264 }
265}
266
267#[derive(Message)]
268struct RawSamplingMetadata {
269 #[prost(bool, tag = "1")]
270 accepted: bool,
271
272 #[prost(message, repeated, tag = "2")]
273 cids: Vec<Vec<u8>>,
274
275 #[prost(bool, tag = "3")]
276 unknown: bool,
277}
278
279impl Protobuf<RawSamplingMetadata> for SamplingMetadata {}
280
281impl TryFrom<RawSamplingMetadata> for SamplingMetadata {
282 type Error = cid::Error;
283
284 fn try_from(item: RawSamplingMetadata) -> Result<Self, Self::Error> {
285 let status = if item.unknown {
286 SamplingStatus::Unknown
287 } else if item.accepted {
288 SamplingStatus::Accepted
289 } else {
290 SamplingStatus::Rejected
291 };
292
293 let cids = item
294 .cids
295 .iter()
296 .map(|cid| {
297 let buffer = Cursor::new(cid);
298 Cid::read_bytes(buffer)
299 })
300 .collect::<Result<_, _>>()?;
301
302 Ok(SamplingMetadata { status, cids })
303 }
304}
305
306impl From<SamplingMetadata> for RawSamplingMetadata {
307 fn from(item: SamplingMetadata) -> Self {
308 let cids = item.cids.iter().map(|cid| cid.to_bytes()).collect();
309
310 let (accepted, unknown) = match item.status {
311 SamplingStatus::Unknown => (false, true),
312 SamplingStatus::Accepted => (true, false),
313 SamplingStatus::Rejected => (false, false),
314 };
315
316 RawSamplingMetadata {
317 accepted,
318 unknown,
319 cids,
320 }
321 }
322}
323
324fn to_headers_range(bounds: impl RangeBounds<u64>, last_index: u64) -> Result<RangeInclusive<u64>> {
326 let start = match bounds.start_bound() {
327 Bound::Unbounded => 1,
329 Bound::Included(&x) if x > last_index || x == 0 => return Err(StoreError::NotFound),
331 Bound::Excluded(&x) if x >= last_index => return Err(StoreError::NotFound),
332 Bound::Included(&x) => x,
334 Bound::Excluded(&x) => x + 1, };
336 let end = match bounds.end_bound() {
337 Bound::Unbounded => last_index,
339 Bound::Included(&x) if x > last_index => return Err(StoreError::NotFound),
341 Bound::Excluded(&x) if x > last_index + 1 => return Err(StoreError::NotFound),
342 Bound::Excluded(&0) => 0,
344 Bound::Included(&x) => x,
346 Bound::Excluded(&x) => x - 1,
347 };
348
349 Ok(start..=end)
350}
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355 use crate::test_utils::ExtendedHeaderGeneratorExt;
356 use celestia_types::test_utils::ExtendedHeaderGenerator;
357 use celestia_types::Height;
358 use lumina_utils::test_utils::async_test as test;
359 use rstest::rstest;
360
361 use crate::test_utils::new_block_ranges;
364
365 #[test]
366 async fn converts_bounded_ranges() {
367 assert_eq!(1..=15, to_headers_range(1..16, 100).unwrap());
368 assert_eq!(1..=15, to_headers_range(1..=15, 100).unwrap());
369 assert_eq!(300..=400, to_headers_range(300..401, 500).unwrap());
370 assert_eq!(300..=400, to_headers_range(300..=400, 500).unwrap());
371 }
372
373 #[test]
374 async fn starts_from_one_when_unbounded_start() {
375 assert_eq!(&1, to_headers_range(..=10, 100).unwrap().start());
376 assert_eq!(&1, to_headers_range(..10, 100).unwrap().start());
377 assert_eq!(&1, to_headers_range(.., 100).unwrap().start());
378 }
379
380 #[test]
381 async fn ends_on_last_index_when_unbounded_end() {
382 assert_eq!(&10, to_headers_range(1.., 10).unwrap().end());
383 assert_eq!(&11, to_headers_range(1.., 11).unwrap().end());
384 assert_eq!(&10, to_headers_range(.., 10).unwrap().end());
385 }
386
387 #[test]
388 async fn handle_ranges_ending_precisely_at_last_index() {
389 let last_index = 10;
390
391 let bounds_ending_at_last_index = [
392 (Bound::Unbounded, Bound::Included(last_index)),
393 (Bound::Unbounded, Bound::Excluded(last_index + 1)),
394 ];
395
396 for bound in bounds_ending_at_last_index {
397 let range = to_headers_range(bound, last_index).unwrap();
398 assert_eq!(*range.end(), last_index);
399 }
400 }
401
402 #[test]
403 async fn handle_ranges_ending_after_last_index() {
404 let last_index = 10;
405
406 let bounds_ending_after_last_index = [
407 (Bound::Unbounded, Bound::Included(last_index + 1)),
408 (Bound::Unbounded, Bound::Excluded(last_index + 2)),
409 ];
410
411 for bound in bounds_ending_after_last_index {
412 to_headers_range(bound, last_index).unwrap_err();
413 }
414 }
415
416 #[test]
417 async fn errors_if_zero_heigth_is_included() {
418 let includes_zero_height = 0..5;
419 to_headers_range(includes_zero_height, 10).unwrap_err();
420 }
421
422 #[test]
423 async fn handle_ranges_starting_precisely_at_last_index() {
424 let last_index = 10;
425
426 let bounds_starting_at_last_index = [
427 (Bound::Included(last_index), Bound::Unbounded),
428 (Bound::Excluded(last_index - 1), Bound::Unbounded),
429 ];
430
431 for bound in bounds_starting_at_last_index {
432 let range = to_headers_range(bound, last_index).unwrap();
433 assert_eq!(*range.start(), last_index);
434 }
435 }
436
437 #[test]
438 async fn handle_ranges_starting_after_last_index() {
439 let last_index = 10;
440
441 let bounds_starting_after_last_index = [
442 (Bound::Included(last_index + 1), Bound::Unbounded),
443 (Bound::Excluded(last_index), Bound::Unbounded),
444 ];
445
446 for bound in bounds_starting_after_last_index {
447 to_headers_range(bound, last_index).unwrap_err();
448 }
449 }
450
451 #[test]
452 async fn handle_ranges_that_lead_to_empty_ranges() {
453 let last_index = 10;
454
455 let bounds_leading_to_empty_range = [
456 (Bound::Unbounded, Bound::Excluded(0)),
457 (Bound::Included(3), Bound::Excluded(3)),
458 (Bound::Included(3), Bound::Included(2)),
459 (Bound::Excluded(2), Bound::Included(2)),
460 ];
461
462 for bound in bounds_leading_to_empty_range {
463 assert!(to_headers_range(bound, last_index).unwrap().is_empty());
464 }
465 }
466
467 #[rstest]
468 #[case::in_memory(new_in_memory_store())]
469 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
470 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
471 #[self::test]
472 async fn test_contains_height<S: Store>(
473 #[case]
474 #[future(awt)]
475 s: S,
476 ) {
477 let mut s = s;
478 fill_store(&mut s, 2).await;
479
480 assert!(!s.has_at(0).await);
481 assert!(s.has_at(1).await);
482 assert!(s.has_at(2).await);
483 assert!(!s.has_at(3).await);
484 }
485
486 #[rstest]
487 #[case::in_memory(new_in_memory_store())]
488 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
489 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
490 #[self::test]
491 async fn test_empty_store<S: Store>(
492 #[case]
493 #[future(awt)]
494 s: S,
495 ) {
496 assert!(matches!(s.head_height().await, Err(StoreError::NotFound)));
497 assert!(matches!(s.get_head().await, Err(StoreError::NotFound)));
498 assert!(matches!(
499 s.get_by_height(1).await,
500 Err(StoreError::NotFound)
501 ));
502 assert!(matches!(
503 s.get_by_hash(&Hash::Sha256([0; 32])).await,
504 Err(StoreError::NotFound)
505 ));
506 }
507
508 #[rstest]
509 #[case::in_memory(new_in_memory_store())]
510 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
511 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
512 #[self::test]
513 async fn test_read_write<S: Store>(
514 #[case]
515 #[future(awt)]
516 s: S,
517 ) {
518 let mut gen = ExtendedHeaderGenerator::new();
519
520 let header = gen.next();
521
522 s.insert(header.clone()).await.unwrap();
523 assert_eq!(s.head_height().await.unwrap(), 1);
524 assert_eq!(s.get_head().await.unwrap(), header);
525 assert_eq!(s.get_by_height(1).await.unwrap(), header);
526 assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
527 }
528
529 #[rstest]
530 #[case::in_memory(new_in_memory_store())]
531 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
532 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
533 #[self::test]
534 async fn test_pregenerated_data<S: Store>(
535 #[case]
536 #[future(awt)]
537 s: S,
538 ) {
539 let mut s = s;
540 fill_store(&mut s, 100).await;
541
542 assert_eq!(s.head_height().await.unwrap(), 100);
543 let head = s.get_head().await.unwrap();
544 assert_eq!(s.get_by_height(100).await.unwrap(), head);
545 assert!(matches!(
546 s.get_by_height(101).await,
547 Err(StoreError::NotFound)
548 ));
549
550 let header = s.get_by_height(54).await.unwrap();
551 assert_eq!(s.get_by_hash(&header.hash()).await.unwrap(), header);
552 }
553
554 #[rstest]
555 #[case::in_memory(new_in_memory_store())]
556 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
557 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
558 #[self::test]
559 async fn test_duplicate_insert<S: Store>(
560 #[case]
561 #[future(awt)]
562 s: S,
563 ) {
564 let mut s = s;
565 let mut gen = fill_store(&mut s, 100).await;
566
567 let header101 = gen.next();
568 s.insert(header101.clone()).await.unwrap();
569
570 let error = match s.insert(header101).await {
571 Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
572 res => panic!("Invalid result: {res:?}"),
573 };
574
575 assert_eq!(
576 error,
577 BlockRangesError::BlockRangeOverlap(101..=101, 101..=101)
578 );
579 }
580
581 #[rstest]
582 #[case::in_memory(new_in_memory_store())]
583 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
584 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
585 #[self::test]
586 async fn test_overwrite_height<S: Store>(
587 #[case]
588 #[future(awt)]
589 s: S,
590 ) {
591 let mut s = s;
592 let gen = fill_store(&mut s, 100).await;
593
594 let header29 = s.get_by_height(29).await.unwrap();
596 let header30 = gen.next_of(&header29);
597
598 let error = match s.insert(header30).await {
599 Err(StoreError::InsertionFailed(StoreInsertionError::ContraintsNotMet(e))) => e,
600 res => panic!("Invalid result: {res:?}"),
601 };
602 assert_eq!(error, BlockRangesError::BlockRangeOverlap(30..=30, 30..=30));
603 }
604
605 #[rstest]
606 #[case::in_memory(new_in_memory_store())]
607 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
608 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
609 #[self::test]
610 async fn test_overwrite_hash<S: Store>(
611 #[case]
612 #[future(awt)]
613 s: S,
614 ) {
615 let mut s = s;
616 fill_store(&mut s, 100).await;
617
618 let mut dup_header = s.get_by_height(99).await.unwrap();
619 dup_header.header.height = Height::from(102u32);
620
621 assert!(matches!(
622 s.insert(dup_header).await,
623 Err(StoreError::InsertionFailed(
624 StoreInsertionError::HashExists(_)
625 ))
626 ));
627 }
628
629 #[rstest]
630 #[case::in_memory(new_in_memory_store())]
631 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
632 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
633 #[self::test]
634 async fn test_append_range<S: Store>(
635 #[case]
636 #[future(awt)]
637 s: S,
638 ) {
639 let mut s = s;
640 let mut gen = fill_store(&mut s, 10).await;
641
642 s.insert(gen.next_many_verified(4)).await.unwrap();
643 s.get_by_height(14).await.unwrap();
644 }
645
646 #[rstest]
647 #[case::in_memory(new_in_memory_store())]
648 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
649 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
650 #[self::test]
651 async fn test_fill_range_gap<S: Store>(
652 #[case]
653 #[future(awt)]
654 s: S,
655 ) {
656 let mut s = s;
657 let mut gen = fill_store(&mut s, 10).await;
658
659 let skipped = gen.next();
661 let upcoming_head = gen.next();
663
664 s.insert(upcoming_head).await.unwrap();
665 s.insert(skipped).await.unwrap();
666 }
667
668 #[rstest]
669 #[case::in_memory(new_in_memory_store())]
670 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
671 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
672 #[self::test]
673 async fn test_fill_range_gap_with_invalid_header<S: Store>(
674 #[case]
675 #[future(awt)]
676 s: S,
677 ) {
678 let mut s = s;
679 let mut gen = fill_store(&mut s, 10).await;
680
681 let mut gen_prime = gen.fork();
682 let _skipped = gen.next();
684 let another_chain = gen_prime.next();
685 let upcoming_head = gen.next();
687
688 s.insert(upcoming_head).await.unwrap();
689 assert!(matches!(
690 s.insert(another_chain).await,
691 Err(StoreError::InsertionFailed(
692 StoreInsertionError::NeighborsVerificationFailed(_)
693 ))
694 ));
695 }
696
697 #[rstest]
698 #[case::in_memory(new_in_memory_store())]
699 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
700 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
701 #[self::test]
702 async fn test_appends_with_gaps<S: Store>(
703 #[case]
704 #[future(awt)]
705 s: S,
706 ) {
707 let mut gen = ExtendedHeaderGenerator::new_from_height(5);
708 let header5 = gen.next();
709 gen.next_many(4);
710 let header10 = gen.next();
711 gen.next_many(4);
712 let header15 = gen.next();
713
714 s.insert(header5).await.unwrap();
715 s.insert(header15).await.unwrap();
716 s.insert(header10).await.unwrap_err();
717 }
718
719 #[rstest]
720 #[case::in_memory(new_in_memory_store())]
721 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
722 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
723 #[self::test]
724 async fn test_sampling_height_empty_store<S: Store>(
725 #[case]
726 #[future(awt)]
727 store: S,
728 ) {
729 store
730 .update_sampling_metadata(0, SamplingStatus::Accepted, vec![])
731 .await
732 .unwrap_err();
733 store
734 .update_sampling_metadata(1, SamplingStatus::Accepted, vec![])
735 .await
736 .unwrap_err();
737 }
738
739 #[rstest]
740 #[case::in_memory(new_in_memory_store())]
741 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
742 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
743 #[self::test]
744 async fn test_sampling_height<S: Store>(
745 #[case]
746 #[future(awt)]
747 s: S,
748 ) {
749 let mut store = s;
750 fill_store(&mut store, 9).await;
751
752 store
753 .update_sampling_metadata(0, SamplingStatus::Accepted, vec![])
754 .await
755 .unwrap_err();
756 store
757 .update_sampling_metadata(1, SamplingStatus::Accepted, vec![])
758 .await
759 .unwrap();
760 store
761 .update_sampling_metadata(2, SamplingStatus::Accepted, vec![])
762 .await
763 .unwrap();
764 store
765 .update_sampling_metadata(3, SamplingStatus::Rejected, vec![])
766 .await
767 .unwrap();
768 store
769 .update_sampling_metadata(4, SamplingStatus::Accepted, vec![])
770 .await
771 .unwrap();
772 store
773 .update_sampling_metadata(5, SamplingStatus::Rejected, vec![])
774 .await
775 .unwrap();
776 store
777 .update_sampling_metadata(6, SamplingStatus::Rejected, vec![])
778 .await
779 .unwrap();
780
781 store
782 .update_sampling_metadata(8, SamplingStatus::Accepted, vec![])
783 .await
784 .unwrap();
785
786 store
787 .update_sampling_metadata(7, SamplingStatus::Accepted, vec![])
788 .await
789 .unwrap();
790
791 store
792 .update_sampling_metadata(9, SamplingStatus::Accepted, vec![])
793 .await
794 .unwrap();
795
796 store
797 .update_sampling_metadata(10, SamplingStatus::Accepted, vec![])
798 .await
799 .unwrap_err();
800 store
801 .update_sampling_metadata(10, SamplingStatus::Rejected, vec![])
802 .await
803 .unwrap_err();
804 store
805 .update_sampling_metadata(20, SamplingStatus::Accepted, vec![])
806 .await
807 .unwrap_err();
808 }
809
810 #[rstest]
811 #[case::in_memory(new_in_memory_store())]
812 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
813 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
814 #[self::test]
815 async fn test_sampling_merge<S: Store>(
816 #[case]
817 #[future(awt)]
818 s: S,
819 ) {
820 let mut store = s;
821 fill_store(&mut store, 1).await;
822
823 let cid0 = "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ"
824 .parse()
825 .unwrap();
826 let cid1 = "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA"
827 .parse()
828 .unwrap();
829 let cid2 = "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq"
830 .parse()
831 .unwrap();
832
833 store
834 .update_sampling_metadata(1, SamplingStatus::Rejected, vec![cid0])
835 .await
836 .unwrap();
837
838 store
839 .update_sampling_metadata(1, SamplingStatus::Rejected, vec![])
840 .await
841 .unwrap();
842
843 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
844 assert_eq!(sampling_data.status, SamplingStatus::Rejected);
845 assert_eq!(sampling_data.cids, vec![cid0]);
846
847 store
848 .update_sampling_metadata(1, SamplingStatus::Accepted, vec![cid1])
849 .await
850 .unwrap();
851
852 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
853 assert_eq!(sampling_data.status, SamplingStatus::Accepted);
854 assert_eq!(sampling_data.cids, vec![cid0, cid1]);
855
856 store
857 .update_sampling_metadata(1, SamplingStatus::Accepted, vec![cid0, cid2])
858 .await
859 .unwrap();
860
861 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
862 assert_eq!(sampling_data.status, SamplingStatus::Accepted);
863 assert_eq!(sampling_data.cids, vec![cid0, cid1, cid2]);
864 }
865
866 #[rstest]
867 #[case::in_memory(new_in_memory_store())]
868 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
869 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
870 #[self::test]
871 async fn test_sampled_cids<S: Store>(
872 #[case]
873 #[future(awt)]
874 s: S,
875 ) {
876 let mut store = s;
877 fill_store(&mut store, 5).await;
878
879 let cids: Vec<Cid> = [
880 "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq",
881 "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi",
882 "zdpuAyvkgEDQm9TenwGkd5eNaosSxjgEYd8QatfPetgB1CdEZ",
883 "zb2rhe5P4gXftAwvA4eXQ5HJwsER2owDyS9sKaQRRVQPn93bA",
884 ]
885 .iter()
886 .map(|s| s.parse().unwrap())
887 .collect();
888
889 store
890 .update_sampling_metadata(1, SamplingStatus::Accepted, cids.clone())
891 .await
892 .unwrap();
893 store
894 .update_sampling_metadata(2, SamplingStatus::Accepted, cids[0..1].to_vec())
895 .await
896 .unwrap();
897 store
898 .update_sampling_metadata(4, SamplingStatus::Rejected, cids[3..].to_vec())
899 .await
900 .unwrap();
901 store
902 .update_sampling_metadata(5, SamplingStatus::Rejected, vec![])
903 .await
904 .unwrap();
905
906 let sampling_data = store.get_sampling_metadata(1).await.unwrap().unwrap();
907 assert_eq!(sampling_data.cids, cids);
908 assert_eq!(sampling_data.status, SamplingStatus::Accepted);
909
910 let sampling_data = store.get_sampling_metadata(2).await.unwrap().unwrap();
911 assert_eq!(sampling_data.cids, cids[0..1]);
912 assert_eq!(sampling_data.status, SamplingStatus::Accepted);
913
914 assert!(store.get_sampling_metadata(3).await.unwrap().is_none());
915
916 let sampling_data = store.get_sampling_metadata(4).await.unwrap().unwrap();
917 assert_eq!(sampling_data.cids, cids[3..]);
918 assert_eq!(sampling_data.status, SamplingStatus::Rejected);
919
920 let sampling_data = store.get_sampling_metadata(5).await.unwrap().unwrap();
921 assert_eq!(sampling_data.cids, vec![]);
922 assert_eq!(sampling_data.status, SamplingStatus::Rejected);
923
924 assert!(matches!(
925 store.get_sampling_metadata(0).await,
926 Err(StoreError::NotFound)
927 ));
928 assert!(matches!(
929 store.get_sampling_metadata(6).await,
930 Err(StoreError::NotFound)
931 ));
932 assert!(matches!(
933 store.get_sampling_metadata(100).await,
934 Err(StoreError::NotFound)
935 ));
936 }
937
938 #[rstest]
939 #[case::in_memory(new_in_memory_store())]
940 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
941 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
942 #[self::test]
943 async fn test_empty_store_range<S: Store>(
944 #[case]
945 #[future(awt)]
946 s: S,
947 ) {
948 let store = s;
949
950 assert_eq!(
951 store.get_stored_header_ranges().await.unwrap().as_ref(),
952 &[]
953 );
954 }
955
956 #[rstest]
957 #[case::in_memory(new_in_memory_store())]
958 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
959 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
960 #[self::test]
961 async fn test_single_header_range<S: Store>(
962 #[case]
963 #[future(awt)]
964 s: S,
965 ) {
966 let store = s;
967 let mut gen = ExtendedHeaderGenerator::new();
968
969 gen.skip(19);
970
971 let prepend0 = gen.next();
972 let prepend1 = gen.next_many_verified(5);
973 store.insert(gen.next_many_verified(4)).await.unwrap();
974 store.insert(gen.next_many_verified(5)).await.unwrap();
975 store.insert(prepend1).await.unwrap();
976 store.insert(prepend0).await.unwrap();
977 store.insert(gen.next_many_verified(5)).await.unwrap();
978 store.insert(gen.next()).await.unwrap();
979
980 let final_ranges = store.get_stored_header_ranges().await.unwrap();
981 assert_eq!(final_ranges.as_ref(), &[20..=40]);
982 }
983
984 #[rstest]
987 #[case::in_memory(new_in_memory_store())]
988 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
989 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
990 #[self::test]
991 async fn test_ranges_consolidation<S: Store>(
992 #[case]
993 #[future(awt)]
994 s: S,
995 ) {
996 let store = s;
997 let mut gen = ExtendedHeaderGenerator::new();
998
999 gen.skip(9);
1000
1001 let skip0 = gen.next_many_verified(5);
1002 store.insert(gen.next_many_verified(2)).await.unwrap();
1003 store.insert(gen.next_many_verified(3)).await.unwrap();
1004
1005 let skip1 = gen.next();
1006 store.insert(gen.next()).await.unwrap();
1007
1008 let skip2 = gen.next_many_verified(5);
1009
1010 store.insert(gen.next()).await.unwrap();
1011
1012 let skip3 = gen.next_many_verified(5);
1013 let skip4 = gen.next_many_verified(5);
1014 let skip5 = gen.next_many_verified(5);
1015
1016 store.insert(skip5).await.unwrap();
1017 store.insert(skip4).await.unwrap();
1018 store.insert(skip3).await.unwrap();
1019 store.insert(skip2).await.unwrap();
1020 store.insert(skip1).await.unwrap();
1021 store.insert(skip0).await.unwrap();
1022
1023 let final_ranges = store.get_stored_header_ranges().await.unwrap();
1024 assert_eq!(final_ranges.as_ref(), &[10..=42]);
1025 }
1026
1027 #[rstest]
1028 #[case::in_memory(new_in_memory_store())]
1029 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1030 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1031 #[self::test]
1032 async fn test_neighbour_validation<S: Store>(
1033 #[case]
1034 #[future(awt)]
1035 s: S,
1036 ) {
1037 let store = s;
1038 let mut gen = ExtendedHeaderGenerator::new();
1039
1040 store.insert(gen.next_many_verified(5)).await.unwrap();
1041 let mut fork = gen.fork();
1042 let _gap = gen.next();
1043 store.insert(gen.next_many_verified(4)).await.unwrap();
1044
1045 store.insert(fork.next()).await.unwrap_err();
1046 }
1047
1048 #[rstest]
1049 #[case::in_memory(new_in_memory_store())]
1050 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1051 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1052 #[self::test]
1053 async fn tail_removal_partial_range<S: Store>(
1054 #[case]
1055 #[future(awt)]
1056 s: S,
1057 ) {
1058 let store = s;
1059 let headers = ExtendedHeaderGenerator::new().next_many(128);
1060
1061 store.insert(&headers[0..64]).await.unwrap();
1062 store.insert(&headers[96..128]).await.unwrap();
1063 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1064
1065 store.remove_height(1).await.unwrap();
1066 assert_store(&store, &headers, new_block_ranges([2..=64, 97..=128])).await;
1067 }
1068
1069 #[rstest]
1070 #[case::in_memory(new_in_memory_store())]
1071 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1072 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1073 #[self::test]
1074 async fn tail_removal_full_range<S: Store>(
1075 #[case]
1076 #[future(awt)]
1077 s: S,
1078 ) {
1079 let store = s;
1080 let headers = ExtendedHeaderGenerator::new().next_many(128);
1081
1082 store.insert(&headers[0..1]).await.unwrap();
1083 store.insert(&headers[65..128]).await.unwrap();
1084 assert_store(&store, &headers, new_block_ranges([1..=1, 66..=128])).await;
1085
1086 store.remove_height(1).await.unwrap();
1087 assert_store(&store, &headers, new_block_ranges([66..=128])).await;
1088 }
1089
1090 #[rstest]
1091 #[case::in_memory(new_in_memory_store())]
1092 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1093 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1094 #[self::test]
1095 async fn tail_removal_remove_all<S: Store>(
1096 #[case]
1097 #[future(awt)]
1098 s: S,
1099 ) {
1100 let store = s;
1101 let headers = ExtendedHeaderGenerator::new().next_many(66);
1102
1103 store.insert(&headers[..]).await.unwrap();
1104 assert_store(&store, &headers, new_block_ranges([1..=66])).await;
1105
1106 for i in 1..=66 {
1107 store.remove_height(i).await.unwrap();
1108 }
1109
1110 let stored_ranges = store.get_stored_header_ranges().await.unwrap();
1111 assert!(stored_ranges.is_empty());
1112
1113 for h in 1..=66 {
1114 assert!(!store.has_at(h).await);
1115 }
1116 }
1117
1118 #[rstest]
1119 #[case::in_memory(new_in_memory_store())]
1120 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1121 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1122 #[self::test]
1123 async fn head_removal_partial_range<S: Store>(
1124 #[case]
1125 #[future(awt)]
1126 s: S,
1127 ) {
1128 let store = s;
1129 let headers = ExtendedHeaderGenerator::new().next_many(128);
1130
1131 store.insert(&headers[0..64]).await.unwrap();
1132 store.insert(&headers[96..128]).await.unwrap();
1133 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1134
1135 store.remove_height(128).await.unwrap();
1136 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=127])).await;
1137 }
1138
1139 #[rstest]
1140 #[case::in_memory(new_in_memory_store())]
1141 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1142 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1143 #[self::test]
1144 async fn head_removal_full_range<S: Store>(
1145 #[case]
1146 #[future(awt)]
1147 s: S,
1148 ) {
1149 let store = s;
1150 let headers = ExtendedHeaderGenerator::new().next_many(128);
1151
1152 store.insert(&headers[0..64]).await.unwrap();
1153 store.insert(&headers[127..128]).await.unwrap();
1154 assert_store(&store, &headers, new_block_ranges([1..=64, 128..=128])).await;
1155
1156 store.remove_height(128).await.unwrap();
1157 assert_store(&store, &headers, new_block_ranges([1..=64])).await;
1158 }
1159
1160 #[rstest]
1161 #[case::in_memory(new_in_memory_store())]
1162 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1163 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1164 #[self::test]
1165 async fn middle_removal<S: Store>(
1166 #[case]
1167 #[future(awt)]
1168 s: S,
1169 ) {
1170 let store = s;
1171 let headers = ExtendedHeaderGenerator::new().next_many(128);
1172
1173 store.insert(&headers[0..64]).await.unwrap();
1174 store.insert(&headers[96..128]).await.unwrap();
1175 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1176
1177 store.remove_height(62).await.unwrap();
1178 assert_store(
1179 &store,
1180 &headers,
1181 new_block_ranges([1..=61, 63..=64, 97..=128]),
1182 )
1183 .await;
1184
1185 store.remove_height(64).await.unwrap();
1186 assert_store(
1187 &store,
1188 &headers,
1189 new_block_ranges([1..=61, 63..=63, 97..=128]),
1190 )
1191 .await;
1192
1193 store.remove_height(63).await.unwrap();
1194 assert_store(&store, &headers, new_block_ranges([1..=61, 97..=128])).await;
1195 }
1196
1197 #[rstest]
1198 #[case::in_memory(new_in_memory_store())]
1199 #[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
1200 #[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
1201 #[self::test]
1202 async fn neighbor_removal<S: Store>(
1203 #[case]
1204 #[future(awt)]
1205 s: S,
1206 ) {
1207 let store = s;
1208 let headers = ExtendedHeaderGenerator::new().next_many(128);
1209
1210 store.insert(&headers[0..64]).await.unwrap();
1211 store.insert(&headers[96..128]).await.unwrap();
1212 assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;
1213
1214 store.remove_height(64).await.unwrap();
1215 assert_store(&store, &headers, new_block_ranges([1..=63, 97..=128])).await;
1216
1217 store.remove_height(97).await.unwrap();
1218 assert_store(&store, &headers, new_block_ranges([1..=63, 98..=128])).await;
1219 }
1220
1221 async fn fill_store<S: Store>(store: &mut S, amount: u64) -> ExtendedHeaderGenerator {
1223 assert!(!store.has_at(1).await, "Store is not empty");
1224
1225 let mut gen = ExtendedHeaderGenerator::new();
1226
1227 store
1228 .insert(gen.next_many_verified(amount))
1229 .await
1230 .expect("inserting test data failed");
1231
1232 gen
1233 }
1234
1235 async fn new_in_memory_store() -> InMemoryStore {
1236 InMemoryStore::new()
1237 }
1238
1239 pub(crate) async fn assert_store<S: Store>(
1240 store: &S,
1241 headers: &[ExtendedHeader],
1242 expected_ranges: BlockRanges,
1243 ) {
1244 assert_eq!(
1245 store.get_stored_header_ranges().await.unwrap(),
1246 expected_ranges
1247 );
1248 for header in headers {
1249 let height = header.height().value();
1250 if expected_ranges.contains(height) {
1251 assert_eq!(&store.get_by_height(height).await.unwrap(), header);
1252 assert_eq!(&store.get_by_hash(&header.hash()).await.unwrap(), header);
1253 } else {
1254 assert!(matches!(
1255 store.get_by_height(height).await.unwrap_err(),
1256 StoreError::NotFound
1257 ));
1258 assert!(matches!(
1259 store.get_by_hash(&header.hash()).await.unwrap_err(),
1260 StoreError::NotFound
1261 ));
1262 }
1263 }
1264 }
1265
1266 #[cfg(not(target_arch = "wasm32"))]
1267 async fn new_redb_store() -> RedbStore {
1268 RedbStore::in_memory().await.unwrap()
1269 }
1270
1271 #[cfg(target_arch = "wasm32")]
1272 async fn new_indexed_db_store() -> IndexedDbStore {
1273 use std::sync::atomic::{AtomicU32, Ordering};
1274 static NEXT_ID: AtomicU32 = AtomicU32::new(0);
1275
1276 let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
1277 let db_name = format!("indexeddb-lumina-node-store-test-{id}");
1278
1279 rexie::Rexie::delete(&db_name).await.unwrap();
1281
1282 IndexedDbStore::new(&db_name)
1283 .await
1284 .expect("creating test store failed")
1285 }
1286}