use crate::{
index::Factory as IndexFactory,
journal::{
authenticated::Inner,
contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
},
merkle::{journaled::Config as MerkleConfig, Family, Location},
qmdb::{
any::operation::{Operation, Update},
operation::Committable,
},
translator::Translator,
Context,
};
use commonware_codec::CodecShared;
use commonware_cryptography::Hasher;
use tracing::warn;
// Submodules of the `any` QMDB implementation.
pub mod batch;
pub mod db;
pub mod operation;
// Test-support trait definitions, compiled for unit tests or when the
// `test-traits` feature is enabled.
#[cfg(any(test, feature = "test-traits"))]
pub mod traits;
pub mod value;
pub use value::{FixedValue, ValueEncoding, VariableValue};
pub mod ordered;
pub(crate) mod sync;
pub mod unordered;
/// Configuration for initializing an `any` database (see [`init`]).
#[derive(Clone)]
pub struct Config<T: Translator, J> {
    /// Configuration for the journaled merkle structure.
    pub merkle_config: MerkleConfig,
    /// Configuration for the operation journal; the concrete type depends on
    /// whether the journal stores fixed- or variable-size items.
    pub journal_config: J,
    /// Translator used to map keys into the index.
    pub translator: T,
}
/// [`Config`] specialized to the fixed-size contiguous journal.
pub type FixedConfig<T> = Config<T, FConfig>;
/// [`Config`] specialized to the variable-size contiguous journal; `C` is the
/// journal's codec configuration type.
pub type VariableConfig<T, C> = Config<T, VConfig<C>>;
/// Initializes a [`db::Db`] from `cfg`, bootstrapping an empty database when
/// the underlying log contains no operations.
///
/// Steps:
/// 1. Open the authenticated operation log from the merkle and journal
///    configs, using `Operation::is_commit` as the commit predicate.
/// 2. If the log is empty, append and sync a `CommitFloor(None, 0)` operation
///    so the database always starts from a committed state.
/// 3. Build the key index via the supplied [`IndexFactory`] and finish
///    initialization by replaying the log through `db::Db::init_from_log`.
///
/// `known_inactivity_floor` and `callback` are forwarded to `init_from_log`;
/// their exact contracts are defined there (NOTE(review): callback appears to
/// receive an activity flag and an optional location during replay — confirm
/// against `init_from_log`).
pub async fn init<F, E, U, H, T, I, J, Cb>(
    context: E,
    cfg: Config<T, J::Config>,
    known_inactivity_floor: Option<Location<F>>,
    callback: Cb,
) -> Result<db::Db<F, E, J, I, H, U>, crate::qmdb::Error<F>>
where
    F: Family,
    E: Context,
    U: Update + Send + Sync,
    H: Hasher,
    T: Translator,
    I: IndexFactory<T, Value = Location<F>>,
    J: Inner<E, Item = Operation<F, U>>,
    Operation<F, U>: Committable + CodecShared,
    Cb: FnMut(bool, Option<Location<F>>),
{
    let mut log = J::init::<F, H>(
        context.with_label("log"),
        cfg.merkle_config,
        cfg.journal_config,
        Operation::is_commit,
    )
    .await?;
    // An empty log means a brand-new database: seed it with an initial commit
    // floor so replay always finds at least one committed operation.
    if log.size().await == 0 {
        warn!("Authenticated log is empty, initializing new db");
        let commit_floor = Operation::CommitFloor(None, Location::new(0));
        log.append(&commit_floor).await?;
        log.sync().await?;
    }
    let index = I::new(context.with_label("index"), cfg.translator);
    db::Db::init_from_log(index, log, known_inactivity_floor, callback).await
}
#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::{
journal::contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
qmdb::any::{FixedConfig, MerkleConfig, VariableConfig},
translator::OneCap,
};
use commonware_codec::{Codec, CodecShared};
use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
use commonware_runtime::{
buffer::paged::CacheRef, deterministic::Context, BufferPooler, Metrics,
};
use commonware_utils::{NZUsize, NZU16, NZU64};
use core::{future::Future, pin::Pin};
use std::{
collections::HashMap,
num::{NonZeroU16, NonZeroUsize},
};
/// Builds a 32-byte digest whose first byte is `prefix` and whose final eight
/// bytes hold `suffix` in big-endian order; bytes 1..24 stay zero. Useful for
/// manufacturing digests that collide under short-prefix translators.
pub(crate) fn colliding_digest(prefix: u8, suffix: u64) -> Digest {
    let mut raw = [0u8; 32];
    raw[0] = prefix;
    for (dst, src) in raw[24..].iter_mut().zip(suffix.to_be_bytes()) {
        *dst = src;
    }
    Digest::from(raw)
}
// Page size (in bytes) for the paged buffer cache used by test configs.
const PAGE_SIZE: NonZeroU16 = NZU16!(101);
// Number of pages the test page cache retains.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
/// Builds a [`FixedConfig`] for tests, deriving partition names from `suffix`
/// so concurrently-running tests do not collide on storage. The merkle
/// structure and the journal share one page cache.
pub(crate) fn fixed_db_config<T: Translator + Default>(
    suffix: &str,
    pooler: &impl BufferPooler,
) -> FixedConfig<T> {
    let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
    FixedConfig {
        merkle_config: MerkleConfig {
            journal_partition: format!("journal-{suffix}"),
            metadata_partition: format!("metadata-{suffix}"),
            items_per_blob: NZU64!(11),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            page_cache: page_cache.clone(),
        },
        journal_config: FConfig {
            partition: format!("log-journal-{suffix}"),
            items_per_blob: NZU64!(7),
            page_cache,
            write_buffer: NZUsize!(1024),
        },
        translator: T::default(),
    }
}
/// Builds a [`VariableConfig`] for tests (codec config `((), ())`), deriving
/// partition names from `suffix`; mirrors [`fixed_db_config`] but targets the
/// variable-size journal (sections instead of blobs, optional compression).
pub(crate) fn variable_db_config<T: Translator + Default>(
    suffix: &str,
    pooler: &impl BufferPooler,
) -> VariableConfig<T, ((), ())> {
    let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
    VariableConfig {
        merkle_config: MerkleConfig {
            journal_partition: format!("journal-{suffix}"),
            metadata_partition: format!("metadata-{suffix}"),
            items_per_blob: NZU64!(11),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            page_cache: page_cache.clone(),
        },
        journal_config: VConfig {
            partition: format!("log-journal-{suffix}"),
            items_per_section: NZU64!(7),
            compression: None,
            codec_config: ((), ()),
            page_cache,
            write_buffer: NZUsize!(1024),
        },
        translator: T::default(),
    }
}
use crate::{
index::Unordered as UnorderedIndex,
journal::contiguous::Mutable,
merkle::mmr,
qmdb::any::{
db::Db as AnyDb,
operation::{update::Update as UpdateTrait, Operation as AnyOperation},
traits::{DbAny, Provable, UnmerkleizedBatch as _},
},
};
// Convenience aliases pinning the MMR family for the rewind tests below.
type Error = crate::qmdb::Error<mmr::Family>;
type Location = mmr::Location;
/// Test-only abstraction over databases whose operation log can be rewound
/// to an earlier size (truncating everything appended after it).
pub(crate) trait RewindableDb {
    // Rewinds the database to exactly `size` operations.
    fn rewind_to_size(
        &mut self,
        size: Location,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
// Blanket implementation for any MMR-family `AnyDb` over a mutable journal:
// simply delegates to the db's own `rewind`, discarding its success value.
impl<E, U, C, I, H> RewindableDb for AnyDb<mmr::Family, E, C, I, H, U>
where
    E: crate::Context,
    U: UpdateTrait,
    C: Mutable<Item = AnyOperation<mmr::Family, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    AnyOperation<mmr::Family, U>: Codec,
{
    async fn rewind_to_size(&mut self, size: Location) -> Result<(), Error> {
        self.rewind(size).await?;
        Ok(())
    }
}
/// Exercises crash-recovery on a non-empty database: committed state must be
/// reconstructed exactly on every reopen, while merkleized-but-unapplied work
/// is discarded; a subsequent real commit must then be visible after reopen.
pub(crate) async fn test_any_db_non_empty_recovery<F: Family, D, V: Clone + CodecShared>(
    context: Context,
    mut db: D,
    reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
{
    const ELEMENTS: u64 = 1000;
    // Populate ELEMENTS keys in one batch and commit + prune.
    {
        let mut batch = db.new_batch();
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value(i * 1000);
            batch = batch.write(k, Some(v));
        }
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }
    db.commit().await.unwrap();
    db.prune(db.inactivity_floor_loc().await).await.unwrap();
    // Snapshot the committed state to compare against after each reopen.
    let root = db.root();
    let op_count = db.size().await;
    let inactivity_floor_loc = db.inactivity_floor_loc().await;
    let db = reopen_db(context.with_label("reopen1")).await;
    assert_eq!(db.size().await, op_count);
    assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
    assert_eq!(db.root(), root);
    // Merkleize a batch without applying it; the reopen must not observe it.
    {
        let mut batch = db.new_batch();
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v));
        }
        let _merkleized = batch.merkleize(&db, None).await.unwrap();
    }
    let db = reopen_db(context.with_label("reopen2")).await;
    assert_eq!(db.size().await, op_count);
    assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
    assert_eq!(db.root(), root);
    // Repeat the discarded-merkleize pattern a second time.
    {
        let mut batch = db.new_batch();
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v));
        }
        let _merkleized = batch.merkleize(&db, None).await.unwrap();
    }
    let db = reopen_db(context.with_label("reopen3")).await;
    assert_eq!(db.size().await, op_count);
    assert_eq!(db.root(), root);
    // Several discarded merkleizations in a row before a single reopen.
    for _ in 0..3 {
        let mut batch = db.new_batch();
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v));
        }
        let _merkleized = batch.merkleize(&db, None).await.unwrap();
    }
    let mut db = reopen_db(context.with_label("reopen4")).await;
    assert_eq!(db.size().await, op_count);
    assert_eq!(db.root(), root);
    // Finally apply and commit for real; the reopened db must reflect it.
    {
        let mut batch = db.new_batch();
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v));
        }
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }
    db.commit().await.unwrap();
    let db = reopen_db(context.with_label("reopen5")).await;
    assert!(db.size().await > op_count);
    assert_ne!(db.inactivity_floor_loc().await, inactivity_floor_loc);
    assert_ne!(db.root(), root);
    db.destroy().await.unwrap();
}
/// Exercises crash-recovery on an empty database: after init the log holds
/// exactly one operation (asserted via `size() == 1`), uncommitted work added
/// before a reopen must be discarded, and a real commit must then persist.
pub(crate) async fn test_any_db_empty_recovery<F: Family, D, V: Clone + CodecShared>(
    context: Context,
    db: D,
    reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
{
    let root = db.root();
    let db = reopen_db(context.with_label("reopen1")).await;
    assert_eq!(db.size().await, 1);
    assert_eq!(db.root(), root);
    // Merkleize a batch without applying it; it must not survive a reopen.
    {
        let mut batch = db.new_batch();
        for i in 0u64..1000 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v));
        }
        let _merkleized = batch.merkleize(&db, None).await.unwrap();
    }
    let db = reopen_db(context.with_label("reopen2")).await;
    assert_eq!(db.size().await, 1);
    assert_eq!(db.root(), root);
    // Same pattern, but explicitly drop the db before reopening.
    {
        let mut batch = db.new_batch();
        for i in 0u64..1000 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v));
        }
        let _merkleized = batch.merkleize(&db, None).await.unwrap();
    }
    drop(db);
    let db = reopen_db(context.with_label("reopen3")).await;
    assert_eq!(db.size().await, 1);
    assert_eq!(db.root(), root);
    // Several discarded merkleizations in a row before one reopen.
    for _ in 0..3 {
        let mut batch = db.new_batch();
        for i in 0u64..1000 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v));
        }
        let _merkleized = batch.merkleize(&db, None).await.unwrap();
    }
    drop(db);
    let mut db = reopen_db(context.with_label("reopen4")).await;
    assert_eq!(db.size().await, 1);
    assert_eq!(db.root(), root);
    // Finally apply and commit for real; the reopened db must reflect it.
    {
        let mut batch = db.new_batch();
        for i in 0u64..1000 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v));
        }
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }
    db.commit().await.unwrap();
    drop(db);
    let db = reopen_db(context.with_label("reopen5")).await;
    assert!(db.size().await > 1);
    assert_ne!(db.root(), root);
    db.destroy().await.unwrap();
}
/// Exercises rewind: the db is advanced through several committed states
/// (A, B, C) and rewound back via [`RewindableDb::rewind_to_size`]; root,
/// size, inactivity floor, metadata, and key lookups must all match the
/// rewound-to state, both in memory and across reopens.
pub(crate) async fn test_any_db_rewind_recovery<D, V>(
    context: Context,
    mut db: D,
    reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + RewindableDb,
    V: Clone + CodecShared + Eq + std::fmt::Debug,
{
    let key0 = Sha256::hash(&0u64.to_be_bytes());
    let key1 = Sha256::hash(&1u64.to_be_bytes());
    let key2 = Sha256::hash(&2u64.to_be_bytes());
    // Snapshot the pristine state so we can rewind all the way back later.
    let initial_root = db.root();
    let initial_size = db.size().await;
    let initial_floor = db.inactivity_floor_loc().await;
    // Commit an empty batch, then rewind it away again.
    let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
    let empty_range = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(empty_range.start, initial_size);
    assert_eq!(db.size().await, empty_range.end);
    db.rewind_to_size(initial_size).await.unwrap();
    assert_eq!(db.root(), initial_root);
    assert_eq!(db.size().await, initial_size);
    assert_eq!(db.inactivity_floor_loc().await, initial_floor);
    assert_eq!(db.get_metadata().await.unwrap(), None);
    // State A: key0/key1 set, metadata_a attached.
    let value0_a = make_value(10);
    let value1_a = make_value(11);
    let metadata_a = make_value(12);
    let merkleized = db
        .new_batch()
        .write(key0, Some(value0_a.clone()))
        .write(key1, Some(value1_a.clone()))
        .merkleize(&db, Some(metadata_a.clone()))
        .await
        .unwrap();
    let range_a = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    let root_a = db.root();
    let size_a = db.size().await;
    let floor_a = db.inactivity_floor_loc().await;
    assert_eq!(size_a, range_a.end);
    // State B: overwrite key0, delete key1, add key2.
    let value0_b = make_value(20);
    let value2_b = make_value(21);
    let metadata_b = make_value(22);
    let merkleized = db
        .new_batch()
        .write(key0, Some(value0_b))
        .write(key1, None)
        .write(key2, Some(value2_b))
        .merkleize(&db, Some(metadata_b))
        .await
        .unwrap();
    let range_b = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(range_b.start, size_a);
    assert_ne!(db.root(), root_a);
    // State C: another round of changes on top of B.
    let value0_c = make_value(30);
    let value1_c = make_value(31);
    let metadata_c = make_value(32);
    let merkleized = db
        .new_batch()
        .write(key0, Some(value0_c))
        .write(key1, Some(value1_c))
        .write(key2, None)
        .merkleize(&db, Some(metadata_c))
        .await
        .unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    // Rewind past C and B back to state A and verify it fully.
    db.rewind_to_size(size_a).await.unwrap();
    assert_eq!(db.root(), root_a);
    assert_eq!(db.size().await, size_a);
    assert_eq!(db.inactivity_floor_loc().await, floor_a);
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a.clone()));
    assert_eq!(db.get(&key0).await.unwrap(), Some(value0_a));
    assert_eq!(db.get(&key1).await.unwrap(), Some(value1_a));
    assert_eq!(db.get(&key2).await.unwrap(), None);
    db.commit().await.unwrap();
    drop(db);
    // State A must persist across a reopen.
    let mut db = reopen_db(context.with_label("reopen_after_rewind")).await;
    assert_eq!(db.root(), root_a);
    assert_eq!(db.size().await, size_a);
    assert_eq!(db.inactivity_floor_loc().await, floor_a);
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
    assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
    assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
    assert_eq!(db.get(&key2).await.unwrap(), None);
    // State D: new writes after the rewind must work and persist.
    let value2_d = make_value(40);
    let metadata_d = make_value(41);
    let merkleized = db
        .new_batch()
        .write(key2, Some(value2_d.clone()))
        .merkleize(&db, Some(metadata_d.clone()))
        .await
        .unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d.clone()));
    assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
    assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
    assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d.clone()));
    drop(db);
    let mut db = reopen_db(context.with_label("reopen_after_rewind_new_writes")).await;
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d));
    assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
    assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
    assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d));
    // Rewind all the way back to the pristine state; everything must be gone.
    db.rewind_to_size(initial_size).await.unwrap();
    assert_eq!(db.root(), initial_root);
    assert_eq!(db.size().await, initial_size);
    assert_eq!(db.inactivity_floor_loc().await, initial_floor);
    assert_eq!(db.get_metadata().await.unwrap(), None);
    assert_eq!(db.get(&key0).await.unwrap(), None);
    assert_eq!(db.get(&key1).await.unwrap(), None);
    assert_eq!(db.get(&key2).await.unwrap(), None);
    db.commit().await.unwrap();
    drop(db);
    // The pristine boundary must also survive a reopen.
    let db = reopen_db(context.with_label("reopen_initial_boundary")).await;
    assert_eq!(db.root(), initial_root);
    assert_eq!(db.size().await, initial_size);
    assert_eq!(db.inactivity_floor_loc().await, initial_floor);
    assert_eq!(db.get_metadata().await.unwrap(), None);
    assert_eq!(db.get(&key0).await.unwrap(), None);
    assert_eq!(db.get(&key1).await.unwrap(), None);
    assert_eq!(db.get(&key2).await.unwrap(), None);
    db.destroy().await.unwrap();
}
/// Builds a db with inserts (all keys), overwrites (every third key), and
/// deletes (keys where `i % 7 == 1`), mirroring the expected contents in a
/// `HashMap`. After reopen, verifies every key lookup against the mirror and
/// checks a range proof starting at every retained location.
pub(crate) async fn test_any_db_build_and_authenticate<D, V>(
    context: Context,
    mut db: D,
    reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
    V: CodecShared + Clone + Eq + std::hash::Hash + std::fmt::Debug,
    <D as Provable<mmr::Family>>::Operation: Codec,
{
    use crate::{mmr::StandardHasher, qmdb::verify_proof};
    const ELEMENTS: u64 = 1000;
    let mut map = HashMap::<Digest, V>::default();
    {
        let mut batch = db.new_batch();
        // Insert every key.
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value(i * 1000);
            batch = batch.write(k, Some(v.clone()));
            map.insert(k, v);
        }
        // Overwrite every third key with a new value.
        for i in 0u64..ELEMENTS {
            if i % 3 != 0 {
                continue;
            }
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value((i + 1) * 10000);
            batch = batch.write(k, Some(v.clone()));
            map.insert(k, v);
        }
        // Delete keys where i % 7 == 1.
        for i in 0u64..ELEMENTS {
            if i % 7 != 1 {
                continue;
            }
            let k = Sha256::hash(&i.to_be_bytes());
            batch = batch.write(k, None);
            map.remove(&k);
        }
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }
    db.sync().await.unwrap();
    db.prune(db.inactivity_floor_loc().await).await.unwrap();
    let root = db.root();
    db.sync().await.unwrap();
    drop(db);
    // Reopen and verify the root plus every key against the mirror map.
    let db = reopen_db(context.with_label("reopened")).await;
    assert_eq!(root, db.root());
    for i in 0u64..ELEMENTS {
        let k = Sha256::hash(&i.to_be_bytes());
        if let Some(map_value) = map.get(&k) {
            let Some(db_value) = db.get(&k).await.unwrap() else {
                panic!("key not found in db: {k}");
            };
            assert_eq!(*map_value, db_value);
        } else {
            assert!(db.get(&k).await.unwrap().is_none());
        }
    }
    // Verify a 10-operation range proof starting at every location at or
    // above the inactivity floor.
    let hasher = StandardHasher::<Sha256>::new();
    let bounds = db.bounds().await;
    let inactivity_floor = db.inactivity_floor_loc().await;
    for loc in *inactivity_floor..*bounds.end {
        let loc = Location::new(loc);
        let (proof, ops) = db.proof(loc, NZU64!(10)).await.unwrap();
        assert!(verify_proof(&hasher, &proof, loc, &ops, &root));
    }
    db.destroy().await.unwrap();
}
/// Verifies that repeated updates to one key survive log replay: after a
/// commit and a reopen, the root and the most recent value must match.
pub(crate) async fn test_any_db_log_replay<
    F: Family,
    D,
    V: Clone + CodecShared + PartialEq + std::fmt::Debug,
>(
    context: Context,
    mut db: D,
    reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
{
    const UPDATES: u64 = 100;
    let k = Sha256::hash(&UPDATES.to_be_bytes());
    // Write UPDATES successive values to the same key within a single batch,
    // tracking the final value written.
    let (batch, newest) = (0u64..UPDATES).fold((db.new_batch(), None), |(acc, _), i| {
        let v = make_value(i * 1000);
        (acc.write(k, Some(v.clone())), Some(v))
    });
    let merkleized = batch.merkleize(&db, None).await.unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    let root = db.root();
    drop(db);
    // After replaying the log on reopen, only the newest value is visible.
    let db = reopen_db(context.with_label("reopened")).await;
    assert_eq!(db.root(), root);
    assert_eq!(db.get(&k).await.unwrap(), newest);
    db.destroy().await.unwrap();
}
/// Checks that a historical proof taken at the current operation count equals
/// the regular proof, and that after appending more operations the same
/// historical proof is still produced and still verifies against the old root.
pub(crate) async fn test_any_db_historical_proof_basic<D, V: Clone + CodecShared>(
    _context: Context,
    mut db: D,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
    <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
{
    use crate::{mmr::StandardHasher, qmdb::verify_proof};
    use commonware_utils::NZU64;
    const OPS: u64 = 20;
    // Populate OPS keys in one applied batch.
    {
        let mut batch = db.new_batch();
        for i in 0u64..OPS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value(i * 1000);
            batch = batch.write(k, Some(v));
        }
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }
    let root_hash = db.root();
    let original_op_count = db.size().await;
    let max_ops = NZU64!(10);
    let start_loc = Location::new(5);
    // A historical proof at the current size must equal the regular proof.
    let (historical_proof, historical_ops) = db
        .historical_proof(original_op_count, start_loc, max_ops)
        .await
        .unwrap();
    let (regular_proof, regular_ops) = db.proof(start_loc, max_ops).await.unwrap();
    assert_eq!(historical_proof.leaves, regular_proof.leaves);
    assert_eq!(historical_proof.digests, regular_proof.digests);
    assert_eq!(historical_ops, regular_ops);
    let hasher = StandardHasher::<Sha256>::new();
    assert!(verify_proof(
        &hasher,
        &historical_proof,
        start_loc,
        &historical_ops,
        &root_hash
    ));
    // Append more operations under fresh keys.
    {
        let mut batch = db.new_batch();
        for i in OPS..(OPS + 5) {
            let k = Sha256::hash(&(i + 1000).to_be_bytes()); let v = make_value(i * 1000);
            batch = batch.write(k, Some(v));
        }
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }
    // The historical proof at the old size must be unchanged and must still
    // verify against the old root.
    let (historical_proof2, historical_ops2) = db
        .historical_proof(original_op_count, start_loc, max_ops)
        .await
        .unwrap();
    assert_eq!(historical_proof2.leaves, original_op_count);
    assert_eq!(historical_proof2.digests, regular_proof.digests);
    assert_eq!(historical_ops2, regular_ops);
    assert!(verify_proof(
        &hasher,
        &historical_proof2,
        start_loc,
        &historical_ops2,
        &root_hash
    ));
    db.destroy().await.unwrap();
}
/// Negative tests for proof verification: a tampered digest, a spurious extra
/// digest, reordered or duplicated operations, a wrong start location, a
/// wrong root, and a falsified leaf count must each fail verification.
pub(crate) async fn test_any_db_historical_proof_invalid<D, V: Clone + CodecShared>(
    _context: Context,
    mut db: D,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
    <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug + Clone,
{
    use crate::{mmr::StandardHasher, qmdb::verify_proof};
    use commonware_utils::NZU64;
    // Populate 10 keys.
    {
        let mut batch = db.new_batch();
        for i in 0u64..10 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value(i * 1000);
            batch = batch.write(k, Some(v));
        }
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }
    // Baseline: a valid historical proof over ops [1, 5).
    let historical_op_count = Location::new(5);
    let (proof, ops) = db
        .historical_proof(historical_op_count, Location::new(1), NZU64!(10))
        .await
        .unwrap();
    assert_eq!(proof.leaves, historical_op_count);
    assert_eq!(ops.len(), 4);
    let hasher = StandardHasher::<Sha256>::new();
    // Case: corrupt an existing proof digest.
    {
        let mut tampered_proof = proof.clone();
        tampered_proof.digests[0] = Sha256::hash(b"invalid");
        let root_hash = db.root();
        assert!(!verify_proof(
            &hasher,
            &tampered_proof,
            Location::new(1),
            &ops,
            &root_hash
        ));
    }
    // Case: append a spurious digest to the proof.
    {
        let mut tampered_proof = proof.clone();
        tampered_proof.digests.push(Sha256::hash(b"invalid"));
        let root_hash = db.root();
        assert!(!verify_proof(
            &hasher,
            &tampered_proof,
            Location::new(1),
            &ops,
            &root_hash
        ));
    }
    // Case: reorder the proven operations.
    {
        let root_hash = db.root();
        let mut tampered_ops = ops.clone();
        if tampered_ops.len() >= 2 {
            tampered_ops.swap(0, 1);
            assert!(!verify_proof(
                &hasher,
                &proof,
                Location::new(1),
                &tampered_ops,
                &root_hash
            ));
        }
    }
    // Case: append a duplicate operation.
    {
        let root_hash = db.root();
        let mut tampered_ops = ops.clone();
        tampered_ops.push(tampered_ops[0].clone());
        assert!(!verify_proof(
            &hasher,
            &proof,
            Location::new(1),
            &tampered_ops,
            &root_hash
        ));
    }
    // Case: verify against the wrong start location.
    {
        let root_hash = db.root();
        assert!(!verify_proof(
            &hasher,
            &proof,
            Location::new(2),
            &ops,
            &root_hash
        ));
    }
    // Case: verify against an unrelated root.
    {
        let invalid_root = Sha256::hash(b"invalid");
        assert!(!verify_proof(
            &hasher,
            &proof,
            Location::new(1),
            &ops,
            &invalid_root
        ));
    }
    // Case: lie about the proof's leaf count.
    {
        let mut tampered_proof = proof.clone();
        tampered_proof.leaves = Location::new(100);
        let root_hash = db.root();
        assert!(!verify_proof(
            &hasher,
            &tampered_proof,
            Location::new(1),
            &ops,
            &root_hash
        ));
    }
    db.destroy().await.unwrap();
}
/// Edge cases for historical proofs: a single-operation proof, a request
/// truncated by the historical size (max_ops larger than what the historical
/// db contains), and a proof whose max_ops exactly covers the available ops.
pub(crate) async fn test_any_db_historical_proof_edge_cases<D, V: Clone + CodecShared>(
    _context: Context,
    mut db: D,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
    <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
{
    use commonware_utils::NZU64;
    // Populate 50 keys.
    {
        let mut batch = db.new_batch();
        for i in 0u64..50 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = make_value(i * 1000);
            batch = batch.write(k, Some(v));
        }
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }
    // Single operation from a historical db of size 2.
    let (single_proof, single_ops) = db
        .historical_proof(Location::new(2), Location::new(1), NZU64!(1))
        .await
        .unwrap();
    assert_eq!(single_proof.leaves, Location::new(2));
    assert_eq!(single_ops.len(), 1);
    // max_ops (20) exceeds what's available after loc 6 in a size-11 view:
    // only 5 operations can be returned.
    let (_limited_proof, limited_ops) = db
        .historical_proof(Location::new(11), Location::new(6), NZU64!(20))
        .await
        .unwrap();
    assert_eq!(limited_ops.len(), 5);
    // max_ops exactly covers locations [1, 4).
    let (min_proof, min_ops) = db
        .historical_proof(Location::new(4), Location::new(1), NZU64!(3))
        .await
        .unwrap();
    assert_eq!(min_proof.leaves, Location::new(4));
    assert_eq!(min_ops.len(), 3);
    db.destroy().await.unwrap();
}
/// Applies `ELEMENTS` committed batches — each writing a disjoint set of keys
/// and carrying the same metadata value — then deletes the last-written key
/// in a final commit with `None` metadata. Verifies that the deletion and the
/// metadata reset are observed both live and after a reopen (log replay).
///
/// Fix: removed a `HashMap` mirror that was populated on every write but
/// never read, along with the now-unneeded value clone per write.
pub(crate) async fn test_any_db_multiple_commits_delete_replayed<F: Family, D, V>(
    context: Context,
    mut db: D,
    reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
    V: Clone + CodecShared + Eq + std::fmt::Debug,
{
    const ELEMENTS: u64 = 10;
    let metadata_value = make_value(42);
    // Each commit round `j` writes a disjoint keyspace.
    let key_at = |j: u64, i: u64| Sha256::hash(&(j * 1000 + i).to_be_bytes());
    for j in 0u64..ELEMENTS {
        let mut batch = db.new_batch();
        for i in 0u64..ELEMENTS {
            batch = batch.write(key_at(j, i), Some(make_value(i * 1000)));
        }
        let merkleized = batch
            .merkleize(&db, Some(metadata_value.clone()))
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
    }
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_value));
    // Delete the final key in a commit that carries no metadata.
    let k = key_at(ELEMENTS - 1, ELEMENTS - 1);
    let merkleized = db
        .new_batch()
        .write(k, None)
        .merkleize(&db, None)
        .await
        .unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.get_metadata().await.unwrap(), None);
    assert!(db.get(&k).await.unwrap().is_none());
    let root = db.root();
    drop(db);
    // The deletion and metadata reset must survive log replay on reopen.
    let db = reopen_db(context.with_label("reopened")).await;
    assert_eq!(root, db.root());
    assert_eq!(db.get_metadata().await.unwrap(), None);
    assert!(db.get(&k).await.unwrap().is_none());
    db.destroy().await.unwrap();
}
use crate::qmdb::any::{
ordered::{fixed::Db as OrderedFixedDb, variable::Db as OrderedVariableDb},
unordered::{fixed::Db as UnorderedFixedDb, variable::Db as UnorderedVariableDb},
};
use commonware_macros::{test_group, test_traced};
use commonware_runtime::{deterministic, Runner as _};
// Concrete MMR-family db aliases covering every index/journal flavor under
// test. The trailing const generic on the `partitioned` variants (1 or 2)
// parameterizes the partitioned index — NOTE(review): presumably the
// partition count; confirm against the `partitioned` modules.
type UnorderedFixed = UnorderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
type UnorderedVariable =
    UnorderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
type OrderedFixed = OrderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
type OrderedVariable = OrderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
type UnorderedFixedP1 =
    unordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;
type UnorderedVariableP1 = unordered::variable::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    1,
>;
type OrderedFixedP1 =
    ordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;
type OrderedVariableP1 =
    ordered::variable::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;
type UnorderedFixedP2 =
    unordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
type UnorderedVariableP2 = unordered::variable::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    2,
>;
type OrderedFixedP2 =
    ordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
type OrderedVariableP2 =
    ordered::variable::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
// Db aliases for the `mmb` merkle family, spelled out directly over
// `db::Db` (journal, index, hasher, and update-encoding parameters) rather
// than through the `Db` convenience aliases used for the MMR variants.
mod mmb_types {
    use super::*;
    use crate::{
        index::{ordered::Index as OrderedIndex, unordered::Index as UnorderedIndex},
        journal::contiguous::{fixed::Journal as FJournal, variable::Journal as VJournal},
        merkle::{mmb, Location},
        qmdb::any::{
            operation::{update, Operation},
            value::{FixedEncoding, VariableEncoding},
        },
    };
    type MmbLocation = Location<mmb::Family>;
    pub type MmbUnorderedFixed = super::super::db::Db<
        mmb::Family,
        Context,
        FJournal<
            Context,
            Operation<mmb::Family, update::Unordered<Digest, FixedEncoding<Digest>>>,
        >,
        UnorderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Unordered<Digest, FixedEncoding<Digest>>,
    >;
    pub type MmbUnorderedVariable = super::super::db::Db<
        mmb::Family,
        Context,
        VJournal<
            Context,
            Operation<mmb::Family, update::Unordered<Digest, VariableEncoding<Digest>>>,
        >,
        UnorderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Unordered<Digest, VariableEncoding<Digest>>,
    >;
    pub type MmbOrderedFixed = super::super::db::Db<
        mmb::Family,
        Context,
        FJournal<
            Context,
            Operation<mmb::Family, update::Ordered<Digest, FixedEncoding<Digest>>>,
        >,
        OrderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Ordered<Digest, FixedEncoding<Digest>>,
    >;
    pub type MmbOrderedVariable = super::super::db::Db<
        mmb::Family,
        Context,
        VJournal<
            Context,
            Operation<mmb::Family, update::Ordered<Digest, VariableEncoding<Digest>>>,
        >,
        OrderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Ordered<Digest, VariableEncoding<Digest>>,
    >;
}
use mmb_types::*;
/// SHA-256 of the big-endian encoding of `i`; the shared key/value generator
/// handed to the variant test macros.
#[inline]
fn to_digest(i: u64) -> Digest {
    let encoded = i.to_be_bytes();
    Sha256::hash(&encoded)
}
// Invokes `$cb` once per MMR-family db variant, appending a short partition
// tag, the db type, and the matching config constructor to `$cb`'s args.
macro_rules! with_mmr_variants {
    ($cb:ident!($($args:tt)*)) => {
        $cb!($($args)*, "uf", UnorderedFixed, fixed_db_config);
        $cb!($($args)*, "uv", UnorderedVariable, variable_db_config);
        $cb!($($args)*, "of", OrderedFixed, fixed_db_config);
        $cb!($($args)*, "ov", OrderedVariable, variable_db_config);
        $cb!($($args)*, "ufp1", UnorderedFixedP1, fixed_db_config);
        $cb!($($args)*, "uvp1", UnorderedVariableP1, variable_db_config);
        $cb!($($args)*, "ofp1", OrderedFixedP1, fixed_db_config);
        $cb!($($args)*, "ovp1", OrderedVariableP1, variable_db_config);
        $cb!($($args)*, "ufp2", UnorderedFixedP2, fixed_db_config);
        $cb!($($args)*, "uvp2", UnorderedVariableP2, variable_db_config);
        $cb!($($args)*, "ofp2", OrderedFixedP2, fixed_db_config);
        $cb!($($args)*, "ovp2", OrderedVariableP2, variable_db_config);
    };
}
// Invokes `$cb` once per db variant: the twelve MMR variants (same list as
// `with_mmr_variants`) plus the four MMB-family variants.
macro_rules! with_all_variants {
    ($cb:ident!($($args:tt)*)) => {
        $cb!($($args)*, "uf", UnorderedFixed, fixed_db_config);
        $cb!($($args)*, "uv", UnorderedVariable, variable_db_config);
        $cb!($($args)*, "of", OrderedFixed, fixed_db_config);
        $cb!($($args)*, "ov", OrderedVariable, variable_db_config);
        $cb!($($args)*, "ufp1", UnorderedFixedP1, fixed_db_config);
        $cb!($($args)*, "uvp1", UnorderedVariableP1, variable_db_config);
        $cb!($($args)*, "ofp1", OrderedFixedP1, fixed_db_config);
        $cb!($($args)*, "ovp1", OrderedVariableP1, variable_db_config);
        $cb!($($args)*, "ufp2", UnorderedFixedP2, fixed_db_config);
        $cb!($($args)*, "uvp2", UnorderedVariableP2, variable_db_config);
        $cb!($($args)*, "ofp2", OrderedFixedP2, fixed_db_config);
        $cb!($($args)*, "ovp2", OrderedVariableP2, variable_db_config);
        $cb!($($args)*, "uf_mmb", MmbUnorderedFixed, fixed_db_config);
        $cb!($($args)*, "uv_mmb", MmbUnorderedVariable, variable_db_config);
        $cb!($($args)*, "of_mmb", MmbOrderedFixed, fixed_db_config);
        $cb!($($args)*, "ov_mmb", MmbOrderedVariable, variable_db_config);
    };
}
// Runs one scenario `$f(ctx, db, reopen_db, to_digest)` for db type `$db`:
// initializes the db with config `$cfg` using partition prefix `$l` plus
// suffix `$sfx`, and supplies a reopen closure that re-initializes the same
// db from the same partitions.
macro_rules! test_with_reopen {
    ($ctx:expr, $sfx:expr, $f:expr, $l:literal, $db:ty, $cfg:ident) => {{
        let p = concat!($l, "_", $sfx);
        Box::pin(async {
            let ctx = $ctx.with_label($l);
            let db = <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                .await
                .unwrap();
            $f(
                ctx,
                db,
                |ctx| {
                    Box::pin(async move {
                        <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                            .await
                            .unwrap()
                    })
                },
                to_digest,
            )
            .await;
        })
        .await
    }};
}
// Runs one scenario `$f(ctx, db, to_digest)` for db type `$db` — like
// `test_with_reopen`, but for scenarios that never reopen the db.
macro_rules! test_with_make_value {
    ($ctx:expr, $sfx:expr, $f:expr, $l:literal, $db:ty, $cfg:ident) => {{
        let p = concat!($l, "_", $sfx);
        Box::pin(async {
            let ctx = $ctx.with_label($l);
            let db = <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                .await
                .unwrap();
            $f(ctx, db, to_digest).await;
        })
        .await
    }};
}
// Runs a scenario across every db variant (MMR + MMB), choosing the runner
// shape by whether the scenario needs a reopen closure.
macro_rules! for_all_variants {
    ($ctx:expr, $sfx:expr, with_reopen: $f:expr) => {{
        with_all_variants!(test_with_reopen!($ctx, $sfx, $f));
    }};
    ($ctx:expr, $sfx:expr, with_make_value: $f:expr) => {{
        with_all_variants!(test_with_make_value!($ctx, $sfx, $f));
    }};
}
// Runs a scenario across the MMR-family variants only.
macro_rules! for_mmr_variants {
    ($ctx:expr, $sfx:expr, with_reopen: $f:expr) => {{
        with_mmr_variants!(test_with_reopen!($ctx, $sfx, $f));
    }};
    ($ctx:expr, $sfx:expr, with_make_value: $f:expr) => {{
        with_mmr_variants!(test_with_make_value!($ctx, $sfx, $f));
    }};
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_log_replay() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
for_all_variants!(context, "lr", with_reopen: test_any_db_log_replay);
});
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_build_and_authenticate() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
for_mmr_variants!(context, "baa", with_reopen: test_any_db_build_and_authenticate);
});
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_historical_proof_basic() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
for_mmr_variants!(context, "hpb", with_make_value: test_any_db_historical_proof_basic);
});
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_historical_proof_invalid() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
for_mmr_variants!(context, "hpi", with_make_value: test_any_db_historical_proof_invalid);
});
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_historical_proof_edge_cases() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
for_mmr_variants!(context, "hpec", with_make_value: test_any_db_historical_proof_edge_cases);
});
}
/// Runs the multi-commit delete-replay scenario against every DB variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_multiple_commits_delete_replayed() {
    deterministic::Runner::default().start(|context| async move {
        for_all_variants!(context, "mcdr", with_reopen: test_any_db_multiple_commits_delete_replayed);
    });
}
/// Runs the non-empty-recovery scenario against every DB variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_non_empty_recovery() {
    deterministic::Runner::default().start(|context| async move {
        for_all_variants!(context, "ner", with_reopen: test_any_db_non_empty_recovery);
    });
}
/// Runs the empty-recovery scenario against every DB variant.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_empty_recovery() {
    deterministic::Runner::default().start(|context| async move {
        for_all_variants!(context, "er", with_reopen: test_any_db_empty_recovery);
    });
}
/// Runs the rewind-recovery scenario (mmr variants only).
#[test_group("slow")]
#[test_traced("WARN")]
fn test_all_variants_rewind_recovery() {
    deterministic::Runner::default().start(|context| async move {
        for_mmr_variants!(context, "rr", with_reopen: test_any_db_rewind_recovery);
    });
}
/// Deterministic test key: SHA-256 of `i`'s big-endian bytes.
fn key(i: u64) -> Digest {
    let bytes = i.to_be_bytes();
    Sha256::hash(&bytes)
}
/// Deterministic test value: SHA-256 of `(i + 10000)` big-endian, offset so
/// values never collide with keys produced by [`key`].
fn val(i: u64) -> Digest {
    let bytes = (i + 10000).to_be_bytes();
    Sha256::hash(&bytes)
}
/// Stages `writes` (value `None` = delete) in a fresh batch, merkleizes with
/// optional `metadata`, applies the batch, and commits.
///
/// Returns the location range reported by `apply_batch`. Panics on any error
/// (test helper).
async fn commit_writes(
    db: &mut UnorderedVariable,
    writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
    metadata: Option<Digest>,
) -> std::ops::Range<crate::mmr::Location> {
    let staged = writes
        .into_iter()
        .fold(db.new_batch(), |batch, (k, v)| batch.write(k, v));
    let merkleized = staged.merkleize(&*db, metadata).await.unwrap();
    let range = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    range
}
/// Applying an empty merkleized batch still changes the root (asserted
/// below), and the DB remains fully usable for subsequent commits.
#[test_traced("INFO")]
fn test_any_batch_empty() {
    deterministic::Runner::default().start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("e", &ctx))
                .await
                .unwrap();
        let initial_root = db.root();
        let empty = db.new_batch().merkleize(&db, None).await.unwrap();
        db.apply_batch(empty).await.unwrap();
        assert_ne!(db.root(), initial_root);
        commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        db.destroy().await.unwrap();
    });
}
/// Metadata supplied at merkleize time is readable after commit, and applying
/// a later batch with `None` metadata leaves `get_metadata` returning `None`.
#[test_traced("INFO")]
fn test_any_batch_metadata() {
    deterministic::Runner::default().start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("m", &ctx))
                .await
                .unwrap();
        let meta = val(42);
        commit_writes(&mut db, [(key(0), Some(val(0)))], Some(meta)).await;
        assert_eq!(db.get_metadata().await.unwrap(), Some(meta));
        let staged = db.new_batch().merkleize(&db, None).await.unwrap();
        db.apply_batch(staged).await.unwrap();
        assert_eq!(db.get_metadata().await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// Batch `get` reads through to committed DB state for untouched keys and
/// reflects writes, overwrites, and deletes staged in the batch itself.
#[test_traced("INFO")]
fn test_any_batch_get_read_through() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("g", &ctx))
                .await
                .unwrap();
        let ka = key(0);
        let va = val(0);
        commit_writes(&mut db, [(ka, Some(va))], None).await;
        let kb = key(1);
        let vb = val(1);
        let kc = key(2);
        let mut batch = db.new_batch();
        // Untouched key: read-through to the committed value.
        assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va));
        batch = batch.write(kb, Some(vb));
        assert_eq!(batch.get(&kb, &db).await.unwrap(), Some(vb));
        // Never-written key: absent in both batch and DB.
        assert_eq!(batch.get(&kc, &db).await.unwrap(), None);
        let va2 = val(100);
        // Batch overwrite shadows the committed value...
        batch = batch.write(ka, Some(va2));
        assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va2));
        // ...and a batch delete shadows it as well.
        batch = batch.write(ka, None);
        assert_eq!(batch.get(&ka, &db).await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// `get` on an already-merkleized (but not yet applied) batch returns the
/// staged view: overwrites, deletes, creates, and read-through misses.
#[test_traced("INFO")]
fn test_any_batch_get_on_merkleized() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("mg", &ctx))
                .await
                .unwrap();
        let ka = key(0);
        let kb = key(1);
        let kc = key(2);
        let kd = key(3);
        commit_writes(&mut db, [(ka, Some(val(0))), (kb, Some(val(1)))], None).await;
        let va2 = val(100);
        let vc = val(2);
        let mut batch = db.new_batch();
        batch = batch.write(ka, Some(va2)); // overwrite committed ka
        batch = batch.write(kb, None); // delete committed kb
        batch = batch.write(kc, Some(vc)); // create fresh kc
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
        assert_eq!(merkleized.get(&kb, &db).await.unwrap(), None);
        assert_eq!(merkleized.get(&kc, &db).await.unwrap(), Some(vc));
        // kd was never written anywhere.
        assert_eq!(merkleized.get(&kd, &db).await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// A child batch stacked on a merkleized parent (via `new_batch::<Sha256>()`)
/// reads through to the parent's staged values and its own writes shadow them.
#[test_traced("INFO")]
fn test_any_batch_stacked_get() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("sg", &ctx))
                .await
                .unwrap();
        let ka = key(0);
        let kb = key(1);
        let mut batch = db.new_batch();
        batch = batch.write(ka, Some(val(0)));
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        let mut child = merkleized.new_batch::<Sha256>();
        // Read-through to the parent's staged write.
        assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(0)));
        child = child.write(ka, Some(val(100)));
        assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(100)));
        child = child.write(kb, Some(val(1)));
        assert_eq!(child.get(&kb, &db).await.unwrap(), Some(val(1)));
        // Child delete shadows both its own earlier write and the parent's.
        child = child.write(ka, None);
        assert_eq!(child.get(&ka, &db).await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// Deleting a committed key in a parent batch and recreating it in a stacked
/// child yields the child's value once the child is applied.
#[test_traced("INFO")]
fn test_any_batch_stacked_delete_recreate() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dr", &ctx))
                .await
                .unwrap();
        let ka = key(0);
        commit_writes(&mut db, [(ka, Some(val(0)))], None).await;
        let mut parent = db.new_batch();
        parent = parent.write(ka, None); // delete the committed key
        let parent_m = parent.merkleize(&db, None).await.unwrap();
        assert_eq!(parent_m.get(&ka, &db).await.unwrap(), None);
        let mut child = parent_m.new_batch::<Sha256>();
        child = child.write(ka, Some(val(200))); // recreate in the child
        let child_m = child.merkleize(&db, None).await.unwrap();
        assert_eq!(child_m.get(&ka, &db).await.unwrap(), Some(val(200)));
        db.apply_batch(child_m).await.unwrap();
        assert_eq!(db.get(&ka).await.unwrap(), Some(val(200)));
        db.destroy().await.unwrap();
    });
}
/// A second commit raises the inactivity floor without losing any data:
/// updated keys return new values, untouched keys keep their originals.
#[test_traced("INFO")]
fn test_any_batch_floor_raise() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("fr", &ctx))
                .await
                .unwrap();
        let init: Vec<_> = (0..100).map(|i| (key(i), Some(val(i)))).collect();
        commit_writes(&mut db, init, None).await;
        let floor_before = db.inactivity_floor_loc();
        // Touch the first 30 keys so the commit can advance the floor.
        let updates: Vec<_> = (0..30).map(|i| (key(i), Some(val(i + 500)))).collect();
        commit_writes(&mut db, updates, None).await;
        assert!(db.inactivity_floor_loc() > floor_before);
        for i in 0..30 {
            assert_eq!(
                db.get(&key(i)).await.unwrap(),
                Some(val(i + 500)),
                "updated key {i} mismatch"
            );
        }
        for i in 30..100 {
            assert_eq!(
                db.get(&key(i)).await.unwrap(),
                Some(val(i)),
                "untouched key {i} mismatch"
            );
        }
        db.destroy().await.unwrap();
    });
}
/// `apply_batch` reports the location range of appended operations, and
/// consecutive commits produce contiguous ranges.
#[test_traced("INFO")]
fn test_any_batch_apply_returns_range() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ar", &ctx))
                .await
                .unwrap();
        let writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
        let range1 = commit_writes(&mut db, writes, None).await;
        // First applied op lands at location 1 — presumably location 0 holds
        // the initial commit operation written at init; confirm against
        // Db::init. The range spans at least the 5 writes plus a commit op.
        assert_eq!(range1.start, crate::mmr::Location::new(1));
        assert!(range1.end.saturating_sub(*range1.start) >= 6);
        let writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
        let range2 = commit_writes(&mut db, writes, None).await;
        // Second commit starts exactly where the first ended.
        assert_eq!(range2.start, range1.end);
        db.destroy().await.unwrap();
    });
}
/// Three stacked batches (parent → child → grandchild) compose correctly:
/// the grandchild's merkleized view reflects all three layers, and applying
/// it lands the combined effect in the DB.
#[test_traced("INFO")]
fn test_any_batch_deep_chain() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dc", &ctx))
                .await
                .unwrap();
        let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
        commit_writes(&mut db, init, None).await;
        // Parent: overwrite key 0, create key 5.
        let mut parent = db.new_batch();
        parent = parent.write(key(0), Some(val(100)));
        parent = parent.write(key(5), Some(val(5)));
        let parent_m = parent.merkleize(&db, None).await.unwrap();
        // Child: overwrite key 1, create key 6.
        let mut child = parent_m.new_batch::<Sha256>();
        child = child.write(key(1), Some(val(101)));
        child = child.write(key(6), Some(val(6)));
        let child_m = child.merkleize(&db, None).await.unwrap();
        // Grandchild: delete key 2, create key 7.
        let mut grandchild = child_m.new_batch::<Sha256>();
        grandchild = grandchild.write(key(2), None);
        grandchild = grandchild.write(key(7), Some(val(7)));
        let grandchild_m = grandchild.merkleize(&db, None).await.unwrap();
        assert_eq!(
            grandchild_m.get(&key(0), &db).await.unwrap(),
            Some(val(100))
        );
        assert_eq!(
            grandchild_m.get(&key(1), &db).await.unwrap(),
            Some(val(101))
        );
        assert_eq!(grandchild_m.get(&key(2), &db).await.unwrap(), None);
        assert_eq!(grandchild_m.get(&key(7), &db).await.unwrap(), Some(val(7)));
        // Applying the deepest batch lands the whole chain's effect.
        db.apply_batch(grandchild_m).await.unwrap();
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
        assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(101)));
        assert_eq!(db.get(&key(2)).await.unwrap(), None);
        assert_eq!(db.get(&key(3)).await.unwrap(), Some(val(3)));
        assert_eq!(db.get(&key(4)).await.unwrap(), Some(val(4)));
        assert_eq!(db.get(&key(5)).await.unwrap(), Some(val(5)));
        assert_eq!(db.get(&key(6)).await.unwrap(), Some(val(6)));
        assert_eq!(db.get(&key(7)).await.unwrap(), Some(val(7)));
        db.destroy().await.unwrap();
    });
}
/// A stacked parent/child chain applied once (db_b) must produce the same
/// root and key values as the same writes committed sequentially (db_a).
#[test_traced("INFO")]
fn test_any_batch_chain_matches_sequential() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let ctx_a = ctx.with_label("a");
        let mut db_a: UnorderedVariable = UnorderedVariableDb::init(
            ctx_a.clone(),
            variable_db_config::<OneCap>("cms-a", &ctx_a),
        )
        .await
        .unwrap();
        let ctx_b = ctx.with_label("b");
        let mut db_b: UnorderedVariable = UnorderedVariableDb::init(
            ctx_b.clone(),
            variable_db_config::<OneCap>("cms-b", &ctx_b),
        )
        .await
        .unwrap();
        let writes1: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
        let writes2 = vec![
            (key(0), Some(val(100))),
            (key(1), None),
            (key(5), Some(val(5))),
        ];
        // db_a: two sequential commits.
        commit_writes(&mut db_a, writes1.clone(), None).await;
        commit_writes(&mut db_a, writes2.clone(), None).await;
        // db_b: same writes as a stacked parent/child chain, applied once.
        let mut parent = db_b.new_batch();
        for (k, v) in &writes1 {
            parent = parent.write(*k, *v);
        }
        let parent_m = parent.merkleize(&db_b, None).await.unwrap();
        let mut child = parent_m.new_batch::<Sha256>();
        for (k, v) in &writes2 {
            child = child.write(*k, *v);
        }
        let child_m = child.merkleize(&db_b, None).await.unwrap();
        db_b.apply_batch(child_m).await.unwrap();
        // NOTE: roots compare equal before db_b commits — presumably the root
        // reflects applied state rather than durability; confirm if needed.
        assert_eq!(db_a.root(), db_b.root());
        for i in 0..6 {
            assert_eq!(
                db_a.get(&key(i)).await.unwrap(),
                db_b.get(&key(i)).await.unwrap(),
                "key {i} mismatch"
            );
        }
        db_a.destroy().await.unwrap();
        db_b.destroy().await.unwrap();
    });
}
/// Creating a key and deleting it within the same batch leaves it absent,
/// while a delete of a pre-existing key and a fresh create in the same batch
/// each take effect independently.
#[test_traced("INFO")]
fn test_any_batch_create_then_delete_same_batch() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("cd", &ctx))
                .await
                .unwrap();
        commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
        let mut batch = db.new_batch();
        // Create key(1) then delete it in the same batch; also create key(2)
        // and delete the previously committed key(0).
        batch = batch.write(key(1), Some(val(1)));
        batch = batch.write(key(1), None);
        batch = batch.write(key(2), Some(val(2)));
        batch = batch.write(key(0), None);
        let merkleized = batch.merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        assert_eq!(db.get(&key(0)).await.unwrap(), None);
        assert_eq!(db.get(&key(1)).await.unwrap(), None);
        assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
        db.destroy().await.unwrap();
    });
}
/// Deleting every live key leaves the DB empty of them, and a subsequent
/// write still succeeds.
#[test_traced("INFO")]
fn test_any_batch_delete_all_keys() {
    deterministic::Runner::default().start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("da", &ctx))
                .await
                .unwrap();
        let inserts: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
        commit_writes(&mut db, inserts, None).await;
        let removals: Vec<_> = (0..5).map(|i| (key(i), None)).collect();
        commit_writes(&mut db, removals, None).await;
        for i in 0..5 {
            assert_eq!(db.get(&key(i)).await.unwrap(), None, "key {i} not deleted");
        }
        commit_writes(&mut db, [(key(10), Some(val(10)))], None).await;
        assert_eq!(db.get(&key(10)).await.unwrap(), Some(val(10)));
        db.destroy().await.unwrap();
    });
}
/// Two independent batches forked from the same DB state merkleize to
/// different roots without disturbing the DB; applying one fork takes effect
/// while the other is simply discarded.
#[test_traced("INFO")]
fn test_any_batch_parallel_forks() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("pf", &ctx))
                .await
                .unwrap();
        commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
        let root_before = db.root();
        // Fork A: overwrite key 0, create key 1.
        let fork_a_m = db
            .new_batch()
            .write(key(0), Some(val(100)))
            .write(key(1), Some(val(1)))
            .merkleize(&db, None)
            .await
            .unwrap();
        // Fork B: delete key 0, create key 2.
        let fork_b_m = db
            .new_batch()
            .write(key(0), None)
            .write(key(2), Some(val(2)))
            .merkleize(&db, None)
            .await
            .unwrap();
        assert_ne!(fork_a_m.root(), fork_b_m.root());
        // Merkleizing the forks must not mutate the DB itself.
        assert_eq!(db.root(), root_before);
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        assert_eq!(db.get(&key(1)).await.unwrap(), None);
        // Apply fork A; fork B's staged writes never land (key 2 stays absent).
        db.apply_batch(fork_a_m).await.unwrap();
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
        assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
        assert_eq!(db.get(&key(2)).await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// Applying a stacked parent/child chain still raises the inactivity floor
/// and preserves both updated and untouched keys.
#[test_traced("INFO")]
fn test_any_batch_floor_raise_chained() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("frc", &ctx))
                .await
                .unwrap();
        let init: Vec<_> = (0..50).map(|i| (key(i), Some(val(i)))).collect();
        commit_writes(&mut db, init, None).await;
        let floor_before = db.inactivity_floor_loc();
        // Parent updates keys 0..20; child updates keys 20..30 on top.
        let mut parent = db.new_batch();
        for i in 0..20 {
            parent = parent.write(key(i), Some(val(i + 500)));
        }
        let parent_m = parent.merkleize(&db, None).await.unwrap();
        let mut child = parent_m.new_batch::<Sha256>();
        for i in 20..30 {
            child = child.write(key(i), Some(val(i + 500)));
        }
        let child_m = child.merkleize(&db, None).await.unwrap();
        db.apply_batch(child_m).await.unwrap();
        assert!(db.inactivity_floor_loc() > floor_before);
        for i in 0..30 {
            assert_eq!(
                db.get(&key(i)).await.unwrap(),
                Some(val(i + 500)),
                "updated key {i} mismatch"
            );
        }
        for i in 30..50 {
            assert_eq!(
                db.get(&key(i)).await.unwrap(),
                Some(val(i)),
                "untouched key {i} mismatch"
            );
        }
        db.destroy().await.unwrap();
    });
}
/// Dropping a merkleized batch without applying it leaves the DB completely
/// untouched: same root, same values.
#[test_traced("INFO")]
fn test_any_batch_abandoned() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ab", &ctx))
                .await
                .unwrap();
        commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
        let root_before = db.root();
        // Build and merkleize a batch, then let it drop at end of scope.
        {
            let mut batch = db.new_batch();
            batch = batch.write(key(0), Some(val(999)));
            batch = batch.write(key(1), Some(val(1)));
            let _merkleized = batch.merkleize(&db, None).await.unwrap();
        }
        assert_eq!(db.root(), root_before);
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        assert_eq!(db.get(&key(1)).await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// State that was applied but never committed is discarded on recovery:
/// after a reopen, the DB is back at the last committed root and the
/// applied-only key is gone.
#[test_traced("INFO")]
fn test_any_batch_apply_requires_commit_for_recovery() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let partition = "apply_requires_commit";
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable = UnorderedVariableDb::init(
            ctx.clone(),
            variable_db_config::<OneCap>(partition, &ctx),
        )
        .await
        .unwrap();
        let committed_root = db.root();
        // Apply a batch but deliberately never call commit().
        let merkleized = db
            .new_batch()
            .write(key(0), Some(val(0)))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        drop(db);
        // Reopen over the same partition: uncommitted state must be rolled back.
        let reopened: UnorderedVariable = UnorderedVariableDb::init(
            context.with_label("reopen"),
            variable_db_config::<OneCap>(partition, &context),
        )
        .await
        .unwrap();
        assert_eq!(reopened.root(), committed_root);
        assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
        reopened.destroy().await.unwrap();
    });
}
/// Rewinding to a pruned location — either exactly the oldest retained
/// boundary or a location before it — fails with `ItemPruned`.
#[test_traced("INFO")]
fn test_any_rewind_pruned_target_errors() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const KEYS: u64 = 64;
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("rp", &ctx))
                .await
                .unwrap();
        let initial: Vec<_> = (0..KEYS).map(|i| (key(i), Some(val(i)))).collect();
        let first_range = commit_writes(&mut db, initial, None).await;
        // Keep rewriting all keys and pruning until the first commit's start
        // location falls before the retained bounds. Bounded at 64 rounds so
        // a regression fails loudly instead of looping forever.
        let mut round = 0u64;
        loop {
            round += 1;
            assert!(
                round <= 64,
                "failed to prune enough history for rewind test"
            );
            let updates: Vec<_> = (0..KEYS)
                .map(|i| (key(i), Some(val(1000 + round * KEYS + i))))
                .collect();
            commit_writes(&mut db, updates, None).await;
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            let bounds = db.bounds().await;
            if bounds.start > first_range.start {
                break;
            }
        }
        // Rewinding exactly to the oldest retained location is also rejected.
        let oldest_retained = db.bounds().await.start;
        let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
        assert!(
            matches!(
                boundary_err,
                crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
            ),
            "unexpected rewind error at retained boundary: {boundary_err:?}"
        );
        // As is rewinding to a location strictly before it.
        let err = db.rewind(first_range.start).await.unwrap_err();
        assert!(
            matches!(
                err,
                crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
            ),
            "unexpected rewind error: {err:?}"
        );
        db.destroy().await.unwrap();
    });
}
/// Rewind target validation: rewinding to the current size is a no-op,
/// while targets of zero or beyond the size fail with `InvalidRewind` and
/// leave the DB unchanged.
#[test_traced("INFO")]
fn test_any_rewind_invalid_target_errors() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ri", &ctx))
                .await
                .unwrap();
        let root_before = db.root();
        let size_before = db.size().await;
        // Target == current size: legal no-op, restores nothing.
        let no_op_locs = db.rewind(size_before).await.unwrap();
        assert!(
            no_op_locs.is_empty(),
            "expected no-op rewind to return no restored locations"
        );
        assert_eq!(db.root(), root_before);
        assert_eq!(db.size().await, size_before);
        // Target zero: invalid.
        let zero_err = db.rewind(Location::new(0)).await.unwrap_err();
        assert!(
            matches!(
                zero_err,
                crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(0))
            ),
            "unexpected rewind error: {zero_err:?}"
        );
        assert_eq!(db.root(), root_before);
        assert_eq!(db.size().await, size_before);
        // Target past the end: invalid, and the error carries the bad target.
        let too_large_target = Location::new(*size_before + 1);
        let too_large_err = db.rewind(too_large_target).await.unwrap_err();
        assert!(
            matches!(
                too_large_err,
                crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(size))
                if size == *too_large_target
            ),
            "unexpected rewind error: {too_large_err:?}"
        );
        assert_eq!(db.root(), root_before);
        assert_eq!(db.size().await, size_before);
        db.destroy().await.unwrap();
    });
}
/// A rewind target can itself be retained yet still be rejected: if the
/// inactivity floor that target depends on has been pruned away, rewind
/// fails with `ItemPruned`.
#[test_traced("INFO")]
fn test_any_rewind_rejects_target_with_pruned_floor() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        const KEYS: u64 = 64;
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("rf", &ctx))
                .await
                .unwrap();
        commit_writes(&mut db, (0..KEYS).map(|i| (key(i), Some(val(i)))), None).await;
        commit_writes(
            &mut db,
            (0..KEYS).map(|i| (key(i), Some(val(1_000 + i)))),
            None,
        )
        .await;
        // Remember the target and the floor it depends on, then pick a prune
        // point strictly past that floor.
        let rewind_target = db.size().await;
        let target_floor = db.inactivity_floor_loc();
        let prune_loc = Location::new(*target_floor + (KEYS / 2));
        assert!(
            rewind_target > *prune_loc,
            "test setup expected target size > prune_loc; target={rewind_target:?}, floor={target_floor:?}"
        );
        // Advance the floor until pruning at prune_loc becomes possible;
        // bounded at 8 rounds to fail loudly if setup stops working.
        let mut round = 0u64;
        while db.inactivity_floor_loc() < prune_loc {
            round += 1;
            assert!(
                round <= 8,
                "failed to advance inactivity floor enough for floor-pruned rewind test"
            );
            commit_writes(
                &mut db,
                (0..KEYS).map(|i| (key(i), Some(val(10_000 + round * KEYS + i)))),
                None,
            )
            .await;
        }
        db.prune(prune_loc).await.unwrap();
        // Sanity: the target's floor is pruned but the target itself is retained.
        let bounds = db.bounds().await;
        assert!(
            bounds.start > *target_floor,
            "test setup expected pruned start beyond target floor; bounds={bounds:?}, target_floor={target_floor:?}"
        );
        assert!(
            rewind_target > bounds.start,
            "test setup expected target commit retained; target={rewind_target:?}, bounds={bounds:?}"
        );
        let err = db.rewind(rewind_target).await.unwrap_err();
        assert!(
            matches!(
                err,
                crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
            ),
            "unexpected rewind error: {err:?}"
        );
        db.destroy().await.unwrap();
    });
}
/// An mmb-family DB with a variable-length contiguous journal, an unordered
/// index keyed by [`OneCap`], SHA-256 hashing, and `Digest` keys/values —
/// the concrete type exercised by the `test_mmb_*` tests below.
type MmbVariable = super::db::Db<
    crate::merkle::mmb::Family,
    Context,
    crate::journal::contiguous::variable::Journal<
        Context,
        super::operation::Operation<
            crate::merkle::mmb::Family,
            super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
        >,
    >,
    crate::index::unordered::Index<OneCap, crate::merkle::Location<crate::merkle::mmb::Family>>,
    Sha256,
    super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
>;
async fn open_mmb_db(context: Context, suffix: &str) -> MmbVariable {
let cfg = variable_db_config::<OneCap>(suffix, &context);
super::init(context, cfg, None, |_, _| {}).await.unwrap()
}
/// Like [`commit_writes`] but for the mmb-family DB; the applied location
/// range is discarded. Panics on any error (test helper).
async fn commit_writes_mmb(
    db: &mut MmbVariable,
    writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
    metadata: Option<Digest>,
) {
    let staged = writes
        .into_iter()
        .fold(db.new_batch(), |batch, (k, v)| batch.write(k, v));
    let merkleized = staged.merkleize(db, metadata).await.unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
}
/// Basic create / update / delete / multi-write cycle on the mmb-family DB.
#[test_traced("INFO")]
fn test_mmb_batch_crud() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_mmb_db(context.with_label("db"), "crud").await;
        // Create.
        commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        // Update.
        commit_writes_mmb(&mut db, [(key(0), Some(val(1)))], None).await;
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(1)));
        // Delete.
        commit_writes_mmb(&mut db, [(key(0), None)], None).await;
        assert!(db.get(&key(0)).await.unwrap().is_none());
        // Multiple writes in one commit.
        commit_writes_mmb(
            &mut db,
            [(key(1), Some(val(1))), (key(2), Some(val(2)))],
            None,
        )
        .await;
        assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
        assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
        db.destroy().await.unwrap();
    });
}
/// Applying an empty merkleized batch changes the mmb DB root (asserted
/// below), and the DB keeps working afterwards.
#[test_traced("INFO")]
fn test_mmb_batch_empty() {
    deterministic::Runner::default().start(|context| async move {
        let mut db = open_mmb_db(context.with_label("db"), "empty").await;
        let initial_root = db.root();
        let staged = db.new_batch().merkleize(&db, None).await.unwrap();
        db.apply_batch(staged).await.unwrap();
        assert_ne!(db.root(), initial_root);
        commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        db.destroy().await.unwrap();
    });
}
/// Metadata set at commit is readable on the mmb DB, and a later batch with
/// `None` metadata leaves `get_metadata` returning `None`.
#[test_traced("INFO")]
fn test_mmb_batch_metadata() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_mmb_db(context.with_label("db"), "meta").await;
        let metadata = val(42);
        commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        assert_eq!(db.get_metadata().await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// After sync + drop, reopening the same partitions restores the mmb DB's
/// root, bounds, and key values exactly.
#[test_traced("WARN")]
fn test_mmb_recovery() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_mmb_db(context.with_label("db0"), "recovery").await;
        commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(val(99))).await;
        commit_writes_mmb(&mut db, [(key(1), Some(val(1)))], None).await;
        let root = db.root();
        let bounds = db.bounds().await;
        db.sync().await.unwrap();
        drop(db);
        let db = open_mmb_db(context.with_label("db1"), "recovery").await;
        assert_eq!(db.root(), root);
        assert_eq!(db.bounds().await, bounds);
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
        // The last commit carried no metadata, so None survives recovery —
        // the earlier val(99) metadata is not resurrected.
        assert_eq!(db.get_metadata().await.unwrap(), None);
        db.destroy().await.unwrap();
    });
}
/// Pruning up to the inactivity floor must not lose any live key.
#[test_traced("INFO")]
fn test_mmb_prune() {
    deterministic::Runner::default().start(|context| async move {
        let mut db = open_mmb_db(context.with_label("db"), "prune").await;
        for i in 0u64..20 {
            commit_writes_mmb(&mut db, [(key(i), Some(val(i)))], None).await;
        }
        let target = db.inactivity_floor_loc();
        db.prune(target).await.unwrap();
        for i in 0u64..20 {
            assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
        }
        db.destroy().await.unwrap();
    });
}
/// Pipelining: a child batch can be read from and merkleized concurrently
/// (via `futures::join!`) with the DB committing the previous stage, and the
/// child can then be applied and committed on top.
#[test_traced("INFO")]
fn test_any_batch_single_stage_pipeline() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ctx = context.with_label("db");
        let mut db: UnorderedVariable =
            UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("pipe", &ctx))
                .await
                .unwrap();
        // Stage 1: apply (but don't yet commit) the first batch.
        {
            let mut batch = db.new_batch();
            batch = batch.write(key(0), Some(val(0)));
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
        }
        // Stage 2: build/merkleize the next batch concurrently with commit().
        let (child_merkleized, commit_result) = futures::join!(
            async {
                assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
                let mut child = db.new_batch();
                child = child.write(key(1), Some(val(1)));
                child.merkleize(&db, None).await.unwrap()
            },
            db.commit(),
        );
        commit_result.unwrap();
        db.apply_batch(child_merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
        assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
        db.destroy().await.unwrap();
    });
}
}