1use crate::{
6 index::Unordered as UnorderedIndex,
7 journal::contiguous::{Contiguous, Mutable},
8 merkle::{
9 self, batch::MerkleizedBatch as GenericMerkleizedBatch, mem::Mem,
10 storage::Storage as MerkleStorage, Graftable, Location, Position, Readable,
11 },
12 qmdb::{
13 self,
14 any::{
15 self,
16 batch::{lookup_sorted, DiffEntry},
17 operation::{update, Operation},
18 ValueEncoding,
19 },
20 batch_chain::Bounds,
21 bitmap::Shared,
22 current::{
23 db::{compute_db_root, compute_grafted_leaves},
24 grafting,
25 },
26 operation::Key,
27 Error,
28 },
29 Context,
30};
31use ahash::AHasher;
32use commonware_codec::Codec;
33use commonware_cryptography::{Digest, Hasher};
34use commonware_parallel::Strategy;
35use commonware_utils::bitmap::{self, Readable as _};
36use std::{
37 collections::{BTreeSet, HashMap},
38 hash::BuildHasherDefault,
39 sync::Arc,
40};
41
42#[derive(Clone, Debug, Default)]
48pub(crate) struct ChunkOverlay<const N: usize> {
49 pub(crate) chunks: HashMap<usize, [u8; N], BuildHasherDefault<AHasher>>,
54 pub(crate) len: u64,
56}
57
58impl<const N: usize> ChunkOverlay<N> {
59 const CHUNK_BITS: u64 = bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
60
61 fn new(len: u64) -> Self {
62 Self {
63 chunks: HashMap::default(),
64 len,
65 }
66 }
67
68 fn chunk_mut<B: bitmap::Readable<N>>(&mut self, base: &B, idx: usize) -> &mut [u8; N] {
71 self.chunks.entry(idx).or_insert_with(|| {
72 let base_len = base.len();
73 let base_complete = base.complete_chunks();
74 let base_has_partial = !base_len.is_multiple_of(Self::CHUNK_BITS);
75 if idx < base_complete {
76 base.get_chunk(idx)
77 } else if idx == base_complete && base_has_partial {
78 base.last_chunk().0
79 } else {
80 [0u8; N]
81 }
82 })
83 }
84
85 fn set_bit<B: bitmap::Readable<N>>(&mut self, base: &B, loc: u64) {
87 let idx = bitmap::Prunable::<N>::to_chunk_index(loc);
88 let rel = (loc % Self::CHUNK_BITS) as usize;
89 let chunk = self.chunk_mut(base, idx);
90 chunk[rel / 8] |= 1 << (rel % 8);
91 }
92
93 fn clear_bit<B: bitmap::Readable<N>>(&mut self, base: &B, pruned_chunks: usize, loc: u64) {
97 let idx = bitmap::Prunable::<N>::to_chunk_index(loc);
98 if idx < pruned_chunks {
99 return;
100 }
101 let rel = (loc % Self::CHUNK_BITS) as usize;
102 let chunk = self.chunk_mut(base, idx);
103 chunk[rel / 8] &= !(1 << (rel % 8));
104 }
105
106 pub(crate) fn get(&self, idx: usize) -> Option<&[u8; N]> {
108 self.chunks.get(&idx)
109 }
110
111 pub(crate) const fn complete_chunks(&self) -> usize {
113 (self.len / Self::CHUNK_BITS) as usize
114 }
115}
116
117pub(crate) fn next_candidate<F: Graftable, B: bitmap::Readable<N>, const N: usize>(
131 bitmap: &B,
132 floor: Location<F>,
133 tip: u64,
134) -> Option<Location<F>> {
135 let floor = *floor;
136 let bitmap_len = bitmap.len();
137 let committed_end = bitmap_len.min(tip);
138 if floor < committed_end {
139 if let Some(idx) = bitmap.ones_iter_from(floor).next() {
140 if idx < committed_end {
141 return Some(Location::<F>::new(idx));
142 }
143 }
144 }
145 let candidate = floor.max(bitmap_len);
146 (candidate < tip).then(|| Location::<F>::new(candidate))
147}
148
149struct BatchStorageAdapter<
155 'a,
156 F: Graftable,
157 D: Digest,
158 R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
159 S: MerkleStorage<F, Digest = D>,
160> {
161 batch: &'a R,
162 base: &'a S,
163 _phantom: core::marker::PhantomData<(F, D)>,
164}
165
166impl<
167 'a,
168 F: Graftable,
169 D: Digest,
170 R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
171 S: MerkleStorage<F, Digest = D>,
172 > BatchStorageAdapter<'a, F, D, R, S>
173{
174 const fn new(batch: &'a R, base: &'a S) -> Self {
175 Self {
176 batch,
177 base,
178 _phantom: core::marker::PhantomData,
179 }
180 }
181}
182
183impl<
184 F: Graftable,
185 D: Digest,
186 R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
187 S: MerkleStorage<F, Digest = D>,
188 > MerkleStorage<F> for BatchStorageAdapter<'_, F, D, R, S>
189{
190 type Digest = D;
191
192 async fn size(&self) -> Position<F> {
193 self.batch.size()
194 }
195 async fn get_node(&self, pos: Position<F>) -> Result<Option<D>, merkle::Error<F>> {
196 if let Some(node) = self.batch.get_node(pos) {
197 return Ok(Some(node));
198 }
199 self.base.get_node(pos).await
200 }
201}
202
203struct BatchOverMem<'a, F: Graftable, D: Digest, S: Strategy> {
208 batch: &'a GenericMerkleizedBatch<F, D, S>,
209 mem: &'a Mem<F, D>,
210}
211
212impl<F: Graftable, D: Digest, S: Strategy> Readable for BatchOverMem<'_, F, D, S> {
213 type Family = F;
214 type Digest = D;
215 type Error = merkle::Error<F>;
216
217 fn size(&self) -> Position<F> {
218 self.batch.size()
219 }
220
221 fn get_node(&self, pos: Position<F>) -> Option<D> {
222 if let Some(d) = self.batch.get_node(pos) {
223 return Some(d);
224 }
225 self.mem.get_node(pos)
226 }
227
228 fn pruning_boundary(&self) -> Location<F> {
229 self.batch.pruning_boundary()
230 }
231}
232
233pub struct UnmerkleizedBatch<F, H, U, const N: usize, S: Strategy>
239where
240 F: Graftable,
241 U: update::Update + Send + Sync,
242 H: Hasher,
243 Operation<F, U>: Codec,
244{
245 inner: any::batch::UnmerkleizedBatch<F, H, U, S>,
247
248 grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,
250
251 bitmap_parent: BitmapBatch<N>,
253}
254
255pub struct MerkleizedBatch<
293 F: Graftable,
294 D: Digest,
295 U: update::Update + Send + Sync,
296 const N: usize,
297 S: Strategy,
298> where
299 Operation<F, U>: Send + Sync,
300{
301 pub(crate) inner: Arc<any::batch::MerkleizedBatch<F, D, U, S>>,
303
304 pub(crate) grafted: Arc<merkle::batch::MerkleizedBatch<F, D, S>>,
306
307 pub(crate) bitmap: BitmapBatch<N>,
309
310 pub(crate) canonical_root: D,
312}
313
314impl<F, H, U, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, U, N, S>
315where
316 F: Graftable,
317 U: update::Update + Send + Sync,
318 H: Hasher,
319 Operation<F, U>: Codec,
320{
321 pub(super) const fn new(
322 inner: any::batch::UnmerkleizedBatch<F, H, U, S>,
323 grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,
324 bitmap_parent: BitmapBatch<N>,
325 ) -> Self {
326 Self {
327 inner,
328 grafted_parent,
329 bitmap_parent,
330 }
331 }
332
333 pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
338 self.inner = self.inner.write(key, value);
339 self
340 }
341}
342
343impl<F, K, V, H, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>
345where
346 F: Graftable,
347 K: Key,
348 V: ValueEncoding,
349 H: Hasher,
350 Operation<F, update::Unordered<K, V>>: Codec,
351{
352 pub async fn get<E, C, I>(
354 &self,
355 key: &K,
356 db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
357 ) -> Result<Option<V::Value>, Error<F>>
358 where
359 E: Context,
360 C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
361 I: UnorderedIndex<Value = Location<F>> + 'static,
362 {
363 self.inner.get(key, &db.any).await
364 }
365
366 pub async fn get_many<E, C, I>(
370 &self,
371 keys: &[&K],
372 db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
373 ) -> Result<Vec<Option<V::Value>>, Error<F>>
374 where
375 E: Context,
376 C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
377 I: UnorderedIndex<Value = Location<F>> + 'static,
378 {
379 self.inner.get_many(keys, &db.any).await
380 }
381
382 pub async fn merkleize<E, C, I>(
384 self,
385 db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N, S>,
386 metadata: Option<V::Value>,
387 ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N, S>>, Error<F>>
388 where
389 E: Context,
390 C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
391 I: UnorderedIndex<Value = Location<F>> + 'static,
392 {
393 let Self {
394 inner,
395 grafted_parent,
396 bitmap_parent,
397 } = self;
398 let inner = inner
400 .merkleize_with_floor_scan(&db.any, metadata, |floor, tip| {
401 next_candidate(&bitmap_parent, floor, tip)
402 })
403 .await?;
404 compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
405 }
406}
407
408impl<F, K, V, H, const N: usize, S: Strategy> UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>
410where
411 F: Graftable,
412 K: Key,
413 V: ValueEncoding,
414 H: Hasher,
415 Operation<F, update::Ordered<K, V>>: Codec,
416{
417 pub async fn get<E, C, I>(
419 &self,
420 key: &K,
421 db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
422 ) -> Result<Option<V::Value>, Error<F>>
423 where
424 E: Context,
425 C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
426 I: crate::index::Ordered<Value = Location<F>> + 'static,
427 {
428 self.inner.get(key, &db.any).await
429 }
430
431 pub async fn get_many<E, C, I>(
435 &self,
436 keys: &[&K],
437 db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
438 ) -> Result<Vec<Option<V::Value>>, Error<F>>
439 where
440 E: Context,
441 C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
442 I: crate::index::Ordered<Value = Location<F>> + 'static,
443 {
444 self.inner.get_many(keys, &db.any).await
445 }
446
447 pub async fn merkleize<E, C, I>(
449 self,
450 db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N, S>,
451 metadata: Option<V::Value>,
452 ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N, S>>, Error<F>>
453 where
454 E: Context,
455 C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
456 I: crate::index::Ordered<Value = Location<F>> + 'static,
457 {
458 let Self {
459 inner,
460 grafted_parent,
461 bitmap_parent,
462 } = self;
463 let inner = inner
465 .merkleize_with_floor_scan(&db.any, metadata, |floor, tip| {
466 next_candidate(&bitmap_parent, floor, tip)
467 })
468 .await?;
469 compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
470 }
471}
472
473#[allow(clippy::type_complexity)]
483fn build_chunk_overlay<F: Graftable, U, B: bitmap::Readable<N>, const N: usize>(
484 base: &B,
485 batch_len: usize,
486 batch_base: u64,
487 diff: &[(U::Key, DiffEntry<F, U::Value>)],
488 ancestor_diffs: &[Arc<Vec<(U::Key, DiffEntry<F, U::Value>)>>],
489) -> ChunkOverlay<N>
490where
491 U: update::Update,
492{
493 let total_bits = base.len() + batch_len as u64;
494 let mut overlay = ChunkOverlay::new(total_bits);
495 let pruned_chunks = base.pruned_chunks();
496
497 let commit_loc = batch_base + batch_len as u64 - 1;
499 overlay.set_bit(base, commit_loc);
500
501 overlay.clear_bit(base, pruned_chunks, batch_base - 1);
503
504 for (key, entry) in diff {
506 if let Some(loc) = entry.loc() {
508 if *loc >= batch_base && *loc < batch_base + batch_len as u64 {
509 overlay.set_bit(base, *loc);
510 }
511 }
512
513 let mut prev_loc = entry.base_old_loc();
516 for ancestor_diff in ancestor_diffs {
517 if let Some(ancestor_entry) = lookup_sorted(ancestor_diff.as_slice(), key) {
518 prev_loc = ancestor_entry.loc();
519 break;
520 }
521 }
522 if let Some(old) = prev_loc {
523 overlay.clear_bit(base, pruned_chunks, *old);
524 }
525 }
526
527 let parent_complete = base.complete_chunks();
531 let new_complete = overlay.complete_chunks();
532 for idx in parent_complete..new_complete {
533 overlay.chunk_mut(base, idx);
534 }
535
536 overlay
537}
538
539async fn compute_current_layer<F, E, U, C, I, H, const N: usize, S>(
545 inner: Arc<any::batch::MerkleizedBatch<F, H::Digest, U, S>>,
546 current_db: &super::db::Db<F, E, C, I, H, U, N, S>,
547 grafted_parent: &Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>>,
548 bitmap_parent: &BitmapBatch<N>,
549) -> Result<Arc<MerkleizedBatch<F, H::Digest, U, N, S>>, Error<F>>
550where
551 F: Graftable,
552 E: Context,
553 U: update::Update + Send + Sync,
554 C: Contiguous<Item = Operation<F, U>>,
555 I: UnorderedIndex<Value = Location<F>>,
556 H: Hasher,
557 S: Strategy,
558 Operation<F, U>: Codec,
559{
560 let batch_len = inner.journal_batch.items().len();
561 let batch_base = inner.bounds.total_size - batch_len as u64;
562
563 let overlay = build_chunk_overlay::<F, U, _, N>(
565 bitmap_parent,
566 batch_len,
567 batch_base,
568 &inner.diff,
569 &inner.ancestor_diffs,
570 );
571
572 let grafting_height = grafting::height::<N>();
573 let ops_tree_adapter =
574 BatchStorageAdapter::new(&inner.journal_batch, ¤t_db.any.log.merkle);
575
576 let overlay_ops_leaves = Location::<F>::new(inner.bounds.total_size);
579
580 let new_complete_chunks = overlay.complete_chunks();
588 let graftable_overlay = grafting::graftable_chunks::<F>(*overlay_ops_leaves, grafting_height)
589 .min(new_complete_chunks as u64) as usize;
590 let graftable_parent = *grafted_parent.leaves() as usize;
591 let pruned_chunks = bitmap_parent.pruned_chunks();
592 assert!(
593 pruned_chunks <= graftable_parent && graftable_parent <= graftable_overlay && graftable_overlay <= new_complete_chunks,
594 "invariant violated: pruned={pruned_chunks} graftable_parent={graftable_parent} graftable_overlay={graftable_overlay} new_complete={new_complete_chunks}"
595 );
596
597 let mut chunk_indices_to_update: BTreeSet<usize> = overlay
603 .chunks
604 .iter()
605 .filter(|(&idx, _)| idx < graftable_overlay && idx >= pruned_chunks)
606 .map(|(&idx, _)| idx)
607 .collect();
608 for idx in graftable_parent..graftable_overlay {
609 chunk_indices_to_update.insert(idx);
610 }
611 let chunks_to_update = chunk_indices_to_update.into_iter().map(|idx| {
612 let chunk = overlay
613 .get(idx)
614 .copied()
615 .unwrap_or_else(|| bitmap_parent.get_chunk(idx));
616 (idx, chunk)
617 });
618
619 let hasher = qmdb::hasher::<H>();
620 let new_leaves = compute_grafted_leaves::<F, H, S, N>(
621 &hasher,
622 &ops_tree_adapter,
623 chunks_to_update,
624 ¤t_db.strategy,
625 )
626 .await?;
627
628 let grafted_batch = {
630 let mut grafted_batch = grafted_parent.new_batch();
631 let old_grafted_leaves = *grafted_parent.leaves() as usize;
632 for &(chunk_idx, digest) in &new_leaves {
633 if chunk_idx < old_grafted_leaves {
634 grafted_batch = grafted_batch
635 .update_leaf_digest(Location::<F>::new(chunk_idx as u64), digest)
636 .expect("update_leaf_digest failed");
637 } else {
638 grafted_batch = grafted_batch.add_leaf_digest(digest);
639 }
640 }
641 let gh = grafting::GraftedHasher::<F, _>::new(hasher.clone(), grafting_height);
642 grafted_batch.merkleize(¤t_db.grafted_tree, &gh)
643 };
644
645 let bitmap_batch = BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
649 parent: bitmap_parent.clone(),
650 overlay: Arc::new(overlay),
651 shared: Arc::clone(bitmap_parent.shared()),
652 }));
653
654 let ops_root = inner.root();
657 let layered = BatchOverMem {
658 batch: &grafted_batch,
659 mem: ¤t_db.grafted_tree,
660 };
661 let grafted_storage =
662 grafting::Storage::new(&layered, grafting_height, &ops_tree_adapter, hasher.clone());
663 let partial = {
669 let rem = bitmap_batch.len() % BitmapBatch::<N>::CHUNK_SIZE_BITS;
670 if rem == 0 {
671 None
672 } else {
673 let idx = new_complete_chunks;
674 let chunk = bitmap_batch.get_chunk(idx);
675 Some((chunk, rem))
676 }
677 };
678 let canonical_root = compute_db_root::<F, H, _, _, N>(
679 &hasher,
680 &bitmap_batch,
681 &grafted_storage,
682 overlay_ops_leaves,
683 partial,
684 inner.bounds.inactivity_floor,
685 &ops_root,
686 )
687 .await?;
688
689 Ok(Arc::new(MerkleizedBatch {
690 inner,
691 grafted: grafted_batch,
692 bitmap: bitmap_batch,
693 canonical_root,
694 }))
695}
696
697#[derive(Clone, Debug)]
703pub(crate) enum BitmapBatch<const N: usize> {
704 Base(Arc<Shared<N>>),
706 Layer(Arc<BitmapBatchLayer<N>>),
708}
709
710#[derive(Debug)]
712pub(crate) struct BitmapBatchLayer<const N: usize> {
713 pub(crate) parent: BitmapBatch<N>,
714 pub(crate) overlay: Arc<ChunkOverlay<N>>,
716 pub(crate) shared: Arc<Shared<N>>,
719}
720
721impl<const N: usize> BitmapBatch<N> {
722 const CHUNK_SIZE_BITS: u64 = bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
723
724 fn shared(&self) -> &Arc<Shared<N>> {
726 match self {
727 Self::Base(s) => s,
728 Self::Layer(layer) => &layer.shared,
729 }
730 }
731
732 fn trim_committed(&self) -> Self {
736 let shared = self.shared();
737 let committed = bitmap::Readable::<N>::len(shared.as_ref());
738 let mut kept = Vec::new();
739 let mut current = self;
740 while let Self::Layer(layer) = current {
741 if layer.overlay.len <= committed {
742 break;
743 }
744 kept.push(Arc::clone(&layer.overlay));
745 current = &layer.parent;
746 }
747 let mut result = Self::Base(Arc::clone(shared));
748 for overlay in kept.into_iter().rev() {
749 result = Self::Layer(Arc::new(BitmapBatchLayer {
750 parent: result,
751 overlay,
752 shared: Arc::clone(shared),
753 }));
754 }
755 result
756 }
757}
758
759impl<const N: usize> bitmap::Readable<N> for BitmapBatch<N> {
760 fn complete_chunks(&self) -> usize {
761 (self.len() / Self::CHUNK_SIZE_BITS) as usize
762 }
763
764 fn get_chunk(&self, idx: usize) -> [u8; N] {
765 let mut current = self;
768 loop {
769 match current {
770 Self::Base(shared) => return shared.get_chunk(idx),
771 Self::Layer(layer) => {
772 if let Some(&chunk) = layer.overlay.get(idx) {
773 return chunk;
774 }
775 current = &layer.parent;
776 }
777 }
778 }
779 }
780
781 fn last_chunk(&self) -> ([u8; N], u64) {
782 let total = self.len();
783 if total == 0 {
784 return ([0u8; N], 0);
785 }
786 let rem = total % Self::CHUNK_SIZE_BITS;
787 let bits_in_last = if rem == 0 { Self::CHUNK_SIZE_BITS } else { rem };
788 let idx = if rem == 0 {
789 self.complete_chunks().saturating_sub(1)
790 } else {
791 self.complete_chunks()
792 };
793 (self.get_chunk(idx), bits_in_last)
794 }
795
796 fn pruned_chunks(&self) -> usize {
797 self.shared().pruned_chunks()
798 }
799
800 fn len(&self) -> u64 {
801 match self {
802 Self::Base(shared) => bitmap::Readable::<N>::len(shared.as_ref()),
803 Self::Layer(layer) => layer.overlay.len,
804 }
805 }
806}
807
808impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize, S: Strategy>
809 MerkleizedBatch<F, D, U, N, S>
810where
811 Operation<F, U>: Send + Sync,
812{
813 pub const fn root(&self) -> D {
815 self.canonical_root
816 }
817
818 pub fn ops_root(&self) -> D {
820 self.inner.root()
821 }
822
823 pub fn bounds(&self) -> &Bounds<F> {
825 self.inner.bounds()
826 }
827
828 pub fn sync_boundary(&self) -> Location<F> {
830 super::db::sync_boundary::<F, N>(
831 self.bitmap.pruned_chunks() as u64,
832 self.inner.bounds().total_size,
833 )
834 }
835}
836
837impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize, S: Strategy>
838 MerkleizedBatch<F, D, U, N, S>
839where
840 Operation<F, U>: Codec,
841{
842 pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U, N, S>
852 where
853 H: Hasher<Digest = D>,
854 {
855 UnmerkleizedBatch::new(
856 self.inner.new_batch::<H>(),
857 Arc::clone(&self.grafted),
858 self.bitmap.trim_committed(),
859 )
860 }
861
862 pub async fn get<E, C, I, H>(
867 &self,
868 key: &U::Key,
869 db: &super::db::Db<F, E, C, I, H, U, N, S>,
870 ) -> Result<Option<U::Value>, Error<F>>
871 where
872 E: Context,
873 C: Contiguous<Item = Operation<F, U>>,
874 I: UnorderedIndex<Value = Location<F>> + 'static,
875 H: Hasher<Digest = D>,
876 {
877 self.inner.get(key, &db.any).await
878 }
879
880 pub async fn get_many<E, C, I, H>(
884 &self,
885 keys: &[&U::Key],
886 db: &super::db::Db<F, E, C, I, H, U, N, S>,
887 ) -> Result<Vec<Option<U::Value>>, Error<F>>
888 where
889 E: Context,
890 C: Contiguous<Item = Operation<F, U>>,
891 I: UnorderedIndex<Value = Location<F>> + 'static,
892 H: Hasher<Digest = D>,
893 {
894 self.inner.get_many(keys, &db.any).await
895 }
896}
897
898impl<F, E, C, I, H, U, const N: usize, S> super::db::Db<F, E, C, I, H, U, N, S>
899where
900 F: Graftable,
901 E: Context,
902 U: update::Update + Send + Sync,
903 C: Contiguous<Item = Operation<F, U>>,
904 I: UnorderedIndex<Value = Location<F>>,
905 H: Hasher,
906 S: Strategy,
907 Operation<F, U>: Codec,
908{
909 pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U, N, S>> {
915 let grafted = self.grafted_snapshot();
916 Arc::new(MerkleizedBatch {
917 inner: self.any.to_batch(),
918 grafted,
919 bitmap: BitmapBatch::Base(Arc::clone(&self.any.bitmap)),
920 canonical_root: self.root,
921 })
922 }
923}
924
/// Implementations of the generic batch traits for the `current` database and its batch
/// types. Every method simply forwards to the inherent method of the same name.
#[cfg(any(test, feature = "test-traits"))]
mod trait_impls {
    use super::*;
    use crate::{
        journal::contiguous::Mutable,
        qmdb::any::traits::{
            BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
            UnmerkleizedBatch as UnmerkleizedBatchTrait,
        },
        Persistable,
    };
    use std::future::Future;

    /// Shorthand for the `current` database type.
    type CurrentDb<F, E, C, I, H, U, const N: usize, S> =
        crate::qmdb::current::db::Db<F, E, C, I, H, U, N, S>;

    impl<F, K, V, H, E, C, I, const N: usize, S>
        UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>>
        for UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>
    where
        F: Graftable,
        K: Key,
        V: ValueEncoding + 'static,
        H: Hasher,
        E: Context,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: UnorderedIndex<Value = Location<F>> + 'static,
        S: Strategy,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N, S>>;

        // Forwards to the inherent `write`.
        fn write(self, key: K, value: Option<V::Value>) -> Self {
            Self::write(self, key, value)
        }

        // Forwards to the inherent `merkleize`.
        async fn merkleize(
            self,
            db: &CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>,
            metadata: Option<V::Value>,
        ) -> Result<Self::Merkleized, Error<F>> {
            self.merkleize(db, metadata).await
        }
    }

    impl<F, K, V, H, E, C, I, const N: usize, S>
        UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>>
        for UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>
    where
        F: Graftable,
        K: Key,
        V: ValueEncoding + 'static,
        H: Hasher,
        E: Context,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: crate::index::Ordered<Value = Location<F>> + 'static,
        S: Strategy,
        Operation<F, update::Ordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N, S>>;

        // Forwards to the inherent `write`.
        fn write(self, key: K, value: Option<V::Value>) -> Self {
            Self::write(self, key, value)
        }

        // Forwards to the inherent `merkleize`.
        async fn merkleize(
            self,
            db: &CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>,
            metadata: Option<V::Value>,
        ) -> Result<Self::Merkleized, Error<F>> {
            self.merkleize(db, metadata).await
        }
    }

    impl<F, D, U, const N: usize, S> MerkleizedBatchTrait for Arc<MerkleizedBatch<F, D, U, N, S>>
    where
        F: Graftable,
        D: Digest,
        U: update::Update + Send + Sync + 'static,
        S: Strategy,
        Operation<F, U>: Codec,
    {
        type Digest = D;

        // Forwards to the inherent `root`.
        fn root(&self) -> D {
            MerkleizedBatch::root(self)
        }
    }

    impl<F, E, K, V, C, I, H, const N: usize, S> BatchableDb
        for CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N, S>
    where
        F: Graftable,
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: UnorderedIndex<Value = Location<F>> + 'static,
        H: Hasher,
        S: Strategy,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N, S>>;
        type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>, N, S>;

        // Forwards to the database's `new_batch`.
        fn new_batch(&self) -> Self::Batch {
            self.new_batch()
        }

        // Forwards to the database's `apply_batch`.
        fn apply_batch(
            &mut self,
            batch: Self::Merkleized,
        ) -> impl Future<Output = Result<std::ops::Range<Location<F>>, Error<F>>> {
            self.apply_batch(batch)
        }
    }

    impl<F, E, K, V, C, I, H, const N: usize, S> BatchableDb
        for CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N, S>
    where
        F: Graftable,
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: crate::index::Ordered<Value = Location<F>> + 'static,
        H: Hasher,
        S: Strategy,
        Operation<F, update::Ordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N, S>>;
        type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>, N, S>;

        // Forwards to the database's `new_batch`.
        fn new_batch(&self) -> Self::Batch {
            self.new_batch()
        }

        // Forwards to the database's `apply_batch`.
        fn apply_batch(
            &mut self,
            batch: Self::Merkleized,
        ) -> impl Future<Output = Result<std::ops::Range<Location<F>>, Error<F>>> {
            self.apply_batch(batch)
        }
    }
}
1092
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mmr;
    use commonware_utils::bitmap::Prunable as BitMap;

    /// Chunk size in bytes used throughout these tests (32 bits per chunk).
    const N: usize = 4;
    type Bm = BitMap<N>;
    type Location = mmr::Location;

    /// Builds a bitmap by pushing `bits` in order.
    fn make_bitmap(bits: &[bool]) -> Bm {
        let mut bm = Bm::new();
        bits.iter().for_each(|&bit| bm.push(bit));
        bm
    }

    /// Reports whether bit `bit` (counted from the start of the chunk) is set.
    fn bit_is_set(chunk: &[u8; N], bit: usize) -> bool {
        chunk[bit / 8] & (1 << (bit % 8)) != 0
    }

    /// Runs `next_candidate` with a plain integer floor.
    fn scan(bm: &Bm, floor: u64, tip: u64) -> Option<Location> {
        next_candidate(bm, Location::new(floor), tip)
    }

    /// New locations inside the batch are set, the commit bit is set, the previous commit bit
    /// is cleared, and locations outside the batch are ignored.
    #[test]
    fn chunk_overlay_pushes() {
        use crate::qmdb::any::value::FixedEncoding;
        use commonware_utils::sequence::FixedBytes;

        type K = FixedBytes<4>;
        type V = FixedEncoding<u64>;
        type U = crate::qmdb::any::operation::update::Unordered<K, V>;

        let key1 = FixedBytes::from([1, 0, 0, 0]);
        let key2 = FixedBytes::from([2, 0, 0, 0]);

        // Base covers locations 0..4; the batch covers locations 4..8.
        let base = make_bitmap(&[true; 4]);
        let mut diff = vec![
            (
                key1,
                DiffEntry::Active {
                    value: 100u64,
                    loc: Location::new(4),
                    base_old_loc: None,
                },
            ),
            (
                key2,
                DiffEntry::Active {
                    value: 200u64,
                    // Outside the batch range, so it must not be set.
                    loc: Location::new(99),
                    base_old_loc: None,
                },
            ),
        ];
        diff.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));

        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 4, 4, &diff, &[]);

        let c0 = overlay.get(0).expect("chunk 0 should be dirty");
        assert!(bit_is_set(c0, 4)); // key1's new location
        assert!(!bit_is_set(c0, 5)); // untouched
        assert!(!bit_is_set(c0, 6)); // untouched
        assert!(bit_is_set(c0, 7)); // last operation of the batch
        assert!(!bit_is_set(c0, 3)); // operation preceding the batch
    }

    /// Superseded and deleted base locations are cleared while neighbours stay set.
    #[test]
    fn chunk_overlay_clears() {
        use crate::qmdb::any::value::FixedEncoding;
        use commonware_utils::sequence::FixedBytes;

        type K = FixedBytes<4>;
        type U = crate::qmdb::any::operation::update::Unordered<K, FixedEncoding<u64>>;

        let key1 = FixedBytes::from([1, 0, 0, 0]);
        let key2 = FixedBytes::from([2, 0, 0, 0]);
        let key3 = FixedBytes::from([3, 0, 0, 0]);

        // Base covers locations 0..64 (two full chunks); the batch covers 64..72.
        let base = make_bitmap(&[true; 64]);

        let mut diff: Vec<(K, DiffEntry<mmr::Family, u64>)> = vec![
            (
                key1,
                DiffEntry::Active {
                    value: 100,
                    loc: Location::new(70),
                    base_old_loc: Some(Location::new(5)),
                },
            ),
            (
                key2,
                DiffEntry::Deleted {
                    base_old_loc: Some(Location::new(10)),
                },
            ),
            (
                key3,
                DiffEntry::Active {
                    value: 300,
                    loc: Location::new(71),
                    base_old_loc: None,
                },
            ),
        ];
        diff.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));

        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 8, 64, &diff, &[]);

        let c0 = overlay.get(0).expect("chunk 0 should be dirty");
        assert!(!bit_is_set(c0, 5)); // key1's old location
        assert!(!bit_is_set(c0, 10)); // key2's old location
        assert!(bit_is_set(c0, 4)); // neighbour of 5 is untouched
        assert!(bit_is_set(c0, 11)); // neighbour of 10 is untouched
    }

    /// A partial base chunk keeps its existing bits when it is materialized in the overlay.
    #[test]
    fn chunk_overlay_preserves_partial_parent_chunk() {
        use crate::qmdb::any::value::FixedEncoding;
        use commonware_utils::sequence::FixedBytes;

        type K = FixedBytes<4>;
        type U = crate::qmdb::any::operation::update::Unordered<K, FixedEncoding<u64>>;

        // 20 bits do not fill a 32-bit chunk.
        let base = make_bitmap(&[true; 20]);
        assert_eq!(base.complete_chunks(), 0);

        let key1 = FixedBytes::from([1, 0, 0, 0]);
        let mut diff = vec![(
            key1,
            DiffEntry::Active {
                value: 42u64,
                loc: Location::new(35),
                base_old_loc: None,
            },
        )];
        diff.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));

        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 20, 20, &diff, &[]);

        let c0 = overlay.get(0).expect("chunk 0 should be in overlay");
        // Bits 0..16 are carried over from the base.
        assert_eq!(c0[0], 0xFF);
        assert_eq!(c0[1], 0xFF);
        // Bits 16..19 are carried over; bit 19 (the operation preceding the batch) is cleared.
        assert_eq!(c0[2], 0x07);
    }

    #[test]
    fn bitmap_scan_all_active() {
        let bm = make_bitmap(&[true; 8]);
        // Every location is active, so each floor is its own candidate.
        for i in 0..8 {
            assert_eq!(scan(&bm, i, 8), Some(Location::new(i)));
        }
        assert_eq!(scan(&bm, 8, 8), None);
    }

    #[test]
    fn bitmap_scan_all_inactive() {
        let bm = make_bitmap(&[false; 8]);
        assert_eq!(scan(&bm, 0, 8), None);
    }

    #[test]
    fn bitmap_scan_skips_inactive() {
        let bm = make_bitmap(&[false, false, true, false, true]);
        assert_eq!(scan(&bm, 0, 5), Some(Location::new(2)));
        assert_eq!(scan(&bm, 3, 5), Some(Location::new(4)));
        assert_eq!(scan(&bm, 5, 5), None);
    }

    #[test]
    fn bitmap_scan_beyond_bitmap_len_returns_candidate() {
        let bm = make_bitmap(&[false; 4]);
        // Nothing is active in 0..4, so the first uncovered location is returned.
        assert_eq!(scan(&bm, 0, 8), Some(Location::new(4)));
        // A floor beyond the bitmap is returned as-is while it is below the tip.
        assert_eq!(scan(&bm, 6, 8), Some(Location::new(6)));
    }

    #[test]
    fn bitmap_scan_respects_tip() {
        let bm = make_bitmap(&[false, false, false, true]);
        // The only active bit sits at the tip and is therefore excluded.
        assert_eq!(scan(&bm, 0, 3), None);
        // Raising the tip makes it visible.
        assert_eq!(scan(&bm, 0, 4), Some(Location::new(3)));
    }

    #[test]
    fn bitmap_scan_floor_at_tip() {
        let bm = make_bitmap(&[true; 4]);
        assert_eq!(scan(&bm, 4, 4), None);
    }

    #[test]
    fn bitmap_scan_empty_bitmap() {
        let bm = Bm::new();
        // With nothing covered, the floor itself is the candidate.
        assert_eq!(scan(&bm, 0, 5), Some(Location::new(0)));
        // An empty range has no candidate.
        assert_eq!(scan(&bm, 0, 0), None);
    }

    /// Stacks one empty overlay per entry of `overlay_lens` (oldest first) on `shared`.
    fn make_chain(shared: &Arc<Shared<N>>, overlay_lens: &[u64]) -> BitmapBatch<N> {
        overlay_lens
            .iter()
            .fold(BitmapBatch::Base(Arc::clone(shared)), |parent, &len| {
                BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
                    parent,
                    overlay: Arc::new(ChunkOverlay::new(len)),
                    shared: Arc::clone(shared),
                }))
            })
    }

    /// Returns the overlay lengths of `batch`'s layers, oldest first.
    fn chain_overlays(batch: &BitmapBatch<N>) -> Vec<u64> {
        let mut lens = Vec::new();
        let mut node = batch;
        loop {
            match node {
                BitmapBatch::Layer(layer) => {
                    lens.push(layer.overlay.len);
                    node = &layer.parent;
                }
                BitmapBatch::Base(_) => break,
            }
        }
        lens.reverse();
        lens
    }

    /// Trimming a bare base yields the same shared bitmap.
    #[test]
    fn trim_committed_already_base() {
        let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 64])));
        let base = BitmapBatch::Base(Arc::clone(&shared));
        let BitmapBatch::Base(trimmed) = base.trim_committed() else {
            panic!("expected Base")
        };
        assert!(Arc::ptr_eq(&trimmed, &shared));
    }

    /// A chain whose only overlay is shorter than the shared bitmap collapses to the base.
    #[test]
    fn trim_committed_all_committed() {
        let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 64])));
        let chain = make_chain(&shared, &[32]);
        let BitmapBatch::Base(trimmed) = chain.trim_committed() else {
            panic!("expected Base after full trim")
        };
        assert!(Arc::ptr_eq(&trimmed, &shared));
    }

    /// Overlays longer than the shared bitmap are all kept, in order.
    #[test]
    fn trim_committed_none_committed() {
        let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 32])));
        let chain = make_chain(&shared, &[64, 96]);
        let trimmed = chain.trim_committed();
        assert_eq!(chain_overlays(&trimmed), vec![64, 96]);
    }

    /// An overlay exactly as long as the shared bitmap is dropped; the one above it is kept.
    #[test]
    fn trim_committed_exactly_one_uncommitted() {
        let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 64])));
        let chain = make_chain(&shared, &[64, 96]);
        let trimmed = chain.trim_committed();
        assert_eq!(chain_overlays(&trimmed), vec![96]);
        assert!(Arc::ptr_eq(trimmed.shared(), &shared));
    }

    /// Several overlays above the committed length are kept in their original order.
    #[test]
    fn trim_committed_multiple_uncommitted() {
        let shared = Arc::new(Shared::<N>::new(make_bitmap(&[true; 64])));
        let chain = make_chain(&shared, &[64, 96, 128]);
        let trimmed = chain.trim_committed();
        assert_eq!(chain_overlays(&trimmed), vec![96, 128]);
        assert!(Arc::ptr_eq(trimmed.shared(), &shared));
    }
}