Skip to main content

commonware_sync/net/
wire.rs

1use crate::net::{ErrorResponse, RequestId};
2use commonware_codec::{
3    DecodeExt, Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
4};
5use commonware_cryptography::Digest;
6use commonware_runtime::{Buf, BufMut};
7use commonware_storage::{
8    mmr::{Location, Proof},
9    qmdb::sync::Target,
10};
11use std::num::NonZeroU64;
12
/// Maximum number of digests in a proof.
///
/// Passed as the decode configuration for the `Proof` carried by a `GetOperationsResponse`.
/// The same value is also reused as the upper bound on the number of operations accepted
/// when decoding that response.
pub const MAX_DIGESTS: usize = 10_000;
15
/// Request for operations from the server.
///
/// Call `validate` after decoding to check that `start_loc` is strictly less than `op_count`.
#[derive(Debug)]
pub struct GetOperationsRequest {
    /// Identifier of this request (returned by `Message::request_id`).
    pub request_id: RequestId,
    /// Exclusive upper bound for `start_loc`.
    // NOTE(review): presumably the operation count the proof is generated against — confirm.
    pub op_count: Location,
    /// Location of the first requested operation; must be below `op_count`.
    pub start_loc: Location,
    /// Non-zero limit on operations; a zero value is rejected when decoding.
    // NOTE(review): presumably the maximum number of operations to return — confirm.
    pub max_ops: NonZeroU64,
}
24
/// Response with operations and proof.
#[derive(Debug)]
pub struct GetOperationsResponse<Op, D>
where
    D: Digest,
{
    /// Identifier of this response (returned by `Message::request_id`).
    // NOTE(review): presumably echoes the id of the request being answered — confirm.
    pub request_id: RequestId,
    /// Proof accompanying `operations`; decoded with `MAX_DIGESTS` as its limit.
    pub proof: Proof<D>,
    /// Operations being returned; at most `MAX_DIGESTS` entries are accepted when decoding.
    pub operations: Vec<Op>,
}
35
/// Request for sync target from server.
///
/// Carries no payload besides the request identifier.
#[derive(Debug)]
pub struct GetSyncTargetRequest {
    /// Identifier of this request (returned by `Message::request_id`).
    pub request_id: RequestId,
}
41
/// Response with sync target.
#[derive(Debug)]
pub struct GetSyncTargetResponse<D>
where
    D: Digest,
{
    /// Identifier of this response (returned by `Message::request_id`).
    // NOTE(review): presumably echoes the id of the request being answered — confirm.
    pub request_id: RequestId,
    /// The sync target reported by the server.
    pub target: Target<D>,
}
51
/// Messages that can be sent over the wire.
///
/// Each message is encoded as a one-byte variant tag followed by the variant's payload.
/// The tag for each variant is noted below.
#[derive(Debug)]
pub enum Message<Op, D>
where
    D: Digest,
{
    /// Request for operations (wire tag `0`).
    GetOperationsRequest(GetOperationsRequest),
    /// Response carrying operations and a proof (wire tag `1`).
    GetOperationsResponse(GetOperationsResponse<Op, D>),
    /// Request for the sync target (wire tag `2`).
    GetSyncTargetRequest(GetSyncTargetRequest),
    /// Response carrying the sync target (wire tag `3`).
    GetSyncTargetResponse(GetSyncTargetResponse<D>),
    /// Error response (wire tag `4`).
    Error(ErrorResponse),
}
64
65impl<Op, D> Message<Op, D>
66where
67    D: Digest,
68{
69    pub const fn request_id(&self) -> RequestId {
70        match self {
71            Self::GetOperationsRequest(r) => r.request_id,
72            Self::GetOperationsResponse(r) => r.request_id,
73            Self::GetSyncTargetRequest(r) => r.request_id,
74            Self::GetSyncTargetResponse(r) => r.request_id,
75            Self::Error(e) => e.request_id,
76        }
77    }
78}
79
80impl<Op, D> super::Message for Message<Op, D>
81where
82    Op: Encode + DecodeExt<()> + Send + Sync + 'static,
83    D: Digest,
84{
85    fn request_id(&self) -> RequestId {
86        self.request_id()
87    }
88}
89
90impl<Op, D> Write for Message<Op, D>
91where
92    Op: Write,
93    D: Digest,
94{
95    fn write(&self, buf: &mut impl BufMut) {
96        match self {
97            Self::GetOperationsRequest(req) => {
98                0u8.write(buf);
99                req.write(buf);
100            }
101            Self::GetOperationsResponse(resp) => {
102                1u8.write(buf);
103                resp.write(buf);
104            }
105            Self::GetSyncTargetRequest(req) => {
106                2u8.write(buf);
107                req.write(buf);
108            }
109            Self::GetSyncTargetResponse(resp) => {
110                3u8.write(buf);
111                resp.write(buf);
112            }
113            Self::Error(err) => {
114                4u8.write(buf);
115                err.write(buf);
116            }
117        }
118    }
119}
120
121impl<Op, D> EncodeSize for Message<Op, D>
122where
123    Op: EncodeSize,
124    D: Digest,
125{
126    fn encode_size(&self) -> usize {
127        1 + match self {
128            Self::GetOperationsRequest(req) => req.encode_size(),
129            Self::GetOperationsResponse(resp) => resp.encode_size(),
130            Self::GetSyncTargetRequest(req) => req.encode_size(),
131            Self::GetSyncTargetResponse(resp) => resp.encode_size(),
132            Self::Error(err) => err.encode_size(),
133        }
134    }
135}
136
137impl<Op, D> Read for Message<Op, D>
138where
139    Op: Read<Cfg = ()>,
140    D: Digest,
141{
142    type Cfg = ();
143    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
144        let tag = u8::read(buf)?;
145        match tag {
146            0 => Ok(Self::GetOperationsRequest(GetOperationsRequest::read(buf)?)),
147            1 => Ok(Self::GetOperationsResponse(GetOperationsResponse::read(
148                buf,
149            )?)),
150            2 => Ok(Self::GetSyncTargetRequest(GetSyncTargetRequest::read(buf)?)),
151            3 => Ok(Self::GetSyncTargetResponse(GetSyncTargetResponse::read(
152                buf,
153            )?)),
154            4 => Ok(Self::Error(ErrorResponse::read(buf)?)),
155            d => Err(CodecError::InvalidEnum(d)),
156        }
157    }
158}
159
160impl Write for GetOperationsRequest {
161    fn write(&self, buf: &mut impl BufMut) {
162        self.request_id.write(buf);
163        self.op_count.write(buf);
164        self.start_loc.write(buf);
165        self.max_ops.get().write(buf);
166    }
167}
168
169impl EncodeSize for GetOperationsRequest {
170    fn encode_size(&self) -> usize {
171        self.request_id.encode_size()
172            + self.op_count.encode_size()
173            + self.start_loc.encode_size()
174            + self.max_ops.get().encode_size()
175    }
176}
177
178impl Read for GetOperationsRequest {
179    type Cfg = ();
180    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
181        let request_id = RequestId::read_cfg(buf, &())?;
182        let op_count = Location::read(buf)?;
183        let start_loc = Location::read(buf)?;
184        let max_ops = u64::read(buf)?;
185        let Some(max_ops) = NonZeroU64::new(max_ops) else {
186            return Err(CodecError::Invalid(
187                "GetOperationsRequest",
188                "max_ops cannot be zero",
189            ));
190        };
191        Ok(Self {
192            request_id,
193            op_count,
194            start_loc,
195            max_ops,
196        })
197    }
198}
199
200impl GetOperationsRequest {
201    pub fn validate(&self) -> Result<(), crate::Error> {
202        if self.start_loc >= self.op_count {
203            return Err(crate::Error::InvalidRequest(format!(
204                "start_loc >= size ({}) >= ({})",
205                self.start_loc, self.op_count
206            )));
207        }
208        Ok(())
209    }
210}
211
212impl<Op, D> Write for GetOperationsResponse<Op, D>
213where
214    Op: Write,
215    D: Digest,
216{
217    fn write(&self, buf: &mut impl BufMut) {
218        self.request_id.write(buf);
219        self.proof.write(buf);
220        self.operations.write(buf);
221    }
222}
223
224impl<Op, D> EncodeSize for GetOperationsResponse<Op, D>
225where
226    Op: EncodeSize,
227    D: Digest,
228{
229    fn encode_size(&self) -> usize {
230        self.request_id.encode_size() + self.proof.encode_size() + self.operations.encode_size()
231    }
232}
233
234impl<Op, D> Read for GetOperationsResponse<Op, D>
235where
236    Op: Read<Cfg = ()>,
237    D: Digest,
238{
239    type Cfg = ();
240    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
241        let request_id = RequestId::read_cfg(buf, &())?;
242        let proof = Proof::<D>::read_cfg(buf, &MAX_DIGESTS)?;
243        let operations = {
244            let range_cfg = RangeCfg::from(0..=MAX_DIGESTS);
245            Vec::<Op>::read_cfg(buf, &(range_cfg, ()))?
246        };
247        Ok(Self {
248            request_id,
249            proof,
250            operations,
251        })
252    }
253}
254
255impl Write for GetSyncTargetRequest {
256    fn write(&self, buf: &mut impl BufMut) {
257        self.request_id.write(buf);
258    }
259}
260
261impl EncodeSize for GetSyncTargetRequest {
262    fn encode_size(&self) -> usize {
263        self.request_id.encode_size()
264    }
265}
266
267impl Read for GetSyncTargetRequest {
268    type Cfg = ();
269    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
270        let request_id = RequestId::read_cfg(buf, &())?;
271        Ok(Self { request_id })
272    }
273}
274
275impl<D> Write for GetSyncTargetResponse<D>
276where
277    D: Digest,
278{
279    fn write(&self, buf: &mut impl BufMut) {
280        self.request_id.write(buf);
281        self.target.write(buf);
282    }
283}
284
285impl<D> EncodeSize for GetSyncTargetResponse<D>
286where
287    D: Digest,
288{
289    fn encode_size(&self) -> usize {
290        self.request_id.encode_size() + self.target.encode_size()
291    }
292}
293
294impl<D> Read for GetSyncTargetResponse<D>
295where
296    D: Digest,
297{
298    type Cfg = ();
299    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
300        let request_id = RequestId::read_cfg(buf, &())?;
301        let target = Target::<D>::read_cfg(buf, &())?;
302        Ok(Self { request_id, target })
303    }
304}