1use crate::{
8 adb::{align_mmr_and_locations, Error},
9 index::Index,
10 journal::{
11 fixed::{Config as FConfig, Journal as FJournal},
12 variable::{Config as VConfig, Journal as VJournal},
13 },
14 mmr::{
15 hasher::Standard,
16 iterator::{leaf_num_to_pos, leaf_pos_to_num},
17 journaled::{Config as MmrConfig, Mmr},
18 verification::Proof,
19 },
20 store::operation::Variable as Operation,
21 translator::Translator,
22};
23use commonware_codec::{Codec, Encode as _, Read};
24use commonware_cryptography::Hasher as CHasher;
25use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
26use commonware_utils::{Array, NZUsize};
27use futures::{future::TryFutureExt, pin_mut, try_join, StreamExt};
28use std::{
29 collections::HashMap,
30 num::{NonZeroU64, NonZeroUsize},
31};
32use tracing::{debug, warn};
33
34pub mod sync;
35
/// Buffer size handed to the log's `replay` call when the snapshot is rebuilt from the
/// operations log at startup.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;
39
40#[derive(Clone)]
42pub struct Config<T: Translator, C> {
43 pub mmr_journal_partition: String,
45
46 pub mmr_items_per_blob: NonZeroU64,
48
49 pub mmr_write_buffer: NonZeroUsize,
51
52 pub mmr_metadata_partition: String,
54
55 pub log_journal_partition: String,
57
58 pub log_write_buffer: NonZeroUsize,
60
61 pub log_compression: Option<u8>,
63
64 pub log_codec_config: C,
66
67 pub log_items_per_section: NonZeroU64,
69
70 pub locations_journal_partition: String,
72
73 pub locations_items_per_blob: NonZeroU64,
75
76 pub translator: T,
78
79 pub thread_pool: Option<ThreadPool>,
81
82 pub buffer_pool: PoolRef,
84}
85
86pub struct Any<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translator> {
89 mmr: Mmr<E, H>,
96
97 log: VJournal<E, Operation<K, V>>,
107
108 log_size: u64,
111
112 log_items_per_section: u64,
114
115 locations: FJournal<E, u32>,
118
119 inactivity_floor_loc: u64,
122
123 oldest_retained_loc: u64,
125
126 pub(super) snapshot: Index<T, u64>,
133
134 pub(super) uncommitted_ops: u64,
136
137 pub(super) hasher: Standard<H>,
139}
140
141impl<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translator>
142 Any<E, K, V, H, T>
143{
144 pub async fn init(
147 context: E,
148 cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
149 ) -> Result<Self, Error> {
150 let snapshot: Index<T, u64> =
151 Index::init(context.with_label("snapshot"), cfg.translator.clone());
152 let mut hasher = Standard::<H>::new();
153
154 let mmr = Mmr::init(
155 context.with_label("mmr"),
156 &mut hasher,
157 MmrConfig {
158 journal_partition: cfg.mmr_journal_partition,
159 metadata_partition: cfg.mmr_metadata_partition,
160 items_per_blob: cfg.mmr_items_per_blob,
161 write_buffer: cfg.mmr_write_buffer,
162 thread_pool: cfg.thread_pool,
163 buffer_pool: cfg.buffer_pool.clone(),
164 },
165 )
166 .await?;
167
168 let log = VJournal::init(
169 context.with_label("log"),
170 VConfig {
171 partition: cfg.log_journal_partition,
172 compression: cfg.log_compression,
173 codec_config: cfg.log_codec_config,
174 buffer_pool: cfg.buffer_pool.clone(),
175 write_buffer: cfg.log_write_buffer,
176 },
177 )
178 .await?;
179
180 let locations = FJournal::init(
181 context.with_label("locations"),
182 FConfig {
183 partition: cfg.locations_journal_partition,
184 items_per_blob: cfg.locations_items_per_blob,
185 write_buffer: cfg.log_write_buffer,
186 buffer_pool: cfg.buffer_pool,
187 },
188 )
189 .await?;
190
191 let db = Self {
192 mmr,
193 log,
194 log_size: 0,
195 inactivity_floor_loc: 0,
196 oldest_retained_loc: 0,
197 locations,
198 log_items_per_section: cfg.log_items_per_section.get(),
199 uncommitted_ops: 0,
200 snapshot,
201 hasher,
202 };
203
204 db.build_snapshot_from_log().await
205 }
206
207 async fn build_snapshot_from_log(mut self) -> Result<Self, Error> {
218 let mut mmr_leaves = align_mmr_and_locations(&mut self.mmr, &mut self.locations).await?;
221
222 let mut after_last_commit = None;
224 let mut uncommitted_ops = HashMap::new();
226 let mut oldest_retained_loc_found = false;
227
228 {
231 let stream = self
232 .log
233 .replay(0, 0, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
234 .await?;
235 pin_mut!(stream);
236 while let Some(result) = stream.next().await {
237 match result {
238 Err(e) => {
239 return Err(Error::Journal(e));
240 }
241 Ok((section, offset, _, op)) => {
242 if !oldest_retained_loc_found {
243 self.log_size = section * self.log_items_per_section;
244 self.oldest_retained_loc = self.log_size;
245 oldest_retained_loc_found = true;
246 }
247
248 let loc = self.log_size; if after_last_commit.is_none() {
250 after_last_commit = Some((loc, offset));
251 }
252
253 self.log_size += 1;
254
255 let expected = loc / self.log_items_per_section;
258 assert_eq!(section, expected,
259 "given section {section} did not match expected section {expected} from location {loc}");
260
261 if self.log_size > mmr_leaves {
262 warn!(
263 section,
264 offset, "operation was missing from MMR/location map"
265 );
266 self.mmr.add(&mut self.hasher, &op.encode()).await?;
267 self.locations.append(offset).await?;
268 mmr_leaves += 1;
269 }
270
271 match op {
272 Operation::Delete(key) => {
273 let result = self.get_key_loc(&key).await?;
274 if let Some(old_loc) = result {
275 uncommitted_ops.insert(key, (Some(old_loc), None));
276 } else {
277 uncommitted_ops.remove(&key);
278 }
279 }
280 Operation::Update(key, _) => {
281 let result = self.get_key_loc(&key).await?;
282 if let Some(old_loc) = result {
283 uncommitted_ops.insert(key, (Some(old_loc), Some(loc)));
284 } else {
285 uncommitted_ops.insert(key, (None, Some(loc)));
286 }
287 }
288 Operation::CommitFloor(_, loc) => {
289 self.inactivity_floor_loc = loc;
290
291 for (key, (old_loc, new_loc)) in uncommitted_ops.iter() {
293 if let Some(old_loc) = old_loc {
294 if let Some(new_loc) = new_loc {
295 Self::update_loc(
296 &mut self.snapshot,
297 key,
298 *old_loc,
299 *new_loc,
300 );
301 } else {
302 Self::delete_loc(&mut self.snapshot, key, *old_loc);
303 }
304 } else {
305 assert!(new_loc.is_some());
306 self.snapshot.insert(key, new_loc.unwrap());
307 }
308 }
309 uncommitted_ops.clear();
310 after_last_commit = None;
311 }
312 _ => unreachable!(
313 "unexpected operation type at offset {offset} of section {section}"
314 ),
315 }
316 }
317 }
318 }
319 }
320
321 if let Some((end_loc, end_offset)) = after_last_commit {
323 assert!(!uncommitted_ops.is_empty());
324 warn!(
325 op_count = uncommitted_ops.len(),
326 log_size = end_loc,
327 end_offset,
328 "rewinding over uncommitted operations at end of log"
329 );
330 let prune_to_section = end_loc / self.log_items_per_section;
331 self.log
332 .rewind_to_offset(prune_to_section, end_offset)
333 .await?;
334 self.log.sync(prune_to_section).await?;
335 self.log_size = end_loc;
336 }
337
338 if mmr_leaves > self.log_size {
340 self.locations.rewind(self.log_size).await?;
341 self.locations.sync().await?;
342
343 let op_count = mmr_leaves - self.log_size;
344 warn!(op_count, "popping uncommitted MMR operations");
345 self.mmr.pop(op_count as usize).await?;
346 }
347
348 assert_eq!(self.log_size, leaf_pos_to_num(self.mmr.size()).unwrap());
350 assert_eq!(self.log_size, self.locations.size().await?);
351
352 debug!(log_size = self.log_size, "build_snapshot_from_log complete");
353
354 Ok(self)
355 }
356
357 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
359 let iter = self.snapshot.get(key);
360 for &loc in iter {
361 if let Some(v) = self.get_from_loc(key, loc).await? {
362 return Ok(Some(v));
363 }
364 }
365
366 Ok(None)
367 }
368
369 pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
376 assert!(loc < self.op_count());
377 if loc < self.oldest_retained_loc {
378 return Err(Error::OperationPruned(loc));
379 }
380
381 let offset = self.locations.read(loc).await?;
382 let section = loc / self.log_items_per_section;
383 let op = self.log.get(section, offset).await?;
384
385 Ok(op.into_value())
386 }
387
388 pub async fn get_key_loc(&self, key: &K) -> Result<Option<u64>, Error> {
391 let iter = self.snapshot.get(key);
392 for &loc in iter {
393 if self.get_from_loc(key, loc).await?.is_some() {
394 return Ok(Some(loc));
395 }
396 }
397
398 Ok(None)
399 }
400
401 fn delete_loc(snapshot: &mut Index<T, u64>, key: &K, delete_loc: u64) {
403 let Some(mut cursor) = snapshot.get_mut(key) else {
404 return;
405 };
406
407 while let Some(&loc) = cursor.next() {
408 if loc == delete_loc {
409 cursor.delete();
410 return;
411 }
412 }
413 }
414
415 fn update_loc(snapshot: &mut Index<T, u64>, key: &K, old_loc: u64, new_loc: u64) {
418 let Some(mut cursor) = snapshot.get_mut(key) else {
419 return;
420 };
421
422 while let Some(&loc) = cursor.next() {
423 if loc == old_loc {
424 cursor.update(new_loc);
425 return;
426 }
427 }
428 }
429
430 pub async fn get_from_loc(&self, key: &K, loc: u64) -> Result<Option<V>, Error> {
437 assert!(loc < self.op_count());
438
439 match self.locations.read(loc).await {
440 Ok(offset) => {
441 return self.get_from_offset(key, loc, offset).await;
442 }
443 Err(e) => Err(Error::Journal(e)),
444 }
445 }
446
447 async fn get_op(&self, loc: u64) -> Result<Operation<K, V>, Error> {
449 match self.locations.read(loc).await {
450 Ok(offset) => {
451 let section = loc / self.log_items_per_section;
452 self.log.get(section, offset).await.map_err(Error::Journal)
453 }
454 Err(e) => Err(Error::Journal(e)),
455 }
456 }
457
458 async fn get_from_offset(&self, key: &K, loc: u64, offset: u32) -> Result<Option<V>, Error> {
461 let section = loc / self.log_items_per_section;
462 let Operation::Update(k, v) = self.log.get(section, offset).await? else {
463 panic!("didn't find Update operation at location {loc} and offset {offset}");
464 };
465
466 if k != *key {
467 Ok(None)
468 } else {
469 Ok(Some(v))
470 }
471 }
472
    /// Returns the number of operations applied to the db so far, including uncommitted
    /// ones. This is also the location the next operation will be assigned.
    pub fn op_count(&self) -> u64 {
        self.log_size
    }
478
    /// Returns the log section the next operation will be appended to.
    fn current_section(&self) -> u64 {
        self.log_size / self.log_items_per_section
    }
483
484 pub fn oldest_retained_loc(&self) -> Option<u64> {
486 if self.log_size == 0 {
487 None
488 } else {
489 Some(self.oldest_retained_loc)
490 }
491 }
492
    /// Returns the current inactivity floor location. `prune` accepts targets up to and
    /// including this location.
    pub fn inactivity_floor_loc(&self) -> u64 {
        self.inactivity_floor_loc
    }
498
499 pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
502 let new_loc = self.op_count();
503 if let Some(old_loc) = self.get_key_loc(&key).await? {
504 Self::update_loc(&mut self.snapshot, &key, old_loc, new_loc);
505 } else {
506 self.snapshot.insert(&key, new_loc);
507 };
508
509 let op = Operation::Update(key, value);
510 self.apply_op(op).await?;
511
512 Ok(())
513 }
514
515 pub async fn delete(&mut self, key: K) -> Result<(), Error> {
519 let Some(old_loc) = self.get_key_loc(&key).await? else {
520 return Ok(());
521 };
522
523 Self::delete_loc(&mut self.snapshot, &key, old_loc);
524 self.apply_op(Operation::Delete(key)).await?;
525
526 Ok(())
527 }
528
    /// Returns the root of the MMR over the operations log, computed with `hasher`.
    ///
    /// NOTE(review): operations are added to the MMR via `add_batched` and only processed
    /// in `commit`/`sync`; presumably the root is only meaningful once pending updates have
    /// been processed — confirm against the MMR's contract.
    pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
        self.mmr.root(hasher)
    }
537
538 pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
541 let encoded_op = op.encode();
542 let section = self.current_section();
543
544 let log_fut = async {
547 let (offset, _) = self.log.append(section, op).await?;
548 self.locations.append(offset).await?;
549
550 Ok::<(), Error>(())
551 };
552
553 try_join!(
555 log_fut,
556 self.mmr
557 .add_batched(&mut self.hasher, &encoded_op)
558 .map_err(Error::Mmr),
559 )?;
560 self.uncommitted_ops += 1;
561 self.log_size += 1;
562
563 if self.current_section() != section {
565 self.log.sync(section).await?;
566 }
567
568 Ok(())
569 }
570
571 pub async fn proof(
582 &self,
583 start_loc: u64,
584 max_ops: u64,
585 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
586 self.historical_proof(self.op_count(), start_loc, max_ops)
587 .await
588 }
589
590 pub async fn historical_proof(
597 &self,
598 size: u64,
599 start_loc: u64,
600 max_ops: u64,
601 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
602 assert!(size <= self.op_count());
603 assert!(start_loc < size);
604
605 let start_pos = leaf_num_to_pos(start_loc);
606 let end_index = std::cmp::min(size - 1, start_loc + max_ops - 1);
607 let end_pos = leaf_num_to_pos(end_index);
608 let mmr_size = leaf_num_to_pos(size);
609
610 let proof = self
611 .mmr
612 .historical_range_proof(mmr_size, start_pos, end_pos)
613 .await?;
614 let mut ops = Vec::with_capacity((end_index - start_loc + 1) as usize);
615 for loc in start_loc..=end_index {
616 let section = loc / self.log_items_per_section;
617 let offset = self.locations.read(loc).await?;
618 let op = self.log.get(section, offset).await?;
619 ops.push(op);
620 }
621
622 Ok((proof, ops))
623 }
624
625 pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
633 self.raise_inactivity_floor(metadata, self.uncommitted_ops + 1)
636 .await?;
637
638 let section = self.current_section();
640 let mmr_fut = async {
641 self.mmr.process_updates(&mut self.hasher);
642 Ok::<(), Error>(())
643 };
644 try_join!(self.log.sync(section).map_err(Error::Journal), mmr_fut)?;
645
646 debug!(log_size = self.log_size, "commit complete");
647 self.uncommitted_ops = 0;
648
649 Ok(())
650 }
651
652 pub async fn get_metadata(&self) -> Result<Option<(u64, Option<V>)>, Error> {
655 let mut last_commit = self.op_count() - self.uncommitted_ops;
656 if last_commit == 0 {
657 return Ok(None);
658 }
659 last_commit -= 1;
660 let section = last_commit / self.log_items_per_section;
661 let offset = self.locations.read(last_commit).await?;
662 let Operation::CommitFloor(metadata, _) = self.log.get(section, offset).await? else {
663 unreachable!("no commit operation at location of last commit {last_commit}");
664 };
665
666 Ok(Some((last_commit, metadata)))
667 }
668
669 pub async fn sync(&mut self) -> Result<(), Error> {
673 let section = self.current_section();
674 try_join!(
675 self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
676 self.log.sync(section).map_err(Error::Journal),
677 self.locations.sync().map_err(Error::Journal),
678 )?;
679
680 Ok(())
681 }
682
683 pub(super) async fn move_op_if_active(
687 &mut self,
688 op: Operation<K, V>,
689 old_loc: u64,
690 ) -> Result<Option<u64>, Error> {
691 let Some(key) = op.key() else {
693 return Ok(None);
695 };
696 let new_loc = self.op_count();
697 let Some(mut cursor) = self.snapshot.get_mut(key) else {
698 return Ok(None);
699 };
700
701 while let Some(&loc) = cursor.next() {
703 if loc == old_loc {
704 cursor.update(new_loc);
706 drop(cursor);
707
708 self.apply_op(op).await?;
710 return Ok(Some(old_loc));
711 }
712 }
713
714 Ok(None)
716 }
717
718 async fn raise_inactivity_floor(
725 &mut self,
726 metadata: Option<V>,
727 max_steps: u64,
728 ) -> Result<(), Error> {
729 for _ in 0..max_steps {
730 if self.inactivity_floor_loc == self.op_count() {
731 break;
732 }
733 let op = self.get_op(self.inactivity_floor_loc).await?;
734 self.move_op_if_active(op, self.inactivity_floor_loc)
735 .await?;
736 self.inactivity_floor_loc += 1;
737 }
738
739 self.apply_op(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
740 .await?;
741
742 Ok(())
743 }
744
745 pub async fn prune(&mut self, target_prune_loc: u64) -> Result<(), Error> {
751 assert!(target_prune_loc <= self.inactivity_floor_loc);
752 if target_prune_loc <= self.oldest_retained_loc {
753 return Ok(());
754 }
755
756 try_join!(
762 self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
763 self.locations.sync().map_err(Error::Journal),
764 )?;
765
766 let section_with_target = target_prune_loc / self.log_items_per_section;
772 if !self.log.prune(section_with_target).await? {
773 return Ok(());
774 }
775 self.oldest_retained_loc = section_with_target * self.log_items_per_section;
776
777 debug!(
778 log_size = self.log_size,
779 oldest_retained_loc = self.oldest_retained_loc,
780 "pruned inactive ops"
781 );
782
783 try_join!(
785 self.locations
786 .prune(self.oldest_retained_loc)
787 .map_err(Error::Journal),
788 self.mmr
789 .prune_to_pos(&mut self.hasher, leaf_num_to_pos(self.oldest_retained_loc))
790 .map_err(Error::Mmr),
791 )?;
792
793 Ok(())
794 }
795
796 pub async fn close(mut self) -> Result<(), Error> {
798 if self.uncommitted_ops > 0 {
799 warn!(
800 op_count = self.uncommitted_ops,
801 "closing db with uncommitted operations"
802 );
803 }
804
805 try_join!(
806 self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
807 self.log.close().map_err(Error::Journal),
808 self.locations.close().map_err(Error::Journal),
809 )?;
810
811 Ok(())
812 }
813
814 pub async fn destroy(self) -> Result<(), Error> {
816 try_join!(
817 self.log.destroy().map_err(Error::Journal),
818 self.mmr.destroy().map_err(Error::Mmr),
819 self.locations.destroy().map_err(Error::Journal),
820 )?;
821
822 Ok(())
823 }
824
825 #[cfg(test)]
829 pub(super) async fn simulate_failure(
830 mut self,
831 sync_log: bool,
832 sync_locations: bool,
833 sync_mmr: bool,
834 write_limit: usize,
835 ) -> Result<(), Error> {
836 let section = self.current_section();
837 if sync_log {
838 self.log.sync(section).await?;
839 }
840 if sync_locations {
841 self.locations.sync().await?;
842 }
843 if sync_mmr {
844 assert_eq!(write_limit, 0);
845 self.mmr.sync(&mut self.hasher).await?;
846 } else if write_limit > 0 {
847 self.mmr
848 .simulate_partial_sync(&mut self.hasher, write_limit)
849 .await?;
850 }
851
852 Ok(())
853 }
854}
855
856#[cfg(test)]
857pub(super) mod test {
858 use super::*;
859 use crate::{
860 adb::verify_proof,
861 mmr::{hasher::Standard, mem::Mmr as MemMmr},
862 translator::TwoCap,
863 };
864 use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
865 use commonware_macros::test_traced;
866 use commonware_runtime::{deterministic, Runner as _};
867 use commonware_utils::NZU64;
868 use std::collections::HashMap;
869
    // Arguments used to build the test buffer pool (`PoolRef::new`) in `db_config`.
    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;
872
873 fn db_config(suffix: &str) -> Config<TwoCap, (commonware_codec::RangeCfg, ())> {
874 Config {
875 mmr_journal_partition: format!("journal_{suffix}"),
876 mmr_metadata_partition: format!("metadata_{suffix}"),
877 mmr_items_per_blob: NZU64!(11),
878 mmr_write_buffer: NZUsize!(1024),
879 log_journal_partition: format!("log_journal_{suffix}"),
880 log_items_per_section: NZU64!(7),
881 log_write_buffer: NZUsize!(1024),
882 log_compression: None,
883 log_codec_config: ((0..=10000).into(), ()),
884 locations_journal_partition: format!("locations_journal_{suffix}"),
885 locations_items_per_blob: NZU64!(7),
886 translator: TwoCap,
887 thread_pool: None,
888 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
889 }
890 }
891
    /// Concrete db type exercised by the tests: `Digest` keys, `Vec<u8>` values, SHA-256
    /// hashing, and the `TwoCap` translator on the deterministic runtime.
    type AnyTest = Any<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
894
895 async fn open_db(context: deterministic::Context) -> AnyTest {
897 AnyTest::init(context, db_config("partition"))
898 .await
899 .unwrap()
900 }
901
    /// Exercises an empty db: initial state, loss of uncommitted operations on reopen, and
    /// repeated commits with nothing to commit.
    #[test_traced("WARN")]
    pub fn test_any_variable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let mut hasher = Standard::<Sha256>::new();
            // A fresh db has no operations and the same root as an empty in-memory MMR.
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc(), None);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));

            // An update that is never committed must not survive a close + reopen.
            let d1 = Sha256::fill(1u8);
            let v1 = vec![1u8; 8];
            let root = db.root(&mut hasher);
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.op_count(), 0);

            // Committing an empty db appends exactly one (commit) operation.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 1);
            let root = db.root(&mut hasher);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            // Reopen without closing the previous handle; the committed root must be recovered.
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);

            // With no active keys, each commit leaves the floor at the latest commit op.
            for _ in 1..100 {
                db.commit(None).await.unwrap();
                assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);
            }

            db.destroy().await.unwrap();
        });
    }
940
    /// Walks a two-key db through updates, deletes, floor raises, commits, reopen, and
    /// pruning, checking operation counts, snapshot size, metadata, and roots along the way.
    #[test_traced("WARN")]
    pub fn test_any_variable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            let d1 = Sha256::fill(1u8);
            let d2 = Sha256::fill(2u8);
            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            // Neither key exists yet.
            assert!(db.get(&d1).await.unwrap().is_none());
            assert!(db.get(&d2).await.unwrap().is_none());

            db.update(d1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), v1);
            assert!(db.get(&d2).await.unwrap().is_none());

            db.update(d2, v1.clone()).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), v1);

            db.delete(d1).await.unwrap();
            assert!(db.get(&d1).await.unwrap().is_none());
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), v1);

            db.update(d1, v2.clone()).await.unwrap();
            assert_eq!(db.get(&d1).await.unwrap().unwrap(), v2);

            db.update(d2, v1.clone()).await.unwrap();
            assert_eq!(db.get(&d2).await.unwrap().unwrap(), v1);

            // 4 updates + 1 delete so far.
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.snapshot.keys(), 2);
            assert_eq!(db.inactivity_floor_loc, 0);
            db.sync().await.unwrap();

            // The first 3 operations are all superseded, so raising the floor over them only
            // appends the commit operation.
            db.raise_inactivity_floor(None, 3).await.unwrap();
            assert_eq!(db.inactivity_floor_loc, 3);
            assert_eq!(db.op_count(), 6);
            db.sync().await.unwrap();

            // Delete both keys: 2 more operations, floor unchanged.
            db.delete(d1).await.unwrap();
            db.delete(d2).await.unwrap();
            assert!(db.get(&d1).await.unwrap().is_none());
            assert!(db.get(&d2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 8);
            assert_eq!(db.inactivity_floor_loc, 3);

            db.sync().await.unwrap();

            // Deleting an already-deleted key appends nothing.
            db.delete(d1).await.unwrap();
            assert_eq!(db.op_count(), 8);

            // Deleting a key that never existed appends nothing either.
            let d3 = Sha256::fill(3u8);
            db.delete(d3).await.unwrap();
            assert_eq!(db.op_count(), 8);

            // Commit with metadata, then verify state survives close + reopen.
            let metadata = Some(vec![99, 100]);
            db.commit(metadata.clone()).await.unwrap();
            assert_eq!(db.op_count(), 9);
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 9);
            assert_eq!(db.root(&mut hasher), root);

            // With no active keys the floor runs to the tip; only the new commit op follows it.
            db.raise_inactivity_floor(None, 100).await.unwrap();
            assert_eq!(db.inactivity_floor_loc, db.op_count() - 1);

            // The commit op just appended is still uncommitted, so the last commit is at loc 8.
            assert_eq!(db.get_metadata().await.unwrap(), Some((8, metadata)));

            // Re-populate both keys with a mix of updates and a delete.
            db.update(d1, v1.clone()).await.unwrap();
            db.update(d2, v2.clone()).await.unwrap();
            db.delete(d1).await.unwrap();
            db.update(d2, v1.clone()).await.unwrap();
            db.update(d1, v2.clone()).await.unwrap();
            assert_eq!(db.snapshot.keys(), 2);

            // Commit, then confirm everything is recovered after close + reopen.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 19);
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.snapshot.keys(), 2);
            assert_eq!(db.op_count(), 19);
            assert_eq!(db.get_metadata().await.unwrap(), Some((18, None)));

            // A commit with no intervening updates still changes the root.
            db.commit(None).await.unwrap();

            assert!(db.root(&mut hasher) != root);

            // Pruning up to the floor changes neither the snapshot nor the root.
            let root = db.root(&mut hasher);
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.snapshot.keys(), 2);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }
1058
    /// Builds a db with 1000 keys, mirrors every change in a `HashMap`, and checks that the
    /// db agrees with the map after commit, prune, and reopen, and that range proofs over the
    /// retained operations verify against the root.
    #[test_traced("WARN")]
    pub fn test_any_variable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Insert every key once.
            let mut map = HashMap::<Digest, Vec<u8>>::default();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
                map.insert(k, v);
            }

            // Update every 3rd key.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
                map.insert(k, v);
            }

            // Delete every key whose index is 1 mod 7.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
                map.remove(&k);
            }

            // 1000 inserts + 334 updates + 143 deletes, leaving 857 live keys.
            assert_eq!(db.op_count(), 1477);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert_eq!(db.oldest_retained_loc().unwrap(), 0);
            assert_eq!(db.snapshot.items(), 857);

            // Commit raises the floor and moves active operations to the tip.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 2336);
            assert_eq!(db.inactivity_floor_loc, 1478);
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            // Pruning is section-aligned (7 items per section), hence 1477 rather than 1478.
            assert_eq!(db.oldest_retained_loc().unwrap(), 1477);
            assert_eq!(db.snapshot.items(), 857);

            // State must be fully recovered after close + reopen.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert_eq!(db.op_count(), 2336);
            assert_eq!(db.inactivity_floor_loc, 1478);
            assert_eq!(db.snapshot.items(), 857);

            // Raise the floor by 3000 steps and prune again; the live key set is unchanged.
            db.raise_inactivity_floor(None, 3000).await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.inactivity_floor_loc, 4478);
            assert_eq!(db.op_count(), 4478 + 858);
            assert_eq!(db.snapshot.items(), 857);

            // Every key must resolve to the same value (or absence) as in the reference map.
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }

            // Prove every range of up to `max_ops` operations between the MMR's pruning
            // boundary and the operation count captured before the final floor raise.
            let max_ops = 4;
            let end_loc = db.op_count();
            let start_pos = db.mmr.pruned_to_pos();
            let start_loc = leaf_pos_to_num(start_pos).unwrap();
            db.raise_inactivity_floor(None, 100).await.unwrap();
            db.sync().await.unwrap();
            let root = db.root(&mut hasher);
            assert!(start_loc < db.inactivity_floor_loc);

            for i in start_loc..end_loc {
                let (proof, log) = db.proof(i, max_ops).await.unwrap();
                assert!(verify_proof(&mut hasher, &proof, i, &log, &root));
            }

            db.destroy().await.unwrap();
        });
    }
1163
    /// Checks that replaying a log containing many updates of the same key leaves exactly
    /// one snapshot entry for that key and reproduces the committed root.
    #[test_traced("WARN")]
    pub fn test_any_db_log_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Update the same key many times, then commit.
            const UPDATES: u64 = 100;
            let k = Sha256::hash(&UPDATES.to_be_bytes());
            for i in 0u64..UPDATES {
                let v = vec![(i % 255) as u8; ((i % 7) + 3) as usize];
                db.update(k, v).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);
            db.close().await.unwrap();

            // After reopening (which replays the log) the key must map to a single location.
            let db = open_db(context.clone()).await;
            let iter = db.snapshot.get(&k);
            assert_eq!(iter.cloned().collect::<Vec<_>>().len(), 1);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }
1193
    /// Checks that a committed delete is honored when the log, containing several earlier
    /// commits, is replayed on reopen.
    #[test_traced("WARN")]
    pub fn test_any_db_multiple_commits_delete_gets_replayed() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Insert ELEMENTS distinct keys per batch, committing after every batch.
            let mut map = HashMap::<Digest, Vec<u8>>::default();
            const ELEMENTS: u64 = 10;
            for j in 0u64..ELEMENTS {
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&(j * 1000 + i).to_be_bytes());
                    let v = vec![(i % 255) as u8; ((i % 7) + 3) as usize];
                    db.update(k, v.clone()).await.unwrap();
                    map.insert(k, v);
                }
                db.commit(None).await.unwrap();
            }
            // The last key inserted in the last batch.
            let k = Sha256::hash(&((ELEMENTS - 1) * 1000 + (ELEMENTS - 1)).to_be_bytes());

            // Delete it in a commit of its own.
            db.delete(k).await.unwrap();
            db.commit(None).await.unwrap();
            assert!(db.get(&k).await.unwrap().is_none());

            // The key must still be absent, and the root unchanged, after close + reopen.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert!(db.get(&k).await.unwrap().is_none());

            db.destroy().await.unwrap();
        });
    }
1231
    /// Checks that batches of uncommitted operations are rolled back when the db is dropped
    /// without syncing anything, and that re-applying and committing each batch afterwards
    /// yields the expected final state, which also survives a clean close + reopen.
    #[test_traced("WARN")]
    pub fn test_any_variable_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;
            let root = db.root(&mut hasher);

            // Batch 1 (inserts), never committed.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Drop the db without syncing anything: the root must revert to the empty root.
            db.simulate_failure(false, false, false, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            // Re-apply batch 1 and commit it this time.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);

            // Batch 2 (update every 3rd key), never committed.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Failure again: the root must revert to the one committed after batch 1.
            db.simulate_failure(false, false, false, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            // Re-apply batch 2 and commit.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);

            // Batch 3 (delete keys with index 1 mod 7), never committed.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Failure again: the root must revert to the one committed after batch 2.
            db.simulate_failure(false, false, false, 0).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            // Re-apply batch 3 and commit.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit(None).await.unwrap();

            // The log, MMR, and locations journal must agree on the final size.
            let root = db.root(&mut hasher);
            assert_eq!(db.op_count(), 2787);
            assert_eq!(leaf_pos_to_num(db.mmr.size()), Some(2787));
            assert_eq!(db.locations.size().await.unwrap(), 2787);
            assert_eq!(db.inactivity_floor_loc, 1480);
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            // Pruning is section-aligned (7 items per section), hence 1477 rather than 1480.
            assert_eq!(db.oldest_retained_loc().unwrap(), 1477);
            assert_eq!(db.snapshot.items(), 857);
            db.close().await.unwrap();

            // Everything must be identical after a clean close + reopen.
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert_eq!(db.op_count(), 2787);
            assert_eq!(leaf_pos_to_num(db.mmr.size()), Some(2787));
            assert_eq!(db.locations.size().await.unwrap(), 2787);
            assert_eq!(db.inactivity_floor_loc, 1480);
            assert_eq!(db.oldest_retained_loc().unwrap(), 1477);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
1336
1337 #[test_traced("WARN")]
1340 fn test_any_variable_non_empty_db_recovery() {
1341 let executor = deterministic::Runner::default();
1342 executor.start(|context| async move {
1343 let mut hasher = Standard::<Sha256>::new();
1344 let mut db = open_db(context.clone()).await;
1345
1346 for i in 0u64..1000 {
1348 let k = Sha256::hash(&i.to_be_bytes());
1349 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
1350 db.update(k, v).await.unwrap();
1351 }
1352 db.commit(None).await.unwrap();
1353 db.prune(db.inactivity_floor_loc()).await.unwrap();
1354 let root = db.root(&mut hasher);
1355 let op_count = db.op_count();
1356 let inactivity_floor_loc = db.inactivity_floor_loc();
1357
1358 let mut db = open_db(context.clone()).await;
1360 assert_eq!(db.op_count(), op_count);
1361 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1362 assert_eq!(db.root(&mut hasher), root);
1363
1364 async fn apply_more_ops(
1365 db: &mut Any<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>,
1366 ) {
1367 for i in 0u64..1000 {
1368 let k = Sha256::hash(&i.to_be_bytes());
1369 let v = vec![(i % 255) as u8; ((i % 13) + 8) as usize];
1370 db.update(k, v).await.unwrap();
1371 }
1372 }
1373
1374 apply_more_ops(&mut db).await;
1376 db.simulate_failure(false, false, false, 0).await.unwrap();
1377 let mut db = open_db(context.clone()).await;
1378 assert_eq!(db.op_count(), op_count);
1379 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1380 assert_eq!(db.root(&mut hasher), root);
1381
1382 apply_more_ops(&mut db).await;
1384 db.simulate_failure(true, false, false, 10).await.unwrap();
1385 let mut db = open_db(context.clone()).await;
1386 assert_eq!(db.op_count(), op_count);
1387 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1388 assert_eq!(db.root(&mut hasher), root);
1389
1390 apply_more_ops(&mut db).await;
1392 db.simulate_failure(false, true, false, 0).await.unwrap();
1393 let mut db = open_db(context.clone()).await;
1394 assert_eq!(db.op_count(), op_count);
1395 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1396 assert_eq!(db.root(&mut hasher), root);
1397
1398 apply_more_ops(&mut db).await;
1400 db.simulate_failure(false, false, true, 0).await.unwrap();
1401 let mut db = open_db(context.clone()).await;
1402 assert_eq!(db.op_count(), op_count);
1403 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1404 assert_eq!(db.root(&mut hasher), root);
1405
1406 apply_more_ops(&mut db).await;
1408 db.simulate_failure(true, false, false, 0).await.unwrap();
1409 let mut db = open_db(context.clone()).await;
1410 assert_eq!(db.op_count(), op_count);
1411 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1412 assert_eq!(db.root(&mut hasher), root);
1413 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1414
1415 apply_more_ops(&mut db).await;
1417 db.simulate_failure(true, true, false, 0).await.unwrap();
1418 let mut db = open_db(context.clone()).await;
1419 assert_eq!(db.op_count(), op_count);
1420 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1421 assert_eq!(db.root(&mut hasher), root);
1422 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1423
1424 apply_more_ops(&mut db).await;
1426 db.simulate_failure(false, true, true, 0).await.unwrap();
1427 let mut db = open_db(context.clone()).await;
1428 assert_eq!(db.op_count(), op_count);
1429 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1430 assert_eq!(db.root(&mut hasher), root);
1431 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1432
1433 apply_more_ops(&mut db).await;
1435 apply_more_ops(&mut db).await;
1436 apply_more_ops(&mut db).await;
1437 let mut db = open_db(context.clone()).await;
1438 assert_eq!(db.op_count(), op_count);
1439 assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
1440 assert_eq!(db.root(&mut hasher), root);
1441
1442 apply_more_ops(&mut db).await;
1444 db.commit(None).await.unwrap();
1445 let db = open_db(context.clone()).await;
1446 assert!(db.op_count() > op_count);
1447 assert_ne!(db.inactivity_floor_loc(), inactivity_floor_loc);
1448 assert_ne!(db.root(&mut hasher), root);
1449
1450 db.destroy().await.unwrap();
1451 });
1452 }
1453
1454 #[test_traced("WARN")]
1457 fn test_any_variable_empty_db_recovery() {
1458 let executor = deterministic::Runner::default();
1459 executor.start(|context| async move {
1460 let mut hasher = Standard::<Sha256>::new();
1462 let db = open_db(context.clone()).await;
1463 let root = db.root(&mut hasher);
1464
1465 let mut db = open_db(context.clone()).await;
1467 assert_eq!(db.op_count(), 0);
1468 assert_eq!(db.root(&mut hasher), root);
1469
1470 async fn apply_ops(
1471 db: &mut Any<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>,
1472 ) {
1473 for i in 0u64..1000 {
1474 let k = Sha256::hash(&i.to_be_bytes());
1475 let v = vec![(i % 255) as u8; ((i % 13) + 8) as usize];
1476 db.update(k, v).await.unwrap();
1477 }
1478 }
1479
1480 apply_ops(&mut db).await;
1482 db.simulate_failure(false, false, false, 1).await.unwrap();
1483 let mut db = open_db(context.clone()).await;
1484 assert_eq!(db.op_count(), 0);
1485 assert_eq!(db.root(&mut hasher), root);
1486
1487 apply_ops(&mut db).await;
1489 db.simulate_failure(true, false, false, 0).await.unwrap();
1490 let mut db = open_db(context.clone()).await;
1491 assert_eq!(db.op_count(), 0);
1492 assert_eq!(db.root(&mut hasher), root);
1493
1494 apply_ops(&mut db).await;
1496 db.simulate_failure(false, true, false, 0).await.unwrap();
1497 let mut db = open_db(context.clone()).await;
1498 assert_eq!(db.op_count(), 0);
1499 assert_eq!(db.root(&mut hasher), root);
1500
1501 apply_ops(&mut db).await;
1503 apply_ops(&mut db).await;
1504 apply_ops(&mut db).await;
1505 let mut db = open_db(context.clone()).await;
1506 assert_eq!(db.op_count(), 0);
1507 assert_eq!(db.root(&mut hasher), root);
1508
1509 apply_ops(&mut db).await;
1511 db.commit(None).await.unwrap();
1512 let db = open_db(context.clone()).await;
1513 assert!(db.op_count() > 0);
1514 assert_ne!(db.root(&mut hasher), root);
1515
1516 db.destroy().await.unwrap();
1517 });
1518 }
1519}