Skip to main content

commonware_storage/qmdb/any/
batch.rs

1//! Batch mutation API for Any QMDBs.
2
3use crate::{
4    index::{Ordered as OrderedIndex, Unordered as UnorderedIndex},
5    journal::{
6        authenticated,
7        contiguous::{Contiguous, Mutable, Reader},
8    },
9    merkle::{Family, Location},
10    qmdb::{
11        any::{
12            db::Db,
13            operation::{update, Operation},
14            ordered::{find_next_key, find_prev_key},
15            ValueEncoding,
16        },
17        delete_known_loc,
18        operation::{Key, Operation as OperationTrait},
19        update_known_loc,
20    },
21    Context,
22};
23use commonware_codec::Codec;
24use commonware_cryptography::{Digest, Hasher};
25use core::{iter, ops::Range};
26use futures::future::try_join_all;
27use std::{
28    collections::{BTreeMap, BTreeSet},
29    sync::{Arc, Weak},
30};
31use tracing::debug;
32
33/// Maximum number of journal reads to issue concurrently during floor raising.
34const MAX_CONCURRENT_READS: u64 = 64;
35
/// Strategy for finding the next active location during floor raising.
///
/// Implementations may skip locations they know cannot be active (e.g. via a
/// bitmap — see TODO(#1829) on [`SequentialScan`]), reducing the number of
/// journal reads needed to raise the inactivity floor.
pub(crate) trait FloorScan<F: Family> {
    /// Return the next location at or after `floor` that might be active,
    /// below `tip`. Returns `None` if no candidate exists in `[floor, tip)`.
    ///
    /// Callers advance `floor` past each returned candidate, so repeated calls
    /// walk the range `[floor, tip)` at most once.
    fn next_candidate(&mut self, floor: Location<F>, tip: u64) -> Option<Location<F>>;
}
42
/// Sequential scan: every location is a candidate.
///
/// The no-pruning fallback strategy: `next_candidate` yields each location in
/// `[floor, tip)` in order, so every operation below the tip is read.
// TODO(#1829): Always use bitmap for floor raising.
pub(crate) struct SequentialScan;
46
47impl<F: Family> FloorScan<F> for SequentialScan {
48    fn next_candidate(&mut self, floor: Location<F>, tip: u64) -> Option<Location<F>> {
49        if *floor < tip {
50            Some(floor)
51        } else {
52            None
53        }
54    }
55}
56
/// What happened to a key in this batch.
///
/// Each entry records both the batch-local outcome (`Active`/`Deleted`) and the
/// key's pre-batch location, so the snapshot index can later be patched without
/// re-reading the journal (see `apply_snapshot_diff`).
#[derive(Clone)]
pub(crate) enum DiffEntry<F: Family, V> {
    /// Key was updated (existing) or created (new).
    Active {
        value: V,
        /// Uncommitted location where this operation will be written.
        loc: Location<F>,
        /// The key's committed location in the DB snapshot, or `None` if the key did not exist
        /// in the committed DB. Resolved during merkleize (either from the snapshot directly,
        /// or inherited from the nearest ancestor that touched this key).
        base_old_loc: Option<Location<F>>,
    },
    /// Key was deleted.
    Deleted {
        /// The key's committed location in the DB snapshot, or `None` if the key was created
        /// by an ancestor batch and never existed in the committed DB.
        base_old_loc: Option<Location<F>>,
    },
}
77
78impl<F: Family, V> DiffEntry<F, V> {
79    /// The key's location in the base DB snapshot, regardless of variant.
80    pub(crate) const fn base_old_loc(&self) -> Option<Location<F>> {
81        match self {
82            Self::Active { base_old_loc, .. } | Self::Deleted { base_old_loc } => *base_old_loc,
83        }
84    }
85
86    /// The uncommitted location if active, `None` if deleted.
87    pub(crate) const fn loc(&self) -> Option<Location<F>> {
88        match self {
89            Self::Active { loc, .. } => Some(*loc),
90            Self::Deleted { .. } => None,
91        }
92    }
93
94    /// The value if active, `None` if deleted.
95    pub(crate) const fn value(&self) -> Option<&V> {
96        match self {
97            Self::Active { value, .. } => Some(value),
98            Self::Deleted { .. } => None,
99        }
100    }
101}
102
/// Where this batch's inherited state comes from.
enum Base<F: Family, D: Digest, U: update::Update + Send + Sync>
where
    Operation<F, U>: Send + Sync,
{
    /// Created from the DB via `db.new_batch()`.
    Db {
        /// Operation count of the committed DB when `new_batch()` was called.
        db_size: u64,
        /// Inactivity floor of the committed DB when `new_batch()` was called.
        inactivity_floor_loc: Location<F>,
        /// Active key count of the committed DB when `new_batch()` was called.
        active_keys: usize,
    },
    /// Created from a parent batch via `parent.new_batch()`.
    Child(Arc<MerkleizedBatch<F, D, U>>),
}
117
118impl<F: Family, D: Digest, U: update::Update + Send + Sync> Base<F, D, U>
119where
120    Operation<F, U>: Send + Sync,
121{
122    /// Total operations before this batch (committed DB + ancestor batches).
123    fn base_size(&self) -> u64 {
124        match self {
125            Self::Db { db_size, .. } => *db_size,
126            Self::Child(parent) => parent.total_size,
127        }
128    }
129
130    /// Effective number of committed DB operations at the base of the batch chain.
131    /// For `Db`, this is the DB size when `new_batch()` was called.
132    /// For `Child`, this is inherited from the parent (which may be higher than
133    /// the original DB size if ancestors were dropped before merkleize).
134    fn db_size(&self) -> u64 {
135        match self {
136            Self::Db { db_size, .. } => *db_size,
137            Self::Child(parent) => parent.db_size,
138        }
139    }
140
141    fn inactivity_floor_loc(&self) -> Location<F> {
142        match self {
143            Self::Db {
144                inactivity_floor_loc,
145                ..
146            } => *inactivity_floor_loc,
147            Self::Child(parent) => parent.new_inactivity_floor_loc,
148        }
149    }
150
151    fn active_keys(&self) -> usize {
152        match self {
153            Self::Db { active_keys, .. } => *active_keys,
154            Self::Child(parent) => parent.total_active_keys,
155        }
156    }
157
158    const fn parent(&self) -> Option<&Arc<MerkleizedBatch<F, D, U>>> {
159        match self {
160            Self::Db { .. } => None,
161            Self::Child(parent) => Some(parent),
162        }
163    }
164}
165
/// A speculative batch of operations whose root digest has not yet been computed,
/// in contrast to [`MerkleizedBatch`].
///
/// Methods that need the committed DB (e.g. `get`, `merkleize`) accept it as a
/// parameter, so the batch is lifetime-free and can be stored independently of the DB.
pub struct UnmerkleizedBatch<F: Family, H, U>
where
    U: update::Update + Send + Sync,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Authenticated journal batch for computing the speculative Merkle root.
    journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, U>>,

    /// Pending mutations. `Some(value)` for upsert, `None` for delete.
    /// Last write wins when the same key is written multiple times.
    mutations: BTreeMap<U::Key, Option<U::Value>>,

    /// The committed DB or parent batch this batch was created from.
    base: Base<F, H::Digest, U>,
}
186
187/// A speculative batch of operations whose root digest has been computed,
188/// in contrast to [`UnmerkleizedBatch`].
189///
190/// # Forking
191///
192/// Multiple children can share the same parent, forming a tree:
193///
194/// ```text
195/// DB <-- B1 <-- B2 <-- B4
196///                \
197///                 B3
198/// ```
199///
200/// # Committing batches
201///
202/// [`Db::apply_batch`] applies the batch and any uncommitted ancestors automatically.
203///
204/// ```text
205/// db.apply_batch(b1).await.unwrap();
206/// db.apply_batch(b3).await.unwrap();  // Also applies b2's changes.
207/// ```
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, U: update::Update + Send + Sync>
where
    Operation<F, U>: Send + Sync,
{
    /// Merkleized authenticated journal batch (provides the speculative Merkle root).
    pub(crate) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<F, U>>>,

    /// This batch's local key-level changes only (not accumulated from ancestors).
    pub(crate) diff: Arc<BTreeMap<U::Key, DiffEntry<F, U::Value>>>,

    /// The parent batch in the chain, if any. Held weakly so an ancestor that
    /// was committed and dropped elsewhere can actually be freed (see
    /// `into_parts` for how a truncated chain is handled).
    parent: Option<Weak<Self>>,

    /// Inactivity floor location after this batch's floor raise.
    pub(crate) new_inactivity_floor_loc: Location<F>,

    /// Location of the CommitFloor operation appended by this batch.
    pub(crate) new_last_commit_loc: Location<F>,

    /// Total operations before this batch's own ops (DB + ancestor batches).
    pub(crate) base_size: u64,

    /// Total operation count after this batch.
    pub(crate) total_size: u64,

    /// Total active keys after this batch.
    pub(crate) total_active_keys: usize,

    /// Effective DB size at the base of this batch's ancestor chain. Equals `base_size`
    /// when all ancestors are alive, but shifts up if ancestors were dropped before
    /// merkleize (to account for the gap left by dead ancestors). Used by `apply_batch`
    /// to validate that the DB hasn't diverged from this batch's chain.
    pub(crate) db_size: u64,

    /// Arc refs to each ancestor's diff, collected during `finish()` while ancestors are
    /// alive. Used by `apply_batch` to apply uncommitted ancestor snapshot diffs.
    /// 1:1 with `ancestor_diff_ends` (same length, same ordering).
    pub(crate) ancestor_diffs: Vec<Arc<BTreeMap<U::Key, DiffEntry<F, U::Value>>>>,

    /// Each ancestor's `total_size` (operation count after that ancestor).
    /// 1:1 with `ancestor_diffs`: `ancestor_diff_ends[i]` is the boundary for
    /// `ancestor_diffs[i]`. A batch is committed when `ancestor_diff_ends[i] <= db_size`.
    pub(crate) ancestor_diff_ends: Vec<u64>,
}
254
255/// Batch-infrastructure state used during merkleization.
256///
257/// Created by [`UnmerkleizedBatch::into_parts()`], which separates the pending mutations
258/// from the resolution/merkleization machinery. Helpers that need access to the parent
259/// chain, DB snapshot, or operation log are methods on this struct, eliminating parameter
260/// threading.
struct Merkleizer<F: Family, H, U>
where
    U: update::Update + Send + Sync,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Authenticated journal batch this batch's operations will be merkleized into.
    journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, U>>,
    /// Uncommitted ancestor batches, ordered parent-first ([parent, grandparent, ...]).
    ancestors: Vec<Arc<MerkleizedBatch<F, H::Digest, U>>>,
    /// Total operations before this batch (committed DB + ancestor batches).
    base_size: u64,
    /// Boundary between disk-resident and in-memory operations (see the
    /// operation-location model documented on the read-helper impl).
    db_size: u64,
    /// Inactivity floor inherited from the base (DB or parent batch).
    base_inactivity_floor_loc: Location<F>,
    /// Active key count inherited from the base (DB or parent batch).
    base_active_keys: usize,
}
274
275/// Look up a key in the ancestor chain (immediate parent first).
276fn resolve_in_ancestors<'a, F: Family, D: Digest, U: update::Update + Send + Sync>(
277    ancestors: &'a [Arc<MerkleizedBatch<F, D, U>>],
278    key: &U::Key,
279) -> Option<&'a DiffEntry<F, U::Value>>
280where
281    Operation<F, U>: Send + Sync,
282{
283    for batch in ancestors {
284        if let Some(entry) = batch.diff.get(key) {
285            return Some(entry);
286        }
287    }
288    None
289}
290
291/// Apply a single diff entry to the snapshot index.
292fn apply_snapshot_diff<F: Family, V, I: UnorderedIndex<Value = Location<F>>>(
293    snapshot: &mut I,
294    key: &impl Key,
295    entry: &DiffEntry<F, V>,
296    base_old_loc: Option<Location<F>>,
297) {
298    match entry {
299        DiffEntry::Active { loc, .. } => match base_old_loc {
300            Some(old) => update_known_loc::<F, _>(snapshot, key, old, *loc),
301            None => snapshot.insert(key, *loc),
302        },
303        DiffEntry::Deleted { .. } => {
304            if let Some(old) = base_old_loc {
305                delete_known_loc::<F, _>(snapshot, key, old);
306            }
307        }
308    }
309}
310
311/// Resolve `loc` to an op within the in-memory ancestor region
312/// `[db_size, ancestors[0].journal_batch.size())`, walked parent-first.
313///
314/// # Panics
315///
316/// Panics if `loc` cannot be located in the chain: either it falls outside the region (including
317/// when `ancestors` is empty), or the ancestor spans are non-contiguous (a bookkeeping invariant
318/// violation).
319fn read_op_from_ancestors<F: Family, D: Digest, U: update::Update + Send + Sync>(
320    ancestors: &[Arc<MerkleizedBatch<F, D, U>>],
321    loc: u64,
322    db_size: u64,
323) -> &Operation<F, U>
324where
325    Operation<F, U>: Send + Sync,
326{
327    // ancestors is ordered parent-first: [parent, grandparent, ...].
328    // Each batch's items span [next_batch.size(), this_batch.size()).
329    // The last ancestor's base is db_size (committed DB boundary).
330    for (i, batch) in ancestors.iter().enumerate() {
331        let batch_base = ancestors
332            .get(i + 1)
333            .map_or(db_size, |b| b.journal_batch.size());
334        let batch_end = batch.journal_batch.size();
335        if loc >= batch_base && loc < batch_end {
336            return &batch.journal_batch.items()[(loc - batch_base) as usize];
337        }
338    }
339    unreachable!("location {loc} not found in ancestor chain (db_size={db_size})")
340}
341
342/// Read helpers on [`Merkleizer`].
343///
344/// # Operation-location model
345///
346/// The operation space is divided into three contiguous regions:
347///
348/// ```text
349///  [0 ........... db_size)  [db_size ..... base_size)  [base_size .. base_size+len)
350///   committed (on disk)     ancestors (in mem)          this batch (in mem)
351/// ```
352///
353/// `db_size` is the boundary between disk and in-memory ancestors. It equals the original DB size
354/// when the full ancestor chain is alive, or a higher value if ancestors were freed (see
355/// `into_parts`). For batches created directly from the DB (no uncommitted ancestors), the ancestor
356/// region is empty (`db_size == base_size`).
357///
358/// # Contract for all read methods
359///
360/// Callers must pass a `loc` that is a valid operation location: specifically `loc < base_size +
361/// batch_ops.len()` (i.e., within one of the three regions). Passing an out-of-range `loc` may
362/// panic (via `batch_ops` indexing or the ancestor-chain walk) or result in a disk-read error.
363/// In-memory locations are resolved synchronously; only disk locations await the `reader`.
impl<F: Family, H, U> Merkleizer<F, H, U>
where
    U: update::Update + Send + Sync,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Returns `Some(op)` if `loc` falls in the batch or ancestor regions, and `None` when `loc` is
    /// in the committed region (`loc < db_size`).
    fn try_read_op_from_uncommitted(
        &self,
        loc: Location<F>,
        batch_ops: &[Operation<F, U>],
    ) -> Option<Operation<F, U>> {
        let loc = *loc;

        // This batch's own region: [base_size, base_size + batch_ops.len()).
        if loc >= self.base_size {
            return Some(batch_ops[(loc - self.base_size) as usize].clone());
        }

        // Ancestor region: [db_size, base_size), resolved via the ancestor chain.
        if loc >= self.db_size {
            return Some(read_op_from_ancestors(&self.ancestors, loc, self.db_size).clone());
        }

        // Committed region: the caller must read from disk.
        None
    }

    /// Resolve an operation by its location `loc` if it can be done synchronously (e.g. without
    /// I/O), or return `None` otherwise.
    fn try_read_op_sync<R: Reader<Item = Operation<F, U>>>(
        &self,
        loc: Location<F>,
        batch_ops: &[Operation<F, U>],
        reader: &R,
    ) -> Option<Operation<F, U>> {
        // In-memory regions first, then the journal's synchronous read path.
        self.try_read_op_from_uncommitted(loc, batch_ops)
            .or_else(|| reader.try_read_sync(*loc))
    }

    /// Read a single operation by location.
    async fn read_op<R: Reader<Item = Operation<F, U>>>(
        &self,
        loc: Location<F>,
        batch_ops: &[Operation<F, U>],
        reader: &R,
    ) -> Result<Operation<F, U>, crate::qmdb::Error<F>> {
        match self.try_read_op_sync(loc, batch_ops, reader) {
            Some(op) => Ok(op),
            // Synchronous miss: the location is committed and not cached, so await disk.
            None => Ok(reader.read(*loc).await?),
        }
    }

    /// Read multiple operations by location.
    ///
    /// Returns operations in the same order as `locations`.
    async fn read_ops<R: Reader<Item = Operation<F, U>>>(
        &self,
        locations: &[Location<F>],
        batch_ops: &[Operation<F, U>],
        reader: &R,
    ) -> Result<Vec<Operation<F, U>>, crate::qmdb::Error<F>> {
        // Resolve hits synchronously: batch/ancestor first, then journal page cache.
        let results: Vec<Option<Operation<F, U>>> = locations
            .iter()
            .map(|loc| self.try_read_op_sync(*loc, batch_ops, reader))
            .collect();

        // Batch-read disk misses concurrently.
        let disk_results = try_join_all(
            locations
                .iter()
                .zip(results.iter())
                .filter(|(_, cached)| cached.is_none())
                .map(|(loc, _)| reader.read(**loc)),
        )
        .await?;

        // Merge disk results back in order. The disk iterator yields misses in the
        // same relative order they were filtered out above, so a single pass suffices.
        let mut disk_iter = disk_results.into_iter();
        Ok(results
            .into_iter()
            .map(|r| r.unwrap_or_else(|| disk_iter.next().expect("disk result count mismatch")))
            .collect())
    }

    /// Gather existing-key locations for all keys in `mutations`.
    ///
    /// For each mutation key, checks the ancestor diffs first (returning the uncommitted
    /// location for Active entries, skipping Deleted entries). Keys not in the ancestor diffs
    /// fall back to the committed DB snapshot.
    ///
    /// When `include_active_collision_siblings` is true, Active entries also scan the snapshot
    /// bucket for collision siblings (other keys sharing the same translated-key bucket). The
    /// ordered path needs these so their `next_key` pointers are rewritten when a sibling is
    /// deleted; the unordered path can skip them.
    ///
    /// The returned locations are sorted and deduplicated.
    fn gather_existing_locations<E, C, I>(
        &self,
        mutations: &BTreeMap<U::Key, Option<U::Value>>,
        db: &Db<F, E, C, I, H, U>,
        include_active_collision_siblings: bool,
    ) -> Vec<Location<F>>
    where
        E: Context,
        C: Contiguous<Item = Operation<F, U>>,
        I: UnorderedIndex<Value = Location<F>>,
    {
        // Extra slack (*3/2) avoids re-allocations when index collisions cause more than one
        // location per key.
        let mut locations = Vec::with_capacity(mutations.len() * 3 / 2);
        if self.ancestors.is_empty() {
            // Fast path: no uncommitted ancestors, every key resolves via the snapshot.
            for key in mutations.keys() {
                locations.extend(db.snapshot.get(key).copied());
            }
        } else {
            for key in mutations.keys() {
                match resolve_in_ancestors(&self.ancestors, key) {
                    Some(DiffEntry::Deleted { .. }) => {
                        // Stale; handled via extract_parent_deleted_creates.
                    }
                    Some(DiffEntry::Active {
                        loc, base_old_loc, ..
                    }) => {
                        locations.push(*loc);
                        if include_active_collision_siblings {
                            // Snapshot entries in the same bucket, excluding the key's
                            // own committed location (already superseded by `loc`).
                            locations.extend(
                                db.snapshot
                                    .get(key)
                                    .copied()
                                    .filter(move |loc| Some(*loc) != *base_old_loc),
                            );
                        }
                    }
                    None => {
                        locations.extend(db.snapshot.get(key).copied());
                    }
                }
            }
        }
        locations.sort();
        locations.dedup();
        locations
    }

    /// Check if the operation at `loc` for `key` is still active.
    fn is_active_at<E, C, I>(
        &self,
        key: &U::Key,
        loc: Location<F>,
        batch_diff: &BTreeMap<U::Key, DiffEntry<F, U::Value>>,
        db: &Db<F, E, C, I, H, U>,
    ) -> bool
    where
        E: Context,
        C: Contiguous<Item = Operation<F, U>>,
        I: UnorderedIndex<Value = Location<F>>,
    {
        // This batch's diff takes precedence, then the ancestor diffs.
        if let Some(entry) = batch_diff
            .get(key)
            .or_else(|| resolve_in_ancestors(&self.ancestors, key))
        {
            return entry.loc() == Some(loc);
        }
        // Untouched by any batch: active iff the snapshot still points at `loc`.
        db.snapshot.get(key).any(|&l| l == loc)
    }

    /// Extract keys that were deleted by a parent batch but are being
    /// re-created by this child batch. Removes those keys from `mutations`
    /// and returns `(key, (value, base_old_loc))` entries.
    #[allow(clippy::type_complexity)]
    fn extract_parent_deleted_creates(
        &self,
        mutations: &mut BTreeMap<U::Key, Option<U::Value>>,
    ) -> BTreeMap<U::Key, (U::Value, Option<Location<F>>)> {
        if self.ancestors.is_empty() {
            return BTreeMap::new();
        }
        let mut creates = BTreeMap::new();
        mutations.retain(|key, value| {
            if let Some(DiffEntry::Deleted { base_old_loc }) =
                resolve_in_ancestors(&self.ancestors, key)
            {
                // `value.take()` succeeds only for upserts; a delete-of-a-deleted
                // key (`None`) stays in `mutations` untouched.
                if let Some(v) = value.take() {
                    creates.insert(key.clone(), (v, *base_old_loc));
                    return false;
                }
            }
            true
        });
        creates
    }

    /// Shared final phases of merkleization: floor raise, CommitFloor, journal
    /// merkleize, diff merge, and `MerkleizedBatch` construction.
    #[allow(clippy::too_many_arguments)]
    async fn finish<E, C, I, S, R>(
        self,
        mut ops: Vec<Operation<F, U>>,
        mut diff: BTreeMap<U::Key, DiffEntry<F, U::Value>>,
        active_keys_delta: isize,
        user_steps: u64,
        metadata: Option<U::Value>,
        mut scan: S,
        reader: R,
        db: &Db<F, E, C, I, H, U>,
    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, U>>, crate::qmdb::Error<F>>
    where
        E: Context,
        C: Contiguous<Item = Operation<F, U>>,
        I: UnorderedIndex<Value = Location<F>>,
        S: FloorScan<F>,
        R: Reader<Item = Operation<F, U>>,
    {
        // Floor raise.
        // Steps = user_steps + 1 (+1 for previous commit becoming inactive).
        let total_steps = user_steps + 1;
        let total_active_keys = self.base_active_keys as isize + active_keys_delta;
        let mut floor = self.base_inactivity_floor_loc;

        if total_active_keys > 0 {
            // Floor raise: advance the inactivity floor by `total_steps` active operations.
            // `fixed_tip` prevents scanning into floor-raise moves just appended.
            let fixed_tip = self.base_size + ops.len() as u64;
            let mut moved = 0u64;
            let mut scan_from = floor;

            while moved < total_steps {
                // Collect candidates, capped by the number of active ops still needed.
                // `scan_from` tracks prefetch progress separately from `floor`, so
                // early exit cannot leave `floor` past unprocessed candidates.
                let limit = ((total_steps - moved) as usize).min(MAX_CONCURRENT_READS as usize);
                let mut candidates = Vec::with_capacity(limit);
                while candidates.len() < limit {
                    let Some(candidate) = scan.next_candidate(scan_from, fixed_tip) else {
                        break;
                    };
                    candidates.push(candidate);
                    scan_from = Location::new(*candidate + 1);
                }
                if candidates.is_empty() {
                    break;
                }

                // Batch-read candidates: cache hits resolve synchronously, disk misses
                // are fetched concurrently.
                let resolved = self.read_ops(&candidates, &ops, &reader).await?;

                // Process results in order, moving active ops to the tip.
                for (candidate, op) in candidates.into_iter().zip(resolved) {
                    floor = Location::new(*candidate + 1);
                    let Some(key) = op.key().cloned() else {
                        continue; // skip CommitFloor and other non-keyed ops
                    };
                    if !self.is_active_at(&key, candidate, &diff, db) {
                        continue;
                    }
                    // Re-append the still-active op at the tip and record the move in the diff.
                    let new_loc = Location::new(self.base_size + ops.len() as u64);
                    let base_old_loc = diff
                        .get(&key)
                        .or_else(|| resolve_in_ancestors(&self.ancestors, &key))
                        .map_or(Some(candidate), DiffEntry::base_old_loc);
                    let value = extract_update_value(&op);
                    ops.push(op);
                    diff.insert(
                        key,
                        DiffEntry::Active {
                            value,
                            loc: new_loc,
                            base_old_loc,
                        },
                    );
                    moved += 1;
                    if moved >= total_steps {
                        break;
                    }
                }
            }
        } else {
            // DB is empty after this batch; raise floor to tip.
            floor = Location::new(self.base_size + ops.len() as u64);
            debug!(tip = ?floor, "db is empty, raising floor to tip");
        }

        // Release the reader guard before CPU-only work (merkleization) so
        // concurrent writers are not blocked.
        drop(reader);

        // CommitFloor operation.
        let commit_loc = Location::new(self.base_size + ops.len() as u64);
        ops.push(Operation::CommitFloor(metadata, floor));

        // Merkleize the journal batch.
        // The journal batch was created eagerly at batch construction time and its
        // parent already contains all prior batches' Merkle state, so we only
        // add THIS batch's operations. Parent operations are never re-cloned,
        // re-encoded, or re-hashed.
        let ops = Arc::new(ops);
        let journal = db
            .log
            .with_mem(|base| self.journal_batch.merkleize_with(base, ops));

        // Snapshot the ancestor diffs with strong refs while the ancestors are alive,
        // so apply_batch can use them even if the Weak parent chain dies later.
        let ancestor_diffs: Vec<_> = self.ancestors.iter().map(|a| Arc::clone(&a.diff)).collect();
        let ancestor_diff_ends: Vec<_> = self.ancestors.iter().map(|a| a.total_size).collect();

        debug_assert!(total_active_keys >= 0, "active_keys underflow");
        Ok(Arc::new(MerkleizedBatch {
            journal_batch: journal,
            diff: Arc::new(diff),
            parent: self.ancestors.first().map(Arc::downgrade),
            new_inactivity_floor_loc: floor,
            new_last_commit_loc: commit_loc,
            base_size: self.base_size,
            total_size: *commit_loc + 1,
            total_active_keys: total_active_keys as usize,
            db_size: self.db_size,
            ancestor_diffs,
            ancestor_diff_ends,
        }))
    }
}
680
681impl<F: Family, H, U> UnmerkleizedBatch<F, H, U>
682where
683    U: update::Update + Send + Sync,
684    H: Hasher,
685    Operation<F, U>: Codec,
686{
687    /// Record a mutation. Use `Some(value)` for update/create, `None` for delete.
688    ///
689    /// If the same key is written multiple times within a batch, the last
690    /// value wins.
691    pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
692        self.mutations.insert(key, value);
693        self
694    }
695
696    /// Split into pending mutations and the merkleization machinery.
697    #[allow(clippy::type_complexity)]
698    fn into_parts(self) -> (BTreeMap<U::Key, Option<U::Value>>, Merkleizer<F, H, U>) {
699        let ancestors: Vec<_> = self.base.parent().map_or_else(Vec::new, |parent| {
700            let mut v = vec![Arc::clone(parent)];
701            v.extend(parent.ancestors());
702            v
703        });
704        // If the Weak parent chain was truncated (an ancestor was committed and freed), the
705        // oldest alive ancestor's items don't start at db_size. Example: chain A -> B -> C,
706        // A committed and dropped. ancestors() yields [B] (A's Weak is dead). B's items start
707        // at A.size(), not db_size. We use the journal (strong Arcs, always intact) to compute
708        // the actual base so reads fall through to disk for locations in the gap.
709        let db_size = self.base.db_size();
710        let effective_db_size = ancestors.last().map_or(db_size, |oldest| {
711            let oldest_base =
712                oldest.journal_batch.size() - oldest.journal_batch.items().len() as u64;
713            db_size.max(oldest_base)
714        });
715        (
716            self.mutations,
717            Merkleizer {
718                journal_batch: self.journal_batch,
719                ancestors,
720                base_size: self.base.base_size(),
721                db_size: effective_db_size,
722                base_inactivity_floor_loc: self.base.inactivity_floor_loc(),
723                base_active_keys: self.base.active_keys(),
724            },
725        )
726    }
727}
728
729// Generic get() for both ordered and unordered UnmerkleizedBatch.
730impl<F: Family, H, U> UnmerkleizedBatch<F, H, U>
731where
732    U: update::Update + Send + Sync,
733    H: Hasher,
734    Operation<F, U>: Codec,
735{
736    /// Read through: mutations -> ancestor diffs -> committed DB.
737    pub async fn get<E, C, I>(
738        &self,
739        key: &U::Key,
740        db: &Db<F, E, C, I, H, U>,
741    ) -> Result<Option<U::Value>, crate::qmdb::Error<F>>
742    where
743        E: Context,
744        C: Contiguous<Item = Operation<F, U>>,
745        I: UnorderedIndex<Value = Location<F>> + 'static,
746    {
747        if let Some(value) = self.mutations.get(key) {
748            return Ok(value.clone());
749        }
750        if let Some(parent) = self.base.parent() {
751            if let Some(entry) = parent.diff.get(key) {
752                return Ok(entry.value().cloned());
753            }
754            for batch in parent.ancestors() {
755                if let Some(entry) = batch.diff.get(key) {
756                    return Ok(entry.value().cloned());
757                }
758            }
759        }
760        db.get(key).await
761    }
762}
763
764// Unordered-specific methods.
impl<F: Family, K, V, H> UnmerkleizedBatch<F, H, update::Unordered<K, V>>
where
    K: Key,
    V: ValueEncoding,
    H: Hasher,
    Operation<F, update::Unordered<K, V>>: Codec,
{
    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
    ///
    /// Uses [`SequentialScan`] for floor raising; use
    /// [`merkleize_with_floor_scan`](Self::merkleize_with_floor_scan) to supply a faster
    /// [`FloorScan`] implementation.
    pub async fn merkleize<E, C, I>(
        self,
        db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
        metadata: Option<V::Value>,
    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>, crate::qmdb::Error<F>>
    where
        E: Context,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
        I: UnorderedIndex<Value = Location<F>>,
    {
        self.merkleize_with_floor_scan(db, metadata, SequentialScan)
            .await
    }

    /// Like [`merkleize`](Self::merkleize) but accepts a custom [`FloorScan`]
    /// to accelerate floor raising.
    pub(crate) async fn merkleize_with_floor_scan<E, C, I, S: FloorScan<F>>(
        self,
        db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
        metadata: Option<V::Value>,
        scan: S,
    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>, crate::qmdb::Error<F>>
    where
        E: Context,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
        I: UnorderedIndex<Value = Location<F>>,
    {
        // `mutations` maps key -> Some(value) (upsert) or None (delete);
        // `m` carries the remaining merkleize state and helpers.
        let (mut mutations, m) = self.into_parts();

        // Resolve existing keys.
        // (The ordered variant passes `true` as the final flag; see the Ordered impl.)
        let locations = m.gather_existing_locations(&mutations, db, false);
        let reader = db.log.reader().await;
        let results = m.read_ops(&locations, &[], &reader).await?;

        // Generate user mutation operations.
        // Capacity includes +1 for the trailing commit operation appended by `finish`.
        let mut ops: Vec<Operation<F, update::Unordered<K, V>>> =
            Vec::with_capacity(mutations.len() + 1);
        let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
        let mut active_keys_delta: isize = 0;
        let mut user_steps: u64 = 0;

        // Process updates/deletes of existing keys in location order.
        // This includes keys from both the committed snapshot and ancestor diffs.
        for (op, &old_loc) in results.iter().zip(&locations) {
            let key = op.key().expect("updates should have a key");

            // A key resolved via the ancestor diff must only match at its ancestor-diff
            // location. Without this guard, a stale snapshot collision (the pre-parent DB
            // snapshot still containing the key's old location) can consume the mutation at the
            // wrong sort position, changing the operation order relative to the committed-state
            // path. When the ancestor diff entry does match, use it to trace `base_old_loc`
            // back to the key's location in the committed DB snapshot.
            let base_old_loc = if let Some(entry) = resolve_in_ancestors(&m.ancestors, key) {
                if entry.loc() != Some(old_loc) {
                    continue;
                }
                entry.base_old_loc()
            } else {
                Some(old_loc)
            };

            let Some(mutation) = mutations.remove(key) else {
                // Snapshot index collision: this operation's key does not match
                // any mutation key. The mutation will be handled as a create below.
                continue;
            };

            // Write the user mutation at the next batch location while
            // preserving the committed-base provenance computed above.
            let new_loc = Location::new(m.base_size + ops.len() as u64);
            match mutation {
                // Some(value) = upsert of an existing key; key count unchanged.
                Some(value) => {
                    ops.push(Operation::Update(update::Unordered(
                        key.clone(),
                        value.clone(),
                    )));
                    diff.insert(
                        key.clone(),
                        DiffEntry::Active {
                            value,
                            loc: new_loc,
                            base_old_loc,
                        },
                    );
                    user_steps += 1;
                }
                // None = delete of an existing key; key count decreases.
                None => {
                    ops.push(Operation::Delete(key.clone()));
                    diff.insert(key.clone(), DiffEntry::Deleted { base_old_loc });
                    active_keys_delta -= 1;
                    user_steps += 1;
                }
            }
        }

        // Handle parent-deleted keys that the child wants to re-create.
        let parent_deleted_creates = m.extract_parent_deleted_creates(&mut mutations);

        // Process creates: remaining mutations (fresh keys) plus parent-deleted
        // keys being re-created. Both get an Update op and active_keys_delta += 1.
        // Merge into a single sorted Vec so iteration order is deterministic
        // regardless of whether the parent is pending or committed.
        let mut creates: Vec<(K, V::Value, Option<Location<F>>)> =
            Vec::with_capacity(mutations.len() + parent_deleted_creates.len());
        for (key, value) in mutations {
            // A remaining None mutation is a delete of a non-existent key: a no-op.
            if let Some(value) = value {
                creates.push((key, value, None));
            }
        }
        for (key, (value, base_old_loc)) in parent_deleted_creates {
            creates.push((key, value, base_old_loc));
        }
        creates.sort_by(|(a, _, _), (b, _, _)| a.cmp(b));
        for (key, value, base_old_loc) in creates {
            let new_loc = Location::new(m.base_size + ops.len() as u64);
            ops.push(Operation::Update(update::Unordered(
                key.clone(),
                value.clone(),
            )));
            diff.insert(
                key,
                DiffEntry::Active {
                    value,
                    loc: new_loc,
                    base_old_loc,
                },
            );
            active_keys_delta += 1;
        }

        // Remaining phases: floor raise, CommitFloor, journal, diff merge.
        m.finish(
            ops,
            diff,
            active_keys_delta,
            user_steps,
            metadata,
            scan,
            reader,
            db,
        )
        .await
    }
}
917
918// Ordered-specific methods.
impl<F: Family, K, V, H> UnmerkleizedBatch<F, H, update::Ordered<K, V>>
where
    K: Key,
    V: ValueEncoding,
    H: Hasher,
    Operation<F, update::Ordered<K, V>>: Codec,
{
    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
    ///
    /// Uses [`SequentialScan`] for floor raising; use
    /// [`merkleize_with_floor_scan`](Self::merkleize_with_floor_scan) to supply a faster
    /// [`FloorScan`] implementation.
    pub async fn merkleize<E, C, I>(
        self,
        db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
        metadata: Option<V::Value>,
    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>, crate::qmdb::Error<F>>
    where
        E: Context,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
        I: OrderedIndex<Value = Location<F>>,
    {
        self.merkleize_with_floor_scan(db, metadata, SequentialScan)
            .await
    }

    /// Like [`merkleize`](Self::merkleize) but accepts a custom [`FloorScan`]
    /// to accelerate floor raising.
    ///
    /// The ordered variant additionally maintains the `next_key` linkage between
    /// adjacent keys, which requires rewriting the predecessors of created and
    /// deleted keys.
    pub(crate) async fn merkleize_with_floor_scan<E, C, I, S: FloorScan<F>>(
        self,
        db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
        metadata: Option<V::Value>,
        scan: S,
    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>, crate::qmdb::Error<F>>
    where
        E: Context,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
        I: OrderedIndex<Value = Location<F>>,
    {
        // `mutations` maps key -> Some(value) (upsert) or None (delete);
        // `m` carries the remaining merkleize state and helpers.
        let (mut mutations, m) = self.into_parts();

        // Resolve existing keys.
        let locations = m.gather_existing_locations(&mutations, db, true);
        let reader = db.log.reader().await;

        // Classify mutations into deleted, created, updated.
        // - `next_candidates`: keys that may become some key's `next_key` link.
        // - `prev_candidates`: key -> (current value, location) for keys that may be
        //   the predecessor of a created/deleted key and need their `next_key` rewritten.
        let mut next_candidates: BTreeSet<K> = BTreeSet::new();
        let mut prev_candidates: BTreeMap<K, (V::Value, Location<F>)> = BTreeMap::new();
        let mut deleted: BTreeMap<K, Location<F>> = BTreeMap::new();
        let mut updated: BTreeMap<K, (V::Value, Location<F>)> = BTreeMap::new();

        for (op, &old_loc) in m
            .read_ops(&locations, &[], &reader)
            .await?
            .into_iter()
            .zip(&locations)
        {
            let update::Ordered {
                key,
                value,
                next_key,
            } = match op {
                Operation::Update(data) => data,
                _ => unreachable!("snapshot should only reference Update operations"),
            };
            next_candidates.insert(next_key);

            let mutation = mutations.remove(&key);
            // Every existing key is a potential predecessor of a created/deleted key.
            prev_candidates.insert(key.clone(), (value, old_loc));

            let Some(mutation) = mutation else {
                // Snapshot index collision: this operation's key does not match
                // the mutation key (the snapshot uses a compressed translated key
                // that can collide). The mutation will be handled as a create below.
                continue;
            };

            if let Some(new_value) = mutation {
                updated.insert(key, (new_value, old_loc));
            } else {
                deleted.insert(key, old_loc);
            }
        }

        // Handle parent-deleted keys that the child wants to re-create.
        let parent_deleted_creates = m.extract_parent_deleted_creates(&mut mutations);

        // Remaining mutations are creates. Each entry carries the value and
        // base_old_loc (None for fresh creates, Some for parent-deleted recreates).
        // Merge into a single sorted Vec so iteration order is deterministic
        // regardless of whether the parent is pending or committed.
        let mut created: Vec<(K, V::Value, Option<Location<F>>)> =
            Vec::with_capacity(mutations.len() + parent_deleted_creates.len());
        for (key, value) in mutations {
            let Some(value) = value else {
                continue; // delete of non-existent key
            };
            next_candidates.insert(key.clone());
            created.push((key, value, None));
        }
        for (key, (value, base_old_loc)) in parent_deleted_creates {
            next_candidates.insert(key.clone());
            created.push((key, value, base_old_loc));
        }
        // NOTE: `created` must remain sorted; binary_search below relies on it.
        created.sort_by(|(a, _, _), (b, _, _)| a.cmp(b));

        // Look up prev_translated_key for created/deleted keys.
        let mut prev_locations = Vec::new();
        for key in deleted.keys().chain(created.iter().map(|(k, _, _)| k)) {
            let Some((iter, _)) = db.snapshot.prev_translated_key(key) else {
                continue;
            };
            prev_locations.extend(iter.copied());
        }
        // Deduplicate so each candidate predecessor is read once.
        prev_locations.sort();
        prev_locations.dedup();

        let prev_results = m.read_ops(&prev_locations, &[], &reader).await?;

        for (op, &old_loc) in prev_results.into_iter().zip(&prev_locations) {
            let data = match op {
                Operation::Update(data) => data,
                _ => unreachable!("expected update operation"),
            };
            next_candidates.insert(data.next_key);
            prev_candidates.insert(data.key, (data.value, old_loc));
        }

        // Add ancestor-diff-created keys to candidate sets. These keys may be predecessors
        // or successors of this batch's mutations but are invisible to the base-DB-only
        // prev_translated_key lookup above. Walk the parent chain to collect the effective
        // state for each key (closest ancestor wins).
        let ancestor_entries = {
            let mut entries: BTreeMap<&K, &DiffEntry<F, V::Value>> = BTreeMap::new();
            for batch in &m.ancestors {
                // parent-first order: or_insert keeps the closest ancestor's entry.
                for (key, entry) in batch.diff.iter() {
                    entries.entry(key).or_insert(entry);
                }
            }
            entries
        };

        for (key, entry) in &ancestor_entries {
            // Skip keys already handled by this batch's mutations.
            if updated.contains_key(*key)
                || created.binary_search_by(|(k, _, _)| k.cmp(*key)).is_ok()
                || deleted.contains_key(*key)
            {
                continue;
            }
            if let DiffEntry::Active { value, loc, .. } = entry {
                // Re-read the ancestor's op to recover its next_key linkage.
                let op = m.read_op(*loc, &[], &reader).await?;
                let data = match op {
                    Operation::Update(data) => data,
                    _ => unreachable!("ancestor diff Active should reference Update op"),
                };
                next_candidates.insert((*key).clone());
                next_candidates.insert(data.next_key);
                prev_candidates.insert((*key).clone(), (value.clone(), *loc));
            }
        }

        // Remove all known-deleted keys from possible_* sets. The prev_translated_key lookup
        // already did this for this batch's deletes, but the ancestor diff incorporation may
        // have re-added them via next_key references. Also remove parent-deleted keys that the
        // base DB lookup may have added.
        for key in deleted.keys() {
            prev_candidates.remove(key);
            next_candidates.remove(key);
        }
        for (key, entry) in &ancestor_entries {
            // Keep keys being re-created in this batch; only purge ones that stay deleted.
            if matches!(entry, DiffEntry::Deleted { .. })
                && created.binary_search_by(|(k, _, _)| k.cmp(*key)).is_err()
            {
                prev_candidates.remove(*key);
                next_candidates.remove(*key);
            }
        }

        // Generate operations.
        // Capacity includes +1 for the trailing commit operation appended by `finish`.
        let mut ops: Vec<Operation<F, update::Ordered<K, V>>> =
            Vec::with_capacity(deleted.len() + updated.len() + created.len() + 1);
        let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
        let mut active_keys_delta: isize = 0;
        let mut user_steps: u64 = 0;
        // Process deletes.
        for (key, old_loc) in &deleted {
            ops.push(Operation::Delete(key.clone()));

            // Trace provenance back to the committed DB snapshot when the key was
            // resolved through an ancestor diff.
            let base_old_loc = resolve_in_ancestors(&m.ancestors, key)
                .map_or(Some(*old_loc), DiffEntry::base_old_loc);

            diff.insert(key.clone(), DiffEntry::Deleted { base_old_loc });
            active_keys_delta -= 1;
            user_steps += 1;
        }

        // Process updates of existing keys.
        for (key, (value, old_loc)) in updated {
            let new_loc = Location::new(m.base_size + ops.len() as u64);
            let next_key = find_next_key(&key, &next_candidates);
            ops.push(Operation::Update(update::Ordered {
                key: key.clone(),
                value: value.clone(),
                next_key,
            }));

            let base_old_loc = resolve_in_ancestors(&m.ancestors, &key)
                .map_or(Some(old_loc), DiffEntry::base_old_loc);

            diff.insert(
                key,
                DiffEntry::Active {
                    value,
                    loc: new_loc,
                    base_old_loc,
                },
            );
            user_steps += 1;
        }

        // Collect created keys for the predecessor loop before consuming.
        let mut created_keys: Vec<K> = Vec::with_capacity(created.len());

        // Process creates.
        for (key, value, base_old_loc) in created {
            created_keys.push(key.clone());
            let new_loc = Location::new(m.base_size + ops.len() as u64);
            let next_key = find_next_key(&key, &next_candidates);
            ops.push(Operation::Update(update::Ordered {
                key: key.clone(),
                value: value.clone(),
                next_key,
            }));
            diff.insert(
                key,
                DiffEntry::Active {
                    value,
                    loc: new_loc,
                    base_old_loc,
                },
            );
            active_keys_delta += 1;
        }

        // Update predecessors of created and deleted keys.
        if !prev_candidates.is_empty() {
            for key in created_keys.iter().chain(deleted.keys()) {
                let (prev_key, (prev_value, prev_loc)) = find_prev_key(key, &prev_candidates);
                // Already rewritten in this batch (its next_key was computed above).
                if diff.contains_key(prev_key) {
                    continue;
                }

                let prev_new_loc = Location::new(m.base_size + ops.len() as u64);
                let prev_next_key = find_next_key(prev_key, &next_candidates);
                ops.push(Operation::Update(update::Ordered {
                    key: prev_key.clone(),
                    value: prev_value.clone(),
                    next_key: prev_next_key,
                }));

                let prev_base_old_loc = resolve_in_ancestors(&m.ancestors, prev_key)
                    .map_or(Some(*prev_loc), DiffEntry::base_old_loc);

                diff.insert(
                    prev_key.clone(),
                    DiffEntry::Active {
                        value: prev_value.clone(),
                        loc: prev_new_loc,
                        base_old_loc: prev_base_old_loc,
                    },
                );
                user_steps += 1;
            }
        }

        // Remaining phases: floor raise, CommitFloor, journal, diff merge.
        m.finish(
            ops,
            diff,
            active_keys_delta,
            user_steps,
            metadata,
            scan,
            reader,
            db,
        )
        .await
    }
}
1205
1206impl<F: Family, D: Digest, U: update::Update + Send + Sync> MerkleizedBatch<F, D, U>
1207where
1208    Operation<F, U>: Send + Sync,
1209{
1210    /// Return the speculative root.
1211    pub fn root(&self) -> D {
1212        self.journal_batch.root()
1213    }
1214
1215    /// Iterate over ancestor batches (parent first, then grandparent, etc.). Stops when a
1216    /// Weak ref fails to upgrade (ancestor was freed).
1217    pub(crate) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
1218        let mut next = self.parent.as_ref().and_then(Weak::upgrade);
1219        iter::from_fn(move || {
1220            let batch = next.take()?;
1221            next = batch.parent.as_ref().and_then(Weak::upgrade);
1222            Some(batch)
1223        })
1224    }
1225}
1226
1227impl<F: Family, D: Digest, U: update::Update + Send + Sync> MerkleizedBatch<F, D, U>
1228where
1229    Operation<F, U>: Codec,
1230{
1231    /// Create a new speculative batch of operations with this batch as its parent.
1232    ///
1233    /// All uncommitted ancestors in the chain must be kept alive until the child (or any
1234    /// descendant) is merkleized. Dropping an uncommitted ancestor causes data
1235    /// loss detected at `apply_batch` time.
1236    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U>
1237    where
1238        H: Hasher<Digest = D>,
1239    {
1240        UnmerkleizedBatch {
1241            journal_batch: self.journal_batch.new_batch::<H>(),
1242            mutations: BTreeMap::new(),
1243            base: Base::Child(Arc::clone(self)),
1244        }
1245    }
1246
1247    /// Read through: local diff -> parent chain -> committed DB.
1248    pub async fn get<E, C, I, H>(
1249        &self,
1250        key: &U::Key,
1251        db: &Db<F, E, C, I, H, U>,
1252    ) -> Result<Option<U::Value>, crate::qmdb::Error<F>>
1253    where
1254        E: Context,
1255        C: Contiguous<Item = Operation<F, U>>,
1256        I: UnorderedIndex<Value = Location<F>> + 'static,
1257        H: Hasher<Digest = D>,
1258    {
1259        if let Some(entry) = self.diff.get(key) {
1260            return Ok(entry.value().cloned());
1261        }
1262        // Walk parent chain. If a parent was freed (committed and dropped), the iterator
1263        // stops and we fall through to DB.
1264        for batch in self.ancestors() {
1265            if let Some(entry) = batch.diff.get(key) {
1266                return Ok(entry.value().cloned());
1267            }
1268        }
1269        db.get(key).await
1270    }
1271}
1272
1273impl<F, E, C, I, H, U> Db<F, E, C, I, H, U>
1274where
1275    F: Family,
1276    E: Context,
1277    U: update::Update + Send + Sync,
1278    C: Contiguous<Item = Operation<F, U>>,
1279    I: UnorderedIndex<Value = Location<F>>,
1280    H: Hasher,
1281    Operation<F, U>: Codec,
1282{
1283    /// Create a new speculative batch of operations with this database as its parent.
1284    pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, U> {
1285        // The DB is always committed, so journal size = last_commit_loc + 1.
1286        let journal_size = *self.last_commit_loc + 1;
1287        UnmerkleizedBatch {
1288            journal_batch: self.log.new_batch(),
1289            mutations: BTreeMap::new(),
1290            base: Base::Db {
1291                db_size: journal_size,
1292                inactivity_floor_loc: self.inactivity_floor_loc,
1293                active_keys: self.active_keys,
1294            },
1295        }
1296    }
1297}
1298
impl<F, E, C, I, H, U> Db<F, E, C, I, H, U>
where
    F: Family,
    E: Context,
    U: update::Update + Send + Sync + 'static,
    C: Mutable<Item = Operation<F, U>> + crate::Persistable<Error = crate::journal::Error>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Apply a batch to the database, returning the range of written operations.
    ///
    /// A batch is valid only if every batch applied to the database since this batch's
    /// ancestor chain was created is an ancestor of this batch. Applying a batch from a
    /// different fork returns [`crate::qmdb::Error::StaleBatch`].
    ///
    /// This publishes the batch to the in-memory database state and appends it to the
    /// journal, but does not durably persist it. Call [`Db::commit`] or [`Db::sync`] to
    /// guarantee durability.
    pub async fn apply_batch(
        &mut self,
        batch: Arc<MerkleizedBatch<F, H::Digest, U>>,
    ) -> Result<Range<Location<F>>, crate::qmdb::Error<F>> {
        // Current committed journal size (the DB is always committed through
        // last_commit_loc, so size = loc + 1).
        let db_size = *self.last_commit_loc + 1;
        // Valid db_size values: batch.db_size (nothing committed), batch.base_size
        // (all ancestors committed), or any ancestor_diff_ends[i] (partial commit).
        let valid = db_size == batch.db_size
            || db_size == batch.base_size
            || batch.ancestor_diff_ends.contains(&db_size);
        if !valid {
            return Err(crate::qmdb::Error::StaleBatch {
                db_size,
                batch_db_size: batch.db_size,
                batch_base_size: batch.base_size,
            });
        }
        let start_loc = Location::new(db_size);

        // 1. Apply journal (handles its own partial ancestor skipping).
        self.log.apply_batch(&batch.journal_batch).await?;

        // 2. Build committed_locs: for each key in a committed ancestor batch,
        //    record the nearest (to child) committed ancestor's final state.
        //    Some(loc) = Active at loc, None = Deleted.
        let mut committed_locs: BTreeMap<&U::Key, Option<Location<F>>> = BTreeMap::new();
        for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
            // An ancestor whose diff ends at or before db_size is already committed.
            if batch.ancestor_diff_ends[i] <= db_size {
                for (key, entry) in ancestor_diff.iter() {
                    // parent-first order: .or_insert keeps the nearest committed.
                    committed_locs.entry(key).or_insert(entry.loc());
                }
            }
        }

        // 3. Apply child's diff (child wins via seen set).
        let mut seen = BTreeSet::<&U::Key>::new();
        for (key, entry) in batch.diff.iter() {
            seen.insert(key);
            // Prefer the committed ancestor's state as the snapshot's prior location;
            // fall back to the entry's own committed-base provenance.
            let base_old_loc = committed_locs
                .get(key)
                .copied()
                .unwrap_or_else(|| entry.base_old_loc());
            apply_snapshot_diff(&mut self.snapshot, key, entry, base_old_loc);
        }

        // 4. Apply uncommitted ancestor diffs (skip committed batches, skip seen keys).
        for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
            if batch.ancestor_diff_ends[i] <= db_size {
                continue;
            }
            for (key, entry) in ancestor_diff.iter() {
                // Parent-first order plus the seen set means the closest batch wins.
                if !seen.insert(key) {
                    continue;
                }
                let base_old_loc = committed_locs
                    .get(key)
                    .copied()
                    .unwrap_or_else(|| entry.base_old_loc());
                apply_snapshot_diff(&mut self.snapshot, key, entry, base_old_loc);
            }
        }

        // 5. Update DB metadata.
        self.active_keys = batch.total_active_keys;
        self.inactivity_floor_loc = batch.new_inactivity_floor_loc;
        self.last_commit_loc = batch.new_last_commit_loc;

        // 6. Return range of operations that were written to the log.
        let end_loc = Location::new(*self.last_commit_loc + 1);
        Ok(start_loc..end_loc)
    }
}
1391
1392impl<F: Family, E, C, I, H, U> Db<F, E, C, I, H, U>
1393where
1394    E: Context,
1395    U: update::Update + Send + Sync,
1396    C: Contiguous<Item = Operation<F, U>>,
1397    I: UnorderedIndex<Value = Location<F>>,
1398    H: Hasher,
1399    Operation<F, U>: Codec,
1400{
1401    /// Create an initial [`MerkleizedBatch`] from the committed DB state.
1402    ///
1403    /// This is the starting point for building owned batch chains.
1404    pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U>> {
1405        // The DB is always committed, so journal size = last_commit_loc + 1.
1406        let journal_size = *self.last_commit_loc + 1;
1407        Arc::new(MerkleizedBatch {
1408            journal_batch: self.log.to_merkleized_batch(),
1409            diff: Arc::new(BTreeMap::new()),
1410            parent: None,
1411            new_inactivity_floor_loc: self.inactivity_floor_loc,
1412            new_last_commit_loc: self.last_commit_loc,
1413            base_size: journal_size,
1414            total_size: journal_size,
1415            total_active_keys: self.active_keys,
1416            db_size: journal_size,
1417            ancestor_diffs: Vec::new(),
1418            ancestor_diff_ends: Vec::new(),
1419        })
1420    }
1421}
1422
1423/// Extract the value from an Update operation via the `Update` trait.
1424fn extract_update_value<F: Family, U: update::Update>(op: &Operation<F, U>) -> U::Value {
1425    match op {
1426        Operation::Update(update) => update.value().clone(),
1427        _ => unreachable!("floor raise should only re-append Update operations"),
1428    }
1429}
1430
#[cfg(any(test, feature = "test-traits"))]
/// Adapters implementing the test-facing batch traits for the concrete batch and
/// DB types in this module. All methods delegate to the inherent implementations.
mod trait_impls {
    use super::*;
    use crate::qmdb::any::traits::{
        BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
        UnmerkleizedBatch as UnmerkleizedBatchTrait,
    };
    use std::future::Future;

    // Unordered unmerkleized batch: `write` records a mutation, `merkleize`
    // delegates to the inherent async method.
    impl<F, K, V, H, E, C, I> UnmerkleizedBatchTrait<Db<F, E, C, I, H, update::Unordered<K, V>>>
        for UnmerkleizedBatch<F, H, update::Unordered<K, V>>
    where
        F: Family,
        K: Key,
        V: ValueEncoding + 'static,
        H: Hasher,
        E: Context,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
        I: UnorderedIndex<Value = Location<F>>,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>;

        // Some(value) = upsert, None = delete; later writes to the same key overwrite.
        fn write(mut self, key: K, value: Option<V::Value>) -> Self {
            self.mutations.insert(key, value);
            self
        }

        fn merkleize(
            self,
            db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
            metadata: Option<V::Value>,
        ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
            self.merkleize(db, metadata)
        }
    }

    // Ordered unmerkleized batch: identical shape to the unordered impl above,
    // but bound to the ordered update type and index.
    impl<F, K, V, H, E, C, I> UnmerkleizedBatchTrait<Db<F, E, C, I, H, update::Ordered<K, V>>>
        for UnmerkleizedBatch<F, H, update::Ordered<K, V>>
    where
        F: Family,
        K: Key,
        V: ValueEncoding + 'static,
        H: Hasher,
        E: Context,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
        I: OrderedIndex<Value = Location<F>>,
        Operation<F, update::Ordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>;

        // Some(value) = upsert, None = delete; later writes to the same key overwrite.
        fn write(mut self, key: K, value: Option<V::Value>) -> Self {
            self.mutations.insert(key, value);
            self
        }

        fn merkleize(
            self,
            db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
            metadata: Option<V::Value>,
        ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
            self.merkleize(db, metadata)
        }
    }

    // The trait is implemented on `Arc<MerkleizedBatch>` (the shape batches are
    // passed around in), delegating to the inherent `root`.
    impl<F: Family, D: Digest, U: update::Update + Send + Sync + 'static> MerkleizedBatchTrait
        for Arc<MerkleizedBatch<F, D, U>>
    where
        Operation<F, U>: Codec,
    {
        type Digest = D;

        fn root(&self) -> D {
            MerkleizedBatch::root(self)
        }
    }

    // Unordered DB: batch creation and application delegate to the inherent methods.
    impl<F, E, K, V, C, I, H> BatchableDb for Db<F, E, C, I, H, update::Unordered<K, V>>
    where
        F: Family,
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
            + crate::Persistable<Error = crate::journal::Error>,
        I: UnorderedIndex<Value = Location<F>>,
        H: Hasher,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>;
        type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>>;

        fn new_batch(&self) -> Self::Batch {
            self.new_batch()
        }

        fn apply_batch(
            &mut self,
            batch: Self::Merkleized,
        ) -> impl Future<Output = Result<Range<Location<F>>, crate::qmdb::Error<F>>> {
            self.apply_batch(batch)
        }
    }

    // Ordered DB: same shape as the unordered impl, bound to the ordered types.
    impl<F, E, K, V, C, I, H> BatchableDb for Db<F, E, C, I, H, update::Ordered<K, V>>
    where
        F: Family,
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
            + crate::Persistable<Error = crate::journal::Error>,
        I: OrderedIndex<Value = Location<F>>,
        H: Hasher,
        Operation<F, update::Ordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>;
        type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>>;

        fn new_batch(&self) -> Self::Batch {
            self.new_batch()
        }

        fn apply_batch(
            &mut self,
            batch: Self::Merkleized,
        ) -> impl Future<Output = Result<Range<Location<F>>, crate::qmdb::Error<F>>> {
            self.apply_batch(batch)
        }
    }
}
1576
1577#[cfg(test)]
1578mod tests {
1579    use super::*;
1580    use crate::{
1581        mmr,
1582        qmdb::any::{
1583            ordered::fixed::Db as OrderedFixedDb,
1584            test::{colliding_digest, fixed_db_config},
1585            unordered::fixed::Db as UnorderedFixedDb,
1586        },
1587        translator::OneCap,
1588    };
1589    use commonware_cryptography::{sha256, Sha256};
1590    use commonware_runtime::{deterministic, Runner as _};
1591
1592    /// Test helper: same logic as `Merkleizer::extract_parent_deleted_creates`
1593    /// but without requiring a full Merkleizer instance.
1594    fn extract_parent_deleted_creates<K: Ord + Clone, V: Clone>(
1595        mutations: &mut BTreeMap<K, Option<V>>,
1596        base_diff: &BTreeMap<K, DiffEntry<mmr::Family, V>>,
1597    ) -> BTreeMap<K, (V, Option<crate::mmr::Location>)> {
1598        let creates: BTreeMap<_, _> = mutations
1599            .iter()
1600            .filter_map(|(key, value)| {
1601                if let Some(DiffEntry::Deleted { base_old_loc }) = base_diff.get(key) {
1602                    if let Some(value) = value {
1603                        return Some((key.clone(), (value.clone(), *base_old_loc)));
1604                    }
1605                }
1606                None
1607            })
1608            .collect();
1609        for key in creates.keys() {
1610            mutations.remove(key);
1611        }
1612        creates
1613    }
1614
1615    #[test]
1616    fn extract_parent_deleted_creates_basic() {
1617        let mut mutations: BTreeMap<u64, Option<u64>> = BTreeMap::new();
1618        mutations.insert(1, Some(100)); // update over parent-deleted key
1619        mutations.insert(2, None); // delete (not a create)
1620        mutations.insert(3, Some(300)); // update, but not in base diff
1621
1622        let mut base_diff: BTreeMap<u64, DiffEntry<mmr::Family, u64>> = BTreeMap::new();
1623        base_diff.insert(
1624            1,
1625            DiffEntry::Deleted {
1626                base_old_loc: Some(crate::mmr::Location::new(5)),
1627            },
1628        );
1629        base_diff.insert(
1630            4,
1631            DiffEntry::Active {
1632                value: 400,
1633                loc: crate::mmr::Location::new(10),
1634                base_old_loc: None,
1635            },
1636        );
1637
1638        let creates = extract_parent_deleted_creates(&mut mutations, &base_diff);
1639
1640        // key1 extracted: value=100, base_old_loc=Some(5)
1641        assert_eq!(creates.len(), 1);
1642        let (value, base_old_loc) = creates.get(&1).unwrap();
1643        assert_eq!(*value, 100);
1644        assert_eq!(*base_old_loc, Some(crate::mmr::Location::new(5)));
1645
1646        // key1 removed from mutations, key2 and key3 remain.
1647        assert_eq!(mutations.len(), 2);
1648        assert!(mutations.contains_key(&2));
1649        assert!(mutations.contains_key(&3));
1650    }
1651
1652    #[test]
1653    fn extract_parent_deleted_creates_delete_not_extracted() {
1654        let mut mutations: BTreeMap<u64, Option<u64>> = BTreeMap::new();
1655        mutations.insert(1, None); // deleting a parent-deleted key
1656
1657        let mut base_diff: BTreeMap<u64, DiffEntry<mmr::Family, u64>> = BTreeMap::new();
1658        base_diff.insert(
1659            1,
1660            DiffEntry::Deleted {
1661                base_old_loc: Some(crate::mmr::Location::new(5)),
1662            },
1663        );
1664
1665        let creates = extract_parent_deleted_creates(&mut mutations, &base_diff);
1666
1667        // Delete of a deleted key is not a create.
1668        assert!(creates.is_empty());
1669        // Mutation unchanged.
1670        assert_eq!(mutations.len(), 1);
1671        assert!(mutations.contains_key(&1));
1672    }
1673
    // Operations can live in three places: the committed journal, an
    // in-memory (pending) ancestor batch, or the current batch's own ops.
    // Verify `read_ops` and `read_op` resolve locations from all three.
    #[test]
    fn read_ops_resolves_committed_ancestor_and_current_sources() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            type TestDb = UnorderedFixedDb<
                mmr::Family,
                deterministic::Context,
                sha256::Digest,
                sha256::Digest,
                Sha256,
                OneCap,
            >;

            let config = fixed_db_config::<OneCap>("read-locations-all-sources", &context);
            let mut db = TestDb::init(context, config).await.unwrap();

            // One key/value pair per source, with distinct digest seeds.
            let key_db = colliding_digest(0x30, 0);
            let value_db = colliding_digest(0x30, 1);
            let key_parent = colliding_digest(0x31, 0);
            let value_parent = colliding_digest(0x31, 1);
            let key_current = colliding_digest(0x32, 0);
            let value_current = colliding_digest(0x32, 1);

            // Commit one key to the DB so it's on disk.
            let seed = db
                .new_batch()
                .write(key_db, Some(value_db))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(seed).await.unwrap();
            db.commit().await.unwrap();

            let committed_loc = db.snapshot.get(&key_db).next().copied().unwrap();

            // Create a parent batch with a second key (in-memory ancestor).
            let parent = db
                .new_batch()
                .write(key_parent, Some(value_parent))
                .merkleize(&db, None)
                .await
                .unwrap();
            let parent_loc = parent.diff.get(&key_parent).unwrap().loc().unwrap();

            // Create a child batch with a third key (current ops).
            let child = parent
                .new_batch::<Sha256>()
                .write(key_current, Some(value_current));
            let (_mutations, merkleizer) = child.into_parts();

            // The current batch's first op lands at `base_size`, i.e. just
            // past everything the merkleizer inherited.
            let current_loc = Location::new(merkleizer.base_size);
            let batch_ops = vec![Operation::Update(update::Unordered(
                key_current,
                value_current,
            ))];

            // read_ops should resolve all three sources correctly.
            let reader = db.log.reader().await;
            let ops = merkleizer
                .read_ops(
                    &[committed_loc, parent_loc, current_loc],
                    &batch_ops,
                    &reader,
                )
                .await
                .unwrap();
            drop(reader);

            assert_eq!(
                ops,
                vec![
                    Operation::Update(update::Unordered(key_db, value_db)),
                    Operation::Update(update::Unordered(key_parent, value_parent)),
                    Operation::Update(update::Unordered(key_current, value_current)),
                ]
            );

            // read_op: single-location reads across all three sources.
            let reader = db.log.reader().await;
            let disk_op = merkleizer
                .read_op(committed_loc, &batch_ops, &reader)
                .await
                .unwrap();
            assert_eq!(
                disk_op,
                Operation::Update(update::Unordered(key_db, value_db))
            );

            let ancestor_op = merkleizer
                .read_op(parent_loc, &batch_ops, &reader)
                .await
                .unwrap();
            assert_eq!(
                ancestor_op,
                Operation::Update(update::Unordered(key_parent, value_parent))
            );

            let current_op = merkleizer
                .read_op(current_loc, &batch_ops, &reader)
                .await
                .unwrap();
            assert_eq!(
                current_op,
                Operation::Update(update::Unordered(key_current, value_current))
            );
            drop(reader);

            db.destroy().await.unwrap();
        });
    }
1784
    // Regression: a child batch built on a pending parent must produce the
    // same root as the same logical child built after the parent is
    // committed, even when translator collisions force snapshot-bucket
    // resolution (`OneCap` translates all `colliding_digest(0xAA, _)` keys
    // to the same bucket).
    #[test]
    fn child_root_matches_between_pending_and_committed_paths_under_collisions() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            type TestDb = UnorderedFixedDb<
                mmr::Family,
                deterministic::Context,
                sha256::Digest,
                sha256::Digest,
                Sha256,
                OneCap,
            >;

            let config = fixed_db_config::<OneCap>("batch-collision-regression", &context);
            let mut db = TestDb::init(context, config).await.unwrap();
            let key_a = colliding_digest(0xAA, 1);
            let key_b = colliding_digest(0xAA, 0);

            // Seed four colliding committed keys, then update only key_a.
            // The specific 4 / 1 / 0 shape is a concrete counterexample:
            // key_b remains outside parent.diff and is still resolved through
            // the committed snapshot in the child.
            let mut initial = db.new_batch();
            for i in 0..4 {
                initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
            }
            let initial = initial.merkleize(&db, None).await.unwrap();
            db.apply_batch(initial).await.unwrap();
            db.commit().await.unwrap();

            // Update only key_a so the colliding sibling key_b remains outside
            // parent.diff and must still be resolved through the committed
            // snapshot in the child.
            let parent = db
                .new_batch()
                .write(key_a, Some(colliding_digest(0xCC, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();
            assert!(
                !parent.diff.contains_key(&key_b),
                "regression requires a sibling collision to remain only in the committed snapshot"
            );

            // Build the child while the parent is still pending. The child
            // mutates the parent-updated key plus the colliding sibling that
            // still resolves through the committed snapshot. Without the
            // ancestor-diff location guard, the stale snapshot entry for key_a
            // can consume key_a's mutation before the actual ancestor location.
            let pending_child = parent
                .new_batch::<Sha256>()
                .write(key_a, Some(colliding_digest(0xDD, 1)))
                .write(key_b, Some(colliding_digest(0xDD, 0)))
                .merkleize(&db, None)
                .await
                .unwrap();

            let pending_root = pending_child.root();

            // Commit the parent, then rebuild the same logical child directly
            // from the committed DB for comparison.
            db.apply_batch(parent).await.unwrap();
            db.commit().await.unwrap();

            let committed_child = db
                .new_batch()
                .write(key_a, Some(colliding_digest(0xDD, 1)))
                .write(key_b, Some(colliding_digest(0xDD, 0)))
                .merkleize(&db, None)
                .await
                .unwrap();

            assert_eq!(pending_root, committed_child.root());

            // Apply pending child. The resulting root should match a
            // child built directly from the committed DB.
            db.apply_batch(pending_child).await.unwrap();
            assert_eq!(db.root(), committed_child.root());

            db.destroy().await.unwrap();
        });
    }
1865
    // Ordered-index counterpart of
    // `child_root_matches_between_pending_and_committed_paths_under_collisions`:
    // same 4 / 1 / 0 collision counterexample, run against `OrderedFixedDb`.
    #[test]
    fn ordered_child_root_matches_between_pending_and_committed_paths_under_collisions() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            type TestDb = OrderedFixedDb<
                mmr::Family,
                deterministic::Context,
                sha256::Digest,
                sha256::Digest,
                Sha256,
                OneCap,
            >;

            let config = fixed_db_config::<OneCap>("ordered-batch-collision-regression", &context);
            let mut db = TestDb::init(context, config).await.unwrap();
            let key_a = colliding_digest(0xAA, 1);
            let key_b = colliding_digest(0xAA, 0);

            // Match the unordered counterexample shape on the ordered path so
            // both variants exercise the same collision pattern.
            let mut initial = db.new_batch();
            for i in 0..4 {
                initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
            }
            let initial = initial.merkleize(&db, None).await.unwrap();
            db.apply_batch(initial).await.unwrap();
            db.commit().await.unwrap();

            // Update only key_a so the colliding sibling key_b remains outside
            // parent.diff and must still be resolved through the committed
            // snapshot in the child.
            let parent = db
                .new_batch()
                .write(key_a, Some(colliding_digest(0xCC, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();
            assert!(
                !parent.diff.contains_key(&key_b),
                "ordered regression requires a sibling collision to remain only in the committed snapshot"
            );

            // Build the child while the parent is still pending, then rebuild
            // the same logical child after committing the parent.
            let pending_child = parent
                .new_batch::<Sha256>()
                .write(key_a, Some(colliding_digest(0xDD, 1)))
                .write(key_b, Some(colliding_digest(0xDD, 0)))
                .merkleize(&db, None)
                .await
                .unwrap();

            let pending_root = pending_child.root();

            db.apply_batch(parent).await.unwrap();
            db.commit().await.unwrap();

            let committed_child = db
                .new_batch()
                .write(key_a, Some(colliding_digest(0xDD, 1)))
                .write(key_b, Some(colliding_digest(0xDD, 0)))
                .merkleize(&db, None)
                .await
                .unwrap();

            assert_eq!(pending_root, committed_child.root());

            // Apply pending child. The resulting root should match a
            // child built directly from the committed DB.
            db.apply_batch(pending_child).await.unwrap();
            assert_eq!(db.root(), committed_child.root());

            db.destroy().await.unwrap();
        });
    }
1941
    #[test]
    fn sequential_commit_basic() {
        // Build DB -> A -> B, commit A, then apply B. Verify B
        // produces the same DB state as building B directly from the committed DB.
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            type TestDb = UnorderedFixedDb<
                mmr::Family,
                deterministic::Context,
                sha256::Digest,
                sha256::Digest,
                Sha256,
                OneCap,
            >;

            let config = fixed_db_config::<OneCap>("seq-commit-basic", &context);
            let mut db = TestDb::init(context, config).await.unwrap();

            // Seed an initial key.
            let seed = db
                .new_batch()
                .write(colliding_digest(0x01, 0), Some(colliding_digest(0x01, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(seed).await.unwrap();
            db.commit().await.unwrap();

            // Build batch A.
            let key_a = colliding_digest(0x02, 0);
            let val_a = colliding_digest(0x02, 1);
            let batch_a = db
                .new_batch()
                .write(key_a, Some(val_a))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Build batch B as child of A, while A is still pending.
            let key_b = colliding_digest(0x03, 0);
            let val_b = colliding_digest(0x03, 1);
            let batch_b = batch_a
                .new_batch::<Sha256>()
                .write(key_b, Some(val_b))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Commit A before applying B.
            db.apply_batch(batch_a).await.unwrap();
            db.commit().await.unwrap();

            // Build the same logical B from committed DB for comparison.
            let committed_b = db
                .new_batch()
                .write(key_b, Some(val_b))
                .merkleize(&db, None)
                .await
                .unwrap();
            assert_eq!(batch_b.root(), committed_b.root());

            // Apply B.
            db.apply_batch(batch_b).await.unwrap();
            assert_eq!(db.root(), committed_b.root());

            db.destroy().await.unwrap();
        });
    }
2009
    #[test]
    fn sequential_commit_fixes_base_old_loc() {
        // Build DB -> A -> B where both touch the same key K.
        // Commit A, then apply B. Verify base_old_loc is adjusted.
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            type TestDb = UnorderedFixedDb<
                mmr::Family,
                deterministic::Context,
                sha256::Digest,
                sha256::Digest,
                Sha256,
                OneCap,
            >;

            let config = fixed_db_config::<OneCap>("seq-commit-base-old-loc", &context);
            let mut db = TestDb::init(context, config).await.unwrap();

            // Seed an initial key so we have an existing entry.
            let key = colliding_digest(0x10, 0);
            let seed = db
                .new_batch()
                .write(key, Some(colliding_digest(0x10, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(seed).await.unwrap();
            db.commit().await.unwrap();

            // Build batch A that updates the key.
            let val_a = colliding_digest(0x10, 2);
            let batch_a = db
                .new_batch()
                .write(key, Some(val_a))
                .merkleize(&db, None)
                .await
                .unwrap();

            // A's diff should have base_old_loc pointing to the seed's location.
            let a_entry = batch_a.diff.get(&key).unwrap();
            let a_loc = a_entry.loc();
            assert!(a_loc.is_some());

            // Build batch B as child of A, also updating the same key.
            let val_b = colliding_digest(0x10, 3);
            let batch_b = batch_a
                .new_batch::<Sha256>()
                .write(key, Some(val_b))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Commit A. The base_old_loc fixup is deferred to apply_batch,
            // which reads A's diff by reference.
            db.apply_batch(batch_a).await.unwrap();
            db.commit().await.unwrap();

            // Verify B produces the same root as a fresh build.
            let committed_b = db
                .new_batch()
                .write(key, Some(val_b))
                .merkleize(&db, None)
                .await
                .unwrap();
            assert_eq!(batch_b.root(), committed_b.root());

            // Applying B over the committed A must land on the same root.
            db.apply_batch(batch_b).await.unwrap();
            assert_eq!(db.root(), committed_b.root());

            db.destroy().await.unwrap();
        });
    }
2082
    #[test]
    fn fork_apply_after_parent_committed() {
        // Fork: DB -> A -> B and DB -> A -> C.
        // Commit A, then apply B and C independently.
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            type TestDb = UnorderedFixedDb<
                mmr::Family,
                deterministic::Context,
                sha256::Digest,
                sha256::Digest,
                Sha256,
                OneCap,
            >;

            let config = fixed_db_config::<OneCap>("fork-after-commit", &context);
            let mut db = TestDb::init(context, config).await.unwrap();

            // Seed.
            let seed = db
                .new_batch()
                .write(colliding_digest(0x20, 0), Some(colliding_digest(0x20, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(seed).await.unwrap();
            db.commit().await.unwrap();

            // Build batch A.
            let key_a = colliding_digest(0x21, 0);
            let val_a = colliding_digest(0x21, 1);
            let batch_a = db
                .new_batch()
                .write(key_a, Some(val_a))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Fork: B and C both derive from A (which is still pending).
            let key_b = colliding_digest(0x22, 0);
            let val_b = colliding_digest(0x22, 1);
            let batch_b = batch_a
                .new_batch::<Sha256>()
                .write(key_b, Some(val_b))
                .merkleize(&db, None)
                .await
                .unwrap();
            let key_c = colliding_digest(0x23, 0);
            let val_c = colliding_digest(0x23, 1);
            let batch_c = batch_a
                .new_batch::<Sha256>()
                .write(key_c, Some(val_c))
                .merkleize(&db, None)
                .await
                .unwrap();

            db.apply_batch(batch_a).await.unwrap();
            db.commit().await.unwrap();

            // Verify both forks produce the same roots as fresh builds from
            // the committed DB.
            let committed_b = db
                .new_batch()
                .write(key_b, Some(val_b))
                .merkleize(&db, None)
                .await
                .unwrap();
            assert_eq!(batch_b.root(), committed_b.root());

            let committed_c = db
                .new_batch()
                .write(key_c, Some(val_c))
                .merkleize(&db, None)
                .await
                .unwrap();
            assert_eq!(batch_c.root(), committed_c.root());

            db.destroy().await.unwrap();
        });
    }
2162
    #[test]
    fn sequential_commit_three_deep() {
        // Build DB -> grandparent -> parent -> child, commit each
        // sequentially. Tests applying across batch boundaries.
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            type TestDb = UnorderedFixedDb<
                mmr::Family,
                deterministic::Context,
                sha256::Digest,
                sha256::Digest,
                Sha256,
                OneCap,
            >;

            let config = fixed_db_config::<OneCap>("ff-cross", &context);
            let mut db = TestDb::init(context, config).await.unwrap();

            // Grandparent: 2 keys. All three batches are built before any of
            // them is applied, so parent/child merkleize against pending
            // ancestors.
            let grandparent = db
                .new_batch()
                .write(colliding_digest(0x01, 0), Some(colliding_digest(0x01, 1)))
                .write(colliding_digest(0x02, 0), Some(colliding_digest(0x02, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Parent: 1 key.
            let parent = grandparent
                .new_batch::<Sha256>()
                .write(colliding_digest(0x03, 0), Some(colliding_digest(0x03, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Child: 1 key.
            let child = parent
                .new_batch::<Sha256>()
                .write(colliding_digest(0x04, 0), Some(colliding_digest(0x04, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Commit grandparent.
            db.apply_batch(grandparent).await.unwrap();
            db.commit().await.unwrap();

            // Commit parent.
            db.apply_batch(parent).await.unwrap();
            db.commit().await.unwrap();

            // Commit child.
            db.apply_batch(child).await.unwrap();

            // All 4 keys should be present.
            for i in 1..=4 {
                assert_eq!(
                    db.get(&colliding_digest(i, 0)).await.unwrap(),
                    Some(colliding_digest(i, 1))
                );
            }

            db.destroy().await.unwrap();
        });
    }
2228
    /// Regression test for issue #3519 / #3520: when a parent batch deletes a
    /// key that has a collision sibling and the child re-creates that key, the
    /// `fresh.chain(recreates)` iterator produced operations in a different
    /// order depending on whether the parent was pending or committed.
    #[test]
    fn recreate_deleted_key_with_collision_sibling_root_matches() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            type TestDb = UnorderedFixedDb<
                mmr::Family,
                deterministic::Context,
                sha256::Digest,
                sha256::Digest,
                Sha256,
                OneCap,
            >;

            let config = fixed_db_config::<OneCap>("recreate-deleted-collision", &context);
            let mut db = TestDb::init(context, config).await.unwrap();

            // Two colliding keys: K0 (suffix 0) and K6 (suffix 6).
            let k0 = colliding_digest(0xAA, 0);
            let k6 = colliding_digest(0xAA, 6);

            // Seed both keys so the snapshot bucket contains two entries.
            let initial = db
                .new_batch()
                .write(k0, Some(colliding_digest(0xBB, 0)))
                .write(k6, Some(colliding_digest(0xBB, 6)))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(initial).await.unwrap();
            db.commit().await.unwrap();

            // Parent: delete K0. K6 remains untouched.
            let parent = db
                .new_batch()
                .write(k0, None)
                .merkleize(&db, None)
                .await
                .unwrap();

            // Child (pending parent): re-create K0 and write a new colliding key K29.
            let k29 = colliding_digest(0xAA, 29);
            let pending_child = parent
                .new_batch::<Sha256>()
                .write(k0, Some(colliding_digest(0xCC, 0)))
                .write(k29, Some(colliding_digest(0xCC, 29)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Commit the parent, then rebuild the same child.
            db.apply_batch(parent).await.unwrap();
            db.commit().await.unwrap();

            let committed_child = db
                .new_batch()
                .write(k0, Some(colliding_digest(0xCC, 0)))
                .write(k29, Some(colliding_digest(0xCC, 29)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Both construction paths must yield the same root.
            assert_eq!(
                pending_child.root(),
                committed_child.root(),
                "root depended on pending-vs-committed parent path \
                 when re-creating a deleted key with collision siblings"
            );

            db.destroy().await.unwrap();
        });
    }
2304}