// commonware_storage/qmdb/keyless/mod.rs

1//! The [Keyless] qmdb allows for append-only storage of arbitrary variable-length data that can
2//! later be retrieved by its location.
3
4use crate::{
5    journal::{
6        authenticated,
7        contiguous::variable::{Config as JournalConfig, Journal as ContiguousJournal},
8    },
9    mmr::{journaled::Config as MmrConfig, Location, Proof},
10    qmdb::{
11        any::VariableValue,
12        operation::Committable,
13        store::{LogStore, MerkleizedStore, PrunableStore},
14        DurabilityState, Durable, Error, MerkleizationState, Merkleized, NonDurable, Unmerkleized,
15    },
16};
17use commonware_cryptography::{DigestOf, Hasher};
18use commonware_parallel::ThreadPool;
19use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage};
20use core::{marker::PhantomData, ops::Range};
21use std::num::{NonZeroU64, NonZeroUsize};
22use tracing::{debug, warn};
23
24mod operation;
25pub use operation::Operation;
26
/// Configuration for a [Keyless] authenticated db.
///
/// `C` is the codec configuration type used to encode and decode operations in
/// the log (see [Config::log_codec_config]).
#[derive(Clone)]
pub struct Config<C> {
    /// The name of the [Storage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [Storage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [Storage] partition used to persist the operations log.
    pub log_partition: String,

    /// The size of the write buffer to use with the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    /// `None` disables compression.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding the operations log.
    pub log_codec_config: C,

    /// The max number of operations to put in each section of the operations log.
    pub log_items_per_section: NonZeroU64,

    /// An optional thread pool to use for parallelizing batch MMR operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data. Shared by the MMR journal and
    /// the operations log (see `init`).
    pub buffer_pool: PoolRef,
}
63
/// The authenticated journal type backing a [Keyless] db: an authenticated
/// wrapper (parameterized by merkleization state `S`) around a contiguous
/// variable-length log of [Operation]s.
type Journal<E, V, H, S> = authenticated::Journal<E, ContiguousJournal<E, Operation<V>>, H, S>;
66
/// A keyless authenticated database for variable-length data.
///
/// The `M` (merkleization) and `D` (durability) type parameters implement a
/// type-state pattern: capabilities such as computing a root or committing are
/// only exposed in the states that support them (see the impl blocks below).
pub struct Keyless<
    E: Storage + Clock + Metrics,
    V: VariableValue,
    H: Hasher,
    M: MerkleizationState<DigestOf<H>> = Merkleized<H>,
    D: DurabilityState = Durable,
> {
    /// Authenticated journal of operations.
    journal: Journal<E, V, H, M>,

    /// The location of the last commit, if any. `init` guarantees at least one
    /// commit exists, so this location is always valid.
    last_commit_loc: Location,

    /// Marker for durability state (zero-sized; `D` is never stored).
    _durability: PhantomData<D>,
}
84
85// Impl block for functionality available in all states.
86impl<
87        E: Storage + Clock + Metrics,
88        V: VariableValue,
89        H: Hasher,
90        M: MerkleizationState<DigestOf<H>>,
91        D: DurabilityState,
92    > Keyless<E, V, H, M, D>
93{
94    /// Get the value at location `loc` in the database.
95    ///
96    /// # Errors
97    ///
98    /// Returns [Error::LocationOutOfBounds] if `loc` >= `self.op_count()`.
99    pub async fn get(&self, loc: Location) -> Result<Option<V>, Error> {
100        let op_count = self.op_count();
101        if loc >= op_count {
102            return Err(Error::LocationOutOfBounds(loc, op_count));
103        }
104        let op = self.journal.read(loc).await?;
105
106        Ok(op.into_value())
107    }
108
109    /// Get the number of operations (appends + commits) that have been applied to the db since
110    /// inception.
111    pub fn op_count(&self) -> Location {
112        self.journal.size()
113    }
114
115    /// Returns the location of the last commit.
116    pub const fn last_commit_loc(&self) -> Location {
117        self.last_commit_loc
118    }
119
120    /// Return the oldest location that remains retrievable.
121    pub fn oldest_retained_loc(&self) -> Location {
122        self.journal
123            .oldest_retained_loc()
124            .expect("at least one operation should exist")
125    }
126
127    /// Return the oldest location that is no longer required to be retained.
128    pub fn inactivity_floor_loc(&self) -> Location {
129        self.journal.pruning_boundary()
130    }
131
132    /// Get the metadata associated with the last commit.
133    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
134        let op = self.journal.read(self.last_commit_loc).await?;
135        let Operation::Commit(metadata) = op else {
136            return Ok(None);
137        };
138
139        Ok(metadata)
140    }
141}
142
143// Implementation for the Clean state.
144impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
145    Keyless<E, V, H, Merkleized<H>, Durable>
146{
147    /// Returns a [Keyless] qmdb initialized from `cfg`. Any uncommitted operations will be discarded
148    /// and the state of the db will be as of the last committed operation.
149    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
150        let mmr_cfg = MmrConfig {
151            journal_partition: cfg.mmr_journal_partition,
152            metadata_partition: cfg.mmr_metadata_partition,
153            items_per_blob: cfg.mmr_items_per_blob,
154            write_buffer: cfg.mmr_write_buffer,
155            thread_pool: cfg.thread_pool,
156            buffer_pool: cfg.buffer_pool.clone(),
157        };
158
159        let journal_cfg = JournalConfig {
160            partition: cfg.log_partition,
161            items_per_section: cfg.log_items_per_section,
162            compression: cfg.log_compression,
163            codec_config: cfg.log_codec_config,
164            buffer_pool: cfg.buffer_pool,
165            write_buffer: cfg.log_write_buffer,
166        };
167
168        let mut journal = Journal::new(context, mmr_cfg, journal_cfg, Operation::is_commit).await?;
169        if journal.size() == 0 {
170            warn!("no operations found in log, creating initial commit");
171            journal.append(Operation::Commit(None)).await?;
172            journal.sync().await?;
173        }
174
175        let last_commit_loc = journal
176            .size()
177            .checked_sub(1)
178            .expect("at least one commit should exist");
179
180        Ok(Self {
181            journal,
182            last_commit_loc,
183            _durability: PhantomData,
184        })
185    }
186
187    /// Return the root of the db.
188    pub const fn root(&self) -> H::Digest {
189        self.journal.root()
190    }
191
192    /// Generate and return:
193    ///  1. a proof of all operations applied to the db in the range starting at (and including)
194    ///     location `start_loc`, and ending at the first of either:
195    ///     - the last operation performed, or
196    ///     - the operation `max_ops` from the start.
197    ///  2. the operations corresponding to the leaves in this range.
198    pub async fn proof(
199        &self,
200        start_loc: Location,
201        max_ops: NonZeroU64,
202    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
203        self.historical_proof(self.op_count(), start_loc, max_ops)
204            .await
205    }
206
207    /// Analogous to proof, but with respect to the state of the MMR when it had `op_count`
208    /// operations.
209    ///
210    /// # Errors
211    ///
212    /// - Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
213    ///   [crate::mmr::MAX_LOCATION].
214    /// - Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count` or `op_count` >
215    ///   number of operations.
216    pub async fn historical_proof(
217        &self,
218        op_count: Location,
219        start_loc: Location,
220        max_ops: NonZeroU64,
221    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
222        Ok(self
223            .journal
224            .historical_proof(op_count, start_loc, max_ops)
225            .await?)
226    }
227
228    /// Prune historical operations prior to `loc`. This does not affect the db's root.
229    ///
230    /// # Errors
231    ///
232    /// - Returns [Error::PruneBeyondMinRequired] if `loc` > last commit point.
233    /// - Returns [crate::mmr::Error::LocationOverflow] if `loc` > [crate::mmr::MAX_LOCATION]
234    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
235        if loc > self.last_commit_loc {
236            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
237        }
238        self.journal.prune(loc).await?;
239
240        Ok(())
241    }
242
243    /// Sync all database state to disk. While this isn't necessary to ensure durability of
244    /// committed operations, periodic invocation may reduce memory usage and the time required to
245    /// recover the database on restart.
246    pub async fn sync(&mut self) -> Result<(), Error> {
247        self.journal.sync().await.map_err(Into::into)
248    }
249
250    /// Destroy the db, removing all data from disk.
251    pub async fn destroy(self) -> Result<(), Error> {
252        Ok(self.journal.destroy().await?)
253    }
254
255    /// Convert this database into the Mutable state for accepting new operations.
256    pub fn into_mutable(self) -> Keyless<E, V, H, Unmerkleized, NonDurable> {
257        Keyless {
258            journal: self.journal.into_dirty(),
259            last_commit_loc: self.last_commit_loc,
260            _durability: PhantomData,
261        }
262    }
263}
264
265// Implementation for the Mutable state.
266impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
267    Keyless<E, V, H, Unmerkleized, NonDurable>
268{
269    /// Append a value to the db, returning its location which can be used to retrieve it.
270    pub async fn append(&mut self, value: V) -> Result<Location, Error> {
271        self.journal
272            .append(Operation::Append(value))
273            .await
274            .map_err(Into::into)
275    }
276
277    /// Commits any pending operations and transitions the database to the Durable state.
278    ///
279    /// The caller can associate an arbitrary `metadata` value with the commit. Returns the
280    /// `(start_loc, end_loc]` location range of committed operations. The end of the returned
281    /// range includes the commit operation itself, and hence will always be equal to `op_count`.
282    pub async fn commit(
283        mut self,
284        metadata: Option<V>,
285    ) -> Result<(Keyless<E, V, H, Unmerkleized, Durable>, Range<Location>), Error> {
286        let start_loc = self.last_commit_loc + 1;
287        self.last_commit_loc = self.journal.append(Operation::Commit(metadata)).await?;
288        self.journal.commit().await?;
289        debug!(size = ?self.op_count(), "committed db");
290
291        let op_count = self.op_count();
292        let durable = Keyless {
293            journal: self.journal,
294            last_commit_loc: self.last_commit_loc,
295            _durability: PhantomData,
296        };
297
298        Ok((durable, start_loc..op_count))
299    }
300
301    pub fn into_merkleized(self) -> Keyless<E, V, H, Merkleized<H>, Durable> {
302        Keyless {
303            journal: self.journal.merkleize(),
304            last_commit_loc: self.last_commit_loc,
305            _durability: PhantomData,
306        }
307    }
308}
309
310// Implementation for the (Unmerkleized, Durable) state.
311impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
312    Keyless<E, V, H, Unmerkleized, Durable>
313{
314    /// Convert this database into the Mutable state for accepting more operations without
315    /// re-merkleizing.
316    pub fn into_mutable(self) -> Keyless<E, V, H, Unmerkleized, NonDurable> {
317        Keyless {
318            journal: self.journal,
319            last_commit_loc: self.last_commit_loc,
320            _durability: PhantomData,
321        }
322    }
323
324    /// Compute the merkle root and transition to the Merkleized, Durable state.
325    pub fn into_merkleized(self) -> Keyless<E, V, H, Merkleized<H>, Durable> {
326        Keyless {
327            journal: self.journal.merkleize(),
328            last_commit_loc: self.last_commit_loc,
329            _durability: PhantomData,
330        }
331    }
332}
333
334// Implementation of MerkleizedStore for the Merkleized state (any durability).
335impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, D: DurabilityState> MerkleizedStore
336    for Keyless<E, V, H, Merkleized<H>, D>
337{
338    type Digest = H::Digest;
339    type Operation = Operation<V>;
340
341    fn root(&self) -> Self::Digest {
342        self.journal.root()
343    }
344
345    async fn historical_proof(
346        &self,
347        historical_size: Location,
348        start_loc: Location,
349        max_ops: NonZeroU64,
350    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
351        Ok(self
352            .journal
353            .historical_proof(historical_size, start_loc, max_ops)
354            .await?)
355    }
356}
357
// Implementation of LogStore for all states.
impl<
        E: Storage + Clock + Metrics,
        V: VariableValue,
        H: Hasher,
        M: MerkleizationState<DigestOf<H>>,
        D: DurabilityState,
    > LogStore for Keyless<E, V, H, M, D>
{
    type Value = V;

    fn is_empty(&self) -> bool {
        // A keyless database is never "empty" in the traditional sense since it always
        // has at least one commit operation. We consider it empty if there are no appends.
        self.op_count() <= 1
    }

    fn op_count(&self) -> Location {
        // Delegates to the inherent `op_count` (inherent methods take
        // precedence over trait methods in resolution, so this does not recurse).
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        // Delegates to the inherent method of the same name (no recursion; see above).
        self.inactivity_floor_loc()
    }

    async fn get_metadata(&self) -> Result<Option<Self::Value>, Error> {
        // Delegates to the inherent method of the same name (no recursion; see above).
        self.get_metadata().await
    }
}
387
388// Implementation of PrunableStore for the Merkleized state (any durability).
389impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, D: DurabilityState> PrunableStore
390    for Keyless<E, V, H, Merkleized<H>, D>
391{
392    async fn prune(&mut self, loc: Location) -> Result<(), Error> {
393        if loc > self.last_commit_loc {
394            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
395        }
396        self.journal.prune(loc).await?;
397        Ok(())
398    }
399}
400
401#[cfg(test)]
402mod test {
403    use super::*;
404    use crate::{mmr::StandardHasher as Standard, qmdb::verify_proof};
405    use commonware_cryptography::Sha256;
406    use commonware_macros::test_traced;
407    use commonware_runtime::{deterministic, Runner as _};
408    use commonware_utils::{NZUsize, NZU16, NZU64};
409    use rand::Rng;
410    use std::num::NonZeroU16;
411
    // Use some weird sizes here to test boundary conditions.
    // These configure the shared buffer pool built in `db_config`.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
415
416    fn db_config(suffix: &str) -> Config<(commonware_codec::RangeCfg<usize>, ())> {
417        Config {
418            mmr_journal_partition: format!("journal_{suffix}"),
419            mmr_metadata_partition: format!("metadata_{suffix}"),
420            mmr_items_per_blob: NZU64!(11),
421            mmr_write_buffer: NZUsize!(1024),
422            log_partition: format!("log_journal_{suffix}"),
423            log_write_buffer: NZUsize!(1024),
424            log_compression: None,
425            log_codec_config: ((0..=10000).into(), ()),
426            log_items_per_section: NZU64!(7),
427            thread_pool: None,
428            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
429        }
430    }
431
    /// Type alias for the Merkleized, Durable state (the state `init` returns).
    type CleanDb = Keyless<deterministic::Context, Vec<u8>, Sha256, Merkleized<Sha256>, Durable>;

    /// Type alias for the Mutable (Unmerkleized, NonDurable) state.
    type MutableDb = Keyless<deterministic::Context, Vec<u8>, Sha256, Unmerkleized, NonDurable>;
437
438    /// Return a [Keyless] database initialized with a fixed config.
439    async fn open_db(context: deterministic::Context) -> CleanDb {
440        CleanDb::init(context, db_config("partition"))
441            .await
442            .unwrap()
443    }
444
    /// Exercises a freshly-initialized db: initial-commit invariants, rollback
    /// of uncommitted appends on reopen, and commit metadata surviving an
    /// unclean restart.
    #[test_traced("INFO")]
    pub fn test_keyless_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1); // initial commit should exist
            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));

            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let v1 = vec![1u8; 8];
            let root = db.root();
            let mut db = db.into_mutable();
            db.append(v1).await.unwrap();
            drop(db); // Simulate failed commit
            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            let metadata = vec![3u8; 10];
            let db = db.into_mutable();
            let (durable, _) = db.commit(Some(metadata.clone())).await.unwrap();
            let db = durable.into_merkleized();
            assert_eq!(db.op_count(), 2); // 2 commit ops
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            assert_eq!(
                db.get(Location::new_unchecked(1)).await.unwrap(),
                Some(metadata.clone())
            ); // the commit op
            let root = db.root();

            // Commit op should remain after reopen even without clean shutdown.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2); // commit op should remain after re-open.
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(1));

            db.destroy().await.unwrap();
        });
    }
490
    /// Builds a small db (two appends + commit), verifies retrieval by
    /// location, and checks that committed state survives reopen while
    /// uncommitted appends are rolled back.
    #[test_traced("WARN")]
    pub fn test_keyless_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 values and make sure we can get them back.
            let db = open_db(context.clone()).await;
            let mut db = db.into_mutable();

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            let loc1 = db.append(v1.clone()).await.unwrap();
            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);

            let loc2 = db.append(v2.clone()).await.unwrap();
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Make sure closing/reopening gets us back to the same state.
            let (durable, _) = db.commit(None).await.unwrap();
            let mut db = durable.into_merkleized();
            assert_eq!(db.op_count(), 4); // 2 appends, 1 commit + 1 initial commit
            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.get(Location::new_unchecked(3)).await.unwrap(), None); // the commit op
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            let mut db = db.into_mutable();
            db.append(v2).await.unwrap();
            db.append(v1).await.unwrap();

            // Make sure uncommitted items get rolled back.
            drop(db); // Simulate failed commit
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            // Make sure commit operation remains after drop/reopen.
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
543
544    // Helper function to append random elements to a database.
545    async fn append_elements<T: Rng>(db: &mut MutableDb, rng: &mut T, num_elements: usize) {
546        for _ in 0..num_elements {
547            let value = vec![(rng.next_u32() % 255) as u8, (rng.next_u32() % 255) as u8];
548            db.append(value).await.unwrap();
549        }
550    }
551
    /// Alternates uncommitted-append-then-crash with append-then-commit cycles
    /// and verifies each reopen recovers exactly the last committed root.
    #[test_traced("WARN")]
    pub fn test_keyless_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: usize = 1000;
        executor.start(|mut context| async move {
            let db = open_db(context.clone()).await;
            let root = db.root();
            let mut db = db.into_mutable();

            append_elements(&mut db, &mut context, ELEMENTS).await;

            // Simulate a failure before committing.
            drop(db);
            // Should rollback to the previous root.
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());

            // Re-apply the updates and commit them this time.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();

            // Append more values.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;

            // Simulate a failure.
            drop(db);
            // Should rollback to the previous root.
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());

            // Re-apply the updates and commit them this time.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();

            // Make sure we can reopen and get back to the same state.
            drop(db);
            let db = open_db(context.clone()).await;
            // = 1 initial commit + 2 * ELEMENTS committed appends + 2 commit ops.
            assert_eq!(db.op_count(), 2 * ELEMENTS as u64 + 3);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
602
    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
    /// last committed state on re-open.
    #[test_traced("WARN")]
    fn test_keyless_db_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let db = open_db(context.clone()).await;

            // Append many values then commit.
            const ELEMENTS: usize = 200;
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();
            let op_count = db.op_count();

            // Reopen DB without clean shutdown and make sure the state is the same.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), op_count - 1);
            drop(db);

            // Insert many operations without commit, then simulate failure.
            async fn recover_from_failure(
                mut context: deterministic::Context,
                root: <Sha256 as Hasher>::Digest,
                op_count: Location,
            ) {
                let mut db = open_db(context.clone()).await.into_mutable();

                // Append operations and simulate failure.
                append_elements(&mut db, &mut context, ELEMENTS).await;
                drop(db);
                // Reopen must restore the committed root and op count.
                let db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(), root);
            }

            recover_from_failure(context.clone(), root, op_count).await;

            // Repeat recover_from_failure tests after successfully pruning to the last commit.
            let mut db = open_db(context.clone()).await;
            db.prune(db.last_commit_loc()).await.unwrap();
            // Pruning does not change the op count or root.
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(), root);
            db.sync().await.unwrap();
            drop(db);

            recover_from_failure(context.clone(), root, op_count).await;

            // Apply the ops one last time but fully commit them this time, then clean up.
            let mut db = open_db(context.clone()).await.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (_durable, _) = db.commit(None).await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.op_count() > op_count);
            assert_ne!(db.root(), root);
            assert_eq!(db.last_commit_loc(), db.op_count() - 1);

            db.destroy().await.unwrap();
        });
    }
667
    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
    /// DB on re-open.
    #[test_traced("WARN")]
    fn test_keyless_db_empty_db_recovery() {
        const ELEMENTS: u64 = 1000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            let root = db.root();

            // Reopen DB without clean shutdown and make sure the state is the same.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1); // initial commit should exist
            assert_eq!(db.root(), root);

            // Append ELEMENTS deterministic variable-length values.
            async fn apply_ops(db: &mut MutableDb) {
                for i in 0..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                    db.append(v).await.unwrap();
                }
            }

            // Simulate failure after inserting operations without a commit.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1); // initial commit should exist
            assert_eq!(db.root(), root);

            // Repeat: simulate failure after inserting operations without a commit.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1); // initial commit should exist
            assert_eq!(db.root(), root);

            // One last check that re-open without proper shutdown still recovers the correct state.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1); // initial commit should exist
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // Apply the ops one last time but fully commit them this time, then clean up.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            let (_db, _) = db.commit(None).await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.op_count() > 1);
            assert_ne!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
728
729    #[test_traced("INFO")]
730    pub fn test_keyless_db_proof_generation_and_verification() {
731        let executor = deterministic::Runner::default();
732        executor.start(|context| async move {
733            let mut hasher = Standard::<Sha256>::new();
734            let db = open_db(context.clone()).await;
735            let mut db = db.into_mutable();
736
737            // Build a db with some values
738            const ELEMENTS: u64 = 100;
739            let mut values = Vec::new();
740            for i in 0u64..ELEMENTS {
741                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
742                values.push(v.clone());
743                db.append(v).await.unwrap();
744            }
745            let (durable, _) = db.commit(None).await.unwrap();
746            let db = durable.into_merkleized();
747
748            // Test that historical proof fails with op_count > number of operations
749            assert!(matches!(
750                db.historical_proof(db.op_count() + 1, Location::new_unchecked(5), NZU64!(10))
751                    .await,
752                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
753            ));
754
755            let root = db.root();
756
757            // Test proof generation for various ranges
758            let test_cases = vec![
759                (0, 10),           // First 10 operations
760                (10, 5),           // Middle range
761                (50, 20),          // Larger range
762                (90, 15),          // Range that extends beyond end (should be limited)
763                (0, 1),            // Single operation
764                (ELEMENTS - 1, 1), // Last append operation
765                (ELEMENTS, 1),     // The commit operation
766            ];
767
768            for (start_loc, max_ops) in test_cases {
769                let (proof, ops) = db.proof(Location::new_unchecked(start_loc), NZU64!(max_ops)).await.unwrap();
770
771                // Verify the proof
772                assert!(
773                    verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &root),
774                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
775                );
776
777                // Check that we got the expected number of operations
778                let expected_ops = std::cmp::min(max_ops, *db.op_count() - start_loc);
779                assert_eq!(
780                    ops.len() as u64,
781                    expected_ops,
782                    "Expected {expected_ops} operations, got {}",
783                    ops.len(),
784                );
785
786                // Verify operation types
787                for (i, op) in ops.iter().enumerate() {
788                    let loc = start_loc + i as u64;
789                    if loc == 0 {
790                         assert!(
791                            matches!(op, Operation::Commit(None)),
792                            "Expected Initial Commit operation at location {loc}, got {op:?}",
793                        );
794                    } else if loc <= ELEMENTS {
795                        // Should be an Append operation
796                        assert!(
797                            matches!(op, Operation::Append(_)),
798                            "Expected Append operation at location {loc}, got {op:?}",
799                        );
800                    } else if loc == ELEMENTS + 1 {
801                        // Should be a Commit operation
802                        assert!(
803                            matches!(op, Operation::Commit(_)),
804                            "Expected Commit operation at location {loc}, got {op:?}",
805                        );
806                    }
807                }
808
809                // Verify that proof fails with wrong root
810                let wrong_root = Sha256::hash(&[0xFF; 32]);
811                assert!(
812                    !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &wrong_root),
813                    "Proof should fail with wrong root"
814                );
815
816                // Verify that proof fails with wrong start location
817                if start_loc > 0 {
818                    assert!(
819                        !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc - 1), &ops, &root),
820                        "Proof should fail with wrong start location"
821                    );
822                }
823            }
824
825            db.destroy().await.unwrap();
826        });
827    }
828
829    #[test_traced("INFO")]
830    pub fn test_keyless_db_proof_with_pruning() {
831        let executor = deterministic::Runner::default();
832        executor.start(|context| async move {
833            let mut hasher = Standard::<Sha256>::new();
834            let db = open_db(context.clone()).await;
835            let mut db = db.into_mutable();
836
837            // Build a db with some values
838            const ELEMENTS: u64 = 100;
839            let mut values = Vec::new();
840            for i in 0u64..ELEMENTS {
841                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
842                values.push(v.clone());
843                db.append(v).await.unwrap();
844            }
845            let (durable, _) = db.commit(None).await.unwrap();
846
847            // Add more elements and commit again
848            let mut db = durable.into_mutable();
849            for i in ELEMENTS..ELEMENTS * 2 {
850                let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
851                values.push(v.clone());
852                db.append(v).await.unwrap();
853            }
854            let (durable, _) = db.commit(None).await.unwrap();
855            let mut db = durable.into_merkleized();
856            let root = db.root();
857
858            println!("last commit loc: {}", db.last_commit_loc());
859
860            // Prune the first 30 operations
861            const PRUNE_LOC: u64 = 30;
862            db.prune(Location::new_unchecked(PRUNE_LOC)).await.unwrap();
863
864            // Verify pruning worked
865            let oldest_retained = db.oldest_retained_loc();
866
867            // Root should remain the same after pruning
868            assert_eq!(
869                db.root(),
870                root,
871                "Root should not change after pruning"
872            );
873
874            db.sync().await.unwrap();
875            drop(db);
876            let mut db = open_db(context.clone()).await;
877            assert_eq!(db.root(), root);
878            assert_eq!(db.op_count(), 2 * ELEMENTS + 3);
879            assert!(db.oldest_retained_loc() <= PRUNE_LOC);
880
881            // Test that we can't get pruned values
882            for i in 0..*oldest_retained {
883                let result = db.get(Location::new_unchecked(i)).await;
884                // Should either return None (for commit ops) or encounter pruned data
885                match result {
886                    Ok(None) => {} // Commit operation or pruned
887                    Ok(Some(_)) => {
888                        panic!("Should not be able to get pruned value at location {i}")
889                    }
890                    Err(_) => {} // Expected error for pruned data
891                }
892            }
893
894            // Test proof generation after pruning - should work for non-pruned ranges
895            let test_cases = vec![
896                (oldest_retained, 10), // Starting from oldest retained
897                (Location::new_unchecked(50), 20),                       // Middle range (if not pruned)
898                (Location::new_unchecked(150), 10),                      // Later range
899                (Location::new_unchecked(190), 15),                      // Near the end
900            ];
901
902            for (start_loc, max_ops) in test_cases {
903                // Skip if start_loc is before oldest retained
904                if start_loc < oldest_retained {
905                    continue;
906                }
907
908                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
909
910                // Verify the proof still works
911                assert!(
912                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
913                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
914                );
915
916                // Check that we got operations
917                let expected_ops = std::cmp::min(max_ops, *db.op_count() - *start_loc);
918                assert_eq!(
919                    ops.len() as u64,
920                    expected_ops,
921                    "Expected {expected_ops} operations, got {}",
922                    ops.len(),
923                );
924            }
925
926            // Test pruning more aggressively
927            const AGGRESSIVE_PRUNE: Location = Location::new_unchecked(150);
928            db.prune(AGGRESSIVE_PRUNE).await.unwrap();
929
930            let new_oldest = db.oldest_retained_loc();
931            assert!(new_oldest <= AGGRESSIVE_PRUNE);
932
933            // Can still generate proofs for the remaining data
934            let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
935            assert!(
936                verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
937                "Proof should still verify after aggressive pruning"
938            );
939
940            // Test edge case: prune everything except the last few operations
941            let almost_all = db.op_count() - 5;
942            db.prune(almost_all).await.unwrap();
943
944            let final_oldest = db.oldest_retained_loc();
945
946            // Should still be able to prove the remaining operations
947            if final_oldest < db.op_count() {
948                let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
949                assert!(
950                    verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
951                    "Should be able to prove remaining operations after extensive pruning"
952                );
953            }
954
955            db.destroy().await.unwrap();
956        });
957    }
958
959    #[test_traced("WARN")]
960    fn test_keyless_db_replay_with_trailing_appends() {
961        let executor = deterministic::Runner::default();
962        executor.start(|context| async move {
963            // Create initial database with committed data
964            let db = open_db(context.clone()).await;
965            let mut db = db.into_mutable();
966
967            // Add some initial operations and commit
968            for i in 0..10 {
969                let v = vec![i as u8; 10];
970                db.append(v).await.unwrap();
971            }
972            let (durable, _) = db.commit(None).await.unwrap();
973            let db = durable.into_merkleized();
974            let committed_root = db.root();
975            let committed_size = db.op_count();
976
977            // Add exactly one more append (uncommitted)
978            let uncommitted_value = vec![99u8; 20];
979            let mut db = db.into_mutable();
980            db.append(uncommitted_value.clone()).await.unwrap();
981
982            // Simulate failure without commit
983            drop(db);
984
985            // Reopen database
986            let db = open_db(context.clone()).await;
987
988            // Verify correct recovery
989            assert_eq!(
990                db.op_count(),
991                committed_size,
992                "Should rewind to last commit"
993            );
994            assert_eq!(db.root(), committed_root, "Root should match last commit");
995            assert_eq!(
996                db.last_commit_loc(),
997                committed_size - 1,
998                "Last commit location should be correct"
999            );
1000
1001            // Verify the uncommitted append was properly discarded
1002            // We should be able to append new data without issues
1003            let mut db = db.into_mutable();
1004            let new_value = vec![77u8; 15];
1005            let loc = db.append(new_value.clone()).await.unwrap();
1006            assert_eq!(
1007                loc, committed_size,
1008                "New append should get the expected location"
1009            );
1010
1011            // Verify we can read the new value
1012            assert_eq!(db.get(loc).await.unwrap(), Some(new_value));
1013
1014            // Test with multiple trailing appends to ensure robustness
1015            let (durable, _) = db.commit(None).await.unwrap();
1016            let db = durable.into_merkleized();
1017            let new_committed_root = db.root();
1018            let new_committed_size = db.op_count();
1019
1020            // Add multiple uncommitted appends
1021            let mut db = db.into_mutable();
1022            for i in 0..5 {
1023                let v = vec![(200 + i) as u8; 10];
1024                db.append(v).await.unwrap();
1025            }
1026
1027            // Simulate failure without commit
1028            drop(db);
1029
1030            // Reopen and verify correct recovery
1031            let db = open_db(context.clone()).await;
1032            assert_eq!(
1033                db.op_count(),
1034                new_committed_size,
1035                "Should rewind to last commit with multiple trailing appends"
1036            );
1037            assert_eq!(
1038                db.root(),
1039                new_committed_root,
1040                "Root should match last commit after multiple appends"
1041            );
1042            assert_eq!(
1043                db.last_commit_loc(),
1044                new_committed_size - 1,
1045                "Last commit location should be correct after multiple appends"
1046            );
1047
1048            db.destroy().await.unwrap();
1049        });
1050    }
1051
1052    #[test_traced("INFO")]
1053    pub fn test_keyless_db_get_out_of_bounds() {
1054        let executor = deterministic::Runner::default();
1055        executor.start(|context| async move {
1056            let db = open_db(context.clone()).await;
1057
1058            // Test getting from empty database
1059            let result = db.get(Location::new_unchecked(0)).await.unwrap();
1060            assert!(result.is_none());
1061
1062            // Add some values
1063            let v1 = vec![1u8; 8];
1064            let v2 = vec![2u8; 8];
1065            let mut db = db.into_mutable();
1066            db.append(v1.clone()).await.unwrap();
1067            db.append(v2.clone()).await.unwrap();
1068            let (durable, _) = db.commit(None).await.unwrap();
1069
1070            // Test getting valid locations - should succeed
1071            assert_eq!(durable.get(Location::new_unchecked(1)).await.unwrap().unwrap(), v1);
1072            assert_eq!(durable.get(Location::new_unchecked(2)).await.unwrap().unwrap(), v2);
1073
1074            // Test getting out of bounds location
1075            let result = durable.get(Location::new_unchecked(3)).await.unwrap();
1076            assert!(result.is_none());
1077
1078            // Test getting out of bounds location
1079            let result = durable.get(Location::new_unchecked(4)).await;
1080            assert!(
1081                matches!(result, Err(Error::LocationOutOfBounds(loc, size)) if loc == Location::new_unchecked(4) && size == Location::new_unchecked(4))
1082            );
1083
1084            let db = durable.into_merkleized();
1085            db.destroy().await.unwrap();
1086        });
1087    }
1088
1089    #[test_traced("INFO")]
1090    pub fn test_keyless_db_prune_beyond_commit() {
1091        let executor = deterministic::Runner::default();
1092        executor.start(|context| async move {
1093            let mut db = open_db(context.clone()).await;
1094
1095            // Test pruning empty database (no commits)
1096            let result = db.prune(Location::new_unchecked(1)).await;
1097            assert!(
1098                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1099                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
1100            );
1101
1102            // Add values and commit
1103            let v1 = vec![1u8; 8];
1104            let v2 = vec![2u8; 8];
1105            let v3 = vec![3u8; 8];
1106            let mut db = db.into_mutable();
1107            db.append(v1.clone()).await.unwrap();
1108            db.append(v2.clone()).await.unwrap();
1109            let (db, _) = db.commit(None).await.unwrap();
1110            let mut db = db.into_mutable();
1111            db.append(v3.clone()).await.unwrap();
1112
1113            // op_count is 5 (initial_commit, v1, v2, commit, v3), last_commit_loc is 3
1114            let last_commit = db.last_commit_loc();
1115            assert_eq!(last_commit, Location::new_unchecked(3));
1116
1117            // Test valid prune (at last commit) - need Clean state for prune
1118            let (durable, _) = db.commit(None).await.unwrap();
1119            let mut db = durable.into_merkleized();
1120            assert!(db.prune(Location::new_unchecked(3)).await.is_ok());
1121
1122            // Test pruning beyond last commit
1123            let new_last_commit = db.last_commit_loc();
1124            let beyond = Location::new_unchecked(*new_last_commit + 1);
1125            let result = db.prune(beyond).await;
1126            assert!(
1127                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1128                    if prune_loc == beyond && commit_loc == new_last_commit)
1129            );
1130
1131            db.destroy().await.unwrap();
1132        });
1133    }
1134
1135    use crate::{
1136        kv::tests::assert_send,
1137        qmdb::store::tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
1138    };
1139
1140    #[allow(dead_code)]
1141    fn assert_clean_db_futures_are_send(db: &mut CleanDb, loc: Location) {
1142        assert_log_store(db);
1143        assert_prunable_store(db, loc);
1144        assert_merkleized_store(db, loc);
1145        assert_send(db.sync());
1146        assert_send(db.get(loc));
1147    }
1148
1149    #[allow(dead_code)]
1150    fn assert_mutable_db_futures_are_send(db: &mut MutableDb, loc: Location, value: Vec<u8>) {
1151        assert_log_store(db);
1152        assert_send(db.get(loc));
1153        assert_send(db.append(value));
1154    }
1155
    // Compile-time check that the commit future is `Send`.
    #[allow(dead_code)]
    fn assert_mutable_db_commit_is_send(db: MutableDb) {
        // `commit` is called on an owned db here, so this helper takes the db by
        // value rather than by `&mut` like the other assertion helpers in this module.
        assert_send(db.commit(None));
    }
1160}