Skip to main content

commonware_storage/qmdb/sync/
resolver.rs

1use crate::{
2    mmr::{Location, Proof},
3    qmdb::{
4        self,
5        any::{
6            ordered::{
7                fixed::{Db as OrderedFixedDb, Operation as OrderedFixedOperation},
8                variable::{Db as OrderedVariableDb, Operation as OrderedVariableOperation},
9            },
10            unordered::{
11                fixed::{Db as FixedDb, Operation as FixedOperation},
12                variable::{Db as VariableDb, Operation as VariableOperation},
13            },
14            FixedValue, VariableValue,
15        },
16        immutable::{Immutable, Operation as ImmutableOp},
17        Durable, Merkleized,
18    },
19    translator::Translator,
20};
21use commonware_cryptography::{Digest, Hasher};
22use commonware_runtime::{Clock, Metrics, RwLock, Storage};
23use commonware_utils::{channel::oneshot, Array};
24use std::{future::Future, num::NonZeroU64, sync::Arc};
25
26/// Result from a fetch operation
27pub struct FetchResult<Op, D: Digest> {
28    /// The proof for the operations
29    pub proof: Proof<D>,
30    /// The operations that were fetched
31    pub operations: Vec<Op>,
32    /// Channel to report success/failure back to resolver
33    pub success_tx: oneshot::Sender<bool>,
34}
35
36impl<Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<Op, D> {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("FetchResult")
39            .field("proof", &self.proof)
40            .field("operations", &self.operations)
41            .field("success_tx", &"<callback>")
42            .finish()
43    }
44}
45
46/// Trait for network communication with the sync server
47pub trait Resolver: Send + Sync + Clone + 'static {
48    /// The digest type used in proofs returned by the resolver
49    type Digest: Digest;
50
51    /// The type of operations returned by the resolver
52    type Op;
53
54    /// The error type returned by the resolver
55    type Error: std::error::Error + Send + 'static;
56
57    /// Get the operations starting at `start_loc` in the database, up to `max_ops` operations.
58    /// Returns the operations and a proof that they were present in the database when it had
59    /// `size` operations.
60    #[allow(clippy::type_complexity)]
61    fn get_operations<'a>(
62        &'a self,
63        op_count: Location,
64        start_loc: Location,
65        max_ops: NonZeroU64,
66    ) -> impl Future<Output = Result<FetchResult<Self::Op, Self::Digest>, Self::Error>> + Send + 'a;
67}
68
69macro_rules! impl_resolver {
70    ($db:ident, $op:ident, $val_bound:ident) => {
71        impl<E, K, V, H, T> Resolver for Arc<$db<E, K, V, H, T, Merkleized<H>, Durable>>
72        where
73            E: Storage + Clock + Metrics,
74            K: Array,
75            V: $val_bound + Send + Sync + 'static,
76            H: Hasher,
77            T: Translator + Send + Sync + 'static,
78            T::Key: Send + Sync,
79        {
80            type Digest = H::Digest;
81            type Op = $op<K, V>;
82            type Error = qmdb::Error;
83
84            async fn get_operations(
85                &self,
86                op_count: Location,
87                start_loc: Location,
88                max_ops: NonZeroU64,
89            ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
90                self.historical_proof(op_count, start_loc, max_ops)
91                    .await
92                    .map(|(proof, operations)| FetchResult {
93                        proof,
94                        operations,
95                        success_tx: oneshot::channel().0,
96                    })
97            }
98        }
99
100        impl<E, K, V, H, T> Resolver for Arc<RwLock<$db<E, K, V, H, T, Merkleized<H>, Durable>>>
101        where
102            E: Storage + Clock + Metrics,
103            K: Array,
104            V: $val_bound + Send + Sync + 'static,
105            H: Hasher,
106            T: Translator + Send + Sync + 'static,
107            T::Key: Send + Sync,
108        {
109            type Digest = H::Digest;
110            type Op = $op<K, V>;
111            type Error = qmdb::Error;
112
113            async fn get_operations(
114                &self,
115                op_count: Location,
116                start_loc: Location,
117                max_ops: NonZeroU64,
118            ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
119                let db = self.read().await;
120                db.historical_proof(op_count, start_loc, max_ops).await.map(
121                    |(proof, operations)| FetchResult {
122                        proof,
123                        operations,
124                        success_tx: oneshot::channel().0,
125                    },
126                )
127            }
128        }
129
130        impl<E, K, V, H, T> Resolver
131            for Arc<RwLock<Option<$db<E, K, V, H, T, Merkleized<H>, Durable>>>>
132        where
133            E: Storage + Clock + Metrics,
134            K: Array,
135            V: $val_bound + Send + Sync + 'static,
136            H: Hasher,
137            T: Translator + Send + Sync + 'static,
138            T::Key: Send + Sync,
139        {
140            type Digest = H::Digest;
141            type Op = $op<K, V>;
142            type Error = qmdb::Error;
143
144            async fn get_operations(
145                &self,
146                op_count: Location,
147                start_loc: Location,
148                max_ops: NonZeroU64,
149            ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
150                let guard = self.read().await;
151                let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
152                db.historical_proof(op_count, start_loc, max_ops).await.map(
153                    |(proof, operations)| FetchResult {
154                        proof,
155                        operations,
156                        success_tx: oneshot::channel().0,
157                    },
158                )
159            }
160        }
161    };
162}
163
164// Unordered Fixed
165impl_resolver!(FixedDb, FixedOperation, FixedValue);
166
167// Unordered Variable
168impl_resolver!(VariableDb, VariableOperation, VariableValue);
169
170// Ordered Fixed
171impl_resolver!(OrderedFixedDb, OrderedFixedOperation, FixedValue);
172
173// Ordered Variable
174impl_resolver!(OrderedVariableDb, OrderedVariableOperation, VariableValue);
175
176// Immutable
177impl_resolver!(Immutable, ImmutableOp, VariableValue);
178
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::marker::PhantomData;

    /// Test resolver whose `get_operations` never succeeds.
    ///
    /// It stores no data; the `PhantomData` field only ties the type to `Op` and `D`.
    #[derive(Clone)]
    pub struct FailResolver<Op, D> {
        _phantom: PhantomData<(Op, D)>,
    }

    impl<Op, D> FailResolver<Op, D> {
        /// Creates a resolver that fails every request.
        pub fn new() -> Self {
            let _phantom = PhantomData;
            Self { _phantom }
        }
    }

    impl<Op: Send + Sync + Clone + 'static, D: Digest> Resolver for FailResolver<Op, D> {
        type Digest = D;
        type Op = Op;
        type Error = qmdb::Error;

        /// Ignores all arguments and returns an error unconditionally.
        async fn get_operations(
            &self,
            _op_count: Location,
            _start_loc: Location,
            _max_ops: NonZeroU64,
        ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
            // The variant is an arbitrary placeholder; only the failure itself matters.
            let dummy = qmdb::Error::KeyNotFound;
            Err(dummy)
        }
    }
}