1use crate::{
4 adb::{
5 any::fixed::{
6 historical_proof, init_mmr_and_log, prune_db, Config, SNAPSHOT_READ_BUFFER_SIZE,
7 },
8 operation::fixed::unordered::Operation,
9 store::{self, Db},
10 Error,
11 },
12 index::{Index as _, Unordered as Index},
13 journal::contiguous::fixed::Journal,
14 mmr::{journaled::Mmr, Location, Proof, StandardHasher as Standard},
15 translator::Translator,
16};
17use commonware_codec::CodecFixed;
18use commonware_cryptography::Hasher as CHasher;
19use commonware_runtime::{Clock, Metrics, Storage};
20use commonware_utils::{Array, NZUsize};
21use futures::{pin_mut, try_join, StreamExt as _, TryFutureExt as _};
22use std::num::NonZeroU64;
23use tracing::debug;
24
25pub struct Any<
28 E: Storage + Clock + Metrics,
29 K: Array,
30 V: CodecFixed<Cfg = ()>,
31 H: CHasher,
32 T: Translator,
33> {
34 pub(crate) mmr: Mmr<E, H>,
42
43 pub(crate) log: Journal<E, Operation<K, V>>,
52
53 pub(crate) snapshot: Index<T, Location>,
60
61 pub(crate) inactivity_floor_loc: Location,
64
65 pub(crate) steps: u64,
68
69 pub(crate) hasher: Standard<H>,
71}
72
73impl<
74 E: Storage + Clock + Metrics,
75 K: Array,
76 V: CodecFixed<Cfg = ()>,
77 H: CHasher,
78 T: Translator,
79 > Any<E, K, V, H, T>
80{
81 pub async fn init(context: E, cfg: Config<T>) -> Result<Self, Error> {
84 let mut snapshot: Index<T, Location> =
85 Index::init(context.with_label("snapshot"), cfg.translator.clone());
86 let mut hasher = Standard::<H>::new();
87 let (inactivity_floor_loc, mmr, log) = init_mmr_and_log(context, cfg, &mut hasher).await?;
88
89 Self::build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, |_, _| {}).await?;
90
91 let db = Any {
92 mmr,
93 log,
94 snapshot,
95 inactivity_floor_loc,
96 steps: 0,
97 hasher,
98 };
99
100 Ok(db)
101 }
102
103 pub(crate) async fn build_snapshot_from_log<F>(
109 inactivity_floor_loc: Location,
110 log: &Journal<E, Operation<K, V>>,
111 snapshot: &mut Index<T, Location>,
112 mut callback: F,
113 ) -> Result<(), Error>
114 where
115 F: FnMut(bool, Option<Location>),
116 {
117 let stream = log
118 .replay(NZUsize!(SNAPSHOT_READ_BUFFER_SIZE), *inactivity_floor_loc)
119 .await?;
120 pin_mut!(stream);
121 let last_commit_loc = log.size().await.saturating_sub(1);
122 while let Some(result) = stream.next().await {
123 let (i, op) = result?;
124 match op {
125 Operation::Delete(key) => {
126 let result = super::delete_key(snapshot, log, &key).await?;
127 callback(false, result);
128 }
129 Operation::Update(key, _) => {
130 let new_loc = Location::new_unchecked(i);
131 let old_loc = super::update_loc(snapshot, log, &key, new_loc).await?;
132 callback(true, old_loc);
133 }
134 Operation::CommitFloor(_) => callback(i == last_commit_loc, None),
135 }
136 }
137
138 Ok(())
139 }
140
141 async fn get_update_op(
143 log: &Journal<E, Operation<K, V>>,
144 loc: Location,
145 ) -> Result<(K, V), Error> {
146 let Operation::Update(k, v) = log.read(*loc).await? else {
147 unreachable!("location does not reference update operation. loc={loc}");
148 };
149
150 Ok((k, v))
151 }
152
153 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
155 Ok(self.get_key_loc(key).await?.map(|(v, _)| v))
156 }
157
158 pub(crate) async fn get_key_loc(&self, key: &K) -> Result<Option<(V, Location)>, Error> {
161 for &loc in self.snapshot.get(key) {
162 let (k, v) = Self::get_update_op(&self.log, loc).await?;
163 if k == *key {
164 return Ok(Some((v, loc)));
165 }
166 }
167
168 Ok(None)
169 }
170
171 pub fn op_count(&self) -> Location {
174 self.mmr.leaves()
175 }
176
177 pub fn is_empty(&self) -> bool {
179 self.snapshot.keys() == 0
180 }
181
182 pub fn inactivity_floor_loc(&self) -> Location {
185 self.inactivity_floor_loc
186 }
187
188 pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
191 self.update_return_loc(key, value).await?;
192
193 Ok(())
194 }
195
196 pub(crate) async fn update_return_loc(
199 &mut self,
200 key: K,
201 value: V,
202 ) -> Result<Option<Location>, Error> {
203 let new_loc = self.op_count();
204 let res = super::update_loc(&mut self.snapshot, &self.log, &key, new_loc).await?;
205
206 let op = Operation::Update(key, value);
207 self.as_shared().apply_op(op).await?;
208 if res.is_some() {
209 self.steps += 1;
210 }
211
212 Ok(res)
213 }
214
215 pub async fn delete(&mut self, key: K) -> Result<Option<Location>, Error> {
219 let r = super::delete_key(&mut self.snapshot, &self.log, &key).await?;
220 if r.is_some() {
221 self.as_shared().apply_op(Operation::Delete(key)).await?;
222 self.steps += 1;
223 };
224
225 Ok(r)
226 }
227
228 pub(crate) fn as_shared(
230 &mut self,
231 ) -> super::Shared<'_, E, Index<T, Location>, Operation<K, V>, H> {
232 super::Shared {
233 snapshot: &mut self.snapshot,
234 mmr: &mut self.mmr,
235 log: &mut self.log,
236 hasher: &mut self.hasher,
237 }
238 }
239
240 pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
246 self.mmr.root(hasher)
247 }
248
249 pub async fn proof(
262 &self,
263 start_loc: Location,
264 max_ops: NonZeroU64,
265 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
266 self.historical_proof(self.op_count(), start_loc, max_ops)
267 .await
268 }
269
270 pub async fn historical_proof(
279 &self,
280 op_count: Location,
281 start_loc: Location,
282 max_ops: NonZeroU64,
283 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
284 historical_proof(&self.mmr, &self.log, op_count, start_loc, max_ops).await
285 }
286
287 pub async fn commit(&mut self) -> Result<(), Error> {
293 if self.is_empty() {
296 self.inactivity_floor_loc = self.op_count();
297 debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
298 } else {
299 let steps_to_take = self.steps + 1;
300 for _ in 0..steps_to_take {
301 let loc = self.inactivity_floor_loc;
302 self.inactivity_floor_loc = self.as_shared().raise_floor(loc).await?;
303 }
304 }
305 self.steps = 0;
306
307 let loc = self.inactivity_floor_loc;
309 let mut shared = self.as_shared();
310 shared.apply_op(Operation::CommitFloor(loc)).await?;
311
312 shared.sync_and_process_updates().await
314 }
315
316 pub async fn sync(&mut self) -> Result<(), Error> {
320 self.as_shared().sync().await
321 }
322
323 pub async fn prune(&mut self, target_prune_loc: Location) -> Result<(), Error> {
333 let op_count = self.op_count();
334 prune_db(
335 &mut self.mmr,
336 &mut self.log,
337 &mut self.hasher,
338 target_prune_loc,
339 self.inactivity_floor_loc,
340 op_count,
341 )
342 .await
343 }
344
345 pub async fn close(mut self) -> Result<(), Error> {
348 try_join!(
349 self.log.close().map_err(Error::Journal),
350 self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
351 )?;
352
353 Ok(())
354 }
355
356 pub async fn destroy(self) -> Result<(), Error> {
358 try_join!(
359 self.log.destroy().map_err(Error::Journal),
360 self.mmr.destroy().map_err(Error::Mmr),
361 )?;
362
363 Ok(())
364 }
365
366 #[cfg(any(test, feature = "fuzzing"))]
370 pub async fn simulate_failure(
371 mut self,
372 sync_log: bool,
373 sync_mmr: bool,
374 write_limit: usize,
375 ) -> Result<(), Error> {
376 if sync_log {
377 self.log.sync().await?;
378 }
379 if sync_mmr {
380 assert_eq!(write_limit, 0);
381 self.mmr.sync(&mut self.hasher).await?;
382 } else if write_limit > 0 {
383 self.mmr
384 .simulate_partial_sync(&mut self.hasher, write_limit)
385 .await?;
386 }
387
388 Ok(())
389 }
390}
391
392impl<
393 E: Storage + Clock + Metrics,
394 K: Array,
395 V: CodecFixed<Cfg = ()>,
396 H: CHasher,
397 T: Translator,
398 > Db<E, K, V, T> for Any<E, K, V, H, T>
399{
400 fn op_count(&self) -> Location {
401 self.op_count()
402 }
403
404 fn inactivity_floor_loc(&self) -> Location {
405 self.inactivity_floor_loc()
406 }
407
408 async fn get(&self, key: &K) -> Result<Option<V>, store::Error> {
409 self.get(key).await.map_err(Into::into)
410 }
411
412 async fn update(&mut self, key: K, value: V) -> Result<(), store::Error> {
413 self.update(key, value).await.map_err(Into::into)
414 }
415
416 async fn delete(&mut self, key: K) -> Result<(), store::Error> {
417 self.delete(key).await.map(|_| ()).map_err(Into::into)
418 }
419
420 async fn commit(&mut self) -> Result<(), store::Error> {
421 self.commit().await.map_err(Into::into)
422 }
423
424 async fn sync(&mut self) -> Result<(), store::Error> {
425 self.sync().await.map_err(Into::into)
426 }
427
428 async fn prune(&mut self, target_prune_loc: Location) -> Result<(), store::Error> {
429 self.prune(target_prune_loc).await.map_err(Into::into)
430 }
431
432 async fn close(self) -> Result<(), store::Error> {
433 self.close().await.map_err(Into::into)
434 }
435
436 async fn destroy(self) -> Result<(), store::Error> {
437 self.destroy().await.map_err(Into::into)
438 }
439}
440
441#[cfg(test)]
443pub(super) mod test {
444 use super::*;
445 use crate::{
446 adb::{
447 operation::fixed::{unordered::Operation, FixedOperation as _},
448 verify_proof,
449 },
450 index::{Index as IndexTrait, Unordered as Index},
451 mmr::{bitmap::BitMap, mem::Mmr as MemMmr, Position, StandardHasher as Standard},
452 translator::TwoCap,
453 };
454 use commonware_codec::{DecodeExt, FixedSize};
455 use commonware_cryptography::{sha256::Digest, Digest as _, Hasher as CHasher, Sha256};
456 use commonware_macros::test_traced;
457 use commonware_runtime::{
458 buffer::PoolRef,
459 deterministic::{self, Context},
460 Runner as _,
461 };
462 use commonware_utils::NZU64;
463 use rand::{
464 rngs::{OsRng, StdRng},
465 RngCore, SeedableRng,
466 };
467 use std::collections::{HashMap, HashSet};
468 use tracing::warn;
469
470 const SHA256_SIZE: usize = <Sha256 as CHasher>::Digest::SIZE;
471
472 const PAGE_SIZE: usize = 101;
474 const PAGE_CACHE_SIZE: usize = 11;
475
476 pub(crate) fn any_db_config(suffix: &str) -> Config<TwoCap> {
477 Config {
478 mmr_journal_partition: format!("journal_{suffix}"),
479 mmr_metadata_partition: format!("metadata_{suffix}"),
480 mmr_items_per_blob: NZU64!(11),
481 mmr_write_buffer: NZUsize!(1024),
482 log_journal_partition: format!("log_journal_{suffix}"),
483 log_items_per_blob: NZU64!(7),
484 log_write_buffer: NZUsize!(1024),
485 translator: TwoCap,
486 thread_pool: None,
487 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
488 }
489 }
490
491 pub(crate) type AnyTest = Any<deterministic::Context, Digest, Digest, Sha256, TwoCap>;
493
494 async fn open_db(context: deterministic::Context) -> AnyTest {
496 AnyTest::init(context, any_db_config("partition"))
497 .await
498 .unwrap()
499 }
500
501 pub(crate) fn create_test_config(seed: u64) -> Config<TwoCap> {
502 Config {
503 mmr_journal_partition: format!("mmr_journal_{seed}"),
504 mmr_metadata_partition: format!("mmr_metadata_{seed}"),
505 mmr_items_per_blob: NZU64!(13), mmr_write_buffer: NZUsize!(64),
507 log_journal_partition: format!("log_journal_{seed}"),
508 log_items_per_blob: NZU64!(11), log_write_buffer: NZUsize!(64),
510 translator: TwoCap,
511 thread_pool: None,
512 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
513 }
514 }
515
516 pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
518 let seed = context.next_u64();
519 let config = create_test_config(seed);
520 AnyTest::init(context, config).await.unwrap()
521 }
522
523 pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<Digest, Digest>> {
526 let mut rng = StdRng::seed_from_u64(1337);
527 let mut prev_key = Digest::random(&mut rng);
528 let mut ops = Vec::new();
529 for i in 0..n {
530 let key = Digest::random(&mut rng);
531 if i % 10 == 0 && i > 0 {
532 ops.push(Operation::Delete(prev_key));
533 } else {
534 let value = Digest::random(&mut rng);
535 ops.push(Operation::Update(key, value));
536 prev_key = key;
537 }
538 }
539 ops
540 }
541
542 pub(crate) async fn apply_ops(db: &mut AnyTest, ops: Vec<Operation<Digest, Digest>>) {
544 for op in ops {
545 match op {
546 Operation::Update(key, value) => {
547 db.update(key, value).await.unwrap();
548 }
549 Operation::Delete(key) => {
550 db.delete(key).await.unwrap();
551 }
552 Operation::CommitFloor(_) => {
553 db.commit().await.unwrap();
554 }
555 }
556 }
557 }
558
559 #[test_traced("INFO")]
560 fn test_any_fixed_db_empty() {
561 let executor = deterministic::Runner::default();
562 executor.start(|context| async move {
563 let mut db = open_db(context.clone()).await;
564 let mut hasher = Standard::<Sha256>::new();
565 assert_eq!(db.op_count(), 0);
566 assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
567 let empty_root = db.root(&mut hasher);
568 assert_eq!(empty_root, MemMmr::default().root(&mut hasher));
569
570 let d1 = Sha256::fill(1u8);
573 let d2 = Sha256::fill(2u8);
574 db.update(d1, d2).await.unwrap();
575 let mut db = open_db(context.clone()).await;
576 assert_eq!(db.op_count(), 0);
577 assert_eq!(db.root(&mut hasher), empty_root);
578
579 let empty_proof = Proof::default();
580 assert!(verify_proof(
581 &mut hasher,
582 &empty_proof,
583 Location::new_unchecked(0),
584 &[] as &[Operation<Digest, Digest>],
585 &empty_root
586 ));
587
588 db.commit().await.unwrap();
590 assert_eq!(db.op_count(), 1); let root = db.root(&mut hasher);
592 assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
593
594 let mut db = open_db(context.clone()).await;
596 assert_eq!(db.op_count(), 1);
597 assert_eq!(db.root(&mut hasher), root);
598
599 assert!(!verify_proof(
601 &mut hasher,
602 &empty_proof,
603 Location::new_unchecked(0),
604 &[] as &[Operation<Digest, Digest>],
605 &root
606 ));
607
608 db.update(d1, d2).await.unwrap();
611 for _ in 1..100 {
612 db.commit().await.unwrap();
613 assert!(db.op_count() - db.inactivity_floor_loc <= 3);
616 }
617
618 db.delete(d1).await.unwrap();
620 db.commit().await.unwrap();
621 assert!(db.is_empty());
622 assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);
623
624 db.destroy().await.unwrap();
625 });
626 }
627
628 #[test_traced("WARN")]
629 fn test_any_fixed_db_build_basic() {
630 let executor = deterministic::Runner::default();
631 executor.start(|context| async move {
632 let mut hasher = Standard::<Sha256>::new();
635 let mut db = open_db(context.clone()).await;
636
637 let d1 = Sha256::fill(1u8);
638 let d2 = Sha256::fill(2u8);
639
640 assert!(db.get(&d1).await.unwrap().is_none());
641 assert!(db.get(&d2).await.unwrap().is_none());
642
643 db.update(d1, d2).await.unwrap();
644 assert_eq!(db.get(&d1).await.unwrap().unwrap(), d2);
645 assert!(db.get(&d2).await.unwrap().is_none());
646
647 db.update(d2, d1).await.unwrap();
648 assert_eq!(db.get(&d1).await.unwrap().unwrap(), d2);
649 assert_eq!(db.get(&d2).await.unwrap().unwrap(), d1);
650
651 db.delete(d1).await.unwrap(); assert!(db.get(&d1).await.unwrap().is_none());
653 assert_eq!(db.get(&d2).await.unwrap().unwrap(), d1);
654
655 db.update(d1, d1).await.unwrap();
656 assert_eq!(db.get(&d1).await.unwrap().unwrap(), d1);
657
658 db.update(d2, d2).await.unwrap(); assert_eq!(db.get(&d2).await.unwrap().unwrap(), d2);
660
661 assert_eq!(db.log.size().await, 5); assert_eq!(db.snapshot.keys(), 2);
663 assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(0));
664 db.sync().await.unwrap();
665
666 let loc = db.inactivity_floor_loc;
669 db.inactivity_floor_loc = db.as_shared().raise_floor(loc).await.unwrap();
670 assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(4));
671 assert_eq!(db.log.size().await, 6); db.sync().await.unwrap();
673
674 db.delete(d1).await.unwrap();
676 db.delete(d2).await.unwrap();
677 assert!(db.get(&d1).await.unwrap().is_none());
678 assert!(db.get(&d2).await.unwrap().is_none());
679 assert_eq!(db.log.size().await, 8); db.commit().await.unwrap();
682 assert_eq!(db.inactivity_floor_loc, db.op_count() - 1);
685 let root = db.root(&mut hasher);
686
687 db.delete(d1).await.unwrap();
689 assert_eq!(db.log.size().await, 9); assert_eq!(db.root(&mut hasher), root);
691
692 let d3 = <Sha256 as CHasher>::Digest::decode(vec![2u8; SHA256_SIZE].as_ref()).unwrap();
694 assert!(db.delete(d3).await.unwrap().is_none());
695 assert_eq!(db.log.size().await, 9);
696 db.sync().await.unwrap();
697 assert_eq!(db.root(&mut hasher), root);
698
699 assert_eq!(db.log.size().await, 9);
701 let root = db.root(&mut hasher);
702 db.close().await.unwrap();
703 let mut db = open_db(context.clone()).await;
704 assert_eq!(db.log.size().await, 9);
705 assert_eq!(db.root(&mut hasher), root);
706
707 db.update(d1, d1).await.unwrap();
709 db.update(d2, d2).await.unwrap();
710 db.delete(d1).await.unwrap();
711 db.update(d2, d1).await.unwrap();
712 db.update(d1, d2).await.unwrap();
713 assert_eq!(db.snapshot.keys(), 2);
714
715 db.commit().await.unwrap();
717 let root = db.root(&mut hasher);
718 db.close().await.unwrap();
719 let mut db = open_db(context).await;
720 assert_eq!(db.root(&mut hasher), root);
721 assert_eq!(db.snapshot.keys(), 2);
722
723 db.commit().await.unwrap();
726
727 assert!(db.root(&mut hasher) != root);
728
729 let root = db.root(&mut hasher);
731 db.prune(db.inactivity_floor_loc()).await.unwrap();
732 assert_eq!(db.snapshot.keys(), 2);
733 assert_eq!(db.root(&mut hasher), root);
734
735 db.destroy().await.unwrap();
736 });
737 }
738
739 #[test_traced("WARN")]
740 fn test_any_fixed_db_build_and_authenticate() {
741 let executor = deterministic::Runner::default();
742 const ELEMENTS: u64 = 1000;
745 executor.start(|context| async move {
746 let mut hasher = Standard::<Sha256>::new();
747 let mut db = open_db(context.clone()).await;
748
749 let mut map = HashMap::<Digest, Digest>::default();
750 for i in 0u64..ELEMENTS {
751 let k = Sha256::hash(&i.to_be_bytes());
752 let v = Sha256::hash(&(i * 1000).to_be_bytes());
753 db.update(k, v).await.unwrap();
754 map.insert(k, v);
755 }
756
757 for i in 0u64..ELEMENTS {
759 if i % 3 != 0 {
760 continue;
761 }
762 let k = Sha256::hash(&i.to_be_bytes());
763 let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
764 db.update(k, v).await.unwrap();
765 map.insert(k, v);
766 }
767
768 for i in 0u64..ELEMENTS {
770 if i % 7 != 1 {
771 continue;
772 }
773 let k = Sha256::hash(&i.to_be_bytes());
774 db.delete(k).await.unwrap();
775 map.remove(&k);
776 }
777
778 assert_eq!(db.op_count(), 1477);
779 assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(0));
780 assert_eq!(db.log.size().await, 1477);
781 assert_eq!(db.snapshot.items(), 857);
782
783 db.commit().await.unwrap();
785 db.sync().await.unwrap();
786 db.prune(db.inactivity_floor_loc()).await.unwrap();
787 assert_eq!(db.op_count(), 1956);
788 assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(837));
789 assert_eq!(db.snapshot.items(), 857);
790
791 let root = db.root(&mut hasher);
793 db.close().await.unwrap();
794 let mut db = open_db(context.clone()).await;
795 assert_eq!(root, db.root(&mut hasher));
796 assert_eq!(db.op_count(), 1956);
797 assert_eq!(db.inactivity_floor_loc, Location::new_unchecked(837));
798 assert_eq!(db.snapshot.items(), 857);
799
800 for i in 0u64..1000 {
802 let k = Sha256::hash(&i.to_be_bytes());
803 if let Some(map_value) = map.get(&k) {
804 let Some(db_value) = db.get(&k).await.unwrap() else {
805 panic!("key not found in db: {k}");
806 };
807 assert_eq!(*map_value, db_value);
808 } else {
809 assert!(db.get(&k).await.unwrap().is_none());
810 }
811 }
812
813 let max_ops = NZU64!(4);
816 let end_loc = db.op_count();
817 let start_pos = db.mmr.pruned_to_pos();
818 let start_loc = Location::try_from(start_pos).unwrap();
819 db.commit().await.unwrap();
822 let root = db.root(&mut hasher);
823 assert!(start_loc < db.inactivity_floor_loc);
824
825 for loc in *start_loc..*end_loc {
826 let loc = Location::new_unchecked(loc);
827 let (proof, log) = db.proof(loc, max_ops).await.unwrap();
828 assert!(verify_proof(&mut hasher, &proof, loc, &log, &root));
829 }
830
831 db.destroy().await.unwrap();
832 });
833 }
834
835 #[test_traced("WARN")]
838 fn test_any_fixed_non_empty_db_recovery() {
839 let executor = deterministic::Runner::default();
840 executor.start(|context| async move {
841 let mut hasher = Standard::<Sha256>::new();
842 let mut db = open_db(context.clone()).await;
843
844 const ELEMENTS: u64 = 1000;
846 for i in 0u64..ELEMENTS {
847 let k = Sha256::hash(&i.to_be_bytes());
848 let v = Sha256::hash(&(i * 1000).to_be_bytes());
849 db.update(k, v).await.unwrap();
850 }
851 db.commit().await.unwrap();
852 db.prune(db.inactivity_floor_loc()).await.unwrap();
853 let root = db.root(&mut hasher);
854 let op_count = db.op_count();
855 let inactivity_floor_loc = db.inactivity_floor_loc();
856
857 let mut db = open_db(context.clone()).await;
859 assert_eq!(db.op_count(), op_count);
860 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
861 assert_eq!(db.root(&mut hasher), root);
862
863 async fn apply_more_ops(db: &mut AnyTest) {
864 for i in 0u64..ELEMENTS {
865 let k = Sha256::hash(&i.to_be_bytes());
866 let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
867 db.update(k, v).await.unwrap();
868 }
869 }
870
871 apply_more_ops(&mut db).await;
873 db.simulate_failure(false, false, 0).await.unwrap();
874 let mut db = open_db(context.clone()).await;
875 assert_eq!(db.op_count(), op_count);
876 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
877 assert_eq!(db.root(&mut hasher), root);
878
879 apply_more_ops(&mut db).await;
881 db.simulate_failure(true, false, 10).await.unwrap();
882 let mut db = open_db(context.clone()).await;
883 assert_eq!(db.op_count(), op_count);
884 assert_eq!(db.root(&mut hasher), root);
885
886 apply_more_ops(&mut db).await;
888 db.simulate_failure(false, true, 0).await.unwrap();
889 let mut db = open_db(context.clone()).await;
890 assert_eq!(db.op_count(), op_count);
891 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
892 assert_eq!(db.root(&mut hasher), root);
893
894 apply_more_ops(&mut db).await;
896 apply_more_ops(&mut db).await;
897 apply_more_ops(&mut db).await;
898 let mut db = open_db(context.clone()).await;
899 assert_eq!(db.op_count(), op_count);
900 assert_eq!(db.root(&mut hasher), root);
901
902 apply_more_ops(&mut db).await;
904 db.commit().await.unwrap();
905 let db = open_db(context.clone()).await;
906 assert!(db.op_count() > op_count);
907 assert_ne!(db.inactivity_floor_loc(), inactivity_floor_loc);
908 assert_ne!(db.root(&mut hasher), root);
909
910 db.destroy().await.unwrap();
911 });
912 }
913
914 #[test_traced("WARN")]
917 fn test_any_fixed_empty_db_recovery() {
918 let executor = deterministic::Runner::default();
919 executor.start(|context| async move {
920 let mut hasher = Standard::<Sha256>::new();
922 let db = open_db(context.clone()).await;
923 let root = db.root(&mut hasher);
924
925 let mut db = open_db(context.clone()).await;
927 assert_eq!(db.op_count(), 0);
928 assert_eq!(db.root(&mut hasher), root);
929
930 async fn apply_ops(db: &mut AnyTest) {
931 for i in 0u64..1000 {
932 let k = Sha256::hash(&i.to_be_bytes());
933 let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
934 db.update(k, v).await.unwrap();
935 }
936 }
937
938 apply_ops(&mut db).await;
941 db.simulate_failure(false, false, 1).await.unwrap();
942 let mut db = open_db(context.clone()).await;
943 assert_eq!(db.op_count(), 0);
944 assert_eq!(db.root(&mut hasher), root);
945
946 apply_ops(&mut db).await;
948 db.simulate_failure(true, false, 0).await.unwrap();
949 let mut db = open_db(context.clone()).await;
950 assert_eq!(db.op_count(), 0);
951 assert_eq!(db.root(&mut hasher), root);
952
953 apply_ops(&mut db).await;
955 db.simulate_failure(false, true, 0).await.unwrap();
956 let mut db = open_db(context.clone()).await;
957 assert_eq!(db.op_count(), 0);
958 assert_eq!(db.root(&mut hasher), root);
959
960 apply_ops(&mut db).await;
962 apply_ops(&mut db).await;
963 apply_ops(&mut db).await;
964 let mut db = open_db(context.clone()).await;
965 assert_eq!(db.op_count(), 0);
966 assert_eq!(db.root(&mut hasher), root);
967
968 apply_ops(&mut db).await;
970 db.commit().await.unwrap();
971 let db = open_db(context.clone()).await;
972 assert!(db.op_count() > 0);
973 assert_ne!(db.root(&mut hasher), root);
974
975 db.destroy().await.unwrap();
976 });
977 }
978
979 #[test_traced("WARN")]
982 fn test_any_fixed_db_log_replay() {
983 let executor = deterministic::Runner::default();
984 executor.start(|context| async move {
985 let mut hasher = Standard::<Sha256>::new();
986 let mut db = open_db(context.clone()).await;
987
988 const UPDATES: u64 = 100;
990 let k = Sha256::hash(&UPDATES.to_be_bytes());
991 for i in 0u64..UPDATES {
992 let v = Sha256::hash(&(i * 1000).to_be_bytes());
993 db.update(k, v).await.unwrap();
994 }
995 db.commit().await.unwrap();
996 let root = db.root(&mut hasher);
997 db.close().await.unwrap();
998
999 let db = open_db(context.clone()).await;
1001 let iter = db.snapshot.get(&k);
1002 assert_eq!(iter.cloned().collect::<Vec<_>>().len(), 1);
1003 assert_eq!(db.root(&mut hasher), root);
1004
1005 db.destroy().await.unwrap();
1006 });
1007 }
1008
1009 #[test_traced("WARN")]
1010 fn test_any_fixed_db_multiple_commits_delete_gets_replayed() {
1011 let executor = deterministic::Runner::default();
1012 executor.start(|context| async move {
1013 let mut hasher = Standard::<Sha256>::new();
1014 let mut db = open_db(context.clone()).await;
1015
1016 let mut map = HashMap::<Digest, Digest>::default();
1017 const ELEMENTS: u64 = 10;
1018 for j in 0u64..ELEMENTS {
1020 for i in 0u64..ELEMENTS {
1021 let k = Sha256::hash(&(j * 1000 + i).to_be_bytes());
1022 let v = Sha256::hash(&(i * 1000).to_be_bytes());
1023 db.update(k, v).await.unwrap();
1024 map.insert(k, v);
1025 }
1026 db.commit().await.unwrap();
1027 }
1028 let k = Sha256::hash(&((ELEMENTS - 1) * 1000 + (ELEMENTS - 1)).to_be_bytes());
1029
1030 db.delete(k).await.unwrap();
1033 db.commit().await.unwrap();
1034 assert!(db.get(&k).await.unwrap().is_none());
1035
1036 let root = db.root(&mut hasher);
1038 db.close().await.unwrap();
1039 let db = open_db(context.clone()).await;
1040 assert_eq!(root, db.root(&mut hasher));
1041 assert!(db.get(&k).await.unwrap().is_none());
1042
1043 db.destroy().await.unwrap();
1044 });
1045 }
1046
1047 #[test_traced("WARN")]
1050 fn test_any_fixed_db_build_snapshot_with_bitmap() {
1051 const ELEMENTS: u64 = 1000;
1053
1054 let rng_seed = OsRng.next_u64();
1056 warn!("rng_seed={}", rng_seed);
1058 let mut rng = StdRng::seed_from_u64(rng_seed);
1059
1060 let executor = deterministic::Runner::default();
1061 executor.start(|context| async move {
1062 let mut hasher = Standard::<Sha256>::new();
1063 let mut db = open_db(context.clone()).await;
1064
1065 for i in 0u64..ELEMENTS {
1066 let k = Sha256::hash(&i.to_be_bytes());
1067 let v = Sha256::hash(&rng.next_u32().to_be_bytes());
1068 db.update(k, v).await.unwrap();
1069 }
1070
1071 for _ in 0u64..ELEMENTS * 10 {
1074 let rand_key = Sha256::hash(&(rng.next_u64() % ELEMENTS).to_be_bytes());
1075 if rng.next_u32() % 7 == 0 {
1076 db.delete(rand_key).await.unwrap();
1077 continue;
1078 }
1079 let v = Sha256::hash(&rng.next_u32().to_be_bytes());
1080 db.update(rand_key, v).await.unwrap();
1081 if rng.next_u32() % 20 == 0 {
1082 db.commit().await.unwrap();
1084 }
1085 }
1086 db.commit().await.unwrap();
1087
1088 let root = db.root(&mut hasher);
1089 let inactivity_floor_loc = db.inactivity_floor_loc;
1090
1091 db.close().await.unwrap();
1093 let mut bitmap = BitMap::<_, SHA256_SIZE>::new();
1095 for _ in 0..*inactivity_floor_loc {
1096 bitmap.push(false);
1097 }
1098 bitmap.merkleize(&mut hasher).await.unwrap();
1099
1100 let cfg = any_db_config("partition");
1102 let (inactivity_floor_loc, mmr, log) =
1103 init_mmr_and_log(context.clone(), cfg, &mut hasher)
1104 .await
1105 .unwrap();
1106
1107 let mut snapshot = Index::init(context.with_label("snapshot"), TwoCap);
1109 AnyTest::build_snapshot_from_log(
1110 inactivity_floor_loc,
1111 &log,
1112 &mut snapshot,
1113 |append, loc| {
1114 bitmap.push(append);
1115 if let Some(loc) = loc {
1116 bitmap.set_bit(*loc, false);
1117 }
1118 },
1119 )
1120 .await
1121 .unwrap();
1122
1123 let db = AnyTest {
1125 mmr,
1126 log,
1127 snapshot,
1128 inactivity_floor_loc,
1129 steps: 0,
1130 hasher: Standard::<Sha256>::new(),
1131 };
1132 assert_eq!(db.root(&mut hasher), root);
1133
1134 let items = db.log.size().await;
1136 assert_eq!(bitmap.len(), items);
1137 let mut active_positions = HashSet::new();
1138 for pos in *db.inactivity_floor_loc..items {
1140 let item = db.log.read(pos).await.unwrap();
1141 let Some(item_key) = item.key() else {
1142 continue;
1144 };
1145 let iter = db.snapshot.get(item_key);
1146 for loc in iter {
1147 if *loc == pos {
1148 active_positions.insert(pos);
1150 assert!(bitmap.get_bit(pos));
1151 break;
1152 }
1153 }
1154 }
1155 for pos in *db.inactivity_floor_loc..items - 1 {
1157 assert_eq!(bitmap.get_bit(pos), active_positions.contains(&pos));
1158 }
1159 assert!(bitmap.get_bit(items - 1)); db.destroy().await.unwrap();
1162 });
1163 }
1164
/// Checks that a historical proof at the current size equals the regular proof, that it stays
/// the same after more operations are added, and that a size beyond the tip is rejected.
#[test]
fn test_any_fixed_db_historical_proof_basic() {
    deterministic::Runner::default().start(|context| async move {
        let mut db = create_test_db(context.clone()).await;
        let ops = create_test_ops(20);
        apply_ops(&mut db, ops.clone()).await;
        db.commit().await.unwrap();
        let mut hasher = Standard::<Sha256>::new();
        let root_hash = db.root(&mut hasher);
        let original_op_count = db.op_count();

        // Historical proof at the current size versus the regular proof for the same range.
        let max_ops = NZU64!(10);
        let start = Location::new_unchecked(5);
        let (hist_proof, hist_ops) = db
            .historical_proof(original_op_count, start, max_ops)
            .await
            .unwrap();
        let (regular_proof, regular_ops) = db.proof(start, max_ops).await.unwrap();

        assert_eq!(hist_proof.size, regular_proof.size);
        assert_eq!(hist_proof.digests, regular_proof.digests);
        assert_eq!(hist_ops, regular_ops);
        assert_eq!(hist_ops, ops[5..15]);
        assert!(verify_proof(
            &mut hasher,
            &hist_proof,
            Location::new_unchecked(5),
            &hist_ops,
            &root_hash
        ));

        // Grow the database.
        let more_ops = create_test_ops(5);
        apply_ops(&mut db, more_ops.clone()).await;
        db.commit().await.unwrap();

        // A historical proof at the old size is unchanged and still verifies against the old
        // root.
        let (hist_proof, hist_ops) = db
            .historical_proof(original_op_count, Location::new_unchecked(5), NZU64!(10))
            .await
            .unwrap();
        assert_eq!(
            hist_proof.size,
            Position::try_from(original_op_count).unwrap()
        );
        assert_eq!(hist_proof.size, regular_proof.size);
        assert_eq!(hist_ops.len(), 10);
        assert_eq!(hist_proof.digests, regular_proof.digests);
        assert_eq!(hist_ops, regular_ops);
        assert!(verify_proof(
            &mut hasher,
            &hist_proof,
            Location::new_unchecked(5),
            &hist_ops,
            &root_hash
        ));

        // Asking for a size past the current operation count is a range error.
        let beyond_tip = db
            .historical_proof(db.op_count() + 1, Location::new_unchecked(5), NZU64!(10))
            .await;
        assert!(matches!(
            beyond_tip,
            Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
        ));

        db.destroy().await.unwrap();
    });
}
1235
/// Edge cases for historical proofs: a historical size of one, a request larger than the
/// historical size allows, and the smallest multi-operation history.
#[test]
fn test_any_fixed_db_historical_proof_edge_cases() {
    deterministic::Runner::default().start(|context| async move {
        let mut db = create_test_db(context.clone()).await;
        let ops = create_test_ops(50);
        apply_ops(&mut db, ops.clone()).await;
        db.commit().await.unwrap();

        let mut hasher = Standard::<Sha256>::new();

        // Historical size 1: a single operation.
        let one = Location::new_unchecked(1);
        let origin = Location::new_unchecked(0);
        let (single_proof, single_ops) = db
            .historical_proof(one, origin, NZU64!(1))
            .await
            .unwrap();
        assert_eq!(
            single_proof.size,
            Position::try_from(Location::new_unchecked(1)).unwrap()
        );
        assert_eq!(single_ops.len(), 1);

        // A separate db holding only the first operation provides the root to verify against.
        let mut single_db = create_test_db(context.clone()).await;
        apply_ops(&mut single_db, ops[0..1].to_vec()).await;
        single_db.sync().await.unwrap();
        let single_root = single_db.root(&mut hasher);

        assert!(verify_proof(
            &mut hasher,
            &single_proof,
            Location::new_unchecked(0),
            &single_ops,
            &single_root
        ));

        // Requesting 20 operations from location 5 at historical size 10 yields only 5.
        let (_limited_proof, limited_ops) = db
            .historical_proof(
                Location::new_unchecked(10),
                Location::new_unchecked(5),
                NZU64!(20),
            )
            .await
            .unwrap();
        assert_eq!(limited_ops.len(), 5);
        assert_eq!(limited_ops, ops[5..10]);

        // Historical size 3, requesting all three operations from the start.
        let three = Location::new_unchecked(3);
        let (min_proof, min_ops) = db
            .historical_proof(three, Location::new_unchecked(0), NZU64!(3))
            .await
            .unwrap();
        assert_eq!(
            min_proof.size,
            Position::try_from(Location::new_unchecked(3)).unwrap()
        );
        assert_eq!(min_ops.len(), 3);
        assert_eq!(min_ops, ops[0..3]);

        single_db.destroy().await.unwrap();
        db.destroy().await.unwrap();
    });
}
1309
/// For each historical size in `31..50`, checks that a historical proof generated by the full
/// database matches (size, digests, operations) the regular proof generated by a reference
/// database that only ever held that many operations, and that it verifies against the
/// reference database's root.
#[test]
fn test_any_fixed_db_historical_proof_different_historical_sizes() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // Populate and commit the full database with 100 operations.
        let mut db = create_test_db(context.clone()).await;
        let all_ops = create_test_ops(100);
        apply_ops(&mut db, all_ops.clone()).await;
        db.commit().await.unwrap();

        let mut hasher = Standard::<Sha256>::new();

        let range_start = Location::new_unchecked(20);
        let batch_limit = NZU64!(10);
        for raw_size in 31..50 {
            let historical_size = Location::new_unchecked(raw_size);

            // Proof over the full database, as of `historical_size` operations.
            let (hist_proof, hist_ops) = db
                .historical_proof(historical_size, range_start, batch_limit)
                .await
                .unwrap();
            assert_eq!(
                hist_proof.size,
                Position::try_from(historical_size).unwrap()
            );

            // Reference database containing only the first `historical_size` operations.
            let prefix_len = *historical_size as usize;
            let mut reference_db = create_test_db(context.clone()).await;
            apply_ops(&mut reference_db, all_ops[..prefix_len].to_vec()).await;
            reference_db.sync().await.unwrap();

            // The reference database's regular proof must agree with the historical one.
            let (ref_proof, ref_ops) = reference_db
                .proof(range_start, batch_limit)
                .await
                .unwrap();
            assert_eq!(ref_proof.size, hist_proof.size);
            assert_eq!(ref_ops, hist_ops);
            assert_eq!(ref_proof.digests, hist_proof.digests);

            // The returned range ends at whichever comes first: the batch limit or the
            // historical size.
            let range_end = range_start
                .checked_add(batch_limit.get())
                .unwrap()
                .min(historical_size);
            assert_eq!(
                ref_ops,
                all_ops[*range_start as usize..*range_end as usize]
            );

            // The historical proof must verify against the reference root.
            let reference_root = reference_db.root(&mut hasher);
            let verified = verify_proof(
                &mut hasher,
                &hist_proof,
                range_start,
                &hist_ops,
                &reference_root,
            );
            assert!(verified);

            reference_db.destroy().await.unwrap();
        }

        db.destroy().await.unwrap();
    });
}
1362
/// Checks that `verify_proof` rejects every tampered variant of a historical proof.
///
/// The proof covers locations `1..5` of the log as of a historical size of 5 operations. The
/// matching historical root is taken from a reference database that holds only those first 5
/// operations. The untampered proof is first shown to verify against that root; each negative
/// case then alters exactly one input (digests, operations, start location, root or size), so
/// a rejection can only be attributed to that alteration.
#[test]
fn test_any_fixed_db_historical_proof_invalid() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = create_test_db(context.clone()).await;
        let ops = create_test_ops(10);
        apply_ops(&mut db, ops.clone()).await;
        db.commit().await.unwrap();

        let historical_op_count = Location::new_unchecked(5);
        let historical_mmr_size = Position::try_from(historical_op_count).unwrap();
        let start_loc = Location::new_unchecked(1);
        let (proof, proof_ops) = db
            .historical_proof(historical_op_count, start_loc, NZU64!(10))
            .await
            .unwrap();
        assert_eq!(proof.size, historical_mmr_size);
        assert_eq!(proof_ops.len(), 4);
        assert_eq!(proof_ops, ops[1..5]);

        let mut hasher = Standard::<Sha256>::new();

        // Root of the database as it stood at `historical_op_count` operations, obtained from a
        // reference database that only ever held that prefix of the operations.
        let mut ref_db = create_test_db(context.clone()).await;
        apply_ops(&mut ref_db, ops[0..*historical_op_count as usize].to_vec()).await;
        ref_db.sync().await.unwrap();
        let historical_root = ref_db.root(&mut hasher);

        // Baseline: the untampered proof must verify, otherwise the negative cases below would
        // pass vacuously.
        assert!(verify_proof(
            &mut hasher,
            &proof,
            start_loc,
            &proof_ops,
            &historical_root
        ));

        // Changing a proof digest should cause verification to fail.
        {
            let mut proof = proof.clone();
            proof.digests[0] = Sha256::hash(b"invalid");
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                start_loc,
                &proof_ops,
                &historical_root
            ));
        }
        // Appending an extra proof digest should cause verification to fail.
        {
            let mut proof = proof.clone();
            proof.digests.push(Sha256::hash(b"invalid"));
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                start_loc,
                &proof_ops,
                &historical_root
            ));
        }

        // Replacing an operation should cause verification to fail.
        {
            let mut proof_ops = proof_ops.clone();
            proof_ops[0] = Operation::Update(Sha256::hash(b"key1"), Sha256::hash(b"value1"));
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                start_loc,
                &proof_ops,
                &historical_root
            ));
        }
        // Appending an extra operation should cause verification to fail.
        {
            let mut proof_ops = proof_ops.clone();
            proof_ops.push(Operation::Update(
                Sha256::hash(b"key1"),
                Sha256::hash(b"value1"),
            ));
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                start_loc,
                &proof_ops,
                &historical_root
            ));
        }

        // Changing the start location should cause verification to fail.
        {
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                Location::new_unchecked(0),
                &proof_ops,
                &historical_root
            ));
        }

        // Changing the root digest should cause verification to fail.
        {
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                start_loc,
                &proof_ops,
                &Sha256::hash(b"invalid")
            ));
        }

        // A historical proof must not verify against the current (non-historical) root.
        {
            let current_root = db.root(&mut hasher);
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                start_loc,
                &proof_ops,
                &current_root
            ));
        }

        // Changing the proof size should cause verification to fail.
        {
            let mut proof = proof.clone();
            proof.size = Position::new(100);
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                start_loc,
                &proof_ops,
                &historical_root
            ));
        }

        ref_db.destroy().await.unwrap();
        db.destroy().await.unwrap();
    });
}
1478}