Skip to main content

commonware_sync/net/
wire.rs

1use crate::net::{ErrorResponse, RequestId};
2use commonware_codec::{
3    Encode, EncodeSize, Error as CodecError, IsUnit, RangeCfg, Read, ReadExt as _, Write,
4};
5use commonware_cryptography::Digest;
6use commonware_runtime::{Buf, BufMut};
7use commonware_storage::{
8    merkle::MAX_PINNED_NODES,
9    mmr::{self, Location, Proof},
10    qmdb::sync::{compact, compact::State, Target},
11};
12use std::num::NonZeroU64;
13
/// Maximum number of digests in a proof.
///
/// Used as the decode limit when a [`Proof`] is read off the wire. The same
/// value is also reused as the upper bound on the number of operations accepted
/// in a single [`GetOperationsResponse`].
pub const MAX_DIGESTS: usize = 10_000;
16
/// Request for operations from the server.
///
/// Fields are encoded on the wire in declaration order; `max_ops` is written
/// as a plain `u64` and `include_pinned_nodes` as a single trailing byte.
#[derive(Debug)]
pub struct GetOperationsRequest {
    /// Identifier carried by this message.
    pub request_id: RequestId,
    /// Operation count the request is made against. `validate` requires
    /// `start_loc` to be strictly below this value.
    pub op_count: Location,
    /// Presumably the location of the first requested operation — TODO confirm
    /// against the server-side handler.
    pub start_loc: Location,
    /// Presumably the maximum number of operations to return — TODO confirm.
    /// Decoding rejects a zero value.
    pub max_ops: NonZeroU64,
    /// Presumably asks the server to attach pinned nodes to the response —
    /// TODO confirm against the server-side handler.
    pub include_pinned_nodes: bool,
}
26
27/// Response with operations and proof.
28#[derive(Debug)]
29pub struct GetOperationsResponse<Op, D>
30where
31    D: Digest,
32{
33    pub request_id: RequestId,
34    pub proof: Proof<D>,
35    pub operations: Vec<Op>,
36    pub pinned_nodes: Option<Vec<D>>,
37}
38
/// Request for sync target from server.
///
/// Carries no payload beyond the request identifier.
#[derive(Debug)]
pub struct GetSyncTargetRequest {
    /// Identifier carried by this message.
    pub request_id: RequestId,
}
44
45/// Response with sync target.
46#[derive(Debug)]
47pub struct GetSyncTargetResponse<D>
48where
49    D: Digest,
50{
51    pub request_id: RequestId,
52    pub target: Target<mmr::Family, D>,
53}
54
/// Request for compact-sync target from server.
///
/// Carries no payload beyond the request identifier; the matching reply is
/// [`GetCompactTargetResponse`].
#[derive(Debug)]
pub struct GetCompactTargetRequest {
    /// Identifier carried by this message.
    pub request_id: RequestId,
}
60
61/// Response with compact-sync target.
62#[derive(Debug)]
63pub struct GetCompactTargetResponse<D>
64where
65    D: Digest,
66{
67    pub request_id: RequestId,
68    pub target: compact::Target<mmr::Family, D>,
69}
70
71/// Request for compact authenticated state.
72#[derive(Debug)]
73pub struct GetCompactStateRequest<D>
74where
75    D: Digest,
76{
77    pub request_id: RequestId,
78    pub target: compact::Target<mmr::Family, D>,
79}
80
81/// Response with compact authenticated state.
82#[derive(Debug)]
83pub struct GetCompactStateResponse<Op, D>
84where
85    D: Digest,
86{
87    pub request_id: RequestId,
88    pub state: State<mmr::Family, Op, D>,
89}
90
91/// Messages that can be sent over the wire.
92#[derive(Debug)]
93pub enum Message<Op, D>
94where
95    D: Digest,
96{
97    GetOperationsRequest(GetOperationsRequest),
98    GetOperationsResponse(GetOperationsResponse<Op, D>),
99    GetSyncTargetRequest(GetSyncTargetRequest),
100    GetSyncTargetResponse(GetSyncTargetResponse<D>),
101    GetCompactTargetRequest(GetCompactTargetRequest),
102    GetCompactTargetResponse(GetCompactTargetResponse<D>),
103    GetCompactStateRequest(GetCompactStateRequest<D>),
104    GetCompactStateResponse(GetCompactStateResponse<Op, D>),
105    Error(ErrorResponse),
106}
107
108impl<Op, D> Message<Op, D>
109where
110    D: Digest,
111{
112    pub const fn request_id(&self) -> RequestId {
113        match self {
114            Self::GetOperationsRequest(r) => r.request_id,
115            Self::GetOperationsResponse(r) => r.request_id,
116            Self::GetSyncTargetRequest(r) => r.request_id,
117            Self::GetSyncTargetResponse(r) => r.request_id,
118            Self::GetCompactTargetRequest(r) => r.request_id,
119            Self::GetCompactTargetResponse(r) => r.request_id,
120            Self::GetCompactStateRequest(r) => r.request_id,
121            Self::GetCompactStateResponse(r) => r.request_id,
122            Self::Error(e) => e.request_id,
123        }
124    }
125}
126
127impl<Op, D> super::Message for Message<Op, D>
128where
129    Op: Encode + Read + Send + Sync + 'static,
130    Op::Cfg: IsUnit,
131    D: Digest,
132{
133    fn request_id(&self) -> RequestId {
134        self.request_id()
135    }
136}
137
138impl<Op, D> Write for Message<Op, D>
139where
140    Op: Write,
141    D: Digest,
142{
143    fn write(&self, buf: &mut impl BufMut) {
144        match self {
145            Self::GetOperationsRequest(req) => {
146                0u8.write(buf);
147                req.write(buf);
148            }
149            Self::GetOperationsResponse(resp) => {
150                1u8.write(buf);
151                resp.write(buf);
152            }
153            Self::GetSyncTargetRequest(req) => {
154                2u8.write(buf);
155                req.write(buf);
156            }
157            Self::GetSyncTargetResponse(resp) => {
158                3u8.write(buf);
159                resp.write(buf);
160            }
161            Self::GetCompactTargetRequest(req) => {
162                4u8.write(buf);
163                req.write(buf);
164            }
165            Self::GetCompactTargetResponse(resp) => {
166                5u8.write(buf);
167                resp.write(buf);
168            }
169            Self::GetCompactStateRequest(req) => {
170                6u8.write(buf);
171                req.write(buf);
172            }
173            Self::GetCompactStateResponse(resp) => {
174                7u8.write(buf);
175                resp.write(buf);
176            }
177            Self::Error(err) => {
178                8u8.write(buf);
179                err.write(buf);
180            }
181        }
182    }
183}
184
185impl<Op, D> EncodeSize for Message<Op, D>
186where
187    Op: EncodeSize,
188    D: Digest,
189{
190    fn encode_size(&self) -> usize {
191        1 + match self {
192            Self::GetOperationsRequest(req) => req.encode_size(),
193            Self::GetOperationsResponse(resp) => resp.encode_size(),
194            Self::GetSyncTargetRequest(req) => req.encode_size(),
195            Self::GetSyncTargetResponse(resp) => resp.encode_size(),
196            Self::GetCompactTargetRequest(req) => req.encode_size(),
197            Self::GetCompactTargetResponse(resp) => resp.encode_size(),
198            Self::GetCompactStateRequest(req) => req.encode_size(),
199            Self::GetCompactStateResponse(resp) => resp.encode_size(),
200            Self::Error(err) => err.encode_size(),
201        }
202    }
203}
204
205impl<Op, D> Read for Message<Op, D>
206where
207    Op: Read,
208    Op::Cfg: IsUnit,
209    D: Digest,
210{
211    type Cfg = ();
212    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
213        let tag = u8::read(buf)?;
214        match tag {
215            0 => Ok(Self::GetOperationsRequest(GetOperationsRequest::read(buf)?)),
216            1 => Ok(Self::GetOperationsResponse(GetOperationsResponse::read(
217                buf,
218            )?)),
219            2 => Ok(Self::GetSyncTargetRequest(GetSyncTargetRequest::read(buf)?)),
220            3 => Ok(Self::GetSyncTargetResponse(GetSyncTargetResponse::read(
221                buf,
222            )?)),
223            4 => Ok(Self::GetCompactTargetRequest(
224                GetCompactTargetRequest::read(buf)?,
225            )),
226            5 => Ok(Self::GetCompactTargetResponse(
227                GetCompactTargetResponse::read(buf)?,
228            )),
229            6 => Ok(Self::GetCompactStateRequest(GetCompactStateRequest::read(
230                buf,
231            )?)),
232            7 => Ok(Self::GetCompactStateResponse(
233                GetCompactStateResponse::read(buf)?,
234            )),
235            8 => Ok(Self::Error(ErrorResponse::read(buf)?)),
236            d => Err(CodecError::InvalidEnum(d)),
237        }
238    }
239}
240
241impl Write for GetOperationsRequest {
242    fn write(&self, buf: &mut impl BufMut) {
243        self.request_id.write(buf);
244        self.op_count.write(buf);
245        self.start_loc.write(buf);
246        self.max_ops.get().write(buf);
247        (self.include_pinned_nodes as u8).write(buf);
248    }
249}
250
251impl EncodeSize for GetOperationsRequest {
252    fn encode_size(&self) -> usize {
253        self.request_id.encode_size()
254            + self.op_count.encode_size()
255            + self.start_loc.encode_size()
256            + self.max_ops.get().encode_size()
257            + 1u8.encode_size()
258    }
259}
260
261impl Read for GetOperationsRequest {
262    type Cfg = ();
263    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
264        let request_id = RequestId::read_cfg(buf, &())?;
265        let op_count = Location::read(buf)?;
266        let start_loc = Location::read(buf)?;
267        let max_ops = u64::read(buf)?;
268        let Some(max_ops) = NonZeroU64::new(max_ops) else {
269            return Err(CodecError::Invalid(
270                "GetOperationsRequest",
271                "max_ops cannot be zero",
272            ));
273        };
274        let include_pinned_nodes = u8::read(buf)? != 0;
275        Ok(Self {
276            request_id,
277            op_count,
278            start_loc,
279            max_ops,
280            include_pinned_nodes,
281        })
282    }
283}
284
285impl GetOperationsRequest {
286    pub fn validate(&self) -> Result<(), crate::Error> {
287        if self.start_loc >= self.op_count {
288            return Err(crate::Error::InvalidRequest(format!(
289                "start_loc >= size ({}) >= ({})",
290                self.start_loc, self.op_count
291            )));
292        }
293        Ok(())
294    }
295}
296
297impl<Op, D> Write for GetOperationsResponse<Op, D>
298where
299    Op: Write,
300    D: Digest,
301{
302    fn write(&self, buf: &mut impl BufMut) {
303        self.request_id.write(buf);
304        self.proof.write(buf);
305        self.operations.write(buf);
306        match &self.pinned_nodes {
307            Some(nodes) => {
308                1u8.write(buf);
309                nodes.write(buf);
310            }
311            None => {
312                0u8.write(buf);
313            }
314        }
315    }
316}
317
318impl<Op, D> EncodeSize for GetOperationsResponse<Op, D>
319where
320    Op: EncodeSize,
321    D: Digest,
322{
323    fn encode_size(&self) -> usize {
324        self.request_id.encode_size()
325            + self.proof.encode_size()
326            + self.operations.encode_size()
327            + 1u8.encode_size()
328            + self
329                .pinned_nodes
330                .as_ref()
331                .map_or(0, |nodes| nodes.encode_size())
332    }
333}
334
335impl<Op, D> Read for GetOperationsResponse<Op, D>
336where
337    Op: Read,
338    Op::Cfg: IsUnit,
339    D: Digest,
340{
341    type Cfg = ();
342    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
343        let request_id = RequestId::read_cfg(buf, &())?;
344        let proof = Proof::<D>::read_cfg(buf, &MAX_DIGESTS)?;
345        let operations = {
346            let range_cfg = RangeCfg::from(0..=MAX_DIGESTS);
347            Vec::<Op>::read_cfg(buf, &(range_cfg, Op::Cfg::default()))?
348        };
349        let has_pinned_nodes = u8::read(buf)? != 0;
350        let pinned_nodes = if has_pinned_nodes {
351            let range_cfg = RangeCfg::from(0..=MAX_PINNED_NODES);
352            Some(Vec::<D>::read_cfg(buf, &(range_cfg, ()))?)
353        } else {
354            None
355        };
356        Ok(Self {
357            request_id,
358            proof,
359            operations,
360            pinned_nodes,
361        })
362    }
363}
364
365impl Write for GetSyncTargetRequest {
366    fn write(&self, buf: &mut impl BufMut) {
367        self.request_id.write(buf);
368    }
369}
370
371impl EncodeSize for GetSyncTargetRequest {
372    fn encode_size(&self) -> usize {
373        self.request_id.encode_size()
374    }
375}
376
377impl Read for GetSyncTargetRequest {
378    type Cfg = ();
379    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
380        let request_id = RequestId::read_cfg(buf, &())?;
381        Ok(Self { request_id })
382    }
383}
384
385impl<D> Write for GetSyncTargetResponse<D>
386where
387    D: Digest,
388{
389    fn write(&self, buf: &mut impl BufMut) {
390        self.request_id.write(buf);
391        self.target.write(buf);
392    }
393}
394
395impl<D> EncodeSize for GetSyncTargetResponse<D>
396where
397    D: Digest,
398{
399    fn encode_size(&self) -> usize {
400        self.request_id.encode_size() + self.target.encode_size()
401    }
402}
403
404impl<D> Read for GetSyncTargetResponse<D>
405where
406    D: Digest,
407{
408    type Cfg = ();
409    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
410        let request_id = RequestId::read_cfg(buf, &())?;
411        let target = Target::<mmr::Family, D>::read_cfg(buf, &())?;
412        Ok(Self { request_id, target })
413    }
414}
415
416impl<D> Write for GetCompactStateRequest<D>
417where
418    D: Digest,
419{
420    fn write(&self, buf: &mut impl BufMut) {
421        self.request_id.write(buf);
422        self.target.write(buf);
423    }
424}
425
426impl<D> EncodeSize for GetCompactStateRequest<D>
427where
428    D: Digest,
429{
430    fn encode_size(&self) -> usize {
431        self.request_id.encode_size() + self.target.encode_size()
432    }
433}
434
435impl<D> Read for GetCompactStateRequest<D>
436where
437    D: Digest,
438{
439    type Cfg = ();
440    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
441        let request_id = RequestId::read_cfg(buf, &())?;
442        let target = compact::Target::<mmr::Family, D>::read_cfg(buf, &())?;
443        Ok(Self { request_id, target })
444    }
445}
446
447impl Write for GetCompactTargetRequest {
448    fn write(&self, buf: &mut impl BufMut) {
449        self.request_id.write(buf);
450    }
451}
452
453impl EncodeSize for GetCompactTargetRequest {
454    fn encode_size(&self) -> usize {
455        self.request_id.encode_size()
456    }
457}
458
459impl Read for GetCompactTargetRequest {
460    type Cfg = ();
461    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
462        let request_id = RequestId::read_cfg(buf, &())?;
463        Ok(Self { request_id })
464    }
465}
466
467impl<D> Write for GetCompactTargetResponse<D>
468where
469    D: Digest,
470{
471    fn write(&self, buf: &mut impl BufMut) {
472        self.request_id.write(buf);
473        self.target.write(buf);
474    }
475}
476
477impl<D> EncodeSize for GetCompactTargetResponse<D>
478where
479    D: Digest,
480{
481    fn encode_size(&self) -> usize {
482        self.request_id.encode_size() + self.target.encode_size()
483    }
484}
485
486impl<D> Read for GetCompactTargetResponse<D>
487where
488    D: Digest,
489{
490    type Cfg = ();
491    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
492        let request_id = RequestId::read_cfg(buf, &())?;
493        let target = compact::Target::<mmr::Family, D>::read_cfg(buf, &())?;
494        Ok(Self { request_id, target })
495    }
496}
497
498impl<Op, D> Write for GetCompactStateResponse<Op, D>
499where
500    Op: Write,
501    D: Digest,
502{
503    fn write(&self, buf: &mut impl BufMut) {
504        self.request_id.write(buf);
505        self.state.write(buf);
506    }
507}
508
509impl<Op, D> EncodeSize for GetCompactStateResponse<Op, D>
510where
511    Op: EncodeSize,
512    D: Digest,
513{
514    fn encode_size(&self) -> usize {
515        self.request_id.encode_size() + self.state.encode_size()
516    }
517}
518
519impl<Op, D> Read for GetCompactStateResponse<Op, D>
520where
521    Op: Read,
522    Op::Cfg: IsUnit,
523    D: Digest,
524{
525    type Cfg = ();
526    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
527        let request_id = RequestId::read_cfg(buf, &())?;
528        let state = State::<mmr::Family, Op, D>::read_cfg(
529            buf,
530            &(
531                RangeCfg::from(0..=MAX_PINNED_NODES),
532                Op::Cfg::default(),
533                MAX_DIGESTS,
534            ),
535        )?;
536        Ok(Self { request_id, state })
537    }
538}