use crate::{
index::Factory as IndexFactory,
journal::{
authenticated::Inner,
contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
},
merkle::{self, full::Config as MerkleConfig, Location},
qmdb::{
self,
any::{
self,
operation::{Operation, Update},
Config as AnyConfig,
},
bitmap::Shared,
operation::Committable,
},
translator::Translator,
Context,
};
use commonware_codec::{CodecShared, FixedSize};
use commonware_cryptography::Hasher;
use commonware_parallel::Strategy;
use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex};
use std::sync::Arc;
pub mod batch;
pub mod db;
pub mod grafting;
pub mod ordered;
pub mod proof;
pub(crate) mod sync;
pub mod unordered;
use self::db::Metrics;
/// Configuration for a `current` database, generic over the journal configuration `J`.
///
/// Converting this into an [`AnyConfig`] keeps `merkle_config`, `journal_config`, and
/// `translator`; `grafted_metadata_partition` has no counterpart there and is dropped.
#[derive(Clone)]
pub struct Config<T: Translator, J, S: Strategy> {
/// Merkle configuration, forwarded unchanged into [`AnyConfig`].
pub merkle_config: MerkleConfig<S>,
/// Journal configuration, forwarded unchanged into [`AnyConfig`].
pub journal_config: J,
/// Partition name that `init` hands to `db::init_metadata`.
pub grafted_metadata_partition: String,
/// Translator, forwarded unchanged into [`AnyConfig`].
pub translator: T,
}
impl<T: Translator, J, S: Strategy> From<Config<T, J, S>> for AnyConfig<T, J, S> {
fn from(cfg: Config<T, J, S>) -> Self {
Self {
merkle_config: cfg.merkle_config,
journal_config: cfg.journal_config,
translator: cfg.translator,
}
}
}
/// [`Config`] whose journal configuration is the fixed contiguous journal's `Config`.
pub type FixedConfig<T, S> = Config<T, FConfig, S>;
/// [`Config`] whose journal configuration is the variable contiguous journal's `Config`,
/// parameterized by the codec configuration `C`.
pub type VariableConfig<T, C, S> = Config<T, VConfig<C>, S>;
pub(super) async fn init<F, E, U, H, T, I, J, const N: usize, S>(
context: E,
config: Config<T, J::Config, S>,
) -> Result<db::Db<F, E, J, I, H, U, N, S>, crate::qmdb::Error<F>>
where
F: merkle::Graftable,
E: Context,
U: Update + Send + Sync,
H: Hasher,
T: Translator,
I: IndexFactory<T, Value = Location<F>>,
J: Inner<E, Item = Operation<F, U>>,
S: Strategy,
Operation<F, U>: Committable + CodecShared,
{
const {
assert!(
N.is_multiple_of(H::Digest::SIZE),
"chunk size must be some multiple of the digest size",
);
assert!(N.is_power_of_two(), "chunk size must be a power of 2");
}
let strategy = config.merkle_config.strategy.clone();
let metadata_partition = config.grafted_metadata_partition.clone();
let (metadata, pruned_chunks, pinned_nodes) =
db::init_metadata(context.child("metadata"), &metadata_partition).await?;
let bitmap = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
.map_err(|_| crate::qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;
let bitmap = Arc::new(Shared::<N>::new(bitmap));
let any = any::init_with_bitmap(context.child("any"), config.into(), Some(bitmap)).await?;
let hasher = qmdb::hasher::<H>();
let ops_size = any.log.merkle.size();
let ops_leaves = crate::merkle::Location::<F>::try_from(ops_size)?;
let grafted_tree = db::build_grafted_tree::<F, H, S, N>(
&hasher,
any.bitmap.as_ref(),
&pinned_nodes,
&any.log.merkle,
ops_leaves,
&strategy,
)
.await?;
let storage = grafting::Storage::new(
&grafted_tree,
grafting::height::<N>(),
&any.log.merkle,
hasher.clone(),
);
let partial_chunk = db::partial_chunk(any.bitmap.as_ref());
let ops_root = any.root();
let root = db::compute_db_root(
&hasher,
any.bitmap.as_ref(),
&storage,
ops_leaves,
partial_chunk,
any.inactivity_floor_loc,
&ops_root,
)
.await?;
let metrics = Metrics::new(context);
let db = db::Db {
any,
grafted_tree,
metadata: AsyncMutex::new(metadata),
strategy,
root,
metrics,
};
db.update_metrics();
Ok(db)
}
/// Bitmap inspection hooks, compiled only for tests or with the `test-traits` feature.
///
/// NOTE(review): the exact meaning of each accessor is defined by the implementors, which are
/// not visible in this file; the descriptions below are inferred from names and usage.
#[cfg(any(test, feature = "test-traits"))]
pub trait BitmapPrunedBits {
/// Presumably the number of bitmap bits that have been pruned; tests in this module only
/// require it to be non-zero after `prune()` and unchanged after a reopen.
fn pruned_bits(&self) -> u64;
/// Presumably the value of the bitmap bit at `index` — confirm against implementors.
fn get_bit(&self, index: u64) -> bool;
/// Asynchronously yields a `u64`; presumably the oldest retained location — TODO confirm.
fn oldest_retained(&self) -> impl core::future::Future<Output = u64> + Send;
}
#[cfg(test)]
pub mod tests {
pub use super::BitmapPrunedBits;
use super::{ordered, unordered, FConfig, FixedConfig, MerkleConfig, VConfig, VariableConfig};
use crate::{
merkle::{self, mmb, mmr, Bagging::ForwardFold},
qmdb::{
self,
any::{
test::colliding_digest,
traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
},
store::tests::{TestKey, TestValue},
},
translator::Translator,
};
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::paged::CacheRef,
deterministic::{self, Context},
BufferPooler, Runner as _, Supervisor as _,
};
use commonware_utils::{bitmap::Readable, NZUsize, NZU16, NZU64};
use core::future::Future;
use rand::{rngs::StdRng, RngCore, SeedableRng};
use std::{
num::{NonZeroU16, NonZeroUsize},
sync::Arc,
};
use tracing::warn;
// Shorthand for the QMDB error type over merkle family `F`.
type Error<F> = crate::qmdb::Error<F>;
// Shorthand for a merkle location over family `F`.
type Location<F> = merkle::Location<F>;
// A list of pending writes for database `C`: `Some(value)` for an update, `None` for a delete.
type WriteVec<F, C> = Vec<(<C as DbAny<F>>::Key, Option<<C as DbAny<F>>::Value>)>;
// Page size and page-cache size passed to `CacheRef::from_pooler` by the test configs below.
const PAGE_SIZE: NonZeroU16 = NZU16!(88);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8);
/// Builds a [`FixedConfig`] for tests.
///
/// Every partition name is derived from `partition_prefix`, and the merkle and journal
/// configurations share a single page cache created from `pooler`.
pub(crate) fn fixed_config<T: Translator + Default>(
    partition_prefix: &str,
    pooler: &impl BufferPooler,
) -> FixedConfig<T, Sequential> {
    let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);

    let merkle_config = MerkleConfig {
        journal_partition: format!("{partition_prefix}-journal-partition"),
        metadata_partition: format!("{partition_prefix}-metadata-partition"),
        items_per_blob: NZU64!(11),
        write_buffer: NZUsize!(1024),
        strategy: Sequential,
        page_cache: page_cache.clone(),
    };

    let journal_config = FConfig {
        partition: format!("{partition_prefix}-partition-prefix"),
        items_per_blob: NZU64!(7),
        page_cache,
        write_buffer: NZUsize!(1024),
    };

    FixedConfig {
        merkle_config,
        journal_config,
        grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
        translator: T::default(),
    }
}
/// Builds a [`VariableConfig`] for tests with no compression and a `((), ())` codec config.
///
/// Every partition name is derived from `partition_prefix`, and the merkle and journal
/// configurations share a single page cache created from `pooler`.
pub(crate) fn variable_config<T: Translator + Default>(
    partition_prefix: &str,
    pooler: &impl BufferPooler,
) -> VariableConfig<T, ((), ()), Sequential> {
    let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);

    let merkle_config = MerkleConfig {
        journal_partition: format!("{partition_prefix}-journal-partition"),
        metadata_partition: format!("{partition_prefix}-metadata-partition"),
        items_per_blob: NZU64!(11),
        write_buffer: NZUsize!(1024),
        strategy: Sequential,
        page_cache: page_cache.clone(),
    };

    let journal_config = VConfig {
        partition: format!("{partition_prefix}-partition-prefix"),
        items_per_section: NZU64!(7),
        compression: None,
        codec_config: ((), ()),
        page_cache,
        write_buffer: NZUsize!(1024),
    };

    VariableConfig {
        merkle_config,
        journal_config,
        grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
        translator: T::default(),
    }
}
/// Stages `writes` in a new batch, merkleizes it without metadata, applies it, and commits.
///
/// A `None` value in a write is passed through to the batch as-is (the tests use it for
/// deletions). Any error from merkleize, apply, or commit is returned to the caller.
async fn commit_writes<F: merkle::Graftable, C: DbAny<F>>(
    db: &mut C,
    writes: impl IntoIterator<Item = (C::Key, Option<<C as DbAny<F>>::Value>)>,
) -> Result<(), Error<F>> {
    let staged = writes
        .into_iter()
        .fold(db.new_batch(), |batch, (key, value)| batch.write(key, value));
    let merkleized = staged.merkleize(db, None).await?;
    db.apply_batch(merkleized).await?;
    db.commit().await?;
    Ok(())
}
/// Generates a pseudo-random workload over `num_elements` keys from `rng_seed`.
///
/// The workload is one initial write per key followed by `num_elements * 10` random updates
/// or deletes (a delete whenever a `u32` draw is divisible by 7). It only reaches the database
/// when `commit_changes` is true; otherwise the generated writes are discarded and `db` is
/// returned untouched.
async fn apply_random_ops_inner<F, C>(
    num_elements: u64,
    commit_changes: bool,
    rng_seed: u64,
    mut db: C,
) -> Result<C, Error<F>>
where
    F: merkle::Graftable,
    C: DbAny<F>,
    C::Key: TestKey,
    <C as DbAny<F>>::Value: TestValue,
{
    // Surface the seed so a failing run can be reproduced.
    warn!("rng_seed={}", rng_seed);
    let mut rng = StdRng::seed_from_u64(rng_seed);

    // Phase 1: give every key an initial value. The values are drawn even when nothing is
    // committed, so the RNG stream is the same in both modes.
    let mut initial: WriteVec<F, C> = Vec::new();
    for seed in 0u64..num_elements {
        let value = TestValue::from_seed(rng.next_u64());
        initial.push((TestKey::from_seed(seed), Some(value)));
    }
    if commit_changes {
        commit_writes(&mut db, initial).await?;
    }

    // Phase 2: random updates and deletes, flushed at random points when committing.
    let mut queued: WriteVec<F, C> = Vec::new();
    for _ in 0u64..num_elements * 10 {
        let target = TestKey::from_seed(rng.next_u64() % num_elements);
        let is_delete = rng.next_u32() % 7 == 0;
        if is_delete {
            queued.push((target, None));
        } else {
            let value = TestValue::from_seed(rng.next_u64());
            queued.push((target, Some(value)));
            // The flush draw is only consumed in commit mode (short-circuit).
            if commit_changes && rng.next_u32() % 20 == 0 {
                commit_writes(&mut db, queued.drain(..)).await?;
            }
        }
    }

    if commit_changes {
        commit_writes(&mut db, queued).await?;
    }
    Ok(db)
}
/// Boxed entry point for [`apply_random_ops_inner`].
///
/// Takes the same arguments and returns the same result, with the future pinned on the heap.
pub fn apply_random_ops<F, C>(
    num_elements: u64,
    commit_changes: bool,
    rng_seed: u64,
    db: C,
) -> std::pin::Pin<Box<dyn Future<Output = Result<C, Error<F>>>>>
where
    F: merkle::Graftable + 'static,
    C: DbAny<F> + 'static,
    C::Key: TestKey,
    <C as DbAny<F>>::Value: TestValue,
{
    let workload = apply_random_ops_inner::<F, C>(num_elements, commit_changes, rng_seed, db);
    Box::pin(workload)
}
/// Builds a randomly populated database, syncs it, reopens it, and checks the root survives.
///
/// The whole scenario runs twice on fresh deterministic runners (first with a clone of
/// `open_db`, then with `open_db` itself), and the auditor states of the two runs are asserted
/// equal.
pub fn test_build_random_close_reopen<M, C, F, Fut>(mut open_db: F)
where
M: merkle::Graftable + 'static,
C: DbAny<M> + 'static,
C::Key: TestKey,
<C as DbAny<M>>::Value: TestValue,
F: FnMut(Context, String) -> Fut + Clone,
Fut: Future<Output = C>,
{
const ELEMENTS: u64 = 1000;
// First run: uses a clone of the opener so the original stays available for the second run.
let executor = deterministic::Runner::default();
let mut open_db_clone = open_db.clone();
let state1 = executor.start(|mut context| async move {
let partition = "build-random".to_string();
// The workload seed is drawn from the runtime context.
let rng_seed = context.next_u64();
let mut db: C = open_db_clone(context.child("first"), partition.clone()).await;
db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
.await
.unwrap();
// Apply one more (empty) batch and sync before recording the root.
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.sync().await.unwrap();
let root = db.root();
drop(db);
// Reopening the same partition must reproduce the recorded root.
let db: C = open_db_clone(context.child("second"), partition).await;
assert_eq!(db.root(), root);
db.destroy().await.unwrap();
context.auditor().state()
});
// Second run: identical steps on a brand-new runner.
let executor = deterministic::Runner::default();
let state2 = executor.start(|mut context| async move {
let partition = "build-random".to_string();
let rng_seed = context.next_u64();
let mut db: C = open_db(context.child("first"), partition.clone()).await;
db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
.await
.unwrap();
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.sync().await.unwrap();
let root = db.root();
drop(db);
let db: C = open_db(context.child("second"), partition).await;
assert_eq!(db.root(), root);
db.destroy().await.unwrap();
context.auditor().state()
});
// Both runs must leave the auditor in the same state.
assert_eq!(state1, state2);
}
/// Checks that dropping a database without committing further work leaves the last committed
/// state intact, and that a replayed history reproduces the same root.
///
/// Scenario 1 drops the db after generating an uncommitted workload and expects the reopened
/// db to match the committed root and op count. Scenario 2 commits that workload, reopens, and
/// compares the resulting root with a database rebuilt from scratch in a fresh partition.
pub fn test_simulate_write_failures<M, C, F, Fut>(mut open_db: F)
where
M: merkle::Graftable + 'static,
C: DbAny<M> + 'static,
C::Key: TestKey,
<C as DbAny<M>>::Value: TestValue,
F: FnMut(Context, String) -> Fut + Clone,
Fut: Future<Output = C>,
{
const ELEMENTS: u64 = 1000;
let executor = deterministic::Runner::default();
executor.start(|mut context| {
Box::pin(async move {
let partition = "build-random-fail-commit".to_string();
let rng_seed = context.next_u64();
let mut db: C = open_db(context.child("first"), partition.clone()).await;
db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
.await
.unwrap();
// Commit an empty write set, then record the committed root and op count.
commit_writes(&mut db, []).await.unwrap();
let committed_root = db.root();
let committed_op_count = db.bounds().await.end;
db.prune(db.sync_boundary().await).await.unwrap();
// Scenario 1: with `commit_changes == false` the helper never calls `commit_writes`, so
// nothing from this workload is applied before the db is dropped.
let db = apply_random_ops::<M, C>(ELEMENTS, false, rng_seed + 1, db)
.await
.unwrap();
drop(db);
let db: C = open_db(
context.child("scenario").with_attribute("index", 1),
partition.clone(),
)
.await;
// The reopened db must still be at the last committed state.
assert_eq!(db.root(), committed_root);
assert_eq!(db.bounds().await.end, committed_op_count);
// Scenario 2: run the same workload again, this time committing it.
let db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
.await
.unwrap();
let committed_op_count = db.bounds().await.end;
drop(db);
let db: C = open_db(
context.child("scenario").with_attribute("index", 2),
partition.clone(),
)
.await;
let scenario_2_root = db.root();
// Rebuild the same history from scratch in a separate partition and compare.
// NOTE(review): only this fresh database is destroyed; the handle reopened above is
// shadowed and the original partition is left in place.
let fresh_partition = "build-random-fail-commit-fresh".to_string();
let mut db: C = open_db(context.child("fresh"), fresh_partition.clone()).await;
db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
.await
.unwrap();
commit_writes(&mut db, []).await.unwrap();
db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
.await
.unwrap();
db.prune(db.sync_boundary().await).await.unwrap();
assert_eq!(db.bounds().await.end, committed_op_count);
assert_eq!(db.root(), scenario_2_root);
db.destroy().await.unwrap();
})
});
}
/// Checks that pruning does not affect the root or the inactivity floor.
///
/// Two databases receive the same writes and commit every 50 operations. Only one of them is
/// pruned after each commit; both must end with equal roots and equal inactivity floors.
pub fn test_different_pruning_delays_same_root<M, C, F, Fut>(mut open_db: F)
where
    M: merkle::Graftable,
    C: DbAny<M>,
    C::Key: TestKey,
    <C as DbAny<M>>::Value: TestValue,
    F: FnMut(Context, String) -> Fut + Clone,
    Fut: Future<Output = C>,
{
    const NUM_OPERATIONS: u64 = 1000;
    let runner = deterministic::Runner::default();
    let mut open_unpruned = open_db.clone();
    runner.start(|context| async move {
        let mut unpruned: C =
            open_unpruned(context.child("no_pruning"), "no-pruning-test".into()).await;
        let mut pruned: C = open_db(context.child("pruning"), "pruning-test".into()).await;

        let mut unpruned_writes: WriteVec<M, C> = Vec::new();
        let mut pruned_writes: WriteVec<M, C> = Vec::new();
        for i in 0..NUM_OPERATIONS {
            let key: C::Key = TestKey::from_seed(i);
            let value: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
            unpruned_writes.push((key, Some(value.clone())));
            pruned_writes.push((key, Some(value)));

            // Only commit at the end of every block of 50 operations.
            if i % 50 != 49 {
                continue;
            }
            commit_writes(&mut unpruned, unpruned_writes.drain(..))
                .await
                .unwrap();
            commit_writes(&mut pruned, pruned_writes.drain(..))
                .await
                .unwrap();
            // The prune target is taken from the never-pruned database's sync boundary.
            let boundary = unpruned.sync_boundary().await;
            pruned.prune(boundary).await.unwrap();
        }

        // Flush whatever is left after the final periodic commit.
        commit_writes(&mut unpruned, unpruned_writes)
            .await
            .unwrap();
        commit_writes(&mut pruned, pruned_writes).await.unwrap();

        assert_eq!(unpruned.root(), pruned.root());
        assert_eq!(
            unpruned.inactivity_floor_loc().await,
            pruned.inactivity_floor_loc().await
        );

        unpruned.destroy().await.unwrap();
        pruned.destroy().await.unwrap();
    });
}
/// Checks that `sync()` persists the bitmap pruning boundary.
///
/// Populates a database, prunes it, records `pruned_bits()` and the root, syncs, and then
/// expects a reopened database to report the same pruned bit count and root.
pub fn test_sync_persists_bitmap_pruning_boundary<M, C, F, Fut>(mut open_db: F)
where
    M: merkle::Graftable + 'static,
    C: DbAny<M> + BitmapPrunedBits + 'static,
    C::Key: TestKey,
    <C as DbAny<M>>::Value: TestValue,
    F: FnMut(Context, String) -> Fut + Clone,
    Fut: Future<Output = C>,
{
    const ELEMENTS: u64 = 500;
    let runner = deterministic::Runner::default();
    let mut open_first = open_db.clone();
    runner.start(|mut context| async move {
        let partition = String::from("sync-bitmap-pruning");
        let seed = context.next_u64();

        let fresh: C = open_first(context.child("first"), partition.clone()).await;
        let mut db = apply_random_ops::<M, C>(ELEMENTS, true, seed, fresh)
            .await
            .unwrap();

        // Apply one more (empty) batch, then prune up to the sync boundary.
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.prune(db.sync_boundary().await).await.unwrap();

        let pruned_bits_before = db.pruned_bits();
        warn!(
            "pruned_bits_before={}, inactivity_floor={}, op_count={}",
            pruned_bits_before,
            *db.inactivity_floor_loc().await,
            *db.bounds().await.end
        );
        assert!(
            pruned_bits_before > 0,
            "Expected bitmap to have pruned bits after prune()"
        );

        db.sync().await.unwrap();
        let root_before = db.root();
        drop(db);

        // Reopen and verify that the pruning state and the root both survived.
        let reopened: C = open_db(context.child("second"), partition).await;
        let pruned_bits_after = reopened.pruned_bits();
        warn!("pruned_bits_after={}", pruned_bits_after);
        assert_eq!(
            pruned_bits_after, pruned_bits_before,
            "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
        );
        assert_eq!(reopened.root(), root_before);
        reopened.destroy().await.unwrap();
    });
}
/// Builds a database from one large batch and checks it against an in-memory reference map.
///
/// The batch writes `ELEMENTS` keys, overwrites every third key, and deletes every key whose
/// index is congruent to 1 modulo 7. After sync, prune, and reopen, the root must be unchanged
/// and every key must read back exactly as the reference map says.
///
/// The reopened database is destroyed at the end, matching the other generic tests in this
/// module, so the partition does not outlive the test.
pub fn test_current_db_build_big<M, C, F, Fut>(mut open_db: F)
where
    M: merkle::Graftable,
    C: DbAny<M>,
    C::Key: TestKey,
    <C as DbAny<M>>::Value: TestValue,
    F: FnMut(Context, String) -> Fut + Clone,
    Fut: Future<Output = C>,
{
    const ELEMENTS: u64 = 1000;
    let executor = deterministic::Runner::default();
    let mut open_db_clone = open_db.clone();
    executor.start(|context| async move {
        let mut db: C = open_db_clone(context.child("first"), "build-big".into()).await;

        // Reference model of the expected key/value state.
        let mut map = std::collections::HashMap::<C::Key, <C as DbAny<M>>::Value>::default();
        let mut batch = db.new_batch();

        // Initial value for every key.
        for i in 0u64..ELEMENTS {
            let k: C::Key = TestKey::from_seed(i);
            let v: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
            batch = batch.write(k, Some(v.clone()));
            map.insert(k, v);
        }

        // Overwrite every third key.
        for i in 0u64..ELEMENTS {
            if i % 3 != 0 {
                continue;
            }
            let k: C::Key = TestKey::from_seed(i);
            let v: <C as DbAny<M>>::Value = TestValue::from_seed((i + 1) * 10000);
            batch = batch.write(k, Some(v.clone()));
            map.insert(k, v);
        }

        // Delete every key whose index is 1 mod 7.
        for i in 0u64..ELEMENTS {
            if i % 7 != 1 {
                continue;
            }
            let k: C::Key = TestKey::from_seed(i);
            batch = batch.write(k, None);
            map.remove(&k);
        }

        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.sync().await.unwrap();
        db.prune(db.sync_boundary().await).await.unwrap();
        let root = db.root();
        db.sync().await.unwrap();
        drop(db);

        // Reopen and compare against the recorded root and the reference map.
        let db: C = open_db(context.child("second"), "build-big".into()).await;
        assert_eq!(root, db.root());
        for i in 0u64..ELEMENTS {
            let k: C::Key = TestKey::from_seed(i);
            if let Some(map_value) = map.get(&k) {
                let Some(db_value) = db.get(&k).await.unwrap() else {
                    panic!("key not found in db: {k}");
                };
                assert_eq!(*map_value, db_value);
            } else {
                assert!(db.get(&k).await.unwrap().is_none());
            }
        }

        // Clean up, as the sibling tests do.
        db.destroy().await.unwrap();
    });
}
/// Checks that a rejected stale batch leaves the database untouched.
///
/// Two batches are merkleized against the same base state. Applying the first succeeds;
/// applying the second must fail with `StaleBatch` without changing the root, bounds,
/// metadata, or any key.
pub fn test_stale_batch_side_effect_free<M, C, F, Fut>(mut open_db: F)
where
    M: merkle::Graftable,
    C: DbAny<M>,
    C::Key: TestKey,
    <C as DbAny<M>>::Value: TestValue,
    F: FnMut(Context, String) -> Fut,
    Fut: Future<Output = C>,
{
    deterministic::Runner::default().start(|context| async move {
        let mut db: C = open_db(context.child("db"), "stale-side-effect-free".into()).await;

        let key1 = <C::Key as TestKey>::from_seed(1);
        let key2 = <C::Key as TestKey>::from_seed(2);
        let value1 = <<C as DbAny<M>>::Value as TestValue>::from_seed(10);
        let value2 = <<C as DbAny<M>>::Value as TestValue>::from_seed(20);

        // Both batches are built before either one is applied.
        let batch_a = db
            .new_batch()
            .write(key1, Some(value1.clone()))
            .merkleize(&db, None)
            .await
            .unwrap();
        let batch_b = db
            .new_batch()
            .write(key2, Some(value2))
            .merkleize(&db, None)
            .await
            .unwrap();

        // Applying the first batch defines the state the stale one must not disturb.
        db.apply_batch(batch_a).await.unwrap();
        let expected_root = db.root();
        let expected_bounds = db.bounds().await;
        let expected_metadata = db.get_metadata().await.unwrap();
        assert_eq!(db.get(&key1).await.unwrap(), Some(value1.clone()));
        assert_eq!(db.get(&key2).await.unwrap(), None);

        let result = db.apply_batch(batch_b).await;
        assert!(
            matches!(result, Err(Error::StaleBatch { .. })),
            "expected StaleBatch error, got {result:?}"
        );

        // Nothing observable may have changed.
        assert_eq!(db.root(), expected_root);
        assert_eq!(db.bounds().await, expected_bounds);
        assert_eq!(db.get_metadata().await.unwrap(), expected_metadata);
        assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
        assert_eq!(db.get(&key2).await.unwrap(), None);

        db.destroy().await.unwrap();
    });
}
use crate::translator::OneCap;
use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
use commonware_macros::{test_group, test_traced};
// Non-partitioned database aliases over the MMR family, using `Sha256`, the `OneCap`
// translator, the `Sequential` strategy, and a const parameter of 32.
type OrderedFixedDb =
ordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
type OrderedVariableDb =
ordered::variable::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
type UnorderedFixedDb =
unordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
type UnorderedVariableDb = unordered::variable::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
32,
Sequential,
>;
// Partitioned database aliases over the MMR family with the extra const parameter set to 1
// (NOTE(review): presumably the partition prefix length — confirm against `partitioned::Db`).
type OrderedFixedP1Db = ordered::fixed::partitioned::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
1,
32,
Sequential,
>;
type OrderedVariableP1Db = ordered::variable::partitioned::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
1,
32,
Sequential,
>;
type UnorderedFixedP1Db = unordered::fixed::partitioned::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
1,
32,
Sequential,
>;
type UnorderedVariableP1Db = unordered::variable::partitioned::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
1,
32,
Sequential,
>;
// Partitioned database aliases over the MMR family with the extra const parameter set to 2
// (NOTE(review): presumably the partition prefix length — confirm against `partitioned::Db`).
type OrderedFixedP2Db = ordered::fixed::partitioned::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
2,
32,
Sequential,
>;
type OrderedVariableP2Db = ordered::variable::partitioned::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
2,
32,
Sequential,
>;
type UnorderedFixedP2Db = unordered::fixed::partitioned::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
2,
32,
Sequential,
>;
type UnorderedVariableP2Db = unordered::variable::partitioned::Db<
mmr::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
2,
32,
Sequential,
>;
// Non-partitioned database aliases over the MMB family; otherwise the same parameters as the
// MMR aliases (`Sha256`, `OneCap`, `Sequential`, const parameter 32).
type OrderedFixedMmbDb =
ordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
type OrderedVariableMmbDb =
ordered::variable::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
type UnorderedFixedMmbDb =
unordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
type UnorderedVariableMmbDb = unordered::variable::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
32,
Sequential,
>;
// Partitioned database aliases over the MMB family with the extra const parameter set to 1
// (NOTE(review): presumably the partition prefix length — confirm against `partitioned::Db`).
type OrderedFixedMmbP1Db = ordered::fixed::partitioned::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
1,
32,
Sequential,
>;
type OrderedVariableMmbP1Db = ordered::variable::partitioned::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
1,
32,
Sequential,
>;
type UnorderedFixedMmbP1Db = unordered::fixed::partitioned::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
1,
32,
Sequential,
>;
type UnorderedVariableMmbP1Db = unordered::variable::partitioned::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
1,
32,
Sequential,
>;
// Partitioned database aliases over the MMB family with the extra const parameter set to 2
// (NOTE(review): presumably the partition prefix length — confirm against `partitioned::Db`).
type OrderedFixedMmbP2Db = ordered::fixed::partitioned::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
2,
32,
Sequential,
>;
type OrderedVariableMmbP2Db = ordered::variable::partitioned::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
2,
32,
Sequential,
>;
type UnorderedFixedMmbP2Db = unordered::fixed::partitioned::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
2,
32,
Sequential,
>;
type UnorderedVariableMmbP2Db = unordered::variable::partitioned::Db<
mmb::Family,
Context,
Digest,
Digest,
Sha256,
OneCap,
2,
32,
Sequential,
>;
// Expands to a closure `(Context, String) -> Future` that initializes a `$db` database.
// The config comes from `$cfg::<OneCap>(&partition, &ctx)`, and the db is initialized under
// the "storage" child context. Initialization failure panics via `unwrap()`.
macro_rules! open_db_fn {
($db:ty, $cfg:ident) => {
|ctx: Context, partition: String| async move {
<$db>::init(ctx.child("storage"), $cfg::<OneCap>(&partition, &ctx))
.await
.unwrap()
}
};
}
// Invokes the callback macro `$cb` once for each of the 24 database variants (ordered and
// unordered, fixed and variable, plain/P1/P2, MMR and MMB). Each invocation receives the
// caller's arguments followed by a short label, the db type alias, and the matching config
// constructor.
macro_rules! with_all_variants {
($cb:ident!($($args:tt)*)) => {
$cb!($($args)*, "of", OrderedFixedDb, fixed_config);
$cb!($($args)*, "ov", OrderedVariableDb, variable_config);
$cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
$cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
$cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
$cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
$cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
$cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
$cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
$cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
$cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
$cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
$cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
$cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
$cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
$cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
$cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
$cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
$cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
$cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
$cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
$cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
$cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
$cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
};
}
// Same shape as `with_all_variants!`, restricted to the 12 ordered database variants.
macro_rules! with_ordered_variants {
($cb:ident!($($args:tt)*)) => {
$cb!($($args)*, "of", OrderedFixedDb, fixed_config);
$cb!($($args)*, "ov", OrderedVariableDb, variable_config);
$cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
$cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
$cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
$cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
$cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
$cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
$cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
$cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
$cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
$cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
};
}
// Same shape as `with_all_variants!`, restricted to the 12 unordered database variants.
macro_rules! with_unordered_variants {
($cb:ident!($($args:tt)*)) => {
$cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
$cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
$cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
$cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
$cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
$cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
$cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
$cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
$cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
$cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
$cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
$cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
};
}
// Callback for the `with_*_variants!` macros: calls the test function `$f` with an opener
// built by `open_db_fn!($db, $cfg)`, wrapped in a boxed future that is awaited immediately.
// It therefore has to be expanded inside an async context. The label `$l` is accepted but not
// used by the expansion.
macro_rules! test_simple {
($f:expr, $l:literal, $db:ty, $cfg:ident) => {
Box::pin(async {
$f(open_db_fn!($db, $cfg));
})
.await
};
}
// Runs the test function `$f` across a family of database variants, selected by the leading
// keyword: `simple:` covers every variant, `ordered:` only the ordered ones, and `unordered:`
// only the unordered ones. Each arm delegates to `test_simple!` through the matching
// `with_*_variants!` macro.
macro_rules! for_all_variants {
(simple: $f:expr) => {{
with_all_variants!(test_simple!($f));
}};
(ordered: $f:expr) => {{
with_ordered_variants!(test_simple!($f));
}};
(unordered: $f:expr) => {{
with_unordered_variants!(test_simple!($f));
}};
}
/// Named entry point for the ordered variants; forwards directly to
/// `test_current_db_build_big` with the same type parameters and opener.
fn test_ordered_build_big<M, C, F, Fut>(open_db: F)
where
M: merkle::Graftable,
C: DbAny<M>,
C::Key: TestKey,
<C as DbAny<M>>::Value: TestValue,
F: FnMut(Context, String) -> Fut + Clone,
Fut: Future<Output = C>,
{
test_current_db_build_big::<M, C, F, Fut>(open_db);
}
/// Named entry point for the unordered variants; forwards directly to
/// `test_current_db_build_big` with the same type parameters and opener.
fn test_unordered_build_big<M, C, F, Fut>(open_db: F)
where
M: merkle::Graftable,
C: DbAny<M>,
C::Key: TestKey,
<C as DbAny<M>>::Value: TestValue,
F: FnMut(Context, String) -> Fut + Clone,
Fut: Future<Output = C>,
{
test_current_db_build_big::<M, C, F, Fut>(open_db);
}
// Runs `test_build_random_close_reopen` against every database variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_build_random_close_reopen() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(simple: test_build_random_close_reopen);
    });
}
// Runs `test_simulate_write_failures` against every database variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_simulate_write_failures() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(simple: test_simulate_write_failures);
    });
}
// Runs `test_different_pruning_delays_same_root` against every database variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_different_pruning_delays_same_root() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(simple: test_different_pruning_delays_same_root);
    });
}
// Runs `test_sync_persists_bitmap_pruning_boundary` against every database variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_sync_persists_bitmap_pruning_boundary() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(simple: test_sync_persists_bitmap_pruning_boundary);
    });
}
// Runs `test_stale_batch_side_effect_free` against every database variant.
#[test_traced("WARN")]
fn test_all_variants_stale_batch_side_effect_free() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(simple: test_stale_batch_side_effect_free);
    });
}
// Runs `test_ordered_build_big` against every ordered database variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_ordered_variants_build_big() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(ordered: test_ordered_build_big);
    });
}
// Runs `test_unordered_build_big` against every unordered database variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_unordered_variants_build_big() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(unordered: test_unordered_build_big);
    });
}
// Runs `ordered::tests::test_build_small_close_reopen` against every ordered variant.
#[test_group("slow")]
#[test_traced("DEBUG")]
fn test_ordered_variants_build_small_close_reopen() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(ordered: ordered::tests::test_build_small_close_reopen);
    });
}
// Runs `unordered::tests::test_build_small_close_reopen` against every unordered variant.
#[test_group("slow")]
#[test_traced("DEBUG")]
fn test_unordered_variants_build_small_close_reopen() {
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(unordered: unordered::tests::test_build_small_close_reopen);
    });
}
/// Test key for index `i`: the SHA-256 digest of `i` in big-endian byte order.
fn key(i: u64) -> Digest {
    let encoded = i.to_be_bytes();
    Sha256::hash(&encoded)
}
/// Test value for index `i`: the SHA-256 digest of `i + 10000` in big-endian byte order.
///
/// The offset means `val(i)` hashes a different input than `key(i)`.
fn val(i: u64) -> Digest {
    let shifted = i + 10000;
    Sha256::hash(&shifted.to_be_bytes())
}
/// Stages `writes` in one batch on an MMB-backed db, then merkleizes (no metadata), applies,
/// and commits it. Panics if any step fails.
async fn mmb_commit(
    db: &mut UnorderedVariableMmbDb,
    writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
) {
    let staged = writes
        .into_iter()
        .fold(db.new_batch(), |batch, (k, v)| batch.write(k, v));
    let merkleized = staged.merkleize(db, None).await.unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
}
/// Stages `writes` in one batch, merkleizes it with `metadata`, applies it, and commits.
///
/// Returns the location range reported by `apply_batch`. Panics if any step fails.
async fn commit_writes_with_metadata(
    db: &mut UnorderedVariableDb,
    writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
    metadata: Option<Digest>,
) -> std::ops::Range<Location<mmr::Family>> {
    let staged = writes
        .into_iter()
        .fold(db.new_batch(), |batch, (k, v)| batch.write(k, v));
    let merkleized = staged.merkleize(db, metadata).await.unwrap();
    let applied = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    applied
}
// Exercises `rewind` on an unordered variable db: rewinding to an earlier size must restore
// the root, ops root, inactivity floor, metadata, and key contents recorded at that size,
// both immediately and after a commit + reopen. The test rewinds twice: first to the state
// after the first commit, then all the way back to the initial (empty) state.
#[test_traced("INFO")]
fn test_current_rewind_recovery() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let partition = "current-rewind-recovery";
let ctx = context.child("db");
let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
ctx.child("storage"),
variable_config::<OneCap>(partition, &ctx),
)
.await
.unwrap();
// Snapshot of the freshly initialized database.
let initial_size = db.bounds().await.end;
let initial_root = db.root();
let initial_ops_root = db.ops_root();
let initial_floor = db.inactivity_floor_loc();
// First commit: two keys plus metadata A.
let metadata_a = val(900);
let first_range = commit_writes_with_metadata(
&mut db,
[(key(0), Some(val(0))), (key(1), Some(val(1)))],
Some(metadata_a),
)
.await;
assert_eq!(first_range.start, initial_size);
// Snapshot after the first commit; this is the first rewind target.
let size_before = db.bounds().await.end;
let root_before = db.root();
let ops_root_before = db.ops_root();
let floor_before = db.inactivity_floor_loc();
assert_eq!(size_before, first_range.end);
// Second commit: update key 0, delete key 1, add key 2, replace the metadata with B.
let metadata_b = val(901);
let second_range = commit_writes_with_metadata(
&mut db,
[
(key(0), Some(val(100))),
(key(1), None),
(key(2), Some(val(2))),
],
Some(metadata_b),
)
.await;
assert_eq!(second_range.start, size_before);
assert_ne!(db.root(), root_before);
// `val(901)` is the same value as `metadata_b`.
assert_eq!(db.get_metadata().await.unwrap(), Some(val(901)));
assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
assert_eq!(db.get(&key(1)).await.unwrap(), None);
assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
// Rewind to just after the first commit; everything must match that snapshot.
db.rewind(size_before).await.unwrap();
assert_eq!(db.bounds().await.end, size_before);
assert_eq!(db.root(), root_before);
assert_eq!(db.ops_root(), ops_root_before);
assert_eq!(db.inactivity_floor_loc(), floor_before);
assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
assert_eq!(db.get(&key(2)).await.unwrap(), None);
// Persist the rewound state and confirm it survives a reopen.
db.commit().await.unwrap();
drop(db);
let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
context.child("reopen"),
variable_config::<OneCap>(partition, &context),
)
.await
.unwrap();
assert_eq!(reopened.bounds().await.end, size_before);
assert_eq!(reopened.root(), root_before);
assert_eq!(reopened.ops_root(), ops_root_before);
assert_eq!(reopened.inactivity_floor_loc(), floor_before);
// `val(900)` is the same value as `metadata_a`.
assert_eq!(reopened.get_metadata().await.unwrap(), Some(val(900)));
assert_eq!(reopened.get(&key(0)).await.unwrap(), Some(val(0)));
assert_eq!(reopened.get(&key(1)).await.unwrap(), Some(val(1)));
assert_eq!(reopened.get(&key(2)).await.unwrap(), None);
// Rewind again, this time all the way back to the initial snapshot.
let mut reopened = reopened;
reopened.rewind(initial_size).await.unwrap();
assert_eq!(reopened.bounds().await.end, initial_size);
assert_eq!(reopened.root(), initial_root);
assert_eq!(reopened.ops_root(), initial_ops_root);
assert_eq!(reopened.inactivity_floor_loc(), initial_floor);
assert_eq!(reopened.get_metadata().await.unwrap(), None);
assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
assert_eq!(reopened.get(&key(1)).await.unwrap(), None);
assert_eq!(reopened.get(&key(2)).await.unwrap(), None);
// Persist and reopen once more; the initial state must be what comes back.
reopened.commit().await.unwrap();
drop(reopened);
let reopened_initial: UnorderedVariableDb = UnorderedVariableDb::init(
context.child("reopen_initial"),
variable_config::<OneCap>(partition, &context),
)
.await
.unwrap();
assert_eq!(reopened_initial.bounds().await.end, initial_size);
assert_eq!(reopened_initial.root(), initial_root);
assert_eq!(reopened_initial.ops_root(), initial_ops_root);
assert_eq!(reopened_initial.inactivity_floor_loc(), initial_floor);
assert_eq!(reopened_initial.get_metadata().await.unwrap(), None);
assert_eq!(reopened_initial.get(&key(0)).await.unwrap(), None);
assert_eq!(reopened_initial.get(&key(1)).await.unwrap(), None);
assert_eq!(reopened_initial.get(&key(2)).await.unwrap(), None);
reopened_initial.destroy().await.unwrap();
});
}
/// Rewinds a pruned database whose whole history is repeated updates to a single key, then
/// checks that the rewound state survives a reopen, accepts a fresh commit, and survives a
/// second reopen.
#[test_traced("INFO")]
fn test_current_rewind_recovery_pruned_repeated_updates() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const COMMITS: u64 = 96;
        let partition = "current-rewind-pruned-recovery";
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>(partition, &ctx),
        )
        .await
        .unwrap();

        // Overwrite the same key once per commit and snapshot the state reached after each one.
        let key0 = key(0);
        let mut history = Vec::new();
        for round in 0..COMMITS {
            let written = val(20_000 + round);
            commit_writes_with_metadata(&mut db, [(key0, Some(written))], None).await;
            let end = db.bounds().await.end;
            let floor = db.inactivity_floor_loc();
            let root = db.root();
            let ops_root = db.ops_root();
            history.push((end, floor, root, ops_root, written));
        }

        // The test only makes sense if pruning dropped at least some bitmap bits.
        db.prune(Location::new(1)).await.unwrap();
        let pruned_bits = db.pruned_bits();
        assert!(pruned_bits > 0, "expected bitmap pruning for rewind test");

        // Pick the earliest snapshot that (a) discards at least three commits, (b) ends above
        // the retained journal start, and (c) has a floor at or above the pruned bit count.
        let bounds = db.bounds().await;
        let (target_size, target_root, target_ops_root, target_value) = history
            .iter()
            .enumerate()
            .find_map(|(idx, &(size, floor, root, ops_root, value))| {
                let removed_commits = history.len() - idx - 1;
                let legal =
                    removed_commits >= 3 && size > bounds.start && floor >= pruned_bits;
                legal.then_some((size, root, ops_root, value))
            })
            .unwrap_or_else(|| {
                panic!(
                    "expected legal pruned rewind target with repeated updates; bounds={bounds:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
                    db.inactivity_floor_loc()
                )
            });

        // Rewind in memory and compare against the snapshot.
        db.rewind(target_size).await.unwrap();
        assert_eq!(db.root(), target_root);
        assert_eq!(db.ops_root(), target_ops_root);
        assert_eq!(db.bounds().await.end, target_size);
        assert_eq!(db.get(&key0).await.unwrap(), Some(target_value));
        db.commit().await.unwrap();
        drop(db);

        // First reopen: the rewound state must be what comes back from storage.
        let mut reopened: UnorderedVariableDb = UnorderedVariableDb::init(
            context.child("reopen_pruned_recovery"),
            variable_config::<OneCap>(partition, &context),
        )
        .await
        .unwrap();
        assert_eq!(reopened.root(), target_root);
        assert_eq!(reopened.ops_root(), target_ops_root);
        assert_eq!(reopened.bounds().await.end, target_size);
        assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));

        // Write a new key plus metadata on top of the rewound state.
        let metadata_after_rewind = val(30_000);
        let new_key = key(1);
        let new_value = val(30_001);
        let new_range = commit_writes_with_metadata(
            &mut reopened,
            [(new_key, Some(new_value))],
            Some(metadata_after_rewind),
        )
        .await;
        let expected_end = new_range.end;
        let root_after_new_write = reopened.root();
        let ops_root_after_new_write = reopened.ops_root();
        assert_eq!(reopened.bounds().await.end, expected_end);
        assert_eq!(
            reopened.get_metadata().await.unwrap(),
            Some(metadata_after_rewind)
        );
        assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));
        assert_eq!(reopened.get(&new_key).await.unwrap(), Some(new_value));
        drop(reopened);

        // Second reopen: the post-rewind write must also be recovered.
        let final_db: UnorderedVariableDb = UnorderedVariableDb::init(
            context.child("reopen_pruned_after_new_write"),
            variable_config::<OneCap>(partition, &context),
        )
        .await
        .unwrap();
        assert_eq!(final_db.root(), root_after_new_write);
        assert_eq!(final_db.ops_root(), ops_root_after_new_write);
        assert_eq!(final_db.bounds().await.end, expected_end);
        assert_eq!(
            final_db.get_metadata().await.unwrap(),
            Some(metadata_after_rewind)
        );
        assert_eq!(final_db.get(&key0).await.unwrap(), Some(target_value));
        assert_eq!(final_db.get(&new_key).await.unwrap(), Some(new_value));
        final_db.destroy().await.unwrap();
    });
}
/// After 100 single-key commits on an MMB-backed database the inactivity floor is at least 256
/// while the sync boundary is still 0; pruning to location 1 must be rejected with
/// `PruneBeyondMinRequired`, leave the bitmap unpruned, and the database must reopen and prove.
#[test_traced("INFO")]
fn test_current_mmb_settlement_guard_defers_pruning() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const COMMITS: u64 = 100;
        let partition = "current-mmb-reopen-prove-after-prune";
        let ctx = context.child("db");
        let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>(partition, &ctx),
        )
        .await
        .unwrap();

        // Repeatedly overwrite one key, committing after every write.
        let k = key(0);
        let mut expected = None;
        for round in 0..COMMITS {
            expected = Some(val(50_000 + round));
            let merkleized = db
                .new_batch()
                .write(k, expected)
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();
        }
        let root_before = db.root();

        // Preconditions: the floor has advanced but the sync boundary has not.
        assert!(
            *db.inactivity_floor_loc() >= 256,
            "expected inactivity floor past chunk 0"
        );
        assert_eq!(
            *db.sync_boundary(),
            0,
            "settlement guard should hold boundary at 0 during unsettled window"
        );

        // Any prune target above the boundary has to be refused without pruning bitmap bits.
        let result = db.prune(Location::<mmb::Family>::new(1)).await;
        assert!(
            matches!(result, Err(Error::PruneBeyondMinRequired(_, _))),
            "expected PruneBeyondMinRequired, got {result:?}"
        );
        assert_eq!(db.pruned_bits(), 0);
        db.sync().await.unwrap();
        drop(db);

        // Reopen from storage; root, latest value and proof generation must all still work.
        let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            context.child("reopen"),
            variable_config::<OneCap>(partition, &context),
        )
        .await
        .unwrap();
        assert_eq!(reopened.root(), root_before);
        assert_eq!(reopened.get(&k).await.unwrap(), expected);
        let hasher = qmdb::hasher::<Sha256>();
        let _proof = reopened.key_value_proof(&hasher, k).await.unwrap();
        reopened.destroy().await.unwrap();
    });
}
/// On an MMB-backed database that has been pruned to its sync boundary, rewinding to a size
/// that lies at or above the pruned bit count but below `absorbed_after` must fail with a
/// journal `ItemPruned` error. The database must still reopen afterwards.
#[test_traced("INFO")]
fn test_current_mmb_rewind_rejects_unsettled_pruned_window() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
const COMMITS: u64 = 320;
// NOTE(review): N is assumed to match the chunk size the db under test was built with --
// confirm against the `UnorderedVariableMmbDb` alias.
const N: usize = 32;
let partition = "current-mmb-rewind-unsettled-window";
let ctx = context.child("db");
let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
ctx.child("storage"),
variable_config::<OneCap>(partition, &ctx),
)
.await
.unwrap();
let key0 = key(0);
// One (end bound, inactivity floor) entry per commit.
let mut history = Vec::new();
for round in 0..COMMITS {
let mut batch = db.new_batch();
batch = batch.write(key0, Some(val(60_000 + round)));
let merkleized = batch.merkleize(&db, None).await.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
history.push((db.bounds().await.end, db.inactivity_floor_loc()));
}
// Prune as far as the db allows and require that bitmap bits were actually dropped.
db.prune(db.sync_boundary()).await.unwrap();
let pruned_bits = db.pruned_bits();
assert!(pruned_bits > 0, "expected MMB bitmap pruning to be active");
db.sync().await.unwrap();
// Convert the pruned bit count into whole chunks.
let chunk_bits = commonware_utils::bitmap::BitMap::<N>::CHUNK_SIZE_BITS;
let pruned_chunks = (pruned_bits / chunk_bits) as u64;
let gh = super::grafting::height::<N>();
// Index of the last pruned chunk, and that index with its low bit cleared.
let youngest = pruned_chunks - 1;
let pair_chunk = youngest & !1;
// First location covered by `pair_chunk` (chunk index shifted by the grafting height).
let pair_start = pair_chunk << gh;
// Subtree root one level above the grafting height, starting at `pair_start`.
let pair_pos = <mmb::Family as merkle::Graftable>::subtree_root_position(
merkle::Location::<mmb::Family>::new(pair_start),
gh + 1,
);
// NOTE(review): presumably the size at which that subtree root comes into existence --
// confirm against `Graftable::peak_birth_size`.
let absorbed_after =
<mmb::Family as merkle::Graftable>::peak_birth_size(pair_pos, gh + 1);
// Largest recorded size s with pruned_bits <= s < absorbed_after whose floor is also at or
// above the pruned bit count.
let unsafe_target = history
.iter()
.filter_map(|(size, floor)| {
let s = **size;
if s >= pruned_bits && s < absorbed_after && **floor >= pruned_bits {
Some(s)
} else {
None
}
})
.max()
.unwrap_or_else(|| {
panic!(
"expected rewind target in unsettled window: pruned_bits={pruned_bits}, absorbed_after={absorbed_after}, history={history:?}"
)
});
// The rewind must be refused with a journal-level `ItemPruned` error.
let err = db
.rewind(merkle::Location::<mmb::Family>::new(unsafe_target))
.await
.unwrap_err();
assert!(
matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
"unexpected rewind error for unsettled delayed-merge window: {err:?}"
);
drop(db);
// Re-initialization must still succeed after the rejected rewind.
let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
context.child("reopen"),
variable_config::<OneCap>(partition, &context),
)
.await
.unwrap();
reopened.destroy().await.unwrap();
});
}
/// After pruning an MMB-backed database to its sync boundary, the boundary must sit strictly
/// below the inactivity floor and the ops journal must not start past the boundary.
#[test_traced]
fn test_current_mmb_prune_respects_sync_boundary() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const COMMITS: u64 = 320;
        let ctx = context.child("db");
        let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("prune-clip-mmb", &ctx),
        )
        .await
        .unwrap();

        // Build history by rewriting a single key.
        let k = key(0);
        for round in 0..COMMITS {
            let value = val(70_000 + round);
            mmb_commit(&mut db, [(k, Some(value))]).await;
        }

        let prune_target = db.sync_boundary();
        db.prune(prune_target).await.unwrap();

        // Re-read both markers after the prune.
        let boundary = db.sync_boundary();
        let floor = db.inactivity_floor_loc();
        assert!(
            boundary < floor,
            "delayed-merge lag must be strictly active: boundary={boundary}, floor={floor}"
        );

        let journal_start = db.bounds().await.start;
        assert!(
            journal_start <= boundary,
            "ops journal was pruned past the settled bitmap boundary: \
             bounds.start={}, boundary={boundary}",
            journal_start
        );
        db.destroy().await.unwrap();
    });
}
/// For the MMR-backed database, the sync boundary after pruning may trail the inactivity floor
/// by less than one bitmap chunk, and the ops journal must not start past the boundary.
#[test_traced]
fn test_current_mmr_prune_boundary_lag_is_only_chunk_alignment() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const COMMITS: u64 = 320;
        const N: usize = 32;
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("prune-clip-mmr", &ctx),
        )
        .await
        .unwrap();

        // Build history by rewriting a single key.
        for round in 0..COMMITS {
            let update = (key(0), Some(val(80_000 + round)));
            commit_writes_with_metadata(&mut db, [update], None).await;
        }

        let prune_target = db.sync_boundary();
        db.prune(prune_target).await.unwrap();

        // Re-read both markers after the prune.
        let boundary = db.sync_boundary();
        let floor = db.inactivity_floor_loc();
        let chunk_bits = commonware_utils::bitmap::BitMap::<N>::CHUNK_SIZE_BITS;
        // The subtraction is only evaluated once `boundary <= floor` holds (short-circuit).
        assert!(
            boundary <= floor && (*floor - *boundary) < chunk_bits,
            "MMR lag should be only chunk alignment: boundary={boundary}, floor={floor}, chunk_bits={chunk_bits}"
        );

        let journal_start = db.bounds().await.start;
        assert!(
            journal_start <= boundary,
            "ops journal bounds must be <= sync_boundary: bounds.start={}, boundary={boundary}",
            journal_start
        );
        db.destroy().await.unwrap();
    });
}
/// Pruning to a small explicit location (1) while the inactivity floor is higher must not
/// advance the journal start beyond the requested location.
#[test_traced]
fn test_current_prune_below_settled_boundary_is_honored() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const COMMITS: u64 = 100;
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("prune-below-boundary", &ctx),
        )
        .await
        .unwrap();

        // Build history by rewriting a single key.
        for round in 0..COMMITS {
            let update = (key(0), Some(val(90_000 + round)));
            commit_writes_with_metadata(&mut db, [update], None).await;
        }
        assert!(*db.inactivity_floor_loc() > 1);

        // Ask for less pruning than the floor would permit.
        let small = Location::new(1);
        db.prune(small).await.unwrap();

        let journal_start = db.bounds().await.start;
        assert!(
            journal_start <= small,
            "journal pruning exceeded the caller-supplied target: bounds.start={}, requested={small}",
            journal_start
        );
        db.destroy().await.unwrap();
    });
}
/// Prunes an MMB-backed database, keeps committing to a different key, and checks that a
/// key-value proof for the first key verifies both before and after a reopen.
#[test_traced]
fn test_current_mmb_reopen_and_prove_after_prune_delayed_merge() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let partition = "test_prune_delayed_merge";
        let db_ctx = context.child("db_init");
        let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            db_ctx.child("db"),
            variable_config::<OneCap>(partition, &db_ctx),
        )
        .await
        .unwrap();

        // Phase 1: 200 overwrites of key 0, then prune to the sync boundary and sync.
        let k = key(0);
        for round in 0..200u64 {
            mmb_commit(&mut db, [(k, Some(val(60_000 + round)))]).await;
        }
        let prune_target = db.sync_boundary();
        db.prune(prune_target).await.unwrap();
        db.sync().await.unwrap();

        // Phase 2: 100 more commits, all against key 1.
        for round in 200..300u64 {
            mmb_commit(&mut db, [(key(1), Some(val(round)))]).await;
        }

        // Key 0 still holds the value from the last phase-1 round.
        let latest = val(60_000 + 199);
        let hasher = qmdb::hasher::<Sha256>();
        let proof = db.key_value_proof(&hasher, k).await.unwrap();
        assert!(UnorderedVariableMmbDb::verify_key_value_proof(
            &hasher,
            k,
            latest,
            &proof,
            &db.root()
        ));
        let target_root = db.root();
        drop(db);

        // Reopen and repeat the proof against the recovered root.
        let reopen_ctx = context.child("db_reopen");
        let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            reopen_ctx.child("db"),
            variable_config::<OneCap>(partition, &reopen_ctx),
        )
        .await
        .unwrap();
        assert_eq!(reopened.root(), target_root);
        let hasher = qmdb::hasher::<Sha256>();
        let proof = reopened.key_value_proof(&hasher, k).await.unwrap();
        assert!(UnorderedVariableMmbDb::verify_key_value_proof(
            &hasher,
            k,
            latest,
            &proof,
            &reopened.root()
        ));
        reopened.destroy().await.unwrap();
    });
}
/// Commits and prunes in lock-step until at least 512 bitmap bits have been pruned, then
/// checks that the root and the latest value survive a reopen.
#[test_traced]
fn test_current_mmb_reopen_after_prune_two_chunks() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let partition = "test_prune_two";
        let db_ctx = context.child("db");
        let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            db_ctx.child("db"),
            variable_config::<OneCap>(partition, &db_ctx),
        )
        .await
        .unwrap();

        let k = key(0);
        let mut round = 0u64;
        // The loop yields the last value written once enough bits have been pruned.
        let expected = loop {
            let value = Some(val(60_000 + round));
            mmb_commit(&mut db, [(k, value)]).await;
            round += 1;

            let prune_target = db.sync_boundary();
            db.prune(prune_target).await.unwrap();
            if db.pruned_bits() >= 512 {
                break value;
            }
            // Bail out rather than spin forever if pruning never advances.
            assert!(
                round < 500,
                "failed to reach 2 pruned chunks after {round} commits"
            );
        };
        db.sync().await.unwrap();
        let target_root = db.root();
        drop(db);

        let reopen_ctx = context.child("db_reopen");
        let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            reopen_ctx.child("db"),
            variable_config::<OneCap>(partition, &reopen_ctx),
        )
        .await
        .unwrap();
        assert_eq!(reopened.root(), target_root);
        assert_eq!(reopened.get(&k).await.unwrap(), expected);
        reopened.destroy().await.unwrap();
    });
}
/// Runs three rounds of "90 commits, prune, sync, reopen" against the same partition and
/// checks after each reopen that the root and the round's latest value are intact.
#[test_traced]
fn test_current_mmb_repeated_prune() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let partition = "test_repeated_prune";
        let mut db_ctx = context.child("db_init");
        let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            db_ctx.child("db"),
            variable_config::<OneCap>(partition, &db_ctx),
        )
        .await
        .unwrap();

        for round in 0..3u64 {
            // Each round works on its own key.
            let base = round * 1000;
            let k = key(base);
            let mut expected = None;
            for i in 0..90 {
                expected = Some(val(base + i));
                mmb_commit(&mut db, [(k, expected)]).await;
            }

            let prune_target = db.sync_boundary();
            db.prune(prune_target).await.unwrap();
            db.sync().await.unwrap();
            let root_before = db.root();

            // Re-initialize while the previous handle is still held; it is dropped only after
            // the checks below.
            db_ctx = context.child("db").with_attribute("round", round);
            let prev_db = db;
            db = UnorderedVariableMmbDb::init(
                db_ctx.child("db"),
                variable_config::<OneCap>(partition, &db_ctx),
            )
            .await
            .unwrap();
            assert_eq!(db.root(), root_before);
            assert_eq!(db.get(&k).await.unwrap(), expected);
            drop(prev_db);
        }
        db.destroy().await.unwrap();
    });
}
/// Drives a pruned database and an unpruned reference with identical commits and checks that
/// their roots agree right after the prune and after every later commit, until the pruned
/// database reaches 1560 leaves.
#[test_traced]
fn test_current_mmb_stepwise_growth_matches_unpruned_reference() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let db_ctx = context.child("db_stepwise");
        let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            db_ctx.child("db"),
            variable_config::<OneCap>("test_stepwise", &db_ctx),
        )
        .await
        .unwrap();
        let ref_ctx = context.child("ref_stepwise");
        let mut ref_db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            ref_ctx.child("db"),
            variable_config::<OneCap>("test_stepwise_ref", &ref_ctx),
        )
        .await
        .unwrap();

        // Grow both databases until the inactivity floor reaches 1024.
        let k = key(0);
        let mut commit_idx = 0u64;
        while *db.inactivity_floor_loc() < 1024 {
            let value = Some(val(80_000 + commit_idx));
            mmb_commit(&mut db, [(k, value)]).await;
            mmb_commit(&mut ref_db, [(k, value)]).await;
            commit_idx += 1;
        }

        // Prune only `db`; `ref_db` stays unpruned.
        let prune_target = db.sync_boundary();
        db.prune(prune_target).await.unwrap();
        db.sync().await.unwrap();
        assert_eq!(
            db.root(),
            ref_db.root(),
            "root mismatch immediately after prune"
        );

        // Continue one commit at a time, comparing roots after each step.
        loop {
            let leaves_before =
                *Location::<mmb::Family>::try_from(db.any.log.merkle.size()).unwrap();
            if leaves_before >= 1560 {
                break;
            }

            let value = Some(val(80_000 + commit_idx));
            mmb_commit(&mut db, [(k, value)]).await;
            mmb_commit(&mut ref_db, [(k, value)]).await;
            commit_idx += 1;

            let db_leaves =
                *Location::<mmb::Family>::try_from(db.any.log.merkle.size()).unwrap();
            assert_eq!(
                db.root(),
                ref_db.root(),
                "stepwise root mismatch: leaves={db_leaves}, commit_idx={commit_idx}"
            );
        }
        db.destroy().await.unwrap();
        ref_db.destroy().await.unwrap();
    });
}
/// Eight rounds of "120 commits, prune, sync, reopen" on a pruned database, mirrored commit
/// for commit on an unpruned reference. Roots must match before the prune, after the prune and
/// after the reopen, and key-value proofs must verify both before and after the reopen.
#[test_traced]
fn test_current_mmb_large_repeated_prune_matches_unpruned_reference() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const ROUNDS: u64 = 8;
        const COMMITS_PER_ROUND: u64 = 120;
        let partition = "test_large_prune";
        let mut db_ctx = context.child("db_init");
        let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            db_ctx.child("db"),
            variable_config::<OneCap>(partition, &db_ctx),
        )
        .await
        .unwrap();
        let ref_ctx = context.child("ref");
        let mut ref_db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
            ref_ctx.child("db"),
            variable_config::<OneCap>("test_large_prune_ref", &ref_ctx),
        )
        .await
        .unwrap();

        let k = key(0);
        let mut expected = None;
        for round in 0..ROUNDS {
            // Apply the same writes to both databases.
            for i in 0..COMMITS_PER_ROUND {
                let value = Some(val(round * 10_000 + i));
                expected = value;
                mmb_commit(&mut db, [(k, value)]).await;
                mmb_commit(&mut ref_db, [(k, value)]).await;
            }
            assert_eq!(
                db.root(),
                ref_db.root(),
                "root mismatch before prune at round {round}"
            );

            // Prune only `db`; the root must not move.
            let prune_target = db.sync_boundary();
            db.prune(prune_target).await.unwrap();
            db.sync().await.unwrap();
            assert_eq!(
                db.root(),
                ref_db.root(),
                "root mismatch after prune at round {round}"
            );

            // Proof against the pruned, still-open database.
            let hasher = qmdb::hasher::<Sha256>();
            let proof = db.key_value_proof(&hasher, k).await.unwrap();
            let verified = UnorderedVariableMmbDb::verify_key_value_proof(
                &hasher,
                k,
                expected.expect("value should exist"),
                &proof,
                &db.root(),
            );
            assert!(verified, "proof verification failed at round {round}");

            // Re-initialize while the previous handle is still held; it is dropped only after
            // the checks below.
            db_ctx = context.child("db_reopen").with_attribute("round", round);
            let prev_db = db;
            db = UnorderedVariableMmbDb::init(
                db_ctx.child("db"),
                variable_config::<OneCap>(partition, &db_ctx),
            )
            .await
            .unwrap();
            assert_eq!(
                db.root(),
                ref_db.root(),
                "root mismatch after reopen at round {round}"
            );
            assert_eq!(
                db.get(&k).await.unwrap(),
                expected,
                "value mismatch after reopen at round {round}"
            );

            // Proof against the reopened database.
            let hasher = qmdb::hasher::<Sha256>();
            let proof = db.key_value_proof(&hasher, k).await.unwrap();
            let verified = UnorderedVariableMmbDb::verify_key_value_proof(
                &hasher,
                k,
                expected.expect("value should exist"),
                &proof,
                &db.root(),
            );
            assert!(
                verified,
                "proof verification failed after reopen at round {round}"
            );
            drop(prev_db);
        }
        db.destroy().await.unwrap();
        ref_db.destroy().await.unwrap();
    });
}
#[test_traced]
fn test_current_prune_rejects_beyond_sync_boundary_without_mutation() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
const COMMITS: u64 = 160;
let partition = "current-prune-beyond-boundary";
let ctx = context.child("db");
let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
ctx.child("storage"),
variable_config::<OneCap>(partition, &ctx),
)
.await
.unwrap();
let key0 = key(0);
for round in 0..COMMITS {
commit_writes_with_metadata(&mut db, [(key0, Some(val(40_000 + round)))], None)
.await;
}
let expected_root = db.root();
let expected_ops_root = db.ops_root();
let expected_boundary = db.sync_boundary();
let expected_pruned_bits = db.pruned_bits();
let expected_value = db.get(&key0).await.unwrap();
let invalid_prune_loc = Location::new(*expected_boundary + 256);
let result = db.prune(invalid_prune_loc).await;
assert!(
matches!(result, Err(Error::PruneBeyondMinRequired(loc, boundary))
if loc == invalid_prune_loc && boundary == expected_boundary),
"expected prune rejection above sync boundary, got {result:?}"
);
assert_eq!(db.root(), expected_root);
assert_eq!(db.ops_root(), expected_ops_root);
assert_eq!(db.pruned_bits(), expected_pruned_bits);
assert_eq!(db.get(&key0).await.unwrap(), expected_value);
drop(db);
let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
context.child("reopen"),
variable_config::<OneCap>(partition, &context),
)
.await
.unwrap();
assert_eq!(reopened.root(), expected_root);
assert_eq!(reopened.ops_root(), expected_ops_root);
assert_eq!(reopened.pruned_bits(), expected_pruned_bits);
assert_eq!(reopened.get(&key0).await.unwrap(), expected_value);
reopened.destroy().await.unwrap();
});
}
/// Builds 200 commits touching two keys (the second one is deleted every third round), rewinds
/// to the third-from-last commit, and checks the rewound state in memory and after a reopen.
#[test_traced("INFO")]
fn test_current_rewind_small_delta_large_history() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const COMMITS: u64 = 200;
        let partition = "current-rewind-small-delta";
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>(partition, &ctx),
        )
        .await
        .unwrap();

        let key0 = key(0);
        let key1 = key(1);
        let mut history = Vec::new();
        for round in 0..COMMITS {
            let key0_value = val(40_000 + round);
            // `None` is written for key1 whenever `round % 3 == 1`.
            let key1_value = (round % 3 != 1).then(|| val(50_000 + round));
            commit_writes_with_metadata(
                &mut db,
                [(key0, Some(key0_value)), (key1, key1_value)],
                None,
            )
            .await;
            let end = db.bounds().await.end;
            let root = db.root();
            let ops_root = db.ops_root();
            history.push((end, root, ops_root, key0_value, key1_value));
        }

        // Target the snapshot three entries from the end of the history.
        let (target_size, target_root, target_ops_root, target_key0, target_key1) = history
            .get(history.len() - 3)
            .copied()
            .expect("history should contain at least three commits");

        db.rewind(target_size).await.unwrap();
        assert_eq!(db.bounds().await.end, target_size);
        assert_eq!(db.root(), target_root);
        assert_eq!(db.ops_root(), target_ops_root);
        assert_eq!(db.get(&key0).await.unwrap(), Some(target_key0));
        assert_eq!(db.get(&key1).await.unwrap(), target_key1);
        db.commit().await.unwrap();
        drop(db);

        // The same snapshot must come back from storage.
        let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
            context.child("reopen_small_delta"),
            variable_config::<OneCap>(partition, &context),
        )
        .await
        .unwrap();
        assert_eq!(reopened.bounds().await.end, target_size);
        assert_eq!(reopened.root(), target_root);
        assert_eq!(reopened.ops_root(), target_ops_root);
        assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_key0));
        assert_eq!(reopened.get(&key1).await.unwrap(), target_key1);
        reopened.destroy().await.unwrap();
    });
}
/// After pruning past the start of the first commit range, rewinding to the oldest retained
/// location or to the start of that first range must fail with a journal `ItemPruned` error.
#[test_traced("INFO")]
fn test_current_rewind_pruned_target_errors() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const KEYS: u64 = 384;
        let partition = "current-rewind-pruned";
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>(partition, &ctx),
        )
        .await
        .unwrap();

        // First commit creates every key; the second overwrites all of them.
        let first_range = commit_writes_with_metadata(
            &mut db,
            (0..KEYS).map(|i| (key(i), Some(val(i)))),
            None,
        )
        .await;
        commit_writes_with_metadata(
            &mut db,
            (0..KEYS).map(|i| (key(i), Some(val(1000 + i)))),
            None,
        )
        .await;

        let prune_target = db.sync_boundary();
        db.prune(prune_target).await.unwrap();
        let pruned_bits = db.pruned_bits();
        assert!(
            pruned_bits > *first_range.start,
            "expected bitmap pruning boundary above rewind target: pruned_bits={pruned_bits}, target={:?}",
            first_range.start
        );

        // Rewinding exactly to the oldest retained location is rejected.
        let oldest_retained = db.bounds().await.start;
        let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
        let boundary_rejected = matches!(
            boundary_err,
            Error::Journal(crate::journal::Error::ItemPruned(_))
        );
        assert!(
            boundary_rejected,
            "unexpected rewind error at retained boundary: {boundary_err:?}"
        );

        // Rewinding to the start of the first range reports the location just before it.
        let expected_pruned_loc = *first_range.start - 1;
        let err = db.rewind(first_range.start).await.unwrap_err();
        let target_rejected = matches!(
            err,
            Error::Journal(crate::journal::Error::ItemPruned(loc))
                if loc == expected_pruned_loc
        );
        assert!(target_rejected, "unexpected rewind error: {err:?}");
        db.destroy().await.unwrap();
    });
}
/// Rewinding to a commit whose recorded inactivity floor lies below the pruned bit count (but
/// still within the retained journal) must fail with a journal `ItemPruned` error.
#[test_traced("INFO")]
fn test_current_rewind_rejects_target_below_bitmap_floor() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const COMMITS: u64 = 96;
        let partition = "current-rewind-bitmap-floor";
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>(partition, &ctx),
        )
        .await
        .unwrap();

        // Record (end bound, inactivity floor) after every commit.
        let mut history = Vec::new();
        for round in 0..COMMITS {
            let update = (key(0), Some(val(10_000 + round)));
            commit_writes_with_metadata(&mut db, [update], None).await;
            let end = db.bounds().await.end;
            let floor = db.inactivity_floor_loc();
            history.push((end, floor));
        }
        assert!(db.inactivity_floor_loc() > Location::new(64));

        db.prune(Location::new(1)).await.unwrap();
        let pruned_bits = db.pruned_bits();
        assert!(pruned_bits > 0);

        // First commit whose size is retained and not below the pruned bits, but whose floor
        // falls inside [retained_start, pruned_bits).
        let retained_start = db.bounds().await.start;
        let rewind_target = history
            .iter()
            .find_map(|&(size, floor)| {
                let below_bitmap_floor = size > *retained_start
                    && size >= pruned_bits
                    && floor >= *retained_start
                    && floor < pruned_bits;
                below_bitmap_floor.then_some(size)
            })
            .unwrap_or_else(|| {
                panic!(
                    "expected rewind target below bitmap boundary. retained_start={retained_start:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
                    db.inactivity_floor_loc()
                )
            });

        let err = db.rewind(rewind_target).await.unwrap_err();
        assert!(
            matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
            "unexpected rewind error: {err:?}"
        );
        db.destroy().await.unwrap();
    });
}
/// Generic check that the root reported right after `apply_batch` equals the root seen after
/// syncing, dropping and reopening the database.
///
/// `open_db` is called with a context and a partition name and must return the database under
/// test; it is invoked once for the initial open and once for the reopen.
pub fn test_speculative_root_matches_committed<M, C, F, Fut>(mut open_db: F)
where
    M: merkle::Graftable + 'static,
    C: DbAny<M> + 'static,
    C::Key: TestKey,
    <C as DbAny<M>>::Value: TestValue,
    F: FnMut(Context, String) -> Fut + Clone,
    Fut: Future<Output = C>,
{
    let executor = deterministic::Runner::default();
    // A separate clone of the opener is used for the first open.
    let mut open_initial = open_db.clone();
    executor.start(|context| async move {
        let partition = "speculative-root".to_string();
        let mut db: C = open_initial(context.child("init"), partition.clone()).await;

        // One batch of 260 writes derived from seeds 0..260.
        let batch = (0..260).fold(db.new_batch(), |batch, seed| {
            batch.write(
                TestKey::from_seed(seed),
                Some(TestValue::from_seed(seed + 1000)),
            )
        });
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();

        // Capture the root before anything is synced.
        let speculative_root = db.root();
        db.sync().await.unwrap();
        drop(db);

        let recovered: C = open_db(context.child("reopen"), partition).await;
        assert_eq!(recovered.root(), speculative_root);
        recovered.destroy().await.unwrap();
    });
}
/// Expands `test_speculative_root_matches_committed` through `for_all_variants!` in its
/// `simple` form.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_all_variants_speculative_root_matches_committed() {
    // The runner's context is not used directly by the expanded macro body.
    deterministic::Runner::default().start(|_context| async move {
        for_all_variants!(simple: test_speculative_root_matches_committed);
    });
}
/// `get` on a merkleized (not yet applied) batch returns the batch's own writes, including an
/// overwrite of a key already in the database, and `None` for a key written nowhere.
#[test_traced("INFO")]
fn test_current_batch_merkleized_get() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("mg", &ctx),
        )
        .await
        .unwrap();
        let ka = key(0);
        let kb = key(1);
        let kc = key(2);

        // Seed the database with an initial value for `ka`.
        let seeded = db
            .new_batch()
            .write(ka, Some(val(0)))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(seeded).await.unwrap();

        // Second batch overwrites `ka` and creates `kb`; it is merkleized but never applied.
        let va2 = val(100);
        let vb = val(1);
        let pending = db
            .new_batch()
            .write(ka, Some(va2))
            .write(kb, Some(vb))
            .merkleize(&db, None)
            .await
            .unwrap();
        assert_eq!(pending.get(&ka, &db).await.unwrap(), Some(va2));
        assert_eq!(pending.get(&kb, &db).await.unwrap(), Some(vb));
        assert_eq!(pending.get(&kc, &db).await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// A child batch built on a merkleized parent sees the parent's writes, can override them, and
/// applying only the child yields the child's root with both batches' writes in the database.
#[test_traced("INFO")]
fn test_current_batch_chaining() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("ch", &ctx),
        )
        .await
        .unwrap();

        // Parent writes keys 0..5.
        let parent = (0..5).fold(db.new_batch(), |batch, i| {
            batch.write(key(i), Some(val(i)))
        });
        let parent_m = parent.merkleize(&db, None).await.unwrap();

        // Child writes keys 5..10 and then overrides key 0.
        let child = (5..10)
            .fold(parent_m.new_batch::<Sha256>(), |batch, i| {
                batch.write(key(i), Some(val(i)))
            })
            .write(key(0), Some(val(999)));
        let child_m = child.merkleize(&db, None).await.unwrap();
        let child_root = child_m.root();

        // Reads through the child see the override, a parent write, and a child write.
        assert_eq!(child_m.get(&key(0), &db).await.unwrap(), Some(val(999)));
        assert_eq!(child_m.get(&key(3), &db).await.unwrap(), Some(val(3)));
        assert_eq!(child_m.get(&key(7), &db).await.unwrap(), Some(val(7)));

        // Only the child is applied.
        db.apply_batch(child_m).await.unwrap();
        assert_eq!(db.root(), child_root);
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(999)));
        for i in 1..10 {
            assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
        }
        db.destroy().await.unwrap();
    });
}
/// For the unordered fixed database: a child batch merkleized on top of a pending parent must
/// produce the same root and ops root as the same writes merkleized after the parent has been
/// applied and committed.
#[test_traced("INFO")]
fn test_current_unordered_root_matches_between_pending_and_committed_paths() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedFixedDb =
            UnorderedFixedDb::init(ctx.child("storage"), fixed_config::<OneCap>("ucr", &ctx))
                .await
                .unwrap();
        let key_a = colliding_digest(0xAA, 1);
        let key_b = colliding_digest(0xAA, 0);

        // Seed four keys built with the shared 0xAA argument and commit them.
        let initial = (0..4).fold(db.new_batch(), |batch, i| {
            batch.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)))
        });
        let seeded = initial.merkleize(&db, None).await.unwrap();
        db.apply_batch(seeded).await.unwrap();
        db.commit().await.unwrap();

        // Parent updates `key_a` but stays pending for now.
        let parent_batch = db.new_batch().write(key_a, Some(colliding_digest(0xCC, 1)));
        let parent = parent_batch.merkleize(&db, None).await.unwrap();

        // Child built on the pending parent.
        let speculative_batch = parent
            .new_batch::<Sha256>()
            .write(key_a, Some(colliding_digest(0xDD, 1)))
            .write(key_b, Some(colliding_digest(0xDD, 0)));
        let speculative = speculative_batch.merkleize(&db, None).await.unwrap();
        let pending_root = speculative.root();
        let pending_ops_root = speculative.ops_root();

        // Now land the parent and rebuild the same child directly on the database.
        db.apply_batch(parent).await.unwrap();
        db.commit().await.unwrap();
        let replayed_batch = db
            .new_batch()
            .write(key_a, Some(colliding_digest(0xDD, 1)))
            .write(key_b, Some(colliding_digest(0xDD, 0)));
        let replayed = replayed_batch.merkleize(&db, None).await.unwrap();
        assert_eq!(pending_root, replayed.root());
        assert_eq!(pending_ops_root, replayed.ops_root());

        // Applying the speculative child lands on those same roots.
        db.apply_batch(speculative).await.unwrap();
        assert_eq!(db.root(), replayed.root());
        assert_eq!(db.ops_root(), replayed.ops_root());
        db.destroy().await.unwrap();
    });
}
/// For the ordered fixed database: a child batch merkleized on top of a pending parent must
/// produce the same root and ops root as the same writes merkleized after the parent has been
/// applied and committed.
#[test_traced("INFO")]
fn test_current_ordered_root_matches_between_pending_and_committed_paths() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.child("db");
        let mut db: OrderedFixedDb =
            OrderedFixedDb::init(ctx.child("storage"), fixed_config::<OneCap>("ocr", &ctx))
                .await
                .unwrap();
        let key_a = colliding_digest(0xAA, 1);
        let key_b = colliding_digest(0xAA, 0);

        // Seed four keys built with the shared 0xAA argument and commit them.
        let initial = (0..4).fold(db.new_batch(), |batch, i| {
            batch.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)))
        });
        let seeded = initial.merkleize(&db, None).await.unwrap();
        db.apply_batch(seeded).await.unwrap();
        db.commit().await.unwrap();

        // Parent updates `key_a` but stays pending for now.
        let parent_batch = db.new_batch().write(key_a, Some(colliding_digest(0xCC, 1)));
        let parent = parent_batch.merkleize(&db, None).await.unwrap();

        // Child built on the pending parent.
        let speculative_batch = parent
            .new_batch::<Sha256>()
            .write(key_a, Some(colliding_digest(0xDD, 1)))
            .write(key_b, Some(colliding_digest(0xDD, 0)));
        let speculative = speculative_batch.merkleize(&db, None).await.unwrap();
        let pending_root = speculative.root();
        let pending_ops_root = speculative.ops_root();

        // Now land the parent and rebuild the same child directly on the database.
        db.apply_batch(parent).await.unwrap();
        db.commit().await.unwrap();
        let replayed_batch = db
            .new_batch()
            .write(key_a, Some(colliding_digest(0xDD, 1)))
            .write(key_b, Some(colliding_digest(0xDD, 0)));
        let replayed = replayed_batch.merkleize(&db, None).await.unwrap();
        assert_eq!(pending_root, replayed.root());
        assert_eq!(pending_ops_root, replayed.ops_root());

        // Applying the speculative child lands on those same roots.
        db.apply_batch(speculative).await.unwrap();
        assert_eq!(db.root(), replayed.root());
        assert_eq!(db.ops_root(), replayed.ops_root());
        db.destroy().await.unwrap();
    });
}
/// A batch that is applied but never committed is visible in memory, yet after dropping and
/// reopening the database the root is the pre-batch root and the written key is absent.
#[test_traced("INFO")]
fn test_current_batch_apply_requires_commit_for_recovery() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let partition = "apply_requires_commit";
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>(partition, &ctx),
        )
        .await
        .unwrap();
        let committed_root = db.root();

        // Apply one write without calling `commit`.
        let batch = db.new_batch().write(key(0), Some(val(0)));
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        drop(db);

        // The reopened database does not contain the uncommitted write.
        let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
            context.child("reopen"),
            variable_config::<OneCap>(partition, &context),
        )
        .await
        .unwrap();
        assert_eq!(reopened.root(), committed_root);
        assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
        reopened.destroy().await.unwrap();
    });
}
/// Pipelines one stage: while the commit of an already-applied parent batch is in flight, a
/// child batch is read against, built and merkleized on the same database; the child is then
/// applied and committed, and both keys must be readable.
#[test_traced("INFO")]
fn test_current_batch_single_stage_pipeline() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let ctx = context.child("db");
let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
ctx.child("storage"),
variable_config::<OneCap>("pipe", &ctx),
)
.await
.unwrap();
// Parent batch: applied here, committed inside the join below.
let mut batch = db.new_batch();
batch = batch.write(key(0), Some(val(0)));
let parent_merkleized = batch.merkleize(&db, None).await.unwrap();
db.apply_batch(parent_merkleized).await.unwrap();
// Drive the child construction and the parent's commit in the same `join!`.
let (child_merkleized, commit_result) = futures::join!(
async {
// The applied parent write is readable inside this future.
assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
let mut child = db.new_batch();
child = child.write(key(1), Some(val(1)));
child.merkleize(&db, None).await.unwrap()
},
db.commit(),
);
commit_result.unwrap();
// Land the child on top of the committed parent.
db.apply_batch(child_merkleized).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
db.destroy().await.unwrap();
});
}
/// Compares two ways of reaching the same state: applying a parent batch followed by a child
/// batch that was built on top of the (then unapplied) parent, versus building and applying two
/// independent batches one after the other on a second database. The roots must be equal.
#[test_traced("INFO")]
fn test_current_sequential_commit() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // --- Database 1: child is chained off the parent before either is applied. ---
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("ff", &ctx),
        )
        .await
        .unwrap();

        let parent_batch = db.new_batch().write(key(0), Some(val(0)));
        let parent_m = parent_batch.merkleize(&db, None).await.unwrap();

        let child_batch = parent_m.new_batch::<Sha256>().write(key(1), Some(val(1)));
        let child_m = child_batch.merkleize(&db, None).await.unwrap();

        db.apply_batch(parent_m).await.unwrap();
        db.apply_batch(child_m).await.unwrap();

        let first = db.get(&key(0)).await.unwrap();
        assert_eq!(first, Some(val(0)));
        let second = db.get(&key(1)).await.unwrap();
        assert_eq!(second, Some(val(1)));

        // --- Database 2: each batch is built from the database and applied immediately. ---
        let ctx2 = context.child("db").with_attribute("index", 2);
        let mut db2: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx2.child("db"),
            variable_config::<OneCap>("ff2", &ctx2),
        )
        .await
        .unwrap();

        let first_batch = db2.new_batch().write(key(0), Some(val(0)));
        let first_m = first_batch.merkleize(&db2, None).await.unwrap();
        db2.apply_batch(first_m).await.unwrap();

        let second_batch = db2.new_batch().write(key(1), Some(val(1)));
        let second_m = second_batch.merkleize(&db2, None).await.unwrap();
        db2.apply_batch(second_m).await.unwrap();

        // Both construction orders must yield the same root.
        assert_eq!(db.root(), db2.root());

        db.destroy().await.unwrap();
        db2.destroy().await.unwrap();
    });
}
/// Takes a snapshot of the database with `to_batch`, checks that its root equals the database
/// root, then chains a child batch off the snapshot and applies it. The child's root must differ
/// from the snapshot's, and both keys must be readable afterwards.
#[test_traced("INFO")]
fn test_current_to_batch_then_chain() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("tb", &ctx),
        )
        .await
        .unwrap();

        // Put one key into the database so the snapshot is taken over non-initial state.
        let initial = db.new_batch().write(key(0), Some(val(0)));
        let initial_m = initial.merkleize(&db, None).await.unwrap();
        db.apply_batch(initial_m).await.unwrap();

        // Snapshot the current state as a batch.
        let snapshot = db.to_batch();
        assert_eq!(snapshot.root(), db.root());

        // Chain a new write on top of the snapshot.
        let chained = snapshot.new_batch::<Sha256>().write(key(1), Some(val(1)));
        let child = chained.merkleize(&db, None).await.unwrap();
        assert_ne!(child.root(), snapshot.root());

        db.apply_batch(child).await.unwrap();

        let first = db.get(&key(0)).await.unwrap();
        assert_eq!(first, Some(val(0)));
        let second = db.get(&key(1)).await.unwrap();
        assert_eq!(second, Some(val(1)));

        db.destroy().await.unwrap();
    });
}
/// Builds a child batch on top of an already applied-and-committed parent, prunes the database
/// at its sync boundary, and then expects the child to still answer reads and to apply cleanly.
#[test_traced("INFO")]
fn test_current_live_batch_safe_across_prune() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("prune-live", &ctx),
        )
        .await
        .unwrap();

        // Seed 300 keys and commit them.
        let seed = (0u64..300).fold(db.new_batch(), |batch, i| {
            batch.write(key(i), Some(val(i)))
        });
        let seed_m = seed.merkleize(&db, None).await.unwrap();
        db.apply_batch(seed_m).await.unwrap();
        db.commit().await.unwrap();

        // Overwrite the first 250 keys; keep a handle to the merkleized parent after applying.
        let overwrite = (0u64..250).fold(db.new_batch(), |batch, i| {
            batch.write(key(i), Some(val(i + 10_000)))
        });
        let parent_m = overwrite.merkleize(&db, None).await.unwrap();
        db.apply_batch(Arc::clone(&parent_m)).await.unwrap();
        db.commit().await.unwrap();

        // Child chained off the retained parent handle, built before the prune.
        let child_batch = parent_m
            .new_batch::<Sha256>()
            .write(key(250), Some(val(99_999)));
        let child_m = child_batch.merkleize(&db, None).await.unwrap();

        let boundary = db.sync_boundary();
        db.prune(boundary).await.unwrap();

        // The child must remain readable and applicable after pruning.
        let from_child = child_m.get(&key(250), &db).await.unwrap();
        assert_eq!(from_child, Some(val(99_999)));
        db.apply_batch(child_m).await.unwrap();

        let overwritten = db.get(&key(0)).await.unwrap();
        assert_eq!(overwritten, Some(val(10_000)));
        let from_db = db.get(&key(250)).await.unwrap();
        assert_eq!(from_db, Some(val(99_999)));

        db.destroy().await.unwrap();
    });
}
/// Applies batch `a`, then builds and applies batch `b` chained off the retained handle to `a`,
/// and finally applies an unrelated batch built directly from the database. All written keys
/// must be readable after each stage.
#[test_traced("INFO")]
fn test_current_extend_applied_batch() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("xtend", &ctx),
        )
        .await
        .unwrap();

        // Apply `a`, keeping a second handle so a child can be chained off it afterwards.
        let first_batch = db.new_batch().write(key(0), Some(val(0)));
        let first_m = first_batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(Arc::clone(&first_m)).await.unwrap();

        // `b` extends the already-applied `a`.
        let second_batch = first_m.new_batch::<Sha256>().write(key(1), Some(val(1)));
        let second_m = second_batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(second_m).await.unwrap();

        for i in 0..2 {
            assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
        }

        // A fresh batch built from the database itself still applies on top.
        let third_batch = db.new_batch().write(key(2), Some(val(2)));
        let third_m = third_batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(third_m).await.unwrap();

        for i in 0..3 {
            assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
        }

        db.destroy().await.unwrap();
    });
}
/// Applies and commits a parent batch, prunes the database at its sync boundary, and only then
/// builds a child off the retained parent handle. The child is expected to merkleize and apply,
/// leaving both the parent's overwrites and the child's new key readable.
#[test_traced("INFO")]
fn test_current_live_batch_child_after_prune() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("child-after-prune", &ctx),
        )
        .await
        .unwrap();

        // Seed 300 keys and commit them.
        let seed = (0u64..300).fold(db.new_batch(), |batch, i| {
            batch.write(key(i), Some(val(i)))
        });
        let seed_m = seed.merkleize(&db, None).await.unwrap();
        db.apply_batch(seed_m).await.unwrap();
        db.commit().await.unwrap();

        // Overwrite the first 250 keys, retaining a handle to the merkleized batch.
        let overwrite = (0u64..250).fold(db.new_batch(), |batch, i| {
            batch.write(key(i), Some(val(i + 10_000)))
        });
        let parent_m = overwrite.merkleize(&db, None).await.unwrap();
        db.apply_batch(Arc::clone(&parent_m)).await.unwrap();
        db.commit().await.unwrap();

        // Prune first; the child is created from the parent handle only afterwards.
        let boundary = db.sync_boundary();
        db.prune(boundary).await.unwrap();

        let child_batch = parent_m
            .new_batch::<Sha256>()
            .write(key(300), Some(val(300)));
        let child_m = child_batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(child_m).await.unwrap();

        let lowest = db.get(&key(0)).await.unwrap();
        assert_eq!(lowest, Some(val(10_000)));
        let highest_overwritten = db.get(&key(249)).await.unwrap();
        assert_eq!(highest_overwritten, Some(val(10_249)));
        let added = db.get(&key(300)).await.unwrap();
        assert_eq!(added, Some(val(300)));

        db.destroy().await.unwrap();
    });
}
/// Builds a three-deep chain of batches (`a` -> `b` -> `c`), drops the handles to `a` and `b`,
/// and applies only `c`. After committing, every key written anywhere in the chain must be
/// readable from the database.
#[test_traced("WARN")]
fn test_current_apply_after_ancestor_dropped() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("adrop", &ctx),
        )
        .await
        .unwrap();

        // a: keys 0..3, built from the database.
        let a = (0..3).fold(db.new_batch(), |batch, i| {
            batch.write(key(i), Some(val(i)))
        });
        let a_m = a.merkleize(&db, None).await.unwrap();

        // b: keys 3..6, chained off a.
        let b = (3..6).fold(a_m.new_batch::<Sha256>(), |batch, i| {
            batch.write(key(i), Some(val(i)))
        });
        let b_m = b.merkleize(&db, None).await.unwrap();

        // c: keys 6..9, chained off b.
        let c = (6..9).fold(b_m.new_batch::<Sha256>(), |batch, i| {
            batch.write(key(i), Some(val(i)))
        });
        let c_m = c.merkleize(&db, None).await.unwrap();

        // Release the ancestor handles before applying the tip of the chain.
        drop(a_m);
        drop(b_m);

        db.apply_batch(c_m).await.unwrap();
        db.commit().await.unwrap();

        for i in 0..9 {
            let stored = db.get(&key(i)).await.unwrap();
            assert_eq!(
                stored,
                Some(val(i)),
                "key({i}) missing after apply_batch with dropped ancestors"
            );
        }

        db.destroy().await.unwrap();
    });
}
/// Reaches the same logical state on two databases: on the first by applying only the tip of a
/// three-batch chain, on the second by applying and committing the three batches one at a time.
/// A further batch `D` is then merkleized (not applied) on each, and the two roots must match.
#[test_traced("WARN")]
fn test_current_chain_bitmap_order_matches_sequential() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // --- Database 1: chain a -> b -> c, apply the tip only. ---
        let ctx1 = context.child("db").with_attribute("index", 1);
        let mut db1: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx1.child("db"),
            variable_config::<OneCap>("ord1", &ctx1),
        )
        .await
        .unwrap();
        commit_writes_with_metadata(
            &mut db1,
            [(key(10), Some(val(10))), (key(11), Some(val(11)))],
            None,
        )
        .await;

        // a: update key 10 and delete key 11.
        let chain_a = db1
            .new_batch()
            .write(key(10), Some(val(100)))
            .write(key(11), None);
        let chain_a = chain_a.merkleize(&db1, None).await.unwrap();

        // b: two new keys on top of a.
        let chain_b = chain_a
            .new_batch::<Sha256>()
            .write(key(12), Some(val(120)))
            .write(key(13), Some(val(130)));
        let chain_b = chain_b.merkleize(&db1, None).await.unwrap();

        // c: one new key on top of b.
        let chain_c = chain_b
            .new_batch::<Sha256>()
            .write(key(14), Some(val(140)));
        let chain_c = chain_c.merkleize(&db1, None).await.unwrap();

        db1.apply_batch(chain_c).await.unwrap();
        db1.commit().await.unwrap();

        let chain_d = db1.new_batch().write(key(20), Some(val(200)));
        let chain_d = chain_d.merkleize(&db1, None).await.unwrap();
        let chain_then_d_root = chain_d.root();

        // --- Database 2: same writes, each batch applied and committed on its own. ---
        let ctx2 = context.child("db").with_attribute("index", 2);
        let mut db2: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx2.child("db"),
            variable_config::<OneCap>("ord2", &ctx2),
        )
        .await
        .unwrap();
        commit_writes_with_metadata(
            &mut db2,
            [(key(10), Some(val(10))), (key(11), Some(val(11)))],
            None,
        )
        .await;

        let seq_a = db2
            .new_batch()
            .write(key(10), Some(val(100)))
            .write(key(11), None);
        let seq_a = seq_a.merkleize(&db2, None).await.unwrap();
        db2.apply_batch(seq_a).await.unwrap();
        db2.commit().await.unwrap();

        let seq_b = db2
            .new_batch()
            .write(key(12), Some(val(120)))
            .write(key(13), Some(val(130)));
        let seq_b = seq_b.merkleize(&db2, None).await.unwrap();
        db2.apply_batch(seq_b).await.unwrap();
        db2.commit().await.unwrap();

        let seq_c = db2.new_batch().write(key(14), Some(val(140)));
        let seq_c = seq_c.merkleize(&db2, None).await.unwrap();
        db2.apply_batch(seq_c).await.unwrap();
        db2.commit().await.unwrap();

        let seq_d = db2.new_batch().write(key(20), Some(val(200)));
        let seq_d = seq_d.merkleize(&db2, None).await.unwrap();
        let sequential_then_d_root = seq_d.root();

        assert_eq!(
            chain_then_d_root, sequential_then_d_root,
            "batch D's root on top of chain-applied state must match sequential state"
        );

        db1.destroy().await.unwrap();
        db2.destroy().await.unwrap();
    });
}
/// Builds a child batch on top of an unapplied parent, then applies and commits the parent,
/// checks that the inactivity floor has reached at least 256, prunes at the sync boundary, and
/// finally expects applying the (pre-prune) child to succeed.
#[test_traced("WARN")]
fn test_current_stale_bitmap_clears_after_prune() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("stale-clears", &ctx),
        )
        .await
        .unwrap();

        // Seed keys 0..255 and commit.
        let seed = (0u64..255).fold(db.new_batch(), |batch, i| {
            batch.write(key(i), Some(val(i)))
        });
        let seed_m = seed.merkleize(&db, None).await.unwrap();
        db.apply_batch(seed_m).await.unwrap();
        db.commit().await.unwrap();

        // Parent overwrites keys 1..255 (key 0 is left untouched).
        let parent = (1u64..255).fold(db.new_batch(), |batch, i| {
            batch.write(key(i), Some(val(i + 10000)))
        });
        let parent_m = parent.merkleize(&db, None).await.unwrap();

        // Child overwrites key 0; it is merkleized before the parent is applied.
        let child = parent_m
            .new_batch::<Sha256>()
            .write(key(0), Some(val(9999)));
        let child_m = child.merkleize(&db, None).await.unwrap();

        db.apply_batch(parent_m).await.unwrap();
        db.commit().await.unwrap();

        // Precondition for the scenario under test.
        let floor = *db.inactivity_floor_loc();
        assert!(floor >= 256, "floor must be past chunk 0: floor={floor}",);

        let boundary = db.sync_boundary();
        db.prune(boundary).await.unwrap();

        // Applying the child created before the prune must not fail.
        db.apply_batch(child_m).await.unwrap();

        db.destroy().await.unwrap();
    });
}
/// Builds the chain `a` -> `b` -> `c`, applies `a` on its own, and then applies `c` without
/// applying `b` separately. The database root must equal `c`'s root and all three keys must be
/// readable.
#[test_traced("INFO")]
fn test_current_partial_ancestor_commit() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("pac", &ctx),
        )
        .await
        .unwrap();

        let batch_a = db.new_batch().write(key(0), Some(val(0)));
        let a = batch_a.merkleize(&db, None).await.unwrap();

        let batch_b = a.new_batch::<Sha256>().write(key(1), Some(val(1)));
        let b = batch_b.merkleize(&db, None).await.unwrap();

        let batch_c = b.new_batch::<Sha256>().write(key(2), Some(val(2)));
        let c = batch_c.merkleize(&db, None).await.unwrap();

        // Root the database is expected to reach once the whole chain is in.
        let expected_root = c.root();

        // Only the first ancestor is applied individually; `c` is applied directly after it.
        db.apply_batch(a).await.unwrap();
        db.apply_batch(c).await.unwrap();

        assert_eq!(db.root(), expected_root);
        for i in 0..3 {
            assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
        }

        db.destroy().await.unwrap();
    });
}
/// Builds the chain `a` -> `b` -> `c` -> `d`, applies `a` and then `d` (skipping separate
/// applies of `b` and `c`), and applies one more batch `e` built from the database. The
/// resulting root is compared with a reference database that applied keys 0..5 one batch at a
/// time.
#[test_traced("INFO")]
fn test_current_partial_ancestor_bitmap_ordering() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("bmo", &ctx),
        )
        .await
        .unwrap();

        let batch_a = db.new_batch().write(key(0), Some(val(0)));
        let a = batch_a.merkleize(&db, None).await.unwrap();

        let batch_b = a.new_batch::<Sha256>().write(key(1), Some(val(1)));
        let b = batch_b.merkleize(&db, None).await.unwrap();

        let batch_c = b.new_batch::<Sha256>().write(key(2), Some(val(2)));
        let c = batch_c.merkleize(&db, None).await.unwrap();

        let batch_d = c.new_batch::<Sha256>().write(key(3), Some(val(3)));
        let d = batch_d.merkleize(&db, None).await.unwrap();

        // Apply the first ancestor, then jump straight to the tip. A clone of `d` is applied so
        // the original handle stays alive for the rest of the test.
        db.apply_batch(a).await.unwrap();
        db.apply_batch(d.clone()).await.unwrap();

        // One more batch built from the database after the partial-ancestor apply.
        let batch_e = db.new_batch().write(key(4), Some(val(4)));
        let e = batch_e.merkleize(&db, None).await.unwrap();
        db.apply_batch(e).await.unwrap();

        // Reference: the same five writes applied strictly one batch at a time.
        let ref_ctx = context.child("ref");
        let mut ref_db: UnorderedVariableDb = UnorderedVariableDb::init(
            ref_ctx.child("db"),
            variable_config::<OneCap>("bmo_ref", &ref_ctx),
        )
        .await
        .unwrap();
        for i in 0..5 {
            let single = ref_db.new_batch().write(key(i), Some(val(i)));
            let single_m = single.merkleize(&ref_db, None).await.unwrap();
            ref_db.apply_batch(single_m).await.unwrap();
        }

        assert_eq!(
            db.root(),
            ref_db.root(),
            "root mismatch: bitmap ordering bug"
        );

        db.destroy().await.unwrap();
        ref_db.destroy().await.unwrap();
    });
}
/// Captures every bitmap chunk of a speculative child batch (built on an unapplied parent) and
/// checks that, after the child is applied, the database's bitmap holds exactly the same
/// chunks. The seed is sized so the bitmap spans more than one chunk.
#[test_traced("INFO")]
fn test_current_apply_chunks_match_speculative_chunks() {
    const N: usize = 32;
    const CHUNK_SIZE_BITS: u64 = commonware_utils::bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
    // More seed keys than bits in one chunk.
    const SEED_KEYS: u64 = CHUNK_SIZE_BITS + 50;

    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
            ctx.child("storage"),
            variable_config::<OneCap>("spec_eq", &ctx),
        )
        .await
        .unwrap();

        // Seed and commit SEED_KEYS keys.
        let mut seed_batch = db.new_batch();
        for i in 0..SEED_KEYS {
            seed_batch = seed_batch.write(key(i), Some(val(i)));
        }
        let seed_m = seed_batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(seed_m).await.unwrap();
        db.commit().await.unwrap();

        let seeded_len = Readable::<N>::len(db.any.bitmap.as_ref());
        assert!(
            seeded_len > CHUNK_SIZE_BITS,
            "setup must cross a chunk boundary",
        );

        // Parent: a mix of updates to seeded keys, a delete, and two keys beyond the seed range.
        let parent_batch = db
            .new_batch()
            .write(key(10), Some(val(110)))
            .write(key(50), None)
            .write(key(CHUNK_SIZE_BITS + 5), Some(val(120)))
            .write(key(SEED_KEYS), Some(val(130)))
            .write(key(SEED_KEYS + 1), Some(val(131)));
        let parent = parent_batch.merkleize(&db, None).await.unwrap();

        // Child: re-updates a parent-touched key, deletes a parent-created key and a seeded
        // key, and updates one more seeded key.
        let child_batch = parent
            .new_batch::<Sha256>()
            .write(key(10), Some(val(210)))
            .write(key(SEED_KEYS), None)
            .write(key(75), None)
            .write(key(CHUNK_SIZE_BITS + 30), Some(val(220)));
        let child = child_batch.merkleize(&db, None).await.unwrap();

        // Snapshot the child's view of every chunk before applying it.
        let speculative_len = Readable::<N>::len(&child.bitmap);
        let speculative_count = speculative_len.div_ceil(CHUNK_SIZE_BITS) as usize;
        let mut speculative_chunks: Vec<[u8; N]> = Vec::with_capacity(speculative_count);
        for idx in 0..speculative_count {
            speculative_chunks.push(Readable::<N>::get_chunk(&child.bitmap, idx));
        }
        assert!(speculative_chunks.len() >= 2);

        db.apply_batch(child).await.unwrap();

        // Read the same chunks back from the database's bitmap.
        let committed_len = Readable::<N>::len(db.any.bitmap.as_ref());
        let committed_count = committed_len.div_ceil(CHUNK_SIZE_BITS) as usize;
        let mut committed_chunks: Vec<[u8; N]> = Vec::with_capacity(committed_count);
        for idx in 0..committed_count {
            committed_chunks.push(Readable::<N>::get_chunk(db.any.bitmap.as_ref(), idx));
        }

        assert_eq!(
            speculative_chunks, committed_chunks,
            "speculative chunks must equal post-apply committed chunks across all chunks",
        );

        db.destroy().await.unwrap();
    });
}
/// Generates a historical operations proof from an `UnorderedFixedMmbDb` and checks that it
/// verifies against the ops root with the hasher returned by `qmdb::hasher::<Sha256>()`, while
/// verification with a `Standard` hasher constructed with `ForwardFold` is rejected.
#[test_traced("INFO")]
fn test_current_mmb_ops_historical_proof_verifies_with_backward_bagging() {
    use crate::{merkle::hasher::Standard, qmdb::verify_proof};
    use commonware_utils::NZU64;

    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let ctx = context.child("db");
        let mut db: UnorderedFixedMmbDb = UnorderedFixedMmbDb::init(
            ctx.child("storage"),
            fixed_config::<OneCap>("mmb-ops-proof", &ctx),
        )
        .await
        .unwrap();

        // Commit 16 key/value writes.
        let mut writes: Vec<(Digest, Option<Digest>)> = Vec::with_capacity(16);
        for i in 0u64..16 {
            writes.push((key(i), Some(val(i))));
        }
        commit_writes(&mut db, writes).await.unwrap();

        // Prove the operations starting at location 0 against the current size.
        let ops_root = db.ops_root();
        let historical_size = db.bounds().await.end;
        let (proof, ops) = db
            .ops_historical_proof(historical_size, Location::new(0), NZU64!(32))
            .await
            .unwrap();

        // The database's own hasher configuration must accept the proof.
        let hasher = qmdb::hasher::<Sha256>();
        let accepted = verify_proof(&hasher, &proof, Location::new(0), &ops, &ops_root);
        assert!(accepted);

        // A forward-folding standard hasher must not accept the same proof.
        let plain = Standard::<Sha256>::new(ForwardFold);
        let accepted_by_plain = verify_proof(&plain, &proof, Location::new(0), &ops, &ops_root);
        assert!(!accepted_by_plain);

        db.destroy().await.unwrap();
    });
}
}