commonware_sync/net/
resolver.rs

1use super::{io, wire};
2use crate::net::request_id;
3use commonware_codec::{Encode, Read};
4use commonware_cryptography::Digest;
5use commonware_runtime::{Network, Spawner};
6use commonware_storage::adb::sync;
7use futures::{
8    channel::{mpsc, oneshot},
9    SinkExt,
10};
11use std::num::NonZeroU64;
12
13/// Network resolver that works directly with generic wire messages.
14#[derive(Clone)]
15pub struct Resolver<Op, D>
16where
17    Op: Read<Cfg = ()> + Encode + Send + Sync + 'static,
18    D: Digest,
19{
20    request_id_generator: request_id::Generator,
21    request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
22}
23
24impl<Op, D> Resolver<Op, D>
25where
26    Op: Send + Sync + Read<Cfg = ()> + Encode,
27    D: Digest,
28{
29    /// Returns a resolver connected to the server at the given address.
30    pub async fn connect<E>(
31        context: E,
32        server_addr: std::net::SocketAddr,
33    ) -> Result<Self, commonware_runtime::Error>
34    where
35        E: Network + Spawner,
36    {
37        let (sink, stream) = context.dial(server_addr).await?;
38        let (request_tx, _handle) = io::run(context, sink, stream)?;
39        Ok(Self {
40            request_id_generator: request_id::Generator::new(),
41            request_tx,
42        })
43    }
44
45    /// Returns the current sync target from the server.
46    pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
47        let request_id = self.request_id_generator.next();
48        let request =
49            wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
50        let (tx, rx) = oneshot::channel();
51        self.request_tx
52            .clone()
53            .send(io::Request {
54                request,
55                response_tx: tx,
56            })
57            .await
58            .map_err(|_| crate::Error::RequestChannelClosed)?;
59        let response = rx
60            .await
61            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
62        match response {
63            wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
64            wire::Message::Error(err) => Err(crate::Error::Server {
65                code: err.error_code,
66                message: err.message,
67            }),
68            _ => Err(crate::Error::UnexpectedResponse { request_id }),
69        }
70    }
71}
72
73impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
74where
75    Op: Clone + Send + Sync + Read<Cfg = ()> + Encode,
76    D: Digest,
77{
78    type Digest = D;
79    type Op = Op;
80    type Error = crate::Error;
81
82    async fn get_operations(
83        &self,
84        size: u64,
85        start_loc: u64,
86        max_ops: NonZeroU64,
87    ) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
88        let request_id = self.request_id_generator.next();
89        let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
90            request_id,
91            size,
92            start_loc,
93            max_ops,
94        });
95        let (tx, rx) = oneshot::channel();
96        self.request_tx
97            .clone()
98            .send(io::Request {
99                request,
100                response_tx: tx,
101            })
102            .await
103            .map_err(|_| crate::Error::RequestChannelClosed)?;
104        let response = rx
105            .await
106            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
107        let (proof, operations) = match response {
108            wire::Message::GetOperationsResponse(r) => (r.proof, r.operations),
109            wire::Message::Error(err) => {
110                return Err(crate::Error::Server {
111                    code: err.error_code,
112                    message: err.message,
113                })
114            }
115            _ => return Err(crate::Error::UnexpectedResponse { request_id }),
116        };
117        let (tx, _rx) = oneshot::channel();
118        Ok(sync::resolver::FetchResult {
119            proof,
120            operations,
121            success_tx: tx,
122        })
123    }
124}