commonware_storage/adb/any/fixed/
unordered.rs

1//! An Any database implementation with an unordered key space and fixed-size values.
2
3use crate::{
4    adb::{
5        any::fixed::{
6            historical_proof, init_mmr_and_log, prune_db, Config, SNAPSHOT_READ_BUFFER_SIZE,
7        },
8        operation::fixed::unordered::Operation,
9        store::{self, Db},
10        Error,
11    },
12    index::{Index as _, Unordered as Index},
13    journal::contiguous::fixed::Journal,
14    mmr::{journaled::Mmr, Location, Proof, StandardHasher as Standard},
15    translator::Translator,
16};
17use commonware_codec::CodecFixed;
18use commonware_cryptography::Hasher as CHasher;
19use commonware_runtime::{Clock, Metrics, Storage};
20use commonware_utils::{Array, NZUsize};
21use futures::{pin_mut, try_join, StreamExt as _, TryFutureExt as _};
22use std::num::NonZeroU64;
23use tracing::debug;
24
/// A key-value ADB based on an MMR over its log of operations, supporting authentication of any
/// value ever associated with a key.
pub struct Any<
    E: Storage + Clock + Metrics,
    K: Array,
    V: CodecFixed<Cfg = ()>,
    H: CHasher,
    T: Translator,
> {
    /// An MMR over digests of the operations applied to the db.
    ///
    /// # Invariant
    ///
    /// - The number of leaves in this MMR always equals the number of operations in the unpruned
    ///   `log`.
    /// - The MMR is never pruned beyond the inactivity floor.
    pub(crate) mmr: Mmr<E, H>,

    /// A (pruned) log of all operations applied to the db in order of occurrence. The position of
    /// each operation in the log is called its _location_, which is a stable identifier.
    ///
    /// # Invariants
    ///
    /// - An operation's location is always equal to the number of the MMR leaf storing the digest
    ///   of the operation.
    /// - The log is never pruned beyond the inactivity floor.
    pub(crate) log: Journal<E, Operation<K, V>>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location in the log containing its most recent update.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Update].
    pub(crate) snapshot: Index<T, Location>,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    pub(crate) inactivity_floor_loc: Location,

    /// The number of _steps_ to raise the inactivity floor. Each step involves moving exactly one
    /// active operation to tip. Incremented whenever an existing operation is inactivated (by an
    /// overwriting update or a delete); reset to zero on `commit`.
    pub(crate) steps: u64,

    /// Cryptographic hasher to re-use within mutable operations requiring digest computation.
    pub(crate) hasher: Standard<H>,
}
72
73impl<
74        E: Storage + Clock + Metrics,
75        K: Array,
76        V: CodecFixed<Cfg = ()>,
77        H: CHasher,
78        T: Translator,
79    > Any<E, K, V, H, T>
80{
81    /// Returns an [Any] adb initialized from `cfg`. Any uncommitted log operations will be
82    /// discarded and the state of the db will be as of the last committed operation.
83    pub async fn init(context: E, cfg: Config<T>) -> Result<Self, Error> {
84        let mut snapshot: Index<T, Location> =
85            Index::init(context.with_label("snapshot"), cfg.translator.clone());
86        let mut hasher = Standard::<H>::new();
87        let (inactivity_floor_loc, mmr, log) = init_mmr_and_log(context, cfg, &mut hasher).await?;
88
89        Self::build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, |_, _| {}).await?;
90
91        let db = Any {
92            mmr,
93            log,
94            snapshot,
95            inactivity_floor_loc,
96            steps: 0,
97            hasher,
98        };
99
100        Ok(db)
101    }
102
    /// Builds the database's snapshot by replaying the log starting at the inactivity floor.
    /// Assumes the log and mmr have the same number of operations and are not pruned beyond the
    /// inactivity floor. The callback is invoked for each replayed operation, indicating activity
    /// status updates. The first argument of the callback is the activity status of the operation,
    /// and the second argument is the location of the operation it inactivates (if any).
    pub(crate) async fn build_snapshot_from_log<F>(
        inactivity_floor_loc: Location,
        log: &Journal<E, Operation<K, V>>,
        snapshot: &mut Index<T, Location>,
        mut callback: F,
    ) -> Result<(), Error>
    where
        F: FnMut(bool, Option<Location>),
    {
        // Stream operations in order, beginning at the first location that may still be active.
        let stream = log
            .replay(NZUsize!(SNAPSHOT_READ_BUFFER_SIZE), *inactivity_floor_loc)
            .await?;
        pin_mut!(stream);
        // Location of the last operation in the log; only a commit residing there is active.
        let last_commit_loc = log.size().await.saturating_sub(1);
        while let Some(result) = stream.next().await {
            let (i, op) = result?;
            match op {
                Operation::Delete(key) => {
                    // Drop the key from the snapshot; the prior update (if any) is inactivated.
                    let result = super::delete_key(snapshot, log, &key).await?;
                    callback(false, result);
                }
                Operation::Update(key, _) => {
                    // Point the snapshot at this newer update, inactivating any older location.
                    let new_loc = Location::new_unchecked(i);
                    let old_loc = super::update_loc(snapshot, log, &key, new_loc).await?;
                    callback(true, old_loc);
                }
                // A commit op is active only when it is the final operation in the log.
                Operation::CommitFloor(_) => callback(i == last_commit_loc, None),
            }
        }

        Ok(())
    }
140
141    /// Get the update operation from `log` corresponding to a known location.
142    async fn get_update_op(
143        log: &Journal<E, Operation<K, V>>,
144        loc: Location,
145    ) -> Result<(K, V), Error> {
146        let Operation::Update(k, v) = log.read(*loc).await? else {
147            unreachable!("location does not reference update operation. loc={loc}");
148        };
149
150        Ok((k, v))
151    }
152
153    /// Get the value of `key` in the db, or None if it has no value.
154    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
155        Ok(self.get_key_loc(key).await?.map(|(v, _)| v))
156    }
157
158    /// Get the value & location of the active operation for `key` in the db, or None if it has no
159    /// value.
160    pub(crate) async fn get_key_loc(&self, key: &K) -> Result<Option<(V, Location)>, Error> {
161        for &loc in self.snapshot.get(key) {
162            let (k, v) = Self::get_update_op(&self.log, loc).await?;
163            if k == *key {
164                return Ok(Some((v, loc)));
165            }
166        }
167
168        Ok(None)
169    }
170
    /// Get the number of operations that have been applied to this db, including those that are not
    /// yet committed.
    pub fn op_count(&self) -> Location {
        // Each operation corresponds to exactly one MMR leaf (struct invariant), so the leaf
        // count is the operation count.
        self.mmr.leaves()
    }
176
    /// Whether the db currently has no active keys (deleted keys do not count as active).
    pub fn is_empty(&self) -> bool {
        self.snapshot.keys() == 0
    }
181
    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive.
    pub fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }
187
188    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
189    /// subject to rollback until the next successful `commit`.
190    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
191        self.update_return_loc(key, value).await?;
192
193        Ok(())
194    }
195
196    /// Updates `key` to have value `value`, returning the old location of the key if it was
197    /// previously assigned some value, and None otherwise.
198    pub(crate) async fn update_return_loc(
199        &mut self,
200        key: K,
201        value: V,
202    ) -> Result<Option<Location>, Error> {
203        let new_loc = self.op_count();
204        let res = super::update_loc(&mut self.snapshot, &self.log, &key, new_loc).await?;
205
206        let op = Operation::Update(key, value);
207        self.as_shared().apply_op(op).await?;
208        if res.is_some() {
209            self.steps += 1;
210        }
211
212        Ok(res)
213    }
214
215    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
216    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
217    /// successful `commit`. Returns the location of the deleted value for the key (if any).
218    pub async fn delete(&mut self, key: K) -> Result<Option<Location>, Error> {
219        let r = super::delete_key(&mut self.snapshot, &self.log, &key).await?;
220        if r.is_some() {
221            self.as_shared().apply_op(Operation::Delete(key)).await?;
222            self.steps += 1;
223        };
224
225        Ok(r)
226    }
227
228    /// Returns a wrapper around the db's state that can be used to perform shared functions.
229    pub(crate) fn as_shared(
230        &mut self,
231    ) -> super::Shared<'_, E, Index<T, Location>, Operation<K, V>, H> {
232        super::Shared {
233            snapshot: &mut self.snapshot,
234            mmr: &mut self.mmr,
235            log: &mut self.log,
236            hasher: &mut self.hasher,
237        }
238    }
239
    /// Return the root of the db, which commits to the entire operation history.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations.
    pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
        self.mmr.root(hasher)
    }
248
    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `start_loc` >
    /// [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count`.
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        // A proof over the current state is a historical proof at the present op count.
        self.historical_proof(self.op_count(), start_loc, max_ops)
            .await
    }
269
    /// Analogous to proof, but with respect to the state of the MMR when it had `op_count`
    /// operations.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
    /// [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count`.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        // Delegates to the shared helper used by all fixed-value ADB variants.
        historical_proof(&self.mmr, &self.log, op_count, start_loc, max_ops).await
    }
286
    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
    pub async fn commit(&mut self) -> Result<(), Error> {
        // Raise the inactivity floor by taking `self.steps` steps, plus 1 to account for the
        // previous commit becoming inactive.
        if self.is_empty() {
            // With no active keys every operation is inactive, so the floor can jump straight
            // to tip instead of stepping one operation at a time.
            self.inactivity_floor_loc = self.op_count();
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            let steps_to_take = self.steps + 1;
            for _ in 0..steps_to_take {
                // Each step moves one active operation to tip and advances the floor past it.
                let loc = self.inactivity_floor_loc;
                self.inactivity_floor_loc = self.as_shared().raise_floor(loc).await?;
            }
        }
        self.steps = 0;

        // Apply the commit operation with the new inactivity floor.
        let loc = self.inactivity_floor_loc;
        let mut shared = self.as_shared();
        shared.apply_op(Operation::CommitFloor(loc)).await?;

        // Sync the log and process the updates to the MMR, making the commit durable.
        shared.sync_and_process_updates().await
    }
315
    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.as_shared().sync().await
    }
322
    /// Prune historical operations prior to `target_prune_loc`. This does not affect the db's root
    /// or current snapshot.
    ///
    /// # Errors
    ///
    /// - Returns [crate::mmr::Error::LocationOverflow] if `target_prune_loc` >
    ///   [crate::mmr::MAX_LOCATION].
    /// - Returns [crate::mmr::Error::RangeOutOfBounds] if `target_prune_loc` is greater than the
    ///   inactivity floor.
    pub async fn prune(&mut self, target_prune_loc: Location) -> Result<(), Error> {
        // Compute the op count up front: `prune_db` takes mutable borrows of our fields, which
        // would conflict with calling `self.op_count()` inline.
        let op_count = self.op_count();
        prune_db(
            &mut self.mmr,
            &mut self.log,
            &mut self.hasher,
            target_prune_loc,
            self.inactivity_floor_loc,
            op_count,
        )
        .await
    }
344
345    /// Close the db. Operations that have not been committed will be lost or rolled back on
346    /// restart.
347    pub async fn close(mut self) -> Result<(), Error> {
348        try_join!(
349            self.log.close().map_err(Error::Journal),
350            self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
351        )?;
352
353        Ok(())
354    }
355
356    /// Destroy the db, removing all data from disk.
357    pub async fn destroy(self) -> Result<(), Error> {
358        try_join!(
359            self.log.destroy().map_err(Error::Journal),
360            self.mmr.destroy().map_err(Error::Mmr),
361        )?;
362
363        Ok(())
364    }
365
    /// Simulate an unclean shutdown by consuming the db without syncing (or only partially syncing)
    /// the log and/or mmr. When _not_ fully syncing the mmr, the `write_limit` parameter dictates
    /// how many mmr nodes to write during a partial sync (can be 0).
    ///
    /// # Panics
    ///
    /// Panics if `sync_mmr` is true while `write_limit` is non-zero (a full sync admits no limit).
    #[cfg(any(test, feature = "fuzzing"))]
    pub async fn simulate_failure(
        mut self,
        sync_log: bool,
        sync_mmr: bool,
        write_limit: usize,
    ) -> Result<(), Error> {
        if sync_log {
            // Fully persist the operation log.
            self.log.sync().await?;
        }
        if sync_mmr {
            assert_eq!(write_limit, 0);
            // Fully persist the MMR.
            self.mmr.sync(&mut self.hasher).await?;
        } else if write_limit > 0 {
            // Write only `write_limit` nodes to simulate a crash mid-sync.
            self.mmr
                .simulate_partial_sync(&mut self.hasher, write_limit)
                .await?;
        }

        Ok(())
    }
390}
391
impl<
        E: Storage + Clock + Metrics,
        K: Array,
        V: CodecFixed<Cfg = ()>,
        H: CHasher,
        T: Translator,
    > Db<E, K, V, T> for Any<E, K, V, H, T>
{
    // Pure delegation: each trait method forwards to the inherent method of the same name
    // (inherent methods take resolution priority over the trait method being defined), mapping
    // `adb::Error` into `store::Error` where the signatures differ.

    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get(&self, key: &K) -> Result<Option<V>, store::Error> {
        self.get(key).await.map_err(Into::into)
    }

    async fn update(&mut self, key: K, value: V) -> Result<(), store::Error> {
        self.update(key, value).await.map_err(Into::into)
    }

    async fn delete(&mut self, key: K) -> Result<(), store::Error> {
        // The inherent `delete` returns the deleted value's prior location; the trait discards it.
        self.delete(key).await.map(|_| ()).map_err(Into::into)
    }

    async fn commit(&mut self) -> Result<(), store::Error> {
        self.commit().await.map_err(Into::into)
    }

    async fn sync(&mut self) -> Result<(), store::Error> {
        self.sync().await.map_err(Into::into)
    }

    async fn prune(&mut self, target_prune_loc: Location) -> Result<(), store::Error> {
        self.prune(target_prune_loc).await.map_err(Into::into)
    }

    async fn close(self) -> Result<(), store::Error> {
        self.close().await.map_err(Into::into)
    }

    async fn destroy(self) -> Result<(), store::Error> {
        self.destroy().await.map_err(Into::into)
    }
}
440
441// pub(super) so helpers can be used by the sync module.
442#[cfg(test)]
443pub(super) mod test {
444    use super::*;
445    use crate::{
446        adb::{
447            operation::fixed::{unordered::Operation, FixedOperation as _},
448            verify_proof,
449        },
450        index::{Index as IndexTrait, Unordered as Index},
451        mmr::{bitmap::BitMap, mem::Mmr as MemMmr, Position, StandardHasher as Standard},
452        translator::TwoCap,
453    };
454    use commonware_codec::{DecodeExt, FixedSize};
455    use commonware_cryptography::{sha256::Digest, Digest as _, Hasher as CHasher, Sha256};
456    use commonware_macros::test_traced;
457    use commonware_runtime::{
458        buffer::PoolRef,
459        deterministic::{self, Context},
460        Runner as _,
461    };
462    use commonware_utils::NZU64;
463    use rand::{
464        rngs::{OsRng, StdRng},
465        RngCore, SeedableRng,
466    };
467    use std::collections::{HashMap, HashSet};
468    use tracing::warn;
469
    /// Size in bytes of a SHA-256 digest.
    const SHA256_SIZE: usize = <Sha256 as CHasher>::Digest::SIZE;

    // Janky (non-power-of-two) page & cache sizes to exercise boundary conditions.
    const PAGE_SIZE: usize = 101;
    const PAGE_CACHE_SIZE: usize = 11;
475
476    pub(crate) fn any_db_config(suffix: &str) -> Config<TwoCap> {
477        Config {
478            mmr_journal_partition: format!("journal_{suffix}"),
479            mmr_metadata_partition: format!("metadata_{suffix}"),
480            mmr_items_per_blob: NZU64!(11),
481            mmr_write_buffer: NZUsize!(1024),
482            log_journal_partition: format!("log_journal_{suffix}"),
483            log_items_per_blob: NZU64!(7),
484            log_write_buffer: NZUsize!(1024),
485            translator: TwoCap,
486            thread_pool: None,
487            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
488        }
489    }
490
    /// A type alias for the concrete [Any] type used in these unit tests (deterministic runtime,
    /// SHA-256 digests for both keys and values, two-byte key translation).
    pub(crate) type AnyTest = Any<deterministic::Context, Digest, Digest, Sha256, TwoCap>;
493
494    /// Return an `Any` database initialized with a fixed config.
495    async fn open_db(context: deterministic::Context) -> AnyTest {
496        AnyTest::init(context, any_db_config("partition"))
497            .await
498            .unwrap()
499    }
500
501    pub(crate) fn create_test_config(seed: u64) -> Config<TwoCap> {
502        Config {
503            mmr_journal_partition: format!("mmr_journal_{seed}"),
504            mmr_metadata_partition: format!("mmr_metadata_{seed}"),
505            mmr_items_per_blob: NZU64!(13), // intentionally small and janky size
506            mmr_write_buffer: NZUsize!(64),
507            log_journal_partition: format!("log_journal_{seed}"),
508            log_items_per_blob: NZU64!(11), // intentionally small and janky size
509            log_write_buffer: NZUsize!(64),
510            translator: TwoCap,
511            thread_pool: None,
512            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
513        }
514    }
515
516    /// Create a test database with unique partition names
517    pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
518        let seed = context.next_u64();
519        let config = create_test_config(seed);
520        AnyTest::init(context, config).await.unwrap()
521    }
522
    /// Create n random operations. Some portion of the operations are deletes.
    /// Generation is deterministic (fixed RNG seed), so create_test_ops(n') is a prefix of
    /// create_test_ops(n) for n' < n.
    pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<Digest, Digest>> {
        let mut rng = StdRng::seed_from_u64(1337);
        let mut prev_key = Digest::random(&mut rng);
        let mut ops = Vec::new();
        for i in 0..n {
            // A key is drawn every iteration -- even on delete iterations, where it goes unused --
            // keeping the RNG stream identical across values of `n`.
            let key = Digest::random(&mut rng);
            if i % 10 == 0 && i > 0 {
                // Every 10th operation deletes the most recently updated key.
                ops.push(Operation::Delete(prev_key));
            } else {
                let value = Digest::random(&mut rng);
                ops.push(Operation::Update(key, value));
                prev_key = key;
            }
        }
        ops
    }
541
542    /// Applies the given operations to the database.
543    pub(crate) async fn apply_ops(db: &mut AnyTest, ops: Vec<Operation<Digest, Digest>>) {
544        for op in ops {
545            match op {
546                Operation::Update(key, value) => {
547                    db.update(key, value).await.unwrap();
548                }
549                Operation::Delete(key) => {
550                    db.delete(key).await.unwrap();
551                }
552                Operation::CommitFloor(_) => {
553                    db.commit().await.unwrap();
554                }
555            }
556        }
557    }
558
    #[test_traced("INFO")]
    fn test_any_fixed_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // A freshly-opened db has no operations, and its root matches an empty in-memory MMR.
            let mut db = open_db(context.clone()).await;
            let mut hasher = Standard::<Sha256>::new();
            assert_eq!(db.op_count(), 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            let empty_root = db.root(&mut hasher);
            assert_eq!(empty_root, MemMmr::default().root(&mut hasher));

            // Make sure closing/reopening gets us back to the same state, even after adding an
            // uncommitted op, and even without a clean shutdown.
            let d1 = Sha256::fill(1u8);
            let d2 = Sha256::fill(2u8);
            db.update(d1, d2).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0); // uncommitted update was rolled back
            assert_eq!(db.root(&mut hasher), empty_root);

            // An empty proof verifies against the empty root.
            let empty_proof = Proof::default();
            assert!(verify_proof(
                &mut hasher,
                &empty_proof,
                Location::new_unchecked(0),
                &[] as &[Operation<Digest, Digest>],
                &empty_root
            ));

            // Test calling commit on an empty db which should make it (durably) non-empty.
            db.commit().await.unwrap();
            assert_eq!(db.op_count(), 1); // commit op added
            let root = db.root(&mut hasher);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));

            // Re-opening the DB without a clean shutdown should still recover the correct state.
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(&mut hasher), root);

            // Empty proof should no longer verify.
            assert!(!verify_proof(
                &mut hasher,
                &empty_proof,
                Location::new_unchecked(0),
                &[] as &[Operation<Digest, Digest>],
                &root
            ));

            // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits on a
            // non-empty db.
            db.update(d1, d2).await.unwrap();
            for _ in 1..100 {
                db.commit().await.unwrap();
                // Distance should equal 3 after the second commit, with inactivity_floor
                // referencing the previous commit operation.
                assert!(db.op_count() - db.inactivity_floor_loc <= 3);
            }

            // Confirm the inactivity floor is raised to tip when the db becomes empty.
            db.delete(d1).await.unwrap();
            db.commit().await.unwrap();
            assert!(db.is_empty());
            assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);

            db.destroy().await.unwrap();
        });
    }
627
    #[test_traced("WARN")]
    fn test_any_fixed_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys and make sure updates and deletions of those keys work as
            // expected.
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            let d1 = Sha256::fill(1u8);
            let d2 = Sha256::fill(2u8);

            assert!(db.get(&d1).await.unwrap().is_none());
            assert!(db.get(&d2).await.unwrap().is_none());

            db.update(d1, d2).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), d2);
            assert!(db.get(&d2).await.unwrap().is_none());

            db.update(d2, d1).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), d2);
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), d1);

            db.delete(d1).await.unwrap(); // inactivates op 0
            assert!(db.get(&d1).await.unwrap().is_none());
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), d1);

            db.update(d1, d1).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), d1);

            db.update(d2, d2).await.unwrap(); // inactivates op 1
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), d2);

            assert_eq!(db.log.size().await, 5); // 4 updates, 1 deletion.
            assert_eq!(db.snapshot.keys(), 2);
            assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(0));
            db.sync().await.unwrap();

            // take one floor raising step, which should move the first active op (at location 3) to
            // tip, leaving the floor at the next location (4).
            let loc = db.inactivity_floor_loc;
            db.inactivity_floor_loc = db.as_shared().raise_floor(loc).await.unwrap();
            assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(4));
            // 4 updates, 1 deletion, 1 moved update (raise_floor re-appends the active op at tip).
            assert_eq!(db.log.size().await, 6);
            db.sync().await.unwrap();

            // Delete all keys.
            db.delete(d1).await.unwrap();
            db.delete(d2).await.unwrap();
            assert!(db.get(&d1).await.unwrap().is_none());
            assert!(db.get(&d2).await.unwrap().is_none());
            assert_eq!(db.log.size().await, 8); // 4 updates, 3 deletions, 1 moved update

            db.commit().await.unwrap();
            // Since this db no longer has any active keys, the inactivity floor should have been
            // set to tip.
            assert_eq!(db.inactivity_floor_loc, db.op_count() - 1);
            let root = db.root(&mut hasher);

            // Multiple deletions of the same key should be a no-op.
            db.delete(d1).await.unwrap();
            assert_eq!(db.log.size().await, 9); // one more commit op added.
            assert_eq!(db.root(&mut hasher), root);

            // Deletions of non-existent keys should be a no-op.
            // NOTE(review): d3 decodes [2u8; 32], which presumably equals Sha256::fill(2u8) == d2;
            // d2 was deleted above, so d3 is indeed absent here -- confirm `fill` semantics.
            let d3 = <Sha256 as CHasher>::Digest::decode(vec![2u8; SHA256_SIZE].as_ref()).unwrap();
            assert!(db.delete(d3).await.unwrap().is_none());
            assert_eq!(db.log.size().await, 9);
            db.sync().await.unwrap();
            assert_eq!(db.root(&mut hasher), root);

            // Make sure closing/reopening gets us back to the same state.
            assert_eq!(db.log.size().await, 9);
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.log.size().await, 9);
            assert_eq!(db.root(&mut hasher), root);

            // Re-activate the keys by updating them.
            db.update(d1, d1).await.unwrap();
            db.update(d2, d2).await.unwrap();
            db.delete(d1).await.unwrap();
            db.update(d2, d1).await.unwrap();
            db.update(d1, d2).await.unwrap();
            assert_eq!(db.snapshot.keys(), 2);

            // Confirm close/reopen gets us back to the same state.
            db.commit().await.unwrap();
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context).await;
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.snapshot.keys(), 2);

            // Commit will raise the inactivity floor, which won't affect state but will affect the
            // root.
            db.commit().await.unwrap();

            assert!(db.root(&mut hasher) != root);

            // Pruning inactive ops should not affect current state or root
            let root = db.root(&mut hasher);
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.snapshot.keys(), 2);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }
738
    #[test_traced("WARN")]
    fn test_any_fixed_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete, and
        // confirm that the end state of the db matches that of an identically updated hashmap.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Mirror every db mutation in a plain HashMap so the end states can be compared.
            let mut map = HashMap::<Digest, Digest>::default();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = Sha256::hash(&(i * 1000).to_be_bytes());
                db.update(k, v).await.unwrap();
                map.insert(k, v);
            }

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                db.update(k, v).await.unwrap();
                map.insert(k, v);
            }

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
                map.remove(&k);
            }

            // 1000 inserts + 334 updates (i % 3 == 0) + 143 deletes (i % 7 == 1) = 1477 ops,
            // and nothing has been committed or pruned yet.
            assert_eq!(db.op_count(), 1477);
            assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(0));
            assert_eq!(db.log.size().await, 1477);
            // 1000 keys minus the 143 deleted ones remain active.
            assert_eq!(db.snapshot.items(), 857);

            // Test that commit + sync w/ pruning will raise the activity floor.
            db.commit().await.unwrap();
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.op_count(), 1956);
            assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(837));
            // Raising the floor must not change the set of active keys.
            assert_eq!(db.snapshot.items(), 857);

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert_eq!(db.op_count(), 1956);
            assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(837));
            assert_eq!(db.snapshot.items(), 857);

            // Confirm the db's state matches that of the separate map we computed independently.
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }

            // Make sure size-constrained batches of operations are provable from the oldest
            // retained op to tip.
            let max_ops = NZU64!(4);
            let end_loc = db.op_count();
            let start_pos = db.mmr.pruned_to_pos();
            let start_loc = Location::try_from(start_pos).unwrap();
            // Raise the inactivity floor via commit and make sure historical inactive operations
            // are still provable.
            db.commit().await.unwrap();
            let root = db.root(&mut hasher);
            assert!(start_loc < db.inactivity_floor_loc);

            // Every batch of up to `max_ops` ops starting in [start_loc, end_loc) must verify
            // against the post-commit root.
            for loc in *start_loc..*end_loc {
                let loc = Location::new_unchecked(loc);
                let (proof, log) = db.proof(loc, max_ops).await.unwrap();
                assert!(verify_proof(&mut hasher, &proof, loc, &log, &root));
            }

            db.destroy().await.unwrap();
        });
    }
834
    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
    /// last committed (non-empty) state on re-open.
    #[test_traced("WARN")]
    fn test_any_fixed_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = Sha256::hash(&(i * 1000).to_be_bytes());
                db.update(k, v).await.unwrap();
            }
            db.commit().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            // Capture the committed state that every recovery below must restore.
            let root = db.root(&mut hasher);
            let op_count = db.op_count();
            let inactivity_floor_loc = db.inactivity_floor_loc();

            // Reopen DB without clean shutdown and make sure the state is the same.
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
            assert_eq!(db.root(&mut hasher), root);

            // Applies ELEMENTS uncommitted updates that each recovery should discard.
            async fn apply_more_ops(db: &mut AnyTest) {
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                    db.update(k, v).await.unwrap();
                }
            }

            // Insert operations without commit, then simulate failure, syncing nothing.
            apply_more_ops(&mut db).await;
            db.simulate_failure(false, false, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
            assert_eq!(db.root(&mut hasher), root);

            // Repeat, though this time sync the log and only 10 elements of the mmr.
            apply_more_ops(&mut db).await;
            db.simulate_failure(true, false, 10).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(&mut hasher), root);

            // Repeat, though this time only fully sync the mmr.
            apply_more_ops(&mut db).await;
            db.simulate_failure(false, true, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
            assert_eq!(db.root(&mut hasher), root);

            // One last check that re-open without proper shutdown still recovers the correct state.
            apply_more_ops(&mut db).await;
            apply_more_ops(&mut db).await;
            apply_more_ops(&mut db).await;
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(&mut hasher), root);

            // Apply the ops one last time but fully commit them this time, then clean up.
            apply_more_ops(&mut db).await;
            db.commit().await.unwrap();
            let db = open_db(context.clone()).await;
            // Committed operations must survive reopen, so the state now differs from `root`.
            assert!(db.op_count() > op_count);
            assert_ne!(db.inactivity_floor_loc(), inactivity_floor_loc);
            assert_ne!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }
913
    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
    /// DB on re-open.
    #[test_traced("WARN")]
    fn test_any_fixed_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize an empty db.
            let mut hasher = Standard::<Sha256>::new();
            let db = open_db(context.clone()).await;
            let root = db.root(&mut hasher);

            // Reopen DB without clean shutdown and make sure the state is the same.
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            // Applies 1000 uncommitted updates that every recovery below should discard.
            async fn apply_ops(db: &mut AnyTest) {
                for i in 0u64..1000 {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                    db.update(k, v).await.unwrap();
                }
            }

            // Insert operations without commit then simulate failure, syncing nothing except one
            // element of the mmr.
            // NOTE(review): `simulate_failure` args appear to be (sync_log, sync_mmr,
            // partial_mmr_elements) based on the surrounding comments — confirm against its
            // definition.
            apply_ops(&mut db).await;
            db.simulate_failure(false, false, 1).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            // Repeat, though this time sync the log.
            apply_ops(&mut db).await;
            db.simulate_failure(true, false, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            // Repeat, though this time sync the mmr.
            apply_ops(&mut db).await;
            db.simulate_failure(false, true, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            // One last check that re-open without proper shutdown still recovers the correct state.
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            // Apply the ops one last time but fully commit them this time, then clean up.
            apply_ops(&mut db).await;
            db.commit().await.unwrap();
            let db = open_db(context.clone()).await;
            // This time the ops were committed, so they must survive the reopen.
            assert!(db.op_count() > 0);
            assert_ne!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }
978
979    // Test that replaying multiple updates of the same key on startup doesn't leave behind old data
980    // in the snapshot.
981    #[test_traced("WARN")]
982    fn test_any_fixed_db_log_replay() {
983        let executor = deterministic::Runner::default();
984        executor.start(|context| async move {
985            let mut hasher = Standard::<Sha256>::new();
986            let mut db = open_db(context.clone()).await;
987
988            // Update the same key many times.
989            const UPDATES: u64 = 100;
990            let k = Sha256::hash(&UPDATES.to_be_bytes());
991            for i in 0u64..UPDATES {
992                let v = Sha256::hash(&(i * 1000).to_be_bytes());
993                db.update(k, v).await.unwrap();
994            }
995            db.commit().await.unwrap();
996            let root = db.root(&mut hasher);
997            db.close().await.unwrap();
998
999            // Simulate a failed commit and test that the log replay doesn't leave behind old data.
1000            let db = open_db(context.clone()).await;
1001            let iter = db.snapshot.get(&k);
1002            assert_eq!(iter.cloned().collect::<Vec<_>>().len(), 1);
1003            assert_eq!(db.root(&mut hasher), root);
1004
1005            db.destroy().await.unwrap();
1006        });
1007    }
1008
1009    #[test_traced("WARN")]
1010    fn test_any_fixed_db_multiple_commits_delete_gets_replayed() {
1011        let executor = deterministic::Runner::default();
1012        executor.start(|context| async move {
1013            let mut hasher = Standard::<Sha256>::new();
1014            let mut db = open_db(context.clone()).await;
1015
1016            let mut map = HashMap::<Digest, Digest>::default();
1017            const ELEMENTS: u64 = 10;
1018            // insert & commit multiple batches to ensure repeated inactivity floor raising.
1019            for j in 0u64..ELEMENTS {
1020                for i in 0u64..ELEMENTS {
1021                    let k = Sha256::hash(&(j * 1000 + i).to_be_bytes());
1022                    let v = Sha256::hash(&(i * 1000).to_be_bytes());
1023                    db.update(k, v).await.unwrap();
1024                    map.insert(k, v);
1025                }
1026                db.commit().await.unwrap();
1027            }
1028            let k = Sha256::hash(&((ELEMENTS - 1) * 1000 + (ELEMENTS - 1)).to_be_bytes());
1029
1030            // Do one last delete operation which will be above the inactivity
1031            // floor, to make sure it gets replayed on restart.
1032            db.delete(k).await.unwrap();
1033            db.commit().await.unwrap();
1034            assert!(db.get(&k).await.unwrap().is_none());
1035
1036            // Close & reopen the db, making sure the re-opened db has exactly the same state.
1037            let root = db.root(&mut hasher);
1038            db.close().await.unwrap();
1039            let db = open_db(context.clone()).await;
1040            assert_eq!(root, db.root(&mut hasher));
1041            assert!(db.get(&k).await.unwrap().is_none());
1042
1043            db.destroy().await.unwrap();
1044        });
1045    }
1046
    /// This test builds a random database, and makes sure that its state can be replayed by
    /// `build_snapshot_from_log` with a bitmap to correctly capture the active operations.
    #[test_traced("WARN")]
    fn test_any_fixed_db_build_snapshot_with_bitmap() {
        // Number of elements to initially insert into the db.
        const ELEMENTS: u64 = 1000;

        // Use a non-deterministic rng seed to ensure each run is different.
        let rng_seed = OsRng.next_u64();
        // Log the seed with high visibility to make failures reproducible.
        warn!("rng_seed={}", rng_seed);
        let mut rng = StdRng::seed_from_u64(rng_seed);

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Seed the db with ELEMENTS keys holding random values.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = Sha256::hash(&rng.next_u32().to_be_bytes());
                db.update(k, v).await.unwrap();
            }

            // Randomly update / delete them. We use a delete frequency that is 1/7th of the update
            // frequency.
            for _ in 0u64..ELEMENTS * 10 {
                let rand_key = Sha256::hash(&(rng.next_u64() % ELEMENTS).to_be_bytes());
                if rng.next_u32() % 7 == 0 {
                    db.delete(rand_key).await.unwrap();
                    continue;
                }
                let v = Sha256::hash(&rng.next_u32().to_be_bytes());
                db.update(rand_key, v).await.unwrap();
                if rng.next_u32() % 20 == 0 {
                    // Commit every ~20 updates.
                    db.commit().await.unwrap();
                }
            }
            db.commit().await.unwrap();

            // Capture the expected end state before tearing the db down.
            let root = db.root(&mut hasher);
            let inactivity_floor_loc = db.inactivity_floor_loc;

            // Close the db, then replay its operations with a bitmap.
            db.close().await.unwrap();
            // Initialize the bitmap based on the current db's inactivity floor: every location
            // below the floor is marked inactive up front.
            let mut bitmap = BitMap::<_, SHA256_SIZE>::new();
            for _ in 0..*inactivity_floor_loc {
                bitmap.push(false);
            }
            bitmap.merkleize(&mut hasher).await.unwrap();

            // Initialize the db's mmr/log.
            let cfg = any_db_config("partition");
            let (inactivity_floor_loc, mmr, log) =
                init_mmr_and_log(context.clone(), cfg, &mut hasher)
                    .await
                    .unwrap();

            // Replay log to populate the bitmap. Use a TwoCap instead of EightCap here so we exercise some collisions.
            let mut snapshot = Index::init(context.with_label("snapshot"), TwoCap);
            AnyTest::build_snapshot_from_log(
                inactivity_floor_loc,
                &log,
                &mut snapshot,
                // Callback: push the activity bit for each replayed op, and clear the bit of any
                // prior operation that the replayed op supersedes.
                |append, loc| {
                    bitmap.push(append);
                    if let Some(loc) = loc {
                        bitmap.set_bit(*loc, false);
                    }
                },
            )
            .await
            .unwrap();

            // Check the recovered state is correct by assembling a db from the replayed parts.
            let db = AnyTest {
                mmr,
                log,
                snapshot,
                inactivity_floor_loc,
                steps: 0,
                hasher: Standard::<Sha256>::new(),
            };
            assert_eq!(db.root(&mut hasher), root);

            // Check the bitmap state matches that of the snapshot.
            let items = db.log.size().await;
            assert_eq!(bitmap.len(), items);
            let mut active_positions = HashSet::new();
            // This loop checks that the expected true bits are true in the bitmap.
            for pos in *db.inactivity_floor_loc..items {
                let item = db.log.read(pos).await.unwrap();
                let Some(item_key) = item.key() else {
                    // `item` is a commit
                    continue;
                };
                let iter = db.snapshot.get(item_key);
                for loc in iter {
                    if *loc == pos {
                        // Found an active op.
                        active_positions.insert(pos);
                        assert!(bitmap.get_bit(pos));
                        break;
                    }
                }
            }
            // This loop checks that the expected false bits are false in the bitmap.
            for pos in *db.inactivity_floor_loc..items - 1 {
                assert_eq!(bitmap.get_bit(pos), active_positions.contains(&pos));
            }
            assert!(bitmap.get_bit(items - 1)); // last commit should always be active

            db.destroy().await.unwrap();
        });
    }
1164
    // Historical proofs must match regular proofs at the current size, stay stable as the db
    // grows, and reject out-of-range historical sizes.
    #[test]
    fn test_any_fixed_db_historical_proof_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(20);
            apply_ops(&mut db, ops.clone()).await;
            db.commit().await.unwrap();
            let mut hasher = Standard::<Sha256>::new();
            let root_hash = db.root(&mut hasher);
            let original_op_count = db.op_count();

            // Historical proof should match "regular" proof when historical size == current database size
            let max_ops = NZU64!(10);
            let (historical_proof, historical_ops) = db
                .historical_proof(original_op_count, Location::new_unchecked(5), max_ops)
                .await
                .unwrap();
            let (regular_proof, regular_ops) =
                db.proof(Location::new_unchecked(5), max_ops).await.unwrap();

            assert_eq!(historical_proof.size, regular_proof.size);
            assert_eq!(historical_proof.digests, regular_proof.digests);
            assert_eq!(historical_ops, regular_ops);
            // Ops 5..15: 10 operations starting at location 5.
            assert_eq!(historical_ops, ops[5..15]);
            assert!(verify_proof(
                &mut hasher,
                &historical_proof,
                Location::new_unchecked(5),
                &historical_ops,
                &root_hash
            ));

            // Add more operations to the database
            let more_ops = create_test_ops(5);
            apply_ops(&mut db, more_ops.clone()).await;
            db.commit().await.unwrap();

            // Historical proof should remain the same even though database has grown
            let (historical_proof, historical_ops) = db
                .historical_proof(original_op_count, Location::new_unchecked(5), NZU64!(10))
                .await
                .unwrap();
            assert_eq!(
                historical_proof.size,
                Position::try_from(original_op_count).unwrap()
            );
            assert_eq!(historical_proof.size, regular_proof.size);
            assert_eq!(historical_ops.len(), 10);
            assert_eq!(historical_proof.digests, regular_proof.digests);
            assert_eq!(historical_ops, regular_ops);
            // Still verifies against the pre-growth root.
            assert!(verify_proof(
                &mut hasher,
                &historical_proof,
                Location::new_unchecked(5),
                &historical_ops,
                &root_hash
            ));

            // Try to get historical proof with op_count > number of operations and confirm it
            // returns RangeOutOfBounds error.
            assert!(matches!(
                db.historical_proof(db.op_count() + 1, Location::new_unchecked(5), NZU64!(10))
                    .await,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));

            db.destroy().await.unwrap();
        });
    }
1235
    // Edge cases for historical proofs: size-1 history, more ops requested than the historical
    // size holds, and a proof at the minimum historical position.
    #[test]
    fn test_any_fixed_db_historical_proof_edge_cases() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(50);
            apply_ops(&mut db, ops.clone()).await;
            db.commit().await.unwrap();

            let mut hasher = Standard::<Sha256>::new();

            // Test singleton database
            let (single_proof, single_ops) = db
                .historical_proof(
                    Location::new_unchecked(1),
                    Location::new_unchecked(0),
                    NZU64!(1),
                )
                .await
                .unwrap();
            assert_eq!(
                single_proof.size,
                Position::try_from(Location::new_unchecked(1)).unwrap()
            );
            assert_eq!(single_ops.len(), 1);

            // Create historical database with single operation
            let mut single_db = create_test_db(context.clone()).await;
            apply_ops(&mut single_db, ops[0..1].to_vec()).await;
            // Don't commit - this changes the root due to commit operations
            single_db.sync().await.unwrap();
            let single_root = single_db.root(&mut hasher);

            // The size-1 historical proof must verify against the size-1 reference db's root.
            assert!(verify_proof(
                &mut hasher,
                &single_proof,
                Location::new_unchecked(0),
                &single_ops,
                &single_root
            ));

            // Test requesting more operations than available in historical position
            let (_limited_proof, limited_ops) = db
                .historical_proof(
                    Location::new_unchecked(10),
                    Location::new_unchecked(5),
                    NZU64!(20),
                )
                .await
                .unwrap();
            assert_eq!(limited_ops.len(), 5); // Should be limited by historical position
            assert_eq!(limited_ops, ops[5..10]);

            // Test proof at minimum historical position
            let (min_proof, min_ops) = db
                .historical_proof(
                    Location::new_unchecked(3),
                    Location::new_unchecked(0),
                    NZU64!(3),
                )
                .await
                .unwrap();
            assert_eq!(
                min_proof.size,
                Position::try_from(Location::new_unchecked(3)).unwrap()
            );
            assert_eq!(min_ops.len(), 3);
            assert_eq!(min_ops, ops[0..3]);

            single_db.destroy().await.unwrap();
            db.destroy().await.unwrap();
        });
    }
1309
    // For a range of historical sizes, the historical proof must equal the proof produced by a
    // reference db actually truncated to that size, and must verify against that db's root.
    #[test]
    fn test_any_fixed_db_historical_proof_different_historical_sizes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(100);
            apply_ops(&mut db, ops.clone()).await;
            db.commit().await.unwrap();

            let mut hasher = Standard::<Sha256>::new();

            // Test historical proof generation for several historical states.
            let start_loc = Location::new_unchecked(20);
            let max_ops = NZU64!(10);
            for end_loc in 31..50 {
                let end_loc = Location::new_unchecked(end_loc);
                let (historical_proof, historical_ops) = db
                    .historical_proof(end_loc, start_loc, max_ops)
                    .await
                    .unwrap();

                assert_eq!(historical_proof.size, Position::try_from(end_loc).unwrap());

                // Create a reference database at the given historical size
                let mut ref_db = create_test_db(context.clone()).await;
                apply_ops(&mut ref_db, ops[0..*end_loc as usize].to_vec()).await;
                // Sync to process dirty nodes but don't commit - commit changes the root due to commit operations
                ref_db.sync().await.unwrap();

                let (ref_proof, ref_ops) = ref_db.proof(start_loc, max_ops).await.unwrap();
                assert_eq!(ref_proof.size, historical_proof.size);
                assert_eq!(ref_ops, historical_ops);
                assert_eq!(ref_proof.digests, historical_proof.digests);
                // The returned range is capped by both max_ops and the historical size.
                let end_loc = std::cmp::min(start_loc.checked_add(max_ops.get()).unwrap(), end_loc);
                assert_eq!(ref_ops, ops[*start_loc as usize..*end_loc as usize]);

                // Verify proof against reference root
                let ref_root = ref_db.root(&mut hasher);
                assert!(verify_proof(
                    &mut hasher,
                    &historical_proof,
                    start_loc,
                    &historical_ops,
                    &ref_root
                ),);

                ref_db.destroy().await.unwrap();
            }

            db.destroy().await.unwrap();
        });
    }
1362
1363    #[test]
1364    fn test_any_fixed_db_historical_proof_invalid() {
1365        let executor = deterministic::Runner::default();
1366        executor.start(|context| async move {
1367            let mut db = create_test_db(context.clone()).await;
1368            let ops = create_test_ops(10);
1369            apply_ops(&mut db, ops).await;
1370            db.commit().await.unwrap();
1371
1372            let historical_op_count = Location::new_unchecked(5);
1373            let historical_mmr_size = Position::try_from(historical_op_count).unwrap();
1374            let (proof, ops) = db
1375                .historical_proof(historical_op_count, Location::new_unchecked(1), NZU64!(10))
1376                .await
1377                .unwrap();
1378            assert_eq!(proof.size, historical_mmr_size);
1379            assert_eq!(ops.len(), 4);
1380
1381            let mut hasher = Standard::<Sha256>::new();
1382
1383            // Changing the proof digests should cause verification to fail
1384            {
1385                let mut proof = proof.clone();
1386                proof.digests[0] = Sha256::hash(b"invalid");
1387                let root_hash = db.root(&mut hasher);
1388                assert!(!verify_proof(
1389                    &mut hasher,
1390                    &proof,
1391                    Location::new_unchecked(0),
1392                    &ops,
1393                    &root_hash
1394                ));
1395            }
1396            {
1397                let mut proof = proof.clone();
1398                proof.digests.push(Sha256::hash(b"invalid"));
1399                let root_hash = db.root(&mut hasher);
1400                assert!(!verify_proof(
1401                    &mut hasher,
1402                    &proof,
1403                    Location::new_unchecked(0),
1404                    &ops,
1405                    &root_hash
1406                ));
1407            }
1408
1409            // Changing the ops should cause verification to fail
1410            {
1411                let mut ops = ops.clone();
1412                ops[0] = Operation::Update(Sha256::hash(b"key1"), Sha256::hash(b"value1"));
1413                let root_hash = db.root(&mut hasher);
1414                assert!(!verify_proof(
1415                    &mut hasher,
1416                    &proof,
1417                    Location::new_unchecked(0),
1418                    &ops,
1419                    &root_hash
1420                ));
1421            }
1422            {
1423                let mut ops = ops.clone();
1424                ops.push(Operation::Update(
1425                    Sha256::hash(b"key1"),
1426                    Sha256::hash(b"value1"),
1427                ));
1428                let root_hash = db.root(&mut hasher);
1429                assert!(!verify_proof(
1430                    &mut hasher,
1431                    &proof,
1432                    Location::new_unchecked(0),
1433                    &ops,
1434                    &root_hash
1435                ));
1436            }
1437
1438            // Changing the start location should cause verification to fail
1439            {
1440                let root_hash = db.root(&mut hasher);
1441                assert!(!verify_proof(
1442                    &mut hasher,
1443                    &proof,
1444                    Location::new_unchecked(1),
1445                    &ops,
1446                    &root_hash
1447                ));
1448            }
1449
1450            // Changing the root digest should cause verification to fail
1451            {
1452                assert!(!verify_proof(
1453                    &mut hasher,
1454                    &proof,
1455                    Location::new_unchecked(0),
1456                    &ops,
1457                    &Sha256::hash(b"invalid")
1458                ));
1459            }
1460
1461            // Changing the proof size should cause verification to fail
1462            {
1463                let mut proof = proof.clone();
1464                proof.size = Position::new(100);
1465                let root_hash = db.root(&mut hasher);
1466                assert!(!verify_proof(
1467                    &mut hasher,
1468                    &proof,
1469                    Location::new_unchecked(0),
1470                    &ops,
1471                    &root_hash
1472                ));
1473            }
1474
1475            db.destroy().await.unwrap();
1476        });
1477    }
1478}