use crate::{
journal::contiguous::Contiguous,
merkle::{mmr, mmr::Location},
qmdb::{
self,
any::traits::DbAny,
operation::Operation as OperationTrait,
sync::{
self,
engine::{Config, NextStep},
resolver::{self, FetchResult, Resolver},
Engine, Target,
},
},
Persistable,
};
use commonware_codec::Encode;
use commonware_cryptography::sha256::Digest;
use commonware_macros::select;
use commonware_runtime::{deterministic, BufferPooler, Metrics, Runner as _};
use commonware_utils::{
channel::{mpsc, oneshot},
non_empty_range,
sync::AsyncRwLock,
NZU64,
};
use futures::{pin_mut, FutureExt};
use rand::RngCore as _;
use std::{num::NonZeroU64, sync::Arc};
/// Database type produced by a [`SyncTestHarness`].
pub(crate) type DbOf<H> = <H as SyncTestHarness>::Db;
/// Operation type stored in the harness database's log.
pub(crate) type OpOf<H> = <DbOf<H> as qmdb::sync::Database>::Op;
/// Configuration type used to open the harness database.
pub(crate) type ConfigOf<H> = <DbOf<H> as qmdb::sync::Database>::Config;
/// Journal (operation log) type backing the harness database.
pub(crate) type JournalOf<H> = <DbOf<H> as qmdb::sync::Database>::Journal;
/// Something that can permanently tear down the storage it owns.
pub(crate) trait Destructible {
    /// Consumes `self` and destroys its persistent state.
    fn destroy(
        self,
    ) -> impl std::future::Future<Output = Result<(), qmdb::Error<crate::mmr::Family>>> + Send;
}
impl Destructible for crate::mmr::journaled::Mmr<deterministic::Context, Digest> {
async fn destroy(self) -> Result<(), qmdb::Error<crate::mmr::Family>> {
self.destroy().await.map_err(qmdb::Error::Merkle)
}
}
/// A syncable database that can be decomposed into its MMR and journal so
/// tests can inspect (and clean up) the individual log components.
pub(crate) trait FromSyncTestable: qmdb::sync::Database {
    /// The MMR component type; destructible so tests can tear it down.
    type Mmr: Destructible + Send;
    /// Splits the database into its MMR and operation journal.
    fn into_log_components(self) -> (Self::Mmr, Self::Journal);
    /// Returns the digests pinned at `loc`.
    // NOTE(review): presumably the MMR nodes needed to anchor a database
    // pruned to `loc` — confirm against the implementations.
    fn pinned_nodes_at(
        &self,
        loc: Location,
    ) -> impl std::future::Future<Output = Vec<Self::Digest>> + Send;
}
/// Per-database-flavor adapter that lets the generic sync tests in this module
/// build, populate, and inspect a concrete database implementation.
pub(crate) trait SyncTestHarness: Sized + 'static {
    /// The concrete database under test.
    type Db: qmdb::sync::Database<Context = deterministic::Context, Digest = Digest, Config: Clone>
        + DbAny<mmr::Family, Key = Digest, Digest = Digest>;
    /// Root digest that clients should sync toward for `db`'s current state.
    fn sync_target_root(db: &Self::Db) -> Digest;
    /// Builds a database config; `suffix` distinguishes independent database
    /// instances (callers pass a freshly drawn random number as the suffix).
    fn config(suffix: &str, pooler: &impl BufferPooler) -> ConfigOf<Self>;
    /// Creates `n` test operations.
    fn create_ops(n: usize) -> Vec<OpOf<Self>>;
    /// Creates `n` test operations derived from `seed`.
    fn create_ops_seeded(n: usize, seed: u64) -> Vec<OpOf<Self>>;
    /// Opens a database with a fresh default config.
    fn init_db(ctx: deterministic::Context) -> impl std::future::Future<Output = Self::Db> + Send;
    /// Opens a database using the provided `config` (reuses any existing
    /// on-disk state addressed by that config).
    fn init_db_with_config(
        ctx: deterministic::Context,
        config: ConfigOf<Self>,
    ) -> impl std::future::Future<Output = Self::Db> + Send;
    /// Applies `ops` to `db` and returns the updated database.
    fn apply_ops(
        db: Self::Db,
        ops: Vec<OpOf<Self>>,
    ) -> impl std::future::Future<Output = Self::Db> + Send;
}
/// Feeding the engine empty operation batches must be a harmless no-op.
pub(crate) fn test_sync_empty_operations_no_panic<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        // Serve fetches from a fresh (empty) target database.
        let target = H::init_db(ctx.with_label("target")).await;
        let engine_config = Config {
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            context: ctx.with_label("client"),
            target: Target {
                root: Digest::from([1u8; 32]),
                range: non_empty_range!(Location::new(0), Location::new(10)),
            },
            resolver: Arc::new(target),
            fetch_batch_size: NZU64!(10),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let mut engine: Engine<H::Db, _> = Engine::new(engine_config).await.unwrap();
        // Storing empty batches at arbitrary locations and applying them must
        // neither panic nor error.
        engine.store_operations(Location::new(0), Vec::new());
        engine.store_operations(Location::new(5), Vec::new());
        engine.apply_operations().await.unwrap();
    });
}
/// A resolver that fails every fetch must surface as a sync error.
pub(crate) fn test_sync_resolver_fails<H: SyncTestHarness>()
where
    resolver::tests::FailResolver<OpOf<H>, Digest>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        let failing = resolver::tests::FailResolver::<OpOf<H>, Digest>::new();
        let cfg = Config {
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            context: ctx.with_label("client"),
            target: Target {
                root: Digest::from([0; 32]),
                range: non_empty_range!(Location::new(0), Location::new(5)),
            },
            resolver: failing,
            fetch_batch_size: NZU64!(2),
            apply_batch_size: 2,
            max_outstanding_requests: 2,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let outcome: Result<H::Db, _> = sync::sync(cfg).await;
        assert!(outcome.is_err());
    });
}
/// End-to-end sync from a pruned target, plus a drop-and-reopen persistence
/// check of the synced database.
pub(crate) fn test_sync<H: SyncTestHarness>(target_db_ops: usize, fetch_batch_size: NonZeroU64)
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        // Build and prune the target database the client will sync from.
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(target_db_ops)).await;
        let floor = target.inactivity_floor_loc().await;
        target.prune(floor).await.unwrap();

        // Snapshot everything the client needs, post-prune.
        let upper_bound = target.bounds().await.end;
        let target_floor = target.inactivity_floor_loc().await;
        let sync_root = H::sync_target_root(&target);
        let verification_root = target.root();
        let lower_bound = target.inactivity_floor_loc().await;

        let db_config = H::config(&ctx.next_u64().to_string(), &ctx);
        let client_ctx = ctx.with_label("client");
        let target = Arc::new(target);
        let synced: H::Db = sync::sync(Config {
            db_config: db_config.clone(),
            fetch_batch_size,
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: client_ctx.clone(),
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        })
        .await
        .unwrap();

        // The synced database must mirror the target's state.
        assert_eq!(synced.bounds().await.end, upper_bound);
        assert_eq!(synced.inactivity_floor_loc().await, target_floor);
        assert_eq!(synced.root(), verification_root);

        // Reopening with the same config must recover identical state.
        let saved_root = synced.root();
        let saved_count = synced.bounds().await.end;
        let saved_floor = synced.inactivity_floor_loc().await;
        drop(synced);
        let reopened = H::init_db_with_config(client_ctx.with_label("reopened"), db_config).await;
        assert_eq!(reopened.bounds().await.end, saved_count);
        assert_eq!(reopened.inactivity_floor_loc().await, saved_floor);
        assert_eq!(reopened.root(), saved_root);
        reopened.destroy().await.unwrap();

        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// Syncing a strict prefix of the target: operations applied to the target
/// after the sync range was captured must not appear in the synced database.
pub(crate) fn test_sync_subset_of_target_database<H: SyncTestHarness>(target_db_ops: usize)
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode + Clone + OperationTrait<mmr::Family, Key = Digest>,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        // Apply all but the last operation, then snapshot the sync target.
        let ops = H::create_ops(target_db_ops);
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, ops[0..target_db_ops - 1].to_vec()).await;
        let upper_bound = target.bounds().await.end;
        let sync_root = H::sync_target_root(&target);
        let verification_root = target.root();
        let lower_bound = target.inactivity_floor_loc().await;

        // The final op lands only after the snapshot, so the client syncs a
        // strict prefix of the target database.
        let last_op = ops[target_db_ops - 1].clone();
        let last_key = last_op.key().cloned();
        target = H::apply_ops(target, vec![last_op]).await;

        let cfg = Config {
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            context: ctx.with_label("client"),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            resolver: Arc::new(target),
            fetch_batch_size: NZU64!(10),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let synced: H::Db = sync::sync(cfg).await.unwrap();
        assert_eq!(synced.inactivity_floor_loc().await, lower_bound);
        assert_eq!(synced.bounds().await.end, upper_bound);
        assert_eq!(synced.root(), verification_root);
        // The post-snapshot operation's key must be absent.
        if let Some(key) = last_key {
            assert!(synced.get(&key).await.unwrap().is_none());
        }
        synced.destroy().await.unwrap();
    });
}
/// Sync reusing an on-disk client database that shares a prefix with the
/// target: the prefix should be reused and only the tail fetched.
pub(crate) fn test_sync_use_existing_db_partial_match<H: SyncTestHarness>(original_ops: usize)
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode + Clone + OperationTrait<mmr::Family, Key = Digest>,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        // Seed both the target and a soon-to-be-reopened client db with the
        // same operations so sync can reuse the persisted state.
        let shared_ops = H::create_ops(original_ops);
        let mut target = H::init_db(ctx.with_label("target")).await;
        let client_db_config = H::config(&ctx.next_u64().to_string(), &ctx);
        let client_ctx = ctx.with_label("client");
        let mut existing: H::Db =
            H::init_db_with_config(client_ctx.clone(), client_db_config.clone()).await;
        target = H::apply_ops(target, shared_ops.clone()).await;
        existing = H::apply_ops(existing, shared_ops.clone()).await;
        drop(existing);

        // Advance the target past the shared prefix.
        let extra_ops = H::create_ops_seeded(1, 1);
        target = H::apply_ops(target, extra_ops.clone()).await;
        let sync_root = H::sync_target_root(&target);
        let verification_root = target.root();
        let lower_bound = target.inactivity_floor_loc().await;
        let upper_bound = target.bounds().await.end;
        let target = Arc::new(target);

        let synced: H::Db = sync::sync(Config {
            db_config: client_db_config,
            context: client_ctx.with_label("sync"),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            resolver: target.clone(),
            fetch_batch_size: NZU64!(10),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        })
        .await
        .unwrap();

        let bounds = synced.bounds().await;
        assert_eq!(bounds.end, upper_bound);
        assert_eq!(
            synced.inactivity_floor_loc().await,
            target.inactivity_floor_loc().await
        );
        assert_eq!(synced.inactivity_floor_loc().await, lower_bound);
        assert_eq!(bounds.end, target.bounds().await.end);
        assert_eq!(synced.root(), verification_root);

        // Key presence must agree between synced and target databases, for
        // both the shared prefix and the extra tail operation.
        for op in shared_ops.iter().chain(extra_ops.iter()) {
            if let Some(key) = op.key() {
                let in_target = target.get(key).await.unwrap();
                let in_synced = synced.get(key).await.unwrap();
                assert_eq!(in_target.is_some(), in_synced.is_some());
            }
        }
        synced.destroy().await.unwrap();
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// Sync when the local database already matches the target exactly: it must
/// complete without fetching anything (the resolver fails every request).
pub(crate) fn test_sync_use_existing_db_exact_match<H: SyncTestHarness>(num_ops: usize)
where
    resolver::tests::FailResolver<OpOf<H>, Digest>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode + Clone + OperationTrait<mmr::Family, Key = Digest>,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        // Bring a local database to exactly the target's state.
        let ops = H::create_ops(num_ops);
        let target_cfg = H::config(&ctx.next_u64().to_string(), &ctx);
        let mut target = H::init_db_with_config(ctx.with_label("target"), target_cfg).await;
        let sync_cfg = H::config(&ctx.next_u64().to_string(), &ctx);
        let client_ctx = ctx.with_label("client");
        let mut local = H::init_db_with_config(client_ctx.clone(), sync_cfg.clone()).await;
        target = H::apply_ops(target, ops.clone()).await;
        local = H::apply_ops(local, ops.clone()).await;
        let target_floor = target.inactivity_floor_loc().await;
        target.prune(target_floor).await.unwrap();
        let local_floor = local.inactivity_floor_loc().await;
        local.prune(local_floor).await.unwrap();
        // Flush the local db to disk before dropping so sync can reopen it.
        local.sync().await.unwrap();
        drop(local);

        let sync_root = H::sync_target_root(&target);
        let verification_root = target.root();
        let lower_bound = target.inactivity_floor_loc().await;
        let upper_bound = target.bounds().await.end;

        let synced: H::Db = sync::sync(Config {
            db_config: sync_cfg,
            context: client_ctx.with_label("sync"),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            // Any fetch attempt would fail, proving none were needed.
            resolver: resolver::tests::FailResolver::<OpOf<H>, Digest>::new(),
            fetch_batch_size: NZU64!(10),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        })
        .await
        .unwrap();

        let bounds = synced.bounds().await;
        assert_eq!(bounds.end, upper_bound);
        assert_eq!(bounds.end, target.bounds().await.end);
        assert_eq!(synced.inactivity_floor_loc().await, lower_bound);
        assert_eq!(synced.root(), verification_root);
        for op in &ops {
            if let Some(key) = op.key() {
                let in_target = target.get(key).await.unwrap();
                let in_synced = synced.get(key).await.unwrap();
                assert_eq!(in_target.is_some(), in_synced.is_some());
            }
        }
        synced.destroy().await.unwrap();
        target.destroy().await.unwrap();
    });
}
/// A target update whose lower bound moves backward must be rejected with
/// `SyncTargetMovedBackward` on the engine's next step.
pub(crate) fn test_target_update_lower_bound_decrease<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(50)).await;
        let lower = target.inactivity_floor_loc().await;
        let upper = target.bounds().await.end;
        let root = H::sync_target_root(&target);
        let (update_tx, update_rx) = mpsc::channel(1);
        let target = Arc::new(target);
        let engine: Engine<H::Db, _> = Engine::new(Config {
            context: ctx.with_label("client"),
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root,
                range: non_empty_range!(lower, upper),
            },
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 10,
            update_rx: Some(update_rx),
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 1,
        })
        .await
        .unwrap();
        // Lower bound regresses (upper bound grows, which alone would be ok).
        update_tx
            .send(Target {
                root,
                range: non_empty_range!(
                    lower.checked_sub(1).unwrap(),
                    upper.checked_add(1).unwrap()
                ),
            })
            .await
            .unwrap();
        assert!(matches!(
            engine.step().await,
            Err(sync::Error::Engine(
                sync::EngineError::SyncTargetMovedBackward { .. }
            ))
        ));
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// A target update whose upper bound shrinks must be rejected with
/// `SyncTargetMovedBackward` on the engine's next step.
pub(crate) fn test_target_update_upper_bound_decrease<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(50)).await;
        let lower = target.inactivity_floor_loc().await;
        let upper = target.bounds().await.end;
        let root = H::sync_target_root(&target);
        let (update_tx, update_rx) = mpsc::channel(1);
        let target = Arc::new(target);
        let engine: Engine<H::Db, _> = Engine::new(Config {
            context: ctx.with_label("client"),
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root,
                range: non_empty_range!(lower, upper),
            },
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 10,
            update_rx: Some(update_rx),
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 1,
        })
        .await
        .unwrap();
        // Upper bound regresses: a backward-moving target is invalid.
        update_tx
            .send(Target {
                root,
                range: non_empty_range!(lower, upper.checked_sub(1).unwrap()),
            })
            .await
            .unwrap();
        assert!(matches!(
            engine.step().await,
            Err(sync::Error::Engine(
                sync::EngineError::SyncTargetMovedBackward { .. }
            ))
        ));
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// A target update that grows both bounds (operations appended to the target
/// after sync was configured) must be picked up by the engine, and sync must
/// complete against the larger target.
///
/// Fix: the original wrapped most of the body in a block whose result was
/// bound to an outer `new_verification_root` and then immediately discarded
/// with `let _ = new_verification_root;` — dead code. The block is flattened
/// and the dead binding removed; behavior is unchanged.
pub(crate) fn test_target_update_bounds_increase<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode + Clone,
    JournalOf<H>: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Build the target and capture its initial (smaller) bounds.
        let mut target_db = H::init_db(context.with_label("target")).await;
        target_db = H::apply_ops(target_db, H::create_ops(100)).await;
        let initial_lower_bound = target_db.inactivity_floor_loc().await;
        let initial_upper_bound = target_db.bounds().await.end;
        let initial_root = H::sync_target_root(&target_db);

        // Extend the target; sync should finish against these new bounds.
        target_db = H::apply_ops(target_db, H::create_ops_seeded(1, 1)).await;
        let new_lower_bound = target_db.inactivity_floor_loc().await;
        let new_upper_bound = target_db.bounds().await.end;
        let new_sync_root = H::sync_target_root(&target_db);
        let new_verification_root = target_db.root();

        let (update_sender, update_receiver) = mpsc::channel(1);
        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.with_label("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            // Fetch one op at a time so the update is observed mid-sync.
            fetch_batch_size: NZU64!(1),
            target: Target {
                root: initial_root,
                range: non_empty_range!(initial_lower_bound, initial_upper_bound),
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: Some(update_receiver),
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 1,
        };
        // Queue the grown target before starting sync.
        update_sender
            .send(Target {
                root: new_sync_root,
                range: non_empty_range!(new_lower_bound, new_upper_bound),
            })
            .await
            .unwrap();
        let synced_db: H::Db = sync::sync(config).await.unwrap();
        assert_eq!(synced_db.root(), new_verification_root);
        assert_eq!(synced_db.bounds().await.end, new_upper_bound);
        assert_eq!(synced_db.inactivity_floor_loc().await, new_lower_bound);
        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// A target update sent after sync has completed must not disturb the
/// finished database (the send result is deliberately ignored — the engine
/// may have dropped its receiver).
pub(crate) fn test_target_update_on_done_client<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(10)).await;
        let lower = target.inactivity_floor_loc().await;
        let upper = target.bounds().await.end;
        let sync_root = H::sync_target_root(&target);
        let verification_root = target.root();
        let (update_tx, update_rx) = mpsc::channel(1);
        let target = Arc::new(target);
        let synced: H::Db = sync::sync(Config {
            context: ctx.with_label("client"),
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            fetch_batch_size: NZU64!(20),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower, upper),
            },
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 10,
            update_rx: Some(update_rx),
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 1,
        })
        .await
        .unwrap();
        // Late update: ignored on purpose, must have no effect.
        let _ = update_tx
            .send(Target {
                root: Digest::from([2u8; 32]),
                range: non_empty_range!(lower + 1, upper + 1),
            })
            .await;
        assert_eq!(synced.root(), verification_root);
        assert_eq!(synced.bounds().await.end, upper);
        assert_eq!(synced.inactivity_floor_loc().await, lower);
        synced.destroy().await.unwrap();
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// Verifies that when a `finish_rx` channel is supplied, reaching the target
/// does not complete the sync: the engine reports each reached target over
/// `reached_target_tx`, keeps accepting target updates while parked, and only
/// resolves the sync future after `()` arrives on the finish channel.
pub(crate) fn test_sync_waits_for_explicit_finish<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Target database with an initial batch of operations.
        let mut target_db = H::init_db(context.with_label("target")).await;
        target_db = H::apply_ops(target_db, H::create_ops(10)).await;
        let initial_target = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(
                target_db.inactivity_floor_loc().await,
                target_db.bounds().await.end
            ),
        };
        // Grow the target so an updated target can be announced mid-test.
        target_db = H::apply_ops(target_db, H::create_ops_seeded(5, 1)).await;
        let updated_lower_bound = target_db.inactivity_floor_loc().await;
        let updated_upper_bound = target_db.bounds().await.end;
        let updated_target = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(updated_lower_bound, updated_upper_bound),
        };
        let updated_verification_root = target_db.root();
        let (update_sender, update_receiver) = mpsc::channel(1);
        let (finish_sender, finish_receiver) = mpsc::channel(1);
        let (reached_sender, mut reached_receiver) = mpsc::channel(1);
        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.with_label("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(10),
            target: initial_target.clone(),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: Some(update_receiver),
            finish_rx: Some(finish_receiver),
            reached_target_tx: Some(reached_sender),
            max_retained_roots: 0,
        };
        let sync_handle = sync::sync(config);
        pin_mut!(sync_handle);
        // The engine must announce the initial target before sync resolves.
        select! {
            _ = sync_handle.as_mut() => {
                panic!("sync completed before explicit finish signal");
            },
            reached = reached_receiver.recv() => {
                let reached = reached.expect("engine should report reached-target before finish");
                assert_eq!(reached, initial_target);
            },
        }
        // Still parked: a one-shot poll must not complete the future.
        assert!(
            sync_handle.as_mut().now_or_never().is_none(),
            "sync must wait for explicit finish signal after reaching target"
        );
        // While parked, the engine must accept and chase a new target.
        update_sender
            .send(updated_target.clone())
            .await
            .expect("target update channel should be open");
        select! {
            _ = sync_handle.as_mut() => {
                panic!("sync completed before explicit finish signal for updated target");
            },
            reached = reached_receiver.recv() => {
                let reached = reached.expect("engine should report updated target before finish");
                assert_eq!(reached, updated_target);
            },
        }
        assert!(
            sync_handle.as_mut().now_or_never().is_none(),
            "sync must still wait for explicit finish signal after updated target is reached"
        );
        // Only the explicit finish signal releases the sync future.
        finish_sender
            .send(())
            .await
            .expect("finish signal channel should be open");
        let synced_db: H::Db = sync_handle
            .await
            .expect("sync should succeed after finish signal");
        assert_eq!(synced_db.root(), updated_verification_root);
        assert_eq!(synced_db.bounds().await.end, updated_upper_bound);
        assert_eq!(synced_db.inactivity_floor_loc().await, updated_lower_bound);
        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// A finish signal delivered before sync even starts must let the engine
/// complete as soon as it reaches the target.
pub(crate) fn test_sync_handles_early_finish_signal<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(30)).await;
        let lower = target.inactivity_floor_loc().await;
        let upper = target.bounds().await.end;
        let sync_target = Target {
            root: H::sync_target_root(&target),
            range: non_empty_range!(lower, upper),
        };
        let verification_root = target.root();
        let (finish_tx, finish_rx) = mpsc::channel(1);
        let (reached_tx, mut reached_rx) = mpsc::channel(1);
        // Send the finish signal ahead of time.
        finish_tx
            .send(())
            .await
            .expect("finish signal channel should be open");
        let target = Arc::new(target);
        let synced: H::Db = sync::sync(Config {
            context: ctx.with_label("client"),
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            fetch_batch_size: NZU64!(3),
            target: sync_target.clone(),
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: Some(finish_rx),
            reached_target_tx: Some(reached_tx),
            max_retained_roots: 1,
        })
        .await
        .expect("sync should complete after early finish signal");
        // The reached-target notification must still have been emitted.
        let reached = reached_rx
            .recv()
            .await
            .expect("engine should report reached-target");
        assert_eq!(reached, sync_target);
        assert_eq!(synced.root(), verification_root);
        assert_eq!(synced.bounds().await.end, upper);
        assert_eq!(synced.inactivity_floor_loc().await, lower);
        synced.destroy().await.unwrap();
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// If the finish-signal sender is dropped, the engine must fail with
/// `FinishChannelClosed` rather than completing or hanging.
pub(crate) fn test_sync_fails_when_finish_sender_dropped<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(10)).await;
        let lower = target.inactivity_floor_loc().await;
        let upper = target.bounds().await.end;
        // Close the finish channel before sync starts.
        let (finish_tx, finish_rx) = mpsc::channel(1);
        drop(finish_tx);
        let target = Arc::new(target);
        let outcome: Result<H::Db, _> = sync::sync(Config {
            context: ctx.with_label("client"),
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: H::sync_target_root(&target),
                range: non_empty_range!(lower, upper),
            },
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: Some(finish_rx),
            reached_target_tx: None,
            max_retained_roots: 1,
        })
        .await;
        assert!(matches!(
            outcome,
            Err(sync::Error::Engine(sync::EngineError::FinishChannelClosed))
        ));
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// Dropping the reached-target receiver must not break sync: the engine
/// tolerates the dead notification channel and still completes.
pub(crate) fn test_sync_allows_dropped_reached_target_receiver<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(10)).await;
        let lower = target.inactivity_floor_loc().await;
        let upper = target.bounds().await.end;
        let verification_root = target.root();
        // Install a sender whose receiver is already gone.
        let (reached_tx, reached_rx) = mpsc::channel(1);
        drop(reached_rx);
        let target = Arc::new(target);
        let synced: H::Db = sync::sync(Config {
            context: ctx.with_label("client"),
            db_config: H::config(&ctx.next_u64().to_string(), &ctx),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: H::sync_target_root(&target),
                range: non_empty_range!(lower, upper),
            },
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: Some(reached_tx),
            max_retained_roots: 1,
        })
        .await
        .expect("sync should succeed when reached-target receiver is dropped");
        assert_eq!(synced.root(), verification_root);
        assert_eq!(synced.bounds().await.end, upper);
        assert_eq!(synced.inactivity_floor_loc().await, lower);
        synced.destroy().await.unwrap();
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// Exercises a target update delivered while the engine is mid-sync: the
/// client is stepped until its journal has grown past the initial lower
/// bound, the target database is then extended behind an `AsyncRwLock`, and
/// the new target is announced; the completed sync must match the updated
/// target, not the original one.
pub(crate) fn test_target_update_during_sync<H: SyncTestHarness>(
    initial_ops: usize,
    additional_ops: usize,
) where
    Arc<AsyncRwLock<Option<DbOf<H>>>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode + Clone,
    JournalOf<H>: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.with_label("target")).await;
        let target_ops = H::create_ops(initial_ops);
        target_db = H::apply_ops(target_db, target_ops).await;
        let initial_lower_bound = target_db.inactivity_floor_loc().await;
        let initial_upper_bound = target_db.bounds().await.end;
        let initial_sync_root = H::sync_target_root(&target_db);
        // The resolver shares the target through a lock so this test can take
        // it out, mutate it, and put it back while the engine holds a handle.
        let target_db = Arc::new(AsyncRwLock::new(Some(target_db)));
        let (update_sender, update_receiver) = mpsc::channel(1);
        let client = {
            let config = Config {
                context: context.with_label("client"),
                db_config: H::config(&context.next_u64().to_string(), &context),
                target: Target {
                    root: initial_sync_root,
                    range: non_empty_range!(initial_lower_bound, initial_upper_bound),
                },
                resolver: target_db.clone(),
                // One op per fetch so sync is still in flight when the target
                // is updated below.
                fetch_batch_size: NZU64!(1),
                max_outstanding_requests: 10,
                apply_batch_size: 1024,
                update_rx: Some(update_receiver),
                finish_rx: None,
                reached_target_tx: None,
                max_retained_roots: 1,
            };
            let mut client: Engine<H::Db, _> = Engine::new(config).await.unwrap();
            // Step the engine until its journal has grown past the initial
            // lower bound, guaranteeing the update below arrives mid-sync.
            loop {
                client = match client.step().await.unwrap() {
                    NextStep::Continue(new_client) => new_client,
                    NextStep::Complete(_) => panic!("client should not be complete"),
                };
                let log_size = client.journal().size().await;
                if log_size > initial_lower_bound {
                    break client;
                }
            }
        };
        let additional_ops_data = H::create_ops_seeded(additional_ops, 1);
        let new_verification_root = {
            // Temporarily take the database out of the lock to extend it.
            let mut db_guard = target_db.write().await;
            let db = db_guard.take().unwrap();
            let db = H::apply_ops(db, additional_ops_data).await;
            let new_lower_bound = db.inactivity_floor_loc().await;
            let new_upper_bound = db.bounds().await.end;
            let new_sync_root = H::sync_target_root(&db);
            let new_verification_root = db.root();
            *db_guard = Some(db);
            // Announce the extended target to the in-flight engine.
            update_sender
                .send(Target {
                    root: new_sync_root,
                    range: non_empty_range!(new_lower_bound, new_upper_bound),
                })
                .await
                .unwrap();
            new_verification_root
        };
        // Drive sync to completion; it must land on the updated target.
        let synced_db = client.sync().await.unwrap();
        assert_eq!(synced_db.root(), new_verification_root);
        let target_db = Arc::try_unwrap(target_db).map_or_else(
            |_| panic!("Failed to unwrap Arc - still has references"),
            |rw_lock| rw_lock.into_inner().expect("db should be present"),
        );
        {
            let synced_bounds = synced_db.bounds().await;
            let target_bounds = target_db.bounds().await;
            assert_eq!(synced_bounds.end, target_bounds.end);
            assert_eq!(
                synced_db.inactivity_floor_loc().await,
                target_db.inactivity_floor_loc().await
            );
            assert_eq!(synced_db.root(), target_db.root());
        }
        synced_db.destroy().await.unwrap();
        target_db.destroy().await.unwrap();
    });
}
/// The state of a synced database must survive being dropped and reopened
/// with the same configuration.
pub(crate) fn test_sync_database_persistence<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode + Clone,
    JournalOf<H>: Contiguous,
{
    deterministic::Runner::default().start(|mut ctx| async move {
        let mut target = H::init_db(ctx.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(10)).await;
        let sync_root = H::sync_target_root(&target);
        let verification_root = target.root();
        let lower = target.inactivity_floor_loc().await;
        let upper = target.bounds().await.end;
        let db_config = H::config(&ctx.next_u64().to_string(), &ctx);
        let client_ctx = ctx.with_label("client");
        let target = Arc::new(target);
        let synced: H::Db = sync::sync(Config {
            db_config: db_config.clone(),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower, upper),
            },
            context: client_ctx.clone(),
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        })
        .await
        .unwrap();
        assert_eq!(synced.root(), verification_root);
        // Record the synced state, drop, and reopen with the same config.
        let saved_root = synced.root();
        let saved_count = synced.bounds().await.end;
        let saved_floor = synced.inactivity_floor_loc().await;
        drop(synced);
        let reopened = H::init_db_with_config(client_ctx.with_label("reopened"), db_config).await;
        assert_eq!(reopened.root(), saved_root);
        assert_eq!(reopened.bounds().await.end, saved_count);
        assert_eq!(reopened.inactivity_floor_loc().await, saved_floor);
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
        reopened.destroy().await.unwrap();
    });
}
/// Ensures a freshly synced database remains fully usable: applying new
/// operations afterwards must change the root and grow the bounds.
pub(crate) fn test_sync_post_sync_usability<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target = H::init_db(context.with_label("target")).await;
        target = H::apply_ops(target, H::create_ops(50)).await;

        let root = H::sync_target_root(&target);
        let floor = target.inactivity_floor_loc().await;
        let end = target.bounds().await.end;
        let target = Arc::new(target);

        let synced_db: H::Db = sync::sync(Config {
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(100),
            target: Target {
                root,
                range: non_empty_range!(floor, end),
            },
            context: context.with_label("client"),
            resolver: target.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        })
        .await
        .unwrap();

        // Apply more operations post-sync; the database state must advance.
        let root_after_sync = synced_db.root();
        let synced_db = H::apply_ops(synced_db, H::create_ops_seeded(10, 1)).await;
        assert_ne!(synced_db.root(), root_after_sync);
        assert!(synced_db.bounds().await.end > end);

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// Builds a populated database, decomposes it into its log components, and
/// reconstructs it via `from_sync_result` with bounds that exactly match the
/// existing journal contents. The rebuilt database must report the same
/// bounds, inactivity floor, and root as the original.
pub(crate) fn test_from_sync_result_nonempty_to_nonempty_exact_match<H: SyncTestHarness>()
where
    DbOf<H>: FromSyncTestable,
    OpOf<H>: Encode + Clone,
    JournalOf<H>: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let db_config = H::config(&context.next_u64().to_string(), &context);
        let mut db = H::init_db_with_config(context.with_label("source"), db_config.clone()).await;
        let ops = H::create_ops(100);
        db = H::apply_ops(db, ops).await;

        // Capture the expected post-reconstruction state.
        let sync_lower_bound = db.inactivity_floor_loc().await;
        let bounds = db.bounds().await;
        let sync_upper_bound = bounds.end;
        let target_db_op_count = bounds.end;
        let target_db_inactivity_floor_loc = db.inactivity_floor_loc().await;
        let target_root = db.root();
        let pinned_nodes = db.pinned_nodes_at(db.inactivity_floor_loc().await).await;

        // Keep the MMR handle so its storage can be destroyed below
        // (previously it was silently dropped, leaking its partitions —
        // the sibling tests all destroy it).
        let (mmr, journal) = db.into_log_components();
        let sync_db: DbOf<H> = <DbOf<H> as qmdb::sync::Database>::from_sync_result(
            context.with_label("synced"),
            db_config,
            journal,
            Some(pinned_nodes),
            sync_lower_bound..sync_upper_bound,
            1024,
        )
        .await
        .unwrap();

        assert_eq!(sync_db.bounds().await.end, target_db_op_count);
        assert_eq!(
            sync_db.inactivity_floor_loc().await,
            target_db_inactivity_floor_loc
        );
        assert_eq!(sync_db.inactivity_floor_loc().await, sync_lower_bound);
        // Consistent with the partial-match and empty-to-nonempty variants:
        // reconstruction from the same log must reproduce the original root.
        assert_eq!(sync_db.root(), target_root);

        sync_db.destroy().await.unwrap();
        mmr.destroy().await.unwrap();
    });
}
/// The sync database already holds a prefix of the target's operations; the
/// target then advances by a few more. Reconstructing from the target's log
/// components must yield a database matching the advanced target.
pub(crate) fn test_from_sync_result_nonempty_to_nonempty_partial_match<H: SyncTestHarness>()
where
    DbOf<H>: FromSyncTestable,
    OpOf<H>: Encode + Clone,
    JournalOf<H>: Contiguous,
{
    const NUM_OPS: usize = 100;
    const NUM_ADDITIONAL_OPS: usize = 5;
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.with_label("target")).await;
        let sync_db_config = H::config(&context.next_u64().to_string(), &context);
        let client_context = context.with_label("client");
        let mut sync_db =
            H::init_db_with_config(client_context.clone(), sync_db_config.clone()).await;

        // Apply the same operations to both databases and prune each to its
        // inactivity floor so they start from identical states.
        let original_ops = H::create_ops(NUM_OPS);
        target_db = H::apply_ops(target_db, original_ops.clone()).await;
        let target_floor = target_db.inactivity_floor_loc().await;
        target_db.prune(target_floor).await.unwrap();
        sync_db = H::apply_ops(sync_db, original_ops.clone()).await;
        let sync_floor = sync_db.inactivity_floor_loc().await;
        sync_db.prune(sync_floor).await.unwrap();
        sync_db.sync().await.unwrap();
        drop(sync_db);

        // Advance the target past the sync database's persisted state.
        target_db = H::apply_ops(target_db, H::create_ops_seeded(NUM_ADDITIONAL_OPS, 1)).await;

        // Record the state the reconstructed database must reproduce.
        let bounds = target_db.bounds().await;
        let target_db_op_count = bounds.end;
        let target_db_inactivity_floor_loc = target_db.inactivity_floor_loc().await;
        let sync_lower_bound = target_db.inactivity_floor_loc().await;
        let sync_upper_bound = bounds.end;
        let target_hash = target_db.root();
        let pinned_nodes = target_db.pinned_nodes_at(sync_lower_bound).await;

        let (mmr, journal) = target_db.into_log_components();
        let sync_db: DbOf<H> = <DbOf<H> as qmdb::sync::Database>::from_sync_result(
            client_context.with_label("synced"),
            sync_db_config,
            journal,
            Some(pinned_nodes),
            sync_lower_bound..sync_upper_bound,
            1024,
        )
        .await
        .unwrap();

        assert_eq!(sync_db.bounds().await.end, target_db_op_count);
        assert_eq!(
            sync_db.inactivity_floor_loc().await,
            target_db_inactivity_floor_loc
        );
        assert_eq!(sync_db.inactivity_floor_loc().await, sync_lower_bound);
        assert_eq!(sync_db.root(), target_hash);

        sync_db.destroy().await.unwrap();
        mmr.destroy().await.unwrap();
    });
}
/// Reconstructs a database from the log components of a populated, pruned
/// source into a brand-new (empty) config; the result must mirror the source.
pub(crate) fn test_from_sync_result_empty_to_nonempty<H: SyncTestHarness>()
where
    DbOf<H>: FromSyncTestable,
    OpOf<H>: Encode + Clone,
    JournalOf<H>: Contiguous,
{
    const NUM_OPS: usize = 100;
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut source_db = H::init_db(context.with_label("source")).await;
        source_db = H::apply_ops(source_db, H::create_ops(NUM_OPS)).await;
        let floor = source_db.inactivity_floor_loc().await;
        source_db.prune(floor).await.unwrap();

        // Expected state for the reconstructed database.
        let lower_bound = source_db.inactivity_floor_loc().await;
        let upper_bound = source_db.bounds().await.end;
        let pinned_nodes = source_db.pinned_nodes_at(lower_bound).await;
        let target_hash = source_db.root();
        let target_op_count = source_db.bounds().await.end;
        let target_inactivity_floor = source_db.inactivity_floor_loc().await;

        let (mmr, journal) = source_db.into_log_components();
        let new_db_config = H::config(&context.next_u64().to_string(), &context);
        let db: DbOf<H> = <DbOf<H> as qmdb::sync::Database>::from_sync_result(
            context.with_label("synced"),
            new_db_config,
            journal,
            Some(pinned_nodes),
            lower_bound..upper_bound,
            1024,
        )
        .await
        .unwrap();

        assert_eq!(db.bounds().await.end, target_op_count);
        assert_eq!(db.inactivity_floor_loc().await, target_inactivity_floor);
        assert_eq!(db.inactivity_floor_loc().await, lower_bound);
        assert_eq!(db.root(), target_hash);

        db.destroy().await.unwrap();
        mmr.destroy().await.unwrap();
    });
}
/// Reconstructing from a freshly initialized (empty) database must yield an
/// equally empty database that is still usable for new operations.
pub(crate) fn test_from_sync_result_empty_to_empty<H: SyncTestHarness>()
where
    DbOf<H>: FromSyncTestable,
    OpOf<H>: Encode + Clone,
    JournalOf<H>: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let source_db = H::init_db(context.with_label("source")).await;
        // A freshly initialized database already reports a single operation.
        assert_eq!(source_db.bounds().await.end, Location::new(1));
        let target_hash = source_db.root();
        let (mmr, journal) = source_db.into_log_components();

        let new_db_config = H::config(&context.next_u64().to_string(), &context);
        let mut synced_db: DbOf<H> = <DbOf<H> as qmdb::sync::Database>::from_sync_result(
            context.with_label("synced"),
            new_db_config,
            journal,
            None,
            Location::new(0)..Location::new(1),
            1024,
        )
        .await
        .unwrap();

        assert_eq!(synced_db.bounds().await.end, Location::new(1));
        assert_eq!(synced_db.inactivity_floor_loc().await, Location::new(0));
        assert_eq!(synced_db.root(), target_hash);

        // The reconstructed database must accept further operations.
        synced_db = H::apply_ops(synced_db, H::create_ops(10)).await;
        assert!(synced_db.bounds().await.end > Location::new(1));

        synced_db.destroy().await.unwrap();
        mmr.destroy().await.unwrap();
    });
}
/// A [Resolver] wrapper that corrupts the first batch of pinned nodes it
/// returns, used to exercise the sync engine's retry path for bad pinned
/// nodes.
#[derive(Clone)]
struct CorruptFirstPinnedNodesResolver<R> {
    // The resolver whose responses are forwarded (and tampered with once).
    inner: R,
    // Set to true after the first corruption; shared across clones so the
    // corruption happens at most once overall.
    corrupted: Arc<std::sync::atomic::AtomicBool>,
}
impl<R: Resolver<Digest = Digest>> Resolver for CorruptFirstPinnedNodesResolver<R> {
    type Digest = Digest;
    type Op = R::Op;
    type Error = R::Error;

    /// Forwards the fetch to the inner resolver, then — at most once across
    /// the resolver's lifetime (and its clones) — overwrites the first
    /// returned pinned node with a garbage digest so the caller must retry.
    async fn get_operations(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
        include_pinned_nodes: bool,
        cancel_rx: oneshot::Receiver<()>,
    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
        let mut result = self
            .inner
            .get_operations(
                op_count,
                start_loc,
                max_ops,
                include_pinned_nodes,
                cancel_rx,
            )
            .await?;
        // Single `as_mut()` borrow replaces the redundant is_some() + if-let
        // pair; `swap` marks corruption done the first time pinned nodes are
        // present (even if the list is empty, matching prior behavior), and
        // `first_mut()` replaces the is_empty()-guarded index.
        if let Some(nodes) = result.pinned_nodes.as_mut() {
            if !self
                .corrupted
                .swap(true, std::sync::atomic::Ordering::Relaxed)
            {
                if let Some(first) = nodes.first_mut() {
                    *first = Digest::from([0xFFu8; 32]);
                }
            }
        }
        Ok(result)
    }
}
/// Syncs through a resolver that corrupts the first batch of pinned nodes.
/// The engine must detect the bad nodes, retry, and still converge to the
/// target root.
pub(crate) fn test_sync_retries_bad_pinned_nodes<H: SyncTestHarness>()
where
    Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
    OpOf<H>: Encode,
    JournalOf<H>: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.with_label("target")).await;
        let ops = H::create_ops(20);
        target_db = H::apply_ops(target_db, ops).await;
        target_db
            .prune(target_db.inactivity_floor_loc().await)
            .await
            .unwrap();
        let sync_root = H::sync_target_root(&target_db);
        let lower_bound = target_db.inactivity_floor_loc().await;
        let upper_bound = target_db.bounds().await.end;
        let db_config = H::config(&context.next_u64().to_string(), &context);
        // Keep an outer handle to the target so it can be destroyed after
        // sync completes (previously it was moved into the resolver and
        // leaked, unlike the sibling tests which all destroy it).
        let target_db = Arc::new(target_db);
        let resolver = CorruptFirstPinnedNodesResolver {
            inner: target_db.clone(),
            corrupted: Arc::new(std::sync::atomic::AtomicBool::new(false)),
        };
        let config = sync::engine::Config {
            db_config,
            fetch_batch_size: NZU64!(100),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: context.with_label("client"),
            resolver,
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let synced_db: H::Db = sync::sync(config).await.unwrap();
        assert_eq!(synced_db.root(), sync_root);
        synced_db.destroy().await.unwrap();
        // After sync returns, the engine has dropped its resolver (and with
        // it the inner Arc clone), so the target can be reclaimed.
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// Concrete [SyncTestHarness] implementations for the four database flavors
/// exercised by the shared sync tests:
/// {ordered, unordered} x {fixed, variable}-size values.
mod harnesses {
    use super::SyncTestHarness;
    use crate::{qmdb::any::value::VariableEncoding, translator::TwoCap};
    use commonware_cryptography::sha256::Digest;
    use commonware_runtime::{deterministic::Context, BufferPooler};

    /// Harness for the ordered database with fixed-size values.
    pub struct OrderedFixedHarness;

    impl SyncTestHarness for OrderedFixedHarness {
        type Db = crate::qmdb::any::ordered::fixed::test::AnyTest;

        // The sync target is the database's current root digest.
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::FixedConfig<TwoCap> {
            crate::qmdb::any::test::fixed_db_config(suffix, pooler)
        }

        fn create_ops(
            n: usize,
        ) -> Vec<crate::qmdb::any::ordered::fixed::Operation<crate::mmr::Family, Digest, Digest>>
        {
            crate::qmdb::any::ordered::fixed::test::create_test_ops(n)
        }

        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec<crate::qmdb::any::ordered::fixed::Operation<crate::mmr::Family, Digest, Digest>>
        {
            crate::qmdb::any::ordered::fixed::test::create_test_ops_seeded(n, seed)
        }

        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::ordered::fixed::test::create_test_db(ctx).await
        }

        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::FixedConfig<TwoCap>,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        // Applies the operations, then commits them by merkleizing a new
        // batch and applying it so the root reflects the applied operations.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::ordered::fixed::Operation<crate::mmr::Family, Digest, Digest>,
            >,
        ) -> Self::Db {
            crate::qmdb::any::ordered::fixed::test::apply_ops(&mut db, ops).await;
            let merkleized = db.new_batch().merkleize(&db, None::<Digest>).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }

    /// Harness for the ordered database with variable-size values.
    pub struct OrderedVariableHarness;

    impl SyncTestHarness for OrderedVariableHarness {
        type Db = crate::qmdb::any::ordered::variable::test::AnyTest;

        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        // Callers derive the suffix from a random u64 rendered to a string;
        // fall back to 0 if it does not parse as a number.
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::ordered::variable::test::VarConfig {
            crate::qmdb::any::ordered::variable::test::create_test_config(
                suffix.parse().unwrap_or(0),
                pooler,
            )
        }

        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec<crate::qmdb::any::ordered::variable::Operation<crate::mmr::Family, Digest, Vec<u8>>>
        {
            crate::qmdb::any::ordered::variable::test::create_test_ops_seeded(n, seed)
        }

        fn create_ops(
            n: usize,
        ) -> Vec<crate::qmdb::any::ordered::variable::Operation<crate::mmr::Family, Digest, Vec<u8>>>
        {
            crate::qmdb::any::ordered::variable::test::create_test_ops(n)
        }

        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::ordered::variable::test::create_test_db(ctx).await
        }

        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::ordered::variable::test::VarConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        // Applies the operations, then commits them via a merkleized batch so
        // the root reflects the applied operations.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::ordered::variable::Operation<crate::mmr::Family, Digest, Vec<u8>>,
            >,
        ) -> Self::Db {
            crate::qmdb::any::ordered::variable::test::apply_ops(&mut db, ops).await;
            let merkleized = db
                .new_batch()
                .merkleize(&db, None::<Vec<u8>>)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }

    /// Harness for the unordered database with fixed-size values.
    pub struct UnorderedFixedHarness;

    impl SyncTestHarness for UnorderedFixedHarness {
        type Db = crate::qmdb::any::unordered::fixed::test::AnyTest;

        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::FixedConfig<TwoCap> {
            crate::qmdb::any::test::fixed_db_config(suffix, pooler)
        }

        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec<crate::qmdb::any::unordered::fixed::Operation<crate::mmr::Family, Digest, Digest>>
        {
            crate::qmdb::any::unordered::fixed::test::create_test_ops_seeded(n, seed)
        }

        fn create_ops(
            n: usize,
        ) -> Vec<crate::qmdb::any::unordered::fixed::Operation<crate::mmr::Family, Digest, Digest>>
        {
            crate::qmdb::any::unordered::fixed::test::create_test_ops(n)
        }

        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::unordered::fixed::test::create_test_db(ctx).await
        }

        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::FixedConfig<TwoCap>,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        // Applies the operations, then commits them via a merkleized batch so
        // the root reflects the applied operations.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::unordered::fixed::Operation<crate::mmr::Family, Digest, Digest>,
            >,
        ) -> Self::Db {
            crate::qmdb::any::unordered::fixed::test::apply_ops(&mut db, ops).await;
            let merkleized = db.new_batch().merkleize(&db, None::<Digest>).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }

    /// Harness for the unordered database with variable-size values.
    pub struct UnorderedVariableHarness;

    impl SyncTestHarness for UnorderedVariableHarness {
        type Db = crate::qmdb::any::unordered::variable::test::AnyTest;

        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        // Callers derive the suffix from a random u64 rendered to a string;
        // fall back to 0 if it does not parse as a number.
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::unordered::variable::test::VarConfig {
            crate::qmdb::any::unordered::variable::test::create_test_config(
                suffix.parse().unwrap_or(0),
                pooler,
            )
        }

        fn create_ops(
            n: usize,
        ) -> Vec<
            crate::qmdb::any::unordered::Operation<
                crate::mmr::Family,
                Digest,
                VariableEncoding<Vec<u8>>,
            >,
        > {
            crate::qmdb::any::unordered::variable::test::create_test_ops(n)
        }

        fn create_ops_seeded(n: usize, seed: u64) -> Vec<super::OpOf<Self>> {
            crate::qmdb::any::unordered::variable::test::create_test_ops_seeded(n, seed)
        }

        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::unordered::variable::test::create_test_db(ctx).await
        }

        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::unordered::variable::test::VarConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        // Applies the operations, then commits them via a merkleized batch so
        // the root reflects the applied operations.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::unordered::Operation<
                    crate::mmr::Family,
                    Digest,
                    VariableEncoding<Vec<u8>>,
                >,
            >,
        ) -> Self::Db {
            crate::qmdb::any::unordered::variable::test::apply_ops(&mut db, ops).await;
            let merkleized = db
                .new_batch()
                .merkleize(&db, None::<Vec<u8>>)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }
}
/// Instantiates the full shared sync test suite for a given
/// [SyncTestHarness] implementation inside a dedicated module, so each
/// database flavor runs an identical set of tests.
macro_rules! sync_tests_for_harness {
    ($harness:ty, $mod_name:ident) => {
        mod $mod_name {
            use super::harnesses;
            use commonware_macros::test_traced;
            use rstest::rstest;
            use std::num::NonZeroU64;

            #[test_traced]
            fn test_sync_empty_operations_no_panic() {
                super::test_sync_empty_operations_no_panic::<$harness>();
            }

            #[test_traced]
            fn test_sync_subset_of_target_database() {
                super::test_sync_subset_of_target_database::<$harness>(1000);
            }

            // Parameterized over (target db size, fetch batch size) to cover
            // batch sizes that divide, exceed, or equal the database size.
            #[rstest]
            #[case::small_batch_size_one(10, 1)]
            #[case::small_batch_size_gt_db_size(10, 20)]
            #[case::batch_size_one(1000, 1)]
            #[case::floor_div_db_batch_size(1000, 3)]
            #[case::floor_div_db_batch_size_2(1000, 999)]
            #[case::div_db_batch_size(1000, 100)]
            #[case::db_size_eq_batch_size(1000, 1000)]
            #[case::batch_size_gt_db_size(1000, 1001)]
            fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: u64) {
                super::test_sync::<$harness>(
                    target_db_ops,
                    NonZeroU64::new(fetch_batch_size).unwrap(),
                );
            }

            #[test_traced]
            fn test_sync_use_existing_db_partial_match() {
                super::test_sync_use_existing_db_partial_match::<$harness>(1000);
            }

            #[test_traced]
            fn test_sync_use_existing_db_exact_match() {
                super::test_sync_use_existing_db_exact_match::<$harness>(1000);
            }

            // Target-update tests run at WARN to reduce log noise from the
            // engine's expected warnings.
            #[test_traced("WARN")]
            fn test_target_update_lower_bound_decrease() {
                super::test_target_update_lower_bound_decrease::<$harness>();
            }

            #[test_traced("WARN")]
            fn test_target_update_upper_bound_decrease() {
                super::test_target_update_upper_bound_decrease::<$harness>();
            }

            #[test_traced("WARN")]
            fn test_target_update_bounds_increase() {
                super::test_target_update_bounds_increase::<$harness>();
            }

            #[test_traced("WARN")]
            fn test_target_update_on_done_client() {
                super::test_target_update_on_done_client::<$harness>();
            }

            #[test_traced]
            fn test_sync_waits_for_explicit_finish() {
                super::test_sync_waits_for_explicit_finish::<$harness>();
            }

            #[test_traced]
            fn test_sync_handles_early_finish_signal() {
                super::test_sync_handles_early_finish_signal::<$harness>();
            }

            #[test_traced]
            fn test_sync_fails_when_finish_sender_dropped() {
                super::test_sync_fails_when_finish_sender_dropped::<$harness>();
            }

            #[test_traced]
            fn test_sync_allows_dropped_reached_target_receiver() {
                super::test_sync_allows_dropped_reached_target_receiver::<$harness>();
            }

            // Parameterized over (initial ops, additional ops applied while
            // the sync is in flight).
            #[rstest]
            #[case(1, 1)]
            #[case(1, 2)]
            #[case(1, 100)]
            #[case(2, 1)]
            #[case(2, 2)]
            #[case(2, 100)]
            #[case(20, 10)]
            #[case(100, 1)]
            #[case(100, 2)]
            #[case(100, 100)]
            #[case(100, 1000)]
            fn test_target_update_during_sync(
                #[case] initial_ops: usize,
                #[case] additional_ops: usize,
            ) {
                super::test_target_update_during_sync::<$harness>(initial_ops, additional_ops);
            }

            #[test_traced]
            fn test_sync_database_persistence() {
                super::test_sync_database_persistence::<$harness>();
            }

            #[test_traced]
            fn test_sync_post_sync_usability() {
                super::test_sync_post_sync_usability::<$harness>();
            }

            #[test_traced]
            fn test_sync_resolver_fails() {
                super::test_sync_resolver_fails::<$harness>();
            }

            #[test_traced]
            fn test_sync_retries_bad_pinned_nodes() {
                super::test_sync_retries_bad_pinned_nodes::<$harness>();
            }

            #[test_traced("WARN")]
            fn test_from_sync_result_empty_to_empty() {
                super::test_from_sync_result_empty_to_empty::<$harness>();
            }

            #[test_traced]
            fn test_from_sync_result_empty_to_nonempty() {
                super::test_from_sync_result_empty_to_nonempty::<$harness>();
            }

            #[test_traced]
            fn test_from_sync_result_nonempty_to_nonempty_partial_match() {
                super::test_from_sync_result_nonempty_to_nonempty_partial_match::<$harness>();
            }

            #[test_traced]
            fn test_from_sync_result_nonempty_to_nonempty_exact_match() {
                super::test_from_sync_result_nonempty_to_nonempty_exact_match::<$harness>();
            }
        }
    };
}
// Instantiate the shared sync test suite once per database flavor.
sync_tests_for_harness!(harnesses::OrderedFixedHarness, ordered_fixed);
sync_tests_for_harness!(harnesses::OrderedVariableHarness, ordered_variable);
sync_tests_for_harness!(harnesses::UnorderedFixedHarness, unordered_fixed);
sync_tests_for_harness!(harnesses::UnorderedVariableHarness, unordered_variable);