Skip to main content

commonware_storage/qmdb/current/sync/
mod.rs

1//! Shared synchronization logic for [crate::qmdb::current] databases.
2//!
3//! Contains implementation of [crate::qmdb::sync::Database] for all [Db](crate::qmdb::current::db::Db)
4//! variants (ordered/unordered, fixed/variable).
5//!
6//! The canonical root of a `current` database combines the ops root, grafted MMR root, and
7//! optional partial chunk into a single hash (see the [Root structure](super) section in the
8//! module documentation). The sync engine operates on the **ops root**, not the canonical root:
9//! it downloads operations and verifies each batch against the ops root using standard MMR
10//! range proofs (identical to `any` sync). Validating that the ops root is part of the
11//! canonical root is the caller's responsibility; the sync engine does not perform this check.
12//!
13//! After all operations are synced, the bitmap and grafted MMR are reconstructed
14//! deterministically from the operations. The canonical root is then computed from the
15//! ops root, the reconstructed grafted MMR root, and any partial chunk.
16//!
17//! The [Database]`::`[root()](crate::qmdb::sync::Database::root)
18//! implementation returns the **ops root** (not the canonical root) because that is what the
19//! sync engine verifies against.
20//!
21//! For pruned databases (`range.start > 0`), grafted MMR pinned nodes for the pruned region
22//! are read directly from the ops MMR after it is built. This works because of the zero-chunk
23//! identity: for all-zero bitmap chunks (which all pruned chunks are), the grafted leaf equals
24//! the ops subtree root, making the grafted MMR structurally identical to the ops MMR at and
25//! above the grafting height.
26
27use crate::{
28    index::Factory as IndexFactory,
29    journal::{
30        authenticated,
31        contiguous::{fixed, variable, Mutable},
32    },
33    merkle::{
34        mmr::{self, Family, Location, StandardHasher},
35        Family as _,
36    },
37    qmdb::{
38        self,
39        any::{
40            db::Db as AnyDb,
41            operation::{update::Update, Operation},
42            ordered::{
43                fixed::{Operation as OrderedFixedOp, Update as OrderedFixedUpdate},
44                variable::{Operation as OrderedVariableOp, Update as OrderedVariableUpdate},
45            },
46            unordered::{
47                fixed::{Operation as UnorderedFixedOp, Update as UnorderedFixedUpdate},
48                variable::{Operation as UnorderedVariableOp, Update as UnorderedVariableUpdate},
49            },
50            FixedValue, VariableValue,
51        },
52        current::{
53            db, grafting,
54            ordered::{
55                fixed::Db as CurrentOrderedFixedDb, variable::Db as CurrentOrderedVariableDb,
56            },
57            unordered::{
58                fixed::Db as CurrentUnorderedFixedDb, variable::Db as CurrentUnorderedVariableDb,
59            },
60            FixedConfig, VariableConfig,
61        },
62        operation::{Committable, Key},
63        sync::{Database, DatabaseConfig as Config},
64    },
65    translator::Translator,
66    Context, Persistable,
67};
68use commonware_codec::{Codec, CodecShared, Read as CodecRead};
69use commonware_cryptography::{DigestOf, Hasher};
70use commonware_utils::{bitmap::Prunable as BitMap, channel::oneshot, sync::AsyncMutex, Array};
71use std::{ops::Range, sync::Arc};
72
73#[cfg(test)]
74pub(crate) mod tests;
75
76impl<T: Translator, J: Clone> Config for super::Config<T, J> {
77    type JournalConfig = J;
78
79    fn journal_config(&self) -> Self::JournalConfig {
80        self.journal_config.clone()
81    }
82}
83
84/// Shared helper to build a `current::db::Db` from sync components.
85///
86/// This follows the same pattern as `any/sync/mod.rs::build_db` but additionally:
87/// * Builds the activity bitmap by replaying the operations log.
88/// * Extracts grafted pinned nodes from the ops MMR (zero-chunk identity).
89/// * Builds the grafted MMR from the bitmap and ops MMR.
90/// * Computes and caches the canonical root.
91#[allow(clippy::too_many_arguments)]
92async fn build_db<E, U, I, H, J, T, const N: usize>(
93    context: E,
94    mmr_config: mmr::journaled::Config,
95    log: J,
96    translator: T,
97    pinned_nodes: Option<Vec<H::Digest>>,
98    range: Range<Location>,
99    apply_batch_size: usize,
100    metadata_partition: String,
101    thread_pool: Option<commonware_parallel::ThreadPool>,
102) -> Result<db::Db<Family, E, J, I, H, U, N>, qmdb::Error<Family>>
103where
104    E: Context,
105    U: Update + Send + Sync + 'static,
106    I: IndexFactory<T, Value = Location>,
107    H: Hasher,
108    T: Translator,
109    J: Mutable<Item = Operation<Family, U>> + Persistable<Error = crate::journal::Error>,
110    Operation<Family, U>: Codec + Committable + CodecShared,
111{
112    // Build authenticated log.
113    let hasher = StandardHasher::<H>::new();
114    let mmr = mmr::journaled::Mmr::init_sync(
115        context.with_label("mmr"),
116        mmr::journaled::SyncConfig {
117            config: mmr_config,
118            range: range.clone(),
119            pinned_nodes,
120        },
121        &hasher,
122    )
123    .await?;
124    let index = I::new(context.with_label("index"), translator);
125    let log = authenticated::Journal::<Family, _, _, _>::from_components(
126        mmr,
127        log,
128        hasher,
129        apply_batch_size as u64,
130    )
131    .await?;
132
133    // Initialize bitmap with pruned chunks.
134    //
135    // Floor division is intentional: chunks entirely below range.start are pruned.
136    // If range.start is not chunk-aligned, the partial leading chunk is reconstructed by
137    // init_from_log, which pads the gap between `pruned_chunks * CHUNK_SIZE_BITS` and the
138    // journal's inactivity floor with inactive (false) bits.
139    let pruned_chunks = (*range.start / BitMap::<N>::CHUNK_SIZE_BITS) as usize;
140    let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
141        .map_err(|_| qmdb::Error::<Family>::DataCorrupted("pruned chunks overflow"))?;
142
143    // Build any::Db with bitmap callback.
144    //
145    // init_from_log replays the operations, building the snapshot (index) and invoking
146    // our callback for each operation to populate the bitmap.
147    let known_inactivity_floor = Location::new(status.len());
148    let any: AnyDb<Family, E, J, I, H, U> = AnyDb::init_from_log(
149        index,
150        log,
151        Some(known_inactivity_floor),
152        |is_active: bool, old_loc: Option<Location>| {
153            status.push(is_active);
154            if let Some(loc) = old_loc {
155                status.set_bit(*loc, false);
156            }
157        },
158    )
159    .await?;
160
161    // Extract grafted pinned nodes from the ops MMR.
162    //
163    // With the zero-chunk identity, all-zero bitmap chunks (which all pruned chunks are)
164    // produce grafted leaves equal to the corresponding ops subtree root. The grafted
165    // MMR's pinned nodes for the pruned region are therefore the first
166    // `popcount(pruned_chunks)` ops pinned nodes (in decreasing height order).
167    //
168    // `nodes_to_pin(range.start)` returns all ops peaks, but only the first
169    // `popcount(pruned_chunks)` are at or above the grafting height. The remaining
170    // smaller peaks cover the partial trailing chunk and are not grafted pinned nodes.
171    let grafted_pinned_nodes = {
172        let ops_pin_positions = mmr::Family::nodes_to_pin(range.start);
173        let num_grafted_pins = (pruned_chunks as u64).count_ones() as usize;
174        let mut pins = Vec::with_capacity(num_grafted_pins);
175        for pos in ops_pin_positions.take(num_grafted_pins) {
176            let digest = any.log.merkle.get_node(pos).await?.ok_or(
177                qmdb::Error::<mmr::Family>::DataCorrupted("missing ops pinned node"),
178            )?;
179            pins.push(digest);
180        }
181        pins
182    };
183
184    // Build grafted MMR.
185    let hasher = StandardHasher::<H>::new();
186    let grafted_tree = db::build_grafted_tree::<Family, H, N>(
187        &hasher,
188        &status,
189        &grafted_pinned_nodes,
190        &any.log.merkle,
191        thread_pool.as_ref(),
192    )
193    .await?;
194
195    // Compute the canonical root. The grafted root is deterministic from the ops
196    // (which are authenticated by the engine) and the bitmap (which is deterministic
197    // from the ops).
198    let storage = grafting::Storage::new(&grafted_tree, grafting::height::<N>(), &any.log.merkle);
199    let partial = db::partial_chunk(&status);
200    let grafted_root = db::compute_grafted_root(&hasher, &status, &storage).await?;
201    let ops_root = any.log.root();
202    let partial_digest = partial.map(|(chunk, next_bit)| {
203        let digest = hasher.digest(&chunk);
204        (next_bit, digest)
205    });
206    let root = db::combine_roots(
207        &hasher,
208        &ops_root,
209        &grafted_root,
210        partial_digest.as_ref().map(|(nb, d)| (*nb, d)),
211    );
212
213    // Initialize metadata store and construct the Db.
214    let (metadata, _, _) = db::init_metadata::<Family, E, DigestOf<H>>(
215        context.with_label("metadata"),
216        &metadata_partition,
217    )
218    .await?;
219
220    let current_db = db::Db {
221        any,
222        status: crate::qmdb::current::batch::BitmapBatch::Base(Arc::new(status)),
223        grafted_tree,
224        metadata: AsyncMutex::new(metadata),
225        thread_pool,
226        root,
227    };
228
229    // Persist metadata so the db can be reopened with init_fixed/init_variable.
230    current_db.sync_metadata().await?;
231
232    Ok(current_db)
233}
234
// --- Database trait implementations ---

/// Implements the sync `Database` trait for one `current` database variant so the sync
/// engine can construct it from synced components.
///
/// Macro arguments, in order:
/// * `$db` - the `current` database type (generic over `Family, E, K, V, H, T, N`).
/// * `$op` - the operation type stored in the log.
/// * `$update` - the update type used to instantiate `build_db`.
/// * `$journal` - the journal type handed over by the sync engine.
/// * `$config` - the database configuration type.
/// * `$key_bound` / `$value_bound` - trait bounds placed on `K` and `V`.
/// * an optional `; ...` tail of extra where-clause predicates.
macro_rules! impl_current_sync_database {
    ($db:ident, $op:ident, $update:ident,
     $journal:ty, $config:ty,
     $key_bound:path, $value_bound:ident
     $(; $($where_extra:tt)+)?) => {
        impl<E, K, V, H, T, const N: usize> Database for $db<Family, E, K, V, H, T, N>
        where
            E: Context,
            K: $key_bound,
            V: $value_bound + 'static,
            H: Hasher,
            T: Translator,
            $($($where_extra)+)?
        {
            type Context = E;
            type Op = $op<Family, K, V>;
            type Journal = $journal;
            type Hasher = H;
            type Config = $config;
            type Digest = H::Digest;

            /// Builds the database from the synced log by delegating to `build_db`, which
            /// reconstructs the bitmap, grafted MMR, and canonical root.
            async fn from_sync_result(
                context: Self::Context,
                config: Self::Config,
                log: Self::Journal,
                pinned_nodes: Option<Vec<Self::Digest>>,
                range: Range<Location>,
                apply_batch_size: usize,
            ) -> Result<Self, qmdb::Error<Family>> {
                // Only owned copies of the needed config fields are handed to the builder.
                build_db::<_, $update<K, V>, _, H, _, T, N>(
                    context,
                    config.merkle_config.clone(),
                    log,
                    config.translator.clone(),
                    pinned_nodes,
                    range,
                    apply_batch_size,
                    config.grafted_metadata_partition.clone(),
                    config.merkle_config.thread_pool.clone(),
                )
                .await
            }

            /// Returns the ops root (not the canonical root), since the sync engine verifies
            /// batches against the ops MMR.
            fn root(&self) -> Self::Digest {
                self.any.log.root()
            }
        }
    };
}
292
293impl_current_sync_database!(
294    CurrentUnorderedFixedDb, UnorderedFixedOp, UnorderedFixedUpdate,
295    fixed::Journal<E, Self::Op>, FixedConfig<T>,
296    Array, FixedValue
297);
298
299impl_current_sync_database!(
300    CurrentUnorderedVariableDb, UnorderedVariableOp, UnorderedVariableUpdate,
301    variable::Journal<E, Self::Op>,
302    VariableConfig<T, <UnorderedVariableOp<Family, K, V> as CodecRead>::Cfg>,
303    Key, VariableValue;
304    UnorderedVariableOp<Family, K, V>: CodecShared
305);
306
307impl_current_sync_database!(
308    CurrentOrderedFixedDb, OrderedFixedOp, OrderedFixedUpdate,
309    fixed::Journal<E, Self::Op>, FixedConfig<T>,
310    Array, FixedValue
311);
312
313impl_current_sync_database!(
314    CurrentOrderedVariableDb, OrderedVariableOp, OrderedVariableUpdate,
315    variable::Journal<E, Self::Op>,
316    VariableConfig<T, <OrderedVariableOp<Family, K, V> as CodecRead>::Cfg>,
317    Key, VariableValue;
318    OrderedVariableOp<Family, K, V>: CodecShared
319);
320
// --- Resolver implementations ---
//
// The resolver for `current` databases serves ops-level proofs (not grafted proofs) from
// the inner `any` db. The sync engine verifies each batch against the ops root.

/// Implements `crate::qmdb::sync::Resolver` for three shared handles to a `current`
/// database variant: `Arc<Db>`, `Arc<AsyncRwLock<Db>>`, and
/// `Arc<AsyncRwLock<Option<Db>>>`.
///
/// Each implementation calls `historical_proof(op_count, start_loc, max_ops)` on the inner
/// `any` database and, when `include_pinned_nodes` is set, `pinned_nodes_at(start_loc)`.
/// The cancellation receiver is ignored. The returned `success_tx` is the sender half of a
/// fresh channel whose receiver is dropped. The `Option` handle reports
/// `qmdb::Error::KeyNotFound` when it holds `None`.
///
/// Macro arguments, in order: the database type (`$db`), its operation type (`$op`), the
/// bound on values (`$val_bound`), the bound on keys (`$key_bound`), and an optional `; ...`
/// tail of extra where-clause predicates.
macro_rules! impl_current_resolver {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
        // Directly shared database: no lock is taken.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<$db<Family, E, K, V, H, T, N>>
        where
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<Family, K, V>;
            type Error = qmdb::Error<Family>;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let inner = &self.any;
                let (proof, operations) =
                    inner.historical_proof(op_count, start_loc, max_ops).await?;
                let pinned_nodes = match include_pinned_nodes {
                    true => Some(inner.pinned_nodes_at(start_loc).await?),
                    false => None,
                };
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx: oneshot::channel().0,
                    pinned_nodes,
                })
            }
        }

        // Database behind an async read-write lock: a read guard is held while serving.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<
                commonware_utils::sync::AsyncRwLock<
                    $db<Family, E, K, V, H, T, N>,
                >,
            >
        where
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<Family, K, V>;
            type Error = qmdb::Error<Family>;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let inner = &guard.any;
                let (proof, operations) =
                    inner.historical_proof(op_count, start_loc, max_ops).await?;
                let pinned_nodes = match include_pinned_nodes {
                    true => Some(inner.pinned_nodes_at(start_loc).await?),
                    false => None,
                };
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx: oneshot::channel().0,
                    pinned_nodes,
                })
            }
        }

        // Optional database behind an async read-write lock: `None` is reported as
        // `KeyNotFound`.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<
                commonware_utils::sync::AsyncRwLock<
                    Option<$db<Family, E, K, V, H, T, N>>,
                >,
            >
        where
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<Family, K, V>;
            type Error = qmdb::Error<Family>;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let Some(db) = guard.as_ref() else {
                    return Err(qmdb::Error::<Family>::KeyNotFound);
                };
                let inner = &db.any;
                let (proof, operations) =
                    inner.historical_proof(op_count, start_loc, max_ops).await?;
                let pinned_nodes = match include_pinned_nodes {
                    true => Some(inner.pinned_nodes_at(start_loc).await?),
                    false => None,
                };
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx: oneshot::channel().0,
                    pinned_nodes,
                })
            }
        }
    };
}
460
461// Unordered Fixed
462impl_current_resolver!(CurrentUnorderedFixedDb, UnorderedFixedOp, FixedValue, Array);
463
464// Unordered Variable
465impl_current_resolver!(
466    CurrentUnorderedVariableDb, UnorderedVariableOp, VariableValue, Key;
467    UnorderedVariableOp<Family, K, V>: CodecShared,
468);
469
470// Ordered Fixed
471impl_current_resolver!(CurrentOrderedFixedDb, OrderedFixedOp, FixedValue, Array);
472
473// Ordered Variable
474impl_current_resolver!(
475    CurrentOrderedVariableDb, OrderedVariableOp, VariableValue, Key;
476    OrderedVariableOp<Family, K, V>: CodecShared,
477);