Skip to main content

commonware_glue/stateful/db/
mod.rs

1//! Traits for database batch lifecycle and state sync in [`Stateful`](super::Stateful).
2//!
3//! This module defines the boundary between stateful application logic and
4//! storage backends (QMDB variants).
5//!
6//! # Batch Lifecycle
7//!
8//! Normal execution has three stages:
9//! 1. [`Unmerkleized`]: mutable, in-progress batch (concrete types expose reads and writes).
10//! 2. [`Merkleized`]: a sealed batch with a computed root.
11//! 3. Finalization: persist the sealed batch via [`ManagedDb::finalize`].
12//!
13//! [`DatabaseSet`] groups one or more [`ManagedDb`] instances into one logical
14//! unit for execution and commit.
15//!
16//! # State Sync
17//!
18//! State sync orchestration is expressed by two traits:
19//! - [`StateSyncDb`]: per-database sync entrypoint.
20//! - [`StateSyncSet`]: set-level orchestration.
21//!
22//! ## Anchors
23//!
24//! Each set of sync targets is paired with an anchor `(Height, Round, D)` where
25//! `D` is the block digest. The db layer never interprets the anchor; it
26//! only tracks which anchor each database converged on.
27//!
28//! On completion, [`StateSyncSet::sync`] returns the anchor that all databases
29//! agreed on. The caller uses this to set the marshal floor and the
30//! last-processed digest, ensuring they match the actual convergence point
31//! rather than whatever marshal's head happens to be (which may have advanced
32//! during sync).
33//!
34//! ## Convergence Algorithm (tuple sets)
35//!
36//! Tuple [`StateSyncSet`] implementations assign each `(anchor, targets)`
37//! pair a *generation* number and use this algorithm:
38//!
39//! 1. Forward tip updates only to databases that have not yet reported
40//!    "reached target". Reached databases are frozen to prevent them from
41//!    running ahead to a newer anchor.
42//! 2. When all databases report reached, compare the generation each was
43//!    assigned when it reported.
44//! 3. If all generations match, every database synced to targets from the
45//!    same anchor. Return that anchor.
46//! 4. If generations differ, *regroup*: re-send the highest-reached
47//!    generation's targets to the behind databases, clear their reached
48//!    state, and repeat from step 1.
49//!
50//! ### Chasing a moving tip
51//!
52//! ```text
53//! time -------------------------------------------------------------->
54//!
55//! marshal finalized tip:   A0 ------ A1 ------ A2 ------ A3
56//! generation:              g0        g1        g2        g3
57//!
58//! db0 (slow):              g0 ------------------> g1 -----------------> g3 reached
59//! db1 (fast):              g0 ----> g1 reached -- frozen -- regroup --> g3 reached
60//! db2 (fast):              g0 ----> g1 reached -- frozen -- regroup --> g3 reached
61//!
62//! coordinator queue while db0 is still catching up:
63//!                          [A2] [A3] -- drain --> keep only A3
64//!
65//! finish only when:
66//! - every database has reported the same generation
67//! - no newer tip update is still queued behind it
68//! ```
69//!
70//! The coordinator continuously drains tip updates and keeps only the latest
71//! value before forwarding, which avoids target-channel backpressure buildup.
72//! The `generation_state` map is pruned after every dispatch to only retain
73//! generations currently assigned to at least one database, so memory usage
74//! is bounded by the number of databases regardless of how long sync runs.
75
76use commonware_consensus::{
77    types::{Height, Round},
78    CertifiableBlock, Epochable, Roundable, Viewable,
79};
80use commonware_cryptography::Digest;
81use commonware_macros::select;
82use commonware_runtime::{reschedule, Metrics, Spawner};
83use commonware_utils::{
84    channel::{fallible::AsyncFallibleExt, mpsc, oneshot, ring},
85    sync::AsyncRwLock,
86};
87use futures::{
88    future::{pending, Either},
89    join,
90};
91use std::{
92    collections::BTreeMap,
93    fmt::Debug,
94    future::Future,
95    num::{NonZeroU64, NonZeroUsize},
96    sync::Arc,
97};
98
/// Yield interval for channel draining.
///
/// In `drain_single_tip_updates`, the drain loop calls `reschedule().await`
/// after every `MAX_CHANNEL_DRAIN_PER_TICK` updates it has pulled off the
/// tip-update channel, so a long backlog cannot monopolize the task.
const MAX_CHANNEL_DRAIN_PER_TICK: usize = 32;
100
101pub mod any;
102pub mod current;
103pub mod immutable;
104pub mod keyless;
105pub mod p2p;
106
/// Mutable batch state before merkleization.
///
/// Concrete types provide key-value operations (`get`, `write`, `set`,
/// `append`, etc.) as inherent methods; the generic wrapper only needs
/// [`merkleize`](Self::merkleize).
///
/// [`merkleize`](Self::merkleize) takes `self` by value, so a batch cannot be
/// mutated again once it has been sealed.
pub trait Unmerkleized: Sized + Send {
    /// The merkleized batch produced by [`merkleize`](Self::merkleize).
    type Merkleized: Merkleized;

    /// The error type returned by fallible operations.
    type Error: Send;

    /// Resolve all mutations, compute the new state root, and produce a
    /// merkleized batch.
    ///
    /// The returned future is `Send`. It resolves to `Err(Self::Error)` when
    /// the implementation fails; the failure conditions are defined by the
    /// concrete batch type.
    fn merkleize(self) -> impl Future<Output = Result<Self::Merkleized, Self::Error>> + Send;
}
123
/// Sealed batch state with a computed root.
///
/// The application uses [`root`](Self::root) in block headers, and the wrapper
/// later finalizes this batch.
///
/// Both methods take `&self`, so a sealed batch can be inspected and forked
/// without being consumed.
pub trait Merkleized: Sized + Send + Sync {
    /// The digest type used for the state root.
    type Digest: Digest;

    /// The unmerkleized batch type produced by [`new_batch`](Self::new_batch).
    ///
    /// This trait only requires that the type implements [`Unmerkleized`];
    /// [`ManagedDb::Merkleized`] adds the constraint that it equals
    /// [`ManagedDb::Unmerkleized`].
    type Unmerkleized: Unmerkleized;

    /// The canonical state root committed in block headers.
    fn root(&self) -> Self::Digest;

    /// Create a child unmerkleized batch that reads through this batch's
    /// pending changes before falling back to the committed database state.
    ///
    /// In QMDB, this maps to `merkleized_batch.new_batch()`.
    fn new_batch(&self) -> Self::Unmerkleized;
}
144
/// One database managed by the [`Stateful`](super::Stateful) wrapper.
///
/// Implementations create new batches from committed state and persist finalized
/// batches back to storage.
///
/// [`new_batch`](Self::new_batch) receives `Arc<AsyncRwLock<Self>>` so batch
/// types can keep read-through access to committed state.
///
/// `E` is a trait generic (not an associated type), so one database type can
/// work across runtimes that satisfy the bounds.
///
/// Every async method returns a `Send` future.
pub trait ManagedDb<E>: Send + Sync + Sized {
    /// An in-progress batch of mutations that has not yet been merkleized.
    type Unmerkleized: Unmerkleized;

    /// A batch whose root has been computed but has not yet been applied to
    /// the underlying database.
    ///
    /// Constrained so that [`Merkleized::new_batch`] produces the same
    /// [`Unmerkleized`] type as [`ManagedDb::new_batch`](Self::new_batch).
    type Merkleized: Merkleized<Unmerkleized = Self::Unmerkleized>;

    /// The error type returned by fallible operations.
    ///
    /// Must be `Debug` so callers can report it (the set implementations in
    /// this module pass it to `expect`).
    type Error: Debug + Send;

    /// Configuration needed to construct a new database instance.
    type Config: Send;

    /// Sync target type for state sync of this database.
    ///
    /// Typically a database-specific state commitment plus the operation range needed to reach it.
    ///
    /// `PartialEq` is required because targets are compared for equality, for
    /// example to skip a rewind when the database is already at the target.
    type SyncTarget: Clone + PartialEq + Send + Sync;

    /// Construct a new database from its configuration.
    ///
    /// `context` is the runtime handle `E`; how it is used is up to the
    /// implementation.
    fn init(
        context: E,
        config: Self::Config,
    ) -> impl Future<Output = Result<Self, Self::Error>> + Send;

    /// Create a new unmerkleized batch rooted at the database's committed
    /// state.
    ///
    /// The `db` parameter is the `Arc<AsyncRwLock<Self>>` that wraps this
    /// database, allowing batch types to capture a shared reference for
    /// read-through to committed state.
    fn new_batch(db: &Arc<AsyncRwLock<Self>>) -> impl Future<Output = Self::Unmerkleized> + Send;

    /// Return true if a merkleized batch matches a committed sync target.
    ///
    /// This is an associated function: it receives only the batch and the
    /// target, not a database instance.
    fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool;

    /// Apply a merkleized batch's changeset to the underlying database.
    ///
    /// In QMDB, this encapsulates calling `merkleized.finalize()` to produce
    /// a `Changeset`, then `db.apply_batch(changeset)` and `db.commit()`.
    ///
    /// Takes `&mut self`, so callers holding the database behind
    /// `AsyncRwLock` need the write lock.
    fn finalize(
        &mut self,
        batch: Self::Merkleized,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Return the sync target for this database's current committed state.
    fn sync_target(&self) -> impl Future<Output = Self::SyncTarget> + Send;

    /// Rewind committed state to `target`.
    ///
    /// Implementations must ensure rewind effects are durable before returning
    /// `Ok(())` (for example by committing after rewind).
    fn rewind_to_target(
        &mut self,
        target: Self::SyncTarget,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Return the maximum number of finalized commits this database can rewind.
    ///
    /// `None` means rewind depth is not bounded by a known finite limit.
    ///
    /// The default implementation returns `None`.
    fn max_rewind_depth() -> Option<usize> {
        None
    }
}
222
/// A collection of individually locked [`ManagedDb`] instances.
///
/// Each database is wrapped in `Arc<AsyncRwLock<...>>`, so the set is cheap to
/// clone and each database can be shared without a global lock.
///
/// `E` is a trait generic (not an associated type), so one set type can work
/// across runtimes that satisfy the bounds.
///
/// Unlike [`ManagedDb`], the methods here have no error channel in their
/// signatures: [`init`](Self::init), [`finalize`](Self::finalize) and
/// [`rewind_to_targets`](Self::rewind_to_targets) return plain values.
pub trait DatabaseSet<E>: Clone + Send + Sync + 'static {
    /// Tuple of [`ManagedDb::Unmerkleized`] for every database in the set.
    type Unmerkleized: Send;

    /// Tuple of [`ManagedDb::Merkleized`] for every database in the set.
    type Merkleized: Send + Sync;

    /// Configuration needed to construct every database in the set.
    ///
    /// - Single database sets use that database's [`ManagedDb::Config`].
    /// - Multi-database tuple sets use a tuple of per-database configs
    ///   `(Db1::Config, Db2::Config, ...)`.
    type Config: Send;

    /// Per-database sync targets extracted from a finalized block.
    ///
    /// For a single-database set this is one target. For multi-database sets it is a tuple of
    /// targets, one per database.
    type SyncTargets: Clone + PartialEq + Send + Sync;

    /// Construct the database set from its configuration.
    ///
    /// The implementations in this module `expect` each database's
    /// [`ManagedDb::init`] result, so an init failure panics.
    fn init(context: E, config: Self::Config) -> impl Future<Output = Self> + Send;

    /// Create unmerkleized batches from each database's committed state.
    ///
    /// Acquires a read lock on each database.
    fn new_batches(&self) -> impl Future<Output = Self::Unmerkleized> + Send;

    /// Create child unmerkleized batches from a pending merkleized parent.
    ///
    /// No lock is needed; reads come from the in-memory merkleized state.
    fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized;

    /// Return true if merkleized batches match the committed sync targets.
    ///
    /// For tuple sets every database must match its own target.
    fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool;

    /// Apply each merkleized batch's changeset to its underlying database.
    ///
    /// Acquires a write lock on each database.
    ///
    /// The implementations in this module delegate to `finalize_or_panic`.
    fn finalize(&self, batches: Self::Merkleized) -> impl Future<Output = ()> + Send;

    /// Return sync targets for the set's current committed state.
    fn committed_targets(&self) -> impl Future<Output = Self::SyncTargets> + Send;

    /// Rewind the set to the provided per-database targets.
    ///
    /// Rewind failures are fatal for startup recovery and therefore panic.
    ///
    /// The implementations in this module skip a database whose current
    /// [`ManagedDb::sync_target`] already equals the requested target.
    fn rewind_to_targets(&self, targets: Self::SyncTargets) -> impl Future<Output = ()> + Send;

    /// Return the most restrictive finite rewind depth across the database set.
    ///
    /// `None` means every database in the set is unbounded.
    fn max_rewind_depth() -> Option<usize>;
}
284
285pub(crate) fn assert_rewind_window_safety<E, D>(max_pending_acks: NonZeroUsize)
286where
287    D: DatabaseSet<E>,
288{
289    let Some(max_rewind_depth) = D::max_rewind_depth() else {
290        return;
291    };
292
293    assert!(
294        max_pending_acks.get() <= max_rewind_depth,
295        "marshal max_pending_acks={} exceeds database_set.max_rewind_depth={}",
296        max_pending_acks,
297        max_rewind_depth,
298    );
299}
300
/// Parameters for a one-time state-sync pass.
///
/// The struct is `Copy` and is passed by value to [`StateSyncDb::sync_db`]
/// and [`StateSyncSet::sync`].
#[derive(Clone, Copy, Debug)]
pub struct SyncEngineConfig {
    /// Maximum operations fetched per resolver request.
    pub fetch_batch_size: NonZeroU64,

    /// Number of operations applied per local apply step.
    pub apply_batch_size: usize,

    /// Maximum number of outstanding resolver requests.
    pub max_outstanding_requests: usize,

    /// Capacity of per-database target-update channels.
    ///
    /// The set implementations in this module pass this value to
    /// `mpsc::channel` when creating the channel that carries new targets to
    /// a database.
    pub update_channel_size: NonZeroUsize,

    /// Number of historical roots to retain for proof verification across
    /// target updates.
    pub max_retained_roots: usize,
}
320
/// A [`ManagedDb`] with a state-sync entrypoint.
///
/// `R` is the resolver type handed to [`sync_db`](Self::sync_db); this module
/// places no bounds on it here.
pub trait StateSyncDb<E, R>: ManagedDb<E> {
    /// Error returned by the state-sync engine for this database.
    type SyncError: Debug + Send;

    /// Run state-sync for this database and return a fully-initialized instance.
    ///
    /// # Parameters
    ///
    /// - `context`, `config`: the same inputs [`ManagedDb::init`] takes.
    /// - `resolver`: source of remote data for the sync engine (opaque here).
    /// - `target`: the initial target to sync towards.
    /// - `tip_updates`: replacement targets. The single-database coordinator
    ///   in this module sends a value whenever the tip's target changes.
    /// - `finish`: optional completion signal. The single-database
    ///   coordinator sends `()` once a reported target equals its current
    ///   target.
    /// - `reached_target`: optional channel on which the engine reports
    ///   targets back to the coordinator (presumably each target it reaches;
    ///   confirm against the engine implementation).
    /// - `sync_config`: engine tuning parameters.
    ///
    /// # Errors
    ///
    /// The future resolves to `Err(Self::SyncError)` if the engine fails.
    #[allow(clippy::too_many_arguments)]
    fn sync_db(
        context: E,
        config: Self::Config,
        resolver: R,
        target: Self::SyncTarget,
        tip_updates: mpsc::Receiver<Self::SyncTarget>,
        finish: Option<mpsc::Receiver<()>>,
        reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
        sync_config: SyncEngineConfig,
    ) -> impl Future<Output = Result<Self, Self::SyncError>> + Send;
}
339
/// Block metadata identifying the block that produced a set
/// of sync targets.
///
/// The coordinators in this module compare anchors by `height` when deciding
/// whether a tip update is newer than the one they already hold.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Anchor<D: Digest> {
    /// Height of the anchoring block.
    pub height: Height,
    /// Consensus round of the anchoring block.
    pub round: Round,
    /// Digest of the anchoring block.
    pub digest: D,
}
351
352impl<B, D> From<&B> for Anchor<D>
353where
354    B: CertifiableBlock<Digest = D>,
355    B::Context: Epochable + Viewable,
356    D: Digest,
357{
358    fn from(block: &B) -> Self {
359        Self {
360            height: block.height(),
361            round: block.context().round(),
362            digest: block.digest(),
363        }
364    }
365}
366
/// Tip update delivered to a live state-sync session.
///
/// The optional observation barrier is used by the stateful actor to delay
/// marshal acknowledgement until the sync coordinator has recorded the new
/// anchor and targets.
pub struct TipUpdate<D: Digest, T> {
    /// Block the targets were taken from.
    anchor: Anchor<D>,
    /// Sync targets associated with `anchor`.
    targets: T,
    /// Observation barrier: `Some` only when built via `with_observation`,
    /// and signalled when the update is consumed by `record`.
    observed: Option<oneshot::Sender<()>>,
}
377
378impl<D: Digest, T> TipUpdate<D, T> {
379    pub const fn new(anchor: Anchor<D>, targets: T) -> Self {
380        Self {
381            anchor,
382            targets,
383            observed: None,
384        }
385    }
386
387    pub(crate) fn with_observation(anchor: Anchor<D>, targets: T) -> (Self, oneshot::Receiver<()>) {
388        let (observed, receiver) = oneshot::channel();
389        (
390            Self {
391                anchor,
392                targets,
393                observed: Some(observed),
394            },
395            receiver,
396        )
397    }
398
399    pub(crate) fn record(mut self) -> (Anchor<D>, T) {
400        if let Some(observed) = self.observed.take() {
401            let _ = observed.send(());
402        }
403        (self.anchor, self.targets)
404    }
405}
406
/// A [`DatabaseSet`] that can run one-time state sync.
///
/// `D` is the block digest type. Each set of sync targets is paired
/// with an [`Anchor`] identifying the block that produced those targets.
/// On convergence, `sync` returns the anchor that all databases agreed on.
///
/// `R` is the resolver input: a single resolver for the single-database set,
/// and a tuple of per-database resolvers for tuple sets.
pub trait StateSyncSet<E, R, D>: DatabaseSet<E>
where
    D: Digest,
{
    /// Error returned if any database in the set fails state sync.
    type Error: Debug + Send;

    /// Run one-time state sync and return the initialized set
    /// together with the anchor all databases converged on.
    ///
    /// # Parameters
    ///
    /// - `context`, `config`: the same inputs [`DatabaseSet::init`] takes.
    /// - `resolvers`: resolver(s) forwarded to each database's sync engine.
    /// - `anchor`, `targets`: the starting anchor and the targets it produced.
    /// - `tip_updates`: newer `(anchor, targets)` pairs observed while sync
    ///   is running.
    /// - `sync_config`: engine tuning parameters.
    ///
    /// # Errors
    ///
    /// The future resolves to `Err(Self::Error)` if a database fails to sync.
    #[allow(clippy::too_many_arguments)]
    fn sync(
        context: E,
        config: Self::Config,
        resolvers: R,
        anchor: Anchor<D>,
        targets: Self::SyncTargets,
        tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
        sync_config: SyncEngineConfig,
    ) -> impl Future<Output = Result<(Self, Anchor<D>), Self::Error>> + Send;
}
432
433/// Implement [`DatabaseSet`] for a single [`ManagedDb`] behind a lock.
434impl<E: Send + Sync, T: ManagedDb<E> + 'static> DatabaseSet<E> for Arc<AsyncRwLock<T>> {
435    type Unmerkleized = T::Unmerkleized;
436    type Merkleized = T::Merkleized;
437    type Config = T::Config;
438    type SyncTargets = T::SyncTarget;
439
440    async fn init(context: E, config: Self::Config) -> Self {
441        let db = T::init(context, config)
442            .await
443            .expect("database init failed");
444        Self::new(AsyncRwLock::new(db))
445    }
446
447    async fn new_batches(&self) -> Self::Unmerkleized {
448        T::new_batch(self).await
449    }
450
451    fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized {
452        parent.new_batch()
453    }
454
455    fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool {
456        T::matches_sync_target(batches, targets)
457    }
458
459    async fn finalize(&self, batches: Self::Merkleized) {
460        let mut database = self.write().await;
461        finalize_or_panic(&mut *database, batches, None).await;
462    }
463
464    async fn committed_targets(&self) -> Self::SyncTargets {
465        let database = self.read().await;
466        T::sync_target(&*database).await
467    }
468
469    async fn rewind_to_targets(&self, target: Self::SyncTargets) {
470        let mut database = self.write().await;
471        if T::sync_target(&*database).await == target {
472            return;
473        }
474        rewind_or_panic(&mut *database, target, None).await;
475    }
476
477    fn max_rewind_depth() -> Option<usize> {
478        T::max_rewind_depth()
479    }
480}
481
/// Implement [`StateSyncSet`] for a single [`StateSyncDb`] behind a lock.
///
/// The database's `sync_db` future and a local coordinator future are driven
/// together with `join!`. The coordinator forwards changed targets to the
/// database, listens for "reached" reports, and signals the database to
/// finish once a report matches the latest target it knows about.
impl<E, T, R, D> StateSyncSet<E, R, D> for Arc<AsyncRwLock<T>>
where
    E: Send + Sync + Metrics,
    T: StateSyncDb<E, R> + 'static,
    R: Send + 'static,
    D: Digest,
{
    type Error = T::SyncError;

    /// Sync the database and return it together with the anchor the
    /// coordinator held when it stopped.
    ///
    /// # Errors
    ///
    /// Returns the database's `SyncError` if `sync_db` fails.
    ///
    /// # Panics
    ///
    /// Panics if the synced database's `sync_target` differs from the target
    /// the coordinator ended on.
    #[allow(clippy::too_many_arguments)]
    async fn sync(
        context: E,
        config: Self::Config,
        resolver: R,
        anchor: Anchor<D>,
        target: Self::SyncTargets,
        tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
        sync_config: SyncEngineConfig,
    ) -> Result<(Self, Anchor<D>), Self::Error> {
        // Channels shared with `sync_db`: new targets go out on `target_*`,
        // the finish signal on `finish_*`, and reports come back on `reached_*`.
        let (target_tx, target_rx) = mpsc::channel(sync_config.update_channel_size.get());
        let (finish_tx, finish_rx) = mpsc::channel(1);
        let (reached_tx, mut reached_rx) = mpsc::channel(1);
        // The coordinator's view of the latest target; starts at the initial one.
        let mut current_target = target.clone();
        let sync = T::sync_db(
            context,
            config,
            resolver,
            target,
            target_rx,
            Some(finish_rx),
            Some(reached_tx),
            sync_config,
        );

        let coordinator = async {
            let mut current_anchor = anchor;
            // `None` once the tip-update stream has disconnected.
            let mut tip_updates = Some(tip_updates);
            loop {
                // Fold any queued tip updates into the current anchor/target
                // before waiting. `false` means the new target could not be
                // forwarded, so coordination stops here.
                if !drain_single_tip_updates(
                    &mut tip_updates,
                    &target_tx,
                    &mut current_anchor,
                    &mut current_target,
                )
                .await
                {
                    return (current_anchor, current_target);
                }

                // With no tip stream left, substitute a future that never
                // resolves so only `reached_rx` can wake the select.
                let update_future = tip_updates.as_mut().map_or_else(
                    || Either::Right(pending()),
                    |updates| Either::Left(updates.recv()),
                );
                select! {
                    reached = reached_rx.recv() => {
                        // `None`: the reporting side of the channel is gone.
                        let Some(reached) = reached else {
                            return (current_anchor, current_target);
                        };
                        // Re-drain so a tip that arrived while the database
                        // was catching up is accounted for before deciding.
                        if !drain_single_tip_updates(
                            &mut tip_updates,
                            &target_tx,
                            &mut current_anchor,
                            &mut current_target,
                        )
                        .await
                        {
                            return (current_anchor, current_target);
                        };
                        // A report for anything but the latest target is
                        // stale; keep coordinating.
                        if reached != current_target {
                            continue;
                        }
                        // The send result is ignored: the coordinator returns
                        // the same pair whether or not the signal is delivered.
                        let _ = finish_tx.send_lossy(()).await;
                        return (current_anchor, current_target);
                    },
                    update = update_future => {
                        // `None`: the tip stream disconnected; stop polling it.
                        let Some(update) = update else {
                            tip_updates = None;
                            continue;
                        };
                        let (new_anchor, new_target) = update.record();
                        // Ignore updates that do not advance the anchor height.
                        if new_anchor.height <= current_anchor.height {
                            continue;
                        }
                        // The anchor advances even when the target is
                        // unchanged; in that case nothing is forwarded.
                        current_anchor = new_anchor;
                        if new_target == current_target {
                            continue;
                        }
                        current_target = new_target.clone();
                        if !target_tx.send_lossy(new_target).await {
                            return (current_anchor, current_target);
                        }
                    },
                }
            }
        };

        // Drive the database and the coordinator concurrently until both finish.
        let (db_result, (converged_anchor, converged_target)) = join!(sync, coordinator);
        let database = db_result?;
        // Invariant: the database must have ended on the coordinator's target.
        assert!(
            T::sync_target(&database).await == converged_target,
            "state sync database target does not match the coordinator target",
        );
        Ok((Self::new(AsyncRwLock::new(database)), converged_anchor))
    }
}
587
588async fn drain_single_tip_updates<D, T>(
589    tip_updates: &mut Option<ring::Receiver<TipUpdate<D, T>>>,
590    target_tx: &mpsc::Sender<T>,
591    current_anchor: &mut Anchor<D>,
592    current_target: &mut T,
593) -> bool
594where
595    D: Digest,
596    T: Clone + PartialEq + Send + Sync,
597{
598    let mut drained = 0usize;
599    let mut latest = None;
600    loop {
601        let update = match tip_updates.as_mut().map(ring::Receiver::try_recv) {
602            Some(Ok(update)) => update,
603            Some(Err(ring::TryRecvError::Empty)) => break,
604            Some(Err(ring::TryRecvError::Disconnected)) => {
605                *tip_updates = None;
606                break;
607            }
608            None => break,
609        };
610        drained += 1;
611
612        let (new_anchor, new_target) = update.record();
613        if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
614            reschedule().await;
615        }
616
617        let latest_height = latest
618            .as_ref()
619            .map_or(current_anchor.height, |(anchor, _): &(Anchor<D>, T)| {
620                anchor.height
621            });
622        if new_anchor.height <= latest_height {
623            continue;
624        }
625        latest = Some((new_anchor, new_target));
626    }
627
628    let Some((new_anchor, new_target)) = latest else {
629        return true;
630    };
631    *current_anchor = new_anchor;
632    if new_target == *current_target {
633        return true;
634    }
635    *current_target = new_target.clone();
636    target_tx.send_lossy(new_target).await
637}
638
/// Implement [`DatabaseSet`] for a tuple of individually-locked
/// [`ManagedDb`] instances.
///
/// Each invocation lists `Type: index` pairs, one per tuple position. Every
/// per-database operation is repeated for each pair; the async ones are
/// driven concurrently with `join!`.
macro_rules! impl_database_set {
    ($($T:ident : $idx:tt),+) => {
        impl<E: Send + Sync + Metrics, $($T: ManagedDb<E> + 'static),+> DatabaseSet<E>
            for ($(Arc<AsyncRwLock<$T>>,)+)
        {
            type Unmerkleized = ($($T::Unmerkleized,)+);
            type Merkleized = ($($T::Merkleized,)+);
            type Config = ($($T::Config,)+);
            type SyncTargets = ($($T::SyncTarget,)+);

            // Open every database under its own `db_<index>` child context;
            // an init failure panics with the index and type name.
            async fn init(context: E, config: Self::Config) -> Self {
                let databases = join!($(
                    async {
                        let scoped = context.child(concat!("db_", stringify!($idx)));
                        let opened = $T::init(scoped, config.$idx).await;
                        let db = opened.expect(concat!(
                            "database init failed (index ",
                            stringify!($idx),
                            ", type ",
                            stringify!($T),
                            ")",
                        ));
                        Arc::new(AsyncRwLock::new(db))
                    },
                )+);
                databases
            }

            // One fresh batch per database, rooted at committed state.
            async fn new_batches(&self) -> Self::Unmerkleized {
                join!($($T::new_batch(&self.$idx),)+)
            }

            // One child batch per pending merkleized parent.
            fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized {
                ($(parent.$idx.new_batch(),)+)
            }

            // Every database must match its own target.
            fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool {
                $($T::matches_sync_target(&batches.$idx, &targets.$idx))&&+
            }

            // Persist each batch under its database's write lock.
            async fn finalize(&self, batches: Self::Merkleized) {
                join!($(
                    async {
                        let mut guard = self.$idx.write().await;
                        finalize_or_panic(&mut *guard, batches.$idx, Some($idx)).await;
                    },
                )+);
            }

            // Read each database's committed target under its read lock.
            async fn committed_targets(&self) -> Self::SyncTargets {
                join!($(
                    async {
                        let guard = self.$idx.read().await;
                        $T::sync_target(&*guard).await
                    },
                )+)
            }

            // Rewind each database that is not already at its target.
            async fn rewind_to_targets(&self, targets: Self::SyncTargets) {
                join!($(
                    async {
                        let mut guard = self.$idx.write().await;
                        let already_at_target =
                            $T::sync_target(&*guard).await == targets.$idx;
                        if !already_at_target {
                            rewind_or_panic(&mut *guard, targets.$idx, Some($idx)).await;
                        }
                    },
                )+);
            }

            // Smallest finite depth across the set; `None` when every
            // database is unbounded.
            fn max_rewind_depth() -> Option<usize> {
                [$($T::max_rewind_depth(),)+]
                    .iter()
                    .flatten()
                    .copied()
                    .min()
            }
        }
    };
}
729
// Provide tuple `DatabaseSet` implementations for sets of one through eight
// databases. The number after each type parameter is its tuple index.
impl_database_set!(DB1: 0);
impl_database_set!(DB1: 0, DB2: 1);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5, DB7: 6);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5, DB7: 6, DB8: 7);
738
/// Both halves of every channel created for one database during a tuple-set
/// state sync, where `T` is that database's sync target type.
struct DbSyncChannels<T> {
    /// Sender for new sync targets.
    target_tx: mpsc::Sender<T>,
    /// Receiver for new sync targets.
    target_rx: mpsc::Receiver<T>,
    /// Sender for the finish signal.
    finish_tx: mpsc::Sender<()>,
    /// Receiver for the finish signal.
    finish_rx: mpsc::Receiver<()>,
    /// Sender for `(generation, target)` pairs.
    // NOTE(review): presumably pairs each dispatched target with its
    // generation number; confirm against the tuple coordinator.
    generation_tx: mpsc::Sender<(usize, T)>,
    /// Receiver for `(generation, target)` pairs.
    generation_rx: mpsc::Receiver<(usize, T)>,
    /// Sender for reached-target reports.
    reached_tx: mpsc::Sender<T>,
    /// Receiver for reached-target reports.
    reached_rx: mpsc::Receiver<T>,
}
749
750impl<T> DbSyncChannels<T> {
751    fn new(update_channel_size: usize) -> Self {
752        let (target_tx, target_rx) = mpsc::channel(update_channel_size);
753        let (finish_tx, finish_rx) = mpsc::channel(1);
754        let (generation_tx, generation_rx) = mpsc::channel(update_channel_size);
755        let (reached_tx, reached_rx) = mpsc::channel(1);
756        Self {
757            target_tx,
758            target_rx,
759            finish_tx,
760            finish_rx,
761            generation_tx,
762            generation_rx,
763            reached_tx,
764            reached_rx,
765        }
766    }
767}
768
/// The sending halves of one database's channels that the tuple-set
/// coordinator holds: targets, the finish signal, and `(generation, target)`
/// pairs.
struct CoordinatorSyncSenders<T> {
    /// Sender for new sync targets.
    target_tx: mpsc::Sender<T>,
    /// Sender for the finish signal.
    finish_tx: mpsc::Sender<()>,
    /// Sender for `(generation, target)` pairs.
    generation_tx: mpsc::Sender<(usize, T)>,
}
774
775macro_rules! impl_state_sync_set {
776    ($($T:ident : $R:ident : $idx:tt),+) => {
777        impl<E, D, $($T, $R),+> StateSyncSet<E, ($($R,)+), D> for ($(Arc<AsyncRwLock<$T>>,)+)
778        where
779            E: Send + Sync + Spawner + Metrics + 'static,
780            D: Digest + 'static,
781            $(
782                $T: StateSyncDb<E, $R> + 'static,
783                $R: Send + 'static,
784            )+
785        {
786            type Error = String;
787
788            #[allow(clippy::too_many_arguments)]
789            async fn sync(
790                context: E,
791                config: Self::Config,
792                resolvers: ($($R,)+),
793                anchor: Anchor<D>,
794                targets: Self::SyncTargets,
795                tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
796                sync_config: SyncEngineConfig,
797            ) -> Result<(Self, Anchor<D>), Self::Error> {
798                let db_channels = ($(
799                    DbSyncChannels::<<$T as ManagedDb<E>>::SyncTarget>::new(
800                        sync_config.update_channel_size.get(),
801                    ),
802                )+);
803                let coordinator_senders = ($(
804                    CoordinatorSyncSenders {
805                        target_tx: db_channels.$idx.target_tx.clone(),
806                        finish_tx: db_channels.$idx.finish_tx.clone(),
807                        generation_tx: db_channels.$idx.generation_tx.clone(),
808                    },
809                )+);
810                let coordinator_owned_senders = ($(
811                    CoordinatorSyncSenders {
812                        target_tx: db_channels.$idx.target_tx,
813                        finish_tx: db_channels.$idx.finish_tx,
814                        generation_tx: db_channels.$idx.generation_tx,
815                    },
816                )+);
817                let (reached_event_tx, mut reached_event_rx) = mpsc::channel(16);
818                let (completion_tx, mut completion_rx) = mpsc::channel(1);
819                let db_count = [$($idx,)+].len();
820                let coordinator_targets = targets.clone();
821                let initial_targets = targets.clone();
822                let first_db_error: Arc<commonware_utils::sync::Mutex<Option<String>>> =
823                    Arc::new(commonware_utils::sync::Mutex::new(None));
824                let coordinator_handle = context.child("coordinator").spawn({
825                    move |_context| async move {
826                        let coordinator_owned_senders = coordinator_owned_senders;
827                        let mut tip_updates = Some(tip_updates);
828                        let mut state = CoordinatorState::new(db_count, anchor, coordinator_targets);
829                        let mut last_dispatched_targets = initial_targets;
830
831                        loop {
832                            loop {
833                                match reached_event_rx.try_recv() {
834                                    Ok((idx, generation)) => state.record_reached(idx, generation),
835                                    Err(mpsc::error::TryRecvError::Empty) => break,
836                                    Err(mpsc::error::TryRecvError::Disconnected) => return None,
837                                }
838                            }
839
840                            if let Some(updates) = tip_updates.as_mut() {
841                                loop {
842                                    match updates.try_recv() {
843                                        Ok(update) => {
844                                            let (anchor, targets) = update.record();
845                                            state.record_tip_update(anchor, targets);
846                                        }
847                                        Err(ring::TryRecvError::Empty) => break,
848                                        Err(ring::TryRecvError::Disconnected) => {
849                                            tip_updates = None;
850                                            break;
851                                        }
852                                    }
853                                }
854                            }
855
856                            match state.next_action() {
857                                CoordinatorAction::Converged { anchor, targets } => {
858                                    $(
859                                        let _ = coordinator_senders.$idx.finish_tx.send_lossy(()).await;
860                                    )+
861                                    return Some((anchor, targets));
862                                }
863                                CoordinatorAction::Dispatch {
864                                    generation,
865                                    targets: dispatch_targets,
866                                } => {
867                                    $(
868                                        let dispatch_target = dispatch_targets.$idx.clone();
869                                        if !coordinator_senders.$idx
870                                            .generation_tx
871                                            .send_lossy((generation, dispatch_target.clone()))
872                                            .await
873                                        {
874                                            return None;
875                                        }
876                                        if state.should_dispatch($idx) {
877                                            if dispatch_target != last_dispatched_targets.$idx {
878                                                if !coordinator_senders.$idx
879                                                    .target_tx
880                                                    .send_lossy(dispatch_target.clone())
881                                                    .await
882                                                {
883                                                    return None;
884                                                }
885                                                last_dispatched_targets.$idx = dispatch_target;
886                                            }
887                                        } else if dispatch_target == last_dispatched_targets.$idx {
888                                            state.mark_reached_same_target($idx, generation);
889                                        }
890                                    )+
891                                    continue;
892                                }
893                                CoordinatorAction::Wait => {}
894                            }
895
896                            let update_future = tip_updates.as_mut().map_or_else(
897                                || Either::Right(pending()),
898                                |updates| Either::Left(updates.recv()),
899                            );
900                            select! {
901                                reached_event = reached_event_rx.recv() => {
902                                    let (idx, generation) = reached_event?;
903                                    state.record_reached(idx, generation);
904                                },
905                                _ = completion_rx.recv() => {
906                                    drop(coordinator_owned_senders);
907                                    return None;
908                                },
909                                update = update_future => {
910                                    let Some(update) = update else {
911                                        tip_updates = None;
912                                        continue;
913                                    };
914                                    let (anchor, targets) = update.record();
915                                    state.record_tip_update(anchor, targets);
916                                },
917                            };
918                        }
919                    }
920                });
921                let db_handles = (
922                    $(
923                        context.child(concat!("db_", stringify!($idx))).spawn({
924                            let first_db_error = first_db_error.clone();
925                            let mut reached_target_rx = db_channels.$idx.reached_rx;
926                            let mut generation_rx = Some(db_channels.$idx.generation_rx);
927                            let mut current_generation = 0usize;
928                            let mut current_target = targets.$idx.clone();
929                            let mut last_reached_target = None;
930                            let mut last_reported_generation = None;
931                            let reached_event_sender = reached_event_tx.clone();
932                            let completion_signal = completion_tx.clone();
933                            let config = config.$idx;
934                            let resolver = resolvers.$idx;
935                            let target = targets.$idx;
936                            let target_rx = db_channels.$idx.target_rx;
937                            let finish_rx = db_channels.$idx.finish_rx;
938                            let reached_tx = db_channels.$idx.reached_tx;
939                            move |context| async move {
940                                let sync = $T::sync_db(
941                                    context,
942                                    config,
943                                    resolver,
944                                    target,
945                                    target_rx,
946                                    Some(finish_rx),
947                                    Some(reached_tx),
948                                    sync_config,
949                                );
950                                let forward_reached = async move {
951                                    loop {
952                                        drain_generation_updates(
953                                            &mut generation_rx,
954                                            &mut current_generation,
955                                            &mut current_target,
956                                            &last_reached_target,
957                                            &mut last_reported_generation,
958                                            &reached_event_sender,
959                                            $idx,
960                                        )
961                                        .await;
962
963                                        let update_future = generation_rx.as_mut().map_or_else(
964                                            || Either::Right(pending()),
965                                            |updates| Either::Left(updates.recv()),
966                                        );
967                                        select! {
968                                            reached_target = reached_target_rx.recv() => {
969                                                let Some(reached_target) = reached_target else {
970                                                    return;
971                                                };
972
973                                                last_reached_target = Some(reached_target.clone());
974                                                drain_generation_updates(
975                                                    &mut generation_rx,
976                                                    &mut current_generation,
977                                                    &mut current_target,
978                                                    &last_reached_target,
979                                                    &mut last_reported_generation,
980                                                    &reached_event_sender,
981                                                    $idx,
982                                                )
983                                                .await;
984
985                                                if reached_target != current_target {
986                                                    continue;
987                                                }
988
989                                                if last_reported_generation != Some(current_generation) {
990                                                    if !reached_event_sender
991                                                        .send_lossy(($idx, current_generation))
992                                                        .await
993                                                    {
994                                                        return;
995                                                    }
996                                                    last_reported_generation = Some(current_generation);
997                                                }
998                                            },
999                                            update = update_future => {
1000                                                let Some((generation, target)) = update else {
1001                                                    generation_rx = None;
1002                                                    continue;
1003                                                };
1004                                                current_generation = generation;
1005                                                current_target = target;
1006                                                if last_reached_target.as_ref() == Some(&current_target)
1007                                                    && last_reported_generation != Some(current_generation)
1008                                                {
1009                                                    if !reached_event_sender
1010                                                        .send_lossy(($idx, current_generation))
1011                                                        .await
1012                                                    {
1013                                                        return;
1014                                                    }
1015                                                    last_reported_generation = Some(current_generation);
1016                                                }
1017                                            },
1018                                        };
1019                                    }
1020                                };
1021                                let (sync_result, _) = join!(sync, forward_reached);
1022                                let result = sync_result
1023                                    .map(|database| Arc::new(AsyncRwLock::new(database)))
1024                                    .map_err(|err| {
1025                                        format!(
1026                                            "state sync failed (index {}, db {}): {err:?}",
1027                                            $idx,
1028                                            core::any::type_name::<$T>(),
1029                                        )
1030                                    });
1031                                if let Err(err) = &result {
1032                                    let mut first = first_db_error.lock();
1033                                    if first.is_none() {
1034                                        *first = Some(err.clone());
1035                                    }
1036                                }
1037                                let _ = completion_signal.send_lossy(()).await;
1038                                result
1039                            }
1040                        }),
1041                    )+
1042                );
1043
1044                let synced = join!(
1045                    $(
1046                        async {
1047                            db_handles.$idx
1048                                .await
1049                                .expect("state sync database task exited")
1050                        },
1051                    )+
1052                );
1053                let converged_anchor = coordinator_handle
1054                    .await
1055                    .expect("state sync coordinator task exited");
1056
1057                if let Some(err) = first_db_error.lock().take() {
1058                    return Err(err);
1059                }
1060
1061                let synced = ($(synced.$idx?,)+);
1062                let Some((converged_anchor, converged_targets)) = converged_anchor else {
1063                    return Err("state sync coordinator did not report a converged anchor".into());
1064                };
1065                if <Self as DatabaseSet<E>>::committed_targets(&synced).await != converged_targets {
1066                    return Err(
1067                        "state sync database targets do not match the coordinator target set"
1068                            .into(),
1069                    );
1070                }
1071
1072                Ok((synced, converged_anchor))
1073            }
1074        }
1075    };
1076}
1077
1078impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1);
1079impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2);
1080impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3);
1081impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4);
1082impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4, DB6: R6: 5);
1083impl_state_sync_set!(
1084    DB1: R1: 0,
1085    DB2: R2: 1,
1086    DB3: R3: 2,
1087    DB4: R4: 3,
1088    DB5: R5: 4,
1089    DB6: R6: 5,
1090    DB7: R7: 6
1091);
1092impl_state_sync_set!(
1093    DB1: R1: 0,
1094    DB2: R2: 1,
1095    DB3: R3: 2,
1096    DB4: R4: 3,
1097    DB5: R5: 4,
1098    DB6: R6: 5,
1099    DB7: R7: 6,
1100    DB8: R8: 7
1101);
1102
1103async fn drain_generation_updates<T>(
1104    generation_rx: &mut Option<mpsc::Receiver<(usize, T)>>,
1105    current_generation: &mut usize,
1106    current_target: &mut T,
1107    last_reached_target: &Option<T>,
1108    last_reported_generation: &mut Option<usize>,
1109    reached_event_sender: &mpsc::Sender<(usize, usize)>,
1110    idx: usize,
1111) where
1112    T: Clone + PartialEq,
1113{
1114    if let Some(updates) = generation_rx.as_mut() {
1115        let mut drained = 0usize;
1116        loop {
1117            match updates.try_recv() {
1118                Ok((generation, target)) => {
1119                    drained += 1;
1120                    *current_generation = generation;
1121                    *current_target = target;
1122
1123                    if last_reached_target.as_ref() == Some(current_target)
1124                        && *last_reported_generation != Some(*current_generation)
1125                    {
1126                        if !reached_event_sender
1127                            .send_lossy((idx, *current_generation))
1128                            .await
1129                        {
1130                            return;
1131                        }
1132                        *last_reported_generation = Some(*current_generation);
1133                    }
1134                    if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
1135                        reschedule().await;
1136                    }
1137                }
1138                Err(mpsc::error::TryRecvError::Empty) => break,
1139                Err(mpsc::error::TryRecvError::Disconnected) => {
1140                    *generation_rx = None;
1141                    break;
1142                }
1143            }
1144        }
1145    }
1146}
1147
/// Sync progress of a single database, as tracked by the coordinator.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DbSyncState {
    /// The database has not yet reported reaching the targets of `generation`.
    Seeking { generation: usize },
    /// The database reported that it reached the targets of `generation`.
    Reached { generation: usize },
}

impl DbSyncState {
    /// Generation the database is assigned to, regardless of its progress.
    const fn generation(self) -> usize {
        match self {
            Self::Seeking { generation } => generation,
            Self::Reached { generation } => generation,
        }
    }

    /// Whether the database has reported reaching its assigned generation.
    const fn is_reached(self) -> bool {
        matches!(self, Self::Reached { .. })
    }
}
1168
1169/// What the coordinator should do after processing events.
1170enum CoordinatorAction<D: Digest, T> {
1171    /// Nothing to do; wait for the next event.
1172    Wait,
1173    /// Dispatch targets to non-reached databases for `generation`.
1174    Dispatch { generation: usize, targets: T },
1175    /// All databases converged on the same generation.
1176    Converged { anchor: Anchor<D>, targets: T },
1177}
1178
1179/// Pure state machine for multi-database sync convergence.
1180///
1181/// Tracks which generation each database is assigned to, which have
1182/// reported "reached", and decides when to regroup or declare
1183/// convergence.
1184struct CoordinatorState<D: Digest, T> {
1185    dbs: Vec<DbSyncState>,
1186    generation_state: BTreeMap<usize, (Anchor<D>, T)>,
1187    current_generation: usize,
1188    latest_tip: Option<(Anchor<D>, T)>,
1189    last_dispatched_anchor: Anchor<D>,
1190}
1191
1192impl<D: Digest, T: Clone> CoordinatorState<D, T> {
1193    fn new(db_count: usize, anchor: Anchor<D>, targets: T) -> Self {
1194        let dbs = vec![DbSyncState::Seeking { generation: 0 }; db_count];
1195        let mut generation_state = BTreeMap::new();
1196        generation_state.insert(0, (anchor, targets));
1197        Self {
1198            dbs,
1199            generation_state,
1200            current_generation: 0,
1201            latest_tip: None,
1202            last_dispatched_anchor: anchor,
1203        }
1204    }
1205
1206    /// Record that database `idx` reached `generation`.
1207    ///
1208    /// Reached events can arrive late. If the database has already been
1209    /// re-assigned to a newer generation, stale events are ignored.
1210    fn record_reached(&mut self, idx: usize, generation: usize) {
1211        if self.dbs[idx].generation() != generation {
1212            return;
1213        }
1214        if self.dbs[idx].is_reached() {
1215            return;
1216        }
1217        self.dbs[idx] = DbSyncState::Reached { generation };
1218    }
1219
1220    /// Record a new tip update.
1221    ///
1222    /// Sync targets must move strictly forward. Ignore stale and duplicate
1223    /// anchors to avoid dispatching backward targets.
1224    fn record_tip_update(&mut self, anchor: Anchor<D>, targets: T) {
1225        let current_height = self
1226            .latest_tip
1227            .as_ref()
1228            .map_or(self.last_dispatched_anchor.height, |(latest_anchor, _)| {
1229                latest_anchor.height
1230            });
1231        if anchor.height <= current_height {
1232            return;
1233        }
1234        self.latest_tip = Some((anchor, targets));
1235    }
1236
1237    /// Determine the next action. Mutates internal state for regroup/dispatch.
1238    ///
1239    /// Returns which database indices should receive targets via
1240    /// `dbs[idx].is_reached() == false` after a `Dispatch` action.
1241    fn next_action(&mut self) -> CoordinatorAction<D, T> {
1242        let all_reached = self.dbs.iter().all(|db| db.is_reached());
1243
1244        if all_reached {
1245            let min_gen = self.dbs.iter().map(|db| db.generation()).min().unwrap();
1246            let max_gen = self.dbs.iter().map(|db| db.generation()).max().unwrap();
1247
1248            if min_gen == max_gen {
1249                if let Some((anchor, targets)) = self.latest_tip.take() {
1250                    let generation = self.current_generation + 1;
1251                    self.current_generation = generation;
1252                    for db in &mut self.dbs {
1253                        *db = DbSyncState::Seeking { generation };
1254                    }
1255                    self.generation_state
1256                        .insert(generation, (anchor, targets.clone()));
1257                    self.last_dispatched_anchor = anchor;
1258                    self.prune_generations();
1259                    return CoordinatorAction::Dispatch {
1260                        generation,
1261                        targets,
1262                    };
1263                }
1264
1265                let (anchor, targets) = self
1266                    .generation_state
1267                    .get(&min_gen)
1268                    .expect("missing state for converged generation")
1269                    .clone();
1270                return CoordinatorAction::Converged { anchor, targets };
1271            }
1272
1273            // Regroup: reset behind databases to seek the highest generation.
1274            let (_anchor, targets) = self
1275                .generation_state
1276                .get(&max_gen)
1277                .expect("missing state for regroup generation")
1278                .clone();
1279            for db in &mut self.dbs {
1280                if db.generation() != max_gen {
1281                    *db = DbSyncState::Seeking {
1282                        generation: max_gen,
1283                    };
1284                }
1285            }
1286            self.prune_generations();
1287            return CoordinatorAction::Dispatch {
1288                generation: max_gen,
1289                targets,
1290            };
1291        }
1292
1293        // Not all reached. If there's a pending tip, dispatch it.
1294        let Some((anchor, targets)) = self.latest_tip.take() else {
1295            return CoordinatorAction::Wait;
1296        };
1297
1298        let generation = self.current_generation + 1;
1299        self.current_generation = generation;
1300        for db in &mut self.dbs {
1301            if !db.is_reached() {
1302                *db = DbSyncState::Seeking { generation };
1303            }
1304        }
1305        self.generation_state
1306            .insert(generation, (anchor, targets.clone()));
1307        self.last_dispatched_anchor = anchor;
1308
1309        self.prune_generations();
1310        CoordinatorAction::Dispatch {
1311            generation,
1312            targets,
1313        }
1314    }
1315
1316    /// Retain only generations referenced by at least one database.
1317    fn prune_generations(&mut self) {
1318        self.generation_state
1319            .retain(|gen, _| self.dbs.iter().any(|db| db.generation() == *gen));
1320    }
1321
1322    /// Whether database `idx` is a non-reached recipient for dispatch.
1323    fn should_dispatch(&self, idx: usize) -> bool {
1324        !self.dbs[idx].is_reached()
1325    }
1326
1327    /// Advance a reached database to `generation` when its target is unchanged.
1328    fn mark_reached_same_target(&mut self, idx: usize, generation: usize) {
1329        if !self.dbs[idx].is_reached() {
1330            return;
1331        }
1332        self.dbs[idx] = DbSyncState::Reached { generation };
1333    }
1334}
1335
1336async fn finalize_or_panic<E, T: ManagedDb<E>>(
1337    database: &mut T,
1338    batch: T::Merkleized,
1339    index: Option<usize>,
1340) {
1341    // Mutable finalize failures are fatal by design because other databases in
1342    // the same set may already have committed, leaving partially applied state.
1343    if let Err(err) = database.finalize(batch).await {
1344        match index {
1345            Some(index) => panic!(
1346                "database finalize failed (index {index}, type {}): {err:?}",
1347                core::any::type_name::<T>(),
1348            ),
1349            None => panic!(
1350                "database finalize failed (type {}): {err:?}",
1351                core::any::type_name::<T>(),
1352            ),
1353        }
1354    }
1355}
1356
1357async fn rewind_or_panic<E, T: ManagedDb<E>>(
1358    database: &mut T,
1359    target: T::SyncTarget,
1360    index: Option<usize>,
1361) {
1362    // Mutable rewind failures are fatal by design because the database handle
1363    // may be internally diverged after a failed rewind.
1364    if let Err(err) = database.rewind_to_target(target).await {
1365        match index {
1366            Some(index) => panic!(
1367                "database rewind failed (index {index}, type {}): {err:?}",
1368                core::any::type_name::<T>(),
1369            ),
1370            None => panic!(
1371                "database rewind failed (type {}): {err:?}",
1372                core::any::type_name::<T>(),
1373            ),
1374        }
1375    }
1376}
1377
1378/// A resolver that can attach a database at runtime.
1379///
1380/// Implementations receive a database handle after startup so they can
1381/// serve incoming sync requests once the database is initialized.
1382pub trait AttachableResolver<DB>: Clone + Send + Sync + 'static {
1383    /// Attach a database for serving incoming requests.
1384    fn attach_database(&self, db: Arc<AsyncRwLock<DB>>) -> impl Future<Output = ()> + Send;
1385}
1386
/// A set of resolvers that can be attached to a database set of matching
/// shape (`DBs`).
pub trait AttachableResolverSet<DBs>: Clone + Send + Sync + 'static {
    /// Attaches every database in `databases` to its corresponding resolver.
    /// The returned future must be `Send`.
    fn attach_databases(&self, databases: DBs) -> impl Future<Output = ()> + Send;
}
1392
1393impl<R, DB> AttachableResolverSet<Arc<AsyncRwLock<DB>>> for R
1394where
1395    R: AttachableResolver<DB>,
1396    DB: Send + Sync + 'static,
1397{
1398    async fn attach_databases(&self, db: Arc<AsyncRwLock<DB>>) {
1399        self.attach_database(db).await;
1400    }
1401}
1402
// Implements `AttachableResolverSet` for a tuple of resolvers paired
// element-wise with a tuple of shared database handles.
//
// Every entry has the form `ResolverType: DatabaseType: tuple_position`.
macro_rules! impl_attachable_resolver_set {
    ($($Resolver:ident : $Database:ident : $position:tt),+) => {
        impl<$($Resolver, $Database),+>
            AttachableResolverSet<($(Arc<AsyncRwLock<$Database>>,)+)> for ($($Resolver,)+)
        where
            $(
                $Resolver: AttachableResolver<$Database>,
                $Database: Send + Sync + 'static,
            )+
        {
            async fn attach_databases(
                &self,
                databases: ($(Arc<AsyncRwLock<$Database>>,)+),
            ) {
                // Each resolver receives the handle stored at its own tuple
                // position; all attachments are awaited together.
                futures::join!($(
                    self.$position.attach_database(databases.$position),
                )+);
            }
        }
    };
}
1420
1421impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1);
1422impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2);
1423impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3);
1424impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3, R5: DB5: 4);
1425impl_attachable_resolver_set!(
1426    R1: DB1: 0,
1427    R2: DB2: 1,
1428    R3: DB3: 2,
1429    R4: DB4: 3,
1430    R5: DB5: 4,
1431    R6: DB6: 5
1432);
1433impl_attachable_resolver_set!(
1434    R1: DB1: 0,
1435    R2: DB2: 1,
1436    R3: DB3: 2,
1437    R4: DB4: 3,
1438    R5: DB5: 4,
1439    R6: DB6: 5,
1440    R7: DB7: 6
1441);
1442impl_attachable_resolver_set!(
1443    R1: DB1: 0,
1444    R2: DB2: 1,
1445    R3: DB3: 2,
1446    R4: DB4: 3,
1447    R5: DB5: 4,
1448    R6: DB6: 5,
1449    R7: DB7: 6,
1450    R8: DB8: 7
1451);
1452
1453#[cfg(test)]
1454mod tests {
1455    use super::{
1456        assert_rewind_window_safety, drain_single_tip_updates, Anchor, AttachableResolver,
1457        AttachableResolverSet, CoordinatorAction, CoordinatorState, DatabaseSet, ManagedDb,
1458        StateSyncDb, StateSyncSet, SyncEngineConfig, TipUpdate, MAX_CHANNEL_DRAIN_PER_TICK,
1459    };
1460    use crate::stateful::tests::mocks::{anchor as mock_anchor, TestMerkleized, TestUnmerkleized};
1461    use commonware_cryptography::sha256;
1462    use commonware_macros::select;
1463    use commonware_runtime::{
1464        deterministic, reschedule, Clock, Runner as _, Spawner as _, Supervisor as _,
1465    };
1466    use commonware_utils::{
1467        channel::{mpsc, oneshot, ring},
1468        sync::AsyncRwLock,
1469    };
1470    use futures::{pin_mut, FutureExt, SinkExt};
1471    use std::{
1472        convert::Infallible,
1473        num::{NonZeroU64, NonZeroUsize},
1474        sync::{
1475            atomic::{AtomicBool, AtomicUsize, Ordering},
1476            Arc,
1477        },
1478        time::Duration,
1479    };
1480
1481    #[derive(Default)]
1482    struct TestDb;
1483
1484    #[derive(Default)]
1485    struct OneStepRewindDb;
1486
1487    #[derive(Default)]
1488    struct ThreeStepRewindDb;
1489
1490    struct CountingRewindDb {
1491        current_target: u64,
1492        rewind_count: usize,
1493    }
1494
1495    impl<E: Send> ManagedDb<E> for TestDb {
1496        type Unmerkleized = TestUnmerkleized;
1497        type Merkleized = TestMerkleized;
1498        type Error = Infallible;
1499        type Config = ();
1500        type SyncTarget = ();
1501
1502        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1503            Ok(Self)
1504        }
1505
1506        async fn new_batch(db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1507            let _guard = db.read().await;
1508            TestUnmerkleized
1509        }
1510
1511        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1512            true
1513        }
1514
1515        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1516            Ok(())
1517        }
1518
1519        async fn sync_target(&self) -> Self::SyncTarget {}
1520
1521        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1522            Ok(())
1523        }
1524    }
1525
1526    impl<E: Send> ManagedDb<E> for OneStepRewindDb {
1527        type Unmerkleized = TestUnmerkleized;
1528        type Merkleized = TestMerkleized;
1529        type Error = Infallible;
1530        type Config = ();
1531        type SyncTarget = ();
1532
1533        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1534            Ok(Self)
1535        }
1536
1537        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1538            TestUnmerkleized
1539        }
1540
1541        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1542            true
1543        }
1544
1545        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1546            Ok(())
1547        }
1548
1549        async fn sync_target(&self) -> Self::SyncTarget {}
1550
1551        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1552            Ok(())
1553        }
1554
1555        fn max_rewind_depth() -> Option<usize> {
1556            Some(1)
1557        }
1558    }
1559
1560    impl<E: Send> ManagedDb<E> for ThreeStepRewindDb {
1561        type Unmerkleized = TestUnmerkleized;
1562        type Merkleized = TestMerkleized;
1563        type Error = Infallible;
1564        type Config = ();
1565        type SyncTarget = ();
1566
1567        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1568            Ok(Self)
1569        }
1570
1571        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1572            TestUnmerkleized
1573        }
1574
1575        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1576            true
1577        }
1578
1579        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1580            Ok(())
1581        }
1582
1583        async fn sync_target(&self) -> Self::SyncTarget {}
1584
1585        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1586            Ok(())
1587        }
1588
1589        fn max_rewind_depth() -> Option<usize> {
1590            Some(3)
1591        }
1592    }
1593
1594    impl<E: Send> ManagedDb<E> for CountingRewindDb {
1595        type Unmerkleized = TestUnmerkleized;
1596        type Merkleized = TestMerkleized;
1597        type Error = Infallible;
1598        type Config = ();
1599        type SyncTarget = u64;
1600
1601        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1602            unreachable!("CountingRewindDb is constructed directly in tests")
1603        }
1604
1605        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1606            TestUnmerkleized
1607        }
1608
1609        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1610            true
1611        }
1612
1613        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1614            Ok(())
1615        }
1616
1617        async fn sync_target(&self) -> Self::SyncTarget {
1618            self.current_target
1619        }
1620
1621        async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Self::Error> {
1622            self.current_target = target;
1623            self.rewind_count += 1;
1624            Ok(())
1625        }
1626    }
1627
1628    struct BlockingFinalizeDb {
1629        started: Option<oneshot::Sender<()>>,
1630        release: Option<oneshot::Receiver<()>>,
1631    }
1632
1633    impl BlockingFinalizeDb {
1634        fn new(started: oneshot::Sender<()>, release: oneshot::Receiver<()>) -> Self {
1635            Self {
1636                started: Some(started),
1637                release: Some(release),
1638            }
1639        }
1640    }
1641
1642    #[derive(Debug)]
1643    struct TestFinalizeError;
1644
1645    struct FailingFinalizeDb;
1646
1647    struct SlowSyncDb {
1648        final_target: u64,
1649    }
1650
1651    struct RejectDuplicateTargetSyncDb {
1652        final_target: u64,
1653    }
1654
1655    struct StaleReachedSyncDb {
1656        final_target: u64,
1657    }
1658
1659    struct FastSyncDb {
1660        final_target: u64,
1661    }
1662
1663    struct ImmediateStateSyncDb;
1664
1665    struct FailingStateSyncDb;
1666
1667    struct MismatchedTargetSyncDb {
1668        final_target: u64,
1669    }
1670
1671    struct FinishClosedSyncDb {
1672        final_target: u64,
1673    }
1674
1675    struct ObservedSlowSyncDb {
1676        final_target: u64,
1677    }
1678
1679    struct ObservedFastSyncDb {
1680        final_target: u64,
1681    }
1682
1683    struct DistinctObservedFastSyncDb {
1684        final_target: u64,
1685    }
1686
1687    #[derive(Clone)]
1688    struct SlowSyncController {
1689        release: Arc<AtomicBool>,
1690    }
1691
1692    #[derive(Clone)]
1693    struct FastSyncObserver {
1694        ready: Arc<AtomicBool>,
1695        update_count: Arc<AtomicUsize>,
1696    }
1697
1698    impl<E: Send> ManagedDb<E> for FailingFinalizeDb {
1699        type Unmerkleized = TestUnmerkleized;
1700        type Merkleized = TestMerkleized;
1701        type Error = TestFinalizeError;
1702        type Config = ();
1703        type SyncTarget = ();
1704
1705        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1706            Ok(Self)
1707        }
1708
1709        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1710            TestUnmerkleized
1711        }
1712
1713        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1714            true
1715        }
1716
1717        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1718            Err(TestFinalizeError)
1719        }
1720
1721        async fn sync_target(&self) -> Self::SyncTarget {}
1722
1723        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1724            Ok(())
1725        }
1726    }
1727
1728    #[test]
1729    fn single_db_set_reports_unbounded_rewind_depth() {
1730        let rewind_depth =
1731            <Arc<AsyncRwLock<TestDb>> as DatabaseSet<deterministic::Context>>::max_rewind_depth();
1732        assert_eq!(rewind_depth, None);
1733    }
1734
1735    #[test]
1736    fn single_db_set_reports_one_step_rewind_depth() {
1737        let rewind_depth = <Arc<AsyncRwLock<OneStepRewindDb>> as DatabaseSet<
1738            deterministic::Context,
1739        >>::max_rewind_depth();
1740        assert_eq!(rewind_depth, Some(1));
1741    }
1742
1743    #[test]
1744    fn tuple_db_set_uses_most_restrictive_finite_rewind_depth() {
1745        type DbSet = (
1746            Arc<AsyncRwLock<TestDb>>,
1747            Arc<AsyncRwLock<ThreeStepRewindDb>>,
1748            Arc<AsyncRwLock<OneStepRewindDb>>,
1749        );
1750
1751        let rewind_depth = <DbSet as DatabaseSet<deterministic::Context>>::max_rewind_depth();
1752        assert_eq!(rewind_depth, Some(1));
1753    }
1754
1755    #[test]
1756    fn rewind_window_assertion_accepts_equal_pending_acks_and_rewind_depth() {
1757        assert_rewind_window_safety::<deterministic::Context, Arc<AsyncRwLock<OneStepRewindDb>>>(
1758            NonZeroUsize::new(1).unwrap(),
1759        );
1760    }
1761
1762    #[test]
1763    #[should_panic(expected = "marshal max_pending_acks=2 exceeds database_set.max_rewind_depth=1")]
1764    fn rewind_window_assertion_panics_when_pending_acks_exceed_rewind_depth() {
1765        assert_rewind_window_safety::<deterministic::Context, Arc<AsyncRwLock<OneStepRewindDb>>>(
1766            NonZeroUsize::new(2).unwrap(),
1767        );
1768    }
1769
1770    #[test]
1771    fn tuple_rewind_to_targets_skips_already_aligned_databases() {
1772        deterministic::Runner::default().start(|_context| async move {
1773            type DbSet = (
1774                Arc<AsyncRwLock<CountingRewindDb>>,
1775                Arc<AsyncRwLock<CountingRewindDb>>,
1776            );
1777
1778            let left = Arc::new(AsyncRwLock::new(CountingRewindDb {
1779                current_target: 2,
1780                rewind_count: 0,
1781            }));
1782            let right = Arc::new(AsyncRwLock::new(CountingRewindDb {
1783                current_target: 1,
1784                rewind_count: 0,
1785            }));
1786            let databases: DbSet = (left.clone(), right.clone());
1787
1788            <DbSet as DatabaseSet<deterministic::Context>>::rewind_to_targets(&databases, (1, 1))
1789                .await;
1790
1791            let left = left.read().await;
1792            assert_eq!(left.current_target, 1);
1793            assert_eq!(left.rewind_count, 1);
1794
1795            let right = right.read().await;
1796            assert_eq!(right.current_target, 1);
1797            assert_eq!(right.rewind_count, 0);
1798        });
1799    }
1800
1801    impl<E: Send> ManagedDb<E> for BlockingFinalizeDb {
1802        type Unmerkleized = TestUnmerkleized;
1803        type Merkleized = TestMerkleized;
1804        type Error = Infallible;
1805        type Config = ();
1806        type SyncTarget = ();
1807
1808        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1809            unreachable!("BlockingFinalizeDb is constructed directly in tests")
1810        }
1811
1812        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1813            TestUnmerkleized
1814        }
1815
1816        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1817            true
1818        }
1819
1820        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1821            if let Some(started) = self.started.take() {
1822                let _ = started.send(());
1823            }
1824            if let Some(release) = self.release.take() {
1825                let _ = release.await;
1826            }
1827            Ok(())
1828        }
1829
1830        async fn sync_target(&self) -> Self::SyncTarget {}
1831
1832        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1833            Ok(())
1834        }
1835    }
1836
1837    impl<E: Send> ManagedDb<E> for SlowSyncDb {
1838        type Unmerkleized = TestUnmerkleized;
1839        type Merkleized = TestMerkleized;
1840        type Error = Infallible;
1841        type Config = ();
1842        type SyncTarget = u64;
1843
1844        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1845            unreachable!("SlowSyncDb is only constructed through state sync in tests")
1846        }
1847
1848        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1849            TestUnmerkleized
1850        }
1851
1852        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1853            true
1854        }
1855
1856        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1857            Ok(())
1858        }
1859
1860        async fn sync_target(&self) -> Self::SyncTarget {
1861            self.final_target
1862        }
1863
1864        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1865            Ok(())
1866        }
1867    }
1868
1869    impl<E: Send> ManagedDb<E> for RejectDuplicateTargetSyncDb {
1870        type Unmerkleized = TestUnmerkleized;
1871        type Merkleized = TestMerkleized;
1872        type Error = Infallible;
1873        type Config = ();
1874        type SyncTarget = u64;
1875
1876        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1877            unreachable!(
1878                "RejectDuplicateTargetSyncDb is only constructed through state sync in tests"
1879            )
1880        }
1881
1882        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1883            TestUnmerkleized
1884        }
1885
1886        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1887            true
1888        }
1889
1890        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1891            Ok(())
1892        }
1893
1894        async fn sync_target(&self) -> Self::SyncTarget {
1895            self.final_target
1896        }
1897
1898        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1899            Ok(())
1900        }
1901    }
1902
1903    impl<E: Send> ManagedDb<E> for FastSyncDb {
1904        type Unmerkleized = TestUnmerkleized;
1905        type Merkleized = TestMerkleized;
1906        type Error = Infallible;
1907        type Config = ();
1908        type SyncTarget = u64;
1909
1910        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1911            unreachable!("FastSyncDb is only constructed through state sync in tests")
1912        }
1913
1914        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1915            TestUnmerkleized
1916        }
1917
1918        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1919            true
1920        }
1921
1922        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1923            Ok(())
1924        }
1925
1926        async fn sync_target(&self) -> Self::SyncTarget {
1927            self.final_target
1928        }
1929
1930        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1931            Ok(())
1932        }
1933    }
1934
1935    impl<E: Send> ManagedDb<E> for FailingStateSyncDb {
1936        type Unmerkleized = TestUnmerkleized;
1937        type Merkleized = TestMerkleized;
1938        type Error = Infallible;
1939        type Config = ();
1940        type SyncTarget = u64;
1941
1942        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1943            unreachable!("FailingStateSyncDb is only constructed through state sync in tests")
1944        }
1945
1946        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1947            TestUnmerkleized
1948        }
1949
1950        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1951            true
1952        }
1953
1954        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1955            Ok(())
1956        }
1957
1958        async fn sync_target(&self) -> Self::SyncTarget {
1959            0
1960        }
1961
1962        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1963            Ok(())
1964        }
1965    }
1966
1967    impl<E: Send> ManagedDb<E> for MismatchedTargetSyncDb {
1968        type Unmerkleized = TestUnmerkleized;
1969        type Merkleized = TestMerkleized;
1970        type Error = Infallible;
1971        type Config = ();
1972        type SyncTarget = u64;
1973
1974        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
1975            unreachable!("MismatchedTargetSyncDb is only constructed through state sync in tests")
1976        }
1977
1978        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
1979            TestUnmerkleized
1980        }
1981
1982        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
1983            true
1984        }
1985
1986        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
1987            Ok(())
1988        }
1989
1990        async fn sync_target(&self) -> Self::SyncTarget {
1991            self.final_target
1992        }
1993
1994        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
1995            Ok(())
1996        }
1997    }
1998
1999    impl<E: Send> ManagedDb<E> for ImmediateStateSyncDb {
2000        type Unmerkleized = TestUnmerkleized;
2001        type Merkleized = TestMerkleized;
2002        type Error = Infallible;
2003        type Config = ();
2004        type SyncTarget = u64;
2005
2006        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
2007            unreachable!("ImmediateStateSyncDb is only constructed through state sync in tests")
2008        }
2009
2010        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
2011            TestUnmerkleized
2012        }
2013
2014        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
2015            true
2016        }
2017
2018        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
2019            Ok(())
2020        }
2021
2022        async fn sync_target(&self) -> Self::SyncTarget {
2023            0
2024        }
2025
2026        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
2027            Ok(())
2028        }
2029    }
2030
2031    impl<E: Send> ManagedDb<E> for FinishClosedSyncDb {
2032        type Unmerkleized = TestUnmerkleized;
2033        type Merkleized = TestMerkleized;
2034        type Error = Infallible;
2035        type Config = ();
2036        type SyncTarget = u64;
2037
2038        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
2039            unreachable!("FinishClosedSyncDb is only constructed through state sync in tests")
2040        }
2041
2042        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
2043            TestUnmerkleized
2044        }
2045
2046        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
2047            true
2048        }
2049
2050        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
2051            Ok(())
2052        }
2053
2054        async fn sync_target(&self) -> Self::SyncTarget {
2055            self.final_target
2056        }
2057
2058        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
2059            Ok(())
2060        }
2061    }
2062
2063    impl<E: Send> ManagedDb<E> for ObservedSlowSyncDb {
2064        type Unmerkleized = TestUnmerkleized;
2065        type Merkleized = TestMerkleized;
2066        type Error = Infallible;
2067        type Config = ();
2068        type SyncTarget = u64;
2069
2070        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
2071            unreachable!("ObservedSlowSyncDb is only constructed through state sync in tests")
2072        }
2073
2074        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
2075            TestUnmerkleized
2076        }
2077
2078        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
2079            true
2080        }
2081
2082        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
2083            Ok(())
2084        }
2085
2086        async fn sync_target(&self) -> Self::SyncTarget {
2087            self.final_target
2088        }
2089
2090        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
2091            Ok(())
2092        }
2093    }
2094
2095    impl<E: Send> ManagedDb<E> for ObservedFastSyncDb {
2096        type Unmerkleized = TestUnmerkleized;
2097        type Merkleized = TestMerkleized;
2098        type Error = Infallible;
2099        type Config = ();
2100        type SyncTarget = u64;
2101
2102        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
2103            unreachable!("ObservedFastSyncDb is only constructed through state sync in tests")
2104        }
2105
2106        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
2107            TestUnmerkleized
2108        }
2109
2110        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
2111            true
2112        }
2113
2114        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
2115            Ok(())
2116        }
2117
2118        async fn sync_target(&self) -> Self::SyncTarget {
2119            self.final_target
2120        }
2121
2122        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
2123            Ok(())
2124        }
2125    }
2126
2127    impl<E: Send> ManagedDb<E> for DistinctObservedFastSyncDb {
2128        type Unmerkleized = TestUnmerkleized;
2129        type Merkleized = TestMerkleized;
2130        type Error = Infallible;
2131        type Config = ();
2132        type SyncTarget = u64;
2133
2134        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
2135            unreachable!(
2136                "DistinctObservedFastSyncDb is only constructed through state sync in tests"
2137            )
2138        }
2139
2140        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
2141            TestUnmerkleized
2142        }
2143
2144        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
2145            true
2146        }
2147
2148        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
2149            Ok(())
2150        }
2151
2152        async fn sync_target(&self) -> Self::SyncTarget {
2153            self.final_target
2154        }
2155
2156        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
2157            Ok(())
2158        }
2159    }
2160
2161    impl<E> StateSyncDb<E, Arc<AtomicBool>> for SlowSyncDb
2162    where
2163        E: Send + Clock,
2164    {
2165        type SyncError = Infallible;
2166
2167        async fn sync_db(
2168            context: E,
2169            _config: Self::Config,
2170            release: Arc<AtomicBool>,
2171            target: Self::SyncTarget,
2172            tip_updates: mpsc::Receiver<Self::SyncTarget>,
2173            mut finish: Option<mpsc::Receiver<()>>,
2174            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2175            _sync_config: SyncEngineConfig,
2176        ) -> Result<Self, Self::SyncError> {
2177            while !release.load(Ordering::SeqCst) {
2178                context.sleep(Duration::from_millis(1)).await;
2179            }
2180            let mut final_target = target;
2181            let mut tip_updates = Some(tip_updates);
2182
2183            loop {
2184                if let Some(reached_target) = reached_target.as_ref() {
2185                    if reached_target.send(final_target).await.is_err() {
2186                        break;
2187                    }
2188                }
2189
2190                context.sleep(Duration::from_millis(1)).await;
2191
2192                if finish.is_none() && tip_updates.is_none() {
2193                    break;
2194                }
2195
2196                let finish_signal = finish.as_mut().map_or_else(
2197                    || futures::future::Either::Right(futures::future::pending()),
2198                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
2199                );
2200                let update_signal = tip_updates.as_mut().map_or_else(
2201                    || futures::future::Either::Right(futures::future::pending()),
2202                    |update_rx| futures::future::Either::Left(update_rx.recv()),
2203                );
2204
2205                select! {
2206                    _ = finish_signal => {
2207                        break;
2208                    },
2209                    update = update_signal => match update {
2210                        Some(update) => {
2211                            final_target = update;
2212                        }
2213                        None => {
2214                            tip_updates = None;
2215                            if finish.is_none() {
2216                                break;
2217                            }
2218                        }
2219                    },
2220                }
2221            }
2222
2223            Ok(Self { final_target })
2224        }
2225    }
2226
2227    impl<E> StateSyncDb<E, Arc<AtomicBool>> for RejectDuplicateTargetSyncDb
2228    where
2229        E: Send + Clock,
2230    {
2231        type SyncError = Infallible;
2232
2233        async fn sync_db(
2234            context: E,
2235            _config: Self::Config,
2236            release: Arc<AtomicBool>,
2237            target: Self::SyncTarget,
2238            mut tip_updates: mpsc::Receiver<Self::SyncTarget>,
2239            mut finish: Option<mpsc::Receiver<()>>,
2240            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2241            _sync_config: SyncEngineConfig,
2242        ) -> Result<Self, Self::SyncError> {
2243            let mut final_target = target;
2244            while !release.load(Ordering::SeqCst) {
2245                match tip_updates.try_recv() {
2246                    Ok(update) => {
2247                        assert_ne!(
2248                            update, final_target,
2249                            "state sync must not send duplicate target updates"
2250                        );
2251                        final_target = update;
2252                    }
2253                    Err(mpsc::error::TryRecvError::Empty) => {}
2254                    Err(mpsc::error::TryRecvError::Disconnected) => break,
2255                }
2256                context.sleep(Duration::from_millis(1)).await;
2257            }
2258
2259            if let Some(reached_target) = reached_target.as_ref() {
2260                let _ = reached_target.send(final_target).await;
2261            }
2262            if let Some(finish_rx) = finish.as_mut() {
2263                let _ = finish_rx.recv().await;
2264            }
2265
2266            Ok(Self { final_target })
2267        }
2268    }
2269
2270    impl<E: Send> ManagedDb<E> for StaleReachedSyncDb {
2271        type Unmerkleized = TestUnmerkleized;
2272        type Merkleized = TestMerkleized;
2273        type Error = Infallible;
2274        type Config = ();
2275        type SyncTarget = u64;
2276
2277        async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
2278            unreachable!("StaleReachedSyncDb is only constructed through state sync in tests")
2279        }
2280
2281        async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
2282            TestUnmerkleized
2283        }
2284
2285        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
2286            true
2287        }
2288
2289        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
2290            Ok(())
2291        }
2292
2293        async fn sync_target(&self) -> Self::SyncTarget {
2294            self.final_target
2295        }
2296
2297        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
2298            Ok(())
2299        }
2300    }
2301
2302    impl<E> StateSyncDb<E, ()> for StaleReachedSyncDb
2303    where
2304        E: Send + Clock,
2305    {
2306        type SyncError = Infallible;
2307
2308        async fn sync_db(
2309            context: E,
2310            _config: Self::Config,
2311            _resolver: (),
2312            target: Self::SyncTarget,
2313            mut tip_updates: mpsc::Receiver<Self::SyncTarget>,
2314            mut finish: Option<mpsc::Receiver<()>>,
2315            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2316            _sync_config: SyncEngineConfig,
2317        ) -> Result<Self, Self::SyncError> {
2318            let update = tip_updates.recv().await.expect("expected forwarded tip");
2319            if let Some(reached_target) = reached_target.as_ref() {
2320                let _ = reached_target.send(target).await;
2321            }
2322
2323            let finish_signal = finish.as_mut().map_or_else(
2324                || futures::future::Either::Right(futures::future::pending()),
2325                |finish_rx| futures::future::Either::Left(finish_rx.recv()),
2326            );
2327            select! {
2328                _ = finish_signal => Ok(Self {
2329                    final_target: target
2330                }),
2331                _ = context.sleep(Duration::from_millis(10)) => {
2332                    if let Some(reached_target) = reached_target.as_ref() {
2333                        let _ = reached_target.send(update).await;
2334                    }
2335                    if let Some(finish_rx) = finish.as_mut() {
2336                        let _ = finish_rx.recv().await;
2337                    }
2338                    Ok(Self {
2339                        final_target: update,
2340                    })
2341                },
2342            }
2343        }
2344    }
2345
2346    impl<E: Send> StateSyncDb<E, Arc<AtomicBool>> for FastSyncDb {
2347        type SyncError = Infallible;
2348
2349        async fn sync_db(
2350            _context: E,
2351            _config: Self::Config,
2352            done: Arc<AtomicBool>,
2353            target: Self::SyncTarget,
2354            tip_updates: mpsc::Receiver<Self::SyncTarget>,
2355            mut finish: Option<mpsc::Receiver<()>>,
2356            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2357            _sync_config: SyncEngineConfig,
2358        ) -> Result<Self, Self::SyncError> {
2359            done.store(true, Ordering::SeqCst);
2360            let mut final_target = target;
2361            let mut tip_updates = Some(tip_updates);
2362
2363            loop {
2364                if let Some(reached_target) = reached_target.as_ref() {
2365                    if reached_target.send(final_target).await.is_err() {
2366                        break;
2367                    }
2368                }
2369
2370                if finish.is_none() && tip_updates.is_none() {
2371                    break;
2372                }
2373
2374                let finish_signal = finish.as_mut().map_or_else(
2375                    || futures::future::Either::Right(futures::future::pending()),
2376                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
2377                );
2378                let update_signal = tip_updates.as_mut().map_or_else(
2379                    || futures::future::Either::Right(futures::future::pending()),
2380                    |update_rx| futures::future::Either::Left(update_rx.recv()),
2381                );
2382
2383                select! {
2384                    _ = finish_signal => {
2385                        break;
2386                    },
2387                    update = update_signal => match update {
2388                        Some(update) => {
2389                            final_target = update;
2390                        }
2391                        None => {
2392                            tip_updates = None;
2393                            if finish.is_none() {
2394                                break;
2395                            }
2396                        }
2397                    },
2398                }
2399            }
2400
2401            Ok(Self { final_target })
2402        }
2403    }
2404
2405    #[derive(Debug)]
2406    struct TestSyncError;
2407
2408    #[derive(Debug)]
2409    struct FinishClosedSyncError;
2410
2411    impl<E: Send> StateSyncDb<E, ()> for FailingStateSyncDb {
2412        type SyncError = TestSyncError;
2413
2414        async fn sync_db(
2415            _context: E,
2416            _config: Self::Config,
2417            _resolver: (),
2418            _target: Self::SyncTarget,
2419            _tip_updates: mpsc::Receiver<Self::SyncTarget>,
2420            _finish: Option<mpsc::Receiver<()>>,
2421            _reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2422            _sync_config: SyncEngineConfig,
2423        ) -> Result<Self, Self::SyncError> {
2424            Err(TestSyncError)
2425        }
2426    }
2427
2428    impl<E: Send> StateSyncDb<E, ()> for ImmediateStateSyncDb {
2429        type SyncError = Infallible;
2430
2431        async fn sync_db(
2432            _context: E,
2433            _config: Self::Config,
2434            _resolver: (),
2435            _target: Self::SyncTarget,
2436            _tip_updates: mpsc::Receiver<Self::SyncTarget>,
2437            _finish: Option<mpsc::Receiver<()>>,
2438            _reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2439            _sync_config: SyncEngineConfig,
2440        ) -> Result<Self, Self::SyncError> {
2441            Ok(Self)
2442        }
2443    }
2444
2445    impl<E: Send> StateSyncDb<E, ()> for MismatchedTargetSyncDb {
2446        type SyncError = Infallible;
2447
2448        async fn sync_db(
2449            _context: E,
2450            _config: Self::Config,
2451            _resolver: (),
2452            target: Self::SyncTarget,
2453            _tip_updates: mpsc::Receiver<Self::SyncTarget>,
2454            mut finish: Option<mpsc::Receiver<()>>,
2455            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2456            _sync_config: SyncEngineConfig,
2457        ) -> Result<Self, Self::SyncError> {
2458            if let Some(reached_target) = reached_target.as_ref() {
2459                let _ = reached_target.send(target).await;
2460            }
2461            if let Some(finish_rx) = finish.as_mut() {
2462                let _ = finish_rx.recv().await;
2463            }
2464            Ok(Self {
2465                final_target: target + 1,
2466            })
2467        }
2468    }
2469
2470    impl<E: Send> StateSyncDb<E, ()> for FinishClosedSyncDb {
2471        type SyncError = FinishClosedSyncError;
2472
2473        async fn sync_db(
2474            _context: E,
2475            _config: Self::Config,
2476            _resolver: (),
2477            target: Self::SyncTarget,
2478            _tip_updates: mpsc::Receiver<Self::SyncTarget>,
2479            mut finish: Option<mpsc::Receiver<()>>,
2480            _reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2481            _sync_config: SyncEngineConfig,
2482        ) -> Result<Self, Self::SyncError> {
2483            let Some(finish_rx) = finish.as_mut() else {
2484                panic!("finish receiver should be provided");
2485            };
2486            match finish_rx.recv().await {
2487                Some(()) => Ok(Self {
2488                    final_target: target,
2489                }),
2490                None => Err(FinishClosedSyncError),
2491            }
2492        }
2493    }
2494
2495    impl<E> StateSyncDb<E, SlowSyncController> for ObservedSlowSyncDb
2496    where
2497        E: Send + Clock,
2498    {
2499        type SyncError = Infallible;
2500
2501        async fn sync_db(
2502            context: E,
2503            _config: Self::Config,
2504            controller: SlowSyncController,
2505            target: Self::SyncTarget,
2506            tip_updates: mpsc::Receiver<Self::SyncTarget>,
2507            mut finish: Option<mpsc::Receiver<()>>,
2508            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2509            _sync_config: SyncEngineConfig,
2510        ) -> Result<Self, Self::SyncError> {
2511            while !controller.release.load(Ordering::SeqCst) {
2512                context.sleep(Duration::from_millis(1)).await;
2513            }
2514
2515            let mut final_target = target;
2516            let mut tip_updates = Some(tip_updates);
2517            let mut reported_target = None;
2518            let mut observed_update = false;
2519            loop {
2520                if let Some(update_rx) = tip_updates.as_mut() {
2521                    let mut drained = 0usize;
2522                    loop {
2523                        match update_rx.try_recv() {
2524                            Ok(update) => {
2525                                drained += 1;
2526                                final_target = update;
2527                                observed_update = true;
2528                                reported_target = None;
2529                                if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
2530                                    reschedule().await;
2531                                }
2532                            }
2533                            Err(mpsc::error::TryRecvError::Empty) => {
2534                                break;
2535                            }
2536                            Err(mpsc::error::TryRecvError::Disconnected) => {
2537                                tip_updates = None;
2538                                break;
2539                            }
2540                        }
2541                    }
2542                }
2543
2544                if observed_update && reported_target != Some(final_target) {
2545                    if let Some(reached_target) = reached_target.as_ref() {
2546                        if reached_target.send(final_target).await.is_err() {
2547                            break;
2548                        }
2549                    }
2550                    reported_target = Some(final_target);
2551                }
2552
2553                if finish.is_none() && tip_updates.is_none() {
2554                    break;
2555                }
2556
2557                let finish_signal = finish.as_mut().map_or_else(
2558                    || futures::future::Either::Right(futures::future::pending()),
2559                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
2560                );
2561                let update_signal = tip_updates.as_mut().map_or_else(
2562                    || futures::future::Either::Right(futures::future::pending()),
2563                    |update_rx| futures::future::Either::Left(update_rx.recv()),
2564                );
2565
2566                select! {
2567                    _ = finish_signal => {
2568                        break;
2569                    },
2570                    update = update_signal => match update {
2571                        Some(update) => {
2572                            final_target = update;
2573                            observed_update = true;
2574                            reported_target = None;
2575                        }
2576                        None => {
2577                            tip_updates = None;
2578                            if finish.is_none() {
2579                                break;
2580                            }
2581                        }
2582                    },
2583                }
2584            }
2585
2586            Ok(Self { final_target })
2587        }
2588    }
2589
2590    impl<E: Send> StateSyncDb<E, FastSyncObserver> for ObservedFastSyncDb {
2591        type SyncError = Infallible;
2592
2593        async fn sync_db(
2594            _context: E,
2595            _config: Self::Config,
2596            observer: FastSyncObserver,
2597            target: Self::SyncTarget,
2598            tip_updates: mpsc::Receiver<Self::SyncTarget>,
2599            mut finish: Option<mpsc::Receiver<()>>,
2600            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2601            _sync_config: SyncEngineConfig,
2602        ) -> Result<Self, Self::SyncError> {
2603            let mut final_target = target;
2604            let mut tip_updates = Some(tip_updates);
2605            let mut reported_target = None;
2606            observer.ready.store(true, Ordering::SeqCst);
2607
2608            loop {
2609                if reported_target != Some(final_target) {
2610                    if let Some(reached_target) = reached_target.as_ref() {
2611                        if reached_target.send(final_target).await.is_err() {
2612                            break;
2613                        }
2614                    }
2615                    reported_target = Some(final_target);
2616                }
2617
2618                if finish.is_none() && tip_updates.is_none() {
2619                    break;
2620                }
2621
2622                let finish_signal = finish.as_mut().map_or_else(
2623                    || futures::future::Either::Right(futures::future::pending()),
2624                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
2625                );
2626                let update_signal = tip_updates.as_mut().map_or_else(
2627                    || futures::future::Either::Right(futures::future::pending()),
2628                    |update_rx| futures::future::Either::Left(update_rx.recv()),
2629                );
2630
2631                select! {
2632                    _ = finish_signal => {
2633                        break;
2634                    },
2635                    update = update_signal => match update {
2636                        Some(update) => {
2637                            observer.update_count.fetch_add(1, Ordering::SeqCst);
2638                            final_target = update;
2639                            reported_target = None;
2640                        }
2641                        None => {
2642                            tip_updates = None;
2643                            if finish.is_none() {
2644                                break;
2645                            }
2646                        }
2647                    },
2648                }
2649            }
2650
2651            Ok(Self { final_target })
2652        }
2653    }
2654
2655    impl<E: Send> StateSyncDb<E, FastSyncObserver> for DistinctObservedFastSyncDb {
2656        type SyncError = Infallible;
2657
2658        async fn sync_db(
2659            _context: E,
2660            _config: Self::Config,
2661            observer: FastSyncObserver,
2662            target: Self::SyncTarget,
2663            tip_updates: mpsc::Receiver<Self::SyncTarget>,
2664            mut finish: Option<mpsc::Receiver<()>>,
2665            reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
2666            _sync_config: SyncEngineConfig,
2667        ) -> Result<Self, Self::SyncError> {
2668            let mut final_target = target;
2669            let mut tip_updates = Some(tip_updates);
2670            let mut reported_target = None;
2671            observer.ready.store(true, Ordering::SeqCst);
2672
2673            loop {
2674                if reported_target != Some(final_target) {
2675                    if let Some(reached_target) = reached_target.as_ref() {
2676                        if reached_target.send(final_target).await.is_err() {
2677                            break;
2678                        }
2679                    }
2680                    reported_target = Some(final_target);
2681                }
2682
2683                if finish.is_none() && tip_updates.is_none() {
2684                    break;
2685                }
2686
2687                let finish_signal = finish.as_mut().map_or_else(
2688                    || futures::future::Either::Right(futures::future::pending()),
2689                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
2690                );
2691                let update_signal = tip_updates.as_mut().map_or_else(
2692                    || futures::future::Either::Right(futures::future::pending()),
2693                    |update_rx| futures::future::Either::Left(update_rx.recv()),
2694                );
2695
2696                select! {
2697                    _ = finish_signal => {
2698                        break;
2699                    },
2700                    update = update_signal => match update {
2701                        Some(update) => {
2702                            observer.update_count.fetch_add(1, Ordering::SeqCst);
2703                            if update != final_target {
2704                                final_target = update;
2705                                reported_target = None;
2706                            }
2707                        }
2708                        None => {
2709                            tip_updates = None;
2710                            if finish.is_none() {
2711                                break;
2712                            }
2713                        }
2714                    },
2715                }
2716            }
2717
2718            Ok(Self { final_target })
2719        }
2720    }
2721
2722    #[test]
2723    fn tuple_new_batches_queues_reads_concurrently() {
2724        deterministic::Runner::default().start(|_context| async move {
2725            let db1 = Arc::new(AsyncRwLock::new(TestDb));
2726            let db2 = Arc::new(AsyncRwLock::new(TestDb));
2727            let databases = (db1.clone(), db2.clone());
2728
2729            let writer1 = db1.write().await;
2730            let writer2 = db2.write().await;
2731
2732            let new_batches =
2733                <(Arc<AsyncRwLock<TestDb>>, Arc<AsyncRwLock<TestDb>>) as DatabaseSet<
2734                    deterministic::Context,
2735                >>::new_batches(&databases);
2736            pin_mut!(new_batches);
2737            assert!(new_batches.as_mut().now_or_never().is_none());
2738
2739            drop(writer2);
2740            {
2741                let writer2_again = db2.write();
2742                pin_mut!(writer2_again);
2743                assert!(
2744                    writer2_again.as_mut().now_or_never().is_none(),
2745                    "tuple new_batches should queue reads for all databases concurrently"
2746                );
2747            }
2748
2749            drop(writer1);
2750            let _ = new_batches.await;
2751        });
2752    }
2753
2754    #[test]
2755    fn tuple_finalize_runs_databases_in_parallel() {
2756        deterministic::Runner::default().start(|_context| async move {
2757            let (started1_tx, started1_rx) = oneshot::channel();
2758            let (started2_tx, started2_rx) = oneshot::channel();
2759            let (release1_tx, release1_rx) = oneshot::channel();
2760            let (release2_tx, release2_rx) = oneshot::channel();
2761
2762            let databases = (
2763                Arc::new(AsyncRwLock::new(BlockingFinalizeDb::new(
2764                    started1_tx,
2765                    release1_rx,
2766                ))),
2767                Arc::new(AsyncRwLock::new(BlockingFinalizeDb::new(
2768                    started2_tx,
2769                    release2_rx,
2770                ))),
2771            );
2772
2773            let finalize = <(
2774                Arc<AsyncRwLock<BlockingFinalizeDb>>,
2775                Arc<AsyncRwLock<BlockingFinalizeDb>>,
2776            ) as DatabaseSet<deterministic::Context>>::finalize(
2777                &databases,
2778                (TestMerkleized, TestMerkleized),
2779            );
2780            pin_mut!(finalize);
2781            assert!(finalize.as_mut().now_or_never().is_none());
2782
2783            let started1 = started1_rx;
2784            let started2 = started2_rx;
2785            pin_mut!(started1);
2786            pin_mut!(started2);
2787            assert!(matches!(started1.as_mut().now_or_never(), Some(Ok(()))));
2788            assert!(
2789                matches!(started2.as_mut().now_or_never(), Some(Ok(()))),
2790                "tuple finalize should start all database finalizations concurrently"
2791            );
2792
2793            let _ = release1_tx.send(());
2794            let _ = release2_tx.send(());
2795            finalize.await;
2796        });
2797    }
2798
2799    #[test]
2800    #[should_panic(
2801        expected = "database finalize failed (index 1, type commonware_glue::stateful::db::tests::FailingFinalizeDb)"
2802    )]
2803    fn tuple_finalize_panic_identifies_failing_database() {
2804        deterministic::Runner::default().start(|_context| async move {
2805            let databases = (
2806                Arc::new(AsyncRwLock::new(TestDb)),
2807                Arc::new(AsyncRwLock::new(FailingFinalizeDb)),
2808            );
2809            <(
2810                Arc<AsyncRwLock<TestDb>>,
2811                Arc<AsyncRwLock<FailingFinalizeDb>>,
2812            ) as DatabaseSet<deterministic::Context>>::finalize(
2813                &databases,
2814                (TestMerkleized, TestMerkleized),
2815            )
2816            .await;
2817        });
2818    }
2819
2820    type TestAnchor = Anchor<sha256::Digest>;
2821
2822    fn anchor(n: u64) -> TestAnchor {
2823        mock_anchor(n, n as u8)
2824    }
2825
2826    #[test]
2827    fn single_tip_update_drain_keeps_highest_recorded_target() {
2828        deterministic::Runner::default().start(|_context| async move {
2829            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
2830            let (target_tx, mut target_rx) = mpsc::channel(4);
2831            let (newer_update, newer_observed) = TipUpdate::with_observation(anchor(2), 2u64);
2832            let (older_update, older_observed) = TipUpdate::with_observation(anchor(1), 1u64);
2833
2834            let _ = tip_tx.send(newer_update).await;
2835            let _ = tip_tx.send(older_update).await;
2836
2837            let mut tip_updates = Some(tip_rx);
2838            let mut current_anchor = anchor(0);
2839            let mut current_target = 0u64;
2840            assert!(
2841                drain_single_tip_updates(
2842                    &mut tip_updates,
2843                    &target_tx,
2844                    &mut current_anchor,
2845                    &mut current_target,
2846                )
2847                .await
2848            );
2849
2850            newer_observed
2851                .await
2852                .expect("newer update should be observed");
2853            older_observed
2854                .await
2855                .expect("older update should also be observed");
2856            assert_eq!(current_anchor, anchor(2));
2857            assert_eq!(current_target, 2);
2858            assert_eq!(target_rx.recv().await, Some(2));
2859            assert!(matches!(
2860                target_rx.try_recv(),
2861                Err(mpsc::error::TryRecvError::Empty)
2862            ));
2863        });
2864    }
2865
2866    #[test]
2867    fn single_tip_update_drain_advances_anchor_without_duplicate_target() {
2868        deterministic::Runner::default().start(|_context| async move {
2869            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
2870            let (target_tx, mut target_rx) = mpsc::channel(1);
2871            let (update, observed) = TipUpdate::with_observation(anchor(3), 7u64);
2872
2873            let _ = tip_tx.send(update).await;
2874
2875            let mut tip_updates = Some(tip_rx);
2876            let mut current_anchor = anchor(2);
2877            let mut current_target = 7u64;
2878            assert!(
2879                drain_single_tip_updates(
2880                    &mut tip_updates,
2881                    &target_tx,
2882                    &mut current_anchor,
2883                    &mut current_target,
2884                )
2885                .await
2886            );
2887
2888            observed.await.expect("update should be observed");
2889            assert_eq!(current_anchor, anchor(3));
2890            assert_eq!(current_target, 7);
2891            assert!(matches!(
2892                target_rx.try_recv(),
2893                Err(mpsc::error::TryRecvError::Empty)
2894            ));
2895        });
2896    }
2897
2898    #[test]
2899    fn single_state_sync_handles_closed_tip_updates_channel() {
2900        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
2901            let (tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
2902            let release = Arc::new(AtomicBool::new(false));
2903            let release_for_sync = release.clone();
2904
2905            let sync = context.child("single_state_sync_closed_tip_updates").spawn(
2906                move |context| async move {
2907                    <Arc<AsyncRwLock<SlowSyncDb>> as StateSyncSet<
2908                        deterministic::Context,
2909                        Arc<AtomicBool>,
2910                        sha256::Digest,
2911                    >>::sync(
2912                        context,
2913                        (),
2914                        release_for_sync,
2915                        anchor(0),
2916                        0,
2917                        tip_rx,
2918                        SyncEngineConfig {
2919                            fetch_batch_size: NonZeroU64::new(1).unwrap(),
2920                            apply_batch_size: 1,
2921                            max_outstanding_requests: 1,
2922                            update_channel_size: NonZeroUsize::new(1).unwrap(),
2923                            max_retained_roots: 0,
2924                        },
2925                    )
2926                    .await
2927                    .expect("single state sync should succeed")
2928                },
2929            );
2930
2931            drop(tip_tx);
2932            context.sleep(Duration::from_millis(1)).await;
2933            release.store(true, Ordering::SeqCst);
2934
2935            let (_database, converged_anchor) = sync.await.expect("sync task should complete");
2936            assert_eq!(converged_anchor, anchor(0));
2937        });
2938    }
2939
2940    #[test]
2941    fn single_state_sync_preserves_db_error_when_target_channel_closes() {
2942        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
2943            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
2944            let _ = tip_tx.send(TipUpdate::new(anchor(1), 1u64)).await;
2945
2946            let result = <Arc<AsyncRwLock<FailingStateSyncDb>> as StateSyncSet<
2947                deterministic::Context,
2948                (),
2949                sha256::Digest,
2950            >>::sync(
2951                context,
2952                (),
2953                (),
2954                anchor(0),
2955                0,
2956                tip_rx,
2957                SyncEngineConfig {
2958                    fetch_batch_size: NonZeroU64::new(1).unwrap(),
2959                    apply_batch_size: 1,
2960                    max_outstanding_requests: 1,
2961                    update_channel_size: NonZeroUsize::new(1).unwrap(),
2962                    max_retained_roots: 0,
2963                },
2964            )
2965            .await;
2966
2967            assert!(matches!(result, Err(TestSyncError)));
2968        });
2969    }
2970
2971    #[test]
2972    fn single_state_sync_ignores_backward_tip_updates() {
2973        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
2974            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
2975            let release = Arc::new(AtomicBool::new(true));
2976            let resolver = SlowSyncController {
2977                release: release.clone(),
2978            };
2979
2980            let sync = context
2981                .child("single_state_sync_ignores_backward_tip_updates")
2982                .spawn(move |context| async move {
2983                    <Arc<AsyncRwLock<ObservedSlowSyncDb>> as StateSyncSet<
2984                        deterministic::Context,
2985                        SlowSyncController,
2986                        sha256::Digest,
2987                    >>::sync(
2988                        context,
2989                        (),
2990                        resolver,
2991                        anchor(0),
2992                        0,
2993                        tip_rx,
2994                        SyncEngineConfig {
2995                            fetch_batch_size: NonZeroU64::new(1).unwrap(),
2996                            apply_batch_size: 1,
2997                            max_outstanding_requests: 1,
2998                            update_channel_size: NonZeroUsize::new(4).unwrap(),
2999                            max_retained_roots: 0,
3000                        },
3001                    )
3002                    .await
3003                    .expect("single state sync should succeed")
3004                });
3005
3006            let _ = tip_tx.send(TipUpdate::new(anchor(2), 2)).await;
3007            let _ = tip_tx.send(TipUpdate::new(anchor(1), 1)).await;
3008            drop(tip_tx);
3009
3010            let (database, converged_anchor) = sync.await.expect("sync task should complete");
3011            let final_target = database.read().await.final_target;
3012            assert_eq!(
3013                final_target, 2,
3014                "single-db sync target must never move backward"
3015            );
3016            assert_eq!(
3017                converged_anchor,
3018                anchor(2),
3019                "converged anchor must remain on the highest seen tip"
3020            );
3021        });
3022    }
3023
3024    #[test]
3025    fn single_state_sync_advances_anchor_without_duplicate_target_update() {
3026        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
3027            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
3028            let release = Arc::new(AtomicBool::new(false));
3029            let release_for_sync = release.clone();
3030
3031            let sync = context.child("single_state_sync_noop_target_update").spawn(
3032                move |context| async move {
3033                    <Arc<AsyncRwLock<RejectDuplicateTargetSyncDb>> as StateSyncSet<
3034                        deterministic::Context,
3035                        Arc<AtomicBool>,
3036                        sha256::Digest,
3037                    >>::sync(
3038                        context,
3039                        (),
3040                        release_for_sync,
3041                        anchor(7),
3042                        7,
3043                        tip_rx,
3044                        SyncEngineConfig {
3045                            fetch_batch_size: NonZeroU64::new(1).unwrap(),
3046                            apply_batch_size: 1,
3047                            max_outstanding_requests: 1,
3048                            update_channel_size: NonZeroUsize::new(4).unwrap(),
3049                            max_retained_roots: 0,
3050                        },
3051                    )
3052                    .await
3053                    .expect("single state sync should succeed")
3054                },
3055            );
3056
3057            let (update, observed) = TipUpdate::with_observation(anchor(9), 7);
3058            let _ = tip_tx.send(update).await;
3059            observed
3060                .await
3061                .expect("single-db coordinator should record noop target update");
3062            release.store(true, Ordering::SeqCst);
3063            drop(tip_tx);
3064
3065            let (database, converged_anchor) = sync.await.expect("sync task should complete");
3066            assert_eq!(database.read().await.final_target, 7);
3067            assert_eq!(converged_anchor, anchor(9));
3068        });
3069    }
3070
3071    #[test]
3072    fn single_state_sync_ignores_stale_reached_after_forwarded_tip() {
3073        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
3074            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
3075
3076            let sync =
3077                context
3078                    .child("single_state_sync_stale_reached")
3079                    .spawn(move |context| async move {
3080                        <Arc<AsyncRwLock<StaleReachedSyncDb>> as StateSyncSet<
3081                            deterministic::Context,
3082                            (),
3083                            sha256::Digest,
3084                        >>::sync(
3085                            context,
3086                            (),
3087                            (),
3088                            anchor(0),
3089                            0,
3090                            tip_rx,
3091                            SyncEngineConfig {
3092                                fetch_batch_size: NonZeroU64::new(1).unwrap(),
3093                                apply_batch_size: 1,
3094                                max_outstanding_requests: 1,
3095                                update_channel_size: NonZeroUsize::new(4).unwrap(),
3096                                max_retained_roots: 0,
3097                            },
3098                        )
3099                        .await
3100                        .expect("single state sync should succeed")
3101                    });
3102
3103            let _ = tip_tx.send(TipUpdate::new(anchor(2), 2)).await;
3104
3105            let (database, converged_anchor) = sync.await.expect("sync task should complete");
3106            let final_target = database.read().await.final_target;
3107            assert_eq!(
3108                final_target, 2,
3109                "single-db sync must not finish on a stale reached target",
3110            );
3111            assert_eq!(
3112                converged_anchor,
3113                anchor(2),
3114                "converged anchor must match the target the database reached",
3115            );
3116        });
3117    }
3118
3119    #[test]
3120    fn tuple_state_sync_converges_before_finish() {
3121        deterministic::Runner::default().start(|context| async move {
3122            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
3123            let slow_release = Arc::new(AtomicBool::new(false));
3124            let fast_done = Arc::new(AtomicBool::new(false));
3125
3126            let slow_release_for_sync = slow_release.clone();
3127            let fast_done_for_sync = fast_done.clone();
3128            let sync = context
3129                .child("tuple_state_sync")
3130                .spawn(move |context| async move {
3131                    <(Arc<AsyncRwLock<SlowSyncDb>>, Arc<AsyncRwLock<FastSyncDb>>) as StateSyncSet<
3132                        deterministic::Context,
3133                        (Arc<AtomicBool>, Arc<AtomicBool>),
3134                        sha256::Digest,
3135                    >>::sync(
3136                        context,
3137                        ((), ()),
3138                        (slow_release_for_sync, fast_done_for_sync),
3139                        anchor(0),
3140                        (0, 0),
3141                        tip_rx,
3142                        SyncEngineConfig {
3143                            fetch_batch_size: NonZeroU64::new(1).unwrap(),
3144                            apply_batch_size: 1,
3145                            max_outstanding_requests: 1,
3146                            update_channel_size: NonZeroUsize::new(4).unwrap(),
3147                            max_retained_roots: 0,
3148                        },
3149                    )
3150                    .await
3151                    .expect("tuple state sync should succeed")
3152                });
3153
3154            while !fast_done.load(Ordering::SeqCst) {
3155                context.sleep(Duration::from_millis(1)).await;
3156            }
3157            let _ = tip_tx.send(TipUpdate::new(anchor(1), (1, 1))).await;
3158            let _ = tip_tx.send(TipUpdate::new(anchor(2), (2, 2))).await;
3159            slow_release.store(true, Ordering::SeqCst);
3160            drop(tip_tx);
3161
3162            let (synced, converged_anchor) = sync.await.expect("sync task should complete");
3163            let slow_target = synced.0.read().await.final_target;
3164            let fast_target = synced.1.read().await.final_target;
3165
3166            assert_eq!(
3167                slow_target, fast_target,
3168                "all databases should finish on the same converged target set"
3169            );
3170            assert_eq!(
3171                converged_anchor.height.get(),
3172                slow_target,
3173                "returned anchor height should match the converged generation"
3174            );
3175        });
3176    }
3177
3178    #[test]
3179    fn tuple_state_sync_ignores_backward_tip_updates() {
3180        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
3181            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(8).unwrap());
3182            let slow_release = Arc::new(AtomicBool::new(false));
3183            let fast_done = Arc::new(AtomicBool::new(false));
3184
3185            let slow_release_for_sync = slow_release.clone();
3186            let fast_done_for_sync = fast_done.clone();
3187            let sync = context
3188                .child("tuple_state_sync_ignores_backward_tip_updates")
3189                .spawn(move |context| async move {
3190                    <(Arc<AsyncRwLock<SlowSyncDb>>, Arc<AsyncRwLock<FastSyncDb>>) as StateSyncSet<
3191                        deterministic::Context,
3192                        (Arc<AtomicBool>, Arc<AtomicBool>),
3193                        sha256::Digest,
3194                    >>::sync(
3195                        context,
3196                        ((), ()),
3197                        (slow_release_for_sync, fast_done_for_sync),
3198                        anchor(0),
3199                        (0, 0),
3200                        tip_rx,
3201                        SyncEngineConfig {
3202                            fetch_batch_size: NonZeroU64::new(1).unwrap(),
3203                            apply_batch_size: 1,
3204                            max_outstanding_requests: 1,
3205                            update_channel_size: NonZeroUsize::new(8).unwrap(),
3206                            max_retained_roots: 0,
3207                        },
3208                    )
3209                    .await
3210                    .expect("tuple state sync should succeed")
3211                });
3212
3213            while !fast_done.load(Ordering::SeqCst) {
3214                context.sleep(Duration::from_millis(1)).await;
3215            }
3216
3217            let _ = tip_tx.send(TipUpdate::new(anchor(2), (2, 2))).await;
3218            let _ = tip_tx.send(TipUpdate::new(anchor(1), (1, 1))).await;
3219            drop(tip_tx);
3220            context.sleep(Duration::from_millis(1)).await;
3221            slow_release.store(true, Ordering::SeqCst);
3222
3223            let (synced, converged_anchor) = sync.await.expect("sync task should complete");
3224            let slow_target = synced.0.read().await.final_target;
3225            let fast_target = synced.1.read().await.final_target;
3226            assert_eq!(
3227                slow_target, 2,
3228                "slow database target must never move backward"
3229            );
3230            assert_eq!(
3231                fast_target, 2,
3232                "fast database target must never move backward"
3233            );
3234            assert_eq!(
3235                converged_anchor,
3236                anchor(2),
3237                "converged anchor must remain on the highest seen tip"
3238            );
3239        });
3240    }
3241
3242    #[test]
3243    fn tuple_state_sync_rejects_database_target_mismatch() {
3244        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
3245            let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
3246            let fast_done = Arc::new(AtomicBool::new(false));
3247
3248            let result = <(
3249                Arc<AsyncRwLock<MismatchedTargetSyncDb>>,
3250                Arc<AsyncRwLock<FastSyncDb>>,
3251            ) as StateSyncSet<
3252                deterministic::Context,
3253                ((), Arc<AtomicBool>),
3254                sha256::Digest,
3255            >>::sync(
3256                context,
3257                ((), ()),
3258                ((), fast_done),
3259                anchor(7),
3260                (7, 7),
3261                tip_rx,
3262                SyncEngineConfig {
3263                    fetch_batch_size: NonZeroU64::new(1).unwrap(),
3264                    apply_batch_size: 1,
3265                    max_outstanding_requests: 1,
3266                    update_channel_size: NonZeroUsize::new(1).unwrap(),
3267                    max_retained_roots: 0,
3268                },
3269            )
3270            .await;
3271
3272            let err = match result {
3273                Ok(_) => panic!("tuple state sync should reject a mismatched database target"),
3274                Err(err) => err,
3275            };
3276            assert!(
3277                err.contains("database targets do not match"),
3278                "error should identify the target mismatch, got: {err}"
3279            );
3280        });
3281    }
3282
3283    #[test]
3284    fn tuple_state_sync_returns_db_error_instead_of_panicking_when_anchor_missing() {
3285        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
3286            let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
3287
3288            let result = <(
3289                Arc<AsyncRwLock<ImmediateStateSyncDb>>,
3290                Arc<AsyncRwLock<FailingStateSyncDb>>,
3291            ) as StateSyncSet<deterministic::Context, ((), ()), sha256::Digest>>::sync(
3292                context,
3293                ((), ()),
3294                ((), ()),
3295                anchor(0),
3296                (0, 0),
3297                tip_rx,
3298                SyncEngineConfig {
3299                    fetch_batch_size: NonZeroU64::new(1).unwrap(),
3300                    apply_batch_size: 1,
3301                    max_outstanding_requests: 1,
3302                    update_channel_size: NonZeroUsize::new(1).unwrap(),
3303                    max_retained_roots: 0,
3304                },
3305            )
3306            .await;
3307
3308            let err = match result {
3309                Ok(_) => panic!("tuple state sync should return the database sync error"),
3310                Err(err) => err,
3311            };
3312            assert!(
3313                err.contains("state sync failed (index 1, db"),
3314                "error should include failing database index: {err}"
3315            );
3316            assert!(
3317                err.contains("FailingStateSyncDb"),
3318                "error should include failing database type: {err}"
3319            );
3320        });
3321    }
3322
3323    #[test]
3324    fn tuple_state_sync_returns_db_error_when_other_database_waits_for_finish() {
3325        deterministic::Runner::timed(Duration::from_secs(1)).start(|context| async move {
3326            let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
3327            let release = Arc::new(AtomicBool::new(true));
3328
3329            let result = <(
3330                Arc<AsyncRwLock<SlowSyncDb>>,
3331                Arc<AsyncRwLock<FailingStateSyncDb>>,
3332            ) as StateSyncSet<
3333                deterministic::Context,
3334                (Arc<AtomicBool>, ()),
3335                sha256::Digest,
3336            >>::sync(
3337                context,
3338                ((), ()),
3339                (release, ()),
3340                anchor(0),
3341                (0, 0),
3342                tip_rx,
3343                SyncEngineConfig {
3344                    fetch_batch_size: NonZeroU64::new(1).unwrap(),
3345                    apply_batch_size: 1,
3346                    max_outstanding_requests: 1,
3347                    update_channel_size: NonZeroUsize::new(1).unwrap(),
3348                    max_retained_roots: 0,
3349                },
3350            )
3351            .await;
3352
3353            let err = match result {
3354                Ok(_) => panic!("tuple state sync should return the database sync error"),
3355                Err(err) => err,
3356            };
3357            assert!(
3358                err.contains("state sync failed (index 1, db"),
3359                "error should include failing database index: {err}"
3360            );
3361            assert!(
3362                err.contains("FailingStateSyncDb"),
3363                "error should include failing database type: {err}"
3364            );
3365        });
3366    }
3367
3368    #[test]
3369    fn tuple_state_sync_preserves_original_failure_when_peer_finish_channel_closes() {
3370        deterministic::Runner::timed(Duration::from_secs(1)).start(|context| async move {
3371            let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
3372
3373            let result = <(
3374                Arc<AsyncRwLock<FinishClosedSyncDb>>,
3375                Arc<AsyncRwLock<FailingStateSyncDb>>,
3376            ) as StateSyncSet<deterministic::Context, ((), ()), sha256::Digest>>::sync(
3377                context,
3378                ((), ()),
3379                ((), ()),
3380                anchor(0),
3381                (0, 0),
3382                tip_rx,
3383                SyncEngineConfig {
3384                    fetch_batch_size: NonZeroU64::new(1).unwrap(),
3385                    apply_batch_size: 1,
3386                    max_outstanding_requests: 1,
3387                    update_channel_size: NonZeroUsize::new(1).unwrap(),
3388                    max_retained_roots: 0,
3389                },
3390            )
3391            .await;
3392
3393            let err = match result {
3394                Ok(_) => panic!("tuple state sync should return the database sync error"),
3395                Err(err) => err,
3396            };
3397            assert!(
3398                err.contains("state sync failed (index 1, db"),
3399                "error should include failing database index, got: {err}",
3400            );
3401            assert!(
3402                err.contains("FailingStateSyncDb"),
3403                "error should include failing database type, got: {err}",
3404            );
3405        });
3406    }
3407
3408    #[test]
3409    fn coordinator_rejects_stale_reached_event_from_older_generation() {
3410        let mut state = CoordinatorState::new(2, anchor(0), (0u64, 0u64));
3411
3412        state.record_tip_update(anchor(1), (1, 1));
3413        match state.next_action() {
3414            CoordinatorAction::Dispatch {
3415                generation,
3416                targets: (left, right),
3417            } => {
3418                assert_eq!(generation, 1, "coordinator should dispatch generation 1");
3419                assert_eq!((left, right), (1, 1));
3420            }
3421            CoordinatorAction::Wait => panic!("coordinator should dispatch the newer tip"),
3422            CoordinatorAction::Converged { anchor, .. } => {
3423                panic!("coordinator converged too early at {anchor:?}")
3424            }
3425        }
3426
3427        // This reached event belongs to generation 0 but arrives after the
3428        // coordinator has already advanced the database to generation 1.
3429        state.record_reached(1, 0);
3430
3431        // Only database 0 has actually reached generation 1 so far.
3432        state.record_reached(0, 1);
3433
3434        match state.next_action() {
3435            CoordinatorAction::Wait => {}
3436            CoordinatorAction::Dispatch { targets, .. } => {
3437                panic!(
3438                    "coordinator should wait for a fresh reached event, got dispatch {targets:?}"
3439                )
3440            }
3441            CoordinatorAction::Converged { anchor, .. } => {
3442                panic!("stale reached event must not allow convergence at {anchor:?}")
3443            }
3444        }
3445    }
3446
3447    #[test]
3448    fn coordinator_dispatches_pending_tip_before_converging() {
3449        let mut state = CoordinatorState::new(2, anchor(0), (0u64, 0u64));
3450
3451        state.record_tip_update(anchor(1), (1, 1));
3452        match state.next_action() {
3453            CoordinatorAction::Dispatch {
3454                generation,
3455                targets: (left, right),
3456            } => {
3457                assert_eq!(generation, 1, "coordinator should dispatch generation 1");
3458                assert_eq!((left, right), (1, 1));
3459            }
3460            CoordinatorAction::Wait => panic!("coordinator should dispatch the newer tip"),
3461            CoordinatorAction::Converged { anchor, .. } => {
3462                panic!("coordinator converged too early at {anchor:?}")
3463            }
3464        }
3465
3466        state.record_reached(0, 1);
3467        state.record_reached(1, 1);
3468        state.record_tip_update(anchor(2), (2, 2));
3469
3470        match state.next_action() {
3471            CoordinatorAction::Dispatch {
3472                generation,
3473                targets: (left, right),
3474            } => {
3475                assert_eq!(generation, 2, "coordinator should advance to generation 2");
3476                assert_eq!((left, right), (2, 2));
3477            }
3478            CoordinatorAction::Wait => panic!("coordinator should dispatch the pending tip"),
3479            CoordinatorAction::Converged { anchor, .. } => {
3480                panic!("coordinator should not converge with a pending tip: {anchor:?}")
3481            }
3482        }
3483    }
3484
3485    #[test]
3486    fn tuple_state_sync_stops_updates_after_reached_until_regroup() {
3487        deterministic::Runner::default().start(|context| async move {
3488            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(32).unwrap());
3489            let slow_release = Arc::new(AtomicBool::new(true));
3490            let fast_ready = Arc::new(AtomicBool::new(false));
3491            let fast_update_count = Arc::new(AtomicUsize::new(0));
3492
3493            let slow_resolver = SlowSyncController {
3494                release: slow_release.clone(),
3495            };
3496            let fast_resolver = FastSyncObserver {
3497                ready: fast_ready.clone(),
3498                update_count: fast_update_count.clone(),
3499            };
3500            let sync = context.child("tuple_state_sync_algorithm").spawn(
3501                move |context| async move {
3502                    <(
3503                        Arc<AsyncRwLock<ObservedSlowSyncDb>>,
3504                        Arc<AsyncRwLock<ObservedFastSyncDb>>,
3505                    ) as StateSyncSet<
3506                        deterministic::Context,
3507                        (SlowSyncController, FastSyncObserver),
3508                        sha256::Digest,
3509                    >>::sync(
3510                        context,
3511                        ((), ()),
3512                        (slow_resolver, fast_resolver),
3513                        anchor(0),
3514                        (0, 0),
3515                        tip_rx,
3516                        SyncEngineConfig {
3517                            fetch_batch_size: NonZeroU64::new(1).unwrap(),
3518                            apply_batch_size: 1,
3519                            max_outstanding_requests: 1,
3520                            update_channel_size: NonZeroUsize::new(1).unwrap(),
3521                            max_retained_roots: 0,
3522                        },
3523                    )
3524                    .await
3525                    .expect("tuple state sync should succeed")
3526                },
3527            );
3528
3529            while !fast_ready.load(Ordering::SeqCst) {
3530                context.sleep(Duration::from_millis(1)).await;
3531            }
3532
3533            for target in 1..=16u64 {
3534                let _ = tip_tx.send(TipUpdate::new(anchor(target), (target, target))).await;
3535            }
3536            drop(tip_tx);
3537
3538            let (synced, converged_anchor) = sync.await.expect("sync task should complete");
3539            let slow_target = synced.0.read().await.final_target;
3540            let fast_target = synced.1.read().await.final_target;
3541
3542            assert_eq!(
3543                slow_target, fast_target,
3544                "all databases should finish on the same converged target set"
3545            );
3546            assert_eq!(
3547                converged_anchor.height.get(), slow_target,
3548                "returned anchor height should match the converged generation"
3549            );
3550            assert_eq!(
3551                fast_update_count.load(Ordering::SeqCst),
3552                1,
3553                "a reached database must not receive tip updates before regroup; only regroup retarget should be observed"
3554            );
3555        });
3556    }
3557
3558    #[test]
3559    fn tuple_state_sync_allows_noop_database_while_other_catches_up() {
3560        deterministic::Runner::default().start(|context| async move {
3561            let (tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
3562            let slow_release = Arc::new(AtomicBool::new(false));
3563            let fast_ready = Arc::new(AtomicBool::new(false));
3564            let fast_update_count = Arc::new(AtomicUsize::new(0));
3565            let target = 7u64;
3566
3567            let sync = context.child("tuple_state_sync_noop").spawn({
3568                let slow_resolver = slow_release.clone();
3569                let fast_resolver = FastSyncObserver {
3570                    ready: fast_ready.clone(),
3571                    update_count: fast_update_count.clone(),
3572                };
3573                move |context| async move {
3574                    <(
3575                        Arc<AsyncRwLock<SlowSyncDb>>,
3576                        Arc<AsyncRwLock<ObservedFastSyncDb>>,
3577                    ) as StateSyncSet<
3578                        deterministic::Context,
3579                        (Arc<AtomicBool>, FastSyncObserver),
3580                        sha256::Digest,
3581                    >>::sync(
3582                        context,
3583                        ((), ()),
3584                        (slow_resolver, fast_resolver),
3585                        anchor(target),
3586                        (target, target),
3587                        tip_rx,
3588                        SyncEngineConfig {
3589                            fetch_batch_size: NonZeroU64::new(1).unwrap(),
3590                            apply_batch_size: 1,
3591                            max_outstanding_requests: 1,
3592                            update_channel_size: NonZeroUsize::new(1).unwrap(),
3593                            max_retained_roots: 0,
3594                        },
3595                    )
3596                    .await
3597                    .expect("tuple state sync should succeed")
3598                }
3599            });
3600
3601            while !fast_ready.load(Ordering::SeqCst) {
3602                context.sleep(Duration::from_millis(1)).await;
3603            }
3604
3605            drop(tip_tx);
3606            slow_release.store(true, Ordering::SeqCst);
3607
3608            let (synced, converged_anchor) = sync.await.expect("sync task should complete");
3609            let slow_target = synced.0.read().await.final_target;
3610            let fast_target = synced.1.read().await.final_target;
3611
3612            assert_eq!(slow_target, target);
3613            assert_eq!(fast_target, target);
3614            assert_eq!(converged_anchor, anchor(target));
3615            assert_eq!(
3616                fast_update_count.load(Ordering::SeqCst),
3617                0,
3618                "already-at-target database should not receive tip updates"
3619            );
3620        });
3621    }
3622
3623    #[test]
3624    fn tuple_state_sync_regroup_completes_when_database_target_is_unchanged() {
3625        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
3626            let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
3627            let slow_release = Arc::new(AtomicBool::new(false));
3628            let fast_ready = Arc::new(AtomicBool::new(false));
3629            let fast_update_count = Arc::new(AtomicUsize::new(0));
3630
3631            let sync = context
3632                .child("tuple_state_sync_regroup_unchanged_target")
3633                .spawn({
3634                    let slow_resolver = slow_release.clone();
3635                    let fast_resolver = FastSyncObserver {
3636                        ready: fast_ready.clone(),
3637                        update_count: fast_update_count.clone(),
3638                    };
3639                    move |context| async move {
3640                        <(
3641                            Arc<AsyncRwLock<SlowSyncDb>>,
3642                            Arc<AsyncRwLock<DistinctObservedFastSyncDb>>,
3643                        ) as StateSyncSet<
3644                            deterministic::Context,
3645                            (Arc<AtomicBool>, FastSyncObserver),
3646                            sha256::Digest,
3647                        >>::sync(
3648                            context,
3649                            ((), ()),
3650                            (slow_resolver, fast_resolver),
3651                            anchor(0),
3652                            (0, 7),
3653                            tip_rx,
3654                            SyncEngineConfig {
3655                                fetch_batch_size: NonZeroU64::new(1).unwrap(),
3656                                apply_batch_size: 1,
3657                                max_outstanding_requests: 1,
3658                                update_channel_size: NonZeroUsize::new(4).unwrap(),
3659                                max_retained_roots: 0,
3660                            },
3661                        )
3662                        .await
3663                        .expect("tuple state sync should succeed")
3664                    }
3665                });
3666
3667            while !fast_ready.load(Ordering::SeqCst) {
3668                context.sleep(Duration::from_millis(1)).await;
3669            }
3670
3671            let _ = tip_tx.send(TipUpdate::new(anchor(9), (9, 7))).await;
3672            context.sleep(Duration::from_millis(1)).await;
3673            slow_release.store(true, Ordering::SeqCst);
3674            drop(tip_tx);
3675
3676            let (synced, converged_anchor) = sync.await.expect("sync task should complete");
3677            let slow_target = synced.0.read().await.final_target;
3678            let fast_target = synced.1.read().await.final_target;
3679
3680            assert_eq!(slow_target, 9);
3681            assert_eq!(fast_target, 7);
3682            assert_eq!(converged_anchor, anchor(9));
3683            assert_eq!(
3684                fast_update_count.load(Ordering::SeqCst),
3685                0,
3686                "the unchanged-target database should not receive duplicate target updates",
3687            );
3688        });
3689    }
3690
3691    #[derive(Default)]
3692    struct AttachDb1;
3693
3694    #[derive(Default)]
3695    struct AttachDb2;
3696
3697    #[derive(Clone)]
3698    struct RecordingResolver {
3699        id: &'static str,
3700        log: Arc<commonware_utils::sync::Mutex<Vec<&'static str>>>,
3701    }
3702
3703    impl RecordingResolver {
3704        fn new(
3705            id: &'static str,
3706            log: Arc<commonware_utils::sync::Mutex<Vec<&'static str>>>,
3707        ) -> Self {
3708            Self { id, log }
3709        }
3710    }
3711
3712    impl<DB: Send + Sync + 'static> AttachableResolver<DB> for RecordingResolver {
3713        async fn attach_database(&self, _db: Arc<AsyncRwLock<DB>>) {
3714            self.log.lock().push(self.id);
3715        }
3716    }
3717
3718    #[test]
3719    fn single_db_attach_calls_single_resolver() {
3720        deterministic::Runner::default().start(|_| async move {
3721            let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
3722            let resolver = RecordingResolver::new("db1", log.clone());
3723            let db = Arc::new(AsyncRwLock::new(AttachDb1));
3724
3725            resolver.attach_databases(db).await;
3726            assert_eq!(&*log.lock(), &["db1"]);
3727        });
3728    }
3729
3730    #[test]
3731    fn tuple_attach_is_index_stable() {
3732        deterministic::Runner::default().start(|_| async move {
3733            let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
3734            let resolvers = (
3735                RecordingResolver::new("resolver_0", log.clone()),
3736                RecordingResolver::new("resolver_1", log.clone()),
3737            );
3738            let databases = (
3739                Arc::new(AsyncRwLock::new(AttachDb1)),
3740                Arc::new(AsyncRwLock::new(AttachDb2)),
3741            );
3742
3743            resolvers.attach_databases(databases).await;
3744            assert_eq!(&*log.lock(), &["resolver_0", "resolver_1"]);
3745        });
3746    }
3747
3748    #[test]
3749    fn heterogeneous_tuple_attach_compiles() {
3750        deterministic::Runner::default().start(|_| async move {
3751            let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
3752            let resolvers = (
3753                RecordingResolver::new("db1", log.clone()),
3754                RecordingResolver::new("db2", log.clone()),
3755            );
3756            let databases = (
3757                Arc::new(AsyncRwLock::new(AttachDb1)),
3758                Arc::new(AsyncRwLock::new(AttachDb2)),
3759            );
3760
3761            resolvers.attach_databases(databases).await;
3762            assert_eq!(&*log.lock(), &["db1", "db2"]);
3763        });
3764    }
3765}