commonware_storage/qmdb/sync/
resolver.rs

1use crate::{
2    mmr::{Location, Proof},
3    qmdb::{
4        self,
5        any::{
6            unordered::fixed::{Db, Operation},
7            FixedValue, VariableValue,
8        },
9        immutable::{Immutable, Operation as ImmutableOp},
10        store::CleanStore as _,
11    },
12    translator::Translator,
13};
14use commonware_cryptography::{Digest, Hasher};
15use commonware_runtime::{Clock, Metrics, RwLock, Storage};
16use commonware_utils::Array;
17use futures::channel::oneshot;
18use std::{future::Future, num::NonZeroU64, sync::Arc};
19
/// Result from a fetch operation
///
/// Returned by [`Resolver::get_operations`] on success. Bundles the fetched operations with
/// their proof, plus a one-shot channel carrying a `bool` success/failure signal back to the
/// resolver.
pub struct FetchResult<Op, D: Digest> {
    /// The proof for the operations
    pub proof: Proof<D>,
    /// The operations that were fetched
    pub operations: Vec<Op>,
    /// Channel to report success/failure back to resolver
    pub success_tx: oneshot::Sender<bool>,
}
29
30impl<Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<Op, D> {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("FetchResult")
33            .field("proof", &self.proof)
34            .field("operations", &self.operations)
35            .field("success_tx", &"<callback>")
36            .finish()
37    }
38}
39
/// Trait for network communication with the sync server
///
/// Implementors must be `Send + Sync + Clone + 'static`.
pub trait Resolver: Send + Sync + Clone + 'static {
    /// The digest type used in proofs returned by the resolver
    type Digest: Digest;

    /// The type of operations returned by the resolver
    type Op;

    /// The error type returned by the resolver
    type Error: std::error::Error + Send + 'static;

    /// Get the operations starting at `start_loc` in the database, up to `max_ops` operations.
    /// Returns the operations and a proof that they were present in the database when it had
    /// `op_count` operations.
    ///
    /// The returned future is `Send` and may borrow `self` for `'a`.
    // The nested `impl Future<Output = Result<FetchResult<..>, ..>>` return type trips
    // `clippy::type_complexity`, hence the allow.
    #[allow(clippy::type_complexity)]
    fn get_operations<'a>(
        &'a self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> impl Future<Output = Result<FetchResult<Self::Op, Self::Digest>, Self::Error>> + Send + 'a;
}
62
63impl<E, K, V, H, T> Resolver for Arc<Db<E, K, V, H, T>>
64where
65    E: Storage + Clock + Metrics,
66    K: Array,
67    V: FixedValue + Send + Sync + 'static,
68    H: Hasher,
69    T: Translator + Send + Sync + 'static,
70    T::Key: Send + Sync,
71{
72    type Digest = H::Digest;
73    type Op = Operation<K, V>;
74    type Error = qmdb::Error;
75
76    async fn get_operations(
77        &self,
78        op_count: Location,
79        start_loc: Location,
80        max_ops: NonZeroU64,
81    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
82        self.historical_proof(op_count, start_loc, max_ops)
83            .await
84            .map(|(proof, operations)| FetchResult {
85                proof,
86                operations,
87                // Result of proof verification isn't used by this implementation.
88                success_tx: oneshot::channel().0,
89            })
90    }
91}
92
93/// Implement Resolver directly for `Arc<RwLock<Db>>` to eliminate the need for wrapper types
94/// while allowing direct database access.
95impl<E, K, V, H, T> Resolver for Arc<RwLock<Db<E, K, V, H, T>>>
96where
97    E: Storage + Clock + Metrics,
98    K: Array,
99    V: FixedValue + Send + Sync + 'static,
100    H: Hasher,
101    T: Translator + Send + Sync + 'static,
102    T::Key: Send + Sync,
103{
104    type Digest = H::Digest;
105    type Op = Operation<K, V>;
106    type Error = qmdb::Error;
107
108    async fn get_operations(
109        &self,
110        op_count: Location,
111        start_loc: Location,
112        max_ops: NonZeroU64,
113    ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
114        let db = self.read().await;
115        db.historical_proof(op_count, start_loc, max_ops)
116            .await
117            .map(|(proof, operations)| FetchResult {
118                proof,
119                operations,
120                // Result of proof verification isn't used by this implementation.
121                success_tx: oneshot::channel().0,
122            })
123    }
124}
125
126impl<E, K, V, H, T> Resolver for Arc<Immutable<E, K, V, H, T>>
127where
128    E: Storage + Clock + Metrics,
129    K: Array,
130    V: VariableValue + Send + Sync + 'static,
131    H: Hasher,
132    T: Translator + Send + Sync + 'static,
133    T::Key: Send + Sync,
134{
135    type Digest = H::Digest;
136    type Op = ImmutableOp<K, V>;
137    type Error = crate::qmdb::Error;
138
139    async fn get_operations(
140        &self,
141        op_count: Location,
142        start_loc: Location,
143        max_ops: NonZeroU64,
144    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
145        self.historical_proof(op_count, start_loc, max_ops)
146            .await
147            .map(|(proof, operations)| FetchResult {
148                proof,
149                operations,
150                // Result of proof verification isn't used by this implementation.
151                success_tx: oneshot::channel().0,
152            })
153    }
154}
155
156/// Implement Resolver directly for `Arc<RwLock<Immutable>>` to eliminate the need for wrapper
157/// types while allowing direct database access.
158impl<E, K, V, H, T> Resolver for Arc<RwLock<Immutable<E, K, V, H, T>>>
159where
160    E: Storage + Clock + Metrics,
161    K: Array,
162    V: VariableValue + Send + Sync + 'static,
163    H: Hasher,
164    T: Translator + Send + Sync + 'static,
165    T::Key: Send + Sync,
166{
167    type Digest = H::Digest;
168    type Op = ImmutableOp<K, V>;
169    type Error = crate::qmdb::Error;
170
171    async fn get_operations(
172        &self,
173        op_count: Location,
174        start_loc: Location,
175        max_ops: NonZeroU64,
176    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
177        let db = self.read().await;
178        db.historical_proof(op_count, start_loc, max_ops)
179            .await
180            .map(|(proof, operations)| FetchResult {
181                proof,
182                operations,
183                // Result of proof verification isn't used by this implementation.
184                success_tx: oneshot::channel().0,
185            })
186    }
187}
188
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::marker::PhantomData;

    /// Test-only resolver whose every fetch fails.
    ///
    /// The type parameters only pin the associated `Digest` and `Op` types; no values of
    /// those types are stored.
    #[derive(Clone)]
    pub struct FailResolver<D, K, V> {
        _digest: PhantomData<D>,
        _key: PhantomData<K>,
        _value: PhantomData<V>,
    }

    impl<D, K, V> FailResolver<D, K, V> {
        /// Build a resolver; it carries nothing but its type markers.
        pub fn new() -> Self {
            let (_digest, _key, _value) = (PhantomData, PhantomData, PhantomData);
            Self {
                _digest,
                _key,
                _value,
            }
        }
    }

    impl<D, K, V> Resolver for FailResolver<D, K, V>
    where
        D: Digest,
        K: Array,
        V: FixedValue + Clone + Send + Sync + 'static,
    {
        type Digest = D;
        type Op = Operation<K, V>;
        type Error = qmdb::Error;

        /// Ignores every argument and always returns an error.
        async fn get_operations(
            &self,
            _op_count: Location,
            _start_loc: Location,
            _max_ops: NonZeroU64,
        ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
            // Arbitrary dummy error
            let dummy = qmdb::Error::KeyNotFound;
            Err(dummy)
        }
    }
}