commonware_storage/qmdb/store/db.rs

//! A mutable key-value database that supports variable-sized values but does not provide
//! authentication.
//!
//! # Lifecycle
//!
//! Unlike authenticated stores, which have four potential states, an unauthenticated store has
//! only two:
//!
//! - **Clean**: The store has no uncommitted operations and its key/value state is immutable. Use
//!   `into_dirty` to transform it into a dirty state.
//!
//! - **Dirty**: The store has uncommitted operations and its key/value state is mutable. Use
//!   `commit` to transform it into a clean state.
//!
//! # Example
//!
//! ```rust
//! use commonware_storage::{
//!     qmdb::store::db::{Config, Db},
//!     translator::TwoCap,
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//! use commonware_cryptography::{blake3::Digest, Digest as _};
//! use commonware_math::algebra::Random;
//! use commonware_runtime::{buffer::PoolRef, deterministic::Runner, Metrics, Runner as _};
//!
//! use std::num::NonZeroU16;
//! const PAGE_SIZE: NonZeroU16 = NZU16!(8192);
//! const PAGE_CACHE_SIZE: usize = 100;
//!
//! let executor = Runner::default();
//! executor.start(|mut ctx| async move {
//!     let config = Config {
//!         log_partition: "test_partition".to_string(),
//!         log_write_buffer: NZUsize!(64 * 1024),
//!         log_compression: None,
//!         log_codec_config: (),
//!         log_items_per_section: NZU64!(4),
//!         translator: TwoCap,
//!         buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
//!     };
//!     let db =
//!         Db::<_, Digest, Digest, TwoCap>::init(ctx.with_label("store"), config)
//!             .await
//!             .unwrap();
//!
//!     // Insert a key-value pair
//!     let k = Digest::random(&mut ctx);
//!     let v = Digest::random(&mut ctx);
//!     let mut db = db.into_dirty();
//!     db.update(k, v).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = db.get(&k).await.unwrap();
//!     assert_eq!(fetched_value.unwrap(), v);
//!
//!     // Commit the operation to make it persistent
//!     let metadata = Some(Digest::random(&mut ctx));
//!     let (db, _) = db.commit(metadata).await.unwrap();
//!
//!     // Delete the key's value
//!     let mut db = db.into_dirty();
//!     db.delete(k).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = db.get(&k).await.unwrap();
//!     assert!(fetched_value.is_none());
//!
//!     // Commit the operation to make it persistent
//!     let (db, _) = db.commit(None).await.unwrap();
//!
//!     // Destroy the store
//!     db.destroy().await.unwrap();
//! });
//! ```

use crate::{
    index::{unordered::Index, Unordered as _},
    journal::contiguous::{
        variable::{Config as JournalConfig, Journal},
        MutableContiguous as _,
    },
    kv::{Batchable, Deletable, Updatable},
    mmr::Location,
    qmdb::{
        any::{
            unordered::{variable::Operation, Update},
            VariableValue,
        },
        build_snapshot_from_log, create_key, delete_key,
        operation::{Committable as _, Operation as _},
        store::{Durable, LogStore, NonDurable, PrunableStore, State},
        update_key, Error, FloorHelper,
    },
    translator::Translator,
    Persistable,
};
use commonware_codec::Read;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage};
use commonware_utils::Array;
use core::ops::Range;
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::{debug, warn};

/// Configuration for initializing a [Db].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [Storage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the [Journal].
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of operations to store in each section of the [Journal].
    pub log_items_per_section: NonZeroU64,

    /// The [Translator] used by the [Index].
    pub translator: T,

    /// The [PoolRef] to use for caching data.
    pub buffer_pool: PoolRef,
}

/// An unauthenticated key-value database based on an append-only [Journal] of operations.
pub struct Db<E, K, V, T, S = Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    /// A log of all [Operation]s that have been applied to the store.
    ///
    /// # Invariants
    ///
    /// - There is always at least one commit operation in the log.
    /// - The log is never pruned beyond the inactivity floor.
    log: Journal<E, Operation<K, V>>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location containing its most recent update.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Update].
    snapshot: Index<T, Location>,

    /// The number of active keys in the store.
    active_keys: usize,

    /// A location before which all operations are "inactive" (that is, operations before this
    /// point are over keys that have been updated by some operation at or after this point).
    pub inactivity_floor_loc: Location,

    /// The location of the last commit operation.
    pub last_commit_loc: Location,

    /// The state of the store.
    pub state: S,
}

impl<E, K, V, T, S> Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    /// Get the value of `key` in the db, or None if it has no value.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        for &loc in self.snapshot.get(key) {
            let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
                unreachable!("location ({loc}) does not reference update operation");
            };

            if &k == key {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Whether the db currently has no active keys.
    pub const fn is_empty(&self) -> bool {
        self.active_keys == 0
    }

    /// Gets an [Operation] from the log at the given location. Returns [Error::OperationPruned]
    /// if the location precedes the oldest retained location. The location is otherwise assumed
    /// valid.
    async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
        assert!(loc < self.op_count());

        // Get the operation from the log at the specified position.
        // The journal will return ItemPruned if the location is pruned.
        self.log.read(*loc).await.map_err(|e| match e {
            crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
            e => Error::Journal(e),
        })
    }

    /// The number of operations that have been applied to this db, including those that have been
    /// pruned and those that are not yet committed.
    pub const fn op_count(&self) -> Location {
        Location::new_unchecked(self.log.size())
    }

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }

    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let Operation::CommitFloor(metadata, _) = self.log.read(*self.last_commit_loc).await?
        else {
            unreachable!("last commit should be a commit floor operation");
        };

        Ok(metadata)
    }

    /// Prune historical operations prior to `prune_loc`. This does not affect the db's current
    /// snapshot. Returns [Error::PruneBeyondMinRequired] if `prune_loc` exceeds the inactivity
    /// floor.
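    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) of pruning everything the store no longer
    /// needs, assuming a `db` set up as in the module-level example:
    ///
    /// ```rust,ignore
    /// // Pruning to the inactivity floor is always permitted; the journal may retain a few
    /// // extra operations because it only prunes at section boundaries.
    /// db.prune(db.inactivity_floor_loc()).await?;
    /// ```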
    pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        if prune_loc > self.inactivity_floor_loc {
            return Err(Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }

        // Prune the log. The log will prune at section boundaries, so the actual oldest retained
        // location may be less than requested.
        if !self.log.prune(*prune_loc).await? {
            return Ok(());
        }

        debug!(
            log_size = ?self.op_count(),
            oldest_retained_loc = ?self.log.oldest_retained_pos(),
            ?prune_loc,
            "pruned inactive ops"
        );

        Ok(())
    }
}

impl<E, K, V, T> Db<E, K, V, T, Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Initializes a new [Db] with the given configuration.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut log = Journal::<E, Operation<K, V>>::init(
            context.with_label("log"),
            JournalConfig {
                partition: cfg.log_partition,
                items_per_section: cfg.log_items_per_section,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Rewind log to remove uncommitted operations.
        if log.rewind_to(|op| op.is_commit()).await? == 0 {
            warn!("Log is empty, initializing new db");
            log.append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                .await?;
        }

        // Sync the log so that any recovery performed above does not need to be repeated on the
        // next startup.
        log.sync().await?;

        let last_commit_loc =
            Location::new_unchecked(log.size().checked_sub(1).expect("commit should exist"));
        let op = log.read(*last_commit_loc).await?;
        let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");

        // Build the snapshot.
        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
        let active_keys =
            build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, |_, _| {}).await?;

        Ok(Self {
            log,
            snapshot,
            active_keys,
            inactivity_floor_loc,
            last_commit_loc,
            state: Durable,
        })
    }

    /// Convert this clean store into its dirty counterpart for making updates.
    pub fn into_dirty(self) -> Db<E, K, V, T, NonDurable> {
        Db {
            log: self.log,
            snapshot: self.snapshot,
            active_keys: self.active_keys,
            inactivity_floor_loc: self.inactivity_floor_loc,
            last_commit_loc: self.last_commit_loc,
            state: NonDurable::default(),
        }
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
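    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a clean `db` as in the
    /// module-level example:
    ///
    /// ```rust,ignore
    /// // Committed operations are already durable; syncing just flushes in-memory journal
    /// // state so less work is needed on the next restart.
    /// db.sync().await?;
    /// ```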
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.log.sync().await.map_err(Into::into)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        self.log.destroy().await.map_err(Into::into)
    }
}

impl<E, K, V, T> Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    const fn as_floor_helper(
        &mut self,
    ) -> FloorHelper<'_, Index<T, Location>, Journal<E, Operation<K, V>>> {
        FloorHelper {
            snapshot: &mut self.snapshot,
            log: &mut self.log,
        }
    }

    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will
    /// be subject to rollback until the next successful `commit`.
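    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a dirty `db` as in the
    /// module-level example:
    ///
    /// ```rust,ignore
    /// db.update(k, v.clone()).await?;
    /// // The update is visible immediately, but only becomes durable on commit.
    /// assert_eq!(db.get(&k).await?, Some(v));
    /// ```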
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        let new_loc = self.op_count();
        if update_key(&mut self.snapshot, &self.log, &key, new_loc)
            .await?
            .is_some()
        {
            self.state.steps += 1;
        } else {
            self.active_keys += 1;
        }

        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(())
    }

    /// Creates a new key-value pair in the db. The operation is reflected in the snapshot, but
    /// will be subject to rollback until the next successful `commit`. Returns true if the key
    /// was created, false if it already existed.
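    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) of the return value, assuming a dirty `db`
    /// and a key `k` with no current value:
    ///
    /// ```rust,ignore
    /// assert!(db.create(k, v1).await?);   // key was absent, so it is created
    /// assert!(!db.create(k, v2).await?);  // key now exists, so nothing is written
    /// ```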
    pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
        let new_loc = self.op_count();
        if !create_key(&mut self.snapshot, &self.log, &key, new_loc).await? {
            return Ok(false);
        }

        self.active_keys += 1;
        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(true)
    }

    /// Delete `key` and its value from the db. Deleting a key that already has no value is a
    /// no-op. The operation is reflected in the snapshot, but will be subject to rollback until
    /// the next successful `commit`. Returns true if the key was deleted, false if it was already
    /// inactive.
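    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a dirty `db` holding a value
    /// for `k`:
    ///
    /// ```rust,ignore
    /// assert!(db.delete(k).await?);   // value removed
    /// assert!(!db.delete(k).await?);  // already inactive, so this is a no-op
    /// ```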
    pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
        let r = delete_key(&mut self.snapshot, &self.log, &key).await?;
        if r.is_none() {
            return Ok(false);
        }

        self.log.append(Operation::Delete(key)).await?;
        self.state.steps += 1;
        self.active_keys -= 1;

        Ok(true)
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule. Returns the
    /// half-open `[start_loc, end_loc)` location range of committed operations. The range
    /// includes the commit operation itself, so its end is always equal to `op_count`.
    ///
    /// Note that even if no operations were added since the last commit, this appends a new
    /// commit operation and thus changes the store's durable state.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
    ///
    /// Consumes this dirty store and returns a clean one.
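    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a dirty `db` as in the
    /// module-level example:
    ///
    /// ```rust,ignore
    /// let (db, range) = db.commit(None).await?;
    /// // The last location in the range is the commit operation itself.
    /// assert_eq!(range.end, db.op_count());
    /// ```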
    pub async fn commit(
        mut self,
        metadata: Option<V>,
    ) -> Result<(Db<E, K, V, T, Durable>, Range<Location>), Error> {
        let start_loc = self.last_commit_loc + 1;

        // Raise the inactivity floor by taking `self.state.steps` steps, plus 1 to account for
        // the previous commit becoming inactive.
        if self.is_empty() {
            self.inactivity_floor_loc = self.op_count();
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            let steps_to_take = self.state.steps + 1;
            for _ in 0..steps_to_take {
                let loc = self.inactivity_floor_loc;
                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
            }
        }

        // Apply the commit operation with the new inactivity floor.
        self.last_commit_loc = Location::new_unchecked(
            self.log
                .append(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
                .await?,
        );

        let range = start_loc..self.op_count();

        // Commit the log to ensure durability.
        self.log.commit().await?;

        Ok((
            Db {
                log: self.log,
                snapshot: self.snapshot,
                active_keys: self.active_keys,
                inactivity_floor_loc: self.inactivity_floor_loc,
                last_commit_loc: self.last_commit_loc,
                state: Durable,
            },
            range,
        ))
    }
}

impl<E, K, V, T> Persistable for Db<E, K, V, T, Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Error> {
        // No-op, DB already in recoverable state.
        Ok(())
    }

    async fn sync(&mut self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}

impl<E, K, V, T, S> LogStore for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }

    fn is_empty(&self) -> bool {
        self.is_empty()
    }
}

impl<E, K, V, T, S> PrunableStore for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}

impl<E, K, V, T, S> crate::kv::Gettable for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<E, K, V, T> Updatable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.update(key, value).await
    }
}

impl<E, K, V, T> Deletable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
        self.delete(key).await
    }
}

impl<E, K, V, T> Batchable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn write_batch<'a, Iter>(&'a mut self, iter: Iter) -> Result<(), Self::Error>
    where
        Iter: Iterator<Item = (Self::Key, Option<Self::Value>)> + Send + 'a,
    {
        for (key, value) in iter {
            if let Some(value) = value {
                self.update(key, value).await?;
            } else {
                self.delete(key).await?;
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{
        kv::{
            tests::{
                assert_batchable, assert_deletable, assert_gettable, assert_send, assert_updatable,
            },
            Gettable as _,
        },
        qmdb::store::tests::{assert_log_store, assert_prunable_store},
        translator::TwoCap,
    };
    use commonware_cryptography::{
        blake3::{Blake3, Digest},
        Hasher as _,
    };
    use commonware_macros::test_traced;
    use commonware_math::algebra::Random;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);

    /// The type of the store used in tests.
    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap, Durable>;

    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            translator: TwoCap,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        TestStore::init(context, cfg).await.unwrap()
    }

    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.log.oldest_retained_pos(), Some(0));
            assert_eq!(db.inactivity_floor_loc(), 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(matches!(
                db.prune(Location::new_unchecked(1)).await,
                Err(Error::PruneBeyondMinRequired(_, _))
            ));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an
            // uncommitted op.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            let mut dirty = db.into_dirty();
            dirty.update(d1, v1).await.unwrap();
            drop(dirty);

            let db = create_test_store(context.clone()).await.into_dirty();
            assert_eq!(db.op_count(), 1);

            // Test calling commit on an empty db, which should make it (durably) non-empty.
            let metadata = vec![1, 2, 3];
            let (mut db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 2);
            assert_eq!(db.op_count(), 2);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            let mut db = create_test_store(context.clone()).await.into_dirty();
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits on
            // a non-empty db.
            db.update(Digest::random(&mut context), vec![1, 2, 3])
                .await
                .unwrap();
            let (mut db, _) = db.commit(None).await.unwrap();
            for _ in 1..100 {
                (db, _) = db.into_dirty().commit(None).await.unwrap();
                // Distance should equal 3 after the second commit, with inactivity_floor
                // referencing the previous commit operation.
                assert!(db.op_count() - db.inactivity_floor_loc <= 3);
                assert!(db.get_metadata().await.unwrap().is_none());
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Ensure the store is empty
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Attempt to get a key that does not exist
            let result = db.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair
            db.update(key, value.clone()).await.unwrap();

            assert_eq!(db.op_count(), 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            // Fetch the value
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Simulate commit failure.
            drop(db);

            // Re-open the store
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Ensure the re-opened store removed the uncommitted operations
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            // Insert a key-value pair
            db.update(key, value.clone()).await.unwrap();

            assert_eq!(db.op_count(), 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            // Persist the changes
            let metadata = vec![99, 100];
            let (db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            // Even though the store was pruned, the inactivity floor was raised by 2, and
            // the old operations remain in the same blob as an active operation, so they're
            // retained.
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Re-open the store
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Ensure the re-opened store retained the committed operations
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Fetch the value, ensuring it is still present
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Insert two new k/v pairs to force pruning of the first section.
            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            db.update(k1, v1.clone()).await.unwrap();
            db.update(k2, v2.clone()).await.unwrap();

            assert_eq!(db.op_count(), 6);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Make sure we can still get metadata.
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            let (db, range) = db.commit(None).await.unwrap();
            assert_eq!(range.start, 4);
            assert_eq!(range.end, db.op_count());
            assert_eq!(db.get_metadata().await.unwrap(), None);
            let mut db = db.into_dirty();

            assert_eq!(db.op_count(), 8);
            assert_eq!(db.inactivity_floor_loc, 3);

            // Ensure all keys can be accessed, despite the first section being pruned.
            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            // Update existing key with modified value.
            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
            v1_updated.push(7);
            db.update(k1, v1_updated).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);

            // Create new key.
            let mut db = db.into_dirty();
            let k3 = Digest::random(&mut ctx);
            db.update(k3, vec![8]).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);

            // Destroy the store
            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Update the same key many times.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                db.update(k, v.clone()).await.unwrap();
            }

            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let (mut db, _) = db.commit(None).await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Re-open the store, prune it, then ensure it replays the log correctly.
            let mut db = create_test_store(ctx.with_label("store")).await;
            db.prune(db.inactivity_floor_loc()).await.unwrap();

            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // 100 operations were applied, each triggering one step, plus the commit op.
            assert_eq!(db.op_count(), UPDATES * 2 + 2);
            // Only the highest `Update` operation is active, plus the commit operation above it.
            let expected_floor = UPDATES * 2;
            assert_eq!(db.inactivity_floor_loc, expected_floor);

            // All blobs prior to the inactivity floor are pruned, so the oldest retained location
            // is the first in the last retained blob.
            assert_eq!(
                db.log.oldest_retained_pos(),
                Some(expected_floor - expected_floor % 7)
            );

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Ensure k2 shares its first 2 bytes with k1 (the test DB uses the `TwoCap`
            // translator).
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            db.update(k1, v1.clone()).await.unwrap();
            db.update(k2, v2.clone()).await.unwrap();

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            let (mut db, _) = db.commit(None).await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Re-open the store to ensure it builds the snapshot for the conflicting
            // keys correctly.
            let db = create_test_store(ctx.with_label("store")).await;

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Insert a key-value pair
            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            db.update(k, v.clone()).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();

            // Fetch the value
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete the key
            let mut db = db.into_dirty();
            assert!(db.delete(k).await.unwrap());

            // Ensure the key is no longer present
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(!db.delete(k).await.unwrap());

            // Commit the changes
            let _ = db.commit(None).await.unwrap();

            // Re-open the store and ensure the key is still deleted
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the key
            db.update(k, v.clone()).await.unwrap();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Commit the changes
            let _ = db.commit(None).await.unwrap();

            // Re-open the store and ensure the snapshot restores the key, after processing
            // the delete and the subsequent set.
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete a non-existent key (no-op)
            let k_n = Digest::random(&mut ctx);
            db.delete(k_n).await.unwrap();

            let (db, range) = db.commit(None).await.unwrap();
            assert_eq!(range.start, 9);
            assert_eq!(range.end, 11);

            assert!(db.get(&k_n).await.unwrap().is_none());
            // Make sure k is still there
            assert!(db.get(&k).await.unwrap().is_some());

            db.destroy().await.unwrap();
        });
    }

    /// Tests how commits raise the inactivity floor and move active operations forward.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            db.update(k_a, v_a.clone()).await.unwrap();
            db.update(k_b, v_b.clone()).await.unwrap();

            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.inactivity_floor_loc, 2);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);

            let mut db = db.into_dirty();
            db.update(k_b, v_a.clone()).await.unwrap();
            db.update(k_a, v_c.clone()).await.unwrap();

            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 11);
            assert_eq!(db.inactivity_floor_loc, 8);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store"))
                .await
                .into_dirty();

            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the previous committed
            // state.
            drop(db);
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 1);

            // Re-apply the updates and commit them this time.
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            let (db, _) = db.commit(None).await.unwrap();
            let op_count = db.op_count();

            // Update every 3rd key
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the previous committed
            // state.
            drop(db);
            let mut db = create_test_store(context.with_label("store"))
                .await
                .into_dirty();
            assert_eq!(db.op_count(), op_count);

            // Re-apply updates for every 3rd key and commit them this time.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            let (db, _) = db.commit(None).await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 1673);
            assert_eq!(db.snapshot.items(), 1000);

            // Delete every 7th key
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the previous committed
            // state.
            drop(db);
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Sync and reopen the store to ensure the final commit is preserved.
            let mut db = db;
            db.sync().await.unwrap();
            drop(db);
            let mut db = create_test_store(context.with_label("store"))
                .await
                .into_dirty();
            assert_eq!(db.op_count(), op_count);

            // Re-delete every 7th key and commit this time.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            let (mut db, _) = db.commit(None).await.unwrap();

            assert_eq!(db.op_count(), 1961);
            assert_eq!(db.inactivity_floor_loc, 756);

            db.prune(db.inactivity_floor_loc()).await.unwrap();
            // 756 % 7 == 0, so the prune point falls exactly on a section boundary.
            assert_eq!(db.log.oldest_retained_pos(), Some(756));
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_batchable() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Ensure the store is empty
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            let mut batch = db.start_batch();

            // Attempt to get a key that does not exist
            let result = batch.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair
            batch.update(key, value.clone()).await.unwrap();

            assert_eq!(db.op_count(), 1); // The batch is not applied yet
            assert_eq!(db.inactivity_floor_loc, 0);

            // Fetch the value
            let fetched_value = batch.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);
            db.write_batch(batch.into_iter()).await.unwrap();
            drop(db);

            // Re-open the store
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Ensure the batch was not applied since we didn't commit.
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            // Insert a key-value pair
            let mut batch = db.start_batch();
            batch.update(key, value.clone()).await.unwrap();

            // Persist the changes
            db.write_batch(batch.into_iter()).await.unwrap();
            assert_eq!(db.op_count(), 2);
            assert_eq!(db.inactivity_floor_loc, 0);
            let metadata = vec![99, 100];
            let (db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            drop(db);

            // Re-open the store
            let db = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store retained the committed operations
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Fetch the value, ensuring it is still present
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Destroy the store
            db.destroy().await.unwrap();
        });
    }

    #[allow(dead_code)]
    fn assert_durable_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_gettable(db, &key);
        assert_send(db.sync());
    }

    #[allow(dead_code)]
    fn assert_dirty_futures_are_send(
        db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap, NonDurable>,
        key: Digest,
        value: Vec<u8>,
    ) {
        assert_log_store(db);
        assert_gettable(db, &key);
        assert_updatable(db, key, value.clone());
        assert_deletable(db, key);
        assert_batchable(db, key, value);
    }

    #[allow(dead_code)]
    fn assert_dirty_commit_is_send(
        db: Db<deterministic::Context, Digest, Vec<u8>, TwoCap, NonDurable>,
    ) {
        assert_send(db.commit(None));
    }
}