// coreutils_rs/uniq/core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
3/// Write a large contiguous buffer (`write_all` already retries on partial writes).
4#[inline]
5fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
6    writer.write_all(buf)
7}
8
9/// How to delimit groups when using --all-repeated
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum AllRepeatedMethod {
12    None,
13    Prepend,
14    Separate,
15}
16
17/// How to delimit groups when using --group
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum GroupMethod {
20    Separate,
21    Prepend,
22    Append,
23    Both,
24}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
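
// Illustrative example (added to this listing, not part of the original source):
// a config roughly equivalent to `uniq -c -f 1`, i.e. count occurrences while
// comparing lines after the first blank-delimited field. Uses only the public
// fields declared above.
#[cfg(test)]
mod config_example {
    use super::*;

    #[test]
    fn config_for_count_skip_one_field() {
        let config = UniqConfig {
            count: true,
            skip_fields: 1,
            ..UniqConfig::default()
        };
        assert!(config.count);
        // Skipping a field forces the key-extraction comparison path.
        assert!(needs_key_extraction(&config));
        assert!(!needs_key_extraction(&UniqConfig::default()));
    }
}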
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
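
// Small illustrative test (added, not in the original source) for the GNU-style
// skipping described above: skip_fields (-f) consumes leading blanks plus one run
// of non-blanks per field, skip_chars (-s) then drops bytes, and check_chars (-w)
// caps the comparison width.
#[cfg(test)]
mod compare_slice_example {
    use super::*;

    #[test]
    fn skip_fields_then_chars_then_width() {
        let cfg = UniqConfig {
            skip_fields: 1,
            skip_chars: 2,
            check_chars: Some(3),
            ..UniqConfig::default()
        };
        // "  abc  defgh": the field skip removes "  abc", the char skip removes
        // the next two blanks, and the width cap keeps only "def".
        assert_eq!(get_compare_slice(b"  abc  defgh", &cfg), &b"def"[..]);
    }
}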
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Fast case-insensitive comparison: no field/char extraction, just case-insensitive.
119/// Uses a length check for early rejection before the full comparison.
120#[inline(always)]
121fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
122    let alen = a.len();
123    if alen != b.len() {
124        return false;
125    }
126    if alen == 0 {
127        return true;
128    }
129    a.eq_ignore_ascii_case(b)
130}
131
132/// Check if config requires field/char skipping or char limiting.
133#[inline(always)]
134fn needs_key_extraction(config: &UniqConfig) -> bool {
135    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
136}
137
138/// Fast path comparison: no field/char extraction needed, no case folding.
139/// Uses a length check plus 8-byte prefix/suffix word comparisons for fast rejection.
140/// For short lines (<= 16 bytes, common in many-dups data), avoids the
141/// full memcmp call overhead by doing direct word comparisons.
142#[inline(always)]
143fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
144    let alen = a.len();
145    if alen != b.len() {
146        return false;
147    }
148    if alen == 0 {
149        return true;
150    }
151    // Short-line fast path: compare via word loads to avoid memcmp call overhead
152    if alen <= 8 {
153        // Exactly 8 bytes: compare with a single unaligned u64 load.
154        // Shorter lines fall through to the plain slice comparison below.
155        if alen >= 8 {
156            let a8 = unsafe { (a.as_ptr() as *const u64).read_unaligned() };
157            let b8 = unsafe { (b.as_ptr() as *const u64).read_unaligned() };
158            return a8 == b8;
159        }
160        // For < 8 bytes: byte-by-byte via slice (compiler vectorizes this)
161        return a == b;
162    }
163    // 8-byte prefix check: reject most non-equal lines without full memcmp
164    let a8 = unsafe { (a.as_ptr() as *const u64).read_unaligned() };
165    let b8 = unsafe { (b.as_ptr() as *const u64).read_unaligned() };
166    if a8 != b8 {
167        return false;
168    }
169    // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
170    if alen <= 16 {
171        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
172        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
173        return a_tail == b_tail;
174    }
175    // Longer lines: prefix passed, fall through to full memcmp
176    a == b
177}
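
// Spot-check (added for illustration): the word-at-a-time fast path above should
// agree with plain slice equality across the exactly-8-byte, 9..=16-byte and
// longer-than-16-byte branches.
#[cfg(test)]
mod fast_equality_example {
    use super::*;

    #[test]
    fn agrees_with_slice_equality() {
        let cases = [
            (&b"12345678"[..], &b"12345678"[..]),                 // exactly 8 bytes
            (&b"abcdefghij"[..], &b"abcdefghiX"[..]),             // 9..=16 bytes, differ in tail
            (&b"abcdefghijklmnop"[..], &b"abcdefghijklmnop"[..]), // exactly 16 bytes
            (
                &b"a line longer than sixteen bytes"[..],
                &b"a line longer than sixteen bytez"[..],
            ),
        ];
        for (a, b) in cases {
            assert_eq!(lines_equal_fast(a, b), a == b);
        }
    }
}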
178
179/// Write a count-prefixed line in GNU uniq format.
180/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
181/// Combines prefix + line + term into a single write when they fit in a 256-byte stack buffer.
182#[inline(always)]
183fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
184    // Build the "%7lu "-style prefix in a stack buffer (up to 20 digits for u64 + padding + trailing space)
185    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
186    let digits = itoa_right_aligned_into(&mut prefix, count);
187    let width = digits.max(7); // minimum 7 chars
188    let prefix_len = width + 1; // +1 for trailing space
189    prefix[width] = b' ';
190
191    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
192    let total = prefix_len + line.len() + 1;
193    if total <= 256 {
194        let mut buf = [0u8; 256];
195        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
196        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
197        buf[prefix_len + line.len()] = term;
198        out.write_all(&buf[..total])
199    } else {
200        out.write_all(&prefix[..prefix_len])?;
201        out.write_all(line)?;
202        out.write_all(&[term])
203    }
204}
205
206/// Write u64 decimal right-aligned into prefix buffer.
207/// Buffer is pre-filled with spaces. Returns number of digits written.
208#[inline(always)]
209fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
210    if val == 0 {
211        buf[6] = b'0';
212        return 7; // 6 spaces + '0' = 7 chars
213    }
214    // Write digits right-to-left from position 27 (leaving room for trailing space)
215    let mut pos = 27;
216    while val > 0 {
217        pos -= 1;
218        buf[pos] = b'0' + (val % 10) as u8;
219        val /= 10;
220    }
221    let num_digits = 27 - pos;
222    if num_digits >= 7 {
223        // Number is wide enough, shift to front
224        buf.copy_within(pos..27, 0);
225        num_digits
226    } else {
227        // Right-align in 7-char field: spaces then digits
228        let pad = 7 - num_digits;
229        buf.copy_within(pos..27, pad);
230        // buf[0..pad] is already spaces from initialization
231        7
232    }
233}
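
// Added example: the prefix produced above should follow GNU's "%7lu " layout,
// right-aligned in a 7-column field for small counts and widening past 7 digits
// without truncation.
#[cfg(test)]
mod count_format_example {
    use super::*;

    #[test]
    fn count_prefix_matches_gnu_layout() {
        let mut out = Vec::new();
        write_count_line(&mut out, 3, b"hello", b'\n').unwrap();
        assert_eq!(&out[..], &b"      3 hello\n"[..]);

        out.clear();
        write_count_line(&mut out, 12_345_678, b"x", b'\n').unwrap();
        assert_eq!(&out[..], &b"12345678 x\n"[..]);
    }
}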
234
235// ============================================================================
236// High-performance mmap-based processing (for byte slices, zero-copy)
237// ============================================================================
238
239/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
240pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
241    // 16MB output buffer for fewer flush syscalls on large inputs
242    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
243    let term = if config.zero_terminated { b'\0' } else { b'\n' };
244
245    match config.mode {
246        OutputMode::Group(method) => {
247            process_group_bytes(data, &mut writer, config, method, term)?;
248        }
249        OutputMode::AllRepeated(method) => {
250            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
251        }
252        _ => {
253            process_standard_bytes(data, &mut writer, config, term)?;
254        }
255    }
256
257    writer.flush()?;
258    Ok(())
259}
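
// Minimal end-to-end sketch (added for illustration): default mode collapses
// adjacent duplicates only, mirroring `uniq` on already-grouped input.
#[cfg(test)]
mod process_bytes_example {
    use super::*;

    #[test]
    fn default_mode_collapses_adjacent_duplicates() {
        let mut out = Vec::new();
        let config = UniqConfig::default();
        process_uniq_bytes(b"a\na\nb\na\n", &mut out, &config).unwrap();
        // "a" reappears after "b", so it is printed again: uniq is not `sort -u`.
        assert_eq!(&out[..], &b"a\nb\na\n"[..]);
    }
}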
260
261/// Iterator over lines in a byte slice, yielding (line content without terminator, full line including terminator).
262/// Uses memchr for SIMD-accelerated line boundary detection.
263struct LineIter<'a> {
264    data: &'a [u8],
265    pos: usize,
266    term: u8,
267}
268
269impl<'a> LineIter<'a> {
270    #[inline(always)]
271    fn new(data: &'a [u8], term: u8) -> Self {
272        Self { data, pos: 0, term }
273    }
274}
275
276impl<'a> Iterator for LineIter<'a> {
277    /// (line content without terminator, full line including terminator for output)
278    type Item = (&'a [u8], &'a [u8]);
279
280    #[inline(always)]
281    fn next(&mut self) -> Option<Self::Item> {
282        if self.pos >= self.data.len() {
283            return None;
284        }
285
286        let remaining = &self.data[self.pos..];
287        match memchr::memchr(self.term, remaining) {
288            Some(idx) => {
289                let line_start = self.pos;
290                let line_end = self.pos + idx; // without terminator
291                let full_end = self.pos + idx + 1; // with terminator
292                self.pos = full_end;
293                Some((
294                    &self.data[line_start..line_end],
295                    &self.data[line_start..full_end],
296                ))
297            }
298            None => {
299                // Last line without terminator
300                let line_start = self.pos;
301                self.pos = self.data.len();
302                let line = &self.data[line_start..];
303                Some((line, line))
304            }
305        }
306    }
307}
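
// Added check: the iterator yields (content, full-line-with-terminator) pairs and
// still returns a final unterminated line, which the callers below rely on.
#[cfg(test)]
mod line_iter_example {
    use super::*;

    #[test]
    fn yields_content_and_full_line() {
        let mut it = LineIter::new(b"ab\ncd", b'\n');
        assert_eq!(it.next(), Some((&b"ab"[..], &b"ab\n"[..])));
        assert_eq!(it.next(), Some((&b"cd"[..], &b"cd"[..])));
        assert_eq!(it.next(), None);
    }
}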
308
309/// Get line content (without terminator) from pre-computed positions.
310/// `content_end` is the end of actual content (excludes trailing terminator if present).
311#[inline(always)]
312fn line_content_at<'a>(
313    data: &'a [u8],
314    line_starts: &[usize],
315    idx: usize,
316    content_end: usize,
317) -> &'a [u8] {
318    let start = line_starts[idx];
319    let end = if idx + 1 < line_starts.len() {
320        line_starts[idx + 1] - 1 // exclude terminator
321    } else {
322        content_end // last line: pre-computed to exclude trailing terminator
323    };
324    &data[start..end]
325}
326
327/// Get full line (with terminator) from pre-computed positions.
328#[inline(always)]
329fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
330    let start = line_starts[idx];
331    let end = if idx + 1 < line_starts.len() {
332        line_starts[idx + 1] // include terminator
333    } else {
334        data.len()
335    };
336    &data[start..end]
337}
338
339/// Linear scan for the end of a duplicate group.
340/// Returns the index of the first line that differs from line_starts[group_start].
341/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
342/// equal lines can appear in non-adjacent groups separated by different lines.
343/// Caches key length for fast length-mismatch rejection.
344#[inline]
345fn linear_scan_group_end(
346    data: &[u8],
347    line_starts: &[usize],
348    group_start: usize,
349    num_lines: usize,
350    content_end: usize,
351) -> usize {
352    let key = line_content_at(data, line_starts, group_start, content_end);
353    let key_len = key.len();
354    let mut i = group_start + 1;
355    while i < num_lines {
356        let candidate = line_content_at(data, line_starts, i, content_end);
357        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
358            return i;
359        }
360        i += 1;
361    }
362    i
363}
364
365/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
366/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
367/// General path: pre-computed line positions with linear scans over duplicate groups.
368fn process_standard_bytes(
369    data: &[u8],
370    writer: &mut impl Write,
371    config: &UniqConfig,
372    term: u8,
373) -> io::Result<()> {
374    if data.is_empty() {
375        return Ok(());
376    }
377
378    let fast = !needs_key_extraction(config) && !config.ignore_case;
379    let fast_ci = !needs_key_extraction(config) && config.ignore_case;
380
381    // Ultra-fast path: default mode, no count, no key extraction.
382    // Single-pass: scan with memchr, compare adjacent lines inline.
383    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
384    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
385        return process_default_fast_singlepass(data, writer, term);
386    }
387
388    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
389    if fast
390        && !config.count
391        && matches!(
392            config.mode,
393            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
394        )
395    {
396        return process_filter_fast_singlepass(data, writer, config, term);
397    }
398
399    // Ultra-fast path: count mode with no key extraction.
400    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
401    // Avoids the line_starts Vec allocation (20MB+ for large files).
402    if fast && config.count {
403        return process_count_fast_singlepass(data, writer, config, term);
404    }
405
406    // Fast path for case-insensitive (-i) mode with no key extraction.
407    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
408    // Avoids the general path's line_starts Vec allocation.
409    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
410        return process_default_ci_singlepass(data, writer, term);
411    }
412
413    if fast_ci
414        && !config.count
415        && matches!(
416            config.mode,
417            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
418        )
419    {
420        return process_filter_ci_singlepass(data, writer, config, term);
421    }
422
423    if fast_ci && config.count {
424        return process_count_ci_singlepass(data, writer, config, term);
425    }
426
427    // General path: pre-computed line positions with linear scans over duplicate groups
428    let estimated_lines = (data.len() / 40).max(64);
429    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
430    line_starts.push(0);
431    for pos in memchr::memchr_iter(term, data) {
432        if pos + 1 < data.len() {
433            line_starts.push(pos + 1);
434        }
435    }
436    let num_lines = line_starts.len();
437    if num_lines == 0 {
438        return Ok(());
439    }
440
441    // Pre-compute content end: if data ends with terminator, exclude it for last line
442    let content_end = if data.last() == Some(&term) {
443        data.len() - 1
444    } else {
445        data.len()
446    };
447
448    // Ultra-fast path: default mode, no count, no key extraction
449    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
450        // Write first line
451        let first_full = line_full_at(data, &line_starts, 0);
452        let first_content = line_content_at(data, &line_starts, 0, content_end);
453        write_all_raw(writer, first_full)?;
454        if first_full.len() == first_content.len() {
455            writer.write_all(&[term])?;
456        }
457
458        let mut i = 1;
459        while i < num_lines {
460            let prev = line_content_at(data, &line_starts, i - 1, content_end);
461            let cur = line_content_at(data, &line_starts, i, content_end);
462
463            if lines_equal_fast(prev, cur) {
464                // Duplicate detected — linear scan for end of group
465                let group_end =
466                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
467                i = group_end;
468                continue;
469            }
470
471            // Unique line — write it
472            let cur_full = line_full_at(data, &line_starts, i);
473            write_all_raw(writer, cur_full)?;
474            if cur_full.len() == cur.len() {
475                writer.write_all(&[term])?;
476            }
477            i += 1;
478        }
479        return Ok(());
480    }
481
482    // General path with count tracking
483    let mut i = 0;
484    while i < num_lines {
485        let content = line_content_at(data, &line_starts, i, content_end);
486        let full = line_full_at(data, &line_starts, i);
487
488        let group_end = if fast
489            && i + 1 < num_lines
490            && lines_equal_fast(
491                content,
492                line_content_at(data, &line_starts, i + 1, content_end),
493            ) {
494            // Duplicate detected — linear scan for end
495            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
496        } else if !fast
497            && i + 1 < num_lines
498            && lines_equal(
499                content,
500                line_content_at(data, &line_starts, i + 1, content_end),
501                config,
502            )
503        {
504            // Slow path linear scan with key extraction
505            let mut j = i + 2;
506            while j < num_lines {
507                if !lines_equal(
508                    content,
509                    line_content_at(data, &line_starts, j, content_end),
510                    config,
511                ) {
512                    break;
513                }
514                j += 1;
515            }
516            j
517        } else {
518            i + 1
519        };
520
521        let count = (group_end - i) as u64;
522        output_group_bytes(writer, content, full, count, config, term)?;
523        i = group_end;
524    }
525
526    Ok(())
527}
528
529/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
530/// No pre-computed positions, no binary search, no Vec allocation.
531/// Outputs each line that differs from the previous.
532///
533/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
534/// independently, then cross-chunk boundaries are resolved.
535fn process_default_fast_singlepass(
536    data: &[u8],
537    writer: &mut impl Write,
538    term: u8,
539) -> io::Result<()> {
540    // Parallel path for large files
541    if data.len() >= 4 * 1024 * 1024 {
542        return process_default_parallel(data, writer, term);
543    }
544
545    process_default_sequential(data, writer, term)
546}
547
548/// Sequential single-pass dedup with zero-copy output.
549/// Instead of copying data to a buffer, tracks contiguous output runs and writes
550/// directly from the original data. For all-unique data, this is a single write_all.
551///
552/// Optimized for the "many duplicates" case: caches the previous line's length
553/// and first-8-byte prefix for fast rejection of non-duplicates without
554/// calling the full comparison function.
555fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
556    let mut prev_start: usize = 0;
557    let mut prev_end: usize; // exclusive, without terminator
558
559    // Find end of first line
560    match memchr::memchr(term, data) {
561        Some(pos) => {
562            prev_end = pos;
563        }
564        None => {
565            // Single line, no terminator
566            writer.write_all(data)?;
567            return writer.write_all(&[term]);
568        }
569    }
570
571    // Cache previous line metadata for fast comparison
572    let mut prev_len = prev_end - prev_start;
573    let mut prev_prefix: u64 = if prev_len >= 8 {
574        unsafe { (data.as_ptr().add(prev_start) as *const u64).read_unaligned() }
575    } else {
576        0
577    };
578
579    // run_start tracks the beginning of the current contiguous output region.
580    // When a duplicate is found, we flush the run up to the duplicate and skip it.
581    let mut run_start: usize = 0;
582    let mut cur_start = prev_end + 1;
583    let mut last_output_end = prev_end + 1; // exclusive end including terminator
584
585    while cur_start < data.len() {
586        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
587            Some(offset) => cur_start + offset,
588            None => data.len(), // last line without terminator
589        };
590
591        let cur_len = cur_end - cur_start;
592
593        // Fast reject: if lengths differ, lines are definitely not equal
594        let is_dup = if cur_len != prev_len {
595            false
596        } else if cur_len == 0 {
597            true
598        } else if cur_len >= 8 {
599            // Compare cached 8-byte prefix first
600            let cur_prefix =
601                unsafe { (data.as_ptr().add(cur_start) as *const u64).read_unaligned() };
602            if cur_prefix != prev_prefix {
603                false
604            } else if cur_len <= 8 {
605                true // prefix covers entire line
606            } else if cur_len <= 16 {
607                // Check last 8 bytes (overlapping)
608                let a_tail = unsafe {
609                    (data.as_ptr().add(prev_start + prev_len - 8) as *const u64).read_unaligned()
610                };
611                let b_tail = unsafe {
612                    (data.as_ptr().add(cur_start + cur_len - 8) as *const u64).read_unaligned()
613                };
614                a_tail == b_tail
615            } else {
616                data[prev_start..prev_end] == data[cur_start..cur_end]
617            }
618        } else {
619            // Short line < 8 bytes
620            data[prev_start..prev_end] == data[cur_start..cur_end]
621        };
622
623        if is_dup {
624            // Duplicate — flush the current run up to this line, then skip it
625            if run_start < cur_start {
626                writer.write_all(&data[run_start..cur_start])?;
627            }
628            // Start new run after this duplicate
629            if cur_end < data.len() {
630                run_start = cur_end + 1;
631            } else {
632                run_start = cur_end;
633            }
634        } else {
635            // Different line — update cached comparison state
636            prev_start = cur_start;
637            prev_end = cur_end;
638            prev_len = cur_len;
639            prev_prefix = if cur_len >= 8 {
640                unsafe { (data.as_ptr().add(cur_start) as *const u64).read_unaligned() }
641            } else {
642                0
643            };
644            last_output_end = if cur_end < data.len() {
645                cur_end + 1
646            } else {
647                cur_end
648            };
649        }
650
651        if cur_end < data.len() {
652            cur_start = cur_end + 1;
653        } else {
654            break;
655        }
656    }
657
658    // Flush remaining run
659    if run_start < data.len() {
660        writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
661    }
662
663    // Add a trailing terminator only if an unterminated final line was output (a skipped trailing duplicate leaves run_start == data.len()).
664    if run_start < data.len() && data.last() != Some(&term) {
665        writer.write_all(&[term])?;
666    }
667
668    Ok(())
669}
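
// Added regression-style check for the zero-copy run logic above, including the
// case where the final input line is an unterminated duplicate: it must be
// dropped without emitting a spurious empty line.
#[cfg(test)]
mod default_sequential_example {
    use super::*;

    #[test]
    fn dedups_and_handles_missing_trailing_terminator() {
        let mut out = Vec::new();
        process_default_sequential(b"a\nb\nb", &mut out, b'\n').unwrap();
        assert_eq!(&out[..], &b"a\nb\n"[..]);

        out.clear();
        process_default_sequential(b"x\nx\nx\n", &mut out, b'\n').unwrap();
        assert_eq!(&out[..], &b"x\n"[..]);
    }
}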
670
671/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
672/// positions in each chunk in parallel, then write output runs directly from
673/// the original data. No per-chunk buffer allocation needed.
674fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
675    use rayon::prelude::*;
676
677    let num_threads = rayon::current_num_threads().max(1);
678    let chunk_target = data.len() / num_threads;
679
680    // Find chunk boundaries aligned to line terminators
681    let mut boundaries = Vec::with_capacity(num_threads + 1);
682    boundaries.push(0usize);
683    for i in 1..num_threads {
684        let target = i * chunk_target;
685        if target >= data.len() {
686            break;
687        }
688        if let Some(p) = memchr::memchr(term, &data[target..]) {
689            let b = target + p + 1;
690            if b > *boundaries.last().unwrap() && b <= data.len() {
691                boundaries.push(b);
692            }
693        }
694    }
695    boundaries.push(data.len());
696
697    let n_chunks = boundaries.len() - 1;
698    if n_chunks <= 1 {
699        return process_default_sequential(data, writer, term);
700    }
701
702    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
703    struct ChunkResult {
704        /// Byte ranges in the original data to output (contiguous runs)
705        runs: Vec<(usize, usize)>,
706        /// First line in chunk (absolute offsets into data, content without term)
707        first_line_start: usize,
708        first_line_end: usize,
709        /// Last *output* line in chunk (content without term)
710        last_line_start: usize,
711        last_line_end: usize,
712    }
713
714    let results: Vec<ChunkResult> = boundaries
715        .windows(2)
716        .collect::<Vec<_>>()
717        .par_iter()
718        .map(|w| {
719            let chunk_start = w[0];
720            let chunk_end = w[1];
721            let chunk = &data[chunk_start..chunk_end];
722
723            let first_term = match memchr::memchr(term, chunk) {
724                Some(pos) => pos,
725                None => {
726                    return ChunkResult {
727                        runs: vec![(chunk_start, chunk_end)],
728                        first_line_start: chunk_start,
729                        first_line_end: chunk_end,
730                        last_line_start: chunk_start,
731                        last_line_end: chunk_end,
732                    };
733                }
734            };
735
736            let first_line_start = chunk_start;
737            let first_line_end = chunk_start + first_term;
738
739            let mut runs: Vec<(usize, usize)> = Vec::new();
740            let mut run_start = chunk_start;
741            let mut prev_start = 0usize;
742            let mut prev_end = first_term;
743            let mut last_out_start = chunk_start;
744            let mut last_out_end = first_line_end;
745
746            let mut cur_start = first_term + 1;
747            while cur_start < chunk.len() {
748                let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
749                    Some(offset) => cur_start + offset,
750                    None => chunk.len(),
751                };
752
753                if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
754                    // Duplicate — flush current run up to this line
755                    let abs_cur = chunk_start + cur_start;
756                    if run_start < abs_cur {
757                        runs.push((run_start, abs_cur));
758                    }
759                    // New run starts after this duplicate
760                    run_start = chunk_start
761                        + if cur_end < chunk.len() {
762                            cur_end + 1
763                        } else {
764                            cur_end
765                        };
766                } else {
767                    last_out_start = chunk_start + cur_start;
768                    last_out_end = chunk_start + cur_end;
769                }
770                prev_start = cur_start;
771                prev_end = cur_end;
772
773                if cur_end < chunk.len() {
774                    cur_start = cur_end + 1;
775                } else {
776                    break;
777                }
778            }
779
780            // Close final run
781            if run_start < chunk_end {
782                runs.push((run_start, chunk_end));
783            }
784
785            ChunkResult {
786                runs,
787                first_line_start,
788                first_line_end,
789                last_line_start: last_out_start,
790                last_line_end: last_out_end,
791            }
792        })
793        .collect();
794
795    // Write results, adjusting cross-chunk boundaries
796    for (i, result) in results.iter().enumerate() {
797        let skip_first = if i > 0 {
798            let prev = &results[i - 1];
799            let prev_last = &data[prev.last_line_start..prev.last_line_end];
800            let cur_first = &data[result.first_line_start..result.first_line_end];
801            lines_equal_fast(prev_last, cur_first)
802        } else {
803            false
804        };
805
806        let skip_end = if skip_first {
807            // Skip bytes up to and including the first line's terminator
808            result.first_line_end + 1
809        } else {
810            0
811        };
812
813        for &(rs, re) in &result.runs {
814            let actual_start = rs.max(skip_end);
815            if actual_start < re {
816                writer.write_all(&data[actual_start..re])?;
817            }
818        }
819    }
820
821    // Ensure trailing terminator
822    if !data.is_empty() && *data.last().unwrap() != term {
823        writer.write_all(&[term])?;
824    }
825
826    Ok(())
827}
828
829/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
830/// Zero-copy: writes directly from input data, no output buffer allocation.
831/// Uses cached prefix comparison for fast duplicate detection.
832fn process_filter_fast_singlepass(
833    data: &[u8],
834    writer: &mut impl Write,
835    config: &UniqConfig,
836    term: u8,
837) -> io::Result<()> {
838    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
839
840    let first_term = match memchr::memchr(term, data) {
841        Some(pos) => pos,
842        None => {
843            // Single line: unique (count=1)
844            if !repeated {
845                writer.write_all(data)?;
846                writer.write_all(&[term])?;
847            }
848            return Ok(());
849        }
850    };
851
852    let mut prev_start: usize = 0;
853    let mut prev_end: usize = first_term;
854    let mut prev_len = prev_end;
855    let mut count: u64 = 1;
856    let mut cur_start = first_term + 1;
857
858    while cur_start < data.len() {
859        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
860            Some(offset) => cur_start + offset,
861            None => data.len(),
862        };
863
864        let cur_len = cur_end - cur_start;
865        let is_dup = cur_len == prev_len
866            && lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]);
867
868        if is_dup {
869            count += 1;
870        } else {
871            // Output previous group -- write directly from input data (zero-copy)
872            let should_print = if repeated { count > 1 } else { count == 1 };
873            if should_print {
874                if prev_end < data.len() && data[prev_end] == term {
875                    writer.write_all(&data[prev_start..prev_end + 1])?;
876                } else {
877                    writer.write_all(&data[prev_start..prev_end])?;
878                    writer.write_all(&[term])?;
879                }
880            }
881            prev_start = cur_start;
882            prev_end = cur_end;
883            prev_len = cur_len;
884            count = 1;
885        }
886
887        if cur_end < data.len() {
888            cur_start = cur_end + 1;
889        } else {
890            break;
891        }
892    }
893
894    // Output last group
895    let should_print = if repeated { count > 1 } else { count == 1 };
896    if should_print {
897        if prev_end < data.len() && data[prev_end] == term {
898            writer.write_all(&data[prev_start..prev_end + 1])?;
899        } else {
900            writer.write_all(&data[prev_start..prev_end])?;
901            writer.write_all(&[term])?;
902        }
903    }
904
905    Ok(())
906}
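
// Added illustration of -d / -u through the public entry point: for the same
// input, RepeatedOnly keeps one copy of each duplicated group while UniqueOnly
// keeps only the lines that never repeat.
#[cfg(test)]
mod filter_modes_example {
    use super::*;

    #[test]
    fn repeated_only_and_unique_only() {
        let data = b"a\na\nb\nc\nc\n";

        let mut out = Vec::new();
        let mut config = UniqConfig::default();
        config.mode = OutputMode::RepeatedOnly;
        process_uniq_bytes(data, &mut out, &config).unwrap();
        assert_eq!(&out[..], &b"a\nc\n"[..]);

        out.clear();
        config.mode = OutputMode::UniqueOnly;
        process_uniq_bytes(data, &mut out, &config).unwrap();
        assert_eq!(&out[..], &b"b\n"[..]);
    }
}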
907
908/// Fast single-pass for count mode (-c) with all standard output modes.
909/// Zero line_starts allocation: scans with memchr, counts groups inline,
910/// and writes count-prefixed lines directly.
911/// Uses cached length comparison for fast duplicate rejection.
912fn process_count_fast_singlepass(
913    data: &[u8],
914    writer: &mut impl Write,
915    config: &UniqConfig,
916    term: u8,
917) -> io::Result<()> {
918    let first_term = match memchr::memchr(term, data) {
919        Some(pos) => pos,
920        None => {
921            // Single line: count=1
922            let should_print = match config.mode {
923                OutputMode::Default => true,
924                OutputMode::RepeatedOnly => false,
925                OutputMode::UniqueOnly => true,
926                _ => true,
927            };
928            if should_print {
929                write_count_line(writer, 1, data, term)?;
930            }
931            return Ok(());
932        }
933    };
934
935    let mut prev_start: usize = 0;
936    let mut prev_end: usize = first_term;
937    let mut prev_len = prev_end;
938    let mut count: u64 = 1;
939    let mut cur_start = first_term + 1;
940
941    while cur_start < data.len() {
942        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
943            Some(offset) => cur_start + offset,
944            None => data.len(),
945        };
946
947        let cur_len = cur_end - cur_start;
948        let is_dup = cur_len == prev_len
949            && lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]);
950
951        if is_dup {
952            count += 1;
953        } else {
954            // Output previous group with count
955            let should_print = match config.mode {
956                OutputMode::Default => true,
957                OutputMode::RepeatedOnly => count > 1,
958                OutputMode::UniqueOnly => count == 1,
959                _ => true,
960            };
961            if should_print {
962                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
963            }
964            prev_start = cur_start;
965            prev_end = cur_end;
966            prev_len = cur_len;
967            count = 1;
968        }
969
970        if cur_end < data.len() {
971            cur_start = cur_end + 1;
972        } else {
973            break;
974        }
975    }
976
977    // Output last group
978    let should_print = match config.mode {
979        OutputMode::Default => true,
980        OutputMode::RepeatedOnly => count > 1,
981        OutputMode::UniqueOnly => count == 1,
982        _ => true,
983    };
984    if should_print {
985        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
986    }
987
988    Ok(())
989}
990
991/// Fast single-pass for case-insensitive (-i) default mode.
992/// Same logic as process_default_sequential but uses eq_ignore_ascii_case.
993fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
994    let mut prev_start: usize = 0;
995    let mut prev_end: usize;
996
997    match memchr::memchr(term, data) {
998        Some(pos) => {
999            prev_end = pos;
1000        }
1001        None => {
1002            writer.write_all(data)?;
1003            return writer.write_all(&[term]);
1004        }
1005    }
1006
1007    // Write first line
1008    writer.write_all(&data[..prev_end + 1])?;
1009
1010    let mut cur_start = prev_end + 1;
1011
1012    while cur_start < data.len() {
1013        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1014            Some(offset) => cur_start + offset,
1015            None => data.len(),
1016        };
1017
1018        let prev_content = &data[prev_start..prev_end];
1019        let cur_content = &data[cur_start..cur_end];
1020
1021        if !lines_equal_case_insensitive(prev_content, cur_content) {
1022            // Different line — write it
1023            if cur_end < data.len() {
1024                writer.write_all(&data[cur_start..cur_end + 1])?;
1025            } else {
1026                writer.write_all(&data[cur_start..cur_end])?;
1027                writer.write_all(&[term])?;
1028            }
1029            prev_start = cur_start;
1030            prev_end = cur_end;
1031        }
1032
1033        if cur_end < data.len() {
1034            cur_start = cur_end + 1;
1035        } else {
1036            break;
1037        }
1038    }
1039
1040    Ok(())
1041}
1042
1043/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
1044fn process_filter_ci_singlepass(
1045    data: &[u8],
1046    writer: &mut impl Write,
1047    config: &UniqConfig,
1048    term: u8,
1049) -> io::Result<()> {
1050    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
1051
1052    let first_term = match memchr::memchr(term, data) {
1053        Some(pos) => pos,
1054        None => {
1055            if !repeated {
1056                writer.write_all(data)?;
1057                writer.write_all(&[term])?;
1058            }
1059            return Ok(());
1060        }
1061    };
1062
1063    let mut prev_start: usize = 0;
1064    let mut prev_end: usize = first_term;
1065    let mut count: u64 = 1;
1066    let mut cur_start = first_term + 1;
1067
1068    while cur_start < data.len() {
1069        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1070            Some(offset) => cur_start + offset,
1071            None => data.len(),
1072        };
1073
1074        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1075            count += 1;
1076        } else {
1077            let should_print = if repeated { count > 1 } else { count == 1 };
1078            if should_print {
1079                if prev_end < data.len() && data[prev_end] == term {
1080                    writer.write_all(&data[prev_start..prev_end + 1])?;
1081                } else {
1082                    writer.write_all(&data[prev_start..prev_end])?;
1083                    writer.write_all(&[term])?;
1084                }
1085            }
1086            prev_start = cur_start;
1087            prev_end = cur_end;
1088            count = 1;
1089        }
1090
1091        if cur_end < data.len() {
1092            cur_start = cur_end + 1;
1093        } else {
1094            break;
1095        }
1096    }
1097
1098    let should_print = if repeated { count > 1 } else { count == 1 };
1099    if should_print {
1100        if prev_end < data.len() && data[prev_end] == term {
1101            writer.write_all(&data[prev_start..prev_end + 1])?;
1102        } else {
1103            writer.write_all(&data[prev_start..prev_end])?;
1104            writer.write_all(&[term])?;
1105        }
1106    }
1107
1108    Ok(())
1109}
1110
1111/// Fast single-pass for case-insensitive (-i) count (-c) mode.
1112fn process_count_ci_singlepass(
1113    data: &[u8],
1114    writer: &mut impl Write,
1115    config: &UniqConfig,
1116    term: u8,
1117) -> io::Result<()> {
1118    let first_term = match memchr::memchr(term, data) {
1119        Some(pos) => pos,
1120        None => {
1121            let should_print = match config.mode {
1122                OutputMode::Default => true,
1123                OutputMode::RepeatedOnly => false,
1124                OutputMode::UniqueOnly => true,
1125                _ => true,
1126            };
1127            if should_print {
1128                write_count_line(writer, 1, data, term)?;
1129            }
1130            return Ok(());
1131        }
1132    };
1133
1134    let mut prev_start: usize = 0;
1135    let mut prev_end: usize = first_term;
1136    let mut count: u64 = 1;
1137    let mut cur_start = first_term + 1;
1138
1139    while cur_start < data.len() {
1140        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1141            Some(offset) => cur_start + offset,
1142            None => data.len(),
1143        };
1144
1145        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1146            count += 1;
1147        } else {
1148            let should_print = match config.mode {
1149                OutputMode::Default => true,
1150                OutputMode::RepeatedOnly => count > 1,
1151                OutputMode::UniqueOnly => count == 1,
1152                _ => true,
1153            };
1154            if should_print {
1155                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1156            }
1157            prev_start = cur_start;
1158            prev_end = cur_end;
1159            count = 1;
1160        }
1161
1162        if cur_end < data.len() {
1163            cur_start = cur_end + 1;
1164        } else {
1165            break;
1166        }
1167    }
1168
1169    let should_print = match config.mode {
1170        OutputMode::Default => true,
1171        OutputMode::RepeatedOnly => count > 1,
1172        OutputMode::UniqueOnly => count == 1,
1173        _ => true,
1174    };
1175    if should_print {
1176        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1177    }
1178
1179    Ok(())
1180}
1181
1182/// Output a group for standard modes (bytes path).
1183#[inline(always)]
1184fn output_group_bytes(
1185    writer: &mut impl Write,
1186    content: &[u8],
1187    full: &[u8],
1188    count: u64,
1189    config: &UniqConfig,
1190    term: u8,
1191) -> io::Result<()> {
1192    let should_print = match config.mode {
1193        OutputMode::Default => true,
1194        OutputMode::RepeatedOnly => count > 1,
1195        OutputMode::UniqueOnly => count == 1,
1196        _ => true,
1197    };
1198
1199    if should_print {
1200        if config.count {
1201            write_count_line(writer, count, content, term)?;
1202        } else {
1203            writer.write_all(full)?;
1204            // Add terminator if the original line didn't have one
1205            if full.len() == content.len() {
1206                writer.write_all(&[term])?;
1207            }
1208        }
1209    }
1210
1211    Ok(())
1212}
1213
1214/// Process --all-repeated / -D mode on byte slices.
1215fn process_all_repeated_bytes(
1216    data: &[u8],
1217    writer: &mut impl Write,
1218    config: &UniqConfig,
1219    method: AllRepeatedMethod,
1220    term: u8,
1221) -> io::Result<()> {
1222    let mut lines = LineIter::new(data, term);
1223
1224    let first = match lines.next() {
1225        Some(v) => v,
1226        None => return Ok(()),
1227    };
1228
1229    // Buffer the current group's (content, full-line) pairs:
1230    // for --all-repeated we only print a group once we know it has more than one line
1231    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
1232    group_lines.push(first);
1233    let mut first_group_printed = false;
1234
1235    let fast = !needs_key_extraction(config) && !config.ignore_case;
1236
1237    for (cur_content, cur_full) in lines {
1238        let prev_content = group_lines.last().unwrap().0;
1239        let equal = if fast {
1240            lines_equal_fast(prev_content, cur_content)
1241        } else {
1242            lines_equal(prev_content, cur_content, config)
1243        };
1244
1245        if equal {
1246            group_lines.push((cur_content, cur_full));
1247        } else {
1248            // Flush group
1249            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1250            group_lines.clear();
1251            group_lines.push((cur_content, cur_full));
1252        }
1253    }
1254
1255    // Flush last group
1256    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1257
1258    Ok(())
1259}
1260
1261/// Flush a group for --all-repeated mode (bytes path).
1262fn flush_all_repeated_bytes(
1263    writer: &mut impl Write,
1264    group: &[(&[u8], &[u8])],
1265    method: AllRepeatedMethod,
1266    first_group_printed: &mut bool,
1267    term: u8,
1268) -> io::Result<()> {
1269    if group.len() <= 1 {
1270        return Ok(()); // Not a duplicate group
1271    }
1272
1273    match method {
1274        AllRepeatedMethod::Prepend => {
1275            writer.write_all(&[term])?;
1276        }
1277        AllRepeatedMethod::Separate => {
1278            if *first_group_printed {
1279                writer.write_all(&[term])?;
1280            }
1281        }
1282        AllRepeatedMethod::None => {}
1283    }
1284
1285    for &(content, full) in group {
1286        writer.write_all(full)?;
1287        if full.len() == content.len() {
1288            writer.write_all(&[term])?;
1289        }
1290    }
1291
1292    *first_group_printed = true;
1293    Ok(())
1294}
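
// Added illustration of -D / --all-repeated=separate through the public entry
// point: every member of each duplicated group is printed, groups are separated
// by one terminator (blank line), and non-repeated lines are dropped.
#[cfg(test)]
mod all_repeated_example {
    use super::*;

    #[test]
    fn all_repeated_separate() {
        let mut out = Vec::new();
        let config = UniqConfig {
            mode: OutputMode::AllRepeated(AllRepeatedMethod::Separate),
            ..UniqConfig::default()
        };
        process_uniq_bytes(b"a\na\nb\nc\nc\n", &mut out, &config).unwrap();
        assert_eq!(&out[..], &b"a\na\n\nc\nc\n"[..]);
    }
}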
1295
1296/// Process --group mode on byte slices.
1297fn process_group_bytes(
1298    data: &[u8],
1299    writer: &mut impl Write,
1300    config: &UniqConfig,
1301    method: GroupMethod,
1302    term: u8,
1303) -> io::Result<()> {
1304    let mut lines = LineIter::new(data, term);
1305
1306    let (prev_content, prev_full) = match lines.next() {
1307        Some(v) => v,
1308        None => return Ok(()),
1309    };
1310
1311    // Prepend/Both: separator before first group
1312    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1313        writer.write_all(&[term])?;
1314    }
1315
1316    // Write first line
1317    writer.write_all(prev_full)?;
1318    if prev_full.len() == prev_content.len() {
1319        writer.write_all(&[term])?;
1320    }
1321
1322    let mut prev_content = prev_content;
1323    let fast = !needs_key_extraction(config) && !config.ignore_case;
1324
1325    for (cur_content, cur_full) in lines {
1326        let equal = if fast {
1327            lines_equal_fast(prev_content, cur_content)
1328        } else {
1329            lines_equal(prev_content, cur_content, config)
1330        };
1331
1332        if !equal {
1333            // New group — write separator
1334            writer.write_all(&[term])?;
1335        }
1336
1337        writer.write_all(cur_full)?;
1338        if cur_full.len() == cur_content.len() {
1339            writer.write_all(&[term])?;
1340        }
1341
1342        prev_content = cur_content;
1343    }
1344
1345    // Append/Both: separator after last group
1346    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1347        writer.write_all(&[term])?;
1348    }
1349
1350    Ok(())
1351}
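
// Added illustration of --group=separate through the public entry point: every
// line is printed and consecutive groups of equal lines are separated by one
// blank line.
#[cfg(test)]
mod group_mode_example {
    use super::*;

    #[test]
    fn group_separate() {
        let mut out = Vec::new();
        let config = UniqConfig {
            mode: OutputMode::Group(GroupMethod::Separate),
            ..UniqConfig::default()
        };
        process_uniq_bytes(b"a\na\nb\n", &mut out, &config).unwrap();
        assert_eq!(&out[..], &b"a\na\n\nb\n"[..]);
    }
}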
1352
1353// ============================================================================
1354// Streaming processing (for stdin / pipe input)
1355// ============================================================================
1356
1357/// Main streaming uniq processor.
1358/// Reads from `input`, writes to `output`.
1359pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
1360    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
1361    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
1362    let term = if config.zero_terminated { b'\0' } else { b'\n' };
1363
1364    match config.mode {
1365        OutputMode::Group(method) => {
1366            process_group_stream(reader, &mut writer, config, method, term)?;
1367        }
1368        OutputMode::AllRepeated(method) => {
1369            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
1370        }
1371        _ => {
1372            process_standard_stream(reader, &mut writer, config, term)?;
1373        }
1374    }
1375
1376    writer.flush()?;
1377    Ok(())
1378}
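
// Added usage sketch for the streaming entry point: any `Read` source works, so
// an `io::Cursor` stands in for stdin here. The result should match the byte-slice
// path for the same input and configuration.
#[cfg(test)]
mod streaming_example {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn streaming_matches_default_uniq() {
        let input = Cursor::new(&b"a\na\nb\n"[..]);
        let mut out = Vec::new();
        process_uniq(input, &mut out, &UniqConfig::default()).unwrap();
        assert_eq!(&out[..], &b"a\nb\n"[..]);
    }
}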
1379
1380/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
1381fn process_standard_stream<R: BufRead, W: Write>(
1382    mut reader: R,
1383    writer: &mut W,
1384    config: &UniqConfig,
1385    term: u8,
1386) -> io::Result<()> {
1387    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1388    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1389
1390    // Read first line
1391    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1392        return Ok(()); // empty input
1393    }
1394    let mut count: u64 = 1;
1395
1396    loop {
1397        current_line.clear();
1398        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1399
1400        if bytes_read == 0 {
1401            // End of input — output the last group
1402            output_group_stream(writer, &prev_line, count, config, term)?;
1403            break;
1404        }
1405
1406        if compare_lines_stream(&prev_line, &current_line, config, term) {
1407            count += 1;
1408        } else {
1409            output_group_stream(writer, &prev_line, count, config, term)?;
1410            std::mem::swap(&mut prev_line, &mut current_line);
1411            count = 1;
1412        }
1413    }
1414
1415    Ok(())
1416}
1417
1418/// Compare two lines (with terminators) in streaming mode.
1419#[inline(always)]
1420fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
1421    let a_stripped = strip_term(a, term);
1422    let b_stripped = strip_term(b, term);
1423    lines_equal(a_stripped, b_stripped, config)
1424}
1425
1426/// Strip terminator from end of line.
1427#[inline(always)]
1428fn strip_term(line: &[u8], term: u8) -> &[u8] {
1429    if line.last() == Some(&term) {
1430        &line[..line.len() - 1]
1431    } else {
1432        line
1433    }
1434}
1435
1436/// Output a group in streaming mode.
1437#[inline(always)]
1438fn output_group_stream(
1439    writer: &mut impl Write,
1440    line: &[u8],
1441    count: u64,
1442    config: &UniqConfig,
1443    term: u8,
1444) -> io::Result<()> {
1445    let should_print = match config.mode {
1446        OutputMode::Default => true,
1447        OutputMode::RepeatedOnly => count > 1,
1448        OutputMode::UniqueOnly => count == 1,
1449        _ => true,
1450    };
1451
1452    if should_print {
1453        let content = strip_term(line, term);
1454        if config.count {
1455            write_count_line(writer, count, content, term)?;
1456        } else {
1457            writer.write_all(content)?;
1458            writer.write_all(&[term])?;
1459        }
1460    }
1461
1462    Ok(())
1463}
1464
1465/// Process --all-repeated / -D mode (streaming).
1466fn process_all_repeated_stream<R: BufRead, W: Write>(
1467    mut reader: R,
1468    writer: &mut W,
1469    config: &UniqConfig,
1470    method: AllRepeatedMethod,
1471    term: u8,
1472) -> io::Result<()> {
1473    let mut group: Vec<Vec<u8>> = Vec::new();
1474    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1475    let mut first_group_printed = false;
1476
1477    current_line.clear();
1478    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
1479        return Ok(());
1480    }
1481    group.push(current_line.clone());
1482
1483    loop {
1484        current_line.clear();
1485        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1486
1487        if bytes_read == 0 {
1488            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1489            break;
1490        }
1491
1492        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
1493            group.push(current_line.clone());
1494        } else {
1495            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1496            group.clear();
1497            group.push(current_line.clone());
1498        }
1499    }
1500
1501    Ok(())
1502}
1503
1504/// Flush a group for --all-repeated mode (streaming).
1505fn flush_all_repeated_stream(
1506    writer: &mut impl Write,
1507    group: &[Vec<u8>],
1508    method: AllRepeatedMethod,
1509    first_group_printed: &mut bool,
1510    term: u8,
1511) -> io::Result<()> {
1512    if group.len() <= 1 {
1513        return Ok(());
1514    }
1515
1516    match method {
1517        AllRepeatedMethod::Prepend => {
1518            writer.write_all(&[term])?;
1519        }
1520        AllRepeatedMethod::Separate => {
1521            if *first_group_printed {
1522                writer.write_all(&[term])?;
1523            }
1524        }
1525        AllRepeatedMethod::None => {}
1526    }
1527
1528    for line in group {
1529        let content = strip_term(line, term);
1530        writer.write_all(content)?;
1531        writer.write_all(&[term])?;
1532    }
1533
1534    *first_group_printed = true;
1535    Ok(())
1536}
1537
1538/// Process --group mode (streaming).
1539fn process_group_stream<R: BufRead, W: Write>(
1540    mut reader: R,
1541    writer: &mut W,
1542    config: &UniqConfig,
1543    method: GroupMethod,
1544    term: u8,
1545) -> io::Result<()> {
1546    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1547    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1548
1549    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1550        return Ok(());
1551    }
1552
1553    // Prepend/Both: separator before first group
1554    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1555        writer.write_all(&[term])?;
1556    }
1557
1558    let content = strip_term(&prev_line, term);
1559    writer.write_all(content)?;
1560    writer.write_all(&[term])?;
1561
1562    loop {
1563        current_line.clear();
1564        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1565
1566        if bytes_read == 0 {
1567            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1568                writer.write_all(&[term])?;
1569            }
1570            break;
1571        }
1572
1573        if !compare_lines_stream(&prev_line, &current_line, config, term) {
1574            writer.write_all(&[term])?;
1575        }
1576
1577        let content = strip_term(&current_line, term);
1578        writer.write_all(content)?;
1579        writer.write_all(&[term])?;
1580
1581        std::mem::swap(&mut prev_line, &mut current_line);
1582    }
1583
1584    Ok(())
1585}
1586
1587/// Read a line terminated by the given byte (newline or NUL).
1588/// Returns number of bytes read (0 = EOF).
1589#[inline(always)]
1590fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
1591    reader.read_until(term, buf)
1592}