libaprs_engine/
transport.rs1use std::io::{self, Read};
2
3use crate::{DiagnosticLayer, ErrorDiagnostic, MAX_PACKET_LEN};
4
5pub const DEFAULT_TRANSPORT_READ_LIMIT: usize = 1024 * 1024;
7
8#[derive(Clone, Copy, Debug, Eq, PartialEq)]
10pub enum TransportErrorCode {
11 OversizedInput,
13 InvalidFrame,
15 UnexpectedEof,
17 Timeout,
19 Io,
21}
22
23impl TransportErrorCode {
24 #[must_use]
26 pub const fn code(self) -> &'static str {
27 match self {
28 Self::OversizedInput => "transport.oversized_input",
29 Self::InvalidFrame => "transport.invalid_frame",
30 Self::UnexpectedEof => "transport.unexpected_eof",
31 Self::Timeout => "transport.timeout",
32 Self::Io => "transport.io",
33 }
34 }
35
36 #[must_use]
38 pub fn diagnostic(self) -> ErrorDiagnostic {
39 match self {
40 Self::OversizedInput => ErrorDiagnostic {
41 layer: DiagnosticLayer::Transport,
42 code: self.code(),
43 name: "oversized_input",
44 description: "transport input exceeded the configured byte limit",
45 remediation: "keep bounded transport reads enabled and reject the oversized input",
46 },
47 Self::InvalidFrame => ErrorDiagnostic {
48 layer: DiagnosticLayer::Transport,
49 code: self.code(),
50 name: "invalid_frame",
51 description: "transport-specific framing was invalid before codec parsing",
52 remediation: "drop the frame and inspect upstream framing or escaping",
53 },
54 Self::UnexpectedEof => ErrorDiagnostic {
55 layer: DiagnosticLayer::Transport,
56 code: self.code(),
57 name: "unexpected_eof",
58 description: "transport ended before a complete packet or frame was available",
59 remediation: "discard the incomplete record and wait for a complete bounded frame",
60 },
61 Self::Timeout => ErrorDiagnostic {
62 layer: DiagnosticLayer::Transport,
63 code: self.code(),
64 name: "timeout",
65 description: "transport operation exceeded the caller-owned timeout",
66 remediation: "apply reconnect or retry policy outside the core parser",
67 },
68 Self::Io => ErrorDiagnostic {
69 layer: DiagnosticLayer::Transport,
70 code: self.code(),
71 name: "io",
72 description: "underlying transport I/O failed",
73 remediation:
74 "handle the I/O failure at the adapter boundary before parsing more bytes",
75 },
76 }
77 }
78}
79
80pub trait PacketSource {
82 type Error;
84
85 fn recv_packets(&mut self) -> Result<Vec<Vec<u8>>, Self::Error>;
87}
88
89pub trait PacketSink {
91 type Error;
93
94 fn send_packet(&mut self, packet: &[u8]) -> Result<(), Self::Error>;
96}
97
98pub fn read_all_with_limit(mut reader: impl Read, max_bytes: usize) -> io::Result<Vec<u8>> {
100 let mut input = Vec::new();
101 let mut limited = (&mut reader).take(max_bytes.saturating_add(1) as u64);
102 limited.read_to_end(&mut input)?;
103 if input.len() > max_bytes {
104 return Err(oversized_input_error());
105 }
106 Ok(input)
107}
108
109#[must_use]
111pub fn oversized_input_error() -> io::Error {
112 io::Error::new(
113 io::ErrorKind::InvalidData,
114 TransportErrorCode::OversizedInput.code(),
115 )
116}
117
118#[derive(Clone, Debug, Eq, PartialEq)]
120pub struct LineTransport<'a> {
121 input: &'a [u8],
122}
123
124impl<'a> LineTransport<'a> {
125 #[must_use]
127 pub fn new(input: &'a [u8]) -> Self {
128 Self { input }
129 }
130
131 #[must_use]
133 pub fn packets(&self) -> Vec<&'a [u8]> {
134 self.input
135 .split(|byte| *byte == b'\n')
136 .map(trim_trailing_carriage_return)
137 .filter(|line| !line.is_empty())
138 .collect()
139 }
140
141 pub fn packets_with_limit(&self, max_packet_len: usize) -> io::Result<Vec<&'a [u8]>> {
146 let mut packets = Vec::new();
147 for line in self
148 .input
149 .split(|byte| *byte == b'\n')
150 .map(trim_trailing_carriage_return)
151 .filter(|line| !line.is_empty())
152 {
153 if line.len() > max_packet_len {
154 return Err(oversized_input_error());
155 }
156 packets.push(line);
157 }
158 Ok(packets)
159 }
160}
161
162impl PacketSource for LineTransport<'_> {
163 type Error = io::Error;
164
165 fn recv_packets(&mut self) -> Result<Vec<Vec<u8>>, Self::Error> {
166 Ok(self
167 .packets_with_limit(MAX_PACKET_LEN)?
168 .into_iter()
169 .map(<[u8]>::to_vec)
170 .collect())
171 }
172}
173
174impl PacketSink for Vec<Vec<u8>> {
175 type Error = io::Error;
176
177 fn send_packet(&mut self, packet: &[u8]) -> Result<(), Self::Error> {
178 self.push(packet.to_vec());
179 Ok(())
180 }
181}
182
183fn trim_trailing_carriage_return(line: &[u8]) -> &[u8] {
184 line.strip_suffix(b"\r").unwrap_or(line)
185}