Skip to main content

commonware_sync/net/
resolver.rs

1use super::{io, wire};
2use crate::net::request_id;
3use commonware_codec::{EncodeShared, IsUnit, Read};
4use commonware_cryptography::Digest;
5use commonware_runtime::{Network, Spawner};
6use commonware_storage::{
7    mmr::{self, Location},
8    qmdb::sync::{self, compact, resolver::fetch_operation_range},
9};
10use commonware_utils::channel::{mpsc, oneshot};
11use std::num::NonZeroU64;
12
13/// Network resolver that works directly with generic wire messages.
14#[derive(Clone)]
15pub struct Resolver<Op, D>
16where
17    Op: Read + EncodeShared + 'static,
18    Op::Cfg: IsUnit,
19    D: Digest,
20{
21    request_id_generator: request_id::Generator,
22    request_tx: mpsc::Sender<io::Request<wire::Message<Op, D>>>,
23}
24
25impl<Op, D> Resolver<Op, D>
26where
27    Op: Read + EncodeShared,
28    Op::Cfg: IsUnit,
29    D: Digest,
30{
31    /// Returns a resolver connected to the server at the given address.
32    pub async fn connect<E>(
33        context: E,
34        server_addr: std::net::SocketAddr,
35    ) -> Result<Self, commonware_runtime::Error>
36    where
37        E: Network + Spawner,
38    {
39        let (sink, stream) = context.dial(server_addr).await?;
40        let (request_tx, _handle) = io::run(context, sink, stream)?;
41        Ok(Self {
42            request_id_generator: request_id::Generator::new(),
43            request_tx,
44        })
45    }
46
47    /// Returns the current sync target from the server.
48    pub async fn get_sync_target(&self) -> Result<sync::Target<mmr::Family, D>, crate::Error> {
49        let request_id = self.request_id_generator.next();
50        let request =
51            wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
52        let (tx, rx) = oneshot::channel();
53        self.request_tx
54            .clone()
55            .send(io::Request {
56                request,
57                response_tx: tx,
58            })
59            .await
60            .map_err(|_| crate::Error::RequestChannelClosed)?;
61        let response = rx
62            .await
63            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
64        match response {
65            wire::Message::GetSyncTargetResponse(r) => Ok(r.target),
66            wire::Message::Error(err) => Err(crate::Error::Server {
67                code: err.error_code,
68                message: err.message,
69            }),
70            _ => Err(crate::Error::UnexpectedResponse { request_id }),
71        }
72    }
73
74    /// Returns the compact sync target currently served by the remote.
75    pub async fn get_compact_target(
76        &self,
77    ) -> Result<compact::Target<mmr::Family, D>, crate::Error> {
78        let request_id = self.request_id_generator.next();
79        let request =
80            wire::Message::GetCompactTargetRequest(wire::GetCompactTargetRequest { request_id });
81        let (tx, rx) = oneshot::channel();
82        self.request_tx
83            .clone()
84            .send(io::Request {
85                request,
86                response_tx: tx,
87            })
88            .await
89            .map_err(|_| crate::Error::RequestChannelClosed)?;
90        let response = rx
91            .await
92            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
93        match response {
94            wire::Message::GetCompactTargetResponse(r) => Ok(r.target),
95            wire::Message::Error(err) => Err(crate::Error::Server {
96                code: err.error_code,
97                message: err.message,
98            }),
99            _ => Err(crate::Error::UnexpectedResponse { request_id }),
100        }
101    }
102
103    /// Returns compact authenticated state for the given target.
104    pub async fn get_compact_state(
105        &self,
106        target: compact::Target<mmr::Family, D>,
107    ) -> Result<compact::State<mmr::Family, Op, D>, crate::Error> {
108        let request_id = self.request_id_generator.next();
109        let request = wire::Message::GetCompactStateRequest(wire::GetCompactStateRequest {
110            request_id,
111            target,
112        });
113        let (tx, rx) = oneshot::channel();
114        self.request_tx
115            .clone()
116            .send(io::Request {
117                request,
118                response_tx: tx,
119            })
120            .await
121            .map_err(|_| crate::Error::RequestChannelClosed)?;
122        let response = rx
123            .await
124            .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
125        match response {
126            wire::Message::GetCompactStateResponse(r) => Ok(r.state),
127            wire::Message::Error(err) => Err(crate::Error::Server {
128                code: err.error_code,
129                message: err.message,
130            }),
131            _ => Err(crate::Error::UnexpectedResponse { request_id }),
132        }
133    }
134}
135
136impl<Op, D> sync::resolver::Resolver for Resolver<Op, D>
137where
138    Op: Clone + Read + EncodeShared,
139    Op::Cfg: IsUnit,
140    D: Digest,
141{
142    type Family = mmr::Family;
143    type Digest = D;
144    type Op = Op;
145    type Error = crate::Error;
146
147    async fn get_operations(
148        &self,
149        op_count: Location,
150        start_loc: Location,
151        max_ops: NonZeroU64,
152        include_pinned_nodes: bool,
153        _cancel_rx: oneshot::Receiver<()>,
154    ) -> Result<sync::resolver::FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error>
155    {
156        fetch_operation_range(
157            op_count,
158            start_loc,
159            max_ops,
160            include_pinned_nodes,
161            |op_count, start_loc, max_ops, include_pinned_nodes| async move {
162                let request_id = self.request_id_generator.next();
163                let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
164                    request_id,
165                    op_count,
166                    start_loc,
167                    max_ops,
168                    include_pinned_nodes,
169                });
170                let (tx, rx) = oneshot::channel();
171                self.request_tx
172                    .clone()
173                    .send(io::Request {
174                        request,
175                        response_tx: tx,
176                    })
177                    .await
178                    .map_err(|_| crate::Error::RequestChannelClosed)?;
179                let response = rx
180                    .await
181                    .map_err(|_| crate::Error::ResponseChannelClosed { request_id })??;
182                match response {
183                    wire::Message::GetOperationsResponse(r) => {
184                        Ok(sync::resolver::FetchedOperations::new(
185                            r.proof,
186                            r.operations,
187                            r.pinned_nodes,
188                        ))
189                    }
190                    wire::Message::Error(err) => Err(crate::Error::Server {
191                        code: err.error_code,
192                        message: err.message,
193                    }),
194                    _ => Err(crate::Error::UnexpectedResponse { request_id }),
195                }
196            },
197        )
198        .await
199    }
200}
201
202impl<Op, D> compact::Resolver for Resolver<Op, D>
203where
204    Op: Clone + Read + EncodeShared,
205    Op::Cfg: IsUnit,
206    D: Digest,
207{
208    type Family = mmr::Family;
209    type Digest = D;
210    type Op = Op;
211    type Error = crate::Error;
212
213    async fn get_compact_state(
214        &self,
215        target: compact::Target<Self::Family, Self::Digest>,
216    ) -> Result<compact::FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
217        self.get_compact_state(target).await.map(Into::into)
218    }
219}