use crate::{
index::{Ordered as OrderedIndex, Unordered as UnorderedIndex},
journal::{
authenticated,
contiguous::{Contiguous, Mutable, Reader},
},
merkle::{Family, Location},
qmdb::{
any::{
db::Db,
operation::{update, Operation},
ordered::{find_next_key, find_prev_key},
ValueEncoding,
},
batch_chain::{self, Bounds},
bitmap::Shared,
delete_known_loc,
operation::{Key, Operation as OperationTrait},
update_known_loc,
},
Context,
};
use ahash::AHashSet;
use commonware_codec::Codec;
use commonware_cryptography::{Digest, Hasher};
use commonware_parallel::Strategy;
use commonware_utils::bitmap;
use core::ops::Range;
use std::{
collections::BTreeMap,
iter,
sync::{Arc, Weak},
};
use tracing::debug;
const MAX_CONCURRENT_READS: u64 = 64;
type DiffVec<K, F, V> = Vec<(K, DiffEntry<F, V>)>;
type DiffSlice<K, F, V> = [(K, DiffEntry<F, V>)];
type PrevCandidates<K, F, V> = Vec<(K, (V, Location<F>))>;
#[derive(Clone)]
pub(crate) enum DiffEntry<F: Family, V> {
Active {
value: V,
loc: Location<F>,
base_old_loc: Option<Location<F>>,
},
Deleted {
base_old_loc: Option<Location<F>>,
},
}
impl<F: Family, V> DiffEntry<F, V> {
pub(crate) const fn base_old_loc(&self) -> Option<Location<F>> {
match self {
Self::Active { base_old_loc, .. } | Self::Deleted { base_old_loc } => *base_old_loc,
}
}
pub(crate) const fn loc(&self) -> Option<Location<F>> {
match self {
Self::Active { loc, .. } => Some(*loc),
Self::Deleted { .. } => None,
}
}
pub(crate) const fn value(&self) -> Option<&V> {
match self {
Self::Active { value, .. } => Some(value),
Self::Deleted { .. } => None,
}
}
}
pub(crate) fn lookup_sorted<'a, K: Ord, V>(entries: &'a [(K, V)], key: &K) -> Option<&'a V> {
entries
.binary_search_by(|(candidate, _)| candidate.cmp(key))
.ok()
.map(|idx| &entries[idx].1)
}
enum Base<F: Family, D: Digest, U: update::Update + Send + Sync, S: Strategy>
where
Operation<F, U>: Send + Sync,
{
Db {
db_size: u64,
inactivity_floor_loc: Location<F>,
active_keys: usize,
},
Child(Arc<MerkleizedBatch<F, D, U, S>>),
}
impl<F: Family, D: Digest, U: update::Update + Send + Sync, S: Strategy> Base<F, D, U, S>
where
Operation<F, U>: Send + Sync,
{
fn base_size(&self) -> u64 {
match self {
Self::Db { db_size, .. } => *db_size,
Self::Child(parent) => parent.bounds.total_size,
}
}
fn db_size(&self) -> u64 {
match self {
Self::Db { db_size, .. } => *db_size,
Self::Child(parent) => parent.bounds.db_size,
}
}
fn inactivity_floor_loc(&self) -> Location<F> {
match self {
Self::Db {
inactivity_floor_loc,
..
} => *inactivity_floor_loc,
Self::Child(parent) => parent.bounds.inactivity_floor,
}
}
fn active_keys(&self) -> usize {
match self {
Self::Db { active_keys, .. } => *active_keys,
Self::Child(parent) => parent.total_active_keys,
}
}
const fn parent(&self) -> Option<&Arc<MerkleizedBatch<F, D, U, S>>> {
match self {
Self::Db { .. } => None,
Self::Child(parent) => Some(parent),
}
}
}
pub struct UnmerkleizedBatch<F: Family, H, U, S: Strategy>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, U>, S>,
mutations: BTreeMap<U::Key, Option<U::Value>>,
base: Base<F, H::Digest, U, S>,
}
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, U: update::Update + Send + Sync, S: Strategy>
where
Operation<F, U>: Send + Sync,
{
pub(crate) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<F, U>, S>>,
pub(crate) root: D,
pub(crate) diff: Arc<DiffVec<U::Key, F, U::Value>>,
parent: Option<Weak<Self>>,
pub(crate) total_active_keys: usize,
pub(crate) ancestor_diffs: Vec<Arc<DiffVec<U::Key, F, U::Value>>>,
pub(crate) bounds: batch_chain::Bounds<F>,
}
type AncestorBatch<F, D, U, S> = Arc<MerkleizedBatch<F, D, U, S>>;
struct Merkleizer<F: Family, H, U, S: Strategy>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, U>, S>,
ancestors: Vec<AncestorBatch<F, H::Digest, U, S>>,
base_size: u64,
db_size: u64,
base_inactivity_floor_loc: Location<F>,
base_active_keys: usize,
}
fn resolve_in_ancestors<'a, F: Family, D: Digest, U: update::Update + Send + Sync, S: Strategy>(
ancestors: &'a [Arc<MerkleizedBatch<F, D, U, S>>],
key: &U::Key,
) -> Option<&'a DiffEntry<F, U::Value>>
where
Operation<F, U>: Send + Sync,
{
for batch in ancestors {
if let Some(entry) = lookup_sorted(batch.diff.as_slice(), key) {
return Some(entry);
}
}
None
}
fn apply_diff<F: Family, V, I: UnorderedIndex<Value = Location<F>>, const N: usize>(
snapshot: &mut I,
bitmap: &mut bitmap::Prunable<N>,
key: &impl Key,
entry: &DiffEntry<F, V>,
base_old_loc: Option<Location<F>>,
) {
match entry {
DiffEntry::Active { loc, .. } => match base_old_loc {
Some(old) => update_known_loc::<F, _>(snapshot, key, old, *loc),
None => snapshot.insert(key, *loc),
},
DiffEntry::Deleted { .. } => {
if let Some(old) = base_old_loc {
delete_known_loc::<F, _>(snapshot, key, old);
}
}
}
if let Some(loc) = entry.loc() {
bitmap.set_bit(*loc, true);
}
if let Some(loc) = base_old_loc {
bitmap.set_bit(*loc, false);
}
}
struct DiffMerge<'a, K, F: Family, V> {
cursors: Vec<(&'a DiffSlice<K, F, V>, usize)>,
}
impl<'a, K: Ord, F: Family, V> DiffMerge<'a, K, F, V> {
fn new(streams: impl IntoIterator<Item = &'a DiffSlice<K, F, V>>) -> Self {
Self {
cursors: streams.into_iter().map(|s| (s, 0)).collect(),
}
}
}
impl<'a, K: Ord, F: Family, V> Iterator for DiffMerge<'a, K, F, V> {
type Item = (&'a K, &'a DiffEntry<F, V>);
fn next(&mut self) -> Option<Self::Item> {
let n = self.cursors.len();
let mut winner: Option<usize> = None;
for level in 0..n {
let (slice, pos) = self.cursors[level];
let Some((k, _)) = slice.get(pos) else {
continue;
};
let better = match winner {
None => true,
Some(w) => {
let (ws, wpos) = self.cursors[w];
*k < ws[wpos].0
}
};
if better {
winner = Some(level);
}
}
let level = winner?;
let (slice, pos) = self.cursors[level];
for inner in 0..n {
let (s, p) = self.cursors[inner];
if s.get(p).is_some_and(|(k, _)| *k == slice[pos].0) {
self.cursors[inner].1 += 1;
}
}
Some((&slice[pos].0, &slice[pos].1))
}
}
struct AppliedAncestorResolver<'a, K, F: Family, V> {
cursors: Vec<(&'a DiffSlice<K, F, V>, usize)>,
}
impl<'a, K: Ord, F: Family, V> AppliedAncestorResolver<'a, K, F, V> {
fn new(applied: impl IntoIterator<Item = &'a DiffSlice<K, F, V>>) -> Self {
Self {
cursors: applied.into_iter().map(|s| (s, 0)).collect(),
}
}
fn lookup(&mut self, key: &K) -> Option<Option<Location<F>>> {
for (slice, idx) in self.cursors.iter_mut() {
while *idx < slice.len() && slice[*idx].0 < *key {
*idx += 1;
}
if *idx < slice.len() && slice[*idx].0 == *key {
return Some(slice[*idx].1.loc());
}
}
None
}
}
fn next_candidate<F: Family, const N: usize>(
bitmap: &Shared<N>,
floor: Location<F>,
tip: u64,
) -> Option<Location<F>> {
let floor = *floor;
let bitmap_len = bitmap::Readable::<N>::len(bitmap);
let committed_end = bitmap_len.min(tip);
if floor < committed_end {
if let Some(idx) = bitmap.next_one_from(floor) {
if idx < committed_end {
return Some(Location::new(idx));
}
}
}
let candidate = floor.max(bitmap_len);
(candidate < tip).then(|| Location::new(candidate))
}
fn read_op_from_ancestors<F: Family, D: Digest, U: update::Update + Send + Sync, S: Strategy>(
ancestors: &[Arc<MerkleizedBatch<F, D, U, S>>],
loc: u64,
db_size: u64,
) -> &Operation<F, U>
where
Operation<F, U>: Send + Sync,
{
for (i, batch) in ancestors.iter().enumerate() {
let batch_base = ancestors
.get(i + 1)
.map_or(db_size, |b| b.journal_batch.size());
let batch_end = batch.journal_batch.size();
if loc >= batch_base && loc < batch_end {
return &batch.journal_batch.items()[(loc - batch_base) as usize];
}
}
unreachable!("location {loc} not found in ancestor chain (db_size={db_size})")
}
impl<F: Family, H, U, S: Strategy> Merkleizer<F, H, U, S>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
fn try_read_op_from_uncommitted(
&self,
loc: Location<F>,
batch_ops: &[Operation<F, U>],
) -> Option<Operation<F, U>> {
let loc = *loc;
if loc >= self.base_size {
return Some(batch_ops[(loc - self.base_size) as usize].clone());
}
if loc >= self.db_size {
return Some(read_op_from_ancestors(&self.ancestors, loc, self.db_size).clone());
}
None
}
fn try_read_op_sync<R: Reader<Item = Operation<F, U>>>(
&self,
loc: Location<F>,
batch_ops: &[Operation<F, U>],
reader: &R,
) -> Option<Operation<F, U>> {
self.try_read_op_from_uncommitted(loc, batch_ops)
.or_else(|| reader.try_read_sync(*loc))
}
async fn read_op<R: Reader<Item = Operation<F, U>>>(
&self,
loc: Location<F>,
batch_ops: &[Operation<F, U>],
reader: &R,
) -> Result<Operation<F, U>, crate::qmdb::Error<F>> {
match self.try_read_op_sync(loc, batch_ops, reader) {
Some(op) => Ok(op),
None => Ok(reader.read(*loc).await?),
}
}
async fn read_ops<R: Reader<Item = Operation<F, U>>>(
&self,
locations: &[Location<F>],
batch_ops: &[Operation<F, U>],
reader: &R,
) -> Result<Vec<Operation<F, U>>, crate::qmdb::Error<F>> {
let results: Vec<Option<Operation<F, U>>> = locations
.iter()
.map(|loc| self.try_read_op_sync(*loc, batch_ops, reader))
.collect();
let misses: Vec<(usize, u64)> = locations
.iter()
.zip(results.iter())
.enumerate()
.filter_map(|(idx, (loc, cached))| cached.is_none().then_some((idx, **loc)))
.collect();
if misses.is_empty() {
return Ok(results.into_iter().map(Option::unwrap).collect());
}
let mut miss_positions: Vec<u64> = misses.iter().map(|(_, loc)| *loc).collect();
miss_positions.sort_unstable();
miss_positions.dedup();
let disk_results = reader.read_many(&miss_positions).await?;
let mut results = results;
for (idx, loc) in misses {
let result_idx = miss_positions
.binary_search(&loc)
.expect("disk result missing for requested location");
results[idx] = Some(disk_results[result_idx].clone());
}
Ok(results
.into_iter()
.map(|r| r.expect("operation should be resolved"))
.collect())
}
fn gather_existing_locations<E, C, I, const N: usize>(
&self,
mutations: &BTreeMap<U::Key, Option<U::Value>>,
db: &Db<F, E, C, I, H, U, N, S>,
include_active_collision_siblings: bool,
) -> Vec<Location<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
{
let mut locations = Vec::with_capacity(mutations.len() * 3 / 2);
if self.ancestors.is_empty() {
for key in mutations.keys() {
locations.extend(db.snapshot.get(key).copied());
}
} else {
for key in mutations.keys() {
match resolve_in_ancestors(&self.ancestors, key) {
Some(DiffEntry::Deleted { .. }) => {
}
Some(DiffEntry::Active {
loc, base_old_loc, ..
}) => {
locations.push(*loc);
if include_active_collision_siblings {
locations.extend(
db.snapshot
.get(key)
.copied()
.filter(move |loc| Some(*loc) != *base_old_loc),
);
}
}
None => {
locations.extend(db.snapshot.get(key).copied());
}
}
}
}
locations.sort();
locations.dedup();
locations
}
fn is_active_at<E, C, I, const N: usize>(
&self,
key: &U::Key,
loc: Location<F>,
batch_diff: &DiffSlice<U::Key, F, U::Value>,
db: &Db<F, E, C, I, H, U, N, S>,
) -> bool
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
{
let diff_entry = lookup_sorted(batch_diff, key);
if let Some(entry) = diff_entry.or_else(|| resolve_in_ancestors(&self.ancestors, key)) {
return entry.loc() == Some(loc);
}
db.snapshot.get(key).any(|&l| l == loc)
}
#[allow(clippy::type_complexity)]
fn extract_parent_deleted_creates(
&self,
mutations: &mut BTreeMap<U::Key, Option<U::Value>>,
) -> Vec<(U::Key, U::Value, Option<Location<F>>)> {
if self.ancestors.is_empty() {
return Vec::new();
}
let mut creates = Vec::new();
mutations.retain(|key, value| {
if let Some(DiffEntry::Deleted { base_old_loc }) =
resolve_in_ancestors(&self.ancestors, key)
{
if let Some(v) = value.take() {
creates.push((key.clone(), v, *base_old_loc));
return false;
}
}
true
});
creates
}
#[allow(clippy::too_many_arguments)]
async fn finish<E, C, I, R, const N: usize>(
self,
mut ops: Vec<Operation<F, U>>,
mut diff: DiffVec<U::Key, F, U::Value>,
active_keys_delta: isize,
user_steps: u64,
metadata: Option<U::Value>,
mut next_candidate: impl FnMut(Location<F>, u64) -> Option<Location<F>>,
reader: R,
db: &Db<F, E, C, I, H, U, N, S>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, U, S>>, crate::qmdb::Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
R: Reader<Item = Operation<F, U>>,
{
let total_steps = user_steps + 1;
let total_active_keys = self.base_active_keys as isize + active_keys_delta;
let mut floor = self.base_inactivity_floor_loc;
if total_active_keys > 0 {
let fixed_tip = self.base_size + ops.len() as u64;
let mut moved = 0u64;
let mut scan_from = floor;
let mut floor_diff = Vec::with_capacity(total_steps as usize);
while moved < total_steps {
let limit = ((total_steps - moved) as usize).min(MAX_CONCURRENT_READS as usize);
let mut candidates = Vec::with_capacity(limit);
while candidates.len() < limit {
let Some(candidate) = next_candidate(scan_from, fixed_tip) else {
break;
};
candidates.push(candidate);
scan_from = Location::new(*candidate + 1);
}
if candidates.is_empty() {
break;
}
let resolved = self.read_ops(&candidates, &ops, &reader).await?;
for (candidate, op) in candidates.into_iter().zip(resolved) {
floor = Location::new(*candidate + 1);
let Some(key) = op.key().cloned() else {
continue; };
if !self.is_active_at(&key, candidate, &diff, db) {
continue;
}
let new_loc = Location::new(self.base_size + ops.len() as u64);
let search = diff.binary_search_by(|(k, _)| k.cmp(&key));
let base_old_loc = match &search {
Ok(idx) => diff[*idx].1.base_old_loc(),
Err(_) => resolve_in_ancestors(&self.ancestors, &key)
.map_or(Some(candidate), DiffEntry::base_old_loc),
};
let value = extract_update_value(&op);
ops.push(op);
let new_entry = (
key,
DiffEntry::Active {
value,
loc: new_loc,
base_old_loc,
},
);
match search {
Ok(idx) => diff[idx] = new_entry,
Err(_) => floor_diff.push(new_entry),
}
moved += 1;
if moved >= total_steps {
break;
}
}
}
if !floor_diff.is_empty() {
diff.extend(floor_diff);
diff.sort_by(|a, b| a.0.cmp(&b.0));
assert!(diff.is_sorted_by(|a, b| a.0 < b.0));
}
} else {
floor = Location::new(self.base_size + ops.len() as u64);
debug!(tip = ?floor, "db is empty, raising floor to tip");
}
drop(reader);
let commit_loc = Location::<F>::new(self.base_size + ops.len() as u64);
ops.push(Operation::CommitFloor(metadata, floor));
let ops = Arc::new(ops);
let leaves = Location::new(self.base_size + ops.len() as u64);
let inactive_peaks = db.inactive_peaks(leaves, floor);
let journal = db
.log
.with_mem(|base| self.journal_batch.merkleize_with(base, ops));
let root = db
.log
.with_mem(|base| journal.root(base, &db.log.hasher, inactive_peaks))?;
let ancestor_diffs: Vec<_> = self.ancestors.iter().map(|a| Arc::clone(&a.diff)).collect();
let ancestors: Vec<_> = self
.ancestors
.iter()
.map(|a| batch_chain::AncestorBounds {
floor: a.bounds.inactivity_floor,
end: a.bounds.total_size,
})
.collect();
assert!(total_active_keys >= 0, "active_keys underflow");
Ok(Arc::new(MerkleizedBatch {
journal_batch: journal,
root,
diff: Arc::new(diff),
parent: self.ancestors.first().map(Arc::downgrade),
total_active_keys: total_active_keys as usize,
ancestor_diffs,
bounds: batch_chain::Bounds {
base_size: self.base_size,
db_size: self.db_size,
total_size: *commit_loc + 1,
ancestors,
inactivity_floor: floor,
},
}))
}
}
impl<F: Family, H, U, S: Strategy> UnmerkleizedBatch<F, H, U, S>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
self.mutations.insert(key, value);
self
}
#[allow(clippy::type_complexity)]
fn into_parts(self) -> (BTreeMap<U::Key, Option<U::Value>>, Merkleizer<F, H, U, S>) {
let ancestors: Vec<_> = self.base.parent().map_or_else(Vec::new, |parent| {
let mut v = vec![Arc::clone(parent)];
v.extend(parent.ancestors());
v
});
let db_size = self.base.db_size();
let effective_db_size = ancestors.last().map_or(db_size, |oldest| {
let oldest_base =
oldest.journal_batch.size() - oldest.journal_batch.items().len() as u64;
db_size.max(oldest_base)
});
(
self.mutations,
Merkleizer {
journal_batch: self.journal_batch,
ancestors,
base_size: self.base.base_size(),
db_size: effective_db_size,
base_inactivity_floor_loc: self.base.inactivity_floor_loc(),
base_active_keys: self.base.active_keys(),
},
)
}
}
impl<F: Family, H, U, S: Strategy> UnmerkleizedBatch<F, H, U, S>
where
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
pub async fn get<E, C, I, const N: usize>(
&self,
key: &U::Key,
db: &Db<F, E, C, I, H, U, N, S>,
) -> Result<Option<U::Value>, crate::qmdb::Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
{
if let Some(value) = self.mutations.get(key) {
return Ok(value.clone());
}
if let Some(parent) = self.base.parent() {
if let Some(entry) = lookup_sorted(parent.diff.as_slice(), key) {
return Ok(entry.value().cloned());
}
for batch in parent.ancestors() {
if let Some(entry) = lookup_sorted(batch.diff.as_slice(), key) {
return Ok(entry.value().cloned());
}
}
}
db.get(key).await
}
pub async fn get_many<E, C, I, const N: usize>(
&self,
keys: &[&U::Key],
db: &Db<F, E, C, I, H, U, N, S>,
) -> Result<Vec<Option<U::Value>>, crate::qmdb::Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
{
if keys.is_empty() {
return Ok(Vec::new());
}
let mut results: Vec<Option<U::Value>> = Vec::with_capacity(keys.len());
let mut db_indices = Vec::new();
let mut db_keys = Vec::new();
for (i, key) in keys.iter().enumerate() {
if let Some(value) = self.mutations.get(*key) {
results.push(value.clone());
continue;
}
let mut found = false;
if let Some(parent) = self.base.parent() {
if let Some(entry) = lookup_sorted(parent.diff.as_slice(), *key) {
results.push(entry.value().cloned());
found = true;
}
if !found {
for batch in parent.ancestors() {
if let Some(entry) = lookup_sorted(batch.diff.as_slice(), *key) {
results.push(entry.value().cloned());
found = true;
break;
}
}
}
}
if found {
continue;
}
db_indices.push(i);
db_keys.push(*key);
results.push(None);
}
if !db_keys.is_empty() {
let db_results = db.get_many(&db_keys).await?;
for (slot, value) in db_indices.into_iter().zip(db_results) {
results[slot] = value;
}
}
Ok(results)
}
}
impl<F: Family, K, V, H, S: Strategy> UnmerkleizedBatch<F, H, update::Unordered<K, V>, S>
where
K: Key,
V: ValueEncoding,
H: Hasher,
Operation<F, update::Unordered<K, V>>: Codec,
{
#[allow(clippy::type_complexity)]
#[tracing::instrument(
name = "qmdb::any::batch::merkleize",
level = "info",
skip_all,
fields(
variant = "unordered",
mutations = self.mutations.len() as u64,
),
)]
pub async fn merkleize<E, C, I, const N: usize>(
self,
db: &Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
metadata: Option<V::Value>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, S>>, crate::qmdb::Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>>,
{
self.merkleize_with_floor_scan(db, metadata, |floor, tip| {
next_candidate(&db.bitmap, floor, tip)
})
.await
}
pub(crate) async fn merkleize_with_floor_scan<E, C, I, const N: usize>(
self,
db: &Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
metadata: Option<V::Value>,
next_candidate: impl FnMut(Location<F>, u64) -> Option<Location<F>>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, S>>, crate::qmdb::Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>>,
{
let (mut mutations, m) = self.into_parts();
let locations = m.gather_existing_locations(&mutations, db, false);
let reader = db.log.reader().await;
let results = m.read_ops(&locations, &[], &reader).await?;
let mut ops: Vec<Operation<F, update::Unordered<K, V>>> =
Vec::with_capacity(mutations.len() + 1);
let mut diff: DiffVec<K, F, V::Value> = Vec::with_capacity(mutations.len());
let mut active_keys_delta: isize = 0;
let mut user_steps: u64 = 0;
for (op, &old_loc) in results.iter().zip(&locations) {
let key = op.key().expect("updates should have a key");
let base_old_loc = if let Some(entry) = resolve_in_ancestors(&m.ancestors, key) {
if entry.loc() != Some(old_loc) {
continue;
}
entry.base_old_loc()
} else {
Some(old_loc)
};
let Some(mutation) = mutations.remove(key) else {
continue;
};
let new_loc = Location::new(m.base_size + ops.len() as u64);
match mutation {
Some(value) => {
ops.push(Operation::Update(update::Unordered(
key.clone(),
value.clone(),
)));
diff.push((
key.clone(),
DiffEntry::Active {
value,
loc: new_loc,
base_old_loc,
},
));
user_steps += 1;
}
None => {
ops.push(Operation::Delete(key.clone()));
diff.push((key.clone(), DiffEntry::Deleted { base_old_loc }));
active_keys_delta -= 1;
user_steps += 1;
}
}
}
let parent_deleted_creates = m.extract_parent_deleted_creates(&mut mutations);
let mut creates: Vec<(K, V::Value, Option<Location<F>>)> =
Vec::with_capacity(mutations.len() + parent_deleted_creates.len());
for (key, value) in mutations {
if let Some(value) = value {
creates.push((key, value, None));
}
}
for (key, value, base_old_loc) in parent_deleted_creates {
creates.push((key, value, base_old_loc));
}
creates.sort_by(|(a, _, _), (b, _, _)| a.cmp(b));
for (key, value, base_old_loc) in creates {
let new_loc = Location::new(m.base_size + ops.len() as u64);
ops.push(Operation::Update(update::Unordered(
key.clone(),
value.clone(),
)));
diff.push((
key,
DiffEntry::Active {
value,
loc: new_loc,
base_old_loc,
},
));
active_keys_delta += 1;
}
diff.sort_by(|a, b| a.0.cmp(&b.0));
m.finish(
ops,
diff,
active_keys_delta,
user_steps,
metadata,
next_candidate,
reader,
db,
)
.await
}
}
impl<F: Family, K, V, H, S: Strategy> UnmerkleizedBatch<F, H, update::Ordered<K, V>, S>
where
K: Key,
V: ValueEncoding,
H: Hasher,
Operation<F, update::Ordered<K, V>>: Codec,
{
#[allow(clippy::type_complexity)]
#[tracing::instrument(
name = "qmdb::any::batch::merkleize",
level = "info",
skip_all,
fields(
variant = "ordered",
mutations = self.mutations.len() as u64,
),
)]
pub async fn merkleize<E, C, I, const N: usize>(
self,
db: &Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
metadata: Option<V::Value>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, S>>, crate::qmdb::Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: OrderedIndex<Value = Location<F>>,
{
self.merkleize_with_floor_scan(db, metadata, |floor, tip| {
next_candidate(&db.bitmap, floor, tip)
})
.await
}
pub(crate) async fn merkleize_with_floor_scan<E, C, I, const N: usize>(
self,
db: &Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
metadata: Option<V::Value>,
next_candidate: impl FnMut(Location<F>, u64) -> Option<Location<F>>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, S>>, crate::qmdb::Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: OrderedIndex<Value = Location<F>>,
{
let (mut mutations, m) = self.into_parts();
let locations = m.gather_existing_locations(&mutations, db, true);
let reader = db.log.reader().await;
let mut next_candidates: Vec<K> = Vec::new();
let mut prev_candidates: PrevCandidates<K, F, V::Value> = Vec::new();
let mut deleted: Vec<(K, Location<F>)> = Vec::new();
let mut updated: Vec<(K, V::Value, Location<F>)> = Vec::new();
for (op, &old_loc) in m
.read_ops(&locations, &[], &reader)
.await?
.into_iter()
.zip(&locations)
{
let update::Ordered {
key,
value,
next_key,
} = match op {
Operation::Update(data) => data,
_ => unreachable!("snapshot should only reference Update operations"),
};
next_candidates.push(next_key);
let mutation = mutations.remove(&key);
prev_candidates.push((key.clone(), (value, old_loc)));
let Some(mutation) = mutation else {
continue;
};
if let Some(new_value) = mutation {
updated.push((key, new_value, old_loc));
} else {
deleted.push((key, old_loc));
}
}
deleted.sort_by(|a, b| a.0.cmp(&b.0));
updated.sort_by(|a, b| a.0.cmp(&b.0));
let parent_deleted_creates = m.extract_parent_deleted_creates(&mut mutations);
let mut created: Vec<(K, V::Value, Option<Location<F>>)> =
Vec::with_capacity(mutations.len() + parent_deleted_creates.len());
for (key, value) in mutations {
let Some(value) = value else {
continue; };
next_candidates.push(key.clone());
created.push((key, value, None));
}
for (key, value, base_old_loc) in parent_deleted_creates {
next_candidates.push(key.clone());
created.push((key, value, base_old_loc));
}
created.sort_by(|(a, _, _), (b, _, _)| a.cmp(b));
let mut prev_locations = Vec::new();
for key in deleted
.iter()
.map(|(k, _)| k)
.chain(created.iter().map(|(k, _, _)| k))
{
let Some((iter, _)) = db.snapshot.prev_translated_key(key) else {
continue;
};
prev_locations.extend(iter.copied());
}
prev_locations.sort();
prev_locations.dedup();
let prev_results = m.read_ops(&prev_locations, &[], &reader).await?;
for (op, &old_loc) in prev_results.into_iter().zip(&prev_locations) {
let data = match op {
Operation::Update(data) => data,
_ => unreachable!("expected update operation"),
};
next_candidates.push(data.next_key);
prev_candidates.push((data.key, (data.value, old_loc)));
}
let track_shadow = m.ancestors.len() > 1;
let seen_cap = if track_shadow {
m.ancestors.iter().map(|a| a.diff.len()).sum()
} else {
0
};
let mut seen: AHashSet<&K> = AHashSet::with_capacity(seen_cap);
let mut ancestor_deleted: Vec<K> = Vec::new();
for batch in m.ancestors.iter() {
for (key, entry) in batch.diff.iter() {
if track_shadow && !seen.insert(key) {
continue;
}
if updated.binary_search_by(|(k, _, _)| k.cmp(key)).is_ok()
|| created.binary_search_by(|(k, _, _)| k.cmp(key)).is_ok()
|| deleted.binary_search_by(|(k, _)| k.cmp(key)).is_ok()
{
continue;
}
match entry {
DiffEntry::Active { value, loc, .. } => {
let op = m.read_op(*loc, &[], &reader).await?;
let data = match op {
Operation::Update(data) => data,
_ => unreachable!("ancestor diff Active should reference Update op"),
};
next_candidates.push(key.clone());
next_candidates.push(data.next_key);
prev_candidates.push((key.clone(), (value.clone(), *loc)));
}
DiffEntry::Deleted { .. } => {
ancestor_deleted.push(key.clone());
}
}
}
}
ancestor_deleted.sort();
ancestor_deleted.dedup();
next_candidates.sort();
next_candidates.dedup();
prev_candidates.sort_by(|a, b| a.0.cmp(&b.0));
prev_candidates.dedup_by(|a, b| {
if a.0 == b.0 {
std::mem::swap(a, b);
true
} else {
false
}
});
let is_deleted = |k: &K| -> bool {
deleted.binary_search_by(|(dk, _)| dk.cmp(k)).is_ok()
|| (ancestor_deleted.binary_search(k).is_ok()
&& created.binary_search_by(|(ck, _, _)| ck.cmp(k)).is_err())
};
next_candidates.retain(|k| !is_deleted(k));
prev_candidates.retain(|(k, _)| !is_deleted(k));
let mut ops: Vec<Operation<F, update::Ordered<K, V>>> =
Vec::with_capacity(deleted.len() + updated.len() + created.len() + 1);
let mut diff: DiffVec<K, F, V::Value> =
Vec::with_capacity(deleted.len() + updated.len() + created.len());
let mut active_keys_delta: isize = 0;
let mut user_steps: u64 = 0;
for (key, old_loc) in &deleted {
ops.push(Operation::Delete(key.clone()));
let base_old_loc = resolve_in_ancestors(&m.ancestors, key)
.map_or(Some(*old_loc), DiffEntry::base_old_loc);
diff.push((key.clone(), DiffEntry::Deleted { base_old_loc }));
active_keys_delta -= 1;
user_steps += 1;
}
for (key, value, old_loc) in &updated {
let new_loc = Location::new(m.base_size + ops.len() as u64);
let next_key = find_next_key(key, &next_candidates);
ops.push(Operation::Update(update::Ordered {
key: key.clone(),
value: value.clone(),
next_key,
}));
let base_old_loc = resolve_in_ancestors(&m.ancestors, key)
.map_or(Some(*old_loc), DiffEntry::base_old_loc);
diff.push((
key.clone(),
DiffEntry::Active {
value: value.clone(),
loc: new_loc,
base_old_loc,
},
));
user_steps += 1;
}
for (key, value, base_old_loc) in &created {
let new_loc = Location::new(m.base_size + ops.len() as u64);
let next_key = find_next_key(key, &next_candidates);
ops.push(Operation::Update(update::Ordered {
key: key.clone(),
value: value.clone(),
next_key,
}));
diff.push((
key.clone(),
DiffEntry::Active {
value: value.clone(),
loc: new_loc,
base_old_loc: *base_old_loc,
},
));
active_keys_delta += 1;
}
if !prev_candidates.is_empty() {
let mut rewritten_predecessors = AHashSet::with_capacity(created.len() + deleted.len());
for key in created
.iter()
.map(|(k, _, _)| k)
.chain(deleted.iter().map(|(k, _)| k))
{
let (prev_key, (prev_value, prev_loc)) = find_prev_key(key, &prev_candidates);
if deleted.binary_search_by(|(k, _)| k.cmp(prev_key)).is_ok()
|| updated
.binary_search_by(|(k, _, _)| k.cmp(prev_key))
.is_ok()
|| created
.binary_search_by(|(k, _, _)| k.cmp(prev_key))
.is_ok()
{
continue;
}
if !rewritten_predecessors.insert(prev_key.clone()) {
continue;
}
let prev_new_loc = Location::new(m.base_size + ops.len() as u64);
let prev_next_key = find_next_key(prev_key, &next_candidates);
ops.push(Operation::Update(update::Ordered {
key: prev_key.clone(),
value: prev_value.clone(),
next_key: prev_next_key,
}));
let prev_base_old_loc = resolve_in_ancestors(&m.ancestors, prev_key)
.map_or(Some(*prev_loc), DiffEntry::base_old_loc);
diff.push((
prev_key.clone(),
DiffEntry::Active {
value: prev_value.clone(),
loc: prev_new_loc,
base_old_loc: prev_base_old_loc,
},
));
user_steps += 1;
}
}
diff.sort_by(|a, b| a.0.cmp(&b.0));
m.finish(
ops,
diff,
active_keys_delta,
user_steps,
metadata,
next_candidate,
reader,
db,
)
.await
}
}
impl<F: Family, D: Digest, U: update::Update + Send + Sync, S: Strategy> MerkleizedBatch<F, D, U, S>
where
Operation<F, U>: Send + Sync,
{
pub const fn root(&self) -> D {
self.root
}
pub const fn bounds(&self) -> &Bounds<F> {
&self.bounds
}
pub(crate) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
batch_chain::ancestors(self.parent.clone(), |batch| batch.parent.as_ref())
}
}
impl<F: Family, D: Digest, U: update::Update + Send + Sync, S: Strategy> MerkleizedBatch<F, D, U, S>
where
Operation<F, U>: Codec,
{
#[tracing::instrument(
name = "qmdb::any::batch::new",
level = "debug",
skip_all,
fields(
source = "batch",
base_size = self.bounds.base_size,
total_size = self.bounds.total_size,
ancestor_batches = self.ancestor_diffs.len() as u64,
),
)]
pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U, S>
where
H: Hasher<Digest = D>,
{
UnmerkleizedBatch {
journal_batch: self.journal_batch.new_batch::<H>(),
mutations: BTreeMap::new(),
base: Base::Child(Arc::clone(self)),
}
}
pub async fn get<E, C, I, H, const N: usize>(
&self,
key: &U::Key,
db: &Db<F, E, C, I, H, U, N, S>,
) -> Result<Option<U::Value>, crate::qmdb::Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher<Digest = D>,
{
if let Some(entry) = lookup_sorted(self.diff.as_slice(), key) {
return Ok(entry.value().cloned());
}
for batch in self.ancestors() {
if let Some(entry) = lookup_sorted(batch.diff.as_slice(), key) {
return Ok(entry.value().cloned());
}
}
db.get(key).await
}
pub async fn get_many<E, C, I, H, const N: usize>(
&self,
keys: &[&U::Key],
db: &Db<F, E, C, I, H, U, N, S>,
) -> Result<Vec<Option<U::Value>>, crate::qmdb::Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher<Digest = D>,
{
if keys.is_empty() {
return Ok(Vec::new());
}
let mut results: Vec<Option<U::Value>> = Vec::with_capacity(keys.len());
let mut db_indices = Vec::new();
let mut db_keys = Vec::new();
for (i, key) in keys.iter().enumerate() {
if let Some(entry) = lookup_sorted(self.diff.as_slice(), *key) {
results.push(entry.value().cloned());
continue;
}
let mut found = false;
for batch in self.ancestors() {
if let Some(entry) = lookup_sorted(batch.diff.as_slice(), *key) {
results.push(entry.value().cloned());
found = true;
break;
}
}
if found {
continue;
}
db_indices.push(i);
db_keys.push(*key);
results.push(None);
}
if !db_keys.is_empty() {
let db_results = db.get_many(&db_keys).await?;
for (slot, value) in db_indices.into_iter().zip(db_results) {
results[slot] = value;
}
}
Ok(results)
}
}
impl<F, E, C, I, H, U, const N: usize, S> Db<F, E, C, I, H, U, N, S>
where
F: Family,
E: Context,
U: update::Update + Send + Sync,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
#[tracing::instrument(
name = "qmdb::any::batch::new",
level = "debug",
skip_all,
fields(
source = "db",
base_size = *self.last_commit_loc + 1,
inactivity_floor = *self.inactivity_floor_loc,
active_keys = self.active_keys as u64,
),
)]
pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, U, S> {
let journal_size = *self.last_commit_loc + 1;
UnmerkleizedBatch {
journal_batch: self.log.new_batch(),
mutations: BTreeMap::new(),
base: Base::Db {
db_size: journal_size,
inactivity_floor_loc: self.inactivity_floor_loc,
active_keys: self.active_keys,
},
}
}
}
impl<F, E, C, I, H, U, const N: usize, S> Db<F, E, C, I, H, U, N, S>
where
F: Family,
E: Context,
U: update::Update + Send + Sync + 'static,
C: Mutable<Item = Operation<F, U>> + crate::Persistable<Error = crate::journal::Error>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
#[tracing::instrument(
name = "qmdb::any::Db::apply_batch",
level = "info",
skip_all,
fields(
batch_total_size = batch.bounds.total_size,
batch_base_size = batch.bounds.base_size,
db_size = *self.last_commit_loc + 1,
ancestor_batches = batch.ancestor_diffs.len() as u64,
),
)]
pub async fn apply_batch(
&mut self,
batch: Arc<MerkleizedBatch<F, H::Digest, U, S>>,
) -> Result<Range<Location<F>>, crate::qmdb::Error<F>> {
let _timer = self.metrics.operations.apply_batch_timer();
self.metrics.operations.apply_batch_calls.inc();
let db_size = *self.last_commit_loc + 1;
batch
.bounds
.validate_apply_to(db_size, self.inactivity_floor_loc)?;
let start_loc = Location::new(db_size);
self.log.apply_batch(&batch.journal_batch).await?;
{
let mut bitmap = self.bitmap.write();
bitmap.extend_to(batch.bounds.total_size);
if batch.ancestor_diffs.is_empty() {
for (key, entry) in batch.diff.iter() {
apply_diff(
&mut self.snapshot,
&mut bitmap,
key,
entry,
entry.base_old_loc(),
);
}
} else {
let mut applied = Vec::with_capacity(batch.ancestor_diffs.len());
let mut pending = Vec::with_capacity(batch.ancestor_diffs.len());
for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
if batch.bounds.ancestors[i].end <= db_size {
applied.push(ancestor_diff.as_slice());
} else {
pending.push(ancestor_diff.as_slice());
}
}
let mut resolver = AppliedAncestorResolver::new(applied);
let merge = DiffMerge::new(
iter::once(batch.diff.as_slice()).chain(pending.iter().copied()),
);
for (key, entry) in merge {
let old = resolver.lookup(key).unwrap_or_else(|| entry.base_old_loc());
apply_diff(&mut self.snapshot, &mut bitmap, key, entry, old);
}
}
bitmap.set_bit(*self.last_commit_loc, false);
bitmap.set_bit(batch.bounds.total_size - 1, true);
}
self.active_keys = batch.total_active_keys;
self.inactivity_floor_loc = batch.bounds.inactivity_floor;
self.last_commit_loc = Location::new(batch.bounds.total_size - 1);
self.root = batch.root;
let end_loc = Location::new(*self.last_commit_loc + 1);
let range = start_loc..end_loc;
self.update_metrics().await;
self.metrics
.operations
.operations_applied
.inc_by(*range.end - *range.start);
Ok(range)
}
}
impl<F: Family, E, C, I, H, U, const N: usize, S> Db<F, E, C, I, H, U, N, S>
where
E: Context,
U: update::Update + Send + Sync,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
#[tracing::instrument(
name = "qmdb::any::Db::to_batch",
level = "info",
skip_all,
fields(
db_size = *self.last_commit_loc + 1,
inactivity_floor = *self.inactivity_floor_loc,
active_keys = self.active_keys as u64,
),
)]
pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U, S>> {
let journal_size = *self.last_commit_loc + 1;
Arc::new(MerkleizedBatch {
journal_batch: self.log.to_merkleized_batch(),
root: self.root,
diff: Arc::new(Vec::new()),
parent: None,
total_active_keys: self.active_keys,
ancestor_diffs: Vec::new(),
bounds: batch_chain::Bounds {
base_size: journal_size,
db_size: journal_size,
total_size: journal_size,
ancestors: Vec::new(),
inactivity_floor: self.inactivity_floor_loc,
},
})
}
}
fn extract_update_value<F: Family, U: update::Update>(op: &Operation<F, U>) -> U::Value {
match op {
Operation::Update(update) => update.value().clone(),
_ => unreachable!("floor raise should only re-append Update operations"),
}
}
#[cfg(any(test, feature = "test-traits"))]
mod trait_impls {
use super::*;
use crate::qmdb::any::traits::{
BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
UnmerkleizedBatch as UnmerkleizedBatchTrait,
};
use std::future::Future;
impl<F, K, V, H, E, C, I, const N: usize, S>
UnmerkleizedBatchTrait<Db<F, E, C, I, H, update::Unordered<K, V>, N, S>>
for UnmerkleizedBatch<F, H, update::Unordered<K, V>, S>
where
F: Family,
K: Key,
V: ValueEncoding + 'static,
H: Hasher,
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>>,
S: Strategy,
Operation<F, update::Unordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Metadata = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, S>>;
fn write(mut self, key: K, value: Option<V::Value>) -> Self {
self.mutations.insert(key, value);
self
}
fn merkleize(
self,
db: &Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
metadata: Option<V::Value>,
) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
self.merkleize(db, metadata)
}
}
impl<F, K, V, H, E, C, I, const N: usize, S>
UnmerkleizedBatchTrait<Db<F, E, C, I, H, update::Ordered<K, V>, N, S>>
for UnmerkleizedBatch<F, H, update::Ordered<K, V>, S>
where
F: Family,
K: Key,
V: ValueEncoding + 'static,
H: Hasher,
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: OrderedIndex<Value = Location<F>>,
S: Strategy,
Operation<F, update::Ordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Metadata = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, S>>;
fn write(mut self, key: K, value: Option<V::Value>) -> Self {
self.mutations.insert(key, value);
self
}
fn merkleize(
self,
db: &Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
metadata: Option<V::Value>,
) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
self.merkleize(db, metadata)
}
}
impl<F: Family, D: Digest, U: update::Update + Send + Sync + 'static, S: Strategy>
MerkleizedBatchTrait for Arc<MerkleizedBatch<F, D, U, S>>
where
Operation<F, U>: Codec,
{
type Digest = D;
fn root(&self) -> D {
MerkleizedBatch::root(self)
}
}
impl<F, E, K, V, C, I, H, const N: usize, S> BatchableDb
for Db<F, E, C, I, H, update::Unordered<K, V>, N, S>
where
F: Family,
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
+ crate::Persistable<Error = crate::journal::Error>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, update::Unordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, S>>;
type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>, S>;
fn new_batch(&self) -> Self::Batch {
self.new_batch()
}
fn apply_batch(
&mut self,
batch: Self::Merkleized,
) -> impl Future<Output = Result<Range<Location<F>>, crate::qmdb::Error<F>>> {
self.apply_batch(batch)
}
}
impl<F, E, K, V, C, I, H, const N: usize, S> BatchableDb
for Db<F, E, C, I, H, update::Ordered<K, V>, N, S>
where
F: Family,
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
+ crate::Persistable<Error = crate::journal::Error>,
I: OrderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, update::Ordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, S>>;
type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>, S>;
fn new_batch(&self) -> Self::Batch {
self.new_batch()
}
fn apply_batch(
&mut self,
batch: Self::Merkleized,
) -> impl Future<Output = Result<Range<Location<F>>, crate::qmdb::Error<F>>> {
self.apply_batch(batch)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
mmr,
qmdb::any::{
ordered::fixed::Db as OrderedFixedDb,
test::{colliding_digest, fixed_db_config},
unordered::fixed::Db as UnorderedFixedDb,
BITMAP_CHUNK_BYTES,
},
translator::OneCap,
};
use commonware_cryptography::{sha256, Sha256};
use commonware_parallel::Sequential;
use commonware_runtime::{deterministic, Runner as _};
const BITMAP_CHUNK_BITS: u64 = bitmap::Prunable::<BITMAP_CHUNK_BYTES>::CHUNK_SIZE_BITS;
fn loc(n: u64) -> Location<mmr::Family> {
Location::new(n)
}
fn shared_with<F>(build: F) -> Shared<BITMAP_CHUNK_BYTES>
where
F: FnOnce(&mut bitmap::Prunable<BITMAP_CHUNK_BYTES>),
{
let mut bm = bitmap::Prunable::<BITMAP_CHUNK_BYTES>::new();
build(&mut bm);
Shared::new(bm)
}
fn active(value: u64, location: u64) -> DiffEntry<mmr::Family, u64> {
DiffEntry::Active {
value,
loc: loc(location),
base_old_loc: None,
}
}
fn deleted(base_old_loc: Option<u64>) -> DiffEntry<mmr::Family, u64> {
DiffEntry::Deleted {
base_old_loc: base_old_loc.map(loc),
}
}
#[test]
fn diff_merge_returns_sorted_newest_entries() {
let child = vec![(2, active(20, 20)), (5, active(50, 50))];
let parent = vec![
(1, active(11, 11)),
(2, active(12, 12)),
(4, deleted(Some(4))),
(7, active(17, 17)),
];
let grandparent = vec![
(2, active(102, 102)),
(3, active(103, 103)),
(4, active(104, 104)),
(6, active(106, 106)),
];
let merged: Vec<_> =
DiffMerge::new([child.as_slice(), parent.as_slice(), grandparent.as_slice()])
.map(|(key, entry)| (*key, entry.value().copied(), entry.loc()))
.collect();
assert_eq!(
merged,
vec![
(1, Some(11), Some(loc(11))),
(2, Some(20), Some(loc(20))),
(3, Some(103), Some(loc(103))),
(4, None, None),
(5, Some(50), Some(loc(50))),
(6, Some(106), Some(loc(106))),
(7, Some(17), Some(loc(17))),
]
);
}
#[test]
fn applied_ancestor_resolver_uses_nearest_touch() {
let parent = vec![(2, active(20, 20)), (5, deleted(Some(5)))];
let grandparent = vec![
(2, active(200, 200)),
(4, active(40, 40)),
(5, active(50, 50)),
];
let mut resolver =
AppliedAncestorResolver::new([parent.as_slice(), grandparent.as_slice()]);
assert_eq!(resolver.lookup(&1), None);
assert_eq!(resolver.lookup(&2), Some(Some(loc(20))));
assert_eq!(resolver.lookup(&4), Some(Some(loc(40))));
assert_eq!(resolver.lookup(&5), Some(None));
assert_eq!(resolver.lookup(&9), None);
}
#[test]
fn bitmap_scan_empty() {
let bitmap = shared_with(|_| {});
assert_eq!(next_candidate(&bitmap, loc(0), 0), None);
}
#[test]
fn bitmap_scan_uncommitted_tail() {
let bitmap = shared_with(|_| {});
assert_eq!(next_candidate(&bitmap, loc(0), 3), Some(loc(0)));
assert_eq!(next_candidate(&bitmap, loc(1), 3), Some(loc(1)));
assert_eq!(next_candidate(&bitmap, loc(2), 3), Some(loc(2)));
assert_eq!(next_candidate(&bitmap, loc(3), 3), None);
}
#[test]
fn bitmap_scan_committed_region() {
let bitmap = shared_with(|bm| {
bm.extend_to(10);
bm.set_bit(*loc(3), true);
bm.set_bit(*loc(7), true);
});
assert_eq!(next_candidate(&bitmap, loc(0), 10), Some(loc(3)));
assert_eq!(next_candidate(&bitmap, loc(4), 10), Some(loc(7)));
assert_eq!(next_candidate(&bitmap, loc(8), 10), None);
assert_eq!(next_candidate(&bitmap, loc(0), 5), Some(loc(3)));
assert_eq!(next_candidate(&bitmap, loc(4), 5), None);
}
#[test]
fn bitmap_scan_transitions_into_tail() {
let bitmap = shared_with(|bm| {
bm.extend_to(5);
bm.set_bit(*loc(2), true);
});
assert_eq!(next_candidate(&bitmap, loc(0), 8), Some(loc(2)));
assert_eq!(next_candidate(&bitmap, loc(3), 8), Some(loc(5)));
assert_eq!(next_candidate(&bitmap, loc(6), 8), Some(loc(6)));
assert_eq!(next_candidate(&bitmap, loc(8), 8), None);
}
#[test]
fn bitmap_scan_after_prune() {
let bitmap = shared_with(|bm| {
bm.extend_to(BITMAP_CHUNK_BITS * 3);
bm.set_bit(*loc(BITMAP_CHUNK_BITS * 2 + 5), true);
bm.prune_to_bit(BITMAP_CHUNK_BITS * 2);
});
assert_eq!(
commonware_utils::bitmap::Readable::pruned_chunks(&bitmap),
2
);
assert_eq!(
next_candidate(&bitmap, loc(BITMAP_CHUNK_BITS * 2), BITMAP_CHUNK_BITS * 3),
Some(loc(BITMAP_CHUNK_BITS * 2 + 5))
);
}
#[test]
fn bitmap_scan_after_truncate() {
let bitmap = shared_with(|bm| {
bm.extend_to(BITMAP_CHUNK_BITS * 2);
bm.set_bit(*loc(BITMAP_CHUNK_BITS + 3), true);
bm.truncate(BITMAP_CHUNK_BITS);
});
assert_eq!(
commonware_utils::bitmap::Readable::<BITMAP_CHUNK_BYTES>::len(&bitmap),
BITMAP_CHUNK_BITS
);
assert_eq!(next_candidate(&bitmap, loc(0), BITMAP_CHUNK_BITS), None);
}
fn extract_parent_deleted_creates<K: Ord + Clone, V: Clone>(
mutations: &mut BTreeMap<K, Option<V>>,
base_diff: &[(K, DiffEntry<mmr::Family, V>)],
) -> Vec<(K, V, Option<crate::mmr::Location>)> {
let creates: Vec<_> = mutations
.iter()
.filter_map(|(key, value)| {
if let Some(DiffEntry::Deleted { base_old_loc }) = lookup_sorted(base_diff, key) {
if let Some(value) = value {
return Some((key.clone(), value.clone(), *base_old_loc));
}
}
None
})
.collect();
for (key, _, _) in &creates {
mutations.remove(key);
}
creates
}
#[test]
fn extract_parent_deleted_creates_basic() {
let mut mutations: BTreeMap<u64, Option<u64>> = BTreeMap::new();
mutations.insert(1, Some(100)); mutations.insert(2, None); mutations.insert(3, Some(300));
let mut base_diff: Vec<(u64, DiffEntry<mmr::Family, u64>)> = vec![
(
1,
DiffEntry::Deleted {
base_old_loc: Some(crate::mmr::Location::new(5)),
},
),
(
4,
DiffEntry::Active {
value: 400,
loc: crate::mmr::Location::new(10),
base_old_loc: None,
},
),
];
base_diff.sort_by_key(|a| a.0);
let creates = extract_parent_deleted_creates(&mut mutations, &base_diff);
assert_eq!(creates.len(), 1);
let (key, value, base_old_loc) = creates.first().unwrap();
assert_eq!(*key, 1);
assert_eq!(*value, 100);
assert_eq!(*base_old_loc, Some(crate::mmr::Location::new(5)));
assert_eq!(mutations.len(), 2);
assert!(mutations.contains_key(&2));
assert!(mutations.contains_key(&3));
}
#[test]
fn extract_parent_deleted_creates_delete_not_extracted() {
let mut mutations: BTreeMap<u64, Option<u64>> = BTreeMap::new();
mutations.insert(1, None);
let base_diff: Vec<(u64, DiffEntry<mmr::Family, u64>)> = vec![(
1,
DiffEntry::Deleted {
base_old_loc: Some(crate::mmr::Location::new(5)),
},
)];
let creates = extract_parent_deleted_creates(&mut mutations, &base_diff);
assert!(creates.is_empty());
assert_eq!(mutations.len(), 1);
assert!(mutations.contains_key(&1));
}
#[test]
fn apply_batch_merges_committed_and_uncommitted_overlaps() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("mixed-ancestor-overlaps", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key_update = Sha256::hash(b"update-through-all-layers");
let key_recreate_then_delete = Sha256::hash(b"recreate-then-delete");
let key_delete_from_uncommitted = Sha256::hash(b"delete-from-uncommitted");
let key_uncommitted_create = Sha256::hash(b"uncommitted-create");
let seed = db
.new_batch()
.write(key_update, Some(Sha256::hash(b"seed-update")))
.write(
key_recreate_then_delete,
Some(Sha256::hash(b"seed-recreate")),
)
.write(
key_delete_from_uncommitted,
Some(Sha256::hash(b"seed-delete")),
)
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
let applied = db
.new_batch()
.write(key_update, Some(Sha256::hash(b"committed-update")))
.write(key_recreate_then_delete, None)
.write(
key_delete_from_uncommitted,
Some(Sha256::hash(b"committed-delete-base")),
)
.merkleize(&db, None)
.await
.unwrap();
let pending = applied
.new_batch::<Sha256>()
.write(key_update, Some(Sha256::hash(b"uncommitted-update")))
.write(
key_recreate_then_delete,
Some(Sha256::hash(b"uncommitted-recreate")),
)
.write(key_delete_from_uncommitted, None)
.write(
key_uncommitted_create,
Some(Sha256::hash(b"uncommitted-create")),
)
.merkleize(&db, None)
.await
.unwrap();
let final_update = Sha256::hash(b"child-update");
let child = pending
.new_batch::<Sha256>()
.write(key_update, Some(final_update))
.write(key_recreate_then_delete, None)
.merkleize(&db, None)
.await
.unwrap();
let expected_root = child.root();
db.apply_batch(applied).await.unwrap();
db.apply_batch(child).await.unwrap();
assert_eq!(db.root(), expected_root);
assert_eq!(db.get(&key_update).await.unwrap(), Some(final_update));
assert_eq!(db.get(&key_recreate_then_delete).await.unwrap(), None);
assert_eq!(db.get(&key_delete_from_uncommitted).await.unwrap(), None);
assert_eq!(
db.get(&key_uncommitted_create).await.unwrap(),
Some(Sha256::hash(b"uncommitted-create"))
);
db.destroy().await.unwrap();
});
}
#[test]
fn read_ops_resolves_committed_ancestor_and_current_sources() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("read-locations-all-sources", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key_db = colliding_digest(0x30, 0);
let value_db = colliding_digest(0x30, 1);
let key_parent = colliding_digest(0x31, 0);
let value_parent = colliding_digest(0x31, 1);
let key_current = colliding_digest(0x32, 0);
let value_current = colliding_digest(0x32, 1);
let seed = db
.new_batch()
.write(key_db, Some(value_db))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let committed_loc = db.snapshot.get(&key_db).next().copied().unwrap();
let parent = db
.new_batch()
.write(key_parent, Some(value_parent))
.merkleize(&db, None)
.await
.unwrap();
let parent_loc = lookup_sorted(parent.diff.as_slice(), &key_parent)
.unwrap()
.loc()
.unwrap();
let child = parent
.new_batch::<Sha256>()
.write(key_current, Some(value_current));
let (_mutations, merkleizer) = child.into_parts();
let current_loc = Location::new(merkleizer.base_size);
let batch_ops = vec![Operation::Update(update::Unordered(
key_current,
value_current,
))];
let reader = db.log.reader().await;
let ops = merkleizer
.read_ops(
&[current_loc, committed_loc, parent_loc, committed_loc],
&batch_ops,
&reader,
)
.await
.unwrap();
drop(reader);
assert_eq!(
ops,
vec![
Operation::Update(update::Unordered(key_current, value_current)),
Operation::Update(update::Unordered(key_db, value_db)),
Operation::Update(update::Unordered(key_parent, value_parent)),
Operation::Update(update::Unordered(key_db, value_db)),
]
);
let reader = db.log.reader().await;
let disk_op = merkleizer
.read_op(committed_loc, &batch_ops, &reader)
.await
.unwrap();
assert_eq!(
disk_op,
Operation::Update(update::Unordered(key_db, value_db))
);
let ancestor_op = merkleizer
.read_op(parent_loc, &batch_ops, &reader)
.await
.unwrap();
assert_eq!(
ancestor_op,
Operation::Update(update::Unordered(key_parent, value_parent))
);
let current_op = merkleizer
.read_op(current_loc, &batch_ops, &reader)
.await
.unwrap();
assert_eq!(
current_op,
Operation::Update(update::Unordered(key_current, value_current))
);
drop(reader);
db.destroy().await.unwrap();
});
}
#[test]
fn child_root_matches_between_pending_and_committed_paths_under_collisions() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("batch-collision-regression", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key_a = colliding_digest(0xAA, 1);
let key_b = colliding_digest(0xAA, 0);
let mut initial = db.new_batch();
for i in 0..4 {
initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
}
let initial = initial.merkleize(&db, None).await.unwrap();
db.apply_batch(initial).await.unwrap();
db.commit().await.unwrap();
let parent = db
.new_batch()
.write(key_a, Some(colliding_digest(0xCC, 1)))
.merkleize(&db, None)
.await
.unwrap();
assert!(
!parent.diff.iter().any(|(k, _)| k == &key_b),
"regression requires a sibling collision to remain only in the committed snapshot"
);
let pending_child = parent
.new_batch::<Sha256>()
.write(key_a, Some(colliding_digest(0xDD, 1)))
.write(key_b, Some(colliding_digest(0xDD, 0)))
.merkleize(&db, None)
.await
.unwrap();
let pending_root = pending_child.root();
db.apply_batch(parent).await.unwrap();
db.commit().await.unwrap();
let committed_child = db
.new_batch()
.write(key_a, Some(colliding_digest(0xDD, 1)))
.write(key_b, Some(colliding_digest(0xDD, 0)))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(pending_root, committed_child.root());
db.apply_batch(pending_child).await.unwrap();
assert_eq!(db.root(), committed_child.root());
db.destroy().await.unwrap();
});
}
#[test]
fn ordered_child_root_matches_between_pending_and_committed_paths_under_collisions() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = OrderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("ordered-batch-collision-regression", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key_a = colliding_digest(0xAA, 1);
let key_b = colliding_digest(0xAA, 0);
let mut initial = db.new_batch();
for i in 0..4 {
initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
}
let initial = initial.merkleize(&db, None).await.unwrap();
db.apply_batch(initial).await.unwrap();
db.commit().await.unwrap();
let parent = db
.new_batch()
.write(key_a, Some(colliding_digest(0xCC, 1)))
.merkleize(&db, None)
.await
.unwrap();
assert!(
!parent.diff.iter().any(|(k, _)| k == &key_b),
"ordered regression requires a sibling collision to remain only in the committed snapshot"
);
let pending_child = parent
.new_batch::<Sha256>()
.write(key_a, Some(colliding_digest(0xDD, 1)))
.write(key_b, Some(colliding_digest(0xDD, 0)))
.merkleize(&db, None)
.await
.unwrap();
let pending_root = pending_child.root();
db.apply_batch(parent).await.unwrap();
db.commit().await.unwrap();
let committed_child = db
.new_batch()
.write(key_a, Some(colliding_digest(0xDD, 1)))
.write(key_b, Some(colliding_digest(0xDD, 0)))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(pending_root, committed_child.root());
db.apply_batch(pending_child).await.unwrap();
assert_eq!(db.root(), committed_child.root());
db.destroy().await.unwrap();
});
}
#[test]
fn sequential_commit_basic() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("seq-commit-basic", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let seed = db
.new_batch()
.write(colliding_digest(0x01, 0), Some(colliding_digest(0x01, 1)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let key_a = colliding_digest(0x02, 0);
let val_a = colliding_digest(0x02, 1);
let batch_a = db
.new_batch()
.write(key_a, Some(val_a))
.merkleize(&db, None)
.await
.unwrap();
let key_b = colliding_digest(0x03, 0);
let val_b = colliding_digest(0x03, 1);
let batch_b = batch_a
.new_batch::<Sha256>()
.write(key_b, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(batch_a).await.unwrap();
db.commit().await.unwrap();
let committed_b = db
.new_batch()
.write(key_b, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(batch_b.root(), committed_b.root());
db.apply_batch(batch_b).await.unwrap();
assert_eq!(db.root(), committed_b.root());
db.destroy().await.unwrap();
});
}
#[test]
fn sequential_commit_fixes_base_old_loc() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("seq-commit-base-old-loc", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key = colliding_digest(0x10, 0);
let seed = db
.new_batch()
.write(key, Some(colliding_digest(0x10, 1)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let val_a = colliding_digest(0x10, 2);
let batch_a = db
.new_batch()
.write(key, Some(val_a))
.merkleize(&db, None)
.await
.unwrap();
let a_entry = lookup_sorted(batch_a.diff.as_slice(), &key).unwrap();
let a_loc = a_entry.loc();
assert!(a_loc.is_some());
let val_b = colliding_digest(0x10, 3);
let batch_b = batch_a
.new_batch::<Sha256>()
.write(key, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(batch_a).await.unwrap();
db.commit().await.unwrap();
let committed_b = db
.new_batch()
.write(key, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(batch_b.root(), committed_b.root());
db.apply_batch(batch_b).await.unwrap();
assert_eq!(db.root(), committed_b.root());
db.destroy().await.unwrap();
});
}
#[test]
fn fork_apply_after_parent_committed() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("fork-after-commit", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let seed = db
.new_batch()
.write(colliding_digest(0x20, 0), Some(colliding_digest(0x20, 1)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let key_a = colliding_digest(0x21, 0);
let val_a = colliding_digest(0x21, 1);
let batch_a = db
.new_batch()
.write(key_a, Some(val_a))
.merkleize(&db, None)
.await
.unwrap();
let key_b = colliding_digest(0x22, 0);
let val_b = colliding_digest(0x22, 1);
let batch_b = batch_a
.new_batch::<Sha256>()
.write(key_b, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
let key_c = colliding_digest(0x23, 0);
let val_c = colliding_digest(0x23, 1);
let batch_c = batch_a
.new_batch::<Sha256>()
.write(key_c, Some(val_c))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(batch_a).await.unwrap();
db.commit().await.unwrap();
let committed_b = db
.new_batch()
.write(key_b, Some(val_b))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(batch_b.root(), committed_b.root());
let committed_c = db
.new_batch()
.write(key_c, Some(val_c))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(batch_c.root(), committed_c.root());
db.destroy().await.unwrap();
});
}
#[test]
fn sequential_commit_three_deep() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("ff-cross", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let grandparent = db
.new_batch()
.write(colliding_digest(0x01, 0), Some(colliding_digest(0x01, 1)))
.write(colliding_digest(0x02, 0), Some(colliding_digest(0x02, 1)))
.merkleize(&db, None)
.await
.unwrap();
let parent = grandparent
.new_batch::<Sha256>()
.write(colliding_digest(0x03, 0), Some(colliding_digest(0x03, 1)))
.merkleize(&db, None)
.await
.unwrap();
let child = parent
.new_batch::<Sha256>()
.write(colliding_digest(0x04, 0), Some(colliding_digest(0x04, 1)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(grandparent).await.unwrap();
db.commit().await.unwrap();
db.apply_batch(parent).await.unwrap();
db.commit().await.unwrap();
db.apply_batch(child).await.unwrap();
for i in 1..=4 {
assert_eq!(
db.get(&colliding_digest(i, 0)).await.unwrap(),
Some(colliding_digest(i, 1))
);
}
db.destroy().await.unwrap();
});
}
#[test]
fn recreate_deleted_key_with_collision_sibling_root_matches() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("recreate-deleted-collision", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let k0 = colliding_digest(0xAA, 0);
let k6 = colliding_digest(0xAA, 6);
let initial = db
.new_batch()
.write(k0, Some(colliding_digest(0xBB, 0)))
.write(k6, Some(colliding_digest(0xBB, 6)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(initial).await.unwrap();
db.commit().await.unwrap();
let parent = db
.new_batch()
.write(k0, None)
.merkleize(&db, None)
.await
.unwrap();
let k29 = colliding_digest(0xAA, 29);
let pending_child = parent
.new_batch::<Sha256>()
.write(k0, Some(colliding_digest(0xCC, 0)))
.write(k29, Some(colliding_digest(0xCC, 29)))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(parent).await.unwrap();
db.commit().await.unwrap();
let committed_child = db
.new_batch()
.write(k0, Some(colliding_digest(0xCC, 0)))
.write(k29, Some(colliding_digest(0xCC, 29)))
.merkleize(&db, None)
.await
.unwrap();
assert_eq!(
pending_child.root(),
committed_child.root(),
"root depended on pending-vs-committed parent path \
when re-creating a deleted key with collision siblings"
);
db.destroy().await.unwrap();
});
}
#[test]
fn get_many_resolves_mutation_parent_and_db() {
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type TestDb = UnorderedFixedDb<
mmr::Family,
deterministic::Context,
sha256::Digest,
sha256::Digest,
Sha256,
OneCap,
Sequential,
>;
let config = fixed_db_config::<OneCap>("get-many-basic", &context);
let mut db = TestDb::init(context, config).await.unwrap();
let key_db = colliding_digest(0x40, 0);
let val_db = colliding_digest(0x40, 1);
let key_parent = colliding_digest(0x41, 0);
let val_parent = colliding_digest(0x41, 1);
let key_batch = colliding_digest(0x42, 0);
let val_batch = colliding_digest(0x42, 1);
let key_missing = colliding_digest(0x43, 0);
let seed = db
.new_batch()
.write(key_db, Some(val_db))
.merkleize(&db, None)
.await
.unwrap();
db.apply_batch(seed).await.unwrap();
db.commit().await.unwrap();
let results = db.get_many(&[&key_db, &key_missing]).await.unwrap();
assert_eq!(results, vec![Some(val_db), None]);
let batch = db.new_batch().write(key_batch, Some(val_batch));
let results = batch
.get_many(&[&key_batch, &key_db, &key_missing], &db)
.await
.unwrap();
assert_eq!(results, vec![Some(val_batch), Some(val_db), None]);
let parent = db
.new_batch()
.write(key_parent, Some(val_parent))
.merkleize(&db, None)
.await
.unwrap();
let child = parent
.new_batch::<Sha256>()
.write(key_batch, Some(val_batch));
let results = child
.get_many(&[&key_batch, &key_parent, &key_db, &key_missing], &db)
.await
.unwrap();
assert_eq!(
results,
vec![Some(val_batch), Some(val_parent), Some(val_db), None]
);
let results = parent
.get_many(&[&key_parent, &key_db, &key_missing], &db)
.await
.unwrap();
assert_eq!(results, vec![Some(val_parent), Some(val_db), None]);
let results: Vec<Option<sha256::Digest>> =
db.get_many(&([] as [&sha256::Digest; 0])).await.unwrap();
assert!(results.is_empty());
db.destroy().await.unwrap();
});
}
}