commonware_sync/net/
wire.rs

1use crate::net::{ErrorResponse, RequestId};
2use bytes::{Buf, BufMut};
3use commonware_codec::{
4    DecodeExt, Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
5};
6use commonware_cryptography::Digest;
7use commonware_storage::{
8    adb::sync::Target,
9    mmr::{Location, Proof},
10};
11use std::num::NonZeroU64;
12
/// Upper bound on the number of digests accepted when decoding a [`Proof`].
///
/// The same limit is also used as the maximum number of operations decoded
/// from a [`GetOperationsResponse`].
pub const MAX_DIGESTS: usize = 10_000;
15
16/// Request for operations from the server.
17#[derive(Debug)]
18pub struct GetOperationsRequest {
19    pub request_id: RequestId,
20    pub op_count: Location,
21    pub start_loc: Location,
22    pub max_ops: NonZeroU64,
23}
24
25/// Response with operations and proof.
26#[derive(Debug)]
27pub struct GetOperationsResponse<Op, D>
28where
29    D: Digest,
30{
31    pub request_id: RequestId,
32    pub proof: Proof<D>,
33    pub operations: Vec<Op>,
34}
35
36/// Request for sync target from server.
37#[derive(Debug)]
38pub struct GetSyncTargetRequest {
39    pub request_id: RequestId,
40}
41
42/// Response with sync target.
43#[derive(Debug)]
44pub struct GetSyncTargetResponse<D>
45where
46    D: Digest,
47{
48    pub request_id: RequestId,
49    pub target: Target<D>,
50}
51
52/// Messages that can be sent over the wire.
53#[derive(Debug)]
54pub enum Message<Op, D>
55where
56    D: Digest,
57{
58    GetOperationsRequest(GetOperationsRequest),
59    GetOperationsResponse(GetOperationsResponse<Op, D>),
60    GetSyncTargetRequest(GetSyncTargetRequest),
61    GetSyncTargetResponse(GetSyncTargetResponse<D>),
62    Error(ErrorResponse),
63}
64
65impl<Op, D> Message<Op, D>
66where
67    D: Digest,
68{
69    pub fn request_id(&self) -> RequestId {
70        match self {
71            Message::GetOperationsRequest(r) => r.request_id,
72            Message::GetOperationsResponse(r) => r.request_id,
73            Message::GetSyncTargetRequest(r) => r.request_id,
74            Message::GetSyncTargetResponse(r) => r.request_id,
75            Message::Error(e) => e.request_id,
76        }
77    }
78}
79
80impl<Op, D> super::Message for Message<Op, D>
81where
82    Op: Encode + DecodeExt<()> + Send + Sync + 'static,
83    D: Digest,
84{
85    fn request_id(&self) -> RequestId {
86        self.request_id()
87    }
88}
89
90impl<Op, D> Write for Message<Op, D>
91where
92    Op: Write,
93    D: Digest,
94{
95    fn write(&self, buf: &mut impl BufMut) {
96        match self {
97            Message::GetOperationsRequest(req) => {
98                0u8.write(buf);
99                req.write(buf);
100            }
101            Message::GetOperationsResponse(resp) => {
102                1u8.write(buf);
103                resp.write(buf);
104            }
105            Message::GetSyncTargetRequest(req) => {
106                2u8.write(buf);
107                req.write(buf);
108            }
109            Message::GetSyncTargetResponse(resp) => {
110                3u8.write(buf);
111                resp.write(buf);
112            }
113            Message::Error(err) => {
114                4u8.write(buf);
115                err.write(buf);
116            }
117        }
118    }
119}
120
121impl<Op, D> EncodeSize for Message<Op, D>
122where
123    Op: EncodeSize,
124    D: Digest,
125{
126    fn encode_size(&self) -> usize {
127        1 + match self {
128            Message::GetOperationsRequest(req) => req.encode_size(),
129            Message::GetOperationsResponse(resp) => resp.encode_size(),
130            Message::GetSyncTargetRequest(req) => req.encode_size(),
131            Message::GetSyncTargetResponse(resp) => resp.encode_size(),
132            Message::Error(err) => err.encode_size(),
133        }
134    }
135}
136
137impl<Op, D> Read for Message<Op, D>
138where
139    Op: Read<Cfg = ()>,
140    D: Digest,
141{
142    type Cfg = ();
143    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
144        let tag = u8::read(buf)?;
145        match tag {
146            0 => Ok(Message::GetOperationsRequest(GetOperationsRequest::read(
147                buf,
148            )?)),
149            1 => Ok(Message::GetOperationsResponse(GetOperationsResponse::read(
150                buf,
151            )?)),
152            2 => Ok(Message::GetSyncTargetRequest(GetSyncTargetRequest::read(
153                buf,
154            )?)),
155            3 => Ok(Message::GetSyncTargetResponse(GetSyncTargetResponse::read(
156                buf,
157            )?)),
158            4 => Ok(Message::Error(ErrorResponse::read(buf)?)),
159            d => Err(CodecError::InvalidEnum(d)),
160        }
161    }
162}
163
164impl Write for GetOperationsRequest {
165    fn write(&self, buf: &mut impl BufMut) {
166        self.request_id.write(buf);
167        self.op_count.write(buf);
168        self.start_loc.write(buf);
169        self.max_ops.get().write(buf);
170    }
171}
172
173impl EncodeSize for GetOperationsRequest {
174    fn encode_size(&self) -> usize {
175        self.request_id.encode_size()
176            + self.op_count.encode_size()
177            + self.start_loc.encode_size()
178            + self.max_ops.get().encode_size()
179    }
180}
181
182impl Read for GetOperationsRequest {
183    type Cfg = ();
184    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
185        let request_id = RequestId::read_cfg(buf, &())?;
186        let op_count = u64::read(buf)?;
187        let Some(op_count) = Location::new(op_count) else {
188            return Err(CodecError::Invalid(
189                "GetOperationsRequest",
190                "op_count exceeds MAX_LOCATION",
191            ));
192        };
193        let start_loc = u64::read(buf)?;
194        let Some(start_loc) = Location::new(start_loc) else {
195            return Err(CodecError::Invalid(
196                "GetOperationsRequest",
197                "start_loc exceeds MAX_LOCATION",
198            ));
199        };
200        let max_ops = u64::read(buf)?;
201        let Some(max_ops) = NonZeroU64::new(max_ops) else {
202            return Err(CodecError::Invalid(
203                "GetOperationsRequest",
204                "max_ops cannot be zero",
205            ));
206        };
207        Ok(Self {
208            request_id,
209            op_count,
210            start_loc,
211            max_ops,
212        })
213    }
214}
215
216impl GetOperationsRequest {
217    pub fn validate(&self) -> Result<(), crate::Error> {
218        if self.start_loc >= self.op_count {
219            return Err(crate::Error::InvalidRequest(format!(
220                "start_loc >= size ({}) >= ({})",
221                self.start_loc, self.op_count
222            )));
223        }
224        Ok(())
225    }
226}
227
228impl<Op, D> Write for GetOperationsResponse<Op, D>
229where
230    Op: Write,
231    D: Digest,
232{
233    fn write(&self, buf: &mut impl BufMut) {
234        self.request_id.write(buf);
235        self.proof.write(buf);
236        self.operations.write(buf);
237    }
238}
239
240impl<Op, D> EncodeSize for GetOperationsResponse<Op, D>
241where
242    Op: EncodeSize,
243    D: Digest,
244{
245    fn encode_size(&self) -> usize {
246        self.request_id.encode_size() + self.proof.encode_size() + self.operations.encode_size()
247    }
248}
249
250impl<Op, D> Read for GetOperationsResponse<Op, D>
251where
252    Op: Read<Cfg = ()>,
253    D: Digest,
254{
255    type Cfg = ();
256    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
257        let request_id = RequestId::read_cfg(buf, &())?;
258        let proof = Proof::<D>::read_cfg(buf, &MAX_DIGESTS)?;
259        let operations = {
260            let range_cfg = RangeCfg::from(0..=MAX_DIGESTS);
261            Vec::<Op>::read_cfg(buf, &(range_cfg, ()))?
262        };
263        Ok(Self {
264            request_id,
265            proof,
266            operations,
267        })
268    }
269}
270
271impl Write for GetSyncTargetRequest {
272    fn write(&self, buf: &mut impl BufMut) {
273        self.request_id.write(buf);
274    }
275}
276
277impl EncodeSize for GetSyncTargetRequest {
278    fn encode_size(&self) -> usize {
279        self.request_id.encode_size()
280    }
281}
282
283impl Read for GetSyncTargetRequest {
284    type Cfg = ();
285    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
286        let request_id = RequestId::read_cfg(buf, &())?;
287        Ok(Self { request_id })
288    }
289}
290
291impl<D> Write for GetSyncTargetResponse<D>
292where
293    D: Digest,
294{
295    fn write(&self, buf: &mut impl BufMut) {
296        self.request_id.write(buf);
297        self.target.write(buf);
298    }
299}
300
301impl<D> EncodeSize for GetSyncTargetResponse<D>
302where
303    D: Digest,
304{
305    fn encode_size(&self) -> usize {
306        self.request_id.encode_size() + self.target.encode_size()
307    }
308}
309
310impl<D> Read for GetSyncTargetResponse<D>
311where
312    D: Digest,
313{
314    type Cfg = ();
315    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
316        let request_id = RequestId::read_cfg(buf, &())?;
317        let target = Target::<D>::read_cfg(buf, &())?;
318        Ok(Self { request_id, target })
319    }
320}