// coreutils_rs/uniq/core.rs

use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

/// Write a large contiguous buffer, retrying on partial writes.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    writer.write_all(buf)
}

/// Write all IoSlices to the writer, handling partial writes correctly.
fn write_all_vectored(writer: &mut impl Write, slices: &[io::IoSlice<'_>]) -> io::Result<()> {
    let n = writer.write_vectored(slices)?;
    let expected: usize = slices.iter().map(|s| s.len()).sum();
    if n >= expected {
        return Ok(());
    }
    if n == 0 && expected > 0 {
        return Err(io::Error::new(
            io::ErrorKind::WriteZero,
            "write_vectored returned 0",
        ));
    }
    // Slow path: partial write — fall back to write_all per remaining slice.
    let mut consumed = n;
    for slice in slices {
        if consumed == 0 {
            writer.write_all(slice)?;
        } else if consumed >= slice.len() {
            consumed -= slice.len();
        } else {
            writer.write_all(&slice[consumed..])?;
            consumed = 0;
        }
    }
    Ok(())
}

/// How to delimit groups when using --all-repeated
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    None,
    Prepend,
    Separate,
}

/// How to delimit groups when using --group
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    Separate,
    Prepend,
    Append,
    Both,
}

/// Output mode for uniq
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    /// Default: print unique lines and first of each duplicate group
    Default,
    /// -d: print only first line of duplicate groups
    RepeatedOnly,
    /// -D / --all-repeated: print ALL duplicate lines
    AllRepeated(AllRepeatedMethod),
    /// -u: print only lines that are NOT duplicated
    UniqueOnly,
    /// --group: show all items with group separators
    Group(GroupMethod),
}

/// Configuration for uniq processing
#[derive(Debug, Clone)]
pub struct UniqConfig {
    pub mode: OutputMode,
    pub count: bool,
    pub ignore_case: bool,
    pub skip_fields: usize,
    pub skip_chars: usize,
    pub check_chars: Option<usize>,
    pub zero_terminated: bool,
}

impl Default for UniqConfig {
    fn default() -> Self {
        Self {
            mode: OutputMode::Default,
            count: false,
            ignore_case: false,
            skip_fields: 0,
            skip_chars: 0,
            check_chars: None,
            zero_terminated: false,
        }
    }
}
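
// Sketch: building a config equivalent to `uniq -c -f 1` from the defaults above:
//
//     let config = UniqConfig {
//         count: true,
//         skip_fields: 1,
//         ..UniqConfig::default()
//     };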

/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
#[inline(always)]
fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
    let mut start = 0;
    let len = line.len();

    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
    for _ in 0..config.skip_fields {
        // Skip blanks (space and tab)
        while start < len && (line[start] == b' ' || line[start] == b'\t') {
            start += 1;
        }
        // Skip non-blanks (field content)
        while start < len && line[start] != b' ' && line[start] != b'\t' {
            start += 1;
        }
    }

    // Skip N characters
    if config.skip_chars > 0 {
        let remaining = len - start;
        let skip = config.skip_chars.min(remaining);
        start += skip;
    }

    let slice = &line[start..];

    // Limit comparison to N characters
    if let Some(w) = config.check_chars {
        if w < slice.len() {
            return &slice[..w];
        }
    }

    slice
}
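
// Minimal sketch tests for the field-skip semantics documented above; the
// expected values follow directly from the blanks-then-non-blanks rule.
#[cfg(test)]
mod compare_slice_tests {
    use super::*;

    #[test]
    fn skip_fields_keeps_leading_blanks_of_the_key() {
        let config = UniqConfig {
            skip_fields: 1,
            ..UniqConfig::default()
        };
        // "  foo" (blanks + non-blanks) is skipped; the blanks before "bar" remain.
        assert_eq!(get_compare_slice(b"  foo   bar", &config), &b"   bar"[..]);
    }

    #[test]
    fn skip_chars_and_check_chars_apply_after_fields() {
        let config = UniqConfig {
            skip_chars: 2,
            check_chars: Some(3),
            ..UniqConfig::default()
        };
        assert_eq!(get_compare_slice(b"abcdefg", &config), &b"cde"[..]);
    }
}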

/// Compare two lines (without terminators) using the config's comparison rules.
#[inline(always)]
fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
    let sa = get_compare_slice(a, config);
    let sb = get_compare_slice(b, config);

    if config.ignore_case {
        sa.eq_ignore_ascii_case(sb)
    } else {
        sa == sb
    }
}

/// Fast case-insensitive comparison: no field/char extraction, just case-insensitive equality.
/// Uses a length check (and empty-line short-circuit) before the full comparison.
#[inline(always)]
fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    a.eq_ignore_ascii_case(b)
}

/// Check if config requires field/char skipping or char limiting.
#[inline(always)]
fn needs_key_extraction(config: &UniqConfig) -> bool {
    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
}

/// Fast path comparison: no field/char extraction needed, no case folding.
/// Uses a length-equality shortcut and multi-word prefix rejection.
/// For short lines (<= 32 bytes, common in many-dups data), avoids the
/// full memcmp call overhead by doing direct word comparisons.
/// For medium lines (33-256 bytes), uses a tight u64 loop covering the
/// full line without falling through to memcmp.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    // Short-line fast path: compare via word loads to avoid memcmp call overhead
    if alen <= 8 {
        // For <= 8 bytes: byte-by-byte via slice (compiler vectorizes this)
        return a == b;
    }
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // 8-byte prefix check: reject most non-equal lines without full memcmp
        let a8 = (ap as *const u64).read_unaligned();
        let b8 = (bp as *const u64).read_unaligned();
        if a8 != b8 {
            return false;
        }
        // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 17-32 bytes: check first 16 + last 16 (overlapping) to avoid memcmp
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 33-256 bytes: tight u64 loop covering the full line.
        // Compare 32 bytes per iteration (4 u64 loads), then handle tail.
        // This avoids the function call overhead of memcmp for medium lines.
        if alen <= 256 {
            let mut off = 8usize; // first 8 bytes already compared
            // Compare 32 bytes at a time
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                // XOR all pairs and OR together: zero if all equal
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            // Compare remaining 8 bytes at a time
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            // Compare tail (overlapping last 8 bytes)
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // Longer lines (>256): prefix passed, fall through to full memcmp
    a == b
}
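
// Why the overlapping tail read is sound: for a 10-byte line, the two u64
// loads cover bytes 0..8 and 2..10. Bytes 2..8 are compared twice, but the
// union of the two windows covers all 10 bytes, so equality of both words
// implies equality of the whole line.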

/// Compare two equal-length lines starting from byte 8.
/// Caller has already checked: lengths are equal, both >= 9 bytes, first 8 bytes match.
/// This avoids redundant checks when the calling loop already did prefix rejection.
#[inline(always)]
fn lines_equal_after_prefix(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    debug_assert!(alen == b.len());
    debug_assert!(alen > 8);
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // Check last 8 bytes first (overlapping for 9-16 byte lines)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 256 {
            let mut off = 8usize;
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // >256 bytes: use memcmp via slice comparison (skipping the already-compared prefix)
    a[8..] == b[8..]
}

/// Write a count-prefixed line in GNU uniq format.
/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
/// Combines prefix + line + term into a single write when the total fits in 256 bytes.
///
/// Optimized with pre-built prefix strings for counts 1-9 (the most common case
/// in many-dups data); larger counts go through the general right-aligned path.
#[inline(always)]
fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
    // Ultra-fast path for common small counts: pre-built prefix strings
    // Avoids all the itoa/copy_within overhead for the most common case.
    if count <= 9 {
        // "      N " where N is 1-9 (7 chars + space = 8 bytes prefix)
        let prefix: &[u8] = match count {
            1 => b"      1 ",
            2 => b"      2 ",
            3 => b"      3 ",
            4 => b"      4 ",
            5 => b"      5 ",
            6 => b"      6 ",
            7 => b"      7 ",
            8 => b"      8 ",
            9 => b"      9 ",
            _ => unreachable!(),
        };
        let total = 8 + line.len() + 1;
        if total <= 256 {
            let mut buf = [0u8; 256];
            unsafe {
                std::ptr::copy_nonoverlapping(prefix.as_ptr(), buf.as_mut_ptr(), 8);
                std::ptr::copy_nonoverlapping(line.as_ptr(), buf.as_mut_ptr().add(8), line.len());
                *buf.as_mut_ptr().add(8 + line.len()) = term;
            }
            return out.write_all(&buf[..total]);
        } else {
            out.write_all(prefix)?;
            out.write_all(line)?;
            return out.write_all(&[term]);
        }
    }

    // Build prefix "      N " in a 28-byte stack buffer (room for 20 u64 digits + padding + space)
    let mut prefix = [b' '; 28];
    let digits = itoa_right_aligned_into(&mut prefix, count);
    let width = digits.max(7); // minimum 7 chars
    let prefix_len = width + 1; // +1 for trailing space
    prefix[width] = b' ';

    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
    let total = prefix_len + line.len() + 1;
    if total <= 256 {
        let mut buf = [0u8; 256];
        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
        buf[prefix_len + line.len()] = term;
        out.write_all(&buf[..total])
    } else {
        out.write_all(&prefix[..prefix_len])?;
        out.write_all(line)?;
        out.write_all(&[term])
    }
}

/// Write u64 decimal right-aligned into prefix buffer.
/// Buffer is pre-filled with spaces. Returns the occupied field width
/// (the digit count, but never less than 7).
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    if val == 0 {
        buf[6] = b'0';
        return 7; // 6 spaces + '0' = 7 chars
    }
    // Write digits right-to-left from position 27 (leaving room for trailing space)
    let mut pos = 27;
    while val > 0 {
        pos -= 1;
        buf[pos] = b'0' + (val % 10) as u8;
        val /= 10;
    }
    let num_digits = 27 - pos;
    if num_digits >= 7 {
        // Number is wide enough, shift to front
        buf.copy_within(pos..27, 0);
        num_digits
    } else {
        // Right-align in 7-char field: spaces then digits
        let pad = 7 - num_digits;
        buf.copy_within(pos..27, pad);
        // buf[0..pad] is already spaces from initialization
        7
    }
}
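
// Sketch tests for the GNU "%7lu " formatting implemented above: a one-digit
// count lands in a 7-wide field, an 8-digit count widens the field.
#[cfg(test)]
mod count_format_tests {
    use super::*;

    #[test]
    fn small_counts_use_prebuilt_prefixes() {
        let mut out = Vec::new();
        write_count_line(&mut out, 3, b"hello", b'\n').unwrap();
        assert_eq!(out, b"      3 hello\n");
    }

    #[test]
    fn wide_counts_grow_past_seven_columns() {
        let mut out = Vec::new();
        write_count_line(&mut out, 12_345_678, b"x", b'\n').unwrap();
        assert_eq!(out, b"12345678 x\n");
    }
}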

// ============================================================================
// High-performance mmap-based processing (for byte slices, zero-copy)
// ============================================================================

/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
pub fn process_uniq_bytes(
    data: &[u8],
    mut output: impl Write,
    config: &UniqConfig,
) -> io::Result<()> {
    let term = if config.zero_terminated { b'\0' } else { b'\n' };

    // Zero-copy fast path: bypass BufWriter for standard modes with IoSlice output.
    // Default mode: writes contiguous runs directly from mmap data via writev.
    // Filter modes (-d/-u): IoSlice batching (512 lines per writev).
    // Count mode (-c): IoSlice batching (340 groups per writev, prefix arena + mmap data).
    // Without BufWriter, writes go directly via writev/vmsplice (zero-copy for data slices).
    let fast = !needs_key_extraction(config) && !config.ignore_case;
    if fast
        && matches!(
            config.mode,
            OutputMode::Default | OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_standard_bytes(data, &mut output, config, term);
    }

    // General path with BufWriter for modes that need formatting/buffering.
    // Large 16MB buffer to amortize write syscalls in the formatting-heavy modes.
    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);

    match config.mode {
        OutputMode::Group(method) => {
            process_group_bytes(data, &mut writer, config, method, term)?;
        }
        OutputMode::AllRepeated(method) => {
            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
        }
        _ => {
            process_standard_bytes(data, &mut writer, config, term)?;
        }
    }

    writer.flush()?;
    Ok(())
}
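
// End-to-end sketch: default mode collapses adjacent duplicate lines, matching
// `uniq` behavior on already-adjacent runs.
#[cfg(test)]
mod process_bytes_tests {
    use super::*;

    #[test]
    fn default_mode_collapses_adjacent_duplicates() {
        let mut out = Vec::new();
        process_uniq_bytes(b"a\na\nb\nb\nb\nc\n", &mut out, &UniqConfig::default()).unwrap();
        assert_eq!(out, b"a\nb\nc\n");
    }
}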

/// Iterator over lines in a byte slice, yielding
/// (line without terminator, full line including terminator).
/// Uses memchr for SIMD-accelerated line boundary detection.
struct LineIter<'a> {
    data: &'a [u8],
    pos: usize,
    term: u8,
}

impl<'a> LineIter<'a> {
    #[inline(always)]
    fn new(data: &'a [u8], term: u8) -> Self {
        Self { data, pos: 0, term }
    }
}

impl<'a> Iterator for LineIter<'a> {
    /// (line content without terminator, full line including terminator for output)
    type Item = (&'a [u8], &'a [u8]);

    #[inline(always)]
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }

        let remaining = &self.data[self.pos..];
        match memchr::memchr(self.term, remaining) {
            Some(idx) => {
                let line_start = self.pos;
                let line_end = self.pos + idx; // without terminator
                let full_end = self.pos + idx + 1; // with terminator
                self.pos = full_end;
                Some((
                    &self.data[line_start..line_end],
                    &self.data[line_start..full_end],
                ))
            }
            None => {
                // Last line without terminator
                let line_start = self.pos;
                self.pos = self.data.len();
                let line = &self.data[line_start..];
                Some((line, line))
            }
        }
    }
}
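
// Sketch of the iteration contract: LineIter::new(b"x\ny", b'\n') yields
// (b"x", b"x\n") and then (b"y", b"y"); a final line without a terminator is
// returned with identical content and full slices.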

/// Get line content (without terminator) from pre-computed positions.
/// `content_end` is the end of actual content (excludes trailing terminator if present).
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let start = line_starts[idx];
    let end = if idx + 1 < line_starts.len() {
        line_starts[idx + 1] - 1 // exclude terminator
    } else {
        content_end // last line: pre-computed to exclude trailing terminator
    };
    &data[start..end]
}

/// Get full line (with terminator) from pre-computed positions.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let start = line_starts[idx];
    let end = if idx + 1 < line_starts.len() {
        line_starts[idx + 1] // include terminator
    } else {
        data.len()
    };
    &data[start..end]
}

/// Linear scan for the end of a duplicate group.
/// Returns the index of the first line that differs from line_starts[group_start].
/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
/// equal lines can appear in non-adjacent groups separated by different lines.
/// Caches key length for fast length-mismatch rejection.
#[inline]
fn linear_scan_group_end(
    data: &[u8],
    line_starts: &[usize],
    group_start: usize,
    num_lines: usize,
    content_end: usize,
) -> usize {
    let key = line_content_at(data, line_starts, group_start, content_end);
    let key_len = key.len();
    let mut i = group_start + 1;
    while i < num_lines {
        let candidate = line_content_at(data, line_starts, i, content_end);
        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
            return i;
        }
        i += 1;
    }
    i
}

/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
/// General path: pre-computed line positions with linear scans for group ends.
fn process_standard_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    let fast = !needs_key_extraction(config) && !config.ignore_case;
    let fast_ci = !needs_key_extraction(config) && config.ignore_case;

    // Ultra-fast path: default mode, no count, no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines inline.
    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_fast_singlepass(data, writer, term);
    }

    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
    if fast
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_fast_singlepass(data, writer, config, term);
    }

    // Ultra-fast path: count mode with no key extraction.
    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
    // Avoids the line_starts Vec allocation (20MB+ for large files).
    if fast && config.count {
        return process_count_fast_singlepass(data, writer, config, term);
    }

    // Fast path for case-insensitive (-i) mode with no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
    // Avoids the general path's line_starts Vec allocation.
    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_ci_singlepass(data, writer, term);
    }

    if fast_ci
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_ci_singlepass(data, writer, config, term);
    }

    if fast_ci && config.count {
        return process_count_ci_singlepass(data, writer, config, term);
    }

    // General path: pre-computed line positions for linear group-end scans
    let estimated_lines = (data.len() / 40).max(64);
    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
    line_starts.push(0);
    for pos in memchr::memchr_iter(term, data) {
        if pos + 1 < data.len() {
            line_starts.push(pos + 1);
        }
    }
    let num_lines = line_starts.len();
    if num_lines == 0 {
        return Ok(());
    }

    // Pre-compute content end: if data ends with terminator, exclude it for last line
    let content_end = if data.last() == Some(&term) {
        data.len() - 1
    } else {
        data.len()
    };

    // Default mode, no count, no key extraction. (Normally unreachable: the
    // single-pass fast path above already returned for this case; kept as a fallback.)
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        // Write first line
        let first_full = line_full_at(data, &line_starts, 0);
        let first_content = line_content_at(data, &line_starts, 0, content_end);
        write_all_raw(writer, first_full)?;
        if first_full.len() == first_content.len() {
            writer.write_all(&[term])?;
        }

        let mut i = 1;
        while i < num_lines {
            let prev = line_content_at(data, &line_starts, i - 1, content_end);
            let cur = line_content_at(data, &line_starts, i, content_end);

            if lines_equal_fast(prev, cur) {
                // Duplicate detected — linear scan for end of group
                let group_end =
                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
                i = group_end;
                continue;
            }

            // Unique line — write it
            let cur_full = line_full_at(data, &line_starts, i);
            write_all_raw(writer, cur_full)?;
            if cur_full.len() == cur.len() {
                writer.write_all(&[term])?;
            }
            i += 1;
        }
        return Ok(());
    }

    // General path with count tracking
    let mut i = 0;
    while i < num_lines {
        let content = line_content_at(data, &line_starts, i, content_end);
        let full = line_full_at(data, &line_starts, i);

        let group_end = if fast
            && i + 1 < num_lines
            && lines_equal_fast(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
            ) {
            // Duplicate detected — linear scan for end
            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
        } else if !fast
            && i + 1 < num_lines
            && lines_equal(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
                config,
            )
        {
            // Slow path linear scan with key extraction
            let mut j = i + 2;
            while j < num_lines {
                if !lines_equal(
                    content,
                    line_content_at(data, &line_starts, j, content_end),
                    config,
                ) {
                    break;
                }
                j += 1;
            }
            j
        } else {
            i + 1
        };

        let count = (group_end - i) as u64;
        output_group_bytes(writer, content, full, count, config, term)?;
        i = group_end;
    }

    Ok(())
}

/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
/// No pre-computed positions, no binary search, no Vec allocation.
/// Outputs each line that differs from the previous.
///
/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
/// independently, then cross-chunk boundaries are resolved.
fn process_default_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    term: u8,
) -> io::Result<()> {
    // Parallel path for large files — kicks in at 4MB.
    // Lower thresholds (e.g. 2MB) hurt performance on 10MB files because
    // the parallel overhead dominates for smaller chunks.
    if data.len() >= 4 * 1024 * 1024 {
        return process_default_parallel(data, writer, term);
    }

    process_default_sequential(data, writer, term)
}

/// Sequential single-pass dedup with zero-copy output.
/// Instead of copying data to a buffer, tracks contiguous output runs and writes
/// directly from the original data. For all-unique data, this is a single vectored write.
///
/// Optimized for the "many duplicates" case: caches the previous line's length
/// and first-8-byte prefix so non-equal lines are rejected without
/// calling the full comparison function.
///
/// Uses raw pointer arithmetic throughout to avoid bounds checking in the hot loop.
fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    let data_len = data.len();
    let base = data.as_ptr();
    let mut prev_start: usize = 0;

    // Find end of first line
    let first_end: usize = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line, no terminator
            writer.write_all(data)?;
            return writer.write_all(&[term]);
        }
    };

    // Cache previous line metadata for fast comparison
    let mut prev_len = first_end - prev_start;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };

    // run_start tracks the beginning of the current contiguous output region.
    // When a duplicate is found, we save the run as an IoSlice and skip the dup.
    // Runs are batched and written with writev to reduce syscall overhead.
    const BATCH: usize = 256;
    let term_byte: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
    let mut run_start: usize = 0;
    let mut cur_start = first_end + 1;
    let mut last_output_end = first_end + 1; // exclusive end including terminator

    while cur_start < data_len {
        // Speculative line-end detection: if the previous line had length L,
        // check if data[cur_start + L] is the terminator. This avoids the
        // memchr SIMD call for repetitive data where all lines have the same length.
        // Falls back to memchr if the speculation is wrong.
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Fast reject: if lengths differ, lines are definitely not equal.
        // This branch structure is ordered by frequency: length mismatch is
        // most common for unique data, prefix mismatch next, full compare last.
        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            // Compare cached 8-byte prefix first
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true // prefix covers entire line
            } else if cur_len <= 16 {
                // Check last 8 bytes (overlapping)
                unsafe {
                    let a_tail =
                        (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
                    let b_tail = (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
                    a_tail == b_tail
                }
            } else if cur_len <= 32 {
                // Check bytes 8-16 and last 8 bytes
                unsafe {
                    let a16 = (base.add(prev_start + 8) as *const u64).read_unaligned();
                    let b16 = (base.add(cur_start + 8) as *const u64).read_unaligned();
                    if a16 != b16 {
                        false
                    } else {
                        let a_tail =
                            (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
                        let b_tail =
                            (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
                        a_tail == b_tail
                    }
                }
            } else if cur_len <= 256 {
                // 33-256 bytes: tight u64 loop with XOR-OR batching.
                // Compares 32 bytes per iteration (4 u64 loads), reducing
                // branch mispredictions vs individual comparisons.
                unsafe {
                    let ap = base.add(prev_start);
                    let bp = base.add(cur_start);
                    let mut off = 8usize; // first 8 bytes already compared via prefix
                    let mut eq = true;
                    while off + 32 <= cur_len {
                        let a0 = (ap.add(off) as *const u64).read_unaligned();
                        let b0 = (bp.add(off) as *const u64).read_unaligned();
                        let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                        let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                        let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                        let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                        let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                        let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                        if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                            eq = false;
                            break;
                        }
                        off += 32;
                    }
                    if eq {
                        while off + 8 <= cur_len {
                            let aw = (ap.add(off) as *const u64).read_unaligned();
                            let bw = (bp.add(off) as *const u64).read_unaligned();
                            if aw != bw {
                                eq = false;
                                break;
                            }
                            off += 8;
                        }
                    }
                    if eq && off < cur_len {
                        let a_tail = (ap.add(cur_len - 8) as *const u64).read_unaligned();
                        let b_tail = (bp.add(cur_len - 8) as *const u64).read_unaligned();
                        eq = a_tail == b_tail;
                    }
                    eq
                }
            } else {
                // For longer lines (>256), use unsafe slice comparison
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    a == b
                }
            }
        } else {
            // Short line < 8 bytes — direct byte comparison
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            // Duplicate — save the current run up to this line, then skip it
            if run_start < cur_start {
                slices.push(io::IoSlice::new(&data[run_start..cur_start]));
                if slices.len() >= BATCH {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            // Start new run after this duplicate
            run_start = if cur_end < data_len {
                cur_end + 1
            } else {
                cur_end
            };
        } else {
            // Different line — update cached comparison state
            prev_start = cur_start;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            last_output_end = if cur_end < data_len {
                cur_end + 1
            } else {
                cur_end
            };
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Flush remaining run
    if run_start < data_len {
        slices.push(io::IoSlice::new(
            &data[run_start..last_output_end.max(run_start)],
        ));
    }

    // Ensure trailing terminator
    if data_len > 0 && unsafe { *base.add(data_len - 1) } != term {
        slices.push(io::IoSlice::new(&term_byte));
    }

    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}
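
// Run-tracking example: for input b"a\na\nb\n" the first duplicate "a" closes
// the run [0, 2) ("a\n"), the next run starts after the duplicate, and the
// final flush emits [4, 6) ("b\n"), so the whole output is two IoSlices
// pointing straight into the input with no copying.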

/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
/// positions in each chunk in parallel, then write output runs directly from
/// the original data. No per-chunk buffer allocation needed.
fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    use rayon::prelude::*;

    let num_threads = rayon::current_num_threads().max(1);
    let chunk_target = data.len() / num_threads;

    // Find chunk boundaries aligned to line terminators
    let mut boundaries = Vec::with_capacity(num_threads + 1);
    boundaries.push(0usize);
    for i in 1..num_threads {
        let target = i * chunk_target;
        if target >= data.len() {
            break;
        }
        if let Some(p) = memchr::memchr(term, &data[target..]) {
            let b = target + p + 1;
            if b > *boundaries.last().unwrap() && b <= data.len() {
                boundaries.push(b);
            }
        }
    }
    boundaries.push(data.len());

    let n_chunks = boundaries.len() - 1;
    if n_chunks <= 1 {
        return process_default_sequential(data, writer, term);
    }

    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
    struct ChunkResult {
        /// Byte ranges in the original data to output (contiguous runs)
        runs: Vec<(usize, usize)>,
        /// First line in chunk (absolute offsets into data, content without term)
        first_line_start: usize,
        first_line_end: usize,
        /// Last *output* line in chunk (content without term)
        last_line_start: usize,
        last_line_end: usize,
    }

    let results: Vec<ChunkResult> = boundaries
        .windows(2)
        .collect::<Vec<_>>()
        .par_iter()
        .map(|w| {
            let chunk_start = w[0];
            let chunk_end = w[1];
            let chunk = &data[chunk_start..chunk_end];

            let first_term = match memchr::memchr(term, chunk) {
                Some(pos) => pos,
                None => {
                    return ChunkResult {
                        runs: vec![(chunk_start, chunk_end)],
                        first_line_start: chunk_start,
                        first_line_end: chunk_end,
                        last_line_start: chunk_start,
                        last_line_end: chunk_end,
                    };
                }
            };

            let first_line_start = chunk_start;
            let first_line_end = chunk_start + first_term;

            let mut runs: Vec<(usize, usize)> = Vec::new();
            let mut run_start = chunk_start;
            let mut prev_start = 0usize;
            let mut _prev_end = first_term;
            let mut last_out_start = chunk_start;
            let mut last_out_end = first_line_end;

            let mut prev_len = first_term;
            let chunk_base = chunk.as_ptr();
            let chunk_len = chunk.len();
            // Cache previous line's prefix for fast rejection
            let mut prev_prefix: u64 = if prev_len >= 8 {
                unsafe { (chunk_base as *const u64).read_unaligned() }
            } else {
                0
            };
            let mut cur_start = first_term + 1;
            while cur_start < chunk_len {
                // Speculative line-end: check if next line has same length
                let cur_end = {
                    let spec = cur_start + prev_len;
                    if spec < chunk_len && unsafe { *chunk_base.add(spec) } == term {
                        spec
                    } else {
                        match memchr::memchr(term, unsafe {
                            std::slice::from_raw_parts(
                                chunk_base.add(cur_start),
                                chunk_len - cur_start,
                            )
                        }) {
                            Some(offset) => cur_start + offset,
                            None => chunk_len,
                        }
                    }
                };

                let cur_len = cur_end - cur_start;
                // Fast reject: length + prefix + full comparison
                let is_dup = if cur_len != prev_len {
                    false
                } else if cur_len == 0 {
                    true
                } else if cur_len >= 8 {
                    let cur_prefix =
                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() };
                    if cur_prefix != prev_prefix {
                        false
                    } else if cur_len <= 8 {
                        true
                    } else {
                        unsafe {
                            let a =
                                std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                            let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
                            lines_equal_after_prefix(a, b)
                        }
                    }
                } else {
                    unsafe {
                        let a = std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                        let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
                        a == b
                    }
                };

                if is_dup {
                    // Duplicate — flush current run up to this line
                    let abs_cur = chunk_start + cur_start;
                    if run_start < abs_cur {
                        runs.push((run_start, abs_cur));
                    }
                    // New run starts after this duplicate
                    run_start = chunk_start
                        + if cur_end < chunk_len {
                            cur_end + 1
                        } else {
                            cur_end
                        };
                } else {
                    last_out_start = chunk_start + cur_start;
                    last_out_end = chunk_start + cur_end;
                    prev_len = cur_len;
                    prev_prefix = if cur_len >= 8 {
                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() }
                    } else {
                        0
                    };
                }
                prev_start = cur_start;
                _prev_end = cur_end;

                if cur_end < chunk_len {
                    cur_start = cur_end + 1;
                } else {
                    break;
                }
            }

            // Close final run
            if run_start < chunk_end {
                runs.push((run_start, chunk_end));
            }

            ChunkResult {
                runs,
                first_line_start,
                first_line_end,
                last_line_start: last_out_start,
                last_line_end: last_out_end,
            }
        })
        .collect();

    // Write results, adjusting cross-chunk boundaries.
    // Batch output runs via write_vectored to reduce syscall count.
    const BATCH: usize = 256;
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
    for (i, result) in results.iter().enumerate() {
        let skip_first = if i > 0 {
            let prev = &results[i - 1];
            let prev_last = &data[prev.last_line_start..prev.last_line_end];
            let cur_first = &data[result.first_line_start..result.first_line_end];
            lines_equal_fast(prev_last, cur_first)
        } else {
            false
        };

        let skip_end = if skip_first {
            // Skip bytes up to and including the first line's terminator
            result.first_line_end + 1
        } else {
            0
        };

        for &(rs, re) in &result.runs {
            let actual_start = rs.max(skip_end);
            if actual_start < re {
                slices.push(io::IoSlice::new(&data[actual_start..re]));
                if slices.len() >= BATCH {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
        }
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    // Ensure trailing terminator
    if !data.is_empty() && *data.last().unwrap() != term {
        writer.write_all(&[term])?;
    }

    Ok(())
}
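
// Boundary sketch: if a run of equal "x" lines straddles a chunk boundary,
// the left chunk keeps its first "x" and the right chunk also keeps one.
// The skip_first check above compares the left chunk's last output line with
// the right chunk's first line and, on a match, drops the right chunk's copy
// (content plus terminator) from its first run.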

/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
/// Zero-copy: writes directly from mmap data via batched writev (IoSlice).
/// Uses speculative line-end detection and 8-byte prefix caching for fast
/// duplicate detection without full memcmp.
fn process_filter_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
    let data_len = data.len();
    let base = data.as_ptr();

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line: unique (count=1)
            if !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Batch output using IoSlice write_vectored to reduce syscall overhead.
    // Each output line needs 2 slices: content + terminator.
    const BATCH: usize = 512;
    let term_slice: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Fast reject using length + 8-byte prefix.
        // After prefix match, use lines_equal_after_prefix which skips
        // the already-checked length/prefix/empty checks.
        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true
            } else {
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    lines_equal_after_prefix(a, b)
                }
            }
        } else {
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            count += 1;
        } else {
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
                slices.push(io::IoSlice::new(&term_slice));
                if slices.len() >= BATCH * 2 {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group
    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
        slices.push(io::IoSlice::new(&term_slice));
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}
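
// Sketch tests for the -d / -u filters above, driven through the public entry point.
#[cfg(test)]
mod filter_mode_tests {
    use super::*;

    #[test]
    fn repeated_only_prints_one_copy_per_duplicate_group() {
        let config = UniqConfig {
            mode: OutputMode::RepeatedOnly,
            ..UniqConfig::default()
        };
        let mut out = Vec::new();
        process_uniq_bytes(b"a\na\nb\n", &mut out, &config).unwrap();
        assert_eq!(out, b"a\n");
    }

    #[test]
    fn unique_only_drops_duplicate_groups() {
        let config = UniqConfig {
            mode: OutputMode::UniqueOnly,
            ..UniqConfig::default()
        };
        let mut out = Vec::new();
        process_uniq_bytes(b"a\na\nb\n", &mut out, &config).unwrap();
        assert_eq!(out, b"b\n");
    }
}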
1317
1318/// Fast single-pass for count mode (-c) with all standard output modes.
1319/// Zero line_starts allocation: scans with memchr, counts groups inline,
1320/// and writes count-prefixed lines directly.
1321/// Uses cached length comparison for fast duplicate rejection.
1322/// Uses raw pointer arithmetic to avoid bounds checking.
1323///
1324/// Zero-copy output: uses writev (IoSlice) to write count prefixes from a
1325/// small arena + line content directly from mmap'd data + terminator bytes.
1326/// This avoids copying line content into an intermediate buffer entirely.
1327///
1328/// Optimizations:
1329/// - Speculative line-end detection: if all lines have the same length (common
1330///   in repetitive data), we can skip the memchr SIMD scan entirely by checking
1331///   if data[cur_start + prev_len] is the terminator.
1332/// - Cached 8-byte prefix rejection: avoids full comparison for most non-equal lines.
1333/// - IoSlice writev batching: eliminates memcpy of line content.
1334fn process_count_fast_singlepass(
1335    data: &[u8],
1336    writer: &mut impl Write,
1337    config: &UniqConfig,
1338    term: u8,
1339) -> io::Result<()> {
1340    let data_len = data.len();
1341    let base = data.as_ptr();
1342    let first_term = match memchr::memchr(term, data) {
1343        Some(pos) => pos,
1344        None => {
1345            // Single line: count=1
1346            let should_print = match config.mode {
1347                OutputMode::Default => true,
1348                OutputMode::RepeatedOnly => false,
1349                OutputMode::UniqueOnly => true,
1350                _ => true,
1351            };
1352            if should_print {
1353                write_count_line(writer, 1, data, term)?;
1354            }
1355            return Ok(());
1356        }
1357    };
1358
1359    let mut prev_start: usize = 0;
1360    let mut prev_end: usize = first_term;
1361    let mut prev_len = prev_end;
1362    let mut prev_prefix: u64 = if prev_len >= 8 {
1363        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
1364    } else {
1365        0
1366    };
1367    let mut count: u64 = 1;
1368    let mut cur_start = first_term + 1;
1369
1370    // Zero-copy writev batching: accumulate groups as (prefix_offset, prefix_len,
1371    // line_start, line_end) tuples, with prefixes stored in a flat byte buffer.
1372    // Build IoSlice arrays at flush time to avoid borrow conflicts.
1373    // Line content points directly into mmap'd data — zero copy.
1374    const BATCH: usize = 340;
1375    const PREFIX_SLOT: usize = 28; // max prefix size per group
1376    let term_slice: [u8; 1] = [term];
1377    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
1378    // Each group: (prefix_len, line_start_in_data, line_end_in_data)
1379    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);
1380
    while cur_start < data_len {
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true
            } else {
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    lines_equal_after_prefix(a, b)
                }
            }
        } else {
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            count += 1;
        } else {
            let should_print = match config.mode {
                OutputMode::RepeatedOnly => count > 1,
                OutputMode::UniqueOnly => count == 1,
                _ => true,
            };
            if should_print {
                let idx = groups.len();
                let prefix_off = idx * PREFIX_SLOT;
                let prefix_len = format_count_prefix_into(
                    count,
                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
                );
                groups.push((prefix_len, prev_start, prev_end));

                if groups.len() >= BATCH {
                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
                    groups.clear();
                    // Re-fill prefix_buf with spaces for next batch
                    prefix_buf.fill(b' ');
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group
    let should_print = match config.mode {
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };
    if should_print {
        let idx = groups.len();
        let prefix_off = idx * PREFIX_SLOT;
        let prefix_len =
            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
        groups.push((prefix_len, prev_start, prev_end));
    }
    if !groups.is_empty() {
        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
    }

    Ok(())
}
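
// A minimal sanity check for the count fast path (added here as a sketch, not
// part of the original test suite). Vec<u8> implements write_vectored by
// appending every buffer, so this exercises the writev batching end to end.
#[cfg(test)]
mod count_fast_singlepass_example {
    use super::*;

    #[test]
    fn counts_adjacent_duplicates_in_gnu_format() {
        let mut out = Vec::new();
        let config = UniqConfig {
            count: true,
            ..UniqConfig::default()
        };
        process_count_fast_singlepass(b"a\na\nb\n", &mut out, &config, b'\n').unwrap();
        // "%7lu ": the count right-aligned in a 7-column field, then a space.
        assert_eq!(out, b"      2 a\n      1 b\n".to_vec());
    }
}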

/// Flush batched count groups using write_vectored (writev).
/// Builds IoSlice arrays from the prefix buffer and mmap'd data.
#[inline]
fn flush_count_groups(
    writer: &mut impl Write,
    prefix_buf: &[u8],
    groups: &[(usize, usize, usize)],
    term_slice: &[u8; 1],
    data: &[u8],
) -> io::Result<()> {
    const PREFIX_SLOT: usize = 28; // must match the callers' slot size
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(groups.len() * 3);
    for (i, &(prefix_len, line_start, line_end)) in groups.iter().enumerate() {
        let prefix_off = i * PREFIX_SLOT;
        slices.push(io::IoSlice::new(
            &prefix_buf[prefix_off..prefix_off + prefix_len],
        ));
        slices.push(io::IoSlice::new(&data[line_start..line_end]));
        slices.push(io::IoSlice::new(term_slice));
    }
    write_all_vectored(writer, &slices)
}

/// Format a count prefix into a buffer slot, returning the prefix length.
/// GNU format: "%7lu " — right-aligned count in 7-char field, followed by space.
/// Buffer must be pre-filled with spaces and at least 28 bytes.
#[inline(always)]
fn format_count_prefix_into(count: u64, buf: &mut [u8]) -> usize {
    if count <= 9 {
        buf[6] = b'0' + count as u8;
        buf[7] = b' ';
        return 8;
    }
    // Use itoa on a temp array, then copy
    let mut tmp = [b' '; 28];
    let digits = itoa_right_aligned_into(&mut tmp, count);
    let width = digits.max(7);
    tmp[width] = b' ';
    let len = width + 1;
    buf[..len].copy_from_slice(&tmp[..len]);
    len
}
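
// Spot-check of the visible single-digit fast path above (a sketch, not from
// the original test suite): six pad spaces, the digit in column seven, then
// the separating space, per the "%7lu " contract.
#[cfg(test)]
mod count_prefix_example {
    use super::*;

    #[test]
    fn single_digit_prefix_is_eight_bytes() {
        let mut buf = [b' '; 28];
        let len = format_count_prefix_into(3, &mut buf);
        assert_eq!(&buf[..len], &b"      3 "[..]);
    }
}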

/// Fast single-pass for case-insensitive (-i) default mode.
/// Uses run-tracking zero-copy output: contiguous runs of retained lines are
/// flushed straight from the input buffer with no per-line copying.
/// Includes speculative line-end detection and length-based early rejection.
fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    // Empty input produces no output.
    if data.is_empty() {
        return Ok(());
    }
    let data_len = data.len();
    let base = data.as_ptr();

    let first_end = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line: print it with a terminator appended.
            writer.write_all(data)?;
            return writer.write_all(&[term]);
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_len = first_end;

    // Run-tracking: flush contiguous regions from the original data.
    let mut run_start: usize = 0;
    let mut cur_start = first_end + 1;

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Length-based early rejection before expensive case-insensitive compare
        let is_dup = cur_len == prev_len
            && unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a.eq_ignore_ascii_case(b)
            };

        if is_dup {
            // Duplicate — flush current run up to this line, skip it
            if run_start < cur_start {
                writer.write_all(&data[run_start..cur_start])?;
            }
            run_start = if cur_end < data_len {
                cur_end + 1
            } else {
                cur_end
            };
        } else {
            prev_start = cur_start;
            prev_len = cur_len;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Flush the remaining run. Only append a terminator when the flushed tail
    // itself lacks one; if the final line was a skipped duplicate, everything
    // already written ends with a terminator, and appending another would emit
    // a spurious blank line.
    if run_start < data_len {
        writer.write_all(&data[run_start..data_len])?;
        if data[data_len - 1] != term {
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}
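
// Sketch tests for the -i default path (not from the original suite). The
// second case ends with a skipped, unterminated duplicate and checks that no
// stray blank line is appended after the flushed run.
#[cfg(test)]
mod default_ci_example {
    use super::*;

    #[test]
    fn keeps_first_of_case_insensitive_group() {
        let mut out = Vec::new();
        process_default_ci_singlepass(b"a\nA\nb\n", &mut out, b'\n').unwrap();
        assert_eq!(out, b"a\nb\n".to_vec());

        let mut out = Vec::new();
        process_default_ci_singlepass(b"a\nA", &mut out, b'\n').unwrap();
        assert_eq!(out, b"a\n".to_vec());
    }
}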

/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
/// Zero-copy: writes directly from mmap data through BufWriter.
/// Uses speculative line-end detection and length-based early rejection.
fn process_filter_ci_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    // Empty input produces no output.
    if data.is_empty() {
        return Ok(());
    }
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
    let data_len = data.len();
    let base = data.as_ptr();

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line: a group of one, printed unless -d.
            if !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Batch output using IoSlice write_vectored (512 * 2 = 1024 slices per flush)
    const BATCH: usize = 512;
    let term_slice: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;
        // Length check + case-insensitive comparison
        let is_dup = cur_len == prev_len
            && lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]);

        if is_dup {
            count += 1;
        } else {
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
                slices.push(io::IoSlice::new(&term_slice));
                if slices.len() >= BATCH * 2 {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
        slices.push(io::IoSlice::new(&term_slice));
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}
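
// A quick -d sketch (not from the original suite): with exact duplicates the
// case-insensitive comparison degenerates to plain equality, so this exercises
// the filter path without depending on the helper's case folding.
#[cfg(test)]
mod filter_ci_example {
    use super::*;

    #[test]
    fn repeated_only_prints_one_line_per_duplicate_group() {
        let mut out = Vec::new();
        let config = UniqConfig {
            mode: OutputMode::RepeatedOnly,
            ignore_case: true,
            ..UniqConfig::default()
        };
        process_filter_ci_singlepass(b"a\na\nb\n", &mut out, &config, b'\n').unwrap();
        assert_eq!(out, b"a\n".to_vec());
    }
}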

/// Fast single-pass for case-insensitive (-i) count (-c) mode.
/// Uses the same zero-copy writev batching as process_count_fast_singlepass.
fn process_count_ci_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    // Empty input produces no output.
    if data.is_empty() {
        return Ok(());
    }
    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line: its group has count 1
            let should_print = match config.mode {
                OutputMode::Default => true,
                OutputMode::RepeatedOnly => false,
                OutputMode::UniqueOnly => true,
                _ => true,
            };
            if should_print {
                write_count_line(writer, 1, data, term)?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Zero-copy writev batching: same approach as process_count_fast_singlepass
    const BATCH: usize = 340;
    const PREFIX_SLOT: usize = 28;
    let term_slice: [u8; 1] = [term];
    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);

    let base = data.as_ptr();
    let data_len = data.len();
    let mut prev_len = prev_end - prev_start;

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;
        // Length-based early rejection before expensive case-insensitive compare
        let is_dup = cur_len == prev_len
            && data[prev_start..prev_end].eq_ignore_ascii_case(&data[cur_start..cur_end]);

        if is_dup {
            count += 1;
        } else {
            let should_print = match config.mode {
                OutputMode::RepeatedOnly => count > 1,
                OutputMode::UniqueOnly => count == 1,
                _ => true,
            };
            if should_print {
                let idx = groups.len();
                let prefix_off = idx * PREFIX_SLOT;
                let prefix_len = format_count_prefix_into(
                    count,
                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
                );
                groups.push((prefix_len, prev_start, prev_end));

                if groups.len() >= BATCH {
                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
                    groups.clear();
                    prefix_buf.fill(b' ');
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    let should_print = match config.mode {
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };
    if should_print {
        let idx = groups.len();
        let prefix_off = idx * PREFIX_SLOT;
        let prefix_len =
            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
        groups.push((prefix_len, prev_start, prev_end));
    }
    if !groups.is_empty() {
        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
    }

    Ok(())
}
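
// Sketch test for the -i -c path (not from the original suite): "a" and "A"
// fold together, so one group of two is reported.
#[cfg(test)]
mod count_ci_example {
    use super::*;

    #[test]
    fn folds_case_before_counting() {
        let mut out = Vec::new();
        let config = UniqConfig {
            count: true,
            ignore_case: true,
            ..UniqConfig::default()
        };
        process_count_ci_singlepass(b"a\nA\n", &mut out, &config, b'\n').unwrap();
        assert_eq!(out, b"      2 a\n".to_vec());
    }
}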

/// Output a group for standard modes (bytes path).
#[inline(always)]
fn output_group_bytes(
    writer: &mut impl Write,
    content: &[u8],
    full: &[u8],
    count: u64,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };

    if should_print {
        if config.count {
            write_count_line(writer, count, content, term)?;
        } else {
            writer.write_all(full)?;
            // Add terminator if the original line didn't have one
            if full.len() == content.len() {
                writer.write_all(&[term])?;
            }
        }
    }

    Ok(())
}

/// Process --all-repeated / -D mode on byte slices.
fn process_all_repeated_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    method: AllRepeatedMethod,
    term: u8,
) -> io::Result<()> {
    let mut lines = LineIter::new(data, term);

    let first = match lines.next() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Buffer the current group as (content, full) slice pairs: all-repeated
    // only prints a group once it is known to contain more than one line.
    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
    group_lines.push(first);
    let mut first_group_printed = false;

    let fast = !needs_key_extraction(config) && !config.ignore_case;

    for (cur_content, cur_full) in lines {
        let prev_content = group_lines.last().unwrap().0;
        let equal = if fast {
            lines_equal_fast(prev_content, cur_content)
        } else {
            lines_equal(prev_content, cur_content, config)
        };

        if equal {
            group_lines.push((cur_content, cur_full));
        } else {
            // Flush group
            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
            group_lines.clear();
            group_lines.push((cur_content, cur_full));
        }
    }

    // Flush last group
    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;

    Ok(())
}

/// Flush a group for --all-repeated mode (bytes path).
fn flush_all_repeated_bytes(
    writer: &mut impl Write,
    group: &[(&[u8], &[u8])],
    method: AllRepeatedMethod,
    first_group_printed: &mut bool,
    term: u8,
) -> io::Result<()> {
    if group.len() <= 1 {
        return Ok(()); // Not a duplicate group
    }

    match method {
        AllRepeatedMethod::Prepend => {
            writer.write_all(&[term])?;
        }
        AllRepeatedMethod::Separate => {
            if *first_group_printed {
                writer.write_all(&[term])?;
            }
        }
        AllRepeatedMethod::None => {}
    }

    for &(content, full) in group {
        writer.write_all(full)?;
        if full.len() == content.len() {
            writer.write_all(&[term])?;
        }
    }

    *first_group_printed = true;
    Ok(())
}
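
// Sketch of --all-repeated=separate semantics (not from the original suite):
// both members of each duplicate group are printed, singletons are dropped,
// and a blank separator precedes every group after the first.
#[cfg(test)]
mod all_repeated_bytes_example {
    use super::*;

    #[test]
    fn separate_delimits_groups_after_the_first() {
        let mut out = Vec::new();
        let config = UniqConfig::default();
        process_all_repeated_bytes(
            b"a\na\nb\nc\nc\n",
            &mut out,
            &config,
            AllRepeatedMethod::Separate,
            b'\n',
        )
        .unwrap();
        assert_eq!(out, b"a\na\n\nc\nc\n".to_vec());
    }
}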

/// Process --group mode on byte slices.
fn process_group_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    method: GroupMethod,
    term: u8,
) -> io::Result<()> {
    let mut lines = LineIter::new(data, term);

    let (prev_content, prev_full) = match lines.next() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Prepend/Both: separator before first group
    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    // Write first line
    writer.write_all(prev_full)?;
    if prev_full.len() == prev_content.len() {
        writer.write_all(&[term])?;
    }

    let mut prev_content = prev_content;
    let fast = !needs_key_extraction(config) && !config.ignore_case;

    for (cur_content, cur_full) in lines {
        let equal = if fast {
            lines_equal_fast(prev_content, cur_content)
        } else {
            lines_equal(prev_content, cur_content, config)
        };

        if !equal {
            // New group — write separator
            writer.write_all(&[term])?;
        }

        writer.write_all(cur_full)?;
        if cur_full.len() == cur_content.len() {
            writer.write_all(&[term])?;
        }

        prev_content = cur_content;
    }

    // Append/Both: separator after last group
    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    Ok(())
}
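
// Sketch of --group=separate semantics (not from the original suite): every
// line is echoed, with a blank separator between adjacent groups.
#[cfg(test)]
mod group_bytes_example {
    use super::*;

    #[test]
    fn separate_places_blank_line_between_groups() {
        let mut out = Vec::new();
        let config = UniqConfig::default();
        process_group_bytes(b"a\na\nb\n", &mut out, &config, GroupMethod::Separate, b'\n')
            .unwrap();
        assert_eq!(out, b"a\na\n\nb\n".to_vec());
    }
}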

// ============================================================================
// Streaming processing (for stdin / pipe input)
// ============================================================================

/// Main streaming uniq processor.
/// Reads from `input`, writes to `output`.
pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
    let term = if config.zero_terminated { b'\0' } else { b'\n' };

    match config.mode {
        OutputMode::Group(method) => {
            process_group_stream(reader, &mut writer, config, method, term)?;
        }
        OutputMode::AllRepeated(method) => {
            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
        }
        _ => {
            process_standard_stream(reader, &mut writer, config, term)?;
        }
    }

    writer.flush()?;
    Ok(())
}
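
// Usage sketch for the public streaming entry point (not from the original
// docs): any Read/Write pair works, so an in-memory Cursor and Vec suffice.
#[cfg(test)]
mod process_uniq_example {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn default_mode_drops_adjacent_duplicates() {
        let mut out = Vec::new();
        let config = UniqConfig::default();
        process_uniq(Cursor::new(&b"a\na\nb\n"[..]), &mut out, &config).unwrap();
        assert_eq!(out, b"a\nb\n".to_vec());
    }
}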

/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
fn process_standard_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);

    // Read first line
    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
        return Ok(()); // empty input
    }
    let mut count: u64 = 1;

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            // End of input — output the last group
            output_group_stream(writer, &prev_line, count, config, term)?;
            break;
        }

        if compare_lines_stream(&prev_line, &current_line, config, term) {
            count += 1;
        } else {
            output_group_stream(writer, &prev_line, count, config, term)?;
            std::mem::swap(&mut prev_line, &mut current_line);
            count = 1;
        }
    }

    Ok(())
}

/// Compare two lines (with terminators) in streaming mode.
#[inline(always)]
fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
    let a_stripped = strip_term(a, term);
    let b_stripped = strip_term(b, term);
    lines_equal(a_stripped, b_stripped, config)
}

/// Strip terminator from end of line.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    if line.last() == Some(&term) {
        &line[..line.len() - 1]
    } else {
        line
    }
}

/// Output a group in streaming mode.
#[inline(always)]
fn output_group_stream(
    writer: &mut impl Write,
    line: &[u8],
    count: u64,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };

    if should_print {
        let content = strip_term(line, term);
        if config.count {
            write_count_line(writer, count, content, term)?;
        } else {
            writer.write_all(content)?;
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}
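
// Sketch of the streaming -c path (not from the original suite), assuming
// write_count_line (defined elsewhere in this file) emits the same GNU
// "%7lu " prefix that format_count_prefix_into documents above.
#[cfg(test)]
mod output_group_stream_example {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn streaming_count_uses_gnu_prefix() {
        let mut out = Vec::new();
        let config = UniqConfig {
            count: true,
            ..UniqConfig::default()
        };
        process_uniq(Cursor::new(&b"a\na\n"[..]), &mut out, &config).unwrap();
        assert_eq!(out, b"      2 a\n".to_vec());
    }
}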

/// Process --all-repeated / -D mode (streaming).
fn process_all_repeated_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    method: AllRepeatedMethod,
    term: u8,
) -> io::Result<()> {
    let mut group: Vec<Vec<u8>> = Vec::new();
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
    let mut first_group_printed = false;

    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
        return Ok(());
    }
    group.push(current_line.clone());

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
            break;
        }

        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
            group.push(current_line.clone());
        } else {
            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
            group.clear();
            group.push(current_line.clone());
        }
    }

    Ok(())
}

/// Flush a group for --all-repeated mode (streaming).
fn flush_all_repeated_stream(
    writer: &mut impl Write,
    group: &[Vec<u8>],
    method: AllRepeatedMethod,
    first_group_printed: &mut bool,
    term: u8,
) -> io::Result<()> {
    if group.len() <= 1 {
        return Ok(());
    }

    match method {
        AllRepeatedMethod::Prepend => {
            writer.write_all(&[term])?;
        }
        AllRepeatedMethod::Separate => {
            if *first_group_printed {
                writer.write_all(&[term])?;
            }
        }
        AllRepeatedMethod::None => {}
    }

    for line in group {
        let content = strip_term(line, term);
        writer.write_all(content)?;
        writer.write_all(&[term])?;
    }

    *first_group_printed = true;
    Ok(())
}

/// Process --group mode (streaming).
fn process_group_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    method: GroupMethod,
    term: u8,
) -> io::Result<()> {
    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);

    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
        return Ok(());
    }

    // Prepend/Both: separator before first group
    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    let content = strip_term(&prev_line, term);
    writer.write_all(content)?;
    writer.write_all(&[term])?;

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
                writer.write_all(&[term])?;
            }
            break;
        }

        if !compare_lines_stream(&prev_line, &current_line, config, term) {
            writer.write_all(&[term])?;
        }

        let content = strip_term(&current_line, term);
        writer.write_all(content)?;
        writer.write_all(&[term])?;

        std::mem::swap(&mut prev_line, &mut current_line);
    }

    Ok(())
}

/// Read a line terminated by the given byte (newline or NUL).
/// Returns number of bytes read (0 = EOF). The terminator, if present, is
/// kept in `buf`, and bytes are appended to any existing contents, so callers
/// must clear the buffer between reads.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    reader.read_until(term, buf)
}
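
// Sketch of -z style reading (not from the original suite): read_until keeps
// the NUL terminator in the buffer, which strip_term later removes.
#[cfg(test)]
mod read_line_term_example {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn reads_nul_terminated_records() {
        let mut reader = Cursor::new(&b"a\0bc\0"[..]);
        let mut buf = Vec::new();
        assert_eq!(read_line_term(&mut reader, &mut buf, b'\0').unwrap(), 2);
        assert_eq!(buf, b"a\0".to_vec());
    }
}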