/// Size in bytes of a serialized [`FrameHeader`].
///
/// The value matches what `FrameHeader::write_to` emits: three single-byte
/// fields (version, transport tier, message type) followed by two
/// little-endian `u32` fields (sequence, payload length): 1 + 1 + 1 + 4 + 4.
pub const FRAME_HEADER_SIZE: usize = 11;
20
/// Protocol version number; the tests in this file store it in
/// [`FrameHeader::version`].
///
/// NOTE(review): nothing in this file rejects frames carrying a different
/// version byte — presumably callers compare against this constant; confirm.
pub const PROTOCOL_VERSION: u8 = 1;
23
/// Message kind stored in the third byte of a frame header.
///
/// Bytes with a named meaning map to their own variant; every other byte is
/// kept verbatim in [`MsgType::Other`], so decoding followed by encoding
/// always reproduces the original byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MsgType {
    /// Wire code `0x00`.
    Request,
    /// Wire code `0x01`.
    Response,
    /// Wire code `0x02`.
    Push,
    /// Wire code `0x04` (`0x03` has no named variant and decodes as `Other`).
    Error,
    /// Any code without a named variant, carried unchanged.
    Other(u8),
}

impl MsgType {
    // Wire codes of the named variants, shared by both conversion directions.
    const REQUEST_CODE: u8 = 0x00;
    const RESPONSE_CODE: u8 = 0x01;
    const PUSH_CODE: u8 = 0x02;
    const ERROR_CODE: u8 = 0x04;

    /// Decodes a wire byte into a message type.
    ///
    /// Never fails: bytes without a named variant become `Other(byte)`.
    #[inline]
    pub fn from_u8(v: u8) -> Self {
        match v {
            Self::REQUEST_CODE => Self::Request,
            Self::RESPONSE_CODE => Self::Response,
            Self::PUSH_CODE => Self::Push,
            Self::ERROR_CODE => Self::Error,
            unnamed => Self::Other(unnamed),
        }
    }

    /// Encodes this message type as its wire byte.
    ///
    /// `Other(code)` is emitted as `code` unchanged, even when `code` collides
    /// with a named variant; decoding that byte yields the named variant.
    #[inline]
    pub fn to_u8(self) -> u8 {
        match self {
            Self::Other(code) => code,
            Self::Request => Self::REQUEST_CODE,
            Self::Response => Self::RESPONSE_CODE,
            Self::Push => Self::PUSH_CODE,
            Self::Error => Self::ERROR_CODE,
        }
    }
}
71
/// Fixed-size header that precedes the payload of every frame.
///
/// `write_to` serializes the fields in declaration order into
/// `FRAME_HEADER_SIZE` bytes; multi-byte fields are little-endian.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
    /// Protocol version byte (header byte 0).
    pub version: u8,
    /// Transport tier byte (header byte 1); this file stores and emits it
    /// verbatim without interpreting it.
    pub transport_tier: u8,
    /// Message kind (header byte 2).
    pub msg_type: MsgType,
    /// Sequence number (header bytes 3..7, little-endian).
    pub sequence: u32,
    /// Length of the payload in bytes (header bytes 7..11, little-endian);
    /// `frame_unwrap` uses it to slice the payload out of a frame.
    pub payload_len: u32,
}
90
91impl FrameHeader {
92 #[inline]
94 pub fn write_to(&self, buf: &mut Vec<u8>) {
95 buf.push(self.version);
96 buf.push(self.transport_tier);
97 buf.push(self.msg_type.to_u8());
98 buf.extend_from_slice(&self.sequence.to_le_bytes());
99 buf.extend_from_slice(&self.payload_len.to_le_bytes());
100 }
101
102 #[inline]
106 pub fn read_from(data: &[u8]) -> Option<Self> {
107 if data.len() < FRAME_HEADER_SIZE {
108 return None;
109 }
110 let version = data[0];
111 let transport_tier = data[1];
112 let msg_type = MsgType::from_u8(data[2]);
113 let sequence = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
114 let payload_len = u32::from_le_bytes([data[7], data[8], data[9], data[10]]);
115 Some(Self {
116 version,
117 transport_tier,
118 msg_type,
119 sequence,
120 payload_len,
121 })
122 }
123}
124
125#[inline]
131#[must_use]
132pub fn frame_wrap(header: &FrameHeader, payload: &[u8]) -> Vec<u8> {
133 let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
134 header.write_to(&mut buf);
135 buf.extend_from_slice(payload);
136 buf
137}
138
139#[inline]
144#[must_use]
145pub fn frame_unwrap(data: &[u8]) -> Option<(FrameHeader, &[u8])> {
146 let header = FrameHeader::read_from(data)?;
147 let payload_end = FRAME_HEADER_SIZE + header.payload_len as usize;
148 if data.len() < payload_end {
149 return None;
150 }
151 Some((header, &data[FRAME_HEADER_SIZE..payload_end]))
152}
153
/// Types that can append their binary wire representation to a byte buffer.
pub trait WireEncode {
    /// Appends the encoded form of `self` to the end of `buf`.
    fn wire_encode(&self, buf: &mut Vec<u8>);

    /// Size in bytes of the encoded form of `self`.
    ///
    /// For the implementations in this file this equals the number of bytes
    /// `wire_encode` appends.
    fn wire_size(&self) -> usize;
}
167
/// Types that can be parsed from the front of a byte slice.
pub trait WireDecode: Sized {
    /// Decodes one value from the start of `data`.
    ///
    /// Returns the value together with the number of bytes it occupied, or
    /// `None` when `data` cannot be decoded (the implementations in this file
    /// return `None` for truncated or invalid input).
    fn wire_decode(data: &[u8]) -> Option<(Self, usize)>;
}
176
177macro_rules! impl_wire_int {
182 ($($ty:ty),+) => {
183 $(
184 impl WireEncode for $ty {
185 fn wire_encode(&self, buf: &mut Vec<u8>) {
186 buf.extend_from_slice(&self.to_le_bytes());
187 }
188
189 fn wire_size(&self) -> usize {
190 std::mem::size_of::<$ty>()
191 }
192 }
193
194 impl WireDecode for $ty {
195 fn wire_decode(data: &[u8]) -> Option<(Self, usize)> {
196 const SIZE: usize = std::mem::size_of::<$ty>();
197 if data.len() < SIZE {
198 return None;
199 }
200 let arr: [u8; SIZE] = data[..SIZE].try_into().ok()?;
201 Some((<$ty>::from_le_bytes(arr), SIZE))
202 }
203 }
204 )+
205 };
206}
207
208impl_wire_int!(u8, u16, u32, u64, i8, i16, i32, i64, f32, f64);
209
210impl WireEncode for bool {
212 fn wire_encode(&self, buf: &mut Vec<u8>) {
213 buf.push(u8::from(*self));
214 }
215
216 fn wire_size(&self) -> usize {
217 1
218 }
219}
220
221impl WireDecode for bool {
222 fn wire_decode(data: &[u8]) -> Option<(Self, usize)> {
223 if data.is_empty() {
224 return None;
225 }
226 match data[0] {
227 0 => Some((false, 1)),
228 1 => Some((true, 1)),
229 _ => None,
230 }
231 }
232}
233
234impl WireEncode for Vec<u8> {
236 fn wire_encode(&self, buf: &mut Vec<u8>) {
237 let len: u32 = self.len().try_into().unwrap_or_else(|_| {
238 panic!(
239 "conduit: payload too large ({} bytes exceeds u32::MAX)",
240 self.len()
241 )
242 });
243 buf.extend_from_slice(&len.to_le_bytes());
244 buf.extend_from_slice(self);
245 }
246
247 fn wire_size(&self) -> usize {
248 4 + self.len()
249 }
250}
251
252impl WireDecode for Vec<u8> {
253 fn wire_decode(data: &[u8]) -> Option<(Self, usize)> {
254 if data.len() < 4 {
255 return None;
256 }
257 let len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
258 let total = 4 + len;
259 if data.len() < total {
260 return None;
261 }
262 Some((data[4..total].to_vec(), total))
263 }
264}
265
266impl WireEncode for String {
268 fn wire_encode(&self, buf: &mut Vec<u8>) {
269 let len: u32 = self.len().try_into().unwrap_or_else(|_| {
270 panic!(
271 "conduit: payload too large ({} bytes exceeds u32::MAX)",
272 self.len()
273 )
274 });
275 buf.extend_from_slice(&len.to_le_bytes());
276 buf.extend_from_slice(self.as_bytes());
277 }
278
279 fn wire_size(&self) -> usize {
280 4 + self.len()
281 }
282}
283
284impl WireDecode for String {
285 fn wire_decode(data: &[u8]) -> Option<(Self, usize)> {
286 if data.len() < 4 {
287 return None;
288 }
289 let len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
290 let total = 4 + len;
291 if data.len() < total {
292 return None;
293 }
294 let s = std::str::from_utf8(&data[4..total]).ok()?;
295 Some((s.to_owned(), total))
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `value`, decodes the result, and checks both the decoded value
    /// and the number of bytes the decoder reports as consumed.
    fn assert_roundtrip<T>(value: T, expected_consumed: usize)
    where
        T: WireEncode + WireDecode + PartialEq + std::fmt::Debug,
    {
        let mut encoded = Vec::new();
        value.wire_encode(&mut encoded);
        let (decoded, consumed) = T::wire_decode(&encoded).unwrap();
        assert_eq!(decoded, value);
        assert_eq!(consumed, expected_consumed);
    }

    /// A header survives serialization and occupies exactly the header size.
    #[test]
    fn frame_header_roundtrip() {
        let header = FrameHeader {
            version: PROTOCOL_VERSION,
            transport_tier: 0,
            msg_type: MsgType::Request,
            sequence: 42,
            payload_len: 128,
        };
        let mut bytes = Vec::new();
        header.write_to(&mut bytes);
        assert_eq!(bytes.len(), FRAME_HEADER_SIZE);

        let reparsed = FrameHeader::read_from(&bytes).unwrap();
        assert_eq!(header, reparsed);
    }

    /// Wrapping then unwrapping returns the same header and payload.
    #[test]
    fn frame_wrap_unwrap() {
        let body = b"hello";
        let header = FrameHeader {
            version: PROTOCOL_VERSION,
            transport_tier: 0,
            msg_type: MsgType::Push,
            sequence: 7,
            payload_len: 5,
        };

        let wire = frame_wrap(&header, body);
        assert_eq!(wire.len(), FRAME_HEADER_SIZE + 5);

        let (got_header, got_body) = frame_unwrap(&wire).unwrap();
        assert_eq!(got_header, header);
        assert_eq!(got_body, body);
    }

    /// Input shorter than a header is rejected by both parsers.
    #[test]
    fn frame_too_short() {
        let truncated = [0u8; 5];
        assert!(FrameHeader::read_from(&truncated).is_none());
        assert!(frame_unwrap(&truncated).is_none());
    }

    /// Every primitive kind round-trips and reports its fixed width.
    #[test]
    fn wire_encode_decode_primitives() {
        assert_roundtrip(42u8, 1);
        assert_roundtrip(0xDEAD_BEEFu32, 4);
        assert_roundtrip(-999_999i64, 8);
        assert_roundtrip(std::f64::consts::PI, 8);
        assert_roundtrip(true, 1);
        assert_roundtrip(false, 1);
    }

    /// Byte vectors carry a 4-byte length prefix ahead of their contents.
    #[test]
    fn wire_encode_decode_vec() {
        let bytes: Vec<u8> = vec![0xCA, 0xFE, 0xBA, 0xBE];
        let mut encoded = Vec::new();
        bytes.wire_encode(&mut encoded);
        assert_eq!(encoded.len(), 4 + 4);

        let (decoded, consumed) = Vec::<u8>::wire_decode(&encoded).unwrap();
        assert_eq!(decoded, bytes);
        assert_eq!(consumed, 8);
    }

    /// Strings carry a 4-byte length prefix ahead of their UTF-8 bytes.
    #[test]
    fn wire_encode_decode_string() {
        let text = String::from("conduit transport layer");
        let mut encoded = Vec::new();
        text.wire_encode(&mut encoded);
        assert_eq!(encoded.len(), 4 + text.len());

        let (decoded, consumed) = String::wire_decode(&encoded).unwrap();
        assert_eq!(decoded, text);
        assert_eq!(consumed, 4 + text.len());
    }
}