Skip to main content

commonware_storage/qmdb/sync/
resolver.rs

1use crate::{
2    merkle::mmr::{self, Location, Proof},
3    qmdb::{
4        self,
5        any::{
6            ordered::{
7                fixed::{Db as OrderedFixedDb, Operation as OrderedFixedOperation},
8                variable::{Db as OrderedVariableDb, Operation as OrderedVariableOperation},
9            },
10            unordered::{
11                fixed::{Db as FixedDb, Operation as FixedOperation},
12                variable::{Db as VariableDb, Operation as VariableOperation},
13            },
14            FixedValue, VariableValue,
15        },
16        immutable::{
17            fixed::{Db as ImmutableFixedDb, Operation as ImmutableFixedOp},
18            variable::{Db as ImmutableVariableDb, Operation as ImmutableVariableOp},
19        },
20        operation::Key,
21    },
22    translator::Translator,
23    Context,
24};
25use commonware_cryptography::{Digest, Hasher};
26use commonware_utils::{channel::oneshot, sync::AsyncRwLock, Array};
27use std::{future::Future, num::NonZeroU64, sync::Arc};
28
29/// Result from a fetch operation
30pub struct FetchResult<Op, D: Digest> {
31    /// The proof for the operations
32    pub proof: Proof<D>,
33    /// The operations that were fetched
34    pub operations: Vec<Op>,
35    /// Channel to report success/failure back to resolver
36    pub success_tx: oneshot::Sender<bool>,
37    /// Pinned MMR nodes at the start location, if requested
38    pub pinned_nodes: Option<Vec<D>>,
39}
40
41impl<Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<Op, D> {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("FetchResult")
44            .field("proof", &self.proof)
45            .field("operations", &self.operations)
46            .field("success_tx", &"<callback>")
47            .field("pinned_nodes", &self.pinned_nodes)
48            .finish()
49    }
50}
51
52/// Trait for network communication with the sync server
53pub trait Resolver: Send + Sync + Clone + 'static {
54    /// The digest type used in proofs returned by the resolver
55    type Digest: Digest;
56
57    /// The type of operations returned by the resolver
58    type Op;
59
60    /// The error type returned by the resolver
61    type Error: std::error::Error + Send + 'static;
62
63    /// Get the operations starting at `start_loc` in the database, up to `max_ops` operations.
64    /// Returns the operations and a proof that they were present in the database when it had
65    /// `op_count` operations. If `include_pinned_nodes` is true, the result will include the
66    /// pinned MMR nodes at `start_loc`.
67    ///
68    /// The corresponding `cancel_tx` is dropped when the engine no longer needs this
69    /// request (e.g. due to a target update), causing `cancel_rx.await` to return
70    /// `Err`. Implementations may `select!` on it to abort in-flight work early.
71    #[allow(clippy::type_complexity)]
72    fn get_operations<'a>(
73        &'a self,
74        op_count: Location,
75        start_loc: Location,
76        max_ops: NonZeroU64,
77        include_pinned_nodes: bool,
78        cancel_rx: oneshot::Receiver<()>,
79    ) -> impl Future<Output = Result<FetchResult<Self::Op, Self::Digest>, Self::Error>> + Send + 'a;
80}
81
82macro_rules! impl_resolver {
83    ($db:ident, $op:ident, $val_bound:ident) => {
84        impl<E, K, V, H, T> Resolver
85            for Arc<$db<crate::merkle::mmr::Family, E, K, V, H, T>>
86        where
87            E: Context,
88            K: Array,
89            V: $val_bound + Send + Sync + 'static,
90            H: Hasher,
91            T: Translator + Send + Sync + 'static,
92            T::Key: Send + Sync,
93        {
94            type Digest = H::Digest;
95            type Op = $op<crate::merkle::mmr::Family, K, V>;
96            type Error = qmdb::Error<crate::merkle::mmr::Family>;
97
98            async fn get_operations(
99                &self,
100                op_count: Location,
101                start_loc: Location,
102                max_ops: NonZeroU64,
103                include_pinned_nodes: bool,
104                _cancel_rx: oneshot::Receiver<()>,
105            ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
106                let (proof, operations) =
107                    self.historical_proof(op_count, start_loc, max_ops).await?;
108                let pinned_nodes = if include_pinned_nodes {
109                    Some(self.pinned_nodes_at(start_loc).await?)
110                } else {
111                    None
112                };
113                Ok(FetchResult {
114                    proof,
115                    operations,
116                    success_tx: oneshot::channel().0,
117                    pinned_nodes,
118                })
119            }
120        }
121
122        impl<E, K, V, H, T> Resolver
123            for Arc<AsyncRwLock<$db<crate::merkle::mmr::Family, E, K, V, H, T>>>
124        where
125            E: Context,
126            K: Array,
127            V: $val_bound + Send + Sync + 'static,
128            H: Hasher,
129            T: Translator + Send + Sync + 'static,
130            T::Key: Send + Sync,
131        {
132            type Digest = H::Digest;
133            type Op = $op<crate::merkle::mmr::Family, K, V>;
134            type Error = qmdb::Error<crate::merkle::mmr::Family>;
135
136            async fn get_operations(
137                &self,
138                op_count: Location,
139                start_loc: Location,
140                max_ops: NonZeroU64,
141                include_pinned_nodes: bool,
142                _cancel_rx: oneshot::Receiver<()>,
143            ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error<crate::merkle::mmr::Family>> {
144                let db = self.read().await;
145                let (proof, operations) = db.historical_proof(op_count, start_loc, max_ops).await?;
146                let pinned_nodes = if include_pinned_nodes {
147                    Some(db.pinned_nodes_at(start_loc).await?)
148                } else {
149                    None
150                };
151                Ok(FetchResult {
152                    proof,
153                    operations,
154                    success_tx: oneshot::channel().0,
155                    pinned_nodes,
156                })
157            }
158        }
159
160        impl<E, K, V, H, T> Resolver
161            for Arc<AsyncRwLock<Option<$db<crate::merkle::mmr::Family, E, K, V, H, T>>>>
162        where
163            E: Context,
164            K: Array,
165            V: $val_bound + Send + Sync + 'static,
166            H: Hasher,
167            T: Translator + Send + Sync + 'static,
168            T::Key: Send + Sync,
169        {
170            type Digest = H::Digest;
171            type Op = $op<crate::merkle::mmr::Family, K, V>;
172            type Error = qmdb::Error<crate::merkle::mmr::Family>;
173
174            async fn get_operations(
175                &self,
176                op_count: Location,
177                start_loc: Location,
178                max_ops: NonZeroU64,
179                include_pinned_nodes: bool,
180                _cancel_rx: oneshot::Receiver<()>,
181            ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error<crate::merkle::mmr::Family>> {
182                let guard = self.read().await;
183                let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
184                let (proof, operations) = db.historical_proof(op_count, start_loc, max_ops).await?;
185                let pinned_nodes = if include_pinned_nodes {
186                    Some(db.pinned_nodes_at(start_loc).await?)
187                } else {
188                    None
189                };
190                Ok(FetchResult {
191                    proof,
192                    operations,
193                    success_tx: oneshot::channel().0,
194                    pinned_nodes,
195                })
196            }
197        }
198    };
199}
200
201// Unordered Fixed
202impl_resolver!(FixedDb, FixedOperation, FixedValue);
203
204// Unordered Variable
205impl_resolver!(VariableDb, VariableOperation, VariableValue);
206
207// Ordered Fixed
208impl_resolver!(OrderedFixedDb, OrderedFixedOperation, FixedValue);
209
210// Ordered Variable
211impl_resolver!(OrderedVariableDb, OrderedVariableOperation, VariableValue);
212
213// Immutable types have a different Operation signature (no F parameter),
214// so we use a separate macro.
215macro_rules! impl_resolver_immutable {
216    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path) => {
217        impl<E, K, V, H, T> Resolver for Arc<$db<mmr::Family, E, K, V, H, T>>
218        where
219            E: Context,
220            K: $key_bound,
221            V: $val_bound + Send + Sync + 'static,
222            H: Hasher,
223            T: Translator + Send + Sync + 'static,
224            T::Key: Send + Sync,
225        {
226            type Digest = H::Digest;
227            type Op = $op<K, V>;
228            type Error = qmdb::Error<mmr::Family>;
229
230            async fn get_operations(
231                &self,
232                op_count: Location,
233                start_loc: Location,
234                max_ops: NonZeroU64,
235                include_pinned_nodes: bool,
236                _cancel_rx: oneshot::Receiver<()>,
237            ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
238                let (proof, operations) =
239                    self.historical_proof(op_count, start_loc, max_ops).await?;
240                let pinned_nodes = if include_pinned_nodes {
241                    Some(self.pinned_nodes_at(start_loc).await?)
242                } else {
243                    None
244                };
245                Ok(FetchResult {
246                    proof,
247                    operations,
248                    success_tx: oneshot::channel().0,
249                    pinned_nodes,
250                })
251            }
252        }
253
254        impl<E, K, V, H, T> Resolver for Arc<AsyncRwLock<$db<mmr::Family, E, K, V, H, T>>>
255        where
256            E: Context,
257            K: $key_bound,
258            V: $val_bound + Send + Sync + 'static,
259            H: Hasher,
260            T: Translator + Send + Sync + 'static,
261            T::Key: Send + Sync,
262        {
263            type Digest = H::Digest;
264            type Op = $op<K, V>;
265            type Error = qmdb::Error<mmr::Family>;
266
267            async fn get_operations(
268                &self,
269                op_count: Location,
270                start_loc: Location,
271                max_ops: NonZeroU64,
272                include_pinned_nodes: bool,
273                _cancel_rx: oneshot::Receiver<()>,
274            ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error<mmr::Family>> {
275                let db = self.read().await;
276                let (proof, operations) = db.historical_proof(op_count, start_loc, max_ops).await?;
277                let pinned_nodes = if include_pinned_nodes {
278                    Some(db.pinned_nodes_at(start_loc).await?)
279                } else {
280                    None
281                };
282                Ok(FetchResult {
283                    proof,
284                    operations,
285                    success_tx: oneshot::channel().0,
286                    pinned_nodes,
287                })
288            }
289        }
290
291        impl<E, K, V, H, T> Resolver for Arc<AsyncRwLock<Option<$db<mmr::Family, E, K, V, H, T>>>>
292        where
293            E: Context,
294            K: $key_bound,
295            V: $val_bound + Send + Sync + 'static,
296            H: Hasher,
297            T: Translator + Send + Sync + 'static,
298            T::Key: Send + Sync,
299        {
300            type Digest = H::Digest;
301            type Op = $op<K, V>;
302            type Error = qmdb::Error<mmr::Family>;
303
304            async fn get_operations(
305                &self,
306                op_count: Location,
307                start_loc: Location,
308                max_ops: NonZeroU64,
309                include_pinned_nodes: bool,
310                _cancel_rx: oneshot::Receiver<()>,
311            ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error<mmr::Family>> {
312                let guard = self.read().await;
313                let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
314                let (proof, operations) = db.historical_proof(op_count, start_loc, max_ops).await?;
315                let pinned_nodes = if include_pinned_nodes {
316                    Some(db.pinned_nodes_at(start_loc).await?)
317                } else {
318                    None
319                };
320                Ok(FetchResult {
321                    proof,
322                    operations,
323                    success_tx: oneshot::channel().0,
324                    pinned_nodes,
325                })
326            }
327        }
328    };
329}
330
331// Immutable Fixed
332impl_resolver_immutable!(ImmutableFixedDb, ImmutableFixedOp, FixedValue, Array);
333
334// Immutable Variable
335impl_resolver_immutable!(ImmutableVariableDb, ImmutableVariableOp, VariableValue, Key);
336
337#[cfg(test)]
338pub(crate) mod tests {
339    use super::*;
340    use std::marker::PhantomData;
341
342    /// A resolver that always fails.
343    #[derive(Clone)]
344    pub struct FailResolver<Op, D> {
345        _phantom: PhantomData<(Op, D)>,
346    }
347
348    impl<Op, D> Resolver for FailResolver<Op, D>
349    where
350        D: Digest,
351        Op: Send + Sync + Clone + 'static,
352    {
353        type Digest = D;
354        type Op = Op;
355        type Error = qmdb::Error<crate::merkle::mmr::Family>;
356
357        async fn get_operations(
358            &self,
359            _op_count: Location,
360            _start_loc: Location,
361            _max_ops: NonZeroU64,
362            _include_pinned_nodes: bool,
363            _cancel: oneshot::Receiver<()>,
364        ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error<crate::merkle::mmr::Family>>
365        {
366            Err(qmdb::Error::KeyNotFound) // Arbitrary dummy error
367        }
368    }
369
370    impl<Op, D> FailResolver<Op, D> {
371        pub fn new() -> Self {
372            Self {
373                _phantom: PhantomData,
374            }
375        }
376    }
377}