1use crate::{
15 index::{unordered::Index, Unordered as _},
16 journal::{
17 authenticated,
18 contiguous::{
19 variable::{self, Config as JournalConfig},
20 Contiguous as _, Reader,
21 },
22 },
23 kv,
24 mmr::{
25 journaled::{Config as MmrConfig, Mmr},
26 Location, Proof,
27 },
28 qmdb::{any::VariableValue, build_snapshot_from_log, Error},
29 translator::Translator,
30};
31use commonware_codec::Read;
32use commonware_cryptography::Hasher as CHasher;
33use commonware_parallel::ThreadPool;
34use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
35use commonware_utils::Array;
36use std::{
37 collections::BTreeMap,
38 num::{NonZeroU64, NonZeroUsize},
39 ops::Range,
40 sync::Arc,
41};
42use tracing::warn;
43
/// Batch types for staging operations before applying them to the db.
pub mod batch;
mod operation;
pub use operation::Operation;

/// The authenticated log backing [Immutable]: a variable-length contiguous
/// journal of [Operation]s, merkleized via an MMR with hasher `H`.
type Journal<E, K, V, H> = authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H>;

// NOTE(review): presumably state-sync support for this db — see the module
// itself for details.
pub mod sync;
51
/// Configuration for initializing an [Immutable] db.
///
/// `T` translates full keys into their (possibly colliding) index
/// representation; `C` is the codec configuration for log operations.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Storage partition holding the MMR's journal.
    pub mmr_journal_partition: String,

    /// Maximum number of MMR items stored per blob.
    pub mmr_items_per_blob: NonZeroU64,

    /// Size of the MMR journal's write buffer.
    pub mmr_write_buffer: NonZeroUsize,

    /// Storage partition holding the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// Storage partition holding the operation log.
    pub log_partition: String,

    /// Size of the operation log's write buffer.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression setting for log data; `None` disables
    /// compression. (The meaning of the level value is defined by the
    /// underlying journal — confirm there.)
    pub log_compression: Option<u8>,

    /// Codec configuration used to encode/decode log operations.
    pub log_codec_config: C,

    /// Number of operations stored per section of the log.
    pub log_items_per_section: NonZeroU64,

    /// Translator mapping keys to their snapshot-index representation.
    pub translator: T,

    /// Optional thread pool, passed through to the MMR configuration.
    pub thread_pool: Option<ThreadPool>,

    /// Page cache shared by the MMR journal and operation log.
    pub page_cache: CacheRef,
}
91
/// An authenticated key-value database built on an [authenticated::Journal]
/// of [Operation]s, where each key is set at most once (hence "immutable"):
/// values are never updated or deleted, only pruned wholesale.
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
> {
    // The authenticated log of all operations applied to the db.
    journal: Journal<E, K, V, H>,

    // Maps translated keys to candidate operation locations. Translation
    // may collide, so `get` must confirm the full key against the log
    // before returning a value.
    snapshot: Index<T, Location>,

    // Location of the most recent commit operation in the log.
    last_commit_loc: Location,
}
114
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T>
{
    /// Returns the total number of operations in the log (one past the
    /// location of the most recent operation).
    pub async fn size(&self) -> Location {
        self.bounds().await.end
    }

    /// Returns the range of operation locations currently retained:
    /// locations before `start` have been pruned, and `end` is one past
    /// the most recent operation.
    pub async fn bounds(&self) -> std::ops::Range<Location> {
        let bounds = self.journal.reader().await.bounds();
        Location::new(bounds.start)..Location::new(bounds.end)
    }

    /// Returns the value of `key`, or `None` if the key was never set or
    /// its set operation has been pruned.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        // Candidate locations from the snapshot. The index is keyed by
        // *translated* keys, so distinct keys may share an entry and each
        // candidate must be confirmed against the log.
        let iter = self.snapshot.get(key);
        let reader = self.journal.reader().await;
        let oldest = reader.bounds().start;
        for &loc in iter {
            // Skip stale index entries pointing into the pruned region.
            if loc < oldest {
                continue;
            }
            if let Some(v) = Self::get_from_loc(&reader, key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Reads the operation at `loc` and returns its value if it is a
    /// `Set` of `key`, or `Ok(None)` if it sets a different
    /// (translator-colliding) key.
    ///
    /// # Errors
    ///
    /// Returns [Error::OperationPruned] if `loc` precedes the oldest
    /// retained location, and [Error::UnexpectedData] if the operation at
    /// `loc` is not a `Set`.
    async fn get_from_loc(
        reader: &impl Reader<Item = Operation<K, V>>,
        key: &K,
        loc: Location,
    ) -> Result<Option<V>, Error> {
        if loc < reader.bounds().start {
            return Err(Error::OperationPruned(loc));
        }

        let Operation::Set(k, v) = reader.read(*loc).await? else {
            return Err(Error::UnexpectedData(loc));
        };

        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Returns the metadata recorded by the most recent commit, if any.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let last_commit_loc = self.last_commit_loc;
        // NOTE(review): reads through the inner journal (`journal.journal`)
        // rather than the authenticated wrapper's reader used elsewhere —
        // presumably equivalent for an already-committed location; confirm.
        let Operation::Commit(metadata) = self
            .journal
            .journal
            .reader()
            .await
            .read(*last_commit_loc)
            .await?
        else {
            // `last_commit_loc` is only ever set to a commit's location.
            unreachable!("no commit operation at location of last commit {last_commit_loc}");
        };

        Ok(metadata)
    }

    /// Generates a proof of up to `max_ops` operations starting at
    /// `start_loc`, relative to the db's root when it contained
    /// `op_count` operations.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Prunes operations strictly before `loc`. The underlying log may
    /// retain slightly more than requested (pruning appears to be
    /// section-aligned; see the pruning tests).
    ///
    /// # Errors
    ///
    /// Returns [Error::PruneBeyondMinRequired] if `loc` is beyond the
    /// last commit.
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Returns the root digest of the db.
    pub fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Generates a proof of up to `max_ops` operations starting at
    /// `start_index`, relative to the current root.
    pub async fn proof(
        &self,
        start_index: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        let op_count = self.bounds().await.end;
        self.historical_proof(op_count, start_index, max_ops).await
    }

    /// Initializes the db from storage, replaying the retained log to
    /// rebuild the in-memory snapshot. A brand-new (empty) log is seeded
    /// with a durable `Commit(None)` so `last_commit_loc` always exists.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            page_cache: cfg.page_cache.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            page_cache: cfg.page_cache.clone(),
            write_buffer: cfg.log_write_buffer,
        };

        let mut journal = Journal::new(
            context.clone(),
            mmr_cfg,
            journal_cfg,
            Operation::<K, V>::is_commit,
        )
        .await?;

        // Seed an empty log with an initial commit and make it durable.
        if journal.size().await == 0 {
            warn!("Authenticated log is empty, initialized new db.");
            journal.append(&Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());

        let last_commit_loc = {
            let reader = journal.reader().await;
            let start_loc = Location::new(reader.bounds().start);

            // Rebuild the key -> location index from the retained log.
            build_snapshot_from_log(start_loc, &reader, &mut snapshot, |_, _| {}).await?;

            // The last retained operation is taken to be a commit —
            // presumably `Journal::new` (given `is_commit` above) rewinds
            // any uncommitted suffix during recovery; confirm.
            Location::new(
                reader
                    .bounds()
                    .end
                    .checked_sub(1)
                    .expect("commit should exist"),
            )
        };

        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
        })
    }

    /// Syncs all db state to disk.
    pub async fn sync(&mut self) -> Result<(), Error> {
        Ok(self.journal.sync().await?)
    }

    /// Destroys the db, removing all of its data from storage.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Starts a new (empty) batch of operations on top of the db's last
    /// committed state.
    #[allow(clippy::type_complexity)]
    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<'_, E, K, V, H, T, Mmr<E, H::Digest>> {
        // Size as of the last commit; any uncommitted journal state is
        // not visible to batches.
        let journal_size = *self.last_commit_loc + 1;
        batch::UnmerkleizedBatch {
            immutable: self,
            journal_batch: self.journal.new_batch(),
            mutations: BTreeMap::new(),
            base_diff: Arc::new(BTreeMap::new()),
            base_operations: Vec::new(),
            base_size: journal_size,
            db_size: journal_size,
        }
    }

    /// Applies a finalized changeset to the db, committing its operations
    /// and folding its key updates into the snapshot. Returns the range
    /// of locations the changeset occupies in the log.
    ///
    /// # Errors
    ///
    /// Returns [Error::StaleChangeset] if the changeset was built against
    /// a db size other than the current one.
    pub async fn apply_batch(
        &mut self,
        batch: batch::Changeset<K, H::Digest, V>,
    ) -> Result<Range<Location>, Error> {
        let journal_size = *self.last_commit_loc + 1;
        if batch.db_size != journal_size {
            return Err(Error::StaleChangeset {
                expected: batch.db_size,
                actual: journal_size,
            });
        }
        let start_loc = Location::new(journal_size);

        self.journal.apply_batch(batch.journal_finalized).await?;

        self.journal.commit().await?;

        // Fold the batch's key-location updates into the snapshot,
        // evicting any entries that now point into the pruned region.
        let bounds = self.journal.reader().await.bounds();
        for diff in batch.snapshot_diffs {
            match diff {
                batch::SnapshotDiff::Insert { key, new_loc } => {
                    self.snapshot
                        .insert_and_prune(&key, new_loc, |v| *v < bounds.start);
                }
            }
        }

        // The changeset always ends with a commit, at total_size - 1.
        self.last_commit_loc = Location::new(batch.total_size - 1);

        let end_loc = Location::new(batch.total_size);
        Ok(start_loc..end_loc)
    }
}
382
/// Read-only key-value access for [Immutable].
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    kv::Gettable for Immutable<E, K, V, H, T>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        // Delegates to the inherent `get` (inherent methods take
        // precedence over trait methods, so this does not recurse).
        self.get(key).await
    }
}
394
/// Exposes the db's operation log (bounds and commit metadata).
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::qmdb::store::LogStore for Immutable<E, K, V, H, T>
{
    type Value = V;

    // Delegates to the inherent method of the same name.
    async fn bounds(&self) -> std::ops::Range<Location> {
        self.bounds().await
    }

    // Delegates to the inherent method of the same name.
    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }
}
408
/// Exposes the db's authenticated (merkleized) view: root digest and
/// historical proofs over its operations.
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::qmdb::store::MerkleizedStore for Immutable<E, K, V, H, T>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;

    // Delegates to the inherent method of the same name.
    fn root(&self) -> Self::Digest {
        self.root()
    }

    // Delegates to the inherent method of the same name.
    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.historical_proof(historical_size, start_loc, max_ops)
            .await
    }
}
429
430#[cfg(test)]
431pub(super) mod test {
432 use super::*;
433 use crate::{mmr::StandardHasher, qmdb::verify_proof, translator::TwoCap};
434 use commonware_cryptography::{sha256, sha256::Digest, Sha256};
435 use commonware_macros::test_traced;
436 use commonware_runtime::{deterministic, BufferPooler, Runner as _};
437 use commonware_utils::{NZUsize, NZU16, NZU64};
438 use std::num::NonZeroU16;
439
    // Small, odd-ball sizes — presumably chosen so tests exercise page,
    // blob, and section boundaries; the exact values are otherwise arbitrary.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
    const ITEMS_PER_SECTION: u64 = 5;
443
444 pub(crate) fn db_config(
445 suffix: &str,
446 pooler: &impl BufferPooler,
447 ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
448 Config {
449 mmr_journal_partition: format!("journal-{suffix}"),
450 mmr_metadata_partition: format!("metadata-{suffix}"),
451 mmr_items_per_blob: NZU64!(11),
452 mmr_write_buffer: NZUsize!(1024),
453 log_partition: format!("log-{suffix}"),
454 log_items_per_section: NZU64!(ITEMS_PER_SECTION),
455 log_compression: None,
456 log_codec_config: ((0..=10000).into(), ()),
457 log_write_buffer: NZUsize!(1024),
458 translator: TwoCap,
459 thread_pool: None,
460 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
461 }
462 }
463
464 async fn open_db(
466 context: deterministic::Context,
467 ) -> Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap> {
468 let cfg = db_config("partition", &context);
469 Immutable::init(context, cfg).await.unwrap()
470 }
471
    // An empty db has exactly the initial commit; dropped (unfinalized)
    // batches leave no trace, while applied empty batches still commit.
    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("first")).await;
            // A fresh db contains exactly one operation: the seed commit.
            let bounds = db.bounds().await;
            assert_eq!(bounds.end, 1);
            assert_eq!(bounds.start, Location::new(0));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Stage an operation but drop the batch without finalizing:
            // it must not survive a restart.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root();
            {
                let mut batch = db.new_batch();
                batch.set(k1, v1);
            }
            drop(db);
            let mut db = open_db(context.with_label("second")).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.bounds().await.end, 1);

            // Applying an *empty* batch still appends a commit operation.
            let finalized = db.new_batch().merkleize(None).finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.bounds().await.end, 2);
            let root = db.root();
            drop(db);

            // The commit persists across restart.
            let db = open_db(context.with_label("third")).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
509
    // Basic set/commit flow: values become readable after apply_batch,
    // commit metadata is replaced per commit, and unfinalized work is lost.
    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("first")).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            // First batch: Set(k1) + Commit(metadata). Log is now
            // [seed commit, Set(k1), Commit] -> end == 3.
            let metadata = Some(vec![99, 100]);
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(k1, v1.clone());
                batch.merkleize(metadata.clone()).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.bounds().await.end, 3);
            assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());

            // Second batch: Set(k2) + Commit(None) -> end == 5, and the
            // latest commit's metadata (None) replaces the previous one.
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(k2, v2.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.bounds().await.end, 5);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            let root = db.root();

            // Stage k3 but never finalize: it must not survive restart.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            {
                let mut batch = db.new_batch();
                batch.set(k3, v3);
            }

            drop(db);
            let db = open_db(context.with_label("second")).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.bounds().await.end, 5);
            assert_eq!(db.root(), root);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
574
    // Builds a db of ELEMENTS keys, restarts it, checks every value, and
    // verifies a proof starting at every retained location.
    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = StandardHasher::<Sha256>::new();
            let mut db = open_db(context.with_label("first")).await;

            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![i as u8; 100];
                    batch.set(k, v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            // ELEMENTS sets + seed commit + the batch's commit.
            assert_eq!(db.bounds().await.end, ELEMENTS + 2);

            // State must be unchanged across restart.
            let root = db.root();
            drop(db);

            let db = open_db(context.with_label("second")).await;
            assert_eq!(root, db.root());
            assert_eq!(db.bounds().await.end, ELEMENTS + 2);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Every proof over a window of up to 5 ops must verify
            // against the root.
            let max_ops = NZU64!(5);
            for i in 0..*db.bounds().await.end {
                let (proof, log) = db.proof(Location::new(i), max_ops).await.unwrap();
                assert!(verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new(i),
                    &log,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }
626
    // A committed batch that is dropped before an explicit sync must still
    // be recovered on restart, and the recovered state must be stable.
    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const ELEMENTS: u64 = 1000;
            let mut db = open_db(context.with_label("first")).await;

            // First committed batch, explicitly synced to disk.
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![i as u8; 100];
                    batch.set(k, v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.bounds().await.end, ELEMENTS + 2);
            db.sync().await.unwrap();
            let halfway_root = db.root();

            // Second committed batch, dropped with no explicit sync.
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![i as u8; 100];
                    batch.set(k, v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            drop(db);
            let db = open_db(context.with_label("second")).await;
            // 1 seed commit + 2 * (1000 sets + 1 commit) == 2003.
            assert_eq!(db.bounds().await.end, 2003);
            let root = db.root();
            assert_ne!(root, halfway_root);

            // Recovered state remains stable across another restart.
            drop(db);
            let db = open_db(context.with_label("third")).await;
            assert_eq!(db.bounds().await.end, 2003);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
678
    // A single committed batch dropped without an explicit sync must be
    // fully recovered (root and size) on restart.
    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("first")).await;

            // Commit one key, then drop without syncing.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![1, 2, 3];
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(k1, v1);
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let first_commit_root = db.root();

            drop(db);

            // [seed commit, Set(k1), Commit] -> end == 3, same root.
            let db = open_db(context.with_label("second")).await;
            assert_eq!(db.bounds().await.end, 3);
            let root = db.root();
            assert_eq!(root, first_commit_root);

            db.destroy().await.unwrap();
        });
    }
709
    // Pruning: retained bounds are section-aligned, pruned keys become
    // unreadable, pruned state survives restarts, and proofs over pruned
    // locations fail with ItemPruned.
    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("first")).await;

            // Batched mutations are held in a BTreeMap, so operations are
            // appended in sorted-key order: sorted index j lands at
            // location j + 1 (location 0 is the seed commit) —
            // NOTE(review): relies on that ordering; confirm in batch.rs.
            let mut sorted_keys: Vec<sha256::Digest> = (1u64..ELEMENTS + 1)
                .map(|i| Sha256::hash(&i.to_be_bytes()))
                .collect();
            sorted_keys.sort();
            let finalized = {
                let mut batch = db.new_batch();
                for i in 1u64..ELEMENTS + 1 {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![i as u8; 100];
                    batch.set(k, v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.bounds().await.end, ELEMENTS + 2);

            // Prune the first half. end is unaffected by pruning.
            db.prune(Location::new((ELEMENTS + 2) / 2))
                .await
                .unwrap();
            let bounds = db.bounds().await;
            assert_eq!(bounds.end, ELEMENTS + 2);

            // Pruning is section-aligned, so the oldest retained location
            // rounds down from 1001 to 1000 (ITEMS_PER_SECTION == 5).
            let oldest_retained_loc = bounds.start;
            assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));

            // Key at location oldest-1 (sorted index oldest-2) is pruned.
            let pruned_key = sorted_keys[*oldest_retained_loc as usize - 2];
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Key at exactly the oldest retained location is readable.
            let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Pruned state must survive restart.
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);

            let mut db = open_db(context.with_label("second")).await;
            assert_eq!(root, db.root());
            let bounds = db.bounds().await;
            assert_eq!(bounds.end, ELEMENTS + 2);
            let oldest_retained_loc = bounds.start;
            assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));

            // Prune to mid-section: only whole sections are dropped.
            let loc = Location::new(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
            db.prune(loc).await.unwrap();
            let oldest_retained_loc = db.bounds().await.start;
            assert_eq!(
                oldest_retained_loc,
                Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.with_label("third")).await;
            let oldest_retained_loc = db.bounds().await.start;
            assert_eq!(
                oldest_retained_loc,
                Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // A key below the new boundary is gone...
            let pruned_key = sorted_keys[*oldest_retained_loc as usize - 4];
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // ...while the key at the boundary remains readable.
            let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Proving from a pruned location fails with ItemPruned.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db
                .proof(
                    Location::new(pruned_pos),
                    NZU64!(pruned_pos + 100),
                )
                .await;
            assert!(matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos));

            db.destroy().await.unwrap();
        });
    }
813
    // Pruning past the last commit location must be rejected with
    // PruneBeyondMinRequired, both on a fresh db and after commits.
    #[test_traced("INFO")]
    pub fn test_immutable_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("test")).await;

            // Fresh db: last commit is at location 0, so pruning to 1 fails.
            let result = db.prune(Location::new(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new(1) && commit_loc == Location::new(0))
            );

            let k1 = Digest::from(*b"12345678901234567890123456789012");
            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
            let k3 = Digest::from(*b"99999999999999999999999999999999");
            let v1 = vec![1u8; 16];
            let v2 = vec![2u8; 16];
            let v3 = vec![3u8; 16];

            let finalized = {
                let mut batch = db.new_batch();
                batch.set(k1, v1.clone());
                batch.set(k2, v2.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // [seed commit, Set, Set, Commit] -> commit at location 3.
            assert_eq!(*db.last_commit_loc, 3);

            let finalized = {
                let mut batch = db.new_batch();
                batch.set(k3, v3.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Pruning up to a past commit is allowed...
            assert!(db.prune(Location::new(3)).await.is_ok());

            // ...but one past the latest commit is still rejected.
            let new_last_commit = db.last_commit_loc;
            let beyond = new_last_commit + 1;
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }
868
869 use crate::{
870 kv::tests::{assert_gettable, assert_send},
871 qmdb::store::tests::{assert_log_store, assert_merkleized_store},
872 };
873
    // Concrete db type used by the `Send`-ness assertions below.
    type TestDb = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    // Compile-time check that the db's public futures are `Send`; never
    // called, only type-checked.
    #[allow(dead_code)]
    fn assert_db_futures_are_send(db: &mut TestDb, key: Digest, loc: Location) {
        assert_gettable(db, &key);
        assert_log_store(db);
        assert_merkleized_store(db, loc);
        assert_send(db.sync());
    }

    // NOTE(review): identical to `TestDb` above — the two aliases could be
    // consolidated.
    type Db = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
885
    // Batch `get` reads through to the db for untouched keys, sees its own
    // staged writes, and returns None for keys set nowhere.
    #[test_traced("INFO")]
    fn test_immutable_batch_get_read_through() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = db_config("readthrough", &context);
            let mut db: Db = Immutable::init(context.with_label("db"), cfg)
                .await
                .unwrap();

            // Commit one key directly to the db.
            let key_a = Sha256::hash(&0u64.to_be_bytes());
            let val_a = vec![1u8; 8];
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(key_a, val_a.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Untouched key: read-through to the underlying db.
            let mut batch = db.new_batch();
            assert_eq!(batch.get(&key_a).await.unwrap(), Some(val_a));

            // Staged key: visible within the same batch.
            let key_b = Sha256::hash(&1u64.to_be_bytes());
            let val_b = vec![2u8; 8];
            batch.set(key_b, val_b.clone());
            assert_eq!(batch.get(&key_b).await.unwrap(), Some(val_b));

            // Unknown key: None.
            let key_c = Sha256::hash(&2u64.to_be_bytes());
            assert_eq!(batch.get(&key_c).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
923
    // A batch stacked on a merkleized parent batch sees the parent's
    // writes, its own writes, and None for unknown keys.
    #[test_traced("INFO")]
    fn test_immutable_batch_stacked_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = db_config("stacked-get", &context);
            let db: Db = Immutable::init(context.with_label("db"), cfg)
                .await
                .unwrap();

            // Parent batch stages key_a, then is merkleized (not applied).
            let key_a = Sha256::hash(&0u64.to_be_bytes());
            let val_a = vec![10u8; 8];
            let mut parent = db.new_batch();
            parent.set(key_a, val_a.clone());
            let parent_m = parent.merkleize(None);

            // Child sees the parent's staged write.
            let mut child = parent_m.new_batch();
            assert_eq!(child.get(&key_a).await.unwrap(), Some(val_a));

            // Child sees its own staged write.
            let key_b = Sha256::hash(&1u64.to_be_bytes());
            let val_b = vec![20u8; 8];
            child.set(key_b, val_b.clone());
            assert_eq!(child.get(&key_b).await.unwrap(), Some(val_b));

            // Unknown key resolves to None.
            let key_c = Sha256::hash(&2u64.to_be_bytes());
            assert_eq!(child.get(&key_c).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
958
    // Finalizing a child batch stacked on a merkleized parent applies both
    // layers' writes in one changeset, and the speculative root matches.
    #[test_traced("INFO")]
    fn test_immutable_batch_stacked_finalize_apply() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = db_config("stacked-apply", &context);
            let mut db: Db = Immutable::init(context.with_label("db"), cfg)
                .await
                .unwrap();

            // Two disjoint key sets, sorted to make iteration order
            // deterministic.
            let mut kvs_first: Vec<(Digest, Vec<u8>)> = (0u64..5)
                .map(|i| (Sha256::hash(&i.to_be_bytes()), vec![i as u8; 8]))
                .collect();
            kvs_first.sort_by(|a, b| a.0.cmp(&b.0));

            let mut kvs_second: Vec<(Digest, Vec<u8>)> = (5u64..10)
                .map(|i| (Sha256::hash(&i.to_be_bytes()), vec![i as u8; 8]))
                .collect();
            kvs_second.sort_by(|a, b| a.0.cmp(&b.0));

            // Parent stages the first set, then is merkleized.
            let mut parent = db.new_batch();
            for (k, v) in &kvs_first {
                parent.set(*k, v.clone());
            }
            let parent_m = parent.merkleize(None);

            // Child stages the second set on top, then is finalized.
            let mut child = parent_m.new_batch();
            for (k, v) in &kvs_second {
                child.set(*k, v.clone());
            }
            let child_m = child.merkleize(None);
            let expected_root = child_m.root();
            let finalized = child_m.finalize();
            db.apply_batch(finalized).await.unwrap();

            // The speculative root computed before apply must match.
            assert_eq!(db.root(), expected_root);

            // All writes from both layers are readable.
            for (k, v) in kvs_first.iter().chain(kvs_second.iter()) {
                assert_eq!(db.get(k).await.unwrap(), Some(v.clone()));
            }

            db.destroy().await.unwrap();
        });
    }
1007
    // The root reported by a merkleized batch before apply must equal the
    // db's root after apply — with and without commit metadata.
    #[test_traced("INFO")]
    fn test_immutable_batch_speculative_root() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            let merkleized = {
                let mut batch = db.new_batch();
                for i in 0u8..10 {
                    let k = Sha256::hash(&[i]);
                    batch.set(k, vec![i; 16]);
                }
                batch.merkleize(None)
            };

            // Speculative root (pre-apply) matches actual root (post-apply).
            let speculative = merkleized.root();
            let finalized = merkleized.finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.root(), speculative);

            // Same check with commit metadata present.
            let metadata = vec![55u8; 8];
            let merkleized = {
                let mut batch = db.new_batch();
                let k = Sha256::hash(&[0xAA]);
                batch.set(k, vec![0xAA; 20]);
                batch.merkleize(Some(metadata))
            };
            let speculative = merkleized.root();
            let finalized = merkleized.finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.root(), speculative);

            db.destroy().await.unwrap();
        });
    }
1045
    // A merkleized (but unapplied) batch supports `get`: committed keys,
    // its own staged keys, and None for unknown keys.
    #[test_traced("INFO")]
    fn test_immutable_merkleized_batch_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            // Commit key_a to the underlying db.
            let key_a = Sha256::hash(&0u64.to_be_bytes());
            let val_a = vec![10u8; 12];
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(key_a, val_a.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Stage key_b and merkleize without applying.
            let key_b = Sha256::hash(&1u64.to_be_bytes());
            let val_b = vec![20u8; 16];
            let mut batch = db.new_batch();
            batch.set(key_b, val_b.clone());
            let merkleized = batch.merkleize(None);

            // Committed key: read-through.
            assert_eq!(merkleized.get(&key_a).await.unwrap(), Some(val_a));

            // Staged key: visible.
            assert_eq!(merkleized.get(&key_b).await.unwrap(), Some(val_b));

            // Unknown key: None.
            let key_c = Sha256::hash(&2u64.to_be_bytes());
            assert_eq!(merkleized.get(&key_c).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1083
    // Two batches applied back-to-back: each apply must land the batch's
    // speculative root and make its key readable.
    #[test_traced("INFO")]
    fn test_immutable_batch_sequential_apply() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            let key_a = Sha256::hash(&0u64.to_be_bytes());
            let val_a = vec![1u8; 8];

            // First batch.
            let mut batch = db.new_batch();
            batch.set(key_a, val_a.clone());
            let m = batch.merkleize(None);
            let root1 = m.root();
            db.apply_batch(m.finalize()).await.unwrap();
            assert_eq!(db.root(), root1);
            assert_eq!(db.get(&key_a).await.unwrap(), Some(val_a));

            // Second batch, created after the first was applied.
            let key_b = Sha256::hash(&1u64.to_be_bytes());
            let val_b = vec![2u8; 16];
            let mut batch = db.new_batch();
            batch.set(key_b, val_b.clone());
            let m = batch.merkleize(None);
            let root2 = m.root();
            db.apply_batch(m.finalize()).await.unwrap();
            assert_eq!(db.root(), root2);
            assert_eq!(db.get(&key_b).await.unwrap(), Some(val_b));

            db.destroy().await.unwrap();
        });
    }
1117
    // Many small batches applied sequentially: every key stays readable,
    // a proof over the full log verifies, and the final size is exact.
    #[test_traced("INFO")]
    fn test_immutable_batch_many_sequential() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;
            let mut hasher = StandardHasher::<Sha256>::new();

            const BATCHES: u64 = 20;
            const KEYS_PER_BATCH: u64 = 5;

            let mut all_kvs: Vec<(Digest, Vec<u8>)> = Vec::new();

            for batch_idx in 0..BATCHES {
                let finalized = {
                    let mut batch = db.new_batch();
                    for j in 0..KEYS_PER_BATCH {
                        // Seed keeps keys unique across batches.
                        let seed = batch_idx * 100 + j;
                        let k = Sha256::hash(&seed.to_be_bytes());
                        let v = vec![seed as u8; 8];
                        batch.set(k, v.clone());
                        all_kvs.push((k, v));
                    }
                    batch.merkleize(None).finalize()
                };
                db.apply_batch(finalized).await.unwrap();
            }

            // Every key from every batch remains readable.
            for (k, v) in &all_kvs {
                assert_eq!(db.get(k).await.unwrap(), Some(v.clone()));
            }

            // A single proof over the entire log verifies.
            let root = db.root();
            let (proof, ops) = db.proof(Location::new(0), NZU64!(10000)).await.unwrap();
            assert!(verify_proof(
                &mut hasher,
                &proof,
                Location::new(0),
                &ops,
                &root
            ));

            // Seed commit + (sets + commit) per batch.
            let expected = 1 + BATCHES * (KEYS_PER_BATCH + 1);
            assert_eq!(db.bounds().await.end, expected);

            db.destroy().await.unwrap();
        });
    }
1169
    // Applying an empty batch still appends a commit: the root changes,
    // the speculative root matches, and the size grows by one.
    #[test_traced("INFO")]
    fn test_immutable_batch_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            // Seed the db with one committed key.
            let finalized = {
                let mut batch = db.new_batch();
                let k = Sha256::hash(&[1u8]);
                batch.set(k, vec![1u8; 8]);
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let root_before = db.root();
            let size_before = db.bounds().await.end;

            // Apply an empty batch.
            let merkleized = db.new_batch().merkleize(None);
            let speculative = merkleized.root();
            let finalized = merkleized.finalize();
            db.apply_batch(finalized).await.unwrap();

            // The appended commit changes the root (by exactly one op).
            assert_ne!(db.root(), root_before);
            assert_eq!(db.root(), speculative);
            assert_eq!(db.bounds().await.end, size_before + 1);

            db.destroy().await.unwrap();
        });
    }
1203
    // `get` on a merkleized child batch resolves through all three layers:
    // committed db state, the merkleized parent, and the child itself.
    #[test_traced("INFO")]
    fn test_immutable_batch_chained_merkleized_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            // key_a: committed to the db.
            let key_a = Sha256::hash(&0u64.to_be_bytes());
            let val_a = vec![10u8; 12];
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(key_a, val_a.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // key_b: staged in the merkleized parent.
            let key_b = Sha256::hash(&1u64.to_be_bytes());
            let val_b = vec![1u8; 8];
            let mut parent = db.new_batch();
            parent.set(key_b, val_b.clone());
            let parent_m = parent.merkleize(None);

            // key_c: staged in the merkleized child.
            let key_c = Sha256::hash(&2u64.to_be_bytes());
            let val_c = vec![2u8; 16];
            let mut child = parent_m.new_batch();
            child.set(key_c, val_c.clone());
            let child_m = child.merkleize(None);

            assert_eq!(child_m.get(&key_a).await.unwrap(), Some(val_a));
            assert_eq!(child_m.get(&key_b).await.unwrap(), Some(val_b));
            assert_eq!(child_m.get(&key_c).await.unwrap(), Some(val_c));
            // A key in no layer resolves to None.
            let key_d = Sha256::hash(&3u64.to_be_bytes());
            assert_eq!(child_m.get(&key_d).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1249
    // One large batch (N keys, varied value lengths): all keys readable,
    // a whole-log proof verifies, and the final size is exact.
    #[test_traced("INFO")]
    fn test_immutable_batch_large() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;
            let mut hasher = StandardHasher::<Sha256>::new();

            const N: u64 = 500;
            let mut kvs: Vec<(Digest, Vec<u8>)> = Vec::new();

            let finalized = {
                let mut batch = db.new_batch();
                for i in 0..N {
                    let k = Sha256::hash(&i.to_be_bytes());
                    // Value length varies (3..=31) to exercise the
                    // variable-length codec.
                    let v = vec![(i % 256) as u8; ((i % 29) + 3) as usize];
                    batch.set(k, v.clone());
                    kvs.push((k, v));
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Every key is readable with its exact value.
            for (k, v) in &kvs {
                assert_eq!(db.get(k).await.unwrap(), Some(v.clone()));
            }

            // A proof over the entire log verifies against the root.
            let root = db.root();
            let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
            assert!(verify_proof(
                &mut hasher,
                &proof,
                Location::new(0),
                &ops,
                &root
            ));

            // Seed commit + N sets + batch commit.
            assert_eq!(db.bounds().await.end, 1 + N + 1);

            db.destroy().await.unwrap();
        });
    }
1295
1296 #[test_traced("INFO")]
1298 fn test_immutable_batch_chained_key_override() {
1299 let executor = deterministic::Runner::default();
1300 executor.start(|context| async move {
1301 let mut db = open_db(context.with_label("db")).await;
1302
1303 let key = Sha256::hash(&0u64.to_be_bytes());
1304 let val_parent = vec![1u8; 8];
1305 let val_child = vec![2u8; 16];
1306
1307 let mut parent = db.new_batch();
1309 parent.set(key, val_parent.clone());
1310 let parent_m = parent.merkleize(None);
1311
1312 let mut child = parent_m.new_batch();
1314 child.set(key, val_child.clone());
1315
1316 assert_eq!(child.get(&key).await.unwrap(), Some(val_child.clone()));
1318
1319 let child_m = child.merkleize(None);
1320
1321 assert_eq!(child_m.get(&key).await.unwrap(), Some(val_child.clone()));
1323
1324 let finalized = child_m.finalize();
1326 db.apply_batch(finalized).await.unwrap();
1327 assert_eq!(db.get(&key).await.unwrap(), Some(val_child));
1328
1329 db.destroy().await.unwrap();
1330 });
1331 }
1332
    #[test_traced("INFO")]
    // Writes the same key in two separate commits and checks which value
    // lookups resolve to before and after pruning the first write.
    fn test_immutable_batch_sequential_key_override() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // One log item per section so each operation lands in its own
            // section, letting the prune below take effect at a precise
            // location.
            let cfg = Config {
                log_items_per_section: NZU64!(1),
                ..db_config("key-override", &context)
            };
            let mut db: Db = Immutable::init(context.with_label("db"), cfg)
                .await
                .unwrap();

            let key = Sha256::hash(&0u64.to_be_bytes());
            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 16];

            // First commit: key -> v1.
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(key, v1.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get(&key).await.unwrap(), Some(v1.clone()));

            // Second commit: key -> v2, in its own batch.
            let finalized = {
                let mut batch = db.new_batch();
                batch.set(key, v2.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // NOTE(review): even after the second commit, the lookup still
            // returns the first value — the immutable store appears to resolve
            // a key to its earliest surviving operation; confirm against the
            // snapshot/index semantics.
            assert_eq!(db.get(&key).await.unwrap(), Some(v1.clone()));

            // After pruning past the first write, the lookup resolves to the
            // second value instead.
            db.prune(Location::new(2)).await.unwrap();
            assert_eq!(db.get(&key).await.unwrap(), Some(v2.clone()));

            // Exercise persistence after pruning.
            db.sync().await.unwrap();

            db.destroy().await.unwrap();
        });
    }
1386
1387 #[test_traced("INFO")]
1389 fn test_immutable_batch_metadata() {
1390 let executor = deterministic::Runner::default();
1391 executor.start(|context| async move {
1392 let mut db = open_db(context.with_label("db")).await;
1393
1394 let metadata = vec![42u8; 32];
1396 let finalized = {
1397 let mut batch = db.new_batch();
1398 let k = Sha256::hash(&[1u8]);
1399 batch.set(k, vec![1u8; 8]);
1400 batch.merkleize(Some(metadata.clone())).finalize()
1401 };
1402 db.apply_batch(finalized).await.unwrap();
1403 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1404
1405 let finalized = db.new_batch().merkleize(None).finalize();
1407 db.apply_batch(finalized).await.unwrap();
1408 assert_eq!(db.get_metadata().await.unwrap(), None);
1409
1410 db.destroy().await.unwrap();
1411 });
1412 }
1413
1414 #[test_traced]
1415 fn test_stale_changeset_rejected() {
1416 let executor = deterministic::Runner::default();
1417 executor.start(|context| async move {
1418 let mut db = open_db(context.with_label("db")).await;
1419
1420 let key1 = Sha256::hash(&[1]);
1421 let key2 = Sha256::hash(&[2]);
1422
1423 let changeset_a = {
1425 let mut batch = db.new_batch();
1426 batch.set(key1, vec![10]);
1427 batch.merkleize(None).finalize()
1428 };
1429 let changeset_b = {
1430 let mut batch = db.new_batch();
1431 batch.set(key2, vec![20]);
1432 batch.merkleize(None).finalize()
1433 };
1434
1435 db.apply_batch(changeset_a).await.unwrap();
1437 let expected_root = db.root();
1438 let expected_bounds = db.bounds().await;
1439 assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
1440 assert_eq!(db.get(&key2).await.unwrap(), None);
1441 assert_eq!(db.get_metadata().await.unwrap(), None);
1442
1443 let result = db.apply_batch(changeset_b).await;
1445 assert!(
1446 matches!(result, Err(Error::StaleChangeset { .. })),
1447 "expected StaleChangeset error, got {result:?}"
1448 );
1449 assert_eq!(db.root(), expected_root);
1450 assert_eq!(db.bounds().await, expected_bounds);
1451 assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
1452 assert_eq!(db.get(&key2).await.unwrap(), None);
1453 assert_eq!(db.get_metadata().await.unwrap(), None);
1454
1455 db.destroy().await.unwrap();
1456 });
1457 }
1458
1459 #[test_traced]
1460 fn test_stale_changeset_chained() {
1461 let executor = deterministic::Runner::default();
1462 executor.start(|context| async move {
1463 let mut db = open_db(context.with_label("db")).await;
1464
1465 let key1 = Sha256::hash(&[1]);
1466 let key2 = Sha256::hash(&[2]);
1467 let key3 = Sha256::hash(&[3]);
1468
1469 let mut parent = db.new_batch();
1471 parent.set(key1, vec![1]);
1472 let parent_m = parent.merkleize(None);
1473
1474 let child_a = {
1476 let mut batch = parent_m.new_batch();
1477 batch.set(key2, vec![2]);
1478 batch.merkleize(None).finalize()
1479 };
1480 let child_b = {
1481 let mut batch = parent_m.new_batch();
1482 batch.set(key3, vec![3]);
1483 batch.merkleize(None).finalize()
1484 };
1485
1486 db.apply_batch(child_a).await.unwrap();
1488
1489 let result = db.apply_batch(child_b).await;
1491 assert!(
1492 matches!(result, Err(Error::StaleChangeset { .. })),
1493 "expected StaleChangeset error, got {result:?}"
1494 );
1495
1496 db.destroy().await.unwrap();
1497 });
1498 }
1499
1500 #[test_traced]
1501 fn test_stale_changeset_parent_applied_before_child() {
1502 let executor = deterministic::Runner::default();
1503 executor.start(|context| async move {
1504 let mut db = open_db(context.with_label("db")).await;
1505
1506 let key1 = Sha256::hash(&[1]);
1507 let key2 = Sha256::hash(&[2]);
1508
1509 let mut parent = db.new_batch();
1511 parent.set(key1, vec![1]);
1512 let parent_m = parent.merkleize(None);
1513
1514 let mut child = parent_m.new_batch();
1516 child.set(key2, vec![2]);
1517 let child_changeset = child.merkleize(None).finalize();
1518
1519 let parent_changeset = parent_m.finalize();
1521 db.apply_batch(parent_changeset).await.unwrap();
1522
1523 let result = db.apply_batch(child_changeset).await;
1526 assert!(
1527 matches!(result, Err(Error::StaleChangeset { .. })),
1528 "expected StaleChangeset error, got {result:?}"
1529 );
1530
1531 db.destroy().await.unwrap();
1532 });
1533 }
1534
1535 #[test_traced]
1536 fn test_stale_changeset_child_applied_before_parent() {
1537 let executor = deterministic::Runner::default();
1538 executor.start(|context| async move {
1539 let mut db = open_db(context.with_label("db")).await;
1540
1541 let key1 = Sha256::hash(&[1]);
1542 let key2 = Sha256::hash(&[2]);
1543
1544 let mut parent = db.new_batch();
1546 parent.set(key1, vec![1]);
1547 let parent_m = parent.merkleize(None);
1548
1549 let mut child = parent_m.new_batch();
1552 child.set(key2, vec![2]);
1553 let child_changeset = child.merkleize(None).finalize();
1554 let parent_changeset = parent_m.finalize();
1555
1556 db.apply_batch(child_changeset).await.unwrap();
1558
1559 let result = db.apply_batch(parent_changeset).await;
1561 assert!(
1562 matches!(result, Err(Error::StaleChangeset { .. })),
1563 "expected StaleChangeset error, got {result:?}"
1564 );
1565
1566 db.destroy().await.unwrap();
1567 });
1568 }
1569}