1use super::options::{LineOverflowBehavior, LineParsingOptions};
8use bytes::BytesMut;
9use memchr::memchr;
10use std::borrow::Cow;
11
/// Internal state of [`LineParser`] between calls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LineParserMode {
    /// Normal operation: bytes are accumulated into the current line.
    ReadingLine,
    /// Input is skipped up to and including the next `\n` (entered after a gap or after a line
    /// was truncated with `DropAdditionalData`).
    DiscardUntilNewline,
    /// A line was just emitted because it hit the length limit with `EmitAdditionalAsNewLines`;
    /// a `\n` arriving as the very next byte is swallowed so no empty line is produced.
    PendingLimitDelimiter,
}
18
/// Decodes `bytes` as UTF-8, replacing every invalid sequence with U+FFFD.
///
/// Valid input is returned as `Cow::Borrowed` without copying; only input containing
/// invalid sequences allocates a repaired `String`.
fn decode_line_lossy(bytes: &[u8]) -> Cow<'_, str> {
    match std::str::from_utf8(bytes) {
        Ok(text) => Cow::Borrowed(text),
        Err(_) => String::from_utf8_lossy(bytes),
    }
}
23
24pub struct LineParser {
31 line_buffer: BytesMut,
34
35 emitted: BytesMut,
40
41 mode: LineParserMode,
42}
43
44impl LineParser {
45 #[must_use]
47 pub fn new() -> Self {
48 Self {
49 line_buffer: BytesMut::new(),
50 emitted: BytesMut::new(),
51 mode: LineParserMode::ReadingLine,
52 }
53 }
54
55 pub fn on_gap(&mut self) {
59 self.line_buffer.clear();
60 self.mode = LineParserMode::DiscardUntilNewline;
61 }
62
63 pub fn next_line<'a, 'b>(
75 &'a mut self,
76 chunk: &mut &'b [u8],
77 options: LineParsingOptions,
78 ) -> Option<Cow<'a, str>>
79 where
80 'b: 'a,
81 {
82 self.compact_if_needed(options.buffer_compaction_threshold);
83 while !chunk.is_empty() {
84 match self.mode {
85 LineParserMode::DiscardUntilNewline => {
86 if let Some(pos) = memchr(b'\n', chunk) {
87 self.mode = LineParserMode::ReadingLine;
88 *chunk = &chunk[pos + 1..];
89 } else {
90 *chunk = &[];
91 return None;
92 }
93 continue;
94 }
95 LineParserMode::PendingLimitDelimiter => {
96 self.mode = LineParserMode::ReadingLine;
97 if chunk.first() == Some(&b'\n') {
98 *chunk = &chunk[1..];
99 continue;
100 }
101 }
102 LineParserMode::ReadingLine => {}
103 }
104
105 if options.max_line_length.0 != 0 && self.line_buffer.len() == options.max_line_length.0
106 {
107 self.mode = match options.overflow_behavior {
110 LineOverflowBehavior::DropAdditionalData => LineParserMode::DiscardUntilNewline,
111 LineOverflowBehavior::EmitAdditionalAsNewLines => {
112 LineParserMode::PendingLimitDelimiter
113 }
114 };
115 return Some(self.emit_buffered_line());
116 }
117
118 let remaining_line_length = if options.max_line_length.0 == 0 {
119 chunk.len()
120 } else {
121 options.max_line_length.0 - self.line_buffer.len()
122 };
123 let scan_len = remaining_line_length.min(chunk.len());
124 let scan = &chunk[..scan_len];
125
126 if let Some(pos) = memchr(b'\n', scan) {
127 if self.line_buffer.is_empty() {
128 let line = decode_line_lossy(&scan[..pos]);
131 *chunk = &chunk[pos + 1..];
132 return Some(line);
133 }
134 self.line_buffer.extend_from_slice(&scan[..pos]);
135 *chunk = &chunk[pos + 1..];
136 return Some(self.emit_buffered_line());
137 }
138
139 self.line_buffer.extend_from_slice(scan);
140 *chunk = &chunk[scan_len..];
141
142 if options.max_line_length.0 != 0
143 && self.line_buffer.len() == options.max_line_length.0
144 && matches!(
145 options.overflow_behavior,
146 LineOverflowBehavior::EmitAdditionalAsNewLines
147 )
148 {
149 self.mode = LineParserMode::PendingLimitDelimiter;
150 return Some(self.emit_buffered_line());
151 }
152 }
153
154 None
155 }
156
157 pub fn finish(&mut self) -> Option<Cow<'_, str>> {
164 if self.mode == LineParserMode::DiscardUntilNewline || self.line_buffer.is_empty() {
165 None
166 } else {
167 Some(self.emit_buffered_line())
168 }
169 }
170
171 fn compact_if_needed(&mut self, threshold: Option<crate::NumBytes>) {
189 let Some(threshold) = threshold else {
190 return;
191 };
192 let threshold = threshold.bytes();
193 if self.line_buffer.is_empty() && self.line_buffer.capacity() > threshold {
194 self.line_buffer = BytesMut::new();
195 }
196 if self.emitted.capacity() > threshold {
197 self.emitted = BytesMut::new();
198 }
199 }
200
201 fn emit_buffered_line(&mut self) -> Cow<'_, str> {
213 std::mem::swap(&mut self.line_buffer, &mut self.emitted);
214 self.line_buffer.clear();
215 decode_line_lossy(&self.emitted)
216 }
217}
218
219impl Default for LineParser {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NumBytes, NumBytesExt};
    use assertr::prelude::*;

    /// Feeds `chunks` through a fresh parser, optionally signalling a gap right before the
    /// chunk at index `mark_gap_before_chunk`. It then flushes the parser and asserts that
    /// exactly `expected_lines` were produced.
    fn run_test_case(
        chunks: &[&[u8]],
        mark_gap_before_chunk: Option<usize>,
        expected_lines: &[&str],
        options: LineParsingOptions,
    ) {
        let mut parser = LineParser::new();
        let mut collected_lines = Vec::<String>::new();

        for (index, chunk) in chunks.iter().enumerate() {
            if mark_gap_before_chunk == Some(index) {
                parser.on_gap();
            }

            let mut bytes: &[u8] = chunk;
            while let Some(line) = parser.next_line(&mut bytes, options) {
                collected_lines.push(line.into_owned());
            }
        }

        if let Some(line) = parser.finish() {
            collected_lines.push(line.into_owned());
        }

        let expected_lines: Vec<String> = expected_lines.iter().map(ToString::to_string).collect();
        assert_that!(collected_lines).is_equal_to(expected_lines);
    }

    /// A 4-byte limit that splits overlong lines into additional lines.
    fn emit_additional_options() -> LineParsingOptions {
        LineParsingOptions {
            max_line_length: 4.bytes(),
            overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
            buffer_compaction_threshold: None,
        }
    }

    /// Splits `data` into one chunk per byte (deliberately cutting multi-byte characters).
    fn as_single_byte_chunks(data: &str) -> Vec<&[u8]> {
        data.as_bytes().iter().map(std::slice::from_ref).collect()
    }

    #[test]
    fn basic_line_parsing_cases() {
        let default_options = LineParsingOptions::default();
        let drop_additional_options = LineParsingOptions {
            max_line_length: 4.bytes(),
            overflow_behavior: LineOverflowBehavior::DropAdditionalData,
            buffer_compaction_threshold: None,
        };

        run_test_case(&[b""], None, &[], default_options);
        run_test_case(
            &[b"no newlines here"],
            None,
            &["no newlines here"],
            default_options,
        );
        run_test_case(&[b"one line\n"], None, &["one line"], default_options);
        run_test_case(
            &[b"first line\nsecond line\nthird line\n"],
            None,
            &["first line", "second line", "third line"],
            default_options,
        );
        run_test_case(
            &[b"complete line\npartial"],
            None,
            &["complete line", "partial"],
            default_options,
        );
        run_test_case(
            &[b"previous: continuation\nmore lines\n"],
            None,
            &["previous: continuation", "more lines"],
            default_options,
        );
        run_test_case(&[b"1234\n\n"], None, &["1234", ""], drop_additional_options);
        run_test_case(
            &[b"ok\n123456789\nnext\n"],
            None,
            &["ok", "1234", "next"],
            drop_additional_options,
        );
    }

    #[test]
    fn invalid_utf8_data() {
        run_test_case(
            &[b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n"],
            None,
            &["valid utf8\u{FFFD}(\u{FFFD}\u{FFFD} invalid utf8"],
            LineParsingOptions::default(),
        );
    }

    #[test]
    fn rest_of_too_long_line_is_dropped() {
        run_test_case(
            &[b"123456789\nabcdefghi\n"],
            None,
            &["1234", "abcd"],
            LineParsingOptions {
                max_line_length: 4.bytes(),
                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                buffer_compaction_threshold: None,
            },
        );
    }

    #[test]
    fn rest_of_too_long_line_is_returned_as_additional_lines() {
        run_test_case(
            &[b"123456789\nabcdefghi\n"],
            None,
            &["1234", "5678", "9", "abcd", "efgh", "i"],
            emit_additional_options(),
        );
    }

    #[test]
    fn emit_additional_as_new_lines_does_not_emit_synthetic_empty_lines() {
        let options = emit_additional_options();

        run_test_case(&[b"1234\n"], None, &["1234"], options);
        run_test_case(&[b"1234", b"\n"], None, &["1234"], options);
        run_test_case(&[b"12345678\n"], None, &["1234", "5678"], options);
        run_test_case(&[b"1234\n\n"], None, &["1234", ""], options);
    }

    #[test]
    fn max_line_length_of_0_disables_line_length_checks() {
        run_test_case(
            &[b"123456789\nabcdefghi\n"],
            None,
            &["123456789", "abcdefghi"],
            LineParsingOptions {
                max_line_length: NumBytes::zero(),
                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                buffer_compaction_threshold: None,
            },
        );
        run_test_case(
            &[b"123456789\nabcdefghi\n"],
            None,
            &["123456789", "abcdefghi"],
            LineParsingOptions {
                max_line_length: NumBytes::zero(),
                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                buffer_compaction_threshold: None,
            },
        );
    }

    #[test]
    fn leading_and_trailing_whitespace_is_preserved() {
        run_test_case(
            &[b" 123456789 \n abcdefghi \n"],
            None,
            &[" 123456789 ", " abcdefghi "],
            LineParsingOptions {
                max_line_length: NumBytes::zero(),
                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                buffer_compaction_threshold: None,
            },
        );
    }

    #[test]
    fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
        let chunks =
            as_single_byte_chunks("\u{2764}\u{FE0F}\u{2764}\u{FE0F}\u{2764}\u{FE0F}\n\u{1F44D}\n");
        run_test_case(
            &chunks,
            None,
            &[
                "\u{2764}\u{FE0F}\u{2764}\u{FE0F}\u{2764}\u{FE0F}",
                "\u{1F44D}",
            ],
            LineParsingOptions::default(),
        );
    }

    #[test]
    fn overflow_drop_additional_data_persists_across_chunks() {
        run_test_case(
            &[b"1234", b"5678", b"9\nok\n"],
            None,
            &["1234", "ok"],
            LineParsingOptions {
                max_line_length: 4.bytes(),
                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                buffer_compaction_threshold: None,
            },
        );
    }

    #[test]
    fn gap_discards_partial_line_until_next_newline() {
        run_test_case(
            &[b"rea", b"dy\nnext\n"],
            Some(1),
            &["next"],
            LineParsingOptions::default(),
        );
    }

    #[test]
    fn fast_path_borrows_when_line_fits_in_chunk_with_empty_buffer() {
        let mut parser = LineParser::new();
        let chunk: &[u8] = b"hello\nworld\n";
        let mut bytes = chunk;
        let line = parser
            .next_line(&mut bytes, LineParsingOptions::default())
            .expect("first line is yielded");
        assert_that!(matches!(line, Cow::Borrowed(_))).is_true();
        drop(line);
        let line = parser
            .next_line(&mut bytes, LineParsingOptions::default())
            .expect("second line is yielded");
        assert_that!(matches!(line, Cow::Borrowed(_))).is_true();
    }

    mod properties {
        use super::{LineOverflowBehavior, LineParser, LineParsingOptions, NumBytesExt};
        use proptest::collection::vec;
        use proptest::prelude::{any, prop, prop_assert, prop_assert_eq, proptest};
        use proptest::strategy::Strategy;

        /// Runs `chunks` through a fresh parser, signalling a gap before every chunk whose
        /// index is listed in `gap_before`, and returns all produced lines (including the
        /// flushed tail).
        fn drive_parser(
            chunks: &[Vec<u8>],
            gap_before: &[usize],
            options: LineParsingOptions,
        ) -> Vec<String> {
            let mut parser = LineParser::new();
            let mut out = Vec::<String>::new();
            for (i, chunk) in chunks.iter().enumerate() {
                if gap_before.contains(&i) {
                    parser.on_gap();
                }
                let mut bytes: &[u8] = chunk;
                while let Some(line) = parser.next_line(&mut bytes, options) {
                    out.push(line.into_owned());
                }
            }
            if let Some(line) = parser.finish() {
                out.push(line.into_owned());
            }
            out
        }

        /// Reference run: the whole input delivered as one chunk, without gaps.
        fn drive_single_chunk(bytes: &[u8], options: LineParsingOptions) -> Vec<String> {
            drive_parser(&[bytes.to_vec()], &[], options)
        }

        /// Cuts `bytes` at the (sorted, in-range) `splits` offsets into consecutive chunks.
        fn split_at_indices(bytes: &[u8], splits: &[usize]) -> Vec<Vec<u8>> {
            let mut chunks = Vec::with_capacity(splits.len() + 1);
            let mut prev = 0usize;
            for &s in splits {
                chunks.push(bytes[prev..s].to_vec());
                prev = s;
            }
            chunks.push(bytes[prev..].to_vec());
            chunks
        }

        /// Maps arbitrary seeds to sorted, de-duplicated split offsets in `0..len`.
        /// Returns no splits for empty input.
        fn normalized_splits(seeds: impl IntoIterator<Item = usize>, len: usize) -> Vec<usize> {
            if len == 0 {
                return Vec::new();
            }
            let mut splits: Vec<usize> = seeds.into_iter().map(|n| n % len).collect();
            splits.sort_unstable();
            splits.dedup();
            splits
        }

        /// Printable ASCII line content that never contains `\n`.
        fn ascii_no_newline_line() -> impl Strategy<Value = String> {
            prop::string::string_regex("[a-zA-Z0-9 _.,;:!?-]{0,40}").unwrap()
        }

        /// Joins `lines` with `\n`, optionally terminating the last one as well.
        fn join_lines(lines: &[String], terminate_last: bool) -> String {
            let mut s = String::new();
            for (i, line) in lines.iter().enumerate() {
                if i > 0 {
                    s.push('\n');
                }
                s.push_str(line);
            }
            if terminate_last && !lines.is_empty() {
                s.push('\n');
            }
            s
        }

        proptest! {
            #[test]
            fn rechunking_preserves_lines(
                lines in vec(ascii_no_newline_line(), 0..6),
                terminate_last in any::<bool>(),
                splits_seed in vec(any::<u16>(), 0..8),
            ) {
                let combined = join_lines(&lines, terminate_last);
                let bytes = combined.as_bytes();

                let splits = normalized_splits(splits_seed.into_iter().map(usize::from), bytes.len());
                let chunks = split_at_indices(bytes, &splits);
                let options = LineParsingOptions::default();

                let from_chunks = drive_parser(&chunks, &[], options);
                let from_single = drive_single_chunk(bytes, options);
                prop_assert_eq!(from_chunks, from_single);
            }

            #[test]
            fn single_byte_chunks_match_single_chunk(
                content in prop::string::string_regex(
                    "([a-zA-Z0-9 \u{2764}\u{1F44D}]{0,12}\n){0,4}([a-zA-Z0-9 \u{2764}\u{1F44D}]{0,12})?",
                ).unwrap(),
            ) {
                let bytes = content.as_bytes();
                let single_byte_chunks: Vec<Vec<u8>> =
                    bytes.iter().map(|b| vec![*b]).collect();
                let options = LineParsingOptions::default();

                let from_single_byte = drive_parser(&single_byte_chunks, &[], options);
                let from_single = drive_single_chunk(bytes, options);
                prop_assert_eq!(from_single_byte, from_single);
            }

            #[test]
            fn embedded_nuls_are_treated_as_content(
                lines in vec(
                    prop::string::string_regex("[a-z\\x00]{0,16}").unwrap(),
                    1..5,
                ),
            ) {
                let combined = join_lines(&lines, true);
                let bytes = combined.as_bytes();

                let result = drive_single_chunk(bytes, LineParsingOptions::default());
                prop_assert_eq!(result.len(), lines.len());
                for (got, expected) in result.iter().zip(lines.iter()) {
                    prop_assert_eq!(got, expected);
                }
            }

            #[test]
            fn multibyte_utf8_survives_chunk_split(
                splits_seed in vec(any::<u8>(), 0..8),
            ) {
                let combined = "\u{2764}\u{FE0F}hello\n\u{1F44D}world\nplain\n";
                let bytes = combined.as_bytes();

                let splits = normalized_splits(splits_seed.into_iter().map(usize::from), bytes.len());
                let chunks = split_at_indices(bytes, &splits);
                let options = LineParsingOptions::default();

                let from_chunks = drive_parser(&chunks, &[], options);
                let from_single = drive_single_chunk(bytes, options);
                prop_assert_eq!(from_chunks, from_single);
            }

            #[test]
            fn drop_additional_caps_emitted_line_length(
                lines in vec(
                    prop::string::string_regex("[a-z]{0,30}").unwrap(),
                    1..5,
                ),
                max_line in 1usize..=8,
                splits_seed in vec(any::<u16>(), 0..6),
            ) {
                let combined = join_lines(&lines, true);
                let bytes = combined.as_bytes();
                let options = LineParsingOptions {
                    max_line_length: max_line.bytes(),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                    buffer_compaction_threshold: None,
                };

                let splits = normalized_splits(splits_seed.into_iter().map(usize::from), bytes.len());
                let chunks = split_at_indices(bytes, &splits);

                let result = drive_parser(&chunks, &[], options);
                for line in &result {
                    prop_assert!(
                        line.len() <= max_line,
                        "line {line:?} exceeds max_line_length {max_line}",
                    );
                }
            }

            #[test]
            fn emit_additional_preserves_all_bytes(
                lines in vec(
                    prop::string::string_regex("[a-z]{0,20}").unwrap(),
                    1..5,
                ),
                max_line in 1usize..=8,
            ) {
                let combined = join_lines(&lines, true);
                let bytes = combined.as_bytes();
                let options = LineParsingOptions {
                    max_line_length: max_line.bytes(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                    buffer_compaction_threshold: None,
                };

                let result = drive_single_chunk(bytes, options);
                let original_no_newlines: String =
                    combined.chars().filter(|c| *c != '\n').collect();
                let recombined: String = result.concat();
                prop_assert_eq!(recombined, original_no_newlines);
            }

            #[test]
            fn gap_emits_subset_of_no_gap_run(
                pre_lines in vec(ascii_no_newline_line(), 0..3),
                post_lines in vec(ascii_no_newline_line(), 1..4),
            ) {
                let pre = join_lines(&pre_lines, true);
                let post = join_lines(&post_lines, true);

                let chunks = vec![pre.as_bytes().to_vec(), post.as_bytes().to_vec()];
                let options = LineParsingOptions::default();

                let with_gap = drive_parser(&chunks, &[1], options);
                let without_gap = drive_parser(&chunks, &[], options);

                for line in &with_gap {
                    prop_assert!(
                        without_gap.contains(line),
                        "gap output {line:?} not present in no-gap output {without_gap:?}",
                    );
                }
                prop_assert!(with_gap.len() <= without_gap.len());
            }
        }
    }

    mod buffer_compaction {
        use super::*;

        /// Feeds `line` in two halves (the second half newline-terminated), so it has to go
        /// through the internal buffer. Asserts the line is emitted only once the newline arrives.
        fn run_split_line(parser: &mut LineParser, line: &[u8], options: LineParsingOptions) {
            let mid = line.len() / 2;
            let first: &[u8] = &line[..mid];
            let mut second = Vec::with_capacity(line.len() - mid + 1);
            second.extend_from_slice(&line[mid..]);
            second.push(b'\n');

            let mut bytes = first;
            assert_that!(parser.next_line(&mut bytes, options).is_none()).is_true();
            let mut bytes: &[u8] = &second;
            let emitted = parser
                .next_line(&mut bytes, options)
                .expect("line emits when newline arrives");
            assert_that!(emitted.len()).is_equal_to(line.len());
            drop(emitted);
        }

        /// No length limit; only the compaction threshold varies.
        fn unbounded_options(threshold: Option<NumBytes>) -> LineParsingOptions {
            LineParsingOptions {
                max_line_length: NumBytes::zero(),
                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                buffer_compaction_threshold: threshold,
            }
        }

        #[test]
        fn no_compaction_keeps_high_water_mark_when_threshold_is_none() {
            let mut parser = LineParser::new();
            let options = unbounded_options(None);

            run_split_line(&mut parser, &b"a".repeat(200), options);
            let larger = parser.line_buffer.capacity().max(parser.emitted.capacity());
            assert_that!(larger >= 200).is_true();

            run_split_line(&mut parser, &b"b".repeat(8), options);

            let after = parser.line_buffer.capacity().max(parser.emitted.capacity());
            assert_that!(after >= 200).is_true();
        }

        #[test]
        fn compaction_releases_emitted_capacity_when_over_threshold() {
            let mut parser = LineParser::new();
            let options = unbounded_options(Some(16.bytes()));

            run_split_line(&mut parser, &b"a".repeat(200), options);
            assert_that!(parser.emitted.capacity() >= 200).is_true();

            run_split_line(&mut parser, &b"b".repeat(8), options);
            // The 200-byte allocation must have been released; the emitted buffer now only
            // holds the small second line.
            assert_that!(parser.emitted.capacity() < 64).is_true();
        }

        #[test]
        fn compaction_does_not_drop_mid_line_partial_buffer() {
            let mut parser = LineParser::new();
            let options = unbounded_options(Some(4.bytes()));

            let mut bytes: &[u8] = b"abcdefgh";
            assert_that!(parser.next_line(&mut bytes, options).is_none()).is_true();
            assert_that!(parser.line_buffer.len()).is_equal_to(8);

            // The partial line exceeds the threshold, but must survive compaction.
            let mut bytes: &[u8] = b"ij\n";
            let emitted = parser
                .next_line(&mut bytes, options)
                .expect("full line emitted once newline arrives");
            assert_that!(emitted.as_ref()).is_equal_to("abcdefghij");
        }
    }
}