commonware_storage/adb/any/variable/
mod.rs

//! An authenticated database (ADB) that provides succinct proofs of _any_ value ever associated
//! with a key, where values can have varying sizes.
//!
//! _If the values you wish to store all have the same size, use the [crate::adb::any::fixed::Any]
//! db instead._
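//!
//! # Example
//!
//! A minimal usage sketch, mirroring the unit tests at the bottom of this module. The `context`
//! and `config` values are assumptions (see `db_config` in the test module for a concrete
//! configuration):
//!
//! ```ignore
//! // Initialize the db (any uncommitted operations from a prior run are discarded).
//! let mut hasher = Standard::<Sha256>::new();
//! let mut db = Any::<_, Digest, Vec<u8>, Sha256, TwoCap>::init(context, config).await?;
//!
//! // Apply some operations, then commit them to make them durable.
//! db.update(Sha256::fill(1u8), vec![1u8; 8]).await?;
//! db.update(Sha256::fill(2u8), vec![2u8; 20]).await?;
//! db.delete(Sha256::fill(1u8)).await?;
//! db.commit(None).await?;
//!
//! // Authenticate a range of operations against the current root.
//! let root = db.root(&mut hasher);
//! let (proof, ops) = db.proof(0, 4).await?;
//! assert!(verify_proof(&mut hasher, &proof, 0, &ops, &root));
//!
//! db.close().await?;
//! ```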

use crate::{
    adb::{align_mmr_and_locations, Error},
    index::Index,
    journal::{
        fixed::{Config as FConfig, Journal as FJournal},
        variable::{Config as VConfig, Journal as VJournal},
    },
    mmr::{
        hasher::Standard,
        iterator::{leaf_num_to_pos, leaf_pos_to_num},
        journaled::{Config as MmrConfig, Mmr},
        verification::Proof,
    },
    store::operation::Variable as Operation,
    translator::Translator,
};
use commonware_codec::{Codec, Encode as _, Read};
use commonware_cryptography::Hasher as CHasher;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
use commonware_utils::{Array, NZUsize};
use futures::{future::TryFutureExt, pin_mut, try_join, StreamExt};
use std::{
    collections::HashMap,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, warn};

pub mod sync;

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;

/// Configuration for an `Any` authenticated db.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the (pruned) log of operations.
    pub log_journal_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each section of the journal.
    pub log_items_per_section: NonZeroU64,

    /// The name of the [RStorage] partition used for the location map.
    pub locations_journal_partition: String,

    /// The number of items to put in each blob in the location map.
    pub locations_items_per_blob: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// A key-value ADB based on an MMR over its log of operations, supporting authentication of any
/// value ever associated with a key.
pub struct Any<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translator> {
    /// An MMR over digests of the operations applied to the db.
    ///
    /// # Invariant
    ///
    /// The number of leaves in this MMR always equals the number of operations in the unpruned
    /// `log`.
    mmr: Mmr<E, H>,

    /// A (pruned) log of all operations applied to the db in order of occurrence. The position of
    /// each operation in the log is called its _location_, which is a stable identifier. Pruning is
    /// indicated by a non-zero value for `oldest_retained_loc`, which provides the location of the
    /// first operation remaining in the log.
    ///
    /// # Invariant
    ///
    /// An operation's location is always equal to the number of the MMR leaf storing the digest of
    /// the operation.
    log: VJournal<E, Operation<K, V>>,

    /// The number of operations that have been appended to the log (which must equal the number of
    /// leaves in the MMR).
    log_size: u64,

    /// The number of items to put in each section of the journal.
    log_items_per_section: u64,

    /// A fixed-length journal that maps an operation's location to its offset within its respective
    /// section of the log. (The section number is derived from location.)
    locations: FJournal<E, u32>,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    inactivity_floor_loc: u64,

    /// The location of the oldest operation in the log that remains readable.
    oldest_retained_loc: u64,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location in the log containing its most recent update.
    ///
    /// # Invariant
    ///
    /// Only references operations of type Operation::Update.
    pub(super) snapshot: Index<T, u64>,

    /// The number of operations that are pending commit.
    pub(super) uncommitted_ops: u64,

    /// Cryptographic hasher to re-use within mutable operations requiring digest computation.
    pub(super) hasher: Standard<H>,
}

impl<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translator>
    Any<E, K, V, H, T>
{
    /// Returns an [Any] adb initialized from `cfg`. Any uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let snapshot: Index<T, u64> =
            Index::init(context.with_label("snapshot"), cfg.translator.clone());
        let mut hasher = Standard::<H>::new();

        let mmr = Mmr::init(
            context.with_label("mmr"),
            &mut hasher,
            MmrConfig {
                journal_partition: cfg.mmr_journal_partition,
                metadata_partition: cfg.mmr_metadata_partition,
                items_per_blob: cfg.mmr_items_per_blob,
                write_buffer: cfg.mmr_write_buffer,
                thread_pool: cfg.thread_pool,
                buffer_pool: cfg.buffer_pool.clone(),
            },
        )
        .await?;

        let log = VJournal::init(
            context.with_label("log"),
            VConfig {
                partition: cfg.log_journal_partition,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        let locations = FJournal::init(
            context.with_label("locations"),
            FConfig {
                partition: cfg.locations_journal_partition,
                items_per_blob: cfg.locations_items_per_blob,
                write_buffer: cfg.log_write_buffer,
                buffer_pool: cfg.buffer_pool,
            },
        )
        .await?;

        let db = Self {
            mmr,
            log,
            log_size: 0,
            inactivity_floor_loc: 0,
            oldest_retained_loc: 0,
            locations,
            log_items_per_section: cfg.log_items_per_section.get(),
            uncommitted_ops: 0,
            snapshot,
            hasher,
        };

        db.build_snapshot_from_log().await
    }

    /// Builds the database's snapshot by replaying the log from inception, while also:
    ///   - trimming any uncommitted operations from the log,
    ///   - adding log operations to the MMR & location map if they are missing,
    ///   - removing any elements from the MMR & location map that don't remain in the log after
    ///     trimming.
    ///
    /// # Post-condition
    ///
    /// The number of operations in the log, the number of entries in locations, and the number of
    /// leaves in the MMR are all equal.
    async fn build_snapshot_from_log(mut self) -> Result<Self, Error> {
        // Align the mmr with the location map. Any elements we remove here that are still in the
        // log will be re-added later.
        let mut mmr_leaves = align_mmr_and_locations(&mut self.mmr, &mut self.locations).await?;

        // The location and blob-offset of the first operation to follow the last known commit point.
        let mut after_last_commit = None;
        // The set of operations that have not yet been committed.
        let mut uncommitted_ops = HashMap::new();
        let mut oldest_retained_loc_found = false;

        // Replay the log from inception to build the snapshot, keeping track of any uncommitted
        // operations, and any log operations that need to be re-added to the MMR & locations.
        {
            let stream = self
                .log
                .replay(0, 0, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
                .await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Err(e) => {
                        return Err(Error::Journal(e));
                    }
                    Ok((section, offset, _, op)) => {
                        if !oldest_retained_loc_found {
                            self.log_size = section * self.log_items_per_section;
                            self.oldest_retained_loc = self.log_size;
                            oldest_retained_loc_found = true;
                        }

                        let loc = self.log_size; // location of the current operation.
                        if after_last_commit.is_none() {
                            after_last_commit = Some((loc, offset));
                        }

                        self.log_size += 1;

                        // Consistency check: confirm the provided section matches what we expect from this operation's
                        // location.
                        let expected = loc / self.log_items_per_section;
                        assert_eq!(section, expected,
                                "given section {section} did not match expected section {expected} from location {loc}");

                        if self.log_size > mmr_leaves {
                            warn!(
                                section,
                                offset, "operation was missing from MMR/location map"
                            );
                            self.mmr.add(&mut self.hasher, &op.encode()).await?;
                            self.locations.append(offset).await?;
                            mmr_leaves += 1;
                        }

                        match op {
                            Operation::Delete(key) => {
                                let result = self.get_key_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), None));
                                } else {
                                    uncommitted_ops.remove(&key);
                                }
                            }
                            Operation::Update(key, _) => {
                                let result = self.get_key_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), Some(loc)));
                                } else {
                                    uncommitted_ops.insert(key, (None, Some(loc)));
                                }
                            }
                            Operation::CommitFloor(_, loc) => {
                                self.inactivity_floor_loc = loc;

                                // Apply all uncommitted operations.
                                for (key, (old_loc, new_loc)) in uncommitted_ops.iter() {
                                    if let Some(old_loc) = old_loc {
                                        if let Some(new_loc) = new_loc {
                                            Self::update_loc(
                                                &mut self.snapshot,
                                                key,
                                                *old_loc,
                                                *new_loc,
                                            );
                                        } else {
                                            Self::delete_loc(&mut self.snapshot, key, *old_loc);
                                        }
                                    } else {
                                        assert!(new_loc.is_some());
                                        self.snapshot.insert(key, new_loc.unwrap());
                                    }
                                }
                                uncommitted_ops.clear();
                                after_last_commit = None;
                            }
                            _ => unreachable!(
                                "unexpected operation type at offset {offset} of section {section}"
                            ),
                        }
                    }
                }
            }
        }

        // Rewind the operations log if necessary.
        if let Some((end_loc, end_offset)) = after_last_commit {
            assert!(!uncommitted_ops.is_empty());
            warn!(
                op_count = uncommitted_ops.len(),
                log_size = end_loc,
                end_offset,
                "rewinding over uncommitted operations at end of log"
            );
            let prune_to_section = end_loc / self.log_items_per_section;
            self.log
                .rewind_to_offset(prune_to_section, end_offset)
                .await?;
            self.log.sync(prune_to_section).await?;
            self.log_size = end_loc;
        }

        // Pop any MMR elements that are ahead of the last log commit point.
        if mmr_leaves > self.log_size {
            self.locations.rewind(self.log_size).await?;
            self.locations.sync().await?;

            let op_count = mmr_leaves - self.log_size;
            warn!(op_count, "popping uncommitted MMR operations");
            self.mmr.pop(op_count as usize).await?;
        }

        // Confirm post-conditions hold.
        assert_eq!(self.log_size, leaf_pos_to_num(self.mmr.size()).unwrap());
        assert_eq!(self.log_size, self.locations.size().await?);

        debug!(log_size = self.log_size, "build_snapshot_from_log complete");

        Ok(self)
    }

    /// Get the value of `key` in the db, or None if it has no value.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        let iter = self.snapshot.get(key);
        for &loc in iter {
            if let Some(v) = self.get_from_loc(key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db. Returns [Error::OperationPruned]
    /// if `loc` precedes the oldest retained location. The location is otherwise assumed valid.
    ///
    /// # Panics
    ///
    /// Panics if `loc` is greater than or equal to the number of operations in the log.
    pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
        assert!(loc < self.op_count());
        if loc < self.oldest_retained_loc {
            return Err(Error::OperationPruned(loc));
        }

        let offset = self.locations.read(loc).await?;
        let section = loc / self.log_items_per_section;
        let op = self.log.get(section, offset).await?;

        Ok(op.into_value())
    }

    /// Returns the location of the operation that set the key's current value, or None if the key
    /// isn't currently assigned any value.
    pub async fn get_key_loc(&self, key: &K) -> Result<Option<u64>, Error> {
        let iter = self.snapshot.get(key);
        for &loc in iter {
            if self.get_from_loc(key, loc).await?.is_some() {
                return Ok(Some(loc));
            }
        }

        Ok(None)
    }

    /// Remove the location `delete_loc` from the snapshot if it's associated with `key`.
    fn delete_loc(snapshot: &mut Index<T, u64>, key: &K, delete_loc: u64) {
        let Some(mut cursor) = snapshot.get_mut(key) else {
            return;
        };

        while let Some(&loc) = cursor.next() {
            if loc == delete_loc {
                cursor.delete();
                return;
            }
        }
    }

    /// Update the location associated with `key` from `old_loc` to `new_loc`. If `key` has no
    /// entry with location `old_loc`, this is a no-op.
    fn update_loc(snapshot: &mut Index<T, u64>, key: &K, old_loc: u64, new_loc: u64) {
        let Some(mut cursor) = snapshot.get_mut(key) else {
            return;
        };

        while let Some(&loc) = cursor.next() {
            if loc == old_loc {
                cursor.update(new_loc);
                return;
            }
        }
    }

    /// Get the value of the operation with location `loc` in the db if it matches `key`. The
    /// location is assumed valid.
    ///
    /// # Panics
    ///
    /// Panics if `loc` is greater than or equal to the number of operations in the log.
    pub async fn get_from_loc(&self, key: &K, loc: u64) -> Result<Option<V>, Error> {
        assert!(loc < self.op_count());

        match self.locations.read(loc).await {
            Ok(offset) => self.get_from_offset(key, loc, offset).await,
            Err(e) => Err(Error::Journal(e)),
        }
    }

    /// Get the operation at location `loc` in the log.
    async fn get_op(&self, loc: u64) -> Result<Operation<K, V>, Error> {
        match self.locations.read(loc).await {
            Ok(offset) => {
                let section = loc / self.log_items_per_section;
                self.log.get(section, offset).await.map_err(Error::Journal)
            }
            Err(e) => Err(Error::Journal(e)),
        }
    }

    /// Get the value of the operation with location `loc` and offset `offset` in the log if it
    /// matches `key`.
    async fn get_from_offset(&self, key: &K, loc: u64, offset: u32) -> Result<Option<V>, Error> {
        let section = loc / self.log_items_per_section;
        let Operation::Update(k, v) = self.log.get(section, offset).await? else {
            panic!("didn't find Update operation at location {loc} and offset {offset}");
        };

        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Get the number of operations that have been applied to this db, including those that are not
    /// yet committed.
    pub fn op_count(&self) -> u64 {
        self.log_size
    }

    /// Returns the section of the log where we are currently writing new items.
    fn current_section(&self) -> u64 {
        self.log_size / self.log_items_per_section
    }

    /// Return the oldest location that remains retrievable.
    pub fn oldest_retained_loc(&self) -> Option<u64> {
        if self.log_size == 0 {
            None
        } else {
            Some(self.oldest_retained_loc)
        }
    }

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive.
    pub fn inactivity_floor_loc(&self) -> u64 {
        self.inactivity_floor_loc
    }

    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
    /// subject to rollback until the next successful `commit`.
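    ///
    /// # Example
    ///
    /// A sketch (`db`, `key`, and `value` are assumed): the update is visible via [Self::get]
    /// immediately, but only becomes durable once [Self::commit] is called.
    ///
    /// ```ignore
    /// db.update(key, value.clone()).await?;
    /// assert_eq!(db.get(&key).await?, Some(value));
    /// db.commit(None).await?;
    /// ```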
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        let new_loc = self.op_count();
        if let Some(old_loc) = self.get_key_loc(&key).await? {
            Self::update_loc(&mut self.snapshot, &key, old_loc, new_loc);
        } else {
            self.snapshot.insert(&key, new_loc);
        };

        let op = Operation::Update(key, value);
        self.apply_op(op).await?;

        Ok(())
    }

    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
    /// successful `commit`.
    pub async fn delete(&mut self, key: K) -> Result<(), Error> {
        let Some(old_loc) = self.get_key_loc(&key).await? else {
            return Ok(());
        };

        Self::delete_loc(&mut self.snapshot, &key, old_loc);
        self.apply_op(Operation::Delete(key)).await?;

        Ok(())
    }

    /// Return the root of the db.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations.
    pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
        self.mmr.root(hasher)
    }

    /// Update the operations MMR with the given operation, and append the operation to the log. The
    /// `commit` method must be called to make any applied operation persistent & recoverable.
    pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
        let encoded_op = op.encode();
        let section = self.current_section();

        // Create a future that appends the operation to the log, then puts its resulting offset
        // into locations.
        let log_fut = async {
            let (offset, _) = self.log.append(section, op).await?;
            self.locations.append(offset).await?;

            Ok::<(), Error>(())
        };

        // Run the log update future in parallel with adding the operation to the MMR.
        try_join!(
            log_fut,
            self.mmr
                .add_batched(&mut self.hasher, &encoded_op)
                .map_err(Error::Mmr),
        )?;
        self.uncommitted_ops += 1;
        self.log_size += 1;

        // Maintain invariant that all filled sections are synced and immutable.
        if self.current_section() != section {
            self.log.sync(section).await?;
        }

        Ok(())
    }

    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations.
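    ///
    /// # Example
    ///
    /// A sketch (assumes a committed `db`, a `Standard` hasher, and a `start_loc` at or after the
    /// oldest retained location); the result can be checked with [crate::adb::verify_proof]:
    ///
    /// ```ignore
    /// let root = db.root(&mut hasher);
    /// let (proof, ops) = db.proof(start_loc, max_ops).await?;
    /// assert!(verify_proof(&mut hasher, &proof, start_loc, &ops, &root));
    /// ```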
    pub async fn proof(
        &self,
        start_loc: u64,
        max_ops: u64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        self.historical_proof(self.op_count(), start_loc, max_ops)
            .await
    }

    /// Analogous to [Self::proof], but with respect to the state of the MMR when it had `size`
    /// elements.
    ///
    /// # Panics
    ///
    /// - Panics if `start_loc` is greater than or equal to `size`.
    /// - Panics if `size` is greater than the number of operations.
    pub async fn historical_proof(
        &self,
        size: u64,
        start_loc: u64,
        max_ops: u64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        assert!(size <= self.op_count());
        assert!(start_loc < size);

        let start_pos = leaf_num_to_pos(start_loc);
        let end_index = std::cmp::min(size - 1, start_loc + max_ops - 1);
        let end_pos = leaf_num_to_pos(end_index);
        let mmr_size = leaf_num_to_pos(size);

        let proof = self
            .mmr
            .historical_range_proof(mmr_size, start_pos, end_pos)
            .await?;
        let mut ops = Vec::with_capacity((end_index - start_loc + 1) as usize);
        for loc in start_loc..=end_index {
            let section = loc / self.log_items_per_section;
            let offset = self.locations.read(loc).await?;
            let op = self.log.get(section, offset).await?;
            ops.push(op);
        }

        Ok((proof, ops))
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule. Caller can
    /// associate an arbitrary `metadata` value with the commit.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
        // Raise the inactivity floor by the # of uncommitted operations, plus 1 to account for the
        // commit op that will be appended.
        self.raise_inactivity_floor(metadata, self.uncommitted_ops + 1)
            .await?;

        // Sync the log and process the updates to the MMR in parallel.
        let section = self.current_section();
        let mmr_fut = async {
            self.mmr.process_updates(&mut self.hasher);
            Ok::<(), Error>(())
        };
        try_join!(self.log.sync(section).map_err(Error::Journal), mmr_fut)?;

        debug!(log_size = self.log_size, "commit complete");
        self.uncommitted_ops = 0;

        Ok(())
    }

    /// Get the location and metadata associated with the last commit, or None if no commit has been
    /// made.
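    ///
    /// # Example
    ///
    /// A sketch (`db`, `key`, and `value` are assumed), based on the unit tests below:
    ///
    /// ```ignore
    /// db.update(key, value).await?;
    /// db.commit(Some(vec![99, 100])).await?;
    /// let (_, metadata) = db.get_metadata().await?.expect("commit was made");
    /// assert_eq!(metadata, Some(vec![99, 100]));
    /// ```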
    pub async fn get_metadata(&self) -> Result<Option<(u64, Option<V>)>, Error> {
        let mut last_commit = self.op_count() - self.uncommitted_ops;
        if last_commit == 0 {
            return Ok(None);
        }
        last_commit -= 1;
        let section = last_commit / self.log_items_per_section;
        let offset = self.locations.read(last_commit).await?;
        let Operation::CommitFloor(metadata, _) = self.log.get(section, offset).await? else {
            unreachable!("no commit operation at location of last commit {last_commit}");
        };

        Ok(Some((last_commit, metadata)))
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let section = self.current_section();
        try_join!(
            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
            self.log.sync(section).map_err(Error::Journal),
            self.locations.sync().map_err(Error::Journal),
        )?;

        Ok(())
    }

    /// Moves the given operation to the tip of the log if it is active, rendering its old location
    /// inactive. If the operation was not active, then this is a no-op. Returns the old location
    /// of the operation if it was active.
    pub(super) async fn move_op_if_active(
        &mut self,
        op: Operation<K, V>,
        old_loc: u64,
    ) -> Result<Option<u64>, Error> {
        let Some(key) = op.key() else {
            // `op` is not a key-related operation, so it is not active.
            return Ok(None);
        };
        let new_loc = self.op_count();
        // If the translated key is not in the snapshot, the operation cannot be active.
        let Some(mut cursor) = self.snapshot.get_mut(key) else {
            return Ok(None);
        };

        // Iterate over all locations in the snapshot that conflict with the translated key.
        while let Some(&loc) = cursor.next() {
            if loc == old_loc {
                // Update the location of the operation in the snapshot.
                cursor.update(new_loc);
                drop(cursor);

                // Apply the moved operation.
                self.apply_op(op).await?;
                return Ok(Some(old_loc));
            }
        }

        // The operation is not active, so this is a no-op.
        Ok(None)
    }

    /// Raise the inactivity floor by up to `max_steps` steps (stopping early if the floor reaches
    /// the tip), followed by applying a commit operation. Each step either advances over an
    /// inactive operation, or re-applies an active operation to the tip and then advances over it.
    ///
    /// This method does not change the state of the db's snapshot, but it always changes the root
    /// since it applies at least one operation.
    async fn raise_inactivity_floor(
        &mut self,
        metadata: Option<V>,
        max_steps: u64,
    ) -> Result<(), Error> {
        for _ in 0..max_steps {
            if self.inactivity_floor_loc == self.op_count() {
                break;
            }
            let op = self.get_op(self.inactivity_floor_loc).await?;
            self.move_op_if_active(op, self.inactivity_floor_loc)
                .await?;
            self.inactivity_floor_loc += 1;
        }

        self.apply_op(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
            .await?;

        Ok(())
    }

    /// Prune historical operations. This does not affect the db's root or current snapshot.
    ///
    /// # Panics
    ///
    /// Panics if `target_prune_loc` is greater than the inactivity floor.
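    ///
    /// # Example
    ///
    /// A sketch of the typical call (assumes a committed `db` and a `Standard` hasher): pruning
    /// everything below the inactivity floor leaves the root unchanged.
    ///
    /// ```ignore
    /// let root = db.root(&mut hasher);
    /// db.prune(db.inactivity_floor_loc()).await?;
    /// assert_eq!(db.root(&mut hasher), root);
    /// ```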
    pub async fn prune(&mut self, target_prune_loc: u64) -> Result<(), Error> {
        assert!(target_prune_loc <= self.inactivity_floor_loc);
        if target_prune_loc <= self.oldest_retained_loc {
            return Ok(());
        }

        // Sync the mmr before pruning the log, otherwise the MMR tip could end up behind the log's
        // pruning boundary on restart from an unclean shutdown, and there would be no way to replay
        // the operations between the MMR tip and the log pruning boundary.
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1554): We currently sync locations
        // as well, but this could be avoided by extending recovery.
        try_join!(
            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
            self.locations.sync().map_err(Error::Journal),
        )?;

        // Prune the log up to the section containing the requested pruning location. We always
        // prune the log first, and then prune the MMR+locations structures based on the log's
        // actual pruning boundary. This procedure ensures all log operations always have
        // corresponding MMR & location entries, even in the event of failures, with no need for
        // special recovery.
        let section_with_target = target_prune_loc / self.log_items_per_section;
        if !self.log.prune(section_with_target).await? {
            return Ok(());
        }
        self.oldest_retained_loc = section_with_target * self.log_items_per_section;

        debug!(
            log_size = self.log_size,
            oldest_retained_loc = self.oldest_retained_loc,
            "pruned inactive ops"
        );

        // Prune the MMR & locations map up to the oldest retained item in the log after pruning.
        try_join!(
            self.locations
                .prune(self.oldest_retained_loc)
                .map_err(Error::Journal),
            self.mmr
                .prune_to_pos(&mut self.hasher, leaf_num_to_pos(self.oldest_retained_loc))
                .map_err(Error::Mmr),
        )?;

        Ok(())
    }

    /// Close the db. Operations that have not been committed will be lost.
    pub async fn close(mut self) -> Result<(), Error> {
        if self.uncommitted_ops > 0 {
            warn!(
                op_count = self.uncommitted_ops,
                "closing db with uncommitted operations"
            );
        }

        try_join!(
            self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
            self.log.close().map_err(Error::Journal),
            self.locations.close().map_err(Error::Journal),
        )?;

        Ok(())
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(
            self.log.destroy().map_err(Error::Journal),
            self.mmr.destroy().map_err(Error::Mmr),
            self.locations.destroy().map_err(Error::Journal),
        )?;

        Ok(())
    }

    /// Simulate an unclean shutdown by consuming the db without syncing (or only partially syncing)
    /// the log and/or locations and/or mmr. When _not_ fully syncing the mmr, the `write_limit`
    /// parameter dictates how many mmr nodes to write during a partial sync (can be 0).
    #[cfg(test)]
    pub(super) async fn simulate_failure(
        mut self,
        sync_log: bool,
        sync_locations: bool,
        sync_mmr: bool,
        write_limit: usize,
    ) -> Result<(), Error> {
        let section = self.current_section();
        if sync_log {
            self.log.sync(section).await?;
        }
        if sync_locations {
            self.locations.sync().await?;
        }
        if sync_mmr {
            assert_eq!(write_limit, 0);
            self.mmr.sync(&mut self.hasher).await?;
        } else if write_limit > 0 {
            self.mmr
                .simulate_partial_sync(&mut self.hasher, write_limit)
                .await?;
        }

        Ok(())
    }
}

#[cfg(test)]
pub(super) mod test {
    use super::*;
    use crate::{
        adb::verify_proof,
        mmr::{hasher::Standard, mem::Mmr as MemMmr},
        translator::TwoCap,
    };
    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner as _};
    use commonware_utils::NZU64;
    use std::collections::HashMap;

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;

    fn db_config(suffix: &str) -> Config<TwoCap, (commonware_codec::RangeCfg, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_journal_partition: format!("log_journal_{suffix}"),
            log_items_per_section: NZU64!(7),
            log_write_buffer: NZUsize!(1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            locations_journal_partition: format!("locations_journal_{suffix}"),
            locations_items_per_blob: NZU64!(7),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    /// A type alias for the concrete [Any] type used in these unit tests.
    type AnyTest = Any<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    /// Return an `Any` database initialized with a fixed config.
    async fn open_db(context: deterministic::Context) -> AnyTest {
        AnyTest::init(context, db_config("partition"))
            .await
            .unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_any_variable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let mut hasher = Standard::<Sha256>::new();
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc(), None);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let d1 = Sha256::fill(1u8);
            let v1 = vec![1u8; 8];
            let root = db.root(&mut hasher);
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.op_count(), 0);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 1); // floor op added
            let root = db.root(&mut hasher);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);

            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits.
            for _ in 1..100 {
                db.commit(None).await.unwrap();
                assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_any_variable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys and make sure updates and deletions of those keys work as
            // expected.
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            let d1 = Sha256::fill(1u8);
            let d2 = Sha256::fill(2u8);
            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            assert!(db.get(&d1).await.unwrap().is_none());
            assert!(db.get(&d2).await.unwrap().is_none());

            db.update(d1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), v1);
            assert!(db.get(&d2).await.unwrap().is_none());

            db.update(d2, v1.clone()).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), v1);

            db.delete(d1).await.unwrap();
            assert!(db.get(&d1).await.unwrap().is_none());
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), v1);

            db.update(d1, v2.clone()).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), v2);

            db.update(d2, v1.clone()).await.unwrap();
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), v1);

            assert_eq!(db.op_count(), 5); // 4 updates, 1 deletion.
            assert_eq!(db.snapshot.keys(), 2);
            assert_eq!(db.inactivity_floor_loc, 0);
            db.sync().await.unwrap();

            // Advance over 3 inactive operations.
            db.raise_inactivity_floor(None, 3).await.unwrap();
            assert_eq!(db.inactivity_floor_loc, 3);
            assert_eq!(db.op_count(), 6); // 4 updates, 1 deletion, 1 commit
            db.sync().await.unwrap();

            // Delete all keys.
            db.delete(d1).await.unwrap();
            db.delete(d2).await.unwrap();
            assert!(db.get(&d1).await.unwrap().is_none());
            assert!(db.get(&d2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 8); // 4 updates, 3 deletions, 1 commit
            assert_eq!(db.inactivity_floor_loc, 3);

            db.sync().await.unwrap();

            // Multiple deletions of the same key should be a no-op.
            db.delete(d1).await.unwrap();
            assert_eq!(db.op_count(), 8);

            // Deletions of non-existent keys should be a no-op.
            let d3 = Sha256::fill(3u8);
            db.delete(d3).await.unwrap();
            assert_eq!(db.op_count(), 8);

            // Make sure closing/reopening gets us back to the same state.
            let metadata = Some(vec![99, 100]);
            db.commit(metadata.clone()).await.unwrap();
            assert_eq!(db.op_count(), 9);
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 9);
            assert_eq!(db.root(&mut hasher), root);

            // Since this db no longer has any active keys, we should be able to raise the
            // inactivity floor to the tip (only the inactive commit op remains).
            db.raise_inactivity_floor(None, 100).await.unwrap();
            assert_eq!(db.inactivity_floor_loc, db.op_count() - 1);

            // Make sure we can still get the metadata.
            assert_eq!(db.get_metadata().await.unwrap(), Some((8, metadata)));

            // Re-activate the keys by updating them.
            db.update(d1, v1.clone()).await.unwrap();
            db.update(d2, v2.clone()).await.unwrap();
            db.delete(d1).await.unwrap();
            db.update(d2, v1.clone()).await.unwrap();
            db.update(d1, v2.clone()).await.unwrap();
            assert_eq!(db.snapshot.keys(), 2);

            // Confirm close/reopen gets us back to the same state.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 19);
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.snapshot.keys(), 2);
            assert_eq!(db.op_count(), 19);
            assert_eq!(db.get_metadata().await.unwrap(), Some((18, None)));

            // Commit will raise the inactivity floor, which won't affect state but will affect the
            // root.
            db.commit(None).await.unwrap();

            assert!(db.root(&mut hasher) != root);

            // Pruning inactive ops should not affect current state or root
            let root = db.root(&mut hasher);
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.snapshot.keys(), 2);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_any_variable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete, and
        // confirm that the end state of the db matches that of an identically updated hashmap.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            let mut map = HashMap::<Digest, Vec<u8>>::default();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
                map.insert(k, v);
            }

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
                map.insert(k, v);
            }

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
                map.remove(&k);
            }

            assert_eq!(db.op_count(), 1477);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert_eq!(db.oldest_retained_loc().unwrap(), 0); // no pruning yet
            assert_eq!(db.snapshot.items(), 857);

            // Test that commit will raise the inactivity floor.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 2336);
            assert_eq!(db.inactivity_floor_loc, 1478);
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.oldest_retained_loc().unwrap(), 1477);
            assert_eq!(db.snapshot.items(), 857);

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert_eq!(db.op_count(), 2336);
            assert_eq!(db.inactivity_floor_loc, 1478);
            assert_eq!(db.snapshot.items(), 857);

            // Raise the inactivity floor to the point where all inactive operations can be pruned.
            db.raise_inactivity_floor(None, 3000).await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.inactivity_floor_loc, 4478);
            // Inactivity floor should be 858 operations from tip since 858 operations are active
            // (counting the floor op itself).
            assert_eq!(db.op_count(), 4478 + 858);
            assert_eq!(db.snapshot.items(), 857);

            // Confirm the db's state matches that of the separate map we computed independently.
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }

            // Make sure size-constrained batches of operations are provable from the oldest
            // retained op to tip.
            let max_ops = 4;
            let end_loc = db.op_count();
            let start_pos = db.mmr.pruned_to_pos();
            let start_loc = leaf_pos_to_num(start_pos).unwrap();
            // Raise the inactivity floor and make sure historical inactive operations are still provable.
            db.raise_inactivity_floor(None, 100).await.unwrap();
            db.sync().await.unwrap();
            let root = db.root(&mut hasher);
            assert!(start_loc < db.inactivity_floor_loc);

            for i in start_loc..end_loc {
                let (proof, log) = db.proof(i, max_ops).await.unwrap();
                assert!(verify_proof(&mut hasher, &proof, i, &log, &root));
            }

            db.destroy().await.unwrap();
        });
    }

    // Test that replaying multiple updates of the same key on startup doesn't leave behind old data
    // in the snapshot.
    #[test_traced("WARN")]
    pub fn test_any_db_log_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Update the same key many times.
            const UPDATES: u64 = 100;
            let k = Sha256::hash(&UPDATES.to_be_bytes());
            for i in 0u64..UPDATES {
                let v = vec![(i % 255) as u8; ((i % 7) + 3) as usize];
                db.update(k, v).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);
            db.close().await.unwrap();

            // Reopen the db and make sure log replay doesn't leave behind old data in the snapshot.
            let db = open_db(context.clone()).await;
            let iter = db.snapshot.get(&k);
            assert_eq!(iter.cloned().collect::<Vec<_>>().len(), 1);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_any_db_multiple_commits_delete_gets_replayed() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            let mut map = HashMap::<Digest, Vec<u8>>::default();
            const ELEMENTS: u64 = 10;
            // insert & commit multiple batches to ensure repeated inactivity floor raising.
            for j in 0u64..ELEMENTS {
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&(j * 1000 + i).to_be_bytes());
                    let v = vec![(i % 255) as u8; ((i % 7) + 3) as usize];
                    db.update(k, v.clone()).await.unwrap();
                    map.insert(k, v);
                }
                db.commit(None).await.unwrap();
            }
            let k = Sha256::hash(&((ELEMENTS - 1) * 1000 + (ELEMENTS - 1)).to_be_bytes());

            // Do one last delete operation which will be above the inactivity
            // floor, to make sure it gets replayed on restart.
            db.delete(k).await.unwrap();
            db.commit(None).await.unwrap();
            assert!(db.get(&k).await.unwrap().is_none());

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert!(db.get(&k).await.unwrap().is_none());

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_any_variable_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;
            let root = db.root(&mut hasher);

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failure and test that we roll back to the previous root.
            db.simulate_failure(false, false, false, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            // Re-apply the updates and commit them this time.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a failure and test that we roll back to the previous root.
            db.simulate_failure(false, false, false, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            // Re-apply updates for every 3rd key and commit them this time.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Simulate a failure and test that we roll back to the previous root.
1299            db.simulate_failure(false, false, false, 0).await.unwrap();
1300            let mut db = open_db(context.clone()).await;
1301            assert_eq!(root, db.root(&mut hasher));
1302
1303            // Re-delete every 7th key and commit this time.
1304            for i in 0u64..ELEMENTS {
1305                if i % 7 != 1 {
1306                    continue;
1307                }
1308                let k = Sha256::hash(&i.to_be_bytes());
1309                db.delete(k).await.unwrap();
1310            }
1311            db.commit(None).await.unwrap();
1312
1313            let root = db.root(&mut hasher);
1314            assert_eq!(db.op_count(), 2787);
1315            assert_eq!(leaf_pos_to_num(db.mmr.size()), Some(2787));
1316            assert_eq!(db.locations.size().await.unwrap(), 2787);
1317            assert_eq!(db.inactivity_floor_loc, 1480);
1318            db.sync().await.unwrap(); // test pruning boundary after sync w/ prune
1319            db.prune(db.inactivity_floor_loc()).await.unwrap();
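            // Pruning happens at section boundaries, so the oldest retained location may precede the inactivity floor.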
1320            assert_eq!(db.oldest_retained_loc().unwrap(), 1477);
1321            assert_eq!(db.snapshot.items(), 857);
1322            db.close().await.unwrap();
1323
1324            let db = open_db(context.clone()).await;
1325            assert_eq!(root, db.root(&mut hasher));
1326            assert_eq!(db.op_count(), 2787);
1327            assert_eq!(leaf_pos_to_num(db.mmr.size()), Some(2787));
1328            assert_eq!(db.locations.size().await.unwrap(), 2787);
1329            assert_eq!(db.inactivity_floor_loc, 1480);
1330            assert_eq!(db.oldest_retained_loc().unwrap(), 1477);
1331            assert_eq!(db.snapshot.items(), 857);
1332
1333            db.destroy().await.unwrap();
1334        });
1335    }
1336
1337    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
1338    /// last committed state on re-open.
1339    #[test_traced("WARN")]
1340    fn test_any_variable_non_empty_db_recovery() {
1341        let executor = deterministic::Runner::default();
1342        executor.start(|context| async move {
1343            let mut hasher = Standard::<Sha256>::new();
1344            let mut db = open_db(context.clone()).await;
1345
1346            // Insert 1000 keys then sync.
1347            for i in 0u64..1000 {
1348                let k = Sha256::hash(&i.to_be_bytes());
1349                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
1350                db.update(k, v).await.unwrap();
1351            }
1352            db.commit(None).await.unwrap();
1353            db.prune(db.inactivity_floor_loc()).await.unwrap();
1354            let root = db.root(&mut hasher);
1355            let op_count = db.op_count();
1356            let inactivity_floor_loc = db.inactivity_floor_loc();
1357
1358            // Reopen DB without clean shutdown and make sure the state is the same.
1359            let mut db = open_db(context.clone()).await;
1360            assert_eq!(db.op_count(), op_count);
1361            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1362            assert_eq!(db.root(&mut hasher), root);
1363
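            // Applies 1000 updates to the same keys without committing; values are one byte longer
            // than those committed above.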
1364            async fn apply_more_ops(
1365                db: &mut Any<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>,
1366            ) {
1367                for i in 0u64..1000 {
1368                    let k = Sha256::hash(&i.to_be_bytes());
1369                    let v = vec![(i % 255) as u8; ((i % 13) + 8) as usize];
1370                    db.update(k, v).await.unwrap();
1371                }
1372            }
1373
1374            // Insert operations without commit, then simulate failure, syncing nothing.
1375            apply_more_ops(&mut db).await;
1376            db.simulate_failure(false, false, false, 0).await.unwrap();
1377            let mut db = open_db(context.clone()).await;
1378            assert_eq!(db.op_count(), op_count);
1379            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1380            assert_eq!(db.root(&mut hasher), root);
1381
1382            // Repeat, though this time sync the log.
1383            apply_more_ops(&mut db).await;
1384            db.simulate_failure(true, false, false, 10).await.unwrap();
1385            let mut db = open_db(context.clone()).await;
1386            assert_eq!(db.op_count(), op_count);
1387            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1388            assert_eq!(db.root(&mut hasher), root);
1389
1390            // Repeat, though this time only fully sync locations.
1391            apply_more_ops(&mut db).await;
1392            db.simulate_failure(false, true, false, 0).await.unwrap();
1393            let mut db = open_db(context.clone()).await;
1394            assert_eq!(db.op_count(), op_count);
1395            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1396            assert_eq!(db.root(&mut hasher), root);
1397
1398            // Repeat, though this time only fully sync mmr.
1399            apply_more_ops(&mut db).await;
1400            db.simulate_failure(false, false, true, 0).await.unwrap();
1401            let mut db = open_db(context.clone()).await;
1402            assert_eq!(db.op_count(), op_count);
1403            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1404            assert_eq!(db.root(&mut hasher), root);
1405
1406            // Repeat, though this time fully sync log + mmr.
1407            apply_more_ops(&mut db).await;
1408            db.simulate_failure(true, false, true, 0).await.unwrap();
1409            let mut db = open_db(context.clone()).await;
1410            assert_eq!(db.op_count(), op_count);
1411            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1412            assert_eq!(db.root(&mut hasher), root);
1414
1415            // Repeat, though this time fully sync log + locations.
1416            apply_more_ops(&mut db).await;
1417            db.simulate_failure(true, true, false, 0).await.unwrap();
1418            let mut db = open_db(context.clone()).await;
1419            assert_eq!(db.op_count(), op_count);
1420            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1421            assert_eq!(db.root(&mut hasher), root);
1423
1424            // Repeat, though this time fully sync only locations + mmr.
1425            apply_more_ops(&mut db).await;
1426            db.simulate_failure(false, true, true, 0).await.unwrap();
1427            let mut db = open_db(context.clone()).await;
1428            assert_eq!(db.op_count(), op_count);
1429            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1430            assert_eq!(db.root(&mut hasher), root);
1432
1433            // One last check that re-open without proper shutdown still recovers the correct state.
1434            apply_more_ops(&mut db).await;
1435            apply_more_ops(&mut db).await;
1436            apply_more_ops(&mut db).await;
1437            let mut db = open_db(context.clone()).await;
1438            assert_eq!(db.op_count(), op_count);
1439            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1440            assert_eq!(db.root(&mut hasher), root);
1441
1442            // Apply the ops one last time but fully commit them this time, then clean up.
1443            apply_more_ops(&mut db).await;
1444            db.commit(None).await.unwrap();
1445            let db = open_db(context.clone()).await;
1446            assert!(db.op_count() > op_count);
1447            assert_ne!(db.inactivity_floor_loc(), inactivity_floor_loc);
1448            assert_ne!(db.root(&mut hasher), root);
1449
1450            db.destroy().await.unwrap();
1451        });
1452    }
1453
1454    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
1455    /// DB on re-open.
1456    #[test_traced("WARN")]
1457    fn test_any_variable_empty_db_recovery() {
1458        let executor = deterministic::Runner::default();
1459        executor.start(|context| async move {
1460            // Initialize an empty db.
1461            let mut hasher = Standard::<Sha256>::new();
1462            let db = open_db(context.clone()).await;
1463            let root = db.root(&mut hasher);
1464
1465            // Reopen DB without clean shutdown and make sure the state is the same.
1466            let mut db = open_db(context.clone()).await;
1467            assert_eq!(db.op_count(), 0);
1468            assert_eq!(db.root(&mut hasher), root);
1469
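            // Applies 1000 updates without committing them.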
1470            async fn apply_ops(
1471                db: &mut Any<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>,
1472            ) {
1473                for i in 0u64..1000 {
1474                    let k = Sha256::hash(&i.to_be_bytes());
1475                    let v = vec![(i % 255) as u8; ((i % 13) + 8) as usize];
1476                    db.update(k, v).await.unwrap();
1477                }
1478            }
1479
1480            // Insert operations without commit, then simulate failure (partially sync the mmr).
1481            apply_ops(&mut db).await;
1482            db.simulate_failure(false, false, false, 1).await.unwrap();
1483            let mut db = open_db(context.clone()).await;
1484            assert_eq!(db.op_count(), 0);
1485            assert_eq!(db.root(&mut hasher), root);
1486
1487            // Insert another 1000 keys, then simulate failure (sync only the log).
1488            apply_ops(&mut db).await;
1489            db.simulate_failure(true, false, false, 0).await.unwrap();
1490            let mut db = open_db(context.clone()).await;
1491            assert_eq!(db.op_count(), 0);
1492            assert_eq!(db.root(&mut hasher), root);
1493
1494            // Insert another 1000 keys, then simulate failure (sync only the locations).
1495            apply_ops(&mut db).await;
1496            db.simulate_failure(false, true, false, 0).await.unwrap();
1497            let mut db = open_db(context.clone()).await;
1498            assert_eq!(db.op_count(), 0);
1499            assert_eq!(db.root(&mut hasher), root);
1500
1501            // One last check that re-open without proper shutdown still recovers the correct state.
1502            apply_ops(&mut db).await;
1503            apply_ops(&mut db).await;
1504            apply_ops(&mut db).await;
1505            let mut db = open_db(context.clone()).await;
1506            assert_eq!(db.op_count(), 0);
1507            assert_eq!(db.root(&mut hasher), root);
1508
1509            // Apply the ops one last time but fully commit them this time, then clean up.
1510            apply_ops(&mut db).await;
1511            db.commit(None).await.unwrap();
1512            let db = open_db(context.clone()).await;
1513            assert!(db.op_count() > 0);
1514            assert_ne!(db.root(&mut hasher), root);
1515
1516            db.destroy().await.unwrap();
1517        });
1518    }
1519}