use crate::{
    index::{unordered::Index, Unordered as _},
    journal::{
        authenticated,
        contiguous::variable::{self, Config as JournalConfig},
    },
    kv,
    mmr::{
        journaled::{Config as MmrConfig, Mmr},
        Location, Position, Proof, StandardHasher as Standard,
    },
    qmdb::{
        any::VariableValue, build_snapshot_from_log, DurabilityState, Durable, Error,
        MerkleizationState, Merkleized, NonDurable, Unmerkleized,
    },
    translator::Translator,
};
use commonware_codec::Read;
use commonware_cryptography::{DigestOf, Hasher as CHasher};
use commonware_parallel::ThreadPool;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::Array;
use std::{
    num::{NonZeroU64, NonZeroUsize},
    ops::Range,
};
use tracing::warn;

mod operation;
pub use operation::Operation;

type Journal<E, K, V, H, S> =
    authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H, S>;

pub mod sync;

/// Configuration for initializing an [Immutable] store.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the storage partition backing the MMR's journal.
    pub mmr_journal_partition: String,

    /// The maximum number of items to store in each blob of the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use with the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the storage partition backing the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the storage partition backing the operation log.
    pub log_partition: String,

    /// The size of the write buffer to use with the operation log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level to apply to log items, or `None` to disable compression.
    pub log_compression: Option<u8>,

    /// The codec configuration to use when encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to store in each section of the operation log.
    pub log_items_per_section: NonZeroU64,

    /// The translator used by the in-memory snapshot index.
    pub translator: T,

    /// An optional thread pool for parallelizing MMR maintenance.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool shared by the MMR journal and the operation log.
    pub buffer_pool: PoolRef,
}

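/// An authenticated key-value store backed by an append-only log of [Operation]s, with an MMR
/// over the log for generating inclusion proofs.
///
/// Keys are resolved through an in-memory snapshot [Index] that maps each (translated) key to the
/// locations of the operations that may have set it. The `M` and `D` type parameters track, at
/// the type level, whether the store's root is currently merkleized and whether its state is
/// durable.
///
/// A minimal sketch of the typestate lifecycle (mirroring the tests below; `context`, `config`,
/// `key`, and `value` are assumed to be supplied by the caller):
///
/// ```ignore
/// // Open (or create) a durable, merkleized store.
/// let db = Immutable::<_, Digest, Vec<u8>, Sha256, TwoCap>::init(context, config).await?;
///
/// // Enter the mutable (unmerkleized, non-durable) state to append operations.
/// let mut db = db.into_mutable();
/// db.set(key, value).await?;
///
/// // Commit persists the appended operations and returns the range of locations they occupy.
/// let (db, _range) = db.commit(None).await?;
///
/// // Re-merkleize to expose the authenticated root and proof generation.
/// let db = db.into_merkleized();
/// let _root = db.root();
/// ```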
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
    M: MerkleizationState<DigestOf<H>> + Send + Sync = Merkleized<H>,
    D: DurabilityState = Durable,
> {
    /// The authenticated journal of all operations applied to the store.
    journal: Journal<E, K, V, H, M>,

    /// Maps each translated key to the locations of the operations that may have set it.
    snapshot: Index<T, Location>,

    /// The location of the most recent commit operation.
    last_commit_loc: Location,

    /// Marker for the store's durability state.
    _durable: core::marker::PhantomData<D>,
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        M: MerkleizationState<DigestOf<H>> + Send + Sync,
        D: DurabilityState,
    > Immutable<E, K, V, H, T, M, D>
{
    /// Returns the location of the oldest operation that has not been pruned.
    pub fn oldest_retained_loc(&self) -> Location {
        self.journal
            .oldest_retained_loc()
            .expect("at least one operation should exist")
    }

    /// Gets the value of `key`, or `None` if it was never set or its operation has been pruned.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        let oldest = self.oldest_retained_loc();
        let iter = self.snapshot.get(key);
        for &loc in iter {
            if loc < oldest {
                continue;
            }
            if let Some(v) = self.get_from_loc(key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Returns the value set by the operation at `loc` if it sets `key`, or `None` if it sets a
    /// different key (e.g. a translated-key collision).
    async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
        if loc < self.oldest_retained_loc() {
            return Err(Error::OperationPruned(loc));
        }

        let Operation::Set(k, v) = self.journal.read(loc).await? else {
            return Err(Error::UnexpectedData(loc));
        };

        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Returns the total number of operations applied to the store, including pruned ones.
    pub fn op_count(&self) -> Location {
        self.journal.size()
    }

    /// Returns the metadata recorded by the most recent commit, if any.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let last_commit_loc = self.last_commit_loc;
        let Operation::Commit(metadata) = self.journal.read(last_commit_loc).await? else {
            unreachable!("no commit operation at location of last commit {last_commit_loc}");
        };

        Ok(metadata)
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        D: DurabilityState,
    > Immutable<E, K, V, H, T, Merkleized<H>, D>
{
    /// Returns the root digest that authenticates the store's operation log.
    pub const fn root(&self) -> H::Digest {
        self.journal.root()
    }

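    /// Generates an inclusion proof for up to `max_ops` operations starting at `start_index`,
    /// along with the operations themselves, against the current root.
    ///
    /// A minimal verification sketch (following the pattern in the tests below; `db`, `hasher`,
    /// and `start` are assumed to be in scope):
    ///
    /// ```ignore
    /// let root = db.root();
    /// let (proof, ops) = db.proof(start, NZU64!(5)).await?;
    /// assert!(crate::qmdb::verify_proof(&mut hasher, &proof, start, &ops, &root));
    /// ```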
    pub async fn proof(
        &self,
        start_index: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        let op_count = self.op_count();
        self.historical_proof(op_count, start_index, max_ops).await
    }

    /// Generates an inclusion proof for up to `max_ops` operations starting at `start_loc`,
    /// against the root the store had when it contained exactly `op_count` operations.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

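    /// Prunes operations before `loc`, which must not exceed the location of the last commit
    /// (otherwise [Error::PruneBeyondMinRequired] is returned).
    ///
    /// Pruning is performed at section boundaries of the underlying log, so the oldest retained
    /// location afterwards may be earlier than `loc`. A sketch of the expected behavior (based on
    /// the pruning test below; `db` and `loc` are assumed to be in scope):
    ///
    /// ```ignore
    /// db.prune(loc).await?;
    /// assert!(db.oldest_retained_loc() <= loc);
    /// ```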
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Merkleized<H>, Durable>
{
    /// Initializes the store from the given config, replaying the log to rebuild the in-memory
    /// snapshot. If no log exists, a new one is created with an initial commit operation.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            buffer_pool: cfg.buffer_pool.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            buffer_pool: cfg.buffer_pool.clone(),
            write_buffer: cfg.log_write_buffer,
        };

        let mut journal = Journal::new(
            context.clone(),
            mmr_cfg,
            journal_cfg,
            Operation::<K, V>::is_commit,
        )
        .await?;

        if journal.size() == 0 {
            warn!("Authenticated log is empty, initialized new db.");
            journal.append(Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());

        // Replay the retained portion of the log to rebuild the snapshot index.
        let start_loc = journal.pruning_boundary();

        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
            _durable: core::marker::PhantomData,
        })
    }

    /// The batch size to use when applying operations during [Self::init_synced].
    const APPLY_BATCH_SIZE: u64 = 1 << 16;

    /// Initializes the store from the components assembled by the [sync] module, rather than
    /// from locally persisted state.
    #[allow(clippy::type_complexity)]
    pub async fn init_synced(
        context: E,
        cfg: sync::Config<E, K, V, T, H::Digest, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut hasher = Standard::new();

        let mmr = Mmr::init_sync(
            context.with_label("mmr"),
            crate::mmr::journaled::SyncConfig {
                config: MmrConfig {
                    journal_partition: cfg.db_config.mmr_journal_partition,
                    metadata_partition: cfg.db_config.mmr_metadata_partition,
                    items_per_blob: cfg.db_config.mmr_items_per_blob,
                    write_buffer: cfg.db_config.mmr_write_buffer,
                    thread_pool: cfg.db_config.thread_pool.clone(),
                    buffer_pool: cfg.db_config.buffer_pool.clone(),
                },
                range: Position::try_from(cfg.range.start)?
                    ..Position::try_from(cfg.range.end.saturating_add(1))?,
                pinned_nodes: cfg.pinned_nodes,
            },
            &mut hasher,
        )
        .await?;

        let journal = Journal::<_, _, _, _, Merkleized<H>>::from_components(
            mmr,
            cfg.log,
            hasher,
            Self::APPLY_BATCH_SIZE,
        )
        .await?;

        let mut snapshot: Index<T, Location> = Index::new(
            context.with_label("snapshot"),
            cfg.db_config.translator.clone(),
        );

        // Replay the retained portion of the log to rebuild the snapshot index.
        let start_loc = journal.pruning_boundary();

        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        let mut db = Self {
            journal,
            snapshot,
            last_commit_loc,
            _durable: core::marker::PhantomData,
        };

        db.sync().await?;
        Ok(db)
    }

    /// Syncs all pending writes to durable storage.
    pub async fn sync(&mut self) -> Result<(), Error> {
        Ok(self.journal.sync().await?)
    }

    /// Destroys the store, removing all of its underlying storage.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Converts the store into its mutable state, in which new operations can be appended.
    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
        Immutable {
            journal: self.journal.into_dirty(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Unmerkleized, Durable>
{
    /// Converts the store into its mutable state, in which new operations can be appended.
    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
        Immutable {
            journal: self.journal,
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }

    /// Merkleizes the store, making its root and proofs available.
    pub fn into_merkleized(self) -> Immutable<E, K, V, H, T, Merkleized<H>, Durable> {
        Immutable {
            journal: self.journal.merkleize(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Merkleized<H>, NonDurable>
{
    /// Converts the store into its mutable state, in which new operations can be appended.
    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
        Immutable {
            journal: self.journal.into_dirty(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Unmerkleized, NonDurable>
{
    /// Appends the given operation to the log.
    pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
        self.journal.append(op).await?;

        Ok(())
    }

    /// Sets `key` to `value`. Snapshot entries that reference already-pruned operations are
    /// discarded along the way; the new operation is not durable until the store is committed.
    pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
        let op_count = self.op_count();
        let oldest = self.oldest_retained_loc();
        self.snapshot
            .insert_and_prune(&key, op_count, |v| *v < oldest);

        let op = Operation::Set(key, value);
        self.apply_op(op).await
    }

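    /// Commits all pending operations by appending a commit operation (carrying optional
    /// `metadata`) and syncing the log, returning the now-durable store along with the location
    /// range of the commit operation.
    ///
    /// A minimal usage sketch (mirroring the tests below; `db` is assumed to be in the mutable
    /// state and `metadata` in scope):
    ///
    /// ```ignore
    /// let (db, range) = db.commit(Some(metadata)).await?;
    /// assert_eq!(range.end, db.op_count());
    /// let db = db.into_merkleized(); // re-merkleize to read the new root
    /// let _root = db.root();
    /// ```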
    pub async fn commit(
        mut self,
        metadata: Option<V>,
    ) -> Result<
        (
            Immutable<E, K, V, H, T, Unmerkleized, Durable>,
            Range<Location>,
        ),
        Error,
    > {
        let loc = self.journal.append(Operation::Commit(metadata)).await?;
        self.journal.commit().await?;
        self.last_commit_loc = loc;
        let range = loc..self.op_count();

        let db = Immutable {
            journal: self.journal,
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        };

        Ok((db, range))
    }

    /// Merkleizes the store, making its root and proofs available.
    pub fn into_merkleized(self) -> Immutable<E, K, V, H, T, Merkleized<H>, NonDurable> {
        Immutable {
            journal: self.journal.merkleize(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        M: MerkleizationState<DigestOf<H>> + Send + Sync,
        D: DurabilityState,
    > kv::Gettable for Immutable<E, K, V, H, T, M, D>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        M: MerkleizationState<DigestOf<H>> + Send + Sync,
        D: DurabilityState,
    > crate::qmdb::store::LogStore for Immutable<E, K, V, H, T, M, D>
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.journal.pruning_boundary()
    }

    fn is_empty(&self) -> bool {
        self.op_count() == 0
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        D: DurabilityState,
    > crate::qmdb::store::MerkleizedStore for Immutable<E, K, V, H, T, Merkleized<H>, D>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;

    fn root(&self) -> Self::Digest {
        self.root()
    }

    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.historical_proof(historical_size, start_loc, max_ops)
            .await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        D: DurabilityState,
    > crate::qmdb::store::PrunableStore for Immutable<E, K, V, H, T, Merkleized<H>, D>
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}

#[cfg(test)]
pub(super) mod test {
    use super::*;
    use crate::{qmdb::verify_proof, translator::TwoCap};
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self},
        Runner as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
    const ITEMS_PER_SECTION: u64 = 5;

    /// Returns a test [Config] whose partition names are suffixed with `suffix`.
    pub(crate) fn db_config(
        suffix: &str,
    ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log_{suffix}"),
            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_write_buffer: NZUsize!(1024),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Opens (or creates) a test database in the given deterministic context.
    async fn open_db(
        context: deterministic::Context,
    ) -> Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap> {
        Immutable::init(context, db_config("partition"))
            .await
            .unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Set a key without committing, then reopen: the uncommitted operation should be
            // discarded.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root();
            let mut db = db.into_mutable();
            db.set(k1, v1).await.unwrap();
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.op_count(), 1);

            // Commit (with no metadata) and confirm the commit persists across a reopen.
            let db = db.into_mutable();
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.op_count(), 2);
            let root = db.root();
            drop(db);

            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            let mut db = db.into_mutable();
            db.set(k1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 2);

            // Commit the first key along with some metadata.
            let metadata = Some(vec![99, 100]);
            let (durable_db, _) = db.commit(metadata.clone()).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());

            let mut db = db.into_mutable();
            db.set(k2, v2.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.op_count(), 4);

            // Metadata from the previous commit should still be readable before the next commit.
            assert_eq!(db.get_metadata().await.unwrap(), metadata);

            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            let root = db.root();

            // Set another key without committing, then reopen: the uncommitted operation should
            // be discarded.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            let mut db = db.into_mutable();
            db.set(k3, v3).await.unwrap();
            assert_eq!(db.op_count(), 6);

            drop(db);
            let db = open_db(context.clone()).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.root(), root);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let db = open_db(context.clone()).await;
            let mut db = db.into_mutable();

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1);

            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            let root = db.root();
            drop(db);

            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Generate and verify a proof over a range of operations starting at each location.
            let max_ops = NZU64!(5);
            for i in 0..*db.op_count() {
                let (proof, log) = db.proof(Location::new_unchecked(i), max_ops).await.unwrap();
                assert!(verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new_unchecked(i),
                    &log,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const ELEMENTS: u64 = 1000;
            let db = open_db(context.clone()).await;
            let mut db = db.into_mutable();

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1);
            let (durable_db, _) = db.commit(None).await.unwrap();
            let mut db = durable_db.into_merkleized();
            db.sync().await.unwrap();
            let halfway_root = db.root();

            let mut db = db.into_mutable();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Commit, then drop the db before the MMR is merkleized and synced. On restart, the
            // db should recover to the committed state.
            let (durable_db, _) = db.commit(None).await.unwrap();
            drop(durable_db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2003);
            let root = db.root();
            assert_ne!(root, halfway_root);

            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2003);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await.into_mutable();

            // Commit a single key so there is a durable state to recover to.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![1, 2, 3];
            db.set(k1, v1).await.unwrap();
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            let first_commit_root = db.root();

            const ELEMENTS: u64 = 1000;

            let mut db = db.into_mutable();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 3);

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Drop the db without committing; the uncommitted operations should be discarded and
            // the db should recover to the state of the first commit.
            drop(db);

            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 3);
            let root = db.root();
            assert_eq!(root, first_commit_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            let mut db = db.into_mutable();

            for i in 1u64..ELEMENTS + 1 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1);

            let (durable_db, _) = db.commit(None).await.unwrap();
            let mut db = durable_db.into_merkleized();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // Prune the first half of the log. Pruning does not change the op count.
            db.prune(Location::new_unchecked((ELEMENTS + 2) / 2))
                .await
                .unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // Pruning is aligned to section boundaries, so confirm the expected boundary.
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // A key set by a pruned operation should no longer be retrievable.
            let pruned_loc = oldest_retained_loc - 1;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // A key set at the oldest retained location should still be retrievable.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Confirm the pruned state survives a restart.
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);

            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Prune to a location that is not on a section boundary; pruning should stop at the
            // previous boundary.
            let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
            db.prune(loc).await.unwrap();
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.clone()).await;
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            let pruned_loc = oldest_retained_loc - 3;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Requesting a proof that starts at a pruned location should fail.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db
                .proof(
                    Location::new_unchecked(pruned_pos),
                    NZU64!(pruned_pos + 100),
                )
                .await;
            assert!(matches!(
                proof_result,
                Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos
            ));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_immutable_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Pruning beyond the last commit (location 0 on a fresh db) should fail.
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
            );

            let k1 = Digest::from(*b"12345678901234567890123456789012");
            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
            let k3 = Digest::from(*b"99999999999999999999999999999999");
            let v1 = vec![1u8; 16];
            let v2 = vec![2u8; 16];
            let v3 = vec![3u8; 16];

            let mut db = db.into_mutable();
            db.set(k1, v1.clone()).await.unwrap();
            db.set(k2, v2.clone()).await.unwrap();
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            let mut db = db.into_mutable();
            db.set(k3, v3.clone()).await.unwrap();

            // The commit above is the operation at location 3.
            assert_eq!(*db.last_commit_loc, 3);

            // After committing again, pruning up to the previous commit should succeed.
            let (durable_db, _) = db.commit(None).await.unwrap();
            let mut db = durable_db.into_merkleized();
            assert!(db.prune(Location::new_unchecked(3)).await.is_ok());

            // Pruning beyond the new last commit should still fail.
            let new_last_commit = db.last_commit_loc;
            let beyond = new_last_commit + 1;
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }

    use crate::{
        kv::tests::{assert_gettable, assert_send},
        qmdb::store::tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
    };

    type MerkleizedDb =
        Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap, Merkleized<Sha256>>;
    type MutableDb = Immutable<
        deterministic::Context,
        Digest,
        Vec<u8>,
        Sha256,
        TwoCap,
        Unmerkleized,
        NonDurable,
    >;

    /// Statically asserts that the futures returned by a merkleized db's methods are `Send`.
    #[allow(dead_code)]
    fn assert_merkleized_db_futures_are_send(db: &mut MerkleizedDb, key: Digest, loc: Location) {
        assert_gettable(db, &key);
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_merkleized_store(db, loc);
        assert_send(db.sync());
    }

    /// Statically asserts that the futures returned by a mutable db's methods are `Send`.
    #[allow(dead_code)]
    fn assert_mutable_db_futures_are_send(db: &mut MutableDb, key: Digest, value: Vec<u8>) {
        assert_gettable(db, &key);
        assert_log_store(db);
        assert_send(db.set(key, value));
    }

    /// Statically asserts that the future returned by `commit` is `Send`.
    #[allow(dead_code)]
    fn assert_mutable_db_commit_is_send(db: MutableDb) {
        assert_send(db.commit(None));
    }
}