1use crate::{
3 merkle::{hasher::Standard as StandardHasher, Family, Location},
4 qmdb::{
5 self,
6 sync::{
7 database::Config as _,
8 error::EngineError,
9 requests::{Id as RequestId, Requests},
10 resolver::{FetchResult, Resolver},
11 target::validate_update,
12 Database, DbResolver, Error as SyncError, Journal, Target,
13 },
14 },
15};
16use commonware_codec::Encode;
17use commonware_cryptography::Digest;
18use commonware_macros::select;
19use commonware_runtime::{
20 telemetry::metrics::{Gauge, GaugeExt, MetricsExt},
21 Supervisor as _,
22};
23use commonware_utils::{
24 channel::{
25 fallible::{AsyncFallibleExt, OneshotExt as _},
26 mpsc, oneshot,
27 },
28 NZU64,
29};
30use futures::{
31 future::{pending, Either},
32 StreamExt,
33};
34use mpsc::error::TryRecvError;
35use std::{
36 collections::{BTreeMap, HashMap, VecDeque},
37 fmt::Debug,
38 num::NonZeroU64,
39};
40
41type Error<DB, R> =
43 qmdb::sync::Error<<DB as Database>::Family, <R as Resolver>::Error, <DB as Database>::Digest>;
44
45#[derive(Debug)]
47pub(crate) enum NextStep<C, D> {
48 Continue(C),
50 Complete(D),
52}
53
54#[derive(Debug)]
56enum Event<F: Family, Op, D: Digest, E> {
57 TargetUpdate(Target<F, D>),
59 BatchReceived(IndexedFetchResult<F, Op, D, E>),
61 UpdateChannelClosed,
63 FinishRequested,
65 FinishChannelClosed,
67}
68
69struct ProgressMetrics {
71 journal_size: Gauge,
72 target_end: Gauge,
73}
74
75impl ProgressMetrics {
76 fn new(context: &impl commonware_runtime::Metrics) -> Self {
78 let journal_size = context.gauge("journal_size", "Current sync journal size");
79 let target_end = context.gauge(
80 "target_end",
81 "Exclusive target range end, equal to journal size when sync completes",
82 );
83
84 Self {
85 journal_size,
86 target_end,
87 }
88 }
89
90 fn record(&self, journal_size: u64, target_end: u64) {
92 let _ = self.journal_size.try_set(journal_size);
93 let _ = self.target_end.try_set(target_end);
94 }
95}
96
97#[derive(Debug)]
99pub(super) struct IndexedFetchResult<F: Family, Op, D: Digest, E> {
100 pub id: RequestId,
102 pub result: Result<FetchResult<F, Op, D>, E>,
104}
105
106async fn wait_for_event<F: Family, Op, D: Digest, E>(
109 update_rx: &mut Option<mpsc::Receiver<Target<F, D>>>,
110 finish_rx: &mut Option<mpsc::Receiver<()>>,
111 outstanding_requests: &mut Requests<F, Op, D, E>,
112) -> Option<Event<F, Op, D, E>> {
113 if outstanding_requests.len() == 0 && update_rx.is_none() && finish_rx.is_none() {
114 return None;
115 }
116
117 let target_update_fut = update_rx.as_mut().map_or_else(
118 || Either::Right(pending()),
119 |update_rx| Either::Left(update_rx.recv()),
120 );
121 let finish_fut = finish_rx.as_mut().map_or_else(
122 || Either::Right(pending()),
123 |finish_rx| Either::Left(finish_rx.recv()),
124 );
125 let batch_result_fut = if outstanding_requests.len() == 0 {
126 Either::Right(pending())
127 } else {
128 Either::Left(outstanding_requests.futures_mut().next())
129 };
130
131 select! {
132 finish = finish_fut => finish.map_or_else(
133 || Some(Event::FinishChannelClosed),
134 |_| Some(Event::FinishRequested)
135 ),
136 target = target_update_fut => target.map_or_else(
137 || Some(Event::UpdateChannelClosed),
138 |target| Some(Event::TargetUpdate(target))
139 ),
140 result = batch_result_fut => result.map(|fetch_result| Event::BatchReceived(fetch_result)),
141 }
142}
143
144pub struct Config<DB, R>
146where
147 DB: Database,
148 R: DbResolver<DB>,
149 DB::Op: Encode,
150{
151 pub context: DB::Context,
153 pub resolver: R,
155 pub target: Target<DB::Family, DB::Digest>,
157 pub max_outstanding_requests: usize,
159 pub fetch_batch_size: NonZeroU64,
161 pub apply_batch_size: usize,
163 pub db_config: DB::Config,
165 pub update_rx: Option<mpsc::Receiver<Target<DB::Family, DB::Digest>>>,
167 pub finish_rx: Option<mpsc::Receiver<()>>,
171 pub reached_target_tx: Option<mpsc::Sender<Target<DB::Family, DB::Digest>>>,
178 pub max_retained_roots: usize,
182}
183pub(crate) struct Engine<DB, R>
185where
186 DB: Database,
187 R: DbResolver<DB>,
188 DB::Op: Encode,
189{
190 outstanding_requests: Requests<DB::Family, DB::Op, DB::Digest, R::Error>,
192
193 fetched_operations: BTreeMap<Location<DB::Family>, Vec<DB::Op>>,
199
200 pinned_nodes: Option<Vec<DB::Digest>>,
202
203 retained_roots: HashMap<Location<DB::Family>, DB::Digest>,
209
210 retained_roots_order: VecDeque<Location<DB::Family>>,
213
214 max_retained_roots: usize,
216
217 target: Target<DB::Family, DB::Digest>,
219
220 max_outstanding_requests: usize,
222
223 fetch_batch_size: NonZeroU64,
225
226 apply_batch_size: usize,
228
229 journal: DB::Journal,
231
232 resolver: R,
234
235 hasher: StandardHasher<DB::Hasher>,
237
238 context: DB::Context,
240
241 config: DB::Config,
243
244 update_rx: Option<mpsc::Receiver<Target<DB::Family, DB::Digest>>>,
246
247 finish_rx: Option<mpsc::Receiver<()>>,
251
252 reached_target_tx: Option<mpsc::Sender<Target<DB::Family, DB::Digest>>>,
259
260 progress_metrics: ProgressMetrics,
262
263 finish_requested: bool,
265
266 reached_current_target_reported: bool,
268}
269
270#[cfg(test)]
271impl<DB, R> Engine<DB, R>
272where
273 DB: Database,
274 R: DbResolver<DB>,
275 DB::Op: Encode,
276{
277 pub(crate) fn journal(&self) -> &DB::Journal {
278 &self.journal
279 }
280}
281
282impl<DB, R> Engine<DB, R>
283where
284 DB: Database,
285 R: DbResolver<DB>,
286 DB::Op: Encode,
287{
288 pub async fn new(config: Config<DB, R>) -> Result<Self, Error<DB, R>> {
289 if !config.target.range.end().is_valid() {
290 return Err(SyncError::Engine(EngineError::InvalidTarget {
291 lower_bound_pos: config.target.range.start(),
292 upper_bound_pos: config.target.range.end(),
293 }));
294 }
295
296 let journal = <DB::Journal as Journal<DB::Family>>::new(
298 config.context.child("journal"),
299 config.db_config.journal_config(),
300 config.target.range.clone(),
301 )
302 .await?;
303 let journal_size = journal.size().await;
304
305 let pinned_nodes = if journal_size == *config.target.range.end() {
310 DB::local_boundary_nodes(
311 config.context.child("local_boundary"),
312 &config.db_config,
313 &config.target,
314 &journal,
315 )
316 .await?
317 } else {
318 None
319 };
320
321 let sync_context = config.context.child("sync");
322 let progress_metrics = ProgressMetrics::new(&sync_context);
323 let mut engine = Self {
324 outstanding_requests: Requests::new(),
325 fetched_operations: BTreeMap::new(),
326 pinned_nodes,
327 retained_roots: HashMap::new(),
328 retained_roots_order: VecDeque::new(),
329 max_retained_roots: config.max_retained_roots,
330 target: config.target.clone(),
331 max_outstanding_requests: config.max_outstanding_requests,
332 fetch_batch_size: config.fetch_batch_size,
333 apply_batch_size: config.apply_batch_size,
334 journal,
335 resolver: config.resolver.clone(),
336 hasher: qmdb::hasher::<DB::Hasher>(),
337 context: config.context,
338 config: config.db_config,
339 update_rx: config.update_rx,
340 finish_rx: config.finish_rx,
341 reached_target_tx: config.reached_target_tx,
342 finish_requested: false,
343 reached_current_target_reported: false,
344 progress_metrics,
345 };
346 engine.schedule_requests().await?;
347 engine.record_progress().await;
348 Ok(engine)
349 }
350
351 async fn schedule_requests(&mut self) -> Result<(), Error<DB, R>> {
353 let target_size = self.target.range.end();
354
355 if !self.has_boundary_state()
358 && !self
359 .outstanding_requests
360 .contains(&self.target.range.start())
361 {
362 let start_loc = self.target.range.start();
363 let resolver = self.resolver.clone();
364 let (cancel_tx, cancel_rx) = oneshot::channel();
365 let id = self.outstanding_requests.next_id();
366 self.outstanding_requests.insert(
367 id,
368 start_loc,
369 target_size,
370 cancel_tx,
371 Box::pin(async move {
372 let result = resolver
373 .get_operations(target_size, start_loc, NZU64!(1), true, cancel_rx)
374 .await;
375 IndexedFetchResult { id, result }
376 }),
377 );
378 }
379
380 let num_requests = self
382 .max_outstanding_requests
383 .saturating_sub(self.outstanding_requests.len());
384
385 let log_size = self.journal.size().await;
386
387 for _ in 0..num_requests {
388 let operation_counts: BTreeMap<Location<DB::Family>, u64> = self
390 .fetched_operations
391 .iter()
392 .map(|(&start_loc, operations)| (start_loc, operations.len() as u64))
393 .collect();
394
395 let Some(gap_range) = crate::qmdb::sync::gaps::find_next(
397 Location::new(log_size)..self.target.range.end(),
398 &operation_counts,
399 self.outstanding_requests.locations(),
400 self.fetch_batch_size,
401 ) else {
402 break; };
404
405 let gap_size = *gap_range.end.checked_sub(*gap_range.start).unwrap();
407 let gap_size: NonZeroU64 = gap_size.try_into().unwrap();
408 let batch_size = self.fetch_batch_size.min(gap_size);
409
410 let resolver = self.resolver.clone();
412 let (cancel_tx, cancel_rx) = oneshot::channel();
413 let id = self.outstanding_requests.next_id();
414 self.outstanding_requests.insert(
415 id,
416 gap_range.start,
417 target_size,
418 cancel_tx,
419 Box::pin(async move {
420 let result = resolver
421 .get_operations(target_size, gap_range.start, batch_size, false, cancel_rx)
422 .await;
423 IndexedFetchResult { id, result }
424 }),
425 );
426 }
427
428 Ok(())
429 }
430
431 pub async fn reset_for_target_update(
438 mut self,
439 new_target: Target<DB::Family, DB::Digest>,
440 ) -> Result<Self, Error<DB, R>> {
441 self.journal.resize(new_target.range.start()).await?;
442 self.outstanding_requests
445 .remove_before(new_target.range.start().checked_add(1).unwrap());
446 self.fetched_operations.clear();
447 self.pinned_nodes = None;
448
449 if self.max_retained_roots > 0 {
452 let old_target_size = self.target.range.end();
453 assert!(
454 self.retained_roots
455 .insert(old_target_size, self.target.root)
456 .is_none(),
457 "duplicate retained root for tree size {old_target_size:?}"
458 );
459 self.retained_roots_order.push_back(old_target_size);
460 while self.retained_roots.len() > self.max_retained_roots {
461 if let Some(oldest) = self.retained_roots_order.pop_front() {
462 self.retained_roots.remove(&oldest);
463 }
464 }
465 }
466
467 self.target = new_target;
468 self.reached_current_target_reported = false;
469 Ok(self)
470 }
471
472 fn drain_finish_requests(&mut self) -> Result<(), Error<DB, R>> {
478 let Some(finish_rx) = self.finish_rx.as_mut() else {
479 return Ok(());
480 };
481 match finish_rx.try_recv() {
482 Ok(()) => {
483 self.accept_finish();
484 Ok(())
485 }
486 Err(TryRecvError::Empty) => Ok(()),
487 Err(TryRecvError::Disconnected) => {
488 Err(SyncError::Engine(EngineError::FinishChannelClosed))
489 }
490 }
491 }
492
493 fn accept_finish(&mut self) {
498 self.finish_requested = true;
499 self.finish_rx = None;
500 }
501
502 async fn report_reached_target(&mut self) {
510 if self.reached_current_target_reported {
511 return;
512 }
513 if let Some(sender) = self.reached_target_tx.as_ref() {
514 if !sender.send_lossy(self.target.clone()).await {
515 self.reached_target_tx = None;
516 }
517 }
518 self.reached_current_target_reported = true;
519 }
520
521 async fn record_progress(&mut self) {
523 self.progress_metrics
524 .record(self.journal.size().await, *self.target.range.end());
525 }
526
527 pub(crate) fn store_operations(
529 &mut self,
530 start_loc: Location<DB::Family>,
531 operations: Vec<DB::Op>,
532 ) {
533 if operations.is_empty() {
534 return;
535 }
536 self.fetched_operations.insert(start_loc, operations);
537 }
538
539 pub(crate) async fn apply_operations(&mut self) -> Result<(), Error<DB, R>> {
545 let mut next_loc = self.journal.size().await;
546
547 self.fetched_operations.retain(|&start_loc, operations| {
550 assert!(!operations.is_empty());
551 let end_loc = start_loc.checked_add(operations.len() as u64 - 1).unwrap();
552 end_loc >= next_loc
553 });
554
555 loop {
556 let range_start_loc =
559 self.fetched_operations
560 .iter()
561 .find_map(|(range_start, range_ops)| {
562 assert!(!range_ops.is_empty());
563 let range_end =
564 range_start.checked_add(range_ops.len() as u64 - 1).unwrap();
565 if *range_start <= next_loc && next_loc <= range_end {
566 Some(*range_start)
567 } else {
568 None
569 }
570 });
571
572 let Some(range_start_loc) = range_start_loc else {
573 break;
575 };
576
577 let operations = self.fetched_operations.remove(&range_start_loc).unwrap();
579 assert!(!operations.is_empty());
580 let skip_count = (next_loc - *range_start_loc) as usize;
582 let operations_count = operations.len() - skip_count;
583 let remaining_operations = operations.into_iter().skip(skip_count);
584 next_loc += operations_count as u64;
585 self.apply_operations_batch(remaining_operations).await?;
586 }
587
588 Ok(())
589 }
590
591 async fn apply_operations_batch<I>(&mut self, operations: I) -> Result<(), Error<DB, R>>
593 where
594 I: IntoIterator<Item = DB::Op>,
595 {
596 for op in operations {
597 self.journal.append(op).await?;
598 }
601 Ok(())
602 }
603
604 pub async fn is_at_target(&mut self) -> Result<bool, Error<DB, R>> {
606 let journal_size = self.journal.size().await;
607 let target_journal_size = self.target.range.end();
608
609 if journal_size >= target_journal_size {
611 if journal_size > target_journal_size {
612 return Err(SyncError::Engine(EngineError::InvalidState));
614 }
615 return Ok(true);
616 }
617
618 Ok(false)
619 }
620
621 fn needs_pinned_boundary(&self) -> bool {
623 self.target.range.start() > Location::new(0)
624 }
625
626 fn has_boundary_state(&self) -> bool {
628 !self.needs_pinned_boundary() || self.pinned_nodes.is_some()
629 }
630
631 async fn is_ready_to_complete(&mut self) -> Result<bool, Error<DB, R>> {
633 Ok(self.is_at_target().await? && self.has_boundary_state())
634 }
635
636 fn handle_fetch_result(
643 &mut self,
644 fetch_result: IndexedFetchResult<DB::Family, DB::Op, DB::Digest, R::Error>,
645 ) -> Result<(), Error<DB, R>> {
646 let Some(request) = self.outstanding_requests.remove(fetch_result.id) else {
650 return Ok(());
651 };
652
653 let start_loc = request.start_loc;
654 let FetchResult {
655 proof,
656 operations,
657 pinned_nodes,
658 callback,
659 } = fetch_result.result.map_err(SyncError::Resolver)?;
660
661 let operations_len = operations.len() as u64;
663 if operations_len == 0 || operations_len > self.fetch_batch_size.get() {
664 if let Some(callback) = callback {
667 callback.send_lossy(false);
668 }
669 return Ok(());
670 }
671
672 if proof.leaves != request.target_size {
673 if let Some(callback) = callback {
674 callback.send_lossy(false);
675 }
676 return Ok(());
677 }
678
679 let is_current_target = request.target_size == self.target.range.end();
683 let target_root = if is_current_target {
684 &self.target.root
685 } else {
686 let Some(root) = self.retained_roots.get(&request.target_size) else {
687 return Ok(());
691 };
692 root
693 };
694
695 let need_pinned = is_current_target
698 && self.pinned_nodes.is_none()
699 && start_loc == self.target.range.start();
700 let elements = operations.iter().map(|op| op.encode()).collect::<Vec<_>>();
701 let valid = if need_pinned {
702 let nodes = pinned_nodes.as_deref().unwrap_or(&[]);
703 proof.verify_proof_and_pinned_nodes(
704 &self.hasher,
705 &elements,
706 start_loc,
707 nodes,
708 target_root,
709 )
710 } else {
711 proof.verify_range_inclusion(&self.hasher, &elements, start_loc, target_root)
712 };
713
714 if let Some(callback) = callback {
716 callback.send_lossy(valid);
717 }
718
719 if !valid {
720 if need_pinned {
721 tracing::warn!("boundary proof or pinned nodes failed verification, will retry");
722 }
723 return Ok(());
724 }
725
726 if need_pinned {
728 if let Some(nodes) = pinned_nodes {
729 self.pinned_nodes = Some(nodes);
730 }
731 }
732
733 self.store_operations(start_loc, operations);
735
736 Ok(())
737 }
738
739 async fn handle_event(
741 mut self,
742 event: Event<DB::Family, DB::Op, DB::Digest, R::Error>,
743 ) -> Result<NextStep<Self, DB>, Error<DB, R>> {
744 match event {
745 Event::TargetUpdate(new_target) => {
746 validate_update(&self.target, &new_target)?;
747
748 let mut updated_self = self.reset_for_target_update(new_target).await?;
749 updated_self.record_progress().await;
750 updated_self.schedule_requests().await?;
751 Ok(NextStep::Continue(updated_self))
752 }
753 Event::UpdateChannelClosed => {
754 self.update_rx = None;
755 Ok(NextStep::Continue(self))
756 }
757 Event::FinishRequested => {
758 self.accept_finish();
759 Ok(NextStep::Continue(self))
760 }
761 Event::FinishChannelClosed => Err(SyncError::Engine(EngineError::FinishChannelClosed)),
762 Event::BatchReceived(fetch_result) => {
763 self.handle_fetch_result(fetch_result)?;
764 self.schedule_requests().await?;
765 self.apply_operations().await?;
766 self.record_progress().await;
767 Ok(NextStep::Continue(self))
768 }
769 }
770 }
771
772 pub(crate) async fn step(self) -> Result<NextStep<Self, DB>, Error<DB, R>> {
783 Box::pin(Self::step_inner(self)).await
784 }
785
786 async fn step_inner(mut self) -> Result<NextStep<Self, DB>, Error<DB, R>> {
788 self.drain_finish_requests()?;
789
790 if self.is_ready_to_complete().await? {
792 self.report_reached_target().await;
793
794 if self.finish_rx.is_some() && !self.finish_requested {
795 let event = wait_for_event(
796 &mut self.update_rx,
797 &mut self.finish_rx,
798 &mut self.outstanding_requests,
799 )
800 .await
801 .ok_or(SyncError::Engine(EngineError::SyncStalled))?;
802 return self.handle_event(event).await;
803 }
804
805 self.journal.sync().await?;
806
807 let database = DB::from_sync_result(
809 self.context,
810 self.config,
811 self.journal,
812 self.pinned_nodes,
813 self.target.range.clone(),
814 self.apply_batch_size,
815 )
816 .await?;
817
818 let got_root = database.root();
820 let expected_root = self.target.root;
821 if got_root != expected_root {
822 return Err(SyncError::Engine(EngineError::RootMismatch {
823 expected: expected_root,
824 actual: got_root,
825 }));
826 }
827
828 return Ok(NextStep::Complete(database));
829 }
830
831 let event = wait_for_event(
833 &mut self.update_rx,
834 &mut self.finish_rx,
835 &mut self.outstanding_requests,
836 )
837 .await
838 .ok_or(SyncError::Engine(EngineError::SyncStalled))?;
839 self.handle_event(event).await
840 }
841
842 pub async fn sync(mut self) -> Result<DB, Error<DB, R>> {
847 loop {
849 match self.step().await? {
850 NextStep::Continue(new_engine) => self = new_engine,
851 NextStep::Complete(database) => return Ok(database),
852 }
853 }
854 }
855}
856
857#[cfg(test)]
858mod tests {
859 use super::*;
860 use crate::{
861 merkle::mmr::{Family as MmrFamily, Proof},
862 qmdb::sync::requests::FetchFuture,
863 };
864 use commonware_cryptography::{sha256, Sha256};
865 use commonware_runtime::{deterministic, Runner as _};
866 use commonware_utils::{channel::oneshot, non_empty_range, NZU64};
867 use std::{
868 convert::Infallible,
869 sync::{
870 atomic::{AtomicUsize, Ordering},
871 Arc,
872 },
873 };
874
875 #[derive(Clone)]
876 struct TestConfig {
877 journal_size: u64,
878 boundary_probes: Arc<AtomicUsize>,
879 }
880
881 impl crate::qmdb::sync::DatabaseConfig for TestConfig {
882 type JournalConfig = u64;
883
884 fn journal_config(&self) -> Self::JournalConfig {
885 self.journal_size
886 }
887 }
888
889 struct TestJournal {
890 size: u64,
891 }
892
893 impl Journal<MmrFamily> for TestJournal {
894 type Config = u64;
895 type Context = deterministic::Context;
896 type Error = crate::journal::Error;
897 type Op = i32;
898
899 async fn new(
900 _context: Self::Context,
901 size: Self::Config,
902 _range: commonware_utils::range::NonEmptyRange<Location<MmrFamily>>,
903 ) -> Result<Self, Self::Error> {
904 Ok(Self { size })
905 }
906
907 async fn resize(&mut self, start: Location<MmrFamily>) -> Result<(), Self::Error> {
908 self.size = *start;
909 Ok(())
910 }
911
912 async fn sync(&mut self) -> Result<(), Self::Error> {
913 Ok(())
914 }
915
916 async fn size(&self) -> u64 {
917 self.size
918 }
919
920 async fn append(&mut self, _op: Self::Op) -> Result<(), Self::Error> {
921 self.size += 1;
922 Ok(())
923 }
924 }
925
926 struct TestDb;
927
928 impl Database for TestDb {
929 type Config = TestConfig;
930 type Context = deterministic::Context;
931 type Digest = sha256::Digest;
932 type Family = MmrFamily;
933 type Hasher = Sha256;
934 type Journal = TestJournal;
935 type Op = i32;
936
937 async fn from_sync_result(
938 _context: Self::Context,
939 _config: Self::Config,
940 _journal: Self::Journal,
941 _pinned_nodes: Option<Vec<Self::Digest>>,
942 _range: commonware_utils::range::NonEmptyRange<Location<Self::Family>>,
943 _apply_batch_size: usize,
944 ) -> Result<Self, qmdb::Error<Self::Family>> {
945 Ok(Self)
946 }
947
948 async fn local_boundary_nodes(
949 _context: Self::Context,
950 config: &Self::Config,
951 _target: &Target<Self::Family, Self::Digest>,
952 _journal: &Self::Journal,
953 ) -> Result<Option<Vec<Self::Digest>>, qmdb::Error<Self::Family>> {
954 config.boundary_probes.fetch_add(1, Ordering::SeqCst);
955 Ok(Some(vec![]))
956 }
957
958 fn root(&self) -> Self::Digest {
959 sha256::Digest::from([0u8; 32])
960 }
961 }
962
963 #[derive(Clone)]
964 struct TestResolver;
965
966 impl Resolver for TestResolver {
967 type Digest = sha256::Digest;
968 type Error = Infallible;
969 type Family = MmrFamily;
970 type Op = i32;
971
972 async fn get_operations(
973 &self,
974 _op_count: Location<Self::Family>,
975 _start_loc: Location<Self::Family>,
976 _max_ops: NonZeroU64,
977 _include_pinned_nodes: bool,
978 _cancel_rx: oneshot::Receiver<()>,
979 ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
980 Ok(FetchResult::new(
981 Proof {
982 leaves: Location::new(0),
983 inactive_peaks: 0,
984 digests: vec![],
985 },
986 vec![],
987 None,
988 ))
989 }
990 }
991
992 fn test_engine_config(
993 context: deterministic::Context,
994 journal_size: u64,
995 boundary_probes: Arc<AtomicUsize>,
996 ) -> Config<TestDb, TestResolver> {
997 Config {
998 context,
999 resolver: TestResolver,
1000 target: Target {
1001 root: sha256::Digest::from([1u8; 32]),
1002 range: non_empty_range!(Location::new(5), Location::new(10)),
1003 },
1004 max_outstanding_requests: 1,
1005 fetch_batch_size: NZU64!(1),
1006 apply_batch_size: 1,
1007 db_config: TestConfig {
1008 journal_size,
1009 boundary_probes,
1010 },
1011 update_rx: None,
1012 finish_rx: None,
1013 reached_target_tx: None,
1014 max_retained_roots: 0,
1015 }
1016 }
1017
1018 #[test]
1019 fn new_probes_local_boundary_when_journal_reaches_target() {
1020 deterministic::Runner::default().start(|context| async move {
1021 let boundary_probes = Arc::new(AtomicUsize::new(0));
1022 Engine::new(test_engine_config(context, 10, boundary_probes.clone()))
1023 .await
1024 .unwrap();
1025
1026 assert_eq!(boundary_probes.load(Ordering::SeqCst), 1);
1027 });
1028 }
1029
1030 #[test]
1031 fn new_skips_local_boundary_when_journal_is_partial() {
1032 deterministic::Runner::default().start(|context| async move {
1033 let boundary_probes = Arc::new(AtomicUsize::new(0));
1034 Engine::new(test_engine_config(context, 7, boundary_probes.clone()))
1035 .await
1036 .unwrap();
1037
1038 assert_eq!(boundary_probes.load(Ordering::SeqCst), 0);
1039 });
1040 }
1041
1042 fn dummy_future(id: RequestId) -> FetchFuture<MmrFamily, i32, sha256::Digest, ()> {
1044 Box::pin(async move {
1045 IndexedFetchResult {
1046 id,
1047 result: Ok(FetchResult::new(
1048 Proof {
1049 leaves: Location::new(0),
1050 inactive_peaks: 0,
1051 digests: vec![],
1052 },
1053 vec![],
1054 None,
1055 )),
1056 }
1057 })
1058 }
1059
1060 fn add(requests: &mut Requests<MmrFamily, i32, sha256::Digest, ()>, loc: u64) -> RequestId {
1062 let id = requests.next_id();
1063 requests.insert(
1064 id,
1065 Location::new(loc),
1066 Location::new(loc),
1067 oneshot::channel().0,
1068 dummy_future(id),
1069 );
1070 id
1071 }
1072
1073 #[test]
1074 fn test_add_and_remove() {
1075 let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1076 assert_eq!(requests.len(), 0);
1077
1078 let id = add(&mut requests, 10);
1079 assert_eq!(requests.len(), 1);
1080 assert!(requests.contains(&Location::new(10)));
1081
1082 assert!(requests.remove(id).is_some());
1083 assert!(!requests.contains(&Location::new(10)));
1084 assert!(requests.remove(id).is_none());
1085 }
1086
1087 #[test]
1088 fn test_remove_before() {
1089 let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1090
1091 add(&mut requests, 5);
1092 add(&mut requests, 10);
1093 add(&mut requests, 15);
1094 add(&mut requests, 20);
1095 assert_eq!(requests.len(), 4);
1096
1097 requests.remove_before(Location::new(10));
1098 assert_eq!(requests.len(), 3);
1099 assert!(!requests.contains(&Location::new(5)));
1100 assert!(requests.contains(&Location::new(10)));
1101 assert!(requests.contains(&Location::new(15)));
1102 assert!(requests.contains(&Location::new(20)));
1103 }
1104
1105 #[test]
1106 fn test_remove_before_all() {
1107 let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1108
1109 add(&mut requests, 5);
1110 add(&mut requests, 10);
1111 assert_eq!(requests.len(), 2);
1112
1113 requests.remove_before(Location::new(100));
1114 assert_eq!(requests.len(), 0);
1115 }
1116
1117 #[test]
1118 fn test_remove_before_empty() {
1119 let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1120 requests.remove_before(Location::new(10));
1121 assert_eq!(requests.len(), 0);
1122 }
1123
1124 #[test]
1125 fn test_remove_before_none() {
1126 let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1127
1128 add(&mut requests, 10);
1129 add(&mut requests, 20);
1130 assert_eq!(requests.len(), 2);
1131
1132 requests.remove_before(Location::new(5));
1133 assert_eq!(requests.len(), 2);
1134 assert!(requests.contains(&Location::new(10)));
1135 assert!(requests.contains(&Location::new(20)));
1136 }
1137
1138 #[test]
1139 fn test_superseded_request() {
1140 let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1141
1142 let old_id = add(&mut requests, 10);
1144 assert_eq!(requests.len(), 1);
1145
1146 let new_id = add(&mut requests, 10);
1148 assert_eq!(requests.len(), 1);
1149
1150 assert!(requests.remove(old_id).is_none());
1152
1153 assert!(requests.contains(&Location::new(10)));
1155 assert!(requests.remove(new_id).is_some());
1156 assert!(!requests.contains(&Location::new(10)));
1157 }
1158
1159 #[test]
1160 fn test_stale_id_after_remove_before() {
1161 let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1162
1163 let old_id = add(&mut requests, 5);
1164 add(&mut requests, 15);
1165 requests.remove_before(Location::new(10));
1166
1167 assert!(requests.remove(old_id).is_none());
1169
1170 let new_id = add(&mut requests, 5);
1172 assert_ne!(old_id, new_id);
1173 assert!(requests.remove(new_id).is_some());
1174 }
1175}