commonware_storage/qmdb/sync/
resolver.rs

1use crate::{
2    mmr::{Location, Proof},
3    qmdb::{
4        self,
5        any::{
6            unordered::{
7                fixed::{Db as FixedDb, Operation as FixedOperation},
8                variable::{Db as VariableDb, Operation as VariableOperation},
9            },
10            FixedValue, VariableValue,
11        },
12        immutable::{Immutable, Operation as ImmutableOp},
13        Durable, Merkleized,
14    },
15    translator::Translator,
16};
17use commonware_cryptography::{Digest, Hasher};
18use commonware_runtime::{Clock, Metrics, RwLock, Storage};
19use commonware_utils::Array;
20use futures::channel::oneshot;
21use std::{future::Future, num::NonZeroU64, sync::Arc};
22
23/// Result from a fetch operation
24pub struct FetchResult<Op, D: Digest> {
25    /// The proof for the operations
26    pub proof: Proof<D>,
27    /// The operations that were fetched
28    pub operations: Vec<Op>,
29    /// Channel to report success/failure back to resolver
30    pub success_tx: oneshot::Sender<bool>,
31}
32
33impl<Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<Op, D> {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("FetchResult")
36            .field("proof", &self.proof)
37            .field("operations", &self.operations)
38            .field("success_tx", &"<callback>")
39            .finish()
40    }
41}
42
43/// Trait for network communication with the sync server
44pub trait Resolver: Send + Sync + Clone + 'static {
45    /// The digest type used in proofs returned by the resolver
46    type Digest: Digest;
47
48    /// The type of operations returned by the resolver
49    type Op;
50
51    /// The error type returned by the resolver
52    type Error: std::error::Error + Send + 'static;
53
54    /// Get the operations starting at `start_loc` in the database, up to `max_ops` operations.
55    /// Returns the operations and a proof that they were present in the database when it had
56    /// `size` operations.
57    #[allow(clippy::type_complexity)]
58    fn get_operations<'a>(
59        &'a self,
60        op_count: Location,
61        start_loc: Location,
62        max_ops: NonZeroU64,
63    ) -> impl Future<Output = Result<FetchResult<Self::Op, Self::Digest>, Self::Error>> + Send + 'a;
64}
65
66impl<E, K, V, H, T> Resolver for Arc<FixedDb<E, K, V, H, T, Merkleized<H>, Durable>>
67where
68    E: Storage + Clock + Metrics,
69    K: Array,
70    V: FixedValue + Send + Sync + 'static,
71    H: Hasher,
72    T: Translator + Send + Sync + 'static,
73    T::Key: Send + Sync,
74{
75    type Digest = H::Digest;
76    type Op = FixedOperation<K, V>;
77    type Error = qmdb::Error;
78
79    async fn get_operations(
80        &self,
81        op_count: Location,
82        start_loc: Location,
83        max_ops: NonZeroU64,
84    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
85        self.historical_proof(op_count, start_loc, max_ops)
86            .await
87            .map(|(proof, operations)| FetchResult {
88                proof,
89                operations,
90                // Result of proof verification isn't used by this implementation.
91                success_tx: oneshot::channel().0,
92            })
93    }
94}
95
96/// Implement Resolver directly for `Arc<RwLock<FixedDb>>` to eliminate the need for wrapper types
97/// while allowing direct database access.
98impl<E, K, V, H, T> Resolver for Arc<RwLock<FixedDb<E, K, V, H, T, Merkleized<H>, Durable>>>
99where
100    E: Storage + Clock + Metrics,
101    K: Array,
102    V: FixedValue + Send + Sync + 'static,
103    H: Hasher,
104    T: Translator + Send + Sync + 'static,
105    T::Key: Send + Sync,
106{
107    type Digest = H::Digest;
108    type Op = FixedOperation<K, V>;
109    type Error = qmdb::Error;
110
111    async fn get_operations(
112        &self,
113        op_count: Location,
114        start_loc: Location,
115        max_ops: NonZeroU64,
116    ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
117        let db = self.read().await;
118        db.historical_proof(op_count, start_loc, max_ops)
119            .await
120            .map(|(proof, operations)| FetchResult {
121                proof,
122                operations,
123                // Result of proof verification isn't used by this implementation.
124                success_tx: oneshot::channel().0,
125            })
126    }
127}
128
129impl<E, K, V, H, T> Resolver for Arc<VariableDb<E, K, V, H, T, Merkleized<H>, Durable>>
130where
131    E: Storage + Clock + Metrics,
132    K: Array,
133    V: VariableValue + Send + Sync + 'static,
134    H: Hasher,
135    T: Translator + Send + Sync + 'static,
136    T::Key: Send + Sync,
137{
138    type Digest = H::Digest;
139    type Op = VariableOperation<K, V>;
140    type Error = qmdb::Error;
141
142    async fn get_operations(
143        &self,
144        op_count: Location,
145        start_loc: Location,
146        max_ops: NonZeroU64,
147    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
148        self.historical_proof(op_count, start_loc, max_ops)
149            .await
150            .map(|(proof, operations)| FetchResult {
151                proof,
152                operations,
153                // Result of proof verification isn't used by this implementation.
154                success_tx: oneshot::channel().0,
155            })
156    }
157}
158
159/// Implement Resolver directly for `Arc<RwLock<VariableDb>>` to eliminate the need for wrapper
160/// types while allowing direct database access.
161impl<E, K, V, H, T> Resolver for Arc<RwLock<VariableDb<E, K, V, H, T, Merkleized<H>, Durable>>>
162where
163    E: Storage + Clock + Metrics,
164    K: Array,
165    V: VariableValue + Send + Sync + 'static,
166    H: Hasher,
167    T: Translator + Send + Sync + 'static,
168    T::Key: Send + Sync,
169{
170    type Digest = H::Digest;
171    type Op = VariableOperation<K, V>;
172    type Error = qmdb::Error;
173
174    async fn get_operations(
175        &self,
176        op_count: Location,
177        start_loc: Location,
178        max_ops: NonZeroU64,
179    ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
180        let db = self.read().await;
181        db.historical_proof(op_count, start_loc, max_ops)
182            .await
183            .map(|(proof, operations)| FetchResult {
184                proof,
185                operations,
186                // Result of proof verification isn't used by this implementation.
187                success_tx: oneshot::channel().0,
188            })
189    }
190}
191
192/// Implement Resolver for `Arc<RwLock<Option<FixedDb>>>` to allow taking ownership during sync.
193impl<E, K, V, H, T> Resolver for Arc<RwLock<Option<FixedDb<E, K, V, H, T, Merkleized<H>, Durable>>>>
194where
195    E: Storage + Clock + Metrics,
196    K: Array,
197    V: FixedValue + Send + Sync + 'static,
198    H: Hasher,
199    T: Translator + Send + Sync + 'static,
200    T::Key: Send + Sync,
201{
202    type Digest = H::Digest;
203    type Op = FixedOperation<K, V>;
204    type Error = qmdb::Error;
205
206    async fn get_operations(
207        &self,
208        op_count: Location,
209        start_loc: Location,
210        max_ops: NonZeroU64,
211    ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
212        let guard = self.read().await;
213        let db: &FixedDb<E, K, V, H, T, Merkleized<H>, Durable> =
214            guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
215        db.historical_proof(op_count, start_loc, max_ops)
216            .await
217            .map(|(proof, operations)| FetchResult {
218                proof,
219                operations,
220                // Result of proof verification isn't used by this implementation.
221                success_tx: oneshot::channel().0,
222            })
223    }
224}
225
226/// Implement Resolver for `Arc<RwLock<Option<VariableDb>>>` to allow taking ownership during sync.
227impl<E, K, V, H, T> Resolver
228    for Arc<RwLock<Option<VariableDb<E, K, V, H, T, Merkleized<H>, Durable>>>>
229where
230    E: Storage + Clock + Metrics,
231    K: Array,
232    V: VariableValue + Send + Sync + 'static,
233    H: Hasher,
234    T: Translator + Send + Sync + 'static,
235    T::Key: Send + Sync,
236{
237    type Digest = H::Digest;
238    type Op = VariableOperation<K, V>;
239    type Error = qmdb::Error;
240
241    async fn get_operations(
242        &self,
243        op_count: Location,
244        start_loc: Location,
245        max_ops: NonZeroU64,
246    ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
247        let guard = self.read().await;
248        let db: &VariableDb<E, K, V, H, T, Merkleized<H>, Durable> =
249            guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
250        db.historical_proof(op_count, start_loc, max_ops)
251            .await
252            .map(|(proof, operations)| FetchResult {
253                proof,
254                operations,
255                // Result of proof verification isn't used by this implementation.
256                success_tx: oneshot::channel().0,
257            })
258    }
259}
260
261impl<E, K, V, H, T> Resolver for Arc<Immutable<E, K, V, H, T, Merkleized<H>, Durable>>
262where
263    E: Storage + Clock + Metrics,
264    K: Array,
265    V: VariableValue + Send + Sync + 'static,
266    H: Hasher,
267    T: Translator + Send + Sync + 'static,
268    T::Key: Send + Sync,
269{
270    type Digest = H::Digest;
271    type Op = ImmutableOp<K, V>;
272    type Error = crate::qmdb::Error;
273
274    async fn get_operations(
275        &self,
276        op_count: Location,
277        start_loc: Location,
278        max_ops: NonZeroU64,
279    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
280        self.historical_proof(op_count, start_loc, max_ops)
281            .await
282            .map(|(proof, operations)| FetchResult {
283                proof,
284                operations,
285                // Result of proof verification isn't used by this implementation.
286                success_tx: oneshot::channel().0,
287            })
288    }
289}
290
291/// Implement Resolver directly for `Arc<RwLock<Immutable>>` to eliminate the need for wrapper
292/// types while allowing direct database access.
293impl<E, K, V, H, T> Resolver for Arc<RwLock<Immutable<E, K, V, H, T, Merkleized<H>, Durable>>>
294where
295    E: Storage + Clock + Metrics,
296    K: Array,
297    V: VariableValue + Send + Sync + 'static,
298    H: Hasher,
299    T: Translator + Send + Sync + 'static,
300    T::Key: Send + Sync,
301{
302    type Digest = H::Digest;
303    type Op = ImmutableOp<K, V>;
304    type Error = crate::qmdb::Error;
305
306    async fn get_operations(
307        &self,
308        op_count: Location,
309        start_loc: Location,
310        max_ops: NonZeroU64,
311    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
312        let db = self.read().await;
313        db.historical_proof(op_count, start_loc, max_ops)
314            .await
315            .map(|(proof, operations)| FetchResult {
316                proof,
317                operations,
318                // Result of proof verification isn't used by this implementation.
319                success_tx: oneshot::channel().0,
320            })
321    }
322}
323
324#[cfg(test)]
325pub(crate) mod tests {
326    use super::*;
327    use std::marker::PhantomData;
328
329    /// A resolver that always fails.
330    #[derive(Clone)]
331    pub struct FailResolver<Op, D> {
332        _phantom: PhantomData<(Op, D)>,
333    }
334
335    impl<Op, D> Resolver for FailResolver<Op, D>
336    where
337        D: Digest,
338        Op: Send + Sync + Clone + 'static,
339    {
340        type Digest = D;
341        type Op = Op;
342        type Error = qmdb::Error;
343
344        async fn get_operations(
345            &self,
346            _op_count: Location,
347            _start_loc: Location,
348            _max_ops: NonZeroU64,
349        ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
350            Err(qmdb::Error::KeyNotFound) // Arbitrary dummy error
351        }
352    }
353
354    impl<Op, D> FailResolver<Op, D> {
355        pub fn new() -> Self {
356            Self {
357                _phantom: PhantomData,
358            }
359        }
360    }
361}