use crate::{
index::unordered::Index,
journal::contiguous::variable::Journal,
merkle::{self, Location},
qmdb::{
any::{unordered, value::VariableEncoding, VariableConfig, VariableValue},
operation::Key,
Error,
},
translator::Translator,
Context,
};
use commonware_codec::{Codec, Read};
use commonware_cryptography::Hasher;
/// Update payload of the unordered variant whose value uses [`VariableEncoding`].
pub type Update<K, V> = unordered::Update<K, VariableEncoding<V>>;
/// Operation of the unordered variant whose value uses [`VariableEncoding`].
pub type Operation<F, K, V> = unordered::Operation<F, K, VariableEncoding<V>>;
/// Database alias built on `super::Db`: the log is a variable [`Journal`] of [`Operation`]s, the
/// index is an unordered [`Index`] parameterized by the translator `T` and holding [`Location`]
/// values, and the update type is [`Update`].
pub type Db<F, E, K, V, H, T> =
super::Db<F, E, Journal<E, Operation<F, K, V>>, Index<T, Location<F>>, H, Update<K, V>>;
impl<F: merkle::Family, E: Context, K: Key, V: VariableValue, H: Hasher, T: Translator>
Db<F, E, K, V, H, T>
where
Operation<F, K, V>: Codec,
{
pub async fn init(
context: E,
cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
) -> Result<Self, Error<F>> {
Self::init_with_callback(context, cfg, None, |_, _| {}).await
}
pub(crate) async fn init_with_callback(
context: E,
cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
known_inactivity_floor: Option<Location<F>>,
callback: impl FnMut(bool, Option<Location<F>>),
) -> Result<Self, Error<F>> {
crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
}
}
/// Variant of the variable-value database that uses the partitioned unordered index, with the
/// const parameter `P` forwarded to that index.
pub mod partitioned {
    pub use super::{Operation, Update};
    use crate::{
        index::partitioned::unordered::Index,
        journal::contiguous::variable::Journal,
        merkle::{self, Location},
        qmdb::{
            any::{VariableConfig, VariableValue},
            operation::Key,
            Error,
        },
        translator::Translator,
        Context,
    };
    use commonware_codec::{Codec, Read};
    use commonware_cryptography::Hasher;

    /// Database alias whose index is the partitioned [`Index`] with const parameter `P`; the log
    /// is a variable [`Journal`] of [`Operation`]s and the update type is [`Update`].
    pub type Db<F, E, K, V, H, T, const P: usize> = crate::qmdb::any::unordered::Db<
        F,
        E,
        Journal<E, Operation<F, K, V>>,
        Index<T, Location<F>, P>,
        H,
        Update<K, V>,
    >;

    impl<F, E, K, V, H, T, const P: usize> Db<F, E, K, V, H, T, P>
    where
        F: merkle::Family,
        E: Context,
        K: Key,
        V: VariableValue,
        H: Hasher,
        T: Translator,
        Operation<F, K, V>: Codec,
    {
        /// Opens the database described by `cfg` within `context`.
        ///
        /// This is [`Self::init_with_callback`] with no known inactivity floor and a callback
        /// that ignores both of its arguments.
        ///
        /// # Errors
        ///
        /// Returns whatever error `init_with_callback` reports.
        pub async fn init(
            context: E,
            cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
        ) -> Result<Self, Error<F>> {
            let ignore = |_: bool, _: Option<Location<F>>| {};
            Self::init_with_callback(context, cfg, None, ignore).await
        }

        /// Opens the database by forwarding every argument, unchanged, to
        /// `crate::qmdb::any::init`.
        ///
        /// # Errors
        ///
        /// Returns whatever error `crate::qmdb::any::init` reports.
        pub(crate) async fn init_with_callback(
            context: E,
            cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
            known_inactivity_floor: Option<Location<F>>,
            callback: impl FnMut(bool, Option<Location<F>>),
        ) -> Result<Self, Error<F>> {
            let opened: Result<Self, Error<F>> =
                crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await;
            opened
        }
    }

    /// Alias fixing `P = 1`.
    ///
    /// NOTE(review): the module name suggests this corresponds to 256 partitions; confirm against
    /// the partitioned index documentation.
    pub mod p256 {
        /// Partitioned database with `P = 1`.
        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 1>;
    }

    /// Alias fixing `P = 2`.
    ///
    /// NOTE(review): the module name suggests this corresponds to 64K partitions; confirm against
    /// the partitioned index documentation.
    pub mod p64k {
        /// Partitioned database with `P = 2`.
        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 2>;
    }
}
#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::{index::Unordered as _, mmr, translator::TwoCap};
use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
use commonware_macros::test_traced;
use commonware_math::algebra::Random;
use commonware_runtime::{
buffer::paged::CacheRef,
deterministic::{self, Context},
BufferPooler, Metrics, Runner as _,
};
use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64};
use rand::RngCore;
use std::{
num::{NonZeroU16, NonZeroUsize},
sync::Arc,
};
// Second argument passed to `CacheRef::from_pooler` when building the test page cache.
const PAGE_SIZE: NonZeroU16 = NZU16!(77);
// Third argument passed to `CacheRef::from_pooler` when building the test page cache.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
/// Builds the configuration used by the tests in this module.
///
/// `seed` is embedded in every partition name (`journal-{seed}`, `metadata-{seed}`,
/// `log-journal-{seed}`), so different seeds yield differently named partitions. One page cache,
/// created from `pooler`, is shared by the merkle and journal configurations.
pub(crate) fn create_test_config(seed: u64, pooler: &impl BufferPooler) -> VarConfig {
    let shared_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);

    let merkle_config = crate::mmr::journaled::Config {
        journal_partition: format!("journal-{seed}"),
        metadata_partition: format!("metadata-{seed}"),
        items_per_blob: NZU64!(13),
        write_buffer: NZUsize!(1024),
        thread_pool: None,
        page_cache: shared_cache.clone(),
    };

    let journal_config = crate::journal::contiguous::variable::Config {
        partition: format!("log-journal-{seed}"),
        items_per_section: NZU64!(7),
        write_buffer: NZUsize!(1024),
        compression: None,
        codec_config: ((), ((0..=10000).into(), ())),
        page_cache: shared_cache,
    };

    VariableConfig {
        merkle_config,
        journal_config,
        translator: TwoCap,
    }
}
// Configuration type used by the tests: `TwoCap` translator plus the codec configuration tuple
// for the operations stored in the log.
pub(crate) type VarConfig =
VariableConfig<TwoCap, ((), (commonware_codec::RangeCfg<usize>, ()))>;
// Database type under test: MMR family, deterministic runtime context, `Digest` keys,
// `Vec<u8>` values, `Sha256` hasher and `TwoCap` translator.
pub(crate) type AnyTest =
Db<mmr::Family, deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
/// Opens a test database whose partition names are derived from the next `u64` drawn from
/// `context`.
///
/// Panics if initialization fails.
pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
    let cfg = create_test_config(context.next_u64(), &context);
    let opened = AnyTest::init(context, cfg).await;
    opened.unwrap()
}
/// Derives a deterministic value from `i`: the byte `i % 255` repeated `(i % 13) + 7` times.
fn to_bytes(i: u64) -> Vec<u8> {
    let fill = (i % 255) as u8;
    let count = (i % 13 + 7) as usize;
    std::iter::repeat(fill).take(count).collect()
}
/// Generates `n` test operations using the fixed seed `0`; see `create_test_ops_seeded`.
pub(crate) fn create_test_ops(
    n: usize,
) -> Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>> {
    const DEFAULT_SEED: u64 = 0;
    create_test_ops_seeded(n, DEFAULT_SEED)
}
/// Generates `n` operations from an RNG seeded with `seed`.
///
/// Every iteration draws a fresh random key. Iterations whose index is a non-zero multiple of 10
/// emit a `Delete` of the key written by the most recent `Update`; all other iterations emit an
/// `Update` of the fresh key with a value derived from the next RNG output via `to_bytes`.
pub(crate) fn create_test_ops_seeded(
    n: usize,
    seed: u64,
) -> Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>> {
    let mut rng = test_rng_seeded(seed);
    // Seeds `prev_key` so it always holds a key, even before the first update.
    let mut prev_key = Digest::random(&mut rng);
    // Exactly one operation is pushed per iteration, so the final length is known up front.
    let mut ops = Vec::with_capacity(n);
    for i in 0..n {
        // The key is drawn on delete iterations too, keeping RNG consumption per iteration fixed.
        let key = Digest::random(&mut rng);
        if i % 10 == 0 && i > 0 {
            ops.push(unordered::Operation::Delete(prev_key));
        } else {
            let value = to_bytes(rng.next_u64());
            ops.push(unordered::Operation::Update(unordered::Update(key, value)));
            prev_key = key;
        }
    }
    ops
}
/// Writes every operation in `ops` into a single batch, merkleizes it and applies it to `db`.
///
/// `Update` becomes a write of `Some(value)` and `Delete` a write of `None`.
///
/// # Panics
///
/// Panics on a `CommitFloor` operation, or if merkleizing or applying the batch fails.
pub(crate) async fn apply_ops(
    db: &mut AnyTest,
    ops: Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>>,
) {
    let staged = ops.into_iter().fold(db.new_batch(), |pending, op| match op {
        unordered::Operation::Update(unordered::Update(key, value)) => {
            pending.write(key, Some(value))
        }
        unordered::Operation::Delete(key) => pending.write(key, None),
        unordered::Operation::CommitFloor(_, _) => {
            panic!("CommitFloor not supported in apply_ops");
        }
    });
    let merkleized = staged.merkleize(db, None).await.unwrap();
    db.apply_batch(merkleized).await.unwrap();
}
/// Opens the test database using the fixed seed `0`, so every call within one runner targets the
/// same partition names. Panics if initialization fails.
async fn open_db(context: deterministic::Context) -> AnyTest {
    let fixed_cfg = create_test_config(0, &context);
    let opened = AnyTest::init(context, fixed_cfg).await;
    opened.unwrap()
}
/// Runs the shared `test_any_db_build_and_authenticate` scenario against the variable-value
/// database, supplying `open_db` as the reopen hook and `to_bytes` as the value generator.
#[test_traced("WARN")]
pub fn test_any_variable_db_build_and_authenticate() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let initial_db = open_db(context.clone()).await;
        crate::qmdb::any::test::test_any_db_build_and_authenticate(
            context,
            initial_db,
            |reopen_ctx| Box::pin(open_db(reopen_ctx)),
            to_bytes,
        )
        .await;
    });
}
/// Checks that reopening the database restores the last committed state.
///
/// Three times in a row a batch is merkleized but discarded without being applied, the database
/// is dropped and reopened, and the root is asserted to equal the one recorded before the
/// discarded batch. The same writes are then applied and committed for real. Finally the database
/// is synced, pruned to its inactivity floor, synced again and reopened; root, bounds, inactivity
/// floor and snapshot item count must all be unchanged.
#[test_traced("WARN")]
pub fn test_any_variable_db_recovery() {
    const ELEMENTS: u64 = 1000;
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // Deterministic key/value generators shared by every phase.
        let key_of = |i: u64| Sha256::hash(&i.to_be_bytes());
        let first_value = |i: u64| vec![(i % 255) as u8; ((i % 13) + 7) as usize];
        let second_value = |i: u64| vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];

        // Phase 1: initial inserts, first discarded and then applied after a reopen.
        let db = open_db(context.with_label("open1")).await;
        let initial_root = db.root();
        {
            let mut discarded = db.new_batch();
            for i in 0..ELEMENTS {
                discarded = discarded.write(key_of(i), Some(first_value(i)));
            }
            let _ = discarded.merkleize(&db, None).await.unwrap();
        }
        drop(db);

        let mut db = open_db(context.with_label("open2")).await;
        assert_eq!(initial_root, db.root());
        let mut inserts = db.new_batch();
        for i in 0u64..ELEMENTS {
            inserts = inserts.write(key_of(i), Some(first_value(i)));
        }
        let merkleized = inserts.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Phase 2: overwrite every third key, first discarded and then applied after a reopen.
        let populated_root = db.root();
        {
            let mut discarded = db.new_batch();
            for i in (0u64..ELEMENTS).filter(|i| i % 3 == 0) {
                discarded = discarded.write(key_of(i), Some(second_value(i)));
            }
            let _ = discarded.merkleize(&db, None).await.unwrap();
        }
        drop(db);

        let mut db = open_db(context.with_label("open3")).await;
        assert_eq!(populated_root, db.root());
        let mut updates = db.new_batch();
        for i in (0u64..ELEMENTS).filter(|i| i % 3 == 0) {
            updates = updates.write(key_of(i), Some(second_value(i)));
        }
        let merkleized = updates.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Phase 3: delete keys with `i % 7 == 1`, first discarded and then applied after a reopen.
        let updated_root = db.root();
        {
            let mut discarded = db.new_batch();
            for i in (0u64..ELEMENTS).filter(|i| i % 7 == 1) {
                discarded = discarded.write(key_of(i), None);
            }
            let _ = discarded.merkleize(&db, None).await.unwrap();
        }
        drop(db);

        let mut db = open_db(context.with_label("open4")).await;
        assert_eq!(updated_root, db.root());
        let mut deletes = db.new_batch();
        for i in (0u64..ELEMENTS).filter(|i| i % 7 == 1) {
            deletes = deletes.write(key_of(i), None);
        }
        let merkleized = deletes.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Phase 4: sync, prune to the inactivity floor, sync again, then compare after reopen.
        let final_root = db.root();
        let inactivity_floor = db.inactivity_floor_loc();
        db.sync().await.unwrap();
        db.prune(inactivity_floor).await.unwrap();
        let bounds = db.bounds().await;
        let snapshot_items = db.snapshot.items();
        db.sync().await.unwrap();
        drop(db);

        let db = open_db(context.with_label("open5")).await;
        assert_eq!(final_root, db.root());
        assert_eq!(db.bounds().await, bounds);
        assert_eq!(db.inactivity_floor_loc(), inactivity_floor);
        assert_eq!(db.snapshot.items(), snapshot_items);
        db.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_any_variable_db_prune_beyond_inactivity_floor() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut db = open_db(context.clone()).await;
let key1 = Digest::random(&mut context);
let key2 = Digest::random(&mut context);
let key3 = Digest::random(&mut context);
let merkleized = db
.new_batch()
.write(key1, Some(vec![10]))
.write(key2, Some(vec![20]))
.write(key3, Some(vec![30]))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
let inactivity_floor = db.inactivity_floor_loc();
let beyond_floor = Location::new(*inactivity_floor + 1);
let result = db.prune(beyond_floor).await;
assert!(
matches!(result, Err(Error::PruneBeyondMinRequired(loc, floor))
if loc == beyond_floor && floor == inactivity_floor)
);
db.destroy().await.unwrap();
});
}
/// Two batches are merkleized against the same database state. After the first is applied, the
/// second must be rejected with `Error::StaleBatch` and leave root, bounds and stored values
/// untouched.
#[test_traced]
fn test_stale_batch_rejected() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;
        let (key1, key2) = (Sha256::hash(&[1]), Sha256::hash(&[2]));

        // Neither batch is applied before the other is built.
        let first = db.new_batch().write(key1, Some(vec![10]));
        let batch_a = first.merkleize(&db, None).await.unwrap();
        let second = db.new_batch().write(key2, Some(vec![20]));
        let batch_b = second.merkleize(&db, None).await.unwrap();

        db.apply_batch(batch_a).await.unwrap();
        let expected_root = db.root();
        let expected_bounds = db.bounds().await;
        assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
        assert_eq!(db.get(&key2).await.unwrap(), None);

        // The sibling was built on the pre-`batch_a` state and must now be refused.
        let result = db.apply_batch(batch_b).await;
        assert!(
            matches!(result, Err(Error::StaleBatch { .. })),
            "expected StaleBatch error, got {result:?}"
        );

        // The failed apply must not have changed anything observable.
        assert_eq!(db.root(), expected_root);
        assert_eq!(db.bounds().await, expected_bounds);
        assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
        assert_eq!(db.get(&key2).await.unwrap(), None);

        db.destroy().await.unwrap();
    });
}
/// Same as the stale-sibling check, but the rejected sibling holds five writes while the applied
/// batch holds one, so their `total_size` values differ.
#[test_traced]
fn test_stale_batch_rejected_different_sizes() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;

        let small = db.new_batch().write(Sha256::hash(&[1]), Some(vec![10]));
        let batch_a = small.merkleize(&db, None).await.unwrap();

        let mut large = db.new_batch();
        for (key_byte, value_byte) in [(2u8, 20u8), (3, 30), (4, 40), (5, 50), (6, 60)] {
            large = large.write(Sha256::hash(&[key_byte]), Some(vec![value_byte]));
        }
        let batch_b = large.merkleize(&db, None).await.unwrap();

        assert!(batch_b.total_size > batch_a.total_size);

        db.apply_batch(batch_a).await.unwrap();
        let result = db.apply_batch(batch_b).await;
        assert!(
            matches!(result, Err(Error::StaleBatch { .. })),
            "expected StaleBatch for asymmetric sibling, got {result:?}"
        );

        db.destroy().await.unwrap();
    });
}
/// Builds the chain `a -> b -> c`, applies `a` and then `c` directly (never `b` on its own), and
/// checks that the database ends at `c`'s root with all three keys present.
#[test_traced]
fn test_partial_ancestor_commit() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;
        let (key1, key2, key3) = (Sha256::hash(&[1]), Sha256::hash(&[2]), Sha256::hash(&[3]));

        let pending_a = db.new_batch().write(key1, Some(vec![10]));
        let a = pending_a.merkleize(&db, None).await.unwrap();

        let pending_b = a.new_batch::<Sha256>().write(key2, Some(vec![20]));
        let b = pending_b.merkleize(&db, None).await.unwrap();

        let pending_c = b.new_batch::<Sha256>().write(key3, Some(vec![30]));
        let c = pending_c.merkleize(&db, None).await.unwrap();

        let expected_root = c.root();

        // `b` is skipped as a standalone apply; its write must still land via `c`.
        db.apply_batch(a).await.unwrap();
        db.apply_batch(c).await.unwrap();

        assert_eq!(db.root(), expected_root);
        assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
        assert_eq!(db.get(&key2).await.unwrap(), Some(vec![20]));
        assert_eq!(db.get(&key3).await.unwrap(), Some(vec![30]));

        db.destroy().await.unwrap();
    });
}
/// Two children are forked from the same unapplied parent batch. Applying one child must make
/// the other fail with `Error::StaleBatch`.
#[test_traced]
fn test_stale_batch_chained() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;
        let (key1, key2, key3) = (Sha256::hash(&[1]), Sha256::hash(&[2]), Sha256::hash(&[3]));

        // Put one applied batch in place before forking.
        let seed_batch = db.new_batch().write(key1, Some(vec![10]));
        let merkleized = seed_batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();

        let pending_parent = db.new_batch().write(key2, Some(vec![20]));
        let parent = pending_parent.merkleize(&db, None).await.unwrap();

        // Both children write the same key with different values.
        let pending_a = parent.new_batch::<Sha256>().write(key3, Some(vec![30]));
        let child_a = pending_a.merkleize(&db, None).await.unwrap();
        let pending_b = parent.new_batch::<Sha256>().write(key3, Some(vec![40]));
        let child_b = pending_b.merkleize(&db, None).await.unwrap();

        db.apply_batch(child_a).await.unwrap();
        let result = db.apply_batch(child_b).await;
        assert!(
            matches!(result, Err(Error::StaleBatch { .. })),
            "expected StaleBatch error for sibling, got {result:?}"
        );

        db.destroy().await.unwrap();
    });
}
/// Applying a parent batch and then its child must succeed and expose both writes.
#[test_traced]
fn test_sequential_commit_parent_then_child() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;
        let (key1, key2) = (Sha256::hash(&[1]), Sha256::hash(&[2]));

        let pending_parent = db.new_batch().write(key1, Some(vec![10]));
        let parent = pending_parent.merkleize(&db, None).await.unwrap();

        let pending_child = parent.new_batch::<Sha256>().write(key2, Some(vec![20]));
        let child = pending_child.merkleize(&db, None).await.unwrap();

        db.apply_batch(parent).await.unwrap();
        db.apply_batch(child).await.unwrap();

        assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
        assert_eq!(db.get(&key2).await.unwrap(), Some(vec![20]));

        db.destroy().await.unwrap();
    });
}
/// Applying a child batch first succeeds; applying its parent afterwards must fail with
/// `Error::StaleBatch`.
#[test_traced]
fn test_stale_batch_child_applied_before_parent() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;
        let (key1, key2) = (Sha256::hash(&[1]), Sha256::hash(&[2]));

        let pending_parent = db.new_batch().write(key1, Some(vec![10]));
        let parent = pending_parent.merkleize(&db, None).await.unwrap();

        let pending_child = parent.new_batch::<Sha256>().write(key2, Some(vec![20]));
        let child = pending_child.merkleize(&db, None).await.unwrap();

        db.apply_batch(child).await.unwrap();
        let result = db.apply_batch(parent).await;
        assert!(
            matches!(result, Err(Error::StaleBatch { .. })),
            "expected StaleBatch for parent after child applied, got {result:?}"
        );

        db.destroy().await.unwrap();
    });
}
// Implements `FromSyncTestable` for `AnyTest`, exposing the log internals that the sync tests
// in `crate::qmdb::any::sync::tests` consume.
mod from_sync_testable {
use super::*;
use crate::{
merkle::{
mmr::{self, journaled::Mmr},
Family as _,
},
qmdb::any::sync::tests::FromSyncTestable,
};
use futures::future::join_all;
// Journaled MMR over the deterministic runtime context with `Digest` nodes.
type TestMmr = Mmr<deterministic::Context, Digest>;
impl FromSyncTestable for AnyTest {
type Mmr = TestMmr;
// Consumes the database and hands back the log's merkle structure and journal.
fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
(self.log.merkle, self.log.journal)
}
// Looks up every position yielded by `mmr::Family::nodes_to_pin(loc)` in the log's merkle
// structure, awaiting all lookups together via `join_all`, and collects the digests in the
// order the positions were yielded. Panics if any lookup fails or returns `None`.
async fn pinned_nodes_at(&self, loc: mmr::Location) -> Vec<Digest> {
join_all(mmr::Family::nodes_to_pin(loc).map(|p| self.log.merkle.get_node(p)))
.await
.into_iter()
.map(|n| n.unwrap().unwrap())
.collect()
}
}
}
// Compile-time check only (never called, hence `dead_code`): a future that awaits `db.get` on
// the immutable variable database must satisfy the `Send` bound spelled out in the return type.
// It is written as a plain `fn` returning `impl Future` rather than an `async fn` (hence
// `clippy::manual_async_fn`) so that the `Send` bound is explicit in the signature.
// NOTE(review): the name refers to issue 2787 — confirm the tracker link.
#[allow(dead_code, clippy::manual_async_fn)]
fn issue_2787_regression(
db: &crate::qmdb::immutable::variable::Db<
mmr::Family,
deterministic::Context,
Digest,
Vec<u8>,
Sha256,
TwoCap,
>,
key: Digest,
) -> impl std::future::Future<Output = ()> + Send + use<'_> {
async move {
// The lookup result is irrelevant; only the future's auto traits matter.
let _ = db.get(&key).await;
}
}
/// Accepts any value whose type is `Send`; calling it with a non-`Send` value fails to compile.
fn is_send<T>(_: T)
where
    T: Send,
{
}
/// Compile-time check only (never called): the futures returned by `merkleize` on a batch and by
/// `get_with_loc` on the database must be `Send`.
#[allow(dead_code)]
fn assert_non_trait_futures_are_send(db: &AnyTest, key: Digest, value: Vec<u8>) {
    let pending = db.new_batch().write(key, Some(value));
    let merkleize_future = pending.merkleize(db, None);
    is_send(merkleize_future);
    let lookup_future = db.get_with_loc(&key);
    is_send(lookup_future);
}
/// The same operations written through `db.new_batch()` and through a batch derived from
/// `db.to_batch()` must merkleize to the same root.
#[test_traced("WARN")]
fn test_owned_batch_root_matches() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = create_test_db(context).await;
        apply_ops(&mut db, create_test_ops(20)).await;
        db.commit().await.unwrap();

        let base = db.to_batch();
        let ops = create_test_ops_seeded(10, 99);

        // Path 1: batch created directly from the database.
        let from_db = ops.iter().fold(db.new_batch(), |pending, op| match op {
            unordered::Operation::Update(unordered::Update(k, v)) => {
                pending.write(*k, Some(v.clone()))
            }
            unordered::Operation::Delete(k) => pending.write(*k, None),
            _ => unreachable!(),
        });
        let borrow_root = from_db.merkleize(&db, None).await.unwrap().root();

        // Path 2: batch created from the owned base batch.
        let from_base = ops
            .iter()
            .fold(base.new_batch::<Sha256>(), |pending, op| match op {
                unordered::Operation::Update(unordered::Update(k, v)) => {
                    pending.write(*k, Some(v.clone()))
                }
                unordered::Operation::Delete(k) => pending.write(*k, None),
                _ => unreachable!(),
            });
        let batch_root = from_base.merkleize(&db, None).await.unwrap().root();

        assert_eq!(borrow_root, batch_root);
        db.destroy().await.unwrap();
    });
}
#[test_traced("WARN")]
fn test_owned_batch_apply() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(20)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key = Digest::random(&mut commonware_utils::test_rng_seeded(200));
let value = vec![42u8; 16];
let child_batch = base
.new_batch::<Sha256>()
.write(key, Some(value.clone()))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
let fetched = db.get(&key).await.unwrap();
assert_eq!(fetched.unwrap(), value);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_owned_batch_chain_commit_parent_first() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(10)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(300));
let val_a = vec![1u8; 10];
let parent_batch = base
.new_batch::<Sha256>()
.write(key_a, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(301));
let val_b = vec![2u8; 10];
let child_batch = parent_batch
.new_batch::<Sha256>()
.write(key_b, Some(val_b.clone()))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(parent_batch).await.unwrap();
db.commit().await.unwrap();
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_owned_batch_multiple_forks() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(10)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(400));
let fork_a = base
.new_batch::<Sha256>()
.write(key_a, Some(vec![10u8; 8]))
.merkleize(&db, None)
.await
.unwrap();
let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(401));
let fork_b = base
.new_batch::<Sha256>()
.write(key_b, Some(vec![20u8; 8]))
.merkleize(&db, None)
.await
.unwrap();
assert_ne!(fork_a.root(), fork_b.root());
db.apply_batch(fork_a).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_a).await.unwrap().unwrap(), vec![10u8; 8]);
assert!(db.get(&key_b).await.unwrap().is_none());
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_owned_batch_homogeneous_collection() {
use crate::qmdb::any::batch::MerkleizedBatch;
use commonware_cryptography::sha256;
use std::collections::HashMap;
type Snap = MerkleizedBatch<mmr::Family, sha256::Digest, super::Update<Digest, Vec<u8>>>;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(10)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let mut collection: HashMap<sha256::Digest, Arc<Snap>> = HashMap::new();
let key = Digest::random(&mut commonware_utils::test_rng_seeded(500));
let batch1 = base
.new_batch::<Sha256>()
.write(key, Some(vec![1u8; 8]))
.merkleize(&db, None)
.await
.unwrap();
collection.insert(batch1.root(), batch1);
let batch1_root = *collection.keys().next().unwrap();
let batch1_ref = collection.get(&batch1_root).unwrap();
let key = Digest::random(&mut commonware_utils::test_rng_seeded(501));
let batch2 = batch1_ref
.new_batch::<Sha256>()
.write(key, Some(vec![2u8; 8]))
.merkleize(&db, None)
.await
.unwrap();
collection.insert(batch2.root(), batch2);
assert_eq!(collection.len(), 2);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_owned_batch_chain_delete_after_ancestor_insert() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(5)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(700));
let val_a = vec![10u8; 8];
let parent_batch = base
.new_batch::<Sha256>()
.write(key_x, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let child_batch = parent_batch
.new_batch::<Sha256>()
.write(key_x, None)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(parent_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
assert!(db.get(&key_x).await.unwrap().is_none());
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_owned_batch_chain_overlapping_keys() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(5)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(600));
let val_a = vec![10u8; 8];
let parent_batch = base
.new_batch::<Sha256>()
.write(key_x, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let val_b = vec![20u8; 8];
let child_batch = parent_batch
.new_batch::<Sha256>()
.write(key_x, Some(val_b.clone()))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(parent_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_owned_batch_chain_three_deep() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(10)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(900));
let val_a = vec![1u8; 10];
let grandparent_batch = base
.new_batch::<Sha256>()
.write(key_a, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(901));
let val_b = vec![2u8; 10];
let parent_batch = grandparent_batch
.new_batch::<Sha256>()
.write(key_b, Some(val_b.clone()))
.merkleize(&db, None)
.await
.unwrap();
let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(902));
let val_c = vec![3u8; 10];
let child_batch = parent_batch
.new_batch::<Sha256>()
.write(key_c, Some(val_c.clone()))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(grandparent_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
db.apply_batch(parent_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_owned_batch_chain_three_deep_overlapping_key() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(5)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(910));
let val_a = vec![10u8; 8];
let grandparent_batch = base
.new_batch::<Sha256>()
.write(key_x, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let val_b = vec![20u8; 8];
let parent_batch = grandparent_batch
.new_batch::<Sha256>()
.write(key_x, Some(val_b.clone()))
.merkleize(&db, None)
.await
.unwrap();
let child_batch = parent_batch
.new_batch::<Sha256>()
.write(key_x, None)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(grandparent_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
db.apply_batch(parent_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
assert!(db.get(&key_x).await.unwrap().is_none());
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_new_child_after_ancestor_committed_and_dropped() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(5)).await;
db.commit().await.unwrap();
let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(800));
let val_a = vec![10u8; 8];
let a = db
.new_batch()
.write(key_a, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(801));
let val_b = vec![20u8; 8];
let b = a
.new_batch::<Sha256>()
.write(key_b, Some(val_b.clone()))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(a).await.unwrap();
db.commit().await.unwrap();
let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(802));
let val_c = vec![30u8; 8];
let c = b
.new_batch::<Sha256>()
.write(key_c, Some(val_c.clone()))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(b).await.unwrap();
db.commit().await.unwrap();
db.apply_batch(c).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_apply_batch_after_ancestor_dropped_without_commit() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(5)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(700));
let val_a = vec![1u8; 10];
let a = base
.new_batch::<Sha256>()
.write(key_a, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(701));
let val_b = vec![2u8; 10];
let b = a
.new_batch::<Sha256>()
.write(key_b, Some(val_b.clone()))
.merkleize(&db, None)
.await
.unwrap();
let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(702));
let val_c = vec![3u8; 10];
let c = b
.new_batch::<Sha256>()
.write(key_c, Some(val_c.clone()))
.merkleize(&db, None)
.await
.unwrap();
drop(a);
drop(b);
db.apply_batch(c).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
db.destroy().await.unwrap();
});
}
}