// datum/io/framing.rs

1use crate::stream::{BoxStream, Flow, NotUsed};
2use crate::{StreamError, StreamResult};
/// Byte order used when decoding the length field of a length-prefixed frame.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FramingByteOrder {
    /// The most significant byte of the length field comes first.
    BigEndian,
    /// The least significant byte of the length field comes first.
    LittleEndian,
}
8
/// Final state remembered by a framing stream once it has stopped producing
/// frames, so that later pulls can be answered consistently.
#[derive(Clone)]
enum Terminal {
    /// The stream ended normally.
    Complete,
    /// The stream failed with the contained error.
    Error(StreamError),
}
14
15fn sticky_terminal<T>(terminal: &Terminal) -> Option<StreamResult<T>> {
16    match terminal {
17        Terminal::Complete => None,
18        Terminal::Error(error) => Some(Err(error.clone())),
19    }
20}
21
22pub struct Framing;
23
24impl Framing {
25    #[must_use]
26    pub fn delimiter(
27        delimiter: Vec<u8>,
28        maximum_frame_length: usize,
29        allow_truncation: bool,
30    ) -> Flow<Vec<u8>, Vec<u8>> {
31        assert!(
32            !delimiter.is_empty(),
33            "delimiter must contain at least one byte"
34        );
35        assert!(
36            maximum_frame_length > 0,
37            "maximum frame length must be greater than zero"
38        );
39        Flow::from_transform(move |input| {
40            Box::new(DelimiterFramingStream::new(
41                input,
42                delimiter.clone(),
43                maximum_frame_length,
44                allow_truncation,
45            )) as BoxStream<Vec<u8>>
46        })
47    }
48
49    #[must_use]
50    pub fn length_field(
51        field_length: usize,
52        field_offset: usize,
53        maximum_frame_length: usize,
54        byte_order: FramingByteOrder,
55    ) -> Flow<Vec<u8>, Vec<u8>> {
56        assert!(
57            (1..=4).contains(&field_length),
58            "Length field length must be 1, 2, 3 or 4."
59        );
60        assert!(
61            maximum_frame_length > 0,
62            "maximum frame length must be greater than zero"
63        );
64        Flow::from_transform(move |input| {
65            Box::new(LengthFieldFramingStream::new(
66                input,
67                field_length,
68                field_offset,
69                maximum_frame_length,
70                byte_order,
71            )) as BoxStream<Vec<u8>>
72        })
73    }
74
75    #[must_use]
76    pub fn json(maximum_object_length: usize) -> Flow<Vec<u8>, Vec<u8>, NotUsed> {
77        assert!(
78            maximum_object_length > 0,
79            "maximum object length must be greater than zero"
80        );
81        Flow::from_transform(move |input| {
82            Box::new(JsonFramingStream::new(input, maximum_object_length)) as BoxStream<Vec<u8>>
83        })
84    }
85}
86
87struct DelimiterFramingStream {
88    input: BoxStream<Vec<u8>>,
89    delimiter: Vec<u8>,
90    maximum_frame_length: usize,
91    allow_truncation: bool,
92    buffer: Vec<u8>,
93    terminal: Option<Terminal>,
94}
95
96impl DelimiterFramingStream {
97    fn new(
98        input: BoxStream<Vec<u8>>,
99        delimiter: Vec<u8>,
100        maximum_frame_length: usize,
101        allow_truncation: bool,
102    ) -> Self {
103        Self {
104            input,
105            delimiter,
106            maximum_frame_length,
107            allow_truncation,
108            buffer: Vec::new(),
109            terminal: None,
110        }
111    }
112
113    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
114        self.terminal = Some(Terminal::Error(error.clone()));
115        Some(Err(error))
116    }
117
118    fn delimiter_position(&self) -> Option<usize> {
119        self.buffer
120            .windows(self.delimiter.len())
121            .position(|window| window == self.delimiter.as_slice())
122    }
123
124    fn trailing_delimiter_prefix_len(&self) -> usize {
125        let max_prefix = self
126            .delimiter
127            .len()
128            .saturating_sub(1)
129            .min(self.buffer.len());
130        (1..=max_prefix)
131            .rev()
132            .find(|&prefix_len| self.buffer.ends_with(&self.delimiter[..prefix_len]))
133            .unwrap_or(0)
134    }
135}
136
137impl Iterator for DelimiterFramingStream {
138    type Item = StreamResult<Vec<u8>>;
139
140    fn next(&mut self) -> Option<Self::Item> {
141        if let Some(terminal) = &self.terminal {
142            return sticky_terminal(terminal);
143        }
144
145        loop {
146            if let Some(position) = self.delimiter_position() {
147                if position > self.maximum_frame_length {
148                    return self.fail(StreamError::Failed(format!(
149                        "Read {position} bytes which is more than {} without seeing a line terminator",
150                        self.maximum_frame_length
151                    )));
152                }
153                let frame = self.buffer[..position].to_vec();
154                self.buffer.drain(..position + self.delimiter.len());
155                return Some(Ok(frame));
156            }
157
158            // Match Akka's Framing delimiter stage here: a trailing partial delimiter
159            // (Framing.scala lines 262-284) may still complete on the next pull, so only
160            // bytes before that partial match count toward the max-length failure.
161            let trailing_partial = self.trailing_delimiter_prefix_len();
162            let unmatched = self.buffer.len() - trailing_partial;
163            if unmatched > self.maximum_frame_length {
164                return self.fail(StreamError::Failed(format!(
165                    "Read {} bytes which is more than {} without seeing a line terminator",
166                    unmatched, self.maximum_frame_length
167                )));
168            }
169
170            match self.input.next() {
171                Some(Ok(chunk)) => self.buffer.extend_from_slice(&chunk),
172                Some(Err(error)) => {
173                    self.terminal = Some(Terminal::Error(error.clone()));
174                    return Some(Err(error));
175                }
176                None => {
177                    if self.buffer.is_empty() {
178                        self.terminal = Some(Terminal::Complete);
179                        return None;
180                    }
181                    if self.allow_truncation {
182                        let frame = std::mem::take(&mut self.buffer);
183                        self.terminal = Some(Terminal::Complete);
184                        return Some(Ok(frame));
185                    }
186                    return self.fail(StreamError::Failed(
187                        "Stream finished but there was a truncated final frame in the buffer"
188                            .to_owned(),
189                    ));
190                }
191            }
192        }
193    }
194}
195
196struct LengthFieldFramingStream {
197    input: BoxStream<Vec<u8>>,
198    field_length: usize,
199    field_offset: usize,
200    minimum_chunk_size: usize,
201    maximum_frame_length: usize,
202    byte_order: FramingByteOrder,
203    buffer: Vec<u8>,
204    frame_size: Option<usize>,
205    terminal: Option<Terminal>,
206}
207
208impl LengthFieldFramingStream {
209    fn new(
210        input: BoxStream<Vec<u8>>,
211        field_length: usize,
212        field_offset: usize,
213        maximum_frame_length: usize,
214        byte_order: FramingByteOrder,
215    ) -> Self {
216        let minimum_chunk_size = field_offset + field_length;
217        Self {
218            input,
219            field_length,
220            field_offset,
221            minimum_chunk_size,
222            maximum_frame_length,
223            byte_order,
224            buffer: Vec::new(),
225            frame_size: None,
226            terminal: None,
227        }
228    }
229
230    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
231        self.terminal = Some(Terminal::Error(error.clone()));
232        Some(Err(error))
233    }
234
235    fn parse_length(&self) -> i32 {
236        let bytes = &self.buffer[self.field_offset..self.field_offset + self.field_length];
237        match (self.byte_order, self.field_length) {
238            (FramingByteOrder::BigEndian, 1) => i32::from(bytes[0]),
239            (FramingByteOrder::BigEndian, 2) => i32::from(u16::from_be_bytes([bytes[0], bytes[1]])),
240            (FramingByteOrder::BigEndian, 3) => {
241                ((i32::from(bytes[0])) << 16) | ((i32::from(bytes[1])) << 8) | i32::from(bytes[2])
242            }
243            (FramingByteOrder::BigEndian, 4) => {
244                i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
245            }
246            (FramingByteOrder::LittleEndian, 1) => i32::from(bytes[0]),
247            (FramingByteOrder::LittleEndian, 2) => {
248                i32::from(u16::from_le_bytes([bytes[0], bytes[1]]))
249            }
250            (FramingByteOrder::LittleEndian, 3) => {
251                ((i32::from(bytes[2])) << 16) | ((i32::from(bytes[1])) << 8) | i32::from(bytes[0])
252            }
253            (FramingByteOrder::LittleEndian, 4) => {
254                i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
255            }
256            _ => unreachable!("field length validated at construction"),
257        }
258    }
259}
260
261impl Iterator for LengthFieldFramingStream {
262    type Item = StreamResult<Vec<u8>>;
263
264    fn next(&mut self) -> Option<Self::Item> {
265        if let Some(terminal) = &self.terminal {
266            return sticky_terminal(terminal);
267        }
268
269        loop {
270            if let Some(frame_size) = self.frame_size {
271                if self.buffer.len() >= frame_size {
272                    let frame = self.buffer[..frame_size].to_vec();
273                    self.buffer.drain(..frame_size);
274                    self.frame_size = None;
275                    return Some(Ok(frame));
276                }
277            } else if self.buffer.len() >= self.minimum_chunk_size {
278                let parsed_length = self.parse_length();
279                if parsed_length < 0 {
280                    return self.fail(StreamError::Failed(format!(
281                        "Decoded frame header reported negative size {parsed_length}"
282                    )));
283                }
284                let frame_size = parsed_length as usize + self.minimum_chunk_size;
285                if frame_size > self.maximum_frame_length {
286                    return self.fail(StreamError::Failed(format!(
287                        "Maximum allowed frame size is {} but decoded frame header reported size {frame_size}",
288                        self.maximum_frame_length
289                    )));
290                }
291                if frame_size < self.minimum_chunk_size {
292                    return self.fail(StreamError::Failed(format!(
293                        "Computed frame size {frame_size} is less than minimum chunk size {}",
294                        self.minimum_chunk_size
295                    )));
296                }
297                self.frame_size = Some(frame_size);
298                continue;
299            }
300
301            match self.input.next() {
302                Some(Ok(chunk)) => self.buffer.extend_from_slice(&chunk),
303                Some(Err(error)) => {
304                    self.terminal = Some(Terminal::Error(error.clone()));
305                    return Some(Err(error));
306                }
307                None => {
308                    if self.buffer.is_empty() {
309                        self.terminal = Some(Terminal::Complete);
310                        return None;
311                    }
312                    return self.fail(StreamError::Failed(
313                        "Stream finished but there was a truncated final frame in the buffer"
314                            .to_owned(),
315                    ));
316                }
317            }
318        }
319    }
320}
321
/// Pull-based stream that extracts top-level JSON objects from upstream byte
/// chunks, keeping its scanner state between chunks.
struct JsonFramingStream {
    /// Upstream source of raw byte chunks.
    input: BoxStream<Vec<u8>>,
    /// Size limit for a single object, in bytes.
    maximum_object_length: usize,
    /// Bytes received from upstream; bytes before `start` are already consumed.
    buffer: Vec<u8>,
    /// Index in `buffer` of the next byte to scan.
    pos: usize,
    /// Index in `buffer` where the current (or next) object begins.
    start: usize,
    /// Current `{` nesting depth; zero means the scanner is outside any object.
    depth: usize,
    /// Whether the most recent scan closed a top-level object.
    completed_object: bool,
    /// Whether the scanner is currently inside a string literal.
    in_string_expression: bool,
    /// Whether the previous byte inside a string was an unescaped backslash.
    in_backslash_escape: bool,
    /// Set once the stream has completed or failed.
    terminal: Option<Terminal>,
}
334
// Classification of a byte seen outside any JSON object; consulted by
// skip_to_next_object through OUTER_CHARS.
const OUTER_OBJECT: u8 = 2; // b'{' opens a JSON object
const OUTER_SKIP: u8 = 1; // b'[', b']', b',' and whitespace are stepped over
const OUTER_ERROR: u8 = 0; // every other byte is invalid at this level

/// Builds a 256-entry table in which each byte listed in `marked` maps to
/// `flag` and every other byte maps to `fill`.
const fn byte_table(fill: u8, flag: u8, marked: &[u8]) -> [u8; 256] {
    let mut table = [fill; 256];
    let mut index = 0;
    while index < marked.len() {
        table[marked[index] as usize] = flag;
        index += 1;
    }
    table
}

static OUTER_CHARS: [u8; 256] = {
    let mut table = byte_table(OUTER_ERROR, OUTER_SKIP, b"[], \n\r\t");
    table[b'{' as usize] = OUTER_OBJECT;
    table
};

// Bytes that matter to scan_object while it is inside an object but outside a
// string: `"` enters a string, `{` and `}` change the nesting depth. All other
// bytes map to 0 and can be stepped over without touching parser state.
static INNER_INTERESTING: [u8; 256] = byte_table(0, 1, b"\"{}");

// Bytes that matter to scan_object while it is inside a string and not right
// after a backslash: `"` ends the string and `\` starts an escape. All other
// bytes map to 0 and leave the parser state unchanged.
static STRING_INTERESTING: [u8; 256] = byte_table(0, 1, b"\"\\");
375
376impl JsonFramingStream {
377    fn new(input: BoxStream<Vec<u8>>, maximum_object_length: usize) -> Self {
378        Self {
379            input,
380            maximum_object_length,
381            buffer: Vec::with_capacity(maximum_object_length.min(4096)),
382            pos: 0,
383            start: 0,
384            depth: 0,
385            completed_object: false,
386            in_string_expression: false,
387            in_backslash_escape: false,
388            terminal: None,
389        }
390    }
391
392    fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
393        self.terminal = Some(Terminal::Error(error.clone()));
394        Some(Err(error))
395    }
396
397    fn can_complete(&self) -> bool {
398        self.depth == 0
399    }
400
401    /// Compact the buffer in place, discarding bytes before `start`.
402    /// Called before extending with new input when the current slice
403    /// would otherwise force a reallocation.
404    fn compact(&mut self) {
405        if self.start == 0 {
406            return;
407        }
408        if self.start >= self.buffer.len() {
409            self.buffer.clear();
410        } else {
411            self.buffer.drain(..self.start);
412        }
413        self.pos -= self.start;
414        self.start = 0;
415    }
416
417    fn skip_to_next_object(&mut self) -> StreamResult<()> {
418        if self.depth > 0 {
419            return Ok(());
420        }
421        let max = self.buffer.len();
422        let limit = self.maximum_object_length;
423        let mut pos = self.pos;
424        let mut start = self.start;
425        while pos < max && pos - start < limit {
426            let outer = OUTER_CHARS[self.buffer[pos] as usize];
427            if outer == OUTER_SKIP {
428                start += 1;
429                pos += 1;
430            } else if outer == OUTER_OBJECT {
431                self.start = start;
432                self.pos = pos + 1;
433                self.depth = 1;
434                return Ok(());
435            } else {
436                return Err(StreamError::Failed(format!(
437                    "Invalid JSON encountered at position [{}] of [{}]",
438                    self.start,
439                    String::from_utf8_lossy(&self.buffer)
440                )));
441            }
442        }
443        self.start = start;
444        self.pos = pos;
445        Ok(())
446    }
447
448    fn scan_object(&mut self) -> StreamResult<()> {
449        let max = self.buffer.len();
450        let limit = self.maximum_object_length;
451        let mut pos = self.pos;
452        let mut depth = self.depth;
453        let mut in_string = self.in_string_expression;
454        let mut in_escape = self.in_backslash_escape;
455        let start = self.start;
456        let mut completed = false;
457
458        while pos < max && pos - start < limit {
459            let byte = self.buffer[pos];
460            if in_string {
461                if in_escape {
462                    // The escaped character is always a single byte; consume
463                    // it unconditionally and reset escape state.
464                    in_escape = false;
465                } else if STRING_INTERESTING[byte as usize] != 0 {
466                    // Only `"` and `\` require state changes in string mode.
467                    if byte == b'"' {
468                        in_string = false;
469                    } else {
470                        in_escape = true;
471                    }
472                }
473                // Uninteresting string bytes (the vast majority) take no
474                // state-change path: no match, no store. This eliminates the
475                // `in_escape = false` write for every plain character in a
476                // string value, the dominant cost in the original inner scan.
477            } else {
478                // Fast-skip uninteresting bytes in non-string mode: anything
479                // that cannot change depth or enter string state.
480                if INNER_INTERESTING[byte as usize] == 0 {
481                    pos += 1;
482                    continue;
483                }
484                match byte {
485                    b'"' => in_string = true,
486                    b'{' => depth += 1,
487                    b'}' => {
488                        depth -= 1;
489                        if depth == 0 {
490                            pos += 1;
491                            completed = true;
492                            break;
493                        }
494                    }
495                    _ => {}
496                }
497            }
498            pos += 1;
499        }
500
501        self.pos = pos;
502        self.depth = depth;
503        self.in_string_expression = in_string;
504        self.in_backslash_escape = in_escape;
505        self.completed_object = completed;
506        Ok(())
507    }
508
509    fn poll_object(&mut self) -> StreamResult<Option<Vec<u8>>> {
510        self.completed_object = false;
511        self.skip_to_next_object()?;
512        self.scan_object()?;
513
514        if self.pos.saturating_sub(self.start) >= self.maximum_object_length {
515            return Err(StreamError::Failed(format!(
516                "JSON element exceeded maximumObjectLength ({} bytes)!",
517                self.maximum_object_length
518            )));
519        }
520
521        if self.completed_object && self.start < self.pos {
522            let frame = self.buffer[self.start..self.pos].to_vec();
523            // Advance start lazily; the actual drain (compact) is deferred
524            // until the next chunk pull would otherwise force a realloc.
525            self.start = self.pos;
526            return Ok(Some(frame));
527        }
528
529        Ok(None)
530    }
531}
532
533impl Iterator for JsonFramingStream {
534    type Item = StreamResult<Vec<u8>>;
535
536    fn next(&mut self) -> Option<Self::Item> {
537        if let Some(terminal) = &self.terminal {
538            return sticky_terminal(terminal);
539        }
540
541        loop {
542            match self.poll_object() {
543                Ok(Some(frame)) => return Some(Ok(frame)),
544                Ok(None) => {}
545                Err(error) => return self.fail(error),
546            }
547
548            match self.input.next() {
549                Some(Ok(chunk)) => {
550                    // Compact before extending only when the new chunk
551                    // would not fit in remaining buffer capacity. This
552                    // avoids a memmove on every chunk; the cost is paid
553                    // only when compaction is needed to prevent realloc.
554                    if self.start > 0 && self.buffer.capacity() - self.buffer.len() < chunk.len() {
555                        let tail = self.buffer.len() - self.start;
556                        if tail + chunk.len() <= self.buffer.capacity() {
557                            self.compact();
558                        }
559                    }
560                    self.buffer.extend_from_slice(&chunk);
561                }
562                Some(Err(error)) => {
563                    self.terminal = Some(Terminal::Error(error.clone()));
564                    return Some(Err(error));
565                }
566                None => {
567                    if self.start >= self.buffer.len() {
568                        self.terminal = Some(Terminal::Complete);
569                        return None;
570                    }
571                    if self.can_complete() {
572                        self.start = 0;
573                        self.terminal = Some(Terminal::Complete);
574                        return None;
575                    }
576                    return self.fail(StreamError::Failed(
577                        "Stream finished but there was a truncated final frame in the buffer"
578                            .to_owned(),
579                    ));
580                }
581            }
582        }
583    }
584}
585
// Tests for the three framing flows. Each test feeds byte chunks through a
// flow and checks the emitted frames or the reported error.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testkit::{TestSink, TestSource};
    use crate::{Keep, Source};

    // A frame whose bytes arrive in two chunks is reassembled.
    #[test]
    fn delimiter_framing_handles_split_frames() {
        let sink = Source::from_iter([b"ab|c".to_vec(), b"d|".to_vec()])
            .via(Framing::delimiter(b"|".to_vec(), 16, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(3);
        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
        sink.expect_complete();
    }

    // Six undelimited bytes against a limit of three fail the flow.
    #[test]
    fn delimiter_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(b"abcdef".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 3, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Read 6 bytes which is more than 3 without seeing a line terminator".to_owned()
            )
        );
    }

    // A frame of exactly the maximum length is accepted even when its
    // two-byte delimiter is split across chunks.
    #[test]
    fn delimiter_framing_accepts_max_length_frame_with_split_partial_delimiter() {
        let frame = vec![b'a'; 64];
        let first = [frame.clone(), vec![b'\r']].concat();
        let second = vec![b'\n'];
        let sink = Source::from_iter([first, second])
            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }

    // Without a partial delimiter at the tail, one byte over the limit fails.
    #[test]
    fn delimiter_framing_still_fails_when_max_length_is_exceeded_without_partial_match() {
        let sink = Source::single(vec![b'a'; 65])
            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Read 65 bytes which is more than 64 without seeing a line terminator".to_owned()
            )
        );
    }

    // Undelimited bytes at end of input are an error when truncation is off.
    #[test]
    fn delimiter_framing_fails_on_truncated_eof() {
        let sink = Source::single(b"abc".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 8, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }

    // With truncation on, the undelimited remainder is emitted as a frame.
    #[test]
    fn delimiter_framing_allows_truncation_when_enabled() {
        let sink = Source::single(b"abc".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 8, true))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(2);
        sink.assert_next(b"abc".to_vec());
        sink.expect_complete();
    }

    // The chunk boundary falls inside the 4-byte header; the whole frame,
    // header included, is still emitted.
    #[test]
    fn length_field_framing_handles_split_headers_and_payloads() {
        let frame = [0_u8, 0_u8, 0_u8, 2_u8, b'o', b'k'];
        let sink = Source::from_iter([frame[..3].to_vec(), frame[3..].to_vec()])
            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame.to_vec());
        sink.expect_complete();
    }

    // Header announces 8 payload bytes: 8 + 4 header bytes = 12 > limit 6.
    #[test]
    fn length_field_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(vec![0_u8, 0_u8, 0_u8, 8_u8])
            .via(Framing::length_field(4, 0, 6, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Maximum allowed frame size is 6 but decoded frame header reported size 12"
                    .to_owned()
            )
        );
    }

    // Header announces 2 payload bytes but only 1 arrives before end of input.
    #[test]
    fn length_field_framing_fails_on_truncated_eof() {
        let sink = Source::single(vec![0_u8, 0_u8, 0_u8, 2_u8, b'o'])
            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }

    // 0x8000 in a two-byte big-endian field must decode as 32768, not as a
    // negative number.
    #[test]
    fn length_field_framing_treats_two_byte_big_endian_lengths_as_unsigned() {
        let payload = vec![b'x'; 0x8000];
        let mut frame = vec![0x80, 0x00];
        frame.extend_from_slice(&payload);
        let sink = Source::single(frame.clone())
            .via(Framing::length_field(
                2,
                0,
                frame.len(),
                FramingByteOrder::BigEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }

    // Same as above with the bytes of the length field in little-endian order.
    #[test]
    fn length_field_framing_treats_two_byte_little_endian_lengths_as_unsigned() {
        let payload = vec![b'y'; 0x8000];
        let mut frame = vec![0x00, 0x80];
        frame.extend_from_slice(&payload);
        let sink = Source::single(frame.clone())
            .via(Framing::length_field(
                2,
                0,
                frame.len(),
                FramingByteOrder::LittleEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }

    // A four-byte field with the top bit set decodes as a negative i32 and is
    // rejected.
    #[test]
    fn length_field_framing_keeps_four_byte_signed_overflow_behavior() {
        let sink = Source::single(vec![0x80, 0x00, 0x00, 0x00])
            .via(Framing::length_field(
                4,
                0,
                usize::MAX,
                FramingByteOrder::BigEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Decoded frame header reported negative size -2147483648".to_owned()
            )
        );
    }

    // Objects inside a JSON array are emitted one by one; the array brackets
    // and the separating comma are dropped.
    #[test]
    fn json_framing_extracts_objects_split_across_chunks() {
        let sink = Source::from_iter([b"[{\"a\":1},".to_vec(), b"{\"b\":2}]".to_vec()])
            .via(Framing::json(64))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        sink.request(3);
        sink.assert_next_n([b"{\"a\":1}".to_vec(), b"{\"b\":2}".to_vec()]);
        sink.expect_complete();
    }

    // A 12-byte object against a limit of 4 fails the flow.
    #[test]
    fn json_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(b"{\"abcdef\":1}".to_vec())
            .via(Framing::json(4))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed("JSON element exceeded maximumObjectLength (4 bytes)!".to_owned())
        );
    }

    // Input that ends inside an unfinished object is an error.
    #[test]
    fn json_framing_fails_on_truncated_eof() {
        let sink = Source::single(b"{\"a\":".to_vec())
            .via(Framing::json(32))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }

    // An error sent by the upstream probe reaches the sink unchanged.
    #[test]
    fn framing_preserves_upstream_errors() {
        let (source, sink) = TestSource::probe::<Vec<u8>>()
            .via(Framing::delimiter(b"|".to_vec(), 32, false))
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .expect("probe framing materializes");

        sink.request(1);
        source.send_error(StreamError::Failed("boom".to_owned()));
        assert_eq!(sink.expect_error(), StreamError::Failed("boom".to_owned()));
    }

    // 128 objects delivered in 193-byte chunks: every frame must be exactly
    // one object, carrying its own id, with nothing from neighbouring objects.
    #[test]
    fn json_framing_no_bleed_across_buffer_reuse() {
        use crate::Sink;
        let mut payload = Vec::new();
        payload.push(b'[');
        for index in 0..128_usize {
            if index > 0 {
                payload.push(b',');
            }
            payload
                .extend_from_slice(format!("{{\"id\":{index},\"pad\":\"xxxxxxxx\"}}").as_bytes());
        }
        payload.push(b']');

        let chunks: Vec<Vec<u8>> = payload.chunks(193).map(|c| c.to_vec()).collect();
        assert!(
            chunks.len() > 4,
            "chunks must split objects across boundaries"
        );

        let frames = Source::from_iter(chunks)
            .via(Framing::json(256))
            .run_with(Sink::collect())
            .expect("json framing materializes")
            .wait()
            .expect("json framing completes");

        assert_eq!(frames.len(), 128);
        for (index, frame) in frames.iter().enumerate() {
            let text = std::str::from_utf8(frame).expect("frame is valid utf-8");
            let needle = format!("\"id\":{index}");
            assert!(
                text.contains(&needle),
                "frame {index} missing {needle}: {text}"
            );
            assert_eq!(text.as_bytes()[0], b'{');
            assert_eq!(text.as_bytes()[text.len() - 1], b'}');
        }
    }

    // The payload is delivered one byte per chunk; all 32 objects must still
    // come out intact.
    #[test]
    fn json_framing_compacts_buffer_at_chunk_boundaries() {
        use crate::Sink;
        let object = br#"{"id":1,"name":"x"}"#;
        let mut payload = Vec::new();
        payload.push(b'[');
        for _ in 0..32 {
            payload.push(b',');
            payload.extend_from_slice(object);
        }
        payload.push(b']');
        let chunks: Vec<Vec<u8>> = payload.iter().copied().map(|b| vec![b]).collect();

        let frames = Source::from_iter(chunks)
            .via(Framing::json(64))
            .run_with(Sink::collect())
            .expect("json framing materializes")
            .wait()
            .expect("json framing completes");

        assert_eq!(frames.len(), 32);
        for frame in &frames {
            assert_eq!(frame.as_slice(), object);
        }
    }
}