1use crate::{
10 bitmap::{CleanBitMap, DirtyBitMap},
11 mmr::{
12 mem::{Clean, Dirty, State},
13 Location, Proof, StandardHasher,
14 },
15 qmdb::{
16 any::{
17 unordered::{
18 fixed::{Db as AnyDb, Operation},
19 Update,
20 },
21 CleanAny, DirtyAny, FixedValue,
22 },
23 current::{
24 merkleize_grafted_bitmap,
25 proof::{OperationProof, RangeProof},
26 root, FixedConfig as Config,
27 },
28 store::{Batchable, CleanStore, DirtyStore, LogStore},
29 Error,
30 },
31 translator::Translator,
32 AuthenticatedBitMap as BitMap,
33};
34use commonware_codec::FixedSize;
35use commonware_cryptography::{DigestOf, Hasher};
36use commonware_runtime::{Clock, Metrics, Storage as RStorage};
37use commonware_utils::Array;
38use core::ops::Range;
39use std::num::NonZeroU64;
40
41pub type KeyValueProof<D, const N: usize> = OperationProof<D, N>;
43
44pub struct Db<
51 E: RStorage + Clock + Metrics,
52 K: Array,
53 V: FixedValue,
54 H: Hasher,
55 T: Translator,
56 const N: usize,
57 S: State<DigestOf<H>> = Clean<DigestOf<H>>,
58> {
59 any: AnyDb<E, K, V, H, T, S>,
62
63 status: BitMap<H::Digest, N, S>,
66
67 context: E,
68
69 bitmap_metadata_partition: String,
70
71 cached_root: Option<H::Digest>,
73}
74
75impl<
76 E: RStorage + Clock + Metrics,
77 K: Array,
78 V: FixedValue,
79 H: Hasher,
80 T: Translator,
81 const N: usize,
82 S: State<DigestOf<H>>,
83 > Db<E, K, V, H, T, N, S>
84{
85 pub fn op_count(&self) -> Location {
88 self.any.op_count()
89 }
90
91 pub const fn inactivity_floor_loc(&self) -> Location {
94 self.any.inactivity_floor_loc()
95 }
96
97 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
99 self.any.get(key).await
100 }
101
102 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
104 self.any.get_metadata().await
105 }
106
107 const fn grafting_height() -> u32 {
112 CleanBitMap::<H::Digest, N>::CHUNK_SIZE_BITS.trailing_zeros()
113 }
114
115 pub fn verify_key_value_proof(
118 hasher: &mut H,
119 key: K,
120 value: V,
121 proof: &KeyValueProof<H::Digest, N>,
122 root: &H::Digest,
123 ) -> bool {
124 let op = Operation::Update(Update(key, value));
125
126 proof.verify(hasher, Self::grafting_height(), op, root)
127 }
128
129 pub fn verify_range_proof(
132 hasher: &mut H,
133 proof: &RangeProof<H::Digest>,
134 start_loc: Location,
135 ops: &[Operation<K, V>],
136 chunks: &[[u8; N]],
137 root: &H::Digest,
138 ) -> bool {
139 let height = Self::grafting_height();
140
141 proof.verify(hasher, height, start_loc, ops, chunks, root)
142 }
143}
144
145impl<
146 E: RStorage + Clock + Metrics,
147 K: Array,
148 V: FixedValue,
149 H: Hasher,
150 T: Translator,
151 const N: usize,
152 > Db<E, K, V, H, T, N>
153{
154 pub async fn init(context: E, config: Config<T>) -> Result<Self, Error> {
157 const {
159 assert!(
163 N.is_multiple_of(H::Digest::SIZE),
164 "chunk size must be some multiple of the digest size",
165 );
166 assert!(N.is_power_of_two(), "chunk size must be a power of 2");
169 }
170
171 let thread_pool = config.thread_pool.clone();
172 let bitmap_metadata_partition = config.bitmap_metadata_partition.clone();
173
174 let mut hasher = StandardHasher::<H>::new();
175 let mut status = CleanBitMap::restore_pruned(
176 context.with_label("bitmap"),
177 &bitmap_metadata_partition,
178 thread_pool,
179 &mut hasher,
180 )
181 .await?
182 .into_dirty();
183
184 let last_known_inactivity_floor = Location::new_unchecked(status.len());
186 let any = AnyDb::init_with_callback(
187 context.with_label("any"),
188 config.to_any_config(),
189 Some(last_known_inactivity_floor),
190 |append: bool, loc: Option<Location>| {
191 status.push(append);
192 if let Some(loc) = loc {
193 status.set_bit(*loc, false);
194 }
195 },
196 )
197 .await?;
198
199 let height = Self::grafting_height();
200 let status = merkleize_grafted_bitmap(&mut hasher, status, &any.log.mmr, height).await?;
201
202 let cached_root = Some(root(&mut hasher, height, &status, &any.log.mmr).await?);
204
205 Ok(Self {
206 any,
207 status,
208 context,
209 bitmap_metadata_partition,
210 cached_root,
211 })
212 }
213
214 pub const fn root(&self) -> H::Digest {
216 self.cached_root.expect("Clean state must have cached root")
217 }
218
219 pub async fn range_proof(
229 &self,
230 hasher: &mut H,
231 start_loc: Location,
232 max_ops: NonZeroU64,
233 ) -> Result<(RangeProof<H::Digest>, Vec<Operation<K, V>>, Vec<[u8; N]>), Error> {
234 RangeProof::<H::Digest>::new_with_ops(
235 hasher,
236 &self.status,
237 Self::grafting_height(),
238 &self.any.log.mmr,
239 &self.any.log,
240 start_loc,
241 max_ops,
242 )
243 .await
244 }
245
246 pub async fn key_value_proof(
254 &self,
255 hasher: &mut H,
256 key: K,
257 ) -> Result<KeyValueProof<H::Digest, N>, Error> {
258 let op_loc = self.any.get_with_loc(&key).await?;
259 let Some((_, loc)) = op_loc else {
260 return Err(Error::KeyNotFound);
261 };
262 let height = Self::grafting_height();
263 let mmr = &self.any.log.mmr;
264
265 OperationProof::<H::Digest, N>::new(hasher, &self.status, height, mmr, loc).await
266 }
267
268 #[cfg(test)]
269 fn simulate_commit_failure_before_any_writes(self) {
272 }
274
275 #[cfg(test)]
276 async fn simulate_commit_failure_after_any_db_commit(mut self) -> Result<(), Error> {
279 let _ = self.commit_to_log(None).await?;
281 Ok(())
282 }
283
284 async fn commit_to_log(
290 &mut self,
291 metadata: Option<V>,
292 ) -> Result<DirtyBitMap<H::Digest, N>, Error> {
293 let empty_status = CleanBitMap::<H::Digest, N>::new(&mut self.any.log.hasher, None);
294 let mut status = std::mem::replace(&mut self.status, empty_status).into_dirty();
295
296 status.set_bit(*self.any.last_commit_loc, false);
298
299 let inactivity_floor_loc = self.any.raise_floor_with_bitmap(&mut status).await?;
302
303 status.push(true);
305 let commit_op = Operation::CommitFloor(metadata, inactivity_floor_loc);
306
307 self.any.apply_commit_op(commit_op).await?;
308
309 Ok(status)
310 }
311
312 pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
316 let start_loc = self.any.last_commit_loc + 1;
317
318 let status = self.commit_to_log(metadata).await?;
320
321 let mmr = &self.any.log.mmr;
323 let height = Self::grafting_height();
324 self.status =
325 merkleize_grafted_bitmap(&mut self.any.log.hasher, status, mmr, height).await?;
326
327 self.status.prune_to_bit(*self.any.inactivity_floor_loc())?;
329
330 self.cached_root = Some(root(&mut self.any.log.hasher, height, &self.status, mmr).await?);
332
333 Ok(start_loc..self.op_count())
334 }
335
336 pub async fn sync(&mut self) -> Result<(), Error> {
338 self.any.sync().await?;
339
340 self.status
343 .write_pruned(
344 self.context.with_label("bitmap"),
345 &self.bitmap_metadata_partition,
346 )
347 .await
348 .map_err(Into::into)
349 }
350
351 pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
354 self.status
358 .write_pruned(
359 self.context.with_label("bitmap"),
360 &self.bitmap_metadata_partition,
361 )
362 .await?;
363
364 self.any.prune(prune_loc).await
365 }
366
367 pub async fn close(self) -> Result<(), Error> {
370 self.any.close().await
371 }
372
373 pub async fn destroy(self) -> Result<(), Error> {
375 CleanBitMap::<H::Digest, N>::destroy(self.context, &self.bitmap_metadata_partition).await?;
377
378 self.any.destroy().await
380 }
381
382 pub fn into_dirty(self) -> Db<E, K, V, H, T, N, Dirty> {
384 Db {
385 any: self.any.into_dirty(),
386 status: self.status.into_dirty(),
387 context: self.context,
388 bitmap_metadata_partition: self.bitmap_metadata_partition,
389 cached_root: None,
390 }
391 }
392}
393
394impl<
395 E: RStorage + Clock + Metrics,
396 K: Array,
397 V: FixedValue,
398 H: Hasher,
399 T: Translator,
400 const N: usize,
401 > Db<E, K, V, H, T, N, Dirty>
402{
403 pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
406 if let Some(old_loc) = self.any.update_key(key, value).await? {
407 self.status.set_bit(*old_loc, false);
408 }
409 self.status.push(true);
410
411 Ok(())
412 }
413
414 pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
418 if !self.any.create(key, value).await? {
419 return Ok(false);
420 }
421 self.status.push(true);
422
423 Ok(true)
424 }
425
426 pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
430 let Some(loc) = self.any.delete_key(key).await? else {
431 return Ok(false);
432 };
433
434 self.status.push(false);
435 self.status.set_bit(*loc, false);
436
437 Ok(true)
438 }
439
440 pub async fn merkleize(self) -> Result<Db<E, K, V, H, T, N, Clean<DigestOf<H>>>, Error> {
444 let clean_any = self.any.merkleize();
446
447 let mut hasher = StandardHasher::<H>::new();
449 let height = Self::grafting_height();
450 let status =
451 merkleize_grafted_bitmap(&mut hasher, self.status, &clean_any.log.mmr, height).await?;
452
453 let cached_root = Some(root(&mut hasher, height, &status, &clean_any.log.mmr).await?);
455
456 Ok(Db {
457 any: clean_any,
458 status,
459 context: self.context,
460 bitmap_metadata_partition: self.bitmap_metadata_partition,
461 cached_root,
462 })
463 }
464}
465
466impl<
467 E: RStorage + Clock + Metrics,
468 K: Array,
469 V: FixedValue,
470 H: Hasher,
471 T: Translator,
472 const N: usize,
473 > crate::qmdb::store::LogStorePrunable for Db<E, K, V, H, T, N>
474{
475 async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
476 self.prune(prune_loc).await
477 }
478}
479
480impl<
481 E: RStorage + Clock + Metrics,
482 K: Array,
483 V: FixedValue,
484 H: Hasher,
485 T: Translator,
486 const N: usize,
487 S: State<DigestOf<H>>,
488 > LogStore for Db<E, K, V, H, T, N, S>
489{
490 type Value = V;
491
492 fn op_count(&self) -> Location {
493 self.op_count()
494 }
495
496 fn inactivity_floor_loc(&self) -> Location {
497 self.inactivity_floor_loc()
498 }
499
500 async fn get_metadata(&self) -> Result<Option<V>, Error> {
501 self.get_metadata().await
502 }
503
504 fn is_empty(&self) -> bool {
505 self.any.is_empty()
506 }
507}
508
509impl<
510 E: RStorage + Clock + Metrics,
511 K: Array,
512 V: FixedValue,
513 H: Hasher,
514 T: Translator,
515 const N: usize,
516 S: State<DigestOf<H>>,
517 > crate::store::Store for Db<E, K, V, H, T, N, S>
518{
519 type Key = K;
520 type Value = V;
521 type Error = Error;
522
523 async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
524 self.get(key).await
525 }
526}
527
528impl<
529 E: RStorage + Clock + Metrics,
530 K: Array,
531 V: FixedValue,
532 H: Hasher,
533 T: Translator,
534 const N: usize,
535 > crate::store::StoreMut for Db<E, K, V, H, T, N, Dirty>
536{
537 async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
538 self.update(key, value).await
539 }
540}
541
542impl<
543 E: RStorage + Clock + Metrics,
544 K: Array,
545 V: FixedValue,
546 H: Hasher,
547 T: Translator,
548 const N: usize,
549 > crate::store::StoreDeletable for Db<E, K, V, H, T, N, Dirty>
550{
551 async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
552 self.delete(key).await
553 }
554}
555
556impl<
557 E: RStorage + Clock + Metrics,
558 K: Array,
559 V: FixedValue,
560 H: Hasher,
561 T: Translator,
562 const N: usize,
563 > CleanStore for Db<E, K, V, H, T, N, Clean<DigestOf<H>>>
564{
565 type Digest = H::Digest;
566 type Operation = Operation<K, V>;
567 type Dirty = Db<E, K, V, H, T, N, Dirty>;
568
569 fn root(&self) -> Self::Digest {
570 self.root()
571 }
572
573 async fn proof(
574 &self,
575 start_loc: Location,
576 max_ops: NonZeroU64,
577 ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
578 self.any.proof(start_loc, max_ops).await
579 }
580
581 async fn historical_proof(
582 &self,
583 historical_size: Location,
584 start_loc: Location,
585 max_ops: NonZeroU64,
586 ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
587 self.any
588 .historical_proof(historical_size, start_loc, max_ops)
589 .await
590 }
591
592 fn into_dirty(self) -> Self::Dirty {
593 self.into_dirty()
594 }
595}
596
597impl<E, K, V, T, H, const N: usize> Batchable for Db<E, K, V, H, T, N, Dirty>
598where
599 E: RStorage + Clock + Metrics,
600 K: Array,
601 V: FixedValue,
602 T: Translator,
603 H: Hasher,
604{
605 async fn write_batch(
606 &mut self,
607 iter: impl Iterator<Item = (K, Option<V>)>,
608 ) -> Result<(), Error> {
609 let status = &mut self.status;
610 self.any
611 .write_batch_with_callback(iter, move |append: bool, loc: Option<Location>| {
612 status.push(append);
613 if let Some(loc) = loc {
614 status.set_bit(*loc, false);
615 }
616 })
617 .await
618 }
619}
620
621impl<
622 E: RStorage + Clock + Metrics,
623 K: Array,
624 V: FixedValue,
625 H: Hasher,
626 T: Translator,
627 const N: usize,
628 > DirtyStore for Db<E, K, V, H, T, N, Dirty>
629{
630 type Digest = H::Digest;
631 type Operation = Operation<K, V>;
632 type Clean = Db<E, K, V, H, T, N, Clean<DigestOf<H>>>;
633
634 async fn merkleize(self) -> Result<Self::Clean, Error> {
635 self.merkleize().await
636 }
637}
638
639impl<
640 E: RStorage + Clock + Metrics,
641 K: Array,
642 V: FixedValue,
643 H: Hasher,
644 T: Translator,
645 const N: usize,
646 > CleanAny for Db<E, K, V, H, T, N, Clean<DigestOf<H>>>
647{
648 type Key = K;
649
650 async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Error> {
651 self.get(key).await
652 }
653
654 async fn commit(&mut self, metadata: Option<Self::Value>) -> Result<Range<Location>, Error> {
655 self.commit(metadata).await
656 }
657
658 async fn sync(&mut self) -> Result<(), Error> {
659 self.sync().await
660 }
661
662 async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
663 self.prune(prune_loc).await
664 }
665
666 async fn close(self) -> Result<(), Error> {
667 self.close().await
668 }
669
670 async fn destroy(self) -> Result<(), Error> {
671 self.destroy().await
672 }
673}
674
675impl<
676 E: RStorage + Clock + Metrics,
677 K: Array,
678 V: FixedValue,
679 H: Hasher,
680 T: Translator,
681 const N: usize,
682 > DirtyAny for Db<E, K, V, H, T, N, Dirty>
683{
684 type Key = K;
685
686 async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Error> {
687 self.get(key).await
688 }
689
690 async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Error> {
691 self.update(key, value).await
692 }
693
694 async fn create(&mut self, key: Self::Key, value: Self::Value) -> Result<bool, Error> {
695 self.create(key, value).await
696 }
697
698 async fn delete(&mut self, key: Self::Key) -> Result<bool, Error> {
699 self.delete(key).await
700 }
701}
702
703#[cfg(test)]
704pub mod test {
705 use super::*;
706 use crate::{
707 index::Unordered as _,
708 mmr::hasher::Hasher as _,
709 qmdb::{any::AnyExt, store::batch_tests},
710 translator::TwoCap,
711 };
712 use commonware_cryptography::{sha256::Digest, Sha256};
713 use commonware_macros::test_traced;
714 use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _};
715 use commonware_utils::{NZUsize, NZU64};
716 use rand::{rngs::StdRng, RngCore, SeedableRng};
717 use std::collections::HashMap;
718 use tracing::warn;
719
720 const PAGE_SIZE: usize = 88;
721 const PAGE_CACHE_SIZE: usize = 8;
722
723 fn current_db_config(partition_prefix: &str) -> Config<TwoCap> {
724 Config {
725 mmr_journal_partition: format!("{partition_prefix}_journal_partition"),
726 mmr_metadata_partition: format!("{partition_prefix}_metadata_partition"),
727 mmr_items_per_blob: NZU64!(11),
728 mmr_write_buffer: NZUsize!(1024),
729 log_journal_partition: format!("{partition_prefix}_partition_prefix"),
730 log_items_per_blob: NZU64!(7),
731 log_write_buffer: NZUsize!(1024),
732 bitmap_metadata_partition: format!("{partition_prefix}_bitmap_metadata_partition"),
733 translator: TwoCap,
734 thread_pool: None,
735 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
736 }
737 }
738
739 type CleanCurrentTest = Db<deterministic::Context, Digest, Digest, Sha256, TwoCap, 32>;
741
742 type DirtyCurrentTest = Db<deterministic::Context, Digest, Digest, Sha256, TwoCap, 32, Dirty>;
744
745 async fn open_db(context: deterministic::Context, partition_prefix: &str) -> CleanCurrentTest {
747 CleanCurrentTest::init(context, current_db_config(partition_prefix))
748 .await
749 .unwrap()
750 }
751
752 #[test_traced("DEBUG")]
754 pub fn test_current_db_build_small_close_reopen() {
755 let executor = deterministic::Runner::default();
756 executor.start(|context| async move {
757 let partition = "build_small";
758 let db = open_db(context.clone(), partition).await;
759 assert_eq!(db.op_count(), 1);
760 assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(0));
761 let root0 = db.root();
762 db.close().await.unwrap();
763 let db = open_db(context.clone(), partition).await;
764 assert_eq!(db.op_count(), 1);
765 assert!(db.get_metadata().await.unwrap().is_none());
766 assert_eq!(db.root(), root0);
767
768 let k1 = Sha256::hash(&0u64.to_be_bytes());
770 let v1 = Sha256::hash(&10u64.to_be_bytes());
771 let mut db = db.into_dirty();
772 assert!(db.create(k1, v1).await.unwrap());
773 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
774 let mut db = db.merkleize().await.unwrap();
775 let range = db.commit(None).await.unwrap();
776 assert_eq!(range.start, 1);
777 assert_eq!(range.end, 4);
778 assert!(db.get_metadata().await.unwrap().is_none());
779 assert_eq!(db.op_count(), 4); let root1 = db.root();
781 assert!(root1 != root0);
782 db.close().await.unwrap();
783 let db = open_db(context.clone(), partition).await;
784 assert_eq!(db.op_count(), 4); assert!(db.get_metadata().await.unwrap().is_none());
786 assert_eq!(db.root(), root1);
787
788 let mut db = db.into_dirty();
790 assert!(!db.create(k1, v1).await.unwrap());
791
792 assert!(db.delete(k1).await.unwrap());
794 let metadata = Sha256::hash(&1u64.to_be_bytes());
795 let mut db = db.merkleize().await.unwrap();
796 let range = db.commit(Some(metadata)).await.unwrap();
797 assert_eq!(range.start, 4);
798 assert_eq!(range.end, 6);
799
800 assert_eq!(db.op_count(), 6); assert_eq!(db.get_metadata().await.unwrap().unwrap(), metadata);
802 let root2 = db.root();
803
804 let mut db = db.into_dirty();
806 assert!(!db.delete(k1).await.unwrap());
807 let db = db.merkleize().await.unwrap();
808
809 db.close().await.unwrap();
811 let db = open_db(context.clone(), partition).await;
812 assert_eq!(db.op_count(), 6); assert_eq!(db.get_metadata().await.unwrap().unwrap(), metadata);
814 assert_eq!(db.root(), root2);
815
816 for i in 0..*db.op_count() - 1 {
818 assert!(!db.status.get_bit(i));
819 }
820 assert!(db.status.get_bit(*db.op_count() - 1));
821
822 db.destroy().await.unwrap();
823 });
824 }
825
826 #[test_traced("WARN")]
827 fn test_current_db_build_big() {
828 let executor = deterministic::Runner::default();
829 const ELEMENTS: u64 = 1000;
832 executor.start(|context| async move {
833 let mut db = open_db(context.clone(), "build_big").await.into_dirty();
834
835 let mut map = HashMap::<Digest, Digest>::default();
836 for i in 0u64..ELEMENTS {
837 let k = Sha256::hash(&i.to_be_bytes());
838 let v = Sha256::hash(&(i * 1000).to_be_bytes());
839 db.update(k, v).await.unwrap();
840 map.insert(k, v);
841 }
842
843 for i in 0u64..ELEMENTS {
845 if i % 3 != 0 {
846 continue;
847 }
848 let k = Sha256::hash(&i.to_be_bytes());
849 let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
850 db.update(k, v).await.unwrap();
851 map.insert(k, v);
852 }
853
854 for i in 0u64..ELEMENTS {
856 if i % 7 != 1 {
857 continue;
858 }
859 let k = Sha256::hash(&i.to_be_bytes());
860 db.delete(k).await.unwrap();
861 map.remove(&k);
862 }
863
864 assert_eq!(db.op_count(), 1478);
865 assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(0));
866 assert_eq!(db.op_count(), 1478);
867 assert_eq!(db.any.snapshot.items(), 857);
868
869 let mut db = db.merkleize().await.unwrap();
871 db.commit(None).await.unwrap();
872 db.sync().await.unwrap();
873 db.prune(db.inactivity_floor_loc()).await.unwrap();
874 assert_eq!(db.op_count(), 1957);
875 assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(838));
876 assert_eq!(db.any.snapshot.items(), 857);
877
878 let root = db.root();
880 db.close().await.unwrap();
881 let db = open_db(context.clone(), "build_big").await;
882 assert_eq!(root, db.root());
883 assert_eq!(db.op_count(), 1957);
884 assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(838));
885 assert_eq!(db.any.snapshot.items(), 857);
886
887 for i in 0u64..1000 {
889 let k = Sha256::hash(&i.to_be_bytes());
890 if let Some(map_value) = map.get(&k) {
891 let Some(db_value) = db.get(&k).await.unwrap() else {
892 panic!("key not found in db: {k}");
893 };
894 assert_eq!(*map_value, db_value);
895 } else {
896 assert!(db.get(&k).await.unwrap().is_none());
897 }
898 }
899 });
900 }
901
902 #[test_traced("DEBUG")]
906 pub fn test_current_db_verify_proof_over_bits_in_uncommitted_chunk() {
907 let executor = deterministic::Runner::default();
908 executor.start(|context| async move {
909 let mut hasher = StandardHasher::<Sha256>::new();
910 let partition = "build_small";
911 let mut db = open_db(context.clone(), partition).await.into_dirty();
912
913 let k = Sha256::fill(0x01);
915 let v1 = Sha256::fill(0xA1);
916 db.update(k, v1).await.unwrap();
917 let mut db = db.merkleize().await.unwrap();
918 db.commit(None).await.unwrap();
919
920 let (_, op_loc) = db.any.get_with_loc(&k).await.unwrap().unwrap();
921 let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();
922
923 let root = db.root();
925 assert!(CleanCurrentTest::verify_key_value_proof(
926 hasher.inner(),
927 k,
928 v1,
929 &proof,
930 &root
931 ));
932
933 let v2 = Sha256::fill(0xA2);
934 assert!(!CleanCurrentTest::verify_key_value_proof(
936 hasher.inner(),
937 k,
938 v2,
939 &proof,
940 &root,
941 ));
942
943 let mut db = db.into_dirty();
945 db.update(k, v2).await.unwrap();
946 let mut db = db.merkleize().await.unwrap();
947 db.commit(None).await.unwrap();
948 let root = db.root();
949
950 assert!(!CleanCurrentTest::verify_key_value_proof(
952 hasher.inner(),
953 k,
954 v2,
955 &proof,
956 &root,
957 ));
958
959 let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();
961 assert!(CleanCurrentTest::verify_key_value_proof(
962 hasher.inner(),
963 k,
964 v2,
965 &proof,
966 &root,
967 ));
968 assert!(!CleanCurrentTest::verify_key_value_proof(
970 hasher.inner(),
971 k,
972 v1,
973 &proof,
974 &root,
975 ));
976
977 let (range_proof, _, chunks) = db
980 .range_proof(hasher.inner(), op_loc, NZU64!(1))
981 .await
982 .unwrap();
983 let proof_inactive = KeyValueProof {
984 loc: op_loc,
985 chunk: chunks[0],
986 range_proof,
987 };
988 let op = Operation::Update(Update(k, v1));
991 assert!(CleanCurrentTest::verify_range_proof(
992 hasher.inner(),
993 &proof_inactive.range_proof,
994 proof_inactive.loc,
995 &[op],
996 &[proof_inactive.chunk],
997 &root,
998 ));
999 assert!(!CleanCurrentTest::verify_key_value_proof(
1002 hasher.inner(),
1003 k,
1004 v1,
1005 &proof_inactive,
1006 &root,
1007 ));
1008
1009 let (_, active_loc) = db.any.get_with_loc(&k).await.unwrap().unwrap();
1013 assert_ne!(active_loc, proof_inactive.loc);
1015 assert_eq!(
1016 CleanBitMap::<Digest, 32>::leaf_pos(*active_loc),
1017 CleanBitMap::<Digest, 32>::leaf_pos(*proof_inactive.loc)
1018 );
1019 let mut fake_proof = proof_inactive.clone();
1020 fake_proof.loc = active_loc;
1021 assert!(!CleanCurrentTest::verify_key_value_proof(
1022 hasher.inner(),
1023 k,
1024 v1,
1025 &fake_proof,
1026 &root,
1027 ));
1028
1029 let mut modified_chunk = proof_inactive.chunk;
1034 let bit_pos = *proof_inactive.loc;
1035 let byte_idx = bit_pos / 8;
1036 let bit_idx = bit_pos % 8;
1037 modified_chunk[byte_idx as usize] |= 1 << bit_idx;
1038
1039 let mut fake_proof = proof_inactive.clone();
1040 fake_proof.chunk = modified_chunk;
1041 assert!(!CleanCurrentTest::verify_key_value_proof(
1042 hasher.inner(),
1043 k,
1044 v1,
1045 &fake_proof,
1046 &root,
1047 ));
1048
1049 db.destroy().await.unwrap();
1050 });
1051 }
1052
1053 async fn apply_random_ops(
1056 num_elements: u64,
1057 commit_changes: bool,
1058 rng_seed: u64,
1059 mut db: DirtyCurrentTest,
1060 ) -> Result<CleanCurrentTest, Error> {
1061 warn!("rng_seed={}", rng_seed);
1063 let mut rng = StdRng::seed_from_u64(rng_seed);
1064
1065 for i in 0u64..num_elements {
1066 let k = Sha256::hash(&i.to_be_bytes());
1067 let v = Sha256::hash(&rng.next_u32().to_be_bytes());
1068 db.update(k, v).await.unwrap();
1069 }
1070
1071 for _ in 0u64..num_elements * 10 {
1074 let rand_key = Sha256::hash(&(rng.next_u64() % num_elements).to_be_bytes());
1075 if rng.next_u32() % 7 == 0 {
1076 db.delete(rand_key).await.unwrap();
1077 continue;
1078 }
1079 let v = Sha256::hash(&rng.next_u32().to_be_bytes());
1080 db.update(rand_key, v).await.unwrap();
1081 if commit_changes && rng.next_u32() % 20 == 0 {
1082 let mut clean_db = db.merkleize().await?;
1084 clean_db.commit(None).await?;
1085 db = clean_db.into_dirty();
1086 }
1087 }
1088 if commit_changes {
1089 let mut clean_db = db.merkleize().await?;
1090 clean_db.commit(None).await?;
1091 Ok(clean_db)
1092 } else {
1093 db.merkleize().await
1094 }
1095 }
1096
1097 #[test_traced("DEBUG")]
1098 pub fn test_current_db_range_proofs() {
1099 let executor = deterministic::Runner::default();
1100 executor.start(|mut context| async move {
1101 let partition = "range_proofs";
1102 let mut hasher = StandardHasher::<Sha256>::new();
1103 let db = open_db(context.clone(), partition).await.into_dirty();
1104 let db = apply_random_ops(200, true, context.next_u64(), db)
1105 .await
1106 .unwrap();
1107 let root = db.root();
1108
1109 let max_ops = 4;
1112 let end_loc = db.op_count();
1113 let start_loc = db.any.inactivity_floor_loc();
1114
1115 for loc in *start_loc..*end_loc {
1116 let loc = Location::new_unchecked(loc);
1117 let (proof, ops, chunks) = db
1118 .range_proof(hasher.inner(), loc, NZU64!(max_ops))
1119 .await
1120 .unwrap();
1121 assert!(
1122 CleanCurrentTest::verify_range_proof(
1123 hasher.inner(),
1124 &proof,
1125 loc,
1126 &ops,
1127 &chunks,
1128 &root
1129 ),
1130 "failed to verify range at start_loc {start_loc}",
1131 );
1132 }
1133
1134 db.destroy().await.unwrap();
1135 });
1136 }
1137
1138 #[test_traced("DEBUG")]
1139 pub fn test_current_db_key_value_proof() {
1140 let executor = deterministic::Runner::default();
1141 executor.start(|mut context| async move {
1142 let partition = "range_proofs";
1143 let mut hasher = StandardHasher::<Sha256>::new();
1144 let db = open_db(context.clone(), partition).await.into_dirty();
1145 let db = apply_random_ops(500, true, context.next_u64(), db)
1146 .await
1147 .unwrap();
1148 let root = db.root();
1149
1150 let bad_key = Sha256::fill(0xAA);
1152 let res = db.key_value_proof(hasher.inner(), bad_key).await;
1153 assert!(matches!(res, Err(Error::KeyNotFound)));
1154
1155 let start = *db.inactivity_floor_loc();
1156 for i in start..db.status.len() {
1157 if !db.status.get_bit(i) {
1158 continue;
1159 }
1160 let (key, value) = match db.any.log.read(Location::new_unchecked(i)).await.unwrap()
1163 {
1164 Operation::Update(Update(key, value)) => (key, value),
1165 Operation::CommitFloor(_, _) => continue,
1166 Operation::Delete(_) => {
1167 unreachable!("location does not reference update/commit operation")
1168 }
1169 };
1170
1171 let proof = db.key_value_proof(hasher.inner(), key).await.unwrap();
1172 assert!(CleanCurrentTest::verify_key_value_proof(
1174 hasher.inner(),
1175 key,
1176 value,
1177 &proof,
1178 &root
1179 ));
1180 let wrong_val = Sha256::fill(0xFF);
1182 assert!(!CleanCurrentTest::verify_key_value_proof(
1183 hasher.inner(),
1184 key,
1185 wrong_val,
1186 &proof,
1187 &root
1188 ));
1189 let wrong_key = Sha256::fill(0xEE);
1191 assert!(!CleanCurrentTest::verify_key_value_proof(
1192 hasher.inner(),
1193 wrong_key,
1194 value,
1195 &proof,
1196 &root
1197 ));
1198 let wrong_root = Sha256::fill(0xDD);
1200 assert!(!CleanCurrentTest::verify_key_value_proof(
1201 hasher.inner(),
1202 key,
1203 value,
1204 &proof,
1205 &wrong_root,
1206 ));
1207 }
1208
1209 db.destroy().await.unwrap();
1210 });
1211 }
1212
1213 #[test_traced("WARN")]
1216 pub fn test_current_db_build_random_close_reopen() {
1217 const ELEMENTS: u64 = 1000;
1219
1220 let executor = deterministic::Runner::default();
1221 executor.start(|mut context| async move {
1222 let partition = "build_random";
1223 let rng_seed = context.next_u64();
1224 let db = open_db(context.clone(), partition).await.into_dirty();
1225 let db = apply_random_ops(ELEMENTS, true, rng_seed, db)
1226 .await
1227 .unwrap();
1228
1229 let root = db.root();
1231 db.close().await.unwrap();
1233
1234 let db = open_db(context, partition).await;
1235 assert_eq!(db.root(), root);
1236
1237 db.destroy().await.unwrap();
1238 });
1239 }
1240
1241 #[test_traced("WARN")]
1244 pub fn test_current_db_proving_repeated_updates() {
1245 let executor = deterministic::Runner::default();
1246 executor.start(|context| async move {
1247 let mut hasher = StandardHasher::<Sha256>::new();
1248 let partition = "build_small";
1249 let mut db = open_db(context.clone(), partition).await;
1250
1251 let k = Sha256::fill(0x00);
1253 let mut old_val = Sha256::fill(0x00);
1254 for i in 1u8..=255 {
1255 let v = Sha256::fill(i);
1256 let mut dirty_db = db.into_dirty();
1257 dirty_db.update(k, v).await.unwrap();
1258 assert_eq!(dirty_db.get(&k).await.unwrap().unwrap(), v);
1259 db = dirty_db.merkleize().await.unwrap();
1260 db.commit(None).await.unwrap();
1261 let root = db.root();
1262
1263 let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();
1265 assert!(
1266 CleanCurrentTest::verify_key_value_proof(hasher.inner(), k, v, &proof, &root),
1267 "proof of update {i} failed to verify"
1268 );
1269 assert!(
1271 !CleanCurrentTest::verify_key_value_proof(
1272 hasher.inner(),
1273 k,
1274 old_val,
1275 &proof,
1276 &root
1277 ),
1278 "proof of update {i} verified when it should not have"
1279 );
1280 old_val = v;
1281 }
1282
1283 db.destroy().await.unwrap();
1284 });
1285 }
1286
1287 #[test_traced("WARN")]
1290 pub fn test_current_db_simulate_write_failures() {
1291 const ELEMENTS: u64 = 1000;
1293
1294 let executor = deterministic::Runner::default();
1295 executor.start(|mut context| async move {
1296 let partition = "build_random_fail_commit";
1297 let rng_seed = context.next_u64();
1298 let db = open_db(context.clone(), partition).await.into_dirty();
1299 let mut db = apply_random_ops(ELEMENTS, true, rng_seed, db)
1300 .await
1301 .unwrap();
1302 let committed_root = db.root();
1303 let committed_op_count = db.op_count();
1304 let committed_inactivity_floor = db.any.inactivity_floor_loc();
1305 db.prune(committed_inactivity_floor).await.unwrap();
1306
1307 let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
1309 .await
1310 .unwrap();
1311
1312 db.simulate_commit_failure_before_any_writes();
1315 let db = open_db(context.clone(), partition).await;
1316 assert_eq!(db.root(), committed_root);
1317 assert_eq!(db.op_count(), committed_op_count);
1318
1319 let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
1321 .await
1322 .unwrap();
1323
1324 db.simulate_commit_failure_after_any_db_commit()
1327 .await
1328 .unwrap();
1329
1330 let db = open_db(context.clone(), partition).await;
1333 let scenario_2_root = db.root();
1334
1335 let fresh_partition = "build_random_fail_commit_fresh";
1338 let db = open_db(context.clone(), fresh_partition).await.into_dirty();
1339 let db = apply_random_ops(ELEMENTS, true, rng_seed, db)
1340 .await
1341 .unwrap();
1342 let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
1343 .await
1344 .unwrap();
1345 let mut db = db.into_dirty().merkleize().await.unwrap();
1346 db.commit(None).await.unwrap();
1347 db.prune(db.any.inactivity_floor_loc()).await.unwrap();
1348 assert_eq!(db.root(), scenario_2_root);
1350
1351 db.destroy().await.unwrap();
1352 });
1353 }
1354
1355 #[test_traced("WARN")]
1356 pub fn test_current_db_different_pruning_delays_same_root() {
1357 let executor = deterministic::Runner::default();
1358 executor.start(|context| async move {
1359 let db_config_no_pruning = current_db_config("no_pruning_test");
1361
1362 let db_config_pruning = current_db_config("pruning_test");
1363
1364 let mut db_no_pruning =
1365 CleanCurrentTest::init(context.clone(), db_config_no_pruning.clone())
1366 .await
1367 .unwrap()
1368 .into_dirty();
1369 let mut db_pruning = CleanCurrentTest::init(context.clone(), db_config_pruning.clone())
1370 .await
1371 .unwrap()
1372 .into_dirty();
1373
1374 const NUM_OPERATIONS: u64 = 1000;
1376 for i in 0..NUM_OPERATIONS {
1377 let key = Sha256::hash(&i.to_be_bytes());
1378 let value = Sha256::hash(&(i * 1000).to_be_bytes());
1379
1380 db_no_pruning.update(key, value).await.unwrap();
1381 db_pruning.update(key, value).await.unwrap();
1382
1383 if i % 50 == 49 {
1385 let mut clean_no_pruning = db_no_pruning.merkleize().await.unwrap();
1386 clean_no_pruning.commit(None).await.unwrap();
1387 let mut clean_pruning = db_pruning.merkleize().await.unwrap();
1388 clean_pruning.commit(None).await.unwrap();
1389 clean_pruning
1390 .prune(clean_no_pruning.any.inactivity_floor_loc())
1391 .await
1392 .unwrap();
1393 db_no_pruning = clean_no_pruning.into_dirty();
1394 db_pruning = clean_pruning.into_dirty();
1395 }
1396 }
1397
1398 let mut db_no_pruning = db_no_pruning.merkleize().await.unwrap();
1400 db_no_pruning.commit(None).await.unwrap();
1401 let mut db_pruning = db_pruning.merkleize().await.unwrap();
1402 db_pruning.commit(None).await.unwrap();
1403
1404 let root_no_pruning = db_no_pruning.root();
1406 let root_pruning = db_pruning.root();
1407
1408 assert_eq!(root_no_pruning, root_pruning);
1410
1411 db_no_pruning.close().await.unwrap();
1413 db_pruning.close().await.unwrap();
1414
1415 let db_no_pruning = CleanCurrentTest::init(context.clone(), db_config_no_pruning)
1417 .await
1418 .unwrap();
1419 let db_pruning = CleanCurrentTest::init(context.clone(), db_config_pruning)
1420 .await
1421 .unwrap();
1422 assert_eq!(
1423 db_no_pruning.inactivity_floor_loc(),
1424 db_pruning.inactivity_floor_loc()
1425 );
1426
1427 let root_no_pruning_restart = db_no_pruning.root();
1429 let root_pruning_restart = db_pruning.root();
1430
1431 assert_eq!(root_no_pruning, root_no_pruning_restart);
1433 assert_eq!(root_pruning, root_pruning_restart);
1434
1435 db_no_pruning.destroy().await.unwrap();
1436 db_pruning.destroy().await.unwrap();
1437 });
1438 }
1439
1440 #[test_traced("DEBUG")]
1441 fn test_batch() {
1442 batch_tests::test_batch(|mut ctx| async move {
1443 let seed = ctx.next_u64();
1444 let prefix = format!("current_unordered_batch_{seed}");
1445 AnyExt::new(open_db(ctx, &prefix).await)
1446 });
1447 }
1448}