/// Size in bytes of an encoded [`FrameHeader`]: version (1) + reserved (1) +
/// message type (1) + sequence (4) + payload length (4).
pub const FRAME_HEADER_SIZE: usize = 11;

/// Protocol version byte accepted by [`FrameHeader::read_from`]; a header
/// whose first byte differs is rejected.
pub const PROTOCOL_VERSION: u8 = 1;

/// NOTE(review): not referenced by any code visible in this part of the file.
/// Presumably the per-frame overhead in bytes (e.g. a 4-byte length prefix)
/// of a drain/batch format defined elsewhere — confirm against its users.
pub const DRAIN_FRAME_OVERHEAD: usize = 4;
26
/// Kind of message carried by a frame, stored as a single byte on the wire.
///
/// Known wire values are `0x00` (request), `0x01` (response), `0x02` (push)
/// and `0x04` (error). Any other byte is kept verbatim in [`MsgType::Other`],
/// so every `u8` survives a `from_u8` / `to_u8` round trip.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MsgType {
    /// Wire value `0x00`.
    Request,
    /// Wire value `0x01`.
    Response,
    /// Wire value `0x02`.
    Push,
    /// Wire value `0x04`.
    Error,
    /// Any byte without a dedicated variant; the raw value is preserved.
    Other(u8),
}

impl MsgType {
    const WIRE_REQUEST: u8 = 0x00;
    const WIRE_RESPONSE: u8 = 0x01;
    const WIRE_PUSH: u8 = 0x02;
    const WIRE_ERROR: u8 = 0x04;

    /// Maps a wire byte to its message type.
    ///
    /// Never fails: bytes without a dedicated variant become
    /// [`MsgType::Other`].
    #[inline]
    pub fn from_u8(v: u8) -> Self {
        match v {
            Self::WIRE_REQUEST => Self::Request,
            Self::WIRE_RESPONSE => Self::Response,
            Self::WIRE_PUSH => Self::Push,
            Self::WIRE_ERROR => Self::Error,
            unknown => Self::Other(unknown),
        }
    }

    /// Returns the wire byte for this message type.
    ///
    /// [`MsgType::Other`] yields the byte it wraps, unchanged.
    #[inline]
    pub fn to_u8(self) -> u8 {
        match self {
            Self::Other(raw) => raw,
            Self::Request => Self::WIRE_REQUEST,
            Self::Response => Self::WIRE_RESPONSE,
            Self::Push => Self::WIRE_PUSH,
            Self::Error => Self::WIRE_ERROR,
        }
    }
}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub struct FrameHeader {
87 pub version: u8,
89 pub reserved: u8,
91 pub msg_type: MsgType,
93 pub sequence: u32,
95 pub payload_len: u32,
97}
98
99impl FrameHeader {
100 #[inline]
102 pub fn write_to(&self, buf: &mut Vec<u8>) {
103 let seq = self.sequence.to_le_bytes();
104 let plen = self.payload_len.to_le_bytes();
105 let header: [u8; FRAME_HEADER_SIZE] = [
106 self.version,
107 self.reserved,
108 self.msg_type.to_u8(),
109 seq[0],
110 seq[1],
111 seq[2],
112 seq[3],
113 plen[0],
114 plen[1],
115 plen[2],
116 plen[3],
117 ];
118 buf.extend_from_slice(&header);
119 }
120
121 #[inline]
125 pub fn read_from(data: &[u8]) -> Option<Self> {
126 if data.len() < FRAME_HEADER_SIZE {
127 return None;
128 }
129 let version = data[0];
130 if version != PROTOCOL_VERSION {
131 return None;
132 }
133 let reserved = data[1];
134 let msg_type = MsgType::from_u8(data[2]);
135 let sequence = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
136 let payload_len = u32::from_le_bytes([data[7], data[8], data[9], data[10]]);
137 Some(Self {
138 version,
139 reserved,
140 msg_type,
141 sequence,
142 payload_len,
143 })
144 }
145}
146
147#[inline]
153#[must_use]
154pub fn frame_pack(header: &FrameHeader, payload: &[u8]) -> Vec<u8> {
155 assert_eq!(
156 header.payload_len as usize,
157 payload.len(),
158 "frame_pack: header.payload_len ({}) != payload.len() ({})",
159 header.payload_len,
160 payload.len(),
161 );
162 let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
163 header.write_to(&mut buf);
164 buf.extend_from_slice(payload);
165 buf
166}
167
168#[inline]
173#[must_use]
174pub fn frame_unpack(data: &[u8]) -> Option<(FrameHeader, &[u8])> {
175 let header = FrameHeader::read_from(data)?;
176 let payload_end = FRAME_HEADER_SIZE.checked_add(header.payload_len as usize)?;
177 if data.len() < payload_end {
178 return None;
179 }
180 Some((header, &data[FRAME_HEADER_SIZE..payload_end]))
181}
182
/// Serializes a value into its wire representation.
pub trait Encode {
    /// Appends the encoded form of `self` to the end of `buf`.
    fn encode(&self, buf: &mut Vec<u8>);

    /// Returns the number of bytes `encode` appends for this value.
    ///
    /// The implementations in this file return exactly that count; callers
    /// can presumably use it to pre-size `buf` — verify for external impls.
    fn encode_size(&self) -> usize;
}
196
/// Deserializes a value from the front of a byte slice.
pub trait Decode: Sized {
    /// Lower bound on the encoded size of a value of this type, in bytes.
    ///
    /// Defaults to 0. The implementations in this file set it to the fixed
    /// width for primitives and to 4 (the length prefix) for
    /// length-prefixed types.
    const MIN_SIZE: usize = 0;

    /// Decodes one value from the start of `data`.
    ///
    /// On success returns the value together with the number of bytes
    /// consumed; returns `None` when `data` is too short or malformed.
    fn decode(data: &[u8]) -> Option<(Self, usize)>;
}
212
213macro_rules! impl_wire_int {
218 ($($ty:ty),+) => {
219 $(
220 impl Encode for $ty {
221 fn encode(&self, buf: &mut Vec<u8>) {
222 buf.extend_from_slice(&self.to_le_bytes());
223 }
224
225 fn encode_size(&self) -> usize {
226 std::mem::size_of::<$ty>()
227 }
228 }
229
230 impl Decode for $ty {
231 const MIN_SIZE: usize = std::mem::size_of::<$ty>();
232
233 fn decode(data: &[u8]) -> Option<(Self, usize)> {
234 const SIZE: usize = std::mem::size_of::<$ty>();
235 if data.len() < SIZE {
236 return None;
237 }
238 let arr: [u8; SIZE] = data[..SIZE].try_into().ok()?;
239 Some((<$ty>::from_le_bytes(arr), SIZE))
240 }
241 }
242 )+
243 };
244}
245
246impl_wire_int!(u8, u16, u32, u64, i8, i16, i32, i64, f32, f64);
247
248impl Encode for bool {
250 fn encode(&self, buf: &mut Vec<u8>) {
251 buf.push(u8::from(*self));
252 }
253
254 fn encode_size(&self) -> usize {
255 1
256 }
257}
258
259impl Decode for bool {
260 const MIN_SIZE: usize = 1;
261
262 fn decode(data: &[u8]) -> Option<(Self, usize)> {
263 if data.is_empty() {
264 return None;
265 }
266 match data[0] {
267 0 => Some((false, 1)),
268 1 => Some((true, 1)),
269 _ => None,
270 }
271 }
272}
273
274impl<T: Encode> Encode for Vec<T> {
278 fn encode(&self, buf: &mut Vec<u8>) {
279 let count: u32 = self.len().try_into().unwrap_or_else(|_| {
280 panic!(
281 "conduit: vec too large ({} elements exceeds u32::MAX)",
282 self.len()
283 )
284 });
285 buf.extend_from_slice(&count.to_le_bytes());
286 for item in self {
287 item.encode(buf);
288 }
289 }
290
291 fn encode_size(&self) -> usize {
292 4 + self.iter().map(|item| item.encode_size()).sum::<usize>()
293 }
294}
295
296impl<T: Decode> Decode for Vec<T> {
297 const MIN_SIZE: usize = 4;
298
299 fn decode(data: &[u8]) -> Option<(Self, usize)> {
300 if data.len() < 4 {
301 return None;
302 }
303 let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
304 let mut off = 4;
305 let mut items = Vec::with_capacity(count);
306 for _ in 0..count {
307 let (item, consumed) = T::decode(&data[off..])?;
308 off += consumed;
309 items.push(item);
310 }
311 Some((items, off))
312 }
313}
314
315impl Encode for String {
317 fn encode(&self, buf: &mut Vec<u8>) {
318 let len: u32 = self.len().try_into().unwrap_or_else(|_| {
319 panic!(
320 "conduit: payload too large ({} bytes exceeds u32::MAX)",
321 self.len()
322 )
323 });
324 buf.extend_from_slice(&len.to_le_bytes());
325 buf.extend_from_slice(self.as_bytes());
326 }
327
328 fn encode_size(&self) -> usize {
329 4 + self.len()
330 }
331}
332
333impl Decode for String {
334 const MIN_SIZE: usize = 4;
335
336 fn decode(data: &[u8]) -> Option<(Self, usize)> {
337 if data.len() < 4 {
338 return None;
339 }
340 let len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
341 if len > data.len() - 4 {
343 return None;
344 }
345 let total = 4 + len;
346 let s = std::str::from_utf8(&data[4..total]).ok()?;
347 Some((s.to_owned(), total))
348 }
349}
350
/// Owned byte buffer: a newtype around `Vec<u8>`.
///
/// Converts to and from `Vec<u8>` without copying and dereferences to
/// `[u8]` for read access.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct Bytes(pub Vec<u8>);

impl From<Vec<u8>> for Bytes {
    /// Wraps the vector; the allocation is moved, not copied.
    fn from(v: Vec<u8>) -> Self {
        Bytes(v)
    }
}

impl From<Bytes> for Vec<u8> {
    /// Unwraps the inner vector; the allocation is moved, not copied.
    fn from(b: Bytes) -> Self {
        let Bytes(inner) = b;
        inner
    }
}

impl std::ops::Deref for Bytes {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

impl AsRef<[u8]> for Bytes {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
389
390impl Encode for Bytes {
391 fn encode(&self, buf: &mut Vec<u8>) {
392 let count: u32 = self.0.len().try_into().unwrap_or_else(|_| {
393 panic!(
394 "conduit: bytes too large ({} bytes exceeds u32::MAX)",
395 self.0.len()
396 )
397 });
398 buf.extend_from_slice(&count.to_le_bytes());
399 buf.extend_from_slice(&self.0);
400 }
401
402 fn encode_size(&self) -> usize {
403 4 + self.0.len()
404 }
405}
406
407impl Decode for Bytes {
408 const MIN_SIZE: usize = 4;
409
410 fn decode(data: &[u8]) -> Option<(Self, usize)> {
411 if data.len() < 4 {
412 return None;
413 }
414 let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
415 let total = 4usize.checked_add(count)?;
416 if data.len() < total {
417 return None;
418 }
419 Some((Bytes(data[4..total].to_vec()), total))
420 }
421}
422
/// Unit tests for frame packing and the `Encode` / `Decode` implementations.
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `value` into a fresh buffer and decodes it straight back,
    /// returning the decoded value and the number of bytes consumed.
    fn roundtrip<T: Encode + Decode>(value: &T) -> (T, usize) {
        let mut wire = Vec::new();
        value.encode(&mut wire);
        T::decode(&wire).unwrap()
    }

    // A header written with `write_to` must parse back unchanged.
    #[test]
    fn frame_header_roundtrip() {
        let header = FrameHeader {
            version: PROTOCOL_VERSION,
            reserved: 0,
            msg_type: MsgType::Request,
            sequence: 42,
            payload_len: 128,
        };
        let mut wire = Vec::new();
        header.write_to(&mut wire);
        assert_eq!(wire.len(), FRAME_HEADER_SIZE);
        assert_eq!(FrameHeader::read_from(&wire), Some(header));
    }

    // `frame_pack` followed by `frame_unpack` yields the same header and payload.
    #[test]
    fn frame_pack_unwrap() {
        let header = FrameHeader {
            version: PROTOCOL_VERSION,
            reserved: 0,
            msg_type: MsgType::Push,
            sequence: 7,
            payload_len: 5,
        };
        let body = b"hello";
        let wire = frame_pack(&header, body);
        assert_eq!(wire.len(), FRAME_HEADER_SIZE + 5);

        let (got_header, got_body) = frame_unpack(&wire).unwrap();
        assert_eq!(got_header, header);
        assert_eq!(got_body, body);
    }

    // Input shorter than a header is rejected by both parsers.
    #[test]
    fn frame_too_short() {
        let truncated = [0u8; 5];
        assert_eq!(FrameHeader::read_from(&truncated), None);
        assert_eq!(frame_unpack(&truncated), None);
    }

    // Each primitive round-trips and consumes exactly its fixed width.
    #[test]
    fn encode_decode_primitives() {
        assert_eq!(roundtrip(&42u8), (42u8, 1));
        assert_eq!(roundtrip(&0xDEAD_BEEFu32), (0xDEAD_BEEFu32, 4));
        assert_eq!(roundtrip(&(-999_999i64)), (-999_999i64, 8));
        assert_eq!(
            roundtrip(&std::f64::consts::PI),
            (std::f64::consts::PI, 8)
        );
        assert_eq!(roundtrip(&true), (true, 1));
        assert_eq!(roundtrip(&false), (false, 1));
    }

    #[test]
    fn encode_decode_vec() {
        let input: Vec<u8> = vec![0xCA, 0xFE, 0xBA, 0xBE];
        let mut wire = Vec::new();
        input.encode(&mut wire);
        // 4-byte count prefix + four one-byte elements.
        assert_eq!(wire.len(), 4 + 4);
        assert_eq!(Vec::<u8>::decode(&wire), Some((input, 8)));
    }

    #[test]
    fn encode_decode_string() {
        let input = String::from("conduit transport layer");
        let expected_len = 4 + input.len();
        let mut wire = Vec::new();
        input.encode(&mut wire);
        assert_eq!(wire.len(), expected_len);
        assert_eq!(String::decode(&wire), Some((input, expected_len)));
    }

    #[test]
    fn encode_decode_bytes() {
        let input = Bytes(vec![10, 20, 30, 40, 50]);
        let mut wire = Vec::new();
        input.encode(&mut wire);
        assert_eq!(input.encode_size(), wire.len());
        assert_eq!(Bytes::decode(&wire), Some((input, wire.len())));
    }

    #[test]
    fn bytes_empty() {
        let input = Bytes(Vec::new());
        let mut wire = Vec::new();
        input.encode(&mut wire);
        // Only the 4-byte count prefix is written.
        assert_eq!(wire.len(), 4);
        let (decoded, consumed) = Bytes::decode(&wire).unwrap();
        assert_eq!(decoded.0.len(), 0);
        assert_eq!(consumed, 4);
    }

    #[test]
    fn bytes_wire_compatible_with_vec_u8() {
        let raw: Vec<u8> = vec![1, 2, 3, 4, 5];
        let wrapped = Bytes(raw.clone());

        let mut from_vec = Vec::new();
        raw.encode(&mut from_vec);

        let mut from_bytes = Vec::new();
        wrapped.encode(&mut from_bytes);

        assert_eq!(
            from_vec, from_bytes,
            "Bytes and Vec<u8> must produce identical wire format"
        );
    }

    #[test]
    fn min_size_primitives() {
        // Unsigned integers.
        assert_eq!(<u8 as Decode>::MIN_SIZE, 1);
        assert_eq!(<u16 as Decode>::MIN_SIZE, 2);
        assert_eq!(<u32 as Decode>::MIN_SIZE, 4);
        assert_eq!(<u64 as Decode>::MIN_SIZE, 8);
        // Signed integers.
        assert_eq!(<i8 as Decode>::MIN_SIZE, 1);
        assert_eq!(<i16 as Decode>::MIN_SIZE, 2);
        assert_eq!(<i32 as Decode>::MIN_SIZE, 4);
        assert_eq!(<i64 as Decode>::MIN_SIZE, 8);
        // Floats and bool.
        assert_eq!(<f32 as Decode>::MIN_SIZE, 4);
        assert_eq!(<f64 as Decode>::MIN_SIZE, 8);
        assert_eq!(<bool as Decode>::MIN_SIZE, 1);
        // Length-prefixed types: just the 4-byte prefix.
        assert_eq!(<String as Decode>::MIN_SIZE, 4);
        assert_eq!(<Vec<u8> as Decode>::MIN_SIZE, 4);
        assert_eq!(<Bytes as Decode>::MIN_SIZE, 4);
    }
}