Skip to main content

commonware_storage/qmdb/current/sync/
mod.rs

1//! Shared synchronization logic for [crate::qmdb::current] databases.
2//!
3//! Contains implementation of [crate::qmdb::sync::Database] for all
4//! [Db](crate::qmdb::current::db::Db) variants (ordered/unordered, fixed/variable).
5//!
6//! The canonical root of a `current` database combines the ops root, grafted root, and optional
7//! pending and partial chunk digests into a single hash (see the [Root structure](super) section in
8//! the module documentation). The sync engine operates on the **ops root**, not the canonical root:
9//! it downloads operations and verifies each batch against the ops root using ops-tree range proofs
10//! (identical to `any` sync). Callers that verify current ops proofs directly should use
11//! `qmdb::hasher`. [crate::qmdb::current::proof::OpsRootWitness] can be used by callers that need
12//! to authenticate the synced ops root against a trusted canonical root; the sync engine does not
13//! perform this check itself.
14//!
15//! After all operations are synced, the bitmap and grafted tree are reconstructed deterministically
16//! from the operations. The canonical root is then computed from the ops root, the reconstructed
17//! grafted root, and any pending or partial chunk digests.
18//!
19//! The [Database]`::`[root()](crate::qmdb::sync::Database::root) implementation returns the **ops
20//! root** (not the canonical root) because that is what the sync engine verifies against.
21//!
22//! For pruned databases (`range.start > 0`), grafted pinned nodes for the pruned region are read
23//! directly from the ops tree after it is built. This works because of the zero-chunk identity: for
24//! all-zero bitmap chunks (which all pruned chunks are), the grafted leaf equals the ops subtree
25//! root, making the grafted tree structurally identical to the ops tree at and above the grafting
26//! height.
27
28use crate::{
29    index::Factory as IndexFactory,
30    journal::{
31        authenticated,
32        contiguous::{fixed, variable, Mutable, Reader as _},
33    },
34    merkle::{
35        full::{self, Merkle},
36        Graftable, Location,
37    },
38    qmdb::{
39        self,
40        any::{
41            db::{Db as AnyDb, Metrics as AnyMetrics},
42            operation::{update::Update, Operation},
43            ordered::{
44                fixed::{Operation as OrderedFixedOp, Update as OrderedFixedUpdate},
45                variable::{Operation as OrderedVariableOp, Update as OrderedVariableUpdate},
46            },
47            unordered::{
48                fixed::{Operation as UnorderedFixedOp, Update as UnorderedFixedUpdate},
49                variable::{Operation as UnorderedVariableOp, Update as UnorderedVariableUpdate},
50            },
51            FixedValue, VariableValue,
52        },
53        bitmap::Shared,
54        current::{
55            db, grafting,
56            ordered::{
57                fixed::Db as CurrentOrderedFixedDb, variable::Db as CurrentOrderedVariableDb,
58            },
59            unordered::{
60                fixed::Db as CurrentUnorderedFixedDb, variable::Db as CurrentUnorderedVariableDb,
61            },
62            FixedConfig, VariableConfig,
63        },
64        operation::{Committable, Key, Operation as _},
65        sync::{resolver::fetch_operations, Database, DatabaseConfig as Config},
66    },
67    translator::Translator,
68    Context, Persistable,
69};
70use commonware_codec::{Codec, CodecShared, Read as CodecRead};
71use commonware_cryptography::{DigestOf, Hasher};
72use commonware_parallel::Strategy;
73use commonware_utils::{
74    bitmap::Prunable as BitMap, channel::oneshot, range::NonEmptyRange, sync::AsyncMutex, Array,
75};
76use std::sync::Arc;
77
78#[cfg(test)]
79pub(crate) mod tests;
80
81impl<T: Translator, J: Clone, S: Strategy> Config for super::Config<T, J, S> {
82    type JournalConfig = J;
83
84    fn journal_config(&self) -> Self::JournalConfig {
85        self.journal_config.clone()
86    }
87}
88
89/// Shared helper to build a `current::db::Db` from sync components.
90///
91/// This follows the same pattern as `any/sync/mod.rs::build_db` but additionally:
92/// * Builds the activity bitmap by replaying the operations log.
93/// * Extracts grafted pinned nodes from the ops tree (zero-chunk identity).
94/// * Builds the grafted tree from the bitmap and ops tree.
95/// * Computes and caches the canonical root.
96#[allow(clippy::too_many_arguments)]
97async fn build_db<F, E, U, I, H, J, T, const N: usize, S>(
98    context: E,
99    merkle_config: full::Config<S>,
100    log: J,
101    translator: T,
102    pinned_nodes: Option<Vec<H::Digest>>,
103    range: NonEmptyRange<Location<F>>,
104    apply_batch_size: usize,
105    metadata_partition: String,
106    strategy: S,
107) -> Result<db::Db<F, E, J, I, H, U, N, S>, qmdb::Error<F>>
108where
109    F: Graftable,
110    E: Context,
111    U: Update + Send + Sync + 'static,
112    I: IndexFactory<T, Value = Location<F>>,
113    H: Hasher,
114    T: Translator,
115    J: Mutable<Item = Operation<F, U>> + Persistable<Error = crate::journal::Error>,
116    S: Strategy,
117    Operation<F, U>: Codec + Committable + CodecShared,
118{
119    // Build authenticated log.
120    let hasher = qmdb::hasher::<H>();
121    let merkle = Merkle::<F, _, _, S>::init_sync(
122        context.child("merkle"),
123        full::SyncConfig {
124            config: merkle_config,
125            range: range.clone(),
126            pinned_nodes,
127        },
128    )
129    .await?;
130    let index = I::new(context.child("index"), translator);
131    let log = authenticated::Journal::<F, _, _, _, S>::from_components(
132        merkle,
133        log,
134        hasher,
135        apply_batch_size as u64,
136    )
137    .await?;
138
139    // Initialize bitmap with pruned chunks.
140    //
141    // Floor division is intentional: chunks entirely below range.start are pruned.
142    // If range.start is not chunk-aligned, the partial leading chunk is reconstructed by
143    // init_from_log, which pads the gap between `pruned_chunks * CHUNK_SIZE_BITS` and the
144    // journal's inactivity floor with inactive (false) bits.
145    let pruned_chunks = (*range.start() / BitMap::<N>::CHUNK_SIZE_BITS) as usize;
146    let bitmap = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
147        .map_err(|_| qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;
148    let bitmap = Arc::new(Shared::<N>::new(bitmap));
149
150    // Build any::Db, handing it the pre-allocated bitmap. `init_from_log` populates the bitmap
151    // during replay.
152    let any_metrics = AnyMetrics::new(context.child("any"));
153    let any: AnyDb<F, E, J, I, H, U, N, S> =
154        AnyDb::init_from_log(index, log, Some(bitmap), any_metrics).await?;
155
156    // Fetch grafted pinned nodes from the ops tree. For each position the grafted family
157    // needs at its pruning boundary, source the digest from the ops tree via the zero-chunk
158    // identity: when the covered chunks are all zero (which pruned chunks always are), the
159    // ops-family digest at the mapped position equals the grafted digest.
160    //
161    // Requires `range.start <=` target's [`Db::sync_boundary`](db::Db::sync_boundary): that
162    // bound guarantees every required ops-tree node is born at `range.end`.
163    let grafted_pinned_nodes = {
164        let grafted_boundary = Location::<F>::new(pruned_chunks as u64);
165        let grafting_height = grafting::height::<N>();
166        let mut pins = Vec::new();
167        for grafted_pos in F::nodes_to_pin(grafted_boundary) {
168            let ops_pos = grafting::grafted_to_ops_pos::<F>(grafted_pos, grafting_height);
169            let digest = any
170                .log
171                .merkle
172                .get_node(ops_pos)
173                .await?
174                .ok_or(qmdb::Error::<F>::DataCorrupted("missing ops pinned node"))?;
175            pins.push(digest);
176        }
177        pins
178    };
179
180    // Build grafted tree.
181    let hasher = qmdb::hasher::<H>();
182    let ops_size = any.log.merkle.size();
183    let ops_leaves = Location::<F>::try_from(ops_size)?;
184    let grafted_tree = db::build_grafted_tree::<F, H, S, N>(
185        &hasher,
186        any.bitmap.as_ref(),
187        &grafted_pinned_nodes,
188        &any.log.merkle,
189        ops_leaves,
190        &strategy,
191    )
192    .await?;
193
194    // Compute the canonical root. The grafted root is deterministic from the ops
195    // (which are authenticated by the engine) and the bitmap (which is deterministic
196    // from the ops).
197    let storage = grafting::Storage::new(
198        &grafted_tree,
199        grafting::height::<N>(),
200        &any.log.merkle,
201        hasher.clone(),
202    );
203    let partial = db::partial_chunk(any.bitmap.as_ref());
204    let grafted_root = db::compute_grafted_root(
205        &hasher,
206        any.bitmap.as_ref(),
207        &storage,
208        ops_leaves,
209        any.inactivity_floor_loc,
210    )
211    .await?;
212    let ops_root = any.root();
213    let partial_digest = partial.map(|(chunk, next_bit)| {
214        let digest = hasher.digest(&chunk);
215        (next_bit, digest)
216    });
217    let pending_digest =
218        db::pending_chunk::<F, _, N>(any.bitmap.as_ref(), ops_leaves, grafting::height::<N>())?
219            .map(|chunk| hasher.digest(&chunk));
220    let root = db::combine_roots(
221        &hasher,
222        &ops_root,
223        &grafted_root,
224        pending_digest.as_ref(),
225        partial_digest.as_ref().map(|(nb, d)| (*nb, d)),
226    );
227
228    // Initialize metadata store and construct the Db.
229    let (metadata, _, _) =
230        db::init_metadata::<F, E, DigestOf<H>>(context.child("metadata"), &metadata_partition)
231            .await?;
232
233    let metrics = db::Metrics::new(context);
234    let current_db = db::Db {
235        any,
236        grafted_tree,
237        metadata: AsyncMutex::new(metadata),
238        strategy,
239        root,
240        metrics,
241    };
242    current_db.update_metrics();
243
244    // Persist metadata so the db can be reopened with init_fixed/init_variable.
245    current_db.sync_metadata().await?;
246
247    Ok(current_db)
248}
249
250// --- Database trait implementations ---
251
// Implements the sync engine's `Database` trait for one `current` db variant. The impl lets the
// engine (a) assemble the db from a completed sync via `build_db`, and (b) reuse boundary nodes
// from a local ops tree that already matches the sync target.
//
// Arguments, in order: db type, operation type, update type, journal type, config type, key
// bound, value bound, and (after an optional `;`) extra where-clause predicates.
macro_rules! impl_current_sync_database {
    ($db:ident, $op:ident, $update:ident,
     $journal:ty, $config:ty,
     $key_bound:path, $value_bound:ident
     $(; $($where_extra:tt)+)?) => {
        impl<F, E, K, V, H, T, const N: usize, S> Database for $db<F, E, K, V, H, T, N, S>
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $value_bound + 'static,
            H: Hasher,
            T: Translator,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Context = E;
            type Op = $op<F, K, V>;
            type Journal = $journal;
            type Hasher = H;
            type Config = $config;
            type Digest = H::Digest;

            async fn from_sync_result(
                context: Self::Context,
                config: Self::Config,
                log: Self::Journal,
                pinned_nodes: Option<Vec<Self::Digest>>,
                range: NonEmptyRange<Location<F>>,
                apply_batch_size: usize,
            ) -> Result<Self, qmdb::Error<F>> {
                // The Merkle config, translator, metadata partition and strategy are all taken
                // from `config`; the remaining inputs come straight from the sync engine.
                build_db::<F, _, $update<K, V>, _, H, _, T, N, _>(
                    context,
                    config.merkle_config.clone(),
                    log,
                    config.translator.clone(),
                    pinned_nodes,
                    range,
                    apply_batch_size,
                    config.grafted_metadata_partition.clone(),
                    config.merkle_config.strategy.clone(),
                )
                .await
            }

            async fn local_boundary_nodes(
                context: Self::Context,
                config: &Self::Config,
                target: &qmdb::sync::Target<Self::Family, Self::Digest>,
                journal: &Self::Journal,
            ) -> Result<Option<Vec<Self::Digest>>, qmdb::Error<F>> {
                // A target starting at location 0 has no pruned boundary to pin.
                if target.range.start() == Location::new(0) {
                    return Ok(None);
                }

                // The local journal must reach back to the target start and end exactly at the
                // target end. The reader lives only inside this block, so it is released before
                // the Merkle tree is opened below.
                let inactivity_floor = {
                    let reader = journal.reader().await;
                    let journal_bounds = reader.bounds();
                    let journal_mismatch = Location::new(journal_bounds.start)
                        > target.range.start()
                        || Location::new(journal_bounds.end) != target.range.end();
                    if journal_mismatch {
                        return Ok(None);
                    }
                    qmdb::find_inactivity_floor_at::<F, _>(
                        &reader,
                        target.range.end(),
                        |op| op.has_floor(),
                    )
                    .await?
                };

                // Open the local ops tree and require that it spans the same range as the target.
                let hasher = qmdb::hasher::<H>();
                let merkle = Merkle::<F, _, _, S>::init(
                    context.child("local_boundary_merkle"),
                    &hasher,
                    config.merkle_config.clone(),
                )
                .await?;
                let merkle_bounds = merkle.bounds();
                let merkle_mismatch = merkle_bounds.start > target.range.start()
                    || merkle_bounds.end != target.range.end();
                if merkle_mismatch {
                    return Ok(None);
                }

                // Only reuse local nodes when the local ops root equals the target root.
                let local_root = merkle.root(
                    &hasher,
                    F::inactive_peaks(
                        F::location_to_position(target.range.end()),
                        inactivity_floor,
                    ),
                )?;
                if local_root != target.root {
                    return Ok(None);
                }

                match merkle.pinned_nodes_at(target.range.start()).await {
                    Ok(pinned) => Ok(Some(pinned)),
                    Err(err) => Err(err.into()),
                }
            }

            /// Returns the ops root (not the canonical root), since the sync engine verifies
            /// batches against the ops tree.
            fn root(&self) -> Self::Digest {
                self.any.root()
            }
        }
    };
}
363
364impl_current_sync_database!(
365    CurrentUnorderedFixedDb, UnorderedFixedOp, UnorderedFixedUpdate,
366    fixed::Journal<E, Self::Op>, FixedConfig<T, S>,
367    Array, FixedValue
368);
369
370impl_current_sync_database!(
371    CurrentUnorderedVariableDb, UnorderedVariableOp, UnorderedVariableUpdate,
372    variable::Journal<E, Self::Op>,
373    VariableConfig<T, <UnorderedVariableOp<F, K, V> as CodecRead>::Cfg, S>,
374    Key, VariableValue;
375    UnorderedVariableOp<F, K, V>: CodecShared
376);
377
378impl_current_sync_database!(
379    CurrentOrderedFixedDb, OrderedFixedOp, OrderedFixedUpdate,
380    fixed::Journal<E, Self::Op>, FixedConfig<T, S>,
381    Array, FixedValue
382);
383
384impl_current_sync_database!(
385    CurrentOrderedVariableDb, OrderedVariableOp, OrderedVariableUpdate,
386    variable::Journal<E, Self::Op>,
387    VariableConfig<T, <OrderedVariableOp<F, K, V> as CodecRead>::Cfg, S>,
388    Key, VariableValue;
389    OrderedVariableOp<F, K, V>: CodecShared
390);
391
392// --- Resolver implementations ---
393//
394// The resolver for `current` databases serves ops-level proofs (not grafted proofs) from
395// the inner `any` db. The sync engine verifies each batch against the ops root.
396
// Implements `crate::qmdb::sync::Resolver` for three shared-ownership wrappers of a `current`
// db type:
// * `Arc<Db>`
// * `Arc<AsyncRwLock<Db>>`: a read lock is held for the duration of each request.
// * `Arc<AsyncRwLock<Option<Db>>>`: while the slot is `None`, requests fail with
//   `qmdb::Error::KeyNotFound`.
//
// All three serve ops-level historical proofs and pinned nodes from the inner `any` db via
// `fetch_operations`. The cancellation receiver is accepted but not observed.
//
// Arguments: db type, operation type, value bound, key bound, and (after an optional `;`) extra
// where-clause predicates.
macro_rules! impl_current_resolver {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
        impl<F, E, K, V, H, T, const N: usize, S> crate::qmdb::sync::Resolver
            for std::sync::Arc<$db<F, E, K, V, H, T, N, S>>
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = qmdb::Error<F>;

            /// Serves operations and proofs directly from the shared db.
            async fn get_operations(
                &self,
                op_count: Location<F>,
                start_loc: Location<F>,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<crate::qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, Self::Error> {
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |op_count, start_loc, max_ops| {
                        self.any.historical_proof(op_count, start_loc, max_ops)
                    },
                    |start_loc| self.any.pinned_nodes_at(start_loc),
                )
                .await
            }
        }

        impl<F, E, K, V, H, T, const N: usize, S> crate::qmdb::sync::Resolver
            for std::sync::Arc<
                commonware_utils::sync::AsyncRwLock<
                    $db<F, E, K, V, H, T, N, S>,
                >,
            >
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = qmdb::Error<F>;

            /// Serves operations and proofs while holding a read lock on the db.
            async fn get_operations(
                &self,
                op_count: Location<F>,
                start_loc: Location<F>,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<crate::qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, Self::Error> {
                // The guard is borrowed by both closures, so it stays held until the fetch
                // completes.
                let db = self.read().await;
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |op_count, start_loc, max_ops| {
                        db.any.historical_proof(op_count, start_loc, max_ops)
                    },
                    |start_loc| db.any.pinned_nodes_at(start_loc),
                )
                .await
            }
        }

        impl<F, E, K, V, H, T, const N: usize, S> crate::qmdb::sync::Resolver
            for std::sync::Arc<
                commonware_utils::sync::AsyncRwLock<
                    Option<$db<F, E, K, V, H, T, N, S>>,
                >,
            >
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = qmdb::Error<F>;

            /// Serves operations and proofs from an optional db slot under a read lock.
            ///
            /// Returns `qmdb::Error::KeyNotFound` when the slot currently holds no db.
            async fn get_operations(
                &self,
                op_count: Location<F>,
                start_loc: Location<F>,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<crate::qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let db = guard.as_ref().ok_or(qmdb::Error::<F>::KeyNotFound)?;
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |op_count, start_loc, max_ops| {
                        db.any.historical_proof(op_count, start_loc, max_ops)
                    },
                    |start_loc| db.any.pinned_nodes_at(start_loc),
                )
                .await
            }
        }
    };
}
531
532// Unordered Fixed
533impl_current_resolver!(CurrentUnorderedFixedDb, UnorderedFixedOp, FixedValue, Array);
534
535// Unordered Variable
536impl_current_resolver!(
537    CurrentUnorderedVariableDb, UnorderedVariableOp, VariableValue, Key;
538    UnorderedVariableOp<F, K, V>: CodecShared,
539);
540
541// Ordered Fixed
542impl_current_resolver!(CurrentOrderedFixedDb, OrderedFixedOp, FixedValue, Array);
543
544// Ordered Variable
545impl_current_resolver!(
546    CurrentOrderedVariableDb, OrderedVariableOp, VariableValue, Key;
547    OrderedVariableOp<F, K, V>: CodecShared,
548);