commonware_storage/qmdb/sync/
resolver.rs1use crate::{
2 mmr::{Location, Proof},
3 qmdb::{
4 self,
5 any::{
6 ordered::{
7 fixed::{Db as OrderedFixedDb, Operation as OrderedFixedOperation},
8 variable::{Db as OrderedVariableDb, Operation as OrderedVariableOperation},
9 },
10 unordered::{
11 fixed::{Db as FixedDb, Operation as FixedOperation},
12 variable::{Db as VariableDb, Operation as VariableOperation},
13 },
14 FixedValue, VariableValue,
15 },
16 immutable::{Immutable, Operation as ImmutableOp},
17 Durable, Merkleized,
18 },
19 translator::Translator,
20};
21use commonware_cryptography::{Digest, Hasher};
22use commonware_runtime::{Clock, Metrics, RwLock, Storage};
23use commonware_utils::{channel::oneshot, Array};
24use std::{future::Future, num::NonZeroU64, sync::Arc};
25
26pub struct FetchResult<Op, D: Digest> {
28 pub proof: Proof<D>,
30 pub operations: Vec<Op>,
32 pub success_tx: oneshot::Sender<bool>,
34}
35
36impl<Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<Op, D> {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("FetchResult")
39 .field("proof", &self.proof)
40 .field("operations", &self.operations)
41 .field("success_tx", &"<callback>")
42 .finish()
43 }
44}
45
46pub trait Resolver: Send + Sync + Clone + 'static {
48 type Digest: Digest;
50
51 type Op;
53
54 type Error: std::error::Error + Send + 'static;
56
57 #[allow(clippy::type_complexity)]
61 fn get_operations<'a>(
62 &'a self,
63 op_count: Location,
64 start_loc: Location,
65 max_ops: NonZeroU64,
66 ) -> impl Future<Output = Result<FetchResult<Self::Op, Self::Digest>, Self::Error>> + Send + 'a;
67}
68
69macro_rules! impl_resolver {
70 ($db:ident, $op:ident, $val_bound:ident) => {
71 impl<E, K, V, H, T> Resolver for Arc<$db<E, K, V, H, T, Merkleized<H>, Durable>>
72 where
73 E: Storage + Clock + Metrics,
74 K: Array,
75 V: $val_bound + Send + Sync + 'static,
76 H: Hasher,
77 T: Translator + Send + Sync + 'static,
78 T::Key: Send + Sync,
79 {
80 type Digest = H::Digest;
81 type Op = $op<K, V>;
82 type Error = qmdb::Error;
83
84 async fn get_operations(
85 &self,
86 op_count: Location,
87 start_loc: Location,
88 max_ops: NonZeroU64,
89 ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
90 self.historical_proof(op_count, start_loc, max_ops)
91 .await
92 .map(|(proof, operations)| FetchResult {
93 proof,
94 operations,
95 success_tx: oneshot::channel().0,
96 })
97 }
98 }
99
100 impl<E, K, V, H, T> Resolver for Arc<RwLock<$db<E, K, V, H, T, Merkleized<H>, Durable>>>
101 where
102 E: Storage + Clock + Metrics,
103 K: Array,
104 V: $val_bound + Send + Sync + 'static,
105 H: Hasher,
106 T: Translator + Send + Sync + 'static,
107 T::Key: Send + Sync,
108 {
109 type Digest = H::Digest;
110 type Op = $op<K, V>;
111 type Error = qmdb::Error;
112
113 async fn get_operations(
114 &self,
115 op_count: Location,
116 start_loc: Location,
117 max_ops: NonZeroU64,
118 ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
119 let db = self.read().await;
120 db.historical_proof(op_count, start_loc, max_ops).await.map(
121 |(proof, operations)| FetchResult {
122 proof,
123 operations,
124 success_tx: oneshot::channel().0,
125 },
126 )
127 }
128 }
129
130 impl<E, K, V, H, T> Resolver
131 for Arc<RwLock<Option<$db<E, K, V, H, T, Merkleized<H>, Durable>>>>
132 where
133 E: Storage + Clock + Metrics,
134 K: Array,
135 V: $val_bound + Send + Sync + 'static,
136 H: Hasher,
137 T: Translator + Send + Sync + 'static,
138 T::Key: Send + Sync,
139 {
140 type Digest = H::Digest;
141 type Op = $op<K, V>;
142 type Error = qmdb::Error;
143
144 async fn get_operations(
145 &self,
146 op_count: Location,
147 start_loc: Location,
148 max_ops: NonZeroU64,
149 ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
150 let guard = self.read().await;
151 let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
152 db.historical_proof(op_count, start_loc, max_ops).await.map(
153 |(proof, operations)| FetchResult {
154 proof,
155 operations,
156 success_tx: oneshot::channel().0,
157 },
158 )
159 }
160 }
161 };
162}
163
164impl_resolver!(FixedDb, FixedOperation, FixedValue);
166
167impl_resolver!(VariableDb, VariableOperation, VariableValue);
169
170impl_resolver!(OrderedFixedDb, OrderedFixedOperation, FixedValue);
172
173impl_resolver!(OrderedVariableDb, OrderedVariableOperation, VariableValue);
175
176impl_resolver!(Immutable, ImmutableOp, VariableValue);
178
179#[cfg(test)]
180pub(crate) mod tests {
181 use super::*;
182 use std::marker::PhantomData;
183
184 #[derive(Clone)]
186 pub struct FailResolver<Op, D> {
187 _phantom: PhantomData<(Op, D)>,
188 }
189
190 impl<Op, D> Resolver for FailResolver<Op, D>
191 where
192 D: Digest,
193 Op: Send + Sync + Clone + 'static,
194 {
195 type Digest = D;
196 type Op = Op;
197 type Error = qmdb::Error;
198
199 async fn get_operations(
200 &self,
201 _op_count: Location,
202 _start_loc: Location,
203 _max_ops: NonZeroU64,
204 ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
205 Err(qmdb::Error::KeyNotFound) }
207 }
208
209 impl<Op, D> FailResolver<Op, D> {
210 pub fn new() -> Self {
211 Self {
212 _phantom: PhantomData,
213 }
214 }
215 }
216}