use crate::{
index::unordered::Index,
journal::{
authenticated,
contiguous::{Mutable, Reader as _},
Error as JournalError,
},
merkle::{
journaled::{self, Journaled},
mmr, Location,
},
qmdb::{
any::ValueEncoding,
build_snapshot_from_log,
immutable::{self, Operation},
operation::Key,
sync::{self},
Error,
},
translator::Translator,
Context, Persistable,
};
use commonware_codec::EncodeShared;
use commonware_cryptography::Hasher;
use std::ops::Range;
/// Shorthand for the standard merkle hasher wrapper used by this impl.
type StandardHasher<H> = crate::merkle::hasher::Standard<H>;
/// Makes [`immutable::Immutable`] usable as the target of the state-sync
/// engine: once the engine has fetched the operation log and merkle data,
/// it calls [`sync::Database::from_sync_result`] to assemble a working db.
impl<E, K, V, C, H, T> sync::Database for immutable::Immutable<mmr::Family, E, K, V, C, H, T>
where
E: Context,
K: Key,
V: ValueEncoding,
C: Mutable<Item = Operation<K, V>>
+ Persistable<Error = JournalError>
+ sync::Journal<Context = E, Op = Operation<K, V>>,
C::Item: EncodeShared,
C::Config: Clone + Send,
H: Hasher,
T: Translator,
{
type Op = Operation<K, V>;
type Journal = C;
type Hasher = H;
type Config = immutable::Config<T, C::Config>;
type Digest = H::Digest;
type Context = E;
/// Assembles a database from the artifacts of a completed sync.
///
/// Steps:
/// 1. Initialize the journaled merkle structure over `range`, seeding the
///    pruned prefix with `pinned_nodes`.
/// 2. Combine it with the synced operation `log` into an authenticated
///    journal, applying operations in batches of `apply_batch_size`.
/// 3. Replay the log to rebuild the in-memory snapshot index.
/// 4. Persist the assembled state before returning it.
///
/// # Errors
/// Propagates merkle/journal initialization and replay failures.
async fn from_sync_result(
context: Self::Context,
db_config: Self::Config,
log: Self::Journal,
pinned_nodes: Option<Vec<Self::Digest>>,
range: Range<mmr::Location>,
apply_batch_size: usize,
) -> Result<Self, Error<mmr::Family>> {
let hasher = StandardHasher::new();
let merkle = Journaled::init_sync(
context.with_label("merkle"),
journaled::SyncConfig {
config: db_config.merkle_config.clone(),
range,
pinned_nodes,
},
&hasher,
)
.await?;
let journal = authenticated::Journal::<_, _, _, _>::from_components(
merkle,
log,
hasher,
apply_batch_size as u64,
)
.await?;
// Rebuild the translated-key -> location index by replaying the log.
let mut snapshot: Index<T, mmr::Location> =
Index::new(context.with_label("snapshot"), db_config.translator.clone());
let last_commit_loc = {
let reader = journal.journal.reader().await;
let bounds = reader.bounds();
let start_loc = mmr::Location::new(bounds.start);
build_snapshot_from_log::<mmr::Family, _, _, _>(
start_loc,
&reader,
&mut snapshot,
|_, _| {},
)
.await?;
// Assumes the synced log is non-empty and ends with a commit
// operation, so the final location is the last commit.
Location::new(bounds.end.checked_sub(1).expect("commit should exist"))
};
let db = Self {
journal,
snapshot,
last_commit_loc,
};
// Flush the assembled state to durable storage before handing it out.
db.sync().await?;
Ok(db)
}
fn root(&self) -> Self::Digest {
// Delegates to the inherent `root` on `Immutable` (inherent methods
// take precedence over trait methods, so this is not self-recursion).
self.root()
}
}
#[cfg(test)]
mod tests {
use crate::{
merkle::mmr::Location,
qmdb::{
immutable,
immutable::variable::Operation,
sync::{
self,
engine::{Config, NextStep},
Engine, Target,
},
},
translator::TwoCap,
};
use commonware_cryptography::{sha256, Sha256};
use commonware_macros::test_traced;
use commonware_math::algebra::Random;
use commonware_runtime::{
buffer::paged::CacheRef, deterministic, BufferPooler, Metrics, Runner as _,
};
use commonware_utils::{
channel::mpsc, non_empty_range, test_rng_seeded, NZUsize, NZU16, NZU64,
};
use rand::RngCore as _;
use rstest::rstest;
use std::{
collections::HashMap,
num::{NonZeroU16, NonZeroU64, NonZeroUsize},
sync::Arc,
};
/// Concrete db instantiation used by every sync test: deterministic runtime,
/// SHA-256 digests for both keys and values, and the two-byte translator.
type ImmutableSyncTest = immutable::variable::Db<
crate::merkle::mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
crate::translator::TwoCap,
>;
/// Builds a fresh [`immutable::variable::Config`] whose storage partitions are
/// namespaced by `suffix` so concurrent tests never collide on disk.
///
/// The sizes are deliberately small and odd (77-byte pages, 9-page cache,
/// 5 items per section) to exercise paging and section boundaries.
fn create_sync_config(
suffix: &str,
pooler: &impl BufferPooler,
) -> immutable::variable::Config<crate::translator::TwoCap, ((), ())> {
const PAGE_SIZE: NonZeroU16 = NZU16!(77);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);
// One page cache shared between the merkle journal and the operation log.
let cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
let merkle_config = crate::merkle::journaled::Config {
journal_partition: format!("journal-{suffix}"),
metadata_partition: format!("metadata-{suffix}"),
items_per_blob: NZU64!(11),
write_buffer: NZUsize!(1024),
thread_pool: None,
page_cache: cache.clone(),
};
let log = crate::journal::contiguous::variable::Config {
partition: format!("log-{suffix}"),
items_per_section: ITEMS_PER_SECTION,
compression: None,
codec_config: ((), ()),
page_cache: cache,
write_buffer: NZUsize!(1024),
};
immutable::Config {
merkle_config,
log,
translator: TwoCap,
}
}
/// Creates an empty [`ImmutableSyncTest`] with randomized partition names so
/// repeated invocations within a test run never share storage.
async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
// Draw a random suffix from the runtime so partitions are unique.
let suffix = format!("sync-test-{}", context.next_u64());
let cfg = create_sync_config(&suffix, &context);
ImmutableSyncTest::init(context, cfg).await.unwrap()
}
/// Generates `n` random `Set` operations from a fixed seed (0) so repeated
/// calls produce identical operations.
fn create_test_ops(n: usize) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
create_test_ops_seeded(n, 0)
}
/// Generates `n` deterministic random `Set` operations from `seed`.
///
/// The key is drawn before the value on each iteration, so operations are
/// reproducible for a given `(n, seed)` pair.
fn create_test_ops_seeded(
n: usize,
seed: u64,
) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
let mut rng = test_rng_seeded(seed);
(0..n)
.map(|_| {
let key = sha256::Digest::random(&mut rng);
let value = sha256::Digest::random(&mut rng);
Operation::Set(key, value)
})
.collect()
}
/// Applies `ops` (which must all be `Set`) to `db` as a single merkleized
/// batch, committing with the given optional `metadata`.
///
/// # Panics
/// Panics if any operation is a `Commit`, or if applying the batch fails.
async fn apply_ops(
db: &mut ImmutableSyncTest,
ops: Vec<Operation<sha256::Digest, sha256::Digest>>,
metadata: Option<sha256::Digest>,
) {
let mut batch = db.new_batch();
for op in ops {
// Only `Set` operations may appear here; commits are produced by
// `merkleize`/`apply_batch` below.
let Operation::Set(key, value) = op else {
panic!("Commit operation not supported in apply_ops");
};
batch = batch.set(key, value);
}
let merkleized = batch.merkleize(db, metadata);
db.apply_batch(merkleized).await.unwrap();
}
/// Parameterized end-to-end sync: builds a target db with `target_db_ops`
/// operations, syncs a fresh client using `fetch_batch_size`, and checks
/// bounds, root, and key-value contents match. Then applies fresh operations
/// to both databases to confirm the synced db remains fully usable.
#[rstest]
#[case::singleton_batch_size_one(1, NZU64!(1))]
#[case::singleton_batch_size_gt_db_size(1, NZU64!(2))]
#[case::batch_size_one(1000, NZU64!(1))]
#[case::floor_div_db_batch_size(1000, NZU64!(3))]
#[case::floor_div_db_batch_size_2(1000, NZU64!(999))]
#[case::div_db_batch_size(1000, NZU64!(100))]
#[case::db_size_eq_batch_size(1000, NZU64!(1000))]
#[case::batch_size_gt_db_size(1000, NZU64!(1001))]
fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: NonZeroU64) {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
let target_db_ops = create_test_ops(target_db_ops);
apply_ops(&mut target_db, target_db_ops.clone(), Some(Sha256::fill(1))).await;
let bounds = target_db.bounds().await;
let target_op_count = bounds.end;
let target_oldest_retained_loc = bounds.start;
let target_root = target_db.root();
// Record the key-value pairs the synced db must be able to serve.
let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
for op in &target_db_ops {
if let Operation::Set(key, value) = op {
expected_kvs.insert(*key, *value);
}
}
let db_config =
create_sync_config(&format!("sync_client_{}", context.next_u64()), &context);
// The target db doubles as the resolver serving fetch requests.
let target_db = Arc::new(target_db);
let config = Config {
db_config: db_config.clone(),
fetch_batch_size,
target: Target {
root: target_root,
range: non_empty_range!(target_oldest_retained_loc, target_op_count),
},
context: context.with_label("client"),
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 1,
update_rx: None,
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 8,
};
let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
// The synced db must match the target's bounds, root, and contents.
let bounds = got_db.bounds().await;
assert_eq!(bounds.end, target_op_count);
assert_eq!(bounds.start, target_oldest_retained_loc);
assert_eq!(got_db.root(), target_root);
for (key, expected_value) in &expected_kvs {
let synced_value = got_db.get(key).await.unwrap();
assert_eq!(synced_value, Some(*expected_value));
}
// Apply fresh operations to both dbs to confirm the synced db is
// writable and stays consistent with the target.
let mut new_ops = Vec::new();
let mut rng = test_rng_seeded(1);
let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
for _i in 0..expected_kvs.len() {
let key = sha256::Digest::random(&mut rng);
let value = sha256::Digest::random(&mut rng);
new_ops.push(Operation::Set(key, value));
new_kvs.insert(key, value);
}
let mut got_db = got_db;
apply_ops(&mut got_db, new_ops.clone(), None).await;
let mut target_db = Arc::try_unwrap(target_db)
.unwrap_or_else(|_| panic!("target_db should have no other references"));
apply_ops(&mut target_db, new_ops.clone(), None).await;
for (key, expected_value) in &new_kvs {
let synced_value = got_db.get(key).await.unwrap();
assert_eq!(synced_value, Some(*expected_value));
let target_value = target_db.get(key).await.unwrap();
assert_eq!(target_value, Some(*expected_value));
}
got_db.destroy().await.unwrap();
target_db.destroy().await.unwrap();
});
}
/// Syncs against a target containing only a commit operation (no sets) and
/// verifies the client reproduces the root, bounds, and commit metadata.
#[test_traced("WARN")]
fn test_sync_empty_to_nonempty() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
// Empty batch with metadata produces a lone commit operation.
apply_ops(&mut target_db, vec![], Some(Sha256::fill(1))).await;
let bounds = target_db.bounds().await;
let target_op_count = bounds.end;
let target_oldest_retained_loc = bounds.start;
let target_root = target_db.root();
let db_config =
create_sync_config(&format!("empty_sync_{}", context.next_u64()), &context);
let target_db = Arc::new(target_db);
let config = Config {
db_config,
fetch_batch_size: NZU64!(10),
target: Target {
root: target_root,
range: non_empty_range!(target_oldest_retained_loc, target_op_count),
},
context: context.with_label("client"),
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 1,
update_rx: None,
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 8,
};
let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
let bounds = got_db.bounds().await;
assert_eq!(bounds.end, target_op_count);
assert_eq!(bounds.start, target_oldest_retained_loc);
assert_eq!(got_db.root(), target_root);
// The commit metadata must survive the sync as well.
assert_eq!(got_db.get_metadata().await.unwrap(), Some(Sha256::fill(1)));
got_db.destroy().await.unwrap();
let target_db = Arc::try_unwrap(target_db)
.unwrap_or_else(|_| panic!("Failed to unwrap Arc - still has references"));
target_db.destroy().await.unwrap();
});
}
/// Verifies that a synced database survives a shutdown/reopen cycle: after
/// syncing, the db is flushed and dropped, then reopened with the same config
/// and checked for identical root, bounds, and contents.
#[test_traced("WARN")]
fn test_sync_database_persistence() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
let target_ops = create_test_ops(10);
apply_ops(&mut target_db, target_ops.clone(), Some(Sha256::fill(0))).await;
let target_root = target_db.root();
let bounds = target_db.bounds().await;
let lower_bound = bounds.start;
let op_count = bounds.end;
let db_config = create_sync_config("persistence-test", &context);
let client_context = context.with_label("client");
let target_db = Arc::new(target_db);
let config = Config {
db_config: db_config.clone(),
fetch_batch_size: NZU64!(5),
target: Target {
root: target_root,
range: non_empty_range!(lower_bound, op_count),
},
context: client_context.clone(),
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 1,
update_rx: None,
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 8,
};
let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
assert_eq!(synced_db.root(), target_root);
// Capture expected state, flush to storage, and simulate shutdown.
let expected_root = synced_db.root();
let bounds = synced_db.bounds().await;
let expected_op_count = bounds.end;
let expected_oldest_retained_loc = bounds.start;
synced_db.sync().await.unwrap();
drop(synced_db);
// Reopen from the same partitions and verify everything persisted.
let reopened_db = ImmutableSyncTest::init(context.with_label("reopened"), db_config)
.await
.unwrap();
assert_eq!(reopened_db.root(), expected_root);
let bounds = reopened_db.bounds().await;
assert_eq!(bounds.end, expected_op_count);
assert_eq!(bounds.start, expected_oldest_retained_loc);
for op in &target_ops {
if let Operation::Set(key, value) = op {
let stored_value = reopened_db.get(key).await.unwrap();
assert_eq!(stored_value, Some(*value));
}
}
reopened_db.destroy().await.unwrap();
let target_db = Arc::try_unwrap(target_db)
.unwrap_or_else(|_| panic!("Failed to unwrap Arc - still has references"));
target_db.destroy().await.unwrap();
});
}
/// Verifies that a sync in progress can adopt a larger target: the client
/// starts toward the initial target, and once it has fetched some operations
/// the target is extended; sync must then complete at the final target.
#[test_traced("WARN")]
fn test_target_update_during_sync() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
let initial_ops = create_test_ops(50);
apply_ops(&mut target_db, initial_ops.clone(), None).await;
let bounds = target_db.bounds().await;
let initial_lower_bound = bounds.start;
let initial_upper_bound = bounds.end;
let initial_root = target_db.root();
// Grow the target beyond the initial sync range.
let additional_ops = create_test_ops_seeded(25, 1);
apply_ops(&mut target_db, additional_ops.clone(), None).await;
let final_upper_bound = target_db.bounds().await.end;
let final_root = target_db.root();
let target_db = Arc::new(target_db);
let (update_sender, update_receiver) = mpsc::channel(1);
let client = {
let config = Config {
context: context.with_label("client"),
db_config: create_sync_config(
&format!("update_test_{}", context.next_u64()),
&context,
),
target: Target {
root: initial_root,
range: non_empty_range!(initial_lower_bound, initial_upper_bound),
},
resolver: target_db.clone(),
fetch_batch_size: NZU64!(2),
max_outstanding_requests: 10,
apply_batch_size: 1024,
update_rx: Some(update_receiver),
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 1,
};
let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
// Step the engine until it has fetched at least one operation,
// so the update below arrives mid-sync rather than before it.
loop {
client = match client.step().await.unwrap() {
NextStep::Continue(new_client) => new_client,
NextStep::Complete(_) => panic!("client should not be complete"),
};
let log_size =
crate::journal::contiguous::Contiguous::size(client.journal()).await;
if log_size > initial_lower_bound {
break client;
}
}
};
// Extend the target to the grown database.
update_sender
.send(Target {
root: final_root,
range: non_empty_range!(initial_lower_bound, final_upper_bound),
})
.await
.unwrap();
let synced_db = client.sync().await.unwrap();
assert_eq!(synced_db.root(), final_root);
let target_db = Arc::try_unwrap(target_db)
.unwrap_or_else(|_| panic!("Failed to unwrap Arc - still has references"));
{
let bounds = synced_db.bounds().await;
let target_bounds = target_db.bounds().await;
assert_eq!(bounds.end, target_bounds.end);
assert_eq!(bounds.start, target_bounds.start);
assert_eq!(synced_db.root(), target_db.root());
}
// Every operation from both phases must be present in the synced db.
let all_ops = [initial_ops, additional_ops].concat();
for op in &all_ops {
if let Operation::Set(key, value) = op {
let synced_value = synced_db.get(key).await.unwrap();
assert_eq!(synced_value, Some(*value));
}
}
synced_db.destroy().await.unwrap();
target_db.destroy().await.unwrap();
});
}
/// Syncs to a target root/range captured before the target's final operation
/// was applied, proving a client can sync to a historical subset of a target
/// that has since moved ahead.
#[test]
fn test_sync_subset_of_target_database() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
let target_ops = create_test_ops(30);
// Capture root/bounds after 29 ops, then apply the 30th so the
// target is ahead of the requested sync range.
apply_ops(&mut target_db, target_ops[..29].to_vec(), None).await;
let target_root = target_db.root();
let bounds = target_db.bounds().await;
let lower_bound = bounds.start;
let op_count = bounds.end;
apply_ops(&mut target_db, target_ops[29..].to_vec(), None).await;
let target_db = Arc::new(target_db);
let config = Config {
db_config: create_sync_config(&format!("subset_{}", context.next_u64()), &context),
fetch_batch_size: NZU64!(10),
target: Target {
root: target_root,
range: non_empty_range!(lower_bound, op_count),
},
context: context.with_label("client"),
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 1,
update_rx: None,
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 8,
};
let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
// The client must land on the historical root, not the target's head.
assert_eq!(synced_db.root(), target_root);
assert_eq!(synced_db.bounds().await.end, op_count);
synced_db.destroy().await.unwrap();
let target_db =
Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
target_db.destroy().await.unwrap();
});
}
/// Verifies sync reuses an existing on-disk database that holds a prefix of
/// the target's operations: the client db already has the original ops, the
/// target gains one more, and sync must complete to the full target state.
#[test]
fn test_sync_use_existing_db_partial_match() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let original_ops = create_test_ops(50);
let mut target_db = create_test_db(context.with_label("target")).await;
let sync_db_config =
create_sync_config(&format!("partial_{}", context.next_u64()), &context);
let client_context = context.with_label("client");
// Pre-populate the client's partitions with the shared prefix,
// then drop it so sync reopens the same storage.
let mut sync_db: ImmutableSyncTest =
immutable::variable::Db::init(client_context.clone(), sync_db_config.clone())
.await
.unwrap();
apply_ops(&mut target_db, original_ops.clone(), None).await;
apply_ops(&mut sync_db, original_ops.clone(), None).await;
drop(sync_db);
// Advance the target by one more operation the client lacks.
let last_op = create_test_ops_seeded(1, 1);
apply_ops(&mut target_db, last_op.clone(), None).await;
let root = target_db.root();
let bounds = target_db.bounds().await;
let lower_bound = bounds.start;
let upper_bound = bounds.end;
let target_db = Arc::new(target_db);
let config = Config {
db_config: sync_db_config,
fetch_batch_size: NZU64!(10),
target: Target {
root,
range: non_empty_range!(lower_bound, upper_bound),
},
context: context.with_label("sync"),
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 1,
update_rx: None,
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 8,
};
let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
assert_eq!(sync_db.bounds().await.end, upper_bound);
assert_eq!(sync_db.root(), root);
sync_db.destroy().await.unwrap();
let target_db =
Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
target_db.destroy().await.unwrap();
});
}
/// Verifies sync completes when the existing on-disk database already matches
/// the target exactly (nothing needs to be fetched).
#[test]
fn test_sync_use_existing_db_exact_match() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let target_ops = create_test_ops(40);
let mut target_db = create_test_db(context.with_label("target")).await;
let sync_config =
create_sync_config(&format!("exact_{}", context.next_u64()), &context);
let client_context = context.with_label("client");
// Pre-populate the client's partitions with the exact same ops,
// then drop it so sync reopens the same storage.
let mut sync_db: ImmutableSyncTest =
immutable::variable::Db::init(client_context.clone(), sync_config.clone())
.await
.unwrap();
apply_ops(&mut target_db, target_ops.clone(), None).await;
apply_ops(&mut sync_db, target_ops.clone(), None).await;
drop(sync_db);
let root = target_db.root();
let bounds = target_db.bounds().await;
let lower_bound = bounds.start;
let upper_bound = bounds.end;
let resolver = Arc::new(target_db);
let config = Config {
db_config: sync_config,
fetch_batch_size: NZU64!(10),
target: Target {
root,
range: non_empty_range!(lower_bound, upper_bound),
},
context: context.with_label("sync"),
resolver: resolver.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 1,
update_rx: None,
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 8,
};
let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
assert_eq!(sync_db.bounds().await.end, upper_bound);
assert_eq!(sync_db.root(), root);
sync_db.destroy().await.unwrap();
let target_db =
Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
target_db.destroy().await.unwrap();
});
}
/// Verifies that a target update whose lower bound moves backward is rejected
/// with `SyncTargetMovedBackward` (sync targets must only move forward).
#[test_traced("WARN")]
fn test_target_update_lower_bound_decrease() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
let target_ops = create_test_ops(100);
apply_ops(&mut target_db, target_ops, None).await;
// Prune so the lower bound is non-zero and can be "decreased".
target_db.prune(Location::new(10)).await.unwrap();
let bounds = target_db.bounds().await;
let initial_lower_bound = bounds.start;
let initial_upper_bound = bounds.end;
let initial_root = target_db.root();
let (update_sender, update_receiver) = mpsc::channel(1);
let target_db = Arc::new(target_db);
let config = Config {
context: context.with_label("client"),
db_config: create_sync_config(&format!("lb-dec-{}", context.next_u64()), &context),
fetch_batch_size: NZU64!(5),
target: Target {
root: initial_root,
range: non_empty_range!(initial_lower_bound, initial_upper_bound),
},
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 10,
update_rx: Some(update_receiver),
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 1,
};
let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
// Send an update whose lower bound is one less than the original.
update_sender
.send(Target {
root: initial_root,
range: non_empty_range!(
initial_lower_bound.checked_sub(1).unwrap(),
initial_upper_bound
),
})
.await
.unwrap();
let result = client.step().await;
assert!(matches!(
result,
Err(sync::Error::Engine(
sync::EngineError::SyncTargetMovedBackward { .. }
))
));
let target_db =
Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
target_db.destroy().await.unwrap();
});
}
/// Verifies that a target update whose upper bound moves backward is rejected
/// with `SyncTargetMovedBackward` (sync targets must only move forward).
#[test_traced("WARN")]
fn test_target_update_upper_bound_decrease() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
let target_ops = create_test_ops(50);
apply_ops(&mut target_db, target_ops, None).await;
let bounds = target_db.bounds().await;
let initial_lower_bound = bounds.start;
let initial_upper_bound = bounds.end;
let initial_root = target_db.root();
let (update_sender, update_receiver) = mpsc::channel(1);
let target_db = Arc::new(target_db);
let config = Config {
context: context.with_label("client"),
db_config: create_sync_config(&format!("ub-dec-{}", context.next_u64()), &context),
fetch_batch_size: NZU64!(5),
target: Target {
root: initial_root,
range: non_empty_range!(initial_lower_bound, initial_upper_bound),
},
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 10,
update_rx: Some(update_receiver),
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 1,
};
let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
// Send an update whose upper bound is one less than the original.
update_sender
.send(Target {
root: initial_root,
range: non_empty_range!(initial_lower_bound, initial_upper_bound - 1),
})
.await
.unwrap();
let result = client.step().await;
assert!(matches!(
result,
Err(sync::Error::Engine(
sync::EngineError::SyncTargetMovedBackward { .. }
))
));
let target_db =
Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
target_db.destroy().await.unwrap();
});
}
/// Verifies sync adopts a target update that increases both bounds: the
/// update is queued before sync runs, and the client must complete at the
/// final (larger, partially pruned) target rather than the initial one.
#[test_traced("WARN")]
fn test_target_update_bounds_increase() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
let target_ops = create_test_ops(100);
apply_ops(&mut target_db, target_ops.clone(), None).await;
let bounds = target_db.bounds().await;
let initial_lower_bound = bounds.start;
let initial_upper_bound = bounds.end;
let initial_root = target_db.root();
// Grow and prune the target so both bounds move forward.
let more_ops = create_test_ops_seeded(5, 1);
apply_ops(&mut target_db, more_ops, None).await;
target_db.prune(Location::new(10)).await.unwrap();
apply_ops(&mut target_db, vec![], None).await;
let bounds = target_db.bounds().await;
let final_lower_bound = bounds.start;
let final_upper_bound = bounds.end;
let final_root = target_db.root();
assert_ne!(final_lower_bound, initial_lower_bound);
assert_ne!(final_upper_bound, initial_upper_bound);
let (update_sender, update_receiver) = mpsc::channel(1);
let target_db = Arc::new(target_db);
let config = Config {
context: context.with_label("client"),
db_config: create_sync_config(
&format!("bounds_inc_{}", context.next_u64()),
&context,
),
fetch_batch_size: NZU64!(1),
target: Target {
root: initial_root,
range: non_empty_range!(initial_lower_bound, initial_upper_bound),
},
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 1,
update_rx: Some(update_receiver),
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 1,
};
// Queue the new target before the sync starts stepping.
update_sender
.send(Target {
root: final_root,
range: non_empty_range!(final_lower_bound, final_upper_bound),
})
.await
.unwrap();
let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
assert_eq!(synced_db.root(), final_root);
let bounds = synced_db.bounds().await;
assert_eq!(bounds.end, final_upper_bound);
assert_eq!(bounds.start, final_lower_bound);
synced_db.destroy().await.unwrap();
let target_db = Arc::try_unwrap(target_db)
.unwrap_or_else(|_| panic!("Failed to unwrap Arc - still has references"));
target_db.destroy().await.unwrap();
});
}
/// Verifies that a target update sent after sync has completed is harmless:
/// the finished database keeps the original root and bounds.
#[test_traced("WARN")]
fn test_target_update_on_done_client() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut target_db = create_test_db(context.with_label("target")).await;
let target_ops = create_test_ops(10);
apply_ops(&mut target_db, target_ops, None).await;
let bounds = target_db.bounds().await;
let lower_bound = bounds.start;
let upper_bound = bounds.end;
let root = target_db.root();
let (update_sender, update_receiver) = mpsc::channel(1);
let target_db = Arc::new(target_db);
let config = Config {
context: context.with_label("client"),
db_config: create_sync_config(&format!("done_{}", context.next_u64()), &context),
fetch_batch_size: NZU64!(20),
target: Target {
root,
range: non_empty_range!(lower_bound, upper_bound),
},
resolver: target_db.clone(),
apply_batch_size: 1024,
max_outstanding_requests: 10,
update_rx: Some(update_receiver),
finish_rx: None,
reached_target_tx: None,
max_retained_roots: 1,
};
let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
// Sync is done; this update targets a bogus root and shifted bounds.
// The send result is ignored since the receiver may already be gone.
let _ = update_sender
.send(Target {
root: sha256::Digest::from([2u8; 32]),
range: non_empty_range!(lower_bound + 1, upper_bound + 1),
})
.await;
assert_eq!(synced_db.root(), root);
let bounds = synced_db.bounds().await;
assert_eq!(bounds.end, upper_bound);
assert_eq!(bounds.start, lower_bound);
synced_db.destroy().await.unwrap();
Arc::try_unwrap(target_db)
.unwrap_or_else(|_| panic!("failed to unwrap Arc"))
.destroy()
.await
.unwrap();
});
}
}