Skip to main content

commonware_storage/qmdb/current/
batch.rs

//! Batch mutation API for Current QMDBs.
//!
//! Wraps the [`any::batch`] API.

5use crate::{
6    index::Unordered as UnorderedIndex,
7    journal::contiguous::{Contiguous, Mutable},
8    merkle::{
9        self, hasher::Standard as StandardHasher, storage::Storage as MerkleStorage, Graftable,
10        Location, Position, Readable,
11    },
12    qmdb::{
13        any::{
14            self,
15            batch::{DiffEntry, FloorScan},
16            operation::{update, Operation},
17            ValueEncoding,
18        },
19        current::{
20            db::{compute_db_root, compute_grafted_leaves},
21            grafting,
22        },
23        operation::Key,
24        Error,
25    },
26    Context,
27};
28use commonware_codec::Codec;
29use commonware_cryptography::{Digest, Hasher};
30use commonware_utils::bitmap::{Prunable as BitMap, Readable as BitmapReadable};
31use std::{collections::BTreeMap, sync::Arc};
32
/// Speculative chunk-level bitmap overlay.
///
/// Instead of tracking individual pushed bits and cleared locations, maintains materialized chunk
/// bytes for every chunk that differs from the parent bitmap. This directly produces the chunk data
/// needed for grafted MMR leaf computation.
#[derive(Clone, Debug, Default)]
pub(crate) struct ChunkOverlay<const N: usize> {
    /// Dirty chunks: chunk_idx -> materialized chunk bytes (N bytes per chunk).
    pub(crate) chunks: BTreeMap<usize, [u8; N]>,
    /// Total number of bits (parent + new operations).
    pub(crate) len: u64,
}
45
46impl<const N: usize> ChunkOverlay<N> {
47    const CHUNK_BITS: u64 = BitMap::<N>::CHUNK_SIZE_BITS;
48
49    const fn new(len: u64) -> Self {
50        Self {
51            chunks: BTreeMap::new(),
52            len,
53        }
54    }
55
56    /// Load-or-create a chunk: returns a mutable reference to the materialized chunk bytes. On
57    /// first access for an existing chunk, reads from `base`.
58    fn chunk_mut<B: BitmapReadable<N>>(&mut self, base: &B, idx: usize) -> &mut [u8; N] {
59        self.chunks.entry(idx).or_insert_with(|| {
60            let base_len = base.len();
61            let base_complete = base.complete_chunks();
62            let base_has_partial = !base_len.is_multiple_of(Self::CHUNK_BITS);
63            if idx < base_complete {
64                base.get_chunk(idx)
65            } else if idx == base_complete && base_has_partial {
66                base.last_chunk().0
67            } else {
68                [0u8; N]
69            }
70        })
71    }
72
73    /// Set a single bit (used for pushes and active operations).
74    fn set_bit<B: BitmapReadable<N>>(&mut self, base: &B, loc: u64) {
75        let idx = BitMap::<N>::to_chunk_index(loc);
76        let rel = (loc % Self::CHUNK_BITS) as usize;
77        let chunk = self.chunk_mut(base, idx);
78        chunk[rel / 8] |= 1 << (rel % 8);
79    }
80
81    /// Clear a single bit (used for superseded locations).
82    /// Skips locations in pruned chunks — those bits are already inactive.
83    fn clear_bit<B: BitmapReadable<N>>(&mut self, base: &B, loc: u64) {
84        let idx = BitMap::<N>::to_chunk_index(loc);
85        if idx < base.pruned_chunks() {
86            return;
87        }
88        let rel = (loc % Self::CHUNK_BITS) as usize;
89        let chunk = self.chunk_mut(base, idx);
90        chunk[rel / 8] &= !(1 << (rel % 8));
91    }
92
93    /// Get a dirty chunk's bytes, or `None` if unmodified.
94    pub(crate) fn get(&self, idx: usize) -> Option<&[u8; N]> {
95        self.chunks.get(&idx)
96    }
97
98    /// Number of complete chunks.
99    pub(crate) const fn complete_chunks(&self) -> usize {
100        (self.len / Self::CHUNK_BITS) as usize
101    }
102}
103
/// Bitmap-accelerated floor scan. Skips locations where the bitmap bit is
/// unset, avoiding I/O reads for inactive operations.
pub(crate) struct BitmapScan<'a, B, const N: usize> {
    /// The activity bitmap consulted for set (active) bits.
    bitmap: &'a B,
}

impl<'a, B: BitmapReadable<N>, const N: usize> BitmapScan<'a, B, N> {
    /// Wrap a bitmap for use as a floor scan during merkleization.
    pub(crate) const fn new(bitmap: &'a B) -> Self {
        Self { bitmap }
    }
}
115
116impl<F: Graftable, B: BitmapReadable<N>, const N: usize> FloorScan<F> for BitmapScan<'_, B, N> {
117    fn next_candidate(&mut self, floor: Location<F>, tip: u64) -> Option<Location<F>> {
118        let loc = *floor;
119        if loc >= tip {
120            return None;
121        }
122        let bitmap_len = self.bitmap.len();
123        // Within the bitmap: find the next set bit at or after floor. ones_iter_from returns
124        // set indices in ascending order so the first result is the only possible candidate
125        // below bound. tip >= bitmap_len always holds (base_size == bitmap_parent.len()), so
126        // bound == bitmap_len and the length check inside the iterator prevents scanning past
127        // bound.
128        if loc < bitmap_len {
129            let bound = bitmap_len.min(tip);
130            if let Some(idx) = self.bitmap.ones_iter_from(loc).next() {
131                if idx < bound {
132                    return Some(Location::<F>::new(idx));
133                }
134            }
135        }
136        // Beyond the bitmap: uncommitted ops from prior batches in the chain that aren't
137        // tracked by the bitmap yet. Conservatively treat them as candidates.
138        if bitmap_len < tip {
139            let candidate = loc.max(bitmap_len);
140            if candidate < tip {
141                return Some(Location::<F>::new(candidate));
142            }
143        }
144        None
145    }
146}
147
/// Adapter that resolves ops MMR nodes for a batch's `compute_current_layer`.
///
/// Tries the batch chain's sync [`Readable`] first (which covers nodes appended or overwritten
/// by the batch, plus anything still in the in-memory MMR). Falls through to the base's async
/// [`MerkleStorage`].
struct BatchStorageAdapter<
    'a,
    F: Graftable,
    D: Digest,
    R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
    S: MerkleStorage<F, Digest = D>,
> {
    /// Synchronous node source consulted first (the batch chain).
    batch: &'a R,
    /// Asynchronous fallback for nodes the batch cannot resolve.
    base: &'a S,
    // Carries the otherwise-unused `F`/`D` type parameters without storing values of them.
    _phantom: core::marker::PhantomData<(F, D)>,
}

impl<
        'a,
        F: Graftable,
        D: Digest,
        R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
        S: MerkleStorage<F, Digest = D>,
    > BatchStorageAdapter<'a, F, D, R, S>
{
    /// Construct an adapter over a batch (primary) and a base (fallback) node source.
    const fn new(batch: &'a R, base: &'a S) -> Self {
        Self {
            batch,
            base,
            _phantom: core::marker::PhantomData,
        }
    }
}
181
182impl<
183        F: Graftable,
184        D: Digest,
185        R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
186        S: MerkleStorage<F, Digest = D>,
187    > MerkleStorage<F> for BatchStorageAdapter<'_, F, D, R, S>
188{
189    type Digest = D;
190
191    async fn size(&self) -> Position<F> {
192        self.batch.size()
193    }
194    async fn get_node(&self, pos: Position<F>) -> Result<Option<D>, merkle::Error<F>> {
195        if let Some(node) = self.batch.get_node(pos) {
196            return Ok(Some(node));
197        }
198        self.base.get_node(pos).await
199    }
200}
201
202/// Layers a [`merkle::batch::MerkleizedBatch`] over a [`merkle::mem::Mem`] for node resolution.
203///
204/// [`merkle::batch::MerkleizedBatch::get_node`] only covers the batch chain; committed positions
205/// return `None`. This adapter falls through to the committed Mem for those positions.
206struct BatchOverMem<'a, F: Graftable, D: Digest> {
207    batch: &'a merkle::batch::MerkleizedBatch<F, D>,
208    mem: &'a merkle::mem::Mem<F, D>,
209}
210
211impl<F: Graftable, D: Digest> Readable for BatchOverMem<'_, F, D> {
212    type Family = F;
213    type Digest = D;
214    type Error = merkle::Error<F>;
215
216    fn size(&self) -> Position<F> {
217        self.batch.size()
218    }
219
220    fn get_node(&self, pos: Position<F>) -> Option<D> {
221        if let Some(d) = self.batch.get_node(pos) {
222            return Some(d);
223        }
224        self.mem.get_node(pos)
225    }
226
227    fn root(&self) -> D {
228        self.batch.root()
229    }
230
231    fn pruning_boundary(&self) -> Location<F> {
232        self.batch.pruning_boundary()
233    }
234
235    fn proof(
236        &self,
237        _hasher: &impl crate::merkle::hasher::Hasher<F, Digest = D>,
238        _loc: Location<F>,
239    ) -> Result<crate::merkle::Proof<F, D>, merkle::Error<F>> {
240        unreachable!("proof not used in compute_current_layer")
241    }
242
243    fn range_proof(
244        &self,
245        _hasher: &impl crate::merkle::hasher::Hasher<F, Digest = D>,
246        _range: core::ops::Range<Location<F>>,
247    ) -> Result<crate::merkle::Proof<F, D>, merkle::Error<F>> {
248        unreachable!("range_proof not used in compute_current_layer")
249    }
250}
251
/// A speculative batch of mutations whose root digest has not yet been computed,
/// in contrast to [`MerkleizedBatch`].
///
/// Wraps a [`any::batch::UnmerkleizedBatch`] and adds bitmap and grafted MMR parent state
/// needed to compute the current layer during [`merkleize`](Self::merkleize).
///
/// NOTE(review): `N` appears to be the bitmap chunk size in bytes (it parameterizes
/// `BitmapBatch<N>`) — confirm against the bitmap module.
pub struct UnmerkleizedBatch<F, H, U, const N: usize>
where
    F: Graftable,
    U: update::Update + Send + Sync,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// The inner any-layer batch that handles mutations, journal, and floor raise.
    inner: any::batch::UnmerkleizedBatch<F, H, U>,

    /// Parent's grafted MMR state.
    grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest>>,

    /// Parent's bitmap state (COW, Arc-based).
    bitmap_parent: BitmapBatch<N>,
}
273
/// A speculative batch of operations whose root digest has been computed,
/// in contrast to [`UnmerkleizedBatch`].
///
/// Wraps an [`any::batch::MerkleizedBatch`] and adds the bitmap and grafted MMR state needed
/// to compute the canonical root.
pub struct MerkleizedBatch<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize>
where
    Operation<F, U>: Send + Sync,
{
    /// Inner any-layer batch (ops MMR, diff, floor, commit loc, sizes).
    pub(crate) inner: Arc<any::batch::MerkleizedBatch<F, D, U>>,

    /// Grafted MMR state.
    pub(crate) grafted: Arc<merkle::batch::MerkleizedBatch<F, D>>,

    /// COW bitmap state (for use as a parent in speculative batches).
    pub(crate) bitmap: BitmapBatch<N>,

    /// The canonical root (ops root + grafted root + partial chunk).
    pub(crate) canonical_root: D,
}
295
296impl<F, H, U, const N: usize> UnmerkleizedBatch<F, H, U, N>
297where
298    F: Graftable,
299    U: update::Update + Send + Sync,
300    H: Hasher,
301    Operation<F, U>: Codec,
302{
303    pub(super) const fn new(
304        inner: any::batch::UnmerkleizedBatch<F, H, U>,
305        grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest>>,
306        bitmap_parent: BitmapBatch<N>,
307    ) -> Self {
308        Self {
309            inner,
310            grafted_parent,
311            bitmap_parent,
312        }
313    }
314
315    /// Record a mutation. Use `Some(value)` for update/create, `None` for delete.
316    ///
317    /// If the same key is written multiple times within a batch, the last
318    /// value wins.
319    pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
320        self.inner = self.inner.write(key, value);
321        self
322    }
323}
324
325// Unordered get + merkleize.
326impl<F, K, V, H, const N: usize> UnmerkleizedBatch<F, H, update::Unordered<K, V>, N>
327where
328    F: Graftable,
329    K: Key,
330    V: ValueEncoding,
331    H: Hasher,
332    Operation<F, update::Unordered<K, V>>: Codec,
333{
334    /// Read through: mutations -> ancestor diffs -> committed DB.
335    pub async fn get<E, C, I>(
336        &self,
337        key: &K,
338        db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N>,
339    ) -> Result<Option<V::Value>, Error<F>>
340    where
341        E: Context,
342        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
343        I: UnorderedIndex<Value = Location<F>> + 'static,
344    {
345        self.inner.get(key, &db.any).await
346    }
347
348    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
349    pub async fn merkleize<E, C, I>(
350        self,
351        db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N>,
352        metadata: Option<V::Value>,
353    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N>>, Error<F>>
354    where
355        E: Context,
356        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
357        I: UnorderedIndex<Value = Location<F>> + 'static,
358    {
359        let Self {
360            inner,
361            grafted_parent,
362            bitmap_parent,
363        } = self;
364        let scan = BitmapScan::new(&bitmap_parent);
365        let inner = inner
366            .merkleize_with_floor_scan(&db.any, metadata, scan)
367            .await?;
368        compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
369    }
370}
371
372// Ordered get + merkleize.
373impl<F, K, V, H, const N: usize> UnmerkleizedBatch<F, H, update::Ordered<K, V>, N>
374where
375    F: Graftable,
376    K: Key,
377    V: ValueEncoding,
378    H: Hasher,
379    Operation<F, update::Ordered<K, V>>: Codec,
380{
381    /// Read through: mutations -> ancestor diffs -> committed DB.
382    pub async fn get<E, C, I>(
383        &self,
384        key: &K,
385        db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N>,
386    ) -> Result<Option<V::Value>, Error<F>>
387    where
388        E: Context,
389        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
390        I: crate::index::Ordered<Value = Location<F>> + 'static,
391    {
392        self.inner.get(key, &db.any).await
393    }
394
395    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
396    pub async fn merkleize<E, C, I>(
397        self,
398        db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N>,
399        metadata: Option<V::Value>,
400    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N>>, Error<F>>
401    where
402        E: Context,
403        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
404        I: crate::index::Ordered<Value = Location<F>> + 'static,
405    {
406        let Self {
407            inner,
408            grafted_parent,
409            bitmap_parent,
410        } = self;
411        let scan = BitmapScan::new(&bitmap_parent);
412        let inner = inner
413            .merkleize_with_floor_scan(&db.any, metadata, scan)
414            .await?;
415        compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
416    }
417}
418
/// Derive all bitmap mutations (pushes + clears) for this batch in a single pass over the diff and
/// ancestor diffs. Avoids iterating raw operations.
///
/// Pushes: one bit per operation in the batch. All false except active diff entries (whose `loc`
/// falls in the batch) and the CommitFloor (last op).
///
/// Clears: previous CommitFloor, plus the most recent superseded location for each mutated key. We
/// search back through ancestors to find the most recent active location; if none exists, we clear
/// the committed DB location (`base_old_loc`).
///
/// NOTE(review): step 2 computes `batch_base - 1`, so `batch_base` is assumed to be >= 1 (a prior
/// CommitFloor must exist) — confirm all callers guarantee this. Likewise `batch_len` is assumed
/// >= 1 (the batch at least contains its CommitFloor).
///
/// NOTE(review): the ancestor search treats the FIRST diff containing the key as the most recent,
/// so `ancestor_diffs` is presumably ordered newest-first — verify against the producer.
#[allow(clippy::type_complexity)]
fn build_chunk_overlay<F: Graftable, U, B: BitmapReadable<N>, const N: usize>(
    base: &B,
    batch_len: usize,
    batch_base: u64,
    diff: &BTreeMap<U::Key, DiffEntry<F, U::Value>>,
    ancestor_diffs: &[Arc<BTreeMap<U::Key, DiffEntry<F, U::Value>>>],
) -> ChunkOverlay<N>
where
    U: update::Update,
{
    // One bit per existing operation plus one per operation appended by this batch.
    let total_bits = base.len() + batch_len as u64;
    let mut overlay = ChunkOverlay::new(total_bits);

    // 1. CommitFloor (last op) is always active.
    let commit_loc = batch_base + batch_len as u64 - 1;
    overlay.set_bit(base, commit_loc);

    // 2. Inactivate previous CommitFloor.
    overlay.clear_bit(base, batch_base - 1);

    // 3. Set active bits + clear superseded locations from the diff.
    for (key, entry) in diff {
        // Set the active bit for this key's final location.
        if let Some(loc) = entry.loc() {
            if *loc >= batch_base && *loc < batch_base + batch_len as u64 {
                overlay.set_bit(base, *loc);
            }
        }

        // Clear the most recent superseded location. Older locations were already cleared by the
        // ancestor batch that superseded them.
        let mut prev_loc = entry.base_old_loc();
        for ancestor_diff in ancestor_diffs {
            if let Some(ancestor_entry) = ancestor_diff.get(key) {
                prev_loc = ancestor_entry.loc();
                break;
            }
        }
        if let Some(old) = prev_loc {
            overlay.clear_bit(base, *old);
        }
    }

    // Ensure all new complete chunks beyond the parent are materialized, so downstream consumers
    // don't read from the parent and panic on out-of-range indices. Uses chunk_mut to inherit the
    // parent's partial chunk data when idx == parent_complete (avoiding loss of existing bits).
    let parent_complete = base.complete_chunks();
    let new_complete = overlay.complete_chunks();
    for idx in parent_complete..new_complete {
        overlay.chunk_mut(base, idx);
    }

    overlay
}
483
/// Compute the current layer (bitmap + grafted MMR + canonical root) on top of a merkleized any
/// batch.
///
/// Builds a chunk overlay from the diff, computes grafted MMR leaves from dirty chunks, and
/// produces the `Arc<MerkleizedBatch>` directly.
async fn compute_current_layer<F, E, U, C, I, H, const N: usize>(
    inner: Arc<any::batch::MerkleizedBatch<F, H::Digest, U>>,
    current_db: &super::db::Db<F, E, C, I, H, U, N>,
    grafted_parent: &Arc<merkle::batch::MerkleizedBatch<F, H::Digest>>,
    bitmap_parent: &BitmapBatch<N>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, U, N>>, Error<F>>
where
    F: Graftable,
    E: Context,
    U: update::Update + Send + Sync,
    C: Contiguous<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    // The batch occupies locations [batch_base, new_last_commit_loc]; the CommitFloor is the
    // batch's final operation.
    let batch_len = inner.journal_batch.items().len();
    let batch_base = *inner.new_last_commit_loc + 1 - batch_len as u64;

    // Build chunk overlay: materialized bytes for every dirty chunk.
    let overlay = build_chunk_overlay::<F, U, _, N>(
        bitmap_parent,
        batch_len,
        batch_base,
        &inner.diff,
        &inner.ancestor_diffs,
    );

    // Grafted MMR recomputation: iterate complete chunks in the overlay.
    // This covers both new chunks and dirty existing chunks in a single pass.
    let new_grafted_leaves = overlay.complete_chunks();
    let chunks_to_update = overlay
        .chunks
        .iter()
        .filter(|(&idx, _)| idx < new_grafted_leaves)
        .map(|(&idx, &chunk)| (idx, chunk));
    let ops_tree_adapter =
        BatchStorageAdapter::new(&inner.journal_batch, &current_db.any.log.merkle);
    let hasher = StandardHasher::<H>::new();
    let new_leaves = compute_grafted_leaves::<F, H, N>(
        &hasher,
        &ops_tree_adapter,
        chunks_to_update,
        current_db.thread_pool.as_ref(),
    )
    .await?;

    // Build grafted MMR from parent batch.
    let grafting_height = grafting::height::<N>();
    let grafted_batch = {
        let mut grafted_batch = grafted_parent
            .new_batch()
            .with_pool(current_db.thread_pool.clone());
        // Leaves below the parent's leaf count are in-place updates; the rest are appends.
        let old_grafted_leaves = *grafted_parent.leaves() as usize;
        for &(chunk_idx, digest) in &new_leaves {
            if chunk_idx < old_grafted_leaves {
                grafted_batch = grafted_batch
                    .update_leaf_digest(Location::<F>::new(chunk_idx as u64), digest)
                    // Invariant: chunk_idx < old_grafted_leaves, so the leaf must exist.
                    .expect("update_leaf_digest failed");
            } else {
                grafted_batch = grafted_batch.add_leaf_digest(digest);
            }
        }
        let gh = grafting::GraftedHasher::<F, _>::new(hasher.clone(), grafting_height);
        grafted_batch.merkleize(&current_db.grafted_tree, &gh)
    };

    // Build the layered bitmap (parent + overlay) before computing the canonical root, so that
    // compute_db_root sees newly completed chunks. Using bitmap_parent alone would miss chunks
    // that transitioned from partial to complete in this batch.
    let bitmap_batch = BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
        parent: bitmap_parent.clone(),
        overlay: Arc::new(overlay),
    }));

    // Compute canonical root. The grafted batch alone cannot resolve committed nodes,
    // so layer it over the committed grafted MMR.
    let ops_root = inner.root();
    let layered = BatchOverMem {
        batch: &grafted_batch,
        mem: &current_db.grafted_tree,
    };
    let grafted_storage = grafting::Storage::new(&layered, grafting_height, &ops_tree_adapter);
    // Compute partial chunk (last incomplete chunk, if any).
    let partial = {
        let rem = bitmap_batch.len() % BitmapBatch::<N>::CHUNK_SIZE_BITS;
        if rem == 0 {
            None
        } else {
            // The partial chunk immediately follows the last complete chunk.
            let idx = new_grafted_leaves;
            let chunk = bitmap_batch.get_chunk(idx);
            Some((chunk, rem))
        }
    };
    let canonical_root = compute_db_root::<F, H, _, _, _, N>(
        &hasher,
        &bitmap_batch,
        &grafted_storage,
        partial,
        &ops_root,
    )
    .await?;

    Ok(Arc::new(MerkleizedBatch {
        inner,
        grafted: grafted_batch,
        bitmap: bitmap_batch,
        canonical_root,
    }))
}
598
/// Immutable bitmap state at any point in a batch chain.
///
/// Mirrors the [`crate::merkle::batch::MerkleizedBatch`] pattern.
#[derive(Clone, Debug)]
pub(crate) enum BitmapBatch<const N: usize> {
    /// Committed bitmap (chain terminal).
    Base(Arc<BitMap<N>>),
    /// Speculative layer on top of a parent batch.
    Layer(Arc<BitmapBatchLayer<N>>),
}

/// The data behind a [`BitmapBatch::Layer`].
#[derive(Debug)]
pub(crate) struct BitmapBatchLayer<const N: usize> {
    /// The state this layer is stacked on (another layer or the committed base).
    pub(crate) parent: BitmapBatch<N>,
    /// Chunk-level overlay: materialized bytes for every chunk that differs from parent.
    pub(crate) overlay: Arc<ChunkOverlay<N>>,
}
617
impl<const N: usize> BitmapBatch<N> {
    /// Bits per chunk, mirrored from the underlying bitmap type for local use.
    const CHUNK_SIZE_BITS: u64 = BitMap::<N>::CHUNK_SIZE_BITS;
}
621
622impl<const N: usize> BitmapReadable<N> for BitmapBatch<N> {
623    fn complete_chunks(&self) -> usize {
624        (self.len() / Self::CHUNK_SIZE_BITS) as usize
625    }
626
627    fn get_chunk(&self, idx: usize) -> [u8; N] {
628        match self {
629            Self::Base(bm) => *bm.get_chunk(idx),
630            Self::Layer(layer) => {
631                // Check overlay first; fall through to parent if unmodified.
632                if let Some(&chunk) = layer.overlay.get(idx) {
633                    chunk
634                } else {
635                    layer.parent.get_chunk(idx)
636                }
637            }
638        }
639    }
640
641    fn last_chunk(&self) -> ([u8; N], u64) {
642        let total = self.len();
643        if total == 0 {
644            return ([0u8; N], 0);
645        }
646        let rem = total % Self::CHUNK_SIZE_BITS;
647        let bits_in_last = if rem == 0 { Self::CHUNK_SIZE_BITS } else { rem };
648        let idx = if rem == 0 {
649            self.complete_chunks().saturating_sub(1)
650        } else {
651            self.complete_chunks()
652        };
653        (self.get_chunk(idx), bits_in_last)
654    }
655
656    fn pruned_chunks(&self) -> usize {
657        match self {
658            Self::Base(bm) => bm.pruned_chunks(),
659            Self::Layer(layer) => layer.parent.pruned_chunks(),
660        }
661    }
662
663    fn len(&self) -> u64 {
664        match self {
665            Self::Base(bm) => BitmapReadable::<N>::len(bm.as_ref()),
666            Self::Layer(layer) => layer.overlay.len,
667        }
668    }
669}
670
impl<const N: usize> BitmapBatch<N> {
    /// Apply a chunk overlay to this bitmap. When `self` is `Base` with sole ownership, writes
    /// overlay chunks directly into the bitmap. Otherwise creates a new `Layer`.
    pub(super) fn apply_overlay(&mut self, overlay: Arc<ChunkOverlay<N>>) {
        // Fast path: write overlay chunks directly into the Base bitmap.
        // Arc::get_mut succeeds only when no other clone of the Arc is alive, so in-place
        // mutation cannot be observed by another holder.
        if let Self::Base(base) = self {
            if let Some(bitmap) = Arc::get_mut(base) {
                // Extend bitmap to the overlay's length.
                bitmap.extend_to(overlay.len);
                // Overwrite dirty chunks. Chunks below the pruning boundary are skipped —
                // those bits are already inactive.
                for (&idx, chunk_bytes) in &overlay.chunks {
                    if idx >= bitmap.pruned_chunks() {
                        bitmap.set_chunk_by_index(idx, chunk_bytes);
                    }
                }
                return;
            }
        }

        // Slow path: create a new layer.
        let parent = self.clone();
        *self = Self::Layer(Arc::new(BitmapBatchLayer { parent, overlay }));
    }

    /// Flatten all layers back to a single `Base(Arc<BitMap<N>>)`.
    ///
    /// After flattening, the new `Base` Arc has refcount 1 (assuming no external clones
    /// are held).
    pub(super) fn flatten(&mut self) {
        if matches!(self, Self::Base(_)) {
            return;
        }

        // Take ownership of the chain so that Arc refcounts are not
        // artificially inflated by a clone.
        let mut owned = std::mem::replace(self, Self::Base(Arc::new(BitMap::default())));

        // Collect overlays from tip to base.
        let mut overlays = Vec::new();
        let base = loop {
            match owned {
                Self::Base(bm) => break bm,
                Self::Layer(layer) => match Arc::try_unwrap(layer) {
                    // Sole owner: move the overlay and parent out without cloning.
                    Ok(inner) => {
                        overlays.push(inner.overlay);
                        owned = inner.parent;
                    }
                    // Shared elsewhere: fall back to cloning (the overlay clone is a cheap
                    // Arc refcount bump).
                    Err(arc) => {
                        overlays.push(arc.overlay.clone());
                        owned = arc.parent.clone();
                    }
                },
            }
        };

        // Apply overlays from base to tip (reverse of collection order) so newer layers'
        // chunks overwrite older ones.
        let mut bitmap = Arc::try_unwrap(base).unwrap_or_else(|arc| (*arc).clone());
        for overlay in overlays.into_iter().rev() {
            // Extend bitmap to the overlay's length.
            bitmap.extend_to(overlay.len);
            // Apply dirty chunks, skipping pruned chunks (already inactive).
            for (&idx, chunk_bytes) in &overlay.chunks {
                if idx >= bitmap.pruned_chunks() {
                    bitmap.set_chunk_by_index(idx, chunk_bytes);
                }
            }
        }
        *self = Self::Base(Arc::new(bitmap));
    }
}
741
742impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize>
743    MerkleizedBatch<F, D, U, N>
744where
745    Operation<F, U>: Send + Sync,
746{
747    /// Return the canonical root.
748    pub const fn root(&self) -> D {
749        self.canonical_root
750    }
751
752    /// Return the ops-only MMR root.
753    pub fn ops_root(&self) -> D {
754        self.inner.root()
755    }
756}
757
758impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize>
759    MerkleizedBatch<F, D, U, N>
760where
761    Operation<F, U>: Codec,
762{
763    /// Create a new speculative batch of operations with this batch as its parent.
764    ///
765    /// All uncommitted ancestors in the chain must be kept alive until the child (or any
766    /// descendant) is merkleized. Dropping an uncommitted ancestor causes data
767    /// loss detected at `apply_batch` time.
768    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U, N>
769    where
770        H: Hasher<Digest = D>,
771    {
772        UnmerkleizedBatch::new(
773            self.inner.new_batch::<H>(),
774            Arc::clone(&self.grafted),
775            self.bitmap.clone(),
776        )
777    }
778
779    /// Read through: local diff -> ancestor diffs -> committed DB.
780    pub async fn get<E, C, I, H>(
781        &self,
782        key: &U::Key,
783        db: &super::db::Db<F, E, C, I, H, U, N>,
784    ) -> Result<Option<U::Value>, Error<F>>
785    where
786        E: Context,
787        C: Contiguous<Item = Operation<F, U>>,
788        I: UnorderedIndex<Value = Location<F>> + 'static,
789        H: Hasher<Digest = D>,
790    {
791        self.inner.get(key, &db.any).await
792    }
793}
794
795impl<F, E, C, I, H, U, const N: usize> super::db::Db<F, E, C, I, H, U, N>
796where
797    F: Graftable,
798    E: Context,
799    U: update::Update + Send + Sync,
800    C: Contiguous<Item = Operation<F, U>>,
801    I: UnorderedIndex<Value = Location<F>>,
802    H: Hasher,
803    Operation<F, U>: Codec,
804{
805    /// Create an initial [`MerkleizedBatch`] from the committed DB state.
806    pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U, N>> {
807        let grafted = self.grafted_snapshot();
808        Arc::new(MerkleizedBatch {
809            inner: self.any.to_batch(),
810            grafted,
811            bitmap: self.status.clone(),
812            canonical_root: self.root,
813        })
814    }
815}
816
817#[cfg(any(test, feature = "test-traits"))]
818mod trait_impls {
819    use super::*;
820    use crate::{
821        journal::contiguous::Mutable,
822        qmdb::any::traits::{
823            BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
824            UnmerkleizedBatch as UnmerkleizedBatchTrait,
825        },
826        Persistable,
827    };
828    use std::future::Future;
829
830    type CurrentDb<F, E, C, I, H, U, const N: usize> =
831        crate::qmdb::current::db::Db<F, E, C, I, H, U, N>;
832
    // Bridge the current-layer unordered batch into the generic `UnmerkleizedBatchTrait`
    // so test code can drive any/current databases through one interface.
    impl<F, K, V, H, E, C, I, const N: usize>
        UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N>>
        for UnmerkleizedBatch<F, H, update::Unordered<K, V>, N>
    where
        F: Graftable,
        K: Key,
        V: ValueEncoding + 'static,
        H: Hasher,
        E: Context,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: UnorderedIndex<Value = Location<F>> + 'static,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N>>;

        // Delegates to the inherent `write`; the fully-qualified call disambiguates it
        // from this trait method.
        fn write(self, key: K, value: Option<V::Value>) -> Self {
            Self::write(self, key, value)
        }

        // Delegates to the inherent `merkleize` (inherent methods take priority in method
        // resolution, so this does not recurse).
        fn merkleize(
            self,
            db: &CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N>,
            metadata: Option<V::Value>,
        ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
            self.merkleize(db, metadata)
        }
    }
865
866    impl<F, K, V, H, E, C, I, const N: usize>
867        UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N>>
868        for UnmerkleizedBatch<F, H, update::Ordered<K, V>, N>
869    where
870        F: Graftable,
871        K: Key,
872        V: ValueEncoding + 'static,
873        H: Hasher,
874        E: Context,
875        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
876            + Persistable<Error = crate::journal::Error>,
877        I: crate::index::Ordered<Value = Location<F>> + 'static,
878        Operation<F, update::Ordered<K, V>>: Codec,
879    {
880        type Family = F;
881        type K = K;
882        type V = V::Value;
883        type Metadata = V::Value;
884        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N>>;
885
886        fn write(self, key: K, value: Option<V::Value>) -> Self {
887            Self::write(self, key, value)
888        }
889
890        fn merkleize(
891            self,
892            db: &CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N>,
893            metadata: Option<V::Value>,
894        ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
895            self.merkleize(db, metadata)
896        }
897    }
898
899    impl<F: Graftable, D: Digest, U: update::Update + Send + Sync + 'static, const N: usize>
900        MerkleizedBatchTrait for Arc<MerkleizedBatch<F, D, U, N>>
901    where
902        Operation<F, U>: Codec,
903    {
904        type Digest = D;
905
906        fn root(&self) -> D {
907            MerkleizedBatch::root(self)
908        }
909    }
910
911    impl<F, E, K, V, C, I, H, const N: usize> BatchableDb
912        for CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N>
913    where
914        F: Graftable,
915        E: Context,
916        K: Key,
917        V: ValueEncoding + 'static,
918        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
919            + Persistable<Error = crate::journal::Error>,
920        I: UnorderedIndex<Value = Location<F>> + 'static,
921        H: Hasher,
922        Operation<F, update::Unordered<K, V>>: Codec,
923    {
924        type Family = F;
925        type K = K;
926        type V = V::Value;
927        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N>>;
928        type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>, N>;
929
930        fn new_batch(&self) -> Self::Batch {
931            self.new_batch()
932        }
933
934        fn apply_batch(
935            &mut self,
936            batch: Self::Merkleized,
937        ) -> impl Future<Output = Result<core::ops::Range<Location<F>>, crate::qmdb::Error<F>>>
938        {
939            self.apply_batch(batch)
940        }
941    }
942
943    impl<F, E, K, V, C, I, H, const N: usize> BatchableDb
944        for CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N>
945    where
946        F: Graftable,
947        E: Context,
948        K: Key,
949        V: ValueEncoding + 'static,
950        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
951            + Persistable<Error = crate::journal::Error>,
952        I: crate::index::Ordered<Value = Location<F>> + 'static,
953        H: Hasher,
954        Operation<F, update::Ordered<K, V>>: Codec,
955    {
956        type Family = F;
957        type K = K;
958        type V = V::Value;
959        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N>>;
960        type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>, N>;
961
962        fn new_batch(&self) -> Self::Batch {
963            self.new_batch()
964        }
965
966        fn apply_batch(
967            &mut self,
968            batch: Self::Merkleized,
969        ) -> impl Future<Output = Result<core::ops::Range<Location<F>>, crate::qmdb::Error<F>>>
970        {
971            self.apply_batch(batch)
972        }
973    }
974}
975
976#[cfg(test)]
977mod tests {
978    use super::*;
979    use crate::mmr;
980    use commonware_utils::bitmap::Prunable as BitMap;
981
982    // N=4 -> CHUNK_SIZE_BITS = 32
983    const N: usize = 4;
984    type Bm = BitMap<N>;
985    type Location = mmr::Location;
986
987    fn make_bitmap(bits: &[bool]) -> Bm {
988        let mut bm = Bm::new();
989        for &b in bits {
990            bm.push(b);
991        }
992        bm
993    }
994
995    // ---- build_chunk_overlay tests ----
996
997    #[test]
998    fn chunk_overlay_pushes() {
999        use crate::qmdb::any::value::FixedEncoding;
1000        use commonware_utils::sequence::FixedBytes;
1001
1002        type K = FixedBytes<4>;
1003        type V = FixedEncoding<u64>;
1004        type U = crate::qmdb::any::operation::update::Unordered<K, V>;
1005
1006        let key1 = FixedBytes::from([1, 0, 0, 0]);
1007        let key2 = FixedBytes::from([2, 0, 0, 0]);
1008
1009        // Base: 4 bits, all set (previous commit at loc 3).
1010        // Segment of 4 operations starting at base_size=4.
1011        // Diff: key1 active at loc=4 (in batch), key2 active at loc=99 (not in batch,
1012        // so superseded within this batch).
1013        let base = make_bitmap(&[true; 4]);
1014        let mut diff = BTreeMap::new();
1015        diff.insert(
1016            key1,
1017            DiffEntry::Active {
1018                value: 100u64,
1019                loc: Location::new(4), // offset 0 in batch
1020                base_old_loc: None,
1021            },
1022        );
1023        diff.insert(
1024            key2,
1025            DiffEntry::Active {
1026                value: 200u64,
1027                loc: Location::new(99), // not in batch [4,8), so superseded
1028                base_old_loc: None,
1029            },
1030        );
1031
1032        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 4, 4, &diff, &[]);
1033
1034        // Chunk 0 should have: bits 0-3 from base (all set), bit 4 set (key1), bits 5-6 false
1035        // (inactive), bit 7 set (CommitFloor at loc 7). Also bit 3 cleared (previous commit).
1036        let c0 = overlay.get(0).expect("chunk 0 should be dirty");
1037        assert_ne!(c0[0] & (1 << 4), 0); // key1 active
1038        assert_eq!(c0[0] & (1 << 5), 0); // inactive
1039        assert_eq!(c0[0] & (1 << 6), 0); // inactive
1040        assert_ne!(c0[0] & (1 << 7), 0); // CommitFloor
1041        assert_eq!(c0[0] & (1 << 3), 0); // previous commit cleared
1042    }
1043
1044    #[test]
1045    fn chunk_overlay_clears() {
1046        use crate::qmdb::any::value::FixedEncoding;
1047        use commonware_utils::sequence::FixedBytes;
1048
1049        type K = FixedBytes<4>;
1050        type U = crate::qmdb::any::operation::update::Unordered<K, FixedEncoding<u64>>;
1051
1052        let key1 = FixedBytes::from([1, 0, 0, 0]);
1053        let key2 = FixedBytes::from([2, 0, 0, 0]);
1054        let key3 = FixedBytes::from([3, 0, 0, 0]);
1055
1056        // Base bitmap with 64 bits, all set.
1057        let base = make_bitmap(&[true; 64]);
1058
1059        let mut diff: BTreeMap<K, DiffEntry<mmr::Family, u64>> = BTreeMap::new();
1060
1061        // key1: Active with base_old_loc = Some(5) -> should clear bit 5.
1062        diff.insert(
1063            key1,
1064            DiffEntry::Active {
1065                value: 100,
1066                loc: Location::new(70),
1067                base_old_loc: Some(Location::new(5)),
1068            },
1069        );
1070
1071        // key2: Deleted with base_old_loc = Some(10) -> should clear bit 10.
1072        diff.insert(
1073            key2,
1074            DiffEntry::Deleted {
1075                base_old_loc: Some(Location::new(10)),
1076            },
1077        );
1078
1079        // key3: Active with base_old_loc = None -> no clear.
1080        diff.insert(
1081            key3,
1082            DiffEntry::Active {
1083                value: 300,
1084                loc: Location::new(71),
1085                base_old_loc: None,
1086            },
1087        );
1088
1089        // Segment of 8 ops starting at 64; previous commit at loc 63.
1090        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 8, 64, &diff, &[]);
1091
1092        // Verify bits 5 and 10 are cleared in chunk 0.
1093        let c0 = overlay.get(0).expect("chunk 0 should be dirty");
1094        assert_eq!(c0[0] & (1 << 5), 0); // bit 5 cleared
1095        assert_eq!(c0[1] & (1 << 2), 0); // bit 10 = byte 1, bit 2 cleared
1096
1097        // Other bits should still be set.
1098        assert_eq!(c0[0] & (1 << 4), 1 << 4); // bit 4 still set
1099        assert_eq!(c0[1] & (1 << 3), 1 << 3); // bit 11 still set
1100    }
1101
1102    /// Regression: when the parent bitmap has a partial last chunk that becomes complete in the
1103    /// child (without any active bits landing in that chunk), the overlay must inherit the parent's
1104    /// partial chunk data, not zero it out.
1105    #[test]
1106    fn chunk_overlay_preserves_partial_parent_chunk() {
1107        use crate::qmdb::any::value::FixedEncoding;
1108        use commonware_utils::sequence::FixedBytes;
1109
1110        type K = FixedBytes<4>;
1111        type U = crate::qmdb::any::operation::update::Unordered<K, FixedEncoding<u64>>;
1112
1113        // Base: 20 bits set (partial chunk 0, CHUNK_SIZE_BITS=32).
1114        let base = make_bitmap(&[true; 20]);
1115        assert_eq!(base.complete_chunks(), 0); // partial
1116
1117        // Segment of 20 ops starting at loc 20. This pushes total to 40 bits, completing chunk 0
1118        // (32 bits) and starting chunk 1. Diff: only one active key at loc 35 (in chunk 1), plus
1119        // CommitFloor at loc 39. No active bits land in chunk 0's new region (bits 20-31).
1120        let key1 = FixedBytes::from([1, 0, 0, 0]);
1121        let mut diff = BTreeMap::new();
1122        diff.insert(
1123            key1,
1124            DiffEntry::Active {
1125                value: 42u64,
1126                loc: Location::new(35),
1127                base_old_loc: None,
1128            },
1129        );
1130
1131        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 20, 20, &diff, &[]);
1132
1133        // Chunk 0 should be materialized and preserve the parent's first 20 bits.
1134        let c0 = overlay.get(0).expect("chunk 0 should be in overlay");
1135        // Bits 0-7 all set -> byte 0 = 0xFF
1136        assert_eq!(c0[0], 0xFF);
1137        // Bits 8-15 all set -> byte 1 = 0xFF
1138        assert_eq!(c0[1], 0xFF);
1139        // Bits 16-18 set, bit 19 cleared (previous commit), 20-23 not set -> byte 2 = 0x07
1140        assert_eq!(c0[2], 0x07);
1141    }
1142
1143    // ---- FloorScan tests ----
1144
1145    use crate::qmdb::any::batch::{FloorScan, SequentialScan};
1146
1147    #[test]
1148    fn sequential_scan_returns_floor_when_below_tip() {
1149        let mut scan = SequentialScan;
1150        assert_eq!(
1151            scan.next_candidate(Location::new(5), 10),
1152            Some(Location::new(5))
1153        );
1154    }
1155
1156    #[test]
1157    fn sequential_scan_returns_none_at_tip() {
1158        let mut scan = SequentialScan;
1159        assert_eq!(scan.next_candidate(Location::new(10), 10), None);
1160        assert_eq!(scan.next_candidate(Location::new(11), 10), None);
1161    }
1162
1163    #[test]
1164    fn bitmap_scan_all_active() {
1165        let bm = make_bitmap(&[true; 8]);
1166        let mut scan = BitmapScan::<Bm, N>::new(&bm);
1167        for i in 0..8 {
1168            assert_eq!(
1169                scan.next_candidate(Location::new(i), 8),
1170                Some(Location::new(i))
1171            );
1172        }
1173        assert_eq!(scan.next_candidate(Location::new(8), 8), None);
1174    }
1175
1176    #[test]
1177    fn bitmap_scan_all_inactive() {
1178        let bm = make_bitmap(&[false; 8]);
1179        let mut scan = BitmapScan::<Bm, N>::new(&bm);
1180        assert_eq!(scan.next_candidate(Location::new(0), 8), None);
1181    }
1182
1183    #[test]
1184    fn bitmap_scan_skips_inactive() {
1185        // Pattern: inactive, inactive, active, inactive, active
1186        let bm = make_bitmap(&[false, false, true, false, true]);
1187        let mut scan = BitmapScan::<Bm, N>::new(&bm);
1188
1189        assert_eq!(
1190            scan.next_candidate(Location::new(0), 5),
1191            Some(Location::new(2))
1192        );
1193        assert_eq!(
1194            scan.next_candidate(Location::new(3), 5),
1195            Some(Location::new(4))
1196        );
1197        assert_eq!(scan.next_candidate(Location::new(5), 5), None);
1198    }
1199
1200    #[test]
1201    fn bitmap_scan_beyond_bitmap_len_returns_candidate() {
1202        // Bitmap has 4 bits, but tip is 8. Locations 4..8 are beyond the
1203        // bitmap and should be returned as candidates.
1204        let bm = make_bitmap(&[false; 4]);
1205        let mut scan = BitmapScan::<Bm, N>::new(&bm);
1206
1207        // All bitmap bits are unset, so 0..4 are skipped.
1208        // Location 4 is beyond bitmap -> candidate.
1209        assert_eq!(
1210            scan.next_candidate(Location::new(0), 8),
1211            Some(Location::new(4))
1212        );
1213        assert_eq!(
1214            scan.next_candidate(Location::new(6), 8),
1215            Some(Location::new(6))
1216        );
1217    }
1218
1219    #[test]
1220    fn bitmap_scan_respects_tip() {
1221        let bm = make_bitmap(&[false, false, false, true]);
1222        let mut scan = BitmapScan::<Bm, N>::new(&bm);
1223
1224        // Active bit at 3, but tip is 3 so it's excluded.
1225        assert_eq!(scan.next_candidate(Location::new(0), 3), None);
1226        // With tip=4, bit 3 is included.
1227        assert_eq!(
1228            scan.next_candidate(Location::new(0), 4),
1229            Some(Location::new(3))
1230        );
1231    }
1232
1233    #[test]
1234    fn bitmap_scan_floor_at_tip() {
1235        let bm = make_bitmap(&[true; 4]);
1236        let mut scan = BitmapScan::<Bm, N>::new(&bm);
1237        assert_eq!(scan.next_candidate(Location::new(4), 4), None);
1238    }
1239
1240    #[test]
1241    fn bitmap_scan_empty_bitmap() {
1242        let bm = Bm::new();
1243        let mut scan = BitmapScan::<Bm, N>::new(&bm);
1244
1245        // Empty bitmap, but tip > 0: all locations are beyond bitmap.
1246        assert_eq!(
1247            scan.next_candidate(Location::new(0), 5),
1248            Some(Location::new(0))
1249        );
1250        // Empty bitmap, tip = 0: no candidates.
1251        assert_eq!(scan.next_candidate(Location::new(0), 0), None);
1252    }
1253
    #[test]
    fn test_apply_overlay() {
        // Base: 8 bits all set, sole owner -> fast path.
        // The batch holds the only Arc to the base bitmap, so apply_overlay
        // can mutate it in place.
        let base = make_bitmap(&[true; 8]);
        let mut batch = BitmapBatch::Base(Arc::new(base));

        // Overlay grows the bitmap to 12 bits and rewrites chunk 0.
        let mut overlay = ChunkOverlay::new(12);
        let mut c0 = [0u8; N];
        c0[0] = 0b1111_0111; // bits 0-7 set except bit 3
        c0[1] = 0b0000_0100; // bit 10 set
        overlay.chunks.insert(0, c0);
        batch.apply_overlay(Arc::new(overlay));

        // Fast path keeps Base, extends length, applies chunks.
        assert!(matches!(batch, BitmapBatch::Base(_)));
        assert_eq!(batch.len(), 12);
        assert_eq!(batch.get_chunk(0)[0] & (1 << 3), 0);
        assert_ne!(batch.get_chunk(0)[1] & (1 << 2), 0);

        // Shared Arc -> slow path creates Layer.
        // Cloning the inner Arc bumps its refcount so the batch is no longer
        // the sole owner; `_shared` must stay alive across the apply_overlay
        // call below for the slow path to trigger.
        let BitmapBatch::Base(ref base_arc) = batch else {
            panic!("expected Base");
        };
        let _shared = Arc::clone(base_arc);
        let overlay2 = ChunkOverlay::new(12);
        batch.apply_overlay(Arc::new(overlay2));
        assert!(matches!(batch, BitmapBatch::Layer(_)));
    }
1282}