1use crate::{
5 adb::{any::fixed::sync::init_journal, Error},
6 index::Index,
7 journal::{fixed, variable},
8 mmr::{
9 hasher::Standard,
10 iterator::{leaf_num_to_pos, leaf_pos_to_num},
11 journaled::{Config as MmrConfig, Mmr},
12 verification::Proof,
13 },
14 store::operation::Variable,
15 translator::Translator,
16};
17use commonware_codec::{Codec, Encode as _, Read};
18use commonware_cryptography::Hasher as CHasher;
19use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
20use commonware_utils::{Array, NZUsize};
21use futures::{future::TryFutureExt, pin_mut, try_join, StreamExt};
22use std::num::{NonZeroU64, NonZeroUsize};
23use tracing::{debug, warn};
24
25pub mod sync;
26
27const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;
31
32#[derive(Clone)]
34pub struct Config<T: Translator, C> {
35 pub mmr_journal_partition: String,
37
38 pub mmr_items_per_blob: NonZeroU64,
40
41 pub mmr_write_buffer: NonZeroUsize,
43
44 pub mmr_metadata_partition: String,
46
47 pub log_journal_partition: String,
49
50 pub log_write_buffer: NonZeroUsize,
52
53 pub log_compression: Option<u8>,
55
56 pub log_codec_config: C,
58
59 pub log_items_per_section: NonZeroU64,
61
62 pub locations_journal_partition: String,
64
65 pub locations_items_per_blob: NonZeroU64,
67
68 pub translator: T,
70
71 pub thread_pool: Option<ThreadPool>,
73
74 pub buffer_pool: PoolRef,
76}
77
78pub struct Immutable<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translator> {
81 mmr: Mmr<E, H>,
88
89 log: variable::Journal<E, Variable<K, V>>,
93
94 log_size: u64,
97
98 log_items_per_section: u64,
100
101 locations: fixed::Journal<E, u32>,
104
105 oldest_retained_loc: u64,
107
108 snapshot: Index<T, u64>,
114
115 hasher: Standard<H>,
117
118 last_commit: Option<u64>,
120}
121
122impl<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translator>
123 Immutable<E, K, V, H, T>
124{
125 pub async fn init(
128 context: E,
129 cfg: Config<T, <Variable<K, V> as Read>::Cfg>,
130 ) -> Result<Self, Error> {
131 let mut hasher = Standard::<H>::new();
132
133 let mut mmr = Mmr::init(
134 context.with_label("mmr"),
135 &mut hasher,
136 MmrConfig {
137 journal_partition: cfg.mmr_journal_partition,
138 metadata_partition: cfg.mmr_metadata_partition,
139 items_per_blob: cfg.mmr_items_per_blob,
140 write_buffer: cfg.mmr_write_buffer,
141 thread_pool: cfg.thread_pool,
142 buffer_pool: cfg.buffer_pool.clone(),
143 },
144 )
145 .await?;
146
147 let mut log = variable::Journal::init(
148 context.with_label("log"),
149 variable::Config {
150 partition: cfg.log_journal_partition,
151 compression: cfg.log_compression,
152 codec_config: cfg.log_codec_config,
153 buffer_pool: cfg.buffer_pool.clone(),
154 write_buffer: cfg.log_write_buffer,
155 },
156 )
157 .await?;
158
159 let mut locations = fixed::Journal::init(
160 context.with_label("locations"),
161 fixed::Config {
162 partition: cfg.locations_journal_partition,
163 items_per_blob: cfg.locations_items_per_blob,
164 write_buffer: cfg.log_write_buffer,
165 buffer_pool: cfg.buffer_pool,
166 },
167 )
168 .await?;
169
170 let mut snapshot: Index<T, u64> =
171 Index::init(context.with_label("snapshot"), cfg.translator.clone());
172 let (log_size, oldest_retained_loc) = Self::build_snapshot_from_log(
173 &mut hasher,
174 cfg.log_items_per_section,
175 &mut mmr,
176 &mut log,
177 &mut locations,
178 &mut snapshot,
179 )
180 .await?;
181
182 let last_commit = log_size.checked_sub(1);
183
184 Ok(Immutable {
185 mmr,
186 log,
187 log_size,
188 oldest_retained_loc,
189 locations,
190 log_items_per_section: cfg.log_items_per_section.get(),
191 snapshot,
192 hasher,
193 last_commit,
194 })
195 }
196
197 #[allow(clippy::type_complexity)]
199 pub async fn init_synced(
200 context: E,
201 mut cfg: sync::Config<E, K, V, T, H::Digest, <Variable<K, V> as Read>::Cfg>,
202 ) -> Result<Self, Error> {
203 let mut mmr = Mmr::init_sync(
205 context.with_label("mmr"),
206 crate::mmr::journaled::SyncConfig {
207 config: MmrConfig {
208 journal_partition: cfg.db_config.mmr_journal_partition,
209 metadata_partition: cfg.db_config.mmr_metadata_partition,
210 items_per_blob: cfg.db_config.mmr_items_per_blob,
211 write_buffer: cfg.db_config.mmr_write_buffer,
212 thread_pool: cfg.db_config.thread_pool.clone(),
213 buffer_pool: cfg.db_config.buffer_pool.clone(),
214 },
215 lower_bound: leaf_num_to_pos(cfg.lower_bound),
216 upper_bound: leaf_num_to_pos(cfg.upper_bound + 1) - 1,
217 pinned_nodes: cfg.pinned_nodes,
218 },
219 )
220 .await
221 .map_err(Error::Mmr)?;
222
223 let mut locations = init_journal(
225 context.with_label("locations"),
226 fixed::Config {
227 partition: cfg.db_config.locations_journal_partition,
228 items_per_blob: cfg.db_config.locations_items_per_blob,
229 write_buffer: cfg.db_config.log_write_buffer,
230 buffer_pool: cfg.db_config.buffer_pool.clone(),
231 },
232 cfg.lower_bound,
233 cfg.upper_bound,
234 )
235 .await?;
236
237 let mut snapshot = Index::init(
239 context.with_label("snapshot"),
240 cfg.db_config.translator.clone(),
241 );
242 let (log_size, oldest_retained_loc) = Self::build_snapshot_from_log(
243 &mut Standard::<H>::new(),
244 cfg.db_config.log_items_per_section,
245 &mut mmr,
246 &mut cfg.log,
247 &mut locations,
248 &mut snapshot,
249 )
250 .await?;
251
252 let last_commit = log_size.checked_sub(1);
253
254 let mut db = Immutable {
255 mmr,
256 log: cfg.log,
257 log_size,
258 oldest_retained_loc,
259 locations,
260 log_items_per_section: cfg.db_config.log_items_per_section.get(),
261 snapshot,
262 hasher: Standard::<H>::new(),
263 last_commit,
264 };
265
266 db.sync().await?;
267 Ok(db)
268 }
269
270 pub(super) async fn build_snapshot_from_log(
283 hasher: &mut Standard<H>,
284 log_items_per_section: NonZeroU64,
285 mmr: &mut Mmr<E, H>,
286 log: &mut variable::Journal<E, Variable<K, V>>,
287 locations: &mut fixed::Journal<E, u32>,
288 snapshot: &mut Index<T, u64>,
289 ) -> Result<(u64, u64), Error> {
290 let mut mmr_leaves = super::align_mmr_and_locations(mmr, locations).await?;
292
293 let mut log_size = 0;
295 let mut after_last_commit = None;
297 let mut uncommitted_ops = Vec::new();
299 let mut oldest_retained_loc = None;
300
301 {
305 let stream = log
306 .replay(0, 0, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
307 .await?;
308 pin_mut!(stream);
309 while let Some(result) = stream.next().await {
310 match result {
311 Err(e) => {
312 return Err(Error::Journal(e));
313 }
314 Ok((section, offset, _, op)) => {
315 if oldest_retained_loc.is_none() {
316 log_size = section * log_items_per_section.get();
317 oldest_retained_loc = Some(log_size);
318 }
319
320 let loc = log_size; if after_last_commit.is_none() {
322 after_last_commit = Some((loc, offset));
323 }
324
325 log_size += 1;
326
327 let expected = loc / log_items_per_section.get();
330 assert_eq!(section, expected,
331 "section {section} did not match expected session {expected} from location {loc}");
332
333 if log_size > mmr_leaves {
334 debug!(
335 section,
336 offset, "operation was missing from MMR/location map"
337 );
338 mmr.add(hasher, &op.encode()).await?;
339 locations.append(offset).await?;
340 mmr_leaves += 1;
341 }
342 match op {
343 Variable::Set(key, _) => {
344 uncommitted_ops.push((key, loc));
345 }
346 Variable::Commit(_) => {
347 for (key, loc) in uncommitted_ops.iter() {
348 snapshot.insert(key, *loc);
349 }
350 uncommitted_ops.clear();
351 after_last_commit = None;
352 }
353 _ => {
354 unreachable!(
355 "unsupported operation at offset {offset} in section {section}"
356 );
357 }
358 }
359 }
360 }
361 }
362 }
363
364 if let Some((end_loc, end_offset)) = after_last_commit {
366 assert!(!uncommitted_ops.is_empty());
367 warn!(
368 op_count = uncommitted_ops.len(),
369 log_size = end_loc,
370 end_offset,
371 "rewinding over uncommitted operations at end of log"
372 );
373 let prune_to_section = end_loc / log_items_per_section.get();
374 log.rewind_to_offset(prune_to_section, end_offset).await?;
375 log.sync(prune_to_section).await?;
376 log_size = end_loc;
377 }
378
379 if mmr_leaves > log_size {
381 locations.rewind(log_size).await?;
382 locations.sync().await?;
383
384 let op_count = mmr_leaves - log_size;
385 warn!(op_count, "popping uncommitted MMR operations");
386 mmr.pop(op_count as usize).await?;
387 }
388
389 assert_eq!(log_size, leaf_pos_to_num(mmr.size()).unwrap());
391 assert_eq!(log_size, locations.size().await?);
392
393 Ok((log_size, oldest_retained_loc.unwrap_or(0)))
394 }
395
396 fn current_section(&self) -> u64 {
398 self.log_size / self.log_items_per_section
399 }
400
401 pub fn oldest_retained_loc(&self) -> Option<u64> {
403 if self.log_size == 0 {
404 None
405 } else {
406 Some(self.oldest_retained_loc)
407 }
408 }
409
410 pub async fn prune(&mut self, loc: u64) -> Result<(), Error> {
417 assert!(loc <= self.last_commit.unwrap_or(0));
418
419 let section = loc / self.log_items_per_section;
425 self.log.prune(section).await?;
426 self.oldest_retained_loc = section * self.log_items_per_section;
427
428 self.locations.prune(self.oldest_retained_loc).await?;
430 self.mmr
431 .prune_to_pos(&mut self.hasher, leaf_num_to_pos(self.oldest_retained_loc))
432 .await
433 .map_err(Error::Mmr)
434 }
435
436 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
439 let iter = self.snapshot.get(key);
440 for &loc in iter {
441 if loc < self.oldest_retained_loc {
442 continue;
443 }
444 if let Some(v) = self.get_from_loc(key, loc).await? {
445 return Ok(Some(v));
446 }
447 }
448
449 Ok(None)
450 }
451
452 pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
455 assert!(loc < self.op_count());
456 if loc < self.oldest_retained_loc {
457 return Err(Error::OperationPruned(loc));
458 }
459
460 let offset = self.locations.read(loc).await?;
461 let section = loc / self.log_items_per_section;
462 let op = self.log.get(section, offset).await?;
463
464 Ok(op.into_value())
465 }
466
467 pub async fn get_from_loc(&self, key: &K, loc: u64) -> Result<Option<V>, Error> {
471 if loc < self.oldest_retained_loc {
472 return Err(Error::OperationPruned(loc));
473 }
474
475 match self.locations.read(loc).await {
476 Ok(offset) => {
477 return self.get_from_offset(key, loc, offset).await;
478 }
479 Err(e) => Err(Error::Journal(e)),
480 }
481 }
482
483 async fn get_from_offset(&self, key: &K, loc: u64, offset: u32) -> Result<Option<V>, Error> {
487 if loc < self.oldest_retained_loc {
488 return Err(Error::OperationPruned(loc));
489 }
490
491 let section = loc / self.log_items_per_section;
492 let Variable::Set(k, v) = self.log.get(section, offset).await? else {
493 panic!("didn't find Set operation at location {loc} and offset {offset}");
494 };
495
496 if k != *key {
497 Ok(None)
498 } else {
499 Ok(Some(v))
500 }
501 }
502
503 pub fn op_count(&self) -> u64 {
506 self.log_size
507 }
508
509 pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
516 let loc = self.log_size;
517 self.snapshot
518 .insert_and_prune(&key, loc, |v| *v < self.oldest_retained_loc);
519
520 let op = Variable::Set(key, value);
521 self.apply_op(op).await
522 }
523
524 pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
530 self.mmr.root(hasher)
531 }
532
533 pub(super) async fn apply_op(&mut self, op: Variable<K, V>) -> Result<(), Error> {
536 let section = self.current_section();
537 let encoded_op = op.encode();
538
539 let mmr_fut = async {
541 self.mmr.add_batched(&mut self.hasher, &encoded_op).await?;
542 Ok::<(), Error>(())
543 };
544
545 let log_fut = async {
548 let (offset, _) = self.log.append(section, op).await?;
549 self.locations.append(offset).await?;
550 Ok::<(), Error>(())
551 };
552
553 try_join!(mmr_fut, log_fut)?;
555 self.log_size += 1;
556
557 if section != self.current_section() {
559 self.log.sync(section).await?;
560 }
561
562 Ok(())
563 }
564
565 pub async fn proof(
576 &self,
577 start_index: u64,
578 max_ops: NonZeroU64,
579 ) -> Result<(Proof<H::Digest>, Vec<Variable<K, V>>), Error> {
580 self.historical_proof(self.op_count(), start_index, max_ops)
581 .await
582 }
583
584 pub async fn historical_proof(
586 &self,
587 size: u64,
588 start_loc: u64,
589 max_ops: NonZeroU64,
590 ) -> Result<(Proof<H::Digest>, Vec<Variable<K, V>>), Error> {
591 if start_loc < self.oldest_retained_loc {
592 return Err(Error::OperationPruned(start_loc));
593 }
594
595 let start_pos = leaf_num_to_pos(start_loc);
596 let end_loc = std::cmp::min(size - 1, start_loc + max_ops.get() - 1);
597 let end_pos = leaf_num_to_pos(end_loc);
598 let mmr_size = leaf_num_to_pos(size);
599
600 let proof = self
601 .mmr
602 .historical_range_proof(mmr_size, start_pos, end_pos)
603 .await?;
604 let mut ops = Vec::with_capacity((end_loc - start_loc + 1) as usize);
605 for loc in start_loc..=end_loc {
606 let section = loc / self.log_items_per_section;
607 let offset = self.locations.read(loc).await?;
608 let op = self.log.get(section, offset).await?;
609 ops.push(op);
610 }
611
612 Ok((proof, ops))
613 }
614
615 pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
621 self.last_commit = Some(self.log_size);
622 let op = Variable::<K, V>::Commit(metadata);
623 let encoded_op = op.encode();
624 let section = self.current_section();
625
626 let mmr_fut = async {
628 self.mmr.add_batched(&mut self.hasher, &encoded_op).await?;
629 self.mmr.process_updates(&mut self.hasher);
630 Ok::<(), Error>(())
631 };
632
633 let log_fut = async {
636 let (offset, _) = self.log.append(section, op).await?;
637 try_join!(
639 self.log.sync(section).map_err(Error::Journal),
640 self.locations.append(offset).map_err(Error::Journal),
641 )?;
642 Ok::<(), Error>(())
643 };
644
645 try_join!(mmr_fut, log_fut)?;
647 self.log_size += 1;
648
649 Ok(())
650 }
651
652 pub async fn get_metadata(&self) -> Result<Option<(u64, Option<V>)>, Error> {
655 let Some(last_commit) = self.last_commit else {
656 return Ok(None);
657 };
658 let section = last_commit / self.log_items_per_section;
659 let offset = self.locations.read(last_commit).await?;
660 let Variable::Commit(metadata) = self.log.get(section, offset).await? else {
661 unreachable!("no commit operation at location of last commit {last_commit}");
662 };
663
664 Ok(Some((last_commit, metadata)))
665 }
666
667 pub(super) async fn sync(&mut self) -> Result<(), Error> {
671 let section = self.current_section();
672 try_join!(
673 self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
674 self.log.sync(section).map_err(Error::Journal),
675 self.locations.sync().map_err(Error::Journal),
676 )?;
677
678 Ok(())
679 }
680
681 pub async fn close(mut self) -> Result<(), Error> {
683 try_join!(
684 self.log.close().map_err(Error::Journal),
685 self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
686 self.locations.close().map_err(Error::Journal),
687 )?;
688
689 Ok(())
690 }
691
692 pub async fn destroy(self) -> Result<(), Error> {
694 try_join!(
695 self.log.destroy().map_err(Error::Journal),
696 self.mmr.destroy().map_err(Error::Mmr),
697 self.locations.destroy().map_err(Error::Journal),
698 )?;
699
700 Ok(())
701 }
702
703 #[cfg(test)]
706 pub async fn simulate_failed_commit_mmr(mut self, write_limit: usize) -> Result<(), Error>
707 where
708 V: Default,
709 {
710 self.apply_op(Variable::Commit(None)).await?;
711 self.log.close().await?;
712 self.locations.close().await?;
713 self.mmr
714 .simulate_partial_sync(&mut self.hasher, write_limit)
715 .await?;
716
717 Ok(())
718 }
719
720 #[cfg(test)]
723 pub async fn simulate_failed_commit_log(mut self) -> Result<(), Error>
724 where
725 V: Default,
726 {
727 self.apply_op(Variable::Commit(None)).await?;
728 let mut section = self.current_section();
729
730 self.mmr.close(&mut self.hasher).await?;
731 let mut size = self.log.size(section).await?;
733 if size == 0 {
734 section -= 1;
735 size = self.log.size(section).await?;
736 }
737 self.log.rewind(section, size - 1).await?;
738 self.log.close().await?;
739
740 Ok(())
741 }
742
743 #[cfg(test)]
746 pub async fn simulate_failed_commit_locations(
747 mut self,
748 operations_to_trim: u64,
749 ) -> Result<(), Error>
750 where
751 V: Default,
752 {
753 self.apply_op(Variable::Commit(None)).await?;
754 let op_count = self.op_count();
755 assert!(op_count >= operations_to_trim);
756
757 self.log.close().await?;
758 self.mmr.close(&mut self.hasher).await?;
759 self.locations.rewind(op_count - operations_to_trim).await?;
760 self.locations.close().await?;
761
762 Ok(())
763 }
764}
765
766#[cfg(test)]
767pub(super) mod test {
768 use super::*;
769 use crate::{adb::verify_proof, mmr::mem::Mmr as MemMmr, translator::TwoCap};
770 use commonware_cryptography::{sha256::Digest, Sha256};
771 use commonware_macros::test_traced;
772 use commonware_runtime::{
773 deterministic::{self},
774 Runner as _,
775 };
776 use commonware_utils::{NZUsize, NZU64};
777
778 const PAGE_SIZE: usize = 77;
779 const PAGE_CACHE_SIZE: usize = 9;
780 const ITEMS_PER_SECTION: u64 = 5;
781
782 pub(crate) fn db_config(suffix: &str) -> Config<TwoCap, (commonware_codec::RangeCfg, ())> {
783 Config {
784 mmr_journal_partition: format!("journal_{suffix}"),
785 mmr_metadata_partition: format!("metadata_{suffix}"),
786 mmr_items_per_blob: NZU64!(11),
787 mmr_write_buffer: NZUsize!(1024),
788 log_journal_partition: format!("log_journal_{suffix}"),
789 log_items_per_section: NZU64!(ITEMS_PER_SECTION),
790 log_compression: None,
791 log_codec_config: ((0..=10000).into(), ()),
792 log_write_buffer: NZUsize!(1024),
793 locations_journal_partition: format!("locations_journal_{suffix}"),
794 locations_items_per_blob: NZU64!(7),
795 translator: TwoCap,
796 thread_pool: None,
797 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
798 }
799 }
800
801 type ImmutableTest = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
803
804 async fn open_db(context: deterministic::Context) -> ImmutableTest {
806 ImmutableTest::init(context, db_config("partition"))
807 .await
808 .unwrap()
809 }
810
811 #[test_traced("WARN")]
812 pub fn test_immutable_db_empty() {
813 let executor = deterministic::Runner::default();
814 executor.start(|context| async move {
815 let mut db = open_db(context.clone()).await;
816 let mut hasher = Standard::<Sha256>::new();
817 assert_eq!(db.op_count(), 0);
818 assert_eq!(db.oldest_retained_loc(), None);
819 assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));
820 assert!(db.get_metadata().await.unwrap().is_none());
821
822 let k1 = Sha256::fill(1u8);
824 let v1 = vec![4, 5, 6, 7];
825 let root = db.root(&mut hasher);
826 db.set(k1, v1).await.unwrap();
827 db.close().await.unwrap();
828 let mut db = open_db(context.clone()).await;
829 assert_eq!(db.root(&mut hasher), root);
830 assert_eq!(db.op_count(), 0);
831
832 db.commit(None).await.unwrap();
834 assert_eq!(db.op_count(), 1); let root = db.root(&mut hasher);
836 db.close().await.unwrap();
837
838 let db = open_db(context.clone()).await;
839 assert_eq!(db.root(&mut hasher), root);
840
841 db.destroy().await.unwrap();
842 });
843 }
844
845 #[test_traced("DEBUG")]
846 pub fn test_immutable_db_build_basic() {
847 let executor = deterministic::Runner::default();
848 executor.start(|context| async move {
849 let mut hasher = Standard::<Sha256>::new();
851 let mut db = open_db(context.clone()).await;
852
853 let k1 = Sha256::fill(1u8);
854 let k2 = Sha256::fill(2u8);
855 let v1 = vec![1, 2, 3];
856 let v2 = vec![4, 5, 6, 7, 8];
857
858 assert!(db.get(&k1).await.unwrap().is_none());
859 assert!(db.get(&k2).await.unwrap().is_none());
860
861 db.set(k1, v1.clone()).await.unwrap();
863 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
864 assert!(db.get(&k2).await.unwrap().is_none());
865 assert_eq!(db.op_count(), 1);
866 let metadata = Some(vec![99, 100]);
868 db.commit(metadata.clone()).await.unwrap();
869 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
870 assert!(db.get(&k2).await.unwrap().is_none());
871 assert_eq!(db.op_count(), 2);
872 assert_eq!(
873 db.get_metadata().await.unwrap(),
874 Some((1, metadata.clone()))
875 );
876 db.set(k2, v2.clone()).await.unwrap();
878 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
879 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
880 assert_eq!(db.op_count(), 3);
881
882 assert_eq!(db.get_metadata().await.unwrap(), Some((1, metadata)));
884
885 db.commit(None).await.unwrap();
887 assert_eq!(db.op_count(), 4);
888 assert_eq!(db.get_metadata().await.unwrap(), Some((3, None)));
889
890 let root = db.root(&mut hasher);
892
893 let k3 = Sha256::fill(3u8);
895 let v3 = vec![9, 10, 11];
896 db.set(k3, v3).await.unwrap();
897 assert_eq!(db.op_count(), 5);
898 assert_ne!(db.root(&mut hasher), root);
899
900 db.close().await.unwrap();
902 let db = open_db(context.clone()).await;
903 assert!(db.get(&k3).await.unwrap().is_none());
904 assert_eq!(db.op_count(), 4);
905 assert_eq!(db.root(&mut hasher), root);
906 assert_eq!(db.get_metadata().await.unwrap(), Some((3, None)));
907
908 db.destroy().await.unwrap();
910 });
911 }
912
913 #[test_traced("WARN")]
914 pub fn test_immutable_db_build_and_authenticate() {
915 let executor = deterministic::Runner::default();
916 const ELEMENTS: u64 = 2_000;
918 executor.start(|context| async move {
919 let mut hasher = Standard::<Sha256>::new();
920 let mut db = open_db(context.clone()).await;
921
922 for i in 0u64..ELEMENTS {
923 let k = Sha256::hash(&i.to_be_bytes());
924 let v = vec![i as u8; 100];
925 db.set(k, v).await.unwrap();
926 }
927
928 assert_eq!(db.op_count(), ELEMENTS);
929
930 db.commit(None).await.unwrap();
931 assert_eq!(db.op_count(), ELEMENTS + 1);
932
933 let root = db.root(&mut hasher);
935 db.close().await.unwrap();
936 let db = open_db(context.clone()).await;
937 assert_eq!(root, db.root(&mut hasher));
938 assert_eq!(db.op_count(), ELEMENTS + 1);
939 for i in 0u64..ELEMENTS {
940 let k = Sha256::hash(&i.to_be_bytes());
941 let v = vec![i as u8; 100];
942 assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
943 }
944
945 let max_ops = NZU64!(5);
948 for i in 0..db.op_count() {
949 let (proof, log) = db.proof(i, max_ops).await.unwrap();
950 assert!(verify_proof(&mut hasher, &proof, i, &log, &root));
951 }
952
953 db.destroy().await.unwrap();
954 });
955 }
956
957 #[test_traced("WARN")]
958 pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
959 let executor = deterministic::Runner::default();
960 executor.start(|context| async move {
961 const ELEMENTS: u64 = 1000;
963 let mut hasher = Standard::<Sha256>::new();
964 let mut db = open_db(context.clone()).await;
965
966 for i in 0u64..ELEMENTS {
967 let k = Sha256::hash(&i.to_be_bytes());
968 let v = vec![i as u8; 100];
969 db.set(k, v).await.unwrap();
970 }
971
972 assert_eq!(db.op_count(), ELEMENTS);
973 db.sync().await.unwrap();
974 let halfway_root = db.root(&mut hasher);
975
976 for i in 0u64..ELEMENTS {
978 let k = Sha256::hash(&i.to_be_bytes());
979 let v = vec![i as u8; 100];
980 db.set(k, v).await.unwrap();
981 }
982
983 db.simulate_failed_commit_mmr(101).await.unwrap();
985
986 let db = open_db(context.clone()).await;
988 assert_eq!(db.op_count(), 2001);
989 let root = db.root(&mut hasher);
990 assert_ne!(root, halfway_root);
991
992 db.close().await.unwrap();
994 let db = open_db(context.clone()).await;
995 assert_eq!(db.op_count(), 2001);
996 assert_eq!(db.root(&mut hasher), root);
997
998 db.destroy().await.unwrap();
999 });
1000 }
1001
1002 #[test_traced("WARN")]
1003 pub fn test_immutable_db_recovery_from_failed_locations_sync() {
1004 let executor = deterministic::Runner::default();
1005 executor.start(|context| async move {
1006 const ELEMENTS: u64 = 1000;
1008 let mut hasher = Standard::<Sha256>::new();
1009 let mut db = open_db(context.clone()).await;
1010
1011 for i in 0u64..ELEMENTS {
1012 let k = Sha256::hash(&i.to_be_bytes());
1013 let v = vec![i as u8; 100];
1014 db.set(k, v).await.unwrap();
1015 }
1016
1017 assert_eq!(db.op_count(), ELEMENTS);
1018 db.sync().await.unwrap();
1019 let halfway_root = db.root(&mut hasher);
1020
1021 for i in 0u64..ELEMENTS {
1023 let k = Sha256::hash(&i.to_be_bytes());
1024 let v = vec![i as u8; 100];
1025 db.set(k, v).await.unwrap();
1026 }
1027
1028 db.simulate_failed_commit_locations(101).await.unwrap();
1030
1031 let db = open_db(context.clone()).await;
1033 assert_eq!(db.op_count(), 2001);
1034 let root = db.root(&mut hasher);
1035 assert_ne!(root, halfway_root);
1036
1037 db.destroy().await.unwrap();
1038 });
1039 }
1040
1041 #[test_traced("WARN")]
1042 pub fn test_immutable_db_recovery_from_failed_log_sync() {
1043 let executor = deterministic::Runner::default();
1044 executor.start(|context| async move {
1045 let mut hasher = Standard::<Sha256>::new();
1046 let mut db = open_db(context.clone()).await;
1047
1048 let k1 = Sha256::fill(1u8);
1050 let v1 = vec![1, 2, 3];
1051 db.set(k1, v1).await.unwrap();
1052 db.commit(None).await.unwrap();
1053 let first_commit_root = db.root(&mut hasher);
1054
1055 const ELEMENTS: u64 = 1000;
1057
1058 for i in 0u64..ELEMENTS {
1059 let k = Sha256::hash(&i.to_be_bytes());
1060 let v = vec![i as u8; 100];
1061 db.set(k, v).await.unwrap();
1062 }
1063
1064 assert_eq!(db.op_count(), ELEMENTS + 2);
1065 db.sync().await.unwrap();
1066
1067 for i in 0u64..ELEMENTS {
1069 let k = Sha256::hash(&i.to_be_bytes());
1070 let v = vec![i as u8; 100];
1071 db.set(k, v).await.unwrap();
1072 }
1073
1074 db.simulate_failed_commit_log().await.unwrap();
1076
1077 let db = open_db(context.clone()).await;
1079 assert_eq!(db.op_count(), 2);
1080 let root = db.root(&mut hasher);
1081 assert_eq!(root, first_commit_root);
1082
1083 db.destroy().await.unwrap();
1084 });
1085 }
1086
1087 #[test_traced("WARN")]
1088 pub fn test_immutable_db_pruning() {
1089 let executor = deterministic::Runner::default();
1090 const ELEMENTS: u64 = 2_000;
1092 executor.start(|context| async move {
1093 let mut hasher = Standard::<Sha256>::new();
1094 let mut db = open_db(context.clone()).await;
1095
1096 for i in 0u64..ELEMENTS {
1097 let k = Sha256::hash(&i.to_be_bytes());
1098 let v = vec![i as u8; 100];
1099 db.set(k, v).await.unwrap();
1100 }
1101
1102 assert_eq!(db.op_count(), ELEMENTS);
1103
1104 db.commit(None).await.unwrap();
1105 assert_eq!(db.op_count(), ELEMENTS + 1);
1106
1107 db.prune(ELEMENTS / 2).await.unwrap();
1109 assert_eq!(db.op_count(), ELEMENTS + 1);
1110
1111 let oldest_retained_loc = db.oldest_retained_loc().unwrap();
1114 assert_eq!(oldest_retained_loc, ELEMENTS / 2);
1115
1116 let pruned_loc = oldest_retained_loc - 1;
1118 let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
1119 assert!(db.get(&pruned_key).await.unwrap().is_none());
1120
1121 let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
1123 assert!(db.get(&unpruned_key).await.unwrap().is_some());
1124
1125 let root = db.root(&mut hasher);
1127 db.close().await.unwrap();
1128 let mut db = open_db(context.clone()).await;
1129 assert_eq!(root, db.root(&mut hasher));
1130 assert_eq!(db.op_count(), ELEMENTS + 1);
1131 let oldest_retained_loc = db.oldest_retained_loc().unwrap();
1132 assert_eq!(oldest_retained_loc, ELEMENTS / 2);
1133
1134 db.prune(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1))
1136 .await
1137 .unwrap();
1138 let oldest_retained_loc = db.oldest_retained_loc().unwrap();
1140 assert_eq!(oldest_retained_loc, ELEMENTS / 2 + ITEMS_PER_SECTION);
1141
1142 db.close().await.unwrap();
1144 let db = open_db(context.clone()).await;
1145 let oldest_retained_loc = db.oldest_retained_loc().unwrap();
1146 assert_eq!(oldest_retained_loc, ELEMENTS / 2 + ITEMS_PER_SECTION);
1147
1148 let pruned_loc = oldest_retained_loc - 3;
1150 let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
1151 assert!(db.get(&pruned_key).await.unwrap().is_none());
1152
1153 let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
1155 assert!(db.get(&unpruned_key).await.unwrap().is_some());
1156
1157 let pruned_pos = ELEMENTS / 2;
1159 let proof_result = db.proof(pruned_pos, NZU64!(pruned_pos + 100)).await;
1160 assert!(matches!(proof_result, Err(Error::OperationPruned(pos)) if pos == pruned_pos));
1161
1162 db.destroy().await.unwrap();
1163 });
1164 }
1165}