commonware_storage/store/
mod.rs

//! A mutable key-value database that supports variable-sized values.
//!
//! # Terminology
//!
//! A _key_ in an unauthenticated database either has a _value_ or it doesn't. The _update_
//! operation gives a key a specific value whether it previously had no value or had a different
//! value.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an [Operation::Update] operation, and (3) it is the most recent operation for that
//! key. For example, if `Update(k, v1)` is followed by `Update(k, v2)`, only the second update is
//! active; the first is inactive and eligible for pruning.
//!
//! # Lifecycle
//!
//! 1. **Initialization**: Create with [Store::init] using a [Config]
//! 2. **Insertion**: Use [Store::update] to assign a value to a given key
//! 3. **Deletion**: Use [Store::delete] to remove a key's value
//! 4. **Persistence**: Call [Store::commit] to make changes durable
//! 5. **Queries**: Use [Store::get] to retrieve current values
//! 6. **Cleanup**: Call [Store::close] to shut down gracefully or [Store::destroy] to remove all
//!    data
//!
//! # Pruning
//!
//! _Inactive_ operations can be pruned from the log by calling [Store::prune] with a target
//! location no greater than the _inactivity floor_, the location before which all operations are
//! known to be inactive. Each time [Store::commit] is called, the inactivity floor is raised by
//! the number of uncommitted operations plus 1 for the trailing commit op. During this process,
//! any active operations encountered are re-applied to the tip of the log. The table below shows
//! the log state when pruning to the floor immediately after each commit.
//!
//! |                               Log State                                            | Inactivity Floor | Uncommitted Ops |
//! |------------------------------------------------------------------------------------|------------------|-----------------|
//! | [pre-commit] Update(a, v), Update(a, v')                                           |                0 |               2 |
//! | [raise-floor] Update(a, v), Update(a, v'), Update(a, v'), Update(a, v')            |                3 |               2 |
//! | [prune+commit] Update(a, v'), Commit(3)                                            |                3 |               0 |
//! | [pre-commit] Update(a, v'), Commit(3), Update(b, v), Update(a, v'')                |                3 |               2 |
//! | [raise-floor] Update(a, v'), Commit(3), Update(b, v), Update(a, v''), Update(b, v) |                6 |               2 |
//! | [prune+commit] Update(a, v''), Update(b, v), Commit(6)                             |                6 |               0 |
//!
//! # Example
//!
//! ```rust
//! use commonware_storage::{
//!     store::{Config, Store},
//!     translator::TwoCap,
//! };
//! use commonware_utils::{NZUsize, NZU64};
//! use commonware_cryptography::{blake3::Digest, Digest as _};
//! use commonware_runtime::{buffer::PoolRef, deterministic::Runner, Metrics, Runner as _};
//!
//! const PAGE_SIZE: usize = 77;
//! const PAGE_CACHE_SIZE: usize = 9;
//!
//! let executor = Runner::default();
//! executor.start(|mut ctx| async move {
//!     let config = Config {
//!         log_journal_partition: "test_partition".to_string(),
//!         log_write_buffer: NZUsize!(64 * 1024),
//!         log_compression: None,
//!         log_codec_config: (),
//!         log_items_per_section: NZU64!(4),
//!         locations_journal_partition: "locations_partition".to_string(),
//!         locations_items_per_blob: NZU64!(4),
//!         translator: TwoCap,
//!         buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
//!     };
//!     let mut store =
//!         Store::<_, Digest, Digest, TwoCap>::init(ctx.with_label("store"), config)
//!             .await
//!             .unwrap();
//!
//!     // Insert a key-value pair
//!     let k = Digest::random(&mut ctx);
//!     let v = Digest::random(&mut ctx);
//!     store.update(k, v).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = store.get(&k).await.unwrap();
//!     assert_eq!(fetched_value.unwrap(), v);
//!
//!     // Commit the operation to make it persistent
//!     let metadata = Some(Digest::random(&mut ctx));
//!     store.commit(metadata).await.unwrap();
//!
//!     // Delete the key's value
//!     store.delete(k).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = store.get(&k).await.unwrap();
//!     assert!(fetched_value.is_none());
//!
//!     // Commit the operation to make it persistent
//!     store.commit(None).await.unwrap();
//!
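//!     // Optionally prune operations below the inactivity floor; everything before
//!     // the floor is inactive and no longer needed to serve reads
//!     store.prune(store.inactivity_floor_loc()).await.unwrap();
//!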
//!     // Destroy the store
//!     store.destroy().await.unwrap();
//! });
//! ```

use crate::{
    index::Index,
    journal::{
        fixed::{Config as FConfig, Journal as FJournal},
        variable::{Config as VConfig, Journal as VJournal},
    },
    store::operation::Variable as Operation,
    translator::Translator,
};
use commonware_codec::{Codec, Read};
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::{Array, NZUsize};
use futures::{pin_mut, try_join, StreamExt};
use std::{
    collections::HashMap,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, warn};

pub mod operation;

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;

/// Errors that can occur when interacting with a [Store] database.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Journal(#[from] crate::journal::Error),

    /// The requested operation has been pruned.
    #[error("operation pruned")]
    OperationPruned(u64),
}

/// Configuration for initializing a [Store] database.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [`RStorage`] partition used to persist the log of operations.
    pub log_journal_partition: String,

    /// The size of the write buffer to use for each blob in the [`VJournal`].
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of operations to store in each section of the [`VJournal`].
    pub log_items_per_section: NonZeroU64,

    /// The name of the [`RStorage`] partition used for the location map.
    pub locations_journal_partition: String,

    /// The number of items to put in each blob in the location map.
    pub locations_items_per_blob: NonZeroU64,

    /// The [`Translator`] used by the compressed index.
    pub translator: T,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// An unauthenticated key-value database based on an append-only [VJournal] of operations.
pub struct Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    /// A log of all [Operation]s that have been applied to the store.
    log: VJournal<E, Operation<K, V>>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location of its most recent update in the log.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Update].
    snapshot: Index<T, u64>,

    /// The number of items to store in each section of the variable journal.
    log_items_per_section: u64,

    /// A fixed-length journal that maps an operation's location to its offset within its
    /// respective section of the log. (The section number is derived from the location.)
    locations: FJournal<E, u32>,

    /// A location before which all operations are "inactive" (that is, each operation before this
    /// point has been superseded by some operation at or after this point).
    inactivity_floor_loc: u64,

    /// The location of the oldest operation in the log that remains readable.
    oldest_retained_loc: u64,

    /// The total number of operations that have been applied to the store.
    log_size: u64,

    /// The number of operations that are pending commit.
    uncommitted_ops: u64,
}

impl<E, K, V, T> Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    /// Initializes a new [`Store`] database with the given configuration.
    ///
    /// ## Rollback
    ///
    /// Any uncommitted operations will be rolled back if the [Store] was previously closed without
    /// committing.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let snapshot: Index<T, u64> = Index::init(context.with_label("snapshot"), cfg.translator);

        let log = VJournal::init(
            context.with_label("log"),
            VConfig {
                partition: cfg.log_journal_partition,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        let locations = FJournal::init(
            context.with_label("locations"),
            FConfig {
                partition: cfg.locations_journal_partition,
                items_per_blob: cfg.locations_items_per_blob,
                write_buffer: cfg.log_write_buffer,
                buffer_pool: cfg.buffer_pool,
            },
        )
        .await?;

        let db = Self {
            log,
            snapshot,
            log_items_per_section: cfg.log_items_per_section.get(),
            locations,
            inactivity_floor_loc: 0,
            oldest_retained_loc: 0,
            log_size: 0,
            uncommitted_ops: 0,
        };

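        // Replay the log to rebuild the snapshot, rolling back any operations that follow the
        // last commit point.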
        db.build_snapshot_from_log().await
    }

    /// Gets the value associated with the given key in the store.
    ///
    /// If the key does not exist, returns `Ok(None)`.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
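        // The translator may map multiple keys to the same translated key, so confirm the full
        // key matches before returning a candidate's value.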
        for &loc in self.snapshot.get(key) {
            let Operation::Update(k, v) = self.get_op(loc).await? else {
                unreachable!("location ({loc}) does not reference update operation");
            };

            if &k == key {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db. Returns
    /// [Error::OperationPruned] if `loc` precedes the oldest retained location. The location is
    /// otherwise assumed valid.
    pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
        assert!(loc < self.log_size);
        let op = self.get_op(loc).await?;

        Ok(op.into_value())
    }

    /// Updates the value associated with the given key in the store.
    ///
    /// The operation is immediately visible in the snapshot for subsequent queries, but remains
    /// uncommitted until [Store::commit] is called. Uncommitted operations will be rolled back
    /// if the store is closed without committing.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
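        // The new operation will be appended at the tip of the log, so its location is the
        // current log size.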
        let new_loc = self.log_size;
        if let Some(old_loc) = self.get_key_loc(&key).await? {
            Self::update_loc(&mut self.snapshot, &key, old_loc, new_loc);
        } else {
            self.snapshot.insert(&key, new_loc);
        };

        self.apply_op(Operation::Update(key, value))
            .await
            .map(|_| ())
    }

    /// Deletes the value associated with the given key in the store. If the key has no value,
    /// the operation is a no-op.
    pub async fn delete(&mut self, key: K) -> Result<(), Error> {
        let Some(old_loc) = self.get_key_loc(&key).await? else {
            // Key does not exist, so this is a no-op.
            return Ok(());
        };

        Self::delete_loc(&mut self.snapshot, &key, old_loc);

        self.apply_op(Operation::Delete(key)).await.map(|_| ())
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor by up to the number of uncommitted
    /// operations plus one for the trailing commit operation. The caller can associate an
    /// arbitrary `metadata` value with the commit.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
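        // Raise the floor by (up to) the number of uncommitted operations plus one for the commit
        // operation itself, re-applying any active operations encountered along the way.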
        self.raise_inactivity_floor(metadata, self.uncommitted_ops + 1)
            .await?;
        self.uncommitted_ops = 0;

        let section = self.current_section();
        self.log.sync(section).await?;
        debug!(log_size = self.log_size, "commit complete");

        Ok(())
    }

    fn current_section(&self) -> u64 {
        self.log_size / self.log_items_per_section
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
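        // Sync the current log section and the locations journal concurrently.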
        let current_section = self.current_section();
        try_join!(self.log.sync(current_section), self.locations.sync())?;

        Ok(())
    }

    /// Prune historical operations located before `target_prune_loc`. Pruning is performed at
    /// section granularity, so some operations before the target may be retained. This has no
    /// effect on the store's active state.
    ///
    /// # Panics
    ///
    /// Panics if `target_prune_loc` is greater than the inactivity floor.
    pub async fn prune(&mut self, target_prune_loc: u64) -> Result<(), Error> {
        // The target must not exceed the inactivity floor, since operations at or above the floor
        // may still be active.
        assert!(target_prune_loc <= self.inactivity_floor_loc);
        if target_prune_loc <= self.oldest_retained_loc {
            return Ok(());
        }

        // Sync locations so it never ends up behind the log.
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1554): Extend recovery to avoid
        // this sync.
        self.locations.sync().await?;

        // Prune the log up to the section containing the requested pruning location. We always
        // prune the log first, and then prune the locations structure based on the log's actual
        // pruning boundary. This procedure ensures all log operations always have corresponding
        // location entries, even in the event of failures, with no need for special recovery.
        let section_with_target = target_prune_loc / self.log_items_per_section;
        if !self.log.prune(section_with_target).await? {
            return Ok(());
        }
        self.oldest_retained_loc = section_with_target * self.log_items_per_section;
        debug!(
            log_size = self.log_size,
            oldest_retained_loc = self.oldest_retained_loc,
            target_prune_loc,
            "pruned inactive ops"
        );

        // Prune the locations map up to the oldest retained item in the log after pruning.
        self.locations
            .prune(self.oldest_retained_loc)
            .await
            .map_err(Error::Journal)?;

        Ok(())
    }

    /// Get the location and metadata associated with the last commit, or None if no commit has been
    /// made.
    pub async fn get_metadata(&self) -> Result<Option<(u64, Option<V>)>, Error> {
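        // The most recent committed operation is always a commit operation, since commit appends
        // one after applying any pending operations.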
        let mut last_commit = self.op_count() - self.uncommitted_ops;
        if last_commit == 0 {
            return Ok(None);
        }
        last_commit -= 1;
        let section = last_commit / self.log_items_per_section;
        let offset = self.locations.read(last_commit).await?;
        let Some(Operation::CommitFloor(metadata, _)) = self.log.get(section, offset).await? else {
            unreachable!("no commit operation at location of last commit {last_commit}");
        };

        Ok(Some((last_commit, metadata)))
    }

    /// Closes the store. Any operations not committed via [Store::commit] will be rolled back the
    /// next time the store is opened.
    pub async fn close(self) -> Result<(), Error> {
        if self.uncommitted_ops > 0 {
            warn!(
                log_size = self.log_size,
                uncommitted_ops = self.uncommitted_ops,
                "closing store with uncommitted operations"
            );
        }

        try_join!(self.log.close(), self.locations.close())?;
        Ok(())
    }

    /// Simulates a commit failure by skipping the final sync of the log, the locations journal,
    /// or both.
    #[cfg(test)]
    pub async fn simulate_failure(
        mut self,
        sync_locations: bool,
        sync_log: bool,
    ) -> Result<(), Error> {
        if sync_locations {
            self.locations.sync().await?;
        }
        if sync_log {
            let section = self.current_section();
            self.log.sync(section).await?;
        }

        Ok(())
    }

    /// Destroys the store permanently, removing all persistent data associated with it.
    ///
    /// # Warning
    ///
    /// This operation is irreversible. Do not call this method unless you are sure
    /// you want to delete all data associated with this store permanently!
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(self.log.destroy(), self.locations.destroy())?;
        Ok(())
    }

    /// Returns the number of operations that have been applied to the store, including those that
    /// are not yet committed.
    pub fn op_count(&self) -> u64 {
        self.log_size
    }

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive.
    pub fn inactivity_floor_loc(&self) -> u64 {
        self.inactivity_floor_loc
    }

    /// Builds the database's snapshot from the log of operations. Any operations that sit above
    /// the latest commit operation are removed.
    ///
    /// Returns the store with its log size, oldest retained location, and inactivity floor
    /// restored from the log.
    async fn build_snapshot_from_log(mut self) -> Result<Self, Error> {
        let mut locations_size = self.locations.size().await?;

        // The location and blob-offset of the first operation to follow the last known commit point.
        let mut after_last_commit = None;
        // A map from each key touched by an uncommitted operation to the location of its
        // currently-active update (if any) and the location of its uncommitted update (if any).
        let mut uncommitted_ops = HashMap::new();
        let mut oldest_retained_loc_found = false;
        {
            let stream = self
                .log
                .replay(0, 0, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
                .await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Err(e) => {
                        return Err(Error::Journal(e));
                    }
                    Ok((section, offset, _, op)) => {
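                        // The first replayed item reveals how much of the log has been pruned:
                        // the log logically begins at the boundary of that item's section.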
                        if !oldest_retained_loc_found {
                            self.log_size = section * self.log_items_per_section;
                            self.oldest_retained_loc = self.log_size;
                            oldest_retained_loc_found = true;
                        }

                        let loc = self.log_size; // location of the current operation.
                        if after_last_commit.is_none() {
                            after_last_commit = Some((loc, offset));
                        }

                        self.log_size += 1;

                        // Consistency check: confirm the provided section matches what we expect from this operation's
                        // index.
                        let expected = loc / self.log_items_per_section;
                        assert_eq!(section, expected,
                                "given section {section} did not match expected section {expected} from location {loc}");

                        if self.log_size > locations_size {
                            warn!(section, offset, "operation was missing from location map");
                            self.locations.append(offset).await?;
                            locations_size += 1;
                        }

                        match op {
                            Operation::Delete(key) => {
                                let result = self.get_key_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), None));
                                } else {
                                    uncommitted_ops.remove(&key);
                                }
                            }
                            Operation::Update(key, _) => {
                                let result = self.get_key_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), Some(loc)));
                                } else {
                                    uncommitted_ops.insert(key, (None, Some(loc)));
                                }
                            }
                            Operation::CommitFloor(_, loc) => {
                                self.inactivity_floor_loc = loc;

                                // Apply all uncommitted operations.
                                for (key, (old_loc, new_loc)) in uncommitted_ops.iter() {
                                    if let Some(old_loc) = old_loc {
                                        if let Some(new_loc) = new_loc {
                                            Self::update_loc(
                                                &mut self.snapshot,
                                                key,
                                                *old_loc,
                                                *new_loc,
                                            );
                                        } else {
                                            Self::delete_loc(&mut self.snapshot, key, *old_loc);
                                        }
                                    } else {
                                        assert!(new_loc.is_some());
                                        self.snapshot.insert(key, new_loc.unwrap());
                                    }
                                }
                                uncommitted_ops.clear();
                                after_last_commit = None;
                            }
                            _ => unreachable!(
                                "unexpected operation type at offset {offset} of section {section}"
                            ),
                        }
                    }
                }
            }
        }

        // Rewind the operations log if necessary.
        if let Some((end_loc, end_offset)) = after_last_commit {
            assert!(!uncommitted_ops.is_empty());
            warn!(
                op_count = uncommitted_ops.len(),
                log_size = end_loc,
                end_offset,
                "rewinding over uncommitted operations at end of log"
            );
            let prune_to_section = end_loc / self.log_items_per_section;
            self.log
                .rewind_to_offset(prune_to_section, end_offset)
                .await?;
            self.log.sync(prune_to_section).await?;
            self.log_size = end_loc;
        }

        // Pop any locations that are ahead of the last log commit point.
        if locations_size > self.log_size {
            warn!(
                locations_size,
                log_size = self.log_size,
                "rewinding uncommitted locations"
            );
            self.locations.rewind(self.log_size).await?;
            self.locations.sync().await?;
        }

        // Confirm post-conditions hold.
        assert_eq!(self.log_size, self.locations.size().await?);

        debug!(log_size = self.log_size, "build_snapshot_from_log complete");

        Ok(self)
    }

    /// Append the operation to the log. The `commit` method must be called to make any applied operation
    /// persistent & recoverable.
    async fn apply_op(&mut self, op: Operation<K, V>) -> Result<u32, Error> {
        // Append the operation to the current section of the operations log.
        let section = self.current_section();
        let (offset, _) = self.log.append(section, op).await?;

        // Append the offset of the new operation to locations.
        self.locations.append(offset).await?;

        // Update the uncommitted operations count and increment the log size.
        self.uncommitted_ops += 1;
        self.log_size += 1;

        // Maintain the invariant that all completely full sections are synced & immutable.
        if self.current_section() != section {
            self.log.sync(section).await?;
        }

        Ok(offset)
    }

    /// Gets the location of the most recent [Operation::Update] for the key, or [None] if the key
    /// does not have a value.
    async fn get_key_loc(&self, key: &K) -> Result<Option<u64>, Error> {
        for loc in self.snapshot.get(key) {
            match self.get_op(*loc).await {
                Ok(Operation::Update(k, _)) => {
                    if k == *key {
                        return Ok(Some(*loc));
                    }
                }
                Err(Error::OperationPruned(_)) => {
                    unreachable!("invalid location in snapshot: loc={loc}")
                }
                _ => unreachable!("non-update operation referenced by snapshot: loc={loc}"),
            }
        }

        Ok(None)
    }

    /// Gets an [Operation] from the log at the given location. Returns [Error::OperationPruned]
    /// if the location precedes the oldest retained location. The location is otherwise assumed
    /// valid.
    async fn get_op(&self, loc: u64) -> Result<Operation<K, V>, Error> {
        assert!(loc < self.log_size);
        if loc < self.oldest_retained_loc {
            return Err(Error::OperationPruned(loc));
        }

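        // Derive the section from the location (each section holds a fixed number of operations),
        // then look up the operation's offset within that section from the locations journal.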
        let section = loc / self.log_items_per_section;
        let offset = self.locations.read(loc).await?;

        // Get the operation from the log at the specified section and offset.
        let Some(op) = self.log.get(section, offset).await? else {
            panic!("invalid location {loc}");
        };

        Ok(op)
    }

    /// Updates the snapshot with the new operation location for the given key.
    fn update_loc(snapshot: &mut Index<T, u64>, key: &K, old_loc: u64, new_loc: u64) {
        let Some(mut cursor) = snapshot.get_mut(key) else {
            return;
        };

        // Iterate over conflicts in the snapshot.
        while let Some(loc) = cursor.next() {
            if *loc == old_loc {
                // Update the cursor with the new location for this key.
                cursor.update(new_loc);
                return;
            }
        }
    }

    /// Deletes items in the snapshot that point to the given location.
    fn delete_loc(snapshot: &mut Index<T, u64>, key: &K, old_loc: u64) {
        let Some(mut cursor) = snapshot.get_mut(key) else {
            return;
        };

        // Iterate over conflicts in the snapshot.
        while let Some(loc) = cursor.next() {
            if *loc == old_loc {
                // Delete the element from the cursor.
                cursor.delete();
                return;
            }
        }
    }

    /// Moves the given operation to the tip of the log if it is active, rendering its old location
    /// inactive. If the operation was not active, then this is a no-op. Returns the old location
    /// of the operation if it was active.
    async fn move_op_if_active(
        &mut self,
        op: Operation<K, V>,
        old_loc: u64,
    ) -> Result<Option<u64>, Error> {
        // Extract the operation's key, if it has one.
        let Some(key) = op.key() else {
            // `op` is not a key-related operation, so it is not active.
            return Ok(None);
        };

        let Some(mut cursor) = self.snapshot.get_mut(key) else {
            return Ok(None);
        };

        let new_loc = self.log_size;

        // Iterate over all conflicting keys in the snapshot.
        while let Some(&loc) = cursor.next() {
            if loc == old_loc {
                // Update the location of the operation in the snapshot.
                cursor.update(new_loc);
                drop(cursor);

                self.apply_op(op).await?;
                return Ok(Some(old_loc));
            }
        }

        // The operation is not active, so this is a no-op.
        Ok(None)
    }

    /// Raise the inactivity floor by at most `max_steps` steps, followed by applying a commit
    /// operation. Each step either advances over an inactive operation, or re-applies an active
    /// operation to the tip and then advances over it.
    ///
    /// This method does not change the store's logical (key, value) state, though snapshot
    /// entries may be updated to point at moved operations.
    async fn raise_inactivity_floor(
        &mut self,
        metadata: Option<V>,
        max_steps: u64,
    ) -> Result<(), Error> {
        for _ in 0..max_steps {
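            // Stop early if the floor has caught up to the tip of the log.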
            if self.inactivity_floor_loc == self.log_size {
                break;
            }
            let op = self.get_op(self.inactivity_floor_loc).await?;
            self.move_op_if_active(op, self.inactivity_floor_loc)
                .await?;
            self.inactivity_floor_loc += 1;
        }

        self.apply_op(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
            .await
            .map(|_| ())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::translator::TwoCap;
    use commonware_cryptography::{
        blake3::{hash, Digest},
        Digest as _,
    };
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;

    /// The type of the store used in tests.
    type TestStore = Store<deterministic::Context, Digest, Vec<u8>, TwoCap>;

    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_journal_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            locations_journal_partition: "locations_journal".to_string(),
            locations_items_per_blob: NZU64!(11),
            translator: TwoCap,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        };
        Store::init(context, cfg).await.unwrap()
    }

    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc, 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 1);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            let mut db = create_test_store(context.clone()).await;

            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits.
            for _ in 1..100 {
                db.commit(None).await.unwrap();
                assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the store is empty
            assert_eq!(store.op_count(), 0);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Attempt to get a key that does not exist
            let result = store.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair
            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.uncommitted_ops, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Fetch the value
            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Sync the store to persist the changes
            store.sync().await.unwrap();

            // Re-open the store
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store removed the uncommitted operations
            assert_eq!(store.log_size, 0);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 0);
            assert_eq!(store.get_metadata().await.unwrap(), None);

            // Insert a key-value pair
            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.uncommitted_ops, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Persist the changes
            let metadata = Some(vec![99, 100]);
            store.commit(metadata.clone()).await.unwrap();
            assert_eq!(
                store.get_metadata().await.unwrap(),
                Some((3, metadata.clone()))
            );

            // The commit raised the inactivity floor by 2; nothing has been pruned, so the
            // original update, its two re-applications, and the commit operation all remain
            // in the log.
            assert_eq!(store.log_size, 4);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Re-open the store
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store retained the committed operations
            assert_eq!(store.log_size, 4);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Fetch the value, ensuring it is still present
            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Insert two new k/v pairs.
            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.log_size, 6);
            assert_eq!(store.uncommitted_ops, 2);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Make sure we can still get metadata.
            assert_eq!(store.get_metadata().await.unwrap(), Some((3, metadata)));

            store.commit(None).await.unwrap();
            assert_eq!(store.get_metadata().await.unwrap(), Some((8, None)));

            assert_eq!(store.log_size, 9);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 5);

            // Ensure all keys can still be accessed after the inactivity floor was raised.
            assert_eq!(store.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            // Destroy the store
            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Update the same key many times.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                store.update(k, v.clone()).await.unwrap();
            }

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Re-open the store, prune it, then ensure it replays the log correctly.
            let mut store = create_test_store(ctx.with_label("store")).await;
            store.prune(store.inactivity_floor_loc()).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // 100 operations were applied, and two were moved due to their activity, plus
            // the commit operation.
            assert_eq!(store.log_size, UPDATES + 3);
            // Only the final re-applied `Update` remains active, with the commit operation above it.
            assert_eq!(store.inactivity_floor_loc, UPDATES + 1);

            // All sections prior to the one containing the inactivity floor are pruned, so the
            // oldest retained location is the first location in that section.
            assert_eq!(store.oldest_retained_loc, UPDATES - UPDATES % 7);
            assert_eq!(store.uncommitted_ops, 0);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Ensure k2 shares its first 2 bytes with k1 (the test store uses the `TwoCap` translator).
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Re-open the store to ensure it builds the snapshot for the conflicting
            // keys correctly.
            let store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Insert a key-value pair
            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            store.update(k, v.clone()).await.unwrap();

            // Fetch the value
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete the key
            store.delete(k).await.unwrap();

            // Ensure the key is no longer present
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Commit the changes
            store.commit(None).await.unwrap();

            // Re-open the store and ensure the key is still deleted
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the key
            store.update(k, v.clone()).await.unwrap();
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Commit the changes
            store.commit(None).await.unwrap();

            // Re-open the store and ensure the snapshot restores the key, after processing
            // the delete and the subsequent set.
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete a non-existent key (no-op)
            let k_n = Digest::random(&mut ctx);
            store.delete(k_n).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let iter = store.snapshot.get(&k_n);
            assert_eq!(iter.count(), 0);

            store.destroy().await.unwrap();
        });
    }

    /// Tests the commit/floor-raising schedule from the pruning example in the module
    /// documentation (no pruning is performed here).
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            store.update(k_a, v_a.clone()).await.unwrap();
            store.update(k_b, v_b.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 6);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 3);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_a);

            store.update(k_b, v_a.clone()).await.unwrap();
            store.update(k_a, v_c.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 9);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 6);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(store.get(&k_b).await.unwrap().unwrap(), v_a);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store")).await;

            for i in 0u64..ELEMENTS {
                let k = hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 0);

            // Re-apply the updates and commit them this time.
            for i in 0u64..ELEMENTS {
                let k = hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Re-apply updates for every 3rd key and commit them this time.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 2561);
            assert_eq!(db.snapshot.items(), 1000);

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last committed state.
            db.simulate_failure(false, false).await.unwrap();
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Close and reopen the store to ensure the final commit is preserved.
            db.close().await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Re-delete every 7th key and commit this time.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit(None).await.unwrap();

            assert_eq!(db.op_count(), 2787);
            assert_eq!(db.inactivity_floor_loc, 1480);

            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.oldest_retained_loc, 1480 - 1480 % 7);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
}