Skip to main content

commonware_storage/qmdb/current/
batch.rs

1//! Batch mutation API for Current QMDBs.
2//!
3//! Wraps the [`any::batch`] API.
4
5use crate::{
6    index::Unordered as UnorderedIndex,
7    journal::contiguous::{Contiguous, Mutable},
8    merkle::{
9        self, batch::MerkleizedBatch as GenericMerkleizedBatch, mem::Mem,
10        storage::Storage as MerkleStorage, Graftable, Location, Position, Readable,
11    },
12    qmdb::{
13        self,
14        any::{
15            self,
16            batch::{lookup_sorted, DiffEntry},
17            operation::{update, Operation},
18            ValueEncoding,
19        },
20        batch_chain::Bounds,
21        bitmap::Shared,
22        current::{
23            db::{compute_db_root, compute_grafted_leaves},
24            grafting,
25        },
26        operation::Key,
27        Error,
28    },
29    Context,
30};
31use ahash::AHasher;
32use commonware_codec::Codec;
33use commonware_cryptography::{Digest, Hasher};
34use commonware_parallel::Strategy;
35use commonware_utils::bitmap::{self, Readable as _};
36use std::{
37    collections::{BTreeSet, HashMap},
38    hash::BuildHasherDefault,
39    sync::Arc,
40};
41
/// Speculative chunk-level bitmap overlay.
///
/// Instead of tracking individual pushed bits and cleared locations, maintains materialized chunk
/// bytes for every chunk that differs from the parent bitmap. This directly produces the chunk data
/// needed for grafted MMR leaf computation.
///
/// `N` is the size of one chunk in bytes: every materialized chunk is stored as a `[u8; N]`.
/// A chunk index that is absent from `chunks` has not been touched by this overlay; readers are
/// expected to fall back to the parent bitmap for it.
#[derive(Clone, Debug, Default)]
pub(crate) struct ChunkOverlay<const N: usize> {
    /// Dirty chunks: chunk_idx -> materialized chunk bytes.
    ///
    /// `ahash` (fast on integer keys) with `BuildHasherDefault` (no per-construction RNG
    /// sampling). Iteration order is not observed by any consumer.
    pub(crate) chunks: HashMap<usize, [u8; N], BuildHasherDefault<AHasher>>,
    /// Total number of bits (parent + new operations).
    ///
    /// This is the length of the bitmap as seen through the overlay, not the number of bits
    /// stored in `chunks`.
    pub(crate) len: u64,
}
57
58impl<const N: usize> ChunkOverlay<N> {
59    const CHUNK_BITS: u64 = bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
60
61    fn new(len: u64) -> Self {
62        Self {
63            chunks: HashMap::default(),
64            len,
65        }
66    }
67
68    /// Load-or-create a chunk: returns a mutable reference to the materialized chunk bytes. On
69    /// first access for an existing chunk, reads from `base`.
70    fn chunk_mut<B: bitmap::Readable<N>>(&mut self, base: &B, idx: usize) -> &mut [u8; N] {
71        self.chunks.entry(idx).or_insert_with(|| {
72            let base_len = base.len();
73            let base_complete = base.complete_chunks();
74            let base_has_partial = !base_len.is_multiple_of(Self::CHUNK_BITS);
75            if idx < base_complete {
76                base.get_chunk(idx)
77            } else if idx == base_complete && base_has_partial {
78                base.last_chunk().0
79            } else {
80                [0u8; N]
81            }
82        })
83    }
84
85    /// Set a single bit (used for pushes and active operations).
86    fn set_bit<B: bitmap::Readable<N>>(&mut self, base: &B, loc: u64) {
87        let idx = bitmap::Prunable::<N>::to_chunk_index(loc);
88        let rel = (loc % Self::CHUNK_BITS) as usize;
89        let chunk = self.chunk_mut(base, idx);
90        chunk[rel / 8] |= 1 << (rel % 8);
91    }
92
93    /// Clear a single bit (used for superseded locations). `pruned_chunks` is passed in by the
94    /// caller so the hot loop in `build_chunk_overlay` reads it once rather than per call.
95    /// Skips locations in pruned chunks since those bits are already inactive.
96    fn clear_bit<B: bitmap::Readable<N>>(&mut self, base: &B, pruned_chunks: usize, loc: u64) {
97        let idx = bitmap::Prunable::<N>::to_chunk_index(loc);
98        if idx < pruned_chunks {
99            return;
100        }
101        let rel = (loc % Self::CHUNK_BITS) as usize;
102        let chunk = self.chunk_mut(base, idx);
103        chunk[rel / 8] &= !(1 << (rel % 8));
104    }
105
106    /// Get a dirty chunk's bytes, or `None` if unmodified.
107    pub(crate) fn get(&self, idx: usize) -> Option<&[u8; N]> {
108        self.chunks.get(&idx)
109    }
110
111    /// Number of complete chunks.
112    pub(crate) const fn complete_chunks(&self) -> usize {
113        (self.len / Self::CHUNK_BITS) as usize
114    }
115}
116
117/// Bitmap-accelerated floor scan over a layered `BitmapBatch` chain. Skips locations where the
118/// bitmap bit is unset, avoiding I/O reads for inactive operations.
119///
120/// Mirrors the contract on `any::batch::next_candidate`: may return only locations that are
121/// *possibly* active in `[floor, tip)`, may skip locations only when known inactive.
122/// `is_active_at` revalidates each candidate, so false positives are tolerated; false negatives
123/// are forbidden.
124///
125/// False positives can arise two ways:
126/// - In the committed prefix, an uncommitted ancestor batch in the chain may have superseded
127///   the location -- the committed bitmap doesn't reflect uncommitted shadows.
128/// - Beyond the committed bitmap, locations are returned as sequential candidates (one per
129///   index) without per-location filtering, so any inactive uncommitted op shows up here.
130pub(crate) fn next_candidate<F: Graftable, B: bitmap::Readable<N>, const N: usize>(
131    bitmap: &B,
132    floor: Location<F>,
133    tip: u64,
134) -> Option<Location<F>> {
135    let floor = *floor;
136    let bitmap_len = bitmap.len();
137    let committed_end = bitmap_len.min(tip);
138    if floor < committed_end {
139        if let Some(idx) = bitmap.ones_iter_from(floor).next() {
140            if idx < committed_end {
141                return Some(Location::<F>::new(idx));
142            }
143        }
144    }
145    let candidate = floor.max(bitmap_len);
146    (candidate < tip).then(|| Location::<F>::new(candidate))
147}
148
149/// Adapter that resolves ops MMR nodes for a batch's `compute_current_layer`.
150///
151/// Tries the batch chain's sync [`Readable`] first (which covers nodes appended or overwritten
152/// by the batch, plus anything still in the in-memory MMR). Falls through to the base's async
153/// [`MerkleStorage`].
154struct BatchStorageAdapter<
155    'a,
156    F: Graftable,
157    D: Digest,
158    R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
159    S: MerkleStorage<F, Digest = D>,
160> {
161    batch: &'a R,
162    base: &'a S,
163    _phantom: core::marker::PhantomData<(F, D)>,
164}
165
166impl<
167        'a,
168        F: Graftable,
169        D: Digest,
170        R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
171        S: MerkleStorage<F, Digest = D>,
172    > BatchStorageAdapter<'a, F, D, R, S>
173{
174    const fn new(batch: &'a R, base: &'a S) -> Self {
175        Self {
176            batch,
177            base,
178            _phantom: core::marker::PhantomData,
179        }
180    }
181}
182
183impl<
184        F: Graftable,
185        D: Digest,
186        R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
187        S: MerkleStorage<F, Digest = D>,
188    > MerkleStorage<F> for BatchStorageAdapter<'_, F, D, R, S>
189{
190    type Digest = D;
191
192    async fn size(&self) -> Position<F> {
193        self.batch.size()
194    }
195    async fn get_node(&self, pos: Position<F>) -> Result<Option<D>, merkle::Error<F>> {
196        if let Some(node) = self.batch.get_node(pos) {
197            return Ok(Some(node));
198        }
199        self.base.get_node(pos).await
200    }
201}
202
/// Layers a [`GenericMerkleizedBatch`] over a [`Mem`] for node resolution.
///
/// [`GenericMerkleizedBatch::get_node`] only covers the batch chain; committed positions
/// return `None`. This adapter falls through to the committed Mem for those positions.
///
/// Only node lookups fall through: the size and pruning boundary reported by this adapter are
/// those of `batch`.
struct BatchOverMem<'a, F: Graftable, D: Digest, S: Strategy> {
    /// Speculative batch; consulted first for every node lookup.
    batch: &'a GenericMerkleizedBatch<F, D, S>,
    /// Committed in-memory tree; consulted when `batch` has no node at the position.
    mem: &'a Mem<F, D>,
}
211
212impl<F: Graftable, D: Digest, S: Strategy> Readable for BatchOverMem<'_, F, D, S> {
213    type Family = F;
214    type Digest = D;
215    type Error = merkle::Error<F>;
216
217    fn size(&self) -> Position<F> {
218        self.batch.size()
219    }
220
221    fn get_node(&self, pos: Position<F>) -> Option<D> {
222        if let Some(d) = self.batch.get_node(pos) {
223            return Some(d);
224        }
225        self.mem.get_node(pos)
226    }
227
228    fn pruning_boundary(&self) -> Location<F> {
229        self.batch.pruning_boundary()
230    }
231}
232
/// A speculative batch of mutations whose root digest has not yet been computed,
/// in contrast to [`MerkleizedBatch`].
///
/// Wraps a [`any::batch::UnmerkleizedBatch`] and adds bitmap and grafted MMR parent state
/// needed to compute the current layer during [`merkleize`](Self::merkleize).
///
/// `merkleize` consumes the batch: the inner any-layer batch is merkleized first, and the two
/// parent handles are then used to build the bitmap overlay and grafted MMR on top of it.
pub struct UnmerkleizedBatch<F, H, U, const N: usize, S: Strategy>
where
    F: Graftable,
    U: update::Update + Send + Sync,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// The inner any-layer batch that handles mutations, journal, and floor raise.
    inner: any::batch::UnmerkleizedBatch<F, H, U, S>,

    /// Parent's grafted MMR state.
    grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,

    /// Parent's bitmap state (COW, Arc-based).
    ///
    /// Drives the floor scan during `merkleize` and serves as the base that the new chunk
    /// overlay is layered on.
    bitmap_parent: BitmapBatch<N>,
}
254
/// A speculative batch of operations whose root digest has been computed, in contrast to
/// [`UnmerkleizedBatch`].
///
/// Wraps an [`any::batch::MerkleizedBatch`] and adds the bitmap and grafted MMR state needed to
/// compute the canonical root.
///
/// # Branch validity
///
/// A `MerkleizedBatch` is a branch-scoped view rooted at a specific committed prefix of the DB. It
/// is not an immutable snapshot.
///
/// Internally, the batch chain terminates in the DB's committed bitmap via `BitmapBatch::Base`.
/// That committed bitmap evolves in place as [`Db::apply_batch`](super::db::Db::apply_batch),
/// [`Db::prune`](super::db::Db::prune), and [`Db::rewind`](super::db::Db::rewind) update the DB.
///
/// Reads through this batch's chain, constructing child batches from it, and applying it later are
/// only semantically correct while its ancestor chain is still the committed prefix of the DB. In
/// other words, every successful [`apply_batch`](super::db::Db::apply_batch) since this batch was
/// merkleized must have applied an ancestor of this batch.
///
/// Once a non-ancestor batch is applied, this batch and all of its descendants become invalid
/// objects. The library does not guard against continued use after that point.
///
/// Applying an invalid batch is caught by the any-layer staleness check and returns
/// [`Error::StaleBatch`] without mutating committed state, so `apply_batch` itself cannot corrupt
/// the DB. The one exception is equal-size sibling branches (where both branches have the same
/// total operation count): the staleness check is size-based and cannot distinguish them, so
/// applying a descendant of one sibling after the other was already applied can silently corrupt
/// snapshot/log state. Callers must not apply batches from an orphaned branch.
///
/// Rules of thumb:
/// - Drop any `Arc<MerkleizedBatch>` you no longer intend to apply.
/// - Extending a batch after `apply_batch` has consumed it (building a child off the just-applied
///   parent) is safe. The committed bitmap now equals the parent's post-apply state, so child reads
///   are consistent.
/// - Extending a batch after a different branch has been applied is not safe. Do not call `get`,
///   `new_batch`, or `apply_batch` on that branch again.
pub struct MerkleizedBatch<
    F: Graftable,
    D: Digest,
    U: update::Update + Send + Sync,
    const N: usize,
    S: Strategy,
> where
    Operation<F, U>: Send + Sync,
{
    /// Inner any-layer batch (ops MMR, diff, floor, commit loc, sizes).
    pub(crate) inner: Arc<any::batch::MerkleizedBatch<F, D, U, S>>,

    /// Grafted MMR state.
    pub(crate) grafted: Arc<merkle::batch::MerkleizedBatch<F, D, S>>,

    /// COW bitmap state (for use as a parent in speculative batches).
    ///
    /// This is a `BitmapBatch::Layer` holding this batch's chunk overlay on top of the parent's
    /// bitmap chain.
    pub(crate) bitmap: BitmapBatch<N>,

    /// The canonical root (ops root + grafted root + partial chunk).
    ///
    /// Computed once at merkleization time and returned as-is by `root()`.
    pub(crate) canonical_root: D,
}
313
314impl<F, H, U, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, U, N, S>
315where
316    F: Graftable,
317    U: update::Update + Send + Sync,
318    H: Hasher,
319    Operation<F, U>: Codec,
320{
321    pub(super) const fn new(
322        inner: any::batch::UnmerkleizedBatch<F, H, U, S>,
323        grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,
324        bitmap_parent: BitmapBatch<N>,
325    ) -> Self {
326        Self {
327            inner,
328            grafted_parent,
329            bitmap_parent,
330        }
331    }
332
333    /// Record a mutation. Use `Some(value)` for update/create, `None` for delete.
334    ///
335    /// If the same key is written multiple times within a batch, the last
336    /// value wins.
337    pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
338        self.inner = self.inner.write(key, value);
339        self
340    }
341}
342
343// Unordered get + merkleize.
344impl<F, K, V, H, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>
345where
346    F: Graftable,
347    K: Key,
348    V: ValueEncoding,
349    H: Hasher,
350    Operation<F, update::Unordered<K, V>>: Codec,
351{
352    /// Read through: mutations -> ancestor diffs -> committed DB.
353    pub async fn get<E, C, I>(
354        &self,
355        key: &K,
356        db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
357    ) -> Result<Option<V::Value>, Error<F>>
358    where
359        E: Context,
360        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
361        I: UnorderedIndex<Value = Location<F>> + 'static,
362    {
363        self.inner.get(key, &db.any).await
364    }
365
366    /// Batch read multiple keys.
367    ///
368    /// Returns results in the same order as the input keys.
369    pub async fn get_many<E, C, I>(
370        &self,
371        keys: &[&K],
372        db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
373    ) -> Result<Vec<Option<V::Value>>, Error<F>>
374    where
375        E: Context,
376        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
377        I: UnorderedIndex<Value = Location<F>> + 'static,
378    {
379        self.inner.get_many(keys, &db.any).await
380    }
381
382    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
383    pub async fn merkleize<E, C, I>(
384        self,
385        db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
386        metadata: Option<V::Value>,
387    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N, S>>, Error<F>>
388    where
389        E: Context,
390        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
391        I: UnorderedIndex<Value = Location<F>> + 'static,
392    {
393        let Self {
394            inner,
395            grafted_parent,
396            bitmap_parent,
397        } = self;
398        // Use the speculative parent bitmap rather than the committed `any` bitmap.
399        let inner = inner
400            .merkleize_with_floor_scan(&db.any, metadata, |floor, tip| {
401                next_candidate(&bitmap_parent, floor, tip)
402            })
403            .await?;
404        compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
405    }
406}
407
408// Ordered get + merkleize.
409impl<F, K, V, H, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>
410where
411    F: Graftable,
412    K: Key,
413    V: ValueEncoding,
414    H: Hasher,
415    Operation<F, update::Ordered<K, V>>: Codec,
416{
417    /// Read through: mutations -> ancestor diffs -> committed DB.
418    pub async fn get<E, C, I>(
419        &self,
420        key: &K,
421        db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
422    ) -> Result<Option<V::Value>, Error<F>>
423    where
424        E: Context,
425        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
426        I: crate::index::Ordered<Value = Location<F>> + 'static,
427    {
428        self.inner.get(key, &db.any).await
429    }
430
431    /// Batch read multiple keys.
432    ///
433    /// Returns results in the same order as the input keys.
434    pub async fn get_many<E, C, I>(
435        &self,
436        keys: &[&K],
437        db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
438    ) -> Result<Vec<Option<V::Value>>, Error<F>>
439    where
440        E: Context,
441        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
442        I: crate::index::Ordered<Value = Location<F>> + 'static,
443    {
444        self.inner.get_many(keys, &db.any).await
445    }
446
447    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
448    pub async fn merkleize<E, C, I>(
449        self,
450        db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
451        metadata: Option<V::Value>,
452    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N, S>>, Error<F>>
453    where
454        E: Context,
455        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
456        I: crate::index::Ordered<Value = Location<F>> + 'static,
457    {
458        let Self {
459            inner,
460            grafted_parent,
461            bitmap_parent,
462        } = self;
463        // Use the speculative parent bitmap rather than the committed `any` bitmap.
464        let inner = inner
465            .merkleize_with_floor_scan(&db.any, metadata, |floor, tip| {
466                next_candidate(&bitmap_parent, floor, tip)
467            })
468            .await?;
469        compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
470    }
471}
472
473/// Derive all bitmap mutations (pushes + clears) for this batch in a single pass over the diff and
474/// ancestor diffs. Avoids iterating raw operations.
475///
476/// Pushes: one bit per operation in the batch. All false except active diff entries (whose `loc`
477/// falls in the batch) and the CommitFloor (last op).
478///
479/// Clears: previous CommitFloor, plus the most recent superseded location for each mutated key. We
480/// search back through ancestors to find the most recent active location; if none exists, we clear
481/// the committed DB location (`base_old_loc`).
482#[allow(clippy::type_complexity)]
483fn build_chunk_overlay<F: Graftable, U, B: bitmap::Readable<N>, const N: usize>(
484    base: &B,
485    batch_len: usize,
486    batch_base: u64,
487    diff: &[(U::Key, DiffEntry<F, U::Value>)],
488    ancestor_diffs: &[Arc<Vec<(U::Key, DiffEntry<F, U::Value>)>>],
489) -> ChunkOverlay<N>
490where
491    U: update::Update,
492{
493    let total_bits = base.len() + batch_len as u64;
494    let mut overlay = ChunkOverlay::new(total_bits);
495    let pruned_chunks = base.pruned_chunks();
496
497    // 1. CommitFloor (last op) is always active.
498    let commit_loc = batch_base + batch_len as u64 - 1;
499    overlay.set_bit(base, commit_loc);
500
501    // 2. Inactivate previous CommitFloor.
502    overlay.clear_bit(base, pruned_chunks, batch_base - 1);
503
504    // 3. Set active bits + clear superseded locations from the diff.
505    for (key, entry) in diff {
506        // Set the active bit for this key's final location.
507        if let Some(loc) = entry.loc() {
508            if *loc >= batch_base && *loc < batch_base + batch_len as u64 {
509                overlay.set_bit(base, *loc);
510            }
511        }
512
513        // Clear the most recent superseded location. Older locations were already cleared by the
514        // ancestor batch that superseded them.
515        let mut prev_loc = entry.base_old_loc();
516        for ancestor_diff in ancestor_diffs {
517            if let Some(ancestor_entry) = lookup_sorted(ancestor_diff.as_slice(), key) {
518                prev_loc = ancestor_entry.loc();
519                break;
520            }
521        }
522        if let Some(old) = prev_loc {
523            overlay.clear_bit(base, pruned_chunks, *old);
524        }
525    }
526
527    // Ensure all new complete chunks beyond the parent are materialized, so downstream consumers
528    // don't read from the parent and panic on out-of-range indices. Uses chunk_mut to inherit the
529    // parent's partial chunk data when idx == parent_complete (avoiding loss of existing bits).
530    let parent_complete = base.complete_chunks();
531    let new_complete = overlay.complete_chunks();
532    for idx in parent_complete..new_complete {
533        overlay.chunk_mut(base, idx);
534    }
535
536    overlay
537}
538
/// Compute the current layer (bitmap + grafted MMR + canonical root) on top of a merkleized any
/// batch.
///
/// Builds a chunk overlay from the diff, computes grafted MMR leaves from dirty chunks, and
/// produces the `Arc<MerkleizedBatch>` directly.
///
/// Steps, in order:
/// 1. Build the chunk overlay for this batch on top of `bitmap_parent`.
/// 2. Work out which chunk indices need a grafted leaf (re)computed and compute those leaves.
/// 3. Apply the leaves to a new batch on `grafted_parent` and merkleize it.
/// 4. Wrap the overlay in a new `BitmapBatch::Layer` and compute the canonical root from it.
///
/// # Errors
///
/// Propagates any error returned by `compute_grafted_leaves` or `compute_db_root`.
///
/// # Panics
///
/// Panics if the chunk-count invariant asserted below does not hold, or if updating an existing
/// grafted leaf digest fails.
async fn compute_current_layer<F, E, U, C, I, H, const N: usize, S>(
    inner: Arc<any::batch::MerkleizedBatch<F, H::Digest, U, S>>,
    current_db: &super::db::Db<F, E, C, I, H, U, N, S>,
    grafted_parent: &Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,
    bitmap_parent: &BitmapBatch<N>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, U, N, S>>, Error<F>>
where
    F: Graftable,
    E: Context,
    U: update::Update + Send + Sync,
    C: Contiguous<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    S: Strategy,
    Operation<F, U>: Codec,
{
    // The batch's operations occupy the last `batch_len` locations below `total_size`.
    let batch_len = inner.journal_batch.items().len();
    let batch_base = inner.bounds.total_size - batch_len as u64;

    // Build chunk overlay: materialized bytes for every dirty chunk.
    let overlay = build_chunk_overlay::<F, U, _, N>(
        bitmap_parent,
        batch_len,
        batch_base,
        &inner.diff,
        &inner.ancestor_diffs,
    );

    let grafting_height = grafting::height::<N>();
    // Ops-tree nodes are resolved from the batch first, then from the committed log's tree.
    let ops_tree_adapter =
        BatchStorageAdapter::new(&inner.journal_batch, &current_db.any.log.merkle);

    // Snapshot ops_leaves for the post-batch state (the canonical root we're about to compute
    // sees this many ops). Thread it through `graftable_chunks` derivation and root computation.
    let overlay_ops_leaves = Location::<F>::new(inner.bounds.total_size);

    // Distinguish three counters:
    //   - new_complete_chunks: chunks with all bits filled in the post-batch bitmap
    //   - graftable_overlay:      chunks committed by the grafted tree (have a single h=G ancestor)
    //   - graftable_parent:       grafted-tree leaf count from the parent (structural source of truth)
    //
    // The pending chunk (if any) sits at index `graftable_overlay` and is excluded from the
    // grafted tree; its digest is hashed directly into the canonical root.
    let new_complete_chunks = overlay.complete_chunks();
    let graftable_overlay = grafting::graftable_chunks::<F>(*overlay_ops_leaves, grafting_height)
        .min(new_complete_chunks as u64) as usize;
    let graftable_parent = *grafted_parent.leaves() as usize;
    let pruned_chunks = bitmap_parent.pruned_chunks();
    assert!(
        pruned_chunks <= graftable_parent && graftable_parent <= graftable_overlay && graftable_overlay <= new_complete_chunks,
        "invariant violated: pruned={pruned_chunks} graftable_parent={graftable_parent} graftable_overlay={graftable_overlay} new_complete={new_complete_chunks}"
    );

    // Build the set of chunk indices whose grafted-leaf needs (re)computing:
    //   1) Dirty chunks (bits changed in this batch) within the graftable range.
    //   2) Pending -> graftable transitions: chunks newly graftable because the ops tree built
    //      their h=G ancestor in this batch. Their bitmap bytes may not be dirty (the chunk
    //      became graftable via ops growth alone) but they need a grafted-leaf entry now.
    //
    // A `BTreeSet` deduplicates the two sources and yields the indices in ascending order.
    let mut chunk_indices_to_update: BTreeSet<usize> = overlay
        .chunks
        .iter()
        .filter(|(&idx, _)| idx < graftable_overlay && idx >= pruned_chunks)
        .map(|(&idx, _)| idx)
        .collect();
    for idx in graftable_parent..graftable_overlay {
        chunk_indices_to_update.insert(idx);
    }
    // Lazy iterator that borrows `overlay`; it is consumed by `compute_grafted_leaves` below,
    // before `overlay` is moved into the new bitmap layer. Chunks the overlay did not touch are
    // read from the parent bitmap.
    let chunks_to_update = chunk_indices_to_update.into_iter().map(|idx| {
        let chunk = overlay
            .get(idx)
            .copied()
            .unwrap_or_else(|| bitmap_parent.get_chunk(idx));
        (idx, chunk)
    });

    let hasher = qmdb::hasher::<H>();
    let new_leaves = compute_grafted_leaves::<F, H, S, N>(
        &hasher,
        &ops_tree_adapter,
        chunks_to_update,
        &current_db.strategy,
    )
    .await?;

    // Build grafted MMR from parent batch.
    let grafted_batch = {
        let mut grafted_batch = grafted_parent.new_batch();
        // Same value as `graftable_parent`: the parent's grafted leaf count.
        let old_grafted_leaves = *grafted_parent.leaves() as usize;
        for &(chunk_idx, digest) in &new_leaves {
            // Indices below the parent's leaf count overwrite an existing leaf; all others are
            // appended as new leaves.
            if chunk_idx < old_grafted_leaves {
                grafted_batch = grafted_batch
                    .update_leaf_digest(Location::<F>::new(chunk_idx as u64), digest)
                    .expect("update_leaf_digest failed");
            } else {
                grafted_batch = grafted_batch.add_leaf_digest(digest);
            }
        }
        let gh = grafting::GraftedHasher::<F, _>::new(hasher.clone(), grafting_height);
        grafted_batch.merkleize(&current_db.grafted_tree, &gh)
    };

    // Build the layered bitmap (parent + overlay) before computing the canonical root, so that
    // compute_db_root sees newly completed chunks. Using bitmap_parent alone would miss chunks
    // that transitioned from partial to complete in this batch.
    let bitmap_batch = BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
        parent: bitmap_parent.clone(),
        overlay: Arc::new(overlay),
        shared: Arc::clone(bitmap_parent.shared()),
    }));

    // Compute canonical root. The grafted batch alone cannot resolve committed nodes,
    // so layer it over the committed grafted MMR.
    let ops_root = inner.root();
    let layered = BatchOverMem {
        batch: &grafted_batch,
        mem: &current_db.grafted_tree,
    };
    let grafted_storage =
        grafting::Storage::new(&layered, grafting_height, &ops_tree_adapter, hasher.clone());
    // Compute partial chunk (last incomplete chunk, if any). The partial chunk lives at
    // index `new_complete_chunks` (the chunk currently being filled with bits) -- distinct
    // from `graftable_overlay` (the grafted-tree boundary). At gh >= 3, partial and pending can
    // coexist; this branch only handles partial. The pending chunk (when present) is read
    // from the bitmap inside `compute_db_root` via `pending_chunk()`.
    let partial = {
        // `rem` is the number of bits present in the trailing incomplete chunk.
        let rem = bitmap_batch.len() % BitmapBatch::<N>::CHUNK_SIZE_BITS;
        if rem == 0 {
            None
        } else {
            let idx = new_complete_chunks;
            let chunk = bitmap_batch.get_chunk(idx);
            Some((chunk, rem))
        }
    };
    let canonical_root = compute_db_root::<F, H, _, _, N>(
        &hasher,
        &bitmap_batch,
        &grafted_storage,
        overlay_ops_leaves,
        partial,
        inner.bounds.inactivity_floor,
        &ops_root,
    )
    .await?;

    Ok(Arc::new(MerkleizedBatch {
        inner,
        grafted: grafted_batch,
        bitmap: bitmap_batch,
        canonical_root,
    }))
}
696
/// A view of the committed bitmap plus zero or more speculative overlay `Layer`s.
///
/// The chain terminates in a `Base` that references the shared committed bitmap. No validity
/// check is performed. Callers must ensure they only read through batches whose chains are
/// still valid prefixes of committed state (see [`Shared`]'s docs).
///
/// Both variants hold only an `Arc`, so cloning a `BitmapBatch` does not copy any chunk data.
#[derive(Clone, Debug)]
pub(crate) enum BitmapBatch<const N: usize> {
    /// Chain terminal: shared reference to the committed bitmap.
    Base(Arc<Shared<N>>),
    /// Speculative layer on top of a parent batch.
    Layer(Arc<BitmapBatchLayer<N>>),
}
709
/// The data behind a [`BitmapBatch::Layer`].
///
/// A layer's total bit length is `overlay.len`; chunk reads that miss the overlay descend into
/// `parent`.
#[derive(Debug)]
pub(crate) struct BitmapBatchLayer<const N: usize> {
    /// The batch this layer sits on top of (another layer, or the chain's `Base`).
    pub(crate) parent: BitmapBatch<N>,
    /// Chunk-level overlay: materialized bytes for every chunk that differs from parent.
    pub(crate) overlay: Arc<ChunkOverlay<N>>,
    /// Cached terminal [`Shared`] so [`BitmapBatch::shared`] and
    /// [`BitmapBatch::pruned_chunks`] answer in O(1) instead of walking the chain.
    pub(crate) shared: Arc<Shared<N>>,
}
720
721impl<const N: usize> BitmapBatch<N> {
722    const CHUNK_SIZE_BITS: u64 = bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
723
724    /// Return the terminal [`Shared`] at the bottom of the chain.
725    fn shared(&self) -> &Arc<Shared<N>> {
726        match self {
727            Self::Base(s) => s,
728            Self::Layer(layer) => &layer.shared,
729        }
730    }
731
732    /// Return a chain equivalent to `self` with any `Layer` whose overlay is now fully committed
733    /// replaced by a direct reference to the committed bitmap. Since `apply_batch` commits
734    /// contiguous prefixes, committed `Layer`s are always at the bottom of the chain.
735    fn trim_committed(&self) -> Self {
736        let shared = self.shared();
737        let committed = bitmap::Readable::<N>::len(shared.as_ref());
738        let mut kept = Vec::new();
739        let mut current = self;
740        while let Self::Layer(layer) = current {
741            if layer.overlay.len <= committed {
742                break;
743            }
744            kept.push(Arc::clone(&layer.overlay));
745            current = &layer.parent;
746        }
747        let mut result = Self::Base(Arc::clone(shared));
748        for overlay in kept.into_iter().rev() {
749            result = Self::Layer(Arc::new(BitmapBatchLayer {
750                parent: result,
751                overlay,
752                shared: Arc::clone(shared),
753            }));
754        }
755        result
756    }
757}
758
759impl<const N: usize> bitmap::Readable<N> for BitmapBatch<N> {
760    fn complete_chunks(&self) -> usize {
761        (self.len() / Self::CHUNK_SIZE_BITS) as usize
762    }
763
764    fn get_chunk(&self, idx: usize) -> [u8; N] {
765        // Walk the layer chain. Each layer's overlay either holds the chunk (return it) or
766        // doesn't (descend).
767        let mut current = self;
768        loop {
769            match current {
770                Self::Base(shared) => return shared.get_chunk(idx),
771                Self::Layer(layer) => {
772                    if let Some(&chunk) = layer.overlay.get(idx) {
773                        return chunk;
774                    }
775                    current = &layer.parent;
776                }
777            }
778        }
779    }
780
781    fn last_chunk(&self) -> ([u8; N], u64) {
782        let total = self.len();
783        if total == 0 {
784            return ([0u8; N], 0);
785        }
786        let rem = total % Self::CHUNK_SIZE_BITS;
787        let bits_in_last = if rem == 0 { Self::CHUNK_SIZE_BITS } else { rem };
788        let idx = if rem == 0 {
789            self.complete_chunks().saturating_sub(1)
790        } else {
791            self.complete_chunks()
792        };
793        (self.get_chunk(idx), bits_in_last)
794    }
795
796    fn pruned_chunks(&self) -> usize {
797        self.shared().pruned_chunks()
798    }
799
800    fn len(&self) -> u64 {
801        match self {
802            Self::Base(shared) => bitmap::Readable::<N>::len(shared.as_ref()),
803            Self::Layer(layer) => layer.overlay.len,
804        }
805    }
806}
807
808impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize, S: Strategy>
809    MerkleizedBatch<F, D, U, N, S>
810where
811    Operation<F, U>: Send + Sync,
812{
813    /// Return the canonical root.
814    pub const fn root(&self) -> D {
815        self.canonical_root
816    }
817
818    /// Return the QMDB ops-only root.
819    pub fn ops_root(&self) -> D {
820        self.inner.root()
821    }
822
823    /// Return the [`Bounds`] of the batch.
824    pub fn bounds(&self) -> &Bounds<F> {
825        self.inner.bounds()
826    }
827
828    /// Return the batch's safe sync boundary.
829    pub fn sync_boundary(&self) -> Location<F> {
830        super::db::sync_boundary::<F, N>(
831            self.bitmap.pruned_chunks() as u64,
832            self.inner.bounds().total_size,
833        )
834    }
835}
836
837impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize, S: Strategy>
838    MerkleizedBatch<F, D, U, N, S>
839where
840    Operation<F, U>: Codec,
841{
842    /// Create a new speculative batch of operations with this batch as its parent.
843    ///
844    /// All uncommitted ancestors in the chain must be kept alive until the child (or any
845    /// descendant) is merkleized. Dropping an uncommitted ancestor causes data
846    /// loss detected at `apply_batch` time.
847    ///
848    /// This is only valid while `self` is still on the winning branch. If a different branch has
849    /// been applied since `self` was created, `self` is no longer a valid parent and must not be
850    /// extended.
851    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U, N, S>
852    where
853        H: Hasher<Digest = D>,
854    {
855        UnmerkleizedBatch::new(
856            self.inner.new_batch::<H>(),
857            Arc::clone(&self.grafted),
858            self.bitmap.trim_committed(),
859        )
860    }
861
862    /// Read through: local diff -> ancestor diffs -> committed DB.
863    ///
864    /// This is only valid while `self` remains on the committed prefix. If a non-ancestor batch
865    /// has been applied since `self` was merkleized, do not read through it.
866    pub async fn get<E, C, I, H>(
867        &self,
868        key: &U::Key,
869        db: &super::db::Db<F, E, C, I, H, U, N, S>,
870    ) -> Result<Option<U::Value>, Error<F>>
871    where
872        E: Context,
873        C: Contiguous<Item = Operation<F, U>>,
874        I: UnorderedIndex<Value = Location<F>> + 'static,
875        H: Hasher<Digest = D>,
876    {
877        self.inner.get(key, &db.any).await
878    }
879
880    /// Batch read multiple keys.
881    ///
882    /// Returns results in the same order as the input keys.
883    pub async fn get_many<E, C, I, H>(
884        &self,
885        keys: &[&U::Key],
886        db: &super::db::Db<F, E, C, I, H, U, N, S>,
887    ) -> Result<Vec<Option<U::Value>>, Error<F>>
888    where
889        E: Context,
890        C: Contiguous<Item = Operation<F, U>>,
891        I: UnorderedIndex<Value = Location<F>> + 'static,
892        H: Hasher<Digest = D>,
893    {
894        self.inner.get_many(keys, &db.any).await
895    }
896}
897
898impl<F, E, C, I, H, U, const N: usize, S> super::db::Db<F, E, C, I, H, U, N, S>
899where
900    F: Graftable,
901    E: Context,
902    U: update::Update + Send + Sync,
903    C: Contiguous<Item = Operation<F, U>>,
904    I: UnorderedIndex<Value = Location<F>>,
905    H: Hasher,
906    S: Strategy,
907    Operation<F, U>: Codec,
908{
909    /// Create an initial [`MerkleizedBatch`] from the current committed DB state.
910    ///
911    /// The returned batch is rooted at the current committed prefix, but it is not a persistent
912    /// snapshot across later divergent commits. If some other branch is applied afterward, this
913    /// batch is no longer valid and must not be read through, extended, or applied.
914    pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U, N, S>> {
915        let grafted = self.grafted_snapshot();
916        Arc::new(MerkleizedBatch {
917            inner: self.any.to_batch(),
918            grafted,
919            bitmap: BitmapBatch::Base(Arc::clone(&self.any.bitmap)),
920            canonical_root: self.root,
921        })
922    }
923}
924
#[cfg(any(test, feature = "test-traits"))]
mod trait_impls {
    //! Implementations of the generic `any::traits` batch interfaces for the Current batch
    //! and database types. Every method forwards to the inherent method of the same name.

    use super::*;
    use crate::{
        journal::contiguous::Mutable,
        qmdb::any::traits::{
            BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
            UnmerkleizedBatch as UnmerkleizedBatchTrait,
        },
        Persistable,
    };
    use std::future::Future;

    /// The Current database type the traits are implemented against.
    type CurrentDb<F, E, C, I, H, U, const N: usize, S> =
        crate::qmdb::current::db::Db<F, E, C, I, H, U, N, S>;

    /// Shared handle to a merkleized batch whose digest type is taken from hasher `H`.
    type SharedBatch<F, H, U, const N: usize, S> =
        Arc<MerkleizedBatch<F, <H as Hasher>::Digest, U, N, S>>;

    /// Result type of `apply_batch`.
    type Applied<F> = Result<core::ops::Range<Location<F>>, crate::qmdb::Error<F>>;

    /// Unmerkleized batch over unordered updates.
    impl<F, K, V, H, E, C, I, const N: usize, S>
        UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>>
        for UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>
    where
        F: Graftable,
        E: Context,
        H: Hasher,
        S: Strategy,
        K: Key,
        V: ValueEncoding + 'static,
        I: UnorderedIndex<Value = Location<F>> + 'static,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = SharedBatch<F, H, update::Unordered<K, V>, N, S>;

        fn write(self, key: K, value: Option<V::Value>) -> Self {
            // Forwards to the inherent `write`.
            Self::write(self, key, value)
        }

        async fn merkleize(
            self,
            db: &CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>,
            metadata: Option<V::Value>,
        ) -> Result<Self::Merkleized, crate::qmdb::Error<F>> {
            // Forwards to the inherent `merkleize`.
            self.merkleize(db, metadata).await
        }
    }

    /// Unmerkleized batch over ordered updates.
    impl<F, K, V, H, E, C, I, const N: usize, S>
        UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>>
        for UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>
    where
        F: Graftable,
        E: Context,
        H: Hasher,
        S: Strategy,
        K: Key,
        V: ValueEncoding + 'static,
        I: crate::index::Ordered<Value = Location<F>> + 'static,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        Operation<F, update::Ordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = SharedBatch<F, H, update::Ordered<K, V>, N, S>;

        fn write(self, key: K, value: Option<V::Value>) -> Self {
            // Forwards to the inherent `write`.
            Self::write(self, key, value)
        }

        async fn merkleize(
            self,
            db: &CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>,
            metadata: Option<V::Value>,
        ) -> Result<Self::Merkleized, crate::qmdb::Error<F>> {
            // Forwards to the inherent `merkleize`.
            self.merkleize(db, metadata).await
        }
    }

    /// Merkleized batches are exposed through their shared (`Arc`) handle.
    impl<F, D, U, const N: usize, S> MerkleizedBatchTrait for Arc<MerkleizedBatch<F, D, U, N, S>>
    where
        F: Graftable,
        D: Digest,
        U: update::Update + Send + Sync + 'static,
        S: Strategy,
        Operation<F, U>: Codec,
    {
        type Digest = D;

        fn root(&self) -> D {
            // Called by path so that the inherent `MerkleizedBatch::root` is the one selected.
            MerkleizedBatch::root(self)
        }
    }

    /// Current database over unordered updates.
    impl<F, E, K, V, C, I, H, const N: usize, S> BatchableDb
        for CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>
    where
        F: Graftable,
        E: Context,
        H: Hasher,
        S: Strategy,
        K: Key,
        V: ValueEncoding + 'static,
        I: UnorderedIndex<Value = Location<F>> + 'static,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Merkleized = SharedBatch<F, H, update::Unordered<K, V>, N, S>;
        type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>;

        fn new_batch(&self) -> Self::Batch {
            self.new_batch()
        }

        fn apply_batch(&mut self, batch: Self::Merkleized) -> impl Future<Output = Applied<F>> {
            self.apply_batch(batch)
        }
    }

    /// Current database over ordered updates.
    impl<F, E, K, V, C, I, H, const N: usize, S> BatchableDb
        for CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>
    where
        F: Graftable,
        E: Context,
        H: Hasher,
        S: Strategy,
        K: Key,
        V: ValueEncoding + 'static,
        I: crate::index::Ordered<Value = Location<F>> + 'static,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        Operation<F, update::Ordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Merkleized = SharedBatch<F, H, update::Ordered<K, V>, N, S>;
        type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>;

        fn new_batch(&self) -> Self::Batch {
            self.new_batch()
        }

        fn apply_batch(&mut self, batch: Self::Merkleized) -> impl Future<Output = Applied<F>> {
            self.apply_batch(batch)
        }
    }
}
1092
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{mmr, qmdb::any::value::FixedEncoding};
    use commonware_utils::{bitmap::Prunable as BitMap, sequence::FixedBytes};

    // N=4 -> CHUNK_SIZE_BITS = 32
    const N: usize = 4;
    type Bm = BitMap<N>;
    type Location = mmr::Location;

    /// Key type shared by the overlay tests.
    type TestKey = FixedBytes<4>;
    /// Update type shared by the overlay tests.
    type TestUpdate = crate::qmdb::any::operation::update::Unordered<TestKey, FixedEncoding<u64>>;

    /// Build a bitmap by pushing `bits` in order.
    fn make_bitmap(bits: &[bool]) -> Bm {
        bits.iter().fold(Bm::new(), |mut bm, &bit| {
            bm.push(bit);
            bm
        })
    }

    /// A 4-byte key whose first byte is `tag` and whose remaining bytes are zero.
    fn key(tag: u8) -> TestKey {
        FixedBytes::from([tag, 0, 0, 0])
    }

    /// Shorthand for `Location::new`.
    fn loc(index: u64) -> Location {
        Location::new(index)
    }

    /// An `Active` diff entry at location `at`, optionally superseding a base location.
    fn active(value: u64, at: u64, base_old: Option<u64>) -> DiffEntry<mmr::Family, u64> {
        DiffEntry::Active {
            value,
            loc: loc(at),
            base_old_loc: base_old.map(loc),
        }
    }

    /// Whether bit `bit` (counted from the low bit of byte 0) is set in `chunk`.
    fn bit_is_set(chunk: &[u8; N], bit: usize) -> bool {
        chunk[bit / 8] & (1 << (bit % 8)) != 0
    }

    // ---- build_chunk_overlay tests ----

    #[test]
    fn chunk_overlay_pushes() {
        // Base: 4 bits, all set (previous commit at loc 3).
        // Segment of 4 operations starting at base_size=4.
        // Diff: key 1 active at loc=4 (offset 0 in the batch); key 2 active at loc=99, which
        // lies outside the batch range [4,8) and is therefore superseded within this batch.
        let base = make_bitmap(&[true; 4]);
        let mut diff = vec![
            (key(1), active(100, 4, None)),
            (key(2), active(200, 99, None)),
        ];
        diff.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));

        let overlay =
            build_chunk_overlay::<mmr::Family, TestUpdate, _, N>(&base, 4, 4, &diff, &[]);

        // Expected chunk 0: bit 4 set (key 1), bits 5-6 clear (inactive), bit 7 set
        // (CommitFloor at loc 7), and bit 3 cleared (the previous commit).
        let c0 = overlay.get(0).expect("chunk 0 should be dirty");
        assert!(bit_is_set(c0, 4)); // key 1 active
        assert!(!bit_is_set(c0, 5)); // inactive
        assert!(!bit_is_set(c0, 6)); // inactive
        assert!(bit_is_set(c0, 7)); // CommitFloor
        assert!(!bit_is_set(c0, 3)); // previous commit cleared
    }

    #[test]
    fn chunk_overlay_clears() {
        // Base bitmap with 64 bits, all set.
        let base = make_bitmap(&[true; 64]);

        let mut diff: Vec<(TestKey, DiffEntry<mmr::Family, u64>)> = vec![
            (key(1), active(100, 70, Some(5))),
            (
                key(2),
                DiffEntry::Deleted {
                    base_old_loc: Some(loc(10)),
                },
            ),
            (key(3), active(300, 71, None)),
        ];
        diff.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));

        // Segment of 8 ops starting at 64; previous commit at loc 63.
        let overlay =
            build_chunk_overlay::<mmr::Family, TestUpdate, _, N>(&base, 8, 64, &diff, &[]);

        // The superseded base locations (5 and 10) must be cleared in chunk 0.
        let c0 = overlay.get(0).expect("chunk 0 should be dirty");
        assert!(!bit_is_set(c0, 5));
        assert!(!bit_is_set(c0, 10)); // byte 1, bit 2

        // Their neighbours must be untouched.
        assert!(bit_is_set(c0, 4));
        assert!(bit_is_set(c0, 11)); // byte 1, bit 3
    }

    /// Regression: when the parent bitmap has a partial last chunk that becomes complete in the
    /// child (without any active bits landing in that chunk), the overlay must inherit the
    /// parent's partial chunk data, not zero it out.
    #[test]
    fn chunk_overlay_preserves_partial_parent_chunk() {
        // Base: 20 bits set (partial chunk 0, CHUNK_SIZE_BITS=32).
        let base = make_bitmap(&[true; 20]);
        assert_eq!(base.complete_chunks(), 0); // partial

        // Segment of 20 ops starting at loc 20. This pushes the total to 40 bits, completing
        // chunk 0 (32 bits) and starting chunk 1. The only active key is at loc 35 (chunk 1),
        // plus CommitFloor at loc 39, so no active bit lands in chunk 0's new region (20-31).
        let mut diff = vec![(key(1), active(42, 35, None))];
        diff.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));

        let overlay =
            build_chunk_overlay::<mmr::Family, TestUpdate, _, N>(&base, 20, 20, &diff, &[]);

        // Chunk 0 must be materialized and keep the parent's first 20 bits.
        let c0 = overlay.get(0).expect("chunk 0 should be in overlay");
        // Bits 0-7 all set.
        assert_eq!(c0[0], 0xFF);
        // Bits 8-15 all set.
        assert_eq!(c0[1], 0xFF);
        // Bits 16-18 set, bit 19 cleared (previous commit), bits 20-23 not set.
        assert_eq!(c0[2], 0x07);
    }

    // ---- next_candidate tests ----

    #[test]
    fn bitmap_scan_all_active() {
        let bm = make_bitmap(&[true; 8]);
        // Every in-range location is its own candidate.
        (0..8).for_each(|i| assert_eq!(next_candidate(&bm, loc(i), 8), Some(loc(i))));
        assert_eq!(next_candidate(&bm, loc(8), 8), None);
    }

    #[test]
    fn bitmap_scan_all_inactive() {
        let bm = make_bitmap(&[false; 8]);
        assert_eq!(next_candidate(&bm, loc(0), 8), None);
    }

    #[test]
    fn bitmap_scan_skips_inactive() {
        // Pattern: inactive, inactive, active, inactive, active
        let bm = make_bitmap(&[false, false, true, false, true]);
        assert_eq!(next_candidate(&bm, loc(0), 5), Some(loc(2)));
        assert_eq!(next_candidate(&bm, loc(3), 5), Some(loc(4)));
        assert_eq!(next_candidate(&bm, loc(5), 5), None);
    }

    #[test]
    fn bitmap_scan_beyond_bitmap_len_returns_candidate() {
        // The bitmap has 4 bits but the tip is 8: locations 4..8 lie beyond the bitmap and
        // should be returned as candidates.
        let bm = make_bitmap(&[false; 4]);
        // All bitmap bits are unset, so 0..4 are skipped; loc 4 is the first one beyond.
        assert_eq!(next_candidate(&bm, loc(0), 8), Some(loc(4)));
        assert_eq!(next_candidate(&bm, loc(6), 8), Some(loc(6)));
    }

    #[test]
    fn bitmap_scan_respects_tip() {
        let bm = make_bitmap(&[false, false, false, true]);
        // The active bit is at 3, but a tip of 3 excludes it.
        assert_eq!(next_candidate(&bm, loc(0), 3), None);
        // With tip=4, bit 3 is included.
        assert_eq!(next_candidate(&bm, loc(0), 4), Some(loc(3)));
    }

    #[test]
    fn bitmap_scan_floor_at_tip() {
        let bm = make_bitmap(&[true; 4]);
        assert_eq!(next_candidate(&bm, loc(4), 4), None);
    }

    #[test]
    fn bitmap_scan_empty_bitmap() {
        let bm = Bm::new();
        // Empty bitmap, but tip > 0: every location is beyond the bitmap.
        assert_eq!(next_candidate(&bm, loc(0), 5), Some(loc(0)));
        // Empty bitmap, tip = 0: no candidates.
        assert_eq!(next_candidate(&bm, loc(0), 0), None);
    }

    // ---- trim_committed tests ----
    //
    // `trim_committed` is called from `MerkleizedBatch::new_batch` to strip any `Layer`s whose
    // overlays have already been absorbed into the shared committed bitmap by a prior apply.
    // It collects the uncommitted overlays top-down and rebuilds a fresh chain rooted at
    // `Base`. These tests feed it distinct chain shapes directly, without going through the
    // full Db/batch machinery, so its structural output can be asserted.

    /// A shared committed bitmap holding `len` set bits.
    fn shared_of_len(len: usize) -> Arc<Shared<N>> {
        Arc::new(Shared::<N>::new(make_bitmap(&vec![true; len])))
    }

    /// Build a chain `Base(shared) -> Layer(len=L1) -> Layer(len=L2) -> ...` from a list of
    /// overlay lengths (bottom to top). Every `Layer` caches `shared`, per the struct's
    /// invariant.
    fn make_chain(shared: &Arc<Shared<N>>, overlay_lens: &[u64]) -> BitmapBatch<N> {
        overlay_lens
            .iter()
            .fold(BitmapBatch::Base(Arc::clone(shared)), |parent, &len| {
                BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
                    parent,
                    overlay: Arc::new(ChunkOverlay::new(len)),
                    shared: Arc::clone(shared),
                }))
            })
    }

    /// Walk a chain down to its `Base` and return the overlay lengths in bottom-to-top order.
    /// Used to assert the structural output of `trim_committed`.
    fn chain_overlays(batch: &BitmapBatch<N>) -> Vec<u64> {
        let mut top_down = Vec::new();
        let mut node = batch;
        loop {
            match node {
                BitmapBatch::Layer(layer) => {
                    top_down.push(layer.overlay.len);
                    node = &layer.parent;
                }
                BitmapBatch::Base(_) => break,
            }
        }
        top_down.into_iter().rev().collect()
    }

    /// Input is already a bare `Base` with no speculative layers on top: nothing is collected
    /// and the result is a freshly constructed `Base` pointing at the same `Shared`.
    /// Real-world trigger: `MerkleizedBatch::new_batch` on a batch whose chain was previously
    /// trimmed flat (e.g., immediately after an apply collapsed everything).
    #[test]
    fn trim_committed_already_base() {
        let shared = shared_of_len(64);
        let base = BitmapBatch::Base(Arc::clone(&shared));
        // Still `Base`, pointing at the same shared terminal.
        let BitmapBatch::Base(terminal) = base.trim_committed() else {
            panic!("expected Base");
        };
        assert!(Arc::ptr_eq(&terminal, &shared));
    }

    /// Every layer has been absorbed by prior applies: the walk stops at the first layer and
    /// nothing is kept, so the result is a bare `Base`. This is the steady-state "extend a
    /// just-applied batch" flow: after `apply_batch(A)`, `A`'s own layer has
    /// `overlay.len == committed` and the next `new_batch` call should start from a clean
    /// terminal.
    #[test]
    fn trim_committed_all_committed() {
        // `shared.len() == 64`; the single layer's `overlay.len == 32 (<= 64)`, so it is
        // committed.
        let shared = shared_of_len(64);
        let chain = make_chain(&shared, &[32]);
        // Collapsed to a bare Base, pointing at the original shared.
        let BitmapBatch::Base(terminal) = chain.trim_committed() else {
            panic!("expected Base after full trim");
        };
        assert!(Arc::ptr_eq(&terminal, &shared));
    }

    /// Every layer is still speculative: the walk reaches `Base` without stopping early and
    /// every overlay is kept. The rebuilt chain is structurally equivalent to the input (same
    /// overlay lens, same shared terminal). Real-world trigger: speculating multiple batches
    /// deep (A, then B off A, then C off B) without `apply_batch` in between.
    #[test]
    fn trim_committed_none_committed() {
        // `shared.len() == 32`; both overlays have `len > 32`, so neither is committed.
        let shared = shared_of_len(32);
        let trimmed = make_chain(&shared, &[64, 96]).trim_committed();
        // Structure must be preserved in bottom-to-top order.
        assert_eq!(chain_overlays(&trimmed), vec![64, 96]);
    }

    /// Exactly one layer is uncommitted (the newest) on top of a committed prefix -- the
    /// dominant pattern in chained growth. The single uncommitted overlay is kept and the
    /// rebuild produces `Layer(Base, overlay_B)`. Also checks that the rebuilt layer carries
    /// the cached `shared` reference. Real-world trigger: apply parent A, then B held alive
    /// off A, then `B.new_batch()` to build C.
    #[test]
    fn trim_committed_exactly_one_uncommitted() {
        // `shared.len() == 64`; committed layer (`overlay.len == 64`) + uncommitted (`96`).
        let shared = shared_of_len(64);
        let trimmed = make_chain(&shared, &[64, 96]).trim_committed();
        // The committed layer is gone; only the uncommitted overlay remains.
        assert_eq!(chain_overlays(&trimmed), vec![96]);
        // And the rebuilt layer's `shared` field still points at the original terminal.
        assert!(Arc::ptr_eq(trimmed.shared(), &shared));
    }

    /// Two or more uncommitted layers on top of a committed prefix: exercises keeping several
    /// overlays and rebuilding several layers, including the cached `shared` wire-through on
    /// the reconstructed chain. Real-world trigger: build A, then B off A, then C off B;
    /// apply only A; then call `C.new_batch()`.
    #[test]
    fn trim_committed_multiple_uncommitted() {
        // `shared.len() == 64`; committed layer (64), then two uncommitted (96, 128).
        let shared = shared_of_len(64);
        let trimmed = make_chain(&shared, &[64, 96, 128]).trim_committed();
        // Committed layer dropped; uncommitted pair preserved in order.
        assert_eq!(chain_overlays(&trimmed), vec![96, 128]);
        // The reconstructed chain must still cache the original shared terminal.
        assert!(Arc::ptr_eq(trimmed.shared(), &shared));
    }
}