// coreutils_rs/uniq/core.rs

use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

/// Write a large contiguous buffer; `write_all` already retries on partial writes.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    writer.write_all(buf)
}

/// How to delimit groups when using --all-repeated
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    None,
    Prepend,
    Separate,
}

/// How to delimit groups when using --group
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    Separate,
    Prepend,
    Append,
    Both,
}

/// Output mode for uniq
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    /// Default: print unique lines and first of each duplicate group
    Default,
    /// -d: print only first line of duplicate groups
    RepeatedOnly,
    /// -D / --all-repeated: print ALL duplicate lines
    AllRepeated(AllRepeatedMethod),
    /// -u: print only lines that are NOT duplicated
    UniqueOnly,
    /// --group: show all items with group separators
    Group(GroupMethod),
}

/// Configuration for uniq processing
#[derive(Debug, Clone)]
pub struct UniqConfig {
    pub mode: OutputMode,
    pub count: bool,
    pub ignore_case: bool,
    pub skip_fields: usize,
    pub skip_chars: usize,
    pub check_chars: Option<usize>,
    pub zero_terminated: bool,
}

impl Default for UniqConfig {
    fn default() -> Self {
        Self {
            mode: OutputMode::Default,
            count: false,
            ignore_case: false,
            skip_fields: 0,
            skip_chars: 0,
            check_chars: None,
            zero_terminated: false,
        }
    }
}

/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
#[inline(always)]
fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
    let mut start = 0;
    let len = line.len();

    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
    for _ in 0..config.skip_fields {
        // Skip blanks (space and tab)
        while start < len && (line[start] == b' ' || line[start] == b'\t') {
            start += 1;
        }
        // Skip non-blanks (field content)
        while start < len && line[start] != b' ' && line[start] != b'\t' {
            start += 1;
        }
    }

    // Skip N characters
    if config.skip_chars > 0 {
        let remaining = len - start;
        let skip = config.skip_chars.min(remaining);
        start += skip;
    }

    let slice = &line[start..];

    // Limit comparison to N characters
    if let Some(w) = config.check_chars {
        if w < slice.len() {
            return &slice[..w];
        }
    }

    slice
}
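
// Illustrative sketch, not part of the original source: shows how the -f/-s/-w
// options carve the comparison key out of a line. The config values below are
// arbitrary example inputs, chosen only to exercise each step once.
#[test]
fn example_get_compare_slice_key_extraction() {
    let cfg = UniqConfig {
        skip_fields: 1,
        skip_chars: 1,
        check_chars: Some(3),
        ..UniqConfig::default()
    };
    // Skip the first field ("  alpha"), then 1 more char, then keep 3 chars.
    assert_eq!(get_compare_slice(b"  alpha beta", &cfg), &b"bet"[..]);
}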

/// Compare two lines (without terminators) using the config's comparison rules.
#[inline(always)]
fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
    let sa = get_compare_slice(a, config);
    let sb = get_compare_slice(b, config);

    if config.ignore_case {
        sa.eq_ignore_ascii_case(sb)
    } else {
        sa == sb
    }
}

/// Fast case-insensitive comparison: no field/char extraction, just case-insensitive.
/// Uses a length check to reject mismatches before the full comparison.
#[inline(always)]
fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    a.eq_ignore_ascii_case(b)
}

/// Check if config requires field/char skipping or char limiting.
#[inline(always)]
fn needs_key_extraction(config: &UniqConfig) -> bool {
    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
}

/// Fast path comparison: no field/char extraction needed, no case folding.
/// Uses a length-equality shortcut and multi-word prefix rejection.
/// For short lines (<= 32 bytes, common in many-dups data), avoids the
/// full memcmp call overhead by doing direct word comparisons.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    // Short-line fast path: compare via word loads to avoid memcmp call overhead
    if alen <= 8 {
        // For <= 8 bytes: byte-by-byte via slice (compiler vectorizes this)
        return a == b;
    }
    // 8-byte prefix check: reject most non-equal lines without full memcmp
    let a8 = unsafe { (a.as_ptr() as *const u64).read_unaligned() };
    let b8 = unsafe { (b.as_ptr() as *const u64).read_unaligned() };
    if a8 != b8 {
        return false;
    }
    // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
    if alen <= 16 {
        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
        return a_tail == b_tail;
    }
    // For 17-32 bytes: check bytes 8..16 + the last 8 (overlapping) to avoid memcmp
    if alen <= 32 {
        let a16 = unsafe { (a.as_ptr().add(8) as *const u64).read_unaligned() };
        let b16 = unsafe { (b.as_ptr().add(8) as *const u64).read_unaligned() };
        if a16 != b16 {
            return false;
        }
        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
        return a_tail == b_tail;
    }
    // Longer lines: prefix passed, fall through to full memcmp
    a == b
}
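
// Illustrative sketch, not part of the original source: the word-load fast path
// must agree with plain slice equality across the 8/16/32-byte thresholds. The
// sample strings are arbitrary, chosen to hit each branch once.
#[test]
fn example_lines_equal_fast_matches_slice_eq() {
    let pairs = [
        (&b"short"[..], &b"short"[..]),
        (&b"exactly16bytes!!"[..], &b"exactly16bytes!!"[..]),
        (&b"same prefix, different tail....A"[..], &b"same prefix, different tail....B"[..]),
        (&b"a line long enough to fall through to the final full comparison"[..],
         &b"a line long enough to fall through to the final full comparison"[..]),
    ];
    for &(a, b) in &pairs {
        assert_eq!(lines_equal_fast(a, b), a == b);
    }
}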

/// Write a count-prefixed line in GNU uniq format.
/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
/// Combines prefix + line + term into a single write when the total fits in 256 bytes.
#[inline(always)]
fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
    // Build the "%7lu " prefix in a stack buffer (28 bytes: up to 20 digits for
    // u64::MAX, plus padding and the trailing space)
    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
    let digits = itoa_right_aligned_into(&mut prefix, count);
    let width = digits.max(7); // minimum 7 chars
    let prefix_len = width + 1; // +1 for trailing space
    prefix[width] = b' ';

    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
    let total = prefix_len + line.len() + 1;
    if total <= 256 {
        let mut buf = [0u8; 256];
        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
        buf[prefix_len + line.len()] = term;
        out.write_all(&buf[..total])
    } else {
        out.write_all(&prefix[..prefix_len])?;
        out.write_all(line)?;
        out.write_all(&[term])
    }
}

/// Write u64 decimal right-aligned into prefix buffer.
/// Buffer is pre-filled with spaces. Returns the occupied width: the digit count,
/// or 7 when the value is padded to the minimum field width.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    if val == 0 {
        buf[6] = b'0';
        return 7; // 6 spaces + '0' = 7 chars
    }
    // Write digits right-to-left from position 27 (leaving room for trailing space)
    let mut pos = 27;
    while val > 0 {
        pos -= 1;
        buf[pos] = b'0' + (val % 10) as u8;
        val /= 10;
    }
    let num_digits = 27 - pos;
    if num_digits >= 7 {
        // Number is wide enough, shift to front
        buf.copy_within(pos..27, 0);
        num_digits
    } else {
        // Right-align in 7-char field: spaces then digits
        let pad = 7 - num_digits;
        buf.copy_within(pos..27, pad);
        // buf[0..pad] is already spaces from initialization
        7
    }
}
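
// Illustrative sketch, not part of the original source: -c output follows the
// GNU "%7lu " layout, i.e. the count right-aligned in a 7-character field,
// a single space, then the line and its terminator. Counts wider than 7
// digits simply push the field wider.
#[test]
fn example_write_count_line_format() {
    let mut out = Vec::new();
    write_count_line(&mut out, 3, b"apple", b'\n').unwrap();
    write_count_line(&mut out, 1234567890, b"banana", b'\n').unwrap();
    assert_eq!(out, b"      3 apple\n1234567890 banana\n");
}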

// ============================================================================
// High-performance mmap-based processing (for byte slices, zero-copy)
// ============================================================================

/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
    // 16MB output buffer — optimal for L3 cache utilization on modern CPUs.
    // 32MB can cause cache thrashing; 16MB stays within L3 while still
    // reducing write() syscall count significantly.
    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
    let term = if config.zero_terminated { b'\0' } else { b'\n' };

    match config.mode {
        OutputMode::Group(method) => {
            process_group_bytes(data, &mut writer, config, method, term)?;
        }
        OutputMode::AllRepeated(method) => {
            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
        }
        _ => {
            process_standard_bytes(data, &mut writer, config, term)?;
        }
    }

    writer.flush()?;
    Ok(())
}
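
// Illustrative usage sketch, not part of the original source: runs the zero-copy
// byte-slice entry point over an in-memory buffer, the same way an mmap'd file
// would be handed in. Input and expected output are arbitrary examples.
#[test]
fn example_process_uniq_bytes_default() {
    let input = b"apple\napple\nbanana\nbanana\nbanana\ncherry\n";
    let mut out = Vec::new();
    process_uniq_bytes(input, &mut out, &UniqConfig::default()).unwrap();
    assert_eq!(out, b"apple\nbanana\ncherry\n");
}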

/// Iterator over lines in a byte slice, yielding (line without terminator, full line with terminator).
/// Uses memchr for SIMD-accelerated line boundary detection.
struct LineIter<'a> {
    data: &'a [u8],
    pos: usize,
    term: u8,
}

impl<'a> LineIter<'a> {
    #[inline(always)]
    fn new(data: &'a [u8], term: u8) -> Self {
        Self { data, pos: 0, term }
    }
}

impl<'a> Iterator for LineIter<'a> {
    /// (line content without terminator, full line including terminator for output)
    type Item = (&'a [u8], &'a [u8]);

    #[inline(always)]
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }

        let remaining = &self.data[self.pos..];
        match memchr::memchr(self.term, remaining) {
            Some(idx) => {
                let line_start = self.pos;
                let line_end = self.pos + idx; // without terminator
                let full_end = self.pos + idx + 1; // with terminator
                self.pos = full_end;
                Some((
                    &self.data[line_start..line_end],
                    &self.data[line_start..full_end],
                ))
            }
            None => {
                // Last line without terminator
                let line_start = self.pos;
                self.pos = self.data.len();
                let line = &self.data[line_start..];
                Some((line, line))
            }
        }
    }
}
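
// Illustrative sketch, not part of the original source: each item pairs the line
// content (terminator stripped) with the full line (terminator kept), and a final
// unterminated line yields the same slice for both.
#[test]
fn example_line_iter_yields() {
    let mut it = LineIter::new(b"a\nbb\nccc", b'\n');
    assert_eq!(it.next(), Some((&b"a"[..], &b"a\n"[..])));
    assert_eq!(it.next(), Some((&b"bb"[..], &b"bb\n"[..])));
    assert_eq!(it.next(), Some((&b"ccc"[..], &b"ccc"[..])));
    assert_eq!(it.next(), None);
}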

/// Get line content (without terminator) from pre-computed positions.
/// `content_end` is the end of actual content (excludes trailing terminator if present).
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let start = line_starts[idx];
    let end = if idx + 1 < line_starts.len() {
        line_starts[idx + 1] - 1 // exclude terminator
    } else {
        content_end // last line: pre-computed to exclude trailing terminator
    };
    &data[start..end]
}

/// Get full line (with terminator) from pre-computed positions.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let start = line_starts[idx];
    let end = if idx + 1 < line_starts.len() {
        line_starts[idx + 1] // include terminator
    } else {
        data.len()
    };
    &data[start..end]
}

/// Linear scan for the end of a duplicate group.
/// Returns the index of the first line that differs from line_starts[group_start].
/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
/// equal lines can appear in non-adjacent groups separated by different lines.
/// Caches key length for fast length-mismatch rejection.
#[inline]
fn linear_scan_group_end(
    data: &[u8],
    line_starts: &[usize],
    group_start: usize,
    num_lines: usize,
    content_end: usize,
) -> usize {
    let key = line_content_at(data, line_starts, group_start, content_end);
    let key_len = key.len();
    let mut i = group_start + 1;
    while i < num_lines {
        let candidate = line_content_at(data, line_starts, i, content_end);
        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
            return i;
        }
        i += 1;
    }
    i
}

/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
/// General path: pre-computed line positions with linear scanning for group ends.
fn process_standard_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    let fast = !needs_key_extraction(config) && !config.ignore_case;
    let fast_ci = !needs_key_extraction(config) && config.ignore_case;

    // Ultra-fast path: default mode, no count, no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines inline.
    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_fast_singlepass(data, writer, term);
    }

    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
    if fast
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_fast_singlepass(data, writer, config, term);
    }

    // Ultra-fast path: count mode with no key extraction.
    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
    // Avoids the line_starts Vec allocation (20MB+ for large files).
    if fast && config.count {
        return process_count_fast_singlepass(data, writer, config, term);
    }

    // Fast path for case-insensitive (-i) mode with no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
    // Avoids the general path's line_starts Vec allocation.
    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_ci_singlepass(data, writer, term);
    }

    if fast_ci
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_ci_singlepass(data, writer, config, term);
    }

    if fast_ci && config.count {
        return process_count_ci_singlepass(data, writer, config, term);
    }

    // General path: pre-computed line positions with linear scanning for group ends
    let estimated_lines = (data.len() / 40).max(64);
    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
    line_starts.push(0);
    for pos in memchr::memchr_iter(term, data) {
        if pos + 1 < data.len() {
            line_starts.push(pos + 1);
        }
    }
    let num_lines = line_starts.len();
    if num_lines == 0 {
        return Ok(());
    }

    // Pre-compute content end: if data ends with terminator, exclude it for last line
    let content_end = if data.last() == Some(&term) {
        data.len() - 1
    } else {
        data.len()
    };

    // Default mode, no count, no key extraction. Note: this case already returned
    // via the single-pass path above, so this branch is effectively unreachable.
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        // Write first line
        let first_full = line_full_at(data, &line_starts, 0);
        let first_content = line_content_at(data, &line_starts, 0, content_end);
        write_all_raw(writer, first_full)?;
        if first_full.len() == first_content.len() {
            writer.write_all(&[term])?;
        }

        let mut i = 1;
        while i < num_lines {
            let prev = line_content_at(data, &line_starts, i - 1, content_end);
            let cur = line_content_at(data, &line_starts, i, content_end);

            if lines_equal_fast(prev, cur) {
                // Duplicate detected — linear scan for end of group
                let group_end =
                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
                i = group_end;
                continue;
            }

            // Unique line — write it
            let cur_full = line_full_at(data, &line_starts, i);
            write_all_raw(writer, cur_full)?;
            if cur_full.len() == cur.len() {
                writer.write_all(&[term])?;
            }
            i += 1;
        }
        return Ok(());
    }

    // General path with count tracking
    let mut i = 0;
    while i < num_lines {
        let content = line_content_at(data, &line_starts, i, content_end);
        let full = line_full_at(data, &line_starts, i);

        let group_end = if fast
            && i + 1 < num_lines
            && lines_equal_fast(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
            ) {
            // Duplicate detected — linear scan for end
            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
        } else if !fast
            && i + 1 < num_lines
            && lines_equal(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
                config,
            )
        {
            // Slow path linear scan with key extraction
            let mut j = i + 2;
            while j < num_lines {
                if !lines_equal(
                    content,
                    line_content_at(data, &line_starts, j, content_end),
                    config,
                ) {
                    break;
                }
                j += 1;
            }
            j
        } else {
            i + 1
        };

        let count = (group_end - i) as u64;
        output_group_bytes(writer, content, full, count, config, term)?;
        i = group_end;
    }

    Ok(())
}

/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
/// No pre-computed positions, no binary search, no Vec allocation.
/// Outputs each line that differs from the previous.
///
/// For large files (at least 4MB), uses parallel chunk processing: each chunk is deduplicated
/// independently, then cross-chunk boundaries are resolved.
fn process_default_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    term: u8,
) -> io::Result<()> {
    // Parallel path for large files — kicks in at 4MB.
    // Lower thresholds (e.g. 2MB) hurt performance on 10MB files because
    // the parallel overhead dominates for smaller chunks.
    if data.len() >= 4 * 1024 * 1024 {
        return process_default_parallel(data, writer, term);
    }

    process_default_sequential(data, writer, term)
}

/// Sequential single-pass dedup with zero-copy output.
/// Instead of copying data to a buffer, tracks contiguous output runs and writes
/// directly from the original data. For all-unique data, this is a single write_all.
///
/// Optimized for the "many duplicates" case: caches the previous line's length
/// and first-8-byte prefix for fast rejection of non-duplicates without
/// calling the full comparison function.
fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    let mut prev_start: usize = 0;
    let mut prev_end: usize; // exclusive, without terminator

    // Find end of first line
    match memchr::memchr(term, data) {
        Some(pos) => {
            prev_end = pos;
        }
        None => {
            // Single line, no terminator
            writer.write_all(data)?;
            return writer.write_all(&[term]);
        }
    }

    // Cache previous line metadata for fast comparison
    let mut prev_len = prev_end - prev_start;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (data.as_ptr().add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };

    // run_start tracks the beginning of the current contiguous output region.
    // When a duplicate is found, we flush the run up to the duplicate and skip it.
    let mut run_start: usize = 0;
    let mut cur_start = prev_end + 1;
    let mut last_output_end = prev_end + 1; // exclusive end including terminator

    while cur_start < data.len() {
        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
            Some(offset) => cur_start + offset,
            None => data.len(), // last line without terminator
        };

        let cur_len = cur_end - cur_start;

        // Fast reject: if lengths differ, lines are definitely not equal
        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            // Compare cached 8-byte prefix first
            let cur_prefix =
                unsafe { (data.as_ptr().add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true // prefix covers entire line
            } else if cur_len <= 16 {
                // Check last 8 bytes (overlapping)
                let a_tail = unsafe {
                    (data.as_ptr().add(prev_start + prev_len - 8) as *const u64).read_unaligned()
                };
                let b_tail = unsafe {
                    (data.as_ptr().add(cur_start + cur_len - 8) as *const u64).read_unaligned()
                };
                a_tail == b_tail
            } else if cur_len <= 32 {
                // Check bytes 8-16 and last 8 bytes
                let a16 =
                    unsafe { (data.as_ptr().add(prev_start + 8) as *const u64).read_unaligned() };
                let b16 =
                    unsafe { (data.as_ptr().add(cur_start + 8) as *const u64).read_unaligned() };
                if a16 != b16 {
                    false
                } else {
                    let a_tail = unsafe {
                        (data.as_ptr().add(prev_start + prev_len - 8) as *const u64)
                            .read_unaligned()
                    };
                    let b_tail = unsafe {
                        (data.as_ptr().add(cur_start + cur_len - 8) as *const u64).read_unaligned()
                    };
                    a_tail == b_tail
                }
            } else {
                data[prev_start..prev_end] == data[cur_start..cur_end]
            }
        } else {
            // Short line < 8 bytes
            data[prev_start..prev_end] == data[cur_start..cur_end]
        };

        if is_dup {
            // Duplicate — flush the current run up to this line, then skip it
            if run_start < cur_start {
                writer.write_all(&data[run_start..cur_start])?;
            }
            // Start new run after this duplicate
            if cur_end < data.len() {
                run_start = cur_end + 1;
            } else {
                run_start = cur_end;
            }
        } else {
            // Different line — update cached comparison state
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (data.as_ptr().add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            last_output_end = if cur_end < data.len() {
                cur_end + 1
            } else {
                cur_end
            };
        }

        if cur_end < data.len() {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Flush remaining run
    if run_start < data.len() {
        writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
    }

    // Ensure trailing terminator
    if !data.is_empty() && *data.last().unwrap() != term {
        writer.write_all(&[term])?;
    }

    Ok(())
}
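
// Illustrative sketch, not part of the original source: run-tracking output of
// the sequential path on a small input, including an unterminated final line,
// which gains a trailing terminator on output.
#[test]
fn example_process_default_sequential() {
    let mut out = Vec::new();
    process_default_sequential(b"x\nx\ny\nz\nz", &mut out, b'\n').unwrap();
    assert_eq!(out, b"x\ny\nz\n");
}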

/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
/// positions in each chunk in parallel, then write output runs directly from
/// the original data. No per-chunk buffer allocation needed.
fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    use rayon::prelude::*;

    let num_threads = rayon::current_num_threads().max(1);
    let chunk_target = data.len() / num_threads;

    // Find chunk boundaries aligned to line terminators
    let mut boundaries = Vec::with_capacity(num_threads + 1);
    boundaries.push(0usize);
    for i in 1..num_threads {
        let target = i * chunk_target;
        if target >= data.len() {
            break;
        }
        if let Some(p) = memchr::memchr(term, &data[target..]) {
            let b = target + p + 1;
            if b > *boundaries.last().unwrap() && b <= data.len() {
                boundaries.push(b);
            }
        }
    }
    boundaries.push(data.len());

    let n_chunks = boundaries.len() - 1;
    if n_chunks <= 1 {
        return process_default_sequential(data, writer, term);
    }

    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
    struct ChunkResult {
        /// Byte ranges in the original data to output (contiguous runs)
        runs: Vec<(usize, usize)>,
        /// First line in chunk (absolute offsets into data, content without term)
        first_line_start: usize,
        first_line_end: usize,
        /// Last *output* line in chunk (content without term)
        last_line_start: usize,
        last_line_end: usize,
    }

    let results: Vec<ChunkResult> = boundaries
        .windows(2)
        .collect::<Vec<_>>()
        .par_iter()
        .map(|w| {
            let chunk_start = w[0];
            let chunk_end = w[1];
            let chunk = &data[chunk_start..chunk_end];

            let first_term = match memchr::memchr(term, chunk) {
                Some(pos) => pos,
                None => {
                    return ChunkResult {
                        runs: vec![(chunk_start, chunk_end)],
                        first_line_start: chunk_start,
                        first_line_end: chunk_end,
                        last_line_start: chunk_start,
                        last_line_end: chunk_end,
                    };
                }
            };

            let first_line_start = chunk_start;
            let first_line_end = chunk_start + first_term;

            let mut runs: Vec<(usize, usize)> = Vec::new();
            let mut run_start = chunk_start;
            let mut prev_start = 0usize;
            let mut prev_end = first_term;
            let mut last_out_start = chunk_start;
            let mut last_out_end = first_line_end;

            let mut cur_start = first_term + 1;
            while cur_start < chunk.len() {
                let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
                    Some(offset) => cur_start + offset,
                    None => chunk.len(),
                };

                if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
                    // Duplicate — flush current run up to this line
                    let abs_cur = chunk_start + cur_start;
                    if run_start < abs_cur {
                        runs.push((run_start, abs_cur));
                    }
                    // New run starts after this duplicate
                    run_start = chunk_start
                        + if cur_end < chunk.len() {
                            cur_end + 1
                        } else {
                            cur_end
                        };
                } else {
                    last_out_start = chunk_start + cur_start;
                    last_out_end = chunk_start + cur_end;
                }
                prev_start = cur_start;
                prev_end = cur_end;

                if cur_end < chunk.len() {
                    cur_start = cur_end + 1;
                } else {
                    break;
                }
            }

            // Close final run
            if run_start < chunk_end {
                runs.push((run_start, chunk_end));
            }

            ChunkResult {
                runs,
                first_line_start,
                first_line_end,
                last_line_start: last_out_start,
                last_line_end: last_out_end,
            }
        })
        .collect();

    // Write results, adjusting cross-chunk boundaries
    for (i, result) in results.iter().enumerate() {
        let skip_first = if i > 0 {
            let prev = &results[i - 1];
            let prev_last = &data[prev.last_line_start..prev.last_line_end];
            let cur_first = &data[result.first_line_start..result.first_line_end];
            lines_equal_fast(prev_last, cur_first)
        } else {
            false
        };

        let skip_end = if skip_first {
            // Skip bytes up to and including the first line's terminator
            result.first_line_end + 1
        } else {
            0
        };

        for &(rs, re) in &result.runs {
            let actual_start = rs.max(skip_end);
            if actual_start < re {
                writer.write_all(&data[actual_start..re])?;
            }
        }
    }

    // Ensure trailing terminator
    if !data.is_empty() && *data.last().unwrap() != term {
        writer.write_all(&[term])?;
    }

    Ok(())
}
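
// Illustrative sketch, not part of the original source: the chunked parallel path
// should produce byte-identical output to the sequential path, including where a
// chunk boundary splits a duplicate group. The synthetic data below (runs of
// three duplicates) is an arbitrary example.
#[test]
fn example_parallel_matches_sequential() {
    let mut data = Vec::new();
    for i in 0..50_000u32 {
        let line = format!("line-{}\n", i / 3); // runs of 3 duplicates
        data.extend_from_slice(line.as_bytes());
    }
    let (mut seq, mut par) = (Vec::new(), Vec::new());
    process_default_sequential(&data, &mut seq, b'\n').unwrap();
    process_default_parallel(&data, &mut par, b'\n').unwrap();
    assert_eq!(seq, par);
}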

/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
/// Zero-copy: writes directly from input data, no output buffer allocation.
/// Uses a cached length comparison for fast duplicate rejection.
fn process_filter_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line: unique (count=1)
            if !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    while cur_start < data.len() {
        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
            Some(offset) => cur_start + offset,
            None => data.len(),
        };

        let cur_len = cur_end - cur_start;
        let is_dup = cur_len == prev_len
            && lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]);

        if is_dup {
            count += 1;
        } else {
            // Output previous group -- write directly from input data (zero-copy)
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                if prev_end < data.len() && data[prev_end] == term {
                    writer.write_all(&data[prev_start..prev_end + 1])?;
                } else {
                    writer.write_all(&data[prev_start..prev_end])?;
                    writer.write_all(&[term])?;
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            count = 1;
        }

        if cur_end < data.len() {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group
    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        if prev_end < data.len() && data[prev_end] == term {
            writer.write_all(&data[prev_start..prev_end + 1])?;
        } else {
            writer.write_all(&data[prev_start..prev_end])?;
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}
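
// Illustrative sketch, not part of the original source: -d keeps one line per
// duplicated group, -u keeps only lines that never repeat. Inputs are arbitrary.
#[test]
fn example_filter_fast_singlepass() {
    let data = b"a\na\nb\nc\nc\nc\nd\n";

    let mut repeated = Vec::new();
    let cfg_d = UniqConfig { mode: OutputMode::RepeatedOnly, ..UniqConfig::default() };
    process_filter_fast_singlepass(data, &mut repeated, &cfg_d, b'\n').unwrap();
    assert_eq!(repeated, b"a\nc\n");

    let mut unique = Vec::new();
    let cfg_u = UniqConfig { mode: OutputMode::UniqueOnly, ..UniqConfig::default() };
    process_filter_fast_singlepass(data, &mut unique, &cfg_u, b'\n').unwrap();
    assert_eq!(unique, b"b\nd\n");
}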

/// Fast single-pass for count mode (-c) with all standard output modes.
/// Zero line_starts allocation: scans with memchr, counts groups inline,
/// and writes count-prefixed lines directly.
/// Uses cached length comparison for fast duplicate rejection.
fn process_count_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line: count=1
            let should_print = match config.mode {
                OutputMode::Default => true,
                OutputMode::RepeatedOnly => false,
                OutputMode::UniqueOnly => true,
                _ => true,
            };
            if should_print {
                write_count_line(writer, 1, data, term)?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    while cur_start < data.len() {
        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
            Some(offset) => cur_start + offset,
            None => data.len(),
        };

        let cur_len = cur_end - cur_start;
        let is_dup = cur_len == prev_len
            && lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]);

        if is_dup {
            count += 1;
        } else {
            // Output previous group with count
            let should_print = match config.mode {
                OutputMode::Default => true,
                OutputMode::RepeatedOnly => count > 1,
                OutputMode::UniqueOnly => count == 1,
                _ => true,
            };
            if should_print {
                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            count = 1;
        }

        if cur_end < data.len() {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };
    if should_print {
        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
    }

    Ok(())
}
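
// Illustrative sketch, not part of the original source: -c emits one
// count-prefixed line per adjacent group in the GNU layout.
#[test]
fn example_count_fast_singlepass() {
    let mut out = Vec::new();
    let cfg = UniqConfig { count: true, ..UniqConfig::default() };
    process_count_fast_singlepass(b"a\na\nb\n", &mut out, &cfg, b'\n').unwrap();
    assert_eq!(out, b"      2 a\n      1 b\n");
}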

/// Fast single-pass for case-insensitive (-i) default mode.
/// Same logic as process_default_sequential but uses eq_ignore_ascii_case.
fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    let mut prev_start: usize = 0;
    let mut prev_end: usize;

    match memchr::memchr(term, data) {
        Some(pos) => {
            prev_end = pos;
        }
        None => {
            writer.write_all(data)?;
            return writer.write_all(&[term]);
        }
    }

    // Write first line
    writer.write_all(&data[..prev_end + 1])?;

    let mut cur_start = prev_end + 1;

    while cur_start < data.len() {
        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
            Some(offset) => cur_start + offset,
            None => data.len(),
        };

        let prev_content = &data[prev_start..prev_end];
        let cur_content = &data[cur_start..cur_end];

        if !lines_equal_case_insensitive(prev_content, cur_content) {
            // Different line — write it
            if cur_end < data.len() {
                writer.write_all(&data[cur_start..cur_end + 1])?;
            } else {
                writer.write_all(&data[cur_start..cur_end])?;
                writer.write_all(&[term])?;
            }
            prev_start = cur_start;
            prev_end = cur_end;
        }

        if cur_end < data.len() {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    Ok(())
}

/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
fn process_filter_ci_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            if !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    while cur_start < data.len() {
        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
            Some(offset) => cur_start + offset,
            None => data.len(),
        };

        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
            count += 1;
        } else {
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                if prev_end < data.len() && data[prev_end] == term {
                    writer.write_all(&data[prev_start..prev_end + 1])?;
                } else {
                    writer.write_all(&data[prev_start..prev_end])?;
                    writer.write_all(&[term])?;
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            count = 1;
        }

        if cur_end < data.len() {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        if prev_end < data.len() && data[prev_end] == term {
            writer.write_all(&data[prev_start..prev_end + 1])?;
        } else {
            writer.write_all(&data[prev_start..prev_end])?;
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}

/// Fast single-pass for case-insensitive (-i) count (-c) mode.
fn process_count_ci_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            let should_print = match config.mode {
                OutputMode::Default => true,
                OutputMode::RepeatedOnly => false,
                OutputMode::UniqueOnly => true,
                _ => true,
            };
            if should_print {
                write_count_line(writer, 1, data, term)?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    while cur_start < data.len() {
        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
            Some(offset) => cur_start + offset,
            None => data.len(),
        };

        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
            count += 1;
        } else {
            let should_print = match config.mode {
                OutputMode::Default => true,
                OutputMode::RepeatedOnly => count > 1,
                OutputMode::UniqueOnly => count == 1,
                _ => true,
            };
            if should_print {
                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
            }
            prev_start = cur_start;
            prev_end = cur_end;
            count = 1;
        }

        if cur_end < data.len() {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };
    if should_print {
        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
    }

    Ok(())
}

/// Output a group for standard modes (bytes path).
#[inline(always)]
fn output_group_bytes(
    writer: &mut impl Write,
    content: &[u8],
    full: &[u8],
    count: u64,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };

    if should_print {
        if config.count {
            write_count_line(writer, count, content, term)?;
        } else {
            writer.write_all(full)?;
            // Add terminator if the original line didn't have one
            if full.len() == content.len() {
                writer.write_all(&[term])?;
            }
        }
    }

    Ok(())
}

/// Process --all-repeated / -D mode on byte slices.
fn process_all_repeated_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    method: AllRepeatedMethod,
    term: u8,
) -> io::Result<()> {
    let mut lines = LineIter::new(data, term);

    let first = match lines.next() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Buffer the current group as (content, full line) pairs.
    // For all-repeated the group's lines must be held back, since they are only
    // printed once we know the group has more than one line.
    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
    group_lines.push(first);
    let mut first_group_printed = false;

    let fast = !needs_key_extraction(config) && !config.ignore_case;

    for (cur_content, cur_full) in lines {
        let prev_content = group_lines.last().unwrap().0;
        let equal = if fast {
            lines_equal_fast(prev_content, cur_content)
        } else {
            lines_equal(prev_content, cur_content, config)
        };

        if equal {
            group_lines.push((cur_content, cur_full));
        } else {
            // Flush group
            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
            group_lines.clear();
            group_lines.push((cur_content, cur_full));
        }
    }

    // Flush last group
    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;

    Ok(())
}

/// Flush a group for --all-repeated mode (bytes path).
fn flush_all_repeated_bytes(
    writer: &mut impl Write,
    group: &[(&[u8], &[u8])],
    method: AllRepeatedMethod,
    first_group_printed: &mut bool,
    term: u8,
) -> io::Result<()> {
    if group.len() <= 1 {
        return Ok(()); // Not a duplicate group
    }

    match method {
        AllRepeatedMethod::Prepend => {
            writer.write_all(&[term])?;
        }
        AllRepeatedMethod::Separate => {
            if *first_group_printed {
                writer.write_all(&[term])?;
            }
        }
        AllRepeatedMethod::None => {}
    }

    for &(content, full) in group {
        writer.write_all(full)?;
        if full.len() == content.len() {
            writer.write_all(&[term])?;
        }
    }

    *first_group_printed = true;
    Ok(())
}

/// Process --group mode on byte slices.
fn process_group_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    method: GroupMethod,
    term: u8,
) -> io::Result<()> {
    let mut lines = LineIter::new(data, term);

    let (prev_content, prev_full) = match lines.next() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Prepend/Both: separator before first group
    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    // Write first line
    writer.write_all(prev_full)?;
    if prev_full.len() == prev_content.len() {
        writer.write_all(&[term])?;
    }

    let mut prev_content = prev_content;
    let fast = !needs_key_extraction(config) && !config.ignore_case;

    for (cur_content, cur_full) in lines {
        let equal = if fast {
            lines_equal_fast(prev_content, cur_content)
        } else {
            lines_equal(prev_content, cur_content, config)
        };

        if !equal {
            // New group — write separator
            writer.write_all(&[term])?;
        }

        writer.write_all(cur_full)?;
        if cur_full.len() == cur_content.len() {
            writer.write_all(&[term])?;
        }

        prev_content = cur_content;
    }

    // Append/Both: separator after last group
    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    Ok(())
}

// ============================================================================
// Streaming processing (for stdin / pipe input)
// ============================================================================

/// Main streaming uniq processor.
/// Reads from `input`, writes to `output`.
pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
    let term = if config.zero_terminated { b'\0' } else { b'\n' };

    match config.mode {
        OutputMode::Group(method) => {
            process_group_stream(reader, &mut writer, config, method, term)?;
        }
        OutputMode::AllRepeated(method) => {
            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
        }
        _ => {
            process_standard_stream(reader, &mut writer, config, term)?;
        }
    }

    writer.flush()?;
    Ok(())
}
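
// Illustrative usage sketch, not part of the original source: the streaming entry
// point accepts any Read, here an in-memory cursor standing in for a pipe. Note
// that only adjacent duplicates collapse, so the trailing "red" survives.
#[test]
fn example_process_uniq_streaming() {
    use std::io::Cursor;
    let input = Cursor::new(&b"red\nred\nblue\nred\n"[..]);
    let mut out = Vec::new();
    process_uniq(input, &mut out, &UniqConfig::default()).unwrap();
    assert_eq!(out, b"red\nblue\nred\n");
}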

/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
fn process_standard_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);

    // Read first line
    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
        return Ok(()); // empty input
    }
    let mut count: u64 = 1;

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            // End of input — output the last group
            output_group_stream(writer, &prev_line, count, config, term)?;
            break;
        }

        if compare_lines_stream(&prev_line, &current_line, config, term) {
            count += 1;
        } else {
            output_group_stream(writer, &prev_line, count, config, term)?;
            std::mem::swap(&mut prev_line, &mut current_line);
            count = 1;
        }
    }

    Ok(())
}

/// Compare two lines (with terminators) in streaming mode.
#[inline(always)]
fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
    let a_stripped = strip_term(a, term);
    let b_stripped = strip_term(b, term);
    lines_equal(a_stripped, b_stripped, config)
}

/// Strip terminator from end of line.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    if line.last() == Some(&term) {
        &line[..line.len() - 1]
    } else {
        line
    }
}

/// Output a group in streaming mode.
#[inline(always)]
fn output_group_stream(
    writer: &mut impl Write,
    line: &[u8],
    count: u64,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };

    if should_print {
        let content = strip_term(line, term);
        if config.count {
            write_count_line(writer, count, content, term)?;
        } else {
            writer.write_all(content)?;
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}

/// Process --all-repeated / -D mode (streaming).
fn process_all_repeated_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    method: AllRepeatedMethod,
    term: u8,
) -> io::Result<()> {
    let mut group: Vec<Vec<u8>> = Vec::new();
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
    let mut first_group_printed = false;

    current_line.clear();
    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
        return Ok(());
    }
    group.push(current_line.clone());

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
            break;
        }

        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
            group.push(current_line.clone());
        } else {
            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
            group.clear();
            group.push(current_line.clone());
        }
    }

    Ok(())
}

/// Flush a group for --all-repeated mode (streaming).
fn flush_all_repeated_stream(
    writer: &mut impl Write,
    group: &[Vec<u8>],
    method: AllRepeatedMethod,
    first_group_printed: &mut bool,
    term: u8,
) -> io::Result<()> {
    if group.len() <= 1 {
        return Ok(());
    }

    match method {
        AllRepeatedMethod::Prepend => {
            writer.write_all(&[term])?;
        }
        AllRepeatedMethod::Separate => {
            if *first_group_printed {
                writer.write_all(&[term])?;
            }
        }
        AllRepeatedMethod::None => {}
    }

    for line in group {
        let content = strip_term(line, term);
        writer.write_all(content)?;
        writer.write_all(&[term])?;
    }

    *first_group_printed = true;
    Ok(())
}

/// Process --group mode (streaming).
fn process_group_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    method: GroupMethod,
    term: u8,
) -> io::Result<()> {
    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);

    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
        return Ok(());
    }

    // Prepend/Both: separator before first group
    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    let content = strip_term(&prev_line, term);
    writer.write_all(content)?;
    writer.write_all(&[term])?;

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
                writer.write_all(&[term])?;
            }
            break;
        }

        if !compare_lines_stream(&prev_line, &current_line, config, term) {
            writer.write_all(&[term])?;
        }

        let content = strip_term(&current_line, term);
        writer.write_all(content)?;
        writer.write_all(&[term])?;

        std::mem::swap(&mut prev_line, &mut current_line);
    }

    Ok(())
}

/// Read a line terminated by the given byte (newline or NUL).
/// Returns number of bytes read (0 = EOF).
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    reader.read_until(term, buf)
}
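
// Illustrative sketch, not part of the original source: with -z the reader splits
// on NUL instead of newline; read_until keeps the terminator in the buffer, and
// strip_term removes it later in the streaming path.
#[test]
fn example_read_line_term_nul() {
    let mut reader = &b"one\0two\0"[..];
    let mut buf = Vec::new();
    assert_eq!(read_line_term(&mut reader, &mut buf, b'\0').unwrap(), 4);
    assert_eq!(buf, b"one\0");
    buf.clear();
    assert_eq!(read_line_term(&mut reader, &mut buf, b'\0').unwrap(), 4);
    assert_eq!(buf, b"two\0");
    buf.clear();
    assert_eq!(read_line_term(&mut reader, &mut buf, b'\0').unwrap(), 0);
}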