1use crate::{
6 index::Unordered as UnorderedIndex,
7 journal::contiguous::{Contiguous, Mutable},
8 merkle::{
9 self, hasher::Standard as StandardHasher, storage::Storage as MerkleStorage, Graftable,
10 Location, Position, Readable,
11 },
12 qmdb::{
13 any::{
14 self,
15 batch::{DiffEntry, FloorScan},
16 operation::{update, Operation},
17 ValueEncoding,
18 },
19 current::{
20 db::{compute_db_root, compute_grafted_leaves},
21 grafting,
22 },
23 operation::Key,
24 Error,
25 },
26 Context,
27};
28use commonware_codec::Codec;
29use commonware_cryptography::{Digest, Hasher};
30use commonware_utils::bitmap::{Prunable as BitMap, Readable as BitmapReadable};
31use std::{collections::BTreeMap, sync::Arc};
32
/// A sparse set of dirty bitmap chunks layered over a base bitmap.
///
/// Only the chunks touched by a batch are materialized; untouched chunks are
/// resolved from the base bitmap on read.
#[derive(Clone, Debug, Default)]
pub(crate) struct ChunkOverlay<const N: usize> {
    // Dirty chunks keyed by chunk index; each chunk is N bytes.
    pub(crate) chunks: BTreeMap<usize, [u8; N]>,
    // Total bit length of the overlaid bitmap (base bits plus appended bits).
    pub(crate) len: u64,
}
45
46impl<const N: usize> ChunkOverlay<N> {
47 const CHUNK_BITS: u64 = BitMap::<N>::CHUNK_SIZE_BITS;
48
49 const fn new(len: u64) -> Self {
50 Self {
51 chunks: BTreeMap::new(),
52 len,
53 }
54 }
55
56 fn chunk_mut<B: BitmapReadable<N>>(&mut self, base: &B, idx: usize) -> &mut [u8; N] {
59 self.chunks.entry(idx).or_insert_with(|| {
60 let base_len = base.len();
61 let base_complete = base.complete_chunks();
62 let base_has_partial = !base_len.is_multiple_of(Self::CHUNK_BITS);
63 if idx < base_complete {
64 base.get_chunk(idx)
65 } else if idx == base_complete && base_has_partial {
66 base.last_chunk().0
67 } else {
68 [0u8; N]
69 }
70 })
71 }
72
73 fn set_bit<B: BitmapReadable<N>>(&mut self, base: &B, loc: u64) {
75 let idx = BitMap::<N>::to_chunk_index(loc);
76 let rel = (loc % Self::CHUNK_BITS) as usize;
77 let chunk = self.chunk_mut(base, idx);
78 chunk[rel / 8] |= 1 << (rel % 8);
79 }
80
81 fn clear_bit<B: BitmapReadable<N>>(&mut self, base: &B, loc: u64) {
84 let idx = BitMap::<N>::to_chunk_index(loc);
85 if idx < base.pruned_chunks() {
86 return;
87 }
88 let rel = (loc % Self::CHUNK_BITS) as usize;
89 let chunk = self.chunk_mut(base, idx);
90 chunk[rel / 8] &= !(1 << (rel % 8));
91 }
92
93 pub(crate) fn get(&self, idx: usize) -> Option<&[u8; N]> {
95 self.chunks.get(&idx)
96 }
97
98 pub(crate) const fn complete_chunks(&self) -> usize {
100 (self.len / Self::CHUNK_BITS) as usize
101 }
102}
103
/// A floor-scan strategy that consults an activity bitmap so that inactive
/// locations can be skipped when raising the floor.
pub(crate) struct BitmapScan<'a, B, const N: usize> {
    // Bitmap consulted for active locations; one bit per location.
    bitmap: &'a B,
}
109
impl<'a, B: BitmapReadable<N>, const N: usize> BitmapScan<'a, B, N> {
    /// Creates a scanner over the given bitmap.
    pub(crate) const fn new(bitmap: &'a B) -> Self {
        Self { bitmap }
    }
}
115
116impl<F: Graftable, B: BitmapReadable<N>, const N: usize> FloorScan<F> for BitmapScan<'_, B, N> {
117 fn next_candidate(&mut self, floor: Location<F>, tip: u64) -> Option<Location<F>> {
118 let loc = *floor;
119 if loc >= tip {
120 return None;
121 }
122 let bitmap_len = self.bitmap.len();
123 if loc < bitmap_len {
129 let bound = bitmap_len.min(tip);
130 if let Some(idx) = self.bitmap.ones_iter_from(loc).next() {
131 if idx < bound {
132 return Some(Location::<F>::new(idx));
133 }
134 }
135 }
136 if bitmap_len < tip {
139 let candidate = loc.max(bitmap_len);
140 if candidate < tip {
141 return Some(Location::<F>::new(candidate));
142 }
143 }
144 None
145 }
146}
147
/// Merkle storage view that overlays a batch's in-memory nodes on a backing
/// store: reads consult the batch first, then fall back to the base.
struct BatchStorageAdapter<
    'a,
    F: Graftable,
    D: Digest,
    R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
    S: MerkleStorage<F, Digest = D>,
> {
    // In-memory batch consulted first.
    batch: &'a R,
    // Backing store consulted on a batch miss.
    base: &'a S,
    _phantom: core::marker::PhantomData<(F, D)>,
}
164
impl<
        'a,
        F: Graftable,
        D: Digest,
        R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
        S: MerkleStorage<F, Digest = D>,
    > BatchStorageAdapter<'a, F, D, R, S>
{
    /// Creates an adapter layering `batch` over `base`.
    const fn new(batch: &'a R, base: &'a S) -> Self {
        Self {
            batch,
            base,
            _phantom: core::marker::PhantomData,
        }
    }
}
181
182impl<
183 F: Graftable,
184 D: Digest,
185 R: Readable<Family = F, Digest = D, Error = merkle::Error<F>>,
186 S: MerkleStorage<F, Digest = D>,
187 > MerkleStorage<F> for BatchStorageAdapter<'_, F, D, R, S>
188{
189 type Digest = D;
190
191 async fn size(&self) -> Position<F> {
192 self.batch.size()
193 }
194 async fn get_node(&self, pos: Position<F>) -> Result<Option<D>, merkle::Error<F>> {
195 if let Some(node) = self.batch.get_node(pos) {
196 return Ok(Some(node));
197 }
198 self.base.get_node(pos).await
199 }
200}
201
/// A [Readable] view layering a grafted-tree batch over the database's
/// in-memory grafted tree: batch nodes shadow the tree's nodes.
struct BatchOverMem<'a, F: Graftable, D: Digest> {
    // Batch consulted first for nodes (and for size/root/pruning boundary).
    batch: &'a merkle::batch::MerkleizedBatch<F, D>,
    // In-memory tree consulted when the batch lacks a node.
    mem: &'a merkle::mem::Mem<F, D>,
}
210
211impl<F: Graftable, D: Digest> Readable for BatchOverMem<'_, F, D> {
212 type Family = F;
213 type Digest = D;
214 type Error = merkle::Error<F>;
215
216 fn size(&self) -> Position<F> {
217 self.batch.size()
218 }
219
220 fn get_node(&self, pos: Position<F>) -> Option<D> {
221 if let Some(d) = self.batch.get_node(pos) {
222 return Some(d);
223 }
224 self.mem.get_node(pos)
225 }
226
227 fn root(&self) -> D {
228 self.batch.root()
229 }
230
231 fn pruning_boundary(&self) -> Location<F> {
232 self.batch.pruning_boundary()
233 }
234
235 fn proof(
236 &self,
237 _hasher: &impl crate::merkle::hasher::Hasher<F, Digest = D>,
238 _loc: Location<F>,
239 ) -> Result<crate::merkle::Proof<F, D>, merkle::Error<F>> {
240 unreachable!("proof not used in compute_current_layer")
241 }
242
243 fn range_proof(
244 &self,
245 _hasher: &impl crate::merkle::hasher::Hasher<F, Digest = D>,
246 _range: core::ops::Range<Location<F>>,
247 ) -> Result<crate::merkle::Proof<F, D>, merkle::Error<F>> {
248 unreachable!("range_proof not used in compute_current_layer")
249 }
250}
251
/// A batch of uncommitted writes against a Current db, together with the
/// parent grafted-tree and bitmap state it will be merkleized against.
pub struct UnmerkleizedBatch<F, H, U, const N: usize>
where
    F: Graftable,
    U: update::Update + Send + Sync,
    H: Hasher,
    Operation<F, U>: Codec,
{
    // Underlying operations batch that accumulates the writes.
    inner: any::batch::UnmerkleizedBatch<F, H, U>,

    // Grafted (activity) tree state this batch builds upon.
    grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest>>,

    // Activity bitmap state this batch builds upon.
    bitmap_parent: BitmapBatch<N>,
}
273
/// A merkleized batch of a Current db: the merkleized operations batch plus
/// the grafted tree, activity bitmap, and canonical root derived from it.
pub struct MerkleizedBatch<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize>
where
    Operation<F, U>: Send + Sync,
{
    // Merkleized operations batch.
    pub(crate) inner: Arc<any::batch::MerkleizedBatch<F, D, U>>,

    // Grafted (activity) tree after applying this batch's leaf updates.
    pub(crate) grafted: Arc<merkle::batch::MerkleizedBatch<F, D>>,

    // Activity bitmap after applying this batch's overlay.
    pub(crate) bitmap: BitmapBatch<N>,

    // Root committing to both the operations tree and the activity state.
    pub(crate) canonical_root: D,
}
295
296impl<F, H, U, const N: usize> UnmerkleizedBatch<F, H, U, N>
297where
298 F: Graftable,
299 U: update::Update + Send + Sync,
300 H: Hasher,
301 Operation<F, U>: Codec,
302{
303 pub(super) const fn new(
304 inner: any::batch::UnmerkleizedBatch<F, H, U>,
305 grafted_parent: Arc<merkle::batch::MerkleizedBatch<F, H::Digest>>,
306 bitmap_parent: BitmapBatch<N>,
307 ) -> Self {
308 Self {
309 inner,
310 grafted_parent,
311 bitmap_parent,
312 }
313 }
314
315 pub fn write(mut self, key: U::Key, value: Option<U::Value>) -> Self {
320 self.inner = self.inner.write(key, value);
321 self
322 }
323}
324
impl<F, K, V, H, const N: usize> UnmerkleizedBatch<F, H, update::Unordered<K, V>, N>
where
    F: Graftable,
    K: Key,
    V: ValueEncoding,
    H: Hasher,
    Operation<F, update::Unordered<K, V>>: Codec,
{
    /// Returns the value for `key`, delegating the lookup to the inner
    /// operations batch against `db`'s any layer.
    pub async fn get<E, C, I>(
        &self,
        key: &K,
        db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N>,
    ) -> Result<Option<V::Value>, Error<F>>
    where
        E: Context,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
        I: UnorderedIndex<Value = Location<F>> + 'static,
    {
        self.inner.get(key, &db.any).await
    }

    /// Merkleizes the batch: the inner batch is merkleized with a
    /// bitmap-driven floor scan, then the Current layer (grafted tree,
    /// bitmap overlay, canonical root) is computed on top of the result.
    pub async fn merkleize<E, C, I>(
        self,
        db: &super::db::Db<F, E, C, I, H, update::Unordered<K, V>, N>,
        metadata: Option<V::Value>,
    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N>>, Error<F>>
    where
        E: Context,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>,
        I: UnorderedIndex<Value = Location<F>> + 'static,
    {
        let Self {
            inner,
            grafted_parent,
            bitmap_parent,
        } = self;
        // Use the parent bitmap to skip inactive locations during the scan.
        let scan = BitmapScan::new(&bitmap_parent);
        let inner = inner
            .merkleize_with_floor_scan(&db.any, metadata, scan)
            .await?;
        compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
    }
}
371
impl<F, K, V, H, const N: usize> UnmerkleizedBatch<F, H, update::Ordered<K, V>, N>
where
    F: Graftable,
    K: Key,
    V: ValueEncoding,
    H: Hasher,
    Operation<F, update::Ordered<K, V>>: Codec,
{
    /// Returns the value for `key`, delegating the lookup to the inner
    /// operations batch against `db`'s any layer.
    pub async fn get<E, C, I>(
        &self,
        key: &K,
        db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N>,
    ) -> Result<Option<V::Value>, Error<F>>
    where
        E: Context,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
        I: crate::index::Ordered<Value = Location<F>> + 'static,
    {
        self.inner.get(key, &db.any).await
    }

    /// Merkleizes the batch; identical flow to the unordered variant but over
    /// an ordered index.
    pub async fn merkleize<E, C, I>(
        self,
        db: &super::db::Db<F, E, C, I, H, update::Ordered<K, V>, N>,
        metadata: Option<V::Value>,
    ) -> Result<Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N>>, Error<F>>
    where
        E: Context,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>,
        I: crate::index::Ordered<Value = Location<F>> + 'static,
    {
        let Self {
            inner,
            grafted_parent,
            bitmap_parent,
        } = self;
        // Use the parent bitmap to skip inactive locations during the scan.
        let scan = BitmapScan::new(&bitmap_parent);
        let inner = inner
            .merkleize_with_floor_scan(&db.any, metadata, scan)
            .await?;
        compute_current_layer(inner, db, &grafted_parent, &bitmap_parent).await
    }
}
418
/// Builds the [ChunkOverlay] of activity-bitmap chunks dirtied by a batch of
/// `batch_len` operations starting at location `batch_base`.
#[allow(clippy::type_complexity)]
fn build_chunk_overlay<F: Graftable, U, B: BitmapReadable<N>, const N: usize>(
    base: &B,
    batch_len: usize,
    batch_base: u64,
    diff: &BTreeMap<U::Key, DiffEntry<F, U::Value>>,
    ancestor_diffs: &[Arc<BTreeMap<U::Key, DiffEntry<F, U::Value>>>],
) -> ChunkOverlay<N>
where
    U: update::Update,
{
    // The overlaid bitmap spans the base bits plus one bit per batch op.
    let total_bits = base.len() + batch_len as u64;
    let mut overlay = ChunkOverlay::new(total_bits);

    // The batch's last operation is its commit; mark it active.
    // NOTE(review): assumes batch_len > 0 — underflows otherwise; confirm
    // callers never merkleize an empty batch.
    let commit_loc = batch_base + batch_len as u64 - 1;
    overlay.set_bit(base, commit_loc);

    // The previous commit (the op immediately before the batch) is superseded.
    // NOTE(review): assumes batch_base > 0, i.e. some op precedes the batch.
    overlay.clear_bit(base, batch_base - 1);

    for (key, entry) in diff {
        // Mark the key's new location active if it falls within this batch.
        if let Some(loc) = entry.loc() {
            if *loc >= batch_base && *loc < batch_base + batch_len as u64 {
                overlay.set_bit(base, *loc);
            }
        }

        // Determine which prior location this entry supersedes: the nearest
        // ancestor diff that touched the key wins (its location, possibly
        // None); otherwise fall back to the entry's base old location.
        let mut prev_loc = entry.base_old_loc();
        for ancestor_diff in ancestor_diffs {
            if let Some(ancestor_entry) = ancestor_diff.get(key) {
                prev_loc = ancestor_entry.loc();
                break;
            }
        }
        if let Some(old) = prev_loc {
            overlay.clear_bit(base, *old);
        }
    }

    // Materialize every chunk completed by this batch — even ones the diff
    // never touched — so downstream grafted-leaf computation sees them all.
    let parent_complete = base.complete_chunks();
    let new_complete = overlay.complete_chunks();
    for idx in parent_complete..new_complete {
        overlay.chunk_mut(base, idx);
    }

    overlay
}
483
/// Computes the "current" layer for a freshly-merkleized inner batch: the
/// dirty bitmap overlay, the updated grafted tree, and the canonical db root.
async fn compute_current_layer<F, E, U, C, I, H, const N: usize>(
    inner: Arc<any::batch::MerkleizedBatch<F, H::Digest, U>>,
    current_db: &super::db::Db<F, E, C, I, H, U, N>,
    grafted_parent: &Arc<merkle::batch::MerkleizedBatch<F, H::Digest>>,
    bitmap_parent: &BitmapBatch<N>,
) -> Result<Arc<MerkleizedBatch<F, H::Digest, U, N>>, Error<F>>
where
    F: Graftable,
    E: Context,
    U: update::Update + Send + Sync,
    C: Contiguous<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    // Location range occupied by this batch's operations.
    let batch_len = inner.journal_batch.items().len();
    let batch_base = *inner.new_last_commit_loc + 1 - batch_len as u64;

    // Bitmap chunks dirtied by the batch's activity changes.
    let overlay = build_chunk_overlay::<F, U, _, N>(
        bitmap_parent,
        batch_len,
        batch_base,
        &inner.diff,
        &inner.ancestor_diffs,
    );

    // Recompute grafted leaves only for dirty *complete* chunks; a trailing
    // partial chunk is folded into the root separately below.
    let new_grafted_leaves = overlay.complete_chunks();
    let chunks_to_update = overlay
        .chunks
        .iter()
        .filter(|(&idx, _)| idx < new_grafted_leaves)
        .map(|(&idx, &chunk)| (idx, chunk));
    // Resolve ops-tree nodes through the batch first, then the persisted log.
    let ops_tree_adapter =
        BatchStorageAdapter::new(&inner.journal_batch, &current_db.any.log.merkle);
    let hasher = StandardHasher::<H>::new();
    let new_leaves = compute_grafted_leaves::<F, H, N>(
        &hasher,
        &ops_tree_adapter,
        chunks_to_update,
        current_db.thread_pool.as_ref(),
    )
    .await?;

    // Apply the recomputed leaves to a child batch of the grafted parent:
    // existing leaves are updated in place, new leaves appended in order.
    let grafting_height = grafting::height::<N>();
    let grafted_batch = {
        let mut grafted_batch = grafted_parent
            .new_batch()
            .with_pool(current_db.thread_pool.clone());
        let old_grafted_leaves = *grafted_parent.leaves() as usize;
        for &(chunk_idx, digest) in &new_leaves {
            if chunk_idx < old_grafted_leaves {
                grafted_batch = grafted_batch
                    .update_leaf_digest(Location::<F>::new(chunk_idx as u64), digest)
                    .expect("update_leaf_digest failed");
            } else {
                grafted_batch = grafted_batch.add_leaf_digest(digest);
            }
        }
        let gh = grafting::GraftedHasher::<F, _>::new(hasher.clone(), grafting_height);
        grafted_batch.merkleize(&current_db.grafted_tree, &gh)
    };

    // Layer the overlay on the parent bitmap without copying it.
    let bitmap_batch = BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
        parent: bitmap_parent.clone(),
        overlay: Arc::new(overlay),
    }));

    let ops_root = inner.root();
    // Resolve grafted-tree nodes through the new batch, falling back to the
    // db's in-memory grafted tree for untouched nodes.
    let layered = BatchOverMem {
        batch: &grafted_batch,
        mem: &current_db.grafted_tree,
    };
    let grafted_storage = grafting::Storage::new(&layered, grafting_height, &ops_tree_adapter);
    // Trailing partial chunk (and its populated bit count), if any.
    let partial = {
        let rem = bitmap_batch.len() % BitmapBatch::<N>::CHUNK_SIZE_BITS;
        if rem == 0 {
            None
        } else {
            let idx = new_grafted_leaves;
            let chunk = bitmap_batch.get_chunk(idx);
            Some((chunk, rem))
        }
    };
    let canonical_root = compute_db_root::<F, H, _, _, _, N>(
        &hasher,
        &bitmap_batch,
        &grafted_storage,
        partial,
        &ops_root,
    )
    .await?;

    Ok(Arc::new(MerkleizedBatch {
        inner,
        grafted: grafted_batch,
        bitmap: bitmap_batch,
        canonical_root,
    }))
}
598
/// Copy-on-write view of the activity bitmap used by batches.
#[derive(Clone, Debug)]
pub(crate) enum BitmapBatch<const N: usize> {
    // A concrete bitmap with no pending overlays.
    Base(Arc<BitMap<N>>),
    // A chunk overlay stacked on top of a parent batch.
    Layer(Arc<BitmapBatchLayer<N>>),
}
609
/// One overlay layer in a [BitmapBatch] chain.
#[derive(Debug)]
pub(crate) struct BitmapBatchLayer<const N: usize> {
    // State this layer was derived from (another layer or the base bitmap).
    pub(crate) parent: BitmapBatch<N>,
    // Chunks this layer dirtied, plus the new total bit length.
    pub(crate) overlay: Arc<ChunkOverlay<N>>,
}
617
impl<const N: usize> BitmapBatch<N> {
    // Number of bits per chunk, mirrored from the underlying [BitMap].
    const CHUNK_SIZE_BITS: u64 = BitMap::<N>::CHUNK_SIZE_BITS;
}
621
622impl<const N: usize> BitmapReadable<N> for BitmapBatch<N> {
623 fn complete_chunks(&self) -> usize {
624 (self.len() / Self::CHUNK_SIZE_BITS) as usize
625 }
626
627 fn get_chunk(&self, idx: usize) -> [u8; N] {
628 match self {
629 Self::Base(bm) => *bm.get_chunk(idx),
630 Self::Layer(layer) => {
631 if let Some(&chunk) = layer.overlay.get(idx) {
633 chunk
634 } else {
635 layer.parent.get_chunk(idx)
636 }
637 }
638 }
639 }
640
641 fn last_chunk(&self) -> ([u8; N], u64) {
642 let total = self.len();
643 if total == 0 {
644 return ([0u8; N], 0);
645 }
646 let rem = total % Self::CHUNK_SIZE_BITS;
647 let bits_in_last = if rem == 0 { Self::CHUNK_SIZE_BITS } else { rem };
648 let idx = if rem == 0 {
649 self.complete_chunks().saturating_sub(1)
650 } else {
651 self.complete_chunks()
652 };
653 (self.get_chunk(idx), bits_in_last)
654 }
655
656 fn pruned_chunks(&self) -> usize {
657 match self {
658 Self::Base(bm) => bm.pruned_chunks(),
659 Self::Layer(layer) => layer.parent.pruned_chunks(),
660 }
661 }
662
663 fn len(&self) -> u64 {
664 match self {
665 Self::Base(bm) => BitmapReadable::<N>::len(bm.as_ref()),
666 Self::Layer(layer) => layer.overlay.len,
667 }
668 }
669}
670
impl<const N: usize> BitmapBatch<N> {
    /// Applies `overlay` to this batch: mutates the base bitmap in place when
    /// it is uniquely owned, otherwise stacks the overlay as a new layer.
    pub(super) fn apply_overlay(&mut self, overlay: Arc<ChunkOverlay<N>>) {
        if let Self::Base(base) = self {
            // Fast path: sole owner of the base — write the chunks directly.
            if let Some(bitmap) = Arc::get_mut(base) {
                bitmap.extend_to(overlay.len);
                for (&idx, chunk_bytes) in &overlay.chunks {
                    // Chunks already pruned from the base are dropped.
                    if idx >= bitmap.pruned_chunks() {
                        bitmap.set_chunk_by_index(idx, chunk_bytes);
                    }
                }
                return;
            }
        }

        // Shared base (or already layered): push the overlay as a new layer.
        let parent = self.clone();
        *self = Self::Layer(Arc::new(BitmapBatchLayer { parent, overlay }));
    }

    /// Collapses any stacked layers into a single base bitmap.
    pub(super) fn flatten(&mut self) {
        if matches!(self, Self::Base(_)) {
            return;
        }

        // Take ownership of the layer chain, leaving a placeholder behind.
        let mut owned = std::mem::replace(self, Self::Base(Arc::new(BitMap::default())));

        // Walk down to the base, collecting overlays newest-first.
        let mut overlays = Vec::new();
        let base = loop {
            match owned {
                Self::Base(bm) => break bm,
                Self::Layer(layer) => match Arc::try_unwrap(layer) {
                    Ok(inner) => {
                        overlays.push(inner.overlay);
                        owned = inner.parent;
                    }
                    Err(arc) => {
                        // Layer is shared elsewhere; clone its pieces instead.
                        overlays.push(arc.overlay.clone());
                        owned = arc.parent.clone();
                    }
                },
            }
        };

        // Replay overlays oldest-first onto an owned copy of the base.
        let mut bitmap = Arc::try_unwrap(base).unwrap_or_else(|arc| (*arc).clone());
        for overlay in overlays.into_iter().rev() {
            bitmap.extend_to(overlay.len);
            for (&idx, chunk_bytes) in &overlay.chunks {
                // Skip chunks pruned out of the base bitmap.
                if idx >= bitmap.pruned_chunks() {
                    bitmap.set_chunk_by_index(idx, chunk_bytes);
                }
            }
        }
        *self = Self::Base(Arc::new(bitmap));
    }
}
741
impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize>
    MerkleizedBatch<F, D, U, N>
where
    Operation<F, U>: Send + Sync,
{
    /// Returns the canonical root (commits to both operations and activity).
    pub const fn root(&self) -> D {
        self.canonical_root
    }

    /// Returns the root of the operations tree alone.
    pub fn ops_root(&self) -> D {
        self.inner.root()
    }
}
757
impl<F: Graftable, D: Digest, U: update::Update + Send + Sync, const N: usize>
    MerkleizedBatch<F, D, U, N>
where
    Operation<F, U>: Codec,
{
    /// Starts a new unmerkleized batch whose parents are this batch's
    /// grafted tree and bitmap state.
    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, U, N>
    where
        H: Hasher<Digest = D>,
    {
        UnmerkleizedBatch::new(
            self.inner.new_batch::<H>(),
            Arc::clone(&self.grafted),
            self.bitmap.clone(),
        )
    }

    /// Returns the value for `key` as of this batch, delegating the lookup
    /// to the inner operations batch against `db`'s any layer.
    pub async fn get<E, C, I, H>(
        &self,
        key: &U::Key,
        db: &super::db::Db<F, E, C, I, H, U, N>,
    ) -> Result<Option<U::Value>, Error<F>>
    where
        E: Context,
        C: Contiguous<Item = Operation<F, U>>,
        I: UnorderedIndex<Value = Location<F>> + 'static,
        H: Hasher<Digest = D>,
    {
        self.inner.get(key, &db.any).await
    }
}
794
impl<F, E, C, I, H, U, const N: usize> super::db::Db<F, E, C, I, H, U, N>
where
    F: Graftable,
    E: Context,
    U: update::Update + Send + Sync,
    C: Contiguous<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Snapshots the database's current state as a [MerkleizedBatch] that new
    /// batches can be layered on top of.
    pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, U, N>> {
        let grafted = self.grafted_snapshot();
        Arc::new(MerkleizedBatch {
            inner: self.any.to_batch(),
            grafted,
            bitmap: self.status.clone(),
            canonical_root: self.root,
        })
    }
}
816
/// Implementations of the shared batch/db traits so Current databases can be
/// exercised by the generic test suite. All methods forward to the inherent
/// implementations above.
#[cfg(any(test, feature = "test-traits"))]
mod trait_impls {
    use super::*;
    use crate::{
        journal::contiguous::Mutable,
        qmdb::any::traits::{
            BatchableDb, MerkleizedBatch as MerkleizedBatchTrait,
            UnmerkleizedBatch as UnmerkleizedBatchTrait,
        },
        Persistable,
    };
    use std::future::Future;

    // Shorthand for the Current db type these impls target.
    type CurrentDb<F, E, C, I, H, U, const N: usize> =
        crate::qmdb::current::db::Db<F, E, C, I, H, U, N>;

    // Unordered-update batches.
    impl<F, K, V, H, E, C, I, const N: usize>
        UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N>>
        for UnmerkleizedBatch<F, H, update::Unordered<K, V>, N>
    where
        F: Graftable,
        K: Key,
        V: ValueEncoding + 'static,
        H: Hasher,
        E: Context,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: UnorderedIndex<Value = Location<F>> + 'static,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N>>;

        fn write(self, key: K, value: Option<V::Value>) -> Self {
            Self::write(self, key, value)
        }

        fn merkleize(
            self,
            db: &CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N>,
            metadata: Option<V::Value>,
        ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
            self.merkleize(db, metadata)
        }
    }

    // Ordered-update batches.
    impl<F, K, V, H, E, C, I, const N: usize>
        UnmerkleizedBatchTrait<CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N>>
        for UnmerkleizedBatch<F, H, update::Ordered<K, V>, N>
    where
        F: Graftable,
        K: Key,
        V: ValueEncoding + 'static,
        H: Hasher,
        E: Context,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: crate::index::Ordered<Value = Location<F>> + 'static,
        Operation<F, update::Ordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Metadata = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N>>;

        fn write(self, key: K, value: Option<V::Value>) -> Self {
            Self::write(self, key, value)
        }

        fn merkleize(
            self,
            db: &CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N>,
            metadata: Option<V::Value>,
        ) -> impl Future<Output = Result<Self::Merkleized, crate::qmdb::Error<F>>> {
            self.merkleize(db, metadata)
        }
    }

    // A merkleized batch exposes its canonical root through the trait.
    impl<F: Graftable, D: Digest, U: update::Update + Send + Sync + 'static, const N: usize>
        MerkleizedBatchTrait for Arc<MerkleizedBatch<F, D, U, N>>
    where
        Operation<F, U>: Codec,
    {
        type Digest = D;

        fn root(&self) -> D {
            MerkleizedBatch::root(self)
        }
    }

    // Unordered Current dbs are batchable.
    impl<F, E, K, V, C, I, H, const N: usize> BatchableDb
        for CurrentDb<F, E, C, I, H, update::Unordered<K, V>, N>
    where
        F: Graftable,
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<F, update::Unordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: UnorderedIndex<Value = Location<F>> + 'static,
        H: Hasher,
        Operation<F, update::Unordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Unordered<K, V>, N>>;
        type Batch = UnmerkleizedBatch<F, H, update::Unordered<K, V>, N>;

        fn new_batch(&self) -> Self::Batch {
            self.new_batch()
        }

        fn apply_batch(
            &mut self,
            batch: Self::Merkleized,
        ) -> impl Future<Output = Result<core::ops::Range<Location<F>>, crate::qmdb::Error<F>>>
        {
            self.apply_batch(batch)
        }
    }

    // Ordered Current dbs are batchable.
    impl<F, E, K, V, C, I, H, const N: usize> BatchableDb
        for CurrentDb<F, E, C, I, H, update::Ordered<K, V>, N>
    where
        F: Graftable,
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: Mutable<Item = Operation<F, update::Ordered<K, V>>>
            + Persistable<Error = crate::journal::Error>,
        I: crate::index::Ordered<Value = Location<F>> + 'static,
        H: Hasher,
        Operation<F, update::Ordered<K, V>>: Codec,
    {
        type Family = F;
        type K = K;
        type V = V::Value;
        type Merkleized = Arc<MerkleizedBatch<F, H::Digest, update::Ordered<K, V>, N>>;
        type Batch = UnmerkleizedBatch<F, H, update::Ordered<K, V>, N>;

        fn new_batch(&self) -> Self::Batch {
            self.new_batch()
        }

        fn apply_batch(
            &mut self,
            batch: Self::Merkleized,
        ) -> impl Future<Output = Result<core::ops::Range<Location<F>>, crate::qmdb::Error<F>>>
        {
            self.apply_batch(batch)
        }
    }
}
975
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mmr;
    use commonware_utils::bitmap::Prunable as BitMap;

    // N = 4 bytes per chunk => 32 bits per chunk.
    const N: usize = 4;
    type Bm = BitMap<N>;
    type Location = mmr::Location;

    /// Builds a bitmap from an explicit list of bit values.
    fn make_bitmap(bits: &[bool]) -> Bm {
        let mut bm = Bm::new();
        for &b in bits {
            bm.push(b);
        }
        bm
    }

    #[test]
    fn chunk_overlay_pushes() {
        use crate::qmdb::any::value::FixedEncoding;
        use commonware_utils::sequence::FixedBytes;

        type K = FixedBytes<4>;
        type V = FixedEncoding<u64>;
        type U = crate::qmdb::any::operation::update::Unordered<K, V>;

        let key1 = FixedBytes::from([1, 0, 0, 0]);
        let key2 = FixedBytes::from([2, 0, 0, 0]);

        // Base: 4 active bits; the batch appends 4 ops at locations 4..8.
        let base = make_bitmap(&[true; 4]);
        let mut diff = BTreeMap::new();
        diff.insert(
            key1,
            DiffEntry::Active {
                value: 100u64,
                // New location inside the batch range: should be set.
                loc: Location::new(4),
                base_old_loc: None,
            },
        );
        diff.insert(
            key2,
            DiffEntry::Active {
                value: 200u64,
                // Location outside the batch range: must be ignored.
                loc: Location::new(99),
                base_old_loc: None,
            },
        );

        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 4, 4, &diff, &[]);

        let c0 = overlay.get(0).expect("chunk 0 should be dirty");
        // key1's new location (bit 4) is active.
        assert_ne!(c0[0] & (1 << 4), 0);
        // Appended bits 5 and 6 default to inactive.
        assert_eq!(c0[0] & (1 << 5), 0);
        assert_eq!(c0[0] & (1 << 6), 0);
        // The batch's commit (bit 7 = 4 + 4 - 1) is active.
        assert_ne!(c0[0] & (1 << 7), 0);
        // The superseded previous commit (bit 3) was cleared.
        assert_eq!(c0[0] & (1 << 3), 0);
    }

    #[test]
    fn chunk_overlay_clears() {
        use crate::qmdb::any::value::FixedEncoding;
        use commonware_utils::sequence::FixedBytes;

        type K = FixedBytes<4>;
        type U = crate::qmdb::any::operation::update::Unordered<K, FixedEncoding<u64>>;

        let key1 = FixedBytes::from([1, 0, 0, 0]);
        let key2 = FixedBytes::from([2, 0, 0, 0]);
        let key3 = FixedBytes::from([3, 0, 0, 0]);

        // Base: 64 active bits (two complete 32-bit chunks).
        let base = make_bitmap(&[true; 64]);

        let mut diff: BTreeMap<K, DiffEntry<mmr::Family, u64>> = BTreeMap::new();

        // Update: new location in the batch, old location in the base.
        diff.insert(
            key1,
            DiffEntry::Active {
                value: 100,
                loc: Location::new(70),
                base_old_loc: Some(Location::new(5)),
            },
        );

        // Delete: only the old location gets cleared.
        diff.insert(
            key2,
            DiffEntry::Deleted {
                base_old_loc: Some(Location::new(10)),
            },
        );

        // Insert: no old location to clear.
        diff.insert(
            key3,
            DiffEntry::Active {
                value: 300,
                loc: Location::new(71),
                base_old_loc: None,
            },
        );

        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 8, 64, &diff, &[]);

        let c0 = overlay.get(0).expect("chunk 0 should be dirty");
        // key1's old location (bit 5) was cleared.
        assert_eq!(c0[0] & (1 << 5), 0);
        // key2's old location (bit 10) was cleared.
        assert_eq!(c0[1] & (1 << 2), 0);
        // Untouched neighboring bits remain active.
        assert_eq!(c0[0] & (1 << 4), 1 << 4);
        assert_eq!(c0[1] & (1 << 3), 1 << 3);
    }

    #[test]
    fn chunk_overlay_preserves_partial_parent_chunk() {
        use crate::qmdb::any::value::FixedEncoding;
        use commonware_utils::sequence::FixedBytes;

        type K = FixedBytes<4>;
        type U = crate::qmdb::any::operation::update::Unordered<K, FixedEncoding<u64>>;

        // 20 bits < one 32-bit chunk, so the base has only a partial chunk.
        let base = make_bitmap(&[true; 20]);
        assert_eq!(base.complete_chunks(), 0);

        let key1 = FixedBytes::from([1, 0, 0, 0]);
        let mut diff = BTreeMap::new();
        diff.insert(
            key1,
            DiffEntry::Active {
                value: 42u64,
                loc: Location::new(35),
                base_old_loc: None,
            },
        );

        let overlay = build_chunk_overlay::<mmr::Family, U, _, N>(&base, 20, 20, &diff, &[]);

        let c0 = overlay.get(0).expect("chunk 0 should be in overlay");
        // Bits 0..16 were carried over from the base's partial chunk.
        assert_eq!(c0[0], 0xFF);
        assert_eq!(c0[1], 0xFF);
        // Bits 16..19 carried over; bit 19 cleared as the superseded commit.
        assert_eq!(c0[2], 0x07);
    }

    use crate::qmdb::any::batch::{FloorScan, SequentialScan};

    #[test]
    fn sequential_scan_returns_floor_when_below_tip() {
        let mut scan = SequentialScan;
        assert_eq!(
            scan.next_candidate(Location::new(5), 10),
            Some(Location::new(5))
        );
    }

    #[test]
    fn sequential_scan_returns_none_at_tip() {
        let mut scan = SequentialScan;
        assert_eq!(scan.next_candidate(Location::new(10), 10), None);
        assert_eq!(scan.next_candidate(Location::new(11), 10), None);
    }

    #[test]
    fn bitmap_scan_all_active() {
        let bm = make_bitmap(&[true; 8]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);
        // Every location is active, so each is its own candidate.
        for i in 0..8 {
            assert_eq!(
                scan.next_candidate(Location::new(i), 8),
                Some(Location::new(i))
            );
        }
        assert_eq!(scan.next_candidate(Location::new(8), 8), None);
    }

    #[test]
    fn bitmap_scan_all_inactive() {
        let bm = make_bitmap(&[false; 8]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);
        assert_eq!(scan.next_candidate(Location::new(0), 8), None);
    }

    #[test]
    fn bitmap_scan_skips_inactive() {
        let bm = make_bitmap(&[false, false, true, false, true]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);

        assert_eq!(
            scan.next_candidate(Location::new(0), 5),
            Some(Location::new(2))
        );
        assert_eq!(
            scan.next_candidate(Location::new(3), 5),
            Some(Location::new(4))
        );
        assert_eq!(scan.next_candidate(Location::new(5), 5), None);
    }

    #[test]
    fn bitmap_scan_beyond_bitmap_len_returns_candidate() {
        // Locations past the bitmap's coverage are always candidates.
        let bm = make_bitmap(&[false; 4]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);

        assert_eq!(
            scan.next_candidate(Location::new(0), 8),
            Some(Location::new(4))
        );
        assert_eq!(
            scan.next_candidate(Location::new(6), 8),
            Some(Location::new(6))
        );
    }

    #[test]
    fn bitmap_scan_respects_tip() {
        let bm = make_bitmap(&[false, false, false, true]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);

        // The only active bit (3) is at/beyond tip 3, so no candidate.
        assert_eq!(scan.next_candidate(Location::new(0), 3), None);
        // With tip 4 it becomes reachable.
        assert_eq!(
            scan.next_candidate(Location::new(0), 4),
            Some(Location::new(3))
        );
    }

    #[test]
    fn bitmap_scan_floor_at_tip() {
        let bm = make_bitmap(&[true; 4]);
        let mut scan = BitmapScan::<Bm, N>::new(&bm);
        assert_eq!(scan.next_candidate(Location::new(4), 4), None);
    }

    #[test]
    fn bitmap_scan_empty_bitmap() {
        let bm = Bm::new();
        let mut scan = BitmapScan::<Bm, N>::new(&bm);

        // An empty bitmap has no coverage, so every location below tip is a
        // candidate.
        assert_eq!(
            scan.next_candidate(Location::new(0), 5),
            Some(Location::new(0))
        );
        assert_eq!(scan.next_candidate(Location::new(0), 0), None);
    }

    #[test]
    fn test_apply_overlay() {
        // Base: 8 active bits; overlay extends the bitmap to 12 bits.
        let base = make_bitmap(&[true; 8]);
        let mut batch = BitmapBatch::Base(Arc::new(base));

        let mut overlay = ChunkOverlay::new(12);
        let mut c0 = [0u8; N];
        // Clear bit 3, set bit 10.
        c0[0] = 0b1111_0111;
        c0[1] = 0b0000_0100;
        overlay.chunks.insert(0, c0);
        batch.apply_overlay(Arc::new(overlay));

        // Uniquely-owned base is mutated in place (stays a Base variant).
        assert!(matches!(batch, BitmapBatch::Base(_)));
        assert_eq!(batch.len(), 12);
        assert_eq!(batch.get_chunk(0)[0] & (1 << 3), 0);
        assert_ne!(batch.get_chunk(0)[1] & (1 << 2), 0);

        // A shared base cannot be mutated: the overlay must be layered.
        let BitmapBatch::Base(ref base_arc) = batch else {
            panic!("expected Base");
        };
        let _shared = Arc::clone(base_arc);
        let overlay2 = ChunkOverlay::new(12);
        batch.apply_overlay(Arc::new(overlay2));
        assert!(matches!(batch, BitmapBatch::Layer(_)));
    }
}
1282}