// File: commonware_sync/net/resolver.rs

1use super::{io, wire};
2use crate::net::request_id;
3use commonware_codec::{EncodeShared, IsUnit, Read};
4use commonware_cryptography::Digest;
5use commonware_runtime::{Network, Spawner};
6use commonware_storage::{mmr::Location, qmdb::sync};
7use commonware_utils::channel::{mpsc, oneshot};
8use std::num::NonZeroU64;
9
10/// Network resolver that works directly with generic wire messages.
11#[derive(Clone)]
12pub struct Resolver<Op, D>
13where
14    Op: Read + EncodeShared + 'static,
15    Op::Cfg: IsUnit,
16    D: Digest,
17{
18    request_id_generator: request_id::Generator,
19    request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
20}
21
22impl<Op, D> Resolver<Op, D>
23where
24    Op: Read + EncodeShared,
25    Op::Cfg: IsUnit,
26    D: Digest,
27{
28    /// Returns a resolver connected to the server at the given address.
29    pub async fn connect<E>(
30        context: E,
31        server_addr: std::net::SocketAddr,
32    ) -> Result<Self, commonware_runtime::Error>
33    where
34        E: Network + Spawner,
35    {
36        let (sink, stream) = context.dial(server_addr).await?;
37        let (request_tx, _handle) = io::run(context, sink, stream)?;
38        Ok(Self {
39            request_id_generator: request_id::Generator::new(),
40            request_tx,
41        })
42    }
43
44    /// Returns the current sync target from the server.
45    pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
46        let request_id = self.request_id_generator.next();
47        let request =
48            wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
49        let (tx, rx) = oneshot::channel();
50        self.request_tx
51            .clone()
52            .send(io::Request {
53                request,
54                response_tx: tx,
55            })
56            .await
57            .map_err(|_| crate::Error::RequestChannelClosed)?;
58        let response = rx
59            .await
60            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
61        match response {
62            wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
63            wire::Message::Error(err) => Err(crate::Error::Server {
64                code: err.error_code,
65                message: err.message,
66            }),
67            _ => Err(crate::Error::UnexpectedResponse { request_id }),
68        }
69    }
70}
71
72impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
73where
74    Op: Clone + Read + EncodeShared,
75    Op::Cfg: IsUnit,
76    D: Digest,
77{
78    type Digest = D;
79    type Op = Op;
80    type Error = crate::Error;
81
82    async fn get_operations(
83        &self,
84        op_count: Location,
85        start_loc: Location,
86        max_ops: NonZeroU64,
87        include_pinned_nodes: bool,
88        _cancel_rx: oneshot::Receiver<()>,
89    ) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
90        let request_id = self.request_id_generator.next();
91        let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
92            request_id,
93            op_count,
94            start_loc,
95            max_ops,
96            include_pinned_nodes,
97        });
98        let (tx, rx) = oneshot::channel();
99        self.request_tx
100            .clone()
101            .send(io::Request {
102                request,
103                response_tx: tx,
104            })
105            .await
106            .map_err(|_| crate::Error::RequestChannelClosed)?;
107        let response = rx
108            .await
109            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
110        let (proof, operations, pinned_nodes) = match response {
111            wire::Message::GetOperationsResponse(r) => (r.proof, r.operations, r.pinned_nodes),
112            wire::Message::Error(err) => {
113                return Err(crate::Error::Server {
114                    code: err.error_code,
115                    message: err.message,
116                })
117            }
118            _ => return Err(crate::Error::UnexpectedResponse { request_id }),
119        };
120        let (tx, _rx) = oneshot::channel();
121        Ok(sync::resolver::FetchResult {
122            proof,
123            operations,
124            success_tx: tx,
125            pinned_nodes,
126        })
127    }
128}