1use crate::stream::{BoxStream, Flow, NotUsed};
2use crate::{StreamError, StreamResult};
3
/// Byte order used when decoding the length field of a length-prefixed frame.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FramingByteOrder {
    /// The most significant byte of the length field comes first.
    BigEndian,
    /// The least significant byte of the length field comes first.
    LittleEndian,
}
12
/// Final state recorded by a framing stream once it has stopped producing frames.
///
/// The framing iterators in this file store one of these in their `terminal`
/// field and consult it at the top of every `next` call.
#[derive(Clone)]
enum Terminal {
    /// The stream ended normally.
    Complete,
    /// The stream failed with the contained error.
    Error(StreamError),
}
18
19fn sticky_terminal<T>(terminal: &Terminal) -> Option<StreamResult<T>> {
20 match terminal {
21 Terminal::Complete => None,
22 Terminal::Error(error) => Some(Err(error.clone())),
23 }
24}
25
26pub struct Framing;
29
30impl Framing {
31 #[must_use]
38 pub fn delimiter(
39 delimiter: Vec<u8>,
40 maximum_frame_length: usize,
41 allow_truncation: bool,
42 ) -> Flow<Vec<u8>, Vec<u8>> {
43 assert!(
44 !delimiter.is_empty(),
45 "delimiter must contain at least one byte"
46 );
47 assert!(
48 maximum_frame_length > 0,
49 "maximum frame length must be greater than zero"
50 );
51 Flow::from_transform(move |input| {
52 Box::new(DelimiterFramingStream::new(
53 input,
54 delimiter.clone(),
55 maximum_frame_length,
56 allow_truncation,
57 )) as BoxStream<Vec<u8>>
58 })
59 }
60
61 #[must_use]
69 pub fn length_field(
70 field_length: usize,
71 field_offset: usize,
72 maximum_frame_length: usize,
73 byte_order: FramingByteOrder,
74 ) -> Flow<Vec<u8>, Vec<u8>> {
75 assert!(
76 (1..=4).contains(&field_length),
77 "Length field length must be 1, 2, 3 or 4."
78 );
79 assert!(
80 maximum_frame_length > 0,
81 "maximum frame length must be greater than zero"
82 );
83 Flow::from_transform(move |input| {
84 Box::new(LengthFieldFramingStream::new(
85 input,
86 field_length,
87 field_offset,
88 maximum_frame_length,
89 byte_order,
90 )) as BoxStream<Vec<u8>>
91 })
92 }
93
94 #[must_use]
101 pub fn json(maximum_object_length: usize) -> Flow<Vec<u8>, Vec<u8>, NotUsed> {
102 assert!(
103 maximum_object_length > 0,
104 "maximum object length must be greater than zero"
105 );
106 Flow::from_transform(move |input| {
107 Box::new(JsonFramingStream::new(input, maximum_object_length)) as BoxStream<Vec<u8>>
108 })
109 }
110}
111
112struct DelimiterFramingStream {
113 input: BoxStream<Vec<u8>>,
114 delimiter: Vec<u8>,
115 maximum_frame_length: usize,
116 allow_truncation: bool,
117 buffer: Vec<u8>,
118 terminal: Option<Terminal>,
119}
120
121impl DelimiterFramingStream {
122 fn new(
123 input: BoxStream<Vec<u8>>,
124 delimiter: Vec<u8>,
125 maximum_frame_length: usize,
126 allow_truncation: bool,
127 ) -> Self {
128 Self {
129 input,
130 delimiter,
131 maximum_frame_length,
132 allow_truncation,
133 buffer: Vec::new(),
134 terminal: None,
135 }
136 }
137
138 fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
139 self.terminal = Some(Terminal::Error(error.clone()));
140 Some(Err(error))
141 }
142
143 fn delimiter_position(&self) -> Option<usize> {
144 self.buffer
145 .windows(self.delimiter.len())
146 .position(|window| window == self.delimiter.as_slice())
147 }
148
149 fn trailing_delimiter_prefix_len(&self) -> usize {
150 let max_prefix = self
151 .delimiter
152 .len()
153 .saturating_sub(1)
154 .min(self.buffer.len());
155 (1..=max_prefix)
156 .rev()
157 .find(|&prefix_len| self.buffer.ends_with(&self.delimiter[..prefix_len]))
158 .unwrap_or(0)
159 }
160}
161
162impl Iterator for DelimiterFramingStream {
163 type Item = StreamResult<Vec<u8>>;
164
165 fn next(&mut self) -> Option<Self::Item> {
166 if let Some(terminal) = &self.terminal {
167 return sticky_terminal(terminal);
168 }
169
170 loop {
171 if let Some(position) = self.delimiter_position() {
172 if position > self.maximum_frame_length {
173 return self.fail(StreamError::Failed(format!(
174 "Read {position} bytes which is more than {} without seeing a line terminator",
175 self.maximum_frame_length
176 )));
177 }
178 let frame = self.buffer[..position].to_vec();
179 self.buffer.drain(..position + self.delimiter.len());
180 return Some(Ok(frame));
181 }
182
183 let trailing_partial = self.trailing_delimiter_prefix_len();
187 let unmatched = self.buffer.len() - trailing_partial;
188 if unmatched > self.maximum_frame_length {
189 return self.fail(StreamError::Failed(format!(
190 "Read {} bytes which is more than {} without seeing a line terminator",
191 unmatched, self.maximum_frame_length
192 )));
193 }
194
195 match self.input.next() {
196 Some(Ok(chunk)) => self.buffer.extend_from_slice(&chunk),
197 Some(Err(error)) => {
198 self.terminal = Some(Terminal::Error(error.clone()));
199 return Some(Err(error));
200 }
201 None => {
202 if self.buffer.is_empty() {
203 self.terminal = Some(Terminal::Complete);
204 return None;
205 }
206 if self.allow_truncation {
207 let frame = std::mem::take(&mut self.buffer);
208 self.terminal = Some(Terminal::Complete);
209 return Some(Ok(frame));
210 }
211 return self.fail(StreamError::Failed(
212 "Stream finished but there was a truncated final frame in the buffer"
213 .to_owned(),
214 ));
215 }
216 }
217 }
218 }
219}
220
221struct LengthFieldFramingStream {
222 input: BoxStream<Vec<u8>>,
223 field_length: usize,
224 field_offset: usize,
225 minimum_chunk_size: usize,
226 maximum_frame_length: usize,
227 byte_order: FramingByteOrder,
228 buffer: Vec<u8>,
229 frame_size: Option<usize>,
230 terminal: Option<Terminal>,
231}
232
233impl LengthFieldFramingStream {
234 fn new(
235 input: BoxStream<Vec<u8>>,
236 field_length: usize,
237 field_offset: usize,
238 maximum_frame_length: usize,
239 byte_order: FramingByteOrder,
240 ) -> Self {
241 let minimum_chunk_size = field_offset + field_length;
242 Self {
243 input,
244 field_length,
245 field_offset,
246 minimum_chunk_size,
247 maximum_frame_length,
248 byte_order,
249 buffer: Vec::new(),
250 frame_size: None,
251 terminal: None,
252 }
253 }
254
255 fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
256 self.terminal = Some(Terminal::Error(error.clone()));
257 Some(Err(error))
258 }
259
260 fn parse_length(&self) -> i32 {
261 let bytes = &self.buffer[self.field_offset..self.field_offset + self.field_length];
262 match (self.byte_order, self.field_length) {
263 (FramingByteOrder::BigEndian, 1) => i32::from(bytes[0]),
264 (FramingByteOrder::BigEndian, 2) => i32::from(u16::from_be_bytes([bytes[0], bytes[1]])),
265 (FramingByteOrder::BigEndian, 3) => {
266 ((i32::from(bytes[0])) << 16) | ((i32::from(bytes[1])) << 8) | i32::from(bytes[2])
267 }
268 (FramingByteOrder::BigEndian, 4) => {
269 i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
270 }
271 (FramingByteOrder::LittleEndian, 1) => i32::from(bytes[0]),
272 (FramingByteOrder::LittleEndian, 2) => {
273 i32::from(u16::from_le_bytes([bytes[0], bytes[1]]))
274 }
275 (FramingByteOrder::LittleEndian, 3) => {
276 ((i32::from(bytes[2])) << 16) | ((i32::from(bytes[1])) << 8) | i32::from(bytes[0])
277 }
278 (FramingByteOrder::LittleEndian, 4) => {
279 i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
280 }
281 _ => unreachable!("field length validated at construction"),
282 }
283 }
284}
285
286impl Iterator for LengthFieldFramingStream {
287 type Item = StreamResult<Vec<u8>>;
288
289 fn next(&mut self) -> Option<Self::Item> {
290 if let Some(terminal) = &self.terminal {
291 return sticky_terminal(terminal);
292 }
293
294 loop {
295 if let Some(frame_size) = self.frame_size {
296 if self.buffer.len() >= frame_size {
297 let frame = self.buffer[..frame_size].to_vec();
298 self.buffer.drain(..frame_size);
299 self.frame_size = None;
300 return Some(Ok(frame));
301 }
302 } else if self.buffer.len() >= self.minimum_chunk_size {
303 let parsed_length = self.parse_length();
304 if parsed_length < 0 {
305 return self.fail(StreamError::Failed(format!(
306 "Decoded frame header reported negative size {parsed_length}"
307 )));
308 }
309 let frame_size = parsed_length as usize + self.minimum_chunk_size;
310 if frame_size > self.maximum_frame_length {
311 return self.fail(StreamError::Failed(format!(
312 "Maximum allowed frame size is {} but decoded frame header reported size {frame_size}",
313 self.maximum_frame_length
314 )));
315 }
316 if frame_size < self.minimum_chunk_size {
317 return self.fail(StreamError::Failed(format!(
318 "Computed frame size {frame_size} is less than minimum chunk size {}",
319 self.minimum_chunk_size
320 )));
321 }
322 self.frame_size = Some(frame_size);
323 continue;
324 }
325
326 match self.input.next() {
327 Some(Ok(chunk)) => self.buffer.extend_from_slice(&chunk),
328 Some(Err(error)) => {
329 self.terminal = Some(Terminal::Error(error.clone()));
330 return Some(Err(error));
331 }
332 None => {
333 if self.buffer.is_empty() {
334 self.terminal = Some(Terminal::Complete);
335 return None;
336 }
337 return self.fail(StreamError::Failed(
338 "Stream finished but there was a truncated final frame in the buffer"
339 .to_owned(),
340 ));
341 }
342 }
343 }
344 }
345}
346
/// Iterator adapter that extracts top-level JSON objects from upstream byte chunks.
struct JsonFramingStream {
    /// Upstream source of byte chunks.
    input: BoxStream<Vec<u8>>,
    /// Size limit for a single object; reaching it fails the stream.
    maximum_object_length: usize,
    /// Bytes received from upstream; bytes before `start` have been consumed.
    buffer: Vec<u8>,
    /// Index of the next byte to scan.
    pos: usize,
    /// Index where the current object begins (or where skipping has reached).
    start: usize,
    /// Nesting depth of `{` / `}`; zero means the scanner is between objects.
    depth: usize,
    /// Whether the most recent scan finished an object.
    completed_object: bool,
    /// Whether the scanner is inside a string literal.
    in_string_expression: bool,
    /// Whether the previous byte inside a string was a backslash.
    in_backslash_escape: bool,
    /// Set once the stream has completed or failed.
    terminal: Option<Terminal>,
}
359
/// Classification for `{`: the start of an object.
const OUTER_OBJECT: u8 = 2;
/// Classification for bytes that are skipped between objects.
const OUTER_SKIP: u8 = 1;
/// Classification for every other byte.
const OUTER_ERROR: u8 = 0;

/// Classifies each byte value as seen outside of any object: `{` opens an
/// object, array brackets, commas and whitespace are skipped, and everything
/// else is classified as an error.
static OUTER_CHARS: [u8; 256] = {
    const SKIPPED: [u8; 7] = [b'[', b']', b',', b' ', b'\n', b'\r', b'\t'];

    let mut classes = [OUTER_ERROR; 256];
    // `for` is not available in const evaluation, so iterate by index.
    let mut index = 0;
    while index < SKIPPED.len() {
        classes[SKIPPED[index] as usize] = OUTER_SKIP;
        index += 1;
    }
    classes[b'{' as usize] = OUTER_OBJECT;
    classes
};
377
/// Flags (with 1) the bytes that matter inside an object but outside a string:
/// `"`, `{` and `}`. Every other entry is 0.
static INNER_INTERESTING: [u8; 256] = {
    const MARKED: [u8; 3] = [b'"', b'{', b'}'];

    let mut flags = [0u8; 256];
    // `for` is not available in const evaluation, so iterate by index.
    let mut index = 0;
    while index < MARKED.len() {
        flags[MARKED[index] as usize] = 1;
        index += 1;
    }
    flags
};
389
/// Flags (with 1) the bytes that matter inside a string literal: the closing
/// quote `"` and the escape character `\`. Every other entry is 0.
static STRING_INTERESTING: [u8; 256] = {
    const MARKED: [u8; 2] = [b'"', b'\\'];

    let mut flags = [0u8; 256];
    // `for` is not available in const evaluation, so iterate by index.
    let mut index = 0;
    while index < MARKED.len() {
        flags[MARKED[index] as usize] = 1;
        index += 1;
    }
    flags
};
400
401impl JsonFramingStream {
402 fn new(input: BoxStream<Vec<u8>>, maximum_object_length: usize) -> Self {
403 Self {
404 input,
405 maximum_object_length,
406 buffer: Vec::with_capacity(maximum_object_length.min(4096)),
407 pos: 0,
408 start: 0,
409 depth: 0,
410 completed_object: false,
411 in_string_expression: false,
412 in_backslash_escape: false,
413 terminal: None,
414 }
415 }
416
417 fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
418 self.terminal = Some(Terminal::Error(error.clone()));
419 Some(Err(error))
420 }
421
422 fn can_complete(&self) -> bool {
423 self.depth == 0
424 }
425
426 fn compact(&mut self) {
430 if self.start == 0 {
431 return;
432 }
433 if self.start >= self.buffer.len() {
434 self.buffer.clear();
435 } else {
436 self.buffer.drain(..self.start);
437 }
438 self.pos -= self.start;
439 self.start = 0;
440 }
441
442 fn skip_to_next_object(&mut self) -> StreamResult<()> {
443 if self.depth > 0 {
444 return Ok(());
445 }
446 let max = self.buffer.len();
447 let limit = self.maximum_object_length;
448 let mut pos = self.pos;
449 let mut start = self.start;
450 while pos < max && pos - start < limit {
451 let outer = OUTER_CHARS[self.buffer[pos] as usize];
452 if outer == OUTER_SKIP {
453 start += 1;
454 pos += 1;
455 } else if outer == OUTER_OBJECT {
456 self.start = start;
457 self.pos = pos + 1;
458 self.depth = 1;
459 return Ok(());
460 } else {
461 return Err(StreamError::Failed(format!(
462 "Invalid JSON encountered at position [{}] of [{}]",
463 self.start,
464 String::from_utf8_lossy(&self.buffer)
465 )));
466 }
467 }
468 self.start = start;
469 self.pos = pos;
470 Ok(())
471 }
472
473 fn scan_object(&mut self) -> StreamResult<()> {
474 let max = self.buffer.len();
475 let limit = self.maximum_object_length;
476 let mut pos = self.pos;
477 let mut depth = self.depth;
478 let mut in_string = self.in_string_expression;
479 let mut in_escape = self.in_backslash_escape;
480 let start = self.start;
481 let mut completed = false;
482
483 while pos < max && pos - start < limit {
484 let byte = self.buffer[pos];
485 if in_string {
486 if in_escape {
487 in_escape = false;
490 } else if STRING_INTERESTING[byte as usize] != 0 {
491 if byte == b'"' {
493 in_string = false;
494 } else {
495 in_escape = true;
496 }
497 }
498 } else {
503 if INNER_INTERESTING[byte as usize] == 0 {
506 pos += 1;
507 continue;
508 }
509 match byte {
510 b'"' => in_string = true,
511 b'{' => depth += 1,
512 b'}' => {
513 depth -= 1;
514 if depth == 0 {
515 pos += 1;
516 completed = true;
517 break;
518 }
519 }
520 _ => {}
521 }
522 }
523 pos += 1;
524 }
525
526 self.pos = pos;
527 self.depth = depth;
528 self.in_string_expression = in_string;
529 self.in_backslash_escape = in_escape;
530 self.completed_object = completed;
531 Ok(())
532 }
533
534 fn poll_object(&mut self) -> StreamResult<Option<Vec<u8>>> {
535 self.completed_object = false;
536 self.skip_to_next_object()?;
537 self.scan_object()?;
538
539 if self.pos.saturating_sub(self.start) >= self.maximum_object_length {
540 return Err(StreamError::Failed(format!(
541 "JSON element exceeded maximumObjectLength ({} bytes)!",
542 self.maximum_object_length
543 )));
544 }
545
546 if self.completed_object && self.start < self.pos {
547 let frame = self.buffer[self.start..self.pos].to_vec();
548 self.start = self.pos;
551 return Ok(Some(frame));
552 }
553
554 Ok(None)
555 }
556}
557
558impl Iterator for JsonFramingStream {
559 type Item = StreamResult<Vec<u8>>;
560
561 fn next(&mut self) -> Option<Self::Item> {
562 if let Some(terminal) = &self.terminal {
563 return sticky_terminal(terminal);
564 }
565
566 loop {
567 match self.poll_object() {
568 Ok(Some(frame)) => return Some(Ok(frame)),
569 Ok(None) => {}
570 Err(error) => return self.fail(error),
571 }
572
573 match self.input.next() {
574 Some(Ok(chunk)) => {
575 if self.start > 0 && self.buffer.capacity() - self.buffer.len() < chunk.len() {
580 let tail = self.buffer.len() - self.start;
581 if tail + chunk.len() <= self.buffer.capacity() {
582 self.compact();
583 }
584 }
585 self.buffer.extend_from_slice(&chunk);
586 }
587 Some(Err(error)) => {
588 self.terminal = Some(Terminal::Error(error.clone()));
589 return Some(Err(error));
590 }
591 None => {
592 if self.start >= self.buffer.len() {
593 self.terminal = Some(Terminal::Complete);
594 return None;
595 }
596 if self.can_complete() {
597 self.start = 0;
598 self.terminal = Some(Terminal::Complete);
599 return None;
600 }
601 return self.fail(StreamError::Failed(
602 "Stream finished but there was a truncated final frame in the buffer"
603 .to_owned(),
604 ));
605 }
606 }
607 }
608 }
609}
610
// Unit tests for the delimiter, length-field and JSON framing flows.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testkit::{TestSink, TestSource};
    use crate::{Keep, Source};

    // A frame whose bytes arrive in two different chunks is reassembled.
    #[test]
    fn delimiter_framing_handles_split_frames() {
        let sink = Source::from_iter([b"ab|c".to_vec(), b"d|".to_vec()])
            .via(Framing::delimiter(b"|".to_vec(), 16, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(3);
        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
        sink.expect_complete();
    }

    // Six bytes without a delimiter exceed a limit of three.
    #[test]
    fn delimiter_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(b"abcdef".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 3, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Read 6 bytes which is more than 3 without seeing a line terminator".to_owned()
            )
        );
    }

    // A buffered partial delimiter (`\r`) must not count against the frame
    // length, so a frame of exactly the maximum size is still accepted.
    #[test]
    fn delimiter_framing_accepts_max_length_frame_with_split_partial_delimiter() {
        let frame = vec![b'a'; 64];
        let first = [frame.clone(), vec![b'\r']].concat();
        let second = vec![b'\n'];
        let sink = Source::from_iter([first, second])
            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }

    // Without a partial delimiter at the end, 65 bytes exceed a limit of 64.
    #[test]
    fn delimiter_framing_still_fails_when_max_length_is_exceeded_without_partial_match() {
        let sink = Source::single(vec![b'a'; 65])
            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Read 65 bytes which is more than 64 without seeing a line terminator".to_owned()
            )
        );
    }

    // Leftover bytes at end of input fail the stream when truncation is disabled.
    #[test]
    fn delimiter_framing_fails_on_truncated_eof() {
        let sink = Source::single(b"abc".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 8, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }

    // Leftover bytes at end of input are emitted when truncation is enabled.
    #[test]
    fn delimiter_framing_allows_truncation_when_enabled() {
        let sink = Source::single(b"abc".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 8, true))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        sink.request(2);
        sink.assert_next(b"abc".to_vec());
        sink.expect_complete();
    }

    // The header itself is split across chunks; the emitted frame includes the header.
    #[test]
    fn length_field_framing_handles_split_headers_and_payloads() {
        let frame = [0_u8, 0_u8, 0_u8, 2_u8, b'o', b'k'];
        let sink = Source::from_iter([frame[..3].to_vec(), frame[3..].to_vec()])
            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame.to_vec());
        sink.expect_complete();
    }

    // The length field starts two bytes into the frame.
    #[test]
    fn length_field_framing_reads_header_at_non_zero_offset() {
        let frame = [b'i', b'd', 0_u8, 3_u8, b'h', b'e', b'y'];
        let sink = Source::from_iter([
            frame[..1].to_vec(),
            frame[1..3].to_vec(),
            frame[3..].to_vec(),
        ])
        .via(Framing::length_field(2, 2, 16, FramingByteOrder::BigEndian))
        .run_with(TestSink::probe())
        .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame.to_vec());
        sink.expect_complete();
    }

    // The reported size is payload length plus header size (8 + 4 = 12).
    #[test]
    fn length_field_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(vec![0_u8, 0_u8, 0_u8, 8_u8])
            .via(Framing::length_field(4, 0, 6, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Maximum allowed frame size is 6 but decoded frame header reported size 12"
                    .to_owned()
            )
        );
    }

    // The header announces two payload bytes but only one arrives.
    #[test]
    fn length_field_framing_fails_on_truncated_eof() {
        let sink = Source::single(vec![0_u8, 0_u8, 0_u8, 2_u8, b'o'])
            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }

    // 0x8000 has the top bit of a 16-bit value set; it must not decode as negative.
    #[test]
    fn length_field_framing_treats_two_byte_big_endian_lengths_as_unsigned() {
        let payload = vec![b'x'; 0x8000];
        let mut frame = vec![0x80, 0x00];
        frame.extend_from_slice(&payload);
        let sink = Source::single(frame.clone())
            .via(Framing::length_field(
                2,
                0,
                frame.len(),
                FramingByteOrder::BigEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }

    // Little-endian counterpart of the unsigned two-byte check.
    #[test]
    fn length_field_framing_treats_two_byte_little_endian_lengths_as_unsigned() {
        let payload = vec![b'y'; 0x8000];
        let mut frame = vec![0x00, 0x80];
        frame.extend_from_slice(&payload);
        let sink = Source::single(frame.clone())
            .via(Framing::length_field(
                2,
                0,
                frame.len(),
                FramingByteOrder::LittleEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame);
        sink.expect_complete();
    }

    // Three-byte big-endian header.
    #[test]
    fn length_field_framing_reads_three_byte_big_endian_lengths() {
        let frame = [0_u8, 0_u8, 2_u8, b'o', b'k'];
        let sink = Source::single(frame.to_vec())
            .via(Framing::length_field(3, 0, 8, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame.to_vec());
        sink.expect_complete();
    }

    // Three-byte little-endian header.
    #[test]
    fn length_field_framing_reads_three_byte_little_endian_lengths() {
        let frame = [2_u8, 0_u8, 0_u8, b'o', b'k'];
        let sink = Source::single(frame.to_vec())
            .via(Framing::length_field(
                3,
                0,
                8,
                FramingByteOrder::LittleEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(2);
        sink.assert_next(frame.to_vec());
        sink.expect_complete();
    }

    // A four-byte header is decoded as a signed 32-bit value, so 0x80000000 is
    // reported as a negative size.
    #[test]
    fn length_field_framing_keeps_four_byte_signed_overflow_behavior() {
        let sink = Source::single(vec![0x80, 0x00, 0x00, 0x00])
            .via(Framing::length_field(
                4,
                0,
                usize::MAX,
                FramingByteOrder::BigEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Decoded frame header reported negative size -2147483648".to_owned()
            )
        );
    }

    // Array brackets and commas are skipped; each object is emitted on its own.
    #[test]
    fn json_framing_extracts_objects_split_across_chunks() {
        let sink = Source::from_iter([b"[{\"a\":1},".to_vec(), b"{\"b\":2}]".to_vec()])
            .via(Framing::json(64))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        sink.request(3);
        sink.assert_next_n([b"{\"a\":1}".to_vec(), b"{\"b\":2}".to_vec()]);
        sink.expect_complete();
    }

    // A twelve-byte object exceeds a limit of four bytes.
    #[test]
    fn json_framing_fails_on_max_length_exceeded() {
        let sink = Source::single(b"{\"abcdef\":1}".to_vec())
            .via(Framing::json(4))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed("JSON element exceeded maximumObjectLength (4 bytes)!".to_owned())
        );
    }

    // Input that ends inside an object fails the stream.
    #[test]
    fn json_framing_fails_on_truncated_eof() {
        let sink = Source::single(b"{\"a\":".to_vec())
            .via(Framing::json(32))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed(
                "Stream finished but there was a truncated final frame in the buffer".to_owned()
            )
        );
    }

    // An upstream error is passed through unchanged.
    #[test]
    fn framing_preserves_upstream_errors() {
        let (source, sink) = TestSource::probe::<Vec<u8>>()
            .via(Framing::delimiter(b"|".to_vec(), 32, false))
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .expect("probe framing materializes");

        sink.request(1);
        source.send_error(StreamError::Failed("boom".to_owned()));
        assert_eq!(sink.expect_error(), StreamError::Failed("boom".to_owned()));
    }

    // 128 objects delivered in 193-byte chunks: every frame must be exactly one
    // object carrying its own id, in order.
    #[test]
    fn json_framing_no_bleed_across_buffer_reuse() {
        use crate::Sink;
        let mut payload = Vec::new();
        payload.push(b'[');
        for index in 0..128_usize {
            if index > 0 {
                payload.push(b',');
            }
            payload
                .extend_from_slice(format!("{{\"id\":{index},\"pad\":\"xxxxxxxx\"}}").as_bytes());
        }
        payload.push(b']');

        let chunks: Vec<Vec<u8>> = payload.chunks(193).map(|c| c.to_vec()).collect();
        assert!(
            chunks.len() > 4,
            "chunks must split objects across boundaries"
        );

        let frames = Source::from_iter(chunks)
            .via(Framing::json(256))
            .run_with(Sink::collect())
            .expect("json framing materializes")
            .wait()
            .expect("json framing completes");

        assert_eq!(frames.len(), 128);
        for (index, frame) in frames.iter().enumerate() {
            let text = std::str::from_utf8(frame).expect("frame is valid utf-8");
            let needle = format!("\"id\":{index}");
            assert!(
                text.contains(&needle),
                "frame {index} missing {needle}: {text}"
            );
            assert_eq!(text.as_bytes()[0], b'{');
            assert_eq!(text.as_bytes()[text.len() - 1], b'}');
        }
    }

    // The payload is delivered one byte per chunk; every frame must still equal
    // the original object. Each object is preceded by a comma, including the first.
    #[test]
    fn json_framing_compacts_buffer_at_chunk_boundaries() {
        use crate::Sink;
        let object = br#"{"id":1,"name":"x"}"#;
        let mut payload = Vec::new();
        payload.push(b'[');
        for _ in 0..32 {
            payload.push(b',');
            payload.extend_from_slice(object);
        }
        payload.push(b']');
        let chunks: Vec<Vec<u8>> = payload.iter().copied().map(|b| vec![b]).collect();

        let frames = Source::from_iter(chunks)
            .via(Framing::json(64))
            .run_with(Sink::collect())
            .expect("json framing materializes")
            .wait()
            .expect("json framing completes");

        assert_eq!(frames.len(), 32);
        for frame in &frames {
            assert_eq!(frame.as_slice(), object);
        }
    }
}