use commonware_consensus::{
types::{Height, Round},
CertifiableBlock, Epochable, Roundable, Viewable,
};
use commonware_cryptography::Digest;
use commonware_macros::select;
use commonware_runtime::{reschedule, Metrics, Spawner};
use commonware_utils::{
channel::{fallible::AsyncFallibleExt, mpsc, oneshot, ring},
sync::AsyncRwLock,
};
use futures::{
future::{pending, Either},
join,
};
use std::{
collections::BTreeMap,
fmt::Debug,
future::Future,
num::{NonZeroU64, NonZeroUsize},
sync::Arc,
};
const MAX_CHANNEL_DRAIN_PER_TICK: usize = 32;
pub mod any;
pub mod current;
pub mod immutable;
pub mod keyless;
pub mod p2p;
pub trait Unmerkleized: Sized + Send {
type Merkleized: Merkleized;
type Error: Send;
fn merkleize(self) -> impl Future<Output = Result<Self::Merkleized, Self::Error>> + Send;
}
pub trait Merkleized: Sized + Send + Sync {
type Digest: Digest;
type Unmerkleized: Unmerkleized;
fn root(&self) -> Self::Digest;
fn new_batch(&self) -> Self::Unmerkleized;
}
pub trait ManagedDb<E>: Send + Sync + Sized {
type Unmerkleized: Unmerkleized;
type Merkleized: Merkleized<Unmerkleized = Self::Unmerkleized>;
type Error: Debug + Send;
type Config: Send;
type SyncTarget: Clone + PartialEq + Send + Sync;
fn init(
context: E,
config: Self::Config,
) -> impl Future<Output = Result<Self, Self::Error>> + Send;
fn new_batch(db: &Arc<AsyncRwLock<Self>>) -> impl Future<Output = Self::Unmerkleized> + Send;
fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool;
fn finalize(
&mut self,
batch: Self::Merkleized,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
fn sync_target(&self) -> impl Future<Output = Self::SyncTarget> + Send;
fn rewind_to_target(
&mut self,
target: Self::SyncTarget,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
fn max_rewind_depth() -> Option<usize> {
None
}
}
pub trait DatabaseSet<E>: Clone + Send + Sync + 'static {
type Unmerkleized: Send;
type Merkleized: Send + Sync;
type Config: Send;
type SyncTargets: Clone + PartialEq + Send + Sync;
fn init(context: E, config: Self::Config) -> impl Future<Output = Self> + Send;
fn new_batches(&self) -> impl Future<Output = Self::Unmerkleized> + Send;
fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized;
fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool;
fn finalize(&self, batches: Self::Merkleized) -> impl Future<Output = ()> + Send;
fn committed_targets(&self) -> impl Future<Output = Self::SyncTargets> + Send;
fn rewind_to_targets(&self, targets: Self::SyncTargets) -> impl Future<Output = ()> + Send;
fn max_rewind_depth() -> Option<usize>;
}
pub(crate) fn assert_rewind_window_safety<E, D>(max_pending_acks: NonZeroUsize)
where
D: DatabaseSet<E>,
{
let Some(max_rewind_depth) = D::max_rewind_depth() else {
return;
};
assert!(
max_pending_acks.get() <= max_rewind_depth,
"marshal max_pending_acks={} exceeds database_set.max_rewind_depth={}",
max_pending_acks,
max_rewind_depth,
);
}
#[derive(Clone, Copy, Debug)]
pub struct SyncEngineConfig {
pub fetch_batch_size: NonZeroU64,
pub apply_batch_size: usize,
pub max_outstanding_requests: usize,
pub update_channel_size: NonZeroUsize,
pub max_retained_roots: usize,
}
pub trait StateSyncDb<E, R>: ManagedDb<E> {
type SyncError: Debug + Send;
#[allow(clippy::too_many_arguments)]
fn sync_db(
context: E,
config: Self::Config,
resolver: R,
target: Self::SyncTarget,
tip_updates: mpsc::Receiver<Self::SyncTarget>,
finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
sync_config: SyncEngineConfig,
) -> impl Future<Output = Result<Self, Self::SyncError>> + Send;
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Anchor<D: Digest> {
pub height: Height,
pub round: Round,
pub digest: D,
}
impl<B, D> From<&B> for Anchor<D>
where
B: CertifiableBlock<Digest = D>,
B::Context: Epochable + Viewable,
D: Digest,
{
fn from(block: &B) -> Self {
Self {
height: block.height(),
round: block.context().round(),
digest: block.digest(),
}
}
}
pub struct TipUpdate<D: Digest, T> {
anchor: Anchor<D>,
targets: T,
observed: Option<oneshot::Sender<()>>,
}
impl<D: Digest, T> TipUpdate<D, T> {
pub const fn new(anchor: Anchor<D>, targets: T) -> Self {
Self {
anchor,
targets,
observed: None,
}
}
pub(crate) fn with_observation(anchor: Anchor<D>, targets: T) -> (Self, oneshot::Receiver<()>) {
let (observed, receiver) = oneshot::channel();
(
Self {
anchor,
targets,
observed: Some(observed),
},
receiver,
)
}
pub(crate) fn record(mut self) -> (Anchor<D>, T) {
if let Some(observed) = self.observed.take() {
let _ = observed.send(());
}
(self.anchor, self.targets)
}
}
pub trait StateSyncSet<E, R, D>: DatabaseSet<E>
where
D: Digest,
{
type Error: Debug + Send;
#[allow(clippy::too_many_arguments)]
fn sync(
context: E,
config: Self::Config,
resolvers: R,
anchor: Anchor<D>,
targets: Self::SyncTargets,
tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
sync_config: SyncEngineConfig,
) -> impl Future<Output = Result<(Self, Anchor<D>), Self::Error>> + Send;
}
impl<E: Send + Sync, T: ManagedDb<E> + 'static> DatabaseSet<E> for Arc<AsyncRwLock<T>> {
type Unmerkleized = T::Unmerkleized;
type Merkleized = T::Merkleized;
type Config = T::Config;
type SyncTargets = T::SyncTarget;
async fn init(context: E, config: Self::Config) -> Self {
let db = T::init(context, config)
.await
.expect("database init failed");
Self::new(AsyncRwLock::new(db))
}
async fn new_batches(&self) -> Self::Unmerkleized {
T::new_batch(self).await
}
fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized {
parent.new_batch()
}
fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool {
T::matches_sync_target(batches, targets)
}
async fn finalize(&self, batches: Self::Merkleized) {
let mut database = self.write().await;
finalize_or_panic(&mut *database, batches, None).await;
}
async fn committed_targets(&self) -> Self::SyncTargets {
let database = self.read().await;
T::sync_target(&*database).await
}
async fn rewind_to_targets(&self, target: Self::SyncTargets) {
let mut database = self.write().await;
if T::sync_target(&*database).await == target {
return;
}
rewind_or_panic(&mut *database, target, None).await;
}
fn max_rewind_depth() -> Option<usize> {
T::max_rewind_depth()
}
}
impl<E, T, R, D> StateSyncSet<E, R, D> for Arc<AsyncRwLock<T>>
where
E: Send + Sync + Metrics,
T: StateSyncDb<E, R> + 'static,
R: Send + 'static,
D: Digest,
{
type Error = T::SyncError;
#[allow(clippy::too_many_arguments)]
async fn sync(
context: E,
config: Self::Config,
resolver: R,
anchor: Anchor<D>,
target: Self::SyncTargets,
tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
sync_config: SyncEngineConfig,
) -> Result<(Self, Anchor<D>), Self::Error> {
let (target_tx, target_rx) = mpsc::channel(sync_config.update_channel_size.get());
let (finish_tx, finish_rx) = mpsc::channel(1);
let (reached_tx, mut reached_rx) = mpsc::channel(1);
let mut current_target = target.clone();
let sync = T::sync_db(
context,
config,
resolver,
target,
target_rx,
Some(finish_rx),
Some(reached_tx),
sync_config,
);
let coordinator = async {
let mut current_anchor = anchor;
let mut tip_updates = Some(tip_updates);
loop {
if !drain_single_tip_updates(
&mut tip_updates,
&target_tx,
&mut current_anchor,
&mut current_target,
)
.await
{
return (current_anchor, current_target);
}
let update_future = tip_updates.as_mut().map_or_else(
|| Either::Right(pending()),
|updates| Either::Left(updates.recv()),
);
select! {
reached = reached_rx.recv() => {
let Some(reached) = reached else {
return (current_anchor, current_target);
};
if !drain_single_tip_updates(
&mut tip_updates,
&target_tx,
&mut current_anchor,
&mut current_target,
)
.await
{
return (current_anchor, current_target);
};
if reached != current_target {
continue;
}
let _ = finish_tx.send_lossy(()).await;
return (current_anchor, current_target);
},
update = update_future => {
let Some(update) = update else {
tip_updates = None;
continue;
};
let (new_anchor, new_target) = update.record();
if new_anchor.height <= current_anchor.height {
continue;
}
current_anchor = new_anchor;
if new_target == current_target {
continue;
}
current_target = new_target.clone();
if !target_tx.send_lossy(new_target).await {
return (current_anchor, current_target);
}
},
}
}
};
let (db_result, (converged_anchor, converged_target)) = join!(sync, coordinator);
let database = db_result?;
assert!(
T::sync_target(&database).await == converged_target,
"state sync database target does not match the coordinator target",
);
Ok((Self::new(AsyncRwLock::new(database)), converged_anchor))
}
}
async fn drain_single_tip_updates<D, T>(
tip_updates: &mut Option<ring::Receiver<TipUpdate<D, T>>>,
target_tx: &mpsc::Sender<T>,
current_anchor: &mut Anchor<D>,
current_target: &mut T,
) -> bool
where
D: Digest,
T: Clone + PartialEq + Send + Sync,
{
let mut drained = 0usize;
let mut latest = None;
loop {
let update = match tip_updates.as_mut().map(ring::Receiver::try_recv) {
Some(Ok(update)) => update,
Some(Err(ring::TryRecvError::Empty)) => break,
Some(Err(ring::TryRecvError::Disconnected)) => {
*tip_updates = None;
break;
}
None => break,
};
drained += 1;
let (new_anchor, new_target) = update.record();
if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
reschedule().await;
}
let latest_height = latest
.as_ref()
.map_or(current_anchor.height, |(anchor, _): &(Anchor<D>, T)| {
anchor.height
});
if new_anchor.height <= latest_height {
continue;
}
latest = Some((new_anchor, new_target));
}
let Some((new_anchor, new_target)) = latest else {
return true;
};
*current_anchor = new_anchor;
if new_target == *current_target {
return true;
}
*current_target = new_target.clone();
target_tx.send_lossy(new_target).await
}
macro_rules! impl_database_set {
($($T:ident : $idx:tt),+) => {
impl<E: Send + Sync + Metrics, $($T: ManagedDb<E> + 'static),+> DatabaseSet<E>
for ($(Arc<AsyncRwLock<$T>>,)+)
{
type Unmerkleized = ($($T::Unmerkleized,)+);
type Merkleized = ($($T::Merkleized,)+);
type Config = ($($T::Config,)+);
type SyncTargets = ($($T::SyncTarget,)+);
async fn init(context: E, config: Self::Config) -> Self {
let result = join!($(
async {
let db = $T::init(
context.child(concat!("db_", stringify!($idx))),
config.$idx,
)
.await
.expect(concat!(
"database init failed (index ",
stringify!($idx),
", type ",
stringify!($T),
")",
));
Arc::new(AsyncRwLock::new(db))
},
)+);
result
}
async fn new_batches(&self) -> Self::Unmerkleized {
join!($($T::new_batch(&self.$idx),)+)
}
fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized {
($(parent.$idx.new_batch(),)+)
}
fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool {
$($T::matches_sync_target(&batches.$idx, &targets.$idx))&&+
}
async fn finalize(&self, batches: Self::Merkleized) {
join!($(
async {
let mut database = self.$idx.write().await;
finalize_or_panic(&mut *database, batches.$idx, Some($idx)).await;
},
)+);
}
async fn committed_targets(&self) -> Self::SyncTargets {
join!($(
async {
let database = self.$idx.read().await;
$T::sync_target(&*database).await
},
)+)
}
async fn rewind_to_targets(&self, targets: Self::SyncTargets) {
join!($(
async {
let mut database = self.$idx.write().await;
if $T::sync_target(&*database).await == targets.$idx {
return;
}
rewind_or_panic(&mut *database, targets.$idx, Some($idx)).await;
},
)+);
}
fn max_rewind_depth() -> Option<usize> {
let mut max_rewind_depth: Option<usize> = None;
$(
max_rewind_depth = match (max_rewind_depth, $T::max_rewind_depth()) {
(Some(current), Some(next)) => Some(current.min(next)),
(Some(current), None) => Some(current),
(None, Some(next)) => Some(next),
(None, None) => None,
};
)+
max_rewind_depth
}
}
};
}
impl_database_set!(DB1: 0);
impl_database_set!(DB1: 0, DB2: 1);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5, DB7: 6);
impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5, DB7: 6, DB8: 7);
struct DbSyncChannels<T> {
target_tx: mpsc::Sender<T>,
target_rx: mpsc::Receiver<T>,
finish_tx: mpsc::Sender<()>,
finish_rx: mpsc::Receiver<()>,
generation_tx: mpsc::Sender<(usize, T)>,
generation_rx: mpsc::Receiver<(usize, T)>,
reached_tx: mpsc::Sender<T>,
reached_rx: mpsc::Receiver<T>,
}
impl<T> DbSyncChannels<T> {
fn new(update_channel_size: usize) -> Self {
let (target_tx, target_rx) = mpsc::channel(update_channel_size);
let (finish_tx, finish_rx) = mpsc::channel(1);
let (generation_tx, generation_rx) = mpsc::channel(update_channel_size);
let (reached_tx, reached_rx) = mpsc::channel(1);
Self {
target_tx,
target_rx,
finish_tx,
finish_rx,
generation_tx,
generation_rx,
reached_tx,
reached_rx,
}
}
}
struct CoordinatorSyncSenders<T> {
target_tx: mpsc::Sender<T>,
finish_tx: mpsc::Sender<()>,
generation_tx: mpsc::Sender<(usize, T)>,
}
macro_rules! impl_state_sync_set {
($($T:ident : $R:ident : $idx:tt),+) => {
impl<E, D, $($T, $R),+> StateSyncSet<E, ($($R,)+), D> for ($(Arc<AsyncRwLock<$T>>,)+)
where
E: Send + Sync + Spawner + Metrics + 'static,
D: Digest + 'static,
$(
$T: StateSyncDb<E, $R> + 'static,
$R: Send + 'static,
)+
{
type Error = String;
#[allow(clippy::too_many_arguments)]
async fn sync(
context: E,
config: Self::Config,
resolvers: ($($R,)+),
anchor: Anchor<D>,
targets: Self::SyncTargets,
tip_updates: ring::Receiver<TipUpdate<D, Self::SyncTargets>>,
sync_config: SyncEngineConfig,
) -> Result<(Self, Anchor<D>), Self::Error> {
let db_channels = ($(
DbSyncChannels::<<$T as ManagedDb<E>>::SyncTarget>::new(
sync_config.update_channel_size.get(),
),
)+);
let coordinator_senders = ($(
CoordinatorSyncSenders {
target_tx: db_channels.$idx.target_tx.clone(),
finish_tx: db_channels.$idx.finish_tx.clone(),
generation_tx: db_channels.$idx.generation_tx.clone(),
},
)+);
let coordinator_owned_senders = ($(
CoordinatorSyncSenders {
target_tx: db_channels.$idx.target_tx,
finish_tx: db_channels.$idx.finish_tx,
generation_tx: db_channels.$idx.generation_tx,
},
)+);
let (reached_event_tx, mut reached_event_rx) = mpsc::channel(16);
let (completion_tx, mut completion_rx) = mpsc::channel(1);
let db_count = [$($idx,)+].len();
let coordinator_targets = targets.clone();
let initial_targets = targets.clone();
let first_db_error: Arc<commonware_utils::sync::Mutex<Option<String>>> =
Arc::new(commonware_utils::sync::Mutex::new(None));
let coordinator_handle = context.child("coordinator").spawn({
move |_context| async move {
let coordinator_owned_senders = coordinator_owned_senders;
let mut tip_updates = Some(tip_updates);
let mut state = CoordinatorState::new(db_count, anchor, coordinator_targets);
let mut last_dispatched_targets = initial_targets;
loop {
loop {
match reached_event_rx.try_recv() {
Ok((idx, generation)) => state.record_reached(idx, generation),
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => return None,
}
}
if let Some(updates) = tip_updates.as_mut() {
loop {
match updates.try_recv() {
Ok(update) => {
let (anchor, targets) = update.record();
state.record_tip_update(anchor, targets);
}
Err(ring::TryRecvError::Empty) => break,
Err(ring::TryRecvError::Disconnected) => {
tip_updates = None;
break;
}
}
}
}
match state.next_action() {
CoordinatorAction::Converged { anchor, targets } => {
$(
let _ = coordinator_senders.$idx.finish_tx.send_lossy(()).await;
)+
return Some((anchor, targets));
}
CoordinatorAction::Dispatch {
generation,
targets: dispatch_targets,
} => {
$(
let dispatch_target = dispatch_targets.$idx.clone();
if !coordinator_senders.$idx
.generation_tx
.send_lossy((generation, dispatch_target.clone()))
.await
{
return None;
}
if state.should_dispatch($idx) {
if dispatch_target != last_dispatched_targets.$idx {
if !coordinator_senders.$idx
.target_tx
.send_lossy(dispatch_target.clone())
.await
{
return None;
}
last_dispatched_targets.$idx = dispatch_target;
}
} else if dispatch_target == last_dispatched_targets.$idx {
state.mark_reached_same_target($idx, generation);
}
)+
continue;
}
CoordinatorAction::Wait => {}
}
let update_future = tip_updates.as_mut().map_or_else(
|| Either::Right(pending()),
|updates| Either::Left(updates.recv()),
);
select! {
reached_event = reached_event_rx.recv() => {
let (idx, generation) = reached_event?;
state.record_reached(idx, generation);
},
_ = completion_rx.recv() => {
drop(coordinator_owned_senders);
return None;
},
update = update_future => {
let Some(update) = update else {
tip_updates = None;
continue;
};
let (anchor, targets) = update.record();
state.record_tip_update(anchor, targets);
},
};
}
}
});
let db_handles = (
$(
context.child(concat!("db_", stringify!($idx))).spawn({
let first_db_error = first_db_error.clone();
let mut reached_target_rx = db_channels.$idx.reached_rx;
let mut generation_rx = Some(db_channels.$idx.generation_rx);
let mut current_generation = 0usize;
let mut current_target = targets.$idx.clone();
let mut last_reached_target = None;
let mut last_reported_generation = None;
let reached_event_sender = reached_event_tx.clone();
let completion_signal = completion_tx.clone();
let config = config.$idx;
let resolver = resolvers.$idx;
let target = targets.$idx;
let target_rx = db_channels.$idx.target_rx;
let finish_rx = db_channels.$idx.finish_rx;
let reached_tx = db_channels.$idx.reached_tx;
move |context| async move {
let sync = $T::sync_db(
context,
config,
resolver,
target,
target_rx,
Some(finish_rx),
Some(reached_tx),
sync_config,
);
let forward_reached = async move {
loop {
drain_generation_updates(
&mut generation_rx,
&mut current_generation,
&mut current_target,
&last_reached_target,
&mut last_reported_generation,
&reached_event_sender,
$idx,
)
.await;
let update_future = generation_rx.as_mut().map_or_else(
|| Either::Right(pending()),
|updates| Either::Left(updates.recv()),
);
select! {
reached_target = reached_target_rx.recv() => {
let Some(reached_target) = reached_target else {
return;
};
last_reached_target = Some(reached_target.clone());
drain_generation_updates(
&mut generation_rx,
&mut current_generation,
&mut current_target,
&last_reached_target,
&mut last_reported_generation,
&reached_event_sender,
$idx,
)
.await;
if reached_target != current_target {
continue;
}
if last_reported_generation != Some(current_generation) {
if !reached_event_sender
.send_lossy(($idx, current_generation))
.await
{
return;
}
last_reported_generation = Some(current_generation);
}
},
update = update_future => {
let Some((generation, target)) = update else {
generation_rx = None;
continue;
};
current_generation = generation;
current_target = target;
if last_reached_target.as_ref() == Some(¤t_target)
&& last_reported_generation != Some(current_generation)
{
if !reached_event_sender
.send_lossy(($idx, current_generation))
.await
{
return;
}
last_reported_generation = Some(current_generation);
}
},
};
}
};
let (sync_result, _) = join!(sync, forward_reached);
let result = sync_result
.map(|database| Arc::new(AsyncRwLock::new(database)))
.map_err(|err| {
format!(
"state sync failed (index {}, db {}): {err:?}",
$idx,
core::any::type_name::<$T>(),
)
});
if let Err(err) = &result {
let mut first = first_db_error.lock();
if first.is_none() {
*first = Some(err.clone());
}
}
let _ = completion_signal.send_lossy(()).await;
result
}
}),
)+
);
let synced = join!(
$(
async {
db_handles.$idx
.await
.expect("state sync database task exited")
},
)+
);
let converged_anchor = coordinator_handle
.await
.expect("state sync coordinator task exited");
if let Some(err) = first_db_error.lock().take() {
return Err(err);
}
let synced = ($(synced.$idx?,)+);
let Some((converged_anchor, converged_targets)) = converged_anchor else {
return Err("state sync coordinator did not report a converged anchor".into());
};
if <Self as DatabaseSet<E>>::committed_targets(&synced).await != converged_targets {
return Err(
"state sync database targets do not match the coordinator target set"
.into(),
);
}
Ok((synced, converged_anchor))
}
}
};
}
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1);
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2);
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3);
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4);
impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4, DB6: R6: 5);
impl_state_sync_set!(
DB1: R1: 0,
DB2: R2: 1,
DB3: R3: 2,
DB4: R4: 3,
DB5: R5: 4,
DB6: R6: 5,
DB7: R7: 6
);
impl_state_sync_set!(
DB1: R1: 0,
DB2: R2: 1,
DB3: R3: 2,
DB4: R4: 3,
DB5: R5: 4,
DB6: R6: 5,
DB7: R7: 6,
DB8: R8: 7
);
async fn drain_generation_updates<T>(
generation_rx: &mut Option<mpsc::Receiver<(usize, T)>>,
current_generation: &mut usize,
current_target: &mut T,
last_reached_target: &Option<T>,
last_reported_generation: &mut Option<usize>,
reached_event_sender: &mpsc::Sender<(usize, usize)>,
idx: usize,
) where
T: Clone + PartialEq,
{
if let Some(updates) = generation_rx.as_mut() {
let mut drained = 0usize;
loop {
match updates.try_recv() {
Ok((generation, target)) => {
drained += 1;
*current_generation = generation;
*current_target = target;
if last_reached_target.as_ref() == Some(current_target)
&& *last_reported_generation != Some(*current_generation)
{
if !reached_event_sender
.send_lossy((idx, *current_generation))
.await
{
return;
}
*last_reported_generation = Some(*current_generation);
}
if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
reschedule().await;
}
}
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => {
*generation_rx = None;
break;
}
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DbSyncState {
Seeking { generation: usize },
Reached { generation: usize },
}
impl DbSyncState {
const fn generation(self) -> usize {
match self {
Self::Seeking { generation } | Self::Reached { generation } => generation,
}
}
const fn is_reached(self) -> bool {
matches!(self, Self::Reached { .. })
}
}
enum CoordinatorAction<D: Digest, T> {
Wait,
Dispatch { generation: usize, targets: T },
Converged { anchor: Anchor<D>, targets: T },
}
struct CoordinatorState<D: Digest, T> {
dbs: Vec<DbSyncState>,
generation_state: BTreeMap<usize, (Anchor<D>, T)>,
current_generation: usize,
latest_tip: Option<(Anchor<D>, T)>,
last_dispatched_anchor: Anchor<D>,
}
impl<D: Digest, T: Clone> CoordinatorState<D, T> {
fn new(db_count: usize, anchor: Anchor<D>, targets: T) -> Self {
let dbs = vec![DbSyncState::Seeking { generation: 0 }; db_count];
let mut generation_state = BTreeMap::new();
generation_state.insert(0, (anchor, targets));
Self {
dbs,
generation_state,
current_generation: 0,
latest_tip: None,
last_dispatched_anchor: anchor,
}
}
fn record_reached(&mut self, idx: usize, generation: usize) {
if self.dbs[idx].generation() != generation {
return;
}
if self.dbs[idx].is_reached() {
return;
}
self.dbs[idx] = DbSyncState::Reached { generation };
}
fn record_tip_update(&mut self, anchor: Anchor<D>, targets: T) {
let current_height = self
.latest_tip
.as_ref()
.map_or(self.last_dispatched_anchor.height, |(latest_anchor, _)| {
latest_anchor.height
});
if anchor.height <= current_height {
return;
}
self.latest_tip = Some((anchor, targets));
}
fn next_action(&mut self) -> CoordinatorAction<D, T> {
let all_reached = self.dbs.iter().all(|db| db.is_reached());
if all_reached {
let min_gen = self.dbs.iter().map(|db| db.generation()).min().unwrap();
let max_gen = self.dbs.iter().map(|db| db.generation()).max().unwrap();
if min_gen == max_gen {
if let Some((anchor, targets)) = self.latest_tip.take() {
let generation = self.current_generation + 1;
self.current_generation = generation;
for db in &mut self.dbs {
*db = DbSyncState::Seeking { generation };
}
self.generation_state
.insert(generation, (anchor, targets.clone()));
self.last_dispatched_anchor = anchor;
self.prune_generations();
return CoordinatorAction::Dispatch {
generation,
targets,
};
}
let (anchor, targets) = self
.generation_state
.get(&min_gen)
.expect("missing state for converged generation")
.clone();
return CoordinatorAction::Converged { anchor, targets };
}
let (_anchor, targets) = self
.generation_state
.get(&max_gen)
.expect("missing state for regroup generation")
.clone();
for db in &mut self.dbs {
if db.generation() != max_gen {
*db = DbSyncState::Seeking {
generation: max_gen,
};
}
}
self.prune_generations();
return CoordinatorAction::Dispatch {
generation: max_gen,
targets,
};
}
let Some((anchor, targets)) = self.latest_tip.take() else {
return CoordinatorAction::Wait;
};
let generation = self.current_generation + 1;
self.current_generation = generation;
for db in &mut self.dbs {
if !db.is_reached() {
*db = DbSyncState::Seeking { generation };
}
}
self.generation_state
.insert(generation, (anchor, targets.clone()));
self.last_dispatched_anchor = anchor;
self.prune_generations();
CoordinatorAction::Dispatch {
generation,
targets,
}
}
fn prune_generations(&mut self) {
self.generation_state
.retain(|gen, _| self.dbs.iter().any(|db| db.generation() == *gen));
}
fn should_dispatch(&self, idx: usize) -> bool {
!self.dbs[idx].is_reached()
}
fn mark_reached_same_target(&mut self, idx: usize, generation: usize) {
if !self.dbs[idx].is_reached() {
return;
}
self.dbs[idx] = DbSyncState::Reached { generation };
}
}
async fn finalize_or_panic<E, T: ManagedDb<E>>(
database: &mut T,
batch: T::Merkleized,
index: Option<usize>,
) {
if let Err(err) = database.finalize(batch).await {
match index {
Some(index) => panic!(
"database finalize failed (index {index}, type {}): {err:?}",
core::any::type_name::<T>(),
),
None => panic!(
"database finalize failed (type {}): {err:?}",
core::any::type_name::<T>(),
),
}
}
}
async fn rewind_or_panic<E, T: ManagedDb<E>>(
database: &mut T,
target: T::SyncTarget,
index: Option<usize>,
) {
if let Err(err) = database.rewind_to_target(target).await {
match index {
Some(index) => panic!(
"database rewind failed (index {index}, type {}): {err:?}",
core::any::type_name::<T>(),
),
None => panic!(
"database rewind failed (type {}): {err:?}",
core::any::type_name::<T>(),
),
}
}
}
pub trait AttachableResolver<DB>: Clone + Send + Sync + 'static {
fn attach_database(&self, db: Arc<AsyncRwLock<DB>>) -> impl Future<Output = ()> + Send;
}
pub trait AttachableResolverSet<DBs>: Clone + Send + Sync + 'static {
fn attach_databases(&self, databases: DBs) -> impl Future<Output = ()> + Send;
}
impl<R, DB> AttachableResolverSet<Arc<AsyncRwLock<DB>>> for R
where
R: AttachableResolver<DB>,
DB: Send + Sync + 'static,
{
async fn attach_databases(&self, db: Arc<AsyncRwLock<DB>>) {
self.attach_database(db).await;
}
}
macro_rules! impl_attachable_resolver_set {
($($R:ident : $DB:ident : $idx:tt),+) => {
impl<$($R, $DB),+> AttachableResolverSet<($(Arc<AsyncRwLock<$DB>>,)+)> for ($($R,)+)
where
$(
$R: AttachableResolver<$DB>,
$DB: Send + Sync + 'static,
)+
{
async fn attach_databases(&self, databases: ($(Arc<AsyncRwLock<$DB>>,)+)) {
futures::join!($(
self.$idx.attach_database(databases.$idx),
)+);
}
}
};
}
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3, R5: DB5: 4);
impl_attachable_resolver_set!(
R1: DB1: 0,
R2: DB2: 1,
R3: DB3: 2,
R4: DB4: 3,
R5: DB5: 4,
R6: DB6: 5
);
impl_attachable_resolver_set!(
R1: DB1: 0,
R2: DB2: 1,
R3: DB3: 2,
R4: DB4: 3,
R5: DB5: 4,
R6: DB6: 5,
R7: DB7: 6
);
impl_attachable_resolver_set!(
R1: DB1: 0,
R2: DB2: 1,
R3: DB3: 2,
R4: DB4: 3,
R5: DB5: 4,
R6: DB6: 5,
R7: DB7: 6,
R8: DB8: 7
);
#[cfg(test)]
mod tests {
use super::{
assert_rewind_window_safety, drain_single_tip_updates, Anchor, AttachableResolver,
AttachableResolverSet, CoordinatorAction, CoordinatorState, DatabaseSet, ManagedDb,
StateSyncDb, StateSyncSet, SyncEngineConfig, TipUpdate, MAX_CHANNEL_DRAIN_PER_TICK,
};
use crate::stateful::tests::mocks::{anchor as mock_anchor, TestMerkleized, TestUnmerkleized};
use commonware_cryptography::sha256;
use commonware_macros::select;
use commonware_runtime::{
deterministic, reschedule, Clock, Runner as _, Spawner as _, Supervisor as _,
};
use commonware_utils::{
channel::{mpsc, oneshot, ring},
sync::AsyncRwLock,
};
use futures::{pin_mut, FutureExt, SinkExt};
use std::{
convert::Infallible,
num::{NonZeroU64, NonZeroUsize},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
#[derive(Default)]
struct TestDb;
#[derive(Default)]
struct OneStepRewindDb;
#[derive(Default)]
struct ThreeStepRewindDb;
struct CountingRewindDb {
current_target: u64,
rewind_count: usize,
}
impl<E: Send> ManagedDb<E> for TestDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = ();
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
Ok(Self)
}
async fn new_batch(db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
let _guard = db.read().await;
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for OneStepRewindDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = ();
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
Ok(Self)
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
fn max_rewind_depth() -> Option<usize> {
Some(1)
}
}
impl<E: Send> ManagedDb<E> for ThreeStepRewindDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = ();
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
Ok(Self)
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
fn max_rewind_depth() -> Option<usize> {
Some(3)
}
}
impl<E: Send> ManagedDb<E> for CountingRewindDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("CountingRewindDb is constructed directly in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.current_target
}
async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Self::Error> {
self.current_target = target;
self.rewind_count += 1;
Ok(())
}
}
struct BlockingFinalizeDb {
started: Option<oneshot::Sender<()>>,
release: Option<oneshot::Receiver<()>>,
}
impl BlockingFinalizeDb {
fn new(started: oneshot::Sender<()>, release: oneshot::Receiver<()>) -> Self {
Self {
started: Some(started),
release: Some(release),
}
}
}
#[derive(Debug)]
struct TestFinalizeError;
struct FailingFinalizeDb;
struct SlowSyncDb {
final_target: u64,
}
struct RejectDuplicateTargetSyncDb {
final_target: u64,
}
struct StaleReachedSyncDb {
final_target: u64,
}
struct FastSyncDb {
final_target: u64,
}
struct ImmediateStateSyncDb;
struct FailingStateSyncDb;
struct MismatchedTargetSyncDb {
final_target: u64,
}
struct FinishClosedSyncDb {
final_target: u64,
}
struct ObservedSlowSyncDb {
final_target: u64,
}
struct ObservedFastSyncDb {
final_target: u64,
}
struct DistinctObservedFastSyncDb {
final_target: u64,
}
#[derive(Clone)]
struct SlowSyncController {
release: Arc<AtomicBool>,
}
#[derive(Clone)]
struct FastSyncObserver {
ready: Arc<AtomicBool>,
update_count: Arc<AtomicUsize>,
}
impl<E: Send> ManagedDb<E> for FailingFinalizeDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = TestFinalizeError;
type Config = ();
type SyncTarget = ();
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
Ok(Self)
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Err(TestFinalizeError)
}
async fn sync_target(&self) -> Self::SyncTarget {}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
#[test]
fn single_db_set_reports_unbounded_rewind_depth() {
let rewind_depth =
<Arc<AsyncRwLock<TestDb>> as DatabaseSet<deterministic::Context>>::max_rewind_depth();
assert_eq!(rewind_depth, None);
}
#[test]
fn single_db_set_reports_one_step_rewind_depth() {
let rewind_depth = <Arc<AsyncRwLock<OneStepRewindDb>> as DatabaseSet<
deterministic::Context,
>>::max_rewind_depth();
assert_eq!(rewind_depth, Some(1));
}
#[test]
fn tuple_db_set_uses_most_restrictive_finite_rewind_depth() {
type DbSet = (
Arc<AsyncRwLock<TestDb>>,
Arc<AsyncRwLock<ThreeStepRewindDb>>,
Arc<AsyncRwLock<OneStepRewindDb>>,
);
let rewind_depth = <DbSet as DatabaseSet<deterministic::Context>>::max_rewind_depth();
assert_eq!(rewind_depth, Some(1));
}
#[test]
fn rewind_window_assertion_accepts_equal_pending_acks_and_rewind_depth() {
assert_rewind_window_safety::<deterministic::Context, Arc<AsyncRwLock<OneStepRewindDb>>>(
NonZeroUsize::new(1).unwrap(),
);
}
#[test]
#[should_panic(expected = "marshal max_pending_acks=2 exceeds database_set.max_rewind_depth=1")]
fn rewind_window_assertion_panics_when_pending_acks_exceed_rewind_depth() {
assert_rewind_window_safety::<deterministic::Context, Arc<AsyncRwLock<OneStepRewindDb>>>(
NonZeroUsize::new(2).unwrap(),
);
}
#[test]
fn tuple_rewind_to_targets_skips_already_aligned_databases() {
deterministic::Runner::default().start(|_context| async move {
type DbSet = (
Arc<AsyncRwLock<CountingRewindDb>>,
Arc<AsyncRwLock<CountingRewindDb>>,
);
let left = Arc::new(AsyncRwLock::new(CountingRewindDb {
current_target: 2,
rewind_count: 0,
}));
let right = Arc::new(AsyncRwLock::new(CountingRewindDb {
current_target: 1,
rewind_count: 0,
}));
let databases: DbSet = (left.clone(), right.clone());
<DbSet as DatabaseSet<deterministic::Context>>::rewind_to_targets(&databases, (1, 1))
.await;
let left = left.read().await;
assert_eq!(left.current_target, 1);
assert_eq!(left.rewind_count, 1);
let right = right.read().await;
assert_eq!(right.current_target, 1);
assert_eq!(right.rewind_count, 0);
});
}
impl<E: Send> ManagedDb<E> for BlockingFinalizeDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = ();
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("BlockingFinalizeDb is constructed directly in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
if let Some(started) = self.started.take() {
let _ = started.send(());
}
if let Some(release) = self.release.take() {
let _ = release.await;
}
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for SlowSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("SlowSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for RejectDuplicateTargetSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!(
"RejectDuplicateTargetSyncDb is only constructed through state sync in tests"
)
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for FastSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("FastSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for FailingStateSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("FailingStateSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
0
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for MismatchedTargetSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("MismatchedTargetSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for ImmediateStateSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("ImmediateStateSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
0
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for FinishClosedSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("FinishClosedSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for ObservedSlowSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("ObservedSlowSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for ObservedFastSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("ObservedFastSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E: Send> ManagedDb<E> for DistinctObservedFastSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!(
"DistinctObservedFastSyncDb is only constructed through state sync in tests"
)
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E> StateSyncDb<E, Arc<AtomicBool>> for SlowSyncDb
where
E: Send + Clock,
{
type SyncError = Infallible;
async fn sync_db(
context: E,
_config: Self::Config,
release: Arc<AtomicBool>,
target: Self::SyncTarget,
tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
while !release.load(Ordering::SeqCst) {
context.sleep(Duration::from_millis(1)).await;
}
let mut final_target = target;
let mut tip_updates = Some(tip_updates);
loop {
if let Some(reached_target) = reached_target.as_ref() {
if reached_target.send(final_target).await.is_err() {
break;
}
}
context.sleep(Duration::from_millis(1)).await;
if finish.is_none() && tip_updates.is_none() {
break;
}
let finish_signal = finish.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|finish_rx| futures::future::Either::Left(finish_rx.recv()),
);
let update_signal = tip_updates.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|update_rx| futures::future::Either::Left(update_rx.recv()),
);
select! {
_ = finish_signal => {
break;
},
update = update_signal => match update {
Some(update) => {
final_target = update;
}
None => {
tip_updates = None;
if finish.is_none() {
break;
}
}
},
}
}
Ok(Self { final_target })
}
}
impl<E> StateSyncDb<E, Arc<AtomicBool>> for RejectDuplicateTargetSyncDb
where
E: Send + Clock,
{
type SyncError = Infallible;
async fn sync_db(
context: E,
_config: Self::Config,
release: Arc<AtomicBool>,
target: Self::SyncTarget,
mut tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
let mut final_target = target;
while !release.load(Ordering::SeqCst) {
match tip_updates.try_recv() {
Ok(update) => {
assert_ne!(
update, final_target,
"state sync must not send duplicate target updates"
);
final_target = update;
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => break,
}
context.sleep(Duration::from_millis(1)).await;
}
if let Some(reached_target) = reached_target.as_ref() {
let _ = reached_target.send(final_target).await;
}
if let Some(finish_rx) = finish.as_mut() {
let _ = finish_rx.recv().await;
}
Ok(Self { final_target })
}
}
impl<E: Send> ManagedDb<E> for StaleReachedSyncDb {
type Unmerkleized = TestUnmerkleized;
type Merkleized = TestMerkleized;
type Error = Infallible;
type Config = ();
type SyncTarget = u64;
async fn init(_context: E, _config: Self::Config) -> Result<Self, Self::Error> {
unreachable!("StaleReachedSyncDb is only constructed through state sync in tests")
}
async fn new_batch(_db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
TestUnmerkleized
}
fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
true
}
async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
self.final_target
}
async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E> StateSyncDb<E, ()> for StaleReachedSyncDb
where
E: Send + Clock,
{
type SyncError = Infallible;
async fn sync_db(
context: E,
_config: Self::Config,
_resolver: (),
target: Self::SyncTarget,
mut tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
let update = tip_updates.recv().await.expect("expected forwarded tip");
if let Some(reached_target) = reached_target.as_ref() {
let _ = reached_target.send(target).await;
}
let finish_signal = finish.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|finish_rx| futures::future::Either::Left(finish_rx.recv()),
);
select! {
_ = finish_signal => Ok(Self {
final_target: target
}),
_ = context.sleep(Duration::from_millis(10)) => {
if let Some(reached_target) = reached_target.as_ref() {
let _ = reached_target.send(update).await;
}
if let Some(finish_rx) = finish.as_mut() {
let _ = finish_rx.recv().await;
}
Ok(Self {
final_target: update,
})
},
}
}
}
impl<E: Send> StateSyncDb<E, Arc<AtomicBool>> for FastSyncDb {
type SyncError = Infallible;
async fn sync_db(
_context: E,
_config: Self::Config,
done: Arc<AtomicBool>,
target: Self::SyncTarget,
tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
done.store(true, Ordering::SeqCst);
let mut final_target = target;
let mut tip_updates = Some(tip_updates);
loop {
if let Some(reached_target) = reached_target.as_ref() {
if reached_target.send(final_target).await.is_err() {
break;
}
}
if finish.is_none() && tip_updates.is_none() {
break;
}
let finish_signal = finish.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|finish_rx| futures::future::Either::Left(finish_rx.recv()),
);
let update_signal = tip_updates.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|update_rx| futures::future::Either::Left(update_rx.recv()),
);
select! {
_ = finish_signal => {
break;
},
update = update_signal => match update {
Some(update) => {
final_target = update;
}
None => {
tip_updates = None;
if finish.is_none() {
break;
}
}
},
}
}
Ok(Self { final_target })
}
}
#[derive(Debug)]
struct TestSyncError;
#[derive(Debug)]
struct FinishClosedSyncError;
impl<E: Send> StateSyncDb<E, ()> for FailingStateSyncDb {
type SyncError = TestSyncError;
async fn sync_db(
_context: E,
_config: Self::Config,
_resolver: (),
_target: Self::SyncTarget,
_tip_updates: mpsc::Receiver<Self::SyncTarget>,
_finish: Option<mpsc::Receiver<()>>,
_reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
Err(TestSyncError)
}
}
impl<E: Send> StateSyncDb<E, ()> for ImmediateStateSyncDb {
type SyncError = Infallible;
async fn sync_db(
_context: E,
_config: Self::Config,
_resolver: (),
_target: Self::SyncTarget,
_tip_updates: mpsc::Receiver<Self::SyncTarget>,
_finish: Option<mpsc::Receiver<()>>,
_reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
Ok(Self)
}
}
impl<E: Send> StateSyncDb<E, ()> for MismatchedTargetSyncDb {
type SyncError = Infallible;
async fn sync_db(
_context: E,
_config: Self::Config,
_resolver: (),
target: Self::SyncTarget,
_tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
if let Some(reached_target) = reached_target.as_ref() {
let _ = reached_target.send(target).await;
}
if let Some(finish_rx) = finish.as_mut() {
let _ = finish_rx.recv().await;
}
Ok(Self {
final_target: target + 1,
})
}
}
impl<E: Send> StateSyncDb<E, ()> for FinishClosedSyncDb {
type SyncError = FinishClosedSyncError;
async fn sync_db(
_context: E,
_config: Self::Config,
_resolver: (),
target: Self::SyncTarget,
_tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
_reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
let Some(finish_rx) = finish.as_mut() else {
panic!("finish receiver should be provided");
};
match finish_rx.recv().await {
Some(()) => Ok(Self {
final_target: target,
}),
None => Err(FinishClosedSyncError),
}
}
}
impl<E> StateSyncDb<E, SlowSyncController> for ObservedSlowSyncDb
where
E: Send + Clock,
{
type SyncError = Infallible;
async fn sync_db(
context: E,
_config: Self::Config,
controller: SlowSyncController,
target: Self::SyncTarget,
tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
while !controller.release.load(Ordering::SeqCst) {
context.sleep(Duration::from_millis(1)).await;
}
let mut final_target = target;
let mut tip_updates = Some(tip_updates);
let mut reported_target = None;
let mut observed_update = false;
loop {
if let Some(update_rx) = tip_updates.as_mut() {
let mut drained = 0usize;
loop {
match update_rx.try_recv() {
Ok(update) => {
drained += 1;
final_target = update;
observed_update = true;
reported_target = None;
if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
reschedule().await;
}
}
Err(mpsc::error::TryRecvError::Empty) => {
break;
}
Err(mpsc::error::TryRecvError::Disconnected) => {
tip_updates = None;
break;
}
}
}
}
if observed_update && reported_target != Some(final_target) {
if let Some(reached_target) = reached_target.as_ref() {
if reached_target.send(final_target).await.is_err() {
break;
}
}
reported_target = Some(final_target);
}
if finish.is_none() && tip_updates.is_none() {
break;
}
let finish_signal = finish.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|finish_rx| futures::future::Either::Left(finish_rx.recv()),
);
let update_signal = tip_updates.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|update_rx| futures::future::Either::Left(update_rx.recv()),
);
select! {
_ = finish_signal => {
break;
},
update = update_signal => match update {
Some(update) => {
final_target = update;
observed_update = true;
reported_target = None;
}
None => {
tip_updates = None;
if finish.is_none() {
break;
}
}
},
}
}
Ok(Self { final_target })
}
}
impl<E: Send> StateSyncDb<E, FastSyncObserver> for ObservedFastSyncDb {
type SyncError = Infallible;
async fn sync_db(
_context: E,
_config: Self::Config,
observer: FastSyncObserver,
target: Self::SyncTarget,
tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
let mut final_target = target;
let mut tip_updates = Some(tip_updates);
let mut reported_target = None;
observer.ready.store(true, Ordering::SeqCst);
loop {
if reported_target != Some(final_target) {
if let Some(reached_target) = reached_target.as_ref() {
if reached_target.send(final_target).await.is_err() {
break;
}
}
reported_target = Some(final_target);
}
if finish.is_none() && tip_updates.is_none() {
break;
}
let finish_signal = finish.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|finish_rx| futures::future::Either::Left(finish_rx.recv()),
);
let update_signal = tip_updates.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|update_rx| futures::future::Either::Left(update_rx.recv()),
);
select! {
_ = finish_signal => {
break;
},
update = update_signal => match update {
Some(update) => {
observer.update_count.fetch_add(1, Ordering::SeqCst);
final_target = update;
reported_target = None;
}
None => {
tip_updates = None;
if finish.is_none() {
break;
}
}
},
}
}
Ok(Self { final_target })
}
}
impl<E: Send> StateSyncDb<E, FastSyncObserver> for DistinctObservedFastSyncDb {
type SyncError = Infallible;
async fn sync_db(
_context: E,
_config: Self::Config,
observer: FastSyncObserver,
target: Self::SyncTarget,
tip_updates: mpsc::Receiver<Self::SyncTarget>,
mut finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
_sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
let mut final_target = target;
let mut tip_updates = Some(tip_updates);
let mut reported_target = None;
observer.ready.store(true, Ordering::SeqCst);
loop {
if reported_target != Some(final_target) {
if let Some(reached_target) = reached_target.as_ref() {
if reached_target.send(final_target).await.is_err() {
break;
}
}
reported_target = Some(final_target);
}
if finish.is_none() && tip_updates.is_none() {
break;
}
let finish_signal = finish.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|finish_rx| futures::future::Either::Left(finish_rx.recv()),
);
let update_signal = tip_updates.as_mut().map_or_else(
|| futures::future::Either::Right(futures::future::pending()),
|update_rx| futures::future::Either::Left(update_rx.recv()),
);
select! {
_ = finish_signal => {
break;
},
update = update_signal => match update {
Some(update) => {
observer.update_count.fetch_add(1, Ordering::SeqCst);
if update != final_target {
final_target = update;
reported_target = None;
}
}
None => {
tip_updates = None;
if finish.is_none() {
break;
}
}
},
}
}
Ok(Self { final_target })
}
}
#[test]
fn tuple_new_batches_queues_reads_concurrently() {
deterministic::Runner::default().start(|_context| async move {
let db1 = Arc::new(AsyncRwLock::new(TestDb));
let db2 = Arc::new(AsyncRwLock::new(TestDb));
let databases = (db1.clone(), db2.clone());
let writer1 = db1.write().await;
let writer2 = db2.write().await;
let new_batches =
<(Arc<AsyncRwLock<TestDb>>, Arc<AsyncRwLock<TestDb>>) as DatabaseSet<
deterministic::Context,
>>::new_batches(&databases);
pin_mut!(new_batches);
assert!(new_batches.as_mut().now_or_never().is_none());
drop(writer2);
{
let writer2_again = db2.write();
pin_mut!(writer2_again);
assert!(
writer2_again.as_mut().now_or_never().is_none(),
"tuple new_batches should queue reads for all databases concurrently"
);
}
drop(writer1);
let _ = new_batches.await;
});
}
#[test]
fn tuple_finalize_runs_databases_in_parallel() {
deterministic::Runner::default().start(|_context| async move {
let (started1_tx, started1_rx) = oneshot::channel();
let (started2_tx, started2_rx) = oneshot::channel();
let (release1_tx, release1_rx) = oneshot::channel();
let (release2_tx, release2_rx) = oneshot::channel();
let databases = (
Arc::new(AsyncRwLock::new(BlockingFinalizeDb::new(
started1_tx,
release1_rx,
))),
Arc::new(AsyncRwLock::new(BlockingFinalizeDb::new(
started2_tx,
release2_rx,
))),
);
let finalize = <(
Arc<AsyncRwLock<BlockingFinalizeDb>>,
Arc<AsyncRwLock<BlockingFinalizeDb>>,
) as DatabaseSet<deterministic::Context>>::finalize(
&databases,
(TestMerkleized, TestMerkleized),
);
pin_mut!(finalize);
assert!(finalize.as_mut().now_or_never().is_none());
let started1 = started1_rx;
let started2 = started2_rx;
pin_mut!(started1);
pin_mut!(started2);
assert!(matches!(started1.as_mut().now_or_never(), Some(Ok(()))));
assert!(
matches!(started2.as_mut().now_or_never(), Some(Ok(()))),
"tuple finalize should start all database finalizations concurrently"
);
let _ = release1_tx.send(());
let _ = release2_tx.send(());
finalize.await;
});
}
#[test]
#[should_panic(
expected = "database finalize failed (index 1, type commonware_glue::stateful::db::tests::FailingFinalizeDb)"
)]
fn tuple_finalize_panic_identifies_failing_database() {
deterministic::Runner::default().start(|_context| async move {
let databases = (
Arc::new(AsyncRwLock::new(TestDb)),
Arc::new(AsyncRwLock::new(FailingFinalizeDb)),
);
<(
Arc<AsyncRwLock<TestDb>>,
Arc<AsyncRwLock<FailingFinalizeDb>>,
) as DatabaseSet<deterministic::Context>>::finalize(
&databases,
(TestMerkleized, TestMerkleized),
)
.await;
});
}
type TestAnchor = Anchor<sha256::Digest>;
fn anchor(n: u64) -> TestAnchor {
mock_anchor(n, n as u8)
}
#[test]
fn single_tip_update_drain_keeps_highest_recorded_target() {
deterministic::Runner::default().start(|_context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
let (target_tx, mut target_rx) = mpsc::channel(4);
let (newer_update, newer_observed) = TipUpdate::with_observation(anchor(2), 2u64);
let (older_update, older_observed) = TipUpdate::with_observation(anchor(1), 1u64);
let _ = tip_tx.send(newer_update).await;
let _ = tip_tx.send(older_update).await;
let mut tip_updates = Some(tip_rx);
let mut current_anchor = anchor(0);
let mut current_target = 0u64;
assert!(
drain_single_tip_updates(
&mut tip_updates,
&target_tx,
&mut current_anchor,
&mut current_target,
)
.await
);
newer_observed
.await
.expect("newer update should be observed");
older_observed
.await
.expect("older update should also be observed");
assert_eq!(current_anchor, anchor(2));
assert_eq!(current_target, 2);
assert_eq!(target_rx.recv().await, Some(2));
assert!(matches!(
target_rx.try_recv(),
Err(mpsc::error::TryRecvError::Empty)
));
});
}
#[test]
fn single_tip_update_drain_advances_anchor_without_duplicate_target() {
deterministic::Runner::default().start(|_context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
let (target_tx, mut target_rx) = mpsc::channel(1);
let (update, observed) = TipUpdate::with_observation(anchor(3), 7u64);
let _ = tip_tx.send(update).await;
let mut tip_updates = Some(tip_rx);
let mut current_anchor = anchor(2);
let mut current_target = 7u64;
assert!(
drain_single_tip_updates(
&mut tip_updates,
&target_tx,
&mut current_anchor,
&mut current_target,
)
.await
);
observed.await.expect("update should be observed");
assert_eq!(current_anchor, anchor(3));
assert_eq!(current_target, 7);
assert!(matches!(
target_rx.try_recv(),
Err(mpsc::error::TryRecvError::Empty)
));
});
}
#[test]
fn single_state_sync_handles_closed_tip_updates_channel() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
let release = Arc::new(AtomicBool::new(false));
let release_for_sync = release.clone();
let sync = context.child("single_state_sync_closed_tip_updates").spawn(
move |context| async move {
<Arc<AsyncRwLock<SlowSyncDb>> as StateSyncSet<
deterministic::Context,
Arc<AtomicBool>,
sha256::Digest,
>>::sync(
context,
(),
release_for_sync,
anchor(0),
0,
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(1).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("single state sync should succeed")
},
);
drop(tip_tx);
context.sleep(Duration::from_millis(1)).await;
release.store(true, Ordering::SeqCst);
let (_database, converged_anchor) = sync.await.expect("sync task should complete");
assert_eq!(converged_anchor, anchor(0));
});
}
#[test]
fn single_state_sync_preserves_db_error_when_target_channel_closes() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
let _ = tip_tx.send(TipUpdate::new(anchor(1), 1u64)).await;
let result = <Arc<AsyncRwLock<FailingStateSyncDb>> as StateSyncSet<
deterministic::Context,
(),
sha256::Digest,
>>::sync(
context,
(),
(),
anchor(0),
0,
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(1).unwrap(),
max_retained_roots: 0,
},
)
.await;
assert!(matches!(result, Err(TestSyncError)));
});
}
#[test]
fn single_state_sync_ignores_backward_tip_updates() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
let release = Arc::new(AtomicBool::new(true));
let resolver = SlowSyncController {
release: release.clone(),
};
let sync = context
.child("single_state_sync_ignores_backward_tip_updates")
.spawn(move |context| async move {
<Arc<AsyncRwLock<ObservedSlowSyncDb>> as StateSyncSet<
deterministic::Context,
SlowSyncController,
sha256::Digest,
>>::sync(
context,
(),
resolver,
anchor(0),
0,
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(4).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("single state sync should succeed")
});
let _ = tip_tx.send(TipUpdate::new(anchor(2), 2)).await;
let _ = tip_tx.send(TipUpdate::new(anchor(1), 1)).await;
drop(tip_tx);
let (database, converged_anchor) = sync.await.expect("sync task should complete");
let final_target = database.read().await.final_target;
assert_eq!(
final_target, 2,
"single-db sync target must never move backward"
);
assert_eq!(
converged_anchor,
anchor(2),
"converged anchor must remain on the highest seen tip"
);
});
}
#[test]
fn single_state_sync_advances_anchor_without_duplicate_target_update() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
let release = Arc::new(AtomicBool::new(false));
let release_for_sync = release.clone();
let sync = context.child("single_state_sync_noop_target_update").spawn(
move |context| async move {
<Arc<AsyncRwLock<RejectDuplicateTargetSyncDb>> as StateSyncSet<
deterministic::Context,
Arc<AtomicBool>,
sha256::Digest,
>>::sync(
context,
(),
release_for_sync,
anchor(7),
7,
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(4).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("single state sync should succeed")
},
);
let (update, observed) = TipUpdate::with_observation(anchor(9), 7);
let _ = tip_tx.send(update).await;
observed
.await
.expect("single-db coordinator should record noop target update");
release.store(true, Ordering::SeqCst);
drop(tip_tx);
let (database, converged_anchor) = sync.await.expect("sync task should complete");
assert_eq!(database.read().await.final_target, 7);
assert_eq!(converged_anchor, anchor(9));
});
}
#[test]
fn single_state_sync_ignores_stale_reached_after_forwarded_tip() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
let sync =
context
.child("single_state_sync_stale_reached")
.spawn(move |context| async move {
<Arc<AsyncRwLock<StaleReachedSyncDb>> as StateSyncSet<
deterministic::Context,
(),
sha256::Digest,
>>::sync(
context,
(),
(),
anchor(0),
0,
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(4).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("single state sync should succeed")
});
let _ = tip_tx.send(TipUpdate::new(anchor(2), 2)).await;
let (database, converged_anchor) = sync.await.expect("sync task should complete");
let final_target = database.read().await.final_target;
assert_eq!(
final_target, 2,
"single-db sync must not finish on a stale reached target",
);
assert_eq!(
converged_anchor,
anchor(2),
"converged anchor must match the target the database reached",
);
});
}
#[test]
fn tuple_state_sync_converges_before_finish() {
deterministic::Runner::default().start(|context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
let slow_release = Arc::new(AtomicBool::new(false));
let fast_done = Arc::new(AtomicBool::new(false));
let slow_release_for_sync = slow_release.clone();
let fast_done_for_sync = fast_done.clone();
let sync = context
.child("tuple_state_sync")
.spawn(move |context| async move {
<(Arc<AsyncRwLock<SlowSyncDb>>, Arc<AsyncRwLock<FastSyncDb>>) as StateSyncSet<
deterministic::Context,
(Arc<AtomicBool>, Arc<AtomicBool>),
sha256::Digest,
>>::sync(
context,
((), ()),
(slow_release_for_sync, fast_done_for_sync),
anchor(0),
(0, 0),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(4).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("tuple state sync should succeed")
});
while !fast_done.load(Ordering::SeqCst) {
context.sleep(Duration::from_millis(1)).await;
}
let _ = tip_tx.send(TipUpdate::new(anchor(1), (1, 1))).await;
let _ = tip_tx.send(TipUpdate::new(anchor(2), (2, 2))).await;
slow_release.store(true, Ordering::SeqCst);
drop(tip_tx);
let (synced, converged_anchor) = sync.await.expect("sync task should complete");
let slow_target = synced.0.read().await.final_target;
let fast_target = synced.1.read().await.final_target;
assert_eq!(
slow_target, fast_target,
"all databases should finish on the same converged target set"
);
assert_eq!(
converged_anchor.height.get(),
slow_target,
"returned anchor height should match the converged generation"
);
});
}
#[test]
fn tuple_state_sync_ignores_backward_tip_updates() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(8).unwrap());
let slow_release = Arc::new(AtomicBool::new(false));
let fast_done = Arc::new(AtomicBool::new(false));
let slow_release_for_sync = slow_release.clone();
let fast_done_for_sync = fast_done.clone();
let sync = context
.child("tuple_state_sync_ignores_backward_tip_updates")
.spawn(move |context| async move {
<(Arc<AsyncRwLock<SlowSyncDb>>, Arc<AsyncRwLock<FastSyncDb>>) as StateSyncSet<
deterministic::Context,
(Arc<AtomicBool>, Arc<AtomicBool>),
sha256::Digest,
>>::sync(
context,
((), ()),
(slow_release_for_sync, fast_done_for_sync),
anchor(0),
(0, 0),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(8).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("tuple state sync should succeed")
});
while !fast_done.load(Ordering::SeqCst) {
context.sleep(Duration::from_millis(1)).await;
}
let _ = tip_tx.send(TipUpdate::new(anchor(2), (2, 2))).await;
let _ = tip_tx.send(TipUpdate::new(anchor(1), (1, 1))).await;
drop(tip_tx);
context.sleep(Duration::from_millis(1)).await;
slow_release.store(true, Ordering::SeqCst);
let (synced, converged_anchor) = sync.await.expect("sync task should complete");
let slow_target = synced.0.read().await.final_target;
let fast_target = synced.1.read().await.final_target;
assert_eq!(
slow_target, 2,
"slow database target must never move backward"
);
assert_eq!(
fast_target, 2,
"fast database target must never move backward"
);
assert_eq!(
converged_anchor,
anchor(2),
"converged anchor must remain on the highest seen tip"
);
});
}
#[test]
fn tuple_state_sync_rejects_database_target_mismatch() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
let fast_done = Arc::new(AtomicBool::new(false));
let result = <(
Arc<AsyncRwLock<MismatchedTargetSyncDb>>,
Arc<AsyncRwLock<FastSyncDb>>,
) as StateSyncSet<
deterministic::Context,
((), Arc<AtomicBool>),
sha256::Digest,
>>::sync(
context,
((), ()),
((), fast_done),
anchor(7),
(7, 7),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(1).unwrap(),
max_retained_roots: 0,
},
)
.await;
let err = match result {
Ok(_) => panic!("tuple state sync should reject a mismatched database target"),
Err(err) => err,
};
assert!(
err.contains("database targets do not match"),
"error should identify the target mismatch, got: {err}"
);
});
}
#[test]
fn tuple_state_sync_returns_db_error_instead_of_panicking_when_anchor_missing() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
let result = <(
Arc<AsyncRwLock<ImmediateStateSyncDb>>,
Arc<AsyncRwLock<FailingStateSyncDb>>,
) as StateSyncSet<deterministic::Context, ((), ()), sha256::Digest>>::sync(
context,
((), ()),
((), ()),
anchor(0),
(0, 0),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(1).unwrap(),
max_retained_roots: 0,
},
)
.await;
let err = match result {
Ok(_) => panic!("tuple state sync should return the database sync error"),
Err(err) => err,
};
assert!(
err.contains("state sync failed (index 1, db"),
"error should include failing database index: {err}"
);
assert!(
err.contains("FailingStateSyncDb"),
"error should include failing database type: {err}"
);
});
}
#[test]
fn tuple_state_sync_returns_db_error_when_other_database_waits_for_finish() {
deterministic::Runner::timed(Duration::from_secs(1)).start(|context| async move {
let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
let release = Arc::new(AtomicBool::new(true));
let result = <(
Arc<AsyncRwLock<SlowSyncDb>>,
Arc<AsyncRwLock<FailingStateSyncDb>>,
) as StateSyncSet<
deterministic::Context,
(Arc<AtomicBool>, ()),
sha256::Digest,
>>::sync(
context,
((), ()),
(release, ()),
anchor(0),
(0, 0),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(1).unwrap(),
max_retained_roots: 0,
},
)
.await;
let err = match result {
Ok(_) => panic!("tuple state sync should return the database sync error"),
Err(err) => err,
};
assert!(
err.contains("state sync failed (index 1, db"),
"error should include failing database index: {err}"
);
assert!(
err.contains("FailingStateSyncDb"),
"error should include failing database type: {err}"
);
});
}
#[test]
fn tuple_state_sync_preserves_original_failure_when_peer_finish_channel_closes() {
deterministic::Runner::timed(Duration::from_secs(1)).start(|context| async move {
let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
let result = <(
Arc<AsyncRwLock<FinishClosedSyncDb>>,
Arc<AsyncRwLock<FailingStateSyncDb>>,
) as StateSyncSet<deterministic::Context, ((), ()), sha256::Digest>>::sync(
context,
((), ()),
((), ()),
anchor(0),
(0, 0),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(1).unwrap(),
max_retained_roots: 0,
},
)
.await;
let err = match result {
Ok(_) => panic!("tuple state sync should return the database sync error"),
Err(err) => err,
};
assert!(
err.contains("state sync failed (index 1, db"),
"error should include failing database index, got: {err}",
);
assert!(
err.contains("FailingStateSyncDb"),
"error should include failing database type, got: {err}",
);
});
}
#[test]
fn coordinator_rejects_stale_reached_event_from_older_generation() {
let mut state = CoordinatorState::new(2, anchor(0), (0u64, 0u64));
state.record_tip_update(anchor(1), (1, 1));
match state.next_action() {
CoordinatorAction::Dispatch {
generation,
targets: (left, right),
} => {
assert_eq!(generation, 1, "coordinator should dispatch generation 1");
assert_eq!((left, right), (1, 1));
}
CoordinatorAction::Wait => panic!("coordinator should dispatch the newer tip"),
CoordinatorAction::Converged { anchor, .. } => {
panic!("coordinator converged too early at {anchor:?}")
}
}
state.record_reached(1, 0);
state.record_reached(0, 1);
match state.next_action() {
CoordinatorAction::Wait => {}
CoordinatorAction::Dispatch { targets, .. } => {
panic!(
"coordinator should wait for a fresh reached event, got dispatch {targets:?}"
)
}
CoordinatorAction::Converged { anchor, .. } => {
panic!("stale reached event must not allow convergence at {anchor:?}")
}
}
}
#[test]
fn coordinator_dispatches_pending_tip_before_converging() {
let mut state = CoordinatorState::new(2, anchor(0), (0u64, 0u64));
state.record_tip_update(anchor(1), (1, 1));
match state.next_action() {
CoordinatorAction::Dispatch {
generation,
targets: (left, right),
} => {
assert_eq!(generation, 1, "coordinator should dispatch generation 1");
assert_eq!((left, right), (1, 1));
}
CoordinatorAction::Wait => panic!("coordinator should dispatch the newer tip"),
CoordinatorAction::Converged { anchor, .. } => {
panic!("coordinator converged too early at {anchor:?}")
}
}
state.record_reached(0, 1);
state.record_reached(1, 1);
state.record_tip_update(anchor(2), (2, 2));
match state.next_action() {
CoordinatorAction::Dispatch {
generation,
targets: (left, right),
} => {
assert_eq!(generation, 2, "coordinator should advance to generation 2");
assert_eq!((left, right), (2, 2));
}
CoordinatorAction::Wait => panic!("coordinator should dispatch the pending tip"),
CoordinatorAction::Converged { anchor, .. } => {
panic!("coordinator should not converge with a pending tip: {anchor:?}")
}
}
}
#[test]
fn tuple_state_sync_stops_updates_after_reached_until_regroup() {
deterministic::Runner::default().start(|context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(32).unwrap());
let slow_release = Arc::new(AtomicBool::new(true));
let fast_ready = Arc::new(AtomicBool::new(false));
let fast_update_count = Arc::new(AtomicUsize::new(0));
let slow_resolver = SlowSyncController {
release: slow_release.clone(),
};
let fast_resolver = FastSyncObserver {
ready: fast_ready.clone(),
update_count: fast_update_count.clone(),
};
let sync = context.child("tuple_state_sync_algorithm").spawn(
move |context| async move {
<(
Arc<AsyncRwLock<ObservedSlowSyncDb>>,
Arc<AsyncRwLock<ObservedFastSyncDb>>,
) as StateSyncSet<
deterministic::Context,
(SlowSyncController, FastSyncObserver),
sha256::Digest,
>>::sync(
context,
((), ()),
(slow_resolver, fast_resolver),
anchor(0),
(0, 0),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(1).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("tuple state sync should succeed")
},
);
while !fast_ready.load(Ordering::SeqCst) {
context.sleep(Duration::from_millis(1)).await;
}
for target in 1..=16u64 {
let _ = tip_tx.send(TipUpdate::new(anchor(target), (target, target))).await;
}
drop(tip_tx);
let (synced, converged_anchor) = sync.await.expect("sync task should complete");
let slow_target = synced.0.read().await.final_target;
let fast_target = synced.1.read().await.final_target;
assert_eq!(
slow_target, fast_target,
"all databases should finish on the same converged target set"
);
assert_eq!(
converged_anchor.height.get(), slow_target,
"returned anchor height should match the converged generation"
);
assert_eq!(
fast_update_count.load(Ordering::SeqCst),
1,
"a reached database must not receive tip updates before regroup; only regroup retarget should be observed"
);
});
}
#[test]
fn tuple_state_sync_allows_noop_database_while_other_catches_up() {
deterministic::Runner::default().start(|context| async move {
let (tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
let slow_release = Arc::new(AtomicBool::new(false));
let fast_ready = Arc::new(AtomicBool::new(false));
let fast_update_count = Arc::new(AtomicUsize::new(0));
let target = 7u64;
let sync = context.child("tuple_state_sync_noop").spawn({
let slow_resolver = slow_release.clone();
let fast_resolver = FastSyncObserver {
ready: fast_ready.clone(),
update_count: fast_update_count.clone(),
};
move |context| async move {
<(
Arc<AsyncRwLock<SlowSyncDb>>,
Arc<AsyncRwLock<ObservedFastSyncDb>>,
) as StateSyncSet<
deterministic::Context,
(Arc<AtomicBool>, FastSyncObserver),
sha256::Digest,
>>::sync(
context,
((), ()),
(slow_resolver, fast_resolver),
anchor(target),
(target, target),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(1).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("tuple state sync should succeed")
}
});
while !fast_ready.load(Ordering::SeqCst) {
context.sleep(Duration::from_millis(1)).await;
}
drop(tip_tx);
slow_release.store(true, Ordering::SeqCst);
let (synced, converged_anchor) = sync.await.expect("sync task should complete");
let slow_target = synced.0.read().await.final_target;
let fast_target = synced.1.read().await.final_target;
assert_eq!(slow_target, target);
assert_eq!(fast_target, target);
assert_eq!(converged_anchor, anchor(target));
assert_eq!(
fast_update_count.load(Ordering::SeqCst),
0,
"already-at-target database should not receive tip updates"
);
});
}
#[test]
fn tuple_state_sync_regroup_completes_when_database_target_is_unchanged() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
let slow_release = Arc::new(AtomicBool::new(false));
let fast_ready = Arc::new(AtomicBool::new(false));
let fast_update_count = Arc::new(AtomicUsize::new(0));
let sync = context
.child("tuple_state_sync_regroup_unchanged_target")
.spawn({
let slow_resolver = slow_release.clone();
let fast_resolver = FastSyncObserver {
ready: fast_ready.clone(),
update_count: fast_update_count.clone(),
};
move |context| async move {
<(
Arc<AsyncRwLock<SlowSyncDb>>,
Arc<AsyncRwLock<DistinctObservedFastSyncDb>>,
) as StateSyncSet<
deterministic::Context,
(Arc<AtomicBool>, FastSyncObserver),
sha256::Digest,
>>::sync(
context,
((), ()),
(slow_resolver, fast_resolver),
anchor(0),
(0, 7),
tip_rx,
SyncEngineConfig {
fetch_batch_size: NonZeroU64::new(1).unwrap(),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NonZeroUsize::new(4).unwrap(),
max_retained_roots: 0,
},
)
.await
.expect("tuple state sync should succeed")
}
});
while !fast_ready.load(Ordering::SeqCst) {
context.sleep(Duration::from_millis(1)).await;
}
let _ = tip_tx.send(TipUpdate::new(anchor(9), (9, 7))).await;
context.sleep(Duration::from_millis(1)).await;
slow_release.store(true, Ordering::SeqCst);
drop(tip_tx);
let (synced, converged_anchor) = sync.await.expect("sync task should complete");
let slow_target = synced.0.read().await.final_target;
let fast_target = synced.1.read().await.final_target;
assert_eq!(slow_target, 9);
assert_eq!(fast_target, 7);
assert_eq!(converged_anchor, anchor(9));
assert_eq!(
fast_update_count.load(Ordering::SeqCst),
0,
"the unchanged-target database should not receive duplicate target updates",
);
});
}
#[derive(Default)]
struct AttachDb1;
#[derive(Default)]
struct AttachDb2;
#[derive(Clone)]
struct RecordingResolver {
id: &'static str,
log: Arc<commonware_utils::sync::Mutex<Vec<&'static str>>>,
}
impl RecordingResolver {
fn new(
id: &'static str,
log: Arc<commonware_utils::sync::Mutex<Vec<&'static str>>>,
) -> Self {
Self { id, log }
}
}
impl<DB: Send + Sync + 'static> AttachableResolver<DB> for RecordingResolver {
async fn attach_database(&self, _db: Arc<AsyncRwLock<DB>>) {
self.log.lock().push(self.id);
}
}
#[test]
fn single_db_attach_calls_single_resolver() {
deterministic::Runner::default().start(|_| async move {
let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
let resolver = RecordingResolver::new("db1", log.clone());
let db = Arc::new(AsyncRwLock::new(AttachDb1));
resolver.attach_databases(db).await;
assert_eq!(&*log.lock(), &["db1"]);
});
}
#[test]
fn tuple_attach_is_index_stable() {
deterministic::Runner::default().start(|_| async move {
let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
let resolvers = (
RecordingResolver::new("resolver_0", log.clone()),
RecordingResolver::new("resolver_1", log.clone()),
);
let databases = (
Arc::new(AsyncRwLock::new(AttachDb1)),
Arc::new(AsyncRwLock::new(AttachDb2)),
);
resolvers.attach_databases(databases).await;
assert_eq!(&*log.lock(), &["resolver_0", "resolver_1"]);
});
}
#[test]
fn heterogeneous_tuple_attach_compiles() {
deterministic::Runner::default().start(|_| async move {
let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
let resolvers = (
RecordingResolver::new("db1", log.clone()),
RecordingResolver::new("db2", log.clone()),
);
let databases = (
Arc::new(AsyncRwLock::new(AttachDb1)),
Arc::new(AsyncRwLock::new(AttachDb2)),
);
resolvers.attach_databases(databases).await;
assert_eq!(&*log.lock(), &["db1", "db2"]);
});
}
}