commonware_storage/qmdb/keyless/
mod.rs

1//! The [Keyless] qmdb allows for append-only storage of arbitrary variable-length data that can
2//! later be retrieved by its location.
3//!
4//! The implementation consists of an `mmr` over the operations applied to the database and an
5//! operations `log` storing these operations.
6//!
7//! The state of the operations log up until the last commit point is the "source of truth". In the
8//! event of unclean shutdown, the mmr will be brought back into alignment with the log on startup.
9
10use crate::{
11    journal::{
12        authenticated,
13        contiguous::variable::{Config as JournalConfig, Journal as ContiguousJournal},
14    },
15    mmr::{
16        journaled::Config as MmrConfig,
17        mem::{Clean, Dirty, State},
18        Location, Proof,
19    },
20    qmdb::{any::VariableValue, operation::Committable, Error},
21};
22use commonware_cryptography::{DigestOf, Hasher};
23use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage, ThreadPool};
24use core::ops::Range;
25use std::num::{NonZeroU64, NonZeroUsize};
26use tracing::{debug, warn};
27
28mod operation;
29pub use operation::Operation;
30
31/// Configuration for a [Keyless] authenticated db.
32#[derive(Clone)]
33pub struct Config<C> {
34    /// The name of the [Storage] partition used for the MMR's backing journal.
35    pub mmr_journal_partition: String,
36
37    /// The items per blob configuration value used by the MMR journal.
38    pub mmr_items_per_blob: NonZeroU64,
39
40    /// The size of the write buffer to use for each blob in the MMR journal.
41    pub mmr_write_buffer: NonZeroUsize,
42
43    /// The name of the [Storage] partition used for the MMR's metadata.
44    pub mmr_metadata_partition: String,
45
46    /// The name of the [Storage] partition used to persist the operations log.
47    pub log_partition: String,
48
49    /// The size of the write buffer to use with the log journal.
50    pub log_write_buffer: NonZeroUsize,
51
52    /// Optional compression level (using `zstd`) to apply to log data before storing.
53    pub log_compression: Option<u8>,
54
55    /// The codec configuration to use for encoding and decoding the operations log.
56    pub log_codec_config: C,
57
58    /// The max number of operations to put in each section of the operations log.
59    pub log_items_per_section: NonZeroU64,
60
61    /// An optional thread pool to use for parallelizing batch MMR operations.
62    pub thread_pool: Option<ThreadPool>,
63
64    /// The buffer pool to use for caching data.
65    pub buffer_pool: PoolRef,
66}
67
68/// A keyless QMDB for variable length data.
69type Journal<E, V, H, S> = authenticated::Journal<E, ContiguousJournal<E, Operation<V>>, H, S>;
70
71pub struct Keyless<
72    E: Storage + Clock + Metrics,
73    V: VariableValue,
74    H: Hasher,
75    S: State<DigestOf<H>> = Dirty,
76> {
77    /// Authenticated journal of operations.
78    journal: Journal<E, V, H, S>,
79
80    /// The location of the last commit, if any.
81    last_commit_loc: Location,
82}
83
84impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, S: State<DigestOf<H>>>
85    Keyless<E, V, H, S>
86{
87    /// Get the value at location `loc` in the database.
88    ///
89    /// # Errors
90    ///
91    /// Returns [Error::LocationOutOfBounds] if `loc` >= `self.op_count()`.
92    pub async fn get(&self, loc: Location) -> Result<Option<V>, Error> {
93        let op_count = self.op_count();
94        if loc >= op_count {
95            return Err(Error::LocationOutOfBounds(loc, op_count));
96        }
97        let op = self.journal.read(loc).await?;
98
99        Ok(op.into_value())
100    }
101
102    /// Get the number of operations (appends + commits) that have been applied to the db since
103    /// inception.
104    pub fn op_count(&self) -> Location {
105        self.journal.size()
106    }
107
108    /// Returns the location of the last commit.
109    pub const fn last_commit_loc(&self) -> Location {
110        self.last_commit_loc
111    }
112
113    /// Return the oldest location that remains retrievable.
114    pub fn oldest_retained_loc(&self) -> Location {
115        self.journal
116            .oldest_retained_loc()
117            .expect("at least one operation should exist")
118    }
119
120    /// Append a value to the db, returning its location which can be used to retrieve it.
121    pub async fn append(&mut self, value: V) -> Result<Location, Error> {
122        self.journal
123            .append(Operation::Append(value))
124            .await
125            .map_err(Into::into)
126    }
127
128    /// Get the metadata associated with the last commit.
129    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
130        let op = self.journal.read(self.last_commit_loc).await?;
131        let Operation::Commit(metadata) = op else {
132            return Ok(None);
133        };
134
135        Ok(metadata)
136    }
137}
138
139impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> Keyless<E, V, H, Clean<H::Digest>> {
140    /// Returns a [Keyless] qmdb initialized from `cfg`. Any uncommitted operations will be discarded
141    /// and the state of the db will be as of the last committed operation.
142    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
143        let mmr_cfg = MmrConfig {
144            journal_partition: cfg.mmr_journal_partition,
145            metadata_partition: cfg.mmr_metadata_partition,
146            items_per_blob: cfg.mmr_items_per_blob,
147            write_buffer: cfg.mmr_write_buffer,
148            thread_pool: cfg.thread_pool,
149            buffer_pool: cfg.buffer_pool.clone(),
150        };
151
152        let journal_cfg = JournalConfig {
153            partition: cfg.log_partition,
154            items_per_section: cfg.log_items_per_section,
155            compression: cfg.log_compression,
156            codec_config: cfg.log_codec_config,
157            buffer_pool: cfg.buffer_pool,
158            write_buffer: cfg.log_write_buffer,
159        };
160
161        let mut journal = Journal::new(context, mmr_cfg, journal_cfg, Operation::is_commit).await?;
162        if journal.size() == 0 {
163            warn!("no operations found in log, creating initial commit");
164            journal.append(Operation::Commit(None)).await?;
165            journal.sync().await?;
166        }
167
168        let last_commit_loc = journal
169            .size()
170            .checked_sub(1)
171            .expect("at least one commit should exist");
172
173        Ok(Self {
174            journal,
175            last_commit_loc,
176        })
177    }
178
179    /// Return the root of the db.
180    pub const fn root(&self) -> H::Digest {
181        self.journal.root()
182    }
183
184    /// Generate and return:
185    ///  1. a proof of all operations applied to the db in the range starting at (and including)
186    ///     location `start_loc`, and ending at the first of either:
187    ///     - the last operation performed, or
188    ///     - the operation `max_ops` from the start.
189    ///  2. the operations corresponding to the leaves in this range.
190    pub async fn proof(
191        &self,
192        start_loc: Location,
193        max_ops: NonZeroU64,
194    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
195        self.historical_proof(self.op_count(), start_loc, max_ops)
196            .await
197    }
198
199    /// Analogous to proof, but with respect to the state of the MMR when it had `op_count`
200    /// operations.
201    ///
202    /// # Errors
203    ///
204    /// - Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
205    ///   [crate::mmr::MAX_LOCATION].
206    /// - Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count` or `op_count` >
207    ///   number of operations.
208    pub async fn historical_proof(
209        &self,
210        op_count: Location,
211        start_loc: Location,
212        max_ops: NonZeroU64,
213    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
214        Ok(self
215            .journal
216            .historical_proof(op_count, start_loc, max_ops)
217            .await?)
218    }
219
220    /// Commit any pending operations to the database, ensuring their durability upon return from
221    /// this function. Caller can associate an arbitrary `metadata` value with the commit. Returns
222    /// the `(start_loc, end_loc]` location range of committed operations. The end of the returned
223    /// range includes the commit operation itself, and hence will always be equal to `op_count`.
224    ///
225    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
226    /// recover the database on restart.
227    pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
228        let start_loc = self.last_commit_loc + 1;
229        self.last_commit_loc = self.journal.append(Operation::Commit(metadata)).await?;
230        self.journal.commit().await?;
231        debug!(size = ?self.op_count(), "committed db");
232
233        Ok(start_loc..self.op_count())
234    }
235
236    /// Prune historical operations prior to `loc`. This does not affect the db's root.
237    ///
238    /// # Errors
239    ///
240    /// - Returns [Error::PruneBeyondMinRequired] if `loc` > last commit point.
241    /// - Returns [crate::mmr::Error::LocationOverflow] if `loc` > [crate::mmr::MAX_LOCATION]
242    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
243        if loc > self.last_commit_loc {
244            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
245        }
246        self.journal.prune(loc).await?;
247
248        Ok(())
249    }
250
251    /// Sync all database state to disk. While this isn't necessary to ensure durability of
252    /// committed operations, periodic invocation may reduce memory usage and the time required to
253    /// recover the database on restart.
254    pub async fn sync(&mut self) -> Result<(), Error> {
255        self.journal.sync().await.map_err(Into::into)
256    }
257
258    /// Close the db. Operations that have not been committed will be lost.
259    pub async fn close(self) -> Result<(), Error> {
260        Ok(self.journal.close().await?)
261    }
262
263    /// Destroy the db, removing all data from disk.
264    pub async fn destroy(self) -> Result<(), Error> {
265        Ok(self.journal.destroy().await?)
266    }
267
268    #[cfg(any(test, feature = "fuzzing"))]
269    /// Simulate failure by consuming the db but without syncing / closing the various structures.
270    pub async fn simulate_failure(mut self, sync_log: bool, sync_mmr: bool) -> Result<(), Error> {
271        if sync_log {
272            self.journal.journal.sync().await.map_err(Error::Journal)?;
273        }
274        if sync_mmr {
275            self.journal.mmr.sync().await.map_err(Error::Mmr)?;
276        }
277
278        Ok(())
279    }
280
281    #[cfg(test)]
282    /// Simulate pruning failure by consuming the db and abandoning pruning operation mid-flight.
283    pub(super) async fn simulate_prune_failure(mut self, loc: Location) -> Result<(), Error> {
284        if loc > self.last_commit_loc {
285            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
286        }
287        // Perform the same steps as pruning except "crash" right after the log is pruned.
288        self.journal.mmr.sync().await.map_err(Error::Mmr)?;
289        assert!(
290            self.journal
291                .journal
292                .prune(*loc)
293                .await
294                .map_err(Error::Journal)?,
295            "nothing was pruned, so could not simulate failure"
296        );
297
298        // "fail" before mmr is pruned.
299        Ok(())
300    }
301
302    /// Convert this database into its dirty counterpart for batched updates.
303    pub fn into_dirty(self) -> Keyless<E, V, H, Dirty> {
304        Keyless {
305            journal: self.journal.into_dirty(),
306            last_commit_loc: self.last_commit_loc,
307        }
308    }
309}
310
311impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> Keyless<E, V, H, Dirty> {
312    /// Merkleize the database and compute the root digest.
313    pub fn merkleize(self) -> Keyless<E, V, H, Clean<H::Digest>> {
314        Keyless {
315            journal: self.journal.merkleize(),
316            last_commit_loc: self.last_commit_loc,
317        }
318    }
319}
320
321impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, S: State<DigestOf<H>>>
322    crate::qmdb::store::LogStore for Keyless<E, V, H, S>
323{
324    type Value = V;
325
326    fn op_count(&self) -> Location {
327        self.op_count()
328    }
329
330    // All unpruned operations are active in a keyless store.
331    fn inactivity_floor_loc(&self) -> Location {
332        self.journal.pruning_boundary()
333    }
334
335    fn is_empty(&self) -> bool {
336        self.op_count() == 0
337    }
338
339    async fn get_metadata(&self) -> Result<Option<V>, Error> {
340        self.get_metadata().await
341    }
342}
343
344impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> crate::qmdb::store::CleanStore
345    for Keyless<E, V, H, Clean<H::Digest>>
346{
347    type Digest = H::Digest;
348    type Operation = Operation<V>;
349    type Dirty = Keyless<E, V, H, Dirty>;
350
351    fn root(&self) -> Self::Digest {
352        self.root()
353    }
354
355    async fn proof(
356        &self,
357        start_loc: Location,
358        max_ops: NonZeroU64,
359    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
360        self.proof(start_loc, max_ops).await
361    }
362
363    async fn historical_proof(
364        &self,
365        historical_size: Location,
366        start_loc: Location,
367        max_ops: NonZeroU64,
368    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
369        self.historical_proof(historical_size, start_loc, max_ops)
370            .await
371    }
372
373    fn into_dirty(self) -> Self::Dirty {
374        self.into_dirty()
375    }
376}
377
378impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> crate::qmdb::store::DirtyStore
379    for Keyless<E, V, H, Dirty>
380{
381    type Digest = H::Digest;
382    type Operation = Operation<V>;
383    type Clean = Keyless<E, V, H, Clean<H::Digest>>;
384
385    async fn merkleize(self) -> Result<Self::Clean, Error> {
386        Ok(self.merkleize())
387    }
388}
389
390#[cfg(test)]
391mod test {
392    use super::*;
393    use crate::{mmr::StandardHasher as Standard, qmdb::verify_proof};
394    use commonware_cryptography::Sha256;
395    use commonware_macros::test_traced;
396    use commonware_runtime::{deterministic, Runner as _};
397    use commonware_utils::{NZUsize, NZU64};
398    use rand::Rng;
399
400    // Use some weird sizes here to test boundary conditions.
401    const PAGE_SIZE: usize = 101;
402    const PAGE_CACHE_SIZE: usize = 11;
403
404    fn db_config(suffix: &str) -> Config<(commonware_codec::RangeCfg<usize>, ())> {
405        Config {
406            mmr_journal_partition: format!("journal_{suffix}"),
407            mmr_metadata_partition: format!("metadata_{suffix}"),
408            mmr_items_per_blob: NZU64!(11),
409            mmr_write_buffer: NZUsize!(1024),
410            log_partition: format!("log_journal_{suffix}"),
411            log_write_buffer: NZUsize!(1024),
412            log_compression: None,
413            log_codec_config: ((0..=10000).into(), ()),
414            log_items_per_section: NZU64!(7),
415            thread_pool: None,
416            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
417        }
418    }
419
420    /// A type alias for the concrete [Keyless] type used in these unit tests.
421    type Db = Keyless<deterministic::Context, Vec<u8>, Sha256, Clean<<Sha256 as Hasher>::Digest>>;
422
423    /// Return a [Keyless] database initialized with a fixed config.
424    async fn open_db(context: deterministic::Context) -> Db {
425        Db::init(context, db_config("partition")).await.unwrap()
426    }
427
428    #[test_traced("INFO")]
429    pub fn test_keyless_db_empty() {
430        let executor = deterministic::Runner::default();
431        executor.start(|context| async move {
432            let mut db = open_db(context.clone()).await;
433            assert_eq!(db.op_count(), 1); // initial commit should exist
434            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));
435
436            assert_eq!(db.get_metadata().await.unwrap(), None);
437            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));
438
439            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
440            let v1 = vec![1u8; 8];
441            let root = db.root();
442            db.append(v1).await.unwrap();
443            db.close().await.unwrap();
444            let mut db = open_db(context.clone()).await;
445            assert_eq!(db.root(), root);
446            assert_eq!(db.op_count(), 1);
447            assert_eq!(db.get_metadata().await.unwrap(), None);
448
449            // Test calling commit on an empty db which should make it (durably) non-empty.
450            let metadata = vec![3u8; 10];
451            db.commit(Some(metadata.clone())).await.unwrap();
452            assert_eq!(db.op_count(), 2); // 2 commit ops
453            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
454            assert_eq!(
455                db.get(Location::new_unchecked(1)).await.unwrap(),
456                Some(metadata.clone())
457            ); // the commit op
458            let root = db.root();
459
460            // Commit op should remain after reopen even without clean shutdown.
461            let db = open_db(context.clone()).await;
462            assert_eq!(db.op_count(), 2); // commit op should remain after re-open.
463            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
464            assert_eq!(db.root(), root);
465            assert_eq!(db.last_commit_loc(), Location::new_unchecked(1));
466
467            db.destroy().await.unwrap();
468        });
469    }
470
471    #[test_traced("WARN")]
472    pub fn test_keyless_db_build_basic() {
473        let executor = deterministic::Runner::default();
474        executor.start(|context| async move {
475            // Build a db with 2 values and make sure we can get them back.
476            let mut db = open_db(context.clone()).await;
477
478            let v1 = vec![1u8; 8];
479            let v2 = vec![2u8; 20];
480
481            let loc1 = db.append(v1.clone()).await.unwrap();
482            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
483
484            let loc2 = db.append(v2.clone()).await.unwrap();
485            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);
486
487            // Make sure closing/reopening gets us back to the same state.
488            db.commit(None).await.unwrap();
489            assert_eq!(db.op_count(), 4); // 2 appends, 1 commit + 1 initial commit
490            assert_eq!(db.get_metadata().await.unwrap(), None);
491            assert_eq!(db.get(Location::new_unchecked(3)).await.unwrap(), None); // the commit op
492            let root = db.root();
493            db.close().await.unwrap();
494            let mut db = open_db(context.clone()).await;
495            assert_eq!(db.op_count(), 4);
496            assert_eq!(db.root(), root);
497
498            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
499            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);
500
501            db.append(v2).await.unwrap();
502            db.append(v1).await.unwrap();
503
504            // Make sure uncommitted items get rolled back.
505            db.close().await.unwrap();
506            let db = open_db(context.clone()).await;
507            assert_eq!(db.op_count(), 4);
508            assert_eq!(db.root(), root);
509
510            // Make sure commit operation remains after close/reopen.
511            db.close().await.unwrap();
512            let db = open_db(context.clone()).await;
513            assert_eq!(db.op_count(), 4);
514            assert_eq!(db.root(), root);
515
516            db.destroy().await.unwrap();
517        });
518    }
519
520    // Helper function to append random elements to a database.
521    async fn append_elements<T: Rng>(db: &mut Db, rng: &mut T, num_elements: usize) {
522        for _ in 0..num_elements {
523            let value = vec![(rng.next_u32() % 255) as u8, (rng.next_u32() % 255) as u8];
524            db.append(value).await.unwrap();
525        }
526    }
527
528    #[test_traced("WARN")]
529    pub fn test_keyless_db_recovery() {
530        let executor = deterministic::Runner::default();
531        const ELEMENTS: usize = 1000;
532        executor.start(|mut context| async move {
533            let mut db = open_db(context.clone()).await;
534            let root = db.root();
535
536            append_elements(&mut db, &mut context, ELEMENTS).await;
537
538            // Simulate a failure before committing.
539            db.simulate_failure(false, false).await.unwrap();
540            // Should rollback to the previous root.
541            let mut db = open_db(context.clone()).await;
542            assert_eq!(root, db.root());
543
544            // Re-apply the updates and commit them this time.
545            append_elements(&mut db, &mut context, ELEMENTS).await;
546            db.commit(None).await.unwrap();
547            let root = db.root();
548
549            // Append more values.
550            append_elements(&mut db, &mut context, ELEMENTS).await;
551
552            // Simulate a failure.
553            db.simulate_failure(false, false).await.unwrap();
554            // Should rollback to the previous root.
555            let mut db = open_db(context.clone()).await;
556            assert_eq!(root, db.root());
557
558            // Re-apply the updates.
559            append_elements(&mut db, &mut context, ELEMENTS).await;
560            // Simulate a failure after syncing log but not MMR.
561            db.simulate_failure(true, false).await.unwrap();
562            // Should rollback to the previous root.
563            let mut db = open_db(context.clone()).await;
564            assert_eq!(root, db.root());
565
566            // Re-apply the updates.
567            append_elements(&mut db, &mut context, ELEMENTS).await;
568            // Simulate a failure after syncing MMR but not log.
569            db.simulate_failure(false, true).await.unwrap();
570            // Should rollback to the previous root.
571            let mut db = open_db(context.clone()).await;
572            assert_eq!(root, db.root());
573
574            // Re-apply the updates and commit them this time.
575            append_elements(&mut db, &mut context, ELEMENTS).await;
576            db.commit(None).await.unwrap();
577            let root = db.root();
578
579            // Make sure we can close/reopen and get back to the same state.
580            db.close().await.unwrap();
581            let db = open_db(context.clone()).await;
582            assert_eq!(db.op_count(), 2 * ELEMENTS as u64 + 3);
583            assert_eq!(db.root(), root);
584
585            db.destroy().await.unwrap();
586        });
587    }
588
589    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
590    /// empty DB on re-open.
591    #[test_traced("WARN")]
592    fn test_keyless_db_non_empty_db_recovery() {
593        let executor = deterministic::Runner::default();
594        executor.start(|mut context| async move {
595            let mut db = open_db(context.clone()).await;
596
597            // Append many values then commit.
598            const ELEMENTS: usize = 200;
599            append_elements(&mut db, &mut context, ELEMENTS).await;
600            db.commit(None).await.unwrap();
601            let root = db.root();
602            let op_count = db.op_count();
603
604            // Reopen DB without clean shutdown and make sure the state is the same.
605            let db = open_db(context.clone()).await;
606            assert_eq!(db.op_count(), op_count);
607            assert_eq!(db.root(), root);
608            assert_eq!(db.last_commit_loc(), op_count - 1);
609            db.close().await.unwrap();
610
611            // Insert many operations without commit, then simulate various types of failures.
612            async fn recover_from_failure(
613                mut context: deterministic::Context,
614                root: <Sha256 as Hasher>::Digest,
615                op_count: Location,
616            ) {
617                let mut db = open_db(context.clone()).await;
618
619                // Append operations and simulate failure.
620                append_elements(&mut db, &mut context, ELEMENTS).await;
621                db.simulate_failure(false, false).await.unwrap();
622                let mut db = open_db(context.clone()).await;
623                assert_eq!(db.op_count(), op_count);
624                assert_eq!(db.root(), root);
625
626                // Append operations and simulate failure after syncing log but not MMR.
627                append_elements(&mut db, &mut context, ELEMENTS).await;
628                db.simulate_failure(true, false).await.unwrap();
629                let mut db = open_db(context.clone()).await;
630                assert_eq!(db.op_count(), op_count);
631                assert_eq!(db.root(), root);
632
633                // Append operations and simulate failure after syncing MMR but not log.
634                append_elements(&mut db, &mut context, ELEMENTS).await;
635                db.simulate_failure(false, true).await.unwrap();
636                let db = open_db(context.clone()).await;
637                assert_eq!(db.op_count(), op_count);
638                assert_eq!(db.root(), root);
639            }
640
641            recover_from_failure(context.clone(), root, op_count).await;
642
643            // Simulate a failure during pruning and ensure we recover.
644            let db = open_db(context.clone()).await;
645            let last_commit_loc = db.last_commit_loc();
646            db.simulate_prune_failure(last_commit_loc).await.unwrap();
647            let db = open_db(context.clone()).await;
648            assert_eq!(db.op_count(), op_count);
649            assert_eq!(db.root(), root);
650            db.close().await.unwrap();
651
652            // Repeat recover_from_failure tests after successfully pruning to the last commit.
653            let mut db = open_db(context.clone()).await;
654            db.prune(db.last_commit_loc()).await.unwrap();
655            assert_eq!(db.op_count(), op_count);
656            assert_eq!(db.root(), root);
657            db.close().await.unwrap();
658
659            recover_from_failure(context.clone(), root, op_count).await;
660
661            // Apply the ops one last time but fully commit them this time, then clean up.
662            let mut db = open_db(context.clone()).await;
663            append_elements(&mut db, &mut context, ELEMENTS).await;
664            db.commit(None).await.unwrap();
665            let db = open_db(context.clone()).await;
666            assert!(db.op_count() > op_count);
667            assert_ne!(db.root(), root);
668            assert_eq!(db.last_commit_loc(), db.op_count() - 1);
669
670            db.destroy().await.unwrap();
671        });
672    }
673
674    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
675    /// DB on re-open.
676    #[test_traced("WARN")]
677    fn test_keyless_db_empty_db_recovery() {
678        const ELEMENTS: u64 = 1000;
679        let executor = deterministic::Runner::default();
680        executor.start(|context| async move {
681            let db = open_db(context.clone()).await;
682            let root = db.root();
683
684            // Reopen DB without clean shutdown and make sure the state is the same.
685            let mut db = open_db(context.clone()).await;
686            assert_eq!(db.op_count(), 1); // initial commit should exist
687            assert_eq!(db.root(), root);
688
689            async fn apply_ops(db: &mut Db) {
690                for i in 0..ELEMENTS {
691                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
692                    db.append(v).await.unwrap();
693                }
694            }
695
696            // Simulate various failure types after inserting operations without a commit.
697            apply_ops(&mut db).await;
698            db.simulate_failure(false, false).await.unwrap();
699            let mut db = open_db(context.clone()).await;
700            assert_eq!(db.op_count(), 1); // initial commit should exist
701            assert_eq!(db.root(), root);
702
703            apply_ops(&mut db).await;
704            db.simulate_failure(true, false).await.unwrap();
705            let mut db = open_db(context.clone()).await;
706            assert_eq!(db.op_count(), 1); // initial commit should exist
707            assert_eq!(db.root(), root);
708
709            apply_ops(&mut db).await;
710            db.simulate_failure(false, false).await.unwrap();
711            let mut db = open_db(context.clone()).await;
712            assert_eq!(db.op_count(), 1); // initial commit should exist
713            assert_eq!(db.root(), root);
714
715            apply_ops(&mut db).await;
716            db.simulate_failure(false, true).await.unwrap();
717            let mut db = open_db(context.clone()).await;
718            assert_eq!(db.op_count(), 1); // initial commit should exist
719            assert_eq!(db.root(), root);
720
721            apply_ops(&mut db).await;
722            db.simulate_failure(true, false).await.unwrap();
723            let mut db = open_db(context.clone()).await;
724            assert_eq!(db.op_count(), 1); // initial commit should exist
725            assert_eq!(db.root(), root);
726
727            apply_ops(&mut db).await;
728            db.simulate_failure(true, true).await.unwrap();
729            let mut db = open_db(context.clone()).await;
730            assert_eq!(db.op_count(), 1); // initial commit should exist
731            assert_eq!(db.root(), root);
732
733            apply_ops(&mut db).await;
734            db.simulate_failure(false, true).await.unwrap();
735            let mut db = open_db(context.clone()).await;
736            assert_eq!(db.op_count(), 1); // initial commit should exist
737            assert_eq!(db.root(), root);
738
739            // One last check that re-open without proper shutdown still recovers the correct state.
740            apply_ops(&mut db).await;
741            apply_ops(&mut db).await;
742            apply_ops(&mut db).await;
743            let mut db = open_db(context.clone()).await;
744            assert_eq!(db.op_count(), 1); // initial commit should exist
745            assert_eq!(db.root(), root);
746            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));
747
748            // Apply the ops one last time but fully commit them this time, then clean up.
749            apply_ops(&mut db).await;
750            db.commit(None).await.unwrap();
751            let db = open_db(context.clone()).await;
752            assert!(db.op_count() > 1);
753            assert_ne!(db.root(), root);
754
755            db.destroy().await.unwrap();
756        });
757    }
758
759    #[test_traced("INFO")]
760    pub fn test_keyless_db_proof_generation_and_verification() {
761        let executor = deterministic::Runner::default();
762        executor.start(|context| async move {
763            let mut hasher = Standard::<Sha256>::new();
764            let mut db = open_db(context.clone()).await;
765
766            // Build a db with some values
767            const ELEMENTS: u64 = 100;
768            let mut values = Vec::new();
769            for i in 0u64..ELEMENTS {
770                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
771                values.push(v.clone());
772                db.append(v).await.unwrap();
773            }
774            db.commit(None).await.unwrap();
775
776            // Test that historical proof fails with op_count > number of operations
777            assert!(matches!(
778                db.historical_proof(db.op_count() + 1, Location::new_unchecked(5), NZU64!(10))
779                    .await,
780                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
781            ));
782
783            let root = db.root();
784
785            // Test proof generation for various ranges
786            let test_cases = vec![
787                (0, 10),           // First 10 operations
788                (10, 5),           // Middle range
789                (50, 20),          // Larger range
790                (90, 15),          // Range that extends beyond end (should be limited)
791                (0, 1),            // Single operation
792                (ELEMENTS - 1, 1), // Last append operation
793                (ELEMENTS, 1),     // The commit operation
794            ];
795
796            for (start_loc, max_ops) in test_cases {
797                let (proof, ops) = db.proof(Location::new_unchecked(start_loc), NZU64!(max_ops)).await.unwrap();
798
799                // Verify the proof
800                assert!(
801                    verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &root),
802                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
803                );
804
805                // Check that we got the expected number of operations
806                let expected_ops = std::cmp::min(max_ops, *db.op_count() - start_loc);
807                assert_eq!(
808                    ops.len() as u64,
809                    expected_ops,
810                    "Expected {expected_ops} operations, got {}",
811                    ops.len(),
812                );
813
814                // Verify operation types
815                for (i, op) in ops.iter().enumerate() {
816                    let loc = start_loc + i as u64;
817                    if loc == 0 {
818                         assert!(
819                            matches!(op, Operation::Commit(None)),
820                            "Expected Initial Commit operation at location {loc}, got {op:?}",
821                        );
822                    } else if loc <= ELEMENTS {
823                        // Should be an Append operation
824                        assert!(
825                            matches!(op, Operation::Append(_)),
826                            "Expected Append operation at location {loc}, got {op:?}",
827                        );
828                    } else if loc == ELEMENTS + 1 {
829                        // Should be a Commit operation
830                        assert!(
831                            matches!(op, Operation::Commit(_)),
832                            "Expected Commit operation at location {loc}, got {op:?}",
833                        );
834                    }
835                }
836
837                // Verify that proof fails with wrong root
838                let wrong_root = Sha256::hash(&[0xFF; 32]);
839                assert!(
840                    !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &wrong_root),
841                    "Proof should fail with wrong root"
842                );
843
844                // Verify that proof fails with wrong start location
845                if start_loc > 0 {
846                    assert!(
847                        !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc - 1), &ops, &root),
848                        "Proof should fail with wrong start location"
849                    );
850                }
851            }
852
853            db.destroy().await.unwrap();
854        });
855    }
856
857    #[test_traced("INFO")]
858    pub fn test_keyless_db_proof_with_pruning() {
859        let executor = deterministic::Runner::default();
860        executor.start(|context| async move {
861            let mut hasher = Standard::<Sha256>::new();
862            let mut db = open_db(context.clone()).await;
863
864            // Build a db with some values
865            const ELEMENTS: u64 = 100;
866            let mut values = Vec::new();
867            for i in 0u64..ELEMENTS {
868                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
869                values.push(v.clone());
870                db.append(v).await.unwrap();
871            }
872            db.commit(None).await.unwrap();
873
874            // Add more elements and commit again
875            for i in ELEMENTS..ELEMENTS * 2 {
876                let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
877                values.push(v.clone());
878                db.append(v).await.unwrap();
879            }
880            db.commit(None).await.unwrap();
881            let root = db.root();
882
883            println!("last commit loc: {}", db.last_commit_loc());
884
885            // Prune the first 30 operations
886            const PRUNE_LOC: u64 = 30;
887            db.prune(Location::new_unchecked(PRUNE_LOC)).await.unwrap();
888
889            // Verify pruning worked
890            let oldest_retained = db.oldest_retained_loc();
891
892            // Root should remain the same after pruning
893            assert_eq!(
894                db.root(),
895                root,
896                "Root should not change after pruning"
897            );
898
899            db.close().await.unwrap();
900            let mut db = open_db(context.clone()).await;
901            assert_eq!(db.root(), root);
902            assert_eq!(db.op_count(), 2 * ELEMENTS + 3);
903            assert!(db.oldest_retained_loc() <= PRUNE_LOC);
904
905            // Test that we can't get pruned values
906            for i in 0..*oldest_retained {
907                let result = db.get(Location::new_unchecked(i)).await;
908                // Should either return None (for commit ops) or encounter pruned data
909                match result {
910                    Ok(None) => {} // Commit operation or pruned
911                    Ok(Some(_)) => {
912                        panic!("Should not be able to get pruned value at location {i}")
913                    }
914                    Err(_) => {} // Expected error for pruned data
915                }
916            }
917
918            // Test proof generation after pruning - should work for non-pruned ranges
919            let test_cases = vec![
920                (oldest_retained, 10), // Starting from oldest retained
921                (Location::new_unchecked(50), 20),                       // Middle range (if not pruned)
922                (Location::new_unchecked(150), 10),                      // Later range
923                (Location::new_unchecked(190), 15),                      // Near the end
924            ];
925
926            for (start_loc, max_ops) in test_cases {
927                // Skip if start_loc is before oldest retained
928                if start_loc < oldest_retained {
929                    continue;
930                }
931
932                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
933
934                // Verify the proof still works
935                assert!(
936                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
937                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
938                );
939
940                // Check that we got operations
941                let expected_ops = std::cmp::min(max_ops, *db.op_count() - *start_loc);
942                assert_eq!(
943                    ops.len() as u64,
944                    expected_ops,
945                    "Expected {expected_ops} operations, got {}",
946                    ops.len(),
947                );
948            }
949
950            // Test pruning more aggressively
951            const AGGRESSIVE_PRUNE: Location = Location::new_unchecked(150);
952            db.prune(AGGRESSIVE_PRUNE).await.unwrap();
953
954            let new_oldest = db.oldest_retained_loc();
955            assert!(new_oldest <= AGGRESSIVE_PRUNE);
956
957            // Can still generate proofs for the remaining data
958            let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
959            assert!(
960                verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
961                "Proof should still verify after aggressive pruning"
962            );
963
964            // Test edge case: prune everything except the last few operations
965            let almost_all = db.op_count() - 5;
966            db.prune(almost_all).await.unwrap();
967
968            let final_oldest = db.oldest_retained_loc();
969
970            // Should still be able to prove the remaining operations
971            if final_oldest < db.op_count() {
972                let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
973                assert!(
974                    verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
975                    "Should be able to prove remaining operations after extensive pruning"
976                );
977            }
978
979            db.destroy().await.unwrap();
980        });
981    }
982
983    #[test_traced("WARN")]
984    fn test_keyless_db_replay_with_trailing_appends() {
985        let executor = deterministic::Runner::default();
986        executor.start(|context| async move {
987            // Create initial database with committed data
988            let mut db = open_db(context.clone()).await;
989
990            // Add some initial operations and commit
991            for i in 0..10 {
992                let v = vec![i as u8; 10];
993                db.append(v).await.unwrap();
994            }
995            db.commit(None).await.unwrap();
996            let committed_root = db.root();
997            let committed_size = db.op_count();
998
999            // Add exactly one more append (uncommitted)
1000            let uncommitted_value = vec![99u8; 20];
1001            db.append(uncommitted_value.clone()).await.unwrap();
1002
1003            // Sync only the log (not MMR)
1004            db.simulate_failure(true, false).await.unwrap();
1005
1006            // Reopen database
1007            let mut db = open_db(context.clone()).await;
1008
1009            // Verify correct recovery
1010            assert_eq!(
1011                db.op_count(),
1012                committed_size,
1013                "Should rewind to last commit"
1014            );
1015            assert_eq!(db.root(), committed_root, "Root should match last commit");
1016            assert_eq!(
1017                db.last_commit_loc(),
1018                committed_size - 1,
1019                "Last commit location should be correct"
1020            );
1021
1022            // Verify the uncommitted append was properly discarded
1023            // We should be able to append new data without issues
1024            let new_value = vec![77u8; 15];
1025            let loc = db.append(new_value.clone()).await.unwrap();
1026            assert_eq!(
1027                loc, committed_size,
1028                "New append should get the expected location"
1029            );
1030
1031            // Verify we can read the new value
1032            assert_eq!(db.get(loc).await.unwrap(), Some(new_value));
1033
1034            // Test with multiple trailing appends to ensure robustness
1035            db.commit(None).await.unwrap();
1036            let new_committed_root = db.root();
1037            let new_committed_size = db.op_count();
1038
1039            // Add multiple uncommitted appends
1040            for i in 0..5 {
1041                let v = vec![(200 + i) as u8; 10];
1042                db.append(v).await.unwrap();
1043            }
1044
1045            // Simulate the same partial failure scenario
1046            db.simulate_failure(true, false).await.unwrap();
1047
1048            // Reopen and verify correct recovery
1049            let db = open_db(context.clone()).await;
1050            assert_eq!(
1051                db.op_count(),
1052                new_committed_size,
1053                "Should rewind to last commit with multiple trailing appends"
1054            );
1055            assert_eq!(
1056                db.root(),
1057                new_committed_root,
1058                "Root should match last commit after multiple appends"
1059            );
1060            assert_eq!(
1061                db.last_commit_loc(),
1062                new_committed_size - 1,
1063                "Last commit location should be correct after multiple appends"
1064            );
1065
1066            db.destroy().await.unwrap();
1067        });
1068    }
1069
1070    #[test_traced("INFO")]
1071    pub fn test_keyless_db_get_out_of_bounds() {
1072        let executor = deterministic::Runner::default();
1073        executor.start(|context| async move {
1074            let mut db = open_db(context.clone()).await;
1075
1076            // Test getting from empty database
1077            let result = db.get(Location::new_unchecked(0)).await.unwrap();
1078            assert!(result.is_none());
1079
1080            // Add some values
1081            let v1 = vec![1u8; 8];
1082            let v2 = vec![2u8; 8];
1083            db.append(v1.clone()).await.unwrap();
1084            db.append(v2.clone()).await.unwrap();
1085            db.commit(None).await.unwrap();
1086
1087            // Test getting valid locations - should succeed
1088            assert_eq!(db.get(Location::new_unchecked(1)).await.unwrap().unwrap(), v1);
1089            assert_eq!(db.get(Location::new_unchecked(2)).await.unwrap().unwrap(), v2);
1090
1091            // Test getting out of bounds location
1092            let result = db.get(Location::new_unchecked(3)).await.unwrap();
1093            assert!(result.is_none());
1094
1095            // Test getting out of bounds location
1096            let result = db.get(Location::new_unchecked(4)).await;
1097            assert!(
1098                matches!(result, Err(Error::LocationOutOfBounds(loc, size)) if loc == Location::new_unchecked(4) && size == Location::new_unchecked(4))
1099            );
1100
1101            db.destroy().await.unwrap();
1102        });
1103    }
1104
1105    #[test_traced("INFO")]
1106    pub fn test_keyless_db_prune_beyond_commit() {
1107        let executor = deterministic::Runner::default();
1108        executor.start(|context| async move {
1109            let mut db = open_db(context.clone()).await;
1110
1111            // Test pruning empty database (no commits)
1112            let result = db.prune(Location::new_unchecked(1)).await;
1113            assert!(
1114                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1115                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
1116            );
1117
1118            // Add values and commit
1119            let v1 = vec![1u8; 8];
1120            let v2 = vec![2u8; 8];
1121            let v3 = vec![3u8; 8];
1122            db.append(v1.clone()).await.unwrap();
1123            db.append(v2.clone()).await.unwrap();
1124            db.commit(None).await.unwrap();
1125            db.append(v3.clone()).await.unwrap();
1126
1127            // op_count is 4 (v1, v2, commit, v3) + 1, last_commit_loc is 3
1128            let last_commit = db.last_commit_loc();
1129            assert_eq!(last_commit, Location::new_unchecked(3));
1130
1131            // Test valid prune (at last commit)
1132            assert!(db.prune(last_commit).await.is_ok());
1133
1134            // Add more and commit again
1135            db.commit(None).await.unwrap();
1136            let new_last_commit = db.last_commit_loc();
1137
1138            // Test pruning beyond last commit
1139            let beyond = Location::new_unchecked(*new_last_commit + 1);
1140            let result = db.prune(beyond).await;
1141            assert!(
1142                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1143                    if prune_loc == beyond && commit_loc == new_last_commit)
1144            );
1145
1146            db.destroy().await.unwrap();
1147        });
1148    }
1149}