commonware_sync/net/
wire.rs

1use crate::net::{ErrorResponse, RequestId};
2use bytes::{Buf, BufMut};
3use commonware_codec::{
4    DecodeExt, Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
5};
6use commonware_cryptography::Digest;
7use commonware_storage::{
8    mmr::{Location, Proof},
9    qmdb::sync::Target,
10};
11use std::num::NonZeroU64;
12
/// Upper bound on the number of digests accepted when decoding a proof.
///
/// The same value is also used as the upper bound on the number of operations
/// decoded from a `GetOperationsResponse`.
pub const MAX_DIGESTS: usize = 10_000;
15
16/// Request for operations from the server.
17#[derive(Debug)]
18pub struct GetOperationsRequest {
19    pub request_id: RequestId,
20    pub op_count: Location,
21    pub start_loc: Location,
22    pub max_ops: NonZeroU64,
23}
24
25/// Response with operations and proof.
26#[derive(Debug)]
27pub struct GetOperationsResponse<Op, D>
28where
29    D: Digest,
30{
31    pub request_id: RequestId,
32    pub proof: Proof<D>,
33    pub operations: Vec<Op>,
34}
35
36/// Request for sync target from server.
37#[derive(Debug)]
38pub struct GetSyncTargetRequest {
39    pub request_id: RequestId,
40}
41
42/// Response with sync target.
43#[derive(Debug)]
44pub struct GetSyncTargetResponse<D>
45where
46    D: Digest,
47{
48    pub request_id: RequestId,
49    pub target: Target<D>,
50}
51
52/// Messages that can be sent over the wire.
53#[derive(Debug)]
54pub enum Message<Op, D>
55where
56    D: Digest,
57{
58    GetOperationsRequest(GetOperationsRequest),
59    GetOperationsResponse(GetOperationsResponse<Op, D>),
60    GetSyncTargetRequest(GetSyncTargetRequest),
61    GetSyncTargetResponse(GetSyncTargetResponse<D>),
62    Error(ErrorResponse),
63}
64
65impl<Op, D> Message<Op, D>
66where
67    D: Digest,
68{
69    pub const fn request_id(&self) -> RequestId {
70        match self {
71            Self::GetOperationsRequest(r) => r.request_id,
72            Self::GetOperationsResponse(r) => r.request_id,
73            Self::GetSyncTargetRequest(r) => r.request_id,
74            Self::GetSyncTargetResponse(r) => r.request_id,
75            Self::Error(e) => e.request_id,
76        }
77    }
78}
79
80impl<Op, D> super::Message for Message<Op, D>
81where
82    Op: Encode + DecodeExt<()> + Send + Sync + 'static,
83    D: Digest,
84{
85    fn request_id(&self) -> RequestId {
86        self.request_id()
87    }
88}
89
90impl<Op, D> Write for Message<Op, D>
91where
92    Op: Write,
93    D: Digest,
94{
95    fn write(&self, buf: &mut impl BufMut) {
96        match self {
97            Self::GetOperationsRequest(req) => {
98                0u8.write(buf);
99                req.write(buf);
100            }
101            Self::GetOperationsResponse(resp) => {
102                1u8.write(buf);
103                resp.write(buf);
104            }
105            Self::GetSyncTargetRequest(req) => {
106                2u8.write(buf);
107                req.write(buf);
108            }
109            Self::GetSyncTargetResponse(resp) => {
110                3u8.write(buf);
111                resp.write(buf);
112            }
113            Self::Error(err) => {
114                4u8.write(buf);
115                err.write(buf);
116            }
117        }
118    }
119}
120
121impl<Op, D> EncodeSize for Message<Op, D>
122where
123    Op: EncodeSize,
124    D: Digest,
125{
126    fn encode_size(&self) -> usize {
127        1 + match self {
128            Self::GetOperationsRequest(req) => req.encode_size(),
129            Self::GetOperationsResponse(resp) => resp.encode_size(),
130            Self::GetSyncTargetRequest(req) => req.encode_size(),
131            Self::GetSyncTargetResponse(resp) => resp.encode_size(),
132            Self::Error(err) => err.encode_size(),
133        }
134    }
135}
136
137impl<Op, D> Read for Message<Op, D>
138where
139    Op: Read<Cfg = ()>,
140    D: Digest,
141{
142    type Cfg = ();
143    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
144        let tag = u8::read(buf)?;
145        match tag {
146            0 => Ok(Self::GetOperationsRequest(GetOperationsRequest::read(buf)?)),
147            1 => Ok(Self::GetOperationsResponse(GetOperationsResponse::read(
148                buf,
149            )?)),
150            2 => Ok(Self::GetSyncTargetRequest(GetSyncTargetRequest::read(buf)?)),
151            3 => Ok(Self::GetSyncTargetResponse(GetSyncTargetResponse::read(
152                buf,
153            )?)),
154            4 => Ok(Self::Error(ErrorResponse::read(buf)?)),
155            d => Err(CodecError::InvalidEnum(d)),
156        }
157    }
158}
159
160impl Write for GetOperationsRequest {
161    fn write(&self, buf: &mut impl BufMut) {
162        self.request_id.write(buf);
163        self.op_count.write(buf);
164        self.start_loc.write(buf);
165        self.max_ops.get().write(buf);
166    }
167}
168
169impl EncodeSize for GetOperationsRequest {
170    fn encode_size(&self) -> usize {
171        self.request_id.encode_size()
172            + self.op_count.encode_size()
173            + self.start_loc.encode_size()
174            + self.max_ops.get().encode_size()
175    }
176}
177
178impl Read for GetOperationsRequest {
179    type Cfg = ();
180    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
181        let request_id = RequestId::read_cfg(buf, &())?;
182        let op_count = u64::read(buf)?;
183        let Some(op_count) = Location::new(op_count) else {
184            return Err(CodecError::Invalid(
185                "GetOperationsRequest",
186                "op_count exceeds MAX_LOCATION",
187            ));
188        };
189        let start_loc = u64::read(buf)?;
190        let Some(start_loc) = Location::new(start_loc) else {
191            return Err(CodecError::Invalid(
192                "GetOperationsRequest",
193                "start_loc exceeds MAX_LOCATION",
194            ));
195        };
196        let max_ops = u64::read(buf)?;
197        let Some(max_ops) = NonZeroU64::new(max_ops) else {
198            return Err(CodecError::Invalid(
199                "GetOperationsRequest",
200                "max_ops cannot be zero",
201            ));
202        };
203        Ok(Self {
204            request_id,
205            op_count,
206            start_loc,
207            max_ops,
208        })
209    }
210}
211
212impl GetOperationsRequest {
213    pub fn validate(&self) -> Result<(), crate::Error> {
214        if self.start_loc >= self.op_count {
215            return Err(crate::Error::InvalidRequest(format!(
216                "start_loc >= size ({}) >= ({})",
217                self.start_loc, self.op_count
218            )));
219        }
220        Ok(())
221    }
222}
223
224impl<Op, D> Write for GetOperationsResponse<Op, D>
225where
226    Op: Write,
227    D: Digest,
228{
229    fn write(&self, buf: &mut impl BufMut) {
230        self.request_id.write(buf);
231        self.proof.write(buf);
232        self.operations.write(buf);
233    }
234}
235
236impl<Op, D> EncodeSize for GetOperationsResponse<Op, D>
237where
238    Op: EncodeSize,
239    D: Digest,
240{
241    fn encode_size(&self) -> usize {
242        self.request_id.encode_size() + self.proof.encode_size() + self.operations.encode_size()
243    }
244}
245
246impl<Op, D> Read for GetOperationsResponse<Op, D>
247where
248    Op: Read<Cfg = ()>,
249    D: Digest,
250{
251    type Cfg = ();
252    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
253        let request_id = RequestId::read_cfg(buf, &())?;
254        let proof = Proof::<D>::read_cfg(buf, &MAX_DIGESTS)?;
255        let operations = {
256            let range_cfg = RangeCfg::from(0..=MAX_DIGESTS);
257            Vec::<Op>::read_cfg(buf, &(range_cfg, ()))?
258        };
259        Ok(Self {
260            request_id,
261            proof,
262            operations,
263        })
264    }
265}
266
267impl Write for GetSyncTargetRequest {
268    fn write(&self, buf: &mut impl BufMut) {
269        self.request_id.write(buf);
270    }
271}
272
273impl EncodeSize for GetSyncTargetRequest {
274    fn encode_size(&self) -> usize {
275        self.request_id.encode_size()
276    }
277}
278
279impl Read for GetSyncTargetRequest {
280    type Cfg = ();
281    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
282        let request_id = RequestId::read_cfg(buf, &())?;
283        Ok(Self { request_id })
284    }
285}
286
287impl<D> Write for GetSyncTargetResponse<D>
288where
289    D: Digest,
290{
291    fn write(&self, buf: &mut impl BufMut) {
292        self.request_id.write(buf);
293        self.target.write(buf);
294    }
295}
296
297impl<D> EncodeSize for GetSyncTargetResponse<D>
298where
299    D: Digest,
300{
301    fn encode_size(&self) -> usize {
302        self.request_id.encode_size() + self.target.encode_size()
303    }
304}
305
306impl<D> Read for GetSyncTargetResponse<D>
307where
308    D: Digest,
309{
310    type Cfg = ();
311    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
312        let request_id = RequestId::read_cfg(buf, &())?;
313        let target = Target::<D>::read_cfg(buf, &())?;
314        Ok(Self { request_id, target })
315    }
316}