// commonware_storage/qmdb/store/db.rs

//! A mutable key-value database that supports variable-sized values, but without authentication.
//!
//! # Example
//!
//! ```rust
//! use commonware_storage::{
//!     qmdb::store::db::{Config, Db},
//!     translator::TwoCap,
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//! use commonware_cryptography::{blake3::Digest, Digest as _};
//! use commonware_math::algebra::Random;
//! use commonware_runtime::{buffer::paged::CacheRef, deterministic::Runner, Metrics, Runner as _};
//!
//! use std::num::NonZeroU16;
//! const PAGE_SIZE: NonZeroU16 = NZU16!(8192);
//! const PAGE_CACHE_SIZE: usize = 100;
//!
//! let executor = Runner::default();
//! executor.start(|mut ctx| async move {
//!     let config = Config {
//!         log_partition: "test-partition".into(),
//!         log_write_buffer: NZUsize!(64 * 1024),
//!         log_compression: None,
//!         log_codec_config: ((), ()),
//!         log_items_per_section: NZU64!(4),
//!         translator: TwoCap,
//!         page_cache: CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
//!     };
//!     let mut db =
//!         Db::<_, Digest, Digest, TwoCap>::init(ctx.with_label("store"), config)
//!             .await
//!             .unwrap();
//!
//!     // Insert a key-value pair
//!     let k = Digest::random(&mut ctx);
//!     let v = Digest::random(&mut ctx);
//!     db.write_batch([(k, Some(v))]).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = db.get(&k).await.unwrap();
//!     assert_eq!(fetched_value.unwrap(), v);
//!
//!     // Commit the operation to make it persistent
//!     let metadata = Some(Digest::random(&mut ctx));
//!     db.commit(metadata).await.unwrap();
//!
//!     // Delete the key's value
//!     db.write_batch([(k, None)]).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = db.get(&k).await.unwrap();
//!     assert!(fetched_value.is_none());
//!
//!     // Commit the operation to make it persistent
//!     db.commit(None).await.unwrap();
//!
//!     // Destroy the store
//!     db.destroy().await.unwrap();
//! });
//! ```

63use crate::{
64    index::{unordered::Index, Unordered as _},
65    journal::contiguous::{
66        variable::{Config as JournalConfig, Journal},
67        Mutable as _, Reader,
68    },
69    kv::Batchable,
70    mmr::Location,
71    qmdb::{
72        any::{
73            unordered::{variable::Operation, Update},
74            VariableValue,
75        },
76        build_snapshot_from_log, delete_key,
77        operation::{Committable as _, Operation as _},
78        store::{LogStore, PrunableStore},
79        update_key, Error, FloorHelper,
80    },
81    translator::Translator,
82    Persistable,
83};
84use commonware_codec::Read;
85use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
86use commonware_utils::Array;
87use core::ops::Range;
88use std::num::{NonZeroU64, NonZeroUsize};
89use tracing::{debug, warn};
90
91/// Configuration for initializing a [Db].
92#[derive(Clone)]
93pub struct Config<T: Translator, C> {
94    /// The name of the [Storage] partition used to persist the log of operations.
95    pub log_partition: String,
96
97    /// The size of the write buffer to use for each blob in the [Journal].
98    pub log_write_buffer: NonZeroUsize,
99
100    /// Optional compression level (using `zstd`) to apply to log data before storing.
101    pub log_compression: Option<u8>,
102
103    /// The codec configuration to use for encoding and decoding log items.
104    pub log_codec_config: C,
105
106    /// The number of operations to store in each section of the [Journal].
107    pub log_items_per_section: NonZeroU64,
108
109    /// The [Translator] used by the [Index].
110    pub translator: T,
111
112    /// The [CacheRef] to use for caching data.
113    pub page_cache: CacheRef,
114}
115
116/// An unauthenticated key-value database based off of an append-only [Journal] of operations.
117pub struct Db<E, K, V, T>
118where
119    E: Storage + Clock + Metrics,
120    K: Array,
121    V: VariableValue,
122    T: Translator,
123{
124    /// A log of all [Operation]s that have been applied to the store.
125    ///
126    /// # Invariants
127    ///
128    /// - There is always at least one commit operation in the log.
129    /// - The log is never pruned beyond the inactivity floor.
130    log: Journal<E, Operation<K, V>>,
131
132    /// A snapshot of all currently active operations in the form of a map from each key to the
133    /// location containing its most recent update.
134    ///
135    /// # Invariant
136    ///
137    /// Only references operations of type [Operation::Update].
138    snapshot: Index<T, Location>,
139
140    /// The number of active keys in the store.
141    active_keys: usize,
142
143    /// A location before which all operations are "inactive" (that is, operations before this point
144    /// are over keys that have been updated by some operation at or after this point).
145    pub inactivity_floor_loc: Location,
146
147    /// The location of the last commit operation.
148    pub last_commit_loc: Location,
149
150    /// The number of _steps_ to raise the inactivity floor. Each step involves moving exactly one
151    /// active operation to tip.
152    pub steps: u64,
153}
154
155impl<E, K, V, T> Db<E, K, V, T>
156where
157    E: Storage + Clock + Metrics,
158    K: Array,
159    V: VariableValue,
160    T: Translator,
161{
162    /// Get the value of `key` in the db, or None if it has no value.
163    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
164        for &loc in self.snapshot.get(key) {
165            let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
166                unreachable!("location ({loc}) does not reference update operation");
167            };
168
169            if &k == key {
170                return Ok(Some(v));
171            }
172        }
173
174        Ok(None)
175    }
176
177    /// Whether the db currently has no active keys.
178    pub const fn is_empty(&self) -> bool {
179        self.active_keys == 0
180    }
181
182    /// Gets a [Operation] from the log at the given location. Returns [Error::OperationPruned]
183    /// if the location precedes the oldest retained location. The location is otherwise assumed
184    /// valid.
185    async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
186        let reader = self.log.reader().await;
187        assert!(*loc < reader.bounds().end);
188        reader.read(*loc).await.map_err(|e| match e {
189            crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
190            e => Error::Journal(e),
191        })
192    }
193
194    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
195    /// retained operations respectively.
196    pub async fn bounds(&self) -> std::ops::Range<Location> {
197        let bounds = self.log.reader().await.bounds();
198        Location::new(bounds.start)..Location::new(bounds.end)
199    }
200
201    /// Return the Location of the next operation appended to this db.
202    pub async fn size(&self) -> Location {
203        Location::new(self.log.size().await)
204    }
205
206    /// Return the inactivity floor location. This is the location before which all operations are
207    /// known to be inactive. Operations before this point can be safely pruned.
208    pub const fn inactivity_floor_loc(&self) -> Location {
209        self.inactivity_floor_loc
210    }
211
212    /// Get the metadata associated with the last commit.
213    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
214        let Operation::CommitFloor(metadata, _) =
215            self.log.reader().await.read(*self.last_commit_loc).await?
216        else {
217            unreachable!("last commit should be a commit floor operation");
218        };
219
220        Ok(metadata)
221    }
222
223    /// Prune historical operations prior to `prune_loc`. This does not affect the db's root
224    /// or current snapshot.
225    pub async fn prune(&self, prune_loc: Location) -> Result<(), Error> {
226        if prune_loc > self.inactivity_floor_loc {
227            return Err(Error::PruneBeyondMinRequired(
228                prune_loc,
229                self.inactivity_floor_loc,
230            ));
231        }
232
233        // Prune the log. The log will prune at section boundaries, so the actual oldest retained
234        // location may be less than requested.
235        if !self.log.prune(*prune_loc).await? {
236            return Ok(());
237        }
238
239        let bounds = self.log.reader().await.bounds();
240        let log_size = Location::new(bounds.end);
241        let oldest_retained_loc = Location::new(bounds.start);
242        debug!(
243            ?log_size,
244            ?oldest_retained_loc,
245            ?prune_loc,
246            "pruned inactive ops"
247        );
248
249        Ok(())
250    }
251
252    /// Initializes a new [Db] with the given configuration.
253    pub async fn init(
254        context: E,
255        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
256    ) -> Result<Self, Error> {
257        let mut log = Journal::<E, Operation<K, V>>::init(
258            context.with_label("log"),
259            JournalConfig {
260                partition: cfg.log_partition,
261                items_per_section: cfg.log_items_per_section,
262                compression: cfg.log_compression,
263                codec_config: cfg.log_codec_config,
264                page_cache: cfg.page_cache,
265                write_buffer: cfg.log_write_buffer,
266            },
267        )
268        .await?;
269
270        // Rewind log to remove uncommitted operations.
271        if log.rewind_to(|op| op.is_commit()).await? == 0 {
272            warn!("Log is empty, initializing new db");
273            log.append(&Operation::CommitFloor(None, Location::new(0)))
274                .await?;
275        }
276
277        // Sync the log to avoid having to repeat any recovery that may have been performed on next
278        // startup.
279        log.sync().await?;
280
281        let last_commit_loc = Location::new(
282            log.size()
283                .await
284                .checked_sub(1)
285                .expect("commit should exist"),
286        );
287
288        // Build the snapshot.
289        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
290        let (inactivity_floor_loc, active_keys) = {
291            let reader = log.reader().await;
292            let op = reader.read(*last_commit_loc).await?;
293            let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");
294            let active_keys =
295                build_snapshot_from_log(inactivity_floor_loc, &reader, &mut snapshot, |_, _| {})
296                    .await?;
297            (inactivity_floor_loc, active_keys)
298        };
299
300        Ok(Self {
301            log,
302            snapshot,
303            active_keys,
304            inactivity_floor_loc,
305            last_commit_loc,
306            steps: 0,
307        })
308    }
309
310    /// Sync all database state to disk. While this isn't necessary to ensure durability of
311    /// committed operations, periodic invocation may reduce memory usage and the time required to
312    /// recover the database on restart.
313    pub async fn sync(&self) -> Result<(), Error> {
314        self.log.sync().await.map_err(Into::into)
315    }
316
317    /// Destroy the db, removing all data from disk.
318    pub async fn destroy(self) -> Result<(), Error> {
319        self.log.destroy().await.map_err(Into::into)
320    }
321
322    const fn as_floor_helper(
323        &mut self,
324    ) -> FloorHelper<'_, Index<T, Location>, Journal<E, Operation<K, V>>> {
325        FloorHelper {
326            snapshot: &mut self.snapshot,
327            log: &mut self.log,
328        }
329    }
330
331    /// Writes a batch of key-value pairs to the database.
332    ///
333    /// For each item in the iterator:
334    /// - `(key, Some(value))` updates or creates the key with the given value
335    /// - `(key, None)` deletes the key
336    pub async fn write_batch(
337        &mut self,
338        iter: impl IntoIterator<Item = (K, Option<V>)> + Send,
339    ) -> Result<(), Error> {
340        for (key, value) in iter {
341            if let Some(value) = value {
342                let updated = {
343                    let reader = self.log.reader().await;
344                    let new_loc = reader.bounds().end;
345                    update_key(&mut self.snapshot, &reader, &key, Location::new(new_loc)).await?
346                };
347                if updated.is_some() {
348                    self.steps += 1;
349                } else {
350                    self.active_keys += 1;
351                }
352                self.log
353                    .append(&Operation::Update(Update(key, value)))
354                    .await?;
355            } else {
356                let deleted = {
357                    let reader = self.log.reader().await;
358                    delete_key(&mut self.snapshot, &reader, &key).await?
359                };
360                if deleted.is_some() {
361                    self.log.append(&Operation::Delete(key)).await?;
362                    self.steps += 1;
363                    self.active_keys -= 1;
364                }
365            }
366        }
367        Ok(())
368    }
369
370    /// Commit any pending operations to the database, ensuring their durability upon return from
371    /// this function. Also raises the inactivity floor according to the schedule. Returns the
372    /// `(start_loc, end_loc]` location range of committed operations. The end of the returned range
373    /// includes the commit operation itself, and hence will always be equal to `op_count`.
374    ///
375    /// Note that even if no operations were added since the last commit, this is a root-state
376    /// changing operation.
377    ///
378    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
379    /// recover the database on restart.
380    ///
381    pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
382        let start_loc = self.last_commit_loc + 1;
383
384        // Raise the inactivity floor by taking `self.steps` steps, plus 1 to account for the
385        // previous commit becoming inactive.
386        if self.is_empty() {
387            self.inactivity_floor_loc = self.size().await;
388            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
389        } else {
390            let steps_to_take = self.steps + 1;
391            for _ in 0..steps_to_take {
392                let loc = self.inactivity_floor_loc;
393                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
394            }
395        }
396
397        // Apply the commit operation with the new inactivity floor.
398        self.last_commit_loc = Location::new(
399            self.log
400                .append(&Operation::CommitFloor(metadata, self.inactivity_floor_loc))
401                .await?,
402        );
403
404        let range = start_loc..self.size().await;
405
406        // Commit the log to ensure durability.
407        self.log.commit().await?;
408
409        self.steps = 0;
410
411        Ok(range)
412    }
413}
414
415impl<E, K, V, T> Persistable for Db<E, K, V, T>
416where
417    E: Storage + Clock + Metrics,
418    K: Array,
419    V: VariableValue,
420    T: Translator,
421{
422    type Error = Error;
423
424    async fn commit(&self) -> Result<(), Error> {
425        // No-op, DB already in recoverable state.
426        Ok(())
427    }
428
429    async fn sync(&self) -> Result<(), Error> {
430        self.sync().await
431    }
432
433    async fn destroy(self) -> Result<(), Error> {
434        self.destroy().await
435    }
436}
437
438impl<E, K, V, T> LogStore for Db<E, K, V, T>
439where
440    E: Storage + Clock + Metrics,
441    K: Array,
442    V: VariableValue,
443    T: Translator,
444{
445    type Value = V;
446
447    async fn bounds(&self) -> std::ops::Range<Location> {
448        self.bounds().await
449    }
450
451    async fn get_metadata(&self) -> Result<Option<V>, Error> {
452        self.get_metadata().await
453    }
454}
455
456impl<E, K, V, T> PrunableStore for Db<E, K, V, T>
457where
458    E: Storage + Clock + Metrics,
459    K: Array,
460    V: VariableValue,
461    T: Translator,
462{
463    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
464        Self::prune(self, prune_loc).await
465    }
466
467    async fn inactivity_floor_loc(&self) -> Location {
468        self.inactivity_floor_loc()
469    }
470}
471
472impl<E, K, V, T> crate::kv::Gettable for Db<E, K, V, T>
473where
474    E: Storage + Clock + Metrics,
475    K: Array,
476    V: VariableValue,
477    T: Translator,
478{
479    type Key = K;
480    type Value = V;
481    type Error = Error;
482
483    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
484        self.get(key).await
485    }
486}
487
488impl<E, K, V, T> Batchable for Db<E, K, V, T>
489where
490    E: Storage + Clock + Metrics,
491    K: Array,
492    V: VariableValue,
493    T: Translator,
494{
495    async fn write_batch<'a, Iter>(&'a mut self, iter: Iter) -> Result<(), Self::Error>
496    where
497        Iter: IntoIterator<Item = (Self::Key, Option<Self::Value>)> + Send + 'a,
498        Iter::IntoIter: Send,
499    {
500        self.write_batch(iter).await
501    }
502}
503
504#[cfg(test)]
505mod test {
506    use super::*;
507    use crate::{
508        kv::{
509            tests::{assert_batchable, assert_gettable, assert_send},
510            Gettable as _, Updatable as _,
511        },
512        qmdb::store::tests::{assert_log_store, assert_prunable_store},
513        translator::TwoCap,
514    };
515    use commonware_cryptography::{
516        blake3::{Blake3, Digest},
517        Hasher as _,
518    };
519    use commonware_macros::test_traced;
520    use commonware_math::algebra::Random;
521    use commonware_runtime::{deterministic, Runner};
522    use commonware_utils::{NZUsize, NZU16, NZU64};
523    use std::num::NonZeroU16;
524
525    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
526    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
527
528    /// The type of the store used in tests.
529    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap>;
530
531    async fn create_test_store(context: deterministic::Context) -> TestStore {
532        let cfg = Config {
533            log_partition: "journal".into(),
534            log_write_buffer: NZUsize!(64 * 1024),
535            log_compression: None,
536            log_codec_config: ((), ((0..=10000).into(), ())),
537            log_items_per_section: NZU64!(7),
538            translator: TwoCap,
539            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
540        };
541        TestStore::init(context, cfg).await.unwrap()
542    }
543
544    #[test_traced("DEBUG")]
545    pub fn test_store_construct_empty() {
546        let executor = deterministic::Runner::default();
547        executor.start(|mut context| async move {
548            let mut db = create_test_store(context.with_label("store_0")).await;
549            assert_eq!(db.bounds().await.end, 1);
550            assert_eq!(db.log.bounds().await.start, 0);
551            assert_eq!(db.inactivity_floor_loc(), 0);
552            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
553            assert!(matches!(
554                db.prune(Location::new(1)).await,
555                Err(Error::PruneBeyondMinRequired(_, _))
556            ));
557            assert!(db.get_metadata().await.unwrap().is_none());
558
559            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
560            let d1 = Digest::random(&mut context);
561            let v1 = vec![1, 2, 3];
562            db.write_batch([(d1, Some(v1))]).await.unwrap();
563            drop(db);
564
565            let mut db = create_test_store(context.with_label("store_1")).await;
566            assert_eq!(db.bounds().await.end, 1);
567
568            // Test calling commit on an empty db which should make it (durably) non-empty.
569            let metadata = vec![1, 2, 3];
570            let range = Db::commit(&mut db, Some(metadata.clone())).await.unwrap();
571            assert_eq!(range.start, 1);
572            assert_eq!(range.end, 2);
573            assert_eq!(db.bounds().await.end, 2);
574            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
575            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
576
577            let mut db = create_test_store(context.with_label("store_2")).await;
578            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
579
580            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits on a
581            // non-empty db.
582            db.write_batch([(Digest::random(&mut context), Some(vec![1, 2, 3]))])
583                .await
584                .unwrap();
585            Db::commit(&mut db, None).await.unwrap();
586            for _ in 1..100 {
587                Db::commit(&mut db, None).await.unwrap();
588                // Distance should equal 3 after the second commit, with inactivity_floor
589                // referencing the previous commit operation.
590                assert!(db.bounds().await.end - db.inactivity_floor_loc <= 3);
591                assert!(db.get_metadata().await.unwrap().is_none());
592            }
593
594            db.destroy().await.unwrap();
595        });
596    }
597
598    #[test_traced("DEBUG")]
599    fn test_store_construct_basic() {
600        let executor = deterministic::Runner::default();
601
602        executor.start(|mut ctx| async move {
603            let mut db = create_test_store(ctx.with_label("store_0")).await;
604
605            // Ensure the store is empty
606            assert_eq!(db.bounds().await.end, 1);
607            assert_eq!(db.inactivity_floor_loc, 0);
608
609            let key = Digest::random(&mut ctx);
610            let value = vec![2, 3, 4, 5];
611
612            // Attempt to get a key that does not exist
613            let result = db.get(&key).await;
614            assert!(result.unwrap().is_none());
615
616            // Insert a key-value pair
617            db.write_batch([(key, Some(value.clone()))]).await.unwrap();
618
619            assert_eq!(db.bounds().await.end, 2);
620            assert_eq!(db.inactivity_floor_loc, 0);
621
622            // Fetch the value
623            let fetched_value = db.get(&key).await.unwrap();
624            assert_eq!(fetched_value.unwrap(), value);
625
626            // Simulate commit failure.
627            drop(db);
628
629            // Re-open the store
630            let mut db = create_test_store(ctx.with_label("store_1")).await;
631
632            // Ensure the re-opened store removed the uncommitted operations
633            assert_eq!(db.bounds().await.end, 1);
634            assert_eq!(db.inactivity_floor_loc, 0);
635            assert!(db.get_metadata().await.unwrap().is_none());
636
637            // Insert a key-value pair
638            db.write_batch([(key, Some(value.clone()))]).await.unwrap();
639
640            assert_eq!(db.bounds().await.end, 2);
641            assert_eq!(db.inactivity_floor_loc, 0);
642
643            // Persist the changes
644            let metadata = vec![99, 100];
645            let range = Db::commit(&mut db, Some(metadata.clone())).await.unwrap();
646            assert_eq!(range.start, 1);
647            assert_eq!(range.end, 4);
648            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
649
650            // Even though the store was pruned, the inactivity floor was raised by 2, and
651            // the old operations remain in the same blob as an active operation, so they're
652            // retained.
653            assert_eq!(db.bounds().await.end, 4);
654            assert_eq!(db.inactivity_floor_loc, 2);
655
656            // Re-open the store
657            let mut db = create_test_store(ctx.with_label("store_2")).await;
658
659            // Ensure the re-opened store retained the committed operations
660            assert_eq!(db.bounds().await.end, 4);
661            assert_eq!(db.inactivity_floor_loc, 2);
662
663            // Fetch the value, ensuring it is still present
664            let fetched_value = db.get(&key).await.unwrap();
665            assert_eq!(fetched_value.unwrap(), value);
666
667            // Insert two new k/v pairs to force pruning of the first section.
668            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
669            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
670            db.write_batch([(k1, Some(v1.clone()))]).await.unwrap();
671            db.write_batch([(k2, Some(v2.clone()))]).await.unwrap();
672
673            assert_eq!(db.bounds().await.end, 6);
674            assert_eq!(db.inactivity_floor_loc, 2);
675
676            // Make sure we can still get metadata.
677            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
678
679            let range = Db::commit(&mut db, None).await.unwrap();
680            assert_eq!(range.start, 4);
681            assert_eq!(range.end, db.bounds().await.end);
682            assert_eq!(db.get_metadata().await.unwrap(), None);
683
684            assert_eq!(db.bounds().await.end, 8);
685            assert_eq!(db.inactivity_floor_loc, 3);
686
687            // Ensure all keys can be accessed, despite the first section being pruned.
688            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
689            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
690            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
691
692            // Update existing key with modified value.
693            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
694            v1_updated.push(7);
695            db.write_batch([(k1, Some(v1_updated))]).await.unwrap();
696            Db::commit(&mut db, None).await.unwrap();
697            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);
698
699            // Create new key.
700            let k3 = Digest::random(&mut ctx);
701            db.write_batch([(k3, Some(vec![8]))]).await.unwrap();
702            Db::commit(&mut db, None).await.unwrap();
703            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);
704
705            // Destroy the store
706            db.destroy().await.unwrap();
707        });
708    }
709
710    #[test_traced("DEBUG")]
711    fn test_store_log_replay() {
712        let executor = deterministic::Runner::default();
713
714        executor.start(|mut ctx| async move {
715            let mut db = create_test_store(ctx.with_label("store_0")).await;
716
717            // Update the same key many times.
718            const UPDATES: u64 = 100;
719            let k = Digest::random(&mut ctx);
720            for _ in 0..UPDATES {
721                let v = vec![1, 2, 3, 4, 5];
722                db.write_batch([(k, Some(v.clone()))]).await.unwrap();
723            }
724
725            let iter = db.snapshot.get(&k);
726            assert_eq!(iter.count(), 1);
727
728            Db::commit(&mut db, None).await.unwrap();
729            db.sync().await.unwrap();
730            drop(db);
731
732            // Re-open the store, prune it, then ensure it replays the log correctly.
733            let db = create_test_store(ctx.with_label("store_1")).await;
734            db.prune(db.inactivity_floor_loc()).await.unwrap();
735
736            let iter = db.snapshot.get(&k);
737            assert_eq!(iter.count(), 1);
738
739            // 100 operations were applied, each triggering one step, plus the commit op.
740            assert_eq!(db.bounds().await.end, UPDATES * 2 + 2);
741            // Only the highest `Update` operation is active, plus the commit operation above it.
742            let expected_floor = UPDATES * 2;
743            assert_eq!(db.inactivity_floor_loc, expected_floor);
744
745            // All blobs prior to the inactivity floor are pruned, so the oldest retained location
746            // is the first in the last retained blob.
747            assert_eq!(
748                db.log.bounds().await.start,
749                expected_floor - expected_floor % 7
750            );
751
752            db.destroy().await.unwrap();
753        });
754    }
755
756    #[test_traced("DEBUG")]
757    fn test_store_build_snapshot_keys_with_shared_prefix() {
758        let executor = deterministic::Runner::default();
759
760        executor.start(|mut ctx| async move {
761            let mut db = create_test_store(ctx.with_label("store_0")).await;
762
763            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
764            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);
765
766            // Ensure k2 shares 2 bytes with k1 (test DB uses `TwoCap` translator.)
767            k2.0[0..2].copy_from_slice(&k1.0[0..2]);
768
769            db.write_batch([(k1, Some(v1.clone()))]).await.unwrap();
770            db.write_batch([(k2, Some(v2.clone()))]).await.unwrap();
771
772            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
773            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
774
775            Db::commit(&mut db, None).await.unwrap();
776            db.sync().await.unwrap();
777            drop(db);
778
779            // Re-open the store to ensure it builds the snapshot for the conflicting
780            // keys correctly.
781            let db = create_test_store(ctx.with_label("store_1")).await;
782
783            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
784            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
785
786            db.destroy().await.unwrap();
787        });
788    }
789
790    #[test_traced("DEBUG")]
791    fn test_store_delete() {
792        let executor = deterministic::Runner::default();
793
794        executor.start(|mut ctx| async move {
795            let mut db = create_test_store(ctx.with_label("store_0")).await;
796
797            // Insert a key-value pair
798            let k = Digest::random(&mut ctx);
799            let v = vec![1, 2, 3, 4, 5];
800            db.write_batch([(k, Some(v.clone()))]).await.unwrap();
801            Db::commit(&mut db, None).await.unwrap();
802
803            // Fetch the value
804            let fetched_value = db.get(&k).await.unwrap();
805            assert_eq!(fetched_value.unwrap(), v);
806
807            // Delete the key
808            assert!(db.get(&k).await.unwrap().is_some());
809            db.write_batch([(k, None)]).await.unwrap();
810
811            // Ensure the key is no longer present
812            let fetched_value = db.get(&k).await.unwrap();
813            assert!(fetched_value.is_none());
814            assert!(db.get(&k).await.unwrap().is_none());
815
816            // Commit the changes
817            Db::commit(&mut db, None).await.unwrap();
818
819            // Re-open the store and ensure the key is still deleted
820            let mut db = create_test_store(ctx.with_label("store_1")).await;
821            let fetched_value = db.get(&k).await.unwrap();
822            assert!(fetched_value.is_none());
823
824            // Re-insert the key
825            db.write_batch([(k, Some(v.clone()))]).await.unwrap();
826            let fetched_value = db.get(&k).await.unwrap();
827            assert_eq!(fetched_value.unwrap(), v);
828
829            // Commit the changes
830            Db::commit(&mut db, None).await.unwrap();
831
832            // Re-open the store and ensure the snapshot restores the key, after processing
833            // the delete and the subsequent set.
834            let mut db = create_test_store(ctx.with_label("store_2")).await;
835            let fetched_value = db.get(&k).await.unwrap();
836            assert_eq!(fetched_value.unwrap(), v);
837
838            // Delete a non-existent key (no-op)
839            let k_n = Digest::random(&mut ctx);
840            db.write_batch([(k_n, None)]).await.unwrap();
841
842            let range = Db::commit(&mut db, None).await.unwrap();
843            assert_eq!(range.start, 9);
844            assert_eq!(range.end, 11);
845
846            assert!(db.get(&k_n).await.unwrap().is_none());
847            // Make sure k is still there
848            assert!(db.get(&k).await.unwrap().is_some());
849
850            db.destroy().await.unwrap();
851        });
852    }
853
854    /// Tests the pruning example in the module documentation.
855    #[test_traced("DEBUG")]
856    fn test_store_pruning() {
857        let executor = deterministic::Runner::default();
858
859        executor.start(|mut ctx| async move {
860            let mut db = create_test_store(ctx.with_label("store")).await;
861
862            let k_a = Digest::random(&mut ctx);
863            let k_b = Digest::random(&mut ctx);
864
865            let v_a = vec![1];
866            let v_b = vec![];
867            let v_c = vec![4, 5, 6];
868
869            db.write_batch([(k_a, Some(v_a.clone()))]).await.unwrap();
870            db.write_batch([(k_b, Some(v_b.clone()))]).await.unwrap();
871
872            Db::commit(&mut db, None).await.unwrap();
873            assert_eq!(db.bounds().await.end, 5);
874            assert_eq!(db.inactivity_floor_loc, 2);
875            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);
876
877            db.write_batch([(k_b, Some(v_a.clone()))]).await.unwrap();
878            db.write_batch([(k_a, Some(v_c.clone()))]).await.unwrap();
879
880            Db::commit(&mut db, None).await.unwrap();
881            assert_eq!(db.bounds().await.end, 11);
882            assert_eq!(db.inactivity_floor_loc, 8);
883            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
884            assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);
885
886            db.destroy().await.unwrap();
887        });
888    }
889
890    #[test_traced("WARN")]
891    pub fn test_store_db_recovery() {
892        let executor = deterministic::Runner::default();
893        // Build a db with 1000 keys, some of which we update and some of which we delete.
894        const ELEMENTS: u64 = 1000;
895        executor.start(|context| async move {
896            let mut db = create_test_store(context.with_label("store_0")).await;
897
898            for i in 0u64..ELEMENTS {
899                let k = Blake3::hash(&i.to_be_bytes());
900                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
901                db.write_batch([(k, Some(v.clone()))]).await.unwrap();
902            }
903
904            // Simulate a failed commit and test that we rollback to the previous root.
905            drop(db);
906            let mut db = create_test_store(context.with_label("store_1")).await;
907            assert_eq!(db.bounds().await.end, 1);
908
909            // re-apply the updates and commit them this time.
910            for i in 0u64..ELEMENTS {
911                let k = Blake3::hash(&i.to_be_bytes());
912                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
913                db.write_batch([(k, Some(v.clone()))]).await.unwrap();
914            }
915            Db::commit(&mut db, None).await.unwrap();
916            let op_count = db.bounds().await.end;
917
918            // Update every 3rd key
919            for i in 0u64..ELEMENTS {
920                if i % 3 != 0 {
921                    continue;
922                }
923                let k = Blake3::hash(&i.to_be_bytes());
924                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
925                db.write_batch([(k, Some(v.clone()))]).await.unwrap();
926            }
927
928            // Simulate a failed commit and test that we rollback to the previous root.
929            drop(db);
930            let mut db = create_test_store(context.with_label("store_2")).await;
931            assert_eq!(db.bounds().await.end, op_count);
932
933            // Re-apply updates for every 3rd key and commit them this time.
934            for i in 0u64..ELEMENTS {
935                if i % 3 != 0 {
936                    continue;
937                }
938                let k = Blake3::hash(&i.to_be_bytes());
939                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
940                db.write_batch([(k, Some(v.clone()))]).await.unwrap();
941            }
942            Db::commit(&mut db, None).await.unwrap();
943            let op_count = db.bounds().await.end;
944            assert_eq!(op_count, 1673);
945            assert_eq!(db.snapshot.items(), 1000);
946
947            // Delete every 7th key
948            for i in 0u64..ELEMENTS {
949                if i % 7 != 1 {
950                    continue;
951                }
952                let k = Blake3::hash(&i.to_be_bytes());
953                db.write_batch([(k, None)]).await.unwrap();
954            }
955
956            // Simulate a failed commit and test that we rollback to the previous root.
957            drop(db);
958            let db = create_test_store(context.with_label("store_3")).await;
959            assert_eq!(db.bounds().await.end, op_count);
960
961            // Sync and reopen the store to ensure the final commit is preserved.
962            db.sync().await.unwrap();
963            drop(db);
964            let mut db = create_test_store(context.with_label("store_4")).await;
965            assert_eq!(db.bounds().await.end, op_count);
966
967            // Re-delete every 7th key and commit this time.
968            for i in 0u64..ELEMENTS {
969                if i % 7 != 1 {
970                    continue;
971                }
972                let k = Blake3::hash(&i.to_be_bytes());
973                db.write_batch([(k, None)]).await.unwrap();
974            }
975            Db::commit(&mut db, None).await.unwrap();
976
977            assert_eq!(db.bounds().await.end, 1961);
978            assert_eq!(db.inactivity_floor_loc, 756);
979
980            db.prune(db.inactivity_floor_loc()).await.unwrap();
981            assert_eq!(db.log.bounds().await.start, 756 /*- 756 % 7 == 0*/);
982            assert_eq!(db.snapshot.items(), 857);
983
984            db.destroy().await.unwrap();
985        });
986    }
987
988    #[test_traced("DEBUG")]
989    fn test_store_batchable() {
990        let executor = deterministic::Runner::default();
991
992        executor.start(|mut ctx| async move {
993            let mut db = create_test_store(ctx.with_label("store_0")).await;
994
995            // Ensure the store is empty
996            assert_eq!(db.bounds().await.end, 1);
997            assert_eq!(db.inactivity_floor_loc, 0);
998
999            let key = Digest::random(&mut ctx);
1000            let value = vec![2, 3, 4, 5];
1001
1002            let mut batch = db.new_batch();
1003
1004            // Attempt to get a key that does not exist
1005            let result = batch.get(&key).await;
1006            assert!(result.unwrap().is_none());
1007
1008            // Insert a key-value pair
1009            batch.update(key, value.clone()).await.unwrap();
1010
1011            assert_eq!(db.bounds().await.end, 1); // The batch is not applied yet
1012            assert_eq!(db.inactivity_floor_loc, 0);
1013
1014            // Fetch the value
1015            let fetched_value = batch.get(&key).await.unwrap();
1016            assert_eq!(fetched_value.unwrap(), value);
1017            db.write_batch(batch.into_iter()).await.unwrap();
1018            drop(db);
1019
1020            // Re-open the store
1021            let mut db = create_test_store(ctx.with_label("store_1")).await;
1022
1023            // Ensure the batch was not applied since we didn't commit.
1024            assert_eq!(db.bounds().await.end, 1);
1025            assert_eq!(db.inactivity_floor_loc, 0);
1026            assert!(db.get_metadata().await.unwrap().is_none());
1027
1028            // Insert a key-value pair
1029            let mut batch = db.new_batch();
1030            batch.update(key, value.clone()).await.unwrap();
1031
1032            // Persist the changes
1033            db.write_batch(batch.into_iter()).await.unwrap();
1034            assert_eq!(db.bounds().await.end, 2);
1035            assert_eq!(db.inactivity_floor_loc, 0);
1036            let metadata = vec![99, 100];
1037            let range = Db::commit(&mut db, Some(metadata.clone())).await.unwrap();
1038            assert_eq!(range.start, 1);
1039            assert_eq!(range.end, 4);
1040            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
1041            drop(db);
1042
1043            // Re-open the store
1044            let db = create_test_store(ctx.with_label("store_2")).await;
1045
1046            // Ensure the re-opened store retained the committed operations
1047            assert_eq!(db.bounds().await.end, 4);
1048            assert_eq!(db.inactivity_floor_loc, 2);
1049
1050            // Fetch the value, ensuring it is still present
1051            let fetched_value = db.get(&key).await.unwrap();
1052            assert_eq!(fetched_value.unwrap(), value);
1053
1054            // Destroy the store
1055            db.destroy().await.unwrap();
1056        });
1057    }
1058
    /// Compile-time check that the store's read-path futures (log access,
    /// pruning, gets, and `sync`) are `Send`. Never called at runtime — it is
    /// `dead_code` by design; the build breaks if any of these futures stops
    /// being `Send`.
    #[allow(dead_code)]
    fn assert_read_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_gettable(db, &key);
        assert_send(db.sync());
    }
1066
    /// Compile-time check that the store's write-path futures (`write_batch`
    /// for both sets and deletes, plus the batch API) are `Send`. Never called
    /// at runtime — it is `dead_code` by design; the build breaks if any of
    /// these futures stops being `Send`.
    #[allow(dead_code)]
    fn assert_write_futures_are_send(
        db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap>,
        key: Digest,
        value: Vec<u8>,
    ) {
        assert_log_store(db);
        assert_gettable(db, &key);
        // Both a set and a delete are exercised through `write_batch`.
        assert_send(db.write_batch([(key, Some(value.clone()))]));
        assert_send(db.write_batch([(key, None)]));
        assert_batchable(db, key, value);
    }
1079
    /// Compile-time check that the `commit` future is `Send`. Never called at
    /// runtime — it is `dead_code` by design; the build breaks if the commit
    /// future stops being `Send`.
    #[allow(dead_code)]
    fn assert_commit_is_send(db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap>) {
        assert_send(Db::commit(db, None));
    }
1084}