1pub const FRAME_HEADER_SIZE: usize = 11;
20
21pub const PROTOCOL_VERSION: u8 = 1;
23
24pub const DRAIN_FRAME_OVERHEAD: usize = 4;
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum MsgType {
37 Request,
39 Response,
41 Push,
43 Error,
45 Other(u8),
52}
53
54impl MsgType {
55 #[inline]
57 pub fn from_u8(v: u8) -> Self {
58 match v {
59 0x00 => Self::Request,
60 0x01 => Self::Response,
61 0x02 => Self::Push,
62 0x04 => Self::Error,
63 other => Self::Other(other),
64 }
65 }
66
67 #[inline]
69 pub fn to_u8(self) -> u8 {
70 match self {
71 Self::Request => 0x00,
72 Self::Response => 0x01,
73 Self::Push => 0x02,
74 Self::Error => 0x04,
75 Self::Other(v) => v,
76 }
77 }
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub struct FrameHeader {
87 pub version: u8,
89 pub reserved: u8,
91 pub msg_type: MsgType,
93 pub sequence: u32,
95 pub payload_len: u32,
97}
98
99impl FrameHeader {
100 #[inline]
102 pub fn write_to(&self, buf: &mut Vec<u8>) {
103 let seq = self.sequence.to_le_bytes();
104 let plen = self.payload_len.to_le_bytes();
105 let header: [u8; FRAME_HEADER_SIZE] = [
106 self.version,
107 self.reserved,
108 self.msg_type.to_u8(),
109 seq[0],
110 seq[1],
111 seq[2],
112 seq[3],
113 plen[0],
114 plen[1],
115 plen[2],
116 plen[3],
117 ];
118 buf.extend_from_slice(&header);
119 }
120
121 #[inline]
125 pub fn read_from(data: &[u8]) -> Option<Self> {
126 if data.len() < FRAME_HEADER_SIZE {
127 return None;
128 }
129 let version = data[0];
130 if version != PROTOCOL_VERSION {
131 return None;
132 }
133 let reserved = data[1];
134 let msg_type = MsgType::from_u8(data[2]);
135 let sequence = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
136 let payload_len = u32::from_le_bytes([data[7], data[8], data[9], data[10]]);
137 Some(Self {
138 version,
139 reserved,
140 msg_type,
141 sequence,
142 payload_len,
143 })
144 }
145}
146
147#[inline]
153#[must_use]
154pub fn frame_pack(header: &FrameHeader, payload: &[u8]) -> Vec<u8> {
155 assert_eq!(
156 header.payload_len as usize,
157 payload.len(),
158 "frame_pack: header.payload_len ({}) != payload.len() ({})",
159 header.payload_len,
160 payload.len(),
161 );
162 let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
163 header.write_to(&mut buf);
164 buf.extend_from_slice(payload);
165 buf
166}
167
168#[inline]
173#[must_use]
174pub fn frame_unpack(data: &[u8]) -> Option<(FrameHeader, &[u8])> {
175 let header = FrameHeader::read_from(data)?;
176 let payload_end = FRAME_HEADER_SIZE.checked_add(header.payload_len as usize)?;
177 if data.len() < payload_end {
178 return None;
179 }
180 Some((header, &data[FRAME_HEADER_SIZE..payload_end]))
181}
182
183pub trait Encode {
189 fn encode(&self, buf: &mut Vec<u8>);
191
192 fn encode_size(&self) -> usize;
195}
196
197pub trait Decode: Sized {
202 fn decode(data: &[u8]) -> Option<(Self, usize)>;
204}
205
206macro_rules! impl_wire_int {
211 ($($ty:ty),+) => {
212 $(
213 impl Encode for $ty {
214 fn encode(&self, buf: &mut Vec<u8>) {
215 buf.extend_from_slice(&self.to_le_bytes());
216 }
217
218 fn encode_size(&self) -> usize {
219 std::mem::size_of::<$ty>()
220 }
221 }
222
223 impl Decode for $ty {
224 fn decode(data: &[u8]) -> Option<(Self, usize)> {
225 const SIZE: usize = std::mem::size_of::<$ty>();
226 if data.len() < SIZE {
227 return None;
228 }
229 let arr: [u8; SIZE] = data[..SIZE].try_into().ok()?;
230 Some((<$ty>::from_le_bytes(arr), SIZE))
231 }
232 }
233 )+
234 };
235}
236
237impl_wire_int!(u8, u16, u32, u64, i8, i16, i32, i64, f32, f64);
238
239impl Encode for bool {
241 fn encode(&self, buf: &mut Vec<u8>) {
242 buf.push(u8::from(*self));
243 }
244
245 fn encode_size(&self) -> usize {
246 1
247 }
248}
249
250impl Decode for bool {
251 fn decode(data: &[u8]) -> Option<(Self, usize)> {
252 if data.is_empty() {
253 return None;
254 }
255 match data[0] {
256 0 => Some((false, 1)),
257 1 => Some((true, 1)),
258 _ => None,
259 }
260 }
261}
262
263impl<T: Encode> Encode for Vec<T> {
267 fn encode(&self, buf: &mut Vec<u8>) {
268 let count: u32 = self.len().try_into().unwrap_or_else(|_| {
269 panic!(
270 "conduit: vec too large ({} elements exceeds u32::MAX)",
271 self.len()
272 )
273 });
274 buf.extend_from_slice(&count.to_le_bytes());
275 for item in self {
276 item.encode(buf);
277 }
278 }
279
280 fn encode_size(&self) -> usize {
281 4 + self.iter().map(|item| item.encode_size()).sum::<usize>()
282 }
283}
284
285impl<T: Decode> Decode for Vec<T> {
286 fn decode(data: &[u8]) -> Option<(Self, usize)> {
287 if data.len() < 4 {
288 return None;
289 }
290 let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
291 let mut off = 4;
292 let mut items = Vec::with_capacity(count);
293 for _ in 0..count {
294 let (item, consumed) = T::decode(&data[off..])?;
295 off += consumed;
296 items.push(item);
297 }
298 Some((items, off))
299 }
300}
301
302impl Encode for String {
304 fn encode(&self, buf: &mut Vec<u8>) {
305 let len: u32 = self.len().try_into().unwrap_or_else(|_| {
306 panic!(
307 "conduit: payload too large ({} bytes exceeds u32::MAX)",
308 self.len()
309 )
310 });
311 buf.extend_from_slice(&len.to_le_bytes());
312 buf.extend_from_slice(self.as_bytes());
313 }
314
315 fn encode_size(&self) -> usize {
316 4 + self.len()
317 }
318}
319
320impl Decode for String {
321 fn decode(data: &[u8]) -> Option<(Self, usize)> {
322 if data.len() < 4 {
323 return None;
324 }
325 let len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
326 if len > data.len() - 4 {
328 return None;
329 }
330 let total = 4 + len;
331 let s = std::str::from_utf8(&data[4..total]).ok()?;
332 Some((s.to_owned(), total))
333 }
334}
335
336#[cfg(test)]
341mod tests {
342 use super::*;
343
344 #[test]
345 fn frame_header_roundtrip() {
346 let original = FrameHeader {
347 version: PROTOCOL_VERSION,
348 reserved: 0,
349 msg_type: MsgType::Request,
350 sequence: 42,
351 payload_len: 128,
352 };
353 let mut buf = Vec::new();
354 original.write_to(&mut buf);
355 assert_eq!(buf.len(), FRAME_HEADER_SIZE);
356 let parsed = FrameHeader::read_from(&buf).unwrap();
357 assert_eq!(original, parsed);
358 }
359
360 #[test]
361 fn frame_pack_unwrap() {
362 let header = FrameHeader {
363 version: PROTOCOL_VERSION,
364 reserved: 0,
365 msg_type: MsgType::Push,
366 sequence: 7,
367 payload_len: 5,
368 };
369 let payload = b"hello";
370 let frame = frame_pack(&header, payload);
371 assert_eq!(frame.len(), FRAME_HEADER_SIZE + 5);
372
373 let (parsed_header, parsed_payload) = frame_unpack(&frame).unwrap();
374 assert_eq!(parsed_header, header);
375 assert_eq!(parsed_payload, payload);
376 }
377
378 #[test]
379 fn frame_too_short() {
380 let short = [0u8; 5];
381 assert!(FrameHeader::read_from(&short).is_none());
382 assert!(frame_unpack(&short).is_none());
383 }
384
385 #[test]
386 fn encode_decode_primitives() {
387 let mut buf = Vec::new();
389 42u8.encode(&mut buf);
390 let (val, consumed) = u8::decode(&buf).unwrap();
391 assert_eq!(val, 42u8);
392 assert_eq!(consumed, 1);
393
394 buf.clear();
396 0xDEAD_BEEFu32.encode(&mut buf);
397 let (val, consumed) = u32::decode(&buf).unwrap();
398 assert_eq!(val, 0xDEAD_BEEFu32);
399 assert_eq!(consumed, 4);
400
401 buf.clear();
403 (-999_999i64).encode(&mut buf);
404 let (val, consumed) = i64::decode(&buf).unwrap();
405 assert_eq!(val, -999_999i64);
406 assert_eq!(consumed, 8);
407
408 buf.clear();
410 std::f64::consts::PI.encode(&mut buf);
411 let (val, consumed) = f64::decode(&buf).unwrap();
412 assert_eq!(val, std::f64::consts::PI);
413 assert_eq!(consumed, 8);
414
415 buf.clear();
417 true.encode(&mut buf);
418 let (val, consumed) = bool::decode(&buf).unwrap();
419 assert!(val);
420 assert_eq!(consumed, 1);
421
422 buf.clear();
423 false.encode(&mut buf);
424 let (val, consumed) = bool::decode(&buf).unwrap();
425 assert!(!val);
426 assert_eq!(consumed, 1);
427 }
428
429 #[test]
430 fn encode_decode_vec() {
431 let original: Vec<u8> = vec![0xCA, 0xFE, 0xBA, 0xBE];
432 let mut buf = Vec::new();
433 original.encode(&mut buf);
434 assert_eq!(buf.len(), 4 + 4); let (decoded, consumed) = Vec::<u8>::decode(&buf).unwrap();
436 assert_eq!(decoded, original);
437 assert_eq!(consumed, 8);
438 }
439
440 #[test]
441 fn encode_decode_string() {
442 let original = String::from("conduit transport layer");
443 let mut buf = Vec::new();
444 original.encode(&mut buf);
445 assert_eq!(buf.len(), 4 + original.len());
446 let (decoded, consumed) = String::decode(&buf).unwrap();
447 assert_eq!(decoded, original);
448 assert_eq!(consumed, 4 + original.len());
449 }
450}