Skip to main content

commonware_storage/qmdb/keyless/sync/
mod.rs

1use crate::{
2    journal::{
3        authenticated,
4        contiguous::{Contiguous as _, Mutable, Reader as _},
5        Error as JournalError,
6    },
7    merkle::{
8        full::{self, Merkle},
9        Family, Location,
10    },
11    qmdb::{
12        self,
13        any::value::ValueEncoding,
14        keyless::{operation::Codec, CompactDb, Keyless, Metrics, Operation},
15        sync,
16    },
17    Context, Persistable,
18};
19use commonware_codec::{Encode, EncodeShared, Read};
20use commonware_cryptography::Hasher;
21use commonware_parallel::Strategy;
22use commonware_utils::range::NonEmptyRange;
23
24impl<F, E, V, C, H, S> sync::Database for Keyless<F, E, V, C, H, S>
25where
26    F: Family,
27    E: Context,
28    V: ValueEncoding + Codec,
29    C: Mutable<Item = Operation<F, V>>
30        + Persistable<Error = JournalError>
31        + sync::Journal<F, Context = E, Op = Operation<F, V>>,
32    C::Config: Clone + Send,
33    H: Hasher,
34    S: Strategy,
35    Operation<F, V>: EncodeShared,
36{
37    type Family = F;
38    type Op = Operation<F, V>;
39    type Journal = C;
40    type Hasher = H;
41    type Config = super::Config<C::Config, S>;
42    type Digest = H::Digest;
43    type Context = E;
44
45    /// Returns a [Keyless] db initialized from data collected in the sync process.
46    ///
47    /// # Behavior
48    ///
49    /// This method handles different initialization scenarios based on existing data:
50    /// - If the Merkle journal is empty or the last item is before the range start, it creates
51    ///   a fresh Merkle structure from the provided `pinned_nodes`
52    /// - If the Merkle journal has data but is incomplete (has length < range end), missing
53    ///   operations from the log are applied to bring it up to the target state
54    /// - If the Merkle journal has data beyond the range end, it is rewound to match the sync
55    ///   target
56    ///
57    /// # Returns
58    ///
59    /// A [Keyless] db populated with the state from the given range.
60    async fn from_sync_result(
61        context: Self::Context,
62        config: Self::Config,
63        log: Self::Journal,
64        pinned_nodes: Option<Vec<Self::Digest>>,
65        range: NonEmptyRange<Location<F>>,
66        apply_batch_size: usize,
67    ) -> Result<Self, qmdb::Error<F>> {
68        let hasher = qmdb::hasher::<H>();
69
70        let merkle = Merkle::<F, _, _, S>::init_sync(
71            context.child("merkle"),
72            full::SyncConfig {
73                config: config.merkle.clone(),
74                range: range.clone(),
75                pinned_nodes,
76            },
77        )
78        .await?;
79
80        let journal = authenticated::Journal::<F, _, _, _, S>::from_components(
81            merkle,
82            log,
83            hasher,
84            apply_batch_size as u64,
85        )
86        .await?;
87
88        let (last_commit_loc, inactivity_floor_loc) = {
89            let reader = journal.reader().await;
90            let bounds = reader.bounds();
91            let loc = bounds
92                .end
93                .checked_sub(1)
94                .ok_or(qmdb::Error::HistoricalFloorPruned(Location::new(
95                    bounds.end,
96                )))?;
97            let floor =
98                qmdb::find_inactivity_floor_at::<F, _>(&reader, Location::new(bounds.end), |op| {
99                    op.has_floor()
100                })
101                .await?;
102            (Location::new(loc), floor)
103        };
104        let inactive_peaks = F::inactive_peaks(
105            F::location_to_position(Location::new(*last_commit_loc + 1)),
106            inactivity_floor_loc,
107        );
108        let root = journal.root(inactive_peaks)?;
109
110        let metrics = Metrics::new(context);
111        let db = Self {
112            journal,
113            root,
114            last_commit_loc,
115            inactivity_floor_loc,
116            metrics,
117        };
118        db.update_metrics().await;
119
120        db.sync().await?;
121        Ok(db)
122    }
123
124    fn root(&self) -> Self::Digest {
125        self.root()
126    }
127}
128
129impl<F, E, V, H, Cfg, S> sync::compact::Database for CompactDb<F, E, V, H, Cfg, S>
130where
131    F: Family,
132    E: Context,
133    V: ValueEncoding + Codec,
134    H: Hasher,
135    S: Strategy,
136    Operation<F, V>: EncodeShared,
137    Operation<F, V>: Read<Cfg = Cfg>,
138    Cfg: Clone + Send + Sync + 'static,
139{
140    type Family = F;
141    type Op = Operation<F, V>;
142    type Config = super::CompactConfig<Cfg, S>;
143    type Digest = H::Digest;
144    type Context = E;
145    type Hasher = H;
146
147    async fn from_validated_state(
148        context: Self::Context,
149        config: Self::Config,
150        state: sync::compact::ValidatedState<Self::Family, Self::Op, Self::Digest>,
151    ) -> Result<Self, qmdb::Error<F>> {
152        let sync::compact::ValidatedState {
153            state,
154            root,
155            inactivity_floor: inactivity_floor_loc,
156        } = state;
157        let sync::compact::State {
158            leaf_count,
159            pinned_nodes,
160            last_commit_op,
161            last_commit_proof,
162        } = state;
163        let last_commit_loc = Location::new(*leaf_count - 1);
164        let Operation::Commit(last_commit_metadata, op_floor) = last_commit_op else {
165            return Err(qmdb::Error::UnexpectedData(last_commit_loc));
166        };
167        assert_eq!(op_floor, inactivity_floor_loc, "inactivity floor mismatch");
168        let commit_codec_config = config.commit_codec_config.clone();
169        let last_commit_op_bytes =
170            Operation::<F, V>::Commit(last_commit_metadata.clone(), inactivity_floor_loc)
171                .encode()
172                .to_vec();
173        let merkle = crate::merkle::compact::Merkle::init_from_compact_state(
174            context.child("merkle"),
175            config.merkle,
176            leaf_count,
177            pinned_nodes.clone(),
178        )
179        .await?;
180        Self::init_from_verified_state(
181            merkle,
182            commit_codec_config,
183            last_commit_metadata,
184            inactivity_floor_loc,
185            root,
186            last_commit_op_bytes,
187            last_commit_proof,
188            pinned_nodes,
189        )
190    }
191
192    fn inactivity_floor(op: &Self::Op) -> Option<Location<Self::Family>> {
193        op.has_floor()
194    }
195
196    fn root(&self) -> Self::Digest {
197        self.root()
198    }
199
200    async fn persist_compact_state(&self) -> Result<(), qmdb::Error<F>> {
201        self.persist_cached_witness().await
202    }
203}
204
205#[cfg(test)]
206mod tests;