commonware_storage/adb/store/mod.rs

//! A mutable key-value database that supports variable-sized values, but without authentication.
//!
//! # Terminology
//!
//! A _key_ in an unauthenticated database either has a _value_ or it doesn't. The _update_
//! operation gives a key a specific value whether it previously had no value or had a different
//! value.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an [Operation::Update] operation, and (3) it is the most recent operation for that
//! key.
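//!
//! For example, given the following sequence, only op 1 is active: op 0 has been superseded,
//! and a delete is never an [Operation::Update]:
//!
//! ```text
//! op 0: Update(k, v1)   // inactive: not the most recent operation for k
//! op 1: Update(k, v2)   // active: k is active and this is its most recent update
//! op 2: Delete(j)       // inactive: j has no value and deletes are never active
//! ```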
//!
//! # Lifecycle
//!
//! 1. **Initialization**: Create with [Store::init] using a [Config]
//! 2. **Insertion**: Use [Store::update] to assign a value to a given key
//! 3. **Deletions**: Use [Store::delete] to remove a key's value
//! 4. **Persistence**: Call [Store::commit] to make changes durable
//! 5. **Queries**: Use [Store::get] to retrieve current values
//! 6. **Cleanup**: Call [Store::close] to shut down gracefully or [Store::destroy] to remove all
//!    data
//!
//! # Pruning
//!
//! The database maintains a location before which all operations are inactive, called the
//! _inactivity floor_. Operations before this floor can be pruned from storage by calling
//! [Store::prune].
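//!
//! For example, a minimal sketch (assuming `store` is an open [Store]):
//!
//! ```ignore
//! // Reclaim storage for all operations known to be inactive.
//! let floor = store.inactivity_floor_loc();
//! store.prune(floor).await?;
//! ```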
//!
//! # Example
//!
//! ```rust
//! use commonware_storage::{
//!     adb::store::{Config, Store},
//!     translator::TwoCap,
//! };
//! use commonware_utils::{NZUsize, NZU64};
//! use commonware_cryptography::{blake3::Digest, Digest as _};
//! use commonware_runtime::{buffer::PoolRef, deterministic::Runner, Metrics, Runner as _};
//!
//! const PAGE_SIZE: usize = 77;
//! const PAGE_CACHE_SIZE: usize = 9;
//!
//! let executor = Runner::default();
//! executor.start(|mut ctx| async move {
//!     let config = Config {
//!         log_journal_partition: "test_partition".to_string(),
//!         log_write_buffer: NZUsize!(64 * 1024),
//!         log_compression: None,
//!         log_codec_config: (),
//!         log_items_per_section: NZU64!(4),
//!         locations_journal_partition: "locations_partition".to_string(),
//!         locations_items_per_blob: NZU64!(4),
//!         translator: TwoCap,
//!         buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
//!     };
//!     let mut store =
//!         Store::<_, Digest, Digest, TwoCap>::init(ctx.with_label("store"), config)
//!             .await
//!             .unwrap();
//!
//!     // Insert a key-value pair
//!     let k = Digest::random(&mut ctx);
//!     let v = Digest::random(&mut ctx);
//!     store.update(k, v).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = store.get(&k).await.unwrap();
//!     assert_eq!(fetched_value.unwrap(), v);
//!
//!     // Commit the operation to make it persistent
//!     let metadata = Some(Digest::random(&mut ctx));
//!     store.commit(metadata).await.unwrap();
//!
//!     // Delete the key's value
//!     store.delete(k).await.unwrap();
//!
//!     // Ensure the value is gone
//!     let fetched_value = store.get(&k).await.unwrap();
//!     assert!(fetched_value.is_none());
//!
//!     // Commit the operation to make it persistent
//!     store.commit(None).await.unwrap();
//!
//!     // Destroy the store
//!     store.destroy().await.unwrap();
//! });
//! ```

use crate::{
    adb::operation::variable::Operation,
    index::{Cursor, Index as _, Unordered as Index},
    journal::{
        contiguous::fixed::{Config as FConfig, Journal as FJournal},
        segmented::variable::{Config as VConfig, Journal as VJournal},
    },
    mmr::Location,
    translator::Translator,
};
use commonware_codec::{Codec, Read};
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::{Array, NZUsize};
use core::future::Future;
use futures::{pin_mut, try_join, StreamExt};
use std::{
    collections::HashMap,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, warn};

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;

/// Errors that can occur when interacting with a [Store] database.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The requested operation has been pruned.
    #[error("operation pruned")]
    OperationPruned(Location),

    #[error(transparent)]
    Journal(#[from] crate::journal::Error),

    #[error(transparent)]
    Adb(#[from] crate::adb::Error),
}

/// Configuration for initializing a [Store] database.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [`RStorage`] partition used to persist the log of operations.
    pub log_journal_partition: String,

    /// The size of the write buffer to use for each blob in the [`VJournal`].
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of operations to store in each section of the [`VJournal`].
    pub log_items_per_section: NonZeroU64,

    /// The name of the [`RStorage`] partition used for the location map.
    pub locations_journal_partition: String,

    /// The number of items to put in each blob in the location map.
    pub locations_items_per_blob: NonZeroU64,

    /// The [`Translator`] used by the compressed index.
    pub translator: T,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// A trait for any key-value store based on an append-only log of operations.
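///
/// Implementations can be used generically. A minimal sketch (the helper below is illustrative,
/// not part of this crate):
///
/// ```ignore
/// /// Set `key` to `value` and make the change durable.
/// async fn set_and_commit<E, K, V, T>(
///     db: &mut impl Db<E, K, V, T>,
///     key: K,
///     value: V,
/// ) -> Result<(), Error>
/// where
///     E: RStorage + Clock + Metrics,
///     K: Array,
///     V: Codec,
///     T: Translator,
/// {
///     db.update(key, value).await?;
///     db.commit().await
/// }
/// ```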
pub trait Db<E: RStorage + Clock + Metrics, K: Array, V: Codec, T: Translator> {
    /// The number of operations that have been applied to this db, including those that have been
    /// pruned and those that are not yet committed.
    fn op_count(&self) -> Location;

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    fn inactivity_floor_loc(&self) -> Location;

    /// Get the value of `key` in the db, or None if it has no value.
    fn get(&self, key: &K) -> impl Future<Output = Result<Option<V>, Error>>;

    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
    /// subject to rollback until the next successful `commit`.
    fn update(&mut self, key: K, value: V) -> impl Future<Output = Result<(), Error>>;

    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
    /// successful `commit`.
    fn delete(&mut self, key: K) -> impl Future<Output = Result<(), Error>>;

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
    fn commit(&mut self) -> impl Future<Output = Result<(), Error>>;

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    fn sync(&mut self) -> impl Future<Output = Result<(), Error>>;

    /// Prune historical operations prior to `target_prune_loc`. This does not affect the db's
    /// current snapshot.
    fn prune(&mut self, target_prune_loc: Location) -> impl Future<Output = Result<(), Error>>;

    /// Close the db. Operations that have not been committed will be lost or rolled back on
    /// restart.
    fn close(self) -> impl Future<Output = Result<(), Error>>;

    /// Destroy the db, removing all data from disk.
    fn destroy(self) -> impl Future<Output = Result<(), Error>>;
}

/// An unauthenticated key-value database based on an append-only [VJournal] of operations.
pub struct Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    /// A log of all [Operation]s that have been applied to the store.
    log: VJournal<E, Operation<K, V>>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location of its most recent update.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Update].
    snapshot: Index<T, Location>,

    /// The number of items to store in each section of the variable journal.
    log_items_per_section: u64,

    /// A fixed-length journal that maps an operation's location to its offset within its
    /// respective section of the log. (The section number is derived from the location.)
    locations: FJournal<E, u32>,

    /// A location before which all operations are "inactive" (that is, operations before this
    /// point are over keys that have been updated by some operation at or after this point).
    inactivity_floor_loc: Location,

    /// The location of the oldest operation in the log that remains readable.
    oldest_retained_loc: Location,

    /// The total number of operations that have been applied to the store.
    log_size: Location,

    /// The number of _steps_ to take when next raising the inactivity floor. Each step involves
    /// moving exactly one active operation to tip.
    pub(crate) steps: u64,

    /// The location of the last commit operation (if any exists).
    pub(crate) last_commit: Option<Location>,
}

impl<E, K, V, T> Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    /// Initializes a new [`Store`] database with the given configuration.
    ///
    /// ## Rollback
    ///
    /// Any uncommitted operations will be rolled back if the [Store] was previously closed without
    /// committing.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let snapshot = Index::init(context.with_label("snapshot"), cfg.translator);

        let log = VJournal::init(
            context.with_label("log"),
            VConfig {
                partition: cfg.log_journal_partition,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        let locations = FJournal::init(
            context.with_label("locations"),
            FConfig {
                partition: cfg.locations_journal_partition,
                items_per_blob: cfg.locations_items_per_blob,
                write_buffer: cfg.log_write_buffer,
                buffer_pool: cfg.buffer_pool,
            },
        )
        .await?;

        let db = Self {
            log,
            snapshot,
            log_items_per_section: cfg.log_items_per_section.get(),
            locations,
            inactivity_floor_loc: Location::new_unchecked(0),
            oldest_retained_loc: Location::new_unchecked(0),
            log_size: Location::new_unchecked(0),
            steps: 0,
            last_commit: None,
        };

        db.build_snapshot_from_log().await
    }

    /// Gets the value associated with the given key in the store.
    ///
    /// If the key does not exist, returns `Ok(None)`.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        for &loc in self.snapshot.get(key) {
            let Operation::Update(k, v) = self.get_op(loc).await? else {
                unreachable!("location ({loc}) does not reference update operation");
            };

            if &k == key {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db. Returns
    /// [Error::OperationPruned] if loc precedes the oldest retained location. The location is
    /// otherwise assumed valid.
    pub async fn get_loc(&self, loc: Location) -> Result<Option<V>, Error> {
        assert!(loc < self.log_size);
        let op = self.get_op(loc).await?;

        Ok(op.into_value())
    }

    /// Updates the value associated with the given key in the store.
    ///
    /// The operation is immediately visible in the snapshot for subsequent queries, but remains
    /// uncommitted until [Store::commit] is called. Uncommitted operations will be rolled back
    /// if the store is closed without committing.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        let new_loc = self.log_size;
        if let Some(old_loc) = self.get_key_loc(&key).await? {
            Self::update_loc(&mut self.snapshot, &key, old_loc, new_loc);
            self.steps += 1;
        } else {
            self.snapshot.insert(&key, new_loc);
        };

        self.apply_op(Operation::Update(key, value))
            .await
            .map(|_| ())
    }

    /// Applies `update` to the value associated with the given key, starting from `V::default()`
    /// if the key currently has no value, and stores the result under the key.
    ///
    /// The operation is immediately visible in the snapshot for subsequent queries, but remains
    /// uncommitted until [Store::commit] is called. Uncommitted operations will be rolled back
    /// if the store is closed without committing.
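    ///
    /// # Example
    ///
    /// A minimal sketch (assuming an open `store` whose values are `Vec<u8>`):
    ///
    /// ```ignore
    /// // Append a byte, starting from an empty Vec if the key has no value.
    /// store.upsert(key, |v| v.push(42)).await?;
    /// ```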
    pub async fn upsert(&mut self, key: K, update: impl FnOnce(&mut V)) -> Result<(), Error>
    where
        V: Default,
    {
        let mut value = self.get(&key).await?.unwrap_or_default();
        update(&mut value);

        self.update(key, value).await
    }

    /// Deletes the value associated with the given key in the store. If the key has no value,
    /// the operation is a no-op.
    pub async fn delete(&mut self, key: K) -> Result<(), Error> {
        let Some(old_loc) = self.get_key_loc(&key).await? else {
            // Key does not exist, so this is a no-op.
            return Ok(());
        };

        Self::delete_loc(&mut self.snapshot, &key, old_loc);
        self.steps += 1;

        self.apply_op(Operation::Delete(key)).await.map(|_| ())
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule. The caller can
    /// associate an arbitrary `metadata` value with the commit.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
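    ///
    /// # Example
    ///
    /// A minimal sketch (assuming an open `store`; names are illustrative):
    ///
    /// ```ignore
    /// store.update(key, value).await?;
    /// store.commit(None).await?;           // durable, without metadata
    /// store.commit(Some(metadata)).await?; // durable, with caller-supplied metadata
    /// ```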
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
        // Raise the inactivity floor by taking `self.steps` steps, plus 1 to account for the
        // previous commit becoming inactive.
        if self.is_empty() {
            self.inactivity_floor_loc = self.op_count();
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            let steps_to_take = self.steps + 1;
            for _ in 0..steps_to_take {
                self.raise_floor().await?;
            }
        }
        self.steps = 0;

        // Apply the commit operation with the new inactivity floor.
        self.apply_op(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
            .await?;
        self.last_commit = Some(self.op_count() - 1);

        let section = self.current_section();
        self.log.sync(section).await?;

        debug!(log_size = ?self.log_size, "commit complete");

        Ok(())
    }

    /// Raise the inactivity floor by taking one _step_, which involves searching for the first
    /// active operation above the inactivity floor, moving it to tip, and then setting the
    /// inactivity floor to the location following the moved operation. This method is therefore
    /// guaranteed to raise the floor by at least one.
    ///
    /// # Errors
    ///
    /// Expects at least one active operation above the inactivity floor, and returns an error
    /// otherwise.
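    ///
    /// A sketch of one step (assuming the floor starts at 2 and op 3 is the first active op):
    ///
    /// ```text
    /// before: floor=2   log: ... [2: inactive] [3: ACTIVE] ... [tip]
    /// step:   op 2 is skipped (floor -> 3), op 3 is re-appended at tip,
    ///         and the floor lands at 4, one past the moved operation.
    /// ```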
    async fn raise_floor(&mut self) -> Result<(), Error> {
        // Search for the first active operation above the inactivity floor and move it to tip.
        //
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1829): optimize this w/ a bitmap.
        let mut op = self.get_op(self.inactivity_floor_loc).await?;
        while self
            .move_op_if_active(op, self.inactivity_floor_loc)
            .await?
            .is_none()
        {
            self.inactivity_floor_loc += 1;
            op = self.get_op(self.inactivity_floor_loc).await?;
        }

        // Increment the floor to the next operation since we know the current one is inactive.
        self.inactivity_floor_loc += 1;

        Ok(())
    }

    fn current_section(&self) -> u64 {
        *self.log_size / self.log_items_per_section
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let current_section = self.current_section();
        try_join!(self.log.sync(current_section), self.locations.sync())?;

        Ok(())
    }

    /// Prune historical operations that are behind the inactivity floor. This does not affect the
    /// current snapshot.
    ///
    /// # Panics
    ///
    /// Panics if `target_prune_loc` is greater than the inactivity floor.
    pub async fn prune(&mut self, target_prune_loc: Location) -> Result<(), Error> {
        // The target must never exceed the inactivity floor.
        assert!(target_prune_loc <= self.inactivity_floor_loc);
        if target_prune_loc <= self.oldest_retained_loc {
            return Ok(());
        }

        // Sync locations so it never ends up behind the log.
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1554): Extend recovery to avoid
        // this sync.
        self.locations.sync().await?;

        // Prune the log up to the section containing the requested pruning location. We always
        // prune the log first, and then prune the locations structure based on the log's actual
        // pruning boundary. This procedure ensures all log operations always have corresponding
        // location entries, even in the event of failures, with no need for special recovery.
        let section_with_target = *target_prune_loc / self.log_items_per_section;
        if !self.log.prune(section_with_target).await? {
            return Ok(());
        }
        self.oldest_retained_loc =
            Location::new_unchecked(section_with_target * self.log_items_per_section);
        debug!(
            log_size = ?self.log_size,
            oldest_retained_loc = ?self.oldest_retained_loc,
            ?target_prune_loc,
            "pruned inactive ops"
        );

        // Prune the locations map up to the oldest retained item in the log after pruning.
        self.locations
            .prune(*self.oldest_retained_loc)
            .await
            .map_err(Error::Journal)?;

        Ok(())
    }

    /// Get the location and metadata associated with the last commit, or None if no commit has
    /// been made.
    ///
    /// # Errors
    ///
    /// Returns Error if there is some underlying storage failure.
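    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// if let Some((loc, metadata)) = store.get_metadata().await? {
    ///     // `loc` is the location of the last commit operation; `metadata` is
    ///     // whatever value (possibly None) was passed to that commit.
    /// }
    /// ```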
    pub async fn get_metadata(&self) -> Result<Option<(Location, Option<V>)>, Error> {
        let Some(last_commit) = self.last_commit else {
            return Ok(None);
        };

        let Operation::CommitFloor(metadata, _) = self.get_op(last_commit).await? else {
            unreachable!("last commit should be a commit floor operation");
        };

        Ok(Some((last_commit, metadata)))
    }

    /// Closes the store. Any operations that have not been committed via [Store::commit] will be
    /// rolled back on restart.
    pub async fn close(self) -> Result<(), Error> {
        try_join!(self.log.close(), self.locations.close())?;

        Ok(())
    }

    /// Simulates a commit failure by skipping the sync of the log, the locations map, or both.
    #[cfg(any(test, feature = "fuzzing"))]
    pub async fn simulate_failure(
        mut self,
        sync_locations: bool,
        sync_log: bool,
    ) -> Result<(), Error> {
        if sync_locations {
            self.locations.sync().await?;
        }
        if sync_log {
            let section = self.current_section();
            self.log.sync(section).await?;
        }

        Ok(())
    }

    /// Destroys the store permanently, removing all persistent data associated with it.
    ///
    /// # Warning
    ///
    /// This operation is irreversible. Do not call this method unless you are sure
    /// you want to delete all data associated with this store permanently!
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(self.log.destroy(), self.locations.destroy())?;
        Ok(())
    }

    /// Returns the number of operations that have been applied to the store, including those that
    /// are not yet committed.
    pub fn op_count(&self) -> Location {
        self.log_size
    }

    /// Whether the db currently has no active keys.
    pub fn is_empty(&self) -> bool {
        self.snapshot.keys() == 0
    }

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive.
    pub fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }

    /// Builds the database's snapshot from the log of operations. Any operations that sit above
    /// the latest commit operation are removed.
    ///
    /// Returns the store with its snapshot, log size, oldest retained location, and inactivity
    /// floor rebuilt from the log.
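    ///
    /// A sketch of recovery (the tail after the last commit is discarded):
    ///
    /// ```text
    /// log:    [ ... | CommitFloor | Update(x) | Update(y) ]
    ///                 ^ last commit  ^-- uncommitted tail --^
    /// replay: rebuild the snapshot through the last CommitFloor, then rewind the log
    ///         (and locations) past the uncommitted tail.
    /// ```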
    async fn build_snapshot_from_log(mut self) -> Result<Self, Error> {
        let mut locations_size = self.locations.size().await;

        // The location and blob-offset of the first operation to follow the last known commit
        // point.
        let mut after_last_commit = None;
        // The set of operations that have not yet been committed.
        let mut uncommitted_ops = HashMap::new();
        let mut oldest_retained_loc_found = false;
        {
            let stream = self
                .log
                .replay(0, 0, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
                .await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Err(e) => {
                        return Err(Error::Journal(e));
                    }
                    Ok((section, offset, _, op)) => {
                        if !oldest_retained_loc_found {
                            self.log_size =
                                Location::new_unchecked(section * self.log_items_per_section);
                            self.oldest_retained_loc = self.log_size;
                            oldest_retained_loc_found = true;
                        }

                        let loc = self.log_size; // location of the current operation.
                        if after_last_commit.is_none() {
                            after_last_commit = Some((loc, offset));
                        }

                        self.log_size += 1;

                        // Consistency check: confirm the provided section matches what we
                        // expect from this operation's index.
                        let expected = *loc / self.log_items_per_section;
                        assert_eq!(section, expected,
                                "given section {section} did not match expected section {expected} from location {loc}");

                        if self.log_size > locations_size {
                            warn!(section, offset, "operation was missing from location map");
                            self.locations.append(offset).await?;
                            locations_size += 1;
                        }

                        match op {
                            Operation::Delete(key) => {
                                let result = self.get_key_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), None));
                                } else {
                                    uncommitted_ops.remove(&key);
                                }
                            }
                            Operation::Update(key, _) => {
                                let result = self.get_key_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), Some(loc)));
                                } else {
                                    uncommitted_ops.insert(key, (None, Some(loc)));
                                }
                            }
                            Operation::CommitFloor(_, loc) => {
                                self.inactivity_floor_loc = loc;

                                // Apply all uncommitted operations.
                                for (key, (old_loc, new_loc)) in uncommitted_ops.iter() {
                                    if let Some(old_loc) = old_loc {
                                        if let Some(new_loc) = new_loc {
                                            Self::update_loc(
                                                &mut self.snapshot,
                                                key,
                                                *old_loc,
                                                *new_loc,
                                            );
                                        } else {
                                            Self::delete_loc(&mut self.snapshot, key, *old_loc);
                                        }
                                    } else {
                                        assert!(new_loc.is_some());
                                        self.snapshot.insert(key, new_loc.unwrap());
                                    }
                                }
                                uncommitted_ops.clear();
                                after_last_commit = None;
                            }
                            _ => unreachable!(
                                "unexpected operation type at offset {offset} of section {section}"
                            ),
                        }
                    }
                }
            }
        }

        // Rewind the operations log if necessary.
        if let Some((end_loc, end_offset)) = after_last_commit {
            assert!(!uncommitted_ops.is_empty());
            warn!(
                op_count = uncommitted_ops.len(),
                log_size = ?end_loc,
                end_offset,
                "rewinding over uncommitted operations at end of log"
            );
            let prune_to_section = *end_loc / self.log_items_per_section;
            self.log
                .rewind_to_offset(prune_to_section, end_offset)
                .await?;
            self.log.sync(prune_to_section).await?;
            self.log_size = end_loc;
        }

        // Pop any locations that are ahead of the last log commit point.
        if locations_size > self.log_size {
            warn!(
                locations_size,
                log_size = ?self.log_size,
                "rewinding uncommitted locations"
            );
            self.locations.rewind(*self.log_size).await?;
            self.locations.sync().await?;
        }

        // Confirm post-conditions hold.
        assert_eq!(self.log_size, self.locations.size().await);
        self.last_commit = self
            .locations
            .size()
            .await
            .checked_sub(1)
            .map(Location::new_unchecked);
        assert!(
            self.last_commit.is_none()
                || matches!(
                    self.get_op(self.last_commit.unwrap()).await?,
                    Operation::CommitFloor(_, _)
                )
        );

        debug!(log_size = ?self.log_size, "build_snapshot_from_log complete");

        Ok(self)
    }

    /// Append the operation to the log. The `commit` method must be called to make any applied
    /// operation persistent & recoverable.
    async fn apply_op(&mut self, op: Operation<K, V>) -> Result<u32, Error> {
        // Append the operation to the current section of the operations log.
        let section = self.current_section();
        let (offset, _) = self.log.append(section, op).await?;

        // Append the offset of the new operation to locations.
        self.locations.append(offset).await?;

        // Increment the log size.
        self.log_size += 1;

        // Maintain the invariant that all completely full sections are synced & immutable.
        if self.current_section() != section {
            self.log.sync(section).await?;
        }

        Ok(offset)
    }

    /// Gets the location of the most recent [Operation::Update] for the key, or [None] if the key
    /// does not have a value.
    async fn get_key_loc(&self, key: &K) -> Result<Option<Location>, Error> {
        for loc in self.snapshot.get(key) {
            match self.get_op(*loc).await {
                Ok(Operation::Update(k, _)) => {
                    if k == *key {
                        return Ok(Some(*loc));
                    }
                }
                Err(Error::OperationPruned(_)) => {
                    unreachable!("invalid location in snapshot: loc={loc}")
                }
                _ => unreachable!("non-update operation referenced by snapshot: loc={loc}"),
            }
        }

        Ok(None)
    }

    /// Gets an [Operation] from the log at the given location. Returns [Error::OperationPruned]
    /// if the location precedes the oldest retained location. The location is otherwise assumed
    /// valid.
    async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
        assert!(loc < self.log_size);
        if loc < self.oldest_retained_loc {
            return Err(Error::OperationPruned(loc));
        }

        let section = *loc / self.log_items_per_section;
        let offset = self.locations.read(*loc).await?;

        // Get the operation from the log at the specified section and offset.
        self.log.get(section, offset).await.map_err(Error::Journal)
    }

    /// Updates the snapshot with the new operation location for the given key.
    fn update_loc(
        snapshot: &mut Index<T, Location>,
        key: &K,
        old_loc: Location,
        new_loc: Location,
    ) {
        let Some(mut cursor) = snapshot.get_mut(key) else {
            return;
        };

        // Find the old location in the snapshot and update it.
        if cursor.find(|&loc| loc == old_loc) {
            cursor.update(new_loc);
        }
    }

    /// Deletes the snapshot entry for `key` that points to the given location.
    fn delete_loc(snapshot: &mut Index<T, Location>, key: &K, old_loc: Location) {
        let Some(mut cursor) = snapshot.get_mut(key) else {
            return;
        };

        // Find the key in the snapshot and delete it.
        if cursor.find(|&loc| loc == old_loc) {
            cursor.delete();
        }
    }

    /// Moves the given operation to the tip of the log if it is active, rendering its old location
    /// inactive. If the operation was not active, then this is a no-op. Returns the old location
    /// of the operation if it was active.
    async fn move_op_if_active(
        &mut self,
        op: Operation<K, V>,
        old_loc: Location,
    ) -> Result<Option<Location>, Error> {
        // Non-key operations (e.g. commits) are never active.
        let Some(key) = op.key() else {
            return Ok(None);
        };

        // If the translated key is not in the snapshot, the operation cannot be active.
        let Some(mut cursor) = self.snapshot.get_mut(key) else {
            return Ok(None);
        };

        let new_loc = self.log_size;

        if cursor.find(|&loc| loc == old_loc) {
            // Update the location of the operation in the snapshot.
            cursor.update(new_loc);
            drop(cursor);

            self.apply_op(op).await?;
            Ok(Some(old_loc))
        } else {
            // The operation is not active, so this is a no-op.
            Ok(None)
        }
    }
}

impl<E, K, V, T> Db<E, K, V, T> for Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        self.get(key).await
    }

    async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        self.update(key, value).await
    }

    async fn delete(&mut self, key: K) -> Result<(), Error> {
        self.delete(key).await
    }

    async fn commit(&mut self) -> Result<(), Error> {
        self.commit(None).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        self.sync().await
    }

    async fn prune(&mut self, target_prune_loc: Location) -> Result<(), Error> {
        self.prune(target_prune_loc).await
    }

    async fn close(self) -> Result<(), Error> {
        self.close().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::translator::TwoCap;
    use commonware_cryptography::{
        blake3::{Blake3, Digest},
        Digest as _, Hasher as _,
    };
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;

    /// The type of the store used in tests.
    type TestStore = Store<deterministic::Context, Digest, Vec<u8>, TwoCap>;

    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_journal_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            locations_journal_partition: "locations_journal".to_string(),
            locations_items_per_blob: NZU64!(11),
            translator: TwoCap,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        };
        Store::init(context, cfg).await.unwrap()
    }

    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc, 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an
            // uncommitted op.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);

            // Test calling commit on an empty db, which should make it (durably) non-empty.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 1);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            let mut db = create_test_store(context.clone()).await;

            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits on
            // a non-empty db.
            db.update(Digest::random(&mut context), vec![1, 2, 3])
                .await
                .unwrap();
            for _ in 1..100 {
                db.commit(None).await.unwrap();
                // Distance should equal 3 after the second commit, with inactivity_floor
                // referencing the previous commit operation.
                assert!(db.op_count() - db.inactivity_floor_loc <= 3);
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the store is empty
            assert_eq!(store.op_count(), 0);
            assert_eq!(store.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Attempt to get a key that does not exist
            let result = store.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair
            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Fetch the value
            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Sync the store to persist the changes
            store.sync().await.unwrap();

            // Re-open the store
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store removed the uncommitted operations
            assert_eq!(store.log_size, 0);
            assert_eq!(store.inactivity_floor_loc, 0);
            assert!(store.get_metadata().await.unwrap().is_none());

            // Insert a key-value pair
            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Persist the changes
            let metadata = Some(vec![99, 100]);
            store.commit(metadata.clone()).await.unwrap();
            assert_eq!(
                store.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(2), metadata.clone()))
            );

            // The commit moved the one active operation to tip and appended the commit op,
            // raising the inactivity floor to 1. Nothing has been pruned, so all three
            // operations are retained.
            assert_eq!(store.log_size, 3);
            assert_eq!(store.inactivity_floor_loc, 1);

            // Re-open the store
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store retained the committed operations
            assert_eq!(store.log_size, 3);
            assert_eq!(store.inactivity_floor_loc, 1);

            // Fetch the value, ensuring it is still present
            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Insert two new k/v pairs; after the next commit, the first section (7 items in
            // this test) is full.
            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.log_size, 5);
            assert_eq!(store.inactivity_floor_loc, 1);

            // Make sure we can still get metadata.
            assert_eq!(
                store.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(2), metadata))
            );

            store.commit(None).await.unwrap();
            assert_eq!(
                store.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(6), None))
            );

            assert_eq!(store.log_size, 7);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Ensure all keys remain accessible.
            assert_eq!(store.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            // Destroy the store
            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Update the same key many times.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                store.update(k, v.clone()).await.unwrap();
            }

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Re-open the store, prune it, then ensure it replays the log correctly.
            let mut store = create_test_store(ctx.with_label("store")).await;
            store.prune(store.inactivity_floor_loc()).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // 100 updates were applied; while raising the floor, the commit then moved the sole
            // active operation once per step (100 moves) and appended the commit op.
            assert_eq!(store.log_size, UPDATES * 2 + 1);
            // Only the final moved `Update` is active, plus the commit operation above it.
            let expected_floor = UPDATES * 2 - 1;
            assert_eq!(store.inactivity_floor_loc, expected_floor);

            // All sections prior to the one containing the inactivity floor are pruned, so the
            // oldest retained location is the first in the last retained section.
            assert_eq!(
                store.oldest_retained_loc,
                expected_floor - expected_floor % 7
            );

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Ensure k2 shares its first 2 bytes with k1 (the test DB uses the `TwoCap`
            // translator).
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Re-open the store to ensure it builds the snapshot for the conflicting
            // keys correctly.
            let store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Insert a key-value pair
            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            store.update(k, v.clone()).await.unwrap();

            // Fetch the value
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete the key
            store.delete(k).await.unwrap();

            // Ensure the key is no longer present
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Commit the changes
            store.commit(None).await.unwrap();

            // Re-open the store and ensure the key is still deleted
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the key
            store.update(k, v.clone()).await.unwrap();
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Commit the changes
            store.commit(None).await.unwrap();

            // Re-open the store and ensure the snapshot restores the key, after processing
            // the delete and the subsequent set.
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete a non-existent key (no-op)
            let k_n = Digest::random(&mut ctx);
            store.delete(k_n).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let iter = store.snapshot.get(&k_n);
            assert_eq!(iter.count(), 0);

            store.destroy().await.unwrap();
        });
    }

    /// Tests that the inactivity floor rises as expected across commits.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            store.update(k_a, v_a.clone()).await.unwrap();
            store.update(k_b, v_b.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 4);
            assert_eq!(store.inactivity_floor_loc, 1);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_a);

            store.update(k_b, v_a.clone()).await.unwrap();
            store.update(k_a, v_c.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 10);
            assert_eq!(store.inactivity_floor_loc, 7);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(store.get(&k_b).await.unwrap().unwrap(), v_a);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store")).await;

            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 0);

            // Re-apply the updates and commit them this time.
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Re-apply updates for every 3rd key and commit them this time.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 1672);
            assert_eq!(db.snapshot.items(), 1000);

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            db.simulate_failure(false, false).await.unwrap();
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Close and reopen the store to ensure the final commit is preserved.
            db.close().await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Re-delete every 7th key and commit this time.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit(None).await.unwrap();

            assert_eq!(db.op_count(), 1960);
            assert_eq!(db.inactivity_floor_loc, 755);

            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.oldest_retained_loc, 755 - 755 % 7);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
}