1use crate::net::{ErrorResponse, RequestId};
2use bytes::{Buf, BufMut};
3use commonware_codec::{
4 DecodeExt, Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
5};
6use commonware_cryptography::Digest;
7use commonware_storage::{
8 adb::sync::Target,
9 mmr::{Location, Proof},
10};
11use std::num::NonZeroU64;
12
/// Decode-time limit used by `GetOperationsResponse::read_cfg`: it is passed as the
/// configuration when reading the proof and is also the inclusive upper bound on the
/// number of operations accepted in a single response.
pub const MAX_DIGESTS: usize = 10_000;
15
/// Request message for a batch of operations.
///
/// Encoded as `request_id`, `op_count`, `start_loc`, `max_ops` (in that order).
#[derive(Debug)]
pub struct GetOperationsRequest {
    /// Identifier of this request.
    pub request_id: RequestId,
    /// Operation count the request refers to; `validate` requires `start_loc < op_count`.
    pub op_count: Location,
    /// Starting location; presumably the first operation wanted — confirm against the handler.
    pub start_loc: Location,
    /// Non-zero limit; presumably the maximum number of operations to return — TODO confirm.
    pub max_ops: NonZeroU64,
}
24
25#[derive(Debug)]
27pub struct GetOperationsResponse<Op, D>
28where
29 D: Digest,
30{
31 pub request_id: RequestId,
32 pub proof: Proof<D>,
33 pub operations: Vec<Op>,
34}
35
/// Request message for a sync target; carries nothing but its identifier.
#[derive(Debug)]
pub struct GetSyncTargetRequest {
    /// Identifier of this request.
    pub request_id: RequestId,
}
41
42#[derive(Debug)]
44pub struct GetSyncTargetResponse<D>
45where
46 D: Digest,
47{
48 pub request_id: RequestId,
49 pub target: Target<D>,
50}
51
52#[derive(Debug)]
54pub enum Message<Op, D>
55where
56 D: Digest,
57{
58 GetOperationsRequest(GetOperationsRequest),
59 GetOperationsResponse(GetOperationsResponse<Op, D>),
60 GetSyncTargetRequest(GetSyncTargetRequest),
61 GetSyncTargetResponse(GetSyncTargetResponse<D>),
62 Error(ErrorResponse),
63}
64
65impl<Op, D> Message<Op, D>
66where
67 D: Digest,
68{
69 pub fn request_id(&self) -> RequestId {
70 match self {
71 Message::GetOperationsRequest(r) => r.request_id,
72 Message::GetOperationsResponse(r) => r.request_id,
73 Message::GetSyncTargetRequest(r) => r.request_id,
74 Message::GetSyncTargetResponse(r) => r.request_id,
75 Message::Error(e) => e.request_id,
76 }
77 }
78}
79
/// Implements the parent module's `Message` trait for the wire [`Message`] enum.
impl<Op, D> super::Message for Message<Op, D>
where
    Op: Encode + DecodeExt<()> + Send + Sync + 'static,
    D: Digest,
{
    /// Returns the request identifier of this message.
    fn request_id(&self) -> RequestId {
        // Method-call syntax prefers inherent methods over trait methods, so this
        // dispatches to the inherent `Message::request_id` rather than recursing.
        self.request_id()
    }
}
89
90impl<Op, D> Write for Message<Op, D>
91where
92 Op: Write,
93 D: Digest,
94{
95 fn write(&self, buf: &mut impl BufMut) {
96 match self {
97 Message::GetOperationsRequest(req) => {
98 0u8.write(buf);
99 req.write(buf);
100 }
101 Message::GetOperationsResponse(resp) => {
102 1u8.write(buf);
103 resp.write(buf);
104 }
105 Message::GetSyncTargetRequest(req) => {
106 2u8.write(buf);
107 req.write(buf);
108 }
109 Message::GetSyncTargetResponse(resp) => {
110 3u8.write(buf);
111 resp.write(buf);
112 }
113 Message::Error(err) => {
114 4u8.write(buf);
115 err.write(buf);
116 }
117 }
118 }
119}
120
121impl<Op, D> EncodeSize for Message<Op, D>
122where
123 Op: EncodeSize,
124 D: Digest,
125{
126 fn encode_size(&self) -> usize {
127 1 + match self {
128 Message::GetOperationsRequest(req) => req.encode_size(),
129 Message::GetOperationsResponse(resp) => resp.encode_size(),
130 Message::GetSyncTargetRequest(req) => req.encode_size(),
131 Message::GetSyncTargetResponse(resp) => resp.encode_size(),
132 Message::Error(err) => err.encode_size(),
133 }
134 }
135}
136
137impl<Op, D> Read for Message<Op, D>
138where
139 Op: Read<Cfg = ()>,
140 D: Digest,
141{
142 type Cfg = ();
143 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
144 let tag = u8::read(buf)?;
145 match tag {
146 0 => Ok(Message::GetOperationsRequest(GetOperationsRequest::read(
147 buf,
148 )?)),
149 1 => Ok(Message::GetOperationsResponse(GetOperationsResponse::read(
150 buf,
151 )?)),
152 2 => Ok(Message::GetSyncTargetRequest(GetSyncTargetRequest::read(
153 buf,
154 )?)),
155 3 => Ok(Message::GetSyncTargetResponse(GetSyncTargetResponse::read(
156 buf,
157 )?)),
158 4 => Ok(Message::Error(ErrorResponse::read(buf)?)),
159 d => Err(CodecError::InvalidEnum(d)),
160 }
161 }
162}
163
164impl Write for GetOperationsRequest {
165 fn write(&self, buf: &mut impl BufMut) {
166 self.request_id.write(buf);
167 self.op_count.write(buf);
168 self.start_loc.write(buf);
169 self.max_ops.get().write(buf);
170 }
171}
172
173impl EncodeSize for GetOperationsRequest {
174 fn encode_size(&self) -> usize {
175 self.request_id.encode_size()
176 + self.op_count.encode_size()
177 + self.start_loc.encode_size()
178 + self.max_ops.get().encode_size()
179 }
180}
181
182impl Read for GetOperationsRequest {
183 type Cfg = ();
184 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
185 let request_id = RequestId::read_cfg(buf, &())?;
186 let op_count = u64::read(buf)?;
187 let Some(op_count) = Location::new(op_count) else {
188 return Err(CodecError::Invalid(
189 "GetOperationsRequest",
190 "op_count exceeds MAX_LOCATION",
191 ));
192 };
193 let start_loc = u64::read(buf)?;
194 let Some(start_loc) = Location::new(start_loc) else {
195 return Err(CodecError::Invalid(
196 "GetOperationsRequest",
197 "start_loc exceeds MAX_LOCATION",
198 ));
199 };
200 let max_ops = u64::read(buf)?;
201 let Some(max_ops) = NonZeroU64::new(max_ops) else {
202 return Err(CodecError::Invalid(
203 "GetOperationsRequest",
204 "max_ops cannot be zero",
205 ));
206 };
207 Ok(Self {
208 request_id,
209 op_count,
210 start_loc,
211 max_ops,
212 })
213 }
214}
215
216impl GetOperationsRequest {
217 pub fn validate(&self) -> Result<(), crate::Error> {
218 if self.start_loc >= self.op_count {
219 return Err(crate::Error::InvalidRequest(format!(
220 "start_loc >= size ({}) >= ({})",
221 self.start_loc, self.op_count
222 )));
223 }
224 Ok(())
225 }
226}
227
228impl<Op, D> Write for GetOperationsResponse<Op, D>
229where
230 Op: Write,
231 D: Digest,
232{
233 fn write(&self, buf: &mut impl BufMut) {
234 self.request_id.write(buf);
235 self.proof.write(buf);
236 self.operations.write(buf);
237 }
238}
239
240impl<Op, D> EncodeSize for GetOperationsResponse<Op, D>
241where
242 Op: EncodeSize,
243 D: Digest,
244{
245 fn encode_size(&self) -> usize {
246 self.request_id.encode_size() + self.proof.encode_size() + self.operations.encode_size()
247 }
248}
249
250impl<Op, D> Read for GetOperationsResponse<Op, D>
251where
252 Op: Read<Cfg = ()>,
253 D: Digest,
254{
255 type Cfg = ();
256 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
257 let request_id = RequestId::read_cfg(buf, &())?;
258 let proof = Proof::<D>::read_cfg(buf, &MAX_DIGESTS)?;
259 let operations = {
260 let range_cfg = RangeCfg::from(0..=MAX_DIGESTS);
261 Vec::<Op>::read_cfg(buf, &(range_cfg, ()))?
262 };
263 Ok(Self {
264 request_id,
265 proof,
266 operations,
267 })
268 }
269}
270
271impl Write for GetSyncTargetRequest {
272 fn write(&self, buf: &mut impl BufMut) {
273 self.request_id.write(buf);
274 }
275}
276
277impl EncodeSize for GetSyncTargetRequest {
278 fn encode_size(&self) -> usize {
279 self.request_id.encode_size()
280 }
281}
282
283impl Read for GetSyncTargetRequest {
284 type Cfg = ();
285 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
286 let request_id = RequestId::read_cfg(buf, &())?;
287 Ok(Self { request_id })
288 }
289}
290
291impl<D> Write for GetSyncTargetResponse<D>
292where
293 D: Digest,
294{
295 fn write(&self, buf: &mut impl BufMut) {
296 self.request_id.write(buf);
297 self.target.write(buf);
298 }
299}
300
301impl<D> EncodeSize for GetSyncTargetResponse<D>
302where
303 D: Digest,
304{
305 fn encode_size(&self) -> usize {
306 self.request_id.encode_size() + self.target.encode_size()
307 }
308}
309
310impl<D> Read for GetSyncTargetResponse<D>
311where
312 D: Digest,
313{
314 type Cfg = ();
315 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
316 let request_id = RequestId::read_cfg(buf, &())?;
317 let target = Target::<D>::read_cfg(buf, &())?;
318 Ok(Self { request_id, target })
319 }
320}