Skip to main content

commonware_storage/qmdb/current/sync/
mod.rs

1//! Shared synchronization logic for [crate::qmdb::current] databases.
2//!
3//! Contains implementation of [crate::qmdb::sync::Database] for all [Db](crate::qmdb::current::db::Db)
4//! variants (ordered/unordered, fixed/variable).
5//!
6//! The canonical root of a `current` database combines the ops root, grafted MMR root, and
7//! optional partial chunk into a single hash (see the [Root structure](super) section in the
8//! module documentation). The sync engine operates on the **ops root**, not the canonical root:
9//! it downloads operations and verifies each batch against the ops root using standard MMR
10//! range proofs (identical to `any` sync). Validating that the ops root is part of the
11//! canonical root is the caller's responsibility; the sync engine does not perform this check.
12//!
13//! After all operations are synced, the bitmap and grafted MMR are reconstructed
14//! deterministically from the operations. The canonical root is then computed from the
15//! ops root, the reconstructed grafted MMR root, and any partial chunk.
16//!
17//! The [Database]`::`[root()](crate::qmdb::sync::Database::root)
18//! implementation returns the **ops root** (not the canonical root) because that is what the
19//! sync engine verifies against.
20//!
21//! For pruned databases (`range.start > 0`), grafted MMR pinned nodes for the pruned region
22//! are read directly from the ops MMR after it is built. This works because of the zero-chunk
23//! identity: for all-zero bitmap chunks (which all pruned chunks are), the grafted leaf equals
24//! the ops subtree root, making the grafted MMR structurally identical to the ops MMR at and
25//! above the grafting height.
26
27use crate::{
28    index::{ordered, unordered},
29    journal::{
30        authenticated,
31        contiguous::{fixed, variable, Mutable},
32    },
33    mmr::{
34        self, hasher::Hasher as _, journaled::Config as MmrConfig, Location, Position,
35        StandardHasher,
36    },
37    qmdb::{
38        self,
39        any::{
40            db::Db as AnyDb,
41            operation::{update::Update, Operation},
42            ordered::{
43                fixed::{Operation as OrderedFixedOp, Update as OrderedFixedUpdate},
44                variable::{Operation as OrderedVariableOp, Update as OrderedVariableUpdate},
45            },
46            unordered::{
47                fixed::{Operation as UnorderedFixedOp, Update as UnorderedFixedUpdate},
48                variable::{Operation as UnorderedVariableOp, Update as UnorderedVariableUpdate},
49            },
50            FixedConfig as AnyFixedConfig, FixedValue, ValueEncoding,
51            VariableConfig as AnyVariableConfig, VariableValue,
52        },
53        current::{
54            db, grafting,
55            ordered::{
56                fixed::Db as CurrentOrderedFixedDb, variable::Db as CurrentOrderedVariableDb,
57            },
58            unordered::{
59                fixed::Db as CurrentUnorderedFixedDb, variable::Db as CurrentUnorderedVariableDb,
60            },
61            FixedConfig, VariableConfig,
62        },
63        operation::{Committable, Key},
64        sync::{Database, DatabaseConfig as Config},
65    },
66    translator::Translator,
67    Persistable,
68};
69use commonware_codec::{Codec, CodecShared, Read as CodecRead};
70use commonware_cryptography::{DigestOf, Hasher};
71use commonware_runtime::{Clock, Metrics, Storage};
72use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex, Array};
73use std::ops::Range;
74
75#[cfg(test)]
76pub(crate) mod tests;
77
78impl<T: Translator> Config for FixedConfig<T> {
79    type JournalConfig = fixed::Config;
80
81    fn journal_config(&self) -> Self::JournalConfig {
82        let any_config: AnyFixedConfig<T> = self.clone().into();
83        <AnyFixedConfig<T> as Config>::journal_config(&any_config)
84    }
85}
86
87impl<T: Translator, C: Clone> Config for VariableConfig<T, C> {
88    type JournalConfig = variable::Config<C>;
89
90    fn journal_config(&self) -> Self::JournalConfig {
91        let any_config: AnyVariableConfig<T, C> = self.clone().into();
92        <AnyVariableConfig<T, C> as Config>::journal_config(&any_config)
93    }
94}
95
96/// Extract MMR config from [FixedConfig].
97fn mmr_config_from_fixed<T: Translator>(config: &FixedConfig<T>) -> MmrConfig {
98    MmrConfig {
99        journal_partition: config.mmr_journal_partition.clone(),
100        metadata_partition: config.mmr_metadata_partition.clone(),
101        items_per_blob: config.mmr_items_per_blob,
102        write_buffer: config.mmr_write_buffer,
103        thread_pool: config.thread_pool.clone(),
104        page_cache: config.page_cache.clone(),
105    }
106}
107
108/// Extract MMR config from [VariableConfig].
109fn mmr_config_from_variable<T: Translator, C>(config: &VariableConfig<T, C>) -> MmrConfig {
110    MmrConfig {
111        journal_partition: config.mmr_journal_partition.clone(),
112        metadata_partition: config.mmr_metadata_partition.clone(),
113        items_per_blob: config.mmr_items_per_blob,
114        write_buffer: config.mmr_write_buffer,
115        thread_pool: config.thread_pool.clone(),
116        page_cache: config.page_cache.clone(),
117    }
118}
119
120/// Shared helper to build a `current::db::Db` from sync components.
121///
122/// This follows the same pattern as `any/sync/mod.rs::build_db` but additionally:
123/// * Builds the activity bitmap by replaying the operations log.
124/// * Extracts grafted pinned nodes from the ops MMR (zero-chunk identity).
125/// * Builds the grafted MMR from the bitmap and ops MMR.
126/// * Computes and caches the canonical root.
127#[allow(clippy::too_many_arguments)]
128async fn build_db<E, K, V, U, I, H, J, const N: usize>(
129    context: E,
130    mmr_config: MmrConfig,
131    log: J,
132    index: I,
133    pinned_nodes: Option<Vec<H::Digest>>,
134    range: Range<Location>,
135    apply_batch_size: usize,
136    metadata_partition: String,
137    thread_pool: Option<commonware_parallel::ThreadPool>,
138) -> Result<db::Db<E, J, I, H, U, N>, qmdb::Error>
139where
140    E: Storage + Clock + Metrics,
141    K: Key,
142    V: ValueEncoding,
143    U: Update<K, V> + Send + Sync + 'static,
144    I: crate::index::Unordered<Value = Location>,
145    H: Hasher,
146    J: Mutable<Item = Operation<K, V, U>> + Persistable<Error = crate::journal::Error>,
147    Operation<K, V, U>: Codec + Committable + CodecShared,
148{
149    // Build authenticated log.
150    let mut hasher = StandardHasher::<H>::new();
151    let mmr = mmr::journaled::Mmr::init_sync(
152        context.with_label("mmr"),
153        mmr::journaled::SyncConfig {
154            config: mmr_config,
155            range: range.clone(),
156            pinned_nodes,
157        },
158        &mut hasher,
159    )
160    .await?;
161    let log =
162        authenticated::Journal::from_components(mmr, log, hasher, apply_batch_size as u64).await?;
163
164    // Initialize bitmap with pruned chunks.
165    //
166    // Floor division is intentional: chunks entirely below range.start are pruned.
167    // If range.start is not chunk-aligned, the partial leading chunk is reconstructed by
168    // init_from_log, which pads the gap between `pruned_chunks * CHUNK_SIZE_BITS` and the
169    // journal's inactivity floor with inactive (false) bits.
170    let pruned_chunks = (*range.start / BitMap::<N>::CHUNK_SIZE_BITS) as usize;
171    let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
172        .map_err(|_| qmdb::Error::DataCorrupted("pruned chunks overflow"))?;
173
174    // Build any::Db with bitmap callback.
175    //
176    // init_from_log replays the operations, building the snapshot (index) and invoking
177    // our callback for each operation to populate the bitmap.
178    let known_inactivity_floor = Location::new(status.len());
179    let any: AnyDb<E, J, I, H, U> = AnyDb::init_from_log(
180        index,
181        log,
182        Some(known_inactivity_floor),
183        |is_active: bool, old_loc: Option<Location>| {
184            status.push(is_active);
185            if let Some(loc) = old_loc {
186                status.set_bit(*loc, false);
187            }
188        },
189    )
190    .await?;
191
192    // Extract grafted pinned nodes from the ops MMR.
193    //
194    // With the zero-chunk identity, all-zero bitmap chunks (which all pruned chunks are)
195    // produce grafted leaves equal to the corresponding ops subtree root. The grafted
196    // MMR's pinned nodes for the pruned region are therefore the first
197    // `popcount(pruned_chunks)` ops pinned nodes (in decreasing height order).
198    //
199    // `nodes_to_pin(range.start)` returns all ops peaks, but only the first
200    // `popcount(pruned_chunks)` are at or above the grafting height. The remaining
201    // smaller peaks cover the partial trailing chunk and are not grafted pinned nodes.
202    let grafted_pinned_nodes = {
203        let ops_pin_positions = mmr::iterator::nodes_to_pin(Position::try_from(range.start)?);
204        let num_grafted_pins = (pruned_chunks as u64).count_ones() as usize;
205        let mut pins = Vec::with_capacity(num_grafted_pins);
206        for pos in ops_pin_positions.take(num_grafted_pins) {
207            let digest = any
208                .log
209                .mmr
210                .get_node(pos)
211                .await?
212                .ok_or(qmdb::Error::DataCorrupted("missing ops pinned node"))?;
213            pins.push(digest);
214        }
215        pins
216    };
217
218    // Build grafted MMR.
219    let mut hasher = StandardHasher::<H>::new();
220    let grafted_mmr = db::build_grafted_mmr::<H, N>(
221        &mut hasher,
222        &status,
223        &grafted_pinned_nodes,
224        &any.log.mmr,
225        thread_pool.as_ref(),
226    )
227    .await?;
228
229    // Compute the canonical root. The grafted root is deterministic from the ops
230    // (which are authenticated by the engine) and the bitmap (which is deterministic
231    // from the ops).
232    let storage = grafting::Storage::new(&grafted_mmr, grafting::height::<N>(), &any.log.mmr);
233    let partial = db::partial_chunk(&status);
234    let grafted_mmr_root = db::compute_grafted_mmr_root(&mut hasher, &storage).await?;
235    let ops_root = any.log.root();
236    let partial_digest = partial.map(|(chunk, next_bit)| {
237        let digest = hasher.digest(&chunk);
238        (next_bit, digest)
239    });
240    let root = db::combine_roots(
241        &mut hasher,
242        &ops_root,
243        &grafted_mmr_root,
244        partial_digest.as_ref().map(|(nb, d)| (*nb, d)),
245    );
246
247    // Initialize metadata store and construct the Db.
248    let (metadata, _, _) =
249        db::init_metadata::<E, DigestOf<H>>(context.with_label("metadata"), &metadata_partition)
250            .await?;
251
252    let current_db = db::Db {
253        any,
254        status,
255        grafted_mmr,
256        metadata: AsyncMutex::new(metadata),
257        thread_pool,
258        root,
259    };
260
261    // Persist metadata so the db can be reopened with init_fixed/init_variable.
262    current_db.sync_metadata().await?;
263
264    Ok(current_db)
265}
266
267// --- Database trait implementations ---
268
269impl<E, K, V, H, T, const N: usize> Database for CurrentUnorderedFixedDb<E, K, V, H, T, N>
270where
271    E: Storage + Clock + Metrics,
272    K: Array,
273    V: FixedValue + 'static,
274    H: Hasher,
275    T: Translator,
276{
277    type Context = E;
278    type Op = UnorderedFixedOp<K, V>;
279    type Journal = fixed::Journal<E, Self::Op>;
280    type Hasher = H;
281    type Config = FixedConfig<T>;
282    type Digest = H::Digest;
283
284    async fn from_sync_result(
285        context: Self::Context,
286        config: Self::Config,
287        log: Self::Journal,
288        pinned_nodes: Option<Vec<Self::Digest>>,
289        range: Range<Location>,
290        apply_batch_size: usize,
291    ) -> Result<Self, qmdb::Error> {
292        let mmr_config = mmr_config_from_fixed(&config);
293        let metadata_partition = config.grafted_mmr_metadata_partition.clone();
294        let thread_pool = config.thread_pool.clone();
295        let index = unordered::Index::new(context.with_label("index"), config.translator.clone());
296        build_db::<_, K, _, UnorderedFixedUpdate<K, V>, _, H, _, N>(
297            context,
298            mmr_config,
299            log,
300            index,
301            pinned_nodes,
302            range,
303            apply_batch_size,
304            metadata_partition,
305            thread_pool,
306        )
307        .await
308    }
309
310    /// Returns the ops root (not the canonical root), since the sync engine verifies
311    /// batches against the ops MMR.
312    fn root(&self) -> Self::Digest {
313        self.any.log.root()
314    }
315}
316
317impl<E, K, V, H, T, const N: usize> Database for CurrentUnorderedVariableDb<E, K, V, H, T, N>
318where
319    E: Storage + Clock + Metrics,
320    K: Key,
321    V: VariableValue + 'static,
322    H: Hasher,
323    T: Translator,
324    UnorderedVariableOp<K, V>: CodecShared,
325{
326    type Context = E;
327    type Op = UnorderedVariableOp<K, V>;
328    type Journal = variable::Journal<E, Self::Op>;
329    type Hasher = H;
330    type Config = VariableConfig<T, <UnorderedVariableOp<K, V> as CodecRead>::Cfg>;
331    type Digest = H::Digest;
332
333    async fn from_sync_result(
334        context: Self::Context,
335        config: Self::Config,
336        log: Self::Journal,
337        pinned_nodes: Option<Vec<Self::Digest>>,
338        range: Range<Location>,
339        apply_batch_size: usize,
340    ) -> Result<Self, qmdb::Error> {
341        let mmr_config = mmr_config_from_variable(&config);
342        let metadata_partition = config.grafted_mmr_metadata_partition.clone();
343        let thread_pool = config.thread_pool.clone();
344        let index = unordered::Index::new(context.with_label("index"), config.translator.clone());
345        build_db::<_, K, _, UnorderedVariableUpdate<K, V>, _, H, _, N>(
346            context,
347            mmr_config,
348            log,
349            index,
350            pinned_nodes,
351            range,
352            apply_batch_size,
353            metadata_partition,
354            thread_pool,
355        )
356        .await
357    }
358
359    /// Returns the ops root (not the canonical root), since the sync engine verifies
360    /// batches against the ops MMR.
361    fn root(&self) -> Self::Digest {
362        self.any.log.root()
363    }
364}
365
366impl<E, K, V, H, T, const N: usize> Database for CurrentOrderedFixedDb<E, K, V, H, T, N>
367where
368    E: Storage + Clock + Metrics,
369    K: Array,
370    V: FixedValue + 'static,
371    H: Hasher,
372    T: Translator,
373{
374    type Context = E;
375    type Op = OrderedFixedOp<K, V>;
376    type Journal = fixed::Journal<E, Self::Op>;
377    type Hasher = H;
378    type Config = FixedConfig<T>;
379    type Digest = H::Digest;
380
381    async fn from_sync_result(
382        context: Self::Context,
383        config: Self::Config,
384        log: Self::Journal,
385        pinned_nodes: Option<Vec<Self::Digest>>,
386        range: Range<Location>,
387        apply_batch_size: usize,
388    ) -> Result<Self, qmdb::Error> {
389        let mmr_config = mmr_config_from_fixed(&config);
390        let metadata_partition = config.grafted_mmr_metadata_partition.clone();
391        let thread_pool = config.thread_pool.clone();
392        let index = ordered::Index::new(context.with_label("index"), config.translator.clone());
393        build_db::<_, K, _, OrderedFixedUpdate<K, V>, _, H, _, N>(
394            context,
395            mmr_config,
396            log,
397            index,
398            pinned_nodes,
399            range,
400            apply_batch_size,
401            metadata_partition,
402            thread_pool,
403        )
404        .await
405    }
406
407    /// Returns the ops root (not the canonical root), since the sync engine verifies
408    /// batches against the ops MMR.
409    fn root(&self) -> Self::Digest {
410        self.any.log.root()
411    }
412}
413
414impl<E, K, V, H, T, const N: usize> Database for CurrentOrderedVariableDb<E, K, V, H, T, N>
415where
416    E: Storage + Clock + Metrics,
417    K: Key,
418    V: VariableValue + 'static,
419    H: Hasher,
420    T: Translator,
421    OrderedVariableOp<K, V>: CodecShared,
422{
423    type Context = E;
424    type Op = OrderedVariableOp<K, V>;
425    type Journal = variable::Journal<E, Self::Op>;
426    type Hasher = H;
427    type Config = VariableConfig<T, <OrderedVariableOp<K, V> as CodecRead>::Cfg>;
428    type Digest = H::Digest;
429
430    async fn from_sync_result(
431        context: Self::Context,
432        config: Self::Config,
433        log: Self::Journal,
434        pinned_nodes: Option<Vec<Self::Digest>>,
435        range: Range<Location>,
436        apply_batch_size: usize,
437    ) -> Result<Self, qmdb::Error> {
438        let mmr_config = mmr_config_from_variable(&config);
439        let metadata_partition = config.grafted_mmr_metadata_partition.clone();
440        let thread_pool = config.thread_pool.clone();
441        let index = ordered::Index::new(context.with_label("index"), config.translator.clone());
442        build_db::<_, K, _, OrderedVariableUpdate<K, V>, _, H, _, N>(
443            context,
444            mmr_config,
445            log,
446            index,
447            pinned_nodes,
448            range,
449            apply_batch_size,
450            metadata_partition,
451            thread_pool,
452        )
453        .await
454    }
455
456    /// Returns the ops root (not the canonical root), since the sync engine verifies
457    /// batches against the ops MMR.
458    fn root(&self) -> Self::Digest {
459        self.any.log.root()
460    }
461}
462
463// --- Resolver implementations ---
464//
465// The resolver for `current` databases serves ops-level proofs (not grafted proofs) from
466// the inner `any` db. The sync engine verifies each batch against the ops root.
467
// Generates `Resolver` impls for three handle shapes of a `current` database:
// `Arc<Db>`, `Arc<AsyncRwLock<Db>>`, and `Arc<AsyncRwLock<Option<Db>>>`. Each one answers
// `get_operations` with an ops-level historical proof from the inner `any` database.
//
// Arguments: the database type, its operation type, the value bound, the key bound, and an
// optional `;`-separated list of extra where-clause predicates.
macro_rules! impl_current_resolver {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
        // Shared, lock-free handle: the database is reached directly through the `Arc`.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<$db<E, K, V, H, T, N>>
        where
            E: Storage + Clock + Metrics,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let (proof, operations) =
                    self.any.historical_proof(op_count, start_loc, max_ops).await?;
                // The receiving half of this channel is dropped immediately.
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx: commonware_utils::channel::oneshot::channel().0,
                })
            }
        }

        // Lock-guarded handle: a read lock is held for the duration of the proof.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<
                commonware_utils::sync::AsyncRwLock<
                    $db<E, K, V, H, T, N>,
                >,
            >
        where
            E: Storage + Clock + Metrics,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let db = self.read().await;
                let (proof, operations) =
                    db.any.historical_proof(op_count, start_loc, max_ops).await?;
                // The receiving half of this channel is dropped immediately.
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx: commonware_utils::channel::oneshot::channel().0,
                })
            }
        }

        // Optional lock-guarded handle: an empty slot is reported as `KeyNotFound`.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<
                commonware_utils::sync::AsyncRwLock<
                    Option<$db<E, K, V, H, T, N>>,
                >,
            >
        where
            E: Storage + Clock + Metrics,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let Some(db) = guard.as_ref() else {
                    return Err(qmdb::Error::KeyNotFound);
                };
                let (proof, operations) =
                    db.any.historical_proof(op_count, start_loc, max_ops).await?;
                // The receiving half of this channel is dropped immediately.
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx: commonware_utils::channel::oneshot::channel().0,
                })
            }
        }
    };
}
578
579// Unordered Fixed
580impl_current_resolver!(CurrentUnorderedFixedDb, UnorderedFixedOp, FixedValue, Array);
581
582// Unordered Variable
583impl_current_resolver!(
584    CurrentUnorderedVariableDb, UnorderedVariableOp, VariableValue, Key;
585    UnorderedVariableOp<K, V>: CodecShared,
586);
587
588// Ordered Fixed
589impl_current_resolver!(CurrentOrderedFixedDb, OrderedFixedOp, FixedValue, Array);
590
591// Ordered Variable
592impl_current_resolver!(
593    CurrentOrderedVariableDb, OrderedVariableOp, VariableValue, Key;
594    OrderedVariableOp<K, V>: CodecShared,
595);