commonware_storage/adb/any/fixed/
mod.rs

1//! An authenticated database (ADB) that provides succinct proofs of _any_ value ever associated
2//! with a key. Its implementation is based on an [Mmr] over a log of state-change operations backed
3//! by a [Journal].
4//!
//! In the [Any] db, it is not possible to prove whether the value of a key is the currently active
//! one, only that it was associated with the key at some point in the past. This type of
//! authenticated database is most useful for applications involving keys that are assigned a value
//! once and never updated afterward.
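//!
//! # Example
//!
//! A minimal usage sketch (the key/value/hasher/translator types and the `cfg` value are
//! illustrative and mirror the unit tests at the bottom of this module):
//!
//! ```ignore
//! // Open (or recover) the db from storage.
//! let mut db = Any::<_, Digest, Digest, Sha256, TwoCap>::init(context, cfg).await?;
//!
//! // Stage some operations, then commit them durably.
//! db.update(key, value).await?;
//! db.delete(other_key).await?;
//! db.commit().await?;
//!
//! // Read the currently active value (if any) and the authenticated root.
//! let current = db.get(&key).await?;
//! let root = db.root(&mut hasher);
//!
//! // Prove up to 10 operations starting at location `loc` against `root`.
//! let (proof, ops) = db.proof(loc, NZU64!(10)).await?;
//! ```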
9
10use crate::{
11    adb::Error,
12    index::Index,
13    journal::fixed::{Config as JConfig, Journal},
14    mmr::{
15        bitmap::Bitmap,
16        hasher::Standard,
17        iterator::{leaf_num_to_pos, leaf_pos_to_num},
18        journaled::{Config as MmrConfig, Mmr},
19        verification::Proof,
20    },
21    store::operation::Fixed as Operation,
22    translator::Translator,
23};
24use commonware_codec::{CodecFixed, Encode as _};
25use commonware_cryptography::Hasher as CHasher;
26use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage, ThreadPool};
27use commonware_utils::{Array, NZUsize};
28use futures::{
29    future::{try_join_all, TryFutureExt},
30    pin_mut, try_join, StreamExt,
31};
32use std::num::{NonZeroU64, NonZeroUsize};
33use tracing::{debug, warn};
34
35pub mod sync;
36
37/// Indicator that the generic parameter N is unused by the call. N is only
38/// needed if the caller is providing the optional bitmap.
39const UNUSED_N: usize = 0;
40
41/// The size of the read buffer to use for replaying the operations log when rebuilding the
42/// snapshot.
43const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;
44
45/// Configuration for an `Any` authenticated db.
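///
/// A sketch of a typical construction (partition names and sizing values are illustrative,
/// loosely following the test helpers at the bottom of this module):
///
/// ```ignore
/// let cfg = Config::<TwoCap> {
///     mmr_journal_partition: "mmr_journal".into(),
///     mmr_metadata_partition: "mmr_metadata".into(),
///     mmr_items_per_blob: NZU64!(4096),
///     mmr_write_buffer: NZUsize!(1024),
///     log_journal_partition: "log_journal".into(),
///     log_items_per_blob: NZU64!(4096),
///     log_write_buffer: NZUsize!(1024),
///     translator: TwoCap,
///     thread_pool: None,
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(1024)),
/// };
/// ```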
46#[derive(Clone)]
47pub struct Config<T: Translator> {
48    /// The name of the [Storage] partition used for the MMR's backing journal.
49    pub mmr_journal_partition: String,
50
51    /// The items per blob configuration value used by the MMR journal.
52    pub mmr_items_per_blob: NonZeroU64,
53
54    /// The size of the write buffer to use for each blob in the MMR journal.
55    pub mmr_write_buffer: NonZeroUsize,
56
57    /// The name of the [Storage] partition used for the MMR's metadata.
58    pub mmr_metadata_partition: String,
59
60    /// The name of the [Storage] partition used to persist the (pruned) log of operations.
61    pub log_journal_partition: String,
62
63    /// The items per blob configuration value used by the log journal.
64    pub log_items_per_blob: NonZeroU64,
65
66    /// The size of the write buffer to use for each blob in the log journal.
67    pub log_write_buffer: NonZeroUsize,
68
69    /// The translator used by the compressed index.
70    pub translator: T,
71
72    /// An optional thread pool to use for parallelizing batch operations.
73    pub thread_pool: Option<ThreadPool>,
74
75    /// The buffer pool to use for caching data.
76    pub buffer_pool: PoolRef,
77}
78
79/// A key-value ADB based on an MMR over its log of operations, supporting authentication of any
80/// value ever associated with a key.
81pub struct Any<
82    E: Storage + Clock + Metrics,
83    K: Array,
84    V: CodecFixed<Cfg = ()>,
85    H: CHasher,
86    T: Translator,
87> {
88    /// An MMR over digests of the operations applied to the db.
89    ///
90    /// # Invariant
91    ///
92    /// - The number of leaves in this MMR always equals the number of operations in the unpruned
93    ///   `log`.
94    /// - The MMR is never pruned beyond the inactivity floor.
95    pub(crate) mmr: Mmr<E, H>,
96
97    /// A (pruned) log of all operations applied to the db in order of occurrence. The position of
98    /// each operation in the log is called its _location_, which is a stable identifier.
99    ///
100    /// # Invariants
101    ///
102    /// - An operation's location is always equal to the number of the MMR leaf storing the digest
103    ///   of the operation.
104    /// - The log is never pruned beyond the inactivity floor.
105    pub(crate) log: Journal<E, Operation<K, V>>,
106
107    /// A snapshot of all currently active operations in the form of a map from each key to the
108    /// location in the log containing its most recent update.
109    ///
110    /// # Invariant
111    ///
112    /// Only references operations of type [Operation::Update].
113    pub(crate) snapshot: Index<T, u64>,
114
115    /// A location before which all operations are "inactive" (that is, operations before this point
116    /// are over keys that have been updated by some operation at or after this point).
117    pub(crate) inactivity_floor_loc: u64,
118
119    /// The number of operations that are pending commit.
120    pub(crate) uncommitted_ops: u64,
121
122    /// Cryptographic hasher to re-use within mutable operations requiring digest computation.
123    pub(crate) hasher: Standard<H>,
124}
125
126impl<
127        E: Storage + Clock + Metrics,
128        K: Array,
129        V: CodecFixed<Cfg = ()>,
130        H: CHasher,
131        T: Translator,
132    > Any<E, K, V, H, T>
133{
134    /// Returns an [Any] adb initialized from `cfg`. Any uncommitted log operations will be
135    /// discarded and the state of the db will be as of the last committed operation.
136    pub async fn init(context: E, cfg: Config<T>) -> Result<Self, Error> {
137        let mut snapshot: Index<T, u64> =
138            Index::init(context.with_label("snapshot"), cfg.translator.clone());
139        let mut hasher = Standard::<H>::new();
140        let (inactivity_floor_loc, mmr, log) =
141            Self::init_mmr_and_log(context, cfg, &mut hasher).await?;
142
143        Self::build_snapshot_from_log(
144            inactivity_floor_loc,
145            &log,
146            &mut snapshot,
147            None::<&mut Bitmap<H, UNUSED_N>>,
148        )
149        .await?;
150
151        let db = Any {
152            mmr,
153            log,
154            snapshot,
155            inactivity_floor_loc,
156            uncommitted_ops: 0,
157            hasher,
158        };
159
160        Ok(db)
161    }
162
163    /// Initialize and return the mmr and log from the given config, correcting any inconsistencies
164    /// between them. Any uncommitted operations in the log will be rolled back and the state of the
165    /// db will be as of the last committed operation.
166    pub(crate) async fn init_mmr_and_log(
167        context: E,
168        cfg: Config<T>,
169        hasher: &mut Standard<H>,
170    ) -> Result<(u64, Mmr<E, H>, Journal<E, Operation<K, V>>), Error> {
171        let mut mmr = Mmr::init(
172            context.with_label("mmr"),
173            hasher,
174            MmrConfig {
175                journal_partition: cfg.mmr_journal_partition,
176                metadata_partition: cfg.mmr_metadata_partition,
177                items_per_blob: cfg.mmr_items_per_blob,
178                write_buffer: cfg.mmr_write_buffer,
179                thread_pool: cfg.thread_pool,
180                buffer_pool: cfg.buffer_pool.clone(),
181            },
182        )
183        .await?;
184
185        let mut log = Journal::init(
186            context.with_label("log"),
187            JConfig {
188                partition: cfg.log_journal_partition,
189                items_per_blob: cfg.log_items_per_blob,
190                write_buffer: cfg.log_write_buffer,
191                buffer_pool: cfg.buffer_pool,
192            },
193        )
194        .await?;
195
196        // Back up over / discard any uncommitted operations in the log.
197        let mut log_size = log.size().await?;
198        let mut rewind_leaf_num = log_size;
199        let mut inactivity_floor_loc = 0;
200        while rewind_leaf_num > 0 {
201            if let Operation::CommitFloor(loc) = log.read(rewind_leaf_num - 1).await? {
202                inactivity_floor_loc = loc;
203                break;
204            }
205            rewind_leaf_num -= 1;
206        }
207        if rewind_leaf_num != log_size {
208            let op_count = log_size - rewind_leaf_num;
209            warn!(
210                log_size,
211                op_count, "rewinding over uncommitted log operations"
212            );
213            log.rewind(rewind_leaf_num).await?;
214            log.sync().await?;
215            log_size = rewind_leaf_num;
216        }
217
218        // Pop any MMR elements that are ahead of the last log commit point.
219        let mut next_mmr_leaf_num = leaf_pos_to_num(mmr.size()).unwrap();
220        if next_mmr_leaf_num > log_size {
221            let op_count = next_mmr_leaf_num - log_size;
222            warn!(log_size, op_count, "popping uncommitted MMR operations");
223            mmr.pop(op_count as usize).await?;
224            next_mmr_leaf_num = log_size;
225        }
226
227        // If the MMR is behind, replay log operations to catch up.
228        if next_mmr_leaf_num < log_size {
229            let op_count = log_size - next_mmr_leaf_num;
230            warn!(
231                log_size,
232                op_count, "MMR lags behind log, replaying log to catch up"
233            );
234            while next_mmr_leaf_num < log_size {
235                let op = log.read(next_mmr_leaf_num).await?;
236                mmr.add_batched(hasher, &op.encode()).await?;
237                next_mmr_leaf_num += 1;
238            }
239            mmr.sync(hasher).await.map_err(Error::Mmr)?;
240        }
241
242        // At this point the MMR and log should be consistent.
243        assert_eq!(log.size().await?, leaf_pos_to_num(mmr.size()).unwrap());
244
245        Ok((inactivity_floor_loc, mmr, log))
246    }
247
248    /// Builds the database's snapshot by replaying the log starting at the inactivity floor.
249    /// Assumes the log and mmr have the same number of operations and are not pruned beyond the
250    /// inactivity floor.
251    ///
252    /// If a bitmap is provided, then a bit is appended for each operation in the operation log,
253    /// with its value reflecting its activity status. The caller is responsible for syncing any
254    /// changes made to the bitmap.
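    ///
    /// A sketch of a standalone replay with a bitmap, mirroring the unit tests (the digest size
    /// `32` and the `TwoCap` translator are illustrative choices, and `AnyDb` stands in for a
    /// concrete `Any<_, _, _, Sha256, TwoCap>` alias):
    ///
    /// ```ignore
    /// let mut bitmap = Bitmap::<Sha256, 32>::new();
    /// for _ in 0..inactivity_floor_loc {
    ///     bitmap.append(false); // everything below the floor is known to be inactive
    /// }
    /// let mut snapshot = Index::init(context.with_label("snapshot"), TwoCap);
    /// AnyDb::build_snapshot_from_log::<32>(
    ///     inactivity_floor_loc,
    ///     &log,
    ///     &mut snapshot,
    ///     Some(&mut bitmap),
    /// )
    /// .await?;
    /// ```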
255    pub(crate) async fn build_snapshot_from_log<const N: usize>(
256        inactivity_floor_loc: u64,
257        log: &Journal<E, Operation<K, V>>,
258        snapshot: &mut Index<T, u64>,
259        mut bitmap: Option<&mut Bitmap<H, N>>,
260    ) -> Result<(), Error> {
261        if let Some(ref bitmap) = bitmap {
262            assert_eq!(inactivity_floor_loc, bitmap.bit_count());
263        }
264
265        let stream = log
266            .replay(NZUsize!(SNAPSHOT_READ_BUFFER_SIZE), inactivity_floor_loc)
267            .await?;
268        pin_mut!(stream);
269        while let Some(result) = stream.next().await {
270            match result {
271                Err(e) => {
272                    return Err(Error::Journal(e));
273                }
274                Ok((i, op)) => {
275                    match op {
276                        Operation::Delete(key) => {
277                            let result =
278                                Any::<E, K, V, H, T>::delete_key(snapshot, log, &key, i).await?;
279                            if let Some(ref mut bitmap) = bitmap {
280                                // Mark previous location (if any) of the deleted key as inactive.
281                                if let Some(old_loc) = result {
282                                    bitmap.set_bit(old_loc, false);
283                                }
284                            }
285                        }
286                        Operation::Update(key, _) => {
287                            let result =
288                                Any::<E, K, V, H, T>::update_loc(snapshot, log, &key, i).await?;
289                            if let Some(ref mut bitmap) = bitmap {
290                                if let Some(old_loc) = result {
291                                    bitmap.set_bit(old_loc, false);
292                                }
293                                bitmap.append(true);
294                            }
295                        }
296                        Operation::CommitFloor(_) => {}
297                    }
298                    if let Some(ref mut bitmap) = bitmap {
299                        // If we reach this point and a bit hasn't been added for the operation, then it's
300                        // an inactive operation and we need to tag it as such in the bitmap.
301                        if bitmap.bit_count() == i {
302                            bitmap.append(false);
303                        }
304                    }
305                }
306            }
307        }
308
309        Ok(())
310    }
311
312    /// Update the location of `key` to `new_loc` in the snapshot and return its old location, or
313    /// insert it if the key isn't already present.
314    async fn update_loc(
315        snapshot: &mut Index<T, u64>,
316        log: &Journal<E, Operation<K, V>>,
317        key: &K,
318        new_loc: u64,
319    ) -> Result<Option<u64>, Error> {
320        // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
321        // cursor to look for the key.
322        let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
323            return Ok(None);
324        };
325
326        // Iterate over conflicts in the snapshot.
327        while let Some(&loc) = cursor.next() {
328            let (k, _) = Self::get_update_op(log, loc).await?;
329            if k == *key {
330                // Found the key in the snapshot.
331                assert!(new_loc > loc);
332                cursor.update(new_loc);
333                return Ok(Some(loc));
334            }
335        }
336
337        // The key wasn't in the snapshot, so add it to the cursor.
338        cursor.insert(new_loc);
339
340        Ok(None)
341    }
342
343    /// Get the update operation corresponding to a location from the snapshot.
344    ///
345    /// # Warning
346    ///
347    /// Panics if the location does not reference an update operation. This should never happen
348    /// unless the snapshot is buggy, or this method is being used to look up an operation
349    /// independent of the snapshot contents.
350    async fn get_update_op(log: &Journal<E, Operation<K, V>>, loc: u64) -> Result<(K, V), Error> {
351        let Operation::Update(k, v) = log.read(loc).await? else {
352            panic!("location does not reference update operation. loc={loc}");
353        };
354
355        Ok((k, v))
356    }
357
358    /// Get the value of `key` in the db, or None if it has no value.
359    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
360        Ok(self.get_key_loc(key).await?.map(|(v, _)| v))
361    }
362
363    /// Get the value of the operation with location `loc` in the db. Returns [Error::OperationPruned]
364    /// if the location precedes the oldest retained location. The location is otherwise assumed
365    /// valid.
366    pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
367        assert!(loc < self.op_count());
368        if loc < self.inactivity_floor_loc {
369            return Err(Error::OperationPruned(loc));
370        }
371
372        Ok(self.log.read(loc).await?.into_value())
373    }
374
375    /// Get the value & location of the active operation for `key` in the db, or None if it has no
376    /// value.
377    pub(crate) async fn get_key_loc(&self, key: &K) -> Result<Option<(V, u64)>, Error> {
378        for &loc in self.snapshot.get(key) {
379            let (k, v) = Self::get_update_op(&self.log, loc).await?;
380            if k == *key {
381                return Ok(Some((v, loc)));
382            }
383        }
384
385        Ok(None)
386    }
387
388    /// Get the number of operations that have been applied to this db, including those that are not
389    /// yet committed.
390    pub fn op_count(&self) -> u64 {
391        leaf_pos_to_num(self.mmr.size()).unwrap()
392    }
393
394    /// Return the inactivity floor location. This is the location before which all operations are
395    /// known to be inactive.
396    pub fn inactivity_floor_loc(&self) -> u64 {
397        self.inactivity_floor_loc
398    }
399
400    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
401    /// subject to rollback until the next successful `commit`.
402    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
403        self.update_return_loc(key, value).await?;
404
405        Ok(())
406    }
407
408    /// Updates `key` to have value `value`, returning the old location of the key if it was
409    /// previously assigned some value, and None otherwise.
410    pub(crate) async fn update_return_loc(
411        &mut self,
412        key: K,
413        value: V,
414    ) -> Result<Option<u64>, Error> {
415        let new_loc = self.op_count();
416        let res =
417            Any::<_, _, _, H, T>::update_loc(&mut self.snapshot, &self.log, &key, new_loc).await?;
418
419        let op = Operation::Update(key, value);
420        self.apply_op(op).await?;
421
422        Ok(res)
423    }
424
425    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
426    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
427    /// successful `commit`. Returns the location of the deleted value for the key (if any).
428    pub async fn delete(&mut self, key: K) -> Result<Option<u64>, Error> {
429        let loc = self.op_count();
430        let r = Self::delete_key(&mut self.snapshot, &self.log, &key, loc).await?;
431        if r.is_some() {
432            self.apply_op(Operation::Delete(key)).await?;
433        };
434
435        Ok(r)
436    }
437
438    /// Delete `key` from the snapshot if it exists, returning the location that was previously
439    /// associated with it.
440    async fn delete_key(
441        snapshot: &mut Index<T, u64>,
442        log: &Journal<E, Operation<K, V>>,
443        key: &K,
444        delete_loc: u64,
445    ) -> Result<Option<u64>, Error> {
446        // If the translated key is in the snapshot, get a cursor to look for the key.
447        let Some(mut cursor) = snapshot.get_mut(key) else {
448            return Ok(None);
449        };
450        // Iterate over all conflicting keys in the snapshot.
451        while let Some(&loc) = cursor.next() {
452            let (k, _) = Self::get_update_op(log, loc).await?;
453            if k == *key {
454                // The key is in the snapshot, so delete it.
455                //
456                // If there are no longer any conflicting keys in the cursor, it will
457                // automatically be removed from the snapshot.
458                assert!(loc < delete_loc);
459                cursor.delete();
460                return Ok(Some(loc));
461            }
462        }
463
464        // The key isn't in the conflicting keys, so this is a no-op.
465        Ok(None)
466    }
467
468    /// Return the root of the db.
469    ///
470    /// # Warning
471    ///
472    /// Panics if there are uncommitted operations.
473    pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
474        self.mmr.root(hasher)
475    }
476
477    /// Append `op` to the log and add it to the MMR. The operation will be subject to rollback
478    /// until the next successful `commit`.
479    pub(crate) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
480        let encoded_op = op.encode();
481
482        // Append operation to the log and update the MMR in parallel.
483        try_join!(
484            self.mmr
485                .add_batched(&mut self.hasher, &encoded_op)
486                .map_err(Error::Mmr),
487            self.log.append(op).map_err(Error::Journal)
488        )?;
489        self.uncommitted_ops += 1;
490
491        Ok(())
492    }
493
    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at whichever comes first:
    ///     - the last operation performed, or
    ///     - the operation at location `start_loc + max_ops - 1` (so at most `max_ops` operations
    ///       are covered).
    ///  2. the operations corresponding to the leaves in this range.
500    ///
501    /// # Warning
502    ///
503    /// Panics if there are uncommitted operations.
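    ///
    /// A sketch of proving and then verifying a batch of operations (verification via
    /// `verify_proof` from the parent `adb` module, as exercised in the unit tests):
    ///
    /// ```ignore
    /// let root = db.root(&mut hasher);
    /// let (proof, ops) = db.proof(start_loc, NZU64!(4)).await?;
    /// assert!(verify_proof(&mut hasher, &proof, start_loc, &ops, &root));
    /// ```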
504    pub async fn proof(
505        &self,
506        start_loc: u64,
507        max_ops: NonZeroU64,
508    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
509        self.historical_proof(self.op_count(), start_loc, max_ops)
510            .await
511    }
512
    /// Analogous to `proof`, but with respect to the state of the db when its log contained `size`
    /// operations.
514    pub async fn historical_proof(
515        &self,
516        size: u64,
517        start_loc: u64,
518        max_ops: NonZeroU64,
519    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
520        let start_pos = leaf_num_to_pos(start_loc);
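        // The proof range covers at most `max_ops` operations: it ends at `start_loc + max_ops - 1`,
        // clamped to the last operation (location `size - 1`).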
521        let end_loc = std::cmp::min(
522            size.saturating_sub(1),
523            start_loc.saturating_add(max_ops.get()) - 1,
524        );
525        let end_pos = leaf_num_to_pos(end_loc);
526        let mmr_size = leaf_num_to_pos(size);
527
528        let proof = self
529            .mmr
530            .historical_range_proof(mmr_size, start_pos, end_pos)
531            .await?;
532        let mut ops = Vec::with_capacity((end_loc - start_loc + 1) as usize);
533        let futures = (start_loc..=end_loc)
534            .map(|i| self.log.read(i))
535            .collect::<Vec<_>>();
536        try_join_all(futures)
537            .await?
538            .into_iter()
539            .for_each(|op| ops.push(op));
540
541        Ok((proof, ops))
542    }
543
    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor by up to the number of uncommitted
    /// operations, plus one to account for the commit operation itself.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
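    ///
    /// A sketch of the commit-then-prune pattern used throughout the unit tests:
    ///
    /// ```ignore
    /// db.update(key, value).await?;
    /// db.commit().await?; // durable from this point on; also raises the inactivity floor
    /// db.prune(db.inactivity_floor_loc()).await?; // optionally discard inactive operations
    /// ```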
549    pub async fn commit(&mut self) -> Result<(), Error> {
550        // Raise the inactivity floor by the # of uncommitted operations, plus 1 to account for the
551        // commit op that will be appended.
552        self.raise_inactivity_floor(self.uncommitted_ops + 1)
553            .await?;
554
555        // Sync the log and process the updates to the MMR in parallel.
556        let mmr_fut = async {
557            self.mmr.process_updates(&mut self.hasher);
558            Ok::<(), Error>(())
559        };
560        try_join!(self.log.sync().map_err(Error::Journal), mmr_fut)?;
561        self.uncommitted_ops = 0;
562
563        Ok(())
564    }
565
566    /// Sync all database state to disk. While this isn't necessary to ensure durability of
567    /// committed operations, periodic invocation may reduce memory usage and the time required to
568    /// recover the database on restart.
569    pub async fn sync(&mut self) -> Result<(), Error> {
570        try_join!(
571            self.log.sync().map_err(Error::Journal),
572            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
573        )?;
574
575        Ok(())
576    }
577
    /// Moves the given operation to the tip of the log if it is active, rendering its old location
    /// inactive. If the operation was not active, then this is a no-op. Returns the old location
    /// of the operation if it was active.
581    pub(crate) async fn move_op_if_active(
582        &mut self,
583        op: Operation<K, V>,
584        old_loc: u64,
585    ) -> Result<Option<u64>, Error> {
        // If the operation has no key (e.g. a commit), or its translated key is not in the
        // snapshot, it cannot be active and there is nothing to move.
587        let Some(key) = op.key() else {
588            return Ok(None); // operations without keys cannot be active
589        };
590        let new_loc = self.op_count();
591        let Some(mut cursor) = self.snapshot.get_mut(key) else {
592            return Ok(None);
593        };
594
595        // Iterate over all conflicting keys in the snapshot.
596        while let Some(&loc) = cursor.next() {
597            if loc == old_loc {
598                // Update the location of the operation in the snapshot.
599                cursor.update(new_loc);
600                drop(cursor);
601
602                // Update the MMR with the operation.
603                self.apply_op(op).await?;
604                return Ok(Some(old_loc));
605            }
606        }
607
608        // The operation is not active, so this is a no-op.
609        Ok(None)
610    }
611
    /// Raise the inactivity floor by at most `max_steps` steps (stopping early if the floor
    /// reaches the tip), followed by applying a commit operation. Each step either advances over
    /// an inactive operation, or re-applies an active operation to the tip and then advances over
    /// it.
615    ///
616    /// This method does not change the state of the db's snapshot, but it always changes the root
617    /// since it applies at least one operation.
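    ///
    /// For example (illustrative): with the floor at location 10 and `max_steps = 3`, operations
    /// 10, 11 and 12 are examined in turn; any that are still active for their key are re-applied
    /// at the tip (rendering the old copy inactive), the floor advances to 13, and finally a
    /// `CommitFloor(13)` operation is appended.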
618    async fn raise_inactivity_floor(&mut self, max_steps: u64) -> Result<(), Error> {
619        for _ in 0..max_steps {
620            if self.inactivity_floor_loc == self.op_count() {
621                break;
622            }
623            let op = self.log.read(self.inactivity_floor_loc).await?;
624            self.move_op_if_active(op, self.inactivity_floor_loc)
625                .await?;
626            self.inactivity_floor_loc += 1;
627        }
628
629        self.apply_op(Operation::CommitFloor(self.inactivity_floor_loc))
630            .await?;
631
632        Ok(())
633    }
634
635    /// Prune historical operations prior to `target_prune_loc`. This does not affect the db's root
636    /// or current snapshot.
637    ///
638    /// # Panic
639    ///
640    /// Panics if `target_prune_loc` is greater than the inactivity floor.
641    pub async fn prune(&mut self, target_prune_loc: u64) -> Result<(), Error> {
642        assert!(target_prune_loc <= self.inactivity_floor_loc);
643        if self.mmr.size() == 0 {
644            // DB is empty, nothing to prune.
645            return Ok(());
646        };
647
648        // Sync the mmr before pruning the log, otherwise the MMR tip could end up behind the log's
649        // pruning boundary on restart from an unclean shutdown, and there would be no way to replay
650        // the operations between the MMR tip and the log pruning boundary.
651        self.mmr.sync(&mut self.hasher).await?;
652
653        if !self.log.prune(target_prune_loc).await? {
654            return Ok(());
655        }
656
657        debug!(
658            log_size = self.op_count(),
659            target_prune_loc, "pruned inactive ops"
660        );
661
662        self.mmr
663            .prune_to_pos(&mut self.hasher, leaf_num_to_pos(target_prune_loc))
664            .await?;
665
666        Ok(())
667    }
668
669    /// Close the db. Operations that have not been committed will be lost or rolled back on
670    /// restart.
671    pub async fn close(mut self) -> Result<(), Error> {
672        if self.uncommitted_ops > 0 {
673            warn!(
674                op_count = self.uncommitted_ops,
675                "closing db with uncommitted operations"
676            );
677        }
678
679        self.sync().await?;
680
681        Ok(())
682    }
683
684    /// Destroy the db, removing all data from disk.
685    pub async fn destroy(self) -> Result<(), Error> {
686        try_join!(
687            self.log.destroy().map_err(Error::Journal),
688            self.mmr.destroy().map_err(Error::Mmr),
689        )?;
690
691        Ok(())
692    }
693
694    /// Simulate an unclean shutdown by consuming the db without syncing (or only partially syncing)
695    /// the log and/or mmr. When _not_ fully syncing the mmr, the `write_limit` parameter dictates
696    /// how many mmr nodes to write during a partial sync (can be 0).
697    #[cfg(test)]
698    pub async fn simulate_failure(
699        mut self,
700        sync_log: bool,
701        sync_mmr: bool,
702        write_limit: usize,
703    ) -> Result<(), Error> {
704        if sync_log {
705            self.log.sync().await?;
706        }
707        if sync_mmr {
708            assert_eq!(write_limit, 0);
709            self.mmr.sync(&mut self.hasher).await?;
710        } else if write_limit > 0 {
711            self.mmr
712                .simulate_partial_sync(&mut self.hasher, write_limit)
713                .await?;
714        }
715
716        Ok(())
717    }
718}
719
720// pub(super) so helpers can be used by the sync module.
721#[cfg(test)]
722pub(super) mod test {
723    use super::*;
724    use crate::{
725        adb::verify_proof,
726        mmr::{hasher::Standard, mem::Mmr as MemMmr},
727        translator::TwoCap,
728    };
729    use commonware_codec::{DecodeExt, FixedSize};
730    use commonware_cryptography::{sha256::Digest, Digest as _, Hasher as CHasher, Sha256};
731    use commonware_macros::test_traced;
732    use commonware_runtime::{
733        deterministic::{self, Context},
734        Runner as _,
735    };
736    use commonware_utils::NZU64;
737    use rand::{
738        rngs::{OsRng, StdRng},
739        RngCore, SeedableRng,
740    };
741    use std::collections::{HashMap, HashSet};
742
743    const SHA256_SIZE: usize = <Sha256 as CHasher>::Digest::SIZE;
744
745    // Janky page & cache sizes to exercise boundary conditions.
746    const PAGE_SIZE: usize = 101;
747    const PAGE_CACHE_SIZE: usize = 11;
748
749    pub(super) fn any_db_config(suffix: &str) -> Config<TwoCap> {
750        Config {
751            mmr_journal_partition: format!("journal_{suffix}"),
752            mmr_metadata_partition: format!("metadata_{suffix}"),
753            mmr_items_per_blob: NZU64!(11),
754            mmr_write_buffer: NZUsize!(1024),
755            log_journal_partition: format!("log_journal_{suffix}"),
756            log_items_per_blob: NZU64!(7),
757            log_write_buffer: NZUsize!(1024),
758            translator: TwoCap,
759            thread_pool: None,
760            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
761        }
762    }
763
764    /// A type alias for the concrete [Any] type used in these unit tests.
765    pub(super) type AnyTest = Any<deterministic::Context, Digest, Digest, Sha256, TwoCap>;
766
767    /// Return an `Any` database initialized with a fixed config.
768    async fn open_db(context: deterministic::Context) -> AnyTest {
769        AnyTest::init(context, any_db_config("partition"))
770            .await
771            .unwrap()
772    }
773
774    pub(super) fn create_test_config(seed: u64) -> Config<TwoCap> {
775        Config {
776            mmr_journal_partition: format!("mmr_journal_{seed}"),
777            mmr_metadata_partition: format!("mmr_metadata_{seed}"),
778            mmr_items_per_blob: NZU64!(13), // intentionally small and janky size
779            mmr_write_buffer: NZUsize!(64),
780            log_journal_partition: format!("log_journal_{seed}"),
781            log_items_per_blob: NZU64!(11), // intentionally small and janky size
782            log_write_buffer: NZUsize!(64),
783            translator: TwoCap,
784            thread_pool: None,
785            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
786        }
787    }
788
789    /// Create a test database with unique partition names
790    pub(super) async fn create_test_db(mut context: Context) -> AnyTest {
791        let seed = context.next_u64();
792        let config = create_test_config(seed);
793        AnyTest::init(context, config).await.unwrap()
794    }
795
    /// Create `n` random operations, a portion of which are deletes. Because the RNG seed is
    /// fixed, create_test_ops(n) is a prefix of create_test_ops(n') for n' > n.
798    pub(super) fn create_test_ops(n: usize) -> Vec<Operation<Digest, Digest>> {
799        let mut rng = StdRng::seed_from_u64(1337);
800        let mut prev_key = Digest::random(&mut rng);
801        let mut ops = Vec::new();
802        for i in 0..n {
803            let key = Digest::random(&mut rng);
804            if i % 10 == 0 && i > 0 {
805                ops.push(Operation::Delete(prev_key));
806            } else {
807                let value = Digest::random(&mut rng);
808                ops.push(Operation::Update(key, value));
809                prev_key = key;
810            }
811        }
812        ops
813    }
814
815    /// Applies the given operations to the database.
816    pub(super) async fn apply_ops(db: &mut AnyTest, ops: Vec<Operation<Digest, Digest>>) {
817        for op in ops {
818            match op {
819                Operation::Update(key, value) => {
820                    db.update(key, value).await.unwrap();
821                }
822                Operation::Delete(key) => {
823                    db.delete(key).await.unwrap();
824                }
825                Operation::CommitFloor(_) => {
826                    db.commit().await.unwrap();
827                }
828            }
829        }
830    }
831
832    #[test_traced("WARN")]
833    fn test_any_fixed_db_empty() {
834        let executor = deterministic::Runner::default();
835        executor.start(|context| async move {
836            let mut db = open_db(context.clone()).await;
837            let mut hasher = Standard::<Sha256>::new();
838            assert_eq!(db.op_count(), 0);
839            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
840            assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));
841
842            // Make sure closing/reopening gets us back to the same state, even after adding an
843            // uncommitted op, and even without a clean shutdown.
844            let d1 = Sha256::fill(1u8);
845            let d2 = Sha256::fill(2u8);
846            let root = db.root(&mut hasher);
847            db.update(d1, d2).await.unwrap();
848            let mut db = open_db(context.clone()).await;
849            assert_eq!(db.root(&mut hasher), root);
850            assert_eq!(db.op_count(), 0);
851
852            // Test calling commit on an empty db which should make it (durably) non-empty.
853            db.commit().await.unwrap();
854            assert_eq!(db.op_count(), 1); // floor op added
855            let root = db.root(&mut hasher);
856            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
857
858            // Re-opening the DB without a clean shutdown should still recover the correct state.
859            let mut db = open_db(context.clone()).await;
860            assert_eq!(db.op_count(), 1);
861            assert_eq!(db.root(&mut hasher), root);
862
863            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits.
864            for _ in 1..100 {
865                db.commit().await.unwrap();
866                assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);
867            }
868
869            db.destroy().await.unwrap();
870        });
871    }
872
873    #[test_traced("WARN")]
874    fn test_any_fixed_db_build_basic() {
875        let executor = deterministic::Runner::default();
876        executor.start(|context| async move {
877            // Build a db with 2 keys and make sure updates and deletions of those keys work as
878            // expected.
879            let mut hasher = Standard::<Sha256>::new();
880            let mut db = open_db(context.clone()).await;
881
882            let d1 = Sha256::fill(1u8);
883            let d2 = Sha256::fill(2u8);
884
885            assert!(db.get(&d1).await.unwrap().is_none());
886            assert!(db.get(&d2).await.unwrap().is_none());
887
888            db.update(d1, d2).await.unwrap();
889            assert_eq!(db.get(&d1).await.unwrap().unwrap(), d2);
890            assert!(db.get(&d2).await.unwrap().is_none());
891
892            db.update(d2, d1).await.unwrap();
893            assert_eq!(db.get(&d1).await.unwrap().unwrap(), d2);
894            assert_eq!(db.get(&d2).await.unwrap().unwrap(), d1);
895
896            db.delete(d1).await.unwrap();
897            assert!(db.get(&d1).await.unwrap().is_none());
898            assert_eq!(db.get(&d2).await.unwrap().unwrap(), d1);
899
900            db.update(d1, d1).await.unwrap();
901            assert_eq!(db.get(&d1).await.unwrap().unwrap(), d1);
902
903            db.update(d2, d2).await.unwrap();
904            assert_eq!(db.get(&d2).await.unwrap().unwrap(), d2);
905
906            assert_eq!(db.log.size().await.unwrap(), 5); // 4 updates, 1 deletion.
907            assert_eq!(db.snapshot.keys(), 2);
908            assert_eq!(db.inactivity_floor_loc, 0);
909            db.sync().await.unwrap();
910
911            // Advance over 3 inactive operations.
912            db.raise_inactivity_floor(3).await.unwrap();
913            assert_eq!(db.inactivity_floor_loc, 3);
914            assert_eq!(db.log.size().await.unwrap(), 6); // 4 updates, 1 deletion, 1 commit
915            db.sync().await.unwrap();
916
917            // Delete all keys.
918            db.delete(d1).await.unwrap();
919            db.delete(d2).await.unwrap();
920            assert!(db.get(&d1).await.unwrap().is_none());
921            assert!(db.get(&d2).await.unwrap().is_none());
922            assert_eq!(db.log.size().await.unwrap(), 8); // 4 updates, 3 deletions, 1 commit
923            assert_eq!(db.inactivity_floor_loc, 3);
924
925            db.sync().await.unwrap();
926            let root = db.root(&mut hasher);
927
928            // Multiple deletions of the same key should be a no-op.
929            db.delete(d1).await.unwrap();
930            assert_eq!(db.log.size().await.unwrap(), 8);
931            assert_eq!(db.root(&mut hasher), root);
932
933            // Deletions of non-existent keys should be a no-op.
934            let d3 = <Sha256 as CHasher>::Digest::decode(vec![2u8; SHA256_SIZE].as_ref()).unwrap();
935            assert!(db.delete(d3).await.unwrap().is_none());
936            assert_eq!(db.log.size().await.unwrap(), 8);
937            db.sync().await.unwrap();
938            assert_eq!(db.root(&mut hasher), root);
939
940            // Make sure closing/reopening gets us back to the same state.
941            db.commit().await.unwrap();
942            assert_eq!(db.log.size().await.unwrap(), 9);
943            let root = db.root(&mut hasher);
944            db.close().await.unwrap();
945            let mut db = open_db(context.clone()).await;
946            assert_eq!(db.log.size().await.unwrap(), 9);
947            assert_eq!(db.root(&mut hasher), root);
948
949            // Since this db no longer has any active keys, we should be able to raise the
950            // inactivity floor to the tip (only the inactive commit op remains).
951            db.raise_inactivity_floor(100).await.unwrap();
952            assert_eq!(db.inactivity_floor_loc, db.op_count() - 1);
953
954            // Re-activate the keys by updating them.
955            db.update(d1, d1).await.unwrap();
956            db.update(d2, d2).await.unwrap();
957            db.delete(d1).await.unwrap();
958            db.update(d2, d1).await.unwrap();
959            db.update(d1, d2).await.unwrap();
960            assert_eq!(db.snapshot.keys(), 2);
961
962            // Confirm close/reopen gets us back to the same state.
963            db.commit().await.unwrap();
964            let root = db.root(&mut hasher);
965            db.close().await.unwrap();
966            let mut db = open_db(context).await;
967            assert_eq!(db.root(&mut hasher), root);
968            assert_eq!(db.snapshot.keys(), 2);
969
970            // Commit will raise the inactivity floor, which won't affect state but will affect the
971            // root.
972            db.commit().await.unwrap();
973
974            assert!(db.root(&mut hasher) != root);
975
976            // Pruning inactive ops should not affect current state or root
977            let root = db.root(&mut hasher);
978            db.prune(db.inactivity_floor_loc()).await.unwrap();
979            assert_eq!(db.snapshot.keys(), 2);
980            assert_eq!(db.root(&mut hasher), root);
981
982            db.destroy().await.unwrap();
983        });
984    }
985
986    #[test_traced("WARN")]
987    fn test_any_fixed_db_build_and_authenticate() {
988        let executor = deterministic::Runner::default();
989        // Build a db with 1000 keys, some of which we update and some of which we delete, and
990        // confirm that the end state of the db matches that of an identically updated hashmap.
991        const ELEMENTS: u64 = 1000;
992        executor.start(|context| async move {
993            let mut hasher = Standard::<Sha256>::new();
994            let mut db = open_db(context.clone()).await;
995
996            let mut map = HashMap::<Digest, Digest>::default();
997            for i in 0u64..ELEMENTS {
998                let k = Sha256::hash(&i.to_be_bytes());
999                let v = Sha256::hash(&(i * 1000).to_be_bytes());
1000                db.update(k, v).await.unwrap();
1001                map.insert(k, v);
1002            }
1003
1004            // Update every 3rd key
1005            for i in 0u64..ELEMENTS {
1006                if i % 3 != 0 {
1007                    continue;
1008                }
1009                let k = Sha256::hash(&i.to_be_bytes());
1010                let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
1011                db.update(k, v).await.unwrap();
1012                map.insert(k, v);
1013            }
1014
1015            // Delete every 7th key
1016            for i in 0u64..ELEMENTS {
1017                if i % 7 != 1 {
1018                    continue;
1019                }
1020                let k = Sha256::hash(&i.to_be_bytes());
1021                db.delete(k).await.unwrap();
1022                map.remove(&k);
1023            }
1024
1025            assert_eq!(db.op_count(), 1477);
1026            assert_eq!(db.inactivity_floor_loc, 0);
1027            assert_eq!(db.log.size().await.unwrap(), 1477);
1028            assert_eq!(db.snapshot.items(), 857);
1029
            // Test that commit + sync w/ pruning will raise the inactivity floor.
1031            db.commit().await.unwrap();
1032            db.sync().await.unwrap();
1033            db.prune(db.inactivity_floor_loc()).await.unwrap();
1034            assert_eq!(db.op_count(), 2336);
1035            assert_eq!(db.inactivity_floor_loc, 1478);
1036            assert_eq!(db.snapshot.items(), 857);
1037
1038            // Close & reopen the db, making sure the re-opened db has exactly the same state.
1039            let root = db.root(&mut hasher);
1040            db.close().await.unwrap();
1041            let mut db = open_db(context.clone()).await;
1042            assert_eq!(root, db.root(&mut hasher));
1043            assert_eq!(db.op_count(), 2336);
1044            assert_eq!(db.inactivity_floor_loc, 1478);
1045            assert_eq!(db.snapshot.items(), 857);
1046
1047            // Raise the inactivity floor to the point where all inactive operations can be pruned.
1048            db.raise_inactivity_floor(3000).await.unwrap();
1049            db.prune(db.inactivity_floor_loc()).await.unwrap();
1050            assert_eq!(db.inactivity_floor_loc, 4478);
1051            // Inactivity floor should be 858 operations from tip since 858 operations are active
1052            // (counting the floor op itself).
1053            assert_eq!(db.op_count(), 4478 + 858);
1054            assert_eq!(db.snapshot.items(), 857);
1055
1056            // Confirm the db's state matches that of the separate map we computed independently.
1057            for i in 0u64..1000 {
1058                let k = Sha256::hash(&i.to_be_bytes());
1059                if let Some(map_value) = map.get(&k) {
1060                    let Some(db_value) = db.get(&k).await.unwrap() else {
1061                        panic!("key not found in db: {k}");
1062                    };
1063                    assert_eq!(*map_value, db_value);
1064                } else {
1065                    assert!(db.get(&k).await.unwrap().is_none());
1066                }
1067            }
1068
1069            // Make sure size-constrained batches of operations are provable from the oldest
1070            // retained op to tip.
1071            let max_ops = NZU64!(4);
1072            let end_loc = db.op_count();
1073            let start_pos = db.mmr.pruned_to_pos();
1074            let start_loc = leaf_pos_to_num(start_pos).unwrap();
1075            // Raise the inactivity floor and make sure historical inactive operations are still provable.
1076            db.raise_inactivity_floor(100).await.unwrap();
1077            db.sync().await.unwrap();
1078            let root = db.root(&mut hasher);
1079            assert!(start_loc < db.inactivity_floor_loc);
1080
1081            for i in start_loc..end_loc {
1082                let (proof, log) = db.proof(i, max_ops).await.unwrap();
1083                assert!(verify_proof(&mut hasher, &proof, i, &log, &root));
1084            }
1085
1086            db.destroy().await.unwrap();
1087        });
1088    }
1089
    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
    /// last committed state on re-open.
1092    #[test_traced("WARN")]
1093    fn test_any_fixed_non_empty_db_recovery() {
1094        let executor = deterministic::Runner::default();
1095        executor.start(|context| async move {
1096            let mut hasher = Standard::<Sha256>::new();
1097            let mut db = open_db(context.clone()).await;
1098
            // Insert 1000 keys, then commit and prune.
1100            const ELEMENTS: u64 = 1000;
1101            for i in 0u64..ELEMENTS {
1102                let k = Sha256::hash(&i.to_be_bytes());
1103                let v = Sha256::hash(&(i * 1000).to_be_bytes());
1104                db.update(k, v).await.unwrap();
1105            }
1106            db.commit().await.unwrap();
1107            db.prune(db.inactivity_floor_loc()).await.unwrap();
1108            let root = db.root(&mut hasher);
1109            let op_count = db.op_count();
1110            let inactivity_floor_loc = db.inactivity_floor_loc();
1111
1112            // Reopen DB without clean shutdown and make sure the state is the same.
1113            let mut db = open_db(context.clone()).await;
1114            assert_eq!(db.op_count(), op_count);
1115            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1116            assert_eq!(db.root(&mut hasher), root);
1117
1118            async fn apply_more_ops(db: &mut AnyTest) {
1119                for i in 0u64..ELEMENTS {
1120                    let k = Sha256::hash(&i.to_be_bytes());
1121                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
1122                    db.update(k, v).await.unwrap();
1123                }
1124            }
1125
1126            // Insert operations without commit, then simulate failure, syncing nothing.
1127            apply_more_ops(&mut db).await;
1128            db.simulate_failure(false, false, 0).await.unwrap();
1129            let mut db = open_db(context.clone()).await;
1130            assert_eq!(db.op_count(), op_count);
1131            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1132            assert_eq!(db.root(&mut hasher), root);
1133
1134            // Repeat, though this time sync the log and only 10 elements of the mmr.
1135            apply_more_ops(&mut db).await;
1136            db.simulate_failure(true, false, 10).await.unwrap();
1137            let mut db = open_db(context.clone()).await;
1138            assert_eq!(db.op_count(), op_count);
1139            assert_eq!(db.root(&mut hasher), root);
1140
1141            // Repeat, though this time only fully sync the mmr.
1142            apply_more_ops(&mut db).await;
1143            db.simulate_failure(false, true, 0).await.unwrap();
1144            let mut db = open_db(context.clone()).await;
1145            assert_eq!(db.op_count(), op_count);
1146            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1147            assert_eq!(db.root(&mut hasher), root);
1148
1149            // One last check that re-open without proper shutdown still recovers the correct state.
1150            apply_more_ops(&mut db).await;
1151            apply_more_ops(&mut db).await;
1152            apply_more_ops(&mut db).await;
1153            let mut db = open_db(context.clone()).await;
1154            assert_eq!(db.op_count(), op_count);
1155            assert_eq!(db.root(&mut hasher), root);
1156
1157            // Apply the ops one last time but fully commit them this time, then clean up.
1158            apply_more_ops(&mut db).await;
1159            db.commit().await.unwrap();
1160            let db = open_db(context.clone()).await;
1161            assert!(db.op_count() > op_count);
1162            assert_ne!(db.inactivity_floor_loc(), inactivity_floor_loc);
1163            assert_ne!(db.root(&mut hasher), root);
1164
1165            db.destroy().await.unwrap();
1166        });
1167    }
1168
1169    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
1170    /// DB on re-open.
1171    #[test_traced("WARN")]
1172    fn test_any_fixed_empty_db_recovery() {
1173        let executor = deterministic::Runner::default();
1174        executor.start(|context| async move {
1175            // Initialize an empty db.
1176            let mut hasher = Standard::<Sha256>::new();
1177            let db = open_db(context.clone()).await;
1178            let root = db.root(&mut hasher);
1179
1180            // Reopen DB without clean shutdown and make sure the state is the same.
1181            let mut db = open_db(context.clone()).await;
1182            assert_eq!(db.op_count(), 0);
1183            assert_eq!(db.root(&mut hasher), root);
1184
1185            async fn apply_ops(db: &mut AnyTest) {
1186                for i in 0u64..1000 {
1187                    let k = Sha256::hash(&i.to_be_bytes());
1188                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
1189                    db.update(k, v).await.unwrap();
1190                }
1191            }
1192
1193            // Insert operations without commit then simulate failure, syncing nothing except one
1194            // element of the mmr.
1195            apply_ops(&mut db).await;
1196            db.simulate_failure(false, false, 1).await.unwrap();
1197            let mut db = open_db(context.clone()).await;
1198            assert_eq!(db.op_count(), 0);
1199            assert_eq!(db.root(&mut hasher), root);
1200
1201            // Repeat, though this time sync the log.
1202            apply_ops(&mut db).await;
1203            db.simulate_failure(true, false, 0).await.unwrap();
1204            let mut db = open_db(context.clone()).await;
1205            assert_eq!(db.op_count(), 0);
1206            assert_eq!(db.root(&mut hasher), root);
1207
1208            // Repeat, though this time sync the mmr.
1209            apply_ops(&mut db).await;
1210            db.simulate_failure(false, true, 0).await.unwrap();
1211            let mut db = open_db(context.clone()).await;
1212            assert_eq!(db.op_count(), 0);
1213            assert_eq!(db.root(&mut hasher), root);
1214
1215            // One last check that re-open without proper shutdown still recovers the correct state.
1216            apply_ops(&mut db).await;
1217            apply_ops(&mut db).await;
1218            apply_ops(&mut db).await;
1219            let mut db = open_db(context.clone()).await;
1220            assert_eq!(db.op_count(), 0);
1221            assert_eq!(db.root(&mut hasher), root);
1222
1223            // Apply the ops one last time but fully commit them this time, then clean up.
1224            apply_ops(&mut db).await;
1225            db.commit().await.unwrap();
1226            let db = open_db(context.clone()).await;
1227            assert!(db.op_count() > 0);
1228            assert_ne!(db.root(&mut hasher), root);
1229
1230            db.destroy().await.unwrap();
1231        });
1232    }
1233
1234    // Test that replaying multiple updates of the same key on startup doesn't leave behind old data
1235    // in the snapshot.
1236    #[test_traced("WARN")]
1237    fn test_any_fixed_db_log_replay() {
1238        let executor = deterministic::Runner::default();
1239        executor.start(|context| async move {
1240            let mut hasher = Standard::<Sha256>::new();
1241            let mut db = open_db(context.clone()).await;
1242
1243            // Update the same key many times.
1244            const UPDATES: u64 = 100;
1245            let k = Sha256::hash(&UPDATES.to_be_bytes());
1246            for i in 0u64..UPDATES {
1247                let v = Sha256::hash(&(i * 1000).to_be_bytes());
1248                db.update(k, v).await.unwrap();
1249            }
1250            db.commit().await.unwrap();
1251            let root = db.root(&mut hasher);
1252            db.close().await.unwrap();
1253
            // Re-open the db and make sure replaying the log doesn't leave behind old locations
            // for the repeatedly-updated key.
1255            let db = open_db(context.clone()).await;
1256            let iter = db.snapshot.get(&k);
1257            assert_eq!(iter.cloned().collect::<Vec<_>>().len(), 1);
1258            assert_eq!(db.root(&mut hasher), root);
1259
1260            db.destroy().await.unwrap();
1261        });
1262    }
1263
1264    #[test_traced("WARN")]
1265    fn test_any_fixed_db_multiple_commits_delete_gets_replayed() {
1266        let executor = deterministic::Runner::default();
1267        executor.start(|context| async move {
1268            let mut hasher = Standard::<Sha256>::new();
1269            let mut db = open_db(context.clone()).await;
1270
1271            let mut map = HashMap::<Digest, Digest>::default();
1272            const ELEMENTS: u64 = 10;
1273            // insert & commit multiple batches to ensure repeated inactivity floor raising.
1274            for j in 0u64..ELEMENTS {
1275                for i in 0u64..ELEMENTS {
1276                    let k = Sha256::hash(&(j * 1000 + i).to_be_bytes());
1277                    let v = Sha256::hash(&(i * 1000).to_be_bytes());
1278                    db.update(k, v).await.unwrap();
1279                    map.insert(k, v);
1280                }
1281                db.commit().await.unwrap();
1282            }
1283            let k = Sha256::hash(&((ELEMENTS - 1) * 1000 + (ELEMENTS - 1)).to_be_bytes());
1284
1285            // Do one last delete operation which will be above the inactivity
1286            // floor, to make sure it gets replayed on restart.
1287            db.delete(k).await.unwrap();
1288            db.commit().await.unwrap();
1289            assert!(db.get(&k).await.unwrap().is_none());
1290
1291            // Close & reopen the db, making sure the re-opened db has exactly the same state.
1292            let root = db.root(&mut hasher);
1293            db.close().await.unwrap();
1294            let db = open_db(context.clone()).await;
1295            assert_eq!(root, db.root(&mut hasher));
1296            assert!(db.get(&k).await.unwrap().is_none());
1297
1298            db.destroy().await.unwrap();
1299        });
1300    }
1301
1302    /// This test builds a random database, and makes sure that its state can be replayed by
1303    /// `build_snapshot_from_log` with a bitmap to correctly capture the active operations.
1304    #[test_traced("WARN")]
1305    fn test_any_fixed_db_build_snapshot_with_bitmap() {
1306        // Number of elements to initially insert into the db.
1307        const ELEMENTS: u64 = 1000;
1308
1309        // Use a non-deterministic rng seed to ensure each run is different.
1310        let rng_seed = OsRng.next_u64();
1311        // Log the seed with high visibility to make failures reproducible.
1312        warn!("rng_seed={}", rng_seed);
1313        let mut rng = StdRng::seed_from_u64(rng_seed);
1314
1315        let executor = deterministic::Runner::default();
1316        executor.start(|context| async move {
1317            let mut hasher = Standard::<Sha256>::new();
1318            let mut db = open_db(context.clone()).await;
1319
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = Sha256::hash(&rng.next_u32().to_be_bytes());
                db.update(k, v).await.unwrap();
            }

            // Randomly update / delete them, deleting on roughly 1/7th of the iterations and
            // updating on the rest.
            for _ in 0u64..ELEMENTS * 10 {
                let rand_key = Sha256::hash(&(rng.next_u64() % ELEMENTS).to_be_bytes());
                if rng.next_u32() % 7 == 0 {
                    db.delete(rand_key).await.unwrap();
                    continue;
                }
                let v = Sha256::hash(&rng.next_u32().to_be_bytes());
                db.update(rand_key, v).await.unwrap();
                if rng.next_u32() % 20 == 0 {
                    // Commit every ~20 updates.
                    db.commit().await.unwrap();
                }
            }
            db.commit().await.unwrap();

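            // Capture the root and inactivity floor so the rebuilt state can be checked against them.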
            let root = db.root(&mut hasher);
            let inactivity_floor_loc = db.inactivity_floor_loc;

            // Close the db, then replay its operations with a bitmap.
            db.close().await.unwrap();
            // Initialize the bitmap based on the current db's inactivity floor.
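            // Operations below the floor are inactive, so their bits are all false.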
            let mut bitmap = Bitmap::<_, SHA256_SIZE>::new();
            for _ in 0..inactivity_floor_loc {
                bitmap.append(false);
            }
            bitmap.sync(&mut hasher).await.unwrap();

            // Initialize the db's mmr/log.
            let cfg = any_db_config("partition");
            let (inactivity_floor_loc, mmr, log) =
                AnyTest::init_mmr_and_log(context.clone(), cfg, &mut hasher)
                    .await
                    .unwrap();

            // Replay the log to populate the bitmap. Use a TwoCap translator instead of EightCap
            // here so we exercise some collisions.
            let mut snapshot: Index<TwoCap, u64> =
                Index::init(context.with_label("snapshot"), TwoCap);
            AnyTest::build_snapshot_from_log::<SHA256_SIZE>(
                inactivity_floor_loc,
                &log,
                &mut snapshot,
                Some(&mut bitmap),
            )
            .await
            .unwrap();

            // Check the recovered state is correct.
            let db = AnyTest {
                mmr,
                log,
                snapshot,
                inactivity_floor_loc,
                uncommitted_ops: 0,
                hasher: Standard::<Sha256>::new(),
            };
            assert_eq!(db.root(&mut hasher), root);

            // Check the bitmap state matches that of the snapshot.
            let items = db.log.size().await.unwrap();
            assert_eq!(bitmap.bit_count(), items);
            let mut active_positions = HashSet::new();
            // This loop checks that the expected true bits are true in the bitmap.
            for pos in db.inactivity_floor_loc..items {
                let item = db.log.read(pos).await.unwrap();
                let Some(item_key) = item.key() else {
                    // `item` is a commit operation, which has no key.
                    continue;
                };
                let iter = db.snapshot.get(item_key);
                for loc in iter {
                    if *loc == pos {
                        // Found an active op.
                        active_positions.insert(pos);
                        assert!(bitmap.get_bit(pos));
                        break;
                    }
                }
            }
            // This loop checks that the expected false bits are false in the bitmap.
            for pos in db.inactivity_floor_loc..items {
                if !active_positions.contains(&pos) {
                    assert!(!bitmap.get_bit(pos));
                }
            }

            db.destroy().await.unwrap();
        });
    }

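    /// Verifies that a historical proof taken at the current database size matches a regular
    /// proof, and that it continues to verify after the database grows.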
    #[test]
    fn test_any_fixed_db_historical_proof_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(20);
            apply_ops(&mut db, ops.clone()).await;
            db.commit().await.unwrap();
            let mut hasher = Standard::<Sha256>::new();
            let root_hash = db.root(&mut hasher);
            let original_op_count = db.op_count();

            // A historical proof should match a "regular" proof when the historical size equals
            // the current database size.
            let max_ops = NZU64!(10);
            let (historical_proof, historical_ops) = db
                .historical_proof(original_op_count, 5, max_ops)
                .await
                .unwrap();
            let (regular_proof, regular_ops) = db.proof(5, max_ops).await.unwrap();

            assert_eq!(historical_proof.size, regular_proof.size);
            assert_eq!(historical_proof.digests, regular_proof.digests);
            assert_eq!(historical_ops, regular_ops);
            assert_eq!(historical_ops, ops[5..15]);
            assert!(verify_proof(
                &mut hasher,
                &historical_proof,
                5,
                &historical_ops,
                &root_hash
            ));

            // Add more operations to the database.
            let more_ops = create_test_ops(5);
            apply_ops(&mut db, more_ops.clone()).await;
            db.commit().await.unwrap();

            // The historical proof should remain the same even though the database has grown.
            let (historical_proof, historical_ops) = db
                .historical_proof(original_op_count, 5, NZU64!(10))
                .await
                .unwrap();
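            // The proof remains anchored to the historical MMR size, so it still matches the
            // earlier proof and verifies against the old root.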
            assert_eq!(historical_proof.size, leaf_num_to_pos(original_op_count));
            assert_eq!(historical_proof.size, regular_proof.size);
            assert_eq!(historical_ops.len(), 10);
            assert_eq!(historical_proof.digests, regular_proof.digests);
            assert_eq!(historical_ops, regular_ops);
            assert!(verify_proof(
                &mut hasher,
                &historical_proof,
                5,
                &historical_ops,
                &root_hash
            ));

            db.destroy().await.unwrap();
        });
    }

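    /// Exercises historical-proof edge cases: a single-operation database, requesting more
    /// operations than the historical position contains, and a minimal historical size.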
    #[test]
    fn test_any_fixed_db_historical_proof_edge_cases() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(50);
            apply_ops(&mut db, ops.clone()).await;
            db.commit().await.unwrap();

            let mut hasher = Standard::<Sha256>::new();

            // Test a proof over a singleton (single-operation) historical database.
            let (single_proof, single_ops) = db.historical_proof(1, 0, NZU64!(1)).await.unwrap();
            assert_eq!(single_proof.size, leaf_num_to_pos(1));
            assert_eq!(single_ops.len(), 1);

            // Create a database containing only the first operation.
            let mut single_db = create_test_db(context.clone()).await;
            apply_ops(&mut single_db, ops[0..1].to_vec()).await;
            // Don't commit: committing appends commit operations, which would change the root.
            single_db.sync().await.unwrap();
            let single_root = single_db.root(&mut hasher);

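            // The historical proof from the full db should verify against the single-op db's root.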
            assert!(verify_proof(
                &mut hasher,
                &single_proof,
                0,
                &single_ops,
                &single_root
            ));

            // Test requesting more operations than are available at the historical position.
            let (_limited_proof, limited_ops) =
                db.historical_proof(10, 5, NZU64!(20)).await.unwrap();
            assert_eq!(limited_ops.len(), 5); // Limited by the historical position.
            assert_eq!(limited_ops, ops[5..10]);

            // Test a proof at the minimum historical position.
            let (min_proof, min_ops) = db.historical_proof(3, 0, NZU64!(3)).await.unwrap();
            assert_eq!(min_proof.size, leaf_num_to_pos(3));
            assert_eq!(min_ops.len(), 3);
            assert_eq!(min_ops, ops[0..3]);

            single_db.destroy().await.unwrap();
            db.destroy().await.unwrap();
        });
    }

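    /// Checks historical proofs against reference databases rebuilt at each historical size.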
    #[test]
    fn test_any_fixed_db_historical_proof_different_historical_sizes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(100);
            apply_ops(&mut db, ops.clone()).await;
            db.commit().await.unwrap();

            let mut hasher = Standard::<Sha256>::new();

            // Test historical proof generation for several historical states.
            let start_loc = 20;
            let max_ops = NZU64!(10);
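            // For each historical size, the historical proof should match a regular proof taken
            // from a reference database built to exactly that size.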
            for end_loc in 31..50 {
                let (historical_proof, historical_ops) = db
                    .historical_proof(end_loc, start_loc, max_ops)
                    .await
                    .unwrap();

                assert_eq!(historical_proof.size, leaf_num_to_pos(end_loc));

                // Create a reference database at the given historical size.
                let mut ref_db = create_test_db(context.clone()).await;
                apply_ops(&mut ref_db, ops[0..end_loc as usize].to_vec()).await;
                // Sync to process dirty nodes but don't commit, since committing appends commit
                // operations that change the root.
                ref_db.sync().await.unwrap();

                let (ref_proof, ref_ops) = ref_db.proof(start_loc, max_ops).await.unwrap();
                assert_eq!(ref_proof.size, historical_proof.size);
                assert_eq!(ref_ops, historical_ops);
                assert_eq!(ref_proof.digests, historical_proof.digests);
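                // The returned ops are capped at max_ops past start_loc (or at the historical
                // end, whichever comes first).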
                let end_loc = std::cmp::min(start_loc + max_ops.get(), end_loc);
                assert_eq!(ref_ops, ops[start_loc as usize..end_loc as usize]);

                // Verify the proof against the reference root.
                let ref_root = ref_db.root(&mut hasher);
                assert!(verify_proof(
                    &mut hasher,
                    &historical_proof,
                    start_loc,
                    &historical_ops,
                    &ref_root
                ));

                ref_db.destroy().await.unwrap();
            }

            db.destroy().await.unwrap();
        });
    }

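    /// Verifies that tampering with the proof digests, operations, start location, root digest,
    /// or proof size causes verification to fail.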
    #[test]
    fn test_any_fixed_db_historical_proof_invalid() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(10);
            apply_ops(&mut db, ops).await;
            db.commit().await.unwrap();

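            // Prove operations 1..5 against a historical database of 5 operations (4 ops total).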
            let (proof, ops) = db.historical_proof(5, 1, NZU64!(10)).await.unwrap();
            assert_eq!(proof.size, leaf_num_to_pos(5));
            assert_eq!(ops.len(), 4);

            let mut hasher = Standard::<Sha256>::new();

            // Changing the proof digests should cause verification to fail
            {
                let mut proof = proof.clone();
                proof.digests[0] = Sha256::hash(b"invalid");
                let root_hash = db.root(&mut hasher);
                assert!(!verify_proof(&mut hasher, &proof, 0, &ops, &root_hash));
            }
            {
                let mut proof = proof.clone();
                proof.digests.push(Sha256::hash(b"invalid"));
                let root_hash = db.root(&mut hasher);
                assert!(!verify_proof(&mut hasher, &proof, 0, &ops, &root_hash));
            }

            // Changing the ops should cause verification to fail
            {
                let mut ops = ops.clone();
                ops[0] = Operation::Update(Sha256::hash(b"key1"), Sha256::hash(b"value1"));
                let root_hash = db.root(&mut hasher);
                assert!(!verify_proof(&mut hasher, &proof, 0, &ops, &root_hash));
            }
            {
                let mut ops = ops.clone();
                ops.push(Operation::Update(
                    Sha256::hash(b"key1"),
                    Sha256::hash(b"value1"),
                ));
                let root_hash = db.root(&mut hasher);
                assert!(!verify_proof(&mut hasher, &proof, 0, &ops, &root_hash));
            }

            // Changing the start location should cause verification to fail
            {
                let root_hash = db.root(&mut hasher);
                assert!(!verify_proof(&mut hasher, &proof, 1, &ops, &root_hash));
            }

            // Changing the root digest should cause verification to fail
            {
                assert!(!verify_proof(
                    &mut hasher,
                    &proof,
                    0,
                    &ops,
                    &Sha256::hash(b"invalid")
                ));
            }

            // Changing the proof size should cause verification to fail
            {
                let mut proof = proof.clone();
                proof.size = 100;
                let root_hash = db.root(&mut hasher);
                assert!(!verify_proof(&mut hasher, &proof, 0, &ops, &root_hash));
            }

            db.destroy().await.unwrap();
        });
    }
}