Skip to main content

commonware_storage/qmdb/sync/
resolver.rs

1use crate::{
2    mmr::{Location, Proof},
3    qmdb::{
4        self,
5        any::{
6            ordered::{
7                fixed::{Db as OrderedFixedDb, Operation as OrderedFixedOperation},
8                variable::{Db as OrderedVariableDb, Operation as OrderedVariableOperation},
9            },
10            unordered::{
11                fixed::{Db as FixedDb, Operation as FixedOperation},
12                variable::{Db as VariableDb, Operation as VariableOperation},
13            },
14            FixedValue, VariableValue,
15        },
16        immutable::{Immutable, Operation as ImmutableOp},
17    },
18    translator::Translator,
19};
20use commonware_cryptography::{Digest, Hasher};
21use commonware_runtime::{Clock, Metrics, Storage};
22use commonware_utils::{channel::oneshot, sync::AsyncRwLock, Array};
23use std::{future::Future, num::NonZeroU64, sync::Arc};
24
25/// Result from a fetch operation
26pub struct FetchResult<Op, D: Digest> {
27    /// The proof for the operations
28    pub proof: Proof<D>,
29    /// The operations that were fetched
30    pub operations: Vec<Op>,
31    /// Channel to report success/failure back to resolver
32    pub success_tx: oneshot::Sender<bool>,
33}
34
35impl<Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<Op, D> {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("FetchResult")
38            .field("proof", &self.proof)
39            .field("operations", &self.operations)
40            .field("success_tx", &"<callback>")
41            .finish()
42    }
43}
44
45/// Trait for network communication with the sync server
46pub trait Resolver: Send + Sync + Clone + 'static {
47    /// The digest type used in proofs returned by the resolver
48    type Digest: Digest;
49
50    /// The type of operations returned by the resolver
51    type Op;
52
53    /// The error type returned by the resolver
54    type Error: std::error::Error + Send + 'static;
55
56    /// Get the operations starting at `start_loc` in the database, up to `max_ops` operations.
57    /// Returns the operations and a proof that they were present in the database when it had
58    /// `size` operations.
59    #[allow(clippy::type_complexity)]
60    fn get_operations<'a>(
61        &'a self,
62        op_count: Location,
63        start_loc: Location,
64        max_ops: NonZeroU64,
65    ) -> impl Future<Output = Result<FetchResult<Self::Op, Self::Digest>, Self::Error>> + Send + 'a;
66}
67
/// Implements [`Resolver`] for a database type `$db` whose operations are `$op<K, V>` and whose
/// values are bounded by `$val_bound`.
///
/// Three wrappers are covered:
/// - `Arc<$db<..>>`
/// - `Arc<AsyncRwLock<$db<..>>>`
/// - `Arc<AsyncRwLock<Option<$db<..>>>>`
///
/// Every implementation answers `get_operations` by calling the database's `historical_proof`
/// and wrapping the result in a [`FetchResult`]. The `success_tx` sender comes from a freshly
/// created one-shot channel whose receiving half is discarded immediately.
macro_rules! impl_resolver {
    ($db:ident, $op:ident, $val_bound:ident) => {
        // Database shared directly behind an `Arc`.
        impl<E, K, V, H, T> Resolver for Arc<$db<E, K, V, H, T>>
        where
            E: Storage + Clock + Metrics,
            K: Array,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: NonZeroU64,
            ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let (proof, operations) =
                    self.historical_proof(op_count, start_loc, max_ops).await?;
                Ok(FetchResult {
                    proof,
                    operations,
                    // Only the sender is kept; the receiver is dropped right here.
                    success_tx: oneshot::channel().0,
                })
            }
        }

        // Database behind an async read-write lock.
        impl<E, K, V, H, T> Resolver for Arc<AsyncRwLock<$db<E, K, V, H, T>>>
        where
            E: Storage + Clock + Metrics,
            K: Array,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: NonZeroU64,
            ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
                // The read guard stays alive until the proof has been produced.
                let db = self.read().await;
                let (proof, operations) =
                    db.historical_proof(op_count, start_loc, max_ops).await?;
                Ok(FetchResult {
                    proof,
                    operations,
                    // Only the sender is kept; the receiver is dropped right here.
                    success_tx: oneshot::channel().0,
                })
            }
        }

        // Optional database behind an async read-write lock.
        impl<E, K, V, H, T> Resolver for Arc<AsyncRwLock<Option<$db<E, K, V, H, T>>>>
        where
            E: Storage + Clock + Metrics,
            K: Array,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: NonZeroU64,
            ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
                // The read guard stays alive until the proof has been produced.
                let guard = self.read().await;
                // An empty slot is reported as `KeyNotFound`.
                let Some(db) = guard.as_ref() else {
                    return Err(qmdb::Error::KeyNotFound);
                };
                let (proof, operations) =
                    db.historical_proof(op_count, start_loc, max_ops).await?;
                Ok(FetchResult {
                    proof,
                    operations,
                    // Only the sender is kept; the receiver is dropped right here.
                    success_tx: oneshot::channel().0,
                })
            }
        }
    };
}
161
162// Unordered Fixed
163impl_resolver!(FixedDb, FixedOperation, FixedValue);
164
165// Unordered Variable
166impl_resolver!(VariableDb, VariableOperation, VariableValue);
167
168// Ordered Fixed
169impl_resolver!(OrderedFixedDb, OrderedFixedOperation, FixedValue);
170
171// Ordered Variable
172impl_resolver!(OrderedVariableDb, OrderedVariableOperation, VariableValue);
173
174// Immutable
175impl_resolver!(Immutable, ImmutableOp, VariableValue);
176
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::marker::PhantomData;

    /// A [`Resolver`] whose `get_operations` always returns an error.
    ///
    /// It stores no data; `Op` and `D` only select the associated types of the `Resolver`
    /// implementation.
    #[derive(Clone)]
    pub struct FailResolver<Op, D> {
        _phantom: PhantomData<(Op, D)>,
    }

    impl<Op, D> FailResolver<Op, D> {
        /// Creates a new always-failing resolver.
        pub fn new() -> Self {
            let _phantom = PhantomData;
            Self { _phantom }
        }
    }

    impl<Op, D> Resolver for FailResolver<Op, D>
    where
        Op: Send + Sync + Clone + 'static,
        D: Digest,
    {
        type Digest = D;
        type Op = Op;
        type Error = qmdb::Error;

        /// Ignores every argument and fails unconditionally.
        async fn get_operations(
            &self,
            _op_count: Location,
            _start_loc: Location,
            _max_ops: NonZeroU64,
        ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
            // The variant is an arbitrary dummy; it carries no particular meaning here.
            Err(qmdb::Error::KeyNotFound)
        }
    }
}