use crate::{
index::ordered::Index,
journal::contiguous::variable::Journal,
merkle::{self, Location},
qmdb::{
any::{ordered, value::VariableEncoding, VariableConfig, VariableValue},
operation::Key,
Error,
},
translator::Translator,
Context,
};
use commonware_codec::{Codec, Read};
use commonware_cryptography::Hasher;
pub type Update<K, V> = ordered::Update<K, VariableEncoding<V>>;
pub type Operation<F, K, V> = ordered::Operation<F, K, VariableEncoding<V>>;
pub type Db<F, E, K, V, H, T> =
super::Db<F, E, Journal<E, Operation<F, K, V>>, Index<T, Location<F>>, H, Update<K, V>>;
impl<F: merkle::Family, E: Context, K: Key, V: VariableValue, H: Hasher, T: Translator>
Db<F, E, K, V, H, T>
where
Operation<F, K, V>: Codec,
{
pub async fn init(
context: E,
cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
) -> Result<Self, Error<F>> {
Self::init_with_callback(context, cfg, None, |_, _| {}).await
}
pub(crate) async fn init_with_callback(
context: E,
cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
known_inactivity_floor: Option<Location<F>>,
callback: impl FnMut(bool, Option<Location<F>>),
) -> Result<Self, Error<F>> {
crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
}
}
pub mod partitioned {
pub use super::{Operation, Update};
use crate::{
index::partitioned::ordered::Index,
journal::contiguous::variable::Journal,
merkle::{self, Location},
qmdb::{
any::{VariableConfig, VariableValue},
operation::Key,
Error,
},
translator::Translator,
Context,
};
use commonware_codec::{Codec, Read};
use commonware_cryptography::Hasher;
pub type Db<F, E, K, V, H, T, const P: usize> = crate::qmdb::any::ordered::Db<
F,
E,
Journal<E, Operation<F, K, V>>,
Index<T, Location<F>, P>,
H,
Update<K, V>,
>;
impl<
F: merkle::Family,
E: Context,
K: Key,
V: VariableValue,
H: Hasher,
T: Translator,
const P: usize,
> Db<F, E, K, V, H, T, P>
where
Operation<F, K, V>: Codec,
{
pub async fn init(
context: E,
cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
) -> Result<Self, Error<F>> {
Self::init_with_callback(context, cfg, None, |_, _| {}).await
}
pub(crate) async fn init_with_callback(
context: E,
cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
known_inactivity_floor: Option<Location<F>>,
callback: impl FnMut(bool, Option<Location<F>>),
) -> Result<Self, Error<F>> {
crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
}
}
pub mod p256 {
pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 1>;
}
pub mod p64k {
pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 2>;
}
}
#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::{
mmr,
qmdb::any::{
ordered::test::{
test_ordered_any_db_basic, test_ordered_any_db_empty,
test_ordered_any_update_collision_edge_case,
},
test::variable_db_config,
},
translator::TwoCap,
};
use commonware_cryptography::{sha256::Digest, Sha256};
use commonware_macros::test_traced;
use commonware_math::algebra::Random;
use commonware_runtime::{
buffer::paged::CacheRef,
deterministic::{self, Context},
BufferPooler, Metrics, Runner as _,
};
use commonware_utils::{sequence::FixedBytes, test_rng_seeded, NZUsize, NZU16, NZU64};
use rand::RngCore;
const PAGE_SIZE: u16 = 103;
const PAGE_CACHE_SIZE: usize = 13;
pub(crate) type VarConfig =
VariableConfig<TwoCap, ((), (commonware_codec::RangeCfg<usize>, ()))>;
pub(crate) type AnyTest =
Db<mmr::Family, deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
pub(crate) fn create_test_config(seed: u64, pooler: &impl BufferPooler) -> VarConfig {
let page_cache =
CacheRef::from_pooler(pooler, NZU16!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE));
VariableConfig {
merkle_config: crate::mmr::journaled::Config {
journal_partition: format!("mmr-journal-{seed}"),
metadata_partition: format!("mmr-metadata-{seed}"),
items_per_blob: NZU64!(12), write_buffer: NZUsize!(64),
thread_pool: None,
page_cache: page_cache.clone(),
},
journal_config: crate::journal::contiguous::variable::Config {
partition: format!("log-journal-{seed}"),
items_per_section: NZU64!(14), write_buffer: NZUsize!(64),
compression: None,
codec_config: ((), ((0..=10000).into(), ())),
page_cache,
},
translator: TwoCap,
}
}
pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
let seed = context.next_u64();
let config = create_test_config(seed, &context);
AnyTest::init(context, config).await.unwrap()
}
fn to_bytes(i: u64) -> Vec<u8> {
let len = ((i % 13) + 7) as usize;
vec![(i % 255) as u8; len]
}
pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<mmr::Family, Digest, Vec<u8>>> {
create_test_ops_seeded(n, 0)
}
pub(crate) fn create_test_ops_seeded(
n: usize,
seed: u64,
) -> Vec<Operation<mmr::Family, Digest, Vec<u8>>> {
let mut rng = test_rng_seeded(seed);
let mut prev_key = Digest::random(&mut rng);
let mut ops = Vec::new();
for i in 0..n {
if i % 10 == 0 && i > 0 {
ops.push(Operation::Delete(prev_key));
} else {
let key = Digest::random(&mut rng);
let next_key = Digest::random(&mut rng);
let value = to_bytes(rng.next_u64());
ops.push(Operation::Update(ordered::Update {
key,
value,
next_key,
}));
prev_key = key;
}
}
ops
}
pub(crate) async fn apply_ops(
db: &mut AnyTest,
ops: Vec<Operation<mmr::Family, Digest, Vec<u8>>>,
) {
let mut batch = db.new_batch();
for op in ops {
match op {
Operation::Update(data) => {
batch = batch.write(data.key, Some(data.value));
}
Operation::Delete(key) => {
batch = batch.write(key, None);
}
Operation::CommitFloor(_, _) => {
panic!("CommitFloor not supported in apply_ops");
}
}
}
let merkleized = batch.merkleize(db, None).await.unwrap();
db.apply_batch(merkleized).await.unwrap();
}
type VariableDb = Db<mmr::Family, Context, FixedBytes<4>, Digest, Sha256, TwoCap>;
async fn open_variable_db(context: Context) -> VariableDb {
let cfg = variable_db_config("fixed-bytes-var-partition", &context);
VariableDb::init(context, cfg).await.unwrap()
}
#[test_traced("WARN")]
fn test_ordered_any_variable_db_empty() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let db = open_variable_db(context.with_label("initial")).await;
test_ordered_any_db_empty(context, db, |ctx| Box::pin(open_variable_db(ctx))).await;
});
}
#[test_traced("WARN")]
fn test_ordered_any_variable_db_basic() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let db = open_variable_db(context.with_label("initial")).await;
test_ordered_any_db_basic(context, db, |ctx| Box::pin(open_variable_db(ctx))).await;
});
}
#[test_traced("WARN")]
fn test_ordered_any_update_collision_edge_case_variable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let db = open_variable_db(context.clone()).await;
test_ordered_any_update_collision_edge_case(db).await;
});
}
#[test_traced("WARN")]
fn test_ordered_any_update_batch_create_between_collisions() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = open_variable_db(context.clone()).await;
let key1 = FixedBytes::from([0xFFu8, 0xFFu8, 5u8, 5u8]);
let key2 = FixedBytes::from([0xFFu8, 0xFFu8, 6u8, 6u8]);
let key3 = FixedBytes::from([0xFFu8, 0xFFu8, 7u8, 0u8]);
let val = Sha256::fill(1u8);
let merkleized = db
.new_batch()
.write(key1.clone(), Some(val))
.write(key3.clone(), Some(val))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
assert!(db.get(&key2).await.unwrap().is_none());
assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);
let merkleized = db
.new_batch()
.write(key2.clone(), Some(val))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
assert_eq!(db.get(&key2).await.unwrap().unwrap(), val);
assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);
let span1 = db.get_span(&key1).await.unwrap().unwrap();
assert_eq!(span1.1.next_key, key2);
let span2 = db.get_span(&key2).await.unwrap().unwrap();
assert_eq!(span2.1.next_key, key3);
let span3 = db.get_span(&key3).await.unwrap().unwrap();
assert_eq!(span3.1.next_key, key1);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_ordered_any_batch_create_delete_prev_links() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let key1 = FixedBytes::from([0x10u8, 0x00, 0x00, 0x00]);
let key2 = FixedBytes::from([0x20u8, 0x00, 0x00, 0x00]);
let key3 = FixedBytes::from([0x30u8, 0x00, 0x00, 0x00]);
let val1 = Sha256::fill(1u8);
let val2 = Sha256::fill(2u8);
let val3 = Sha256::fill(3u8);
let mut db = open_variable_db(context.with_label("first")).await;
let merkleized = db
.new_batch()
.write(key1.clone(), Some(val1))
.write(key3.clone(), Some(val3))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
let merkleized = db
.new_batch()
.write(key1.clone(), None)
.write(key2.clone(), Some(val2))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
assert!(db.get(&key1).await.unwrap().is_none());
assert_eq!(db.get(&key2).await.unwrap(), Some(val2));
assert_eq!(db.get(&key3).await.unwrap(), Some(val3));
let span2 = db.get_span(&key2).await.unwrap().unwrap();
assert_eq!(span2.1.next_key, key3);
let span3 = db.get_span(&key3).await.unwrap().unwrap();
assert_eq!(span3.1.next_key, key2);
db.destroy().await.unwrap();
let mut db = open_variable_db(context.with_label("second")).await;
let merkleized = db
.new_batch()
.write(key1.clone(), Some(val1))
.write(key3.clone(), Some(val3))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
let merkleized = db
.new_batch()
.write(key2.clone(), Some(val2))
.write(key3.clone(), None)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
assert_eq!(db.get(&key1).await.unwrap(), Some(val1));
assert_eq!(db.get(&key2).await.unwrap(), Some(val2));
assert!(db.get(&key3).await.unwrap().is_none());
let span1 = db.get_span(&key1).await.unwrap().unwrap();
assert_eq!(span1.1.next_key, key2);
let span2 = db.get_span(&key2).await.unwrap().unwrap();
assert_eq!(span2.1.next_key, key1);
db.destroy().await.unwrap();
});
}
fn is_send<T: Send>(_: T) {}
#[allow(dead_code)]
fn assert_non_trait_futures_are_send(db: &mut AnyTest, key: Digest) {
is_send(db.get_all(&key));
is_send(db.get_with_loc(&key));
is_send(db.get_span(&key));
}
#[test_traced("WARN")]
fn test_ordered_sequential_commit_basic() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(10)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_a = Digest::random(&mut test_rng_seeded(800));
let val_a = vec![1u8; 10];
let parent_batch = base
.new_batch::<Sha256>()
.write(key_a, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let key_b = Digest::random(&mut test_rng_seeded(801));
let val_b = vec![2u8; 10];
let child_batch = parent_batch
.new_batch::<Sha256>()
.write(key_b, Some(val_b.clone()))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(parent_batch).await.unwrap();
db.commit().await.unwrap();
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_ordered_sequential_commit_delete_after_insert() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(5)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_x = Digest::random(&mut test_rng_seeded(810));
let val_x = vec![10u8; 8];
let parent_batch = base
.new_batch::<Sha256>()
.write(key_x, Some(val_x.clone()))
.merkleize(&db, None)
.await
.unwrap();
let child_batch = parent_batch
.new_batch::<Sha256>()
.write(key_x, None)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(parent_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_x);
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
assert!(db.get(&key_x).await.unwrap().is_none());
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
fn test_ordered_sequential_commit_overlapping_keys() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut db = create_test_db(context).await;
apply_ops(&mut db, create_test_ops(5)).await;
db.commit().await.unwrap();
let base = db.to_batch();
let key_x = Digest::random(&mut test_rng_seeded(820));
let val_a = vec![10u8; 8];
let parent_batch = base
.new_batch::<Sha256>()
.write(key_x, Some(val_a.clone()))
.merkleize(&db, None)
.await
.unwrap();
let val_b = vec![20u8; 8];
let child_batch = parent_batch
.new_batch::<Sha256>()
.write(key_x, Some(val_b.clone()))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(parent_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
db.apply_batch(child_batch).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);
db.destroy().await.unwrap();
});
}
mod from_sync_testable {
use super::*;
use crate::{
merkle::{
mmr::{self, journaled::Mmr},
Family as _,
},
qmdb::any::sync::tests::FromSyncTestable,
};
use futures::future::join_all;
type TestMmr = Mmr<deterministic::Context, Digest>;
impl FromSyncTestable for AnyTest {
type Mmr = TestMmr;
fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
(self.log.merkle, self.log.journal)
}
async fn pinned_nodes_at(&self, loc: mmr::Location) -> Vec<Digest> {
join_all(mmr::Family::nodes_to_pin(loc).map(|p| self.log.merkle.get_node(p)))
.await
.into_iter()
.map(|n| n.unwrap().unwrap())
.collect()
}
}
}
}