// commonware_storage/qmdb/store/db.rs
1//! A mutable key-value database that supports variable-sized values, but without authentication.
2//!
3//! # Example
4//!
5//! ```rust
6//! use commonware_storage::{
7//!     journal::contiguous::variable::Config as JournalConfig,
8//!     qmdb::store::db::{Config, Db},
9//!     translator::TwoCap,
10//! };
11//! use commonware_utils::{NZUsize, NZU16, NZU64};
12//! use commonware_cryptography::{blake3::Digest, Digest as _};
13//! use commonware_math::algebra::Random;
14//! use commonware_runtime::{buffer::paged::CacheRef, deterministic::Runner, Metrics, Runner as _};
15//!
16//! use std::num::NonZeroU16;
17//! const PAGE_SIZE: NonZeroU16 = NZU16!(8192);
18//! const PAGE_CACHE_SIZE: usize = 100;
19//!
20//! let executor = Runner::default();
21//! executor.start(|mut ctx| async move {
22//!     let config = Config {
23//!         log: JournalConfig {
24//!             partition: "test-partition".into(),
25//!             write_buffer: NZUsize!(64 * 1024),
26//!             compression: None,
27//!             codec_config: ((), ()),
28//!             items_per_section: NZU64!(4),
29//!             page_cache: CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
30//!         },
31//!         translator: TwoCap,
32//!     };
33//!     let mut db =
34//!         Db::<_, Digest, Digest, TwoCap>::init(ctx.with_label("store"), config)
35//!             .await
36//!             .unwrap();
37//!
38//!     // Insert a key-value pair
39//!     let k = Digest::random(&mut ctx);
40//!     let v = Digest::random(&mut ctx);
41//!     let metadata = Some(Digest::random(&mut ctx));
42//!     db.apply_batch(db.new_batch().update(k, v).finalize(metadata)).await.unwrap();
43//!     db.commit().await.unwrap();
44//!
45//!     // Fetch the value
46//!     let fetched_value = db.get(&k).await.unwrap();
47//!     assert_eq!(fetched_value.unwrap(), v);
48//!
49//!     // Delete the key's value
50//!     db.apply_batch(db.new_batch().delete(k).finalize(None)).await.unwrap();
51//!     db.commit().await.unwrap();
52//!
53//!     // Fetch the value
54//!     let fetched_value = db.get(&k).await.unwrap();
55//!     assert!(fetched_value.is_none());
56//!
57//!     // Destroy the store
58//!     db.destroy().await.unwrap();
59//! });
60//! ```
61//!
62//! ```ignore
63//! // Advanced mode: while the previous batch is being committed, build exactly
64//! // one child batch from the newly published state.
65//! db.apply_batch(db.new_batch().update(key_a, value_a).finalize(None)).await?;
66//!
67//! let (child_finalized, commit_result) = futures::join!(
68//!     async { db.new_batch().update(key_b, value_b).finalize(None) },
69//!     db.commit(),
70//! );
71//! commit_result?;
72//!
73//! db.apply_batch(child_finalized).await?;
74//! db.commit().await?;
75//! ```
76
77use crate::{
78    index::{unordered::Index, Unordered as _},
79    journal::contiguous::{
80        variable::{Config as JournalConfig, Journal},
81        Mutable as _, Reader,
82    },
83    merkle::mmr::Location,
84    qmdb::{
85        any::{
86            unordered::{variable::Operation, Update},
87            VariableValue,
88        },
89        build_snapshot_from_log, delete_key,
90        operation::{Committable as _, Key, Operation as _},
91        update_key, FloorHelper,
92    },
93    translator::Translator,
94    Context, Persistable,
95};
96use commonware_codec::{CodecShared, Read};
97use commonware_utils::Array;
98use core::ops::Range;
99use std::collections::BTreeMap;
100use tracing::{debug, warn};
101
/// Crate error type specialized to this store's MMR family.
type Error = crate::qmdb::Error<crate::mmr::Family>;
103
/// Configuration for initializing a [Db].
///
/// `C` is the codec configuration type for entries in the operations journal.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Configuration for the variable-size operations log journal.
    pub log: JournalConfig<C>,

    /// The [Translator] used by the [Index].
    pub translator: T,
}
113
/// A finalized batch of writes and deletes ready to be applied to the store.
pub struct Changeset<K: Key, V: CodecShared + Clone> {
    // Each key maps to `Some(value)` for an update or `None` for a deletion.
    diff: BTreeMap<K, Option<V>>,
    // Optional metadata recorded with the commit operation when applied.
    metadata: Option<V>,
}

impl<K: Key, V: CodecShared + Clone> Changeset<K, V> {
    /// Consume the changeset, returning its diff and metadata.
    fn into_parts(self) -> (BTreeMap<K, Option<V>>, Option<V>) {
        (self.diff, self.metadata)
    }
}
125
126impl<K: Key, V: CodecShared + Clone> FromIterator<(K, Option<V>)> for Changeset<K, V> {
127    fn from_iter<TIter: IntoIterator<Item = (K, Option<V>)>>(iter: TIter) -> Self {
128        Self {
129            diff: iter.into_iter().collect(),
130            metadata: None,
131        }
132    }
133}
134
135impl<K: Key, V: CodecShared + Clone, const N: usize> From<[(K, Option<V>); N]> for Changeset<K, V> {
136    fn from(items: [(K, Option<V>); N]) -> Self {
137        items.into_iter().collect()
138    }
139}
140
/// A mutable batch of writes and deletes staged against the current store state.
pub struct Batch<'a, E, K, V, T>
where
    E: Context,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    // The store the batch reads through for keys it has not modified.
    db: &'a Db<E, K, V, T>,
    // Staged changes: `Some(value)` is an update, `None` is a deletion.
    diff: BTreeMap<K, Option<V>>,
}
152
153impl<'a, E, K, V, T> Batch<'a, E, K, V, T>
154where
155    E: Context,
156    K: Array,
157    V: VariableValue,
158    T: Translator,
159{
160    const fn new(db: &'a Db<E, K, V, T>) -> Self {
161        Self {
162            db,
163            diff: BTreeMap::new(),
164        }
165    }
166
167    /// Finalize the batch into a changeset that can be applied to the store.
168    pub fn finalize(self, metadata: Option<V>) -> Changeset<K, V> {
169        Changeset {
170            diff: self.diff,
171            metadata,
172        }
173    }
174
175    /// Get the value of `key` in the batch, or the value in the store if it has
176    /// not been modified by the batch.
177    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
178        if let Some(value) = self.diff.get(key) {
179            return Ok(value.clone());
180        }
181        self.db.get(key).await
182    }
183
184    /// Update the value of `key` in the batch.
185    pub fn update(mut self, key: K, value: V) -> Self {
186        self.diff.insert(key, Some(value));
187        self
188    }
189
190    /// Delete the value of `key` in the batch.
191    pub fn delete(mut self, key: K) -> Self {
192        self.diff.insert(key, None);
193        self
194    }
195}
196
/// An unauthenticated key-value database based off of an append-only [Journal] of operations.
///
/// On startup, state is recovered by rebuilding the snapshot from the log starting at the
/// inactivity floor (see [Db::init]).
pub struct Db<E, K, V, T>
where
    E: Context,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// A log of all [Operation]s that have been applied to the store.
    ///
    /// # Invariants
    ///
    /// - There is always at least one commit operation in the log.
    /// - The log is never pruned beyond the inactivity floor.
    log: Journal<E, Operation<crate::mmr::Family, K, V>>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location containing its most recent update.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Update].
    snapshot: Index<T, Location>,

    /// The number of active keys in the store.
    active_keys: usize,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    pub inactivity_floor_loc: Location,

    /// The location of the last commit operation.
    pub last_commit_loc: Location,

    /// The number of _steps_ to raise the inactivity floor. Each step involves moving exactly one
    /// active operation to tip. Accumulated during [Db::apply_batch] and reset after each commit op.
    pub steps: u64,
}
235
236impl<E, K, V, T> Db<E, K, V, T>
237where
238    E: Context,
239    K: Array,
240    V: VariableValue,
241    T: Translator,
242{
243    /// Get the value of `key` in the db, or None if it has no value.
244    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
245        for &loc in self.snapshot.get(key) {
246            let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
247                unreachable!("location ({loc}) does not reference update operation");
248            };
249
250            if &k == key {
251                return Ok(Some(v));
252            }
253        }
254
255        Ok(None)
256    }
257
    /// Returns a new empty batch of changes.
    ///
    /// Stage changes with [Batch::update]/[Batch::delete], then apply via [Db::apply_batch].
    pub const fn new_batch(&self) -> Batch<'_, E, K, V, T> {
        Batch::new(self)
    }
262
    /// Whether the db currently has no active keys.
    ///
    /// Keys whose latest operation is a delete do not count as active.
    pub const fn is_empty(&self) -> bool {
        self.active_keys == 0
    }
267
    /// Gets a [Operation] from the log at the given location. Returns [Error::OperationPruned]
    /// if the location precedes the oldest retained location. The location is otherwise assumed
    /// valid.
    async fn get_op(&self, loc: Location) -> Result<Operation<crate::mmr::Family, K, V>, Error> {
        let reader = self.log.reader().await;
        // Reading past the end of the log is a caller bug, not a recoverable error.
        assert!(*loc < reader.bounds().end);
        reader.read(*loc).await.map_err(|e| match e {
            // Translate journal-level pruning errors into the store's error type.
            crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
            e => Error::Journal(e),
        })
    }
279
280    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
281    /// retained operations respectively.
282    pub async fn bounds(&self) -> std::ops::Range<Location> {
283        let bounds = self.log.reader().await.bounds();
284        Location::new(bounds.start)..Location::new(bounds.end)
285    }
286
    /// Return the Location of the next operation appended to this db.
    pub async fn size(&self) -> Location {
        // The journal's size is the position one past the newest operation.
        Location::new(self.log.size().await)
    }
291
    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    ///
    /// See [Db::prune].
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }
297
298    /// Get the metadata associated with the last commit.
299    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
300        let Operation::CommitFloor(metadata, _) =
301            self.log.reader().await.read(*self.last_commit_loc).await?
302        else {
303            unreachable!("last commit should be a commit floor operation");
304        };
305
306        Ok(metadata)
307    }
308
    /// Prune historical operations prior to `prune_loc`. This does not affect the db's root
    /// or current snapshot.
    ///
    /// # Errors
    ///
    /// Returns [Error::PruneBeyondMinRequired] if `prune_loc` exceeds the inactivity floor,
    /// since operations at or above the floor may still be active.
    pub async fn prune(&self, prune_loc: Location) -> Result<(), Error> {
        if prune_loc > self.inactivity_floor_loc {
            return Err(Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }

        // Prune the log. The log will prune at section boundaries, so the actual oldest retained
        // location may be less than requested.
        if !self.log.prune(*prune_loc).await? {
            // Nothing was actually pruned; skip logging.
            return Ok(());
        }

        let bounds = self.log.reader().await.bounds();
        let log_size = Location::new(bounds.end);
        let oldest_retained_loc = Location::new(bounds.start);
        debug!(
            ?log_size,
            ?oldest_retained_loc,
            ?prune_loc,
            "pruned inactive ops"
        );

        Ok(())
    }
337
    /// Initializes a new [Db] with the given configuration.
    ///
    /// Recovery: operations appended after the last commit are rewound, then the in-memory
    /// snapshot is rebuilt by replaying the log from the recovered inactivity floor.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<crate::mmr::Family, K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut log = Journal::<E, Operation<crate::mmr::Family, K, V>>::init(
            context.with_label("log"),
            cfg.log,
        )
        .await?;

        // Rewind log to remove uncommitted operations.
        if log.rewind_to(|op| op.is_commit()).await? == 0 {
            // Fresh (or fully rewound) log: seed it with an initial commit so the
            // "at least one commit" invariant holds.
            warn!("Log is empty, initializing new db");
            log.append(&Operation::CommitFloor(None, Location::new(0)))
                .await?;
        }

        // Sync the log to avoid having to repeat any recovery that may have been performed on next
        // startup.
        log.sync().await?;

        // The last operation in the log is now guaranteed to be a commit.
        let last_commit_loc = Location::new(
            log.size()
                .await
                .checked_sub(1)
                .expect("commit should exist"),
        );

        // Build the snapshot.
        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
        let (inactivity_floor_loc, active_keys) = {
            let reader = log.reader().await;
            let op = reader.read(*last_commit_loc).await?;
            let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");
            let active_keys =
                build_snapshot_from_log(inactivity_floor_loc, &reader, &mut snapshot, |_, _| {})
                    .await?;
            (inactivity_floor_loc, active_keys)
        };

        Ok(Self {
            log,
            snapshot,
            active_keys,
            inactivity_floor_loc,
            last_commit_loc,
            steps: 0,
        })
    }
388
    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    ///
    /// See [Db::commit] for durably persisting applied batches.
    pub async fn sync(&self) -> Result<(), Error> {
        self.log.sync().await.map_err(Into::into)
    }
395
    /// Destroy the db, removing all data from disk.
    ///
    /// Consumes the db; all state (including committed data) is deleted.
    pub async fn destroy(self) -> Result<(), Error> {
        self.log.destroy().await.map_err(Into::into)
    }
400
    /// Borrow the snapshot and log together as a [FloorHelper], which implements the
    /// shared floor-raising logic used by [Db::apply_batch].
    #[allow(clippy::type_complexity)]
    const fn as_floor_helper(
        &mut self,
    ) -> FloorHelper<
        '_,
        crate::mmr::Family,
        Index<T, Location>,
        Journal<E, Operation<crate::mmr::Family, K, V>>,
    > {
        FloorHelper {
            snapshot: &mut self.snapshot,
            log: &mut self.log,
        }
    }
415
    /// Applies a finalized batch to the in-memory database state and appends its operations to the
    /// journal, returning the range of written locations.
    ///
    /// This publishes the batch to the in-memory database state and appends it to the journal, but
    /// does not durably persist it. Call [`Db::commit`] or [`Db::sync`] to guarantee durability.
    pub async fn apply_batch(&mut self, batch: Changeset<K, V>) -> Result<Range<Location>, Error> {
        // The first written location is the one immediately after the last commit.
        let start_loc = self.last_commit_loc + 1;
        let (diff, metadata) = batch.into_parts();

        for (key, value) in diff {
            if let Some(value) = value {
                let updated = {
                    let reader = self.log.reader().await;
                    // The update will be appended at the current end of the log.
                    let new_loc = reader.bounds().end;
                    update_key::<crate::mmr::Family, _, _>(
                        &mut self.snapshot,
                        &reader,
                        &key,
                        Location::new(new_loc),
                    )
                    .await?
                };
                if updated.is_some() {
                    // An existing entry became inactive, so a floor-raising step is owed.
                    self.steps += 1;
                } else {
                    // Brand-new key.
                    self.active_keys += 1;
                }
                self.log
                    .append(&Operation::Update(Update(key, value)))
                    .await?;
            } else {
                let deleted = {
                    let reader = self.log.reader().await;
                    delete_key::<crate::mmr::Family, _, _>(&mut self.snapshot, &reader, &key)
                        .await?
                };
                // Deleting a non-existent key is a no-op (nothing is appended).
                if deleted.is_some() {
                    self.log.append(&Operation::Delete(key)).await?;
                    self.steps += 1;
                    self.active_keys -= 1;
                }
            }
        }

        // Raise the inactivity floor by `self.steps` steps, plus 1 to account for the previous
        // commit becoming inactive.
        if self.is_empty() {
            // No active keys: everything up to the tip is inactive.
            self.inactivity_floor_loc = self.size().await;
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            let steps_to_take = self.steps + 1;
            for _ in 0..steps_to_take {
                let loc = self.inactivity_floor_loc;
                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
            }
        }

        // Append the commit operation with the new inactivity floor.
        self.last_commit_loc = Location::new(
            self.log
                .append(&Operation::CommitFloor(metadata, self.inactivity_floor_loc))
                .await?,
        );

        // All owed steps were consumed above.
        self.steps = 0;

        let end_loc = self.size().await;
        Ok(start_loc..end_loc)
    }
485
    /// Durably commit the journal state published by prior [`Db::apply_batch`] calls.
    ///
    /// This persists already-appended operations; it does not append any new ones.
    pub async fn commit(&self) -> Result<(), Error> {
        self.log.commit().await.map_err(Into::into)
    }
490}
491
492impl<E, K, V, T> Persistable for Db<E, K, V, T>
493where
494    E: Context,
495    K: Array,
496    V: VariableValue,
497    T: Translator,
498{
499    type Error = Error;
500
501    async fn commit(&self) -> Result<(), Error> {
502        Self::commit(self).await
503    }
504
505    async fn sync(&self) -> Result<(), Error> {
506        self.sync().await
507    }
508
509    async fn destroy(self) -> Result<(), Error> {
510        self.destroy().await
511    }
512}
513
514#[cfg(test)]
515mod test {
516    use super::*;
517    use crate::translator::TwoCap;
518    use commonware_cryptography::{
519        blake3::{Blake3, Digest},
520        Hasher as _,
521    };
522    use commonware_macros::test_traced;
523    use commonware_math::algebra::Random;
524    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
525    use commonware_utils::{NZUsize, NZU16, NZU64};
526    use std::num::{NonZeroU16, NonZeroUsize};
527
    // Deliberately small page size and cache so tests exercise the paged buffer.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
530
    /// The type of the store used in tests.
    ///
    /// Uses `Vec<u8>` values so tests can exercise variable-sized entries.
    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap>;
533
    /// Construct a [TestStore] with a small section size (7 items per section) so tests
    /// hit section-boundary behavior (e.g. pruning) quickly.
    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log: JournalConfig {
                partition: "journal".into(),
                write_buffer: NZUsize!(64 * 1024),
                compression: None,
                codec_config: ((), ((0..=10000).into(), ())),
                items_per_section: NZU64!(7),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            },
            translator: TwoCap,
        };
        TestStore::init(context, cfg).await.unwrap()
    }
548
    /// Collect `iter` into a [Changeset] (no metadata) and apply it to `db`, returning
    /// the range of locations written by the batch.
    async fn apply_entries(
        db: &mut TestStore,
        iter: impl IntoIterator<Item = (Digest, Option<Vec<u8>>)> + Send,
    ) -> Range<Location> {
        db.apply_batch(iter.into_iter().collect()).await.unwrap()
    }
555
556    #[test_traced("DEBUG")]
557    pub fn test_store_construct_empty() {
558        let executor = deterministic::Runner::default();
559        executor.start(|mut context| async move {
560            let mut db = create_test_store(context.with_label("store_0")).await;
561            assert_eq!(db.bounds().await.end, 1);
562            assert_eq!(db.log.bounds().await.start, 0);
563            assert_eq!(db.inactivity_floor_loc(), 0);
564            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
565            assert!(matches!(
566                db.prune(Location::new(1)).await,
567                Err(Error::PruneBeyondMinRequired(_, _))
568            ));
569            assert!(db.get_metadata().await.unwrap().is_none());
570
571            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
572            let d1 = Digest::random(&mut context);
573            let v1 = vec![1, 2, 3];
574            apply_entries(&mut db, [(d1, Some(v1))]).await;
575            drop(db);
576
577            let mut db = create_test_store(context.with_label("store_1")).await;
578            assert_eq!(db.bounds().await.end, 1);
579
580            // Test calling commit on an empty db which should make it (durably) non-empty.
581            let metadata = vec![1, 2, 3];
582            let batch = db.new_batch().finalize(Some(metadata.clone()));
583            let range = db.apply_batch(batch).await.unwrap();
584            assert_eq!(range.start, 1);
585            assert_eq!(range.end, 2);
586            db.commit().await.unwrap();
587            assert_eq!(db.bounds().await.end, 2);
588            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
589            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
590
591            let mut db = create_test_store(context.with_label("store_2")).await;
592            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
593
594            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits on a
595            // non-empty db.
596            apply_entries(
597                &mut db,
598                [(Digest::random(&mut context), Some(vec![1, 2, 3]))],
599            )
600            .await;
601            db.commit().await.unwrap();
602            for _ in 1..100 {
603                db.apply_batch(db.new_batch().finalize(None)).await.unwrap();
604                db.commit().await.unwrap();
605                // Distance should equal 3 after the second commit, with inactivity_floor
606                // referencing the previous commit operation.
607                assert!(db.bounds().await.end - db.inactivity_floor_loc <= 3);
608                assert!(db.get_metadata().await.unwrap().is_none());
609            }
610
611            db.destroy().await.unwrap();
612        });
613    }
614
    /// Basic insert/get flow, recovery of uncommitted vs. committed operations across
    /// restarts, metadata replacement, and key access after section pruning.
    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            // Ensure the store is empty
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Attempt to get a key that does not exist
            let result = db.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair. apply_batch writes the Update, a floor-raise move, and a
            // CommitFloor: 3 new ops on top of the initial commit.
            apply_entries(&mut db, [(key, Some(value.clone()))]).await;

            assert_eq!(*db.bounds().await.end, 4);
            assert_eq!(*db.inactivity_floor_loc, 2);

            // Fetch the value
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Simulate commit failure: drop without commit. The small batch fits in a single
            // journal section so it is not auto-synced.
            drop(db);

            // Re-open the store
            let mut db = create_test_store(ctx.with_label("store_1")).await;

            // Ensure the re-opened store removed the uncommitted operations
            assert_eq!(*db.bounds().await.end, 1);
            assert_eq!(*db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            // Insert a key-value pair and persist with metadata.
            let metadata = vec![99, 100];
            let range = db
                .apply_batch(
                    db.new_batch()
                        .update(key, value.clone())
                        .finalize(Some(metadata.clone())),
                )
                .await
                .unwrap();
            assert_eq!(*range.start, 1);
            assert_eq!(*range.end, 4);
            db.commit().await.unwrap();
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            assert_eq!(*db.bounds().await.end, 4);
            assert_eq!(*db.inactivity_floor_loc, 2);

            // Re-open the store
            let mut db = create_test_store(ctx.with_label("store_2")).await;

            // Ensure the re-opened store retained the committed operations
            assert_eq!(*db.bounds().await.end, 4);
            assert_eq!(*db.inactivity_floor_loc, 2);

            // Fetch the value, ensuring it is still present
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Insert two new k/v pairs to force pruning of the first section.
            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
            apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;

            assert_eq!(*db.bounds().await.end, 10);
            assert_eq!(*db.inactivity_floor_loc, 5);

            // Each apply_entries writes a CommitFloor with None metadata, replacing
            // the previously committed metadata.
            assert_eq!(db.get_metadata().await.unwrap(), None);

            db.commit().await.unwrap();
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // commit() is just an fsync now, so bounds and floor are unchanged.
            assert_eq!(*db.bounds().await.end, 10);
            assert_eq!(*db.inactivity_floor_loc, 5);

            // Ensure all keys can be accessed, despite the first section being pruned.
            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            // Update existing key with modified value.
            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
            v1_updated.push(7);
            apply_entries(&mut db, [(k1, Some(v1_updated))]).await;
            db.commit().await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);

            // Create new key.
            let k3 = Digest::random(&mut ctx);
            apply_entries(&mut db, [(k3, Some(vec![8]))]).await;
            db.commit().await.unwrap();
            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);

            // Destroy the store
            db.destroy().await.unwrap();
        });
    }
727
    /// Repeated updates to one key leave exactly one snapshot entry, and replaying the
    /// log on restart (plus pruning) reconstructs the same state.
    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            // Update the same key many times.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
            }

            // Each re-update replaces the previous snapshot entry; only one remains.
            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            db.commit().await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Re-open the store, prune it, then ensure it replays the log correctly.
            let db = create_test_store(ctx.with_label("store_1")).await;
            db.prune(db.inactivity_floor_loc()).await.unwrap();

            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // First apply_entries: Update + 1 move + CommitFloor = 3 ops. Subsequent 99: Update + 2
            // moves + CommitFloor = 4 ops each. Total: 1 (init) + 3 + 99*4 = 400.
            assert_eq!(*db.bounds().await.end, 400);
            // Only the last Update and CommitFloor are active → floor = 398.
            assert_eq!(*db.inactivity_floor_loc, 398);
            let floor = db.inactivity_floor_loc;

            // All blobs prior to the inactivity floor are pruned, so the oldest retained location
            // is the first in the last retained blob.
            assert_eq!(db.log.bounds().await.start, *floor - *floor % 7);

            db.destroy().await.unwrap();
        });
    }
771
    /// Keys whose first two bytes collide under the `TwoCap` translator must both be
    /// retrievable, before and after rebuilding the snapshot from the log.
    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Ensure k2 shares 2 bytes with k1 (test DB uses `TwoCap` translator.)
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
            apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            db.commit().await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Re-open the store to ensure it builds the snapshot for the conflicting
            // keys correctly.
            let db = create_test_store(ctx.with_label("store_1")).await;

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            db.destroy().await.unwrap();
        });
    }
805
    /// Delete/re-insert lifecycle: deletion hides the key, survives restart, deleting a
    /// missing key appends nothing, and re-insertion restores visibility.
    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            // Insert a key-value pair
            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            apply_entries(&mut db, [(k, Some(v.clone()))]).await;
            db.commit().await.unwrap();

            // Fetch the value
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete the key
            assert!(db.get(&k).await.unwrap().is_some());
            apply_entries(&mut db, [(k, None)]).await;

            // Ensure the key is no longer present
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(db.get(&k).await.unwrap().is_none());

            // Commit the changes
            db.commit().await.unwrap();

            // Re-open the store and ensure the key is still deleted
            let mut db = create_test_store(ctx.with_label("store_1")).await;
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the key
            apply_entries(&mut db, [(k, Some(v.clone()))]).await;
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Commit the changes
            db.commit().await.unwrap();

            // Re-open the store and ensure the snapshot restores the key, after processing
            // the delete and the subsequent set.
            let mut db = create_test_store(ctx.with_label("store_2")).await;
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete a non-existent key (no-op)
            let k_n = Digest::random(&mut ctx);
            let range = apply_entries(&mut db, [(k_n, None)]).await;
            // Only the floor-raise move(s) and CommitFloor are written — no Delete op.
            assert_eq!(range.start, 9);
            assert_eq!(range.end, 11);
            db.commit().await.unwrap();

            assert!(db.get(&k_n).await.unwrap().is_none());
            // Make sure k is still there
            assert!(db.get(&k).await.unwrap().is_some());

            db.destroy().await.unwrap();
        });
    }
868
869    /// Tests the pruning example in the module documentation.
870    #[test_traced("DEBUG")]
871    fn test_store_pruning() {
872        let executor = deterministic::Runner::default();
873
874        executor.start(|mut ctx| async move {
875            let mut db = create_test_store(ctx.with_label("store")).await;
876
877            let k_a = Digest::random(&mut ctx);
878            let k_b = Digest::random(&mut ctx);
879
880            let v_a = vec![1];
881            let v_b = vec![];
882            let v_c = vec![4, 5, 6];
883
884            apply_entries(&mut db, [(k_a, Some(v_a.clone()))]).await;
885            apply_entries(&mut db, [(k_b, Some(v_b.clone()))]).await;
886
887            db.commit().await.unwrap();
888            assert_eq!(*db.bounds().await.end, 7);
889            assert_eq!(*db.inactivity_floor_loc, 3);
890            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);
891
892            apply_entries(&mut db, [(k_b, Some(v_a.clone()))]).await;
893            apply_entries(&mut db, [(k_a, Some(v_c.clone()))]).await;
894
895            db.commit().await.unwrap();
896            assert_eq!(*db.bounds().await.end, 15);
897            assert_eq!(*db.inactivity_floor_loc, 12);
898            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
899            assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);
900
901            db.destroy().await.unwrap();
902        });
903    }
904
905    #[test_traced("WARN")]
906    pub fn test_store_db_recovery() {
907        let executor = deterministic::Runner::default();
908        // Build a db with 1000 keys, some of which we update and some of which we delete.
909        const ELEMENTS: u64 = 1000;
910        executor.start(|context| async move {
911            let db = create_test_store(context.with_label("store_0")).await;
912
913            // Simulate building batches but not applying them (data is not persisted).
914            {
915                let mut batch = db.new_batch();
916                for i in 0u64..ELEMENTS {
917                    let k = Blake3::hash(&i.to_be_bytes());
918                    let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
919                    batch = batch.update(k, v);
920                }
921                // Drop the batch without applying -- simulates a failure before apply.
922            }
923            drop(db);
924            let mut db = create_test_store(context.with_label("store_1")).await;
925            assert_eq!(*db.bounds().await.end, 1);
926
927            // Apply the updates and commit them.
928            for i in 0u64..ELEMENTS {
929                let k = Blake3::hash(&i.to_be_bytes());
930                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
931                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
932            }
933            db.commit().await.unwrap();
934
935            // Update every 3rd key and commit.
936            for i in 0u64..ELEMENTS {
937                if i % 3 != 0 {
938                    continue;
939                }
940                let k = Blake3::hash(&i.to_be_bytes());
941                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
942                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
943            }
944            db.commit().await.unwrap();
945            assert_eq!(db.snapshot.items(), 1000);
946
947            // Delete every 7th key and commit.
948            for i in 0u64..ELEMENTS {
949                if i % 7 != 1 {
950                    continue;
951                }
952                let k = Blake3::hash(&i.to_be_bytes());
953                apply_entries(&mut db, [(k, None)]).await;
954            }
955            db.commit().await.unwrap();
956            let final_count = db.bounds().await.end;
957            let final_floor = db.inactivity_floor_loc;
958
959            // Sync and reopen the store to ensure the state is preserved.
960            db.sync().await.unwrap();
961            drop(db);
962            let db = create_test_store(context.with_label("store_2")).await;
963            assert_eq!(db.bounds().await.end, final_count);
964            assert_eq!(db.inactivity_floor_loc, final_floor);
965
966            db.prune(db.inactivity_floor_loc()).await.unwrap();
967            assert_eq!(db.log.bounds().await.start, *final_floor - *final_floor % 7);
968            assert_eq!(db.snapshot.items(), 857);
969
970            db.destroy().await.unwrap();
971        });
972    }
973
974    #[test_traced("DEBUG")]
975    fn test_store_batch() {
976        let executor = deterministic::Runner::default();
977
978        executor.start(|mut ctx| async move {
979            let mut db = create_test_store(ctx.with_label("store_0")).await;
980
981            // Ensure the store is empty
982            assert_eq!(db.bounds().await.end, 1);
983            assert_eq!(db.inactivity_floor_loc, 0);
984
985            let key = Digest::random(&mut ctx);
986            let value = vec![2, 3, 4, 5];
987
988            let batch = db.new_batch();
989
990            // Attempt to get a key that does not exist
991            let result = batch.get(&key).await;
992            assert!(result.unwrap().is_none());
993
994            // Insert a key-value pair
995            let batch = batch.update(key, value.clone());
996
997            assert_eq!(db.bounds().await.end, 1); // The batch is not applied yet
998            assert_eq!(db.inactivity_floor_loc, 0);
999
1000            // Fetch the value
1001            let fetched_value = batch.get(&key).await.unwrap();
1002            assert_eq!(fetched_value.unwrap(), value);
1003            db.apply_batch(batch.finalize(None)).await.unwrap();
1004            drop(db);
1005
1006            // Re-open the store
1007            let mut db = create_test_store(ctx.with_label("store_1")).await;
1008
1009            // Ensure the batch was not applied since we didn't commit.
1010            assert_eq!(db.bounds().await.end, 1);
1011            assert_eq!(db.inactivity_floor_loc, 0);
1012            assert!(db.get_metadata().await.unwrap().is_none());
1013
1014            // Insert a key-value pair and persist the change.
1015            let metadata = vec![99, 100];
1016            let range = db
1017                .apply_batch(
1018                    db.new_batch()
1019                        .update(key, value.clone())
1020                        .finalize(Some(metadata.clone())),
1021                )
1022                .await
1023                .unwrap();
1024            assert_eq!(range.start, 1);
1025            assert_eq!(range.end, 4);
1026            db.commit().await.unwrap();
1027            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
1028            drop(db);
1029
1030            // Re-open the store
1031            let db = create_test_store(ctx.with_label("store_2")).await;
1032
1033            // Ensure the re-opened store retained the committed operations
1034            assert_eq!(db.bounds().await.end, 4);
1035            assert_eq!(db.inactivity_floor_loc, 2);
1036
1037            // Fetch the value, ensuring it is still present
1038            let fetched_value = db.get(&key).await.unwrap();
1039            assert_eq!(fetched_value.unwrap(), value);
1040
1041            // Destroy the store
1042            db.destroy().await.unwrap();
1043        });
1044    }
1045
1046    fn is_send<T: Send>(_: T) {}
1047
1048    #[allow(dead_code)]
1049    fn assert_read_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
1050        is_send(db.get(&key));
1051        is_send(db.get_metadata());
1052        is_send(db.prune(loc));
1053        is_send(db.sync());
1054    }
1055
1056    #[allow(dead_code)]
1057    fn assert_write_futures_are_send(
1058        db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap>,
1059        key: Digest,
1060        value: Vec<u8>,
1061    ) {
1062        is_send(db.get(&key));
1063        is_send(db.apply_batch(Changeset::from([(key, Some(value))])));
1064        is_send(db.apply_batch(Changeset::from([(key, None)])));
1065        let batch = db.new_batch();
1066        is_send(batch.get(&key));
1067    }
1068
    /// Compile-time check that the store's commit future is `Send`.
    /// Never invoked at runtime; constructing the future is sufficient.
    #[allow(dead_code)]
    fn assert_commit_is_send(db: &Db<deterministic::Context, Digest, Vec<u8>, TwoCap>) {
        is_send(db.commit());
    }
1073}