perl_content_length_framing/
lib.rs1use std::fmt;
11
/// Header name, including the trailing colon, that marks the start of a frame.
const HEADER_SENTINEL: &[u8] = b"Content-Length:";
/// Byte sequence that terminates the header block and precedes the body.
const HEADER_END: &[u8] = b"\r\n\r\n";
/// Number of trailing bytes retained when an oversized, sentinel-free buffer is trimmed.
const RESYNC_TAIL_BYTES: usize = 8 * 1024;
/// Buffer length above which sentinel-free data is trimmed down to its tail.
const MAX_DESYNC_BUFFER_BYTES: usize = 64 * 1024;

/// Largest declared body length, in bytes, that the framer accepts.
pub const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;
19
/// Failures reported while extracting `Content-Length`-framed messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FramingError {
    /// The header block is structurally malformed (for example, a line has no `:` separator).
    InvalidHeader,
    /// The header block contains bytes that are not valid UTF-8.
    InvalidHeaderUtf8,
    /// The header block carries no `Content-Length` field.
    MissingContentLength,
    /// The `Content-Length` value could not be interpreted as a length.
    InvalidContentLength,
    /// The declared body length exceeds `MAX_FRAME_SIZE`.
    FrameTooLarge {
        /// Body length declared by the header, in bytes.
        len: usize,
    },
}

impl fmt::Display for FramingError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the size error interpolates a value; every other variant maps to a fixed message.
        let message = match self {
            Self::FrameTooLarge { len } => return write!(f, "frame too large: {len} bytes"),
            Self::InvalidHeader => "invalid Content-Length header",
            Self::InvalidHeaderUtf8 => "header contains invalid UTF-8",
            Self::MissingContentLength => "missing Content-Length header",
            Self::InvalidContentLength => "invalid Content-Length value",
        };
        f.write_str(message)
    }
}

impl std::error::Error for FramingError {}
48
/// Incremental decoder for messages framed with a `Content-Length` header.
///
/// Bytes are accumulated in an internal buffer until a complete frame can be extracted.
#[derive(Debug, Clone, Default)]
pub struct ContentLengthFramer {
    /// Bytes received so far that have not yet been returned or discarded.
    buf: Vec<u8>,
}
54
55impl ContentLengthFramer {
56 #[must_use]
58 pub fn new() -> Self {
59 Self { buf: Vec::new() }
60 }
61
62 pub fn push(&mut self, bytes: &[u8]) {
64 self.buf.extend_from_slice(bytes);
65 self.resync_if_needed();
66 }
67
68 pub fn try_next(&mut self) -> Result<Option<Vec<u8>>, FramingError> {
75 self.resync_if_needed();
76
77 let Some(start) = find_header_start(&self.buf) else {
78 if let Some(header_end) = find_subslice(&self.buf, HEADER_END) {
79 match std::str::from_utf8(&self.buf[..header_end]) {
80 Ok(header) => {
81 let has_header_shape = header
82 .split("\r\n")
83 .any(|line| !line.trim().is_empty() && line.contains(':'));
84 self.consume_header_block(header_end);
85 if has_header_shape {
86 return Err(FramingError::MissingContentLength);
87 }
88 return Err(FramingError::InvalidHeader);
89 }
90 Err(_) => {
91 self.consume_header_block(header_end);
92 return Err(FramingError::InvalidHeaderUtf8);
93 }
94 }
95 }
96 return Ok(None);
97 };
98 if start > 0 {
99 self.buf.drain(..start);
100 }
101
102 let Some(header_end) = find_subslice(&self.buf, HEADER_END) else {
103 return Ok(None);
104 };
105
106 let header_bytes = &self.buf[..header_end];
107 let header_str = match std::str::from_utf8(header_bytes) {
108 Ok(header) => header,
109 Err(_) => {
110 self.consume_header_block(header_end);
111 return Err(FramingError::InvalidHeaderUtf8);
112 }
113 };
114
115 let length = match parse_content_length(header_str) {
116 ContentLengthParse::Found(len) => len,
117 ContentLengthParse::Missing => {
118 self.consume_header_block(header_end);
119 return Err(FramingError::MissingContentLength);
120 }
121 ContentLengthParse::Invalid => {
122 self.consume_header_block(header_end);
123 return Err(FramingError::InvalidContentLength);
124 }
125 ContentLengthParse::MalformedHeader => {
126 self.consume_header_block(header_end);
127 return Err(FramingError::InvalidHeader);
128 }
129 };
130
131 if length > MAX_FRAME_SIZE {
132 self.consume_header_block(header_end);
133 return Err(FramingError::FrameTooLarge { len: length });
134 }
135
136 let body_start = header_end + HEADER_END.len();
137 let Some(body_end) = body_start.checked_add(length) else {
138 self.consume_header_block(header_end);
139 return Err(FramingError::InvalidContentLength);
140 };
141
142 if self.buf.len() < body_end {
143 return Ok(None);
144 }
145
146 let body = self.buf[body_start..body_end].to_vec();
147 self.buf.drain(..body_end);
148 self.resync_if_needed();
149 Ok(Some(body))
150 }
151
152 fn consume_header_block(&mut self, header_end: usize) {
153 let drain_to = (header_end + HEADER_END.len()).min(self.buf.len());
154 self.buf.drain(..drain_to);
155 self.resync_if_needed();
156 }
157
158 fn resync_if_needed(&mut self) {
159 match find_header_start(&self.buf) {
160 Some(0) => {}
161 Some(prefix_len) => {
162 self.buf.drain(..prefix_len);
163 }
164 None => {
165 if self.buf.len() > MAX_DESYNC_BUFFER_BYTES {
166 let keep = RESYNC_TAIL_BYTES.min(self.buf.len());
167 self.buf.drain(..self.buf.len() - keep);
168 }
169 }
170 }
171 }
172}
173
174#[must_use]
176pub fn frame(body: &[u8]) -> Vec<u8> {
177 let mut out = Vec::with_capacity(HEADER_SENTINEL.len() + 32 + HEADER_END.len() + body.len());
178 out.extend_from_slice(b"Content-Length: ");
179 out.extend_from_slice(body.len().to_string().as_bytes());
180 out.extend_from_slice(HEADER_END);
181 out.extend_from_slice(body);
182 out
183}
184
/// Outcome of scanning a header block for its `Content-Length` field.
enum ContentLengthParse {
    /// A valid `Content-Length` field was present; carries the declared length.
    Found(usize),
    /// Every line was well formed but none was a `Content-Length` field.
    Missing,
    /// A `Content-Length` field was present but its value is not a valid length.
    Invalid,
    /// A non-empty line had no `:` separator.
    MalformedHeader,
}

/// Scans `header` (lines separated by `\r\n`) for a `Content-Length` field.
///
/// The field name is matched case-insensitively and surrounding whitespace is ignored.
/// The value must consist solely of ASCII digits and fit in `usize`. When the field is
/// repeated, the last occurrence wins. Scanning stops at the first line without a `:` or
/// the first invalid `Content-Length` value.
fn parse_content_length(header: &str) -> ContentLengthParse {
    let mut found = None;
    for line in header.split("\r\n").filter(|line| !line.is_empty()) {
        let Some((name, value)) = line.split_once(':') else {
            return ContentLengthParse::MalformedHeader;
        };
        if !name.trim().eq_ignore_ascii_case("Content-Length") {
            continue;
        }

        let digits = value.trim();
        // `usize::from_str` tolerates a leading `+`, which is not a valid length on the
        // wire, so insist on plain digits before parsing.
        if digits.is_empty() || !digits.bytes().all(|byte| byte.is_ascii_digit()) {
            return ContentLengthParse::Invalid;
        }
        match digits.parse::<usize>() {
            Ok(length) => found = Some(length),
            // All digits but still unparseable: the value overflows `usize`.
            Err(_) => return ContentLengthParse::Invalid,
        }
    }

    found.map_or(ContentLengthParse::Missing, ContentLengthParse::Found)
}
213
214fn find_header_start(hay: &[u8]) -> Option<usize> {
215 hay.windows(HEADER_SENTINEL.len())
216 .position(|window| window.eq_ignore_ascii_case(HEADER_SENTINEL))
217}
218
/// Returns the offset of the first occurrence of `needle` in `hay`.
///
/// An empty `needle` matches at offset 0.
fn find_subslice(hay: &[u8], needle: &[u8]) -> Option<usize> {
    match needle.len() {
        0 => Some(0),
        width => hay
            .windows(width)
            .position(|candidate| candidate == needle),
    }
}
225
#[cfg(test)]
mod tests {
    use super::{ContentLengthFramer, FramingError, MAX_FRAME_SIZE, frame};

    /// Shape of the value produced by `ContentLengthFramer::try_next`.
    type Polled = Result<Option<Vec<u8>>, FramingError>;

    /// Returns a framer that has already buffered `bytes`.
    fn framer_fed(bytes: &[u8]) -> ContentLengthFramer {
        let mut framer = ContentLengthFramer::new();
        framer.push(bytes);
        framer
    }

    /// Unwraps a completed frame body, failing the test on any other outcome.
    fn take_body(result: Polled) -> Vec<u8> {
        if let Ok(Some(body)) = result {
            return body;
        }
        // Reaching this point means the result was not a frame, so the assertion fails
        // and reports the unexpected value.
        assert!(
            matches!(result, Ok(Some(_))),
            "expected Ok(Some(_)) from framer, got {result:?}"
        );
        Vec::new()
    }

    /// Asserts that the framer is still waiting for more bytes.
    fn assert_pending(result: Polled) {
        assert!(matches!(result, Ok(None)), "expected Ok(None) from framer, got {result:?}");
    }

    /// Unwraps a framing error, failing the test on any successful outcome.
    fn take_error(result: Polled) -> FramingError {
        if let Err(error) = result {
            return error;
        }
        // Reaching this point means the result was not an error, so the assertion fails
        // and reports the unexpected value.
        assert!(result.is_err(), "expected Err(_) from framer, got {result:?}");
        FramingError::InvalidHeader
    }

    #[test]
    fn extracts_single_frame() {
        let body = br#"{"jsonrpc":"2.0","id":1}"#;
        let mut framer = framer_fed(&frame(body));

        assert_eq!(take_body(framer.try_next()), body);
        assert_pending(framer.try_next());
    }

    #[test]
    fn handles_split_header_and_body() {
        let body = br#"{"x":1}"#;
        let msg = frame(body);
        // Three chunks: a partial sentinel, everything up to the last two bytes, the rest.
        let (head, rest) = msg.split_at(5);
        let (middle, tail) = rest.split_at(rest.len() - 2);

        let mut framer = framer_fed(head);
        assert_pending(framer.try_next());

        framer.push(middle);
        assert_pending(framer.try_next());

        framer.push(tail);
        assert_eq!(take_body(framer.try_next()), body);
    }

    #[test]
    fn extracts_multiple_frames_back_to_back() {
        let a = br#"{"a":1}"#;
        let b = br#"{"b":2}"#;
        let combined = [frame(a), frame(b)].concat();

        let mut framer = framer_fed(&combined);
        assert_eq!(take_body(framer.try_next()), a);
        assert_eq!(take_body(framer.try_next()), b);
        assert_pending(framer.try_next());
    }

    #[test]
    fn drains_garbage_prefix_before_header() {
        let body = br#"{"ok":true}"#;
        let mut msg = Vec::from(&b"junkjunk"[..]);
        msg.extend(frame(body));

        let mut framer = framer_fed(&msg);
        assert_eq!(take_body(framer.try_next()), body);
    }

    #[test]
    fn rejects_non_numeric_content_length() {
        let mut framer = framer_fed(b"Content-Length: nope\r\n\r\n{}");

        assert_eq!(take_error(framer.try_next()), FramingError::InvalidContentLength);
    }

    #[test]
    fn rejects_missing_content_length() {
        let mut framer = framer_fed(b"Content-Type: application/json\r\n\r\n{}");

        assert_eq!(take_error(framer.try_next()), FramingError::MissingContentLength);
    }

    #[test]
    fn rejects_invalid_utf8_in_header() {
        let mut framer = framer_fed(b"Content-Length: 2\r\nX-Test: \xFF\r\n\r\n{}");

        assert_eq!(take_error(framer.try_next()), FramingError::InvalidHeaderUtf8);
    }

    #[test]
    fn rejects_oversized_frame() {
        let too_large = MAX_FRAME_SIZE + 1;
        let header = format!("Content-Length: {too_large}\r\n\r\n");
        let mut framer = framer_fed(header.as_bytes());

        assert_eq!(
            take_error(framer.try_next()),
            FramingError::FrameTooLarge { len: too_large }
        );
    }

    #[test]
    fn supports_case_insensitive_header_name() {
        let body = br#"{"ok":1}"#;
        let msg = format!(
            "content-length: {}\r\n\r\n{}",
            body.len(),
            std::str::from_utf8(body).unwrap_or("")
        );
        let mut framer = framer_fed(msg.as_bytes());

        assert_eq!(take_body(framer.try_next()), body);
    }
}