irox_tools/
packetio.rs

1// SPDX-License-Identifier: MIT
2// Copyright 2025 IROX Contributors
3//
4
5//!
6//! Traits for packetization of data and movement of packets of data
7
8extern crate alloc;
9
10use alloc::collections::VecDeque;
11use alloc::vec::Vec;
12use irox_bits::{Bits, Error, ErrorKind, MutBits};
13
14/// A packet is a series of bytes
15pub type PacketData = Vec<u8>;
16
17pub trait Packet {
18    type PacketType;
19    fn get_bytes(&self) -> Result<Vec<u8>, Error>;
20
21    fn write_to<T: MutBits>(&self, out: &mut T) -> Result<(), Error> {
22        out.write_all_bytes(self.get_bytes()?.as_slice())
23    }
24
25    fn get_type(&self) -> Self::PacketType;
26}
27
28pub trait PacketBuilder<P> {
29    type Error;
30
31    fn build_from<T: Bits>(&self, input: &mut T) -> Result<P, Self::Error>;
32}
33
34///
35/// This trait represents a way to packetize a stream of data
36///
37pub trait Packetization<T: Bits> {
38    /// Reads the next packet from the source reader
39    fn read_next_packet(&mut self, source: &mut T) -> Result<PacketData, Error>;
40}
41
42///
43/// Represents an underlying message packet transport
44///
45pub trait PacketTransport {
46    type Error;
47
48    /// Polls the next packet from the underlying transport
49    fn poll_next_packet(&mut self) -> Result<PacketData, Self::Error>;
50
51    /// Start the underlying transport up
52    fn start(&mut self) -> Result<(), Self::Error>;
53
54    /// Stop the underlying transport
55    fn stop(&mut self) -> Result<(), Self::Error>;
56}
57
58///
59/// A packetizer binds a Read stream and a Packetization strategy
60pub struct Packetizer<'a, R: Bits, P: Packetization<R>> {
61    reader: &'a mut R,
62    chunker: &'a mut P,
63}
64
65impl<R, P> PacketTransport for Packetizer<'_, R, P>
66where
67    R: Bits,
68    P: Packetization<R>,
69{
70    type Error = irox_bits::Error;
71
72    /// Polls the next packet from the underlying transport
73    fn poll_next_packet(&mut self) -> Result<PacketData, Self::Error> {
74        self.chunker.read_next_packet(self.reader)
75    }
76
77    /// Start the underlying transport up
78    fn start(&mut self) -> Result<(), Self::Error> {
79        // noop.
80        Ok(())
81    }
82
83    /// Stop the underlying transport
84    fn stop(&mut self) -> Result<(), Self::Error> {
85        // noop.
86        Ok(())
87    }
88}
89
90///
91/// A delimited packetizer searches for a delimiter in the underlying data stream
92#[derive(Default)]
93pub struct DelimitedPacketizer {
94    /// The delimiter to search for
95    pub delimiter: Vec<u8>,
96
97    /// Include the delimiter in the packet output?
98    pub include_delimiter: bool,
99
100    /// Will scan up to max_buffer_size bytes in memory before failing.
101    pub max_buffer_size: usize,
102
103    /// Internal data buffer
104    buffer: Vec<u8>,
105}
106
107impl<T: Bits> Packetization<T> for DelimitedPacketizer {
108    fn read_next_packet(&mut self, source: &mut T) -> Result<PacketData, Error> {
109        if self.delimiter.is_empty() {
110            return Err(Error::new(ErrorKind::InvalidData, "Delimiter is empty"));
111        }
112
113        self.buffer.clear();
114        let del_len = self.delimiter.len();
115
116        let mut ringbuf: VecDeque<u8> = VecDeque::with_capacity(del_len);
117        source.read_exact_into(del_len, &mut ringbuf.as_mut_slices().0)?;
118
119        loop {
120            if ringbuf.eq(&self.delimiter) {
121                let mut outbuf = self.buffer.clone();
122                if self.include_delimiter {
123                    outbuf.extend(&self.delimiter);
124                }
125                return Ok(outbuf);
126            }
127
128            if self.buffer.len() == self.max_buffer_size {
129                return Err(Error::new(
130                    ErrorKind::OutOfMemory,
131                    "Packet exceeded max buffer size",
132                ));
133            }
134            let Some(val) = source.next_u8()? else {
135                return Ok(self.buffer.clone());
136            };
137
138            ringbuf.pop_front();
139            ringbuf.push_back(val);
140        }
141    }
142}