commonware_storage/qmdb/sync/
resolver.rs1use crate::{
2 mmr::{Location, Proof},
3 qmdb::{
4 self,
5 any::{
6 ordered::{
7 fixed::{Db as OrderedFixedDb, Operation as OrderedFixedOperation},
8 variable::{Db as OrderedVariableDb, Operation as OrderedVariableOperation},
9 },
10 unordered::{
11 fixed::{Db as FixedDb, Operation as FixedOperation},
12 variable::{Db as VariableDb, Operation as VariableOperation},
13 },
14 FixedValue, VariableValue,
15 },
16 immutable::{Immutable, Operation as ImmutableOp},
17 },
18 translator::Translator,
19};
20use commonware_cryptography::{Digest, Hasher};
21use commonware_runtime::{Clock, Metrics, Storage};
22use commonware_utils::{channel::oneshot, sync::AsyncRwLock, Array};
23use std::{future::Future, num::NonZeroU64, sync::Arc};
24
25pub struct FetchResult<Op, D: Digest> {
27 pub proof: Proof<D>,
29 pub operations: Vec<Op>,
31 pub success_tx: oneshot::Sender<bool>,
33}
34
35impl<Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<Op, D> {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("FetchResult")
38 .field("proof", &self.proof)
39 .field("operations", &self.operations)
40 .field("success_tx", &"<callback>")
41 .finish()
42 }
43}
44
45pub trait Resolver: Send + Sync + Clone + 'static {
47 type Digest: Digest;
49
50 type Op;
52
53 type Error: std::error::Error + Send + 'static;
55
56 #[allow(clippy::type_complexity)]
60 fn get_operations<'a>(
61 &'a self,
62 op_count: Location,
63 start_loc: Location,
64 max_ops: NonZeroU64,
65 ) -> impl Future<Output = Result<FetchResult<Self::Op, Self::Digest>, Self::Error>> + Send + 'a;
66}
67
68macro_rules! impl_resolver {
69 ($db:ident, $op:ident, $val_bound:ident) => {
70 impl<E, K, V, H, T> Resolver for Arc<$db<E, K, V, H, T>>
71 where
72 E: Storage + Clock + Metrics,
73 K: Array,
74 V: $val_bound + Send + Sync + 'static,
75 H: Hasher,
76 T: Translator + Send + Sync + 'static,
77 T::Key: Send + Sync,
78 {
79 type Digest = H::Digest;
80 type Op = $op<K, V>;
81 type Error = qmdb::Error;
82
83 async fn get_operations(
84 &self,
85 op_count: Location,
86 start_loc: Location,
87 max_ops: NonZeroU64,
88 ) -> Result<FetchResult<Self::Op, Self::Digest>, Self::Error> {
89 self.historical_proof(op_count, start_loc, max_ops)
90 .await
91 .map(|(proof, operations)| FetchResult {
92 proof,
93 operations,
94 success_tx: oneshot::channel().0,
95 })
96 }
97 }
98
99 impl<E, K, V, H, T> Resolver for Arc<AsyncRwLock<$db<E, K, V, H, T>>>
100 where
101 E: Storage + Clock + Metrics,
102 K: Array,
103 V: $val_bound + Send + Sync + 'static,
104 H: Hasher,
105 T: Translator + Send + Sync + 'static,
106 T::Key: Send + Sync,
107 {
108 type Digest = H::Digest;
109 type Op = $op<K, V>;
110 type Error = qmdb::Error;
111
112 async fn get_operations(
113 &self,
114 op_count: Location,
115 start_loc: Location,
116 max_ops: NonZeroU64,
117 ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
118 let db = self.read().await;
119 db.historical_proof(op_count, start_loc, max_ops).await.map(
120 |(proof, operations)| FetchResult {
121 proof,
122 operations,
123 success_tx: oneshot::channel().0,
124 },
125 )
126 }
127 }
128
129 impl<E, K, V, H, T> Resolver for Arc<AsyncRwLock<Option<$db<E, K, V, H, T>>>>
130 where
131 E: Storage + Clock + Metrics,
132 K: Array,
133 V: $val_bound + Send + Sync + 'static,
134 H: Hasher,
135 T: Translator + Send + Sync + 'static,
136 T::Key: Send + Sync,
137 {
138 type Digest = H::Digest;
139 type Op = $op<K, V>;
140 type Error = qmdb::Error;
141
142 async fn get_operations(
143 &self,
144 op_count: Location,
145 start_loc: Location,
146 max_ops: NonZeroU64,
147 ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
148 let guard = self.read().await;
149 let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
150 db.historical_proof(op_count, start_loc, max_ops).await.map(
151 |(proof, operations)| FetchResult {
152 proof,
153 operations,
154 success_tx: oneshot::channel().0,
155 },
156 )
157 }
158 }
159 };
160}
161
162impl_resolver!(FixedDb, FixedOperation, FixedValue);
164
165impl_resolver!(VariableDb, VariableOperation, VariableValue);
167
168impl_resolver!(OrderedFixedDb, OrderedFixedOperation, FixedValue);
170
171impl_resolver!(OrderedVariableDb, OrderedVariableOperation, VariableValue);
173
174impl_resolver!(Immutable, ImmutableOp, VariableValue);
176
177#[cfg(test)]
178pub(crate) mod tests {
179 use super::*;
180 use std::marker::PhantomData;
181
182 #[derive(Clone)]
184 pub struct FailResolver<Op, D> {
185 _phantom: PhantomData<(Op, D)>,
186 }
187
188 impl<Op, D> Resolver for FailResolver<Op, D>
189 where
190 D: Digest,
191 Op: Send + Sync + Clone + 'static,
192 {
193 type Digest = D;
194 type Op = Op;
195 type Error = qmdb::Error;
196
197 async fn get_operations(
198 &self,
199 _op_count: Location,
200 _start_loc: Location,
201 _max_ops: NonZeroU64,
202 ) -> Result<FetchResult<Self::Op, Self::Digest>, qmdb::Error> {
203 Err(qmdb::Error::KeyNotFound) }
205 }
206
207 impl<Op, D> FailResolver<Op, D> {
208 pub fn new() -> Self {
209 Self {
210 _phantom: PhantomData,
211 }
212 }
213 }
214}