// commonware_storage/qmdb/immutable/mod.rs

//! An authenticated database that only supports adding new keyed values (no updates or
//! deletions), where values can have varying sizes.
//!
//! # Examples
//!
//! ```ignore
//! let mut batch = db.new_batch();
//! batch.set(key, value);
//! let merkleized = batch.merkleize(None);  // synchronous
//! let finalized = merkleized.finalize();
//! db.apply_batch(finalized).await?;
//! ```
use crate::{
    index::{unordered::Index, Unordered as _},
    journal::{
        authenticated,
        contiguous::{
            variable::{self, Config as JournalConfig},
            Contiguous as _, Reader,
        },
    },
    kv,
    mmr::{
        journaled::{Config as MmrConfig, Mmr},
        Location, Proof,
    },
    qmdb::{any::VariableValue, build_snapshot_from_log, Error},
    translator::Translator,
};
use commonware_codec::Read;
use commonware_cryptography::Hasher as CHasher;
use commonware_parallel::ThreadPool;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::Array;
use std::{
    collections::BTreeMap,
    num::{NonZeroU64, NonZeroUsize},
    ops::Range,
    sync::Arc,
};
use tracing::warn;

pub mod batch;
mod operation;
pub use operation::Operation;

/// The authenticated journal of [Operation]s backing an [Immutable] db.
type Journal<E, K, V, H> = authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H>;

pub mod sync;

/// Configuration for an [Immutable] authenticated db.
///
/// `T` is the key [Translator] used by the in-memory index; `C` is the codec configuration
/// used to encode and decode log items.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each section of the journal.
    pub log_items_per_section: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,
}

/// An authenticated database that only supports adding new keyed values (no updates or
/// deletions), where values can have varying sizes.
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
> {
    /// Authenticated journal of operations.
    journal: Journal<E, K, V, H>,

    /// A map from each active key to the location of the operation that set its value.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Set].
    snapshot: Index<T, Location>,

    /// The location of the last commit operation.
    ///
    /// # Invariant
    ///
    /// Always valid: [Immutable::init] appends an initial commit to an empty db.
    last_commit_loc: Location,
}

115// Shared read-only functionality.
116impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
117    Immutable<E, K, V, H, T>
118{
119    /// Return the Location of the next operation appended to this db.
120    pub async fn size(&self) -> Location {
121        self.bounds().await.end
122    }
123
124    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
125    /// retained operations respectively.
126    pub async fn bounds(&self) -> std::ops::Range<Location> {
127        let bounds = self.journal.reader().await.bounds();
128        Location::new(bounds.start)..Location::new(bounds.end)
129    }
130
131    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
132    /// has been pruned.
133    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
134        let iter = self.snapshot.get(key);
135        let reader = self.journal.reader().await;
136        let oldest = reader.bounds().start;
137        for &loc in iter {
138            if loc < oldest {
139                continue;
140            }
141            if let Some(v) = Self::get_from_loc(&reader, key, loc).await? {
142                return Ok(Some(v));
143            }
144        }
145
146        Ok(None)
147    }
148
149    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
150    /// [Error::OperationPruned] if loc precedes the oldest retained location. The location is
151    /// otherwise assumed valid.
152    async fn get_from_loc(
153        reader: &impl Reader<Item = Operation<K, V>>,
154        key: &K,
155        loc: Location,
156    ) -> Result<Option<V>, Error> {
157        if loc < reader.bounds().start {
158            return Err(Error::OperationPruned(loc));
159        }
160
161        let Operation::Set(k, v) = reader.read(*loc).await? else {
162            return Err(Error::UnexpectedData(loc));
163        };
164
165        if k != *key {
166            Ok(None)
167        } else {
168            Ok(Some(v))
169        }
170    }
171
    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let last_commit_loc = self.last_commit_loc;
        // NOTE(review): this reads via the inner journal (`journal.journal`) rather than the
        // authenticated wrapper's reader used elsewhere in this file — presumably equivalent
        // for a raw read of one operation; confirm the asymmetry is intentional.
        let Operation::Commit(metadata) = self
            .journal
            .journal
            .reader()
            .await
            .read(*last_commit_loc)
            .await?
        else {
            // `last_commit_loc` always points at a Commit operation (see init/apply_batch).
            unreachable!("no commit operation at location of last commit {last_commit_loc}");
        };

        Ok(metadata)
    }

189    /// Analogous to proof but with respect to the state of the database when it had `op_count`
190    /// operations.
191    ///
192    /// # Errors
193    ///
194    /// Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
195    /// [crate::mmr::MAX_LOCATION].
196    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `op_count` > number of operations, or
197    /// if `start_loc` >= `op_count`.
198    /// Returns [`Error::OperationPruned`] if `start_loc` has been pruned.
199    pub async fn historical_proof(
200        &self,
201        op_count: Location,
202        start_loc: Location,
203        max_ops: NonZeroU64,
204    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
205        Ok(self
206            .journal
207            .historical_proof(op_count, start_loc, max_ops)
208            .await?)
209    }
210
211    /// Prune operations prior to `prune_loc`. This does not affect the db's root, but it will
212    /// affect retrieval of any keys that were set prior to `prune_loc`.
213    ///
214    /// # Errors
215    ///
216    /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > last commit location.
217    /// - Returns [crate::mmr::Error::LocationOverflow] if `prune_loc` > [crate::mmr::MAX_LOCATION].
218    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
219        if loc > self.last_commit_loc {
220            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
221        }
222        self.journal.prune(loc).await?;
223
224        Ok(())
225    }
226    /// Return the root of the db.
227    pub fn root(&self) -> H::Digest {
228        self.journal.root()
229    }
230
231    /// Generate and return:
232    ///  1. a proof of all operations applied to the db in the range starting at (and including)
233    ///     location `start_loc`, and ending at the first of either:
234    ///     - the last operation performed, or
235    ///     - the operation `max_ops` from the start.
236    ///  2. the operations corresponding to the leaves in this range.
237    pub async fn proof(
238        &self,
239        start_index: Location,
240        max_ops: NonZeroU64,
241    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
242        let op_count = self.bounds().await.end;
243        self.historical_proof(op_count, start_index, max_ops).await
244    }
245
    /// Returns an [Immutable] qmdb initialized from `cfg`. Any uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        // Assemble the MMR journal/metadata configuration from the db config.
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            page_cache: cfg.page_cache.clone(),
        };

        // Assemble the operation-log configuration.
        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            page_cache: cfg.page_cache.clone(),
            write_buffer: cfg.log_write_buffer,
        };

        // Open the authenticated journal, passing `is_commit` so it can identify commit
        // operations (per the doc above, uncommitted trailing operations are discarded).
        let mut journal = Journal::new(
            context.clone(),
            mmr_cfg,
            journal_cfg,
            Operation::<K, V>::is_commit,
        )
        .await?;

        // A brand-new db gets an initial commit so a last commit location always exists.
        if journal.size().await == 0 {
            warn!("Authenticated log is empty, initialized new db.");
            journal.append(&Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());

        let last_commit_loc = {
            // Get the start of the log.
            let reader = journal.reader().await;
            let start_loc = Location::new(reader.bounds().start);

            // Build snapshot from the log.
            build_snapshot_from_log(start_loc, &reader, &mut snapshot, |_, _| {}).await?;

            // The last retained operation is the last commit (a commit always exists).
            Location::new(
                reader
                    .bounds()
                    .end
                    .checked_sub(1)
                    .expect("commit should exist"),
            )
        };

        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
        })
    }

310    /// Sync all database state to disk. While this isn't necessary to ensure durability of
311    /// committed operations, periodic invocation may reduce memory usage and the time required to
312    /// recover the database on restart.
313    pub async fn sync(&mut self) -> Result<(), Error> {
314        Ok(self.journal.sync().await?)
315    }
316
317    /// Destroy the db, removing all data from disk.
318    pub async fn destroy(self) -> Result<(), Error> {
319        Ok(self.journal.destroy().await?)
320    }
321
    /// Create a new speculative batch of operations with this database as its parent.
    #[allow(clippy::type_complexity)]
    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<'_, E, K, V, H, T, Mmr<E, H::Digest>> {
        // The committed size is one past the last commit location; the batch records it so
        // a stale changeset (parent modified since fork) can be detected at apply time.
        let journal_size = *self.last_commit_loc + 1;
        batch::UnmerkleizedBatch {
            immutable: self,
            journal_batch: self.journal.new_batch(),
            mutations: BTreeMap::new(),
            base_diff: Arc::new(BTreeMap::new()),
            base_operations: Vec::new(),
            base_size: journal_size,
            db_size: journal_size,
        }
    }

    /// Apply a changeset to the database.
    ///
    /// A changeset is only valid if the database has not been modified since the
    /// batch that produced it was created. Multiple batches can be forked from the
    /// same parent for speculative execution, but only one may be applied. Applying
    /// a stale changeset returns [`Error::StaleChangeset`].
    ///
    /// Returns the range of locations written.
    pub async fn apply_batch(
        &mut self,
        batch: batch::Changeset<K, H::Digest, V>,
    ) -> Result<Range<Location>, Error> {
        // Reject changesets forked from a different committed size (stale parent).
        let journal_size = *self.last_commit_loc + 1;
        if batch.db_size != journal_size {
            return Err(Error::StaleChangeset {
                expected: batch.db_size,
                actual: journal_size,
            });
        }
        let start_loc = Location::new(journal_size);

        // Write all operations to the authenticated journal + apply MMR changeset.
        self.journal.apply_batch(batch.journal_finalized).await?;

        // Flush journal to disk.
        self.journal.commit().await?;

        // Apply snapshot diffs.
        let bounds = self.journal.reader().await.bounds();
        for diff in batch.snapshot_diffs {
            match diff {
                batch::SnapshotDiff::Insert { key, new_loc } => {
                    // While inserting, drop any index entries pointing below the retained range.
                    self.snapshot
                        .insert_and_prune(&key, new_loc, |v| *v < bounds.start);
                }
            }
        }

        // Update state. The batch's final operation (total_size - 1) is its commit.
        self.last_commit_loc = Location::new(batch.total_size - 1);

        let end_loc = Location::new(batch.total_size);
        Ok(start_loc..end_loc)
    }

381}
382
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    kv::Gettable for Immutable<E, K, V, H, T>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    /// Delegates to the inherent [Immutable::get].
    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::qmdb::store::LogStore for Immutable<E, K, V, H, T>
{
    type Value = V;

    /// Delegates to the inherent [Immutable::bounds].
    async fn bounds(&self) -> std::ops::Range<Location> {
        self.bounds().await
    }

    /// Delegates to the inherent [Immutable::get_metadata].
    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::qmdb::store::MerkleizedStore for Immutable<E, K, V, H, T>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;

    /// Delegates to the inherent [Immutable::root].
    fn root(&self) -> Self::Digest {
        self.root()
    }

    /// Delegates to the inherent [Immutable::historical_proof].
    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.historical_proof(historical_size, start_loc, max_ops)
            .await
    }
}

430#[cfg(test)]
431pub(super) mod test {
    use super::*;
    use crate::{mmr::StandardHasher, qmdb::verify_proof, translator::TwoCap};
    use commonware_cryptography::{sha256, sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, BufferPooler, Runner as _};
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::NonZeroU16;

    // Page size (bytes) used by the test page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    // Size parameter for the test page cache.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
    // Journal items per section; prune boundaries snap to multiples of this (see pruning test).
    const ITEMS_PER_SECTION: u64 = 5;

    /// Return a test [Config] whose storage partitions are namespaced by `suffix`.
    pub(crate) fn db_config(
        suffix: &str,
        pooler: &impl BufferPooler,
    ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal-{suffix}"),
            mmr_metadata_partition: format!("metadata-{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log-{suffix}"),
            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_write_buffer: NZUsize!(1024),
            translator: TwoCap,
            thread_pool: None,
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Return an [Immutable] database initialized with a fixed config.
    ///
    /// All tests share the same partition names, so "reopening" is simulated by calling
    /// this again with the same runtime context.
    async fn open_db(
        context: deterministic::Context,
    ) -> Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap> {
        let cfg = db_config("partition", &context);
        Immutable::init(context, cfg).await.unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("first")).await;
            let bounds = db.bounds().await;
            // A fresh db contains exactly one operation: the initial commit.
            assert_eq!(bounds.end, 1);
            assert_eq!(bounds.start, Location::new(0));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root();
            {
                let mut batch = db.new_batch();
                batch.set(k1, v1);
                // Don't merkleize/finalize/apply -- simulate failed commit
            }
            drop(db);
            let mut db = open_db(context.with_label("second")).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.bounds().await.end, 1);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            let finalized = db.new_batch().merkleize(None).finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.bounds().await.end, 2); // commit op added
            let root = db.root();
            drop(db);

            let db = open_db(context.with_label("third")).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys.
            let mut db = open_db(context.with_label("first")).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            // Set and commit the first key.
            let metadata = Some(vec![99, 100]);
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(k1, v1.clone());
                batch.merkleize(metadata.clone()).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            // Ops so far: initial commit (0), set k1 (1), batch commit (2).
            assert_eq!(db.bounds().await.end, 3);
            assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());

            // Set and commit the second key.
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(k2, v2.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.bounds().await.end, 5);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Capture state.
            let root = db.root();

            // Add an uncommitted op then simulate failure.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            {
                let mut batch = db.new_batch();
                batch.set(k3, v3);
                // Don't merkleize/finalize/apply -- simulate failed commit
            }

            // Reopen, make sure state is restored to last commit point.
            drop(db); // Simulate failed commit
            let db = open_db(context.with_label("second")).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.bounds().await.end, 5);
            assert_eq!(db.root(), root);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Cleanup.
            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = StandardHasher::<Sha256>::new();
            let mut db = open_db(context.with_label("first")).await;

            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![i as u8; 100];
                    batch.set(k, v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            // ELEMENTS sets + initial commit + batch commit.
            assert_eq!(db.bounds().await.end, ELEMENTS + 2);

            // Drop & reopen the db, making sure it has exactly the same state.
            let root = db.root();
            drop(db);

            let db = open_db(context.with_label("second")).await;
            assert_eq!(root, db.root());
            assert_eq!(db.bounds().await.end, ELEMENTS + 2);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Make sure all ranges of 5 operations are provable, including truncated ranges at the
            // end.
            let max_ops = NZU64!(5);
            for i in 0..*db.bounds().await.end {
                let (proof, log) = db.proof(Location::new(i), max_ops).await.unwrap();
                assert!(verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new(i),
                    &log,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            let mut db = open_db(context.with_label("first")).await;

            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![i as u8; 100];
                    batch.set(k, v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.bounds().await.end, ELEMENTS + 2);
            db.sync().await.unwrap();
            let halfway_root = db.root();

            // Insert another 1000 keys then commit.
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![i as u8; 100];
                    batch.set(k, v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            drop(db); // Drop before syncing

            // Recovery should replay the log to regenerate the MMR.
            // op_count = 1002 (first batch + commit) + 1000 (second batch) + 1 (second commit) = 2003
            let db = open_db(context.with_label("second")).await;
            assert_eq!(db.bounds().await.end, 2003);
            let root = db.root();
            assert_ne!(root, halfway_root);

            // Drop & reopen could preserve the final commit.
            drop(db);
            let db = open_db(context.with_label("third")).await;
            assert_eq!(db.bounds().await.end, 2003);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("first")).await;

            // Insert a single key and then commit to create a first commit point.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![1, 2, 3];
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(k1, v1);
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let first_commit_root = db.root();

            // Simulate failure. Sets that are never merkleized/applied are lost.
            // Recovery should restore the last commit point.
            drop(db);

            // Recovery should back up to previous commit point.
            let db = open_db(context.with_label("second")).await;
            // Ops: initial commit (0), set k1 (1), batch commit (2).
            assert_eq!(db.bounds().await.end, 3);
            let root = db.root();
            assert_eq!(root, first_commit_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs then prune some of them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("first")).await;

            // Batch writes keys in BTreeMap-sorted order, so build the sorted key
            // list to map between journal locations and keys.
            let mut sorted_keys: Vec<sha256::Digest> = (1u64..ELEMENTS + 1)
                .map(|i| Sha256::hash(&i.to_be_bytes()))
                .collect();
            sorted_keys.sort();
            // Location 0: initial commit; locations 1..=ELEMENTS: Set ops in sorted
            // key order; location ELEMENTS+1: batch commit.
            // key_at_loc(L) = sorted_keys[L - 1] for 1 <= L <= ELEMENTS.

            let finalized = {
                let mut batch = db.new_batch();
                for i in 1u64..ELEMENTS + 1 {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![i as u8; 100];
                    batch.set(k, v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.bounds().await.end, ELEMENTS + 2);

            // Prune the db to the first half of the operations.
            db.prune(Location::new((ELEMENTS + 2) / 2))
                .await
                .unwrap();
            let bounds = db.bounds().await;
            // Pruning never changes the end bound, only the start.
            assert_eq!(bounds.end, ELEMENTS + 2);

            // items_per_section is 5, so half should be exactly at a blob boundary, in which case
            // the actual pruning location should match the requested.
            // NOTE(review): the requested location (ELEMENTS + 2) / 2 = 1001 is NOT a multiple of
            // 5; the assertion below shows the start snaps down to 1000 — confirm this comment.
            let oldest_retained_loc = bounds.start;
            assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));

            // Try to fetch a pruned key (at location oldest_retained - 1).
            let pruned_key = sorted_keys[*oldest_retained_loc as usize - 2];
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key (at location oldest_retained).
            let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Drop & reopen the db, making sure it has exactly the same state.
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);

            let mut db = open_db(context.with_label("second")).await;
            assert_eq!(root, db.root());
            let bounds = db.bounds().await;
            assert_eq!(bounds.end, ELEMENTS + 2);
            let oldest_retained_loc = bounds.start;
            assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));

            // Prune to a non-blob boundary.
            let loc = Location::new(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
            db.prune(loc).await.unwrap();
            // Actual boundary should be a multiple of 5.
            let oldest_retained_loc = db.bounds().await.start;
            assert_eq!(
                oldest_retained_loc,
                Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Confirm boundary persists across restart.
            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.with_label("third")).await;
            let oldest_retained_loc = db.bounds().await.start;
            assert_eq!(
                oldest_retained_loc,
                Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Try to fetch a pruned key (at location oldest_retained - 3).
            let pruned_key = sorted_keys[*oldest_retained_loc as usize - 4];
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key (at location oldest_retained).
            let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Confirm behavior of trying to create a proof of pruned items is as expected.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db
                .proof(
                    Location::new(pruned_pos),
                    NZU64!(pruned_pos + 100),
                )
                .await;
            assert!(matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos));

            db.destroy().await.unwrap();
        });
    }

814    #[test_traced("INFO")]
815    pub fn test_immutable_db_prune_beyond_commit() {
816        let executor = deterministic::Runner::default();
817        executor.start(|context| async move {
818            let mut db = open_db(context.with_label("test")).await;
819
820            // Test pruning empty database (no commits)
821            let result = db.prune(Location::new(1)).await;
822            assert!(
823                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
824                    if prune_loc == Location::new(1) && commit_loc == Location::new(0))
825            );
826
827            // Add key-value pairs and commit
828            let k1 = Digest::from(*b"12345678901234567890123456789012");
829            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
830            let k3 = Digest::from(*b"99999999999999999999999999999999");
831            let v1 = vec![1u8; 16];
832            let v2 = vec![2u8; 16];
833            let v3 = vec![3u8; 16];
834
835            let finalized = {
836                let mut batch = db.new_batch();
837                batch.set(k1, v1.clone());
838                batch.set(k2, v2.clone());
839                batch.merkleize(None).finalize()
840            };
841            db.apply_batch(finalized).await.unwrap();
842
843            // op_count is 4 (initial_commit, k1, k2, commit), last_commit is at location 3
844            assert_eq!(*db.last_commit_loc, 3);
845
846            let finalized = {
847                let mut batch = db.new_batch();
848                batch.set(k3, v3.clone());
849                batch.merkleize(None).finalize()
850            };
851            db.apply_batch(finalized).await.unwrap();
852
853            // Test valid prune (at previous commit location 3)
854            assert!(db.prune(Location::new(3)).await.is_ok());
855
856            // Test pruning beyond last commit
857            let new_last_commit = db.last_commit_loc;
858            let beyond = new_last_commit + 1;
859            let result = db.prune(beyond).await;
860            assert!(
861                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
862                    if prune_loc == beyond && commit_loc == new_last_commit)
863            );
864
865            db.destroy().await.unwrap();
866        });
867    }
868
869    use crate::{
870        kv::tests::{assert_gettable, assert_send},
871        qmdb::store::tests::{assert_log_store, assert_merkleized_store},
872    };
873
    // Concrete [Immutable] instantiation used by the compile-time `Send` checks
    // below.
    type TestDb = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    // Compile-time assertion that the db's async operations produce `Send`
    // futures. This function is never called at runtime -- it exists only so the
    // bounds in `assert_gettable`/`assert_send` are checked by the compiler --
    // hence the dead_code allowance.
    #[allow(dead_code)]
    fn assert_db_futures_are_send(db: &mut TestDb, key: Digest, loc: Location) {
        assert_gettable(db, &key);
        assert_log_store(db);
        assert_merkleized_store(db, loc);
        assert_send(db.sync());
    }
883
884    type Db = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
885
886    /// batch.get() reads pending mutations and falls through to base DB.
887    #[test_traced("INFO")]
888    fn test_immutable_batch_get_read_through() {
889        let executor = deterministic::Runner::default();
890        executor.start(|context| async move {
891            let cfg = db_config("readthrough", &context);
892            let mut db: Db = Immutable::init(context.with_label("db"), cfg)
893                .await
894                .unwrap();
895
896            // Pre-populate with key A.
897            let key_a = Sha256::hash(&0u64.to_be_bytes());
898            let val_a = vec![1u8; 8];
899            let finalized = {
900                let mut batch = db.new_batch();
901                batch.set(key_a, val_a.clone());
902                batch.merkleize(None).finalize()
903            };
904            db.apply_batch(finalized).await.unwrap();
905
906            // batch.get(&A) should return DB value.
907            let mut batch = db.new_batch();
908            assert_eq!(batch.get(&key_a).await.unwrap(), Some(val_a));
909
910            // Set B in batch, batch.get(&B) returns the value.
911            let key_b = Sha256::hash(&1u64.to_be_bytes());
912            let val_b = vec![2u8; 8];
913            batch.set(key_b, val_b.clone());
914            assert_eq!(batch.get(&key_b).await.unwrap(), Some(val_b));
915
916            // Nonexistent key.
917            let key_c = Sha256::hash(&2u64.to_be_bytes());
918            assert_eq!(batch.get(&key_c).await.unwrap(), None);
919
920            db.destroy().await.unwrap();
921        });
922    }
923
924    /// Child batch reads parent diff and adds its own mutations.
925    #[test_traced("INFO")]
926    fn test_immutable_batch_stacked_get() {
927        let executor = deterministic::Runner::default();
928        executor.start(|context| async move {
929            let cfg = db_config("stacked-get", &context);
930            let db: Db = Immutable::init(context.with_label("db"), cfg)
931                .await
932                .unwrap();
933
934            // Parent batch: set A.
935            let key_a = Sha256::hash(&0u64.to_be_bytes());
936            let val_a = vec![10u8; 8];
937            let mut parent = db.new_batch();
938            parent.set(key_a, val_a.clone());
939            let parent_m = parent.merkleize(None);
940
941            // Child reads parent's A.
942            let mut child = parent_m.new_batch();
943            assert_eq!(child.get(&key_a).await.unwrap(), Some(val_a));
944
945            // Child sets B.
946            let key_b = Sha256::hash(&1u64.to_be_bytes());
947            let val_b = vec![20u8; 8];
948            child.set(key_b, val_b.clone());
949            assert_eq!(child.get(&key_b).await.unwrap(), Some(val_b));
950
951            // Nonexistent key.
952            let key_c = Sha256::hash(&2u64.to_be_bytes());
953            assert_eq!(child.get(&key_c).await.unwrap(), None);
954
955            db.destroy().await.unwrap();
956        });
957    }
958
959    /// Two-level stacked batch finalize and apply works end-to-end.
960    #[test_traced("INFO")]
961    fn test_immutable_batch_stacked_finalize_apply() {
962        let executor = deterministic::Runner::default();
963        executor.start(|context| async move {
964            let cfg = db_config("stacked-apply", &context);
965            let mut db: Db = Immutable::init(context.with_label("db"), cfg)
966                .await
967                .unwrap();
968
969            // Sort keys so operations are in BTreeMap order (same as merkleize writes).
970            let mut kvs_first: Vec<(Digest, Vec<u8>)> = (0u64..5)
971                .map(|i| (Sha256::hash(&i.to_be_bytes()), vec![i as u8; 8]))
972                .collect();
973            kvs_first.sort_by(|a, b| a.0.cmp(&b.0));
974
975            let mut kvs_second: Vec<(Digest, Vec<u8>)> = (5u64..10)
976                .map(|i| (Sha256::hash(&i.to_be_bytes()), vec![i as u8; 8]))
977                .collect();
978            kvs_second.sort_by(|a, b| a.0.cmp(&b.0));
979
980            // Parent batch: set keys 0..5.
981            let mut parent = db.new_batch();
982            for (k, v) in &kvs_first {
983                parent.set(*k, v.clone());
984            }
985            let parent_m = parent.merkleize(None);
986
987            // Child batch: set keys 5..10.
988            let mut child = parent_m.new_batch();
989            for (k, v) in &kvs_second {
990                child.set(*k, v.clone());
991            }
992            let child_m = child.merkleize(None);
993            let expected_root = child_m.root();
994            let finalized = child_m.finalize();
995            db.apply_batch(finalized).await.unwrap();
996
997            assert_eq!(db.root(), expected_root);
998
999            // All 10 keys should be accessible.
1000            for (k, v) in kvs_first.iter().chain(kvs_second.iter()) {
1001                assert_eq!(db.get(k).await.unwrap(), Some(v.clone()));
1002            }
1003
1004            db.destroy().await.unwrap();
1005        });
1006    }
1007
1008    /// MerkleizedBatch::root() matches db.root() after apply_batch().
1009    #[test_traced("INFO")]
1010    fn test_immutable_batch_speculative_root() {
1011        let executor = deterministic::Runner::default();
1012        executor.start(|context| async move {
1013            let mut db = open_db(context.with_label("db")).await;
1014
1015            let merkleized = {
1016                let mut batch = db.new_batch();
1017                for i in 0u8..10 {
1018                    let k = Sha256::hash(&[i]);
1019                    batch.set(k, vec![i; 16]);
1020                }
1021                batch.merkleize(None)
1022            };
1023
1024            let speculative = merkleized.root();
1025            let finalized = merkleized.finalize();
1026            db.apply_batch(finalized).await.unwrap();
1027            assert_eq!(db.root(), speculative);
1028
1029            // Second batch with metadata.
1030            let metadata = vec![55u8; 8];
1031            let merkleized = {
1032                let mut batch = db.new_batch();
1033                let k = Sha256::hash(&[0xAA]);
1034                batch.set(k, vec![0xAA; 20]);
1035                batch.merkleize(Some(metadata))
1036            };
1037            let speculative = merkleized.root();
1038            let finalized = merkleized.finalize();
1039            db.apply_batch(finalized).await.unwrap();
1040            assert_eq!(db.root(), speculative);
1041
1042            db.destroy().await.unwrap();
1043        });
1044    }
1045
1046    /// MerkleizedBatch::get() reads from diff and base DB.
1047    #[test_traced("INFO")]
1048    fn test_immutable_merkleized_batch_get() {
1049        let executor = deterministic::Runner::default();
1050        executor.start(|context| async move {
1051            let mut db = open_db(context.with_label("db")).await;
1052
1053            // Pre-populate base DB.
1054            let key_a = Sha256::hash(&0u64.to_be_bytes());
1055            let val_a = vec![10u8; 12];
1056            let finalized = {
1057                let mut batch = db.new_batch();
1058                batch.set(key_a, val_a.clone());
1059                batch.merkleize(None).finalize()
1060            };
1061            db.apply_batch(finalized).await.unwrap();
1062
1063            // Create a merkleized batch with a new key.
1064            let key_b = Sha256::hash(&1u64.to_be_bytes());
1065            let val_b = vec![20u8; 16];
1066            let mut batch = db.new_batch();
1067            batch.set(key_b, val_b.clone());
1068            let merkleized = batch.merkleize(None);
1069
1070            // Read base DB value through merkleized batch.
1071            assert_eq!(merkleized.get(&key_a).await.unwrap(), Some(val_a));
1072
1073            // Read this batch's key from the diff.
1074            assert_eq!(merkleized.get(&key_b).await.unwrap(), Some(val_b));
1075
1076            // Nonexistent key.
1077            let key_c = Sha256::hash(&2u64.to_be_bytes());
1078            assert_eq!(merkleized.get(&key_c).await.unwrap(), None);
1079
1080            db.destroy().await.unwrap();
1081        });
1082    }
1083
1084    /// Independent sequential batches applied one at a time.
1085    #[test_traced("INFO")]
1086    fn test_immutable_batch_sequential_apply() {
1087        let executor = deterministic::Runner::default();
1088        executor.start(|context| async move {
1089            let mut db = open_db(context.with_label("db")).await;
1090
1091            let key_a = Sha256::hash(&0u64.to_be_bytes());
1092            let val_a = vec![1u8; 8];
1093
1094            // First batch.
1095            let mut batch = db.new_batch();
1096            batch.set(key_a, val_a.clone());
1097            let m = batch.merkleize(None);
1098            let root1 = m.root();
1099            db.apply_batch(m.finalize()).await.unwrap();
1100            assert_eq!(db.root(), root1);
1101            assert_eq!(db.get(&key_a).await.unwrap(), Some(val_a));
1102
1103            // Second independent batch.
1104            let key_b = Sha256::hash(&1u64.to_be_bytes());
1105            let val_b = vec![2u8; 16];
1106            let mut batch = db.new_batch();
1107            batch.set(key_b, val_b.clone());
1108            let m = batch.merkleize(None);
1109            let root2 = m.root();
1110            db.apply_batch(m.finalize()).await.unwrap();
1111            assert_eq!(db.root(), root2);
1112            assert_eq!(db.get(&key_b).await.unwrap(), Some(val_b));
1113
1114            db.destroy().await.unwrap();
1115        });
1116    }
1117
1118    /// Many sequential batches accumulate correctly.
1119    #[test_traced("INFO")]
1120    fn test_immutable_batch_many_sequential() {
1121        let executor = deterministic::Runner::default();
1122        executor.start(|context| async move {
1123            let mut db = open_db(context.with_label("db")).await;
1124            let mut hasher = StandardHasher::<Sha256>::new();
1125
1126            const BATCHES: u64 = 20;
1127            const KEYS_PER_BATCH: u64 = 5;
1128
1129            let mut all_kvs: Vec<(Digest, Vec<u8>)> = Vec::new();
1130
1131            for batch_idx in 0..BATCHES {
1132                let finalized = {
1133                    let mut batch = db.new_batch();
1134                    for j in 0..KEYS_PER_BATCH {
1135                        let seed = batch_idx * 100 + j;
1136                        let k = Sha256::hash(&seed.to_be_bytes());
1137                        let v = vec![seed as u8; 8];
1138                        batch.set(k, v.clone());
1139                        all_kvs.push((k, v));
1140                    }
1141                    batch.merkleize(None).finalize()
1142                };
1143                db.apply_batch(finalized).await.unwrap();
1144            }
1145
1146            // Verify all key-values are readable.
1147            for (k, v) in &all_kvs {
1148                assert_eq!(db.get(k).await.unwrap(), Some(v.clone()));
1149            }
1150
1151            // Verify proof over the full range.
1152            let root = db.root();
1153            let (proof, ops) = db.proof(Location::new(0), NZU64!(10000)).await.unwrap();
1154            assert!(verify_proof(
1155                &mut hasher,
1156                &proof,
1157                Location::new(0),
1158                &ops,
1159                &root
1160            ));
1161
1162            // Expected: 1 initial commit + BATCHES * (KEYS_PER_BATCH + 1 commit).
1163            let expected = 1 + BATCHES * (KEYS_PER_BATCH + 1);
1164            assert_eq!(db.bounds().await.end, expected);
1165
1166            db.destroy().await.unwrap();
1167        });
1168    }
1169
1170    /// Empty batch (zero mutations) produces correct speculative root.
1171    #[test_traced("INFO")]
1172    fn test_immutable_batch_empty() {
1173        let executor = deterministic::Runner::default();
1174        executor.start(|context| async move {
1175            let mut db = open_db(context.with_label("db")).await;
1176
1177            // Apply a non-empty batch first.
1178            let finalized = {
1179                let mut batch = db.new_batch();
1180                let k = Sha256::hash(&[1u8]);
1181                batch.set(k, vec![1u8; 8]);
1182                batch.merkleize(None).finalize()
1183            };
1184            db.apply_batch(finalized).await.unwrap();
1185            let root_before = db.root();
1186            let size_before = db.bounds().await.end;
1187
1188            // Empty batch with no mutations.
1189            let merkleized = db.new_batch().merkleize(None);
1190            let speculative = merkleized.root();
1191            let finalized = merkleized.finalize();
1192            db.apply_batch(finalized).await.unwrap();
1193
1194            // Root changed (a new Commit op was appended).
1195            assert_ne!(db.root(), root_before);
1196            assert_eq!(db.root(), speculative);
1197            // Size grew by exactly 1 (the Commit op).
1198            assert_eq!(db.bounds().await.end, size_before + 1);
1199
1200            db.destroy().await.unwrap();
1201        });
1202    }
1203
1204    /// MerkleizedBatch::get() works on a chained child's merkleized batch.
1205    #[test_traced("INFO")]
1206    fn test_immutable_batch_chained_merkleized_get() {
1207        let executor = deterministic::Runner::default();
1208        executor.start(|context| async move {
1209            let mut db = open_db(context.with_label("db")).await;
1210
1211            // Pre-populate base DB.
1212            let key_a = Sha256::hash(&0u64.to_be_bytes());
1213            let val_a = vec![10u8; 12];
1214            let finalized = {
1215                let mut batch = db.new_batch();
1216                batch.set(key_a, val_a.clone());
1217                batch.merkleize(None).finalize()
1218            };
1219            db.apply_batch(finalized).await.unwrap();
1220
1221            // Parent batch sets key B.
1222            let key_b = Sha256::hash(&1u64.to_be_bytes());
1223            let val_b = vec![1u8; 8];
1224            let mut parent = db.new_batch();
1225            parent.set(key_b, val_b.clone());
1226            let parent_m = parent.merkleize(None);
1227
1228            // Child batch sets key C.
1229            let key_c = Sha256::hash(&2u64.to_be_bytes());
1230            let val_c = vec![2u8; 16];
1231            let mut child = parent_m.new_batch();
1232            child.set(key_c, val_c.clone());
1233            let child_m = child.merkleize(None);
1234
1235            // Child's MerkleizedBatch can read all three layers:
1236            // base DB value
1237            assert_eq!(child_m.get(&key_a).await.unwrap(), Some(val_a));
1238            // parent diff value
1239            assert_eq!(child_m.get(&key_b).await.unwrap(), Some(val_b));
1240            // child's own value
1241            assert_eq!(child_m.get(&key_c).await.unwrap(), Some(val_c));
1242            // nonexistent key
1243            let key_d = Sha256::hash(&3u64.to_be_bytes());
1244            assert_eq!(child_m.get(&key_d).await.unwrap(), None);
1245
1246            db.destroy().await.unwrap();
1247        });
1248    }
1249
1250    /// Large single batch, verifying all values and proof.
1251    #[test_traced("INFO")]
1252    fn test_immutable_batch_large() {
1253        let executor = deterministic::Runner::default();
1254        executor.start(|context| async move {
1255            let mut db = open_db(context.with_label("db")).await;
1256            let mut hasher = StandardHasher::<Sha256>::new();
1257
1258            const N: u64 = 500;
1259            let mut kvs: Vec<(Digest, Vec<u8>)> = Vec::new();
1260
1261            let finalized = {
1262                let mut batch = db.new_batch();
1263                for i in 0..N {
1264                    let k = Sha256::hash(&i.to_be_bytes());
1265                    let v = vec![(i % 256) as u8; ((i % 29) + 3) as usize];
1266                    batch.set(k, v.clone());
1267                    kvs.push((k, v));
1268                }
1269                batch.merkleize(None).finalize()
1270            };
1271            db.apply_batch(finalized).await.unwrap();
1272
1273            // Verify every value.
1274            for (k, v) in &kvs {
1275                assert_eq!(db.get(k).await.unwrap(), Some(v.clone()));
1276            }
1277
1278            // Verify proof over the full range.
1279            let root = db.root();
1280            let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1281            assert!(verify_proof(
1282                &mut hasher,
1283                &proof,
1284                Location::new(0),
1285                &ops,
1286                &root
1287            ));
1288
1289            // Expected: 1 initial commit + N sets + 1 commit.
1290            assert_eq!(db.bounds().await.end, 1 + N + 1);
1291
1292            db.destroy().await.unwrap();
1293        });
1294    }
1295
1296    /// Child batch overrides same key set by parent.
1297    #[test_traced("INFO")]
1298    fn test_immutable_batch_chained_key_override() {
1299        let executor = deterministic::Runner::default();
1300        executor.start(|context| async move {
1301            let mut db = open_db(context.with_label("db")).await;
1302
1303            let key = Sha256::hash(&0u64.to_be_bytes());
1304            let val_parent = vec![1u8; 8];
1305            let val_child = vec![2u8; 16];
1306
1307            // Parent sets key.
1308            let mut parent = db.new_batch();
1309            parent.set(key, val_parent.clone());
1310            let parent_m = parent.merkleize(None);
1311
1312            // Child overrides same key.
1313            let mut child = parent_m.new_batch();
1314            child.set(key, val_child.clone());
1315
1316            // Child's pending mutation wins over parent diff.
1317            assert_eq!(child.get(&key).await.unwrap(), Some(val_child.clone()));
1318
1319            let child_m = child.merkleize(None);
1320
1321            // After merkleize, child's diff wins.
1322            assert_eq!(child_m.get(&key).await.unwrap(), Some(val_child.clone()));
1323
1324            // Apply and verify.
1325            let finalized = child_m.finalize();
1326            db.apply_batch(finalized).await.unwrap();
1327            assert_eq!(db.get(&key).await.unwrap(), Some(val_child));
1328
1329            db.destroy().await.unwrap();
1330        });
1331    }
1332
1333    /// Same key set across two sequential applied batches. The immutable DB
1334    /// keeps all versions -- `get()` returns the earliest non-pruned value.
1335    /// After pruning the first version, `get()` returns the second.
1336    #[test_traced("INFO")]
1337    fn test_immutable_batch_sequential_key_override() {
1338        let executor = deterministic::Runner::default();
1339        executor.start(|context| async move {
1340            let cfg = Config {
1341                // Use items_per_section=1 so pruning is granular.
1342                log_items_per_section: NZU64!(1),
1343                ..db_config("key-override", &context)
1344            };
1345            let mut db: Db = Immutable::init(context.with_label("db"), cfg)
1346                .await
1347                .unwrap();
1348
1349            let key = Sha256::hash(&0u64.to_be_bytes());
1350            let v1 = vec![1u8; 8];
1351            let v2 = vec![2u8; 16];
1352
1353            // First batch sets key.
1354            // Layout: 0=initial commit, 1=Set(key,v1), 2=Commit
1355            let finalized = {
1356                let mut batch = db.new_batch();
1357                batch.set(key, v1.clone());
1358                batch.merkleize(None).finalize()
1359            };
1360            db.apply_batch(finalized).await.unwrap();
1361            assert_eq!(db.get(&key).await.unwrap(), Some(v1.clone()));
1362
1363            // Second batch sets same key to different value.
1364            // Layout continues: 3=Set(key,v2), 4=Commit
1365            let finalized = {
1366                let mut batch = db.new_batch();
1367                batch.set(key, v2.clone());
1368                batch.merkleize(None).finalize()
1369            };
1370            db.apply_batch(finalized).await.unwrap();
1371
1372            // Immutable DB returns the earliest non-pruned value.
1373            assert_eq!(db.get(&key).await.unwrap(), Some(v1.clone()));
1374
1375            // Prune past the first Set (loc 1). With items_per_section=1,
1376            // pruning to loc 2 should remove the blob containing loc 1.
1377            db.prune(Location::new(2)).await.unwrap();
1378            assert_eq!(db.get(&key).await.unwrap(), Some(v2.clone()));
1379
1380            // Verify persists across reopen.
1381            db.sync().await.unwrap();
1382
1383            db.destroy().await.unwrap();
1384        });
1385    }
1386
1387    /// Metadata propagates through merkleize and clears with None.
1388    #[test_traced("INFO")]
1389    fn test_immutable_batch_metadata() {
1390        let executor = deterministic::Runner::default();
1391        executor.start(|context| async move {
1392            let mut db = open_db(context.with_label("db")).await;
1393
1394            // Batch with metadata.
1395            let metadata = vec![42u8; 32];
1396            let finalized = {
1397                let mut batch = db.new_batch();
1398                let k = Sha256::hash(&[1u8]);
1399                batch.set(k, vec![1u8; 8]);
1400                batch.merkleize(Some(metadata.clone())).finalize()
1401            };
1402            db.apply_batch(finalized).await.unwrap();
1403            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1404
1405            // Second batch clears metadata.
1406            let finalized = db.new_batch().merkleize(None).finalize();
1407            db.apply_batch(finalized).await.unwrap();
1408            assert_eq!(db.get_metadata().await.unwrap(), None);
1409
1410            db.destroy().await.unwrap();
1411        });
1412    }
1413
1414    #[test_traced]
1415    fn test_stale_changeset_rejected() {
1416        let executor = deterministic::Runner::default();
1417        executor.start(|context| async move {
1418            let mut db = open_db(context.with_label("db")).await;
1419
1420            let key1 = Sha256::hash(&[1]);
1421            let key2 = Sha256::hash(&[2]);
1422
1423            // Create two batches from the same DB state.
1424            let changeset_a = {
1425                let mut batch = db.new_batch();
1426                batch.set(key1, vec![10]);
1427                batch.merkleize(None).finalize()
1428            };
1429            let changeset_b = {
1430                let mut batch = db.new_batch();
1431                batch.set(key2, vec![20]);
1432                batch.merkleize(None).finalize()
1433            };
1434
1435            // Apply the first -- should succeed.
1436            db.apply_batch(changeset_a).await.unwrap();
1437            let expected_root = db.root();
1438            let expected_bounds = db.bounds().await;
1439            assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
1440            assert_eq!(db.get(&key2).await.unwrap(), None);
1441            assert_eq!(db.get_metadata().await.unwrap(), None);
1442
1443            // Apply the second -- should fail because the DB was modified.
1444            let result = db.apply_batch(changeset_b).await;
1445            assert!(
1446                matches!(result, Err(Error::StaleChangeset { .. })),
1447                "expected StaleChangeset error, got {result:?}"
1448            );
1449            assert_eq!(db.root(), expected_root);
1450            assert_eq!(db.bounds().await, expected_bounds);
1451            assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
1452            assert_eq!(db.get(&key2).await.unwrap(), None);
1453            assert_eq!(db.get_metadata().await.unwrap(), None);
1454
1455            db.destroy().await.unwrap();
1456        });
1457    }
1458
1459    #[test_traced]
1460    fn test_stale_changeset_chained() {
1461        let executor = deterministic::Runner::default();
1462        executor.start(|context| async move {
1463            let mut db = open_db(context.with_label("db")).await;
1464
1465            let key1 = Sha256::hash(&[1]);
1466            let key2 = Sha256::hash(&[2]);
1467            let key3 = Sha256::hash(&[3]);
1468
1469            // Parent batch.
1470            let mut parent = db.new_batch();
1471            parent.set(key1, vec![1]);
1472            let parent_m = parent.merkleize(None);
1473
1474            // Fork two children from the same parent.
1475            let child_a = {
1476                let mut batch = parent_m.new_batch();
1477                batch.set(key2, vec![2]);
1478                batch.merkleize(None).finalize()
1479            };
1480            let child_b = {
1481                let mut batch = parent_m.new_batch();
1482                batch.set(key3, vec![3]);
1483                batch.merkleize(None).finalize()
1484            };
1485
1486            // Apply child A.
1487            db.apply_batch(child_a).await.unwrap();
1488
1489            // Child B is stale.
1490            let result = db.apply_batch(child_b).await;
1491            assert!(
1492                matches!(result, Err(Error::StaleChangeset { .. })),
1493                "expected StaleChangeset error, got {result:?}"
1494            );
1495
1496            db.destroy().await.unwrap();
1497        });
1498    }
1499
1500    #[test_traced]
1501    fn test_stale_changeset_parent_applied_before_child() {
1502        let executor = deterministic::Runner::default();
1503        executor.start(|context| async move {
1504            let mut db = open_db(context.with_label("db")).await;
1505
1506            let key1 = Sha256::hash(&[1]);
1507            let key2 = Sha256::hash(&[2]);
1508
1509            // Parent batch.
1510            let mut parent = db.new_batch();
1511            parent.set(key1, vec![1]);
1512            let parent_m = parent.merkleize(None);
1513
1514            // Child batch.
1515            let mut child = parent_m.new_batch();
1516            child.set(key2, vec![2]);
1517            let child_changeset = child.merkleize(None).finalize();
1518
1519            // Apply parent first.
1520            let parent_changeset = parent_m.finalize();
1521            db.apply_batch(parent_changeset).await.unwrap();
1522
1523            // Child is stale because it expected to be applied on top of the
1524            // pre-parent DB state.
1525            let result = db.apply_batch(child_changeset).await;
1526            assert!(
1527                matches!(result, Err(Error::StaleChangeset { .. })),
1528                "expected StaleChangeset error, got {result:?}"
1529            );
1530
1531            db.destroy().await.unwrap();
1532        });
1533    }
1534
1535    #[test_traced]
1536    fn test_stale_changeset_child_applied_before_parent() {
1537        let executor = deterministic::Runner::default();
1538        executor.start(|context| async move {
1539            let mut db = open_db(context.with_label("db")).await;
1540
1541            let key1 = Sha256::hash(&[1]);
1542            let key2 = Sha256::hash(&[2]);
1543
1544            // Parent batch.
1545            let mut parent = db.new_batch();
1546            parent.set(key1, vec![1]);
1547            let parent_m = parent.merkleize(None);
1548
1549            // Child batch. Finalize both before applying either so the
1550            // borrow on `db` through `parent_m` is released.
1551            let mut child = parent_m.new_batch();
1552            child.set(key2, vec![2]);
1553            let child_changeset = child.merkleize(None).finalize();
1554            let parent_changeset = parent_m.finalize();
1555
1556            // Apply child first (it carries all parent ops too).
1557            db.apply_batch(child_changeset).await.unwrap();
1558
1559            // Parent is stale.
1560            let result = db.apply_batch(parent_changeset).await;
1561            assert!(
1562                matches!(result, Err(Error::StaleChangeset { .. })),
1563                "expected StaleChangeset error, got {result:?}"
1564            );
1565
1566            db.destroy().await.unwrap();
1567        });
1568    }
1569}