1use crate::{
328 index::Factory as IndexFactory,
329 journal::{
330 authenticated::Inner,
331 contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
332 },
333 merkle::{self, full::Config as MerkleConfig, Location},
334 qmdb::{
335 self,
336 any::{
337 self,
338 operation::{Operation, Update},
339 Config as AnyConfig,
340 },
341 bitmap::Shared,
342 operation::Committable,
343 },
344 translator::Translator,
345 Context,
346};
347use commonware_codec::{CodecShared, FixedSize};
348use commonware_cryptography::Hasher;
349use commonware_parallel::Strategy;
350use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex};
351use std::sync::Arc;
352
353pub mod batch;
354pub mod db;
355pub mod grafting;
356
357pub mod ordered;
358pub mod proof;
359pub(crate) mod sync;
360pub mod unordered;
361
362use self::db::Metrics;
363
364#[derive(Clone)]
366pub struct Config<T: Translator, J, S: Strategy> {
367 pub merkle_config: MerkleConfig<S>,
369
370 pub journal_config: J,
372
373 pub grafted_metadata_partition: String,
375
376 pub translator: T,
378}
379
380impl<T: Translator, J, S: Strategy> From<Config<T, J, S>> for AnyConfig<T, J, S> {
381 fn from(cfg: Config<T, J, S>) -> Self {
382 Self {
383 merkle_config: cfg.merkle_config,
384 journal_config: cfg.journal_config,
385 translator: cfg.translator,
386 }
387 }
388}
389
390pub type FixedConfig<T, S> = Config<T, FConfig, S>;
392
393pub type VariableConfig<T, C, S> = Config<T, VConfig<C>, S>;
395
396pub(super) async fn init<F, E, U, H, T, I, J, const N: usize, S>(
398 context: E,
399 config: Config<T, J::Config, S>,
400) -> Result<db::Db<F, E, J, I, H, U, N, S>, crate::qmdb::Error<F>>
401where
402 F: merkle::Graftable,
403 E: Context,
404 U: Update + Send + Sync,
405 H: Hasher,
406 T: Translator,
407 I: IndexFactory<T, Value = Location<F>>,
408 J: Inner<E, Item = Operation<F, U>>,
409 S: Strategy,
410 Operation<F, U>: Committable + CodecShared,
411{
412 const {
414 assert!(
418 N.is_multiple_of(H::Digest::SIZE),
419 "chunk size must be some multiple of the digest size",
420 );
421 assert!(N.is_power_of_two(), "chunk size must be a power of 2");
424 }
425
426 let strategy = config.merkle_config.strategy.clone();
427 let metadata_partition = config.grafted_metadata_partition.clone();
428
429 let (metadata, pruned_chunks, pinned_nodes) =
431 db::init_metadata(context.child("metadata"), &metadata_partition).await?;
432
433 let bitmap = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
437 .map_err(|_| crate::qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;
438 let bitmap = Arc::new(Shared::<N>::new(bitmap));
439
440 let any = any::init_with_bitmap(context.child("any"), config.into(), Some(bitmap)).await?;
441
442 let hasher = qmdb::hasher::<H>();
444 let ops_size = any.log.merkle.size();
445 let ops_leaves = crate::merkle::Location::<F>::try_from(ops_size)?;
446 let grafted_tree = db::build_grafted_tree::<F, H, S, N>(
447 &hasher,
448 any.bitmap.as_ref(),
449 &pinned_nodes,
450 &any.log.merkle,
451 ops_leaves,
452 &strategy,
453 )
454 .await?;
455
456 let storage = grafting::Storage::new(
458 &grafted_tree,
459 grafting::height::<N>(),
460 &any.log.merkle,
461 hasher.clone(),
462 );
463 let partial_chunk = db::partial_chunk(any.bitmap.as_ref());
464 let ops_root = any.root();
465 let root = db::compute_db_root(
466 &hasher,
467 any.bitmap.as_ref(),
468 &storage,
469 ops_leaves,
470 partial_chunk,
471 any.inactivity_floor_loc,
472 &ops_root,
473 )
474 .await?;
475
476 let metrics = Metrics::new(context);
477 let db = db::Db {
478 any,
479 grafted_tree,
480 metadata: AsyncMutex::new(metadata),
481 strategy,
482 root,
483 metrics,
484 };
485 db.update_metrics();
486 Ok(db)
487}
488
489#[cfg(any(test, feature = "test-traits"))]
491pub trait BitmapPrunedBits {
492 fn pruned_bits(&self) -> u64;
494
495 fn get_bit(&self, index: u64) -> bool;
497
498 fn oldest_retained(&self) -> impl core::future::Future<Output = u64> + Send;
500}
501
502#[cfg(test)]
503pub mod tests {
504 pub use super::BitmapPrunedBits;
507 use super::{ordered, unordered, FConfig, FixedConfig, MerkleConfig, VConfig, VariableConfig};
508 use crate::{
509 merkle::{self, mmb, mmr, Bagging::ForwardFold},
510 qmdb::{
511 self,
512 any::{
513 test::colliding_digest,
514 traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
515 },
516 store::tests::{TestKey, TestValue},
517 },
518 translator::Translator,
519 };
520 use commonware_parallel::Sequential;
521 use commonware_runtime::{
522 buffer::paged::CacheRef,
523 deterministic::{self, Context},
524 BufferPooler, Runner as _, Supervisor as _,
525 };
526 use commonware_utils::{bitmap::Readable, NZUsize, NZU16, NZU64};
527 use core::future::Future;
528 use rand::{rngs::StdRng, RngCore, SeedableRng};
529 use std::{
530 num::{NonZeroU16, NonZeroUsize},
531 sync::Arc,
532 };
533 use tracing::warn;
534
535 type Error<F> = crate::qmdb::Error<F>;
536 type Location<F> = merkle::Location<F>;
537 type WriteVec<F, C> = Vec<(<C as DbAny<F>>::Key, Option<<C as DbAny<F>>::Value>)>;
538
539 const PAGE_SIZE: NonZeroU16 = NZU16!(88);
541 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8);
542
543 pub(crate) fn fixed_config<T: Translator + Default>(
545 partition_prefix: &str,
546 pooler: &impl BufferPooler,
547 ) -> FixedConfig<T, Sequential> {
548 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
549 FixedConfig {
550 merkle_config: MerkleConfig {
551 journal_partition: format!("{partition_prefix}-journal-partition"),
552 metadata_partition: format!("{partition_prefix}-metadata-partition"),
553 items_per_blob: NZU64!(11),
554 write_buffer: NZUsize!(1024),
555 strategy: Sequential,
556 page_cache: page_cache.clone(),
557 },
558 journal_config: FConfig {
559 partition: format!("{partition_prefix}-partition-prefix"),
560 items_per_blob: NZU64!(7),
561 page_cache,
562 write_buffer: NZUsize!(1024),
563 },
564 grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
565 translator: T::default(),
566 }
567 }
568
569 pub(crate) fn variable_config<T: Translator + Default>(
571 partition_prefix: &str,
572 pooler: &impl BufferPooler,
573 ) -> VariableConfig<T, ((), ()), Sequential> {
574 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
575 VariableConfig {
576 merkle_config: MerkleConfig {
577 journal_partition: format!("{partition_prefix}-journal-partition"),
578 metadata_partition: format!("{partition_prefix}-metadata-partition"),
579 items_per_blob: NZU64!(11),
580 write_buffer: NZUsize!(1024),
581 strategy: Sequential,
582 page_cache: page_cache.clone(),
583 },
584 journal_config: VConfig {
585 partition: format!("{partition_prefix}-partition-prefix"),
586 items_per_section: NZU64!(7),
587 compression: None,
588 codec_config: ((), ()),
589 page_cache,
590 write_buffer: NZUsize!(1024),
591 },
592 grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
593 translator: T::default(),
594 }
595 }
596
597 async fn commit_writes<F: merkle::Graftable, C: DbAny<F>>(
599 db: &mut C,
600 writes: impl IntoIterator<Item = (C::Key, Option<<C as DbAny<F>>::Value>)>,
601 ) -> Result<(), Error<F>> {
602 let mut batch = db.new_batch();
603 for (k, v) in writes {
604 batch = batch.write(k, v);
605 }
606 let merkleized = batch.merkleize(db, None).await?;
607 db.apply_batch(merkleized).await?;
608 db.commit().await?;
609 Ok(())
610 }
611
612 async fn apply_random_ops_inner<F, C>(
617 num_elements: u64,
618 commit_changes: bool,
619 rng_seed: u64,
620 mut db: C,
621 ) -> Result<C, Error<F>>
622 where
623 F: merkle::Graftable,
624 C: DbAny<F>,
625 C::Key: TestKey,
626 <C as DbAny<F>>::Value: TestValue,
627 {
628 warn!("rng_seed={}", rng_seed);
630 let mut rng = StdRng::seed_from_u64(rng_seed);
631
632 let writes: Vec<_> = (0u64..num_elements)
634 .map(|i| {
635 let k = TestKey::from_seed(i);
636 let v = TestValue::from_seed(rng.next_u64());
637 (k, Some(v))
638 })
639 .collect();
640 if commit_changes {
641 commit_writes(&mut db, writes).await?;
642 }
643
644 let mut pending: WriteVec<F, C> = Vec::new();
647 for _ in 0u64..num_elements * 10 {
648 let rand_key = TestKey::from_seed(rng.next_u64() % num_elements);
649 if rng.next_u32() % 7 == 0 {
650 pending.push((rand_key, None));
651 continue;
652 }
653 let v = TestValue::from_seed(rng.next_u64());
654 pending.push((rand_key, Some(v)));
655 if commit_changes && rng.next_u32() % 20 == 0 {
656 commit_writes(&mut db, pending.drain(..)).await?;
657 }
658 }
659 if commit_changes {
660 commit_writes(&mut db, pending).await?;
661 }
662 Ok(db)
663 }
664
665 pub fn apply_random_ops<F, C>(
666 num_elements: u64,
667 commit_changes: bool,
668 rng_seed: u64,
669 db: C,
670 ) -> std::pin::Pin<Box<dyn Future<Output = Result<C, Error<F>>>>>
671 where
672 F: merkle::Graftable + 'static,
673 C: DbAny<F> + 'static,
674 C::Key: TestKey,
675 <C as DbAny<F>>::Value: TestValue,
676 {
677 Box::pin(apply_random_ops_inner::<F, C>(
678 num_elements,
679 commit_changes,
680 rng_seed,
681 db,
682 ))
683 }
684
685 pub fn test_build_random_close_reopen<M, C, F, Fut>(mut open_db: F)
690 where
691 M: merkle::Graftable + 'static,
692 C: DbAny<M> + 'static,
693 C::Key: TestKey,
694 <C as DbAny<M>>::Value: TestValue,
695 F: FnMut(Context, String) -> Fut + Clone,
696 Fut: Future<Output = C>,
697 {
698 const ELEMENTS: u64 = 1000;
699
700 let executor = deterministic::Runner::default();
701 let mut open_db_clone = open_db.clone();
702 let state1 = executor.start(|mut context| async move {
703 let partition = "build-random".to_string();
704 let rng_seed = context.next_u64();
705 let mut db: C = open_db_clone(context.child("first"), partition.clone()).await;
706 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
707 .await
708 .unwrap();
709 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
710 db.apply_batch(merkleized).await.unwrap();
711 db.sync().await.unwrap();
712
713 let root = db.root();
715 drop(db);
716 let db: C = open_db_clone(context.child("second"), partition).await;
717
718 assert_eq!(db.root(), root);
720
721 db.destroy().await.unwrap();
722 context.auditor().state()
723 });
724
725 let executor = deterministic::Runner::default();
727 let state2 = executor.start(|mut context| async move {
728 let partition = "build-random".to_string();
729 let rng_seed = context.next_u64();
730 let mut db: C = open_db(context.child("first"), partition.clone()).await;
731 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
732 .await
733 .unwrap();
734 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
735 db.apply_batch(merkleized).await.unwrap();
736 db.sync().await.unwrap();
737
738 let root = db.root();
739 drop(db);
740 let db: C = open_db(context.child("second"), partition).await;
741 assert_eq!(db.root(), root);
742
743 db.destroy().await.unwrap();
744 context.auditor().state()
745 });
746
747 assert_eq!(state1, state2);
748 }
749
750 pub fn test_simulate_write_failures<M, C, F, Fut>(mut open_db: F)
755 where
756 M: merkle::Graftable + 'static,
757 C: DbAny<M> + 'static,
758 C::Key: TestKey,
759 <C as DbAny<M>>::Value: TestValue,
760 F: FnMut(Context, String) -> Fut + Clone,
761 Fut: Future<Output = C>,
762 {
763 const ELEMENTS: u64 = 1000;
764
765 let executor = deterministic::Runner::default();
766 executor.start(|mut context| {
768 Box::pin(async move {
769 let partition = "build-random-fail-commit".to_string();
770 let rng_seed = context.next_u64();
771 let mut db: C = open_db(context.child("first"), partition.clone()).await;
772 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
773 .await
774 .unwrap();
775 commit_writes(&mut db, []).await.unwrap();
776 let committed_root = db.root();
777 let committed_op_count = db.bounds().await.end;
778 db.prune(db.sync_boundary().await).await.unwrap();
779
780 let db = apply_random_ops::<M, C>(ELEMENTS, false, rng_seed + 1, db)
782 .await
783 .unwrap();
784
785 drop(db);
788 let db: C = open_db(
789 context.child("scenario").with_attribute("index", 1),
790 partition.clone(),
791 )
792 .await;
793 assert_eq!(db.root(), committed_root);
794 assert_eq!(db.bounds().await.end, committed_op_count);
795
796 let db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
798 .await
799 .unwrap();
800
801 let committed_op_count = db.bounds().await.end;
805 drop(db);
806
807 let db: C = open_db(
810 context.child("scenario").with_attribute("index", 2),
811 partition.clone(),
812 )
813 .await;
814 let scenario_2_root = db.root();
815
816 let fresh_partition = "build-random-fail-commit-fresh".to_string();
819 let mut db: C = open_db(context.child("fresh"), fresh_partition.clone()).await;
820 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
821 .await
822 .unwrap();
823 commit_writes(&mut db, []).await.unwrap();
824 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
825 .await
826 .unwrap();
827 db.prune(db.sync_boundary().await).await.unwrap();
828 assert_eq!(db.bounds().await.end, committed_op_count);
830 assert_eq!(db.root(), scenario_2_root);
831
832 db.destroy().await.unwrap();
833 })
834 });
835 }
836
837 pub fn test_different_pruning_delays_same_root<M, C, F, Fut>(mut open_db: F)
842 where
843 M: merkle::Graftable,
844 C: DbAny<M>,
845 C::Key: TestKey,
846 <C as DbAny<M>>::Value: TestValue,
847 F: FnMut(Context, String) -> Fut + Clone,
848 Fut: Future<Output = C>,
849 {
850 const NUM_OPERATIONS: u64 = 1000;
851
852 let executor = deterministic::Runner::default();
853 let mut open_db_clone = open_db.clone();
854 executor.start(|context| async move {
855 let mut db_no_pruning: C =
857 open_db_clone(context.child("no_pruning"), "no-pruning-test".into()).await;
858 let mut db_pruning: C = open_db(context.child("pruning"), "pruning-test".into()).await;
859
860 let mut pending_no_pruning: WriteVec<M, C> = Vec::new();
863 let mut pending_pruning: WriteVec<M, C> = Vec::new();
864 for i in 0..NUM_OPERATIONS {
865 let key: C::Key = TestKey::from_seed(i);
866 let value: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
867
868 pending_no_pruning.push((key, Some(value.clone())));
869 pending_pruning.push((key, Some(value)));
870
871 if i % 50 == 49 {
873 commit_writes(&mut db_no_pruning, pending_no_pruning.drain(..))
874 .await
875 .unwrap();
876 commit_writes(&mut db_pruning, pending_pruning.drain(..))
877 .await
878 .unwrap();
879 db_pruning
880 .prune(db_no_pruning.sync_boundary().await)
881 .await
882 .unwrap();
883 }
884 }
885
886 commit_writes(&mut db_no_pruning, pending_no_pruning)
888 .await
889 .unwrap();
890 commit_writes(&mut db_pruning, pending_pruning)
891 .await
892 .unwrap();
893
894 let root_no_pruning = db_no_pruning.root();
896 let root_pruning = db_pruning.root();
897 assert_eq!(root_no_pruning, root_pruning);
898
899 assert_eq!(
901 db_no_pruning.inactivity_floor_loc().await,
902 db_pruning.inactivity_floor_loc().await
903 );
904
905 db_no_pruning.destroy().await.unwrap();
906 db_pruning.destroy().await.unwrap();
907 });
908 }
909
910 pub fn test_sync_persists_bitmap_pruning_boundary<M, C, F, Fut>(mut open_db: F)
916 where
917 M: merkle::Graftable + 'static,
918 C: DbAny<M> + BitmapPrunedBits + 'static,
919 C::Key: TestKey,
920 <C as DbAny<M>>::Value: TestValue,
921 F: FnMut(Context, String) -> Fut + Clone,
922 Fut: Future<Output = C>,
923 {
924 const ELEMENTS: u64 = 500;
925
926 let executor = deterministic::Runner::default();
927 let mut open_db_clone = open_db.clone();
928 executor.start(|mut context| async move {
929 let partition = "sync-bitmap-pruning".to_string();
930 let rng_seed = context.next_u64();
931 let mut db: C = open_db_clone(context.child("first"), partition.clone()).await;
932
933 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db).await.unwrap();
935 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
936 db.apply_batch(merkleized).await.unwrap();
937
938 db.prune(db.sync_boundary().await).await.unwrap();
940
941 let pruned_bits_before = db.pruned_bits();
942 warn!(
943 "pruned_bits_before={}, inactivity_floor={}, op_count={}",
944 pruned_bits_before,
945 *db.inactivity_floor_loc().await,
946 *db.bounds().await.end
947 );
948
949 assert!(
951 pruned_bits_before > 0,
952 "Expected bitmap to have pruned bits after prune()"
953 );
954
955 db.sync().await.unwrap();
957
958 let root_before = db.root();
960 drop(db);
961
962 let db: C = open_db(context.child("second"), partition).await;
964
965 let pruned_bits_after = db.pruned_bits();
968 warn!("pruned_bits_after={}", pruned_bits_after);
969
970 assert_eq!(
971 pruned_bits_after, pruned_bits_before,
972 "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
973 );
974
975 assert_eq!(db.root(), root_before);
977
978 db.destroy().await.unwrap();
979 });
980 }
981
982 pub fn test_current_db_build_big<M, C, F, Fut>(mut open_db: F)
988 where
989 M: merkle::Graftable,
990 C: DbAny<M>,
991 C::Key: TestKey,
992 <C as DbAny<M>>::Value: TestValue,
993 F: FnMut(Context, String) -> Fut + Clone,
994 Fut: Future<Output = C>,
995 {
996 const ELEMENTS: u64 = 1000;
997
998 let executor = deterministic::Runner::default();
999 let mut open_db_clone = open_db.clone();
1000 executor.start(|context| async move {
1001 let mut db: C = open_db_clone(context.child("first"), "build-big".into()).await;
1002
1003 let mut map = std::collections::HashMap::<C::Key, <C as DbAny<M>>::Value>::default();
1004
1005 let mut batch = db.new_batch();
1007
1008 for i in 0u64..ELEMENTS {
1010 let k: C::Key = TestKey::from_seed(i);
1011 let v: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
1012 batch = batch.write(k, Some(v.clone()));
1013 map.insert(k, v);
1014 }
1015
1016 for i in 0u64..ELEMENTS {
1018 if i % 3 != 0 {
1019 continue;
1020 }
1021 let k: C::Key = TestKey::from_seed(i);
1022 let v: <C as DbAny<M>>::Value = TestValue::from_seed((i + 1) * 10000);
1023 batch = batch.write(k, Some(v.clone()));
1024 map.insert(k, v);
1025 }
1026
1027 for i in 0u64..ELEMENTS {
1029 if i % 7 != 1 {
1030 continue;
1031 }
1032 let k: C::Key = TestKey::from_seed(i);
1033 batch = batch.write(k, None);
1034 map.remove(&k);
1035 }
1036
1037 let merkleized = batch.merkleize(&db, None).await.unwrap();
1038 db.apply_batch(merkleized).await.unwrap();
1039
1040 db.sync().await.unwrap();
1042 db.prune(db.sync_boundary().await).await.unwrap();
1043
1044 let root = db.root();
1046 db.sync().await.unwrap();
1047 drop(db);
1048
1049 let db: C = open_db(context.child("second"), "build-big".into()).await;
1051 assert_eq!(root, db.root());
1052
1053 for i in 0u64..ELEMENTS {
1055 let k: C::Key = TestKey::from_seed(i);
1056 if let Some(map_value) = map.get(&k) {
1057 let Some(db_value) = db.get(&k).await.unwrap() else {
1058 panic!("key not found in db: {k}");
1059 };
1060 assert_eq!(*map_value, db_value);
1061 } else {
1062 assert!(db.get(&k).await.unwrap().is_none());
1063 }
1064 }
1065 });
1066 }
1067
1068 pub fn test_stale_batch_side_effect_free<M, C, F, Fut>(mut open_db: F)
1072 where
1073 M: merkle::Graftable,
1074 C: DbAny<M>,
1075 C::Key: TestKey,
1076 <C as DbAny<M>>::Value: TestValue,
1077 F: FnMut(Context, String) -> Fut,
1078 Fut: Future<Output = C>,
1079 {
1080 let executor = deterministic::Runner::default();
1081 executor.start(|context| async move {
1082 let mut db: C = open_db(context.child("db"), "stale-side-effect-free".into()).await;
1083
1084 let key1 = <C::Key as TestKey>::from_seed(1);
1085 let key2 = <C::Key as TestKey>::from_seed(2);
1086 let value1 = <<C as DbAny<M>>::Value as TestValue>::from_seed(10);
1087 let value2 = <<C as DbAny<M>>::Value as TestValue>::from_seed(20);
1088
1089 let mut batch = db.new_batch();
1090 batch = batch.write(key1, Some(value1.clone()));
1091 let batch_a = batch.merkleize(&db, None).await.unwrap();
1092 let mut batch = db.new_batch();
1093 batch = batch.write(key2, Some(value2));
1094 let batch_b = batch.merkleize(&db, None).await.unwrap();
1095
1096 db.apply_batch(batch_a).await.unwrap();
1097 let expected_root = db.root();
1098 let expected_bounds = db.bounds().await;
1099 let expected_metadata = db.get_metadata().await.unwrap();
1100 assert_eq!(db.get(&key1).await.unwrap(), Some(value1.clone()));
1101 assert_eq!(db.get(&key2).await.unwrap(), None);
1102
1103 let result = db.apply_batch(batch_b).await;
1104 assert!(
1105 matches!(result, Err(Error::StaleBatch { .. })),
1106 "expected StaleBatch error, got {result:?}"
1107 );
1108 assert_eq!(db.root(), expected_root);
1109 assert_eq!(db.bounds().await, expected_bounds);
1110 assert_eq!(db.get_metadata().await.unwrap(), expected_metadata);
1111 assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1112 assert_eq!(db.get(&key2).await.unwrap(), None);
1113
1114 db.destroy().await.unwrap();
1115 });
1116 }
1117
1118 use crate::translator::OneCap;
1119 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
1120 use commonware_macros::{test_group, test_traced};
1121
1122 type OrderedFixedDb =
1123 ordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1124 type OrderedVariableDb =
1125 ordered::variable::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1126 type UnorderedFixedDb =
1127 unordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1128 type UnorderedVariableDb = unordered::variable::Db<
1129 mmr::Family,
1130 Context,
1131 Digest,
1132 Digest,
1133 Sha256,
1134 OneCap,
1135 32,
1136 Sequential,
1137 >;
1138 type OrderedFixedP1Db = ordered::fixed::partitioned::Db<
1139 mmr::Family,
1140 Context,
1141 Digest,
1142 Digest,
1143 Sha256,
1144 OneCap,
1145 1,
1146 32,
1147 Sequential,
1148 >;
1149 type OrderedVariableP1Db = ordered::variable::partitioned::Db<
1150 mmr::Family,
1151 Context,
1152 Digest,
1153 Digest,
1154 Sha256,
1155 OneCap,
1156 1,
1157 32,
1158 Sequential,
1159 >;
1160 type UnorderedFixedP1Db = unordered::fixed::partitioned::Db<
1161 mmr::Family,
1162 Context,
1163 Digest,
1164 Digest,
1165 Sha256,
1166 OneCap,
1167 1,
1168 32,
1169 Sequential,
1170 >;
1171 type UnorderedVariableP1Db = unordered::variable::partitioned::Db<
1172 mmr::Family,
1173 Context,
1174 Digest,
1175 Digest,
1176 Sha256,
1177 OneCap,
1178 1,
1179 32,
1180 Sequential,
1181 >;
1182 type OrderedFixedP2Db = ordered::fixed::partitioned::Db<
1183 mmr::Family,
1184 Context,
1185 Digest,
1186 Digest,
1187 Sha256,
1188 OneCap,
1189 2,
1190 32,
1191 Sequential,
1192 >;
1193 type OrderedVariableP2Db = ordered::variable::partitioned::Db<
1194 mmr::Family,
1195 Context,
1196 Digest,
1197 Digest,
1198 Sha256,
1199 OneCap,
1200 2,
1201 32,
1202 Sequential,
1203 >;
1204 type UnorderedFixedP2Db = unordered::fixed::partitioned::Db<
1205 mmr::Family,
1206 Context,
1207 Digest,
1208 Digest,
1209 Sha256,
1210 OneCap,
1211 2,
1212 32,
1213 Sequential,
1214 >;
1215 type UnorderedVariableP2Db = unordered::variable::partitioned::Db<
1216 mmr::Family,
1217 Context,
1218 Digest,
1219 Digest,
1220 Sha256,
1221 OneCap,
1222 2,
1223 32,
1224 Sequential,
1225 >;
1226
1227 type OrderedFixedMmbDb =
1228 ordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1229 type OrderedVariableMmbDb =
1230 ordered::variable::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1231 type UnorderedFixedMmbDb =
1232 unordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1233 type UnorderedVariableMmbDb = unordered::variable::Db<
1234 mmb::Family,
1235 Context,
1236 Digest,
1237 Digest,
1238 Sha256,
1239 OneCap,
1240 32,
1241 Sequential,
1242 >;
1243 type OrderedFixedMmbP1Db = ordered::fixed::partitioned::Db<
1244 mmb::Family,
1245 Context,
1246 Digest,
1247 Digest,
1248 Sha256,
1249 OneCap,
1250 1,
1251 32,
1252 Sequential,
1253 >;
1254 type OrderedVariableMmbP1Db = ordered::variable::partitioned::Db<
1255 mmb::Family,
1256 Context,
1257 Digest,
1258 Digest,
1259 Sha256,
1260 OneCap,
1261 1,
1262 32,
1263 Sequential,
1264 >;
1265 type UnorderedFixedMmbP1Db = unordered::fixed::partitioned::Db<
1266 mmb::Family,
1267 Context,
1268 Digest,
1269 Digest,
1270 Sha256,
1271 OneCap,
1272 1,
1273 32,
1274 Sequential,
1275 >;
1276 type UnorderedVariableMmbP1Db = unordered::variable::partitioned::Db<
1277 mmb::Family,
1278 Context,
1279 Digest,
1280 Digest,
1281 Sha256,
1282 OneCap,
1283 1,
1284 32,
1285 Sequential,
1286 >;
1287 type OrderedFixedMmbP2Db = ordered::fixed::partitioned::Db<
1288 mmb::Family,
1289 Context,
1290 Digest,
1291 Digest,
1292 Sha256,
1293 OneCap,
1294 2,
1295 32,
1296 Sequential,
1297 >;
1298 type OrderedVariableMmbP2Db = ordered::variable::partitioned::Db<
1299 mmb::Family,
1300 Context,
1301 Digest,
1302 Digest,
1303 Sha256,
1304 OneCap,
1305 2,
1306 32,
1307 Sequential,
1308 >;
1309 type UnorderedFixedMmbP2Db = unordered::fixed::partitioned::Db<
1310 mmb::Family,
1311 Context,
1312 Digest,
1313 Digest,
1314 Sha256,
1315 OneCap,
1316 2,
1317 32,
1318 Sequential,
1319 >;
1320 type UnorderedVariableMmbP2Db = unordered::variable::partitioned::Db<
1321 mmb::Family,
1322 Context,
1323 Digest,
1324 Digest,
1325 Sha256,
1326 OneCap,
1327 2,
1328 32,
1329 Sequential,
1330 >;
1331
1332 macro_rules! open_db_fn {
1334 ($db:ty, $cfg:ident) => {
1335 |ctx: Context, partition: String| async move {
1336 <$db>::init(ctx.child("storage"), $cfg::<OneCap>(&partition, &ctx))
1337 .await
1338 .unwrap()
1339 }
1340 };
1341 }
1342
1343 macro_rules! with_all_variants {
1345 ($cb:ident!($($args:tt)*)) => {
1346 $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
1347 $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
1348 $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
1349 $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
1350 $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
1351 $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
1352 $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
1353 $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
1354 $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
1355 $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
1356 $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
1357 $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
1358 $cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
1359 $cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
1360 $cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
1361 $cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
1362 $cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
1363 $cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
1364 $cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
1365 $cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
1366 $cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
1367 $cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
1368 $cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
1369 $cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
1370 };
1371 }
1372
1373 macro_rules! with_ordered_variants {
1375 ($cb:ident!($($args:tt)*)) => {
1376 $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
1377 $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
1378 $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
1379 $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
1380 $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
1381 $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
1382 $cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
1383 $cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
1384 $cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
1385 $cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
1386 $cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
1387 $cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
1388 };
1389 }
1390
1391 macro_rules! with_unordered_variants {
1393 ($cb:ident!($($args:tt)*)) => {
1394 $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
1395 $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
1396 $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
1397 $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
1398 $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
1399 $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
1400 $cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
1401 $cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
1402 $cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
1403 $cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
1404 $cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
1405 $cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
1406 };
1407 }
1408
1409 macro_rules! test_simple {
1411 ($f:expr, $l:literal, $db:ty, $cfg:ident) => {
1412 Box::pin(async {
1413 $f(open_db_fn!($db, $cfg));
1414 })
1415 .await
1416 };
1417 }
1418
1419 macro_rules! for_all_variants {
1421 (simple: $f:expr) => {{
1422 with_all_variants!(test_simple!($f));
1423 }};
1424 (ordered: $f:expr) => {{
1425 with_ordered_variants!(test_simple!($f));
1426 }};
1427 (unordered: $f:expr) => {{
1428 with_unordered_variants!(test_simple!($f));
1429 }};
1430 }
1431
1432 fn test_ordered_build_big<M, C, F, Fut>(open_db: F)
1434 where
1435 M: merkle::Graftable,
1436 C: DbAny<M>,
1437 C::Key: TestKey,
1438 <C as DbAny<M>>::Value: TestValue,
1439 F: FnMut(Context, String) -> Fut + Clone,
1440 Fut: Future<Output = C>,
1441 {
1442 test_current_db_build_big::<M, C, F, Fut>(open_db);
1443 }
1444
1445 fn test_unordered_build_big<M, C, F, Fut>(open_db: F)
1446 where
1447 M: merkle::Graftable,
1448 C: DbAny<M>,
1449 C::Key: TestKey,
1450 <C as DbAny<M>>::Value: TestValue,
1451 F: FnMut(Context, String) -> Fut + Clone,
1452 Fut: Future<Output = C>,
1453 {
1454 test_current_db_build_big::<M, C, F, Fut>(open_db);
1455 }
1456
1457 #[test_group("slow")]
1458 #[test_traced("WARN")]
1459 fn test_all_variants_build_random_close_reopen() {
1460 let executor = deterministic::Runner::default();
1461 executor.start(|_context| async move {
1462 for_all_variants!(simple: test_build_random_close_reopen);
1463 });
1464 }
1465
1466 #[test_group("slow")]
1467 #[test_traced("WARN")]
1468 fn test_all_variants_simulate_write_failures() {
1469 let executor = deterministic::Runner::default();
1470 executor.start(|_context| async move {
1471 for_all_variants!(simple: test_simulate_write_failures);
1472 });
1473 }
1474
1475 #[test_group("slow")]
1476 #[test_traced("WARN")]
1477 fn test_all_variants_different_pruning_delays_same_root() {
1478 let executor = deterministic::Runner::default();
1479 executor.start(|_context| async move {
1480 for_all_variants!(simple: test_different_pruning_delays_same_root);
1481 });
1482 }
1483
1484 #[test_group("slow")]
1485 #[test_traced("WARN")]
1486 fn test_all_variants_sync_persists_bitmap_pruning_boundary() {
1487 let executor = deterministic::Runner::default();
1488 executor.start(|_context| async move {
1489 for_all_variants!(simple: test_sync_persists_bitmap_pruning_boundary);
1490 });
1491 }
1492
1493 #[test_traced("WARN")]
1494 fn test_all_variants_stale_batch_side_effect_free() {
1495 let executor = deterministic::Runner::default();
1496 executor.start(|_context| async move {
1497 for_all_variants!(simple: test_stale_batch_side_effect_free);
1498 });
1499 }
1500
1501 #[test_group("slow")]
1502 #[test_traced("WARN")]
1503 fn test_ordered_variants_build_big() {
1504 let executor = deterministic::Runner::default();
1505 executor.start(|_context| async move {
1506 for_all_variants!(ordered: test_ordered_build_big);
1507 });
1508 }
1509
1510 #[test_group("slow")]
1511 #[test_traced("WARN")]
1512 fn test_unordered_variants_build_big() {
1513 let executor = deterministic::Runner::default();
1514 executor.start(|_context| async move {
1515 for_all_variants!(unordered: test_unordered_build_big);
1516 });
1517 }
1518
1519 #[test_group("slow")]
1520 #[test_traced("DEBUG")]
1521 fn test_ordered_variants_build_small_close_reopen() {
1522 let executor = deterministic::Runner::default();
1523 executor.start(|_context| async move {
1524 for_all_variants!(ordered: ordered::tests::test_build_small_close_reopen);
1525 });
1526 }
1527
1528 #[test_group("slow")]
1529 #[test_traced("DEBUG")]
1530 fn test_unordered_variants_build_small_close_reopen() {
1531 let executor = deterministic::Runner::default();
1532 executor.start(|_context| async move {
1533 for_all_variants!(unordered: unordered::tests::test_build_small_close_reopen);
1534 });
1535 }
1536
1537 fn key(i: u64) -> Digest {
1544 Sha256::hash(&i.to_be_bytes())
1545 }
1546
1547 fn val(i: u64) -> Digest {
1548 Sha256::hash(&(i + 10000).to_be_bytes())
1549 }
1550
1551 async fn mmb_commit(
1552 db: &mut UnorderedVariableMmbDb,
1553 writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1554 ) {
1555 let mut batch = db.new_batch();
1556 for (k, v) in writes {
1557 batch = batch.write(k, v);
1558 }
1559 let merkleized = batch.merkleize(db, None).await.unwrap();
1560 db.apply_batch(merkleized).await.unwrap();
1561 db.commit().await.unwrap();
1562 }
1563
1564 async fn commit_writes_with_metadata(
1565 db: &mut UnorderedVariableDb,
1566 writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1567 metadata: Option<Digest>,
1568 ) -> std::ops::Range<Location<mmr::Family>> {
1569 let mut batch = db.new_batch();
1570 for (k, v) in writes {
1571 batch = batch.write(k, v);
1572 }
1573 let merkleized = batch.merkleize(db, metadata).await.unwrap();
1574 let range = db.apply_batch(merkleized).await.unwrap();
1575 db.commit().await.unwrap();
1576 range
1577 }
1578
1579 #[test_traced("INFO")]
1580 fn test_current_rewind_recovery() {
1581 let executor = deterministic::Runner::default();
1582 executor.start(|context| async move {
1583 let partition = "current-rewind-recovery";
1584 let ctx = context.child("db");
1585 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
1586 ctx.child("storage"),
1587 variable_config::<OneCap>(partition, &ctx),
1588 )
1589 .await
1590 .unwrap();
1591 let initial_size = db.bounds().await.end;
1592 let initial_root = db.root();
1593 let initial_ops_root = db.ops_root();
1594 let initial_floor = db.inactivity_floor_loc();
1595
1596 let metadata_a = val(900);
1597 let first_range = commit_writes_with_metadata(
1598 &mut db,
1599 [(key(0), Some(val(0))), (key(1), Some(val(1)))],
1600 Some(metadata_a),
1601 )
1602 .await;
1603 assert_eq!(first_range.start, initial_size);
1604 let size_before = db.bounds().await.end;
1605 let root_before = db.root();
1606 let ops_root_before = db.ops_root();
1607 let floor_before = db.inactivity_floor_loc();
1608 assert_eq!(size_before, first_range.end);
1609
1610 let metadata_b = val(901);
1611 let second_range = commit_writes_with_metadata(
1612 &mut db,
1613 [
1614 (key(0), Some(val(100))),
1615 (key(1), None),
1616 (key(2), Some(val(2))),
1617 ],
1618 Some(metadata_b),
1619 )
1620 .await;
1621 assert_eq!(second_range.start, size_before);
1622 assert_ne!(db.root(), root_before);
1623 assert_eq!(db.get_metadata().await.unwrap(), Some(val(901)));
1624 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1625 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1626 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
1627
1628 db.rewind(size_before).await.unwrap();
1629 assert_eq!(db.bounds().await.end, size_before);
1630 assert_eq!(db.root(), root_before);
1631 assert_eq!(db.ops_root(), ops_root_before);
1632 assert_eq!(db.inactivity_floor_loc(), floor_before);
1633 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1634 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1635 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
1636 assert_eq!(db.get(&key(2)).await.unwrap(), None);
1637
1638 db.commit().await.unwrap();
1639 drop(db);
1640
1641 let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
1642 context.child("reopen"),
1643 variable_config::<OneCap>(partition, &context),
1644 )
1645 .await
1646 .unwrap();
1647 assert_eq!(reopened.bounds().await.end, size_before);
1648 assert_eq!(reopened.root(), root_before);
1649 assert_eq!(reopened.ops_root(), ops_root_before);
1650 assert_eq!(reopened.inactivity_floor_loc(), floor_before);
1651 assert_eq!(reopened.get_metadata().await.unwrap(), Some(val(900)));
1652 assert_eq!(reopened.get(&key(0)).await.unwrap(), Some(val(0)));
1653 assert_eq!(reopened.get(&key(1)).await.unwrap(), Some(val(1)));
1654 assert_eq!(reopened.get(&key(2)).await.unwrap(), None);
1655
1656 let mut reopened = reopened;
1657 reopened.rewind(initial_size).await.unwrap();
1658 assert_eq!(reopened.bounds().await.end, initial_size);
1659 assert_eq!(reopened.root(), initial_root);
1660 assert_eq!(reopened.ops_root(), initial_ops_root);
1661 assert_eq!(reopened.inactivity_floor_loc(), initial_floor);
1662 assert_eq!(reopened.get_metadata().await.unwrap(), None);
1663 assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
1664 assert_eq!(reopened.get(&key(1)).await.unwrap(), None);
1665 assert_eq!(reopened.get(&key(2)).await.unwrap(), None);
1666
1667 reopened.commit().await.unwrap();
1668 drop(reopened);
1669
1670 let reopened_initial: UnorderedVariableDb = UnorderedVariableDb::init(
1671 context.child("reopen_initial"),
1672 variable_config::<OneCap>(partition, &context),
1673 )
1674 .await
1675 .unwrap();
1676 assert_eq!(reopened_initial.bounds().await.end, initial_size);
1677 assert_eq!(reopened_initial.root(), initial_root);
1678 assert_eq!(reopened_initial.ops_root(), initial_ops_root);
1679 assert_eq!(reopened_initial.inactivity_floor_loc(), initial_floor);
1680 assert_eq!(reopened_initial.get_metadata().await.unwrap(), None);
1681 assert_eq!(reopened_initial.get(&key(0)).await.unwrap(), None);
1682 assert_eq!(reopened_initial.get(&key(1)).await.unwrap(), None);
1683 assert_eq!(reopened_initial.get(&key(2)).await.unwrap(), None);
1684
1685 reopened_initial.destroy().await.unwrap();
1686 });
1687 }
1688
1689 #[test_traced("INFO")]
1690 fn test_current_rewind_recovery_pruned_repeated_updates() {
1691 let executor = deterministic::Runner::default();
1692 executor.start(|context| async move {
1693 const COMMITS: u64 = 96;
1694
1695 let partition = "current-rewind-pruned-recovery";
1696 let ctx = context.child("db");
1697 let mut db: UnorderedVariableDb =
1698 UnorderedVariableDb::init(ctx.child("storage"), variable_config::<OneCap>(partition, &ctx))
1699 .await
1700 .unwrap();
1701
1702 let key0 = key(0);
1703 let mut history = Vec::new();
1704 for round in 0..COMMITS {
1705 commit_writes_with_metadata(
1706 &mut db,
1707 [(key0, Some(val(20_000 + round)))],
1708 None,
1709 )
1710 .await;
1711 history.push((
1712 db.bounds().await.end,
1713 db.inactivity_floor_loc(),
1714 db.root(),
1715 db.ops_root(),
1716 val(20_000 + round),
1717 ));
1718 }
1719
1720 db.prune(Location::new(1)).await.unwrap();
1723 let pruned_bits = db.pruned_bits();
1724 assert!(pruned_bits > 0, "expected bitmap pruning for rewind test");
1725 let bounds = db.bounds().await;
1726
1727 let (target_size, target_root, target_ops_root, target_value) = history
1728 .iter()
1729 .enumerate()
1730 .find_map(|(idx, (size, floor, root, ops_root, value))| {
1731 let removed_commits = history.len() - idx - 1;
1732 if removed_commits >= 3 && *size > bounds.start && *floor >= pruned_bits {
1733 Some((*size, *root, *ops_root, *value))
1734 } else {
1735 None
1736 }
1737 })
1738 .unwrap_or_else(|| {
1739 panic!(
1740 "expected legal pruned rewind target with repeated updates; bounds={bounds:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
1741 db.inactivity_floor_loc()
1742 )
1743 });
1744
1745 db.rewind(target_size).await.unwrap();
1746 assert_eq!(db.root(), target_root);
1747 assert_eq!(db.ops_root(), target_ops_root);
1748 assert_eq!(db.bounds().await.end, target_size);
1749 assert_eq!(db.get(&key0).await.unwrap(), Some(target_value));
1750
1751 db.commit().await.unwrap();
1752 drop(db);
1753
1754 let mut reopened: UnorderedVariableDb = UnorderedVariableDb::init(
1755 context.child("reopen_pruned_recovery"),
1756 variable_config::<OneCap>(partition, &context),
1757 )
1758 .await
1759 .unwrap();
1760 assert_eq!(reopened.root(), target_root);
1761 assert_eq!(reopened.ops_root(), target_ops_root);
1762 assert_eq!(reopened.bounds().await.end, target_size);
1763 assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));
1764
1765 let metadata_after_rewind = val(30_000);
1766 let new_key = key(1);
1767 let new_value = val(30_001);
1768 let expected_end = commit_writes_with_metadata(
1769 &mut reopened,
1770 [(new_key, Some(new_value))],
1771 Some(metadata_after_rewind),
1772 )
1773 .await
1774 .end;
1775 let root_after_new_write = reopened.root();
1776 let ops_root_after_new_write = reopened.ops_root();
1777 assert_eq!(reopened.bounds().await.end, expected_end);
1778 assert_eq!(reopened.get_metadata().await.unwrap(), Some(metadata_after_rewind));
1779 assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));
1780 assert_eq!(reopened.get(&new_key).await.unwrap(), Some(new_value));
1781
1782 drop(reopened);
1783 let reopened_after_new_write: UnorderedVariableDb = UnorderedVariableDb::init(
1784 context.child("reopen_pruned_after_new_write"),
1785 variable_config::<OneCap>(partition, &context),
1786 )
1787 .await
1788 .unwrap();
1789 assert_eq!(reopened_after_new_write.root(), root_after_new_write);
1790 assert_eq!(reopened_after_new_write.ops_root(), ops_root_after_new_write);
1791 assert_eq!(reopened_after_new_write.bounds().await.end, expected_end);
1792 assert_eq!(
1793 reopened_after_new_write.get_metadata().await.unwrap(),
1794 Some(metadata_after_rewind)
1795 );
1796 assert_eq!(reopened_after_new_write.get(&key0).await.unwrap(), Some(target_value));
1797 assert_eq!(
1798 reopened_after_new_write.get(&new_key).await.unwrap(),
1799 Some(new_value)
1800 );
1801
1802 reopened_after_new_write.destroy().await.unwrap();
1803 });
1804 }
1805
1806 #[test_traced("INFO")]
1809 fn test_current_mmb_settlement_guard_defers_pruning() {
1810 let executor = deterministic::Runner::default();
1811 executor.start(|context| async move {
1812 const COMMITS: u64 = 100;
1813
1814 let partition = "current-mmb-reopen-prove-after-prune";
1815 let ctx = context.child("db");
1816 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1817 ctx.child("storage"),
1818 variable_config::<OneCap>(partition, &ctx),
1819 )
1820 .await
1821 .unwrap();
1822
1823 let k = key(0);
1824 let mut expected = None;
1825 for round in 0..COMMITS {
1826 expected = Some(val(50_000 + round));
1827 let mut batch = db.new_batch();
1828 batch = batch.write(k, expected);
1829 let merkleized = batch.merkleize(&db, None).await.unwrap();
1830 db.apply_batch(merkleized).await.unwrap();
1831 db.commit().await.unwrap();
1832 }
1833
1834 let root_before = db.root();
1835 assert!(
1836 *db.inactivity_floor_loc() >= 256,
1837 "expected inactivity floor past chunk 0"
1838 );
1839 assert_eq!(
1840 *db.sync_boundary(),
1841 0,
1842 "settlement guard should hold boundary at 0 during unsettled window"
1843 );
1844
1845 let result = db.prune(Location::<mmb::Family>::new(1)).await;
1847 assert!(
1848 matches!(result, Err(Error::PruneBeyondMinRequired(_, _))),
1849 "expected PruneBeyondMinRequired, got {result:?}"
1850 );
1851 assert_eq!(db.pruned_bits(), 0);
1852 db.sync().await.unwrap();
1853 drop(db);
1854
1855 let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1857 context.child("reopen"),
1858 variable_config::<OneCap>(partition, &context),
1859 )
1860 .await
1861 .unwrap();
1862
1863 assert_eq!(reopened.root(), root_before);
1864 assert_eq!(reopened.get(&k).await.unwrap(), expected);
1865
1866 let hasher = qmdb::hasher::<Sha256>();
1868 let _proof = reopened.key_value_proof(&hasher, k).await.unwrap();
1869
1870 reopened.destroy().await.unwrap();
1871 });
1872 }
1873
1874 #[test_traced("INFO")]
1875 fn test_current_mmb_rewind_rejects_unsettled_pruned_window() {
1876 let executor = deterministic::Runner::default();
1877 executor.start(|context| async move {
1878 const COMMITS: u64 = 320;
1879 const N: usize = 32;
1880
1881 let partition = "current-mmb-rewind-unsettled-window";
1882 let ctx = context.child("db");
1883 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1884 ctx.child("storage"),
1885 variable_config::<OneCap>(partition, &ctx),
1886 )
1887 .await
1888 .unwrap();
1889
1890 let key0 = key(0);
1891 let mut history = Vec::new();
1892 for round in 0..COMMITS {
1893 let mut batch = db.new_batch();
1894 batch = batch.write(key0, Some(val(60_000 + round)));
1895 let merkleized = batch.merkleize(&db, None).await.unwrap();
1896 db.apply_batch(merkleized).await.unwrap();
1897 db.commit().await.unwrap();
1898 history.push((db.bounds().await.end, db.inactivity_floor_loc()));
1899 }
1900
1901 db.prune(db.sync_boundary()).await.unwrap();
1902 let pruned_bits = db.pruned_bits();
1903 assert!(pruned_bits > 0, "expected MMB bitmap pruning to be active");
1904 db.sync().await.unwrap();
1905
1906 let chunk_bits = commonware_utils::bitmap::BitMap::<N>::CHUNK_SIZE_BITS;
1907 let pruned_chunks = (pruned_bits / chunk_bits) as u64;
1908 let gh = super::grafting::height::<N>();
1909 let youngest = pruned_chunks - 1;
1910 let pair_chunk = youngest & !1;
1911 let pair_start = pair_chunk << gh;
1912 let pair_pos = <mmb::Family as merkle::Graftable>::subtree_root_position(
1913 merkle::Location::<mmb::Family>::new(pair_start),
1914 gh + 1,
1915 );
1916 let absorbed_after =
1917 <mmb::Family as merkle::Graftable>::peak_birth_size(pair_pos, gh + 1);
1918
1919 let unsafe_target = history
1920 .iter()
1921 .filter_map(|(size, floor)| {
1922 let s = **size;
1923 if s >= pruned_bits && s < absorbed_after && **floor >= pruned_bits {
1924 Some(s)
1925 } else {
1926 None
1927 }
1928 })
1929 .max()
1930 .unwrap_or_else(|| {
1931 panic!(
1932 "expected rewind target in unsettled window: pruned_bits={pruned_bits}, absorbed_after={absorbed_after}, history={history:?}"
1933 )
1934 });
1935
1936 let err = db
1937 .rewind(merkle::Location::<mmb::Family>::new(unsafe_target))
1938 .await
1939 .unwrap_err();
1940 assert!(
1941 matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
1942 "unexpected rewind error for unsettled delayed-merge window: {err:?}"
1943 );
1944 drop(db);
1945
1946 let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1947 context.child("reopen"),
1948 variable_config::<OneCap>(partition, &context),
1949 )
1950 .await
1951 .unwrap();
1952 reopened.destroy().await.unwrap();
1953 });
1954 }
1955
1956 #[test_traced]
1961 fn test_current_mmb_prune_respects_sync_boundary() {
1962 let executor = deterministic::Runner::default();
1963 executor.start(|context| async move {
1964 const COMMITS: u64 = 320;
1965
1966 let ctx = context.child("db");
1967 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1968 ctx.child("storage"),
1969 variable_config::<OneCap>("prune-clip-mmb", &ctx),
1970 )
1971 .await
1972 .unwrap();
1973
1974 let k = key(0);
1975 for round in 0..COMMITS {
1976 mmb_commit(&mut db, [(k, Some(val(70_000 + round)))]).await;
1977 }
1978
1979 db.prune(db.sync_boundary()).await.unwrap();
1980
1981 let boundary = db.sync_boundary();
1982 let floor = db.inactivity_floor_loc();
1983 assert!(
1984 boundary < floor,
1985 "delayed-merge lag must be strictly active: boundary={boundary}, floor={floor}"
1986 );
1987 assert!(
1988 db.bounds().await.start <= boundary,
1989 "ops journal was pruned past the settled bitmap boundary: \
1990 bounds.start={}, boundary={boundary}",
1991 db.bounds().await.start
1992 );
1993
1994 db.destroy().await.unwrap();
1995 });
1996 }
1997
1998 #[test_traced]
2003 fn test_current_mmr_prune_boundary_lag_is_only_chunk_alignment() {
2004 let executor = deterministic::Runner::default();
2005 executor.start(|context| async move {
2006 const COMMITS: u64 = 320;
2007 const N: usize = 32;
2008
2009 let ctx = context.child("db");
2010 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2011 ctx.child("storage"),
2012 variable_config::<OneCap>("prune-clip-mmr", &ctx),
2013 )
2014 .await
2015 .unwrap();
2016
2017 for round in 0..COMMITS {
2018 commit_writes_with_metadata(
2019 &mut db,
2020 [(key(0), Some(val(80_000 + round)))],
2021 None,
2022 )
2023 .await;
2024 }
2025
2026 db.prune(db.sync_boundary()).await.unwrap();
2027
2028 let boundary = db.sync_boundary();
2029 let floor = db.inactivity_floor_loc();
2030 let chunk_bits = commonware_utils::bitmap::BitMap::<N>::CHUNK_SIZE_BITS;
2031 assert!(
2032 boundary <= floor && *floor - *boundary < chunk_bits,
2033 "MMR lag should be only chunk alignment: boundary={boundary}, floor={floor}, chunk_bits={chunk_bits}"
2034 );
2035 assert!(
2036 db.bounds().await.start <= boundary,
2037 "ops journal bounds must be <= sync_boundary: bounds.start={}, boundary={boundary}",
2038 db.bounds().await.start
2039 );
2040
2041 db.destroy().await.unwrap();
2042 });
2043 }
2044
2045 #[test_traced]
2048 fn test_current_prune_below_settled_boundary_is_honored() {
2049 let executor = deterministic::Runner::default();
2050 executor.start(|context| async move {
2051 const COMMITS: u64 = 100;
2052
2053 let ctx = context.child("db");
2054 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2055 ctx.child("storage"),
2056 variable_config::<OneCap>("prune-below-boundary", &ctx),
2057 )
2058 .await
2059 .unwrap();
2060
2061 for round in 0..COMMITS {
2062 commit_writes_with_metadata(&mut db, [(key(0), Some(val(90_000 + round)))], None)
2063 .await;
2064 }
2065
2066 assert!(*db.inactivity_floor_loc() > 1);
2067 let small = Location::new(1);
2068 db.prune(small).await.unwrap();
2069
2070 assert!(
2071 db.bounds().await.start <= small,
2072 "journal pruning exceeded the caller-supplied target: bounds.start={}, requested={small}",
2073 db.bounds().await.start
2074 );
2075
2076 db.destroy().await.unwrap();
2077 });
2078 }
2079
2080 #[test_traced]
2083 fn test_current_mmb_reopen_and_prove_after_prune_delayed_merge() {
2084 let executor = deterministic::Runner::default();
2085 executor.start(|context| async move {
2086 let db_ctx = context.child("db_init");
2087 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2088 db_ctx.child("db"),
2089 variable_config::<OneCap>("test_prune_delayed_merge", &db_ctx),
2090 )
2091 .await
2092 .unwrap();
2093
2094 let k = key(0);
2095
2096 for round in 0..200u64 {
2097 mmb_commit(&mut db, [(k, Some(val(60_000 + round)))]).await;
2098 }
2099
2100 db.prune(db.sync_boundary()).await.unwrap();
2101 db.sync().await.unwrap();
2102
2103 for round in 200..300u64 {
2105 mmb_commit(&mut db, [(key(1), Some(val(round)))]).await;
2106 }
2107
2108 let hasher = qmdb::hasher::<Sha256>();
2109 let proof = db.key_value_proof(&hasher, k).await.unwrap();
2110 assert!(UnorderedVariableMmbDb::verify_key_value_proof(
2111 &hasher,
2112 k,
2113 val(60_000 + 199),
2114 &proof,
2115 &db.root()
2116 ));
2117
2118 let target_root = db.root();
2119 drop(db);
2120
2121 let reopen_ctx = context.child("db_reopen");
2122 let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2123 reopen_ctx.child("db"),
2124 variable_config::<OneCap>("test_prune_delayed_merge", &reopen_ctx),
2125 )
2126 .await
2127 .unwrap();
2128
2129 assert_eq!(reopened.root(), target_root);
2130
2131 let hasher = qmdb::hasher::<Sha256>();
2132 let proof = reopened.key_value_proof(&hasher, k).await.unwrap();
2133 assert!(UnorderedVariableMmbDb::verify_key_value_proof(
2134 &hasher,
2135 k,
2136 val(60_000 + 199),
2137 &proof,
2138 &reopened.root()
2139 ));
2140
2141 reopened.destroy().await.unwrap();
2142 });
2143 }
2144
2145 #[test_traced]
2147 fn test_current_mmb_reopen_after_prune_two_chunks() {
2148 let executor = deterministic::Runner::default();
2149 executor.start(|context| async move {
2150 let db_ctx = context.child("db");
2151 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2152 db_ctx.child("db"),
2153 variable_config::<OneCap>("test_prune_two", &db_ctx),
2154 )
2155 .await
2156 .unwrap();
2157
2158 let k = key(0);
2159 let mut expected;
2161
2162 let mut round = 0u64;
2165 loop {
2166 expected = Some(val(60_000 + round));
2167 mmb_commit(&mut db, [(k, expected)]).await;
2168 round += 1;
2169 db.prune(db.sync_boundary()).await.unwrap();
2170 if db.pruned_bits() >= 512 {
2171 break;
2172 }
2173 assert!(
2174 round < 500,
2175 "failed to reach 2 pruned chunks after {round} commits"
2176 );
2177 }
2178 db.sync().await.unwrap();
2179
2180 let target_root = db.root();
2181 drop(db);
2182
2183 let reopen_ctx = context.child("db_reopen");
2184 let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2185 reopen_ctx.child("db"),
2186 variable_config::<OneCap>("test_prune_two", &reopen_ctx),
2187 )
2188 .await
2189 .unwrap();
2190
2191 assert_eq!(reopened.root(), target_root);
2192 assert_eq!(reopened.get(&k).await.unwrap(), expected);
2193 reopened.destroy().await.unwrap();
2194 });
2195 }
2196
2197 #[test_traced]
2199 fn test_current_mmb_repeated_prune() {
2200 let executor = deterministic::Runner::default();
2201 executor.start(|context| async move {
2202 let mut db_ctx = context.child("db_init");
2203 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2204 db_ctx.child("db"),
2205 variable_config::<OneCap>("test_repeated_prune", &db_ctx),
2206 )
2207 .await
2208 .unwrap();
2209
2210 for round in 0..3u64 {
2211 let k = key(round * 1000);
2212 let mut expected = None;
2213 for i in 0..90 {
2214 expected = Some(val(round * 1000 + i));
2215 mmb_commit(&mut db, [(k, expected)]).await;
2216 }
2217
2218 db.prune(db.sync_boundary()).await.unwrap();
2219 db.sync().await.unwrap();
2220
2221 let root_before = db.root();
2222 db_ctx = context.child("db").with_attribute("round", round);
2223
2224 let prev_db = db;
2225 db = UnorderedVariableMmbDb::init(
2226 db_ctx.child("db"),
2227 variable_config::<OneCap>("test_repeated_prune", &db_ctx),
2228 )
2229 .await
2230 .unwrap();
2231
2232 assert_eq!(db.root(), root_before);
2233 assert_eq!(db.get(&k).await.unwrap(), expected);
2234 drop(prev_db);
2235 }
2236
2237 db.destroy().await.unwrap();
2238 });
2239 }
2240
2241 #[test_traced]
2243 fn test_current_mmb_stepwise_growth_matches_unpruned_reference() {
2244 let executor = deterministic::Runner::default();
2245 executor.start(|context| async move {
2246 let db_ctx = context.child("db_stepwise");
2247 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2248 db_ctx.child("db"),
2249 variable_config::<OneCap>("test_stepwise", &db_ctx),
2250 )
2251 .await
2252 .unwrap();
2253
2254 let ref_ctx = context.child("ref_stepwise");
2255 let mut ref_db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2256 ref_ctx.child("db"),
2257 variable_config::<OneCap>("test_stepwise_ref", &ref_ctx),
2258 )
2259 .await
2260 .unwrap();
2261
2262 let k = key(0);
2263 let mut commit_idx = 0u64;
2264
2265 while *db.inactivity_floor_loc() < 1024 {
2267 let value = Some(val(80_000 + commit_idx));
2268 mmb_commit(&mut db, [(k, value)]).await;
2269 mmb_commit(&mut ref_db, [(k, value)]).await;
2270 commit_idx += 1;
2271 }
2272
2273 db.prune(db.sync_boundary()).await.unwrap();
2274 db.sync().await.unwrap();
2275 assert_eq!(
2276 db.root(),
2277 ref_db.root(),
2278 "root mismatch immediately after prune"
2279 );
2280
2281 loop {
2283 let db_leaves =
2284 *Location::<mmb::Family>::try_from(db.any.log.merkle.size()).unwrap();
2285 if db_leaves >= 1560 {
2286 break;
2287 }
2288
2289 let value = Some(val(80_000 + commit_idx));
2290 mmb_commit(&mut db, [(k, value)]).await;
2291 mmb_commit(&mut ref_db, [(k, value)]).await;
2292 commit_idx += 1;
2293
2294 let db_leaves =
2295 *Location::<mmb::Family>::try_from(db.any.log.merkle.size()).unwrap();
2296 assert_eq!(
2297 db.root(),
2298 ref_db.root(),
2299 "stepwise root mismatch: leaves={db_leaves}, commit_idx={commit_idx}"
2300 );
2301 }
2302
2303 db.destroy().await.unwrap();
2304 ref_db.destroy().await.unwrap();
2305 });
2306 }
2307
2308 #[test_traced]
2310 fn test_current_mmb_large_repeated_prune_matches_unpruned_reference() {
2311 let executor = deterministic::Runner::default();
2312 executor.start(|context| async move {
2313 const ROUNDS: u64 = 8;
2314 const COMMITS_PER_ROUND: u64 = 120;
2315
2316 let mut db_ctx = context.child("db_init");
2317 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2318 db_ctx.child("db"),
2319 variable_config::<OneCap>("test_large_prune", &db_ctx),
2320 )
2321 .await
2322 .unwrap();
2323
2324 let ref_ctx = context.child("ref");
2325 let mut ref_db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
2326 ref_ctx.child("db"),
2327 variable_config::<OneCap>("test_large_prune_ref", &ref_ctx),
2328 )
2329 .await
2330 .unwrap();
2331
2332 let k = key(0);
2333 let mut expected = None;
2334
2335 for round in 0..ROUNDS {
2336 for i in 0..COMMITS_PER_ROUND {
2337 let value = Some(val(round * 10_000 + i));
2338 expected = value;
2339 mmb_commit(&mut db, [(k, value)]).await;
2340 mmb_commit(&mut ref_db, [(k, value)]).await;
2341 }
2342
2343 assert_eq!(
2344 db.root(),
2345 ref_db.root(),
2346 "root mismatch before prune at round {round}"
2347 );
2348
2349 db.prune(db.sync_boundary()).await.unwrap();
2350 db.sync().await.unwrap();
2351
2352 assert_eq!(
2353 db.root(),
2354 ref_db.root(),
2355 "root mismatch after prune at round {round}"
2356 );
2357
2358 let hasher = qmdb::hasher::<Sha256>();
2359 let proof = db.key_value_proof(&hasher, k).await.unwrap();
2360 assert!(
2361 UnorderedVariableMmbDb::verify_key_value_proof(
2362 &hasher,
2363 k,
2364 expected.expect("value should exist"),
2365 &proof,
2366 &db.root()
2367 ),
2368 "proof verification failed at round {round}"
2369 );
2370
2371 db_ctx = context.child("db_reopen").with_attribute("round", round);
2372 let prev_db = db;
2373 db = UnorderedVariableMmbDb::init(
2374 db_ctx.child("db"),
2375 variable_config::<OneCap>("test_large_prune", &db_ctx),
2376 )
2377 .await
2378 .unwrap();
2379
2380 assert_eq!(
2381 db.root(),
2382 ref_db.root(),
2383 "root mismatch after reopen at round {round}"
2384 );
2385 assert_eq!(
2386 db.get(&k).await.unwrap(),
2387 expected,
2388 "value mismatch after reopen at round {round}"
2389 );
2390
2391 let hasher = qmdb::hasher::<Sha256>();
2392 let proof = db.key_value_proof(&hasher, k).await.unwrap();
2393 assert!(
2394 UnorderedVariableMmbDb::verify_key_value_proof(
2395 &hasher,
2396 k,
2397 expected.expect("value should exist"),
2398 &proof,
2399 &db.root()
2400 ),
2401 "proof verification failed after reopen at round {round}"
2402 );
2403
2404 drop(prev_db);
2405 }
2406
2407 db.destroy().await.unwrap();
2408 ref_db.destroy().await.unwrap();
2409 });
2410 }
2411
2412 #[test_traced]
2414 fn test_current_prune_rejects_beyond_sync_boundary_without_mutation() {
2415 let executor = deterministic::Runner::default();
2416 executor.start(|context| async move {
2417 const COMMITS: u64 = 160;
2418
2419 let partition = "current-prune-beyond-boundary";
2420 let ctx = context.child("db");
2421 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2422 ctx.child("storage"),
2423 variable_config::<OneCap>(partition, &ctx),
2424 )
2425 .await
2426 .unwrap();
2427
2428 let key0 = key(0);
2429 for round in 0..COMMITS {
2430 commit_writes_with_metadata(&mut db, [(key0, Some(val(40_000 + round)))], None)
2431 .await;
2432 }
2433
2434 let expected_root = db.root();
2435 let expected_ops_root = db.ops_root();
2436 let expected_boundary = db.sync_boundary();
2437 let expected_pruned_bits = db.pruned_bits();
2438 let expected_value = db.get(&key0).await.unwrap();
2439
2440 let invalid_prune_loc = Location::new(*expected_boundary + 256);
2442 let result = db.prune(invalid_prune_loc).await;
2443 assert!(
2444 matches!(result, Err(Error::PruneBeyondMinRequired(loc, boundary))
2445 if loc == invalid_prune_loc && boundary == expected_boundary),
2446 "expected prune rejection above sync boundary, got {result:?}"
2447 );
2448
2449 assert_eq!(db.root(), expected_root);
2450 assert_eq!(db.ops_root(), expected_ops_root);
2451 assert_eq!(db.pruned_bits(), expected_pruned_bits);
2452 assert_eq!(db.get(&key0).await.unwrap(), expected_value);
2453
2454 drop(db);
2455
2456 let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
2457 context.child("reopen"),
2458 variable_config::<OneCap>(partition, &context),
2459 )
2460 .await
2461 .unwrap();
2462 assert_eq!(reopened.root(), expected_root);
2463 assert_eq!(reopened.ops_root(), expected_ops_root);
2464 assert_eq!(reopened.pruned_bits(), expected_pruned_bits);
2465 assert_eq!(reopened.get(&key0).await.unwrap(), expected_value);
2466
2467 reopened.destroy().await.unwrap();
2468 });
2469 }
2470
2471 #[test_traced("INFO")]
2472 fn test_current_rewind_small_delta_large_history() {
2473 let executor = deterministic::Runner::default();
2474 executor.start(|context| async move {
2475 const COMMITS: u64 = 200;
2476
2477 let partition = "current-rewind-small-delta";
2478 let ctx = context.child("db");
2479 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2480 ctx.child("storage"),
2481 variable_config::<OneCap>(partition, &ctx),
2482 )
2483 .await
2484 .unwrap();
2485
2486 let key0 = key(0);
2487 let key1 = key(1);
2488 let mut history = Vec::new();
2489
2490 for round in 0..COMMITS {
2491 let key0_value = val(40_000 + round);
2492 let key1_value = if round % 3 == 1 {
2493 None
2494 } else {
2495 Some(val(50_000 + round))
2496 };
2497
2498 commit_writes_with_metadata(
2499 &mut db,
2500 [(key0, Some(key0_value)), (key1, key1_value)],
2501 None,
2502 )
2503 .await;
2504
2505 history.push((
2506 db.bounds().await.end,
2507 db.root(),
2508 db.ops_root(),
2509 key0_value,
2510 key1_value,
2511 ));
2512 }
2513
2514 let target = *history
2515 .get(history.len() - 3)
2516 .expect("history should contain at least three commits");
2517 let (target_size, target_root, target_ops_root, target_key0, target_key1) = target;
2518
2519 db.rewind(target_size).await.unwrap();
2520 assert_eq!(db.bounds().await.end, target_size);
2521 assert_eq!(db.root(), target_root);
2522 assert_eq!(db.ops_root(), target_ops_root);
2523 assert_eq!(db.get(&key0).await.unwrap(), Some(target_key0));
2524 assert_eq!(db.get(&key1).await.unwrap(), target_key1);
2525
2526 db.commit().await.unwrap();
2527 drop(db);
2528
2529 let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
2530 context.child("reopen_small_delta"),
2531 variable_config::<OneCap>(partition, &context),
2532 )
2533 .await
2534 .unwrap();
2535 assert_eq!(reopened.bounds().await.end, target_size);
2536 assert_eq!(reopened.root(), target_root);
2537 assert_eq!(reopened.ops_root(), target_ops_root);
2538 assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_key0));
2539 assert_eq!(reopened.get(&key1).await.unwrap(), target_key1);
2540
2541 reopened.destroy().await.unwrap();
2542 });
2543 }
2544
2545 #[test_traced("INFO")]
2546 fn test_current_rewind_pruned_target_errors() {
2547 let executor = deterministic::Runner::default();
2548 executor.start(|context| async move {
2549 const KEYS: u64 = 384;
2550
2551 let partition = "current-rewind-pruned";
2552 let ctx = context.child("db");
2553 let mut db: UnorderedVariableDb =
2554 UnorderedVariableDb::init(ctx.child("storage"), variable_config::<OneCap>(partition, &ctx))
2555 .await
2556 .unwrap();
2557
2558 let first_range = commit_writes_with_metadata(
2559 &mut db,
2560 (0..KEYS).map(|i| (key(i), Some(val(i)))),
2561 None,
2562 )
2563 .await;
2564 commit_writes_with_metadata(
2565 &mut db,
2566 (0..KEYS).map(|i| (key(i), Some(val(1000 + i)))),
2567 None,
2568 )
2569 .await;
2570
2571 db.prune(db.sync_boundary()).await.unwrap();
2572 let pruned_bits = db.pruned_bits();
2573 assert!(
2574 pruned_bits > *first_range.start,
2575 "expected bitmap pruning boundary above rewind target: pruned_bits={pruned_bits}, target={:?}",
2576 first_range.start
2577 );
2578
2579 let oldest_retained = db.bounds().await.start;
2580 let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
2581 assert!(
2582 matches!(
2583 boundary_err,
2584 Error::Journal(crate::journal::Error::ItemPruned(_))
2585 ),
2586 "unexpected rewind error at retained boundary: {boundary_err:?}"
2587 );
2588
2589 let expected_pruned_loc = *first_range.start - 1;
2590 let err = db.rewind(first_range.start).await.unwrap_err();
2591 assert!(
2592 matches!(
2593 err,
2594 Error::Journal(crate::journal::Error::ItemPruned(loc))
2595 if loc == expected_pruned_loc
2596 ),
2597 "unexpected rewind error: {err:?}"
2598 );
2599
2600 db.destroy().await.unwrap();
2601 });
2602 }
2603
2604 #[test_traced("INFO")]
2605 fn test_current_rewind_rejects_target_below_bitmap_floor() {
2606 let executor = deterministic::Runner::default();
2607 executor.start(|context| async move {
2608 const COMMITS: u64 = 96;
2609
2610 let partition = "current-rewind-bitmap-floor";
2611 let ctx = context.child("db");
2612 let mut db: UnorderedVariableDb =
2613 UnorderedVariableDb::init(ctx.child("storage"), variable_config::<OneCap>(partition, &ctx))
2614 .await
2615 .unwrap();
2616
2617 let mut history = Vec::new();
2618 for round in 0..COMMITS {
2619 commit_writes_with_metadata(
2620 &mut db,
2621 [(key(0), Some(val(10_000 + round)))],
2622 None,
2623 )
2624 .await;
2625 history.push((db.bounds().await.end, db.inactivity_floor_loc()));
2626 }
2627 assert!(db.inactivity_floor_loc() > Location::new(64));
2628
2629 let prune_loc = Location::new(1);
2632 db.prune(prune_loc).await.unwrap();
2633 let pruned_bits = db.pruned_bits();
2634 assert!(pruned_bits > 0);
2635 let retained_start = db.bounds().await.start;
2636
2637 let rewind_target = history
2640 .iter()
2641 .find_map(|(size, floor)| {
2642 if *size > *retained_start
2643 && *size >= pruned_bits
2644 && *floor >= *retained_start
2645 && *floor < pruned_bits
2646 {
2647 Some(*size)
2648 } else {
2649 None
2650 }
2651 })
2652 .unwrap_or_else(|| {
2653 panic!(
2654 "expected rewind target below bitmap boundary. retained_start={retained_start:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
2655 db.inactivity_floor_loc()
2656 )
2657 });
2658
2659 let err = db.rewind(rewind_target).await.unwrap_err();
2660 assert!(
2661 matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
2662 "unexpected rewind error: {err:?}"
2663 );
2664
2665 db.destroy().await.unwrap();
2666 });
2667 }
2668
2669 pub fn test_speculative_root_matches_committed<M, C, F, Fut>(mut open_db: F)
2675 where
2676 M: merkle::Graftable + 'static,
2677 C: DbAny<M> + 'static,
2678 C::Key: TestKey,
2679 <C as DbAny<M>>::Value: TestValue,
2680 F: FnMut(Context, String) -> Fut + Clone,
2681 Fut: Future<Output = C>,
2682 {
2683 let executor = deterministic::Runner::default();
2684 let mut open_db_clone = open_db.clone();
2685 executor.start(|context| async move {
2686 let partition = "speculative-root".to_string();
2687
2688 let mut db: C = open_db_clone(context.child("init"), partition.clone()).await;
2693 let mut batch = db.new_batch();
2694 for i in 0..260 {
2695 batch = batch.write(TestKey::from_seed(i), Some(TestValue::from_seed(i + 1000)));
2696 }
2697 let merkleized = batch.merkleize(&db, None).await.unwrap();
2698 db.apply_batch(merkleized).await.unwrap();
2699 let speculative_root = db.root();
2700
2701 db.sync().await.unwrap();
2703 drop(db);
2704
2705 let db: C = open_db(context.child("reopen"), partition).await;
2706 assert_eq!(db.root(), speculative_root);
2707
2708 db.destroy().await.unwrap();
2709 });
2710 }
2711
2712 #[test_group("slow")]
2713 #[test_traced("INFO")]
2714 fn test_all_variants_speculative_root_matches_committed() {
2715 let executor = deterministic::Runner::default();
2716 executor.start(|_context| async move {
2717 for_all_variants!(simple: test_speculative_root_matches_committed);
2718 });
2719 }
2720
2721 #[test_traced("INFO")]
2723 fn test_current_batch_merkleized_get() {
2724 let executor = deterministic::Runner::default();
2725 executor.start(|context| async move {
2726 let ctx = context.child("db");
2727 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2728 ctx.child("storage"),
2729 variable_config::<OneCap>("mg", &ctx),
2730 )
2731 .await
2732 .unwrap();
2733
2734 let ka = key(0);
2735 let kb = key(1);
2736 let kc = key(2);
2737
2738 {
2740 let mut batch = db.new_batch();
2741 batch = batch.write(ka, Some(val(0)));
2742 let merkleized = batch.merkleize(&db, None).await.unwrap();
2743 db.apply_batch(merkleized).await.unwrap();
2744 }
2745
2746 let va2 = val(100);
2748 let vb = val(1);
2749 let mut batch = db.new_batch();
2750 batch = batch.write(ka, Some(va2));
2751 batch = batch.write(kb, Some(vb));
2752 let merkleized = batch.merkleize(&db, None).await.unwrap();
2753
2754 assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
2755 assert_eq!(merkleized.get(&kb, &db).await.unwrap(), Some(vb));
2756 assert_eq!(merkleized.get(&kc, &db).await.unwrap(), None);
2757
2758 db.destroy().await.unwrap();
2759 });
2760 }
2761
2762 #[test_traced("INFO")]
2765 fn test_current_batch_chaining() {
2766 let executor = deterministic::Runner::default();
2767 executor.start(|context| async move {
2768 let ctx = context.child("db");
2769 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2770 ctx.child("storage"),
2771 variable_config::<OneCap>("ch", &ctx),
2772 )
2773 .await
2774 .unwrap();
2775
2776 let mut parent = db.new_batch();
2778 for i in 0..5 {
2779 parent = parent.write(key(i), Some(val(i)));
2780 }
2781 let parent_m = parent.merkleize(&db, None).await.unwrap();
2782
2783 let mut child = parent_m.new_batch::<Sha256>();
2785 for i in 5..10 {
2786 child = child.write(key(i), Some(val(i)));
2787 }
2788 child = child.write(key(0), Some(val(999)));
2789 let child_m = child.merkleize(&db, None).await.unwrap();
2790
2791 let child_root = child_m.root();
2792
2793 assert_eq!(child_m.get(&key(0), &db).await.unwrap(), Some(val(999)));
2795 assert_eq!(child_m.get(&key(3), &db).await.unwrap(), Some(val(3)));
2796 assert_eq!(child_m.get(&key(7), &db).await.unwrap(), Some(val(7)));
2797
2798 db.apply_batch(child_m).await.unwrap();
2799 assert_eq!(db.root(), child_root);
2800
2801 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(999)));
2803 for i in 1..10 {
2804 assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
2805 }
2806
2807 db.destroy().await.unwrap();
2808 });
2809 }
2810
2811 #[test_traced("INFO")]
2812 fn test_current_unordered_root_matches_between_pending_and_committed_paths() {
2813 let executor = deterministic::Runner::default();
2814 executor.start(|context| async move {
2815 let ctx = context.child("db");
2816 let mut db: UnorderedFixedDb =
2817 UnorderedFixedDb::init(ctx.child("storage"), fixed_config::<OneCap>("ucr", &ctx))
2818 .await
2819 .unwrap();
2820 let key_a = colliding_digest(0xAA, 1);
2821 let key_b = colliding_digest(0xAA, 0);
2822
2823 let mut initial = db.new_batch();
2828 for i in 0..4 {
2829 initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
2830 }
2831 let merkleized = initial.merkleize(&db, None).await.unwrap();
2832 db.apply_batch(merkleized).await.unwrap();
2833 db.commit().await.unwrap();
2834
2835 let parent = db
2839 .new_batch()
2840 .write(key_a, Some(colliding_digest(0xCC, 1)))
2841 .merkleize(&db, None)
2842 .await
2843 .unwrap();
2844
2845 let pending_child = parent
2849 .new_batch::<Sha256>()
2850 .write(key_a, Some(colliding_digest(0xDD, 1)))
2851 .write(key_b, Some(colliding_digest(0xDD, 0)))
2852 .merkleize(&db, None)
2853 .await
2854 .unwrap();
2855
2856 let pending_root = pending_child.root();
2857 let pending_ops_root = pending_child.ops_root();
2858
2859 db.apply_batch(parent).await.unwrap();
2860 db.commit().await.unwrap();
2861
2862 let committed_child = db
2863 .new_batch()
2864 .write(key_a, Some(colliding_digest(0xDD, 1)))
2865 .write(key_b, Some(colliding_digest(0xDD, 0)))
2866 .merkleize(&db, None)
2867 .await
2868 .unwrap();
2869
2870 assert_eq!(pending_root, committed_child.root());
2871 assert_eq!(pending_ops_root, committed_child.ops_root());
2872
2873 db.apply_batch(pending_child).await.unwrap();
2876 assert_eq!(db.root(), committed_child.root());
2877 assert_eq!(db.ops_root(), committed_child.ops_root());
2878
2879 db.destroy().await.unwrap();
2880 });
2881 }
2882
2883 #[test_traced("INFO")]
2884 fn test_current_ordered_root_matches_between_pending_and_committed_paths() {
2885 let executor = deterministic::Runner::default();
2886 executor.start(|context| async move {
2887 let ctx = context.child("db");
2888 let mut db: OrderedFixedDb =
2889 OrderedFixedDb::init(ctx.child("storage"), fixed_config::<OneCap>("ocr", &ctx))
2890 .await
2891 .unwrap();
2892 let key_a = colliding_digest(0xAA, 1);
2893 let key_b = colliding_digest(0xAA, 0);
2894
2895 let mut initial = db.new_batch();
2898 for i in 0..4 {
2899 initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
2900 }
2901 let merkleized = initial.merkleize(&db, None).await.unwrap();
2902 db.apply_batch(merkleized).await.unwrap();
2903 db.commit().await.unwrap();
2904
2905 let parent = db
2909 .new_batch()
2910 .write(key_a, Some(colliding_digest(0xCC, 1)))
2911 .merkleize(&db, None)
2912 .await
2913 .unwrap();
2914
2915 let pending_child = parent
2918 .new_batch::<Sha256>()
2919 .write(key_a, Some(colliding_digest(0xDD, 1)))
2920 .write(key_b, Some(colliding_digest(0xDD, 0)))
2921 .merkleize(&db, None)
2922 .await
2923 .unwrap();
2924
2925 let pending_root = pending_child.root();
2926 let pending_ops_root = pending_child.ops_root();
2927
2928 db.apply_batch(parent).await.unwrap();
2929 db.commit().await.unwrap();
2930
2931 let committed_child = db
2932 .new_batch()
2933 .write(key_a, Some(colliding_digest(0xDD, 1)))
2934 .write(key_b, Some(colliding_digest(0xDD, 0)))
2935 .merkleize(&db, None)
2936 .await
2937 .unwrap();
2938
2939 assert_eq!(pending_root, committed_child.root());
2940 assert_eq!(pending_ops_root, committed_child.ops_root());
2941
2942 db.apply_batch(pending_child).await.unwrap();
2945 assert_eq!(db.root(), committed_child.root());
2946 assert_eq!(db.ops_root(), committed_child.ops_root());
2947
2948 db.destroy().await.unwrap();
2949 });
2950 }
2951
2952 #[test_traced("INFO")]
2954 fn test_current_batch_apply_requires_commit_for_recovery() {
2955 let executor = deterministic::Runner::default();
2956 executor.start(|context| async move {
2957 let partition = "apply_requires_commit";
2958 let ctx = context.child("db");
2959 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2960 ctx.child("storage"),
2961 variable_config::<OneCap>(partition, &ctx),
2962 )
2963 .await
2964 .unwrap();
2965
2966 let committed_root = db.root();
2967
2968 let merkleized = db
2969 .new_batch()
2970 .write(key(0), Some(val(0)))
2971 .merkleize(&db, None)
2972 .await
2973 .unwrap();
2974 db.apply_batch(merkleized).await.unwrap();
2975
2976 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2977
2978 drop(db);
2979
2980 let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
2981 context.child("reopen"),
2982 variable_config::<OneCap>(partition, &context),
2983 )
2984 .await
2985 .unwrap();
2986 assert_eq!(reopened.root(), committed_root);
2987 assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
2988
2989 reopened.destroy().await.unwrap();
2990 });
2991 }
2992
2993 #[test_traced("INFO")]
2995 fn test_current_batch_single_stage_pipeline() {
2996 let executor = deterministic::Runner::default();
2997 executor.start(|context| async move {
2998 let ctx = context.child("db");
2999 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3000 ctx.child("storage"),
3001 variable_config::<OneCap>("pipe", &ctx),
3002 )
3003 .await
3004 .unwrap();
3005
3006 let mut batch = db.new_batch();
3007 batch = batch.write(key(0), Some(val(0)));
3008 let parent_merkleized = batch.merkleize(&db, None).await.unwrap();
3009 db.apply_batch(parent_merkleized).await.unwrap();
3010
3011 let (child_merkleized, commit_result) = futures::join!(
3012 async {
3013 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3014 let mut child = db.new_batch();
3015 child = child.write(key(1), Some(val(1)));
3016 child.merkleize(&db, None).await.unwrap()
3017 },
3018 db.commit(),
3019 );
3020 commit_result.unwrap();
3021
3022 db.apply_batch(child_merkleized).await.unwrap();
3023 db.commit().await.unwrap();
3024
3025 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3026 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3027
3028 db.destroy().await.unwrap();
3029 });
3030 }
3031
3032 #[test_traced("INFO")]
3035 fn test_current_sequential_commit() {
3036 let executor = deterministic::Runner::default();
3037 executor.start(|context| async move {
3038 let ctx = context.child("db");
3039 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3040 ctx.child("storage"),
3041 variable_config::<OneCap>("ff", &ctx),
3042 )
3043 .await
3044 .unwrap();
3045
3046 let parent_m = db
3048 .new_batch()
3049 .write(key(0), Some(val(0)))
3050 .merkleize(&db, None)
3051 .await
3052 .unwrap();
3053
3054 let child_m = parent_m
3056 .new_batch::<Sha256>()
3057 .write(key(1), Some(val(1)))
3058 .merkleize(&db, None)
3059 .await
3060 .unwrap();
3061
3062 db.apply_batch(parent_m).await.unwrap();
3063 db.apply_batch(child_m).await.unwrap();
3064
3065 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3067 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3068
3069 let ctx2 = context.child("db").with_attribute("index", 2);
3072 let mut db2: UnorderedVariableDb = UnorderedVariableDb::init(
3073 ctx2.child("db"),
3074 variable_config::<OneCap>("ff2", &ctx2),
3075 )
3076 .await
3077 .unwrap();
3078 let m1 = db2
3079 .new_batch()
3080 .write(key(0), Some(val(0)))
3081 .merkleize(&db2, None)
3082 .await
3083 .unwrap();
3084 db2.apply_batch(m1).await.unwrap();
3085 let m2 = db2
3086 .new_batch()
3087 .write(key(1), Some(val(1)))
3088 .merkleize(&db2, None)
3089 .await
3090 .unwrap();
3091 db2.apply_batch(m2).await.unwrap();
3092
3093 assert_eq!(db.root(), db2.root());
3094
3095 db.destroy().await.unwrap();
3096 db2.destroy().await.unwrap();
3097 });
3098 }
3099
3100 #[test_traced("INFO")]
3103 fn test_current_to_batch_then_chain() {
3104 let executor = deterministic::Runner::default();
3105 executor.start(|context| async move {
3106 let ctx = context.child("db");
3107 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3108 ctx.child("storage"),
3109 variable_config::<OneCap>("tb", &ctx),
3110 )
3111 .await
3112 .unwrap();
3113
3114 let m = db
3116 .new_batch()
3117 .write(key(0), Some(val(0)))
3118 .merkleize(&db, None)
3119 .await
3120 .unwrap();
3121 db.apply_batch(m).await.unwrap();
3122
3123 let snapshot = db.to_batch();
3125 assert_eq!(snapshot.root(), db.root());
3126
3127 let child = snapshot
3129 .new_batch::<Sha256>()
3130 .write(key(1), Some(val(1)))
3131 .merkleize(&db, None)
3132 .await
3133 .unwrap();
3134
3135 assert_ne!(child.root(), snapshot.root());
3137
3138 db.apply_batch(child).await.unwrap();
3140 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3141 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3142
3143 db.destroy().await.unwrap();
3144 });
3145 }
3146
3147 #[test_traced("INFO")]
3152 fn test_current_live_batch_safe_across_prune() {
3153 let executor = deterministic::Runner::default();
3154 executor.start(|context| async move {
3155 let ctx = context.child("db");
3156 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3157 ctx.child("storage"),
3158 variable_config::<OneCap>("prune-live", &ctx),
3159 )
3160 .await
3161 .unwrap();
3162
3163 let mut seed = db.new_batch();
3165 for i in 0u64..300 {
3166 seed = seed.write(key(i), Some(val(i)));
3167 }
3168 let seed_m = seed.merkleize(&db, None).await.unwrap();
3169 db.apply_batch(seed_m).await.unwrap();
3170 db.commit().await.unwrap();
3171
3172 let mut p = db.new_batch();
3174 for i in 0u64..250 {
3175 p = p.write(key(i), Some(val(i + 10_000)));
3176 }
3177 let p_m = p.merkleize(&db, None).await.unwrap();
3178 db.apply_batch(Arc::clone(&p_m)).await.unwrap();
3179 db.commit().await.unwrap();
3180
3181 let c = p_m
3183 .new_batch::<Sha256>()
3184 .write(key(250), Some(val(99_999)))
3185 .merkleize(&db, None)
3186 .await
3187 .unwrap();
3188
3189 db.prune(db.sync_boundary()).await.unwrap();
3191
3192 assert_eq!(c.get(&key(250), &db).await.unwrap(), Some(val(99_999)));
3194
3195 db.apply_batch(c).await.unwrap();
3198 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(10_000)));
3199 assert_eq!(db.get(&key(250)).await.unwrap(), Some(val(99_999)));
3200
3201 db.destroy().await.unwrap();
3202 });
3203 }
3204
3205 #[test_traced("INFO")]
3214 fn test_current_extend_applied_batch() {
3215 let executor = deterministic::Runner::default();
3216 executor.start(|context| async move {
3217 let ctx = context.child("db");
3218 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3219 ctx.child("storage"),
3220 variable_config::<OneCap>("xtend", &ctx),
3221 )
3222 .await
3223 .unwrap();
3224
3225 let a = db
3227 .new_batch()
3228 .write(key(0), Some(val(0)))
3229 .merkleize(&db, None)
3230 .await
3231 .unwrap();
3232 db.apply_batch(Arc::clone(&a)).await.unwrap();
3233
3234 let b = a
3238 .new_batch::<Sha256>()
3239 .write(key(1), Some(val(1)))
3240 .merkleize(&db, None)
3241 .await
3242 .unwrap();
3243 db.apply_batch(b).await.unwrap();
3244
3245 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3246 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3247
3248 let c = db
3250 .new_batch()
3251 .write(key(2), Some(val(2)))
3252 .merkleize(&db, None)
3253 .await
3254 .unwrap();
3255 db.apply_batch(c).await.unwrap();
3256
3257 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3258 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3259 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
3260
3261 db.destroy().await.unwrap();
3262 });
3263 }
3264
3265 #[test_traced("INFO")]
3272 fn test_current_live_batch_child_after_prune() {
3273 let executor = deterministic::Runner::default();
3274 executor.start(|context| async move {
3275 let ctx = context.child("db");
3276 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3277 ctx.child("storage"),
3278 variable_config::<OneCap>("child-after-prune", &ctx),
3279 )
3280 .await
3281 .unwrap();
3282
3283 let mut seed = db.new_batch();
3285 for i in 0u64..300 {
3286 seed = seed.write(key(i), Some(val(i)));
3287 }
3288 let seed_m = seed.merkleize(&db, None).await.unwrap();
3289 db.apply_batch(seed_m).await.unwrap();
3290 db.commit().await.unwrap();
3291
3292 let mut a_batch = db.new_batch();
3294 for i in 0u64..250 {
3295 a_batch = a_batch.write(key(i), Some(val(i + 10_000)));
3296 }
3297 let a = a_batch.merkleize(&db, None).await.unwrap();
3298 db.apply_batch(Arc::clone(&a)).await.unwrap();
3299 db.commit().await.unwrap();
3300
3301 db.prune(db.sync_boundary()).await.unwrap();
3303
3304 let b = a
3308 .new_batch::<Sha256>()
3309 .write(key(300), Some(val(300)))
3310 .merkleize(&db, None)
3311 .await
3312 .unwrap();
3313
3314 db.apply_batch(b).await.unwrap();
3315 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(10_000)));
3316 assert_eq!(db.get(&key(249)).await.unwrap(), Some(val(10_249)));
3317 assert_eq!(db.get(&key(300)).await.unwrap(), Some(val(300)));
3318
3319 db.destroy().await.unwrap();
3320 });
3321 }
3322
3323 #[test_traced("WARN")]
3327 fn test_current_apply_after_ancestor_dropped() {
3328 let executor = deterministic::Runner::default();
3329 executor.start(|context| async move {
3330 let ctx = context.child("db");
3331 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3332 ctx.child("storage"),
3333 variable_config::<OneCap>("adrop", &ctx),
3334 )
3335 .await
3336 .unwrap();
3337
3338 let mut a = db.new_batch();
3340 for i in 0..3 {
3341 a = a.write(key(i), Some(val(i)));
3342 }
3343 let a_m = a.merkleize(&db, None).await.unwrap();
3344
3345 let mut b = a_m.new_batch::<Sha256>();
3346 for i in 3..6 {
3347 b = b.write(key(i), Some(val(i)));
3348 }
3349 let b_m = b.merkleize(&db, None).await.unwrap();
3350
3351 let mut c = b_m.new_batch::<Sha256>();
3352 for i in 6..9 {
3353 c = c.write(key(i), Some(val(i)));
3354 }
3355 let c_m = c.merkleize(&db, None).await.unwrap();
3356
3357 drop(a_m);
3359 drop(b_m);
3360
3361 db.apply_batch(c_m).await.unwrap();
3363 db.commit().await.unwrap();
3364
3365 for i in 0..9 {
3367 assert_eq!(
3368 db.get(&key(i)).await.unwrap(),
3369 Some(val(i)),
3370 "key({i}) missing after apply_batch with dropped ancestors"
3371 );
3372 }
3373
3374 db.destroy().await.unwrap();
3375 });
3376 }
3377
3378 #[test_traced("WARN")]
3387 fn test_current_chain_bitmap_order_matches_sequential() {
3388 let executor = deterministic::Runner::default();
3389 executor.start(|context| async move {
3390 let ctx1 = context.child("db").with_attribute("index", 1);
3392 let mut db1: UnorderedVariableDb = UnorderedVariableDb::init(
3393 ctx1.child("db"),
3394 variable_config::<OneCap>("ord1", &ctx1),
3395 )
3396 .await
3397 .unwrap();
3398
3399 commit_writes_with_metadata(
3401 &mut db1,
3402 [(key(10), Some(val(10))), (key(11), Some(val(11)))],
3403 None,
3404 )
3405 .await;
3406
3407 let a = db1
3414 .new_batch()
3415 .write(key(10), Some(val(100)))
3416 .write(key(11), None) .merkleize(&db1, None)
3418 .await
3419 .unwrap();
3420
3421 let b = a
3422 .new_batch::<Sha256>()
3423 .write(key(12), Some(val(120)))
3424 .write(key(13), Some(val(130)))
3425 .merkleize(&db1, None)
3426 .await
3427 .unwrap();
3428
3429 let c = b
3430 .new_batch::<Sha256>()
3431 .write(key(14), Some(val(140)))
3432 .merkleize(&db1, None)
3433 .await
3434 .unwrap();
3435
3436 db1.apply_batch(c).await.unwrap();
3437 db1.commit().await.unwrap();
3438
3439 let d1 = db1
3441 .new_batch()
3442 .write(key(20), Some(val(200)))
3443 .merkleize(&db1, None)
3444 .await
3445 .unwrap();
3446 let chain_then_d_root = d1.root();
3447
3448 let ctx2 = context.child("db").with_attribute("index", 2);
3450 let mut db2: UnorderedVariableDb = UnorderedVariableDb::init(
3451 ctx2.child("db"),
3452 variable_config::<OneCap>("ord2", &ctx2),
3453 )
3454 .await
3455 .unwrap();
3456
3457 commit_writes_with_metadata(
3458 &mut db2,
3459 [(key(10), Some(val(10))), (key(11), Some(val(11)))],
3460 None,
3461 )
3462 .await;
3463
3464 let a2 = db2
3465 .new_batch()
3466 .write(key(10), Some(val(100)))
3467 .write(key(11), None)
3468 .merkleize(&db2, None)
3469 .await
3470 .unwrap();
3471 db2.apply_batch(a2).await.unwrap();
3472 db2.commit().await.unwrap();
3473
3474 let b2 = db2
3475 .new_batch()
3476 .write(key(12), Some(val(120)))
3477 .write(key(13), Some(val(130)))
3478 .merkleize(&db2, None)
3479 .await
3480 .unwrap();
3481 db2.apply_batch(b2).await.unwrap();
3482 db2.commit().await.unwrap();
3483
3484 let c2 = db2
3485 .new_batch()
3486 .write(key(14), Some(val(140)))
3487 .merkleize(&db2, None)
3488 .await
3489 .unwrap();
3490 db2.apply_batch(c2).await.unwrap();
3491 db2.commit().await.unwrap();
3492
3493 let d2 = db2
3494 .new_batch()
3495 .write(key(20), Some(val(200)))
3496 .merkleize(&db2, None)
3497 .await
3498 .unwrap();
3499 let sequential_then_d_root = d2.root();
3500
3501 assert_eq!(
3502 chain_then_d_root, sequential_then_d_root,
3503 "batch D's root on top of chain-applied state must match sequential state"
3504 );
3505
3506 db1.destroy().await.unwrap();
3507 db2.destroy().await.unwrap();
3508 });
3509 }
3510
3511 #[test_traced("WARN")]
3523 fn test_current_stale_bitmap_clears_after_prune() {
3524 let executor = deterministic::Runner::default();
3525 executor.start(|context| async move {
3526 let ctx = context.child("db");
3527 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3528 ctx.child("storage"),
3529 variable_config::<OneCap>("stale-clears", &ctx),
3530 )
3531 .await
3532 .unwrap();
3533
3534 let mut seed = db.new_batch();
3536 for i in 0u64..255 {
3537 seed = seed.write(key(i), Some(val(i)));
3538 }
3539 let seed_m = seed.merkleize(&db, None).await.unwrap();
3540 db.apply_batch(seed_m).await.unwrap();
3541 db.commit().await.unwrap();
3542
3543 let mut p = db.new_batch();
3546 for i in 1u64..255 {
3547 p = p.write(key(i), Some(val(i + 10000)));
3548 }
3549 let p_m = p.merkleize(&db, None).await.unwrap();
3550
3551 let c_m = p_m
3553 .new_batch::<Sha256>()
3554 .write(key(0), Some(val(9999)))
3555 .merkleize(&db, None)
3556 .await
3557 .unwrap();
3558
3559 db.apply_batch(p_m).await.unwrap();
3561 db.commit().await.unwrap();
3562
3563 let floor = *db.inactivity_floor_loc();
3564 assert!(floor >= 256, "floor must be past chunk 0: floor={floor}",);
3565
3566 db.prune(db.sync_boundary()).await.unwrap();
3567 db.apply_batch(c_m).await.unwrap();
3568
3569 db.destroy().await.unwrap();
3570 });
3571 }
3572
3573 #[test_traced("INFO")]
3576 fn test_current_partial_ancestor_commit() {
3577 let executor = deterministic::Runner::default();
3578 executor.start(|context| async move {
3579 let ctx = context.child("db");
3580 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3581 ctx.child("storage"),
3582 variable_config::<OneCap>("pac", &ctx),
3583 )
3584 .await
3585 .unwrap();
3586
3587 let a = db
3588 .new_batch()
3589 .write(key(0), Some(val(0)))
3590 .merkleize(&db, None)
3591 .await
3592 .unwrap();
3593 let b = a
3594 .new_batch::<Sha256>()
3595 .write(key(1), Some(val(1)))
3596 .merkleize(&db, None)
3597 .await
3598 .unwrap();
3599 let c = b
3600 .new_batch::<Sha256>()
3601 .write(key(2), Some(val(2)))
3602 .merkleize(&db, None)
3603 .await
3604 .unwrap();
3605
3606 let expected_root = c.root();
3607
3608 db.apply_batch(a).await.unwrap();
3609 db.apply_batch(c).await.unwrap();
3610
3611 assert_eq!(db.root(), expected_root);
3612 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3613 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3614 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
3615
3616 db.destroy().await.unwrap();
3617 });
3618 }
3619
3620 #[test_traced("INFO")]
3624 fn test_current_partial_ancestor_bitmap_ordering() {
3625 let executor = deterministic::Runner::default();
3626 executor.start(|context| async move {
3627 let ctx = context.child("db");
3628 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3629 ctx.child("storage"),
3630 variable_config::<OneCap>("bmo", &ctx),
3631 )
3632 .await
3633 .unwrap();
3634
3635 let a = db
3637 .new_batch()
3638 .write(key(0), Some(val(0)))
3639 .merkleize(&db, None)
3640 .await
3641 .unwrap();
3642 let b = a
3643 .new_batch::<Sha256>()
3644 .write(key(1), Some(val(1)))
3645 .merkleize(&db, None)
3646 .await
3647 .unwrap();
3648 let c = b
3649 .new_batch::<Sha256>()
3650 .write(key(2), Some(val(2)))
3651 .merkleize(&db, None)
3652 .await
3653 .unwrap();
3654 let d = c
3655 .new_batch::<Sha256>()
3656 .write(key(3), Some(val(3)))
3657 .merkleize(&db, None)
3658 .await
3659 .unwrap();
3660
3661 db.apply_batch(a).await.unwrap();
3665 db.apply_batch(d.clone()).await.unwrap();
3666
3667 let e = db
3672 .new_batch()
3673 .write(key(4), Some(val(4)))
3674 .merkleize(&db, None)
3675 .await
3676 .unwrap();
3677 db.apply_batch(e).await.unwrap();
3678
3679 let ref_ctx = context.child("ref");
3681 let mut ref_db: UnorderedVariableDb = UnorderedVariableDb::init(
3682 ref_ctx.child("db"),
3683 variable_config::<OneCap>("bmo_ref", &ref_ctx),
3684 )
3685 .await
3686 .unwrap();
3687 for i in 0..5 {
3688 let batch = ref_db
3689 .new_batch()
3690 .write(key(i), Some(val(i)))
3691 .merkleize(&ref_db, None)
3692 .await
3693 .unwrap();
3694 ref_db.apply_batch(batch).await.unwrap();
3695 }
3696
3697 assert_eq!(
3698 db.root(),
3699 ref_db.root(),
3700 "root mismatch: bitmap ordering bug"
3701 );
3702
3703 db.destroy().await.unwrap();
3704 ref_db.destroy().await.unwrap();
3705 });
3706 }
3707
3708 #[test_traced("INFO")]
3718 fn test_current_apply_chunks_match_speculative_chunks() {
3719 const N: usize = 32;
3720 const CHUNK_SIZE_BITS: u64 = commonware_utils::bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
3721 const SEED_KEYS: u64 = CHUNK_SIZE_BITS + 50;
3724
3725 let executor = deterministic::Runner::default();
3726 executor.start(|context| async move {
3727 let ctx = context.child("db");
3728 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3729 ctx.child("storage"),
3730 variable_config::<OneCap>("spec_eq", &ctx),
3731 )
3732 .await
3733 .unwrap();
3734
3735 let seed = (0..SEED_KEYS).fold(db.new_batch(), |b, i| b.write(key(i), Some(val(i))));
3737 let seed = seed.merkleize(&db, None).await.unwrap();
3738 db.apply_batch(seed).await.unwrap();
3739 db.commit().await.unwrap();
3740
3741 assert!(
3743 Readable::<N>::len(db.any.bitmap.as_ref()) > CHUNK_SIZE_BITS,
3744 "setup must cross a chunk boundary",
3745 );
3746
3747 let parent = db
3749 .new_batch()
3750 .write(key(10), Some(val(110))) .write(key(50), None) .write(key(CHUNK_SIZE_BITS + 5), Some(val(120))) .write(key(SEED_KEYS), Some(val(130))) .write(key(SEED_KEYS + 1), Some(val(131))) .merkleize(&db, None)
3756 .await
3757 .unwrap();
3758
3759 let child = parent
3764 .new_batch::<Sha256>()
3765 .write(key(10), Some(val(210)))
3766 .write(key(SEED_KEYS), None)
3767 .write(key(75), None)
3768 .write(key(CHUNK_SIZE_BITS + 30), Some(val(220)))
3769 .merkleize(&db, None)
3770 .await
3771 .unwrap();
3772
3773 let speculative_chunks: Vec<[u8; N]> = {
3775 let len = Readable::<N>::len(&child.bitmap);
3776 let chunk_count = len.div_ceil(CHUNK_SIZE_BITS) as usize;
3777 (0..chunk_count)
3778 .map(|idx| Readable::<N>::get_chunk(&child.bitmap, idx))
3779 .collect()
3780 };
3781 assert!(speculative_chunks.len() >= 2);
3783
3784 db.apply_batch(child).await.unwrap();
3788 let committed_chunks: Vec<[u8; N]> = {
3789 let len = Readable::<N>::len(db.any.bitmap.as_ref());
3790 let chunk_count = len.div_ceil(CHUNK_SIZE_BITS) as usize;
3791 (0..chunk_count)
3792 .map(|idx| Readable::<N>::get_chunk(db.any.bitmap.as_ref(), idx))
3793 .collect()
3794 };
3795
3796 assert_eq!(
3797 speculative_chunks, committed_chunks,
3798 "speculative chunks must equal post-apply committed chunks across all chunks",
3799 );
3800
3801 db.destroy().await.unwrap();
3802 });
3803 }
3804
3805 #[test_traced("INFO")]
3807 fn test_current_mmb_ops_historical_proof_verifies_with_backward_bagging() {
3808 use crate::{merkle::hasher::Standard, qmdb::verify_proof};
3809 use commonware_utils::NZU64;
3810
3811 let executor = deterministic::Runner::default();
3812 executor.start(|context| async move {
3813 let ctx = context.child("db");
3814 let mut db: UnorderedFixedMmbDb = UnorderedFixedMmbDb::init(
3815 ctx.child("storage"),
3816 fixed_config::<OneCap>("mmb-ops-proof", &ctx),
3817 )
3818 .await
3819 .unwrap();
3820
3821 let writes: Vec<(Digest, Option<Digest>)> =
3823 (0u64..16).map(|i| (key(i), Some(val(i)))).collect();
3824 commit_writes(&mut db, writes).await.unwrap();
3825
3826 let ops_root = db.ops_root();
3827 let historical_size = db.bounds().await.end;
3828 let (proof, ops) = db
3829 .ops_historical_proof(historical_size, Location::new(0), NZU64!(32))
3830 .await
3831 .unwrap();
3832
3833 let hasher = qmdb::hasher::<Sha256>();
3835 assert!(verify_proof(
3836 &hasher,
3837 &proof,
3838 Location::new(0),
3839 &ops,
3840 &ops_root
3841 ));
3842
3843 let plain = Standard::<Sha256>::new(ForwardFold);
3845 assert!(!verify_proof(
3846 &plain,
3847 &proof,
3848 Location::new(0),
3849 &ops,
3850 &ops_root
3851 ));
3852
3853 db.destroy().await.unwrap();
3854 });
3855 }
3856}