// commonware_storage/qmdb/keyless/mod.rs
//! The [Keyless] qmdb allows for append-only storage of arbitrary variable-length data that can
//! later be retrieved by its location.

4use crate::{
5    journal::{
6        authenticated,
7        contiguous::variable::{Config as JournalConfig, Journal as ContiguousJournal},
8    },
9    mmr::{journaled::Config as MmrConfig, Location, Proof},
10    qmdb::{
11        any::VariableValue,
12        operation::Committable,
13        store::{LogStore, MerkleizedStore, PrunableStore},
14        DurabilityState, Durable, Error, MerkleizationState, Merkleized, NonDurable, Unmerkleized,
15    },
16};
17use commonware_cryptography::{DigestOf, Hasher};
18use commonware_parallel::ThreadPool;
19use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
20use core::{marker::PhantomData, ops::Range};
21use std::num::{NonZeroU64, NonZeroUsize};
22use tracing::{debug, warn};
23
24mod operation;
25pub use operation::Operation;
26
/// Configuration for a [Keyless] authenticated db.
///
/// Splits into two underlying configs at init time: one for the MMR journal
/// (merkleization) and one for the contiguous operations log.
#[derive(Clone)]
pub struct Config<C> {
    /// The name of the [Storage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [Storage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [Storage] partition used to persist the operations log.
    pub log_partition: String,

    /// The size of the write buffer to use with the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    /// `None` disables compression.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding the operations log.
    pub log_codec_config: C,

    /// The max number of operations to put in each section of the operations log.
    pub log_items_per_section: NonZeroU64,

    /// An optional thread pool to use for parallelizing batch MMR operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data (shared by the MMR journal and the
    /// operations log).
    pub page_cache: CacheRef,
}
63
/// The authenticated journal of [Operation]s underlying a [Keyless] db.
type Journal<E, V, H, S> = authenticated::Journal<E, ContiguousJournal<E, Operation<V>>, H, S>;
66
/// A keyless authenticated database for variable-length data.
///
/// The type parameters `M` and `D` implement the type-state pattern: `M` tracks
/// whether the merkle root is up to date ([Merkleized] vs [Unmerkleized]) and `D`
/// tracks whether all operations have been persisted ([Durable] vs [NonDurable]).
/// State transitions are expressed as consuming conversions (e.g. `into_mutable`,
/// `into_merkleized`).
pub struct Keyless<
    E: Storage + Clock + Metrics,
    V: VariableValue,
    H: Hasher,
    M: MerkleizationState<DigestOf<H>> = Merkleized<H>,
    D: DurabilityState = Durable,
> {
    /// Authenticated journal of operations.
    journal: Journal<E, V, H, M>,

    /// The location of the last commit, if any.
    last_commit_loc: Location,

    /// Marker for durability state (zero-sized; carries `D` at the type level only).
    _durability: PhantomData<D>,
}
84
85// Impl block for functionality available in all states.
86impl<
87        E: Storage + Clock + Metrics,
88        V: VariableValue,
89        H: Hasher,
90        M: MerkleizationState<DigestOf<H>>,
91        D: DurabilityState,
92    > Keyless<E, V, H, M, D>
93{
94    /// Get the value at location `loc` in the database.
95    ///
96    /// # Errors
97    ///
98    /// Returns [Error::LocationOutOfBounds] if `loc` >= `self.journal.bounds().end`.
99    pub async fn get(&self, loc: Location) -> Result<Option<V>, Error> {
100        let op_count = self.journal.bounds().end;
101        if loc >= op_count {
102            return Err(Error::LocationOutOfBounds(loc, op_count));
103        }
104        let op = self.journal.read(loc).await?;
105
106        Ok(op.into_value())
107    }
108
109    /// Returns the location of the last commit.
110    pub const fn last_commit_loc(&self) -> Location {
111        self.last_commit_loc
112    }
113
114    /// Return the oldest location that is no longer required to be retained.
115    pub fn inactivity_floor_loc(&self) -> Location {
116        self.bounds().start
117    }
118
119    /// Get the metadata associated with the last commit.
120    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
121        let op = self.journal.read(self.last_commit_loc).await?;
122        let Operation::Commit(metadata) = op else {
123            return Ok(None);
124        };
125
126        Ok(metadata)
127    }
128}
129
// Implementation for the Clean state (Merkleized + Durable).
impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
    Keyless<E, V, H, Merkleized<H>, Durable>
{
    /// Returns a [Keyless] qmdb initialized from `cfg`. Any uncommitted operations will be discarded
    /// and the state of the db will be as of the last committed operation.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Split the user-facing config into the two configs the underlying journal
        // needs (MMR + contiguous operations log). The page cache handle is shared
        // between them, hence the clone.
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            page_cache: cfg.page_cache.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            page_cache: cfg.page_cache,
            write_buffer: cfg.log_write_buffer,
        };

        // `Operation::is_commit` lets the journal identify the last commit during
        // recovery, so trailing uncommitted operations are discarded.
        let mut journal = Journal::new(context, mmr_cfg, journal_cfg, Operation::is_commit).await?;
        if journal.size() == 0 {
            // A brand-new db is seeded with a genesis commit so that
            // `last_commit_loc` is always a valid location.
            warn!("no operations found in log, creating initial commit");
            let mut dirty_journal = journal.into_dirty();
            dirty_journal.append(Operation::Commit(None)).await?;
            journal = dirty_journal.merkleize();
            journal.sync().await?;
        }

        // After recovery (or seeding above) the log always ends at a commit, so the
        // last operation's location is the last commit location.
        let last_commit_loc = journal
            .size()
            .checked_sub(1)
            .expect("at least one commit should exist");

        Ok(Self {
            journal,
            last_commit_loc,
            _durability: PhantomData,
        })
    }

    /// Return the root of the db.
    pub const fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        // A proof against the current state is just a historical proof at the
        // current operation count.
        self.historical_proof(self.size(), start_loc, max_ops).await
    }

    /// Analogous to proof, but with respect to the state of the MMR when it had `op_count`
    /// operations.
    ///
    /// # Errors
    ///
    /// - Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
    ///   [crate::mmr::MAX_LOCATION].
    /// - Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count` or `op_count` >
    ///   number of operations.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Prune historical operations prior to `loc`. This does not affect the db's root.
    ///
    /// # Errors
    ///
    /// - Returns [Error::PruneBeyondMinRequired] if `loc` > last commit point.
    /// - Returns [crate::mmr::Error::LocationOverflow] if `loc` > [crate::mmr::MAX_LOCATION]
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        // Never prune past the last commit: the commit op (and anything after it)
        // must remain recoverable.
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.journal.sync().await.map_err(Into::into)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Convert this database into the Mutable state for accepting new operations.
    pub fn into_mutable(self) -> Keyless<E, V, H, Unmerkleized, NonDurable> {
        Keyless {
            journal: self.journal.into_dirty(),
            last_commit_loc: self.last_commit_loc,
            _durability: PhantomData,
        }
    }
}
252
// Implementation for the Mutable state (Unmerkleized + NonDurable).
impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
    Keyless<E, V, H, Unmerkleized, NonDurable>
{
    /// Append a value to the db, returning its location which can be used to retrieve it.
    pub async fn append(&mut self, value: V) -> Result<Location, Error> {
        self.journal
            .append(Operation::Append(value))
            .await
            .map_err(Into::into)
    }

    /// Commits any pending operations and transitions the database to the Durable state.
    ///
    /// The caller can associate an arbitrary `metadata` value with the commit. Returns the
    /// `(start_loc, end_loc]` location range of committed operations. The end of the returned
    /// range includes the commit operation itself, and hence will always be equal to `op_count`.
    pub async fn commit(
        mut self,
        metadata: Option<V>,
    ) -> Result<(Keyless<E, V, H, Unmerkleized, Durable>, Range<Location>), Error> {
        // The committed range begins just after the previous commit.
        let start_loc = self.last_commit_loc + 1;
        // The commit op itself becomes the new last-commit location.
        self.last_commit_loc = self.journal.append(Operation::Commit(metadata)).await?;
        self.journal.commit().await?;
        let op_count = self.bounds().end;
        debug!(size = ?op_count, "committed db");

        let durable = Keyless {
            journal: self.journal,
            last_commit_loc: self.last_commit_loc,
            _durability: PhantomData,
        };

        Ok((durable, start_loc..op_count))
    }

    /// Merkleize all pending operations and transition to the (Merkleized, Durable) state.
    ///
    /// NOTE(review): this transitions straight to `Durable` without an intervening
    /// `commit`, even though appended operations have not been explicitly committed.
    /// Confirm that `merkleize()` alone is sufficient for durability here, or whether
    /// the return type should be `NonDurable`.
    pub fn into_merkleized(self) -> Keyless<E, V, H, Merkleized<H>, Durable> {
        Keyless {
            journal: self.journal.merkleize(),
            last_commit_loc: self.last_commit_loc,
            _durability: PhantomData,
        }
    }
}
297
298// Implementation for the (Unmerkleized, Durable) state.
299impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
300    Keyless<E, V, H, Unmerkleized, Durable>
301{
302    /// Convert this database into the Mutable state for accepting more operations without
303    /// re-merkleizing.
304    pub fn into_mutable(self) -> Keyless<E, V, H, Unmerkleized, NonDurable> {
305        Keyless {
306            journal: self.journal,
307            last_commit_loc: self.last_commit_loc,
308            _durability: PhantomData,
309        }
310    }
311
312    /// Compute the merkle root and transition to the Merkleized, Durable state.
313    pub fn into_merkleized(self) -> Keyless<E, V, H, Merkleized<H>, Durable> {
314        Keyless {
315            journal: self.journal.merkleize(),
316            last_commit_loc: self.last_commit_loc,
317            _durability: PhantomData,
318        }
319    }
320}
321
322// Implementation of MerkleizedStore for the Merkleized state (any durability).
323impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, D: DurabilityState> MerkleizedStore
324    for Keyless<E, V, H, Merkleized<H>, D>
325{
326    type Digest = H::Digest;
327    type Operation = Operation<V>;
328
329    fn root(&self) -> Self::Digest {
330        self.journal.root()
331    }
332
333    async fn historical_proof(
334        &self,
335        historical_size: Location,
336        start_loc: Location,
337        max_ops: NonZeroU64,
338    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
339        Ok(self
340            .journal
341            .historical_proof(historical_size, start_loc, max_ops)
342            .await?)
343    }
344}
345
// Implementation of LogStore for all states.
impl<
        E: Storage + Clock + Metrics,
        V: VariableValue,
        H: Hasher,
        M: MerkleizationState<DigestOf<H>>,
        D: DurabilityState,
    > LogStore for Keyless<E, V, H, M, D>
{
    type Value = V;

    fn is_empty(&self) -> bool {
        // A keyless database is never "empty" in the traditional sense since it always
        // has at least one commit operation. We consider it empty if there are no appends.
        self.bounds().end <= 1
    }

    fn bounds(&self) -> std::ops::Range<Location> {
        self.journal.bounds()
    }

    fn inactivity_floor_loc(&self) -> Location {
        // Delegates to the inherent method of the same name. Inherent methods take
        // precedence over trait methods during resolution, so this is not recursive.
        self.inactivity_floor_loc()
    }

    async fn get_metadata(&self) -> Result<Option<Self::Value>, Error> {
        // Likewise delegates to the inherent `get_metadata` (no recursion).
        self.get_metadata().await
    }
}
375
376// Implementation of PrunableStore for the Merkleized state (any durability).
377impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, D: DurabilityState> PrunableStore
378    for Keyless<E, V, H, Merkleized<H>, D>
379{
380    async fn prune(&mut self, loc: Location) -> Result<(), Error> {
381        if loc > self.last_commit_loc {
382            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
383        }
384        self.journal.prune(loc).await?;
385        Ok(())
386    }
387}
388
389#[cfg(test)]
390mod test {
391    use super::*;
392    use crate::{
393        mmr::StandardHasher as Standard,
394        qmdb::{store::LogStore, verify_proof},
395    };
396    use commonware_cryptography::Sha256;
397    use commonware_macros::test_traced;
398    use commonware_runtime::{deterministic, Metrics, Runner as _};
399    use commonware_utils::{NZUsize, NZU16, NZU64};
400    use rand::Rng;
401    use std::num::NonZeroU16;
402
    // Use some weird sizes here to test boundary conditions.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101); // deliberately not a power of two
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11); // deliberately tiny cache
406
407    fn db_config(suffix: &str) -> Config<(commonware_codec::RangeCfg<usize>, ())> {
408        Config {
409            mmr_journal_partition: format!("journal_{suffix}"),
410            mmr_metadata_partition: format!("metadata_{suffix}"),
411            mmr_items_per_blob: NZU64!(11),
412            mmr_write_buffer: NZUsize!(1024),
413            log_partition: format!("log_journal_{suffix}"),
414            log_write_buffer: NZUsize!(1024),
415            log_compression: None,
416            log_codec_config: ((0..=10000).into(), ()),
417            log_items_per_section: NZU64!(7),
418            thread_pool: None,
419            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
420        }
421    }
422
    /// Type alias for the Merkleized, Durable state ("clean": root up to date, all ops persisted).
    type CleanDb = Keyless<deterministic::Context, Vec<u8>, Sha256, Merkleized<Sha256>, Durable>;

    /// Type alias for the Mutable (Unmerkleized, NonDurable) state, which accepts appends.
    type MutableDb = Keyless<deterministic::Context, Vec<u8>, Sha256, Unmerkleized, NonDurable>;
428
429    /// Return a [Keyless] database initialized with a fixed config.
430    async fn open_db(context: deterministic::Context) -> CleanDb {
431        CleanDb::init(context, db_config("partition"))
432            .await
433            .unwrap()
434    }
435
    /// Lifecycle checks on an initially-empty db: genesis commit exists, uncommitted
    /// appends are rolled back on re-open, committing metadata works, and committed
    /// state survives an unclean re-open.
    #[test_traced("INFO")]
    pub fn test_keyless_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("db1")).await;
            assert_eq!(db.bounds().end, 1); // initial commit should exist
            assert_eq!(db.journal.bounds().start, Location::new_unchecked(0));

            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let v1 = vec![1u8; 8];
            let root = db.root();
            let mut db = db.into_mutable();
            db.append(v1).await.unwrap();
            drop(db); // Simulate failed commit
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            let metadata = vec![3u8; 10];
            let db = db.into_mutable();
            let (durable, _) = db.commit(Some(metadata.clone())).await.unwrap();
            let db = durable.into_merkleized();
            assert_eq!(db.bounds().end, 2); // 2 commit ops
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            assert_eq!(
                db.get(Location::new_unchecked(1)).await.unwrap(),
                Some(metadata.clone())
            ); // the commit op
            let root = db.root();

            // Commit op should remain after reopen even without clean shutdown.
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().end, 2); // commit op should remain after re-open.
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(1));

            db.destroy().await.unwrap();
        });
    }
481
    /// Basic build/retrieve round-trip: two appended values are readable by location,
    /// committed state survives re-open, and uncommitted appends roll back.
    #[test_traced("WARN")]
    pub fn test_keyless_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 values and make sure we can get them back.
            let db = open_db(context.with_label("db1")).await;
            let mut db = db.into_mutable();

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            let loc1 = db.append(v1.clone()).await.unwrap();
            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);

            let loc2 = db.append(v2.clone()).await.unwrap();
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Make sure closing/reopening gets us back to the same state.
            let (durable, _) = db.commit(None).await.unwrap();
            let mut db = durable.into_merkleized();
            assert_eq!(db.bounds().end, 4); // 2 appends, 1 commit + 1 initial commit
            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.get(Location::new_unchecked(3)).await.unwrap(), None); // the commit op
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.root(), root);

            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            let mut db = db.into_mutable();
            db.append(v2).await.unwrap();
            db.append(v1).await.unwrap();

            // Make sure uncommitted items get rolled back.
            drop(db); // Simulate failed commit
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.root(), root);

            // Make sure commit operation remains after drop/reopen.
            drop(db);
            let db = open_db(context.with_label("db4")).await;
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
534
535    // Helper function to append random elements to a database.
536    async fn append_elements<T: Rng>(db: &mut MutableDb, rng: &mut T, num_elements: usize) {
537        for _ in 0..num_elements {
538            let value = vec![(rng.next_u32() % 255) as u8, (rng.next_u32() % 255) as u8];
539            db.append(value).await.unwrap();
540        }
541    }
542
    /// Recovery test: uncommitted batches roll back to the previous committed root on
    /// re-open, while committed batches persist.
    #[test_traced("WARN")]
    pub fn test_keyless_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: usize = 1000;
        executor.start(|mut context| async move {
            let db = open_db(context.with_label("db1")).await;
            let root = db.root();
            let mut db = db.into_mutable();

            append_elements(&mut db, &mut context, ELEMENTS).await;

            // Simulate a failure before committing.
            drop(db);
            // Should rollback to the previous root.
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(root, db.root());

            // Re-apply the updates and commit them this time.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();

            // Append more values.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;

            // Simulate a failure.
            drop(db);
            // Should rollback to the previous root.
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(root, db.root());

            // Re-apply the updates and commit them this time.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();

            // Make sure we can reopen and get back to the same state.
            drop(db);
            let db = open_db(context.with_label("db4")).await;
            // 2 committed batches of ELEMENTS appends + initial commit + 2 commit ops.
            assert_eq!(db.bounds().end, 2 * ELEMENTS as u64 + 3);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
593
    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
    /// last committed state on re-open (also after pruning to the last commit).
    #[test_traced("WARN")]
    fn test_keyless_db_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let db = open_db(context.with_label("db1")).await;

            // Append many values then commit.
            const ELEMENTS: usize = 200;
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();
            let op_count = db.bounds().end;

            // Reopen DB without clean shutdown and make sure the state is the same.
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().end, op_count);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), op_count - 1);
            drop(db);

            // Insert many operations without commit, then simulate failure.
            async fn recover_from_failure(
                mut context: deterministic::Context,
                label1: &str,
                label2: &str,
                root: <Sha256 as Hasher>::Digest,
                op_count: Location,
            ) {
                let mut db = open_db(context.with_label(label1)).await.into_mutable();

                // Append operations and simulate failure.
                append_elements(&mut db, &mut context, ELEMENTS).await;
                drop(db);
                let db = open_db(context.with_label(label2)).await;
                assert_eq!(db.bounds().end, op_count);
                assert_eq!(db.root(), root);
            }

            recover_from_failure(context.with_label("recovery1"), "a", "b", root, op_count).await;

            // Repeat recover_from_failure tests after successfully pruning to the last commit.
            let mut db = open_db(context.with_label("db3")).await;
            db.prune(db.last_commit_loc()).await.unwrap();
            assert_eq!(db.bounds().end, op_count);
            assert_eq!(db.root(), root);
            db.sync().await.unwrap();
            drop(db);

            recover_from_failure(context.with_label("recovery2"), "c", "d", root, op_count).await;

            // Apply the ops one last time but fully commit them this time, then clean up.
            let mut db = open_db(context.with_label("db4")).await.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (_durable, _) = db.commit(None).await.unwrap();
            let db = open_db(context.with_label("db5")).await;
            assert!(db.bounds().end > op_count);
            assert_ne!(db.root(), root);
            assert_eq!(db.last_commit_loc(), db.bounds().end - 1);

            db.destroy().await.unwrap();
        });
    }
660
    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
    /// DB on re-open.
    #[test_traced("WARN")]
    fn test_keyless_db_empty_db_recovery() {
        const ELEMENTS: u64 = 1000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("db1")).await;
            let root = db.root();

            // Reopen DB without clean shutdown and make sure the state is the same.
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().end, 1); // initial commit should exist
            assert_eq!(db.root(), root);

            // Append ELEMENTS values of varying (deterministic) sizes.
            async fn apply_ops(db: &mut MutableDb) {
                for i in 0..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                    db.append(v).await.unwrap();
                }
            }

            // Simulate failure after inserting operations without a commit.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().end, 1); // initial commit should exist
            assert_eq!(db.root(), root);

            // Repeat: simulate failure after inserting operations without a commit.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.with_label("db4")).await;
            assert_eq!(db.bounds().end, 1); // initial commit should exist
            assert_eq!(db.root(), root);

            // One last check that re-open without proper shutdown still recovers the correct state.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.with_label("db5")).await;
            assert_eq!(db.bounds().end, 1); // initial commit should exist
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // Apply the ops one last time but fully commit them this time, then clean up.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            let (_db, _) = db.commit(None).await.unwrap();
            let db = open_db(context.with_label("db6")).await;
            assert!(db.bounds().end > 1);
            assert_ne!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
721
722    #[test_traced("INFO")]
723    pub fn test_keyless_db_proof_generation_and_verification() {
724        let executor = deterministic::Runner::default();
725        executor.start(|context| async move {
726            let mut hasher = Standard::<Sha256>::new();
727            let db = open_db(context.clone()).await;
728            let mut db = db.into_mutable();
729
730            // Build a db with some values
731            const ELEMENTS: u64 = 100;
732            let mut values = Vec::new();
733            for i in 0u64..ELEMENTS {
734                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
735                values.push(v.clone());
736                db.append(v).await.unwrap();
737            }
738            let (durable, _) = db.commit(None).await.unwrap();
739            let db = durable.into_merkleized();
740
741            // Test that historical proof fails with op_count > number of operations
742            assert!(matches!(
743                db.historical_proof(db.bounds().end + 1, Location::new_unchecked(5), NZU64!(10))
744                    .await,
745                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
746            ));
747
748            let root = db.root();
749
750            // Test proof generation for various ranges
751            let test_cases = vec![
752                (0, 10),           // First 10 operations
753                (10, 5),           // Middle range
754                (50, 20),          // Larger range
755                (90, 15),          // Range that extends beyond end (should be limited)
756                (0, 1),            // Single operation
757                (ELEMENTS - 1, 1), // Last append operation
758                (ELEMENTS, 1),     // The commit operation
759            ];
760
761            for (start_loc, max_ops) in test_cases {
762                let (proof, ops) = db.proof(Location::new_unchecked(start_loc), NZU64!(max_ops)).await.unwrap();
763
764                // Verify the proof
765                assert!(
766                    verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &root),
767                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
768                );
769
770                // Check that we got the expected number of operations
771                let expected_ops = std::cmp::min(max_ops, *db.bounds().end - start_loc);
772                assert_eq!(
773                    ops.len() as u64,
774                    expected_ops,
775                    "Expected {expected_ops} operations, got {}",
776                    ops.len(),
777                );
778
779                // Verify operation types
780                for (i, op) in ops.iter().enumerate() {
781                    let loc = start_loc + i as u64;
782                    if loc == 0 {
783                         assert!(
784                            matches!(op, Operation::Commit(None)),
785                            "Expected Initial Commit operation at location {loc}, got {op:?}",
786                        );
787                    } else if loc <= ELEMENTS {
788                        // Should be an Append operation
789                        assert!(
790                            matches!(op, Operation::Append(_)),
791                            "Expected Append operation at location {loc}, got {op:?}",
792                        );
793                    } else if loc == ELEMENTS + 1 {
794                        // Should be a Commit operation
795                        assert!(
796                            matches!(op, Operation::Commit(_)),
797                            "Expected Commit operation at location {loc}, got {op:?}",
798                        );
799                    }
800                }
801
802                // Verify that proof fails with wrong root
803                let wrong_root = Sha256::hash(&[0xFF; 32]);
804                assert!(
805                    !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &wrong_root),
806                    "Proof should fail with wrong root"
807                );
808
809                // Verify that proof fails with wrong start location
810                if start_loc > 0 {
811                    assert!(
812                        !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc - 1), &ops, &root),
813                        "Proof should fail with wrong start location"
814                    );
815                }
816            }
817
818            db.destroy().await.unwrap();
819        });
820    }
821
822    #[test_traced("INFO")]
823    pub fn test_keyless_db_proof_with_pruning() {
824        let executor = deterministic::Runner::default();
825        executor.start(|context| async move {
826            let mut hasher = Standard::<Sha256>::new();
827            let db = open_db(context.with_label("db1")).await;
828            let mut db = db.into_mutable();
829
830            // Build a db with some values
831            const ELEMENTS: u64 = 100;
832            let mut values = Vec::new();
833            for i in 0u64..ELEMENTS {
834                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
835                values.push(v.clone());
836                db.append(v).await.unwrap();
837            }
838            let (durable, _) = db.commit(None).await.unwrap();
839
840            // Add more elements and commit again
841            let mut db = durable.into_mutable();
842            for i in ELEMENTS..ELEMENTS * 2 {
843                let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
844                values.push(v.clone());
845                db.append(v).await.unwrap();
846            }
847            let (durable, _) = db.commit(None).await.unwrap();
848            let mut db = durable.into_merkleized();
849            let root = db.root();
850
851            println!("last commit loc: {}", db.last_commit_loc());
852
853            // Prune the first 30 operations
854            const PRUNE_LOC: u64 = 30;
855            db.prune(Location::new_unchecked(PRUNE_LOC)).await.unwrap();
856
857            // Verify pruning worked
858            let oldest_retained = db.journal.bounds().start;
859
860            // Root should remain the same after pruning
861            assert_eq!(
862                db.root(),
863                root,
864                "Root should not change after pruning"
865            );
866
867            db.sync().await.unwrap();
868            drop(db);
869            let mut db = open_db(context.with_label("db2")).await;
870            assert_eq!(db.root(), root);
871            assert_eq!(db.bounds().end, 2 * ELEMENTS + 3);
872            assert!(db.journal.bounds().start <= PRUNE_LOC);
873
874            // Test that we can't get pruned values
875            for i in 0..*oldest_retained {
876                let result = db.get(Location::new_unchecked(i)).await;
877                // Should either return None (for commit ops) or encounter pruned data
878                match result {
879                    Ok(None) => {} // Commit operation or pruned
880                    Ok(Some(_)) => {
881                        panic!("Should not be able to get pruned value at location {i}")
882                    }
883                    Err(_) => {} // Expected error for pruned data
884                }
885            }
886
887            // Test proof generation after pruning - should work for non-pruned ranges
888            let test_cases = vec![
889                (oldest_retained, 10), // Starting from oldest retained
890                (Location::new_unchecked(50), 20),                       // Middle range (if not pruned)
891                (Location::new_unchecked(150), 10),                      // Later range
892                (Location::new_unchecked(190), 15),                      // Near the end
893            ];
894
895            for (start_loc, max_ops) in test_cases {
896                // Skip if start_loc is before oldest retained
897                if start_loc < oldest_retained {
898                    continue;
899                }
900
901                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
902
903                // Verify the proof still works
904                assert!(
905                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
906                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
907                );
908
909                // Check that we got operations
910                let expected_ops = std::cmp::min(max_ops, *db.bounds().end - *start_loc);
911                assert_eq!(
912                    ops.len() as u64,
913                    expected_ops,
914                    "Expected {expected_ops} operations, got {}",
915                    ops.len(),
916                );
917            }
918
919            // Test pruning more aggressively
920            const AGGRESSIVE_PRUNE: Location = Location::new_unchecked(150);
921            db.prune(AGGRESSIVE_PRUNE).await.unwrap();
922
923            let new_oldest = db.journal.bounds().start;
924            assert!(new_oldest <= AGGRESSIVE_PRUNE);
925
926            // Can still generate proofs for the remaining data
927            let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
928            assert!(
929                verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
930                "Proof should still verify after aggressive pruning"
931            );
932
933            // Test edge case: prune everything except the last few operations
934            let almost_all = db.bounds().end - 5;
935            db.prune(almost_all).await.unwrap();
936
937            let final_oldest = db.journal.bounds().start;
938
939            // Should still be able to prove the remaining operations
940            if final_oldest < db.bounds().end {
941                let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
942                assert!(
943                    verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
944                    "Should be able to prove remaining operations after extensive pruning"
945                );
946            }
947
948            db.destroy().await.unwrap();
949        });
950    }
951
952    #[test_traced("WARN")]
953    fn test_keyless_db_replay_with_trailing_appends() {
954        let executor = deterministic::Runner::default();
955        executor.start(|context| async move {
956            // Create initial database with committed data
957            let db = open_db(context.with_label("db1")).await;
958            let mut db = db.into_mutable();
959
960            // Add some initial operations and commit
961            for i in 0..10 {
962                let v = vec![i as u8; 10];
963                db.append(v).await.unwrap();
964            }
965            let (durable, _) = db.commit(None).await.unwrap();
966            let db = durable.into_merkleized();
967            let committed_root = db.root();
968            let committed_size = db.bounds().end;
969
970            // Add exactly one more append (uncommitted)
971            let uncommitted_value = vec![99u8; 20];
972            let mut db = db.into_mutable();
973            db.append(uncommitted_value.clone()).await.unwrap();
974
975            // Simulate failure without commit
976            drop(db);
977
978            // Reopen database
979            let db = open_db(context.with_label("db2")).await;
980
981            // Verify correct recovery
982            assert_eq!(
983                db.bounds().end,
984                committed_size,
985                "Should rewind to last commit"
986            );
987            assert_eq!(db.root(), committed_root, "Root should match last commit");
988            assert_eq!(
989                db.last_commit_loc(),
990                committed_size - 1,
991                "Last commit location should be correct"
992            );
993
994            // Verify the uncommitted append was properly discarded
995            // We should be able to append new data without issues
996            let mut db = db.into_mutable();
997            let new_value = vec![77u8; 15];
998            let loc = db.append(new_value.clone()).await.unwrap();
999            assert_eq!(
1000                loc, committed_size,
1001                "New append should get the expected location"
1002            );
1003
1004            // Verify we can read the new value
1005            assert_eq!(db.get(loc).await.unwrap(), Some(new_value));
1006
1007            // Test with multiple trailing appends to ensure robustness
1008            let (durable, _) = db.commit(None).await.unwrap();
1009            let db = durable.into_merkleized();
1010            let new_committed_root = db.root();
1011            let new_committed_size = db.bounds().end;
1012
1013            // Add multiple uncommitted appends
1014            let mut db = db.into_mutable();
1015            for i in 0..5 {
1016                let v = vec![(200 + i) as u8; 10];
1017                db.append(v).await.unwrap();
1018            }
1019
1020            // Simulate failure without commit
1021            drop(db);
1022
1023            // Reopen and verify correct recovery
1024            let db = open_db(context.with_label("db3")).await;
1025            assert_eq!(
1026                db.bounds().end,
1027                new_committed_size,
1028                "Should rewind to last commit with multiple trailing appends"
1029            );
1030            assert_eq!(
1031                db.root(),
1032                new_committed_root,
1033                "Root should match last commit after multiple appends"
1034            );
1035            assert_eq!(
1036                db.last_commit_loc(),
1037                new_committed_size - 1,
1038                "Last commit location should be correct after multiple appends"
1039            );
1040
1041            db.destroy().await.unwrap();
1042        });
1043    }
1044
1045    #[test_traced("INFO")]
1046    pub fn test_keyless_db_get_out_of_bounds() {
1047        let executor = deterministic::Runner::default();
1048        executor.start(|context| async move {
1049            let db = open_db(context.clone()).await;
1050
1051            // Test getting from empty database
1052            let result = db.get(Location::new_unchecked(0)).await.unwrap();
1053            assert!(result.is_none());
1054
1055            // Add some values
1056            let v1 = vec![1u8; 8];
1057            let v2 = vec![2u8; 8];
1058            let mut db = db.into_mutable();
1059            db.append(v1.clone()).await.unwrap();
1060            db.append(v2.clone()).await.unwrap();
1061            let (durable, _) = db.commit(None).await.unwrap();
1062
1063            // Test getting valid locations - should succeed
1064            assert_eq!(durable.get(Location::new_unchecked(1)).await.unwrap().unwrap(), v1);
1065            assert_eq!(durable.get(Location::new_unchecked(2)).await.unwrap().unwrap(), v2);
1066
1067            // Test getting out of bounds location
1068            let result = durable.get(Location::new_unchecked(3)).await.unwrap();
1069            assert!(result.is_none());
1070
1071            // Test getting out of bounds location
1072            let result = durable.get(Location::new_unchecked(4)).await;
1073            assert!(
1074                matches!(result, Err(Error::LocationOutOfBounds(loc, size)) if loc == Location::new_unchecked(4) && size == Location::new_unchecked(4))
1075            );
1076
1077            let db = durable.into_merkleized();
1078            db.destroy().await.unwrap();
1079        });
1080    }
1081
1082    #[test_traced("INFO")]
1083    pub fn test_keyless_db_prune_beyond_commit() {
1084        let executor = deterministic::Runner::default();
1085        executor.start(|context| async move {
1086            let mut db = open_db(context.clone()).await;
1087
1088            // Test pruning empty database (no commits)
1089            let result = db.prune(Location::new_unchecked(1)).await;
1090            assert!(
1091                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1092                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
1093            );
1094
1095            // Add values and commit
1096            let v1 = vec![1u8; 8];
1097            let v2 = vec![2u8; 8];
1098            let v3 = vec![3u8; 8];
1099            let mut db = db.into_mutable();
1100            db.append(v1.clone()).await.unwrap();
1101            db.append(v2.clone()).await.unwrap();
1102            let (db, _) = db.commit(None).await.unwrap();
1103            let mut db = db.into_mutable();
1104            db.append(v3.clone()).await.unwrap();
1105
1106            // op_count is 5 (initial_commit, v1, v2, commit, v3), last_commit_loc is 3
1107            let last_commit = db.last_commit_loc();
1108            assert_eq!(last_commit, Location::new_unchecked(3));
1109
1110            // Test valid prune (at last commit) - need Clean state for prune
1111            let (durable, _) = db.commit(None).await.unwrap();
1112            let mut db = durable.into_merkleized();
1113            assert!(db.prune(Location::new_unchecked(3)).await.is_ok());
1114
1115            // Test pruning beyond last commit
1116            let new_last_commit = db.last_commit_loc();
1117            let beyond = Location::new_unchecked(*new_last_commit + 1);
1118            let result = db.prune(beyond).await;
1119            assert!(
1120                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1121                    if prune_loc == beyond && commit_loc == new_last_commit)
1122            );
1123
1124            db.destroy().await.unwrap();
1125        });
1126    }
1127
1128    use crate::{
1129        kv::tests::assert_send,
1130        qmdb::store::tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
1131    };
1132
    /// Compile-time check that the futures produced by a clean db are `Send`:
    /// runs the store-trait `Send` assertions plus `sync` and `get`. Never
    /// executed at runtime — it only needs to type-check.
    #[allow(dead_code)]
    fn assert_clean_db_futures_are_send(db: &mut CleanDb, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_merkleized_store(db, loc);
        assert_send(db.sync());
        assert_send(db.get(loc));
    }
1141
    /// Compile-time check that the futures produced by a mutable db (`get` and
    /// `append`) are `Send`. Never executed at runtime — it only needs to
    /// type-check.
    #[allow(dead_code)]
    fn assert_mutable_db_futures_are_send(db: &mut MutableDb, loc: Location, value: Vec<u8>) {
        assert_log_store(db);
        assert_send(db.get(loc));
        assert_send(db.append(value));
    }
1148
    /// Compile-time check that the `commit` future is `Send`. Takes the db by
    /// value because `commit` consumes `self`. Never executed at runtime.
    #[allow(dead_code)]
    fn assert_mutable_db_commit_is_send(db: MutableDb) {
        assert_send(db.commit(None));
    }
1153}