Skip to main content

commonware_storage/qmdb/sync/
compact.rs

1//! Compact sync for compact-storage qmdbs.
2//!
3//! Compact sync does not transfer or reconstruct the full historical operation log. Instead, the
4//! source serves the minimum authenticated state needed to recreate the latest committed compact db
5//! state:
6//!
7//! - the total committed leaf count,
8//! - the compact frontier's pinned nodes for that leaf count,
9//! - the final commit operation, and
10//! - a proof authenticating that final commit against the requested root.
11//!
12//! # What compact dbs store
13//!
14//! A compact db persists two pieces of state that must always describe the same committed tip:
15//!
16//! 1. the compact Merkle frontier (persisted by [`crate::merkle::compact`]), and
17//! 2. a db-level witness for the last commit (persisted by `qmdb::compact::witness`).
18//!
19//! The witness exists because only the db layer knows how to encode and decode the typed commit
20//! operation. Without it, a compact db could recover its root and continue appending, but it could
21//! not serve compact sync to another node.
22//!
23//! # When compact state changes
24//!
25//! The servable compact state advances only on durable persistence:
26//!
27//! - [`sync`] verifies the final commit proof and compact frontier before database construction.
28//! - [`Database::from_validated_state`] reconstructs the already-validated state in memory only.
29//! - Compact db-local commits persist the frontier and witness together during `sync`/`commit`.
30//! - `rewind` restores both the frontier and the witness from the previous slot together.
31//!
32//! Unsynced in-memory mutations are therefore intentionally not servable: `current_target()` and
33//! compact-state responses lag behind `apply_batch()` until the next durable sync.
34//!
35//! # Safety and invariants
36//!
37//! The compact path relies on these invariants:
38//!
39//! - the served commit proof must authenticate the final commit at `leaf_count - 1`,
40//! - the frontier pins and witness must move together in the same ping-pong slot,
41//! - reopen and rewind must re-verify the persisted witness against the root restored from that
42//!   slot, and
43//! - reconstructed state must not be persisted until the db recomputes the requested root locally.
44//!
45//! If those invariants are violated by missing or corrupted persisted data, compact db reopen fails
46//! with `DataCorrupted` rather than silently serving or restoring mismatched state.
47
48use crate::{
49    merkle::{Family, Location, Proof},
50    qmdb::{
51        self,
52        any::{value::ValueEncoding, FixedValue, VariableValue},
53        immutable::{
54            fixed::{Db as ImmutableFixedDb, Operation as ImmutableFixedOp},
55            variable::{Db as ImmutableVariableDb, Operation as ImmutableVariableOp},
56            CompactDb as ImmutableCompactDb, Operation as ImmutableOp,
57        },
58        keyless::{
59            fixed::{Db as KeylessFixedDb, Operation as KeylessFixedOp},
60            variable::{Db as KeylessVariableDb, Operation as KeylessVariableOp},
61            CompactDb as KeylessCompactDb, Operation as KeylessOp,
62        },
63        operation::Key,
64        sync::{EngineError, Error},
65        verify_proof,
66    },
67    translator::Translator,
68};
69use commonware_codec::{
70    Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
71};
72use commonware_cryptography::{Digest, Hasher};
73use commonware_parallel::Strategy;
74use commonware_runtime::{Buf, BufMut, Clock, Metrics, Storage, Supervisor};
75use commonware_utils::{channel::oneshot, sync::AsyncRwLock, Array};
76use std::{future::Future, num::NonZeroU64, sync::Arc};
77
78/// Compact-sync target for a compact-storage database.
79///
80/// Compact sync authenticates only the final committed root and total leaf count. Unlike replay
81/// sync, there is no lower replay bound here because compact sync does not transfer or reconstruct
82/// historical operations.
83#[derive(Debug)]
84pub struct Target<F: Family, D: Digest> {
85    /// Authenticated root of the committed compact state.
86    pub root: D,
87    /// Total committed operations/leaves in that state.
88    pub leaf_count: Location<F>,
89}
90
91impl<F: Family, D: Digest> Target<F, D> {
92    const INVALID_LEAF_COUNT: &'static str = "leaf_count must be in 1..=MAX_LEAVES";
93
94    /// Create a compact-sync target.
95    pub const fn new(root: D, leaf_count: Location<F>) -> Self {
96        Self { root, leaf_count }
97    }
98
99    /// Validate a compact target that may have been constructed programmatically.
100    pub fn validate(&self) -> Result<(), &'static str> {
101        if !self.leaf_count.is_valid() || self.leaf_count == 0 {
102            return Err(Self::INVALID_LEAF_COUNT);
103        }
104        Ok(())
105    }
106}
107
108impl<F: Family, D: Digest> Clone for Target<F, D> {
109    fn clone(&self) -> Self {
110        Self {
111            root: self.root,
112            leaf_count: self.leaf_count,
113        }
114    }
115}
116
117impl<F: Family, D: Digest> PartialEq for Target<F, D> {
118    fn eq(&self, other: &Self) -> bool {
119        self.root == other.root && self.leaf_count == other.leaf_count
120    }
121}
122
123impl<F: Family, D: Digest> Eq for Target<F, D> {}
124
125impl<F: Family, D: Digest> Write for Target<F, D> {
126    fn write(&self, buf: &mut impl BufMut) {
127        self.root.write(buf);
128        self.leaf_count.write(buf);
129    }
130}
131
132impl<F: Family, D: Digest> EncodeSize for Target<F, D> {
133    fn encode_size(&self) -> usize {
134        self.root.encode_size() + self.leaf_count.encode_size()
135    }
136}
137
138impl<F: Family, D: Digest> Read for Target<F, D> {
139    type Cfg = ();
140
141    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
142        let root = D::read(buf)?;
143        let leaf_count = Location::<F>::read(buf)?;
144        let target = Self { root, leaf_count };
145        target.validate().map_err(|reason| {
146            CodecError::Invalid("storage::qmdb::sync::compact::Target", reason)
147        })?;
148        Ok(target)
149    }
150}
151
// Generates targets whose leaf count is drawn from `1..=MAX_LEAVES`, the range named by
// `Target::validate`'s error message.
#[cfg(feature = "arbitrary")]
impl<F: Family, D: Digest> arbitrary::Arbitrary<'_> for Target<F, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let root = u.arbitrary()?;
        let leaves = u.int_in_range(1..=*F::MAX_LEAVES)?;
        Ok(Self {
            root,
            leaf_count: Location::new(leaves),
        })
    }
}
163
164/// Authenticated state for initializing a compact-storage database at a target root.
165#[derive(Clone, Debug)]
166pub struct State<F: Family, Op, D: Digest> {
167    /// Total number of operations/leaves in the target database.
168    pub leaf_count: Location<F>,
169    /// Pinned Merkle nodes for the current frontier.
170    pub pinned_nodes: Vec<D>,
171    /// The final commit operation at `leaf_count - 1`.
172    pub last_commit_op: Op,
173    /// Proof authenticating `last_commit_op` against the target root.
174    pub last_commit_proof: Proof<F, D>,
175}
176
177/// Compact state that has been validated against a target root.
178///
179/// This carries the original compact state plus the values derived while validating it. Compact
180/// database constructors still build their storage-backed Merkle state and witness cache, but they
181/// should use these values instead of re-deriving them from peer-provided state.
182#[derive(Clone, Debug)]
183pub struct ValidatedState<F: Family, Op, D: Digest> {
184    /// The compact state fetched from a peer after validation.
185    pub state: State<F, Op, D>,
186    /// The target root that `state` was validated against.
187    pub root: D,
188    /// The inactivity floor derived from the final commit operation.
189    pub inactivity_floor: Location<F>,
190}
191
192impl<F: Family, Op, D: Digest> ValidatedState<F, Op, D> {
193    const fn new(state: State<F, Op, D>, root: D, inactivity_floor: Location<F>) -> Self {
194        Self {
195            state,
196            root,
197            inactivity_floor,
198        }
199    }
200}
201
202impl<F: Family, Op, D: Digest> Write for State<F, Op, D>
203where
204    Op: Write,
205{
206    fn write(&self, buf: &mut impl BufMut) {
207        self.leaf_count.write(buf);
208        self.pinned_nodes.write(buf);
209        self.last_commit_op.write(buf);
210        self.last_commit_proof.write(buf);
211    }
212}
213
214/// Result from a compact-state fetch.
215pub struct FetchResult<F: Family, Op, D: Digest> {
216    /// The fetched compact state.
217    pub state: State<F, Op, D>,
218    /// Callback used to report whether downstream accepted the state.
219    pub callback: Option<oneshot::Sender<bool>>,
220}
221
222impl<F: Family, Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<F, Op, D> {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        f.debug_struct("FetchResult")
225            .field("state", &self.state)
226            .field("callback", &self.callback.as_ref().map(|_| "<callback>"))
227            .finish()
228    }
229}
230
231impl<F: Family, Op, D: Digest> From<State<F, Op, D>> for FetchResult<F, Op, D> {
232    fn from(state: State<F, Op, D>) -> Self {
233        Self {
234            state,
235            callback: None,
236        }
237    }
238}
239
240impl<F: Family, Op, D: Digest> EncodeSize for State<F, Op, D>
241where
242    Op: EncodeSize,
243{
244    fn encode_size(&self) -> usize {
245        self.leaf_count.encode_size()
246            + self.pinned_nodes.encode_size()
247            + self.last_commit_op.encode_size()
248            + self.last_commit_proof.encode_size()
249    }
250}
251
252impl<F: Family, Op, D: Digest> Read for State<F, Op, D>
253where
254    Op: Read,
255{
256    type Cfg = (RangeCfg<usize>, Op::Cfg, usize);
257
258    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
259        let (pinned_nodes_cfg, op_cfg, max_proof_digests) = cfg;
260        Ok(Self {
261            leaf_count: Location::<F>::read(buf)?,
262            pinned_nodes: Vec::<D>::read_cfg(buf, &(*pinned_nodes_cfg, ()))?,
263            last_commit_op: Op::read_cfg(buf, op_cfg)?,
264            last_commit_proof: Proof::<F, D>::read_cfg(buf, max_proof_digests)?,
265        })
266    }
267}
268
269/// Resolver-side errors for compact state serving.
270#[derive(Debug, thiserror::Error)]
271pub enum ServeError<F: Family, D: Digest> {
272    /// The source database returned an error while building compact state.
273    #[error("compact source database error: {0}")]
274    Database(#[from] qmdb::Error<F>),
275    /// The caller requested a target that compact sync cannot serve.
276    #[error("invalid compact target: {0}")]
277    InvalidTarget(&'static str),
278    /// The resolver wrapper did not currently hold a database.
279    #[error("compact source missing")]
280    MissingSource,
281    /// The caller requested a target different from the source's current witness.
282    #[error("stale compact target - requested {requested:?}, current {current:?}")]
283    StaleTarget {
284        requested: Target<F, D>,
285        current: Target<F, D>,
286    },
287}
288
289/// Trait for compact sync fetches from a source database.
290#[allow(clippy::type_complexity)]
291pub trait Resolver: Send + Sync + Clone + 'static {
292    /// The merkle family backing the resolver's proofs.
293    type Family: Family;
294
295    /// The digest type used in proofs returned by the resolver.
296    type Digest: Digest;
297
298    /// The type of operations returned by the resolver.
299    type Op;
300
301    /// The error type returned by the resolver.
302    type Error: std::error::Error + Send + 'static;
303
304    /// Fetch the authenticated state for `target`.
305    fn get_compact_state<'a>(
306        &'a self,
307        target: Target<Self::Family, Self::Digest>,
308    ) -> impl Future<Output = Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error>>
309           + Send
310           + 'a;
311}
312
313/// Marker trait for resolvers whose associated types match a specific compact-sync database.
314///
315/// This is a trait-alias pattern used to avoid repeating
316/// `Resolver<Family = DB::Family, Op = DB::Op, Digest = DB::Digest>`.
317/// Blanket-implemented for any matching [`Resolver`], so callers never implement this directly.
318pub trait CompactDbResolver<DB: Database>:
319    Resolver<Family = DB::Family, Op = DB::Op, Digest = DB::Digest>
320{
321}
322
323impl<DB, R> CompactDbResolver<DB> for R
324where
325    DB: Database,
326    R: Resolver<Family = DB::Family, Op = DB::Op, Digest = DB::Digest>,
327{
328}
329
330/// Database types that can be initialized directly from compact state.
331pub trait Database: Sized + Send {
332    type Family: Family;
333    type Op: Encode + Send;
334    type Config: Clone;
335    type Digest: Digest;
336    type Context: Storage + Clock + Metrics;
337    type Hasher: Hasher<Digest = Self::Digest>;
338
339    /// Build a database from authenticated state in memory.
340    ///
341    /// The caller has already validated `last_commit_proof` and the compact frontier against the
342    /// requested target root and passes the derived validation artifacts with the state. This
343    /// constructor must not durably persist anything; persistence happens only after the caller
344    /// re-checks that `Self::root()` matches the target root.
345    fn from_validated_state(
346        context: Self::Context,
347        config: Self::Config,
348        state: ValidatedState<Self::Family, Self::Op, Self::Digest>,
349    ) -> impl Future<Output = Result<Self, qmdb::Error<Self::Family>>> + Send;
350
351    /// Return the inactivity floor if the operation is a commit.
352    fn inactivity_floor(op: &Self::Op) -> Option<Location<Self::Family>>;
353
354    /// Get the root digest for final verification.
355    fn root(&self) -> Self::Digest;
356
357    /// Persist the compact-initialized state once the caller has verified its root.
358    fn persist_compact_state(
359        &self,
360    ) -> impl Future<Output = Result<(), qmdb::Error<Self::Family>>> + Send;
361}
362
363/// Configuration for compact synchronization into a compact-storage database.
364pub struct Config<DB, R>
365where
366    DB: Database,
367    R: CompactDbResolver<DB>,
368{
369    /// Runtime context for creating database components.
370    pub context: DB::Context,
371    /// Source resolver for fetching compact authenticated state.
372    pub resolver: R,
373    /// Sync target (root digest and total leaf count).
374    pub target: Target<DB::Family, DB::Digest>,
375    /// Database-specific configuration.
376    pub db_config: DB::Config,
377}
378
379/// Create/open a compact-storage database and initialize it from compact authenticated state.
380///
381/// Unlike streaming sync, compact sync jumps directly to `target.leaf_count`. This path
382/// authenticates the final commit and frontier state for the target root rather than replaying a
383/// retained operation range.
384///
385/// Verification order:
386/// 1. Fetch the proposed compact state for `target`.
387/// 2. Verify the final commit proof against `target.root`.
388/// 3. Rebuild the compact frontier in memory and compare its root against `target.root`.
389/// 4. Build the compact db from that already-validated state.
390/// 5. Assert the db root still matches and persist the state.
391///
392/// Any failure leaves the local compact db unopened or unchanged on disk.
393pub async fn sync<DB, R>(
394    config: Config<DB, R>,
395) -> Result<DB, Error<DB::Family, R::Error, DB::Digest>>
396where
397    DB: Database,
398    R: CompactDbResolver<DB>,
399{
400    let target = config.target;
401    target
402        .validate()
403        .map_err(|reason| Error::Engine(EngineError::InvalidCompactTarget(reason)))?;
404
405    // Compact sync has no request scheduler, so this loop is its retry boundary for bad peer
406    // responses. Resolver errors and local construction failures remain terminal.
407    loop {
408        let FetchResult { state, callback } = config
409            .resolver
410            .get_compact_state(target.clone())
411            .await
412            .map_err(Error::Resolver)?;
413
414        // Validation failures describe a bad compact response. Reject it if the resolver supplied
415        // feedback, then fetch another candidate.
416        let validated_state = match validate_compact_state::<DB>(&target, state) {
417            Ok(state) => state,
418            Err(err) => {
419                if let Some(callback) = callback {
420                    let _ = callback.send(false);
421                }
422                tracing::debug!(error = ?err, "compact state failed validation, will retry");
423                continue;
424            }
425        };
426
427        // The peer response has already authenticated the final commit and frontier. From here,
428        // construction should only fail for local database/storage reasons; a root mismatch is a
429        // bug in this path.
430        let db = DB::from_validated_state(
431            config.context.child("compact"),
432            config.db_config.clone(),
433            validated_state,
434        )
435        .await
436        .map_err(Error::Database)?;
437        assert_eq!(
438            db.root(),
439            target.root,
440            "validated compact state reconstructed unexpected root",
441        );
442
443        if let Some(callback) = callback {
444            let _ = callback.send(true);
445        }
446        db.persist_compact_state().await?;
447        return Ok(db);
448    }
449}
450
451/// Validate the peer-provided compact state before constructing local database storage.
452fn validate_compact_state<DB>(
453    target: &Target<DB::Family, DB::Digest>,
454    state: State<DB::Family, DB::Op, DB::Digest>,
455) -> CompactFrontierValidation<DB>
456where
457    DB: Database,
458{
459    if state.leaf_count != target.leaf_count {
460        return Err(EngineError::UnexpectedLeafCount {
461            expected: target.leaf_count,
462            actual: state.leaf_count,
463        });
464    }
465
466    let hasher = qmdb::hasher::<DB::Hasher>();
467    let last_commit_loc = Location::new(*state.leaf_count - 1);
468    if !verify_proof(
469        &hasher,
470        &state.last_commit_proof,
471        last_commit_loc,
472        std::slice::from_ref(&state.last_commit_op),
473        &target.root,
474    ) {
475        return Err(EngineError::InvalidProof);
476    }
477
478    validate_compact_frontier::<DB>(target, state)
479}
480
481/// Result of validating a peer-provided compact frontier.
482type CompactFrontierValidation<DB> = Result<
483    ValidatedState<<DB as Database>::Family, <DB as Database>::Op, <DB as Database>::Digest>,
484    EngineError<<DB as Database>::Family, <DB as Database>::Digest>,
485>;
486
487/// Validate that a peer-provided compact frontier authenticates the requested target root.
488fn validate_compact_frontier<DB>(
489    target: &Target<DB::Family, DB::Digest>,
490    state: State<DB::Family, DB::Op, DB::Digest>,
491) -> CompactFrontierValidation<DB>
492where
493    DB: Database,
494{
495    // The final commit is the only operation carried in compact state. Its floor determines which
496    // peaks are inactive when authenticating the compact frontier root.
497    let last_commit_loc = Location::new(*state.leaf_count - 1);
498    let Some(inactivity_floor_loc) = DB::inactivity_floor(&state.last_commit_op) else {
499        return Err(EngineError::InvalidProof);
500    };
501    if inactivity_floor_loc > last_commit_loc {
502        return Err(EngineError::InvalidProof);
503    }
504
505    // Rebuild a disposable Merkle view from the pinned frontier before opening any database
506    // storage. Invalid pin counts or inactive peak layouts are treated as bad peer proofs.
507    let mem = crate::merkle::mem::Mem::<DB::Family, DB::Digest>::init(crate::merkle::mem::Config {
508        nodes: Vec::new(),
509        pruning_boundary: state.leaf_count,
510        pinned_nodes: state.pinned_nodes.clone(),
511    })
512    .map_err(|_| EngineError::InvalidProof)?;
513    let hasher = qmdb::hasher::<DB::Hasher>();
514    let inactive_peaks = DB::Family::inactive_peaks(
515        DB::Family::location_to_position(state.leaf_count),
516        inactivity_floor_loc,
517    );
518    let actual = mem
519        .root(&hasher, inactive_peaks)
520        .map_err(|_| EngineError::InvalidProof)?;
521    if actual != target.root {
522        return Err(EngineError::RootMismatch {
523            expected: target.root,
524            actual,
525        });
526    }
527
528    Ok(ValidatedState::new(
529        state,
530        target.root,
531        inactivity_floor_loc,
532    ))
533}
534
535async fn fetch_state_from_full_source<F, Op, D, Current, CurrentFut, Hist, HistFut, Pins, PinsFut>(
536    target: Target<F, D>,
537    current_target: Current,
538    historical_proof: Hist,
539    pinned_nodes_at: Pins,
540) -> Result<State<F, Op, D>, ServeError<F, D>>
541where
542    F: Family,
543    D: Digest,
544    Current: FnOnce() -> CurrentFut,
545    CurrentFut: Future<Output = Target<F, D>>,
546    Hist: FnOnce(Location<F>, Location<F>) -> HistFut,
547    HistFut: Future<Output = Result<(Proof<F, D>, Vec<Op>), qmdb::Error<F>>>,
548    Pins: FnOnce(Location<F>) -> PinsFut,
549    PinsFut: Future<Output = Result<Vec<D>, qmdb::Error<F>>>,
550{
551    // Full sources do not cache a compact witness. Instead, derive the compact payload on demand
552    // from the current tip commit plus the frontier pins at the requested tree size.
553    target.validate().map_err(ServeError::InvalidTarget)?;
554    let current = current_target().await;
555    if target.root != current.root || target.leaf_count != current.leaf_count {
556        return Err(ServeError::StaleTarget {
557            requested: target,
558            current,
559        });
560    }
561    let leaf_count = target.leaf_count;
562    let last_commit_loc = Location::new(*leaf_count - 1);
563    let (last_commit_proof, mut operations) = historical_proof(leaf_count, last_commit_loc)
564        .await
565        .map_err(ServeError::Database)?;
566    // Compact sync always authenticates exactly the final commit leaf.
567    let last_commit_op =
568        operations
569            .pop()
570            .ok_or(ServeError::Database(qmdb::Error::DataCorrupted(
571                "missing last commit operation",
572            )))?;
573    let pinned_nodes = pinned_nodes_at(leaf_count)
574        .await
575        .map_err(ServeError::Database)?;
576    Ok(State {
577        leaf_count,
578        pinned_nodes,
579        last_commit_op,
580        last_commit_proof,
581    })
582}
583
// Resolver impls for full keyless databases. Each one builds compact state on demand by asking
// the full source for a one-operation historical proof at the tip plus the frontier pins.
//
// Three wrappers are covered: `Arc<Db>`, `Arc<AsyncRwLock<Db>>`, and
// `Arc<AsyncRwLock<Option<Db>>>`. The lock-based variants hold a read guard for the duration of
// the fetch; the `Option` variant reports `ServeError::MissingSource` when the slot is empty.
macro_rules! impl_compact_resolver_keyless {
    ($db:ident, $op:ident, $val_bound:ident) => {
        impl<F, E, V, H, S> Resolver for Arc<$db<F, E, V, H, S>>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(self.root(), self.bounds().await.end) },
                    |tree_size, commit_loc| {
                        // Exactly one operation is requested: the final commit.
                        self.historical_proof(tree_size, commit_loc, NonZeroU64::MIN)
                    },
                    |tree_size| self.pinned_nodes_at(tree_size),
                )
                .await?;
                Ok(state.into())
            }
        }

        impl<F, E, V, H, S> Resolver for Arc<AsyncRwLock<$db<F, E, V, H, S>>>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                // The read guard is held until the state has been assembled.
                let source = self.read().await;
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(source.root(), source.bounds().await.end) },
                    |tree_size, commit_loc| {
                        // Exactly one operation is requested: the final commit.
                        source.historical_proof(tree_size, commit_loc, NonZeroU64::MIN)
                    },
                    |tree_size| source.pinned_nodes_at(tree_size),
                )
                .await?;
                Ok(state.into())
            }
        }

        impl<F, E, V, H, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, V, H, S>>>>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                // The read guard is held until the state has been assembled.
                let slot = self.read().await;
                let Some(source) = slot.as_ref() else {
                    return Err(ServeError::MissingSource);
                };
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(source.root(), source.bounds().await.end) },
                    |tree_size, commit_loc| {
                        // Exactly one operation is requested: the final commit.
                        source.historical_proof(tree_size, commit_loc, NonZeroU64::MIN)
                    },
                    |tree_size| source.pinned_nodes_at(tree_size),
                )
                .await?;
                Ok(state.into())
            }
        }
    };
}
694
// Resolver impls for full immutable databases. These follow the keyless pattern (one-operation
// historical proof at the tip plus frontier pins), with the additional key and translator type
// parameters that immutable variants carry.
//
// Three wrappers are covered: `Arc<Db>`, `Arc<AsyncRwLock<Db>>`, and
// `Arc<AsyncRwLock<Option<Db>>>`. The lock-based variants hold a read guard for the duration of
// the fetch; the `Option` variant reports `ServeError::MissingSource` when the slot is empty.
macro_rules! impl_compact_resolver_immutable {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path) => {
        impl<F, E, K, V, H, T, S> Resolver for Arc<$db<F, E, K, V, H, T, S>>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(self.root(), self.bounds().await.end) },
                    |tree_size, commit_loc| {
                        // Exactly one operation is requested: the final commit.
                        self.historical_proof(tree_size, commit_loc, NonZeroU64::MIN)
                    },
                    |tree_size| self.pinned_nodes_at(tree_size),
                )
                .await?;
                Ok(state.into())
            }
        }

        impl<F, E, K, V, H, T, S> Resolver for Arc<AsyncRwLock<$db<F, E, K, V, H, T, S>>>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                // The read guard is held until the state has been assembled.
                let source = self.read().await;
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(source.root(), source.bounds().await.end) },
                    |tree_size, commit_loc| {
                        // Exactly one operation is requested: the final commit.
                        source.historical_proof(tree_size, commit_loc, NonZeroU64::MIN)
                    },
                    |tree_size| source.pinned_nodes_at(tree_size),
                )
                .await?;
                Ok(state.into())
            }
        }

        impl<F, E, K, V, H, T, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, K, V, H, T, S>>>>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                // The read guard is held until the state has been assembled.
                let slot = self.read().await;
                let Some(source) = slot.as_ref() else {
                    return Err(ServeError::MissingSource);
                };
                let state = fetch_state_from_full_source(
                    target,
                    || async { Target::new(source.root(), source.bounds().await.end) },
                    |tree_size, commit_loc| {
                        // Exactly one operation is requested: the final commit.
                        source.historical_proof(tree_size, commit_loc, NonZeroU64::MIN)
                    },
                    |tree_size| source.pinned_nodes_at(tree_size),
                )
                .await?;
                Ok(state.into())
            }
        }
    };
}
814
// Resolver impls for compact keyless databases.
//
// These databases already persist a compact witness, so serving a request never rebuilds anything
// from history: every impl forwards the requested target to the database's `compact_state` and
// converts the successful result into a `FetchResult`. The three impls differ only in how the
// database is held:
//
// - `Arc<Db>`: called directly,
// - `Arc<AsyncRwLock<Db>>`: called under a read guard,
// - `Arc<AsyncRwLock<Option<Db>>>`: as above, with an empty slot reported as
//   `ServeError::MissingSource`.
macro_rules! impl_compact_resolver_compact_keyless {
    ($db:ident, $op:ident) => {
        impl<F, E, V, H, C, S> Resolver for Arc<$db<F, E, V, H, C, S>>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let state = self.compact_state(target)?;
                Ok(state.into())
            }
        }

        impl<F, E, V, H, C, S> Resolver for Arc<AsyncRwLock<$db<F, E, V, H, C, S>>>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let state = guard.compact_state(target)?;
                Ok(state.into())
            }
        }

        impl<F, E, V, H, C, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, V, H, C, S>>>>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let slot = self.read().await;
                let Some(source) = slot.as_ref() else {
                    return Err(ServeError::MissingSource);
                };
                let state = source.compact_state(target)?;
                Ok(state.into())
            }
        }
    };
}
892
// Resolver impls for compact immutable databases.
//
// As with the compact keyless databases, the persisted witness is read directly rather than being
// rebuilt from a full operation log: every impl forwards the requested target to the database's
// `compact_state` and converts the successful result into a `FetchResult`. The three impls differ
// only in how the database is held:
//
// - `Arc<Db>`: called directly,
// - `Arc<AsyncRwLock<Db>>`: called under a read guard,
// - `Arc<AsyncRwLock<Option<Db>>>`: as above, with an empty slot reported as
//   `ServeError::MissingSource`.
macro_rules! impl_compact_resolver_compact_immutable {
    ($db:ident, $op:ident) => {
        impl<F, E, K, V, H, C, S> Resolver for Arc<$db<F, E, K, V, H, C, S>>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, K, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let state = self.compact_state(target)?;
                Ok(state.into())
            }
        }

        impl<F, E, K, V, H, C, S> Resolver for Arc<AsyncRwLock<$db<F, E, K, V, H, C, S>>>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, K, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let state = guard.compact_state(target)?;
                Ok(state.into())
            }
        }

        impl<F, E, K, V, H, C, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, K, V, H, C, S>>>>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op<F, K, V>: Encode + Read<Cfg = C>,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = ServeError<F, H::Digest>;

            async fn get_compact_state(
                &self,
                target: Target<Self::Family, Self::Digest>,
            ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
                let slot = self.read().await;
                let Some(source) = slot.as_ref() else {
                    return Err(ServeError::MissingSource);
                };
                let state = source.compact_state(target)?;
                Ok(state.into())
            }
        }
    };
}
973
// Instantiate the `Resolver` impls for the compact databases. The two macros invoked here are the
// ones defined directly above; their impls forward each request to the database's `compact_state`.
974impl_compact_resolver_compact_keyless!(KeylessCompactDb, KeylessOp);
975impl_compact_resolver_compact_immutable!(ImmutableCompactDb, ImmutableOp);
976
// Instantiate the `Resolver` impls for the fixed- and variable-value keyless and immutable
// databases.
// NOTE(review): the trailing arguments look like the value bound and, for the immutable macro, the
// key bound (`$val_bound` / `$key_bound` in the macro body) — confirm against the macro headers.
977impl_compact_resolver_keyless!(KeylessFixedDb, KeylessFixedOp, FixedValue);
978impl_compact_resolver_keyless!(KeylessVariableDb, KeylessVariableOp, VariableValue);
979impl_compact_resolver_immutable!(ImmutableFixedDb, ImmutableFixedOp, FixedValue, Array);
980impl_compact_resolver_immutable!(ImmutableVariableDb, ImmutableVariableOp, VariableValue, Key);
981
#[cfg(test)]
mod tests {
    use super::{Config, Database, FetchResult, Resolver, State, Target, ValidatedState};
    use crate::{
        merkle::{mem::Mem, mmr, Location},
        qmdb,
    };
    use commonware_codec::{DecodeExt as _, Encode as _, RangeCfg};
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_parallel::Rayon;
    use commonware_runtime::{deterministic, Runner as _};
    use commonware_utils::{
        channel::oneshot,
        sync::{AsyncRwLock, Mutex},
    };
    use std::{
        collections::VecDeque,
        convert::Infallible,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
    };

    /// Compact state used by the tests: MMR family, one-byte operations, SHA-256 digests.
    type TestState = State<mmr::Family, u8, Digest>;
    /// Sync target matching [`TestState`].
    type TestTarget = Target<mmr::Family, Digest>;
    /// Resolver response matching [`TestState`].
    type TestFetch = FetchResult<mmr::Family, u8, Digest>;

    /// Compiles only when `R` implements [`Resolver`]; it does nothing at runtime.
    fn assert_resolver<R: Resolver>() {}

    /// Asserts that a database type is a [`Resolver`] in each supported handle shape: shared,
    /// shared behind a lock, and shared behind a lock around an optional slot.
    macro_rules! assert_resolver_variants {
        ($db:ty) => {
            assert_resolver::<Arc<$db>>();
            assert_resolver::<Arc<AsyncRwLock<$db>>>();
            assert_resolver::<Arc<AsyncRwLock<Option<$db>>>>();
        };
    }

    /// Minimal [`Database`] that reports a preset root and counts how often it is constructed.
    struct TestDb {
        root: Digest,
    }

    impl Database for TestDb {
        type Family = mmr::Family;
        type Op = u8;
        // (root to report, shared construction counter)
        type Config = (Digest, Arc<AtomicUsize>);
        type Digest = Digest;
        type Context = deterministic::Context;
        type Hasher = Sha256;

        async fn from_validated_state(
            _context: Self::Context,
            config: Self::Config,
            _state: ValidatedState<Self::Family, Self::Op, Self::Digest>,
        ) -> Result<Self, qmdb::Error<Self::Family>> {
            let (root, constructions) = config;
            constructions.fetch_add(1, Ordering::SeqCst);
            Ok(Self { root })
        }

        fn inactivity_floor(_op: &Self::Op) -> Option<Location<Self::Family>> {
            Some(Location::new(0))
        }

        fn root(&self) -> Self::Digest {
            self.root
        }

        async fn persist_compact_state(&self) -> Result<(), qmdb::Error<Self::Family>> {
            Ok(())
        }
    }

    /// Resolver that replays a scripted queue of responses, one per request, ignoring the target.
    #[derive(Clone)]
    struct SequenceResolver {
        states: Arc<Mutex<VecDeque<TestFetch>>>,
    }

    impl SequenceResolver {
        /// Queues `states` so they are served in iteration order.
        fn new(states: impl IntoIterator<Item = TestFetch>) -> Self {
            Self {
                states: Arc::new(Mutex::new(states.into_iter().collect())),
            }
        }
    }

    impl Resolver for SequenceResolver {
        type Family = mmr::Family;
        type Digest = Digest;
        type Op = u8;
        type Error = Infallible;

        async fn get_compact_state(
            &self,
            _target: Target<Self::Family, Self::Digest>,
        ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
            // Running out of scripted responses is a test bug, hence the panic.
            let next = self.states.lock().pop_front();
            Ok(next.expect("missing compact fetch result"))
        }
    }

    /// Builds a two-leaf in-memory Merkle structure and returns the compact state for its last
    /// leaf together with the target (root and leaf count) that state should satisfy.
    fn valid_state_and_target() -> (TestState, TestTarget) {
        let hasher = qmdb::hasher::<Sha256>();
        let mut tree = Mem::<mmr::Family, Digest>::new();

        // Leaf 0 holds `leading_op`; leaf 1 holds `commit_op`, which is served as the last commit.
        let commit_op = 0u8;
        let leading_op = 1u8;
        let pending = tree
            .new_batch()
            .add(&hasher, &leading_op.encode())
            .add(&hasher, &commit_op.encode());
        let merkleized = pending.merkleize(&tree, &hasher);
        tree.apply_batch(&merkleized).unwrap();

        let leaf_count = Location::new(2);
        let root = tree.root(&hasher, 0).unwrap();
        let pinned_nodes: Vec<_> = tree.nodes_to_pin(leaf_count).into_values().collect();
        let last_commit_proof = tree.proof(&hasher, Location::new(1), 0).unwrap();

        let state = State {
            leaf_count,
            pinned_nodes,
            last_commit_op: commit_op,
            last_commit_proof,
        };
        (state, TestTarget { root, leaf_count })
    }

    #[test]
    fn test_all_compact_qmdb_variants_implement_strategy_resolvers() {
        type KeylessFixedCompactDb = crate::qmdb::keyless::fixed::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            Sha256,
            Rayon,
        >;
        type KeylessVariableCompactDb = crate::qmdb::keyless::variable::CompactDb<
            mmr::Family,
            deterministic::Context,
            Vec<u8>,
            Sha256,
            (RangeCfg<usize>, ()),
            Rayon,
        >;
        type ImmutableFixedCompactDb = crate::qmdb::immutable::fixed::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            Digest,
            Sha256,
            Rayon,
        >;
        type ImmutableVariableCompactDb = crate::qmdb::immutable::variable::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            Vec<u8>,
            Sha256,
            ((), (RangeCfg<usize>, ())),
            Rayon,
        >;

        // Purely a compile-time check: each line expands to three trait-bound assertions.
        assert_resolver_variants!(KeylessFixedCompactDb);
        assert_resolver_variants!(KeylessVariableCompactDb);
        assert_resolver_variants!(ImmutableFixedCompactDb);
        assert_resolver_variants!(ImmutableVariableCompactDb);
    }

    #[test]
    fn test_target_decode_rejects_zero_leaf_count() {
        // The root value is irrelevant; only the zero leaf count should make decoding fail.
        let zero_leaves = TestTarget {
            root: Sha256::hash(b"unused"),
            leaf_count: Location::new(0),
        };

        let decoded = TestTarget::decode(zero_leaves.encode());
        assert!(decoded.is_err());
    }

    #[test]
    fn test_compact_sync_retries_invalid_state_without_feedback() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let (good_state, target) = valid_state_and_target();

            // The first scripted response carries a surplus pinned node and no feedback channel;
            // it is the invalid state the test name refers to.
            let mut bad_state = good_state.clone();
            bad_state.pinned_nodes.push(Sha256::hash(b"extra pin"));

            let (good_tx, good_rx) = oneshot::channel();
            let constructions = Arc::new(AtomicUsize::new(0));
            let resolver = SequenceResolver::new([
                FetchResult {
                    state: bad_state,
                    callback: None,
                },
                FetchResult {
                    state: good_state,
                    callback: Some(good_tx),
                },
            ]);

            let db = super::sync::<TestDb, _>(Config {
                context,
                resolver,
                target: target.clone(),
                db_config: (target.root, Arc::clone(&constructions)),
            })
            .await
            .unwrap();

            // The valid response is acknowledged, and the database is constructed exactly once.
            assert!(good_rx.await.expect("valid feedback should arrive"));
            assert_eq!(constructions.load(Ordering::SeqCst), 1);
            assert_eq!(db.root(), target.root);
        });
    }
}
1187
// Codec conformance checks for `Target`. The module is compiled only for test builds that also
// enable the `arbitrary` feature.
1188#[cfg(all(test, feature = "arbitrary"))]
1189mod conformance {
1190    use super::*;
1191    use crate::merkle::{mmb, mmr};
1192    use commonware_codec::conformance::CodecConformance;
1193    use commonware_cryptography::sha256::Digest as Sha256Digest;
1194
    // One entry per Merkle family (`mmr` and `mmb`), both using SHA-256 digests.
    // NOTE(review): the macro receives the types exactly as spelled here; presumably the spelling
    // feeds generated test names or stored fixtures — confirm before renaming `Sha256Digest`.
1195    commonware_conformance::conformance_tests! {
1196        CodecConformance<Target<mmr::Family, Sha256Digest>>,
1197        CodecConformance<Target<mmb::Family, Sha256Digest>>,
1198    }
1199}