1use crate::net::{ErrorResponse, RequestId};
2use bytes::{Buf, BufMut};
3use commonware_codec::{
4 DecodeExt, Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
5};
6use commonware_cryptography::Digest;
7use commonware_storage::{adb::sync::Target, mmr::verification::Proof};
8use std::num::NonZeroU64;
9
/// Decoding limit applied when reading a [`GetOperationsResponse`]: it is passed as the read
/// configuration of the proof and is also the maximum number of operations accepted.
pub const MAX_DIGESTS: usize = 10_000;
12
/// Request message carrying a request id together with `size`, `start_loc` and `max_ops`.
///
/// `validate` rejects a request whose `start_loc` is not strictly below `size`, and decoding
/// rejects an encoded `max_ops` of zero.
#[derive(Debug)]
pub struct GetOperationsRequest {
    /// Identifier returned by [`Message::request_id`] for this message.
    pub request_id: RequestId,
    /// NOTE(review): presumably the database size the request is made against — confirm.
    pub size: u64,
    /// NOTE(review): presumably the location of the first requested operation — confirm.
    /// Must be `< size` for `validate` to succeed.
    pub start_loc: u64,
    /// NOTE(review): presumably the maximum number of operations to return — confirm.
    /// Non-zero by construction.
    pub max_ops: NonZeroU64,
}
21
22#[derive(Debug)]
24pub struct GetOperationsResponse<Op, D>
25where
26 D: Digest,
27{
28 pub request_id: RequestId,
29 pub proof: Proof<D>,
30 pub operations: Vec<Op>,
31}
32
/// Request message that carries nothing but a request id.
///
/// NOTE(review): presumably asks the peer for its current sync target — confirm.
#[derive(Debug)]
pub struct GetSyncTargetRequest {
    /// Identifier returned by [`Message::request_id`] for this message.
    pub request_id: RequestId,
}
38
39#[derive(Debug)]
41pub struct GetSyncTargetResponse<D>
42where
43 D: Digest,
44{
45 pub request_id: RequestId,
46 pub target: Target<D>,
47}
48
49#[derive(Debug)]
51pub enum Message<Op, D>
52where
53 D: Digest,
54{
55 GetOperationsRequest(GetOperationsRequest),
56 GetOperationsResponse(GetOperationsResponse<Op, D>),
57 GetSyncTargetRequest(GetSyncTargetRequest),
58 GetSyncTargetResponse(GetSyncTargetResponse<D>),
59 Error(ErrorResponse),
60}
61
62impl<Op, D> Message<Op, D>
63where
64 D: Digest,
65{
66 pub fn request_id(&self) -> RequestId {
67 match self {
68 Message::GetOperationsRequest(r) => r.request_id,
69 Message::GetOperationsResponse(r) => r.request_id,
70 Message::GetSyncTargetRequest(r) => r.request_id,
71 Message::GetSyncTargetResponse(r) => r.request_id,
72 Message::Error(e) => e.request_id,
73 }
74 }
75}
76
77impl<Op, D> super::Message for Message<Op, D>
78where
79 Op: Encode + DecodeExt<()> + Send + Sync + 'static,
80 D: Digest,
81{
82 fn request_id(&self) -> RequestId {
83 self.request_id()
84 }
85}
86
87impl<Op, D> Write for Message<Op, D>
88where
89 Op: Write,
90 D: Digest,
91{
92 fn write(&self, buf: &mut impl BufMut) {
93 match self {
94 Message::GetOperationsRequest(req) => {
95 0u8.write(buf);
96 req.write(buf);
97 }
98 Message::GetOperationsResponse(resp) => {
99 1u8.write(buf);
100 resp.write(buf);
101 }
102 Message::GetSyncTargetRequest(req) => {
103 2u8.write(buf);
104 req.write(buf);
105 }
106 Message::GetSyncTargetResponse(resp) => {
107 3u8.write(buf);
108 resp.write(buf);
109 }
110 Message::Error(err) => {
111 4u8.write(buf);
112 err.write(buf);
113 }
114 }
115 }
116}
117
118impl<Op, D> EncodeSize for Message<Op, D>
119where
120 Op: EncodeSize,
121 D: Digest,
122{
123 fn encode_size(&self) -> usize {
124 1 + match self {
125 Message::GetOperationsRequest(req) => req.encode_size(),
126 Message::GetOperationsResponse(resp) => resp.encode_size(),
127 Message::GetSyncTargetRequest(req) => req.encode_size(),
128 Message::GetSyncTargetResponse(resp) => resp.encode_size(),
129 Message::Error(err) => err.encode_size(),
130 }
131 }
132}
133
134impl<Op, D> Read for Message<Op, D>
135where
136 Op: Read<Cfg = ()>,
137 D: Digest,
138{
139 type Cfg = ();
140 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
141 let tag = u8::read(buf)?;
142 match tag {
143 0 => Ok(Message::GetOperationsRequest(GetOperationsRequest::read(
144 buf,
145 )?)),
146 1 => Ok(Message::GetOperationsResponse(GetOperationsResponse::read(
147 buf,
148 )?)),
149 2 => Ok(Message::GetSyncTargetRequest(GetSyncTargetRequest::read(
150 buf,
151 )?)),
152 3 => Ok(Message::GetSyncTargetResponse(GetSyncTargetResponse::read(
153 buf,
154 )?)),
155 4 => Ok(Message::Error(ErrorResponse::read(buf)?)),
156 d => Err(CodecError::InvalidEnum(d)),
157 }
158 }
159}
160
161impl Write for GetOperationsRequest {
162 fn write(&self, buf: &mut impl BufMut) {
163 self.request_id.write(buf);
164 self.size.write(buf);
165 self.start_loc.write(buf);
166 self.max_ops.get().write(buf);
167 }
168}
169
170impl EncodeSize for GetOperationsRequest {
171 fn encode_size(&self) -> usize {
172 self.request_id.encode_size()
173 + self.size.encode_size()
174 + self.start_loc.encode_size()
175 + self.max_ops.get().encode_size()
176 }
177}
178
179impl Read for GetOperationsRequest {
180 type Cfg = ();
181 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
182 let request_id = RequestId::read_cfg(buf, &())?;
183 let size = u64::read(buf)?;
184 let start_loc = u64::read(buf)?;
185 let max_ops_raw = u64::read(buf)?;
186 let max_ops = NonZeroU64::new(max_ops_raw)
187 .ok_or_else(|| CodecError::Invalid("GetOperationsRequest", "max_ops cannot be zero"))?;
188 Ok(Self {
189 request_id,
190 size,
191 start_loc,
192 max_ops,
193 })
194 }
195}
196
197impl GetOperationsRequest {
198 pub fn validate(&self) -> Result<(), crate::Error> {
199 if self.start_loc >= self.size {
200 return Err(crate::Error::InvalidRequest(format!(
201 "start_loc >= size ({}) >= ({})",
202 self.start_loc, self.size
203 )));
204 }
205 if self.max_ops.get() == 0 {
206 return Err(crate::Error::InvalidRequest(
207 "max_ops cannot be zero".to_string(),
208 ));
209 }
210 Ok(())
211 }
212}
213
214impl<Op, D> Write for GetOperationsResponse<Op, D>
215where
216 Op: Write,
217 D: Digest,
218{
219 fn write(&self, buf: &mut impl BufMut) {
220 self.request_id.write(buf);
221 self.proof.write(buf);
222 self.operations.write(buf);
223 }
224}
225
226impl<Op, D> EncodeSize for GetOperationsResponse<Op, D>
227where
228 Op: EncodeSize,
229 D: Digest,
230{
231 fn encode_size(&self) -> usize {
232 self.request_id.encode_size() + self.proof.encode_size() + self.operations.encode_size()
233 }
234}
235
236impl<Op, D> Read for GetOperationsResponse<Op, D>
237where
238 Op: Read<Cfg = ()>,
239 D: Digest,
240{
241 type Cfg = ();
242 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
243 let request_id = RequestId::read_cfg(buf, &())?;
244 let proof = Proof::<D>::read_cfg(buf, &MAX_DIGESTS)?;
245 let operations = {
246 let range_cfg = RangeCfg::from(0..=MAX_DIGESTS);
247 Vec::<Op>::read_cfg(buf, &(range_cfg, ()))?
248 };
249 Ok(Self {
250 request_id,
251 proof,
252 operations,
253 })
254 }
255}
256
257impl Write for GetSyncTargetRequest {
258 fn write(&self, buf: &mut impl BufMut) {
259 self.request_id.write(buf);
260 }
261}
262
263impl EncodeSize for GetSyncTargetRequest {
264 fn encode_size(&self) -> usize {
265 self.request_id.encode_size()
266 }
267}
268
269impl Read for GetSyncTargetRequest {
270 type Cfg = ();
271 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
272 let request_id = RequestId::read_cfg(buf, &())?;
273 Ok(Self { request_id })
274 }
275}
276
277impl<D> Write for GetSyncTargetResponse<D>
278where
279 D: Digest,
280{
281 fn write(&self, buf: &mut impl BufMut) {
282 self.request_id.write(buf);
283 self.target.write(buf);
284 }
285}
286
287impl<D> EncodeSize for GetSyncTargetResponse<D>
288where
289 D: Digest,
290{
291 fn encode_size(&self) -> usize {
292 self.request_id.encode_size() + self.target.encode_size()
293 }
294}
295
296impl<D> Read for GetSyncTargetResponse<D>
297where
298 D: Digest,
299{
300 type Cfg = ();
301 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
302 let request_id = RequestId::read_cfg(buf, &())?;
303 let target = Target::<D>::read_cfg(buf, &())?;
304 Ok(Self { request_id, target })
305 }
306}