1use crate::{
5 index::{unordered::Index, Unordered as _},
6 journal::{
7 authenticated,
8 contiguous::variable::{self, Config as JournalConfig},
9 },
10 mmr::{
11 journaled::{Config as MmrConfig, Mmr},
12 mem::{Clean, Dirty, State},
13 Location, Position, Proof, StandardHasher as Standard,
14 },
15 qmdb::{any::VariableValue, build_snapshot_from_log, Error},
16 translator::Translator,
17};
18use commonware_codec::Read;
19use commonware_cryptography::{DigestOf, Hasher as CHasher};
20use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
21use commonware_utils::Array;
22use std::num::{NonZeroU64, NonZeroUsize};
23use tracing::warn;
24
// Defines the `Operation` type stored in this database's log; this file uses its `Set` and
// `Commit` variants.
mod operation;
pub use operation::Operation;

/// The authenticated journal backing an [Immutable] database: an [authenticated::Journal] layered
/// over a variable-length journal of [Operation]s.
type Journal<E, K, V, H, S> =
    authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H, S>;

// Provides the `sync::Config` consumed by `Immutable::init_synced`.
pub mod sync;
32
/// Configuration for an [Immutable] database.
///
/// The `mmr_*` fields are forwarded to the MMR's config and the `log_*` fields to the operation
/// log's journal config when the database is initialized.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Partition name forwarded to the MMR config as its `journal_partition`.
    pub mmr_journal_partition: String,

    /// Forwarded to the MMR config as `items_per_blob`.
    pub mmr_items_per_blob: NonZeroU64,

    /// Forwarded to the MMR config as `write_buffer`.
    pub mmr_write_buffer: NonZeroUsize,

    /// Partition name forwarded to the MMR config as its `metadata_partition`.
    pub mmr_metadata_partition: String,

    /// Partition name forwarded to the log's journal config as `partition`.
    pub log_partition: String,

    /// Forwarded to the log's journal config as `write_buffer`.
    pub log_write_buffer: NonZeroUsize,

    /// Forwarded to the log's journal config as `compression`.
    ///
    /// NOTE(review): presumably an optional compression level, with `None` meaning no
    /// compression — confirm against the journal's documentation.
    pub log_compression: Option<u8>,

    /// Forwarded to the log's journal config as `codec_config`. `Immutable::init` instantiates
    /// `C` with the `Read::Cfg` of [Operation].
    pub log_codec_config: C,

    /// Forwarded to the log's journal config as `items_per_section`.
    pub log_items_per_section: NonZeroU64,

    /// Translator handed to the in-memory snapshot index.
    pub translator: T,

    /// Optional thread pool forwarded to the MMR config.
    pub thread_pool: Option<ThreadPool>,

    /// Buffer pool handed to both the MMR config and the log's journal config.
    pub buffer_pool: PoolRef,
}
72
/// An authenticated key-value database built from an append-only log of [Operation]s and an
/// in-memory snapshot index over that log.
///
/// The `S` parameter tracks the merkleization state and defaults to [Clean]; `into_dirty` and
/// `merkleize` convert between the [Clean] and [Dirty] states.
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
    S: State<DigestOf<H>> = Clean<DigestOf<H>>,
> {
    /// Authenticated journal holding every operation applied to the database.
    journal: Journal<E, K, V, H, S>,

    /// Index from (translated) keys to the log locations of candidate `Set` operations.
    ///
    /// A lookup may yield locations whose operation holds a different key, so readers compare the
    /// stored key before returning a value. Entries may also point below the oldest retained
    /// location; those are skipped on read and pruned opportunistically on insert.
    snapshot: Index<T, Location>,

    /// Location of the most recent commit operation. Pruning beyond it is rejected.
    last_commit_loc: Location,
}
96
97impl<
98 E: RStorage + Clock + Metrics,
99 K: Array,
100 V: VariableValue,
101 H: CHasher,
102 T: Translator,
103 S: State<DigestOf<H>>,
104 > Immutable<E, K, V, H, T, S>
105{
106 pub fn oldest_retained_loc(&self) -> Location {
108 self.journal
109 .oldest_retained_loc()
110 .expect("at least one operation should exist")
111 }
112
113 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
116 let oldest = self.oldest_retained_loc();
117 let iter = self.snapshot.get(key);
118 for &loc in iter {
119 if loc < oldest {
120 continue;
121 }
122 if let Some(v) = self.get_from_loc(key, loc).await? {
123 return Ok(Some(v));
124 }
125 }
126
127 Ok(None)
128 }
129
130 async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
134 if loc < self.oldest_retained_loc() {
135 return Err(Error::OperationPruned(loc));
136 }
137
138 let Operation::Set(k, v) = self.journal.read(loc).await? else {
139 return Err(Error::UnexpectedData(loc));
140 };
141
142 if k != *key {
143 Ok(None)
144 } else {
145 Ok(Some(v))
146 }
147 }
148
149 pub fn op_count(&self) -> Location {
152 self.journal.size()
153 }
154
155 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
157 let last_commit_loc = self.last_commit_loc;
158 let Operation::Commit(metadata) = self.journal.read(last_commit_loc).await? else {
159 unreachable!("no commit operation at location of last commit {last_commit_loc}");
160 };
161
162 Ok(metadata)
163 }
164
165 pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
168 self.journal.append(op).await?;
169
170 Ok(())
171 }
172}
173
174impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
175 Immutable<E, K, V, H, T, Clean<H::Digest>>
176{
177 pub async fn init(
180 context: E,
181 cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
182 ) -> Result<Self, Error> {
183 let mmr_cfg = MmrConfig {
184 journal_partition: cfg.mmr_journal_partition,
185 metadata_partition: cfg.mmr_metadata_partition,
186 items_per_blob: cfg.mmr_items_per_blob,
187 write_buffer: cfg.mmr_write_buffer,
188 thread_pool: cfg.thread_pool,
189 buffer_pool: cfg.buffer_pool.clone(),
190 };
191
192 let journal_cfg = JournalConfig {
193 partition: cfg.log_partition,
194 items_per_section: cfg.log_items_per_section,
195 compression: cfg.log_compression,
196 codec_config: cfg.log_codec_config,
197 buffer_pool: cfg.buffer_pool.clone(),
198 write_buffer: cfg.log_write_buffer,
199 };
200
201 let mut journal = Journal::new(
202 context.clone(),
203 mmr_cfg,
204 journal_cfg,
205 Operation::<K, V>::is_commit,
206 )
207 .await?;
208
209 if journal.size() == 0 {
210 warn!("Authenticated log is empty, initialized new db.");
211 journal.append(Operation::Commit(None)).await?;
212 journal.sync().await?;
213 }
214
215 let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());
216
217 let start_loc = journal.pruning_boundary();
219
220 build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;
222
223 let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");
224
225 Ok(Self {
226 journal,
227 snapshot,
228 last_commit_loc,
229 })
230 }
231
232 const APPLY_BATCH_SIZE: u64 = 1 << 16;
234
235 #[allow(clippy::type_complexity)]
237 pub async fn init_synced(
238 context: E,
239 cfg: sync::Config<E, K, V, T, H::Digest, <Operation<K, V> as Read>::Cfg>,
240 ) -> Result<Self, Error> {
241 let mut hasher = Standard::new();
242
243 let mmr = Mmr::init_sync(
245 context.with_label("mmr"),
246 crate::mmr::journaled::SyncConfig {
247 config: MmrConfig {
248 journal_partition: cfg.db_config.mmr_journal_partition,
249 metadata_partition: cfg.db_config.mmr_metadata_partition,
250 items_per_blob: cfg.db_config.mmr_items_per_blob,
251 write_buffer: cfg.db_config.mmr_write_buffer,
252 thread_pool: cfg.db_config.thread_pool.clone(),
253 buffer_pool: cfg.db_config.buffer_pool.clone(),
254 },
255 range: Position::try_from(cfg.range.start)?
256 ..Position::try_from(cfg.range.end.saturating_add(1))?,
257 pinned_nodes: cfg.pinned_nodes,
258 },
259 &mut hasher,
260 )
261 .await?;
262
263 let journal = Journal::<_, _, _, _, Clean<DigestOf<H>>>::from_components(
264 mmr,
265 cfg.log,
266 hasher,
267 Self::APPLY_BATCH_SIZE,
268 )
269 .await?;
270
271 let mut snapshot: Index<T, Location> = Index::new(
272 context.with_label("snapshot"),
273 cfg.db_config.translator.clone(),
274 );
275
276 let start_loc = journal.pruning_boundary();
278
279 build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;
281
282 let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");
283
284 let mut db = Self {
285 journal,
286 snapshot,
287 last_commit_loc,
288 };
289
290 db.sync().await?;
291 Ok(db)
292 }
293
294 pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
302 if loc > self.last_commit_loc {
303 return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
304 }
305 self.journal.prune(loc).await?;
306
307 Ok(())
308 }
309
310 pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
317 let op_count = self.op_count();
318 let oldest = self.oldest_retained_loc();
319 self.snapshot
320 .insert_and_prune(&key, op_count, |v| *v < oldest);
321
322 let op = Operation::Set(key, value);
323 self.apply_op(op).await
324 }
325
326 pub const fn root(&self) -> H::Digest {
328 self.journal.root()
329 }
330
331 pub async fn proof(
338 &self,
339 start_index: Location,
340 max_ops: NonZeroU64,
341 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
342 let op_count = self.op_count();
343 self.historical_proof(op_count, start_index, max_ops).await
344 }
345
346 pub async fn historical_proof(
357 &self,
358 op_count: Location,
359 start_loc: Location,
360 max_ops: NonZeroU64,
361 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
362 Ok(self
363 .journal
364 .historical_proof(op_count, start_loc, max_ops)
365 .await?)
366 }
367
368 pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
374 let loc = self.journal.append(Operation::Commit(metadata)).await?;
375 self.journal.commit().await?;
376 self.last_commit_loc = loc;
377
378 Ok(())
379 }
380
381 pub(super) async fn sync(&mut self) -> Result<(), Error> {
385 Ok(self.journal.sync().await?)
386 }
387
388 pub async fn close(self) -> Result<(), Error> {
390 Ok(self.journal.close().await?)
391 }
392
393 pub async fn destroy(self) -> Result<(), Error> {
395 Ok(self.journal.destroy().await?)
396 }
397
398 pub fn into_dirty(self) -> Immutable<E, K, V, H, T, Dirty> {
400 Immutable {
401 journal: self.journal.into_dirty(),
402 snapshot: self.snapshot,
403 last_commit_loc: self.last_commit_loc,
404 }
405 }
406
407 #[cfg(test)]
410 pub async fn simulate_failed_commit_mmr(mut self, write_limit: usize) -> Result<(), Error>
411 where
412 V: Default,
413 {
414 self.apply_op(Operation::Commit(None)).await?;
415 self.journal.journal.close().await?;
416 self.journal.mmr.simulate_partial_sync(write_limit).await?;
417
418 Ok(())
419 }
420
421 #[cfg(test)]
424 pub async fn simulate_failed_commit_log(mut self) -> Result<(), Error>
425 where
426 V: Default,
427 {
428 self.apply_op(Operation::Commit(None)).await?;
429 let log_size = self.journal.journal.size();
430
431 self.journal.mmr.close().await?;
432 if log_size > 0 {
434 self.journal.journal.rewind(log_size - 1).await?;
435 }
436 self.journal.journal.close().await?;
437
438 Ok(())
439 }
440}
441
442impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
443 Immutable<E, K, V, H, T, Dirty>
444{
445 pub fn merkleize(self) -> Immutable<E, K, V, H, T, Clean<H::Digest>> {
447 Immutable {
448 journal: self.journal.merkleize(),
449 snapshot: self.snapshot,
450 last_commit_loc: self.last_commit_loc,
451 }
452 }
453}
454
455impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
456 crate::store::Store for Immutable<E, K, V, H, T, Clean<H::Digest>>
457{
458 type Key = K;
459 type Value = V;
460 type Error = Error;
461
462 async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
463 self.get(key).await
464 }
465}
466
467impl<
468 E: RStorage + Clock + Metrics,
469 K: Array,
470 V: VariableValue,
471 H: CHasher,
472 T: Translator,
473 S: State<DigestOf<H>>,
474 > crate::qmdb::store::LogStore for Immutable<E, K, V, H, T, S>
475{
476 type Value = V;
477
478 fn op_count(&self) -> Location {
479 self.op_count()
480 }
481
482 fn inactivity_floor_loc(&self) -> Location {
484 self.journal.pruning_boundary()
485 }
486
487 fn is_empty(&self) -> bool {
488 self.op_count() == 0
489 }
490
491 async fn get_metadata(&self) -> Result<Option<V>, Error> {
492 self.get_metadata().await
493 }
494}
495
496impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
497 crate::qmdb::store::CleanStore for Immutable<E, K, V, H, T, Clean<H::Digest>>
498{
499 type Digest = H::Digest;
500 type Operation = Operation<K, V>;
501 type Dirty = Immutable<E, K, V, H, T, Dirty>;
502
503 fn root(&self) -> Self::Digest {
504 self.root()
505 }
506
507 async fn proof(
508 &self,
509 start_loc: Location,
510 max_ops: NonZeroU64,
511 ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
512 self.proof(start_loc, max_ops).await
513 }
514
515 async fn historical_proof(
516 &self,
517 historical_size: Location,
518 start_loc: Location,
519 max_ops: NonZeroU64,
520 ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
521 self.historical_proof(historical_size, start_loc, max_ops)
522 .await
523 }
524
525 fn into_dirty(self) -> Self::Dirty {
526 self.into_dirty()
527 }
528}
529
530impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
531 crate::qmdb::store::DirtyStore for Immutable<E, K, V, H, T, Dirty>
532{
533 type Digest = H::Digest;
534 type Operation = Operation<K, V>;
535 type Clean = Immutable<E, K, V, H, T, Clean<H::Digest>>;
536
537 async fn merkleize(self) -> Result<Self::Clean, Error> {
538 Ok(self.merkleize())
539 }
540}
541
#[cfg(test)]
pub(super) mod test {
    use super::*;
    use crate::{qmdb::verify_proof, translator::TwoCap};
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner as _};
    use commonware_utils::{NZUsize, NZU64};

    // NOTE(review): the page sizes are deliberately odd, presumably to exercise page-boundary
    // handling in the buffer pool — confirm.
    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;
    const ITEMS_PER_SECTION: u64 = 5;

    /// Builds a db config whose partition names all end in `suffix`.
    pub(crate) fn db_config(
        suffix: &str,
    ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log_{suffix}"),
            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_write_buffer: NZUsize!(1024),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    /// The database flavor exercised by these tests.
    type ImmutableTest = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    /// Opens (or re-opens) the test database stored under the fixed "partition" suffix.
    async fn open_db(context: deterministic::Context) -> ImmutableTest {
        let cfg = db_config("partition");
        ImmutableTest::init(context, cfg).await.unwrap()
    }

    /// Key of the `i`-th bulk element: the hash of `i`'s big-endian bytes.
    fn bulk_key(i: u64) -> Digest {
        Sha256::hash(&i.to_be_bytes())
    }

    /// Value of the `i`-th bulk element: 100 copies of `i` truncated to a byte.
    fn bulk_value(i: u64) -> Vec<u8> {
        vec![i as u8; 100]
    }

    /// Sets the bulk key/value pair for every index in `indices`, in order.
    async fn set_bulk(db: &mut ImmutableTest, indices: std::ops::Range<u64>) {
        for i in indices {
            db.set(bulk_key(i), bulk_value(i)).await.unwrap();
        }
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // A fresh db holds only the initial commit and carries no metadata.
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));
            assert!(db.get_metadata().await.unwrap().is_none());

            // A set that is never committed must not survive a close and re-open.
            let empty_root = db.root();
            db.set(Sha256::fill(1u8), vec![4, 5, 6, 7]).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(), empty_root);
            assert_eq!(db.op_count(), 1);

            // A commit with nothing else adds exactly one operation and survives a re-open.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 2);
            let committed_root = db.root();
            db.close().await.unwrap();

            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), committed_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            let (k1, k2) = (Sha256::fill(1u8), Sha256::fill(2u8));
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            // Neither key exists yet.
            assert_eq!(db.get(&k1).await.unwrap(), None);
            assert_eq!(db.get(&k2).await.unwrap(), None);

            // Set the first key; it is readable before any commit.
            db.set(k1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().as_ref(), Some(&v1));
            assert_eq!(db.get(&k2).await.unwrap(), None);
            assert_eq!(db.op_count(), 2);

            // Commit with metadata attached.
            let metadata = Some(vec![99, 100]);
            db.commit(metadata.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().as_ref(), Some(&v1));
            assert_eq!(db.get(&k2).await.unwrap(), None);
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());

            // Set the second key.
            db.set(k2, v2.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().as_ref(), Some(&v1));
            assert_eq!(db.get(&k2).await.unwrap().as_ref(), Some(&v2));
            assert_eq!(db.op_count(), 4);

            // Metadata still reflects the previous commit.
            assert_eq!(db.get_metadata().await.unwrap(), metadata);

            // Committing without metadata replaces it with `None`.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Remember the committed state.
            let committed_root = db.root();

            // Add a key that never gets committed.
            let k3 = Sha256::fill(3u8);
            db.set(k3, vec![9, 10, 11]).await.unwrap();
            assert_eq!(db.op_count(), 6);
            assert_ne!(db.root(), committed_root);

            // Re-opening rolls the db back to the last commit.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.get(&k3).await.unwrap(), None);
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.root(), committed_root);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            set_bulk(&mut db, 0..ELEMENTS).await;
            assert_eq!(db.op_count(), ELEMENTS + 1);

            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // State and contents must survive a close and re-open.
            let root = db.root();
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            for i in 0u64..ELEMENTS {
                let stored = db.get(&bulk_key(i)).await.unwrap();
                assert_eq!(stored.unwrap(), bulk_value(i));
            }

            // A proof starting at every location must verify against the root.
            let max_ops = NZU64!(5);
            for i in 0..*db.op_count() {
                let start = Location::new_unchecked(i);
                let (proof, log) = db.proof(start, max_ops).await.unwrap();
                assert!(verify_proof(&mut hasher, &proof, start, &log, &root));
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const ELEMENTS: u64 = 1000;
            let mut db = open_db(context.clone()).await;

            set_bulk(&mut db, 0..ELEMENTS).await;

            assert_eq!(db.op_count(), ELEMENTS + 1);
            db.sync().await.unwrap();
            let halfway_root = db.root();

            // Apply a second batch, then fail the commit on the MMR side.
            set_bulk(&mut db, 0..ELEMENTS).await;

            // Only part of the MMR gets written (write limit 101); the log is closed normally.
            db.simulate_failed_commit_mmr(101).await.unwrap();

            // Both batches plus the initial and the simulated commit are expected after recovery.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2 * ELEMENTS + 2);
            let recovered_root = db.root();
            assert_ne!(recovered_root, halfway_root);

            // The recovered state is stable across another close and re-open.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2 * ELEMENTS + 2);
            assert_eq!(db.root(), recovered_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Establish a committed baseline: one set followed by a commit.
            db.set(Sha256::fill(1u8), vec![1, 2, 3]).await.unwrap();
            db.commit(None).await.unwrap();
            let first_commit_root = db.root();

            const ELEMENTS: u64 = 1000;

            set_bulk(&mut db, 0..ELEMENTS).await;

            assert_eq!(db.op_count(), ELEMENTS + 3);
            db.sync().await.unwrap();

            // Apply a second batch, then fail the commit on the log side.
            set_bulk(&mut db, 0..ELEMENTS).await;

            db.simulate_failed_commit_log().await.unwrap();

            // With the commit missing from the log, recovery rolls back to the baseline.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.root(), first_commit_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Indices start at 1 so element `i` is written at location `i`
            // (location 0 holds the initial commit).
            set_bulk(&mut db, 1..ELEMENTS + 1).await;

            assert_eq!(db.op_count(), ELEMENTS + 1);

            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // Prune to the midpoint of the log; the operation count is unaffected.
            let midpoint = Location::new_unchecked((ELEMENTS + 2) / 2);
            db.prune(midpoint).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // The retained boundary is expected at ELEMENTS / 2, a multiple of
            // ITEMS_PER_SECTION just below the requested location.
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // The element just below the boundary is gone...
            let pruned_loc = oldest_retained_loc - 1;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // ...while the element at the boundary is still readable.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Root, size and boundary all survive a close and re-open.
            let root = db.root();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            assert_eq!(
                db.oldest_retained_loc(),
                Location::new_unchecked(ELEMENTS / 2)
            );

            // Prune to a location one short of two sections past the boundary.
            let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
            db.prune(loc).await.unwrap();

            // The boundary is expected to advance by exactly one section.
            let expected_boundary = Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION);
            assert_eq!(db.oldest_retained_loc(), expected_boundary);

            // The new boundary also survives a close and re-open.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(oldest_retained_loc, expected_boundary);

            // An element below the boundary is gone...
            let pruned_loc = oldest_retained_loc - 3;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // ...while the element at the boundary is still readable.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // A proof that starts in the pruned region must fail with `ItemPruned`.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db
                .proof(
                    Location::new_unchecked(pruned_pos),
                    NZU64!(pruned_pos + 100),
                )
                .await;
            assert!(matches!(
                proof_result,
                Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos
            ));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_immutable_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // On a fresh db the last commit is at location 0, so pruning to 1 is rejected.
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
            );

            // Two sets, a commit, then one more uncommitted set.
            let k1 = Digest::from(*b"12345678901234567890123456789012");
            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
            let k3 = Digest::from(*b"99999999999999999999999999999999");

            db.set(k1, vec![1u8; 16]).await.unwrap();
            db.set(k2, vec![2u8; 16]).await.unwrap();
            db.commit(None).await.unwrap();
            db.set(k3, vec![3u8; 16]).await.unwrap();

            // The commit landed at location 3 (after the initial commit and the two sets).
            assert_eq!(*db.last_commit_loc, 3);

            // Pruning exactly to the last commit is allowed.
            let last_commit = db.last_commit_loc;
            assert!(db.prune(last_commit).await.is_ok());

            // After another commit, one past the new commit location is rejected again.
            db.commit(None).await.unwrap();
            let new_last_commit = db.last_commit_loc;

            let beyond = new_last_commit + 1;
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }
}