Skip to main content

commonware_sync/net/
resolver.rs

1use super::{io, wire};
2use crate::net::request_id;
3use commonware_codec::{EncodeShared, Read};
4use commonware_cryptography::Digest;
5use commonware_runtime::{Network, Spawner};
6use commonware_storage::{mmr::Location, qmdb::sync};
7use commonware_utils::channel::{mpsc, oneshot};
8use std::num::NonZeroU64;
9
10/// Network resolver that works directly with generic wire messages.
11#[derive(Clone)]
12pub struct Resolver<Op, D>
13where
14    Op: Read<Cfg = ()> + EncodeShared + 'static,
15    D: Digest,
16{
17    request_id_generator: request_id::Generator,
18    request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
19}
20
21impl<Op, D> Resolver<Op, D>
22where
23    Op: Read<Cfg = ()> + EncodeShared,
24    D: Digest,
25{
26    /// Returns a resolver connected to the server at the given address.
27    pub async fn connect<E>(
28        context: E,
29        server_addr: std::net::SocketAddr,
30    ) -> Result<Self, commonware_runtime::Error>
31    where
32        E: Network + Spawner,
33    {
34        let (sink, stream) = context.dial(server_addr).await?;
35        let (request_tx, _handle) = io::run(context, sink, stream)?;
36        Ok(Self {
37            request_id_generator: request_id::Generator::new(),
38            request_tx,
39        })
40    }
41
42    /// Returns the current sync target from the server.
43    pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
44        let request_id = self.request_id_generator.next();
45        let request =
46            wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
47        let (tx, rx) = oneshot::channel();
48        self.request_tx
49            .clone()
50            .send(io::Request {
51                request,
52                response_tx: tx,
53            })
54            .await
55            .map_err(|_| crate::Error::RequestChannelClosed)?;
56        let response = rx
57            .await
58            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
59        match response {
60            wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
61            wire::Message::Error(err) => Err(crate::Error::Server {
62                code: err.error_code,
63                message: err.message,
64            }),
65            _ => Err(crate::Error::UnexpectedResponse { request_id }),
66        }
67    }
68}
69
70impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
71where
72    Op: Clone + Read<Cfg = ()> + EncodeShared,
73    D: Digest,
74{
75    type Digest = D;
76    type Op = Op;
77    type Error = crate::Error;
78
79    async fn get_operations(
80        &self,
81        op_count: Location,
82        start_loc: Location,
83        max_ops: NonZeroU64,
84    ) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
85        let request_id = self.request_id_generator.next();
86        let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
87            request_id,
88            op_count,
89            start_loc,
90            max_ops,
91        });
92        let (tx, rx) = oneshot::channel();
93        self.request_tx
94            .clone()
95            .send(io::Request {
96                request,
97                response_tx: tx,
98            })
99            .await
100            .map_err(|_| crate::Error::RequestChannelClosed)?;
101        let response = rx
102            .await
103            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
104        let (proof, operations) = match response {
105            wire::Message::GetOperationsResponse(r) => (r.proof, r.operations),
106            wire::Message::Error(err) => {
107                return Err(crate::Error::Server {
108                    code: err.error_code,
109                    message: err.message,
110                })
111            }
112            _ => return Err(crate::Error::UnexpectedResponse { request_id }),
113        };
114        let (tx, _rx) = oneshot::channel();
115        Ok(sync::resolver::FetchResult {
116            proof,
117            operations,
118            success_tx: tx,
119        })
120    }
121}