// liminal/protocol/envelope.rs

1use super::causal::CausalContext;
2use super::error::ProtocolError;
3
/// Size in bytes of each big-endian `u32` length prefix in the envelope encoding.
const U32_LEN: usize = std::mem::size_of::<u32>();
5
/// Content-schema hash naming the type of an envelope's opaque payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SchemaId([u8; Self::WIRE_LEN]);

impl SchemaId {
    /// Width of the schema id hash, in bytes, as it appears on the wire.
    pub const WIRE_LEN: usize = 32;

    /// Build a schema id around an already-computed 32-byte hash.
    #[must_use]
    pub const fn new(bytes: [u8; Self::WIRE_LEN]) -> Self {
        Self(bytes)
    }

    /// Borrow the underlying hash bytes.
    #[must_use]
    pub const fn as_bytes(&self) -> &[u8; Self::WIRE_LEN] {
        let Self(hash) = self;
        hash
    }

    /// Give up the schema id and take ownership of its hash bytes.
    #[must_use]
    pub const fn into_bytes(self) -> [u8; Self::WIRE_LEN] {
        let Self(hash) = self;
        hash
    }
}
32
33/// Protocol message envelope carried by publish and conversation-message frames.
34#[derive(Clone, Debug, PartialEq, Eq)]
35pub struct MessageEnvelope {
36    /// Content type hash identifying the payload schema.
37    pub schema_id: SchemaId,
38    /// Structured causal metadata used by the bus for ordering decisions.
39    pub causal_context: CausalContext,
40    /// Opaque application payload bytes.
41    pub payload: Vec<u8>,
42}
43
44impl MessageEnvelope {
45    /// Create a protocol message envelope from all of its fields.
46    #[must_use]
47    pub const fn new(schema_id: SchemaId, causal_context: CausalContext, payload: Vec<u8>) -> Self {
48        Self {
49            schema_id,
50            causal_context,
51            payload,
52        }
53    }
54
55    /// Return the deterministic encoded length of this envelope.
56    ///
57    /// # Errors
58    ///
59    /// Returns [`ProtocolError::CodecError`] when the causal-context or payload
60    /// length cannot fit in the protocol's `u32` length fields, or when the total
61    /// length overflows `usize`.
62    pub fn encoded_len(&self) -> Result<usize, ProtocolError> {
63        let causal_len = self.causal_context.encoded_len()?;
64        checked_u32_len(causal_len, "causal context")?;
65        checked_u32_len(self.payload.len(), "payload")?;
66        sum_lengths(&[
67            SchemaId::WIRE_LEN,
68            U32_LEN,
69            causal_len,
70            U32_LEN,
71            self.payload.len(),
72        ])
73    }
74
75    /// Serialize this envelope with deterministic big-endian field encoding.
76    ///
77    /// # Errors
78    ///
79    /// Returns [`ProtocolError::CodecError`] when a length field cannot represent
80    /// the encoded envelope.
81    pub fn serialize(&self) -> Result<Vec<u8>, ProtocolError> {
82        let causal_bytes = self.causal_context.serialize()?;
83        checked_u32_len(causal_bytes.len(), "causal context")?;
84        checked_u32_len(self.payload.len(), "payload")?;
85        let len = sum_lengths(&[
86            SchemaId::WIRE_LEN,
87            U32_LEN,
88            causal_bytes.len(),
89            U32_LEN,
90            self.payload.len(),
91        ])?;
92        let mut bytes = Vec::with_capacity(len);
93
94        bytes.extend_from_slice(self.schema_id.as_bytes());
95        write_u32(&mut bytes, causal_bytes.len(), "causal context")?;
96        bytes.extend_from_slice(&causal_bytes);
97        write_u32(&mut bytes, self.payload.len(), "payload")?;
98        bytes.extend_from_slice(&self.payload);
99
100        if bytes.len() == len {
101            Ok(bytes)
102        } else {
103            Err(ProtocolError::codec(
104                "message envelope encoder produced an unexpected length",
105            ))
106        }
107    }
108
109    /// Serialize this envelope for wire transport.
110    ///
111    /// # Errors
112    ///
113    /// Returns [`ProtocolError::CodecError`] when a length field cannot represent
114    /// the encoded envelope.
115    pub fn to_wire_bytes(&self) -> Result<Vec<u8>, ProtocolError> {
116        self.serialize()
117    }
118
119    /// Deserialize an envelope from deterministic wire bytes.
120    ///
121    /// # Errors
122    ///
123    /// Returns [`ProtocolError::CodecError`] when the bytes are truncated,
124    /// contain malformed causal context bytes, or contain trailing bytes.
125    pub fn deserialize(bytes: &[u8]) -> Result<Self, ProtocolError> {
126        Self::from_wire_bytes(bytes)
127    }
128
129    /// Deserialize an envelope from deterministic wire bytes.
130    ///
131    /// # Errors
132    ///
133    /// Returns [`ProtocolError::CodecError`] when the bytes are truncated,
134    /// contain malformed causal context bytes, or contain trailing bytes.
135    pub fn from_wire_bytes(bytes: &[u8]) -> Result<Self, ProtocolError> {
136        let mut offset = 0;
137        let schema_id = SchemaId::new(read_schema_id(bytes, &mut offset)?);
138        let causal_len = read_u32_as_usize(bytes, &mut offset, "causal context length")?;
139        let causal_bytes = read_slice(bytes, &mut offset, causal_len, "causal context bytes")?;
140        let causal_context = CausalContext::deserialize(causal_bytes)?;
141        let payload_len = read_u32_as_usize(bytes, &mut offset, "payload length")?;
142        let payload = read_slice(bytes, &mut offset, payload_len, "payload bytes")?.to_vec();
143
144        if offset == bytes.len() {
145            Ok(Self {
146                schema_id,
147                causal_context,
148                payload,
149            })
150        } else {
151            Err(ProtocolError::codec(
152                "message envelope contained trailing bytes",
153            ))
154        }
155    }
156}
157
158fn checked_u32_len(len: usize, field: &str) -> Result<(), ProtocolError> {
159    u32::try_from(len)
160        .map(|_| ())
161        .map_err(|_| ProtocolError::codec(format!("{field} length exceeded u32::MAX")))
162}
163
164fn sum_lengths(parts: &[usize]) -> Result<usize, ProtocolError> {
165    let mut total = 0_usize;
166    for part in parts {
167        total = total
168            .checked_add(*part)
169            .ok_or_else(|| ProtocolError::codec("message envelope length overflowed usize"))?;
170    }
171    Ok(total)
172}
173
174fn write_u32(buffer: &mut Vec<u8>, value: usize, field: &str) -> Result<(), ProtocolError> {
175    let value = u32::try_from(value)
176        .map_err(|_| ProtocolError::codec(format!("{field} length exceeded u32::MAX")))?;
177    buffer.extend_from_slice(&value.to_be_bytes());
178    Ok(())
179}
180
181fn read_schema_id(
182    bytes: &[u8],
183    offset: &mut usize,
184) -> Result<[u8; SchemaId::WIRE_LEN], ProtocolError> {
185    let schema_bytes = read_slice(bytes, offset, SchemaId::WIRE_LEN, "schema id")?;
186    let mut schema_id = [0_u8; SchemaId::WIRE_LEN];
187    schema_id.copy_from_slice(schema_bytes);
188    Ok(schema_id)
189}
190
191fn read_u32_as_usize(
192    bytes: &[u8],
193    offset: &mut usize,
194    field: &str,
195) -> Result<usize, ProtocolError> {
196    let bytes = read_slice(bytes, offset, U32_LEN, field)?;
197    let [b0, b1, b2, b3] = bytes else {
198        return Err(ProtocolError::codec(format!("{field} was truncated")));
199    };
200    usize::try_from(u32::from_be_bytes([*b0, *b1, *b2, *b3]))
201        .map_err(|_| ProtocolError::codec(format!("{field} cannot fit usize")))
202}
203
204fn read_slice<'a>(
205    bytes: &'a [u8],
206    offset: &mut usize,
207    len: usize,
208    field: &str,
209) -> Result<&'a [u8], ProtocolError> {
210    let end = offset
211        .checked_add(len)
212        .ok_or_else(|| ProtocolError::codec(format!("{field} offset overflowed usize")))?;
213    let Some(slice) = bytes.get(*offset..end) else {
214        return Err(ProtocolError::codec(format!(
215            "{field} exceeded available bytes"
216        )));
217    };
218    *offset = end;
219    Ok(slice)
220}
221
// Unit tests for the envelope codec: construction, deterministic encoding,
// round-tripping, wire layout, and rejection of malformed input.
#[cfg(test)]
mod tests {
    use std::fmt::Debug;
    use std::hash::Hash;

    use super::{MessageEnvelope, SchemaId};
    use crate::protocol::{CausalContext, MessageId, ProtocolError, extract_causal_context};

    // Compile-time check that the derived traits callers rely on stay in place.
    #[test]
    fn envelope_trait_bounds_are_available() {
        fn assert_schema_traits<T: Debug + Clone + Copy + PartialEq + Eq + Hash>() {}
        fn assert_envelope_traits<T: Debug + Clone + PartialEq + Eq>() {}

        assert_schema_traits::<SchemaId>();
        assert_envelope_traits::<MessageEnvelope>();
    }

    #[test]
    fn schema_id_wraps_exactly_thirty_two_bytes() {
        let bytes = [0xAB; SchemaId::WIRE_LEN];
        let schema_id = SchemaId::new(bytes);

        assert_eq!(SchemaId::WIRE_LEN, 32);
        assert_eq!(schema_id.as_bytes(), &bytes);
        assert_eq!(schema_id.into_bytes(), bytes);
    }

    #[test]
    fn constructor_sets_all_fields() {
        let schema_id = SchemaId::new([1; 32]);
        let causal_context = CausalContext::with_parent(MessageId::from("parent"));
        let payload = vec![1, 2, 3];

        let envelope = MessageEnvelope::new(schema_id, causal_context.clone(), payload.clone());

        assert_eq!(envelope.schema_id, schema_id);
        assert_eq!(envelope.causal_context, causal_context);
        assert_eq!(envelope.payload, payload);
    }

    #[test]
    fn identical_fields_produce_identical_bytes() -> Result<(), ProtocolError> {
        let first = sample_envelope(vec![5, 6, 7]);
        let second = sample_envelope(vec![5, 6, 7]);

        assert_eq!(first.serialize()?, second.serialize()?);
        Ok(())
    }

    #[test]
    fn serialization_round_trips_losslessly() -> Result<(), ProtocolError> {
        let envelope = sample_envelope(vec![9, 8, 7]);
        let encoded = envelope.serialize()?;
        let decoded = MessageEnvelope::deserialize(&encoded)?;

        assert_eq!(decoded, envelope);
        Ok(())
    }

    // The `*_wire_bytes` methods delegate to `serialize` / `deserialize`.
    #[test]
    fn wire_byte_aliases_match_primary_codec() -> Result<(), ProtocolError> {
        let envelope = sample_envelope(vec![3, 1, 4]);
        let encoded = envelope.to_wire_bytes()?;

        assert_eq!(encoded, envelope.serialize()?);
        assert_eq!(MessageEnvelope::from_wire_bytes(&encoded)?, envelope);
        Ok(())
    }

    // `encoded_len` must predict the size of the real encoding.
    #[test]
    fn encoded_len_matches_serialized_length() -> Result<(), ProtocolError> {
        for payload in [Vec::new(), vec![1, 2, 3]] {
            let envelope = sample_envelope(payload);

            assert_eq!(envelope.encoded_len()?, envelope.serialize()?.len());
        }
        Ok(())
    }

    #[test]
    fn encoded_layout_starts_with_schema_id_and_big_endian_lengths() -> Result<(), ProtocolError> {
        let schema_id = SchemaId::new([0x42; 32]);
        let causal_context = CausalContext {
            parent_id: Some(MessageId::from("parent")),
            vector_clock_entry: Some(0x0102_0304_0506_0708),
        };
        let causal_len = causal_context.encoded_len()?;
        let envelope = MessageEnvelope::new(schema_id, causal_context, vec![0xAA, 0xBB]);
        let encoded = envelope.serialize()?;

        assert_eq!(&encoded[..32], schema_id.as_bytes());
        assert_eq!(
            &encoded[32..36],
            &u32::try_from(causal_len)
                .map_err(|_| ProtocolError::codec("test causal length exceeded u32"))?
                .to_be_bytes()
        );
        let payload_len_offset = 36 + causal_len;
        assert_eq!(
            &encoded[payload_len_offset..payload_len_offset + 4],
            &2_u32.to_be_bytes()
        );
        Ok(())
    }

    #[test]
    fn empty_payload_round_trips() -> Result<(), ProtocolError> {
        let envelope = sample_envelope(Vec::new());
        let encoded = envelope.serialize()?;
        let decoded = MessageEnvelope::deserialize(&encoded)?;

        assert_eq!(decoded, envelope);
        assert_eq!(decoded.payload, Vec::<u8>::new());
        Ok(())
    }

    // Dropping the final payload byte leaves the declared payload length
    // larger than what remains; inputs shorter than a schema id fail even earlier.
    #[test]
    fn truncated_bytes_are_rejected() -> Result<(), ProtocolError> {
        let mut encoded = sample_envelope(vec![1, 2, 3]).serialize()?;
        assert!(encoded.pop().is_some());

        assert!(MessageEnvelope::deserialize(&encoded).is_err());
        assert!(MessageEnvelope::deserialize(&[]).is_err());
        assert!(MessageEnvelope::deserialize(&[0_u8; SchemaId::WIRE_LEN - 1]).is_err());
        Ok(())
    }

    // A valid envelope followed by extra data must not decode.
    #[test]
    fn trailing_bytes_are_rejected() -> Result<(), ProtocolError> {
        let mut encoded = sample_envelope(vec![1, 2, 3]).serialize()?;
        encoded.push(0);

        assert!(MessageEnvelope::deserialize(&encoded).is_err());
        Ok(())
    }

    #[test]
    fn causal_context_is_extractable_from_envelope_bytes() -> Result<(), ProtocolError> {
        let envelope = sample_envelope(vec![1, 2, 3, 4]);
        let encoded = envelope.serialize()?;

        assert_eq!(extract_causal_context(&encoded)?, envelope.causal_context);
        Ok(())
    }

    // Shared fixture: fixed schema id and causal context, caller-chosen payload.
    fn sample_envelope(payload: Vec<u8>) -> MessageEnvelope {
        MessageEnvelope::new(
            SchemaId::new([0x11; 32]),
            CausalContext {
                parent_id: Some(MessageId::from("parent-1")),
                vector_clock_entry: Some(99),
            },
            payload,
        )
    }
}