Skip to main content

commonware_storage/qmdb/keyless/
mod.rs

1//! The [Keyless] qmdb allows for append-only storage of data that can later be retrieved by its
2//! location. Both fixed-size and variable-size values are supported via the [fixed] and [variable]
3//! submodules.
4//!
5//! # Examples
6//!
7//! ```ignore
8//! // Simple mode: apply a batch, then durably commit it.
9//! let batch = db.new_batch().append(value);
10//! let merkleized = batch.merkleize(&db, None);
11//! db.apply_batch(merkleized).await?;
12//! db.commit().await?;
13//! ```
14//!
15//! ```ignore
16//! // Batches can still fork before you apply them.
17//! let parent = db.new_batch().append(value_a);
18//! let parent = parent.merkleize(&db, None);
19//!
20//! let child_a = parent.new_batch();
21//! let child_a = child_a.append(value_b);
22//! let child_a = child_a.merkleize(&db, None);
23//!
24//! let child_b = parent.new_batch();
25//! let child_b = child_b.append(value_c);
26//! let child_b = child_b.merkleize(&db, None);
27//!
28//! db.apply_batch(child_a).await?;
29//! db.commit().await?;
30//! ```
31//!
32//! ```ignore
33//! // Sequential commit: apply parent then child.
34//! let parent = db.new_batch().append(value_a);
35//! let parent_m = parent.merkleize(&db, None);
36//! let child = parent_m.new_batch().append(value_b);
37//! let child_m = child.merkleize(&db, None);
38//!
39//! db.apply_batch(parent_m).await?;
40//! db.apply_batch(child_m).await?;
41//! db.commit().await?;
42//! ```
43
44use crate::{
45    journal::{
46        authenticated,
47        contiguous::{Contiguous, Mutable, Reader},
48        Error as JournalError,
49    },
50    merkle::{journaled::Config as MerkleConfig, Family, Location, Proof},
51    qmdb::{any::value::ValueEncoding, Error},
52    Context, Persistable,
53};
54use commonware_codec::EncodeShared;
55use commonware_cryptography::Hasher;
56use std::{num::NonZeroU64, sync::Arc};
57use tracing::{debug, warn};
58
59pub mod batch;
60pub mod fixed;
61mod operation;
62pub mod variable;
63pub use operation::Operation;
64
65/// Configuration for a [Keyless] authenticated db.
66#[derive(Clone)]
67pub struct Config<J> {
68    /// Configuration for the Merkle structure backing the authenticated journal.
69    pub merkle: MerkleConfig,
70
71    /// Configuration for the operations log journal.
72    pub log: J,
73}
74
75/// A keyless authenticated database.
76pub struct Keyless<F, E, V, C, H>
77where
78    F: Family,
79    E: Context,
80    V: ValueEncoding,
81    C: Contiguous<Item = Operation<V>>,
82    H: Hasher,
83    Operation<V>: EncodeShared,
84{
85    /// Authenticated journal of operations.
86    journal: authenticated::Journal<F, E, C, H>,
87
88    /// The location of the last commit, if any.
89    last_commit_loc: Location<F>,
90}
91
92impl<F, E, V, C, H> Keyless<F, E, V, C, H>
93where
94    F: Family,
95    E: Context,
96    V: ValueEncoding,
97    C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
98    H: Hasher,
99    Operation<V>: EncodeShared,
100{
101    pub(crate) async fn init_from_journal(
102        mut journal: authenticated::Journal<F, E, C, H>,
103    ) -> Result<Self, Error<F>> {
104        if journal.size().await == 0 {
105            warn!("no operations found in log, creating initial commit");
106            journal.append(&Operation::Commit(None)).await?;
107            journal.sync().await?;
108        }
109
110        let last_commit_loc = journal
111            .size()
112            .await
113            .checked_sub(1)
114            .expect("at least one commit should exist");
115
116        Ok(Self {
117            journal,
118            last_commit_loc,
119        })
120    }
121
122    /// Get the value at location `loc` in the database.
123    ///
124    /// # Errors
125    ///
126    /// Returns [`Error::LocationOutOfBounds`] if `loc` >=
127    /// `self.bounds().await.end`.
128    pub async fn get(&self, loc: Location<F>) -> Result<Option<V::Value>, Error<F>> {
129        let reader = self.journal.reader().await;
130        let op_count = reader.bounds().end;
131        if loc >= op_count {
132            return Err(Error::LocationOutOfBounds(loc, Location::new(op_count)));
133        }
134        let op = reader.read(*loc).await?;
135
136        Ok(op.into_value())
137    }
138
139    /// Returns the location of the last commit.
140    pub const fn last_commit_loc(&self) -> Location<F> {
141        self.last_commit_loc
142    }
143
144    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
145    /// retained operations respectively.
146    pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
147        let bounds = self.journal.reader().await.bounds();
148        Location::new(bounds.start)..Location::new(bounds.end)
149    }
150
151    /// Get the metadata associated with the last commit.
152    pub async fn get_metadata(&self) -> Result<Option<V::Value>, Error<F>> {
153        let op = self
154            .journal
155            .reader()
156            .await
157            .read(*self.last_commit_loc)
158            .await?;
159        let Operation::Commit(metadata) = op else {
160            return Ok(None);
161        };
162
163        Ok(metadata)
164    }
165
166    /// Return the root of the db.
167    pub fn root(&self) -> H::Digest {
168        self.journal.root()
169    }
170
171    /// Generate and return:
172    ///  1. a proof of all operations applied to the db in the range starting at (and including)
173    ///     location `start_loc`, and ending at the first of either:
174    ///     - the last operation performed, or
175    ///     - the operation `max_ops` from the start.
176    ///  2. the operations corresponding to the leaves in this range.
177    ///
178    /// # Errors
179    ///
180    /// - Returns [`Error::Merkle`] with [`crate::merkle::Error::RangeOutOfBounds`] if `start_loc`
181    ///   >= the number of operations.
182    /// - Returns [`Error::Journal`] with [`crate::journal::Error::ItemPruned`] if `start_loc` has
183    ///   been pruned.
184    pub async fn proof(
185        &self,
186        start_loc: Location<F>,
187        max_ops: NonZeroU64,
188    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<V>>), Error<F>> {
189        self.historical_proof(self.bounds().await.end, start_loc, max_ops)
190            .await
191    }
192
193    /// Analogous to proof, but with respect to the state of the Merkle structure when it had
194    /// `op_count` operations.
195    ///
196    /// # Errors
197    ///
198    /// - Returns [`Error::Merkle`] with [`crate::merkle::Error::RangeOutOfBounds`] if `start_loc`
199    ///   >= `op_count` or `op_count` > number of operations.
200    /// - Returns [`Error::Journal`] with [`crate::journal::Error::ItemPruned`] if `start_loc` has
201    ///   been pruned.
202    pub async fn historical_proof(
203        &self,
204        op_count: Location<F>,
205        start_loc: Location<F>,
206        max_ops: NonZeroU64,
207    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<V>>), Error<F>> {
208        Ok(self
209            .journal
210            .historical_proof(op_count, start_loc, max_ops)
211            .await?)
212    }
213
214    /// Prune historical operations prior to `loc`. This does not affect the db's root.
215    ///
216    /// # Errors
217    ///
218    /// - Returns [`Error::PruneBeyondMinRequired`] if `loc` > last commit point.
219    pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
220        if loc > self.last_commit_loc {
221            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
222        }
223        self.journal.prune(loc).await?;
224
225        Ok(())
226    }
227
228    /// Rewind the database to `size` operations, where `size` is the location of the next append.
229    ///
230    /// This rewinds both the operations journal and its Merkle structure to the historical state
231    /// at `size`.
232    ///
233    /// # Errors
234    ///
235    /// - Returns [`Error::Journal`] with [`crate::journal::Error::InvalidRewind`] if `size` is 0
236    ///   or exceeds the current committed size.
237    /// - Returns [`Error::Journal`] with [`crate::journal::Error::ItemPruned`] if the operation at
238    ///   `size - 1` has been pruned.
239    /// - Returns [`Error::UnexpectedData`] if the operation at `size - 1` is not a commit.
240    ///
241    /// Any error from this method is fatal for this handle. Rewind may mutate journal state
242    /// before this method finishes updating in-memory rewind state. Callers must drop this
243    /// database handle after any `Err` from `rewind` and reopen from storage.
244    ///
245    /// A successful rewind is not restart-stable until a subsequent [`Self::commit`] or
246    /// [`Self::sync`].
247    pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
248        let rewind_size = *size;
249        let current_size = *self.last_commit_loc + 1;
250        if rewind_size == current_size {
251            return Ok(());
252        }
253        if rewind_size == 0 || rewind_size > current_size {
254            return Err(Error::Journal(crate::journal::Error::InvalidRewind(
255                rewind_size,
256            )));
257        }
258
259        let rewind_last_loc = Location::new(rewind_size - 1);
260        {
261            let reader = self.journal.reader().await;
262            let bounds = reader.bounds();
263            if rewind_size <= bounds.start {
264                return Err(Error::Journal(crate::journal::Error::ItemPruned(
265                    *rewind_last_loc,
266                )));
267            }
268            let rewind_last_op = reader.read(*rewind_last_loc).await?;
269            if !matches!(rewind_last_op, Operation::Commit(_)) {
270                return Err(Error::UnexpectedData(rewind_last_loc));
271            }
272        }
273
274        // Journal rewind happens before in-memory commit-location updates. If a later step fails,
275        // this handle may be internally diverged and must be dropped by the caller.
276        self.journal.rewind(rewind_size).await?;
277        self.last_commit_loc = rewind_last_loc;
278        Ok(())
279    }
280
281    /// Sync all database state to disk. While this isn't necessary to ensure durability of
282    /// committed operations, periodic invocation may reduce memory usage and the time required to
283    /// recover the database on restart.
284    pub async fn sync(&self) -> Result<(), Error<F>> {
285        self.journal.sync().await.map_err(Into::into)
286    }
287
288    /// Durably commit the journal state published by prior [`Keyless::apply_batch`] calls.
289    pub async fn commit(&self) -> Result<(), Error<F>> {
290        self.journal.commit().await.map_err(Into::into)
291    }
292
293    /// Destroy the db, removing all data from disk.
294    pub async fn destroy(self) -> Result<(), Error<F>> {
295        Ok(self.journal.destroy().await?)
296    }
297
298    /// Create a new speculative batch of operations with this database as its parent.
299    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<F, H, V> {
300        let journal_size = *self.last_commit_loc + 1;
301        batch::UnmerkleizedBatch::new(self, journal_size)
302    }
303
304    /// Create an initial [`batch::MerkleizedBatch`] from the committed DB state.
305    pub fn to_batch(&self) -> Arc<batch::MerkleizedBatch<F, H::Digest, V>> {
306        let journal_size = *self.last_commit_loc + 1;
307        Arc::new(batch::MerkleizedBatch {
308            journal_batch: self.journal.to_merkleized_batch(),
309            parent: None,
310            base_size: journal_size,
311            total_size: journal_size,
312            db_size: journal_size,
313            ancestor_batch_ends: Vec::new(),
314        })
315    }
316
317    /// Apply a [`batch::MerkleizedBatch`] to the database.
318    ///
319    /// A batch is valid only if every batch applied to the database since this batch's
320    /// ancestor chain was created is an ancestor of this batch. Applying a batch from a
321    /// different fork returns [`Error::StaleBatch`].
322    ///
323    /// Returns the range of locations written.
324    ///
325    /// This publishes the batch to the in-memory database state and appends it to the
326    /// journal, but does not durably commit it. Call [`Keyless::commit`] or
327    /// [`Keyless::sync`] to guarantee durability.
328    pub async fn apply_batch(
329        &mut self,
330        batch: Arc<batch::MerkleizedBatch<F, H::Digest, V>>,
331    ) -> Result<core::ops::Range<Location<F>>, Error<F>> {
332        let db_size = *self.last_commit_loc + 1;
333        let valid = db_size == batch.db_size
334            || db_size == batch.base_size
335            || batch.ancestor_batch_ends.contains(&db_size);
336        if !valid {
337            return Err(Error::StaleBatch {
338                db_size,
339                batch_db_size: batch.db_size,
340                batch_base_size: batch.base_size,
341            });
342        }
343        let start_loc = self.last_commit_loc + 1;
344
345        self.journal.apply_batch(&batch.journal_batch).await?;
346
347        self.last_commit_loc = Location::new(batch.total_size - 1);
348        let end_loc = Location::new(batch.total_size);
349        debug!(size = ?end_loc, "applied batch");
350        Ok(start_loc..end_loc)
351    }
352}
353
354#[cfg(test)]
355pub(crate) mod tests {
356    use super::*;
357    use crate::{
358        journal::{contiguous::Mutable, Error as JournalError},
359        merkle::hasher::Standard,
360        qmdb::verify_proof,
361        Persistable,
362    };
363    use commonware_cryptography::Sha256;
364    use commonware_runtime::{deterministic, Metrics};
365    use commonware_utils::NZU64;
366    use std::{future::Future, pin::Pin};
367
368    pub(crate) type Reopen<D> =
369        Box<dyn Fn(deterministic::Context) -> Pin<Box<dyn Future<Output = D> + Send>>>;
370
371    /// Test value factory: creates distinct values from an index.
372    pub(crate) trait TestValue: Clone + PartialEq + std::fmt::Debug + Send + Sync {
373        fn make(i: u64) -> Self;
374    }
375
376    impl TestValue for Vec<u8> {
377        fn make(i: u64) -> Self {
378            vec![(i % 255) as u8; ((i % 13) + 7) as usize]
379        }
380    }
381
382    impl TestValue for commonware_utils::sequence::U64 {
383        fn make(i: u64) -> Self {
384            Self::new(i * 10 + 1)
385        }
386    }
387
388    pub(crate) async fn test_keyless_db_empty<F: Family, V, C, H>(
389        context: deterministic::Context,
390        db: Keyless<F, deterministic::Context, V, C, H>,
391        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
392    ) where
393        V: ValueEncoding<Value: TestValue>,
394        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
395        H: Hasher,
396        Operation<V>: EncodeShared,
397    {
398        let bounds = db.bounds().await;
399        assert_eq!(bounds.end, 1); // initial commit should exist
400        assert_eq!(bounds.start, Location::new(0));
401        assert_eq!(db.get_metadata().await.unwrap(), None);
402        assert_eq!(db.last_commit_loc(), Location::new(0));
403
404        // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
405        let root = db.root();
406        {
407            db.new_batch().append(V::Value::make(1));
408            // Don't merkleize/apply -- simulate failed commit
409        }
410        drop(db);
411
412        let mut db = reopen(context.with_label("db2")).await;
413        assert_eq!(db.root(), root);
414        assert_eq!(db.bounds().await.end, 1);
415        assert_eq!(db.get_metadata().await.unwrap(), None);
416
417        // Test calling commit on an empty db which should make it (durably) non-empty.
418        let metadata = V::Value::make(99);
419        let merkleized = db.new_batch().merkleize(&db, Some(metadata.clone()));
420        db.apply_batch(merkleized).await.unwrap();
421        db.commit().await.unwrap();
422        assert_eq!(db.bounds().await.end, 2); // 2 commit ops
423        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
424        assert_eq!(
425            db.get(Location::new(1)).await.unwrap(),
426            Some(metadata.clone())
427        ); // the commit op
428        let root = db.root();
429
430        // Commit op should remain after reopen even without clean shutdown.
431        let db = reopen(context.with_label("db3")).await;
432        assert_eq!(db.bounds().await.end, 2); // commit op should remain after re-open.
433        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
434        assert_eq!(db.root(), root);
435        assert_eq!(db.last_commit_loc(), Location::new(1));
436
437        db.destroy().await.unwrap();
438    }
439
440    pub(crate) async fn test_keyless_db_build_basic<F: Family, V, C, H>(
441        context: deterministic::Context,
442        mut db: Keyless<F, deterministic::Context, V, C, H>,
443        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
444    ) where
445        V: ValueEncoding<Value: TestValue>,
446        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
447        H: Hasher,
448        Operation<V>: EncodeShared,
449    {
450        // Build a db with 2 values and make sure we can get them back.
451        let v1 = V::Value::make(1);
452        let v2 = V::Value::make(2);
453
454        {
455            let batch = db.new_batch();
456            let loc1 = batch.size();
457            let batch = batch.append(v1.clone());
458            let loc2 = batch.size();
459            let batch = batch.append(v2.clone());
460            assert_eq!(loc1, Location::new(1));
461            assert_eq!(loc2, Location::new(2));
462            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
463        }
464
465        // Make sure closing/reopening gets us back to the same state.
466        assert_eq!(db.bounds().await.end, 4); // 2 appends, 1 commit + 1 initial commit
467        assert_eq!(db.get_metadata().await.unwrap(), None);
468        assert_eq!(db.get(Location::new(3)).await.unwrap(), None); // the commit op
469        let root = db.root();
470        db.sync().await.unwrap();
471        drop(db);
472
473        let db = reopen(context.with_label("db2")).await;
474        assert_eq!(db.bounds().await.end, 4);
475        assert_eq!(db.root(), root);
476        assert_eq!(db.get(Location::new(1)).await.unwrap().unwrap(), v1);
477        assert_eq!(db.get(Location::new(2)).await.unwrap().unwrap(), v2);
478
479        // Make sure commit operation remains after drop/reopen.
480        drop(db);
481        let db = reopen(context.with_label("db3")).await;
482        assert_eq!(db.bounds().await.end, 4);
483        assert_eq!(db.root(), root);
484
485        db.destroy().await.unwrap();
486    }
487
488    pub(crate) async fn test_keyless_db_recovery<F: Family, V, C, H>(
489        context: deterministic::Context,
490        db: Keyless<F, deterministic::Context, V, C, H>,
491        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
492    ) where
493        V: ValueEncoding<Value: TestValue>,
494        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
495        H: Hasher,
496        Operation<V>: EncodeShared,
497    {
498        let root = db.root();
499        const ELEMENTS: u64 = 100;
500
501        // Create uncommitted appends then simulate failure.
502        {
503            let mut batch = db.new_batch();
504            for i in 0..ELEMENTS {
505                batch = batch.append(V::Value::make(i));
506            }
507            // Don't merkleize/apply -- simulate failed commit
508        }
509        drop(db);
510        // Should rollback to the previous root.
511        let mut db = reopen(context.with_label("db2")).await;
512        assert_eq!(root, db.root());
513
514        // Apply the updates and commit them this time.
515        {
516            let mut batch = db.new_batch();
517            for i in 0..ELEMENTS {
518                batch = batch.append(V::Value::make(i + 100));
519            }
520            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
521        }
522        db.commit().await.unwrap();
523        let root = db.root();
524
525        // Create uncommitted appends then simulate failure.
526        {
527            let mut batch = db.new_batch();
528            for i in 0..ELEMENTS {
529                batch = batch.append(V::Value::make(i + 200));
530            }
531            // Don't merkleize/apply -- simulate failed commit
532        }
533        drop(db);
534        // Should rollback to the previous root.
535        let mut db = reopen(context.with_label("db3")).await;
536        assert_eq!(root, db.root());
537
538        // Apply the updates and commit them this time.
539        {
540            let mut batch = db.new_batch();
541            for i in 0..ELEMENTS {
542                batch = batch.append(V::Value::make(i + 300));
543            }
544            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
545        }
546        db.commit().await.unwrap();
547        let root = db.root();
548
549        // Make sure we can reopen and get back to the same state.
550        drop(db);
551        let db = reopen(context.with_label("db4")).await;
552        assert_eq!(db.bounds().await.end, 2 * ELEMENTS + 3);
553        assert_eq!(db.root(), root);
554
555        db.destroy().await.unwrap();
556    }
557
558    pub(crate) async fn test_keyless_db_proof<F: Family, V, C>(
559        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
560    ) where
561        V: ValueEncoding<Value: TestValue>,
562        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
563        Operation<V>: EncodeShared + std::fmt::Debug,
564    {
565        let hasher = Standard::<Sha256>::new();
566        const ELEMENTS: u64 = 50;
567
568        {
569            let mut batch = db.new_batch();
570            for i in 0..ELEMENTS {
571                batch = batch.append(V::Value::make(i));
572            }
573            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
574        }
575        let root = db.root();
576
577        let (proof, ops) = db.proof(Location::new(0), NZU64!(100)).await.unwrap();
578        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
579        assert_eq!(ops.len() as u64, 1 + ELEMENTS + 1);
580
581        let (proof, ops) = db.proof(Location::new(10), NZU64!(5)).await.unwrap();
582        assert!(verify_proof(
583            &hasher,
584            &proof,
585            Location::new(10),
586            &ops,
587            &root
588        ));
589        assert_eq!(ops.len(), 5);
590
591        db.destroy().await.unwrap();
592    }
593
594    pub(crate) async fn test_keyless_db_metadata<F: Family, V, C, H>(
595        mut db: Keyless<F, deterministic::Context, V, C, H>,
596    ) where
597        V: ValueEncoding<Value: TestValue>,
598        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
599        H: Hasher,
600        Operation<V>: EncodeShared,
601    {
602        let metadata = V::Value::make(99);
603        let merkleized = db
604            .new_batch()
605            .append(V::Value::make(1))
606            .merkleize(&db, Some(metadata.clone()));
607        db.apply_batch(merkleized).await.unwrap();
608        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
609
610        let merkleized = db.new_batch().merkleize(&db, None);
611        db.apply_batch(merkleized).await.unwrap();
612        assert_eq!(db.get_metadata().await.unwrap(), None);
613
614        db.destroy().await.unwrap();
615    }
616
617    pub(crate) async fn test_keyless_db_pruning<F: Family, V, C, H>(
618        mut db: Keyless<F, deterministic::Context, V, C, H>,
619    ) where
620        V: ValueEncoding<Value: TestValue>,
621        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
622        H: Hasher,
623        Operation<V>: EncodeShared,
624    {
625        // Test pruning empty database (no appends beyond initial commit).
626        let result = db.prune(Location::new(1)).await;
627        assert!(
628            matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
629                if prune_loc == Location::new(1) && commit_loc == Location::new(0))
630        );
631
632        // Add values and commit.
633        let merkleized = db
634            .new_batch()
635            .append(V::Value::make(1))
636            .append(V::Value::make(2))
637            .merkleize(&db, None);
638        db.apply_batch(merkleized).await.unwrap();
639
640        // op_count is 4 (initial_commit, v1, v2, commit), last_commit_loc is 3.
641        let last_commit = db.last_commit_loc();
642        assert_eq!(last_commit, Location::new(3));
643
644        let merkleized = db
645            .new_batch()
646            .append(V::Value::make(3))
647            .merkleize(&db, None);
648        db.apply_batch(merkleized).await.unwrap();
649
650        // Test valid prune (at previous commit location 3).
651        let root = db.root();
652        assert!(db.prune(Location::new(3)).await.is_ok());
653        assert_eq!(db.root(), root);
654
655        // Test pruning beyond last commit.
656        let new_last_commit = db.last_commit_loc();
657        let beyond = Location::new(*new_last_commit + 1);
658        let result = db.prune(beyond).await;
659        assert!(
660            matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
661                if prune_loc == beyond && commit_loc == new_last_commit)
662        );
663
664        db.destroy().await.unwrap();
665    }
666
667    pub(crate) async fn test_keyless_db_empty_db_recovery<F: Family, V, C, H>(
668        context: deterministic::Context,
669        db: Keyless<F, deterministic::Context, V, C, H>,
670        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
671    ) where
672        V: ValueEncoding<Value: TestValue>,
673        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
674        H: Hasher,
675        Operation<V>: EncodeShared,
676    {
677        let root = db.root();
678        const ELEMENTS: u64 = 200;
679
680        // Reopen DB without clean shutdown and make sure the state is the same.
681        let db = reopen(context.with_label("db2")).await;
682        assert_eq!(db.bounds().await.end, 1); // initial commit should exist
683        assert_eq!(db.root(), root);
684
685        // Simulate failure after inserting operations without a commit.
686        {
687            let mut batch = db.new_batch();
688            for i in 0..ELEMENTS {
689                batch = batch.append(V::Value::make(i));
690            }
691            // Don't merkleize/apply -- simulate failed commit
692        }
693        drop(db);
694        let db = reopen(context.with_label("db3")).await;
695        assert_eq!(db.bounds().await.end, 1); // initial commit should exist
696        assert_eq!(db.root(), root);
697
698        // Repeat: simulate failure after inserting operations without a commit.
699        {
700            let mut batch = db.new_batch();
701            for i in 0..ELEMENTS {
702                batch = batch.append(V::Value::make(i + 500));
703            }
704            // Don't merkleize/apply -- simulate failed commit
705        }
706        drop(db);
707        let db = reopen(context.with_label("db4")).await;
708        assert_eq!(db.bounds().await.end, 1); // initial commit should exist
709        assert_eq!(db.root(), root);
710
711        // One last check: multiple batches of uncommitted appends.
712        {
713            let mut batch = db.new_batch();
714            for i in 0..ELEMENTS * 3 {
715                batch = batch.append(V::Value::make(i + 1000));
716            }
717            // Don't merkleize/apply -- simulate failed commit
718        }
719        drop(db);
720        let mut db = reopen(context.with_label("db5")).await;
721        assert_eq!(db.bounds().await.end, 1); // initial commit should exist
722        assert_eq!(db.root(), root);
723        assert_eq!(db.last_commit_loc(), Location::new(0));
724
725        // Apply the ops one last time but fully commit them this time, then clean up.
726        {
727            let mut batch = db.new_batch();
728            for i in 0..ELEMENTS {
729                batch = batch.append(V::Value::make(i + 2000));
730            }
731            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
732        }
733        db.commit().await.unwrap();
734        let db = reopen(context.with_label("db6")).await;
735        assert!(db.bounds().await.end > 1);
736        assert_ne!(db.root(), root);
737
738        db.destroy().await.unwrap();
739    }
740
741    pub(crate) async fn test_keyless_db_replay_with_trailing_appends<F: Family, V, C, H>(
742        context: deterministic::Context,
743        mut db: Keyless<F, deterministic::Context, V, C, H>,
744        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
745    ) where
746        V: ValueEncoding<Value: TestValue>,
747        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
748        H: Hasher,
749        Operation<V>: EncodeShared,
750    {
751        // Add some initial operations and commit.
752        {
753            let mut batch = db.new_batch();
754            for i in 0..10u64 {
755                batch = batch.append(V::Value::make(i));
756            }
757            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
758        }
759        db.commit().await.unwrap();
760        let committed_root = db.root();
761        let committed_size = db.bounds().await.end;
762
763        // Add exactly one more append (uncommitted).
764        {
765            db.new_batch().append(V::Value::make(99));
766            // Don't merkleize/apply -- simulate failed commit
767        }
768        drop(db);
769
770        // Reopen and verify correct recovery.
771        let mut db = reopen(context.with_label("db2")).await;
772        assert_eq!(
773            db.bounds().await.end,
774            committed_size,
775            "Should rewind to last commit"
776        );
777        assert_eq!(db.root(), committed_root, "Root should match last commit");
778        assert_eq!(
779            db.last_commit_loc(),
780            committed_size - 1,
781            "Last commit location should be correct"
782        );
783
784        // Verify we can append and commit new data after recovery.
785        let new_value = V::Value::make(77);
786        {
787            let batch = db.new_batch();
788            let loc = batch.size();
789            let batch = batch.append(new_value.clone());
790            assert_eq!(
791                loc, committed_size,
792                "New append should get the expected location"
793            );
794            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
795        }
796        db.commit().await.unwrap();
797
798        assert_eq!(db.get(committed_size).await.unwrap(), Some(new_value));
799
800        let new_committed_root = db.root();
801        let new_committed_size = db.bounds().await.end;
802
803        // Add multiple uncommitted appends.
804        {
805            let mut batch = db.new_batch();
806            for i in 0..5u64 {
807                batch = batch.append(V::Value::make(200 + i));
808            }
809            // Don't merkleize/apply -- simulate failed commit
810        }
811        drop(db);
812
813        // Reopen and verify correct recovery.
814        let db = reopen(context.with_label("db3")).await;
815        assert_eq!(
816            db.bounds().await.end,
817            new_committed_size,
818            "Should rewind to last commit with multiple trailing appends"
819        );
820        assert_eq!(
821            db.root(),
822            new_committed_root,
823            "Root should match last commit after multiple appends"
824        );
825        assert_eq!(
826            db.last_commit_loc(),
827            new_committed_size - 1,
828            "Last commit location should be correct after multiple appends"
829        );
830
831        db.destroy().await.unwrap();
832    }
833
834    pub(crate) async fn test_keyless_batch_chained<F: Family, V, C>(
835        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
836    ) where
837        V: ValueEncoding<Value: TestValue>,
838        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
839        Operation<V>: EncodeShared,
840    {
841        let v1 = V::Value::make(10);
842        let v2 = V::Value::make(20);
843        let v3 = V::Value::make(30);
844
845        let parent = db.new_batch();
846        let loc1 = parent.size();
847        let parent = parent.append(v1.clone());
848        let parent_m = parent.merkleize(&db, None);
849
850        let child = parent_m.new_batch::<Sha256>();
851        let loc2 = child.size();
852        let child = child.append(v2.clone());
853        let loc3 = child.size();
854        let child = child.append(v3.clone());
855        let child_m = child.merkleize(&db, None);
856        let child_root = child_m.root();
857
858        db.apply_batch(child_m).await.unwrap();
859        db.commit().await.unwrap();
860
861        assert_eq!(db.root(), child_root);
862        assert_eq!(db.get(loc1).await.unwrap(), Some(v1));
863        assert_eq!(db.get(loc2).await.unwrap(), Some(v2));
864        assert_eq!(db.get(loc3).await.unwrap(), Some(v3));
865
866        db.destroy().await.unwrap();
867    }
868
869    pub(crate) async fn test_keyless_stale_batch<F: Family, V, C, H>(
870        mut db: Keyless<F, deterministic::Context, V, C, H>,
871    ) where
872        V: ValueEncoding<Value: TestValue>,
873        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
874        H: Hasher,
875        Operation<V>: EncodeShared,
876    {
877        let batch_a = db
878            .new_batch()
879            .append(V::Value::make(10))
880            .merkleize(&db, None);
881        let batch_b = db
882            .new_batch()
883            .append(V::Value::make(20))
884            .merkleize(&db, None);
885
886        db.apply_batch(batch_a).await.unwrap();
887
888        let result = db.apply_batch(batch_b).await;
889        assert!(matches!(result, Err(Error::StaleBatch { .. })));
890
891        db.destroy().await.unwrap();
892    }
893
894    pub(crate) async fn test_keyless_partial_ancestor_commit<F: Family, V, C, H>(
895        mut db: Keyless<F, deterministic::Context, V, C, H>,
896    ) where
897        V: ValueEncoding<Value: TestValue>,
898        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
899        H: Hasher,
900        Operation<V>: EncodeShared,
901    {
902        // Chain: DB <- A <- B <- C
903        let a = db
904            .new_batch()
905            .append(V::Value::make(10))
906            .merkleize(&db, None);
907        let b = a
908            .new_batch::<H>()
909            .append(V::Value::make(20))
910            .merkleize(&db, None);
911        let c = b
912            .new_batch::<H>()
913            .append(V::Value::make(30))
914            .merkleize(&db, None);
915
916        let expected_root = c.root();
917
918        // Apply only A, then apply C directly (B's items applied via ancestor batches).
919        db.apply_batch(a).await.unwrap();
920        db.apply_batch(c).await.unwrap();
921
922        // Root must match what the full chain produces.
923        assert_eq!(db.root(), expected_root);
924
925        db.destroy().await.unwrap();
926    }
927
928    pub(crate) async fn test_keyless_to_batch<F: Family, V, C>(
929        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
930    ) where
931        V: ValueEncoding<Value: TestValue>,
932        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
933        Operation<V>: EncodeShared,
934    {
935        let batch = db.new_batch();
936        let loc1 = batch.size();
937        let batch = batch.append(V::Value::make(10));
938        db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
939
940        let snapshot = db.to_batch();
941        assert_eq!(snapshot.root(), db.root());
942
943        let child_batch = snapshot.new_batch::<Sha256>();
944        let loc2 = child_batch.size();
945        let child_batch = child_batch.append(V::Value::make(20));
946        db.apply_batch(child_batch.merkleize(&db, None))
947            .await
948            .unwrap();
949
950        assert_eq!(db.get(loc1).await.unwrap(), Some(V::Value::make(10)));
951        assert_eq!(db.get(loc2).await.unwrap(), Some(V::Value::make(20)));
952
953        db.destroy().await.unwrap();
954    }
955
956    pub(crate) async fn test_keyless_db_non_empty_recovery<F: Family, V, C, H>(
957        context: deterministic::Context,
958        mut db: Keyless<F, deterministic::Context, V, C, H>,
959        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
960    ) where
961        V: ValueEncoding<Value: TestValue>,
962        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
963        H: Hasher,
964        Operation<V>: EncodeShared,
965    {
966        // Append many values then commit.
967        const ELEMENTS: u64 = 200;
968        {
969            let mut batch = db.new_batch();
970            for i in 0..ELEMENTS {
971                batch = batch.append(V::Value::make(i));
972            }
973            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
974        }
975        db.commit().await.unwrap();
976        let root = db.root();
977        let op_count = db.bounds().await.end;
978
979        // Reopen DB without clean shutdown and make sure the state is the same.
980        let db = reopen(context.with_label("db2")).await;
981        assert_eq!(db.bounds().await.end, op_count);
982        assert_eq!(db.root(), root);
983        assert_eq!(db.last_commit_loc(), op_count - 1);
984        drop(db);
985
986        // Insert many operations without commit, then simulate failure.
987        let db = reopen(context.with_label("recovery_a")).await;
988        {
989            let mut batch = db.new_batch();
990            for i in 0..ELEMENTS {
991                batch = batch.append(V::Value::make(i + 1000));
992            }
993            // Don't merkleize/apply -- simulate failed commit
994        }
995        drop(db);
996        let db = reopen(context.with_label("recovery_b")).await;
997        assert_eq!(db.bounds().await.end, op_count);
998        assert_eq!(db.root(), root);
999        drop(db);
1000
1001        // Repeat after pruning to the last commit.
1002        let mut db = reopen(context.with_label("db3")).await;
1003        db.prune(db.last_commit_loc()).await.unwrap();
1004        assert_eq!(db.bounds().await.end, op_count);
1005        assert_eq!(db.root(), root);
1006        db.sync().await.unwrap();
1007        drop(db);
1008
1009        let db = reopen(context.with_label("recovery_c")).await;
1010        {
1011            let mut batch = db.new_batch();
1012            for i in 0..ELEMENTS {
1013                batch = batch.append(V::Value::make(i + 2000));
1014            }
1015        }
1016        drop(db);
1017        let db = reopen(context.with_label("recovery_d")).await;
1018        assert_eq!(db.bounds().await.end, op_count);
1019        assert_eq!(db.root(), root);
1020        drop(db);
1021
1022        // Apply the ops one last time but fully commit them this time, then clean up.
1023        let mut db = reopen(context.with_label("db4")).await;
1024        {
1025            let mut batch = db.new_batch();
1026            for i in 0..ELEMENTS {
1027                batch = batch.append(V::Value::make(i + 3000));
1028            }
1029            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
1030        }
1031        db.commit().await.unwrap();
1032        let db = reopen(context.with_label("db5")).await;
1033        let bounds = db.bounds().await;
1034        assert!(bounds.end > op_count);
1035        assert_ne!(db.root(), root);
1036        assert_eq!(db.last_commit_loc(), bounds.end - 1);
1037
1038        db.destroy().await.unwrap();
1039    }
1040
1041    pub(crate) async fn test_keyless_db_proof_comprehensive<F: Family, V, C>(
1042        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1043    ) where
1044        V: ValueEncoding<Value: TestValue>,
1045        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1046        Operation<V>: EncodeShared + std::fmt::Debug,
1047    {
1048        let hasher = Standard::<Sha256>::new();
1049
1050        // Build a db with some values.
1051        const ELEMENTS: u64 = 100;
1052        {
1053            let mut batch = db.new_batch();
1054            for i in 0u64..ELEMENTS {
1055                batch = batch.append(V::Value::make(i));
1056            }
1057            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
1058        }
1059
1060        // Test that historical proof fails with op_count > number of operations.
1061        assert!(matches!(
1062            db.historical_proof(db.bounds().await.end + 1, Location::new(5), NZU64!(10))
1063                .await,
1064            Err(Error::<F>::Merkle(crate::merkle::Error::RangeOutOfBounds(
1065                _
1066            )))
1067        ));
1068
1069        let root = db.root();
1070
1071        for (start_loc, max_ops) in [
1072            (0, 10),
1073            (10, 5),
1074            (50, 20),
1075            (90, 15),
1076            (0, 1),
1077            (ELEMENTS - 1, 1),
1078            (ELEMENTS, 1),
1079        ] {
1080            let (proof, ops) = db
1081                .proof(Location::new(start_loc), NZU64!(max_ops))
1082                .await
1083                .unwrap();
1084            assert!(
1085                verify_proof(&hasher, &proof, Location::new(start_loc), &ops, &root),
1086                "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
1087            );
1088            let expected_ops = std::cmp::min(max_ops, *db.bounds().await.end - start_loc);
1089            assert_eq!(ops.len() as u64, expected_ops);
1090
1091            let wrong_root = Sha256::hash(&[0xFF; 32]);
1092            assert!(!verify_proof(
1093                &hasher,
1094                &proof,
1095                Location::new(start_loc),
1096                &ops,
1097                &wrong_root
1098            ));
1099            if start_loc > 0 {
1100                assert!(!verify_proof(
1101                    &hasher,
1102                    &proof,
1103                    Location::new(start_loc - 1),
1104                    &ops,
1105                    &root
1106                ));
1107            }
1108        }
1109
1110        db.destroy().await.unwrap();
1111    }
1112
1113    pub(crate) async fn test_keyless_db_proof_with_pruning<F: Family, V, C>(
1114        context: deterministic::Context,
1115        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1116        reopen: Reopen<Keyless<F, deterministic::Context, V, C, Sha256>>,
1117    ) where
1118        V: ValueEncoding<Value: TestValue>,
1119        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1120        Operation<V>: EncodeShared + std::fmt::Debug,
1121    {
1122        let hasher = Standard::<Sha256>::new();
1123
1124        const ELEMENTS: u64 = 100;
1125        {
1126            let mut batch = db.new_batch();
1127            for i in 0u64..ELEMENTS {
1128                batch = batch.append(V::Value::make(i));
1129            }
1130            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
1131        }
1132
1133        {
1134            let mut batch = db.new_batch();
1135            for i in ELEMENTS..ELEMENTS * 2 {
1136                batch = batch.append(V::Value::make(i));
1137            }
1138            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
1139        }
1140        let root = db.root();
1141
1142        const PRUNE_LOC: u64 = 30;
1143        db.prune(Location::new(PRUNE_LOC)).await.unwrap();
1144        let oldest_retained = db.bounds().await.start;
1145        assert_eq!(db.root(), root);
1146
1147        db.sync().await.unwrap();
1148        drop(db);
1149        let mut db = reopen(context).await;
1150        assert_eq!(db.root(), root);
1151
1152        for (start_loc, max_ops) in [
1153            (oldest_retained, 10),
1154            (Location::new(50), 20),
1155            (Location::new(150), 10),
1156            (Location::new(190), 15),
1157        ] {
1158            if start_loc < oldest_retained {
1159                continue;
1160            }
1161            let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
1162            assert!(verify_proof(&hasher, &proof, start_loc, &ops, &root));
1163        }
1164
1165        let aggressive_prune: Location<F> = Location::new(150);
1166        db.prune(aggressive_prune).await.unwrap();
1167
1168        let new_oldest = db.bounds().await.start;
1169        let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
1170        assert!(verify_proof(&hasher, &proof, new_oldest, &ops, &root));
1171
1172        let almost_all = db.bounds().await.end - 5;
1173        db.prune(almost_all).await.unwrap();
1174        let final_oldest = db.bounds().await.start;
1175        if final_oldest < db.bounds().await.end {
1176            let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
1177            assert!(verify_proof(
1178                &hasher,
1179                &final_proof,
1180                final_oldest,
1181                &final_ops,
1182                &root
1183            ));
1184        }
1185
1186        db.destroy().await.unwrap();
1187    }
1188
1189    pub(crate) async fn test_keyless_db_get_out_of_bounds<F: Family, V, C, H>(
1190        mut db: Keyless<F, deterministic::Context, V, C, H>,
1191    ) where
1192        V: ValueEncoding<Value: TestValue>,
1193        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1194        H: Hasher,
1195        Operation<V>: EncodeShared,
1196    {
1197        assert!(db.get(Location::new(0)).await.unwrap().is_none());
1198
1199        let merkleized = db
1200            .new_batch()
1201            .append(V::Value::make(1))
1202            .append(V::Value::make(2))
1203            .merkleize(&db, None);
1204        db.apply_batch(merkleized).await.unwrap();
1205
1206        assert_eq!(
1207            db.get(Location::new(1)).await.unwrap(),
1208            Some(V::Value::make(1))
1209        );
1210        assert!(db.get(Location::new(3)).await.unwrap().is_none());
1211        assert!(matches!(
1212            db.get(Location::new(4)).await,
1213            Err(Error::LocationOutOfBounds(loc, size))
1214                if loc == Location::new(4) && size == Location::new(4)
1215        ));
1216
1217        db.destroy().await.unwrap();
1218    }
1219
1220    pub(crate) async fn test_keyless_batch_get<F: Family, V, C, H>(
1221        mut db: Keyless<F, deterministic::Context, V, C, H>,
1222    ) where
1223        V: ValueEncoding<Value: TestValue>,
1224        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1225        H: Hasher,
1226        Operation<V>: EncodeShared,
1227    {
1228        let base_vals: Vec<V::Value> = (0..3).map(|i| V::Value::make(10 + i)).collect();
1229        let mut base_locs = Vec::new();
1230        {
1231            let mut batch = db.new_batch();
1232            for v in &base_vals {
1233                let loc = batch.size();
1234                batch = batch.append(v.clone());
1235                base_locs.push(loc);
1236            }
1237            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
1238        }
1239
1240        let batch = db.new_batch();
1241        for (i, loc) in base_locs.iter().enumerate() {
1242            assert_eq!(
1243                batch.get(*loc, &db).await.unwrap(),
1244                Some(base_vals[i].clone()),
1245            );
1246        }
1247
1248        let new_val = V::Value::make(99);
1249        let new_loc = batch.size();
1250        let batch = batch.append(new_val.clone());
1251        assert_eq!(batch.get(new_loc, &db).await.unwrap(), Some(new_val));
1252        assert_eq!(
1253            batch.get(Location::new(*new_loc + 1), &db).await.unwrap(),
1254            None
1255        );
1256
1257        db.destroy().await.unwrap();
1258    }
1259
1260    pub(crate) async fn test_keyless_batch_stacked_get<F: Family, V, C>(
1261        db: Keyless<F, deterministic::Context, V, C, Sha256>,
1262    ) where
1263        V: ValueEncoding<Value: TestValue>,
1264        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1265        Operation<V>: EncodeShared,
1266    {
1267        let v1 = V::Value::make(1);
1268        let v2 = V::Value::make(2);
1269
1270        let parent = db.new_batch();
1271        let loc1 = parent.size();
1272        let parent = parent.append(v1.clone());
1273        let parent_m = parent.merkleize(&db, None);
1274
1275        let child = parent_m.new_batch::<Sha256>();
1276        assert_eq!(child.get(loc1, &db).await.unwrap(), Some(v1));
1277
1278        let loc2 = child.size();
1279        let child = child.append(v2.clone());
1280        assert_eq!(child.get(loc2, &db).await.unwrap(), Some(v2));
1281        assert_eq!(child.get(Location::new(9999), &db).await.unwrap(), None);
1282
1283        db.destroy().await.unwrap();
1284    }
1285
1286    pub(crate) async fn test_keyless_batch_speculative_root<F: Family, V, C, H>(
1287        mut db: Keyless<F, deterministic::Context, V, C, H>,
1288    ) where
1289        V: ValueEncoding<Value: TestValue>,
1290        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1291        H: Hasher,
1292        Operation<V>: EncodeShared,
1293    {
1294        let mut batch = db.new_batch();
1295        for i in 0u64..10 {
1296            batch = batch.append(V::Value::make(i));
1297        }
1298        let merkleized = batch.merkleize(&db, None);
1299        let speculative = merkleized.root();
1300        db.apply_batch(merkleized).await.unwrap();
1301        assert_eq!(db.root(), speculative);
1302
1303        let merkleized = db
1304            .new_batch()
1305            .append(V::Value::make(100))
1306            .merkleize(&db, Some(V::Value::make(55)));
1307        let speculative = merkleized.root();
1308        db.apply_batch(merkleized).await.unwrap();
1309        assert_eq!(db.root(), speculative);
1310
1311        db.destroy().await.unwrap();
1312    }
1313
1314    pub(crate) async fn test_keyless_merkleized_batch_get<F: Family, V, C>(
1315        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1316    ) where
1317        V: ValueEncoding<Value: TestValue>,
1318        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1319        Operation<V>: EncodeShared,
1320    {
1321        let base_val = V::Value::make(10);
1322        let merkleized = db.new_batch().append(base_val.clone()).merkleize(&db, None);
1323        db.apply_batch(merkleized).await.unwrap();
1324
1325        let new_val = V::Value::make(20);
1326        let merkleized = db.new_batch().append(new_val.clone()).merkleize(&db, None);
1327
1328        assert_eq!(
1329            merkleized.get(Location::new(1), &db).await.unwrap(),
1330            Some(base_val),
1331        );
1332        assert_eq!(
1333            merkleized.get(Location::new(3), &db).await.unwrap(),
1334            Some(new_val),
1335        );
1336        assert_eq!(merkleized.get(Location::new(4), &db).await.unwrap(), None);
1337
1338        db.destroy().await.unwrap();
1339    }
1340
1341    pub(crate) async fn test_keyless_batch_chained_apply_sequential<F: Family, V, C, H>(
1342        mut db: Keyless<F, deterministic::Context, V, C, H>,
1343    ) where
1344        V: ValueEncoding<Value: TestValue>,
1345        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1346        H: Hasher,
1347        Operation<V>: EncodeShared,
1348    {
1349        let v1 = V::Value::make(1);
1350        let v2 = V::Value::make(2);
1351
1352        let parent = db.new_batch();
1353        let loc1 = parent.size();
1354        let parent = parent.append(v1.clone());
1355        let parent_m = parent.merkleize(&db, None);
1356        let parent_root = parent_m.root();
1357
1358        db.apply_batch(parent_m).await.unwrap();
1359        assert_eq!(db.root(), parent_root);
1360        assert_eq!(db.get(loc1).await.unwrap(), Some(v1));
1361
1362        let batch2 = db.new_batch();
1363        let loc2 = batch2.size();
1364        let batch2 = batch2.append(v2.clone());
1365        let batch2_m = batch2.merkleize(&db, None);
1366        let batch2_root = batch2_m.root();
1367        db.apply_batch(batch2_m).await.unwrap();
1368        assert_eq!(db.root(), batch2_root);
1369        assert_eq!(db.get(loc2).await.unwrap(), Some(v2));
1370
1371        db.destroy().await.unwrap();
1372    }
1373
1374    pub(crate) async fn test_keyless_batch_many_sequential<F: Family, V, C>(
1375        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1376    ) where
1377        V: ValueEncoding<Value: TestValue>,
1378        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1379        Operation<V>: EncodeShared + std::fmt::Debug,
1380    {
1381        let hasher = Standard::<Sha256>::new();
1382
1383        const BATCHES: u64 = 20;
1384        const APPENDS_PER_BATCH: u64 = 5;
1385        let mut all_values: Vec<V::Value> = Vec::new();
1386        let mut all_locs: Vec<Location<F>> = Vec::new();
1387
1388        for batch_idx in 0..BATCHES {
1389            let mut batch = db.new_batch();
1390            for j in 0..APPENDS_PER_BATCH {
1391                let v = V::Value::make(batch_idx * 10 + j);
1392                let loc = batch.size();
1393                batch = batch.append(v.clone());
1394                all_values.push(v);
1395                all_locs.push(loc);
1396            }
1397            let merkleized = batch.merkleize(&db, None);
1398            db.apply_batch(merkleized).await.unwrap();
1399        }
1400
1401        for (i, loc) in all_locs.iter().enumerate() {
1402            assert_eq!(db.get(*loc).await.unwrap(), Some(all_values[i].clone()));
1403        }
1404
1405        let root = db.root();
1406        let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1407        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1408        assert_eq!(db.bounds().await.end, 1 + BATCHES * (APPENDS_PER_BATCH + 1));
1409
1410        db.destroy().await.unwrap();
1411    }
1412
1413    pub(crate) async fn test_keyless_batch_empty<F: Family, V, C, H>(
1414        mut db: Keyless<F, deterministic::Context, V, C, H>,
1415    ) where
1416        V: ValueEncoding<Value: TestValue>,
1417        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1418        H: Hasher,
1419        Operation<V>: EncodeShared,
1420    {
1421        let merkleized = db
1422            .new_batch()
1423            .append(V::Value::make(1))
1424            .merkleize(&db, None);
1425        db.apply_batch(merkleized).await.unwrap();
1426        let root_before = db.root();
1427        let size_before = db.bounds().await.end;
1428
1429        let merkleized = db.new_batch().merkleize(&db, None);
1430        let speculative = merkleized.root();
1431        db.apply_batch(merkleized).await.unwrap();
1432
1433        assert_ne!(db.root(), root_before);
1434        assert_eq!(db.root(), speculative);
1435        assert_eq!(db.bounds().await.end, size_before + 1);
1436
1437        db.destroy().await.unwrap();
1438    }
1439
1440    pub(crate) async fn test_keyless_batch_chained_merkleized_get<F: Family, V, C>(
1441        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1442    ) where
1443        V: ValueEncoding<Value: TestValue>,
1444        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1445        Operation<V>: EncodeShared,
1446    {
1447        let base_val = V::Value::make(10);
1448        db.apply_batch(db.new_batch().append(base_val.clone()).merkleize(&db, None))
1449            .await
1450            .unwrap();
1451
1452        let v1 = V::Value::make(1);
1453        let parent = db.new_batch();
1454        let loc1 = parent.size();
1455        let parent_m = parent.append(v1.clone()).merkleize(&db, None);
1456
1457        let v2 = V::Value::make(2);
1458        let child = parent_m.new_batch::<Sha256>();
1459        let loc2 = child.size();
1460        let child_m = child.append(v2.clone()).merkleize(&db, None);
1461
1462        assert_eq!(
1463            child_m.get(Location::new(1), &db).await.unwrap(),
1464            Some(base_val),
1465        );
1466        assert_eq!(child_m.get(loc1, &db).await.unwrap(), Some(v1));
1467        assert_eq!(child_m.get(loc2, &db).await.unwrap(), Some(v2));
1468
1469        db.destroy().await.unwrap();
1470    }
1471
1472    pub(crate) async fn test_keyless_batch_large<F: Family, V, C>(
1473        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1474    ) where
1475        V: ValueEncoding<Value: TestValue>,
1476        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1477        Operation<V>: EncodeShared + std::fmt::Debug,
1478    {
1479        let hasher = Standard::<Sha256>::new();
1480        const N: u64 = 500;
1481        let mut values = Vec::new();
1482        let mut locs = Vec::new();
1483
1484        let mut batch = db.new_batch();
1485        for i in 0..N {
1486            let v = V::Value::make(i);
1487            locs.push(batch.size());
1488            batch = batch.append(v.clone());
1489            values.push(v);
1490        }
1491        let merkleized = batch.merkleize(&db, None);
1492        db.apply_batch(merkleized).await.unwrap();
1493
1494        for (i, loc) in locs.iter().enumerate() {
1495            assert_eq!(db.get(*loc).await.unwrap(), Some(values[i].clone()));
1496        }
1497
1498        let root = db.root();
1499        let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1500        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1501        assert_eq!(db.bounds().await.end, 1 + N + 1);
1502
1503        db.destroy().await.unwrap();
1504    }
1505
1506    pub(crate) async fn test_keyless_stale_batch_chained<F: Family, V, C>(
1507        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1508    ) where
1509        V: ValueEncoding<Value: TestValue>,
1510        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1511        Operation<V>: EncodeShared,
1512    {
1513        let parent = db
1514            .new_batch()
1515            .append(V::Value::make(1))
1516            .merkleize(&db, None);
1517        let child_a = parent
1518            .new_batch::<Sha256>()
1519            .append(V::Value::make(2))
1520            .merkleize(&db, None);
1521        let child_b = parent
1522            .new_batch::<Sha256>()
1523            .append(V::Value::make(3))
1524            .merkleize(&db, None);
1525
1526        db.apply_batch(child_a).await.unwrap();
1527        assert!(matches!(
1528            db.apply_batch(child_b).await,
1529            Err(Error::StaleBatch { .. })
1530        ));
1531
1532        db.destroy().await.unwrap();
1533    }
1534
1535    pub(crate) async fn test_keyless_sequential_commit_parent_then_child<F: Family, V, C>(
1536        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1537    ) where
1538        V: ValueEncoding<Value: TestValue>,
1539        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1540        Operation<V>: EncodeShared,
1541    {
1542        let parent = db
1543            .new_batch()
1544            .append(V::Value::make(1))
1545            .merkleize(&db, None);
1546        let child = parent
1547            .new_batch::<Sha256>()
1548            .append(V::Value::make(2))
1549            .merkleize(&db, None);
1550
1551        db.apply_batch(parent).await.unwrap();
1552        db.apply_batch(child).await.unwrap();
1553
1554        db.destroy().await.unwrap();
1555    }
1556
1557    pub(crate) async fn test_keyless_stale_batch_child_before_parent<F: Family, V, C>(
1558        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1559    ) where
1560        V: ValueEncoding<Value: TestValue>,
1561        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1562        Operation<V>: EncodeShared,
1563    {
1564        let parent = db
1565            .new_batch()
1566            .append(V::Value::make(1))
1567            .merkleize(&db, None);
1568        let child = parent
1569            .new_batch::<Sha256>()
1570            .append(V::Value::make(2))
1571            .merkleize(&db, None);
1572
1573        db.apply_batch(child).await.unwrap();
1574        assert!(matches!(
1575            db.apply_batch(parent).await,
1576            Err(Error::StaleBatch { .. })
1577        ));
1578
1579        db.destroy().await.unwrap();
1580    }
1581
1582    pub(crate) async fn test_keyless_child_root_matches_pending_and_committed<F: Family, V, C>(
1583        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1584    ) where
1585        V: ValueEncoding<Value: TestValue>,
1586        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1587        Operation<V>: EncodeShared,
1588    {
1589        // Build the child while the parent is still pending.
1590        let parent = db
1591            .new_batch()
1592            .append(V::Value::make(1))
1593            .merkleize(&db, None);
1594        let pending_child = parent
1595            .new_batch::<Sha256>()
1596            .append(V::Value::make(2))
1597            .merkleize(&db, None);
1598
1599        // Commit the parent, then rebuild the same logical child from the
1600        // committed DB state and compare roots.
1601        db.apply_batch(parent).await.unwrap();
1602        db.commit().await.unwrap();
1603
1604        let committed_child = db
1605            .new_batch()
1606            .append(V::Value::make(2))
1607            .merkleize(&db, None);
1608
1609        assert_eq!(pending_child.root(), committed_child.root());
1610
1611        db.destroy().await.unwrap();
1612    }
1613
1614    async fn commit_appends<F: Family, V, C, H>(
1615        db: &mut Keyless<F, deterministic::Context, V, C, H>,
1616        values: impl IntoIterator<Item = V::Value>,
1617        metadata: Option<V::Value>,
1618    ) -> core::ops::Range<Location<F>>
1619    where
1620        V: ValueEncoding<Value: TestValue>,
1621        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1622        H: Hasher,
1623        Operation<V>: EncodeShared,
1624    {
1625        let mut batch = db.new_batch();
1626        for value in values {
1627            batch = batch.append(value);
1628        }
1629        let range = db.apply_batch(batch.merkleize(db, metadata)).await.unwrap();
1630        db.commit().await.unwrap();
1631        range
1632    }
1633
1634    pub(crate) async fn test_keyless_db_rewind_recovery<F: Family, V, C, H>(
1635        context: deterministic::Context,
1636        mut db: Keyless<F, deterministic::Context, V, C, H>,
1637        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
1638    ) where
1639        V: ValueEncoding<Value: TestValue>,
1640        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1641        H: Hasher,
1642        Operation<V>: EncodeShared,
1643    {
1644        let initial_root = db.root();
1645        let initial_size = db.bounds().await.end;
1646
1647        let value_a = V::Value::make(1);
1648        let value_b = V::Value::make(2);
1649        let metadata_a = V::Value::make(3);
1650        let first_range = commit_appends(
1651            &mut db,
1652            [value_a.clone(), value_b.clone()],
1653            Some(metadata_a.clone()),
1654        )
1655        .await;
1656
1657        let root_before = db.root();
1658        let size_before = db.bounds().await.end;
1659        let commit_before = db.last_commit_loc();
1660        assert_eq!(size_before, first_range.end);
1661
1662        let value_c = V::Value::make(4);
1663        let metadata_b = V::Value::make(5);
1664        let second_range =
1665            commit_appends(&mut db, [value_c.clone()], Some(metadata_b.clone())).await;
1666        assert_eq!(second_range.start, size_before);
1667        assert_ne!(db.root(), root_before);
1668        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_b));
1669
1670        db.rewind(size_before).await.unwrap();
1671        assert_eq!(db.root(), root_before);
1672        assert_eq!(db.bounds().await.end, size_before);
1673        assert_eq!(db.last_commit_loc(), commit_before);
1674        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a.clone()));
1675        assert_eq!(
1676            db.get(Location::new(1)).await.unwrap(),
1677            Some(value_a.clone())
1678        );
1679        assert_eq!(
1680            db.get(Location::new(2)).await.unwrap(),
1681            Some(value_b.clone())
1682        );
1683        assert!(
1684            matches!(
1685                db.get(Location::new(4)).await,
1686                Err(Error::LocationOutOfBounds(_, size)) if size == size_before
1687            ),
1688            "rewound append should be out of bounds",
1689        );
1690
1691        db.commit().await.unwrap();
1692        drop(db);
1693        let mut db = reopen(context.with_label("reopen")).await;
1694        assert_eq!(db.root(), root_before);
1695        assert_eq!(db.bounds().await.end, size_before);
1696        assert_eq!(db.last_commit_loc(), commit_before);
1697        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1698        assert_eq!(
1699            db.get(Location::new(1)).await.unwrap(),
1700            Some(value_a.clone())
1701        );
1702        assert_eq!(
1703            db.get(Location::new(2)).await.unwrap(),
1704            Some(value_b.clone())
1705        );
1706        assert!(matches!(
1707            db.get(Location::new(4)).await,
1708            Err(Error::LocationOutOfBounds(_, size)) if size == size_before
1709        ));
1710
1711        db.rewind(initial_size).await.unwrap();
1712        assert_eq!(db.root(), initial_root);
1713        assert_eq!(db.bounds().await.end, initial_size);
1714        assert_eq!(db.get_metadata().await.unwrap(), None);
1715        assert!(matches!(
1716            db.get(Location::new(1)).await,
1717            Err(Error::LocationOutOfBounds(_, size)) if size == initial_size
1718        ));
1719
1720        db.commit().await.unwrap();
1721        drop(db);
1722        let db = reopen(context.with_label("reopen_initial_boundary")).await;
1723        assert_eq!(db.root(), initial_root);
1724        assert_eq!(db.bounds().await.end, initial_size);
1725        assert_eq!(db.get_metadata().await.unwrap(), None);
1726        assert!(matches!(
1727            db.get(Location::new(1)).await,
1728            Err(Error::LocationOutOfBounds(_, size)) if size == initial_size
1729        ));
1730
1731        db.destroy().await.unwrap();
1732    }
1733
1734    pub(crate) async fn test_keyless_db_rewind_pruned_target_errors<F: Family, V, C, H>(
1735        mut db: Keyless<F, deterministic::Context, V, C, H>,
1736    ) where
1737        V: ValueEncoding<Value: TestValue>,
1738        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1739        H: Hasher,
1740        Operation<V>: EncodeShared,
1741    {
1742        let first_range = commit_appends(&mut db, (0..16).map(V::Value::make), None).await;
1743
1744        let mut round = 0u64;
1745        loop {
1746            round += 1;
1747            assert!(
1748                round <= 64,
1749                "failed to prune enough history for rewind test"
1750            );
1751
1752            commit_appends(
1753                &mut db,
1754                (0..16).map(|i| V::Value::make(round * 100 + i)),
1755                None,
1756            )
1757            .await;
1758            db.prune(db.last_commit_loc()).await.unwrap();
1759
1760            if db.bounds().await.start > first_range.start {
1761                break;
1762            }
1763        }
1764
1765        let oldest_retained = db.bounds().await.start;
1766        let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
1767        assert!(
1768            matches!(
1769                boundary_err,
1770                Error::Journal(crate::journal::Error::ItemPruned(_))
1771            ),
1772            "unexpected rewind error at retained boundary: {boundary_err:?}"
1773        );
1774
1775        let err = db.rewind(first_range.start).await.unwrap_err();
1776        assert!(
1777            matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
1778            "unexpected rewind error: {err:?}"
1779        );
1780
1781        db.destroy().await.unwrap();
1782    }
1783}