Skip to main content

liminal/channel/
wire.rs

1//! SRV-005: a deterministic byte codec for [`Envelope`] used to carry a
2//! published message across a beamr distribution link to a remote subscriber.
3//!
4//! Cross-node fan-out (the cluster `sync` module in `liminal-server`) sends a
5//! published envelope to each remote pg member as a single beamr binary term.
6//! The binary payload is encoded here and decoded back inside the remote
//! subscriber process before it lands in that subscriber's inbox. The format is
//! self-describing and length-prefixed, so a frame that ends early, carries the
//! wrong magic, or has a non-UTF-8 publisher id is rejected rather than
//! mis-parsed. There is no reliance on `serde` or on the beamr ETF term shape,
//! keeping the wire contract owned entirely by liminal.
11//!
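//! Layout (all integers big-endian):
//! ```text
//! magic:             4 bytes  = b"LMW1"
//! message_id:       16 bytes (UUID)
//! schema_id:        16 bytes (UUID)
//! timestamp_millis:  8 bytes (i64, milliseconds since the Unix epoch)
//! publisher_id:      4-byte length + UTF-8 bytes
//! payload:           4-byte length + bytes
//! parent_chain_len:  4 bytes (count of MessageId entries)
//! parent_chain:      count * 16 bytes (UUID each)
//! ```
//!
//! The timestamp travels at millisecond precision, so any sub-millisecond part
//! of the original timestamp is dropped. An envelope without a causal context is
//! written with a zero-length parent chain, and a zero-length chain decodes back
//! to an envelope with no causal context.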
23
24use chrono::{DateTime, TimeZone, Utc};
25use uuid::Uuid;
26
27use crate::causal::{CausalContext, MessageId};
28use crate::channel::SchemaId;
29use crate::envelope::{Envelope, PublisherId};
30
/// Leading bytes of every liminal cross-node envelope frame; the trailing `1`
/// is the format version.
const MAGIC: [u8; 4] = [b'L', b'M', b'W', b'1'];
/// Width in bytes of one serialized UUID (message, schema, or parent id).
const UUID_LEN: usize = 16;
/// Width in bytes of the big-endian `u32` length / count prefix.
const LEN_PREFIX: usize = std::mem::size_of::<u32>();
35
/// Why a received byte frame could not be turned back into an [`Envelope`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WireError {
    /// Some field of the layout ran past the end of the frame.
    Truncated,
    /// The first four bytes were not the liminal envelope magic.
    BadMagic,
    /// The publisher id bytes were not valid UTF-8.
    BadUtf8,
}

impl std::fmt::Display for WireError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Truncated => "cross-node envelope frame was truncated",
            Self::BadMagic => "cross-node envelope frame had an unknown magic",
            Self::BadUtf8 => "cross-node envelope publisher id was not valid UTF-8",
        };
        f.write_str(text)
    }
}

impl std::error::Error for WireError {}
60
61/// Encodes `envelope` into a self-describing, length-prefixed byte frame.
62#[must_use]
63pub fn encode_envelope(envelope: &Envelope) -> Vec<u8> {
64    let publisher = envelope.publisher_id.as_str().as_bytes();
65    let chain = envelope
66        .causal_context
67        .as_ref()
68        .map(CausalContext::parent_chain)
69        .unwrap_or_default();
70    let mut bytes = Vec::with_capacity(
71        MAGIC.len()
72            + UUID_LEN * 2
73            + 8
74            + LEN_PREFIX
75            + publisher.len()
76            + LEN_PREFIX
77            + envelope.payload.len()
78            + LEN_PREFIX
79            + chain.len() * UUID_LEN,
80    );
81    bytes.extend_from_slice(&MAGIC);
82    bytes.extend_from_slice(envelope.message_id.as_uuid().as_bytes());
83    bytes.extend_from_slice(envelope.schema_id.as_uuid().as_bytes());
84    bytes.extend_from_slice(&envelope.timestamp.timestamp_millis().to_be_bytes());
85    write_length_prefixed(&mut bytes, publisher);
86    write_length_prefixed(&mut bytes, &envelope.payload);
87    write_u32(&mut bytes, u32_len(chain.len()));
88    for parent in chain {
89        bytes.extend_from_slice(parent.as_uuid().as_bytes());
90    }
91    bytes
92}
93
94/// Decodes an [`Envelope`] from a byte frame produced by [`encode_envelope`].
95///
96/// # Errors
97/// Returns [`WireError`] when the frame is truncated, has an unknown magic, or
98/// carries a non-UTF-8 publisher id.
99pub fn decode_envelope(bytes: &[u8]) -> Result<Envelope, WireError> {
100    let mut cursor = Cursor::new(bytes);
101    if cursor.take(MAGIC.len())? != MAGIC {
102        return Err(WireError::BadMagic);
103    }
104    let message_id = MessageId::from_uuid(read_uuid(&mut cursor)?);
105    let schema_id = SchemaId::from_uuid(read_uuid(&mut cursor)?);
106    let timestamp_millis = i64::from_be_bytes(read_array(&mut cursor)?);
107    let timestamp = millis_to_datetime(timestamp_millis);
108    let publisher = std::str::from_utf8(cursor.take_length_prefixed()?)
109        .map_err(|_| WireError::BadUtf8)?
110        .to_owned();
111    let payload = cursor.take_length_prefixed()?.to_vec();
112    let chain_len = u32::from_be_bytes(read_array(&mut cursor)?) as usize;
113    let mut parent_chain = Vec::with_capacity(chain_len);
114    for _ in 0..chain_len {
115        parent_chain.push(MessageId::from_uuid(read_uuid(&mut cursor)?));
116    }
117    let causal_context = if parent_chain.is_empty() {
118        None
119    } else {
120        Some(CausalContext::from_parent_chain(parent_chain))
121    };
122    Ok(Envelope::with_message_id_and_timestamp(
123        message_id,
124        payload,
125        causal_context,
126        schema_id,
127        PublisherId::from(publisher),
128        timestamp,
129    ))
130}
131
132fn write_length_prefixed(bytes: &mut Vec<u8>, value: &[u8]) {
133    write_u32(bytes, u32_len(value.len()));
134    bytes.extend_from_slice(value);
135}
136
/// Appends `value` to `bytes` in big-endian byte order.
fn write_u32(bytes: &mut Vec<u8>, value: u32) {
    bytes.extend(value.to_be_bytes());
}
140
/// Converts a field length to the `u32` written in its prefix, saturating at
/// `u32::MAX` instead of wrapping.
///
/// Saturation only prevents a wrapped (too small, and so silently plausible)
/// prefix. It does NOT make an oversized field decodable. The encoder still
/// writes every byte of a field longer than `u32::MAX`, so the prefix
/// under-reports it and the decoder falls out of step with the frame. Depending
/// on the bytes that follow, decoding then either reports
/// [`WireError::Truncated`] or yields wrong values (e.g. a parent chain cut
/// short). Every field must therefore stay below `u32::MAX` bytes / entries.
fn u32_len(value: usize) -> u32 {
    u32::try_from(value).unwrap_or(u32::MAX)
}
147
148fn millis_to_datetime(millis: i64) -> DateTime<Utc> {
149    Utc.timestamp_millis_opt(millis)
150        .single()
151        .unwrap_or_else(Utc::now)
152}
153
154fn read_uuid(cursor: &mut Cursor<'_>) -> Result<Uuid, WireError> {
155    Ok(Uuid::from_bytes(read_array::<UUID_LEN>(cursor)?))
156}
157
158fn read_array<const N: usize>(cursor: &mut Cursor<'_>) -> Result<[u8; N], WireError> {
159    let slice = cursor.take(N)?;
160    let mut array = [0_u8; N];
161    array.copy_from_slice(slice);
162    Ok(array)
163}
164
/// Reads a borrowed frame front to back; consumed bytes are never revisited.
struct Cursor<'a> {
    /// Number of leading bytes of `bytes` consumed so far.
    offset: usize,
    /// The whole frame being decoded.
    bytes: &'a [u8],
}
170
171impl<'a> Cursor<'a> {
172    const fn new(bytes: &'a [u8]) -> Self {
173        Self { bytes, offset: 0 }
174    }
175
176    fn take(&mut self, count: usize) -> Result<&'a [u8], WireError> {
177        let end = self.offset.checked_add(count).ok_or(WireError::Truncated)?;
178        let slice = self
179            .bytes
180            .get(self.offset..end)
181            .ok_or(WireError::Truncated)?;
182        self.offset = end;
183        Ok(slice)
184    }
185
186    fn take_length_prefixed(&mut self) -> Result<&'a [u8], WireError> {
187        let mut length = [0_u8; LEN_PREFIX];
188        length.copy_from_slice(self.take(LEN_PREFIX)?);
189        self.take(u32::from_be_bytes(length) as usize)
190    }
191}
192
#[cfg(test)]
// Tests may panic freely: a failed expectation is exactly what a test reports.
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::{MAGIC, WireError, decode_envelope, encode_envelope};
    use crate::causal::{CausalContext, MessageId};
    use crate::channel::SchemaId;
    use crate::envelope::{Envelope, PublisherId};
    use chrono::{TimeZone, Utc};

    fn sample_envelope() -> Envelope {
        let parent = MessageId::new();
        let grandparent = MessageId::new();
        Envelope::with_message_id_and_timestamp(
            MessageId::new(),
            b"{\"value\":42}".to_vec(),
            Some(CausalContext::from_parent_chain(vec![parent, grandparent])),
            SchemaId::new(),
            PublisherId::from("publisher-7"),
            Utc.timestamp_millis_opt(1_700_000_000_123)
                .single()
                .expect("valid millis"),
        )
    }

    #[test]
    fn round_trips_a_full_envelope() {
        let original = sample_envelope();
        let bytes = encode_envelope(&original);
        let decoded = decode_envelope(&bytes).expect("decode should succeed");
        assert_eq!(decoded, original);
    }

    #[test]
    fn round_trips_an_envelope_without_causal_context() {
        let original = Envelope::with_message_id_and_timestamp(
            MessageId::new(),
            Vec::new(),
            None,
            SchemaId::new(),
            PublisherId::default(),
            Utc.timestamp_millis_opt(0).single().expect("epoch millis"),
        );
        let bytes = encode_envelope(&original);
        let decoded = decode_envelope(&bytes).expect("decode should succeed");
        assert_eq!(decoded, original);
        assert!(decoded.causal_context.is_none());
    }

    #[test]
    fn rejects_an_unknown_magic() {
        let mut bytes = encode_envelope(&sample_envelope());
        bytes[0] = b'X';
        assert_eq!(decode_envelope(&bytes), Err(WireError::BadMagic));
    }

    #[test]
    fn rejects_a_truncated_frame() {
        let bytes = encode_envelope(&sample_envelope());
        let truncated = &bytes[..bytes.len() - 4];
        assert_eq!(decode_envelope(truncated), Err(WireError::Truncated));
    }

    #[test]
    fn rejects_every_strict_prefix_of_a_frame() {
        let bytes = encode_envelope(&sample_envelope());
        for end in 0..bytes.len() {
            assert_eq!(
                decode_envelope(&bytes[..end]),
                Err(WireError::Truncated),
                "prefix of {end} bytes should be rejected as truncated"
            );
        }
    }

    #[test]
    fn rejects_a_non_utf8_publisher() {
        let mut bytes = encode_envelope(&sample_envelope());
        // magic + message id + schema id + timestamp + publisher length prefix
        let publisher_start = MAGIC.len() + 16 + 16 + 8 + 4;
        bytes[publisher_start] = 0xFF;
        assert_eq!(decode_envelope(&bytes), Err(WireError::BadUtf8));
    }

    #[test]
    fn rejects_an_empty_frame() {
        assert_eq!(decode_envelope(&[]), Err(WireError::Truncated));
    }

    #[test]
    fn magic_is_stable() {
        assert_eq!(&MAGIC, b"LMW1");
    }
}