1use crate::{
49 merkle::{Family, Location, Proof},
50 qmdb::{
51 self,
52 any::{value::ValueEncoding, FixedValue, VariableValue},
53 immutable::{
54 fixed::{Db as ImmutableFixedDb, Operation as ImmutableFixedOp},
55 variable::{Db as ImmutableVariableDb, Operation as ImmutableVariableOp},
56 CompactDb as ImmutableCompactDb, Operation as ImmutableOp,
57 },
58 keyless::{
59 fixed::{Db as KeylessFixedDb, Operation as KeylessFixedOp},
60 variable::{Db as KeylessVariableDb, Operation as KeylessVariableOp},
61 CompactDb as KeylessCompactDb, Operation as KeylessOp,
62 },
63 operation::Key,
64 sync::{EngineError, Error},
65 verify_proof,
66 },
67 translator::Translator,
68};
69use commonware_codec::{
70 Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
71};
72use commonware_cryptography::{Digest, Hasher};
73use commonware_parallel::Strategy;
74use commonware_runtime::{Buf, BufMut, Clock, Metrics, Storage, Supervisor};
75use commonware_utils::{channel::oneshot, sync::AsyncRwLock, Array};
76use std::{future::Future, num::NonZeroU64, sync::Arc};
77
78#[derive(Debug)]
84pub struct Target<F: Family, D: Digest> {
85 pub root: D,
87 pub leaf_count: Location<F>,
89}
90
91impl<F: Family, D: Digest> Target<F, D> {
92 const INVALID_LEAF_COUNT: &'static str = "leaf_count must be in 1..=MAX_LEAVES";
93
94 pub const fn new(root: D, leaf_count: Location<F>) -> Self {
96 Self { root, leaf_count }
97 }
98
99 pub fn validate(&self) -> Result<(), &'static str> {
101 if !self.leaf_count.is_valid() || self.leaf_count == 0 {
102 return Err(Self::INVALID_LEAF_COUNT);
103 }
104 Ok(())
105 }
106}
107
108impl<F: Family, D: Digest> Clone for Target<F, D> {
109 fn clone(&self) -> Self {
110 Self {
111 root: self.root,
112 leaf_count: self.leaf_count,
113 }
114 }
115}
116
117impl<F: Family, D: Digest> PartialEq for Target<F, D> {
118 fn eq(&self, other: &Self) -> bool {
119 self.root == other.root && self.leaf_count == other.leaf_count
120 }
121}
122
123impl<F: Family, D: Digest> Eq for Target<F, D> {}
124
125impl<F: Family, D: Digest> Write for Target<F, D> {
126 fn write(&self, buf: &mut impl BufMut) {
127 self.root.write(buf);
128 self.leaf_count.write(buf);
129 }
130}
131
132impl<F: Family, D: Digest> EncodeSize for Target<F, D> {
133 fn encode_size(&self) -> usize {
134 self.root.encode_size() + self.leaf_count.encode_size()
135 }
136}
137
138impl<F: Family, D: Digest> Read for Target<F, D> {
139 type Cfg = ();
140
141 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
142 let root = D::read(buf)?;
143 let leaf_count = Location::<F>::read(buf)?;
144 let target = Self { root, leaf_count };
145 target.validate().map_err(|reason| {
146 CodecError::Invalid("storage::qmdb::sync::compact::Target", reason)
147 })?;
148 Ok(target)
149 }
150}
151
#[cfg(feature = "arbitrary")]
impl<F, D> arbitrary::Arbitrary<'_> for Target<F, D>
where
    F: Family,
    D: Digest + for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a target with an arbitrary root and a leaf count drawn from
    /// `1..=MAX_LEAVES` for the family.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // The root is drawn first, then the leaf count, so input bytes are
        // consumed in a fixed order.
        let root = u.arbitrary()?;
        let leaves = u.int_in_range(1..=*F::MAX_LEAVES)?;
        Ok(Self::new(root, Location::new(leaves)))
    }
}
163
164#[derive(Clone, Debug)]
166pub struct State<F: Family, Op, D: Digest> {
167 pub leaf_count: Location<F>,
169 pub pinned_nodes: Vec<D>,
171 pub last_commit_op: Op,
173 pub last_commit_proof: Proof<F, D>,
175}
176
177#[derive(Clone, Debug)]
183pub struct ValidatedState<F: Family, Op, D: Digest> {
184 pub state: State<F, Op, D>,
186 pub root: D,
188 pub inactivity_floor: Location<F>,
190}
191
192impl<F: Family, Op, D: Digest> ValidatedState<F, Op, D> {
193 const fn new(state: State<F, Op, D>, root: D, inactivity_floor: Location<F>) -> Self {
194 Self {
195 state,
196 root,
197 inactivity_floor,
198 }
199 }
200}
201
202impl<F: Family, Op, D: Digest> Write for State<F, Op, D>
203where
204 Op: Write,
205{
206 fn write(&self, buf: &mut impl BufMut) {
207 self.leaf_count.write(buf);
208 self.pinned_nodes.write(buf);
209 self.last_commit_op.write(buf);
210 self.last_commit_proof.write(buf);
211 }
212}
213
214pub struct FetchResult<F: Family, Op, D: Digest> {
216 pub state: State<F, Op, D>,
218 pub callback: Option<oneshot::Sender<bool>>,
220}
221
222impl<F: Family, Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<F, Op, D> {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 f.debug_struct("FetchResult")
225 .field("state", &self.state)
226 .field("callback", &self.callback.as_ref().map(|_| "<callback>"))
227 .finish()
228 }
229}
230
231impl<F: Family, Op, D: Digest> From<State<F, Op, D>> for FetchResult<F, Op, D> {
232 fn from(state: State<F, Op, D>) -> Self {
233 Self {
234 state,
235 callback: None,
236 }
237 }
238}
239
240impl<F: Family, Op, D: Digest> EncodeSize for State<F, Op, D>
241where
242 Op: EncodeSize,
243{
244 fn encode_size(&self) -> usize {
245 self.leaf_count.encode_size()
246 + self.pinned_nodes.encode_size()
247 + self.last_commit_op.encode_size()
248 + self.last_commit_proof.encode_size()
249 }
250}
251
252impl<F: Family, Op, D: Digest> Read for State<F, Op, D>
253where
254 Op: Read,
255{
256 type Cfg = (RangeCfg<usize>, Op::Cfg, usize);
257
258 fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
259 let (pinned_nodes_cfg, op_cfg, max_proof_digests) = cfg;
260 Ok(Self {
261 leaf_count: Location::<F>::read(buf)?,
262 pinned_nodes: Vec::<D>::read_cfg(buf, &(*pinned_nodes_cfg, ()))?,
263 last_commit_op: Op::read_cfg(buf, op_cfg)?,
264 last_commit_proof: Proof::<F, D>::read_cfg(buf, max_proof_digests)?,
265 })
266 }
267}
268
269#[derive(Debug, thiserror::Error)]
271pub enum ServeError<F: Family, D: Digest> {
272 #[error("compact source database error: {0}")]
274 Database(#[from] qmdb::Error<F>),
275 #[error("invalid compact target: {0}")]
277 InvalidTarget(&'static str),
278 #[error("compact source missing")]
280 MissingSource,
281 #[error("stale compact target - requested {requested:?}, current {current:?}")]
283 StaleTarget {
284 requested: Target<F, D>,
285 current: Target<F, D>,
286 },
287}
288
289#[allow(clippy::type_complexity)]
291pub trait Resolver: Send + Sync + Clone + 'static {
292 type Family: Family;
294
295 type Digest: Digest;
297
298 type Op;
300
301 type Error: std::error::Error + Send + 'static;
303
304 fn get_compact_state<'a>(
306 &'a self,
307 target: Target<Self::Family, Self::Digest>,
308 ) -> impl Future<Output = Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error>>
309 + Send
310 + 'a;
311}
312
313pub trait CompactDbResolver<DB: Database>:
319 Resolver<Family = DB::Family, Op = DB::Op, Digest = DB::Digest>
320{
321}
322
323impl<DB, R> CompactDbResolver<DB> for R
324where
325 DB: Database,
326 R: Resolver<Family = DB::Family, Op = DB::Op, Digest = DB::Digest>,
327{
328}
329
330pub trait Database: Sized + Send {
332 type Family: Family;
333 type Op: Encode + Send;
334 type Config: Clone;
335 type Digest: Digest;
336 type Context: Storage + Clock + Metrics;
337 type Hasher: Hasher<Digest = Self::Digest>;
338
339 fn from_validated_state(
346 context: Self::Context,
347 config: Self::Config,
348 state: ValidatedState<Self::Family, Self::Op, Self::Digest>,
349 ) -> impl Future<Output = Result<Self, qmdb::Error<Self::Family>>> + Send;
350
351 fn inactivity_floor(op: &Self::Op) -> Option<Location<Self::Family>>;
353
354 fn root(&self) -> Self::Digest;
356
357 fn persist_compact_state(
359 &self,
360 ) -> impl Future<Output = Result<(), qmdb::Error<Self::Family>>> + Send;
361}
362
363pub struct Config<DB, R>
365where
366 DB: Database,
367 R: CompactDbResolver<DB>,
368{
369 pub context: DB::Context,
371 pub resolver: R,
373 pub target: Target<DB::Family, DB::Digest>,
375 pub db_config: DB::Config,
377}
378
379pub async fn sync<DB, R>(
394 config: Config<DB, R>,
395) -> Result<DB, Error<DB::Family, R::Error, DB::Digest>>
396where
397 DB: Database,
398 R: CompactDbResolver<DB>,
399{
400 let target = config.target;
401 target
402 .validate()
403 .map_err(|reason| Error::Engine(EngineError::InvalidCompactTarget(reason)))?;
404
405 loop {
408 let FetchResult { state, callback } = config
409 .resolver
410 .get_compact_state(target.clone())
411 .await
412 .map_err(Error::Resolver)?;
413
414 let validated_state = match validate_compact_state::<DB>(&target, state) {
417 Ok(state) => state,
418 Err(err) => {
419 if let Some(callback) = callback {
420 let _ = callback.send(false);
421 }
422 tracing::debug!(error = ?err, "compact state failed validation, will retry");
423 continue;
424 }
425 };
426
427 let db = DB::from_validated_state(
431 config.context.child("compact"),
432 config.db_config.clone(),
433 validated_state,
434 )
435 .await
436 .map_err(Error::Database)?;
437 assert_eq!(
438 db.root(),
439 target.root,
440 "validated compact state reconstructed unexpected root",
441 );
442
443 if let Some(callback) = callback {
444 let _ = callback.send(true);
445 }
446 db.persist_compact_state().await?;
447 return Ok(db);
448 }
449}
450
451fn validate_compact_state<DB>(
453 target: &Target<DB::Family, DB::Digest>,
454 state: State<DB::Family, DB::Op, DB::Digest>,
455) -> CompactFrontierValidation<DB>
456where
457 DB: Database,
458{
459 if state.leaf_count != target.leaf_count {
460 return Err(EngineError::UnexpectedLeafCount {
461 expected: target.leaf_count,
462 actual: state.leaf_count,
463 });
464 }
465
466 let hasher = qmdb::hasher::<DB::Hasher>();
467 let last_commit_loc = Location::new(*state.leaf_count - 1);
468 if !verify_proof(
469 &hasher,
470 &state.last_commit_proof,
471 last_commit_loc,
472 std::slice::from_ref(&state.last_commit_op),
473 &target.root,
474 ) {
475 return Err(EngineError::InvalidProof);
476 }
477
478 validate_compact_frontier::<DB>(target, state)
479}
480
481type CompactFrontierValidation<DB> = Result<
483 ValidatedState<<DB as Database>::Family, <DB as Database>::Op, <DB as Database>::Digest>,
484 EngineError<<DB as Database>::Family, <DB as Database>::Digest>,
485>;
486
487fn validate_compact_frontier<DB>(
489 target: &Target<DB::Family, DB::Digest>,
490 state: State<DB::Family, DB::Op, DB::Digest>,
491) -> CompactFrontierValidation<DB>
492where
493 DB: Database,
494{
495 let last_commit_loc = Location::new(*state.leaf_count - 1);
498 let Some(inactivity_floor_loc) = DB::inactivity_floor(&state.last_commit_op) else {
499 return Err(EngineError::InvalidProof);
500 };
501 if inactivity_floor_loc > last_commit_loc {
502 return Err(EngineError::InvalidProof);
503 }
504
505 let mem = crate::merkle::mem::Mem::<DB::Family, DB::Digest>::init(crate::merkle::mem::Config {
508 nodes: Vec::new(),
509 pruning_boundary: state.leaf_count,
510 pinned_nodes: state.pinned_nodes.clone(),
511 })
512 .map_err(|_| EngineError::InvalidProof)?;
513 let hasher = qmdb::hasher::<DB::Hasher>();
514 let inactive_peaks = DB::Family::inactive_peaks(
515 DB::Family::location_to_position(state.leaf_count),
516 inactivity_floor_loc,
517 );
518 let actual = mem
519 .root(&hasher, inactive_peaks)
520 .map_err(|_| EngineError::InvalidProof)?;
521 if actual != target.root {
522 return Err(EngineError::RootMismatch {
523 expected: target.root,
524 actual,
525 });
526 }
527
528 Ok(ValidatedState::new(
529 state,
530 target.root,
531 inactivity_floor_loc,
532 ))
533}
534
535async fn fetch_state_from_full_source<F, Op, D, Current, CurrentFut, Hist, HistFut, Pins, PinsFut>(
536 target: Target<F, D>,
537 current_target: Current,
538 historical_proof: Hist,
539 pinned_nodes_at: Pins,
540) -> Result<State<F, Op, D>, ServeError<F, D>>
541where
542 F: Family,
543 D: Digest,
544 Current: FnOnce() -> CurrentFut,
545 CurrentFut: Future<Output = Target<F, D>>,
546 Hist: FnOnce(Location<F>, Location<F>) -> HistFut,
547 HistFut: Future<Output = Result<(Proof<F, D>, Vec<Op>), qmdb::Error<F>>>,
548 Pins: FnOnce(Location<F>) -> PinsFut,
549 PinsFut: Future<Output = Result<Vec<D>, qmdb::Error<F>>>,
550{
551 target.validate().map_err(ServeError::InvalidTarget)?;
554 let current = current_target().await;
555 if target.root != current.root || target.leaf_count != current.leaf_count {
556 return Err(ServeError::StaleTarget {
557 requested: target,
558 current,
559 });
560 }
561 let leaf_count = target.leaf_count;
562 let last_commit_loc = Location::new(*leaf_count - 1);
563 let (last_commit_proof, mut operations) = historical_proof(leaf_count, last_commit_loc)
564 .await
565 .map_err(ServeError::Database)?;
566 let last_commit_op =
568 operations
569 .pop()
570 .ok_or(ServeError::Database(qmdb::Error::DataCorrupted(
571 "missing last commit operation",
572 )))?;
573 let pinned_nodes = pinned_nodes_at(leaf_count)
574 .await
575 .map_err(ServeError::Database)?;
576 Ok(State {
577 leaf_count,
578 pinned_nodes,
579 last_commit_op,
580 last_commit_proof,
581 })
582}
583
/// Implements [`Resolver`] for shared handles to a full keyless database.
///
/// For the database type `$db` with operation type `$op` and value bound
/// `$val_bound`, this generates impls for `Arc<DB>`, `Arc<AsyncRwLock<DB>>`,
/// and `Arc<AsyncRwLock<Option<DB>>>`. Each impl builds the compact state via
/// `fetch_state_from_full_source`, requesting a proof for a single operation.
/// The `Option` variant fails with `ServeError::MissingSource` when the
/// database is absent.
macro_rules! impl_compact_resolver_keyless {
    ($db:ident, $op:ident, $val_bound:ident) => {
        impl<F, E, V, H, S> Resolver for Arc<$db<F, E, V, H, S>>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(self.root(), self.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        // `NonZeroU64::MIN` is 1: only the last commit operation is needed.
                        self.historical_proof(leaf_count, last_commit_loc, NonZeroU64::MIN)
                    },
                    |leaf_count| self.pinned_nodes_at(leaf_count),
                )
                .await?;
                Ok(FetchResult::from(state))
            }
        }

        impl<F, E, V, H, S> Resolver for Arc<AsyncRwLock<$db<F, E, V, H, S>>>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                // The read guard is held for the whole fetch.
                let source = self.read().await;
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(source.root(), source.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        source.historical_proof(leaf_count, last_commit_loc, NonZeroU64::MIN)
                    },
                    |leaf_count| source.pinned_nodes_at(leaf_count),
                )
                .await?;
                Ok(FetchResult::from(state))
            }
        }

        impl<F, E, V, H, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, V, H, S>>>>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let Some(source) = guard.as_ref() else {
                    return Err(ServeError::MissingSource);
                };
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(source.root(), source.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        source.historical_proof(leaf_count, last_commit_loc, NonZeroU64::MIN)
                    },
                    |leaf_count| source.pinned_nodes_at(leaf_count),
                )
                .await?;
                Ok(FetchResult::from(state))
            }
        }
    };
}
694
/// Implements [`Resolver`] for shared handles to a full immutable database.
///
/// For the database type `$db` with operation type `$op`, value bound
/// `$val_bound`, and key bound `$key_bound`, this generates impls for
/// `Arc<DB>`, `Arc<AsyncRwLock<DB>>`, and `Arc<AsyncRwLock<Option<DB>>>`.
/// Each impl builds the compact state via `fetch_state_from_full_source`,
/// requesting a proof for a single operation. The `Option` variant fails with
/// `ServeError::MissingSource` when the database is absent.
macro_rules! impl_compact_resolver_immutable {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path) => {
        impl<F, E, K, V, H, T, S> Resolver for Arc<$db<F, E, K, V, H, T, S>>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(self.root(), self.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        // `NonZeroU64::MIN` is 1: only the last commit operation is needed.
                        self.historical_proof(leaf_count, last_commit_loc, NonZeroU64::MIN)
                    },
                    |leaf_count| self.pinned_nodes_at(leaf_count),
                )
                .await?;
                Ok(FetchResult::from(state))
            }
        }

        impl<F, E, K, V, H, T, S> Resolver for Arc<AsyncRwLock<$db<F, E, K, V, H, T, S>>>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                // The read guard is held for the whole fetch.
                let source = self.read().await;
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(source.root(), source.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        source.historical_proof(leaf_count, last_commit_loc, NonZeroU64::MIN)
                    },
                    |leaf_count| source.pinned_nodes_at(leaf_count),
                )
                .await?;
                Ok(FetchResult::from(state))
            }
        }

        impl<F, E, K, V, H, T, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, K, V, H, T, S>>>>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let Some(source) = guard.as_ref() else {
                    return Err(ServeError::MissingSource);
                };
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(source.root(), source.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        source.historical_proof(leaf_count, last_commit_loc, NonZeroU64::MIN)
                    },
                    |leaf_count| source.pinned_nodes_at(leaf_count),
                )
                .await?;
                Ok(FetchResult::from(state))
            }
        }
    };
}
814
/// Implements [`Resolver`] for shared handles to a compact keyless database.
///
/// For the database type `$db` with operation type `$op`, this generates
/// impls for `Arc<DB>`, `Arc<AsyncRwLock<DB>>`, and
/// `Arc<AsyncRwLock<Option<DB>>>`. Each impl delegates to the database's own
/// `compact_state` method and wraps the result without a callback. The
/// `Option` variant fails with `ServeError::MissingSource` when the database
/// is absent.
macro_rules! impl_compact_resolver_compact_keyless {
    ($db:ident, $op:ident) => {
        impl<F, E, V, H, C, S> Resolver for Arc<$db<F, E, V, H, C, S>>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                self.compact_state(target).map(FetchResult::from)
            }
        }

        impl<F, E, V, H, C, S> Resolver for Arc<AsyncRwLock<$db<F, E, V, H, C, S>>>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let source = self.read().await;
                source.compact_state(target).map(FetchResult::from)
            }
        }

        impl<F, E, V, H, C, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, V, H, C, S>>>>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let Some(source) = guard.as_ref() else {
                    return Err(ServeError::MissingSource);
                };
                source.compact_state(target).map(FetchResult::from)
            }
        }
    };
}
892
/// Implements [`Resolver`] for shared handles to a compact immutable database.
///
/// For the database type `$db` with operation type `$op`, this generates
/// impls for `Arc<DB>`, `Arc<AsyncRwLock<DB>>`, and
/// `Arc<AsyncRwLock<Option<DB>>>`. Each impl delegates to the database's own
/// `compact_state` method and wraps the result without a callback. The
/// `Option` variant fails with `ServeError::MissingSource` when the database
/// is absent.
macro_rules! impl_compact_resolver_compact_immutable {
    ($db:ident, $op:ident) => {
        impl<F, E, K, V, H, C, S> Resolver for Arc<$db<F, E, K, V, H, C, S>>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, K, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                self.compact_state(target).map(FetchResult::from)
            }
        }

        impl<F, E, K, V, H, C, S> Resolver for Arc<AsyncRwLock<$db<F, E, K, V, H, C, S>>>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, K, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let source = self.read().await;
                source.compact_state(target).map(FetchResult::from)
            }
        }

        impl<F, E, K, V, H, C, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, K, V, H, C, S>>>>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, K, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let Some(source) = guard.as_ref() else {
                    return Err(ServeError::MissingSource);
                };
                source.compact_state(target).map(FetchResult::from)
            }
        }
    };
}
973
974impl_compact_resolver_compact_keyless!(KeylessCompactDb, KeylessOp);
975impl_compact_resolver_compact_immutable!(ImmutableCompactDb, ImmutableOp);
976
977impl_compact_resolver_keyless!(KeylessFixedDb, KeylessFixedOp, FixedValue);
978impl_compact_resolver_keyless!(KeylessVariableDb, KeylessVariableOp, VariableValue);
979impl_compact_resolver_immutable!(ImmutableFixedDb, ImmutableFixedOp, FixedValue, Array);
980impl_compact_resolver_immutable!(ImmutableVariableDb, ImmutableVariableOp, VariableValue, Key);
981
#[cfg(test)]
mod tests {
    use super::{Config, Database, FetchResult, Resolver, State, Target, ValidatedState};
    use crate::{
        merkle::{mmr, Location},
        qmdb,
    };
    use commonware_codec::{DecodeExt as _, Encode as _, RangeCfg};
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_parallel::Rayon;
    use commonware_runtime::{deterministic, Runner as _};
    use commonware_utils::sync::{AsyncRwLock, Mutex};
    use std::{
        collections::VecDeque,
        convert::Infallible,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
    };

    /// Asserts at compile time that all three shared-handle shapes of a
    /// database type implement `Resolver`.
    macro_rules! assert_resolver_variants {
        ($db:ty) => {
            assert_resolver::<Arc<$db>>();
            assert_resolver::<Arc<AsyncRwLock<$db>>>();
            assert_resolver::<Arc<AsyncRwLock<Option<$db>>>>();
        };
    }

    /// Compiles only when `R` implements `Resolver`; does nothing at runtime.
    fn assert_resolver<R: Resolver>() {}

    /// Minimal database that reports a preset root and counts how often it is built.
    struct TestDb {
        root: Digest,
    }

    impl Database for TestDb {
        type Family = mmr::Family;
        type Op = u8;
        // (root to report, counter incremented on each construction)
        type Config = (Digest, Arc<AtomicUsize>);
        type Digest = Digest;
        type Context = deterministic::Context;
        type Hasher = Sha256;

        async fn from_validated_state(
            _context: Self::Context,
            (root, constructions): Self::Config,
            _state: ValidatedState<Self::Family, Self::Op, Self::Digest>,
        ) -> Result<Self, qmdb::Error<Self::Family>> {
            constructions.fetch_add(1, Ordering::SeqCst);
            Ok(Self { root })
        }

        fn inactivity_floor(_op: &Self::Op) -> Option<Location<Self::Family>> {
            Some(Location::new(0))
        }

        fn root(&self) -> Self::Digest {
            self.root
        }

        async fn persist_compact_state(&self) -> Result<(), qmdb::Error<Self::Family>> {
            Ok(())
        }
    }

    /// Resolver that replays a fixed queue of fetch results, ignoring the target.
    #[derive(Clone)]
    struct SequenceResolver {
        states: Arc<Mutex<VecDeque<FetchResult<mmr::Family, u8, Digest>>>>,
    }

    impl Resolver for SequenceResolver {
        type Family = mmr::Family;
        type Digest = Digest;
        type Op = u8;
        type Error = Infallible;

        async fn get_compact_state(
            &self,
            _target: Target<Self::Family, Self::Digest>,
        ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
            let next = self
                .states
                .lock()
                .pop_front()
                .expect("missing compact fetch result");
            Ok(next)
        }
    }

    /// Builds a two-leaf merkle structure and returns a state/target pair
    /// that passes compact validation.
    fn valid_state_and_target() -> (State<mmr::Family, u8, Digest>, Target<mmr::Family, Digest>) {
        let hasher = qmdb::hasher::<Sha256>();
        let mut merkle = crate::merkle::mem::Mem::<mmr::Family, Digest>::new();
        // `last_op` is added second, so it sits at location 1.
        let last_op = 0u8;
        let first_op = 1u8;
        let pending = merkle
            .new_batch()
            .add(&hasher, &first_op.encode())
            .add(&hasher, &last_op.encode());
        let pending = pending.merkleize(&merkle, &hasher);
        merkle.apply_batch(&pending).unwrap();

        let root = merkle.root(&hasher, 0).unwrap();
        let leaf_count = Location::new(2);
        let pinned_nodes: Vec<_> = merkle.nodes_to_pin(leaf_count).into_values().collect();
        let last_commit_proof = merkle.proof(&hasher, Location::new(1), 0).unwrap();

        let state = State {
            leaf_count,
            pinned_nodes,
            last_commit_op: last_op,
            last_commit_proof,
        };
        let target = Target::<mmr::Family, Digest> { root, leaf_count };
        (state, target)
    }

    #[test]
    fn test_all_compact_qmdb_variants_implement_strategy_resolvers() {
        type KeylessFixedCompactDb = crate::qmdb::keyless::fixed::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            Sha256,
            Rayon,
        >;
        type KeylessVariableCompactDb = crate::qmdb::keyless::variable::CompactDb<
            mmr::Family,
            deterministic::Context,
            Vec<u8>,
            Sha256,
            (RangeCfg<usize>, ()),
            Rayon,
        >;
        type ImmutableFixedCompactDb = crate::qmdb::immutable::fixed::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            Digest,
            Sha256,
            Rayon,
        >;
        type ImmutableVariableCompactDb = crate::qmdb::immutable::variable::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            Vec<u8>,
            Sha256,
            ((), (RangeCfg<usize>, ())),
            Rayon,
        >;

        assert_resolver_variants!(KeylessFixedCompactDb);
        assert_resolver_variants!(KeylessVariableCompactDb);
        assert_resolver_variants!(ImmutableFixedCompactDb);
        assert_resolver_variants!(ImmutableVariableCompactDb);
    }

    #[test]
    fn test_target_decode_rejects_zero_leaf_count() {
        // The struct literal bypasses validation so the invalid value can be encoded.
        let zero_leaf_target = Target::<mmr::Family, Digest> {
            root: Sha256::hash(b"unused"),
            leaf_count: Location::new(0),
        };
        let encoded = zero_leaf_target.encode();

        assert!(Target::<mmr::Family, Digest>::decode(encoded).is_err());
    }

    #[test]
    fn test_compact_sync_retries_invalid_state_without_feedback() {
        deterministic::Runner::default().start(|context| async move {
            let (good_state, target) = valid_state_and_target();
            // An extra pinned node makes the first response fail validation.
            let mut bad_state = good_state.clone();
            bad_state.pinned_nodes.push(Sha256::hash(b"extra pin"));
            let (good_tx, good_rx) = commonware_utils::channel::oneshot::channel();
            let constructions = Arc::new(AtomicUsize::new(0));

            let responses = VecDeque::from([
                FetchResult {
                    state: bad_state,
                    callback: None,
                },
                FetchResult {
                    state: good_state,
                    callback: Some(good_tx),
                },
            ]);
            let resolver = SequenceResolver {
                states: Arc::new(Mutex::new(responses)),
            };

            let db = super::sync::<TestDb, _>(Config {
                context,
                resolver,
                target: target.clone(),
                db_config: (target.root, constructions.clone()),
            })
            .await
            .unwrap();

            assert!(good_rx.await.expect("valid feedback should arrive"));
            assert_eq!(constructions.load(Ordering::SeqCst), 1);
            assert_eq!(db.root(), target.root);
        });
    }
}
1187
// Codec conformance tests for `Target`, built only under test with the
// `arbitrary` feature enabled.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use crate::merkle::{mmb, mmr};
    use commonware_codec::conformance::CodecConformance;
    use commonware_cryptography::sha256::Digest as Sha256Digest;

    // One entry per merkle family.
    commonware_conformance::conformance_tests! {
        CodecConformance<Target<mmr::Family, Sha256Digest>>,
        CodecConformance<Target<mmb::Family, Sha256Digest>>,
    }
}