commonware_sync/net/
resolver.rs1use super::{io, wire};
2use crate::net::request_id;
3use commonware_codec::{EncodeShared, IsUnit, Read};
4use commonware_cryptography::Digest;
5use commonware_runtime::{Network, Spawner};
6use commonware_storage::{mmr::Location, qmdb::sync};
7use commonware_utils::channel::{mpsc, oneshot};
8use std::num::NonZeroU64;
9
10#[derive(Clone)]
12pub struct Resolver<Op, D>
13where
14 Op: Read + EncodeShared + 'static,
15 Op::Cfg: IsUnit,
16 D: Digest,
17{
18 request_id_generator: request_id::Generator,
19 request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
20}
21
22impl<Op, D> Resolver<Op, D>
23where
24 Op: Read + EncodeShared,
25 Op::Cfg: IsUnit,
26 D: Digest,
27{
28 pub async fn connect<E>(
30 context: E,
31 server_addr: std::net::SocketAddr,
32 ) -> Result<Self, commonware_runtime::Error>
33 where
34 E: Network + Spawner,
35 {
36 let (sink, stream) = context.dial(server_addr).await?;
37 let (request_tx, _handle) = io::run(context, sink, stream)?;
38 Ok(Self {
39 request_id_generator: request_id::Generator::new(),
40 request_tx,
41 })
42 }
43
44 pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
46 let request_id = self.request_id_generator.next();
47 let request =
48 wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
49 let (tx, rx) = oneshot::channel();
50 self.request_tx
51 .clone()
52 .send(io::Request {
53 request,
54 response_tx: tx,
55 })
56 .await
57 .map_err(|_| crate::Error::RequestChannelClosed)?;
58 let response = rx
59 .await
60 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
61 match response {
62 wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
63 wire::Message::Error(err) => Err(crate::Error::Server {
64 code: err.error_code,
65 message: err.message,
66 }),
67 _ => Err(crate::Error::UnexpectedResponse { request_id }),
68 }
69 }
70}
71
72impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
73where
74 Op: Clone + Read + EncodeShared,
75 Op::Cfg: IsUnit,
76 D: Digest,
77{
78 type Digest = D;
79 type Op = Op;
80 type Error = crate::Error;
81
82 async fn get_operations(
83 &self,
84 op_count: Location,
85 start_loc: Location,
86 max_ops: NonZeroU64,
87 include_pinned_nodes: bool,
88 _cancel_rx: oneshot::Receiver<()>,
89 ) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
90 let request_id = self.request_id_generator.next();
91 let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
92 request_id,
93 op_count,
94 start_loc,
95 max_ops,
96 include_pinned_nodes,
97 });
98 let (tx, rx) = oneshot::channel();
99 self.request_tx
100 .clone()
101 .send(io::Request {
102 request,
103 response_tx: tx,
104 })
105 .await
106 .map_err(|_| crate::Error::RequestChannelClosed)?;
107 let response = rx
108 .await
109 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
110 let (proof, operations, pinned_nodes) = match response {
111 wire::Message::GetOperationsResponse(r) => (r.proof, r.operations, r.pinned_nodes),
112 wire::Message::Error(err) => {
113 return Err(crate::Error::Server {
114 code: err.error_code,
115 message: err.message,
116 })
117 }
118 _ => return Err(crate::Error::UnexpectedResponse { request_id }),
119 };
120 let (tx, _rx) = oneshot::channel();
121 Ok(sync::resolver::FetchResult {
122 proof,
123 operations,
124 success_tx: tx,
125 pinned_nodes,
126 })
127 }
128}