commonware_sync/net/
resolver.rs1use super::{io, wire};
2use crate::net::request_id;
3use commonware_codec::{Encode, Read};
4use commonware_cryptography::Digest;
5use commonware_runtime::{Network, Spawner};
6use commonware_storage::adb::sync;
7use futures::{
8 channel::{mpsc, oneshot},
9 SinkExt,
10};
11use std::num::NonZeroU64;
12
13#[derive(Clone)]
15pub struct Resolver<Op, D>
16where
17 Op: Read<Cfg = ()> + Encode + Send + Sync + 'static,
18 D: Digest,
19{
20 request_id_generator: request_id::Generator,
21 request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
22}
23
24impl<Op, D> Resolver<Op, D>
25where
26 Op: Send + Sync + Read<Cfg = ()> + Encode,
27 D: Digest,
28{
29 pub async fn connect<E>(
31 context: E,
32 server_addr: std::net::SocketAddr,
33 ) -> Result<Self, commonware_runtime::Error>
34 where
35 E: Network + Spawner,
36 {
37 let (sink, stream) = context.dial(server_addr).await?;
38 let (request_tx, _handle) = io::run(context, sink, stream)?;
39 Ok(Self {
40 request_id_generator: request_id::Generator::new(),
41 request_tx,
42 })
43 }
44
45 pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
47 let request_id = self.request_id_generator.next();
48 let request =
49 wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
50 let (tx, rx) = oneshot::channel();
51 self.request_tx
52 .clone()
53 .send(io::Request {
54 request,
55 response_tx: tx,
56 })
57 .await
58 .map_err(|_| crate::Error::RequestChannelClosed)?;
59 let response = rx
60 .await
61 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
62 match response {
63 wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
64 wire::Message::Error(err) => Err(crate::Error::Server {
65 code: err.error_code,
66 message: err.message,
67 }),
68 _ => Err(crate::Error::UnexpectedResponse { request_id }),
69 }
70 }
71}
72
73impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
74where
75 Op: Clone + Send + Sync + Read<Cfg = ()> + Encode,
76 D: Digest,
77{
78 type Digest = D;
79 type Op = Op;
80 type Error = crate::Error;
81
82 async fn get_operations(
83 &self,
84 size: u64,
85 start_loc: u64,
86 max_ops: NonZeroU64,
87 ) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
88 let request_id = self.request_id_generator.next();
89 let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
90 request_id,
91 size,
92 start_loc,
93 max_ops,
94 });
95 let (tx, rx) = oneshot::channel();
96 self.request_tx
97 .clone()
98 .send(io::Request {
99 request,
100 response_tx: tx,
101 })
102 .await
103 .map_err(|_| crate::Error::RequestChannelClosed)?;
104 let response = rx
105 .await
106 .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
107 let (proof, operations) = match response {
108 wire::Message::GetOperationsResponse(r) => (r.proof, r.operations),
109 wire::Message::Error(err) => {
110 return Err(crate::Error::Server {
111 code: err.error_code,
112 message: err.message,
113 })
114 }
115 _ => return Err(crate::Error::UnexpectedResponse { request_id }),
116 };
117 let (tx, _rx) = oneshot::channel();
118 Ok(sync::resolver::FetchResult {
119 proof,
120 operations,
121 success_tx: tx,
122 })
123 }
124}