1use crate::{
81 index::{unordered::Index, Unordered as _},
82 journal::{
83 authenticated,
84 contiguous::{Contiguous, Mutable, Reader},
85 Error as JournalError,
86 },
87 merkle::{full::Config as MerkleConfig, Family, Location, Proof},
88 qmdb::{
89 any::ValueEncoding,
90 build_snapshot_from_log,
91 metrics::{KeyReadMetrics, OperationMetrics, StateMetrics},
92 operation::Key,
93 Error,
94 },
95 translator::Translator,
96 Context, Persistable,
97};
98use commonware_codec::EncodeShared;
99use commonware_cryptography::Hasher as CHasher;
100use commonware_parallel::Strategy;
101use std::{collections::HashSet, num::NonZeroU64, ops::Range, sync::Arc};
102use tracing::warn;
103
104pub(crate) struct Metrics<E: Context> {
106 pub state: StateMetrics,
108 pub operations: OperationMetrics<E>,
110 pub reads: KeyReadMetrics<E>,
112}
113
114impl<E: Context> Metrics<E> {
115 pub fn new(context: E) -> Self {
117 let context = Arc::new(context);
118 Self {
119 state: StateMetrics::new(context.as_ref()),
120 operations: OperationMetrics::new(context.clone()),
121 reads: KeyReadMetrics::new(context),
122 }
123 }
124}
125
126pub mod batch;
127mod compact;
128pub mod fixed;
129mod operation;
130pub mod sync;
131pub mod variable;
132
133pub use compact::{
134 Config as CompactConfig, Db as CompactDb, MerkleizedBatch as CompactMerkleizedBatch,
135 UnmerkleizedBatch as CompactUnmerkleizedBatch,
136};
137pub use operation::Operation;
138
/// Configuration bundle for the database defined in this module.
///
/// NOTE(review): this type is not consumed in the visible part of the file; the field
/// descriptions below are inferred from the field types and should be confirmed against the
/// code that opens the database.
#[derive(Clone)]
pub struct Config<T: Translator, J, S: Strategy> {
    /// Configuration of the Merkle structure, parameterized by the strategy `S`.
    pub merkle_config: MerkleConfig<S>,

    /// Configuration of the operation log; the concrete type `J` is chosen by the caller.
    pub log: J,

    /// Translator for the key index (presumably the one handed to the snapshot index).
    pub translator: T,
}
151
/// An authenticated key-value database whose values are written with `Set` operations and
/// looked up through an in-memory snapshot of operation locations.
///
/// Type parameters: `F` is the Merkle family, `E` the runtime context, `K`/`V` the key and value
/// encoding, `C` the contiguous journal storing the operations, `H` the hasher, `T` the key
/// translator for the snapshot index and `S` the strategy used by the authenticated journal.
pub struct Immutable<
    F: Family,
    E: Context,
    K: Key,
    V: ValueEncoding,
    C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
    H: CHasher,
    T: Translator,
    S: Strategy,
> where
    C::Item: EncodeShared,
{
    /// Authenticated journal holding the log of operations.
    pub(crate) journal: authenticated::Journal<F, E, C, H, S>,

    /// Cached root digest; recomputed on initialization and rewind, and replaced when a batch is
    /// applied.
    pub(crate) root: H::Digest,

    /// Index from (translated) key to the log locations of candidate `Set` operations.
    ///
    /// Lookups re-read the operation at each candidate location and compare the stored key, so
    /// entries for other keys sharing a translated key are tolerated.
    pub(crate) snapshot: Index<T, Location<F>>,

    /// Location of the most recent commit operation in the log.
    pub(crate) last_commit_loc: Location<F>,

    /// Inactivity floor taken from the most recent commit; pruning beyond it is rejected.
    pub(crate) inactivity_floor_loc: Location<F>,

    /// Metric families updated by the database methods.
    metrics: Metrics<E>,
}
196
197impl<F, E, K, V, C, H, T, S> Immutable<F, E, K, V, C, H, T, S>
199where
200 F: Family,
201 E: Context,
202 K: Key,
203 V: ValueEncoding,
204 C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
205 C::Item: EncodeShared,
206 H: CHasher,
207 T: Translator,
208 S: Strategy,
209{
210 pub(crate) async fn init_from_journal(
215 mut journal: authenticated::Journal<F, E, C, H, S>,
216 context: E,
217 translator: T,
218 ) -> Result<Self, Error<F>> {
219 if journal.size().await == 0 {
220 warn!("Authenticated log is empty, initialized new db.");
221 journal
222 .append(&Operation::Commit(None, Location::new(0)))
223 .await?;
224 journal.sync().await?;
225 }
226
227 let mut snapshot = Index::new(context.child("snapshot"), translator);
228
229 let (last_commit_loc, inactivity_floor_loc) = {
230 let reader = journal.journal.reader().await;
231 let bounds = reader.bounds();
232 let last_commit_loc =
233 Location::new(bounds.end.checked_sub(1).expect("commit should exist"));
234
235 let last_op = reader.read(*last_commit_loc).await?;
237 let inactivity_floor_loc = last_op
238 .has_floor()
239 .expect("last operation should be a commit with floor");
240 if inactivity_floor_loc > last_commit_loc {
241 return Err(Error::DataCorrupted("inactivity floor exceeds last commit"));
242 }
243
244 build_snapshot_from_log::<F, _, _, _>(
246 inactivity_floor_loc,
247 &reader,
248 &mut snapshot,
249 |_, _| {},
250 )
251 .await?;
252
253 (last_commit_loc, inactivity_floor_loc)
254 };
255 let inactive_peaks = F::inactive_peaks(
256 F::location_to_position(Location::new(*last_commit_loc + 1)),
257 inactivity_floor_loc,
258 );
259 let root = journal.root(inactive_peaks)?;
260
261 let metrics = Metrics::new(context);
262 let db = Self {
263 journal,
264 root,
265 snapshot,
266 last_commit_loc,
267 inactivity_floor_loc,
268 metrics,
269 };
270 db.update_metrics().await;
271 Ok(db)
272 }
273
274 pub const fn inactivity_floor_loc(&self) -> Location<F> {
276 self.inactivity_floor_loc
277 }
278
279 pub async fn size(&self) -> Location<F> {
281 self.bounds().await.end
282 }
283
284 pub async fn bounds(&self) -> Range<Location<F>> {
287 let bounds = self.journal.reader().await.bounds();
288 Location::new(bounds.start)..Location::new(bounds.end)
289 }
290
291 async fn update_metrics(&self) {
293 let bounds = self.journal.reader().await.bounds();
294 self.metrics.state.set(
295 bounds.end,
296 bounds.start,
297 *self.inactivity_floor_loc,
298 *self.last_commit_loc,
299 );
300 }
301
302 pub const fn sync_boundary(&self) -> Location<F> {
306 self.inactivity_floor_loc
307 }
308
309 pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error<F>> {
312 let _timer = self.metrics.reads.get_timer();
313 self.metrics.reads.get_calls.inc();
314 self.metrics.reads.keys_requested.inc();
315 let iter = self.snapshot.get(key);
316 let reader = self.journal.reader().await;
317 let oldest = reader.bounds().start;
318 let mut result = None;
319 for &loc in iter {
320 if loc < oldest {
321 continue;
322 }
323 if let Some(v) = Self::get_from_loc(&reader, key, loc).await? {
324 result = Some(v);
325 break;
326 }
327 }
328
329 Ok(result)
330 }
331
332 pub async fn get_many(&self, keys: &[&K]) -> Result<Vec<Option<V::Value>>, Error<F>> {
336 if keys.is_empty() {
337 return Ok(Vec::new());
338 }
339
340 let _timer = self.metrics.reads.get_many_timer();
341 self.metrics.reads.get_many_calls.inc();
342 self.metrics.reads.keys_requested.inc_by(keys.len() as u64);
343 let mut candidates: Vec<(usize, u64)> = Vec::with_capacity(keys.len());
344 let mut results: Vec<Option<V::Value>> = vec![None; keys.len()];
345
346 let reader = self.journal.reader().await;
347 let oldest = reader.bounds().start;
348
349 for (key_idx, key) in keys.iter().enumerate() {
350 for &loc in self.snapshot.get(key) {
351 if loc < oldest {
352 continue;
353 }
354 candidates.push((key_idx, *loc));
355 }
356 }
357
358 if candidates.is_empty() {
359 return Ok(results);
360 }
361
362 candidates.sort_unstable_by_key(|&(_, pos)| pos);
363
364 let mut positions: Vec<u64> = Vec::with_capacity(candidates.len());
365 for &(_, pos) in &candidates {
366 if positions.last() != Some(&pos) {
367 positions.push(pos);
368 }
369 }
370
371 let ops = reader.read_many(&positions).await?;
372
373 for &(key_idx, pos) in &candidates {
374 if results[key_idx].is_some() {
375 continue;
376 }
377 let op_idx = positions
378 .binary_search(&pos)
379 .expect("position was deduped from candidates");
380 let Operation::Set(k, v) = &ops[op_idx] else {
381 return Err(Error::UnexpectedData(Location::new(pos)));
382 };
383 if k == keys[key_idx] {
384 results[key_idx] = Some(v.clone());
385 }
386 }
387
388 Ok(results)
389 }
390
391 async fn get_from_loc(
395 reader: &impl Reader<Item = Operation<F, K, V>>,
396 key: &K,
397 loc: Location<F>,
398 ) -> Result<Option<V::Value>, Error<F>> {
399 if loc < reader.bounds().start {
400 return Err(Error::OperationPruned(loc));
401 }
402
403 let Operation::Set(k, v) = reader.read(*loc).await? else {
404 return Err(Error::UnexpectedData(loc));
405 };
406
407 if k != *key {
408 Ok(None)
409 } else {
410 Ok(Some(v))
411 }
412 }
413
414 pub async fn get_metadata(&self) -> Result<Option<V::Value>, Error<F>> {
416 let last_commit_loc = self.last_commit_loc;
417 let Operation::Commit(metadata, _floor) = self
418 .journal
419 .journal
420 .reader()
421 .await
422 .read(*last_commit_loc)
423 .await?
424 else {
425 unreachable!("no commit operation at location of last commit {last_commit_loc}");
426 };
427
428 Ok(metadata)
429 }
430
431 pub async fn historical_proof(
451 &self,
452 op_count: Location<F>,
453 start_loc: Location<F>,
454 max_ops: NonZeroU64,
455 ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, K, V>>), Error<F>> {
456 if op_count > self.journal.size().await {
457 return Err(crate::merkle::Error::RangeOutOfBounds(op_count).into());
458 }
459
460 let reader = self.journal.reader().await;
461 let inactive_peaks =
462 crate::qmdb::inactive_peaks_at::<F, _>(&reader, op_count, |op| op.has_floor()).await?;
463
464 Ok(self
465 .journal
466 .historical_proof(op_count, start_loc, max_ops, inactive_peaks)
467 .await?)
468 }
469
470 pub async fn proof(
477 &self,
478 start_index: Location<F>,
479 max_ops: NonZeroU64,
480 ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, K, V>>), Error<F>> {
481 let op_count = self.bounds().await.end;
482 self.historical_proof(op_count, start_index, max_ops).await
483 }
484
485 pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
499 let _timer = self.metrics.operations.prune_timer();
500 self.metrics.operations.prune_calls.inc();
501 if loc > self.inactivity_floor_loc {
502 return Err(Error::PruneBeyondMinRequired(
503 loc,
504 self.inactivity_floor_loc,
505 ));
506 }
507 self.journal.prune(loc).await?;
508 self.update_metrics().await;
509 Ok(())
510 }
511
512 pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
532 let rewind_size = *size;
533 let current_size = *self.last_commit_loc + 1;
534 if rewind_size == current_size {
535 return Ok(());
536 }
537 if rewind_size == 0 || rewind_size > current_size {
538 return Err(Error::Journal(crate::journal::Error::InvalidRewind(
539 rewind_size,
540 )));
541 }
542
543 let (rewind_last_loc, rewind_floor, rewound_keys) = {
544 let reader = self.journal.reader().await;
545 let bounds = reader.bounds();
546 let rewind_last_loc = Location::new(rewind_size - 1);
547 if rewind_size <= bounds.start {
548 return Err(Error::Journal(crate::journal::Error::ItemPruned(
549 *rewind_last_loc,
550 )));
551 }
552 let rewind_last_op = reader.read(*rewind_last_loc).await?;
553 let Operation::Commit(_, rewind_floor) = &rewind_last_op else {
554 return Err(Error::UnexpectedData(rewind_last_loc));
555 };
556 let rewind_floor = *rewind_floor;
557 if *rewind_floor < bounds.start {
558 return Err(Error::Journal(crate::journal::Error::ItemPruned(
559 *rewind_floor,
560 )));
561 }
562
563 let mut rewound_keys = Vec::new();
564 for loc in rewind_size..current_size {
565 if let Operation::Set(key, _) = reader.read(loc).await? {
566 rewound_keys.push(key);
567 }
568 }
569
570 (rewind_last_loc, rewind_floor, rewound_keys)
571 };
572
573 let old_floor = self.inactivity_floor_loc;
574
575 self.journal.rewind(rewind_size).await?;
578
579 let rewind_loc = Location::<F>::new(rewind_size);
581 for key in &rewound_keys {
582 self.snapshot.retain(key, |loc| *loc < rewind_loc);
584 }
585
586 if rewind_floor < old_floor {
593 let reader = self.journal.journal.reader().await;
594 let gap_end = core::cmp::min(*old_floor, rewind_size);
595 for loc in (*rewind_floor..gap_end).rev() {
596 if let Operation::Set(key, _) = reader.read(loc).await? {
597 self.snapshot.insert(&key, Location::new(loc));
598 }
599 }
600 }
601
602 self.last_commit_loc = rewind_last_loc;
603 self.inactivity_floor_loc = rewind_floor;
604 let inactive_peaks = F::inactive_peaks(F::location_to_position(size), rewind_floor);
605 self.root = self.journal.root(inactive_peaks)?;
606 self.update_metrics().await;
607
608 Ok(())
609 }
610
611 pub const fn root(&self) -> H::Digest {
613 self.root
614 }
615
616 pub const fn strategy(&self) -> &S {
618 self.journal.strategy()
619 }
620
621 pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<H::Digest>, Error<F>> {
623 self.journal
624 .merkle
625 .pinned_nodes_at(loc)
626 .await
627 .map_err(Into::into)
628 }
629
630 pub async fn sync(&self) -> Result<(), Error<F>> {
634 let _timer = self.metrics.operations.sync_timer();
635 self.metrics.operations.sync_calls.inc();
636 self.journal.sync().await?;
637 Ok(())
638 }
639
640 pub async fn commit(&self) -> Result<(), Error<F>> {
642 let _timer = self.metrics.operations.commit_timer();
643 self.metrics.operations.commit_calls.inc();
644 self.journal.commit().await?;
645 Ok(())
646 }
647
648 pub async fn destroy(self) -> Result<(), Error<F>> {
650 Ok(self.journal.destroy().await?)
651 }
652
653 #[allow(clippy::type_complexity)]
655 pub fn new_batch(&self) -> batch::UnmerkleizedBatch<F, H, K, V, S> {
656 let journal_size = *self.last_commit_loc + 1;
657 batch::UnmerkleizedBatch::new(self, journal_size)
658 }
659
660 pub async fn apply_batch(
687 &mut self,
688 batch: Arc<batch::MerkleizedBatch<F, H::Digest, K, V, S>>,
689 ) -> Result<Range<Location<F>>, Error<F>> {
690 let _timer = self.metrics.operations.apply_batch_timer();
691 self.metrics.operations.apply_batch_calls.inc();
692 let db_size = *self.last_commit_loc + 1;
693 batch
694 .bounds
695 .validate_apply_to(db_size, self.inactivity_floor_loc)?;
696 let start_loc = Location::new(db_size);
697
698 self.journal.apply_batch(&batch.journal_batch).await?;
700
701 let bounds = self.journal.reader().await.bounds();
704 let mut seen = HashSet::new();
705 for (key, entry) in batch.diff.iter() {
706 seen.insert(key.clone());
707 self.snapshot
708 .insert_and_retain(key, entry.loc, |v| *v >= bounds.start);
709 }
710 for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
711 if batch.bounds.ancestors[i].end <= db_size {
712 continue;
713 }
714 for (key, entry) in ancestor_diff.iter() {
715 if seen.insert(key.clone()) {
716 self.snapshot
717 .insert_and_retain(key, entry.loc, |v| *v >= bounds.start);
718 }
719 }
720 }
721
722 self.last_commit_loc = Location::new(batch.bounds.total_size - 1);
724 self.inactivity_floor_loc = batch.bounds.inactivity_floor;
725 self.root = batch.root;
726 let range = start_loc..Location::new(batch.bounds.total_size);
727 self.update_metrics().await;
728 self.metrics
729 .operations
730 .operations_applied
731 .inc_by(*range.end - *range.start);
732 Ok(range)
733 }
734}
735
736#[cfg(test)]
737pub(super) mod test {
738 use super::*;
739 use crate::{
740 merkle::{Family, Location},
741 qmdb::{self, verify_proof},
742 translator::TwoCap,
743 };
744 use commonware_codec::EncodeShared;
745 use commonware_cryptography::{sha256, sha256::Digest, Sha256};
746 use commonware_runtime::{deterministic, Supervisor as _};
747 use commonware_utils::NZU64;
748 use core::{future::Future, pin::Pin};
749 use std::ops::Range;
750
// Section size used by the pruning test to compute the expected prune boundaries.
// NOTE(review): presumably this must match the journal configuration used by the `open_db`
// closures supplied by callers — confirm.
const ITEMS_PER_SECTION: u64 = 5;

// Database type exercised by the shared tests: deterministic runtime context, SHA-256 digests
// as keys, SHA-256 hasher, `TwoCap` translator and the sequential strategy. The Merkle family
// `F`, value encoding `V` and journal `C` are supplied by each caller.
type TestDb<F, V, C> = Immutable<
    F,
    deterministic::Context,
    Digest,
    V,
    C,
    Sha256,
    TwoCap,
    commonware_parallel::Sequential,
>;
763
764 pub(crate) async fn test_immutable_empty<F: Family, V, C>(
765 context: deterministic::Context,
766 open_db: impl Fn(
767 deterministic::Context,
768 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
769 ) where
770 V: ValueEncoding<Value = Digest>,
771 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
772 C::Item: EncodeShared,
773 {
774 let db = open_db(context.child("first")).await;
775 let bounds = db.bounds().await;
776 assert_eq!(bounds.end, 1);
777 assert_eq!(bounds.start, Location::new(0));
778 assert_eq!(db.inactivity_floor_loc(), Location::new(0));
779 assert!(db.get_metadata().await.unwrap().is_none());
780
781 let k1 = Sha256::fill(1u8);
783 let v1 = Sha256::fill(2u8);
784 let root = db.root();
785 {
786 let _batch = db.new_batch().set(k1, v1);
787 }
789 drop(db);
790 let mut db = open_db(context.child("second")).await;
791 assert_eq!(db.root(), root);
792 assert_eq!(db.bounds().await.end, 1);
793
794 db.apply_batch(db.new_batch().merkleize(&db, None, Location::new(0)))
796 .await
797 .unwrap();
798 db.commit().await.unwrap();
799 assert_eq!(db.bounds().await.end, 2); let root = db.root();
801 drop(db);
802
803 let db = open_db(context.child("third")).await;
804 assert_eq!(db.root(), root);
805
806 db.destroy().await.unwrap();
807 }
808
809 pub(crate) async fn test_immutable_build_basic<F: Family, V, C>(
810 context: deterministic::Context,
811 open_db: impl Fn(
812 deterministic::Context,
813 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
814 ) where
815 V: ValueEncoding<Value = Digest>,
816 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
817 C::Item: EncodeShared,
818 {
819 let mut db = open_db(context.child("first")).await;
821
822 let k1 = Sha256::fill(1u8);
823 let k2 = Sha256::fill(2u8);
824 let v1 = Sha256::fill(3u8);
825 let v2 = Sha256::fill(4u8);
826
827 assert!(db.get(&k1).await.unwrap().is_none());
828 assert!(db.get(&k2).await.unwrap().is_none());
829
830 let metadata = Some(Sha256::fill(99u8));
832 db.apply_batch(
833 db.new_batch()
834 .set(k1, v1)
835 .merkleize(&db, metadata, Location::new(0)),
836 )
837 .await
838 .unwrap();
839 db.commit().await.unwrap();
840 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
841 assert!(db.get(&k2).await.unwrap().is_none());
842 assert_eq!(db.bounds().await.end, 3);
843 assert_eq!(db.get_metadata().await.unwrap(), Some(Sha256::fill(99u8)));
844
845 db.apply_batch(
847 db.new_batch()
848 .set(k2, v2)
849 .merkleize(&db, None, Location::new(0)),
850 )
851 .await
852 .unwrap();
853 db.commit().await.unwrap();
854 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
855 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
856 assert_eq!(db.bounds().await.end, 5);
857 assert_eq!(db.get_metadata().await.unwrap(), None);
858
859 let root = db.root();
861
862 let k3 = Sha256::fill(5u8);
864 let v3 = Sha256::fill(6u8);
865 {
866 let _batch = db.new_batch().set(k3, v3);
867 }
869
870 drop(db); let db = open_db(context.child("second")).await;
873 assert!(db.get(&k3).await.unwrap().is_none());
874 assert_eq!(db.root(), root);
875 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
876 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
877 assert_eq!(db.bounds().await.end, 5);
878 assert_eq!(db.get_metadata().await.unwrap(), None);
879
880 db.destroy().await.unwrap();
882 }
883
884 pub(crate) async fn test_immutable_proof_verify<F: Family, V, C>(
885 context: deterministic::Context,
886 open_db: impl Fn(
887 deterministic::Context,
888 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
889 ) where
890 V: ValueEncoding<Value = Digest>,
891 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
892 C::Item: EncodeShared,
893 {
894 let mut db = open_db(context.child("first")).await;
895
896 let k1 = Sha256::fill(1u8);
897 let v1 = Sha256::fill(10u8);
898 db.apply_batch(
899 db.new_batch()
900 .set(k1, v1)
901 .merkleize(&db, None, Location::new(0)),
902 )
903 .await
904 .unwrap();
905 db.commit().await.unwrap();
906
907 let (proof, ops) = db.proof(Location::new(0), NZU64!(100)).await.unwrap();
908 let root = db.root();
909 let hasher = qmdb::hasher::<Sha256>();
910 assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
911
912 db.destroy().await.unwrap();
913 }
914
915 pub(crate) async fn test_immutable_prune<F: Family, V, C>(
916 context: deterministic::Context,
917 open_db: impl Fn(
918 deterministic::Context,
919 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
920 ) where
921 V: ValueEncoding<Value = Digest>,
922 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
923 C::Item: EncodeShared,
924 {
925 let mut db = open_db(context.child("first")).await;
926
927 for i in 0..20u8 {
928 let key = Sha256::fill(i);
929 let value = Sha256::fill(i.wrapping_add(100));
930 let floor = db.bounds().await.end;
931 db.apply_batch(db.new_batch().set(key, value).merkleize(&db, None, floor))
932 .await
933 .unwrap();
934 db.commit().await.unwrap();
935 }
936
937 let root_before = db.root();
938 let bounds_before = db.bounds().await;
939
940 let prune_loc = Location::new(*bounds_before.end - 5);
941 db.prune(prune_loc).await.unwrap();
942
943 assert_eq!(db.root(), root_before);
944
945 let key_0 = Sha256::fill(0u8);
946 assert!(db.get(&key_0).await.unwrap().is_none());
947
948 let key_19 = Sha256::fill(19u8);
949 assert_eq!(
950 db.get(&key_19).await.unwrap(),
951 Some(Sha256::fill(19u8.wrapping_add(100)))
952 );
953
954 db.destroy().await.unwrap();
955 }
956
957 pub(crate) async fn test_immutable_batch_chain<F: Family, V, C>(
958 context: deterministic::Context,
959 open_db: impl Fn(
960 deterministic::Context,
961 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
962 ) where
963 V: ValueEncoding<Value = Digest>,
964 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
965 C::Item: EncodeShared,
966 {
967 let mut db = open_db(context.child("first")).await;
968
969 let k1 = Sha256::fill(1u8);
970 let k2 = Sha256::fill(2u8);
971 let k3 = Sha256::fill(3u8);
972 let v1 = Sha256::fill(11u8);
973 let v2 = Sha256::fill(12u8);
974 let v3 = Sha256::fill(13u8);
975
976 let parent = db
977 .new_batch()
978 .set(k1, v1)
979 .merkleize(&db, None, Location::new(0));
980 let child = parent
981 .new_batch::<Sha256>()
982 .set(k2, v2)
983 .merkleize(&db, None, Location::new(0));
984
985 assert_eq!(child.get(&k1, &db).await.unwrap(), Some(v1));
986 assert_eq!(child.get(&k2, &db).await.unwrap(), Some(v2));
987 assert!(child.get(&k3, &db).await.unwrap().is_none());
988
989 db.apply_batch(child).await.unwrap();
990 db.commit().await.unwrap();
991
992 assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
993 assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
994
995 db.apply_batch(
996 db.new_batch()
997 .set(k3, v3)
998 .merkleize(&db, None, Location::new(0)),
999 )
1000 .await
1001 .unwrap();
1002 db.commit().await.unwrap();
1003 assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
1004
1005 db.destroy().await.unwrap();
1006 }
1007
1008 pub(crate) async fn test_immutable_build_and_authenticate<F: Family, V, C>(
1009 context: deterministic::Context,
1010 open_db: impl Fn(
1011 deterministic::Context,
1012 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1013 ) where
1014 V: ValueEncoding<Value = Digest>,
1015 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1016 C::Item: EncodeShared,
1017 {
1018 let hasher = qmdb::hasher::<Sha256>();
1020 let mut db = open_db(context.child("first")).await;
1021
1022 let mut batch = db.new_batch();
1023 for i in 0u64..2_000 {
1024 let k = Sha256::hash(&i.to_be_bytes());
1025 let v = Sha256::fill(i as u8);
1026 batch = batch.set(k, v);
1027 }
1028 let merkleized = batch.merkleize(&db, None, Location::new(0));
1029 db.apply_batch(merkleized).await.unwrap();
1030 db.commit().await.unwrap();
1031 assert_eq!(db.bounds().await.end, 2_000 + 2);
1032
1033 let root = db.root();
1035 drop(db);
1036
1037 let db = open_db(context.child("second")).await;
1038 assert_eq!(root, db.root());
1039 assert_eq!(db.bounds().await.end, 2_000 + 2);
1040 for i in 0u64..2_000 {
1041 let k = Sha256::hash(&i.to_be_bytes());
1042 let v = Sha256::fill(i as u8);
1043 assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
1044 }
1045
1046 let max_ops = NZU64!(5);
1049 for i in 0..*db.bounds().await.end {
1050 let (proof, log) = db.proof(Location::new(i), max_ops).await.unwrap();
1051 assert!(verify_proof(&hasher, &proof, Location::new(i), &log, &root));
1052 }
1053
1054 db.destroy().await.unwrap();
1055 }
1056
1057 pub(crate) async fn test_immutable_recovery_from_failed_merkle_sync<F: Family, V, C>(
1058 context: deterministic::Context,
1059 open_db: impl Fn(
1060 deterministic::Context,
1061 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1062 ) where
1063 V: ValueEncoding<Value = Digest>,
1064 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1065 C::Item: EncodeShared,
1066 {
1067 const ELEMENTS: u64 = 1000;
1069 let mut db = open_db(context.child("first")).await;
1070
1071 let mut batch = db.new_batch();
1072 for i in 0u64..ELEMENTS {
1073 let k = Sha256::hash(&i.to_be_bytes());
1074 let v = Sha256::fill(i as u8);
1075 batch = batch.set(k, v);
1076 }
1077 let merkleized = batch.merkleize(&db, None, Location::new(0));
1078 db.apply_batch(merkleized).await.unwrap();
1079 db.commit().await.unwrap();
1080 assert_eq!(db.bounds().await.end, ELEMENTS + 2);
1081 db.sync().await.unwrap();
1082 let halfway_root = db.root();
1083
1084 let mut batch = db.new_batch();
1086 for i in ELEMENTS..ELEMENTS * 2 {
1087 let k = Sha256::hash(&i.to_be_bytes());
1088 let v = Sha256::fill(i as u8);
1089 batch = batch.set(k, v);
1090 }
1091 let merkleized = batch.merkleize(&db, None, Location::new(0));
1092 db.apply_batch(merkleized).await.unwrap();
1093 db.commit().await.unwrap();
1094 drop(db); let db = open_db(context.child("second")).await;
1099 assert_eq!(db.bounds().await.end, 2003);
1100 let root = db.root();
1101 assert_ne!(root, halfway_root);
1102
1103 drop(db);
1105 let db = open_db(context.child("third")).await;
1106 assert_eq!(db.bounds().await.end, 2003);
1107 assert_eq!(db.root(), root);
1108
1109 db.destroy().await.unwrap();
1110 }
1111
1112 pub(crate) async fn test_immutable_recovery_from_failed_log_sync<F: Family, V, C>(
1113 context: deterministic::Context,
1114 open_db: impl Fn(
1115 deterministic::Context,
1116 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1117 ) where
1118 V: ValueEncoding<Value = Digest>,
1119 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1120 C::Item: EncodeShared,
1121 {
1122 let mut db = open_db(context.child("first")).await;
1123
1124 let k1 = Sha256::fill(1u8);
1126 let v1 = Sha256::fill(3u8);
1127 db.apply_batch(
1128 db.new_batch()
1129 .set(k1, v1)
1130 .merkleize(&db, None, Location::new(0)),
1131 )
1132 .await
1133 .unwrap();
1134 db.commit().await.unwrap();
1135 let first_commit_root = db.root();
1136
1137 drop(db);
1140
1141 let db = open_db(context.child("second")).await;
1143 assert_eq!(db.bounds().await.end, 3);
1144 let root = db.root();
1145 assert_eq!(root, first_commit_root);
1146
1147 db.destroy().await.unwrap();
1148 }
1149
1150 pub(crate) async fn test_immutable_pruning<F: Family, V, C>(
1151 context: deterministic::Context,
1152 open_db: impl Fn(
1153 deterministic::Context,
1154 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1155 ) where
1156 V: ValueEncoding<Value = Digest>,
1157 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1158 C::Item: EncodeShared,
1159 {
1160 const ELEMENTS: u64 = 2_000;
1162 let mut db = open_db(context.child("first")).await;
1163
1164 let mut sorted_keys: Vec<sha256::Digest> = (1u64..ELEMENTS + 1)
1167 .map(|i| Sha256::hash(&i.to_be_bytes()))
1168 .collect();
1169 sorted_keys.sort();
1170 let mut batch = db.new_batch();
1175 for i in 1u64..ELEMENTS + 1 {
1176 let k = Sha256::hash(&i.to_be_bytes());
1177 let v = Sha256::fill(i as u8);
1178 batch = batch.set(k, v);
1179 }
1180 let inactivity_floor = Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION * 2 - 1);
1183 let merkleized = batch.merkleize(&db, None, inactivity_floor);
1184 db.apply_batch(merkleized).await.unwrap();
1185 assert_eq!(db.bounds().await.end, ELEMENTS + 2);
1186
1187 db.prune(Location::new((ELEMENTS + 2) / 2)).await.unwrap();
1189 let bounds = db.bounds().await;
1190 assert_eq!(bounds.end, ELEMENTS + 2);
1191
1192 let oldest_retained_loc = bounds.start;
1195 assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));
1196
1197 let pruned_key = sorted_keys[*oldest_retained_loc as usize - 2];
1199 assert!(db.get(&pruned_key).await.unwrap().is_none());
1200
1201 let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
1203 assert!(db.get(&unpruned_key).await.unwrap().is_some());
1204
1205 let root = db.root();
1207 db.sync().await.unwrap();
1208 drop(db);
1209
1210 let mut db = open_db(context.child("second")).await;
1211 assert_eq!(root, db.root());
1212 let bounds = db.bounds().await;
1213 assert_eq!(bounds.end, ELEMENTS + 2);
1214 let oldest_retained_loc = bounds.start;
1215 assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));
1216
1217 let loc = Location::new(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
1219 db.prune(loc).await.unwrap();
1220 let oldest_retained_loc = db.bounds().await.start;
1222 assert_eq!(
1223 oldest_retained_loc,
1224 Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
1225 );
1226
1227 db.sync().await.unwrap();
1229 drop(db);
1230 let db = open_db(context.child("third")).await;
1231 let oldest_retained_loc = db.bounds().await.start;
1232 assert_eq!(
1233 oldest_retained_loc,
1234 Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
1235 );
1236
1237 let floor_val = ELEMENTS / 2 + ITEMS_PER_SECTION * 2 - 1;
1239 let inactive_key = sorted_keys[floor_val as usize - 2];
1240 assert!(db.get(&inactive_key).await.unwrap().is_none());
1241
1242 let active_key = sorted_keys[floor_val as usize - 1];
1244 assert!(db.get(&active_key).await.unwrap().is_some());
1245
1246 let pruned_pos = ELEMENTS / 2;
1248 let proof_result = db
1249 .proof(Location::new(pruned_pos), NZU64!(pruned_pos + 100))
1250 .await;
1251 assert!(
1252 matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos)
1253 );
1254
1255 db.destroy().await.unwrap();
1256 }
1257
1258 pub(crate) async fn test_immutable_prune_beyond_floor<F: Family, V, C>(
1259 context: deterministic::Context,
1260 open_db: impl Fn(
1261 deterministic::Context,
1262 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1263 ) where
1264 V: ValueEncoding<Value = Digest>,
1265 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1266 C::Item: EncodeShared,
1267 {
1268 let mut db = open_db(context.child("test")).await;
1269
1270 let result = db.prune(Location::new(1)).await;
1272 assert!(
1273 matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, floor))
1274 if prune_loc == Location::new(1) && floor == Location::new(0))
1275 );
1276
1277 let k1 = Digest::from(*b"12345678901234567890123456789012");
1279 let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
1280 let k3 = Digest::from(*b"99999999999999999999999999999999");
1281 let v1 = Sha256::fill(1u8);
1282 let v2 = Sha256::fill(2u8);
1283 let v3 = Sha256::fill(3u8);
1284
1285 db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
1287 &db,
1288 None,
1289 Location::new(3),
1290 ))
1291 .await
1292 .unwrap();
1293
1294 assert_eq!(*db.last_commit_loc, 3);
1296
1297 db.apply_batch(
1299 db.new_batch()
1300 .set(k3, v3)
1301 .merkleize(&db, None, Location::new(5)),
1302 )
1303 .await
1304 .unwrap();
1305
1306 assert!(db.prune(Location::new(3)).await.is_ok());
1308
1309 let floor = db.inactivity_floor_loc();
1311 let beyond = floor + 1;
1312 let result = db.prune(beyond).await;
1313 assert!(
1314 matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, f))
1315 if prune_loc == beyond && f == floor)
1316 );
1317
1318 db.destroy().await.unwrap();
1319 }
1320
1321 async fn commit_sets<F: Family, V, C>(
1322 db: &mut TestDb<F, V, C>,
1323 sets: impl IntoIterator<Item = (Digest, V::Value)>,
1324 metadata: Option<V::Value>,
1325 ) -> Range<Location<F>>
1326 where
1327 V: ValueEncoding<Value = Digest>,
1328 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1329 C::Item: EncodeShared,
1330 {
1331 commit_sets_with_floor(db, sets, metadata, Location::new(0)).await
1332 }
1333
1334 async fn commit_sets_with_floor<F: Family, V, C>(
1335 db: &mut TestDb<F, V, C>,
1336 sets: impl IntoIterator<Item = (Digest, V::Value)>,
1337 metadata: Option<V::Value>,
1338 floor: Location<F>,
1339 ) -> Range<Location<F>>
1340 where
1341 V: ValueEncoding<Value = Digest>,
1342 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1343 C::Item: EncodeShared,
1344 {
1345 let mut batch = db.new_batch();
1346 for (key, value) in sets {
1347 batch = batch.set(key, value);
1348 }
1349 let range = db
1350 .apply_batch(batch.merkleize(db, metadata, floor))
1351 .await
1352 .unwrap();
1353 db.commit().await.unwrap();
1354 range
1355 }
1356
1357 pub(crate) async fn test_immutable_rewind_recovery<F: Family, V, C>(
1358 context: deterministic::Context,
1359 open_db: impl Fn(
1360 deterministic::Context,
1361 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1362 ) where
1363 V: ValueEncoding<Value = Digest>,
1364 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1365 C::Item: EncodeShared,
1366 {
1367 let mut db = open_db(context.child("db")).await;
1368
1369 let key1 = Sha256::hash(&1u64.to_be_bytes());
1370 let key2 = Sha256::hash(&2u64.to_be_bytes());
1371 let key3 = Sha256::hash(&3u64.to_be_bytes());
1372 let key4 = Sha256::hash(&4u64.to_be_bytes());
1373
1374 let value1 = Sha256::fill(11u8);
1375 let value2 = Sha256::fill(22u8);
1376 let value3 = Sha256::fill(33u8);
1377 let value4 = Sha256::fill(66u8);
1378
1379 let metadata_a = Sha256::fill(44u8);
1380 let first_range =
1381 commit_sets(&mut db, [(key1, value1), (key2, value2)], Some(metadata_a)).await;
1382 let size_before = db.bounds().await.end;
1383 let root_before = db.root();
1384 let last_commit_before = db.last_commit_loc;
1385 assert_eq!(size_before, first_range.end);
1386
1387 let metadata_b = Sha256::fill(55u8);
1388 let second_range =
1389 commit_sets(&mut db, [(key3, value3), (key4, value4)], Some(metadata_b)).await;
1390 assert_eq!(second_range.start, size_before);
1391 assert_ne!(db.root(), root_before);
1392 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_b));
1393 assert_eq!(db.get(&key3).await.unwrap(), Some(value3));
1394 assert_eq!(db.get(&key4).await.unwrap(), Some(value4));
1395
1396 db.rewind(size_before).await.unwrap();
1397 assert_eq!(db.root(), root_before);
1398 assert_eq!(db.bounds().await.end, size_before);
1399 assert_eq!(db.last_commit_loc, last_commit_before);
1400 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1401 assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1402 assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1403 assert_eq!(db.get(&key3).await.unwrap(), None);
1404 assert_eq!(db.get(&key4).await.unwrap(), None);
1405
1406 db.commit().await.unwrap();
1407 drop(db);
1408 let db = open_db(context.child("reopen")).await;
1409 assert_eq!(db.root(), root_before);
1410 assert_eq!(db.bounds().await.end, size_before);
1411 assert_eq!(db.last_commit_loc, last_commit_before);
1412 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
1413 assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1414 assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1415 assert_eq!(db.get(&key3).await.unwrap(), None);
1416 assert_eq!(db.get(&key4).await.unwrap(), None);
1417
1418 db.destroy().await.unwrap();
1419 }
1420
1421 pub(crate) async fn test_immutable_rewind_preserves_collision_bucket<F: Family, V, C>(
1425 context: deterministic::Context,
1426 open_db: impl Fn(
1427 deterministic::Context,
1428 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1429 ) where
1430 V: ValueEncoding<Value = Digest>,
1431 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1432 C::Item: EncodeShared,
1433 {
1434 let mut db = open_db(context.child("db")).await;
1435
1436 let mut k1_bytes = [0u8; 32];
1438 let mut k2_bytes = [0u8; 32];
1439 k1_bytes[0] = 0xAA;
1440 k1_bytes[1] = 0xBB;
1441 k2_bytes[0] = 0xAA;
1442 k2_bytes[1] = 0xBB;
1443 k1_bytes[31] = 0x01;
1444 k2_bytes[31] = 0x02;
1445 let key1 = Digest::from(k1_bytes);
1446 let key2 = Digest::from(k2_bytes);
1447 let value1 = Sha256::fill(11u8);
1448 let value2 = Sha256::fill(22u8);
1449
1450 commit_sets(&mut db, [(key1, value1)], None).await;
1451 let size_after_first = db.bounds().await.end;
1452 commit_sets(&mut db, [(key2, value2)], None).await;
1453 assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1454 assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
1455
1456 db.rewind(size_after_first).await.unwrap();
1457
1458 assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1461 assert_eq!(db.get(&key2).await.unwrap(), None);
1462
1463 db.destroy().await.unwrap();
1464 }
1465
1466 pub(crate) async fn test_immutable_rewind_pruned_target_errors<F: Family, V, C>(
1467 context: deterministic::Context,
1468 open_small_sections_db: impl Fn(
1469 deterministic::Context,
1470 )
1471 -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1472 ) where
1473 V: ValueEncoding<Value = Digest>,
1474 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1475 C::Item: EncodeShared,
1476 {
1477 let mut db = open_small_sections_db(context.child("db")).await;
1478
1479 let first_range = commit_sets(
1480 &mut db,
1481 (0u64..16).map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8))),
1482 None,
1483 )
1484 .await;
1485
1486 let mut round = 0u64;
1487 loop {
1488 round += 1;
1489 assert!(
1490 round <= 64,
1491 "failed to prune enough history for rewind test"
1492 );
1493
1494 let floor = Location::new(*db.bounds().await.end + 16);
1497 commit_sets_with_floor(
1498 &mut db,
1499 (0u64..16).map(|i| {
1500 let seed = round * 100 + i;
1501 (Sha256::hash(&seed.to_be_bytes()), Sha256::fill(seed as u8))
1502 }),
1503 None,
1504 floor,
1505 )
1506 .await;
1507 db.prune(db.last_commit_loc).await.unwrap();
1508
1509 if db.bounds().await.start > first_range.start {
1510 break;
1511 }
1512 }
1513
1514 let oldest_retained = db.bounds().await.start;
1515 let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
1516 assert!(
1517 matches!(
1518 boundary_err,
1519 Error::Journal(crate::journal::Error::ItemPruned(_))
1520 ),
1521 "unexpected rewind error at retained boundary: {boundary_err:?}"
1522 );
1523
1524 let err = db.rewind(first_range.start).await.unwrap_err();
1525 assert!(
1526 matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
1527 "unexpected rewind error: {err:?}"
1528 );
1529
1530 db.destroy().await.unwrap();
1531 }
1532
1533 pub(crate) async fn test_immutable_batch_get_read_through<F: Family, V, C>(
1535 context: deterministic::Context,
1536 open_db: impl Fn(
1537 deterministic::Context,
1538 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1539 ) where
1540 V: ValueEncoding<Value = Digest>,
1541 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1542 C::Item: EncodeShared,
1543 {
1544 let mut db = open_db(context.child("db")).await;
1545
1546 let key_a = Sha256::hash(&0u64.to_be_bytes());
1548 let val_a = Sha256::fill(1u8);
1549 db.apply_batch(
1550 db.new_batch()
1551 .set(key_a, val_a)
1552 .merkleize(&db, None, Location::new(0)),
1553 )
1554 .await
1555 .unwrap();
1556
1557 let mut batch = db.new_batch();
1559 assert_eq!(batch.get(&key_a, &db).await.unwrap(), Some(val_a));
1560
1561 let key_b = Sha256::hash(&1u64.to_be_bytes());
1563 let val_b = Sha256::fill(2u8);
1564 batch = batch.set(key_b, val_b);
1565 assert_eq!(batch.get(&key_b, &db).await.unwrap(), Some(val_b));
1566
1567 let key_c = Sha256::hash(&2u64.to_be_bytes());
1569 assert_eq!(batch.get(&key_c, &db).await.unwrap(), None);
1570
1571 db.destroy().await.unwrap();
1572 }
1573
1574 pub(crate) async fn test_immutable_batch_stacked_get<F: Family, V, C>(
1576 context: deterministic::Context,
1577 open_db: impl Fn(
1578 deterministic::Context,
1579 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1580 ) where
1581 V: ValueEncoding<Value = Digest>,
1582 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1583 C::Item: EncodeShared,
1584 {
1585 let db = open_db(context.child("db")).await;
1586
1587 let key_a = Sha256::hash(&0u64.to_be_bytes());
1589 let val_a = Sha256::fill(10u8);
1590 let parent = db.new_batch().set(key_a, val_a);
1591 let parent_m = parent.merkleize(&db, None, Location::new(0));
1592
1593 let mut child = parent_m.new_batch::<Sha256>();
1595 assert_eq!(child.get(&key_a, &db).await.unwrap(), Some(val_a));
1596
1597 let key_b = Sha256::hash(&1u64.to_be_bytes());
1599 let val_b = Sha256::fill(20u8);
1600 child = child.set(key_b, val_b);
1601 assert_eq!(child.get(&key_b, &db).await.unwrap(), Some(val_b));
1602
1603 let key_c = Sha256::hash(&2u64.to_be_bytes());
1605 assert_eq!(child.get(&key_c, &db).await.unwrap(), None);
1606
1607 db.destroy().await.unwrap();
1608 }
1609
1610 pub(crate) async fn test_immutable_batch_stacked_apply<F: Family, V, C>(
1612 context: deterministic::Context,
1613 open_db: impl Fn(
1614 deterministic::Context,
1615 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1616 ) where
1617 V: ValueEncoding<Value = Digest>,
1618 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1619 C::Item: EncodeShared,
1620 {
1621 let mut db = open_db(context.child("db")).await;
1622
1623 let mut kvs_first: Vec<(Digest, Digest)> = (0u64..5)
1625 .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
1626 .collect();
1627 kvs_first.sort_by_key(|a| a.0);
1628
1629 let mut kvs_second: Vec<(Digest, Digest)> = (5u64..10)
1630 .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
1631 .collect();
1632 kvs_second.sort_by_key(|a| a.0);
1633
1634 let mut parent = db.new_batch();
1636 for (k, v) in &kvs_first {
1637 parent = parent.set(*k, *v);
1638 }
1639 let parent_m = parent.merkleize(&db, None, Location::new(0));
1640
1641 let mut child = parent_m.new_batch::<Sha256>();
1643 for (k, v) in &kvs_second {
1644 child = child.set(*k, *v);
1645 }
1646 let child_m = child.merkleize(&db, None, Location::new(0));
1647 let expected_root = child_m.root();
1648 db.apply_batch(child_m).await.unwrap();
1649
1650 assert_eq!(db.root(), expected_root);
1651
1652 for (k, v) in kvs_first.iter().chain(kvs_second.iter()) {
1654 assert_eq!(db.get(k).await.unwrap(), Some(*v));
1655 }
1656
1657 db.destroy().await.unwrap();
1658 }
1659
1660 pub(crate) async fn test_immutable_batch_speculative_root<F: Family, V, C>(
1662 context: deterministic::Context,
1663 open_db: impl Fn(
1664 deterministic::Context,
1665 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1666 ) where
1667 V: ValueEncoding<Value = Digest>,
1668 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1669 C::Item: EncodeShared,
1670 {
1671 let mut db = open_db(context.child("db")).await;
1672
1673 let mut batch = db.new_batch();
1674 for i in 0u8..10 {
1675 let k = Sha256::hash(&[i]);
1676 batch = batch.set(k, Sha256::fill(i));
1677 }
1678 let merkleized = batch.merkleize(&db, None, Location::new(0));
1679
1680 let speculative = merkleized.root();
1681 db.apply_batch(merkleized).await.unwrap();
1682 assert_eq!(db.root(), speculative);
1683
1684 let metadata = Some(Sha256::fill(55u8));
1686 let mut batch = db.new_batch();
1687 let k = Sha256::hash(&[0xAA]);
1688 batch = batch.set(k, Sha256::fill(0xAA));
1689 let merkleized = batch.merkleize(&db, metadata, Location::new(0));
1690 let speculative = merkleized.root();
1691 db.apply_batch(merkleized).await.unwrap();
1692 assert_eq!(db.root(), speculative);
1693
1694 db.destroy().await.unwrap();
1695 }
1696
1697 pub(crate) async fn test_immutable_merkleized_batch_get<F: Family, V, C>(
1699 context: deterministic::Context,
1700 open_db: impl Fn(
1701 deterministic::Context,
1702 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1703 ) where
1704 V: ValueEncoding<Value = Digest>,
1705 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1706 C::Item: EncodeShared,
1707 {
1708 let mut db = open_db(context.child("db")).await;
1709
1710 let key_a = Sha256::hash(&0u64.to_be_bytes());
1712 let val_a = Sha256::fill(10u8);
1713 db.apply_batch(
1714 db.new_batch()
1715 .set(key_a, val_a)
1716 .merkleize(&db, None, Location::new(0)),
1717 )
1718 .await
1719 .unwrap();
1720
1721 let key_b = Sha256::hash(&1u64.to_be_bytes());
1723 let val_b = Sha256::fill(20u8);
1724 let merkleized = db
1725 .new_batch()
1726 .set(key_b, val_b)
1727 .merkleize(&db, None, Location::new(0));
1728
1729 assert_eq!(merkleized.get(&key_a, &db).await.unwrap(), Some(val_a));
1731
1732 assert_eq!(merkleized.get(&key_b, &db).await.unwrap(), Some(val_b));
1734
1735 let key_c = Sha256::hash(&2u64.to_be_bytes());
1737 assert_eq!(merkleized.get(&key_c, &db).await.unwrap(), None);
1738
1739 db.destroy().await.unwrap();
1740 }
1741
1742 pub(crate) async fn test_immutable_batch_sequential_apply<F: Family, V, C>(
1744 context: deterministic::Context,
1745 open_db: impl Fn(
1746 deterministic::Context,
1747 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1748 ) where
1749 V: ValueEncoding<Value = Digest>,
1750 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1751 C::Item: EncodeShared,
1752 {
1753 let mut db = open_db(context.child("db")).await;
1754
1755 let key_a = Sha256::hash(&0u64.to_be_bytes());
1756 let val_a = Sha256::fill(1u8);
1757
1758 let m = db
1760 .new_batch()
1761 .set(key_a, val_a)
1762 .merkleize(&db, None, Location::new(0));
1763 let root1 = m.root();
1764 db.apply_batch(m).await.unwrap();
1765 assert_eq!(db.root(), root1);
1766 assert_eq!(db.get(&key_a).await.unwrap(), Some(val_a));
1767
1768 let key_b = Sha256::hash(&1u64.to_be_bytes());
1770 let val_b = Sha256::fill(2u8);
1771 let m = db
1772 .new_batch()
1773 .set(key_b, val_b)
1774 .merkleize(&db, None, Location::new(0));
1775 let root2 = m.root();
1776 db.apply_batch(m).await.unwrap();
1777 assert_eq!(db.root(), root2);
1778 assert_eq!(db.get(&key_b).await.unwrap(), Some(val_b));
1779
1780 db.destroy().await.unwrap();
1781 }
1782
1783 pub(crate) async fn test_immutable_batch_many_sequential<F: Family, V, C>(
1785 context: deterministic::Context,
1786 open_db: impl Fn(
1787 deterministic::Context,
1788 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1789 ) where
1790 V: ValueEncoding<Value = Digest>,
1791 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1792 C::Item: EncodeShared,
1793 {
1794 let mut db = open_db(context.child("db")).await;
1795 let hasher = qmdb::hasher::<Sha256>();
1796
1797 const BATCHES: u64 = 20;
1798 const KEYS_PER_BATCH: u64 = 5;
1799
1800 let mut all_kvs: Vec<(Digest, Digest)> = Vec::new();
1801
1802 for batch_idx in 0..BATCHES {
1803 let mut batch = db.new_batch();
1804 for j in 0..KEYS_PER_BATCH {
1805 let seed = batch_idx * 100 + j;
1806 let k = Sha256::hash(&seed.to_be_bytes());
1807 let v = Sha256::fill(seed as u8);
1808 batch = batch.set(k, v);
1809 all_kvs.push((k, v));
1810 }
1811 let merkleized = batch.merkleize(&db, None, Location::new(0));
1812 db.apply_batch(merkleized).await.unwrap();
1813 }
1814
1815 for (k, v) in &all_kvs {
1817 assert_eq!(db.get(k).await.unwrap(), Some(*v));
1818 }
1819
1820 let root = db.root();
1822 let (proof, ops) = db.proof(Location::new(0), NZU64!(10000)).await.unwrap();
1823 assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1824
1825 let expected = 1 + BATCHES * (KEYS_PER_BATCH + 1);
1827 assert_eq!(db.bounds().await.end, expected);
1828
1829 db.destroy().await.unwrap();
1830 }
1831
1832 pub(crate) async fn test_immutable_batch_empty_batch<F: Family, V, C>(
1834 context: deterministic::Context,
1835 open_db: impl Fn(
1836 deterministic::Context,
1837 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1838 ) where
1839 V: ValueEncoding<Value = Digest>,
1840 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1841 C::Item: EncodeShared,
1842 {
1843 let mut db = open_db(context.child("db")).await;
1844
1845 let k = Sha256::hash(&[1u8]);
1847 db.apply_batch(db.new_batch().set(k, Sha256::fill(1u8)).merkleize(
1848 &db,
1849 None,
1850 Location::new(0),
1851 ))
1852 .await
1853 .unwrap();
1854 let root_before = db.root();
1855 let size_before = db.bounds().await.end;
1856
1857 let merkleized = db.new_batch().merkleize(&db, None, Location::new(0));
1859 let speculative = merkleized.root();
1860 db.apply_batch(merkleized).await.unwrap();
1861
1862 assert_ne!(db.root(), root_before);
1864 assert_eq!(db.root(), speculative);
1865 assert_eq!(db.bounds().await.end, size_before + 1);
1867
1868 db.destroy().await.unwrap();
1869 }
1870
1871 pub(crate) async fn test_immutable_batch_chained_merkleized_get<F: Family, V, C>(
1873 context: deterministic::Context,
1874 open_db: impl Fn(
1875 deterministic::Context,
1876 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1877 ) where
1878 V: ValueEncoding<Value = Digest>,
1879 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1880 C::Item: EncodeShared,
1881 {
1882 let mut db = open_db(context.child("db")).await;
1883
1884 let key_a = Sha256::hash(&0u64.to_be_bytes());
1886 let val_a = Sha256::fill(10u8);
1887 db.apply_batch(
1888 db.new_batch()
1889 .set(key_a, val_a)
1890 .merkleize(&db, None, Location::new(0)),
1891 )
1892 .await
1893 .unwrap();
1894
1895 let key_b = Sha256::hash(&1u64.to_be_bytes());
1897 let val_b = Sha256::fill(1u8);
1898 let parent_m = db
1899 .new_batch()
1900 .set(key_b, val_b)
1901 .merkleize(&db, None, Location::new(0));
1902
1903 let key_c = Sha256::hash(&2u64.to_be_bytes());
1905 let val_c = Sha256::fill(2u8);
1906 let child_m =
1907 parent_m
1908 .new_batch::<Sha256>()
1909 .set(key_c, val_c)
1910 .merkleize(&db, None, Location::new(0));
1911
1912 assert_eq!(child_m.get(&key_a, &db).await.unwrap(), Some(val_a));
1915 assert_eq!(child_m.get(&key_b, &db).await.unwrap(), Some(val_b));
1917 assert_eq!(child_m.get(&key_c, &db).await.unwrap(), Some(val_c));
1919 let key_d = Sha256::hash(&3u64.to_be_bytes());
1921 assert_eq!(child_m.get(&key_d, &db).await.unwrap(), None);
1922
1923 db.destroy().await.unwrap();
1924 }
1925
1926 pub(crate) async fn test_immutable_batch_large<F: Family, V, C>(
1928 context: deterministic::Context,
1929 open_db: impl Fn(
1930 deterministic::Context,
1931 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1932 ) where
1933 V: ValueEncoding<Value = Digest>,
1934 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1935 C::Item: EncodeShared,
1936 {
1937 let mut db = open_db(context.child("db")).await;
1938 let hasher = qmdb::hasher::<Sha256>();
1939
1940 const N: u64 = 500;
1941 let mut kvs: Vec<(Digest, Digest)> = Vec::new();
1942
1943 let mut batch = db.new_batch();
1944 for i in 0..N {
1945 let k = Sha256::hash(&i.to_be_bytes());
1946 let v = Sha256::fill((i % 256) as u8);
1947 batch = batch.set(k, v);
1948 kvs.push((k, v));
1949 }
1950 let merkleized = batch.merkleize(&db, None, Location::new(0));
1951 db.apply_batch(merkleized).await.unwrap();
1952
1953 for (k, v) in &kvs {
1955 assert_eq!(db.get(k).await.unwrap(), Some(*v));
1956 }
1957
1958 let root = db.root();
1960 let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1961 assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1962
1963 assert_eq!(db.bounds().await.end, 1 + N + 1);
1965
1966 db.destroy().await.unwrap();
1967 }
1968
1969 pub(crate) async fn test_immutable_batch_chained_key_override<F: Family, V, C>(
1971 context: deterministic::Context,
1972 open_db: impl Fn(
1973 deterministic::Context,
1974 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
1975 ) where
1976 V: ValueEncoding<Value = Digest>,
1977 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
1978 C::Item: EncodeShared,
1979 {
1980 let mut db = open_db(context.child("db")).await;
1981
1982 let key = Sha256::hash(&0u64.to_be_bytes());
1983 let val_parent = Sha256::fill(1u8);
1984 let val_child = Sha256::fill(2u8);
1985
1986 let parent_m = db
1988 .new_batch()
1989 .set(key, val_parent)
1990 .merkleize(&db, None, Location::new(0));
1991
1992 let mut child = parent_m.new_batch::<Sha256>();
1994 child = child.set(key, val_child);
1995
1996 assert_eq!(child.get(&key, &db).await.unwrap(), Some(val_child));
1998
1999 let child_m = child.merkleize(&db, None, Location::new(0));
2000
2001 assert_eq!(child_m.get(&key, &db).await.unwrap(), Some(val_child));
2003
2004 db.apply_batch(child_m).await.unwrap();
2006 assert_eq!(db.get(&key).await.unwrap(), Some(val_child));
2007
2008 db.destroy().await.unwrap();
2009 }
2010
2011 pub(crate) async fn test_immutable_batch_sequential_key_override<F: Family, V, C>(
2018 context: deterministic::Context,
2019 open_db_small_sections: impl Fn(
2020 deterministic::Context,
2021 )
2022 -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2023 ) where
2024 V: ValueEncoding<Value = Digest>,
2025 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2026 C::Item: EncodeShared,
2027 {
2028 let mut db = open_db_small_sections(context.child("db")).await;
2029
2030 let key = Sha256::hash(&0u64.to_be_bytes());
2031 let v1 = Sha256::fill(1u8);
2032 let v2 = Sha256::fill(2u8);
2033
2034 db.apply_batch(
2037 db.new_batch()
2038 .set(key, v1)
2039 .merkleize(&db, None, Location::new(0)),
2040 )
2041 .await
2042 .unwrap();
2043 assert_eq!(db.get(&key).await.unwrap(), Some(v1));
2044
2045 db.apply_batch(
2049 db.new_batch()
2050 .set(key, v2)
2051 .merkleize(&db, None, Location::new(4)),
2052 )
2053 .await
2054 .unwrap();
2055
2056 assert_eq!(db.get(&key).await.unwrap(), Some(v1));
2058
2059 db.prune(Location::new(2)).await.unwrap();
2062 assert_eq!(db.get(&key).await.unwrap(), Some(v2));
2063
2064 db.destroy().await.unwrap();
2065 }
2066
2067 pub(crate) async fn test_immutable_batch_metadata<F: Family, V, C>(
2069 context: deterministic::Context,
2070 open_db: impl Fn(
2071 deterministic::Context,
2072 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2073 ) where
2074 V: ValueEncoding<Value = Digest>,
2075 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2076 C::Item: EncodeShared,
2077 {
2078 let mut db = open_db(context.child("db")).await;
2079
2080 let metadata = Sha256::fill(42u8);
2082 let k = Sha256::hash(&[1u8]);
2083 db.apply_batch(db.new_batch().set(k, Sha256::fill(1u8)).merkleize(
2084 &db,
2085 Some(metadata),
2086 Location::new(0),
2087 ))
2088 .await
2089 .unwrap();
2090 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
2091
2092 db.apply_batch(db.new_batch().merkleize(&db, None, Location::new(0)))
2094 .await
2095 .unwrap();
2096 assert_eq!(db.get_metadata().await.unwrap(), None);
2097
2098 db.destroy().await.unwrap();
2099 }
2100
2101 pub(crate) async fn test_immutable_stale_batch_rejected<F: Family, V, C>(
2102 context: deterministic::Context,
2103 open_db: impl Fn(
2104 deterministic::Context,
2105 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2106 ) where
2107 V: ValueEncoding<Value = Digest>,
2108 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2109 C::Item: EncodeShared,
2110 {
2111 let mut db = open_db(context.child("db")).await;
2112
2113 let key1 = Sha256::hash(&[1]);
2114 let key2 = Sha256::hash(&[2]);
2115 let v1 = Sha256::fill(10u8);
2116 let v2 = Sha256::fill(20u8);
2117
2118 let batch_a = db
2120 .new_batch()
2121 .set(key1, v1)
2122 .merkleize(&db, None, Location::new(0));
2123 let batch_b = db
2124 .new_batch()
2125 .set(key2, v2)
2126 .merkleize(&db, None, Location::new(0));
2127
2128 db.apply_batch(batch_a).await.unwrap();
2130 let expected_root = db.root();
2131 let expected_bounds = db.bounds().await;
2132 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2133 assert_eq!(db.get(&key2).await.unwrap(), None);
2134 assert_eq!(db.get_metadata().await.unwrap(), None);
2135
2136 let result = db.apply_batch(batch_b).await;
2138 assert!(
2139 matches!(result, Err(Error::StaleBatch { .. })),
2140 "expected StaleBatch error, got {result:?}"
2141 );
2142 assert_eq!(db.root(), expected_root);
2143 assert_eq!(db.bounds().await, expected_bounds);
2144 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2145 assert_eq!(db.get(&key2).await.unwrap(), None);
2146 assert_eq!(db.get_metadata().await.unwrap(), None);
2147
2148 db.destroy().await.unwrap();
2149 }
2150
2151 pub(crate) async fn test_immutable_stale_batch_chained<F: Family, V, C>(
2152 context: deterministic::Context,
2153 open_db: impl Fn(
2154 deterministic::Context,
2155 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2156 ) where
2157 V: ValueEncoding<Value = Digest>,
2158 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2159 C::Item: EncodeShared,
2160 {
2161 let mut db = open_db(context.child("db")).await;
2162
2163 let key1 = Sha256::hash(&[1]);
2164 let key2 = Sha256::hash(&[2]);
2165 let key3 = Sha256::hash(&[3]);
2166
2167 let parent_m =
2169 db.new_batch()
2170 .set(key1, Sha256::fill(1u8))
2171 .merkleize(&db, None, Location::new(0));
2172
2173 let child_a = parent_m
2175 .new_batch::<Sha256>()
2176 .set(key2, Sha256::fill(2u8))
2177 .merkleize(&db, None, Location::new(0));
2178 let child_b = parent_m
2179 .new_batch::<Sha256>()
2180 .set(key3, Sha256::fill(3u8))
2181 .merkleize(&db, None, Location::new(0));
2182
2183 db.apply_batch(child_a).await.unwrap();
2185
2186 let result = db.apply_batch(child_b).await;
2188 assert!(
2189 matches!(result, Err(Error::StaleBatch { .. })),
2190 "expected StaleBatch error, got {result:?}"
2191 );
2192
2193 db.destroy().await.unwrap();
2194 }
2195
2196 pub(crate) async fn test_immutable_partial_ancestor_commit<F: Family, V, C>(
2197 context: deterministic::Context,
2198 open_db: impl Fn(
2199 deterministic::Context,
2200 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2201 ) where
2202 V: ValueEncoding<Value = Digest>,
2203 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2204 C::Item: EncodeShared,
2205 {
2206 let mut db = open_db(context.child("db")).await;
2207
2208 let key1 = Sha256::hash(&[1]);
2209 let key2 = Sha256::hash(&[2]);
2210 let key3 = Sha256::hash(&[3]);
2211 let v1 = Sha256::fill(1u8);
2212 let v2 = Sha256::fill(2u8);
2213 let v3 = Sha256::fill(3u8);
2214
2215 let a = db
2217 .new_batch()
2218 .set(key1, v1)
2219 .merkleize(&db, None, Location::new(0));
2220 let b = a
2221 .new_batch::<Sha256>()
2222 .set(key2, v2)
2223 .merkleize(&db, None, Location::new(0));
2224 let c = b
2225 .new_batch::<Sha256>()
2226 .set(key3, v3)
2227 .merkleize(&db, None, Location::new(0));
2228
2229 let expected_root = c.root();
2230
2231 db.apply_batch(a).await.unwrap();
2233 db.apply_batch(c).await.unwrap();
2234
2235 assert_eq!(db.root(), expected_root);
2236 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2237 assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2238 assert_eq!(db.get(&key3).await.unwrap(), Some(v3));
2239
2240 db.destroy().await.unwrap();
2241 }
2242
2243 pub(crate) async fn test_immutable_sequential_commit_parent_then_child<F: Family, V, C>(
2244 context: deterministic::Context,
2245 open_db: impl Fn(
2246 deterministic::Context,
2247 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2248 ) where
2249 V: ValueEncoding<Value = Digest>,
2250 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2251 C::Item: EncodeShared,
2252 {
2253 let mut db = open_db(context.child("db")).await;
2254
2255 let key1 = Sha256::hash(&[1]);
2256 let key2 = Sha256::hash(&[2]);
2257 let v1 = Sha256::fill(1u8);
2258 let v2 = Sha256::fill(2u8);
2259
2260 let parent_m = db
2262 .new_batch()
2263 .set(key1, v1)
2264 .merkleize(&db, None, Location::new(0));
2265
2266 let child_m =
2268 parent_m
2269 .new_batch::<Sha256>()
2270 .set(key2, v2)
2271 .merkleize(&db, None, Location::new(0));
2272
2273 db.apply_batch(parent_m).await.unwrap();
2275 db.apply_batch(child_m).await.unwrap();
2276
2277 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2279 assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2280
2281 db.destroy().await.unwrap();
2282 }
2283
2284 pub(crate) async fn test_immutable_child_root_matches_pending_and_committed<F: Family, V, C>(
2285 context: deterministic::Context,
2286 open_db: impl Fn(
2287 deterministic::Context,
2288 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2289 ) where
2290 V: ValueEncoding<Value = Digest>,
2291 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2292 C::Item: EncodeShared,
2293 {
2294 let mut db = open_db(context.child("db")).await;
2295
2296 let key1 = Sha256::hash(&[1]);
2297 let key2 = Sha256::hash(&[2]);
2298
2299 let parent =
2301 db.new_batch()
2302 .set(key1, Sha256::fill(1u8))
2303 .merkleize(&db, None, Location::new(0));
2304 let pending_child = parent
2305 .new_batch::<Sha256>()
2306 .set(key2, Sha256::fill(2u8))
2307 .merkleize(&db, None, Location::new(0));
2308
2309 db.apply_batch(parent).await.unwrap();
2312 db.commit().await.unwrap();
2313
2314 let committed_child =
2315 db.new_batch()
2316 .set(key2, Sha256::fill(2u8))
2317 .merkleize(&db, None, Location::new(0));
2318
2319 assert_eq!(pending_child.root(), committed_child.root());
2320
2321 db.destroy().await.unwrap();
2322 }
2323
2324 pub(crate) async fn test_immutable_stale_batch_child_applied_before_parent<F: Family, V, C>(
2325 context: deterministic::Context,
2326 open_db: impl Fn(
2327 deterministic::Context,
2328 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2329 ) where
2330 V: ValueEncoding<Value = Digest>,
2331 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2332 C::Item: EncodeShared,
2333 {
2334 let mut db = open_db(context.child("db")).await;
2335
2336 let key1 = Sha256::hash(&[1]);
2337 let key2 = Sha256::hash(&[2]);
2338
2339 let parent_m =
2341 db.new_batch()
2342 .set(key1, Sha256::fill(1u8))
2343 .merkleize(&db, None, Location::new(0));
2344
2345 let child_m = parent_m
2347 .new_batch::<Sha256>()
2348 .set(key2, Sha256::fill(2u8))
2349 .merkleize(&db, None, Location::new(0));
2350
2351 db.apply_batch(child_m).await.unwrap();
2353
2354 let result = db.apply_batch(parent_m).await;
2356 assert!(
2357 matches!(result, Err(Error::StaleBatch { .. })),
2358 "expected StaleBatch error, got {result:?}"
2359 );
2360
2361 db.destroy().await.unwrap();
2362 }
2363
2364 pub(crate) async fn test_immutable_to_batch<F: Family, V, C>(
2367 context: deterministic::Context,
2368 open_db: impl Fn(
2369 deterministic::Context,
2370 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2371 ) where
2372 V: ValueEncoding<Value = Digest>,
2373 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2374 C::Item: EncodeShared,
2375 {
2376 let mut db = open_db(context.child("db")).await;
2377
2378 let key1 = Sha256::hash(&[1]);
2380 let v1 = Sha256::fill(10u8);
2381 db.apply_batch(
2382 db.new_batch()
2383 .set(key1, v1)
2384 .merkleize(&db, None, Location::new(0)),
2385 )
2386 .await
2387 .unwrap();
2388
2389 let snapshot = db.to_batch();
2391 assert_eq!(snapshot.root(), db.root());
2392
2393 let key2 = Sha256::hash(&[2]);
2395 let v2 = Sha256::fill(20u8);
2396 let child =
2397 snapshot
2398 .new_batch::<Sha256>()
2399 .set(key2, v2)
2400 .merkleize(&db, None, Location::new(0));
2401 db.apply_batch(child).await.unwrap();
2402
2403 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2404 assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2405
2406 db.destroy().await.unwrap();
2407 }
2408
2409 pub(crate) async fn test_immutable_apply_after_ancestor_dropped<F: Family, V, C>(
2412 context: deterministic::Context,
2413 open_db: impl Fn(
2414 deterministic::Context,
2415 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2416 ) where
2417 V: ValueEncoding<Value = Digest>,
2418 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2419 C::Item: EncodeShared,
2420 {
2421 let mut db = open_db(context.child("db")).await;
2422
2423 let key1 = Sha256::hash(&[1]);
2424 let key2 = Sha256::hash(&[2]);
2425 let key3 = Sha256::hash(&[3]);
2426 let v1 = Sha256::fill(1u8);
2427 let v2 = Sha256::fill(2u8);
2428 let v3 = Sha256::fill(3u8);
2429
2430 let a = db
2432 .new_batch()
2433 .set(key1, v1)
2434 .merkleize(&db, None, Location::new(0));
2435 let b = a
2436 .new_batch::<Sha256>()
2437 .set(key2, v2)
2438 .merkleize(&db, None, Location::new(0));
2439 let c = b
2440 .new_batch::<Sha256>()
2441 .set(key3, v3)
2442 .merkleize(&db, None, Location::new(0));
2443
2444 drop(a);
2446 drop(b);
2447
2448 db.apply_batch(c).await.unwrap();
2450
2451 assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
2453 assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
2454 assert_eq!(db.get(&key3).await.unwrap(), Some(v3));
2455
2456 db.destroy().await.unwrap();
2457 }
2458
2459 pub(crate) async fn test_immutable_inactivity_floor_tracking<F: Family, V, C>(
2462 context: deterministic::Context,
2463 open_db: impl Fn(
2464 deterministic::Context,
2465 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2466 ) where
2467 V: ValueEncoding<Value = Digest>,
2468 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2469 C::Item: EncodeShared,
2470 {
2471 let mut db = open_db(context.child("test")).await;
2472
2473 assert_eq!(db.inactivity_floor_loc(), Location::new(0));
2475
2476 let k1 = Sha256::fill(1u8);
2478 let v1 = Sha256::fill(2u8);
2479 db.apply_batch(
2480 db.new_batch()
2481 .set(k1, v1)
2482 .merkleize(&db, None, Location::new(0)),
2483 )
2484 .await
2485 .unwrap();
2486 assert_eq!(db.inactivity_floor_loc(), Location::new(0));
2487
2488 let k2 = Sha256::fill(3u8);
2490 let v2 = Sha256::fill(4u8);
2491 db.apply_batch(
2492 db.new_batch()
2493 .set(k2, v2)
2494 .merkleize(&db, None, Location::new(3)),
2495 )
2496 .await
2497 .unwrap();
2498 assert_eq!(db.inactivity_floor_loc(), Location::new(3));
2499
2500 db.commit().await.unwrap();
2502 db.sync().await.unwrap();
2503 drop(db);
2504 let db = open_db(context.child("reopen")).await;
2505 assert_eq!(db.inactivity_floor_loc(), Location::new(3));
2506
2507 db.destroy().await.unwrap();
2508 }
2509
2510 pub(crate) async fn test_immutable_floor_monotonicity<F: Family, V, C>(
2513 context: deterministic::Context,
2514 open_db: impl Fn(
2515 deterministic::Context,
2516 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2517 ) where
2518 V: ValueEncoding<Value = Digest>,
2519 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2520 C::Item: EncodeShared,
2521 {
2522 let mut db = open_db(context.child("test")).await;
2523
2524 let k1 = Sha256::fill(1u8);
2527 let v1 = Sha256::fill(2u8);
2528 db.apply_batch(
2529 db.new_batch()
2530 .set(k1, v1)
2531 .merkleize(&db, None, Location::new(2)),
2532 )
2533 .await
2534 .unwrap();
2535 assert_eq!(db.inactivity_floor_loc(), Location::new(2));
2536
2537 let k2 = Sha256::fill(3u8);
2539 let v2 = Sha256::fill(4u8);
2540 db.apply_batch(
2541 db.new_batch()
2542 .set(k2, v2)
2543 .merkleize(&db, None, Location::new(2)),
2544 )
2545 .await
2546 .unwrap();
2547 assert_eq!(db.inactivity_floor_loc(), Location::new(2));
2548
2549 let k3 = Sha256::fill(5u8);
2551 let v3 = Sha256::fill(6u8);
2552 db.apply_batch(
2553 db.new_batch()
2554 .set(k3, v3)
2555 .merkleize(&db, None, Location::new(5)),
2556 )
2557 .await
2558 .unwrap();
2559 assert_eq!(db.inactivity_floor_loc(), Location::new(5));
2560
2561 db.destroy().await.unwrap();
2562 }
2563
2564 pub(crate) async fn test_immutable_rewind_restores_floor<F: Family, V, C>(
2566 context: deterministic::Context,
2567 open_db: impl Fn(
2568 deterministic::Context,
2569 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2570 ) where
2571 V: ValueEncoding<Value = Digest>,
2572 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2573 C::Item: EncodeShared,
2574 {
2575 let mut db = open_db(context.child("test")).await;
2576
2577 let k1 = Sha256::fill(1u8);
2579 let v1 = Sha256::fill(2u8);
2580 db.apply_batch(
2581 db.new_batch()
2582 .set(k1, v1)
2583 .merkleize(&db, None, Location::new(2)),
2584 )
2585 .await
2586 .unwrap();
2587 db.commit().await.unwrap();
2588 let first_size = db.bounds().await.end;
2589 assert_eq!(db.inactivity_floor_loc(), Location::new(2));
2590
2591 let k2 = Sha256::fill(3u8);
2593 let v2 = Sha256::fill(4u8);
2594 db.apply_batch(
2595 db.new_batch()
2596 .set(k2, v2)
2597 .merkleize(&db, None, Location::new(4)),
2598 )
2599 .await
2600 .unwrap();
2601 db.commit().await.unwrap();
2602 assert_eq!(db.inactivity_floor_loc(), Location::new(4));
2603
2604 db.rewind(first_size).await.unwrap();
2606 assert_eq!(db.inactivity_floor_loc(), Location::new(2));
2607
2608 db.destroy().await.unwrap();
2609 }
2610
2611 pub(crate) async fn test_immutable_floor_monotonicity_violation<F: Family, V, C>(
2614 context: deterministic::Context,
2615 open_db: impl Fn(
2616 deterministic::Context,
2617 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2618 ) where
2619 V: ValueEncoding<Value = Digest>,
2620 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2621 C::Item: EncodeShared,
2622 {
2623 let mut db = open_db(context.child("test")).await;
2624
2625 let k1 = Sha256::fill(1u8);
2627 let v1 = Sha256::fill(2u8);
2628 db.apply_batch(
2629 db.new_batch()
2630 .set(k1, v1)
2631 .merkleize(&db, None, Location::new(2)),
2632 )
2633 .await
2634 .unwrap();
2635
2636 let k2 = Sha256::fill(3u8);
2638 let v2 = Sha256::fill(4u8);
2639 let result = db
2640 .apply_batch(
2641 db.new_batch()
2642 .set(k2, v2)
2643 .merkleize(&db, None, Location::new(1)),
2644 )
2645 .await;
2646 assert!(matches!(result, Err(Error::FloorRegressed(new, current))
2647 if new == Location::new(1) && current == Location::new(2)));
2648
2649 db.destroy().await.unwrap();
2650 }
2651
2652 pub(crate) async fn test_immutable_floor_beyond_size<F: Family, V, C>(
2655 context: deterministic::Context,
2656 open_db: impl Fn(
2657 deterministic::Context,
2658 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2659 ) where
2660 V: ValueEncoding<Value = Digest>,
2661 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2662 C::Item: EncodeShared,
2663 {
2664 let mut db = open_db(context.child("test")).await;
2665
2666 let k1 = Sha256::fill(1u8);
2669 let v1 = Sha256::fill(2u8);
2670 let result = db
2671 .apply_batch(
2672 db.new_batch()
2673 .set(k1, v1)
2674 .merkleize(&db, None, Location::new(100)),
2675 )
2676 .await;
2677 assert!(matches!(result, Err(Error::FloorBeyondSize(floor, commit))
2678 if floor == Location::new(100) && commit == Location::new(2)));
2679
2680 let k2 = Sha256::fill(3u8);
2684 let v2 = Sha256::fill(4u8);
2685 let result = db
2686 .apply_batch(
2687 db.new_batch()
2688 .set(k2, v2)
2689 .merkleize(&db, None, Location::new(3)),
2690 )
2691 .await;
2692 assert!(matches!(result, Err(Error::FloorBeyondSize(floor, commit))
2693 if floor == Location::new(3) && commit == Location::new(2)));
2694
2695 db.apply_batch(
2697 db.new_batch()
2698 .set(k2, v2)
2699 .merkleize(&db, None, Location::new(2)),
2700 )
2701 .await
2702 .unwrap();
2703
2704 db.destroy().await.unwrap();
2705 }
2706
2707 pub(crate) async fn test_immutable_chained_ancestor_floor_regression<F: Family, V, C>(
2713 context: deterministic::Context,
2714 open_db: impl Fn(
2715 deterministic::Context,
2716 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2717 ) where
2718 V: ValueEncoding<Value = Digest>,
2719 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2720 C::Item: EncodeShared,
2721 {
2722 let mut db = open_db(context.child("test")).await;
2723
2724 let a = db
2731 .new_batch()
2732 .set(Sha256::fill(1u8), Sha256::fill(2u8))
2733 .merkleize(&db, None, Location::new(2));
2734 let b = a
2735 .new_batch::<Sha256>()
2736 .set(Sha256::fill(3u8), Sha256::fill(4u8))
2737 .merkleize(&db, None, Location::new(1));
2738 let c = b
2739 .new_batch::<Sha256>()
2740 .set(Sha256::fill(5u8), Sha256::fill(6u8))
2741 .merkleize(&db, None, Location::new(2));
2742
2743 let root_before = db.root();
2744 let last_commit_before = db.last_commit_loc;
2745 let floor_before = db.inactivity_floor_loc();
2746
2747 let err = db.apply_batch(c).await.unwrap_err();
2748 assert!(
2749 matches!(err, Error::FloorRegressed(new, prev)
2750 if new == Location::new(1) && prev == Location::new(2)),
2751 "unexpected error: {err:?}"
2752 );
2753
2754 assert_eq!(db.root(), root_before);
2756 assert_eq!(db.last_commit_loc, last_commit_before);
2757 assert_eq!(db.inactivity_floor_loc(), floor_before);
2758
2759 db.destroy().await.unwrap();
2760 }
2761
2762 pub(crate) async fn test_immutable_chained_ancestor_floor_beyond_size<F: Family, V, C>(
2767 context: deterministic::Context,
2768 open_db: impl Fn(
2769 deterministic::Context,
2770 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2771 ) where
2772 V: ValueEncoding<Value = Digest>,
2773 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2774 C::Item: EncodeShared,
2775 {
2776 let mut db = open_db(context.child("test")).await;
2777
2778 let a = db
2781 .new_batch()
2782 .set(Sha256::fill(1u8), Sha256::fill(2u8))
2783 .merkleize(&db, None, Location::new(3));
2784 let b = a
2785 .new_batch::<Sha256>()
2786 .set(Sha256::fill(3u8), Sha256::fill(4u8))
2787 .merkleize(&db, None, Location::new(0));
2788
2789 let root_before = db.root();
2790 let last_commit_before = db.last_commit_loc;
2791 let floor_before = db.inactivity_floor_loc();
2792
2793 let err = db.apply_batch(b).await.unwrap_err();
2794 assert!(
2796 matches!(err, Error::FloorBeyondSize(floor, commit)
2797 if floor == Location::new(3) && commit == Location::new(2)),
2798 "unexpected error: {err:?}"
2799 );
2800
2801 assert_eq!(db.root(), root_before);
2803 assert_eq!(db.last_commit_loc, last_commit_before);
2804 assert_eq!(db.inactivity_floor_loc(), floor_before);
2805
2806 db.destroy().await.unwrap();
2807 }
2808
2809 pub(crate) async fn test_immutable_rewind_after_reopen_with_floor_change<F: Family, V, C>(
2816 context: deterministic::Context,
2817 open_db: impl Fn(
2818 deterministic::Context,
2819 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2820 ) where
2821 V: ValueEncoding<Value = Digest>,
2822 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2823 C::Item: EncodeShared,
2824 {
2825 let mut db = open_db(context.child("first")).await;
2826
2827 let k1 = Sha256::fill(1u8);
2828 let k2 = Sha256::fill(2u8);
2829 let k3 = Sha256::fill(3u8);
2830 let v1 = Sha256::fill(11u8);
2831 let v2 = Sha256::fill(12u8);
2832 let v3 = Sha256::fill(13u8);
2833
2834 commit_sets(&mut db, [(k1, v1), (k2, v2), (k3, v3)], None).await;
2836 let first_size = db.bounds().await.end;
2837 let first_root = db.root();
2838
2839 let k4 = Sha256::fill(4u8);
2841 let k5 = Sha256::fill(5u8);
2842 let k6 = Sha256::fill(6u8);
2843 let v4 = Sha256::fill(14u8);
2844 let v5 = Sha256::fill(15u8);
2845 let v6 = Sha256::fill(16u8);
2846 commit_sets_with_floor(&mut db, [(k4, v4), (k5, v5), (k6, v6)], None, first_size).await;
2847 db.sync().await.unwrap();
2848
2849 drop(db);
2851 let mut db = open_db(context.child("second")).await;
2852
2853 assert!(db.get(&k1).await.unwrap().is_none());
2855
2856 db.rewind(first_size).await.unwrap();
2858
2859 assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
2861 assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
2862 assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
2863 assert_eq!(db.root(), first_root);
2864 assert_eq!(db.inactivity_floor_loc(), Location::new(0));
2865
2866 assert!(db.get(&k4).await.unwrap().is_none());
2868
2869 db.destroy().await.unwrap();
2870 }
2871
2872 pub(crate) async fn test_immutable_rewind_after_reopen_partial_floor_gap<F: Family, V, C>(
2876 context: deterministic::Context,
2877 open_db: impl Fn(
2878 deterministic::Context,
2879 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2880 ) where
2881 V: ValueEncoding<Value = Digest>,
2882 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2883 C::Item: EncodeShared,
2884 {
2885 let mut db = open_db(context.child("first")).await;
2886
2887 let k1 = Sha256::fill(1u8);
2888 let v1 = Sha256::fill(11u8);
2889
2890 commit_sets(&mut db, [(k1, v1)], None).await;
2892 let first_size = db.bounds().await.end;
2893 let first_root = db.root();
2894
2895 let k2 = Sha256::fill(2u8);
2897 let v2 = Sha256::fill(12u8);
2898 commit_sets_with_floor(&mut db, [(k2, v2)], None, first_size).await;
2899 let second_size = db.bounds().await.end;
2900
2901 let k3 = Sha256::fill(3u8);
2904 let v3 = Sha256::fill(13u8);
2905 commit_sets_with_floor(&mut db, [(k3, v3)], None, second_size).await;
2906 db.sync().await.unwrap();
2907
2908 drop(db);
2910 let mut db = open_db(context.child("second")).await;
2911 assert!(db.get(&k1).await.unwrap().is_none());
2912 assert!(db.get(&k2).await.unwrap().is_none());
2913 assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
2914
2915 db.rewind(second_size).await.unwrap();
2919 assert!(db.get(&k1).await.unwrap().is_none()); assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
2921 assert!(db.get(&k3).await.unwrap().is_none()); db.rewind(first_size).await.unwrap();
2925 assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
2926 assert!(db.get(&k2).await.unwrap().is_none()); assert_eq!(db.root(), first_root);
2928 assert_eq!(db.inactivity_floor_loc(), Location::new(0));
2929
2930 db.destroy().await.unwrap();
2931 }
2932
2933 pub(crate) async fn test_immutable_rewind_after_reopen_repeated_key_gap<F: Family, V, C>(
2937 context: deterministic::Context,
2938 open_db: impl Fn(
2939 deterministic::Context,
2940 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2941 ) where
2942 V: ValueEncoding<Value = Digest>,
2943 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2944 C::Item: EncodeShared,
2945 {
2946 let mut db = open_db(context.child("first")).await;
2947
2948 let key = Sha256::fill(7u8);
2949 let v1 = Sha256::fill(17u8);
2950 let v2 = Sha256::fill(18u8);
2951 let k3 = Sha256::fill(8u8);
2952 let v3 = Sha256::fill(19u8);
2953
2954 commit_sets(&mut db, [(key, v1)], None).await;
2956
2957 commit_sets(&mut db, [(key, v2)], None).await;
2959 let second_size = db.bounds().await.end;
2960 assert_eq!(db.get(&key).await.unwrap(), Some(v1));
2961
2962 commit_sets_with_floor(&mut db, [(k3, v3)], None, second_size).await;
2964 db.sync().await.unwrap();
2965
2966 drop(db);
2968 let mut db = open_db(context.child("second")).await;
2969 assert!(db.get(&key).await.unwrap().is_none());
2970 assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
2971
2972 db.rewind(second_size).await.unwrap();
2974 assert_eq!(db.get(&key).await.unwrap(), Some(v1));
2975
2976 db.destroy().await.unwrap();
2977 }
2978
2979 pub(crate) async fn test_immutable_rewind_after_reopen_mixed_gap_retained<F: Family, V, C>(
2983 context: deterministic::Context,
2984 open_db: impl Fn(
2985 deterministic::Context,
2986 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
2987 ) where
2988 V: ValueEncoding<Value = Digest>,
2989 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
2990 C::Item: EncodeShared,
2991 {
2992 let mut db = open_db(context.child("first")).await;
2993
2994 let key = Sha256::fill(7u8);
2995 let v1 = Sha256::fill(17u8);
2996 let v2 = Sha256::fill(18u8);
2997 let k3 = Sha256::fill(8u8);
2998 let v3 = Sha256::fill(19u8);
2999
3000 commit_sets(&mut db, [(key, v1)], None).await;
3002 let first_size = db.bounds().await.end;
3003
3004 commit_sets(&mut db, [(key, v2)], None).await;
3006 let second_size = db.bounds().await.end;
3007 assert_eq!(db.get(&key).await.unwrap(), Some(v1));
3008
3009 commit_sets_with_floor(&mut db, [(k3, v3)], None, first_size).await;
3012 db.sync().await.unwrap();
3013
3014 drop(db);
3017 let mut db = open_db(context.child("second")).await;
3018 assert_eq!(db.get(&key).await.unwrap(), Some(v2));
3019
3020 db.rewind(second_size).await.unwrap();
3023 assert_eq!(db.get(&key).await.unwrap(), Some(v1));
3024
3025 db.destroy().await.unwrap();
3026 }
3027
3028 pub(crate) async fn test_immutable_single_commit_live_set<F: Family, V, C>(
3038 context: deterministic::Context,
3039 open_db: impl Fn(
3040 deterministic::Context,
3041 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
3042 ) where
3043 V: ValueEncoding<Value = Digest>,
3044 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
3045 C::Item: EncodeShared,
3046 {
3047 let mut db = open_db(context.child("test")).await;
3048
3049 let metadata = Sha256::fill(42u8);
3052 let commit_loc = Location::<F>::new(4);
3053 let k1 = Sha256::fill(1u8);
3054 let k2 = Sha256::fill(2u8);
3055 let k3 = Sha256::fill(3u8);
3056 let v1 = Sha256::fill(11u8);
3057 let v2 = Sha256::fill(12u8);
3058 let v3 = Sha256::fill(13u8);
3059 db.apply_batch(
3060 db.new_batch()
3061 .set(k1, v1)
3062 .set(k2, v2)
3063 .set(k3, v3)
3064 .merkleize(&db, Some(metadata), commit_loc),
3065 )
3066 .await
3067 .unwrap();
3068 db.commit().await.unwrap();
3069 assert_eq!(db.last_commit_loc, commit_loc);
3070 assert_eq!(db.inactivity_floor_loc(), commit_loc);
3071 let root_after_commit = db.root();
3072
3073 assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
3075 assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
3076 assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
3077
3078 db.prune(commit_loc).await.unwrap();
3083 let bounds = db.bounds().await;
3084 assert!(
3085 bounds.start <= commit_loc,
3086 "prune must not advance bounds.start past the floor"
3087 );
3088 assert_eq!(bounds.end, Location::new(*commit_loc + 1));
3089
3090 let err = db.prune(Location::new(*commit_loc + 1)).await.unwrap_err();
3092 assert!(matches!(err, Error::PruneBeyondMinRequired(p, f)
3093 if *p == *commit_loc + 1 && *f == *commit_loc));
3094
3095 assert_eq!(db.last_commit_loc, commit_loc);
3097 assert_eq!(db.inactivity_floor_loc(), commit_loc);
3098 assert_eq!(db.root(), root_after_commit);
3099 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
3100
3101 db.sync().await.unwrap();
3105 drop(db);
3106 let mut db = open_db(context.child("reopened")).await;
3107 assert_eq!(db.last_commit_loc, commit_loc);
3108 assert_eq!(db.inactivity_floor_loc(), commit_loc);
3109 assert_eq!(db.root(), root_after_commit);
3110 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
3113
3114 assert!(db.get(&k1).await.unwrap().is_none());
3116 assert!(db.get(&k2).await.unwrap().is_none());
3117 assert!(db.get(&k3).await.unwrap().is_none());
3118
3119 let k4 = Sha256::fill(4u8);
3123 let v4 = Sha256::fill(14u8);
3124 let next_commit_loc = Location::<F>::new(6);
3125 db.apply_batch(
3126 db.new_batch()
3127 .set(k4, v4)
3128 .merkleize(&db, None, next_commit_loc),
3129 )
3130 .await
3131 .unwrap();
3132 db.commit().await.unwrap();
3133 assert_eq!(db.last_commit_loc, next_commit_loc);
3134 assert_eq!(db.inactivity_floor_loc(), next_commit_loc);
3135
3136 assert_eq!(db.get(&k4).await.unwrap(), Some(v4));
3138 assert!(db.get(&k1).await.unwrap().is_none());
3139 assert_eq!(db.get_metadata().await.unwrap(), None);
3142
3143 db.destroy().await.unwrap();
3144 }
3145
3146 pub(crate) async fn test_immutable_get_many<F: Family, V, C>(
3149 context: deterministic::Context,
3150 open_db: impl Fn(
3151 deterministic::Context,
3152 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
3153 ) where
3154 V: ValueEncoding<Value = Digest>,
3155 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
3156 C::Item: EncodeShared,
3157 {
3158 let mut db = open_db(context.child("db")).await;
3159
3160 let k1 = Sha256::fill(1u8);
3161 let k2 = Sha256::fill(2u8);
3162 let k3 = Sha256::fill(3u8);
3163 let k_missing = Sha256::fill(99u8);
3164
3165 let v1 = Sha256::fill(11u8);
3166 let v2 = Sha256::fill(12u8);
3167 let v3 = Sha256::fill(13u8);
3168
3169 db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
3171 &db,
3172 None,
3173 db.inactivity_floor_loc(),
3174 ))
3175 .await
3176 .unwrap();
3177 db.commit().await.unwrap();
3178
3179 let results = db.get_many(&[&k1, &k2, &k_missing]).await.unwrap();
3181 assert_eq!(results, vec![Some(v1), Some(v2), None]);
3182
3183 let results = db.get_many(&([] as [&Digest; 0])).await.unwrap();
3185 assert!(results.is_empty());
3186
3187 let batch = db.new_batch().set(k3, v3);
3189 let results = batch.get_many(&[&k3, &k1, &k_missing], &db).await.unwrap();
3190 assert_eq!(results, vec![Some(v3), Some(v1), None]);
3191
3192 let parent = db
3194 .new_batch()
3195 .set(k3, v3)
3196 .merkleize(&db, None, db.inactivity_floor_loc());
3197 let results = parent.get_many(&[&k1, &k3, &k_missing], &db).await.unwrap();
3198 assert_eq!(results, vec![Some(v1), Some(v3), None]);
3199
3200 let v3_new = Sha256::fill(30u8);
3202 let child = parent.new_batch::<Sha256>().set(k3, v3_new);
3203 let results = child.get_many(&[&k1, &k3, &k_missing], &db).await.unwrap();
3204 assert_eq!(results, vec![Some(v1), Some(v3_new), None]);
3205
3206 db.destroy().await.unwrap();
3207 }
3208
3209 pub(crate) async fn test_immutable_get_many_unexpected_data<F: Family, V, C>(
3211 context: deterministic::Context,
3212 open_db: impl Fn(
3213 deterministic::Context,
3214 ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
3215 ) where
3216 V: ValueEncoding<Value = Digest>,
3217 C: Mutable<Item = Operation<F, Digest, V>> + Persistable<Error = JournalError>,
3218 C::Item: EncodeShared,
3219 {
3220 let mut db = open_db(context.child("db")).await;
3221
3222 let key = Sha256::fill(1u8);
3223 let value = Sha256::fill(11u8);
3224 db.apply_batch(db.new_batch().set(key, value).merkleize(
3225 &db,
3226 None,
3227 db.inactivity_floor_loc(),
3228 ))
3229 .await
3230 .unwrap();
3231 db.commit().await.unwrap();
3232
3233 let bad_key = Sha256::fill(99u8);
3234 let bad_loc = db.last_commit_loc;
3235 db.snapshot.insert(&bad_key, bad_loc);
3236
3237 let err = db.get(&bad_key).await.unwrap_err();
3238 assert!(matches!(err, Error::UnexpectedData(loc) if loc == bad_loc));
3239
3240 let err = db.get_many(&[&bad_key]).await.unwrap_err();
3241 assert!(matches!(err, Error::UnexpectedData(loc) if loc == bad_loc));
3242
3243 db.destroy().await.unwrap();
3244 }
3245}