Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Write the whole of `buf` to `writer`.
///
/// Thin wrapper over [`Write::write_all`], which keeps issuing writes until
/// every byte has been accepted or an error other than `Interrupted` occurs.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}
8
/// Delimiter placement for duplicate groups printed by `--all-repeated` (`-D`).
///
/// A delimiter is a bare record terminator, i.e. an empty line.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AllRepeatedMethod {
    /// No delimiters are written.
    None,
    /// A delimiter is written before every printed group, including the first.
    Prepend,
    /// A delimiter is written between printed groups (never before the first).
    Separate,
}
16
/// Delimiter placement for `--group` output.
///
/// Every variant writes a delimiter (a bare terminator) between adjacent
/// groups; the variants differ only in what happens at the ends of the output.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupMethod {
    /// Delimiters only between groups.
    Separate,
    /// Additionally, a delimiter before the first group.
    Prepend,
    /// Additionally, a delimiter after the last group.
    Append,
    /// Additionally, delimiters before the first and after the last group.
    Both,
}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Check if config requires field/char skipping or char limiting.
119#[inline(always)]
120fn needs_key_extraction(config: &UniqConfig) -> bool {
121    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
122}
123
/// Exact byte equality of two lines, for use when no key extraction or case
/// folding is in effect.
///
/// Rejects first on length mismatch, then on the first 8 bytes (when the
/// lines are at least that long), and only then compares the whole lines.
/// The result is always identical to `a == b`.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Cheap prefix rejection: distinct lines of equal length usually differ
    // early. A fixed 8-byte slice comparison needs no unaligned raw-pointer
    // reads (and therefore no `unsafe`).
    if a.len() >= 8 && a[..8] != b[..8] {
        return false;
    }
    a == b
}
145
146/// Write a count-prefixed line in GNU uniq format.
147/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
148/// Combines prefix + line + term into a single write for short lines (< 240 bytes).
149#[inline(always)]
150fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
151    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
152    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
153    let digits = itoa_right_aligned_into(&mut prefix, count);
154    let width = digits.max(7); // minimum 7 chars
155    let prefix_len = width + 1; // +1 for trailing space
156    prefix[width] = b' ';
157
158    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
159    let total = prefix_len + line.len() + 1;
160    if total <= 256 {
161        let mut buf = [0u8; 256];
162        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
163        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
164        buf[prefix_len + line.len()] = term;
165        out.write_all(&buf[..total])
166    } else {
167        out.write_all(&prefix[..prefix_len])?;
168        out.write_all(line)?;
169        out.write_all(&[term])
170    }
171}
172
/// Format `val` in decimal, right-aligned in a 7-column field, at the start
/// of `buf`.
///
/// `buf` must arrive filled with spaces: the padding columns are never
/// written. Digits are first produced right-to-left ending at index 27, then
/// moved to the front. Returns the occupied width, which is `max(digits, 7)`.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    const MIN_WIDTH: usize = 7;
    const DIGITS_END: usize = 27;

    if val == 0 {
        buf[MIN_WIDTH - 1] = b'0';
        return MIN_WIDTH;
    }

    let mut first = DIGITS_END;
    while val != 0 {
        first -= 1;
        buf[first] = b'0' + (val % 10) as u8;
        val /= 10;
    }

    let num_digits = DIGITS_END - first;
    // Short numbers land after the padding; wide ones start at column 0.
    let dest = MIN_WIDTH.saturating_sub(num_digits);
    buf.copy_within(first..DIGITS_END, dest);
    num_digits.max(MIN_WIDTH)
}
201
202// ============================================================================
203// High-performance mmap-based processing (for byte slices, zero-copy)
204// ============================================================================
205
206/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
207pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
208    // 16MB output buffer for fewer flush syscalls on large inputs
209    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
210    let term = if config.zero_terminated { b'\0' } else { b'\n' };
211
212    match config.mode {
213        OutputMode::Group(method) => {
214            process_group_bytes(data, &mut writer, config, method, term)?;
215        }
216        OutputMode::AllRepeated(method) => {
217            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
218        }
219        _ => {
220            process_standard_bytes(data, &mut writer, config, term)?;
221        }
222    }
223
224    writer.flush()?;
225    Ok(())
226}
227
228/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
229/// Uses memchr for SIMD-accelerated line boundary detection.
230struct LineIter<'a> {
231    data: &'a [u8],
232    pos: usize,
233    term: u8,
234}
235
236impl<'a> LineIter<'a> {
237    #[inline(always)]
238    fn new(data: &'a [u8], term: u8) -> Self {
239        Self { data, pos: 0, term }
240    }
241}
242
243impl<'a> Iterator for LineIter<'a> {
244    /// (line content without terminator, full line including terminator for output)
245    type Item = (&'a [u8], &'a [u8]);
246
247    #[inline(always)]
248    fn next(&mut self) -> Option<Self::Item> {
249        if self.pos >= self.data.len() {
250            return None;
251        }
252
253        let remaining = &self.data[self.pos..];
254        match memchr::memchr(self.term, remaining) {
255            Some(idx) => {
256                let line_start = self.pos;
257                let line_end = self.pos + idx; // without terminator
258                let full_end = self.pos + idx + 1; // with terminator
259                self.pos = full_end;
260                Some((
261                    &self.data[line_start..line_end],
262                    &self.data[line_start..full_end],
263                ))
264            }
265            None => {
266                // Last line without terminator
267                let line_start = self.pos;
268                self.pos = self.data.len();
269                let line = &self.data[line_start..];
270                Some((line, line))
271            }
272        }
273    }
274}
275
/// Content of record `idx` (terminator excluded), using precomputed starts.
///
/// For every record except the last, the content ends one byte before the
/// next record's start (the terminator). The last record ends at
/// `content_end`, which the caller has already adjusted to exclude a trailing
/// terminator.
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let begin = line_starts[idx];
    let finish = match line_starts.get(idx + 1) {
        Some(&next_start) => next_start - 1,
        None => content_end,
    };
    &data[begin..finish]
}
293
/// Full record `idx` (terminator included when present), using precomputed
/// starts.
///
/// A record runs from its start to the next record's start, or to the end of
/// `data` for the last record.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let begin = line_starts[idx];
    let finish = line_starts.get(idx + 1).copied().unwrap_or(data.len());
    &data[begin..finish]
}
305
306/// Linear scan for the end of a duplicate group.
307/// Returns the index of the first line that differs from line_starts[group_start].
308/// Must use linear scan (not binary search) because uniq input may NOT be sorted —
309/// equal lines can appear in non-adjacent groups separated by different lines.
310#[inline]
311fn linear_scan_group_end(
312    data: &[u8],
313    line_starts: &[usize],
314    group_start: usize,
315    num_lines: usize,
316    content_end: usize,
317) -> usize {
318    let key = line_content_at(data, line_starts, group_start, content_end);
319    let mut i = group_start + 1;
320    while i < num_lines {
321        if !lines_equal_fast(key, line_content_at(data, line_starts, i, content_end)) {
322            return i;
323        }
324        i += 1;
325    }
326    i
327}
328
329/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
330/// Uses pre-computed line positions for faster iteration and binary search
331/// for duplicate group sizes on repetitive data.
332fn process_standard_bytes(
333    data: &[u8],
334    writer: &mut impl Write,
335    config: &UniqConfig,
336    term: u8,
337) -> io::Result<()> {
338    if data.is_empty() {
339        return Ok(());
340    }
341
342    let fast = !needs_key_extraction(config) && !config.ignore_case;
343
344    // Pre-compute all line start positions in a single SIMD pass.
345    // memchr_iter precomputes SIMD state once (vs per-call recomputation in LineIter).
346    let estimated_lines = (data.len() / 40).max(64);
347    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
348    line_starts.push(0); // first line starts at offset 0
349    for pos in memchr::memchr_iter(term, data) {
350        if pos + 1 < data.len() {
351            line_starts.push(pos + 1);
352        }
353    }
354    let num_lines = line_starts.len();
355    if num_lines == 0 {
356        return Ok(());
357    }
358
359    // Pre-compute content end: if data ends with terminator, exclude it for last line
360    let content_end = if data.last() == Some(&term) {
361        data.len() - 1
362    } else {
363        data.len()
364    };
365
366    // Ultra-fast path: default mode, no count, no key extraction
367    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
368        // Write first line
369        let first_full = line_full_at(data, &line_starts, 0);
370        let first_content = line_content_at(data, &line_starts, 0, content_end);
371        write_all_raw(writer, first_full)?;
372        if first_full.len() == first_content.len() {
373            writer.write_all(&[term])?;
374        }
375
376        let mut i = 1;
377        while i < num_lines {
378            let prev = line_content_at(data, &line_starts, i - 1, content_end);
379            let cur = line_content_at(data, &line_starts, i, content_end);
380
381            if lines_equal_fast(prev, cur) {
382                // Duplicate detected — binary search for end of group
383                let group_end =
384                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
385                i = group_end;
386                continue;
387            }
388
389            // Unique line — write it
390            let cur_full = line_full_at(data, &line_starts, i);
391            write_all_raw(writer, cur_full)?;
392            if cur_full.len() == cur.len() {
393                writer.write_all(&[term])?;
394            }
395            i += 1;
396        }
397        return Ok(());
398    }
399
400    // General path with count tracking + binary search for duplicate groups
401    let mut i = 0;
402    while i < num_lines {
403        let content = line_content_at(data, &line_starts, i, content_end);
404        let full = line_full_at(data, &line_starts, i);
405
406        // Find group size: check next line, if equal use binary search
407        let group_end = if fast
408            && i + 1 < num_lines
409            && lines_equal_fast(
410                content,
411                line_content_at(data, &line_starts, i + 1, content_end),
412            ) {
413            // Duplicate detected — binary search for end
414            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
415        } else if !fast
416            && i + 1 < num_lines
417            && lines_equal(
418                content,
419                line_content_at(data, &line_starts, i + 1, content_end),
420                config,
421            )
422        {
423            // Slow path linear scan with key extraction
424            let mut j = i + 2;
425            while j < num_lines {
426                if !lines_equal(
427                    content,
428                    line_content_at(data, &line_starts, j, content_end),
429                    config,
430                ) {
431                    break;
432                }
433                j += 1;
434            }
435            j
436        } else {
437            i + 1
438        };
439
440        let count = (group_end - i) as u64;
441        output_group_bytes(writer, content, full, count, config, term)?;
442        i = group_end;
443    }
444
445    Ok(())
446}
447
448/// Output a group for standard modes (bytes path).
449#[inline(always)]
450fn output_group_bytes(
451    writer: &mut impl Write,
452    content: &[u8],
453    full: &[u8],
454    count: u64,
455    config: &UniqConfig,
456    term: u8,
457) -> io::Result<()> {
458    let should_print = match config.mode {
459        OutputMode::Default => true,
460        OutputMode::RepeatedOnly => count > 1,
461        OutputMode::UniqueOnly => count == 1,
462        _ => true,
463    };
464
465    if should_print {
466        if config.count {
467            write_count_line(writer, count, content, term)?;
468        } else {
469            writer.write_all(full)?;
470            // Add terminator if the original line didn't have one
471            if full.len() == content.len() {
472                writer.write_all(&[term])?;
473            }
474        }
475    }
476
477    Ok(())
478}
479
480/// Process --all-repeated / -D mode on byte slices.
481fn process_all_repeated_bytes(
482    data: &[u8],
483    writer: &mut impl Write,
484    config: &UniqConfig,
485    method: AllRepeatedMethod,
486    term: u8,
487) -> io::Result<()> {
488    let mut lines = LineIter::new(data, term);
489
490    let first = match lines.next() {
491        Some(v) => v,
492        None => return Ok(()),
493    };
494
495    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
496    // For all-repeated we need to buffer group lines since we only print if count > 1
497    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
498    group_lines.push(first);
499    let mut first_group_printed = false;
500
501    let fast = !needs_key_extraction(config) && !config.ignore_case;
502
503    for (cur_content, cur_full) in lines {
504        let prev_content = group_lines.last().unwrap().0;
505        let equal = if fast {
506            lines_equal_fast(prev_content, cur_content)
507        } else {
508            lines_equal(prev_content, cur_content, config)
509        };
510
511        if equal {
512            group_lines.push((cur_content, cur_full));
513        } else {
514            // Flush group
515            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
516            group_lines.clear();
517            group_lines.push((cur_content, cur_full));
518        }
519    }
520
521    // Flush last group
522    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
523
524    Ok(())
525}
526
527/// Flush a group for --all-repeated mode (bytes path).
528fn flush_all_repeated_bytes(
529    writer: &mut impl Write,
530    group: &[(&[u8], &[u8])],
531    method: AllRepeatedMethod,
532    first_group_printed: &mut bool,
533    term: u8,
534) -> io::Result<()> {
535    if group.len() <= 1 {
536        return Ok(()); // Not a duplicate group
537    }
538
539    match method {
540        AllRepeatedMethod::Prepend => {
541            writer.write_all(&[term])?;
542        }
543        AllRepeatedMethod::Separate => {
544            if *first_group_printed {
545                writer.write_all(&[term])?;
546            }
547        }
548        AllRepeatedMethod::None => {}
549    }
550
551    for &(content, full) in group {
552        writer.write_all(full)?;
553        if full.len() == content.len() {
554            writer.write_all(&[term])?;
555        }
556    }
557
558    *first_group_printed = true;
559    Ok(())
560}
561
562/// Process --group mode on byte slices.
563fn process_group_bytes(
564    data: &[u8],
565    writer: &mut impl Write,
566    config: &UniqConfig,
567    method: GroupMethod,
568    term: u8,
569) -> io::Result<()> {
570    let mut lines = LineIter::new(data, term);
571
572    let (prev_content, prev_full) = match lines.next() {
573        Some(v) => v,
574        None => return Ok(()),
575    };
576
577    // Prepend/Both: separator before first group
578    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
579        writer.write_all(&[term])?;
580    }
581
582    // Write first line
583    writer.write_all(prev_full)?;
584    if prev_full.len() == prev_content.len() {
585        writer.write_all(&[term])?;
586    }
587
588    let mut prev_content = prev_content;
589    let fast = !needs_key_extraction(config) && !config.ignore_case;
590
591    for (cur_content, cur_full) in lines {
592        let equal = if fast {
593            lines_equal_fast(prev_content, cur_content)
594        } else {
595            lines_equal(prev_content, cur_content, config)
596        };
597
598        if !equal {
599            // New group — write separator
600            writer.write_all(&[term])?;
601        }
602
603        writer.write_all(cur_full)?;
604        if cur_full.len() == cur_content.len() {
605            writer.write_all(&[term])?;
606        }
607
608        prev_content = cur_content;
609    }
610
611    // Append/Both: separator after last group
612    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
613        writer.write_all(&[term])?;
614    }
615
616    Ok(())
617}
618
619// ============================================================================
620// Streaming processing (for stdin / pipe input)
621// ============================================================================
622
623/// Main streaming uniq processor.
624/// Reads from `input`, writes to `output`.
625pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
626    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
627    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
628    let term = if config.zero_terminated { b'\0' } else { b'\n' };
629
630    match config.mode {
631        OutputMode::Group(method) => {
632            process_group_stream(reader, &mut writer, config, method, term)?;
633        }
634        OutputMode::AllRepeated(method) => {
635            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
636        }
637        _ => {
638            process_standard_stream(reader, &mut writer, config, term)?;
639        }
640    }
641
642    writer.flush()?;
643    Ok(())
644}
645
646/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
647fn process_standard_stream<R: BufRead, W: Write>(
648    mut reader: R,
649    writer: &mut W,
650    config: &UniqConfig,
651    term: u8,
652) -> io::Result<()> {
653    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
654    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
655
656    // Read first line
657    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
658        return Ok(()); // empty input
659    }
660    let mut count: u64 = 1;
661
662    loop {
663        current_line.clear();
664        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
665
666        if bytes_read == 0 {
667            // End of input — output the last group
668            output_group_stream(writer, &prev_line, count, config, term)?;
669            break;
670        }
671
672        if compare_lines_stream(&prev_line, &current_line, config, term) {
673            count += 1;
674        } else {
675            output_group_stream(writer, &prev_line, count, config, term)?;
676            std::mem::swap(&mut prev_line, &mut current_line);
677            count = 1;
678        }
679    }
680
681    Ok(())
682}
683
684/// Compare two lines (with terminators) in streaming mode.
685#[inline(always)]
686fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
687    let a_stripped = strip_term(a, term);
688    let b_stripped = strip_term(b, term);
689    lines_equal(a_stripped, b_stripped, config)
690}
691
/// Drop one trailing `term` byte from `line`, if present.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, body)) if last == term => body,
        _ => line,
    }
}
701
702/// Output a group in streaming mode.
703#[inline(always)]
704fn output_group_stream(
705    writer: &mut impl Write,
706    line: &[u8],
707    count: u64,
708    config: &UniqConfig,
709    term: u8,
710) -> io::Result<()> {
711    let should_print = match config.mode {
712        OutputMode::Default => true,
713        OutputMode::RepeatedOnly => count > 1,
714        OutputMode::UniqueOnly => count == 1,
715        _ => true,
716    };
717
718    if should_print {
719        let content = strip_term(line, term);
720        if config.count {
721            write_count_line(writer, count, content, term)?;
722        } else {
723            writer.write_all(content)?;
724            writer.write_all(&[term])?;
725        }
726    }
727
728    Ok(())
729}
730
731/// Process --all-repeated / -D mode (streaming).
732fn process_all_repeated_stream<R: BufRead, W: Write>(
733    mut reader: R,
734    writer: &mut W,
735    config: &UniqConfig,
736    method: AllRepeatedMethod,
737    term: u8,
738) -> io::Result<()> {
739    let mut group: Vec<Vec<u8>> = Vec::new();
740    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
741    let mut first_group_printed = false;
742
743    current_line.clear();
744    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
745        return Ok(());
746    }
747    group.push(current_line.clone());
748
749    loop {
750        current_line.clear();
751        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
752
753        if bytes_read == 0 {
754            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
755            break;
756        }
757
758        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
759            group.push(current_line.clone());
760        } else {
761            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
762            group.clear();
763            group.push(current_line.clone());
764        }
765    }
766
767    Ok(())
768}
769
770/// Flush a group for --all-repeated mode (streaming).
771fn flush_all_repeated_stream(
772    writer: &mut impl Write,
773    group: &[Vec<u8>],
774    method: AllRepeatedMethod,
775    first_group_printed: &mut bool,
776    term: u8,
777) -> io::Result<()> {
778    if group.len() <= 1 {
779        return Ok(());
780    }
781
782    match method {
783        AllRepeatedMethod::Prepend => {
784            writer.write_all(&[term])?;
785        }
786        AllRepeatedMethod::Separate => {
787            if *first_group_printed {
788                writer.write_all(&[term])?;
789            }
790        }
791        AllRepeatedMethod::None => {}
792    }
793
794    for line in group {
795        let content = strip_term(line, term);
796        writer.write_all(content)?;
797        writer.write_all(&[term])?;
798    }
799
800    *first_group_printed = true;
801    Ok(())
802}
803
804/// Process --group mode (streaming).
805fn process_group_stream<R: BufRead, W: Write>(
806    mut reader: R,
807    writer: &mut W,
808    config: &UniqConfig,
809    method: GroupMethod,
810    term: u8,
811) -> io::Result<()> {
812    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
813    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
814
815    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
816        return Ok(());
817    }
818
819    // Prepend/Both: separator before first group
820    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
821        writer.write_all(&[term])?;
822    }
823
824    let content = strip_term(&prev_line, term);
825    writer.write_all(content)?;
826    writer.write_all(&[term])?;
827
828    loop {
829        current_line.clear();
830        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
831
832        if bytes_read == 0 {
833            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
834                writer.write_all(&[term])?;
835            }
836            break;
837        }
838
839        if !compare_lines_stream(&prev_line, &current_line, config, term) {
840            writer.write_all(&[term])?;
841        }
842
843        let content = strip_term(&current_line, term);
844        writer.write_all(content)?;
845        writer.write_all(&[term])?;
846
847        std::mem::swap(&mut prev_line, &mut current_line);
848    }
849
850    Ok(())
851}
852
/// Append the next record to `buf`: the bytes up to and including `term`, or
/// up to end of input if no `term` follows.
///
/// Returns the number of bytes appended; 0 means end of input.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    BufRead::read_until(reader, term, buf)
}