Skip to main content

libaprs_engine/
transport.rs

1use std::io::{self, Read};
2
3use crate::{DiagnosticLayer, ErrorDiagnostic, MAX_PACKET_LEN};
4
/// Default cap, in bytes, on a single batch accepted by transport helper
/// readers (1 MiB).
pub const DEFAULT_TRANSPORT_READ_LIMIT: usize = 1 << 20;
7
/// Transport-layer failure categories with stable identities, intended for
/// logs and for consumption by external systems.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportErrorCode {
    /// More bytes arrived than the configured byte limit allows.
    OversizedInput,
    /// The transport's own framing was malformed.
    InvalidFrame,
    /// EOF was reached before a whole packet or frame had been received.
    UnexpectedEof,
    /// The transport operation did not complete in time.
    Timeout,
    /// The underlying I/O layer reported a failure.
    Io,
}
22
23impl TransportErrorCode {
24    /// Returns a stable machine-readable diagnostic code.
25    #[must_use]
26    pub const fn code(self) -> &'static str {
27        match self {
28            Self::OversizedInput => "transport.oversized_input",
29            Self::InvalidFrame => "transport.invalid_frame",
30            Self::UnexpectedEof => "transport.unexpected_eof",
31            Self::Timeout => "transport.timeout",
32            Self::Io => "transport.io",
33        }
34    }
35
36    /// Returns structured transport error metadata for operator diagnostics.
37    #[must_use]
38    pub fn diagnostic(self) -> ErrorDiagnostic {
39        match self {
40            Self::OversizedInput => ErrorDiagnostic {
41                layer: DiagnosticLayer::Transport,
42                code: self.code(),
43                name: "oversized_input",
44                description: "transport input exceeded the configured byte limit",
45                remediation: "keep bounded transport reads enabled and reject the oversized input",
46            },
47            Self::InvalidFrame => ErrorDiagnostic {
48                layer: DiagnosticLayer::Transport,
49                code: self.code(),
50                name: "invalid_frame",
51                description: "transport-specific framing was invalid before codec parsing",
52                remediation: "drop the frame and inspect upstream framing or escaping",
53            },
54            Self::UnexpectedEof => ErrorDiagnostic {
55                layer: DiagnosticLayer::Transport,
56                code: self.code(),
57                name: "unexpected_eof",
58                description: "transport ended before a complete packet or frame was available",
59                remediation: "discard the incomplete record and wait for a complete bounded frame",
60            },
61            Self::Timeout => ErrorDiagnostic {
62                layer: DiagnosticLayer::Transport,
63                code: self.code(),
64                name: "timeout",
65                description: "transport operation exceeded the caller-owned timeout",
66                remediation: "apply reconnect or retry policy outside the core parser",
67            },
68            Self::Io => ErrorDiagnostic {
69                layer: DiagnosticLayer::Transport,
70                code: self.code(),
71                name: "io",
72                description: "underlying transport I/O failed",
73                remediation:
74                    "handle the I/O failure at the adapter boundary before parsing more bytes",
75            },
76        }
77    }
78}
79
/// Shared contract for transport adapters that produce packets.
pub trait PacketSource {
    /// Error reported by this particular source.
    type Error;

    /// Receives one bounded batch of packets.
    ///
    /// Each inner `Vec<u8>` holds the bytes of a single packet.
    ///
    /// # Errors
    ///
    /// Returns the source-specific [`Self::Error`] when the batch cannot be
    /// produced.
    fn recv_packets(&mut self) -> Result<Vec<Vec<u8>>, Self::Error>;
}
88
/// Shared contract for transport adapters that consume packets.
pub trait PacketSink {
    /// Error reported by this particular sink.
    type Error;

    /// Sends a single packet.
    ///
    /// The contract is that the bytes are forwarded as given: implementations
    /// must neither mutate nor normalize them.
    ///
    /// # Errors
    ///
    /// Returns the sink-specific [`Self::Error`] when the packet cannot be
    /// sent.
    fn send_packet(&mut self, packet: &[u8]) -> Result<(), Self::Error>;
}
97
98/// Reads all available bytes from a reader while enforcing a hard limit.
99pub fn read_all_with_limit(mut reader: impl Read, max_bytes: usize) -> io::Result<Vec<u8>> {
100    let mut input = Vec::new();
101    let mut limited = (&mut reader).take(max_bytes.saturating_add(1) as u64);
102    limited.read_to_end(&mut input)?;
103    if input.len() > max_bytes {
104        return Err(oversized_input_error());
105    }
106    Ok(input)
107}
108
109/// Creates the standard oversized-input transport error.
110#[must_use]
111pub fn oversized_input_error() -> io::Error {
112    io::Error::new(
113        io::ErrorKind::InvalidData,
114        TransportErrorCode::OversizedInput.code(),
115    )
116}
117
/// Packet source for file/stdin style transports, where a borrowed byte
/// buffer holds one packet per line.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LineTransport<'a> {
    /// Borrowed raw bytes containing the newline-separated packets.
    input: &'a [u8],
}
123
124impl<'a> LineTransport<'a> {
125    /// Creates a transport over newline-separated packet bytes.
126    #[must_use]
127    pub fn new(input: &'a [u8]) -> Self {
128        Self { input }
129    }
130
131    /// Iterates packet lines without trailing CR/LF bytes.
132    #[must_use]
133    pub fn packets(&self) -> Vec<&'a [u8]> {
134        self.input
135            .split(|byte| *byte == b'\n')
136            .map(trim_trailing_carriage_return)
137            .filter(|line| !line.is_empty())
138            .collect()
139    }
140
141    /// Iterates packet lines while enforcing a per-packet byte limit.
142    ///
143    /// The limit is applied after removing one trailing carriage return from
144    /// CRLF-framed lines and before allocating owned packet copies.
145    pub fn packets_with_limit(&self, max_packet_len: usize) -> io::Result<Vec<&'a [u8]>> {
146        let mut packets = Vec::new();
147        for line in self
148            .input
149            .split(|byte| *byte == b'\n')
150            .map(trim_trailing_carriage_return)
151            .filter(|line| !line.is_empty())
152        {
153            if line.len() > max_packet_len {
154                return Err(oversized_input_error());
155            }
156            packets.push(line);
157        }
158        Ok(packets)
159    }
160}
161
162impl PacketSource for LineTransport<'_> {
163    type Error = io::Error;
164
165    fn recv_packets(&mut self) -> Result<Vec<Vec<u8>>, Self::Error> {
166        Ok(self
167            .packets_with_limit(MAX_PACKET_LEN)?
168            .into_iter()
169            .map(<[u8]>::to_vec)
170            .collect())
171    }
172}
173
174impl PacketSink for Vec<Vec<u8>> {
175    type Error = io::Error;
176
177    fn send_packet(&mut self, packet: &[u8]) -> Result<(), Self::Error> {
178        self.push(packet.to_vec());
179        Ok(())
180    }
181}
182
/// Returns `line` without its final byte when that byte is `\r`; any other
/// line, including an empty one, is returned unchanged.
fn trim_trailing_carriage_return(line: &[u8]) -> &[u8] {
    match line.split_last() {
        Some((&b'\r', rest)) => rest,
        _ => line,
    }
}