Skip to main content

tokio_process_tools/output_stream/line/
parser.rs

1//! Stateful line parser that splits arbitrary byte chunks into lines.
2//!
3//! The parser exposes one primitive — [`LineParser::next_line`] — that both the sync and async
4//! sides of [`LineAdapter`](super::adapter::LineAdapter) drive. A single state machine handles
5//! the chunk-spanning, max-line-length, and gap cases for both paths.
6
7use super::options::{LineOverflowBehavior, LineParsingOptions};
8use bytes::BytesMut;
9use memchr::memchr;
10use std::borrow::Cow;
11
/// Internal state of [`LineParser`], consulted at the top of every `next_line` iteration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LineParserMode {
    /// Normal operation: incoming bytes are scanned for `\n` and accumulated into a line.
    ReadingLine,
    /// Incoming bytes are thrown away up to and including the next `\n`. Entered by
    /// `on_gap` and after a length-limited line is emitted under
    /// `LineOverflowBehavior::DropAdditionalData`.
    DiscardUntilNewline,
    /// A line was just emitted because it reached the length limit under
    /// `LineOverflowBehavior::EmitAdditionalAsNewLines`. If the very next byte is `\n` it is
    /// consumed without emitting an empty line; either way the parser returns to
    /// `ReadingLine`.
    PendingLimitDelimiter,
}
18
/// Decodes `bytes` as text, replacing invalid UTF-8 sequences with U+FFFD.
///
/// Well-formed UTF-8 is returned as a borrowed `&str` without copying; only input that
/// contains invalid sequences allocates a new `String`.
fn decode_line_lossy(bytes: &[u8]) -> Cow<'_, str> {
    match std::str::from_utf8(bytes) {
        Ok(text) => Cow::Borrowed(text),
        // Not valid UTF-8: fall back to the lossy conversion, which always allocates here.
        Err(_) => String::from_utf8_lossy(bytes),
    }
}
23
24/// Stateful parser for turning arbitrary byte chunks into lines.
25///
26/// Drive it by calling [`Self::next_line`] in a loop with a slice cursor that you advance
27/// across calls; on EOF call [`Self::finish`] once to flush any unterminated trailing line.
28/// On a stream gap (chunks dropped between deliveries) call [`Self::on_gap`] to discard the
29/// in-progress partial line and resynchronize at the next newline.
30pub struct LineParser {
31    /// Bytes accumulated for the current in-progress line. Cleared (via `split`) when the line
32    /// is emitted.
33    line_buffer: BytesMut,
34
35    /// Holds the most-recently emitted line so its bytes outlive the call that produced them.
36    /// Each emission overwrites this slot via `BytesMut::split`, and the returned `Cow` borrows
37    /// from here when the line did not fit entirely in a single chunk. The borrow checker
38    /// enforces that the previous line is dropped before the next `next_line` call.
39    emitted: BytesMut,
40
41    mode: LineParserMode,
42}
43
44impl LineParser {
45    /// Creates a new parser in `ReadingLine` mode with empty buffers.
46    #[must_use]
47    pub fn new() -> Self {
48        Self {
49            line_buffer: BytesMut::new(),
50            emitted: BytesMut::new(),
51            mode: LineParserMode::ReadingLine,
52        }
53    }
54
55    /// Notifies the parser that the upstream delivery dropped chunks. Discards any partial
56    /// line in progress and resynchronizes at the next newline instead of joining bytes
57    /// across the gap.
58    pub fn on_gap(&mut self) {
59        self.line_buffer.clear();
60        self.mode = LineParserMode::DiscardUntilNewline;
61    }
62
    /// Advances through `chunk` and yields the next parsed line, if any.
    ///
    /// `chunk` is mutated in place to advance past the consumed prefix. Call repeatedly,
    /// reusing the same slice cursor, until this returns `None`; at that point the chunk is
    /// exhausted and any partial line is buffered for the next chunk.
    ///
    /// `options.max_line_length` of `0` disables the length limit. With a non-zero limit, a
    /// line that reaches the limit is emitted early and the rest of that line is handled
    /// according to `options.overflow_behavior`. `options.buffer_compaction_threshold` is
    /// applied once per call, before any parsing.
    ///
    /// The returned [`Cow`] borrows from the chunk slice when the line fits entirely in this
    /// call and no partial line was already buffered (zero-allocation fast path), and borrows
    /// from the parser's internal emitted-line slot otherwise. Either way, drop the `Cow`
    /// before the next call — the borrow checker enforces this through the `&'a mut self`
    /// signature.
    pub fn next_line<'a, 'b>(
        &'a mut self,
        chunk: &mut &'b [u8],
        options: LineParsingOptions,
    ) -> Option<Cow<'a, str>>
    where
        'b: 'a,
    {
        self.compact_if_needed(options.buffer_compaction_threshold);
        while !chunk.is_empty() {
            match self.mode {
                LineParserMode::DiscardUntilNewline => {
                    if let Some(pos) = memchr(b'\n', chunk) {
                        // Resume normal parsing right after the newline.
                        self.mode = LineParserMode::ReadingLine;
                        *chunk = &chunk[pos + 1..];
                    } else {
                        // No newline in this chunk: drop all of it and stay in discard mode.
                        *chunk = &[];
                        return None;
                    }
                    continue;
                }
                LineParserMode::PendingLimitDelimiter => {
                    self.mode = LineParserMode::ReadingLine;
                    // The previous line was emitted at the length limit. A newline directly
                    // after it terminates that line, so consume it without emitting an
                    // empty line.
                    if chunk.first() == Some(&b'\n') {
                        *chunk = &chunk[1..];
                        continue;
                    }
                }
                LineParserMode::ReadingLine => {}
            }

            // The buffer can already be full here when a previous iteration or call filled it
            // to the limit without emitting (the post-append check below only emits for
            // `EmitAdditionalAsNewLines`).
            if options.max_line_length.0 != 0 && self.line_buffer.len() == options.max_line_length.0
            {
                // Mutate `self.mode` BEFORE the emit, so the returned `Cow`'s borrow on
                // `self` is the only outstanding borrow when we return.
                self.mode = match options.overflow_behavior {
                    LineOverflowBehavior::DropAdditionalData => LineParserMode::DiscardUntilNewline,
                    LineOverflowBehavior::EmitAdditionalAsNewLines => {
                        LineParserMode::PendingLimitDelimiter
                    }
                };
                return Some(self.emit_buffered_line());
            }

            // Only scan as many bytes as the current line may still accept.
            let remaining_line_length = if options.max_line_length.0 == 0 {
                chunk.len()
            } else {
                options.max_line_length.0 - self.line_buffer.len()
            };
            let scan_len = remaining_line_length.min(chunk.len());
            let scan = &chunk[..scan_len];

            if let Some(pos) = memchr(b'\n', scan) {
                if self.line_buffer.is_empty() {
                    // Fast path: the whole line fits in this chunk and no prefix was buffered;
                    // borrow directly from the chunk slice without copying.
                    let line = decode_line_lossy(&scan[..pos]);
                    *chunk = &chunk[pos + 1..];
                    return Some(line);
                }
                // A prefix of this line is already buffered: append the rest and emit from
                // the buffer. The newline itself is skipped.
                self.line_buffer.extend_from_slice(&scan[..pos]);
                *chunk = &chunk[pos + 1..];
                return Some(self.emit_buffered_line());
            }

            // No newline within the scanned window: buffer it and keep going.
            self.line_buffer.extend_from_slice(scan);
            *chunk = &chunk[scan_len..];

            if options.max_line_length.0 != 0
                && self.line_buffer.len() == options.max_line_length.0
                && matches!(
                    options.overflow_behavior,
                    LineOverflowBehavior::EmitAdditionalAsNewLines
                )
            {
                self.mode = LineParserMode::PendingLimitDelimiter;
                return Some(self.emit_buffered_line());
            }
        }

        None
    }
156
157    /// Flushes any unterminated trailing line at EOF.
158    ///
159    /// Returns `None` when there is nothing to flush — the buffer is empty, or the parser is
160    /// in `DiscardUntilNewline` mode (a gap or overflow truncation is still draining and the
161    /// buffered remainder is conservatively dropped). Otherwise returns the buffered line as a
162    /// [`Cow`] borrowing from the parser's emitted-line slot.
163    pub fn finish(&mut self) -> Option<Cow<'_, str>> {
164        if self.mode == LineParserMode::DiscardUntilNewline || self.line_buffer.is_empty() {
165            None
166        } else {
167            Some(self.emit_buffered_line())
168        }
169    }
170
171    /// Drops over-sized buffer allocations so a single large line does not pin memory for the
172    /// parser's whole lifetime.
173    ///
174    /// Runs at the start of [`Self::next_line`] (never inside `emit_buffered_line`, since the
175    /// returned `Cow` borrows from `self.emitted` and reassigning it there would invalidate the
176    /// still-alive borrow). At entry, the borrow checker has already proven that any previous `Cow`
177    /// is dropped, so `self.emitted` is free to replace.
178    ///
179    /// `self.line_buffer` is intentionally **only** replaced when empty: a non-empty `line_buffer`
180    /// holds partial-line bytes accumulated from earlier chunks of the in-progress line, and we
181    /// must not drop those bytes mid-line. As a consequence, an over-sized `line_buffer` that
182    /// happens to carry a small partial line stays pinned until the in-progress line emits — at
183    /// which point swap-and-clear in `emit_buffered_line` rebalances the two slots and the next
184    /// `next_line` call reclaims the excess. The peak memory bound (`2 × max_line_length`) is the
185    /// same whether or not compaction is enabled; compaction only improves the steady-state
186    /// average after outliers, with a worst-case "still over-sized" window equal to the duration
187    /// of one in-progress line.
188    fn compact_if_needed(&mut self, threshold: Option<crate::NumBytes>) {
189        let Some(threshold) = threshold else {
190            return;
191        };
192        let threshold = threshold.bytes();
193        if self.line_buffer.is_empty() && self.line_buffer.capacity() > threshold {
194            self.line_buffer = BytesMut::new();
195        }
196        if self.emitted.capacity() > threshold {
197            self.emitted = BytesMut::new();
198        }
199    }
200
201    /// Moves the in-progress line bytes into the emitted slot and decodes them.
202    ///
203    /// Uses swap-and-clear instead of `split()`: the in-progress buffer becomes the new
204    /// `emitted`, the previous `emitted`'s allocation moves into `line_buffer` and gets
205    /// cleared (length to 0, capacity retained) so the next line accumulates without
206    /// allocating. Both buffers therefore behave as high-water-mark caches: each can grow up
207    /// to `LineParsingOptions::max_line_length` and stays at that size for the parser's
208    /// lifetime — no per-line allocator churn after the warm-up.
209    ///
210    /// The bytes live in `self.emitted` until the next emission swaps them out, which is
211    /// exactly long enough for the returned `Cow` to remain valid until the caller drops it.
212    fn emit_buffered_line(&mut self) -> Cow<'_, str> {
213        std::mem::swap(&mut self.line_buffer, &mut self.emitted);
214        self.line_buffer.clear();
215        decode_line_lossy(&self.emitted)
216    }
217}
218
219impl Default for LineParser {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
225#[cfg(test)]
226mod tests {
227    use super::*;
228    use crate::{NumBytes, NumBytesExt};
229    use assertr::prelude::*;
230
231    /// Drives the parser across all chunks and collects every emitted line, plus the trailing
232    /// flush at EOF. Used by every test case below.
233    fn run_test_case(
234        chunks: &[&[u8]],
235        mark_gap_before_chunk: Option<usize>,
236        expected_lines: &[&str],
237        options: LineParsingOptions,
238    ) {
239        let mut parser = LineParser::new();
240        let mut collected_lines = Vec::<String>::new();
241
242        for (index, chunk) in chunks.iter().enumerate() {
243            if mark_gap_before_chunk == Some(index) {
244                parser.on_gap();
245            }
246
247            let mut bytes: &[u8] = chunk;
248            while let Some(line) = parser.next_line(&mut bytes, options) {
249                collected_lines.push(line.into_owned());
250            }
251        }
252
253        if let Some(line) = parser.finish() {
254            collected_lines.push(line.into_owned());
255        }
256
257        let expected_lines: Vec<String> = expected_lines.iter().map(ToString::to_string).collect();
258        assert_that!(collected_lines).is_equal_to(expected_lines);
259    }
260
    /// Options shared by the overflow tests: a 4-byte line limit where the overflowing
    /// remainder is emitted as additional lines, with buffer compaction disabled.
    fn emit_additional_options() -> LineParsingOptions {
        LineParsingOptions {
            max_line_length: 4.bytes(),
            overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
            buffer_compaction_threshold: None,
        }
    }
268
269    fn as_single_byte_chunks(data: &str) -> Vec<&[u8]> {
270        data.as_bytes().iter().map(std::slice::from_ref).collect()
271    }
272
    /// Table of basic single-chunk inputs, run with the default options and with a 4-byte
    /// `DropAdditionalData` limit.
    #[test]
    fn basic_line_parsing_cases() {
        let default_options = LineParsingOptions::default();
        let drop_additional_options = LineParsingOptions {
            max_line_length: 4.bytes(),
            overflow_behavior: LineOverflowBehavior::DropAdditionalData,
            buffer_compaction_threshold: None,
        };

        // Empty input produces no lines.
        run_test_case(&[b""], None, &[], default_options);
        // An unterminated line is flushed at EOF.
        run_test_case(
            &[b"no newlines here"],
            None,
            &["no newlines here"],
            default_options,
        );
        // A single terminated line; the newline is not part of the output.
        run_test_case(&[b"one line\n"], None, &["one line"], default_options);
        // Several terminated lines in one chunk.
        run_test_case(
            &[b"first line\nsecond line\nthird line\n"],
            None,
            &["first line", "second line", "third line"],
            default_options,
        );
        // A terminated line followed by an unterminated trailing line.
        run_test_case(
            &[b"complete line\npartial"],
            None,
            &["complete line", "partial"],
            default_options,
        );
        run_test_case(
            &[b"previous: continuation\nmore lines\n"],
            None,
            &["previous: continuation", "more lines"],
            default_options,
        );
        // A line of exactly the limit, then an empty line: both are reported.
        run_test_case(&[b"1234\n\n"], None, &["1234", ""], drop_additional_options);
        // The over-long middle line is truncated to the limit; its remainder is dropped.
        run_test_case(
            &[b"ok\n123456789\nnext\n"],
            None,
            &["ok", "1234", "next"],
            drop_additional_options,
        );
    }
316
317    #[test]
318    fn invalid_utf8_data() {
319        run_test_case(
320            &[b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n"],
321            None,
322            &["valid utf8\u{FFFD}(\u{FFFD}\u{FFFD} invalid utf8"],
323            LineParsingOptions::default(),
324        );
325    }
326
327    #[test]
328    fn rest_of_too_long_line_is_dropped() {
329        run_test_case(
330            &[b"123456789\nabcdefghi\n"],
331            None,
332            &["1234", "abcd"],
333            LineParsingOptions {
334                max_line_length: 4.bytes(),
335                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
336                buffer_compaction_threshold: None,
337            },
338        );
339    }
340
341    #[test]
342    fn rest_of_too_long_line_is_returned_as_additional_lines() {
343        run_test_case(
344            &[b"123456789\nabcdefghi\n"],
345            None,
346            &["1234", "5678", "9", "abcd", "efgh", "i"],
347            emit_additional_options(),
348        );
349    }
350
351    #[test]
352    fn emit_additional_as_new_lines_does_not_emit_synthetic_empty_lines() {
353        let options = emit_additional_options();
354
355        run_test_case(&[b"1234\n"], None, &["1234"], options);
356        run_test_case(&[b"1234", b"\n"], None, &["1234"], options);
357        run_test_case(&[b"12345678\n"], None, &["1234", "5678"], options);
358        run_test_case(&[b"1234\n\n"], None, &["1234", ""], options);
359    }
360
361    #[test]
362    fn max_line_length_of_0_disables_line_length_checks() {
363        run_test_case(
364            &[b"123456789\nabcdefghi\n"],
365            None,
366            &["123456789", "abcdefghi"],
367            LineParsingOptions {
368                max_line_length: NumBytes::zero(),
369                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
370                buffer_compaction_threshold: None,
371            },
372        );
373        run_test_case(
374            &[b"123456789\nabcdefghi\n"],
375            None,
376            &["123456789", "abcdefghi"],
377            LineParsingOptions {
378                max_line_length: NumBytes::zero(),
379                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
380                buffer_compaction_threshold: None,
381            },
382        );
383    }
384
385    #[test]
386    fn leading_and_trailing_whitespace_is_preserved() {
387        run_test_case(
388            &[b"   123456789     \n    abcdefghi        \n"],
389            None,
390            &["   123456789     ", "    abcdefghi        "],
391            LineParsingOptions {
392                max_line_length: NumBytes::zero(),
393                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
394                buffer_compaction_threshold: None,
395            },
396        );
397    }
398
399    #[test]
400    fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
401        let chunks =
402            as_single_byte_chunks("\u{2764}\u{FE0F}\u{2764}\u{FE0F}\u{2764}\u{FE0F}\n\u{1F44D}\n");
403        run_test_case(
404            &chunks,
405            None,
406            &[
407                "\u{2764}\u{FE0F}\u{2764}\u{FE0F}\u{2764}\u{FE0F}",
408                "\u{1F44D}",
409            ],
410            LineParsingOptions::default(),
411        );
412    }
413
414    #[test]
415    fn overflow_drop_additional_data_persists_across_chunks() {
416        run_test_case(
417            &[b"1234", b"5678", b"9\nok\n"],
418            None,
419            &["1234", "ok"],
420            LineParsingOptions {
421                max_line_length: 4.bytes(),
422                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
423                buffer_compaction_threshold: None,
424            },
425        );
426    }
427
428    #[test]
429    fn gap_discards_partial_line_until_next_newline() {
430        run_test_case(
431            &[b"rea", b"dy\nnext\n"],
432            Some(1),
433            &["next"],
434            LineParsingOptions::default(),
435        );
436    }
437
438    #[test]
439    fn fast_path_borrows_when_line_fits_in_chunk_with_empty_buffer() {
440        // When the entire line is in this chunk and the buffer is empty, the parser hands back
441        // a `Cow::Borrowed` referencing the chunk slice. We can't observe Borrowed-vs-Owned
442        // directly through `into_owned`, so check the variant before consuming.
443        let mut parser = LineParser::new();
444        let chunk: &[u8] = b"hello\nworld\n";
445        let mut bytes = chunk;
446        let line = parser
447            .next_line(&mut bytes, LineParsingOptions::default())
448            .expect("first line is yielded");
449        assert_that!(matches!(line, Cow::Borrowed(_))).is_true();
450        drop(line);
451        let line = parser
452            .next_line(&mut bytes, LineParsingOptions::default())
453            .expect("second line is yielded");
454        assert_that!(matches!(line, Cow::Borrowed(_))).is_true();
455    }
456
457    mod properties {
458        //! Property-based coverage for [`LineParser`].
459        //!
460        //! These tests randomize chunk boundaries, line content (including embedded NULs and
461        //! multibyte UTF-8 sequences split mid-codepoint), and overflow behavior, then assert
462        //! invariants that no individual case-based test can cover comprehensively.
463
464        use super::{LineOverflowBehavior, LineParser, LineParsingOptions, NumBytesExt};
465        use proptest::collection::vec;
466        use proptest::prelude::{any, prop, prop_assert, prop_assert_eq, proptest};
467        use proptest::strategy::Strategy;
468
469        /// Drives the parser over `chunks`, returning every emitted line plus any trailing
470        /// flush at EOF. Inserts a gap before any chunk index in `gap_before`.
471        fn drive_parser(
472            chunks: &[Vec<u8>],
473            gap_before: &[usize],
474            options: LineParsingOptions,
475        ) -> Vec<String> {
476            let mut parser = LineParser::new();
477            let mut out = Vec::<String>::new();
478            for (i, chunk) in chunks.iter().enumerate() {
479                if gap_before.contains(&i) {
480                    parser.on_gap();
481                }
482                let mut bytes: &[u8] = chunk;
483                while let Some(line) = parser.next_line(&mut bytes, options) {
484                    out.push(line.into_owned());
485                }
486            }
487            if let Some(line) = parser.finish() {
488                out.push(line.into_owned());
489            }
490            out
491        }
492
493        /// Recombines `chunks` into one byte string and runs the parser over it as a single
494        /// chunk. Used as the reference oracle for "rechunking does not change observed lines."
495        fn drive_single_chunk(bytes: &[u8], options: LineParsingOptions) -> Vec<String> {
496            drive_parser(&[bytes.to_vec()], &[], options)
497        }
498
499        /// Splits `bytes` into chunks at the supplied (sorted, deduplicated, in-range) split
500        /// indices.
501        fn split_at_indices(bytes: &[u8], splits: &[usize]) -> Vec<Vec<u8>> {
502            let mut chunks = Vec::with_capacity(splits.len() + 1);
503            let mut prev = 0usize;
504            for &s in splits {
505                chunks.push(bytes[prev..s].to_vec());
506                prev = s;
507            }
508            chunks.push(bytes[prev..].to_vec());
509            chunks
510        }
511
512        /// Produces an ASCII-only line with no newlines so byte length equals character length.
513        fn ascii_no_newline_line() -> impl Strategy<Value = String> {
514            prop::string::string_regex("[a-zA-Z0-9 _.,;:!?-]{0,40}").unwrap()
515        }
516
517        /// Joins `lines` with `\n` and appends a trailing newline if `terminate_last` is true.
518        fn join_lines(lines: &[String], terminate_last: bool) -> String {
519            let mut s = String::new();
520            for (i, line) in lines.iter().enumerate() {
521                if i > 0 {
522                    s.push('\n');
523                }
524                s.push_str(line);
525            }
526            if terminate_last && !lines.is_empty() {
527                s.push('\n');
528            }
529            s
530        }
531
532        proptest! {
533            /// Splitting the same byte stream at any boundary set must yield the same lines as
534            /// feeding the whole stream in one chunk. This is the core "chunk boundary
535            /// invariance" property.
536            #[test]
537            fn rechunking_preserves_lines(
538                lines in vec(ascii_no_newline_line(), 0..6),
539                terminate_last in any::<bool>(),
540                splits_seed in vec(any::<u16>(), 0..8),
541            ) {
542                let combined = join_lines(&lines, terminate_last);
543                let bytes = combined.as_bytes();
544
545                let mut splits: Vec<usize> = splits_seed
546                    .into_iter()
547                    .filter_map(|n| {
548                        let len = bytes.len();
549                        if len == 0 { None } else { Some((n as usize) % len) }
550                    })
551                    .collect();
552                splits.sort_unstable();
553                splits.dedup();
554
555                let chunks = split_at_indices(bytes, &splits);
556                let options = LineParsingOptions::default();
557
558                let from_chunks = drive_parser(&chunks, &[], options);
559                let from_single = drive_single_chunk(bytes, options);
560                prop_assert_eq!(from_chunks, from_single);
561            }
562
563            /// Single-byte chunk feeding (the worst case for chunk-boundary handling and
564            /// multibyte UTF-8 reassembly) must produce the same output as feeding the whole
565            /// stream at once.
566            #[test]
567            fn single_byte_chunks_match_single_chunk(
568                content in prop::string::string_regex(
569                    "([a-zA-Z0-9 \u{2764}\u{1F44D}]{0,12}\n){0,4}([a-zA-Z0-9 \u{2764}\u{1F44D}]{0,12})?",
570                ).unwrap(),
571            ) {
572                let bytes = content.as_bytes();
573                let single_byte_chunks: Vec<Vec<u8>> =
574                    bytes.iter().map(|b| vec![*b]).collect();
575                let options = LineParsingOptions::default();
576
577                let from_single_byte = drive_parser(&single_byte_chunks, &[], options);
578                let from_single = drive_single_chunk(bytes, options);
579                prop_assert_eq!(from_single_byte, from_single);
580            }
581
582            /// Embedded NUL bytes are treated as ordinary content; lines remain split only on
583            /// `\n`. Round-tripping through the parser preserves the line count and byte
584            /// content (modulo the dropped delimiters) for ASCII-plus-NUL data.
585            #[test]
586            fn embedded_nuls_are_treated_as_content(
587                lines in vec(
588                    prop::string::string_regex("[a-z\\x00]{0,16}").unwrap(),
589                    1..5,
590                ),
591            ) {
592                let combined = join_lines(&lines, true);
593                let bytes = combined.as_bytes();
594
595                let result = drive_single_chunk(bytes, LineParsingOptions::default());
596                prop_assert_eq!(result.len(), lines.len());
597                for (got, expected) in result.iter().zip(lines.iter()) {
598                    prop_assert_eq!(got, expected);
599                }
600            }
601
602            /// Multibyte UTF-8 codepoints split across chunk boundaries are reassembled
603            /// identically to the single-chunk feed (not split into replacement characters).
604            #[test]
605            fn multibyte_utf8_survives_chunk_split(
606                splits_seed in vec(any::<u8>(), 0..8),
607            ) {
608                let combined = "\u{2764}\u{FE0F}hello\n\u{1F44D}world\nplain\n";
609                let bytes = combined.as_bytes();
610
611                let mut splits: Vec<usize> = splits_seed
612                    .into_iter()
613                    .map(|n| (n as usize) % bytes.len())
614                    .collect();
615                splits.sort_unstable();
616                splits.dedup();
617
618                let chunks = split_at_indices(bytes, &splits);
619                let options = LineParsingOptions::default();
620
621                let from_chunks = drive_parser(&chunks, &[], options);
622                let from_single = drive_single_chunk(bytes, options);
623                prop_assert_eq!(from_chunks, from_single);
624            }
625
626            /// `DropAdditionalData` truncates each emitted line to at most `max_line_length`
627            /// bytes (when `max_line_length > 0`), regardless of how the input is chunked.
628            #[test]
629            fn drop_additional_caps_emitted_line_length(
630                lines in vec(
631                    prop::string::string_regex("[a-z]{0,30}").unwrap(),
632                    1..5,
633                ),
634                max_line in 1usize..=8,
635                splits_seed in vec(any::<u16>(), 0..6),
636            ) {
637                let combined = join_lines(&lines, true);
638                let bytes = combined.as_bytes();
639                let options = LineParsingOptions {
640                    max_line_length: max_line.bytes(),
641                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
642                    buffer_compaction_threshold: None,
643                };
644
645                let mut splits: Vec<usize> = splits_seed
646                    .into_iter()
647                    .filter_map(|n| {
648                        let len = bytes.len();
649                        if len == 0 { None } else { Some((n as usize) % len) }
650                    })
651                    .collect();
652                splits.sort_unstable();
653                splits.dedup();
654                let chunks = split_at_indices(bytes, &splits);
655
656                let result = drive_parser(&chunks, &[], options);
657                for line in &result {
658                    prop_assert!(
659                        line.len() <= max_line,
660                        "line {line:?} exceeds max_line_length {max_line}",
661                    );
662                }
663            }
664
665            /// `EmitAdditionalAsNewLines` preserves all input bytes (modulo `\n` delimiters)
666            /// across the emitted lines, without inventing or dropping any data.
667            #[test]
668            fn emit_additional_preserves_all_bytes(
669                lines in vec(
670                    prop::string::string_regex("[a-z]{0,20}").unwrap(),
671                    1..5,
672                ),
673                max_line in 1usize..=8,
674            ) {
675                let combined = join_lines(&lines, true);
676                let bytes = combined.as_bytes();
677                let options = LineParsingOptions {
678                    max_line_length: max_line.bytes(),
679                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
680                    buffer_compaction_threshold: None,
681                };
682
683                let result = drive_single_chunk(bytes, options);
684                let original_no_newlines: String =
685                    combined.chars().filter(|c| *c != '\n').collect();
686                let recombined: String = result.concat();
687                prop_assert_eq!(recombined, original_no_newlines);
688            }
689
690            /// After a gap, the parser drops any partial line and resyncs at the next
691            /// newline. Whatever the parser emits after a gap is therefore a strict subset of
692            /// the lines produced by the same input without the gap.
693            #[test]
694            fn gap_emits_subset_of_no_gap_run(
695                pre_lines in vec(ascii_no_newline_line(), 0..3),
696                post_lines in vec(ascii_no_newline_line(), 1..4),
697            ) {
698                let pre = join_lines(&pre_lines, true);
699                let post = join_lines(&post_lines, true);
700
701                let chunks = vec![pre.as_bytes().to_vec(), post.as_bytes().to_vec()];
702                let options = LineParsingOptions::default();
703
704                let with_gap = drive_parser(&chunks, &[1], options);
705                let without_gap = drive_parser(&chunks, &[], options);
706
707                for line in &with_gap {
708                    prop_assert!(
709                        without_gap.contains(line),
710                        "gap output {line:?} not present in no-gap output {without_gap:?}",
711                    );
712                }
713                // Mid-line gap discards at most one line beyond what the gap arrived in.
714                prop_assert!(with_gap.len() <= without_gap.len());
715            }
716        }
717    }
718
719    mod buffer_compaction {
720        use super::*;
721
722        /// Forces a multi-chunk line so the buffered (non-fast-path) emission machinery engages and
723        /// the bytes flow through `line_buffer` and into `emitted` via `emit_buffered_line`.
724        fn run_split_line(parser: &mut LineParser, line: &[u8], options: LineParsingOptions) {
725            // Split the line in two; feed the first half (no newline → buffered), then the
726            // second half plus newline.
727            let mid = line.len() / 2;
728            let first: &[u8] = &line[..mid];
729            let mut second = Vec::with_capacity(line.len() - mid + 1);
730            second.extend_from_slice(&line[mid..]);
731            second.push(b'\n');
732
733            let mut bytes = first;
734            assert_that!(parser.next_line(&mut bytes, options).is_none()).is_true();
735            let mut bytes: &[u8] = &second;
736            let emitted = parser
737                .next_line(&mut bytes, options)
738                .expect("line emits when newline arrives");
739            assert_that!(emitted.len()).is_equal_to(line.len());
740            drop(emitted);
741        }
742
743        fn unbounded_options(threshold: Option<NumBytes>) -> LineParsingOptions {
744            LineParsingOptions {
745                max_line_length: NumBytes::zero(),
746                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
747                buffer_compaction_threshold: threshold,
748            }
749        }
750
751        #[test]
752        fn no_compaction_keeps_high_water_mark_when_threshold_is_none() {
753            let mut parser = LineParser::new();
754            let options = unbounded_options(None);
755
756            // Swap-and-clear ping-pongs capacity between the two buffers each emission, so
757            // the meaningful invariant is on the *larger* of the two — not on either buffer
758            // individually. The larger of the two is what bounds the next-line cost: as long
759            // as it stays >= 200 bytes, no reallocation is needed when a 200-byte line shows
760            // up again.
761            run_split_line(&mut parser, &b"a".repeat(200), options);
762            let larger = parser.line_buffer.capacity().max(parser.emitted.capacity());
763            assert_that!(larger >= 200).is_true();
764
765            run_split_line(&mut parser, &b"b".repeat(8), options);
766
767            // No threshold ⇒ the largest retained capacity does not shrink below the
768            // 200-byte high-water mark even after the small line.
769            let after = parser.line_buffer.capacity().max(parser.emitted.capacity());
770            assert_that!(after >= 200).is_true();
771        }
772
773        #[test]
774        fn compaction_releases_emitted_capacity_when_over_threshold() {
775            let mut parser = LineParser::new();
776            // Threshold of 16 B is well below the 200-byte outlier but well above the typical
777            // 8-byte lines used afterwards.
778            let options = unbounded_options(Some(16.bytes()));
779
780            run_split_line(&mut parser, &b"a".repeat(200), options);
781            assert_that!(parser.emitted.capacity() >= 200).is_true();
782
783            // The next `next_line` call observes the over-threshold capacity at entry and
784            // drops the allocation; the small line then re-grows `emitted` to a small size.
785            run_split_line(&mut parser, &b"b".repeat(8), options);
786            assert_that!(parser.emitted.capacity() <= 200).is_true();
787            assert_that!(parser.emitted.capacity() < 64).is_true();
788        }
789
790        #[test]
791        fn compaction_does_not_drop_mid_line_partial_buffer() {
792            let mut parser = LineParser::new();
793            // Pick a threshold smaller than the in-progress line. If `compact_if_needed`
794            // wrongly clobbered `line_buffer` mid-line, the trailing part of the line would
795            // get re-emitted on its own; the assertion below catches that.
796            let options = unbounded_options(Some(4.bytes()));
797
798            // Feed a partial line — no newline yet, so it stays buffered.
799            let mut bytes: &[u8] = b"abcdefgh";
800            assert_that!(parser.next_line(&mut bytes, options).is_none()).is_true();
801            assert_that!(parser.line_buffer.len()).is_equal_to(8);
802
803            // Even though `line_buffer.capacity() > threshold`, the buffer is non-empty so
804            // compaction must be skipped. After feeding the rest with a newline, the full
805            // line is emitted intact.
806            let mut bytes: &[u8] = b"ij\n";
807            let emitted = parser
808                .next_line(&mut bytes, options)
809                .expect("full line emitted once newline arrives");
810            assert_that!(emitted.as_ref()).is_equal_to("abcdefghij");
811        }
812    }
813}