1use crate::stream::{BoxStream, Flow, NotUsed};
2use crate::{StreamError, StreamResult};
/// Byte order used when decoding the length field of a length-prefixed frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FramingByteOrder {
    /// The most significant byte of the length field comes first.
    BigEndian,
    /// The least significant byte of the length field comes first.
    LittleEndian,
}
8
9#[derive(Clone)]
10enum Terminal {
11 Complete,
12 Error(StreamError),
13}
14
15fn sticky_terminal<T>(terminal: &Terminal) -> Option<StreamResult<T>> {
16 match terminal {
17 Terminal::Complete => None,
18 Terminal::Error(error) => Some(Err(error.clone())),
19 }
20}
21
22pub struct Framing;
23
24impl Framing {
25 #[must_use]
26 pub fn delimiter(
27 delimiter: Vec<u8>,
28 maximum_frame_length: usize,
29 allow_truncation: bool,
30 ) -> Flow<Vec<u8>, Vec<u8>> {
31 assert!(
32 !delimiter.is_empty(),
33 "delimiter must contain at least one byte"
34 );
35 assert!(
36 maximum_frame_length > 0,
37 "maximum frame length must be greater than zero"
38 );
39 Flow::from_transform(move |input| {
40 Box::new(DelimiterFramingStream::new(
41 input,
42 delimiter.clone(),
43 maximum_frame_length,
44 allow_truncation,
45 )) as BoxStream<Vec<u8>>
46 })
47 }
48
49 #[must_use]
50 pub fn length_field(
51 field_length: usize,
52 field_offset: usize,
53 maximum_frame_length: usize,
54 byte_order: FramingByteOrder,
55 ) -> Flow<Vec<u8>, Vec<u8>> {
56 assert!(
57 (1..=4).contains(&field_length),
58 "Length field length must be 1, 2, 3 or 4."
59 );
60 assert!(
61 maximum_frame_length > 0,
62 "maximum frame length must be greater than zero"
63 );
64 Flow::from_transform(move |input| {
65 Box::new(LengthFieldFramingStream::new(
66 input,
67 field_length,
68 field_offset,
69 maximum_frame_length,
70 byte_order,
71 )) as BoxStream<Vec<u8>>
72 })
73 }
74
75 #[must_use]
76 pub fn json(maximum_object_length: usize) -> Flow<Vec<u8>, Vec<u8>, NotUsed> {
77 assert!(
78 maximum_object_length > 0,
79 "maximum object length must be greater than zero"
80 );
81 Flow::from_transform(move |input| {
82 Box::new(JsonFramingStream::new(input, maximum_object_length)) as BoxStream<Vec<u8>>
83 })
84 }
85}
86
87struct DelimiterFramingStream {
88 input: BoxStream<Vec<u8>>,
89 delimiter: Vec<u8>,
90 maximum_frame_length: usize,
91 allow_truncation: bool,
92 buffer: Vec<u8>,
93 terminal: Option<Terminal>,
94}
95
96impl DelimiterFramingStream {
97 fn new(
98 input: BoxStream<Vec<u8>>,
99 delimiter: Vec<u8>,
100 maximum_frame_length: usize,
101 allow_truncation: bool,
102 ) -> Self {
103 Self {
104 input,
105 delimiter,
106 maximum_frame_length,
107 allow_truncation,
108 buffer: Vec::new(),
109 terminal: None,
110 }
111 }
112
113 fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
114 self.terminal = Some(Terminal::Error(error.clone()));
115 Some(Err(error))
116 }
117
118 fn delimiter_position(&self) -> Option<usize> {
119 self.buffer
120 .windows(self.delimiter.len())
121 .position(|window| window == self.delimiter.as_slice())
122 }
123
124 fn trailing_delimiter_prefix_len(&self) -> usize {
125 let max_prefix = self
126 .delimiter
127 .len()
128 .saturating_sub(1)
129 .min(self.buffer.len());
130 (1..=max_prefix)
131 .rev()
132 .find(|&prefix_len| self.buffer.ends_with(&self.delimiter[..prefix_len]))
133 .unwrap_or(0)
134 }
135}
136
137impl Iterator for DelimiterFramingStream {
138 type Item = StreamResult<Vec<u8>>;
139
140 fn next(&mut self) -> Option<Self::Item> {
141 if let Some(terminal) = &self.terminal {
142 return sticky_terminal(terminal);
143 }
144
145 loop {
146 if let Some(position) = self.delimiter_position() {
147 if position > self.maximum_frame_length {
148 return self.fail(StreamError::Failed(format!(
149 "Read {position} bytes which is more than {} without seeing a line terminator",
150 self.maximum_frame_length
151 )));
152 }
153 let frame = self.buffer[..position].to_vec();
154 self.buffer.drain(..position + self.delimiter.len());
155 return Some(Ok(frame));
156 }
157
158 let trailing_partial = self.trailing_delimiter_prefix_len();
162 let unmatched = self.buffer.len() - trailing_partial;
163 if unmatched > self.maximum_frame_length {
164 return self.fail(StreamError::Failed(format!(
165 "Read {} bytes which is more than {} without seeing a line terminator",
166 unmatched, self.maximum_frame_length
167 )));
168 }
169
170 match self.input.next() {
171 Some(Ok(chunk)) => self.buffer.extend_from_slice(&chunk),
172 Some(Err(error)) => {
173 self.terminal = Some(Terminal::Error(error.clone()));
174 return Some(Err(error));
175 }
176 None => {
177 if self.buffer.is_empty() {
178 self.terminal = Some(Terminal::Complete);
179 return None;
180 }
181 if self.allow_truncation {
182 let frame = std::mem::take(&mut self.buffer);
183 self.terminal = Some(Terminal::Complete);
184 return Some(Ok(frame));
185 }
186 return self.fail(StreamError::Failed(
187 "Stream finished but there was a truncated final frame in the buffer"
188 .to_owned(),
189 ));
190 }
191 }
192 }
193 }
194}
195
196struct LengthFieldFramingStream {
197 input: BoxStream<Vec<u8>>,
198 field_length: usize,
199 field_offset: usize,
200 minimum_chunk_size: usize,
201 maximum_frame_length: usize,
202 byte_order: FramingByteOrder,
203 buffer: Vec<u8>,
204 frame_size: Option<usize>,
205 terminal: Option<Terminal>,
206}
207
208impl LengthFieldFramingStream {
209 fn new(
210 input: BoxStream<Vec<u8>>,
211 field_length: usize,
212 field_offset: usize,
213 maximum_frame_length: usize,
214 byte_order: FramingByteOrder,
215 ) -> Self {
216 let minimum_chunk_size = field_offset + field_length;
217 Self {
218 input,
219 field_length,
220 field_offset,
221 minimum_chunk_size,
222 maximum_frame_length,
223 byte_order,
224 buffer: Vec::new(),
225 frame_size: None,
226 terminal: None,
227 }
228 }
229
230 fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
231 self.terminal = Some(Terminal::Error(error.clone()));
232 Some(Err(error))
233 }
234
235 fn parse_length(&self) -> i32 {
236 let bytes = &self.buffer[self.field_offset..self.field_offset + self.field_length];
237 match (self.byte_order, self.field_length) {
238 (FramingByteOrder::BigEndian, 1) => i32::from(bytes[0]),
239 (FramingByteOrder::BigEndian, 2) => i32::from(u16::from_be_bytes([bytes[0], bytes[1]])),
240 (FramingByteOrder::BigEndian, 3) => {
241 ((i32::from(bytes[0])) << 16) | ((i32::from(bytes[1])) << 8) | i32::from(bytes[2])
242 }
243 (FramingByteOrder::BigEndian, 4) => {
244 i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
245 }
246 (FramingByteOrder::LittleEndian, 1) => i32::from(bytes[0]),
247 (FramingByteOrder::LittleEndian, 2) => {
248 i32::from(u16::from_le_bytes([bytes[0], bytes[1]]))
249 }
250 (FramingByteOrder::LittleEndian, 3) => {
251 ((i32::from(bytes[2])) << 16) | ((i32::from(bytes[1])) << 8) | i32::from(bytes[0])
252 }
253 (FramingByteOrder::LittleEndian, 4) => {
254 i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
255 }
256 _ => unreachable!("field length validated at construction"),
257 }
258 }
259}
260
261impl Iterator for LengthFieldFramingStream {
262 type Item = StreamResult<Vec<u8>>;
263
264 fn next(&mut self) -> Option<Self::Item> {
265 if let Some(terminal) = &self.terminal {
266 return sticky_terminal(terminal);
267 }
268
269 loop {
270 if let Some(frame_size) = self.frame_size {
271 if self.buffer.len() >= frame_size {
272 let frame = self.buffer[..frame_size].to_vec();
273 self.buffer.drain(..frame_size);
274 self.frame_size = None;
275 return Some(Ok(frame));
276 }
277 } else if self.buffer.len() >= self.minimum_chunk_size {
278 let parsed_length = self.parse_length();
279 if parsed_length < 0 {
280 return self.fail(StreamError::Failed(format!(
281 "Decoded frame header reported negative size {parsed_length}"
282 )));
283 }
284 let frame_size = parsed_length as usize + self.minimum_chunk_size;
285 if frame_size > self.maximum_frame_length {
286 return self.fail(StreamError::Failed(format!(
287 "Maximum allowed frame size is {} but decoded frame header reported size {frame_size}",
288 self.maximum_frame_length
289 )));
290 }
291 if frame_size < self.minimum_chunk_size {
292 return self.fail(StreamError::Failed(format!(
293 "Computed frame size {frame_size} is less than minimum chunk size {}",
294 self.minimum_chunk_size
295 )));
296 }
297 self.frame_size = Some(frame_size);
298 continue;
299 }
300
301 match self.input.next() {
302 Some(Ok(chunk)) => self.buffer.extend_from_slice(&chunk),
303 Some(Err(error)) => {
304 self.terminal = Some(Terminal::Error(error.clone()));
305 return Some(Err(error));
306 }
307 None => {
308 if self.buffer.is_empty() {
309 self.terminal = Some(Terminal::Complete);
310 return None;
311 }
312 return self.fail(StreamError::Failed(
313 "Stream finished but there was a truncated final frame in the buffer"
314 .to_owned(),
315 ));
316 }
317 }
318 }
319 }
320}
321
322struct JsonFramingStream {
323 input: BoxStream<Vec<u8>>,
324 maximum_object_length: usize,
325 buffer: Vec<u8>,
326 pos: usize,
327 start: usize,
328 depth: usize,
329 completed_object: bool,
330 in_string_expression: bool,
331 in_backslash_escape: bool,
332 terminal: Option<Terminal>,
333}
334
/// Class of the byte that opens a top-level object.
const OUTER_OBJECT: u8 = 2;
/// Class of bytes that are skipped between top-level objects.
const OUTER_SKIP: u8 = 1;
/// Class of bytes that are not allowed between top-level objects.
const OUTER_ERROR: u8 = 0;

/// Classification of every byte value when it is met outside of an object.
static OUTER_CHARS: [u8; 256] = {
    // Array brackets, element separators and whitespace may surround objects.
    const SKIPPED: [u8; 7] = [b'[', b']', b',', b' ', b'\n', b'\r', b'\t'];
    let mut classes = [OUTER_ERROR; 256];
    let mut next = 0;
    while next < SKIPPED.len() {
        classes[SKIPPED[next] as usize] = OUTER_SKIP;
        next += 1;
    }
    classes[b'{' as usize] = OUTER_OBJECT;
    classes
};
352
/// Flags (value 1) the bytes that change scanner state inside an object but outside
/// a string: the quote and both braces. All other bytes are 0.
static INNER_INTERESTING: [u8; 256] = {
    const MARKED: [u8; 3] = [b'"', b'{', b'}'];
    let mut flags = [0u8; 256];
    let mut next = 0;
    while next < MARKED.len() {
        flags[MARKED[next] as usize] = 1;
        next += 1;
    }
    flags
};
364
/// Flags (value 1) the bytes that change scanner state inside a string: the closing
/// quote and the backslash that starts an escape. All other bytes are 0.
static STRING_INTERESTING: [u8; 256] = {
    const MARKED: [u8; 2] = [b'"', b'\\'];
    let mut flags = [0u8; 256];
    let mut next = 0;
    while next < MARKED.len() {
        flags[MARKED[next] as usize] = 1;
        next += 1;
    }
    flags
};
375
376impl JsonFramingStream {
377 fn new(input: BoxStream<Vec<u8>>, maximum_object_length: usize) -> Self {
378 Self {
379 input,
380 maximum_object_length,
381 buffer: Vec::with_capacity(maximum_object_length.min(4096)),
382 pos: 0,
383 start: 0,
384 depth: 0,
385 completed_object: false,
386 in_string_expression: false,
387 in_backslash_escape: false,
388 terminal: None,
389 }
390 }
391
392 fn fail<T>(&mut self, error: StreamError) -> Option<StreamResult<T>> {
393 self.terminal = Some(Terminal::Error(error.clone()));
394 Some(Err(error))
395 }
396
397 fn can_complete(&self) -> bool {
398 self.depth == 0
399 }
400
401 fn compact(&mut self) {
405 if self.start == 0 {
406 return;
407 }
408 if self.start >= self.buffer.len() {
409 self.buffer.clear();
410 } else {
411 self.buffer.drain(..self.start);
412 }
413 self.pos -= self.start;
414 self.start = 0;
415 }
416
417 fn skip_to_next_object(&mut self) -> StreamResult<()> {
418 if self.depth > 0 {
419 return Ok(());
420 }
421 let max = self.buffer.len();
422 let limit = self.maximum_object_length;
423 let mut pos = self.pos;
424 let mut start = self.start;
425 while pos < max && pos - start < limit {
426 let outer = OUTER_CHARS[self.buffer[pos] as usize];
427 if outer == OUTER_SKIP {
428 start += 1;
429 pos += 1;
430 } else if outer == OUTER_OBJECT {
431 self.start = start;
432 self.pos = pos + 1;
433 self.depth = 1;
434 return Ok(());
435 } else {
436 return Err(StreamError::Failed(format!(
437 "Invalid JSON encountered at position [{}] of [{}]",
438 self.start,
439 String::from_utf8_lossy(&self.buffer)
440 )));
441 }
442 }
443 self.start = start;
444 self.pos = pos;
445 Ok(())
446 }
447
448 fn scan_object(&mut self) -> StreamResult<()> {
449 let max = self.buffer.len();
450 let limit = self.maximum_object_length;
451 let mut pos = self.pos;
452 let mut depth = self.depth;
453 let mut in_string = self.in_string_expression;
454 let mut in_escape = self.in_backslash_escape;
455 let start = self.start;
456 let mut completed = false;
457
458 while pos < max && pos - start < limit {
459 let byte = self.buffer[pos];
460 if in_string {
461 if in_escape {
462 in_escape = false;
465 } else if STRING_INTERESTING[byte as usize] != 0 {
466 if byte == b'"' {
468 in_string = false;
469 } else {
470 in_escape = true;
471 }
472 }
473 } else {
478 if INNER_INTERESTING[byte as usize] == 0 {
481 pos += 1;
482 continue;
483 }
484 match byte {
485 b'"' => in_string = true,
486 b'{' => depth += 1,
487 b'}' => {
488 depth -= 1;
489 if depth == 0 {
490 pos += 1;
491 completed = true;
492 break;
493 }
494 }
495 _ => {}
496 }
497 }
498 pos += 1;
499 }
500
501 self.pos = pos;
502 self.depth = depth;
503 self.in_string_expression = in_string;
504 self.in_backslash_escape = in_escape;
505 self.completed_object = completed;
506 Ok(())
507 }
508
509 fn poll_object(&mut self) -> StreamResult<Option<Vec<u8>>> {
510 self.completed_object = false;
511 self.skip_to_next_object()?;
512 self.scan_object()?;
513
514 if self.pos.saturating_sub(self.start) >= self.maximum_object_length {
515 return Err(StreamError::Failed(format!(
516 "JSON element exceeded maximumObjectLength ({} bytes)!",
517 self.maximum_object_length
518 )));
519 }
520
521 if self.completed_object && self.start < self.pos {
522 let frame = self.buffer[self.start..self.pos].to_vec();
523 self.start = self.pos;
526 return Ok(Some(frame));
527 }
528
529 Ok(None)
530 }
531}
532
533impl Iterator for JsonFramingStream {
534 type Item = StreamResult<Vec<u8>>;
535
536 fn next(&mut self) -> Option<Self::Item> {
537 if let Some(terminal) = &self.terminal {
538 return sticky_terminal(terminal);
539 }
540
541 loop {
542 match self.poll_object() {
543 Ok(Some(frame)) => return Some(Ok(frame)),
544 Ok(None) => {}
545 Err(error) => return self.fail(error),
546 }
547
548 match self.input.next() {
549 Some(Ok(chunk)) => {
550 if self.start > 0 && self.buffer.capacity() - self.buffer.len() < chunk.len() {
555 let tail = self.buffer.len() - self.start;
556 if tail + chunk.len() <= self.buffer.capacity() {
557 self.compact();
558 }
559 }
560 self.buffer.extend_from_slice(&chunk);
561 }
562 Some(Err(error)) => {
563 self.terminal = Some(Terminal::Error(error.clone()));
564 return Some(Err(error));
565 }
566 None => {
567 if self.start >= self.buffer.len() {
568 self.terminal = Some(Terminal::Complete);
569 return None;
570 }
571 if self.can_complete() {
572 self.start = 0;
573 self.terminal = Some(Terminal::Complete);
574 return None;
575 }
576 return self.fail(StreamError::Failed(
577 "Stream finished but there was a truncated final frame in the buffer"
578 .to_owned(),
579 ));
580 }
581 }
582 }
583 }
584}
585
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testkit::{TestSink, TestSource};
    use crate::{Keep, Source};

    /// Message the framing stages fail with when the input ends inside a frame.
    const TRUNCATED_FINAL_FRAME: &str =
        "Stream finished but there was a truncated final frame in the buffer";

    /// Wraps `message` in the error variant the framing stages fail with.
    fn failed(message: &str) -> StreamError {
        StreamError::Failed(message.to_owned())
    }

    // ---- delimiter framing ----

    #[test]
    fn delimiter_framing_handles_split_frames() {
        let chunks = [b"ab|c".to_vec(), b"d|".to_vec()];
        let probe = Source::from_iter(chunks)
            .via(Framing::delimiter(b"|".to_vec(), 16, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        probe.request(3);
        probe.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
        probe.expect_complete();
    }

    #[test]
    fn delimiter_framing_fails_on_max_length_exceeded() {
        let probe = Source::single(b"abcdef".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 3, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        probe.request(1);
        let expected = failed("Read 6 bytes which is more than 3 without seeing a line terminator");
        assert_eq!(probe.expect_error(), expected);
    }

    #[test]
    fn delimiter_framing_accepts_max_length_frame_with_split_partial_delimiter() {
        // The delimiter is split across two chunks right after a maximum-size frame.
        let frame = vec![b'a'; 64];
        let mut first = frame.clone();
        first.push(b'\r');
        let second = vec![b'\n'];

        let probe = Source::from_iter([first, second])
            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        probe.request(2);
        probe.assert_next(frame);
        probe.expect_complete();
    }

    #[test]
    fn delimiter_framing_still_fails_when_max_length_is_exceeded_without_partial_match() {
        let probe = Source::single(vec![b'a'; 65])
            .via(Framing::delimiter(b"\r\n".to_vec(), 64, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        probe.request(1);
        let expected =
            failed("Read 65 bytes which is more than 64 without seeing a line terminator");
        assert_eq!(probe.expect_error(), expected);
    }

    #[test]
    fn delimiter_framing_fails_on_truncated_eof() {
        let probe = Source::single(b"abc".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 8, false))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        probe.request(1);
        assert_eq!(probe.expect_error(), failed(TRUNCATED_FINAL_FRAME));
    }

    #[test]
    fn delimiter_framing_allows_truncation_when_enabled() {
        let probe = Source::single(b"abc".to_vec())
            .via(Framing::delimiter(b"|".to_vec(), 8, true))
            .run_with(TestSink::probe())
            .expect("delimiter framing materializes");

        probe.request(2);
        probe.assert_next(b"abc".to_vec());
        probe.expect_complete();
    }

    // ---- length-field framing ----

    #[test]
    fn length_field_framing_handles_split_headers_and_payloads() {
        let frame = [0_u8, 0_u8, 0_u8, 2_u8, b'o', b'k'];
        // Split inside the four-byte header.
        let (head, tail) = frame.split_at(3);
        let probe = Source::from_iter([head.to_vec(), tail.to_vec()])
            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        probe.request(2);
        probe.assert_next(frame.to_vec());
        probe.expect_complete();
    }

    #[test]
    fn length_field_framing_fails_on_max_length_exceeded() {
        let probe = Source::single(vec![0_u8, 0_u8, 0_u8, 8_u8])
            .via(Framing::length_field(4, 0, 6, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        probe.request(1);
        let expected =
            failed("Maximum allowed frame size is 6 but decoded frame header reported size 12");
        assert_eq!(probe.expect_error(), expected);
    }

    #[test]
    fn length_field_framing_fails_on_truncated_eof() {
        let probe = Source::single(vec![0_u8, 0_u8, 0_u8, 2_u8, b'o'])
            .via(Framing::length_field(4, 0, 16, FramingByteOrder::BigEndian))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        probe.request(1);
        assert_eq!(probe.expect_error(), failed(TRUNCATED_FINAL_FRAME));
    }

    #[test]
    fn length_field_framing_treats_two_byte_big_endian_lengths_as_unsigned() {
        // Header 0x8000 followed by 0x8000 payload bytes.
        let mut frame = vec![0x80_u8, 0x00];
        frame.resize(2 + 0x8000, b'x');
        let total = frame.len();

        let probe = Source::single(frame.clone())
            .via(Framing::length_field(
                2,
                0,
                total,
                FramingByteOrder::BigEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        probe.request(2);
        probe.assert_next(frame);
        probe.expect_complete();
    }

    #[test]
    fn length_field_framing_treats_two_byte_little_endian_lengths_as_unsigned() {
        // Same length as above, with the header bytes swapped.
        let mut frame = vec![0x00_u8, 0x80];
        frame.resize(2 + 0x8000, b'y');
        let total = frame.len();

        let probe = Source::single(frame.clone())
            .via(Framing::length_field(
                2,
                0,
                total,
                FramingByteOrder::LittleEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        probe.request(2);
        probe.assert_next(frame);
        probe.expect_complete();
    }

    #[test]
    fn length_field_framing_keeps_four_byte_signed_overflow_behavior() {
        let header = vec![0x80, 0x00, 0x00, 0x00];
        let probe = Source::single(header)
            .via(Framing::length_field(
                4,
                0,
                usize::MAX,
                FramingByteOrder::BigEndian,
            ))
            .run_with(TestSink::probe())
            .expect("length field framing materializes");

        probe.request(1);
        let expected = failed("Decoded frame header reported negative size -2147483648");
        assert_eq!(probe.expect_error(), expected);
    }

    // ---- JSON framing ----

    #[test]
    fn json_framing_extracts_objects_split_across_chunks() {
        let chunks = [br#"[{"a":1},"#.to_vec(), br#"{"b":2}]"#.to_vec()];
        let probe = Source::from_iter(chunks)
            .via(Framing::json(64))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        probe.request(3);
        probe.assert_next_n([br#"{"a":1}"#.to_vec(), br#"{"b":2}"#.to_vec()]);
        probe.expect_complete();
    }

    #[test]
    fn json_framing_fails_on_max_length_exceeded() {
        let probe = Source::single(br#"{"abcdef":1}"#.to_vec())
            .via(Framing::json(4))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        probe.request(1);
        assert_eq!(
            probe.expect_error(),
            failed("JSON element exceeded maximumObjectLength (4 bytes)!")
        );
    }

    #[test]
    fn json_framing_fails_on_truncated_eof() {
        let probe = Source::single(br#"{"a":"#.to_vec())
            .via(Framing::json(32))
            .run_with(TestSink::probe())
            .expect("json framing materializes");

        probe.request(1);
        assert_eq!(probe.expect_error(), failed(TRUNCATED_FINAL_FRAME));
    }

    #[test]
    fn framing_preserves_upstream_errors() {
        let (upstream, downstream) = TestSource::probe::<Vec<u8>>()
            .via(Framing::delimiter(b"|".to_vec(), 32, false))
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .expect("probe framing materializes");

        downstream.request(1);
        upstream.send_error(failed("boom"));
        assert_eq!(downstream.expect_error(), failed("boom"));
    }

    #[test]
    fn json_framing_no_bleed_across_buffer_reuse() {
        use crate::Sink;

        // A JSON array of 128 objects, each carrying its own index.
        let objects: Vec<String> = (0..128_usize)
            .map(|index| format!("{{\"id\":{index},\"pad\":\"xxxxxxxx\"}}"))
            .collect();
        let payload = format!("[{}]", objects.join(",")).into_bytes();

        let chunks: Vec<Vec<u8>> = payload.chunks(193).map(<[u8]>::to_vec).collect();
        assert!(
            chunks.len() > 4,
            "chunks must split objects across boundaries"
        );

        let frames = Source::from_iter(chunks)
            .via(Framing::json(256))
            .run_with(Sink::collect())
            .expect("json framing materializes")
            .wait()
            .expect("json framing completes");

        assert_eq!(frames.len(), 128);
        for (index, frame) in frames.iter().enumerate() {
            let text = std::str::from_utf8(frame).expect("frame is valid utf-8");
            let needle = format!("\"id\":{index}");
            assert!(
                text.contains(&needle),
                "frame {index} missing {needle}: {text}"
            );
            assert_eq!(text.as_bytes()[0], b'{');
            assert_eq!(text.as_bytes()[text.len() - 1], b'}');
        }
    }

    #[test]
    fn json_framing_compacts_buffer_at_chunk_boundaries() {
        use crate::Sink;

        let object = br#"{"id":1,"name":"x"}"#;
        // `[` followed by 32 comma-prefixed copies of the object, then `]`.
        let mut payload = vec![b'['];
        for _ in 0..32 {
            payload.push(b',');
            payload.extend_from_slice(object);
        }
        payload.push(b']');
        // Feed the payload one byte per chunk.
        let chunks: Vec<Vec<u8>> = payload.iter().map(|&byte| vec![byte]).collect();

        let frames = Source::from_iter(chunks)
            .via(Framing::json(64))
            .run_with(Sink::collect())
            .expect("json framing materializes")
            .wait()
            .expect("json framing completes");

        assert_eq!(frames.len(), 32);
        for frame in &frames {
            assert_eq!(frame.as_slice(), object);
        }
    }
}