1use crate::{
60 index::{unordered::Index, Unordered as _},
61 journal::{
62 authenticated,
63 contiguous::{Contiguous, Mutable, Reader},
64 Error as JournalError,
65 },
66 merkle::{journaled::Config as MmrConfig, Family, Location, Proof},
67 qmdb::{any::ValueEncoding, build_snapshot_from_log, delete_known_loc, operation::Key, Error},
68 translator::Translator,
69 Context, Persistable,
70};
71use commonware_codec::EncodeShared;
72use commonware_cryptography::Hasher as CHasher;
73use std::{collections::BTreeSet, num::NonZeroU64, ops::Range, sync::Arc};
74use tracing::warn;
75
76pub mod batch;
77pub mod fixed;
78mod operation;
79pub mod sync;
80pub mod variable;
81
82pub use operation::Operation;
83
84#[derive(Clone)]
86pub struct Config<T: Translator, J> {
87 pub merkle_config: MmrConfig,
89
90 pub log: J,
92
93 pub translator: T,
95}
96
97pub struct Immutable<
107 F: Family,
108 E: Context,
109 K: Key,
110 V: ValueEncoding,
111 C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
112 H: CHasher,
113 T: Translator,
114> where
115 C::Item: EncodeShared,
116{
117 pub(crate) journal: authenticated::Journal<F, E, C, H>,
119
120 pub(crate) snapshot: Index<T, Location<F>>,
126
127 pub(crate) last_commit_loc: Location<F>,
129}
130
131impl<F, E, K, V, C, H, T> Immutable<F, E, K, V, C, H, T>
133where
134 F: Family,
135 E: Context,
136 K: Key,
137 V: ValueEncoding,
138 C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
139 C::Item: EncodeShared,
140 H: CHasher,
141 T: Translator,
142{
143 pub(crate) async fn init_from_journal(
148 mut journal: authenticated::Journal<F, E, C, H>,
149 context: E,
150 translator: T,
151 ) -> Result<Self, Error<F>> {
152 if journal.size().await == 0 {
153 warn!("Authenticated log is empty, initialized new db.");
154 journal.append(&Operation::Commit(None)).await?;
155 journal.sync().await?;
156 }
157
158 let mut snapshot = Index::new(context.with_label("snapshot"), translator);
159
160 let last_commit_loc = {
161 let reader = journal.journal.reader().await;
163 let start_loc = Location::new(reader.bounds().start);
164
165 build_snapshot_from_log::<F, _, _, _>(start_loc, &reader, &mut snapshot, |_, _| {})
167 .await?;
168
169 Location::new(
170 reader
171 .bounds()
172 .end
173 .checked_sub(1)
174 .expect("commit should exist"),
175 )
176 };
177
178 Ok(Self {
179 journal,
180 snapshot,
181 last_commit_loc,
182 })
183 }
184
185 pub async fn size(&self) -> Location<F> {
187 self.bounds().await.end
188 }
189
190 pub async fn bounds(&self) -> Range<Location<F>> {
193 let bounds = self.journal.reader().await.bounds();
194 Location::new(bounds.start)..Location::new(bounds.end)
195 }
196
197 pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error<F>> {
200 let iter = self.snapshot.get(key);
201 let reader = self.journal.reader().await;
202 let oldest = reader.bounds().start;
203 for &loc in iter {
204 if loc < oldest {
205 continue;
206 }
207 if let Some(v) = Self::get_from_loc(&reader, key, loc).await? {
208 return Ok(Some(v));
209 }
210 }
211
212 Ok(None)
213 }
214
215 async fn get_from_loc(
219 reader: &impl Reader<Item = Operation<K, V>>,
220 key: &K,
221 loc: Location<F>,
222 ) -> Result<Option<V::Value>, Error<F>> {
223 if loc < reader.bounds().start {
224 return Err(Error::OperationPruned(loc));
225 }
226
227 let Operation::Set(k, v) = reader.read(*loc).await? else {
228 return Err(Error::UnexpectedData(loc));
229 };
230
231 if k != *key {
232 Ok(None)
233 } else {
234 Ok(Some(v))
235 }
236 }
237
238 pub async fn get_metadata(&self) -> Result<Option<V::Value>, Error<F>> {
240 let last_commit_loc = self.last_commit_loc;
241 let Operation::Commit(metadata) = self
242 .journal
243 .journal
244 .reader()
245 .await
246 .read(*last_commit_loc)
247 .await?
248 else {
249 unreachable!("no commit operation at location of last commit {last_commit_loc}");
250 };
251
252 Ok(metadata)
253 }
254
255 pub async fn historical_proof(
266 &self,
267 op_count: Location<F>,
268 start_loc: Location<F>,
269 max_ops: NonZeroU64,
270 ) -> Result<(Proof<F, H::Digest>, Vec<Operation<K, V>>), Error<F>> {
271 Ok(self
272 .journal
273 .historical_proof(op_count, start_loc, max_ops)
274 .await?)
275 }
276
277 pub async fn proof(
284 &self,
285 start_index: Location<F>,
286 max_ops: NonZeroU64,
287 ) -> Result<(Proof<F, H::Digest>, Vec<Operation<K, V>>), Error<F>> {
288 let op_count = self.bounds().await.end;
289 self.historical_proof(op_count, start_index, max_ops).await
290 }
291
292 pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
300 if loc > self.last_commit_loc {
301 return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
302 }
303 self.journal.prune(loc).await?;
304
305 Ok(())
306 }
307
308 pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
328 let rewind_size = *size;
329 let current_size = *self.last_commit_loc + 1;
330 if rewind_size == current_size {
331 return Ok(());
332 }
333 if rewind_size == 0 || rewind_size > current_size {
334 return Err(Error::Journal(crate::journal::Error::InvalidRewind(
335 rewind_size,
336 )));
337 }
338
339 let (rewind_last_loc, rewound_sets) = {
340 let reader = self.journal.reader().await;
341 let bounds = reader.bounds();
342 let rewind_last_loc = Location::new(rewind_size - 1);
343 if rewind_size <= bounds.start {
344 return Err(Error::Journal(crate::journal::Error::ItemPruned(
345 *rewind_last_loc,
346 )));
347 }
348 let rewind_last_op = reader.read(*rewind_last_loc).await?;
349 if !matches!(rewind_last_op, Operation::Commit(_)) {
350 return Err(Error::UnexpectedData(rewind_last_loc));
351 }
352
353 let mut rewound_sets = Vec::new();
357 for loc in rewind_size..current_size {
358 if let Operation::Set(key, _) = reader.read(loc).await? {
359 rewound_sets.push((Location::new(loc), key));
360 }
361 }
362
363 (rewind_last_loc, rewound_sets)
364 };
365
366 self.journal.rewind(rewind_size).await?;
369 for (loc, key) in rewound_sets {
370 delete_known_loc(&mut self.snapshot, &key, loc);
371 }
372 self.last_commit_loc = rewind_last_loc;
373
374 Ok(())
375 }
376
377 pub fn root(&self) -> H::Digest {
379 self.journal.root()
380 }
381
382 pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<H::Digest>, Error<F>> {
384 if !loc.is_valid() {
385 return Err(crate::merkle::Error::LocationOverflow(loc).into());
386 }
387 let futs: Vec<_> = F::nodes_to_pin(loc)
388 .map(|p| async move {
389 self.journal
390 .merkle
391 .get_node(p)
392 .await?
393 .ok_or(crate::merkle::Error::ElementPruned(p).into())
394 })
395 .collect();
396 futures::future::try_join_all(futs).await
397 }
398
399 pub async fn sync(&self) -> Result<(), Error<F>> {
403 Ok(self.journal.sync().await?)
404 }
405
406 pub async fn commit(&self) -> Result<(), Error<F>> {
408 Ok(self.journal.commit().await?)
409 }
410
411 pub async fn destroy(self) -> Result<(), Error<F>> {
413 Ok(self.journal.destroy().await?)
414 }
415
416 #[allow(clippy::type_complexity)]
418 pub fn new_batch(&self) -> batch::UnmerkleizedBatch<F, H, K, V> {
419 let journal_size = *self.last_commit_loc + 1;
420 batch::UnmerkleizedBatch::new(self, journal_size)
421 }
422
423 pub async fn apply_batch(
435 &mut self,
436 batch: Arc<batch::MerkleizedBatch<F, H::Digest, K, V>>,
437 ) -> Result<Range<Location<F>>, Error<F>> {
438 let db_size = *self.last_commit_loc + 1;
439 let valid = db_size == batch.db_size
440 || db_size == batch.base_size
441 || batch.ancestor_diff_ends.contains(&db_size);
442 if !valid {
443 return Err(Error::StaleBatch {
444 db_size,
445 batch_db_size: batch.db_size,
446 batch_base_size: batch.base_size,
447 });
448 }
449 let start_loc = Location::new(db_size);
450
451 self.journal.apply_batch(&batch.journal_batch).await?;
453
454 let bounds = self.journal.reader().await.bounds();
457 let mut seen = BTreeSet::new();
458 for (key, entry) in batch.diff.iter() {
459 seen.insert(key.clone());
460 self.snapshot
461 .insert_and_prune(key, entry.loc, |v| *v < bounds.start);
462 }
463 for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
464 if batch.ancestor_diff_ends[i] <= db_size {
465 continue;
466 }
467 for (key, entry) in ancestor_diff.iter() {
468 if seen.insert(key.clone()) {
469 self.snapshot
470 .insert_and_prune(key, entry.loc, |v| *v < bounds.start);
471 }
472 }
473 }
474
475 self.last_commit_loc = Location::new(batch.total_size - 1);
477 Ok(start_loc..Location::new(batch.total_size))
478 }
479}
480
481#[cfg(test)]
482pub(super) mod test {
483 use super::*;
484 use crate::{
485 merkle::{Family, Location},
486 qmdb::verify_proof,
487 translator::TwoCap,
488 };
489 use commonware_codec::EncodeShared;
490 use commonware_cryptography::{sha256, sha256::Digest, Sha256};
491 use commonware_runtime::{deterministic, Metrics};
492 use commonware_utils::NZU64;
493 use core::{future::Future, pin::Pin};
494 use std::ops::Range;
495
496 type StandardHasher<H> = crate::merkle::hasher::Standard<H>;
497
498 const ITEMS_PER_SECTION: u64 = 5;
499
500 type TestDb<F, V, C> = Immutable<F, deterministic::Context, Digest, V, C, Sha256, TwoCap>;
501
502 pub(crate) async fn test_immutable_empty<F: Family, V, C>(
503 context: deterministic::Context,
504 open_db: impl Fn(
505 deterministic::Context,
506 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
507 ) where
508 V: ValueEncoding<Value = Digest>,
509 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
510 C::Item: EncodeShared,
511 {
512 let db = open_db(context.with_label("first")).await;
513 let bounds = db.bounds().await;
514 assert_eq!(bounds.end, 1);
515 assert_eq!(bounds.start, Location::new(0));
516 assert!(db.get_metadata().await.unwrap().is_none());
517
518 let k1 = Sha256::fill(1u8);
520 let v1 = Sha256::fill(2u8);
521 let root = db.root();
522 {
523 let _batch = db.new_batch().set(k1, v1);
524 }
526 drop(db);
527 let mut db = open_db(context.with_label("second")).await;
528 assert_eq!(db.root(), root);
529 assert_eq!(db.bounds().await.end, 1);
530
531 db.apply_batch(db.new_batch().merkleize(&db, None))
533 .await
534 .unwrap();
535 db.commit().await.unwrap();
536 assert_eq!(db.bounds().await.end, 2); let root = db.root();
538 drop(db);
539
540 let db = open_db(context.with_label("third")).await;
541 assert_eq!(db.root(), root);
542
543 db.destroy().await.unwrap();
544 }
545
546 pub(crate) async fn test_immutable_build_basic<F: Family, V, C>(
547 context: deterministic::Context,
548 open_db: impl Fn(
549 deterministic::Context,
550 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
551 ) where
552 V: ValueEncoding<Value = Digest>,
553 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
554 C::Item: EncodeShared,
555 {
556 let mut db = open_db(context.with_label("first")).await;
558
559 let k1 = Sha256::fill(1u8);
560 let k2 = Sha256::fill(2u8);
561 let v1 = Sha256::fill(3u8);
562 let v2 = Sha256::fill(4u8);
563
564 assert!(db.get(&k1).await.unwrap().is_none());
565 assert!(db.get(&k2).await.unwrap().is_none());
566
567 let metadata = Some(Sha256::fill(99u8));
569 db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, metadata))
570 .await
571 .unwrap();
572 db.commit().await.unwrap();
573 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
574 assert!(db.get(&k2).await.unwrap().is_none());
575 assert_eq!(db.bounds().await.end, 3);
576 assert_eq!(db.get_metadata().await.unwrap(), Some(Sha256::fill(99u8)));
577
578 db.apply_batch(db.new_batch().set(k2, v2).merkleize(&db, None))
580 .await
581 .unwrap();
582 db.commit().await.unwrap();
583 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
584 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
585 assert_eq!(db.bounds().await.end, 5);
586 assert_eq!(db.get_metadata().await.unwrap(), None);
587
588 let root = db.root();
590
591 let k3 = Sha256::fill(5u8);
593 let v3 = Sha256::fill(6u8);
594 {
595 let _batch = db.new_batch().set(k3, v3);
596 }
598
599 drop(db); let db = open_db(context.with_label("second")).await;
602 assert!(db.get(&k3).await.unwrap().is_none());
603 assert_eq!(db.root(), root);
604 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
605 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
606 assert_eq!(db.bounds().await.end, 5);
607 assert_eq!(db.get_metadata().await.unwrap(), None);
608
609 db.destroy().await.unwrap();
611 }
612
613 pub(crate) async fn test_immutable_proof_verify<F: Family, V, C>(
614 context: deterministic::Context,
615 open_db: impl Fn(
616 deterministic::Context,
617 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
618 ) where
619 V: ValueEncoding<Value = Digest>,
620 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
621 C::Item: EncodeShared,
622 {
623 let mut db = open_db(context.with_label("first")).await;
624
625 let k1 = Sha256::fill(1u8);
626 let v1 = Sha256::fill(10u8);
627 db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, None))
628 .await
629 .unwrap();
630 db.commit().await.unwrap();
631
632 let (proof, ops) = db.proof(Location::new(0), NZU64!(100)).await.unwrap();
633 let root = db.root();
634 let hasher = StandardHasher::<Sha256>::new();
635 assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
636
637 db.destroy().await.unwrap();
638 }
639
640 pub(crate) async fn test_immutable_prune<F: Family, V, C>(
641 context: deterministic::Context,
642 open_db: impl Fn(
643 deterministic::Context,
644 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
645 ) where
646 V: ValueEncoding<Value = Digest>,
647 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
648 C::Item: EncodeShared,
649 {
650 let mut db = open_db(context.with_label("first")).await;
651
652 for i in 0..20u8 {
653 let key = Sha256::fill(i);
654 let value = Sha256::fill(i.wrapping_add(100));
655 db.apply_batch(db.new_batch().set(key, value).merkleize(&db, None))
656 .await
657 .unwrap();
658 db.commit().await.unwrap();
659 }
660
661 let root_before = db.root();
662 let bounds_before = db.bounds().await;
663
664 let prune_loc = Location::new(*bounds_before.end - 5);
665 db.prune(prune_loc).await.unwrap();
666
667 assert_eq!(db.root(), root_before);
668
669 let key_0 = Sha256::fill(0u8);
670 assert!(db.get(&key_0).await.unwrap().is_none());
671
672 let key_19 = Sha256::fill(19u8);
673 assert_eq!(
674 db.get(&key_19).await.unwrap(),
675 Some(Sha256::fill(19u8.wrapping_add(100)))
676 );
677
678 db.destroy().await.unwrap();
679 }
680
681 pub(crate) async fn test_immutable_batch_chain<F: Family, V, C>(
682 context: deterministic::Context,
683 open_db: impl Fn(
684 deterministic::Context,
685 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
686 ) where
687 V: ValueEncoding<Value = Digest>,
688 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
689 C::Item: EncodeShared,
690 {
691 let mut db = open_db(context.with_label("first")).await;
692
693 let k1 = Sha256::fill(1u8);
694 let k2 = Sha256::fill(2u8);
695 let k3 = Sha256::fill(3u8);
696 let v1 = Sha256::fill(11u8);
697 let v2 = Sha256::fill(12u8);
698 let v3 = Sha256::fill(13u8);
699
700 let parent = db.new_batch().set(k1, v1).merkleize(&db, None);
701 let child = parent
702 .new_batch::<Sha256>()
703 .set(k2, v2)
704 .merkleize(&db, None);
705
706 assert_eq!(child.get(&k1, &db).await.unwrap(), Some(v1));
707 assert_eq!(child.get(&k2, &db).await.unwrap(), Some(v2));
708 assert!(child.get(&k3, &db).await.unwrap().is_none());
709
710 db.apply_batch(child).await.unwrap();
711 db.commit().await.unwrap();
712
713 assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
714 assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
715
716 db.apply_batch(db.new_batch().set(k3, v3).merkleize(&db, None))
717 .await
718 .unwrap();
719 db.commit().await.unwrap();
720 assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
721
722 db.destroy().await.unwrap();
723 }
724
725 pub(crate) async fn test_immutable_build_and_authenticate<F: Family, V, C>(
726 context: deterministic::Context,
727 open_db: impl Fn(
728 deterministic::Context,
729 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
730 ) where
731 V: ValueEncoding<Value = Digest>,
732 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
733 C::Item: EncodeShared,
734 {
735 let hasher = StandardHasher::<Sha256>::new();
737 let mut db = open_db(context.with_label("first")).await;
738
739 let mut batch = db.new_batch();
740 for i in 0u64..2_000 {
741 let k = Sha256::hash(&i.to_be_bytes());
742 let v = Sha256::fill(i as u8);
743 batch = batch.set(k, v);
744 }
745 let merkleized = batch.merkleize(&db, None);
746 db.apply_batch(merkleized).await.unwrap();
747 db.commit().await.unwrap();
748 assert_eq!(db.bounds().await.end, 2_000 + 2);
749
750 let root = db.root();
752 drop(db);
753
754 let db = open_db(context.with_label("second")).await;
755 assert_eq!(root, db.root());
756 assert_eq!(db.bounds().await.end, 2_000 + 2);
757 for i in 0u64..2_000 {
758 let k = Sha256::hash(&i.to_be_bytes());
759 let v = Sha256::fill(i as u8);
760 assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
761 }
762
763 let max_ops = NZU64!(5);
766 for i in 0..*db.bounds().await.end {
767 let (proof, log) = db.proof(Location::new(i), max_ops).await.unwrap();
768 assert!(verify_proof(&hasher, &proof, Location::new(i), &log, &root));
769 }
770
771 db.destroy().await.unwrap();
772 }
773
774 pub(crate) async fn test_immutable_recovery_from_failed_merkle_sync<F: Family, V, C>(
775 context: deterministic::Context,
776 open_db: impl Fn(
777 deterministic::Context,
778 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
779 ) where
780 V: ValueEncoding<Value = Digest>,
781 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
782 C::Item: EncodeShared,
783 {
784 const ELEMENTS: u64 = 1000;
786 let mut db = open_db(context.with_label("first")).await;
787
788 let mut batch = db.new_batch();
789 for i in 0u64..ELEMENTS {
790 let k = Sha256::hash(&i.to_be_bytes());
791 let v = Sha256::fill(i as u8);
792 batch = batch.set(k, v);
793 }
794 let merkleized = batch.merkleize(&db, None);
795 db.apply_batch(merkleized).await.unwrap();
796 db.commit().await.unwrap();
797 assert_eq!(db.bounds().await.end, ELEMENTS + 2);
798 db.sync().await.unwrap();
799 let halfway_root = db.root();
800
801 let mut batch = db.new_batch();
803 for i in 0u64..ELEMENTS {
804 let k = Sha256::hash(&i.to_be_bytes());
805 let v = Sha256::fill(i as u8);
806 batch = batch.set(k, v);
807 }
808 let merkleized = batch.merkleize(&db, None);
809 db.apply_batch(merkleized).await.unwrap();
810 db.commit().await.unwrap();
811 drop(db); let db = open_db(context.with_label("second")).await;
816 assert_eq!(db.bounds().await.end, 2003);
817 let root = db.root();
818 assert_ne!(root, halfway_root);
819
820 drop(db);
822 let db = open_db(context.with_label("third")).await;
823 assert_eq!(db.bounds().await.end, 2003);
824 assert_eq!(db.root(), root);
825
826 db.destroy().await.unwrap();
827 }
828
829 pub(crate) async fn test_immutable_recovery_from_failed_log_sync<F: Family, V, C>(
830 context: deterministic::Context,
831 open_db: impl Fn(
832 deterministic::Context,
833 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
834 ) where
835 V: ValueEncoding<Value = Digest>,
836 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
837 C::Item: EncodeShared,
838 {
839 let mut db = open_db(context.with_label("first")).await;
840
841 let k1 = Sha256::fill(1u8);
843 let v1 = Sha256::fill(3u8);
844 db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, None))
845 .await
846 .unwrap();
847 db.commit().await.unwrap();
848 let first_commit_root = db.root();
849
850 drop(db);
853
854 let db = open_db(context.with_label("second")).await;
856 assert_eq!(db.bounds().await.end, 3);
857 let root = db.root();
858 assert_eq!(root, first_commit_root);
859
860 db.destroy().await.unwrap();
861 }
862
863 pub(crate) async fn test_immutable_pruning<F: Family, V, C>(
864 context: deterministic::Context,
865 open_db: impl Fn(
866 deterministic::Context,
867 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
868 ) where
869 V: ValueEncoding<Value = Digest>,
870 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
871 C::Item: EncodeShared,
872 {
873 const ELEMENTS: u64 = 2_000;
875 let mut db = open_db(context.with_label("first")).await;
876
877 let mut sorted_keys: Vec<sha256::Digest> = (1u64..ELEMENTS + 1)
880 .map(|i| Sha256::hash(&i.to_be_bytes()))
881 .collect();
882 sorted_keys.sort();
883 let mut batch = db.new_batch();
888 for i in 1u64..ELEMENTS + 1 {
889 let k = Sha256::hash(&i.to_be_bytes());
890 let v = Sha256::fill(i as u8);
891 batch = batch.set(k, v);
892 }
893 let merkleized = batch.merkleize(&db, None);
894 db.apply_batch(merkleized).await.unwrap();
895 assert_eq!(db.bounds().await.end, ELEMENTS + 2);
896
897 db.prune(Location::new((ELEMENTS + 2) / 2)).await.unwrap();
899 let bounds = db.bounds().await;
900 assert_eq!(bounds.end, ELEMENTS + 2);
901
902 let oldest_retained_loc = bounds.start;
905 assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));
906
907 let pruned_key = sorted_keys[*oldest_retained_loc as usize - 2];
909 assert!(db.get(&pruned_key).await.unwrap().is_none());
910
911 let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
913 assert!(db.get(&unpruned_key).await.unwrap().is_some());
914
915 let root = db.root();
917 db.sync().await.unwrap();
918 drop(db);
919
920 let mut db = open_db(context.with_label("second")).await;
921 assert_eq!(root, db.root());
922 let bounds = db.bounds().await;
923 assert_eq!(bounds.end, ELEMENTS + 2);
924 let oldest_retained_loc = bounds.start;
925 assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));
926
927 let loc = Location::new(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
929 db.prune(loc).await.unwrap();
930 let oldest_retained_loc = db.bounds().await.start;
932 assert_eq!(
933 oldest_retained_loc,
934 Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
935 );
936
937 db.sync().await.unwrap();
939 drop(db);
940 let db = open_db(context.with_label("third")).await;
941 let oldest_retained_loc = db.bounds().await.start;
942 assert_eq!(
943 oldest_retained_loc,
944 Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
945 );
946
947 let pruned_key = sorted_keys[*oldest_retained_loc as usize - 4];
949 assert!(db.get(&pruned_key).await.unwrap().is_none());
950
951 let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
953 assert!(db.get(&unpruned_key).await.unwrap().is_some());
954
955 let pruned_pos = ELEMENTS / 2;
957 let proof_result = db
958 .proof(Location::new(pruned_pos), NZU64!(pruned_pos + 100))
959 .await;
960 assert!(
961 matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos)
962 );
963
964 db.destroy().await.unwrap();
965 }
966
967 pub(crate) async fn test_immutable_prune_beyond_commit<F: Family, V, C>(
968 context: deterministic::Context,
969 open_db: impl Fn(
970 deterministic::Context,
971 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
972 ) where
973 V: ValueEncoding<Value = Digest>,
974 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
975 C::Item: EncodeShared,
976 {
977 let mut db = open_db(context.with_label("test")).await;
978
979 let result = db.prune(Location::new(1)).await;
981 assert!(
982 matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
983 if prune_loc == Location::new(1) && commit_loc == Location::new(0))
984 );
985
986 let k1 = Digest::from(*b"12345678901234567890123456789012");
988 let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
989 let k3 = Digest::from(*b"99999999999999999999999999999999");
990 let v1 = Sha256::fill(1u8);
991 let v2 = Sha256::fill(2u8);
992 let v3 = Sha256::fill(3u8);
993
994 db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(&db, None))
995 .await
996 .unwrap();
997
998 assert_eq!(*db.last_commit_loc, 3);
1000
1001 db.apply_batch(db.new_batch().set(k3, v3).merkleize(&db, None))
1002 .await
1003 .unwrap();
1004
1005 assert!(db.prune(Location::new(3)).await.is_ok());
1007
1008 let new_last_commit = db.last_commit_loc;
1010 let beyond = new_last_commit + 1;
1011 let result = db.prune(beyond).await;
1012 assert!(
1013 matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1014 if prune_loc == beyond && commit_loc == new_last_commit)
1015 );
1016
1017 db.destroy().await.unwrap();
1018 }
1019
1020 async fn commit_sets<F: Family, V, C>(
1021 db: &mut TestDb<F, V, C>,
1022 sets: impl IntoIterator<Item = (Digest, V::Value)>,
1023 metadata: Option<V::Value>,
1024 ) -> Range<Location<F>>
1025 where
1026 V: ValueEncoding<Value = Digest>,
1027 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1028 C::Item: EncodeShared,
1029 {
1030 let mut batch = db.new_batch();
1031 for (key, value) in sets {
1032 batch = batch.set(key, value);
1033 }
1034 let range = db.apply_batch(batch.merkleize(db, metadata)).await.unwrap();
1035 db.commit().await.unwrap();
1036 range
1037 }
1038
1039 pub(crate) async fn test_immutable_rewind_recovery<F: Family, V, C>(
1040 context: deterministic::Context,
1041 open_db: impl Fn(
1042 deterministic::Context,
1043 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1044 ) where
1045 V: ValueEncoding<Value = Digest>,
1046 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1047 C::Item: EncodeShared,
1048 {
1049 let mut db = open_db(context.with_label("db")).await;
1050
1051 let key1 = Sha256::hash(&1u64.to_be_bytes());
1052 let key2 = Sha256::hash(&2u64.to_be_bytes());
1053 let key3 = Sha256::hash(&3u64.to_be_bytes());
1054 let key4 = Sha256::hash(&4u64.to_be_bytes());
1055
1056 let value1 = Sha256::fill(11u8);
1057 let value2 = Sha256::fill(22u8);
1058 let value3 = Sha256::fill(33u8);
1059 let value4 = Sha256::fill(66u8);
1060
1061 let metadata_a = Sha256::fill(44u8);
1062 let first_range =
1063 commit_sets(&mut db, [(key1, value1), (key2, value2)], Some(metadata_a)).await;
1064 let size_before = db.bounds().await.end;
1065 let root_before = db.root();
1066 let last_commit_before = db.last_commit_loc;
1067 assert_eq!(size_before, first_range.end);
1068
1069 let metadata_b = Sha256::fill(55u8);
1070 let second_range =
1071 commit_sets(&mut db, [(key3, value3), (key4, value4)], Some(metadata_b)).await;
1072 assert_eq!(second_range.start, size_before);
1073 assert_ne!(db.root(), root_before);
1074 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_b));
1075 assert_eq!(db.get(&key3).await.unwrap(), Some(value3));
1076 assert_eq!(db.get(&key4).await.unwrap(), Some(value4));
1077
1078 db.rewind(size_before).await.unwrap();
1079 assert_eq!(db.root(), root_before);
1080 assert_eq!(db.bounds().await.end, size_before);
1081 assert_eq!(db.last_commit_loc, last_commit_before);
1082 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1083 assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1084 assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1085 assert_eq!(db.get(&key3).await.unwrap(), None);
1086 assert_eq!(db.get(&key4).await.unwrap(), None);
1087
1088 db.commit().await.unwrap();
1089 drop(db);
1090 let db = open_db(context.with_label("reopen")).await;
1091 assert_eq!(db.root(), root_before);
1092 assert_eq!(db.bounds().await.end, size_before);
1093 assert_eq!(db.last_commit_loc, last_commit_before);
1094 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1095 assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1096 assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1097 assert_eq!(db.get(&key3).await.unwrap(), None);
1098 assert_eq!(db.get(&key4).await.unwrap(), None);
1099
1100 db.destroy().await.unwrap();
1101 }
1102
1103 pub(crate) async fn test_immutable_rewind_pruned_target_errors<F: Family, V, C>(
1104 context: deterministic::Context,
1105 open_small_sections_db: impl Fn(
1106 deterministic::Context,
1107 )
1108 -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1109 ) where
1110 V: ValueEncoding<Value = Digest>,
1111 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1112 C::Item: EncodeShared,
1113 {
1114 let mut db = open_small_sections_db(context.with_label("db")).await;
1115
1116 let first_range = commit_sets(
1117 &mut db,
1118 (0u64..16).map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8))),
1119 None,
1120 )
1121 .await;
1122
1123 let mut round = 0u64;
1124 loop {
1125 round += 1;
1126 assert!(
1127 round <= 64,
1128 "failed to prune enough history for rewind test"
1129 );
1130
1131 commit_sets(
1132 &mut db,
1133 (0u64..16).map(|i| {
1134 let seed = round * 100 + i;
1135 (Sha256::hash(&seed.to_be_bytes()), Sha256::fill(seed as u8))
1136 }),
1137 None,
1138 )
1139 .await;
1140 db.prune(db.last_commit_loc).await.unwrap();
1141
1142 if db.bounds().await.start > first_range.start {
1143 break;
1144 }
1145 }
1146
1147 let oldest_retained = db.bounds().await.start;
1148 let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
1149 assert!(
1150 matches!(
1151 boundary_err,
1152 Error::Journal(crate::journal::Error::ItemPruned(_))
1153 ),
1154 "unexpected rewind error at retained boundary: {boundary_err:?}"
1155 );
1156
1157 let err = db.rewind(first_range.start).await.unwrap_err();
1158 assert!(
1159 matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
1160 "unexpected rewind error: {err:?}"
1161 );
1162
1163 db.destroy().await.unwrap();
1164 }
1165
1166 pub(crate) async fn test_immutable_batch_get_read_through<F: Family, V, C>(
1168 context: deterministic::Context,
1169 open_db: impl Fn(
1170 deterministic::Context,
1171 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1172 ) where
1173 V: ValueEncoding<Value = Digest>,
1174 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1175 C::Item: EncodeShared,
1176 {
1177 let mut db = open_db(context.with_label("db")).await;
1178
1179 let key_a = Sha256::hash(&0u64.to_be_bytes());
1181 let val_a = Sha256::fill(1u8);
1182 db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None))
1183 .await
1184 .unwrap();
1185
1186 let mut batch = db.new_batch();
1188 assert_eq!(batch.get(&key_a, &db).await.unwrap(), Some(val_a));
1189
1190 let key_b = Sha256::hash(&1u64.to_be_bytes());
1192 let val_b = Sha256::fill(2u8);
1193 batch = batch.set(key_b, val_b);
1194 assert_eq!(batch.get(&key_b, &db).await.unwrap(), Some(val_b));
1195
1196 let key_c = Sha256::hash(&2u64.to_be_bytes());
1198 assert_eq!(batch.get(&key_c, &db).await.unwrap(), None);
1199
1200 db.destroy().await.unwrap();
1201 }
1202
1203 pub(crate) async fn test_immutable_batch_stacked_get<F: Family, V, C>(
1205 context: deterministic::Context,
1206 open_db: impl Fn(
1207 deterministic::Context,
1208 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1209 ) where
1210 V: ValueEncoding<Value = Digest>,
1211 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1212 C::Item: EncodeShared,
1213 {
1214 let db = open_db(context.with_label("db")).await;
1215
1216 let key_a = Sha256::hash(&0u64.to_be_bytes());
1218 let val_a = Sha256::fill(10u8);
1219 let parent = db.new_batch().set(key_a, val_a);
1220 let parent_m = parent.merkleize(&db, None);
1221
1222 let mut child = parent_m.new_batch::<Sha256>();
1224 assert_eq!(child.get(&key_a, &db).await.unwrap(), Some(val_a));
1225
1226 let key_b = Sha256::hash(&1u64.to_be_bytes());
1228 let val_b = Sha256::fill(20u8);
1229 child = child.set(key_b, val_b);
1230 assert_eq!(child.get(&key_b, &db).await.unwrap(), Some(val_b));
1231
1232 let key_c = Sha256::hash(&2u64.to_be_bytes());
1234 assert_eq!(child.get(&key_c, &db).await.unwrap(), None);
1235
1236 db.destroy().await.unwrap();
1237 }
1238
1239 pub(crate) async fn test_immutable_batch_stacked_apply<F: Family, V, C>(
1241 context: deterministic::Context,
1242 open_db: impl Fn(
1243 deterministic::Context,
1244 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1245 ) where
1246 V: ValueEncoding<Value = Digest>,
1247 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1248 C::Item: EncodeShared,
1249 {
1250 let mut db = open_db(context.with_label("db")).await;
1251
1252 let mut kvs_first: Vec<(Digest, Digest)> = (0u64..5)
1254 .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
1255 .collect();
1256 kvs_first.sort_by(|a, b| a.0.cmp(&b.0));
1257
1258 let mut kvs_second: Vec<(Digest, Digest)> = (5u64..10)
1259 .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
1260 .collect();
1261 kvs_second.sort_by(|a, b| a.0.cmp(&b.0));
1262
1263 let mut parent = db.new_batch();
1265 for (k, v) in &kvs_first {
1266 parent = parent.set(*k, *v);
1267 }
1268 let parent_m = parent.merkleize(&db, None);
1269
1270 let mut child = parent_m.new_batch::<Sha256>();
1272 for (k, v) in &kvs_second {
1273 child = child.set(*k, *v);
1274 }
1275 let child_m = child.merkleize(&db, None);
1276 let expected_root = child_m.root();
1277 db.apply_batch(child_m).await.unwrap();
1278
1279 assert_eq!(db.root(), expected_root);
1280
1281 for (k, v) in kvs_first.iter().chain(kvs_second.iter()) {
1283 assert_eq!(db.get(k).await.unwrap(), Some(*v));
1284 }
1285
1286 db.destroy().await.unwrap();
1287 }
1288
1289 pub(crate) async fn test_immutable_batch_speculative_root<F: Family, V, C>(
1291 context: deterministic::Context,
1292 open_db: impl Fn(
1293 deterministic::Context,
1294 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1295 ) where
1296 V: ValueEncoding<Value = Digest>,
1297 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1298 C::Item: EncodeShared,
1299 {
1300 let mut db = open_db(context.with_label("db")).await;
1301
1302 let mut batch = db.new_batch();
1303 for i in 0u8..10 {
1304 let k = Sha256::hash(&[i]);
1305 batch = batch.set(k, Sha256::fill(i));
1306 }
1307 let merkleized = batch.merkleize(&db, None);
1308
1309 let speculative = merkleized.root();
1310 db.apply_batch(merkleized).await.unwrap();
1311 assert_eq!(db.root(), speculative);
1312
1313 let metadata = Some(Sha256::fill(55u8));
1315 let mut batch = db.new_batch();
1316 let k = Sha256::hash(&[0xAA]);
1317 batch = batch.set(k, Sha256::fill(0xAA));
1318 let merkleized = batch.merkleize(&db, metadata);
1319 let speculative = merkleized.root();
1320 db.apply_batch(merkleized).await.unwrap();
1321 assert_eq!(db.root(), speculative);
1322
1323 db.destroy().await.unwrap();
1324 }
1325
1326 pub(crate) async fn test_immutable_merkleized_batch_get<F: Family, V, C>(
1328 context: deterministic::Context,
1329 open_db: impl Fn(
1330 deterministic::Context,
1331 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1332 ) where
1333 V: ValueEncoding<Value = Digest>,
1334 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1335 C::Item: EncodeShared,
1336 {
1337 let mut db = open_db(context.with_label("db")).await;
1338
1339 let key_a = Sha256::hash(&0u64.to_be_bytes());
1341 let val_a = Sha256::fill(10u8);
1342 db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None))
1343 .await
1344 .unwrap();
1345
1346 let key_b = Sha256::hash(&1u64.to_be_bytes());
1348 let val_b = Sha256::fill(20u8);
1349 let merkleized = db.new_batch().set(key_b, val_b).merkleize(&db, None);
1350
1351 assert_eq!(merkleized.get(&key_a, &db).await.unwrap(), Some(val_a));
1353
1354 assert_eq!(merkleized.get(&key_b, &db).await.unwrap(), Some(val_b));
1356
1357 let key_c = Sha256::hash(&2u64.to_be_bytes());
1359 assert_eq!(merkleized.get(&key_c, &db).await.unwrap(), None);
1360
1361 db.destroy().await.unwrap();
1362 }
1363
1364 pub(crate) async fn test_immutable_batch_sequential_apply<F: Family, V, C>(
1366 context: deterministic::Context,
1367 open_db: impl Fn(
1368 deterministic::Context,
1369 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1370 ) where
1371 V: ValueEncoding<Value = Digest>,
1372 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1373 C::Item: EncodeShared,
1374 {
1375 let mut db = open_db(context.with_label("db")).await;
1376
1377 let key_a = Sha256::hash(&0u64.to_be_bytes());
1378 let val_a = Sha256::fill(1u8);
1379
1380 let m = db.new_batch().set(key_a, val_a).merkleize(&db, None);
1382 let root1 = m.root();
1383 db.apply_batch(m).await.unwrap();
1384 assert_eq!(db.root(), root1);
1385 assert_eq!(db.get(&key_a).await.unwrap(), Some(val_a));
1386
1387 let key_b = Sha256::hash(&1u64.to_be_bytes());
1389 let val_b = Sha256::fill(2u8);
1390 let m = db.new_batch().set(key_b, val_b).merkleize(&db, None);
1391 let root2 = m.root();
1392 db.apply_batch(m).await.unwrap();
1393 assert_eq!(db.root(), root2);
1394 assert_eq!(db.get(&key_b).await.unwrap(), Some(val_b));
1395
1396 db.destroy().await.unwrap();
1397 }
1398
1399 pub(crate) async fn test_immutable_batch_many_sequential<F: Family, V, C>(
1401 context: deterministic::Context,
1402 open_db: impl Fn(
1403 deterministic::Context,
1404 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1405 ) where
1406 V: ValueEncoding<Value = Digest>,
1407 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1408 C::Item: EncodeShared,
1409 {
1410 let mut db = open_db(context.with_label("db")).await;
1411 let hasher = StandardHasher::<Sha256>::new();
1412
1413 const BATCHES: u64 = 20;
1414 const KEYS_PER_BATCH: u64 = 5;
1415
1416 let mut all_kvs: Vec<(Digest, Digest)> = Vec::new();
1417
1418 for batch_idx in 0..BATCHES {
1419 let mut batch = db.new_batch();
1420 for j in 0..KEYS_PER_BATCH {
1421 let seed = batch_idx * 100 + j;
1422 let k = Sha256::hash(&seed.to_be_bytes());
1423 let v = Sha256::fill(seed as u8);
1424 batch = batch.set(k, v);
1425 all_kvs.push((k, v));
1426 }
1427 let merkleized = batch.merkleize(&db, None);
1428 db.apply_batch(merkleized).await.unwrap();
1429 }
1430
1431 for (k, v) in &all_kvs {
1433 assert_eq!(db.get(k).await.unwrap(), Some(*v));
1434 }
1435
1436 let root = db.root();
1438 let (proof, ops) = db.proof(Location::new(0), NZU64!(10000)).await.unwrap();
1439 assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1440
1441 let expected = 1 + BATCHES * (KEYS_PER_BATCH + 1);
1443 assert_eq!(db.bounds().await.end, expected);
1444
1445 db.destroy().await.unwrap();
1446 }
1447
1448 pub(crate) async fn test_immutable_batch_empty_batch<F: Family, V, C>(
1450 context: deterministic::Context,
1451 open_db: impl Fn(
1452 deterministic::Context,
1453 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1454 ) where
1455 V: ValueEncoding<Value = Digest>,
1456 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1457 C::Item: EncodeShared,
1458 {
1459 let mut db = open_db(context.with_label("db")).await;
1460
1461 let k = Sha256::hash(&[1u8]);
1463 db.apply_batch(
1464 db.new_batch()
1465 .set(k, Sha256::fill(1u8))
1466 .merkleize(&db, None),
1467 )
1468 .await
1469 .unwrap();
1470 let root_before = db.root();
1471 let size_before = db.bounds().await.end;
1472
1473 let merkleized = db.new_batch().merkleize(&db, None);
1475 let speculative = merkleized.root();
1476 db.apply_batch(merkleized).await.unwrap();
1477
1478 assert_ne!(db.root(), root_before);
1480 assert_eq!(db.root(), speculative);
1481 assert_eq!(db.bounds().await.end, size_before + 1);
1483
1484 db.destroy().await.unwrap();
1485 }
1486
1487 pub(crate) async fn test_immutable_batch_chained_merkleized_get<F: Family, V, C>(
1489 context: deterministic::Context,
1490 open_db: impl Fn(
1491 deterministic::Context,
1492 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1493 ) where
1494 V: ValueEncoding<Value = Digest>,
1495 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1496 C::Item: EncodeShared,
1497 {
1498 let mut db = open_db(context.with_label("db")).await;
1499
1500 let key_a = Sha256::hash(&0u64.to_be_bytes());
1502 let val_a = Sha256::fill(10u8);
1503 db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None))
1504 .await
1505 .unwrap();
1506
1507 let key_b = Sha256::hash(&1u64.to_be_bytes());
1509 let val_b = Sha256::fill(1u8);
1510 let parent_m = db.new_batch().set(key_b, val_b).merkleize(&db, None);
1511
1512 let key_c = Sha256::hash(&2u64.to_be_bytes());
1514 let val_c = Sha256::fill(2u8);
1515 let child_m = parent_m
1516 .new_batch::<Sha256>()
1517 .set(key_c, val_c)
1518 .merkleize(&db, None);
1519
1520 assert_eq!(child_m.get(&key_a, &db).await.unwrap(), Some(val_a));
1523 assert_eq!(child_m.get(&key_b, &db).await.unwrap(), Some(val_b));
1525 assert_eq!(child_m.get(&key_c, &db).await.unwrap(), Some(val_c));
1527 let key_d = Sha256::hash(&3u64.to_be_bytes());
1529 assert_eq!(child_m.get(&key_d, &db).await.unwrap(), None);
1530
1531 db.destroy().await.unwrap();
1532 }
1533
1534 pub(crate) async fn test_immutable_batch_large<F: Family, V, C>(
1536 context: deterministic::Context,
1537 open_db: impl Fn(
1538 deterministic::Context,
1539 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1540 ) where
1541 V: ValueEncoding<Value = Digest>,
1542 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1543 C::Item: EncodeShared,
1544 {
1545 let mut db = open_db(context.with_label("db")).await;
1546 let hasher = StandardHasher::<Sha256>::new();
1547
1548 const N: u64 = 500;
1549 let mut kvs: Vec<(Digest, Digest)> = Vec::new();
1550
1551 let mut batch = db.new_batch();
1552 for i in 0..N {
1553 let k = Sha256::hash(&i.to_be_bytes());
1554 let v = Sha256::fill((i % 256) as u8);
1555 batch = batch.set(k, v);
1556 kvs.push((k, v));
1557 }
1558 let merkleized = batch.merkleize(&db, None);
1559 db.apply_batch(merkleized).await.unwrap();
1560
1561 for (k, v) in &kvs {
1563 assert_eq!(db.get(k).await.unwrap(), Some(*v));
1564 }
1565
1566 let root = db.root();
1568 let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1569 assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1570
1571 assert_eq!(db.bounds().await.end, 1 + N + 1);
1573
1574 db.destroy().await.unwrap();
1575 }
1576
1577 pub(crate) async fn test_immutable_batch_chained_key_override<F: Family, V, C>(
1579 context: deterministic::Context,
1580 open_db: impl Fn(
1581 deterministic::Context,
1582 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1583 ) where
1584 V: ValueEncoding<Value = Digest>,
1585 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1586 C::Item: EncodeShared,
1587 {
1588 let mut db = open_db(context.with_label("db")).await;
1589
1590 let key = Sha256::hash(&0u64.to_be_bytes());
1591 let val_parent = Sha256::fill(1u8);
1592 let val_child = Sha256::fill(2u8);
1593
1594 let parent_m = db.new_batch().set(key, val_parent).merkleize(&db, None);
1596
1597 let mut child = parent_m.new_batch::<Sha256>();
1599 child = child.set(key, val_child);
1600
1601 assert_eq!(child.get(&key, &db).await.unwrap(), Some(val_child));
1603
1604 let child_m = child.merkleize(&db, None);
1605
1606 assert_eq!(child_m.get(&key, &db).await.unwrap(), Some(val_child));
1608
1609 db.apply_batch(child_m).await.unwrap();
1611 assert_eq!(db.get(&key).await.unwrap(), Some(val_child));
1612
1613 db.destroy().await.unwrap();
1614 }
1615
1616 pub(crate) async fn test_immutable_batch_sequential_key_override<F: Family, V, C>(
1623 context: deterministic::Context,
1624 open_db_small_sections: impl Fn(
1625 deterministic::Context,
1626 )
1627 -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1628 ) where
1629 V: ValueEncoding<Value = Digest>,
1630 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1631 C::Item: EncodeShared,
1632 {
1633 let mut db = open_db_small_sections(context.with_label("db")).await;
1634
1635 let key = Sha256::hash(&0u64.to_be_bytes());
1636 let v1 = Sha256::fill(1u8);
1637 let v2 = Sha256::fill(2u8);
1638
1639 db.apply_batch(db.new_batch().set(key, v1).merkleize(&db, None))
1642 .await
1643 .unwrap();
1644 assert_eq!(db.get(&key).await.unwrap(), Some(v1));
1645
1646 db.apply_batch(db.new_batch().set(key, v2).merkleize(&db, None))
1649 .await
1650 .unwrap();
1651
1652 assert_eq!(db.get(&key).await.unwrap(), Some(v1));
1654
1655 db.prune(Location::new(2)).await.unwrap();
1658 assert_eq!(db.get(&key).await.unwrap(), Some(v2));
1659
1660 db.sync().await.unwrap();
1662
1663 db.destroy().await.unwrap();
1664 }
1665
1666 pub(crate) async fn test_immutable_batch_metadata<F: Family, V, C>(
1668 context: deterministic::Context,
1669 open_db: impl Fn(
1670 deterministic::Context,
1671 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1672 ) where
1673 V: ValueEncoding<Value = Digest>,
1674 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1675 C::Item: EncodeShared,
1676 {
1677 let mut db = open_db(context.with_label("db")).await;
1678
1679 let metadata = Sha256::fill(42u8);
1681 let k = Sha256::hash(&[1u8]);
1682 db.apply_batch(
1683 db.new_batch()
1684 .set(k, Sha256::fill(1u8))
1685 .merkleize(&db, Some(metadata)),
1686 )
1687 .await
1688 .unwrap();
1689 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1690
1691 db.apply_batch(db.new_batch().merkleize(&db, None))
1693 .await
1694 .unwrap();
1695 assert_eq!(db.get_metadata().await.unwrap(), None);
1696
1697 db.destroy().await.unwrap();
1698 }
1699
1700 pub(crate) async fn test_immutable_stale_batch_rejected<F: Family, V, C>(
1701 context: deterministic::Context,
1702 open_db: impl Fn(
1703 deterministic::Context,
1704 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1705 ) where
1706 V: ValueEncoding<Value = Digest>,
1707 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1708 C::Item: EncodeShared,
1709 {
1710 let mut db = open_db(context.with_label("db")).await;
1711
1712 let key1 = Sha256::hash(&[1]);
1713 let key2 = Sha256::hash(&[2]);
1714 let v1 = Sha256::fill(10u8);
1715 let v2 = Sha256::fill(20u8);
1716
1717 let batch_a = db.new_batch().set(key1, v1).merkleize(&db, None);
1719 let batch_b = db.new_batch().set(key2, v2).merkleize(&db, None);
1720
1721 db.apply_batch(batch_a).await.unwrap();
1723 let expected_root = db.root();
1724 let expected_bounds = db.bounds().await;
1725 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1726 assert_eq!(db.get(&key2).await.unwrap(), None);
1727 assert_eq!(db.get_metadata().await.unwrap(), None);
1728
1729 let result = db.apply_batch(batch_b).await;
1731 assert!(
1732 matches!(result, Err(Error::StaleBatch { .. })),
1733 "expected StaleBatch error, got {result:?}"
1734 );
1735 assert_eq!(db.root(), expected_root);
1736 assert_eq!(db.bounds().await, expected_bounds);
1737 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1738 assert_eq!(db.get(&key2).await.unwrap(), None);
1739 assert_eq!(db.get_metadata().await.unwrap(), None);
1740
1741 db.destroy().await.unwrap();
1742 }
1743
1744 pub(crate) async fn test_immutable_stale_batch_chained<F: Family, V, C>(
1745 context: deterministic::Context,
1746 open_db: impl Fn(
1747 deterministic::Context,
1748 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1749 ) where
1750 V: ValueEncoding<Value = Digest>,
1751 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1752 C::Item: EncodeShared,
1753 {
1754 let mut db = open_db(context.with_label("db")).await;
1755
1756 let key1 = Sha256::hash(&[1]);
1757 let key2 = Sha256::hash(&[2]);
1758 let key3 = Sha256::hash(&[3]);
1759
1760 let parent_m = db
1762 .new_batch()
1763 .set(key1, Sha256::fill(1u8))
1764 .merkleize(&db, None);
1765
1766 let child_a = parent_m
1768 .new_batch::<Sha256>()
1769 .set(key2, Sha256::fill(2u8))
1770 .merkleize(&db, None);
1771 let child_b = parent_m
1772 .new_batch::<Sha256>()
1773 .set(key3, Sha256::fill(3u8))
1774 .merkleize(&db, None);
1775
1776 db.apply_batch(child_a).await.unwrap();
1778
1779 let result = db.apply_batch(child_b).await;
1781 assert!(
1782 matches!(result, Err(Error::StaleBatch { .. })),
1783 "expected StaleBatch error, got {result:?}"
1784 );
1785
1786 db.destroy().await.unwrap();
1787 }
1788
1789 pub(crate) async fn test_immutable_partial_ancestor_commit<F: Family, V, C>(
1790 context: deterministic::Context,
1791 open_db: impl Fn(
1792 deterministic::Context,
1793 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1794 ) where
1795 V: ValueEncoding<Value = Digest>,
1796 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1797 C::Item: EncodeShared,
1798 {
1799 let mut db = open_db(context.with_label("db")).await;
1800
1801 let key1 = Sha256::hash(&[1]);
1802 let key2 = Sha256::hash(&[2]);
1803 let key3 = Sha256::hash(&[3]);
1804 let v1 = Sha256::fill(1u8);
1805 let v2 = Sha256::fill(2u8);
1806 let v3 = Sha256::fill(3u8);
1807
1808 let a = db.new_batch().set(key1, v1).merkleize(&db, None);
1810 let b = a.new_batch::<Sha256>().set(key2, v2).merkleize(&db, None);
1811 let c = b.new_batch::<Sha256>().set(key3, v3).merkleize(&db, None);
1812
1813 let expected_root = c.root();
1814
1815 db.apply_batch(a).await.unwrap();
1817 db.apply_batch(c).await.unwrap();
1818
1819 assert_eq!(db.root(), expected_root);
1820 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1821 assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
1822 assert_eq!(db.get(&key3).await.unwrap(), Some(v3));
1823
1824 db.destroy().await.unwrap();
1825 }
1826
1827 pub(crate) async fn test_immutable_sequential_commit_parent_then_child<F: Family, V, C>(
1828 context: deterministic::Context,
1829 open_db: impl Fn(
1830 deterministic::Context,
1831 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1832 ) where
1833 V: ValueEncoding<Value = Digest>,
1834 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1835 C::Item: EncodeShared,
1836 {
1837 let mut db = open_db(context.with_label("db")).await;
1838
1839 let key1 = Sha256::hash(&[1]);
1840 let key2 = Sha256::hash(&[2]);
1841 let v1 = Sha256::fill(1u8);
1842 let v2 = Sha256::fill(2u8);
1843
1844 let parent_m = db.new_batch().set(key1, v1).merkleize(&db, None);
1846
1847 let child_m = parent_m
1849 .new_batch::<Sha256>()
1850 .set(key2, v2)
1851 .merkleize(&db, None);
1852
1853 db.apply_batch(parent_m).await.unwrap();
1855 db.apply_batch(child_m).await.unwrap();
1856
1857 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1859 assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
1860
1861 db.destroy().await.unwrap();
1862 }
1863
1864 pub(crate) async fn test_immutable_child_root_matches_pending_and_committed<F: Family, V, C>(
1865 context: deterministic::Context,
1866 open_db: impl Fn(
1867 deterministic::Context,
1868 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1869 ) where
1870 V: ValueEncoding<Value = Digest>,
1871 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1872 C::Item: EncodeShared,
1873 {
1874 let mut db = open_db(context.with_label("db")).await;
1875
1876 let key1 = Sha256::hash(&[1]);
1877 let key2 = Sha256::hash(&[2]);
1878
1879 let parent = db
1881 .new_batch()
1882 .set(key1, Sha256::fill(1u8))
1883 .merkleize(&db, None);
1884 let pending_child = parent
1885 .new_batch::<Sha256>()
1886 .set(key2, Sha256::fill(2u8))
1887 .merkleize(&db, None);
1888
1889 db.apply_batch(parent).await.unwrap();
1892 db.commit().await.unwrap();
1893
1894 let committed_child = db
1895 .new_batch()
1896 .set(key2, Sha256::fill(2u8))
1897 .merkleize(&db, None);
1898
1899 assert_eq!(pending_child.root(), committed_child.root());
1900
1901 db.destroy().await.unwrap();
1902 }
1903
1904 pub(crate) async fn test_immutable_stale_batch_child_applied_before_parent<F: Family, V, C>(
1905 context: deterministic::Context,
1906 open_db: impl Fn(
1907 deterministic::Context,
1908 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1909 ) where
1910 V: ValueEncoding<Value = Digest>,
1911 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1912 C::Item: EncodeShared,
1913 {
1914 let mut db = open_db(context.with_label("db")).await;
1915
1916 let key1 = Sha256::hash(&[1]);
1917 let key2 = Sha256::hash(&[2]);
1918
1919 let parent_m = db
1921 .new_batch()
1922 .set(key1, Sha256::fill(1u8))
1923 .merkleize(&db, None);
1924
1925 let child_m = parent_m
1927 .new_batch::<Sha256>()
1928 .set(key2, Sha256::fill(2u8))
1929 .merkleize(&db, None);
1930
1931 db.apply_batch(child_m).await.unwrap();
1933
1934 let result = db.apply_batch(parent_m).await;
1936 assert!(
1937 matches!(result, Err(Error::StaleBatch { .. })),
1938 "expected StaleBatch error, got {result:?}"
1939 );
1940
1941 db.destroy().await.unwrap();
1942 }
1943
1944 pub(crate) async fn test_immutable_to_batch<F: Family, V, C>(
1947 context: deterministic::Context,
1948 open_db: impl Fn(
1949 deterministic::Context,
1950 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1951 ) where
1952 V: ValueEncoding<Value = Digest>,
1953 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1954 C::Item: EncodeShared,
1955 {
1956 let mut db = open_db(context.with_label("db")).await;
1957
1958 let key1 = Sha256::hash(&[1]);
1960 let v1 = Sha256::fill(10u8);
1961 db.apply_batch(db.new_batch().set(key1, v1).merkleize(&db, None))
1962 .await
1963 .unwrap();
1964
1965 let snapshot = db.to_batch();
1967 assert_eq!(snapshot.root(), db.root());
1968
1969 let key2 = Sha256::hash(&[2]);
1971 let v2 = Sha256::fill(20u8);
1972 let child = snapshot
1973 .new_batch::<Sha256>()
1974 .set(key2, v2)
1975 .merkleize(&db, None);
1976 db.apply_batch(child).await.unwrap();
1977
1978 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
1979 assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
1980
1981 db.destroy().await.unwrap();
1982 }
1983
1984 pub(crate) async fn test_immutable_apply_after_ancestor_dropped<F: Family, V, C>(
1987 context: deterministic::Context,
1988 open_db: impl Fn(
1989 deterministic::Context,
1990 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1991 ) where
1992 V: ValueEncoding<Value = Digest>,
1993 C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
1994 C::Item: EncodeShared,
1995 {
1996 let mut db = open_db(context.with_label("db")).await;
1997
1998 let key1 = Sha256::hash(&[1]);
1999 let key2 = Sha256::hash(&[2]);
2000 let key3 = Sha256::hash(&[3]);
2001 let v1 = Sha256::fill(1u8);
2002 let v2 = Sha256::fill(2u8);
2003 let v3 = Sha256::fill(3u8);
2004
2005 let a = db.new_batch().set(key1, v1).merkleize(&db, None);
2007 let b = a.new_batch::<Sha256>().set(key2, v2).merkleize(&db, None);
2008 let c = b.new_batch::<Sha256>().set(key3, v3).merkleize(&db, None);
2009
2010 drop(a);
2012 drop(b);
2013
2014 db.apply_batch(c).await.unwrap();
2016
2017 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2019 assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2020 assert_eq!(db.get(&key3).await.unwrap(), Some(v3));
2021
2022 db.destroy().await.unwrap();
2023 }
2024}