1use crate::{
5 index::{unordered::Index, Unordered as _},
6 journal::{
7 authenticated,
8 contiguous::variable::{self, Config as JournalConfig},
9 },
10 kv,
11 mmr::{journaled::Config as MmrConfig, Location, Proof},
12 qmdb::{
13 any::VariableValue, build_snapshot_from_log, DurabilityState, Durable, Error,
14 MerkleizationState, Merkleized, NonDurable, Unmerkleized,
15 },
16 translator::Translator,
17};
18use commonware_codec::Read;
19use commonware_cryptography::{DigestOf, Hasher as CHasher};
20use commonware_parallel::ThreadPool;
21use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
22use commonware_utils::Array;
23use std::{
24 num::{NonZeroU64, NonZeroUsize},
25 ops::Range,
26};
27use tracing::warn;
28
29mod operation;
30pub use operation::Operation;
31
/// The authenticated journal type that backs [`Immutable`]: an [`authenticated::Journal`] wrapped
/// around a `contiguous::variable` journal of [`Operation`]s.
///
/// `H` is the hasher type and `S` the merkleization state of the journal.
type Journal<E, K, V, H, S> =
    authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H, S>;
34
35pub mod sync;
36
37#[derive(Clone)]
39pub struct Config<T: Translator, C> {
40 pub mmr_journal_partition: String,
42
43 pub mmr_items_per_blob: NonZeroU64,
45
46 pub mmr_write_buffer: NonZeroUsize,
48
49 pub mmr_metadata_partition: String,
51
52 pub log_partition: String,
54
55 pub log_write_buffer: NonZeroUsize,
57
58 pub log_compression: Option<u8>,
60
61 pub log_codec_config: C,
63
64 pub log_items_per_section: NonZeroU64,
66
67 pub translator: T,
69
70 pub thread_pool: Option<ThreadPool>,
72
73 pub page_cache: CacheRef,
75}
76
77pub struct Immutable<
80 E: RStorage + Clock + Metrics,
81 K: Array,
82 V: VariableValue,
83 H: CHasher,
84 T: Translator,
85 M: MerkleizationState<DigestOf<H>> + Send + Sync = Merkleized<H>,
86 D: DurabilityState = Durable,
87> {
88 journal: Journal<E, K, V, H, M>,
90
91 snapshot: Index<T, Location>,
97
98 last_commit_loc: Location,
100
101 _durable: core::marker::PhantomData<D>,
103}
104
105impl<
107 E: RStorage + Clock + Metrics,
108 K: Array,
109 V: VariableValue,
110 H: CHasher,
111 T: Translator,
112 M: MerkleizationState<DigestOf<H>> + Send + Sync,
113 D: DurabilityState,
114 > Immutable<E, K, V, H, T, M, D>
115{
116 pub fn size(&self) -> Location {
118 self.bounds().end
119 }
120
121 pub fn bounds(&self) -> std::ops::Range<Location> {
124 self.journal.bounds()
125 }
126
127 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
130 let oldest = self.journal.bounds().start;
131 let iter = self.snapshot.get(key);
132 for &loc in iter {
133 if loc < oldest {
134 continue;
135 }
136 if let Some(v) = self.get_from_loc(key, loc).await? {
137 return Ok(Some(v));
138 }
139 }
140
141 Ok(None)
142 }
143
144 async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
148 if loc < self.journal.bounds().start {
149 return Err(Error::OperationPruned(loc));
150 }
151
152 let Operation::Set(k, v) = self.journal.read(loc).await? else {
153 return Err(Error::UnexpectedData(loc));
154 };
155
156 if k != *key {
157 Ok(None)
158 } else {
159 Ok(Some(v))
160 }
161 }
162
163 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
165 let last_commit_loc = self.last_commit_loc;
166 let Operation::Commit(metadata) = self.journal.read(last_commit_loc).await? else {
167 unreachable!("no commit operation at location of last commit {last_commit_loc}");
168 };
169
170 Ok(metadata)
171 }
172}
173
174impl<
176 E: RStorage + Clock + Metrics,
177 K: Array,
178 V: VariableValue,
179 H: CHasher,
180 T: Translator,
181 D: DurabilityState,
182 > Immutable<E, K, V, H, T, Merkleized<H>, D>
183{
184 pub const fn root(&self) -> H::Digest {
186 self.journal.root()
187 }
188
189 pub async fn proof(
196 &self,
197 start_index: Location,
198 max_ops: NonZeroU64,
199 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
200 let op_count = self.bounds().end;
201 self.historical_proof(op_count, start_index, max_ops).await
202 }
203
204 pub async fn historical_proof(
215 &self,
216 op_count: Location,
217 start_loc: Location,
218 max_ops: NonZeroU64,
219 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
220 Ok(self
221 .journal
222 .historical_proof(op_count, start_loc, max_ops)
223 .await?)
224 }
225
226 pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
234 if loc > self.last_commit_loc {
235 return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
236 }
237 self.journal.prune(loc).await?;
238
239 Ok(())
240 }
241}
242
243impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
245 Immutable<E, K, V, H, T, Merkleized<H>, Durable>
246{
247 pub async fn init(
250 context: E,
251 cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
252 ) -> Result<Self, Error> {
253 let mmr_cfg = MmrConfig {
254 journal_partition: cfg.mmr_journal_partition,
255 metadata_partition: cfg.mmr_metadata_partition,
256 items_per_blob: cfg.mmr_items_per_blob,
257 write_buffer: cfg.mmr_write_buffer,
258 thread_pool: cfg.thread_pool,
259 page_cache: cfg.page_cache.clone(),
260 };
261
262 let journal_cfg = JournalConfig {
263 partition: cfg.log_partition,
264 items_per_section: cfg.log_items_per_section,
265 compression: cfg.log_compression,
266 codec_config: cfg.log_codec_config,
267 page_cache: cfg.page_cache.clone(),
268 write_buffer: cfg.log_write_buffer,
269 };
270
271 let mut journal = Journal::new(
272 context.clone(),
273 mmr_cfg,
274 journal_cfg,
275 Operation::<K, V>::is_commit,
276 )
277 .await?;
278
279 if journal.size() == 0 {
280 warn!("Authenticated log is empty, initialized new db.");
281 let mut dirty_journal = journal.into_dirty();
282 dirty_journal.append(Operation::Commit(None)).await?;
283 journal = dirty_journal.merkleize();
284 journal.sync().await?;
285 }
286
287 let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());
288
289 let start_loc = journal.bounds().start;
291
292 build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;
294
295 let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");
296
297 Ok(Self {
298 journal,
299 snapshot,
300 last_commit_loc,
301 _durable: core::marker::PhantomData,
302 })
303 }
304
305 pub async fn sync(&mut self) -> Result<(), Error> {
309 Ok(self.journal.sync().await?)
310 }
311
312 pub async fn destroy(self) -> Result<(), Error> {
314 Ok(self.journal.destroy().await?)
315 }
316
317 pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
319 Immutable {
320 journal: self.journal.into_dirty(),
321 snapshot: self.snapshot,
322 last_commit_loc: self.last_commit_loc,
323 _durable: core::marker::PhantomData,
324 }
325 }
326}
327
328impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
330 Immutable<E, K, V, H, T, Unmerkleized, Durable>
331{
332 pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
334 Immutable {
335 journal: self.journal,
336 snapshot: self.snapshot,
337 last_commit_loc: self.last_commit_loc,
338 _durable: core::marker::PhantomData,
339 }
340 }
341
342 pub fn into_merkleized(self) -> Immutable<E, K, V, H, T, Merkleized<H>, Durable> {
344 Immutable {
345 journal: self.journal.merkleize(),
346 snapshot: self.snapshot,
347 last_commit_loc: self.last_commit_loc,
348 _durable: core::marker::PhantomData,
349 }
350 }
351}
352
353impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
355 Immutable<E, K, V, H, T, Merkleized<H>, NonDurable>
356{
357 pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
359 Immutable {
360 journal: self.journal.into_dirty(),
361 snapshot: self.snapshot,
362 last_commit_loc: self.last_commit_loc,
363 _durable: core::marker::PhantomData,
364 }
365 }
366}
367
368impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
370 Immutable<E, K, V, H, T, Unmerkleized, NonDurable>
371{
372 pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
375 self.journal.append(op).await?;
376
377 Ok(())
378 }
379
380 pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
387 let bounds = self.bounds();
388 self.snapshot
389 .insert_and_prune(&key, bounds.end, |v| *v < bounds.start);
390
391 let op = Operation::Set(key, value);
392 self.apply_op(op).await
393 }
394
395 pub async fn commit(
400 mut self,
401 metadata: Option<V>,
402 ) -> Result<
403 (
404 Immutable<E, K, V, H, T, Unmerkleized, Durable>,
405 Range<Location>,
406 ),
407 Error,
408 > {
409 let loc = self.journal.append(Operation::Commit(metadata)).await?;
410 self.journal.commit().await?;
411 self.last_commit_loc = loc;
412 let range = loc..self.bounds().end;
413
414 let db = Immutable {
415 journal: self.journal,
416 snapshot: self.snapshot,
417 last_commit_loc: self.last_commit_loc,
418 _durable: core::marker::PhantomData,
419 };
420
421 Ok((db, range))
422 }
423
424 pub fn into_merkleized(self) -> Immutable<E, K, V, H, T, Merkleized<H>, NonDurable> {
426 Immutable {
427 journal: self.journal.merkleize(),
428 snapshot: self.snapshot,
429 last_commit_loc: self.last_commit_loc,
430 _durable: core::marker::PhantomData,
431 }
432 }
433}
434
435impl<
436 E: RStorage + Clock + Metrics,
437 K: Array,
438 V: VariableValue,
439 H: CHasher,
440 T: Translator,
441 M: MerkleizationState<DigestOf<H>> + Send + Sync,
442 D: DurabilityState,
443 > kv::Gettable for Immutable<E, K, V, H, T, M, D>
444{
445 type Key = K;
446 type Value = V;
447 type Error = Error;
448
449 async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
450 self.get(key).await
451 }
452}
453
454impl<
455 E: RStorage + Clock + Metrics,
456 K: Array,
457 V: VariableValue,
458 H: CHasher,
459 T: Translator,
460 M: MerkleizationState<DigestOf<H>> + Send + Sync,
461 D: DurabilityState,
462 > crate::qmdb::store::LogStore for Immutable<E, K, V, H, T, M, D>
463{
464 type Value = V;
465
466 fn bounds(&self) -> std::ops::Range<Location> {
467 self.journal.bounds()
468 }
469
470 fn inactivity_floor_loc(&self) -> Location {
472 self.bounds().start
473 }
474
475 fn is_empty(&self) -> bool {
476 self.bounds().is_empty()
477 }
478
479 async fn get_metadata(&self) -> Result<Option<V>, Error> {
480 self.get_metadata().await
481 }
482}
483
484impl<
485 E: RStorage + Clock + Metrics,
486 K: Array,
487 V: VariableValue,
488 H: CHasher,
489 T: Translator,
490 D: DurabilityState,
491 > crate::qmdb::store::MerkleizedStore for Immutable<E, K, V, H, T, Merkleized<H>, D>
492{
493 type Digest = H::Digest;
494 type Operation = Operation<K, V>;
495
496 fn root(&self) -> Self::Digest {
497 self.root()
498 }
499
500 async fn historical_proof(
501 &self,
502 historical_size: Location,
503 start_loc: Location,
504 max_ops: NonZeroU64,
505 ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
506 self.historical_proof(historical_size, start_loc, max_ops)
507 .await
508 }
509}
510
511impl<
512 E: RStorage + Clock + Metrics,
513 K: Array,
514 V: VariableValue,
515 H: CHasher,
516 T: Translator,
517 D: DurabilityState,
518 > crate::qmdb::store::PrunableStore for Immutable<E, K, V, H, T, Merkleized<H>, D>
519{
520 async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
521 self.prune(prune_loc).await
522 }
523}
524
525#[cfg(test)]
526pub(super) mod test {
527 use super::*;
528 use crate::{mmr::StandardHasher, qmdb::verify_proof, translator::TwoCap};
529 use commonware_cryptography::{sha256::Digest, Sha256};
530 use commonware_macros::test_traced;
531 use commonware_runtime::{
532 deterministic::{self},
533 Runner as _,
534 };
535 use commonware_utils::{NZUsize, NZU16, NZU64};
536 use std::num::NonZeroU16;
537
/// Page size handed to the page cache built by `db_config`.
const PAGE_SIZE: NonZeroU16 = NZU16!(77);
/// Second argument handed to the page cache built by `db_config`.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
/// Number of log items per section in the test configuration; the pruning test relies on
/// pruning boundaries being multiples of this value.
const ITEMS_PER_SECTION: u64 = 5;
541
542 pub(crate) fn db_config(
543 suffix: &str,
544 ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
545 Config {
546 mmr_journal_partition: format!("journal_{suffix}"),
547 mmr_metadata_partition: format!("metadata_{suffix}"),
548 mmr_items_per_blob: NZU64!(11),
549 mmr_write_buffer: NZUsize!(1024),
550 log_partition: format!("log_{suffix}"),
551 log_items_per_section: NZU64!(ITEMS_PER_SECTION),
552 log_compression: None,
553 log_codec_config: ((0..=10000).into(), ()),
554 log_write_buffer: NZUsize!(1024),
555 translator: TwoCap,
556 thread_pool: None,
557 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
558 }
559 }
560
561 async fn open_db(
563 context: deterministic::Context,
564 ) -> Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap> {
565 Immutable::init(context, db_config("partition"))
566 .await
567 .unwrap()
568 }
569
570 #[test_traced("WARN")]
571 pub fn test_immutable_db_empty() {
572 let executor = deterministic::Runner::default();
573 executor.start(|context| async move {
574 let db = open_db(context.with_label("first")).await;
575 assert_eq!(db.bounds().end, 1);
576 assert_eq!(db.journal.bounds().start, Location::new_unchecked(0));
577 assert!(db.get_metadata().await.unwrap().is_none());
578
579 let k1 = Sha256::fill(1u8);
581 let v1 = vec![4, 5, 6, 7];
582 let root = db.root();
583 let mut db = db.into_mutable();
584 db.set(k1, v1).await.unwrap();
585 drop(db); let db = open_db(context.with_label("second")).await;
587 assert_eq!(db.root(), root);
588 assert_eq!(db.bounds().end, 1);
589
590 let db = db.into_mutable();
592 let (durable_db, _) = db.commit(None).await.unwrap();
593 let db = durable_db.into_merkleized();
594 assert_eq!(db.bounds().end, 2); let root = db.root();
596 drop(db);
597
598 let db = open_db(context.with_label("third")).await;
599 assert_eq!(db.root(), root);
600
601 db.destroy().await.unwrap();
602 });
603 }
604
605 #[test_traced("DEBUG")]
606 pub fn test_immutable_db_build_basic() {
607 let executor = deterministic::Runner::default();
608 executor.start(|context| async move {
609 let db = open_db(context.with_label("first")).await;
611
612 let k1 = Sha256::fill(1u8);
613 let k2 = Sha256::fill(2u8);
614 let v1 = vec![1, 2, 3];
615 let v2 = vec![4, 5, 6, 7, 8];
616
617 assert!(db.get(&k1).await.unwrap().is_none());
618 assert!(db.get(&k2).await.unwrap().is_none());
619
620 let mut db = db.into_mutable();
622 db.set(k1, v1.clone()).await.unwrap();
623 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
624 assert!(db.get(&k2).await.unwrap().is_none());
625 assert_eq!(db.bounds().end, 2);
626 let metadata = Some(vec![99, 100]);
628 let (durable_db, _) = db.commit(metadata.clone()).await.unwrap();
629 let db = durable_db.into_merkleized();
630 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
631 assert!(db.get(&k2).await.unwrap().is_none());
632 assert_eq!(db.bounds().end, 3);
633 assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());
634 let mut db = db.into_mutable();
636 db.set(k2, v2.clone()).await.unwrap();
637 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
638 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
639 assert_eq!(db.bounds().end, 4);
640
641 assert_eq!(db.get_metadata().await.unwrap(), metadata);
643
644 let (durable_db, _) = db.commit(None).await.unwrap();
646 let db = durable_db.into_merkleized();
647 assert_eq!(db.bounds().end, 5);
648 assert_eq!(db.get_metadata().await.unwrap(), None);
649
650 let root = db.root();
652
653 let k3 = Sha256::fill(3u8);
655 let v3 = vec![9, 10, 11];
656 let mut db = db.into_mutable();
657 db.set(k3, v3).await.unwrap();
658 assert_eq!(db.bounds().end, 6);
659
660 drop(db); let db = open_db(context.with_label("second")).await;
663 assert!(db.get(&k3).await.unwrap().is_none());
664 assert_eq!(db.bounds().end, 5);
665 assert_eq!(db.root(), root);
666 assert_eq!(db.get_metadata().await.unwrap(), None);
667
668 db.destroy().await.unwrap();
670 });
671 }
672
673 #[test_traced("WARN")]
674 pub fn test_immutable_db_build_and_authenticate() {
675 let executor = deterministic::Runner::default();
676 const ELEMENTS: u64 = 2_000;
678 executor.start(|context| async move {
679 let mut hasher = StandardHasher::<Sha256>::new();
680 let db = open_db(context.with_label("first")).await;
681 let mut db = db.into_mutable();
682
683 for i in 0u64..ELEMENTS {
684 let k = Sha256::hash(&i.to_be_bytes());
685 let v = vec![i as u8; 100];
686 db.set(k, v).await.unwrap();
687 }
688
689 assert_eq!(db.bounds().end, ELEMENTS + 1);
690
691 let (durable_db, _) = db.commit(None).await.unwrap();
692 let db = durable_db.into_merkleized();
693 assert_eq!(db.bounds().end, ELEMENTS + 2);
694
695 let root = db.root();
697 drop(db);
698
699 let db = open_db(context.with_label("second")).await;
700 assert_eq!(root, db.root());
701 assert_eq!(db.bounds().end, ELEMENTS + 2);
702 for i in 0u64..ELEMENTS {
703 let k = Sha256::hash(&i.to_be_bytes());
704 let v = vec![i as u8; 100];
705 assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
706 }
707
708 let max_ops = NZU64!(5);
711 for i in 0..*db.bounds().end {
712 let (proof, log) = db.proof(Location::new_unchecked(i), max_ops).await.unwrap();
713 assert!(verify_proof(
714 &mut hasher,
715 &proof,
716 Location::new_unchecked(i),
717 &log,
718 &root
719 ));
720 }
721
722 db.destroy().await.unwrap();
723 });
724 }
725
726 #[test_traced("WARN")]
727 pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
728 let executor = deterministic::Runner::default();
729 executor.start(|context| async move {
730 const ELEMENTS: u64 = 1000;
732 let db = open_db(context.with_label("first")).await;
733 let mut db = db.into_mutable();
734
735 for i in 0u64..ELEMENTS {
736 let k = Sha256::hash(&i.to_be_bytes());
737 let v = vec![i as u8; 100];
738 db.set(k, v).await.unwrap();
739 }
740
741 assert_eq!(db.bounds().end, ELEMENTS + 1);
742 let (durable_db, _) = db.commit(None).await.unwrap();
743 let mut db = durable_db.into_merkleized();
744 db.sync().await.unwrap();
745 let halfway_root = db.root();
746
747 let mut db = db.into_mutable();
749 for i in 0u64..ELEMENTS {
750 let k = Sha256::hash(&i.to_be_bytes());
751 let v = vec![i as u8; 100];
752 db.set(k, v).await.unwrap();
753 }
754
755 let (durable_db, _) = db.commit(None).await.unwrap();
758 drop(durable_db); let db = open_db(context.with_label("second")).await;
763 assert_eq!(db.bounds().end, 2003);
764 let root = db.root();
765 assert_ne!(root, halfway_root);
766
767 drop(db);
769 let db = open_db(context.with_label("third")).await;
770 assert_eq!(db.bounds().end, 2003);
771 assert_eq!(db.root(), root);
772
773 db.destroy().await.unwrap();
774 });
775 }
776
777 #[test_traced("WARN")]
778 pub fn test_immutable_db_recovery_from_failed_log_sync() {
779 let executor = deterministic::Runner::default();
780 executor.start(|context| async move {
781 let mut db = open_db(context.with_label("first")).await.into_mutable();
782
783 let k1 = Sha256::fill(1u8);
785 let v1 = vec![1, 2, 3];
786 db.set(k1, v1).await.unwrap();
787 let (durable_db, _) = db.commit(None).await.unwrap();
788 let db = durable_db.into_merkleized();
789 let first_commit_root = db.root();
790
791 const ELEMENTS: u64 = 1000;
793
794 let mut db = db.into_mutable();
795 for i in 0u64..ELEMENTS {
796 let k = Sha256::hash(&i.to_be_bytes());
797 let v = vec![i as u8; 100];
798 db.set(k, v).await.unwrap();
799 }
800
801 assert_eq!(db.bounds().end, ELEMENTS + 3);
802
803 for i in 0u64..ELEMENTS {
805 let k = Sha256::hash(&i.to_be_bytes());
806 let v = vec![i as u8; 100];
807 db.set(k, v).await.unwrap();
808 }
809
810 drop(db);
812
813 let db = open_db(context.with_label("second")).await;
815 assert_eq!(db.bounds().end, 3);
816 let root = db.root();
817 assert_eq!(root, first_commit_root);
818
819 db.destroy().await.unwrap();
820 });
821 }
822
823 #[test_traced("WARN")]
824 pub fn test_immutable_db_pruning() {
825 let executor = deterministic::Runner::default();
826 const ELEMENTS: u64 = 2_000;
828 executor.start(|context| async move {
829 let db = open_db(context.with_label("first")).await;
830 let mut db = db.into_mutable();
831
832 for i in 1u64..ELEMENTS+1 {
833 let k = Sha256::hash(&i.to_be_bytes());
834 let v = vec![i as u8; 100];
835 db.set(k, v).await.unwrap();
836 }
837
838 assert_eq!(db.bounds().end, ELEMENTS + 1);
839
840 let (durable_db, _) = db.commit(None).await.unwrap();
841 let mut db = durable_db.into_merkleized();
842 assert_eq!(db.bounds().end, ELEMENTS + 2);
843
844 db.prune(Location::new_unchecked((ELEMENTS+2) / 2))
846 .await
847 .unwrap();
848 assert_eq!(db.bounds().end, ELEMENTS + 2);
849
850 let oldest_retained_loc = db.journal.bounds().start;
853 assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));
854
855 let pruned_loc = oldest_retained_loc - 1;
857 let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
858 assert!(db.get(&pruned_key).await.unwrap().is_none());
859
860 let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
862 assert!(db.get(&unpruned_key).await.unwrap().is_some());
863
864 let root = db.root();
866 db.sync().await.unwrap();
867 drop(db);
868
869 let mut db = open_db(context.with_label("second")).await;
870 assert_eq!(root, db.root());
871 assert_eq!(db.bounds().end, ELEMENTS + 2);
872 let oldest_retained_loc = db.journal.bounds().start;
873 assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));
874
875 let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
877 db.prune(loc).await.unwrap();
878 let oldest_retained_loc = db.journal.bounds().start;
880 assert_eq!(
881 oldest_retained_loc,
882 Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
883 );
884
885 db.sync().await.unwrap();
887 drop(db);
888 let db = open_db(context.with_label("third")).await;
889 let oldest_retained_loc = db.journal.bounds().start;
890 assert_eq!(
891 oldest_retained_loc,
892 Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
893 );
894
895 let pruned_loc = oldest_retained_loc - 3;
897 let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
898 assert!(db.get(&pruned_key).await.unwrap().is_none());
899
900 let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
902 assert!(db.get(&unpruned_key).await.unwrap().is_some());
903
904 let pruned_pos = ELEMENTS / 2;
906 let proof_result = db
907 .proof(
908 Location::new_unchecked(pruned_pos),
909 NZU64!(pruned_pos + 100),
910 )
911 .await;
912 assert!(matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos));
913
914 db.destroy().await.unwrap();
915 });
916 }
917
918 #[test_traced("INFO")]
919 pub fn test_immutable_db_prune_beyond_commit() {
920 let executor = deterministic::Runner::default();
921 executor.start(|context| async move {
922 let mut db = open_db(context.with_label("test")).await;
923
924 let result = db.prune(Location::new_unchecked(1)).await;
926 assert!(
927 matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
928 if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
929 );
930
931 let k1 = Digest::from(*b"12345678901234567890123456789012");
933 let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
934 let k3 = Digest::from(*b"99999999999999999999999999999999");
935 let v1 = vec![1u8; 16];
936 let v2 = vec![2u8; 16];
937 let v3 = vec![3u8; 16];
938
939 let mut db = db.into_mutable();
940 db.set(k1, v1.clone()).await.unwrap();
941 db.set(k2, v2.clone()).await.unwrap();
942 let (durable_db, _) = db.commit(None).await.unwrap();
943 let db = durable_db.into_merkleized();
944 let mut db = db.into_mutable();
945 db.set(k3, v3.clone()).await.unwrap();
946
947 assert_eq!(*db.last_commit_loc, 3);
949
950 let (durable_db, _) = db.commit(None).await.unwrap();
952 let mut db = durable_db.into_merkleized();
953 assert!(db.prune(Location::new_unchecked(3)).await.is_ok());
954
955 let new_last_commit = db.last_commit_loc;
957 let beyond = new_last_commit + 1;
958 let result = db.prune(beyond).await;
959 assert!(
960 matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
961 if prune_loc == beyond && commit_loc == new_last_commit)
962 );
963
964 db.destroy().await.unwrap();
965 });
966 }
967
968 use crate::{
969 kv::tests::{assert_gettable, assert_send},
970 qmdb::store::tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
971 };
972
973 type MerkleizedDb =
974 Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap, Merkleized<Sha256>>;
975 type MutableDb = Immutable<
976 deterministic::Context,
977 Digest,
978 Vec<u8>,
979 Sha256,
980 TwoCap,
981 Unmerkleized,
982 NonDurable,
983 >;
984
985 #[allow(dead_code)]
986 fn assert_merkleized_db_futures_are_send(db: &mut MerkleizedDb, key: Digest, loc: Location) {
987 assert_gettable(db, &key);
988 assert_log_store(db);
989 assert_prunable_store(db, loc);
990 assert_merkleized_store(db, loc);
991 assert_send(db.sync());
992 }
993
994 #[allow(dead_code)]
995 fn assert_mutable_db_futures_are_send(db: &mut MutableDb, key: Digest, value: Vec<u8>) {
996 assert_gettable(db, &key);
997 assert_log_store(db);
998 assert_send(db.set(key, value));
999 }
1000
1001 #[allow(dead_code)]
1002 fn assert_mutable_db_commit_is_send(db: MutableDb) {
1003 assert_send(db.commit(None));
1004 }
1005}