use crate::{
    index::{Ordered as OrderedIndex, Unordered as UnorderedIndex},
    journal::{
        authenticated,
        contiguous::{Contiguous, Mutable, Reader},
    },
    merkle::{Family, Location},
    qmdb::{
        any::{
            db::Db,
            operation::{update, Operation},
            ordered::{find_next_key, find_prev_key},
            ValueEncoding,
        },
        delete_known_loc,
        operation::{Key, Operation as OperationTrait},
        update_known_loc,
    },
    Context,
};
use commonware_codec::Codec;
use commonware_cryptography::{Digest, Hasher};
use core::{iter, ops::Range};
use futures::future::try_join_all;
use std::{
    collections::{BTreeMap, BTreeSet},
    sync::{Arc, Weak},
};
use tracing::debug;
32
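/// Maximum number of operations read concurrently when raising the inactivity floor.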
const MAX_CONCURRENT_READS: u64 = 64;
35
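/// Strategy for choosing which locations to examine when raising the inactivity floor during
/// merkleization.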
pub(crate) trait FloorScan<F: Family> {
    fn next_candidate(&mut self, floor: Location<F>, tip: u64) -> Option<Location<F>>;
}
42
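/// [`FloorScan`] that visits every location in order, starting from the current floor.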
pub(crate) struct SequentialScan;

impl<F: Family> FloorScan<F> for SequentialScan {
    fn next_candidate(&mut self, floor: Location<F>, tip: u64) -> Option<Location<F>> {
        if *floor < tip {
            Some(floor)
        } else {
            None
        }
    }
}
56
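/// The effect a batch has on a single key, relative to the batch's base.
///
/// `base_old_loc` records the key's active location in the committed database (if any) so that
/// the snapshot index can be updated in place when the batch is eventually applied.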
#[derive(Clone)]
pub(crate) enum DiffEntry<F: Family, V> {
    Active {
        value: V,
        loc: Location<F>,
        base_old_loc: Option<Location<F>>,
    },
    Deleted {
        base_old_loc: Option<Location<F>>,
    },
}
77
impl<F: Family, V> DiffEntry<F, V> {
    pub(crate) const fn base_old_loc(&self) -> Option<Location<F>> {
        match self {
            Self::Active { base_old_loc, .. } | Self::Deleted { base_old_loc } => *base_old_loc,
        }
    }

    pub(crate) const fn loc(&self) -> Option<Location<F>> {
        match self {
            Self::Active { loc, .. } => Some(*loc),
            Self::Deleted { .. } => None,
        }
    }

    pub(crate) const fn value(&self) -> Option<&V> {
        match self {
            Self::Active { value, .. } => Some(value),
            Self::Deleted { .. } => None,
        }
    }
}
102
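/// What an [`UnmerkleizedBatch`] is layered on: either the committed database state or a parent
/// [`MerkleizedBatch`].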
enum Base<F: Family, D: Digest, U: update::Update + Send + Sync>
where
    Operation<F, U>: Send + Sync,
{
    Db {
        db_size: u64,
        inactivity_floor_loc: Location<F>,
        active_keys: usize,
    },
    Child(Arc<MerkleizedBatch<F, D, U>>),
}
117
impl<F: Family, D: Digest, U: update::Update + Send + Sync> Base<F, D, U>
where
    Operation<F, U>: Send + Sync,
{
    fn base_size(&self) -> u64 {
        match self {
            Self::Db { db_size, .. } => *db_size,
            Self::Child(parent) => parent.total_size,
        }
    }

    fn db_size(&self) -> u64 {
        match self {
            Self::Db { db_size, .. } => *db_size,
            Self::Child(parent) => parent.db_size,
        }
    }

    fn inactivity_floor_loc(&self) -> Location<F> {
        match self {
            Self::Db {
                inactivity_floor_loc,
                ..
            } => *inactivity_floor_loc,
            Self::Child(parent) => parent.new_inactivity_floor_loc,
        }
    }

    fn active_keys(&self) -> usize {
        match self {
            Self::Db { active_keys, .. } => *active_keys,
            Self::Child(parent) => parent.total_active_keys,
        }
    }

    const fn parent(&self) -> Option<&Arc<MerkleizedBatch<F, D, U>>> {
        match self {
            Self::Db { .. } => None,
            Self::Child(parent) => Some(parent),
        }
    }
}
165
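/// A batch of staged key writes that has not yet been merkleized.
///
/// Writes are buffered in `mutations`; merkleizing the batch turns them into journal operations
/// and produces a [`MerkleizedBatch`].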
pub struct UnmerkleizedBatch<F: Family, H, U>
where
    U: update::Update + Send + Sync,
    H: Hasher,
    Operation<F, U>: Codec,
{
    journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, U>>,

    mutations: BTreeMap<U::Key, Option<U::Value>>,

    base: Base<F, H::Digest, U>,
}
186
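/// A batch whose operations have been merkleized over an in-memory extension of the journal: a
/// root can be computed, but nothing has yet been applied to the database.
///
/// `diff` summarizes the batch's per-key effect, and the `ancestor_*` fields retain the diffs of
/// parent batches so this batch can still be applied after some of those parents have been
/// committed.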
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, U: update::Update + Send + Sync>
where
    Operation<F, U>: Send + Sync,
{
    pub(crate) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<F, U>>>,

    pub(crate) diff: Arc<BTreeMap<U::Key, DiffEntry<F, U::Value>>>,

    parent: Option<Weak<Self>>,

    pub(crate) new_inactivity_floor_loc: Location<F>,

    pub(crate) new_last_commit_loc: Location<F>,

    pub(crate) base_size: u64,

    pub(crate) total_size: u64,

    pub(crate) total_active_keys: usize,

    pub(crate) db_size: u64,

    pub(crate) ancestor_diffs: Vec<Arc<BTreeMap<U::Key, DiffEntry<F, U::Value>>>>,

    pub(crate) ancestor_diff_ends: Vec<u64>,
}
254
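/// Intermediate state used while turning an [`UnmerkleizedBatch`] into a [`MerkleizedBatch`].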
struct Merkleizer<F: Family, H, U>
where
    U: update::Update + Send + Sync,
    H: Hasher,
    Operation<F, U>: Codec,
{
    journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, U>>,
    ancestors: Vec<Arc<MerkleizedBatch<F, H::Digest, U>>>,
    base_size: u64,
    db_size: u64,
    base_inactivity_floor_loc: Location<F>,
    base_active_keys: usize,
}
274
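/// Returns the most recent ancestor diff entry for `key`, if any (ancestors are ordered from
/// newest to oldest).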
fn resolve_in_ancestors<'a, F: Family, D: Digest, U: update::Update + Send + Sync>(
    ancestors: &'a [Arc<MerkleizedBatch<F, D, U>>],
    key: &U::Key,
) -> Option<&'a DiffEntry<F, U::Value>>
where
    Operation<F, U>: Send + Sync,
{
    for batch in ancestors {
        if let Some(entry) = batch.diff.get(key) {
            return Some(entry);
        }
    }
    None
}
290
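/// Applies a single diff entry to the snapshot index, using `base_old_loc` to update or delete
/// the location that was previously active for `key`.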
fn apply_snapshot_diff<F: Family, V, I: UnorderedIndex<Value = Location<F>>>(
    snapshot: &mut I,
    key: &impl Key,
    entry: &DiffEntry<F, V>,
    base_old_loc: Option<Location<F>>,
) {
    match entry {
        DiffEntry::Active { loc, .. } => match base_old_loc {
            Some(old) => update_known_loc::<F, _>(snapshot, key, old, *loc),
            None => snapshot.insert(key, *loc),
        },
        DiffEntry::Deleted { .. } => {
            if let Some(old) = base_old_loc {
                delete_known_loc::<F, _>(snapshot, key, old);
            }
        }
    }
}
310
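/// Reads the operation at `loc` from the ancestor whose uncommitted journal extension contains
/// it. Panics if `loc` is not covered by the ancestor chain.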
fn read_op_from_ancestors<F: Family, D: Digest, U: update::Update + Send + Sync>(
    ancestors: &[Arc<MerkleizedBatch<F, D, U>>],
    loc: u64,
    db_size: u64,
) -> &Operation<F, U>
where
    Operation<F, U>: Send + Sync,
{
    for (i, batch) in ancestors.iter().enumerate() {
        let batch_base = ancestors
            .get(i + 1)
            .map_or(db_size, |b| b.journal_batch.size());
        let batch_end = batch.journal_batch.size();
        if loc >= batch_base && loc < batch_end {
            return &batch.journal_batch.items()[(loc - batch_base) as usize];
        }
    }
    unreachable!("location {loc} not found in ancestor chain (db_size={db_size})")
}
341
342impl<F: Family, H, U> Merkleizer<F, H, U>
365where
366 U: update::Update + Send + Sync,
367 H: Hasher,
368 Operation<F, U>: Codec,
369{
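    /// Returns the operation at `loc` if it is one of this batch's new operations or lives in an
    /// ancestor's uncommitted extension; `None` if it must be read from the committed journal.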
370 fn try_read_op_from_uncommitted(
373 &self,
374 loc: Location<F>,
375 batch_ops: &[Operation<F, U>],
376 ) -> Option<Operation<F, U>> {
377 let loc = *loc;
378
379 if loc >= self.base_size {
380 return Some(batch_ops[(loc - self.base_size) as usize].clone());
381 }
382
383 if loc >= self.db_size {
384 return Some(read_op_from_ancestors(&self.ancestors, loc, self.db_size).clone());
385 }
386
387 None
388 }
389
390 fn try_read_op_sync<R: Reader<Item = Operation<F, U>>>(
393 &self,
394 loc: Location<F>,
395 batch_ops: &[Operation<F, U>],
396 reader: &R,
397 ) -> Option<Operation<F, U>> {
398 self.try_read_op_from_uncommitted(loc, batch_ops)
399 .or_else(|| reader.try_read_sync(*loc))
400 }
401
402 async fn read_op<R: Reader<Item = Operation<F, U>>>(
404 &self,
405 loc: Location<F>,
406 batch_ops: &[Operation<F, U>],
407 reader: &R,
408 ) -> Result<Operation<F, U>, crate::qmdb::Error<F>> {
409 match self.try_read_op_sync(loc, batch_ops, reader) {
410 Some(op) => Ok(op),
411 None => Ok(reader.read(*loc).await?),
412 }
413 }
414
415 async fn read_ops<R: Reader<Item = Operation<F, U>>>(
417 &self,
418 locations: &[Location<F>],
419 batch_ops: &[Operation<F, U>],
420 reader: &R,
421 ) -> Result<Vec<Operation<F, U>>, crate::qmdb::Error<F>> {
422 let results: Vec<Option<Operation<F, U>>> = locations
424 .iter()
425 .map(|loc| self.try_read_op_sync(*loc, batch_ops, reader))
426 .collect();
427
428 let disk_results = try_join_all(
430 locations
431 .iter()
432 .zip(results.iter())
433 .filter(|(_, cached)| cached.is_none())
434 .map(|(loc, _)| reader.read(**loc)),
435 )
436 .await?;
437
438 let mut disk_iter = disk_results.into_iter();
440 Ok(results
441 .into_iter()
442 .map(|r| r.unwrap_or_else(|| disk_iter.next().expect("disk result count mismatch")))
443 .collect())
444 }
445
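    /// Collects the journal locations currently associated with the mutated keys, consulting
    /// ancestor diffs first and falling back to the committed snapshot; optionally also includes
    /// committed locations that collide with a key already resolved through an ancestor. The
    /// result is sorted and deduplicated.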
446 fn gather_existing_locations<E, C, I>(
457 &self,
458 mutations: &BTreeMap<U::Key, Option<U::Value>>,
459 db: &Db<F, E, C, I, H, U>,
460 include_active_collision_siblings: bool,
461 ) -> Vec<Location<F>>
462 where
463 E: Context,
464 C: Contiguous<Item = Operation<F, U>>,
465 I: UnorderedIndex<Value = Location<F>>,
466 {
467 let mut locations = Vec::with_capacity(mutations.len() * 3 / 2);
470 if self.ancestors.is_empty() {
471 for key in mutations.keys() {
472 locations.extend(db.snapshot.get(key).copied());
473 }
474 } else {
475 for key in mutations.keys() {
476 match resolve_in_ancestors(&self.ancestors, key) {
477 Some(DiffEntry::Deleted { .. }) => {
478 }
480 Some(DiffEntry::Active {
481 loc, base_old_loc, ..
482 }) => {
483 locations.push(*loc);
484 if include_active_collision_siblings {
485 locations.extend(
486 db.snapshot
487 .get(key)
488 .copied()
489 .filter(move |loc| Some(*loc) != *base_old_loc),
490 );
491 }
492 }
493 None => {
494 locations.extend(db.snapshot.get(key).copied());
495 }
496 }
497 }
498 }
499 locations.sort();
500 locations.dedup();
501 locations
502 }
503
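    /// Returns whether `loc` is the currently active location for `key`, checking the in-progress
    /// batch diff, then ancestor diffs, then the committed snapshot.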
504 fn is_active_at<E, C, I>(
506 &self,
507 key: &U::Key,
508 loc: Location<F>,
509 batch_diff: &BTreeMap<U::Key, DiffEntry<F, U::Value>>,
510 db: &Db<F, E, C, I, H, U>,
511 ) -> bool
512 where
513 E: Context,
514 C: Contiguous<Item = Operation<F, U>>,
515 I: UnorderedIndex<Value = Location<F>>,
516 {
517 if let Some(entry) = batch_diff
518 .get(key)
519 .or_else(|| resolve_in_ancestors(&self.ancestors, key))
520 {
521 return entry.loc() == Some(loc);
522 }
523 db.snapshot.get(key).any(|&l| l == loc)
524 }
525
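    /// Removes from `mutations` any key that an ancestor batch deleted and this batch re-creates,
    /// returning each such key with its new value and the ancestor's recorded `base_old_loc` so
    /// callers can treat the write as a create.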
526 #[allow(clippy::type_complexity)]
530 fn extract_parent_deleted_creates(
531 &self,
532 mutations: &mut BTreeMap<U::Key, Option<U::Value>>,
533 ) -> BTreeMap<U::Key, (U::Value, Option<Location<F>>)> {
534 if self.ancestors.is_empty() {
535 return BTreeMap::new();
536 }
537 let mut creates = BTreeMap::new();
538 mutations.retain(|key, value| {
539 if let Some(DiffEntry::Deleted { base_old_loc }) =
540 resolve_in_ancestors(&self.ancestors, key)
541 {
542 if let Some(v) = value.take() {
543 creates.insert(key.clone(), (v, *base_old_loc));
544 return false;
545 }
546 }
547 true
548 });
549 creates
550 }
551
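    /// Raises the inactivity floor until `user_steps + 1` still-active operations have been
    /// re-appended (or the scan reaches the tip), then appends the commit operation and
    /// merkleizes everything into a [`MerkleizedBatch`].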
552 #[allow(clippy::too_many_arguments)]
555 async fn finish<E, C, I, S, R>(
556 self,
557 mut ops: Vec<Operation<F, U>>,
558 mut diff: BTreeMap<U::Key, DiffEntry<F, U::Value>>,
559 active_keys_delta: isize,
560 user_steps: u64,
561 metadata: Option<U::Value>,
562 mut scan: S,
563 reader: R,
564 db: &Db<F, E, C, I, H, U>,
565 ) -> Result<Arc<MerkleizedBatch<F, H::Digest, U>>, crate::qmdb::Error<F>>
566 where
567 E: Context,
568 C: Contiguous<Item = Operation<F, U>>,
569 I: UnorderedIndex<Value = Location<F>>,
570 S: FloorScan<F>,
571 R: Reader<Item = Operation<F, U>>,
572 {
573 let total_steps = user_steps + 1;
576 let total_active_keys = self.base_active_keys as isize + active_keys_delta;
577 let mut floor = self.base_inactivity_floor_loc;
578
579 if total_active_keys > 0 {
580 let fixed_tip = self.base_size + ops.len() as u64;
583 let mut moved = 0u64;
584 let mut scan_from = floor;
585
586 while moved < total_steps {
587 let limit = ((total_steps - moved) as usize).min(MAX_CONCURRENT_READS as usize);
591 let mut candidates = Vec::with_capacity(limit);
592 while candidates.len() < limit {
593 let Some(candidate) = scan.next_candidate(scan_from, fixed_tip) else {
594 break;
595 };
596 candidates.push(candidate);
597 scan_from = Location::new(*candidate + 1);
598 }
599 if candidates.is_empty() {
600 break;
601 }
602
603 let resolved = self.read_ops(&candidates, &ops, &reader).await?;
606
607 for (candidate, op) in candidates.into_iter().zip(resolved) {
609 floor = Location::new(*candidate + 1);
                    let Some(key) = op.key().cloned() else {
                        continue;
                    };
613 if !self.is_active_at(&key, candidate, &diff, db) {
614 continue;
615 }
616 let new_loc = Location::new(self.base_size + ops.len() as u64);
617 let base_old_loc = diff
618 .get(&key)
619 .or_else(|| resolve_in_ancestors(&self.ancestors, &key))
620 .map_or(Some(candidate), DiffEntry::base_old_loc);
621 let value = extract_update_value(&op);
622 ops.push(op);
623 diff.insert(
624 key,
625 DiffEntry::Active {
626 value,
627 loc: new_loc,
628 base_old_loc,
629 },
630 );
631 moved += 1;
632 if moved >= total_steps {
633 break;
634 }
635 }
636 }
637 } else {
638 floor = Location::new(self.base_size + ops.len() as u64);
640 debug!(tip = ?floor, "db is empty, raising floor to tip");
641 }
642
643 drop(reader);
646
647 let commit_loc = Location::new(self.base_size + ops.len() as u64);
649 ops.push(Operation::CommitFloor(metadata, floor));
650
651 let ops = Arc::new(ops);
657 let journal = db
658 .log
659 .with_mem(|base| self.journal_batch.merkleize_with(base, ops));
660
661 let ancestor_diffs: Vec<_> = self.ancestors.iter().map(|a| Arc::clone(&a.diff)).collect();
662 let ancestor_diff_ends: Vec<_> = self.ancestors.iter().map(|a| a.total_size).collect();
663
664 debug_assert!(total_active_keys >= 0, "active_keys underflow");
665 Ok(Arc::new(MerkleizedBatch {
666 journal_batch: journal,
667 diff: Arc::new(diff),
668 parent: self.ancestors.first().map(Arc::downgrade),
669 new_inactivity_floor_loc: floor,
670 new_last_commit_loc: commit_loc,
671 base_size: self.base_size,
672 total_size: *commit_loc + 1,
673 total_active_keys: total_active_keys as usize,
674 db_size: self.db_size,
675 ancestor_diffs,
676 ancestor_diff_ends,
677 }))
678 }
679}
680
681impl<F: Family, H, U> UnmerkleizedBatch<F, H, U>
682where
683 U: update::Update + Send + Sync,
684 H: Hasher,
685 Operation<F, U>: Codec,
686{
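    /// Stages a write for `key`: `Some(value)` inserts or updates, `None` deletes. A later write
    /// to the same key within the batch replaces the earlier one.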
687 pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
692 self.mutations.insert(key, value);
693 self
694 }
695
696 #[allow(clippy::type_complexity)]
698 fn into_parts(self) -> (BTreeMap<U::Key, Option<U::Value>>, Merkleizer<F, H, U>) {
699 let ancestors: Vec<_> = self.base.parent().map_or_else(Vec::new, |parent| {
700 let mut v = vec![Arc::clone(parent)];
701 v.extend(parent.ancestors());
702 v
703 });
704 let db_size = self.base.db_size();
710 let effective_db_size = ancestors.last().map_or(db_size, |oldest| {
711 let oldest_base =
712 oldest.journal_batch.size() - oldest.journal_batch.items().len() as u64;
713 db_size.max(oldest_base)
714 });
715 (
716 self.mutations,
717 Merkleizer {
718 journal_batch: self.journal_batch,
719 ancestors,
720 base_size: self.base.base_size(),
721 db_size: effective_db_size,
722 base_inactivity_floor_loc: self.base.inactivity_floor_loc(),
723 base_active_keys: self.base.active_keys(),
724 },
725 )
726 }
727}
728
729impl<F: Family, H, U> UnmerkleizedBatch<F, H, U>
731where
732 U: update::Update + Send + Sync,
733 H: Hasher,
734 Operation<F, U>: Codec,
735{
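    /// Returns the value for `key` as seen through this batch: staged mutations first, then any
    /// parent batch diffs, then the committed database.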
736 pub async fn get<E, C, I>(
738 &self,
739 key: &U::Key,
740 db: &Db<F, E, C, I, H, U>,
741 ) -> Result<Option<U::Value>, crate::qmdb::Error<F>>
742 where
743 E: Context,
744 C: Contiguous<Item = Operation<F, U>>,
745 I: UnorderedIndex<Value = Location<F>> + 'static,
746 {
747 if let Some(value) = self.mutations.get(key) {
748 return Ok(value.clone());
749 }
750 if let Some(parent) = self.base.parent() {
751 if let Some(entry) = parent.diff.get(key) {
752 return Ok(entry.value().cloned());
753 }
754 for batch in parent.ancestors() {
755 if let Some(entry) = batch.diff.get(key) {
756 return Ok(entry.value().cloned());
757 }
758 }
759 }
760 db.get(key).await
761 }
762}
763
764impl<F: Family, K, V, H> UnmerkleizedBatch<F, H, update::Unordered<K, V>>
766where
767 K: Key,
768 V: ValueEncoding,
769 H: Hasher,
770 Operation<F, update::Unordered<K, V>>: Codec,
771{
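    /// Merkleizes the staged mutations against `db`, producing an immutable batch that can later
    /// be applied with [`Db::apply_batch`].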
772 pub async fn merkleize<E, C, I>(
774 self,
775 db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
776 metadata: Option<V::Value>,
777 ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>, crate::qmdb::Error<F>>
778 where
779 E: Context,
780 C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
781 I: UnorderedIndex<Value = Location<F>>,
782 {
783 self.merkleize_with_floor_scan(db, metadata, SequentialScan)
784 .await
785 }
786
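    /// Same as [`Self::merkleize`], but with a caller-provided [`FloorScan`] strategy.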
787 pub(crate) async fn merkleize_with_floor_scan<E, C, I, S: FloorScan<F>>(
790 self,
791 db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
792 metadata: Option<V::Value>,
793 scan: S,
794 ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>, crate::qmdb::Error<F>>
795 where
796 E: Context,
797 C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
798 I: UnorderedIndex<Value = Location<F>>,
799 {
800 let (mut mutations, m) = self.into_parts();
801
802 let locations = m.gather_existing_locations(&mutations, db, false);
804 let reader = db.log.reader().await;
805 let results = m.read_ops(&locations, &[], &reader).await?;
806
807 let mut ops: Vec<Operation<F, update::Unordered<K, V>>> =
809 Vec::with_capacity(mutations.len() + 1);
810 let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
811 let mut active_keys_delta: isize = 0;
812 let mut user_steps: u64 = 0;
813
814 for (op, &old_loc) in results.iter().zip(&locations) {
817 let key = op.key().expect("updates should have a key");
818
819 let base_old_loc = if let Some(entry) = resolve_in_ancestors(&m.ancestors, key) {
826 if entry.loc() != Some(old_loc) {
827 continue;
828 }
829 entry.base_old_loc()
830 } else {
831 Some(old_loc)
832 };
833
834 let Some(mutation) = mutations.remove(key) else {
835 continue;
838 };
839
840 let new_loc = Location::new(m.base_size + ops.len() as u64);
843 match mutation {
844 Some(value) => {
845 ops.push(Operation::Update(update::Unordered(
846 key.clone(),
847 value.clone(),
848 )));
849 diff.insert(
850 key.clone(),
851 DiffEntry::Active {
852 value,
853 loc: new_loc,
854 base_old_loc,
855 },
856 );
857 user_steps += 1;
858 }
859 None => {
860 ops.push(Operation::Delete(key.clone()));
861 diff.insert(key.clone(), DiffEntry::Deleted { base_old_loc });
862 active_keys_delta -= 1;
863 user_steps += 1;
864 }
865 }
866 }
867
868 let parent_deleted_creates = m.extract_parent_deleted_creates(&mut mutations);
870
871 let mut creates: Vec<(K, V::Value, Option<Location<F>>)> =
876 Vec::with_capacity(mutations.len() + parent_deleted_creates.len());
877 for (key, value) in mutations {
878 if let Some(value) = value {
879 creates.push((key, value, None));
880 }
881 }
882 for (key, (value, base_old_loc)) in parent_deleted_creates {
883 creates.push((key, value, base_old_loc));
884 }
885 creates.sort_by(|(a, _, _), (b, _, _)| a.cmp(b));
886 for (key, value, base_old_loc) in creates {
887 let new_loc = Location::new(m.base_size + ops.len() as u64);
888 ops.push(Operation::Update(update::Unordered(
889 key.clone(),
890 value.clone(),
891 )));
892 diff.insert(
893 key,
894 DiffEntry::Active {
895 value,
896 loc: new_loc,
897 base_old_loc,
898 },
899 );
900 active_keys_delta += 1;
901 }
902
903 m.finish(
905 ops,
906 diff,
907 active_keys_delta,
908 user_steps,
909 metadata,
910 scan,
911 reader,
912 db,
913 )
914 .await
915 }
916}
917
918impl<F: Family, K, V, H> UnmerkleizedBatch<F, H, update::Ordered<K, V>>
920where
921 K: Key,
922 V: ValueEncoding,
923 H: Hasher,
924 Operation<F, update::Ordered<K, V>>: Codec,
925{
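    /// Merkleizes the staged mutations against an ordered-index `db`, maintaining each key's
    /// `next_key` link, producing an immutable batch that can later be applied with
    /// [`Db::apply_batch`].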
926 pub async fn merkleize<E, C, I>(
928 self,
929 db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
930 metadata: Option<V::Value>,
931 ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>, crate::qmdb::Error<F>>
932 where
933 E: Context,
934 C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
935 I: OrderedIndex<Value = Location<F>>,
936 {
937 self.merkleize_with_floor_scan(db, metadata, SequentialScan)
938 .await
939 }
940
941 pub(crate) async fn merkleize_with_floor_scan<E, C, I, S: FloorScan<F>>(
944 self,
945 db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
946 metadata: Option<V::Value>,
947 scan: S,
948 ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>, crate::qmdb::Error<F>>
949 where
950 E: Context,
951 C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
952 I: OrderedIndex<Value = Location<F>>,
953 {
954 let (mut mutations, m) = self.into_parts();
955
956 let locations = m.gather_existing_locations(&mutations, db, true);
958 let reader = db.log.reader().await;
959
960 let mut next_candidates: BTreeSet<K> = BTreeSet::new();
962 let mut prev_candidates: BTreeMap<K, (V::Value, Location<F>)> = BTreeMap::new();
963 let mut deleted: BTreeMap<K, Location<F>> = BTreeMap::new();
964 let mut updated: BTreeMap<K, (V::Value, Location<F>)> = BTreeMap::new();
965
966 for (op, &old_loc) in m
967 .read_ops(&locations, &[], &reader)
968 .await?
969 .into_iter()
970 .zip(&locations)
971 {
972 let update::Ordered {
973 key,
974 value,
975 next_key,
976 } = match op {
977 Operation::Update(data) => data,
978 _ => unreachable!("snapshot should only reference Update operations"),
979 };
980 next_candidates.insert(next_key);
981
982 let mutation = mutations.remove(&key);
983 prev_candidates.insert(key.clone(), (value, old_loc));
984
985 let Some(mutation) = mutation else {
986 continue;
990 };
991
992 if let Some(new_value) = mutation {
993 updated.insert(key, (new_value, old_loc));
994 } else {
995 deleted.insert(key, old_loc);
996 }
997 }
998
999 let parent_deleted_creates = m.extract_parent_deleted_creates(&mut mutations);
1001
1002 let mut created: Vec<(K, V::Value, Option<Location<F>>)> =
1007 Vec::with_capacity(mutations.len() + parent_deleted_creates.len());
1008 for (key, value) in mutations {
1009 let Some(value) = value else {
1010 continue; };
1012 next_candidates.insert(key.clone());
1013 created.push((key, value, None));
1014 }
1015 for (key, (value, base_old_loc)) in parent_deleted_creates {
1016 next_candidates.insert(key.clone());
1017 created.push((key, value, base_old_loc));
1018 }
1019 created.sort_by(|(a, _, _), (b, _, _)| a.cmp(b));
1020
1021 let mut prev_locations = Vec::new();
1023 for key in deleted.keys().chain(created.iter().map(|(k, _, _)| k)) {
1024 let Some((iter, _)) = db.snapshot.prev_translated_key(key) else {
1025 continue;
1026 };
1027 prev_locations.extend(iter.copied());
1028 }
1029 prev_locations.sort();
1030 prev_locations.dedup();
1031
1032 let prev_results = m.read_ops(&prev_locations, &[], &reader).await?;
1033
1034 for (op, &old_loc) in prev_results.into_iter().zip(&prev_locations) {
1035 let data = match op {
1036 Operation::Update(data) => data,
1037 _ => unreachable!("expected update operation"),
1038 };
1039 next_candidates.insert(data.next_key);
1040 prev_candidates.insert(data.key, (data.value, old_loc));
1041 }
1042
1043 let ancestor_entries = {
1048 let mut entries: BTreeMap<&K, &DiffEntry<F, V::Value>> = BTreeMap::new();
1049 for batch in &m.ancestors {
1050 for (key, entry) in batch.diff.iter() {
1051 entries.entry(key).or_insert(entry);
1052 }
1053 }
1054 entries
1055 };
1056
1057 for (key, entry) in &ancestor_entries {
1058 if updated.contains_key(*key)
1060 || created.binary_search_by(|(k, _, _)| k.cmp(*key)).is_ok()
1061 || deleted.contains_key(*key)
1062 {
1063 continue;
1064 }
1065 if let DiffEntry::Active { value, loc, .. } = entry {
1066 let op = m.read_op(*loc, &[], &reader).await?;
1067 let data = match op {
1068 Operation::Update(data) => data,
1069 _ => unreachable!("ancestor diff Active should reference Update op"),
1070 };
1071 next_candidates.insert((*key).clone());
1072 next_candidates.insert(data.next_key);
1073 prev_candidates.insert((*key).clone(), (value.clone(), *loc));
1074 }
1075 }
1076
1077 for key in deleted.keys() {
1082 prev_candidates.remove(key);
1083 next_candidates.remove(key);
1084 }
1085 for (key, entry) in &ancestor_entries {
1086 if matches!(entry, DiffEntry::Deleted { .. })
1087 && created.binary_search_by(|(k, _, _)| k.cmp(*key)).is_err()
1088 {
1089 prev_candidates.remove(*key);
1090 next_candidates.remove(*key);
1091 }
1092 }
1093
1094 let mut ops: Vec<Operation<F, update::Ordered<K, V>>> =
1096 Vec::with_capacity(deleted.len() + updated.len() + created.len() + 1);
1097 let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
1098 let mut active_keys_delta: isize = 0;
1099 let mut user_steps: u64 = 0;
1100 for (key, old_loc) in &deleted {
1102 ops.push(Operation::Delete(key.clone()));
1103
1104 let base_old_loc = resolve_in_ancestors(&m.ancestors, key)
1105 .map_or(Some(*old_loc), DiffEntry::base_old_loc);
1106
1107 diff.insert(key.clone(), DiffEntry::Deleted { base_old_loc });
1108 active_keys_delta -= 1;
1109 user_steps += 1;
1110 }
1111
1112 for (key, (value, old_loc)) in updated {
1114 let new_loc = Location::new(m.base_size + ops.len() as u64);
1115 let next_key = find_next_key(&key, &next_candidates);
1116 ops.push(Operation::Update(update::Ordered {
1117 key: key.clone(),
1118 value: value.clone(),
1119 next_key,
1120 }));
1121
1122 let base_old_loc = resolve_in_ancestors(&m.ancestors, &key)
1123 .map_or(Some(old_loc), DiffEntry::base_old_loc);
1124
1125 diff.insert(
1126 key,
1127 DiffEntry::Active {
1128 value,
1129 loc: new_loc,
1130 base_old_loc,
1131 },
1132 );
1133 user_steps += 1;
1134 }
1135
1136 let mut created_keys: Vec<K> = Vec::with_capacity(created.len());
1138
1139 for (key, value, base_old_loc) in created {
1141 created_keys.push(key.clone());
1142 let new_loc = Location::new(m.base_size + ops.len() as u64);
1143 let next_key = find_next_key(&key, &next_candidates);
1144 ops.push(Operation::Update(update::Ordered {
1145 key: key.clone(),
1146 value: value.clone(),
1147 next_key,
1148 }));
1149 diff.insert(
1150 key,
1151 DiffEntry::Active {
1152 value,
1153 loc: new_loc,
1154 base_old_loc,
1155 },
1156 );
1157 active_keys_delta += 1;
1158 }
1159
1160 if !prev_candidates.is_empty() {
1162 for key in created_keys.iter().chain(deleted.keys()) {
1163 let (prev_key, (prev_value, prev_loc)) = find_prev_key(key, &prev_candidates);
1164 if diff.contains_key(prev_key) {
1165 continue;
1166 }
1167
1168 let prev_new_loc = Location::new(m.base_size + ops.len() as u64);
1169 let prev_next_key = find_next_key(prev_key, &next_candidates);
1170 ops.push(Operation::Update(update::Ordered {
1171 key: prev_key.clone(),
1172 value: prev_value.clone(),
1173 next_key: prev_next_key,
1174 }));
1175
1176 let prev_base_old_loc = resolve_in_ancestors(&m.ancestors, prev_key)
1177 .map_or(Some(*prev_loc), DiffEntry::base_old_loc);
1178
1179 diff.insert(
1180 prev_key.clone(),
1181 DiffEntry::Active {
1182 value: prev_value.clone(),
1183 loc: prev_new_loc,
1184 base_old_loc: prev_base_old_loc,
1185 },
1186 );
1187 user_steps += 1;
1188 }
1189 }
1190
1191 m.finish(
1193 ops,
1194 diff,
1195 active_keys_delta,
1196 user_steps,
1197 metadata,
1198 scan,
1199 reader,
1200 db,
1201 )
1202 .await
1203 }
1204}
1205
1206impl<F: Family, D: Digest, U: update::Update + Send + Sync> MerkleizedBatch<F, D, U>
1207where
1208 Operation<F, U>: Send + Sync,
1209{
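    /// Returns the authenticated root digest as of this batch.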
1210 pub fn root(&self) -> D {
1212 self.journal_batch.root()
1213 }
1214
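    /// Iterates over this batch's ancestors, newest first, stopping at the first parent that has
    /// already been dropped.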
1215 pub(crate) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
1218 let mut next = self.parent.as_ref().and_then(Weak::upgrade);
1219 iter::from_fn(move || {
1220 let batch = next.take()?;
1221 next = batch.parent.as_ref().and_then(Weak::upgrade);
1222 Some(batch)
1223 })
1224 }
1225}
1226
1227impl<F: Family, D: Digest, U: update::Update + Send + Sync> MerkleizedBatch<F, D, U>
1228where
1229 Operation<F, U>: Codec,
1230{
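    /// Starts a new [`UnmerkleizedBatch`] layered on top of this batch, so a child batch can be
    /// built before this one has been applied to the database.
    ///
    /// A minimal sketch of the pipelined flow (identifiers are illustrative):
    ///
    /// ```ignore
    /// let parent = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
    /// let child = parent.new_batch::<H>().write(k2, Some(v2)).merkleize(&db, None).await?;
    /// db.apply_batch(parent).await?;
    /// db.apply_batch(child).await?;
    /// ```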
1231 pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U>
1237 where
1238 H: Hasher<Digest = D>,
1239 {
1240 UnmerkleizedBatch {
1241 journal_batch: self.journal_batch.new_batch::<H>(),
1242 mutations: BTreeMap::new(),
1243 base: Base::Child(Arc::clone(self)),
1244 }
1245 }
1246
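    /// Returns the value for `key` as seen through this batch: its own diff first, then ancestor
    /// diffs, then the committed database.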
1247 pub async fn get<E, C, I, H>(
1249 &self,
1250 key: &U::Key,
1251 db: &Db<F, E, C, I, H, U>,
1252 ) -> Result<Option<U::Value>, crate::qmdb::Error<F>>
1253 where
1254 E: Context,
1255 C: Contiguous<Item = Operation<F, U>>,
1256 I: UnorderedIndex<Value = Location<F>> + 'static,
1257 H: Hasher<Digest = D>,
1258 {
1259 if let Some(entry) = self.diff.get(key) {
1260 return Ok(entry.value().cloned());
1261 }
1262 for batch in self.ancestors() {
1265 if let Some(entry) = batch.diff.get(key) {
1266 return Ok(entry.value().cloned());
1267 }
1268 }
1269 db.get(key).await
1270 }
1271}
1272
1273impl<F, E, C, I, H, U> Db<F, E, C, I, H, U>
1274where
1275 F: Family,
1276 E: Context,
1277 U: update::Update + Send + Sync,
1278 C: Contiguous<Item = Operation<F, U>>,
1279 I: UnorderedIndex<Value = Location<F>>,
1280 H: Hasher,
1281 Operation<F, U>: Codec,
1282{
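    /// Starts a new [`UnmerkleizedBatch`] against the current committed state.
    ///
    /// A minimal sketch of the intended flow (identifiers are illustrative):
    ///
    /// ```ignore
    /// let batch = db.new_batch().write(key, Some(value));
    /// let merkleized = batch.merkleize(&db, None).await?;
    /// let root = merkleized.root();
    /// db.apply_batch(merkleized).await?;
    /// ```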
1283 pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, U> {
1285 let journal_size = *self.last_commit_loc + 1;
1287 UnmerkleizedBatch {
1288 journal_batch: self.log.new_batch(),
1289 mutations: BTreeMap::new(),
1290 base: Base::Db {
1291 db_size: journal_size,
1292 inactivity_floor_loc: self.inactivity_floor_loc,
1293 active_keys: self.active_keys,
1294 },
1295 }
1296 }
1297}
1298
1299impl<F, E, C, I, H, U> Db<F, E, C, I, H, U>
1300where
1301 F: Family,
1302 E: Context,
1303 U: update::Update + Send + Sync + 'static,
1304 C: Mutable<Item = Operation<F, U>> + crate::Persistable<Error = crate::journal::Error>,
1305 I: UnorderedIndex<Value = Location<F>>,
1306 H: Hasher,
1307 Operation<F, U>: Codec,
1308{
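    /// Applies a previously merkleized batch: appends its operations to the journal and folds its
    /// key diff, together with any ancestor diffs that have not yet been committed, into the
    /// snapshot index.
    ///
    /// Returns the range of appended locations. Fails with a stale-batch error if the current
    /// database size does not correspond to any base the batch was built on.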
1309 pub async fn apply_batch(
1319 &mut self,
1320 batch: Arc<MerkleizedBatch<F, H::Digest, U>>,
1321 ) -> Result<Range<Location<F>>, crate::qmdb::Error<F>> {
1322 let db_size = *self.last_commit_loc + 1;
1323 let valid = db_size == batch.db_size
1326 || db_size == batch.base_size
1327 || batch.ancestor_diff_ends.contains(&db_size);
1328 if !valid {
1329 return Err(crate::qmdb::Error::StaleBatch {
1330 db_size,
1331 batch_db_size: batch.db_size,
1332 batch_base_size: batch.base_size,
1333 });
1334 }
1335 let start_loc = Location::new(db_size);
1336
1337 self.log.apply_batch(&batch.journal_batch).await?;
1339
1340 let mut committed_locs: BTreeMap<&U::Key, Option<Location<F>>> = BTreeMap::new();
1344 for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
1345 if batch.ancestor_diff_ends[i] <= db_size {
1346 for (key, entry) in ancestor_diff.iter() {
1347 committed_locs.entry(key).or_insert(entry.loc());
1349 }
1350 }
1351 }
1352
1353 let mut seen = BTreeSet::<&U::Key>::new();
1355 for (key, entry) in batch.diff.iter() {
1356 seen.insert(key);
1357 let base_old_loc = committed_locs
1358 .get(key)
1359 .copied()
1360 .unwrap_or_else(|| entry.base_old_loc());
1361 apply_snapshot_diff(&mut self.snapshot, key, entry, base_old_loc);
1362 }
1363
1364 for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
1366 if batch.ancestor_diff_ends[i] <= db_size {
1367 continue;
1368 }
1369 for (key, entry) in ancestor_diff.iter() {
1370 if !seen.insert(key) {
1371 continue;
1372 }
1373 let base_old_loc = committed_locs
1374 .get(key)
1375 .copied()
1376 .unwrap_or_else(|| entry.base_old_loc());
1377 apply_snapshot_diff(&mut self.snapshot, key, entry, base_old_loc);
1378 }
1379 }
1380
1381 self.active_keys = batch.total_active_keys;
1383 self.inactivity_floor_loc = batch.new_inactivity_floor_loc;
1384 self.last_commit_loc = batch.new_last_commit_loc;
1385
1386 let end_loc = Location::new(*self.last_commit_loc + 1);
1388 Ok(start_loc..end_loc)
1389 }
1390}
1391
1392impl<F: Family, E, C, I, H, U> Db<F, E, C, I, H, U>
1393where
1394 E: Context,
1395 U: update::Update + Send + Sync,
1396 C: Contiguous<Item = Operation<F, U>>,
1397 I: UnorderedIndex<Value = Location<F>>,
1398 H: Hasher,
1399 Operation<F, U>: Codec,
1400{
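    /// Returns a [`MerkleizedBatch`] view of the current committed state (with an empty diff),
    /// usable as a base for layering further batches.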
1401 pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U>> {
1405 let journal_size = *self.last_commit_loc + 1;
1407 Arc::new(MerkleizedBatch {
1408 journal_batch: self.log.to_merkleized_batch(),
1409 diff: Arc::new(BTreeMap::new()),
1410 parent: None,
1411 new_inactivity_floor_loc: self.inactivity_floor_loc,
1412 new_last_commit_loc: self.last_commit_loc,
1413 base_size: journal_size,
1414 total_size: journal_size,
1415 total_active_keys: self.active_keys,
1416 db_size: journal_size,
1417 ancestor_diffs: Vec::new(),
1418 ancestor_diff_ends: Vec::new(),
1419 })
1420 }
1421}
1422
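/// Extracts the value from an [`Operation::Update`]; panics on any other operation variant.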
1423fn extract_update_value<F: Family, U: update::Update>(op: &Operation<F, U>) -> U::Value {
1425 match op {
1426 Operation::Update(update) => update.value().clone(),
1427 _ => unreachable!("floor raise should only re-append Update operations"),
1428 }
1429}
1430
1431#[cfg(any(test, feature = "test-traits"))]
1432mod trait_impls {
1433 use super::*;
1434 use crate::qmdb::any::traits::{
1435 BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
1436 UnmerkleizedBatch as UnmerkleizedBatchTrait,
1437 };
1438 use std::future::Future;
1439
1440 impl<F, K, V, H, E, C, I> UnmerkleizedBatchTrait<Db<F, E, C, I, H, update::Unordered<K, V>>>
1441 for UnmerkleizedBatch<F, H, update::Unordered<K, V>>
1442 where
1443 F: Family,
1444 K: Key,
1445 V: ValueEncoding + 'static,
1446 H: Hasher,
1447 E: Context,
1448 C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
1449 I: UnorderedIndex<Value = Location<F>>,
1450 Operation<F, update::Unordered<K, V>>: Codec,
1451 {
1452 type Family = F;
1453 type K = K;
1454 type V = V::Value;
1455 type Metadata = V::Value;
1456 type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>;
1457
1458 fn write(mut self, key: K, value: Option<V::Value>) -> Self {
1459 self.mutations.insert(key, value);
1460 self
1461 }
1462
1463 fn merkleize(
1464 self,
1465 db: &Db<F, E, C, I, H, update::Unordered<K, V>>,
1466 metadata: Option<V::Value>,
1467 ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
1468 self.merkleize(db, metadata)
1469 }
1470 }
1471
1472 impl<F, K, V, H, E, C, I> UnmerkleizedBatchTrait<Db<F, E, C, I, H, update::Ordered<K, V>>>
1473 for UnmerkleizedBatch<F, H, update::Ordered<K, V>>
1474 where
1475 F: Family,
1476 K: Key,
1477 V: ValueEncoding + 'static,
1478 H: Hasher,
1479 E: Context,
1480 C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
1481 I: OrderedIndex<Value = Location<F>>,
1482 Operation<F, update::Ordered<K, V>>: Codec,
1483 {
1484 type Family = F;
1485 type K = K;
1486 type V = V::Value;
1487 type Metadata = V::Value;
1488 type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>;
1489
1490 fn write(mut self, key: K, value: Option<V::Value>) -> Self {
1491 self.mutations.insert(key, value);
1492 self
1493 }
1494
1495 fn merkleize(
1496 self,
1497 db: &Db<F, E, C, I, H, update::Ordered<K, V>>,
1498 metadata: Option<V::Value>,
1499 ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
1500 self.merkleize(db, metadata)
1501 }
1502 }
1503
1504 impl<F: Family, D: Digest, U: update::Update + Send + Sync + 'static> MerkleizedBatchTrait
1505 for Arc<MerkleizedBatch<F, D, U>>
1506 where
1507 Operation<F, U>: Codec,
1508 {
1509 type Digest = D;
1510
1511 fn root(&self) -> D {
1512 MerkleizedBatch::root(self)
1513 }
1514 }
1515
1516 impl<F, E, K, V, C, I, H> BatchableDb for Db<F, E, C, I, H, update::Unordered<K, V>>
1517 where
1518 F: Family,
1519 E: Context,
1520 K: Key,
1521 V: ValueEncoding + 'static,
1522 C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
1523 + crate::Persistable<Error = crate::journal::Error>,
1524 I: UnorderedIndex<Value = Location<F>>,
1525 H: Hasher,
1526 Operation<F, update::Unordered<K, V>>: Codec,
1527 {
1528 type Family = F;
1529 type K = K;
1530 type V = V::Value;
1531 type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>>>;
1532 type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>>;
1533
1534 fn new_batch(&self) -> Self::Batch {
1535 self.new_batch()
1536 }
1537
1538 fn apply_batch(
1539 &mut self,
1540 batch: Self::Merkleized,
1541 ) -> impl Future<Output = Result<Range<Location<F>>, crate::qmdb::Error<F>>> {
1542 self.apply_batch(batch)
1543 }
1544 }
1545
1546 impl<F, E, K, V, C, I, H> BatchableDb for Db<F, E, C, I, H, update::Ordered<K, V>>
1547 where
1548 F: Family,
1549 E: Context,
1550 K: Key,
1551 V: ValueEncoding + 'static,
1552 C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
1553 + crate::Persistable<Error = crate::journal::Error>,
1554 I: OrderedIndex<Value = Location<F>>,
1555 H: Hasher,
1556 Operation<F, update::Ordered<K, V>>: Codec,
1557 {
1558 type Family = F;
1559 type K = K;
1560 type V = V::Value;
1561 type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>>>;
1562 type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>>;
1563
1564 fn new_batch(&self) -> Self::Batch {
1565 self.new_batch()
1566 }
1567
1568 fn apply_batch(
1569 &mut self,
1570 batch: Self::Merkleized,
1571 ) -> impl Future<Output = Result<Range<Location<F>>, crate::qmdb::Error<F>>> {
1572 self.apply_batch(batch)
1573 }
1574 }
1575}
1576
1577#[cfg(test)]
1578mod tests {
1579 use super::*;
1580 use crate::{
1581 mmr,
1582 qmdb::any::{
1583 ordered::fixed::Db as OrderedFixedDb,
1584 test::{colliding_digest, fixed_db_config},
1585 unordered::fixed::Db as UnorderedFixedDb,
1586 },
1587 translator::OneCap,
1588 };
1589 use commonware_cryptography::{sha256, Sha256};
1590 use commonware_runtime::{deterministic, Runner as _};
1591
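    /// Test-local analogue of `Merkleizer::extract_parent_deleted_creates` that operates on a
    /// plain diff map instead of an ancestor chain.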
1592 fn extract_parent_deleted_creates<K: Ord + Clone, V: Clone>(
1595 mutations: &mut BTreeMap<K, Option<V>>,
1596 base_diff: &BTreeMap<K, DiffEntry<mmr::Family, V>>,
1597 ) -> BTreeMap<K, (V, Option<crate::mmr::Location>)> {
1598 let creates: BTreeMap<_, _> = mutations
1599 .iter()
1600 .filter_map(|(key, value)| {
1601 if let Some(DiffEntry::Deleted { base_old_loc }) = base_diff.get(key) {
1602 if let Some(value) = value {
1603 return Some((key.clone(), (value.clone(), *base_old_loc)));
1604 }
1605 }
1606 None
1607 })
1608 .collect();
1609 for key in creates.keys() {
1610 mutations.remove(key);
1611 }
1612 creates
1613 }
1614
1615 #[test]
1616 fn extract_parent_deleted_creates_basic() {
1617 let mut mutations: BTreeMap<u64, Option<u64>> = BTreeMap::new();
        mutations.insert(1, Some(100));
        mutations.insert(2, None);
        mutations.insert(3, Some(300));

        let mut base_diff: BTreeMap<u64, DiffEntry<mmr::Family, u64>> = BTreeMap::new();
1623 base_diff.insert(
1624 1,
1625 DiffEntry::Deleted {
1626 base_old_loc: Some(crate::mmr::Location::new(5)),
1627 },
1628 );
1629 base_diff.insert(
1630 4,
1631 DiffEntry::Active {
1632 value: 400,
1633 loc: crate::mmr::Location::new(10),
1634 base_old_loc: None,
1635 },
1636 );
1637
1638 let creates = extract_parent_deleted_creates(&mut mutations, &base_diff);
1639
1640 assert_eq!(creates.len(), 1);
1642 let (value, base_old_loc) = creates.get(&1).unwrap();
1643 assert_eq!(*value, 100);
1644 assert_eq!(*base_old_loc, Some(crate::mmr::Location::new(5)));
1645
1646 assert_eq!(mutations.len(), 2);
1648 assert!(mutations.contains_key(&2));
1649 assert!(mutations.contains_key(&3));
1650 }
1651
1652 #[test]
1653 fn extract_parent_deleted_creates_delete_not_extracted() {
1654 let mut mutations: BTreeMap<u64, Option<u64>> = BTreeMap::new();
        mutations.insert(1, None);

        let mut base_diff: BTreeMap<u64, DiffEntry<mmr::Family, u64>> = BTreeMap::new();
1658 base_diff.insert(
1659 1,
1660 DiffEntry::Deleted {
1661 base_old_loc: Some(crate::mmr::Location::new(5)),
1662 },
1663 );
1664
1665 let creates = extract_parent_deleted_creates(&mut mutations, &base_diff);
1666
1667 assert!(creates.is_empty());
1669 assert_eq!(mutations.len(), 1);
1671 assert!(mutations.contains_key(&1));
1672 }
1673
1674 #[test]
1675 fn read_ops_resolves_committed_ancestor_and_current_sources() {
1676 let runner = deterministic::Runner::default();
1677 runner.start(|context| async move {
1678 type TestDb = UnorderedFixedDb<
1679 mmr::Family,
1680 deterministic::Context,
1681 sha256::Digest,
1682 sha256::Digest,
1683 Sha256,
1684 OneCap,
1685 >;
1686
1687 let config = fixed_db_config::<OneCap>("read-locations-all-sources", &context);
1688 let mut db = TestDb::init(context, config).await.unwrap();
1689
1690 let key_db = colliding_digest(0x30, 0);
1691 let value_db = colliding_digest(0x30, 1);
1692 let key_parent = colliding_digest(0x31, 0);
1693 let value_parent = colliding_digest(0x31, 1);
1694 let key_current = colliding_digest(0x32, 0);
1695 let value_current = colliding_digest(0x32, 1);
1696
1697 let seed = db
1699 .new_batch()
1700 .write(key_db, Some(value_db))
1701 .merkleize(&db, None)
1702 .await
1703 .unwrap();
1704 db.apply_batch(seed).await.unwrap();
1705 db.commit().await.unwrap();
1706
1707 let committed_loc = db.snapshot.get(&key_db).next().copied().unwrap();
1708
1709 let parent = db
1711 .new_batch()
1712 .write(key_parent, Some(value_parent))
1713 .merkleize(&db, None)
1714 .await
1715 .unwrap();
1716 let parent_loc = parent.diff.get(&key_parent).unwrap().loc().unwrap();
1717
1718 let child = parent
1720 .new_batch::<Sha256>()
1721 .write(key_current, Some(value_current));
1722 let (_mutations, merkleizer) = child.into_parts();
1723
1724 let current_loc = Location::new(merkleizer.base_size);
1725 let batch_ops = vec![Operation::Update(update::Unordered(
1726 key_current,
1727 value_current,
1728 ))];
1729
1730 let reader = db.log.reader().await;
1732 let ops = merkleizer
1733 .read_ops(
1734 &[committed_loc, parent_loc, current_loc],
1735 &batch_ops,
1736 &reader,
1737 )
1738 .await
1739 .unwrap();
1740 drop(reader);
1741
1742 assert_eq!(
1743 ops,
1744 vec![
1745 Operation::Update(update::Unordered(key_db, value_db)),
1746 Operation::Update(update::Unordered(key_parent, value_parent)),
1747 Operation::Update(update::Unordered(key_current, value_current)),
1748 ]
1749 );
1750
1751 let reader = db.log.reader().await;
1753 let disk_op = merkleizer
1754 .read_op(committed_loc, &batch_ops, &reader)
1755 .await
1756 .unwrap();
1757 assert_eq!(
1758 disk_op,
1759 Operation::Update(update::Unordered(key_db, value_db))
1760 );
1761
1762 let ancestor_op = merkleizer
1763 .read_op(parent_loc, &batch_ops, &reader)
1764 .await
1765 .unwrap();
1766 assert_eq!(
1767 ancestor_op,
1768 Operation::Update(update::Unordered(key_parent, value_parent))
1769 );
1770
1771 let current_op = merkleizer
1772 .read_op(current_loc, &batch_ops, &reader)
1773 .await
1774 .unwrap();
1775 assert_eq!(
1776 current_op,
1777 Operation::Update(update::Unordered(key_current, value_current))
1778 );
1779 drop(reader);
1780
1781 db.destroy().await.unwrap();
1782 });
1783 }
1784
1785 #[test]
1786 fn child_root_matches_between_pending_and_committed_paths_under_collisions() {
1787 let runner = deterministic::Runner::default();
1788 runner.start(|context| async move {
1789 type TestDb = UnorderedFixedDb<
1790 mmr::Family,
1791 deterministic::Context,
1792 sha256::Digest,
1793 sha256::Digest,
1794 Sha256,
1795 OneCap,
1796 >;
1797
1798 let config = fixed_db_config::<OneCap>("batch-collision-regression", &context);
1799 let mut db = TestDb::init(context, config).await.unwrap();
1800 let key_a = colliding_digest(0xAA, 1);
1801 let key_b = colliding_digest(0xAA, 0);
1802
1803 let mut initial = db.new_batch();
1808 for i in 0..4 {
1809 initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
1810 }
1811 let initial = initial.merkleize(&db, None).await.unwrap();
1812 db.apply_batch(initial).await.unwrap();
1813 db.commit().await.unwrap();
1814
1815 let parent = db
1819 .new_batch()
1820 .write(key_a, Some(colliding_digest(0xCC, 1)))
1821 .merkleize(&db, None)
1822 .await
1823 .unwrap();
1824 assert!(
1825 !parent.diff.contains_key(&key_b),
1826 "regression requires a sibling collision to remain only in the committed snapshot"
1827 );
1828
1829 let pending_child = parent
1835 .new_batch::<Sha256>()
1836 .write(key_a, Some(colliding_digest(0xDD, 1)))
1837 .write(key_b, Some(colliding_digest(0xDD, 0)))
1838 .merkleize(&db, None)
1839 .await
1840 .unwrap();
1841
1842 let pending_root = pending_child.root();
1843
1844 db.apply_batch(parent).await.unwrap();
1845 db.commit().await.unwrap();
1846
1847 let committed_child = db
1848 .new_batch()
1849 .write(key_a, Some(colliding_digest(0xDD, 1)))
1850 .write(key_b, Some(colliding_digest(0xDD, 0)))
1851 .merkleize(&db, None)
1852 .await
1853 .unwrap();
1854
1855 assert_eq!(pending_root, committed_child.root());
1856
1857 db.apply_batch(pending_child).await.unwrap();
1860 assert_eq!(db.root(), committed_child.root());
1861
1862 db.destroy().await.unwrap();
1863 });
1864 }
1865
1866 #[test]
1867 fn ordered_child_root_matches_between_pending_and_committed_paths_under_collisions() {
1868 let runner = deterministic::Runner::default();
1869 runner.start(|context| async move {
1870 type TestDb = OrderedFixedDb<
1871 mmr::Family,
1872 deterministic::Context,
1873 sha256::Digest,
1874 sha256::Digest,
1875 Sha256,
1876 OneCap,
1877 >;
1878
1879 let config = fixed_db_config::<OneCap>("ordered-batch-collision-regression", &context);
1880 let mut db = TestDb::init(context, config).await.unwrap();
1881 let key_a = colliding_digest(0xAA, 1);
1882 let key_b = colliding_digest(0xAA, 0);
1883
1884 let mut initial = db.new_batch();
1887 for i in 0..4 {
1888 initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
1889 }
1890 let initial = initial.merkleize(&db, None).await.unwrap();
1891 db.apply_batch(initial).await.unwrap();
1892 db.commit().await.unwrap();
1893
1894 let parent = db
1898 .new_batch()
1899 .write(key_a, Some(colliding_digest(0xCC, 1)))
1900 .merkleize(&db, None)
1901 .await
1902 .unwrap();
1903 assert!(
1904 !parent.diff.contains_key(&key_b),
1905 "ordered regression requires a sibling collision to remain only in the committed snapshot"
1906 );
1907
1908 let pending_child = parent
1911 .new_batch::<Sha256>()
1912 .write(key_a, Some(colliding_digest(0xDD, 1)))
1913 .write(key_b, Some(colliding_digest(0xDD, 0)))
1914 .merkleize(&db, None)
1915 .await
1916 .unwrap();
1917
1918 let pending_root = pending_child.root();
1919
1920 db.apply_batch(parent).await.unwrap();
1921 db.commit().await.unwrap();
1922
1923 let committed_child = db
1924 .new_batch()
1925 .write(key_a, Some(colliding_digest(0xDD, 1)))
1926 .write(key_b, Some(colliding_digest(0xDD, 0)))
1927 .merkleize(&db, None)
1928 .await
1929 .unwrap();
1930
1931 assert_eq!(pending_root, committed_child.root());
1932
1933 db.apply_batch(pending_child).await.unwrap();
1936 assert_eq!(db.root(), committed_child.root());
1937
1938 db.destroy().await.unwrap();
1939 });
1940 }
1941
1942 #[test]
1943 fn sequential_commit_basic() {
1944 let runner = deterministic::Runner::default();
1947 runner.start(|context| async move {
1948 type TestDb = UnorderedFixedDb<
1949 mmr::Family,
1950 deterministic::Context,
1951 sha256::Digest,
1952 sha256::Digest,
1953 Sha256,
1954 OneCap,
1955 >;
1956
1957 let config = fixed_db_config::<OneCap>("seq-commit-basic", &context);
1958 let mut db = TestDb::init(context, config).await.unwrap();
1959
1960 let seed = db
1962 .new_batch()
1963 .write(colliding_digest(0x01, 0), Some(colliding_digest(0x01, 1)))
1964 .merkleize(&db, None)
1965 .await
1966 .unwrap();
1967 db.apply_batch(seed).await.unwrap();
1968 db.commit().await.unwrap();
1969
1970 let key_a = colliding_digest(0x02, 0);
1972 let val_a = colliding_digest(0x02, 1);
1973 let batch_a = db
1974 .new_batch()
1975 .write(key_a, Some(val_a))
1976 .merkleize(&db, None)
1977 .await
1978 .unwrap();
1979
1980 let key_b = colliding_digest(0x03, 0);
1982 let val_b = colliding_digest(0x03, 1);
1983 let batch_b = batch_a
1984 .new_batch::<Sha256>()
1985 .write(key_b, Some(val_b))
1986 .merkleize(&db, None)
1987 .await
1988 .unwrap();
1989
1990 db.apply_batch(batch_a).await.unwrap();
1991 db.commit().await.unwrap();
1992
1993 let committed_b = db
1995 .new_batch()
1996 .write(key_b, Some(val_b))
1997 .merkleize(&db, None)
1998 .await
1999 .unwrap();
2000 assert_eq!(batch_b.root(), committed_b.root());
2001
2002 db.apply_batch(batch_b).await.unwrap();
2004 assert_eq!(db.root(), committed_b.root());
2005
2006 db.destroy().await.unwrap();
2007 });
2008 }
2009
2010 #[test]
2011 fn sequential_commit_fixes_base_old_loc() {
2012 let runner = deterministic::Runner::default();
2015 runner.start(|context| async move {
2016 type TestDb = UnorderedFixedDb<
2017 mmr::Family,
2018 deterministic::Context,
2019 sha256::Digest,
2020 sha256::Digest,
2021 Sha256,
2022 OneCap,
2023 >;
2024
2025 let config = fixed_db_config::<OneCap>("seq-commit-base-old-loc", &context);
2026 let mut db = TestDb::init(context, config).await.unwrap();
2027
2028 let key = colliding_digest(0x10, 0);
2030 let seed = db
2031 .new_batch()
2032 .write(key, Some(colliding_digest(0x10, 1)))
2033 .merkleize(&db, None)
2034 .await
2035 .unwrap();
2036 db.apply_batch(seed).await.unwrap();
2037 db.commit().await.unwrap();
2038
2039 let val_a = colliding_digest(0x10, 2);
2041 let batch_a = db
2042 .new_batch()
2043 .write(key, Some(val_a))
2044 .merkleize(&db, None)
2045 .await
2046 .unwrap();
2047
2048 let a_entry = batch_a.diff.get(&key).unwrap();
2050 let a_loc = a_entry.loc();
2051 assert!(a_loc.is_some());
2052
2053 let val_b = colliding_digest(0x10, 3);
2055 let batch_b = batch_a
2056 .new_batch::<Sha256>()
2057 .write(key, Some(val_b))
2058 .merkleize(&db, None)
2059 .await
2060 .unwrap();
2061
2062 db.apply_batch(batch_a).await.unwrap();
2065 db.commit().await.unwrap();
2066
2067 let committed_b = db
2069 .new_batch()
2070 .write(key, Some(val_b))
2071 .merkleize(&db, None)
2072 .await
2073 .unwrap();
2074 assert_eq!(batch_b.root(), committed_b.root());
2075
2076 db.apply_batch(batch_b).await.unwrap();
2077 assert_eq!(db.root(), committed_b.root());
2078
2079 db.destroy().await.unwrap();
2080 });
2081 }
2082
2083 #[test]
2084 fn fork_apply_after_parent_committed() {
2085 let runner = deterministic::Runner::default();
2088 runner.start(|context| async move {
2089 type TestDb = UnorderedFixedDb<
2090 mmr::Family,
2091 deterministic::Context,
2092 sha256::Digest,
2093 sha256::Digest,
2094 Sha256,
2095 OneCap,
2096 >;
2097
2098 let config = fixed_db_config::<OneCap>("fork-after-commit", &context);
2099 let mut db = TestDb::init(context, config).await.unwrap();
2100
2101 let seed = db
2103 .new_batch()
2104 .write(colliding_digest(0x20, 0), Some(colliding_digest(0x20, 1)))
2105 .merkleize(&db, None)
2106 .await
2107 .unwrap();
2108 db.apply_batch(seed).await.unwrap();
2109 db.commit().await.unwrap();
2110
2111 let key_a = colliding_digest(0x21, 0);
2113 let val_a = colliding_digest(0x21, 1);
2114 let batch_a = db
2115 .new_batch()
2116 .write(key_a, Some(val_a))
2117 .merkleize(&db, None)
2118 .await
2119 .unwrap();
2120
2121 let key_b = colliding_digest(0x22, 0);
2123 let val_b = colliding_digest(0x22, 1);
2124 let batch_b = batch_a
2125 .new_batch::<Sha256>()
2126 .write(key_b, Some(val_b))
2127 .merkleize(&db, None)
2128 .await
2129 .unwrap();
2130 let key_c = colliding_digest(0x23, 0);
2131 let val_c = colliding_digest(0x23, 1);
2132 let batch_c = batch_a
2133 .new_batch::<Sha256>()
2134 .write(key_c, Some(val_c))
2135 .merkleize(&db, None)
2136 .await
2137 .unwrap();
2138
2139 db.apply_batch(batch_a).await.unwrap();
2140 db.commit().await.unwrap();
2141
2142 let committed_b = db
2144 .new_batch()
2145 .write(key_b, Some(val_b))
2146 .merkleize(&db, None)
2147 .await
2148 .unwrap();
2149 assert_eq!(batch_b.root(), committed_b.root());
2150
2151 let committed_c = db
2152 .new_batch()
2153 .write(key_c, Some(val_c))
2154 .merkleize(&db, None)
2155 .await
2156 .unwrap();
2157 assert_eq!(batch_c.root(), committed_c.root());
2158
2159 db.destroy().await.unwrap();
2160 });
2161 }
2162
2163 #[test]
2164 fn sequential_commit_three_deep() {
2165 let runner = deterministic::Runner::default();
2168 runner.start(|context| async move {
2169 type TestDb = UnorderedFixedDb<
2170 mmr::Family,
2171 deterministic::Context,
2172 sha256::Digest,
2173 sha256::Digest,
2174 Sha256,
2175 OneCap,
2176 >;
2177
2178 let config = fixed_db_config::<OneCap>("ff-cross", &context);
2179 let mut db = TestDb::init(context, config).await.unwrap();
2180
2181 let grandparent = db
2183 .new_batch()
2184 .write(colliding_digest(0x01, 0), Some(colliding_digest(0x01, 1)))
2185 .write(colliding_digest(0x02, 0), Some(colliding_digest(0x02, 1)))
2186 .merkleize(&db, None)
2187 .await
2188 .unwrap();
2189
2190 let parent = grandparent
2192 .new_batch::<Sha256>()
2193 .write(colliding_digest(0x03, 0), Some(colliding_digest(0x03, 1)))
2194 .merkleize(&db, None)
2195 .await
2196 .unwrap();
2197
2198 let child = parent
2200 .new_batch::<Sha256>()
2201 .write(colliding_digest(0x04, 0), Some(colliding_digest(0x04, 1)))
2202 .merkleize(&db, None)
2203 .await
2204 .unwrap();
2205
2206 db.apply_batch(grandparent).await.unwrap();
2208 db.commit().await.unwrap();
2209
2210 db.apply_batch(parent).await.unwrap();
2212 db.commit().await.unwrap();
2213
2214 db.apply_batch(child).await.unwrap();
2216
2217 for i in 1..=4 {
2219 assert_eq!(
2220 db.get(&colliding_digest(i, 0)).await.unwrap(),
2221 Some(colliding_digest(i, 1))
2222 );
2223 }
2224
2225 db.destroy().await.unwrap();
2226 });
2227 }
2228
2229 #[test]
2234 fn recreate_deleted_key_with_collision_sibling_root_matches() {
2235 let runner = deterministic::Runner::default();
2236 runner.start(|context| async move {
2237 type TestDb = UnorderedFixedDb<
2238 mmr::Family,
2239 deterministic::Context,
2240 sha256::Digest,
2241 sha256::Digest,
2242 Sha256,
2243 OneCap,
2244 >;
2245
2246 let config = fixed_db_config::<OneCap>("recreate-deleted-collision", &context);
2247 let mut db = TestDb::init(context, config).await.unwrap();
2248
2249 let k0 = colliding_digest(0xAA, 0);
2251 let k6 = colliding_digest(0xAA, 6);
2252
2253 let initial = db
2255 .new_batch()
2256 .write(k0, Some(colliding_digest(0xBB, 0)))
2257 .write(k6, Some(colliding_digest(0xBB, 6)))
2258 .merkleize(&db, None)
2259 .await
2260 .unwrap();
2261 db.apply_batch(initial).await.unwrap();
2262 db.commit().await.unwrap();
2263
2264 let parent = db
2266 .new_batch()
2267 .write(k0, None)
2268 .merkleize(&db, None)
2269 .await
2270 .unwrap();
2271
2272 let k29 = colliding_digest(0xAA, 29);
2274 let pending_child = parent
2275 .new_batch::<Sha256>()
2276 .write(k0, Some(colliding_digest(0xCC, 0)))
2277 .write(k29, Some(colliding_digest(0xCC, 29)))
2278 .merkleize(&db, None)
2279 .await
2280 .unwrap();
2281
2282 db.apply_batch(parent).await.unwrap();
2284 db.commit().await.unwrap();
2285
2286 let committed_child = db
2287 .new_batch()
2288 .write(k0, Some(colliding_digest(0xCC, 0)))
2289 .write(k29, Some(colliding_digest(0xCC, 29)))
2290 .merkleize(&db, None)
2291 .await
2292 .unwrap();
2293
2294 assert_eq!(
2295 pending_child.root(),
2296 committed_child.root(),
2297 "root depended on pending-vs-committed parent path \
2298 when re-creating a deleted key with collision siblings"
2299 );
2300
2301 db.destroy().await.unwrap();
2302 });
2303 }
2304}