1use commonware_consensus::{
77 types::{Height, Round},
78 CertifiableBlock, Epochable, Roundable, Viewable,
79};
80use commonware_cryptography::Digest;
81use commonware_macros::select;
82use commonware_runtime::{reschedule, Metrics, Spawner};
83use commonware_utils::{
84 channel::{fallible::AsyncFallibleExt, mpsc, oneshot, ring},
85 sync::AsyncRwLock,
86};
87use futures::{
88 future::{pending, Either},
89 join,
90};
91use std::{
92 collections::BTreeMap,
93 fmt::Debug,
94 future::Future,
95 num::{NonZeroU64, NonZeroUsize},
96 sync::Arc,
97};
98
99const MAX_CHANNEL_DRAIN_PER_TICK: usize = 32;
100
101pub mod any;
102pub mod current;
103pub mod immutable;
104pub mod keyless;
105pub mod p2p;
106
/// A value that has not been merkleized yet and can be converted into its
/// [`Unmerkleized::Merkleized`] counterpart.
pub trait Unmerkleized: Sized + Send {
    /// The type produced by a successful [`Unmerkleized::merkleize`].
    type Merkleized: Merkleized;

    /// The error reported when merkleization fails.
    type Error: Send;

    /// Consumes `self` and resolves to the merkleized form, or to an error.
    fn merkleize(self) -> impl Future<Output = Result<Self::Merkleized, Self::Error>> + Send;
}
123
/// A merkleized value that exposes a root digest and can hand out a fresh
/// [`Merkleized::Unmerkleized`] batch.
pub trait Merkleized: Sized + Send + Sync {
    /// Digest type returned by [`Merkleized::root`].
    type Digest: Digest;

    /// Un-merkleized type returned by [`Merkleized::new_batch`].
    type Unmerkleized: Unmerkleized;

    /// Returns the root digest of this value.
    fn root(&self) -> Self::Digest;

    /// Creates a new un-merkleized batch from this value.
    ///
    /// NOTE(review): presumably the new batch builds on top of this one (it is
    /// what `DatabaseSet::fork_batches` calls on a parent) — confirm with implementors.
    fn new_batch(&self) -> Self::Unmerkleized;
}
144
/// A single database that can be initialized, batched against, finalized,
/// and rewound, parameterized by the runtime context `E`.
pub trait ManagedDb<E>: Send + Sync + Sized {
    /// Un-merkleized batch type produced by [`ManagedDb::new_batch`].
    type Unmerkleized: Unmerkleized;

    /// Merkleized batch type accepted by [`ManagedDb::finalize`]; forking it
    /// yields [`ManagedDb::Unmerkleized`] again.
    type Merkleized: Merkleized<Unmerkleized = Self::Unmerkleized>;

    /// Error type returned by the fallible operations of this database.
    type Error: Debug + Send;

    /// Configuration consumed by [`ManagedDb::init`].
    type Config: Send;

    /// Value describing a state the database can be at; it is compared with
    /// `==` to decide whether two states are the same.
    type SyncTarget: Clone + PartialEq + Send + Sync;

    /// Initializes the database from `context` and `config`.
    fn init(
        context: E,
        config: Self::Config,
    ) -> impl Future<Output = Result<Self, Self::Error>> + Send;

    /// Creates a new un-merkleized batch from the shared, lock-protected database.
    fn new_batch(db: &Arc<AsyncRwLock<Self>>) -> impl Future<Output = Self::Unmerkleized> + Send;

    /// Returns whether `batch` corresponds to `target`.
    fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool;

    /// Applies `batch` to the database.
    fn finalize(
        &mut self,
        batch: Self::Merkleized,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Returns the sync target describing the database's current state.
    fn sync_target(&self) -> impl Future<Output = Self::SyncTarget> + Send;

    /// Rewinds the database to `target`.
    fn rewind_to_target(
        &mut self,
        target: Self::SyncTarget,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Returns the rewind depth limit of this database, if any.
    ///
    /// The default is `None`; `assert_rewind_window_safety` skips its check in
    /// that case (presumably meaning "no limit" — TODO confirm).
    fn max_rewind_depth() -> Option<usize> {
        None
    }
}
222
/// A set of databases operated on as one unit.
///
/// In this file it is implemented for a single `Arc<AsyncRwLock<T>>` and for
/// tuples of such handles, where every associated type is the tuple of the
/// per-database types.
pub trait DatabaseSet<E>: Clone + Send + Sync + 'static {
    /// Un-merkleized batches, one per database in the set.
    type Unmerkleized: Send;

    /// Merkleized batches, one per database in the set.
    type Merkleized: Send + Sync;

    /// Configuration for every database in the set.
    type Config: Send;

    /// Sync targets for every database in the set; compared with `==`.
    type SyncTargets: Clone + PartialEq + Send + Sync;

    /// Initializes every database in the set.
    ///
    /// The implementations in this file panic if any database fails to initialize.
    fn init(context: E, config: Self::Config) -> impl Future<Output = Self> + Send;

    /// Creates new un-merkleized batches from the databases themselves.
    fn new_batches(&self) -> impl Future<Output = Self::Unmerkleized> + Send;

    /// Creates new un-merkleized batches from the merkleized `parent` batches.
    fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized;

    /// Returns whether `batches` correspond to `targets` for every database.
    fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool;

    /// Applies `batches` to the databases.
    ///
    /// The implementations in this file panic if any database fails to finalize.
    fn finalize(&self, batches: Self::Merkleized) -> impl Future<Output = ()> + Send;

    /// Returns the current sync targets of the databases.
    fn committed_targets(&self) -> impl Future<Output = Self::SyncTargets> + Send;

    /// Rewinds the databases to `targets`.
    ///
    /// The implementations in this file skip databases already at their target
    /// and panic if a rewind fails.
    fn rewind_to_targets(&self, targets: Self::SyncTargets) -> impl Future<Output = ()> + Send;

    /// Returns the rewind depth limit of the set, if any.
    fn max_rewind_depth() -> Option<usize>;
}
284
285pub(crate) fn assert_rewind_window_safety<E, D>(max_pending_acks: NonZeroUsize)
286where
287 D: DatabaseSet<E>,
288{
289 let Some(max_rewind_depth) = D::max_rewind_depth() else {
290 return;
291 };
292
293 assert!(
294 max_pending_acks.get() <= max_rewind_depth,
295 "marshal max_pending_acks={} exceeds database_set.max_rewind_depth={}",
296 max_pending_acks,
297 max_rewind_depth,
298 );
299}
300
/// Tuning knobs handed to [`StateSyncDb::sync_db`].
///
/// NOTE(review): only `update_channel_size` is interpreted in this file; the
/// remaining fields are forwarded untouched, so their descriptions are
/// assumptions based on their names — confirm against the sync engine.
#[derive(Clone, Copy, Debug)]
pub struct SyncEngineConfig {
    /// Presumably the number of items requested per fetch — TODO confirm.
    pub fetch_batch_size: NonZeroU64,

    /// Presumably the number of items applied per batch — TODO confirm.
    pub apply_batch_size: usize,

    /// Presumably the cap on in-flight requests — TODO confirm.
    pub max_outstanding_requests: usize,

    /// Capacity of the channels that carry target updates to a syncing database.
    pub update_channel_size: NonZeroUsize,

    /// Presumably the number of historical roots retained — TODO confirm.
    pub max_retained_roots: usize,
}
320
/// A [`ManagedDb`] that can be constructed by state sync using a resolver `R`.
pub trait StateSyncDb<E, R>: ManagedDb<E> {
    /// Error returned when state sync fails.
    type SyncError: Debug + Send;

    /// Syncs a database towards `target` and resolves to the synced database.
    ///
    /// - `tip_updates` delivers replacement targets while sync is running.
    /// - `finish`, when provided, receives a `()` from the callers in this
    ///   file once they consider sync converged.
    /// - `reached_target`, when provided, is where callers in this file expect
    ///   to be told which target the database has reached.
    ///
    /// NOTE(review): the exact protocol on these channels is defined by the
    /// implementor; the description above reflects how this file uses them.
    #[allow(clippy::too_many_arguments)]
    fn sync_db(
        context: E,
        config: Self::Config,
        resolver: R,
        target: Self::SyncTarget,
        tip_updates: mpsc::Receiver<Self::SyncTarget>,
        finish: Option<mpsc::Receiver<()>>,
        reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
        sync_config: SyncEngineConfig,
    ) -> impl Future<Output = Result<Self, Self::SyncError>> + Send;
}
339
/// Identifies a block by height, round, and digest.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Anchor<D: Digest> {
    /// Height of the block.
    pub height: Height,
    /// Round taken from the block's context.
    pub round: Round,
    /// Digest of the block.
    pub digest: D,
}
351
352impl<B, D> From<&B> for Anchor<D>
353where
354 B: CertifiableBlock<Digest = D>,
355 B::Context: Epochable + Viewable,
356 D: Digest,
357{
358 fn from(block: &B) -> Self {
359 Self {
360 height: block.height(),
361 round: block.context().round(),
362 digest: block.digest(),
363 }
364 }
365}
366
/// A new anchor and its sync targets, delivered to a running state sync.
pub struct TipUpdate<D: Digest, T> {
    /// The anchor this update refers to.
    anchor: Anchor<D>,
    /// The sync targets that go with `anchor`.
    targets: T,
    /// Optional one-shot notified when the update is consumed via `record`.
    observed: Option<oneshot::Sender<()>>,
}
377
378impl<D: Digest, T> TipUpdate<D, T> {
379 pub const fn new(anchor: Anchor<D>, targets: T) -> Self {
380 Self {
381 anchor,
382 targets,
383 observed: None,
384 }
385 }
386
387 pub(crate) fn with_observation(anchor: Anchor<D>, targets: T) -> (Self, oneshot::Receiver<()>) {
388 let (observed, receiver) = oneshot::channel();
389 (
390 Self {
391 anchor,
392 targets,
393 observed: Some(observed),
394 },
395 receiver,
396 )
397 }
398
399 pub(crate) fn record(mut self) -> (Anchor<D>, T) {
400 if let Some(observed) = self.observed.take() {
401 let _ = observed.send(());
402 }
403 (self.anchor, self.targets)
404 }
405}
406
/// A [`DatabaseSet`] that can be constructed by state sync.
///
/// `R` is the resolver (or tuple of resolvers) used to fetch state and `D` is
/// the digest type used by [`Anchor`].
pub trait StateSyncSet<E, R, D>: DatabaseSet<E>
where
    D: Digest,
{
    /// Error returned when state sync fails.
    type Error: Debug + Send;

    /// Syncs the set, starting at `anchor`/`targets` and following
    /// `tip_updates`, and resolves to the synced set plus the anchor the sync
    /// converged on.
    #[allow(clippy::too_many_arguments)]
    fn sync(
        context: E,
        config: Self::Config,
        resolvers: R,
        anchor: Anchor<D>,
        targets: Self::SyncTargets,
        tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
        sync_config: SyncEngineConfig,
    ) -> impl Future<Output = Result<(Self, Anchor<D>), Self::Error>> + Send;
}
432
433impl<E: Send + Sync, T: ManagedDb<E> + 'static> DatabaseSet<E> for Arc<AsyncRwLock<T>> {
435 type Unmerkleized = T::Unmerkleized;
436 type Merkleized = T::Merkleized;
437 type Config = T::Config;
438 type SyncTargets = T::SyncTarget;
439
440 async fn init(context: E, config: Self::Config) -> Self {
441 let db = T::init(context, config)
442 .await
443 .expect("database init failed");
444 Self::new(AsyncRwLock::new(db))
445 }
446
447 async fn new_batches(&self) -> Self::Unmerkleized {
448 T::new_batch(self).await
449 }
450
451 fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized {
452 parent.new_batch()
453 }
454
455 fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool {
456 T::matches_sync_target(batches, targets)
457 }
458
459 async fn finalize(&self, batches: Self::Merkleized) {
460 let mut database = self.write().await;
461 finalize_or_panic(&mut *database, batches, None).await;
462 }
463
464 async fn committed_targets(&self) -> Self::SyncTargets {
465 let database = self.read().await;
466 T::sync_target(&*database).await
467 }
468
469 async fn rewind_to_targets(&self, target: Self::SyncTargets) {
470 let mut database = self.write().await;
471 if T::sync_target(&*database).await == target {
472 return;
473 }
474 rewind_or_panic(&mut *database, target, None).await;
475 }
476
477 fn max_rewind_depth() -> Option<usize> {
478 T::max_rewind_depth()
479 }
480}
481
/// State sync for a single database.
///
/// Runs `T::sync_db` concurrently with a coordinator that forwards newer
/// targets from `tip_updates` and signals `finish` once the database reports
/// having reached the coordinator's current target.
impl<E, T, R, D> StateSyncSet<E, R, D> for Arc<AsyncRwLock<T>>
where
    E: Send + Sync + Metrics,
    T: StateSyncDb<E, R> + 'static,
    R: Send + 'static,
    D: Digest,
{
    type Error = T::SyncError;

    /// Syncs the database and resolves to it together with the anchor the
    /// coordinator converged on.
    ///
    /// # Panics
    ///
    /// Panics if the synced database's `sync_target` differs from the target
    /// the coordinator ended on.
    #[allow(clippy::too_many_arguments)]
    async fn sync(
        context: E,
        config: Self::Config,
        resolver: R,
        anchor: Anchor<D>,
        target: Self::SyncTargets,
        tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
        sync_config: SyncEngineConfig,
    ) -> Result<(Self, Anchor<D>), Self::Error> {
        // Channels between the coordinator and the database sync: new targets,
        // the finish signal, and "reached target" reports.
        let (target_tx, target_rx) = mpsc::channel(sync_config.update_channel_size.get());
        let (finish_tx, finish_rx) = mpsc::channel(1);
        let (reached_tx, mut reached_rx) = mpsc::channel(1);
        let mut current_target = target.clone();
        let sync = T::sync_db(
            context,
            config,
            resolver,
            target,
            target_rx,
            Some(finish_rx),
            Some(reached_tx),
            sync_config,
        );

        let coordinator = async {
            let mut current_anchor = anchor;
            // Becomes `None` once the tip update channel is disconnected.
            let mut tip_updates = Some(tip_updates);
            loop {
                // Apply everything already queued before waiting; a `false`
                // result means the target could not be forwarded.
                if !drain_single_tip_updates(
                    &mut tip_updates,
                    &target_tx,
                    &mut current_anchor,
                    &mut current_target,
                )
                .await
                {
                    return (current_anchor, current_target);
                }

                // Without a live tip channel, only "reached" reports can wake us.
                let update_future = tip_updates.as_mut().map_or_else(
                    || Either::Right(pending()),
                    |updates| Either::Left(updates.recv()),
                );
                select! {
                    reached = reached_rx.recv() => {
                        let Some(reached) = reached else {
                            return (current_anchor, current_target);
                        };
                        // Pick up tips that arrived in the meantime so the
                        // comparison below uses the newest target.
                        if !drain_single_tip_updates(
                            &mut tip_updates,
                            &target_tx,
                            &mut current_anchor,
                            &mut current_target,
                        )
                        .await
                        {
                            return (current_anchor, current_target);
                        };
                        // A report for a superseded target is ignored.
                        if reached != current_target {
                            continue;
                        }
                        let _ = finish_tx.send_lossy(()).await;
                        return (current_anchor, current_target);
                    },
                    update = update_future => {
                        let Some(update) = update else {
                            tip_updates = None;
                            continue;
                        };
                        let (new_anchor, new_target) = update.record();
                        // Only strictly higher anchors are accepted.
                        if new_anchor.height <= current_anchor.height {
                            continue;
                        }
                        current_anchor = new_anchor;
                        // An unchanged target needs no forwarding.
                        if new_target == current_target {
                            continue;
                        }
                        current_target = new_target.clone();
                        if !target_tx.send_lossy(new_target).await {
                            return (current_anchor, current_target);
                        }
                    },
                }
            }
        };

        let (db_result, (converged_anchor, converged_target)) = join!(sync, coordinator);
        let database = db_result?;
        assert!(
            T::sync_target(&database).await == converged_target,
            "state sync database target does not match the coordinator target",
        );
        Ok((Self::new(AsyncRwLock::new(database)), converged_anchor))
    }
}
587
588async fn drain_single_tip_updates<D, T>(
589 tip_updates: &mut Option<ring::Receiver<TipUpdate<D, T>>>,
590 target_tx: &mpsc::Sender<T>,
591 current_anchor: &mut Anchor<D>,
592 current_target: &mut T,
593) -> bool
594where
595 D: Digest,
596 T: Clone + PartialEq + Send + Sync,
597{
598 let mut drained = 0usize;
599 let mut latest = None;
600 loop {
601 let update = match tip_updates.as_mut().map(ring::Receiver::try_recv) {
602 Some(Ok(update)) => update,
603 Some(Err(ring::TryRecvError::Empty)) => break,
604 Some(Err(ring::TryRecvError::Disconnected)) => {
605 *tip_updates = None;
606 break;
607 }
608 None => break,
609 };
610 drained += 1;
611
612 let (new_anchor, new_target) = update.record();
613 if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
614 reschedule().await;
615 }
616
617 let latest_height = latest
618 .as_ref()
619 .map_or(current_anchor.height, |(anchor, _): &(Anchor<D>, T)| {
620 anchor.height
621 });
622 if new_anchor.height <= latest_height {
623 continue;
624 }
625 latest = Some((new_anchor, new_target));
626 }
627
628 let Some((new_anchor, new_target)) = latest else {
629 return true;
630 };
631 *current_anchor = new_anchor;
632 if new_target == *current_target {
633 return true;
634 }
635 *current_target = new_target.clone();
636 target_tx.send_lossy(new_target).await
637}
638
/// Implements [`DatabaseSet`] for a tuple of `Arc<AsyncRwLock<DB>>` handles.
///
/// Each `$T: $idx` pair names a database type parameter and its tuple index.
/// Every associated type is the tuple of the per-database types, and every
/// async operation runs concurrently across the databases via `join!`.
macro_rules! impl_database_set {
    ($($T:ident : $idx:tt),+) => {
        impl<E: Send + Sync + Metrics, $($T: ManagedDb<E> + 'static),+> DatabaseSet<E>
            for ($(Arc<AsyncRwLock<$T>>,)+)
        {
            type Unmerkleized = ($($T::Unmerkleized,)+);
            type Merkleized = ($($T::Merkleized,)+);
            type Config = ($($T::Config,)+);
            type SyncTargets = ($($T::SyncTarget,)+);

            // Initializes every database with its own child context
            // (`db_<idx>`); panics if any of them fails.
            async fn init(context: E, config: Self::Config) -> Self {
                let result = join!($(
                    async {
                        let db = $T::init(
                            context.child(concat!("db_", stringify!($idx))),
                            config.$idx,
                        )
                        .await
                        .expect(concat!(
                            "database init failed (index ",
                            stringify!($idx),
                            ", type ",
                            stringify!($T),
                            ")",
                        ));
                        Arc::new(AsyncRwLock::new(db))
                    },
                )+);
                result
            }

            // Creates one new batch per database.
            async fn new_batches(&self) -> Self::Unmerkleized {
                join!($($T::new_batch(&self.$idx),)+)
            }

            // Forks one new batch per parent batch.
            fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized {
                ($(parent.$idx.new_batch(),)+)
            }

            // True only if every batch matches its target.
            fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool {
                $($T::matches_sync_target(&batches.$idx, &targets.$idx))&&+
            }

            // Applies each batch under its database's write lock; panics on failure.
            async fn finalize(&self, batches: Self::Merkleized) {
                join!($(
                    async {
                        let mut database = self.$idx.write().await;
                        finalize_or_panic(&mut *database, batches.$idx, Some($idx)).await;
                    },
                )+);
            }

            // Reads each database's current sync target under its read lock.
            async fn committed_targets(&self) -> Self::SyncTargets {
                join!($(
                    async {
                        let database = self.$idx.read().await;
                        $T::sync_target(&*database).await
                    },
                )+)
            }

            // Rewinds each database that is not already at its target; panics on failure.
            async fn rewind_to_targets(&self, targets: Self::SyncTargets) {
                join!($(
                    async {
                        let mut database = self.$idx.write().await;
                        if $T::sync_target(&*database).await == targets.$idx {
                            return;
                        }
                        rewind_or_panic(&mut *database, targets.$idx, Some($idx)).await;
                    },
                )+);
            }

            // The set's limit is the smallest limit reported by any database;
            // `None` only if no database reports one.
            fn max_rewind_depth() -> Option<usize> {
                let mut max_rewind_depth: Option<usize> = None;
                $(
                    max_rewind_depth = match (max_rewind_depth, $T::max_rewind_depth()) {
                        (Some(current), Some(next)) => Some(current.min(next)),
                        (Some(current), None) => Some(current),
                        (None, Some(next)) => Some(next),
                        (None, None) => None,
                    };
                )+
                max_rewind_depth
            }
        }
    };
}
729
// `DatabaseSet` for tuples of one through eight databases.
impl_database_set!(DB1: 0);
impl_database_set!(DB1: 0, DB2: 1);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5, DB7: 6);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5, DB7: 6, DB8: 7);
738
/// Both ends of every channel used to drive one database during a
/// multi-database state sync.
struct DbSyncChannels<T> {
    /// Sends new sync targets to the database's sync.
    target_tx: mpsc::Sender<T>,
    /// Receiving end handed to the database's sync as its target updates.
    target_rx: mpsc::Receiver<T>,
    /// Sends the finish signal to the database's sync.
    finish_tx: mpsc::Sender<()>,
    /// Receiving end handed to the database's sync as its finish signal.
    finish_rx: mpsc::Receiver<()>,
    /// Sends `(generation, target)` pairs to the database's forwarding task.
    generation_tx: mpsc::Sender<(usize, T)>,
    /// Receiving end of the `(generation, target)` pairs.
    generation_rx: mpsc::Receiver<(usize, T)>,
    /// Handed to the database's sync so it can report reached targets.
    reached_tx: mpsc::Sender<T>,
    /// Receiving end of the reached-target reports.
    reached_rx: mpsc::Receiver<T>,
}
749
750impl<T> DbSyncChannels<T> {
751 fn new(update_channel_size: usize) -> Self {
752 let (target_tx, target_rx) = mpsc::channel(update_channel_size);
753 let (finish_tx, finish_rx) = mpsc::channel(1);
754 let (generation_tx, generation_rx) = mpsc::channel(update_channel_size);
755 let (reached_tx, reached_rx) = mpsc::channel(1);
756 Self {
757 target_tx,
758 target_rx,
759 finish_tx,
760 finish_rx,
761 generation_tx,
762 generation_rx,
763 reached_tx,
764 reached_rx,
765 }
766 }
767}
768
/// The sending halves the coordinator uses to drive one database's sync.
struct CoordinatorSyncSenders<T> {
    /// Sends new sync targets to the database's sync.
    target_tx: mpsc::Sender<T>,
    /// Sends the finish signal to the database's sync.
    finish_tx: mpsc::Sender<()>,
    /// Sends `(generation, target)` pairs to the database's forwarding task.
    generation_tx: mpsc::Sender<(usize, T)>,
}
774
/// Implements [`StateSyncSet`] for a tuple of `Arc<AsyncRwLock<DB>>` handles.
///
/// Each `$T: $R: $idx` triple names a database type parameter, its resolver
/// type parameter, and its tuple index. The generated `sync`:
///
/// 1. spawns a coordinator task that tracks tip updates and per-database
///    "reached" reports in a [`CoordinatorState`] and dispatches targets
///    tagged with a generation number;
/// 2. spawns one task per database that runs `sync_db` alongside a forwarder
///    translating "reached target" reports into `(index, generation)` events
///    for the coordinator;
/// 3. joins all tasks and returns the first database error, or an error if
///    the coordinator did not converge or the synced databases do not sit at
///    the converged targets.
macro_rules! impl_state_sync_set {
    ($($T:ident : $R:ident : $idx:tt),+) => {
        impl<E, D, $($T, $R),+> StateSyncSet<E, ($($R,)+), D> for ($(Arc<AsyncRwLock<$T>>,)+)
        where
            E: Send + Sync + Spawner + Metrics + 'static,
            D: Digest + 'static,
            $(
                $T: StateSyncDb<E, $R> + 'static,
                $R: Send + 'static,
            )+
        {
            type Error = String;

            #[allow(clippy::too_many_arguments)]
            async fn sync(
                context: E,
                config: Self::Config,
                resolvers: ($($R,)+),
                anchor: Anchor<D>,
                targets: Self::SyncTargets,
                tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
                sync_config: SyncEngineConfig,
            ) -> Result<(Self, Anchor<D>), Self::Error> {
                let db_channels = ($(
                    DbSyncChannels::<<$T as ManagedDb<E>>::SyncTarget>::new(
                        sync_config.update_channel_size.get(),
                    ),
                )+);
                // Clones used by the coordinator for sending.
                let coordinator_senders = ($(
                    CoordinatorSyncSenders {
                        target_tx: db_channels.$idx.target_tx.clone(),
                        finish_tx: db_channels.$idx.finish_tx.clone(),
                        generation_tx: db_channels.$idx.generation_tx.clone(),
                    },
                )+);
                // The original senders are moved into the coordinator task as
                // well and are only dropped explicitly when a database
                // completes before convergence.
                let coordinator_owned_senders = ($(
                    CoordinatorSyncSenders {
                        target_tx: db_channels.$idx.target_tx,
                        finish_tx: db_channels.$idx.finish_tx,
                        generation_tx: db_channels.$idx.generation_tx,
                    },
                )+);
                // `(index, generation)` events from the per-database forwarders.
                let (reached_event_tx, mut reached_event_rx) = mpsc::channel(16);
                // Signalled by a database task when its sync has finished.
                let (completion_tx, mut completion_rx) = mpsc::channel(1);
                let db_count = [$($idx,)+].len();
                let coordinator_targets = targets.clone();
                let initial_targets = targets.clone();
                // Remembers the first database error so it can be returned
                // after all tasks have been joined.
                let first_db_error: Arc<commonware_utils::sync::Mutex<Option<String>>> =
                    Arc::new(commonware_utils::sync::Mutex::new(None));
                let coordinator_handle = context.child("coordinator").spawn({
                    move |_context| async move {
                        let coordinator_owned_senders = coordinator_owned_senders;
                        // Becomes `None` once the tip update channel is disconnected.
                        let mut tip_updates = Some(tip_updates);
                        let mut state = CoordinatorState::new(db_count, anchor, coordinator_targets);
                        // Per-database target most recently sent on `target_tx`
                        // (initially the target each sync was started with).
                        let mut last_dispatched_targets = initial_targets;

                        loop {
                            // Absorb every queued "reached" event.
                            loop {
                                match reached_event_rx.try_recv() {
                                    Ok((idx, generation)) => state.record_reached(idx, generation),
                                    Err(mpsc::error::TryRecvError::Empty) => break,
                                    Err(mpsc::error::TryRecvError::Disconnected) => return None,
                                }
                            }

                            // Absorb every queued tip update.
                            if let Some(updates) = tip_updates.as_mut() {
                                loop {
                                    match updates.try_recv() {
                                        Ok(update) => {
                                            let (anchor, targets) = update.record();
                                            state.record_tip_update(anchor, targets);
                                        }
                                        Err(ring::TryRecvError::Empty) => break,
                                        Err(ring::TryRecvError::Disconnected) => {
                                            tip_updates = None;
                                            break;
                                        }
                                    }
                                }
                            }

                            match state.next_action() {
                                CoordinatorAction::Converged { anchor, targets } => {
                                    // Tell every database sync to finish.
                                    $(
                                        let _ = coordinator_senders.$idx.finish_tx.send_lossy(()).await;
                                    )+
                                    return Some((anchor, targets));
                                }
                                CoordinatorAction::Dispatch {
                                    generation,
                                    targets: dispatch_targets,
                                } => {
                                    $(
                                        let dispatch_target = dispatch_targets.$idx.clone();
                                        // Every forwarder learns the new generation and target.
                                        if !coordinator_senders.$idx
                                            .generation_tx
                                            .send_lossy((generation, dispatch_target.clone()))
                                            .await
                                        {
                                            return None;
                                        }
                                        if state.should_dispatch($idx) {
                                            // Only send the target to the sync when it changed.
                                            if dispatch_target != last_dispatched_targets.$idx {
                                                if !coordinator_senders.$idx
                                                    .target_tx
                                                    .send_lossy(dispatch_target.clone())
                                                    .await
                                                {
                                                    return None;
                                                }
                                                last_dispatched_targets.$idx = dispatch_target;
                                            }
                                        } else if dispatch_target == last_dispatched_targets.$idx {
                                            // Already reached and the target is unchanged:
                                            // carry the reached state over to this generation.
                                            state.mark_reached_same_target($idx, generation);
                                        }
                                    )+
                                    continue;
                                }
                                CoordinatorAction::Wait => {}
                            }

                            // Without a live tip channel, only the other
                            // branches can wake the coordinator.
                            let update_future = tip_updates.as_mut().map_or_else(
                                || Either::Right(pending()),
                                |updates| Either::Left(updates.recv()),
                            );
                            select! {
                                reached_event = reached_event_rx.recv() => {
                                    let (idx, generation) = reached_event?;
                                    state.record_reached(idx, generation);
                                },
                                _ = completion_rx.recv() => {
                                    // A database task ended before convergence.
                                    drop(coordinator_owned_senders);
                                    return None;
                                },
                                update = update_future => {
                                    let Some(update) = update else {
                                        tip_updates = None;
                                        continue;
                                    };
                                    let (anchor, targets) = update.record();
                                    state.record_tip_update(anchor, targets);
                                },
                            };
                        }
                    }
                });
                let db_handles = (
                    $(
                        context.child(concat!("db_", stringify!($idx))).spawn({
                            let first_db_error = first_db_error.clone();
                            let mut reached_target_rx = db_channels.$idx.reached_rx;
                            let mut generation_rx = Some(db_channels.$idx.generation_rx);
                            // Latest generation and target announced by the coordinator.
                            let mut current_generation = 0usize;
                            let mut current_target = targets.$idx.clone();
                            // Latest target the sync reported as reached.
                            let mut last_reached_target = None;
                            // Generation most recently reported to the coordinator.
                            let mut last_reported_generation = None;
                            let reached_event_sender = reached_event_tx.clone();
                            let completion_signal = completion_tx.clone();
                            let config = config.$idx;
                            let resolver = resolvers.$idx;
                            let target = targets.$idx;
                            let target_rx = db_channels.$idx.target_rx;
                            let finish_rx = db_channels.$idx.finish_rx;
                            let reached_tx = db_channels.$idx.reached_tx;
                            move |context| async move {
                                let sync = $T::sync_db(
                                    context,
                                    config,
                                    resolver,
                                    target,
                                    target_rx,
                                    Some(finish_rx),
                                    Some(reached_tx),
                                    sync_config,
                                );
                                // Translates "reached target" reports from the sync
                                // into `(index, generation)` events.
                                let forward_reached = async move {
                                    loop {
                                        drain_generation_updates(
                                            &mut generation_rx,
                                            &mut current_generation,
                                            &mut current_target,
                                            &last_reached_target,
                                            &mut last_reported_generation,
                                            &reached_event_sender,
                                            $idx,
                                        )
                                        .await;

                                        let update_future = generation_rx.as_mut().map_or_else(
                                            || Either::Right(pending()),
                                            |updates| Either::Left(updates.recv()),
                                        );
                                        select! {
                                            reached_target = reached_target_rx.recv() => {
                                                let Some(reached_target) = reached_target else {
                                                    return;
                                                };

                                                last_reached_target = Some(reached_target.clone());
                                                // Pick up generation changes that arrived in
                                                // the meantime before comparing targets.
                                                drain_generation_updates(
                                                    &mut generation_rx,
                                                    &mut current_generation,
                                                    &mut current_target,
                                                    &last_reached_target,
                                                    &mut last_reported_generation,
                                                    &reached_event_sender,
                                                    $idx,
                                                )
                                                .await;

                                                // A report for a superseded target is ignored.
                                                if reached_target != current_target {
                                                    continue;
                                                }

                                                // Report each generation at most once.
                                                if last_reported_generation != Some(current_generation) {
                                                    if !reached_event_sender
                                                        .send_lossy(($idx, current_generation))
                                                        .await
                                                    {
                                                        return;
                                                    }
                                                    last_reported_generation = Some(current_generation);
                                                }
                                            },
                                            update = update_future => {
                                                let Some((generation, target)) = update else {
                                                    generation_rx = None;
                                                    continue;
                                                };
                                                current_generation = generation;
                                                current_target = target;
                                                // The new generation's target may already
                                                // have been reached; report it right away.
                                                if last_reached_target.as_ref() == Some(&current_target)
                                                    && last_reported_generation != Some(current_generation)
                                                {
                                                    if !reached_event_sender
                                                        .send_lossy(($idx, current_generation))
                                                        .await
                                                    {
                                                        return;
                                                    }
                                                    last_reported_generation = Some(current_generation);
                                                }
                                            },
                                        };
                                    }
                                };
                                let (sync_result, _) = join!(sync, forward_reached);
                                let result = sync_result
                                    .map(|database| Arc::new(AsyncRwLock::new(database)))
                                    .map_err(|err| {
                                        format!(
                                            "state sync failed (index {}, db {}): {err:?}",
                                            $idx,
                                            core::any::type_name::<$T>(),
                                        )
                                    });
                                if let Err(err) = &result {
                                    let mut first = first_db_error.lock();
                                    if first.is_none() {
                                        *first = Some(err.clone());
                                    }
                                }
                                // Let the coordinator know this database is done.
                                let _ = completion_signal.send_lossy(()).await;
                                result
                            }
                        }),
                    )+
                );

                let synced = join!(
                    $(
                        async {
                            db_handles.$idx
                                .await
                                .expect("state sync database task exited")
                        },
                    )+
                );
                let converged_anchor = coordinator_handle
                    .await
                    .expect("state sync coordinator task exited");

                // Prefer the first recorded database error over any later one.
                if let Some(err) = first_db_error.lock().take() {
                    return Err(err);
                }

                let synced = ($(synced.$idx?,)+);
                let Some((converged_anchor, converged_targets)) = converged_anchor else {
                    return Err("state sync coordinator did not report a converged anchor".into());
                };
                if <Self as DatabaseSet<E>>::committed_targets(&synced).await != converged_targets {
                    return Err(
                        "state sync database targets do not match the coordinator target set"
                            .into(),
                    );
                }

                Ok((synced, converged_anchor))
            }
        }
    };
}
1077
// `StateSyncSet` for tuples of two through eight databases. A single
// database is handled by the dedicated `Arc<AsyncRwLock<T>>` implementation.
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1);
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2);
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3);
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4);
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4, DB6: R6: 5);
impl_state_sync_set!(
    DB1: R1: 0,
    DB2: R2: 1,
    DB3: R3: 2,
    DB4: R4: 3,
    DB5: R5: 4,
    DB6: R6: 5,
    DB7: R7: 6
);
impl_state_sync_set!(
    DB1: R1: 0,
    DB2: R2: 1,
    DB3: R3: 2,
    DB4: R4: 3,
    DB5: R5: 4,
    DB6: R6: 5,
    DB7: R7: 6,
    DB8: R8: 7
);
1102
1103async fn drain_generation_updates<T>(
1104 generation_rx: &mut Option<mpsc::Receiver<(usize, T)>>,
1105 current_generation: &mut usize,
1106 current_target: &mut T,
1107 last_reached_target: &Option<T>,
1108 last_reported_generation: &mut Option<usize>,
1109 reached_event_sender: &mpsc::Sender<(usize, usize)>,
1110 idx: usize,
1111) where
1112 T: Clone + PartialEq,
1113{
1114 if let Some(updates) = generation_rx.as_mut() {
1115 let mut drained = 0usize;
1116 loop {
1117 match updates.try_recv() {
1118 Ok((generation, target)) => {
1119 drained += 1;
1120 *current_generation = generation;
1121 *current_target = target;
1122
1123 if last_reached_target.as_ref() == Some(current_target)
1124 && *last_reported_generation != Some(*current_generation)
1125 {
1126 if !reached_event_sender
1127 .send_lossy((idx, *current_generation))
1128 .await
1129 {
1130 return;
1131 }
1132 *last_reported_generation = Some(*current_generation);
1133 }
1134 if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
1135 reschedule().await;
1136 }
1137 }
1138 Err(mpsc::error::TryRecvError::Empty) => break,
1139 Err(mpsc::error::TryRecvError::Disconnected) => {
1140 *generation_rx = None;
1141 break;
1142 }
1143 }
1144 }
1145 }
1146}
1147
/// Progress of one database within the coordinator, tagged with the
/// generation the progress refers to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DbSyncState {
    /// The database has not yet reported reaching the generation's target.
    Seeking { generation: usize },
    /// The database has reached the generation's target.
    Reached { generation: usize },
}

impl DbSyncState {
    /// Returns the generation carried by either variant.
    const fn generation(self) -> usize {
        match self {
            Self::Seeking { generation } => generation,
            Self::Reached { generation } => generation,
        }
    }

    /// Returns `true` for [`DbSyncState::Reached`].
    const fn is_reached(self) -> bool {
        match self {
            Self::Reached { .. } => true,
            Self::Seeking { .. } => false,
        }
    }
}
1168
/// What the coordinator should do next, as decided by
/// `CoordinatorState::next_action`.
enum CoordinatorAction<D: Digest, T> {
    /// Nothing to do until another event arrives.
    Wait,
    /// Send `targets`, tagged with `generation`, to the databases.
    Dispatch { generation: usize, targets: T },
    /// Every database reached the same generation and no newer tip is
    /// pending; sync is complete at `anchor` with `targets`.
    Converged { anchor: Anchor<D>, targets: T },
}
1178
/// Bookkeeping used by the multi-database coordinator to decide when to
/// dispatch new targets and when sync has converged.
struct CoordinatorState<D: Digest, T> {
    /// Per-database progress, indexed by the database's tuple position.
    dbs: Vec<DbSyncState>,
    /// Anchor and targets of every generation still referenced by a database.
    generation_state: BTreeMap<usize, (Anchor<D>, T)>,
    /// Highest generation number handed out so far.
    current_generation: usize,
    /// Newest tip update that has not been dispatched yet.
    latest_tip: Option<(Anchor<D>, T)>,
    /// Anchor of the most recently started generation.
    last_dispatched_anchor: Anchor<D>,
}
1191
1192impl<D: Digest, T: Clone> CoordinatorState<D, T> {
1193 fn new(db_count: usize, anchor: Anchor<D>, targets: T) -> Self {
1194 let dbs = vec![DbSyncState::Seeking { generation: 0 }; db_count];
1195 let mut generation_state = BTreeMap::new();
1196 generation_state.insert(0, (anchor, targets));
1197 Self {
1198 dbs,
1199 generation_state,
1200 current_generation: 0,
1201 latest_tip: None,
1202 last_dispatched_anchor: anchor,
1203 }
1204 }
1205
1206 fn record_reached(&mut self, idx: usize, generation: usize) {
1211 if self.dbs[idx].generation() != generation {
1212 return;
1213 }
1214 if self.dbs[idx].is_reached() {
1215 return;
1216 }
1217 self.dbs[idx] = DbSyncState::Reached { generation };
1218 }
1219
1220 fn record_tip_update(&mut self, anchor: Anchor<D>, targets: T) {
1225 let current_height = self
1226 .latest_tip
1227 .as_ref()
1228 .map_or(self.last_dispatched_anchor.height, |(latest_anchor, _)| {
1229 latest_anchor.height
1230 });
1231 if anchor.height <= current_height {
1232 return;
1233 }
1234 self.latest_tip = Some((anchor, targets));
1235 }
1236
1237 fn next_action(&mut self) -> CoordinatorAction<D, T> {
1242 let all_reached = self.dbs.iter().all(|db| db.is_reached());
1243
1244 if all_reached {
1245 let min_gen = self.dbs.iter().map(|db| db.generation()).min().unwrap();
1246 let max_gen = self.dbs.iter().map(|db| db.generation()).max().unwrap();
1247
1248 if min_gen == max_gen {
1249 if let Some((anchor, targets)) = self.latest_tip.take() {
1250 let generation = self.current_generation + 1;
1251 self.current_generation = generation;
1252 for db in &mut self.dbs {
1253 *db = DbSyncState::Seeking { generation };
1254 }
1255 self.generation_state
1256 .insert(generation, (anchor, targets.clone()));
1257 self.last_dispatched_anchor = anchor;
1258 self.prune_generations();
1259 return CoordinatorAction::Dispatch {
1260 generation,
1261 targets,
1262 };
1263 }
1264
1265 let (anchor, targets) = self
1266 .generation_state
1267 .get(&min_gen)
1268 .expect("missing state for converged generation")
1269 .clone();
1270 return CoordinatorAction::Converged { anchor, targets };
1271 }
1272
1273 let (_anchor, targets) = self
1275 .generation_state
1276 .get(&max_gen)
1277 .expect("missing state for regroup generation")
1278 .clone();
1279 for db in &mut self.dbs {
1280 if db.generation() != max_gen {
1281 *db = DbSyncState::Seeking {
1282 generation: max_gen,
1283 };
1284 }
1285 }
1286 self.prune_generations();
1287 return CoordinatorAction::Dispatch {
1288 generation: max_gen,
1289 targets,
1290 };
1291 }
1292
1293 let Some((anchor, targets)) = self.latest_tip.take() else {
1295 return CoordinatorAction::Wait;
1296 };
1297
1298 let generation = self.current_generation + 1;
1299 self.current_generation = generation;
1300 for db in &mut self.dbs {
1301 if !db.is_reached() {
1302 *db = DbSyncState::Seeking { generation };
1303 }
1304 }
1305 self.generation_state
1306 .insert(generation, (anchor, targets.clone()));
1307 self.last_dispatched_anchor = anchor;
1308
1309 self.prune_generations();
1310 CoordinatorAction::Dispatch {
1311 generation,
1312 targets,
1313 }
1314 }
1315
1316 fn prune_generations(&mut self) {
1318 self.generation_state
1319 .retain(|gen, _| self.dbs.iter().any(|db| db.generation() == *gen));
1320 }
1321
1322 fn should_dispatch(&self, idx: usize) -> bool {
1324 !self.dbs[idx].is_reached()
1325 }
1326
1327 fn mark_reached_same_target(&mut self, idx: usize, generation: usize) {
1329 if !self.dbs[idx].is_reached() {
1330 return;
1331 }
1332 self.dbs[idx] = DbSyncState::Reached { generation };
1333 }
1334}
1335
1336async fn finalize_or_panic<E, T: ManagedDb<E>>(
1337 database: &mut T,
1338 batch: T::Merkleized,
1339 index: Option<usize>,
1340) {
1341 if let Err(err) = database.finalize(batch).await {
1344 match index {
1345 Some(index) => panic!(
1346 "database finalize failed (index {index}, type {}): {err:?}",
1347 core::any::type_name::<T>(),
1348 ),
1349 None => panic!(
1350 "database finalize failed (type {}): {err:?}",
1351 core::any::type_name::<T>(),
1352 ),
1353 }
1354 }
1355}
1356
1357async fn rewind_or_panic<E, T: ManagedDb<E>>(
1358 database: &mut T,
1359 target: T::SyncTarget,
1360 index: Option<usize>,
1361) {
1362 if let Err(err) = database.rewind_to_target(target).await {
1365 match index {
1366 Some(index) => panic!(
1367 "database rewind failed (index {index}, type {}): {err:?}",
1368 core::any::type_name::<T>(),
1369 ),
1370 None => panic!(
1371 "database rewind failed (type {}): {err:?}",
1372 core::any::type_name::<T>(),
1373 ),
1374 }
1375 }
1376}
1377
/// A resolver that can be handed a shared, lock-protected database `DB`.
pub trait AttachableResolver<DB>: Clone + Send + Sync + 'static {
    /// Attaches `db` to this resolver.
    ///
    /// NOTE(review): presumably so the resolver can serve requests from the
    /// database afterwards — confirm with implementors.
    fn attach_database(&self, db: Arc<AsyncRwLock<DB>>) -> impl Future<Output = ()> + Send;
}
1386
/// A resolver, or tuple of resolvers, that can be handed the matching
/// database set `DBs`.
pub trait AttachableResolverSet<DBs>: Clone + Send + Sync + 'static {
    /// Attaches every database in `databases` to its resolver.
    fn attach_databases(&self, databases: DBs) -> impl Future<Output = ()> + Send;
}
1392
1393impl<R, DB> AttachableResolverSet<Arc<AsyncRwLock<DB>>> for R
1394where
1395 R: AttachableResolver<DB>,
1396 DB: Send + Sync + 'static,
1397{
1398 async fn attach_databases(&self, db: Arc<AsyncRwLock<DB>>) {
1399 self.attach_database(db).await;
1400 }
1401}
1402
/// Implements [`AttachableResolverSet`] for a tuple of resolvers over the
/// matching tuple of `Arc<AsyncRwLock<DB>>` handles.
///
/// Each `$R: $DB: $idx` triple names a resolver type parameter, its database
/// type parameter, and their shared tuple index.
macro_rules! impl_attachable_resolver_set {
    ($($R:ident : $DB:ident : $idx:tt),+) => {
        impl<$($R, $DB),+> AttachableResolverSet<($(Arc<AsyncRwLock<$DB>>,)+)> for ($($R,)+)
        where
            $(
                $R: AttachableResolver<$DB>,
                $DB: Send + Sync + 'static,
            )+
        {
            // Attaches every database to the resolver at the same index,
            // running all attachments concurrently.
            async fn attach_databases(&self, databases: ($(Arc<AsyncRwLock<$DB>>,)+)) {
                futures::join!($(
                    self.$idx.attach_database(databases.$idx),
                )+);
            }
        }
    };
}
1420
// Tuple implementations of `AttachableResolverSet` for arities 2 through 8.
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3, R5: DB5: 4);
impl_attachable_resolver_set!(
    R1: DB1: 0,
    R2: DB2: 1,
    R3: DB3: 2,
    R4: DB4: 3,
    R5: DB5: 4,
    R6: DB6: 5
);
impl_attachable_resolver_set!(
    R1: DB1: 0,
    R2: DB2: 1,
    R3: DB3: 2,
    R4: DB4: 3,
    R5: DB5: 4,
    R6: DB6: 5,
    R7: DB7: 6
);
impl_attachable_resolver_set!(
    R1: DB1: 0,
    R2: DB2: 1,
    R3: DB3: 2,
    R4: DB4: 3,
    R5: DB5: 4,
    R6: DB6: 5,
    R7: DB7: 6,
    R8: DB8: 7
);
1452
1453#[cfg(test)]
1454mod tests {
1455 use super::{
1456 assert_rewind_window_safety, drain_single_tip_updates, Anchor, AttachableResolver,
1457 AttachableResolverSet, CoordinatorAction, CoordinatorState, DatabaseSet, ManagedDb,
1458 StateSyncDb, StateSyncSet, SyncEngineConfig, TipUpdate, MAX_CHANNEL_DRAIN_PER_TICK,
1459 };
1460 use crate::stateful::tests::mocks::{anchor as mock_anchor, TestMerkleized, TestUnmerkleized};
1461 use commonware_cryptography::sha256;
1462 use commonware_macros::select;
1463 use commonware_runtime::{
1464 deterministic, reschedule, Clock, Runner as _, Spawner as _, Supervisor as _,
1465 };
1466 use commonware_utils::{
1467 channel::{mpsc, oneshot, ring},
1468 sync::AsyncRwLock,
1469 };
1470 use futures::{pin_mut, FutureExt, SinkExt};
1471 use std::{
1472 convert::Infallible,
1473 num::{NonZeroU64, NonZeroUsize},
1474 sync::{
1475 atomic::{AtomicBool, AtomicUsize, Ordering},
1476 Arc,
1477 },
1478 time::Duration,
1479 };
1480
    /// Stateless mock database whose `ManagedDb` operations all succeed and which does not
    /// override `max_rewind_depth`.
    #[derive(Default)]
    struct TestDb;
1483
    /// Stateless mock database whose `ManagedDb::max_rewind_depth` reports `Some(1)`.
    #[derive(Default)]
    struct OneStepRewindDb;
1486
    /// Stateless mock database whose `ManagedDb::max_rewind_depth` reports `Some(3)`.
    #[derive(Default)]
    struct ThreeStepRewindDb;
1489
    /// Mock database that records how many times it has been rewound.
    struct CountingRewindDb {
        // Value returned by `sync_target`; overwritten by each `rewind_to_target` call.
        current_target: u64,
        // Number of `rewind_to_target` calls observed so far.
        rewind_count: usize,
    }
1494
    /// Mock `ManagedDb` for [`TestDb`]: every operation succeeds and the sync target is `()`.
    impl<E: Send> ManagedDb<E> for TestDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = ();

        /// Builds the unit database, ignoring the context and config.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            Ok(Self)
        }

        /// Holds a read guard on `db` for the duration of the call, then returns a fresh batch.
        async fn new_batch(db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            let _guard = db.read().await;
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// The sync target carries no information for this mock.
        async fn sync_target(&self) -> Self::SyncTarget {}

        /// Ignores the target and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
1525
    /// Mock `ManagedDb` for [`OneStepRewindDb`]: every operation succeeds and the rewind depth
    /// is capped at one.
    impl<E: Send> ManagedDb<E> for OneStepRewindDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = ();

        /// Builds the unit database, ignoring the context and config.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            Ok(Self)
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// The sync target carries no information for this mock.
        async fn sync_target(&self) -> Self::SyncTarget {}

        /// Ignores the target and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Reports a finite rewind depth of one.
        fn max_rewind_depth() -> Option<usize> {
            Some(1)
        }
    }
1559
    /// Mock `ManagedDb` for [`ThreeStepRewindDb`]: every operation succeeds and the rewind depth
    /// is capped at three.
    impl<E: Send> ManagedDb<E> for ThreeStepRewindDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = ();

        /// Builds the unit database, ignoring the context and config.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            Ok(Self)
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// The sync target carries no information for this mock.
        async fn sync_target(&self) -> Self::SyncTarget {}

        /// Ignores the target and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Reports a finite rewind depth of three.
        fn max_rewind_depth() -> Option<usize> {
            Some(3)
        }
    }
1593
    /// Mock `ManagedDb` for [`CountingRewindDb`]: tracks the current target and counts rewinds.
    impl<E: Send> ManagedDb<E> for CountingRewindDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; tests build this database by hand.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("CountingRewindDb is constructed directly in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the most recently stored target.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.current_target
        }

        /// Adopts `target` as the current target and bumps the rewind counter.
        async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Self::Error> {
            self.current_target = target;
            self.rewind_count += 1;
            Ok(())
        }
    }
1627
    /// Mock database whose first `finalize` call signals that it started and then waits to be
    /// released.
    struct BlockingFinalizeDb {
        // Fired (and consumed) when `finalize` is first entered.
        started: Option<oneshot::Sender<()>>,
        // Awaited (and consumed) by the first `finalize` call before it returns.
        release: Option<oneshot::Receiver<()>>,
    }
1632
1633 impl BlockingFinalizeDb {
1634 fn new(started: oneshot::Sender<()>, release: oneshot::Receiver<()>) -> Self {
1635 Self {
1636 started: Some(started),
1637 release: Some(release),
1638 }
1639 }
1640 }
1641
    /// Error returned by [`FailingFinalizeDb`]'s `finalize`.
    #[derive(Debug)]
    struct TestFinalizeError;
1644
    /// Stateless mock database whose `finalize` always fails with [`TestFinalizeError`].
    struct FailingFinalizeDb;
1646
    /// Mock database produced by a state sync that stalls until a shared release flag is set.
    struct SlowSyncDb {
        // Target the sync ended on; returned by `sync_target`.
        final_target: u64,
    }
1650
    /// Mock database whose state sync asserts that it is never sent the same target twice in a
    /// row.
    struct RejectDuplicateTargetSyncDb {
        // Target the sync ended on; returned by `sync_target`.
        final_target: u64,
    }
1654
    /// Mock database whose state sync first reports its original target as reached even after
    /// receiving a newer tip update.
    struct StaleReachedSyncDb {
        // Target the sync ended on; returned by `sync_target`.
        final_target: u64,
    }
1658
    /// Mock database produced by a state sync that reports its target immediately.
    struct FastSyncDb {
        // Target the sync ended on; returned by `sync_target`.
        final_target: u64,
    }
1662
    /// Stateless mock database whose state sync returns successfully without touching any
    /// channel.
    struct ImmediateStateSyncDb;
1664
    /// Stateless mock database whose state sync always fails with `TestSyncError`.
    struct FailingStateSyncDb;
1666
    /// Mock database whose state sync finishes on a target different from the one it reported.
    struct MismatchedTargetSyncDb {
        // Target stored by the sync; returned by `sync_target`.
        final_target: u64,
    }
1670
    /// Mock database whose state sync fails if the finish channel closes without a signal.
    struct FinishClosedSyncDb {
        // Target the sync ended on; returned by `sync_target`.
        final_target: u64,
    }
1674
    /// Mock database whose state sync stalls on a `SlowSyncController` and only reports targets
    /// it received as tip updates.
    struct ObservedSlowSyncDb {
        // Target the sync ended on; returned by `sync_target`.
        final_target: u64,
    }
1678
    /// Mock database whose state sync is driven with a `FastSyncObserver` resolver.
    struct ObservedFastSyncDb {
        // Returned by `sync_target`.
        final_target: u64,
    }
1682
    /// Mock database that is a distinct type from `ObservedFastSyncDb` but exposes the same
    /// `final_target` field.
    struct DistinctObservedFastSyncDb {
        // Returned by `sync_target`.
        final_target: u64,
    }
1686
    /// Resolver handle used to hold back `ObservedSlowSyncDb`'s state sync.
    #[derive(Clone)]
    struct SlowSyncController {
        // The sync polls this flag and does not proceed until it is `true`.
        release: Arc<AtomicBool>,
    }
1691
    /// Resolver handle used to observe `ObservedFastSyncDb`'s state sync.
    #[derive(Clone)]
    struct FastSyncObserver {
        // Set to `true` by the sync once it has started.
        ready: Arc<AtomicBool>,
        // NOTE(review): presumably counts tip updates seen by the sync; confirm against the
        // `sync_db` implementation.
        update_count: Arc<AtomicUsize>,
    }
1697
    /// Mock `ManagedDb` for [`FailingFinalizeDb`]: everything succeeds except `finalize`.
    impl<E: Send> ManagedDb<E> for FailingFinalizeDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = TestFinalizeError;
        type Config = ();
        type SyncTarget = ();

        /// Builds the unit database, ignoring the context and config.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            Ok(Self)
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Always fails, so callers' finalize error paths can be exercised.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Err(TestFinalizeError)
        }

        /// The sync target carries no information for this mock.
        async fn sync_target(&self) -> Self::SyncTarget {}

        /// Ignores the target and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
1727
1728 #[test]
1729 fn single_db_set_reports_unbounded_rewind_depth() {
1730 let rewind_depth =
1731 <Arc<AsyncRwLock<TestDb>> as DatabaseSet<deterministic::Context>>::max_rewind_depth();
1732 assert_eq!(rewind_depth, None);
1733 }
1734
1735 #[test]
1736 fn single_db_set_reports_one_step_rewind_depth() {
1737 let rewind_depth = <Arc<AsyncRwLock<OneStepRewindDb>> as DatabaseSet<
1738 deterministic::Context,
1739 >>::max_rewind_depth();
1740 assert_eq!(rewind_depth, Some(1));
1741 }
1742
1743 #[test]
1744 fn tuple_db_set_uses_most_restrictive_finite_rewind_depth() {
1745 type DbSet = (
1746 Arc<AsyncRwLock<TestDb>>,
1747 Arc<AsyncRwLock<ThreeStepRewindDb>>,
1748 Arc<AsyncRwLock<OneStepRewindDb>>,
1749 );
1750
1751 let rewind_depth = <DbSet as DatabaseSet<deterministic::Context>>::max_rewind_depth();
1752 assert_eq!(rewind_depth, Some(1));
1753 }
1754
1755 #[test]
1756 fn rewind_window_assertion_accepts_equal_pending_acks_and_rewind_depth() {
1757 assert_rewind_window_safety::<deterministic::Context, Arc<AsyncRwLock<OneStepRewindDb>>>(
1758 NonZeroUsize::new(1).unwrap(),
1759 );
1760 }
1761
1762 #[test]
1763 #[should_panic(expected = "marshal max_pending_acks=2 exceeds database_set.max_rewind_depth=1")]
1764 fn rewind_window_assertion_panics_when_pending_acks_exceed_rewind_depth() {
1765 assert_rewind_window_safety::<deterministic::Context, Arc<AsyncRwLock<OneStepRewindDb>>>(
1766 NonZeroUsize::new(2).unwrap(),
1767 );
1768 }
1769
1770 #[test]
1771 fn tuple_rewind_to_targets_skips_already_aligned_databases() {
1772 deterministic::Runner::default().start(|_context| async move {
1773 type DbSet = (
1774 Arc<AsyncRwLock<CountingRewindDb>>,
1775 Arc<AsyncRwLock<CountingRewindDb>>,
1776 );
1777
1778 let left = Arc::new(AsyncRwLock::new(CountingRewindDb {
1779 current_target: 2,
1780 rewind_count: 0,
1781 }));
1782 let right = Arc::new(AsyncRwLock::new(CountingRewindDb {
1783 current_target: 1,
1784 rewind_count: 0,
1785 }));
1786 let databases: DbSet = (left.clone(), right.clone());
1787
1788 <DbSet as DatabaseSet<deterministic::Context>>::rewind_to_targets(&databases, (1, 1))
1789 .await;
1790
1791 let left = left.read().await;
1792 assert_eq!(left.current_target, 1);
1793 assert_eq!(left.rewind_count, 1);
1794
1795 let right = right.read().await;
1796 assert_eq!(right.current_target, 1);
1797 assert_eq!(right.rewind_count, 0);
1798 });
1799 }
1800
    /// Mock `ManagedDb` for [`BlockingFinalizeDb`]: the first `finalize` blocks until released.
    impl<E: Send> ManagedDb<E> for BlockingFinalizeDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = ();

        /// Never expected to run; tests build this database by hand.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("BlockingFinalizeDb is constructed directly in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Signals `started`, waits on `release`, then reports success.
        ///
        /// Both channel ends are taken out of `self`, so only the first call signals and waits.
        /// Send and receive errors are ignored.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            if let Some(started) = self.started.take() {
                let _ = started.send(());
            }
            if let Some(release) = self.release.take() {
                let _ = release.await;
            }
            Ok(())
        }

        /// The sync target carries no information for this mock.
        async fn sync_target(&self) -> Self::SyncTarget {}

        /// Ignores the target and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
1836
    /// Mock `ManagedDb` for [`SlowSyncDb`]: operations succeed and `sync_target` echoes
    /// `final_target`.
    impl<E: Send> ManagedDb<E> for SlowSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("SlowSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
1868
    /// Mock `ManagedDb` for [`RejectDuplicateTargetSyncDb`]: operations succeed and
    /// `sync_target` echoes `final_target`.
    impl<E: Send> ManagedDb<E> for RejectDuplicateTargetSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!(
                "RejectDuplicateTargetSyncDb is only constructed through state sync in tests"
            )
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
1902
    /// Mock `ManagedDb` for [`FastSyncDb`]: operations succeed and `sync_target` echoes
    /// `final_target`.
    impl<E: Send> ManagedDb<E> for FastSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("FastSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
1934
    /// Mock `ManagedDb` for [`FailingStateSyncDb`]: operations succeed and the sync target is
    /// always `0`.
    impl<E: Send> ManagedDb<E> for FailingStateSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("FailingStateSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns a fixed target of zero.
        async fn sync_target(&self) -> Self::SyncTarget {
            0
        }

        /// Ignores the target and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
1966
    /// Mock `ManagedDb` for [`MismatchedTargetSyncDb`]: operations succeed and `sync_target`
    /// echoes `final_target`.
    impl<E: Send> ManagedDb<E> for MismatchedTargetSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("MismatchedTargetSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
1998
    /// Mock `ManagedDb` for [`ImmediateStateSyncDb`]: operations succeed and the sync target is
    /// always `0`.
    impl<E: Send> ManagedDb<E> for ImmediateStateSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("ImmediateStateSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns a fixed target of zero.
        async fn sync_target(&self) -> Self::SyncTarget {
            0
        }

        /// Ignores the target and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
2030
    /// Mock `ManagedDb` for [`FinishClosedSyncDb`]: operations succeed and `sync_target` echoes
    /// `final_target`.
    impl<E: Send> ManagedDb<E> for FinishClosedSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("FinishClosedSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
2062
    /// Mock `ManagedDb` for [`ObservedSlowSyncDb`]: operations succeed and `sync_target` echoes
    /// `final_target`.
    impl<E: Send> ManagedDb<E> for ObservedSlowSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("ObservedSlowSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
2094
    /// Mock `ManagedDb` for [`ObservedFastSyncDb`]: operations succeed and `sync_target` echoes
    /// `final_target`.
    impl<E: Send> ManagedDb<E> for ObservedFastSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("ObservedFastSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
2126
    /// Mock `ManagedDb` for [`DistinctObservedFastSyncDb`]: operations succeed and `sync_target`
    /// echoes `final_target`.
    impl<E: Send> ManagedDb<E> for DistinctObservedFastSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!(
                "DistinctObservedFastSyncDb is only constructed through state sync in tests"
            )
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
2160
    /// State sync for [`SlowSyncDb`], using an `Arc<AtomicBool>` as the resolver.
    ///
    /// The sync makes no progress until the flag is set, then repeatedly reports its current
    /// target and adopts tip updates until it is told to finish or runs out of channels.
    impl<E> StateSyncDb<E, Arc<AtomicBool>> for SlowSyncDb
    where
        E: Send + Clock,
    {
        type SyncError = Infallible;

        async fn sync_db(
            context: E,
            _config: Self::Config,
            release: Arc<AtomicBool>,
            target: Self::SyncTarget,
            tip_updates: mpsc::Receiver<Self::SyncTarget>,
            mut finish: Option<mpsc::Receiver<()>>,
            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            // Stall, polling once per millisecond, until the test sets the release flag.
            while !release.load(Ordering::SeqCst) {
                context.sleep(Duration::from_millis(1)).await;
            }
            let mut final_target = target;
            // `None` once the tip-update channel has closed.
            let mut tip_updates = Some(tip_updates);

            loop {
                // Report the current target on every pass; stop if nobody is listening.
                if let Some(reached_target) = reached_target.as_ref() {
                    if reached_target.send(final_target).await.is_err() {
                        break;
                    }
                }

                context.sleep(Duration::from_millis(1)).await;

                // With neither channel available there is nothing left to wait for.
                if finish.is_none() && tip_updates.is_none() {
                    break;
                }

                // A missing channel is replaced by a future that never resolves.
                let finish_signal = finish.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
                );
                let update_signal = tip_updates.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |update_rx| futures::future::Either::Left(update_rx.recv()),
                );

                select! {
                    // Any outcome on the finish channel (signal or closure) ends the sync.
                    _ = finish_signal => {
                        break;
                    },
                    update = update_signal => match update {
                        Some(update) => {
                            final_target = update;
                        }
                        None => {
                            tip_updates = None;
                            if finish.is_none() {
                                break;
                            }
                        }
                    },
                }
            }

            Ok(Self { final_target })
        }
    }
2226
    /// State sync for [`RejectDuplicateTargetSyncDb`], using an `Arc<AtomicBool>` as the
    /// resolver.
    ///
    /// Until the flag is set, the sync polls for tip updates and panics if an update equals the
    /// target it already holds.
    impl<E> StateSyncDb<E, Arc<AtomicBool>> for RejectDuplicateTargetSyncDb
    where
        E: Send + Clock,
    {
        type SyncError = Infallible;

        async fn sync_db(
            context: E,
            _config: Self::Config,
            release: Arc<AtomicBool>,
            target: Self::SyncTarget,
            mut tip_updates: mpsc::Receiver<Self::SyncTarget>,
            mut finish: Option<mpsc::Receiver<()>>,
            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            let mut final_target = target;
            // Poll (at most one update per millisecond) until released or the channel closes.
            while !release.load(Ordering::SeqCst) {
                match tip_updates.try_recv() {
                    Ok(update) => {
                        assert_ne!(
                            update, final_target,
                            "state sync must not send duplicate target updates"
                        );
                        final_target = update;
                    }
                    Err(mpsc::error::TryRecvError::Empty) => {}
                    Err(mpsc::error::TryRecvError::Disconnected) => break,
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Report the final target once, then wait for the finish channel to signal or
            // close. Errors on either channel are ignored.
            if let Some(reached_target) = reached_target.as_ref() {
                let _ = reached_target.send(final_target).await;
            }
            if let Some(finish_rx) = finish.as_mut() {
                let _ = finish_rx.recv().await;
            }

            Ok(Self { final_target })
        }
    }
2269
    /// Mock `ManagedDb` for [`StaleReachedSyncDb`]: operations succeed and `sync_target` echoes
    /// `final_target`.
    impl<E: Send> ManagedDb<E> for StaleReachedSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        /// Never expected to run; this database only comes out of state sync.
        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
            unreachable!("StaleReachedSyncDb is only constructed through state sync in tests")
        }

        /// Returns a fresh batch without touching the database handle.
        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        /// Treats every batch as matching every target.
        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        /// Discards the batch and reports success.
        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        /// Returns the target stored when the database was built.
        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        /// Ignores the target (the stored one is left unchanged) and reports success.
        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }
2301
    /// State sync for [`StaleReachedSyncDb`].
    ///
    /// After receiving one tip update the sync still reports the *original* target as reached.
    /// If no finish signal arrives within 10ms it then reports the updated target instead.
    impl<E> StateSyncDb<E, ()> for StaleReachedSyncDb
    where
        E: Send + Clock,
    {
        type SyncError = Infallible;

        async fn sync_db(
            context: E,
            _config: Self::Config,
            _resolver: (),
            target: Self::SyncTarget,
            mut tip_updates: mpsc::Receiver<Self::SyncTarget>,
            mut finish: Option<mpsc::Receiver<()>>,
            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            // Panics if the tip-update channel closes before delivering anything.
            let update = tip_updates.recv().await.expect("expected forwarded tip");
            // Deliberately report the stale, pre-update target.
            if let Some(reached_target) = reached_target.as_ref() {
                let _ = reached_target.send(target).await;
            }

            // A missing finish channel is replaced by a future that never resolves.
            let finish_signal = finish.as_mut().map_or_else(
                || futures::future::Either::Right(futures::future::pending()),
                |finish_rx| futures::future::Either::Left(finish_rx.recv()),
            );
            select! {
                // Finished on the strength of the stale report: end on the original target.
                _ = finish_signal => Ok(Self {
                    final_target: target
                }),
                // Not finished in time: report the updated target and wait for finish again.
                _ = context.sleep(Duration::from_millis(10)) => {
                    if let Some(reached_target) = reached_target.as_ref() {
                        let _ = reached_target.send(update).await;
                    }
                    if let Some(finish_rx) = finish.as_mut() {
                        let _ = finish_rx.recv().await;
                    }
                    Ok(Self {
                        final_target: update,
                    })
                },
            }
        }
    }
2345
    /// State sync for [`FastSyncDb`], using an `Arc<AtomicBool>` as the resolver.
    ///
    /// The sync sets the flag as soon as it starts, then repeatedly reports its current target
    /// and adopts tip updates until it is told to finish or runs out of channels.
    impl<E: Send> StateSyncDb<E, Arc<AtomicBool>> for FastSyncDb {
        type SyncError = Infallible;

        async fn sync_db(
            _context: E,
            _config: Self::Config,
            done: Arc<AtomicBool>,
            target: Self::SyncTarget,
            tip_updates: mpsc::Receiver<Self::SyncTarget>,
            mut finish: Option<mpsc::Receiver<()>>,
            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            // Let the test observe that this sync has started.
            done.store(true, Ordering::SeqCst);
            let mut final_target = target;
            // `None` once the tip-update channel has closed.
            let mut tip_updates = Some(tip_updates);

            loop {
                // Report the current target on every pass; stop if nobody is listening.
                if let Some(reached_target) = reached_target.as_ref() {
                    if reached_target.send(final_target).await.is_err() {
                        break;
                    }
                }

                // With neither channel available there is nothing left to wait for.
                if finish.is_none() && tip_updates.is_none() {
                    break;
                }

                // A missing channel is replaced by a future that never resolves.
                let finish_signal = finish.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
                );
                let update_signal = tip_updates.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |update_rx| futures::future::Either::Left(update_rx.recv()),
                );

                select! {
                    // Any outcome on the finish channel (signal or closure) ends the sync.
                    _ = finish_signal => {
                        break;
                    },
                    update = update_signal => match update {
                        Some(update) => {
                            final_target = update;
                        }
                        None => {
                            tip_updates = None;
                            if finish.is_none() {
                                break;
                            }
                        }
                    },
                }
            }

            Ok(Self { final_target })
        }
    }
2404
    /// Error returned by `FailingStateSyncDb`'s state sync.
    #[derive(Debug)]
    struct TestSyncError;
2407
    /// Error returned by `FinishClosedSyncDb`'s state sync when the finish channel closes
    /// without delivering a signal.
    #[derive(Debug)]
    struct FinishClosedSyncError;
2410
    /// State sync for [`FailingStateSyncDb`]: fails immediately without using any argument.
    impl<E: Send> StateSyncDb<E, ()> for FailingStateSyncDb {
        type SyncError = TestSyncError;

        /// Always returns `Err(TestSyncError)`.
        async fn sync_db(
            _context: E,
            _config: Self::Config,
            _resolver: (),
            _target: Self::SyncTarget,
            _tip_updates: mpsc::Receiver<Self::SyncTarget>,
            _finish: Option<mpsc::Receiver<()>>,
            _reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            Err(TestSyncError)
        }
    }
2427
    /// State sync for [`ImmediateStateSyncDb`]: succeeds immediately without using any argument.
    impl<E: Send> StateSyncDb<E, ()> for ImmediateStateSyncDb {
        type SyncError = Infallible;

        /// Returns the database straight away; no target is reported and no channel is read.
        async fn sync_db(
            _context: E,
            _config: Self::Config,
            _resolver: (),
            _target: Self::SyncTarget,
            _tip_updates: mpsc::Receiver<Self::SyncTarget>,
            _finish: Option<mpsc::Receiver<()>>,
            _reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            Ok(Self)
        }
    }
2444
2445 impl<E: Send> StateSyncDb<E, ()> for MismatchedTargetSyncDb {
2446 type SyncError = Infallible;
2447
2448 async fn sync_db(
2449 _context: E,
2450 _config: Self::Config,
2451 _resolver: (),
2452 target: Self::SyncTarget,
2453 _tip_updates: mpsc::Receiver<Self::SyncTarget>,
2454 mut finish: Option<mpsc::Receiver<()>>,
2455 reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2456 _sync_config: SyncEngineConfig,
2457 ) -> Result<Self, Self::SyncError> {
2458 if let Some(reached_target) = reached_target.as_ref() {
2459 let _ = reached_target.send(target).await;
2460 }
2461 if let Some(finish_rx) = finish.as_mut() {
2462 let _ = finish_rx.recv().await;
2463 }
2464 Ok(Self {
2465 final_target: target + 1,
2466 })
2467 }
2468 }
2469
2470 impl<E: Send> StateSyncDb<E, ()> for FinishClosedSyncDb {
2471 type SyncError = FinishClosedSyncError;
2472
2473 async fn sync_db(
2474 _context: E,
2475 _config: Self::Config,
2476 _resolver: (),
2477 target: Self::SyncTarget,
2478 _tip_updates: mpsc::Receiver<Self::SyncTarget>,
2479 mut finish: Option<mpsc::Receiver<()>>,
2480 _reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2481 _sync_config: SyncEngineConfig,
2482 ) -> Result<Self, Self::SyncError> {
2483 let Some(finish_rx) = finish.as_mut() else {
2484 panic!("finish receiver should be provided");
2485 };
2486 match finish_rx.recv().await {
2487 Some(()) => Ok(Self {
2488 final_target: target,
2489 }),
2490 None => Err(FinishClosedSyncError),
2491 }
2492 }
2493 }
2494
    /// State sync for [`ObservedSlowSyncDb`], using a [`SlowSyncController`] as the resolver.
    ///
    /// The sync stalls until the controller's flag is set. It then drains pending tip updates
    /// and reports a target as reached only after at least one update has been observed, and
    /// only once per distinct latest target.
    impl<E> StateSyncDb<E, SlowSyncController> for ObservedSlowSyncDb
    where
        E: Send + Clock,
    {
        type SyncError = Infallible;

        async fn sync_db(
            context: E,
            _config: Self::Config,
            controller: SlowSyncController,
            target: Self::SyncTarget,
            tip_updates: mpsc::Receiver<Self::SyncTarget>,
            mut finish: Option<mpsc::Receiver<()>>,
            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            // Stall, polling once per millisecond, until the test sets the release flag.
            while !controller.release.load(Ordering::SeqCst) {
                context.sleep(Duration::from_millis(1)).await;
            }

            let mut final_target = target;
            // `None` once the tip-update channel has closed.
            let mut tip_updates = Some(tip_updates);
            // Last target sent on `reached_target`; cleared whenever a new update arrives.
            let mut reported_target = None;
            // Whether any tip update has been received yet.
            let mut observed_update = false;
            loop {
                // Drain everything already queued, keeping only the most recent update.
                if let Some(update_rx) = tip_updates.as_mut() {
                    let mut drained = 0usize;
                    loop {
                        match update_rx.try_recv() {
                            Ok(update) => {
                                drained += 1;
                                final_target = update;
                                observed_update = true;
                                reported_target = None;
                                // Yield to the runtime every MAX_CHANNEL_DRAIN_PER_TICK updates.
                                if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
                                    reschedule().await;
                                }
                            }
                            Err(mpsc::error::TryRecvError::Empty) => {
                                break;
                            }
                            Err(mpsc::error::TryRecvError::Disconnected) => {
                                tip_updates = None;
                                break;
                            }
                        }
                    }
                }

                // Report only once an update has been seen and the latest target is unreported.
                if observed_update && reported_target != Some(final_target) {
                    if let Some(reached_target) = reached_target.as_ref() {
                        if reached_target.send(final_target).await.is_err() {
                            break;
                        }
                    }
                    reported_target = Some(final_target);
                }

                // With neither channel available there is nothing left to wait for.
                if finish.is_none() && tip_updates.is_none() {
                    break;
                }

                // A missing channel is replaced by a future that never resolves.
                let finish_signal = finish.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
                );
                let update_signal = tip_updates.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |update_rx| futures::future::Either::Left(update_rx.recv()),
                );

                select! {
                    // Any outcome on the finish channel (signal or closure) ends the sync.
                    _ = finish_signal => {
                        break;
                    },
                    update = update_signal => match update {
                        Some(update) => {
                            final_target = update;
                            observed_update = true;
                            reported_target = None;
                        }
                        None => {
                            tip_updates = None;
                            if finish.is_none() {
                                break;
                            }
                        }
                    },
                }
            }

            Ok(Self { final_target })
        }
    }
2589
2590 impl<E: Send> StateSyncDb<E, FastSyncObserver> for ObservedFastSyncDb {
2591 type SyncError = Infallible;
2592
2593 async fn sync_db(
2594 _context: E,
2595 _config: Self::Config,
2596 observer: FastSyncObserver,
2597 target: Self::SyncTarget,
2598 tip_updates: mpsc::Receiver<Self::SyncTarget>,
2599 mut finish: Option<mpsc::Receiver<()>>,
2600 reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2601 _sync_config: SyncEngineConfig,
2602 ) -> Result<Self, Self::SyncError> {
2603 let mut final_target = target;
2604 let mut tip_updates = Some(tip_updates);
2605 let mut reported_target = None;
2606 observer.ready.store(true, Ordering::SeqCst);
2607
2608 loop {
2609 if reported_target != Some(final_target) {
2610 if let Some(reached_target) = reached_target.as_ref() {
2611 if reached_target.send(final_target).await.is_err() {
2612 break;
2613 }
2614 }
2615 reported_target = Some(final_target);
2616 }
2617
2618 if finish.is_none() && tip_updates.is_none() {
2619 break;
2620 }
2621
2622 let finish_signal = finish.as_mut().map_or_else(
2623 || futures::future::Either::Right(futures::future::pending()),
2624 |finish_rx| futures::future::Either::Left(finish_rx.recv()),
2625 );
2626 let update_signal = tip_updates.as_mut().map_or_else(
2627 || futures::future::Either::Right(futures::future::pending()),
2628 |update_rx| futures::future::Either::Left(update_rx.recv()),
2629 );
2630
2631 select! {
2632 _ = finish_signal => {
2633 break;
2634 },
2635 update = update_signal => match update {
2636 Some(update) => {
2637 observer.update_count.fetch_add(1, Ordering::SeqCst);
2638 final_target = update;
2639 reported_target = None;
2640 }
2641 None => {
2642 tip_updates = None;
2643 if finish.is_none() {
2644 break;
2645 }
2646 }
2647 },
2648 }
2649 }
2650
2651 Ok(Self { final_target })
2652 }
2653 }
2654
2655 impl<E: Send> StateSyncDb<E, FastSyncObserver> for DistinctObservedFastSyncDb {
2656 type SyncError = Infallible;
2657
2658 async fn sync_db(
2659 _context: E,
2660 _config: Self::Config,
2661 observer: FastSyncObserver,
2662 target: Self::SyncTarget,
2663 tip_updates: mpsc::Receiver<Self::SyncTarget>,
2664 mut finish: Option<mpsc::Receiver<()>>,
2665 reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2666 _sync_config: SyncEngineConfig,
2667 ) -> Result<Self, Self::SyncError> {
2668 let mut final_target = target;
2669 let mut tip_updates = Some(tip_updates);
2670 let mut reported_target = None;
2671 observer.ready.store(true, Ordering::SeqCst);
2672
2673 loop {
2674 if reported_target != Some(final_target) {
2675 if let Some(reached_target) = reached_target.as_ref() {
2676 if reached_target.send(final_target).await.is_err() {
2677 break;
2678 }
2679 }
2680 reported_target = Some(final_target);
2681 }
2682
2683 if finish.is_none() && tip_updates.is_none() {
2684 break;
2685 }
2686
2687 let finish_signal = finish.as_mut().map_or_else(
2688 || futures::future::Either::Right(futures::future::pending()),
2689 |finish_rx| futures::future::Either::Left(finish_rx.recv()),
2690 );
2691 let update_signal = tip_updates.as_mut().map_or_else(
2692 || futures::future::Either::Right(futures::future::pending()),
2693 |update_rx| futures::future::Either::Left(update_rx.recv()),
2694 );
2695
2696 select! {
2697 _ = finish_signal => {
2698 break;
2699 },
2700 update = update_signal => match update {
2701 Some(update) => {
2702 observer.update_count.fetch_add(1, Ordering::SeqCst);
2703 if update != final_target {
2704 final_target = update;
2705 reported_target = None;
2706 }
2707 }
2708 None => {
2709 tip_updates = None;
2710 if finish.is_none() {
2711 break;
2712 }
2713 }
2714 },
2715 }
2716 }
2717
2718 Ok(Self { final_target })
2719 }
2720 }
2721
/// Checks that `new_batches` on a two-database tuple stays pending while both write locks are
/// held, and that once the second lock is released a fresh writer on it cannot acquire
/// immediately, i.e. the tuple has already queued a read on every database.
#[test]
fn tuple_new_batches_queues_reads_concurrently() {
    type DbPair = (Arc<AsyncRwLock<TestDb>>, Arc<AsyncRwLock<TestDb>>);

    deterministic::Runner::default().start(|_context| async move {
        let first = Arc::new(AsyncRwLock::new(TestDb));
        let second = Arc::new(AsyncRwLock::new(TestDb));
        let databases: DbPair = (Arc::clone(&first), Arc::clone(&second));

        // Hold both write locks so no read can be granted yet.
        let first_writer = first.write().await;
        let second_writer = second.write().await;

        let new_batches =
            <DbPair as DatabaseSet<deterministic::Context>>::new_batches(&databases);
        pin_mut!(new_batches);
        assert!(new_batches.as_mut().now_or_never().is_none());

        drop(second_writer);
        {
            let writer2_again = second.write();
            pin_mut!(writer2_again);
            assert!(
                writer2_again.as_mut().now_or_never().is_none(),
                "tuple new_batches should queue reads for all databases concurrently"
            );
        }

        drop(first_writer);
        let _ = new_batches.await;
    });
}
2753
/// Checks that a single poll of tuple `finalize` delivers the "started" signal of both
/// blocking databases before either of them is released.
#[test]
fn tuple_finalize_runs_databases_in_parallel() {
    type BlockingPair = (
        Arc<AsyncRwLock<BlockingFinalizeDb>>,
        Arc<AsyncRwLock<BlockingFinalizeDb>>,
    );

    deterministic::Runner::default().start(|_context| async move {
        let (started1_tx, started1) = oneshot::channel();
        let (started2_tx, started2) = oneshot::channel();
        let (release1_tx, release1_rx) = oneshot::channel();
        let (release2_tx, release2_rx) = oneshot::channel();

        let first_db = BlockingFinalizeDb::new(started1_tx, release1_rx);
        let second_db = BlockingFinalizeDb::new(started2_tx, release2_rx);
        let databases: BlockingPair = (
            Arc::new(AsyncRwLock::new(first_db)),
            Arc::new(AsyncRwLock::new(second_db)),
        );

        let finalize = <BlockingPair as DatabaseSet<deterministic::Context>>::finalize(
            &databases,
            (TestMerkleized, TestMerkleized),
        );
        pin_mut!(finalize);
        // One poll must not complete: both databases are still waiting for their release.
        assert!(finalize.as_mut().now_or_never().is_none());

        pin_mut!(started1);
        pin_mut!(started2);
        assert!(matches!(started1.as_mut().now_or_never(), Some(Ok(()))));
        assert!(
            matches!(started2.as_mut().now_or_never(), Some(Ok(()))),
            "tuple finalize should start all database finalizations concurrently"
        );

        let _ = release1_tx.send(());
        let _ = release2_tx.send(());
        finalize.await;
    });
}
2798
/// Expects finalizing a tuple whose second member is a `FailingFinalizeDb` to panic with a
/// message naming index 1 and the failing database type.
#[test]
#[should_panic(
    expected = "database finalize failed (index 1, type commonware_glue::stateful::db::tests::FailingFinalizeDb)"
)]
fn tuple_finalize_panic_identifies_failing_database() {
    type MixedPair = (
        Arc<AsyncRwLock<TestDb>>,
        Arc<AsyncRwLock<FailingFinalizeDb>>,
    );

    deterministic::Runner::default().start(|_context| async move {
        let healthy = Arc::new(AsyncRwLock::new(TestDb));
        let failing = Arc::new(AsyncRwLock::new(FailingFinalizeDb));
        let databases: MixedPair = (healthy, failing);

        let batches = (TestMerkleized, TestMerkleized);
        <MixedPair as DatabaseSet<deterministic::Context>>::finalize(&databases, batches).await;
    });
}
2819
2820 type TestAnchor = Anchor<sha256::Digest>;
2821
2822 fn anchor(n: u64) -> TestAnchor {
2823 mock_anchor(n, n as u8)
2824 }
2825
/// Queues a tip for target 2 followed by a tip for target 1 and checks that one drain observes
/// both, ends on anchor 2 / target 2, and forwards exactly one target (2).
#[test]
fn single_tip_update_drain_keeps_highest_recorded_target() {
    deterministic::Runner::default().start(|_context| async move {
        let tip_capacity = NonZeroUsize::new(4).unwrap();
        let (mut tip_tx, tip_rx) = ring::channel(tip_capacity);
        let (target_tx, mut target_rx) = mpsc::channel(4);
        let (high_update, high_seen) = TipUpdate::with_observation(anchor(2), 2u64);
        let (low_update, low_seen) = TipUpdate::with_observation(anchor(1), 1u64);

        // The higher tip is queued first so the lower one arrives "late".
        let _ = tip_tx.send(high_update).await;
        let _ = tip_tx.send(low_update).await;

        let mut tip_updates = Some(tip_rx);
        let mut current_anchor = anchor(0);
        let mut current_target = 0u64;
        assert!(
            drain_single_tip_updates(
                &mut tip_updates,
                &target_tx,
                &mut current_anchor,
                &mut current_target,
            )
            .await
        );

        high_seen.await.expect("newer update should be observed");
        low_seen
            .await
            .expect("older update should also be observed");

        assert_eq!(current_anchor, anchor(2));
        assert_eq!(current_target, 2);
        assert_eq!(target_rx.recv().await, Some(2));
        assert!(matches!(
            target_rx.try_recv(),
            Err(mpsc::error::TryRecvError::Empty)
        ));
    });
}
2865
/// Drains a tip whose anchor is newer (3) but whose target equals the current one (7) and
/// checks that the anchor advances while nothing is forwarded on the target channel.
#[test]
fn single_tip_update_drain_advances_anchor_without_duplicate_target() {
    deterministic::Runner::default().start(|_context| async move {
        let tip_capacity = NonZeroUsize::new(1).unwrap();
        let (mut tip_tx, tip_rx) = ring::channel(tip_capacity);
        let (target_tx, mut target_rx) = mpsc::channel(1);

        let (same_target_update, seen) = TipUpdate::with_observation(anchor(3), 7u64);
        let _ = tip_tx.send(same_target_update).await;

        let mut tip_updates = Some(tip_rx);
        let mut current_anchor = anchor(2);
        let mut current_target = 7u64;
        assert!(
            drain_single_tip_updates(
                &mut tip_updates,
                &target_tx,
                &mut current_anchor,
                &mut current_target,
            )
            .await
        );

        seen.await.expect("update should be observed");
        assert_eq!(current_anchor, anchor(3));
        assert_eq!(current_target, 7);
        assert!(matches!(
            target_rx.try_recv(),
            Err(mpsc::error::TryRecvError::Empty)
        ));
    });
}
2897
/// Drops the tip sender before releasing a `SlowSyncDb` and checks that single-database sync
/// still completes on the initial anchor.
#[test]
fn single_state_sync_handles_closed_tip_updates_channel() {
    type Db = Arc<AsyncRwLock<SlowSyncDb>>;

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let release = Arc::new(AtomicBool::new(false));
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(1).unwrap(),
            max_retained_roots: 0,
        };

        let sync = context.child("single_state_sync_closed_tip_updates").spawn({
            let gate = Arc::clone(&release);
            move |ctx| async move {
                <Db as StateSyncSet<deterministic::Context, Arc<AtomicBool>, sha256::Digest>>::sync(
                    ctx,
                    (),
                    gate,
                    anchor(0),
                    0,
                    tip_rx,
                    sync_config,
                )
                .await
                .expect("single state sync should succeed")
            }
        });

        // Close the tip channel first, give the sync task a tick, then open the gate.
        drop(tip_tx);
        context.sleep(Duration::from_millis(1)).await;
        release.store(true, Ordering::SeqCst);

        let (_database, converged_anchor) = sync.await.expect("sync task should complete");
        assert_eq!(converged_anchor, anchor(0));
    });
}
2939
/// Runs single-database sync against a `FailingStateSyncDb` with one tip already queued and
/// checks that the call returns `Err(TestSyncError)`.
#[test]
fn single_state_sync_preserves_db_error_when_target_channel_closes() {
    type Db = Arc<AsyncRwLock<FailingStateSyncDb>>;

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let queued_tip = TipUpdate::new(anchor(1), 1u64);
        let _ = tip_tx.send(queued_tip).await;

        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(1).unwrap(),
            max_retained_roots: 0,
        };
        let result = <Db as StateSyncSet<deterministic::Context, (), sha256::Digest>>::sync(
            context,
            (),
            (),
            anchor(0),
            0,
            tip_rx,
            sync_config,
        )
        .await;

        assert!(matches!(result, Err(TestSyncError)));
    });
}
2970
/// Sends tip 2 followed by tip 1 to a single-database sync and checks that both the final
/// database target and the converged anchor stay on 2.
#[test]
fn single_state_sync_ignores_backward_tip_updates() {
    type Db = Arc<AsyncRwLock<ObservedSlowSyncDb>>;

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        // The gate starts open, so the database never waits for a release.
        let release = Arc::new(AtomicBool::new(true));
        let resolver = SlowSyncController {
            release: Arc::clone(&release),
        };
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(4).unwrap(),
            max_retained_roots: 0,
        };

        let sync = context
            .child("single_state_sync_ignores_backward_tip_updates")
            .spawn(move |ctx| async move {
                <Db as StateSyncSet<deterministic::Context, SlowSyncController, sha256::Digest>>::sync(
                    ctx,
                    (),
                    resolver,
                    anchor(0),
                    0,
                    tip_rx,
                    sync_config,
                )
                .await
                .expect("single state sync should succeed")
            });

        // Forward tip first, backward tip second.
        for height in [2u64, 1] {
            let _ = tip_tx.send(TipUpdate::new(anchor(height), height)).await;
        }
        drop(tip_tx);

        let (database, converged_anchor) = sync.await.expect("sync task should complete");
        let final_target = database.read().await.final_target;
        assert_eq!(
            final_target, 2,
            "single-db sync target must never move backward"
        );
        assert_eq!(
            converged_anchor,
            anchor(2),
            "converged anchor must remain on the highest seen tip"
        );
    });
}
3023
/// Sends a tip with a newer anchor (9) but the same target (7) to a single-database sync over
/// `RejectDuplicateTargetSyncDb` and checks that the target stays 7 while the converged
/// anchor becomes 9.
#[test]
fn single_state_sync_advances_anchor_without_duplicate_target_update() {
    type Db = Arc<AsyncRwLock<RejectDuplicateTargetSyncDb>>;

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let release = Arc::new(AtomicBool::new(false));
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(4).unwrap(),
            max_retained_roots: 0,
        };

        let sync = context.child("single_state_sync_noop_target_update").spawn({
            let gate = Arc::clone(&release);
            move |ctx| async move {
                <Db as StateSyncSet<deterministic::Context, Arc<AtomicBool>, sha256::Digest>>::sync(
                    ctx,
                    (),
                    gate,
                    anchor(7),
                    7,
                    tip_rx,
                    sync_config,
                )
                .await
                .expect("single state sync should succeed")
            }
        });

        let (noop_update, seen) = TipUpdate::with_observation(anchor(9), 7);
        let _ = tip_tx.send(noop_update).await;
        // Only release the database once the update has been observed.
        seen.await
            .expect("single-db coordinator should record noop target update");
        release.store(true, Ordering::SeqCst);
        drop(tip_tx);

        let (database, converged_anchor) = sync.await.expect("sync task should complete");
        assert_eq!(database.read().await.final_target, 7);
        assert_eq!(converged_anchor, anchor(9));
    });
}
3070
/// Runs single-database sync over `StaleReachedSyncDb`, sends one tip for target 2, and checks
/// that both the database target and the converged anchor end on 2.
#[test]
fn single_state_sync_ignores_stale_reached_after_forwarded_tip() {
    type Db = Arc<AsyncRwLock<StaleReachedSyncDb>>;

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(4).unwrap(),
            max_retained_roots: 0,
        };

        let sync = context
            .child("single_state_sync_stale_reached")
            .spawn(move |ctx| async move {
                <Db as StateSyncSet<deterministic::Context, (), sha256::Digest>>::sync(
                    ctx,
                    (),
                    (),
                    anchor(0),
                    0,
                    tip_rx,
                    sync_config,
                )
                .await
                .expect("single state sync should succeed")
            });

        // The sender is intentionally kept alive until the end of the test.
        let forwarded_tip = TipUpdate::new(anchor(2), 2);
        let _ = tip_tx.send(forwarded_tip).await;

        let (database, converged_anchor) = sync.await.expect("sync task should complete");
        let final_target = database.read().await.final_target;
        assert_eq!(
            final_target, 2,
            "single-db sync must not finish on a stale reached target",
        );
        assert_eq!(
            converged_anchor,
            anchor(2),
            "converged anchor must match the target the database reached",
        );
    });
}
3118
/// Syncs a slow and a fast database together, feeds two tips after the fast one signals done,
/// and checks that both databases end on the same target and that the returned anchor height
/// equals it.
#[test]
fn tuple_state_sync_converges_before_finish() {
    type Dbs = (Arc<AsyncRwLock<SlowSyncDb>>, Arc<AsyncRwLock<FastSyncDb>>);
    type Resolvers = (Arc<AtomicBool>, Arc<AtomicBool>);

    deterministic::Runner::default().start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let slow_release = Arc::new(AtomicBool::new(false));
        let fast_done = Arc::new(AtomicBool::new(false));
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(4).unwrap(),
            max_retained_roots: 0,
        };

        let sync = context.child("tuple_state_sync").spawn({
            let resolvers: Resolvers = (Arc::clone(&slow_release), Arc::clone(&fast_done));
            move |ctx| async move {
                <Dbs as StateSyncSet<deterministic::Context, Resolvers, sha256::Digest>>::sync(
                    ctx,
                    ((), ()),
                    resolvers,
                    anchor(0),
                    (0, 0),
                    tip_rx,
                    sync_config,
                )
                .await
                .expect("tuple state sync should succeed")
            }
        });

        // Wait until the fast database has flagged completion.
        loop {
            if fast_done.load(Ordering::SeqCst) {
                break;
            }
            context.sleep(Duration::from_millis(1)).await;
        }
        for height in 1..=2u64 {
            let _ = tip_tx
                .send(TipUpdate::new(anchor(height), (height, height)))
                .await;
        }
        slow_release.store(true, Ordering::SeqCst);
        drop(tip_tx);

        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;

        assert_eq!(
            slow_target, fast_target,
            "all databases should finish on the same converged target set"
        );
        assert_eq!(
            converged_anchor.height.get(),
            slow_target,
            "returned anchor height should match the converged generation"
        );
    });
}
3177
/// Sends tip 2 followed by tip 1 to a two-database sync and checks that both database targets
/// and the converged anchor stay on 2.
#[test]
fn tuple_state_sync_ignores_backward_tip_updates() {
    type Dbs = (Arc<AsyncRwLock<SlowSyncDb>>, Arc<AsyncRwLock<FastSyncDb>>);
    type Resolvers = (Arc<AtomicBool>, Arc<AtomicBool>);

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(8).unwrap());
        let slow_release = Arc::new(AtomicBool::new(false));
        let fast_done = Arc::new(AtomicBool::new(false));
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(8).unwrap(),
            max_retained_roots: 0,
        };

        let sync = context
            .child("tuple_state_sync_ignores_backward_tip_updates")
            .spawn({
                let resolvers: Resolvers = (Arc::clone(&slow_release), Arc::clone(&fast_done));
                move |ctx| async move {
                    <Dbs as StateSyncSet<deterministic::Context, Resolvers, sha256::Digest>>::sync(
                        ctx,
                        ((), ()),
                        resolvers,
                        anchor(0),
                        (0, 0),
                        tip_rx,
                        sync_config,
                    )
                    .await
                    .expect("tuple state sync should succeed")
                }
            });

        // Wait until the fast database has flagged completion.
        loop {
            if fast_done.load(Ordering::SeqCst) {
                break;
            }
            context.sleep(Duration::from_millis(1)).await;
        }

        // Forward tip first, backward tip second.
        for height in [2u64, 1] {
            let _ = tip_tx
                .send(TipUpdate::new(anchor(height), (height, height)))
                .await;
        }
        drop(tip_tx);
        context.sleep(Duration::from_millis(1)).await;
        slow_release.store(true, Ordering::SeqCst);

        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;
        assert_eq!(
            slow_target, 2,
            "slow database target must never move backward"
        );
        assert_eq!(
            fast_target, 2,
            "fast database target must never move backward"
        );
        assert_eq!(
            converged_anchor,
            anchor(2),
            "converged anchor must remain on the highest seen tip"
        );
    });
}
3241
/// Runs tuple sync with a `MismatchedTargetSyncDb` and checks that it fails with an error
/// containing "database targets do not match".
#[test]
fn tuple_state_sync_rejects_database_target_mismatch() {
    type Dbs = (
        Arc<AsyncRwLock<MismatchedTargetSyncDb>>,
        Arc<AsyncRwLock<FastSyncDb>>,
    );
    type Resolvers = ((), Arc<AtomicBool>);

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        // The sender is kept alive (but unused) so the tip channel stays open.
        let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let fast_done = Arc::new(AtomicBool::new(false));
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(1).unwrap(),
            max_retained_roots: 0,
        };

        let result = <Dbs as StateSyncSet<deterministic::Context, Resolvers, sha256::Digest>>::sync(
            context,
            ((), ()),
            ((), fast_done),
            anchor(7),
            (7, 7),
            tip_rx,
            sync_config,
        )
        .await;

        let Err(err) = result else {
            panic!("tuple state sync should reject a mismatched database target");
        };
        assert!(
            err.contains("database targets do not match"),
            "error should identify the target mismatch, got: {err}"
        );
    });
}
3282
/// Pairs an `ImmediateStateSyncDb` with a `FailingStateSyncDb` and checks that tuple sync
/// returns an error naming index 1 and the failing database type.
#[test]
fn tuple_state_sync_returns_db_error_instead_of_panicking_when_anchor_missing() {
    type Dbs = (
        Arc<AsyncRwLock<ImmediateStateSyncDb>>,
        Arc<AsyncRwLock<FailingStateSyncDb>>,
    );

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        // The sender is kept alive (but unused) so the tip channel stays open.
        let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(1).unwrap(),
            max_retained_roots: 0,
        };

        let result = <Dbs as StateSyncSet<deterministic::Context, ((), ()), sha256::Digest>>::sync(
            context,
            ((), ()),
            ((), ()),
            anchor(0),
            (0, 0),
            tip_rx,
            sync_config,
        )
        .await;

        let Err(err) = result else {
            panic!("tuple state sync should return the database sync error");
        };
        assert!(
            err.contains("state sync failed (index 1, db"),
            "error should include failing database index: {err}"
        );
        assert!(
            err.contains("FailingStateSyncDb"),
            "error should include failing database type: {err}"
        );
    });
}
3322
/// Pairs an already-released `SlowSyncDb` with a `FailingStateSyncDb` under a one-second
/// runner timeout and checks that tuple sync returns an error naming index 1 and the failing
/// database type.
#[test]
fn tuple_state_sync_returns_db_error_when_other_database_waits_for_finish() {
    type Dbs = (
        Arc<AsyncRwLock<SlowSyncDb>>,
        Arc<AsyncRwLock<FailingStateSyncDb>>,
    );
    type Resolvers = (Arc<AtomicBool>, ());

    deterministic::Runner::timed(Duration::from_secs(1)).start(|context| async move {
        // The sender is kept alive (but unused) so the tip channel stays open.
        let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let release = Arc::new(AtomicBool::new(true));
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(1).unwrap(),
            max_retained_roots: 0,
        };

        let result = <Dbs as StateSyncSet<deterministic::Context, Resolvers, sha256::Digest>>::sync(
            context,
            ((), ()),
            (release, ()),
            anchor(0),
            (0, 0),
            tip_rx,
            sync_config,
        )
        .await;

        let Err(err) = result else {
            panic!("tuple state sync should return the database sync error");
        };
        assert!(
            err.contains("state sync failed (index 1, db"),
            "error should include failing database index: {err}"
        );
        assert!(
            err.contains("FailingStateSyncDb"),
            "error should include failing database type: {err}"
        );
    });
}
3367
/// Pairs a `FinishClosedSyncDb` with a `FailingStateSyncDb` and checks that the reported error
/// names index 1 and `FailingStateSyncDb`, i.e. the original failure rather than the peer's.
#[test]
fn tuple_state_sync_preserves_original_failure_when_peer_finish_channel_closes() {
    type Dbs = (
        Arc<AsyncRwLock<FinishClosedSyncDb>>,
        Arc<AsyncRwLock<FailingStateSyncDb>>,
    );

    deterministic::Runner::timed(Duration::from_secs(1)).start(|context| async move {
        // The sender is kept alive (but unused) so the tip channel stays open.
        let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(1).unwrap(),
            max_retained_roots: 0,
        };

        let result = <Dbs as StateSyncSet<deterministic::Context, ((), ()), sha256::Digest>>::sync(
            context,
            ((), ()),
            ((), ()),
            anchor(0),
            (0, 0),
            tip_rx,
            sync_config,
        )
        .await;

        let Err(err) = result else {
            panic!("tuple state sync should return the database sync error");
        };
        assert!(
            err.contains("state sync failed (index 1, db"),
            "error should include failing database index, got: {err}",
        );
        assert!(
            err.contains("FailingStateSyncDb"),
            "error should include failing database type, got: {err}",
        );
    });
}
3407
/// After dispatching generation 1, records one reached event tagged 0 and one tagged 1 and
/// checks that the coordinator keeps waiting instead of dispatching or converging.
#[test]
fn coordinator_rejects_stale_reached_event_from_older_generation() {
    let mut state = CoordinatorState::new(2, anchor(0), (0u64, 0u64));

    state.record_tip_update(anchor(1), (1, 1));
    match state.next_action() {
        CoordinatorAction::Wait => panic!("coordinator should dispatch the newer tip"),
        CoordinatorAction::Converged { anchor, .. } => {
            panic!("coordinator converged too early at {anchor:?}")
        }
        CoordinatorAction::Dispatch {
            generation,
            targets,
        } => {
            assert_eq!(generation, 1, "coordinator should dispatch generation 1");
            assert_eq!(targets, (1, 1));
        }
    }

    // NOTE(review): arguments are presumably (database index, generation) — confirm against
    // `CoordinatorState::record_reached`. Under that reading the first event is stale.
    state.record_reached(1, 0);
    state.record_reached(0, 1);

    match state.next_action() {
        CoordinatorAction::Dispatch { targets, .. } => {
            panic!(
                "coordinator should wait for a fresh reached event, got dispatch {targets:?}"
            )
        }
        CoordinatorAction::Converged { anchor, .. } => {
            panic!("stale reached event must not allow convergence at {anchor:?}")
        }
        CoordinatorAction::Wait => {}
    }
}
3446
/// Records reached events tagged 1 for both databases, then a newer tip, and checks that the
/// next action is a dispatch of generation 2 rather than convergence.
#[test]
fn coordinator_dispatches_pending_tip_before_converging() {
    let mut state = CoordinatorState::new(2, anchor(0), (0u64, 0u64));

    state.record_tip_update(anchor(1), (1, 1));
    match state.next_action() {
        CoordinatorAction::Wait => panic!("coordinator should dispatch the newer tip"),
        CoordinatorAction::Converged { anchor, .. } => {
            panic!("coordinator converged too early at {anchor:?}")
        }
        CoordinatorAction::Dispatch {
            generation,
            targets,
        } => {
            assert_eq!(generation, 1, "coordinator should dispatch generation 1");
            assert_eq!(targets, (1, 1));
        }
    }

    // Both databases report in, but a newer tip arrives before the next decision.
    for database in 0..2 {
        state.record_reached(database, 1);
    }
    state.record_tip_update(anchor(2), (2, 2));

    match state.next_action() {
        CoordinatorAction::Wait => panic!("coordinator should dispatch the pending tip"),
        CoordinatorAction::Converged { anchor, .. } => {
            panic!("coordinator should not converge with a pending tip: {anchor:?}")
        }
        CoordinatorAction::Dispatch {
            generation,
            targets,
        } => {
            assert_eq!(generation, 2, "coordinator should advance to generation 2");
            assert_eq!(targets, (2, 2));
        }
    }
}
3484
/// Floods a two-database sync with 16 tips once the fast database is ready and checks that
/// both databases converge on the same target, that the anchor height matches it, and that
/// the fast database saw exactly one tip update.
#[test]
fn tuple_state_sync_stops_updates_after_reached_until_regroup() {
    type Dbs = (
        Arc<AsyncRwLock<ObservedSlowSyncDb>>,
        Arc<AsyncRwLock<ObservedFastSyncDb>>,
    );
    type Resolvers = (SlowSyncController, FastSyncObserver);

    deterministic::Runner::default().start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(32).unwrap());
        // The slow database starts released, so it never blocks on its gate.
        let slow_release = Arc::new(AtomicBool::new(true));
        let fast_ready = Arc::new(AtomicBool::new(false));
        let fast_update_count = Arc::new(AtomicUsize::new(0));

        let resolvers: Resolvers = (
            SlowSyncController {
                release: Arc::clone(&slow_release),
            },
            FastSyncObserver {
                ready: Arc::clone(&fast_ready),
                update_count: Arc::clone(&fast_update_count),
            },
        );
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(1).unwrap(),
            max_retained_roots: 0,
        };

        let sync = context
            .child("tuple_state_sync_algorithm")
            .spawn(move |ctx| async move {
                <Dbs as StateSyncSet<deterministic::Context, Resolvers, sha256::Digest>>::sync(
                    ctx,
                    ((), ()),
                    resolvers,
                    anchor(0),
                    (0, 0),
                    tip_rx,
                    sync_config,
                )
                .await
                .expect("tuple state sync should succeed")
            });

        // Wait until the fast database has started its sync loop.
        loop {
            if fast_ready.load(Ordering::SeqCst) {
                break;
            }
            context.sleep(Duration::from_millis(1)).await;
        }

        for height in 1..=16u64 {
            let tip = TipUpdate::new(anchor(height), (height, height));
            let _ = tip_tx.send(tip).await;
        }
        drop(tip_tx);

        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;

        assert_eq!(
            slow_target, fast_target,
            "all databases should finish on the same converged target set"
        );
        assert_eq!(
            converged_anchor.height.get(), slow_target,
            "returned anchor height should match the converged generation"
        );
        assert_eq!(
            fast_update_count.load(Ordering::SeqCst),
            1,
            "a reached database must not receive tip updates before regroup; only regroup retarget should be observed"
        );
    });
}
3557
/// Starts both databases on target 7 with no tips at all and checks that sync converges on
/// that target without the fast database ever receiving a tip update.
#[test]
fn tuple_state_sync_allows_noop_database_while_other_catches_up() {
    type Dbs = (
        Arc<AsyncRwLock<SlowSyncDb>>,
        Arc<AsyncRwLock<ObservedFastSyncDb>>,
    );
    type Resolvers = (Arc<AtomicBool>, FastSyncObserver);

    deterministic::Runner::default().start(|context| async move {
        let (tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let slow_release = Arc::new(AtomicBool::new(false));
        let fast_ready = Arc::new(AtomicBool::new(false));
        let fast_update_count = Arc::new(AtomicUsize::new(0));
        let target = 7u64;

        let resolvers: Resolvers = (
            Arc::clone(&slow_release),
            FastSyncObserver {
                ready: Arc::clone(&fast_ready),
                update_count: Arc::clone(&fast_update_count),
            },
        );
        let sync_config = SyncEngineConfig {
            fetch_batch_size: NonZeroU64::new(1).unwrap(),
            apply_batch_size: 1,
            max_outstanding_requests: 1,
            update_channel_size: NonZeroUsize::new(1).unwrap(),
            max_retained_roots: 0,
        };

        let sync = context
            .child("tuple_state_sync_noop")
            .spawn(move |ctx| async move {
                <Dbs as StateSyncSet<deterministic::Context, Resolvers, sha256::Digest>>::sync(
                    ctx,
                    ((), ()),
                    resolvers,
                    anchor(target),
                    (target, target),
                    tip_rx,
                    sync_config,
                )
                .await
                .expect("tuple state sync should succeed")
            });

        // Wait until the fast database has started its sync loop.
        loop {
            if fast_ready.load(Ordering::SeqCst) {
                break;
            }
            context.sleep(Duration::from_millis(1)).await;
        }

        // No tips are ever sent: close the channel, then let the slow database proceed.
        drop(tip_tx);
        slow_release.store(true, Ordering::SeqCst);

        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;

        assert_eq!(slow_target, target);
        assert_eq!(fast_target, target);
        assert_eq!(converged_anchor, anchor(target));
        assert_eq!(
            fast_update_count.load(Ordering::SeqCst),
            0,
            "already-at-target database should not receive tip updates"
        );
    });
}
3622
/// Tuple state sync where a tip update changes only the first database's target.
///
/// Sync starts with targets `(0, 7)` at `anchor(0)`. Once the fast observer's `ready` flag is
/// set, the test sends a single tip update to `anchor(9)` with targets `(9, 7)`, sleeps 1ms,
/// sets the slow side's release flag and drops the tip sender. It then checks that the final
/// targets are `9` and `7`, that the returned anchor is `anchor(9)`, and that the observer's
/// update counter is still zero. The runner is limited to 5 seconds.
// NOTE(review): `SlowSyncDb`, `DistinctObservedFastSyncDb` and `FastSyncObserver` are defined
// outside this view; their exact sync behaviour should be confirmed there.
#[test]
fn tuple_state_sync_regroup_completes_when_database_target_is_unchanged() {
    type SyncedDbs = (
        Arc<AsyncRwLock<SlowSyncDb>>,
        Arc<AsyncRwLock<DistinctObservedFastSyncDb>>,
    );
    type SyncResolvers = (Arc<AtomicBool>, FastSyncObserver);

    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_sender, tip_receiver) = ring::channel(NonZeroUsize::new(4).unwrap());
        let release_slow = Arc::new(AtomicBool::new(false));
        let fast_is_ready = Arc::new(AtomicBool::new(false));
        let fast_updates = Arc::new(AtomicUsize::new(0));

        // Resolver handles share the flags/counter with the test body.
        let resolvers: SyncResolvers = (
            release_slow.clone(),
            FastSyncObserver {
                ready: fast_is_ready.clone(),
                update_count: fast_updates.clone(),
            },
        );

        let handle = context
            .child("tuple_state_sync_regroup_unchanged_target")
            .spawn(move |context| async move {
                let engine_config = SyncEngineConfig {
                    fetch_batch_size: NonZeroU64::new(1).unwrap(),
                    apply_batch_size: 1,
                    max_outstanding_requests: 1,
                    update_channel_size: NonZeroUsize::new(4).unwrap(),
                    max_retained_roots: 0,
                };

                <SyncedDbs as StateSyncSet<
                    deterministic::Context,
                    SyncResolvers,
                    sha256::Digest,
                >>::sync(
                    context,
                    ((), ()),
                    resolvers,
                    anchor(0),
                    (0, 7),
                    tip_receiver,
                    engine_config,
                )
                .await
                .expect("tuple state sync should succeed")
            });

        // Poll in 1ms steps until the fast observer signals readiness.
        loop {
            if fast_is_ready.load(Ordering::SeqCst) {
                break;
            }
            context.sleep(Duration::from_millis(1)).await;
        }

        // Only the first target moves (0 -> 9); the second stays at 7. The send result is
        // deliberately ignored, as in the assertions below a lost update would surface anyway.
        let _ = tip_sender.send(TipUpdate::new(anchor(9), (9, 7))).await;
        context.sleep(Duration::from_millis(1)).await;
        release_slow.store(true, Ordering::SeqCst);
        drop(tip_sender);

        let (databases, final_anchor) = handle.await.expect("sync task should complete");
        let slow_final = databases.0.read().await.final_target;
        let fast_final = databases.1.read().await.final_target;

        assert_eq!(slow_final, 9);
        assert_eq!(fast_final, 7);
        assert_eq!(final_anchor, anchor(9));
        assert_eq!(
            fast_updates.load(Ordering::SeqCst),
            0,
            "the unchanged-target database should not receive duplicate target updates",
        );
    });
}
3690
/// Field-less stand-in database type used by the resolver attach tests.
#[derive(Default)]
struct AttachDb1;
3693
/// Second field-less stand-in database type, distinct from `AttachDb1`, so tuple attach tests
/// can pair two different database types.
#[derive(Default)]
struct AttachDb2;
3696
3697 #[derive(Clone)]
3698 struct RecordingResolver {
3699 id: &'static str,
3700 log: Arc<commonware_utils::sync::Mutex<Vec<&'static str>>>,
3701 }
3702
3703 impl RecordingResolver {
3704 fn new(
3705 id: &'static str,
3706 log: Arc<commonware_utils::sync::Mutex<Vec<&'static str>>>,
3707 ) -> Self {
3708 Self { id, log }
3709 }
3710 }
3711
3712 impl<DB: Send + Sync + 'static> AttachableResolver<DB> for RecordingResolver {
3713 async fn attach_database(&self, _db: Arc<AsyncRwLock<DB>>) {
3714 self.log.lock().push(self.id);
3715 }
3716 }
3717
/// Attaching one database through one `RecordingResolver` logs exactly that resolver's id.
#[test]
fn single_db_attach_calls_single_resolver() {
    deterministic::Runner::default().start(|_| async move {
        let calls = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
        let resolver = RecordingResolver::new("db1", Arc::clone(&calls));
        let database = Arc::new(AsyncRwLock::new(AttachDb1));

        resolver.attach_databases(database).await;

        // Exactly one entry, carrying the resolver's label.
        assert_eq!(&*calls.lock(), &["db1"]);
    });
}
3729
/// Attaching a pair of databases through a pair of resolvers logs the resolver ids in tuple
/// order: `resolver_0` first, then `resolver_1`.
#[test]
fn tuple_attach_is_index_stable() {
    deterministic::Runner::default().start(|_| async move {
        let calls = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));

        let first = RecordingResolver::new("resolver_0", Arc::clone(&calls));
        let second = RecordingResolver::new("resolver_1", Arc::clone(&calls));
        let resolver_pair = (first, second);

        let database_pair = (
            Arc::new(AsyncRwLock::new(AttachDb1)),
            Arc::new(AsyncRwLock::new(AttachDb2)),
        );

        resolver_pair.attach_databases(database_pair).await;

        // Log order must follow tuple position.
        assert_eq!(&*calls.lock(), &["resolver_0", "resolver_1"]);
    });
}
3747
/// A resolver pair can be attached to a pair of databases of two different types
/// (`AttachDb1`, `AttachDb2`); both resolver ids are logged, in tuple order.
#[test]
fn heterogeneous_tuple_attach_compiles() {
    deterministic::Runner::default().start(|_| async move {
        let calls = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));

        let first = RecordingResolver::new("db1", Arc::clone(&calls));
        let second = RecordingResolver::new("db2", Arc::clone(&calls));
        let resolver_pair = (first, second);

        // The two database handles intentionally have different element types.
        let database_pair = (
            Arc::new(AsyncRwLock::new(AttachDb1)),
            Arc::new(AsyncRwLock::new(AttachDb2)),
        );

        resolver_pair.attach_databases(database_pair).await;

        assert_eq!(&*calls.lock(), &["db1", "db2"]);
    });
}
3765}