1use crate::{
5 adb::{operation::variable::Operation, Error},
6 index::{Index as _, Unordered as Index},
7 journal::contiguous::variable,
8 mmr::{
9 journaled::{Config as MmrConfig, Mmr},
10 Location, Position, Proof, StandardHasher as Standard,
11 },
12 translator::Translator,
13};
14use commonware_codec::{Codec, Encode as _, Read};
15use commonware_cryptography::Hasher as CHasher;
16use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
17use commonware_utils::{Array, NZUsize};
18use futures::{future::TryFutureExt, pin_mut, try_join, StreamExt};
19use std::num::{NonZeroU64, NonZeroUsize};
20use tracing::{debug, warn};
21
22pub mod sync;
23
/// Size of the read buffer handed to the log's `replay` call while the snapshot is rebuilt
/// (`1 << 16` = 65,536; presumably bytes — confirm against the journal's `replay` API).
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;
28
/// Configuration for an [Immutable] db.
///
/// The `mmr_*` fields are forwarded to the MMR's own configuration and the `log_*` fields to the
/// configuration of the operations log journal.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Forwarded to the MMR as its `journal_partition`.
    pub mmr_journal_partition: String,

    /// Forwarded to the MMR as its `items_per_blob`.
    pub mmr_items_per_blob: NonZeroU64,

    /// Forwarded to the MMR as its `write_buffer`.
    pub mmr_write_buffer: NonZeroUsize,

    /// Forwarded to the MMR as its `metadata_partition`.
    pub mmr_metadata_partition: String,

    /// Partition used by the journal that stores the log of operations.
    pub log_partition: String,

    /// Forwarded to the log journal as its `write_buffer`.
    pub log_write_buffer: NonZeroUsize,

    /// Forwarded to the log journal as its `compression` setting (presumably an optional
    /// compression level, with `None` disabling compression — confirm against the journal docs).
    pub log_compression: Option<u8>,

    /// Codec configuration the log journal uses for its operations.
    pub log_codec_config: C,

    /// Forwarded to the log journal as its `items_per_section`.
    pub log_items_per_section: NonZeroU64,

    /// Translator used to initialize the snapshot index.
    pub translator: T,

    /// Optional thread pool handed to the MMR.
    pub thread_pool: Option<ThreadPool>,

    /// Buffer pool; a clone is given to both the MMR and the log journal.
    pub buffer_pool: PoolRef,
}
68
/// A key-value db whose operations are recorded in an append-only log and authenticated by an
/// MMR that holds one leaf per logged operation.
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec + Send,
    H: CHasher,
    T: Translator,
> {
    /// MMR over the log of operations: the encoding of each operation is added as a leaf, so the
    /// leaf count is kept equal to the number of operations in `log`.
    mmr: Mmr<E, H>,

    /// Log of the operations applied to the db. An operation's position in this journal is its
    /// [Location].
    log: variable::Journal<E, Operation<K, V>>,

    /// Index from (translated) key to the [Location]s of `Set` operations in `log` that may hold
    /// that key.
    snapshot: Index<T, Location>,

    /// Hasher used when updating, syncing, pruning and closing the MMR.
    hasher: Standard<H>,

    /// Location of the most recent commit operation, or `None` if the db holds no operations.
    last_commit: Option<Location>,
}
103
104impl<E: RStorage + Clock + Metrics, K: Array, V: Codec + Send, H: CHasher, T: Translator>
105 Immutable<E, K, V, H, T>
106{
107 pub async fn init(
110 context: E,
111 cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
112 ) -> Result<Self, Error> {
113 let mut hasher = Standard::<H>::new();
114
115 let mut mmr = Mmr::init(
116 context.with_label("mmr"),
117 &mut hasher,
118 MmrConfig {
119 journal_partition: cfg.mmr_journal_partition,
120 metadata_partition: cfg.mmr_metadata_partition,
121 items_per_blob: cfg.mmr_items_per_blob,
122 write_buffer: cfg.mmr_write_buffer,
123 thread_pool: cfg.thread_pool,
124 buffer_pool: cfg.buffer_pool.clone(),
125 },
126 )
127 .await?;
128
129 let mut log = variable::Journal::init(
130 context.with_label("log"),
131 variable::Config {
132 partition: cfg.log_partition.clone(),
133 items_per_section: cfg.log_items_per_section,
134 compression: cfg.log_compression,
135 codec_config: cfg.log_codec_config.clone(),
136 buffer_pool: cfg.buffer_pool.clone(),
137 write_buffer: cfg.log_write_buffer,
138 },
139 )
140 .await?;
141
142 let mut snapshot = Index::init(context.with_label("snapshot"), cfg.translator.clone());
144 let log_size =
145 Self::build_snapshot_from_log(&mut hasher, &mut mmr, &mut log, &mut snapshot).await?;
146
147 let last_commit = log_size.checked_sub(1);
148
149 Ok(Immutable {
150 mmr,
151 log,
152 snapshot,
153 hasher,
154 last_commit,
155 })
156 }
157
158 #[allow(clippy::type_complexity)]
160 pub async fn init_synced(
161 context: E,
162 mut cfg: sync::Config<E, K, V, T, H::Digest, <Operation<K, V> as Read>::Cfg>,
163 ) -> Result<Self, Error> {
164 let mut mmr = Mmr::init_sync(
166 context.with_label("mmr"),
167 crate::mmr::journaled::SyncConfig {
168 config: MmrConfig {
169 journal_partition: cfg.db_config.mmr_journal_partition,
170 metadata_partition: cfg.db_config.mmr_metadata_partition,
171 items_per_blob: cfg.db_config.mmr_items_per_blob,
172 write_buffer: cfg.db_config.mmr_write_buffer,
173 thread_pool: cfg.db_config.thread_pool.clone(),
174 buffer_pool: cfg.db_config.buffer_pool.clone(),
175 },
176 range: Position::try_from(cfg.range.start)?
177 ..Position::try_from(cfg.range.end.saturating_add(1))?,
178 pinned_nodes: cfg.pinned_nodes,
179 },
180 )
181 .await?;
182
183 let mut snapshot = Index::init(
185 context.with_label("snapshot"),
186 cfg.db_config.translator.clone(),
187 );
188 let log_size = Self::build_snapshot_from_log(
189 &mut Standard::<H>::new(),
190 &mut mmr,
191 &mut cfg.log,
192 &mut snapshot,
193 )
194 .await?;
195
196 let last_commit = log_size.checked_sub(1);
197
198 let mut db = Immutable {
199 mmr,
200 log: cfg.log,
201 snapshot,
202 hasher: Standard::<H>::new(),
203 last_commit,
204 };
205
206 db.sync().await?;
207 Ok(db)
208 }
209
/// Builds the snapshot by replaying `log` from its oldest retained operation, while also:
///  - adding log operations to the MMR if they are missing from it,
///  - rewinding the log over any operations that follow the last commit,
///  - popping any MMR leaves that no longer have a matching log operation.
///
/// Returns the number of operations in the log once it has been reconciled.
///
/// # Panics
///
/// Panics if the log contains an operation other than `Set` or `Commit`, or if the log size and
/// the MMR size disagree once reconciliation is done.
pub(super) async fn build_snapshot_from_log(
    hasher: &mut Standard<H>,
    mmr: &mut Mmr<E, H>,
    log: &mut variable::Journal<E, Operation<K, V>>,
    snapshot: &mut Index<T, Location>,
) -> Result<Location, Error> {
    // Number of leaves currently in the MMR.
    let mut mmr_leaves = mmr.leaves();

    // Start of the log: the oldest retained position, or the log size if nothing is retained.
    let start_loc = match log.oldest_retained_pos() {
        Some(loc) => loc,
        None => log.size(),
    };

    // Number of operations in the log, updated as the replay advances.
    let mut log_size = Location::new_unchecked(start_loc);
    // Location of the first operation that follows the last commit seen so far.
    let mut after_last_commit = None;
    // Set operations seen since the last commit, in order of their locations.
    let mut uncommitted_ops = Vec::new();

    // Replay the log. The scope ends the stream's borrow of `log` before the rewind below.
    {
        let stream = log
            .replay(start_loc, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
            .await?;
        pin_mut!(stream);
        while let Some(result) = stream.next().await {
            let (loc, op) = result?;

            // Location of the current operation.
            let loc = Location::new_unchecked(loc);
            if after_last_commit.is_none() {
                after_last_commit = Some(loc);
            }

            log_size = loc + 1;

            // The MMR is behind the log: re-add the missing operation as a leaf.
            if log_size > mmr_leaves {
                debug!(?loc, "operation was missing from MMR");
                mmr.add(hasher, &op.encode()).await?;
                mmr_leaves += 1;
            }
            match op {
                Operation::Set(key, _) => {
                    // Hold the key back until a commit confirms it.
                    uncommitted_ops.push((key, loc));
                }
                Operation::Commit(_) => {
                    // Everything gathered since the previous commit is now committed.
                    for (key, loc) in uncommitted_ops.iter() {
                        snapshot.insert(key, *loc);
                    }
                    uncommitted_ops.clear();
                    after_last_commit = None;
                }
                _ => {
                    unreachable!("unsupported operation at location {loc}");
                }
            }
        }
    }

    // Rewind the log if it ends with operations that were never committed.
    if let Some(end_loc) = after_last_commit {
        assert!(!uncommitted_ops.is_empty());
        warn!(
            op_count = uncommitted_ops.len(),
            log_size = *end_loc,
            "rewinding over uncommitted operations at end of log"
        );
        log.rewind(*end_loc).await.map_err(Error::Journal)?;
        log.sync().await.map_err(Error::Journal)?;
        log_size = end_loc;
    }

    // Pop any MMR leaves that are ahead of the (possibly rewound) log.
    if mmr_leaves > log_size {
        let op_count = (*mmr_leaves - *log_size) as usize;
        warn!(op_count, "popping uncommitted MMR operations");
        mmr.pop(op_count).await?;
    }

    // Confirm the post-condition: log and MMR hold the same number of operations.
    assert_eq!(log_size, Location::try_from(mmr.size()).unwrap());
    assert_eq!(log_size, log.size());

    Ok(log_size)
}
308
309 pub fn oldest_retained_loc(&self) -> Option<Location> {
311 self.log.oldest_retained_pos().map(Location::new_unchecked)
312 }
313
314 pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
321 let last_commit = self.last_commit.unwrap_or(Location::new_unchecked(0));
322 if loc > last_commit {
323 return Err(Error::PruneBeyondCommit(loc, last_commit));
324 }
325
326 self.log.prune(*loc).await?;
332
333 let pruning_boundary = match self.oldest_retained_loc() {
335 Some(loc) => loc,
336 None => self.op_count(),
337 };
338
339 self.mmr
341 .prune_to_pos(&mut self.hasher, Position::try_from(pruning_boundary)?)
342 .await?;
343 Ok(())
344 }
345
346 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
349 let oldest = self
350 .oldest_retained_loc()
351 .unwrap_or(Location::new_unchecked(0));
352 let iter = self.snapshot.get(key);
353 for &loc in iter {
354 if loc < oldest {
355 continue;
356 }
357 if let Some(v) = self.get_from_loc(key, loc).await? {
358 return Ok(Some(v));
359 }
360 }
361
362 Ok(None)
363 }
364
365 pub async fn get_loc(&self, loc: Location) -> Result<Option<V>, Error> {
372 let op_count = self.op_count();
373 if loc >= op_count {
374 return Err(Error::LocationOutOfBounds(loc, op_count));
375 }
376 let pruning_boundary = match self.oldest_retained_loc() {
377 Some(oldest) => oldest,
378 None => self.op_count(),
379 };
380 if loc < pruning_boundary {
381 return Err(Error::OperationPruned(loc));
382 }
383
384 let op = self.log.read(*loc).await?;
385 Ok(op.into_value())
386 }
387
388 pub async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
392 let pruning_boundary = match self.oldest_retained_loc() {
393 Some(oldest) => oldest,
394 None => self.op_count(),
395 };
396 if loc < pruning_boundary {
397 return Err(Error::OperationPruned(loc));
398 }
399
400 let Operation::Set(k, v) = self.log.read(*loc).await? else {
401 return Err(Error::UnexpectedData(loc));
402 };
403
404 if k != *key {
405 Ok(None)
406 } else {
407 Ok(Some(v))
408 }
409 }
410
411 pub fn op_count(&self) -> Location {
414 Location::new_unchecked(self.log.size())
415 }
416
417 pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
424 let op_count = self.op_count();
425 let oldest = self
426 .oldest_retained_loc()
427 .unwrap_or(Location::new_unchecked(0));
428 self.snapshot
429 .insert_and_prune(&key, op_count, |v| *v < oldest);
430
431 let op = Operation::Set(key, value);
432 self.apply_op(op).await
433 }
434
/// Returns the root digest of the db's MMR, computed with the caller-supplied `hasher`.
pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
    self.mmr.root(hasher)
}
443
444 pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
447 let encoded_op = op.encode();
448
449 let mmr_fut = async {
451 self.mmr.add_batched(&mut self.hasher, &encoded_op).await?;
452 Ok::<(), Error>(())
453 };
454
455 let log_fut = async {
457 self.log.append(op).await?;
458 Ok::<(), Error>(())
459 };
460
461 try_join!(mmr_fut, log_fut)?;
463
464 Ok(())
465 }
466
467 pub async fn proof(
478 &self,
479 start_index: Location,
480 max_ops: NonZeroU64,
481 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
482 let op_count = self.op_count();
483 self.historical_proof(op_count, start_index, max_ops).await
484 }
485
486 pub async fn historical_proof(
497 &self,
498 op_count: Location,
499 start_loc: Location,
500 max_ops: NonZeroU64,
501 ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
502 let current_op_count = self.op_count();
503 if op_count > current_op_count {
504 return Err(crate::mmr::Error::RangeOutOfBounds(op_count).into());
505 }
506 if start_loc >= op_count {
507 return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
508 }
509 let pruning_boundary = match self.oldest_retained_loc() {
510 Some(oldest) => oldest,
511 None => self.op_count(),
512 };
513 if start_loc < pruning_boundary {
514 return Err(Error::OperationPruned(start_loc));
515 }
516
517 let mmr_size = Position::try_from(op_count)?;
518 let end_loc = std::cmp::min(op_count, start_loc.saturating_add(max_ops.get()));
519 let proof = self
520 .mmr
521 .historical_range_proof(mmr_size, start_loc..end_loc)
522 .await?;
523 let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
524 for loc in *start_loc..*end_loc {
525 let op = self.log.read(loc).await?;
526 ops.push(op);
527 }
528
529 Ok((proof, ops))
530 }
531
532 pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
538 self.last_commit = Some(self.op_count());
539 let op = Operation::<K, V>::Commit(metadata);
540 let encoded_op = op.encode();
541
542 let mmr_fut = async {
544 self.mmr.add_batched(&mut self.hasher, &encoded_op).await?;
545 self.mmr.merkleize(&mut self.hasher);
546 Ok::<(), Error>(())
547 };
548
549 let log_fut = async {
551 self.log.append(op).await?;
552 self.log.sync_data().await?;
553 Ok::<(), Error>(())
554 };
555
556 try_join!(mmr_fut, log_fut)?;
558
559 Ok(())
560 }
561
562 pub async fn get_metadata(&self) -> Result<Option<(Location, Option<V>)>, Error> {
565 let Some(last_commit) = self.last_commit else {
566 return Ok(None);
567 };
568 let Operation::Commit(metadata) = self.log.read(*last_commit).await? else {
569 unreachable!("no commit operation at location of last commit {last_commit}");
570 };
571
572 Ok(Some((last_commit, metadata)))
573 }
574
575 pub(super) async fn sync(&mut self) -> Result<(), Error> {
579 try_join!(
580 self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
581 self.log.sync().map_err(Error::Journal),
582 )?;
583
584 Ok(())
585 }
586
587 pub async fn close(mut self) -> Result<(), Error> {
589 try_join!(
590 self.log.close().map_err(Error::Journal),
591 self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
592 )?;
593
594 Ok(())
595 }
596
597 pub async fn destroy(self) -> Result<(), Error> {
599 try_join!(
600 self.log.destroy().map_err(Error::Journal),
601 self.mmr.destroy().map_err(Error::Mmr),
602 )?;
603
604 Ok(())
605 }
606
/// Test helper: simulates a commit where the log is fully written and closed but the MMR is
/// only partially synced (bounded by `write_limit`), so that reopening must repair the MMR from
/// the log.
#[cfg(test)]
pub async fn simulate_failed_commit_mmr(mut self, write_limit: usize) -> Result<(), Error>
where
    V: Default,
{
    // Apply the commit operation to both the MMR and the log.
    self.apply_op(Operation::Commit(None)).await?;
    // The log is closed cleanly...
    self.log.close().await?;
    // ...while the MMR only gets a partial sync.
    self.mmr
        .simulate_partial_sync(&mut self.hasher, write_limit)
        .await?;

    Ok(())
}
622
/// Test helper: simulates a commit where the MMR is closed with the commit operation included
/// but the log loses it again, so that reopening must roll both back to the previous commit.
#[cfg(test)]
pub async fn simulate_failed_commit_log(mut self) -> Result<(), Error>
where
    V: Default,
{
    let commit_op = Operation::Commit(None);
    self.apply_op(commit_op).await?;
    let size_with_commit = self.log.size();

    // The MMR keeps the commit operation...
    self.mmr.close(&mut self.hasher).await?;
    // ...but the log is rewound by one operation, dropping it.
    if let Some(size_without_commit) = size_with_commit.checked_sub(1) {
        self.log.rewind(size_without_commit).await?;
    }
    self.log.close().await?;

    Ok(())
}
642}
643
644#[cfg(test)]
645pub(super) mod test {
646 use super::*;
647 use crate::{adb::verify_proof, mmr::mem::Mmr as MemMmr, translator::TwoCap};
648 use commonware_cryptography::{sha256::Digest, Sha256};
649 use commonware_macros::test_traced;
650 use commonware_runtime::{
651 deterministic::{self},
652 Runner as _,
653 };
654 use commonware_utils::{NZUsize, NZU64};
655
// First argument to `PoolRef::new` when building the test buffer pool.
const PAGE_SIZE: usize = 77;
// Second argument to `PoolRef::new` when building the test buffer pool.
const PAGE_CACHE_SIZE: usize = 9;
// Number of items per log section; the pruning test relies on this to predict boundaries.
const ITEMS_PER_SECTION: u64 = 5;
659
660 pub(crate) fn db_config(
661 suffix: &str,
662 ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
663 Config {
664 mmr_journal_partition: format!("journal_{suffix}"),
665 mmr_metadata_partition: format!("metadata_{suffix}"),
666 mmr_items_per_blob: NZU64!(11),
667 mmr_write_buffer: NZUsize!(1024),
668 log_partition: format!("log_{suffix}"),
669 log_items_per_section: NZU64!(ITEMS_PER_SECTION),
670 log_compression: None,
671 log_codec_config: ((0..=10000).into(), ()),
672 log_write_buffer: NZUsize!(1024),
673 translator: TwoCap,
674 thread_pool: None,
675 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
676 }
677 }
678
/// The db type used by these tests: `Digest` keys, `Vec<u8>` values, SHA-256 hashing and the
/// `TwoCap` translator, running on the deterministic runtime.
type ImmutableTest = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
681
682 async fn open_db(context: deterministic::Context) -> ImmutableTest {
684 ImmutableTest::init(context, db_config("partition"))
685 .await
686 .unwrap()
687 }
688
/// Checks the state of an empty db, that an uncommitted operation is discarded on reopen, and
/// that a commit on an otherwise empty db survives a close/reopen cycle.
#[test_traced("WARN")]
pub fn test_immutable_db_empty() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;
        let mut hasher = Standard::<Sha256>::new();
        // A fresh db has no operations and the same root as an empty in-memory MMR.
        assert_eq!(db.op_count(), 0);
        assert_eq!(db.oldest_retained_loc(), None);
        assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));
        assert!(db.get_metadata().await.unwrap().is_none());

        // Closing and reopening after an uncommitted set must return to the empty state.
        let k1 = Sha256::fill(1u8);
        let v1 = vec![4, 5, 6, 7];
        let root = db.root(&mut hasher);
        db.set(k1, v1).await.unwrap();
        db.close().await.unwrap();
        let mut db = open_db(context.clone()).await;
        assert_eq!(db.root(&mut hasher), root);
        assert_eq!(db.op_count(), 0);

        // Committing on an empty db adds exactly one operation (the commit itself).
        db.commit(None).await.unwrap();
        assert_eq!(db.op_count(), 1);
        let root = db.root(&mut hasher);
        db.close().await.unwrap();

        // The committed state must survive a reopen.
        let db = open_db(context.clone()).await;
        assert_eq!(db.root(&mut hasher), root);

        db.destroy().await.unwrap();
    });
}
722
/// Builds a small db with two keys and two commits, checking reads, operation counts and
/// commit metadata after every step, then checks that an uncommitted third key is dropped on
/// reopen.
#[test_traced("DEBUG")]
pub fn test_immutable_db_build_basic() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut hasher = Standard::<Sha256>::new();
        let mut db = open_db(context.clone()).await;

        let k1 = Sha256::fill(1u8);
        let k2 = Sha256::fill(2u8);
        let v1 = vec![1, 2, 3];
        let v2 = vec![4, 5, 6, 7, 8];

        // Neither key is present yet.
        assert!(db.get(&k1).await.unwrap().is_none());
        assert!(db.get(&k2).await.unwrap().is_none());

        // Set the first key; it is readable before any commit.
        db.set(k1, v1.clone()).await.unwrap();
        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
        assert!(db.get(&k2).await.unwrap().is_none());
        assert_eq!(db.op_count(), 1);
        // Commit with metadata; the commit operation lands at location 1.
        let metadata = Some(vec![99, 100]);
        db.commit(metadata.clone()).await.unwrap();
        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
        assert!(db.get(&k2).await.unwrap().is_none());
        assert_eq!(db.op_count(), 2);
        assert_eq!(
            db.get_metadata().await.unwrap(),
            Some((Location::new_unchecked(1), metadata.clone()))
        );
        // Set the second key.
        db.set(k2, v2.clone()).await.unwrap();
        assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
        assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
        assert_eq!(db.op_count(), 3);

        // Without a new commit the metadata is unchanged.
        assert_eq!(
            db.get_metadata().await.unwrap(),
            Some((Location::new_unchecked(1), metadata))
        );

        // Commit without metadata; the commit operation lands at location 3.
        db.commit(None).await.unwrap();
        assert_eq!(db.op_count(), 4);
        assert_eq!(
            db.get_metadata().await.unwrap(),
            Some((Location::new_unchecked(3), None))
        );

        // Capture the committed root.
        let root = db.root(&mut hasher);

        // Set a third key but close the db without committing it.
        let k3 = Sha256::fill(3u8);
        let v3 = vec![9, 10, 11];
        db.set(k3, v3).await.unwrap();
        assert_eq!(db.op_count(), 5);
        assert_ne!(db.root(&mut hasher), root);

        // After reopening, the uncommitted key is gone and the state matches the last commit.
        db.close().await.unwrap();
        let db = open_db(context.clone()).await;
        assert!(db.get(&k3).await.unwrap().is_none());
        assert_eq!(db.op_count(), 4);
        assert_eq!(db.root(&mut hasher), root);
        assert_eq!(
            db.get_metadata().await.unwrap(),
            Some((Location::new_unchecked(3), None))
        );

        // Clean up.
        db.destroy().await.unwrap();
    });
}
799
/// Builds a db with `ELEMENTS` keys, then checks that it reopens to the same root, that every
/// key is readable, and that a proof generated from every start location verifies against the
/// root.
#[test_traced("WARN")]
pub fn test_immutable_db_build_and_authenticate() {
    let executor = deterministic::Runner::default();
    // Number of key-value pairs to insert.
    const ELEMENTS: u64 = 2_000;
    executor.start(|context| async move {
        let mut hasher = Standard::<Sha256>::new();
        let mut db = open_db(context.clone()).await;

        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = vec![i as u8; 100];
            db.set(k, v).await.unwrap();
        }

        assert_eq!(db.op_count(), ELEMENTS);

        // The commit adds one more operation.
        db.commit(None).await.unwrap();
        assert_eq!(db.op_count(), ELEMENTS + 1);

        // Close and reopen; the root and operation count must be unchanged.
        let root = db.root(&mut hasher);
        db.close().await.unwrap();
        let db = open_db(context.clone()).await;
        assert_eq!(root, db.root(&mut hasher));
        assert_eq!(db.op_count(), ELEMENTS + 1);
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = vec![i as u8; 100];
            assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
        }

        // Every start location must yield a proof (of at most `max_ops` operations) that
        // verifies against the root.
        let max_ops = NZU64!(5);
        for i in 0..*db.op_count() {
            let (proof, log) = db.proof(Location::new_unchecked(i), max_ops).await.unwrap();
            assert!(verify_proof(
                &mut hasher,
                &proof,
                Location::new_unchecked(i),
                &log,
                &root
            ));
        }

        db.destroy().await.unwrap();
    });
}
849
/// Checks that the db recovers when a commit reaches the log but the MMR is only partially
/// synced: reopening must bring the MMR back in line with the log.
#[test_traced("WARN")]
pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // Number of set operations in each of the two batches.
        const ELEMENTS: u64 = 1000;
        let mut hasher = Standard::<Sha256>::new();
        let mut db = open_db(context.clone()).await;

        // First batch: insert, then sync.
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = vec![i as u8; 100];
            db.set(k, v).await.unwrap();
        }

        assert_eq!(db.op_count(), ELEMENTS);
        db.sync().await.unwrap();
        let halfway_root = db.root(&mut hasher);

        // Second batch: the same keys and values are set again.
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = vec![i as u8; 100];
            db.set(k, v).await.unwrap();
        }

        // Simulate a commit whose MMR sync is cut short (write limit of 101).
        db.simulate_failed_commit_mmr(101).await.unwrap();

        // Reopening must recover all operations: 1000 + 1000 sets plus the commit.
        let db = open_db(context.clone()).await;
        assert_eq!(db.op_count(), 2001);
        let root = db.root(&mut hasher);
        assert_ne!(root, halfway_root);

        // A clean close and reopen must leave the recovered state unchanged.
        db.close().await.unwrap();
        let db = open_db(context.clone()).await;
        assert_eq!(db.op_count(), 2001);
        assert_eq!(db.root(&mut hasher), root);

        db.destroy().await.unwrap();
    });
}
894
/// Checks that the db rolls back to the previous commit when the final commit operation reaches
/// the MMR but is missing from the log.
#[test_traced("WARN")]
pub fn test_immutable_db_recovery_from_failed_log_sync() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut hasher = Standard::<Sha256>::new();
        let mut db = open_db(context.clone()).await;

        // Insert a single key and commit, so there is a commit point to roll back to.
        let k1 = Sha256::fill(1u8);
        let v1 = vec![1, 2, 3];
        db.set(k1, v1).await.unwrap();
        db.commit(None).await.unwrap();
        let first_commit_root = db.root(&mut hasher);

        // Number of set operations in each of the two batches below.
        const ELEMENTS: u64 = 1000;

        // First batch: insert, then sync (without committing).
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = vec![i as u8; 100];
            db.set(k, v).await.unwrap();
        }

        assert_eq!(db.op_count(), ELEMENTS + 2);
        db.sync().await.unwrap();

        // Second batch: the same keys and values are set again.
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = vec![i as u8; 100];
            db.set(k, v).await.unwrap();
        }

        // Simulate a commit whose commit operation never makes it into the log.
        db.simulate_failed_commit_log().await.unwrap();

        // Reopening must roll back to the first commit: one set plus one commit.
        let db = open_db(context.clone()).await;
        assert_eq!(db.op_count(), 2);
        let root = db.root(&mut hasher);
        assert_eq!(root, first_commit_root);

        db.destroy().await.unwrap();
    });
}
940
/// Exercises pruning: at a section boundary, at a non-boundary location, across close/reopen,
/// and the effect on key lookups and proofs.
#[test_traced("WARN")]
pub fn test_immutable_db_pruning() {
    let executor = deterministic::Runner::default();
    // Number of key-value pairs to insert; key `i` is written at location `i`.
    const ELEMENTS: u64 = 2_000;
    executor.start(|context| async move {
        let mut hasher = Standard::<Sha256>::new();
        let mut db = open_db(context.clone()).await;

        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = vec![i as u8; 100];
            db.set(k, v).await.unwrap();
        }

        assert_eq!(db.op_count(), ELEMENTS);

        db.commit(None).await.unwrap();
        assert_eq!(db.op_count(), ELEMENTS + 1);

        // Prune the first half of the operations; the operation count is unaffected.
        db.prune(Location::new_unchecked(ELEMENTS / 2))
            .await
            .unwrap();
        assert_eq!(db.op_count(), ELEMENTS + 1);

        // ELEMENTS / 2 is a multiple of ITEMS_PER_SECTION, so the actual pruning boundary
        // matches the requested one.
        let oldest_retained_loc = db.oldest_retained_loc().unwrap();
        assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

        // The key written just before the boundary can no longer be read.
        let pruned_loc = oldest_retained_loc - 1;
        let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
        assert!(db.get(&pruned_key).await.unwrap().is_none());

        // The key written at the boundary is still readable.
        let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
        assert!(db.get(&unpruned_key).await.unwrap().is_some());

        // Close and reopen; root, operation count and pruning boundary must be unchanged.
        let root = db.root(&mut hasher);
        db.close().await.unwrap();
        let mut db = open_db(context.clone()).await;
        assert_eq!(root, db.root(&mut hasher));
        assert_eq!(db.op_count(), ELEMENTS + 1);
        let oldest_retained_loc = db.oldest_retained_loc().unwrap();
        assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

        // Prune to a location that is not on a section boundary.
        let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
        db.prune(loc).await.unwrap();
        // Only whole sections are pruned, so the boundary advances by a single section.
        let oldest_retained_loc = db.oldest_retained_loc().unwrap();
        assert_eq!(
            oldest_retained_loc,
            Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
        );

        // The new boundary must survive a close/reopen.
        db.close().await.unwrap();
        let db = open_db(context.clone()).await;
        let oldest_retained_loc = db.oldest_retained_loc().unwrap();
        assert_eq!(
            oldest_retained_loc,
            Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
        );

        // A key written inside the pruned section can no longer be read.
        let pruned_loc = oldest_retained_loc - 3;
        let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
        assert!(db.get(&pruned_key).await.unwrap().is_none());

        // The key written at the new boundary is still readable.
        let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
        assert!(db.get(&unpruned_key).await.unwrap().is_some());

        // Requesting a proof that starts at a pruned location must fail.
        let pruned_pos = ELEMENTS / 2;
        let proof_result = db
            .proof(
                Location::new_unchecked(pruned_pos),
                NZU64!(pruned_pos + 100),
            )
            .await;
        assert!(matches!(proof_result, Err(Error::OperationPruned(pos)) if pos == pruned_pos));

        db.destroy().await.unwrap();
    });
}
1031
/// Checks that `get_loc` returns `LocationOutOfBounds` for locations at or beyond the operation
/// count, and the stored value for valid locations.
#[test_traced("INFO")]
pub fn test_immutable_db_get_loc_out_of_bounds() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;

        // On an empty db even location 0 is out of bounds.
        let result = db.get_loc(Location::new_unchecked(0)).await;
        assert!(matches!(result, Err(Error::LocationOutOfBounds(loc, size))
                if loc == Location::new_unchecked(0) && size == Location::new_unchecked(0)));

        // Add two sets and a commit: three operations at locations 0, 1 and 2.
        let k1 = Digest::from(*b"12345678901234567890123456789012");
        let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
        let v1 = vec![1u8; 16];
        let v2 = vec![2u8; 16];

        db.set(k1, v1.clone()).await.unwrap();
        db.set(k2, v2.clone()).await.unwrap();
        db.commit(None).await.unwrap();

        // Valid locations return the values that were set there.
        assert_eq!(
            db.get_loc(Location::new_unchecked(0))
                .await
                .unwrap()
                .unwrap(),
            v1
        );
        assert_eq!(
            db.get_loc(Location::new_unchecked(1))
                .await
                .unwrap()
                .unwrap(),
            v2
        );

        // Location 3 equals the operation count, so it is out of bounds.
        let result = db.get_loc(Location::new_unchecked(3)).await;
        assert!(matches!(result, Err(Error::LocationOutOfBounds(loc, size))
                if loc == Location::new_unchecked(3) && size == Location::new_unchecked(3)));

        db.destroy().await.unwrap();
    });
}
1077
/// Checks that `prune` rejects locations beyond the last commit with `PruneBeyondCommit`, and
/// accepts the last commit location itself.
#[test_traced("INFO")]
pub fn test_immutable_db_prune_beyond_commit() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let mut db = open_db(context.clone()).await;

        // On an empty db the last commit is treated as location 0, so pruning to 1 fails.
        let result = db.prune(Location::new_unchecked(1)).await;
        assert!(
            matches!(result, Err(Error::PruneBeyondCommit(prune_loc, commit_loc))
                if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
        );

        // Add two sets and a commit, then one more uncommitted set.
        let k1 = Digest::from(*b"12345678901234567890123456789012");
        let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
        let k3 = Digest::from(*b"99999999999999999999999999999999");
        let v1 = vec![1u8; 16];
        let v2 = vec![2u8; 16];
        let v3 = vec![3u8; 16];

        db.set(k1, v1.clone()).await.unwrap();
        db.set(k2, v2.clone()).await.unwrap();
        db.commit(None).await.unwrap();
        db.set(k3, v3.clone()).await.unwrap();

        // The commit operation sits at location 2.
        let last_commit = db.last_commit.unwrap();
        assert_eq!(last_commit, Location::new_unchecked(2));

        // Pruning up to the last commit is allowed.
        assert!(db.prune(last_commit).await.is_ok());

        // Commit again to move the commit point forward.
        db.commit(None).await.unwrap();
        let new_last_commit = db.last_commit.unwrap();

        // Pruning one past the new commit point must fail.
        let beyond = Location::new_unchecked(*new_last_commit + 1);
        let result = db.prune(beyond).await;
        assert!(
            matches!(result, Err(Error::PruneBeyondCommit(prune_loc, commit_loc))
                if prune_loc == beyond && commit_loc == new_last_commit)
        );

        db.destroy().await.unwrap();
    });
}
1126}