1use crate::{
181 index::Factory as IndexFactory,
182 journal::{
183 authenticated::Inner,
184 contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
185 },
186 merkle::{self, Location},
187 mmr::{journaled::Config as MmrConfig, StandardHasher},
188 qmdb::{
189 any::{
190 self,
191 operation::{Operation, Update},
192 Config as AnyConfig,
193 },
194 operation::Committable,
195 },
196 translator::Translator,
197 Context,
198};
199use commonware_codec::{CodecShared, FixedSize};
200use commonware_cryptography::Hasher;
201use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex};
202use std::sync::Arc;
203
204pub mod batch;
205pub mod db;
206mod grafting;
207
208pub mod ordered;
209pub mod proof;
210pub(crate) mod sync;
211pub mod unordered;
212
213#[derive(Clone)]
215pub struct Config<T: Translator, J> {
216 pub merkle_config: MmrConfig,
218
219 pub journal_config: J,
221
222 pub grafted_metadata_partition: String,
224
225 pub translator: T,
227}
228
229impl<T: Translator, J> From<Config<T, J>> for AnyConfig<T, J> {
230 fn from(cfg: Config<T, J>) -> Self {
231 Self {
232 merkle_config: cfg.merkle_config,
233 journal_config: cfg.journal_config,
234 translator: cfg.translator,
235 }
236 }
237}
238
239pub type FixedConfig<T> = Config<T, FConfig>;
241
242pub type VariableConfig<T, C> = Config<T, VConfig<C>>;
244
245pub(super) async fn init<F, E, U, H, T, I, J, const N: usize>(
247 context: E,
248 config: Config<T, J::Config>,
249) -> Result<db::Db<F, E, J, I, H, U, N>, crate::qmdb::Error<F>>
250where
251 F: merkle::Graftable,
252 E: Context,
253 U: Update + Send + Sync,
254 H: Hasher,
255 T: Translator,
256 I: IndexFactory<T, Value = Location<F>>,
257 J: Inner<E, Item = Operation<F, U>>,
258 Operation<F, U>: Committable + CodecShared,
259{
260 const {
262 assert!(
266 N.is_multiple_of(H::Digest::SIZE),
267 "chunk size must be some multiple of the digest size",
268 );
269 assert!(N.is_power_of_two(), "chunk size must be a power of 2");
272 }
273
274 let thread_pool = config.merkle_config.thread_pool.clone();
275 let metadata_partition = config.grafted_metadata_partition.clone();
276
277 let (metadata, pruned_chunks, pinned_nodes) =
279 db::init_metadata(context.with_label("metadata"), &metadata_partition).await?;
280
281 let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
283 .map_err(|_| crate::qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;
284
285 let last_known_inactivity_floor = Location::new(status.len());
287 let any = any::init(
288 context.with_label("any"),
289 config.into(),
290 Some(last_known_inactivity_floor),
291 |append: bool, loc: Option<Location<F>>| {
292 status.push(append);
293 if let Some(loc) = loc {
294 status.set_bit(*loc, false);
295 }
296 },
297 )
298 .await?;
299
300 let hasher = StandardHasher::<H>::new();
302 let grafted_tree = db::build_grafted_tree::<F, H, N>(
303 &hasher,
304 &status,
305 &pinned_nodes,
306 &any.log.merkle,
307 thread_pool.as_ref(),
308 )
309 .await?;
310
311 let storage = grafting::Storage::new(&grafted_tree, grafting::height::<N>(), &any.log.merkle);
313 let partial_chunk = db::partial_chunk(&status);
314 let ops_root = any.log.root();
315 let root = db::compute_db_root(&hasher, &status, &storage, partial_chunk, &ops_root).await?;
316
317 Ok(db::Db {
318 any,
319 status: batch::BitmapBatch::Base(Arc::new(status)),
320 grafted_tree,
321 metadata: AsyncMutex::new(metadata),
322 thread_pool,
323 root,
324 })
325}
326
327#[cfg(any(test, feature = "test-traits"))]
329pub trait BitmapPrunedBits {
330 fn pruned_bits(&self) -> u64;
332
333 fn get_bit(&self, index: u64) -> bool;
335
336 fn oldest_retained(&self) -> impl core::future::Future<Output = u64> + Send;
338}
339
340#[cfg(test)]
341pub mod tests {
342 pub use super::BitmapPrunedBits;
345 use super::{ordered, unordered, FConfig, FixedConfig, MmrConfig, VConfig, VariableConfig};
346 use crate::{
347 merkle::{self, mmb, mmr},
348 qmdb::{
349 any::{
350 test::colliding_digest,
351 traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
352 },
353 store::tests::{TestKey, TestValue},
354 },
355 translator::Translator,
356 };
357 use commonware_runtime::{
358 buffer::paged::CacheRef,
359 deterministic::{self, Context},
360 BufferPooler, Metrics as _, Runner as _,
361 };
362 use commonware_utils::{NZUsize, NZU16, NZU64};
363 use core::future::Future;
364 use rand::{rngs::StdRng, RngCore, SeedableRng};
365 use std::num::{NonZeroU16, NonZeroUsize};
366 use tracing::warn;
367
368 type Error<F> = crate::qmdb::Error<F>;
369 type Location<F> = merkle::Location<F>;
370 type WriteVec<F, C> = Vec<(<C as DbAny<F>>::Key, Option<<C as DbAny<F>>::Value>)>;
371
372 const PAGE_SIZE: NonZeroU16 = NZU16!(88);
374 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8);
375
376 pub(crate) fn fixed_config<T: Translator + Default>(
378 partition_prefix: &str,
379 pooler: &impl BufferPooler,
380 ) -> FixedConfig<T> {
381 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
382 FixedConfig {
383 merkle_config: MmrConfig {
384 journal_partition: format!("{partition_prefix}-journal-partition"),
385 metadata_partition: format!("{partition_prefix}-metadata-partition"),
386 items_per_blob: NZU64!(11),
387 write_buffer: NZUsize!(1024),
388 thread_pool: None,
389 page_cache: page_cache.clone(),
390 },
391 journal_config: FConfig {
392 partition: format!("{partition_prefix}-partition-prefix"),
393 items_per_blob: NZU64!(7),
394 page_cache,
395 write_buffer: NZUsize!(1024),
396 },
397 grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
398 translator: T::default(),
399 }
400 }
401
402 pub(crate) fn variable_config<T: Translator + Default>(
404 partition_prefix: &str,
405 pooler: &impl BufferPooler,
406 ) -> VariableConfig<T, ((), ())> {
407 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
408 VariableConfig {
409 merkle_config: MmrConfig {
410 journal_partition: format!("{partition_prefix}-journal-partition"),
411 metadata_partition: format!("{partition_prefix}-metadata-partition"),
412 items_per_blob: NZU64!(11),
413 write_buffer: NZUsize!(1024),
414 thread_pool: None,
415 page_cache: page_cache.clone(),
416 },
417 journal_config: VConfig {
418 partition: format!("{partition_prefix}-partition-prefix"),
419 items_per_section: NZU64!(7),
420 compression: None,
421 codec_config: ((), ()),
422 page_cache,
423 write_buffer: NZUsize!(1024),
424 },
425 grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
426 translator: T::default(),
427 }
428 }
429
430 async fn commit_writes<F: merkle::Graftable, C: DbAny<F>>(
432 db: &mut C,
433 writes: impl IntoIterator<Item = (C::Key, Option<<C as DbAny<F>>::Value>)>,
434 ) -> Result<(), Error<F>> {
435 let mut batch = db.new_batch();
436 for (k, v) in writes {
437 batch = batch.write(k, v);
438 }
439 let merkleized = batch.merkleize(db, None).await?;
440 db.apply_batch(merkleized).await?;
441 db.commit().await?;
442 Ok(())
443 }
444
445 async fn apply_random_ops_inner<F, C>(
450 num_elements: u64,
451 commit_changes: bool,
452 rng_seed: u64,
453 mut db: C,
454 ) -> Result<C, Error<F>>
455 where
456 F: merkle::Graftable,
457 C: DbAny<F>,
458 C::Key: TestKey,
459 <C as DbAny<F>>::Value: TestValue,
460 {
461 warn!("rng_seed={}", rng_seed);
463 let mut rng = StdRng::seed_from_u64(rng_seed);
464
465 let writes: Vec<_> = (0u64..num_elements)
467 .map(|i| {
468 let k = TestKey::from_seed(i);
469 let v = TestValue::from_seed(rng.next_u64());
470 (k, Some(v))
471 })
472 .collect();
473 if commit_changes {
474 commit_writes(&mut db, writes).await?;
475 }
476
477 let mut pending: WriteVec<F, C> = Vec::new();
480 for _ in 0u64..num_elements * 10 {
481 let rand_key = TestKey::from_seed(rng.next_u64() % num_elements);
482 if rng.next_u32() % 7 == 0 {
483 pending.push((rand_key, None));
484 continue;
485 }
486 let v = TestValue::from_seed(rng.next_u64());
487 pending.push((rand_key, Some(v)));
488 if commit_changes && rng.next_u32() % 20 == 0 {
489 commit_writes(&mut db, pending.drain(..)).await?;
490 }
491 }
492 if commit_changes {
493 commit_writes(&mut db, pending).await?;
494 }
495 Ok(db)
496 }
497
498 pub fn apply_random_ops<F, C>(
499 num_elements: u64,
500 commit_changes: bool,
501 rng_seed: u64,
502 db: C,
503 ) -> std::pin::Pin<Box<dyn Future<Output = Result<C, Error<F>>>>>
504 where
505 F: merkle::Graftable + 'static,
506 C: DbAny<F> + 'static,
507 C::Key: TestKey,
508 <C as DbAny<F>>::Value: TestValue,
509 {
510 Box::pin(apply_random_ops_inner::<F, C>(
511 num_elements,
512 commit_changes,
513 rng_seed,
514 db,
515 ))
516 }
517
518 pub fn test_build_random_close_reopen<M, C, F, Fut>(mut open_db: F)
523 where
524 M: merkle::Graftable + 'static,
525 C: DbAny<M> + 'static,
526 C::Key: TestKey,
527 <C as DbAny<M>>::Value: TestValue,
528 F: FnMut(Context, String) -> Fut + Clone,
529 Fut: Future<Output = C>,
530 {
531 const ELEMENTS: u64 = 1000;
532
533 let executor = deterministic::Runner::default();
534 let mut open_db_clone = open_db.clone();
535 let state1 = executor.start(|mut context| async move {
536 let partition = "build-random".to_string();
537 let rng_seed = context.next_u64();
538 let mut db: C = open_db_clone(context.with_label("first"), partition.clone()).await;
539 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
540 .await
541 .unwrap();
542 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
543 db.apply_batch(merkleized).await.unwrap();
544 db.sync().await.unwrap();
545
546 let root = db.root();
548 drop(db);
549 let db: C = open_db_clone(context.with_label("second"), partition).await;
550
551 assert_eq!(db.root(), root);
553
554 db.destroy().await.unwrap();
555 context.auditor().state()
556 });
557
558 let executor = deterministic::Runner::default();
560 let state2 = executor.start(|mut context| async move {
561 let partition = "build-random".to_string();
562 let rng_seed = context.next_u64();
563 let mut db: C = open_db(context.with_label("first"), partition.clone()).await;
564 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
565 .await
566 .unwrap();
567 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
568 db.apply_batch(merkleized).await.unwrap();
569 db.sync().await.unwrap();
570
571 let root = db.root();
572 drop(db);
573 let db: C = open_db(context.with_label("second"), partition).await;
574 assert_eq!(db.root(), root);
575
576 db.destroy().await.unwrap();
577 context.auditor().state()
578 });
579
580 assert_eq!(state1, state2);
581 }
582
583 pub fn test_simulate_write_failures<M, C, F, Fut>(mut open_db: F)
588 where
589 M: merkle::Graftable + 'static,
590 C: DbAny<M> + 'static,
591 C::Key: TestKey,
592 <C as DbAny<M>>::Value: TestValue,
593 F: FnMut(Context, String) -> Fut + Clone,
594 Fut: Future<Output = C>,
595 {
596 const ELEMENTS: u64 = 1000;
597
598 let executor = deterministic::Runner::default();
599 executor.start(|mut context| {
601 Box::pin(async move {
602 let partition = "build-random-fail-commit".to_string();
603 let rng_seed = context.next_u64();
604 let mut db: C = open_db(context.with_label("first"), partition.clone()).await;
605 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
606 .await
607 .unwrap();
608 commit_writes(&mut db, []).await.unwrap();
609 let committed_root = db.root();
610 let committed_op_count = db.bounds().await.end;
611 let committed_inactivity_floor = db.inactivity_floor_loc().await;
612 db.prune(committed_inactivity_floor).await.unwrap();
613
614 let db = apply_random_ops::<M, C>(ELEMENTS, false, rng_seed + 1, db)
616 .await
617 .unwrap();
618
619 drop(db);
622 let db: C = open_db(context.with_label("scenario1"), partition.clone()).await;
623 assert_eq!(db.root(), committed_root);
624 assert_eq!(db.bounds().await.end, committed_op_count);
625
626 let db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
628 .await
629 .unwrap();
630
631 let committed_op_count = db.bounds().await.end;
635 drop(db);
636
637 let db: C = open_db(context.with_label("scenario2"), partition.clone()).await;
640 let scenario_2_root = db.root();
641
642 let fresh_partition = "build-random-fail-commit-fresh".to_string();
645 let mut db: C = open_db(context.with_label("fresh"), fresh_partition.clone()).await;
646 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
647 .await
648 .unwrap();
649 commit_writes(&mut db, []).await.unwrap();
650 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
651 .await
652 .unwrap();
653 db.prune(db.inactivity_floor_loc().await).await.unwrap();
654 assert_eq!(db.bounds().await.end, committed_op_count);
656 assert_eq!(db.root(), scenario_2_root);
657
658 db.destroy().await.unwrap();
659 })
660 });
661 }
662
663 pub fn test_different_pruning_delays_same_root<M, C, F, Fut>(mut open_db: F)
668 where
669 M: merkle::Graftable,
670 C: DbAny<M>,
671 C::Key: TestKey,
672 <C as DbAny<M>>::Value: TestValue,
673 F: FnMut(Context, String) -> Fut + Clone,
674 Fut: Future<Output = C>,
675 {
676 const NUM_OPERATIONS: u64 = 1000;
677
678 let executor = deterministic::Runner::default();
679 let mut open_db_clone = open_db.clone();
680 executor.start(|context| async move {
681 let mut db_no_pruning: C =
683 open_db_clone(context.with_label("no_pruning"), "no-pruning-test".into()).await;
684 let mut db_pruning: C =
685 open_db(context.with_label("pruning"), "pruning-test".into()).await;
686
687 let mut pending_no_pruning: WriteVec<M, C> = Vec::new();
690 let mut pending_pruning: WriteVec<M, C> = Vec::new();
691 for i in 0..NUM_OPERATIONS {
692 let key: C::Key = TestKey::from_seed(i);
693 let value: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
694
695 pending_no_pruning.push((key, Some(value.clone())));
696 pending_pruning.push((key, Some(value)));
697
698 if i % 50 == 49 {
700 commit_writes(&mut db_no_pruning, pending_no_pruning.drain(..))
701 .await
702 .unwrap();
703 commit_writes(&mut db_pruning, pending_pruning.drain(..))
704 .await
705 .unwrap();
706 db_pruning
707 .prune(db_no_pruning.inactivity_floor_loc().await)
708 .await
709 .unwrap();
710 }
711 }
712
713 commit_writes(&mut db_no_pruning, pending_no_pruning)
715 .await
716 .unwrap();
717 commit_writes(&mut db_pruning, pending_pruning)
718 .await
719 .unwrap();
720
721 let root_no_pruning = db_no_pruning.root();
723 let root_pruning = db_pruning.root();
724 assert_eq!(root_no_pruning, root_pruning);
725
726 assert_eq!(
728 db_no_pruning.inactivity_floor_loc().await,
729 db_pruning.inactivity_floor_loc().await
730 );
731
732 db_no_pruning.destroy().await.unwrap();
733 db_pruning.destroy().await.unwrap();
734 });
735 }
736
737 pub fn test_sync_persists_bitmap_pruning_boundary<M, C, F, Fut>(mut open_db: F)
743 where
744 M: merkle::Graftable + 'static,
745 C: DbAny<M> + BitmapPrunedBits + 'static,
746 C::Key: TestKey,
747 <C as DbAny<M>>::Value: TestValue,
748 F: FnMut(Context, String) -> Fut + Clone,
749 Fut: Future<Output = C>,
750 {
751 const ELEMENTS: u64 = 500;
752
753 let executor = deterministic::Runner::default();
754 let mut open_db_clone = open_db.clone();
755 executor.start(|mut context| async move {
756 let partition = "sync-bitmap-pruning".to_string();
757 let rng_seed = context.next_u64();
758 let mut db: C = open_db_clone(context.with_label("first"), partition.clone()).await;
759
760 db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db).await.unwrap();
762 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
763 db.apply_batch(merkleized).await.unwrap();
764
765 let floor = db.inactivity_floor_loc().await;
767 db.prune(floor).await.unwrap();
768
769 let pruned_bits_before = db.pruned_bits();
770 warn!(
771 "pruned_bits_before={}, inactivity_floor={}, op_count={}",
772 pruned_bits_before,
773 *db.inactivity_floor_loc().await,
774 *db.bounds().await.end
775 );
776
777 assert!(
779 pruned_bits_before > 0,
780 "Expected bitmap to have pruned bits after prune()"
781 );
782
783 db.sync().await.unwrap();
785
786 let root_before = db.root();
788 drop(db);
789
790 let db: C = open_db(context.with_label("second"), partition).await;
792
793 let pruned_bits_after = db.pruned_bits();
796 warn!("pruned_bits_after={}", pruned_bits_after);
797
798 assert_eq!(
799 pruned_bits_after, pruned_bits_before,
800 "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
801 );
802
803 assert_eq!(db.root(), root_before);
805
806 db.destroy().await.unwrap();
807 });
808 }
809
810 pub fn test_current_db_build_big<M, C, F, Fut>(mut open_db: F)
816 where
817 M: merkle::Graftable,
818 C: DbAny<M>,
819 C::Key: TestKey,
820 <C as DbAny<M>>::Value: TestValue,
821 F: FnMut(Context, String) -> Fut + Clone,
822 Fut: Future<Output = C>,
823 {
824 const ELEMENTS: u64 = 1000;
825
826 let executor = deterministic::Runner::default();
827 let mut open_db_clone = open_db.clone();
828 executor.start(|context| async move {
829 let mut db: C = open_db_clone(context.with_label("first"), "build-big".into()).await;
830
831 let mut map = std::collections::HashMap::<C::Key, <C as DbAny<M>>::Value>::default();
832
833 let mut batch = db.new_batch();
835
836 for i in 0u64..ELEMENTS {
838 let k: C::Key = TestKey::from_seed(i);
839 let v: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
840 batch = batch.write(k, Some(v.clone()));
841 map.insert(k, v);
842 }
843
844 for i in 0u64..ELEMENTS {
846 if i % 3 != 0 {
847 continue;
848 }
849 let k: C::Key = TestKey::from_seed(i);
850 let v: <C as DbAny<M>>::Value = TestValue::from_seed((i + 1) * 10000);
851 batch = batch.write(k, Some(v.clone()));
852 map.insert(k, v);
853 }
854
855 for i in 0u64..ELEMENTS {
857 if i % 7 != 1 {
858 continue;
859 }
860 let k: C::Key = TestKey::from_seed(i);
861 batch = batch.write(k, None);
862 map.remove(&k);
863 }
864
865 let merkleized = batch.merkleize(&db, None).await.unwrap();
866 db.apply_batch(merkleized).await.unwrap();
867
868 db.sync().await.unwrap();
870 db.prune(db.inactivity_floor_loc().await).await.unwrap();
871
872 let root = db.root();
874 db.sync().await.unwrap();
875 drop(db);
876
877 let db: C = open_db(context.with_label("second"), "build-big".into()).await;
879 assert_eq!(root, db.root());
880
881 for i in 0u64..ELEMENTS {
883 let k: C::Key = TestKey::from_seed(i);
884 if let Some(map_value) = map.get(&k) {
885 let Some(db_value) = db.get(&k).await.unwrap() else {
886 panic!("key not found in db: {k}");
887 };
888 assert_eq!(*map_value, db_value);
889 } else {
890 assert!(db.get(&k).await.unwrap().is_none());
891 }
892 }
893 });
894 }
895
896 pub fn test_stale_batch_side_effect_free<M, C, F, Fut>(mut open_db: F)
900 where
901 M: merkle::Graftable,
902 C: DbAny<M>,
903 C::Key: TestKey,
904 <C as DbAny<M>>::Value: TestValue,
905 F: FnMut(Context, String) -> Fut,
906 Fut: Future<Output = C>,
907 {
908 let executor = deterministic::Runner::default();
909 executor.start(|context| async move {
910 let mut db: C =
911 open_db(context.with_label("db"), "stale-side-effect-free".into()).await;
912
913 let key1 = <C::Key as TestKey>::from_seed(1);
914 let key2 = <C::Key as TestKey>::from_seed(2);
915 let value1 = <<C as DbAny<M>>::Value as TestValue>::from_seed(10);
916 let value2 = <<C as DbAny<M>>::Value as TestValue>::from_seed(20);
917
918 let mut batch = db.new_batch();
919 batch = batch.write(key1, Some(value1.clone()));
920 let batch_a = batch.merkleize(&db, None).await.unwrap();
921 let mut batch = db.new_batch();
922 batch = batch.write(key2, Some(value2));
923 let batch_b = batch.merkleize(&db, None).await.unwrap();
924
925 db.apply_batch(batch_a).await.unwrap();
926 let expected_root = db.root();
927 let expected_bounds = db.bounds().await;
928 let expected_metadata = db.get_metadata().await.unwrap();
929 assert_eq!(db.get(&key1).await.unwrap(), Some(value1.clone()));
930 assert_eq!(db.get(&key2).await.unwrap(), None);
931
932 let result = db.apply_batch(batch_b).await;
933 assert!(
934 matches!(result, Err(Error::StaleBatch { .. })),
935 "expected StaleBatch error, got {result:?}"
936 );
937 assert_eq!(db.root(), expected_root);
938 assert_eq!(db.bounds().await, expected_bounds);
939 assert_eq!(db.get_metadata().await.unwrap(), expected_metadata);
940 assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
941 assert_eq!(db.get(&key2).await.unwrap(), None);
942
943 db.destroy().await.unwrap();
944 });
945 }
946
947 use crate::translator::OneCap;
948 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
949 use commonware_macros::{test_group, test_traced};
950
951 type OrderedFixedDb =
952 ordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
953 type OrderedVariableDb =
954 ordered::variable::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
955 type UnorderedFixedDb =
956 unordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
957 type UnorderedVariableDb =
958 unordered::variable::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
959 type OrderedFixedP1Db = ordered::fixed::partitioned::Db<
960 mmr::Family,
961 Context,
962 Digest,
963 Digest,
964 Sha256,
965 OneCap,
966 1,
967 32,
968 >;
969 type OrderedVariableP1Db = ordered::variable::partitioned::Db<
970 mmr::Family,
971 Context,
972 Digest,
973 Digest,
974 Sha256,
975 OneCap,
976 1,
977 32,
978 >;
979 type UnorderedFixedP1Db = unordered::fixed::partitioned::Db<
980 mmr::Family,
981 Context,
982 Digest,
983 Digest,
984 Sha256,
985 OneCap,
986 1,
987 32,
988 >;
989 type UnorderedVariableP1Db = unordered::variable::partitioned::Db<
990 mmr::Family,
991 Context,
992 Digest,
993 Digest,
994 Sha256,
995 OneCap,
996 1,
997 32,
998 >;
999 type OrderedFixedP2Db = ordered::fixed::partitioned::Db<
1000 mmr::Family,
1001 Context,
1002 Digest,
1003 Digest,
1004 Sha256,
1005 OneCap,
1006 2,
1007 32,
1008 >;
1009 type OrderedVariableP2Db = ordered::variable::partitioned::Db<
1010 mmr::Family,
1011 Context,
1012 Digest,
1013 Digest,
1014 Sha256,
1015 OneCap,
1016 2,
1017 32,
1018 >;
1019 type UnorderedFixedP2Db = unordered::fixed::partitioned::Db<
1020 mmr::Family,
1021 Context,
1022 Digest,
1023 Digest,
1024 Sha256,
1025 OneCap,
1026 2,
1027 32,
1028 >;
1029 type UnorderedVariableP2Db = unordered::variable::partitioned::Db<
1030 mmr::Family,
1031 Context,
1032 Digest,
1033 Digest,
1034 Sha256,
1035 OneCap,
1036 2,
1037 32,
1038 >;
1039
1040 type OrderedFixedMmbDb =
1041 ordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
1042 type OrderedVariableMmbDb =
1043 ordered::variable::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
1044 type UnorderedFixedMmbDb =
1045 unordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
1046 type UnorderedVariableMmbDb =
1047 unordered::variable::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
1048 type OrderedFixedMmbP1Db = ordered::fixed::partitioned::Db<
1049 mmb::Family,
1050 Context,
1051 Digest,
1052 Digest,
1053 Sha256,
1054 OneCap,
1055 1,
1056 32,
1057 >;
1058 type OrderedVariableMmbP1Db = ordered::variable::partitioned::Db<
1059 mmb::Family,
1060 Context,
1061 Digest,
1062 Digest,
1063 Sha256,
1064 OneCap,
1065 1,
1066 32,
1067 >;
1068 type UnorderedFixedMmbP1Db = unordered::fixed::partitioned::Db<
1069 mmb::Family,
1070 Context,
1071 Digest,
1072 Digest,
1073 Sha256,
1074 OneCap,
1075 1,
1076 32,
1077 >;
1078 type UnorderedVariableMmbP1Db = unordered::variable::partitioned::Db<
1079 mmb::Family,
1080 Context,
1081 Digest,
1082 Digest,
1083 Sha256,
1084 OneCap,
1085 1,
1086 32,
1087 >;
1088 type OrderedFixedMmbP2Db = ordered::fixed::partitioned::Db<
1089 mmb::Family,
1090 Context,
1091 Digest,
1092 Digest,
1093 Sha256,
1094 OneCap,
1095 2,
1096 32,
1097 >;
1098 type OrderedVariableMmbP2Db = ordered::variable::partitioned::Db<
1099 mmb::Family,
1100 Context,
1101 Digest,
1102 Digest,
1103 Sha256,
1104 OneCap,
1105 2,
1106 32,
1107 >;
1108 type UnorderedFixedMmbP2Db = unordered::fixed::partitioned::Db<
1109 mmb::Family,
1110 Context,
1111 Digest,
1112 Digest,
1113 Sha256,
1114 OneCap,
1115 2,
1116 32,
1117 >;
1118 type UnorderedVariableMmbP2Db = unordered::variable::partitioned::Db<
1119 mmb::Family,
1120 Context,
1121 Digest,
1122 Digest,
1123 Sha256,
1124 OneCap,
1125 2,
1126 32,
1127 >;
1128
1129 macro_rules! open_db_fn {
1131 ($db:ty, $cfg:ident) => {
1132 |ctx: Context, partition: String| async move {
1133 <$db>::init(ctx.clone(), $cfg::<OneCap>(&partition, &ctx))
1134 .await
1135 .unwrap()
1136 }
1137 };
1138 }
1139
1140 macro_rules! with_all_variants {
1142 ($cb:ident!($($args:tt)*)) => {
1143 $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
1144 $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
1145 $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
1146 $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
1147 $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
1148 $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
1149 $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
1150 $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
1151 $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
1152 $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
1153 $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
1154 $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
1155 $cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
1156 $cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
1157 $cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
1158 $cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
1159 $cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
1160 $cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
1161 $cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
1162 $cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
1163 $cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
1164 $cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
1165 $cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
1166 $cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
1167 };
1168 }
1169
1170 macro_rules! with_ordered_variants {
1172 ($cb:ident!($($args:tt)*)) => {
1173 $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
1174 $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
1175 $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
1176 $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
1177 $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
1178 $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
1179 $cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
1180 $cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
1181 $cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
1182 $cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
1183 $cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
1184 $cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
1185 };
1186 }
1187
1188 macro_rules! with_unordered_variants {
1190 ($cb:ident!($($args:tt)*)) => {
1191 $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
1192 $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
1193 $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
1194 $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
1195 $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
1196 $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
1197 $cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
1198 $cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
1199 $cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
1200 $cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
1201 $cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
1202 $cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
1203 };
1204 }
1205
1206 macro_rules! test_simple {
1208 ($f:expr, $l:literal, $db:ty, $cfg:ident) => {
1209 Box::pin(async {
1210 $f(open_db_fn!($db, $cfg));
1211 })
1212 .await
1213 };
1214 }
1215
1216 macro_rules! for_all_variants {
1218 (simple: $f:expr) => {{
1219 with_all_variants!(test_simple!($f));
1220 }};
1221 (ordered: $f:expr) => {{
1222 with_ordered_variants!(test_simple!($f));
1223 }};
1224 (unordered: $f:expr) => {{
1225 with_unordered_variants!(test_simple!($f));
1226 }};
1227 }
1228
1229 fn test_ordered_build_big<M, C, F, Fut>(open_db: F)
1231 where
1232 M: merkle::Graftable,
1233 C: DbAny<M>,
1234 C::Key: TestKey,
1235 <C as DbAny<M>>::Value: TestValue,
1236 F: FnMut(Context, String) -> Fut + Clone,
1237 Fut: Future<Output = C>,
1238 {
1239 test_current_db_build_big::<M, C, F, Fut>(open_db);
1240 }
1241
1242 fn test_unordered_build_big<M, C, F, Fut>(open_db: F)
1243 where
1244 M: merkle::Graftable,
1245 C: DbAny<M>,
1246 C::Key: TestKey,
1247 <C as DbAny<M>>::Value: TestValue,
1248 F: FnMut(Context, String) -> Fut + Clone,
1249 Fut: Future<Output = C>,
1250 {
1251 test_current_db_build_big::<M, C, F, Fut>(open_db);
1252 }
1253
1254 #[test_group("slow")]
1255 #[test_traced("WARN")]
1256 fn test_all_variants_build_random_close_reopen() {
1257 let executor = deterministic::Runner::default();
1258 executor.start(|_context| async move {
1259 for_all_variants!(simple: test_build_random_close_reopen);
1260 });
1261 }
1262
1263 #[test_group("slow")]
1264 #[test_traced("WARN")]
1265 fn test_all_variants_simulate_write_failures() {
1266 let executor = deterministic::Runner::default();
1267 executor.start(|_context| async move {
1268 for_all_variants!(simple: test_simulate_write_failures);
1269 });
1270 }
1271
1272 #[test_group("slow")]
1273 #[test_traced("WARN")]
1274 fn test_all_variants_different_pruning_delays_same_root() {
1275 let executor = deterministic::Runner::default();
1276 executor.start(|_context| async move {
1277 for_all_variants!(simple: test_different_pruning_delays_same_root);
1278 });
1279 }
1280
1281 #[test_group("slow")]
1282 #[test_traced("WARN")]
1283 fn test_all_variants_sync_persists_bitmap_pruning_boundary() {
1284 let executor = deterministic::Runner::default();
1285 executor.start(|_context| async move {
1286 for_all_variants!(simple: test_sync_persists_bitmap_pruning_boundary);
1287 });
1288 }
1289
1290 #[test_traced("WARN")]
1291 fn test_all_variants_stale_batch_side_effect_free() {
1292 let executor = deterministic::Runner::default();
1293 executor.start(|_context| async move {
1294 for_all_variants!(simple: test_stale_batch_side_effect_free);
1295 });
1296 }
1297
1298 #[test_group("slow")]
1299 #[test_traced("WARN")]
1300 fn test_ordered_variants_build_big() {
1301 let executor = deterministic::Runner::default();
1302 executor.start(|_context| async move {
1303 for_all_variants!(ordered: test_ordered_build_big);
1304 });
1305 }
1306
1307 #[test_group("slow")]
1308 #[test_traced("WARN")]
1309 fn test_unordered_variants_build_big() {
1310 let executor = deterministic::Runner::default();
1311 executor.start(|_context| async move {
1312 for_all_variants!(unordered: test_unordered_build_big);
1313 });
1314 }
1315
1316 #[test_group("slow")]
1317 #[test_traced("DEBUG")]
1318 fn test_ordered_variants_build_small_close_reopen() {
1319 let executor = deterministic::Runner::default();
1320 executor.start(|_context| async move {
1321 for_all_variants!(ordered: ordered::tests::test_build_small_close_reopen);
1322 });
1323 }
1324
1325 #[test_group("slow")]
1326 #[test_traced("DEBUG")]
1327 fn test_unordered_variants_build_small_close_reopen() {
1328 let executor = deterministic::Runner::default();
1329 executor.start(|_context| async move {
1330 for_all_variants!(unordered: unordered::tests::test_build_small_close_reopen);
1331 });
1332 }
1333
1334 fn key(i: u64) -> Digest {
1341 Sha256::hash(&i.to_be_bytes())
1342 }
1343
1344 fn val(i: u64) -> Digest {
1345 Sha256::hash(&(i + 10000).to_be_bytes())
1346 }
1347
1348 async fn commit_writes_with_metadata(
1349 db: &mut UnorderedVariableDb,
1350 writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1351 metadata: Option<Digest>,
1352 ) -> std::ops::Range<Location<mmr::Family>> {
1353 let mut batch = db.new_batch();
1354 for (k, v) in writes {
1355 batch = batch.write(k, v);
1356 }
1357 let merkleized = batch.merkleize(db, metadata).await.unwrap();
1358 let range = db.apply_batch(merkleized).await.unwrap();
1359 db.commit().await.unwrap();
1360 range
1361 }
1362
1363 #[test_traced("INFO")]
1364 fn test_current_rewind_recovery() {
1365 let executor = deterministic::Runner::default();
1366 executor.start(|context| async move {
1367 let partition = "current-rewind-recovery";
1368 let ctx = context.with_label("db");
1369 let mut db: UnorderedVariableDb =
1370 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
1371 .await
1372 .unwrap();
1373 let initial_size = db.bounds().await.end;
1374 let initial_root = db.root();
1375 let initial_ops_root = db.ops_root();
1376 let initial_floor = db.inactivity_floor_loc();
1377
1378 let metadata_a = val(900);
1379 let first_range = commit_writes_with_metadata(
1380 &mut db,
1381 [(key(0), Some(val(0))), (key(1), Some(val(1)))],
1382 Some(metadata_a),
1383 )
1384 .await;
1385 assert_eq!(first_range.start, initial_size);
1386 let size_before = db.bounds().await.end;
1387 let root_before = db.root();
1388 let ops_root_before = db.ops_root();
1389 let floor_before = db.inactivity_floor_loc();
1390 assert_eq!(size_before, first_range.end);
1391
1392 let metadata_b = val(901);
1393 let second_range = commit_writes_with_metadata(
1394 &mut db,
1395 [
1396 (key(0), Some(val(100))),
1397 (key(1), None),
1398 (key(2), Some(val(2))),
1399 ],
1400 Some(metadata_b),
1401 )
1402 .await;
1403 assert_eq!(second_range.start, size_before);
1404 assert_ne!(db.root(), root_before);
1405 assert_eq!(db.get_metadata().await.unwrap(), Some(val(901)));
1406 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1407 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1408 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
1409
1410 db.rewind(size_before).await.unwrap();
1411 assert_eq!(db.bounds().await.end, size_before);
1412 assert_eq!(db.root(), root_before);
1413 assert_eq!(db.ops_root(), ops_root_before);
1414 assert_eq!(db.inactivity_floor_loc(), floor_before);
1415 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1416 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1417 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
1418 assert_eq!(db.get(&key(2)).await.unwrap(), None);
1419
1420 db.commit().await.unwrap();
1421 drop(db);
1422
1423 let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
1424 context.with_label("reopen"),
1425 variable_config::<OneCap>(partition, &context),
1426 )
1427 .await
1428 .unwrap();
1429 assert_eq!(reopened.bounds().await.end, size_before);
1430 assert_eq!(reopened.root(), root_before);
1431 assert_eq!(reopened.ops_root(), ops_root_before);
1432 assert_eq!(reopened.inactivity_floor_loc(), floor_before);
1433 assert_eq!(reopened.get_metadata().await.unwrap(), Some(val(900)));
1434 assert_eq!(reopened.get(&key(0)).await.unwrap(), Some(val(0)));
1435 assert_eq!(reopened.get(&key(1)).await.unwrap(), Some(val(1)));
1436 assert_eq!(reopened.get(&key(2)).await.unwrap(), None);
1437
1438 let mut reopened = reopened;
1439 reopened.rewind(initial_size).await.unwrap();
1440 assert_eq!(reopened.bounds().await.end, initial_size);
1441 assert_eq!(reopened.root(), initial_root);
1442 assert_eq!(reopened.ops_root(), initial_ops_root);
1443 assert_eq!(reopened.inactivity_floor_loc(), initial_floor);
1444 assert_eq!(reopened.get_metadata().await.unwrap(), None);
1445 assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
1446 assert_eq!(reopened.get(&key(1)).await.unwrap(), None);
1447 assert_eq!(reopened.get(&key(2)).await.unwrap(), None);
1448
1449 reopened.commit().await.unwrap();
1450 drop(reopened);
1451
1452 let reopened_initial: UnorderedVariableDb = UnorderedVariableDb::init(
1453 context.with_label("reopen_initial"),
1454 variable_config::<OneCap>(partition, &context),
1455 )
1456 .await
1457 .unwrap();
1458 assert_eq!(reopened_initial.bounds().await.end, initial_size);
1459 assert_eq!(reopened_initial.root(), initial_root);
1460 assert_eq!(reopened_initial.ops_root(), initial_ops_root);
1461 assert_eq!(reopened_initial.inactivity_floor_loc(), initial_floor);
1462 assert_eq!(reopened_initial.get_metadata().await.unwrap(), None);
1463 assert_eq!(reopened_initial.get(&key(0)).await.unwrap(), None);
1464 assert_eq!(reopened_initial.get(&key(1)).await.unwrap(), None);
1465 assert_eq!(reopened_initial.get(&key(2)).await.unwrap(), None);
1466
1467 reopened_initial.destroy().await.unwrap();
1468 });
1469 }
1470
1471 #[test_traced("INFO")]
1472 fn test_current_rewind_recovery_pruned_repeated_updates() {
1473 let executor = deterministic::Runner::default();
1474 executor.start(|context| async move {
1475 const COMMITS: u64 = 96;
1476
1477 let partition = "current-rewind-pruned-recovery";
1478 let ctx = context.with_label("db");
1479 let mut db: UnorderedVariableDb =
1480 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
1481 .await
1482 .unwrap();
1483
1484 let key0 = key(0);
1485 let mut history = Vec::new();
1486 for round in 0..COMMITS {
1487 commit_writes_with_metadata(
1488 &mut db,
1489 [(key0, Some(val(20_000 + round)))],
1490 None,
1491 )
1492 .await;
1493 history.push((
1494 db.bounds().await.end,
1495 db.inactivity_floor_loc(),
1496 db.root(),
1497 db.ops_root(),
1498 val(20_000 + round),
1499 ));
1500 }
1501
1502 db.prune(Location::new(1)).await.unwrap();
1505 let pruned_bits = db.pruned_bits();
1506 assert!(pruned_bits > 0, "expected bitmap pruning for rewind test");
1507 let bounds = db.bounds().await;
1508
1509 let (target_size, target_root, target_ops_root, target_value) = history
1510 .iter()
1511 .enumerate()
1512 .find_map(|(idx, (size, floor, root, ops_root, value))| {
1513 let removed_commits = history.len() - idx - 1;
1514 if removed_commits >= 3 && *size > bounds.start && *floor >= pruned_bits {
1515 Some((*size, *root, *ops_root, *value))
1516 } else {
1517 None
1518 }
1519 })
1520 .unwrap_or_else(|| {
1521 panic!(
1522 "expected legal pruned rewind target with repeated updates; bounds={bounds:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
1523 db.inactivity_floor_loc()
1524 )
1525 });
1526
1527 db.rewind(target_size).await.unwrap();
1528 assert_eq!(db.root(), target_root);
1529 assert_eq!(db.ops_root(), target_ops_root);
1530 assert_eq!(db.bounds().await.end, target_size);
1531 assert_eq!(db.get(&key0).await.unwrap(), Some(target_value));
1532
1533 db.commit().await.unwrap();
1534 drop(db);
1535
1536 let mut reopened: UnorderedVariableDb = UnorderedVariableDb::init(
1537 context.with_label("reopen_pruned_recovery"),
1538 variable_config::<OneCap>(partition, &context),
1539 )
1540 .await
1541 .unwrap();
1542 assert_eq!(reopened.root(), target_root);
1543 assert_eq!(reopened.ops_root(), target_ops_root);
1544 assert_eq!(reopened.bounds().await.end, target_size);
1545 assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));
1546
1547 let metadata_after_rewind = val(30_000);
1548 let new_key = key(1);
1549 let new_value = val(30_001);
1550 let expected_end = commit_writes_with_metadata(
1551 &mut reopened,
1552 [(new_key, Some(new_value))],
1553 Some(metadata_after_rewind),
1554 )
1555 .await
1556 .end;
1557 let root_after_new_write = reopened.root();
1558 let ops_root_after_new_write = reopened.ops_root();
1559 assert_eq!(reopened.bounds().await.end, expected_end);
1560 assert_eq!(reopened.get_metadata().await.unwrap(), Some(metadata_after_rewind));
1561 assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));
1562 assert_eq!(reopened.get(&new_key).await.unwrap(), Some(new_value));
1563
1564 drop(reopened);
1565 let reopened_after_new_write: UnorderedVariableDb = UnorderedVariableDb::init(
1566 context.with_label("reopen_pruned_after_new_write"),
1567 variable_config::<OneCap>(partition, &context),
1568 )
1569 .await
1570 .unwrap();
1571 assert_eq!(reopened_after_new_write.root(), root_after_new_write);
1572 assert_eq!(reopened_after_new_write.ops_root(), ops_root_after_new_write);
1573 assert_eq!(reopened_after_new_write.bounds().await.end, expected_end);
1574 assert_eq!(
1575 reopened_after_new_write.get_metadata().await.unwrap(),
1576 Some(metadata_after_rewind)
1577 );
1578 assert_eq!(reopened_after_new_write.get(&key0).await.unwrap(), Some(target_value));
1579 assert_eq!(
1580 reopened_after_new_write.get(&new_key).await.unwrap(),
1581 Some(new_value)
1582 );
1583
1584 reopened_after_new_write.destroy().await.unwrap();
1585 });
1586 }
1587
1588 #[test_traced("INFO")]
1596 fn test_current_mmb_reopen_and_prove_after_prune_multi_peak_chunk() {
1597 let executor = deterministic::Runner::default();
1598 executor.start(|context| async move {
1599 const COMMITS: u64 = 100;
1600
1601 let partition = "current-mmb-reopen-prove-after-prune";
1602 let ctx = context.with_label("db");
1603 let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1604 ctx.clone(),
1605 variable_config::<OneCap>(partition, &ctx),
1606 )
1607 .await
1608 .unwrap();
1609
1610 let k = key(0);
1611 let mut expected = None;
1612 for round in 0..COMMITS {
1613 expected = Some(val(50_000 + round));
1614 let mut batch = db.new_batch();
1615 batch = batch.write(k, expected);
1616 let merkleized = batch.merkleize(&db, None).await.unwrap();
1617 db.apply_batch(merkleized).await.unwrap();
1618 db.commit().await.unwrap();
1619 }
1620
1621 let root_before = db.root();
1622 assert!(
1623 *db.inactivity_floor_loc() >= 256,
1624 "expected inactivity floor past chunk 0"
1625 );
1626
1627 db.prune(Location::<mmb::Family>::new(1)).await.unwrap();
1628 assert_eq!(db.pruned_bits(), 256);
1629 db.sync().await.unwrap();
1630 drop(db);
1631
1632 let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1634 context.with_label("reopen"),
1635 variable_config::<OneCap>(partition, &context),
1636 )
1637 .await
1638 .unwrap();
1639
1640 assert_eq!(reopened.root(), root_before);
1641 assert_eq!(reopened.get(&k).await.unwrap(), expected);
1642
1643 let mut hasher = commonware_cryptography::Sha256::new();
1645 let _proof = reopened.key_value_proof(&mut hasher, k).await.unwrap();
1646
1647 reopened.destroy().await.unwrap();
1648 });
1649 }
1650
1651 #[test_traced("INFO")]
1652 fn test_current_rewind_small_delta_large_history() {
1653 let executor = deterministic::Runner::default();
1654 executor.start(|context| async move {
1655 const COMMITS: u64 = 200;
1656
1657 let partition = "current-rewind-small-delta";
1658 let ctx = context.with_label("db");
1659 let mut db: UnorderedVariableDb =
1660 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
1661 .await
1662 .unwrap();
1663
1664 let key0 = key(0);
1665 let key1 = key(1);
1666 let mut history = Vec::new();
1667
1668 for round in 0..COMMITS {
1669 let key0_value = val(40_000 + round);
1670 let key1_value = if round % 3 == 1 {
1671 None
1672 } else {
1673 Some(val(50_000 + round))
1674 };
1675
1676 commit_writes_with_metadata(
1677 &mut db,
1678 [(key0, Some(key0_value)), (key1, key1_value)],
1679 None,
1680 )
1681 .await;
1682
1683 history.push((
1684 db.bounds().await.end,
1685 db.root(),
1686 db.ops_root(),
1687 key0_value,
1688 key1_value,
1689 ));
1690 }
1691
1692 let target = *history
1693 .get(history.len() - 3)
1694 .expect("history should contain at least three commits");
1695 let (target_size, target_root, target_ops_root, target_key0, target_key1) = target;
1696
1697 db.rewind(target_size).await.unwrap();
1698 assert_eq!(db.bounds().await.end, target_size);
1699 assert_eq!(db.root(), target_root);
1700 assert_eq!(db.ops_root(), target_ops_root);
1701 assert_eq!(db.get(&key0).await.unwrap(), Some(target_key0));
1702 assert_eq!(db.get(&key1).await.unwrap(), target_key1);
1703
1704 db.commit().await.unwrap();
1705 drop(db);
1706
1707 let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
1708 context.with_label("reopen_small_delta"),
1709 variable_config::<OneCap>(partition, &context),
1710 )
1711 .await
1712 .unwrap();
1713 assert_eq!(reopened.bounds().await.end, target_size);
1714 assert_eq!(reopened.root(), target_root);
1715 assert_eq!(reopened.ops_root(), target_ops_root);
1716 assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_key0));
1717 assert_eq!(reopened.get(&key1).await.unwrap(), target_key1);
1718
1719 reopened.destroy().await.unwrap();
1720 });
1721 }
1722
1723 #[test_traced("INFO")]
1724 fn test_current_rewind_pruned_target_errors() {
1725 let executor = deterministic::Runner::default();
1726 executor.start(|context| async move {
1727 const KEYS: u64 = 384;
1728
1729 let partition = "current-rewind-pruned";
1730 let ctx = context.with_label("db");
1731 let mut db: UnorderedVariableDb =
1732 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
1733 .await
1734 .unwrap();
1735
1736 let first_range = commit_writes_with_metadata(
1737 &mut db,
1738 (0..KEYS).map(|i| (key(i), Some(val(i)))),
1739 None,
1740 )
1741 .await;
1742 commit_writes_with_metadata(
1743 &mut db,
1744 (0..KEYS).map(|i| (key(i), Some(val(1000 + i)))),
1745 None,
1746 )
1747 .await;
1748
1749 db.prune(db.inactivity_floor_loc()).await.unwrap();
1750 let pruned_bits = db.pruned_bits();
1751 assert!(
1752 pruned_bits > *first_range.start,
1753 "expected bitmap pruning boundary above rewind target: pruned_bits={pruned_bits}, target={:?}",
1754 first_range.start
1755 );
1756
1757 let oldest_retained = db.bounds().await.start;
1758 let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
1759 assert!(
1760 matches!(
1761 boundary_err,
1762 Error::Journal(crate::journal::Error::ItemPruned(_))
1763 ),
1764 "unexpected rewind error at retained boundary: {boundary_err:?}"
1765 );
1766
1767 let expected_pruned_loc = *first_range.start - 1;
1768 let err = db.rewind(first_range.start).await.unwrap_err();
1769 assert!(
1770 matches!(
1771 err,
1772 Error::Journal(crate::journal::Error::ItemPruned(loc))
1773 if loc == expected_pruned_loc
1774 ),
1775 "unexpected rewind error: {err:?}"
1776 );
1777
1778 db.destroy().await.unwrap();
1779 });
1780 }
1781
1782 #[test_traced("INFO")]
1783 fn test_current_rewind_rejects_target_below_bitmap_floor() {
1784 let executor = deterministic::Runner::default();
1785 executor.start(|context| async move {
1786 const COMMITS: u64 = 96;
1787
1788 let partition = "current-rewind-bitmap-floor";
1789 let ctx = context.with_label("db");
1790 let mut db: UnorderedVariableDb =
1791 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
1792 .await
1793 .unwrap();
1794
1795 let mut history = Vec::new();
1796 for round in 0..COMMITS {
1797 commit_writes_with_metadata(
1798 &mut db,
1799 [(key(0), Some(val(10_000 + round)))],
1800 None,
1801 )
1802 .await;
1803 history.push((db.bounds().await.end, db.inactivity_floor_loc()));
1804 }
1805 assert!(db.inactivity_floor_loc() > Location::new(64));
1806
1807 let prune_loc = Location::new(1);
1810 db.prune(prune_loc).await.unwrap();
1811 let pruned_bits = db.pruned_bits();
1812 assert!(pruned_bits > 0);
1813 let retained_start = db.bounds().await.start;
1814
1815 let rewind_target = history
1818 .iter()
1819 .find_map(|(size, floor)| {
1820 if *size > *retained_start
1821 && *size >= pruned_bits
1822 && *floor >= *retained_start
1823 && *floor < pruned_bits
1824 {
1825 Some(*size)
1826 } else {
1827 None
1828 }
1829 })
1830 .unwrap_or_else(|| {
1831 panic!(
1832 "expected rewind target below bitmap boundary. retained_start={retained_start:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
1833 db.inactivity_floor_loc()
1834 )
1835 });
1836
1837 let err = db.rewind(rewind_target).await.unwrap_err();
1838 assert!(
1839 matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
1840 "unexpected rewind error: {err:?}"
1841 );
1842
1843 db.destroy().await.unwrap();
1844 });
1845 }
1846
1847 pub fn test_speculative_root_matches_committed<M, C, F, Fut>(mut open_db: F)
1853 where
1854 M: merkle::Graftable + 'static,
1855 C: DbAny<M> + 'static,
1856 C::Key: TestKey,
1857 <C as DbAny<M>>::Value: TestValue,
1858 F: FnMut(Context, String) -> Fut + Clone,
1859 Fut: Future<Output = C>,
1860 {
1861 let executor = deterministic::Runner::default();
1862 let mut open_db_clone = open_db.clone();
1863 executor.start(|context| async move {
1864 let partition = "speculative-root".to_string();
1865
1866 let mut db: C = open_db_clone(context.with_label("init"), partition.clone()).await;
1871 let mut batch = db.new_batch();
1872 for i in 0..260 {
1873 batch = batch.write(TestKey::from_seed(i), Some(TestValue::from_seed(i + 1000)));
1874 }
1875 let merkleized = batch.merkleize(&db, None).await.unwrap();
1876 db.apply_batch(merkleized).await.unwrap();
1877 let speculative_root = db.root();
1878
1879 db.sync().await.unwrap();
1881 drop(db);
1882
1883 let db: C = open_db(context.with_label("reopen"), partition).await;
1884 assert_eq!(db.root(), speculative_root);
1885
1886 db.destroy().await.unwrap();
1887 });
1888 }
1889
1890 #[test_traced("INFO")]
1891 fn test_all_variants_speculative_root_matches_committed() {
1892 let executor = deterministic::Runner::default();
1893 executor.start(|_context| async move {
1894 for_all_variants!(simple: test_speculative_root_matches_committed);
1895 });
1896 }
1897
1898 #[test_traced("INFO")]
1900 fn test_current_batch_merkleized_get() {
1901 let executor = deterministic::Runner::default();
1902 executor.start(|context| async move {
1903 let ctx = context.with_label("db");
1904 let mut db: UnorderedVariableDb =
1905 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("mg", &ctx))
1906 .await
1907 .unwrap();
1908
1909 let ka = key(0);
1910 let kb = key(1);
1911 let kc = key(2);
1912
1913 {
1915 let mut batch = db.new_batch();
1916 batch = batch.write(ka, Some(val(0)));
1917 let merkleized = batch.merkleize(&db, None).await.unwrap();
1918 db.apply_batch(merkleized).await.unwrap();
1919 }
1920
1921 let va2 = val(100);
1923 let vb = val(1);
1924 let mut batch = db.new_batch();
1925 batch = batch.write(ka, Some(va2));
1926 batch = batch.write(kb, Some(vb));
1927 let merkleized = batch.merkleize(&db, None).await.unwrap();
1928
1929 assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
1930 assert_eq!(merkleized.get(&kb, &db).await.unwrap(), Some(vb));
1931 assert_eq!(merkleized.get(&kc, &db).await.unwrap(), None);
1932
1933 db.destroy().await.unwrap();
1934 });
1935 }
1936
1937 #[test_traced("INFO")]
1940 fn test_current_batch_chaining() {
1941 let executor = deterministic::Runner::default();
1942 executor.start(|context| async move {
1943 let ctx = context.with_label("db");
1944 let mut db: UnorderedVariableDb =
1945 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("ch", &ctx))
1946 .await
1947 .unwrap();
1948
1949 let mut parent = db.new_batch();
1951 for i in 0..5 {
1952 parent = parent.write(key(i), Some(val(i)));
1953 }
1954 let parent_m = parent.merkleize(&db, None).await.unwrap();
1955
1956 let mut child = parent_m.new_batch::<Sha256>();
1958 for i in 5..10 {
1959 child = child.write(key(i), Some(val(i)));
1960 }
1961 child = child.write(key(0), Some(val(999)));
1962 let child_m = child.merkleize(&db, None).await.unwrap();
1963
1964 let child_root = child_m.root();
1965
1966 assert_eq!(child_m.get(&key(0), &db).await.unwrap(), Some(val(999)));
1968 assert_eq!(child_m.get(&key(3), &db).await.unwrap(), Some(val(3)));
1969 assert_eq!(child_m.get(&key(7), &db).await.unwrap(), Some(val(7)));
1970
1971 db.apply_batch(child_m).await.unwrap();
1972 assert_eq!(db.root(), child_root);
1973
1974 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(999)));
1976 for i in 1..10 {
1977 assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
1978 }
1979
1980 db.destroy().await.unwrap();
1981 });
1982 }
1983
1984 #[test_traced("INFO")]
1985 fn test_current_unordered_root_matches_between_pending_and_committed_paths() {
1986 let executor = deterministic::Runner::default();
1987 executor.start(|context| async move {
1988 let ctx = context.with_label("db");
1989 let mut db: UnorderedFixedDb =
1990 UnorderedFixedDb::init(ctx.clone(), fixed_config::<OneCap>("ucr", &ctx))
1991 .await
1992 .unwrap();
1993 let key_a = colliding_digest(0xAA, 1);
1994 let key_b = colliding_digest(0xAA, 0);
1995
1996 let mut initial = db.new_batch();
2001 for i in 0..4 {
2002 initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
2003 }
2004 let merkleized = initial.merkleize(&db, None).await.unwrap();
2005 db.apply_batch(merkleized).await.unwrap();
2006 db.commit().await.unwrap();
2007
2008 let parent = db
2012 .new_batch()
2013 .write(key_a, Some(colliding_digest(0xCC, 1)))
2014 .merkleize(&db, None)
2015 .await
2016 .unwrap();
2017
2018 let pending_child = parent
2022 .new_batch::<Sha256>()
2023 .write(key_a, Some(colliding_digest(0xDD, 1)))
2024 .write(key_b, Some(colliding_digest(0xDD, 0)))
2025 .merkleize(&db, None)
2026 .await
2027 .unwrap();
2028
2029 let pending_root = pending_child.root();
2030 let pending_ops_root = pending_child.ops_root();
2031
2032 db.apply_batch(parent).await.unwrap();
2033 db.commit().await.unwrap();
2034
2035 let committed_child = db
2036 .new_batch()
2037 .write(key_a, Some(colliding_digest(0xDD, 1)))
2038 .write(key_b, Some(colliding_digest(0xDD, 0)))
2039 .merkleize(&db, None)
2040 .await
2041 .unwrap();
2042
2043 assert_eq!(pending_root, committed_child.root());
2044 assert_eq!(pending_ops_root, committed_child.ops_root());
2045
2046 db.apply_batch(pending_child).await.unwrap();
2049 assert_eq!(db.root(), committed_child.root());
2050 assert_eq!(db.ops_root(), committed_child.ops_root());
2051
2052 db.destroy().await.unwrap();
2053 });
2054 }
2055
2056 #[test_traced("INFO")]
2057 fn test_current_ordered_root_matches_between_pending_and_committed_paths() {
2058 let executor = deterministic::Runner::default();
2059 executor.start(|context| async move {
2060 let ctx = context.with_label("db");
2061 let mut db: OrderedFixedDb =
2062 OrderedFixedDb::init(ctx.clone(), fixed_config::<OneCap>("ocr", &ctx))
2063 .await
2064 .unwrap();
2065 let key_a = colliding_digest(0xAA, 1);
2066 let key_b = colliding_digest(0xAA, 0);
2067
2068 let mut initial = db.new_batch();
2071 for i in 0..4 {
2072 initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
2073 }
2074 let merkleized = initial.merkleize(&db, None).await.unwrap();
2075 db.apply_batch(merkleized).await.unwrap();
2076 db.commit().await.unwrap();
2077
2078 let parent = db
2082 .new_batch()
2083 .write(key_a, Some(colliding_digest(0xCC, 1)))
2084 .merkleize(&db, None)
2085 .await
2086 .unwrap();
2087
2088 let pending_child = parent
2091 .new_batch::<Sha256>()
2092 .write(key_a, Some(colliding_digest(0xDD, 1)))
2093 .write(key_b, Some(colliding_digest(0xDD, 0)))
2094 .merkleize(&db, None)
2095 .await
2096 .unwrap();
2097
2098 let pending_root = pending_child.root();
2099 let pending_ops_root = pending_child.ops_root();
2100
2101 db.apply_batch(parent).await.unwrap();
2102 db.commit().await.unwrap();
2103
2104 let committed_child = db
2105 .new_batch()
2106 .write(key_a, Some(colliding_digest(0xDD, 1)))
2107 .write(key_b, Some(colliding_digest(0xDD, 0)))
2108 .merkleize(&db, None)
2109 .await
2110 .unwrap();
2111
2112 assert_eq!(pending_root, committed_child.root());
2113 assert_eq!(pending_ops_root, committed_child.ops_root());
2114
2115 db.apply_batch(pending_child).await.unwrap();
2118 assert_eq!(db.root(), committed_child.root());
2119 assert_eq!(db.ops_root(), committed_child.ops_root());
2120
2121 db.destroy().await.unwrap();
2122 });
2123 }
2124
2125 #[test_traced("INFO")]
2127 fn test_current_batch_apply_requires_commit_for_recovery() {
2128 let executor = deterministic::Runner::default();
2129 executor.start(|context| async move {
2130 let partition = "apply_requires_commit";
2131 let ctx = context.with_label("db");
2132 let mut db: UnorderedVariableDb =
2133 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
2134 .await
2135 .unwrap();
2136
2137 let committed_root = db.root();
2138
2139 let merkleized = db
2140 .new_batch()
2141 .write(key(0), Some(val(0)))
2142 .merkleize(&db, None)
2143 .await
2144 .unwrap();
2145 db.apply_batch(merkleized).await.unwrap();
2146
2147 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2148
2149 drop(db);
2150
2151 let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
2152 context.with_label("reopen"),
2153 variable_config::<OneCap>(partition, &context),
2154 )
2155 .await
2156 .unwrap();
2157 assert_eq!(reopened.root(), committed_root);
2158 assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
2159
2160 reopened.destroy().await.unwrap();
2161 });
2162 }
2163
2164 #[test_traced("INFO")]
2166 fn test_current_batch_single_stage_pipeline() {
2167 let executor = deterministic::Runner::default();
2168 executor.start(|context| async move {
2169 let ctx = context.with_label("db");
2170 let mut db: UnorderedVariableDb =
2171 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("pipe", &ctx))
2172 .await
2173 .unwrap();
2174
2175 let mut batch = db.new_batch();
2176 batch = batch.write(key(0), Some(val(0)));
2177 let parent_merkleized = batch.merkleize(&db, None).await.unwrap();
2178 db.apply_batch(parent_merkleized).await.unwrap();
2179
2180 let (child_merkleized, commit_result) = futures::join!(
2181 async {
2182 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2183 let mut child = db.new_batch();
2184 child = child.write(key(1), Some(val(1)));
2185 child.merkleize(&db, None).await.unwrap()
2186 },
2187 db.commit(),
2188 );
2189 commit_result.unwrap();
2190
2191 db.apply_batch(child_merkleized).await.unwrap();
2192 db.commit().await.unwrap();
2193
2194 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2195 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2196
2197 db.destroy().await.unwrap();
2198 });
2199 }
2200
2201 #[test_traced("INFO")]
2204 fn test_current_sequential_commit() {
2205 let executor = deterministic::Runner::default();
2206 executor.start(|context| async move {
2207 let ctx = context.with_label("db");
2208 let mut db: UnorderedVariableDb =
2209 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("ff", &ctx))
2210 .await
2211 .unwrap();
2212
2213 let parent_m = db
2215 .new_batch()
2216 .write(key(0), Some(val(0)))
2217 .merkleize(&db, None)
2218 .await
2219 .unwrap();
2220
2221 let child_m = parent_m
2223 .new_batch::<Sha256>()
2224 .write(key(1), Some(val(1)))
2225 .merkleize(&db, None)
2226 .await
2227 .unwrap();
2228
2229 db.apply_batch(parent_m).await.unwrap();
2230 db.apply_batch(child_m).await.unwrap();
2231
2232 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2234 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2235
2236 let ctx2 = context.with_label("db2");
2239 let mut db2: UnorderedVariableDb =
2240 UnorderedVariableDb::init(ctx2.clone(), variable_config::<OneCap>("ff2", &ctx2))
2241 .await
2242 .unwrap();
2243 let m1 = db2
2244 .new_batch()
2245 .write(key(0), Some(val(0)))
2246 .merkleize(&db2, None)
2247 .await
2248 .unwrap();
2249 db2.apply_batch(m1).await.unwrap();
2250 let m2 = db2
2251 .new_batch()
2252 .write(key(1), Some(val(1)))
2253 .merkleize(&db2, None)
2254 .await
2255 .unwrap();
2256 db2.apply_batch(m2).await.unwrap();
2257
2258 assert_eq!(db.root(), db2.root());
2259
2260 db.destroy().await.unwrap();
2261 db2.destroy().await.unwrap();
2262 });
2263 }
2264
2265 #[test_traced("INFO")]
2268 fn test_current_to_batch_then_chain() {
2269 let executor = deterministic::Runner::default();
2270 executor.start(|context| async move {
2271 let ctx = context.with_label("db");
2272 let mut db: UnorderedVariableDb =
2273 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("tb", &ctx))
2274 .await
2275 .unwrap();
2276
2277 let m = db
2279 .new_batch()
2280 .write(key(0), Some(val(0)))
2281 .merkleize(&db, None)
2282 .await
2283 .unwrap();
2284 db.apply_batch(m).await.unwrap();
2285
2286 let snapshot = db.to_batch();
2288 assert_eq!(snapshot.root(), db.root());
2289
2290 let child = snapshot
2292 .new_batch::<Sha256>()
2293 .write(key(1), Some(val(1)))
2294 .merkleize(&db, None)
2295 .await
2296 .unwrap();
2297
2298 assert_ne!(child.root(), snapshot.root());
2300
2301 db.apply_batch(child).await.unwrap();
2303 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2304 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2305
2306 db.destroy().await.unwrap();
2307 });
2308 }
2309
2310 #[test_traced("INFO")]
2312 fn test_flatten_noop_on_fresh_db() {
2313 let executor = deterministic::Runner::default();
2314 executor.start(|context| async move {
2315 let ctx = context.with_label("db");
2316 let mut db: UnorderedVariableDb =
2317 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("fl-noop", &ctx))
2318 .await
2319 .unwrap();
2320
2321 let root_before = db.root();
2322 db.flatten();
2323 assert_eq!(db.root(), root_before);
2324
2325 db.destroy().await.unwrap();
2326 });
2327 }
2328
2329 #[test_traced("INFO")]
2331 fn test_flatten_preserves_root_after_batches() {
2332 let executor = deterministic::Runner::default();
2333 executor.start(|context| async move {
2334 let ctx = context.with_label("db");
2335 let mut db: UnorderedVariableDb =
2336 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("fl-root", &ctx))
2337 .await
2338 .unwrap();
2339
2340 for i in 0u64..5 {
2342 let m = db
2343 .new_batch()
2344 .write(key(i), Some(val(i)))
2345 .merkleize(&db, None)
2346 .await
2347 .unwrap();
2348 db.apply_batch(m).await.unwrap();
2349 }
2350
2351 let root_before = db.root();
2352 db.flatten();
2353 assert_eq!(db.root(), root_before);
2354
2355 for i in 0u64..5 {
2357 assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
2358 }
2359
2360 db.destroy().await.unwrap();
2361 });
2362 }
2363
2364 #[test_traced("INFO")]
2366 fn test_flatten_idempotent() {
2367 let executor = deterministic::Runner::default();
2368 executor.start(|context| async move {
2369 let ctx = context.with_label("db");
2370 let mut db: UnorderedVariableDb =
2371 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("fl-idem", &ctx))
2372 .await
2373 .unwrap();
2374
2375 let m = db
2376 .new_batch()
2377 .write(key(0), Some(val(0)))
2378 .merkleize(&db, None)
2379 .await
2380 .unwrap();
2381 db.apply_batch(m).await.unwrap();
2382
2383 db.flatten();
2384 let root_after_first = db.root();
2385
2386 db.flatten();
2387 assert_eq!(db.root(), root_after_first);
2388
2389 db.destroy().await.unwrap();
2390 });
2391 }
2392
2393 #[test_traced("INFO")]
2395 fn test_flatten_then_new_batch() {
2396 let executor = deterministic::Runner::default();
2397 executor.start(|context| async move {
2398 let ctx = context.with_label("db");
2399 let mut db: UnorderedVariableDb =
2400 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("fl-then", &ctx))
2401 .await
2402 .unwrap();
2403
2404 let m = db
2406 .new_batch()
2407 .write(key(0), Some(val(0)))
2408 .merkleize(&db, None)
2409 .await
2410 .unwrap();
2411 db.apply_batch(m).await.unwrap();
2412 db.flatten();
2413
2414 let m = db
2415 .new_batch()
2416 .write(key(1), Some(val(1)))
2417 .merkleize(&db, None)
2418 .await
2419 .unwrap();
2420 db.apply_batch(m).await.unwrap();
2421
2422 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2423 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2424
2425 db.destroy().await.unwrap();
2426 });
2427 }
2428
2429 #[test_traced("WARN")]
2433 fn test_current_apply_after_ancestor_dropped() {
2434 let executor = deterministic::Runner::default();
2435 executor.start(|context| async move {
2436 let ctx = context.with_label("db");
2437 let mut db: UnorderedVariableDb =
2438 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("adrop", &ctx))
2439 .await
2440 .unwrap();
2441
2442 let mut a = db.new_batch();
2444 for i in 0..3 {
2445 a = a.write(key(i), Some(val(i)));
2446 }
2447 let a_m = a.merkleize(&db, None).await.unwrap();
2448
2449 let mut b = a_m.new_batch::<Sha256>();
2450 for i in 3..6 {
2451 b = b.write(key(i), Some(val(i)));
2452 }
2453 let b_m = b.merkleize(&db, None).await.unwrap();
2454
2455 let mut c = b_m.new_batch::<Sha256>();
2456 for i in 6..9 {
2457 c = c.write(key(i), Some(val(i)));
2458 }
2459 let c_m = c.merkleize(&db, None).await.unwrap();
2460
2461 drop(a_m);
2463 drop(b_m);
2464
2465 db.apply_batch(c_m).await.unwrap();
2467 db.commit().await.unwrap();
2468
2469 for i in 0..9 {
2471 assert_eq!(
2472 db.get(&key(i)).await.unwrap(),
2473 Some(val(i)),
2474 "key({i}) missing after apply_batch with dropped ancestors"
2475 );
2476 }
2477
2478 db.destroy().await.unwrap();
2479 });
2480 }
2481
2482 #[test_traced("WARN")]
2491 fn test_current_chain_bitmap_order_matches_sequential() {
2492 let executor = deterministic::Runner::default();
2493 executor.start(|context| async move {
2494 let ctx1 = context.with_label("db1");
2496 let mut db1: UnorderedVariableDb =
2497 UnorderedVariableDb::init(ctx1.clone(), variable_config::<OneCap>("ord1", &ctx1))
2498 .await
2499 .unwrap();
2500
2501 commit_writes_with_metadata(
2503 &mut db1,
2504 [(key(10), Some(val(10))), (key(11), Some(val(11)))],
2505 None,
2506 )
2507 .await;
2508
2509 let a = db1
2516 .new_batch()
2517 .write(key(10), Some(val(100)))
2518 .write(key(11), None) .merkleize(&db1, None)
2520 .await
2521 .unwrap();
2522
2523 let b = a
2524 .new_batch::<Sha256>()
2525 .write(key(12), Some(val(120)))
2526 .write(key(13), Some(val(130)))
2527 .merkleize(&db1, None)
2528 .await
2529 .unwrap();
2530
2531 let c = b
2532 .new_batch::<Sha256>()
2533 .write(key(14), Some(val(140)))
2534 .merkleize(&db1, None)
2535 .await
2536 .unwrap();
2537
2538 db1.apply_batch(c).await.unwrap();
2539 db1.commit().await.unwrap();
2540
2541 let d1 = db1
2543 .new_batch()
2544 .write(key(20), Some(val(200)))
2545 .merkleize(&db1, None)
2546 .await
2547 .unwrap();
2548 let chain_then_d_root = d1.root();
2549
2550 let ctx2 = context.with_label("db2");
2552 let mut db2: UnorderedVariableDb =
2553 UnorderedVariableDb::init(ctx2.clone(), variable_config::<OneCap>("ord2", &ctx2))
2554 .await
2555 .unwrap();
2556
2557 commit_writes_with_metadata(
2558 &mut db2,
2559 [(key(10), Some(val(10))), (key(11), Some(val(11)))],
2560 None,
2561 )
2562 .await;
2563
2564 let a2 = db2
2565 .new_batch()
2566 .write(key(10), Some(val(100)))
2567 .write(key(11), None)
2568 .merkleize(&db2, None)
2569 .await
2570 .unwrap();
2571 db2.apply_batch(a2).await.unwrap();
2572 db2.commit().await.unwrap();
2573
2574 let b2 = db2
2575 .new_batch()
2576 .write(key(12), Some(val(120)))
2577 .write(key(13), Some(val(130)))
2578 .merkleize(&db2, None)
2579 .await
2580 .unwrap();
2581 db2.apply_batch(b2).await.unwrap();
2582 db2.commit().await.unwrap();
2583
2584 let c2 = db2
2585 .new_batch()
2586 .write(key(14), Some(val(140)))
2587 .merkleize(&db2, None)
2588 .await
2589 .unwrap();
2590 db2.apply_batch(c2).await.unwrap();
2591 db2.commit().await.unwrap();
2592
2593 let d2 = db2
2594 .new_batch()
2595 .write(key(20), Some(val(200)))
2596 .merkleize(&db2, None)
2597 .await
2598 .unwrap();
2599 let sequential_then_d_root = d2.root();
2600
2601 assert_eq!(
2602 chain_then_d_root, sequential_then_d_root,
2603 "batch D's root on top of chain-applied state must match sequential state"
2604 );
2605
2606 db1.destroy().await.unwrap();
2607 db2.destroy().await.unwrap();
2608 });
2609 }
2610
2611 #[test_traced("WARN")]
2621 fn test_current_stale_bitmap_clears_after_prune() {
2622 let executor = deterministic::Runner::default();
2623 executor.start(|context| async move {
2624 let ctx = context.with_label("db");
2625 let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2626 ctx.clone(),
2627 variable_config::<OneCap>("stale-clears", &ctx),
2628 )
2629 .await
2630 .unwrap();
2631
2632 let mut seed = db.new_batch();
2634 for i in 0u64..255 {
2635 seed = seed.write(key(i), Some(val(i)));
2636 }
2637 let seed_m = seed.merkleize(&db, None).await.unwrap();
2638 db.apply_batch(seed_m).await.unwrap();
2639 db.commit().await.unwrap();
2640
2641 let mut p = db.new_batch();
2644 for i in 1u64..255 {
2645 p = p.write(key(i), Some(val(i + 10000)));
2646 }
2647 let p_m = p.merkleize(&db, None).await.unwrap();
2648
2649 let c_m = p_m
2651 .new_batch::<Sha256>()
2652 .write(key(0), Some(val(9999)))
2653 .merkleize(&db, None)
2654 .await
2655 .unwrap();
2656
2657 db.apply_batch(p_m).await.unwrap();
2659 db.commit().await.unwrap();
2660
2661 let floor = *db.inactivity_floor_loc();
2662 assert!(floor >= 256, "floor must be past chunk 0: floor={floor}",);
2663
2664 db.prune(db.inactivity_floor_loc()).await.unwrap();
2665 db.apply_batch(c_m).await.unwrap();
2666 db.flatten();
2667
2668 db.destroy().await.unwrap();
2669 });
2670 }
2671
2672 #[test_traced("INFO")]
2675 fn test_current_partial_ancestor_commit() {
2676 let executor = deterministic::Runner::default();
2677 executor.start(|context| async move {
2678 let ctx = context.with_label("db");
2679 let mut db: UnorderedVariableDb =
2680 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("pac", &ctx))
2681 .await
2682 .unwrap();
2683
2684 let a = db
2685 .new_batch()
2686 .write(key(0), Some(val(0)))
2687 .merkleize(&db, None)
2688 .await
2689 .unwrap();
2690 let b = a
2691 .new_batch::<Sha256>()
2692 .write(key(1), Some(val(1)))
2693 .merkleize(&db, None)
2694 .await
2695 .unwrap();
2696 let c = b
2697 .new_batch::<Sha256>()
2698 .write(key(2), Some(val(2)))
2699 .merkleize(&db, None)
2700 .await
2701 .unwrap();
2702
2703 let expected_root = c.root();
2704
2705 db.apply_batch(a).await.unwrap();
2706 db.apply_batch(c).await.unwrap();
2707
2708 assert_eq!(db.root(), expected_root);
2709 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2710 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2711 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
2712
2713 db.destroy().await.unwrap();
2714 });
2715 }
2716
2717 #[test_traced("INFO")]
2721 fn test_current_partial_ancestor_bitmap_ordering() {
2722 let executor = deterministic::Runner::default();
2723 executor.start(|context| async move {
2724 let ctx = context.with_label("db");
2725 let mut db: UnorderedVariableDb =
2726 UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("bmo", &ctx))
2727 .await
2728 .unwrap();
2729
2730 let a = db
2732 .new_batch()
2733 .write(key(0), Some(val(0)))
2734 .merkleize(&db, None)
2735 .await
2736 .unwrap();
2737 let b = a
2738 .new_batch::<Sha256>()
2739 .write(key(1), Some(val(1)))
2740 .merkleize(&db, None)
2741 .await
2742 .unwrap();
2743 let c = b
2744 .new_batch::<Sha256>()
2745 .write(key(2), Some(val(2)))
2746 .merkleize(&db, None)
2747 .await
2748 .unwrap();
2749 let d = c
2750 .new_batch::<Sha256>()
2751 .write(key(3), Some(val(3)))
2752 .merkleize(&db, None)
2753 .await
2754 .unwrap();
2755
2756 db.apply_batch(a).await.unwrap();
2760 db.apply_batch(d.clone()).await.unwrap();
2761
2762 let e = db
2767 .new_batch()
2768 .write(key(4), Some(val(4)))
2769 .merkleize(&db, None)
2770 .await
2771 .unwrap();
2772 db.apply_batch(e).await.unwrap();
2773
2774 let ref_ctx = context.with_label("ref");
2776 let mut ref_db: UnorderedVariableDb = UnorderedVariableDb::init(
2777 ref_ctx.clone(),
2778 variable_config::<OneCap>("bmo_ref", &ref_ctx),
2779 )
2780 .await
2781 .unwrap();
2782 for i in 0..5 {
2783 let batch = ref_db
2784 .new_batch()
2785 .write(key(i), Some(val(i)))
2786 .merkleize(&ref_db, None)
2787 .await
2788 .unwrap();
2789 ref_db.apply_batch(batch).await.unwrap();
2790 }
2791
2792 assert_eq!(
2793 db.root(),
2794 ref_db.root(),
2795 "root mismatch: bitmap ordering bug"
2796 );
2797
2798 db.destroy().await.unwrap();
2799 ref_db.destroy().await.unwrap();
2800 });
2801 }
2802}