commonware_sync/net/
resolver.rs1use super::{io, wire};
2use crate::net::request_id;
3use commonware_codec::{EncodeShared, Read};
4use commonware_cryptography::Digest;
5use commonware_runtime::{Network, Spawner};
6use commonware_storage::{mmr::Location, qmdb::sync};
7use commonware_utils::channel::{mpsc, oneshot};
8use std::num::NonZeroU64;
9
10#[derive(Clone)]
12pub struct Resolver<Op, D>
13where
14 Op: Read<Cfg = ()> + EncodeShared + 'static,
15 D: Digest,
16{
17 request_id_generator: request_id::Generator,
18 request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
19}
20
21impl<Op, D> Resolver<Op, D>
22where
23 Op: Read<Cfg = ()> + EncodeShared,
24 D: Digest,
25{
26 pub async fn connect<E>(
28 context: E,
29 server_addr: std::net::SocketAddr,
30 ) -> Result<Self, commonware_runtime::Error>
31 where
32 E: Network + Spawner,
33 {
34 let (sink, stream) = context.dial(server_addr).await?;
35 let (request_tx, _handle) = io::run(context, sink, stream)?;
36 Ok(Self {
37 request_id_generator: request_id::Generator::new(),
38 request_tx,
39 })
40 }
41
42 pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
44 let request_id = self.request_id_generator.next();
45 let request =
46 wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
47 let (tx, rx) = oneshot::channel();
48 self.request_tx
49 .clone()
50 .send(io::Request {
51 request,
52 response_tx: tx,
53 })
54 .await
55 .map_err(|_| crate::Error::RequestChannelClosed)?;
56 let response = rx
57 .await
58 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
59 match response {
60 wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
61 wire::Message::Error(err) => Err(crate::Error::Server {
62 code: err.error_code,
63 message: err.message,
64 }),
65 _ => Err(crate::Error::UnexpectedResponse { request_id }),
66 }
67 }
68}
69
70impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
71where
72 Op: Clone + Read<Cfg = ()> + EncodeShared,
73 D: Digest,
74{
75 type Digest = D;
76 type Op = Op;
77 type Error = crate::Error;
78
79 async fn get_operations(
80 &self,
81 op_count: Location,
82 start_loc: Location,
83 max_ops: NonZeroU64,
84 ) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
85 let request_id = self.request_id_generator.next();
86 let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
87 request_id,
88 op_count,
89 start_loc,
90 max_ops,
91 });
92 let (tx, rx) = oneshot::channel();
93 self.request_tx
94 .clone()
95 .send(io::Request {
96 request,
97 response_tx: tx,
98 })
99 .await
100 .map_err(|_| crate::Error::RequestChannelClosed)?;
101 let response = rx
102 .await
103 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
104 let (proof, operations) = match response {
105 wire::Message::GetOperationsResponse(r) => (r.proof, r.operations),
106 wire::Message::Error(err) => {
107 return Err(crate::Error::Server {
108 code: err.error_code,
109 message: err.message,
110 })
111 }
112 _ => return Err(crate::Error::UnexpectedResponse { request_id }),
113 };
114 let (tx, _rx) = oneshot::channel();
115 Ok(sync::resolver::FetchResult {
116 proof,
117 operations,
118 success_tx: tx,
119 })
120 }
121}