1use super::{io, wire};
2use crate::net::request_id;
3use commonware_codec::{EncodeShared, IsUnit, Read};
4use commonware_cryptography::Digest;
5use commonware_runtime::{Network, Spawner};
6use commonware_storage::{
7 mmr::{self, Location},
8 qmdb::sync::{self, compact, resolver::fetch_operation_range},
9};
10use commonware_utils::channel::{mpsc, oneshot};
11use std::num::NonZeroU64;
12
13#[derive(Clone)]
15pub struct Resolver<Op, D>
16where
17 Op: Read + EncodeShared + 'static,
18 Op::Cfg: IsUnit,
19 D: Digest,
20{
21 request_id_generator: request_id::Generator,
22 request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
23}
24
25impl<Op, D> Resolver<Op, D>
26where
27 Op: Read + EncodeShared,
28 Op::Cfg: IsUnit,
29 D: Digest,
30{
31 pub async fn connect<E>(
33 context: E,
34 server_addr: std::net::SocketAddr,
35 ) -> Result<Self, commonware_runtime::Error>
36 where
37 E: Network + Spawner,
38 {
39 let (sink, stream) = context.dial(server_addr).await?;
40 let (request_tx, _handle) = io::run(context, sink, stream)?;
41 Ok(Self {
42 request_id_generator: request_id::Generator::new(),
43 request_tx,
44 })
45 }
46
47 pub async fn get_sync_target(&self) -> Result<sync::Target<mmr::Family, D>, crate::Error> {
49 let request_id = self.request_id_generator.next();
50 let request =
51 wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
52 let (tx, rx) = oneshot::channel();
53 self.request_tx
54 .clone()
55 .send(io::Request {
56 request,
57 response_tx: tx,
58 })
59 .await
60 .map_err(|_| crate::Error::RequestChannelClosed)?;
61 let response = rx
62 .await
63 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
64 match response {
65 wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
66 wire::Message::Error(err) => Err(crate::Error::Server {
67 code: err.error_code,
68 message: err.message,
69 }),
70 _ => Err(crate::Error::UnexpectedResponse { request_id }),
71 }
72 }
73
74 pub async fn get_compact_target(
76 &self,
77 ) -> Result<compact::Target<mmr::Family, D>, crate::Error> {
78 let request_id = self.request_id_generator.next();
79 let request =
80 wire::Message::GetCompactTargetRequest(wire::GetCompactTargetRequest { request_id });
81 let (tx, rx) = oneshot::channel();
82 self.request_tx
83 .clone()
84 .send(io::Request {
85 request,
86 response_tx: tx,
87 })
88 .await
89 .map_err(|_| crate::Error::RequestChannelClosed)?;
90 let response = rx
91 .await
92 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
93 match response {
94 wire::Message::GetCompactTargetResponse(r) => Ok(r.target),
95 wire::Message::Error(err) => Err(crate::Error::Server {
96 code: err.error_code,
97 message: err.message,
98 }),
99 _ => Err(crate::Error::UnexpectedResponse { request_id }),
100 }
101 }
102
103 pub async fn get_compact_state(
105 &self,
106 target: compact::Target<mmr::Family, D>,
107 ) -> Result<compact::State<mmr::Family, Op, D>, crate::Error> {
108 let request_id = self.request_id_generator.next();
109 let request = wire::Message::GetCompactStateRequest(wire::GetCompactStateRequest {
110 request_id,
111 target,
112 });
113 let (tx, rx) = oneshot::channel();
114 self.request_tx
115 .clone()
116 .send(io::Request {
117 request,
118 response_tx: tx,
119 })
120 .await
121 .map_err(|_| crate::Error::RequestChannelClosed)?;
122 let response = rx
123 .await
124 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
125 match response {
126 wire::Message::GetCompactStateResponse(r) => Ok(r.state),
127 wire::Message::Error(err) => Err(crate::Error::Server {
128 code: err.error_code,
129 message: err.message,
130 }),
131 _ => Err(crate::Error::UnexpectedResponse { request_id }),
132 }
133 }
134}
135
136impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
137where
138 Op: Clone + Read + EncodeShared,
139 Op::Cfg: IsUnit,
140 D: Digest,
141{
142 type Family = mmr::Family;
143 type Digest = D;
144 type Op = Op;
145 type Error = crate::Error;
146
147 async fn get_operations(
148 &self,
149 op_count: Location,
150 start_loc: Location,
151 max_ops: NonZeroU64,
152 include_pinned_nodes: bool,
153 _cancel_rx: oneshot::Receiver<()>,
154 ) -> Result<sync::resolver::FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error>
155 {
156 fetch_operation_range(
157 op_count,
158 start_loc,
159 max_ops,
160 include_pinned_nodes,
161 |op_count, start_loc, max_ops, include_pinned_nodes| async move {
162 let request_id = self.request_id_generator.next();
163 let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
164 request_id,
165 op_count,
166 start_loc,
167 max_ops,
168 include_pinned_nodes,
169 });
170 let (tx, rx) = oneshot::channel();
171 self.request_tx
172 .clone()
173 .send(io::Request {
174 request,
175 response_tx: tx,
176 })
177 .await
178 .map_err(|_| crate::Error::RequestChannelClosed)?;
179 let response = rx
180 .await
181 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
182 match response {
183 wire::Message::GetOperationsResponse(r) => {
184 Ok(sync::resolver::FetchedOperations::new(
185 r.proof,
186 r.operations,
187 r.pinned_nodes,
188 ))
189 }
190 wire::Message::Error(err) => Err(crate::Error::Server {
191 code: err.error_code,
192 message: err.message,
193 }),
194 _ => Err(crate::Error::UnexpectedResponse { request_id }),
195 }
196 },
197 )
198 .await
199 }
200}
201
202impl<Op, D> compact::Resolver for Resolver<Op, D>
203where
204 Op: Clone + Read + EncodeShared,
205 Op::Cfg: IsUnit,
206 D: Digest,
207{
208 type Family = mmr::Family;
209 type Digest = D;
210 type Op = Op;
211 type Error = crate::Error;
212
213 async fn get_compact_state(
214 &self,
215 target: compact::Target<Self::Family, Self::Digest>,
216 ) -> Result<compact::FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
217 self.get_compact_state(target).await.map(Into::into)
218 }
219}