commonware_sync/net/
wire.rs

1use crate::net::{ErrorResponse, RequestId};
2use bytes::{Buf, BufMut};
3use commonware_codec::{
4    DecodeExt, Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
5};
6use commonware_cryptography::Digest;
7use commonware_storage::{adb::sync::Target, mmr::verification::Proof};
8use std::num::NonZeroU64;
9
/// Upper bound on the number of digests accepted when decoding a proof.
///
/// The same limit is reused in this file as the maximum number of operations
/// accepted when decoding a `GetOperationsResponse`.
pub const MAX_DIGESTS: usize = 10_000;
12
13/// Request for operations from the server.
14#[derive(Debug)]
15pub struct GetOperationsRequest {
16    pub request_id: RequestId,
17    pub size: u64,
18    pub start_loc: u64,
19    pub max_ops: NonZeroU64,
20}
21
22/// Response with operations and proof.
23#[derive(Debug)]
24pub struct GetOperationsResponse<Op, D>
25where
26    D: Digest,
27{
28    pub request_id: RequestId,
29    pub proof: Proof<D>,
30    pub operations: Vec<Op>,
31}
32
33/// Request for sync target from server.
34#[derive(Debug)]
35pub struct GetSyncTargetRequest {
36    pub request_id: RequestId,
37}
38
39/// Response with sync target.
40#[derive(Debug)]
41pub struct GetSyncTargetResponse<D>
42where
43    D: Digest,
44{
45    pub request_id: RequestId,
46    pub target: Target<D>,
47}
48
49/// Messages that can be sent over the wire.
50#[derive(Debug)]
51pub enum Message<Op, D>
52where
53    D: Digest,
54{
55    GetOperationsRequest(GetOperationsRequest),
56    GetOperationsResponse(GetOperationsResponse<Op, D>),
57    GetSyncTargetRequest(GetSyncTargetRequest),
58    GetSyncTargetResponse(GetSyncTargetResponse<D>),
59    Error(ErrorResponse),
60}
61
62impl<Op, D> Message<Op, D>
63where
64    D: Digest,
65{
66    pub fn request_id(&self) -> RequestId {
67        match self {
68            Message::GetOperationsRequest(r) => r.request_id,
69            Message::GetOperationsResponse(r) => r.request_id,
70            Message::GetSyncTargetRequest(r) => r.request_id,
71            Message::GetSyncTargetResponse(r) => r.request_id,
72            Message::Error(e) => e.request_id,
73        }
74    }
75}
76
77impl<Op, D> super::Message for Message<Op, D>
78where
79    Op: Encode + DecodeExt<()> + Send + Sync + 'static,
80    D: Digest,
81{
82    fn request_id(&self) -> RequestId {
83        self.request_id()
84    }
85}
86
87impl<Op, D> Write for Message<Op, D>
88where
89    Op: Write,
90    D: Digest,
91{
92    fn write(&self, buf: &mut impl BufMut) {
93        match self {
94            Message::GetOperationsRequest(req) => {
95                0u8.write(buf);
96                req.write(buf);
97            }
98            Message::GetOperationsResponse(resp) => {
99                1u8.write(buf);
100                resp.write(buf);
101            }
102            Message::GetSyncTargetRequest(req) => {
103                2u8.write(buf);
104                req.write(buf);
105            }
106            Message::GetSyncTargetResponse(resp) => {
107                3u8.write(buf);
108                resp.write(buf);
109            }
110            Message::Error(err) => {
111                4u8.write(buf);
112                err.write(buf);
113            }
114        }
115    }
116}
117
118impl<Op, D> EncodeSize for Message<Op, D>
119where
120    Op: EncodeSize,
121    D: Digest,
122{
123    fn encode_size(&self) -> usize {
124        1 + match self {
125            Message::GetOperationsRequest(req) => req.encode_size(),
126            Message::GetOperationsResponse(resp) => resp.encode_size(),
127            Message::GetSyncTargetRequest(req) => req.encode_size(),
128            Message::GetSyncTargetResponse(resp) => resp.encode_size(),
129            Message::Error(err) => err.encode_size(),
130        }
131    }
132}
133
134impl<Op, D> Read for Message<Op, D>
135where
136    Op: Read<Cfg = ()>,
137    D: Digest,
138{
139    type Cfg = ();
140    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
141        let tag = u8::read(buf)?;
142        match tag {
143            0 => Ok(Message::GetOperationsRequest(GetOperationsRequest::read(
144                buf,
145            )?)),
146            1 => Ok(Message::GetOperationsResponse(GetOperationsResponse::read(
147                buf,
148            )?)),
149            2 => Ok(Message::GetSyncTargetRequest(GetSyncTargetRequest::read(
150                buf,
151            )?)),
152            3 => Ok(Message::GetSyncTargetResponse(GetSyncTargetResponse::read(
153                buf,
154            )?)),
155            4 => Ok(Message::Error(ErrorResponse::read(buf)?)),
156            d => Err(CodecError::InvalidEnum(d)),
157        }
158    }
159}
160
161impl Write for GetOperationsRequest {
162    fn write(&self, buf: &mut impl BufMut) {
163        self.request_id.write(buf);
164        self.size.write(buf);
165        self.start_loc.write(buf);
166        self.max_ops.get().write(buf);
167    }
168}
169
170impl EncodeSize for GetOperationsRequest {
171    fn encode_size(&self) -> usize {
172        self.request_id.encode_size()
173            + self.size.encode_size()
174            + self.start_loc.encode_size()
175            + self.max_ops.get().encode_size()
176    }
177}
178
179impl Read for GetOperationsRequest {
180    type Cfg = ();
181    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
182        let request_id = RequestId::read_cfg(buf, &())?;
183        let size = u64::read(buf)?;
184        let start_loc = u64::read(buf)?;
185        let max_ops_raw = u64::read(buf)?;
186        let max_ops = NonZeroU64::new(max_ops_raw)
187            .ok_or_else(|| CodecError::Invalid("GetOperationsRequest", "max_ops cannot be zero"))?;
188        Ok(Self {
189            request_id,
190            size,
191            start_loc,
192            max_ops,
193        })
194    }
195}
196
197impl GetOperationsRequest {
198    pub fn validate(&self) -> Result<(), crate::Error> {
199        if self.start_loc >= self.size {
200            return Err(crate::Error::InvalidRequest(format!(
201                "start_loc >= size ({}) >= ({})",
202                self.start_loc, self.size
203            )));
204        }
205        if self.max_ops.get() == 0 {
206            return Err(crate::Error::InvalidRequest(
207                "max_ops cannot be zero".to_string(),
208            ));
209        }
210        Ok(())
211    }
212}
213
214impl<Op, D> Write for GetOperationsResponse<Op, D>
215where
216    Op: Write,
217    D: Digest,
218{
219    fn write(&self, buf: &mut impl BufMut) {
220        self.request_id.write(buf);
221        self.proof.write(buf);
222        self.operations.write(buf);
223    }
224}
225
226impl<Op, D> EncodeSize for GetOperationsResponse<Op, D>
227where
228    Op: EncodeSize,
229    D: Digest,
230{
231    fn encode_size(&self) -> usize {
232        self.request_id.encode_size() + self.proof.encode_size() + self.operations.encode_size()
233    }
234}
235
236impl<Op, D> Read for GetOperationsResponse<Op, D>
237where
238    Op: Read<Cfg = ()>,
239    D: Digest,
240{
241    type Cfg = ();
242    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
243        let request_id = RequestId::read_cfg(buf, &())?;
244        let proof = Proof::<D>::read_cfg(buf, &MAX_DIGESTS)?;
245        let operations = {
246            let range_cfg = RangeCfg::from(0..=MAX_DIGESTS);
247            Vec::<Op>::read_cfg(buf, &(range_cfg, ()))?
248        };
249        Ok(Self {
250            request_id,
251            proof,
252            operations,
253        })
254    }
255}
256
257impl Write for GetSyncTargetRequest {
258    fn write(&self, buf: &mut impl BufMut) {
259        self.request_id.write(buf);
260    }
261}
262
263impl EncodeSize for GetSyncTargetRequest {
264    fn encode_size(&self) -> usize {
265        self.request_id.encode_size()
266    }
267}
268
269impl Read for GetSyncTargetRequest {
270    type Cfg = ();
271    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
272        let request_id = RequestId::read_cfg(buf, &())?;
273        Ok(Self { request_id })
274    }
275}
276
277impl<D> Write for GetSyncTargetResponse<D>
278where
279    D: Digest,
280{
281    fn write(&self, buf: &mut impl BufMut) {
282        self.request_id.write(buf);
283        self.target.write(buf);
284    }
285}
286
287impl<D> EncodeSize for GetSyncTargetResponse<D>
288where
289    D: Digest,
290{
291    fn encode_size(&self) -> usize {
292        self.request_id.encode_size() + self.target.encode_size()
293    }
294}
295
296impl<D> Read for GetSyncTargetResponse<D>
297where
298    D: Digest,
299{
300    type Cfg = ();
301    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
302        let request_id = RequestId::read_cfg(buf, &())?;
303        let target = Target::<D>::read_cfg(buf, &())?;
304        Ok(Self { request_id, target })
305    }
306}