Skip to main content

commonware_storage/qmdb/immutable/
mod.rs

1//! An authenticated database that only supports adding new keyed values (no updates or
2//! deletions).
3//!
4//! Two variants are available based on value encoding:
5//! - [fixed]: For fixed-size values.
6//! - [variable]: For variable-size values.
7//!
8//! # Inactivity floor
9//!
10//! Each commit carries an inactivity floor: a location before which the application
11//! declares operations are no longer needed. The floor is embedded in the operation
12//! log and included in the Merkle root, so all replicas processing the same operations
13//! arrive at the same floor.
14//!
15//! The floor controls two things:
16//! - **Pruning**: [`Immutable::prune`] only allows pruning up to the floor.
17//! - **Reconstruction**: on restart or sync, the snapshot is rebuilt from the floor
18//!   onward. Keys set before the floor are not loaded into memory.
19//!
20//! The floor must be monotonically non-decreasing across commits and must not exceed
21//! the batch's total operation count. Pass `db.inactivity_floor_loc()` to keep the
22//! floor unchanged, or a higher value to advance it.
23//!
24//! # Examples
25//!
26//! ```ignore
27//! // Simple mode: apply a batch, then durably commit it.
28//! // The third argument to merkleize is the inactivity floor -- operations
29//! // before this location are declared inactive by the application.
30//! let floor = db.inactivity_floor_loc();
31//! let merkleized = db.new_batch()
32//!     .set(key, value)
33//!     .merkleize(&db, None, floor);
34//! db.apply_batch(merkleized).await?;
35//! db.commit().await?;
36//! ```
37//!
38//! ```ignore
39//! // Batches can still fork before you apply them.
40//! let floor = db.inactivity_floor_loc();
41//! let parent = db.new_batch()
42//!     .set(key_a, value_a)
43//!     .merkleize(&db, None, floor);
44//!
45//! let child_a = parent.new_batch::<Sha256>()
46//!     .set(key_b, value_b)
47//!     .merkleize(&db, None, floor);
48//!
49//! let child_b = parent.new_batch::<Sha256>()
50//!     .set(key_c, value_c)
51//!     .merkleize(&db, None, floor);
52//!
53//! db.apply_batch(child_a).await?;
54//! db.commit().await?;
55//! ```
56//!
57//! ```ignore
58//! // Advanced mode: while the previous batch is being committed, build exactly
59//! // one child batch from the newly published state.
60//! let floor = db.inactivity_floor_loc();
61//! let parent = db.new_batch()
62//!     .set(key_a, value_a)
63//!     .merkleize(&db, None, floor);
64//! db.apply_batch(parent).await?;
65//!
66//! let (child, commit_result) = futures::join!(
67//!     async {
68//!         db.new_batch()
69//!             .set(key_b, value_b)
70//!             .merkleize(&db, None, floor)
71//!     },
72//!     db.commit(),
73//! );
74//! commit_result?;
75//!
76//! db.apply_batch(child).await?;
77//! db.commit().await?;
78//! ```
79
80use crate::{
81    index::{unordered::Index, Unordered as _},
82    journal::{
83        authenticated,
84        contiguous::{Contiguous, Mutable, Reader},
85        Error as JournalError,
86    },
87    merkle::{full::Config as MerkleConfig, Family, Location, Proof},
88    qmdb::{
89        any::ValueEncoding,
90        build_snapshot_from_log,
91        metrics::{KeyReadMetrics, OperationMetrics, StateMetrics},
92        operation::Key,
93        Error,
94    },
95    translator::Translator,
96    Context, Persistable,
97};
98use commonware_codec::EncodeShared;
99use commonware_cryptography::Hasher as CHasher;
100use commonware_parallel::Strategy;
101use std::{collections::HashSet, num::NonZeroU64, ops::Range, sync::Arc};
102use tracing::warn;
103
104/// Metrics for Immutable QMDBs.
105pub(crate) struct Metrics<E: Context> {
106    /// State gauges.
107    pub state: StateMetrics,
108    /// Write and durability metrics.
109    pub operations: OperationMetrics<E>,
110    /// Key read metrics.
111    pub reads: KeyReadMetrics<E>,
112}
113
114impl<E: Context> Metrics<E> {
115    /// Create and register metrics.
116    pub fn new(context: E) -> Self {
117        let context = Arc::new(context);
118        Self {
119            state: StateMetrics::new(context.as_ref()),
120            operations: OperationMetrics::new(context.clone()),
121            reads: KeyReadMetrics::new(context),
122        }
123    }
124}
125
126pub mod batch;
127mod compact;
128pub mod fixed;
129mod operation;
130pub mod sync;
131pub mod variable;
132
133pub use compact::{
134    Config as CompactConfig, Db as CompactDb, MerkleizedBatch as CompactMerkleizedBatch,
135    UnmerkleizedBatch as CompactUnmerkleizedBatch,
136};
137pub use operation::Operation;
138
139/// Configuration for an [Immutable] authenticated db.
140#[derive(Clone)]
141pub struct Config<T: Translator, J, S: Strategy> {
142    /// Configuration for the Merkle structure backing the authenticated journal.
143    pub merkle_config: MerkleConfig<S>,
144
145    /// Configuration for the operations log journal.
146    pub log: J,
147
148    /// The translator used by the compressed index.
149    pub translator: T,
150}
151
152/// An authenticated database that only supports adding new keyed values (no updates or
153/// deletions).
154///
155/// # Invariant
156///
157/// A key must be set at most once across the database history. Writing the same key more than
158/// once is undefined behavior.
159///
160/// Use [fixed::Db] or [variable::Db] for concrete instantiations.
161pub struct Immutable<
162    F: Family,
163    E: Context,
164    K: Key,
165    V: ValueEncoding,
166    C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
167    H: CHasher,
168    T: Translator,
169    S: Strategy,
170> where
171    C::Item: EncodeShared,
172{
173    /// Authenticated journal of operations.
174    pub(crate) journal: authenticated::Journal<F, E, C, H, S>,
175
176    /// Cached canonical operations root.
177    pub(crate) root: H::Digest,
178
179    /// A map from each active key to the location of the operation that set its value.
180    ///
181    /// # Invariant
182    ///
183    /// Only references operations of type [Operation::Set].
184    pub(crate) snapshot: Index<T, Location<F>>,
185
186    /// The location of the last commit operation.
187    pub(crate) last_commit_loc: Location<F>,
188
189    /// The inactivity floor declared by the last committed batch.
190    /// Operations before this location are considered inactive by the application.
191    pub(crate) inactivity_floor_loc: Location<F>,
192
193    /// Metrics for this database.
194    metrics: Metrics<E>,
195}
196
197// Shared read-only functionality.
198impl<F, E, K, V, C, H, T, S> Immutable<F, E, K, V, C, H, T, S>
199where
200    F: Family,
201    E: Context,
202    K: Key,
203    V: ValueEncoding,
204    C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
205    C::Item: EncodeShared,
206    H: CHasher,
207    T: Translator,
208    S: Strategy,
209{
210    /// Initialize from a pre-constructed authenticated journal.
211    ///
212    /// Seeds an initial commit if the journal is empty, builds the in-memory snapshot,
213    /// and returns the initialized database.
214    pub(crate) async fn init_from_journal(
215        mut journal: authenticated::Journal<F, E, C, H, S>,
216        context: E,
217        translator: T,
218    ) -> Result<Self, Error<F>> {
219        if journal.size().await == 0 {
220            warn!("Authenticated log is empty, initialized new db.");
221            journal
222                .append(&Operation::Commit(None, Location::new(0)))
223                .await?;
224            journal.sync().await?;
225        }
226
227        let mut snapshot = Index::new(context.child("snapshot"), translator);
228
229        let (last_commit_loc, inactivity_floor_loc) = {
230            let reader = journal.journal.reader().await;
231            let bounds = reader.bounds();
232            let last_commit_loc =
233                Location::new(bounds.end.checked_sub(1).expect("commit should exist"));
234
235            // Read the floor from the last commit operation.
236            let last_op = reader.read(*last_commit_loc).await?;
237            let inactivity_floor_loc = last_op
238                .has_floor()
239                .expect("last operation should be a commit with floor");
240            if inactivity_floor_loc > last_commit_loc {
241                return Err(Error::DataCorrupted("inactivity floor exceeds last commit"));
242            }
243
244            // Replay the log from the inactivity floor to build the snapshot.
245            build_snapshot_from_log::<F, _, _, _>(
246                inactivity_floor_loc,
247                &reader,
248                &mut snapshot,
249                |_, _| {},
250            )
251            .await?;
252
253            (last_commit_loc, inactivity_floor_loc)
254        };
255        let inactive_peaks = F::inactive_peaks(
256            F::location_to_position(Location::new(*last_commit_loc + 1)),
257            inactivity_floor_loc,
258        );
259        let root = journal.root(inactive_peaks)?;
260
261        let metrics = Metrics::new(context);
262        let db = Self {
263            journal,
264            root,
265            snapshot,
266            last_commit_loc,
267            inactivity_floor_loc,
268            metrics,
269        };
270        db.update_metrics().await;
271        Ok(db)
272    }
273
274    /// Return the inactivity floor location declared by the last committed batch.
275    pub const fn inactivity_floor_loc(&self) -> Location<F> {
276        self.inactivity_floor_loc
277    }
278
279    /// Return the Location of the next operation appended to this db.
280    pub async fn size(&self) -> Location<F> {
281        self.bounds().await.end
282    }
283
284    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
285    /// retained operations respectively.
286    pub async fn bounds(&self) -> Range<Location<F>> {
287        let bounds = self.journal.reader().await.bounds();
288        Location::new(bounds.start)..Location::new(bounds.end)
289    }
290
291    /// Update state gauges from the current database state.
292    async fn update_metrics(&self) {
293        let bounds = self.journal.reader().await.bounds();
294        self.metrics.state.set(
295            bounds.end,
296            bounds.start,
297            *self.inactivity_floor_loc,
298            *self.last_commit_loc,
299        );
300    }
301
302    /// Return the most recent location from which this database can safely be synced, and the
303    /// upper bound on [`Self::prune`]'s `loc`. For immutable databases, this equals the
304    /// inactivity floor declared by the last committed batch.
305    pub const fn sync_boundary(&self) -> Location<F> {
306        self.inactivity_floor_loc
307    }
308
309    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
310    /// has been pruned.
311    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error<F>> {
312        let _timer = self.metrics.reads.get_timer();
313        self.metrics.reads.get_calls.inc();
314        self.metrics.reads.keys_requested.inc();
315        let iter = self.snapshot.get(key);
316        let reader = self.journal.reader().await;
317        let oldest = reader.bounds().start;
318        let mut result = None;
319        for &loc in iter {
320            if loc < oldest {
321                continue;
322            }
323            if let Some(v) = Self::get_from_loc(&reader, key, loc).await? {
324                result = Some(v);
325                break;
326            }
327        }
328
329        Ok(result)
330    }
331
332    /// Batch read multiple keys.
333    ///
334    /// Returns results in the same order as the input keys.
335    pub async fn get_many(&self, keys: &[&K]) -> Result<Vec<Option<V::Value>>, Error<F>> {
336        if keys.is_empty() {
337            return Ok(Vec::new());
338        }
339
340        let _timer = self.metrics.reads.get_many_timer();
341        self.metrics.reads.get_many_calls.inc();
342        self.metrics.reads.keys_requested.inc_by(keys.len() as u64);
343        let mut candidates: Vec<(usize, u64)> = Vec::with_capacity(keys.len());
344        let mut results: Vec<Option<V::Value>> = vec![None; keys.len()];
345
346        let reader = self.journal.reader().await;
347        let oldest = reader.bounds().start;
348
349        for (key_idx, key) in keys.iter().enumerate() {
350            for &loc in self.snapshot.get(key) {
351                if loc < oldest {
352                    continue;
353                }
354                candidates.push((key_idx, *loc));
355            }
356        }
357
358        if candidates.is_empty() {
359            return Ok(results);
360        }
361
362        candidates.sort_unstable_by_key(|&(_, pos)| pos);
363
364        let mut positions: Vec<u64> = Vec::with_capacity(candidates.len());
365        for &(_, pos) in &candidates {
366            if positions.last() != Some(&pos) {
367                positions.push(pos);
368            }
369        }
370
371        let ops = reader.read_many(&positions).await?;
372
373        for &(key_idx, pos) in &candidates {
374            if results[key_idx].is_some() {
375                continue;
376            }
377            let op_idx = positions
378                .binary_search(&pos)
379                .expect("position was deduped from candidates");
380            let Operation::Set(k, v) = &ops[op_idx] else {
381                return Err(Error::UnexpectedData(Location::new(pos)));
382            };
383            if k == keys[key_idx] {
384                results[key_idx] = Some(v.clone());
385            }
386        }
387
388        Ok(results)
389    }
390
391    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
392    /// [`crate::qmdb::Error::OperationPruned`] if loc precedes the oldest retained location. The
393    /// location is otherwise assumed valid.
394    async fn get_from_loc(
395        reader: &impl Reader<Item = Operation<F, K, V>>,
396        key: &K,
397        loc: Location<F>,
398    ) -> Result<Option<V::Value>, Error<F>> {
399        if loc < reader.bounds().start {
400            return Err(Error::OperationPruned(loc));
401        }
402
403        let Operation::Set(k, v) = reader.read(*loc).await? else {
404            return Err(Error::UnexpectedData(loc));
405        };
406
407        if k != *key {
408            Ok(None)
409        } else {
410            Ok(Some(v))
411        }
412    }
413
414    /// Get the metadata associated with the last commit.
415    pub async fn get_metadata(&self) -> Result<Option<V::Value>, Error<F>> {
416        let last_commit_loc = self.last_commit_loc;
417        let Operation::Commit(metadata, _floor) = self
418            .journal
419            .journal
420            .reader()
421            .await
422            .read(*last_commit_loc)
423            .await?
424        else {
425            unreachable!("no commit operation at location of last commit {last_commit_loc}");
426        };
427
428        Ok(metadata)
429    }
430
431    /// Analogous to proof but with respect to the state of the database when it had `op_count`
432    /// operations.
433    ///
434    /// # Contract
435    ///
436    /// `op_count` must be a commit-boundary size: the operation at `op_count - 1` must
437    /// itself be a commit op. Non-commit-boundary sizes are not supported because the
438    /// inactivity floor governing them is not directly retrievable.
439    ///
440    /// # Errors
441    ///
442    /// Returns [crate::merkle::Error::LocationOverflow] if `op_count` or `start_loc` >
443    /// [crate::merkle::Family::MAX_LEAVES].
444    /// Returns [crate::merkle::Error::RangeOutOfBounds] if `op_count` > number of operations, or
445    /// if `start_loc` >= `op_count`.
446    /// Returns [`Error::OperationPruned`] if `start_loc` has been pruned.
447    /// Returns [`Error::HistoricalFloorPruned`] if `op_count - 1` is retained but is not a
448    /// commit op, either because the caller passed a non-commit-boundary `op_count` or
449    /// because pruning removed the commit that would have governed `op_count`.
450    pub async fn historical_proof(
451        &self,
452        op_count: Location<F>,
453        start_loc: Location<F>,
454        max_ops: NonZeroU64,
455    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, K, V>>), Error<F>> {
456        if op_count > self.journal.size().await {
457            return Err(crate::merkle::Error::RangeOutOfBounds(op_count).into());
458        }
459
460        let reader = self.journal.reader().await;
461        let inactive_peaks =
462            crate::qmdb::inactive_peaks_at::<F, _>(&reader, op_count, |op| op.has_floor()).await?;
463
464        Ok(self
465            .journal
466            .historical_proof(op_count, start_loc, max_ops, inactive_peaks)
467            .await?)
468    }
469
470    /// Generate and return:
471    ///  1. a proof of all operations applied to the db in the range starting at (and including)
472    ///     location `start_loc`, and ending at the first of either:
473    ///     - the last operation performed, or
474    ///     - the operation `max_ops` from the start.
475    ///  2. the operations corresponding to the leaves in this range.
476    pub async fn proof(
477        &self,
478        start_index: Location<F>,
479        max_ops: NonZeroU64,
480    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, K, V>>), Error<F>> {
481        let op_count = self.bounds().await.end;
482        self.historical_proof(op_count, start_index, max_ops).await
483    }
484
485    /// Prune operations prior to `prune_loc`. This does not affect the db's root, but it will
486    /// affect retrieval of any keys that were set prior to `prune_loc`.
487    ///
488    /// Pruning is irreversible. Callers must ensure any floor-raising batch has been durably
489    /// committed (via [`Immutable::commit`] or [`Immutable::sync`]) before pruning. The
490    /// inactivity floor used to gate pruning is updated by [`Immutable::apply_batch`] before
491    /// the batch is durable. If the batch is lost on crash, recovery replays from the prior
492    /// durable floor, which may reference data that has already been pruned.
493    ///
494    /// # Errors
495    ///
496    /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor.
497    /// - Returns [crate::merkle::Error::LocationOverflow] if `prune_loc` > [crate::merkle::Family::MAX_LEAVES].
498    pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
499        let _timer = self.metrics.operations.prune_timer();
500        self.metrics.operations.prune_calls.inc();
501        if loc > self.inactivity_floor_loc {
502            return Err(Error::PruneBeyondMinRequired(
503                loc,
504                self.inactivity_floor_loc,
505            ));
506        }
507        self.journal.prune(loc).await?;
508        self.update_metrics().await;
509        Ok(())
510    }
511
512    /// Rewind the database to `size` operations, where `size` is the location of the next append.
513    ///
514    /// This rewinds both the operations journal and its Merkle structure to the historical
515    /// state at `size`, and removes rewound set operations from the in-memory snapshot.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error when:
520    /// - `size` is not a valid rewind target
521    /// - the target's required logical range is not fully retained (for immutable, this means the
522    ///   oldest retained location is already beyond the rewind boundary)
523    /// - `size - 1` is not a commit operation
524    ///
525    /// Any error from this method is fatal for this handle. Rewind may mutate journal state
526    /// before this method finishes rebuilding in-memory rewind state. Callers must drop this
527    /// database handle after any `Err` from `rewind` and reopen from storage.
528    ///
529    /// A successful rewind is not restart-stable until a subsequent [`Immutable::commit`] or
530    /// [`Immutable::sync`].
531    pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
532        let rewind_size = *size;
533        let current_size = *self.last_commit_loc + 1;
534        if rewind_size == current_size {
535            return Ok(());
536        }
537        if rewind_size == 0 || rewind_size > current_size {
538            return Err(Error::Journal(crate::journal::Error::InvalidRewind(
539                rewind_size,
540            )));
541        }
542
543        let (rewind_last_loc, rewind_floor, rewound_keys) = {
544            let reader = self.journal.reader().await;
545            let bounds = reader.bounds();
546            let rewind_last_loc = Location::new(rewind_size - 1);
547            if rewind_size <= bounds.start {
548                return Err(Error::Journal(crate::journal::Error::ItemPruned(
549                    *rewind_last_loc,
550                )));
551            }
552            let rewind_last_op = reader.read(*rewind_last_loc).await?;
553            let Operation::Commit(_, rewind_floor) = &rewind_last_op else {
554                return Err(Error::UnexpectedData(rewind_last_loc));
555            };
556            let rewind_floor = *rewind_floor;
557            if *rewind_floor < bounds.start {
558                return Err(Error::Journal(crate::journal::Error::ItemPruned(
559                    *rewind_floor,
560                )));
561            }
562
563            let mut rewound_keys = Vec::new();
564            for loc in rewind_size..current_size {
565                if let Operation::Set(key, _) = reader.read(loc).await? {
566                    rewound_keys.push(key);
567                }
568            }
569
570            (rewind_last_loc, rewind_floor, rewound_keys)
571        };
572
573        let old_floor = self.inactivity_floor_loc;
574
575        // Journal rewind happens before in-memory snapshot updates. If a later step fails, this
576        // handle may be internally diverged and must be dropped by the caller.
577        self.journal.rewind(rewind_size).await?;
578
579        // Remove keys that were set in the range [rewind_size, current_size) from the snapshot.
580        let rewind_loc = Location::<F>::new(rewind_size);
581        for key in &rewound_keys {
582            // Filter by location to make sure we don't also prune keys that happen to collide.
583            self.snapshot.retain(key, |loc| *loc < rewind_loc);
584        }
585
586        // If the rewind target has a lower floor than the current snapshot was
587        // built from, insert keys from the gap [rewind_floor, old_floor) that
588        // were excluded by the higher-floor reconstruction.
589        //
590        // Iterate in reverse so front-insertion preserves ascending loc order
591        // for repeated keys, matching the ordering that apply_batch produces.
592        if rewind_floor < old_floor {
593            let reader = self.journal.journal.reader().await;
594            let gap_end = core::cmp::min(*old_floor, rewind_size);
595            for loc in (*rewind_floor..gap_end).rev() {
596                if let Operation::Set(key, _) = reader.read(loc).await? {
597                    self.snapshot.insert(&key, Location::new(loc));
598                }
599            }
600        }
601
602        self.last_commit_loc = rewind_last_loc;
603        self.inactivity_floor_loc = rewind_floor;
604        let inactive_peaks = F::inactive_peaks(F::location_to_position(size), rewind_floor);
605        self.root = self.journal.root(inactive_peaks)?;
606        self.update_metrics().await;
607
608        Ok(())
609    }
610
611    /// Return the canonical QMDB root of the db.
612    pub const fn root(&self) -> H::Digest {
613        self.root
614    }
615
616    /// Return a reference to the merkleization strategy.
617    pub const fn strategy(&self) -> &S {
618        self.journal.strategy()
619    }
620
621    /// Return the pinned Merkle nodes at the given location.
622    pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<H::Digest>, Error<F>> {
623        self.journal
624            .merkle
625            .pinned_nodes_at(loc)
626            .await
627            .map_err(Into::into)
628    }
629
630    /// Sync all database state to disk. While this isn't necessary to ensure durability of
631    /// committed operations, periodic invocation may reduce memory usage and the time required to
632    /// recover the database on restart.
633    pub async fn sync(&self) -> Result<(), Error<F>> {
634        let _timer = self.metrics.operations.sync_timer();
635        self.metrics.operations.sync_calls.inc();
636        self.journal.sync().await?;
637        Ok(())
638    }
639
640    /// Durably commit the journal state published by prior [`Immutable::apply_batch`] calls.
641    pub async fn commit(&self) -> Result<(), Error<F>> {
642        let _timer = self.metrics.operations.commit_timer();
643        self.metrics.operations.commit_calls.inc();
644        self.journal.commit().await?;
645        Ok(())
646    }
647
648    /// Destroy the db, removing all data from disk.
649    pub async fn destroy(self) -> Result<(), Error<F>> {
650        Ok(self.journal.destroy().await?)
651    }
652
653    /// Create a new speculative batch of operations with this database as its parent.
654    #[allow(clippy::type_complexity)]
655    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<F, H, K, V, S> {
656        let journal_size = *self.last_commit_loc + 1;
657        batch::UnmerkleizedBatch::new(self, journal_size)
658    }
659
660    /// Apply a [`batch::MerkleizedBatch`] to the database.
661    ///
662    /// A batch is valid only if every batch applied to the database since this batch's
663    /// ancestor chain was created is an ancestor of this batch. Applying a batch from a
664    /// different fork returns [`Error::StaleBatch`].
665    ///
666    /// Returns the range of locations written.
667    ///
668    /// # Errors
669    ///
670    /// - [`Error::StaleBatch`] if the batch was created from a stale DB state.
671    /// - [`Error::FloorRegressed`] if any commit in the chain (the tip or any
672    ///   unapplied ancestor) declares an inactivity floor below the previous
673    ///   commit's floor (or, for the oldest unapplied commit, below the
674    ///   database's current floor).
675    /// - [`Error::FloorBeyondSize`] if any commit in the chain (the tip or any
676    ///   unapplied ancestor) declares an inactivity floor that exceeds its
677    ///   own commit operation's location. The maximum valid floor for a
678    ///   commit is its own location; a floor past the commit would permit
679    ///   pruning the commit itself.
680    ///
681    /// On any floor error, the database state is unchanged.
682    ///
683    /// This publishes the batch to the in-memory database state and appends it to the
684    /// journal, but does not durably commit it. Call [`Immutable::commit`] or
685    /// [`Immutable::sync`] to guarantee durability.
686    pub async fn apply_batch(
687        &mut self,
688        batch: Arc<batch::MerkleizedBatch<F, H::Digest, K, V, S>>,
689    ) -> Result<Range<Location<F>>, Error<F>> {
690        let _timer = self.metrics.operations.apply_batch_timer();
691        self.metrics.operations.apply_batch_calls.inc();
692        let db_size = *self.last_commit_loc + 1;
693        batch
694            .bounds
695            .validate_apply_to(db_size, self.inactivity_floor_loc)?;
696        let start_loc = Location::new(db_size);
697
698        // Apply journal.
699        self.journal.apply_batch(&batch.journal_batch).await?;
700
701        // Apply snapshot inserts. Child first (child wins via `seen`), then
702        // uncommitted ancestor batches.
703        let bounds = self.journal.reader().await.bounds();
704        let mut seen = HashSet::new();
705        for (key, entry) in batch.diff.iter() {
706            seen.insert(key.clone());
707            self.snapshot
708                .insert_and_retain(key, entry.loc, |v| *v >= bounds.start);
709        }
710        for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
711            if batch.bounds.ancestors[i].end <= db_size {
712                continue;
713            }
714            for (key, entry) in ancestor_diff.iter() {
715                if seen.insert(key.clone()) {
716                    self.snapshot
717                        .insert_and_retain(key, entry.loc, |v| *v >= bounds.start);
718                }
719            }
720        }
721
722        // Update state.
723        self.last_commit_loc = Location::new(batch.bounds.total_size - 1);
724        self.inactivity_floor_loc = batch.bounds.inactivity_floor;
725        self.root = batch.root;
726        let range = start_loc..Location::new(batch.bounds.total_size);
727        self.update_metrics().await;
728        self.metrics
729            .operations
730            .operations_applied
731            .inc_by(*range.end - *range.start);
732        Ok(range)
733    }
734}
735
736#[cfg(test)]
737pub(super) mod test {
738    use super::*;
739    use crate::{
740        merkle::{Family, Location},
741        qmdb::{self, verify_proof},
742        translator::TwoCap,
743    };
744    use commonware_codec::EncodeShared;
745    use commonware_cryptography::{sha256, sha256::Digest, Sha256};
746    use commonware_runtime::{deterministic, Supervisor as _};
747    use commonware_utils::NZU64;
748    use core::{future::Future, pin::Pin};
749    use std::ops::Range;
750
751    const ITEMS_PER_SECTION: u64 = 5;
752
753    type TestDb<F, V, C> = Immutable<
754        F,
755        deterministic::Context,
756        Digest,
757        V,
758        C,
759        Sha256,
760        TwoCap,
761        commonware_parallel::Sequential,
762    >;
763
764    pub(crate) async fn test_immutable_empty<F: Family, V, C>(
765        context: deterministic::Context,
766        open_db: impl Fn(
767            deterministic::Context,
768        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
769    ) where
770        V: ValueEncoding<Value = Digest>,
771        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
772        C::Item: EncodeShared,
773    {
774        let db = open_db(context.child("first")).await;
775        let bounds = db.bounds().await;
776        assert_eq!(bounds.end, 1);
777        assert_eq!(bounds.start, Location::new(0));
778        assert_eq!(db.inactivity_floor_loc(), Location::new(0));
779        assert!(db.get_metadata().await.unwrap().is_none());
780
781        // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
782        let k1 = Sha256::fill(1u8);
783        let v1 = Sha256::fill(2u8);
784        let root = db.root();
785        {
786            let _batch = db.new_batch().set(k1, v1);
787            // Don't merkleize/apply -- simulate failed commit
788        }
789        drop(db);
790        let mut db = open_db(context.child("second")).await;
791        assert_eq!(db.root(), root);
792        assert_eq!(db.bounds().await.end, 1);
793
794        // Test calling commit on an empty db which should make it (durably) non-empty.
795        db.apply_batch(db.new_batch().merkleize(&db, None, Location::new(0)))
796            .await
797            .unwrap();
798        db.commit().await.unwrap();
799        assert_eq!(db.bounds().await.end, 2); // commit op added
800        let root = db.root();
801        drop(db);
802
803        let db = open_db(context.child("third")).await;
804        assert_eq!(db.root(), root);
805
806        db.destroy().await.unwrap();
807    }
808
809    pub(crate) async fn test_immutable_build_basic<F: Family, V, C>(
810        context: deterministic::Context,
811        open_db: impl Fn(
812            deterministic::Context,
813        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
814    ) where
815        V: ValueEncoding<Value = Digest>,
816        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
817        C::Item: EncodeShared,
818    {
819        // Build a db with 2 keys.
820        let mut db = open_db(context.child("first")).await;
821
822        let k1 = Sha256::fill(1u8);
823        let k2 = Sha256::fill(2u8);
824        let v1 = Sha256::fill(3u8);
825        let v2 = Sha256::fill(4u8);
826
827        assert!(db.get(&k1).await.unwrap().is_none());
828        assert!(db.get(&k2).await.unwrap().is_none());
829
830        // Set and commit the first key.
831        let metadata = Some(Sha256::fill(99u8));
832        db.apply_batch(
833            db.new_batch()
834                .set(k1, v1)
835                .merkleize(&db, metadata, Location::new(0)),
836        )
837        .await
838        .unwrap();
839        db.commit().await.unwrap();
840        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
841        assert!(db.get(&k2).await.unwrap().is_none());
842        assert_eq!(db.bounds().await.end, 3);
843        assert_eq!(db.get_metadata().await.unwrap(), Some(Sha256::fill(99u8)));
844
845        // Set and commit the second key.
846        db.apply_batch(
847            db.new_batch()
848                .set(k2, v2)
849                .merkleize(&db, None, Location::new(0)),
850        )
851        .await
852        .unwrap();
853        db.commit().await.unwrap();
854        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
855        assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
856        assert_eq!(db.bounds().await.end, 5);
857        assert_eq!(db.get_metadata().await.unwrap(), None);
858
859        // Capture state.
860        let root = db.root();
861
862        // Add an uncommitted op then simulate failure.
863        let k3 = Sha256::fill(5u8);
864        let v3 = Sha256::fill(6u8);
865        {
866            let _batch = db.new_batch().set(k3, v3);
867            // Don't merkleize/apply -- simulate failed commit
868        }
869
870        // Reopen, make sure state is restored to last commit point.
871        drop(db); // Simulate failed commit
872        let db = open_db(context.child("second")).await;
873        assert!(db.get(&k3).await.unwrap().is_none());
874        assert_eq!(db.root(), root);
875        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
876        assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
877        assert_eq!(db.bounds().await.end, 5);
878        assert_eq!(db.get_metadata().await.unwrap(), None);
879
880        // Cleanup.
881        db.destroy().await.unwrap();
882    }
883
884    pub(crate) async fn test_immutable_proof_verify<F: Family, V, C>(
885        context: deterministic::Context,
886        open_db: impl Fn(
887            deterministic::Context,
888        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
889    ) where
890        V: ValueEncoding<Value = Digest>,
891        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
892        C::Item: EncodeShared,
893    {
894        let mut db = open_db(context.child("first")).await;
895
896        let k1 = Sha256::fill(1u8);
897        let v1 = Sha256::fill(10u8);
898        db.apply_batch(
899            db.new_batch()
900                .set(k1, v1)
901                .merkleize(&db, None, Location::new(0)),
902        )
903        .await
904        .unwrap();
905        db.commit().await.unwrap();
906
907        let (proof, ops) = db.proof(Location::new(0), NZU64!(100)).await.unwrap();
908        let root = db.root();
909        let hasher = qmdb::hasher::<Sha256>();
910        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
911
912        db.destroy().await.unwrap();
913    }
914
915    pub(crate) async fn test_immutable_prune<F: Family, V, C>(
916        context: deterministic::Context,
917        open_db: impl Fn(
918            deterministic::Context,
919        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
920    ) where
921        V: ValueEncoding<Value = Digest>,
922        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
923        C::Item: EncodeShared,
924    {
925        let mut db = open_db(context.child("first")).await;
926
927        for i in 0..20u8 {
928            let key = Sha256::fill(i);
929            let value = Sha256::fill(i.wrapping_add(100));
930            let floor = db.bounds().await.end;
931            db.apply_batch(db.new_batch().set(key, value).merkleize(&db, None, floor))
932                .await
933                .unwrap();
934            db.commit().await.unwrap();
935        }
936
937        let root_before = db.root();
938        let bounds_before = db.bounds().await;
939
940        let prune_loc = Location::new(*bounds_before.end - 5);
941        db.prune(prune_loc).await.unwrap();
942
943        assert_eq!(db.root(), root_before);
944
945        let key_0 = Sha256::fill(0u8);
946        assert!(db.get(&key_0).await.unwrap().is_none());
947
948        let key_19 = Sha256::fill(19u8);
949        assert_eq!(
950            db.get(&key_19).await.unwrap(),
951            Some(Sha256::fill(19u8.wrapping_add(100)))
952        );
953
954        db.destroy().await.unwrap();
955    }
956
957    pub(crate) async fn test_immutable_batch_chain<F: Family, V, C>(
958        context: deterministic::Context,
959        open_db: impl Fn(
960            deterministic::Context,
961        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
962    ) where
963        V: ValueEncoding<Value = Digest>,
964        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
965        C::Item: EncodeShared,
966    {
967        let mut db = open_db(context.child("first")).await;
968
969        let k1 = Sha256::fill(1u8);
970        let k2 = Sha256::fill(2u8);
971        let k3 = Sha256::fill(3u8);
972        let v1 = Sha256::fill(11u8);
973        let v2 = Sha256::fill(12u8);
974        let v3 = Sha256::fill(13u8);
975
976        let parent = db
977            .new_batch()
978            .set(k1, v1)
979            .merkleize(&db, None, Location::new(0));
980        let child = parent
981            .new_batch::<Sha256>()
982            .set(k2, v2)
983            .merkleize(&db, None, Location::new(0));
984
985        assert_eq!(child.get(&k1, &db).await.unwrap(), Some(v1));
986        assert_eq!(child.get(&k2, &db).await.unwrap(), Some(v2));
987        assert!(child.get(&k3, &db).await.unwrap().is_none());
988
989        db.apply_batch(child).await.unwrap();
990        db.commit().await.unwrap();
991
992        assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
993        assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
994
995        db.apply_batch(
996            db.new_batch()
997                .set(k3, v3)
998                .merkleize(&db, None, Location::new(0)),
999        )
1000        .await
1001        .unwrap();
1002        db.commit().await.unwrap();
1003        assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
1004
1005        db.destroy().await.unwrap();
1006    }
1007
1008    pub(crate) async fn test_immutable_build_and_authenticate<F: Family, V, C>(
1009        context: deterministic::Context,
1010        open_db: impl Fn(
1011            deterministic::Context,
1012        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1013    ) where
1014        V: ValueEncoding<Value = Digest>,
1015        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1016        C::Item: EncodeShared,
1017    {
1018        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
1019        let hasher = qmdb::hasher::<Sha256>();
1020        let mut db = open_db(context.child("first")).await;
1021
1022        let mut batch = db.new_batch();
1023        for i in 0u64..2_000 {
1024            let k = Sha256::hash(&i.to_be_bytes());
1025            let v = Sha256::fill(i as u8);
1026            batch = batch.set(k, v);
1027        }
1028        let merkleized = batch.merkleize(&db, None, Location::new(0));
1029        db.apply_batch(merkleized).await.unwrap();
1030        db.commit().await.unwrap();
1031        assert_eq!(db.bounds().await.end, 2_000 + 2);
1032
1033        // Drop & reopen the db, making sure it has exactly the same state.
1034        let root = db.root();
1035        drop(db);
1036
1037        let db = open_db(context.child("second")).await;
1038        assert_eq!(root, db.root());
1039        assert_eq!(db.bounds().await.end, 2_000 + 2);
1040        for i in 0u64..2_000 {
1041            let k = Sha256::hash(&i.to_be_bytes());
1042            let v = Sha256::fill(i as u8);
1043            assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
1044        }
1045
1046        // Make sure all ranges of 5 operations are provable, including truncated ranges at the
1047        // end.
1048        let max_ops = NZU64!(5);
1049        for i in 0..*db.bounds().await.end {
1050            let (proof, log) = db.proof(Location::new(i), max_ops).await.unwrap();
1051            assert!(verify_proof(&hasher, &proof, Location::new(i), &log, &root));
1052        }
1053
1054        db.destroy().await.unwrap();
1055    }
1056
1057    pub(crate) async fn test_immutable_recovery_from_failed_merkle_sync<F: Family, V, C>(
1058        context: deterministic::Context,
1059        open_db: impl Fn(
1060            deterministic::Context,
1061        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1062    ) where
1063        V: ValueEncoding<Value = Digest>,
1064        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1065        C::Item: EncodeShared,
1066    {
1067        // Insert 1000 keys then sync.
1068        const ELEMENTS: u64 = 1000;
1069        let mut db = open_db(context.child("first")).await;
1070
1071        let mut batch = db.new_batch();
1072        for i in 0u64..ELEMENTS {
1073            let k = Sha256::hash(&i.to_be_bytes());
1074            let v = Sha256::fill(i as u8);
1075            batch = batch.set(k, v);
1076        }
1077        let merkleized = batch.merkleize(&db, None, Location::new(0));
1078        db.apply_batch(merkleized).await.unwrap();
1079        db.commit().await.unwrap();
1080        assert_eq!(db.bounds().await.end, ELEMENTS + 2);
1081        db.sync().await.unwrap();
1082        let halfway_root = db.root();
1083
1084        // Insert another 1000 keys (different from the first batch) then commit.
1085        let mut batch = db.new_batch();
1086        for i in ELEMENTS..ELEMENTS * 2 {
1087            let k = Sha256::hash(&i.to_be_bytes());
1088            let v = Sha256::fill(i as u8);
1089            batch = batch.set(k, v);
1090        }
1091        let merkleized = batch.merkleize(&db, None, Location::new(0));
1092        db.apply_batch(merkleized).await.unwrap();
1093        db.commit().await.unwrap();
1094        drop(db); // Drop before syncing
1095
1096        // Recovery should replay the log to regenerate the merkle structure.
1097        // op_count = 1002 (first batch + commit) + 1000 (second batch) + 1 (second commit) = 2003
1098        let db = open_db(context.child("second")).await;
1099        assert_eq!(db.bounds().await.end, 2003);
1100        let root = db.root();
1101        assert_ne!(root, halfway_root);
1102
1103        // Drop & reopen could preserve the final commit.
1104        drop(db);
1105        let db = open_db(context.child("third")).await;
1106        assert_eq!(db.bounds().await.end, 2003);
1107        assert_eq!(db.root(), root);
1108
1109        db.destroy().await.unwrap();
1110    }
1111
1112    pub(crate) async fn test_immutable_recovery_from_failed_log_sync<F: Family, V, C>(
1113        context: deterministic::Context,
1114        open_db: impl Fn(
1115            deterministic::Context,
1116        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1117    ) where
1118        V: ValueEncoding<Value = Digest>,
1119        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1120        C::Item: EncodeShared,
1121    {
1122        let mut db = open_db(context.child("first")).await;
1123
1124        // Insert a single key and then commit to create a first commit point.
1125        let k1 = Sha256::fill(1u8);
1126        let v1 = Sha256::fill(3u8);
1127        db.apply_batch(
1128            db.new_batch()
1129                .set(k1, v1)
1130                .merkleize(&db, None, Location::new(0)),
1131        )
1132        .await
1133        .unwrap();
1134        db.commit().await.unwrap();
1135        let first_commit_root = db.root();
1136
1137        // Simulate failure. Sets that are never merkleized/applied are lost.
1138        // Recovery should restore the last commit point.
1139        drop(db);
1140
1141        // Recovery should back up to previous commit point.
1142        let db = open_db(context.child("second")).await;
1143        assert_eq!(db.bounds().await.end, 3);
1144        let root = db.root();
1145        assert_eq!(root, first_commit_root);
1146
1147        db.destroy().await.unwrap();
1148    }
1149
1150    pub(crate) async fn test_immutable_pruning<F: Family, V, C>(
1151        context: deterministic::Context,
1152        open_db: impl Fn(
1153            deterministic::Context,
1154        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1155    ) where
1156        V: ValueEncoding<Value = Digest>,
1157        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1158        C::Item: EncodeShared,
1159    {
1160        // Build a db with `ELEMENTS` key/value pairs then prune some of them.
1161        const ELEMENTS: u64 = 2_000;
1162        let mut db = open_db(context.child("first")).await;
1163
1164        // Batch writes keys in BTreeMap-sorted order, so build the sorted key
1165        // list to map between journal locations and keys.
1166        let mut sorted_keys: Vec<sha256::Digest> = (1u64..ELEMENTS + 1)
1167            .map(|i| Sha256::hash(&i.to_be_bytes()))
1168            .collect();
1169        sorted_keys.sort();
1170        // Location 0: initial commit; locations 1..=ELEMENTS: Set ops in sorted
1171        // key order; location ELEMENTS+1: batch commit.
1172        // key_at_loc(L) = sorted_keys[L - 1] for 1 <= L <= ELEMENTS.
1173
1174        let mut batch = db.new_batch();
1175        for i in 1u64..ELEMENTS + 1 {
1176            let k = Sha256::hash(&i.to_be_bytes());
1177            let v = Sha256::fill(i as u8);
1178            batch = batch.set(k, v);
1179        }
1180        // The inactivity floor must cover both prune targets in this test.
1181        // Second prune request is at ELEMENTS / 2 + ITEMS_PER_SECTION * 2 - 1.
1182        let inactivity_floor = Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION * 2 - 1);
1183        let merkleized = batch.merkleize(&db, None, inactivity_floor);
1184        db.apply_batch(merkleized).await.unwrap();
1185        assert_eq!(db.bounds().await.end, ELEMENTS + 2);
1186
1187        // Prune the db to the first half of the operations.
1188        db.prune(Location::new((ELEMENTS + 2) / 2)).await.unwrap();
1189        let bounds = db.bounds().await;
1190        assert_eq!(bounds.end, ELEMENTS + 2);
1191
1192        // items_per_section is 5, so half should be exactly at a blob boundary, in which case
1193        // the actual pruning location should match the requested.
1194        let oldest_retained_loc = bounds.start;
1195        assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));
1196
1197        // Try to fetch a pruned key (at location oldest_retained - 1).
1198        let pruned_key = sorted_keys[*oldest_retained_loc as usize - 2];
1199        assert!(db.get(&pruned_key).await.unwrap().is_none());
1200
1201        // Try to fetch unpruned key (at location oldest_retained).
1202        let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
1203        assert!(db.get(&unpruned_key).await.unwrap().is_some());
1204
1205        // Drop & reopen the db, making sure it has exactly the same state.
1206        let root = db.root();
1207        db.sync().await.unwrap();
1208        drop(db);
1209
1210        let mut db = open_db(context.child("second")).await;
1211        assert_eq!(root, db.root());
1212        let bounds = db.bounds().await;
1213        assert_eq!(bounds.end, ELEMENTS + 2);
1214        let oldest_retained_loc = bounds.start;
1215        assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));
1216
1217        // Prune to a non-blob boundary.
1218        let loc = Location::new(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
1219        db.prune(loc).await.unwrap();
1220        // Actual boundary should be a multiple of 5.
1221        let oldest_retained_loc = db.bounds().await.start;
1222        assert_eq!(
1223            oldest_retained_loc,
1224            Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
1225        );
1226
1227        // Confirm boundary persists across restart.
1228        db.sync().await.unwrap();
1229        drop(db);
1230        let db = open_db(context.child("third")).await;
1231        let oldest_retained_loc = db.bounds().await.start;
1232        assert_eq!(
1233            oldest_retained_loc,
1234            Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
1235        );
1236
1237        // Try to fetch a key before the inactivity floor (not in snapshot after reopen).
1238        let floor_val = ELEMENTS / 2 + ITEMS_PER_SECTION * 2 - 1;
1239        let inactive_key = sorted_keys[floor_val as usize - 2];
1240        assert!(db.get(&inactive_key).await.unwrap().is_none());
1241
1242        // Try to fetch a key at the inactivity floor (in snapshot after reopen).
1243        let active_key = sorted_keys[floor_val as usize - 1];
1244        assert!(db.get(&active_key).await.unwrap().is_some());
1245
1246        // Confirm behavior of trying to create a proof of pruned items is as expected.
1247        let pruned_pos = ELEMENTS / 2;
1248        let proof_result = db
1249            .proof(Location::new(pruned_pos), NZU64!(pruned_pos + 100))
1250            .await;
1251        assert!(
1252            matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos)
1253        );
1254
1255        db.destroy().await.unwrap();
1256    }
1257
1258    pub(crate) async fn test_immutable_prune_beyond_floor<F: Family, V, C>(
1259        context: deterministic::Context,
1260        open_db: impl Fn(
1261            deterministic::Context,
1262        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1263    ) where
1264        V: ValueEncoding<Value = Digest>,
1265        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1266        C::Item: EncodeShared,
1267    {
1268        let mut db = open_db(context.child("test")).await;
1269
1270        // Test pruning empty database (floor=0, so prune(1) fails)
1271        let result = db.prune(Location::new(1)).await;
1272        assert!(
1273            matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, floor))
1274                if prune_loc == Location::new(1) && floor == Location::new(0))
1275        );
1276
1277        // Add key-value pairs and commit
1278        let k1 = Digest::from(*b"12345678901234567890123456789012");
1279        let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
1280        let k3 = Digest::from(*b"99999999999999999999999999999999");
1281        let v1 = Sha256::fill(1u8);
1282        let v2 = Sha256::fill(2u8);
1283        let v3 = Sha256::fill(3u8);
1284
1285        // First batch with floor=3 (the commit location).
1286        db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
1287            &db,
1288            None,
1289            Location::new(3),
1290        ))
1291        .await
1292        .unwrap();
1293
1294        // op_count is 4 (initial_commit, k1, k2, commit), last_commit is at location 3
1295        assert_eq!(*db.last_commit_loc, 3);
1296
1297        // Second batch with floor=5 (the new commit location).
1298        db.apply_batch(
1299            db.new_batch()
1300                .set(k3, v3)
1301                .merkleize(&db, None, Location::new(5)),
1302        )
1303        .await
1304        .unwrap();
1305
1306        // Test valid prune (3 <= floor of 5)
1307        assert!(db.prune(Location::new(3)).await.is_ok());
1308
1309        // Test pruning beyond inactivity floor
1310        let floor = db.inactivity_floor_loc();
1311        let beyond = floor + 1;
1312        let result = db.prune(beyond).await;
1313        assert!(
1314            matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, f))
1315                if prune_loc == beyond && f == floor)
1316        );
1317
1318        db.destroy().await.unwrap();
1319    }
1320
1321    async fn commit_sets<F: Family, V, C>(
1322        db: &mut TestDb<F, V, C>,
1323        sets: impl IntoIterator<Item = (Digest, V::Value)>,
1324        metadata: Option<V::Value>,
1325    ) -> Range<Location<F>>
1326    where
1327        V: ValueEncoding<Value = Digest>,
1328        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1329        C::Item: EncodeShared,
1330    {
1331        commit_sets_with_floor(db, sets, metadata, Location::new(0)).await
1332    }
1333
1334    async fn commit_sets_with_floor<F: Family, V, C>(
1335        db: &mut TestDb<F, V, C>,
1336        sets: impl IntoIterator<Item = (Digest, V::Value)>,
1337        metadata: Option<V::Value>,
1338        floor: Location<F>,
1339    ) -> Range<Location<F>>
1340    where
1341        V: ValueEncoding<Value = Digest>,
1342        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1343        C::Item: EncodeShared,
1344    {
1345        let mut batch = db.new_batch();
1346        for (key, value) in sets {
1347            batch = batch.set(key, value);
1348        }
1349        let range = db
1350            .apply_batch(batch.merkleize(db, metadata, floor))
1351            .await
1352            .unwrap();
1353        db.commit().await.unwrap();
1354        range
1355    }
1356
1357    pub(crate) async fn test_immutable_rewind_recovery<F: Family, V, C>(
1358        context: deterministic::Context,
1359        open_db: impl Fn(
1360            deterministic::Context,
1361        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1362    ) where
1363        V: ValueEncoding<Value = Digest>,
1364        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1365        C::Item: EncodeShared,
1366    {
1367        let mut db = open_db(context.child("db")).await;
1368
1369        let key1 = Sha256::hash(&1u64.to_be_bytes());
1370        let key2 = Sha256::hash(&2u64.to_be_bytes());
1371        let key3 = Sha256::hash(&3u64.to_be_bytes());
1372        let key4 = Sha256::hash(&4u64.to_be_bytes());
1373
1374        let value1 = Sha256::fill(11u8);
1375        let value2 = Sha256::fill(22u8);
1376        let value3 = Sha256::fill(33u8);
1377        let value4 = Sha256::fill(66u8);
1378
1379        let metadata_a = Sha256::fill(44u8);
1380        let first_range =
1381            commit_sets(&mut db, [(key1, value1), (key2, value2)], Some(metadata_a)).await;
1382        let size_before = db.bounds().await.end;
1383        let root_before = db.root();
1384        let last_commit_before = db.last_commit_loc;
1385        assert_eq!(size_before, first_range.end);
1386
1387        let metadata_b = Sha256::fill(55u8);
1388        let second_range =
1389            commit_sets(&mut db, [(key3, value3), (key4, value4)], Some(metadata_b)).await;
1390        assert_eq!(second_range.start, size_before);
1391        assert_ne!(db.root(), root_before);
1392        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_b));
1393        assert_eq!(db.get(&key3).await.unwrap(), Some(value3));
1394        assert_eq!(db.get(&key4).await.unwrap(), Some(value4));
1395
1396        db.rewind(size_before).await.unwrap();
1397        assert_eq!(db.root(), root_before);
1398        assert_eq!(db.bounds().await.end, size_before);
1399        assert_eq!(db.last_commit_loc, last_commit_before);
1400        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1401        assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1402        assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1403        assert_eq!(db.get(&key3).await.unwrap(), None);
1404        assert_eq!(db.get(&key4).await.unwrap(), None);
1405
1406        db.commit().await.unwrap();
1407        drop(db);
1408        let db = open_db(context.child("reopen")).await;
1409        assert_eq!(db.root(), root_before);
1410        assert_eq!(db.bounds().await.end, size_before);
1411        assert_eq!(db.last_commit_loc, last_commit_before);
1412        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1413        assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1414        assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1415        assert_eq!(db.get(&key3).await.unwrap(), None);
1416        assert_eq!(db.get(&key4).await.unwrap(), None);
1417
1418        db.destroy().await.unwrap();
1419    }
1420
1421    /// Regression: a key Set before the rewind boundary that translator-collides with a key in the
1422    /// rewound suffix must survive rewind. Earlier the snapshot remove pruned the entire translated
1423    /// bucket and dropped the retained key.
1424    pub(crate) async fn test_immutable_rewind_preserves_collision_bucket<F: Family, V, C>(
1425        context: deterministic::Context,
1426        open_db: impl Fn(
1427            deterministic::Context,
1428        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1429    ) where
1430        V: ValueEncoding<Value = Digest>,
1431        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1432        C::Item: EncodeShared,
1433    {
1434        let mut db = open_db(context.child("db")).await;
1435
1436        // Two keys sharing the first two bytes collide under TwoCap.
1437        let mut k1_bytes = [0u8; 32];
1438        let mut k2_bytes = [0u8; 32];
1439        k1_bytes[0] = 0xAA;
1440        k1_bytes[1] = 0xBB;
1441        k2_bytes[0] = 0xAA;
1442        k2_bytes[1] = 0xBB;
1443        k1_bytes[31] = 0x01;
1444        k2_bytes[31] = 0x02;
1445        let key1 = Digest::from(k1_bytes);
1446        let key2 = Digest::from(k2_bytes);
1447        let value1 = Sha256::fill(11u8);
1448        let value2 = Sha256::fill(22u8);
1449
1450        commit_sets(&mut db, [(key1, value1)], None).await;
1451        let size_after_first = db.bounds().await.end;
1452        commit_sets(&mut db, [(key2, value2)], None).await;
1453        assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1454        assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1455
1456        db.rewind(size_after_first).await.unwrap();
1457
1458        // The retained key must still be readable; pre-fix this returned None because the
1459        // translator bucket was wiped by the suffix-key remove.
1460        assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1461        assert_eq!(db.get(&key2).await.unwrap(), None);
1462
1463        db.destroy().await.unwrap();
1464    }
1465
1466    pub(crate) async fn test_immutable_rewind_pruned_target_errors<F: Family, V, C>(
1467        context: deterministic::Context,
1468        open_small_sections_db: impl Fn(
1469            deterministic::Context,
1470        )
1471            -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1472    ) where
1473        V: ValueEncoding<Value = Digest>,
1474        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1475        C::Item: EncodeShared,
1476    {
1477        let mut db = open_small_sections_db(context.child("db")).await;
1478
1479        let first_range = commit_sets(
1480            &mut db,
1481            (0u64..16).map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8))),
1482            None,
1483        )
1484        .await;
1485
1486        let mut round = 0u64;
1487        loop {
1488            round += 1;
1489            assert!(
1490                round <= 64,
1491                "failed to prune enough history for rewind test"
1492            );
1493
1494            // Floor must be >= last_commit_loc for prune to succeed.
1495            // With 16 sets, commit is at current end + 16.
1496            let floor = Location::new(*db.bounds().await.end + 16);
1497            commit_sets_with_floor(
1498                &mut db,
1499                (0u64..16).map(|i| {
1500                    let seed = round * 100 + i;
1501                    (Sha256::hash(&seed.to_be_bytes()), Sha256::fill(seed as u8))
1502                }),
1503                None,
1504                floor,
1505            )
1506            .await;
1507            db.prune(db.last_commit_loc).await.unwrap();
1508
1509            if db.bounds().await.start > first_range.start {
1510                break;
1511            }
1512        }
1513
1514        let oldest_retained = db.bounds().await.start;
1515        let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
1516        assert!(
1517            matches!(
1518                boundary_err,
1519                Error::Journal(crate::journal::Error::ItemPruned(_))
1520            ),
1521            "unexpected rewind error at retained boundary: {boundary_err:?}"
1522        );
1523
1524        let err = db.rewind(first_range.start).await.unwrap_err();
1525        assert!(
1526            matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
1527            "unexpected rewind error: {err:?}"
1528        );
1529
1530        db.destroy().await.unwrap();
1531    }
1532
1533    /// batch.get() reads pending mutations and falls through to base DB.
1534    pub(crate) async fn test_immutable_batch_get_read_through<F: Family, V, C>(
1535        context: deterministic::Context,
1536        open_db: impl Fn(
1537            deterministic::Context,
1538        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1539    ) where
1540        V: ValueEncoding<Value = Digest>,
1541        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1542        C::Item: EncodeShared,
1543    {
1544        let mut db = open_db(context.child("db")).await;
1545
1546        // Pre-populate with key A.
1547        let key_a = Sha256::hash(&0u64.to_be_bytes());
1548        let val_a = Sha256::fill(1u8);
1549        db.apply_batch(
1550            db.new_batch()
1551                .set(key_a, val_a)
1552                .merkleize(&db, None, Location::new(0)),
1553        )
1554        .await
1555        .unwrap();
1556
1557        // batch.get(&A) should return DB value.
1558        let mut batch = db.new_batch();
1559        assert_eq!(batch.get(&key_a, &db).await.unwrap(), Some(val_a));
1560
1561        // Set B in batch, batch.get(&B) returns the value.
1562        let key_b = Sha256::hash(&1u64.to_be_bytes());
1563        let val_b = Sha256::fill(2u8);
1564        batch = batch.set(key_b, val_b);
1565        assert_eq!(batch.get(&key_b, &db).await.unwrap(), Some(val_b));
1566
1567        // Nonexistent key.
1568        let key_c = Sha256::hash(&2u64.to_be_bytes());
1569        assert_eq!(batch.get(&key_c, &db).await.unwrap(), None);
1570
1571        db.destroy().await.unwrap();
1572    }
1573
1574    /// Child batch reads parent diff and adds its own mutations.
1575    pub(crate) async fn test_immutable_batch_stacked_get<F: Family, V, C>(
1576        context: deterministic::Context,
1577        open_db: impl Fn(
1578            deterministic::Context,
1579        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1580    ) where
1581        V: ValueEncoding<Value = Digest>,
1582        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1583        C::Item: EncodeShared,
1584    {
1585        let db = open_db(context.child("db")).await;
1586
1587        // Parent batch: set A.
1588        let key_a = Sha256::hash(&0u64.to_be_bytes());
1589        let val_a = Sha256::fill(10u8);
1590        let parent = db.new_batch().set(key_a, val_a);
1591        let parent_m = parent.merkleize(&db, None, Location::new(0));
1592
1593        // Child reads parent's A.
1594        let mut child = parent_m.new_batch::<Sha256>();
1595        assert_eq!(child.get(&key_a, &db).await.unwrap(), Some(val_a));
1596
1597        // Child sets B.
1598        let key_b = Sha256::hash(&1u64.to_be_bytes());
1599        let val_b = Sha256::fill(20u8);
1600        child = child.set(key_b, val_b);
1601        assert_eq!(child.get(&key_b, &db).await.unwrap(), Some(val_b));
1602
1603        // Nonexistent key.
1604        let key_c = Sha256::hash(&2u64.to_be_bytes());
1605        assert_eq!(child.get(&key_c, &db).await.unwrap(), None);
1606
1607        db.destroy().await.unwrap();
1608    }
1609
1610    /// Two-level stacked batch apply works end-to-end.
1611    pub(crate) async fn test_immutable_batch_stacked_apply<F: Family, V, C>(
1612        context: deterministic::Context,
1613        open_db: impl Fn(
1614            deterministic::Context,
1615        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1616    ) where
1617        V: ValueEncoding<Value = Digest>,
1618        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1619        C::Item: EncodeShared,
1620    {
1621        let mut db = open_db(context.child("db")).await;
1622
1623        // Sort keys so operations are in BTreeMap order (same as merkleize writes).
1624        let mut kvs_first: Vec<(Digest, Digest)> = (0u64..5)
1625            .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
1626            .collect();
1627        kvs_first.sort_by_key(|a| a.0);
1628
1629        let mut kvs_second: Vec<(Digest, Digest)> = (5u64..10)
1630            .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
1631            .collect();
1632        kvs_second.sort_by_key(|a| a.0);
1633
1634        // Parent batch: set keys 0..5.
1635        let mut parent = db.new_batch();
1636        for (k, v) in &kvs_first {
1637            parent = parent.set(*k, *v);
1638        }
1639        let parent_m = parent.merkleize(&db, None, Location::new(0));
1640
1641        // Child batch: set keys 5..10.
1642        let mut child = parent_m.new_batch::<Sha256>();
1643        for (k, v) in &kvs_second {
1644            child = child.set(*k, *v);
1645        }
1646        let child_m = child.merkleize(&db, None, Location::new(0));
1647        let expected_root = child_m.root();
1648        db.apply_batch(child_m).await.unwrap();
1649
1650        assert_eq!(db.root(), expected_root);
1651
1652        // All 10 keys should be accessible.
1653        for (k, v) in kvs_first.iter().chain(kvs_second.iter()) {
1654            assert_eq!(db.get(k).await.unwrap(), Some(*v));
1655        }
1656
1657        db.destroy().await.unwrap();
1658    }
1659
1660    /// MerkleizedBatch::root() matches db.root() after apply_batch().
1661    pub(crate) async fn test_immutable_batch_speculative_root<F: Family, V, C>(
1662        context: deterministic::Context,
1663        open_db: impl Fn(
1664            deterministic::Context,
1665        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1666    ) where
1667        V: ValueEncoding<Value = Digest>,
1668        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1669        C::Item: EncodeShared,
1670    {
1671        let mut db = open_db(context.child("db")).await;
1672
1673        let mut batch = db.new_batch();
1674        for i in 0u8..10 {
1675            let k = Sha256::hash(&[i]);
1676            batch = batch.set(k, Sha256::fill(i));
1677        }
1678        let merkleized = batch.merkleize(&db, None, Location::new(0));
1679
1680        let speculative = merkleized.root();
1681        db.apply_batch(merkleized).await.unwrap();
1682        assert_eq!(db.root(), speculative);
1683
1684        // Second batch with metadata.
1685        let metadata = Some(Sha256::fill(55u8));
1686        let mut batch = db.new_batch();
1687        let k = Sha256::hash(&[0xAA]);
1688        batch = batch.set(k, Sha256::fill(0xAA));
1689        let merkleized = batch.merkleize(&db, metadata, Location::new(0));
1690        let speculative = merkleized.root();
1691        db.apply_batch(merkleized).await.unwrap();
1692        assert_eq!(db.root(), speculative);
1693
1694        db.destroy().await.unwrap();
1695    }
1696
1697    /// MerkleizedBatch::get() reads from diff and base DB.
1698    pub(crate) async fn test_immutable_merkleized_batch_get<F: Family, V, C>(
1699        context: deterministic::Context,
1700        open_db: impl Fn(
1701            deterministic::Context,
1702        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1703    ) where
1704        V: ValueEncoding<Value = Digest>,
1705        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1706        C::Item: EncodeShared,
1707    {
1708        let mut db = open_db(context.child("db")).await;
1709
1710        // Pre-populate base DB.
1711        let key_a = Sha256::hash(&0u64.to_be_bytes());
1712        let val_a = Sha256::fill(10u8);
1713        db.apply_batch(
1714            db.new_batch()
1715                .set(key_a, val_a)
1716                .merkleize(&db, None, Location::new(0)),
1717        )
1718        .await
1719        .unwrap();
1720
1721        // Create a merkleized batch with a new key.
1722        let key_b = Sha256::hash(&1u64.to_be_bytes());
1723        let val_b = Sha256::fill(20u8);
1724        let merkleized = db
1725            .new_batch()
1726            .set(key_b, val_b)
1727            .merkleize(&db, None, Location::new(0));
1728
1729        // Read base DB value through merkleized batch.
1730        assert_eq!(merkleized.get(&key_a, &db).await.unwrap(), Some(val_a));
1731
1732        // Read this batch's key from the diff.
1733        assert_eq!(merkleized.get(&key_b, &db).await.unwrap(), Some(val_b));
1734
1735        // Nonexistent key.
1736        let key_c = Sha256::hash(&2u64.to_be_bytes());
1737        assert_eq!(merkleized.get(&key_c, &db).await.unwrap(), None);
1738
1739        db.destroy().await.unwrap();
1740    }
1741
1742    /// Independent sequential batches applied one at a time.
1743    pub(crate) async fn test_immutable_batch_sequential_apply<F: Family, V, C>(
1744        context: deterministic::Context,
1745        open_db: impl Fn(
1746            deterministic::Context,
1747        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1748    ) where
1749        V: ValueEncoding<Value = Digest>,
1750        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1751        C::Item: EncodeShared,
1752    {
1753        let mut db = open_db(context.child("db")).await;
1754
1755        let key_a = Sha256::hash(&0u64.to_be_bytes());
1756        let val_a = Sha256::fill(1u8);
1757
1758        // First batch.
1759        let m = db
1760            .new_batch()
1761            .set(key_a, val_a)
1762            .merkleize(&db, None, Location::new(0));
1763        let root1 = m.root();
1764        db.apply_batch(m).await.unwrap();
1765        assert_eq!(db.root(), root1);
1766        assert_eq!(db.get(&key_a).await.unwrap(), Some(val_a));
1767
1768        // Second independent batch.
1769        let key_b = Sha256::hash(&1u64.to_be_bytes());
1770        let val_b = Sha256::fill(2u8);
1771        let m = db
1772            .new_batch()
1773            .set(key_b, val_b)
1774            .merkleize(&db, None, Location::new(0));
1775        let root2 = m.root();
1776        db.apply_batch(m).await.unwrap();
1777        assert_eq!(db.root(), root2);
1778        assert_eq!(db.get(&key_b).await.unwrap(), Some(val_b));
1779
1780        db.destroy().await.unwrap();
1781    }
1782
1783    /// Many sequential batches accumulate correctly.
1784    pub(crate) async fn test_immutable_batch_many_sequential<F: Family, V, C>(
1785        context: deterministic::Context,
1786        open_db: impl Fn(
1787            deterministic::Context,
1788        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1789    ) where
1790        V: ValueEncoding<Value = Digest>,
1791        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1792        C::Item: EncodeShared,
1793    {
1794        let mut db = open_db(context.child("db")).await;
1795        let hasher = qmdb::hasher::<Sha256>();
1796
1797        const BATCHES: u64 = 20;
1798        const KEYS_PER_BATCH: u64 = 5;
1799
1800        let mut all_kvs: Vec<(Digest, Digest)> = Vec::new();
1801
1802        for batch_idx in 0..BATCHES {
1803            let mut batch = db.new_batch();
1804            for j in 0..KEYS_PER_BATCH {
1805                let seed = batch_idx * 100 + j;
1806                let k = Sha256::hash(&seed.to_be_bytes());
1807                let v = Sha256::fill(seed as u8);
1808                batch = batch.set(k, v);
1809                all_kvs.push((k, v));
1810            }
1811            let merkleized = batch.merkleize(&db, None, Location::new(0));
1812            db.apply_batch(merkleized).await.unwrap();
1813        }
1814
1815        // Verify all key-values are readable.
1816        for (k, v) in &all_kvs {
1817            assert_eq!(db.get(k).await.unwrap(), Some(*v));
1818        }
1819
1820        // Verify proof over the full range.
1821        let root = db.root();
1822        let (proof, ops) = db.proof(Location::new(0), NZU64!(10000)).await.unwrap();
1823        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1824
1825        // Expected: 1 initial commit + BATCHES * (KEYS_PER_BATCH + 1 commit).
1826        let expected = 1 + BATCHES * (KEYS_PER_BATCH + 1);
1827        assert_eq!(db.bounds().await.end, expected);
1828
1829        db.destroy().await.unwrap();
1830    }
1831
1832    /// Empty batch (zero mutations) produces correct speculative root.
1833    pub(crate) async fn test_immutable_batch_empty_batch<F: Family, V, C>(
1834        context: deterministic::Context,
1835        open_db: impl Fn(
1836            deterministic::Context,
1837        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1838    ) where
1839        V: ValueEncoding<Value = Digest>,
1840        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1841        C::Item: EncodeShared,
1842    {
1843        let mut db = open_db(context.child("db")).await;
1844
1845        // Apply a non-empty batch first.
1846        let k = Sha256::hash(&[1u8]);
1847        db.apply_batch(db.new_batch().set(k, Sha256::fill(1u8)).merkleize(
1848            &db,
1849            None,
1850            Location::new(0),
1851        ))
1852        .await
1853        .unwrap();
1854        let root_before = db.root();
1855        let size_before = db.bounds().await.end;
1856
1857        // Empty batch with no mutations.
1858        let merkleized = db.new_batch().merkleize(&db, None, Location::new(0));
1859        let speculative = merkleized.root();
1860        db.apply_batch(merkleized).await.unwrap();
1861
1862        // Root changed (a new Commit op was appended).
1863        assert_ne!(db.root(), root_before);
1864        assert_eq!(db.root(), speculative);
1865        // Size grew by exactly 1 (the Commit op).
1866        assert_eq!(db.bounds().await.end, size_before + 1);
1867
1868        db.destroy().await.unwrap();
1869    }
1870
1871    /// MerkleizedBatch::get() works on a chained child's merkleized batch.
1872    pub(crate) async fn test_immutable_batch_chained_merkleized_get<F: Family, V, C>(
1873        context: deterministic::Context,
1874        open_db: impl Fn(
1875            deterministic::Context,
1876        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1877    ) where
1878        V: ValueEncoding<Value = Digest>,
1879        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1880        C::Item: EncodeShared,
1881    {
1882        let mut db = open_db(context.child("db")).await;
1883
1884        // Pre-populate base DB.
1885        let key_a = Sha256::hash(&0u64.to_be_bytes());
1886        let val_a = Sha256::fill(10u8);
1887        db.apply_batch(
1888            db.new_batch()
1889                .set(key_a, val_a)
1890                .merkleize(&db, None, Location::new(0)),
1891        )
1892        .await
1893        .unwrap();
1894
1895        // Parent batch sets key B.
1896        let key_b = Sha256::hash(&1u64.to_be_bytes());
1897        let val_b = Sha256::fill(1u8);
1898        let parent_m = db
1899            .new_batch()
1900            .set(key_b, val_b)
1901            .merkleize(&db, None, Location::new(0));
1902
1903        // Child batch sets key C.
1904        let key_c = Sha256::hash(&2u64.to_be_bytes());
1905        let val_c = Sha256::fill(2u8);
1906        let child_m =
1907            parent_m
1908                .new_batch::<Sha256>()
1909                .set(key_c, val_c)
1910                .merkleize(&db, None, Location::new(0));
1911
1912        // Child's MerkleizedBatch can read all three layers:
1913        // base DB value
1914        assert_eq!(child_m.get(&key_a, &db).await.unwrap(), Some(val_a));
1915        // parent diff value
1916        assert_eq!(child_m.get(&key_b, &db).await.unwrap(), Some(val_b));
1917        // child's own value
1918        assert_eq!(child_m.get(&key_c, &db).await.unwrap(), Some(val_c));
1919        // nonexistent key
1920        let key_d = Sha256::hash(&3u64.to_be_bytes());
1921        assert_eq!(child_m.get(&key_d, &db).await.unwrap(), None);
1922
1923        db.destroy().await.unwrap();
1924    }
1925
1926    /// Large single batch, verifying all values and proof.
1927    pub(crate) async fn test_immutable_batch_large<F: Family, V, C>(
1928        context: deterministic::Context,
1929        open_db: impl Fn(
1930            deterministic::Context,
1931        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1932    ) where
1933        V: ValueEncoding<Value = Digest>,
1934        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1935        C::Item: EncodeShared,
1936    {
1937        let mut db = open_db(context.child("db")).await;
1938        let hasher = qmdb::hasher::<Sha256>();
1939
1940        const N: u64 = 500;
1941        let mut kvs: Vec<(Digest, Digest)> = Vec::new();
1942
1943        let mut batch = db.new_batch();
1944        for i in 0..N {
1945            let k = Sha256::hash(&i.to_be_bytes());
1946            let v = Sha256::fill((i % 256) as u8);
1947            batch = batch.set(k, v);
1948            kvs.push((k, v));
1949        }
1950        let merkleized = batch.merkleize(&db, None, Location::new(0));
1951        db.apply_batch(merkleized).await.unwrap();
1952
1953        // Verify every value.
1954        for (k, v) in &kvs {
1955            assert_eq!(db.get(k).await.unwrap(), Some(*v));
1956        }
1957
1958        // Verify proof over the full range.
1959        let root = db.root();
1960        let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1961        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1962
1963        // Expected: 1 initial commit + N sets + 1 commit.
1964        assert_eq!(db.bounds().await.end, 1 + N + 1);
1965
1966        db.destroy().await.unwrap();
1967    }
1968
1969    /// Child batch overrides same key set by parent.
1970    pub(crate) async fn test_immutable_batch_chained_key_override<F: Family, V, C>(
1971        context: deterministic::Context,
1972        open_db: impl Fn(
1973            deterministic::Context,
1974        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1975    ) where
1976        V: ValueEncoding<Value = Digest>,
1977        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1978        C::Item: EncodeShared,
1979    {
1980        let mut db = open_db(context.child("db")).await;
1981
1982        let key = Sha256::hash(&0u64.to_be_bytes());
1983        let val_parent = Sha256::fill(1u8);
1984        let val_child = Sha256::fill(2u8);
1985
1986        // Parent sets key.
1987        let parent_m = db
1988            .new_batch()
1989            .set(key, val_parent)
1990            .merkleize(&db, None, Location::new(0));
1991
1992        // Child overrides same key.
1993        let mut child = parent_m.new_batch::<Sha256>();
1994        child = child.set(key, val_child);
1995
1996        // Child's pending mutation wins over parent diff.
1997        assert_eq!(child.get(&key, &db).await.unwrap(), Some(val_child));
1998
1999        let child_m = child.merkleize(&db, None, Location::new(0));
2000
2001        // After merkleize, child's diff wins.
2002        assert_eq!(child_m.get(&key, &db).await.unwrap(), Some(val_child));
2003
2004        // Apply and verify.
2005        db.apply_batch(child_m).await.unwrap();
2006        assert_eq!(db.get(&key).await.unwrap(), Some(val_child));
2007
2008        db.destroy().await.unwrap();
2009    }
2010
2011    /// Same key set across two sequential applied batches. The immutable DB
2012    /// keeps all versions -- `get()` returns the earliest non-pruned value.
2013    /// After pruning the first version, `get()` returns the second.
2014    ///
2015    /// `open_db_small_sections` must return a DB whose log has `items_per_section=1`
2016    /// so pruning is per-item.
2017    pub(crate) async fn test_immutable_batch_sequential_key_override<F: Family, V, C>(
2018        context: deterministic::Context,
2019        open_db_small_sections: impl Fn(
2020            deterministic::Context,
2021        )
2022            -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2023    ) where
2024        V: ValueEncoding<Value = Digest>,
2025        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2026        C::Item: EncodeShared,
2027    {
2028        let mut db = open_db_small_sections(context.child("db")).await;
2029
2030        let key = Sha256::hash(&0u64.to_be_bytes());
2031        let v1 = Sha256::fill(1u8);
2032        let v2 = Sha256::fill(2u8);
2033
2034        // First batch sets key.
2035        // Layout: 0=initial commit, 1=Set(key,v1), 2=Commit
2036        db.apply_batch(
2037            db.new_batch()
2038                .set(key, v1)
2039                .merkleize(&db, None, Location::new(0)),
2040        )
2041        .await
2042        .unwrap();
2043        assert_eq!(db.get(&key).await.unwrap(), Some(v1));
2044
2045        // Second batch sets same key to different value.
2046        // Layout continues: 3=Set(key,v2), 4=Commit
2047        // Floor=4 so that prune(2) succeeds (2 <= 4).
2048        db.apply_batch(
2049            db.new_batch()
2050                .set(key, v2)
2051                .merkleize(&db, None, Location::new(4)),
2052        )
2053        .await
2054        .unwrap();
2055
2056        // Immutable DB returns the earliest non-pruned value.
2057        assert_eq!(db.get(&key).await.unwrap(), Some(v1));
2058
2059        // Prune past the first Set (loc 1). With items_per_section=1,
2060        // pruning to loc 2 should remove the blob containing loc 1.
2061        db.prune(Location::new(2)).await.unwrap();
2062        assert_eq!(db.get(&key).await.unwrap(), Some(v2));
2063
2064        db.destroy().await.unwrap();
2065    }
2066
2067    /// Metadata propagates through merkleize and clears with None.
2068    pub(crate) async fn test_immutable_batch_metadata<F: Family, V, C>(
2069        context: deterministic::Context,
2070        open_db: impl Fn(
2071            deterministic::Context,
2072        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2073    ) where
2074        V: ValueEncoding<Value = Digest>,
2075        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2076        C::Item: EncodeShared,
2077    {
2078        let mut db = open_db(context.child("db")).await;
2079
2080        // Batch with metadata.
2081        let metadata = Sha256::fill(42u8);
2082        let k = Sha256::hash(&[1u8]);
2083        db.apply_batch(db.new_batch().set(k, Sha256::fill(1u8)).merkleize(
2084            &db,
2085            Some(metadata),
2086            Location::new(0),
2087        ))
2088        .await
2089        .unwrap();
2090        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
2091
2092        // Second batch clears metadata.
2093        db.apply_batch(db.new_batch().merkleize(&db, None, Location::new(0)))
2094            .await
2095            .unwrap();
2096        assert_eq!(db.get_metadata().await.unwrap(), None);
2097
2098        db.destroy().await.unwrap();
2099    }
2100
2101    pub(crate) async fn test_immutable_stale_batch_rejected<F: Family, V, C>(
2102        context: deterministic::Context,
2103        open_db: impl Fn(
2104            deterministic::Context,
2105        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2106    ) where
2107        V: ValueEncoding<Value = Digest>,
2108        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2109        C::Item: EncodeShared,
2110    {
2111        let mut db = open_db(context.child("db")).await;
2112
2113        let key1 = Sha256::hash(&[1]);
2114        let key2 = Sha256::hash(&[2]);
2115        let v1 = Sha256::fill(10u8);
2116        let v2 = Sha256::fill(20u8);
2117
2118        // Create two batches from the same DB state.
2119        let batch_a = db
2120            .new_batch()
2121            .set(key1, v1)
2122            .merkleize(&db, None, Location::new(0));
2123        let batch_b = db
2124            .new_batch()
2125            .set(key2, v2)
2126            .merkleize(&db, None, Location::new(0));
2127
2128        // Apply the first -- should succeed.
2129        db.apply_batch(batch_a).await.unwrap();
2130        let expected_root = db.root();
2131        let expected_bounds = db.bounds().await;
2132        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2133        assert_eq!(db.get(&key2).await.unwrap(), None);
2134        assert_eq!(db.get_metadata().await.unwrap(), None);
2135
2136        // Apply the second -- should fail because the DB was modified.
2137        let result = db.apply_batch(batch_b).await;
2138        assert!(
2139            matches!(result, Err(Error::StaleBatch { .. })),
2140            "expected StaleBatch error, got {result:?}"
2141        );
2142        assert_eq!(db.root(), expected_root);
2143        assert_eq!(db.bounds().await, expected_bounds);
2144        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2145        assert_eq!(db.get(&key2).await.unwrap(), None);
2146        assert_eq!(db.get_metadata().await.unwrap(), None);
2147
2148        db.destroy().await.unwrap();
2149    }
2150
2151    pub(crate) async fn test_immutable_stale_batch_chained<F: Family, V, C>(
2152        context: deterministic::Context,
2153        open_db: impl Fn(
2154            deterministic::Context,
2155        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2156    ) where
2157        V: ValueEncoding<Value = Digest>,
2158        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2159        C::Item: EncodeShared,
2160    {
2161        let mut db = open_db(context.child("db")).await;
2162
2163        let key1 = Sha256::hash(&[1]);
2164        let key2 = Sha256::hash(&[2]);
2165        let key3 = Sha256::hash(&[3]);
2166
2167        // Parent batch.
2168        let parent_m =
2169            db.new_batch()
2170                .set(key1, Sha256::fill(1u8))
2171                .merkleize(&db, None, Location::new(0));
2172
2173        // Fork two children from the same parent.
2174        let child_a = parent_m
2175            .new_batch::<Sha256>()
2176            .set(key2, Sha256::fill(2u8))
2177            .merkleize(&db, None, Location::new(0));
2178        let child_b = parent_m
2179            .new_batch::<Sha256>()
2180            .set(key3, Sha256::fill(3u8))
2181            .merkleize(&db, None, Location::new(0));
2182
2183        // Apply child A.
2184        db.apply_batch(child_a).await.unwrap();
2185
2186        // Child B is stale.
2187        let result = db.apply_batch(child_b).await;
2188        assert!(
2189            matches!(result, Err(Error::StaleBatch { .. })),
2190            "expected StaleBatch error, got {result:?}"
2191        );
2192
2193        db.destroy().await.unwrap();
2194    }
2195
2196    pub(crate) async fn test_immutable_partial_ancestor_commit<F: Family, V, C>(
2197        context: deterministic::Context,
2198        open_db: impl Fn(
2199            deterministic::Context,
2200        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2201    ) where
2202        V: ValueEncoding<Value = Digest>,
2203        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2204        C::Item: EncodeShared,
2205    {
2206        let mut db = open_db(context.child("db")).await;
2207
2208        let key1 = Sha256::hash(&[1]);
2209        let key2 = Sha256::hash(&[2]);
2210        let key3 = Sha256::hash(&[3]);
2211        let v1 = Sha256::fill(1u8);
2212        let v2 = Sha256::fill(2u8);
2213        let v3 = Sha256::fill(3u8);
2214
2215        // Chain: DB <- A <- B <- C
2216        let a = db
2217            .new_batch()
2218            .set(key1, v1)
2219            .merkleize(&db, None, Location::new(0));
2220        let b = a
2221            .new_batch::<Sha256>()
2222            .set(key2, v2)
2223            .merkleize(&db, None, Location::new(0));
2224        let c = b
2225            .new_batch::<Sha256>()
2226            .set(key3, v3)
2227            .merkleize(&db, None, Location::new(0));
2228
2229        let expected_root = c.root();
2230
2231        // Apply only A, then apply C directly (B uncommitted).
2232        db.apply_batch(a).await.unwrap();
2233        db.apply_batch(c).await.unwrap();
2234
2235        assert_eq!(db.root(), expected_root);
2236        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2237        assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2238        assert_eq!(db.get(&key3).await.unwrap(), Some(v3));
2239
2240        db.destroy().await.unwrap();
2241    }
2242
2243    pub(crate) async fn test_immutable_sequential_commit_parent_then_child<F: Family, V, C>(
2244        context: deterministic::Context,
2245        open_db: impl Fn(
2246            deterministic::Context,
2247        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2248    ) where
2249        V: ValueEncoding<Value = Digest>,
2250        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2251        C::Item: EncodeShared,
2252    {
2253        let mut db = open_db(context.child("db")).await;
2254
2255        let key1 = Sha256::hash(&[1]);
2256        let key2 = Sha256::hash(&[2]);
2257        let v1 = Sha256::fill(1u8);
2258        let v2 = Sha256::fill(2u8);
2259
2260        // Parent batch.
2261        let parent_m = db
2262            .new_batch()
2263            .set(key1, v1)
2264            .merkleize(&db, None, Location::new(0));
2265
2266        // Child batch built on parent.
2267        let child_m =
2268            parent_m
2269                .new_batch::<Sha256>()
2270                .set(key2, v2)
2271                .merkleize(&db, None, Location::new(0));
2272
2273        // Apply parent first, then child. This is a valid sequential commit.
2274        db.apply_batch(parent_m).await.unwrap();
2275        db.apply_batch(child_m).await.unwrap();
2276
2277        // Both keys present.
2278        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2279        assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2280
2281        db.destroy().await.unwrap();
2282    }
2283
2284    pub(crate) async fn test_immutable_child_root_matches_pending_and_committed<F: Family, V, C>(
2285        context: deterministic::Context,
2286        open_db: impl Fn(
2287            deterministic::Context,
2288        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2289    ) where
2290        V: ValueEncoding<Value = Digest>,
2291        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2292        C::Item: EncodeShared,
2293    {
2294        let mut db = open_db(context.child("db")).await;
2295
2296        let key1 = Sha256::hash(&[1]);
2297        let key2 = Sha256::hash(&[2]);
2298
2299        // Build the child while the parent is still pending.
2300        let parent =
2301            db.new_batch()
2302                .set(key1, Sha256::fill(1u8))
2303                .merkleize(&db, None, Location::new(0));
2304        let pending_child = parent
2305            .new_batch::<Sha256>()
2306            .set(key2, Sha256::fill(2u8))
2307            .merkleize(&db, None, Location::new(0));
2308
2309        // Commit the parent, then rebuild the same logical child from the
2310        // committed DB state and compare roots.
2311        db.apply_batch(parent).await.unwrap();
2312        db.commit().await.unwrap();
2313
2314        let committed_child =
2315            db.new_batch()
2316                .set(key2, Sha256::fill(2u8))
2317                .merkleize(&db, None, Location::new(0));
2318
2319        assert_eq!(pending_child.root(), committed_child.root());
2320
2321        db.destroy().await.unwrap();
2322    }
2323
2324    pub(crate) async fn test_immutable_stale_batch_child_applied_before_parent<F: Family, V, C>(
2325        context: deterministic::Context,
2326        open_db: impl Fn(
2327            deterministic::Context,
2328        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2329    ) where
2330        V: ValueEncoding<Value = Digest>,
2331        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2332        C::Item: EncodeShared,
2333    {
2334        let mut db = open_db(context.child("db")).await;
2335
2336        let key1 = Sha256::hash(&[1]);
2337        let key2 = Sha256::hash(&[2]);
2338
2339        // Parent batch.
2340        let parent_m =
2341            db.new_batch()
2342                .set(key1, Sha256::fill(1u8))
2343                .merkleize(&db, None, Location::new(0));
2344
2345        // Child batch.
2346        let child_m = parent_m
2347            .new_batch::<Sha256>()
2348            .set(key2, Sha256::fill(2u8))
2349            .merkleize(&db, None, Location::new(0));
2350
2351        // Apply child first (it carries all parent ops too).
2352        db.apply_batch(child_m).await.unwrap();
2353
2354        // Parent is stale.
2355        let result = db.apply_batch(parent_m).await;
2356        assert!(
2357            matches!(result, Err(Error::StaleBatch { .. })),
2358            "expected StaleBatch error, got {result:?}"
2359        );
2360
2361        db.destroy().await.unwrap();
2362    }
2363
2364    /// to_batch() creates an owned snapshot whose root matches the committed DB.
2365    /// A child batch chained from it can be applied.
2366    pub(crate) async fn test_immutable_to_batch<F: Family, V, C>(
2367        context: deterministic::Context,
2368        open_db: impl Fn(
2369            deterministic::Context,
2370        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2371    ) where
2372        V: ValueEncoding<Value = Digest>,
2373        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2374        C::Item: EncodeShared,
2375    {
2376        let mut db = open_db(context.child("db")).await;
2377
2378        // Populate.
2379        let key1 = Sha256::hash(&[1]);
2380        let v1 = Sha256::fill(10u8);
2381        db.apply_batch(
2382            db.new_batch()
2383                .set(key1, v1)
2384                .merkleize(&db, None, Location::new(0)),
2385        )
2386        .await
2387        .unwrap();
2388
2389        // to_batch root matches committed root.
2390        let snapshot = db.to_batch();
2391        assert_eq!(snapshot.root(), db.root());
2392
2393        // Chain a child from the snapshot, apply it.
2394        let key2 = Sha256::hash(&[2]);
2395        let v2 = Sha256::fill(20u8);
2396        let child =
2397            snapshot
2398                .new_batch::<Sha256>()
2399                .set(key2, v2)
2400                .merkleize(&db, None, Location::new(0));
2401        db.apply_batch(child).await.unwrap();
2402
2403        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2404        assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2405
2406        db.destroy().await.unwrap();
2407    }
2408
2409    /// Regression: applying a batch after its ancestor Arc is dropped (without
2410    /// committing) must still apply the ancestor's snapshot diffs.
2411    pub(crate) async fn test_immutable_apply_after_ancestor_dropped<F: Family, V, C>(
2412        context: deterministic::Context,
2413        open_db: impl Fn(
2414            deterministic::Context,
2415        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2416    ) where
2417        V: ValueEncoding<Value = Digest>,
2418        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2419        C::Item: EncodeShared,
2420    {
2421        let mut db = open_db(context.child("db")).await;
2422
2423        let key1 = Sha256::hash(&[1]);
2424        let key2 = Sha256::hash(&[2]);
2425        let key3 = Sha256::hash(&[3]);
2426        let v1 = Sha256::fill(1u8);
2427        let v2 = Sha256::fill(2u8);
2428        let v3 = Sha256::fill(3u8);
2429
2430        // Chain: DB <- A <- B <- C
2431        let a = db
2432            .new_batch()
2433            .set(key1, v1)
2434            .merkleize(&db, None, Location::new(0));
2435        let b = a
2436            .new_batch::<Sha256>()
2437            .set(key2, v2)
2438            .merkleize(&db, None, Location::new(0));
2439        let c = b
2440            .new_batch::<Sha256>()
2441            .set(key3, v3)
2442            .merkleize(&db, None, Location::new(0));
2443
2444        // Drop A and B without committing. Their Weak refs in C are now dead.
2445        drop(a);
2446        drop(b);
2447
2448        // Apply only the tip. This is !skip_ancestors (DB hasn't changed).
2449        db.apply_batch(c).await.unwrap();
2450
2451        // All three keys must be in the snapshot.
2452        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2453        assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2454        assert_eq!(db.get(&key3).await.unwrap(), Some(v3));
2455
2456        db.destroy().await.unwrap();
2457    }
2458
2459    /// Verify the inactivity floor is zero for a fresh empty database and is
2460    /// correctly set after applying batches with specific floor values.
2461    pub(crate) async fn test_immutable_inactivity_floor_tracking<F: Family, V, C>(
2462        context: deterministic::Context,
2463        open_db: impl Fn(
2464            deterministic::Context,
2465        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2466    ) where
2467        V: ValueEncoding<Value = Digest>,
2468        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2469        C::Item: EncodeShared,
2470    {
2471        let mut db = open_db(context.child("test")).await;
2472
2473        // Empty DB has floor=0.
2474        assert_eq!(db.inactivity_floor_loc(), Location::new(0));
2475
2476        // Apply batch with floor=0, floor stays 0.
2477        let k1 = Sha256::fill(1u8);
2478        let v1 = Sha256::fill(2u8);
2479        db.apply_batch(
2480            db.new_batch()
2481                .set(k1, v1)
2482                .merkleize(&db, None, Location::new(0)),
2483        )
2484        .await
2485        .unwrap();
2486        assert_eq!(db.inactivity_floor_loc(), Location::new(0));
2487
2488        // Apply batch with floor=3, floor advances.
2489        let k2 = Sha256::fill(3u8);
2490        let v2 = Sha256::fill(4u8);
2491        db.apply_batch(
2492            db.new_batch()
2493                .set(k2, v2)
2494                .merkleize(&db, None, Location::new(3)),
2495        )
2496        .await
2497        .unwrap();
2498        assert_eq!(db.inactivity_floor_loc(), Location::new(3));
2499
2500        // Floor persists across restart.
2501        db.commit().await.unwrap();
2502        db.sync().await.unwrap();
2503        drop(db);
2504        let db = open_db(context.child("reopen")).await;
2505        assert_eq!(db.inactivity_floor_loc(), Location::new(3));
2506
2507        db.destroy().await.unwrap();
2508    }
2509
2510    /// Verify that applying a batch with a floor equal to the current floor succeeds,
2511    /// and that a higher floor also succeeds.
2512    pub(crate) async fn test_immutable_floor_monotonicity<F: Family, V, C>(
2513        context: deterministic::Context,
2514        open_db: impl Fn(
2515            deterministic::Context,
2516        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2517    ) where
2518        V: ValueEncoding<Value = Digest>,
2519        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2520        C::Item: EncodeShared,
2521    {
2522        let mut db = open_db(context.child("test")).await;
2523
2524        // DB starts with 1 op (initial commit).
2525        // First batch: 1 set + 1 commit = total_size 3. Use floor=2 (the commit loc).
2526        let k1 = Sha256::fill(1u8);
2527        let v1 = Sha256::fill(2u8);
2528        db.apply_batch(
2529            db.new_batch()
2530                .set(k1, v1)
2531                .merkleize(&db, None, Location::new(2)),
2532        )
2533        .await
2534        .unwrap();
2535        assert_eq!(db.inactivity_floor_loc(), Location::new(2));
2536
2537        // Same floor is OK. Second batch: 1 set + 1 commit = total_size 5. floor=2 < 5.
2538        let k2 = Sha256::fill(3u8);
2539        let v2 = Sha256::fill(4u8);
2540        db.apply_batch(
2541            db.new_batch()
2542                .set(k2, v2)
2543                .merkleize(&db, None, Location::new(2)),
2544        )
2545        .await
2546        .unwrap();
2547        assert_eq!(db.inactivity_floor_loc(), Location::new(2));
2548
2549        // Higher floor also succeeds. Third batch: 1 set + 1 commit = total_size 7. floor=5 < 7.
2550        let k3 = Sha256::fill(5u8);
2551        let v3 = Sha256::fill(6u8);
2552        db.apply_batch(
2553            db.new_batch()
2554                .set(k3, v3)
2555                .merkleize(&db, None, Location::new(5)),
2556        )
2557        .await
2558        .unwrap();
2559        assert_eq!(db.inactivity_floor_loc(), Location::new(5));
2560
2561        db.destroy().await.unwrap();
2562    }
2563
2564    /// Verify that the inactivity floor is correctly restored after a rewind.
2565    pub(crate) async fn test_immutable_rewind_restores_floor<F: Family, V, C>(
2566        context: deterministic::Context,
2567        open_db: impl Fn(
2568            deterministic::Context,
2569        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2570    ) where
2571        V: ValueEncoding<Value = Digest>,
2572        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2573        C::Item: EncodeShared,
2574    {
2575        let mut db = open_db(context.child("test")).await;
2576
2577        // Apply first batch with floor=2.
2578        let k1 = Sha256::fill(1u8);
2579        let v1 = Sha256::fill(2u8);
2580        db.apply_batch(
2581            db.new_batch()
2582                .set(k1, v1)
2583                .merkleize(&db, None, Location::new(2)),
2584        )
2585        .await
2586        .unwrap();
2587        db.commit().await.unwrap();
2588        let first_size = db.bounds().await.end;
2589        assert_eq!(db.inactivity_floor_loc(), Location::new(2));
2590
2591        // Apply second batch with floor=4 (the new commit's location).
2592        let k2 = Sha256::fill(3u8);
2593        let v2 = Sha256::fill(4u8);
2594        db.apply_batch(
2595            db.new_batch()
2596                .set(k2, v2)
2597                .merkleize(&db, None, Location::new(4)),
2598        )
2599        .await
2600        .unwrap();
2601        db.commit().await.unwrap();
2602        assert_eq!(db.inactivity_floor_loc(), Location::new(4));
2603
2604        // Rewind to the first batch.
2605        db.rewind(first_size).await.unwrap();
2606        assert_eq!(db.inactivity_floor_loc(), Location::new(2));
2607
2608        db.destroy().await.unwrap();
2609    }
2610
2611    /// Verify that applying a batch with a floor lower than the current floor
2612    /// returns an error.
2613    pub(crate) async fn test_immutable_floor_monotonicity_violation<F: Family, V, C>(
2614        context: deterministic::Context,
2615        open_db: impl Fn(
2616            deterministic::Context,
2617        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2618    ) where
2619        V: ValueEncoding<Value = Digest>,
2620        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2621        C::Item: EncodeShared,
2622    {
2623        let mut db = open_db(context.child("test")).await;
2624
2625        // DB starts with 1 op. First batch: 1 set + 1 commit = total_size 3. floor=2.
2626        let k1 = Sha256::fill(1u8);
2627        let v1 = Sha256::fill(2u8);
2628        db.apply_batch(
2629            db.new_batch()
2630                .set(k1, v1)
2631                .merkleize(&db, None, Location::new(2)),
2632        )
2633        .await
2634        .unwrap();
2635
2636        // Apply batch with floor=1 (regression). Should return an error.
2637        let k2 = Sha256::fill(3u8);
2638        let v2 = Sha256::fill(4u8);
2639        let result = db
2640            .apply_batch(
2641                db.new_batch()
2642                    .set(k2, v2)
2643                    .merkleize(&db, None, Location::new(1)),
2644            )
2645            .await;
2646        assert!(matches!(result, Err(Error::FloorRegressed(new, current))
2647                if new == Location::new(1) && current == Location::new(2)));
2648
2649        db.destroy().await.unwrap();
2650    }
2651
2652    /// Verify that applying a batch with a floor beyond the total operation
2653    /// count returns an error.
2654    pub(crate) async fn test_immutable_floor_beyond_size<F: Family, V, C>(
2655        context: deterministic::Context,
2656        open_db: impl Fn(
2657            deterministic::Context,
2658        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2659    ) where
2660        V: ValueEncoding<Value = Digest>,
2661        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2662        C::Item: EncodeShared,
2663    {
2664        let mut db = open_db(context.child("test")).await;
2665
2666        // DB has 1 op (initial commit). A batch with 1 set + 1 commit = total_size 3.
2667        // Setting floor=100 exceeds total_size.
2668        let k1 = Sha256::fill(1u8);
2669        let v1 = Sha256::fill(2u8);
2670        let result = db
2671            .apply_batch(
2672                db.new_batch()
2673                    .set(k1, v1)
2674                    .merkleize(&db, None, Location::new(100)),
2675            )
2676            .await;
2677        assert!(matches!(result, Err(Error::FloorBeyondSize(floor, commit))
2678                if floor == Location::new(100) && commit == Location::new(2)));
2679
2680        // Boundary: floor == total_size must also be rejected. The commit op is
2681        // at total_size - 1, so a floor equal to total_size would allow a later
2682        // prune to remove the commit and leave the db unrecoverable.
2683        let k2 = Sha256::fill(3u8);
2684        let v2 = Sha256::fill(4u8);
2685        let result = db
2686            .apply_batch(
2687                db.new_batch()
2688                    .set(k2, v2)
2689                    .merkleize(&db, None, Location::new(3)),
2690            )
2691            .await;
2692        assert!(matches!(result, Err(Error::FloorBeyondSize(floor, commit))
2693                if floor == Location::new(3) && commit == Location::new(2)));
2694
2695        // Floor == total_size - 1 (the commit location) is the maximum valid.
2696        db.apply_batch(
2697            db.new_batch()
2698                .set(k2, v2)
2699                .merkleize(&db, None, Location::new(2)),
2700        )
2701        .await
2702        .unwrap();
2703
2704        db.destroy().await.unwrap();
2705    }
2706
2707    /// A chained batch where an *ancestor* declares a floor below the previous ancestor's
2708    /// floor must be rejected, even when that ancestor's floor is still at or above the
2709    /// database's current floor. This isolates the cross-batch monotonicity step (every
2710    /// commit's floor at or above the previous commit's floor) from the simpler "every
2711    /// commit's floor at or above the live floor" rule.
2712    pub(crate) async fn test_immutable_chained_ancestor_floor_regression<F: Family, V, C>(
2713        context: deterministic::Context,
2714        open_db: impl Fn(
2715            deterministic::Context,
2716        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2717    ) where
2718        V: ValueEncoding<Value = Digest>,
2719        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2720        C::Item: EncodeShared,
2721    {
2722        let mut db = open_db(context.child("test")).await;
2723
2724        // Live floor is 0 (from the seeded initial commit).
2725        // a: 1 set + commit at loc 2, floor=2 (valid: >= 0, == commit_loc).
2726        // b: 1 set + commit at loc 4, floor=1 (regresses below a's floor=2, but still >= 0).
2727        // c: 1 set + commit at loc 6, floor=2 (would be valid in isolation).
2728        // Applying c must fail at b with FloorRegressed(1, 2) -- not at b vs the live floor,
2729        // and not at c, proving the per-ancestor running floor is what catches it.
2730        let a = db
2731            .new_batch()
2732            .set(Sha256::fill(1u8), Sha256::fill(2u8))
2733            .merkleize(&db, None, Location::new(2));
2734        let b = a
2735            .new_batch::<Sha256>()
2736            .set(Sha256::fill(3u8), Sha256::fill(4u8))
2737            .merkleize(&db, None, Location::new(1));
2738        let c = b
2739            .new_batch::<Sha256>()
2740            .set(Sha256::fill(5u8), Sha256::fill(6u8))
2741            .merkleize(&db, None, Location::new(2));
2742
2743        let root_before = db.root();
2744        let last_commit_before = db.last_commit_loc;
2745        let floor_before = db.inactivity_floor_loc();
2746
2747        let err = db.apply_batch(c).await.unwrap_err();
2748        assert!(
2749            matches!(err, Error::FloorRegressed(new, prev)
2750                if new == Location::new(1) && prev == Location::new(2)),
2751            "unexpected error: {err:?}"
2752        );
2753
2754        // Database state must be unchanged on a rejected chain.
2755        assert_eq!(db.root(), root_before);
2756        assert_eq!(db.last_commit_loc, last_commit_before);
2757        assert_eq!(db.inactivity_floor_loc(), floor_before);
2758
2759        db.destroy().await.unwrap();
2760    }
2761
2762    /// A chained batch where an *ancestor's* floor exceeds its own commit location must be
2763    /// rejected, identifying the ancestor's commit_loc (not the tip's). This is the more
2764    /// dangerous variant: monotonicity can still be satisfied while the floor poisons future
2765    /// `historical_proof` and rewind.
2766    pub(crate) async fn test_immutable_chained_ancestor_floor_beyond_size<F: Family, V, C>(
2767        context: deterministic::Context,
2768        open_db: impl Fn(
2769            deterministic::Context,
2770        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2771    ) where
2772        V: ValueEncoding<Value = Digest>,
2773        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2774        C::Item: EncodeShared,
2775    {
2776        let mut db = open_db(context.child("test")).await;
2777
2778        // a: 1 set + commit at loc 2; declare floor=3 (one past the commit -- invalid).
2779        // b: tip valid on its own (floor=0 <= b's commit_loc), but a's floor is bad.
2780        let a = db
2781            .new_batch()
2782            .set(Sha256::fill(1u8), Sha256::fill(2u8))
2783            .merkleize(&db, None, Location::new(3));
2784        let b = a
2785            .new_batch::<Sha256>()
2786            .set(Sha256::fill(3u8), Sha256::fill(4u8))
2787            .merkleize(&db, None, Location::new(0));
2788
2789        let root_before = db.root();
2790        let last_commit_before = db.last_commit_loc;
2791        let floor_before = db.inactivity_floor_loc();
2792
2793        let err = db.apply_batch(b).await.unwrap_err();
2794        // The error must identify the ancestor's commit_loc (2), not the tip's (4).
2795        assert!(
2796            matches!(err, Error::FloorBeyondSize(floor, commit)
2797                if floor == Location::new(3) && commit == Location::new(2)),
2798            "unexpected error: {err:?}"
2799        );
2800
2801        // Database state must be unchanged on a rejected chain.
2802        assert_eq!(db.root(), root_before);
2803        assert_eq!(db.last_commit_loc, last_commit_before);
2804        assert_eq!(db.inactivity_floor_loc(), floor_before);
2805
2806        db.destroy().await.unwrap();
2807    }
2808
2809    /// Regression test for rewind-after-reopen with floor change.
2810    ///
2811    /// After reopening a database (which rebuilds the snapshot from the latest
2812    /// floor), rewinding to an earlier commit with a lower floor must restore
2813    /// all keys that were live at the rewind target -- not just the ones that
2814    /// happened to be in the rebuilt snapshot.
2815    pub(crate) async fn test_immutable_rewind_after_reopen_with_floor_change<F: Family, V, C>(
2816        context: deterministic::Context,
2817        open_db: impl Fn(
2818            deterministic::Context,
2819        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2820    ) where
2821        V: ValueEncoding<Value = Digest>,
2822        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2823        C::Item: EncodeShared,
2824    {
2825        let mut db = open_db(context.child("first")).await;
2826
2827        let k1 = Sha256::fill(1u8);
2828        let k2 = Sha256::fill(2u8);
2829        let k3 = Sha256::fill(3u8);
2830        let v1 = Sha256::fill(11u8);
2831        let v2 = Sha256::fill(12u8);
2832        let v3 = Sha256::fill(13u8);
2833
2834        // Commit A: 3 keys with floor=0.
2835        commit_sets(&mut db, [(k1, v1), (k2, v2), (k3, v3)], None).await;
2836        let first_size = db.bounds().await.end;
2837        let first_root = db.root();
2838
2839        // Commit B: 3 more keys with floor=first_size (declares batch A inactive).
2840        let k4 = Sha256::fill(4u8);
2841        let k5 = Sha256::fill(5u8);
2842        let k6 = Sha256::fill(6u8);
2843        let v4 = Sha256::fill(14u8);
2844        let v5 = Sha256::fill(15u8);
2845        let v6 = Sha256::fill(16u8);
2846        commit_sets_with_floor(&mut db, [(k4, v4), (k5, v5), (k6, v6)], None, first_size).await;
2847        db.sync().await.unwrap();
2848
2849        // Reopen: snapshot rebuilt from floor=first_size, batch A keys excluded.
2850        drop(db);
2851        let mut db = open_db(context.child("second")).await;
2852
2853        // Verify batch A keys are NOT in the reopened snapshot (expected).
2854        assert!(db.get(&k1).await.unwrap().is_none());
2855
2856        // Rewind to commit A.
2857        db.rewind(first_size).await.unwrap();
2858
2859        // All batch A keys must be accessible after rewind.
2860        assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
2861        assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
2862        assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
2863        assert_eq!(db.root(), first_root);
2864        assert_eq!(db.inactivity_floor_loc(), Location::new(0));
2865
2866        // Batch B keys must NOT be accessible.
2867        assert!(db.get(&k4).await.unwrap().is_none());
2868
2869        db.destroy().await.unwrap();
2870    }
2871
2872    /// Regression test: rewind-after-reopen where the rewind target is NOT the
2873    /// immediate predecessor. This ensures the snapshot gap fill only covers
2874    /// [rewind_floor, old_floor) and does not re-insert keys already present.
2875    pub(crate) async fn test_immutable_rewind_after_reopen_partial_floor_gap<F: Family, V, C>(
2876        context: deterministic::Context,
2877        open_db: impl Fn(
2878            deterministic::Context,
2879        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2880    ) where
2881        V: ValueEncoding<Value = Digest>,
2882        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2883        C::Item: EncodeShared,
2884    {
2885        let mut db = open_db(context.child("first")).await;
2886
2887        let k1 = Sha256::fill(1u8);
2888        let v1 = Sha256::fill(11u8);
2889
2890        // Commit A: 1 key, floor=0.
2891        commit_sets(&mut db, [(k1, v1)], None).await;
2892        let first_size = db.bounds().await.end;
2893        let first_root = db.root();
2894
2895        // Commit B: 1 key, floor=first_size.
2896        let k2 = Sha256::fill(2u8);
2897        let v2 = Sha256::fill(12u8);
2898        commit_sets_with_floor(&mut db, [(k2, v2)], None, first_size).await;
2899        let second_size = db.bounds().await.end;
2900
2901        // Commit C: 1 key, floor=second_size. This raises the floor
2902        // above commit B's keys, so reopen excludes both A and B keys.
2903        let k3 = Sha256::fill(3u8);
2904        let v3 = Sha256::fill(13u8);
2905        commit_sets_with_floor(&mut db, [(k3, v3)], None, second_size).await;
2906        db.sync().await.unwrap();
2907
2908        // Reopen: snapshot rebuilt from floor=second_size. Only k3 is in snapshot.
2909        drop(db);
2910        let mut db = open_db(context.child("second")).await;
2911        assert!(db.get(&k1).await.unwrap().is_none());
2912        assert!(db.get(&k2).await.unwrap().is_none());
2913        assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
2914
2915        // Rewind to commit B (not A). The gap fill should add keys from
2916        // [first_size, second_size) -- which includes k2 but not k1.
2917        // k3 is in the suffix and gets removed. k2 from the gap gets inserted.
2918        db.rewind(second_size).await.unwrap();
2919        assert!(db.get(&k1).await.unwrap().is_none()); // below B's floor
2920        assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
2921        assert!(db.get(&k3).await.unwrap().is_none()); // in suffix, removed
2922
2923        // Now rewind further to commit A.
2924        db.rewind(first_size).await.unwrap();
2925        assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
2926        assert!(db.get(&k2).await.unwrap().is_none()); // above first_size, truncated
2927        assert_eq!(db.root(), first_root);
2928        assert_eq!(db.inactivity_floor_loc(), Location::new(0));
2929
2930        db.destroy().await.unwrap();
2931    }
2932
2933    /// Regression: rewind-after-reopen with a repeated key in the floor gap.
2934    /// The gap-fill must maintain the same ordering as the live `apply_batch`
2935    /// path so `get()` returns the same value.
2936    pub(crate) async fn test_immutable_rewind_after_reopen_repeated_key_gap<F: Family, V, C>(
2937        context: deterministic::Context,
2938        open_db: impl Fn(
2939            deterministic::Context,
2940        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2941    ) where
2942        V: ValueEncoding<Value = Digest>,
2943        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2944        C::Item: EncodeShared,
2945    {
2946        let mut db = open_db(context.child("first")).await;
2947
2948        let key = Sha256::fill(7u8);
2949        let v1 = Sha256::fill(17u8);
2950        let v2 = Sha256::fill(18u8);
2951        let k3 = Sha256::fill(8u8);
2952        let v3 = Sha256::fill(19u8);
2953
2954        // Commit A: Set(key, v1) with floor=0.
2955        commit_sets(&mut db, [(key, v1)], None).await;
2956
2957        // Commit B: Set(key, v2) with floor=0. get() returns v1 (earliest).
2958        commit_sets(&mut db, [(key, v2)], None).await;
2959        let second_size = db.bounds().await.end;
2960        assert_eq!(db.get(&key).await.unwrap(), Some(v1));
2961
2962        // Commit C: raises floor above both earlier writes.
2963        commit_sets_with_floor(&mut db, [(k3, v3)], None, second_size).await;
2964        db.sync().await.unwrap();
2965
2966        // Reopen: snapshot rebuilt from floor=second_size, key excluded.
2967        drop(db);
2968        let mut db = open_db(context.child("second")).await;
2969        assert!(db.get(&key).await.unwrap().is_none());
2970        assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
2971
2972        // Rewind to commit B: gap fill re-inserts both Set(key,...) entries.
2973        db.rewind(second_size).await.unwrap();
2974        assert_eq!(db.get(&key).await.unwrap(), Some(v1));
2975
2976        db.destroy().await.unwrap();
2977    }
2978
2979    /// Regression: after restart, the snapshot can contain the newer write for
2980    /// a repeated key. If rewind restores an older write for that same key, the
2981    /// older write must be checked first, matching the pre-restart snapshot.
2982    pub(crate) async fn test_immutable_rewind_after_reopen_mixed_gap_retained<F: Family, V, C>(
2983        context: deterministic::Context,
2984        open_db: impl Fn(
2985            deterministic::Context,
2986        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2987    ) where
2988        V: ValueEncoding<Value = Digest>,
2989        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2990        C::Item: EncodeShared,
2991    {
2992        let mut db = open_db(context.child("first")).await;
2993
2994        let key = Sha256::fill(7u8);
2995        let v1 = Sha256::fill(17u8);
2996        let v2 = Sha256::fill(18u8);
2997        let k3 = Sha256::fill(8u8);
2998        let v3 = Sha256::fill(19u8);
2999
3000        // Commit A: Set(key, v1) at loc=0, floor=0.
3001        commit_sets(&mut db, [(key, v1)], None).await;
3002        let first_size = db.bounds().await.end;
3003
3004        // Commit B: Set(key, v2), floor=0. get() returns v1 (earliest).
3005        commit_sets(&mut db, [(key, v2)], None).await;
3006        let second_size = db.bounds().await.end;
3007        assert_eq!(db.get(&key).await.unwrap(), Some(v1));
3008
3009        // Commit C: raises floor to first_size, so loc=0 is below floor but
3010        // loc for v2 is retained.
3011        commit_sets_with_floor(&mut db, [(k3, v3)], None, first_size).await;
3012        db.sync().await.unwrap();
3013
3014        // Reopen: snapshot rebuilt from floor=first_size. The v2 write for key
3015        // is retained; the v1 write is excluded.
3016        drop(db);
3017        let mut db = open_db(context.child("second")).await;
3018        assert_eq!(db.get(&key).await.unwrap(), Some(v2));
3019
3020        // Rewind to commit B: gap fill re-inserts the v1 write. The older
3021        // write must appear before the retained v2 entry so get() returns v1.
3022        db.rewind(second_size).await.unwrap();
3023        assert_eq!(db.get(&key).await.unwrap(), Some(v1));
3024
3025        db.destroy().await.unwrap();
3026    }
3027
3028    /// After committing with `floor = commit_loc` and pruning down to it, the live set is
3029    /// exactly one operation — the commit itself. This is the minimum non-empty live set
3030    /// achievable under the per-commit bound. The DB must remain fully usable:
3031    ///
3032    /// - `prune(commit_loc + 1)` is rejected (the floor is a hard ceiling).
3033    /// - `prune` does not affect the root (documented invariant).
3034    /// - Reopen reconstructs `inactivity_floor_loc` from the sole surviving commit op, and the
3035    ///   in-memory snapshot is empty (all Sets were below the floor).
3036    /// - A follow-on batch applies cleanly on top from the floor-at-max state.
3037    pub(crate) async fn test_immutable_single_commit_live_set<F: Family, V, C>(
3038        context: deterministic::Context,
3039        open_db: impl Fn(
3040            deterministic::Context,
3041        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
3042    ) where
3043        V: ValueEncoding<Value = Digest>,
3044        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
3045        C::Item: EncodeShared,
3046    {
3047        let mut db = open_db(context.child("test")).await;
3048
3049        // Initial commit is at loc 0. 3 sets + 1 commit → commit lands at loc 4.
3050        // Declare floor = 4 (= commit_loc), the tight maximum.
3051        let metadata = Sha256::fill(42u8);
3052        let commit_loc = Location::<F>::new(4);
3053        let k1 = Sha256::fill(1u8);
3054        let k2 = Sha256::fill(2u8);
3055        let k3 = Sha256::fill(3u8);
3056        let v1 = Sha256::fill(11u8);
3057        let v2 = Sha256::fill(12u8);
3058        let v3 = Sha256::fill(13u8);
3059        db.apply_batch(
3060            db.new_batch()
3061                .set(k1, v1)
3062                .set(k2, v2)
3063                .set(k3, v3)
3064                .merkleize(&db, Some(metadata), commit_loc),
3065        )
3066        .await
3067        .unwrap();
3068        db.commit().await.unwrap();
3069        assert_eq!(db.last_commit_loc, commit_loc);
3070        assert_eq!(db.inactivity_floor_loc(), commit_loc);
3071        let root_after_commit = db.root();
3072
3073        // All three keys are in the in-memory snapshot pre-prune.
3074        assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
3075        assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
3076        assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
3077
3078        // Prune at the floor — the maximum prune allowed.
3079        // Pruning is blob-aligned, so `bounds.start` may not physically advance all the way
3080        // to `commit_loc`; what matters semantically is that the floor authorizes pruning
3081        // of everything below the commit and that any further prune is rejected.
3082        db.prune(commit_loc).await.unwrap();
3083        let bounds = db.bounds().await;
3084        assert!(
3085            bounds.start <= commit_loc,
3086            "prune must not advance bounds.start past the floor"
3087        );
3088        assert_eq!(bounds.end, Location::new(*commit_loc + 1));
3089
3090        // Pruning one past the floor must be rejected — the floor is the hard ceiling.
3091        let err = db.prune(Location::new(*commit_loc + 1)).await.unwrap_err();
3092        assert!(matches!(err, Error::PruneBeyondMinRequired(p, f)
3093                if *p == *commit_loc + 1 && *f == *commit_loc));
3094
3095        // State preserved across the prune; root unchanged; commit metadata still readable.
3096        assert_eq!(db.last_commit_loc, commit_loc);
3097        assert_eq!(db.inactivity_floor_loc(), commit_loc);
3098        assert_eq!(db.root(), root_after_commit);
3099        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
3100
3101        // Persist and reopen. `init_from_journal` rebuilds the snapshot by replaying from
3102        // the floor (= commit_loc). The only op at/above the floor is the commit, which
3103        // contributes no keys — so the rebuilt snapshot is empty.
3104        db.sync().await.unwrap();
3105        drop(db);
3106        let mut db = open_db(context.child("reopened")).await;
3107        assert_eq!(db.last_commit_loc, commit_loc);
3108        assert_eq!(db.inactivity_floor_loc(), commit_loc);
3109        assert_eq!(db.root(), root_after_commit);
3110        // The commit op at `commit_loc` is the anchor that survived pruning — its metadata
3111        // must come back through `get_metadata` after the snapshot rebuild.
3112        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
3113
3114        // Keys set below the floor are excluded from the rebuilt snapshot.
3115        assert!(db.get(&k1).await.unwrap().is_none());
3116        assert!(db.get(&k2).await.unwrap().is_none());
3117        assert!(db.get(&k3).await.unwrap().is_none());
3118
3119        // A follow-on batch applies on top. Monotonicity requires the new floor to be at
3120        // least `commit_loc` (= 4); advancing to the new tight max (= 6) exercises the
3121        // floor-at-max → new-batch transition.
3122        let k4 = Sha256::fill(4u8);
3123        let v4 = Sha256::fill(14u8);
3124        let next_commit_loc = Location::<F>::new(6);
3125        db.apply_batch(
3126            db.new_batch()
3127                .set(k4, v4)
3128                .merkleize(&db, None, next_commit_loc),
3129        )
3130        .await
3131        .unwrap();
3132        db.commit().await.unwrap();
3133        assert_eq!(db.last_commit_loc, next_commit_loc);
3134        assert_eq!(db.inactivity_floor_loc(), next_commit_loc);
3135
3136        // New key readable; keys from the pre-prune batch remain excluded.
3137        assert_eq!(db.get(&k4).await.unwrap(), Some(v4));
3138        assert!(db.get(&k1).await.unwrap().is_none());
3139        // Follow-on commit replaced the anchor: its metadata was `None`, so `get_metadata`
3140        // should no longer return the original metadata.
3141        assert_eq!(db.get_metadata().await.unwrap(), None);
3142
3143        db.destroy().await.unwrap();
3144    }
3145
3146    /// `get_many` on the DB and on unmerkleized/merkleized batches returns results
3147    /// that match individual `get` calls.
3148    pub(crate) async fn test_immutable_get_many<F: Family, V, C>(
3149        context: deterministic::Context,
3150        open_db: impl Fn(
3151            deterministic::Context,
3152        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
3153    ) where
3154        V: ValueEncoding<Value = Digest>,
3155        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
3156        C::Item: EncodeShared,
3157    {
3158        let mut db = open_db(context.child("db")).await;
3159
3160        let k1 = Sha256::fill(1u8);
3161        let k2 = Sha256::fill(2u8);
3162        let k3 = Sha256::fill(3u8);
3163        let k_missing = Sha256::fill(99u8);
3164
3165        let v1 = Sha256::fill(11u8);
3166        let v2 = Sha256::fill(12u8);
3167        let v3 = Sha256::fill(13u8);
3168
3169        // Commit k1 and k2 to disk.
3170        db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
3171            &db,
3172            None,
3173            db.inactivity_floor_loc(),
3174        ))
3175        .await
3176        .unwrap();
3177        db.commit().await.unwrap();
3178
3179        // DB-level get_many.
3180        let results = db.get_many(&[&k1, &k2, &k_missing]).await.unwrap();
3181        assert_eq!(results, vec![Some(v1), Some(v2), None]);
3182
3183        // Empty input.
3184        let results = db.get_many(&([] as [&Digest; 0])).await.unwrap();
3185        assert!(results.is_empty());
3186
3187        // Unmerkleized batch: mutations + DB fallthrough.
3188        let batch = db.new_batch().set(k3, v3);
3189        let results = batch.get_many(&[&k3, &k1, &k_missing], &db).await.unwrap();
3190        assert_eq!(results, vec![Some(v3), Some(v1), None]);
3191
3192        // Merkleized batch: diff + parent chain + DB fallthrough.
3193        let parent = db
3194            .new_batch()
3195            .set(k3, v3)
3196            .merkleize(&db, None, db.inactivity_floor_loc());
3197        let results = parent.get_many(&[&k1, &k3, &k_missing], &db).await.unwrap();
3198        assert_eq!(results, vec![Some(v1), Some(v3), None]);
3199
3200        // Child of merkleized parent reads parent diff.
3201        let v3_new = Sha256::fill(30u8);
3202        let child = parent.new_batch::<Sha256>().set(k3, v3_new);
3203        let results = child.get_many(&[&k1, &k3, &k_missing], &db).await.unwrap();
3204        assert_eq!(results, vec![Some(v1), Some(v3_new), None]);
3205
3206        db.destroy().await.unwrap();
3207    }
3208
3209    /// `get_many` reports unexpected data when the snapshot points at a non-`Set` operation.
3210    pub(crate) async fn test_immutable_get_many_unexpected_data<F: Family, V, C>(
3211        context: deterministic::Context,
3212        open_db: impl Fn(
3213            deterministic::Context,
3214        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
3215    ) where
3216        V: ValueEncoding<Value = Digest>,
3217        C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
3218        C::Item: EncodeShared,
3219    {
3220        let mut db = open_db(context.child("db")).await;
3221
3222        let key = Sha256::fill(1u8);
3223        let value = Sha256::fill(11u8);
3224        db.apply_batch(db.new_batch().set(key, value).merkleize(
3225            &db,
3226            None,
3227            db.inactivity_floor_loc(),
3228        ))
3229        .await
3230        .unwrap();
3231        db.commit().await.unwrap();
3232
3233        let bad_key = Sha256::fill(99u8);
3234        let bad_loc = db.last_commit_loc;
3235        db.snapshot.insert(&bad_key, bad_loc);
3236
3237        let err = db.get(&bad_key).await.unwrap_err();
3238        assert!(matches!(err, Error::UnexpectedData(loc) if loc == bad_loc));
3239
3240        let err = db.get_many(&[&bad_key]).await.unwrap_err();
3241        assert!(matches!(err, Error::UnexpectedData(loc) if loc == bad_loc));
3242
3243        db.destroy().await.unwrap();
3244    }
3245}