Skip to main content

commonware_storage/qmdb/keyless/
mod.rs

1//! The [Keyless] qmdb allows for append-only storage of arbitrary variable-length data that can
2//! later be retrieved by its location.
3//!
4//! # Examples
5//!
6//! ```ignore
7//! let mut batch = db.new_batch();
8//! let loc = batch.append(value);  // returns the location
9//! let merkleized = batch.merkleize(None);  // synchronous
10//! let finalized = merkleized.finalize();
11//! db.apply_batch(finalized).await?;
12//! ```
13
14use crate::{
15    journal::{
16        authenticated,
17        contiguous::{
18            variable::{Config as JournalConfig, Journal as ContiguousJournal},
19            Contiguous, Reader,
20        },
21    },
22    mmr::{
23        journaled::{Config as MmrConfig, Mmr},
24        Location, Proof,
25    },
26    qmdb::{
27        any::VariableValue,
28        operation::Committable,
29        store::{LogStore, MerkleizedStore},
30        Error,
31    },
32};
33use commonware_cryptography::Hasher;
34use commonware_parallel::ThreadPool;
35use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
36use std::num::{NonZeroU64, NonZeroUsize};
37use tracing::{debug, warn};
38
39pub mod batch;
40mod operation;
41pub use operation::Operation;
42
43/// Configuration for a [Keyless] authenticated db.
44#[derive(Clone)]
45pub struct Config<C> {
46    /// The name of the [Storage] partition used for the MMR's backing journal.
47    pub mmr_journal_partition: String,
48
49    /// The items per blob configuration value used by the MMR journal.
50    pub mmr_items_per_blob: NonZeroU64,
51
52    /// The size of the write buffer to use for each blob in the MMR journal.
53    pub mmr_write_buffer: NonZeroUsize,
54
55    /// The name of the [Storage] partition used for the MMR's metadata.
56    pub mmr_metadata_partition: String,
57
58    /// The name of the [Storage] partition used to persist the operations log.
59    pub log_partition: String,
60
61    /// The size of the write buffer to use with the log journal.
62    pub log_write_buffer: NonZeroUsize,
63
64    /// Optional compression level (using `zstd`) to apply to log data before storing.
65    pub log_compression: Option<u8>,
66
67    /// The codec configuration to use for encoding and decoding the operations log.
68    pub log_codec_config: C,
69
70    /// The max number of operations to put in each section of the operations log.
71    pub log_items_per_section: NonZeroU64,
72
73    /// An optional thread pool to use for parallelizing batch MMR operations.
74    pub thread_pool: Option<ThreadPool>,
75
76    /// The page cache to use for caching data.
77    pub page_cache: CacheRef,
78}
79
80/// A keyless QMDB for variable length data.
81type Journal<E, V, H> = authenticated::Journal<E, ContiguousJournal<E, Operation<V>>, H>;
82
83/// A keyless authenticated database for variable-length data.
84pub struct Keyless<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> {
85    /// Authenticated journal of operations.
86    journal: Journal<E, V, H>,
87
88    /// The location of the last commit, if any.
89    last_commit_loc: Location,
90}
91
92impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> Keyless<E, V, H> {
93    /// Get the value at location `loc` in the database.
94    ///
95    /// # Errors
96    ///
97    /// Returns [Error::LocationOutOfBounds] if `loc` >= `self.bounds().await.end`.
98    pub async fn get(&self, loc: Location) -> Result<Option<V>, Error> {
99        let reader = self.journal.reader().await;
100        let op_count = reader.bounds().end;
101        if loc >= op_count {
102            return Err(Error::LocationOutOfBounds(loc, Location::new(op_count)));
103        }
104        let op = reader.read(*loc).await?;
105
106        Ok(op.into_value())
107    }
108
109    /// Returns the location of the last commit.
110    pub const fn last_commit_loc(&self) -> Location {
111        self.last_commit_loc
112    }
113
114    /// Get the metadata associated with the last commit.
115    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
116        let op = self
117            .journal
118            .reader()
119            .await
120            .read(*self.last_commit_loc)
121            .await?;
122        let Operation::Commit(metadata) = op else {
123            return Ok(None);
124        };
125
126        Ok(metadata)
127    }
128
129    /// Returns a [Keyless] qmdb initialized from `cfg`. Any uncommitted operations will be discarded
130    /// and the state of the db will be as of the last committed operation.
131    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
132        let mmr_cfg = MmrConfig {
133            journal_partition: cfg.mmr_journal_partition,
134            metadata_partition: cfg.mmr_metadata_partition,
135            items_per_blob: cfg.mmr_items_per_blob,
136            write_buffer: cfg.mmr_write_buffer,
137            thread_pool: cfg.thread_pool,
138            page_cache: cfg.page_cache.clone(),
139        };
140
141        let journal_cfg = JournalConfig {
142            partition: cfg.log_partition,
143            items_per_section: cfg.log_items_per_section,
144            compression: cfg.log_compression,
145            codec_config: cfg.log_codec_config,
146            page_cache: cfg.page_cache,
147            write_buffer: cfg.log_write_buffer,
148        };
149
150        let mut journal = Journal::new(context, mmr_cfg, journal_cfg, Operation::is_commit).await?;
151        if journal.size().await == 0 {
152            warn!("no operations found in log, creating initial commit");
153            journal.append(&Operation::Commit(None)).await?;
154            journal.sync().await?;
155        }
156
157        let last_commit_loc = journal
158            .size()
159            .await
160            .checked_sub(1)
161            .expect("at least one commit should exist");
162
163        Ok(Self {
164            journal,
165            last_commit_loc,
166        })
167    }
168
169    /// Return the root of the db.
170    pub fn root(&self) -> H::Digest {
171        self.journal.root()
172    }
173
174    /// Generate and return:
175    ///  1. a proof of all operations applied to the db in the range starting at (and including)
176    ///     location `start_loc`, and ending at the first of either:
177    ///     - the last operation performed, or
178    ///     - the operation `max_ops` from the start.
179    ///  2. the operations corresponding to the leaves in this range.
180    pub async fn proof(
181        &self,
182        start_loc: Location,
183        max_ops: NonZeroU64,
184    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
185        self.historical_proof(self.size().await, start_loc, max_ops)
186            .await
187    }
188
189    /// Analogous to proof, but with respect to the state of the MMR when it had `op_count`
190    /// operations.
191    ///
192    /// # Errors
193    ///
194    /// - Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
195    ///   [crate::mmr::MAX_LOCATION].
196    /// - Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count` or `op_count` >
197    ///   number of operations.
198    pub async fn historical_proof(
199        &self,
200        op_count: Location,
201        start_loc: Location,
202        max_ops: NonZeroU64,
203    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
204        Ok(self
205            .journal
206            .historical_proof(op_count, start_loc, max_ops)
207            .await?)
208    }
209
210    /// Prune historical operations prior to `loc`. This does not affect the db's root.
211    ///
212    /// # Errors
213    ///
214    /// - Returns [Error::PruneBeyondMinRequired] if `loc` > last commit point.
215    /// - Returns [crate::mmr::Error::LocationOverflow] if `loc` > [crate::mmr::MAX_LOCATION]
216    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
217        if loc > self.last_commit_loc {
218            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
219        }
220        self.journal.prune(loc).await?;
221
222        Ok(())
223    }
224
225    /// Sync all database state to disk. While this isn't necessary to ensure durability of
226    /// committed operations, periodic invocation may reduce memory usage and the time required to
227    /// recover the database on restart.
228    pub async fn sync(&mut self) -> Result<(), Error> {
229        self.journal.sync().await.map_err(Into::into)
230    }
231
232    /// Destroy the db, removing all data from disk.
233    pub async fn destroy(self) -> Result<(), Error> {
234        Ok(self.journal.destroy().await?)
235    }
236
237    /// Create a new speculative batch of operations with this database as its parent.
238    #[allow(clippy::type_complexity)]
239    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<'_, E, V, H, Mmr<E, H::Digest>> {
240        let journal_size = *self.last_commit_loc + 1;
241        batch::UnmerkleizedBatch {
242            keyless: self,
243            journal_batch: self.journal.new_batch(),
244            appends: Vec::new(),
245            base_operations: Vec::new(),
246            base_size: journal_size,
247            db_size: journal_size,
248        }
249    }
250
251    /// Apply a changeset to the database.
252    ///
253    /// A changeset is only valid if the database has not been modified since the
254    /// batch that produced it was created. Multiple batches can be forked from the
255    /// same parent for speculative execution, but only one may be applied. Applying
256    /// a stale changeset returns [`Error::StaleChangeset`].
257    ///
258    /// Returns the range of locations written.
259    pub async fn apply_batch(
260        &mut self,
261        batch: batch::Changeset<H::Digest, V>,
262    ) -> Result<core::ops::Range<Location>, Error> {
263        let journal_size = *self.last_commit_loc + 1;
264        if batch.db_size != journal_size {
265            return Err(Error::StaleChangeset {
266                expected: batch.db_size,
267                actual: journal_size,
268            });
269        }
270        let start_loc = self.last_commit_loc + 1;
271
272        // Write all operations to the authenticated journal + apply MMR changeset.
273        self.journal.apply_batch(batch.journal_finalized).await?;
274
275        // Flush journal to disk.
276        self.journal.commit().await?;
277
278        // Update state.
279        self.last_commit_loc = Location::new(batch.total_size - 1);
280
281        let end_loc = Location::new(batch.total_size);
282        debug!(size = ?end_loc, "applied batch");
283        Ok(start_loc..end_loc)
284    }
285}
286
287impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> MerkleizedStore
288    for Keyless<E, V, H>
289{
290    type Digest = H::Digest;
291    type Operation = Operation<V>;
292
293    fn root(&self) -> Self::Digest {
294        self.journal.root()
295    }
296
297    async fn historical_proof(
298        &self,
299        historical_size: Location,
300        start_loc: Location,
301        max_ops: NonZeroU64,
302    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
303        Ok(self
304            .journal
305            .historical_proof(historical_size, start_loc, max_ops)
306            .await?)
307    }
308}
309
310impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> LogStore for Keyless<E, V, H> {
311    type Value = V;
312
313    async fn bounds(&self) -> std::ops::Range<Location> {
314        let bounds = self.journal.reader().await.bounds();
315        Location::new(bounds.start)..Location::new(bounds.end)
316    }
317
318    async fn get_metadata(&self) -> Result<Option<Self::Value>, Error> {
319        self.get_metadata().await
320    }
321}
322
323#[cfg(test)]
324mod test {
325    use super::*;
326    use crate::{
327        mmr::StandardHasher as Standard,
328        qmdb::{store::LogStore, verify_proof},
329    };
330    use commonware_cryptography::Sha256;
331    use commonware_macros::test_traced;
332    use commonware_runtime::{deterministic, BufferPooler, Metrics, Runner as _};
333    use commonware_utils::{NZUsize, NZU16, NZU64};
334    use rand::Rng;
335    use std::num::NonZeroU16;
336
337    // Use some weird sizes here to test boundary conditions.
338    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
339    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
340
341    fn db_config(
342        suffix: &str,
343        pooler: &impl BufferPooler,
344    ) -> Config<(commonware_codec::RangeCfg<usize>, ())> {
345        Config {
346            mmr_journal_partition: format!("journal-{suffix}"),
347            mmr_metadata_partition: format!("metadata-{suffix}"),
348            mmr_items_per_blob: NZU64!(11),
349            mmr_write_buffer: NZUsize!(1024),
350            log_partition: format!("log-journal-{suffix}"),
351            log_write_buffer: NZUsize!(1024),
352            log_compression: None,
353            log_codec_config: ((0..=10000).into(), ()),
354            log_items_per_section: NZU64!(7),
355            thread_pool: None,
356            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
357        }
358    }
359
360    type Db = Keyless<deterministic::Context, Vec<u8>, Sha256>;
361
362    /// Return a [Keyless] database initialized with a fixed config.
363    async fn open_db(context: deterministic::Context) -> Db {
364        let cfg = db_config("partition", &context);
365        Db::init(context, cfg).await.unwrap()
366    }
367
368    #[test_traced("INFO")]
369    pub fn test_keyless_db_empty() {
370        let executor = deterministic::Runner::default();
371        executor.start(|context| async move {
372            let db = open_db(context.with_label("db1")).await;
373            let bounds = db.bounds().await;
374            assert_eq!(bounds.end, 1); // initial commit should exist
375            assert_eq!(bounds.start, Location::new(0));
376
377            assert_eq!(db.get_metadata().await.unwrap(), None);
378            assert_eq!(db.last_commit_loc(), Location::new(0));
379
380            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
381            let v1 = vec![1u8; 8];
382            let root = db.root();
383            {
384                let mut batch = db.new_batch();
385                batch.append(v1);
386                // Don't merkleize/finalize/apply -- simulate failed commit
387            }
388            drop(db);
389            let mut db = open_db(context.with_label("db2")).await;
390            assert_eq!(db.root(), root);
391            assert_eq!(db.bounds().await.end, 1);
392            assert_eq!(db.get_metadata().await.unwrap(), None);
393
394            // Test calling commit on an empty db which should make it (durably) non-empty.
395            let metadata = vec![3u8; 10];
396            let finalized = db.new_batch().merkleize(Some(metadata.clone())).finalize();
397            db.apply_batch(finalized).await.unwrap();
398            assert_eq!(db.bounds().await.end, 2); // 2 commit ops
399            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
400            assert_eq!(
401                db.get(Location::new(1)).await.unwrap(),
402                Some(metadata.clone())
403            ); // the commit op
404            let root = db.root();
405
406            // Commit op should remain after reopen even without clean shutdown.
407            let db = open_db(context.with_label("db3")).await;
408            assert_eq!(db.bounds().await.end, 2); // commit op should remain after re-open.
409            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
410            assert_eq!(db.root(), root);
411            assert_eq!(db.last_commit_loc(), Location::new(1));
412
413            db.destroy().await.unwrap();
414        });
415    }
416
417    #[test_traced("WARN")]
418    pub fn test_keyless_db_build_basic() {
419        let executor = deterministic::Runner::default();
420        executor.start(|context| async move {
421            // Build a db with 2 values and make sure we can get them back.
422            let mut db = open_db(context.with_label("db1")).await;
423
424            let v1 = vec![1u8; 8];
425            let v2 = vec![2u8; 20];
426
427            let finalized = {
428                let mut batch = db.new_batch();
429                let loc1 = batch.append(v1.clone());
430                let loc2 = batch.append(v2.clone());
431                assert_eq!(loc1, Location::new(1));
432                assert_eq!(loc2, Location::new(2));
433                batch.merkleize(None).finalize()
434            };
435            db.apply_batch(finalized).await.unwrap();
436            let loc1 = Location::new(1);
437            let loc2 = Location::new(2);
438
439            // Make sure closing/reopening gets us back to the same state.
440            assert_eq!(db.bounds().await.end, 4); // 2 appends, 1 commit + 1 initial commit
441            assert_eq!(db.get_metadata().await.unwrap(), None);
442            assert_eq!(db.get(Location::new(3)).await.unwrap(), None); // the commit op
443            let root = db.root();
444            db.sync().await.unwrap();
445            drop(db);
446            let db = open_db(context.with_label("db2")).await;
447            assert_eq!(db.bounds().await.end, 4);
448            assert_eq!(db.root(), root);
449
450            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
451            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);
452
453            {
454                let mut batch = db.new_batch();
455                batch.append(v2);
456                batch.append(v1);
457                // Don't merkleize/finalize/apply -- simulate failed commit
458            }
459
460            // Make sure uncommitted items get rolled back.
461            drop(db); // Simulate failed commit
462            let db = open_db(context.with_label("db3")).await;
463            assert_eq!(db.bounds().await.end, 4);
464            assert_eq!(db.root(), root);
465
466            // Make sure commit operation remains after drop/reopen.
467            drop(db);
468            let db = open_db(context.with_label("db4")).await;
469            assert_eq!(db.bounds().await.end, 4);
470            assert_eq!(db.root(), root);
471
472            db.destroy().await.unwrap();
473        });
474    }
475
476    // Helper function to generate random values for batch appends.
477    fn generate_values<T: Rng>(rng: &mut T, num_elements: usize) -> Vec<Vec<u8>> {
478        (0..num_elements)
479            .map(|_| vec![(rng.next_u32() % 255) as u8, (rng.next_u32() % 255) as u8])
480            .collect()
481    }
482
483    #[test_traced("WARN")]
484    pub fn test_keyless_db_recovery() {
485        let executor = deterministic::Runner::default();
486        const ELEMENTS: usize = 1000;
487        executor.start(|mut context| async move {
488            let db = open_db(context.with_label("db1")).await;
489            let root = db.root();
490
491            // Create uncommitted appends then simulate failure.
492            {
493                let mut batch = db.new_batch();
494                for value in generate_values(&mut context, ELEMENTS) {
495                    batch.append(value);
496                }
497                // Don't merkleize/finalize/apply -- simulate failed commit
498            }
499            drop(db);
500            // Should rollback to the previous root.
501            let mut db = open_db(context.with_label("db2")).await;
502            assert_eq!(root, db.root());
503
504            // Apply the updates and commit them this time.
505            let finalized = {
506                let mut batch = db.new_batch();
507                for value in generate_values(&mut context, ELEMENTS) {
508                    batch.append(value);
509                }
510                batch.merkleize(None).finalize()
511            };
512            db.apply_batch(finalized).await.unwrap();
513            let root = db.root();
514
515            // Create uncommitted appends then simulate failure.
516            {
517                let mut batch = db.new_batch();
518                for value in generate_values(&mut context, ELEMENTS) {
519                    batch.append(value);
520                }
521                // Don't merkleize/finalize/apply -- simulate failed commit
522            }
523            drop(db);
524            // Should rollback to the previous root.
525            let mut db = open_db(context.with_label("db3")).await;
526            assert_eq!(root, db.root());
527
528            // Apply the updates and commit them this time.
529            let finalized = {
530                let mut batch = db.new_batch();
531                for value in generate_values(&mut context, ELEMENTS) {
532                    batch.append(value);
533                }
534                batch.merkleize(None).finalize()
535            };
536            db.apply_batch(finalized).await.unwrap();
537            let root = db.root();
538
539            // Make sure we can reopen and get back to the same state.
540            drop(db);
541            let db = open_db(context.with_label("db4")).await;
542            assert_eq!(db.bounds().await.end, 2 * ELEMENTS as u64 + 3);
543            assert_eq!(db.root(), root);
544
545            db.destroy().await.unwrap();
546        });
547    }
548
549    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
550    /// empty DB on re-open.
551    #[test_traced("WARN")]
552    fn test_keyless_db_non_empty_db_recovery() {
553        let executor = deterministic::Runner::default();
554        executor.start(|mut context| async move {
555            let mut db = open_db(context.with_label("db1")).await;
556
557            // Append many values then commit.
558            const ELEMENTS: usize = 200;
559            let finalized = {
560                let mut batch = db.new_batch();
561                for value in generate_values(&mut context, ELEMENTS) {
562                    batch.append(value);
563                }
564                batch.merkleize(None).finalize()
565            };
566            db.apply_batch(finalized).await.unwrap();
567            let root = db.root();
568            let op_count = db.bounds().await.end;
569
570            // Reopen DB without clean shutdown and make sure the state is the same.
571            let db = open_db(context.with_label("db2")).await;
572            assert_eq!(db.bounds().await.end, op_count);
573            assert_eq!(db.root(), root);
574            assert_eq!(db.last_commit_loc(), op_count - 1);
575            drop(db);
576
577            // Insert many operations without commit, then simulate failure.
578            async fn recover_from_failure(
579                mut context: deterministic::Context,
580                label1: &str,
581                label2: &str,
582                root: <Sha256 as Hasher>::Digest,
583                op_count: Location,
584            ) {
585                let db = open_db(context.with_label(label1)).await;
586
587                // Create uncommitted appends and simulate failure.
588                {
589                    let mut batch = db.new_batch();
590                    for value in generate_values(&mut context, ELEMENTS) {
591                        batch.append(value);
592                    }
593                    // Don't merkleize/finalize/apply -- simulate failed commit
594                }
595                drop(db);
596                let db = open_db(context.with_label(label2)).await;
597                assert_eq!(db.bounds().await.end, op_count);
598                assert_eq!(db.root(), root);
599            }
600
601            recover_from_failure(context.with_label("recovery1"), "a", "b", root, op_count).await;
602
603            // Repeat recover_from_failure tests after successfully pruning to the last commit.
604            let mut db = open_db(context.with_label("db3")).await;
605            db.prune(db.last_commit_loc()).await.unwrap();
606            assert_eq!(db.bounds().await.end, op_count);
607            assert_eq!(db.root(), root);
608            db.sync().await.unwrap();
609            drop(db);
610
611            recover_from_failure(context.with_label("recovery2"), "c", "d", root, op_count).await;
612
613            // Apply the ops one last time but fully commit them this time, then clean up.
614            let mut db = open_db(context.with_label("db4")).await;
615            let finalized = {
616                let mut batch = db.new_batch();
617                for value in generate_values(&mut context, ELEMENTS) {
618                    batch.append(value);
619                }
620                batch.merkleize(None).finalize()
621            };
622            db.apply_batch(finalized).await.unwrap();
623            let db = open_db(context.with_label("db5")).await;
624            let bounds = db.bounds().await;
625            assert!(bounds.end > op_count);
626            assert_ne!(db.root(), root);
627            assert_eq!(db.last_commit_loc(), bounds.end - 1);
628
629            db.destroy().await.unwrap();
630        });
631    }
632
633    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
634    /// DB on re-open.
635    #[test_traced("WARN")]
636    fn test_keyless_db_empty_db_recovery() {
637        const ELEMENTS: u64 = 1000;
638        let executor = deterministic::Runner::default();
639        executor.start(|context| async move {
640            let db = open_db(context.with_label("db1")).await;
641            let root = db.root();
642
643            // Reopen DB without clean shutdown and make sure the state is the same.
644            let db = open_db(context.with_label("db2")).await;
645            assert_eq!(db.bounds().await.end, 1); // initial commit should exist
646            assert_eq!(db.root(), root);
647
648            fn build_values() -> Vec<Vec<u8>> {
649                (0..ELEMENTS)
650                    .map(|i| vec![(i % 255) as u8; ((i % 17) + 13) as usize])
651                    .collect()
652            }
653
654            // Simulate failure after inserting operations without a commit.
655            {
656                let mut batch = db.new_batch();
657                for v in build_values() {
658                    batch.append(v);
659                }
660                // Don't merkleize/finalize/apply -- simulate failed commit
661            }
662            drop(db);
663            let db = open_db(context.with_label("db3")).await;
664            assert_eq!(db.bounds().await.end, 1); // initial commit should exist
665            assert_eq!(db.root(), root);
666
667            // Repeat: simulate failure after inserting operations without a commit.
668            {
669                let mut batch = db.new_batch();
670                for v in build_values() {
671                    batch.append(v);
672                }
673                // Don't merkleize/finalize/apply -- simulate failed commit
674            }
675            drop(db);
676            let db = open_db(context.with_label("db4")).await;
677            assert_eq!(db.bounds().await.end, 1); // initial commit should exist
678            assert_eq!(db.root(), root);
679
680            // One last check that re-open without proper shutdown still recovers the correct state.
681            {
682                let mut batch = db.new_batch();
683                for v in build_values() {
684                    batch.append(v);
685                }
686                for v in build_values() {
687                    batch.append(v);
688                }
689                for v in build_values() {
690                    batch.append(v);
691                }
692                // Don't merkleize/finalize/apply -- simulate failed commit
693            }
694            drop(db);
695            let mut db = open_db(context.with_label("db5")).await;
696            assert_eq!(db.bounds().await.end, 1); // initial commit should exist
697            assert_eq!(db.root(), root);
698            assert_eq!(db.last_commit_loc(), Location::new(0));
699
700            // Apply the ops one last time but fully commit them this time, then clean up.
701            let finalized = {
702                let mut batch = db.new_batch();
703                for v in build_values() {
704                    batch.append(v);
705                }
706                batch.merkleize(None).finalize()
707            };
708            db.apply_batch(finalized).await.unwrap();
709            let db = open_db(context.with_label("db6")).await;
710            assert!(db.bounds().await.end > 1);
711            assert_ne!(db.root(), root);
712
713            db.destroy().await.unwrap();
714        });
715    }
716
717    #[test_traced("INFO")]
718    pub fn test_keyless_db_proof_generation_and_verification() {
719        let executor = deterministic::Runner::default();
720        executor.start(|context| async move {
721            let mut hasher = Standard::<Sha256>::new();
722            let mut db = open_db(context.clone()).await;
723
724            // Build a db with some values
725            const ELEMENTS: u64 = 100;
726            let mut values = Vec::new();
727            let finalized = {
728                let mut batch = db.new_batch();
729                for i in 0u64..ELEMENTS {
730                    let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
731                    values.push(v.clone());
732                    batch.append(v);
733                }
734                batch.merkleize(None).finalize()
735            };
736            db.apply_batch(finalized).await.unwrap();
737
738            // Test that historical proof fails with op_count > number of operations
739            assert!(matches!(
740                db.historical_proof(db.bounds().await.end + 1, Location::new(5), NZU64!(10))
741                    .await,
742                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
743            ));
744
745            let root = db.root();
746
747            // Test proof generation for various ranges
748            let test_cases = vec![
749                (0, 10),           // First 10 operations
750                (10, 5),           // Middle range
751                (50, 20),          // Larger range
752                (90, 15),          // Range that extends beyond end (should be limited)
753                (0, 1),            // Single operation
754                (ELEMENTS - 1, 1), // Last append operation
755                (ELEMENTS, 1),     // The commit operation
756            ];
757
758            for (start_loc, max_ops) in test_cases {
759                let (proof, ops) = db.proof(Location::new(start_loc), NZU64!(max_ops)).await.unwrap();
760
761                // Verify the proof
762                assert!(
763                    verify_proof(&mut hasher, &proof, Location::new(start_loc), &ops, &root),
764                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
765                );
766
767                // Check that we got the expected number of operations
768                let expected_ops = std::cmp::min(max_ops, *db.bounds().await.end - start_loc);
769                assert_eq!(
770                    ops.len() as u64,
771                    expected_ops,
772                    "Expected {expected_ops} operations, got {}",
773                    ops.len(),
774                );
775
776                // Verify operation types
777                for (i, op) in ops.iter().enumerate() {
778                    let loc = start_loc + i as u64;
779                    if loc == 0 {
780                         assert!(
781                            matches!(op, Operation::Commit(None)),
782                            "Expected Initial Commit operation at location {loc}, got {op:?}",
783                        );
784                    } else if loc <= ELEMENTS {
785                        // Should be an Append operation
786                        assert!(
787                            matches!(op, Operation::Append(_)),
788                            "Expected Append operation at location {loc}, got {op:?}",
789                        );
790                    } else if loc == ELEMENTS + 1 {
791                        // Should be a Commit operation
792                        assert!(
793                            matches!(op, Operation::Commit(_)),
794                            "Expected Commit operation at location {loc}, got {op:?}",
795                        );
796                    }
797                }
798
799                // Verify that proof fails with wrong root
800                let wrong_root = Sha256::hash(&[0xFF; 32]);
801                assert!(
802                    !verify_proof(&mut hasher, &proof, Location::new(start_loc), &ops, &wrong_root),
803                    "Proof should fail with wrong root"
804                );
805
806                // Verify that proof fails with wrong start location
807                if start_loc > 0 {
808                    assert!(
809                        !verify_proof(&mut hasher, &proof, Location::new(start_loc - 1), &ops, &root),
810                        "Proof should fail with wrong start location"
811                    );
812                }
813            }
814
815            db.destroy().await.unwrap();
816        });
817    }
818
819    #[test_traced("INFO")]
820    pub fn test_keyless_db_proof_with_pruning() {
821        let executor = deterministic::Runner::default();
822        executor.start(|context| async move {
823            let mut hasher = Standard::<Sha256>::new();
824            let mut db = open_db(context.with_label("db1")).await;
825
826            // Build a db with some values
827            const ELEMENTS: u64 = 100;
828            let mut values = Vec::new();
829            let finalized = {
830                let mut batch = db.new_batch();
831                for i in 0u64..ELEMENTS {
832                    let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
833                    values.push(v.clone());
834                    batch.append(v);
835                }
836                batch.merkleize(None).finalize()
837            };
838            db.apply_batch(finalized).await.unwrap();
839
840            // Add more elements and commit again
841            let finalized = {
842                let mut batch = db.new_batch();
843                for i in ELEMENTS..ELEMENTS * 2 {
844                    let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
845                    values.push(v.clone());
846                    batch.append(v);
847                }
848                batch.merkleize(None).finalize()
849            };
850            db.apply_batch(finalized).await.unwrap();
851            let root = db.root();
852
853            println!("last commit loc: {}", db.last_commit_loc());
854
855            // Prune the first 30 operations
856            const PRUNE_LOC: u64 = 30;
857            db.prune(Location::new(PRUNE_LOC)).await.unwrap();
858
859            // Verify pruning worked
860            let oldest_retained = db.bounds().await.start;
861
862            // Root should remain the same after pruning
863            assert_eq!(
864                db.root(),
865                root,
866                "Root should not change after pruning"
867            );
868
869            db.sync().await.unwrap();
870            drop(db);
871            let mut db = open_db(context.with_label("db2")).await;
872            assert_eq!(db.root(), root);
873            let bounds = db.bounds().await;
874            assert_eq!(bounds.end, 2 * ELEMENTS + 3);
875            assert!(bounds.start <= PRUNE_LOC);
876
877            // Test that we can't get pruned values
878            for i in 0..*oldest_retained {
879                let result = db.get(Location::new(i)).await;
880                // Should either return None (for commit ops) or encounter pruned data
881                match result {
882                    Ok(None) => {} // Commit operation or pruned
883                    Ok(Some(_)) => {
884                        panic!("Should not be able to get pruned value at location {i}")
885                    }
886                    Err(_) => {} // Expected error for pruned data
887                }
888            }
889
890            // Test proof generation after pruning - should work for non-pruned ranges
891            let test_cases = vec![
892                (oldest_retained, 10), // Starting from oldest retained
893                (Location::new(50), 20),                       // Middle range (if not pruned)
894                (Location::new(150), 10),                      // Later range
895                (Location::new(190), 15),                      // Near the end
896            ];
897
898            for (start_loc, max_ops) in test_cases {
899                // Skip if start_loc is before oldest retained
900                if start_loc < oldest_retained {
901                    continue;
902                }
903
904                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
905
906                // Verify the proof still works
907                assert!(
908                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
909                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
910                );
911
912                // Check that we got operations
913                let expected_ops = std::cmp::min(max_ops, *db.bounds().await.end - *start_loc);
914                assert_eq!(
915                    ops.len() as u64,
916                    expected_ops,
917                    "Expected {expected_ops} operations, got {}",
918                    ops.len(),
919                );
920            }
921
922            // Test pruning more aggressively
923            const AGGRESSIVE_PRUNE: Location = Location::new(150);
924            db.prune(AGGRESSIVE_PRUNE).await.unwrap();
925
926            let new_oldest = db.bounds().await.start;
927            assert!(new_oldest <= AGGRESSIVE_PRUNE);
928
929            // Can still generate proofs for the remaining data
930            let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
931            assert!(
932                verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
933                "Proof should still verify after aggressive pruning"
934            );
935
936            // Test edge case: prune everything except the last few operations
937            let almost_all = db.bounds().await.end - 5;
938            db.prune(almost_all).await.unwrap();
939
940            let bounds = db.bounds().await;
941            let final_oldest = bounds.start;
942
943            // Should still be able to prove the remaining operations
944            if final_oldest < bounds.end {
945                let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
946                assert!(
947                    verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
948                    "Should be able to prove remaining operations after extensive pruning"
949                );
950            }
951
952            db.destroy().await.unwrap();
953        });
954    }
955
956    #[test_traced("WARN")]
957    fn test_keyless_db_replay_with_trailing_appends() {
958        let executor = deterministic::Runner::default();
959        executor.start(|context| async move {
960            // Create initial database with committed data
961            let mut db = open_db(context.with_label("db1")).await;
962
963            // Add some initial operations and commit
964            let finalized = {
965                let mut batch = db.new_batch();
966                for i in 0..10 {
967                    let v = vec![i as u8; 10];
968                    batch.append(v);
969                }
970                batch.merkleize(None).finalize()
971            };
972            db.apply_batch(finalized).await.unwrap();
973            let committed_root = db.root();
974            let committed_size = db.bounds().await.end;
975
976            // Add exactly one more append (uncommitted)
977            {
978                let mut batch = db.new_batch();
979                batch.append(vec![99u8; 20]);
980                // Don't merkleize/finalize/apply -- simulate failed commit
981            }
982
983            // Simulate failure without commit
984            drop(db);
985
986            // Reopen database
987            let mut db = open_db(context.with_label("db2")).await;
988
989            // Verify correct recovery
990            assert_eq!(
991                db.bounds().await.end,
992                committed_size,
993                "Should rewind to last commit"
994            );
995            assert_eq!(db.root(), committed_root, "Root should match last commit");
996            assert_eq!(
997                db.last_commit_loc(),
998                committed_size - 1,
999                "Last commit location should be correct"
1000            );
1001
1002            // Verify we can append and commit new data without issues
1003            let new_value = vec![77u8; 15];
1004            let finalized = {
1005                let mut batch = db.new_batch();
1006                let loc = batch.append(new_value.clone());
1007                assert_eq!(
1008                    loc, committed_size,
1009                    "New append should get the expected location"
1010                );
1011                batch.merkleize(None).finalize()
1012            };
1013            db.apply_batch(finalized).await.unwrap();
1014
1015            // Verify we can read the new value
1016            assert_eq!(db.get(committed_size).await.unwrap(), Some(new_value));
1017
1018            let new_committed_root = db.root();
1019            let new_committed_size = db.bounds().await.end;
1020
1021            // Add multiple uncommitted appends
1022            {
1023                let mut batch = db.new_batch();
1024                for i in 0..5 {
1025                    let v = vec![(200 + i) as u8; 10];
1026                    batch.append(v);
1027                }
1028                // Don't merkleize/finalize/apply -- simulate failed commit
1029            }
1030
1031            // Simulate failure without commit
1032            drop(db);
1033
1034            // Reopen and verify correct recovery
1035            let db = open_db(context.with_label("db3")).await;
1036            assert_eq!(
1037                db.bounds().await.end,
1038                new_committed_size,
1039                "Should rewind to last commit with multiple trailing appends"
1040            );
1041            assert_eq!(
1042                db.root(),
1043                new_committed_root,
1044                "Root should match last commit after multiple appends"
1045            );
1046            assert_eq!(
1047                db.last_commit_loc(),
1048                new_committed_size - 1,
1049                "Last commit location should be correct after multiple appends"
1050            );
1051
1052            db.destroy().await.unwrap();
1053        });
1054    }
1055
1056    #[test_traced("INFO")]
1057    pub fn test_keyless_db_get_out_of_bounds() {
1058        let executor = deterministic::Runner::default();
1059        executor.start(|context| async move {
1060            let mut db = open_db(context.clone()).await;
1061
1062            // Test getting from empty database
1063            let result = db.get(Location::new(0)).await.unwrap();
1064            assert!(result.is_none());
1065
1066            // Add some values
1067            let v1 = vec![1u8; 8];
1068            let v2 = vec![2u8; 8];
1069            let finalized = {
1070                let mut batch = db.new_batch();
1071                batch.append(v1.clone());
1072                batch.append(v2.clone());
1073                batch.merkleize(None).finalize()
1074            };
1075            db.apply_batch(finalized).await.unwrap();
1076
1077            // Test getting valid locations - should succeed
1078            assert_eq!(db.get(Location::new(1)).await.unwrap().unwrap(), v1);
1079            assert_eq!(db.get(Location::new(2)).await.unwrap().unwrap(), v2);
1080
1081            // Test getting out of bounds location
1082            let result = db.get(Location::new(3)).await.unwrap();
1083            assert!(result.is_none());
1084
1085            // Test getting out of bounds location
1086            let result = db.get(Location::new(4)).await;
1087            assert!(
1088                matches!(result, Err(Error::LocationOutOfBounds(loc, size)) if loc == Location::new(4) && size == Location::new(4))
1089            );
1090            db.destroy().await.unwrap();
1091        });
1092    }
1093
1094    #[test_traced("INFO")]
1095    pub fn test_keyless_db_prune_beyond_commit() {
1096        let executor = deterministic::Runner::default();
1097        executor.start(|context| async move {
1098            let mut db = open_db(context.clone()).await;
1099
1100            // Test pruning empty database (no commits)
1101            let result = db.prune(Location::new(1)).await;
1102            assert!(
1103                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1104                    if prune_loc == Location::new(1) && commit_loc == Location::new(0))
1105            );
1106
1107            // Add values and commit
1108            let v1 = vec![1u8; 8];
1109            let v2 = vec![2u8; 8];
1110            let v3 = vec![3u8; 8];
1111            let finalized = {
1112                let mut batch = db.new_batch();
1113                batch.append(v1.clone());
1114                batch.append(v2.clone());
1115                batch.merkleize(None).finalize()
1116            };
1117            db.apply_batch(finalized).await.unwrap();
1118
1119            // op_count is 4 (initial_commit, v1, v2, commit), last_commit_loc is 3
1120            let last_commit = db.last_commit_loc();
1121            assert_eq!(last_commit, Location::new(3));
1122
1123            let finalized = {
1124                let mut batch = db.new_batch();
1125                batch.append(v3.clone());
1126                batch.merkleize(None).finalize()
1127            };
1128            db.apply_batch(finalized).await.unwrap();
1129
1130            // Test valid prune (at previous commit location 3)
1131            assert!(db.prune(Location::new(3)).await.is_ok());
1132
1133            // Test pruning beyond last commit
1134            let new_last_commit = db.last_commit_loc();
1135            let beyond = Location::new(*new_last_commit + 1);
1136            let result = db.prune(beyond).await;
1137            assert!(
1138                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1139                    if prune_loc == beyond && commit_loc == new_last_commit)
1140            );
1141
1142            db.destroy().await.unwrap();
1143        });
1144    }
1145
1146    use crate::{
1147        kv::tests::assert_send,
1148        qmdb::store::tests::{assert_log_store, assert_merkleized_store},
1149    };
1150
1151    #[allow(dead_code)]
1152    fn assert_db_futures_are_send(db: &mut Db, loc: Location) {
1153        assert_log_store(db);
1154        assert_merkleized_store(db, loc);
1155        assert_send(db.sync());
1156        assert_send(db.get(loc));
1157    }
1158
1159    /// batch.get() reads pending appends and falls through to base DB.
1160    #[test_traced("INFO")]
1161    fn test_keyless_batch_get() {
1162        let executor = deterministic::Runner::default();
1163        executor.start(|context| async move {
1164            let mut db = open_db(context.with_label("db")).await;
1165
1166            // Pre-populate with 3 values.
1167            let base_vals: Vec<Vec<u8>> = (0..3).map(|i| vec![(10 + i) as u8; 12]).collect();
1168            let mut base_locs = Vec::new();
1169            let finalized = {
1170                let mut batch = db.new_batch();
1171                for v in &base_vals {
1172                    base_locs.push(batch.append(v.clone()));
1173                }
1174                batch.merkleize(None).finalize()
1175            };
1176            db.apply_batch(finalized).await.unwrap();
1177
1178            // Second batch: read base DB values through the batch.
1179            let mut batch = db.new_batch();
1180            for (i, loc) in base_locs.iter().enumerate() {
1181                assert_eq!(
1182                    batch.get(*loc).await.unwrap(),
1183                    Some(base_vals[i].clone()),
1184                    "base DB value at loc {loc} mismatch"
1185                );
1186            }
1187
1188            // Pending append is visible.
1189            let new_val = vec![99u8; 16];
1190            let new_loc = batch.append(new_val.clone());
1191            assert_eq!(batch.get(new_loc).await.unwrap(), Some(new_val));
1192
1193            // Location past the end returns None.
1194            let beyond = Location::new(*new_loc + 1);
1195            assert_eq!(batch.get(beyond).await.unwrap(), None);
1196
1197            db.destroy().await.unwrap();
1198        });
1199    }
1200
1201    /// Child batch reads parent chain values and its own appends.
1202    #[test_traced("INFO")]
1203    fn test_keyless_batch_stacked_get() {
1204        let executor = deterministic::Runner::default();
1205        executor.start(|context| async move {
1206            let db = open_db(context.with_label("db")).await;
1207
1208            let v1 = vec![1u8; 8];
1209            let v2 = vec![2u8; 16];
1210
1211            // Parent batch appends v1.
1212            let mut parent = db.new_batch();
1213            let loc1 = parent.append(v1.clone());
1214            let parent_m = parent.merkleize(None);
1215
1216            // Child reads v1 from parent chain.
1217            let mut child = parent_m.new_batch();
1218            assert_eq!(child.get(loc1).await.unwrap(), Some(v1));
1219
1220            // Child appends v2.
1221            let loc2 = child.append(v2.clone());
1222            assert_eq!(child.get(loc2).await.unwrap(), Some(v2));
1223
1224            // Nonexistent location.
1225            let nonexistent = Location::new(9999);
1226            assert_eq!(child.get(nonexistent).await.unwrap(), None);
1227
1228            db.destroy().await.unwrap();
1229        });
1230    }
1231
1232    /// Metadata propagates through merkleize and clears with None.
1233    #[test_traced("INFO")]
1234    fn test_keyless_batch_metadata() {
1235        let executor = deterministic::Runner::default();
1236        executor.start(|context| async move {
1237            let mut db = open_db(context.with_label("db")).await;
1238
1239            // Batch with metadata.
1240            let metadata = vec![42u8; 32];
1241            let finalized = {
1242                let mut batch = db.new_batch();
1243                batch.append(vec![1u8; 8]);
1244                batch.merkleize(Some(metadata.clone())).finalize()
1245            };
1246            db.apply_batch(finalized).await.unwrap();
1247            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1248
1249            // Second batch clears metadata.
1250            let finalized = db.new_batch().merkleize(None).finalize();
1251            db.apply_batch(finalized).await.unwrap();
1252            assert_eq!(db.get_metadata().await.unwrap(), None);
1253
1254            db.destroy().await.unwrap();
1255        });
1256    }
1257
1258    /// MerkleizedBatch::root() matches db.root() after apply_batch().
1259    #[test_traced("INFO")]
1260    fn test_keyless_batch_speculative_root() {
1261        let executor = deterministic::Runner::default();
1262        executor.start(|context| async move {
1263            let mut db = open_db(context.with_label("db")).await;
1264
1265            let merkleized = {
1266                let mut batch = db.new_batch();
1267                for i in 0u8..10 {
1268                    batch.append(vec![i; 16]);
1269                }
1270                batch.merkleize(None)
1271            };
1272
1273            // Speculative root is available before apply.
1274            let speculative = merkleized.root();
1275            let finalized = merkleized.finalize();
1276            db.apply_batch(finalized).await.unwrap();
1277
1278            assert_eq!(db.root(), speculative);
1279
1280            // Second batch: verify again with metadata.
1281            let metadata = vec![55u8; 8];
1282            let merkleized = {
1283                let mut batch = db.new_batch();
1284                batch.append(vec![0xAA; 20]);
1285                batch.merkleize(Some(metadata))
1286            };
1287            let speculative = merkleized.root();
1288            let finalized = merkleized.finalize();
1289            db.apply_batch(finalized).await.unwrap();
1290
1291            assert_eq!(db.root(), speculative);
1292
1293            db.destroy().await.unwrap();
1294        });
1295    }
1296
1297    /// MerkleizedBatch::get() reads from the operation chain and base DB.
1298    #[test_traced("INFO")]
1299    fn test_keyless_merkleized_batch_get() {
1300        let executor = deterministic::Runner::default();
1301        executor.start(|context| async move {
1302            let mut db = open_db(context.with_label("db")).await;
1303
1304            // Pre-populate base DB.
1305            let base_val = vec![10u8; 12];
1306            let finalized = {
1307                let mut batch = db.new_batch();
1308                batch.append(base_val.clone());
1309                batch.merkleize(None).finalize()
1310            };
1311            db.apply_batch(finalized).await.unwrap();
1312            // Locations: 0=initial commit, 1=append, 2=commit
1313
1314            // Create a merkleized batch with new appends.
1315            let new_val = vec![20u8; 16];
1316            let mut batch = db.new_batch();
1317            batch.append(new_val.clone());
1318            let merkleized = batch.merkleize(None);
1319            // Locations: 3=append, 4=commit
1320
1321            // Read base DB value through merkleized batch.
1322            assert_eq!(
1323                merkleized.get(Location::new(1)).await.unwrap(),
1324                Some(base_val),
1325            );
1326
1327            // Read this batch's append from the operation chain.
1328            assert_eq!(
1329                merkleized.get(Location::new(3)).await.unwrap(),
1330                Some(new_val),
1331            );
1332
1333            // Commit op returns None (no metadata).
1334            assert_eq!(merkleized.get(Location::new(4)).await.unwrap(), None);
1335
1336            db.destroy().await.unwrap();
1337        });
1338    }
1339
1340    /// Chained child batch can be merkleized, finalized, and applied.
1341    ///
1342    /// The chained pattern builds speculative batches on top of each other
1343    /// without applying intermediates. Only the final child is applied, which
1344    /// contains all operations from the entire chain.
1345    #[test_traced("INFO")]
1346    fn test_keyless_batch_chained_apply() {
1347        let executor = deterministic::Runner::default();
1348        executor.start(|context| async move {
1349            let mut db = open_db(context.with_label("db")).await;
1350
1351            let v1 = vec![1u8; 8];
1352            let v2 = vec![2u8; 16];
1353            let v3 = vec![3u8; 24];
1354
1355            // Parent batch.
1356            let mut parent = db.new_batch();
1357            let loc1 = parent.append(v1.clone());
1358            let parent_m = parent.merkleize(None);
1359            let parent_root = parent_m.root();
1360
1361            // Child batch built on top of parent.
1362            let mut child = parent_m.new_batch();
1363            let loc2 = child.append(v2.clone());
1364            let loc3 = child.append(v3.clone());
1365            let child_m = child.merkleize(None);
1366            let child_root = child_m.root();
1367
1368            // Roots should differ (child has more data).
1369            assert_ne!(parent_root, child_root);
1370
1371            // Apply only the final child -- it contains everything.
1372            let child_finalized = child_m.finalize();
1373            db.apply_batch(child_finalized).await.unwrap();
1374            assert_eq!(db.root(), child_root);
1375            assert_eq!(db.get(loc1).await.unwrap(), Some(v1));
1376            assert_eq!(db.get(loc2).await.unwrap(), Some(v2));
1377            assert_eq!(db.get(loc3).await.unwrap(), Some(v3));
1378
1379            // Verify recovery after reopen.
1380            drop(db);
1381            let db = open_db(context.with_label("db2")).await;
1382            assert_eq!(db.root(), child_root);
1383
1384            db.destroy().await.unwrap();
1385        });
1386    }
1387
1388    /// Alternatively, chained batches can each be applied independently.
1389    #[test_traced("INFO")]
1390    fn test_keyless_batch_chained_apply_sequential() {
1391        let executor = deterministic::Runner::default();
1392        executor.start(|context| async move {
1393            let mut db = open_db(context.with_label("db")).await;
1394
1395            let v1 = vec![1u8; 8];
1396            let v2 = vec![2u8; 16];
1397
1398            // Parent batch.
1399            let mut parent = db.new_batch();
1400            let loc1 = parent.append(v1.clone());
1401            let parent_m = parent.merkleize(None);
1402            let parent_root = parent_m.root();
1403
1404            // Finalize and apply parent first.
1405            let parent_finalized = parent_m.finalize();
1406            db.apply_batch(parent_finalized).await.unwrap();
1407            assert_eq!(db.root(), parent_root);
1408            assert_eq!(db.get(loc1).await.unwrap(), Some(v1));
1409
1410            // Now create a second (independent) batch.
1411            let mut batch2 = db.new_batch();
1412            let loc2 = batch2.append(v2.clone());
1413            let batch2_m = batch2.merkleize(None);
1414            let batch2_root = batch2_m.root();
1415            let batch2_finalized = batch2_m.finalize();
1416            db.apply_batch(batch2_finalized).await.unwrap();
1417            assert_eq!(db.root(), batch2_root);
1418            assert_eq!(db.get(loc2).await.unwrap(), Some(v2));
1419
1420            db.destroy().await.unwrap();
1421        });
1422    }
1423
1424    /// Many sequential batches accumulate correctly.
1425    #[test_traced("INFO")]
1426    fn test_keyless_batch_many_sequential() {
1427        let executor = deterministic::Runner::default();
1428        executor.start(|context| async move {
1429            let mut db = open_db(context.with_label("db")).await;
1430            let mut hasher = Standard::<Sha256>::new();
1431
1432            const BATCHES: u64 = 20;
1433            const APPENDS_PER_BATCH: u64 = 5;
1434
1435            let mut all_values: Vec<Vec<u8>> = Vec::new();
1436            let mut all_locs: Vec<Location> = Vec::new();
1437
1438            for batch_idx in 0..BATCHES {
1439                let finalized = {
1440                    let mut batch = db.new_batch();
1441                    for j in 0..APPENDS_PER_BATCH {
1442                        let v = vec![(batch_idx * 10 + j) as u8; 8];
1443                        let loc = batch.append(v.clone());
1444                        all_values.push(v);
1445                        all_locs.push(loc);
1446                    }
1447                    batch.merkleize(None).finalize()
1448                };
1449                db.apply_batch(finalized).await.unwrap();
1450            }
1451
1452            // Verify all values are readable.
1453            for (i, loc) in all_locs.iter().enumerate() {
1454                assert_eq!(
1455                    db.get(*loc).await.unwrap(),
1456                    Some(all_values[i].clone()),
1457                    "mismatch at index {i}, loc {loc}"
1458                );
1459            }
1460
1461            // Verify proof over the full range.
1462            let root = db.root();
1463            let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1464            assert!(verify_proof(
1465                &mut hasher,
1466                &proof,
1467                Location::new(0),
1468                &ops,
1469                &root
1470            ));
1471
1472            // Expected size: 1 initial commit + BATCHES * (APPENDS_PER_BATCH + 1 commit).
1473            let expected = 1 + BATCHES * (APPENDS_PER_BATCH + 1);
1474            assert_eq!(db.bounds().await.end, expected);
1475
1476            db.destroy().await.unwrap();
1477        });
1478    }
1479
1480    /// Empty batch (zero appends) produces correct speculative root.
1481    #[test_traced("INFO")]
1482    fn test_keyless_batch_empty() {
1483        let executor = deterministic::Runner::default();
1484        executor.start(|context| async move {
1485            let mut db = open_db(context.with_label("db")).await;
1486
1487            // Apply a non-empty batch first so the DB has some state.
1488            let finalized = {
1489                let mut batch = db.new_batch();
1490                batch.append(vec![1u8; 8]);
1491                batch.merkleize(None).finalize()
1492            };
1493            db.apply_batch(finalized).await.unwrap();
1494            let root_before = db.root();
1495            let size_before = db.bounds().await.end;
1496
1497            // Empty batch with no appends.
1498            let merkleized = db.new_batch().merkleize(None);
1499            let speculative = merkleized.root();
1500            let finalized = merkleized.finalize();
1501            db.apply_batch(finalized).await.unwrap();
1502
1503            // Root changed (a new Commit op was appended).
1504            assert_ne!(db.root(), root_before);
1505            assert_eq!(db.root(), speculative);
1506            // Size grew by exactly 1 (the Commit op).
1507            assert_eq!(db.bounds().await.end, size_before + 1);
1508
1509            db.destroy().await.unwrap();
1510        });
1511    }
1512
1513    /// MerkleizedBatch::get() works on a chained child's merkleized batch.
1514    #[test_traced("INFO")]
1515    fn test_keyless_batch_chained_merkleized_get() {
1516        let executor = deterministic::Runner::default();
1517        executor.start(|context| async move {
1518            let mut db = open_db(context.with_label("db")).await;
1519
1520            // Pre-populate base DB.
1521            let base_val = vec![10u8; 12];
1522            let finalized = {
1523                let mut batch = db.new_batch();
1524                batch.append(base_val.clone());
1525                batch.merkleize(None).finalize()
1526            };
1527            db.apply_batch(finalized).await.unwrap();
1528            let base_loc = Location::new(1);
1529
1530            // Parent batch appends v1.
1531            let v1 = vec![1u8; 8];
1532            let mut parent = db.new_batch();
1533            let loc1 = parent.append(v1.clone());
1534            let parent_m = parent.merkleize(None);
1535
1536            // Child batch appends v2.
1537            let v2 = vec![2u8; 16];
1538            let mut child = parent_m.new_batch();
1539            let loc2 = child.append(v2.clone());
1540            let child_m = child.merkleize(None);
1541
1542            // Child's MerkleizedBatch can read all three layers:
1543            // base DB value
1544            assert_eq!(
1545                child_m.get(base_loc).await.unwrap(),
1546                Some(base_val),
1547                "should read base DB value"
1548            );
1549            // parent chain value
1550            assert_eq!(
1551                child_m.get(loc1).await.unwrap(),
1552                Some(v1),
1553                "should read parent chain value"
1554            );
1555            // child's own value
1556            assert_eq!(
1557                child_m.get(loc2).await.unwrap(),
1558                Some(v2),
1559                "should read child's own value"
1560            );
1561
1562            db.destroy().await.unwrap();
1563        });
1564    }
1565
1566    /// Large single batch with many appends, verifying all values and proof.
1567    #[test_traced("INFO")]
1568    fn test_keyless_batch_large() {
1569        let executor = deterministic::Runner::default();
1570        executor.start(|context| async move {
1571            let mut db = open_db(context.with_label("db")).await;
1572            let mut hasher = Standard::<Sha256>::new();
1573
1574            const N: u64 = 500;
1575            let mut values = Vec::new();
1576            let mut locs = Vec::new();
1577
1578            let finalized = {
1579                let mut batch = db.new_batch();
1580                for i in 0..N {
1581                    let v = vec![(i % 256) as u8; ((i % 29) + 3) as usize];
1582                    locs.push(batch.append(v.clone()));
1583                    values.push(v);
1584                }
1585                batch.merkleize(None).finalize()
1586            };
1587            db.apply_batch(finalized).await.unwrap();
1588
1589            // Verify every value.
1590            for (i, loc) in locs.iter().enumerate() {
1591                assert_eq!(
1592                    db.get(*loc).await.unwrap(),
1593                    Some(values[i].clone()),
1594                    "mismatch at index {i}"
1595                );
1596            }
1597
1598            // Verify proof over the full range.
1599            let root = db.root();
1600            let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1601            assert!(verify_proof(
1602                &mut hasher,
1603                &proof,
1604                Location::new(0),
1605                &ops,
1606                &root
1607            ));
1608
1609            // Expected: 1 initial commit + N appends + 1 commit.
1610            assert_eq!(db.bounds().await.end, 1 + N + 1);
1611
1612            db.destroy().await.unwrap();
1613        });
1614    }
1615
1616    #[test_traced]
1617    fn test_stale_changeset_rejected() {
1618        let executor = deterministic::Runner::default();
1619        executor.start(|context| async move {
1620            let mut db = open_db(context.with_label("db")).await;
1621
1622            // Create two batches from the same DB state.
1623            let changeset_a = {
1624                let mut batch = db.new_batch();
1625                batch.append(vec![10]);
1626                batch.merkleize(None).finalize()
1627            };
1628            let changeset_b = {
1629                let mut batch = db.new_batch();
1630                batch.append(vec![20]);
1631                batch.merkleize(None).finalize()
1632            };
1633
1634            // Apply the first -- should succeed.
1635            db.apply_batch(changeset_a).await.unwrap();
1636            let expected_root = db.root();
1637            let expected_bounds = db.bounds().await;
1638            let expected_last_commit = db.last_commit_loc();
1639            assert_eq!(db.get(Location::new(1)).await.unwrap(), Some(vec![10]));
1640            assert_eq!(db.get_metadata().await.unwrap(), None);
1641
1642            // Apply the second -- should fail because the DB was modified.
1643            let result = db.apply_batch(changeset_b).await;
1644            assert!(
1645                matches!(result, Err(Error::StaleChangeset { .. })),
1646                "expected StaleChangeset error, got {result:?}"
1647            );
1648            assert_eq!(db.root(), expected_root);
1649            assert_eq!(db.bounds().await, expected_bounds);
1650            assert_eq!(db.last_commit_loc(), expected_last_commit);
1651            assert_eq!(db.get(Location::new(1)).await.unwrap(), Some(vec![10]));
1652            assert_eq!(db.get_metadata().await.unwrap(), None);
1653
1654            db.destroy().await.unwrap();
1655        });
1656    }
1657
1658    #[test_traced]
1659    fn test_stale_changeset_chained() {
1660        let executor = deterministic::Runner::default();
1661        executor.start(|context| async move {
1662            let mut db = open_db(context.with_label("db")).await;
1663
1664            // Parent batch.
1665            let mut parent = db.new_batch();
1666            parent.append(vec![1]);
1667            let parent_m = parent.merkleize(None);
1668
1669            // Fork two children from the same parent.
1670            let child_a = {
1671                let mut batch = parent_m.new_batch();
1672                batch.append(vec![2]);
1673                batch.merkleize(None).finalize()
1674            };
1675            let child_b = {
1676                let mut batch = parent_m.new_batch();
1677                batch.append(vec![3]);
1678                batch.merkleize(None).finalize()
1679            };
1680
1681            // Apply child A.
1682            db.apply_batch(child_a).await.unwrap();
1683
1684            // Child B is stale.
1685            let result = db.apply_batch(child_b).await;
1686            assert!(
1687                matches!(result, Err(Error::StaleChangeset { .. })),
1688                "expected StaleChangeset error, got {result:?}"
1689            );
1690
1691            db.destroy().await.unwrap();
1692        });
1693    }
1694
1695    #[test_traced]
1696    fn test_stale_changeset_parent_applied_before_child() {
1697        let executor = deterministic::Runner::default();
1698        executor.start(|context| async move {
1699            let mut db = open_db(context.with_label("db")).await;
1700
1701            // Parent batch.
1702            let mut parent = db.new_batch();
1703            parent.append(vec![1]);
1704            let parent_m = parent.merkleize(None);
1705
1706            // Child batch.
1707            let mut child = parent_m.new_batch();
1708            child.append(vec![2]);
1709            let child_changeset = child.merkleize(None).finalize();
1710
1711            // Apply parent first.
1712            let parent_changeset = parent_m.finalize();
1713            db.apply_batch(parent_changeset).await.unwrap();
1714
1715            // Child is stale because it expected to be applied on top of the
1716            // pre-parent DB state.
1717            let result = db.apply_batch(child_changeset).await;
1718            assert!(
1719                matches!(result, Err(Error::StaleChangeset { .. })),
1720                "expected StaleChangeset error, got {result:?}"
1721            );
1722
1723            db.destroy().await.unwrap();
1724        });
1725    }
1726
1727    #[test_traced]
1728    fn test_stale_changeset_child_applied_before_parent() {
1729        let executor = deterministic::Runner::default();
1730        executor.start(|context| async move {
1731            let mut db = open_db(context.with_label("db")).await;
1732
1733            // Parent batch.
1734            let mut parent = db.new_batch();
1735            parent.append(vec![1]);
1736            let parent_m = parent.merkleize(None);
1737
1738            // Child batch. Finalize both before applying either so the
1739            // borrow on `db` through `parent_m` is released.
1740            let mut child = parent_m.new_batch();
1741            child.append(vec![2]);
1742            let child_changeset = child.merkleize(None).finalize();
1743            let parent_changeset = parent_m.finalize();
1744
1745            // Apply child first (it carries all parent ops too).
1746            db.apply_batch(child_changeset).await.unwrap();
1747
1748            // Parent is stale.
1749            let result = db.apply_batch(parent_changeset).await;
1750            assert!(
1751                matches!(result, Err(Error::StaleChangeset { .. })),
1752                "expected StaleChangeset error, got {result:?}"
1753            );
1754
1755            db.destroy().await.unwrap();
1756        });
1757    }
1758}