// commonware_storage/qmdb/store/db.rs

//! A mutable key-value database that supports variable-sized values, but without authentication.
//!
//! # Lifecycle
//!
//! Unlike authenticated stores, which have 4 potential states, an unauthenticated store has only
//! two:
//!
//! - **Clean**: The store has no uncommitted operations and its key/value state is immutable. Use
//!   `into_dirty` to transform it into a dirty state.
//!
//! - **Dirty**: The store has uncommitted operations and its key/value state is mutable. Use
//!   `commit` to transform it into a clean state.
//!
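//! A sketch of the transitions:
//!
//! ```text
//! Clean --into_dirty()--> Dirty --commit()--> Clean
//! ```
//!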
//! # Example
//!
//! ```rust
//! use commonware_storage::{
//!     qmdb::store::db::{Config, Db},
//!     translator::TwoCap,
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//! use commonware_cryptography::{blake3::Digest, Digest as _};
//! use commonware_math::algebra::Random;
//! use commonware_runtime::{buffer::paged::CacheRef, deterministic::Runner, Metrics, Runner as _};
//!
//! use std::num::NonZeroU16;
//! const PAGE_SIZE: NonZeroU16 = NZU16!(8192);
//! const PAGE_CACHE_SIZE: usize = 100;
//!
//! let executor = Runner::default();
//! executor.start(|mut ctx| async move {
//!     let config = Config {
//!         log_partition: "test_partition".to_string(),
//!         log_write_buffer: NZUsize!(64 * 1024),
//!         log_compression: None,
//!         log_codec_config: (),
//!         log_items_per_section: NZU64!(4),
//!         translator: TwoCap,
//!         page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
//!     };
//!     let db =
//!         Db::<_, Digest, Digest, TwoCap>::init(ctx.with_label("store"), config)
//!             .await
//!             .unwrap();
//!
//!     // Insert a key-value pair
//!     let k = Digest::random(&mut ctx);
//!     let v = Digest::random(&mut ctx);
//!     let mut db = db.into_dirty();
//!     db.update(k, v).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = db.get(&k).await.unwrap();
//!     assert_eq!(fetched_value.unwrap(), v);
//!
//!     // Commit the operation to make it persistent
//!     let metadata = Some(Digest::random(&mut ctx));
//!     let (db, _) = db.commit(metadata).await.unwrap();
//!
//!     // Delete the key's value
//!     let mut db = db.into_dirty();
//!     db.delete(k).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = db.get(&k).await.unwrap();
//!     assert!(fetched_value.is_none());
//!
//!     // Commit the operation to make it persistent
//!     let (db, _) = db.commit(None).await.unwrap();
//!
//!     // Destroy the store
//!     db.destroy().await.unwrap();
//! });
//! ```

use crate::{
    index::{unordered::Index, Unordered as _},
    journal::contiguous::{
        variable::{Config as JournalConfig, Journal},
        MutableContiguous as _,
    },
    kv::{Batchable, Deletable, Updatable},
    mmr::Location,
    qmdb::{
        any::{
            unordered::{variable::Operation, Update},
            VariableValue,
        },
        build_snapshot_from_log, create_key, delete_key,
        operation::{Committable as _, Operation as _},
        store::{Durable, LogStore, NonDurable, PrunableStore, State},
        update_key, Error, FloorHelper,
    },
    translator::Translator,
    Persistable,
};
use commonware_codec::Read;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
use commonware_utils::Array;
use core::ops::Range;
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::{debug, warn};

/// Configuration for initializing a [Db].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [Storage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the [Journal].
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of operations to store in each section of the [Journal].
    pub log_items_per_section: NonZeroU64,

    /// The [Translator] used by the [Index].
    pub translator: T,

    /// The [CacheRef] to use for caching data.
    pub page_cache: CacheRef,
}

/// An unauthenticated key-value database based on an append-only [Journal] of operations.
pub struct Db<E, K, V, T, S = Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    /// A log of all [Operation]s that have been applied to the store.
    ///
    /// # Invariants
    ///
    /// - There is always at least one commit operation in the log.
    /// - The log is never pruned beyond the inactivity floor.
    log: Journal<E, Operation<K, V>>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location containing its most recent update.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Update].
    snapshot: Index<T, Location>,

    /// The number of active keys in the store.
    active_keys: usize,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    pub inactivity_floor_loc: Location,

    /// The location of the last commit operation.
    pub last_commit_loc: Location,

    /// The state of the store.
    pub state: S,
}

impl<E, K, V, T, S> Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    /// Get the value of `key` in the db, or None if it has no value.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        for &loc in self.snapshot.get(key) {
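            // The translator may map multiple keys to the same translated key, so scan all
            // candidate locations and compare against the full key.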
            let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
                unreachable!("location ({loc}) does not reference update operation");
            };

            if &k == key {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Whether the db currently has no active keys.
    pub const fn is_empty(&self) -> bool {
        self.active_keys == 0
    }

    /// Gets an [Operation] from the log at the given location. Returns [Error::OperationPruned]
    /// if the location precedes the oldest retained location. The location is otherwise assumed
    /// valid.
    async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
        assert!(*loc < self.size());

        // Get the operation from the log at the specified position.
        // The journal will return ItemPruned if the location is pruned.
        self.log.read(*loc).await.map_err(|e| match e {
            crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
            e => Error::Journal(e),
        })
    }

    /// Return `[start, end)` where `start` and `end - 1` are the Locations of the oldest and
    /// newest retained operations, respectively.
    pub const fn bounds(&self) -> std::ops::Range<Location> {
        let bounds = self.log.bounds();
        Location::new_unchecked(bounds.start)..Location::new_unchecked(bounds.end)
    }

    /// Return the Location of the next operation appended to this db.
    pub const fn size(&self) -> Location {
        self.bounds().end
    }

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }

    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let Operation::CommitFloor(metadata, _) = self.log.read(*self.last_commit_loc).await?
        else {
            unreachable!("last commit should be a commit floor operation");
        };

        Ok(metadata)
    }

    /// Prune historical operations prior to `prune_loc`. This does not affect the db's current
    /// snapshot.
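    ///
    /// A minimal usage sketch (assuming an initialized, mutable `db`):
    ///
    /// ```rust,ignore
    /// // Prune everything known to be inactive; requesting a location beyond the
    /// // inactivity floor returns `Error::PruneBeyondMinRequired`.
    /// db.prune(db.inactivity_floor_loc()).await?;
    /// ```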
    pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        if prune_loc > self.inactivity_floor_loc {
            return Err(Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }

        // Prune the log. The log will prune at section boundaries, so the actual oldest retained
        // location may be less than requested.
        if !self.log.prune(*prune_loc).await? {
            return Ok(());
        }

        debug!(
            log_size = ?self.size(),
            oldest_retained_loc = ?self.bounds().start,
            ?prune_loc,
            "pruned inactive ops"
        );

        Ok(())
    }
}

impl<E, K, V, T> Db<E, K, V, T, Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Initializes a new [Db] with the given configuration.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut log = Journal::<E, Operation<K, V>>::init(
            context.with_label("log"),
            JournalConfig {
                partition: cfg.log_partition,
                items_per_section: cfg.log_items_per_section,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                page_cache: cfg.page_cache,
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Rewind the log to the last commit operation, discarding any uncommitted operations.
        if log.rewind_to(|op| op.is_commit()).await? == 0 {
            warn!("log is empty, initializing new db");
            log.append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                .await?;
        }

        // Sync the log to avoid having to repeat any recovery that may have been performed on the
        // next startup.
        log.sync().await?;

        let last_commit_loc =
            Location::new_unchecked(log.size().checked_sub(1).expect("commit should exist"));
        let op = log.read(*last_commit_loc).await?;
        let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");

        // Build the snapshot by replaying the log from the inactivity floor.
        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
        let active_keys =
            build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, |_, _| {}).await?;

        Ok(Self {
            log,
            snapshot,
            active_keys,
            inactivity_floor_loc,
            last_commit_loc,
            state: Durable,
        })
    }

    /// Convert this clean store into its dirty counterpart for making updates.
    pub fn into_dirty(self) -> Db<E, K, V, T, NonDurable> {
        Db {
            log: self.log,
            snapshot: self.snapshot,
            active_keys: self.active_keys,
            inactivity_floor_loc: self.inactivity_floor_loc,
            last_commit_loc: self.last_commit_loc,
            state: NonDurable::default(),
        }
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.log.sync().await.map_err(Into::into)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        self.log.destroy().await.map_err(Into::into)
    }
}

impl<E, K, V, T> Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
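    /// Wrap mutable references to the snapshot and log in a [FloorHelper], which encapsulates
    /// the logic for raising the inactivity floor.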
    const fn as_floor_helper(
        &mut self,
    ) -> FloorHelper<'_, Index<T, Location>, Journal<E, Operation<K, V>>> {
        FloorHelper {
            snapshot: &mut self.snapshot,
            log: &mut self.log,
        }
    }

    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
    /// subject to rollback until the next successful `commit`.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        let new_loc = self.size();
        if update_key(&mut self.snapshot, &self.log, &key, new_loc)
            .await?
            .is_some()
        {
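            // The key already had a value: its previous operation just became inactive, which
            // counts as a step toward raising the inactivity floor at the next commit.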
            self.state.steps += 1;
        } else {
            self.active_keys += 1;
        }

        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(())
    }

    /// Creates a new key-value pair in the db. The operation is reflected in the snapshot, but will
    /// be subject to rollback until the next successful `commit`. Returns true if the key was
    /// created, false if it already existed.
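    ///
    /// A minimal usage sketch (assuming a dirty `db` with some `key` and `value`):
    ///
    /// ```rust,ignore
    /// assert!(db.create(key, value.clone()).await?); // the key did not exist, so it is created
    /// assert!(!db.create(key, value).await?);        // the key now exists, so this is a no-op
    /// ```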
    pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
        let new_loc = self.size();
        if !create_key(&mut self.snapshot, &self.log, &key, new_loc).await? {
            return Ok(false);
        }

        self.active_keys += 1;
        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(true)
    }

    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
    /// successful `commit`. Returns true if the key was deleted, false if it was already inactive.
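    ///
    /// A minimal usage sketch (assuming a dirty `db` holding a value for `key`):
    ///
    /// ```rust,ignore
    /// assert!(db.delete(key).await?);  // the key had a value, so it is deleted
    /// assert!(!db.delete(key).await?); // the key is now inactive, so this is a no-op
    /// ```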
    pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
        let r = delete_key(&mut self.snapshot, &self.log, &key).await?;
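        // `delete_key` returns `None` if the key had no active value, in which case there is
        // nothing to append.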
        if r.is_none() {
            return Ok(false);
        }

        self.log.append(Operation::Delete(key)).await?;
        self.state.steps += 1;
        self.active_keys -= 1;

        Ok(true)
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule. Returns the
    /// `[start_loc, end_loc)` location range of committed operations. The last location in the
    /// range (`end_loc - 1`) is the commit operation itself, so `end_loc` always equals
    /// `op_count`.
    ///
    /// Note that even if no operations were added since the last commit, this is a state-changing
    /// operation, since a new commit operation is always appended.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
    ///
    /// Consumes this dirty store and returns a clean one.
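    ///
    /// A minimal usage sketch (assuming a dirty `db`):
    ///
    /// ```rust,ignore
    /// let (db, range) = db.commit(None).await?;
    /// // The range ends just past the commit operation that was appended.
    /// assert_eq!(range.end, db.size());
    /// ```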
    pub async fn commit(
        mut self,
        metadata: Option<V>,
    ) -> Result<(Db<E, K, V, T, Durable>, Range<Location>), Error> {
        let start_loc = self.last_commit_loc + 1;

        // Raise the inactivity floor by taking `self.state.steps` steps, plus 1 to account for the
        // previous commit becoming inactive.
        if self.is_empty() {
            self.inactivity_floor_loc = self.size();
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            let steps_to_take = self.state.steps + 1;
            for _ in 0..steps_to_take {
                let loc = self.inactivity_floor_loc;
                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
            }
        }

        // Apply the commit operation with the new inactivity floor.
        self.last_commit_loc = Location::new_unchecked(
            self.log
                .append(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
                .await?,
        );

        let range = start_loc..self.size();

        // Commit the log to ensure durability.
        self.log.commit().await?;

        Ok((
            Db {
                log: self.log,
                snapshot: self.snapshot,
                active_keys: self.active_keys,
                inactivity_floor_loc: self.inactivity_floor_loc,
                last_commit_loc: self.last_commit_loc,
                state: Durable,
            },
            range,
        ))
    }
}

impl<E, K, V, T> Persistable for Db<E, K, V, T, Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Error> {
        // No-op, DB already in recoverable state.
        Ok(())
    }

    async fn sync(&mut self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}

impl<E, K, V, T, S> LogStore for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    type Value = V;

    fn bounds(&self) -> std::ops::Range<Location> {
        self.bounds()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }

    fn is_empty(&self) -> bool {
        self.is_empty()
    }
}

impl<E, K, V, T, S> PrunableStore for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}

impl<E, K, V, T, S> crate::kv::Gettable for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<E, K, V, T> Updatable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.update(key, value).await
    }
}

impl<E, K, V, T> Deletable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
        self.delete(key).await
    }
}

impl<E, K, V, T> Batchable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn write_batch<'a, Iter>(&'a mut self, iter: Iter) -> Result<(), Self::Error>
    where
        Iter: Iterator<Item = (Self::Key, Option<Self::Value>)> + Send + 'a,
    {
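        // Apply each entry in order: `Some(value)` performs an update, `None` a delete.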
        for (key, value) in iter {
            if let Some(value) = value {
                self.update(key, value).await?;
            } else {
                self.delete(key).await?;
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{
        kv::{
            tests::{
                assert_batchable, assert_deletable, assert_gettable, assert_send, assert_updatable,
            },
            Gettable as _,
        },
        qmdb::store::tests::{assert_log_store, assert_prunable_store},
        translator::TwoCap,
    };
    use commonware_cryptography::{
        blake3::{Blake3, Digest},
        Hasher as _,
    };
    use commonware_macros::test_traced;
    use commonware_math::algebra::Random;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);

    /// The type of the store used in tests.
    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap, Durable>;

    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            translator: TwoCap,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        TestStore::init(context, cfg).await.unwrap()
    }

    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.with_label("store_0")).await;
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.log.bounds().start, 0);
            assert_eq!(db.inactivity_floor_loc(), 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(matches!(
                db.prune(Location::new_unchecked(1)).await,
                Err(Error::PruneBeyondMinRequired(_, _))
            ));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            let mut dirty = db.into_dirty();
            dirty.update(d1, v1).await.unwrap();
            drop(dirty);

            let db = create_test_store(context.with_label("store_1"))
                .await
                .into_dirty();
            assert_eq!(db.bounds().end, 1);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            let metadata = vec![1, 2, 3];
            let (mut db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 2);
            assert_eq!(db.bounds().end, 2);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            let mut db = create_test_store(context.with_label("store_2"))
                .await
                .into_dirty();
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits on a
            // non-empty db.
            db.update(Digest::random(&mut context), vec![1, 2, 3])
                .await
                .unwrap();
            let (mut db, _) = db.commit(None).await.unwrap();
            for _ in 1..100 {
                (db, _) = db.into_dirty().commit(None).await.unwrap();
                // Distance should equal 3 after the second commit, with inactivity_floor
                // referencing the previous commit operation.
                assert!(db.bounds().end - db.inactivity_floor_loc <= 3);
                assert!(db.get_metadata().await.unwrap().is_none());
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            // Ensure the store is empty
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Attempt to get a key that does not exist
            let result = db.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair
            db.update(key, value.clone()).await.unwrap();

            assert_eq!(db.bounds().end, 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            // Fetch the value
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Simulate commit failure.
            drop(db);

            // Re-open the store
            let mut db = create_test_store(ctx.with_label("store_1"))
                .await
                .into_dirty();

            // Ensure the re-opened store removed the uncommitted operations
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            // Insert a key-value pair
            db.update(key, value.clone()).await.unwrap();

            assert_eq!(db.bounds().end, 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            // Persist the changes
            let metadata = vec![99, 100];
            let (db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            // Even though the store was pruned, the inactivity floor was raised by 2, and
            // the old operations remain in the same blob as an active operation, so they're
            // retained.
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Re-open the store
            let mut db = create_test_store(ctx.with_label("store_2"))
                .await
                .into_dirty();

            // Ensure the re-opened store retained the committed operations
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Fetch the value, ensuring it is still present
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Insert two new k/v pairs to force pruning of the first section.
            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            db.update(k1, v1.clone()).await.unwrap();
            db.update(k2, v2.clone()).await.unwrap();

            assert_eq!(db.bounds().end, 6);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Make sure we can still get metadata.
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            let (db, range) = db.commit(None).await.unwrap();
            assert_eq!(range.start, 4);
            assert_eq!(range.end, db.bounds().end);
            assert_eq!(db.get_metadata().await.unwrap(), None);
            let mut db = db.into_dirty();

            assert_eq!(db.bounds().end, 8);
            assert_eq!(db.inactivity_floor_loc, 3);

            // Ensure all keys can be accessed, despite the first section being pruned.
            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            // Update existing key with modified value.
            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
            v1_updated.push(7);
            db.update(k1, v1_updated).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);

            // Create new key.
            let mut db = db.into_dirty();
            let k3 = Digest::random(&mut ctx);
            db.update(k3, vec![8]).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);

            // Destroy the store
            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            // Update the same key many times.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                db.update(k, v.clone()).await.unwrap();
            }

            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let (mut db, _) = db.commit(None).await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Re-open the store, prune it, then ensure it replays the log correctly.
            let mut db = create_test_store(ctx.with_label("store_1")).await;
            db.prune(db.inactivity_floor_loc()).await.unwrap();

            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // 100 operations were applied, each triggering one step, plus the commit op.
            assert_eq!(db.bounds().end, UPDATES * 2 + 2);
            // Only the highest `Update` operation is active, plus the commit operation above it.
            let expected_floor = UPDATES * 2;
            assert_eq!(db.inactivity_floor_loc, expected_floor);

            // All blobs prior to the inactivity floor are pruned, so the oldest retained location
            // is the first in the last retained blob.
            assert_eq!(db.log.bounds().start, expected_floor - expected_floor % 7);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Ensure k2 shares its first 2 bytes with k1 (the test db uses the `TwoCap`
            // translator, so these keys collide in the snapshot).
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            db.update(k1, v1.clone()).await.unwrap();
            db.update(k2, v2.clone()).await.unwrap();

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            let (mut db, _) = db.commit(None).await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Re-open the store to ensure it builds the snapshot for the conflicting
            // keys correctly.
            let db = create_test_store(ctx.with_label("store_1")).await;

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            // Insert a key-value pair
            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            db.update(k, v.clone()).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();

            // Fetch the value
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete the key
            let mut db = db.into_dirty();
            assert!(db.delete(k).await.unwrap());

            // Ensure the key is no longer present
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(!db.delete(k).await.unwrap());

            // Commit the changes
            let _ = db.commit(None).await.unwrap();

            // Re-open the store and ensure the key is still deleted
            let mut db = create_test_store(ctx.with_label("store_1"))
                .await
                .into_dirty();
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the key
            db.update(k, v.clone()).await.unwrap();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Commit the changes
            let _ = db.commit(None).await.unwrap();

            // Re-open the store and ensure the snapshot restores the key, after processing
            // the delete and the subsequent set.
            let mut db = create_test_store(ctx.with_label("store_2"))
                .await
                .into_dirty();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete a non-existent key (no-op)
            let k_n = Digest::random(&mut ctx);
            db.delete(k_n).await.unwrap();

            let (db, range) = db.commit(None).await.unwrap();
            assert_eq!(range.start, 9);
            assert_eq!(range.end, 11);

            assert!(db.get(&k_n).await.unwrap().is_none());
            // Make sure k is still there
            assert!(db.get(&k).await.unwrap().is_some());

            db.destroy().await.unwrap();
        });
    }

    /// Tests the pruning example in the module documentation.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            db.update(k_a, v_a.clone()).await.unwrap();
            db.update(k_b, v_b.clone()).await.unwrap();

            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.bounds().end, 5);
            assert_eq!(db.inactivity_floor_loc, 2);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);

            let mut db = db.into_dirty();
            db.update(k_b, v_a.clone()).await.unwrap();
            db.update(k_a, v_c.clone()).await.unwrap();

            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.bounds().end, 11);
            assert_eq!(db.inactivity_floor_loc, 8);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store_0"))
                .await
                .into_dirty();

            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the previous committed state.
            drop(db);
            let db = create_test_store(context.with_label("store_1")).await;
            assert_eq!(db.bounds().end, 1);

            // Re-apply the updates and commit them this time.
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            let (db, _) = db.commit(None).await.unwrap();
            let op_count = db.bounds().end;

            // Update every 3rd key
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the previous committed state.
            drop(db);
            let mut db = create_test_store(context.with_label("store_2"))
                .await
                .into_dirty();
            assert_eq!(db.bounds().end, op_count);

            // Re-apply updates for every 3rd key and commit them this time.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            let (db, _) = db.commit(None).await.unwrap();
            let op_count = db.bounds().end;
            assert_eq!(op_count, 1673);
            assert_eq!(db.snapshot.items(), 1000);

            // Delete every 7th key
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the previous committed state.
            drop(db);
            let db = create_test_store(context.with_label("store_3")).await;
            assert_eq!(db.bounds().end, op_count);

            // Sync and reopen the store to ensure the final commit is preserved.
            let mut db = db;
            db.sync().await.unwrap();
            drop(db);
            let mut db = create_test_store(context.with_label("store_4"))
                .await
                .into_dirty();
            assert_eq!(db.bounds().end, op_count);

            // Re-delete every 7th key and commit this time.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            let (mut db, _) = db.commit(None).await.unwrap();

            assert_eq!(db.bounds().end, 1961);
            assert_eq!(db.inactivity_floor_loc, 756);

            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.log.bounds().start, 756 /* == 756 - 756 % 7 */);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_batchable() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            // Ensure the store is empty
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            let mut batch = db.start_batch();

            // Attempt to get a key that does not exist
            let result = batch.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair
            batch.update(key, value.clone()).await.unwrap();

            assert_eq!(db.bounds().end, 1); // The batch is not applied yet
            assert_eq!(db.inactivity_floor_loc, 0);

            // Fetch the value
            let fetched_value = batch.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);
            db.write_batch(batch.into_iter()).await.unwrap();
            drop(db);

            // Re-open the store
            let mut db = create_test_store(ctx.with_label("store_1"))
                .await
                .into_dirty();

            // Ensure the batch was not applied since we didn't commit.
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            // Insert a key-value pair
            let mut batch = db.start_batch();
            batch.update(key, value.clone()).await.unwrap();

            // Persist the changes
            db.write_batch(batch.into_iter()).await.unwrap();
            assert_eq!(db.bounds().end, 2);
            assert_eq!(db.inactivity_floor_loc, 0);
            let metadata = vec![99, 100];
            let (db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            drop(db);

            // Re-open the store
            let db = create_test_store(ctx.with_label("store_2")).await;

            // Ensure the re-opened store retained the committed operations
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Fetch the value, ensuring it is still present
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Destroy the store
            db.destroy().await.unwrap();
        });
    }

    #[allow(dead_code)]
    fn assert_durable_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_gettable(db, &key);
        assert_send(db.sync());
    }

    #[allow(dead_code)]
    fn assert_dirty_futures_are_send(
        db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap, NonDurable>,
        key: Digest,
        value: Vec<u8>,
    ) {
        assert_log_store(db);
        assert_gettable(db, &key);
        assert_updatable(db, key, value.clone());
        assert_deletable(db, key);
        assert_batchable(db, key, value);
    }

    #[allow(dead_code)]
    fn assert_dirty_commit_is_send(
        db: Db<deterministic::Context, Digest, Vec<u8>, TwoCap, NonDurable>,
    ) {
        assert_send(db.commit(None));
    }
}