1use crate::{
8 adb::{
9 any::fixed::{Any, Config as AConfig},
10 Error,
11 },
12 index::Index,
13 mmr::{
14 bitmap::Bitmap,
15 hasher::{Grafting, GraftingVerifier, Hasher, Standard},
16 iterator::{leaf_num_to_pos, leaf_pos_to_num},
17 storage::Grafting as GStorage,
18 verification::Proof,
19 },
20 store::operation::Fixed,
21 translator::Translator,
22};
23use commonware_codec::{CodecFixed, Encode as _, FixedSize};
24use commonware_cryptography::Hasher as CHasher;
25use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
26use commonware_utils::Array;
27use futures::{future::try_join_all, try_join};
28use std::num::{NonZeroU64, NonZeroUsize};
29use tracing::debug;
30
/// Configuration for initializing a [Current] authenticated database.
#[derive(Clone)]
pub struct Config<T: Translator> {
    /// The name of the storage partition backing the MMR's journal.
    pub mmr_journal_partition: String,

    /// The maximum number of items to store in each blob of the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer used by the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the storage partition backing the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the storage partition backing the operation log's journal.
    pub log_journal_partition: String,

    /// The maximum number of items to store in each blob of the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// The size of the write buffer used by the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// The name of the storage partition where the (pruned) activity bitmap is
    /// persisted.
    pub bitmap_metadata_partition: String,

    /// The translator used by the snapshot index to map keys to locations.
    pub translator: T,

    /// Optional thread pool for parallelizing hashing work.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool shared by the storage-backed structures.
    pub buffer_pool: PoolRef,
}
67
/// An authenticated database that augments an [Any] db with a per-operation
/// activity bitmap, allowing proofs that a key-value pair is (or is not) the
/// key's *current* value.
pub struct Current<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: CodecFixed<Cfg = ()> + Clone,
    H: CHasher,
    T: Translator,
    const N: usize,
> {
    /// The underlying [Any] database tracking all operations.
    pub any: Any<E, K, V, H, T>,

    /// One bit per operation: set iff the operation is currently active.
    pub status: Bitmap<H, N>,

    /// Runtime context used for storage access and metrics labeling.
    context: E,

    /// Partition where the pruned bitmap state is persisted.
    bitmap_metadata_partition: String,
}
94
/// The information (beyond the proof itself) needed to verify a key-value
/// proof produced by [Current::key_value_proof].
#[derive(Clone)]
pub struct KeyValueProofInfo<K, V, const N: usize> {
    /// The key whose current value is being proven.
    pub key: K,

    /// The value claimed to be the key's current value.
    pub value: V,

    /// The location (leaf number) of the operation that set this value.
    pub loc: u64,

    /// The bitmap chunk containing the activity bit for `loc`.
    pub chunk: [u8; N],
}
110
111impl<
112 E: RStorage + Clock + Metrics,
113 K: Array,
114 V: CodecFixed<Cfg = ()> + Clone,
115 H: CHasher,
116 T: Translator,
117 const N: usize,
118 > Current<E, K, V, H, T, N>
119{
    // Compile-time check: the bitmap chunk size N must be a whole multiple of
    // the digest size.
    const _CHUNK_SIZE_ASSERT: () = assert!(
        N.is_multiple_of(H::Digest::SIZE),
        "chunk size must be some multiple of the digest size",
    );

    // Compile-time check: N must be a power of 2, making CHUNK_SIZE_BITS a
    // power of 2 as well (see `grafting_height`).
    const _CHUNK_SIZE_IS_POW_OF_2_ASSERT: () =
        assert!(N.is_power_of_two(), "chunk size must be a power of 2");
131
132 pub async fn init(context: E, config: Config<T>) -> Result<Self, Error> {
135 let cfg = AConfig {
137 mmr_journal_partition: config.mmr_journal_partition,
138 mmr_metadata_partition: config.mmr_metadata_partition,
139 mmr_items_per_blob: config.mmr_items_per_blob,
140 mmr_write_buffer: config.mmr_write_buffer,
141 log_journal_partition: config.log_journal_partition,
142 log_items_per_blob: config.log_items_per_blob,
143 log_write_buffer: config.log_write_buffer,
144 translator: config.translator.clone(),
145 thread_pool: config.thread_pool,
146 buffer_pool: config.buffer_pool,
147 };
148
149 let context = context.with_label("adb::current");
150 let cloned_pool = cfg.thread_pool.clone();
151 let mut status = Bitmap::restore_pruned(
152 context.with_label("bitmap"),
153 &config.bitmap_metadata_partition,
154 cloned_pool,
155 )
156 .await?;
157
158 let mut hasher = Standard::<H>::new();
160 let (inactivity_floor_loc, mmr, log) =
161 Any::<_, _, _, _, T>::init_mmr_and_log(context.clone(), cfg, &mut hasher).await?;
162
163 let mut grafter = Grafting::new(&mut hasher, Self::grafting_height());
165 if status.bit_count() < inactivity_floor_loc {
166 while status.bit_count() < inactivity_floor_loc {
169 status.append(false);
170 }
171
172 grafter
175 .load_grafted_digests(&status.dirty_chunks(), &mmr)
176 .await?;
177 status.sync(&mut grafter).await?;
178 }
179
180 let mut snapshot = Index::init(context.with_label("snapshot"), config.translator);
182 Any::build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, Some(&mut status))
183 .await
184 .unwrap();
185 grafter
186 .load_grafted_digests(&status.dirty_chunks(), &mmr)
187 .await?;
188 status.sync(&mut grafter).await?;
189
190 let any = Any {
191 mmr,
192 log,
193 snapshot,
194 inactivity_floor_loc,
195 uncommitted_ops: 0,
196 hasher: Standard::<H>::new(),
197 };
198
199 Ok(Self {
200 any,
201 status,
202 context,
203 bitmap_metadata_partition: config.bitmap_metadata_partition,
204 })
205 }
206
    /// Returns the total number of operations applied to the db (delegates to
    /// the underlying [Any] db).
    pub fn op_count(&self) -> u64 {
        self.any.op_count()
    }
212
    /// Returns the location below which every operation is known to be
    /// inactive (delegates to the underlying [Any] db).
    pub fn inactivity_floor_loc(&self) -> u64 {
        self.any.inactivity_floor_loc()
    }
217
    /// Gets the current value of `key`, or `None` if the key has no active
    /// value (delegates to the underlying [Any] db).
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        self.any.get(key).await
    }
222
    /// Gets the value (if any) recorded by the operation at location `loc`
    /// (delegates to the underlying [Any] db).
    pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
        self.any.get_loc(loc).await
    }
229
    /// Returns the height in the operations MMR at which bitmap chunks are
    /// grafted. Since `CHUNK_SIZE_BITS` is a power of 2 (enforced by the
    /// compile-time assertions above), `trailing_zeros` is its exact log2.
    const fn grafting_height() -> u32 {
        Bitmap::<H, N>::CHUNK_SIZE_BITS.trailing_zeros()
    }
237
238 pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
242 let update_result = self.any.update_return_loc(key, value).await?;
243 if let Some(old_loc) = update_result {
244 self.status.set_bit(old_loc, false);
245 }
246 self.status.append(true);
247
248 Ok(())
249 }
250
251 pub async fn delete(&mut self, key: K) -> Result<(), Error> {
255 let Some(old_loc) = self.any.delete(key).await? else {
256 return Ok(());
257 };
258
259 self.status.append(false);
260 self.status.set_bit(old_loc, false);
261
262 Ok(())
263 }
264
    /// Commits all uncommitted operations: raises the inactivity floor, then
    /// syncs the log and processes pending MMR updates concurrently, and
    /// finally syncs the underlying [Any] db.
    async fn commit_ops(&mut self) -> Result<(), Error> {
        // Raise the floor by (uncommitted ops + 1) steps; the +1 presumably
        // accounts for the commit op appended by `raise_inactivity_floor` —
        // confirm against that method's contract.
        self.raise_inactivity_floor(self.any.uncommitted_ops + 1)
            .await?;

        // Sync the log while the MMR processes its pending updates; the two
        // futures borrow disjoint fields of `self.any`.
        let log_fut = async { self.any.log.sync().await.map_err(Error::Journal) };
        let mmr_fut = async {
            self.any.mmr.process_updates(&mut self.any.hasher);
            Ok::<(), Error>(())
        };
        try_join!(log_fut, mmr_fut)?;
        self.any.uncommitted_ops = 0;

        self.any.sync().await
    }
284
    /// Raises the inactivity floor by at most `max_steps` steps, re-appending
    /// any active operation encountered at the floor to the tip of the log,
    /// then appends a [Fixed::CommitFloor] op recording the new floor.
    async fn raise_inactivity_floor(&mut self, max_steps: u64) -> Result<(), Error> {
        for _ in 0..max_steps {
            // The floor can never pass the end of the log.
            if self.any.inactivity_floor_loc == self.op_count() {
                break;
            }
            let op = self.any.log.read(self.any.inactivity_floor_loc).await?;
            let old_loc = self
                .any
                .move_op_if_active(op, self.any.inactivity_floor_loc)
                .await?;
            // If the op was active, it has been re-appended at the tip: clear
            // the bit at its old location and append a set bit for the copy.
            if let Some(old_loc) = old_loc {
                self.status.set_bit(old_loc, false);
                self.status.append(true);
            }
            self.any.inactivity_floor_loc += 1;
        }

        // Record the new floor; the commit op itself is inactive.
        self.any
            .apply_op(Fixed::CommitFloor(self.any.inactivity_floor_loc))
            .await?;
        self.status.append(false);

        Ok(())
    }
316
317 pub async fn commit(&mut self) -> Result<(), Error> {
321 self.commit_ops().await?; let mut grafter = Grafting::new(&mut self.any.hasher, Self::grafting_height());
327 grafter
328 .load_grafted_digests(&self.status.dirty_chunks(), &self.any.mmr)
329 .await?;
330 self.status.sync(&mut grafter).await?;
331
332 self.status.prune_to_bit(self.any.inactivity_floor_loc);
333 self.status
334 .write_pruned(
335 self.context.with_label("bitmap"),
336 &self.bitmap_metadata_partition,
337 )
338 .await?; Ok(())
341 }
342
    /// Syncs the underlying [Any] db's state to disk.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.any.sync().await
    }
347
    /// Prunes the underlying db of operations up to `target_prune_loc`
    /// (delegates to [Any::prune]).
    pub async fn prune(&mut self, target_prune_loc: u64) -> Result<(), Error> {
        self.any.prune(target_prune_loc).await
    }
356
    /// Computes the root digest of the db, authenticating both the operations
    /// and their activity status via the grafted MMR. If the bitmap ends in a
    /// partially filled chunk, that chunk's digest is folded in separately via
    /// [Bitmap::partial_chunk_root].
    ///
    /// # Panics
    ///
    /// Panics if the bitmap has unprocessed updates.
    pub async fn root(&self, hasher: &mut Standard<H>) -> Result<H::Digest, Error> {
        assert!(
            !self.status.is_dirty(),
            "must process updates before computing root"
        );
        let ops = &self.any.mmr;
        let height = Self::grafting_height();
        let grafted_mmr = GStorage::<'_, H, _, _>::new(&self.status, ops, height);
        let mmr_root = grafted_mmr.root(hasher).await?;

        // `last_chunk.1` is the count of bits in the trailing partial chunk;
        // 0 means the bitmap ends on a chunk boundary and the grafted root is
        // already complete.
        let last_chunk = self.status.last_chunk();
        if last_chunk.1 == 0 {
            return Ok(mmr_root);
        }

        // Otherwise, hash the partial chunk and combine it with the MMR root.
        hasher.inner().update(last_chunk.0);
        let last_chunk_digest = hasher.inner().finalize();

        Ok(Bitmap::<H, N>::partial_chunk_root(
            hasher.inner(),
            &mmr_root,
            last_chunk.1,
            &last_chunk_digest,
        ))
    }
394
395 pub async fn range_proof(
404 &self,
405 hasher: &mut H,
406 start_loc: u64,
407 max_ops: NonZeroU64,
408 ) -> Result<(Proof<H::Digest>, Vec<Fixed<K, V>>, Vec<[u8; N]>), Error> {
409 assert!(
410 !self.status.is_dirty(),
411 "must process updates before computing proofs"
412 );
413
414 let mmr = &self.any.mmr;
416 let start_pos = leaf_num_to_pos(start_loc);
417 let leaves = mmr.leaves();
418 assert!(start_loc < leaves, "start_loc is invalid");
419 let max_loc = start_loc + max_ops.get();
420 let end_loc = if max_loc > leaves {
421 leaves - 1
422 } else {
423 max_loc - 1
424 };
425 let end_pos = leaf_num_to_pos(end_loc);
426
427 let height = Self::grafting_height();
429 let grafted_mmr = GStorage::<'_, H, _, _>::new(&self.status, mmr, height);
430 let mut proof = Proof::<H::Digest>::range_proof(&grafted_mmr, start_pos, end_pos).await?;
431
432 let mut ops = Vec::with_capacity((end_loc - start_loc + 1) as usize);
434 let futures = (start_loc..=end_loc)
435 .map(|i| self.any.log.read(i))
436 .collect::<Vec<_>>();
437 try_join_all(futures)
438 .await?
439 .into_iter()
440 .for_each(|op| ops.push(op));
441
442 let chunk_bits = Bitmap::<H, N>::CHUNK_SIZE_BITS;
444 let start = start_loc / chunk_bits;
445 let end = end_loc / chunk_bits;
446 let mut chunks = Vec::with_capacity((end - start + 1) as usize);
447 for i in start..=end {
448 let bit_offset = i * chunk_bits;
449 let chunk = *self.status.get_chunk(bit_offset);
450 chunks.push(chunk);
451 }
452
453 let last_chunk = self.status.last_chunk();
454 if last_chunk.1 == 0 {
455 return Ok((proof, ops, chunks));
456 }
457
458 hasher.update(last_chunk.0);
459 proof.digests.push(hasher.finalize());
460
461 Ok((proof, ops, chunks))
462 }
463
464 pub fn verify_range_proof(
467 hasher: &mut Standard<H>,
468 proof: &Proof<H::Digest>,
469 start_loc: u64,
470 ops: &[Fixed<K, V>],
471 chunks: &[[u8; N]],
472 root: &H::Digest,
473 ) -> bool {
474 let op_count = leaf_pos_to_num(proof.size);
475 let Some(op_count) = op_count else {
476 debug!("verification failed, invalid proof size");
477 return false;
478 };
479 let end_loc = start_loc + ops.len() as u64 - 1;
480 if end_loc >= op_count {
481 debug!(
482 loc = end_loc,
483 op_count, "proof verification failed, invalid range"
484 );
485 return false;
486 }
487
488 let start_pos = leaf_num_to_pos(start_loc);
489
490 let elements = ops.iter().map(|op| op.encode()).collect::<Vec<_>>();
491
492 let chunk_vec = chunks.iter().map(|c| c.as_ref()).collect::<Vec<_>>();
493 let mut verifier = GraftingVerifier::<H>::new(
494 Self::grafting_height(),
495 start_loc / Bitmap::<H, N>::CHUNK_SIZE_BITS,
496 chunk_vec,
497 );
498
499 if op_count % Bitmap::<H, N>::CHUNK_SIZE_BITS == 0 {
500 return proof.verify_range_inclusion(&mut verifier, &elements, start_pos, root);
501 }
502
503 if proof.digests.is_empty() {
505 debug!("proof has no digests");
506 return false;
507 }
508 let mut proof = proof.clone();
509 let last_chunk_digest = proof.digests.pop().unwrap();
510
511 let mmr_root = match proof.reconstruct_root(&mut verifier, &elements, start_pos) {
513 Ok(root) => root,
514 Err(error) => {
515 debug!(error = ?error, "invalid proof input");
516 return false;
517 }
518 };
519
520 let next_bit = op_count % Bitmap::<H, N>::CHUNK_SIZE_BITS;
521 let reconstructed_root = Bitmap::<H, N>::partial_chunk_root(
522 hasher.inner(),
523 &mmr_root,
524 next_bit,
525 &last_chunk_digest,
526 );
527
528 reconstructed_root == *root
529 }
530
    /// Generates a proof that `key` currently has its active value, returning
    /// the proof together with the [KeyValueProofInfo] a verifier needs.
    ///
    /// # Errors
    ///
    /// Returns [Error::KeyNotFound] if the key has no active value.
    ///
    /// # Panics
    ///
    /// Panics if the bitmap has unprocessed updates.
    pub async fn key_value_proof(
        &self,
        hasher: &mut H,
        key: K,
    ) -> Result<(Proof<H::Digest>, KeyValueProofInfo<K, V, N>), Error> {
        assert!(
            !self.status.is_dirty(),
            "must process updates before computing proofs"
        );
        let op = self.any.get_key_loc(&key).await?;
        let Some((value, loc)) = op else {
            return Err(Error::KeyNotFound);
        };
        let pos = leaf_num_to_pos(loc);
        let height = Self::grafting_height();
        let grafted_mmr = GStorage::<'_, H, _, _>::new(&self.status, &self.any.mmr, height);

        // Single-element range proof over the operation's position.
        let mut proof = Proof::<H::Digest>::range_proof(&grafted_mmr, pos, pos).await?;
        let chunk = *self.status.get_chunk(loc);

        // If the bitmap ends in a partial chunk, append its digest so the
        // verifier can reconstruct the root.
        let last_chunk = self.status.last_chunk();
        if last_chunk.1 != 0 {
            hasher.update(last_chunk.0);
            proof.digests.push(hasher.finalize());
        }

        Ok((
            proof,
            KeyValueProofInfo {
                key,
                value,
                loc,
                chunk,
            },
        ))
    }
574
    /// Verifies a proof (created by [Self::key_value_proof]) that `info.key`
    /// currently has value `info.value`, against the provided `root`.
    pub fn verify_key_value_proof(
        hasher: &mut H,
        proof: &Proof<H::Digest>,
        info: &KeyValueProofInfo<K, V, N>,
        root: &H::Digest,
    ) -> bool {
        let Some(op_count) = leaf_pos_to_num(proof.size) else {
            debug!("verification failed, invalid proof size");
            return false;
        };

        // The claimed operation must be marked active in the provided chunk;
        // an inactive op cannot be the key's current value.
        if !Bitmap::<H, N>::get_bit_from_chunk(&info.chunk, info.loc) {
            debug!(
                loc = info.loc,
                "proof verification failed, operation is inactive"
            );
            return false;
        }

        let pos = leaf_num_to_pos(info.loc);
        let num = info.loc / Bitmap::<H, N>::CHUNK_SIZE_BITS;
        let mut verifier =
            GraftingVerifier::<H>::new(Self::grafting_height(), num, vec![&info.chunk]);
        let element = Fixed::Update(info.key.clone(), info.value.clone()).encode();

        // If the bitmap ends on a chunk boundary, verify directly against the
        // grafted MMR root.
        if op_count % Bitmap::<H, N>::CHUNK_SIZE_BITS == 0 {
            return proof.verify_element_inclusion(&mut verifier, &element, pos, root);
        }

        // Otherwise the last proof digest must be the partial chunk's digest.
        if proof.digests.is_empty() {
            debug!("proof has no digests");
            return false;
        }

        let mut proof = proof.clone();
        let last_chunk_digest = proof.digests.pop().unwrap();

        // If the operation's chunk *is* the trailing partial chunk, the
        // provided chunk must hash to the claimed last-chunk digest.
        if info.loc / Bitmap::<H, N>::CHUNK_SIZE_BITS == op_count / Bitmap::<H, N>::CHUNK_SIZE_BITS
        {
            let expected_last_chunk_digest = verifier.digest(&info.chunk);
            if last_chunk_digest != expected_last_chunk_digest {
                debug!("last chunk digest does not match expected value");
                return false;
            }
        }

        // Reconstruct the grafted MMR root from the remaining digests...
        let mmr_root = match proof.reconstruct_root(&mut verifier, &[element], pos) {
            Ok(root) => root,
            Err(error) => {
                debug!(error = ?error, "invalid proof input");
                return false;
            }
        };

        // ...then fold in the partial-chunk digest and compare to `root`.
        let next_bit = op_count % Bitmap::<H, N>::CHUNK_SIZE_BITS;
        let reconstructed_root =
            Bitmap::<H, N>::partial_chunk_root(hasher, &mmr_root, next_bit, &last_chunk_digest);

        reconstructed_root == *root
    }
644
    /// Closes the underlying [Any] db. The activity bitmap's pruned state is
    /// persisted by [Self::commit], not here.
    pub async fn close(self) -> Result<(), Error> {
        self.any.close().await
    }
649
    /// Destroys all persistent state: the bitmap's metadata partition and the
    /// underlying [Any] db's partitions.
    pub async fn destroy(self) -> Result<(), Error> {
        Bitmap::<H, N>::destroy(self.context, &self.bitmap_metadata_partition).await?;

        self.any.destroy().await
    }
658
    /// Test-only helper: generates an inclusion proof for the operation at
    /// location `loc` regardless of its activity status (unlike
    /// [Self::key_value_proof]). Returns the proof, the operation, its
    /// location, and the bitmap chunk containing its activity bit.
    #[cfg(test)]
    async fn operation_inclusion_proof(
        &self,
        hasher: &mut H,
        loc: u64,
    ) -> Result<(Proof<H::Digest>, Fixed<K, V>, u64, [u8; N]), Error> {
        let op = self.any.log.read(loc).await?;

        let pos = leaf_num_to_pos(loc);
        let height = Self::grafting_height();
        let grafted_mmr = GStorage::<'_, H, _, _>::new(&self.status, &self.any.mmr, height);

        // Single-element range proof over the operation's position.
        let mut proof = Proof::<H::Digest>::range_proof(&grafted_mmr, pos, pos).await?;
        let chunk = *self.status.get_chunk(loc);

        // Append the trailing partial chunk's digest if the bitmap doesn't end
        // on a chunk boundary.
        let last_chunk = self.status.last_chunk();
        if last_chunk.1 != 0 {
            hasher.update(last_chunk.0);
            proof.digests.push(hasher.finalize());
        }

        Ok((proof, op, loc, chunk))
    }
683
    /// Test-only helper: simulates a commit failure occurring before any
    /// commit-related writes happen, by consuming the db without committing
    /// or closing it.
    #[cfg(test)]
    fn simulate_commit_failure_before_any_writes(self) {
        // Intentionally empty: dropping `self` without syncing models the crash.
    }
690
    /// Test-only helper: simulates a commit failure occurring after the
    /// underlying [Any] db commits (`commit_ops`), but before the activity
    /// bitmap is synced or persisted.
    #[cfg(test)]
    async fn simulate_commit_failure_after_any_db_commit(mut self) -> Result<(), Error> {
        self.commit_ops().await
    }
698
699 #[cfg(test)]
700 async fn simulate_commit_failure_after_bitmap_written(mut self) -> Result<(), Error> {
703 self.commit_ops().await?; let mut grafter = Grafting::new(&mut self.any.hasher, Self::grafting_height());
707 grafter
708 .load_grafted_digests(&self.status.dirty_chunks(), &self.any.mmr)
709 .await?;
710 self.status.sync(&mut grafter).await?;
711 let target_prune_loc = self.any.inactivity_floor_loc;
712 self.status.prune_to_bit(target_prune_loc);
713 self.status
714 .write_pruned(
715 self.context.with_label("bitmap"),
716 &self.bitmap_metadata_partition,
717 )
718 .await?; Ok(())
721 }
722}
723
724#[cfg(test)]
725pub mod test {
726 use super::*;
727 use crate::translator::TwoCap;
728 use commonware_cryptography::{sha256::Digest, Sha256};
729 use commonware_macros::test_traced;
730 use commonware_runtime::{deterministic, Runner as _};
731 use commonware_utils::{NZUsize, NZU64};
732 use rand::{rngs::StdRng, RngCore, SeedableRng};
733 use tracing::warn;
734
735 const PAGE_SIZE: usize = 88;
736 const PAGE_CACHE_SIZE: usize = 8;
737
738 fn current_db_config(partition_prefix: &str) -> Config<TwoCap> {
739 Config {
740 mmr_journal_partition: format!("{partition_prefix}_journal_partition"),
741 mmr_metadata_partition: format!("{partition_prefix}_metadata_partition"),
742 mmr_items_per_blob: NZU64!(11),
743 mmr_write_buffer: NZUsize!(1024),
744 log_journal_partition: format!("{partition_prefix}_partition_prefix"),
745 log_items_per_blob: NZU64!(7),
746 log_write_buffer: NZUsize!(1024),
747 bitmap_metadata_partition: format!("{partition_prefix}_bitmap_metadata_partition"),
748 translator: TwoCap,
749 thread_pool: None,
750 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
751 }
752 }
753
    /// The [Current] instantiation used by these tests: SHA-256 digests for
    /// keys and values, a 2-byte translator, and 32-byte bitmap chunks.
    type CurrentTest = Current<deterministic::Context, Digest, Digest, Sha256, TwoCap, 32>;

    /// Opens (creating if necessary) a test db whose partitions are derived
    /// from `partition_prefix`.
    async fn open_db(context: deterministic::Context, partition_prefix: &str) -> CurrentTest {
        CurrentTest::init(context, current_db_config(partition_prefix))
            .await
            .unwrap()
    }
763
    /// Builds a small db exercising update/delete/commit, verifying that the
    /// root and op count survive close/reopen cycles.
    #[test_traced("DEBUG")]
    pub fn test_current_db_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let partition = "build_small";
            let mut db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.inactivity_floor_loc(), 0);
            let root0 = db.root(&mut hasher).await.unwrap();
            db.close().await.unwrap();
            db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher).await.unwrap(), root0);

            // Insert one key and commit; committing appends additional ops
            // beyond the update itself (hence op_count of 4).
            let k1 = Sha256::hash(&0u64.to_be_bytes());
            let v1 = Sha256::hash(&10u64.to_be_bytes());
            db.update(k1, v1).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            db.commit().await.unwrap();
            assert_eq!(db.op_count(), 4);
            let root1 = db.root(&mut hasher).await.unwrap();
            assert!(root1 != root0);
            db.close().await.unwrap();
            db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(&mut hasher).await.unwrap(), root1);

            // Delete the key and commit; state should again survive reopen.
            db.delete(k1).await.unwrap();
            db.commit().await.unwrap();
            assert_eq!(db.op_count(), 6);
            let root2 = db.root(&mut hasher).await.unwrap();
            db.close().await.unwrap();
            db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 6);
            assert_eq!(db.root(&mut hasher).await.unwrap(), root2);

            // After the delete, no operation should remain active.
            for i in 0..db.op_count() {
                assert!(!db.status.get_bit(i));
            }

            db.destroy().await.unwrap();
        });
    }
812
813 #[test_traced("DEBUG")]
817 pub fn test_current_db_verify_proof_over_bits_in_uncommitted_chunk() {
818 let executor = deterministic::Runner::default();
819 executor.start(|context| async move {
820 let mut hasher = Standard::<Sha256>::new();
821 let partition = "build_small";
822 let mut db = open_db(context.clone(), partition).await;
823
824 let k = Sha256::fill(0x01);
826 let v1 = Sha256::fill(0xA1);
827 db.update(k, v1).await.unwrap();
828 db.commit().await.unwrap();
829
830 let op = db.any.get_key_loc(&k).await.unwrap().unwrap();
831 let proof = db
832 .operation_inclusion_proof(hasher.inner(), op.1)
833 .await
834 .unwrap();
835 let info = KeyValueProofInfo {
836 key: k,
837 value: v1,
838 loc: op.1,
839 chunk: proof.3,
840 };
841 let root = db.root(&mut hasher).await.unwrap();
842 assert!(CurrentTest::verify_key_value_proof(
844 hasher.inner(),
845 &proof.0,
846 &info,
847 &root,
848 ),);
849
850 let v2 = Sha256::fill(0xA2);
851 let mut bad_info = info.clone();
853 bad_info.value = v2;
854 assert!(!CurrentTest::verify_key_value_proof(
855 hasher.inner(),
856 &proof.0,
857 &bad_info,
858 &root,
859 ),);
860
861 db.update(k, v2).await.unwrap();
863 db.commit().await.unwrap();
864
865 let root = db.root(&mut hasher).await.unwrap();
867 assert!(!CurrentTest::verify_key_value_proof(
868 hasher.inner(),
869 &proof.0,
870 &info,
871 &root,
872 ),);
873
874 let proof_inactive = db
876 .operation_inclusion_proof(hasher.inner(), op.1)
877 .await
878 .unwrap();
879 let proof_inactive_info = KeyValueProofInfo {
882 key: k,
883 value: v1,
884 loc: proof_inactive.2,
885 chunk: proof_inactive.3,
886 };
887 assert!(!CurrentTest::verify_key_value_proof(
888 hasher.inner(),
889 &proof_inactive.0,
890 &proof_inactive_info,
891 &root,
892 ),);
893
894 let (_, active_loc) = db.any.get_key_loc(&info.key).await.unwrap().unwrap();
898 assert_ne!(active_loc, info.loc);
900 assert_eq!(
901 Bitmap::<Sha256, 32>::leaf_pos(active_loc),
902 Bitmap::<Sha256, 32>::leaf_pos(info.loc)
903 );
904 let mut info_with_modified_loc = info.clone();
905 info_with_modified_loc.loc = active_loc;
906 assert!(!CurrentTest::verify_key_value_proof(
907 hasher.inner(),
908 &proof_inactive.0,
909 &proof_inactive_info,
910 &root,
911 ),);
912
913 let mut modified_chunk = proof_inactive.3;
918 let bit_pos = proof_inactive.2;
919 let byte_idx = bit_pos / 8;
920 let bit_idx = bit_pos % 8;
921 modified_chunk[byte_idx as usize] |= 1 << bit_idx;
922
923 let mut info_with_modified_chunk = info.clone();
924 info_with_modified_chunk.chunk = modified_chunk;
925 assert!(!CurrentTest::verify_key_value_proof(
926 hasher.inner(),
927 &proof_inactive.0,
928 &info_with_modified_chunk,
929 &root,
930 ),);
931
932 db.destroy().await.unwrap();
933 });
934 }
935
    /// Applies a deterministic (seeded) sequence of random updates and deletes
    /// to `db`, committing periodically and at the end iff `commit_changes`.
    async fn apply_random_ops(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        db: &mut CurrentTest,
    ) -> Result<(), Error> {
        // Log the seed loudly so failures are reproducible.
        warn!("rng_seed={}", rng_seed);
        let mut rng = StdRng::seed_from_u64(rng_seed);

        // Seed every key with an initial value.
        for i in 0u64..num_elements {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = Sha256::hash(&rng.next_u32().to_be_bytes());
            db.update(k, v).await.unwrap();
        }

        // Randomly mutate the keys: ~1/7 of iterations delete, the rest
        // update; ~1/20 of updates also trigger a commit (when enabled).
        for _ in 0u64..num_elements * 10 {
            let rand_key = Sha256::hash(&(rng.next_u64() % num_elements).to_be_bytes());
            if rng.next_u32() % 7 == 0 {
                db.delete(rand_key).await.unwrap();
                continue;
            }
            let v = Sha256::hash(&rng.next_u32().to_be_bytes());
            db.update(rand_key, v).await.unwrap();
            if commit_changes && rng.next_u32() % 20 == 0 {
                db.commit().await.unwrap();
            }
        }
        if commit_changes {
            db.commit().await.unwrap();
        }

        Ok(())
    }
975
976 #[test_traced("DEBUG")]
977 pub fn test_current_db_range_proofs() {
978 let executor = deterministic::Runner::default();
979 executor.start(|mut context| async move {
980 let partition = "range_proofs";
981 let mut hasher = Standard::<Sha256>::new();
982 let mut db = open_db(context.clone(), partition).await;
983 apply_random_ops(200, true, context.next_u64(), &mut db)
984 .await
985 .unwrap();
986 let root = db.root(&mut hasher).await.unwrap();
987
988 let max_ops = 4;
991 let end_loc = db.op_count();
992 let start_loc = db.any.inactivity_floor_loc();
993
994 for i in start_loc..end_loc {
995 let (proof, ops, chunks) = db
996 .range_proof(hasher.inner(), i, NZU64!(max_ops))
997 .await
998 .unwrap();
999 assert!(
1000 CurrentTest::verify_range_proof(&mut hasher, &proof, i, &ops, &chunks, &root),
1001 "failed to verify range at start_loc {start_loc}",
1002 );
1003 }
1004
1005 db.destroy().await.unwrap();
1006 });
1007 }
1008
    /// Generates and verifies a key-value proof for every active operation,
    /// plus negative cases: wrong value, wrong key, and wrong root.
    #[test_traced("DEBUG")]
    pub fn test_current_db_key_value_proof() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "range_proofs";
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone(), partition).await;
            apply_random_ops(500, true, context.next_u64(), &mut db)
                .await
                .unwrap();
            let root = db.root(&mut hasher).await.unwrap();

            // Proving a non-existent key must fail with KeyNotFound.
            let bad_key = Sha256::fill(0xAA);
            let res = db.key_value_proof(hasher.inner(), bad_key).await;
            assert!(matches!(res, Err(Error::KeyNotFound)));

            let start = db.inactivity_floor_loc();
            for i in start..db.status.bit_count() {
                // Only active operations carry a provable current value.
                if !db.status.get_bit(i) {
                    continue;
                }
                let op = db.any.log.read(i).await.unwrap();
                let key = op.key().unwrap();
                let (proof, info) = db.key_value_proof(hasher.inner(), *key).await.unwrap();
                assert_eq!(info.value, *op.value().unwrap());
                // A valid proof must verify against the current root...
                assert!(CurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    &proof,
                    &info,
                    &root
                ));
                // ...but not with a wrong value, ...
                let wrong_val = Sha256::fill(0xFF);
                let mut bad_info = info.clone();
                bad_info.value = wrong_val;
                assert!(!CurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    &proof,
                    &bad_info,
                    &root
                ));
                // ...a wrong key, ...
                let wrong_key = Sha256::fill(0xEE);
                let mut bad_info = info.clone();
                bad_info.key = wrong_key;
                assert!(!CurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    &proof,
                    &bad_info,
                    &root
                ));
                // ...or a wrong root.
                let wrong_root = Sha256::fill(0xDD);
                assert!(!CurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    &proof,
                    &info,
                    &wrong_root,
                ),);
            }

            db.destroy().await.unwrap();
        });
    }
1076
1077 #[test_traced("WARN")]
1080 pub fn test_current_db_build_random_close_reopen() {
1081 const ELEMENTS: u64 = 1000;
1083
1084 let executor = deterministic::Runner::default();
1085 executor.start(|mut context| async move {
1086 let partition = "build_random";
1087 let rng_seed = context.next_u64();
1088 let mut hasher = Standard::<Sha256>::new();
1089 let mut db = open_db(context.clone(), partition).await;
1090 apply_random_ops(ELEMENTS, true, rng_seed, &mut db)
1091 .await
1092 .unwrap();
1093
1094 let root = db.root(&mut hasher).await.unwrap();
1096 db.close().await.unwrap();
1098
1099 let db = open_db(context, partition).await;
1100 assert_eq!(db.root(&mut hasher).await.unwrap(), root);
1101
1102 db.destroy().await.unwrap();
1103 });
1104 }
1105
1106 #[test_traced("WARN")]
1109 pub fn test_current_db_proving_repeated_updates() {
1110 let executor = deterministic::Runner::default();
1111 executor.start(|context| async move {
1112 let mut hasher = Standard::<Sha256>::new();
1113 let partition = "build_small";
1114 let mut db = open_db(context.clone(), partition).await;
1115
1116 let k = Sha256::fill(0x00);
1118 let mut old_info = KeyValueProofInfo {
1119 key: k,
1120 value: Sha256::fill(0x00),
1121 loc: 0,
1122 chunk: [0; 32],
1123 };
1124 for i in 1u8..=255 {
1125 let v = Sha256::fill(i);
1126 db.update(k, v).await.unwrap();
1127 assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
1128 db.commit().await.unwrap();
1129 let root = db.root(&mut hasher).await.unwrap();
1130
1131 let (proof, info) = db.key_value_proof(hasher.inner(), k).await.unwrap();
1133 assert_eq!(info.value, v);
1134 assert!(
1135 CurrentTest::verify_key_value_proof(hasher.inner(), &proof, &info, &root),
1136 "proof of update {i} failed to verify"
1137 );
1138 assert!(
1140 !CurrentTest::verify_key_value_proof(hasher.inner(), &proof, &old_info, &root),
1141 "proof of update {i} failed to verify"
1142 );
1143 old_info = info.clone();
1144 }
1145
1146 db.destroy().await.unwrap();
1147 });
1148 }
1149
    /// Simulates commit failures at different points in the commit sequence
    /// and confirms each recovers to the expected state on reopen.
    #[test_traced("WARN")]
    pub fn test_current_db_simulate_write_failures() {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "build_random_fail_commit";
            let rng_seed = context.next_u64();
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone(), partition).await;
            apply_random_ops(ELEMENTS, true, rng_seed, &mut db)
                .await
                .unwrap();
            let committed_root = db.root(&mut hasher).await.unwrap();
            let committed_op_count = db.op_count();
            let committed_inactivity_floor = db.any.inactivity_floor_loc;
            db.prune(committed_inactivity_floor).await.unwrap();

            // Pile on uncommitted operations.
            apply_random_ops(ELEMENTS, false, rng_seed + 1, &mut db)
                .await
                .unwrap();

            // Scenario 1: crash before any writes. Reopen must restore the
            // last committed root and op count (uncommitted ops lost).
            db.simulate_commit_failure_before_any_writes();
            let mut db = open_db(context.clone(), partition).await;
            assert_eq!(db.root(&mut hasher).await.unwrap(), committed_root);
            assert_eq!(db.op_count(), committed_op_count);

            // Re-apply the same uncommitted workload.
            apply_random_ops(ELEMENTS, false, rng_seed + 1, &mut db)
                .await
                .unwrap();

            // Scenario 2: crash after the underlying db commit but before the
            // bitmap is persisted.
            db.simulate_commit_failure_after_any_db_commit()
                .await
                .unwrap();

            // Capture the recovered root for comparison below.
            let db = open_db(context.clone(), partition).await;
            let scenario_2_root = db.root(&mut hasher).await.unwrap();

            // A fresh db that commits the same workload successfully must
            // reach the same root as the scenario-2 recovery.
            let fresh_partition = "build_random_fail_commit_fresh";
            let mut db = open_db(context.clone(), fresh_partition).await;
            apply_random_ops(ELEMENTS, true, rng_seed, &mut db)
                .await
                .unwrap();
            apply_random_ops(ELEMENTS, false, rng_seed + 1, &mut db)
                .await
                .unwrap();
            db.commit().await.unwrap();
            db.prune(db.any.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.root(&mut hasher).await.unwrap(), scenario_2_root);
            db.close().await.unwrap();

            // Scenario 3: crash after the bitmap is written; recovery should
            // also converge to the same root.
            let fresh_partition = "build_random_fail_commit_fresh_2";
            let mut db = open_db(context.clone(), fresh_partition).await;
            apply_random_ops(ELEMENTS, true, rng_seed, &mut db)
                .await
                .unwrap();
            apply_random_ops(ELEMENTS, false, rng_seed + 1, &mut db)
                .await
                .unwrap();
            db.simulate_commit_failure_after_bitmap_written()
                .await
                .unwrap();
            let db = open_db(context.clone(), fresh_partition).await;
            assert_eq!(db.root(&mut hasher).await.unwrap(), scenario_2_root);

            db.destroy().await.unwrap();
        });
    }
1235
    /// Confirms two dbs that apply identical operations but prune with
    /// different aggressiveness produce identical roots, before and after a
    /// restart.
    #[test_traced("WARN")]
    pub fn test_current_db_different_pruning_delays_same_root() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            // Two configs over distinct partitions.
            let db_config_no_pruning = current_db_config("no_pruning_test");

            let db_config_pruning = current_db_config("pruning_test");

            let mut db_no_pruning =
                CurrentTest::init(context.clone(), db_config_no_pruning.clone())
                    .await
                    .unwrap();
            let mut db_pruning = CurrentTest::init(context.clone(), db_config_pruning.clone())
                .await
                .unwrap();

            // Apply identical updates to both, pruning only one of them.
            const NUM_OPERATIONS: u64 = 1000;
            for i in 0..NUM_OPERATIONS {
                let key = Sha256::hash(&i.to_be_bytes());
                let value = Sha256::hash(&(i * 1000).to_be_bytes());

                db_no_pruning.update(key, value).await.unwrap();
                db_pruning.update(key, value).await.unwrap();

                // Periodically commit both, pruning one to the current floor.
                if i % 50 == 49 {
                    db_no_pruning.commit().await.unwrap();
                    db_pruning.commit().await.unwrap();
                    db_pruning
                        .prune(db_no_pruning.any.inactivity_floor_loc())
                        .await
                        .unwrap();
                }
            }

            db_no_pruning.commit().await.unwrap();
            db_pruning.commit().await.unwrap();

            // Pruning must not affect the root.
            let root_no_pruning = db_no_pruning.root(&mut hasher).await.unwrap();
            let root_pruning = db_pruning.root(&mut hasher).await.unwrap();

            assert_eq!(root_no_pruning, root_pruning);

            db_no_pruning.close().await.unwrap();
            db_pruning.close().await.unwrap();

            // Both roots (and floors) must also survive a restart.
            let db_no_pruning = CurrentTest::init(context.clone(), db_config_no_pruning)
                .await
                .unwrap();
            let db_pruning = CurrentTest::init(context.clone(), db_config_pruning)
                .await
                .unwrap();
            assert_eq!(
                db_no_pruning.inactivity_floor_loc(),
                db_pruning.inactivity_floor_loc()
            );

            let root_no_pruning_restart = db_no_pruning.root(&mut hasher).await.unwrap();
            let root_pruning_restart = db_pruning.root(&mut hasher).await.unwrap();

            assert_eq!(root_no_pruning, root_no_pruning_restart);
            assert_eq!(root_pruning, root_pruning_restart);

            db_no_pruning.destroy().await.unwrap();
            db_pruning.destroy().await.unwrap();
        });
    }
1314}