1use crate::{
188 index::Unordered as UnorderedIndex,
189 journal::contiguous::{fixed::Journal as FJournal, variable::Journal as VJournal},
190 mmr::{Location, StandardHasher},
191 qmdb::{
192 any::{
193 self,
194 operation::{Operation, Update},
195 FixedConfig as AnyFixedConfig, ValueEncoding, VariableConfig as AnyVariableConfig,
196 },
197 operation::{Committable, Key},
198 Error,
199 },
200 translator::Translator,
201};
202use commonware_codec::{Codec, CodecFixedShared, FixedSize, Read};
203use commonware_cryptography::Hasher;
204use commonware_parallel::ThreadPool;
205use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
206use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex, Array};
207use std::num::{NonZeroU64, NonZeroUsize};
208
209pub mod batch;
210pub mod db;
211mod grafting;
212pub mod ordered;
213pub mod proof;
214pub(crate) mod sync;
215pub mod unordered;
216
/// Configuration for a grafted DB whose log stores fixed-size operations.
///
/// Mirrors [`AnyFixedConfig`] plus the grafted-MMR metadata partition; see the
/// `From` impl below for the field mapping.
#[derive(Clone)]
pub struct FixedConfig<T: Translator> {
    /// Storage partition holding the MMR journal blobs.
    pub mmr_journal_partition: String,

    /// Maximum number of MMR items stored per journal blob.
    pub mmr_items_per_blob: NonZeroU64,

    /// Write-buffer size (bytes) for the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// Storage partition holding the MMR metadata.
    pub mmr_metadata_partition: String,

    /// Storage partition holding the operation-log journal blobs.
    pub log_journal_partition: String,

    /// Maximum number of log operations stored per journal blob.
    pub log_items_per_blob: NonZeroU64,

    /// Write-buffer size (bytes) for the operation log.
    pub log_write_buffer: NonZeroUsize,

    /// Storage partition holding metadata for the grafted MMR (pruned chunk
    /// count and pinned nodes — see `db::init_metadata`).
    pub grafted_mmr_metadata_partition: String,

    /// Translator used to derive index keys from operation keys.
    pub translator: T,

    /// Optional thread pool for parallelizable work (e.g. building the
    /// grafted MMR).
    pub thread_pool: Option<ThreadPool>,

    /// Shared page cache for buffered storage reads.
    pub page_cache: CacheRef,
}
253
254impl<T: Translator> From<FixedConfig<T>> for AnyFixedConfig<T> {
255 fn from(cfg: FixedConfig<T>) -> Self {
256 Self {
257 mmr_journal_partition: cfg.mmr_journal_partition,
258 mmr_metadata_partition: cfg.mmr_metadata_partition,
259 mmr_items_per_blob: cfg.mmr_items_per_blob,
260 mmr_write_buffer: cfg.mmr_write_buffer,
261 log_journal_partition: cfg.log_journal_partition,
262 log_items_per_blob: cfg.log_items_per_blob,
263 log_write_buffer: cfg.log_write_buffer,
264 translator: cfg.translator,
265 thread_pool: cfg.thread_pool,
266 page_cache: cfg.page_cache,
267 }
268 }
269}
270
/// Configuration for a grafted DB whose log stores variable-size operations.
///
/// Mirrors [`AnyVariableConfig`] plus the grafted-MMR metadata partition; see
/// the `From` impl below for the field mapping. `C` is the codec config used
/// to decode log operations.
#[derive(Clone)]
pub struct VariableConfig<T: Translator, C> {
    /// Storage partition holding the MMR journal blobs.
    pub mmr_journal_partition: String,

    /// Maximum number of MMR items stored per journal blob.
    pub mmr_items_per_blob: NonZeroU64,

    /// Write-buffer size (bytes) for the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// Storage partition holding the MMR metadata.
    pub mmr_metadata_partition: String,

    /// Storage partition holding the operation log.
    pub log_partition: String,

    /// Write-buffer size (bytes) for the operation log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level for log entries (`None` = uncompressed).
    pub log_compression: Option<u8>,

    /// Codec configuration used to decode log operations.
    pub log_codec_config: C,

    /// Maximum number of log operations stored per journal blob.
    pub log_items_per_blob: NonZeroU64,

    /// Storage partition holding metadata for the grafted MMR (pruned chunk
    /// count and pinned nodes — see `db::init_metadata`).
    pub grafted_mmr_metadata_partition: String,

    /// Translator used to derive index keys from operation keys.
    pub translator: T,

    /// Optional thread pool for parallelizable work (e.g. building the
    /// grafted MMR).
    pub thread_pool: Option<ThreadPool>,

    /// Shared page cache for buffered storage reads.
    pub page_cache: CacheRef,
}
312
313impl<T: Translator, C> From<VariableConfig<T, C>> for AnyVariableConfig<T, C> {
314 fn from(cfg: VariableConfig<T, C>) -> Self {
315 Self {
316 mmr_journal_partition: cfg.mmr_journal_partition,
317 mmr_metadata_partition: cfg.mmr_metadata_partition,
318 mmr_items_per_blob: cfg.mmr_items_per_blob,
319 mmr_write_buffer: cfg.mmr_write_buffer,
320 log_items_per_blob: cfg.log_items_per_blob,
321 log_partition: cfg.log_partition,
322 log_write_buffer: cfg.log_write_buffer,
323 log_compression: cfg.log_compression,
324 log_codec_config: cfg.log_codec_config,
325 translator: cfg.translator,
326 thread_pool: cfg.thread_pool,
327 page_cache: cfg.page_cache,
328 }
329 }
330}
331
/// Initializes (or recovers) a grafted DB over a fixed-size-operation journal.
///
/// Recovery flow: load the grafted-MMR metadata (pruned chunk count and pinned
/// nodes), rebuild the activity bitmap while replaying the underlying `any`
/// DB's log, rebuild the grafted MMR from the bitmap, and finally compute the
/// combined DB root.
///
/// `new_index` constructs the key index used by the inner DB.
///
/// # Errors
///
/// Returns [`Error::DataCorrupted`] if the persisted pruned-chunk count
/// overflows the bitmap, and propagates any storage/recovery error from the
/// inner DB or MMR construction.
pub(super) async fn init_fixed<E, K, V, U, H, T, I, const N: usize, NewIndex>(
    context: E,
    config: FixedConfig<T>,
    new_index: NewIndex,
) -> Result<db::Db<E, FJournal<E, Operation<K, V, U>>, I, H, U, N>, Error>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: ValueEncoding,
    U: Update<K, V> + Send + Sync,
    H: Hasher,
    T: Translator,
    I: UnorderedIndex<Value = Location>,
    NewIndex: FnOnce(E, T) -> I,
    Operation<K, V, U>: CodecFixedShared + Committable,
{
    // Compile-time validation of the bitmap chunk size `N`.
    const {
        assert!(
            N.is_multiple_of(H::Digest::SIZE),
            "chunk size must be some multiple of the digest size",
        );
        assert!(N.is_power_of_two(), "chunk size must be a power of 2");
    }

    // Keep copies of the fields consumed by `config.into()` below.
    let thread_pool = config.thread_pool.clone();
    let metadata_partition = config.grafted_mmr_metadata_partition.clone();

    let (metadata, pruned_chunks, pinned_nodes) =
        db::init_metadata(context.with_label("metadata"), &metadata_partition).await?;

    // Bitmap of operation activity, resumed from the persisted pruning point.
    let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
        .map_err(|_| Error::DataCorrupted("pruned chunks overflow"))?;

    // Everything before the current bitmap length was pruned as inactive, so
    // the inner DB can skip replay below this location.
    let last_known_inactivity_floor = Location::new(status.len());
    let any = any::init_fixed(
        context.with_label("any"),
        config.into(),
        Some(last_known_inactivity_floor),
        // Replay callback: append one activity bit per operation and clear
        // the bit of any location the operation deactivates.
        |append: bool, loc: Option<Location>| {
            status.push(append);
            if let Some(loc) = loc {
                status.set_bit(*loc, false);
            }
        },
        new_index,
    )
    .await?;

    // Rebuild the grafted MMR from the recovered bitmap and pinned nodes.
    let mut hasher = StandardHasher::<H>::new();
    let grafted_mmr = db::build_grafted_mmr::<H, N>(
        &mut hasher,
        &status,
        &pinned_nodes,
        &any.log.mmr,
        thread_pool.as_ref(),
    )
    .await?;

    // Combine the grafted MMR, any trailing partial bitmap chunk, and the
    // operation-log root into the overall DB root.
    let storage = grafting::Storage::new(&grafted_mmr, grafting::height::<N>(), &any.log.mmr);
    let partial_chunk = db::partial_chunk(&status);
    let ops_root = any.log.root();
    let root = db::compute_db_root(&mut hasher, &storage, partial_chunk, &ops_root).await?;

    Ok(db::Db {
        any,
        status,
        grafted_mmr,
        metadata: AsyncMutex::new(metadata),
        thread_pool,
        root,
    })
}
416
/// Initializes (or recovers) a grafted DB over a variable-size-operation
/// journal.
///
/// Identical recovery flow to [`init_fixed`], differing only in the journal
/// type and codec requirements: load grafted-MMR metadata, rebuild the
/// activity bitmap during inner-DB replay, rebuild the grafted MMR, then
/// compute the combined DB root.
///
/// # Errors
///
/// Returns [`Error::DataCorrupted`] if the persisted pruned-chunk count
/// overflows the bitmap, and propagates any storage/recovery error from the
/// inner DB or MMR construction.
pub(super) async fn init_variable<E, K, V, U, H, T, I, const N: usize, NewIndex>(
    context: E,
    config: VariableConfig<T, <Operation<K, V, U> as Read>::Cfg>,
    new_index: NewIndex,
) -> Result<db::Db<E, VJournal<E, Operation<K, V, U>>, I, H, U, N>, Error>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V> + Send + Sync,
    H: Hasher,
    T: Translator,
    I: UnorderedIndex<Value = Location>,
    NewIndex: FnOnce(E, T) -> I,
    Operation<K, V, U>: Codec + Committable,
{
    // Compile-time validation of the bitmap chunk size `N`.
    const {
        assert!(
            N.is_multiple_of(H::Digest::SIZE),
            "chunk size must be some multiple of the digest size",
        );
        assert!(N.is_power_of_two(), "chunk size must be a power of 2");
    }

    // Keep copies of the fields consumed by `config.into()` below.
    let metadata_partition = config.grafted_mmr_metadata_partition.clone();
    let pool = config.thread_pool.clone();

    let (metadata, pruned_chunks, pinned_nodes) =
        db::init_metadata(context.with_label("metadata"), &metadata_partition).await?;

    // Bitmap of operation activity, resumed from the persisted pruning point.
    let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
        .map_err(|_| Error::DataCorrupted("pruned chunks overflow"))?;

    // Everything before the current bitmap length was pruned as inactive, so
    // the inner DB can skip replay below this location.
    let last_known_inactivity_floor = Location::new(status.len());
    let any = any::init_variable(
        context.with_label("any"),
        config.into(),
        Some(last_known_inactivity_floor),
        // Replay callback: append one activity bit per operation and clear
        // the bit of any location the operation deactivates.
        |append: bool, loc: Option<Location>| {
            status.push(append);
            if let Some(loc) = loc {
                status.set_bit(*loc, false);
            }
        },
        new_index,
    )
    .await?;

    // Rebuild the grafted MMR from the recovered bitmap and pinned nodes.
    let mut hasher = StandardHasher::<H>::new();
    let grafted_mmr = db::build_grafted_mmr::<H, N>(
        &mut hasher,
        &status,
        &pinned_nodes,
        &any.log.mmr,
        pool.as_ref(),
    )
    .await?;

    // Combine the grafted MMR, any trailing partial bitmap chunk, and the
    // operation-log root into the overall DB root.
    let storage = grafting::Storage::new(&grafted_mmr, grafting::height::<N>(), &any.log.mmr);
    let partial_chunk = db::partial_chunk(&status);
    let ops_root = any.log.root();
    let root = db::compute_db_root(&mut hasher, &storage, partial_chunk, &ops_root).await?;

    Ok(db::Db {
        any,
        status,
        grafted_mmr,
        metadata: AsyncMutex::new(metadata),
        thread_pool: pool,
        root,
    })
}
501
/// Test-only introspection into a DB's activity bitmap, used by the shared
/// test scenarios below to verify pruning behavior.
#[cfg(any(test, feature = "test-traits"))]
pub trait BitmapPrunedBits {
    /// Number of bits pruned from the front of the activity bitmap.
    fn pruned_bits(&self) -> u64;

    /// Returns the activity bit at `index`.
    fn get_bit(&self, index: u64) -> bool;

    /// Index of the oldest bit still retained after pruning.
    fn oldest_retained(&self) -> impl core::future::Future<Output = u64> + Send;
}
514
515#[cfg(test)]
516pub mod tests {
517 pub use super::BitmapPrunedBits;
520 use super::{ordered, unordered, FixedConfig, VariableConfig};
521 use crate::{
522 qmdb::{
523 any::traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
524 store::{
525 tests::{TestKey, TestValue},
526 LogStore,
527 },
528 Error,
529 },
530 translator::Translator,
531 };
532 use commonware_runtime::{
533 buffer::paged::CacheRef,
534 deterministic::{self, Context},
535 BufferPooler, Metrics as _, Runner as _,
536 };
537 use commonware_utils::{NZUsize, NZU16, NZU64};
538 use core::future::Future;
539 use rand::{rngs::StdRng, RngCore, SeedableRng};
540 use std::num::{NonZeroU16, NonZeroUsize};
541 use tracing::warn;
542
    // Page size and cache capacity shared by every test config below;
    // deliberately tiny values for tests.
    const PAGE_SIZE: NonZeroU16 = NZU16!(88);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8);
546
547 pub(crate) fn fixed_config<T: Translator + Default>(
549 partition_prefix: &str,
550 pooler: &impl BufferPooler,
551 ) -> FixedConfig<T> {
552 FixedConfig {
553 mmr_journal_partition: format!("{partition_prefix}-journal-partition"),
554 mmr_metadata_partition: format!("{partition_prefix}-metadata-partition"),
555 mmr_items_per_blob: NZU64!(11),
556 mmr_write_buffer: NZUsize!(1024),
557 log_journal_partition: format!("{partition_prefix}-partition-prefix"),
558 log_items_per_blob: NZU64!(7),
559 log_write_buffer: NZUsize!(1024),
560 grafted_mmr_metadata_partition: format!(
561 "{partition_prefix}-grafted-mmr-metadata-partition"
562 ),
563 translator: T::default(),
564 thread_pool: None,
565 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
566 }
567 }
568
569 pub(crate) fn variable_config<T: Translator + Default>(
571 partition_prefix: &str,
572 pooler: &impl BufferPooler,
573 ) -> VariableConfig<T, ((), ())> {
574 VariableConfig {
575 mmr_journal_partition: format!("{partition_prefix}-journal-partition"),
576 mmr_metadata_partition: format!("{partition_prefix}-metadata-partition"),
577 mmr_items_per_blob: NZU64!(11),
578 mmr_write_buffer: NZUsize!(1024),
579 log_partition: format!("{partition_prefix}-partition-prefix"),
580 log_items_per_blob: NZU64!(7),
581 log_write_buffer: NZUsize!(1024),
582 log_compression: None,
583 log_codec_config: ((), ()),
584 grafted_mmr_metadata_partition: format!(
585 "{partition_prefix}-grafted-mmr-metadata-partition"
586 ),
587 translator: T::default(),
588 thread_pool: None,
589 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
590 }
591 }
592
593 async fn commit_writes<C: DbAny>(
595 db: &mut C,
596 writes: impl IntoIterator<Item = (C::Key, Option<<C as LogStore>::Value>)>,
597 ) -> Result<(), Error> {
598 let mut batch = db.new_batch();
599 for (k, v) in writes {
600 batch.write(k, v);
601 }
602 let finalized = batch.merkleize(None).await?.finalize();
603 db.apply_batch(finalized).await?;
604 Ok(())
605 }
606
607 async fn apply_random_ops_inner<C>(
612 num_elements: u64,
613 commit_changes: bool,
614 rng_seed: u64,
615 mut db: C,
616 ) -> Result<C, Error>
617 where
618 C: DbAny,
619 C::Key: TestKey,
620 <C as LogStore>::Value: TestValue,
621 {
622 warn!("rng_seed={}", rng_seed);
624 let mut rng = StdRng::seed_from_u64(rng_seed);
625
626 let writes: Vec<_> = (0u64..num_elements)
628 .map(|i| {
629 let k = TestKey::from_seed(i);
630 let v = TestValue::from_seed(rng.next_u64());
631 (k, Some(v))
632 })
633 .collect();
634 if commit_changes {
635 commit_writes(&mut db, writes).await?;
636 }
637
638 let mut pending: Vec<(C::Key, Option<<C as LogStore>::Value>)> = Vec::new();
641 for _ in 0u64..num_elements * 10 {
642 let rand_key = TestKey::from_seed(rng.next_u64() % num_elements);
643 if rng.next_u32() % 7 == 0 {
644 pending.push((rand_key, None));
645 continue;
646 }
647 let v = TestValue::from_seed(rng.next_u64());
648 pending.push((rand_key, Some(v)));
649 if commit_changes && rng.next_u32() % 20 == 0 {
650 commit_writes(&mut db, pending.drain(..)).await?;
651 }
652 }
653 if commit_changes {
654 commit_writes(&mut db, pending).await?;
655 }
656 Ok(db)
657 }
658
659 pub fn apply_random_ops<C>(
660 num_elements: u64,
661 commit_changes: bool,
662 rng_seed: u64,
663 db: C,
664 ) -> std::pin::Pin<Box<dyn Future<Output = Result<C, Error>>>>
665 where
666 C: DbAny + 'static,
667 C::Key: TestKey,
668 <C as LogStore>::Value: TestValue,
669 {
670 Box::pin(apply_random_ops_inner::<C>(
671 num_elements,
672 commit_changes,
673 rng_seed,
674 db,
675 ))
676 }
677
    /// Builds a DB with random committed operations, closes and reopens it,
    /// and checks the root survives the restart. The identical scenario is
    /// run on two fresh deterministic executors and their auditor states are
    /// compared, verifying the runtime trace is fully deterministic.
    pub fn test_build_random_close_reopen<C, F, Fut>(mut open_db: F)
    where
        C: DbAny + 'static,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        // First run.
        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        let state1 = executor.start(|mut context| async move {
            let partition = "build-random".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db_clone(context.with_label("first"), partition.clone()).await;
            db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            // Commit an empty batch and flush everything to storage.
            let finalized = db.new_batch().merkleize(None).await.unwrap().finalize();
            db.apply_batch(finalized).await.unwrap();
            db.sync().await.unwrap();

            let root = db.root();
            drop(db);
            let db: C = open_db_clone(context.with_label("second"), partition).await;

            // Root must be recovered intact after reopen.
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        // Second, independent run of the same scenario.
        let executor = deterministic::Runner::default();
        let state2 = executor.start(|mut context| async move {
            let partition = "build-random".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db(context.with_label("first"), partition.clone()).await;
            db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let finalized = db.new_batch().merkleize(None).await.unwrap().finalize();
            db.apply_batch(finalized).await.unwrap();
            db.sync().await.unwrap();

            let root = db.root();
            drop(db);
            let db: C = open_db(context.with_label("second"), partition).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        // Both runs must produce identical runtime traces.
        assert_eq!(state1, state2);
    }
741
    /// Simulates a crash by dropping the DB with uncommitted operations and
    /// verifies that reopening rolls back to the last committed state, and
    /// that replaying the same workload afterwards converges to the same root
    /// as a DB that never crashed.
    pub fn test_simulate_write_failures<C, F, Fut>(mut open_db: F)
    where
        C: DbAny + 'static,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        executor.start(|mut context| {
            Box::pin(async move {
                let partition = "build-random-fail-commit".to_string();
                let rng_seed = context.next_u64();
                let mut db: C = open_db(context.with_label("first"), partition.clone()).await;
                db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db)
                    .await
                    .unwrap();
                // Record the committed state we expect to roll back to.
                commit_writes(&mut db, []).await.unwrap();
                let committed_root = db.root();
                let committed_op_count = db.bounds().await.end;
                let committed_inactivity_floor = db.inactivity_floor_loc().await;
                db.prune(committed_inactivity_floor).await.unwrap();

                // Apply operations WITHOUT committing, then "crash".
                let db = apply_random_ops::<C>(ELEMENTS, false, rng_seed + 1, db)
                    .await
                    .unwrap();

                drop(db);
                // Scenario 1: reopen must recover the committed state.
                let db: C = open_db(context.with_label("scenario1"), partition.clone()).await;
                assert_eq!(db.root(), committed_root);
                assert_eq!(db.bounds().await.end, committed_op_count);

                // Scenario 2: replay the same workload, this time committed.
                let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed + 1, db)
                    .await
                    .unwrap();

                let committed_op_count = db.bounds().await.end;
                drop(db);

                let db: C = open_db(context.with_label("scenario2"), partition.clone()).await;
                let scenario_2_root = db.root();

                // A never-crashed DB running the full workload must converge
                // to the same op count and root as the crash-and-recover run.
                let fresh_partition = "build-random-fail-commit-fresh".to_string();
                let mut db: C = open_db(context.with_label("fresh"), fresh_partition.clone()).await;
                db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db)
                    .await
                    .unwrap();
                commit_writes(&mut db, []).await.unwrap();
                db = apply_random_ops::<C>(ELEMENTS, true, rng_seed + 1, db)
                    .await
                    .unwrap();
                db.prune(db.inactivity_floor_loc().await).await.unwrap();
                assert_eq!(db.bounds().await.end, committed_op_count);
                assert_eq!(db.root(), scenario_2_root);

                db.destroy().await.unwrap();
            })
        });
    }
820
    /// Runs the same workload against two DBs — one pruned aggressively during
    /// the run, one never pruned — and asserts they end with identical roots
    /// and inactivity floors (pruning must not affect the logical state).
    pub fn test_different_pruning_delays_same_root<C, F, Fut>(mut open_db: F)
    where
        C: DbAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const NUM_OPERATIONS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|context| async move {
            let mut db_no_pruning: C =
                open_db_clone(context.with_label("no_pruning"), "no-pruning-test".into()).await;
            let mut db_pruning: C =
                open_db(context.with_label("pruning"), "pruning-test".into()).await;

            // Feed both DBs identical writes, committing every 50 ops and
            // pruning only the second DB.
            let mut pending_no_pruning: Vec<(C::Key, Option<<C as LogStore>::Value>)> = Vec::new();
            let mut pending_pruning: Vec<(C::Key, Option<<C as LogStore>::Value>)> = Vec::new();
            for i in 0..NUM_OPERATIONS {
                let key: C::Key = TestKey::from_seed(i);
                let value: <C as LogStore>::Value = TestValue::from_seed(i * 1000);

                pending_no_pruning.push((key, Some(value.clone())));
                pending_pruning.push((key, Some(value)));

                if i % 50 == 49 {
                    commit_writes(&mut db_no_pruning, pending_no_pruning.drain(..))
                        .await
                        .unwrap();
                    commit_writes(&mut db_pruning, pending_pruning.drain(..))
                        .await
                        .unwrap();
                    db_pruning
                        .prune(db_no_pruning.inactivity_floor_loc().await)
                        .await
                        .unwrap();
                }
            }

            // Flush whatever remains uncommitted.
            commit_writes(&mut db_no_pruning, pending_no_pruning)
                .await
                .unwrap();
            commit_writes(&mut db_pruning, pending_pruning)
                .await
                .unwrap();

            // Pruning must not change the root…
            let root_no_pruning = db_no_pruning.root();
            let root_pruning = db_pruning.root();
            assert_eq!(root_no_pruning, root_pruning);

            // …nor the inactivity floor.
            assert_eq!(
                db_no_pruning.inactivity_floor_loc().await,
                db_pruning.inactivity_floor_loc().await
            );

            db_no_pruning.destroy().await.unwrap();
            db_pruning.destroy().await.unwrap();
        });
    }
893
    /// Verifies that `sync()` persists the activity bitmap's pruning boundary:
    /// after reopening, the pruned-bit count and root must match the values
    /// observed before closing.
    pub fn test_sync_persists_bitmap_pruning_boundary<C, F, Fut>(mut open_db: F)
    where
        C: DbAny + BitmapPrunedBits + 'static,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 500;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|mut context| async move {
            let partition = "sync-bitmap-pruning".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db_clone(context.with_label("first"), partition.clone()).await;

            db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db).await.unwrap();
            // Commit an empty batch so merkleization advances the bitmap.
            let finalized = db.new_batch().merkleize(None).await.unwrap().finalize();
            db.apply_batch(finalized).await.unwrap();

            let pruned_bits_before = db.pruned_bits();
            warn!(
                "pruned_bits_before={}, inactivity_floor={}, op_count={}",
                pruned_bits_before,
                *db.inactivity_floor_loc().await,
                *db.bounds().await.end
            );

            // Precondition: the workload must actually have pruned something,
            // otherwise the persistence check below is vacuous.
            assert!(
                pruned_bits_before > 0,
                "Expected bitmap to have pruned bits after merkleization"
            );

            db.sync().await.unwrap();

            let root_before = db.root();
            drop(db);

            let db: C = open_db(context.with_label("second"), partition).await;

            let pruned_bits_after = db.pruned_bits();
            warn!("pruned_bits_after={}", pruned_bits_after);

            assert_eq!(
                pruned_bits_after, pruned_bits_before,
                "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
            );

            // Root must also survive the reopen.
            assert_eq!(db.root(), root_before);

            db.destroy().await.unwrap();
        });
    }
963
    /// Builds a large DB in a single batch (inserts, overwrites, deletes),
    /// prunes, closes, reopens, and then verifies the root and every key's
    /// value against a shadow `HashMap`.
    pub fn test_current_db_build_big<C, F, Fut>(mut open_db: F)
    where
        C: DbAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|context| async move {
            let mut db: C = open_db_clone(context.with_label("first"), "build-big".into()).await;

            // Shadow map tracking the expected final key/value state.
            let mut map = std::collections::HashMap::<C::Key, <C as LogStore>::Value>::default();

            let finalized = {
                let mut batch = db.new_batch();

                // Insert every key.
                for i in 0u64..ELEMENTS {
                    let k: C::Key = TestKey::from_seed(i);
                    let v: <C as LogStore>::Value = TestValue::from_seed(i * 1000);
                    batch.write(k, Some(v.clone()));
                    map.insert(k, v);
                }

                // Overwrite every third key with a new value.
                for i in 0u64..ELEMENTS {
                    if i % 3 != 0 {
                        continue;
                    }
                    let k: C::Key = TestKey::from_seed(i);
                    let v: <C as LogStore>::Value = TestValue::from_seed((i + 1) * 10000);
                    batch.write(k, Some(v.clone()));
                    map.insert(k, v);
                }

                // Delete keys where i % 7 == 1.
                for i in 0u64..ELEMENTS {
                    if i % 7 != 1 {
                        continue;
                    }
                    let k: C::Key = TestKey::from_seed(i);
                    batch.write(k, None);
                    map.remove(&k);
                }

                batch.merkleize(None).await.unwrap().finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Flush, prune everything below the inactivity floor, and flush
            // again before simulating a close.
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc().await).await.unwrap();

            let root = db.root();
            db.sync().await.unwrap();
            drop(db);

            // Reopen and confirm the root survived.
            let db: C = open_db(context.with_label("second"), "build-big".into()).await;
            assert_eq!(root, db.root());

            // Every key must match the shadow map (present or deleted).
            for i in 0u64..ELEMENTS {
                let k: C::Key = TestKey::from_seed(i);
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }
        });
    }
1050
    /// Verifies that applying a changeset built against an outdated DB state
    /// fails with `StaleChangeset` and leaves the DB completely unchanged
    /// (root, bounds, metadata, and key lookups).
    pub fn test_stale_changeset_side_effect_free<C, F, Fut>(mut open_db: F)
    where
        C: DbAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut,
        Fut: Future<Output = C>,
    {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db: C =
                open_db(context.with_label("db"), "stale-side-effect-free".into()).await;

            let key1 = <C::Key as TestKey>::from_seed(1);
            let key2 = <C::Key as TestKey>::from_seed(2);
            let value1 = <<C as LogStore>::Value as TestValue>::from_seed(10);
            let value2 = <<C as LogStore>::Value as TestValue>::from_seed(20);

            // Both changesets are merkleized against the SAME (empty) state;
            // applying A first makes B stale.
            let changeset_a = {
                let mut batch = db.new_batch();
                batch.write(key1, Some(value1.clone()));
                batch.merkleize(None).await.unwrap().finalize()
            };
            let changeset_b = {
                let mut batch = db.new_batch();
                batch.write(key2, Some(value2));
                batch.merkleize(None).await.unwrap().finalize()
            };

            // Snapshot the state after applying A.
            db.apply_batch(changeset_a).await.unwrap();
            let expected_root = db.root();
            let expected_bounds = db.bounds().await;
            let expected_metadata = db.get_metadata().await.unwrap();
            assert_eq!(db.get(&key1).await.unwrap(), Some(value1.clone()));
            assert_eq!(db.get(&key2).await.unwrap(), None);

            // Applying the stale changeset must fail…
            let result = db.apply_batch(changeset_b).await;
            assert!(
                matches!(result, Err(Error::StaleChangeset { .. })),
                "expected StaleChangeset error, got {result:?}"
            );
            // …and must not have modified anything.
            assert_eq!(db.root(), expected_root);
            assert_eq!(db.bounds().await, expected_bounds);
            assert_eq!(db.get_metadata().await.unwrap(), expected_metadata);
            assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
            assert_eq!(db.get(&key2).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1104
1105 use crate::translator::OneCap;
1106 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
1107 use commonware_macros::{test_group, test_traced};
1108
    // Concrete DB aliases covering the ordered/unordered x fixed/variable
    // matrix, plus "partitioned" flavors (P1/P2 — the extra const parameter
    // presumably selects the partition count; confirm against the
    // `partitioned` module).
    type OrderedFixedDb = ordered::fixed::Db<Context, Digest, Digest, Sha256, OneCap, 32>;
    type OrderedVariableDb = ordered::variable::Db<Context, Digest, Digest, Sha256, OneCap, 32>;
    type UnorderedFixedDb = unordered::fixed::Db<Context, Digest, Digest, Sha256, OneCap, 32>;
    type UnorderedVariableDb = unordered::variable::Db<Context, Digest, Digest, Sha256, OneCap, 32>;
    type OrderedFixedP1Db =
        ordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1, 32>;
    type OrderedVariableP1Db =
        ordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1, 32>;
    type UnorderedFixedP1Db =
        unordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1, 32>;
    type UnorderedVariableP1Db =
        unordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1, 32>;
    type OrderedFixedP2Db =
        ordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2, 32>;
    type OrderedVariableP2Db =
        ordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2, 32>;
    type UnorderedFixedP2Db =
        unordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2, 32>;
    type UnorderedVariableP2Db =
        unordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2, 32>;
1130
    /// Expands to a `(Context, String) -> Future<Db>` closure that initializes
    /// `$db` with a config built by `$cfg` for the given partition name.
    macro_rules! open_db_fn {
        ($db:ty, $cfg:ident) => {
            |ctx: Context, partition: String| async move {
                <$db>::init(ctx.clone(), $cfg::<OneCap>(&partition, &ctx))
                    .await
                    .unwrap()
            }
        };
    }
1141
    /// Invokes `$cb` once per DB variant, appending a short label, the DB
    /// type alias, and the matching config constructor to its arguments.
    macro_rules! with_all_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
            $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
            $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
            $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
            $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
            $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
            $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
            $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
            $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
            $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
            $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
            $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
        };
    }
1159
    /// Like [`with_all_variants`], but restricted to the ordered DB variants.
    macro_rules! with_ordered_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
            $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
            $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
            $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
            $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
            $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
        };
    }
1171
    /// Like [`with_all_variants`], but restricted to the unordered DB variants.
    macro_rules! with_unordered_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
            $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
            $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
            $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
            $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
            $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
        };
    }
1183
    /// Runs test-runner `$f` against one DB variant: builds the open-db
    /// closure via [`open_db_fn`], wraps the call in a boxed async block, and
    /// awaits it. `$l` (the variant label) is currently unused here.
    macro_rules! test_simple {
        ($f:expr, $l:literal, $db:ty, $cfg:ident) => {
            Box::pin(async {
                $f(open_db_fn!($db, $cfg));
            })
            .await
        };
    }
1193
    /// Dispatches test-runner `$f` across all, ordered-only, or
    /// unordered-only DB variants depending on the selector keyword.
    macro_rules! for_all_variants {
        (simple: $f:expr) => {{
            with_all_variants!(test_simple!($f));
        }};
        (ordered: $f:expr) => {{
            with_ordered_variants!(test_simple!($f));
        }};
        (unordered: $f:expr) => {{
            with_unordered_variants!(test_simple!($f));
        }};
    }
1206
1207 fn test_ordered_build_big<C, F, Fut>(open_db: F)
1209 where
1210 C: DbAny,
1211 C::Key: TestKey,
1212 <C as LogStore>::Value: TestValue,
1213 F: FnMut(Context, String) -> Fut + Clone,
1214 Fut: Future<Output = C>,
1215 {
1216 test_current_db_build_big::<C, F, Fut>(open_db);
1217 }
1218
1219 fn test_unordered_build_big<C, F, Fut>(open_db: F)
1220 where
1221 C: DbAny,
1222 C::Key: TestKey,
1223 <C as LogStore>::Value: TestValue,
1224 F: FnMut(Context, String) -> Fut + Clone,
1225 Fut: Future<Output = C>,
1226 {
1227 test_current_db_build_big::<C, F, Fut>(open_db);
1228 }
1229
    /// Runs the build/close/reopen determinism scenario on every DB variant.
    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_build_random_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_build_random_close_reopen);
        });
    }
1238
    /// Runs the crash/recovery scenario on every DB variant.
    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_simulate_write_failures() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_simulate_write_failures);
        });
    }
1247
    /// Runs the pruning-invariance scenario on every DB variant.
    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_different_pruning_delays_same_root() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_different_pruning_delays_same_root);
        });
    }
1256
    /// Runs the bitmap-pruning-persistence scenario on every DB variant.
    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_sync_persists_bitmap_pruning_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_sync_persists_bitmap_pruning_boundary);
        });
    }
1265
    /// Runs the stale-changeset rejection scenario on every DB variant.
    #[test_traced("WARN")]
    fn test_all_variants_stale_changeset_side_effect_free() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_stale_changeset_side_effect_free);
        });
    }
1273
    /// Runs the big-batch build scenario on the ordered DB variants.
    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_ordered_variants_build_big() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(ordered: test_ordered_build_big);
        });
    }
1282
    /// Runs the big-batch build scenario on the unordered DB variants.
    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_unordered_variants_build_big() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(unordered: test_unordered_build_big);
        });
    }
1291
    /// Runs the ordered module's small build/close/reopen scenario on the
    /// ordered DB variants.
    #[test_group("slow")]
    #[test_traced("DEBUG")]
    fn test_ordered_variants_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(ordered: ordered::tests::test_build_small_close_reopen);
        });
    }
1300
    /// Runs the unordered module's small build/close/reopen scenario on the
    /// unordered DB variants.
    #[test_group("slow")]
    #[test_traced("DEBUG")]
    fn test_unordered_variants_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(unordered: unordered::tests::test_build_small_close_reopen);
        });
    }
1309
1310 fn key(i: u64) -> Digest {
1317 Sha256::hash(&i.to_be_bytes())
1318 }
1319
1320 fn val(i: u64) -> Digest {
1321 Sha256::hash(&(i + 10000).to_be_bytes())
1322 }
1323
    /// Verifies that a merkleized-but-unapplied batch exposes a speculative
    /// root distinct from the ops root, and that applying the batch makes the
    /// DB's root equal that speculative root.
    #[test_traced("INFO")]
    fn test_current_batch_speculative_root() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariableDb =
                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("sr", &ctx))
                    .await
                    .unwrap();

            let mut batch = db.new_batch();
            for i in 0..10 {
                batch.write(key(i), Some(val(i)));
            }
            let merkleized = batch.merkleize(None).await.unwrap();
            let speculative_root = merkleized.root();
            let ops_root = merkleized.ops_root();

            // The full (grafted) root includes more than just the ops log.
            assert_ne!(speculative_root, ops_root);

            let finalized = merkleized.finalize();
            db.apply_batch(finalized).await.unwrap();

            // After application, the DB root matches the speculative root.
            assert_eq!(db.root(), speculative_root);

            db.destroy().await.unwrap();
        });
    }
1357
    /// Verifies `get` on a merkleized (not yet applied) batch: it must see
    /// the batch's own writes (including overwrites of committed values) and
    /// return `None` for untouched keys.
    #[test_traced("INFO")]
    fn test_current_batch_merkleized_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariableDb =
                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("mg", &ctx))
                    .await
                    .unwrap();

            let ka = key(0);
            let kb = key(1);
            let kc = key(2);

            // Commit an initial value for `ka`.
            {
                let mut batch = db.new_batch();
                batch.write(ka, Some(val(0)));
                let finalized = batch.merkleize(None).await.unwrap().finalize();
                db.apply_batch(finalized).await.unwrap();
            }

            // Stage an overwrite of `ka` and a fresh write of `kb`.
            let va2 = val(100);
            let vb = val(1);
            let mut batch = db.new_batch();
            batch.write(ka, Some(va2));
            batch.write(kb, Some(vb));
            let merkleized = batch.merkleize(None).await.unwrap();

            assert_eq!(merkleized.get(&ka).await.unwrap(), Some(va2));
            assert_eq!(merkleized.get(&kb).await.unwrap(), Some(vb));
            assert_eq!(merkleized.get(&kc).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1396
    /// Verifies batch chaining: a child batch created from a merkleized
    /// parent sees the parent's writes, can override them, and applying the
    /// finalized child commits the combined state with the child's root.
    #[test_traced("INFO")]
    fn test_current_batch_chaining() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariableDb =
                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("ch", &ctx))
                    .await
                    .unwrap();

            // Parent batch writes keys 0..5.
            let mut parent = db.new_batch();
            for i in 0..5 {
                parent.write(key(i), Some(val(i)));
            }
            let parent_m = parent.merkleize(None).await.unwrap();

            // Child batch (chained off the parent) writes keys 5..10 and
            // overrides key 0.
            let mut child = parent_m.new_batch();
            for i in 5..10 {
                child.write(key(i), Some(val(i)));
            }
            child.write(key(0), Some(val(999)));
            let child_m = child.merkleize(None).await.unwrap();

            let child_root = child_m.root();

            // Child sees its own override plus both layers' writes.
            assert_eq!(child_m.get(&key(0)).await.unwrap(), Some(val(999)));
            assert_eq!(child_m.get(&key(3)).await.unwrap(), Some(val(3)));
            assert_eq!(child_m.get(&key(7)).await.unwrap(), Some(val(7)));

            let finalized = child_m.finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.root(), child_root);

            // Committed state reflects the chained result.
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(999)));
            for i in 1..10 {
                assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
            }

            db.destroy().await.unwrap();
        });
    }
1444}