Skip to main content

datum/io/
framing.rs

1use crate::stream::{BoxStream, Flow, NotUsed};
2use crate::{StreamError, StreamResult};
3
/// Endianness of the length header decoded by [`Framing::length_field`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FramingByteOrder {
    /// The first header byte is the most significant one.
    BigEndian,
    /// The first header byte is the least significant one.
    LittleEndian,
}
12
13#[derive(Clone)]
14enum Terminal {
15    Complete,
16    Error(StreamError),
17}
18
19fn sticky_terminal<T>(terminal: &Terminal) -> Option<StreamResult<T>> {
20    match terminal {
21        Terminal::Complete => None,
22        Terminal::Error(error) => Some(Err(error.clone())),
23    }
24}
25
/// Namespace for byte-stream framing flows.
///
/// Each constructor returns a flow that regroups arbitrarily sized `Vec<u8>` chunks into
/// complete logical frames, in the spirit of Akka's `Framing` and `JsonFraming`.
pub struct Framing;
29
30impl Framing {
31    /// Splits the byte stream on `delimiter`, stripping the delimiter from each emitted frame.
32    ///
33    /// Fails the stream once `maximum_frame_length` bytes accumulate without a delimiter. When the
34    /// stream ends mid-frame, `allow_truncation = true` emits the trailing bytes as a final frame;
35    /// `false` fails with [`StreamError`]. Panics if `delimiter` is empty or
36    /// `maximum_frame_length == 0`.
37    #[must_use]
38    pub fn delimiter(
39        delimiter: Vec<u8>,
40        maximum_frame_length: usize,
41        allow_truncation: bool,
42    ) -> Flow<Vec<u8>, Vec<u8>> {
43        assert!(
44            !delimiter.is_empty(),
45            "delimiter must contain at least one byte"
46        );
47        assert!(
48            maximum_frame_length > 0,
49            "maximum frame length must be greater than zero"
50        );
51        Flow::from_transform(move |input| {
52            Box::new(DelimiterFramingStream::new(
53                input,
54                delimiter.clone(),
55                maximum_frame_length,
56                allow_truncation,
57            )) as BoxStream<Vec<u8>>
58        })
59    }
60
61    /// Splits length-prefixed frames. The header is `field_length` bytes (1–4) at `field_offset`,
62    /// interpreted per `byte_order`; the emitted frame includes the header and the payload.
63    ///
64    /// The decoded length is signed (`i32`, matching Akka): a 4-byte header with the high bit set
65    /// decodes negative and fails the stream. Fails when a decoded frame would exceed
66    /// `maximum_frame_length`. Panics if `field_length` is not in `1..=4` or
67    /// `maximum_frame_length == 0`.
68    #[must_use]
69    pub fn length_field(
70        field_length: usize,
71        field_offset: usize,
72        maximum_frame_length: usize,
73        byte_order: FramingByteOrder,
74    ) -> Flow<Vec<u8>, Vec<u8>> {
75        assert!(
76            (1..=4).contains(&field_length),
77            "Length field length must be 1, 2, 3 or 4."
78        );
79        assert!(
80            maximum_frame_length > 0,
81            "maximum frame length must be greater than zero"
82        );
83        Flow::from_transform(move |input| {
84            Box::new(LengthFieldFramingStream::new(
85                input,
86                field_length,
87                field_offset,
88                maximum_frame_length,
89                byte_order,
90            )) as BoxStream<Vec<u8>>
91        })
92    }
93
94    /// Extracts top-level JSON objects from a concatenated byte stream, handling objects split
95    /// across arbitrary chunk boundaries.
96    ///
97    /// Advances past outer array brackets and commas, so both `{...}{...}` and `[{...},{...}]`
98    /// inputs work. Fails the stream if a single object exceeds `maximum_object_length`. Panics if
99    /// `maximum_object_length == 0`.
100    #[must_use]
101    pub fn json(maximum_object_length: usize) -> Flow<Vec<u8>, Vec<u8>, NotUsed> {
102        assert!(
103            maximum_object_length > 0,
104            "maximum object length must be greater than zero"
105        );
106        Flow::from_transform(move |input| {
107            Box::new(JsonFramingStream::new(input, maximum_object_length)) as BoxStream<Vec<u8>>
108        })
109    }
110}
111
112struct DelimiterFramingStream {
113    input: BoxStream<Vec<u8>>,
114    delimiter: Vec<u8>,
115    maximum_frame_length: usize,
116    allow_truncation: bool,
117    buffer: Vec<u8>,
118    terminal: Option<Terminal>,
119}
120
121impl DelimiterFramingStream {
122    fn new(
123        input: BoxStream<Vec<u8>>,
124        delimiter: Vec<u8>,
125        maximum_frame_length: usize,
126        allow_truncation: bool,
127    ) -> Self {
128        Self {
129            input,
130            delimiter,
131            maximum_frame_length,
132            allow_truncation,
133            buffer: Vec::new(),
134            terminal: None,
135        }
136    }
137
138    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
139        self.terminal = Some(Terminal::Error(error.clone()));
140        Some(Err(error))
141    }
142
143    fn delimiter_position(&self) -> Option<usize> {
144        self.buffer
145            .windows(self.delimiter.len())
146            .position(|window| window == self.delimiter.as_slice())
147    }
148
149    fn trailing_delimiter_prefix_len(&self) -> usize {
150        let max_prefix = self
151            .delimiter
152            .len()
153            .saturating_sub(1)
154            .min(self.buffer.len());
155        (1..=max_prefix)
156            .rev()
157            .find(|&prefix_len| self.buffer.ends_with(&self.delimiter[..prefix_len]))
158            .unwrap_or(0)
159    }
160}
161
162impl Iterator for DelimiterFramingStream {
163    type Item = StreamResult<Vec<u8>>;
164
165    fn next(&mut self) -> Option<Self::Item> {
166        if let Some(terminal) = &self.terminal {
167            return sticky_terminal(terminal);
168        }
169
170        loop {
171            if let Some(position) = self.delimiter_position() {
172                if position > self.maximum_frame_length {
173                    return self.fail(StreamError::Failed(format!(
174                        "Read {position} bytes which is more than {} without seeing a line terminator",
175                        self.maximum_frame_length
176                    )));
177                }
178                let frame = self.buffer[..position].to_vec();
179                self.buffer.drain(..position + self.delimiter.len());
180                return Some(Ok(frame));
181            }
182
183            // Match Akka's Framing delimiter stage here: a trailing partial delimiter
184            // (Framing.scala lines 262-284) may still complete on the next pull, so only
185            // bytes before that partial match count toward the max-length failure.
186            let trailing_partial = self.trailing_delimiter_prefix_len();
187            let unmatched = self.buffer.len() - trailing_partial;
188            if unmatched > self.maximum_frame_length {
189                return self.fail(StreamError::Failed(format!(
190                    "Read {} bytes which is more than {} without seeing a line terminator",
191                    unmatched, self.maximum_frame_length
192                )));
193            }
194
195            match self.input.next() {
196                Some(Ok(chunk)) => self.buffer.extend_from_slice(&chunk),
197                Some(Err(error)) => {
198                    self.terminal = Some(Terminal::Error(error.clone()));
199                    return Some(Err(error));
200                }
201                None => {
202                    if self.buffer.is_empty() {
203                        self.terminal = Some(Terminal::Complete);
204                        return None;
205                    }
206                    if self.allow_truncation {
207                        let frame = std::mem::take(&mut self.buffer);
208                        self.terminal = Some(Terminal::Complete);
209                        return Some(Ok(frame));
210                    }
211                    return self.fail(StreamError::Failed(
212                        "Stream finished but there was a truncated final frame in the buffer"
213                            .to_owned(),
214                    ));
215                }
216            }
217        }
218    }
219}
220
221struct LengthFieldFramingStream {
222    input: BoxStream<Vec<u8>>,
223    field_length: usize,
224    field_offset: usize,
225    minimum_chunk_size: usize,
226    maximum_frame_length: usize,
227    byte_order: FramingByteOrder,
228    buffer: Vec<u8>,
229    frame_size: Option<usize>,
230    terminal: Option<Terminal>,
231}
232
233impl LengthFieldFramingStream {
234    fn new(
235        input: BoxStream<Vec<u8>>,
236        field_length: usize,
237        field_offset: usize,
238        maximum_frame_length: usize,
239        byte_order: FramingByteOrder,
240    ) -> Self {
241        let minimum_chunk_size = field_offset + field_length;
242        Self {
243            input,
244            field_length,
245            field_offset,
246            minimum_chunk_size,
247            maximum_frame_length,
248            byte_order,
249            buffer: Vec::new(),
250            frame_size: None,
251            terminal: None,
252        }
253    }
254
255    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
256        self.terminal = Some(Terminal::Error(error.clone()));
257        Some(Err(error))
258    }
259
260    fn parse_length(&self) -> i32 {
261        let bytes = &self.buffer[self.field_offset..self.field_offset + self.field_length];
262        match (self.byte_order, self.field_length) {
263            (FramingByteOrder::BigEndian, 1) => i32::from(bytes[0]),
264            (FramingByteOrder::BigEndian, 2) => i32::from(u16::from_be_bytes([bytes[0], bytes[1]])),
265            (FramingByteOrder::BigEndian, 3) => {
266                ((i32::from(bytes[0])) << 16) | ((i32::from(bytes[1])) << 8) | i32::from(bytes[2])
267            }
268            (FramingByteOrder::BigEndian, 4) => {
269                i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
270            }
271            (FramingByteOrder::LittleEndian, 1) => i32::from(bytes[0]),
272            (FramingByteOrder::LittleEndian, 2) => {
273                i32::from(u16::from_le_bytes([bytes[0], bytes[1]]))
274            }
275            (FramingByteOrder::LittleEndian, 3) => {
276                ((i32::from(bytes[2])) << 16) | ((i32::from(bytes[1])) << 8) | i32::from(bytes[0])
277            }
278            (FramingByteOrder::LittleEndian, 4) => {
279                i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
280            }
281            _ => unreachable!("field length validated at construction"),
282        }
283    }
284}
285
286impl Iterator for LengthFieldFramingStream {
287    type Item = StreamResult<Vec<u8>>;
288
289    fn next(&mut self) -> Option<Self::Item> {
290        if let Some(terminal) = &self.terminal {
291            return sticky_terminal(terminal);
292        }
293
294        loop {
295            if let Some(frame_size) = self.frame_size {
296                if self.buffer.len() >= frame_size {
297                    let frame = self.buffer[..frame_size].to_vec();
298                    self.buffer.drain(..frame_size);
299                    self.frame_size = None;
300                    return Some(Ok(frame));
301                }
302            } else if self.buffer.len() >= self.minimum_chunk_size {
303                let parsed_length = self.parse_length();
304                if parsed_length < 0 {
305                    return self.fail(StreamError::Failed(format!(
306                        "Decoded frame header reported negative size {parsed_length}"
307                    )));
308                }
309                let frame_size = parsed_length as usize + self.minimum_chunk_size;
310                if frame_size > self.maximum_frame_length {
311                    return self.fail(StreamError::Failed(format!(
312                        "Maximum allowed frame size is {} but decoded frame header reported size {frame_size}",
313                        self.maximum_frame_length
314                    )));
315                }
316                if frame_size < self.minimum_chunk_size {
317                    return self.fail(StreamError::Failed(format!(
318                        "Computed frame size {frame_size} is less than minimum chunk size {}",
319                        self.minimum_chunk_size
320                    )));
321                }
322                self.frame_size = Some(frame_size);
323                continue;
324            }
325
326            match self.input.next() {
327                Some(Ok(chunk)) => self.buffer.extend_from_slice(&chunk),
328                Some(Err(error)) => {
329                    self.terminal = Some(Terminal::Error(error.clone()));
330                    return Some(Err(error));
331                }
332                None => {
333                    if self.buffer.is_empty() {
334                        self.terminal = Some(Terminal::Complete);
335                        return None;
336                    }
337                    return self.fail(StreamError::Failed(
338                        "Stream finished but there was a truncated final frame in the buffer"
339                            .to_owned(),
340                    ));
341                }
342            }
343        }
344    }
345}
346
347struct JsonFramingStream {
348    input: BoxStream<Vec<u8>>,
349    maximum_object_length: usize,
350    buffer: Vec<u8>,
351    pos: usize,
352    start: usize,
353    depth: usize,
354    completed_object: bool,
355    in_string_expression: bool,
356    in_backslash_escape: bool,
357    terminal: Option<Terminal>,
358}
359
// Byte classes for the gaps between top-level JSON objects, looked up by
// `skip_to_next_object`.
const OUTER_OBJECT: u8 = 2; // `{` — opens a top-level object
const OUTER_SKIP: u8 = 1; // `[`, `]`, `,` and whitespace — skipped between objects
const OUTER_ERROR: u8 = 0; // every other byte — not valid between objects

static OUTER_CHARS: [u8; 256] = {
    // Start with "invalid" everywhere, then mark the separators and the object opener.
    let mut table = [OUTER_ERROR; 256];
    let separators = b"[], \n\r\t";
    let mut index = 0;
    while index < separators.len() {
        table[separators[index] as usize] = OUTER_SKIP;
        index += 1;
    }
    table[b'{' as usize] = OUTER_OBJECT;
    table
};
377
// Lookup used by `scan_object` outside string literals. It holds 1 for the only bytes that can
// change parser state there: `"` (enters a string), `{` and `}` (change nesting depth). Every
// other byte holds 0 and is stepped over without going through the full match.
static INNER_INTERESTING: [u8; 256] = {
    let marked = b"\"{}";
    let mut table = [0u8; 256];
    let mut index = 0;
    while index < marked.len() {
        table[marked[index] as usize] = 1;
        index += 1;
    }
    table
};
389
// Lookup used by `scan_object` inside string literals. It holds 1 only for `"` (closes the
// string unless escaped) and `\` (starts an escape). Any other byte cannot change parser state
// while no escape is pending, so it is skipped without touching the escape flag.
static STRING_INTERESTING: [u8; 256] = {
    let marked = b"\"\\";
    let mut table = [0u8; 256];
    let mut index = 0;
    while index < marked.len() {
        table[marked[index] as usize] = 1;
        index += 1;
    }
    table
};
400
401impl JsonFramingStream {
402    fn new(input: BoxStream<Vec<u8>>, maximum_object_length: usize) -> Self {
403        Self {
404            input,
405            maximum_object_length,
406            buffer: Vec::with_capacity(maximum_object_length.min(4096)),
407            pos: 0,
408            start: 0,
409            depth: 0,
410            completed_object: false,
411            in_string_expression: false,
412            in_backslash_escape: false,
413            terminal: None,
414        }
415    }
416
417    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
418        self.terminal = Some(Terminal::Error(error.clone()));
419        Some(Err(error))
420    }
421
422    fn can_complete(&self) -> bool {
423        self.depth == 0
424    }
425
426    /// Compact the buffer in place, discarding bytes before `start`.
427    /// Called before extending with new input when the current slice
428    /// would otherwise force a reallocation.
429    fn compact(&mut self) {
430        if self.start == 0 {
431            return;
432        }
433        if self.start >= self.buffer.len() {
434            self.buffer.clear();
435        } else {
436            self.buffer.drain(..self.start);
437        }
438        self.pos -= self.start;
439        self.start = 0;
440    }
441
442    fn skip_to_next_object(&mut self) -> StreamResult<()> {
443        if self.depth > 0 {
444            return Ok(());
445        }
446        let max = self.buffer.len();
447        let limit = self.maximum_object_length;
448        let mut pos = self.pos;
449        let mut start = self.start;
450        while pos < max && pos - start < limit {
451            let outer = OUTER_CHARS[self.buffer[pos] as usize];
452            if outer == OUTER_SKIP {
453                start += 1;
454                pos += 1;
455            } else if outer == OUTER_OBJECT {
456                self.start = start;
457                self.pos = pos + 1;
458                self.depth = 1;
459                return Ok(());
460            } else {
461                return Err(StreamError::Failed(format!(
462                    "Invalid JSON encountered at position [{}] of [{}]",
463                    self.start,
464                    String::from_utf8_lossy(&self.buffer)
465                )));
466            }
467        }
468        self.start = start;
469        self.pos = pos;
470        Ok(())
471    }
472
473    fn scan_object(&mut self) -> StreamResult<()> {
474        let max = self.buffer.len();
475        let limit = self.maximum_object_length;
476        let mut pos = self.pos;
477        let mut depth = self.depth;
478        let mut in_string = self.in_string_expression;
479        let mut in_escape = self.in_backslash_escape;
480        let start = self.start;
481        let mut completed = false;
482
483        while pos < max && pos - start < limit {
484            let byte = self.buffer[pos];
485            if in_string {
486                if in_escape {
487                    // The escaped character is always a single byte; consume
488                    // it unconditionally and reset escape state.
489                    in_escape = false;
490                } else if STRING_INTERESTING[byte as usize] != 0 {
491                    // Only `"` and `\` require state changes in string mode.
492                    if byte == b'"' {
493                        in_string = false;
494                    } else {
495                        in_escape = true;
496                    }
497                }
498                // Uninteresting string bytes (the vast majority) take no
499                // state-change path: no match, no store. This eliminates the
500                // `in_escape = false` write for every plain character in a
501                // string value, the dominant cost in the original inner scan.
502            } else {
503                // Fast-skip uninteresting bytes in non-string mode: anything
504                // that cannot change depth or enter string state.
505                if INNER_INTERESTING[byte as usize] == 0 {
506                    pos += 1;
507                    continue;
508                }
509                match byte {
510                    b'"' => in_string = true,
511                    b'{' => depth += 1,
512                    b'}' => {
513                        depth -= 1;
514                        if depth == 0 {
515                            pos += 1;
516                            completed = true;
517                            break;
518                        }
519                    }
520                    _ => {}
521                }
522            }
523            pos += 1;
524        }
525
526        self.pos = pos;
527        self.depth = depth;
528        self.in_string_expression = in_string;
529        self.in_backslash_escape = in_escape;
530        self.completed_object = completed;
531        Ok(())
532    }
533
534    fn poll_object(&mut self) -> StreamResult<Option<Vec<u8>>> {
535        self.completed_object = false;
536        self.skip_to_next_object()?;
537        self.scan_object()?;
538
539        if self.pos.saturating_sub(self.start) >= self.maximum_object_length {
540            return Err(StreamError::Failed(format!(
541                "JSON element exceeded maximumObjectLength ({} bytes)!",
542                self.maximum_object_length
543            )));
544        }
545
546        if self.completed_object && self.start < self.pos {
547            let frame = self.buffer[self.start..self.pos].to_vec();
548            // Advance start lazily; the actual drain (compact) is deferred
549            // until the next chunk pull would otherwise force a realloc.
550            self.start = self.pos;
551            return Ok(Some(frame));
552        }
553
554        Ok(None)
555    }
556}
557
558impl Iterator for JsonFramingStream {
559    type Item = StreamResult<Vec<u8>>;
560
561    fn next(&mut self) -> Option<Self::Item> {
562        if let Some(terminal) = &self.terminal {
563            return sticky_terminal(terminal);
564        }
565
566        loop {
567            match self.poll_object() {
568                Ok(Some(frame)) => return Some(Ok(frame)),
569                Ok(None) => {}
570                Err(error) => return self.fail(error),
571            }
572
573            match self.input.next() {
574                Some(Ok(chunk)) => {
575                    // Compact before extending only when the new chunk
576                    // would not fit in remaining buffer capacity. This
577                    // avoids a memmove on every chunk; the cost is paid
578                    // only when compaction is needed to prevent realloc.
579                    if self.start > 0 && self.buffer.capacity() - self.buffer.len() < chunk.len() {
580                        let tail = self.buffer.len() - self.start;
581                        if tail + chunk.len() <= self.buffer.capacity() {
582                            self.compact();
583                        }
584                    }
585                    self.buffer.extend_from_slice(&chunk);
586                }
587                Some(Err(error)) => {
588                    self.terminal = Some(Terminal::Error(error.clone()));
589                    return Some(Err(error));
590                }
591                None => {
592                    if self.start >= self.buffer.len() {
593                        self.terminal = Some(Terminal::Complete);
594                        return None;
595                    }
596                    if self.can_complete() {
597                        self.start = 0;
598                        self.terminal = Some(Terminal::Complete);
599                        return None;
600                    }
601                    return self.fail(StreamError::Failed(
602                        "Stream finished but there was a truncated final frame in the buffer"
603                            .to_owned(),
604                    ));
605                }
606            }
607        }
608    }
609}
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614    use crate::testkit::{TestSink, TestSource};
615    use crate::{Keep, Source};
616
617    #[test]
618    fn delimiter_framing_handles_split_frames() {
619        let sink = Source::from_iter([b"ab|c".to_vec(), b"d|".to_vec()])
620            .via(Framing::delimiter(b"|".to_vec(), 16, false))
621            .run_with(TestSink::probe())
622            .expect("delimiter framing materializes");
623
624        sink.request(3);
625        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
626        sink.expect_complete();
627    }
628
629    #[test]
630    fn delimiter_framing_fails_on_max_length_exceeded() {
631        let sink = Source::single(b"abcdef".to_vec())
632            .via(Framing::delimiter(b"|".to_vec(), 3, false))
633            .run_with(TestSink::probe())
634            .expect("delimiter framing materializes");
635
636        sink.request(1);
637        assert_eq!(
638            sink.expect_error(),
639            StreamError::Failed(
640                "Read 6 bytes which is more than 3 without seeing a line terminator".to_owned()
641            )
642        );
643    }
644
645    #[test]
646    fn delimiter_framing_accepts_max_length_frame_with_split_partial_delimiter() {
647        let frame = vec![b'a'; 64];
648        let first = [frame.clone(), vec![b'\r']].concat();
649        let second = vec![b'\n'];
650        let sink = Source::from_iter([first, second])
651            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
652            .run_with(TestSink::probe())
653            .expect("delimiter framing materializes");
654
655        sink.request(2);
656        sink.assert_next(frame);
657        sink.expect_complete();
658    }
659
660    #[test]
661    fn delimiter_framing_still_fails_when_max_length_is_exceeded_without_partial_match() {
662        let sink = Source::single(vec![b'a'; 65])
663            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
664            .run_with(TestSink::probe())
665            .expect("delimiter framing materializes");
666
667        sink.request(1);
668        assert_eq!(
669            sink.expect_error(),
670            StreamError::Failed(
671                "Read 65 bytes which is more than 64 without seeing a line terminator".to_owned()
672            )
673        );
674    }
675
676    #[test]
677    fn delimiter_framing_fails_on_truncated_eof() {
678        let sink = Source::single(b"abc".to_vec())
679            .via(Framing::delimiter(b"|".to_vec(), 8, false))
680            .run_with(TestSink::probe())
681            .expect("delimiter framing materializes");
682
683        sink.request(1);
684        assert_eq!(
685            sink.expect_error(),
686            StreamError::Failed(
687                "Stream finished but there was a truncated final frame in the buffer".to_owned()
688            )
689        );
690    }
691
692    #[test]
693    fn delimiter_framing_allows_truncation_when_enabled() {
694        let sink = Source::single(b"abc".to_vec())
695            .via(Framing::delimiter(b"|".to_vec(), 8, true))
696            .run_with(TestSink::probe())
697            .expect("delimiter framing materializes");
698
699        sink.request(2);
700        sink.assert_next(b"abc".to_vec());
701        sink.expect_complete();
702    }
703
704    #[test]
705    fn length_field_framing_handles_split_headers_and_payloads() {
706        let frame = [0_u8, 0_u8, 0_u8, 2_u8, b'o', b'k'];
707        let sink = Source::from_iter([frame[..3].to_vec(), frame[3..].to_vec()])
708            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
709            .run_with(TestSink::probe())
710            .expect("length field framing materializes");
711
712        sink.request(2);
713        sink.assert_next(frame.to_vec());
714        sink.expect_complete();
715    }
716
717    #[test]
718    fn length_field_framing_reads_header_at_non_zero_offset() {
719        let frame = [b'i', b'd', 0_u8, 3_u8, b'h', b'e', b'y'];
720        let sink = Source::from_iter([
721            frame[..1].to_vec(),
722            frame[1..3].to_vec(),
723            frame[3..].to_vec(),
724        ])
725        .via(Framing::length_field(2, 2, 16, FramingByteOrder::BigEndian))
726        .run_with(TestSink::probe())
727        .expect("length field framing materializes");
728
729        sink.request(2);
730        sink.assert_next(frame.to_vec());
731        sink.expect_complete();
732    }
733
734    #[test]
735    fn length_field_framing_fails_on_max_length_exceeded() {
736        let sink = Source::single(vec![0_u8, 0_u8, 0_u8, 8_u8])
737            .via(Framing::length_field(4, 0, 6, FramingByteOrder::BigEndian))
738            .run_with(TestSink::probe())
739            .expect("length field framing materializes");
740
741        sink.request(1);
742        assert_eq!(
743            sink.expect_error(),
744            StreamError::Failed(
745                "Maximum allowed frame size is 6 but decoded frame header reported size 12"
746                    .to_owned()
747            )
748        );
749    }
750
751    #[test]
752    fn length_field_framing_fails_on_truncated_eof() {
753        let sink = Source::single(vec![0_u8, 0_u8, 0_u8, 2_u8, b'o'])
754            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
755            .run_with(TestSink::probe())
756            .expect("length field framing materializes");
757
758        sink.request(1);
759        assert_eq!(
760            sink.expect_error(),
761            StreamError::Failed(
762                "Stream finished but there was a truncated final frame in the buffer".to_owned()
763            )
764        );
765    }
766
767    #[test]
768    fn length_field_framing_treats_two_byte_big_endian_lengths_as_unsigned() {
769        let payload = vec![b'x'; 0x8000];
770        let mut frame = vec![0x80, 0x00];
771        frame.extend_from_slice(&payload);
772        let sink = Source::single(frame.clone())
773            .via(Framing::length_field(
774                2,
775                0,
776                frame.len(),
777                FramingByteOrder::BigEndian,
778            ))
779            .run_with(TestSink::probe())
780            .expect("length field framing materializes");
781
782        sink.request(2);
783        sink.assert_next(frame);
784        sink.expect_complete();
785    }
786
787    #[test]
788    fn length_field_framing_treats_two_byte_little_endian_lengths_as_unsigned() {
789        let payload = vec![b'y'; 0x8000];
790        let mut frame = vec![0x00, 0x80];
791        frame.extend_from_slice(&payload);
792        let sink = Source::single(frame.clone())
793            .via(Framing::length_field(
794                2,
795                0,
796                frame.len(),
797                FramingByteOrder::LittleEndian,
798            ))
799            .run_with(TestSink::probe())
800            .expect("length field framing materializes");
801
802        sink.request(2);
803        sink.assert_next(frame);
804        sink.expect_complete();
805    }
806
807    #[test]
808    fn length_field_framing_reads_three_byte_big_endian_lengths() {
809        let frame = [0_u8, 0_u8, 2_u8, b'o', b'k'];
810        let sink = Source::single(frame.to_vec())
811            .via(Framing::length_field(3, 0, 8, FramingByteOrder::BigEndian))
812            .run_with(TestSink::probe())
813            .expect("length field framing materializes");
814
815        sink.request(2);
816        sink.assert_next(frame.to_vec());
817        sink.expect_complete();
818    }
819
820    #[test]
821    fn length_field_framing_reads_three_byte_little_endian_lengths() {
822        let frame = [2_u8, 0_u8, 0_u8, b'o', b'k'];
823        let sink = Source::single(frame.to_vec())
824            .via(Framing::length_field(
825                3,
826                0,
827                8,
828                FramingByteOrder::LittleEndian,
829            ))
830            .run_with(TestSink::probe())
831            .expect("length field framing materializes");
832
833        sink.request(2);
834        sink.assert_next(frame.to_vec());
835        sink.expect_complete();
836    }
837
838    #[test]
839    fn length_field_framing_keeps_four_byte_signed_overflow_behavior() {
840        let sink = Source::single(vec![0x80, 0x00, 0x00, 0x00])
841            .via(Framing::length_field(
842                4,
843                0,
844                usize::MAX,
845                FramingByteOrder::BigEndian,
846            ))
847            .run_with(TestSink::probe())
848            .expect("length field framing materializes");
849
850        sink.request(1);
851        assert_eq!(
852            sink.expect_error(),
853            StreamError::Failed(
854                "Decoded frame header reported negative size -2147483648".to_owned()
855            )
856        );
857    }
858
859    #[test]
860    fn json_framing_extracts_objects_split_across_chunks() {
861        let sink = Source::from_iter([b"[{\"a\":1},".to_vec(), b"{\"b\":2}]".to_vec()])
862            .via(Framing::json(64))
863            .run_with(TestSink::probe())
864            .expect("json framing materializes");
865
866        sink.request(3);
867        sink.assert_next_n([b"{\"a\":1}".to_vec(), b"{\"b\":2}".to_vec()]);
868        sink.expect_complete();
869    }
870
871    #[test]
872    fn json_framing_fails_on_max_length_exceeded() {
873        let sink = Source::single(b"{\"abcdef\":1}".to_vec())
874            .via(Framing::json(4))
875            .run_with(TestSink::probe())
876            .expect("json framing materializes");
877
878        sink.request(1);
879        assert_eq!(
880            sink.expect_error(),
881            StreamError::Failed("JSON element exceeded maximumObjectLength (4 bytes)!".to_owned())
882        );
883    }
884
885    #[test]
886    fn json_framing_fails_on_truncated_eof() {
887        let sink = Source::single(b"{\"a\":".to_vec())
888            .via(Framing::json(32))
889            .run_with(TestSink::probe())
890            .expect("json framing materializes");
891
892        sink.request(1);
893        assert_eq!(
894            sink.expect_error(),
895            StreamError::Failed(
896                "Stream finished but there was a truncated final frame in the buffer".to_owned()
897            )
898        );
899    }
900
901    #[test]
902    fn framing_preserves_upstream_errors() {
903        let (source, sink) = TestSource::probe::<Vec<u8>>()
904            .via(Framing::delimiter(b"|".to_vec(), 32, false))
905            .to_mat(TestSink::probe(), Keep::both)
906            .run()
907            .expect("probe framing materializes");
908
909        sink.request(1);
910        source.send_error(StreamError::Failed("boom".to_owned()));
911        assert_eq!(sink.expect_error(), StreamError::Failed("boom".to_owned()));
912    }
913
914    #[test]
915    fn json_framing_no_bleed_across_buffer_reuse() {
916        use crate::Sink;
917        let mut payload = Vec::new();
918        payload.push(b'[');
919        for index in 0..128_usize {
920            if index > 0 {
921                payload.push(b',');
922            }
923            payload
924                .extend_from_slice(format!("{{\"id\":{index},\"pad\":\"xxxxxxxx\"}}").as_bytes());
925        }
926        payload.push(b']');
927
928        let chunks: Vec<Vec<u8>> = payload.chunks(193).map(|c| c.to_vec()).collect();
929        assert!(
930            chunks.len() > 4,
931            "chunks must split objects across boundaries"
932        );
933
934        let frames = Source::from_iter(chunks)
935            .via(Framing::json(256))
936            .run_with(Sink::collect())
937            .expect("json framing materializes")
938            .wait()
939            .expect("json framing completes");
940
941        assert_eq!(frames.len(), 128);
942        for (index, frame) in frames.iter().enumerate() {
943            let text = std::str::from_utf8(frame).expect("frame is valid utf-8");
944            let needle = format!("\"id\":{index}");
945            assert!(
946                text.contains(&needle),
947                "frame {index} missing {needle}: {text}"
948            );
949            assert_eq!(text.as_bytes()[0], b'{');
950            assert_eq!(text.as_bytes()[text.len() - 1], b'}');
951        }
952    }
953
954    #[test]
955    fn json_framing_compacts_buffer_at_chunk_boundaries() {
956        use crate::Sink;
957        let object = br#"{"id":1,"name":"x"}"#;
958        let mut payload = Vec::new();
959        payload.push(b'[');
960        for _ in 0..32 {
961            payload.push(b',');
962            payload.extend_from_slice(object);
963        }
964        payload.push(b']');
965        let chunks: Vec<Vec<u8>> = payload.iter().copied().map(|b| vec![b]).collect();
966
967        let frames = Source::from_iter(chunks)
968            .via(Framing::json(64))
969            .run_with(Sink::collect())
970            .expect("json framing materializes")
971            .wait()
972            .expect("json framing completes");
973
974        assert_eq!(frames.len(), 32);
975        for frame in &frames {
976            assert_eq!(frame.as_slice(), object);
977        }
978    }
979}