Skip to main content

commonware_storage/qmdb/store/
db.rs

1//! A mutable key-value database that supports variable-sized values, but without authentication.
2//!
3//! # Example
4//!
5//! ```rust
6//! use commonware_storage::{
7//!     journal::contiguous::variable::Config as JournalConfig,
8//!     qmdb::store::db::{Config, Db},
9//!     translator::TwoCap,
10//! };
11//! use commonware_utils::{NZUsize, NZU16, NZU64};
12//! use commonware_cryptography::{blake3::Digest, Digest as _};
13//! use commonware_math::algebra::Random;
14//! use commonware_runtime::{
15//!     buffer::paged::CacheRef, deterministic::Runner, Metrics, Runner as _, Supervisor as _,
16//! };
17//!
18//! use std::num::NonZeroU16;
19//! const PAGE_SIZE: NonZeroU16 = NZU16!(8192);
20//! const PAGE_CACHE_SIZE: usize = 100;
21//!
22//! let executor = Runner::default();
23//! executor.start(|mut ctx| async move {
24//!     let config = Config {
25//!         log: JournalConfig {
26//!             partition: "test-partition".into(),
27//!             write_buffer: NZUsize!(64 * 1024),
28//!             compression: None,
29//!             codec_config: ((), ()),
30//!             items_per_section: NZU64!(4),
31//!             page_cache: CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
32//!         },
33//!         translator: TwoCap,
34//!     };
35//!     let mut db =
36//!         Db::<_, Digest, Digest, TwoCap>::init(ctx.child("store"), config)
37//!             .await
38//!             .unwrap();
39//!
40//!     // Insert a key-value pair
41//!     let k = Digest::random(&mut ctx);
42//!     let v = Digest::random(&mut ctx);
43//!     let metadata = Some(Digest::random(&mut ctx));
44//!     db.apply_batch(db.new_batch().update(k, v).finalize(metadata)).await.unwrap();
45//!     db.commit().await.unwrap();
46//!
47//!     // Fetch the value
48//!     let fetched_value = db.get(&k).await.unwrap();
49//!     assert_eq!(fetched_value.unwrap(), v);
50//!
51//!     // Delete the key's value
52//!     db.apply_batch(db.new_batch().delete(k).finalize(None)).await.unwrap();
53//!     db.commit().await.unwrap();
54//!
55//!     // Fetch the value
56//!     let fetched_value = db.get(&k).await.unwrap();
57//!     assert!(fetched_value.is_none());
58//!
59//!     // Destroy the store
60//!     db.destroy().await.unwrap();
61//! });
62//! ```
63//!
64//! ```ignore
65//! // Advanced mode: while the previous batch is being committed, build exactly
66//! // one child batch from the newly published state.
67//! db.apply_batch(db.new_batch().update(key_a, value_a).finalize(None)).await?;
68//!
69//! let (child_finalized, commit_result) = futures::join!(
70//!     async { db.new_batch().update(key_b, value_b).finalize(None) },
71//!     db.commit(),
72//! );
73//! commit_result?;
74//!
75//! db.apply_batch(child_finalized).await?;
76//! db.commit().await?;
77//! ```
78
79use crate::{
80    index::{unordered::Index, Unordered as _},
81    journal::contiguous::{
82        variable::{Config as JournalConfig, Journal},
83        Mutable as _, Reader,
84    },
85    merkle::mmr::Location,
86    qmdb::{
87        any::{
88            unordered::{variable::Operation, Update},
89            VariableValue,
90        },
91        build_snapshot_from_log, delete_key,
92        operation::{Committable as _, Key, Operation as _},
93        update_key, FloorHelper,
94    },
95    translator::Translator,
96    Context, Persistable,
97};
98use commonware_codec::{CodecShared, Read};
99use commonware_utils::Array;
100use core::ops::Range;
101use std::collections::BTreeMap;
102use tracing::{debug, warn};
103
104type Error = crate::qmdb::Error<crate::mmr::Family>;
105
106/// Configuration for initializing a [Db].
107#[derive(Clone)]
108pub struct Config<T: Translator, C> {
109    /// Configuration for the variable-size operations log journal.
110    pub log: JournalConfig<C>,
111
112    /// The [Translator] used by the [Index].
113    pub translator: T,
114}
115
116/// A finalized batch of writes and deletes ready to be applied to the store.
117pub struct Changeset<K: Key, V: CodecShared + Clone> {
118    diff: BTreeMap<K, Option<V>>,
119    metadata: Option<V>,
120}
121
122impl<K: Key, V: CodecShared + Clone> Changeset<K, V> {
123    fn into_parts(self) -> (BTreeMap<K, Option<V>>, Option<V>) {
124        (self.diff, self.metadata)
125    }
126}
127
128impl<K: Key, V: CodecShared + Clone> FromIterator<(K, Option<V>)> for Changeset<K, V> {
129    fn from_iter<TIter: IntoIterator<Item = (K, Option<V>)>>(iter: TIter) -> Self {
130        Self {
131            diff: iter.into_iter().collect(),
132            metadata: None,
133        }
134    }
135}
136
137impl<K: Key, V: CodecShared + Clone, const N: usize> From<[(K, Option<V>); N]> for Changeset<K, V> {
138    fn from(items: [(K, Option<V>); N]) -> Self {
139        items.into_iter().collect()
140    }
141}
142
143/// A mutable batch of writes and deletes staged against the current store state.
144pub struct Batch<'a, E, K, V, T>
145where
146    E: Context,
147    K: Array,
148    V: VariableValue,
149    T: Translator,
150{
151    db: &'a Db<E, K, V, T>,
152    diff: BTreeMap<K, Option<V>>,
153}
154
155impl<'a, E, K, V, T> Batch<'a, E, K, V, T>
156where
157    E: Context,
158    K: Array,
159    V: VariableValue,
160    T: Translator,
161{
162    const fn new(db: &'a Db<E, K, V, T>) -> Self {
163        Self {
164            db,
165            diff: BTreeMap::new(),
166        }
167    }
168
169    /// Finalize the batch into a changeset that can be applied to the store.
170    pub fn finalize(self, metadata: Option<V>) -> Changeset<K, V> {
171        Changeset {
172            diff: self.diff,
173            metadata,
174        }
175    }
176
177    /// Get the value of `key` in the batch, or the value in the store if it has
178    /// not been modified by the batch.
179    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
180        if let Some(value) = self.diff.get(key) {
181            return Ok(value.clone());
182        }
183        self.db.get(key).await
184    }
185
186    /// Update the value of `key` in the batch.
187    pub fn update(mut self, key: K, value: V) -> Self {
188        self.diff.insert(key, Some(value));
189        self
190    }
191
192    /// Delete the value of `key` in the batch.
193    pub fn delete(mut self, key: K) -> Self {
194        self.diff.insert(key, None);
195        self
196    }
197}
198
199/// An unauthenticated key-value database based off of an append-only [Journal] of operations.
200pub struct Db<E, K, V, T>
201where
202    E: Context,
203    K: Array,
204    V: VariableValue,
205    T: Translator,
206{
207    /// A log of all [Operation]s that have been applied to the store.
208    ///
209    /// # Invariants
210    ///
211    /// - There is always at least one commit operation in the log.
212    /// - The log is never pruned beyond the inactivity floor.
213    log: Journal<E, Operation<crate::mmr::Family, K, V>>,
214
215    /// A snapshot of all currently active operations in the form of a map from each key to the
216    /// location containing its most recent update.
217    ///
218    /// # Invariant
219    ///
220    /// Only references operations of type [Operation::Update].
221    snapshot: Index<T, Location>,
222
223    /// The number of active keys in the store.
224    active_keys: usize,
225
226    /// A location before which all operations are "inactive" (that is, operations before this point
227    /// are over keys that have been updated by some operation at or after this point).
228    pub inactivity_floor_loc: Location,
229
230    /// The location of the last commit operation.
231    pub last_commit_loc: Location,
232
233    /// The number of _steps_ to raise the inactivity floor. Each step involves moving exactly one
234    /// active operation to tip.
235    pub steps: u64,
236}
237
238impl<E, K, V, T> Db<E, K, V, T>
239where
240    E: Context,
241    K: Array,
242    V: VariableValue,
243    T: Translator,
244{
245    /// Get the value of `key` in the db, or None if it has no value.
246    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
247        for &loc in self.snapshot.get(key) {
248            let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
249                unreachable!("location ({loc}) does not reference update operation");
250            };
251
252            if &k == key {
253                return Ok(Some(v));
254            }
255        }
256
257        Ok(None)
258    }
259
260    /// Returns a new empty batch of changes.
261    pub const fn new_batch(&self) -> Batch<'_, E, K, V, T> {
262        Batch::new(self)
263    }
264
265    /// Whether the db currently has no active keys.
266    pub const fn is_empty(&self) -> bool {
267        self.active_keys == 0
268    }
269
270    /// Gets a [Operation] from the log at the given location. Returns [Error::OperationPruned]
271    /// if the location precedes the oldest retained location. The location is otherwise assumed
272    /// valid.
273    async fn get_op(&self, loc: Location) -> Result<Operation<crate::mmr::Family, K, V>, Error> {
274        let reader = self.log.reader().await;
275        assert!(*loc < reader.bounds().end);
276        reader.read(*loc).await.map_err(|e| match e {
277            crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
278            e => Error::Journal(e),
279        })
280    }
281
282    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
283    /// retained operations respectively.
284    pub async fn bounds(&self) -> std::ops::Range<Location> {
285        let bounds = self.log.reader().await.bounds();
286        Location::new(bounds.start)..Location::new(bounds.end)
287    }
288
289    /// Return the Location of the next operation appended to this db.
290    pub async fn size(&self) -> Location {
291        Location::new(self.log.size().await)
292    }
293
294    /// Return the inactivity floor location. This is the location before which all operations are
295    /// known to be inactive. Operations before this point can be safely pruned.
296    pub const fn inactivity_floor_loc(&self) -> Location {
297        self.inactivity_floor_loc
298    }
299
300    /// Get the metadata associated with the last commit.
301    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
302        let Operation::CommitFloor(metadata, _) =
303            self.log.reader().await.read(*self.last_commit_loc).await?
304        else {
305            unreachable!("last commit should be a commit floor operation");
306        };
307
308        Ok(metadata)
309    }
310
311    /// Prune historical operations prior to `prune_loc`. This does not affect the db's root
312    /// or current snapshot.
313    pub async fn prune(&self, prune_loc: Location) -> Result<(), Error> {
314        if prune_loc > self.inactivity_floor_loc {
315            return Err(Error::PruneBeyondMinRequired(
316                prune_loc,
317                self.inactivity_floor_loc,
318            ));
319        }
320
321        // Prune the log. The log will prune at section boundaries, so the actual oldest retained
322        // location may be less than requested.
323        if !self.log.prune(*prune_loc).await? {
324            return Ok(());
325        }
326
327        let bounds = self.log.reader().await.bounds();
328        let log_size = Location::new(bounds.end);
329        let oldest_retained_loc = Location::new(bounds.start);
330        debug!(
331            ?log_size,
332            ?oldest_retained_loc,
333            ?prune_loc,
334            "pruned inactive ops"
335        );
336
337        Ok(())
338    }
339
340    /// Initializes a new [Db] with the given configuration.
341    pub async fn init(
342        context: E,
343        cfg: Config<T, <Operation<crate::mmr::Family, K, V> as Read>::Cfg>,
344    ) -> Result<Self, Error> {
345        let mut log =
346            Journal::<E, Operation<crate::mmr::Family, K, V>>::init(context.child("log"), cfg.log)
347                .await?;
348
349        // Rewind log to remove uncommitted operations.
350        if log.rewind_to(|op| op.is_commit()).await? == 0 {
351            warn!("Log is empty, initializing new db");
352            log.append(&Operation::CommitFloor(None, Location::new(0)))
353                .await?;
354        }
355
356        // Sync the log to avoid having to repeat any recovery that may have been performed on next
357        // startup.
358        log.sync().await?;
359
360        let last_commit_loc = Location::new(
361            log.size()
362                .await
363                .checked_sub(1)
364                .expect("commit should exist"),
365        );
366
367        // Build the snapshot.
368        let mut snapshot = Index::new(context.child("snapshot"), cfg.translator);
369        let (inactivity_floor_loc, active_keys) = {
370            let reader = log.reader().await;
371            let op = reader.read(*last_commit_loc).await?;
372            let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");
373            if inactivity_floor_loc > last_commit_loc {
374                return Err(crate::qmdb::Error::DataCorrupted(
375                    "inactivity floor exceeds last commit",
376                ));
377            }
378            let active_keys =
379                build_snapshot_from_log(inactivity_floor_loc, &reader, &mut snapshot, |_, _| {})
380                    .await?;
381            (inactivity_floor_loc, active_keys)
382        };
383
384        Ok(Self {
385            log,
386            snapshot,
387            active_keys,
388            inactivity_floor_loc,
389            last_commit_loc,
390            steps: 0,
391        })
392    }
393
394    /// Sync all database state to disk. While this isn't necessary to ensure durability of
395    /// committed operations, periodic invocation may reduce memory usage and the time required to
396    /// recover the database on restart.
397    pub async fn sync(&self) -> Result<(), Error> {
398        self.log.sync().await.map_err(Into::into)
399    }
400
401    /// Destroy the db, removing all data from disk.
402    pub async fn destroy(self) -> Result<(), Error> {
403        self.log.destroy().await.map_err(Into::into)
404    }
405
406    #[allow(clippy::type_complexity)]
407    const fn as_floor_helper(
408        &mut self,
409    ) -> FloorHelper<
410        '_,
411        crate::mmr::Family,
412        Index<T, Location>,
413        Journal<E, Operation<crate::mmr::Family, K, V>>,
414    > {
415        FloorHelper {
416            snapshot: &mut self.snapshot,
417            log: &mut self.log,
418        }
419    }
420
421    /// Applies a finalized batch to the in-memory database state and appends its operations to the
422    /// journal, returning the range of written locations.
423    ///
424    /// This publishes the batch to the in-memory database state and appends it to the journal, but
425    /// does not durably persist it. Call [`Db::commit`] or [`Db::sync`] to guarantee durability.
426    pub async fn apply_batch(&mut self, batch: Changeset<K, V>) -> Result<Range<Location>, Error> {
427        let start_loc = self.last_commit_loc + 1;
428        let (diff, metadata) = batch.into_parts();
429
430        for (key, value) in diff {
431            if let Some(value) = value {
432                let updated = {
433                    let reader = self.log.reader().await;
434                    let new_loc = reader.bounds().end;
435                    update_key::<crate::mmr::Family, _, _>(
436                        &mut self.snapshot,
437                        &reader,
438                        &key,
439                        Location::new(new_loc),
440                    )
441                    .await?
442                };
443                if updated.is_some() {
444                    self.steps += 1;
445                } else {
446                    self.active_keys += 1;
447                }
448                self.log
449                    .append(&Operation::Update(Update(key, value)))
450                    .await?;
451            } else {
452                let deleted = {
453                    let reader = self.log.reader().await;
454                    delete_key::<crate::mmr::Family, _, _>(&mut self.snapshot, &reader, &key)
455                        .await?
456                };
457                if deleted.is_some() {
458                    self.log.append(&Operation::Delete(key)).await?;
459                    self.steps += 1;
460                    self.active_keys -= 1;
461                }
462            }
463        }
464
465        // Raise the inactivity floor by `self.steps` steps, plus 1 to account for the previous
466        // commit becoming inactive.
467        if self.is_empty() {
468            self.inactivity_floor_loc = self.size().await;
469            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
470        } else {
471            let steps_to_take = self.steps + 1;
472            for _ in 0..steps_to_take {
473                let loc = self.inactivity_floor_loc;
474                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
475            }
476        }
477
478        // Append the commit operation with the new inactivity floor.
479        self.last_commit_loc = Location::new(
480            self.log
481                .append(&Operation::CommitFloor(metadata, self.inactivity_floor_loc))
482                .await?,
483        );
484
485        self.steps = 0;
486
487        let end_loc = self.size().await;
488        Ok(start_loc..end_loc)
489    }
490
491    /// Durably commit the journal state published by prior [`Db::apply_batch`] calls.
492    pub async fn commit(&self) -> Result<(), Error> {
493        self.log.commit().await.map_err(Into::into)
494    }
495}
496
497impl<E, K, V, T> Persistable for Db<E, K, V, T>
498where
499    E: Context,
500    K: Array,
501    V: VariableValue,
502    T: Translator,
503{
504    type Error = Error;
505
506    async fn commit(&self) -> Result<(), Error> {
507        Self::commit(self).await
508    }
509
510    async fn sync(&self) -> Result<(), Error> {
511        self.sync().await
512    }
513
514    async fn destroy(self) -> Result<(), Error> {
515        self.destroy().await
516    }
517}
518
519#[cfg(test)]
520mod test {
521    use super::*;
522    use crate::translator::TwoCap;
523    use commonware_cryptography::{
524        blake3::{Blake3, Digest},
525        Hasher as _,
526    };
527    use commonware_macros::test_traced;
528    use commonware_math::algebra::Random;
529    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Runner, Supervisor as _};
530    use commonware_utils::{NZUsize, NZU16, NZU64};
531    use std::num::{NonZeroU16, NonZeroUsize};
532
533    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
534    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
535
536    /// The type of the store used in tests.
537    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap>;
538
539    async fn create_test_store(context: deterministic::Context) -> TestStore {
540        let cfg = Config {
541            log: JournalConfig {
542                partition: "journal".into(),
543                write_buffer: NZUsize!(64 * 1024),
544                compression: None,
545                codec_config: ((), ((0..=10000).into(), ())),
546                items_per_section: NZU64!(7),
547                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
548            },
549            translator: TwoCap,
550        };
551        TestStore::init(context, cfg).await.unwrap()
552    }
553
554    async fn apply_entries(
555        db: &mut TestStore,
556        iter: impl IntoIterator<Item = (Digest, Option<Vec<u8>>)> + Send,
557    ) -> Range<Location> {
558        db.apply_batch(iter.into_iter().collect()).await.unwrap()
559    }
560
561    #[test_traced("DEBUG")]
562    pub fn test_store_construct_empty() {
563        let executor = deterministic::Runner::default();
564        executor.start(|mut context| async move {
565            let mut db = create_test_store(context.child("store").with_attribute("index", 0)).await;
566            assert_eq!(db.bounds().await.end, 1);
567            assert_eq!(db.log.bounds().await.start, 0);
568            assert_eq!(db.inactivity_floor_loc(), 0);
569            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
570            assert!(matches!(
571                db.prune(Location::new(1)).await,
572                Err(Error::PruneBeyondMinRequired(_, _))
573            ));
574            assert!(db.get_metadata().await.unwrap().is_none());
575
576            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
577            let d1 = Digest::random(&mut context);
578            let v1 = vec![1, 2, 3];
579            apply_entries(&mut db, [(d1, Some(v1))]).await;
580            drop(db);
581
582            let mut db = create_test_store(context.child("store").with_attribute("index", 1)).await;
583            assert_eq!(db.bounds().await.end, 1);
584
585            // Test calling commit on an empty db which should make it (durably) non-empty.
586            let metadata = vec![1, 2, 3];
587            let batch = db.new_batch().finalize(Some(metadata.clone()));
588            let range = db.apply_batch(batch).await.unwrap();
589            assert_eq!(range.start, 1);
590            assert_eq!(range.end, 2);
591            db.commit().await.unwrap();
592            assert_eq!(db.bounds().await.end, 2);
593            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
594            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
595
596            let mut db = create_test_store(context.child("store").with_attribute("index", 2)).await;
597            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
598
599            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits on a
600            // non-empty db.
601            apply_entries(
602                &mut db,
603                [(Digest::random(&mut context), Some(vec![1, 2, 3]))],
604            )
605            .await;
606            db.commit().await.unwrap();
607            for _ in 1..100 {
608                db.apply_batch(db.new_batch().finalize(None)).await.unwrap();
609                db.commit().await.unwrap();
610                // Distance should equal 3 after the second commit, with inactivity_floor
611                // referencing the previous commit operation.
612                assert!(db.bounds().await.end - db.inactivity_floor_loc <= 3);
613                assert!(db.get_metadata().await.unwrap().is_none());
614            }
615
616            db.destroy().await.unwrap();
617        });
618    }
619
620    #[test_traced("DEBUG")]
621    fn test_store_construct_basic() {
622        let executor = deterministic::Runner::default();
623
624        executor.start(|mut ctx| async move {
625            let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
626
627            // Ensure the store is empty
628            assert_eq!(db.bounds().await.end, 1);
629            assert_eq!(db.inactivity_floor_loc, 0);
630
631            let key = Digest::random(&mut ctx);
632            let value = vec![2, 3, 4, 5];
633
634            // Attempt to get a key that does not exist
635            let result = db.get(&key).await;
636            assert!(result.unwrap().is_none());
637
638            // Insert a key-value pair. apply_batch writes the Update, a floor-raise move, and a
639            // CommitFloor: 3 new ops on top of the initial commit.
640            apply_entries(&mut db, [(key, Some(value.clone()))]).await;
641
642            assert_eq!(*db.bounds().await.end, 4);
643            assert_eq!(*db.inactivity_floor_loc, 2);
644
645            // Fetch the value
646            let fetched_value = db.get(&key).await.unwrap();
647            assert_eq!(fetched_value.unwrap(), value);
648
649            // Simulate commit failure: drop without commit. The small batch fits in a single
650            // journal section so it is not auto-synced.
651            drop(db);
652
653            // Re-open the store
654            let mut db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
655
656            // Ensure the re-opened store removed the uncommitted operations
657            assert_eq!(*db.bounds().await.end, 1);
658            assert_eq!(*db.inactivity_floor_loc, 0);
659            assert!(db.get_metadata().await.unwrap().is_none());
660
661            // Insert a key-value pair and persist with metadata.
662            let metadata = vec![99, 100];
663            let range = db
664                .apply_batch(
665                    db.new_batch()
666                        .update(key, value.clone())
667                        .finalize(Some(metadata.clone())),
668                )
669                .await
670                .unwrap();
671            assert_eq!(*range.start, 1);
672            assert_eq!(*range.end, 4);
673            db.commit().await.unwrap();
674            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
675
676            assert_eq!(*db.bounds().await.end, 4);
677            assert_eq!(*db.inactivity_floor_loc, 2);
678
679            // Re-open the store
680            let mut db = create_test_store(ctx.child("store").with_attribute("index", 2)).await;
681
682            // Ensure the re-opened store retained the committed operations
683            assert_eq!(*db.bounds().await.end, 4);
684            assert_eq!(*db.inactivity_floor_loc, 2);
685
686            // Fetch the value, ensuring it is still present
687            let fetched_value = db.get(&key).await.unwrap();
688            assert_eq!(fetched_value.unwrap(), value);
689
690            // Insert two new k/v pairs to force pruning of the first section.
691            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
692            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
693            apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
694            apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;
695
696            assert_eq!(*db.bounds().await.end, 10);
697            assert_eq!(*db.inactivity_floor_loc, 5);
698
699            // Each apply_entries writes a CommitFloor with None metadata, replacing
700            // the previously committed metadata.
701            assert_eq!(db.get_metadata().await.unwrap(), None);
702
703            db.commit().await.unwrap();
704            assert_eq!(db.get_metadata().await.unwrap(), None);
705
706            // commit() is just an fsync now, so bounds and floor are unchanged.
707            assert_eq!(*db.bounds().await.end, 10);
708            assert_eq!(*db.inactivity_floor_loc, 5);
709
710            // Ensure all keys can be accessed, despite the first section being pruned.
711            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
712            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
713            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
714
715            // Update existing key with modified value.
716            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
717            v1_updated.push(7);
718            apply_entries(&mut db, [(k1, Some(v1_updated))]).await;
719            db.commit().await.unwrap();
720            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);
721
722            // Create new key.
723            let k3 = Digest::random(&mut ctx);
724            apply_entries(&mut db, [(k3, Some(vec![8]))]).await;
725            db.commit().await.unwrap();
726            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);
727
728            // Destroy the store
729            db.destroy().await.unwrap();
730        });
731    }
732
733    #[test_traced("DEBUG")]
734    fn test_store_log_replay() {
735        let executor = deterministic::Runner::default();
736
737        executor.start(|mut ctx| async move {
738            let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
739
740            // Update the same key many times.
741            const UPDATES: u64 = 100;
742            let k = Digest::random(&mut ctx);
743            for _ in 0..UPDATES {
744                let v = vec![1, 2, 3, 4, 5];
745                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
746            }
747
748            let iter = db.snapshot.get(&k);
749            assert_eq!(iter.count(), 1);
750
751            db.commit().await.unwrap();
752            db.sync().await.unwrap();
753            drop(db);
754
755            // Re-open the store, prune it, then ensure it replays the log correctly.
756            let db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
757            db.prune(db.inactivity_floor_loc()).await.unwrap();
758
759            let iter = db.snapshot.get(&k);
760            assert_eq!(iter.count(), 1);
761
762            // First apply_entries: Update + 1 move + CommitFloor = 3 ops. Subsequent 99: Update + 2
763            // moves + CommitFloor = 4 ops each. Total: 1 (init) + 3 + 99*4 = 400.
764            assert_eq!(*db.bounds().await.end, 400);
765            // Only the last Update and CommitFloor are active → floor = 398.
766            assert_eq!(*db.inactivity_floor_loc, 398);
767            let floor = db.inactivity_floor_loc;
768
769            // All blobs prior to the inactivity floor are pruned, so the oldest retained location
770            // is the first in the last retained blob.
771            assert_eq!(db.log.bounds().await.start, *floor - *floor % 7);
772
773            db.destroy().await.unwrap();
774        });
775    }
776
777    #[test_traced("DEBUG")]
778    fn test_store_build_snapshot_keys_with_shared_prefix() {
779        let executor = deterministic::Runner::default();
780
781        executor.start(|mut ctx| async move {
782            let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
783
784            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
785            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);
786
787            // Ensure k2 shares 2 bytes with k1 (test DB uses `TwoCap` translator.)
788            k2.0[0..2].copy_from_slice(&k1.0[0..2]);
789
790            apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
791            apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;
792
793            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
794            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
795
796            db.commit().await.unwrap();
797            db.sync().await.unwrap();
798            drop(db);
799
800            // Re-open the store to ensure it builds the snapshot for the conflicting
801            // keys correctly.
802            let db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
803
804            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
805            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
806
807            db.destroy().await.unwrap();
808        });
809    }
810
811    #[test_traced("DEBUG")]
812    fn test_store_delete() {
813        let executor = deterministic::Runner::default();
814
815        executor.start(|mut ctx| async move {
816            let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
817
818            // Insert a key-value pair
819            let k = Digest::random(&mut ctx);
820            let v = vec![1, 2, 3, 4, 5];
821            apply_entries(&mut db, [(k, Some(v.clone()))]).await;
822            db.commit().await.unwrap();
823
824            // Fetch the value
825            let fetched_value = db.get(&k).await.unwrap();
826            assert_eq!(fetched_value.unwrap(), v);
827
828            // Delete the key
829            assert!(db.get(&k).await.unwrap().is_some());
830            apply_entries(&mut db, [(k, None)]).await;
831
832            // Ensure the key is no longer present
833            let fetched_value = db.get(&k).await.unwrap();
834            assert!(fetched_value.is_none());
835            assert!(db.get(&k).await.unwrap().is_none());
836
837            // Commit the changes
838            db.commit().await.unwrap();
839
840            // Re-open the store and ensure the key is still deleted
841            let mut db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
842            let fetched_value = db.get(&k).await.unwrap();
843            assert!(fetched_value.is_none());
844
845            // Re-insert the key
846            apply_entries(&mut db, [(k, Some(v.clone()))]).await;
847            let fetched_value = db.get(&k).await.unwrap();
848            assert_eq!(fetched_value.unwrap(), v);
849
850            // Commit the changes
851            db.commit().await.unwrap();
852
853            // Re-open the store and ensure the snapshot restores the key, after processing
854            // the delete and the subsequent set.
855            let mut db = create_test_store(ctx.child("store").with_attribute("index", 2)).await;
856            let fetched_value = db.get(&k).await.unwrap();
857            assert_eq!(fetched_value.unwrap(), v);
858
859            // Delete a non-existent key (no-op)
860            let k_n = Digest::random(&mut ctx);
861            let range = apply_entries(&mut db, [(k_n, None)]).await;
862            assert_eq!(range.start, 9);
863            assert_eq!(range.end, 11);
864            db.commit().await.unwrap();
865
866            assert!(db.get(&k_n).await.unwrap().is_none());
867            // Make sure k is still there
868            assert!(db.get(&k).await.unwrap().is_some());
869
870            db.destroy().await.unwrap();
871        });
872    }
873
874    /// Tests the pruning example in the module documentation.
875    #[test_traced("DEBUG")]
876    fn test_store_pruning() {
877        let executor = deterministic::Runner::default();
878
879        executor.start(|mut ctx| async move {
880            let mut db = create_test_store(ctx.child("store")).await;
881
882            let k_a = Digest::random(&mut ctx);
883            let k_b = Digest::random(&mut ctx);
884
885            let v_a = vec![1];
886            let v_b = vec![];
887            let v_c = vec![4, 5, 6];
888
889            apply_entries(&mut db, [(k_a, Some(v_a.clone()))]).await;
890            apply_entries(&mut db, [(k_b, Some(v_b.clone()))]).await;
891
892            db.commit().await.unwrap();
893            assert_eq!(*db.bounds().await.end, 7);
894            assert_eq!(*db.inactivity_floor_loc, 3);
895            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);
896
897            apply_entries(&mut db, [(k_b, Some(v_a.clone()))]).await;
898            apply_entries(&mut db, [(k_a, Some(v_c.clone()))]).await;
899
900            db.commit().await.unwrap();
901            assert_eq!(*db.bounds().await.end, 15);
902            assert_eq!(*db.inactivity_floor_loc, 12);
903            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
904            assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);
905
906            db.destroy().await.unwrap();
907        });
908    }
909
910    #[test_traced("WARN")]
911    pub fn test_store_db_recovery() {
912        let executor = deterministic::Runner::default();
913        // Build a db with 1000 keys, some of which we update and some of which we delete.
914        const ELEMENTS: u64 = 1000;
915        executor.start(|context| async move {
916            let db = create_test_store(context.child("store").with_attribute("index", 0)).await;
917
918            // Simulate building batches but not applying them (data is not persisted).
919            {
920                let mut batch = db.new_batch();
921                for i in 0u64..ELEMENTS {
922                    let k = Blake3::hash(&i.to_be_bytes());
923                    let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
924                    batch = batch.update(k, v);
925                }
926                // Drop the batch without applying -- simulates a failure before apply.
927            }
928            drop(db);
929            let mut db = create_test_store(context.child("store").with_attribute("index", 1)).await;
930            assert_eq!(*db.bounds().await.end, 1);
931
932            // Apply the updates and commit them.
933            for i in 0u64..ELEMENTS {
934                let k = Blake3::hash(&i.to_be_bytes());
935                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
936                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
937            }
938            db.commit().await.unwrap();
939
940            // Update every 3rd key and commit.
941            for i in 0u64..ELEMENTS {
942                if i % 3 != 0 {
943                    continue;
944                }
945                let k = Blake3::hash(&i.to_be_bytes());
946                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
947                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
948            }
949            db.commit().await.unwrap();
950            assert_eq!(db.snapshot.items(), 1000);
951
952            // Delete every 7th key and commit.
953            for i in 0u64..ELEMENTS {
954                if i % 7 != 1 {
955                    continue;
956                }
957                let k = Blake3::hash(&i.to_be_bytes());
958                apply_entries(&mut db, [(k, None)]).await;
959            }
960            db.commit().await.unwrap();
961            let final_count = db.bounds().await.end;
962            let final_floor = db.inactivity_floor_loc;
963
964            // Sync and reopen the store to ensure the state is preserved.
965            db.sync().await.unwrap();
966            drop(db);
967            let db = create_test_store(context.child("store").with_attribute("index", 2)).await;
968            assert_eq!(db.bounds().await.end, final_count);
969            assert_eq!(db.inactivity_floor_loc, final_floor);
970
971            db.prune(db.inactivity_floor_loc()).await.unwrap();
972            assert_eq!(db.log.bounds().await.start, *final_floor - *final_floor % 7);
973            assert_eq!(db.snapshot.items(), 857);
974
975            db.destroy().await.unwrap();
976        });
977    }
978
979    #[test_traced("DEBUG")]
980    fn test_store_batch() {
981        let executor = deterministic::Runner::default();
982
983        executor.start(|mut ctx| async move {
984            let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
985
986            // Ensure the store is empty
987            assert_eq!(db.bounds().await.end, 1);
988            assert_eq!(db.inactivity_floor_loc, 0);
989
990            let key = Digest::random(&mut ctx);
991            let value = vec![2, 3, 4, 5];
992
993            let batch = db.new_batch();
994
995            // Attempt to get a key that does not exist
996            let result = batch.get(&key).await;
997            assert!(result.unwrap().is_none());
998
999            // Insert a key-value pair
1000            let batch = batch.update(key, value.clone());
1001
1002            assert_eq!(db.bounds().await.end, 1); // The batch is not applied yet
1003            assert_eq!(db.inactivity_floor_loc, 0);
1004
1005            // Fetch the value
1006            let fetched_value = batch.get(&key).await.unwrap();
1007            assert_eq!(fetched_value.unwrap(), value);
1008            db.apply_batch(batch.finalize(None)).await.unwrap();
1009            drop(db);
1010
1011            // Re-open the store
1012            let mut db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
1013
1014            // Ensure the batch was not applied since we didn't commit.
1015            assert_eq!(db.bounds().await.end, 1);
1016            assert_eq!(db.inactivity_floor_loc, 0);
1017            assert!(db.get_metadata().await.unwrap().is_none());
1018
1019            // Insert a key-value pair and persist the change.
1020            let metadata = vec![99, 100];
1021            let range = db
1022                .apply_batch(
1023                    db.new_batch()
1024                        .update(key, value.clone())
1025                        .finalize(Some(metadata.clone())),
1026                )
1027                .await
1028                .unwrap();
1029            assert_eq!(range.start, 1);
1030            assert_eq!(range.end, 4);
1031            db.commit().await.unwrap();
1032            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
1033            drop(db);
1034
1035            // Re-open the store
1036            let db = create_test_store(ctx.child("store").with_attribute("index", 2)).await;
1037
1038            // Ensure the re-opened store retained the committed operations
1039            assert_eq!(db.bounds().await.end, 4);
1040            assert_eq!(db.inactivity_floor_loc, 2);
1041
1042            // Fetch the value, ensuring it is still present
1043            let fetched_value = db.get(&key).await.unwrap();
1044            assert_eq!(fetched_value.unwrap(), value);
1045
1046            // Destroy the store
1047            db.destroy().await.unwrap();
1048        });
1049    }
1050
1051    fn is_send<T: Send>(_: T) {}
1052
1053    #[allow(dead_code)]
1054    fn assert_read_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
1055        is_send(db.get(&key));
1056        is_send(db.get_metadata());
1057        is_send(db.prune(loc));
1058        is_send(db.sync());
1059    }
1060
1061    #[allow(dead_code)]
1062    fn assert_write_futures_are_send(
1063        db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap>,
1064        key: Digest,
1065        value: Vec<u8>,
1066    ) {
1067        is_send(db.get(&key));
1068        is_send(db.apply_batch(Changeset::from([(key, Some(value))])));
1069        is_send(db.apply_batch(Changeset::from([(key, None)])));
1070        let batch = db.new_batch();
1071        is_send(batch.get(&key));
1072    }
1073
1074    #[allow(dead_code)]
1075    fn assert_commit_is_send(db: &Db<deterministic::Context, Digest, Vec<u8>, TwoCap>) {
1076        is_send(db.commit());
1077    }
1078}