1use crate::{
39 index::Unordered as UnorderedIndex,
40 journal::{
41 authenticated,
42 contiguous::{
43 fixed::{Config as FConfig, Journal as FJournal},
44 variable::{Config as VConfig, Journal as VJournal},
45 },
46 },
47 mmr::{journaled::Config as MmrConfig, Location},
48 qmdb::{
49 any::operation::{Operation, Update},
50 operation::{Committable, Key},
51 Error,
52 },
53 translator::Translator,
54};
55use commonware_codec::{Codec, CodecFixedShared, Read};
56use commonware_cryptography::Hasher;
57use commonware_parallel::ThreadPool;
58use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
59use commonware_utils::Array;
60use std::num::{NonZeroU64, NonZeroUsize};
61use tracing::warn;
62
63pub mod batch;
64pub(crate) mod db;
65pub(crate) mod operation;
66#[cfg(any(test, feature = "test-traits"))]
67pub mod traits;
68pub(crate) mod value;
69pub(crate) use value::{FixedValue, ValueEncoding, VariableValue};
70pub mod ordered;
71pub(crate) mod sync;
72pub mod unordered;
73
/// Configuration for initializing a db backed by a fixed-size-item journal.
#[derive(Clone)]
pub struct FixedConfig<T: Translator> {
    /// Storage partition holding the MMR journal.
    pub mmr_journal_partition: String,

    /// Number of items stored per MMR journal blob.
    pub mmr_items_per_blob: NonZeroU64,

    /// Write buffer size for the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// Storage partition holding the MMR metadata.
    pub mmr_metadata_partition: String,

    /// Storage partition holding the operation-log journal.
    pub log_journal_partition: String,

    /// Number of items stored per log journal blob.
    pub log_items_per_blob: NonZeroU64,

    /// Write buffer size for the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Translator handed to the key index at init time.
    pub translator: T,

    /// Optional thread pool forwarded to the MMR configuration.
    pub thread_pool: Option<ThreadPool>,

    /// Page cache shared by the MMR and log journals.
    pub page_cache: CacheRef,
}
107
/// Configuration for initializing a db backed by a variable-size-item journal.
#[derive(Clone)]
pub struct VariableConfig<T: Translator, C> {
    /// Storage partition holding the MMR journal.
    pub mmr_journal_partition: String,

    /// Number of items stored per MMR journal blob.
    pub mmr_items_per_blob: NonZeroU64,

    /// Write buffer size for the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// Storage partition holding the MMR metadata.
    pub mmr_metadata_partition: String,

    /// Storage partition holding the operation log.
    pub log_partition: String,

    /// Write buffer size for the log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression setting forwarded to the variable journal.
    pub log_compression: Option<u8>,

    /// Codec configuration forwarded to the variable journal for decoding
    /// logged operations.
    pub log_codec_config: C,

    /// Number of items per log section (forwarded as `items_per_section`).
    pub log_items_per_blob: NonZeroU64,

    /// Translator handed to the key index at init time.
    pub translator: T,

    /// Optional thread pool forwarded to the MMR configuration.
    pub thread_pool: Option<ThreadPool>,

    /// Page cache shared by the MMR and log journals.
    pub page_cache: CacheRef,
}
147
148pub(super) async fn init_fixed<E, K, V, U, H, T, I, F, NewIndex>(
150 context: E,
151 cfg: FixedConfig<T>,
152 known_inactivity_floor: Option<Location>,
153 callback: F,
154 new_index: NewIndex,
155) -> Result<db::Db<E, FJournal<E, Operation<K, V, U>>, I, H, U>, Error>
156where
157 E: Storage + Clock + Metrics,
158 K: Array,
159 V: ValueEncoding,
160 U: Update<K, V> + Send + Sync,
161 H: Hasher,
162 T: Translator,
163 I: UnorderedIndex<Value = Location>,
164 F: FnMut(bool, Option<Location>),
165 NewIndex: FnOnce(E, T) -> I,
166 Operation<K, V, U>: CodecFixedShared + Committable,
167{
168 let mmr_config = MmrConfig {
169 journal_partition: cfg.mmr_journal_partition,
170 metadata_partition: cfg.mmr_metadata_partition,
171 items_per_blob: cfg.mmr_items_per_blob,
172 write_buffer: cfg.mmr_write_buffer,
173 thread_pool: cfg.thread_pool,
174 page_cache: cfg.page_cache.clone(),
175 };
176
177 let journal_config = FConfig {
178 partition: cfg.log_journal_partition,
179 items_per_blob: cfg.log_items_per_blob,
180 write_buffer: cfg.log_write_buffer,
181 page_cache: cfg.page_cache,
182 };
183
184 let mut log = authenticated::Journal::<_, FJournal<_, _>, _>::new(
185 context.with_label("log"),
186 mmr_config,
187 journal_config,
188 Operation::is_commit,
189 )
190 .await?;
191
192 if log.size().await == 0 {
193 warn!("Authenticated log is empty, initializing new db");
194 let commit_floor = Operation::CommitFloor(None, Location::new(0));
195 log.append(&commit_floor).await?;
196 log.sync().await?;
197 }
198
199 let index = new_index(context.with_label("index"), cfg.translator);
200 db::Db::init_from_log(index, log, known_inactivity_floor, callback).await
201}
202
203pub(super) async fn init_variable<E, K, V, U, H, T, I, F, NewIndex>(
205 context: E,
206 cfg: VariableConfig<T, <Operation<K, V, U> as Read>::Cfg>,
207 known_inactivity_floor: Option<Location>,
208 callback: F,
209 new_index: NewIndex,
210) -> Result<db::Db<E, VJournal<E, Operation<K, V, U>>, I, H, U>, Error>
211where
212 E: Storage + Clock + Metrics,
213 K: Key,
214 V: ValueEncoding,
215 U: Update<K, V> + Send + Sync,
216 H: Hasher,
217 T: Translator,
218 I: UnorderedIndex<Value = Location>,
219 F: FnMut(bool, Option<Location>),
220 NewIndex: FnOnce(E, T) -> I,
221 Operation<K, V, U>: Codec + Committable,
222{
223 let mmr_config = MmrConfig {
224 journal_partition: cfg.mmr_journal_partition,
225 metadata_partition: cfg.mmr_metadata_partition,
226 items_per_blob: cfg.mmr_items_per_blob,
227 write_buffer: cfg.mmr_write_buffer,
228 thread_pool: cfg.thread_pool,
229 page_cache: cfg.page_cache.clone(),
230 };
231
232 let journal_config = VConfig {
233 partition: cfg.log_partition,
234 items_per_section: cfg.log_items_per_blob,
235 compression: cfg.log_compression,
236 codec_config: cfg.log_codec_config,
237 page_cache: cfg.page_cache,
238 write_buffer: cfg.log_write_buffer,
239 };
240
241 let mut log = authenticated::Journal::<_, VJournal<_, _>, _>::new(
242 context.with_label("log"),
243 mmr_config,
244 journal_config,
245 Operation::is_commit,
246 )
247 .await?;
248
249 if log.size().await == 0 {
250 warn!("Authenticated log is empty, initializing new db");
251 let commit_floor = Operation::CommitFloor(None, Location::new(0));
252 log.append(&commit_floor).await?;
253 log.sync().await?;
254 }
255
256 let index = new_index(context.with_label("index"), cfg.translator);
257 db::Db::init_from_log(index, log, known_inactivity_floor, callback).await
258}
259
260#[cfg(test)]
261pub(crate) mod test {
263 use super::*;
264 use crate::{
265 qmdb::any::{FixedConfig, VariableConfig},
266 translator::OneCap,
267 };
268 use commonware_utils::{NZUsize, NZU16, NZU64};
269 use std::num::NonZeroU16;
270
    // Page size used by the test page cache (NOTE(review): unit presumably
    // bytes — confirm against `CacheRef::from_pooler`).
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    // Capacity of the test page cache passed to `CacheRef::from_pooler`.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
274
275 pub(crate) fn fixed_db_config<T: Translator + Default>(
276 suffix: &str,
277 pooler: &impl BufferPooler,
278 ) -> FixedConfig<T> {
279 FixedConfig {
280 mmr_journal_partition: format!("journal-{suffix}"),
281 mmr_metadata_partition: format!("metadata-{suffix}"),
282 mmr_items_per_blob: NZU64!(11),
283 mmr_write_buffer: NZUsize!(1024),
284 log_journal_partition: format!("log-journal-{suffix}"),
285 log_items_per_blob: NZU64!(7),
286 log_write_buffer: NZUsize!(1024),
287 translator: T::default(),
288 thread_pool: None,
289 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
290 }
291 }
292
293 pub(crate) fn variable_db_config<T: Translator + Default>(
294 suffix: &str,
295 pooler: &impl BufferPooler,
296 ) -> VariableConfig<T, ((), ())> {
297 VariableConfig {
298 mmr_journal_partition: format!("journal-{suffix}"),
299 mmr_metadata_partition: format!("metadata-{suffix}"),
300 mmr_items_per_blob: NZU64!(11),
301 mmr_write_buffer: NZUsize!(1024),
302 log_partition: format!("log-journal-{suffix}"),
303 log_items_per_blob: NZU64!(7),
304 log_write_buffer: NZUsize!(1024),
305 log_compression: None,
306 log_codec_config: ((), ()),
307 translator: T::default(),
308 thread_pool: None,
309 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
310 }
311 }
312
313 use crate::{
314 kv::Gettable,
315 mmr::Location,
316 qmdb::{
317 any::traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
318 store::MerkleizedStore,
319 },
320 };
321 use commonware_codec::{Codec, CodecShared};
322 use commonware_cryptography::{sha256::Digest, Sha256};
323 use commonware_runtime::{deterministic::Context, BufferPooler};
324 use core::{future::Future, pin::Pin};
325 use std::collections::HashMap;
326
    /// Verifies a non-empty db survives reopening: committed state is restored
    /// exactly, while batches that were merkleized/finalized but never applied
    /// leave no trace.
    pub(crate) async fn test_any_db_non_empty_recovery<D, V: Clone + CodecShared>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
    {
        const ELEMENTS: u64 = 1000;

        // Commit an initial batch of ELEMENTS writes and prune to the
        // inactivity floor.
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        db.prune(db.inactivity_floor_loc().await).await.unwrap();
        // Snapshot committed state to compare against after each reopen.
        let root = db.root();
        let op_count = db.size().await;
        let inactivity_floor_loc = db.inactivity_floor_loc().await;

        // Reopen: committed state must be restored exactly.
        let db = reopen_db(context.with_label("reopen1")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_eq!(db.root(), root);

        // Merkleize+finalize a batch but never apply it; a reopen must show
        // none of it.
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        let db = reopen_db(context.with_label("reopen2")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_eq!(db.root(), root);

        // Repeat the abandoned-batch check.
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        let db = reopen_db(context.with_label("reopen3")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.root(), root);

        // Several abandoned batches in a row must also leave no trace.
        for _ in 0..3 {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        let mut db = reopen_db(context.with_label("reopen4")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.root(), root);

        // An applied batch, by contrast, must persist across a reopen.
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        let db = reopen_db(context.with_label("reopen5")).await;
        assert!(db.size().await > op_count);
        assert_ne!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_ne!(db.root(), root);

        db.destroy().await.unwrap();
    }
420
    /// Verifies a freshly-initialized (empty) db survives reopening: only the
    /// single initial commit operation is present, and batches that were
    /// merkleized but never applied are discarded on restart.
    pub(crate) async fn test_any_db_empty_recovery<D, V: Clone + CodecShared>(
        context: Context,
        db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
    {
        let root = db.root();

        // Reopen: still exactly one op (the initial commit) and the same root.
        let db = reopen_db(context.with_label("reopen1")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Merkleize+finalize a batch without applying it; it must not survive
        // the reopen.
        {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        let db = reopen_db(context.with_label("reopen2")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Same again, this time dropping the db handle before reopening.
        {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        drop(db);
        let db = reopen_db(context.with_label("reopen3")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Several abandoned batches in a row must also leave no trace.
        for _ in 0..3 {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        drop(db);
        let mut db = reopen_db(context.with_label("reopen4")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // An applied batch, by contrast, must persist across a reopen.
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        drop(db);
        let db = reopen_db(context.with_label("reopen5")).await;
        assert!(db.size().await > 1);
        assert_ne!(db.root(), root);

        db.destroy().await.unwrap();
    }
498
    /// Builds a db with writes, overwrites, and deletes, mirrors the expected
    /// state in a `HashMap`, then checks every key and verifies proofs for all
    /// retained operations after a reopen.
    pub(crate) async fn test_any_db_build_and_authenticate<D, V>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
        V: CodecShared + Clone + Eq + std::hash::Hash + std::fmt::Debug,
        <D as MerkleizedStore>::Operation: Codec,
    {
        use crate::{mmr::StandardHasher, qmdb::verify_proof};

        const ELEMENTS: u64 = 1000;

        // `map` mirrors the expected key/value state of the db.
        let mut map = HashMap::<Digest, V>::default();
        let finalized = {
            let mut batch = db.new_batch();
            // Initial write of every key.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch.write(k, Some(v.clone()));
                map.insert(k, v);
            }

            // Overwrite every third key with a new value.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v.clone()));
                map.insert(k, v);
            }

            // Delete keys where i % 7 == 1.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                batch.write(k, None);
                map.remove(&k);
            }

            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        db.sync().await.unwrap();
        db.prune(db.inactivity_floor_loc().await).await.unwrap();

        // The root must survive a sync + reopen cycle.
        let root = db.root();
        db.sync().await.unwrap();
        drop(db);
        let db = reopen_db(context.with_label("reopened")).await;
        assert_eq!(root, db.root());

        // Every key must match the mirrored map (present or absent).
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            if let Some(map_value) = map.get(&k) {
                let Some(db_value) = db.get(&k).await.unwrap() else {
                    panic!("key not found in db: {k}");
                };
                assert_eq!(*map_value, db_value);
            } else {
                assert!(db.get(&k).await.unwrap().is_none());
            }
        }

        // Generate and verify a proof starting at every retained location.
        let mut hasher = StandardHasher::<Sha256>::new();
        let bounds = db.bounds().await;
        let inactivity_floor = db.inactivity_floor_loc().await;
        for loc in *inactivity_floor..*bounds.end {
            let loc = Location::new(loc);
            let (proof, ops) = db.proof(loc, NZU64!(10)).await.unwrap();
            assert!(verify_proof(&mut hasher, &proof, loc, &ops, &root));
        }

        db.destroy().await.unwrap();
    }
583
584 pub(crate) async fn test_any_db_log_replay<
586 D,
587 V: Clone + CodecShared + PartialEq + std::fmt::Debug,
588 >(
589 context: Context,
590 mut db: D,
591 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
592 make_value: impl Fn(u64) -> V,
593 ) where
594 D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
595 {
596 const UPDATES: u64 = 100;
598 let k = Sha256::hash(&UPDATES.to_be_bytes());
599 let mut last_value = None;
600 let finalized = {
601 let mut batch = db.new_batch();
602 for i in 0u64..UPDATES {
603 let v = make_value(i * 1000);
604 last_value = Some(v.clone());
605 batch.write(k, Some(v));
606 }
607 batch.merkleize(None).await.unwrap().finalize()
608 };
609 db.apply_batch(finalized).await.unwrap();
610 let root = db.root();
611
612 drop(db);
614 let db = reopen_db(context.with_label("reopened")).await;
615 assert_eq!(db.root(), root);
616 assert_eq!(db.get(&k).await.unwrap(), last_value);
617
618 db.destroy().await.unwrap();
619 }
620
621 pub(crate) async fn test_any_db_historical_proof_basic<D, V: Clone + CodecShared>(
623 _context: Context,
624 mut db: D,
625 make_value: impl Fn(u64) -> V,
626 ) where
627 D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
628 <D as MerkleizedStore>::Operation: Codec + PartialEq + std::fmt::Debug,
629 {
630 use crate::{mmr::StandardHasher, qmdb::verify_proof};
631 use commonware_utils::NZU64;
632
633 const OPS: u64 = 20;
635 let finalized = {
636 let mut batch = db.new_batch();
637 for i in 0u64..OPS {
638 let k = Sha256::hash(&i.to_be_bytes());
639 let v = make_value(i * 1000);
640 batch.write(k, Some(v));
641 }
642 batch.merkleize(None).await.unwrap().finalize()
643 };
644 db.apply_batch(finalized).await.unwrap();
645 let root_hash = db.root();
646 let original_op_count = db.size().await;
647
648 let max_ops = NZU64!(10);
650 let start_loc = Location::new(5);
651 let (historical_proof, historical_ops) = db
652 .historical_proof(original_op_count, start_loc, max_ops)
653 .await
654 .unwrap();
655 let (regular_proof, regular_ops) = db.proof(start_loc, max_ops).await.unwrap();
656
657 assert_eq!(historical_proof.leaves, regular_proof.leaves);
658 assert_eq!(historical_proof.digests, regular_proof.digests);
659 assert_eq!(historical_ops, regular_ops);
660 let mut hasher = StandardHasher::<Sha256>::new();
661 assert!(verify_proof(
662 &mut hasher,
663 &historical_proof,
664 start_loc,
665 &historical_ops,
666 &root_hash
667 ));
668
669 let finalized = {
671 let mut batch = db.new_batch();
672 for i in OPS..(OPS + 5) {
673 let k = Sha256::hash(&(i + 1000).to_be_bytes()); let v = make_value(i * 1000);
675 batch.write(k, Some(v));
676 }
677 batch.merkleize(None).await.unwrap().finalize()
678 };
679 db.apply_batch(finalized).await.unwrap();
680
681 let (historical_proof2, historical_ops2) = db
683 .historical_proof(original_op_count, start_loc, max_ops)
684 .await
685 .unwrap();
686 assert_eq!(historical_proof2.leaves, original_op_count);
687 assert_eq!(historical_proof2.digests, regular_proof.digests);
688 assert_eq!(historical_ops2, regular_ops);
689 assert!(verify_proof(
690 &mut hasher,
691 &historical_proof2,
692 start_loc,
693 &historical_ops2,
694 &root_hash
695 ));
696
697 db.destroy().await.unwrap();
698 }
699
    /// Verifies `verify_proof` rejects tampered proofs, tampered operation
    /// lists, and mismatched verification arguments for a historical proof.
    pub(crate) async fn test_any_db_historical_proof_invalid<D, V: Clone + CodecShared>(
        _context: Context,
        mut db: D,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
        <D as MerkleizedStore>::Operation: Codec + PartialEq + std::fmt::Debug + Clone,
    {
        use crate::{mmr::StandardHasher, qmdb::verify_proof};
        use commonware_utils::NZU64;

        // Seed the db with 10 writes in a single commit.
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..10 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();

        // Baseline historical proof at size 5 starting at location 1; the
        // request is capped at 4 operations.
        let historical_op_count = Location::new(5);
        let (proof, ops) = db
            .historical_proof(historical_op_count, Location::new(1), NZU64!(10))
            .await
            .unwrap();
        assert_eq!(proof.leaves, historical_op_count);
        assert_eq!(ops.len(), 4);

        let mut hasher = StandardHasher::<Sha256>::new();

        // Corrupting an existing proof digest must fail verification.
        {
            let mut tampered_proof = proof.clone();
            tampered_proof.digests[0] = Sha256::hash(b"invalid");
            let root_hash = db.root();
            assert!(!verify_proof(
                &mut hasher,
                &tampered_proof,
                Location::new(1),
                &ops,
                &root_hash
            ));
        }

        // Appending an extra digest must fail verification.
        {
            let mut tampered_proof = proof.clone();
            tampered_proof.digests.push(Sha256::hash(b"invalid"));
            let root_hash = db.root();
            assert!(!verify_proof(
                &mut hasher,
                &tampered_proof,
                Location::new(1),
                &ops,
                &root_hash
            ));
        }

        // Reordering the proven operations must fail verification.
        {
            let root_hash = db.root();
            let mut tampered_ops = ops.clone();
            if tampered_ops.len() >= 2 {
                tampered_ops.swap(0, 1);
                assert!(!verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new(1),
                    &tampered_ops,
                    &root_hash
                ));
            }
        }

        // Duplicating an operation must fail verification.
        {
            let root_hash = db.root();
            let mut tampered_ops = ops.clone();
            tampered_ops.push(tampered_ops[0].clone());
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                Location::new(1),
                &tampered_ops,
                &root_hash
            ));
        }

        // Verifying at the wrong start location must fail.
        {
            let root_hash = db.root();
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                Location::new(2),
                &ops,
                &root_hash
            ));
        }

        // Verifying against the wrong root must fail.
        {
            let invalid_root = Sha256::hash(b"invalid");
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                Location::new(1),
                &ops,
                &invalid_root
            ));
        }

        // A wrong leaf count in the proof must fail verification.
        {
            let mut tampered_proof = proof.clone();
            tampered_proof.leaves = Location::new(100);
            let root_hash = db.root();
            assert!(!verify_proof(
                &mut hasher,
                &tampered_proof,
                Location::new(1),
                &ops,
                &root_hash
            ));
        }

        db.destroy().await.unwrap();
    }
833
834 pub(crate) async fn test_any_db_historical_proof_edge_cases<D, V: Clone + CodecShared>(
836 _context: Context,
837 mut db: D,
838 make_value: impl Fn(u64) -> V,
839 ) where
840 D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
841 <D as MerkleizedStore>::Operation: Codec + PartialEq + std::fmt::Debug,
842 {
843 use commonware_utils::NZU64;
844
845 let finalized = {
847 let mut batch = db.new_batch();
848 for i in 0u64..50 {
849 let k = Sha256::hash(&i.to_be_bytes());
850 let v = make_value(i * 1000);
851 batch.write(k, Some(v));
852 }
853 batch.merkleize(None).await.unwrap().finalize()
854 };
855 db.apply_batch(finalized).await.unwrap();
856
857 let (single_proof, single_ops) = db
859 .historical_proof(Location::new(2), Location::new(1), NZU64!(1))
860 .await
861 .unwrap();
862 assert_eq!(single_proof.leaves, Location::new(2));
863 assert_eq!(single_ops.len(), 1);
864
865 let (_limited_proof, limited_ops) = db
867 .historical_proof(Location::new(11), Location::new(6), NZU64!(20))
868 .await
869 .unwrap();
870 assert_eq!(limited_ops.len(), 5); let (min_proof, min_ops) = db
874 .historical_proof(Location::new(4), Location::new(1), NZU64!(3))
875 .await
876 .unwrap();
877 assert_eq!(min_proof.leaves, Location::new(4));
878 assert_eq!(min_ops.len(), 3);
879
880 db.destroy().await.unwrap();
881 }
882
    /// Commits several batches (each carrying the same metadata), then a final
    /// commit that deletes one key with no metadata, and checks the cleared
    /// metadata and the deletion survive a reopen/replay.
    pub(crate) async fn test_any_db_multiple_commits_delete_replayed<D, V>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
        V: Clone + CodecShared + Eq + std::fmt::Debug,
    {
        // NOTE(review): `map` is populated below but never read in this test.
        let mut map = HashMap::<Digest, V>::default();
        const ELEMENTS: u64 = 10;
        let metadata_value = make_value(42);
        // Distinct key per (commit, element) pair.
        let key_at = |j: u64, i: u64| Sha256::hash(&(j * 1000 + i).to_be_bytes());
        // ELEMENTS commits of ELEMENTS writes each, all carrying metadata.
        for j in 0u64..ELEMENTS {
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = key_at(j, i);
                    let v = make_value(i * 1000);
                    batch.write(k, Some(v.clone()));
                    map.insert(k, v);
                }
                batch
                    .merkleize(Some(metadata_value.clone()))
                    .await
                    .unwrap()
                    .finalize()
            };
            db.apply_batch(finalized).await.unwrap();
        }
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_value));
        let k = key_at(ELEMENTS - 1, ELEMENTS - 1);

        // Delete one key in a commit with no metadata: the stored metadata is
        // cleared and the key is gone.
        let finalized = {
            let mut batch = db.new_batch();
            batch.write(k, None);
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert!(db.get(&k).await.unwrap().is_none());

        // Reopen: replay must reproduce the root, the cleared metadata, and
        // the deletion.
        let root = db.root();
        drop(db);
        let db = reopen_db(context.with_label("reopened")).await;
        assert_eq!(root, db.root());
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert!(db.get(&k).await.unwrap().is_none());

        db.destroy().await.unwrap();
    }
935
936 use crate::qmdb::any::{
937 ordered::{fixed::Db as OrderedFixedDb, variable::Db as OrderedVariableDb},
938 unordered::{fixed::Db as UnorderedFixedDb, variable::Db as UnorderedVariableDb},
939 };
940 use commonware_macros::{test_group, test_traced};
941 use commonware_runtime::{deterministic, Runner as _};
942
    // Concrete db variants exercised by the shared tests below. The `P1`/`P2`
    // aliases instantiate the partitioned index variants with const parameter
    // 1 and 2 respectively.
    type UnorderedFixed = UnorderedFixedDb<Context, Digest, Digest, Sha256, OneCap>;
    type UnorderedVariable = UnorderedVariableDb<Context, Digest, Digest, Sha256, OneCap>;
    type OrderedFixed = OrderedFixedDb<Context, Digest, Digest, Sha256, OneCap>;
    type OrderedVariable = OrderedVariableDb<Context, Digest, Digest, Sha256, OneCap>;
    type UnorderedFixedP1 =
        unordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1>;
    type UnorderedVariableP1 =
        unordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1>;
    type OrderedFixedP1 =
        ordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1>;
    type OrderedVariableP1 =
        ordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1>;
    type UnorderedFixedP2 =
        unordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2>;
    type UnorderedVariableP2 =
        unordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2>;
    type OrderedFixedP2 =
        ordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2>;
    type OrderedVariableP2 =
        ordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2>;
964
965 #[inline]
966 fn to_digest(i: u64) -> Digest {
967 Sha256::hash(&i.to_be_bytes())
968 }
969
    // Expands `$cb!` once per db variant, appending a short variant name
    // literal, the concrete db type alias, and the matching config
    // constructor ident to the forwarded arguments.
    macro_rules! with_all_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "uf", UnorderedFixed, fixed_db_config);
            $cb!($($args)*, "uv", UnorderedVariable, variable_db_config);
            $cb!($($args)*, "of", OrderedFixed, fixed_db_config);
            $cb!($($args)*, "ov", OrderedVariable, variable_db_config);
            $cb!($($args)*, "ufp1", UnorderedFixedP1, fixed_db_config);
            $cb!($($args)*, "uvp1", UnorderedVariableP1, variable_db_config);
            $cb!($($args)*, "ofp1", OrderedFixedP1, fixed_db_config);
            $cb!($($args)*, "ovp1", OrderedVariableP1, variable_db_config);
            $cb!($($args)*, "ufp2", UnorderedFixedP2, fixed_db_config);
            $cb!($($args)*, "uvp2", UnorderedVariableP2, variable_db_config);
            $cb!($($args)*, "ofp2", OrderedFixedP2, fixed_db_config);
            $cb!($($args)*, "ovp2", OrderedVariableP2, variable_db_config);
        };
    }
987
    // Initializes a `$db` using `$cfg` (partition prefix = label + suffix),
    // then awaits test `$f` with the context, the db, a closure that reopens
    // the same db from the same partitions, and `to_digest` as value maker.
    macro_rules! test_with_reopen {
        ($ctx:expr, $sfx:expr, $f:expr, $l:literal, $db:ty, $cfg:ident) => {{
            let p = concat!($l, "_", $sfx);
            Box::pin(async {
                let ctx = $ctx.with_label($l);
                let db = <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                    .await
                    .unwrap();
                $f(
                    ctx,
                    db,
                    // Reopen closure: re-init against the same partitions.
                    |ctx| {
                        Box::pin(async move {
                            <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                                .await
                                .unwrap()
                        })
                    },
                    to_digest,
                )
                .await;
            })
            .await
        }};
    }
1015
    // Initializes a `$db` using `$cfg` (partition prefix = label + suffix),
    // then awaits test `$f` with the context, the db, and `to_digest` as the
    // value maker (no reopen closure).
    macro_rules! test_with_make_value {
        ($ctx:expr, $sfx:expr, $f:expr, $l:literal, $db:ty, $cfg:ident) => {{
            let p = concat!($l, "_", $sfx);
            Box::pin(async {
                let ctx = $ctx.with_label($l);
                let db = <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                    .await
                    .unwrap();
                $f(ctx, db, to_digest).await;
            })
            .await
        }};
    }
1029
    // Dispatches a shared test to all db variants, choosing the driver macro
    // by whether the test takes a reopen closure (`with_reopen`) or only a
    // value maker (`with_make_value`).
    macro_rules! for_all_variants {
        ($ctx:expr, $sfx:expr, with_reopen: $f:expr) => {{
            with_all_variants!(test_with_reopen!($ctx, $sfx, $f));
        }};
        ($ctx:expr, $sfx:expr, with_make_value: $f:expr) => {{
            with_all_variants!(test_with_make_value!($ctx, $sfx, $f));
        }};
    }
1039
1040 #[test_group("slow")]
1041 #[test_traced("WARN")]
1042 fn test_all_variants_log_replay() {
1043 let executor = deterministic::Runner::default();
1044 executor.start(|context| async move {
1045 for_all_variants!(context, "lr", with_reopen: test_any_db_log_replay);
1046 });
1047 }
1048
1049 #[test_group("slow")]
1050 #[test_traced("WARN")]
1051 fn test_all_variants_build_and_authenticate() {
1052 let executor = deterministic::Runner::default();
1053 executor.start(|context| async move {
1054 for_all_variants!(context, "baa", with_reopen: test_any_db_build_and_authenticate);
1055 });
1056 }
1057
1058 #[test_group("slow")]
1059 #[test_traced("WARN")]
1060 fn test_all_variants_historical_proof_basic() {
1061 let executor = deterministic::Runner::default();
1062 executor.start(|context| async move {
1063 for_all_variants!(context, "hpb", with_make_value: test_any_db_historical_proof_basic);
1064 });
1065 }
1066
1067 #[test_group("slow")]
1068 #[test_traced("WARN")]
1069 fn test_all_variants_historical_proof_invalid() {
1070 let executor = deterministic::Runner::default();
1071 executor.start(|context| async move {
1072 for_all_variants!(context, "hpi", with_make_value: test_any_db_historical_proof_invalid);
1073 });
1074 }
1075
1076 #[test_group("slow")]
1077 #[test_traced("WARN")]
1078 fn test_all_variants_historical_proof_edge_cases() {
1079 let executor = deterministic::Runner::default();
1080 executor.start(|context| async move {
1081 for_all_variants!(context, "hpec", with_make_value: test_any_db_historical_proof_edge_cases);
1082 });
1083 }
1084
1085 #[test_group("slow")]
1086 #[test_traced("WARN")]
1087 fn test_all_variants_multiple_commits_delete_replayed() {
1088 let executor = deterministic::Runner::default();
1089 executor.start(|context| async move {
1090 for_all_variants!(context, "mcdr", with_reopen: test_any_db_multiple_commits_delete_replayed);
1091 });
1092 }
1093
1094 #[test_group("slow")]
1095 #[test_traced("WARN")]
1096 fn test_all_variants_non_empty_recovery() {
1097 let executor = deterministic::Runner::default();
1098 executor.start(|context| async move {
1099 for_all_variants!(context, "ner", with_reopen: test_any_db_non_empty_recovery);
1100 });
1101 }
1102
1103 #[test_group("slow")]
1104 #[test_traced("WARN")]
1105 fn test_all_variants_empty_recovery() {
1106 let executor = deterministic::Runner::default();
1107 executor.start(|context| async move {
1108 for_all_variants!(context, "er", with_reopen: test_any_db_empty_recovery);
1109 });
1110 }
1111
1112 fn key(i: u64) -> Digest {
1113 Sha256::hash(&i.to_be_bytes())
1114 }
1115
1116 fn val(i: u64) -> Digest {
1117 Sha256::hash(&(i + 10000).to_be_bytes())
1118 }
1119
1120 async fn commit_writes(
1122 db: &mut UnorderedVariable,
1123 writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1124 metadata: Option<Digest>,
1125 ) -> std::ops::Range<Location> {
1126 let mut batch = db.new_batch();
1127 for (k, v) in writes {
1128 batch.write(k, v);
1129 }
1130 let finalized = batch.merkleize(metadata).await.unwrap().finalize();
1131 db.apply_batch(finalized).await.unwrap()
1132 }
1133
1134 #[test_traced("INFO")]
1136 fn test_any_batch_empty() {
1137 let executor = deterministic::Runner::default();
1138 executor.start(|context| async move {
1139 let ctx = context.with_label("db");
1140 let mut db: UnorderedVariable =
1141 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("e", &ctx))
1142 .await
1143 .unwrap();
1144
1145 let root_before = db.root();
1146 let batch = db.new_batch();
1147 let finalized = batch.merkleize(None).await.unwrap().finalize();
1148 db.apply_batch(finalized).await.unwrap();
1149
1150 assert_ne!(db.root(), root_before);
1152
1153 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1155 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1156
1157 db.destroy().await.unwrap();
1158 });
1159 }
1160
1161 #[test_traced("INFO")]
1163 fn test_any_batch_metadata() {
1164 let executor = deterministic::Runner::default();
1165 executor.start(|context| async move {
1166 let ctx = context.with_label("db");
1167 let mut db: UnorderedVariable =
1168 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("m", &ctx))
1169 .await
1170 .unwrap();
1171
1172 let metadata = val(42);
1173
1174 commit_writes(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
1176 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1177
1178 let batch = db.new_batch();
1180 let finalized = batch.merkleize(None).await.unwrap().finalize();
1181 db.apply_batch(finalized).await.unwrap();
1182 assert_eq!(db.get_metadata().await.unwrap(), None);
1183
1184 db.destroy().await.unwrap();
1185 });
1186 }
1187
    /// Batch `get` must read through to the committed db for keys the batch
    /// hasn't touched, while reflecting the batch's own uncommitted writes
    /// (including overwrites and deletes) for keys it has.
    #[test_traced("INFO")]
    fn test_any_batch_get_read_through() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("g", &ctx))
                    .await
                    .unwrap();

            // Seed the committed state with a single key.
            let ka = key(0);
            let va = val(0);
            commit_writes(&mut db, [(ka, Some(va))], None).await;

            let kb = key(1);
            let vb = val(1);
            let kc = key(2);

            let mut batch = db.new_batch();

            // Untouched key: read-through to the committed value.
            assert_eq!(batch.get(&ka).await.unwrap(), Some(va));

            // Key written only in the batch is visible to the batch.
            batch.write(kb, Some(vb));
            assert_eq!(batch.get(&kb).await.unwrap(), Some(vb));

            // Key absent from both batch and db reads as None.
            assert_eq!(batch.get(&kc).await.unwrap(), None);

            // Batch overwrite shadows the committed value.
            let va2 = val(100);
            batch.write(ka, Some(va2));
            assert_eq!(batch.get(&ka).await.unwrap(), Some(va2));

            // Batch delete shadows the committed value with None.
            batch.write(ka, None);
            assert_eq!(batch.get(&ka).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1233
    /// A merkleized (but not yet applied) batch must serve reads reflecting
    /// its own updates, deletes, and creates, with read-through for keys it
    /// never touched.
    #[test_traced("INFO")]
    fn test_any_batch_get_on_merkleized() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("mg", &ctx))
                    .await
                    .unwrap();

            let ka = key(0);
            let kb = key(1);
            let kc = key(2);
            let kd = key(3);

            // Committed state: ka and kb exist.
            commit_writes(&mut db, [(ka, Some(val(0))), (kb, Some(val(1)))], None).await;

            // Batch: overwrite ka, delete kb, create kc; kd stays untouched.
            let va2 = val(100);
            let vc = val(2);
            let mut batch = db.new_batch();
            batch.write(ka, Some(va2));
            batch.write(kb, None);
            batch.write(kc, Some(vc));
            let merkleized = batch.merkleize(None).await.unwrap();

            assert_eq!(merkleized.get(&ka).await.unwrap(), Some(va2));
            assert_eq!(merkleized.get(&kb).await.unwrap(), None);
            assert_eq!(merkleized.get(&kc).await.unwrap(), Some(vc));
            assert_eq!(merkleized.get(&kd).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1270
    /// A batch stacked on top of a merkleized batch must read through to the
    /// parent's writes and shadow them with its own writes and deletes.
    #[test_traced("INFO")]
    fn test_any_batch_stacked_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("sg", &ctx))
                    .await
                    .unwrap();

            let ka = key(0);
            let kb = key(1);

            // Parent batch writes ka, then is merkleized.
            let mut batch = db.new_batch();
            batch.write(ka, Some(val(0)));
            let merkleized = batch.merkleize(None).await.unwrap();

            // Child sees the parent's uncommitted write.
            let mut child = merkleized.new_batch();
            assert_eq!(child.get(&ka).await.unwrap(), Some(val(0)));

            // Child overwrite shadows the parent's value.
            child.write(ka, Some(val(100)));
            assert_eq!(child.get(&ka).await.unwrap(), Some(val(100)));

            // Key written only in the child is visible to the child.
            child.write(kb, Some(val(1)));
            assert_eq!(child.get(&kb).await.unwrap(), Some(val(1)));

            // Child delete shadows the parent's write.
            child.write(ka, None);
            assert_eq!(child.get(&ka).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1309
    /// Deleting a key in a parent batch and recreating it in a stacked child
    /// must survive merkleization of each layer and final application.
    #[test_traced("INFO")]
    fn test_any_batch_stacked_delete_recreate() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dr", &ctx))
                    .await
                    .unwrap();

            let ka = key(0);

            // Commit ka so the parent batch has something to delete.
            commit_writes(&mut db, [(ka, Some(val(0)))], None).await;

            // Parent deletes ka.
            let mut parent = db.new_batch();
            parent.write(ka, None);
            let parent_m = parent.merkleize(None).await.unwrap();
            assert_eq!(parent_m.get(&ka).await.unwrap(), None);

            // Child recreates ka with a new value.
            let mut child = parent_m.new_batch();
            child.write(ka, Some(val(200)));
            let child_m = child.merkleize(None).await.unwrap();
            assert_eq!(child_m.get(&ka).await.unwrap(), Some(val(200)));

            // After applying the chain, the recreated value wins.
            let finalized = child_m.finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get(&ka).await.unwrap(), Some(val(200)));

            db.destroy().await.unwrap();
        });
    }
1346
1347 #[test_traced("INFO")]
1350 fn test_any_batch_floor_raise() {
1351 let executor = deterministic::Runner::default();
1352 executor.start(|context| async move {
1353 let ctx = context.with_label("db");
1354 let mut db: UnorderedVariable =
1355 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("fr", &ctx))
1356 .await
1357 .unwrap();
1358
1359 let init: Vec<_> = (0..100).map(|i| (key(i), Some(val(i)))).collect();
1361 commit_writes(&mut db, init, None).await;
1362
1363 let floor_before = db.inactivity_floor_loc();
1364
1365 let updates: Vec<_> = (0..30).map(|i| (key(i), Some(val(i + 500)))).collect();
1367 commit_writes(&mut db, updates, None).await;
1368
1369 assert!(db.inactivity_floor_loc() > floor_before);
1371
1372 for i in 0..30 {
1374 assert_eq!(
1375 db.get(&key(i)).await.unwrap(),
1376 Some(val(i + 500)),
1377 "updated key {i} mismatch"
1378 );
1379 }
1380 for i in 30..100 {
1381 assert_eq!(
1382 db.get(&key(i)).await.unwrap(),
1383 Some(val(i)),
1384 "untouched key {i} mismatch"
1385 );
1386 }
1387
1388 db.destroy().await.unwrap();
1389 });
1390 }
1391
    /// `apply_batch` returns the contiguous range of log locations the
    /// batch's operations occupy, and consecutive commits produce adjacent
    /// ranges.
    #[test_traced("INFO")]
    fn test_any_batch_apply_returns_range() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ar", &ctx))
                    .await
                    .unwrap();

            let writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
            let range1 = commit_writes(&mut db, writes, None).await;

            // First applied operation lands at location 1 — location 0 is
            // presumably occupied by an initial operation (TODO confirm).
            assert_eq!(range1.start, Location::new(1));
            // At least 6 operations: 5 key updates plus a commit.
            assert!(range1.end.saturating_sub(*range1.start) >= 6);

            // The next commit's range begins exactly where the previous ended.
            let writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
            let range2 = commit_writes(&mut db, writes, None).await;
            assert_eq!(range2.start, range1.end);

            db.destroy().await.unwrap();
        });
    }
1420
    /// A three-deep batch chain (parent -> child -> grandchild) must expose
    /// the merged view of all layers, and applying the deepest layer must
    /// commit every layer's writes at once.
    #[test_traced("INFO")]
    fn test_any_batch_deep_chain() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dc", &ctx))
                    .await
                    .unwrap();

            // Committed baseline: keys 0..5.
            let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
            commit_writes(&mut db, init, None).await;

            // Parent: overwrite key 0, create key 5.
            let mut parent = db.new_batch();
            parent.write(key(0), Some(val(100)));
            parent.write(key(5), Some(val(5)));
            let parent_m = parent.merkleize(None).await.unwrap();

            // Child: overwrite key 1, create key 6.
            let mut child = parent_m.new_batch();
            child.write(key(1), Some(val(101)));
            child.write(key(6), Some(val(6)));
            let child_m = child.merkleize(None).await.unwrap();

            // Grandchild: delete key 2, create key 7.
            let mut grandchild = child_m.new_batch();
            grandchild.write(key(2), None);
            grandchild.write(key(7), Some(val(7)));
            let grandchild_m = grandchild.merkleize(None).await.unwrap();

            // The deepest layer sees the merged view of the whole chain.
            assert_eq!(grandchild_m.get(&key(0)).await.unwrap(), Some(val(100)));
            assert_eq!(grandchild_m.get(&key(1)).await.unwrap(), Some(val(101)));
            assert_eq!(grandchild_m.get(&key(2)).await.unwrap(), None);
            assert_eq!(grandchild_m.get(&key(7)).await.unwrap(), Some(val(7)));

            // Applying the grandchild commits all three layers.
            let finalized = grandchild_m.finalize();
            db.apply_batch(finalized).await.unwrap();

            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(101)));
            assert_eq!(db.get(&key(2)).await.unwrap(), None);
            assert_eq!(db.get(&key(3)).await.unwrap(), Some(val(3)));
            assert_eq!(db.get(&key(4)).await.unwrap(), Some(val(4)));
            assert_eq!(db.get(&key(5)).await.unwrap(), Some(val(5)));
            assert_eq!(db.get(&key(6)).await.unwrap(), Some(val(6)));
            assert_eq!(db.get(&key(7)).await.unwrap(), Some(val(7)));

            db.destroy().await.unwrap();
        });
    }
1476
    /// Applying the same writes as two sequential commits (db_a) versus a
    /// single parent/child batch chain (db_b) must converge to identical
    /// roots and identical key values.
    #[test_traced("INFO")]
    fn test_any_batch_chain_matches_sequential() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");

            // db_a: applies writes1 then writes2 as two separate commits.
            let ctx_a = ctx.with_label("a");
            let mut db_a: UnorderedVariable = UnorderedVariableDb::init(
                ctx_a.clone(),
                variable_db_config::<OneCap>("cms-a", &ctx_a),
            )
            .await
            .unwrap();

            // db_b: applies the same writes as one chained batch.
            let ctx_b = ctx.with_label("b");
            let mut db_b: UnorderedVariable = UnorderedVariableDb::init(
                ctx_b.clone(),
                variable_db_config::<OneCap>("cms-b", &ctx_b),
            )
            .await
            .unwrap();

            let writes1: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();

            // writes2 mixes an overwrite, a delete, and a create.
            let writes2 = vec![
                (key(0), Some(val(100))),
                (key(1), None),
                (key(5), Some(val(5))),
            ];

            commit_writes(&mut db_a, writes1.clone(), None).await;
            commit_writes(&mut db_a, writes2.clone(), None).await;

            // db_b path: writes1 in a parent batch, writes2 in a child.
            let mut parent = db_b.new_batch();
            for (k, v) in &writes1 {
                parent.write(*k, *v);
            }
            let parent_m = parent.merkleize(None).await.unwrap();

            let mut child = parent_m.new_batch();
            for (k, v) in &writes2 {
                child.write(*k, *v);
            }
            let child_m = child.merkleize(None).await.unwrap();
            let finalized = child_m.finalize();
            db_b.apply_batch(finalized).await.unwrap();

            // Both paths must yield the same root and per-key values.
            assert_eq!(db_a.root(), db_b.root());
            for i in 0..6 {
                assert_eq!(
                    db_a.get(&key(i)).await.unwrap(),
                    db_b.get(&key(i)).await.unwrap(),
                    "key {i} mismatch"
                );
            }

            db_a.destroy().await.unwrap();
            db_b.destroy().await.unwrap();
        });
    }
1545
1546 #[test_traced("INFO")]
1548 fn test_any_batch_create_then_delete_same_batch() {
1549 let executor = deterministic::Runner::default();
1550 executor.start(|context| async move {
1551 let ctx = context.with_label("db");
1552 let mut db: UnorderedVariable =
1553 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("cd", &ctx))
1554 .await
1555 .unwrap();
1556
1557 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1559
1560 let mut batch = db.new_batch();
1562 batch.write(key(1), Some(val(1))); batch.write(key(1), None); batch.write(key(2), Some(val(2))); batch.write(key(0), None); let finalized = batch.merkleize(None).await.unwrap().finalize();
1567 db.apply_batch(finalized).await.unwrap();
1568
1569 assert_eq!(db.get(&key(0)).await.unwrap(), None);
1570 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1571 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
1572
1573 db.destroy().await.unwrap();
1574 });
1575 }
1576
1577 #[test_traced("INFO")]
1579 fn test_any_batch_delete_all_keys() {
1580 let executor = deterministic::Runner::default();
1581 executor.start(|context| async move {
1582 let ctx = context.with_label("db");
1583 let mut db: UnorderedVariable =
1584 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("da", &ctx))
1585 .await
1586 .unwrap();
1587
1588 let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1590 commit_writes(&mut db, init, None).await;
1591
1592 let deletes: Vec<_> = (0..5).map(|i| (key(i), None)).collect();
1594 commit_writes(&mut db, deletes, None).await;
1595
1596 for i in 0..5 {
1597 assert_eq!(db.get(&key(i)).await.unwrap(), None, "key {i} not deleted");
1598 }
1599
1600 commit_writes(&mut db, [(key(10), Some(val(10)))], None).await;
1602 assert_eq!(db.get(&key(10)).await.unwrap(), Some(val(10)));
1603
1604 db.destroy().await.unwrap();
1605 });
1606 }
1607
    /// Two forks created from the same committed state are independent: each
    /// merkleizes to a different root, neither mutates the db while
    /// outstanding, and applying one commits only that fork's writes.
    #[test_traced("INFO")]
    fn test_any_batch_parallel_forks() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("pf", &ctx))
                    .await
                    .unwrap();

            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
            let root_before = db.root();

            // Fork A: overwrite key 0, create key 1.
            let mut fork_a = db.new_batch();
            fork_a.write(key(0), Some(val(100)));
            fork_a.write(key(1), Some(val(1)));
            let fork_a_m = fork_a.merkleize(None).await.unwrap();

            // Fork B: delete key 0, create key 2.
            let mut fork_b = db.new_batch();
            fork_b.write(key(0), None);
            fork_b.write(key(2), Some(val(2)));
            let fork_b_m = fork_b.merkleize(None).await.unwrap();

            // Divergent writes yield divergent roots.
            assert_ne!(fork_a_m.root(), fork_b_m.root());

            // The underlying db is untouched while both forks are live.
            assert_eq!(db.root(), root_before);
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
            assert_eq!(db.get(&key(1)).await.unwrap(), None);

            // Applying fork A commits only fork A's writes.
            let finalized = fork_a_m.finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
            assert_eq!(db.get(&key(2)).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1653
    /// A chained (parent + child) batch that overwrites many existing keys
    /// must still raise the inactivity floor when applied, just like a flat
    /// commit would.
    #[test_traced("INFO")]
    fn test_any_batch_floor_raise_chained() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("frc", &ctx))
                    .await
                    .unwrap();

            // Baseline: 50 keys committed.
            let init: Vec<_> = (0..50).map(|i| (key(i), Some(val(i)))).collect();
            commit_writes(&mut db, init, None).await;
            let floor_before = db.inactivity_floor_loc();

            // Parent overwrites keys 0..20.
            let mut parent = db.new_batch();
            for i in 0..20 {
                parent.write(key(i), Some(val(i + 500)));
            }
            let parent_m = parent.merkleize(None).await.unwrap();

            // Child overwrites keys 20..30 on top of the parent.
            let mut child = parent_m.new_batch();
            for i in 20..30 {
                child.write(key(i), Some(val(i + 500)));
            }
            let child_m = child.merkleize(None).await.unwrap();

            let finalized = child_m.finalize();
            db.apply_batch(finalized).await.unwrap();

            // The chained apply advanced the inactivity floor.
            assert!(db.inactivity_floor_loc() > floor_before);

            // Overwritten keys carry their new values...
            for i in 0..30 {
                assert_eq!(
                    db.get(&key(i)).await.unwrap(),
                    Some(val(i + 500)),
                    "updated key {i} mismatch"
                );
            }
            // ...and untouched keys keep their originals.
            for i in 30..50 {
                assert_eq!(
                    db.get(&key(i)).await.unwrap(),
                    Some(val(i)),
                    "untouched key {i} mismatch"
                );
            }

            db.destroy().await.unwrap();
        });
    }
1709
1710 #[test_traced("INFO")]
1712 fn test_any_batch_abandoned() {
1713 let executor = deterministic::Runner::default();
1714 executor.start(|context| async move {
1715 let ctx = context.with_label("db");
1716 let mut db: UnorderedVariable =
1717 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ab", &ctx))
1718 .await
1719 .unwrap();
1720
1721 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1722 let root_before = db.root();
1723
1724 {
1726 let mut batch = db.new_batch();
1727 batch.write(key(0), Some(val(999)));
1728 batch.write(key(1), Some(val(1)));
1729 let _merkleized = batch.merkleize(None).await.unwrap();
1730 }
1732
1733 assert_eq!(db.root(), root_before);
1735 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1736 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1737
1738 db.destroy().await.unwrap();
1739 });
1740 }
1741}