use crate::stream::{BoxStream, Flow, NotUsed};
use crate::{StreamError, StreamResult};
/// Byte order used to decode the length header read by [`Framing::length_field`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FramingByteOrder {
    /// Most significant byte first.
    BigEndian,
    /// Least significant byte first.
    LittleEndian,
}
/// Final state of a framing stream; once recorded, every later `next` call replays it.
#[derive(Clone)]
enum Terminal {
    /// Upstream finished and no further frames will be produced.
    Complete,
    /// The stream failed with this error.
    Error(StreamError),
}
fn sticky_terminal<T>(terminal: &Terminal) -> Option<StreamResult<T>> {
match terminal {
Terminal::Complete => None,
Terminal::Error(error) => Some(Err(error.clone())),
}
}
/// Entry point for byte-stream framing stages.
///
/// Every constructor returns a [`Flow`] that turns an arbitrarily chunked stream of
/// bytes into a stream of whole frames.
pub struct Framing;

impl Framing {
    /// Splits the input at every occurrence of `delimiter`.
    ///
    /// The delimiter itself is not part of the emitted frames. A frame longer than
    /// `maximum_frame_length` bytes fails the stream. If the input ends with bytes that
    /// are not followed by a delimiter, they are emitted as a final frame when
    /// `allow_truncation` is `true`; otherwise the stream fails.
    ///
    /// # Panics
    ///
    /// Panics if `delimiter` is empty or `maximum_frame_length` is zero.
    #[must_use]
    pub fn delimiter(
        delimiter: Vec<u8>,
        maximum_frame_length: usize,
        allow_truncation: bool,
    ) -> Flow<Vec<u8>, Vec<u8>> {
        assert!(
            !delimiter.is_empty(),
            "delimiter must contain at least one byte"
        );
        assert!(
            maximum_frame_length > 0,
            "maximum frame length must be greater than zero"
        );
        Flow::from_transform(move |upstream| {
            let stage = DelimiterFramingStream::new(
                upstream,
                delimiter.clone(),
                maximum_frame_length,
                allow_truncation,
            );
            Box::new(stage) as BoxStream<Vec<u8>>
        })
    }

    /// Splits the input into frames whose size is announced by a binary length header.
    ///
    /// Each frame starts with `field_offset` arbitrary bytes. A `field_length`-byte
    /// length value `L`, encoded in `byte_order`, follows, then `L` payload bytes. The
    /// emitted frame contains all `field_offset + field_length + L` bytes.
    ///
    /// Lengths of 1 to 3 bytes are unsigned. A 4-byte length is read as a signed 32-bit
    /// integer, and a negative value fails the stream. The stream also fails when a
    /// frame is larger than `maximum_frame_length` bytes or the input ends mid-frame.
    ///
    /// # Panics
    ///
    /// Panics if `field_length` is not in `1..=4` or `maximum_frame_length` is zero.
    #[must_use]
    pub fn length_field(
        field_length: usize,
        field_offset: usize,
        maximum_frame_length: usize,
        byte_order: FramingByteOrder,
    ) -> Flow<Vec<u8>, Vec<u8>> {
        assert!(
            (1..=4).contains(&field_length),
            "Length field length must be 1, 2, 3 or 4."
        );
        assert!(
            maximum_frame_length > 0,
            "maximum frame length must be greater than zero"
        );
        Flow::from_transform(move |upstream| {
            let stage = LengthFieldFramingStream::new(
                upstream,
                field_length,
                field_offset,
                maximum_frame_length,
                byte_order,
            );
            Box::new(stage) as BoxStream<Vec<u8>>
        })
    }

    /// Extracts top-level JSON objects from the input.
    ///
    /// Only spaces, `\n`, `\r`, `\t`, commas and square brackets are accepted between
    /// objects. This admits both a JSON array of objects and comma- or newline-separated
    /// objects; any other byte fails the stream. Object boundaries are found by
    /// tracking braces outside string literals; object contents are not otherwise
    /// validated.
    ///
    /// An object still open after `maximum_object_length` bytes fails the stream, as
    /// does input that ends inside an object.
    ///
    /// # Panics
    ///
    /// Panics if `maximum_object_length` is zero.
    #[must_use]
    pub fn json(maximum_object_length: usize) -> Flow<Vec<u8>, Vec<u8>, NotUsed> {
        assert!(
            maximum_object_length > 0,
            "maximum object length must be greater than zero"
        );
        Flow::from_transform(move |upstream| {
            let stage = JsonFramingStream::new(upstream, maximum_object_length);
            Box::new(stage) as BoxStream<Vec<u8>>
        })
    }
}
/// Splits a byte stream into frames terminated by a fixed delimiter sequence.
///
/// Chunks from `input` are appended to `buffer`. Bytes before `frame_start` belong to
/// frames that were already emitted. They are dropped lazily, right before the next
/// chunk is appended, so a chunk carrying many frames is shifted once rather than once
/// per frame.
///
/// `search_from` remembers where the last unsuccessful delimiter search ended. This
/// keeps the bytes of a pending frame from being rescanned every time another chunk
/// arrives.
struct DelimiterFramingStream {
    input: BoxStream<Vec<u8>>,
    delimiter: Vec<u8>,
    maximum_frame_length: usize,
    allow_truncation: bool,
    buffer: Vec<u8>,
    /// Index in `buffer` where the next, not yet emitted, frame begins.
    frame_start: usize,
    /// Index in `buffer` where the next delimiter search starts (always `>= frame_start`).
    search_from: usize,
    terminal: Option<Terminal>,
}

impl DelimiterFramingStream {
    fn new(
        input: BoxStream<Vec<u8>>,
        delimiter: Vec<u8>,
        maximum_frame_length: usize,
        allow_truncation: bool,
    ) -> Self {
        Self {
            input,
            delimiter,
            maximum_frame_length,
            allow_truncation,
            buffer: Vec::new(),
            frame_start: 0,
            search_from: 0,
            terminal: None,
        }
    }

    /// Records `error` as the sticky terminal state and returns it for emission.
    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
        self.terminal = Some(Terminal::Error(error.clone()));
        Some(Err(error))
    }

    /// Returns the buffer index of the first delimiter that starts at or after `search_from`.
    fn delimiter_position(&self) -> Option<usize> {
        self.buffer[self.search_from..]
            .windows(self.delimiter.len())
            .position(|window| window == self.delimiter.as_slice())
            .map(|offset| self.search_from + offset)
    }

    /// Length of the longest proper delimiter prefix that ends the pending (unemitted) bytes.
    ///
    /// Those bytes may still become a delimiter once more input arrives, so they are not
    /// counted against `maximum_frame_length` yet.
    fn trailing_delimiter_prefix_len(&self) -> usize {
        let pending = &self.buffer[self.frame_start..];
        let max_prefix = self
            .delimiter
            .len()
            .saturating_sub(1)
            .min(pending.len());
        (1..=max_prefix)
            .rev()
            .find(|&prefix_len| pending.ends_with(&self.delimiter[..prefix_len]))
            .unwrap_or(0)
    }

    /// Drops the bytes of already-emitted frames from the front of `buffer`.
    fn compact(&mut self) {
        if self.frame_start > 0 {
            self.buffer.drain(..self.frame_start);
            self.search_from -= self.frame_start;
            self.frame_start = 0;
        }
    }
}

impl Iterator for DelimiterFramingStream {
    type Item = StreamResult<Vec<u8>>;

    /// Emits the next frame (without its delimiter), pulling upstream chunks until one
    /// is complete.
    ///
    /// After completion or failure, the recorded terminal state is returned on every call.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(terminal) = &self.terminal {
            return sticky_terminal(terminal);
        }
        loop {
            if let Some(position) = self.delimiter_position() {
                let frame_length = position - self.frame_start;
                if frame_length > self.maximum_frame_length {
                    return self.fail(StreamError::Failed(format!(
                        "Read {frame_length} bytes which is more than {} without seeing a line terminator",
                        self.maximum_frame_length
                    )));
                }
                let frame = self.buffer[self.frame_start..position].to_vec();
                self.frame_start = position + self.delimiter.len();
                self.search_from = self.frame_start;
                return Some(Ok(frame));
            }
            // No delimiter starts anywhere a complete match fits. The next search only has
            // to cover starts that new input could still complete.
            self.search_from = self
                .buffer
                .len()
                .saturating_sub(self.delimiter.len().saturating_sub(1))
                .max(self.frame_start);
            let pending = self.buffer.len() - self.frame_start;
            let unmatched = pending - self.trailing_delimiter_prefix_len();
            if unmatched > self.maximum_frame_length {
                return self.fail(StreamError::Failed(format!(
                    "Read {} bytes which is more than {} without seeing a line terminator",
                    unmatched, self.maximum_frame_length
                )));
            }
            match self.input.next() {
                Some(Ok(chunk)) => {
                    self.compact();
                    self.buffer.extend_from_slice(&chunk);
                }
                Some(Err(error)) => {
                    self.terminal = Some(Terminal::Error(error.clone()));
                    return Some(Err(error));
                }
                None => {
                    if self.frame_start == self.buffer.len() {
                        self.terminal = Some(Terminal::Complete);
                        return None;
                    }
                    if self.allow_truncation {
                        let frame = self.buffer.split_off(self.frame_start);
                        self.terminal = Some(Terminal::Complete);
                        return Some(Ok(frame));
                    }
                    return self.fail(StreamError::Failed(
                        "Stream finished but there was a truncated final frame in the buffer"
                            .to_owned(),
                    ));
                }
            }
        }
    }
}
/// Splits a byte stream into frames whose size is given by a binary length header.
///
/// A frame is `field_offset` leading bytes, a `field_length`-byte length `L` and `L`
/// payload bytes. The whole `field_offset + field_length + L` bytes are emitted.
///
/// Bytes before `frame_start` belong to frames that were already emitted. They are
/// dropped lazily before the next chunk is appended, so a chunk carrying many frames is
/// shifted once rather than once per frame.
struct LengthFieldFramingStream {
    input: BoxStream<Vec<u8>>,
    field_length: usize,
    field_offset: usize,
    /// Bytes needed before the length header can be decoded (`field_offset + field_length`).
    minimum_chunk_size: usize,
    maximum_frame_length: usize,
    byte_order: FramingByteOrder,
    buffer: Vec<u8>,
    /// Index in `buffer` where the current, not yet emitted, frame begins.
    frame_start: usize,
    /// Total size of the current frame once its header has been decoded.
    frame_size: Option<usize>,
    terminal: Option<Terminal>,
}

impl LengthFieldFramingStream {
    fn new(
        input: BoxStream<Vec<u8>>,
        field_length: usize,
        field_offset: usize,
        maximum_frame_length: usize,
        byte_order: FramingByteOrder,
    ) -> Self {
        // Saturate instead of overflowing on absurd offsets. Such a header can never be
        // buffered, so the stream simply ends with a truncation error.
        let minimum_chunk_size = field_offset.saturating_add(field_length);
        Self {
            input,
            field_length,
            field_offset,
            minimum_chunk_size,
            maximum_frame_length,
            byte_order,
            buffer: Vec::new(),
            frame_start: 0,
            frame_size: None,
            terminal: None,
        }
    }

    /// Records `error` as the sticky terminal state and returns it for emission.
    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
        self.terminal = Some(Terminal::Error(error.clone()));
        Some(Err(error))
    }

    /// Decodes the length header of the current frame.
    ///
    /// Callers must ensure at least `minimum_chunk_size` bytes are buffered past
    /// `frame_start`. Lengths of 1 to 3 bytes are zero-extended (never negative). A
    /// 4-byte length is read as a signed `i32`.
    fn parse_length(&self) -> i32 {
        let header_start = self.frame_start + self.field_offset;
        let header = &self.buffer[header_start..header_start + self.field_length];
        match (self.byte_order, header) {
            (_, &[b0]) => i32::from(b0),
            (FramingByteOrder::BigEndian, &[b0, b1]) => i32::from(u16::from_be_bytes([b0, b1])),
            (FramingByteOrder::LittleEndian, &[b0, b1]) => {
                i32::from(u16::from_le_bytes([b0, b1]))
            }
            (FramingByteOrder::BigEndian, &[b0, b1, b2]) => i32::from_be_bytes([0, b0, b1, b2]),
            (FramingByteOrder::LittleEndian, &[b0, b1, b2]) => {
                i32::from_le_bytes([b0, b1, b2, 0])
            }
            (FramingByteOrder::BigEndian, &[b0, b1, b2, b3]) => {
                i32::from_be_bytes([b0, b1, b2, b3])
            }
            (FramingByteOrder::LittleEndian, &[b0, b1, b2, b3]) => {
                i32::from_le_bytes([b0, b1, b2, b3])
            }
            _ => unreachable!("field length validated at construction"),
        }
    }

    /// Drops the bytes of already-emitted frames from the front of `buffer`.
    fn compact(&mut self) {
        if self.frame_start > 0 {
            self.buffer.drain(..self.frame_start);
            self.frame_start = 0;
        }
    }
}

impl Iterator for LengthFieldFramingStream {
    type Item = StreamResult<Vec<u8>>;

    /// Emits the next length-prefixed frame, pulling upstream chunks until it is complete.
    ///
    /// After completion or failure, the recorded terminal state is returned on every call.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(terminal) = &self.terminal {
            return sticky_terminal(terminal);
        }
        loop {
            let available = self.buffer.len() - self.frame_start;
            let decoded_size = self.frame_size;
            match decoded_size {
                Some(frame_size) if available >= frame_size => {
                    let frame_end = self.frame_start + frame_size;
                    let frame = self.buffer[self.frame_start..frame_end].to_vec();
                    self.frame_start = frame_end;
                    self.frame_size = None;
                    return Some(Ok(frame));
                }
                None if available >= self.minimum_chunk_size => {
                    let parsed_length = self.parse_length();
                    if parsed_length < 0 {
                        return self.fail(StreamError::Failed(format!(
                            "Decoded frame header reported negative size {parsed_length}"
                        )));
                    }
                    // A non-negative i32 fits in usize on 32/64-bit targets. Saturating
                    // here lets the maximum-length check reject anything that would not fit.
                    let payload_length = usize::try_from(parsed_length).unwrap_or(usize::MAX);
                    // Saturating addition keeps frame_size >= minimum_chunk_size, so every
                    // decoded frame covers at least its own header.
                    let frame_size = payload_length.saturating_add(self.minimum_chunk_size);
                    if frame_size > self.maximum_frame_length {
                        return self.fail(StreamError::Failed(format!(
                            "Maximum allowed frame size is {} but decoded frame header reported size {frame_size}",
                            self.maximum_frame_length
                        )));
                    }
                    self.frame_size = Some(frame_size);
                    continue;
                }
                _ => {}
            }
            match self.input.next() {
                Some(Ok(chunk)) => {
                    self.compact();
                    self.buffer.extend_from_slice(&chunk);
                }
                Some(Err(error)) => {
                    self.terminal = Some(Terminal::Error(error.clone()));
                    return Some(Err(error));
                }
                None => {
                    if self.frame_start == self.buffer.len() {
                        self.terminal = Some(Terminal::Complete);
                        return None;
                    }
                    return self.fail(StreamError::Failed(
                        "Stream finished but there was a truncated final frame in the buffer"
                            .to_owned(),
                    ));
                }
            }
        }
    }
}
/// Scanner state for extracting top-level JSON objects from a chunked byte stream.
struct JsonFramingStream {
    /// Upstream source of byte chunks.
    input: BoxStream<Vec<u8>>,
    /// Upper bound on the size of a single object, in bytes.
    maximum_object_length: usize,
    /// Received bytes that have not been compacted away yet.
    buffer: Vec<u8>,
    /// Index of the next byte in `buffer` to examine.
    pos: usize,
    /// Index in `buffer` where the current object (or unconsumed region) begins.
    start: usize,
    /// Current `{` nesting depth; zero while between objects.
    depth: usize,
    /// Whether the most recent scan closed an object.
    completed_object: bool,
    /// Whether the scanner is currently inside a string literal.
    in_string_expression: bool,
    /// Whether the previous string byte was an unescaped backslash.
    in_backslash_escape: bool,
    /// Sticky completion or failure state, once reached.
    terminal: Option<Terminal>,
}
/// Byte class (between objects) for `{`: a new object starts here.
const OUTER_OBJECT: u8 = 2;
/// Byte class (between objects) for separators that are skipped: whitespace, `,`, `[`, `]`.
const OUTER_SKIP: u8 = 1;
/// Byte class (between objects) for every other byte, which is invalid there.
const OUTER_ERROR: u8 = 0;

/// Builds a 256-entry lookup table holding `value` for each byte in `members` and
/// `fallback` for every other byte.
const fn byte_table(members: &[u8], value: u8, fallback: u8) -> [u8; 256] {
    let mut table = [fallback; 256];
    let mut index = 0;
    while index < members.len() {
        table[members[index] as usize] = value;
        index += 1;
    }
    table
}

/// Classifies every byte value as `OUTER_OBJECT`, `OUTER_SKIP` or `OUTER_ERROR`.
static OUTER_CHARS: [u8; 256] = {
    let mut table = byte_table(b"[], \n\r\t", OUTER_SKIP, OUTER_ERROR);
    table[b'{' as usize] = OUTER_OBJECT;
    table
};

/// Non-zero for bytes that affect scanner state inside an object but outside strings.
static INNER_INTERESTING: [u8; 256] = byte_table(b"\"{}", 1, 0);

/// Non-zero for bytes that affect scanner state inside a string literal.
static STRING_INTERESTING: [u8; 256] = byte_table(b"\"\\", 1, 0);
impl JsonFramingStream {
    /// Creates a scanner that pulls chunks from `input` and emits complete top-level
    /// JSON objects of at most `maximum_object_length` bytes.
    fn new(input: BoxStream<Vec<u8>>, maximum_object_length: usize) -> Self {
        Self {
            input,
            maximum_object_length,
            // Reserve room for a typical object up front, capped at 4 KiB.
            buffer: Vec::with_capacity(maximum_object_length.min(4096)),
            pos: 0,
            start: 0,
            depth: 0,
            completed_object: false,
            in_string_expression: false,
            in_backslash_escape: false,
            terminal: None,
        }
    }

    /// Records `error` as the sticky terminal state and returns it for emission.
    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
        self.terminal = Some(Terminal::Error(error.clone()));
        Some(Err(error))
    }

    /// Returns `true` when the scanner is between objects (brace depth zero).
    fn can_complete(&self) -> bool {
        self.depth == 0
    }

    /// Discards the consumed prefix `buffer[..start]`.
    ///
    /// `pos` and `start` are shifted so they keep pointing at the same bytes.
    fn compact(&mut self) {
        if self.start == 0 {
            return;
        }
        if self.start >= self.buffer.len() {
            self.buffer.clear();
        } else {
            self.buffer.drain(..self.start);
        }
        self.pos -= self.start;
        self.start = 0;
    }

    /// Skips separator bytes while between objects and stops at the next `{`.
    ///
    /// On finding `{`, records it as the object start and enters depth 1. Does nothing
    /// while already inside an object.
    ///
    /// # Errors
    ///
    /// Fails on any byte that is not allowed between objects. The reported position is
    /// the index of that byte in the current (possibly compacted) buffer.
    fn skip_to_next_object(&mut self) -> StreamResult<()> {
        if self.depth > 0 {
            return Ok(());
        }
        let max = self.buffer.len();
        let limit = self.maximum_object_length;
        let mut pos = self.pos;
        let mut start = self.start;
        while pos < max && pos - start < limit {
            let outer = OUTER_CHARS[self.buffer[pos] as usize];
            if outer == OUTER_SKIP {
                // Skipped separators are consumed: the unconsumed region moves past them.
                start += 1;
                pos += 1;
            } else if outer == OUTER_OBJECT {
                self.start = start;
                self.pos = pos + 1;
                self.depth = 1;
                return Ok(());
            } else {
                // Report the offending byte's index, not the `self.start` value from
                // before this skip run.
                return Err(StreamError::Failed(format!(
                    "Invalid JSON encountered at position [{pos}] of [{}]",
                    String::from_utf8_lossy(&self.buffer)
                )));
            }
        }
        self.start = start;
        self.pos = pos;
        Ok(())
    }

    /// Continues scanning the current object, tracking brace depth and string/escape state.
    ///
    /// Stops just after the closing brace (setting `completed_object`), at the end of the
    /// buffer, or once `maximum_object_length` bytes of the object have been examined.
    fn scan_object(&mut self) -> StreamResult<()> {
        let max = self.buffer.len();
        let limit = self.maximum_object_length;
        let mut pos = self.pos;
        let mut depth = self.depth;
        let mut in_string = self.in_string_expression;
        let mut in_escape = self.in_backslash_escape;
        let start = self.start;
        let mut completed = false;
        while pos < max && pos - start < limit {
            let byte = self.buffer[pos];
            if in_string {
                if in_escape {
                    in_escape = false;
                } else if STRING_INTERESTING[byte as usize] != 0 {
                    if byte == b'"' {
                        in_string = false;
                    } else {
                        in_escape = true;
                    }
                }
            } else {
                if INNER_INTERESTING[byte as usize] == 0 {
                    pos += 1;
                    continue;
                }
                match byte {
                    b'"' => in_string = true,
                    b'{' => depth += 1,
                    b'}' => {
                        depth -= 1;
                        if depth == 0 {
                            pos += 1;
                            completed = true;
                            break;
                        }
                    }
                    _ => {}
                }
            }
            pos += 1;
        }
        self.pos = pos;
        self.depth = depth;
        self.in_string_expression = in_string;
        self.in_backslash_escape = in_escape;
        self.completed_object = completed;
        Ok(())
    }

    /// Tries to extract the next complete object from the buffered bytes.
    ///
    /// Returns `Ok(None)` when more input is needed.
    ///
    /// # Errors
    ///
    /// Propagates invalid-JSON errors. Also fails when an object is still open after
    /// `maximum_object_length` bytes.
    fn poll_object(&mut self) -> StreamResult<Option<Vec<u8>>> {
        self.completed_object = false;
        self.skip_to_next_object()?;
        self.scan_object()?;
        // The scan stops after `maximum_object_length` bytes of the object. An object that
        // closes exactly on that boundary has the maximum allowed length and is accepted.
        // Only an object that is still open at that point is too long.
        if !self.completed_object
            && self.pos.saturating_sub(self.start) >= self.maximum_object_length
        {
            return Err(StreamError::Failed(format!(
                "JSON element exceeded maximumObjectLength ({} bytes)!",
                self.maximum_object_length
            )));
        }
        if self.completed_object && self.start < self.pos {
            let frame = self.buffer[self.start..self.pos].to_vec();
            self.start = self.pos;
            return Ok(Some(frame));
        }
        Ok(None)
    }
}
impl Iterator for JsonFramingStream {
    type Item = StreamResult<Vec<u8>>;

    /// Emits the next complete top-level JSON object, pulling upstream chunks as needed.
    ///
    /// After completion or failure, the recorded terminal state is returned on every call.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(terminal) = &self.terminal {
            return sticky_terminal(terminal);
        }
        loop {
            match self.poll_object() {
                Err(error) => return self.fail(error),
                Ok(Some(frame)) => return Some(Ok(frame)),
                Ok(None) => {}
            }
            let chunk = match self.input.next() {
                Some(Ok(chunk)) => chunk,
                Some(Err(error)) => {
                    self.terminal = Some(Terminal::Error(error.clone()));
                    return Some(Err(error));
                }
                None => {
                    // Upstream is exhausted. Either everything was consumed, or the scanner
                    // must be between objects; otherwise an object was cut short.
                    if self.start >= self.buffer.len() {
                        self.terminal = Some(Terminal::Complete);
                        return None;
                    }
                    if self.can_complete() {
                        self.start = 0;
                        self.terminal = Some(Terminal::Complete);
                        return None;
                    }
                    return self.fail(StreamError::Failed(
                        "Stream finished but there was a truncated final frame in the buffer"
                            .to_owned(),
                    ));
                }
            };
            let capacity = self.buffer.capacity();
            let free = capacity - self.buffer.len();
            let live = self.buffer.len() - self.start;
            // Reclaim the consumed prefix only when the chunk would not otherwise fit
            // and compaction makes enough room to avoid reallocating.
            if self.start > 0 && free < chunk.len() && live + chunk.len() <= capacity {
                self.compact();
            }
            self.buffer.extend_from_slice(&chunk);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testkit::{TestSink, TestSource};
    use crate::{Keep, Source};
    #[test]
    fn delimiter_framing_handles_split_frames() {
        let sink = Source::from_iter([b"ab|c".to_vec(), b"d|".to_vec()])
            .via(Framing::delimiter(b"|".to_vec(), 16, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");
        sink.request(3);
        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
        sink.expect_complete();
    }
    #[test]
    fn delimiter_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(b"abcdef".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 3, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");
        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Read 6 bytes which is more than 3 without seeing a line terminator".to_owned()
            )
        );
    }
    #[test]
    fn delimiter_framing_accepts_max_length_frame_with_split_partial_delimiter() {
        let frame = vec![b'a'; 64];
        let first = [frame.clone(), vec![b'\r']].concat();
        let second = vec![b'\n'];
        let sink = Source::from_iter([first, second])
            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");
        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }
    #[test]
    fn delimiter_framing_still_fails_when_max_length_is_exceeded_without_partial_match() {
        let sink = Source::single(vec![b'a'; 65])
            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");
        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Read 65 bytes which is more than 64 without seeing a line terminator".to_owned()
            )
        );
    }
    #[test]
    fn delimiter_framing_fails_on_truncated_eof() {
        let sink = Source::single(b"abc".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 8, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");
        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }
    #[test]
    fn delimiter_framing_allows_truncation_when_enabled() {
        let sink = Source::single(b"abc".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 8, true))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");
        sink.request(2);
        sink.assert_next(b"abc".to_vec());
        sink.expect_complete();
    }
    /// Covers both buffering extremes: a single chunk carrying many frames, and
    /// frames that arrive one byte at a time.
    #[test]
    fn delimiter_framing_handles_many_frames_per_chunk_and_byte_sized_chunks() {
        use crate::Sink;
        let mut payload = Vec::new();
        for index in 0..200_usize {
            payload.extend_from_slice(format!("frame-{index}\r\n").as_bytes());
        }
        let mut chunks = vec![payload.clone()];
        chunks.extend(payload.iter().map(|&byte| vec![byte]));
        let frames = Source::from_iter(chunks)
            .via(Framing::delimiter(b"\r\n".to_vec(), 16, false))
            .run_with(Sink::collect())
            .expect("delimiter framing materializes")
            .wait()
            .expect("delimiter framing completes");
        assert_eq!(frames.len(), 400);
        for (index, frame) in frames.iter().enumerate() {
            let expected = format!("frame-{}", index % 200).into_bytes();
            assert_eq!(frame, &expected);
        }
    }
    #[test]
    fn length_field_framing_handles_split_headers_and_payloads() {
        let frame = [0_u8, 0_u8, 0_u8, 2_u8, b'o', b'k'];
        let sink = Source::from_iter([frame[..3].to_vec(), frame[3..].to_vec()])
            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");
        sink.request(2);
        sink.assert_next(frame.to_vec());
        sink.expect_complete();
    }
    #[test]
    fn length_field_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(vec![0_u8, 0_u8, 0_u8, 8_u8])
            .via(Framing::length_field(4, 0, 6, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");
        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Maximum allowed frame size is 6 but decoded frame header reported size 12"
                    .to_owned()
            )
        );
    }
    #[test]
    fn length_field_framing_fails_on_truncated_eof() {
        let sink = Source::single(vec![0_u8, 0_u8, 0_u8, 2_u8, b'o'])
            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");
        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }
    #[test]
    fn length_field_framing_treats_two_byte_big_endian_lengths_as_unsigned() {
        let payload = vec![b'x'; 0x8000];
        let mut frame = vec![0x80, 0x00];
        frame.extend_from_slice(&payload);
        let sink = Source::single(frame.clone())
            .via(Framing::length_field(
                2,
                0,
                frame.len(),
                FramingByteOrder::BigEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");
        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }
    #[test]
    fn length_field_framing_treats_two_byte_little_endian_lengths_as_unsigned() {
        let payload = vec![b'y'; 0x8000];
        let mut frame = vec![0x00, 0x80];
        frame.extend_from_slice(&payload);
        let sink = Source::single(frame.clone())
            .via(Framing::length_field(
                2,
                0,
                frame.len(),
                FramingByteOrder::LittleEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");
        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }
    #[test]
    fn length_field_framing_keeps_four_byte_signed_overflow_behavior() {
        let sink = Source::single(vec![0x80, 0x00, 0x00, 0x00])
            .via(Framing::length_field(
                4,
                0,
                usize::MAX,
                FramingByteOrder::BigEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");
        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Decoded frame header reported negative size -2147483648".to_owned()
            )
        );
    }
    /// Many small length-prefixed frames delivered in a single chunk must all be emitted in order.
    #[test]
    fn length_field_framing_emits_many_frames_from_one_chunk() {
        use crate::Sink;
        let mut payload = Vec::new();
        let mut expected = Vec::new();
        for index in 0..100_u8 {
            let frame = vec![0_u8, 1_u8, index];
            payload.extend_from_slice(&frame);
            expected.push(frame);
        }
        let frames = Source::single(payload)
            .via(Framing::length_field(2, 0, 8, FramingByteOrder::BigEndian))
            .run_with(Sink::collect())
            .expect("length field framing materializes")
            .wait()
            .expect("length field framing completes");
        assert_eq!(frames.len(), expected.len());
        for (frame, want) in frames.iter().zip(&expected) {
            assert_eq!(frame, want);
        }
    }
    #[test]
    fn json_framing_extracts_objects_split_across_chunks() {
        let sink = Source::from_iter([b"[{\"a\":1},".to_vec(), b"{\"b\":2}]".to_vec()])
            .via(Framing::json(64))
            .run_with(TestSink::probe())
            .expect("json framing materializes");
        sink.request(3);
        sink.assert_next_n([b"{\"a\":1}".to_vec(), b"{\"b\":2}".to_vec()]);
        sink.expect_complete();
    }
    #[test]
    fn json_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(b"{\"abcdef\":1}".to_vec())
            .via(Framing::json(4))
            .run_with(TestSink::probe())
            .expect("json framing materializes");
        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed("JSON element exceeded maximumObjectLength (4 bytes)!".to_owned())
        );
    }
    #[test]
    fn json_framing_fails_on_truncated_eof() {
        let sink = Source::single(b"{\"a\":".to_vec())
            .via(Framing::json(32))
            .run_with(TestSink::probe())
            .expect("json framing materializes");
        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }
    /// A scalar between objects is not allowed and must fail the stream.
    #[test]
    fn json_framing_rejects_invalid_top_level_bytes() {
        let sink = Source::single(b"[1]".to_vec())
            .via(Framing::json(32))
            .run_with(TestSink::probe())
            .expect("json framing materializes");
        sink.request(1);
        assert!(matches!(
            sink.expect_error(),
            StreamError::Failed(message)
                if message.starts_with("Invalid JSON encountered at position [")
        ));
    }
    #[test]
    fn framing_preserves_upstream_errors() {
        let (source, sink) = TestSource::probe::<Vec<u8>>()
            .via(Framing::delimiter(b"|".to_vec(), 32, false))
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .expect("probe framing materializes");
        sink.request(1);
        source.send_error(StreamError::Failed("boom".to_owned()));
        assert_eq!(sink.expect_error(), StreamError::Failed("boom".to_owned()));
    }
    #[test]
    fn json_framing_no_bleed_across_buffer_reuse() {
        use crate::Sink;
        let mut payload = Vec::new();
        payload.push(b'[');
        for index in 0..128_usize {
            if index > 0 {
                payload.push(b',');
            }
            payload
                .extend_from_slice(format!("{{\"id\":{index},\"pad\":\"xxxxxxxx\"}}").as_bytes());
        }
        payload.push(b']');
        let chunks: Vec<Vec<u8>> = payload.chunks(193).map(|c| c.to_vec()).collect();
        assert!(
            chunks.len() > 4,
            "chunks must split objects across boundaries"
        );
        let frames = Source::from_iter(chunks)
            .via(Framing::json(256))
            .run_with(Sink::collect())
            .expect("json framing materializes")
            .wait()
            .expect("json framing completes");
        assert_eq!(frames.len(), 128);
        for (index, frame) in frames.iter().enumerate() {
            let text = std::str::from_utf8(frame).expect("frame is valid utf-8");
            let needle = format!("\"id\":{index}");
            assert!(
                text.contains(&needle),
                "frame {index} missing {needle}: {text}"
            );
            assert_eq!(text.as_bytes()[0], b'{');
            assert_eq!(text.as_bytes()[text.len() - 1], b'}');
        }
    }
    #[test]
    fn json_framing_compacts_buffer_at_chunk_boundaries() {
        use crate::Sink;
        let object = br#"{"id":1,"name":"x"}"#;
        let mut payload = Vec::new();
        payload.push(b'[');
        for _ in 0..32 {
            payload.push(b',');
            payload.extend_from_slice(object);
        }
        payload.push(b']');
        let chunks: Vec<Vec<u8>> = payload.iter().copied().map(|b| vec![b]).collect();
        let frames = Source::from_iter(chunks)
            .via(Framing::json(64))
            .run_with(Sink::collect())
            .expect("json framing materializes")
            .wait()
            .expect("json framing completes");
        assert_eq!(frames.len(), 32);
        for frame in &frames {
            assert_eq!(frame.as_slice(), object);
        }
    }
}