Skip to main content

commonware_storage/qmdb/immutable/sync/
mod.rs

1use crate::{
2    index::unordered::Index,
3    journal::{
4        authenticated,
5        contiguous::{Mutable, Reader as _},
6        Error as JournalError,
7    },
8    merkle::{
9        full::{self, Merkle},
10        Family, Location,
11    },
12    qmdb::{
13        self,
14        any::ValueEncoding,
15        build_snapshot_from_log,
16        immutable::{self, CompactDb, Metrics, Operation},
17        operation::Key,
18        sync::{self},
19        Error,
20    },
21    translator::Translator,
22    Context, Persistable,
23};
24use commonware_codec::{Encode, EncodeShared, Read};
25use commonware_cryptography::Hasher;
26use commonware_parallel::Strategy;
27use commonware_utils::range::NonEmptyRange;
28
29impl<F, E, K, V, C, H, T, S> sync::Database for immutable::Immutable<F, E, K, V, C, H, T, S>
30where
31    F: Family,
32    E: Context,
33    K: Key,
34    V: ValueEncoding,
35    C: Mutable<Item = Operation<F, K, V>>
36        + Persistable<Error = JournalError>
37        + sync::Journal<F, Context = E, Op = Operation<F, K, V>>,
38    C::Item: EncodeShared,
39    C::Config: Clone + Send,
40    H: Hasher,
41    T: Translator,
42    S: Strategy,
43{
44    type Family = F;
45    type Op = Operation<F, K, V>;
46    type Journal = C;
47    type Hasher = H;
48    type Config = immutable::Config<T, C::Config, S>;
49    type Digest = H::Digest;
50    type Context = E;
51
52    /// Returns an [Immutable](immutable::Immutable) initialized from data collected in the sync process.
53    ///
54    /// # Behavior
55    ///
56    /// This method handles different initialization scenarios based on existing data:
57    /// - If the Merkle journal is empty or the last item is before the range start, it creates a
58    ///   fresh Merkle structure from the provided `pinned_nodes`
59    /// - If the Merkle journal has data but is incomplete (has length < range end), missing
60    ///   operations from the log are applied to bring it up to the target state
61    /// - If the Merkle journal has data beyond the range end, it is rewound to match the sync
62    ///   target
63    ///
64    /// # Returns
65    ///
66    /// A [super::Immutable] db populated with the state from the given range.
67    /// The pruning boundary is set to the range start.
68    async fn from_sync_result(
69        context: Self::Context,
70        db_config: Self::Config,
71        log: Self::Journal,
72        pinned_nodes: Option<Vec<Self::Digest>>,
73        range: NonEmptyRange<Location<F>>,
74        apply_batch_size: usize,
75    ) -> Result<Self, Error<F>> {
76        let hasher = qmdb::hasher::<H>();
77
78        // Initialize Merkle structure for sync
79        let merkle = Merkle::<F, _, _, S>::init_sync(
80            context.child("merkle"),
81            full::SyncConfig {
82                config: db_config.merkle_config.clone(),
83                range: range.clone(),
84                pinned_nodes,
85            },
86        )
87        .await?;
88
89        let journal = authenticated::Journal::<_, _, _, _, S>::from_components(
90            merkle,
91            log,
92            hasher,
93            apply_batch_size as u64,
94        )
95        .await?;
96
97        let mut snapshot: Index<T, Location<F>> =
98            Index::new(context.child("snapshot"), db_config.translator.clone());
99
100        let (last_commit_loc, inactivity_floor_loc) = {
101            let reader = journal.journal.reader().await;
102            let bounds = reader.bounds();
103            let last_commit_loc = Location::<F>::new(
104                bounds
105                    .end
106                    .checked_sub(1)
107                    .ok_or(Error::HistoricalFloorPruned(Location::new(bounds.end)))?,
108            );
109            let inactivity_floor_loc = crate::qmdb::find_inactivity_floor_at::<F, _>(
110                &reader,
111                Location::new(bounds.end),
112                |op| op.has_floor(),
113            )
114            .await?;
115
116            // Replay the log from the inactivity floor to build the snapshot.
117            build_snapshot_from_log::<F, _, _, _>(
118                inactivity_floor_loc,
119                &reader,
120                &mut snapshot,
121                |_, _| {},
122            )
123            .await?;
124
125            (last_commit_loc, inactivity_floor_loc)
126        };
127        let inactive_peaks = F::inactive_peaks(
128            F::location_to_position(Location::new(*last_commit_loc + 1)),
129            inactivity_floor_loc,
130        );
131        let root = journal.root(inactive_peaks)?;
132
133        let metrics = Metrics::new(context);
134        let db = Self {
135            journal,
136            root,
137            snapshot,
138            last_commit_loc,
139            inactivity_floor_loc,
140            metrics,
141        };
142        db.update_metrics().await;
143
144        db.sync().await?;
145        Ok(db)
146    }
147
148    fn root(&self) -> Self::Digest {
149        self.root()
150    }
151}
152
153impl<F, E, K, V, H, Cfg, S> sync::compact::Database for CompactDb<F, E, K, V, H, Cfg, S>
154where
155    F: Family,
156    E: Context,
157    K: Key,
158    V: ValueEncoding,
159    H: Hasher,
160    S: Strategy,
161    Operation<F, K, V>: EncodeShared,
162    Operation<F, K, V>: Read<Cfg = Cfg>,
163    Cfg: Clone + Send + Sync + 'static,
164{
165    type Family = F;
166    type Op = Operation<F, K, V>;
167    type Config = immutable::CompactConfig<Cfg, S>;
168    type Digest = H::Digest;
169    type Context = E;
170    type Hasher = H;
171
172    async fn from_validated_state(
173        context: Self::Context,
174        config: Self::Config,
175        state: sync::compact::ValidatedState<Self::Family, Self::Op, Self::Digest>,
176    ) -> Result<Self, Error<F>> {
177        let sync::compact::ValidatedState {
178            state,
179            root,
180            inactivity_floor: inactivity_floor_loc,
181        } = state;
182        let sync::compact::State {
183            leaf_count,
184            pinned_nodes,
185            last_commit_op,
186            last_commit_proof,
187        } = state;
188        let last_commit_loc = Location::new(*leaf_count - 1);
189        let Operation::Commit(last_commit_metadata, op_floor) = last_commit_op else {
190            return Err(Error::UnexpectedData(last_commit_loc));
191        };
192        assert_eq!(op_floor, inactivity_floor_loc, "inactivity floor mismatch");
193        let commit_codec_config = config.commit_codec_config.clone();
194        let last_commit_op_bytes =
195            Operation::<F, K, V>::Commit(last_commit_metadata.clone(), inactivity_floor_loc)
196                .encode()
197                .to_vec();
198        let merkle = crate::merkle::compact::Merkle::init_from_compact_state(
199            context.child("merkle"),
200            config.merkle,
201            leaf_count,
202            pinned_nodes.clone(),
203        )
204        .await?;
205        Self::init_from_verified_state(
206            merkle,
207            commit_codec_config,
208            last_commit_metadata,
209            inactivity_floor_loc,
210            root,
211            last_commit_op_bytes,
212            last_commit_proof,
213            pinned_nodes,
214        )
215    }
216
217    fn inactivity_floor(op: &Self::Op) -> Option<Location<Self::Family>> {
218        op.has_floor()
219    }
220
221    fn root(&self) -> Self::Digest {
222        self.root()
223    }
224
225    async fn persist_compact_state(&self) -> Result<(), Error<F>> {
226        self.persist_cached_witness().await
227    }
228}
229
230#[cfg(test)]
231mod tests;