commonware_storage/adb/sync/
resolver.rs

1use crate::{
2    adb::{
3        self,
4        any::fixed::unordered::Any,
5        immutable::Immutable,
6        operation::{fixed::unordered::Operation as Fixed, variable::Operation as Variable},
7    },
8    mmr::{Location, Proof},
9    translator::Translator,
10};
11use commonware_codec::CodecFixed;
12use commonware_cryptography::{Digest, Hasher};
13use commonware_runtime::{Clock, Metrics, RwLock, Storage};
14use commonware_utils::Array;
15use futures::channel::oneshot;
16use std::{future::Future, num::NonZeroU64, sync::Arc};
17
18/// Result from a fetch operation
19pub struct FetchResult<Op, D: Digest> {
20    /// The proof for the operations
21    pub proof: Proof<D>,
22    /// The operations that were fetched
23    pub operations: Vec<Op>,
24    /// Channel to report success/failure back to resolver
25    pub success_tx: oneshot::Sender<bool>,
26}
27
28impl<Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<Op, D> {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        f.debug_struct("FetchResult")
31            .field("proof", &self.proof)
32            .field("operations", &self.operations)
33            .field("success_tx", &"<callback>")
34            .finish()
35    }
36}
37
38/// Trait for network communication with the sync server
39pub trait Resolver: Send + Sync + Clone + 'static {
40    /// The digest type used in proofs returned by the resolver
41    type Digest: Digest;
42
43    /// The type of operations returned by the resolver
44    type Op;
45
46    /// The error type returned by the resolver
47    type Error: std::error::Error + Send + 'static;
48
49    /// Get the operations starting at `start_loc` in the database, up to `max_ops` operations.
50    /// Returns the operations and a proof that they were present in the database when it had
51    /// `size` operations.
52    #[allow(clippy::type_complexity)]
53    fn get_operations<'a>(
54        &'a self,
55        op_count: Location,
56        start_loc: Location,
57        max_ops: NonZeroU64,
58    ) -> impl Future<Output = Result<FetchResult<Self::Op, Self::Digest>, Self::Error>> + Send + 'a;
59}
60
61impl<E, K, V, H, T> Resolver for Arc<Any<E, K, V, H, T>>
62where
63    E: Storage + Clock + Metrics,
64    K: Array,
65    V: CodecFixed<Cfg = ()> + Send + Sync + 'static,
66    H: Hasher,
67    T: Translator + Send + Sync + 'static,
68    T::Key: Send + Sync,
69{
70    type Digest = H::Digest;
71    type Op = Fixed<K, V>;
72    type Error = adb::Error;
73
74    async fn get_operations(
75        &self,
76        op_count: Location,
77        start_loc: Location,
78        max_ops: NonZeroU64,
79    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
80        self.historical_proof(op_count, start_loc, max_ops)
81            .await
82            .map(|(proof, operations)| FetchResult {
83                proof,
84                operations,
85                // Result of proof verification isn't used by this implementation.
86                success_tx: oneshot::channel().0,
87            })
88    }
89}
90
91/// Implement Resolver directly for `Arc<RwLock<Any>>` to eliminate the need for wrapper types
92/// while allowing direct database access.
93impl<E, K, V, H, T> Resolver for Arc<RwLock<Any<E, K, V, H, T>>>
94where
95    E: Storage + Clock + Metrics,
96    K: Array,
97    V: CodecFixed<Cfg = ()> + Send + Sync + 'static,
98    H: Hasher,
99    T: Translator + Send + Sync + 'static,
100    T::Key: Send + Sync,
101{
102    type Digest = H::Digest;
103    type Op = Fixed<K, V>;
104    type Error = adb::Error;
105
106    async fn get_operations(
107        &self,
108        op_count: Location,
109        start_loc: Location,
110        max_ops: NonZeroU64,
111    ) -> Result<FetchResult<Self::Op, Self::Digest>, adb::Error> {
112        let db = self.read().await;
113        db.historical_proof(op_count, start_loc, max_ops)
114            .await
115            .map(|(proof, operations)| FetchResult {
116                proof,
117                operations,
118                // Result of proof verification isn't used by this implementation.
119                success_tx: oneshot::channel().0,
120            })
121    }
122}
123
124impl<E, K, V, H, T> Resolver for Arc<Immutable<E, K, V, H, T>>
125where
126    E: Storage + Clock + Metrics,
127    K: Array,
128    V: commonware_codec::Codec + Send + Sync + 'static,
129    H: Hasher,
130    T: Translator + Send + Sync + 'static,
131    T::Key: Send + Sync,
132{
133    type Digest = H::Digest;
134    type Op = Variable<K, V>;
135    type Error = crate::adb::Error;
136
137    async fn get_operations(
138        &self,
139        op_count: Location,
140        start_loc: Location,
141        max_ops: NonZeroU64,
142    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
143        self.historical_proof(op_count, start_loc, max_ops)
144            .await
145            .map(|(proof, operations)| FetchResult {
146                proof,
147                operations,
148                // Result of proof verification isn't used by this implementation.
149                success_tx: oneshot::channel().0,
150            })
151    }
152}
153
154/// Implement Resolver directly for `Arc<RwLock<Immutable>>` to eliminate the need for wrapper
155/// types while allowing direct database access.
156impl<E, K, V, H, T> Resolver for Arc<RwLock<Immutable<E, K, V, H, T>>>
157where
158    E: Storage + Clock + Metrics,
159    K: Array,
160    V: commonware_codec::Codec + Send + Sync + 'static,
161    H: Hasher,
162    T: Translator + Send + Sync + 'static,
163    T::Key: Send + Sync,
164{
165    type Digest = H::Digest;
166    type Op = Variable<K, V>;
167    type Error = crate::adb::Error;
168
169    async fn get_operations(
170        &self,
171        op_count: Location,
172        start_loc: Location,
173        max_ops: NonZeroU64,
174    ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
175        let db = self.read().await;
176        db.historical_proof(op_count, start_loc, max_ops)
177            .await
178            .map(|(proof, operations)| FetchResult {
179                proof,
180                operations,
181                // Result of proof verification isn't used by this implementation.
182                success_tx: oneshot::channel().0,
183            })
184    }
185}
186
187#[cfg(test)]
188pub(crate) mod tests {
189    use super::*;
190    use std::marker::PhantomData;
191
192    #[derive(Clone)]
193    pub struct FailResolver<D, K, V> {
194        _digest: PhantomData<D>,
195        _key: PhantomData<K>,
196        _value: PhantomData<V>,
197    }
198
199    impl<D, K, V> Resolver for FailResolver<D, K, V>
200    where
201        D: Digest,
202        K: Array,
203        V: CodecFixed<Cfg = ()> + Clone + Send + Sync + 'static,
204    {
205        type Digest = D;
206        type Op = Fixed<K, V>;
207        type Error = adb::Error;
208
209        async fn get_operations(
210            &self,
211            _op_count: Location,
212            _start_loc: Location,
213            _max_ops: NonZeroU64,
214        ) -> Result<FetchResult<Self::Op, Self::Digest>, adb::Error> {
215            Err(adb::Error::KeyNotFound) // Arbitrary dummy error
216        }
217    }
218
219    impl<D, K, V> FailResolver<D, K, V> {
220        pub fn new() -> Self {
221            Self {
222                _digest: PhantomData,
223                _key: PhantomData,
224                _value: PhantomData,
225            }
226        }
227    }
228}