// commonware_storage/qmdb/immutable/mod.rs
//! An authenticated database that only supports adding new keyed values (no updates or
//! deletions).
//!
//! Two variants are available based on value encoding:
//! - [fixed]: For fixed-size values.
//! - [variable]: For variable-size values.
//!
//! # Examples
//!
//! ```ignore
//! // Simple mode: apply a batch, then durably commit it.
//! let merkleized = db.new_batch()
//!     .set(key, value)
//!     .merkleize(&db, None);
//! db.apply_batch(merkleized).await?;
//! db.commit().await?;
//! ```
//!
//! ```ignore
//! // Batches can still fork before you apply them.
//! let parent = db.new_batch()
//!     .set(key_a, value_a)
//!     .merkleize(&db, None);
//!
//! let child_a = parent.new_batch::<Sha256>()
//!     .set(key_b, value_b)
//!     .merkleize(&db, None);
//!
//! let child_b = parent.new_batch::<Sha256>()
//!     .set(key_c, value_c)
//!     .merkleize(&db, None);
//!
//! db.apply_batch(child_a).await?;
//! db.commit().await?;
//! ```
//!
//! ```ignore
//! // Advanced mode: while the previous batch is being committed, build exactly
//! // one child batch from the newly published state.
//! let parent = db.new_batch()
//!     .set(key_a, value_a)
//!     .merkleize(&db, None);
//! db.apply_batch(parent).await?;
//!
//! let (child, commit_result) = futures::join!(
//!     async {
//!         db.new_batch()
//!             .set(key_b, value_b)
//!             .merkleize(&db, None)
//!     },
//!     db.commit(),
//! );
//! commit_result?;
//!
//! db.apply_batch(child).await?;
//! db.commit().await?;
//! ```

59use crate::{
60    index::{unordered::Index, Unordered as _},
61    journal::{
62        authenticated,
63        contiguous::{Contiguous, Mutable, Reader},
64        Error as JournalError,
65    },
66    merkle::{journaled::Config as MmrConfig, Family, Location, Proof},
67    qmdb::{any::ValueEncoding, build_snapshot_from_log, delete_known_loc, operation::Key, Error},
68    translator::Translator,
69    Context, Persistable,
70};
71use commonware_codec::EncodeShared;
72use commonware_cryptography::Hasher as CHasher;
73use std::{collections::BTreeSet, num::NonZeroU64, ops::Range, sync::Arc};
74use tracing::warn;
75
/// Speculative batch construction (see [Immutable::new_batch] / [Immutable::apply_batch]).
pub mod batch;
/// Db variant for fixed-size values.
pub mod fixed;
// Log operation encoding; its `Operation` type is re-exported below.
mod operation;
pub mod sync;
/// Db variant for variable-size values.
pub mod variable;

/// The operation type stored in the authenticated log.
pub use operation::Operation;
83
/// Configuration for an [Immutable] authenticated db.
///
/// `T` translates full keys into their compressed index representation; `J` is the
/// journal-specific configuration for the operations log (it varies between the
/// [fixed] and [variable] instantiations).
#[derive(Clone)]
pub struct Config<T: Translator, J> {
    /// Configuration for the Merkle structure backing the authenticated journal.
    pub merkle_config: MmrConfig,

    /// Configuration for the operations log journal.
    pub log: J,

    /// The translator used by the compressed index.
    pub translator: T,
}
96
/// An authenticated database that only supports adding new keyed values (no updates or
/// deletions).
///
/// # Invariant
///
/// A key must be set at most once across the database history. Writing the same key more than
/// once is undefined behavior.
///
/// Use [fixed::Db] or [variable::Db] for concrete instantiations.
pub struct Immutable<
    F: Family,
    E: Context,
    K: Key,
    V: ValueEncoding,
    C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
    H: CHasher,
    T: Translator,
> where
    C::Item: EncodeShared,
{
    /// Authenticated journal of operations.
    pub(crate) journal: authenticated::Journal<F, E, C, H>,

    /// A map from each active key to the location of the operation that set its value.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Set].
    ///
    /// Keys are stored under their translated (compressed) form, so distinct keys may
    /// collide; readers must confirm the full key at each referenced location (see `get`).
    pub(crate) snapshot: Index<T, Location<F>>,

    /// The location of the last commit operation.
    pub(crate) last_commit_loc: Location<F>,
}
130
// Shared read-only functionality.
impl<F, E, K, V, C, H, T> Immutable<F, E, K, V, C, H, T>
where
    F: Family,
    E: Context,
    K: Key,
    V: ValueEncoding,
    C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
    H: CHasher,
    T: Translator,
{
    /// Initialize from a pre-constructed authenticated journal.
    ///
    /// Seeds an initial commit if the journal is empty, builds the in-memory snapshot,
    /// and returns the initialized database.
    pub(crate) async fn init_from_journal(
        mut journal: authenticated::Journal<F, E, C, H>,
        context: E,
        translator: T,
    ) -> Result<Self, Error<F>> {
        // An empty journal means a brand-new db: seed it with an initial commit so the
        // `last_commit_loc` invariant holds from location 0 onward.
        if journal.size().await == 0 {
            warn!("Authenticated log is empty, initialized new db.");
            journal.append(&Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        let mut snapshot = Index::new(context.with_label("snapshot"), translator);

        let last_commit_loc = {
            // Get the start of the log.
            let reader = journal.journal.reader().await;
            let start_loc = Location::new(reader.bounds().start);

            // Build snapshot from the log.
            build_snapshot_from_log::<F, _, _, _>(start_loc, &reader, &mut snapshot, |_, _| {})
                .await?;

            // The last retained operation is the last commit; the seed above guarantees
            // the journal is non-empty, so the subtraction cannot underflow.
            Location::new(
                reader
                    .bounds()
                    .end
                    .checked_sub(1)
                    .expect("commit should exist"),
            )
        };

        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
        })
    }

    /// Return the Location of the next operation appended to this db.
    pub async fn size(&self) -> Location<F> {
        self.bounds().await.end
    }

    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
    /// retained operations respectively.
    pub async fn bounds(&self) -> Range<Location<F>> {
        let bounds = self.journal.reader().await.bounds();
        Location::new(bounds.start)..Location::new(bounds.end)
    }

    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
    /// has been pruned.
    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error<F>> {
        // The snapshot is keyed by translated keys, which may collide; iterate all
        // candidate locations and confirm the full key at each.
        let iter = self.snapshot.get(key);
        let reader = self.journal.reader().await;
        let oldest = reader.bounds().start;
        for &loc in iter {
            // Skip candidates pointing into the pruned prefix of the log.
            if loc < oldest {
                continue;
            }
            if let Some(v) = Self::get_from_loc(&reader, key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
    /// [`crate::qmdb::Error::OperationPruned`] if loc precedes the oldest retained location. The
    /// location is otherwise assumed valid.
    async fn get_from_loc(
        reader: &impl Reader<Item = Operation<K, V>>,
        key: &K,
        loc: Location<F>,
    ) -> Result<Option<V::Value>, Error<F>> {
        if loc < reader.bounds().start {
            return Err(Error::OperationPruned(loc));
        }

        // Snapshot invariant: entries only reference Set operations, so anything else
        // at this location indicates corruption.
        let Operation::Set(k, v) = reader.read(*loc).await? else {
            return Err(Error::UnexpectedData(loc));
        };

        // A translated-key collision: the stored key differs from the requested one.
        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V::Value>, Error<F>> {
        let last_commit_loc = self.last_commit_loc;
        let Operation::Commit(metadata) = self
            .journal
            .journal
            .reader()
            .await
            .read(*last_commit_loc)
            .await?
        else {
            // `last_commit_loc` always points at a Commit (see init/rewind/apply_batch).
            unreachable!("no commit operation at location of last commit {last_commit_loc}");
        };

        Ok(metadata)
    }

    /// Analogous to proof but with respect to the state of the database when it had `op_count`
    /// operations.
    ///
    /// # Errors
    ///
    /// Returns [crate::merkle::Error::LocationOverflow] if `op_count` or `start_loc` >
    /// [crate::merkle::Family::MAX_LEAVES].
    /// Returns [crate::merkle::Error::RangeOutOfBounds] if `op_count` > number of operations, or
    /// if `start_loc` >= `op_count`.
    /// Returns [`Error::OperationPruned`] if `start_loc` has been pruned.
    pub async fn historical_proof(
        &self,
        op_count: Location<F>,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<K, V>>), Error<F>> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
    pub async fn proof(
        &self,
        start_index: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<K, V>>), Error<F>> {
        // A proof against the current state is just a historical proof at the current size.
        let op_count = self.bounds().await.end;
        self.historical_proof(op_count, start_index, max_ops).await
    }

    /// Prune operations prior to `loc`. This does not affect the db's root, but it will
    /// affect retrieval of any keys that were set prior to `loc`.
    ///
    /// # Errors
    ///
    /// - Returns [Error::PruneBeyondMinRequired] if `loc` > last commit location.
    /// - Returns [crate::merkle::Error::LocationOverflow] if `loc` > [crate::merkle::Family::MAX_LEAVES].
    pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
        // Never prune past the last commit: that operation must remain retained.
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Rewind the database to `size` operations, where `size` is the location of the next append.
    ///
    /// This rewinds both the operations journal and its Merkle structure to the historical
    /// state at `size`, and removes rewound set operations from the in-memory snapshot.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `size` is not a valid rewind target
    /// - the target's required logical range is not fully retained (for immutable, this means the
    ///   oldest retained location is already beyond the rewind boundary)
    /// - `size - 1` is not a commit operation
    ///
    /// Any error from this method is fatal for this handle. Rewind may mutate journal state
    /// before this method finishes rebuilding in-memory rewind state. Callers must drop this
    /// database handle after any `Err` from `rewind` and reopen from storage.
    ///
    /// A successful rewind is not restart-stable until a subsequent [`Immutable::commit`] or
    /// [`Immutable::sync`].
    pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
        let rewind_size = *size;
        let current_size = *self.last_commit_loc + 1;
        // Rewinding to the current size is a no-op.
        if rewind_size == current_size {
            return Ok(());
        }
        // Cannot rewind past the seeded initial commit or forward beyond current size.
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(crate::journal::Error::InvalidRewind(
                rewind_size,
            )));
        }

        let (rewind_last_loc, rewound_sets) = {
            let reader = self.journal.reader().await;
            let bounds = reader.bounds();
            let rewind_last_loc = Location::new(rewind_size - 1);
            // The rewind boundary must still be retained (not pruned away).
            if rewind_size <= bounds.start {
                return Err(Error::Journal(crate::journal::Error::ItemPruned(
                    *rewind_last_loc,
                )));
            }
            // The new final operation must be a commit.
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            if !matches!(rewind_last_op, Operation::Commit(_)) {
                return Err(Error::UnexpectedData(rewind_last_loc));
            }

            // Immutable operations do not have an inactivity floor (`Operation::has_floor()`
            // is always `None` here). Rewind validity is determined by retained-location bounds
            // and commit-target checks. Duplicate keys are unsupported and undefined behavior.
            //
            // Collect every Set in the range being discarded so the snapshot entries can be
            // removed after the journal rewind.
            let mut rewound_sets = Vec::new();
            for loc in rewind_size..current_size {
                if let Operation::Set(key, _) = reader.read(loc).await? {
                    rewound_sets.push((Location::new(loc), key));
                }
            }

            (rewind_last_loc, rewound_sets)
        };

        // Journal rewind happens before in-memory snapshot updates. If a later step fails, this
        // handle may be internally diverged and must be dropped by the caller.
        self.journal.rewind(rewind_size).await?;
        for (loc, key) in rewound_sets {
            delete_known_loc(&mut self.snapshot, &key, loc);
        }
        self.last_commit_loc = rewind_last_loc;

        Ok(())
    }

    /// Return the root of the db.
    pub fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Return the pinned Merkle nodes at the given location.
    pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<H::Digest>, Error<F>> {
        if !loc.is_valid() {
            return Err(crate::merkle::Error::LocationOverflow(loc).into());
        }
        // Fetch all required nodes concurrently; a missing (pruned) node fails the call.
        let futs: Vec<_> = F::nodes_to_pin(loc)
            .map(|p| async move {
                self.journal
                    .merkle
                    .get_node(p)
                    .await?
                    .ok_or(crate::merkle::Error::ElementPruned(p).into())
            })
            .collect();
        futures::future::try_join_all(futs).await
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&self) -> Result<(), Error<F>> {
        Ok(self.journal.sync().await?)
    }

    /// Durably commit the journal state published by prior [`Immutable::apply_batch`] calls.
    pub async fn commit(&self) -> Result<(), Error<F>> {
        Ok(self.journal.commit().await?)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error<F>> {
        Ok(self.journal.destroy().await?)
    }

    /// Create a new speculative batch of operations with this database as its parent.
    #[allow(clippy::type_complexity)]
    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<F, H, K, V> {
        // Batches are anchored to the operation following the last applied commit.
        let journal_size = *self.last_commit_loc + 1;
        batch::UnmerkleizedBatch::new(self, journal_size)
    }

    /// Apply a [`batch::MerkleizedBatch`] to the database.
    ///
    /// A batch is valid only if every batch applied to the database since this batch's
    /// ancestor chain was created is an ancestor of this batch. Applying a batch from a
    /// different fork returns [`Error::StaleBatch`].
    ///
    /// Returns the range of locations written.
    ///
    /// This publishes the batch to the in-memory database state and appends it to the
    /// journal, but does not durably commit it. Call [`Immutable::commit`] or
    /// [`Immutable::sync`] to guarantee durability.
    pub async fn apply_batch(
        &mut self,
        batch: Arc<batch::MerkleizedBatch<F, H::Digest, K, V>>,
    ) -> Result<Range<Location<F>>, Error<F>> {
        // The batch must attach exactly at the current db size: either it was built on
        // this size, on the base size, or one of its ancestors ends here.
        let db_size = *self.last_commit_loc + 1;
        let valid = db_size == batch.db_size
            || db_size == batch.base_size
            || batch.ancestor_diff_ends.contains(&db_size);
        if !valid {
            return Err(Error::StaleBatch {
                db_size,
                batch_db_size: batch.db_size,
                batch_base_size: batch.base_size,
            });
        }
        let start_loc = Location::new(db_size);

        // Apply journal.
        self.journal.apply_batch(&batch.journal_batch).await?;

        // Apply snapshot inserts. Child first (child wins via `seen`), then
        // uncommitted ancestor batches.
        let bounds = self.journal.reader().await.bounds();
        let mut seen = BTreeSet::new();
        for (key, entry) in batch.diff.iter() {
            seen.insert(key.clone());
            self.snapshot
                .insert_and_prune(key, entry.loc, |v| *v < bounds.start);
        }
        for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
            // Skip ancestor segments already present in the db.
            if batch.ancestor_diff_ends[i] <= db_size {
                continue;
            }
            for (key, entry) in ancestor_diff.iter() {
                if seen.insert(key.clone()) {
                    self.snapshot
                        .insert_and_prune(key, entry.loc, |v| *v < bounds.start);
                }
            }
        }

        // Update state.
        self.last_commit_loc = Location::new(batch.total_size - 1);
        Ok(start_loc..Location::new(batch.total_size))
    }
}
480
481#[cfg(test)]
482pub(super) mod test {
483    use super::*;
484    use crate::{
485        merkle::{Family, Location},
486        qmdb::verify_proof,
487        translator::TwoCap,
488    };
489    use commonware_codec::EncodeShared;
490    use commonware_cryptography::{sha256, sha256::Digest, Sha256};
491    use commonware_runtime::{deterministic, Metrics};
492    use commonware_utils::NZU64;
493    use core::{future::Future, pin::Pin};
494    use std::ops::Range;
495
    // Hasher used when verifying proofs in these tests.
    type StandardHasher<H> = crate::merkle::hasher::Standard<H>;

    // NOTE(review): not referenced in this chunk; presumably consumed by the
    // per-variant `open_db` configurations — confirm before removing.
    const ITEMS_PER_SECTION: u64 = 5;

    // Database type under test, generic over Merkle family `F`, value encoding `V`,
    // and log container `C`.
    type TestDb<F, V, C> = Immutable<F, deterministic::Context, Digest, V, C, Sha256, TwoCap>;
501
502    pub(crate) async fn test_immutable_empty<F: Family, V, C>(
503        context: deterministic::Context,
504        open_db: impl Fn(
505            deterministic::Context,
506        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
507    ) where
508        V: ValueEncoding<Value = Digest>,
509        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
510        C::Item: EncodeShared,
511    {
512        let db = open_db(context.with_label("first")).await;
513        let bounds = db.bounds().await;
514        assert_eq!(bounds.end, 1);
515        assert_eq!(bounds.start, Location::new(0));
516        assert!(db.get_metadata().await.unwrap().is_none());
517
518        // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
519        let k1 = Sha256::fill(1u8);
520        let v1 = Sha256::fill(2u8);
521        let root = db.root();
522        {
523            let _batch = db.new_batch().set(k1, v1);
524            // Don't merkleize/apply -- simulate failed commit
525        }
526        drop(db);
527        let mut db = open_db(context.with_label("second")).await;
528        assert_eq!(db.root(), root);
529        assert_eq!(db.bounds().await.end, 1);
530
531        // Test calling commit on an empty db which should make it (durably) non-empty.
532        db.apply_batch(db.new_batch().merkleize(&db, None))
533            .await
534            .unwrap();
535        db.commit().await.unwrap();
536        assert_eq!(db.bounds().await.end, 2); // commit op added
537        let root = db.root();
538        drop(db);
539
540        let db = open_db(context.with_label("third")).await;
541        assert_eq!(db.root(), root);
542
543        db.destroy().await.unwrap();
544    }
545
    /// Build a db with two committed keys, check per-commit metadata semantics, and
    /// verify crash recovery restores the last commit point.
    pub(crate) async fn test_immutable_build_basic<F: Family, V, C>(
        context: deterministic::Context,
        open_db: impl Fn(
            deterministic::Context,
        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
    ) where
        V: ValueEncoding<Value = Digest>,
        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
        C::Item: EncodeShared,
    {
        // Build a db with 2 keys.
        let mut db = open_db(context.with_label("first")).await;

        let k1 = Sha256::fill(1u8);
        let k2 = Sha256::fill(2u8);
        let v1 = Sha256::fill(3u8);
        let v2 = Sha256::fill(4u8);

        assert!(db.get(&k1).await.unwrap().is_none());
        assert!(db.get(&k2).await.unwrap().is_none());

        // Set and commit the first key.
        let metadata = Some(Sha256::fill(99u8));
        db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, metadata))
            .await
            .unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
        assert!(db.get(&k2).await.unwrap().is_none());
        // 1 seeded commit + 1 set + 1 commit = 3 operations.
        assert_eq!(db.bounds().await.end, 3);
        assert_eq!(db.get_metadata().await.unwrap(), Some(Sha256::fill(99u8)));

        // Set and commit the second key.
        db.apply_batch(db.new_batch().set(k2, v2).merkleize(&db, None))
            .await
            .unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
        assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
        // Two more operations (set + commit) were appended.
        assert_eq!(db.bounds().await.end, 5);
        // Metadata is per-commit: the latest commit carried None.
        assert_eq!(db.get_metadata().await.unwrap(), None);

        // Capture state.
        let root = db.root();

        // Add an uncommitted op then simulate failure.
        let k3 = Sha256::fill(5u8);
        let v3 = Sha256::fill(6u8);
        {
            let _batch = db.new_batch().set(k3, v3);
            // Don't merkleize/apply -- simulate failed commit
        }

        // Reopen, make sure state is restored to last commit point.
        drop(db); // Simulate failed commit
        let db = open_db(context.with_label("second")).await;
        assert!(db.get(&k3).await.unwrap().is_none());
        assert_eq!(db.root(), root);
        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
        assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
        assert_eq!(db.bounds().await.end, 5);
        assert_eq!(db.get_metadata().await.unwrap(), None);

        // Cleanup.
        db.destroy().await.unwrap();
    }
612
613    pub(crate) async fn test_immutable_proof_verify<F: Family, V, C>(
614        context: deterministic::Context,
615        open_db: impl Fn(
616            deterministic::Context,
617        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
618    ) where
619        V: ValueEncoding<Value = Digest>,
620        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
621        C::Item: EncodeShared,
622    {
623        let mut db = open_db(context.with_label("first")).await;
624
625        let k1 = Sha256::fill(1u8);
626        let v1 = Sha256::fill(10u8);
627        db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, None))
628            .await
629            .unwrap();
630        db.commit().await.unwrap();
631
632        let (proof, ops) = db.proof(Location::new(0), NZU64!(100)).await.unwrap();
633        let root = db.root();
634        let hasher = StandardHasher::<Sha256>::new();
635        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
636
637        db.destroy().await.unwrap();
638    }
639
640    pub(crate) async fn test_immutable_prune<F: Family, V, C>(
641        context: deterministic::Context,
642        open_db: impl Fn(
643            deterministic::Context,
644        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
645    ) where
646        V: ValueEncoding<Value = Digest>,
647        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
648        C::Item: EncodeShared,
649    {
650        let mut db = open_db(context.with_label("first")).await;
651
652        for i in 0..20u8 {
653            let key = Sha256::fill(i);
654            let value = Sha256::fill(i.wrapping_add(100));
655            db.apply_batch(db.new_batch().set(key, value).merkleize(&db, None))
656                .await
657                .unwrap();
658            db.commit().await.unwrap();
659        }
660
661        let root_before = db.root();
662        let bounds_before = db.bounds().await;
663
664        let prune_loc = Location::new(*bounds_before.end - 5);
665        db.prune(prune_loc).await.unwrap();
666
667        assert_eq!(db.root(), root_before);
668
669        let key_0 = Sha256::fill(0u8);
670        assert!(db.get(&key_0).await.unwrap().is_none());
671
672        let key_19 = Sha256::fill(19u8);
673        assert_eq!(
674            db.get(&key_19).await.unwrap(),
675            Some(Sha256::fill(19u8.wrapping_add(100)))
676        );
677
678        db.destroy().await.unwrap();
679    }
680
    /// Exercise parent -> child batch chaining: a child batch sees its ancestor's
    /// writes before either is applied, and applying the child publishes both.
    pub(crate) async fn test_immutable_batch_chain<F: Family, V, C>(
        context: deterministic::Context,
        open_db: impl Fn(
            deterministic::Context,
        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
    ) where
        V: ValueEncoding<Value = Digest>,
        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
        C::Item: EncodeShared,
    {
        let mut db = open_db(context.with_label("first")).await;

        let k1 = Sha256::fill(1u8);
        let k2 = Sha256::fill(2u8);
        let k3 = Sha256::fill(3u8);
        let v1 = Sha256::fill(11u8);
        let v2 = Sha256::fill(12u8);
        let v3 = Sha256::fill(13u8);

        // Chain a child batch off an unapplied parent batch.
        let parent = db.new_batch().set(k1, v1).merkleize(&db, None);
        let child = parent
            .new_batch::<Sha256>()
            .set(k2, v2)
            .merkleize(&db, None);

        // The child observes both its own write and its ancestor's.
        assert_eq!(child.get(&k1, &db).await.unwrap(), Some(v1));
        assert_eq!(child.get(&k2, &db).await.unwrap(), Some(v2));
        assert!(child.get(&k3, &db).await.unwrap().is_none());

        // Applying the child publishes the whole ancestor chain.
        db.apply_batch(child).await.unwrap();
        db.commit().await.unwrap();

        assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
        assert_eq!(db.get(&k2).await.unwrap(), Some(v2));

        // Subsequent batches build on the newly committed state as usual.
        db.apply_batch(db.new_batch().set(k3, v3).merkleize(&db, None))
            .await
            .unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get(&k3).await.unwrap(), Some(v3));

        db.destroy().await.unwrap();
    }
724
    /// Build a db with 2,000 key/value pairs, verify state survives reopen, and prove
    /// every 5-operation range (including the truncated tail) against the root.
    pub(crate) async fn test_immutable_build_and_authenticate<F: Family, V, C>(
        context: deterministic::Context,
        open_db: impl Fn(
            deterministic::Context,
        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
    ) where
        V: ValueEncoding<Value = Digest>,
        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
        C::Item: EncodeShared,
    {
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        let hasher = StandardHasher::<Sha256>::new();
        let mut db = open_db(context.with_label("first")).await;

        let mut batch = db.new_batch();
        for i in 0u64..2_000 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = Sha256::fill(i as u8);
            batch = batch.set(k, v);
        }
        let merkleized = batch.merkleize(&db, None);
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        // 2000 sets + seeded commit + this commit.
        assert_eq!(db.bounds().await.end, 2_000 + 2);

        // Drop & reopen the db, making sure it has exactly the same state.
        let root = db.root();
        drop(db);

        let db = open_db(context.with_label("second")).await;
        assert_eq!(root, db.root());
        assert_eq!(db.bounds().await.end, 2_000 + 2);
        for i in 0u64..2_000 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = Sha256::fill(i as u8);
            assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
        }

        // Make sure all ranges of 5 operations are provable, including truncated ranges at the
        // end.
        let max_ops = NZU64!(5);
        for i in 0..*db.bounds().await.end {
            let (proof, log) = db.proof(Location::new(i), max_ops).await.unwrap();
            assert!(verify_proof(&hasher, &proof, Location::new(i), &log, &root));
        }

        db.destroy().await.unwrap();
    }
773
    /// Simulate a crash after commit but before sync: recovery must replay the log to
    /// regenerate the Merkle structure, preserving the committed state and root.
    pub(crate) async fn test_immutable_recovery_from_failed_merkle_sync<F: Family, V, C>(
        context: deterministic::Context,
        open_db: impl Fn(
            deterministic::Context,
        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
    ) where
        V: ValueEncoding<Value = Digest>,
        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
        C::Item: EncodeShared,
    {
        // Insert 1000 keys then sync.
        const ELEMENTS: u64 = 1000;
        let mut db = open_db(context.with_label("first")).await;

        let mut batch = db.new_batch();
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = Sha256::fill(i as u8);
            batch = batch.set(k, v);
        }
        let merkleized = batch.merkleize(&db, None);
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.bounds().await.end, ELEMENTS + 2);
        db.sync().await.unwrap();
        let halfway_root = db.root();

        // Insert another 1000 keys then commit.
        // NOTE(review): these reuse the same derived keys as the first batch, which
        // re-sets existing keys — relies on undefined-but-tolerated duplicate-key
        // behavior for this recovery test; confirm this is intentional.
        let mut batch = db.new_batch();
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = Sha256::fill(i as u8);
            batch = batch.set(k, v);
        }
        let merkleized = batch.merkleize(&db, None);
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        drop(db); // Drop before syncing

        // Recovery should replay the log to regenerate the merkle structure.
        // op_count = 1002 (first batch + commit) + 1000 (second batch) + 1 (second commit) = 2003
        let db = open_db(context.with_label("second")).await;
        assert_eq!(db.bounds().await.end, 2003);
        let root = db.root();
        assert_ne!(root, halfway_root);

        // Drop & reopen could preserve the final commit.
        drop(db);
        let db = open_db(context.with_label("third")).await;
        assert_eq!(db.bounds().await.end, 2003);
        assert_eq!(db.root(), root);

        db.destroy().await.unwrap();
    }
828
829    pub(crate) async fn test_immutable_recovery_from_failed_log_sync<F: Family, V, C>(
830        context: deterministic::Context,
831        open_db: impl Fn(
832            deterministic::Context,
833        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
834    ) where
835        V: ValueEncoding<Value = Digest>,
836        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
837        C::Item: EncodeShared,
838    {
839        let mut db = open_db(context.with_label("first")).await;
840
841        // Insert a single key and then commit to create a first commit point.
842        let k1 = Sha256::fill(1u8);
843        let v1 = Sha256::fill(3u8);
844        db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, None))
845            .await
846            .unwrap();
847        db.commit().await.unwrap();
848        let first_commit_root = db.root();
849
850        // Simulate failure. Sets that are never merkleized/applied are lost.
851        // Recovery should restore the last commit point.
852        drop(db);
853
854        // Recovery should back up to previous commit point.
855        let db = open_db(context.with_label("second")).await;
856        assert_eq!(db.bounds().await.end, 3);
857        let root = db.root();
858        assert_eq!(root, first_commit_root);
859
860        db.destroy().await.unwrap();
861    }
862
    /// Exercises pruning of an immutable db: prunes at a section boundary and
    /// at a non-boundary location, verifies pruned keys become unreadable
    /// while retained keys stay readable, confirms the pruning boundary
    /// survives restarts, and checks that proofs over pruned locations fail
    /// with `ItemPruned`.
    pub(crate) async fn test_immutable_pruning<F: Family, V, C>(
        context: deterministic::Context,
        open_db: impl Fn(
            deterministic::Context,
        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
    ) where
        V: ValueEncoding<Value = Digest>,
        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
        C::Item: EncodeShared,
    {
        // Build a db with `ELEMENTS` key/value pairs then prune some of them.
        const ELEMENTS: u64 = 2_000;
        let mut db = open_db(context.with_label("first")).await;

        // Batch writes keys in BTreeMap-sorted order, so build the sorted key
        // list to map between journal locations and keys.
        let mut sorted_keys: Vec<sha256::Digest> = (1u64..ELEMENTS + 1)
            .map(|i| Sha256::hash(&i.to_be_bytes()))
            .collect();
        sorted_keys.sort();
        // Location 0: initial commit; locations 1..=ELEMENTS: Set ops in sorted
        // key order; location ELEMENTS+1: batch commit.
        // key_at_loc(L) = sorted_keys[L - 1] for 1 <= L <= ELEMENTS.

        let mut batch = db.new_batch();
        for i in 1u64..ELEMENTS + 1 {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = Sha256::fill(i as u8);
            batch = batch.set(k, v);
        }
        let merkleized = batch.merkleize(&db, None);
        db.apply_batch(merkleized).await.unwrap();
        assert_eq!(db.bounds().await.end, ELEMENTS + 2);

        // Prune the db to the first half of the operations.
        db.prune(Location::new((ELEMENTS + 2) / 2)).await.unwrap();
        let bounds = db.bounds().await;
        // Pruning advances only the start of the bounds; the end is unchanged.
        assert_eq!(bounds.end, ELEMENTS + 2);

        // items_per_section is 5, so half should be exactly at a blob boundary, in which case
        // the actual pruning location should match the requested.
        let oldest_retained_loc = bounds.start;
        assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));

        // Try to fetch a pruned key (at location oldest_retained - 1).
        let pruned_key = sorted_keys[*oldest_retained_loc as usize - 2];
        assert!(db.get(&pruned_key).await.unwrap().is_none());

        // Try to fetch unpruned key (at location oldest_retained).
        let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
        assert!(db.get(&unpruned_key).await.unwrap().is_some());

        // Drop & reopen the db, making sure it has exactly the same state.
        let root = db.root();
        db.sync().await.unwrap();
        drop(db);

        let mut db = open_db(context.with_label("second")).await;
        assert_eq!(root, db.root());
        let bounds = db.bounds().await;
        assert_eq!(bounds.end, ELEMENTS + 2);
        let oldest_retained_loc = bounds.start;
        assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));

        // Prune to a non-blob boundary.
        let loc = Location::new(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
        db.prune(loc).await.unwrap();
        // Actual boundary should be a multiple of 5: the requested location is
        // rounded down to a section boundary (ELEMENTS/2 + ITEMS_PER_SECTION).
        let oldest_retained_loc = db.bounds().await.start;
        assert_eq!(
            oldest_retained_loc,
            Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
        );

        // Confirm boundary persists across restart.
        db.sync().await.unwrap();
        drop(db);
        let db = open_db(context.with_label("third")).await;
        let oldest_retained_loc = db.bounds().await.start;
        assert_eq!(
            oldest_retained_loc,
            Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
        );

        // Try to fetch a pruned key (at location oldest_retained - 3).
        let pruned_key = sorted_keys[*oldest_retained_loc as usize - 4];
        assert!(db.get(&pruned_key).await.unwrap().is_none());

        // Try to fetch unpruned key (at location oldest_retained).
        let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
        assert!(db.get(&unpruned_key).await.unwrap().is_some());

        // Confirm behavior of trying to create a proof of pruned items is as expected.
        // The error carries the first requested (pruned) location.
        let pruned_pos = ELEMENTS / 2;
        let proof_result = db
            .proof(Location::new(pruned_pos), NZU64!(pruned_pos + 100))
            .await;
        assert!(
            matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos)
        );

        db.destroy().await.unwrap();
    }
966
967    pub(crate) async fn test_immutable_prune_beyond_commit<F: Family, V, C>(
968        context: deterministic::Context,
969        open_db: impl Fn(
970            deterministic::Context,
971        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
972    ) where
973        V: ValueEncoding<Value = Digest>,
974        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
975        C::Item: EncodeShared,
976    {
977        let mut db = open_db(context.with_label("test")).await;
978
979        // Test pruning empty database (no commits)
980        let result = db.prune(Location::new(1)).await;
981        assert!(
982            matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
983                if prune_loc == Location::new(1) && commit_loc == Location::new(0))
984        );
985
986        // Add key-value pairs and commit
987        let k1 = Digest::from(*b"12345678901234567890123456789012");
988        let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
989        let k3 = Digest::from(*b"99999999999999999999999999999999");
990        let v1 = Sha256::fill(1u8);
991        let v2 = Sha256::fill(2u8);
992        let v3 = Sha256::fill(3u8);
993
994        db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(&db, None))
995            .await
996            .unwrap();
997
998        // op_count is 4 (initial_commit, k1, k2, commit), last_commit is at location 3
999        assert_eq!(*db.last_commit_loc, 3);
1000
1001        db.apply_batch(db.new_batch().set(k3, v3).merkleize(&db, None))
1002            .await
1003            .unwrap();
1004
1005        // Test valid prune (at previous commit location 3)
1006        assert!(db.prune(Location::new(3)).await.is_ok());
1007
1008        // Test pruning beyond last commit
1009        let new_last_commit = db.last_commit_loc;
1010        let beyond = new_last_commit + 1;
1011        let result = db.prune(beyond).await;
1012        assert!(
1013            matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1014                if prune_loc == beyond && commit_loc == new_last_commit)
1015        );
1016
1017        db.destroy().await.unwrap();
1018    }
1019
1020    async fn commit_sets<F: Family, V, C>(
1021        db: &mut TestDb<F, V, C>,
1022        sets: impl IntoIterator<Item = (Digest, V::Value)>,
1023        metadata: Option<V::Value>,
1024    ) -> Range<Location<F>>
1025    where
1026        V: ValueEncoding<Value = Digest>,
1027        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1028        C::Item: EncodeShared,
1029    {
1030        let mut batch = db.new_batch();
1031        for (key, value) in sets {
1032            batch = batch.set(key, value);
1033        }
1034        let range = db.apply_batch(batch.merkleize(db, metadata)).await.unwrap();
1035        db.commit().await.unwrap();
1036        range
1037    }
1038
1039    pub(crate) async fn test_immutable_rewind_recovery<F: Family, V, C>(
1040        context: deterministic::Context,
1041        open_db: impl Fn(
1042            deterministic::Context,
1043        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1044    ) where
1045        V: ValueEncoding<Value = Digest>,
1046        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1047        C::Item: EncodeShared,
1048    {
1049        let mut db = open_db(context.with_label("db")).await;
1050
1051        let key1 = Sha256::hash(&1u64.to_be_bytes());
1052        let key2 = Sha256::hash(&2u64.to_be_bytes());
1053        let key3 = Sha256::hash(&3u64.to_be_bytes());
1054        let key4 = Sha256::hash(&4u64.to_be_bytes());
1055
1056        let value1 = Sha256::fill(11u8);
1057        let value2 = Sha256::fill(22u8);
1058        let value3 = Sha256::fill(33u8);
1059        let value4 = Sha256::fill(66u8);
1060
1061        let metadata_a = Sha256::fill(44u8);
1062        let first_range =
1063            commit_sets(&mut db, [(key1, value1), (key2, value2)], Some(metadata_a)).await;
1064        let size_before = db.bounds().await.end;
1065        let root_before = db.root();
1066        let last_commit_before = db.last_commit_loc;
1067        assert_eq!(size_before, first_range.end);
1068
1069        let metadata_b = Sha256::fill(55u8);
1070        let second_range =
1071            commit_sets(&mut db, [(key3, value3), (key4, value4)], Some(metadata_b)).await;
1072        assert_eq!(second_range.start, size_before);
1073        assert_ne!(db.root(), root_before);
1074        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_b));
1075        assert_eq!(db.get(&key3).await.unwrap(), Some(value3));
1076        assert_eq!(db.get(&key4).await.unwrap(), Some(value4));
1077
1078        db.rewind(size_before).await.unwrap();
1079        assert_eq!(db.root(), root_before);
1080        assert_eq!(db.bounds().await.end, size_before);
1081        assert_eq!(db.last_commit_loc, last_commit_before);
1082        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1083        assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1084        assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1085        assert_eq!(db.get(&key3).await.unwrap(), None);
1086        assert_eq!(db.get(&key4).await.unwrap(), None);
1087
1088        db.commit().await.unwrap();
1089        drop(db);
1090        let db = open_db(context.with_label("reopen")).await;
1091        assert_eq!(db.root(), root_before);
1092        assert_eq!(db.bounds().await.end, size_before);
1093        assert_eq!(db.last_commit_loc, last_commit_before);
1094        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1095        assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1096        assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1097        assert_eq!(db.get(&key3).await.unwrap(), None);
1098        assert_eq!(db.get(&key4).await.unwrap(), None);
1099
1100        db.destroy().await.unwrap();
1101    }
1102
    /// Rewinding to a location that is at or behind the pruning boundary must
    /// fail with `ItemPruned` rather than silently truncating.
    ///
    /// `open_small_sections_db` is expected to open a DB whose journal uses
    /// small sections so repeated commit+prune rounds actually advance the
    /// pruning boundary — TODO confirm against the callers.
    pub(crate) async fn test_immutable_rewind_pruned_target_errors<F: Family, V, C>(
        context: deterministic::Context,
        open_small_sections_db: impl Fn(
            deterministic::Context,
        )
            -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
    ) where
        V: ValueEncoding<Value = Digest>,
        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
        C::Item: EncodeShared,
    {
        let mut db = open_small_sections_db(context.with_label("db")).await;

        // First commit: 16 keys whose range we will later try to rewind into.
        let first_range = commit_sets(
            &mut db,
            (0u64..16).map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8))),
            None,
        )
        .await;

        // Keep committing fresh keys and pruning to the latest commit until
        // the retained window no longer includes the start of the first range.
        let mut round = 0u64;
        loop {
            round += 1;
            // Safety valve: the boundary must advance within 64 rounds or the
            // test setup is broken.
            assert!(
                round <= 64,
                "failed to prune enough history for rewind test"
            );

            commit_sets(
                &mut db,
                (0u64..16).map(|i| {
                    // Distinct seed per round so every key is new.
                    let seed = round * 100 + i;
                    (Sha256::hash(&seed.to_be_bytes()), Sha256::fill(seed as u8))
                }),
                None,
            )
            .await;
            db.prune(db.last_commit_loc).await.unwrap();

            if db.bounds().await.start > first_range.start {
                break;
            }
        }

        // Rewinding to the oldest retained location itself errors with
        // ItemPruned.
        let oldest_retained = db.bounds().await.start;
        let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
        assert!(
            matches!(
                boundary_err,
                Error::Journal(crate::journal::Error::ItemPruned(_))
            ),
            "unexpected rewind error at retained boundary: {boundary_err:?}"
        );

        // Rewinding to a fully pruned location fails the same way.
        let err = db.rewind(first_range.start).await.unwrap_err();
        assert!(
            matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
            "unexpected rewind error: {err:?}"
        );

        db.destroy().await.unwrap();
    }
1165
1166    /// batch.get() reads pending mutations and falls through to base DB.
1167    pub(crate) async fn test_immutable_batch_get_read_through<F: Family, V, C>(
1168        context: deterministic::Context,
1169        open_db: impl Fn(
1170            deterministic::Context,
1171        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1172    ) where
1173        V: ValueEncoding<Value = Digest>,
1174        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1175        C::Item: EncodeShared,
1176    {
1177        let mut db = open_db(context.with_label("db")).await;
1178
1179        // Pre-populate with key A.
1180        let key_a = Sha256::hash(&0u64.to_be_bytes());
1181        let val_a = Sha256::fill(1u8);
1182        db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None))
1183            .await
1184            .unwrap();
1185
1186        // batch.get(&A) should return DB value.
1187        let mut batch = db.new_batch();
1188        assert_eq!(batch.get(&key_a, &db).await.unwrap(), Some(val_a));
1189
1190        // Set B in batch, batch.get(&B) returns the value.
1191        let key_b = Sha256::hash(&1u64.to_be_bytes());
1192        let val_b = Sha256::fill(2u8);
1193        batch = batch.set(key_b, val_b);
1194        assert_eq!(batch.get(&key_b, &db).await.unwrap(), Some(val_b));
1195
1196        // Nonexistent key.
1197        let key_c = Sha256::hash(&2u64.to_be_bytes());
1198        assert_eq!(batch.get(&key_c, &db).await.unwrap(), None);
1199
1200        db.destroy().await.unwrap();
1201    }
1202
1203    /// Child batch reads parent diff and adds its own mutations.
1204    pub(crate) async fn test_immutable_batch_stacked_get<F: Family, V, C>(
1205        context: deterministic::Context,
1206        open_db: impl Fn(
1207            deterministic::Context,
1208        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1209    ) where
1210        V: ValueEncoding<Value = Digest>,
1211        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1212        C::Item: EncodeShared,
1213    {
1214        let db = open_db(context.with_label("db")).await;
1215
1216        // Parent batch: set A.
1217        let key_a = Sha256::hash(&0u64.to_be_bytes());
1218        let val_a = Sha256::fill(10u8);
1219        let parent = db.new_batch().set(key_a, val_a);
1220        let parent_m = parent.merkleize(&db, None);
1221
1222        // Child reads parent's A.
1223        let mut child = parent_m.new_batch::<Sha256>();
1224        assert_eq!(child.get(&key_a, &db).await.unwrap(), Some(val_a));
1225
1226        // Child sets B.
1227        let key_b = Sha256::hash(&1u64.to_be_bytes());
1228        let val_b = Sha256::fill(20u8);
1229        child = child.set(key_b, val_b);
1230        assert_eq!(child.get(&key_b, &db).await.unwrap(), Some(val_b));
1231
1232        // Nonexistent key.
1233        let key_c = Sha256::hash(&2u64.to_be_bytes());
1234        assert_eq!(child.get(&key_c, &db).await.unwrap(), None);
1235
1236        db.destroy().await.unwrap();
1237    }
1238
1239    /// Two-level stacked batch apply works end-to-end.
1240    pub(crate) async fn test_immutable_batch_stacked_apply<F: Family, V, C>(
1241        context: deterministic::Context,
1242        open_db: impl Fn(
1243            deterministic::Context,
1244        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1245    ) where
1246        V: ValueEncoding<Value = Digest>,
1247        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1248        C::Item: EncodeShared,
1249    {
1250        let mut db = open_db(context.with_label("db")).await;
1251
1252        // Sort keys so operations are in BTreeMap order (same as merkleize writes).
1253        let mut kvs_first: Vec<(Digest, Digest)> = (0u64..5)
1254            .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
1255            .collect();
1256        kvs_first.sort_by(|a, b| a.0.cmp(&b.0));
1257
1258        let mut kvs_second: Vec<(Digest, Digest)> = (5u64..10)
1259            .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
1260            .collect();
1261        kvs_second.sort_by(|a, b| a.0.cmp(&b.0));
1262
1263        // Parent batch: set keys 0..5.
1264        let mut parent = db.new_batch();
1265        for (k, v) in &kvs_first {
1266            parent = parent.set(*k, *v);
1267        }
1268        let parent_m = parent.merkleize(&db, None);
1269
1270        // Child batch: set keys 5..10.
1271        let mut child = parent_m.new_batch::<Sha256>();
1272        for (k, v) in &kvs_second {
1273            child = child.set(*k, *v);
1274        }
1275        let child_m = child.merkleize(&db, None);
1276        let expected_root = child_m.root();
1277        db.apply_batch(child_m).await.unwrap();
1278
1279        assert_eq!(db.root(), expected_root);
1280
1281        // All 10 keys should be accessible.
1282        for (k, v) in kvs_first.iter().chain(kvs_second.iter()) {
1283            assert_eq!(db.get(k).await.unwrap(), Some(*v));
1284        }
1285
1286        db.destroy().await.unwrap();
1287    }
1288
1289    /// MerkleizedBatch::root() matches db.root() after apply_batch().
1290    pub(crate) async fn test_immutable_batch_speculative_root<F: Family, V, C>(
1291        context: deterministic::Context,
1292        open_db: impl Fn(
1293            deterministic::Context,
1294        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1295    ) where
1296        V: ValueEncoding<Value = Digest>,
1297        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1298        C::Item: EncodeShared,
1299    {
1300        let mut db = open_db(context.with_label("db")).await;
1301
1302        let mut batch = db.new_batch();
1303        for i in 0u8..10 {
1304            let k = Sha256::hash(&[i]);
1305            batch = batch.set(k, Sha256::fill(i));
1306        }
1307        let merkleized = batch.merkleize(&db, None);
1308
1309        let speculative = merkleized.root();
1310        db.apply_batch(merkleized).await.unwrap();
1311        assert_eq!(db.root(), speculative);
1312
1313        // Second batch with metadata.
1314        let metadata = Some(Sha256::fill(55u8));
1315        let mut batch = db.new_batch();
1316        let k = Sha256::hash(&[0xAA]);
1317        batch = batch.set(k, Sha256::fill(0xAA));
1318        let merkleized = batch.merkleize(&db, metadata);
1319        let speculative = merkleized.root();
1320        db.apply_batch(merkleized).await.unwrap();
1321        assert_eq!(db.root(), speculative);
1322
1323        db.destroy().await.unwrap();
1324    }
1325
1326    /// MerkleizedBatch::get() reads from diff and base DB.
1327    pub(crate) async fn test_immutable_merkleized_batch_get<F: Family, V, C>(
1328        context: deterministic::Context,
1329        open_db: impl Fn(
1330            deterministic::Context,
1331        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1332    ) where
1333        V: ValueEncoding<Value = Digest>,
1334        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1335        C::Item: EncodeShared,
1336    {
1337        let mut db = open_db(context.with_label("db")).await;
1338
1339        // Pre-populate base DB.
1340        let key_a = Sha256::hash(&0u64.to_be_bytes());
1341        let val_a = Sha256::fill(10u8);
1342        db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None))
1343            .await
1344            .unwrap();
1345
1346        // Create a merkleized batch with a new key.
1347        let key_b = Sha256::hash(&1u64.to_be_bytes());
1348        let val_b = Sha256::fill(20u8);
1349        let merkleized = db.new_batch().set(key_b, val_b).merkleize(&db, None);
1350
1351        // Read base DB value through merkleized batch.
1352        assert_eq!(merkleized.get(&key_a, &db).await.unwrap(), Some(val_a));
1353
1354        // Read this batch's key from the diff.
1355        assert_eq!(merkleized.get(&key_b, &db).await.unwrap(), Some(val_b));
1356
1357        // Nonexistent key.
1358        let key_c = Sha256::hash(&2u64.to_be_bytes());
1359        assert_eq!(merkleized.get(&key_c, &db).await.unwrap(), None);
1360
1361        db.destroy().await.unwrap();
1362    }
1363
1364    /// Independent sequential batches applied one at a time.
1365    pub(crate) async fn test_immutable_batch_sequential_apply<F: Family, V, C>(
1366        context: deterministic::Context,
1367        open_db: impl Fn(
1368            deterministic::Context,
1369        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1370    ) where
1371        V: ValueEncoding<Value = Digest>,
1372        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1373        C::Item: EncodeShared,
1374    {
1375        let mut db = open_db(context.with_label("db")).await;
1376
1377        let key_a = Sha256::hash(&0u64.to_be_bytes());
1378        let val_a = Sha256::fill(1u8);
1379
1380        // First batch.
1381        let m = db.new_batch().set(key_a, val_a).merkleize(&db, None);
1382        let root1 = m.root();
1383        db.apply_batch(m).await.unwrap();
1384        assert_eq!(db.root(), root1);
1385        assert_eq!(db.get(&key_a).await.unwrap(), Some(val_a));
1386
1387        // Second independent batch.
1388        let key_b = Sha256::hash(&1u64.to_be_bytes());
1389        let val_b = Sha256::fill(2u8);
1390        let m = db.new_batch().set(key_b, val_b).merkleize(&db, None);
1391        let root2 = m.root();
1392        db.apply_batch(m).await.unwrap();
1393        assert_eq!(db.root(), root2);
1394        assert_eq!(db.get(&key_b).await.unwrap(), Some(val_b));
1395
1396        db.destroy().await.unwrap();
1397    }
1398
1399    /// Many sequential batches accumulate correctly.
1400    pub(crate) async fn test_immutable_batch_many_sequential<F: Family, V, C>(
1401        context: deterministic::Context,
1402        open_db: impl Fn(
1403            deterministic::Context,
1404        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1405    ) where
1406        V: ValueEncoding<Value = Digest>,
1407        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1408        C::Item: EncodeShared,
1409    {
1410        let mut db = open_db(context.with_label("db")).await;
1411        let hasher = StandardHasher::<Sha256>::new();
1412
1413        const BATCHES: u64 = 20;
1414        const KEYS_PER_BATCH: u64 = 5;
1415
1416        let mut all_kvs: Vec<(Digest, Digest)> = Vec::new();
1417
1418        for batch_idx in 0..BATCHES {
1419            let mut batch = db.new_batch();
1420            for j in 0..KEYS_PER_BATCH {
1421                let seed = batch_idx * 100 + j;
1422                let k = Sha256::hash(&seed.to_be_bytes());
1423                let v = Sha256::fill(seed as u8);
1424                batch = batch.set(k, v);
1425                all_kvs.push((k, v));
1426            }
1427            let merkleized = batch.merkleize(&db, None);
1428            db.apply_batch(merkleized).await.unwrap();
1429        }
1430
1431        // Verify all key-values are readable.
1432        for (k, v) in &all_kvs {
1433            assert_eq!(db.get(k).await.unwrap(), Some(*v));
1434        }
1435
1436        // Verify proof over the full range.
1437        let root = db.root();
1438        let (proof, ops) = db.proof(Location::new(0), NZU64!(10000)).await.unwrap();
1439        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1440
1441        // Expected: 1 initial commit + BATCHES * (KEYS_PER_BATCH + 1 commit).
1442        let expected = 1 + BATCHES * (KEYS_PER_BATCH + 1);
1443        assert_eq!(db.bounds().await.end, expected);
1444
1445        db.destroy().await.unwrap();
1446    }
1447
1448    /// Empty batch (zero mutations) produces correct speculative root.
1449    pub(crate) async fn test_immutable_batch_empty_batch<F: Family, V, C>(
1450        context: deterministic::Context,
1451        open_db: impl Fn(
1452            deterministic::Context,
1453        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1454    ) where
1455        V: ValueEncoding<Value = Digest>,
1456        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1457        C::Item: EncodeShared,
1458    {
1459        let mut db = open_db(context.with_label("db")).await;
1460
1461        // Apply a non-empty batch first.
1462        let k = Sha256::hash(&[1u8]);
1463        db.apply_batch(
1464            db.new_batch()
1465                .set(k, Sha256::fill(1u8))
1466                .merkleize(&db, None),
1467        )
1468        .await
1469        .unwrap();
1470        let root_before = db.root();
1471        let size_before = db.bounds().await.end;
1472
1473        // Empty batch with no mutations.
1474        let merkleized = db.new_batch().merkleize(&db, None);
1475        let speculative = merkleized.root();
1476        db.apply_batch(merkleized).await.unwrap();
1477
1478        // Root changed (a new Commit op was appended).
1479        assert_ne!(db.root(), root_before);
1480        assert_eq!(db.root(), speculative);
1481        // Size grew by exactly 1 (the Commit op).
1482        assert_eq!(db.bounds().await.end, size_before + 1);
1483
1484        db.destroy().await.unwrap();
1485    }
1486
1487    /// MerkleizedBatch::get() works on a chained child's merkleized batch.
1488    pub(crate) async fn test_immutable_batch_chained_merkleized_get<F: Family, V, C>(
1489        context: deterministic::Context,
1490        open_db: impl Fn(
1491            deterministic::Context,
1492        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1493    ) where
1494        V: ValueEncoding<Value = Digest>,
1495        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1496        C::Item: EncodeShared,
1497    {
1498        let mut db = open_db(context.with_label("db")).await;
1499
1500        // Pre-populate base DB.
1501        let key_a = Sha256::hash(&0u64.to_be_bytes());
1502        let val_a = Sha256::fill(10u8);
1503        db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None))
1504            .await
1505            .unwrap();
1506
1507        // Parent batch sets key B.
1508        let key_b = Sha256::hash(&1u64.to_be_bytes());
1509        let val_b = Sha256::fill(1u8);
1510        let parent_m = db.new_batch().set(key_b, val_b).merkleize(&db, None);
1511
1512        // Child batch sets key C.
1513        let key_c = Sha256::hash(&2u64.to_be_bytes());
1514        let val_c = Sha256::fill(2u8);
1515        let child_m = parent_m
1516            .new_batch::<Sha256>()
1517            .set(key_c, val_c)
1518            .merkleize(&db, None);
1519
1520        // Child's MerkleizedBatch can read all three layers:
1521        // base DB value
1522        assert_eq!(child_m.get(&key_a, &db).await.unwrap(), Some(val_a));
1523        // parent diff value
1524        assert_eq!(child_m.get(&key_b, &db).await.unwrap(), Some(val_b));
1525        // child's own value
1526        assert_eq!(child_m.get(&key_c, &db).await.unwrap(), Some(val_c));
1527        // nonexistent key
1528        let key_d = Sha256::hash(&3u64.to_be_bytes());
1529        assert_eq!(child_m.get(&key_d, &db).await.unwrap(), None);
1530
1531        db.destroy().await.unwrap();
1532    }
1533
1534    /// Large single batch, verifying all values and proof.
1535    pub(crate) async fn test_immutable_batch_large<F: Family, V, C>(
1536        context: deterministic::Context,
1537        open_db: impl Fn(
1538            deterministic::Context,
1539        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1540    ) where
1541        V: ValueEncoding<Value = Digest>,
1542        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1543        C::Item: EncodeShared,
1544    {
1545        let mut db = open_db(context.with_label("db")).await;
1546        let hasher = StandardHasher::<Sha256>::new();
1547
1548        const N: u64 = 500;
1549        let mut kvs: Vec<(Digest, Digest)> = Vec::new();
1550
1551        let mut batch = db.new_batch();
1552        for i in 0..N {
1553            let k = Sha256::hash(&i.to_be_bytes());
1554            let v = Sha256::fill((i % 256) as u8);
1555            batch = batch.set(k, v);
1556            kvs.push((k, v));
1557        }
1558        let merkleized = batch.merkleize(&db, None);
1559        db.apply_batch(merkleized).await.unwrap();
1560
1561        // Verify every value.
1562        for (k, v) in &kvs {
1563            assert_eq!(db.get(k).await.unwrap(), Some(*v));
1564        }
1565
1566        // Verify proof over the full range.
1567        let root = db.root();
1568        let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1569        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1570
1571        // Expected: 1 initial commit + N sets + 1 commit.
1572        assert_eq!(db.bounds().await.end, 1 + N + 1);
1573
1574        db.destroy().await.unwrap();
1575    }
1576
1577    /// Child batch overrides same key set by parent.
1578    pub(crate) async fn test_immutable_batch_chained_key_override<F: Family, V, C>(
1579        context: deterministic::Context,
1580        open_db: impl Fn(
1581            deterministic::Context,
1582        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1583    ) where
1584        V: ValueEncoding<Value = Digest>,
1585        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1586        C::Item: EncodeShared,
1587    {
1588        let mut db = open_db(context.with_label("db")).await;
1589
1590        let key = Sha256::hash(&0u64.to_be_bytes());
1591        let val_parent = Sha256::fill(1u8);
1592        let val_child = Sha256::fill(2u8);
1593
1594        // Parent sets key.
1595        let parent_m = db.new_batch().set(key, val_parent).merkleize(&db, None);
1596
1597        // Child overrides same key.
1598        let mut child = parent_m.new_batch::<Sha256>();
1599        child = child.set(key, val_child);
1600
1601        // Child's pending mutation wins over parent diff.
1602        assert_eq!(child.get(&key, &db).await.unwrap(), Some(val_child));
1603
1604        let child_m = child.merkleize(&db, None);
1605
1606        // After merkleize, child's diff wins.
1607        assert_eq!(child_m.get(&key, &db).await.unwrap(), Some(val_child));
1608
1609        // Apply and verify.
1610        db.apply_batch(child_m).await.unwrap();
1611        assert_eq!(db.get(&key).await.unwrap(), Some(val_child));
1612
1613        db.destroy().await.unwrap();
1614    }
1615
1616    /// Same key set across two sequential applied batches. The immutable DB
1617    /// keeps all versions -- `get()` returns the earliest non-pruned value.
1618    /// After pruning the first version, `get()` returns the second.
1619    ///
1620    /// `open_db_small_sections` must return a DB whose log has `items_per_section=1`
1621    /// so pruning is per-item.
1622    pub(crate) async fn test_immutable_batch_sequential_key_override<F: Family, V, C>(
1623        context: deterministic::Context,
1624        open_db_small_sections: impl Fn(
1625            deterministic::Context,
1626        )
1627            -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1628    ) where
1629        V: ValueEncoding<Value = Digest>,
1630        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1631        C::Item: EncodeShared,
1632    {
1633        let mut db = open_db_small_sections(context.with_label("db")).await;
1634
1635        let key = Sha256::hash(&0u64.to_be_bytes());
1636        let v1 = Sha256::fill(1u8);
1637        let v2 = Sha256::fill(2u8);
1638
1639        // First batch sets key.
1640        // Layout: 0=initial commit, 1=Set(key,v1), 2=Commit
1641        db.apply_batch(db.new_batch().set(key, v1).merkleize(&db, None))
1642            .await
1643            .unwrap();
1644        assert_eq!(db.get(&key).await.unwrap(), Some(v1));
1645
1646        // Second batch sets same key to different value.
1647        // Layout continues: 3=Set(key,v2), 4=Commit
1648        db.apply_batch(db.new_batch().set(key, v2).merkleize(&db, None))
1649            .await
1650            .unwrap();
1651
1652        // Immutable DB returns the earliest non-pruned value.
1653        assert_eq!(db.get(&key).await.unwrap(), Some(v1));
1654
1655        // Prune past the first Set (loc 1). With items_per_section=1,
1656        // pruning to loc 2 should remove the blob containing loc 1.
1657        db.prune(Location::new(2)).await.unwrap();
1658        assert_eq!(db.get(&key).await.unwrap(), Some(v2));
1659
1660        // Verify persists across reopen.
1661        db.sync().await.unwrap();
1662
1663        db.destroy().await.unwrap();
1664    }
1665
1666    /// Metadata propagates through merkleize and clears with None.
1667    pub(crate) async fn test_immutable_batch_metadata<F: Family, V, C>(
1668        context: deterministic::Context,
1669        open_db: impl Fn(
1670            deterministic::Context,
1671        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1672    ) where
1673        V: ValueEncoding<Value = Digest>,
1674        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1675        C::Item: EncodeShared,
1676    {
1677        let mut db = open_db(context.with_label("db")).await;
1678
1679        // Batch with metadata.
1680        let metadata = Sha256::fill(42u8);
1681        let k = Sha256::hash(&[1u8]);
1682        db.apply_batch(
1683            db.new_batch()
1684                .set(k, Sha256::fill(1u8))
1685                .merkleize(&db, Some(metadata)),
1686        )
1687        .await
1688        .unwrap();
1689        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1690
1691        // Second batch clears metadata.
1692        db.apply_batch(db.new_batch().merkleize(&db, None))
1693            .await
1694            .unwrap();
1695        assert_eq!(db.get_metadata().await.unwrap(), None);
1696
1697        db.destroy().await.unwrap();
1698    }
1699
1700    pub(crate) async fn test_immutable_stale_batch_rejected<F: Family, V, C>(
1701        context: deterministic::Context,
1702        open_db: impl Fn(
1703            deterministic::Context,
1704        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1705    ) where
1706        V: ValueEncoding<Value = Digest>,
1707        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1708        C::Item: EncodeShared,
1709    {
1710        let mut db = open_db(context.with_label("db")).await;
1711
1712        let key1 = Sha256::hash(&[1]);
1713        let key2 = Sha256::hash(&[2]);
1714        let v1 = Sha256::fill(10u8);
1715        let v2 = Sha256::fill(20u8);
1716
1717        // Create two batches from the same DB state.
1718        let batch_a = db.new_batch().set(key1, v1).merkleize(&db, None);
1719        let batch_b = db.new_batch().set(key2, v2).merkleize(&db, None);
1720
1721        // Apply the first -- should succeed.
1722        db.apply_batch(batch_a).await.unwrap();
1723        let expected_root = db.root();
1724        let expected_bounds = db.bounds().await;
1725        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1726        assert_eq!(db.get(&key2).await.unwrap(), None);
1727        assert_eq!(db.get_metadata().await.unwrap(), None);
1728
1729        // Apply the second -- should fail because the DB was modified.
1730        let result = db.apply_batch(batch_b).await;
1731        assert!(
1732            matches!(result, Err(Error::StaleBatch { .. })),
1733            "expected StaleBatch error, got {result:?}"
1734        );
1735        assert_eq!(db.root(), expected_root);
1736        assert_eq!(db.bounds().await, expected_bounds);
1737        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1738        assert_eq!(db.get(&key2).await.unwrap(), None);
1739        assert_eq!(db.get_metadata().await.unwrap(), None);
1740
1741        db.destroy().await.unwrap();
1742    }
1743
1744    pub(crate) async fn test_immutable_stale_batch_chained<F: Family, V, C>(
1745        context: deterministic::Context,
1746        open_db: impl Fn(
1747            deterministic::Context,
1748        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1749    ) where
1750        V: ValueEncoding<Value = Digest>,
1751        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1752        C::Item: EncodeShared,
1753    {
1754        let mut db = open_db(context.with_label("db")).await;
1755
1756        let key1 = Sha256::hash(&[1]);
1757        let key2 = Sha256::hash(&[2]);
1758        let key3 = Sha256::hash(&[3]);
1759
1760        // Parent batch.
1761        let parent_m = db
1762            .new_batch()
1763            .set(key1, Sha256::fill(1u8))
1764            .merkleize(&db, None);
1765
1766        // Fork two children from the same parent.
1767        let child_a = parent_m
1768            .new_batch::<Sha256>()
1769            .set(key2, Sha256::fill(2u8))
1770            .merkleize(&db, None);
1771        let child_b = parent_m
1772            .new_batch::<Sha256>()
1773            .set(key3, Sha256::fill(3u8))
1774            .merkleize(&db, None);
1775
1776        // Apply child A.
1777        db.apply_batch(child_a).await.unwrap();
1778
1779        // Child B is stale.
1780        let result = db.apply_batch(child_b).await;
1781        assert!(
1782            matches!(result, Err(Error::StaleBatch { .. })),
1783            "expected StaleBatch error, got {result:?}"
1784        );
1785
1786        db.destroy().await.unwrap();
1787    }
1788
1789    pub(crate) async fn test_immutable_partial_ancestor_commit<F: Family, V, C>(
1790        context: deterministic::Context,
1791        open_db: impl Fn(
1792            deterministic::Context,
1793        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1794    ) where
1795        V: ValueEncoding<Value = Digest>,
1796        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1797        C::Item: EncodeShared,
1798    {
1799        let mut db = open_db(context.with_label("db")).await;
1800
1801        let key1 = Sha256::hash(&[1]);
1802        let key2 = Sha256::hash(&[2]);
1803        let key3 = Sha256::hash(&[3]);
1804        let v1 = Sha256::fill(1u8);
1805        let v2 = Sha256::fill(2u8);
1806        let v3 = Sha256::fill(3u8);
1807
1808        // Chain: DB <- A <- B <- C
1809        let a = db.new_batch().set(key1, v1).merkleize(&db, None);
1810        let b = a.new_batch::<Sha256>().set(key2, v2).merkleize(&db, None);
1811        let c = b.new_batch::<Sha256>().set(key3, v3).merkleize(&db, None);
1812
1813        let expected_root = c.root();
1814
1815        // Apply only A, then apply C directly (B uncommitted).
1816        db.apply_batch(a).await.unwrap();
1817        db.apply_batch(c).await.unwrap();
1818
1819        assert_eq!(db.root(), expected_root);
1820        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1821        assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
1822        assert_eq!(db.get(&key3).await.unwrap(), Some(v3));
1823
1824        db.destroy().await.unwrap();
1825    }
1826
1827    pub(crate) async fn test_immutable_sequential_commit_parent_then_child<F: Family, V, C>(
1828        context: deterministic::Context,
1829        open_db: impl Fn(
1830            deterministic::Context,
1831        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1832    ) where
1833        V: ValueEncoding<Value = Digest>,
1834        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1835        C::Item: EncodeShared,
1836    {
1837        let mut db = open_db(context.with_label("db")).await;
1838
1839        let key1 = Sha256::hash(&[1]);
1840        let key2 = Sha256::hash(&[2]);
1841        let v1 = Sha256::fill(1u8);
1842        let v2 = Sha256::fill(2u8);
1843
1844        // Parent batch.
1845        let parent_m = db.new_batch().set(key1, v1).merkleize(&db, None);
1846
1847        // Child batch built on parent.
1848        let child_m = parent_m
1849            .new_batch::<Sha256>()
1850            .set(key2, v2)
1851            .merkleize(&db, None);
1852
1853        // Apply parent first, then child. This is a valid sequential commit.
1854        db.apply_batch(parent_m).await.unwrap();
1855        db.apply_batch(child_m).await.unwrap();
1856
1857        // Both keys present.
1858        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1859        assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
1860
1861        db.destroy().await.unwrap();
1862    }
1863
1864    pub(crate) async fn test_immutable_child_root_matches_pending_and_committed<F: Family, V, C>(
1865        context: deterministic::Context,
1866        open_db: impl Fn(
1867            deterministic::Context,
1868        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1869    ) where
1870        V: ValueEncoding<Value = Digest>,
1871        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1872        C::Item: EncodeShared,
1873    {
1874        let mut db = open_db(context.with_label("db")).await;
1875
1876        let key1 = Sha256::hash(&[1]);
1877        let key2 = Sha256::hash(&[2]);
1878
1879        // Build the child while the parent is still pending.
1880        let parent = db
1881            .new_batch()
1882            .set(key1, Sha256::fill(1u8))
1883            .merkleize(&db, None);
1884        let pending_child = parent
1885            .new_batch::<Sha256>()
1886            .set(key2, Sha256::fill(2u8))
1887            .merkleize(&db, None);
1888
1889        // Commit the parent, then rebuild the same logical child from the
1890        // committed DB state and compare roots.
1891        db.apply_batch(parent).await.unwrap();
1892        db.commit().await.unwrap();
1893
1894        let committed_child = db
1895            .new_batch()
1896            .set(key2, Sha256::fill(2u8))
1897            .merkleize(&db, None);
1898
1899        assert_eq!(pending_child.root(), committed_child.root());
1900
1901        db.destroy().await.unwrap();
1902    }
1903
1904    pub(crate) async fn test_immutable_stale_batch_child_applied_before_parent<F: Family, V, C>(
1905        context: deterministic::Context,
1906        open_db: impl Fn(
1907            deterministic::Context,
1908        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1909    ) where
1910        V: ValueEncoding<Value = Digest>,
1911        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1912        C::Item: EncodeShared,
1913    {
1914        let mut db = open_db(context.with_label("db")).await;
1915
1916        let key1 = Sha256::hash(&[1]);
1917        let key2 = Sha256::hash(&[2]);
1918
1919        // Parent batch.
1920        let parent_m = db
1921            .new_batch()
1922            .set(key1, Sha256::fill(1u8))
1923            .merkleize(&db, None);
1924
1925        // Child batch.
1926        let child_m = parent_m
1927            .new_batch::<Sha256>()
1928            .set(key2, Sha256::fill(2u8))
1929            .merkleize(&db, None);
1930
1931        // Apply child first (it carries all parent ops too).
1932        db.apply_batch(child_m).await.unwrap();
1933
1934        // Parent is stale.
1935        let result = db.apply_batch(parent_m).await;
1936        assert!(
1937            matches!(result, Err(Error::StaleBatch { .. })),
1938            "expected StaleBatch error, got {result:?}"
1939        );
1940
1941        db.destroy().await.unwrap();
1942    }
1943
1944    /// to_batch() creates an owned snapshot whose root matches the committed DB.
1945    /// A child batch chained from it can be applied.
1946    pub(crate) async fn test_immutable_to_batch<F: Family, V, C>(
1947        context: deterministic::Context,
1948        open_db: impl Fn(
1949            deterministic::Context,
1950        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1951    ) where
1952        V: ValueEncoding<Value = Digest>,
1953        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1954        C::Item: EncodeShared,
1955    {
1956        let mut db = open_db(context.with_label("db")).await;
1957
1958        // Populate.
1959        let key1 = Sha256::hash(&[1]);
1960        let v1 = Sha256::fill(10u8);
1961        db.apply_batch(db.new_batch().set(key1, v1).merkleize(&db, None))
1962            .await
1963            .unwrap();
1964
1965        // to_batch root matches committed root.
1966        let snapshot = db.to_batch();
1967        assert_eq!(snapshot.root(), db.root());
1968
1969        // Chain a child from the snapshot, apply it.
1970        let key2 = Sha256::hash(&[2]);
1971        let v2 = Sha256::fill(20u8);
1972        let child = snapshot
1973            .new_batch::<Sha256>()
1974            .set(key2, v2)
1975            .merkleize(&db, None);
1976        db.apply_batch(child).await.unwrap();
1977
1978        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1979        assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
1980
1981        db.destroy().await.unwrap();
1982    }
1983
1984    /// Regression: applying a batch after its ancestor Arc is dropped (without
1985    /// committing) must still apply the ancestor's snapshot diffs.
1986    pub(crate) async fn test_immutable_apply_after_ancestor_dropped<F: Family, V, C>(
1987        context: deterministic::Context,
1988        open_db: impl Fn(
1989            deterministic::Context,
1990        ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1991    ) where
1992        V: ValueEncoding<Value = Digest>,
1993        C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1994        C::Item: EncodeShared,
1995    {
1996        let mut db = open_db(context.with_label("db")).await;
1997
1998        let key1 = Sha256::hash(&[1]);
1999        let key2 = Sha256::hash(&[2]);
2000        let key3 = Sha256::hash(&[3]);
2001        let v1 = Sha256::fill(1u8);
2002        let v2 = Sha256::fill(2u8);
2003        let v3 = Sha256::fill(3u8);
2004
2005        // Chain: DB <- A <- B <- C
2006        let a = db.new_batch().set(key1, v1).merkleize(&db, None);
2007        let b = a.new_batch::<Sha256>().set(key2, v2).merkleize(&db, None);
2008        let c = b.new_batch::<Sha256>().set(key3, v3).merkleize(&db, None);
2009
2010        // Drop A and B without committing. Their Weak refs in C are now dead.
2011        drop(a);
2012        drop(b);
2013
2014        // Apply only the tip. This is !skip_ancestors (DB hasn't changed).
2015        db.apply_batch(c).await.unwrap();
2016
2017        // All three keys must be in the snapshot.
2018        assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2019        assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2020        assert_eq!(db.get(&key3).await.unwrap(), Some(v3));
2021
2022        db.destroy().await.unwrap();
2023    }
2024}