1use chrono::{DateTime, TimeZone, Utc};
25use uuid::Uuid;
26
27use crate::causal::{CausalContext, MessageId};
28use crate::channel::SchemaId;
29use crate::envelope::{Envelope, PublisherId};
30
31const MAGIC: [u8; 4] = *b"LMW1";
33const UUID_LEN: usize = 16;
34const LEN_PREFIX: usize = 4;
35
36#[derive(Clone, Debug, PartialEq, Eq)]
38pub enum WireError {
39 Truncated,
41 BadMagic,
43 BadUtf8,
45}
46
47impl std::fmt::Display for WireError {
48 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 Self::Truncated => formatter.write_str("cross-node envelope frame was truncated"),
51 Self::BadMagic => formatter.write_str("cross-node envelope frame had an unknown magic"),
52 Self::BadUtf8 => {
53 formatter.write_str("cross-node envelope publisher id was not valid UTF-8")
54 }
55 }
56 }
57}
58
59impl std::error::Error for WireError {}
60
61#[must_use]
63pub fn encode_envelope(envelope: &Envelope) -> Vec<u8> {
64 let publisher = envelope.publisher_id.as_str().as_bytes();
65 let chain = envelope
66 .causal_context
67 .as_ref()
68 .map(CausalContext::parent_chain)
69 .unwrap_or_default();
70 let mut bytes = Vec::with_capacity(
71 MAGIC.len()
72 + UUID_LEN * 2
73 + 8
74 + LEN_PREFIX
75 + publisher.len()
76 + LEN_PREFIX
77 + envelope.payload.len()
78 + LEN_PREFIX
79 + chain.len() * UUID_LEN,
80 );
81 bytes.extend_from_slice(&MAGIC);
82 bytes.extend_from_slice(envelope.message_id.as_uuid().as_bytes());
83 bytes.extend_from_slice(envelope.schema_id.as_uuid().as_bytes());
84 bytes.extend_from_slice(&envelope.timestamp.timestamp_millis().to_be_bytes());
85 write_length_prefixed(&mut bytes, publisher);
86 write_length_prefixed(&mut bytes, &envelope.payload);
87 write_u32(&mut bytes, u32_len(chain.len()));
88 for parent in chain {
89 bytes.extend_from_slice(parent.as_uuid().as_bytes());
90 }
91 bytes
92}
93
94pub fn decode_envelope(bytes: &[u8]) -> Result<Envelope, WireError> {
100 let mut cursor = Cursor::new(bytes);
101 if cursor.take(MAGIC.len())? != MAGIC {
102 return Err(WireError::BadMagic);
103 }
104 let message_id = MessageId::from_uuid(read_uuid(&mut cursor)?);
105 let schema_id = SchemaId::from_uuid(read_uuid(&mut cursor)?);
106 let timestamp_millis = i64::from_be_bytes(read_array(&mut cursor)?);
107 let timestamp = millis_to_datetime(timestamp_millis);
108 let publisher = std::str::from_utf8(cursor.take_length_prefixed()?)
109 .map_err(|_| WireError::BadUtf8)?
110 .to_owned();
111 let payload = cursor.take_length_prefixed()?.to_vec();
112 let chain_len = u32::from_be_bytes(read_array(&mut cursor)?) as usize;
113 let mut parent_chain = Vec::with_capacity(chain_len);
114 for _ in 0..chain_len {
115 parent_chain.push(MessageId::from_uuid(read_uuid(&mut cursor)?));
116 }
117 let causal_context = if parent_chain.is_empty() {
118 None
119 } else {
120 Some(CausalContext::from_parent_chain(parent_chain))
121 };
122 Ok(Envelope::with_message_id_and_timestamp(
123 message_id,
124 payload,
125 causal_context,
126 schema_id,
127 PublisherId::from(publisher),
128 timestamp,
129 ))
130}
131
132fn write_length_prefixed(bytes: &mut Vec<u8>, value: &[u8]) {
133 write_u32(bytes, u32_len(value.len()));
134 bytes.extend_from_slice(value);
135}
136
137fn write_u32(bytes: &mut Vec<u8>, value: u32) {
138 bytes.extend_from_slice(&value.to_be_bytes());
139}
140
141fn u32_len(value: usize) -> u32 {
145 u32::try_from(value).unwrap_or(u32::MAX)
146}
147
148fn millis_to_datetime(millis: i64) -> DateTime<Utc> {
149 Utc.timestamp_millis_opt(millis)
150 .single()
151 .unwrap_or_else(Utc::now)
152}
153
154fn read_uuid(cursor: &mut Cursor<'_>) -> Result<Uuid, WireError> {
155 Ok(Uuid::from_bytes(read_array::<UUID_LEN>(cursor)?))
156}
157
158fn read_array<const N: usize>(cursor: &mut Cursor<'_>) -> Result<[u8; N], WireError> {
159 let slice = cursor.take(N)?;
160 let mut array = [0_u8; N];
161 array.copy_from_slice(slice);
162 Ok(array)
163}
164
165struct Cursor<'a> {
167 bytes: &'a [u8],
168 offset: usize,
169}
170
171impl<'a> Cursor<'a> {
172 const fn new(bytes: &'a [u8]) -> Self {
173 Self { bytes, offset: 0 }
174 }
175
176 fn take(&mut self, count: usize) -> Result<&'a [u8], WireError> {
177 let end = self.offset.checked_add(count).ok_or(WireError::Truncated)?;
178 let slice = self
179 .bytes
180 .get(self.offset..end)
181 .ok_or(WireError::Truncated)?;
182 self.offset = end;
183 Ok(slice)
184 }
185
186 fn take_length_prefixed(&mut self) -> Result<&'a [u8], WireError> {
187 let mut length = [0_u8; LEN_PREFIX];
188 length.copy_from_slice(self.take(LEN_PREFIX)?);
189 self.take(u32::from_be_bytes(length) as usize)
190 }
191}
192
193#[cfg(test)]
194#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
195mod tests {
196 use super::{MAGIC, WireError, decode_envelope, encode_envelope};
197 use crate::causal::{CausalContext, MessageId};
198 use crate::channel::SchemaId;
199 use crate::envelope::{Envelope, PublisherId};
200 use chrono::{TimeZone, Utc};
201
202 fn sample_envelope() -> Envelope {
203 let parent = MessageId::new();
204 let grandparent = MessageId::new();
205 Envelope::with_message_id_and_timestamp(
206 MessageId::new(),
207 b"{\"value\":42}".to_vec(),
208 Some(CausalContext::from_parent_chain(vec![parent, grandparent])),
209 SchemaId::new(),
210 PublisherId::from("publisher-7"),
211 Utc.timestamp_millis_opt(1_700_000_000_123)
212 .single()
213 .expect("valid millis"),
214 )
215 }
216
217 #[test]
218 fn round_trips_a_full_envelope() {
219 let original = sample_envelope();
220 let bytes = encode_envelope(&original);
221 let decoded = decode_envelope(&bytes).expect("decode should succeed");
222 assert_eq!(decoded, original);
223 }
224
225 #[test]
226 fn round_trips_an_envelope_without_causal_context() {
227 let original = Envelope::with_message_id_and_timestamp(
228 MessageId::new(),
229 Vec::new(),
230 None,
231 SchemaId::new(),
232 PublisherId::default(),
233 Utc.timestamp_millis_opt(0).single().expect("epoch millis"),
234 );
235 let bytes = encode_envelope(&original);
236 let decoded = decode_envelope(&bytes).expect("decode should succeed");
237 assert_eq!(decoded, original);
238 assert!(decoded.causal_context.is_none());
239 }
240
241 #[test]
242 fn rejects_an_unknown_magic() {
243 let mut bytes = encode_envelope(&sample_envelope());
244 bytes[0] = b'X';
245 assert_eq!(decode_envelope(&bytes), Err(WireError::BadMagic));
246 }
247
248 #[test]
249 fn rejects_a_truncated_frame() {
250 let bytes = encode_envelope(&sample_envelope());
251 let truncated = &bytes[..bytes.len() - 4];
252 assert_eq!(decode_envelope(truncated), Err(WireError::Truncated));
253 }
254
255 #[test]
256 fn rejects_an_empty_frame() {
257 assert_eq!(decode_envelope(&[]), Err(WireError::Truncated));
258 }
259
260 #[test]
261 fn magic_is_stable() {
262 assert_eq!(&MAGIC, b"LMW1");
263 }
264}