1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// SPDX-License-Identifier: MIT
// Copyright 2023 IROX Contributors

//!
//! Traits for packetization of data and movement of packets of data

use std::collections::VecDeque;
use std::io::{ErrorKind, Read, Write};

/// The raw contents of a single packet: an owned sequence of bytes.
pub type PacketData = Vec<u8>;

/// Something that can be turned into bytes and written to an output sink.
pub trait Packet {
    /// The value returned by [`Packet::get_type`] to identify this packet.
    type PacketType;

    /// Returns the bytes of this packet, or an I/O error if they cannot be
    /// produced.
    fn get_bytes(&self) -> Result<Vec<u8>, std::io::Error>;

    /// Writes the complete output of [`Packet::get_bytes`] to `out`.
    ///
    /// Any error from `get_bytes` is returned before `out` is touched;
    /// otherwise the result of `write_all` is returned.
    fn write_to<T: Write>(&self, out: &mut T) -> Result<(), std::io::Error> {
        let encoded = self.get_bytes()?;
        out.write_all(&encoded)
    }

    /// Returns the type of this packet.
    fn get_type(&self) -> Self::PacketType;
}

/// Builds values of type `P` by reading from a byte source.
pub trait PacketBuilder<P> {
    /// The error returned when a `P` cannot be built.
    type Error;

    /// Attempts to build one `P` from `input`.
    ///
    /// How much of `input` is consumed is up to the implementation.
    fn build_from<T: Read>(&self, input: &mut T) -> Result<P, Self::Error>;
}

///
/// This trait represents a way to packetize a stream of data
///
/// `T` is the type of reader the strategy pulls bytes from. Implementations
/// take `&mut self`, so they are free to keep state between calls.
///
pub trait Packetization<T: Read> {
    /// Reads the next packet from the source reader
    ///
    /// Returns the bytes of that packet, or the I/O error that prevented it
    /// from being read.
    fn read_next_packet(&mut self, source: &mut T) -> Result<PacketData, std::io::Error>;
}

///
/// Represents an underlying message packet transport
///
/// A transport hands out whole packets one at a time and exposes explicit
/// `start`/`stop` hooks.
///
pub trait PacketTransport {
    /// The error type reported by every operation on this transport.
    type Error;

    /// Polls the next packet from the underlying transport
    ///
    /// Returns the packet bytes, or `Self::Error` if no packet could be
    /// obtained.
    fn poll_next_packet(&mut self) -> Result<PacketData, Self::Error>;

    /// Start the underlying transport up
    fn start(&mut self) -> Result<(), Self::Error>;

    /// Stop the underlying transport
    fn stop(&mut self) -> Result<(), Self::Error>;
}

///
/// A packetizer binds a Read stream and a Packetization strategy
///
/// Both halves are held as mutable borrows for the lifetime `'a`; the
/// packetizer owns neither of them.
///
/// NOTE(review): both fields are private and no constructor is visible in this
/// part of the file -- confirm how code outside this module is meant to build one.
pub struct Packetizer<'a, R: Read, P: Packetization<R>> {
    /// The stream packets are read from.
    reader: &'a mut R,
    /// The strategy that decides where one packet ends and the next begins.
    chunker: &'a mut P,
}

impl<'a, R: Read, P: Packetization<R>> PacketTransport for Packetizer<'a, R, P> {
    type Error = std::io::Error;

    /// Asks the packetization strategy to pull the next packet out of the
    /// bound reader and returns whatever it produces.
    fn poll_next_packet(&mut self) -> Result<PacketData, Self::Error> {
        let source: &mut R = &mut *self.reader;
        self.chunker.read_next_packet(source)
    }

    /// Nothing needs starting for a borrowed reader; always succeeds.
    fn start(&mut self) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Nothing needs stopping for a borrowed reader; always succeeds.
    fn stop(&mut self) -> Result<(), Self::Error> {
        Ok(())
    }
}

///
/// A delimited packetizer searches for a delimiter in the underlying data stream
///
/// The derived `Default` yields an empty `delimiter` and a `max_buffer_size`
/// of zero, so both should be set before use: `read_next_packet` rejects an
/// empty delimiter with `ErrorKind::InvalidData`.
#[derive(Default)]
pub struct DelimitedPacketizer {
    /// The delimiter to search for
    ///
    /// Must not be empty.
    pub delimiter: Vec<u8>,

    /// Include the delimiter in the packet output?
    ///
    /// When `true` the delimiter bytes are appended to the returned packet.
    pub include_delimiter: bool,

    /// Will scan up to max_buffer_size bytes in memory before failing.
    ///
    /// Reaching the limit is reported as `ErrorKind::OutOfMemory`.
    pub max_buffer_size: usize,

    /// Internal data buffer
    ///
    /// Cleared at the start of every `read_next_packet` call.
    buffer: Vec<u8>,
}

impl<T: Read> Packetization<T> for DelimitedPacketizer {
    /// Reads bytes from `source` until the delimiter is found and returns the
    /// bytes that preceded it.
    ///
    /// The delimiter itself is appended to the returned packet only when
    /// `include_delimiter` is set. If `source` reaches end-of-stream before a
    /// delimiter is seen, every byte read so far is returned as-is (an empty
    /// packet if nothing was read).
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidData` if `delimiter` is empty.
    /// * `ErrorKind::OutOfMemory` if more than `max_buffer_size` payload bytes
    ///   (delimiter excluded) would have to be buffered for one packet.
    /// * Any error returned by `source`, except `ErrorKind::Interrupted`,
    ///   which is retried.
    fn read_next_packet(&mut self, source: &mut T) -> Result<PacketData, std::io::Error> {
        if self.delimiter.is_empty() {
            return Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "Delimiter is empty",
            ));
        }

        self.buffer.clear();
        let del_len = self.delimiter.len();

        // Sliding window over the most recent `del_len` bytes. Bytes that fall
        // out of its front can no longer be part of a delimiter match, so they
        // are moved into the payload buffer.
        let mut ringbuf: VecDeque<u8> = VecDeque::with_capacity(del_len);

        let mut onebuf: [u8; 1] = [0; 1];
        loop {
            // A window shorter than the delimiter never compares equal.
            if ringbuf == self.delimiter {
                let mut outbuf = self.buffer.clone();
                if self.include_delimiter {
                    outbuf.extend_from_slice(&self.delimiter);
                }
                return Ok(outbuf);
            }

            // The next byte only grows the payload once the window is full, so
            // the limit is enforced only from that point on.
            let window_full = ringbuf.len() == del_len;
            if window_full && self.buffer.len() >= self.max_buffer_size {
                return Err(std::io::Error::new(
                    ErrorKind::OutOfMemory,
                    "Packet exceeded max buffer size",
                ));
            }

            match source.read(&mut onebuf) {
                Ok(0) => {
                    // End of stream: what is left in the window is data, not a
                    // delimiter, so hand it back along with the payload.
                    let mut outbuf = self.buffer.clone();
                    outbuf.extend(ringbuf);
                    return Ok(outbuf);
                }
                Ok(_) => {}
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }

            if window_full {
                if let Some(oldest) = ringbuf.pop_front() {
                    self.buffer.push(oldest);
                }
            }
            ringbuf.push_back(onebuf[0]);
        }
    }
}