commonware_storage/qmdb/store/
mod.rs

//! A mutable key-value database that supports variable-sized values, but without authentication.
//!
//! # Terminology
//!
//! A _key_ in an unauthenticated database either has a _value_ or it doesn't. The _update_
//! operation assigns a specific value to a key, whether or not the key previously had a value.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an update operation, and (3) it is the most recent operation for that key.
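//!
//! For example, after `update(k, v1)` followed by `update(k, v2)`, only the second update is
//! active: it is the most recent operation for the active key `k`.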
//!
//! # Lifecycle
//!
//! 1. **Initialization**: Create with [Store::init] using a [Config]
//! 2. **Insertion**: Use [Store::update] to assign a value to a given key
//! 3. **Deletion**: Use [Store::delete] to remove a key's value
//! 4. **Persistence**: Call [Store::commit] to make changes durable
//! 5. **Queries**: Use [Store::get] to retrieve current values
//! 6. **Cleanup**: Call [Store::close] to shut down gracefully or [Store::destroy] to remove all
//!    data
//!
//! # Pruning
//!
//! The database maintains a location before which all operations are known to be inactive, called
//! the _inactivity floor_. Operations before this point can be cleaned from storage by calling
//! [Store::prune].
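//!
//! For example, after a commit one might reclaim space with
//! `store.prune(store.inactivity_floor_loc()).await?`. The log prunes at section boundaries, so
//! slightly fewer operations than requested may actually be removed.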
//!
//! # Example
//!
//! ```rust
//! use commonware_storage::{
//!     qmdb::store::{Config, Store},
//!     translator::TwoCap,
//! };
//! use commonware_utils::{NZUsize, NZU64};
//! use commonware_cryptography::{blake3::Digest, Digest as _};
//! use commonware_math::algebra::Random;
//! use commonware_runtime::{buffer::PoolRef, deterministic::Runner, Metrics, Runner as _};
//!
//! const PAGE_SIZE: usize = 77;
//! const PAGE_CACHE_SIZE: usize = 9;
//!
//! let executor = Runner::default();
//! executor.start(|mut ctx| async move {
//!     let config = Config {
//!         log_partition: "test_partition".to_string(),
//!         log_write_buffer: NZUsize!(64 * 1024),
//!         log_compression: None,
//!         log_codec_config: (),
//!         log_items_per_section: NZU64!(4),
//!         translator: TwoCap,
//!         buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
//!     };
//!     let mut store =
//!         Store::<_, Digest, Digest, TwoCap>::init(ctx.with_label("store"), config)
//!             .await
//!             .unwrap();
//!
//!     // Insert a key-value pair
//!     let k = Digest::random(&mut ctx);
//!     let v = Digest::random(&mut ctx);
//!     store.update(k, v).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = store.get(&k).await.unwrap();
//!     assert_eq!(fetched_value.unwrap(), v);
//!
//!     // Commit the operation to make it persistent
//!     let metadata = Some(Digest::random(&mut ctx));
//!     store.commit(metadata).await.unwrap();
//!
//!     // Delete the key's value
//!     store.delete(k).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = store.get(&k).await.unwrap();
//!     assert!(fetched_value.is_none());
//!
//!     // Commit the operation to make it persistent
//!     store.commit(None).await.unwrap();
//!
//!     // Destroy the store
//!     store.destroy().await.unwrap();
//! });
//! ```

use crate::{
    index::{unordered::Index, Unordered as _},
    journal::contiguous::{
        variable::{Config as JournalConfig, Journal},
        MutableContiguous as _,
    },
    mmr::{Location, Proof},
    qmdb::{
        any::{
            unordered::{variable::Operation, Update},
            VariableValue,
        },
        build_snapshot_from_log, create_key, delete_key,
        operation::{Committable as _, Operation as _},
        update_key, Error, FloorHelper,
    },
    translator::Translator,
};
use commonware_codec::{Codec, Read};
use commonware_cryptography::Digest;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage};
use commonware_utils::Array;
use core::{future::Future, ops::Range};
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::{debug, warn};

mod batch;
#[cfg(test)]
pub use batch::tests as batch_tests;
pub use batch::{Batch, Batchable, Getter};

/// Configuration for initializing a [Store] database.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [`Storage`] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the [Journal].
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of operations to store in each section of the [Journal].
    pub log_items_per_section: NonZeroU64,

    /// The [`Translator`] used by the compressed index.
    pub translator: T,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// A trait for any key-value store based on an append-only log of operations.
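///
/// A minimal sketch (illustrative, not compiled) of a helper written against any [LogStore]:
///
/// ```text
/// // Everything before the inactivity floor may be safely pruned.
/// fn prunable_before<S: LogStore>(s: &S) -> Location {
///     s.inactivity_floor_loc()
/// }
/// ```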
pub trait LogStore {
    type Value: Codec + Clone;

    /// Returns true if there are no active keys in the database.
    fn is_empty(&self) -> bool;

    /// The number of operations that have been applied to this db, including those that have been
    /// pruned and those that are not yet committed.
    fn op_count(&self) -> Location;

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    fn inactivity_floor_loc(&self) -> Location;

    /// Get the metadata associated with the last commit.
    fn get_metadata(&self) -> impl Future<Output = Result<Option<Self::Value>, Error>>;
}

/// A trait for log stores that support pruning.
pub trait LogStorePrunable: LogStore {
    /// Prune historical operations prior to `prune_loc`.
    fn prune(&mut self, prune_loc: Location) -> impl Future<Output = Result<(), Error>>;
}

/// A trait for authenticated stores in a "dirty" state with unmerkleized operations.
pub trait DirtyStore: LogStore {
    /// The digest type used for authentication.
    type Digest: Digest;

    /// The operation type stored in the log.
    type Operation;

    /// The clean state type that this dirty store transitions to.
    type Clean: CleanStore<
        Digest = Self::Digest,
        Operation = Self::Operation,
        Dirty = Self,
        Value = Self::Value,
    >;

    /// Merkleize the store and compute the root digest.
    ///
    /// Consumes this dirty store and returns a clean store with the computed root.
    fn merkleize(self) -> impl Future<Output = Result<Self::Clean, Error>>;
}

/// A trait for authenticated stores in a "clean" state where the MMR root is computed.
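///
/// A clean store can serve its root and proofs; to apply further updates, convert it back with
/// [CleanStore::into_dirty]. A sketch of the cycle (illustrative, not compiled):
///
/// ```text
/// let dirty = clean.into_dirty();       // mutate while in the dirty state
/// let clean = dirty.merkleize().await?; // recompute the root
/// let root = clean.root();
/// ```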
pub trait CleanStore: LogStore {
    /// The digest type used for authentication.
    type Digest: Digest;

    /// The operation type stored in the log.
    type Operation;

    /// The dirty state type that this clean store transitions to.
    type Dirty: DirtyStore<
        Digest = Self::Digest,
        Operation = Self::Operation,
        Clean = Self,
        Value = Self::Value,
    >;

    /// Returns the root digest of the authenticated store.
    fn root(&self) -> Self::Digest;

    /// Generate and return:
    ///  1. a proof of all operations applied to the store in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
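    ///
    /// For example, with 10 operations in the store, `start_loc = 4` and `max_ops = 100` yield a
    /// proof over operations 4 through 9.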
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `start_loc` > [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count`.
    #[allow(clippy::type_complexity)]
    fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> impl Future<Output = Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error>>;

    /// Generate and return, for the state of the store when it had `historical_size` operations:
    ///  1. a proof of all operations applied to the store in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `start_loc` > [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count`.
    #[allow(clippy::type_complexity)]
    fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> impl Future<Output = Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error>>;

    /// Convert this clean store into its dirty counterpart for batched updates.
    fn into_dirty(self) -> Self::Dirty;
}

/// An unauthenticated key-value database based on an append-only [Journal] of operations.
pub struct Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// A log of all [Operation]s that have been applied to the store.
    ///
    /// # Invariants
    ///
    /// - There is always at least one commit operation in the log.
    /// - The log is never pruned beyond the inactivity floor.
    log: Journal<E, Operation<K, V>>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location containing its most recent update.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Update].
    snapshot: Index<T, Location>,

    /// The number of active keys in the store.
    active_keys: usize,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    inactivity_floor_loc: Location,

    /// The number of _steps_ to raise the inactivity floor. Each step involves moving exactly one
    /// active operation to tip.
    steps: u64,

    /// The location of the last commit operation.
    last_commit_loc: Location,
}

/// Type alias for the shared state wrapper used by this Any database variant.
type FloorHelperState<'a, E, K, V, T> =
    FloorHelper<'a, Index<T, Location>, Journal<E, Operation<K, V>>>;

impl<E, K, V, T> Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Initializes a new [`Store`] database with the given configuration.
    ///
    /// ## Rollback
    ///
    /// Any uncommitted operations will be rolled back if the [Store] was previously closed without
    /// committing.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut log = Journal::<E, Operation<K, V>>::init(
            context.with_label("log"),
            JournalConfig {
                partition: cfg.log_partition,
                items_per_section: cfg.log_items_per_section,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Rewind log to remove uncommitted operations.
        if log.rewind_to(|op| op.is_commit()).await? == 0 {
            warn!("Log is empty, initializing new db");
            log.append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                .await?;
        }

        // Sync the log so that any recovery performed above doesn't have to be repeated on the
        // next startup.
        log.sync().await?;

        let last_commit_loc =
            Location::new_unchecked(log.size().checked_sub(1).expect("commit should exist"));
        let op = log.read(*last_commit_loc).await?;
        let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");

        // Build the snapshot.
        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
        let active_keys =
            build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, |_, _| {}).await?;

        Ok(Self {
            log,
            snapshot,
            active_keys,
            inactivity_floor_loc,
            steps: 0,
            last_commit_loc,
        })
    }

    /// Get the value of `key` in the db, or None if it has no value.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
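        // The translator may map multiple keys to the same index entry, so scan each candidate
        // location and confirm the stored key matches before returning its value.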
        for &loc in self.snapshot.get(key) {
            let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
                unreachable!("location ({loc}) does not reference update operation");
            };

            if &k == key {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    const fn as_floor_helper(&mut self) -> FloorHelperState<'_, E, K, V, T> {
        FloorHelper {
            snapshot: &mut self.snapshot,
            log: &mut self.log,
        }
    }

    /// Whether the db currently has no active keys.
    pub const fn is_empty(&self) -> bool {
        self.active_keys == 0
    }

    /// Gets an [Operation] from the log at the given location. Returns [Error::OperationPruned]
    /// if the location precedes the oldest retained location. The location is otherwise assumed
    /// valid.
    async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
        assert!(loc < self.op_count());

        // Get the operation from the log at the specified position.
        // The journal will return ItemPruned if the location is pruned.
        self.log.read(*loc).await.map_err(|e| match e {
            crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
            e => Error::Journal(e),
        })
    }

    /// The number of operations that have been applied to this db, including those that have been
    /// pruned and those that are not yet committed.
    pub const fn op_count(&self) -> Location {
        Location::new_unchecked(self.log.size())
    }

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }

    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let Operation::CommitFloor(metadata, _) = self.log.read(*self.last_commit_loc).await?
        else {
            unreachable!("last commit should be a commit floor operation");
        };

        Ok(metadata)
    }

    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
    /// subject to rollback until the next successful `commit`.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        let new_loc = self.op_count();
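        // If the key already had a value, its previous update becomes inactive and earns one
        // floor-raising step; otherwise the key is newly active.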
        if update_key(&mut self.snapshot, &self.log, &key, new_loc)
            .await?
            .is_some()
        {
            self.steps += 1;
        } else {
            self.active_keys += 1;
        }

        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(())
    }

    /// Creates a new key-value pair in the db. The operation is reflected in the snapshot, but will
    /// be subject to rollback until the next successful `commit`. Returns true if the key was
    /// created, false if it already existed.
    pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
        let new_loc = self.op_count();
        if !create_key(&mut self.snapshot, &self.log, &key, new_loc).await? {
            return Ok(false);
        }

        self.active_keys += 1;
        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(true)
    }

    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
    /// successful `commit`. Returns true if the key was deleted, false if it was already inactive.
    pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
        let r = delete_key(&mut self.snapshot, &self.log, &key).await?;
        if r.is_none() {
            return Ok(false);
        }

        self.log.append(Operation::Delete(key)).await?;
        self.steps += 1;
        self.active_keys -= 1;

        Ok(true)
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule. Returns the
    /// half-open `[start_loc, end_loc)` location range of committed operations. The range includes
    /// the commit operation itself, so `end_loc` will always equal `op_count`.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
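    ///
    /// A sketch of the returned range's invariant (illustrative, not compiled):
    ///
    /// ```text
    /// let range = store.commit(None).await?;
    /// assert_eq!(range.end, store.op_count()); // the range ends with the commit op
    /// ```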
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
        let start_loc = self.last_commit_loc + 1;

        // Raise the inactivity floor by taking `self.steps` steps, plus 1 to account for the
        // previous commit becoming inactive.
        if self.is_empty() {
            self.inactivity_floor_loc = self.op_count();
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            let steps_to_take = self.steps + 1;
            for _ in 0..steps_to_take {
                let loc = self.inactivity_floor_loc;
                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
            }
        }
        self.steps = 0;

        // Apply the commit operation with the new inactivity floor.
        self.last_commit_loc = Location::new_unchecked(
            self.log
                .append(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
                .await?,
        );

        // Commit the log to ensure this commit is durable.
        self.log.commit().await?;

        Ok(start_loc..self.op_count())
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.log.sync().await.map_err(Into::into)
    }

    /// Prune historical operations prior to `prune_loc`. This does not affect the db's current
    /// snapshot.
    pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        if prune_loc > self.inactivity_floor_loc {
            return Err(Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }

        // Prune the log. The log will prune at section boundaries, so the actual oldest retained
        // location may be less than requested.
        if !self.log.prune(*prune_loc).await? {
            return Ok(());
        }

        debug!(
            log_size = ?self.op_count(),
            oldest_retained_loc = ?self.log.oldest_retained_pos(),
            ?prune_loc,
            "pruned inactive ops"
        );

        Ok(())
    }

    /// Close the db. Operations that have not been committed will be lost or rolled back on
    /// restart.
    pub async fn close(self) -> Result<(), Error> {
        self.log.close().await.map_err(Into::into)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        self.log.destroy().await.map_err(Into::into)
    }
}

impl<E, K, V, T> LogStorePrunable for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}

impl<E, K, V, T> crate::store::StorePersistable for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn commit(&mut self) -> Result<(), Error> {
        self.commit(None).await.map(|_| ())
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}

impl<E, K, V, T> LogStore for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }

    fn is_empty(&self) -> bool {
        self.is_empty()
    }
}

impl<E, K, V, T> crate::store::Store for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<E, K, V, T> crate::store::StoreMut for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.update(key, value).await
    }
}

impl<E, K, V, T> crate::store::StoreDeletable for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
        self.delete(key).await
    }
}

impl<E, K, V, T> Batchable for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{qmdb::store::batch_tests, store::StoreMut as _, translator::TwoCap};
    use commonware_cryptography::{
        blake3::{Blake3, Digest},
        Hasher as _,
    };
    use commonware_macros::test_traced;
    use commonware_math::algebra::Random;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;

    /// The type of the store used in tests.
    type TestStore = Store<deterministic::Context, Digest, Vec<u8>, TwoCap>;

    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            translator: TwoCap,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        };
        Store::init(context, cfg).await.unwrap()
    }

    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.log.oldest_retained_pos(), Some(0));
            assert_eq!(db.inactivity_floor_loc(), 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(matches!(
                db.prune(Location::new_unchecked(1)).await,
                Err(Error::PruneBeyondMinRequired(_, _))
            ));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 1);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            let metadata = vec![1, 2, 3];
            let range = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 2);
            assert_eq!(db.op_count(), 2);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits on a
            // non-empty db.
            db.update(Digest::random(&mut context), vec![1, 2, 3])
                .await
                .unwrap();
            for _ in 1..100 {
                db.commit(None).await.unwrap();
                // Distance should equal 3 after the second commit, with inactivity_floor
                // referencing the previous commit operation.
                assert!(db.op_count() - db.inactivity_floor_loc <= 3);
                assert!(db.get_metadata().await.unwrap().is_none());
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the store is empty
            assert_eq!(store.op_count(), 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Attempt to get a key that does not exist
            let result = store.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair
            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.op_count(), 2);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Fetch the value
            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Sync the store to persist the changes
            store.sync().await.unwrap();

            // Re-open the store
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store removed the uncommitted operations
            assert_eq!(store.op_count(), 1);
            assert_eq!(store.inactivity_floor_loc, 0);
            assert!(store.get_metadata().await.unwrap().is_none());

            // Insert a key-value pair
            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.op_count(), 2);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Persist the changes
            let metadata = vec![99, 100];
            let range = store.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(store.get_metadata().await.unwrap(), Some(metadata.clone()));

            // The inactivity floor was raised by 2, but no pruning has been requested yet, so all
            // operations (including the now-inactive ones) remain in the log.
            assert_eq!(store.op_count(), 4);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Re-open the store
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store retained the committed operations
            assert_eq!(store.op_count(), 4);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Fetch the value, ensuring it is still present
            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Insert two new k/v pairs to force pruning of the first section.
            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.op_count(), 6);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Make sure we can still get metadata.
            assert_eq!(store.get_metadata().await.unwrap(), Some(metadata));

            let range = store.commit(None).await.unwrap();
            assert_eq!(range.start, 4);
            assert_eq!(range.end, store.op_count());
            assert_eq!(store.get_metadata().await.unwrap(), None);

            assert_eq!(store.op_count(), 8);
            assert_eq!(store.inactivity_floor_loc, 3);

            // Ensure all keys can be accessed, despite the first section being pruned.
            assert_eq!(store.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            // Ensure upsert works for existing key.
            store.upsert(k1, |v| v.push(7)).await.unwrap();
            assert_eq!(
                store.get(&k1).await.unwrap().unwrap(),
                vec![2, 3, 4, 5, 6, 7]
            );

            // Ensure upsert works for new key.
            let k3 = Digest::random(&mut ctx);
            store.upsert(k3, |v| v.push(8)).await.unwrap();
            assert_eq!(store.get(&k3).await.unwrap().unwrap(), vec![8]);

            // Destroy the store
            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Update the same key many times.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                store.update(k, v.clone()).await.unwrap();
            }

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Re-open the store, prune it, then ensure it replays the log correctly.
            let mut store = create_test_store(ctx.with_label("store")).await;
            store.prune(store.inactivity_floor_loc()).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // 1 initial commit + 100 updates + 100 floor-raising moves + 1 final commit.
            assert_eq!(store.op_count(), UPDATES * 2 + 2);
            // Only the highest `Update` operation is active, plus the commit operation above it.
            let expected_floor = UPDATES * 2;
            assert_eq!(store.inactivity_floor_loc, expected_floor);

            // All blobs prior to the inactivity floor are pruned, so the oldest retained location
            // is the first in the last retained blob.
            assert_eq!(
                store.log.oldest_retained_pos(),
                Some(expected_floor - expected_floor % 7)
            );

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Ensure k2 shares its first 2 bytes with k1 (the test DB uses the `TwoCap` translator).
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Re-open the store to ensure it builds the snapshot for the conflicting
            // keys correctly.
            let store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Insert a key-value pair
            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            store.update(k, v.clone()).await.unwrap();

            // Fetch the value
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete the key
            assert!(store.delete(k).await.unwrap());

            // Ensure the key is no longer present
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(!store.delete(k).await.unwrap());

            // Commit the changes
            store.commit(None).await.unwrap();

            // Re-open the store and ensure the key is still deleted
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the key
            store.update(k, v.clone()).await.unwrap();
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Commit the changes
            store.commit(None).await.unwrap();

            // Re-open the store and ensure the snapshot restores the key, after processing
            // the delete and the subsequent set.
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete a non-existent key (no-op)
            let k_n = Digest::random(&mut ctx);
            store.delete(k_n).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let iter = store.snapshot.get(&k_n);
            assert_eq!(iter.count(), 0);

            store.destroy().await.unwrap();
        });
    }

    /// Tests the pruning example in the module documentation.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            store.update(k_a, v_a.clone()).await.unwrap();
            store.update(k_b, v_b.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 5);
            assert_eq!(store.inactivity_floor_loc, 2);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_a);

            store.update(k_b, v_a.clone()).await.unwrap();
            store.update(k_a, v_c.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 11);
            assert_eq!(store.inactivity_floor_loc, 8);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(store.get(&k_b).await.unwrap().unwrap(), v_a);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store")).await;

            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            drop(db);
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 1);

            // Re-apply the updates and commit them this time.
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            drop(db);
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Re-apply updates for every 3rd key and commit them this time.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 1673);
            assert_eq!(db.snapshot.items(), 1000);

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            drop(db);
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Close and reopen the store to ensure the final commit is preserved.
            db.close().await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Re-delete every 7th key and commit this time.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit(None).await.unwrap();

            assert_eq!(db.op_count(), 1961);
            assert_eq!(db.inactivity_floor_loc, 756);

            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.log.oldest_retained_pos(), Some(756 /* 756 % 7 == 0 */));
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_batch() {
        batch_tests::test_batch(
            |ctx| async move { create_test_store(ctx.with_label("batch")).await },
        );
    }
}