1use super::causal::CausalContext;
2use super::error::ProtocolError;
3
4const U32_LEN: usize = 4;
5
/// Opaque fixed-width identifier stored as a raw byte array.
///
/// The wrapped array is always exactly [`SchemaId::WIRE_LEN`] bytes long, so
/// the identifier can be copied to and from a byte stream without a length
/// prefix.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SchemaId([u8; SchemaId::WIRE_LEN]);

impl SchemaId {
    /// Number of bytes held by every identifier.
    pub const WIRE_LEN: usize = 32;

    /// Wraps `bytes` as an identifier without inspecting their contents.
    #[must_use]
    pub const fn new(bytes: [u8; Self::WIRE_LEN]) -> Self {
        SchemaId(bytes)
    }

    /// Borrows the underlying byte array.
    #[must_use]
    pub const fn as_bytes(&self) -> &[u8; Self::WIRE_LEN] {
        let Self(inner) = self;
        inner
    }

    /// Consumes the identifier and returns the underlying byte array.
    #[must_use]
    pub const fn into_bytes(self) -> [u8; Self::WIRE_LEN] {
        let Self(inner) = self;
        inner
    }
}
32
33#[derive(Clone, Debug, PartialEq, Eq)]
35pub struct MessageEnvelope {
36 pub schema_id: SchemaId,
38 pub causal_context: CausalContext,
40 pub payload: Vec<u8>,
42}
43
44impl MessageEnvelope {
45 #[must_use]
47 pub const fn new(schema_id: SchemaId, causal_context: CausalContext, payload: Vec<u8>) -> Self {
48 Self {
49 schema_id,
50 causal_context,
51 payload,
52 }
53 }
54
55 pub fn encoded_len(&self) -> Result<usize, ProtocolError> {
63 let causal_len = self.causal_context.encoded_len()?;
64 checked_u32_len(causal_len, "causal context")?;
65 checked_u32_len(self.payload.len(), "payload")?;
66 sum_lengths(&[
67 SchemaId::WIRE_LEN,
68 U32_LEN,
69 causal_len,
70 U32_LEN,
71 self.payload.len(),
72 ])
73 }
74
75 pub fn serialize(&self) -> Result<Vec<u8>, ProtocolError> {
82 let causal_bytes = self.causal_context.serialize()?;
83 checked_u32_len(causal_bytes.len(), "causal context")?;
84 checked_u32_len(self.payload.len(), "payload")?;
85 let len = sum_lengths(&[
86 SchemaId::WIRE_LEN,
87 U32_LEN,
88 causal_bytes.len(),
89 U32_LEN,
90 self.payload.len(),
91 ])?;
92 let mut bytes = Vec::with_capacity(len);
93
94 bytes.extend_from_slice(self.schema_id.as_bytes());
95 write_u32(&mut bytes, causal_bytes.len(), "causal context")?;
96 bytes.extend_from_slice(&causal_bytes);
97 write_u32(&mut bytes, self.payload.len(), "payload")?;
98 bytes.extend_from_slice(&self.payload);
99
100 if bytes.len() == len {
101 Ok(bytes)
102 } else {
103 Err(ProtocolError::codec(
104 "message envelope encoder produced an unexpected length",
105 ))
106 }
107 }
108
109 pub fn to_wire_bytes(&self) -> Result<Vec<u8>, ProtocolError> {
116 self.serialize()
117 }
118
119 pub fn deserialize(bytes: &[u8]) -> Result<Self, ProtocolError> {
126 Self::from_wire_bytes(bytes)
127 }
128
129 pub fn from_wire_bytes(bytes: &[u8]) -> Result<Self, ProtocolError> {
136 let mut offset = 0;
137 let schema_id = SchemaId::new(read_schema_id(bytes, &mut offset)?);
138 let causal_len = read_u32_as_usize(bytes, &mut offset, "causal context length")?;
139 let causal_bytes = read_slice(bytes, &mut offset, causal_len, "causal context bytes")?;
140 let causal_context = CausalContext::deserialize(causal_bytes)?;
141 let payload_len = read_u32_as_usize(bytes, &mut offset, "payload length")?;
142 let payload = read_slice(bytes, &mut offset, payload_len, "payload bytes")?.to_vec();
143
144 if offset == bytes.len() {
145 Ok(Self {
146 schema_id,
147 causal_context,
148 payload,
149 })
150 } else {
151 Err(ProtocolError::codec(
152 "message envelope contained trailing bytes",
153 ))
154 }
155 }
156}
157
158fn checked_u32_len(len: usize, field: &str) -> Result<(), ProtocolError> {
159 u32::try_from(len)
160 .map(|_| ())
161 .map_err(|_| ProtocolError::codec(format!("{field} length exceeded u32::MAX")))
162}
163
164fn sum_lengths(parts: &[usize]) -> Result<usize, ProtocolError> {
165 let mut total = 0_usize;
166 for part in parts {
167 total = total
168 .checked_add(*part)
169 .ok_or_else(|| ProtocolError::codec("message envelope length overflowed usize"))?;
170 }
171 Ok(total)
172}
173
174fn write_u32(buffer: &mut Vec<u8>, value: usize, field: &str) -> Result<(), ProtocolError> {
175 let value = u32::try_from(value)
176 .map_err(|_| ProtocolError::codec(format!("{field} length exceeded u32::MAX")))?;
177 buffer.extend_from_slice(&value.to_be_bytes());
178 Ok(())
179}
180
181fn read_schema_id(
182 bytes: &[u8],
183 offset: &mut usize,
184) -> Result<[u8; SchemaId::WIRE_LEN], ProtocolError> {
185 let schema_bytes = read_slice(bytes, offset, SchemaId::WIRE_LEN, "schema id")?;
186 let mut schema_id = [0_u8; SchemaId::WIRE_LEN];
187 schema_id.copy_from_slice(schema_bytes);
188 Ok(schema_id)
189}
190
191fn read_u32_as_usize(
192 bytes: &[u8],
193 offset: &mut usize,
194 field: &str,
195) -> Result<usize, ProtocolError> {
196 let bytes = read_slice(bytes, offset, U32_LEN, field)?;
197 let [b0, b1, b2, b3] = bytes else {
198 return Err(ProtocolError::codec(format!("{field} was truncated")));
199 };
200 usize::try_from(u32::from_be_bytes([*b0, *b1, *b2, *b3]))
201 .map_err(|_| ProtocolError::codec(format!("{field} cannot fit usize")))
202}
203
204fn read_slice<'a>(
205 bytes: &'a [u8],
206 offset: &mut usize,
207 len: usize,
208 field: &str,
209) -> Result<&'a [u8], ProtocolError> {
210 let end = offset
211 .checked_add(len)
212 .ok_or_else(|| ProtocolError::codec(format!("{field} offset overflowed usize")))?;
213 let Some(slice) = bytes.get(*offset..end) else {
214 return Err(ProtocolError::codec(format!(
215 "{field} exceeded available bytes"
216 )));
217 };
218 *offset = end;
219 Ok(slice)
220}
221
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use super::{MessageEnvelope, SchemaId};
    use crate::protocol::{CausalContext, MessageId, ProtocolError, extract_causal_context};

    /// Builds a fixture envelope with a fixed schema id and causal context
    /// wrapped around the given payload.
    fn sample_envelope(payload: Vec<u8>) -> MessageEnvelope {
        let causal_context = CausalContext {
            parent_id: Some(MessageId::from("parent-1")),
            vector_clock_entry: Some(99),
        };
        MessageEnvelope::new(SchemaId::new([0x11; 32]), causal_context, payload)
    }

    /// Compile-time check that both public types implement the expected traits.
    #[test]
    fn envelope_trait_bounds_are_available() {
        fn assert_schema_traits<T>()
        where
            T: Debug + Clone + Copy + PartialEq + Eq,
        {
        }
        fn assert_envelope_traits<T>()
        where
            T: Debug + Clone + PartialEq + Eq,
        {
        }

        assert_schema_traits::<SchemaId>();
        assert_envelope_traits::<MessageEnvelope>();
    }

    /// The schema id exposes exactly the 32 bytes it was built from.
    #[test]
    fn schema_id_wraps_exactly_thirty_two_bytes() {
        let raw = [0xAB; SchemaId::WIRE_LEN];
        let id = SchemaId::new(raw);

        assert_eq!(SchemaId::WIRE_LEN, 32);
        assert_eq!(id.as_bytes(), &raw);
        assert_eq!(id.into_bytes(), raw);
    }

    /// `MessageEnvelope::new` stores each argument in the matching field.
    #[test]
    fn constructor_sets_all_fields() {
        let id = SchemaId::new([1; 32]);
        let context = CausalContext::with_parent(MessageId::from("parent"));
        let body = vec![1, 2, 3];

        let built = MessageEnvelope::new(id, context.clone(), body.clone());

        assert_eq!(built.schema_id, id);
        assert_eq!(built.causal_context, context);
        assert_eq!(built.payload, body);
    }

    /// Encoding is deterministic: equal envelopes yield equal bytes.
    #[test]
    fn identical_fields_produce_identical_bytes() -> Result<(), ProtocolError> {
        let left = sample_envelope(vec![5, 6, 7]).serialize()?;
        let right = sample_envelope(vec![5, 6, 7]).serialize()?;

        assert_eq!(left, right);
        Ok(())
    }

    /// Decoding the encoded bytes yields the original envelope.
    #[test]
    fn serialization_round_trips_losslessly() -> Result<(), ProtocolError> {
        let original = sample_envelope(vec![9, 8, 7]);
        let wire = original.serialize()?;
        let restored = MessageEnvelope::deserialize(&wire)?;

        assert_eq!(restored, original);
        Ok(())
    }

    /// Checks the position and byte order of the schema id and both length prefixes.
    #[test]
    fn encoded_layout_starts_with_schema_id_and_big_endian_lengths() -> Result<(), ProtocolError> {
        let id = SchemaId::new([0x42; 32]);
        let context = CausalContext {
            parent_id: Some(MessageId::from("parent")),
            vector_clock_entry: Some(0x0102_0304_0506_0708),
        };
        let causal_len = context.encoded_len()?;
        let wire = MessageEnvelope::new(id, context, vec![0xAA, 0xBB]).serialize()?;

        assert_eq!(&wire[..32], id.as_bytes());

        let causal_prefix = u32::try_from(causal_len)
            .map_err(|_| ProtocolError::codec("test causal length exceeded u32"))?
            .to_be_bytes();
        assert_eq!(&wire[32..36], &causal_prefix);

        let payload_prefix_start = 36 + causal_len;
        let payload_prefix_end = payload_prefix_start + 4;
        assert_eq!(
            &wire[payload_prefix_start..payload_prefix_end],
            &2_u32.to_be_bytes()
        );
        Ok(())
    }

    /// An envelope with no payload bytes survives a round trip.
    #[test]
    fn empty_payload_round_trips() -> Result<(), ProtocolError> {
        let original = sample_envelope(Vec::new());
        let wire = original.serialize()?;
        let restored = MessageEnvelope::deserialize(&wire)?;

        assert_eq!(restored, original);
        assert_eq!(restored.payload, Vec::<u8>::new());
        Ok(())
    }

    /// `extract_causal_context` recovers the causal context from encoded bytes.
    #[test]
    fn causal_context_is_extractable_from_envelope_bytes() -> Result<(), ProtocolError> {
        let original = sample_envelope(vec![1, 2, 3, 4]);
        let wire = original.serialize()?;
        let extracted = extract_causal_context(&wire)?;

        assert_eq!(extracted, original.causal_context);
        Ok(())
    }
}