// commonware_storage/qmdb/current/batch.rs
1//! Batch mutation API for Current QMDBs.
2//!
3//! Wraps the [`any::batch`] API, layering bitmap and grafted MMR
4//! computation on top during [`UnmerkleizedBatch::merkleize()`].
5
6use crate::{
7    index::Unordered as UnorderedIndex,
8    journal::{
9        authenticated::{self, BatchChain},
10        contiguous::{Contiguous, Mutable},
11    },
12    mmr::{
13        self,
14        hasher::Hasher as _,
15        read::{BatchChainInfo, Readable},
16        storage::Storage as MmrStorage,
17        Location, Position, StandardHasher,
18    },
19    qmdb::{
20        any::{
21            self,
22            batch::{DiffEntry, FloorScan},
23            operation::{update, Operation},
24            ValueEncoding,
25        },
26        current::{
27            db::{compute_db_root, compute_grafted_leaves, partial_chunk},
28            grafting,
29        },
30        operation::{Key, Operation as OperationTrait},
31        Error,
32    },
33};
34use commonware_codec::Codec;
35use commonware_cryptography::{Digest, Hasher};
36use commonware_runtime::{Clock, Metrics, Storage};
37use commonware_utils::bitmap::Prunable as BitMap;
38use std::{
39    collections::{BTreeMap, HashSet},
40    sync::Arc,
41};
42
/// A bitmap that can be read.
///
/// Abstracts over a committed bitmap ([`BitMap`]) and an uncommitted
/// [`BitmapDiff`], so scanning and chunk-extraction code can run against
/// either.
pub trait BitmapRead<const N: usize> {
    /// Return the number of complete (fully filled) chunks.
    fn complete_chunks(&self) -> usize;
    /// Return the chunk data at the given absolute chunk index.
    fn get_chunk(&self, chunk: usize) -> [u8; N];
    /// Return the last chunk and its size in bits.
    fn last_chunk(&self) -> ([u8; N], u64);
    /// Return the number of pruned chunks.
    fn pruned_chunks(&self) -> usize;
    /// Return the total number of bits.
    fn len(&self) -> u64;
    /// Returns true if the bitmap is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Return the value of a single bit.
    ///
    /// `bit` is an absolute bit index; the default implementation fetches the
    /// containing chunk and extracts the bit within it.
    /// NOTE(review): assumes `bit`'s chunk has not been pruned — confirm that
    /// callers only pass unpruned locations.
    fn get_bit(&self, bit: u64) -> bool {
        let chunk = self.get_chunk(BitMap::<N>::to_chunk_index(bit));
        BitMap::<N>::get_bit_from_chunk(&chunk, bit % BitMap::<N>::CHUNK_SIZE_BITS)
    }
}
65
// Delegates each trait method to the inherent `BitMap` method of the same
// name. The fully-qualified `Self::` calls resolve to the inherent impl
// (inherent methods take precedence over trait methods), so these do not
// recurse into themselves.
impl<const N: usize> BitmapRead<N> for BitMap<N> {
    fn complete_chunks(&self) -> usize {
        Self::complete_chunks(self)
    }
    fn get_chunk(&self, chunk: usize) -> [u8; N] {
        // Inherent getter returns a reference; copy out the fixed-size array.
        *Self::get_chunk(self, chunk)
    }
    fn last_chunk(&self) -> ([u8; N], u64) {
        let (c, n) = Self::last_chunk(self);
        (*c, n)
    }
    fn pruned_chunks(&self) -> usize {
        Self::pruned_chunks(self)
    }
    fn len(&self) -> u64 {
        Self::len(self)
    }
}
84
/// Bitmap-accelerated floor scan. Skips locations where the bitmap bit is
/// unset, avoiding I/O reads for inactive operations.
pub(crate) struct BitmapScan<'a, B, const N: usize> {
    /// Activity bitmap consulted during the scan (one bit per operation location).
    bitmap: &'a B,
}
90
impl<'a, B: BitmapRead<N>, const N: usize> BitmapScan<'a, B, N> {
    /// Create a scan backed by `bitmap`.
    pub(crate) const fn new(bitmap: &'a B) -> Self {
        Self { bitmap }
    }
}
96
97impl<B: BitmapRead<N>, const N: usize> FloorScan for BitmapScan<'_, B, N> {
98    fn next_candidate(&mut self, floor: Location, tip: u64) -> Option<Location> {
99        let mut loc = *floor;
100        let bitmap_len = self.bitmap.len();
101        while loc < tip {
102            // Beyond the bitmap: uncommitted ops from prior batches in the
103            // chain that aren't tracked by the bitmap yet. Conservatively
104            // treat them as candidates.
105            // Within the bitmap: only consider locations with the bit set.
106            if loc >= bitmap_len || self.bitmap.get_bit(loc) {
107                return Some(Location::new(loc));
108            }
109            loc += 1;
110        }
111        None
112    }
113}
114
/// Uncommitted bitmap changes on top of a base bitmap. Records pushed bits
/// and cleared bits without cloning the base. Implements [`BitmapRead`] for
/// read-through access.
pub struct BitmapDiff<'a, B: BitmapRead<N>, const N: usize> {
    /// The parent bitmap this diff is built on top of.
    base: &'a B,
    /// Number of bits in the base bitmap at diff creation time.
    base_len: u64,
    /// New bits appended beyond the base bitmap: absolute bit
    /// `base_len + i` is `pushed_bits[i]`.
    pushed_bits: Vec<bool>,
    /// Locations of base bits that have been deactivated. Applied after
    /// pushes when materializing a chunk, so a clear wins over a push.
    cleared_bits: Vec<Location>,
    /// Chunk indices containing cleared bits that need grafted MMR recomputation.
    /// Only indices below `old_grafted_leaves` are tracked; later chunks are
    /// computed as new leaves regardless.
    dirty_chunks: HashSet<usize>,
    /// Number of complete chunks in the base bitmap at diff creation time.
    old_grafted_leaves: usize,
}
132
133impl<'a, B: BitmapRead<N>, const N: usize> BitmapDiff<'a, B, N> {
134    const CHUNK_SIZE_BITS: u64 = BitMap::<N>::CHUNK_SIZE_BITS;
135
136    fn new(base: &'a B, old_grafted_leaves: usize) -> Self {
137        Self {
138            base_len: base.len(),
139            base,
140            pushed_bits: Vec::new(),
141            cleared_bits: Vec::new(),
142            dirty_chunks: HashSet::new(),
143            old_grafted_leaves,
144        }
145    }
146
147    fn push_bit(&mut self, active: bool) {
148        self.pushed_bits.push(active);
149    }
150
151    fn clear_bit(&mut self, loc: Location) {
152        self.cleared_bits.push(loc);
153        let chunk = BitMap::<N>::to_chunk_index(*loc);
154        if chunk < self.old_grafted_leaves {
155            self.dirty_chunks.insert(chunk);
156        }
157    }
158
159    fn pushed_bits(&self) -> &[bool] {
160        &self.pushed_bits
161    }
162
163    fn cleared_bits(&self) -> &[Location] {
164        &self.cleared_bits
165    }
166}
167
impl<B: BitmapRead<N>, const N: usize> BitmapRead<N> for BitmapDiff<'_, B, N> {
    // Derived from total length; counts chunks in absolute terms.
    // NOTE(review): assumes the base's `len()` counts pruned bits so that
    // absolute chunk indices line up with `get_chunk` — confirm against the
    // `BitMap` implementation.
    fn complete_chunks(&self) -> usize {
        (self.len() / Self::CHUNK_SIZE_BITS) as usize
    }

    /// Materialize the chunk at absolute index `idx`: start from the base's
    /// data, overlay bits pushed by this diff, then apply clears — so a clear
    /// always wins over a push.
    fn get_chunk(&self, idx: usize) -> [u8; N] {
        let chunk_start = idx as u64 * Self::CHUNK_SIZE_BITS;
        let chunk_end = chunk_start + Self::CHUNK_SIZE_BITS;

        // Start with base data. The base's partial tail chunk must be read
        // via `last_chunk`; anything past that starts zeroed.
        let base_complete = self.base.complete_chunks();
        let base_has_partial = !self.base_len.is_multiple_of(Self::CHUNK_SIZE_BITS);
        let mut chunk = if idx < base_complete {
            self.base.get_chunk(idx)
        } else if idx == base_complete && base_has_partial {
            self.base.last_chunk().0
        } else {
            [0u8; N]
        };

        // Apply pushed bits. The relevant slice is identified in O(1) since pushes
        // are contiguous from base_len.
        let push_start = self.base_len;
        let push_end = push_start + self.pushed_bits.len() as u64;
        if push_start < chunk_end && push_end > chunk_start {
            // Intersect [push_start, push_end) with [chunk_start, chunk_end).
            let abs_start = push_start.max(chunk_start);
            let abs_end = push_end.min(chunk_end);
            let from = (abs_start - push_start) as usize;
            let to = (abs_end - push_start) as usize;
            let rel_offset = (abs_start - chunk_start) as usize;
            for (j, &bit) in self.pushed_bits[from..to].iter().enumerate() {
                if bit {
                    // Bit `k` of a chunk lives at byte `k / 8`, position
                    // `k % 8` (LSB-first within each byte).
                    let rel = rel_offset + j;
                    chunk[rel / 8] |= 1 << (rel % 8);
                }
            }
        }

        // Apply clears. Linear over all recorded clears; applied last so a
        // cleared bit stays off even if it was pushed above.
        for &loc in &self.cleared_bits {
            let bit = *loc;
            if bit >= chunk_start && bit < chunk_end {
                let rel = (bit - chunk_start) as usize;
                chunk[rel / 8] &= !(1 << (rel % 8));
            }
        }

        chunk
    }

    /// Return the last chunk and the number of bits it holds. When the diffed
    /// bitmap is exactly chunk-aligned, this returns the final full chunk
    /// with `CHUNK_SIZE_BITS` bits rather than an empty partial chunk.
    fn last_chunk(&self) -> ([u8; N], u64) {
        let total = self.len();
        if total == 0 {
            return ([0u8; N], 0);
        }
        let rem = total % Self::CHUNK_SIZE_BITS;
        let bits_in_last = if rem == 0 { Self::CHUNK_SIZE_BITS } else { rem };
        let last_idx = self.complete_chunks();
        // If chunk-aligned, last complete chunk is at complete_chunks - 1.
        let idx = if rem == 0 {
            last_idx.saturating_sub(1)
        } else {
            last_idx
        };
        (self.get_chunk(idx), bits_in_last)
    }

    // A diff never changes pruning; pass through to the base.
    fn pruned_chunks(&self) -> usize {
        self.base.pruned_chunks()
    }

    // Clears don't shrink the bitmap; only pushes extend it.
    fn len(&self) -> u64 {
        self.base_len + self.pushed_bits.len() as u64
    }
}
243
/// Adapter that resolves ops MMR nodes for a batch's `compute_current_layer`.
///
/// Tries the batch chain's sync [`Readable`] first (which covers nodes appended
/// or overwritten by the batch, plus anything still in the in-memory MMR).
/// Falls through to the base's async [`MmrStorage`].
struct BatchStorageAdapter<'a, D: Digest, R: Readable<D>, S: MmrStorage<D>> {
    /// Speculative batch state, consulted first.
    batch: &'a R,
    /// Committed storage, consulted only on a batch miss.
    base: &'a S,
    /// Ties the otherwise-unused digest parameter `D` to the struct.
    _phantom: core::marker::PhantomData<D>,
}
254
impl<'a, D: Digest, R: Readable<D>, S: MmrStorage<D>> BatchStorageAdapter<'a, D, R, S> {
    /// Create an adapter layering `batch` over `base`.
    const fn new(batch: &'a R, base: &'a S) -> Self {
        Self {
            batch,
            base,
            _phantom: core::marker::PhantomData,
        }
    }
}
264
265impl<D: Digest, R: Readable<D>, S: MmrStorage<D>> MmrStorage<D>
266    for BatchStorageAdapter<'_, D, R, S>
267{
268    async fn size(&self) -> Position {
269        self.batch.size()
270    }
271    async fn get_node(&self, pos: Position) -> Result<Option<D>, mmr::Error> {
272        if let Some(node) = self.batch.get_node(pos) {
273            return Ok(Some(node));
274        }
275        self.base.get_node(pos).await
276    }
277}
278
/// A speculative batch of mutations whose root digest has not yet been computed,
/// in contrast to [MerkleizedBatch].
///
/// Layers current-DB state (parent bitmap and grafted MMR references) on top
/// of the any-layer batch. Bitmap pushes/clears from ancestor batches are
/// carried as shared `Arc` segments so chaining avoids copying them.
#[allow(clippy::type_complexity)]
pub struct UnmerkleizedBatch<'a, E, K, V, C, I, H, U, P, G, B, const N: usize>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: update::Update<K, V> + Send + Sync,
    C: Contiguous<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
    P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Operation<K, V, U>>,
    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
    B: BitmapRead<N>,
{
    /// The inner any-layer batch that handles mutations, journal, and floor raise.
    inner: any::batch::UnmerkleizedBatch<'a, E, K, V, C, I, H, U, P>,

    /// The committed current-layer DB (for bitmap and grafted MMR access).
    current_db: &'a super::db::Db<E, C, I, H, U, N>,

    /// Bitmap pushes accumulated by prior batches in the chain.
    base_bitmap_pushes: Vec<Arc<Vec<bool>>>,

    /// Bitmap clears accumulated by prior batches in the chain.
    base_bitmap_clears: Vec<Arc<Vec<Location>>>,

    /// Parent's grafted MMR state.
    grafted_parent: &'a G,

    /// Parent's bitmap state.
    bitmap_parent: &'a B,
}
314
/// A speculative batch of operations whose root digest has been computed,
/// in contrast to [UnmerkleizedBatch].
///
/// Produced by `merkleize()`; holds the merkleized grafted MMR batch, the
/// bitmap diff, and the precomputed canonical root, and can spawn a child
/// batch or be finalized into a [`Changeset`].
#[allow(clippy::type_complexity)]
pub struct MerkleizedBatch<'a, E, K, V, C, I, H, U, P, G, B, const N: usize>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: update::Update<K, V> + Send + Sync,
    C: Contiguous<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
    P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Operation<K, V, U>>,
    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
    B: BitmapRead<N>,
{
    /// The inner any-layer merkleized batch.
    inner: any::batch::MerkleizedBatch<'a, E, K, V, C, I, H, U, P>,

    /// The committed current-layer DB (for bitmap and grafted MMR access).
    current_db: &'a super::db::Db<E, C, I, H, U, N>,

    /// Bitmap pushes accumulated by prior batches in the chain.
    base_bitmap_pushes: Vec<Arc<Vec<bool>>>,

    /// Bitmap clears accumulated by prior batches in the chain.
    base_bitmap_clears: Vec<Arc<Vec<Location>>>,

    /// Merkleized grafted MMR changes on top of the parent's state.
    grafted_merkleized: mmr::MerkleizedBatch<'a, H::Digest, G>,

    /// Uncommitted bitmap changes on top of the parent bitmap.
    bitmap_diff: BitmapDiff<'a, B, N>,

    /// The canonical root (ops MMR root + grafted MMR root + partial chunk).
    canonical_root: H::Digest,
}
353
/// An owned changeset that can be applied to the database.
///
/// Produced by `MerkleizedBatch::finalize`, which flattens the whole batch
/// chain's bitmap mutations into the flat `Vec`s below.
pub struct Changeset<K, D: Digest, Item: Send, const N: usize> {
    /// The inner any-layer changeset.
    pub(super) inner: any::batch::Changeset<K, D, Item>,

    /// One bool per operation in the batch chain (pushes applied before clears).
    pub(super) bitmap_pushes: Vec<bool>,

    /// Locations of bits to clear after pushing.
    pub(super) bitmap_clears: Vec<Location>,

    /// Changeset for the grafted MMR.
    pub(super) grafted_changeset: mmr::Changeset<D>,

    /// Precomputed canonical root.
    pub(super) canonical_root: D,
}
371
impl<'a, E, K, V, C, I, H, U, P, G, B, const N: usize>
    UnmerkleizedBatch<'a, E, K, V, C, I, H, U, P, G, B, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: update::Update<K, V> + Send + Sync,
    C: Contiguous<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
    P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Operation<K, V, U>>,
    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
    B: BitmapRead<N>,
{
    /// Construct a batch over `inner`, layering the current DB's bitmap and
    /// grafted MMR parents plus the ancestor chain's accumulated
    /// pushes/clears.
    pub(super) const fn new(
        inner: any::batch::UnmerkleizedBatch<'a, E, K, V, C, I, H, U, P>,
        current_db: &'a super::db::Db<E, C, I, H, U, N>,
        base_bitmap_pushes: Vec<Arc<Vec<bool>>>,
        base_bitmap_clears: Vec<Arc<Vec<Location>>>,
        grafted_parent: &'a G,
        bitmap_parent: &'a B,
    ) -> Self {
        Self {
            inner,
            current_db,
            base_bitmap_pushes,
            base_bitmap_clears,
            grafted_parent,
            bitmap_parent,
        }
    }

    /// Record a mutation. Use `Some(value)` for update/create, `None` for delete.
    ///
    /// If the same key is written multiple times within a batch, the last
    /// value wins.
    pub fn write(&mut self, key: K, value: Option<V::Value>) {
        self.inner.write(key, value);
    }
}
413
414// Unordered get + merkleize.
415impl<'a, E, K, V, C, I, H, P, G, B, const N: usize>
416    UnmerkleizedBatch<'a, E, K, V, C, I, H, update::Unordered<K, V>, P, G, B, N>
417where
418    E: Storage + Clock + Metrics,
419    K: Key,
420    V: ValueEncoding,
421    C: Mutable<Item = Operation<K, V, update::Unordered<K, V>>>,
422    I: UnorderedIndex<Value = Location> + 'static,
423    H: Hasher,
424    Operation<K, V, update::Unordered<K, V>>: Codec,
425    V::Value: Send + Sync,
426    P: Readable<H::Digest>
427        + BatchChainInfo<H::Digest>
428        + BatchChain<Operation<K, V, update::Unordered<K, V>>>,
429    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
430    B: BitmapRead<N>,
431{
432    /// Read through: mutations -> base diff -> committed DB.
433    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error> {
434        self.inner.get(key).await
435    }
436
437    /// Resolve mutations into operations, merkleize, and return a [`MerkleizedBatch`].
438    pub async fn merkleize(
439        self,
440        metadata: Option<V::Value>,
441    ) -> Result<MerkleizedBatch<'a, E, K, V, C, I, H, update::Unordered<K, V>, P, G, B, N>, Error>
442    {
443        let Self {
444            inner,
445            current_db,
446            base_bitmap_pushes,
447            base_bitmap_clears,
448            grafted_parent,
449            bitmap_parent,
450        } = self;
451        let scan = BitmapScan::new(bitmap_parent);
452        let inner = inner.merkleize_with_floor_scan(metadata, scan).await?;
453        compute_current_layer(
454            inner,
455            current_db,
456            base_bitmap_pushes,
457            base_bitmap_clears,
458            grafted_parent,
459            bitmap_parent,
460        )
461        .await
462    }
463}
464
465// Ordered get + merkleize.
466impl<'a, E, K, V, C, I, H, P, G, B, const N: usize>
467    UnmerkleizedBatch<'a, E, K, V, C, I, H, update::Ordered<K, V>, P, G, B, N>
468where
469    E: Storage + Clock + Metrics,
470    K: Key,
471    V: ValueEncoding,
472    C: Mutable<Item = Operation<K, V, update::Ordered<K, V>>>,
473    I: crate::index::Ordered<Value = Location> + 'static,
474    H: Hasher,
475    Operation<K, V, update::Ordered<K, V>>: Codec,
476    V::Value: Send + Sync,
477    P: Readable<H::Digest>
478        + BatchChainInfo<H::Digest>
479        + BatchChain<Operation<K, V, update::Ordered<K, V>>>,
480    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
481    B: BitmapRead<N>,
482{
483    /// Read through: mutations -> base diff -> committed DB.
484    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error> {
485        self.inner.get(key).await
486    }
487
488    /// Resolve mutations into operations, merkleize, and return a [`MerkleizedBatch`].
489    pub async fn merkleize(
490        self,
491        metadata: Option<V::Value>,
492    ) -> Result<MerkleizedBatch<'a, E, K, V, C, I, H, update::Ordered<K, V>, P, G, B, N>, Error>
493    {
494        let Self {
495            inner,
496            current_db,
497            base_bitmap_pushes,
498            base_bitmap_clears,
499            grafted_parent,
500            bitmap_parent,
501        } = self;
502        let scan = BitmapScan::new(bitmap_parent);
503        let inner = inner.merkleize_with_floor_scan(metadata, scan).await?;
504        compute_current_layer(
505            inner,
506            current_db,
507            base_bitmap_pushes,
508            base_bitmap_clears,
509            grafted_parent,
510            bitmap_parent,
511        )
512        .await
513    }
514}
515
516/// Push one bitmap bit per operation in `segment`. An Update is active only if
517/// the merged diff shows it as the final location for its key.
518fn push_operation_bits<K, V, U, B, const N: usize>(
519    bitmap: &mut BitmapDiff<'_, B, N>,
520    segment: &[Operation<K, V, U>],
521    segment_base: u64,
522    diff: &BTreeMap<K, DiffEntry<V::Value>>,
523) where
524    K: Key,
525    V: ValueEncoding,
526    U: update::Update<K, V>,
527    B: BitmapRead<N>,
528    Operation<K, V, U>: Codec,
529{
530    for (i, op) in segment.iter().enumerate() {
531        let op_loc = Location::new(segment_base + i as u64);
532        match op {
533            Operation::Update(update) => {
534                let is_active = diff
535                    .get(update.key())
536                    .is_some_and(|entry| entry.loc() == Some(op_loc));
537                bitmap.push_bit(is_active);
538            }
539            Operation::CommitFloor(..) => {
540                // Active until the next commit supersedes it.
541                bitmap.push_bit(true);
542            }
543            Operation::Delete(..) => {
544                bitmap.push_bit(false);
545            }
546        }
547    }
548}
549
550/// Clear bits for base-DB operations superseded by this chain's diff.
551fn clear_base_old_locs<K, V, B, const N: usize>(
552    bitmap: &mut BitmapDiff<'_, B, N>,
553    diff: &BTreeMap<K, DiffEntry<V>>,
554) where
555    K: Ord,
556    B: BitmapRead<N>,
557{
558    for entry in diff.values() {
559        if let Some(old) = entry.base_old_loc() {
560            bitmap.clear_bit(old);
561        }
562    }
563}
564
565/// Clear bits for ancestor-segment operations superseded by a later segment.
566/// Only relevant for chained batches (chain length > 1).
567#[allow(clippy::type_complexity)]
568fn clear_ancestor_superseded<K, V, U, B, const N: usize>(
569    bitmap: &mut BitmapDiff<'_, B, N>,
570    chain: &[std::sync::Arc<Vec<Operation<K, V, U>>>],
571    diff: &BTreeMap<K, DiffEntry<V::Value>>,
572    db_base: u64,
573) where
574    K: Key,
575    V: ValueEncoding,
576    U: update::Update<K, V>,
577    B: BitmapRead<N>,
578    Operation<K, V, U>: Codec,
579{
580    let mut seg_base = db_base;
581    for ancestor_seg in &chain[..chain.len() - 1] {
582        for (j, op) in ancestor_seg.iter().enumerate() {
583            if let Some(key) = op.key() {
584                let ancestor_loc = Location::new(seg_base + j as u64);
585                if let Some(entry) = diff.get(key) {
586                    if entry.loc() != Some(ancestor_loc) {
587                        bitmap.clear_bit(ancestor_loc);
588                    }
589                }
590            }
591        }
592        seg_base += ancestor_seg.len() as u64;
593    }
594}
595
/// Compute the current layer (bitmap + grafted MMR + canonical root) on top of
/// a merkleized any batch.
///
/// Creates a `BitmapDiff` and grafted MMR batch from the immediate parent's
/// state, and produces the speculative grafted `MerkleizedBatch` and
/// `BitmapDiff` that live on the returned `MerkleizedBatch`. The ancestor
/// chain's accumulated bitmap pushes/clears are stored alongside the diff
/// so that `finalize()` can concatenate them without recomputation.
async fn compute_current_layer<'a, E, K, V, U, C, I, H, P, G, B, const N: usize>(
    inner: any::batch::MerkleizedBatch<'a, E, K, V, C, I, H, U, P>,
    current_db: &'a super::db::Db<E, C, I, H, U, N>,
    base_bitmap_pushes: Vec<Arc<Vec<bool>>>,
    base_bitmap_clears: Vec<Arc<Vec<Location>>>,
    grafted_parent: &'a G,
    bitmap_parent: &'a B,
) -> Result<MerkleizedBatch<'a, E, K, V, C, I, H, U, P, G, B, N>, Error>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: update::Update<K, V> + Send + Sync,
    C: Contiguous<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
    P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Operation<K, V, U>>,
    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
    B: BitmapRead<N>,
{
    let old_grafted_leaves = *grafted_parent.leaves() as usize;
    let mut bitmap = BitmapDiff::new(bitmap_parent, old_grafted_leaves);

    // The last segment in the chain holds this batch's own operations;
    // earlier segments belong to ancestor batches.
    let chain = &inner.base_operations;
    let this_segment = chain.last().expect("operation chain should not be empty");
    // First location of this segment.
    // NOTE(review): relies on the invariant that the segment ends exactly at
    // `new_last_commit_loc`; this subtraction underflows if that's violated.
    let segment_base = *inner.new_last_commit_loc + 1 - this_segment.len() as u64;

    // 1. Inactivate previous commit.
    let prev_commit_loc = Location::new(segment_base - 1);
    bitmap.clear_bit(prev_commit_loc);

    // 2. Push bitmap bits for this segment's operations.
    push_operation_bits(&mut bitmap, this_segment, segment_base, &inner.diff);

    // 3. Clear superseded base-DB operations.
    clear_base_old_locs(&mut bitmap, &inner.diff);

    // 4. Clear ancestor-segment superseded operations (chaining only).
    if chain.len() > 1 {
        clear_ancestor_superseded(
            &mut bitmap,
            chain,
            &inner.diff,
            *current_db.any.last_commit_loc + 1,
        );
    }

    // 5. Compute grafted leaves for dirty + new chunks.
    //    dirty_chunks contains indices < old_grafted_leaves (existing chunks
    //    modified by clears). New chunks are in [old_grafted_leaves, new_grafted_leaves).
    //    These ranges never overlap, so each chunk is processed exactly once.
    let new_grafted_leaves = bitmap.complete_chunks();
    let chunks_to_update = (old_grafted_leaves..new_grafted_leaves)
        .chain(bitmap.dirty_chunks.iter().copied())
        .map(|i| (i, bitmap.get_chunk(i)));
    let ops_mmr_adapter = BatchStorageAdapter::new(&inner.journal_batch, &current_db.any.log.mmr);
    let mut hasher = StandardHasher::<H>::new();
    let new_leaves = compute_grafted_leaves::<H, N>(
        &mut hasher,
        &ops_mmr_adapter,
        chunks_to_update,
        current_db.thread_pool.as_ref(),
    )
    .await?;

    // 6. Build grafted MMR batch from parent ref (no clone).
    let grafting_height = grafting::height::<N>();
    let grafted_merkleized = {
        let mut grafted_batch =
            mmr::UnmerkleizedBatch::new(grafted_parent).with_pool(current_db.thread_pool.clone());
        for &(ops_pos, digest) in &new_leaves {
            let grafted_pos = grafting::ops_to_grafted_pos(ops_pos, grafting_height);
            // Positions inside the existing grafted MMR are in-place leaf
            // updates (recomputed dirty chunks); positions past the end are
            // appends (newly completed chunks).
            if grafted_pos < grafted_batch.size() {
                let loc = Location::try_from(grafted_pos).expect("grafted_pos overflow");
                grafted_batch
                    .update_leaf_digest(loc, digest)
                    .expect("update_leaf_digest failed");
            } else {
                grafted_batch.add_leaf_digest(digest);
            }
        }
        let mut gh = grafting::GraftedHasher::new(hasher.fork(), grafting_height);
        grafted_batch.merkleize(&mut gh)
    };

    // 7. Compute canonical root using the merkleized batch directly.
    let ops_root = inner.root();
    let grafted_storage =
        grafting::Storage::new(&grafted_merkleized, grafting_height, &ops_mmr_adapter);
    let partial = partial_chunk(&bitmap);
    let canonical_root =
        compute_db_root::<H, _, _, N>(&mut hasher, &grafted_storage, partial, &ops_root).await?;

    Ok(MerkleizedBatch {
        inner,
        current_db,
        base_bitmap_pushes,
        base_bitmap_clears,
        grafted_merkleized,
        bitmap_diff: bitmap,
        canonical_root,
    })
}
708
impl<'a, E, K, V, C, I, H, U, P, G, B, const N: usize>
    MerkleizedBatch<'a, E, K, V, C, I, H, U, P, G, B, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: update::Update<K, V> + Send + Sync,
    C: Contiguous<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
    P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Operation<K, V, U>>,
    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
    B: BitmapRead<N>,
{
    /// Return the speculative root.
    ///
    /// This is the canonical root computed at merkleize time (ops MMR root +
    /// grafted MMR root + partial chunk).
    pub const fn root(&self) -> H::Digest {
        self.canonical_root
    }

    /// Return the ops-only MMR root.
    pub fn ops_root(&self) -> H::Digest {
        self.inner.root()
    }

    /// Create a new speculative batch of operations with this batch as its parent.
    ///
    /// The child reads this batch's merkleized grafted MMR and bitmap diff as
    /// its parents, so chaining never clones committed state.
    #[allow(clippy::type_complexity)]
    pub fn new_batch(
        &self,
    ) -> UnmerkleizedBatch<
        '_,
        E,
        K,
        V,
        C,
        I,
        H,
        U,
        authenticated::MerkleizedBatch<'a, H, P, Operation<K, V, U>>,
        mmr::MerkleizedBatch<'a, H::Digest, G>,
        BitmapDiff<'a, B, N>,
        N,
    > {
        // Clone the chain of Arc segments (1 Arc bump per batch in the chain), then push this
        // batch's diff data as a new segment.
        let mut pushes = self.base_bitmap_pushes.clone();
        pushes.push(Arc::new(self.bitmap_diff.pushed_bits().to_vec()));
        let mut clears = self.base_bitmap_clears.clone();
        clears.push(Arc::new(self.bitmap_diff.cleared_bits().to_vec()));
        UnmerkleizedBatch {
            inner: self.inner.new_batch(),
            current_db: self.current_db,
            base_bitmap_pushes: pushes,
            base_bitmap_clears: clears,
            grafted_parent: &self.grafted_merkleized,
            bitmap_parent: &self.bitmap_diff,
        }
    }
}
768
// Unordered get.
impl<'a, E, K, V, C, I, H, P, G, B, const N: usize>
    MerkleizedBatch<'a, E, K, V, C, I, H, update::Unordered<K, V>, P, G, B, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    C: Contiguous<Item = Operation<K, V, update::Unordered<K, V>>>,
    I: UnorderedIndex<Value = Location> + 'static,
    H: Hasher,
    Operation<K, V, update::Unordered<K, V>>: Codec,
    V::Value: Send + Sync,
    P: Readable<H::Digest>
        + BatchChainInfo<H::Digest>
        + BatchChain<Operation<K, V, update::Unordered<K, V>>>,
    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
    B: BitmapRead<N>,
{
    /// Read through: diff -> committed DB.
    ///
    /// Delegates to the inner any-layer batch; the bitmap layer plays no part
    /// in key lookups.
    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error> {
        self.inner.get(key).await
    }
}
792
// Ordered get.
impl<'a, E, K, V, C, I, H, P, G, B, const N: usize>
    MerkleizedBatch<'a, E, K, V, C, I, H, update::Ordered<K, V>, P, G, B, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    C: Contiguous<Item = Operation<K, V, update::Ordered<K, V>>>,
    I: crate::index::Ordered<Value = Location> + 'static,
    H: Hasher,
    Operation<K, V, update::Ordered<K, V>>: Codec,
    V::Value: Send + Sync,
    P: Readable<H::Digest>
        + BatchChainInfo<H::Digest>
        + BatchChain<Operation<K, V, update::Ordered<K, V>>>,
    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
    B: BitmapRead<N>,
{
    /// Read through: diff -> committed DB.
    ///
    /// Delegates to the inner any-layer batch; the bitmap layer plays no part
    /// in key lookups.
    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error> {
        self.inner.get(key).await
    }
}
816
817// Finalize (requires Mutable journal for apply_batch).
818impl<'a, E, K, V, C, I, H, U, P, G, B, const N: usize>
819    MerkleizedBatch<'a, E, K, V, C, I, H, U, P, G, B, N>
820where
821    E: Storage + Clock + Metrics,
822    K: Key,
823    V: ValueEncoding,
824    U: update::Update<K, V> + Send + Sync + 'static,
825    C: Mutable<Item = Operation<K, V, U>>,
826    I: UnorderedIndex<Value = Location>,
827    H: Hasher,
828    Operation<K, V, U>: Codec,
829    P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Operation<K, V, U>>,
830    G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
831    B: BitmapRead<N>,
832{
833    /// Consume this batch, producing an owned [`Changeset`].
834    pub fn finalize(self) -> Changeset<K, H::Digest, Operation<K, V, U>, N> {
835        // Flatten the chain of Arc segments + this level's diff into flat Vecs.
836        let total_pushes: usize = self
837            .base_bitmap_pushes
838            .iter()
839            .map(|s| s.len())
840            .sum::<usize>()
841            + self.bitmap_diff.pushed_bits().len();
842        let mut bitmap_pushes = Vec::with_capacity(total_pushes);
843        for seg in &self.base_bitmap_pushes {
844            bitmap_pushes.extend_from_slice(seg);
845        }
846        bitmap_pushes.extend_from_slice(self.bitmap_diff.pushed_bits());
847
848        let total_clears: usize = self
849            .base_bitmap_clears
850            .iter()
851            .map(|s| s.len())
852            .sum::<usize>()
853            + self.bitmap_diff.cleared_bits().len();
854        let mut bitmap_clears = Vec::with_capacity(total_clears);
855        for seg in &self.base_bitmap_clears {
856            bitmap_clears.extend_from_slice(seg);
857        }
858        bitmap_clears.extend_from_slice(self.bitmap_diff.cleared_bits());
859
860        Changeset {
861            inner: self.inner.finalize(),
862            bitmap_pushes,
863            bitmap_clears,
864            grafted_changeset: self.grafted_merkleized.finalize(),
865            canonical_root: self.canonical_root,
866        }
867    }
868}
869
// Blanket implementations of the generic batch/db traits for the Current
// QMDB batch types. Compiled only for tests or when the `test-traits`
// feature is enabled, so production builds skip this extra trait plumbing.
// Every method below is a thin delegation to the inherent method of the
// same name; no logic lives here.
#[cfg(any(test, feature = "test-traits"))]
mod trait_impls {
    use super::*;
    use crate::{
        journal::contiguous::Mutable,
        mmr::journaled::Mmr,
        qmdb::any::traits::{
            BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
            UnmerkleizedBatch as UnmerkleizedBatchTrait,
        },
        Persistable,
    };
    use std::future::Future;

    /// Shorthand for the Current QMDB database type these impls target.
    type CurrentDb<E, C, I, H, U, const N: usize> = crate::qmdb::current::db::Db<E, C, I, H, U, N>;

    // UnmerkleizedBatch trait for the *unordered*-update flavor (unordered
    // key index). Mirrors the Ordered impl below except for the update type
    // and the index bound.
    impl<'a, E, K, V, C, I, H, P, G, B, const N: usize> UnmerkleizedBatchTrait
        for UnmerkleizedBatch<'a, E, K, V, C, I, H, update::Unordered<K, V>, P, G, B, N>
    where
        E: Storage + Clock + Metrics,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<K, V, update::Unordered<K, V>>>,
        I: UnorderedIndex<Value = Location> + 'static,
        H: Hasher,
        Operation<K, V, update::Unordered<K, V>>: Codec,
        V::Value: Send + Sync,
        P: Readable<H::Digest>
            + BatchChainInfo<H::Digest>
            + BatchChain<Operation<K, V, update::Unordered<K, V>>>,
        G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
        B: BitmapRead<N>,
    {
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized =
            super::MerkleizedBatch<'a, E, K, V, C, I, H, update::Unordered<K, V>, P, G, B, N>;

        fn write(&mut self, key: K, value: Option<V::Value>) {
            // Fully-qualified call to disambiguate from the trait method.
            UnmerkleizedBatch::write(self, key, value);
        }

        fn merkleize(
            self,
            metadata: Option<V::Value>,
        ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error>> {
            self.merkleize(metadata)
        }
    }

    // UnmerkleizedBatch trait for the *ordered*-update flavor (ordered
    // key index).
    impl<'a, E, K, V, C, I, H, P, G, B, const N: usize> UnmerkleizedBatchTrait
        for UnmerkleizedBatch<'a, E, K, V, C, I, H, update::Ordered<K, V>, P, G, B, N>
    where
        E: Storage + Clock + Metrics,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<K, V, update::Ordered<K, V>>>,
        I: crate::index::Ordered<Value = Location> + 'static,
        H: Hasher,
        Operation<K, V, update::Ordered<K, V>>: Codec,
        V::Value: Send + Sync,
        P: Readable<H::Digest>
            + BatchChainInfo<H::Digest>
            + BatchChain<Operation<K, V, update::Ordered<K, V>>>,
        G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
        B: BitmapRead<N>,
    {
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized =
            super::MerkleizedBatch<'a, E, K, V, C, I, H, update::Ordered<K, V>, P, G, B, N>;

        fn write(&mut self, key: K, value: Option<V::Value>) {
            // Fully-qualified call to disambiguate from the trait method.
            UnmerkleizedBatch::write(self, key, value);
        }

        fn merkleize(
            self,
            metadata: Option<V::Value>,
        ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error>> {
            self.merkleize(metadata)
        }
    }

    // MerkleizedBatch trait: generic over the update type `U`, so a single
    // impl covers both the ordered and unordered flavors above.
    impl<'a, E, K, V, C, I, H, U, P, G, B, const N: usize> MerkleizedBatchTrait
        for super::MerkleizedBatch<'a, E, K, V, C, I, H, U, P, G, B, N>
    where
        E: Storage + Clock + Metrics,
        K: Key,
        V: ValueEncoding,
        U: update::Update<K, V> + Send + Sync + 'static,
        C: Mutable<Item = Operation<K, V, U>>,
        I: UnorderedIndex<Value = Location>,
        H: Hasher,
        Operation<K, V, U>: Codec,
        P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Operation<K, V, U>>,
        G: Readable<H::Digest> + BatchChainInfo<H::Digest>,
        B: BitmapRead<N>,
    {
        type Digest = H::Digest;
        type Changeset = Changeset<K, H::Digest, Operation<K, V, U>, N>;

        fn root(&self) -> H::Digest {
            self.root()
        }

        fn finalize(self) -> Self::Changeset {
            self.finalize()
        }
    }

    // BatchableDb for the unordered-update Current DB: pins the batch type
    // to the journaled-MMR/prunable-bitmap configuration used in tests.
    impl<E, K, V, C, I, H, const N: usize> BatchableDb
        for CurrentDb<E, C, I, H, update::Unordered<K, V>, N>
    where
        E: Storage + Clock + Metrics,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<K, V, update::Unordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: UnorderedIndex<Value = Location> + 'static,
        H: Hasher,
        Operation<K, V, update::Unordered<K, V>>: Codec,
        V::Value: Send + Sync,
    {
        type K = K;
        type V = V::Value;
        type Changeset = Changeset<K, H::Digest, Operation<K, V, update::Unordered<K, V>>, N>;
        type Batch<'a>
            = UnmerkleizedBatch<
            'a,
            E,
            K,
            V,
            C,
            I,
            H,
            update::Unordered<K, V>,
            Mmr<E, H::Digest>,
            mmr::mem::Mmr<H::Digest>,
            commonware_utils::bitmap::Prunable<N>,
            N,
        >
        where
            Self: 'a;

        fn new_batch(&self) -> Self::Batch<'_> {
            self.new_batch()
        }

        fn apply_batch(
            &mut self,
            batch: Self::Changeset,
        ) -> impl Future<Output = Result<core::ops::Range<Location>, crate::qmdb::Error>> {
            self.apply_batch(batch)
        }
    }

    // BatchableDb for the ordered-update Current DB; identical shape to the
    // unordered impl above, with the ordered update type and index bound.
    impl<E, K, V, C, I, H, const N: usize> BatchableDb
        for CurrentDb<E, C, I, H, update::Ordered<K, V>, N>
    where
        E: Storage + Clock + Metrics,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<K, V, update::Ordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: crate::index::Ordered<Value = Location> + 'static,
        H: Hasher,
        Operation<K, V, update::Ordered<K, V>>: Codec,
        V::Value: Send + Sync,
    {
        type K = K;
        type V = V::Value;
        type Changeset = Changeset<K, H::Digest, Operation<K, V, update::Ordered<K, V>>, N>;
        type Batch<'a>
            = UnmerkleizedBatch<
            'a,
            E,
            K,
            V,
            C,
            I,
            H,
            update::Ordered<K, V>,
            Mmr<E, H::Digest>,
            mmr::mem::Mmr<H::Digest>,
            commonware_utils::bitmap::Prunable<N>,
            N,
        >
        where
            Self: 'a;

        fn new_batch(&self) -> Self::Batch<'_> {
            self.new_batch()
        }

        fn apply_batch(
            &mut self,
            batch: Self::Changeset,
        ) -> impl Future<Output = Result<core::ops::Range<Location>, crate::qmdb::Error>> {
            self.apply_batch(batch)
        }
    }
}
1075
// Unit tests for the bitmap-diff overlay (`BitmapDiff`), the bit-pushing /
// bit-clearing helpers, and the `FloorScan` implementations. All tests use
// N=4 byte chunks (32 bits per chunk) and assert exact little-endian byte
// patterns, so the expected constants encode the chunk layout directly.
#[cfg(test)]
mod tests {
    use super::*;
    // NOTE(review): `use super::*` already brings in the parent's `BitMap`
    // alias; this explicit import shadows it with the same item.
    use commonware_utils::bitmap::Prunable as BitMap;

    // N=4 -> CHUNK_SIZE_BITS = 32
    const N: usize = 4;
    type Bm = BitMap<N>;

    /// Build a bitmap containing exactly the given bits, in push order.
    fn make_bitmap(bits: &[bool]) -> Bm {
        let mut bm = Bm::new();
        for &b in bits {
            bm.push(b);
        }
        bm
    }

    // ---- BitmapDiff tests ----

    #[test]
    fn bitmap_diff_push_only() {
        let base = Bm::new();
        let mut diff = BitmapDiff::<Bm, N>::new(&base, 0);
        for i in 0..10 {
            diff.push_bit(i % 2 == 0);
        }
        assert_eq!(diff.len(), 10);
        assert_eq!(diff.complete_chunks(), 0);

        let chunk = diff.get_chunk(0);
        // Bits 0,2,4,6,8 set -> 0b_0000_0001_0101_0101 in LE byte order
        // byte 0: bits 0..7 -> 0b0101_0101 = 0x55
        // byte 1: bits 8..9 -> bit 8 set -> 0b0000_0001 = 0x01
        assert_eq!(chunk[0], 0x55);
        assert_eq!(chunk[1], 0x01);
        assert_eq!(chunk[2], 0);
        assert_eq!(chunk[3], 0);
    }

    #[test]
    fn bitmap_diff_clear_on_base() {
        // Base has bits 0..8 all set.
        let base = make_bitmap(&[true; 8]);
        let mut diff = BitmapDiff::<Bm, N>::new(&base, 0);

        // Clear bit 3.
        diff.clear_bit(Location::new(3));
        let chunk = diff.get_chunk(0);
        // 0xFF with bit 3 cleared -> 0b1111_0111 = 0xF7
        assert_eq!(chunk[0], 0xF7);
    }

    #[test]
    fn bitmap_diff_push_and_clear_same_chunk() {
        // Base has 8 bits set.
        let base = make_bitmap(&[true; 8]);
        let mut diff = BitmapDiff::<Bm, N>::new(&base, 0);

        // Push 2 more bits (active).
        diff.push_bit(true);
        diff.push_bit(true);
        assert_eq!(diff.len(), 10);

        // Clear base bit 0.
        diff.clear_bit(Location::new(0));
        let chunk = diff.get_chunk(0);
        // Base byte 0: 0xFF -> clear bit 0 -> 0xFE
        // Pushed bits at positions 8,9 -> byte 1: bit 0 and 1 set -> 0x03
        assert_eq!(chunk[0], 0xFE);
        assert_eq!(chunk[1], 0x03);
    }

    #[test]
    fn bitmap_diff_cross_chunk_boundary() {
        // Base has 30 bits (partial first chunk, CHUNK_SIZE_BITS=32).
        let base = make_bitmap(&[true; 30]);
        let mut diff = BitmapDiff::<Bm, N>::new(&base, 0);

        // Push 5 bits to cross into second chunk (total 35).
        for _ in 0..5 {
            diff.push_bit(true);
        }
        assert_eq!(diff.len(), 35);
        assert_eq!(diff.complete_chunks(), 1);

        // First chunk (32 bits) should be all ones.
        let c0 = diff.get_chunk(0);
        assert_eq!(c0, [0xFF, 0xFF, 0xFF, 0xFF]);

        // Second chunk: 3 bits set (positions 32,33,34).
        let c1 = diff.get_chunk(1);
        assert_eq!(c1[0], 0x07);
        assert_eq!(c1[1], 0);
    }

    #[test]
    fn bitmap_diff_last_chunk_partial() {
        let base = Bm::new();
        let mut diff = BitmapDiff::<Bm, N>::new(&base, 0);
        // Push 5 bits.
        for _ in 0..5 {
            diff.push_bit(true);
        }
        let (chunk, bits_in_last) = diff.last_chunk();
        assert_eq!(bits_in_last, 5);
        assert_eq!(chunk[0], 0x1F); // lower 5 bits set
    }

    #[test]
    fn bitmap_diff_last_chunk_aligned() {
        let base = Bm::new();
        let mut diff = BitmapDiff::<Bm, N>::new(&base, 0);
        // Push exactly CHUNK_SIZE_BITS = 32 bits.
        for _ in 0..32 {
            diff.push_bit(true);
        }
        // On an exact chunk boundary, last_chunk reports the full chunk
        // rather than an empty next chunk.
        let (chunk, bits_in_last) = diff.last_chunk();
        assert_eq!(bits_in_last, 32);
        assert_eq!(chunk, [0xFF; 4]);
    }

    #[test]
    fn bitmap_diff_dirty_chunks_tracking() {
        // 3 complete chunks = 96 bits.
        let base = make_bitmap(&[true; 96]);
        assert_eq!(base.complete_chunks(), 3);

        let mut diff = BitmapDiff::<Bm, N>::new(&base, 3);

        // Clear bit in chunk 1 (bit 40 -> chunk 40/32 = 1).
        diff.clear_bit(Location::new(40));
        assert!(diff.dirty_chunks.contains(&1));

        // Push a bit, then clear it. It's beyond old_grafted_leaves, so
        // dirty_chunks should still only contain {1}.
        diff.push_bit(true);
        diff.clear_bit(Location::new(96)); // in the pushed region
        assert_eq!(diff.dirty_chunks.len(), 1);
        assert!(diff.dirty_chunks.contains(&1));
    }

    // ---- push_operation_bits test ----

    #[test]
    fn push_operation_bits_mixed() {
        use crate::qmdb::any::{
            operation::{update, Operation},
            value::FixedEncoding,
        };
        use commonware_utils::sequence::FixedBytes;

        type K = FixedBytes<4>;
        type V = FixedEncoding<u64>;
        type U = update::Unordered<K, V>;
        type Op = Operation<K, V, U>;

        let key1 = FixedBytes::from([1, 0, 0, 0]);
        let key2 = FixedBytes::from([2, 0, 0, 0]);
        let key3 = FixedBytes::from([3, 0, 0, 0]);

        // Segment: Update(key1), Update(key2), Delete(key3), CommitFloor
        let segment: Vec<Op> = vec![
            Op::Update(update::Unordered(key1.clone(), 100u64)),
            Op::Update(update::Unordered(key2.clone(), 200u64)),
            Op::Delete(key3),
            Op::CommitFloor(None, Location::new(99)),
        ];

        // Diff: key1 active at loc=0, key2 superseded (active at loc=99, not loc=1).
        let mut diff = BTreeMap::new();
        diff.insert(
            key1,
            DiffEntry::Active {
                value: 100u64,
                loc: Location::new(0),
                base_old_loc: None,
            },
        );
        diff.insert(
            key2,
            DiffEntry::Active {
                value: 200u64,
                loc: Location::new(99), // not loc=1, so superseded
                base_old_loc: None,
            },
        );

        let base = Bm::new();
        let mut bitmap = BitmapDiff::<Bm, N>::new(&base, 0);
        push_operation_bits::<K, V, U, Bm, N>(&mut bitmap, &segment, 0, &diff);

        // Expected: [true(key1 active), false(key2 superseded), false(delete), true(commit)]
        assert_eq!(bitmap.pushed_bits(), &[true, false, false, true]);
    }

    // ---- clear_base_old_locs test ----

    #[test]
    fn clear_base_old_locs_mixed() {
        type K = u64;

        // Base bitmap with 64 bits.
        let base = make_bitmap(&[true; 64]);
        let mut bitmap = BitmapDiff::<Bm, N>::new(&base, 2);

        let mut diff: BTreeMap<K, DiffEntry<u64>> = BTreeMap::new();

        // key1: Active with base_old_loc = Some(5) -> should clear bit 5.
        diff.insert(
            1,
            DiffEntry::Active {
                value: 100,
                loc: Location::new(70),
                base_old_loc: Some(Location::new(5)),
            },
        );

        // key2: Deleted with base_old_loc = Some(10) -> should clear bit 10.
        diff.insert(
            2,
            DiffEntry::Deleted {
                base_old_loc: Some(Location::new(10)),
            },
        );

        // key3: Active with base_old_loc = None -> no clear.
        diff.insert(
            3,
            DiffEntry::Active {
                value: 300,
                loc: Location::new(71),
                base_old_loc: None,
            },
        );

        clear_base_old_locs::<K, u64, Bm, N>(&mut bitmap, &diff);

        assert_eq!(bitmap.cleared_bits().len(), 2);

        // Verify bits 5 and 10 are cleared.
        let c0 = bitmap.get_chunk(0);
        assert_eq!(c0[0] & (1 << 5), 0); // bit 5 cleared
        assert_eq!(c0[1] & (1 << 2), 0); // bit 10 = byte 1, bit 2 cleared

        // Other bits should still be set.
        assert_eq!(c0[0] & (1 << 4), 1 << 4); // bit 4 still set
        assert_eq!(c0[1] & (1 << 3), 1 << 3); // bit 11 still set
    }

    // ---- FloorScan tests ----

    use crate::qmdb::any::batch::{FloorScan, SequentialScan};

    #[test]
    fn sequential_scan_returns_floor_when_below_tip() {
        let mut scan = SequentialScan;
        assert_eq!(
            scan.next_candidate(Location::new(5), 10),
            Some(Location::new(5))
        );
    }

    #[test]
    fn sequential_scan_returns_none_at_tip() {
        let mut scan = SequentialScan;
        assert_eq!(scan.next_candidate(Location::new(10), 10), None);
        assert_eq!(scan.next_candidate(Location::new(11), 10), None);
    }

    #[test]
    fn bitmap_scan_all_active() {
        let bm = make_bitmap(&[true; 8]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);
        // Every bit is set, so each floor location is its own candidate.
        for i in 0..8 {
            assert_eq!(
                scan.next_candidate(Location::new(i), 8),
                Some(Location::new(i))
            );
        }
        assert_eq!(scan.next_candidate(Location::new(8), 8), None);
    }

    #[test]
    fn bitmap_scan_all_inactive() {
        let bm = make_bitmap(&[false; 8]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);
        assert_eq!(scan.next_candidate(Location::new(0), 8), None);
    }

    #[test]
    fn bitmap_scan_skips_inactive() {
        // Pattern: inactive, inactive, active, inactive, active
        let bm = make_bitmap(&[false, false, true, false, true]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);

        assert_eq!(
            scan.next_candidate(Location::new(0), 5),
            Some(Location::new(2))
        );
        assert_eq!(
            scan.next_candidate(Location::new(3), 5),
            Some(Location::new(4))
        );
        assert_eq!(scan.next_candidate(Location::new(5), 5), None);
    }

    #[test]
    fn bitmap_scan_beyond_bitmap_len_returns_candidate() {
        // Bitmap has 4 bits, but tip is 8. Locations 4..8 are beyond the
        // bitmap and should be returned as candidates.
        let bm = make_bitmap(&[false; 4]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);

        // All bitmap bits are unset, so 0..4 are skipped.
        // Location 4 is beyond bitmap -> candidate.
        assert_eq!(
            scan.next_candidate(Location::new(0), 8),
            Some(Location::new(4))
        );
        assert_eq!(
            scan.next_candidate(Location::new(6), 8),
            Some(Location::new(6))
        );
    }

    #[test]
    fn bitmap_scan_respects_tip() {
        let bm = make_bitmap(&[false, false, false, true]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);

        // Active bit at 3, but tip is 3 so it's excluded.
        assert_eq!(scan.next_candidate(Location::new(0), 3), None);
        // With tip=4, bit 3 is included.
        assert_eq!(
            scan.next_candidate(Location::new(0), 4),
            Some(Location::new(3))
        );
    }

    #[test]
    fn bitmap_scan_floor_at_tip() {
        let bm = make_bitmap(&[true; 4]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);
        // Floor already at the tip: nothing left to scan.
        assert_eq!(scan.next_candidate(Location::new(4), 4), None);
    }

    #[test]
    fn bitmap_scan_empty_bitmap() {
        let bm = Bm::new();
        let mut scan = BitmapScan::<Bm, N>::new(&bm);

        // Empty bitmap, but tip > 0: all locations are beyond bitmap.
        assert_eq!(
            scan.next_candidate(Location::new(0), 5),
            Some(Location::new(0))
        );
        // Empty bitmap, tip = 0: no candidates.
        assert_eq!(scan.next_candidate(Location::new(0), 0), None);
    }

    #[test]
    fn bitmap_scan_with_bitmap_diff() {
        // Exercises BitmapScan over a BitmapDiff overlay rather than a raw
        // bitmap, so cleared base bits and pushed bits are both visible.
        // Base: bits 0..8 all active.
        let base = make_bitmap(&[true; 8]);
        let mut diff = BitmapDiff::<Bm, N>::new(&base, 0);

        // Clear bits 2 and 5.
        diff.clear_bit(Location::new(2));
        diff.clear_bit(Location::new(5));

        // Push two inactive bits and one active bit beyond the base.
        diff.push_bit(false);
        diff.push_bit(false);
        diff.push_bit(true);

        let mut scan = BitmapScan::<BitmapDiff<'_, Bm, N>, N>::new(&diff);

        // Should skip bit 2 (cleared), return 0.
        assert_eq!(
            scan.next_candidate(Location::new(0), 11),
            Some(Location::new(0))
        );
        // From 2: skip cleared bit 2, return 3.
        assert_eq!(
            scan.next_candidate(Location::new(2), 11),
            Some(Location::new(3))
        );
        // From 5: skip cleared bit 5, return 6.
        assert_eq!(
            scan.next_candidate(Location::new(5), 11),
            Some(Location::new(6))
        );
        // From 8: skip pushed inactive 8,9, return active pushed 10.
        assert_eq!(
            scan.next_candidate(Location::new(8), 11),
            Some(Location::new(10))
        );
    }
}