use crate::{
index::unordered::Index,
journal::contiguous::fixed::Journal,
merkle::{self, Location},
qmdb::{
any::{unordered, value::FixedEncoding, FixedConfig as Config, FixedValue},
Error,
},
translator::Translator,
Context,
};
use commonware_cryptography::Hasher;
use commonware_utils::Array;
/// An unordered update whose value uses fixed-size encoding.
pub type Update<K, V> = unordered::Update<K, FixedEncoding<V>>;
/// An unordered log operation whose value uses fixed-size encoding.
pub type Operation<F, K, V> = unordered::Operation<F, K, FixedEncoding<V>>;
/// An unordered, fixed-value db backed by a fixed-length contiguous journal and
/// a non-partitioned unordered index.
pub type Db<F, E, K, V, H, T> =
    super::Db<F, E, Journal<E, Operation<F, K, V>>, Index<T, Location<F>>, H, Update<K, V>>;
impl<F: merkle::Family, E: Context, K: Array, V: FixedValue, H: Hasher, T: Translator>
    Db<F, E, K, V, H, T>
{
    /// Initialize the database from `cfg`, replaying any persisted state.
    pub async fn init(context: E, cfg: Config<T>) -> Result<Self, Error<F>> {
        Self::init_with_callback(context, cfg, None, |_, _| {}).await
    }

    /// Initialize like [`Self::init`], forwarding `known_inactivity_floor` and
    /// `callback` to `crate::qmdb::any::init` (presumably the callback observes
    /// operations replayed during recovery -- confirm against that function).
    pub(crate) async fn init_with_callback(
        context: E,
        cfg: Config<T>,
        known_inactivity_floor: Option<Location<F>>,
        callback: impl FnMut(bool, Option<Location<F>>),
    ) -> Result<Self, Error<F>> {
        crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
    }
}
/// Variants of the fixed-value, unordered db that use a *partitioned* index,
/// parameterized by the const `P`.
pub mod partitioned {
    pub use super::{Operation, Update};
    use crate::{
        index::partitioned::unordered::Index,
        journal::contiguous::fixed::Journal,
        merkle::{self, Location},
        qmdb::{
            any::{FixedConfig as Config, FixedValue},
            Error,
        },
        translator::Translator,
        Context,
    };
    use commonware_cryptography::Hasher;
    use commonware_utils::Array;

    /// An unordered, fixed-value db whose index is partitioned with parameter `P`.
    pub type Db<F, E, K, V, H, T, const P: usize> = crate::qmdb::any::unordered::Db<
        F,
        E,
        Journal<E, Operation<F, K, V>>,
        Index<T, Location<F>, P>,
        H,
        Update<K, V>,
    >;

    impl<
            F: merkle::Family,
            E: Context,
            K: Array,
            V: FixedValue,
            H: Hasher,
            T: Translator,
            const P: usize,
        > Db<F, E, K, V, H, T, P>
    {
        /// Initialize the database from `cfg`, replaying any persisted state.
        pub async fn init(context: E, cfg: Config<T>) -> Result<Self, Error<F>> {
            Self::init_with_callback(context, cfg, None, |_, _| {}).await
        }

        /// Initialize like [`Self::init`], forwarding `known_inactivity_floor`
        /// and `callback` to `crate::qmdb::any::init` (presumably the callback
        /// observes operations replayed during recovery -- confirm there).
        pub(crate) async fn init_with_callback(
            context: E,
            cfg: Config<T>,
            known_inactivity_floor: Option<Location<F>>,
            callback: impl FnMut(bool, Option<Location<F>>),
        ) -> Result<Self, Error<F>> {
            crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
        }
    }

    /// Partitioned db with `P = 1` (the name suggests a one-byte prefix and
    /// hence 256 partitions -- confirm against `index::partitioned`).
    pub mod p256 {
        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 1>;
    }

    /// Partitioned db with `P = 2` (the name suggests a two-byte prefix and
    /// hence 65536 partitions -- confirm against `index::partitioned`).
    pub mod p64k {
        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 2>;
    }
}
#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::{
index::Unordered as _,
merkle::{
mmr::{self, Location, StandardHasher},
Location as GenericLocation,
},
qmdb::{
any::{
test::fixed_db_config,
unordered::{fixed::Operation, Update},
},
verify_proof,
},
translator::TwoCap,
};
use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
use commonware_macros::test_traced;
use commonware_math::algebra::Random;
use commonware_runtime::{
deterministic::{self, Context},
Metrics, Runner as _,
};
use commonware_utils::{test_rng_seeded, NZU64};
use rand::RngCore;
/// Fully-spelled-out db type used by tests that are generic over the merkle
/// family `F`.
type AnyTestGeneric<F> = crate::qmdb::any::db::Db<
    F,
    deterministic::Context,
    Journal<
        deterministic::Context,
        crate::qmdb::any::operation::Unordered<F, Digest, FixedEncoding<Digest>>,
    >,
    Index<TwoCap, GenericLocation<F>>,
    Sha256,
    crate::qmdb::any::operation::update::Unordered<Digest, FixedEncoding<Digest>>,
>;
/// Concrete MMR-family test db used by the non-generic tests below.
pub(crate) type AnyTest =
    Db<mmr::Family, deterministic::Context, Digest, Digest, Sha256, TwoCap>;
/// Open (or create) a generic test database in a fixed storage partition.
async fn open_db_generic<F: crate::merkle::Family>(
    context: deterministic::Context,
) -> AnyTestGeneric<F> {
    let config = fixed_db_config::<TwoCap>("partition", &context);
    crate::qmdb::any::init(context, config, None, |_, _| {})
        .await
        .unwrap()
}
/// Create a test database whose storage partition name is a fresh random u64.
pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
    let partition = context.next_u64().to_string();
    let cfg = fixed_db_config::<TwoCap>(&partition, &context);
    AnyTest::init(context, cfg).await.unwrap()
}
/// Generate `n` deterministic test operations using the default seed (0).
pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<mmr::Family, Digest, Digest>> {
    create_test_ops_seeded(n, 0)
}
/// Generate `n` deterministic operations from `seed`: mostly random updates,
/// with every 10th operation (after the first) deleting the last written key.
pub(crate) fn create_test_ops_seeded(
    n: usize,
    seed: u64,
) -> Vec<Operation<mmr::Family, Digest, Digest>> {
    let mut rng = test_rng_seeded(seed);
    // Seed the "previously written" key before the loop, matching the original
    // rng draw order exactly.
    let mut last_written = Digest::random(&mut rng);
    (0..n)
        .map(|i| {
            let key = Digest::random(&mut rng);
            if i > 0 && i % 10 == 0 {
                // Delete the most recently updated key (no value drawn here).
                Operation::Delete(last_written)
            } else {
                let value = Digest::random(&mut rng);
                last_written = key;
                Operation::Update(Update(key, value))
            }
        })
        .collect()
}
/// Translate each log operation into a batch write, then merkleize and apply
/// the batch to `db` (without committing).
pub(crate) async fn apply_ops(
    db: &mut AnyTest,
    ops: Vec<Operation<mmr::Family, Digest, Digest>>,
) {
    let mut pending = db.new_batch();
    for op in ops {
        pending = match op {
            Operation::Update(Update(key, value)) => pending.write(key, Some(value)),
            Operation::Delete(key) => pending.write(key, None),
            Operation::CommitFloor(_, _) => panic!("CommitFloor not supported in apply_ops"),
        };
    }
    let merkleized = pending.merkleize(db, None).await.unwrap();
    db.apply_batch(merkleized).await.unwrap();
}
/// Write `writes` in a single batch, apply it with `metadata`, commit, and
/// return the location range the batch occupied in the log.
async fn commit_writes_generic<F: crate::merkle::Family>(
    db: &mut AnyTestGeneric<F>,
    writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
    metadata: Option<Digest>,
) -> std::ops::Range<GenericLocation<F>> {
    let batch = writes
        .into_iter()
        .fold(db.new_batch(), |acc, (k, v)| acc.write(k, v));
    let merkleized = batch.merkleize(db, metadata).await.unwrap();
    let applied = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    applied
}
/// Deterministic test key derived from `i`.
fn key(i: u64) -> Digest {
    let bytes = i.to_be_bytes();
    Sha256::hash(&bytes)
}

/// Deterministic test value derived from `i`, offset by 10000 so key and
/// value digests for the same index never collide.
fn val(i: u64) -> Digest {
    let bytes = (i + 10000).to_be_bytes();
    Sha256::hash(&bytes)
}
/// Applying an empty batch still changes the root, and the db stays usable.
async fn batch_empty_inner<F: crate::merkle::Family>(context: deterministic::Context) {
    let mut db = open_db_generic::<F>(context.with_label("db")).await;
    let initial_root = db.root();
    // Merkleize and apply a batch containing no writes at all.
    let empty = db.new_batch();
    let merkleized = empty.merkleize(&db, None).await.unwrap();
    db.apply_batch(merkleized).await.unwrap();
    assert_ne!(db.root(), initial_root);
    // A normal write/read cycle still works afterwards.
    commit_writes_generic(&mut db, [(key(0), Some(val(0)))], None).await;
    assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
    db.destroy().await.unwrap();
}
/// Committed metadata is readable; applying a later uncommitted batch clears it.
async fn batch_metadata_inner<F: crate::merkle::Family>(context: deterministic::Context) {
    let mut db = open_db_generic::<F>(context.with_label("db")).await;
    let meta = val(42);
    commit_writes_generic(&mut db, [(key(0), Some(val(0)))], Some(meta)).await;
    assert_eq!(db.get_metadata().await.unwrap(), Some(meta));
    // Apply an empty, uncommitted batch; the stored metadata goes away.
    let empty = db.new_batch().merkleize(&db, None).await.unwrap();
    db.apply_batch(empty).await.unwrap();
    assert_eq!(db.get_metadata().await.unwrap(), None);
    db.destroy().await.unwrap();
}
/// Batch reads fall through to the db for untouched keys and reflect the
/// batch's own writes/deletes for touched ones.
async fn batch_get_read_through_inner<F: crate::merkle::Family>(
    context: deterministic::Context,
) {
    let mut db = open_db_generic::<F>(context.with_label("db")).await;
    let committed_key = key(0);
    let committed_val = val(0);
    commit_writes_generic(&mut db, [(committed_key, Some(committed_val))], None).await;
    let staged_key = key(1);
    let staged_val = val(1);
    let absent_key = key(2);
    let mut batch = db.new_batch();
    // Untouched key reads through to the committed value.
    assert_eq!(
        batch.get(&committed_key, &db).await.unwrap(),
        Some(committed_val)
    );
    batch = batch.write(staged_key, Some(staged_val));
    assert_eq!(batch.get(&staged_key, &db).await.unwrap(), Some(staged_val));
    // A key in neither batch nor db reads as None.
    assert_eq!(batch.get(&absent_key, &db).await.unwrap(), None);
    // An in-batch overwrite shadows the committed value...
    let overwrite = val(100);
    batch = batch.write(committed_key, Some(overwrite));
    assert_eq!(
        batch.get(&committed_key, &db).await.unwrap(),
        Some(overwrite)
    );
    // ...and an in-batch delete shadows it with None.
    batch = batch.write(committed_key, None);
    assert_eq!(batch.get(&committed_key, &db).await.unwrap(), None);
    db.destroy().await.unwrap();
}
/// Reads on a merkleized (not yet applied) batch reflect its writes, deletes,
/// and inserts, and fall through for absent keys.
async fn batch_get_on_merkleized_inner<F: crate::merkle::Family>(
    context: deterministic::Context,
) {
    let mut db = open_db_generic::<F>(context.with_label("db")).await;
    let (k_updated, k_deleted, k_inserted, k_absent) = (key(0), key(1), key(2), key(3));
    commit_writes_generic(
        &mut db,
        [(k_updated, Some(val(0))), (k_deleted, Some(val(1)))],
        None,
    )
    .await;
    let new_val = val(100);
    let inserted_val = val(2);
    let batch = db
        .new_batch()
        .write(k_updated, Some(new_val))
        .write(k_deleted, None)
        .write(k_inserted, Some(inserted_val));
    let merkleized = batch.merkleize(&db, None).await.unwrap();
    assert_eq!(merkleized.get(&k_updated, &db).await.unwrap(), Some(new_val));
    assert_eq!(merkleized.get(&k_deleted, &db).await.unwrap(), None);
    assert_eq!(
        merkleized.get(&k_inserted, &db).await.unwrap(),
        Some(inserted_val)
    );
    assert_eq!(merkleized.get(&k_absent, &db).await.unwrap(), None);
    db.destroy().await.unwrap();
}
/// A batch stacked on a merkleized parent sees the parent's writes and can
/// shadow them with its own writes and deletes.
async fn batch_stacked_get_inner<F: crate::merkle::Family>(context: deterministic::Context) {
    let db = open_db_generic::<F>(context.with_label("db")).await;
    let first = key(0);
    let second = key(1);
    let parent = db
        .new_batch()
        .write(first, Some(val(0)))
        .merkleize(&db, None)
        .await
        .unwrap();
    let mut child = parent.new_batch();
    // Child reads through to the parent's write.
    assert_eq!(child.get(&first, &db).await.unwrap(), Some(val(0)));
    child = child.write(first, Some(val(100)));
    assert_eq!(child.get(&first, &db).await.unwrap(), Some(val(100)));
    child = child.write(second, Some(val(1)));
    assert_eq!(child.get(&second, &db).await.unwrap(), Some(val(1)));
    // A child-level delete shadows the parent's value.
    child = child.write(first, None);
    assert_eq!(child.get(&first, &db).await.unwrap(), None);
    drop(child);
    drop(parent);
    db.destroy().await.unwrap();
}
/// A parent batch deletes a key; a stacked child recreates it, and applying the
/// child persists the recreated value.
async fn batch_stacked_delete_recreate_inner<F: crate::merkle::Family>(
    context: deterministic::Context,
) {
    let mut db = open_db_generic::<F>(context.with_label("db")).await;
    let target = key(0);
    commit_writes_generic(&mut db, [(target, Some(val(0)))], None).await;
    // Parent deletes the key.
    let parent = db
        .new_batch()
        .write(target, None)
        .merkleize(&db, None)
        .await
        .unwrap();
    assert_eq!(parent.get(&target, &db).await.unwrap(), None);
    // Child (stacked on the parent) writes it back with a new value.
    let child = parent
        .new_batch()
        .write(target, Some(val(200)))
        .merkleize(&db, None)
        .await
        .unwrap();
    assert_eq!(child.get(&target, &db).await.unwrap(), Some(val(200)));
    db.apply_batch(child).await.unwrap();
    assert_eq!(db.get(&target).await.unwrap(), Some(val(200)));
    db.destroy().await.unwrap();
}
/// Applying successive batches yields contiguous operation-location ranges.
async fn batch_apply_returns_range_inner<F: crate::merkle::Family>(
    context: deterministic::Context,
) {
    let mut db = open_db_generic::<F>(context.with_label("db")).await;
    let first_writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
    let first_range = commit_writes_generic(&mut db, first_writes, None).await;
    // The first appended range starts at location 1.
    assert_eq!(first_range.start, GenericLocation::<F>::new(1));
    // 5 writes plus at least one bookkeeping operation.
    assert!(first_range.end.saturating_sub(*first_range.start) >= 6);
    let second_writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
    let second_range = commit_writes_generic(&mut db, second_writes, None).await;
    // The second range begins exactly where the first ended.
    assert_eq!(second_range.start, first_range.end);
    db.destroy().await.unwrap();
}
/// The root reported by a merkleized batch equals the db root after applying it.
async fn batch_speculative_root_inner<F: crate::merkle::Family>(
    context: deterministic::Context,
) {
    let mut db = open_db_generic::<F>(context.with_label("db")).await;
    let batch = (0..10).fold(db.new_batch(), |b, i| b.write(key(i), Some(val(i))));
    let merkleized = batch.merkleize(&db, None).await.unwrap();
    let speculative = merkleized.root();
    db.apply_batch(merkleized).await.unwrap();
    assert_eq!(db.root(), speculative);
    db.destroy().await.unwrap();
}
/// Many updates to one key replay correctly on reopen: the snapshot collapses
/// to a single entry and the committed root is preserved.
async fn log_replay_inner<F: crate::merkle::Family>(context: deterministic::Context) {
    const UPDATES: u64 = 100;
    let db_context = context.with_label("db");
    let mut db = open_db_generic::<F>(db_context.clone()).await;
    let target = Sha256::hash(&UPDATES.to_be_bytes());
    // Overwrite the same key many times within one batch.
    let mut batch = db.new_batch();
    for i in 0u64..UPDATES {
        let v = Sha256::hash(&(i * 1000).to_be_bytes());
        batch = batch.write(target, Some(v));
    }
    let merkleized = batch.merkleize(&db, None).await.unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    let committed_root = db.root();
    drop(db);
    // Reopen and verify state was reconstructed from the log.
    let db: AnyTestGeneric<F> = open_db_generic::<F>(db_context.with_label("reopened")).await;
    let entries: Vec<_> = db.snapshot.get(&target).cloned().collect();
    assert_eq!(entries.len(), 1);
    assert_eq!(db.root(), committed_root);
    db.destroy().await.unwrap();
}
// Harness entry points: each runs the corresponding `*_inner` helper on a
// deterministic runtime, once per merkle family (mmr and mmb).
#[test_traced("INFO")]
fn test_unordered_fixed_batch_empty() {
    deterministic::Runner::default().start(batch_empty_inner::<mmr::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_metadata() {
    deterministic::Runner::default().start(batch_metadata_inner::<mmr::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_get_read_through() {
    deterministic::Runner::default().start(batch_get_read_through_inner::<mmr::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_get_on_merkleized() {
    deterministic::Runner::default().start(batch_get_on_merkleized_inner::<mmr::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_stacked_get() {
    deterministic::Runner::default().start(batch_stacked_get_inner::<mmr::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_stacked_delete_recreate() {
    deterministic::Runner::default().start(batch_stacked_delete_recreate_inner::<mmr::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_apply_returns_range() {
    deterministic::Runner::default().start(batch_apply_returns_range_inner::<mmr::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_speculative_root() {
    deterministic::Runner::default().start(batch_speculative_root_inner::<mmr::Family>);
}
#[test_traced("WARN")]
fn test_any_fixed_db_log_replay() {
    deterministic::Runner::default().start(log_replay_inner::<mmr::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_empty_mmb() {
    deterministic::Runner::default().start(batch_empty_inner::<crate::merkle::mmb::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_metadata_mmb() {
    deterministic::Runner::default().start(batch_metadata_inner::<crate::merkle::mmb::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_get_read_through_mmb() {
    deterministic::Runner::default()
        .start(batch_get_read_through_inner::<crate::merkle::mmb::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_get_on_merkleized_mmb() {
    deterministic::Runner::default()
        .start(batch_get_on_merkleized_inner::<crate::merkle::mmb::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_stacked_get_mmb() {
    deterministic::Runner::default().start(batch_stacked_get_inner::<crate::merkle::mmb::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_stacked_delete_recreate_mmb() {
    deterministic::Runner::default()
        .start(batch_stacked_delete_recreate_inner::<crate::merkle::mmb::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_apply_returns_range_mmb() {
    deterministic::Runner::default()
        .start(batch_apply_returns_range_inner::<crate::merkle::mmb::Family>);
}
#[test_traced("INFO")]
fn test_unordered_fixed_batch_speculative_root_mmb() {
    deterministic::Runner::default()
        .start(batch_speculative_root_inner::<crate::merkle::mmb::Family>);
}
#[test_traced("WARN")]
fn test_any_fixed_db_log_replay_mmb() {
    deterministic::Runner::default().start(log_replay_inner::<crate::merkle::mmb::Family>);
}
/// A historical proof at the current size equals the regular proof, remains
/// stable after more operations are appended, and fails for out-of-range sizes.
#[test]
fn test_any_fixed_db_historical_proof_basic() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = create_test_db(context.clone()).await;
        let ops = create_test_ops(20);
        apply_ops(&mut db, ops.clone()).await;
        let root_hash = db.root();
        let original_op_count = db.bounds().await.end;
        let max_ops = NZU64!(10);
        // Historical proof taken at the *current* size must match the regular proof.
        let (historical_proof, historical_ops) = db
            .historical_proof(original_op_count, Location::new(6), max_ops)
            .await
            .unwrap();
        let (regular_proof, regular_ops) = db.proof(Location::new(6), max_ops).await.unwrap();
        assert_eq!(historical_proof.leaves, regular_proof.leaves);
        assert_eq!(historical_proof.digests, regular_proof.digests);
        assert_eq!(historical_ops, regular_ops);
        let hasher = StandardHasher::<Sha256>::new();
        assert!(verify_proof(
            &hasher,
            &historical_proof,
            Location::new(6),
            &historical_ops,
            &root_hash
        ));
        // Append more operations; the proof against the *old* size must still
        // match the earlier regular proof and verify against the old root.
        let more_ops = create_test_ops_seeded(5, 1);
        apply_ops(&mut db, more_ops.clone()).await;
        let (historical_proof, historical_ops) = db
            .historical_proof(original_op_count, Location::new(6), NZU64!(10))
            .await
            .unwrap();
        assert_eq!(historical_proof.leaves, original_op_count);
        assert_eq!(historical_proof.leaves, regular_proof.leaves);
        assert_eq!(historical_ops.len(), 10);
        assert_eq!(historical_proof.digests, regular_proof.digests);
        assert_eq!(historical_ops, regular_ops);
        assert!(verify_proof(
            &hasher,
            &historical_proof,
            Location::new(6),
            &historical_ops,
            &root_hash
        ));
        // A historical size beyond the current bounds is rejected.
        assert!(matches!(
            db.historical_proof(db.bounds().await.end + 1, Location::new(6), NZU64!(10))
                .await,
            Err(Error::Merkle(crate::mmr::Error::RangeOutOfBounds(_)))
        ));
        db.destroy().await.unwrap();
    });
}
/// Edge cases for historical proofs: single-op proofs, op-count clamping by the
/// historical size, and a minimal historical size.
#[test]
fn test_any_fixed_db_historical_proof_edge_cases() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let hasher = StandardHasher::<Sha256>::new();
        let ops = create_test_ops(50);
        let mut db = create_test_db(context.with_label("first")).await;
        apply_ops(&mut db, ops.clone()).await;
        let root = db.root();
        let full_size = db.bounds().await.end;
        // Minimal regular proof: a single operation at location 1.
        let (proof, proof_ops) = db.proof(Location::new(1), NZU64!(1)).await.unwrap();
        assert_eq!(proof_ops.len(), 1);
        assert!(verify_proof(
            &hasher,
            &proof,
            Location::new(1),
            &proof_ops,
            &root
        ));
        // Historical proof at the full current size matches the regular proof.
        let (hp, hp_ops) = db
            .historical_proof(full_size, Location::new(1), NZU64!(1))
            .await
            .unwrap();
        assert_eq!(hp.digests, proof.digests);
        assert_eq!(hp_ops, proof_ops);
        // max_ops (20) exceeds what fits below historical size 11 starting at
        // location 6 -- only 5 operations are returned.
        let (_proof, limited_ops) = db
            .historical_proof(Location::new(11), Location::new(6), NZU64!(20))
            .await
            .unwrap();
        assert_eq!(limited_ops.len(), 5);
        // Small historical size with an exactly-fitting range.
        let (min_proof, min_ops) = db
            .historical_proof(Location::new(4), Location::new(1), NZU64!(3))
            .await
            .unwrap();
        assert_eq!(min_proof.leaves, Location::new(4));
        assert_eq!(min_ops.len(), 3);
        db.destroy().await.unwrap();
    });
}
/// Historical proofs taken after further growth must reproduce, for each
/// recorded checkpoint size, the exact proof that was generated live.
#[test]
fn test_any_fixed_db_historical_proof_different_historical_sizes() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let ops = create_test_ops(100);
        let hasher = StandardHasher::<Sha256>::new();
        let start_loc = Location::new(2);
        let max_ops = NZU64!(10);
        let mut db = create_test_db(context.with_label("main")).await;
        let mut offset = 0usize;
        let mut checkpoints = Vec::new();
        // Apply the ops in uneven chunks, recording (size, root, proof, ops)
        // after each chunk as a reference checkpoint.
        for chunk in [20usize, 15, 25, 30, 10] {
            apply_ops(&mut db, ops[offset..offset + chunk].to_vec()).await;
            offset += chunk;
            let end_loc = db.bounds().await.end;
            let root = db.root();
            let (proof, proof_ops) = db.proof(start_loc, max_ops).await.unwrap();
            checkpoints.push((end_loc, root, proof, proof_ops));
        }
        // Apply one final empty batch before replaying history.
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        // Each checkpoint's historical proof must equal the live proof taken
        // at that size and verify against that checkpoint's root.
        for (historical_size, root, reference_proof, reference_ops) in checkpoints {
            let (historical_proof, historical_ops) = db
                .historical_proof(historical_size, start_loc, max_ops)
                .await
                .unwrap();
            assert_eq!(historical_proof.leaves, reference_proof.leaves);
            assert_eq!(historical_proof.digests, reference_proof.digests);
            assert_eq!(historical_ops, reference_ops);
            assert!(verify_proof(
                &hasher,
                &historical_proof,
                start_loc,
                &historical_ops,
                &root
            ));
        }
        // The live proof at the final size still verifies as well.
        let full_root = db.root();
        let (full_proof, full_ops) = db.proof(start_loc, max_ops).await.unwrap();
        assert!(verify_proof(
            &hasher,
            &full_proof,
            start_loc,
            &full_ops,
            &full_root
        ));
        db.destroy().await.unwrap();
    });
}
/// Compile-time helper: accepts only `Send` values.
fn is_send<T: Send>(_: T) {}

/// Compile-time assertion that the futures returned by `merkleize` and
/// `get_with_loc` are `Send`. Never executed -- its successful compilation is
/// the check.
#[allow(dead_code)]
fn assert_non_trait_futures_are_send(db: &AnyTest, key: Digest, value: Digest) {
    let batch = db.new_batch().write(key, Some(value));
    is_send(batch.merkleize(db, None));
    is_send(db.get_with_loc(&key));
}
/// Wiring that lets [`AnyTest`] participate in the shared from-sync test suite.
mod from_sync_testable {
    use super::*;
    use crate::{
        merkle::{
            mmr::{self, journaled::Mmr},
            Family as _,
        },
        qmdb::any::sync::tests::FromSyncTestable,
    };
    use futures::future::join_all;

    type TestMmr = Mmr<deterministic::Context, Digest>;

    impl FromSyncTestable for AnyTest {
        type Mmr = TestMmr;

        /// Decompose the db's log into its MMR and journal halves.
        fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
            (self.log.merkle, self.log.journal)
        }

        /// Fetch the digests of all nodes pinned at `loc`, in the order
        /// yielded by `nodes_to_pin`. Panics if any node is missing.
        async fn pinned_nodes_at(&self, loc: Location) -> Vec<Digest> {
            join_all(mmr::Family::nodes_to_pin(loc).map(|p| self.log.merkle.get_node(p)))
                .await
                .into_iter()
                .map(|n| n.unwrap().unwrap())
                .collect()
        }
    }
}
}