commonware_storage/store/mod.rs

//! A mutable key-value database that supports variable-sized values.
//!
//! # Terminology
//!
//! A _key_ in an unauthenticated database either has a _value_ or it doesn't. The _update_
//! operation gives a key a specific value whether it previously had no value or had a different
//! value.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an [Operation::Update] operation, and (3) it is the most recent operation for that
//! key.
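//!
//! For example, in the log `Update(k, v1), Update(k, v2), Delete(j)`, only `Update(k, v2)` is
//! active: `Update(k, v1)` is not the most recent operation for `k`, and `Delete(j)` is not an
//! update (and leaves `j` without a value).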
//!
//! # Lifecycle
//!
//! 1. **Initialization**: Create with [Store::init] using a [Config]
//! 2. **Insertion**: Use [Store::update] to assign a value to a given key
//! 3. **Deletions**: Use [Store::delete] to remove a key's value
//! 4. **Persistence**: Call [Store::commit] to make changes durable
//! 5. **Queries**: Use [Store::get] to retrieve current values
//! 6. **Cleanup**: Call [Store::close] to shut down gracefully or [Store::destroy] to remove all
//!    data
//!
//! # Pruning
//!
//! The database prunes _inactive_ operations every time [Store::commit] is called. To achieve
//! this, an _inactivity floor_ is maintained: the location before which all operations are
//! inactive. At commit time, the inactivity floor is raised by the number of uncommitted
//! operations plus 1 for the trailing commit operation. During this process, any active
//! operations that are encountered are re-applied to the tip of the log.
//!
//! |                               Log State                                            | Inactivity Floor | Uncommitted Ops |
//! |------------------------------------------------------------------------------------|------------------|-----------------|
//! | [pre-commit] Update(a, v), Update(a, v')                                           |                0 |               2 |
//! | [raise-floor] Update(a, v), Update(a, v'), Update(a, v'), Update(a, v')            |                3 |               2 |
//! | [prune+commit] Update(a, v'), Commit(3)                                            |                3 |               0 |
//! | [pre-commit] Update(a, v'), Commit(3), Update(b, v), Update(a, v'')                |                3 |               2 |
//! | [raise-floor] Update(a, v'), Commit(3), Update(b, v), Update(a, v''), Update(b, v) |                6 |               2 |
//! | [prune+commit] Update(a, v''), Update(b, v), Commit(6)                             |                6 |               0 |
//!
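//! Note that pruning happens at section granularity: the log is pruned up to the start of the
//! section containing the inactivity floor, so a few inactive operations below the floor may
//! remain readable until their entire section falls below it.
//!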
//! # Example
//!
//! ```rust
//! use commonware_storage::{
//!     store::{Config, Store},
//!     translator::TwoCap,
//! };
//! use commonware_utils::{NZUsize, NZU64};
//! use commonware_cryptography::{blake3::Digest, Digest as _};
//! use commonware_runtime::{buffer::PoolRef, deterministic::Runner, Metrics, Runner as _};
//!
//! const PAGE_SIZE: usize = 77;
//! const PAGE_CACHE_SIZE: usize = 9;
//!
//! let executor = Runner::default();
//! executor.start(|mut ctx| async move {
//!     let config = Config {
//!         log_journal_partition: "test_partition".to_string(),
//!         log_write_buffer: NZUsize!(64 * 1024),
//!         log_compression: None,
//!         log_codec_config: (),
//!         log_items_per_section: NZU64!(4),
//!         locations_journal_partition: "locations_partition".to_string(),
//!         locations_items_per_blob: NZU64!(4),
//!         translator: TwoCap,
//!         buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
//!     };
//!     let mut store =
//!         Store::<_, Digest, Digest, TwoCap>::init(ctx.with_label("store"), config)
//!             .await
//!             .unwrap();
//!
//!     // Insert a key-value pair
//!     let k = Digest::random(&mut ctx);
//!     let v = Digest::random(&mut ctx);
//!     store.update(k, v).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = store.get(&k).await.unwrap();
//!     assert_eq!(fetched_value.unwrap(), v);
//!
//!     // Commit the operation to make it persistent
//!     store.commit().await.unwrap();
//!
//!     // Delete the key's value
//!     store.delete(k).await.unwrap();
//!
//!     // Fetch the value
//!     let fetched_value = store.get(&k).await.unwrap();
//!     assert!(fetched_value.is_none());
//!
//!     // Commit the operation to make it persistent
//!     store.commit().await.unwrap();
//!
//!     // Destroy the store
//!     store.destroy().await.unwrap();
//! });
//! ```

use crate::{
    index::Index,
    journal::{
        fixed::{Config as FConfig, Journal as FJournal},
        variable::{Config as VConfig, Journal as VJournal},
    },
    store::operation::Variable as Operation,
    translator::Translator,
};
use commonware_codec::{Codec, Read};
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::{sequence::U32, Array, NZUsize};
use futures::{pin_mut, try_join, StreamExt};
use std::{
    collections::HashMap,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, warn};

pub mod operation;

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;

/// Errors that can occur when interacting with a [Store] database.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Journal(#[from] crate::journal::Error),

    /// The requested key was not found in the snapshot.
    #[error("key not found")]
    KeyNotFound,
}

/// Configuration for initializing a [Store] database.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [`RStorage`] partition used to persist the log of operations.
    pub log_journal_partition: String,

    /// The size of the write buffer to use for each blob in the [`VJournal`].
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of operations to store in each section of the [`VJournal`].
    pub log_items_per_section: NonZeroU64,

    /// The name of the [`RStorage`] partition used for the location map.
    pub locations_journal_partition: String,

    /// The number of items to put in each blob in the location map.
    pub locations_items_per_blob: NonZeroU64,

    /// The [`Translator`] used by the compressed index.
    pub translator: T,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// An unauthenticated key-value database based off of an append-only [VJournal] of operations.
pub struct Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    /// A log of all [Operation]s that have been applied to the store.
    log: VJournal<E, Operation<K, V>>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location of its most recent update.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Update].
    snapshot: Index<T, u64>,

    /// The number of items to store in each section of the variable journal.
    log_items_per_section: u64,

    /// A fixed-length journal that maps an operation's location to its offset within its respective
    /// section of the log. (The section number is derived from the location.)
    locations: FJournal<E, U32>,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    inactivity_floor_loc: u64,

    /// The location of the oldest operation in the log that remains readable.
    oldest_retained_loc: u64,

    /// The total number of operations that have been applied to the store.
    log_size: u64,

    /// The number of operations that are pending commit.
    uncommitted_ops: u64,
}

impl<E, K, V, T> Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    /// Initializes a new [`Store`] database with the given configuration.
    ///
    /// ## Rollback
    ///
    /// Any uncommitted operations will be rolled back if the [Store] was previously closed without
    /// committing.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let snapshot: Index<T, u64> = Index::init(context.with_label("snapshot"), cfg.translator);

        let log = VJournal::init(
            context.with_label("log"),
            VConfig {
                partition: cfg.log_journal_partition,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        let locations = FJournal::init(
            context.with_label("locations"),
            FConfig {
                partition: cfg.locations_journal_partition,
                items_per_blob: cfg.locations_items_per_blob,
                write_buffer: cfg.log_write_buffer,
                buffer_pool: cfg.buffer_pool,
            },
        )
        .await?;

        let db = Self {
            log,
            snapshot,
            log_items_per_section: cfg.log_items_per_section.get(),
            locations,
            inactivity_floor_loc: 0,
            oldest_retained_loc: 0,
            log_size: 0,
            uncommitted_ops: 0,
        };
        db.build_snapshot_from_log().await
    }

    /// Gets the value associated with the given key in the store.
    ///
    /// If the key does not exist, returns `Ok(None)`.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
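        // Multiple keys can collide under the translator, so check each candidate location
        // against the full key before returning its value.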
        for location in self.snapshot.get(key) {
            let Operation::Update(k, v) = self.get_op(*location).await? else {
                panic!("location ({location}) does not reference set operation",);
            };

            if &k == key {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Updates the value associated with the given key in the store.
    ///
    /// The operation is immediately visible in the snapshot for subsequent queries, but remains
    /// uncommitted until [Store::commit] is called. Uncommitted operations will be rolled back
    /// if the store is closed without committing.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        let new_location = self.log_size;
        if let Some(old_location) = self.get_loc(&key).await? {
            Self::update_loc(&mut self.snapshot, &key, old_location, new_location);
        } else {
            self.snapshot.insert(&key, new_location);
        };

        self.apply_op(Operation::Update(key, value))
            .await
            .map(|_| ())
    }

    /// Deletes the value associated with the given key in the store. If the key has no value,
    /// the operation is a no-op.
    pub async fn delete(&mut self, key: K) -> Result<(), Error> {
        let Some(old_loc) = self.get_loc(&key).await? else {
            // Key does not exist, so this is a no-op.
            return Ok(());
        };

        Self::delete_loc(&mut self.snapshot, &key, old_loc);

        self.apply_op(Operation::Delete(key)).await.map(|_| ())
    }

    /// Commits all uncommitted operations to the store, making them persistent and recoverable.
    pub async fn commit(&mut self) -> Result<(), Error> {
        self.raise_inactivity_floor(self.uncommitted_ops + 1)
            .await?;
        self.uncommitted_ops = 0;

        self.sync().await?;
        self.prune_inactive().await
    }

    /// Closes the store. Any uncommitted operations will be lost if they have not been committed
    /// via [Store::commit].
    pub async fn close(self) -> Result<(), Error> {
        if self.uncommitted_ops > 0 {
            warn!(
                log_size = self.log_size,
                uncommitted_ops = self.uncommitted_ops,
                "closing store with uncommitted operations"
            );
        }

        try_join!(self.log.close(), self.locations.close())?;
        Ok(())
    }

    /// Simulates a commit failure by skipping the sync of the log, the locations journal, or both.
    #[cfg(test)]
    pub async fn simulate_failure(
        mut self,
        sync_locations: bool,
        sync_log: bool,
    ) -> Result<(), Error> {
        if sync_locations {
            self.locations.sync().await?;
        }
        if sync_log {
            self.log
                .sync(self.log_size / self.log_items_per_section)
                .await?;
        }

        Ok(())
    }

    /// Destroys the store permanently, removing all persistent data associated with it.
    ///
    /// # Warning
    ///
    /// This operation is irreversible. Do not call this method unless you are sure
    /// you want to delete all data associated with this store permanently!
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(self.log.destroy(), self.locations.destroy())?;
        Ok(())
    }

    /// Returns the number of operations that have been applied to the store, including those that
    /// are not yet committed.
    pub fn op_count(&self) -> u64 {
        self.log_size
    }

    /// Syncs the active section of the log to persistent storage.
    ///
    /// This method ensures that all buffered data is written to disk, but unlike [Store::commit],
    /// does not create a commit point.
    ///
    /// Use this method when you want to ensure data durability without creating a formal
    /// transaction boundary.
    async fn sync(&mut self) -> Result<(), Error> {
        let current_section = self.log_size / self.log_items_per_section;
        try_join!(self.log.sync(current_section), self.locations.sync())?;
        Ok(())
    }

    /// Builds the database's snapshot from the log of operations. Any operations that sit above
    /// the latest commit operation are removed.
    ///
    /// Returns the store with its log size, oldest retained location, and inactivity floor
    /// restored from the log.
    async fn build_snapshot_from_log(mut self) -> Result<Self, Error> {
        let mut locations_size = self.locations.size().await?;

        // The size of the log at the last commit point (including the commit operation), or 0 if
        // none.
        let mut end_loc = 0;
        // The offset into the log at the end_loc.
        let mut end_offset = 0;
        // Pending (uncommitted) changes by key: each key maps to its last committed location (if
        // any) and its new location (`None` if the key is being deleted).
        let mut uncommitted_ops = HashMap::new();
        let mut oldest_retained_loc_found = false;
        {
            let stream = self.log.replay(NZUsize!(SNAPSHOT_READ_BUFFER_SIZE)).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Err(e) => {
                        return Err(Error::Journal(e));
                    }
                    Ok((section, offset, size, op)) => {
                        if !oldest_retained_loc_found {
                            self.log_size = section * self.log_items_per_section;
                            self.oldest_retained_loc = self.log_size;
                            oldest_retained_loc_found = true;
                        }
                        let loc = self.log_size; // location of the current operation.
                        self.log_size += 1;

                        // Consistency check: confirm the provided section matches what we expect from this operation's
                        // index.
                        let expected = loc / self.log_items_per_section;
                        assert_eq!(section, expected,
                                "given section {section} did not match expected section {expected} from location {loc}");

                        if self.log_size > locations_size {
                            warn!(section, offset, "operation was missing from location map");
                            self.locations.append(offset.into()).await?;
                            locations_size += 1;
                        }

                        match op {
                            Operation::Delete(key) => {
                                let result = self.get_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), None));
                                } else {
                                    uncommitted_ops.remove(&key);
                                }
                            }
                            Operation::Update(key, _) => {
                                let result = self.get_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), Some(loc)));
                                } else {
                                    uncommitted_ops.insert(key, (None, Some(loc)));
                                }
                            }
                            Operation::CommitFloor(loc) => {
                                self.inactivity_floor_loc = loc;

                                // Apply all uncommitted operations.
                                for (key, (old_loc, new_loc)) in uncommitted_ops.iter() {
                                    if let Some(old_loc) = old_loc {
                                        if let Some(new_loc) = new_loc {
                                            Self::update_loc(
                                                &mut self.snapshot,
                                                key,
                                                *old_loc,
                                                *new_loc,
                                            );
                                        } else {
                                            Self::delete_loc(&mut self.snapshot, key, *old_loc);
                                        }
                                    } else {
                                        assert!(new_loc.is_some());
                                        self.snapshot.insert(key, new_loc.unwrap());
                                    }
                                }
                                uncommitted_ops.clear();
                                end_loc = self.log_size;
                                end_offset = offset + size;
                            }
                            _ => unreachable!(
                                "unexpected operation type at offset {offset} of section {section}"
                            ),
                        }
                    }
                }
            }
        }
        if end_loc < self.log_size {
            warn!(
                op_count = uncommitted_ops.len(),
                log_size = end_loc,
                "rewinding over uncommitted operations at end of log"
            );
            let prune_to_section = end_loc.saturating_sub(1) / self.log_items_per_section;
            self.log
                .rewind_to_offset(prune_to_section, end_offset)
                .await?;
            self.log.sync(prune_to_section).await?;
            self.log_size = end_loc;
        }

        // Pop any locations that are ahead of the last log commit point.
        if locations_size > self.log_size {
            warn!(
                locations_size,
                log_size = self.log_size,
                "rewinding uncommitted locations"
            );
            self.locations.rewind(self.log_size).await?;
        }

        // Confirm post-conditions hold.
        assert_eq!(self.log_size, self.locations.size().await?);

        debug!(log_size = self.log_size, "build_snapshot_from_log complete");

        Ok(self)
    }

    /// Appends the operation to the log. The `commit` method must be called to make any applied
    /// operation persistent & recoverable.
    async fn apply_op(&mut self, op: Operation<K, V>) -> Result<u32, Error> {
        let current_section = self.log_size / self.log_items_per_section;

        // Append the operation to the entry log, and the offset to the locations log.
        //
        // The section number can be derived from the location by dividing the location
        // by the number of items to store in each section, hence why we only store a
        // map of location -> offset within the section.
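        //
        // For example, with `log_items_per_section = 4`, the operation at location 9 lands in
        // section 9 / 4 = 2, and entry 9 of `locations` stores its offset within that section.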
        let (offset, _) = self.log.append(current_section, op).await?;
        self.locations.append(offset.into()).await?;

        self.uncommitted_ops += 1;
        self.log_size += 1;

        let new_section = self.log_size / self.log_items_per_section;

        // Sync the previous section if we transitioned to a new section
        if new_section != current_section {
            self.log.sync(current_section).await?;
        }

        Ok(offset)
    }

    /// Gets the location of the most recent [Operation::Update] for the key, or [None] if
    /// the key does not have a value.
    async fn get_loc(&self, key: &K) -> Result<Option<u64>, Error> {
        for loc in self.snapshot.get(key) {
            match self.get_op(*loc).await {
                Ok(Operation::Update(k, _)) => {
                    if k == *key {
                        return Ok(Some(*loc));
                    }
                }
                Err(Error::KeyNotFound) => return Ok(None),
                _ => continue,
            }
        }

        Ok(None)
    }

    /// Gets an [Operation] from the log at the given location.
    async fn get_op(&self, location: u64) -> Result<Operation<K, V>, Error> {
        let section = location / self.log_items_per_section;
        let offset = self.locations.read(location).await?.into();

        // Get the operation from the log at the specified section and offset.
        let Some(op) = self.log.get(section, offset).await? else {
            return Err(Error::KeyNotFound);
        };

        Ok(op)
    }

    /// Updates the snapshot with the new operation location for the given key.
    fn update_loc(snapshot: &mut Index<T, u64>, key: &K, old_location: u64, new_location: u64) {
        let Some(mut cursor) = snapshot.get_mut(key) else {
            return;
        };

        // Iterate over conflicts in the snapshot.
        while let Some(location) = cursor.next() {
            if *location == old_location {
                // Update the cursor with the new location for this key.
                cursor.update(new_location);
                return;
            }
        }
    }

    /// Deletes items in the snapshot that point to the given location.
    fn delete_loc(snapshot: &mut Index<T, u64>, key: &K, old_location: u64) {
        let Some(mut cursor) = snapshot.get_mut(key) else {
            return;
        };

        // Iterate over conflicts in the snapshot.
        while let Some(location) = cursor.next() {
            if *location == old_location {
                // Delete the element from the cursor.
                cursor.delete();
                return;
            }
        }
    }

    /// Moves the given operation to the tip of the log if it is active, rendering its old location
    /// inactive. If the operation was not active, then this is a no-op. Returns the old location
    /// of the operation if it was active.
    async fn move_op_if_active(
        &mut self,
        op: Operation<K, V>,
        old_location: u64,
    ) -> Result<Option<u64>, Error> {
        // Extract the key targeted by this operation, if any.
        let Some(key) = op.to_key() else {
            // `op` is not a key-related operation, so it is not active.
            return Ok(None);
        };

        let Some(mut cursor) = self.snapshot.get_mut(key) else {
            return Ok(None);
        };

        let new_location = self.log_size;

        // Iterate over all conflicting keys in the snapshot.
        while let Some(&location) = cursor.next() {
            if location == old_location {
                // Update the location of the operation in the snapshot.
                cursor.update(new_location);
                drop(cursor);

                self.apply_op(op).await?;
                return Ok(Some(old_location));
            }
        }

        // The operation is not active, so this is a no-op.
        Ok(None)
    }

    /// Raises the inactivity floor by up to `max_steps` steps (stopping early if it reaches the
    /// tip of the log), followed by applying a commit operation. Each step either advances over an
    /// inactive operation, or re-applies an active operation to the tip and then advances over it.
    ///
    /// This method does not change the keys or values in the db's snapshot, only the locations
    /// they map to.
    async fn raise_inactivity_floor(&mut self, max_steps: u64) -> Result<(), Error> {
        for _ in 0..max_steps {
            if self.inactivity_floor_loc == self.log_size {
                break;
            }
            let op = self.get_op(self.inactivity_floor_loc).await?;
            self.move_op_if_active(op, self.inactivity_floor_loc)
                .await?;
            self.inactivity_floor_loc += 1;
        }

        self.apply_op(Operation::CommitFloor(self.inactivity_floor_loc))
            .await
            .map(|_| ())
    }

    /// Prunes historical operations that are behind the inactivity floor. This does not affect the
    /// current snapshot.
    async fn prune_inactive(&mut self) -> Result<(), Error> {
        if self.log_size == 0 {
            return Ok(());
        }

        // Calculate the target pruning position: inactivity_floor_loc.
        let target_prune_loc = self.inactivity_floor_loc;
        let ops_to_prune = target_prune_loc.saturating_sub(self.oldest_retained_loc);
        if ops_to_prune == 0 {
            return Ok(());
        }
        debug!(ops_to_prune, target_prune_loc, "pruning inactive ops");

        // Prune the log up to the section containing the requested pruning location. We always
        // prune the log first, and then prune the locations structure based on the log's
        // actual pruning boundary. This procedure ensures all log operations always have
        // corresponding location entries, even in the event of failures, with no need for
        // special recovery.
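        //
        // For example, with `log_items_per_section = 7` and an inactivity floor of 10, pruning
        // targets section 10 / 7 = 1, so section 0 (locations 0..=6) is discarded and location 7
        // becomes the oldest retained operation, even though locations 7..10 are also inactive.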
        let section = target_prune_loc / self.log_items_per_section;
        self.log.prune(section).await?;
        self.oldest_retained_loc = section * self.log_items_per_section;

        // Prune the locations map up to the oldest retained item in the log after pruning.
        self.locations
            .prune(self.oldest_retained_loc)
            .await
            .map_err(Error::Journal)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::translator::TwoCap;
    use commonware_cryptography::{
        blake3::{hash, Digest},
        Digest as _,
    };
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;

    /// The type of the store used in tests.
    type TestStore = Store<deterministic::Context, Digest, Vec<u8>, TwoCap>;

    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_journal_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            locations_journal_partition: "locations_journal".to_string(),
            locations_items_per_blob: NZU64!(11),
            translator: TwoCap,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        };
        Store::init(context, cfg).await.unwrap()
    }

    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc, 0);
            assert!(matches!(db.prune_inactive().await, Ok(())));

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            db.commit().await.unwrap();
            assert_eq!(db.op_count(), 1);
            assert!(matches!(db.prune_inactive().await, Ok(())));
            let mut db = create_test_store(context.clone()).await;

            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits.
            for _ in 1..100 {
                db.commit().await.unwrap();
                assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the store is empty
            assert_eq!(store.op_count(), 0);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Attempt to get a key that does not exist
            let result = store.get(&key).await;
            assert!(result.unwrap().is_none());

            // Insert a key-value pair
            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.uncommitted_ops, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Fetch the value
            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Sync the store to persist the changes
            store.sync().await.unwrap();

            // Re-open the store
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store removed the uncommitted operations
            assert_eq!(store.log_size, 0);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Insert a key-value pair
            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.uncommitted_ops, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Persist the changes
            store.commit().await.unwrap();

            // Even though commit attempted pruning, the inactivity floor was only raised by 2,
            // and the old operations remain in the same section (blob) as an active operation,
            // so they're retained.
            assert_eq!(store.log_size, 4);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Re-open the store
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Ensure the re-opened store retained the committed operations
            assert_eq!(store.log_size, 4);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Fetch the value, ensuring it is still present
            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Insert two new k/v pairs to force pruning of the first section.
            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.log_size, 6);
            assert_eq!(store.uncommitted_ops, 2);
            assert_eq!(store.inactivity_floor_loc, 2);

            store.commit().await.unwrap();

            assert_eq!(store.log_size, 9);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 5);

            // Ensure all keys can be accessed, despite the first section being pruned.
            assert_eq!(store.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            // Destroy the store
            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Update the same key many times.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                store.update(k, v.clone()).await.unwrap();
            }

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            store.commit().await.unwrap();
            store.close().await.unwrap();

            // Re-open the store to ensure it replays the log correctly.
            let store = create_test_store(ctx.with_label("store")).await;

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // 100 operations were applied, and two were moved due to their activity, plus
            // the commit operation.
            assert_eq!(store.log_size, UPDATES + 3);
            // Only the highest `Update` operation is active, plus the commit operation above it.
            assert_eq!(store.inactivity_floor_loc, UPDATES + 1);
            // All blobs prior to the inactivity floor are pruned, so the oldest retained location
            // is the first in the last retained blob.
            assert_eq!(store.oldest_retained_loc, UPDATES - UPDATES % 7);
            assert_eq!(store.uncommitted_ops, 0);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Ensure k2 shares its first 2 bytes with k1 (the test DB uses the `TwoCap` translator).
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.commit().await.unwrap();
            store.close().await.unwrap();

            // Re-open the store to ensure it builds the snapshot for the conflicting
            // keys correctly.
            let store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Insert a key-value pair
            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            store.update(k, v.clone()).await.unwrap();

            // Fetch the value
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete the key
            store.delete(k).await.unwrap();

            // Ensure the key is no longer present
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Commit the changes
            store.commit().await.unwrap();

            // Re-open the store and ensure the key is still deleted
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the key
            store.update(k, v.clone()).await.unwrap();
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Commit the changes
            store.commit().await.unwrap();

            // Re-open the store and ensure the snapshot restores the key, after processing
            // the delete and the subsequent set.
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete a non-existent key (no-op)
            let k_n = Digest::random(&mut ctx);
            store.delete(k_n).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let iter = store.snapshot.get(&k_n);
            assert_eq!(iter.count(), 0);

            store.destroy().await.unwrap();
        });
    }

    /// Tests the pruning example in the module documentation.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            store.update(k_a, v_a.clone()).await.unwrap();
            store.update(k_b, v_b.clone()).await.unwrap();

            store.commit().await.unwrap();
            assert_eq!(store.op_count(), 6);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 3);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_a);

            store.update(k_b, v_a.clone()).await.unwrap();
            store.update(k_a, v_c.clone()).await.unwrap();

            store.commit().await.unwrap();
            assert_eq!(store.op_count(), 9);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 6);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(store.get(&k_b).await.unwrap().unwrap(), v_a);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store")).await;

            for i in 0u64..ELEMENTS {
                let k = hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last commit point.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 0);

            // Re-apply the updates and commit them this time.
            for i in 0u64..ELEMENTS {
                let k = hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit().await.unwrap();
            let op_count = db.op_count();

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last commit point.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Re-apply updates for every 3rd key and commit them this time.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit().await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 2561);
            assert_eq!(db.snapshot.items(), 1000);

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Simulate a failed commit and test that we roll back to the last commit point.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Re-delete every 7th key and commit this time.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit().await.unwrap();

            assert_eq!(db.op_count(), 2787);
            assert_eq!(db.inactivity_floor_loc, 1480);
            assert_eq!(db.oldest_retained_loc, 1480 - 1480 % 7);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
}