use crate::{
index::Unordered as UnorderedIndex,
journal::contiguous::{Contiguous, Mutable},
merkle::{
self, batch::MerkleizedBatch as GenericMerkleizedBatch, mem::Mem,
storage::Storage as MerkleStorage, Graftable, Location, Position, Readable,
},
qmdb::{
self,
any::{
self,
batch::{lookup_sorted, DiffEntry},
operation::{update, Operation},
ValueEncoding,
},
batch_chain::Bounds,
bitmap::Shared,
current::{
db::{compute_db_root, compute_grafted_leaves},
grafting,
},
operation::Key,
Error,
},
Context,
};
use ahash::AHasher;
use commonware_codec::Codec;
use commonware_cryptography::{Digest, Hasher};
use commonware_parallel::Strategy;
use commonware_utils::bitmap::{self, Readable as _};
use std::{
collections::{BTreeSet, HashMap},
hash::BuildHasherDefault,
sync::Arc,
};
#[derive(Clone, Debug, Default)]
pub(crate) struct ChunkOverlay<const N: usize> {
pub(crate) chunks: HashMap<usize, [u8; N], BuildHasherDefault<AHasher>>,
pub(crate) len: u64,
}
impl<const N: usize> ChunkOverlay<N> {
const CHUNK_BITS: u64 = bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
fn new(len: u64) -> Self {
Self {
chunks: HashMap::default(),
len,
}
}
fn chunk_mut<B: bitmap::Readable<N>>(&mut self, base: &B, idx: usize) -> &mut [u8; N] {
self.chunks.entry(idx).or_insert_with(|| {
let base_len = base.len();
let base_complete = base.complete_chunks();
let base_has_partial = !base_len.is_multiple_of(Self::CHUNK_BITS);
if idx < base_complete {
base.get_chunk(idx)
} else if idx == base_complete && base_has_partial {
base.last_chunk().0
} else {
[0u8; N]
}
})
}
fn set_bit<B: bitmap::Readable<N>>(&mut self, base: &B, loc: u64) {
let idx = bitmap::Prunable::<N>::to_chunk_index(loc);
let rel = (loc % Self::CHUNK_BITS) as usize;
let chunk = self.chunk_mut(base, idx);
chunk[rel / 8] |= 1 << (rel % 8);
}
fn clear_bit<B: bitmap::Readable<N>>(&mut self, base: &B, pruned_chunks: usize, loc: u64) {
let idx = bitmap::Prunable::<N>::to_chunk_index(loc);
if idx < pruned_chunks {
return;
}
let rel = (loc % Self::CHUNK_BITS) as usize;
let chunk = self.chunk_mut(base, idx);
chunk[rel / 8] &= !(1 << (rel % 8));
}
pub(crate) fn get(&self, idx: usize) -> Option<&[u8; N]> {
self.chunks.get(&idx)
}
pub(crate) const fn complete_chunks(&self) -> usize {
(self.len / Self::CHUNK_BITS) as usize
}
}
pub(crate) fn next_candidate<F: Graftable, B: bitmap::Readable<N>, const N: usize>(
bitmap: &B,
floor: Location<F>,
tip: u64,
) -> Option<Location<F>> {
let floor = *floor;
let bitmap_len = bitmap.len();
let committed_end = bitmap_len.min(tip);
if floor < committed_end {
if let Some(idx) = bitmap.ones_iter_from(floor).next() {
if idx < committed_end {
return Some(Location::<F>::new(idx));
}
}
}
let candidate = floor.max(bitmap_len);
(candidate < tip).then(|| Location::<F>::new(candidate))
}
struct BatchStorageAdapter<
'a,
F: Graftable,
D: Digest,
R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
S: MerkleStorage<F, Digest = D>,
> {
batch: &'a R,
base: &'a S,
_phantom: core::marker::PhantomData<(F, D)>,
}
impl<
'a,
F: Graftable,
D: Digest,
R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
S: MerkleStorage<F, Digest = D>,
> BatchStorageAdapter<'a, F, D, R, S>
{
const fn new(batch: &'a R, base: &'a S) -> Self {
Self {
batch,
base,
_phantom: core::marker::PhantomData,
}
}
}
impl<
F: Graftable,
D: Digest,
R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
S: MerkleStorage<F, Digest = D>,
> MerkleStorage<F> for BatchStorageAdapter<'_, F, D, R, S>
{
type Digest = D;
async fn size(&self) -> Position<F> {
self.batch.size()
}
async fn get_node(&self, pos: Position<F>) -> Result<Option<D>, merkle::Error<F>> {
if let Some(node) = self.batch.get_node(pos) {
return Ok(Some(node));
}
self.base.get_node(pos).await
}
}
struct BatchOverMem<'a, F: Graftable, D: Digest, S: Strategy> {
batch: &'a GenericMerkleizedBatch<F, D, S>,
mem: &'a Mem<F, D>,
}
impl<F: Graftable, D: Digest, S: Strategy> Readable for BatchOverMem<'_, F, D, S> {
type Family = F;
type Digest = D;
type Error = merkle::Error<F>;
fn size(&self) -> Position<F> {
self.batch.size()
}
fn get_node(&self, pos: Position<F>) -> Option<D> {
if let Some(d) = self.batch.get_node(pos) {
return Some(d);
}
self.mem.get_node(pos)
}
fn pruning_boundary(&self) -> Location<F> {
self.batch.pruning_boundary()
}
}
pub struct UnmerkleizedBatch<F, H, U, const N: usize, S: Strategy>
where
F: Graftable,
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
inner: any::batch::UnmerkleizedBatch<F, H, U, S>,
grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,
bitmap_parent: BitmapBatch<N>,
}
pub struct MerkleizedBatch<
F: Graftable,
D: Digest,
U: update::Update + Send + Sync,
const N: usize,
S: Strategy,
> where
Operation<F, U>: Send + Sync,
{
pub(crate) inner: Arc<any::batch::MerkleizedBatch<F, D, U, S>>,
pub(crate) grafted: Arc<merkle::batch::MerkleizedBatch<F, D, S>>,
pub(crate) bitmap: BitmapBatch<N>,
pub(crate) canonical_root: D,
}
impl<F, H, U, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, U, N, S>
where
F: Graftable,
U: update::Update + Send + Sync,
H: Hasher,
Operation<F, U>: Codec,
{
pub(super) const fn new(
inner: any::batch::UnmerkleizedBatch<F, H, U, S>,
grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,
bitmap_parent: BitmapBatch<N>,
) -> Self {
Self {
inner,
grafted_parent,
bitmap_parent,
}
}
pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
self.inner = self.inner.write(key, value);
self
}
}
impl<F, K, V, H, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>
where
F: Graftable,
K: Key,
V: ValueEncoding,
H: Hasher,
Operation<F, update::Unordered<K, V>>: Codec,
{
pub async fn get<E, C, I>(
&self,
key: &K,
db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
) -> Result<Option<V::Value>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
{
self.inner.get(key, &db.any).await
}
pub async fn get_many<E, C, I>(
&self,
keys: &[&K],
db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
) -> Result<Vec<Option<V::Value>>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
{
self.inner.get_many(keys, &db.any).await
}
pub async fn merkleize<E, C, I>(
self,
db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
metadata: Option<V::Value>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N, S>>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
{
let Self {
inner,
grafted_parent,
bitmap_parent,
} = self;
let inner = inner
.merkleize_with_floor_scan(&db.any, metadata, |floor, tip| {
next_candidate(&bitmap_parent, floor, tip)
})
.await?;
compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
}
}
impl<F, K, V, H, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>
where
F: Graftable,
K: Key,
V: ValueEncoding,
H: Hasher,
Operation<F, update::Ordered<K, V>>: Codec,
{
pub async fn get<E, C, I>(
&self,
key: &K,
db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
) -> Result<Option<V::Value>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: crate::index::Ordered<Value = Location<F>> + 'static,
{
self.inner.get(key, &db.any).await
}
pub async fn get_many<E, C, I>(
&self,
keys: &[&K],
db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
) -> Result<Vec<Option<V::Value>>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: crate::index::Ordered<Value = Location<F>> + 'static,
{
self.inner.get_many(keys, &db.any).await
}
pub async fn merkleize<E, C, I>(
self,
db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
metadata: Option<V::Value>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N, S>>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
I: crate::index::Ordered<Value = Location<F>> + 'static,
{
let Self {
inner,
grafted_parent,
bitmap_parent,
} = self;
let inner = inner
.merkleize_with_floor_scan(&db.any, metadata, |floor, tip| {
next_candidate(&bitmap_parent, floor, tip)
})
.await?;
compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
}
}
#[allow(clippy::type_complexity)]
fn build_chunk_overlay<F: Graftable, U, B: bitmap::Readable<N>, const N: usize>(
base: &B,
batch_len: usize,
batch_base: u64,
diff: &[(U::Key, DiffEntry<F, U::Value>)],
ancestor_diffs: &[Arc<Vec<(U::Key, DiffEntry<F, U::Value>)>>],
) -> ChunkOverlay<N>
where
U: update::Update,
{
let total_bits = base.len() + batch_len as u64;
let mut overlay = ChunkOverlay::new(total_bits);
let pruned_chunks = base.pruned_chunks();
let commit_loc = batch_base + batch_len as u64 - 1;
overlay.set_bit(base, commit_loc);
overlay.clear_bit(base, pruned_chunks, batch_base - 1);
for (key, entry) in diff {
if let Some(loc) = entry.loc() {
if *loc >= batch_base && *loc < batch_base + batch_len as u64 {
overlay.set_bit(base, *loc);
}
}
let mut prev_loc = entry.base_old_loc();
for ancestor_diff in ancestor_diffs {
if let Some(ancestor_entry) = lookup_sorted(ancestor_diff.as_slice(), key) {
prev_loc = ancestor_entry.loc();
break;
}
}
if let Some(old) = prev_loc {
overlay.clear_bit(base, pruned_chunks, *old);
}
}
let parent_complete = base.complete_chunks();
let new_complete = overlay.complete_chunks();
for idx in parent_complete..new_complete {
overlay.chunk_mut(base, idx);
}
overlay
}
async fn compute_current_layer<F, E, U, C, I, H, const N: usize, S>(
inner: Arc<any::batch::MerkleizedBatch<F, H::Digest, U, S>>,
current_db: &super::db::Db<F, E, C, I, H, U, N, S>,
grafted_parent: &Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,
bitmap_parent: &BitmapBatch<N>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, U, N, S>>, Error<F>>
where
F: Graftable,
E: Context,
U: update::Update + Send + Sync,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
let batch_len = inner.journal_batch.items().len();
let batch_base = inner.bounds.total_size - batch_len as u64;
let overlay = build_chunk_overlay::<F, U, _, N>(
bitmap_parent,
batch_len,
batch_base,
&inner.diff,
&inner.ancestor_diffs,
);
let grafting_height = grafting::height::<N>();
let ops_tree_adapter =
BatchStorageAdapter::new(&inner.journal_batch, ¤t_db.any.log.merkle);
let overlay_ops_leaves = Location::<F>::new(inner.bounds.total_size);
let new_complete_chunks = overlay.complete_chunks();
let graftable_overlay = grafting::graftable_chunks::<F>(*overlay_ops_leaves, grafting_height)
.min(new_complete_chunks as u64) as usize;
let graftable_parent = *grafted_parent.leaves() as usize;
let pruned_chunks = bitmap_parent.pruned_chunks();
assert!(
pruned_chunks <= graftable_parent && graftable_parent <= graftable_overlay && graftable_overlay <= new_complete_chunks,
"invariant violated: pruned={pruned_chunks} graftable_parent={graftable_parent} graftable_overlay={graftable_overlay} new_complete={new_complete_chunks}"
);
let mut chunk_indices_to_update: BTreeSet<usize> = overlay
.chunks
.iter()
.filter(|(&idx, _)| idx < graftable_overlay && idx >= pruned_chunks)
.map(|(&idx, _)| idx)
.collect();
for idx in graftable_parent..graftable_overlay {
chunk_indices_to_update.insert(idx);
}
let chunks_to_update = chunk_indices_to_update.into_iter().map(|idx| {
let chunk = overlay
.get(idx)
.copied()
.unwrap_or_else(|| bitmap_parent.get_chunk(idx));
(idx, chunk)
});
let hasher = qmdb::hasher::<H>();
let new_leaves = compute_grafted_leaves::<F, H, S, N>(
&hasher,
&ops_tree_adapter,
chunks_to_update,
¤t_db.strategy,
)
.await?;
let grafted_batch = {
let mut grafted_batch = grafted_parent.new_batch();
let old_grafted_leaves = *grafted_parent.leaves() as usize;
for &(chunk_idx, digest) in &new_leaves {
if chunk_idx < old_grafted_leaves {
grafted_batch = grafted_batch
.update_leaf_digest(Location::<F>::new(chunk_idx as u64), digest)
.expect("update_leaf_digest failed");
} else {
grafted_batch = grafted_batch.add_leaf_digest(digest);
}
}
let gh = grafting::GraftedHasher::<F, _>::new(hasher.clone(), grafting_height);
grafted_batch.merkleize(¤t_db.grafted_tree, &gh)
};
let bitmap_batch = BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
parent: bitmap_parent.clone(),
overlay: Arc::new(overlay),
shared: Arc::clone(bitmap_parent.shared()),
}));
let ops_root = inner.root();
let layered = BatchOverMem {
batch: &grafted_batch,
mem: ¤t_db.grafted_tree,
};
let grafted_storage =
grafting::Storage::new(&layered, grafting_height, &ops_tree_adapter, hasher.clone());
let partial = {
let rem = bitmap_batch.len() % BitmapBatch::<N>::CHUNK_SIZE_BITS;
if rem == 0 {
None
} else {
let idx = new_complete_chunks;
let chunk = bitmap_batch.get_chunk(idx);
Some((chunk, rem))
}
};
let canonical_root = compute_db_root::<F, H, _, _, N>(
&hasher,
&bitmap_batch,
&grafted_storage,
overlay_ops_leaves,
partial,
inner.bounds.inactivity_floor,
&ops_root,
)
.await?;
Ok(Arc::new(MerkleizedBatch {
inner,
grafted: grafted_batch,
bitmap: bitmap_batch,
canonical_root,
}))
}
#[derive(Clone, Debug)]
pub(crate) enum BitmapBatch<const N: usize> {
Base(Arc<Shared<N>>),
Layer(Arc<BitmapBatchLayer<N>>),
}
#[derive(Debug)]
pub(crate) struct BitmapBatchLayer<const N: usize> {
pub(crate) parent: BitmapBatch<N>,
pub(crate) overlay: Arc<ChunkOverlay<N>>,
pub(crate) shared: Arc<Shared<N>>,
}
impl<const N: usize> BitmapBatch<N> {
const CHUNK_SIZE_BITS: u64 = bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
fn shared(&self) -> &Arc<Shared<N>> {
match self {
Self::Base(s) => s,
Self::Layer(layer) => &layer.shared,
}
}
fn trim_committed(&self) -> Self {
let shared = self.shared();
let committed = bitmap::Readable::<N>::len(shared.as_ref());
let mut kept = Vec::new();
let mut current = self;
while let Self::Layer(layer) = current {
if layer.overlay.len <= committed {
break;
}
kept.push(Arc::clone(&layer.overlay));
current = &layer.parent;
}
let mut result = Self::Base(Arc::clone(shared));
for overlay in kept.into_iter().rev() {
result = Self::Layer(Arc::new(BitmapBatchLayer {
parent: result,
overlay,
shared: Arc::clone(shared),
}));
}
result
}
}
impl<const N: usize> bitmap::Readable<N> for BitmapBatch<N> {
fn complete_chunks(&self) -> usize {
(self.len() / Self::CHUNK_SIZE_BITS) as usize
}
fn get_chunk(&self, idx: usize) -> [u8; N] {
let mut current = self;
loop {
match current {
Self::Base(shared) => return shared.get_chunk(idx),
Self::Layer(layer) => {
if let Some(&chunk) = layer.overlay.get(idx) {
return chunk;
}
current = &layer.parent;
}
}
}
}
fn last_chunk(&self) -> ([u8; N], u64) {
let total = self.len();
if total == 0 {
return ([0u8; N], 0);
}
let rem = total % Self::CHUNK_SIZE_BITS;
let bits_in_last = if rem == 0 { Self::CHUNK_SIZE_BITS } else { rem };
let idx = if rem == 0 {
self.complete_chunks().saturating_sub(1)
} else {
self.complete_chunks()
};
(self.get_chunk(idx), bits_in_last)
}
fn pruned_chunks(&self) -> usize {
self.shared().pruned_chunks()
}
fn len(&self) -> u64 {
match self {
Self::Base(shared) => bitmap::Readable::<N>::len(shared.as_ref()),
Self::Layer(layer) => layer.overlay.len,
}
}
}
impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize, S: Strategy>
MerkleizedBatch<F, D, U, N, S>
where
Operation<F, U>: Send + Sync,
{
pub const fn root(&self) -> D {
self.canonical_root
}
pub fn ops_root(&self) -> D {
self.inner.root()
}
pub fn bounds(&self) -> &Bounds<F> {
self.inner.bounds()
}
pub fn sync_boundary(&self) -> Location<F> {
super::db::sync_boundary::<F, N>(
self.bitmap.pruned_chunks() as u64,
self.inner.bounds().total_size,
)
}
}
impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize, S: Strategy>
MerkleizedBatch<F, D, U, N, S>
where
Operation<F, U>: Codec,
{
pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U, N, S>
where
H: Hasher<Digest = D>,
{
UnmerkleizedBatch::new(
self.inner.new_batch::<H>(),
Arc::clone(&self.grafted),
self.bitmap.trim_committed(),
)
}
pub async fn get<E, C, I, H>(
&self,
key: &U::Key,
db: &super::db::Db<F, E, C, I, H, U, N, S>,
) -> Result<Option<U::Value>, Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher<Digest = D>,
{
self.inner.get(key, &db.any).await
}
pub async fn get_many<E, C, I, H>(
&self,
keys: &[&U::Key],
db: &super::db::Db<F, E, C, I, H, U, N, S>,
) -> Result<Vec<Option<U::Value>>, Error<F>>
where
E: Context,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher<Digest = D>,
{
self.inner.get_many(keys, &db.any).await
}
}
impl<F, E, C, I, H, U, const N: usize, S> super::db::Db<F, E, C, I, H, U, N, S>
where
F: Graftable,
E: Context,
U: update::Update + Send + Sync,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U, N, S>> {
let grafted = self.grafted_snapshot();
Arc::new(MerkleizedBatch {
inner: self.any.to_batch(),
grafted,
bitmap: BitmapBatch::Base(Arc::clone(&self.any.bitmap)),
canonical_root: self.root,
})
}
}
#[cfg(any(test, feature = "test-traits"))]
mod trait_impls {
use super::*;
use crate::{
journal::contiguous::Mutable,
qmdb::any::traits::{
BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
UnmerkleizedBatch as UnmerkleizedBatchTrait,
},
Persistable,
};
use std::future::Future;
type CurrentDb<F, E, C, I, H, U, const N: usize, S> =
crate::qmdb::current::db::Db<F, E, C, I, H, U, N, S>;
impl<F, K, V, H, E, C, I, const N: usize, S>
UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>>
for UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>
where
F: Graftable,
K: Key,
V: ValueEncoding + 'static,
H: Hasher,
E: Context,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
+ Persistable<Error = crate::journal::Error>,
I: UnorderedIndex<Value = Location<F>> + 'static,
S: Strategy,
Operation<F, update::Unordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Metadata = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N, S>>;
fn write(self, key: K, value: Option<V::Value>) -> Self {
Self::write(self, key, value)
}
async fn merkleize(
self,
db: &CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>,
metadata: Option<V::Value>,
) -> Result<Self::Merkleized, crate::qmdb::Error<F>> {
self.merkleize(db, metadata).await
}
}
impl<F, K, V, H, E, C, I, const N: usize, S>
UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>>
for UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>
where
F: Graftable,
K: Key,
V: ValueEncoding + 'static,
H: Hasher,
E: Context,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
+ Persistable<Error = crate::journal::Error>,
I: crate::index::Ordered<Value = Location<F>> + 'static,
S: Strategy,
Operation<F, update::Ordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Metadata = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N, S>>;
fn write(self, key: K, value: Option<V::Value>) -> Self {
Self::write(self, key, value)
}
async fn merkleize(
self,
db: &CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>,
metadata: Option<V::Value>,
) -> Result<Self::Merkleized, crate::qmdb::Error<F>> {
self.merkleize(db, metadata).await
}
}
impl<
F: Graftable,
D: Digest,
U: update::Update + Send + Sync + 'static,
const N: usize,
S: Strategy,
> MerkleizedBatchTrait for Arc<MerkleizedBatch<F, D, U, N, S>>
where
Operation<F, U>: Codec,
{
type Digest = D;
fn root(&self) -> D {
MerkleizedBatch::root(self)
}
}
impl<F, E, K, V, C, I, H, const N: usize, S> BatchableDb
for CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>
where
F: Graftable,
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
+ Persistable<Error = crate::journal::Error>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher,
S: Strategy,
Operation<F, update::Unordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N, S>>;
type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>;
fn new_batch(&self) -> Self::Batch {
self.new_batch()
}
fn apply_batch(
&mut self,
batch: Self::Merkleized,
) -> impl Future<Output = Result<core::ops::Range<Location<F>>, crate::qmdb::Error<F>>>
{
self.apply_batch(batch)
}
}
impl<F, E, K, V, C, I, H, const N: usize, S> BatchableDb
for CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>
where
F: Graftable,
E: Context,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
+ Persistable<Error = crate::journal::Error>,
I: crate::index::Ordered<Value = Location<F>> + 'static,
H: Hasher,
S: Strategy,
Operation<F, update::Ordered<K, V>>: Codec,
{
type Family = F;
type K = K;
type V = V::Value;
type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N, S>>;
type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>;
fn new_batch(&self) -> Self::Batch {
self.new_batch()
}
fn apply_batch(
&mut self,
batch: Self::Merkleized,
) -> impl Future<Output = Result<core::ops::Range<Location<F>>, crate::qmdb::Error<F>>>
{
self.apply_batch(batch)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mmr;
use commonware_utils::bitmap::Prunable as BitMap;
const N: usize = 4;
type Bm = BitMap<N>;
type Location = mmr::Location;
fn make_bitmap(bits: &[bool]) -> Bm {
let mut bm = Bm::new();
for &b in bits {
bm.push(b);
}
bm
}
#[test]
fn chunk_overlay_pushes() {
use crate::qmdb::any::value::FixedEncoding;
use commonware_utils::sequence::FixedBytes;
type K = FixedBytes<4>;
type V = FixedEncoding<u64>;
type U = crate::qmdb::any::operation::update::Unordered<K, V>;
let key1 = FixedBytes::from([1, 0, 0, 0]);
let key2 = FixedBytes::from([2, 0, 0, 0]);
let base = make_bitmap(&[true; 4]);
let mut diff = vec![
(
key1,
DiffEntry::Active {
value: 100u64,
loc: Location::new(4), base_old_loc: None,
},
),
(
key2,
DiffEntry::Active {
value: 200u64,
loc: Location::new(99), base_old_loc: None,
},
),
];
diff.sort_by(|a, b| a.0.cmp(&b.0));
let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 4, 4, &diff, &[]);
let c0 = overlay.get(0).expect("chunk 0 should be dirty");
assert_ne!(c0[0] & (1 << 4), 0); assert_eq!(c0[0] & (1 << 5), 0); assert_eq!(c0[0] & (1 << 6), 0); assert_ne!(c0[0] & (1 << 7), 0); assert_eq!(c0[0] & (1 << 3), 0); }
#[test]
fn chunk_overlay_clears() {
use crate::qmdb::any::value::FixedEncoding;
use commonware_utils::sequence::FixedBytes;
type K = FixedBytes<4>;
type U = crate::qmdb::any::operation::update::Unordered<K, FixedEncoding<u64>>;
let key1 = FixedBytes::from([1, 0, 0, 0]);
let key2 = FixedBytes::from([2, 0, 0, 0]);
let key3 = FixedBytes::from([3, 0, 0, 0]);
let base = make_bitmap(&[true; 64]);
let mut diff: Vec<(K, DiffEntry<mmr::Family, u64>)> = vec![
(
key1,
DiffEntry::Active {
value: 100,
loc: Location::new(70),
base_old_loc: Some(Location::new(5)),
},
),
(
key2,
DiffEntry::Deleted {
base_old_loc: Some(Location::new(10)),
},
),
(
key3,
DiffEntry::Active {
value: 300,
loc: Location::new(71),
base_old_loc: None,
},
),
];
diff.sort_by(|a, b| a.0.cmp(&b.0));
let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 8, 64, &diff, &[]);
let c0 = overlay.get(0).expect("chunk 0 should be dirty");
assert_eq!(c0[0] & (1 << 5), 0); assert_eq!(c0[1] & (1 << 2), 0);
assert_eq!(c0[0] & (1 << 4), 1 << 4); assert_eq!(c0[1] & (1 << 3), 1 << 3); }
#[test]
fn chunk_overlay_preserves_partial_parent_chunk() {
use crate::qmdb::any::value::FixedEncoding;
use commonware_utils::sequence::FixedBytes;
type K = FixedBytes<4>;
type U = crate::qmdb::any::operation::update::Unordered<K, FixedEncoding<u64>>;
let base = make_bitmap(&[true; 20]);
assert_eq!(base.complete_chunks(), 0);
let key1 = FixedBytes::from([1, 0, 0, 0]);
let mut diff = vec![(
key1,
DiffEntry::Active {
value: 42u64,
loc: Location::new(35),
base_old_loc: None,
},
)];
diff.sort_by(|a, b| a.0.cmp(&b.0));
let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 20, 20, &diff, &[]);
let c0 = overlay.get(0).expect("chunk 0 should be in overlay");
assert_eq!(c0[0], 0xFF);
assert_eq!(c0[1], 0xFF);
assert_eq!(c0[2], 0x07);
}
#[test]
fn bitmap_scan_all_active() {
let bm = make_bitmap(&[true; 8]);
for i in 0..8 {
assert_eq!(
next_candidate(&bm, Location::new(i), 8),
Some(Location::new(i))
);
}
assert_eq!(next_candidate(&bm, Location::new(8), 8), None);
}
#[test]
fn bitmap_scan_all_inactive() {
let bm = make_bitmap(&[false; 8]);
assert_eq!(next_candidate(&bm, Location::new(0), 8), None);
}
#[test]
fn bitmap_scan_skips_inactive() {
let bm = make_bitmap(&[false, false, true, false, true]);
assert_eq!(
next_candidate(&bm, Location::new(0), 5),
Some(Location::new(2))
);
assert_eq!(
next_candidate(&bm, Location::new(3), 5),
Some(Location::new(4))
);
assert_eq!(next_candidate(&bm, Location::new(5), 5), None);
}
#[test]
fn bitmap_scan_beyond_bitmap_len_returns_candidate() {
let bm = make_bitmap(&[false; 4]);
assert_eq!(
next_candidate(&bm, Location::new(0), 8),
Some(Location::new(4))
);
assert_eq!(
next_candidate(&bm, Location::new(6), 8),
Some(Location::new(6))
);
}
#[test]
fn bitmap_scan_respects_tip() {
let bm = make_bitmap(&[false, false, false, true]);
assert_eq!(next_candidate(&bm, Location::new(0), 3), None);
assert_eq!(
next_candidate(&bm, Location::new(0), 4),
Some(Location::new(3))
);
}
#[test]
fn bitmap_scan_floor_at_tip() {
let bm = make_bitmap(&[true; 4]);
assert_eq!(next_candidate(&bm, Location::new(4), 4), None);
}
#[test]
fn bitmap_scan_empty_bitmap() {
let bm = Bm::new();
assert_eq!(
next_candidate(&bm, Location::new(0), 5),
Some(Location::new(0))
);
assert_eq!(next_candidate(&bm, Location::new(0), 0), None);
}
fn make_chain(shared: &Arc<Shared<N>>, overlay_lens: &[u64]) -> BitmapBatch<N> {
let mut chain = BitmapBatch::Base(Arc::clone(shared));
for &len in overlay_lens {
chain = BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
parent: chain,
overlay: Arc::new(ChunkOverlay::new(len)),
shared: Arc::clone(shared),
}));
}
chain
}
fn chain_overlays(batch: &BitmapBatch<N>) -> Vec<u64> {
let mut lens = Vec::new();
let mut current = batch;
while let BitmapBatch::Layer(layer) = current {
lens.push(layer.overlay.len);
current = &layer.parent;
}
assert!(matches!(current, BitmapBatch::Base(_)));
lens.reverse();
lens
}
#[test]
fn trim_committed_already_base() {
let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 64])));
let base = BitmapBatch::Base(Arc::clone(&shared));
let result = base.trim_committed();
match result {
BitmapBatch::Base(s) => assert!(Arc::ptr_eq(&s, &shared)),
BitmapBatch::Layer(_) => panic!("expected Base"),
}
}
#[test]
fn trim_committed_all_committed() {
let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 64])));
let chain = make_chain(&shared, &[32]);
let result = chain.trim_committed();
match result {
BitmapBatch::Base(s) => assert!(Arc::ptr_eq(&s, &shared)),
BitmapBatch::Layer(_) => panic!("expected Base after full trim"),
}
}
#[test]
fn trim_committed_none_committed() {
let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 32])));
let chain = make_chain(&shared, &[64, 96]);
let result = chain.trim_committed();
assert_eq!(chain_overlays(&result), vec![64, 96]);
}
#[test]
fn trim_committed_exactly_one_uncommitted() {
let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 64])));
let chain = make_chain(&shared, &[64, 96]);
let result = chain.trim_committed();
assert_eq!(chain_overlays(&result), vec![96]);
assert!(Arc::ptr_eq(result.shared(), &shared));
}
#[test]
fn trim_committed_multiple_uncommitted() {
let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 64])));
let chain = make_chain(&shared, &[64, 96, 128]);
let result = chain.trim_committed();
assert_eq!(chain_overlays(&result), vec![96, 128]);
assert!(Arc::ptr_eq(result.shared(), &shared));
}
}