use crate::{
index::{Ordered as OrderedIndex, Unordered as UnorderedIndex},
journal::{
authenticated,
contiguous::{Contiguous, Mutable, Reader},
},
merkle::{Family, Location},
qmdb::{
any::{
db::Db,
operation::{update, Operation},
ordered::{find_next_key, find_prev_key},
ValueEncoding,
},
delete_known_loc,
operation::{Key, Operation as OperationTrait},
update_known_loc,
},
Context,
};
use commonware_codec::Codec;
use commonware_cryptography::{Digest, Hasher};
use core::{iter, ops::Range};
use futures::future::try_join_all;
use std::{
collections::{BTreeMap, BTreeSet},
sync::{Arc, Weak},
};
use tracing::debug;
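/// Maximum number of operations read concurrently while raising the
/// inactivity floor.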
const MAX_CONCURRENT_READS: u64 = 64;
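/// Strategy for selecting which operation locations to examine when raising
/// the inactivity floor during merkleization.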
pub(crate) trait FloorScan<F: Family> {
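/// Returns the next location to examine, where `floor` is the current
/// inactivity floor and `tip` is one past the last scannable location, or
/// `None` when the scan is exhausted.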
fn next_candidate(&mut self, floor: Location<F>, tip: u64) -> Option<Location<F>>;
}
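/// [`FloorScan`] that visits every location from the floor up to the tip in
/// order.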
pub(crate) struct SequentialScan;
impl<F: Family> FloorScan<F> for SequentialScan {
fn next_candidate(&mut self, floor: Location<F>, tip: u64) -> Option<Location<F>> {
if *floor < tip {
Some(floor)
} else {
None
}
}
}
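/// A batch's resulting view of a single key.
///
/// `base_old_loc` is the key's active location in the committed state the
/// batch chain was built on (`None` if the key was created within the
/// chain); [`apply_snapshot_diff`] uses it to update or remove the key's
/// entry in the snapshot index.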
#[derive(Clone)]
pub(crate) enum DiffEntry<F: Family, V> {
Active {
value: V,
loc: Location<F>,
base_old_loc: Option<Location<F>>,
},
Deleted {
base_old_loc: Option<Location<F>>,
},
}
impl<F: Family, V> DiffEntry<F, V> {
pub(crate) const fn base_old_loc(&self) -> Option<Location<F>> {
match self {
Self::Active { base_old_loc, .. } | Self::Deleted { base_old_loc } => *base_old_loc,
}
}
pub(crate) const fn loc(&self) -> Option<Location<F>> {
match self {
Self::Active { loc, .. } => Some(*loc),
Self::Deleted { .. } => None,
}
}
pub(crate) const fn value(&self) -> Option<&V> {
match self {
Self::Active { value, .. } => Some(value),
Self::Deleted { .. } => None,
}
}
}
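/// The state an [`UnmerkleizedBatch`] is built on: either the committed
/// database or a parent [`MerkleizedBatch`].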
enum Base<F: Family, D: Digest, U: update::Update + Send + Sync>
where
Operation<F, U>: Send + Sync,
{
Db {
db_size: u64,
inactivity_floor_loc: Location<F>,
active_keys: usize,
},
Child(Arc<MerkleizedBatch<F, D, U>>),
}
impl<F: Family, D: Digest, U: update::Update + Send + Sync> Base<F, D, U>
where
Operation<F, U>: Send + Sync,
{
fn base_size(&self) -> u64 {
match self {
Self::Db { db_size, .. } => *db_size,
Self::Child(parent) => parent.total_size,
}
}
fn db_size(&self) -> u64 {
match self {
Self::Db { db_size, .. } => *db_size,
Self::Child(parent) => parent.db_size,
}
}
fn inactivity_floor_loc(&self) -> Location<F> {
match self {
Self::Db {
inactivity_floor_loc,
..
} => *inactivity_floor_loc,
Self::Child(parent) => parent.new_inactivity_floor_loc,
}
}
fn active_keys(&self) -> usize {
match self {
Self::Db { active_keys, .. } => *active_keys,
Self::Child(parent) => parent.total_active_keys,
}
}
const fn parent(&self) -> Option<&Arc<MerkleizedBatch<F, D, U>>> {
match self {
Self::Db { .. } => None,
Self::Child(parent) => Some(parent),
}
}
}
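/// A batch of staged key mutations that has not yet been merkleized into
/// journal operations.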
pub struct UnmerkleizedBatch<F: Family, H, U>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, U>>,
mutations: BTreeMap<U::Key, Option<U::Value>>,
base: Base<F, H::Digest, U>,
}
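/// The result of merkleizing a batch: the operations appended to the journal
/// batch, the per-key [`DiffEntry`] map they produce, and the bookkeeping
/// required to apply the batch to the database or to stack further batches
/// on top of it.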
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, U: update::Update + Send + Sync>
where
Operation<F, U>: Send + Sync,
{
pub(crate) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<F, U>>>,
pub(crate) diff: Arc<BTreeMap<U::Key, DiffEntry<F, U::Value>>>,
parent: Option<Weak<Self>>,
pub(crate) new_inactivity_floor_loc: Location<F>,
pub(crate) new_last_commit_loc: Location<F>,
pub(crate) base_size: u64,
pub(crate) total_size: u64,
pub(crate) total_active_keys: usize,
pub(crate) db_size: u64,
pub(crate) ancestor_diffs: Vec<Arc<BTreeMap<U::Key, DiffEntry<F, U::Value>>>>,
pub(crate) ancestor_diff_ends: Vec<u64>,
}
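/// Working state for converting an [`UnmerkleizedBatch`] into a
/// [`MerkleizedBatch`].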
struct Merkleizer<F: Family, H, U>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, U>>,
ancestors: Vec<Arc<MerkleizedBatch<F, H::Digest, U>>>,
base_size: u64,
db_size: u64,
base_inactivity_floor_loc: Location<F>,
base_active_keys: usize,
}
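/// Returns the first diff entry for `key` found while walking `ancestors`
/// from newest to oldest.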
fn resolve_in_ancestors<'a, F: Family, D: Digest, U: update::Update + Send + Sync>(
ancestors: &'a [Arc<MerkleizedBatch<F, D, U>>],
key: &U::Key,
) -> Option<&'a DiffEntry<F, U::Value>>
where
Operation<F, U>: Send + Sync,
{
for batch in ancestors {
if let Some(entry) = batch.diff.get(key) {
return Some(entry);
}
}
None
}
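/// Applies one diff entry to the snapshot index: inserts or moves the key's
/// location for an active entry, or removes it for a deletion.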
fn apply_snapshot_diff<F: Family, V, I: UnorderedIndex<Value = Location<F>>>(
snapshot: &mut I,
key: &impl Key,
entry: &DiffEntry<F, V>,
base_old_loc: Option<Location<F>>,
) {
match entry {
DiffEntry::Active { loc, .. } => match base_old_loc {
Some(old) => update_known_loc::<F, _>(snapshot, key, old, *loc),
None => snapshot.insert(key, *loc),
},
DiffEntry::Deleted { .. } => {
if let Some(old) = base_old_loc {
delete_known_loc::<F, _>(snapshot, key, old);
}
}
}
}
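/// Reads the operation at `loc` from the in-memory operations of the
/// ancestor chain. Panics if `loc` is not covered by any ancestor.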
fn read_op_from_ancestors<F: Family, D: Digest, U: update::Update + Send + Sync>(
ancestors: &[Arc<MerkleizedBatch<F, D, U>>],
loc: u64,
db_size: u64,
) -> &Operation<F, U>
where
Operation<F, U>: Send + Sync,
{
for (i, batch) in ancestors.iter().enumerate() {
let batch_base = ancestors
.get(i + 1)
.map_or(db_size, |b| b.journal_batch.size());
let batch_end = batch.journal_batch.size();
if loc >= batch_base && loc < batch_end {
return &batch.journal_batch.items()[(loc - batch_base) as usize];
}
}
unreachable!("location {loc} not found in ancestor chain (db_size={db_size})")
}
impl<F: Family, H, U> Merkleizer<F, H, U>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
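/// Returns the operation at `loc` if it lives in this batch's own operations
/// (`batch_ops`) or in an in-memory ancestor; `None` means the operation
/// must be read from the journal.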
fn try_read_op_from_uncommitted(
&self,
loc: Location<F>,
batch_ops: &[Operation<F, U>],
) -> Option<Operation<F, U>> {
let loc = *loc;
if loc >= self.base_size {
return Some(batch_ops[(loc - self.base_size) as usize].clone());
}
if loc >= self.db_size {
return Some(read_op_from_ancestors(&self.ancestors, loc, self.db_size).clone());
}
None
}
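/// Like [`Self::try_read_op_from_uncommitted`], but also attempts a
/// synchronous read through `reader`.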
fn try_read_op_sync<R: Reader<Item = Operation<F, U>>>(
&self,
loc: Location<F>,
batch_ops: &[Operation<F, U>],
reader: &R,
) -> Option<Operation<F, U>> {
self.try_read_op_from_uncommitted(loc, batch_ops)
.or_else(|| reader.try_read_sync(*loc))
}
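/// Reads the operation at `loc`, falling back to an async journal read when
/// it is not available in memory.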
async fn read_op<R: Reader<Item = Operation<F, U>>>(
&self,
loc: Location<F>,
batch_ops: &[Operation<F, U>],
reader: &R,
) -> Result<Operation<F, U>, crate::qmdb::Error<F>> {
match self.try_read_op_sync(loc, batch_ops, reader) {
Some(op) => Ok(op),
None => Ok(reader.read(*loc).await?),
}
}
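/// Reads the operations at `locations`, resolving as many as possible
/// synchronously and fetching the rest from the journal concurrently.
/// Results are returned in the same order as `locations`.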
async fn read_ops<R: Reader<Item = Operation<F, U>>>(
&self,
locations: &[Location<F>],
batch_ops: &[Operation<F, U>],
reader: &R,
) -> Result<Vec<Operation<F, U>>, crate::qmdb::Error<F>> {
let results: Vec<Option<Operation<F, U>>> = locations
.iter()
.map(|loc| self.try_read_op_sync(*loc, batch_ops, reader))
.collect();
let disk_results = try_join_all(
locations
.iter()
.zip(results.iter())
.filter(|(_, cached)| cached.is_none())
.map(|(loc, _)| reader.read(**loc)),
)
.await?;
let mut disk_iter = disk_results.into_iter();
Ok(results
.into_iter()
.map(|r| r.unwrap_or_else(|| disk_iter.next().expect("disk result count mismatch")))
.collect())
}
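/// Collects the journal locations that may hold the current operation for
/// each mutated key, preferring ancestor diffs over the committed snapshot.
/// With `include_active_collision_siblings`, committed snapshot locations
/// that collide with an ancestor-active key are included as well (the
/// ordered variant needs them to locate neighbors).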
fn gather_existing_locations<E, C, I>(
&self,
mutations: &BTreeMap<U::Key, Option<U::Value>>,
db: &Db<F, E, C, I, H, U>,
include_active_collision_siblings: bool,
) -> Vec<Location<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
{
let mut locations = Vec::with_capacity(mutations.len() * 3 / 2);
if self.ancestors.is_empty() {
for key in mutations.keys() {
locations.extend(db.snapshot.get(key).copied());
}
} else {
for key in mutations.keys() {
match resolve_in_ancestors(&self.ancestors, key) {
// The key was deleted by an ancestor batch; there is no live
// location to read.
Some(DiffEntry::Deleted { .. }) => {}
Some(DiffEntry::Active {
loc, base_old_loc, ..
}) => {
locations.push(*loc);
if include_active_collision_siblings {
locations.extend(
db.snapshot
.get(key)
.copied()
.filter(move |loc| Some(*loc) != *base_old_loc),
);
}
}
None => {
locations.extend(db.snapshot.get(key).copied());
}
}
}
}
locations.sort();
locations.dedup();
locations
}
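/// Returns whether `loc` is the active location for `key`, consulting the
/// current batch diff, then the ancestor chain, then the committed snapshot.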
fn is_active_at<E, C, I>(
&self,
key: &U::Key,
loc: Location<F>,
batch_diff: &BTreeMap<U::Key, DiffEntry<F, U::Value>>,
db: &Db<F, E, C, I, H, U>,
) -> bool
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
{
if let Some(entry) = batch_diff
.get(key)
.or_else(|| resolve_in_ancestors(&self.ancestors, key))
{
return entry.loc() == Some(loc);
}
db.snapshot.get(key).any(|&l| l == loc)
}
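/// Removes mutations that re-create keys deleted by an ancestor batch and
/// returns them along with the deleted entry's `base_old_loc`, so the
/// re-creation can reuse the key's original snapshot location.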
#[allow(clippy::type_complexity)]
fn extract_parent_deleted_creates(
&self,
mutations: &mut BTreeMap<U::Key, Option<U::Value>>,
) -> BTreeMap<U::Key, (U::Value, Option<Location<F>>)> {
if self.ancestors.is_empty() {
return BTreeMap::new();
}
let mut creates = BTreeMap::new();
mutations.retain(|key, value| {
if let Some(DiffEntry::Deleted { base_old_loc }) =
resolve_in_ancestors(&self.ancestors, key)
{
if let Some(v) = value.take() {
creates.insert(key.clone(), (v, *base_old_loc));
return false;
}
}
true
});
creates
}
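/// Completes merkleization: raises the inactivity floor until
/// `user_steps + 1` still-active operations have been re-appended (or the
/// scan is exhausted; with no active keys the floor jumps straight to the
/// tip), appends the `CommitFloor` operation, and merkleizes the journal
/// batch against the database's log.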
#[allow(clippy::too_many_arguments)]
async fn finish<E, C, I, S, R>(
self,
mut ops: Vec<Operation<F, U>>,
mut diff: BTreeMap<U::Key, DiffEntry<F, U::Value>>,
active_keys_delta: isize,
user_steps: u64,
metadata: Option<U::Value>,
mut scan: S,
reader: R,
db: &Db<F, E, C, I, H, U>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, U>>, crate::qmdb::Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
S: FloorScan<F>,
R: Reader<Item = Operation<F, U>>,
{
let total_steps = user_steps + 1;
let total_active_keys = self.base_active_keys as isize + active_keys_delta;
let mut floor = self.base_inactivity_floor_loc;
if total_active_keys > 0 {
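// Scan candidate locations in reader-sized rounds, re-appending any
// operation that is still active for its key so the floor can move past it.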
let fixed_tip = self.base_size + ops.len() as u64;
let mut moved = 0u64;
let mut scan_from = floor;
while moved < total_steps {
let limit = ((total_steps - moved) as usize).min(MAX_CONCURRENT_READS as usize);
let mut candidates = Vec::with_capacity(limit);
while candidates.len() < limit {
let Some(candidate) = scan.next_candidate(scan_from, fixed_tip) else {
break;
};
candidates.push(candidate);
scan_from = Location::new(*candidate + 1);
}
if candidates.is_empty() {
break;
}
let resolved = self.read_ops(&candidates, &ops, &reader).await?;
for (candidate, op) in candidates.into_iter().zip(resolved) {
floor = Location::new(*candidate + 1);
let Some(key) = op.key().cloned() else {
continue;
};
if !self.is_active_at(&key, candidate, &diff, db) {
continue;
}
let new_loc = Location::new(self.base_size + ops.len() as u64);
let base_old_loc = diff
.get(&key)
.or_else(|| resolve_in_ancestors(&self.ancestors, &key))
.map_or(Some(candidate), DiffEntry::base_old_loc);
let value = extract_update_value(&op);
ops.push(op);
diff.insert(
key,
DiffEntry::Active {
value,
loc: new_loc,
base_old_loc,
},
);
moved += 1;
if moved >= total_steps {
break;
}
}
}
} else {
floor = Location::new(self.base_size + ops.len() as u64);
debug!(tip = ?floor, "db is empty, raising floor to tip");
}
drop(reader);
let commit_loc = Location::new(self.base_size + ops.len() as u64);
ops.push(Operation::CommitFloor(metadata, floor));
let ops = Arc::new(ops);
let journal = db
.log
.with_mem(|base| self.journal_batch.merkleize_with(base, ops));
let ancestor_diffs: Vec<_> = self.ancestors.iter().map(|a| Arc::clone(&a.diff)).collect();
let ancestor_diff_ends: Vec<_> = self.ancestors.iter().map(|a| a.total_size).collect();
debug_assert!(total_active_keys >= 0, "active_keys underflow");
Ok(Arc::new(MerkleizedBatch {
journal_batch: journal,
diff: Arc::new(diff),
parent: self.ancestors.first().map(Arc::downgrade),
new_inactivity_floor_loc: floor,
new_last_commit_loc: commit_loc,
base_size: self.base_size,
total_size: *commit_loc + 1,
total_active_keys: total_active_keys as usize,
db_size: self.db_size,
ancestor_diffs,
ancestor_diff_ends,
}))
}
}
impl<F: Family, H, U> UnmerkleizedBatch<F, H, U>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
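/// Stages a mutation for `key`: `Some` sets its value, `None` deletes it.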
pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
self.mutations.insert(key, value);
self
}
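/// Splits the batch into its staged mutations and a [`Merkleizer`] seeded
/// with the ancestor chain and the relevant base sizes.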
#[allow(clippy::type_complexity)]
fn into_parts(self) -> (BTreeMap<U::Key, Option<U::Value>>, Merkleizer<F, H, U>) {
let ancestors: Vec<_> = self.base.parent().map_or_else(Vec::new, |parent| {
let mut v = vec![Arc::clone(parent)];
v.extend(parent.ancestors());
v
});
let db_size = self.base.db_size();
let effective_db_size = ancestors.last().map_or(db_size, |oldest| {
let oldest_base =
oldest.journal_batch.size() - oldest.journal_batch.items().len() as u64;
db_size.max(oldest_base)
});
(
self.mutations,
Merkleizer {
journal_batch: self.journal_batch,
ancestors,
base_size: self.base.base_size(),
db_size: effective_db_size,
base_inactivity_floor_loc: self.base.inactivity_floor_loc(),
base_active_keys: self.base.active_keys(),
},
)
}
}
impl<F: Family, H, U> UnmerkleizedBatch<F, H, U>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
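/// Returns the value of `key` as seen by this batch: staged mutations first,
/// then ancestor diffs, then the database.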
pub async fn get<E, C, I>(
&self,
key: &U::Key,
db: &Db<F, E, C, I, H, U>,
) -> Result<Option<U::Value>, crate::qmdb::Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
{
if let Some(value) = self.mutations.get(key) {
return Ok(value.clone());
}
if let Some(parent) = self.base.parent() {
if let Some(entry) = parent.diff.get(key) {
return Ok(entry.value().cloned());
}
for batch in parent.ancestors() {
if let Some(entry) = batch.diff.get(key) {
return Ok(entry.value().cloned());
}
}
}
db.get(key).await
}
}
impl<F: Family, K, V, H> UnmerkleizedBatch<F, H, update::Unordered<K, V>>
where
K: Key,
V: ValueEncoding,
H: Hasher,
Operation<F, update::Unordered<K, V>>: Codec,
{
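/// Merkleizes the staged mutations against `db` with the default
/// [`SequentialScan`] floor strategy, producing a batch that can be applied
/// via `apply_batch`.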
pub async fn merkleize<E, C, I>(
self,
db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
metadata: Option<V::Value>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>, crate::qmdb::Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>>,
{
self.merkleize_with_floor_scan(db, metadata, SequentialScan)
.await
}
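/// Like [`Self::merkleize`], but with a caller-provided [`FloorScan`]
/// strategy.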
pub(crate) async fn merkleize_with_floor_scan<E, C, I, S: FloorScan<F>>(
self,
db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
metadata: Option<V::Value>,
scan: S,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>, crate::qmdb::Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>>,
{
let (mut mutations, m) = self.into_parts();
let locations = m.gather_existing_locations(&mutations, db, false);
let reader = db.log.reader().await;
let results = m.read_ops(&locations, &[], &reader).await?;
let mut ops: Vec<Operation<F, update::Unordered<K, V>>> =
Vec::with_capacity(mutations.len() + 1);
let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
let mut active_keys_delta: isize = 0;
let mut user_steps: u64 = 0;
for (op, &old_loc) in results.iter().zip(&locations) {
let key = op.key().expect("updates should have a key");
let base_old_loc = if let Some(entry) = resolve_in_ancestors(&m.ancestors, key) {
if entry.loc() != Some(old_loc) {
continue;
}
entry.base_old_loc()
} else {
Some(old_loc)
};
let Some(mutation) = mutations.remove(key) else {
continue;
};
let new_loc = Location::new(m.base_size + ops.len() as u64);
match mutation {
Some(value) => {
ops.push(Operation::Update(update::Unordered(
key.clone(),
value.clone(),
)));
diff.insert(
key.clone(),
DiffEntry::Active {
value,
loc: new_loc,
base_old_loc,
},
);
user_steps += 1;
}
None => {
ops.push(Operation::Delete(key.clone()));
diff.insert(key.clone(), DiffEntry::Deleted { base_old_loc });
active_keys_delta -= 1;
user_steps += 1;
}
}
}
let parent_deleted_creates = m.extract_parent_deleted_creates(&mut mutations);
let mut creates: Vec<(K, V::Value, Option<Location<F>>)> =
Vec::with_capacity(mutations.len() + parent_deleted_creates.len());
for (key, value) in mutations {
if let Some(value) = value {
creates.push((key, value, None));
}
}
for (key, (value, base_old_loc)) in parent_deleted_creates {
creates.push((key, value, base_old_loc));
}
creates.sort_by(|(a, _, _), (b, _, _)| a.cmp(b));
for (key, value, base_old_loc) in creates {
let new_loc = Location::new(m.base_size + ops.len() as u64);
ops.push(Operation::Update(update::Unordered(
key.clone(),
value.clone(),
)));
diff.insert(
key,
DiffEntry::Active {
value,
loc: new_loc,
base_old_loc,
},
);
active_keys_delta += 1;
}
m.finish(
ops,
diff,
active_keys_delta,
user_steps,
metadata,
scan,
reader,
db,
)
.await
}
}
impl<F: Family, K, V, H> UnmerkleizedBatch<F, H, update::Ordered<K, V>>
where
K: Key,
V: ValueEncoding,
H: Hasher,
Operation<F, update::Ordered<K, V>>: Codec,
{
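/// Merkleizes the staged mutations against `db` with the default
/// [`SequentialScan`] floor strategy. The ordered variant additionally
/// rewrites the `next_key` links of neighbors affected by creations and
/// deletions.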
pub async fn merkleize<E, C, I>(
self,
db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
metadata: Option<V::Value>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>, crate::qmdb::Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: OrderedIndex<Value = Location<F>>,
{
self.merkleize_with_floor_scan(db, metadata, SequentialScan)
.await
}
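/// Like [`Self::merkleize`], but with a caller-provided [`FloorScan`]
/// strategy.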
pub(crate) async fn merkleize_with_floor_scan<E, C, I, S: FloorScan<F>>(
self,
db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
metadata: Option<V::Value>,
scan: S,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>, crate::qmdb::Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: OrderedIndex<Value = Location<F>>,
{
let (mut mutations, m) = self.into_parts();
let locations = m.gather_existing_locations(&mutations, db, true);
let reader = db.log.reader().await;
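// Ordered updates thread a `next_key` pointer through active keys, so
// deletions and creations must re-link neighbors: `next_candidates` holds
// keys that may serve as someone's `next_key`, and `prev_candidates` holds
// known-active keys that may precede a mutated key.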
let mut next_candidates: BTreeSet<K> = BTreeSet::new();
let mut prev_candidates: BTreeMap<K, (V::Value, Location<F>)> = BTreeMap::new();
let mut deleted: BTreeMap<K, Location<F>> = BTreeMap::new();
let mut updated: BTreeMap<K, (V::Value, Location<F>)> = BTreeMap::new();
for (op, &old_loc) in m
.read_ops(&locations, &[], &reader)
.await?
.into_iter()
.zip(&locations)
{
let update::Ordered {
key,
value,
next_key,
} = match op {
Operation::Update(data) => data,
_ => unreachable!("snapshot should only reference Update operations"),
};
next_candidates.insert(next_key);
let mutation = mutations.remove(&key);
prev_candidates.insert(key.clone(), (value, old_loc));
let Some(mutation) = mutation else {
continue;
};
if let Some(new_value) = mutation {
updated.insert(key, (new_value, old_loc));
} else {
deleted.insert(key, old_loc);
}
}
let parent_deleted_creates = m.extract_parent_deleted_creates(&mut mutations);
let mut created: Vec<(K, V::Value, Option<Location<F>>)> =
Vec::with_capacity(mutations.len() + parent_deleted_creates.len());
for (key, value) in mutations {
let Some(value) = value else {
continue;
};
next_candidates.insert(key.clone());
created.push((key, value, None));
}
for (key, (value, base_old_loc)) in parent_deleted_creates {
next_candidates.insert(key.clone());
created.push((key, value, base_old_loc));
}
created.sort_by(|(a, _, _), (b, _, _)| a.cmp(b));
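// Fetch the committed predecessors of every created or deleted key so their
// `next_key` links can be recomputed.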
let mut prev_locations = Vec::new();
for key in deleted.keys().chain(created.iter().map(|(k, _, _)| k)) {
let Some((iter, _)) = db.snapshot.prev_translated_key(key) else {
continue;
};
prev_locations.extend(iter.copied());
}
prev_locations.sort();
prev_locations.dedup();
let prev_results = m.read_ops(&prev_locations, &[], &reader).await?;
for (op, &old_loc) in prev_results.into_iter().zip(&prev_locations) {
let data = match op {
Operation::Update(data) => data,
_ => unreachable!("expected update operation"),
};
next_candidates.insert(data.next_key);
prev_candidates.insert(data.key, (data.value, old_loc));
}
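// Merge ancestor diff entries (newest wins) so keys active only in pending
// batches also participate in neighbor linking.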
let ancestor_entries = {
let mut entries: BTreeMap<&K, &DiffEntry<F, V::Value>> = BTreeMap::new();
for batch in &m.ancestors {
for (key, entry) in batch.diff.iter() {
entries.entry(key).or_insert(entry);
}
}
entries
};
for (key, entry) in &ancestor_entries {
if updated.contains_key(*key)
|| created.binary_search_by(|(k, _, _)| k.cmp(*key)).is_ok()
|| deleted.contains_key(*key)
{
continue;
}
if let DiffEntry::Active { value, loc, .. } = entry {
let op = m.read_op(*loc, &[], &reader).await?;
let data = match op {
Operation::Update(data) => data,
_ => unreachable!("ancestor diff Active should reference Update op"),
};
next_candidates.insert((*key).clone());
next_candidates.insert(data.next_key);
prev_candidates.insert((*key).clone(), (value.clone(), *loc));
}
}
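// Deleted keys can no longer link to, or be linked from, neighbors.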
for key in deleted.keys() {
prev_candidates.remove(key);
next_candidates.remove(key);
}
for (key, entry) in &ancestor_entries {
if matches!(entry, DiffEntry::Deleted { .. })
&& created.binary_search_by(|(k, _, _)| k.cmp(*key)).is_err()
{
prev_candidates.remove(*key);
next_candidates.remove(*key);
}
}
let mut ops: Vec<Operation<F, update::Ordered<K, V>>> =
Vec::with_capacity(deleted.len() + updated.len() + created.len() + 1);
let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
let mut active_keys_delta: isize = 0;
let mut user_steps: u64 = 0;
for (key, old_loc) in &deleted {
ops.push(Operation::Delete(key.clone()));
let base_old_loc = resolve_in_ancestors(&m.ancestors, key)
.map_or(Some(*old_loc), DiffEntry::base_old_loc);
diff.insert(key.clone(), DiffEntry::Deleted { base_old_loc });
active_keys_delta -= 1;
user_steps += 1;
}
for (key, (value, old_loc)) in updated {
let new_loc = Location::new(m.base_size + ops.len() as u64);
let next_key = find_next_key(&key, &next_candidates);
ops.push(Operation::Update(update::Ordered {
key: key.clone(),
value: value.clone(),
next_key,
}));
let base_old_loc = resolve_in_ancestors(&m.ancestors, &key)
.map_or(Some(old_loc), DiffEntry::base_old_loc);
diff.insert(
key,
DiffEntry::Active {
value,
loc: new_loc,
base_old_loc,
},
);
user_steps += 1;
}
let mut created_keys: Vec<K> = Vec::with_capacity(created.len());
for (key, value, base_old_loc) in created {
created_keys.push(key.clone());
let new_loc = Location::new(m.base_size + ops.len() as u64);
let next_key = find_next_key(&key, &next_candidates);
ops.push(Operation::Update(update::Ordered {
key: key.clone(),
value: value.clone(),
next_key,
}));
diff.insert(
key,
DiffEntry::Active {
value,
loc: new_loc,
base_old_loc,
},
);
active_keys_delta += 1;
}
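// Re-append each affected predecessor with its recomputed `next_key`.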
if !prev_candidates.is_empty() {
for key in created_keys.iter().chain(deleted.keys()) {
let (prev_key, (prev_value, prev_loc)) = find_prev_key(key, &prev_candidates);
if diff.contains_key(prev_key) {
continue;
}
let prev_new_loc = Location::new(m.base_size + ops.len() as u64);
let prev_next_key = find_next_key(prev_key, &next_candidates);
ops.push(Operation::Update(update::Ordered {
key: prev_key.clone(),
value: prev_value.clone(),
next_key: prev_next_key,
}));
let prev_base_old_loc = resolve_in_ancestors(&m.ancestors, prev_key)
.map_or(Some(*prev_loc), DiffEntry::base_old_loc);
diff.insert(
prev_key.clone(),
DiffEntry::Active {
value: prev_value.clone(),
loc: prev_new_loc,
base_old_loc: prev_base_old_loc,
},
);
user_steps += 1;
}
}
m.finish(
ops,
diff,
active_keys_delta,
user_steps,
metadata,
scan,
reader,
db,
)
.await
}
}
impl<F: Family, D: Digest, U: update::Update + Send + Sync> MerkleizedBatch<F, D, U>
where
Operation<F, U>: Send + Sync,
{
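/// Returns the root digest of the batch's merkleized journal.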
pub fn root(&self) -> D {
self.journal_batch.root()
}
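/// Iterates over this batch's still-live ancestors, newest first. Ancestors
/// are held weakly, so the chain ends at the first dropped batch.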
pub(crate) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
let mut next = self.parent.as_ref().and_then(Weak::upgrade);
iter::from_fn(move || {
let batch = next.take()?;
next = batch.parent.as_ref().and_then(Weak::upgrade);
Some(batch)
})
}
}
impl<F: Family, D: Digest, U: update::Update + Send + Sync> MerkleizedBatch<F, D, U>
where
Operation<F, U>: Codec,
{
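/// Starts a new batch layered on top of this one, allowing speculative work
/// to continue before this batch is applied.
///
/// Illustrative chaining, mirroring the tests in this module (`H` is the
/// hasher type; `k1`, `v1`, `k2`, `v2` are placeholders):
///
/// ```ignore
/// let parent = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
/// let child = parent
///     .new_batch::<H>()
///     .write(k2, Some(v2))
///     .merkleize(&db, None)
///     .await?;
/// db.apply_batch(parent).await?;
/// db.commit().await?;
/// db.apply_batch(child).await?;
/// ```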
pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U>
where
H: Hasher<Digest = D>,
{
UnmerkleizedBatch {
journal_batch: self.journal_batch.new_batch::<H>(),
mutations: BTreeMap::new(),
base: Base::Child(Arc::clone(self)),
}
}
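/// Returns the value of `key` as seen by this batch: its own diff first,
/// then ancestor diffs, then the database.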
pub async fn get<E, C, I, H>(
&self,
key: &U::Key,
db: &Db<F, E, C, I, H, U>,
) -> Result<Option<U::Value>, crate::qmdb::Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher<Digest = D>,
{
if let Some(entry) = self.diff.get(key) {
return Ok(entry.value().cloned());
}
for batch in self.ancestors() {
if let Some(entry) = batch.diff.get(key) {
return Ok(entry.value().cloned());
}
}
db.get(key).await
}
}
impl<F, E, C, I, H, U> Db<F, E, C, I, H, U>
where
F: Family,
E: Context,
U: update::Update + Send + Sync,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
Operation<F, U>: Codec,
{
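/// Starts a new [`UnmerkleizedBatch`] against the database's current
/// committed state.
///
/// A typical flow, mirroring the tests in this module (illustrative; `key`
/// and `value` are placeholders and error handling is elided):
///
/// ```ignore
/// let batch = db
///     .new_batch()
///     .write(key, Some(value))
///     .merkleize(&db, None)
///     .await?;
/// db.apply_batch(batch).await?;
/// db.commit().await?;
/// ```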
pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, U> {
let journal_size = *self.last_commit_loc + 1;
UnmerkleizedBatch {
journal_batch: self.log.new_batch(),
mutations: BTreeMap::new(),
base: Base::Db {
db_size: journal_size,
inactivity_floor_loc: self.inactivity_floor_loc,
active_keys: self.active_keys,
},
}
}
}
impl<F, E, C, I, H, U> Db<F, E, C, I, H, U>
where
F: Family,
E: Context,
U: update::Update + Send + Sync + 'static,
C: Mutable<Item = Operation<F, U>> + crate::Persistable<Error = crate::journal::Error>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
Operation<F, U>: Codec,
{
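/// Applies a merkleized batch to the database, together with the diffs of
/// any ancestors that have not been committed yet, and returns the range of
/// journal locations appended.
///
/// Returns a `StaleBatch` error unless the database's current size matches
/// the state the batch (or one of its ancestors) was built on.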
pub async fn apply_batch(
&mut self,
batch: Arc<MerkleizedBatch<F, H::Digest, U>>,
) -> Result<Range<Location<F>>, crate::qmdb::Error<F>> {
let db_size = *self.last_commit_loc + 1;
let valid = db_size == batch.db_size
|| db_size == batch.base_size
|| batch.ancestor_diff_ends.contains(&db_size);
if !valid {
return Err(crate::qmdb::Error::StaleBatch {
db_size,
batch_db_size: batch.db_size,
batch_base_size: batch.base_size,
});
}
let start_loc = Location::new(db_size);
self.log.apply_batch(&batch.journal_batch).await?;
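// Ancestor diffs that are already reflected in the committed db record the
// current snapshot location of their keys; rebase newer entries onto those
// locations instead of their original `base_old_loc`.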
let mut committed_locs: BTreeMap<&U::Key, Option<Location<F>>> = BTreeMap::new();
for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
if batch.ancestor_diff_ends[i] <= db_size {
for (key, entry) in ancestor_diff.iter() {
committed_locs.entry(key).or_insert(entry.loc());
}
}
}
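// Apply the batch's own diff first; it holds the newest entry per key.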
let mut seen = BTreeSet::<&U::Key>::new();
for (key, entry) in batch.diff.iter() {
seen.insert(key);
let base_old_loc = committed_locs
.get(key)
.copied()
.unwrap_or_else(|| entry.base_old_loc());
apply_snapshot_diff(&mut self.snapshot, key, entry, base_old_loc);
}
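// Then apply uncommitted ancestor diffs, newest first, skipping keys a
// newer diff already handled.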
for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
if batch.ancestor_diff_ends[i] <= db_size {
continue;
}
for (key, entry) in ancestor_diff.iter() {
if !seen.insert(key) {
continue;
}
let base_old_loc = committed_locs
.get(key)
.copied()
.unwrap_or_else(|| entry.base_old_loc());
apply_snapshot_diff(&mut self.snapshot, key, entry, base_old_loc);
}
}
self.active_keys = batch.total_active_keys;
self.inactivity_floor_loc = batch.new_inactivity_floor_loc;
self.last_commit_loc = batch.new_last_commit_loc;
let end_loc = Location::new(*self.last_commit_loc + 1);
Ok(start_loc..end_loc)
}
}
impl<F: Family, E, C, I, H, U> Db<F, E, C, I, H, U>
where
E: Context,
U: update::Update + Send + Sync,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
Operation<F, U>: Codec,
{
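/// Wraps the database's current committed state in an empty
/// [`MerkleizedBatch`] so it can serve as the base of a batch chain.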
pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U>> {
let journal_size = *self.last_commit_loc + 1;
Arc::new(MerkleizedBatch {
journal_batch: self.log.to_merkleized_batch(),
diff: Arc::new(BTreeMap::new()),
parent: None,
new_inactivity_floor_loc: self.inactivity_floor_loc,
new_last_commit_loc: self.last_commit_loc,
base_size: journal_size,
total_size: journal_size,
total_active_keys: self.active_keys,
db_size: journal_size,
ancestor_diffs: Vec::new(),
ancestor_diff_ends: Vec::new(),
})
}
}
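/// Extracts the value from an `Update` operation; the floor raise only
/// re-appends updates, so any other variant is a bug.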
fn extract_update_value<F: Family, U: update::Update>(op: &Operation<F, U>) -> U::Value {
match op {
Operation::Update(update) => update.value().clone(),
_ => unreachable!("floor raise should only re-append Update operations"),
}
}
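/// Implementations of the generic batching traits used by tests and the
/// `test-traits` feature.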
#[cfg(any(test, feature = "test-traits"))]
mod trait_impls {
use super::*;
use crate::qmdb::any::traits::{
BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
UnmerkleizedBatch as UnmerkleizedBatchTrait,
};
use std::future::Future;
impl<F, K, V, H, E, C, I> UnmerkleizedBatchTrait<Db<F, E, C, I, H, update::Unordered<K, V>>>
for UnmerkleizedBatch<F, H, update::Unordered<K, V>>
where
F: Family,
K: Key,
V: ValueEncoding + 'static,
H: Hasher,
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>>,
Operation<F, update::Unordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Metadata = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>;
fn write(mut self, key: K, value: Option<V::Value>) -> Self {
self.mutations.insert(key, value);
self
}
fn merkleize(
self,
db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
metadata: Option<V::Value>,
) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
self.merkleize(db, metadata)
}
}
impl<F, K, V, H, E, C, I> UnmerkleizedBatchTrait<Db<F, E, C, I, H, update::Ordered<K, V>>>
for UnmerkleizedBatch<F, H, update::Ordered<K, V>>
where
F: Family,
K: Key,
V: ValueEncoding + 'static,
H: Hasher,
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: OrderedIndex<Value = Location<F>>,
Operation<F, update::Ordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Metadata = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>;
fn write(mut self, key: K, value: Option<V::Value>) -> Self {
self.mutations.insert(key, value);
self
}
fn merkleize(
self,
db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
metadata: Option<V::Value>,
) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
self.merkleize(db, metadata)
}
}
impl<F: Family, D: Digest, U: update::Update + Send + Sync + 'static> MerkleizedBatchTrait
for Arc<MerkleizedBatch<F, D, U>>
where
Operation<F, U>: Codec,
{
type Digest = D;
fn root(&self) -> D {
MerkleizedBatch::root(self)
}
}
impl<F, E, K, V, C, I, H> BatchableDb for Db<F, E, C, I, H, update::Unordered<K, V>>
where
F: Family,
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
+ crate::Persistable<Error = crate::journal::Error>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
Operation<F, update::Unordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>;
type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>>;
fn new_batch(&self) -> Self::Batch {
self.new_batch()
}
fn apply_batch(
&mut self,
batch: Self::Merkleized,
) -> impl Future<Output = Result<Range<Location<F>>, crate::qmdb::Error<F>>> {
self.apply_batch(batch)
}
}
impl<F, E, K, V, C, I, H> BatchableDb for Db<F, E, C, I, H, update::Ordered<K, V>>
where
F: Family,
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
+ crate::Persistable<Error = crate::journal::Error>,
I: OrderedIndex<Value = Location<F>>,
H: Hasher,
Operation<F, update::Ordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>;
type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>>;
fn new_batch(&self) -> Self::Batch {
self.new_batch()
}
fn apply_batch(
&mut self,
batch: Self::Merkleized,
) -> impl Future<Output = Result<Range<Location<F>>, crate::qmdb::Error<F>>> {
self.apply_batch(batch)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
mmr,
qmdb::any::{
ordered::fixed::Db as OrderedFixedDb,
test::{colliding_digest, fixed_db_config},
unordered::fixed::Db as UnorderedFixedDb,
},
translator::OneCap,
};
use commonware_cryptography::{sha256, Sha256};
use commonware_runtime::{deterministic, Runner as _};
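// Standalone mirror of `Merkleizer::extract_parent_deleted_creates` over a
// single base diff, used to unit-test the extraction rule in isolation.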
fn extract_parent_deleted_creates<K: Ord + Clone, V: Clone>(
mutations: &mut BTreeMap<K, Option<V>>,
base_diff: &BTreeMap<K, DiffEntry<mmr::Family, V>>,
) -> BTreeMap<K, (V, Option<crate::mmr::Location>)> {
let creates: BTreeMap<_, _> = mutations
.iter()
.filter_map(|(key, value)| {
if let Some(DiffEntry::Deleted { base_old_loc }) = base_diff.get(key) {
if let Some(value) = value {
return Some((key.clone(), (value.clone(), *base_old_loc)));
}
}
None
})
.collect();
for key in creates.keys() {
mutations.remove(key);
}
creates
}
#[test]
fn extract_parent_deleted_creates_basic() {
let mut mutations: BTreeMap<u64, Option<u64>> = BTreeMap::new();
mutations.insert(1, Some(100));
mutations.insert(2, None);
mutations.insert(3, Some(300));
let mut base_diff: BTreeMap<u64, DiffEntry<mmr::Family, u64>> = BTreeMap::new();
base_diff.insert(
1,
DiffEntry::Deleted {
base_old_loc: Some(crate::mmr::Location::new(5)),
},
);
base_diff.insert(
4,
DiffEntry::Active {
value: 400,
loc: crate::mmr::Location::new(10),
base_old_loc: None,
},
);
let creates = extract_parent_deleted_creates(&mut mutations, &base_diff);
assert_eq!(creates.len(), 1);
let (value, base_old_loc) = creates.get(&1).unwrap();
assert_eq!(*value, 100);
assert_eq!(*base_old_loc, Some(crate::mmr::Location::new(5)));
assert_eq!(mutations.len(), 2);
assert!(mutations.contains_key(&2));
assert!(mutations.contains_key(&3));
}
#[test]
fn extract_parent_deleted_creates_delete_not_extracted() {
let mut mutations: BTreeMap<u64, Option<u64>> = BTreeMap::new();
mutations.insert(1, None);
let mut base_diff: BTreeMap<u64, DiffEntry<mmr::Family, u64>> = BTreeMap::new();
base_diff.insert(
1,
DiffEntry::Deleted {
base_old_loc: Some(crate::mmr::Location::new(5)),
},
);
let creates = extract_parent_deleted_creates(&mut mutations, &base_diff);
assert!(creates.is_empty());
assert_eq!(mutations.len(), 1);
assert!(mutations.contains_key(&1));
}
#[test]
fn read_ops_resolves_committed_ancestor_and_current_sources() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
>;
let config = fixed_db_config::<OneCap>("read-locations-all-sources", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key_db = colliding_digest(0x30, 0);
let value_db = colliding_digest(0x30, 1);
let key_parent = colliding_digest(0x31, 0);
let value_parent = colliding_digest(0x31, 1);
let key_current = colliding_digest(0x32, 0);
let value_current = colliding_digest(0x32, 1);
let seed = db
.new_batch()
.write(key_db, Some(value_db))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let committed_loc = db.snapshot.get(&key_db).next().copied().unwrap();
let parent = db
.new_batch()
.write(key_parent, Some(value_parent))
.merkleize(&db, None)
.await
.unwrap();
let parent_loc = parent.diff.get(&key_parent).unwrap().loc().unwrap();
let child = parent
.new_batch::<Sha256>()
.write(key_current, Some(value_current));
let (_mutations, merkleizer) = child.into_parts();
let current_loc = Location::new(merkleizer.base_size);
let batch_ops = vec![Operation::Update(update::Unordered(
key_current,
value_current,
))];
let reader = db.log.reader().await;
let ops = merkleizer
.read_ops(
&[committed_loc, parent_loc, current_loc],
&batch_ops,
&reader,
)
.await
.unwrap();
drop(reader);
assert_eq!(
ops,
vec![
Operation::Update(update::Unordered(key_db, value_db)),
Operation::Update(update::Unordered(key_parent, value_parent)),
Operation::Update(update::Unordered(key_current, value_current)),
]
);
let reader = db.log.reader().await;
let disk_op = merkleizer
.read_op(committed_loc, &batch_ops, &reader)
.await
.unwrap();
assert_eq!(
disk_op,
Operation::Update(update::Unordered(key_db, value_db))
);
let ancestor_op = merkleizer
.read_op(parent_loc, &batch_ops, &reader)
.await
.unwrap();
assert_eq!(
ancestor_op,
Operation::Update(update::Unordered(key_parent, value_parent))
);
let current_op = merkleizer
.read_op(current_loc, &batch_ops, &reader)
.await
.unwrap();
assert_eq!(
current_op,
Operation::Update(update::Unordered(key_current, value_current))
);
drop(reader);
db.destroy().await.unwrap();
});
}
#[test]
fn child_root_matches_between_pending_and_committed_paths_under_collisions() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
>;
let config = fixed_db_config::<OneCap>("batch-collision-regression", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key_a = colliding_digest(0xAA, 1);
let key_b = colliding_digest(0xAA, 0);
let mut initial = db.new_batch();
for i in 0..4 {
initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
}
let initial = initial.merkleize(&db, None).await.unwrap();
db.apply_batch(initial).await.unwrap();
db.commit().await.unwrap();
let parent = db
.new_batch()
.write(key_a, Some(colliding_digest(0xCC, 1)))
.merkleize(&db, None)
.await
.unwrap();
assert!(
!parent.diff.contains_key(&key_b),
"regression requires a sibling collision to remain only in the committed snapshot"
);
let pending_child = parent
.new_batch::<Sha256>()
.write(key_a, Some(colliding_digest(0xDD, 1)))
.write(key_b, Some(colliding_digest(0xDD, 0)))
.merkleize(&db, None)
.await
.unwrap();
let pending_root = pending_child.root();
db.apply_batch(parent).await.unwrap();
db.commit().await.unwrap();
let committed_child = db
.new_batch()
.write(key_a, Some(colliding_digest(0xDD, 1)))
.write(key_b, Some(colliding_digest(0xDD, 0)))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(pending_root, committed_child.root());
db.apply_batch(pending_child).await.unwrap();
assert_eq!(db.root(), committed_child.root());
db.destroy().await.unwrap();
});
}
#[test]
fn ordered_child_root_matches_between_pending_and_committed_paths_under_collisions() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = OrderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
>;
let config = fixed_db_config::<OneCap>("ordered-batch-collision-regression", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key_a = colliding_digest(0xAA, 1);
let key_b = colliding_digest(0xAA, 0);
let mut initial = db.new_batch();
for i in 0..4 {
initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
}
let initial = initial.merkleize(&db, None).await.unwrap();
db.apply_batch(initial).await.unwrap();
db.commit().await.unwrap();
let parent = db
.new_batch()
.write(key_a, Some(colliding_digest(0xCC, 1)))
.merkleize(&db, None)
.await
.unwrap();
assert!(
!parent.diff.contains_key(&key_b),
"ordered regression requires a sibling collision to remain only in the committed snapshot"
);
let pending_child = parent
.new_batch::<Sha256>()
.write(key_a, Some(colliding_digest(0xDD, 1)))
.write(key_b, Some(colliding_digest(0xDD, 0)))
.merkleize(&db, None)
.await
.unwrap();
let pending_root = pending_child.root();
db.apply_batch(parent).await.unwrap();
db.commit().await.unwrap();
let committed_child = db
.new_batch()
.write(key_a, Some(colliding_digest(0xDD, 1)))
.write(key_b, Some(colliding_digest(0xDD, 0)))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(pending_root, committed_child.root());
db.apply_batch(pending_child).await.unwrap();
assert_eq!(db.root(), committed_child.root());
db.destroy().await.unwrap();
});
}
#[test]
fn sequential_commit_basic() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
>;
let config = fixed_db_config::<OneCap>("seq-commit-basic", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let seed = db
.new_batch()
.write(colliding_digest(0x01, 0), Some(colliding_digest(0x01, 1)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let key_a = colliding_digest(0x02, 0);
let val_a = colliding_digest(0x02, 1);
let batch_a = db
.new_batch()
.write(key_a, Some(val_a))
.merkleize(&db, None)
.await
.unwrap();
let key_b = colliding_digest(0x03, 0);
let val_b = colliding_digest(0x03, 1);
let batch_b = batch_a
.new_batch::<Sha256>()
.write(key_b, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(batch_a).await.unwrap();
db.commit().await.unwrap();
let committed_b = db
.new_batch()
.write(key_b, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(batch_b.root(), committed_b.root());
db.apply_batch(batch_b).await.unwrap();
assert_eq!(db.root(), committed_b.root());
db.destroy().await.unwrap();
});
}
#[test]
fn sequential_commit_fixes_base_old_loc() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
>;
let config = fixed_db_config::<OneCap>("seq-commit-base-old-loc", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key = colliding_digest(0x10, 0);
let seed = db
.new_batch()
.write(key, Some(colliding_digest(0x10, 1)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let val_a = colliding_digest(0x10, 2);
let batch_a = db
.new_batch()
.write(key, Some(val_a))
.merkleize(&db, None)
.await
.unwrap();
let a_entry = batch_a.diff.get(&key).unwrap();
let a_loc = a_entry.loc();
assert!(a_loc.is_some());
let val_b = colliding_digest(0x10, 3);
let batch_b = batch_a
.new_batch::<Sha256>()
.write(key, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(batch_a).await.unwrap();
db.commit().await.unwrap();
let committed_b = db
.new_batch()
.write(key, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(batch_b.root(), committed_b.root());
db.apply_batch(batch_b).await.unwrap();
assert_eq!(db.root(), committed_b.root());
db.destroy().await.unwrap();
});
}
#[test]
fn fork_apply_after_parent_committed() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
>;
let config = fixed_db_config::<OneCap>("fork-after-commit", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let seed = db
.new_batch()
.write(colliding_digest(0x20, 0), Some(colliding_digest(0x20, 1)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let key_a = colliding_digest(0x21, 0);
let val_a = colliding_digest(0x21, 1);
let batch_a = db
.new_batch()
.write(key_a, Some(val_a))
.merkleize(&db, None)
.await
.unwrap();
let key_b = colliding_digest(0x22, 0);
let val_b = colliding_digest(0x22, 1);
let batch_b = batch_a
.new_batch::<Sha256>()
.write(key_b, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
let key_c = colliding_digest(0x23, 0);
let val_c = colliding_digest(0x23, 1);
let batch_c = batch_a
.new_batch::<Sha256>()
.write(key_c, Some(val_c))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(batch_a).await.unwrap();
db.commit().await.unwrap();
let committed_b = db
.new_batch()
.write(key_b, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(batch_b.root(), committed_b.root());
let committed_c = db
.new_batch()
.write(key_c, Some(val_c))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(batch_c.root(), committed_c.root());
db.destroy().await.unwrap();
});
}
#[test]
fn sequential_commit_three_deep() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
>;
let config = fixed_db_config::<OneCap>("ff-cross", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let grandparent = db
.new_batch()
.write(colliding_digest(0x01, 0), Some(colliding_digest(0x01, 1)))
.write(colliding_digest(0x02, 0), Some(colliding_digest(0x02, 1)))
.merkleize(&db, None)
.await
.unwrap();
let parent = grandparent
.new_batch::<Sha256>()
.write(colliding_digest(0x03, 0), Some(colliding_digest(0x03, 1)))
.merkleize(&db, None)
.await
.unwrap();
let child = parent
.new_batch::<Sha256>()
.write(colliding_digest(0x04, 0), Some(colliding_digest(0x04, 1)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(grandparent).await.unwrap();
db.commit().await.unwrap();
db.apply_batch(parent).await.unwrap();
db.commit().await.unwrap();
db.apply_batch(child).await.unwrap();
for i in 1..=4 {
assert_eq!(
db.get(&colliding_digest(i, 0)).await.unwrap(),
Some(colliding_digest(i, 1))
);
}
db.destroy().await.unwrap();
});
}
#[test]
fn recreate_deleted_key_with_collision_sibling_root_matches() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
>;
let config = fixed_db_config::<OneCap>("recreate-deleted-collision", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let k0 = colliding_digest(0xAA, 0);
let k6 = colliding_digest(0xAA, 6);
let initial = db
.new_batch()
.write(k0, Some(colliding_digest(0xBB, 0)))
.write(k6, Some(colliding_digest(0xBB, 6)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(initial).await.unwrap();
db.commit().await.unwrap();
let parent = db
.new_batch()
.write(k0, None)
.merkleize(&db, None)
.await
.unwrap();
let k29 = colliding_digest(0xAA, 29);
let pending_child = parent
.new_batch::<Sha256>()
.write(k0, Some(colliding_digest(0xCC, 0)))
.write(k29, Some(colliding_digest(0xCC, 29)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(parent).await.unwrap();
db.commit().await.unwrap();
let committed_child = db
.new_batch()
.write(k0, Some(colliding_digest(0xCC, 0)))
.write(k29, Some(colliding_digest(0xCC, 29)))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(
pending_child.root(),
committed_child.root(),
"root depended on pending-vs-committed parent path \
when re-creating a deleted key with collision siblings"
);
db.destroy().await.unwrap();
});
}
}