1use crate::{
8 index::unordered::Index,
9 journal::contiguous::variable::Journal,
10 merkle::{self, Location},
11 qmdb::{
12 any::{unordered, value::VariableEncoding, VariableConfig, VariableValue},
13 operation::Key,
14 Error,
15 },
16 translator::Translator,
17 Context,
18};
19use commonware_codec::{Codec, Read};
20use commonware_cryptography::Hasher;
21
22pub type Update<K, V> = unordered::Update<K, VariableEncoding<V>>;
23pub type Operation<F, K, V> = unordered::Operation<F, K, VariableEncoding<V>>;
24
25pub type Db<F, E, K, V, H, T> =
28 super::Db<F, E, Journal<E, Operation<F, K, V>>, Index<T, Location<F>>, H, Update<K, V>>;
29
30impl<F: merkle::Family, E: Context, K: Key, V: VariableValue, H: Hasher, T: Translator>
31 Db<F, E, K, V, H, T>
32where
33 Operation<F, K, V>: Codec,
34{
35 pub async fn init(
38 context: E,
39 cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
40 ) -> Result<Self, Error<F>> {
41 Self::init_with_callback(context, cfg, None, |_, _| {}).await
42 }
43
44 pub(crate) async fn init_with_callback(
51 context: E,
52 cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
53 known_inactivity_floor: Option<Location<F>>,
54 callback: impl FnMut(bool, Option<Location<F>>),
55 ) -> Result<Self, Error<F>> {
56 crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
57 }
58}
59
60pub mod partitioned {
66 pub use super::{Operation, Update};
67 use crate::{
68 index::partitioned::unordered::Index,
69 journal::contiguous::variable::Journal,
70 merkle::{self, Location},
71 qmdb::{
72 any::{VariableConfig, VariableValue},
73 operation::Key,
74 Error,
75 },
76 translator::Translator,
77 Context,
78 };
79 use commonware_codec::{Codec, Read};
80 use commonware_cryptography::Hasher;
81
82 pub type Db<F, E, K, V, H, T, const P: usize> = crate::qmdb::any::unordered::Db<
92 F,
93 E,
94 Journal<E, Operation<F, K, V>>,
95 Index<T, Location<F>, P>,
96 H,
97 Update<K, V>,
98 >;
99
100 impl<
101 F: merkle::Family,
102 E: Context,
103 K: Key,
104 V: VariableValue,
105 H: Hasher,
106 T: Translator,
107 const P: usize,
108 > Db<F, E, K, V, H, T, P>
109 where
110 Operation<F, K, V>: Codec,
111 {
112 pub async fn init(
115 context: E,
116 cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
117 ) -> Result<Self, Error<F>> {
118 Self::init_with_callback(context, cfg, None, |_, _| {}).await
119 }
120
121 pub(crate) async fn init_with_callback(
128 context: E,
129 cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
130 known_inactivity_floor: Option<Location<F>>,
131 callback: impl FnMut(bool, Option<Location<F>>),
132 ) -> Result<Self, Error<F>> {
133 crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
134 }
135 }
136
137 pub mod p256 {
139 pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 1>;
141 }
142
143 pub mod p64k {
145 pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 2>;
147 }
148}
149
150#[cfg(test)]
151pub(crate) mod test {
152 use super::*;
153 use crate::{index::Unordered as _, mmr, translator::TwoCap};
154 use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
155 use commonware_macros::test_traced;
156 use commonware_math::algebra::Random;
157 use commonware_runtime::{
158 buffer::paged::CacheRef,
159 deterministic::{self, Context},
160 BufferPooler, Metrics, Runner as _,
161 };
162 use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64};
163 use rand::RngCore;
164 use std::{
165 num::{NonZeroU16, NonZeroUsize},
166 sync::Arc,
167 };
168
169 const PAGE_SIZE: NonZeroU16 = NZU16!(77);
170 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
171
172 pub(crate) fn create_test_config(seed: u64, pooler: &impl BufferPooler) -> VarConfig {
173 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
174 VariableConfig {
175 merkle_config: crate::mmr::journaled::Config {
176 journal_partition: format!("journal-{seed}"),
177 metadata_partition: format!("metadata-{seed}"),
178 items_per_blob: NZU64!(13),
179 write_buffer: NZUsize!(1024),
180 thread_pool: None,
181 page_cache: page_cache.clone(),
182 },
183 journal_config: crate::journal::contiguous::variable::Config {
184 partition: format!("log-journal-{seed}"),
185 items_per_section: NZU64!(7),
186 write_buffer: NZUsize!(1024),
187 compression: None,
188 codec_config: ((), ((0..=10000).into(), ())),
189 page_cache,
190 },
191 translator: TwoCap,
192 }
193 }
194
195 pub(crate) type VarConfig =
196 VariableConfig<TwoCap, ((), (commonware_codec::RangeCfg<usize>, ()))>;
197
198 pub(crate) type AnyTest =
200 Db<mmr::Family, deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
201
202 pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
204 let seed = context.next_u64();
205 let config = create_test_config(seed, &context);
206 AnyTest::init(context, config).await.unwrap()
207 }
208
/// Deterministically maps `i` to a test value: `(i % 13) + 7` copies of the byte `i % 255`.
///
/// The result is therefore between 7 and 19 bytes long.
fn to_bytes(i: u64) -> Vec<u8> {
    let fill = (i % 255) as u8;
    let count = (i % 13) as usize + 7;
    std::iter::repeat(fill).take(count).collect()
}
214
215 pub(crate) fn create_test_ops(
219 n: usize,
220 ) -> Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>> {
221 create_test_ops_seeded(n, 0)
222 }
223
224 pub(crate) fn create_test_ops_seeded(
227 n: usize,
228 seed: u64,
229 ) -> Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>> {
230 let mut rng = test_rng_seeded(seed);
231 let mut prev_key = Digest::random(&mut rng);
232 let mut ops = Vec::new();
233 for i in 0..n {
234 let key = Digest::random(&mut rng);
235 if i % 10 == 0 && i > 0 {
236 ops.push(unordered::Operation::Delete(prev_key));
237 } else {
238 let value = to_bytes(rng.next_u64());
239 ops.push(unordered::Operation::Update(unordered::Update(key, value)));
240 prev_key = key;
241 }
242 }
243 ops
244 }
245
246 pub(crate) async fn apply_ops(
248 db: &mut AnyTest,
249 ops: Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>>,
250 ) {
251 let mut batch = db.new_batch();
252 for op in ops {
253 match op {
254 unordered::Operation::Update(unordered::Update(key, value)) => {
255 batch = batch.write(key, Some(value));
256 }
257 unordered::Operation::Delete(key) => {
258 batch = batch.write(key, None);
259 }
260 unordered::Operation::CommitFloor(_, _) => {
261 panic!("CommitFloor not supported in apply_ops");
262 }
263 }
264 }
265 let merkleized = batch.merkleize(db, None).await.unwrap();
266 db.apply_batch(merkleized).await.unwrap();
267 }
268
269 async fn open_db(context: deterministic::Context) -> AnyTest {
271 let cfg = create_test_config(0, &context);
272 AnyTest::init(context, cfg).await.unwrap()
273 }
274
275 #[test_traced("WARN")]
276 pub fn test_any_variable_db_build_and_authenticate() {
277 let executor = deterministic::Runner::default();
278 executor.start(|context| async move {
279 let db = open_db(context.clone()).await;
280 crate::qmdb::any::test::test_any_db_build_and_authenticate(
281 context,
282 db,
283 |ctx| Box::pin(open_db(ctx)),
284 to_bytes,
285 )
286 .await;
287 });
288 }
289
290 #[test_traced("WARN")]
291 pub fn test_any_variable_db_recovery() {
292 let executor = deterministic::Runner::default();
293 const ELEMENTS: u64 = 1000;
295 executor.start(|context| async move {
296 let db = open_db(context.with_label("open1")).await;
297 let root = db.root();
298
299 {
301 let mut batch = db.new_batch();
302 for i in 0..ELEMENTS {
303 batch = batch.write(
304 Sha256::hash(&i.to_be_bytes()),
305 Some(vec![(i % 255) as u8; ((i % 13) + 7) as usize]),
306 );
307 }
308 let _ = batch.merkleize(&db, None).await.unwrap();
309 }
310
311 drop(db);
313 let mut db = open_db(context.with_label("open2")).await;
314 assert_eq!(root, db.root());
315
316 let mut batch = db.new_batch();
318 for i in 0u64..ELEMENTS {
319 let k = Sha256::hash(&i.to_be_bytes());
320 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
321 batch = batch.write(k, Some(v));
322 }
323 let merkleized = batch.merkleize(&db, None).await.unwrap();
324 db.apply_batch(merkleized).await.unwrap();
325 db.commit().await.unwrap();
326 let root = db.root();
327
328 {
330 let mut batch = db.new_batch();
331 for i in 0u64..ELEMENTS {
332 if i % 3 != 0 {
333 continue;
334 }
335 let k = Sha256::hash(&i.to_be_bytes());
336 let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
337 batch = batch.write(k, Some(v));
338 }
339 let _ = batch.merkleize(&db, None).await.unwrap();
340 }
341
342 drop(db);
344 let mut db = open_db(context.with_label("open3")).await;
345 assert_eq!(root, db.root());
346
347 let mut batch = db.new_batch();
349 for i in 0u64..ELEMENTS {
350 if i % 3 != 0 {
351 continue;
352 }
353 let k = Sha256::hash(&i.to_be_bytes());
354 let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
355 batch = batch.write(k, Some(v));
356 }
357 let merkleized = batch.merkleize(&db, None).await.unwrap();
358 db.apply_batch(merkleized).await.unwrap();
359 db.commit().await.unwrap();
360 let root = db.root();
361
362 {
364 let mut batch = db.new_batch();
365 for i in 0u64..ELEMENTS {
366 if i % 7 != 1 {
367 continue;
368 }
369 let k = Sha256::hash(&i.to_be_bytes());
370 batch = batch.write(k, None);
371 }
372 let _ = batch.merkleize(&db, None).await.unwrap();
373 }
374
375 drop(db);
377 let mut db = open_db(context.with_label("open4")).await;
378 assert_eq!(root, db.root());
379
380 let mut batch = db.new_batch();
382 for i in 0u64..ELEMENTS {
383 if i % 7 != 1 {
384 continue;
385 }
386 let k = Sha256::hash(&i.to_be_bytes());
387 batch = batch.write(k, None);
388 }
389 let merkleized = batch.merkleize(&db, None).await.unwrap();
390 db.apply_batch(merkleized).await.unwrap();
391 db.commit().await.unwrap();
392
393 let root = db.root();
394 let inactivity_floor = db.inactivity_floor_loc();
395 db.sync().await.unwrap(); db.prune(inactivity_floor).await.unwrap();
397 let bounds = db.bounds().await;
398 let snapshot_items = db.snapshot.items();
399
400 db.sync().await.unwrap();
401 drop(db);
402
403 let db = open_db(context.with_label("open5")).await;
405 assert_eq!(root, db.root());
406 assert_eq!(db.bounds().await, bounds);
407 assert_eq!(db.inactivity_floor_loc(), inactivity_floor);
408 assert_eq!(db.snapshot.items(), snapshot_items);
409
410 db.destroy().await.unwrap();
411 });
412 }
413
414 #[test_traced]
415 fn test_any_variable_db_prune_beyond_inactivity_floor() {
416 let executor = deterministic::Runner::default();
417 executor.start(|mut context| async move {
418 let mut db = open_db(context.clone()).await;
419
420 let key1 = Digest::random(&mut context);
422 let key2 = Digest::random(&mut context);
423 let key3 = Digest::random(&mut context);
424
425 let merkleized = db
426 .new_batch()
427 .write(key1, Some(vec![10]))
428 .write(key2, Some(vec![20]))
429 .write(key3, Some(vec![30]))
430 .merkleize(&db, None)
431 .await
432 .unwrap();
433 db.apply_batch(merkleized).await.unwrap();
434
435 let inactivity_floor = db.inactivity_floor_loc();
437 let beyond_floor = Location::new(*inactivity_floor + 1);
438
439 let result = db.prune(beyond_floor).await;
441 assert!(
442 matches!(result, Err(Error::PruneBeyondMinRequired(loc, floor))
443 if loc == beyond_floor && floor == inactivity_floor)
444 );
445
446 db.destroy().await.unwrap();
447 });
448 }
449
450 #[test_traced]
451 fn test_stale_batch_rejected() {
452 let executor = deterministic::Runner::default();
453 executor.start(|context| async move {
454 let mut db = open_db(context.clone()).await;
455
456 let key1 = Sha256::hash(&[1]);
457 let key2 = Sha256::hash(&[2]);
458
459 let batch_a = db
461 .new_batch()
462 .write(key1, Some(vec![10]))
463 .merkleize(&db, None)
464 .await
465 .unwrap();
466 let batch_b = db
467 .new_batch()
468 .write(key2, Some(vec![20]))
469 .merkleize(&db, None)
470 .await
471 .unwrap();
472
473 db.apply_batch(batch_a).await.unwrap();
475 let expected_root = db.root();
476 let expected_bounds = db.bounds().await;
477 assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
478 assert_eq!(db.get(&key2).await.unwrap(), None);
479
480 let result = db.apply_batch(batch_b).await;
482 assert!(
483 matches!(result, Err(Error::StaleBatch { .. })),
484 "expected StaleBatch error, got {result:?}"
485 );
486 assert_eq!(db.root(), expected_root);
487 assert_eq!(db.bounds().await, expected_bounds);
488 assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
489 assert_eq!(db.get(&key2).await.unwrap(), None);
490
491 db.destroy().await.unwrap();
492 });
493 }
494
495 #[test_traced]
498 fn test_stale_batch_rejected_different_sizes() {
499 let executor = deterministic::Runner::default();
500 executor.start(|context| async move {
501 let mut db = open_db(context.clone()).await;
502
503 let batch_a = db
505 .new_batch()
506 .write(Sha256::hash(&[1]), Some(vec![10]))
507 .merkleize(&db, None)
508 .await
509 .unwrap();
510 let batch_b = db
511 .new_batch()
512 .write(Sha256::hash(&[2]), Some(vec![20]))
513 .write(Sha256::hash(&[3]), Some(vec![30]))
514 .write(Sha256::hash(&[4]), Some(vec![40]))
515 .write(Sha256::hash(&[5]), Some(vec![50]))
516 .write(Sha256::hash(&[6]), Some(vec![60]))
517 .merkleize(&db, None)
518 .await
519 .unwrap();
520
521 assert!(batch_b.total_size > batch_a.total_size);
523
524 db.apply_batch(batch_a).await.unwrap();
526 let result = db.apply_batch(batch_b).await;
527 assert!(
528 matches!(result, Err(Error::StaleBatch { .. })),
529 "expected StaleBatch for asymmetric sibling, got {result:?}"
530 );
531
532 db.destroy().await.unwrap();
533 });
534 }
535
536 #[test_traced]
540 fn test_partial_ancestor_commit() {
541 let executor = deterministic::Runner::default();
542 executor.start(|context| async move {
543 let mut db = open_db(context.clone()).await;
544
545 let key1 = Sha256::hash(&[1]);
546 let key2 = Sha256::hash(&[2]);
547 let key3 = Sha256::hash(&[3]);
548
549 let a = db
551 .new_batch()
552 .write(key1, Some(vec![10]))
553 .merkleize(&db, None)
554 .await
555 .unwrap();
556 let b = a
557 .new_batch::<Sha256>()
558 .write(key2, Some(vec![20]))
559 .merkleize(&db, None)
560 .await
561 .unwrap();
562 let c = b
563 .new_batch::<Sha256>()
564 .write(key3, Some(vec![30]))
565 .merkleize(&db, None)
566 .await
567 .unwrap();
568
569 let expected_root = c.root();
570
571 db.apply_batch(a).await.unwrap();
573 db.apply_batch(c).await.unwrap();
574
575 assert_eq!(db.root(), expected_root);
576 assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
577 assert_eq!(db.get(&key2).await.unwrap(), Some(vec![20]));
578 assert_eq!(db.get(&key3).await.unwrap(), Some(vec![30]));
579
580 db.destroy().await.unwrap();
581 });
582 }
583
584 #[test_traced]
585 fn test_stale_batch_chained() {
586 let executor = deterministic::Runner::default();
587 executor.start(|context| async move {
588 let mut db = open_db(context.clone()).await;
589
590 let key1 = Sha256::hash(&[1]);
591 let key2 = Sha256::hash(&[2]);
592 let key3 = Sha256::hash(&[3]);
593
594 let merkleized = db
596 .new_batch()
597 .write(key1, Some(vec![10]))
598 .merkleize(&db, None)
599 .await
600 .unwrap();
601 db.apply_batch(merkleized).await.unwrap();
602
603 let parent = db
605 .new_batch()
606 .write(key2, Some(vec![20]))
607 .merkleize(&db, None)
608 .await
609 .unwrap();
610
611 let child_a = parent
612 .new_batch::<Sha256>()
613 .write(key3, Some(vec![30]))
614 .merkleize(&db, None)
615 .await
616 .unwrap();
617 let child_b = parent
618 .new_batch::<Sha256>()
619 .write(key3, Some(vec![40]))
620 .merkleize(&db, None)
621 .await
622 .unwrap();
623
624 db.apply_batch(child_a).await.unwrap();
626 let result = db.apply_batch(child_b).await;
627 assert!(
628 matches!(result, Err(Error::StaleBatch { .. })),
629 "expected StaleBatch error for sibling, got {result:?}"
630 );
631
632 db.destroy().await.unwrap();
633 });
634 }
635
636 #[test_traced]
640 fn test_sequential_commit_parent_then_child() {
641 let executor = deterministic::Runner::default();
642 executor.start(|context| async move {
643 let mut db = open_db(context.clone()).await;
644
645 let key1 = Sha256::hash(&[1]);
646 let key2 = Sha256::hash(&[2]);
647
648 let parent = db
650 .new_batch()
651 .write(key1, Some(vec![10]))
652 .merkleize(&db, None)
653 .await
654 .unwrap();
655 let child = parent
656 .new_batch::<Sha256>()
657 .write(key2, Some(vec![20]))
658 .merkleize(&db, None)
659 .await
660 .unwrap();
661
662 db.apply_batch(parent).await.unwrap();
664 db.apply_batch(child).await.unwrap();
665
666 assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
667 assert_eq!(db.get(&key2).await.unwrap(), Some(vec![20]));
668
669 db.destroy().await.unwrap();
670 });
671 }
672
673 #[test_traced]
674 fn test_stale_batch_child_applied_before_parent() {
675 let executor = deterministic::Runner::default();
676 executor.start(|context| async move {
677 let mut db = open_db(context.clone()).await;
678
679 let key1 = Sha256::hash(&[1]);
680 let key2 = Sha256::hash(&[2]);
681
682 let parent = db
684 .new_batch()
685 .write(key1, Some(vec![10]))
686 .merkleize(&db, None)
687 .await
688 .unwrap();
689 let child = parent
690 .new_batch::<Sha256>()
691 .write(key2, Some(vec![20]))
692 .merkleize(&db, None)
693 .await
694 .unwrap();
695
696 db.apply_batch(child).await.unwrap();
698 let result = db.apply_batch(parent).await;
699 assert!(
700 matches!(result, Err(Error::StaleBatch { .. })),
701 "expected StaleBatch for parent after child applied, got {result:?}"
702 );
703
704 db.destroy().await.unwrap();
705 });
706 }
707
708 mod from_sync_testable {
710 use super::*;
711 use crate::{
712 merkle::{
713 mmr::{self, journaled::Mmr},
714 Family as _,
715 },
716 qmdb::any::sync::tests::FromSyncTestable,
717 };
718 use futures::future::join_all;
719
720 type TestMmr = Mmr<deterministic::Context, Digest>;
721
722 impl FromSyncTestable for AnyTest {
723 type Mmr = TestMmr;
724
725 fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
726 (self.log.merkle, self.log.journal)
727 }
728
729 async fn pinned_nodes_at(&self, loc: mmr::Location) -> Vec<Digest> {
730 join_all(mmr::Family::nodes_to_pin(loc).map(|p| self.log.merkle.get_node(p)))
731 .await
732 .into_iter()
733 .map(|n| n.unwrap().unwrap())
734 .collect()
735 }
736 }
737 }
738
739 #[allow(dead_code, clippy::manual_async_fn)]
741 fn issue_2787_regression(
742 db: &crate::qmdb::immutable::variable::Db<
743 mmr::Family,
744 deterministic::Context,
745 Digest,
746 Vec<u8>,
747 Sha256,
748 TwoCap,
749 >,
750 key: Digest,
751 ) -> impl std::future::Future<Output = ()> + Send + use<'_> {
752 async move {
753 let _ = db.get(&key).await;
754 }
755 }
756
/// Compiles only when the argument's type is `Send`; does nothing at runtime.
fn is_send<T>(_: T)
where
    T: Send,
{
}
758
759 #[allow(dead_code)]
760 fn assert_non_trait_futures_are_send(db: &AnyTest, key: Digest, value: Vec<u8>) {
761 let batch = db.new_batch().write(key, Some(value));
762 is_send(batch.merkleize(db, None));
763 is_send(db.get_with_loc(&key));
764 }
765
766 #[test_traced("WARN")]
768 fn test_owned_batch_root_matches() {
769 let executor = deterministic::Runner::default();
770 executor.start(|context| async move {
771 let mut db = create_test_db(context).await;
772
773 apply_ops(&mut db, create_test_ops(20)).await;
775 db.commit().await.unwrap();
776
777 let base = db.to_batch();
779
780 let ops = create_test_ops_seeded(10, 99);
782
783 let mut batch = db.new_batch();
785 for op in &ops {
786 match op {
787 unordered::Operation::Update(unordered::Update(k, v)) => {
788 batch = batch.write(*k, Some(v.clone()));
789 }
790 unordered::Operation::Delete(k) => {
791 batch = batch.write(*k, None);
792 }
793 _ => unreachable!(),
794 }
795 }
796 let borrow_root = batch.merkleize(&db, None).await.unwrap().root();
797
798 let mut batch = base.new_batch::<Sha256>();
800 for op in &ops {
801 match op {
802 unordered::Operation::Update(unordered::Update(k, v)) => {
803 batch = batch.write(*k, Some(v.clone()));
804 }
805 unordered::Operation::Delete(k) => {
806 batch = batch.write(*k, None);
807 }
808 _ => unreachable!(),
809 }
810 }
811 let batch_root = batch.merkleize(&db, None).await.unwrap().root();
812
813 assert_eq!(borrow_root, batch_root);
814
815 db.destroy().await.unwrap();
816 });
817 }
818
819 #[test_traced("WARN")]
821 fn test_owned_batch_apply() {
822 let executor = deterministic::Runner::default();
823 executor.start(|context| async move {
824 let mut db = create_test_db(context).await;
825
826 apply_ops(&mut db, create_test_ops(20)).await;
828 db.commit().await.unwrap();
829
830 let base = db.to_batch();
831
832 let key = Digest::random(&mut commonware_utils::test_rng_seeded(200));
834 let value = vec![42u8; 16];
835 let child_batch = base
836 .new_batch::<Sha256>()
837 .write(key, Some(value.clone()))
838 .merkleize(&db, None)
839 .await
840 .unwrap();
841
842 db.apply_batch(child_batch).await.unwrap();
844 db.commit().await.unwrap();
845
846 let fetched = db.get(&key).await.unwrap();
848 assert_eq!(fetched.unwrap(), value);
849
850 db.destroy().await.unwrap();
851 });
852 }
853
854 #[test_traced("WARN")]
856 fn test_owned_batch_chain_commit_parent_first() {
857 let executor = deterministic::Runner::default();
858 executor.start(|context| async move {
859 let mut db = create_test_db(context).await;
860
861 apply_ops(&mut db, create_test_ops(10)).await;
863 db.commit().await.unwrap();
864
865 let base = db.to_batch();
866
867 let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(300));
869 let val_a = vec![1u8; 10];
870 let parent_batch = base
871 .new_batch::<Sha256>()
872 .write(key_a, Some(val_a.clone()))
873 .merkleize(&db, None)
874 .await
875 .unwrap();
876
877 let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(301));
879 let val_b = vec![2u8; 10];
880 let child_batch = parent_batch
881 .new_batch::<Sha256>()
882 .write(key_b, Some(val_b.clone()))
883 .merkleize(&db, None)
884 .await
885 .unwrap();
886
887 db.apply_batch(parent_batch).await.unwrap();
888 db.commit().await.unwrap();
889
890 db.apply_batch(child_batch).await.unwrap();
892 db.commit().await.unwrap();
893
894 assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
896 assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
897
898 db.destroy().await.unwrap();
899 });
900 }
901
902 #[test_traced("WARN")]
904 fn test_owned_batch_multiple_forks() {
905 let executor = deterministic::Runner::default();
906 executor.start(|context| async move {
907 let mut db = create_test_db(context).await;
908
909 apply_ops(&mut db, create_test_ops(10)).await;
910 db.commit().await.unwrap();
911
912 let base = db.to_batch();
913
914 let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(400));
916 let fork_a = base
917 .new_batch::<Sha256>()
918 .write(key_a, Some(vec![10u8; 8]))
919 .merkleize(&db, None)
920 .await
921 .unwrap();
922
923 let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(401));
925 let fork_b = base
926 .new_batch::<Sha256>()
927 .write(key_b, Some(vec![20u8; 8]))
928 .merkleize(&db, None)
929 .await
930 .unwrap();
931
932 assert_ne!(fork_a.root(), fork_b.root());
934
935 db.apply_batch(fork_a).await.unwrap();
937 db.commit().await.unwrap();
938
939 assert_eq!(db.get(&key_a).await.unwrap().unwrap(), vec![10u8; 8]);
940 assert!(db.get(&key_b).await.unwrap().is_none());
941
942 db.destroy().await.unwrap();
943 });
944 }
945
946 #[test_traced("WARN")]
948 fn test_owned_batch_homogeneous_collection() {
949 use crate::qmdb::any::batch::MerkleizedBatch;
950 use commonware_cryptography::sha256;
951 use std::collections::HashMap;
952
953 type Snap = MerkleizedBatch<mmr::Family, sha256::Digest, super::Update<Digest, Vec<u8>>>;
954
955 let executor = deterministic::Runner::default();
956 executor.start(|context| async move {
957 let mut db = create_test_db(context).await;
958
959 apply_ops(&mut db, create_test_ops(10)).await;
960 db.commit().await.unwrap();
961
962 let base = db.to_batch();
963
964 let mut collection: HashMap<sha256::Digest, Arc<Snap>> = HashMap::new();
966
967 let key = Digest::random(&mut commonware_utils::test_rng_seeded(500));
969 let batch1 = base
970 .new_batch::<Sha256>()
971 .write(key, Some(vec![1u8; 8]))
972 .merkleize(&db, None)
973 .await
974 .unwrap();
975 collection.insert(batch1.root(), batch1);
976
977 let batch1_root = *collection.keys().next().unwrap();
979 let batch1_ref = collection.get(&batch1_root).unwrap();
980 let key = Digest::random(&mut commonware_utils::test_rng_seeded(501));
981 let batch2 = batch1_ref
982 .new_batch::<Sha256>()
983 .write(key, Some(vec![2u8; 8]))
984 .merkleize(&db, None)
985 .await
986 .unwrap();
987 collection.insert(batch2.root(), batch2);
988
989 assert_eq!(collection.len(), 2);
991
992 db.destroy().await.unwrap();
993 });
994 }
995
996 #[test_traced("WARN")]
998 fn test_owned_batch_chain_delete_after_ancestor_insert() {
999 let executor = deterministic::Runner::default();
1000 executor.start(|context| async move {
1001 let mut db = create_test_db(context).await;
1002
1003 apply_ops(&mut db, create_test_ops(5)).await;
1004 db.commit().await.unwrap();
1005
1006 let base = db.to_batch();
1007
1008 let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(700));
1010 let val_a = vec![10u8; 8];
1011 let parent_batch = base
1012 .new_batch::<Sha256>()
1013 .write(key_x, Some(val_a.clone()))
1014 .merkleize(&db, None)
1015 .await
1016 .unwrap();
1017
1018 let child_batch = parent_batch
1020 .new_batch::<Sha256>()
1021 .write(key_x, None)
1022 .merkleize(&db, None)
1023 .await
1024 .unwrap();
1025
1026 db.apply_batch(parent_batch).await.unwrap();
1027 db.commit().await.unwrap();
1028 assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
1029
1030 db.apply_batch(child_batch).await.unwrap();
1032 db.commit().await.unwrap();
1033
1034 assert!(db.get(&key_x).await.unwrap().is_none());
1036
1037 db.destroy().await.unwrap();
1038 });
1039 }
1040
1041 #[test_traced("WARN")]
1043 fn test_owned_batch_chain_overlapping_keys() {
1044 let executor = deterministic::Runner::default();
1045 executor.start(|context| async move {
1046 let mut db = create_test_db(context).await;
1047
1048 apply_ops(&mut db, create_test_ops(5)).await;
1050 db.commit().await.unwrap();
1051
1052 let base = db.to_batch();
1053
1054 let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(600));
1056 let val_a = vec![10u8; 8];
1057 let parent_batch = base
1058 .new_batch::<Sha256>()
1059 .write(key_x, Some(val_a.clone()))
1060 .merkleize(&db, None)
1061 .await
1062 .unwrap();
1063
1064 let val_b = vec![20u8; 8];
1066 let child_batch = parent_batch
1067 .new_batch::<Sha256>()
1068 .write(key_x, Some(val_b.clone()))
1069 .merkleize(&db, None)
1070 .await
1071 .unwrap();
1072
1073 db.apply_batch(parent_batch).await.unwrap();
1074 db.commit().await.unwrap();
1075
1076 assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
1078
1079 db.apply_batch(child_batch).await.unwrap();
1081 db.commit().await.unwrap();
1082
1083 assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);
1085
1086 db.destroy().await.unwrap();
1087 });
1088 }
1089
1090 #[test_traced("WARN")]
1093 fn test_owned_batch_chain_three_deep() {
1094 let executor = deterministic::Runner::default();
1095 executor.start(|context| async move {
1096 let mut db = create_test_db(context).await;
1097
1098 apply_ops(&mut db, create_test_ops(10)).await;
1099 db.commit().await.unwrap();
1100
1101 let base = db.to_batch();
1102
1103 let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(900));
1105 let val_a = vec![1u8; 10];
1106 let grandparent_batch = base
1107 .new_batch::<Sha256>()
1108 .write(key_a, Some(val_a.clone()))
1109 .merkleize(&db, None)
1110 .await
1111 .unwrap();
1112
1113 let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(901));
1115 let val_b = vec![2u8; 10];
1116 let parent_batch = grandparent_batch
1117 .new_batch::<Sha256>()
1118 .write(key_b, Some(val_b.clone()))
1119 .merkleize(&db, None)
1120 .await
1121 .unwrap();
1122
1123 let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(902));
1125 let val_c = vec![3u8; 10];
1126 let child_batch = parent_batch
1127 .new_batch::<Sha256>()
1128 .write(key_c, Some(val_c.clone()))
1129 .merkleize(&db, None)
1130 .await
1131 .unwrap();
1132
1133 db.apply_batch(grandparent_batch).await.unwrap();
1134 db.commit().await.unwrap();
1135 assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
1136
1137 db.apply_batch(parent_batch).await.unwrap();
1139 db.commit().await.unwrap();
1140 assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
1141
1142 db.apply_batch(child_batch).await.unwrap();
1144 db.commit().await.unwrap();
1145 assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
1146
1147 assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
1149 assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
1150 assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
1151
1152 db.destroy().await.unwrap();
1153 });
1154 }
1155
1156 #[test_traced("WARN")]
1158 fn test_owned_batch_chain_three_deep_overlapping_key() {
1159 let executor = deterministic::Runner::default();
1160 executor.start(|context| async move {
1161 let mut db = create_test_db(context).await;
1162
1163 apply_ops(&mut db, create_test_ops(5)).await;
1164 db.commit().await.unwrap();
1165
1166 let base = db.to_batch();
1167 let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(910));
1168
1169 let val_a = vec![10u8; 8];
1171 let grandparent_batch = base
1172 .new_batch::<Sha256>()
1173 .write(key_x, Some(val_a.clone()))
1174 .merkleize(&db, None)
1175 .await
1176 .unwrap();
1177
1178 let val_b = vec![20u8; 8];
1180 let parent_batch = grandparent_batch
1181 .new_batch::<Sha256>()
1182 .write(key_x, Some(val_b.clone()))
1183 .merkleize(&db, None)
1184 .await
1185 .unwrap();
1186
1187 let child_batch = parent_batch
1189 .new_batch::<Sha256>()
1190 .write(key_x, None)
1191 .merkleize(&db, None)
1192 .await
1193 .unwrap();
1194
1195 db.apply_batch(grandparent_batch).await.unwrap();
1196 db.commit().await.unwrap();
1197 assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
1198
1199 db.apply_batch(parent_batch).await.unwrap();
1201 db.commit().await.unwrap();
1202 assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);
1203
1204 db.apply_batch(child_batch).await.unwrap();
1206 db.commit().await.unwrap();
1207 assert!(db.get(&key_x).await.unwrap().is_none());
1208
1209 db.destroy().await.unwrap();
1210 });
1211 }
1212
1213 #[test_traced("WARN")]
1219 fn test_new_child_after_ancestor_committed_and_dropped() {
1220 let executor = deterministic::Runner::default();
1221 executor.start(|context| async move {
1222 let mut db = create_test_db(context).await;
1223
1224 apply_ops(&mut db, create_test_ops(5)).await;
1225 db.commit().await.unwrap();
1226
1227 let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(800));
1229 let val_a = vec![10u8; 8];
1230 let a = db
1231 .new_batch()
1232 .write(key_a, Some(val_a.clone()))
1233 .merkleize(&db, None)
1234 .await
1235 .unwrap();
1236
1237 let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(801));
1238 let val_b = vec![20u8; 8];
1239 let b = a
1240 .new_batch::<Sha256>()
1241 .write(key_b, Some(val_b.clone()))
1242 .merkleize(&db, None)
1243 .await
1244 .unwrap();
1245
1246 db.apply_batch(a).await.unwrap();
1248 db.commit().await.unwrap();
1249
1250 let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(802));
1252 let val_c = vec![30u8; 8];
1253 let c = b
1254 .new_batch::<Sha256>()
1255 .write(key_c, Some(val_c.clone()))
1256 .merkleize(&db, None)
1257 .await
1258 .unwrap();
1259
1260 db.apply_batch(b).await.unwrap();
1262 db.commit().await.unwrap();
1263
1264 db.apply_batch(c).await.unwrap();
1266 db.commit().await.unwrap();
1267
1268 assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
1270 assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
1271 assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
1272
1273 db.destroy().await.unwrap();
1274 });
1275 }
1276
1277 #[test_traced("WARN")]
1282 fn test_apply_batch_after_ancestor_dropped_without_commit() {
1283 let executor = deterministic::Runner::default();
1284 executor.start(|context| async move {
1285 let mut db = create_test_db(context).await;
1286
1287 apply_ops(&mut db, create_test_ops(5)).await;
1288 db.commit().await.unwrap();
1289
1290 let base = db.to_batch();
1291
1292 let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(700));
1294 let val_a = vec![1u8; 10];
1295 let a = base
1296 .new_batch::<Sha256>()
1297 .write(key_a, Some(val_a.clone()))
1298 .merkleize(&db, None)
1299 .await
1300 .unwrap();
1301
1302 let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(701));
1303 let val_b = vec![2u8; 10];
1304 let b = a
1305 .new_batch::<Sha256>()
1306 .write(key_b, Some(val_b.clone()))
1307 .merkleize(&db, None)
1308 .await
1309 .unwrap();
1310
1311 let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(702));
1312 let val_c = vec![3u8; 10];
1313 let c = b
1314 .new_batch::<Sha256>()
1315 .write(key_c, Some(val_c.clone()))
1316 .merkleize(&db, None)
1317 .await
1318 .unwrap();
1319
1320 drop(a);
1322 drop(b);
1323
1324 db.apply_batch(c).await.unwrap();
1327 db.commit().await.unwrap();
1328
1329 assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
1331 assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
1332 assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
1333
1334 db.destroy().await.unwrap();
1335 });
1336 }
1337}