#[cfg(any(test, feature = "test-traits"))]
use crate::qmdb::any::traits::PersistableMutableLog;
use crate::{
index::Ordered as Index,
journal::contiguous::{Contiguous, Reader},
merkle::{Family, Location},
qmdb::{
any::{db::Db, ValueEncoding},
operation::{Key, Operation as OperationTrait},
},
Context,
};
use commonware_codec::Codec;
use commonware_cryptography::Hasher;
use futures::{
future::try_join_all,
stream::{self, Stream},
};
use std::{
collections::{BTreeMap, BTreeSet},
ops::Bound,
};
pub mod fixed;
pub mod variable;
pub use crate::qmdb::any::operation::{update::Ordered as Update, Ordered as Operation};
/// Outcome of a span lookup: the log location of an update operation paired with the update
/// stored there, or `None` when the lookup found nothing.
type LocatedKey<F, K, V> = Option<(Location<F>, Update<K, V>)>;
impl<
F: Family,
E: Context,
K: Key,
V: ValueEncoding,
C: Contiguous<Item = Operation<F, K, V>>,
I: Index<Value = Location<F>>,
H: Hasher,
> Db<F, E, C, I, H, Update<K, V>>
where
Operation<F, K, V>: Codec,
{
/// Reads the operation stored at `loc` through `reader` and returns its update payload.
///
/// # Errors
///
/// Propagates any error produced by the reader.
///
/// # Panics
///
/// Panics if the operation at `loc` is anything other than [`Operation::Update`].
async fn get_update_op(
    reader: &impl Reader<Item = Operation<F, K, V>>,
    loc: Location<F>,
) -> Result<Update<K, V>, crate::qmdb::Error<F>> {
    let Operation::Update(update) = reader.read(*loc).await? else {
        unreachable!("expected update operation at location {}", loc)
    };
    Ok(update)
}
/// Returns whether `key` lies inside the half-open span `[span_start, span_end)`.
///
/// When `span_start >= span_end` the span is treated as wrapping around: it then contains
/// every key that is at or after `span_start`, or strictly before `span_end`.
pub fn span_contains(span_start: &K, span_end: &K, key: &K) -> bool {
    let wraps_around = span_start >= span_end;
    if wraps_around {
        // Wrapping span: the key only has to satisfy one of the two bounds.
        key >= span_start || key < span_end
    } else {
        // Ordinary span: the key must satisfy both bounds.
        key >= span_start && key < span_end
    }
}
/// Scans the update operations at `locs`, in order, for the first one whose
/// `[key, next_key)` span contains `key`.
///
/// Returns the matching location together with its update, or `Ok(None)` if no candidate
/// matches.
///
/// # Errors
///
/// Propagates any error from reading the log.
///
/// # Panics
///
/// Panics if one of `locs` does not reference an update operation.
async fn find_span(
    &self,
    locs: impl IntoIterator<Item = Location<F>>,
    key: &K,
) -> Result<LocatedKey<F, K, V>, crate::qmdb::Error<F>> {
    let reader = self.log.reader().await;
    for candidate in locs {
        let update = Self::get_update_op(&reader, candidate).await?;
        if !Self::span_contains(&update.key, &update.next_key, key) {
            continue;
        }
        return Ok(Some((candidate, update)));
    }
    Ok(None)
}
/// Looks up the update operation whose `[key, next_key)` span contains `key`.
///
/// The snapshot entry for `key` is searched first. If nothing there matches, the entry
/// returned by `prev_translated_key` is searched instead.
///
/// Returns `Ok(None)` when the db is empty or the snapshot reports no previous entry.
///
/// # Errors
///
/// Propagates any error from reading the log.
///
/// # Panics
///
/// Panics if the previous entry exists but none of its operations spans `key`.
pub async fn get_span(&self, key: &K) -> Result<LocatedKey<F, K, V>, crate::qmdb::Error<F>> {
    if self.is_empty() {
        return Ok(None);
    }

    // Candidates are collected before awaiting so no snapshot iterator is held across the
    // await below.
    let same_entry: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
    if let Some(found) = self.find_span(same_entry, key).await? {
        return Ok(Some(found));
    }

    // Fall back to the preceding snapshot entry.
    let prev_entry: Vec<Location<F>> = match self.snapshot.prev_translated_key(key) {
        Some((iter, _)) => iter.copied().collect(),
        None => return Ok(None),
    };
    let found = self.find_span(prev_entry, key).await?;
    Ok(Some(found.expect(
        "a span that includes any given key should always exist if db is non-empty",
    )))
}
/// Returns the value stored for `key` together with that key's `next_key`, or `Ok(None)`
/// if `key` has no matching update operation.
///
/// # Errors
///
/// Propagates any error from [`Self::get_with_loc`].
pub async fn get_all(&self, key: &K) -> Result<Option<(V::Value, K)>, crate::qmdb::Error<F>> {
    let located = self.get_with_loc(key).await?;
    Ok(located.map(|(update, _loc)| (update.value, update.next_key)))
}
/// Finds the update operation for exactly `key`, returning it together with its log
/// location, or `Ok(None)` if none of the snapshot candidates carries that key.
///
/// # Errors
///
/// Propagates any error from reading the log.
///
/// # Panics
///
/// Panics if a candidate location does not reference an update operation.
pub(crate) async fn get_with_loc(
    &self,
    key: &K,
) -> Result<Option<(Update<K, V>, Location<F>)>, crate::qmdb::Error<F>> {
    // Candidates are collected first so no snapshot iterator is held across the awaits below.
    let candidates: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
    let reader = self.log.reader().await;
    for loc in candidates {
        let op = reader.read(*loc).await?;
        assert!(
            op.is_update(),
            "location does not reference update operation. loc={loc}"
        );
        // Several keys can share one snapshot entry, so skip candidates for other keys.
        let is_match = op.key().expect("update operation must have key") == key;
        if !is_match {
            continue;
        }
        match op {
            Operation::Update(data) => return Ok(Some((data, loc))),
            _ => unreachable!("expected update operation"),
        }
    }
    Ok(None)
}
/// Returns a stream of `(key, value)` pairs starting at `start`.
///
/// The snapshot entry for `start` is loaded eagerly and filtered to keys `>= start`. After
/// that, entries are loaded one at a time via `next_translated_key`. Within an entry, keys
/// are yielded smallest first. The stream ends when the snapshot reports no next entry or
/// reports that it wrapped.
///
/// If loading an entry fails, the error is yielded as a stream item and the stream state is
/// left unchanged, so polling again retries the same entry.
///
/// # Errors
///
/// Returns an error if loading the initial entry fails.
///
/// # Panics
///
/// Panics if a snapshot entry references a non-update operation, or if a next entry turns
/// out to be empty.
pub async fn stream_range<'a>(
    &'a self,
    start: K,
) -> Result<
    impl Stream<Item = Result<(K, V::Value), crate::qmdb::Error<F>>> + 'a,
    crate::qmdb::Error<F>,
>
where
    V: 'a,
{
    let first_entry = self.snapshot.get(&start);
    let mut init_pending = self.fetch_all_updates(first_entry).await?;
    init_pending.retain(|update| update.key >= start);

    let items = stream::unfold(
        (start, init_pending),
        move |(driver_key, mut pending): (K, Vec<Update<K, V>>)| async move {
            // Drain whatever is left of the current entry first. `pending` is sorted in
            // descending order, so popping yields the smallest remaining key.
            if let Some(item) = pending.pop() {
                return Some((Ok((item.key, item.value)), (driver_key, pending)));
            }

            // Move on to the next entry; stop if there is none or the snapshot wrapped.
            let next_entry = match self.snapshot.next_translated_key(&driver_key) {
                Some((iter, false)) => iter,
                _ => return None,
            };

            match self.fetch_all_updates(next_entry).await {
                Ok(mut fetched) => {
                    let item = fetched.pop().expect("pending is not empty");
                    let next_driver = item.key.clone();
                    Some((Ok((item.key, item.value)), (next_driver, fetched)))
                }
                Err(e) => Some((Err(e), (driver_key, pending))),
            }
        },
    );
    Ok(items)
}
/// Reads the update operation at every location in `locs` concurrently and returns the
/// updates sorted by key in descending order, so that `Vec::pop` hands back the smallest
/// key first.
///
/// # Errors
///
/// Returns the first error encountered while reading the log.
///
/// # Panics
///
/// Panics if any location does not reference an update operation.
async fn fetch_all_updates(
    &self,
    locs: impl IntoIterator<Item = &Location<F>>,
) -> Result<Vec<Update<K, V>>, crate::qmdb::Error<F>> {
    let reader = self.log.reader().await;
    let reads = locs
        .into_iter()
        .map(|loc| Self::get_update_op(&reader, *loc));
    let mut updates = try_join_all(reads).await?;
    updates.sort_by(|lhs, rhs| lhs.key.cmp(&rhs.key).reverse());
    Ok(updates)
}
}
/// Returns the smallest element of `possible_next` that is strictly greater than `key`,
/// wrapping around to the smallest element overall when no such element exists.
///
/// # Panics
///
/// Panics if `possible_next` is empty.
pub(crate) fn find_next_key<K: Ord + Clone>(key: &K, possible_next: &BTreeSet<K>) -> K {
    possible_next
        .range((Bound::Excluded(key), Bound::Unbounded))
        .next()
        // Nothing after `key`: cycle back to the first element.
        .or_else(|| possible_next.first())
        .expect("possible_next should not be empty")
        .clone()
}
/// Returns the entry of `possible_previous` with the largest key strictly less than `key`,
/// wrapping around to the entry with the largest key overall when no such entry exists.
///
/// # Panics
///
/// Panics if `possible_previous` is empty.
pub(crate) fn find_prev_key<'a, K: Ord, V>(
    key: &K,
    possible_previous: &'a BTreeMap<K, V>,
) -> (&'a K, &'a V) {
    possible_previous
        .range((Bound::Unbounded, Bound::Excluded(key)))
        .next_back()
        // Nothing before `key`: cycle back to the last entry.
        .or_else(|| possible_previous.last_key_value())
        .expect("possible_previous should not be empty")
}
// Invokes `impl_db_any!` for the ordered `Db` over the MMR family. Only compiled for tests
// or when the `test-traits` feature is enabled.
// NOTE(review): the macro body is not visible here; presumably it generates the `DbAny`
// implementation that the generic tests in this file rely on -- confirm against
// `crate::qmdb::any::traits`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_db_any! {
[E, K, V, C, I, H] Db<crate::merkle::mmr::Family, E, C, I, H, Update<K, V>>
where {
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: PersistableMutableLog<Operation<crate::merkle::mmr::Family, K, V>>,
I: Index<Value = crate::mmr::Location> + 'static,
H: Hasher,
Operation<crate::merkle::mmr::Family, K, V>: Codec,
V::Value: Send + Sync,
}
Family = crate::merkle::mmr::Family, Key = K, Value = V::Value, Digest = H::Digest
}
// Invokes `impl_provable!` for the ordered `Db` over the MMR family, with the ordered
// `Operation` as the operation type. Only compiled for tests or when the `test-traits`
// feature is enabled.
// NOTE(review): the macro body is not visible here; presumably it generates a
// proof-related trait implementation -- confirm against `crate::qmdb::any::traits`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_provable! {
[E, K, V, C, I, H] Db<crate::merkle::mmr::Family, E, C, I, H, Update<K, V>>
where {
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: PersistableMutableLog<Operation<crate::merkle::mmr::Family, K, V>>,
I: Index<Value = crate::mmr::Location> + 'static,
H: Hasher,
Operation<crate::merkle::mmr::Family, K, V>: Codec,
V::Value: Send + Sync,
}
Family = crate::merkle::mmr::Family, Operation = Operation<crate::merkle::mmr::Family, K, V>
}
// Invokes `impl_db_any!` for the ordered `Db` over the MMB family. Only compiled for tests
// or when the `test-traits` feature is enabled.
// NOTE(review): the macro body is not visible here; presumably it generates the `DbAny`
// implementation for this family -- confirm against `crate::qmdb::any::traits`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_db_any! {
[E, K, V, C, I, H] Db<crate::merkle::mmb::Family, E, C, I, H, Update<K, V>>
where {
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: PersistableMutableLog<Operation<crate::merkle::mmb::Family, K, V>>,
I: Index<Value = crate::merkle::Location<crate::merkle::mmb::Family>> + 'static,
H: Hasher,
Operation<crate::merkle::mmb::Family, K, V>: Codec,
V::Value: Send + Sync,
}
Family = crate::merkle::mmb::Family, Key = K, Value = V::Value, Digest = H::Digest
}
// Invokes `impl_provable!` for the ordered `Db` over the MMB family, with the ordered
// `Operation` as the operation type. Only compiled for tests or when the `test-traits`
// feature is enabled.
// NOTE(review): the macro body is not visible here; presumably it generates a
// proof-related trait implementation -- confirm against `crate::qmdb::any::traits`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_provable! {
[E, K, V, C, I, H] Db<crate::merkle::mmb::Family, E, C, I, H, Update<K, V>>
where {
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: PersistableMutableLog<Operation<crate::merkle::mmb::Family, K, V>>,
I: Index<Value = crate::merkle::Location<crate::merkle::mmb::Family>> + 'static,
H: Hasher,
Operation<crate::merkle::mmb::Family, K, V>: Codec,
V::Value: Send + Sync,
}
Family = crate::merkle::mmb::Family, Operation = Operation<crate::merkle::mmb::Family, K, V>
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
merkle::mmr,
qmdb::any::traits::{DbAny, UnmerkleizedBatch as _},
};
use commonware_cryptography::{sha256::Digest, Sha256};
use commonware_runtime::{deterministic::Context, Metrics};
use commonware_utils::sequence::FixedBytes;
use core::{future::Future, pin::Pin};
/// Shared test body exercising a db that holds no keys.
///
/// `db` is the database under test and `reopen_db` builds a fresh handle from a context.
/// The test checks metadata and root stability across reopens and repeated empty commits,
/// then destroys the db.
pub(crate) async fn test_ordered_any_db_empty<
D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
>(
context: Context,
mut db: D,
reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
) {
// The db passed in is expected to carry no metadata, and pruning to its inactivity floor
// must succeed.
assert!(db.get_metadata().await.unwrap().is_none());
assert!(matches!(
db.prune(db.inactivity_floor_loc().await).await,
Ok(())
));
// Stage a write in a batch, then let the batch drop without merkleizing or applying it.
// A handle obtained from `reopen_db` must still report the root recorded beforehand.
let d1 = FixedBytes::from([1u8; 4]);
let d2 = Sha256::fill(2u8);
let root = db.root();
{
let _batch = db.new_batch().write(d1, Some(d2));
}
let mut db = reopen_db(context.with_label("reopen1")).await;
assert_eq!(db.root(), root);
// Apply and commit a batch with no writes but with metadata attached. The applied range is
// expected to start at location 1 and the metadata must be readable afterwards.
let metadata = Sha256::fill(3u8);
let merkleized = db.new_batch().merkleize(&db, Some(metadata)).await.unwrap();
let range = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert_eq!(range.start, Location::new(1));
assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
let root = db.root();
assert!(matches!(
db.prune(db.inactivity_floor_loc().await).await,
Ok(())
));
// Reopen again: the committed metadata and root must be unchanged.
let mut db = reopen_db(context.with_label("reopen2")).await;
assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
assert_eq!(db.root(), root);
// Apply and commit 99 empty batches in a row; each step only has to succeed.
for _ in 1..100 {
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
let _ = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
}
// One more empty commit before tearing the db down.
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
let _ = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
db.destroy().await.unwrap();
}
/// Shared test body covering basic create / update / delete behavior for two keys.
///
/// `db` is the database under test and `reopen_db` builds a fresh handle from a context.
/// Every mutation is applied as its own batch and committed before the next read.
pub(crate) async fn test_ordered_any_db_basic<
D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
>(
context: Context,
mut db: D,
reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
) {
let key1 = FixedBytes::from([1u8; 4]);
let key2 = FixedBytes::from([2u8; 4]);
let val1 = Sha256::fill(3u8);
let val2 = Sha256::fill(4u8);
// Neither key is present before anything has been written.
assert!(db.get(&key1).await.unwrap().is_none());
assert!(db.get(&key2).await.unwrap().is_none());
assert!(db.get(&key1).await.unwrap().is_none());
// Create key1.
let merkleized = db
.new_batch()
.write(key1.clone(), Some(val1))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key1).await.unwrap().unwrap(), val1);
assert!(db.get(&key2).await.unwrap().is_none());
assert!(db.get(&key2).await.unwrap().is_none());
// Create key2; key1 must be unaffected.
let merkleized = db
.new_batch()
.write(key2.clone(), Some(val2))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key1).await.unwrap().unwrap(), val1);
assert_eq!(db.get(&key2).await.unwrap().unwrap(), val2);
// Delete key1 (a write of `None`); key2 must be unaffected.
let merkleized = db
.new_batch()
.write(key1.clone(), None)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert!(db.get(&key1).await.unwrap().is_none());
assert_eq!(db.get(&key2).await.unwrap().unwrap(), val2);
// Re-create key1 with a new value.
let new_val = Sha256::fill(5u8);
let merkleized = db
.new_batch()
.write(key1.clone(), Some(new_val))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key1).await.unwrap().unwrap(), new_val);
// Overwrite key2 with the same new value.
let merkleized = db
.new_batch()
.write(key2.clone(), Some(new_val))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert_eq!(db.get(&key2).await.unwrap().unwrap(), new_val);
// An empty commit must leave key1 readable.
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
let _ = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert!(db.get(&key1).await.unwrap().is_some());
assert!(db.get(&key1).await.unwrap().is_some());
// Delete key1, then key2, leaving the db without either key.
let merkleized = db
.new_batch()
.write(key1.clone(), None)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert!(db.get(&key2).await.unwrap().is_some());
let merkleized = db
.new_batch()
.write(key2.clone(), None)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert!(db.get(&key1).await.unwrap().is_none());
assert!(db.get(&key2).await.unwrap().is_none());
// Further empty commits keep the keys absent; a never-written key is absent too.
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
let _ = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert!(db.get(&key1).await.unwrap().is_none());
let key3 = FixedBytes::from([6u8; 4]);
assert!(db.get(&key3).await.unwrap().is_none());
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
let _ = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
// Reopen: the upper bound of the log and the root must match the pre-reopen values.
let op_count = db.bounds().await.end;
let root = db.root();
let mut db = reopen_db(context.with_label("reopen1")).await;
assert_eq!(db.bounds().await.end, op_count);
assert_eq!(db.root(), root);
// Replay a mix of creates, deletes and overwrites on the reopened db, one commit each.
let merkleized = db
.new_batch()
.write(key1.clone(), Some(val1))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
let merkleized = db
.new_batch()
.write(key2.clone(), Some(val2))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
let merkleized = db
.new_batch()
.write(key1.clone(), None)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
let merkleized = db
.new_batch()
.write(key2.clone(), Some(val1))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
let merkleized = db
.new_batch()
.write(key1.clone(), Some(val2))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
let _ = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
// Reopen a second time and compare root and upper bound again.
let op_count = db.bounds().await.end;
let root = db.root();
let mut db = reopen_db(context.with_label("reopen2")).await;
assert_eq!(db.root(), root);
assert_eq!(db.bounds().await.end, op_count);
// Even a commit with no writes is expected to change the root.
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
let _ = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
assert!(db.root() != root);
// Pruning to the inactivity floor must not change the root.
let root = db.root();
db.prune(db.inactivity_floor_loc().await).await.unwrap();
assert_eq!(db.root(), root);
db.destroy().await.unwrap();
}
/// Shared test body that writes three keys sharing the same two leading bytes in a single
/// batch and checks that each one reads back with its value.
///
/// NOTE(review): going by the test name, the shared `0xFF 0xFF` prefix is presumably meant
/// to make the keys collide in the db's index -- confirm against the key translation used
/// by the callers of this helper.
pub(crate) async fn test_ordered_any_update_collision_edge_case<
D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
>(
mut db: D,
) {
// key3 sorts before key1, which sorts before key2; all three share the first two bytes.
let key1 = FixedBytes::from([0xFFu8, 0xFFu8, 5u8, 5u8]);
let key2 = FixedBytes::from([0xFFu8, 0xFFu8, 6u8, 6u8]);
let key3 = FixedBytes::from([0xFFu8, 0xFFu8, 0u8, 0u8]);
let val = Sha256::fill(1u8);
// Write all three keys in one batch.
let merkleized = db
.new_batch()
.write(key1.clone(), Some(val))
.write(key2.clone(), Some(val))
.write(key3.clone(), Some(val))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(merkleized).await.unwrap();
// The reads happen after `apply_batch` but before any `commit`.
assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
assert_eq!(db.get(&key2).await.unwrap().unwrap(), val);
assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);
// Finish with an empty batch and a commit, then tear the db down.
let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
let _ = db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
db.destroy().await.unwrap();
}
}