Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Write all `IoSlice`s to the writer, handling partial writes correctly.
///
/// Tries a single `write_vectored` first (the common case pushes everything
/// out in one syscall). On a short write it falls back to `write_all` on the
/// unwritten remainder of each slice. The initial vectored write is retried
/// on `ErrorKind::Interrupted`, matching the contract of `Write::write_all`
/// (which this function otherwise emulates for a slice list).
fn write_all_vectored(writer: &mut impl Write, slices: &[io::IoSlice<'_>]) -> io::Result<()> {
    let expected: usize = slices.iter().map(|s| s.len()).sum();

    // Retry on EINTR like Write::write_all does, instead of surfacing it.
    let n = loop {
        match writer.write_vectored(slices) {
            Ok(n) => break n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };

    if n >= expected {
        return Ok(());
    }
    if n == 0 {
        // expected > 0 here, otherwise n >= expected returned above.
        return Err(io::Error::new(
            io::ErrorKind::WriteZero,
            "write_vectored returned 0",
        ));
    }

    // Slow path: partial write — fall back to write_all per remaining slice.
    let mut consumed = n;
    for slice in slices {
        if consumed == 0 {
            writer.write_all(slice)?;
        } else if consumed >= slice.len() {
            consumed -= slice.len();
        } else {
            writer.write_all(&slice[consumed..])?;
            consumed = 0;
        }
    }
    Ok(())
}
30
/// How to delimit groups when using --all-repeated.
/// Mirrors GNU uniq's `--all-repeated[=METHOD]` values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    /// `none`: no delimiter between duplicate groups (the default).
    None,
    /// `prepend`: delimiter line before each duplicate group.
    Prepend,
    /// `separate`: delimiter line between duplicate groups.
    Separate,
}
38
/// How to delimit groups when using --group.
/// Mirrors GNU uniq's `--group[=METHOD]` values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    /// `separate`: delimiter line between groups only (the default).
    Separate,
    /// `prepend`: delimiter line before each group.
    Prepend,
    /// `append`: delimiter line after each group.
    Append,
    /// `both`: delimiter line before and after each group.
    Both,
}
47
/// Output mode for uniq, selected by the command-line flags.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    /// Default: print unique lines and the first line of each duplicate group.
    Default,
    /// -d: print only the first line of each duplicate group.
    RepeatedOnly,
    /// -D / --all-repeated: print ALL duplicate lines (delimited per method).
    AllRepeated(AllRepeatedMethod),
    /// -u: print only lines that are NOT duplicated.
    UniqueOnly,
    /// --group: show all items with group separators (placement per method).
    Group(GroupMethod),
}
62
/// Configuration for uniq processing.
#[derive(Debug, Clone)]
pub struct UniqConfig {
    /// Output mode (default, -d, -D, -u, --group).
    pub mode: OutputMode,
    /// -c: prefix each output line with its occurrence count.
    pub count: bool,
    /// -i: compare lines case-insensitively (ASCII folding).
    pub ignore_case: bool,
    /// -f N: skip the first N blank-delimited fields when comparing.
    pub skip_fields: usize,
    /// -s N: skip N bytes (after field skipping) when comparing.
    pub skip_chars: usize,
    /// -w N: compare at most N bytes of the extracted key.
    pub check_chars: Option<usize>,
    /// -z: lines are NUL-terminated instead of newline-terminated.
    pub zero_terminated: bool,
}
74
impl Default for UniqConfig {
    /// Plain `uniq` behavior: newline-terminated, case-sensitive,
    /// whole-line comparison, no counts.
    fn default() -> Self {
        Self {
            mode: OutputMode::Default,
            count: false,
            ignore_case: false,
            skip_fields: 0,
            skip_chars: 0,
            check_chars: None,
            zero_terminated: false,
        }
    }
}
88
89/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
90/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
91#[inline(always)]
92fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
93    let mut start = 0;
94    let len = line.len();
95
96    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
97    // Early-exit if already past end of line to avoid O(skip_fields) loop
98    let mut fields_remaining = config.skip_fields;
99    while fields_remaining > 0 && start < len {
100        // Skip blanks (space and tab)
101        while start < len && (line[start] == b' ' || line[start] == b'\t') {
102            start += 1;
103        }
104        // Skip non-blanks (field content)
105        while start < len && line[start] != b' ' && line[start] != b'\t' {
106            start += 1;
107        }
108        fields_remaining -= 1;
109    }
110
111    // Skip N characters
112    if config.skip_chars > 0 {
113        let remaining = len - start;
114        let skip = config.skip_chars.min(remaining);
115        start += skip;
116    }
117
118    let slice = &line[start..];
119
120    // Limit comparison to N characters
121    if let Some(w) = config.check_chars {
122        if w < slice.len() {
123            return &slice[..w];
124        }
125    }
126
127    slice
128}
129
130/// Compare two lines (without terminators) using the config's comparison rules.
131#[inline(always)]
132fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
133    let sa = get_compare_slice(a, config);
134    let sb = get_compare_slice(b, config);
135
136    if config.ignore_case {
137        sa.eq_ignore_ascii_case(sb)
138    } else {
139        sa == sb
140    }
141}
142
/// Case-insensitive comparison of two byte slices.
/// Delegates to the standard library's per-byte ASCII case folding
/// (symmetric, so argument order is irrelevant).
#[inline(always)]
fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
    b.eq_ignore_ascii_case(a)
}
149
150/// Check if config requires field/char skipping or char limiting.
151#[inline(always)]
152fn needs_key_extraction(config: &UniqConfig) -> bool {
153    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
154}
155
/// Fast path comparison: no field/char extraction needed, no case folding.
/// Uses pointer+length equality shortcut and multi-word prefix rejection.
/// For short lines (<= 32 bytes, common in many-dups data), avoids the
/// full memcmp call overhead by doing direct word comparisons.
/// For medium lines (33-256 bytes), uses a tight u64 loop covering the
/// full line without falling through to memcmp.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    // Short-line fast path: compare via word loads to avoid memcmp call overhead
    if alen <= 8 {
        // For < 8 bytes: byte-by-byte via slice (compiler vectorizes this)
        return a == b;
    }
    // SAFETY: here alen == b.len() and alen > 8, so every unaligned u64 load
    // below is in-bounds for both slices: offsets 0 and `alen - 8` only need
    // alen >= 8, offset 8 is taken only when alen > 16, and loop loads at
    // `off` are guarded by `off + 8 <= alen` (or `off + 32 <= alen` for the
    // 4-word step). read_unaligned is used because line starts within the
    // input buffer carry no alignment guarantee.
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // 8-byte prefix check: reject most non-equal lines without full memcmp
        let a8 = (ap as *const u64).read_unaligned();
        let b8 = (bp as *const u64).read_unaligned();
        if a8 != b8 {
            return false;
        }
        // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 17-32 bytes: check first 16 + last 16 (overlapping) to avoid memcmp
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 33-256 bytes: tight u64 loop covering the full line.
        // Compare 32 bytes per iteration (4 u64 loads), then handle tail.
        // This avoids the function call overhead of memcmp for medium lines.
        if alen <= 256 {
            let mut off = 8usize; // first 8 bytes already compared
            // Compare 32 bytes at a time
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                // XOR all pairs and OR together: zero if all equal.
                // (In Rust, `|` binds tighter than `!=`, so this compares the
                // whole OR-chain against 0.)
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            // Compare remaining 8 bytes at a time
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            // Compare tail (overlapping last 8 bytes)
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // Longer lines (>256): prefix passed, fall through to full memcmp
    a == b
}
244
/// Compare two equal-length lines starting from byte 8.
/// Caller has already checked: lengths are equal, both >= 9 bytes, first 8 bytes match.
/// This avoids redundant checks when the calling loop already did prefix rejection.
#[inline(always)]
fn lines_equal_after_prefix(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    debug_assert!(alen == b.len());
    debug_assert!(alen > 8);
    // SAFETY: the caller guarantees alen == b.len() and alen > 8 (also checked
    // by the debug_asserts above), so every unaligned u64 load is in-bounds
    // for both slices: offsets 8 and `alen - 8` need alen > 8, offset 8+8 is
    // taken only when alen > 16, and loop loads at `off` are guarded by
    // `off + 8 <= alen` / `off + 32 <= alen`. read_unaligned is used because
    // line starts carry no alignment guarantee.
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // Check last 8 bytes first (overlapping for 9-16 byte lines)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 256 {
            // Same 32-bytes-per-iteration u64 loop as lines_equal_fast,
            // starting past the already-verified 8-byte prefix.
            let mut off = 8usize;
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // >256 bytes: use memcmp via slice comparison (skipping the already-compared prefix)
    a[8..] == b[8..]
}
307
/// Write a count-prefixed line in GNU uniq format.
/// GNU format: "%7lu " — count right-aligned in a 7-char field, then a space.
///
/// For short totals (<= 256 bytes) the prefix, line, and terminator are
/// packed into one stack buffer and emitted with a single `write_all`;
/// longer lines fall back to three separate writes.
///
/// Fast path for counts 0-9 (the dominant case in many-dups data) builds the
/// 8-byte prefix directly, skipping the itoa/right-align machinery. The
/// digit is computed rather than looked up in a 1-9 match table so that
/// `count == 0` is handled too (the previous table hit `unreachable!()` on 0,
/// even though the itoa fallback explicitly supports 0).
#[inline(always)]
fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
    if count <= 9 {
        // "      N " — digit right-aligned in 7 chars plus trailing space.
        let mut prefix = *b"        ";
        prefix[6] = b'0' + count as u8;
        return write_prefixed(out, &prefix, line, term);
    }

    // Build the prefix "   …N " in a stack buffer (room for u64 max + space).
    let mut prefix = [b' '; 28];
    let width = itoa_right_aligned_into(&mut prefix, count);
    prefix[width] = b' ';
    write_prefixed(out, &prefix[..width + 1], line, term)
}

/// Emit prefix + line + terminator, coalesced into one write when the total
/// fits in a 256-byte stack buffer (avoids 3 separate BufWriter calls).
#[inline(always)]
fn write_prefixed(out: &mut impl Write, prefix: &[u8], line: &[u8], term: u8) -> io::Result<()> {
    let total = prefix.len() + line.len() + 1;
    if total <= 256 {
        let mut buf = [0u8; 256];
        buf[..prefix.len()].copy_from_slice(prefix);
        buf[prefix.len()..prefix.len() + line.len()].copy_from_slice(line);
        buf[total - 1] = term;
        out.write_all(&buf[..total])
    } else {
        out.write_all(prefix)?;
        out.write_all(line)?;
        out.write_all(&[term])
    }
}

/// Write `val` in decimal, right-aligned in a minimum 7-char field of the
/// pre-space-filled buffer. Returns the field width used (always >= 7;
/// wider only when the number itself needs more than 7 digits).
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    if val == 0 {
        buf[6] = b'0';
        return 7; // 6 spaces + '0' = 7 chars
    }
    // Emit digits right-to-left, ending just before index 27
    // (leaving room for the caller's trailing space).
    let mut pos = 27;
    while val > 0 {
        pos -= 1;
        buf[pos] = b'0' + (val % 10) as u8;
        val /= 10;
    }
    let num_digits = 27 - pos;
    if num_digits >= 7 {
        // Number fills the field on its own: shift digits to the front.
        buf.copy_within(pos..27, 0);
        num_digits
    } else {
        // Right-align in a 7-char field; buf[..pad] is already spaces.
        let pad = 7 - num_digits;
        buf.copy_within(pos..27, pad);
        7
    }
}
398
399// ============================================================================
400// High-performance mmap-based processing (for byte slices, zero-copy)
401// ============================================================================
402
403/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
404pub fn process_uniq_bytes(
405    data: &[u8],
406    mut output: impl Write,
407    config: &UniqConfig,
408) -> io::Result<()> {
409    let term = if config.zero_terminated { b'\0' } else { b'\n' };
410
411    // Zero-copy fast path: bypass BufWriter for standard modes with IoSlice output.
412    // Default mode: writes contiguous runs directly from mmap data via writev.
413    // Filter modes (-d/-u): IoSlice batching (512 lines per writev).
414    // Count mode (-c): IoSlice batching (340 groups per writev, prefix arena + mmap data).
415    // Without BufWriter, writes go directly via writev/vmsplice (zero-copy for data slices).
416    let fast = !needs_key_extraction(config) && !config.ignore_case;
417    if fast
418        && matches!(
419            config.mode,
420            OutputMode::Default | OutputMode::RepeatedOnly | OutputMode::UniqueOnly
421        )
422    {
423        return process_standard_bytes(data, &mut output, config, term);
424    }
425
426    // General path with BufWriter for modes that need formatting/buffering.
427    // 16MB buffer — optimal for L3 cache utilization on modern CPUs.
428    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
429
430    match config.mode {
431        OutputMode::Group(method) => {
432            process_group_bytes(data, &mut writer, config, method, term)?;
433        }
434        OutputMode::AllRepeated(method) => {
435            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
436        }
437        _ => {
438            process_standard_bytes(data, &mut writer, config, term)?;
439        }
440    }
441
442    writer.flush()?;
443    Ok(())
444}
445
446/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
447/// Uses memchr for SIMD-accelerated line boundary detection.
448struct LineIter<'a> {
449    data: &'a [u8],
450    pos: usize,
451    term: u8,
452}
453
454impl<'a> LineIter<'a> {
455    #[inline(always)]
456    fn new(data: &'a [u8], term: u8) -> Self {
457        Self { data, pos: 0, term }
458    }
459}
460
461impl<'a> Iterator for LineIter<'a> {
462    /// (line content without terminator, full line including terminator for output)
463    type Item = (&'a [u8], &'a [u8]);
464
465    #[inline(always)]
466    fn next(&mut self) -> Option<Self::Item> {
467        if self.pos >= self.data.len() {
468            return None;
469        }
470
471        let remaining = &self.data[self.pos..];
472        match memchr::memchr(self.term, remaining) {
473            Some(idx) => {
474                let line_start = self.pos;
475                let line_end = self.pos + idx; // without terminator
476                let full_end = self.pos + idx + 1; // with terminator
477                self.pos = full_end;
478                Some((
479                    &self.data[line_start..line_end],
480                    &self.data[line_start..full_end],
481                ))
482            }
483            None => {
484                // Last line without terminator
485                let line_start = self.pos;
486                self.pos = self.data.len();
487                let line = &self.data[line_start..];
488                Some((line, line))
489            }
490        }
491    }
492}
493
/// Get line content (without terminator) from pre-computed start positions.
/// `content_end` is the end of actual content for the LAST line (it already
/// excludes a trailing terminator when the data has one).
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let start = line_starts[idx];
    // Any line but the last ends one byte before the next line's start
    // (dropping the terminator); the last line ends at `content_end`.
    let end = match line_starts.get(idx + 1) {
        Some(&next_start) => next_start - 1,
        None => content_end,
    };
    &data[start..end]
}
511
/// Get the full line (terminator included, when present) from pre-computed
/// start positions; the last line simply runs to the end of the data.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let end = line_starts
        .get(idx + 1)
        .copied()
        .unwrap_or(data.len());
    &data[line_starts[idx]..end]
}
523
/// Skip a run of identical lines using doubling memcmp.
/// When a duplicate is found at `dup_start`, progressively larger blocks of
/// identical `pattern_len`-byte copies are verified with slice comparisons
/// (memcmp, SIMD-accelerated). Returns the byte offset just past the last
/// verified duplicate copy.
///
/// For 50K identical 6-byte lines this needs ~16 comparisons (~600KB total)
/// instead of 50K per-line comparisons.
///
/// Correctness: the doubling trick verifies every byte by induction —
/// once [0..N) is verified, [N..2N) == [0..N) extends it to [0..2N).
#[inline]
fn skip_dup_run(data: &[u8], dup_start: usize, pattern_start: usize, pattern_len: usize) -> usize {
    let total = data.len();
    // Degenerate cases: empty pattern, or too little data left for the
    // doubling scheme to help.
    if pattern_len == 0 || dup_start + 2 * pattern_len > total {
        return dup_start + pattern_len.min(total - dup_start);
    }

    // One copy at dup_start is already known to match.
    let mut end = dup_start + pattern_len;

    // Phase 1: doubling. The verified region [dup_start, end) always spans
    // exactly `copies` pattern copies; compare it against the next window of
    // the same size. Total bytes compared ≈ 2 × total region.
    let mut copies = 1usize;
    loop {
        let span = copies * pattern_len;
        if end + span > total {
            // No room for a full doubling step: try the remaining whole
            // copies once (always fewer than the verified block holds).
            let tail = ((total - end) / pattern_len) * pattern_len;
            if tail > 0 && data[dup_start..dup_start + tail] == data[end..end + tail] {
                end += tail;
            }
            break;
        }
        if data[dup_start..dup_start + span] != data[end..end + span] {
            break;
        }
        end += span;
        copies *= 2;
    }

    // Phase 2: per-copy scan at the boundary — at most `copies` iterations
    // (the size of the block that failed to double).
    let pattern = &data[pattern_start..pattern_start + pattern_len];
    while end + pattern_len <= total && &data[end..end + pattern_len] == pattern {
        end += pattern_len;
    }

    end
}
585
586/// Linear scan for the end of a duplicate group.
587/// Returns the index of the first line that differs from line_starts[group_start].
588/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
589/// equal lines can appear in non-adjacent groups separated by different lines.
590/// Caches key length for fast length-mismatch rejection.
591#[inline]
592fn linear_scan_group_end(
593    data: &[u8],
594    line_starts: &[usize],
595    group_start: usize,
596    num_lines: usize,
597    content_end: usize,
598) -> usize {
599    let key = line_content_at(data, line_starts, group_start, content_end);
600    let key_len = key.len();
601    let mut i = group_start + 1;
602    while i < num_lines {
603        let candidate = line_content_at(data, line_starts, i, content_end);
604        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
605            return i;
606        }
607        i += 1;
608    }
609    i
610}
611
612/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
613/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
614/// General path: pre-computed line positions with binary search for groups.
615fn process_standard_bytes(
616    data: &[u8],
617    writer: &mut impl Write,
618    config: &UniqConfig,
619    term: u8,
620) -> io::Result<()> {
621    if data.is_empty() {
622        return Ok(());
623    }
624
625    let fast = !needs_key_extraction(config) && !config.ignore_case;
626    let fast_ci = !needs_key_extraction(config) && config.ignore_case;
627
628    // Ultra-fast path: default mode, no count, no key extraction.
629    // Single-pass: scan with memchr, compare adjacent lines inline.
630    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
631    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
632        return process_default_fast_singlepass(data, writer, term);
633    }
634
635    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
636    if fast
637        && !config.count
638        && matches!(
639            config.mode,
640            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
641        )
642    {
643        return process_filter_fast_singlepass(data, writer, config, term);
644    }
645
646    // Ultra-fast path: count mode with no key extraction.
647    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
648    // Avoids the line_starts Vec allocation (20MB+ for large files).
649    if fast && config.count {
650        return process_count_fast_singlepass(data, writer, config, term);
651    }
652
653    // Fast path for case-insensitive (-i) mode with no key extraction.
654    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
655    // Avoids the general path's line_starts Vec allocation.
656    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
657        return process_default_ci_singlepass(data, writer, term);
658    }
659
660    if fast_ci
661        && !config.count
662        && matches!(
663            config.mode,
664            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
665        )
666    {
667        return process_filter_ci_singlepass(data, writer, config, term);
668    }
669
670    if fast_ci && config.count {
671        return process_count_ci_singlepass(data, writer, config, term);
672    }
673
674    // General path: pre-computed line positions for binary search on groups
675    let estimated_lines = (data.len() / 40).max(64);
676    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
677    line_starts.push(0);
678    for pos in memchr::memchr_iter(term, data) {
679        if pos + 1 < data.len() {
680            line_starts.push(pos + 1);
681        }
682    }
683    let num_lines = line_starts.len();
684    if num_lines == 0 {
685        return Ok(());
686    }
687
688    // Pre-compute content end: if data ends with terminator, exclude it for last line
689    let content_end = if data.last() == Some(&term) {
690        data.len() - 1
691    } else {
692        data.len()
693    };
694
695    // Ultra-fast path: default mode, no count, no key extraction
696    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
697        // Write first line
698        let first_full = line_full_at(data, &line_starts, 0);
699        let first_content = line_content_at(data, &line_starts, 0, content_end);
700        writer.write_all(first_full)?;
701        if first_full.len() == first_content.len() {
702            writer.write_all(&[term])?;
703        }
704
705        let mut i = 1;
706        while i < num_lines {
707            let prev = line_content_at(data, &line_starts, i - 1, content_end);
708            let cur = line_content_at(data, &line_starts, i, content_end);
709
710            if lines_equal_fast(prev, cur) {
711                // Duplicate detected — linear scan for end of group
712                let group_end =
713                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
714                i = group_end;
715                continue;
716            }
717
718            // Unique line — write it
719            let cur_full = line_full_at(data, &line_starts, i);
720            writer.write_all(cur_full)?;
721            if cur_full.len() == cur.len() {
722                writer.write_all(&[term])?;
723            }
724            i += 1;
725        }
726        return Ok(());
727    }
728
729    // General path with count tracking
730    let mut i = 0;
731    while i < num_lines {
732        let content = line_content_at(data, &line_starts, i, content_end);
733        let full = line_full_at(data, &line_starts, i);
734
735        let group_end = if fast
736            && i + 1 < num_lines
737            && lines_equal_fast(
738                content,
739                line_content_at(data, &line_starts, i + 1, content_end),
740            ) {
741            // Duplicate detected — linear scan for end
742            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
743        } else if !fast
744            && i + 1 < num_lines
745            && lines_equal(
746                content,
747                line_content_at(data, &line_starts, i + 1, content_end),
748                config,
749            )
750        {
751            // Slow path linear scan with key extraction
752            let mut j = i + 2;
753            while j < num_lines {
754                if !lines_equal(
755                    content,
756                    line_content_at(data, &line_starts, j, content_end),
757                    config,
758                ) {
759                    break;
760                }
761                j += 1;
762            }
763            j
764        } else {
765            i + 1
766        };
767
768        let count = (group_end - i) as u64;
769        output_group_bytes(writer, content, full, count, config, term)?;
770        i = group_end;
771    }
772
773    Ok(())
774}
775
776/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
777/// No pre-computed positions, no binary search, no Vec allocation.
778/// Outputs each line that differs from the previous.
779///
780/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
781/// independently, then cross-chunk boundaries are resolved.
782fn process_default_fast_singlepass(
783    data: &[u8],
784    writer: &mut impl Write,
785    term: u8,
786) -> io::Result<()> {
787    // Parallel path for large files — kick in at 4MB.
788    // Lower thresholds (e.g. 2MB) hurt performance on 10MB files because
789    // the parallel overhead dominates for smaller chunks.
790    if data.len() >= 4 * 1024 * 1024 {
791        return process_default_parallel(data, writer, term);
792    }
793
794    process_default_sequential(data, writer, term)
795}
796
797/// Sequential single-pass dedup with zero-copy output.
798/// Instead of copying data to a buffer, tracks contiguous output runs and writes
799/// directly from the original data. For all-unique data, this is a single write_all.
800///
801/// Optimized for the "many duplicates" case: caches the previous line's length
802/// and first-8-byte prefix for fast rejection of non-duplicates without
803/// calling the full comparison function.
804///
805/// Uses raw pointer arithmetic throughout to avoid bounds checking in the hot loop.
806fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
807    let data_len = data.len();
808    let base = data.as_ptr();
809    let mut prev_start: usize = 0;
810
811    // Find end of first line
812    let first_end: usize = match memchr::memchr(term, data) {
813        Some(pos) => pos,
814        None => {
815            // Single line, no terminator
816            writer.write_all(data)?;
817            return writer.write_all(&[term]);
818        }
819    };
820
821    // Cache previous line metadata for fast comparison
822    let mut prev_len = first_end - prev_start;
823    let mut prev_prefix: u64 = if prev_len >= 8 {
824        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
825    } else {
826        0
827    };
828
829    // run_start tracks the beginning of the current contiguous output region.
830    // When a duplicate is found, we save the run as an IoSlice and skip the dup.
831    // Runs are batched and written with writev to reduce syscall overhead.
832    const BATCH: usize = 256;
833    let term_byte: [u8; 1] = [term];
834    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
835    let mut run_start: usize = 0;
836    let mut cur_start = first_end + 1;
837    let mut last_output_end = first_end + 1; // exclusive end including terminator
838
839    while cur_start < data_len {
840        // Speculative line-end detection: if the previous line had length L,
841        // check if data[cur_start + L] is the terminator. This avoids the
842        // memchr SIMD call for repetitive data where all lines have the same length.
843        // Falls back to memchr if the speculation is wrong.
844        let cur_end = {
845            let speculative = cur_start + prev_len;
846            if speculative < data_len && unsafe { *base.add(speculative) } == term {
847                speculative
848            } else {
849                match memchr::memchr(term, unsafe {
850                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
851                }) {
852                    Some(offset) => cur_start + offset,
853                    None => data_len,
854                }
855            }
856        };
857
858        let cur_len = cur_end - cur_start;
859
860        // Fast reject: if lengths differ, lines are definitely not equal.
861        // This branch structure is ordered by frequency: length mismatch is
862        // most common for unique data, prefix mismatch next, full compare last.
863        let is_dup = if cur_len != prev_len {
864            false
865        } else if cur_len == 0 {
866            true
867        } else if cur_len >= 8 {
868            // Compare cached 8-byte prefix first
869            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
870            if cur_prefix != prev_prefix {
871                false
872            } else if cur_len <= 8 {
873                true // prefix covers entire line
874            } else if cur_len <= 16 {
875                // Check last 8 bytes (overlapping)
876                unsafe {
877                    let a_tail =
878                        (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
879                    let b_tail = (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
880                    a_tail == b_tail
881                }
882            } else if cur_len <= 32 {
883                // Check bytes 8-16 and last 8 bytes
884                unsafe {
885                    let a16 = (base.add(prev_start + 8) as *const u64).read_unaligned();
886                    let b16 = (base.add(cur_start + 8) as *const u64).read_unaligned();
887                    if a16 != b16 {
888                        false
889                    } else {
890                        let a_tail =
891                            (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
892                        let b_tail =
893                            (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
894                        a_tail == b_tail
895                    }
896                }
897            } else if cur_len <= 256 {
898                // 33-256 bytes: tight u64 loop with XOR-OR batching.
899                // Compares 32 bytes per iteration (4 u64 loads), reducing
900                // branch mispredictions vs individual comparisons.
901                unsafe {
902                    let ap = base.add(prev_start);
903                    let bp = base.add(cur_start);
904                    let mut off = 8usize; // first 8 bytes already compared via prefix
905                    let mut eq = true;
906                    while off + 32 <= cur_len {
907                        let a0 = (ap.add(off) as *const u64).read_unaligned();
908                        let b0 = (bp.add(off) as *const u64).read_unaligned();
909                        let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
910                        let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
911                        let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
912                        let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
913                        let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
914                        let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
915                        if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
916                            eq = false;
917                            break;
918                        }
919                        off += 32;
920                    }
921                    if eq {
922                        while off + 8 <= cur_len {
923                            let aw = (ap.add(off) as *const u64).read_unaligned();
924                            let bw = (bp.add(off) as *const u64).read_unaligned();
925                            if aw != bw {
926                                eq = false;
927                                break;
928                            }
929                            off += 8;
930                        }
931                    }
932                    if eq && off < cur_len {
933                        let a_tail = (ap.add(cur_len - 8) as *const u64).read_unaligned();
934                        let b_tail = (bp.add(cur_len - 8) as *const u64).read_unaligned();
935                        eq = a_tail == b_tail;
936                    }
937                    eq
938                }
939            } else {
940                // For longer lines (>256), use unsafe slice comparison
941                unsafe {
942                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
943                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
944                    a == b
945                }
946            }
947        } else {
948            // Short line < 8 bytes — direct byte comparison
949            unsafe {
950                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
951                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
952                a == b
953            }
954        };
955
956        if is_dup {
957            // Duplicate found — use doubling memcmp to skip entire run of identical lines.
958            // For 50K identical lines, this takes ~12µs vs ~250µs per-line comparison.
959            let pattern_len = prev_len + 1; // line content + terminator
960            if run_start < cur_start {
961                slices.push(io::IoSlice::new(&data[run_start..cur_start]));
962                if slices.len() >= BATCH {
963                    write_all_vectored(writer, &slices)?;
964                    slices.clear();
965                }
966            }
967            // Skip all identical copies using doubling memcmp
968            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
969            run_start = skip_end;
970            cur_start = skip_end;
971            // prev_start/prev_len/prev_prefix unchanged (still the group representative)
972            continue;
973        } else {
974            // Different line — update cached comparison state
975            prev_start = cur_start;
976            prev_len = cur_len;
977            prev_prefix = if cur_len >= 8 {
978                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
979            } else {
980                0
981            };
982            last_output_end = if cur_end < data_len {
983                cur_end + 1
984            } else {
985                cur_end
986            };
987        }
988
989        if cur_end < data_len {
990            cur_start = cur_end + 1;
991        } else {
992            break;
993        }
994    }
995
996    // Flush remaining run
997    if run_start < data_len {
998        slices.push(io::IoSlice::new(
999            &data[run_start..last_output_end.max(run_start)],
1000        ));
1001    }
1002
1003    // Ensure trailing terminator
1004    if data_len > 0 && unsafe { *base.add(data_len - 1) } != term {
1005        slices.push(io::IoSlice::new(&term_byte));
1006    }
1007
1008    if !slices.is_empty() {
1009        write_all_vectored(writer, &slices)?;
1010    }
1011
1012    Ok(())
1013}
1014
1015/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
1016/// positions in each chunk in parallel, then write output runs directly from
1017/// the original data. No per-chunk buffer allocation needed.
1018fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
1019    use rayon::prelude::*;
1020
1021    let num_threads = rayon::current_num_threads().max(1);
1022    let chunk_target = data.len() / num_threads;
1023
1024    // Find chunk boundaries aligned to line terminators
1025    let mut boundaries = Vec::with_capacity(num_threads + 1);
1026    boundaries.push(0usize);
1027    for i in 1..num_threads {
1028        let target = i * chunk_target;
1029        if target >= data.len() {
1030            break;
1031        }
1032        if let Some(p) = memchr::memchr(term, &data[target..]) {
1033            let b = target + p + 1;
1034            if b > *boundaries.last().unwrap() && b <= data.len() {
1035                boundaries.push(b);
1036            }
1037        }
1038    }
1039    boundaries.push(data.len());
1040
1041    let n_chunks = boundaries.len() - 1;
1042    if n_chunks <= 1 {
1043        return process_default_sequential(data, writer, term);
1044    }
1045
1046    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
1047    struct ChunkResult {
1048        /// Byte ranges in the original data to output (contiguous runs)
1049        runs: Vec<(usize, usize)>,
1050        /// First line in chunk (absolute offsets into data, content without term)
1051        first_line_start: usize,
1052        first_line_end: usize,
1053        /// Last *output* line in chunk (content without term)
1054        last_line_start: usize,
1055        last_line_end: usize,
1056    }
1057
1058    let results: Vec<ChunkResult> = boundaries
1059        .windows(2)
1060        .collect::<Vec<_>>()
1061        .par_iter()
1062        .map(|w| {
1063            let chunk_start = w[0];
1064            let chunk_end = w[1];
1065            let chunk = &data[chunk_start..chunk_end];
1066
1067            let first_term = match memchr::memchr(term, chunk) {
1068                Some(pos) => pos,
1069                None => {
1070                    return ChunkResult {
1071                        runs: vec![(chunk_start, chunk_end)],
1072                        first_line_start: chunk_start,
1073                        first_line_end: chunk_end,
1074                        last_line_start: chunk_start,
1075                        last_line_end: chunk_end,
1076                    };
1077                }
1078            };
1079
1080            let first_line_start = chunk_start;
1081            let first_line_end = chunk_start + first_term;
1082
1083            let mut runs: Vec<(usize, usize)> = Vec::new();
1084            let mut run_start = chunk_start;
1085            let mut prev_start = 0usize;
1086            let mut _prev_end = first_term;
1087            let mut last_out_start = chunk_start;
1088            let mut last_out_end = first_line_end;
1089
1090            let mut prev_len = first_term;
1091            let chunk_base = chunk.as_ptr();
1092            let chunk_len = chunk.len();
1093            // Cache previous line's prefix for fast rejection
1094            let mut prev_prefix: u64 = if prev_len >= 8 {
1095                unsafe { (chunk_base as *const u64).read_unaligned() }
1096            } else {
1097                0
1098            };
1099            let mut cur_start = first_term + 1;
1100            while cur_start < chunk_len {
1101                // Speculative line-end: check if next line has same length
1102                let cur_end = {
1103                    let spec = cur_start + prev_len;
1104                    if spec < chunk_len && unsafe { *chunk_base.add(spec) } == term {
1105                        spec
1106                    } else {
1107                        match memchr::memchr(term, unsafe {
1108                            std::slice::from_raw_parts(
1109                                chunk_base.add(cur_start),
1110                                chunk_len - cur_start,
1111                            )
1112                        }) {
1113                            Some(offset) => cur_start + offset,
1114                            None => chunk_len,
1115                        }
1116                    }
1117                };
1118
1119                let cur_len = cur_end - cur_start;
1120                // Fast reject: length + prefix + full comparison
1121                let is_dup = if cur_len != prev_len {
1122                    false
1123                } else if cur_len == 0 {
1124                    true
1125                } else if cur_len >= 8 {
1126                    let cur_prefix =
1127                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() };
1128                    if cur_prefix != prev_prefix {
1129                        false
1130                    } else if cur_len <= 8 {
1131                        true
1132                    } else {
1133                        unsafe {
1134                            let a =
1135                                std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
1136                            let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
1137                            lines_equal_after_prefix(a, b)
1138                        }
1139                    }
1140                } else {
1141                    unsafe {
1142                        let a = std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
1143                        let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
1144                        a == b
1145                    }
1146                };
1147
1148                if is_dup {
1149                    // Duplicate — use doubling memcmp to skip entire run
1150                    let pattern_len = prev_len + 1;
1151                    let abs_cur = chunk_start + cur_start;
1152                    if run_start < abs_cur {
1153                        runs.push((run_start, abs_cur));
1154                    }
1155                    let skip_end = skip_dup_run(chunk, cur_start, prev_start, pattern_len);
1156                    run_start = chunk_start + skip_end;
1157                    cur_start = skip_end;
1158                    // prev_start/prev_len/prev_prefix unchanged
1159                    continue;
1160                } else {
1161                    last_out_start = chunk_start + cur_start;
1162                    last_out_end = chunk_start + cur_end;
1163                    prev_len = cur_len;
1164                    prev_prefix = if cur_len >= 8 {
1165                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() }
1166                    } else {
1167                        0
1168                    };
1169                }
1170                prev_start = cur_start;
1171                _prev_end = cur_end;
1172
1173                if cur_end < chunk_len {
1174                    cur_start = cur_end + 1;
1175                } else {
1176                    break;
1177                }
1178            }
1179
1180            // Close final run
1181            if run_start < chunk_end {
1182                runs.push((run_start, chunk_end));
1183            }
1184
1185            ChunkResult {
1186                runs,
1187                first_line_start,
1188                first_line_end,
1189                last_line_start: last_out_start,
1190                last_line_end: last_out_end,
1191            }
1192        })
1193        .collect();
1194
1195    // Write results, adjusting cross-chunk boundaries.
1196    // Batch output runs via write_vectored to reduce syscall count.
1197    const BATCH: usize = 256;
1198    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
1199    for (i, result) in results.iter().enumerate() {
1200        let skip_first = if i > 0 {
1201            let prev = &results[i - 1];
1202            let prev_last = &data[prev.last_line_start..prev.last_line_end];
1203            let cur_first = &data[result.first_line_start..result.first_line_end];
1204            lines_equal_fast(prev_last, cur_first)
1205        } else {
1206            false
1207        };
1208
1209        let skip_end = if skip_first {
1210            // Skip bytes up to and including the first line's terminator
1211            result.first_line_end + 1
1212        } else {
1213            0
1214        };
1215
1216        for &(rs, re) in &result.runs {
1217            let actual_start = rs.max(skip_end);
1218            if actual_start < re {
1219                slices.push(io::IoSlice::new(&data[actual_start..re]));
1220                if slices.len() >= BATCH {
1221                    write_all_vectored(writer, &slices)?;
1222                    slices.clear();
1223                }
1224            }
1225        }
1226    }
1227    if !slices.is_empty() {
1228        write_all_vectored(writer, &slices)?;
1229    }
1230
1231    // Ensure trailing terminator
1232    if !data.is_empty() && *data.last().unwrap() != term {
1233        writer.write_all(&[term])?;
1234    }
1235
1236    Ok(())
1237}
1238
1239/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
1240/// Zero-copy: writes directly from mmap data through BufWriter.
1241/// Uses speculative line-end detection and 8-byte prefix caching for fast
1242/// duplicate detection without full memcmp.
1243fn process_filter_fast_singlepass(
1244    data: &[u8],
1245    writer: &mut impl Write,
1246    config: &UniqConfig,
1247    term: u8,
1248) -> io::Result<()> {
1249    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
1250    let data_len = data.len();
1251    let base = data.as_ptr();
1252
1253    let first_term = match memchr::memchr(term, data) {
1254        Some(pos) => pos,
1255        None => {
1256            // Single line: unique (count=1)
1257            if !repeated {
1258                writer.write_all(data)?;
1259                writer.write_all(&[term])?;
1260            }
1261            return Ok(());
1262        }
1263    };
1264
1265    let mut prev_start: usize = 0;
1266    let mut prev_end: usize = first_term;
1267    let mut prev_len = prev_end;
1268    let mut prev_prefix: u64 = if prev_len >= 8 {
1269        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
1270    } else {
1271        0
1272    };
1273    let mut count: u64 = 1;
1274    let mut cur_start = first_term + 1;
1275
1276    // Batch output using IoSlice write_vectored to reduce syscall overhead.
1277    // Each output line needs 2 slices: content + terminator.
1278    const BATCH: usize = 512;
1279    let term_slice: [u8; 1] = [term];
1280    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);
1281
1282    while cur_start < data_len {
1283        // Speculative line-end detection
1284        let cur_end = {
1285            let speculative = cur_start + prev_len;
1286            if speculative < data_len && unsafe { *base.add(speculative) } == term {
1287                speculative
1288            } else {
1289                match memchr::memchr(term, unsafe {
1290                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
1291                }) {
1292                    Some(offset) => cur_start + offset,
1293                    None => data_len,
1294                }
1295            }
1296        };
1297
1298        let cur_len = cur_end - cur_start;
1299
1300        // Fast reject using length + 8-byte prefix.
1301        // After prefix match, use lines_equal_after_prefix which skips
1302        // the already-checked length/prefix/empty checks.
1303        let is_dup = if cur_len != prev_len {
1304            false
1305        } else if cur_len == 0 {
1306            true
1307        } else if cur_len >= 8 {
1308            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
1309            if cur_prefix != prev_prefix {
1310                false
1311            } else if cur_len <= 8 {
1312                true
1313            } else {
1314                unsafe {
1315                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
1316                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
1317                    lines_equal_after_prefix(a, b)
1318                }
1319            }
1320        } else {
1321            unsafe {
1322                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
1323                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
1324                a == b
1325            }
1326        };
1327
1328        if is_dup {
1329            // Use doubling memcmp to skip entire duplicate run
1330            let pattern_len = prev_len + 1;
1331            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
1332            let skipped = (skip_end - cur_start) / pattern_len;
1333            count += skipped as u64;
1334            cur_start = skip_end;
1335            continue;
1336        } else {
1337            let should_print = if repeated { count > 1 } else { count == 1 };
1338            if should_print {
1339                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
1340                slices.push(io::IoSlice::new(&term_slice));
1341                if slices.len() >= BATCH * 2 {
1342                    write_all_vectored(writer, &slices)?;
1343                    slices.clear();
1344                }
1345            }
1346            prev_start = cur_start;
1347            prev_end = cur_end;
1348            prev_len = cur_len;
1349            prev_prefix = if cur_len >= 8 {
1350                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
1351            } else {
1352                0
1353            };
1354            count = 1;
1355        }
1356
1357        if cur_end < data_len {
1358            cur_start = cur_end + 1;
1359        } else {
1360            break;
1361        }
1362    }
1363
1364    // Output last group
1365    let should_print = if repeated { count > 1 } else { count == 1 };
1366    if should_print {
1367        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
1368        slices.push(io::IoSlice::new(&term_slice));
1369    }
1370    if !slices.is_empty() {
1371        write_all_vectored(writer, &slices)?;
1372    }
1373
1374    Ok(())
1375}
1376
1377/// Fast single-pass for count mode (-c) with all standard output modes.
1378/// Zero line_starts allocation: scans with memchr, counts groups inline,
1379/// and writes count-prefixed lines directly.
1380/// Uses cached length comparison for fast duplicate rejection.
1381/// Uses raw pointer arithmetic to avoid bounds checking.
1382///
1383/// Zero-copy output: uses writev (IoSlice) to write count prefixes from a
1384/// small arena + line content directly from mmap'd data + terminator bytes.
1385/// This avoids copying line content into an intermediate buffer entirely.
1386///
1387/// Optimizations:
1388/// - Speculative line-end detection: if all lines have the same length (common
1389///   in repetitive data), we can skip the memchr SIMD scan entirely by checking
1390///   if data[cur_start + prev_len] is the terminator.
1391/// - Cached 8-byte prefix rejection: avoids full comparison for most non-equal lines.
1392/// - IoSlice writev batching: eliminates memcpy of line content.
1393fn process_count_fast_singlepass(
1394    data: &[u8],
1395    writer: &mut impl Write,
1396    config: &UniqConfig,
1397    term: u8,
1398) -> io::Result<()> {
1399    let data_len = data.len();
1400    let base = data.as_ptr();
1401    let first_term = match memchr::memchr(term, data) {
1402        Some(pos) => pos,
1403        None => {
1404            // Single line: count=1
1405            let should_print = match config.mode {
1406                OutputMode::Default => true,
1407                OutputMode::RepeatedOnly => false,
1408                OutputMode::UniqueOnly => true,
1409                _ => true,
1410            };
1411            if should_print {
1412                write_count_line(writer, 1, data, term)?;
1413            }
1414            return Ok(());
1415        }
1416    };
1417
1418    let mut prev_start: usize = 0;
1419    let mut prev_end: usize = first_term;
1420    let mut prev_len = prev_end;
1421    let mut prev_prefix: u64 = if prev_len >= 8 {
1422        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
1423    } else {
1424        0
1425    };
1426    let mut count: u64 = 1;
1427    let mut cur_start = first_term + 1;
1428
1429    // Zero-copy writev batching: accumulate groups as (prefix_offset, prefix_len,
1430    // line_start, line_end) tuples, with prefixes stored in a flat byte buffer.
1431    // Build IoSlice arrays at flush time to avoid borrow conflicts.
1432    // Line content points directly into mmap'd data — zero copy.
1433    const BATCH: usize = 340;
1434    const PREFIX_SLOT: usize = 28; // max prefix size per group
1435    let term_slice: [u8; 1] = [term];
1436    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
1437    // Each group: (prefix_len, line_start_in_data, line_end_in_data)
1438    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);
1439
1440    while cur_start < data_len {
1441        let cur_end = {
1442            let speculative = cur_start + prev_len;
1443            if speculative < data_len && unsafe { *base.add(speculative) } == term {
1444                speculative
1445            } else {
1446                match memchr::memchr(term, unsafe {
1447                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
1448                }) {
1449                    Some(offset) => cur_start + offset,
1450                    None => data_len,
1451                }
1452            }
1453        };
1454
1455        let cur_len = cur_end - cur_start;
1456
1457        let is_dup = if cur_len != prev_len {
1458            false
1459        } else if cur_len == 0 {
1460            true
1461        } else if cur_len >= 8 {
1462            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
1463            if cur_prefix != prev_prefix {
1464                false
1465            } else if cur_len <= 8 {
1466                true
1467            } else {
1468                unsafe {
1469                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
1470                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
1471                    lines_equal_after_prefix(a, b)
1472                }
1473            }
1474        } else {
1475            unsafe {
1476                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
1477                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
1478                a == b
1479            }
1480        };
1481
1482        if is_dup {
1483            // Use doubling memcmp to skip entire duplicate run
1484            let pattern_len = prev_len + 1;
1485            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
1486            let skipped = (skip_end - cur_start) / pattern_len;
1487            count += skipped as u64;
1488            cur_start = skip_end;
1489            continue;
1490        } else {
1491            let should_print = match config.mode {
1492                OutputMode::RepeatedOnly => count > 1,
1493                OutputMode::UniqueOnly => count == 1,
1494                _ => true,
1495            };
1496            if should_print {
1497                let idx = groups.len();
1498                let prefix_off = idx * PREFIX_SLOT;
1499                let prefix_len = format_count_prefix_into(
1500                    count,
1501                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
1502                );
1503                groups.push((prefix_len, prev_start, prev_end));
1504
1505                if groups.len() >= BATCH {
1506                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
1507                    groups.clear();
1508                    // Re-fill prefix_buf with spaces for next batch
1509                    prefix_buf.fill(b' ');
1510                }
1511            }
1512            prev_start = cur_start;
1513            prev_end = cur_end;
1514            prev_len = cur_len;
1515            prev_prefix = if cur_len >= 8 {
1516                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
1517            } else {
1518                0
1519            };
1520            count = 1;
1521        }
1522
1523        if cur_end < data_len {
1524            cur_start = cur_end + 1;
1525        } else {
1526            break;
1527        }
1528    }
1529
1530    // Output last group
1531    let should_print = match config.mode {
1532        OutputMode::RepeatedOnly => count > 1,
1533        OutputMode::UniqueOnly => count == 1,
1534        _ => true,
1535    };
1536    if should_print {
1537        let idx = groups.len();
1538        let prefix_off = idx * PREFIX_SLOT;
1539        let prefix_len =
1540            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
1541        groups.push((prefix_len, prev_start, prev_end));
1542    }
1543    if !groups.is_empty() {
1544        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
1545    }
1546
1547    Ok(())
1548}
1549
1550/// Flush batched count groups using write_vectored (writev).
1551/// Builds IoSlice arrays from the prefix buffer and mmap'd data.
1552#[inline]
1553fn flush_count_groups(
1554    writer: &mut impl Write,
1555    prefix_buf: &[u8],
1556    groups: &[(usize, usize, usize)],
1557    term_slice: &[u8; 1],
1558    data: &[u8],
1559) -> io::Result<()> {
1560    const PREFIX_SLOT: usize = 28;
1561    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(groups.len() * 3);
1562    for (i, &(prefix_len, line_start, line_end)) in groups.iter().enumerate() {
1563        let prefix_off = i * PREFIX_SLOT;
1564        slices.push(io::IoSlice::new(
1565            &prefix_buf[prefix_off..prefix_off + prefix_len],
1566        ));
1567        slices.push(io::IoSlice::new(&data[line_start..line_end]));
1568        slices.push(io::IoSlice::new(term_slice));
1569    }
1570    write_all_vectored(writer, &slices)
1571}
1572
/// Format a count prefix into a buffer slot, returning the prefix length.
/// GNU format: "%7lu " — right-aligned count in 7-char field, followed by space.
/// Buffer must be pre-filled with spaces and at least 28 bytes.
#[inline(always)]
fn format_count_prefix_into(count: u64, buf: &mut [u8]) -> usize {
    // Fast path: a single digit lands at column 6, the right edge of the
    // 7-char field; columns 0-5 are already spaces per the caller's contract.
    if count <= 9 {
        buf[6] = b'0' + count as u8;
        buf[7] = b' ';
        return 8;
    }
    // Use itoa on a temp array, then copy
    // NOTE(review): assumes itoa_right_aligned_into writes the digits
    // right-aligned so that counts of up to 7 digits end at index 6, and
    // returns the digit count; wider counts then widen the field just like
    // printf "%7lu" would — confirm against its definition.
    let mut tmp = [b' '; 28];
    let digits = itoa_right_aligned_into(&mut tmp, count);
    // Field is at least 7 wide; `tmp[width]` is the trailing space of "%7lu ".
    let width = digits.max(7);
    tmp[width] = b' ';
    let len = width + 1;
    buf[..len].copy_from_slice(&tmp[..len]);
    len
}
1592
1593/// Fast single-pass for case-insensitive (-i) default mode.
1594/// Uses u64 SWAR prefix caching, IoSlice batching, and speculative line-end detection.
1595fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
1596    let data_len = data.len();
1597    let base = data.as_ptr();
1598
1599    let first_end = match memchr::memchr(term, data) {
1600        Some(pos) => pos,
1601        None => {
1602            writer.write_all(data)?;
1603            return writer.write_all(&[term]);
1604        }
1605    };
1606
1607    let mut prev_start: usize = 0;
1608    let mut prev_len = first_end;
1609    // Cache case-folded 8-byte prefix (clear bit 5 → uppercase) for fast rejection
1610    let mut prev_prefix_upper: u64 = if prev_len >= 8 {
1611        unsafe { (base.add(prev_start) as *const u64).read_unaligned() & 0xDFDFDFDFDFDFDFDFu64 }
1612    } else {
1613        0
1614    };
1615
1616    // Run-tracking with IoSlice batching (mirrors process_default_sequential)
1617    const BATCH: usize = 256;
1618    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
1619    let mut run_start: usize = 0;
1620    let mut cur_start = first_end + 1;
1621
1622    while cur_start < data_len {
1623        // Speculative line-end detection
1624        let cur_end = {
1625            let speculative = cur_start + prev_len;
1626            if speculative < data_len && unsafe { *base.add(speculative) } == term {
1627                speculative
1628            } else {
1629                match memchr::memchr(term, unsafe {
1630                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
1631                }) {
1632                    Some(offset) => cur_start + offset,
1633                    None => data_len,
1634                }
1635            }
1636        };
1637
1638        let cur_len = cur_end - cur_start;
1639
1640        // Fast multi-level rejection: length → 8-byte prefix (upper-cased) → full CI compare
1641        let is_dup = if cur_len != prev_len {
1642            false
1643        } else if cur_len == 0 {
1644            true
1645        } else if cur_len >= 8 {
1646            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
1647            let cur_prefix_upper = cur_prefix & 0xDFDFDFDFDFDFDFDFu64;
1648            if cur_prefix_upper != prev_prefix_upper {
1649                false
1650            } else {
1651                unsafe {
1652                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
1653                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
1654                    lines_equal_case_insensitive(a, b)
1655                }
1656            }
1657        } else {
1658            unsafe {
1659                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
1660                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
1661                lines_equal_case_insensitive(a, b)
1662            }
1663        };
1664
1665        if is_dup {
1666            // Duplicate — save current run, skip the duplicate line
1667            if run_start < cur_start {
1668                slices.push(io::IoSlice::new(&data[run_start..cur_start]));
1669                if slices.len() >= BATCH {
1670                    write_all_vectored(writer, &slices)?;
1671                    slices.clear();
1672                }
1673            }
1674            run_start = if cur_end < data_len {
1675                cur_end + 1
1676            } else {
1677                cur_end
1678            };
1679        } else {
1680            prev_start = cur_start;
1681            prev_len = cur_len;
1682            prev_prefix_upper = if cur_len >= 8 {
1683                unsafe {
1684                    (base.add(cur_start) as *const u64).read_unaligned() & 0xDFDFDFDFDFDFDFDFu64
1685                }
1686            } else {
1687                0
1688            };
1689        }
1690
1691        if cur_end < data_len {
1692            cur_start = cur_end + 1;
1693        } else {
1694            break;
1695        }
1696    }
1697
1698    // Flush remaining run
1699    if run_start < data_len {
1700        slices.push(io::IoSlice::new(&data[run_start..data_len]));
1701    }
1702    // Ensure trailing terminator
1703    if !data.is_empty() && data[data_len - 1] != term {
1704        let term_byte: [u8; 1] = [term];
1705        slices.push(io::IoSlice::new(&term_byte));
1706        write_all_vectored(writer, &slices)?;
1707    } else if !slices.is_empty() {
1708        write_all_vectored(writer, &slices)?;
1709    }
1710
1711    Ok(())
1712}
1713
1714/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
1715/// Zero-copy: writes directly from mmap data through BufWriter.
1716/// Uses speculative line-end detection and length-based early rejection.
1717fn process_filter_ci_singlepass(
1718    data: &[u8],
1719    writer: &mut impl Write,
1720    config: &UniqConfig,
1721    term: u8,
1722) -> io::Result<()> {
1723    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
1724    let data_len = data.len();
1725    let base = data.as_ptr();
1726
1727    let first_term = match memchr::memchr(term, data) {
1728        Some(pos) => pos,
1729        None => {
1730            if !repeated {
1731                writer.write_all(data)?;
1732                writer.write_all(&[term])?;
1733            }
1734            return Ok(());
1735        }
1736    };
1737
1738    let mut prev_start: usize = 0;
1739    let mut prev_end: usize = first_term;
1740    let mut prev_len = prev_end;
1741    let mut count: u64 = 1;
1742    let mut cur_start = first_term + 1;
1743
1744    // Batch output using IoSlice write_vectored
1745    const BATCH: usize = 512;
1746    let term_slice: [u8; 1] = [term];
1747    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);
1748
1749    while cur_start < data_len {
1750        // Speculative line-end detection
1751        let cur_end = {
1752            let speculative = cur_start + prev_len;
1753            if speculative < data_len && unsafe { *base.add(speculative) } == term {
1754                speculative
1755            } else {
1756                match memchr::memchr(term, unsafe {
1757                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
1758                }) {
1759                    Some(offset) => cur_start + offset,
1760                    None => data_len,
1761                }
1762            }
1763        };
1764
1765        let cur_len = cur_end - cur_start;
1766        // Length check + case-insensitive comparison
1767        let is_dup = cur_len == prev_len
1768            && lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]);
1769
1770        if is_dup {
1771            count += 1;
1772        } else {
1773            let should_print = if repeated { count > 1 } else { count == 1 };
1774            if should_print {
1775                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
1776                slices.push(io::IoSlice::new(&term_slice));
1777                if slices.len() >= BATCH * 2 {
1778                    write_all_vectored(writer, &slices)?;
1779                    slices.clear();
1780                }
1781            }
1782            prev_start = cur_start;
1783            prev_end = cur_end;
1784            prev_len = cur_len;
1785            count = 1;
1786        }
1787
1788        if cur_end < data_len {
1789            cur_start = cur_end + 1;
1790        } else {
1791            break;
1792        }
1793    }
1794
1795    let should_print = if repeated { count > 1 } else { count == 1 };
1796    if should_print {
1797        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
1798        slices.push(io::IoSlice::new(&term_slice));
1799    }
1800    if !slices.is_empty() {
1801        write_all_vectored(writer, &slices)?;
1802    }
1803
1804    Ok(())
1805}
1806
1807/// Fast single-pass for case-insensitive (-i) count (-c) mode.
1808/// Writes directly to BufWriter — no batch_buf allocation needed.
1809fn process_count_ci_singlepass(
1810    data: &[u8],
1811    writer: &mut impl Write,
1812    config: &UniqConfig,
1813    term: u8,
1814) -> io::Result<()> {
1815    let first_term = match memchr::memchr(term, data) {
1816        Some(pos) => pos,
1817        None => {
1818            let should_print = match config.mode {
1819                OutputMode::Default => true,
1820                OutputMode::RepeatedOnly => false,
1821                OutputMode::UniqueOnly => true,
1822                _ => true,
1823            };
1824            if should_print {
1825                write_count_line(writer, 1, data, term)?;
1826            }
1827            return Ok(());
1828        }
1829    };
1830
1831    let is_default = matches!(config.mode, OutputMode::Default);
1832
1833    let mut prev_start: usize = 0;
1834    let mut prev_end: usize = first_term;
1835    let mut count: u64 = 1;
1836    let mut cur_start = first_term + 1;
1837
1838    // Zero-copy writev batching: same approach as process_count_fast_singlepass
1839    const BATCH: usize = 340;
1840    const PREFIX_SLOT: usize = 28;
1841    let term_slice: [u8; 1] = [term];
1842    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
1843    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);
1844
1845    let base = data.as_ptr();
1846    let data_len = data.len();
1847    let mut prev_len = prev_end - prev_start;
1848
1849    while cur_start < data_len {
1850        // Speculative line-end detection
1851        let cur_end = {
1852            let speculative = cur_start + prev_len;
1853            if speculative < data_len && unsafe { *base.add(speculative) } == term {
1854                speculative
1855            } else {
1856                match memchr::memchr(term, unsafe {
1857                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
1858                }) {
1859                    Some(offset) => cur_start + offset,
1860                    None => data_len,
1861                }
1862            }
1863        };
1864
1865        let cur_len = cur_end - cur_start;
1866        // Length-based early rejection before expensive case-insensitive compare
1867        let is_dup = cur_len == prev_len
1868            && data[prev_start..prev_end].eq_ignore_ascii_case(&data[cur_start..cur_end]);
1869
1870        if is_dup {
1871            count += 1;
1872        } else {
1873            let should_print = if is_default {
1874                true
1875            } else {
1876                match config.mode {
1877                    OutputMode::RepeatedOnly => count > 1,
1878                    OutputMode::UniqueOnly => count == 1,
1879                    _ => true,
1880                }
1881            };
1882            if should_print {
1883                let idx = groups.len();
1884                let prefix_off = idx * PREFIX_SLOT;
1885                let prefix_len = format_count_prefix_into(
1886                    count,
1887                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
1888                );
1889                groups.push((prefix_len, prev_start, prev_end));
1890
1891                if groups.len() >= BATCH {
1892                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
1893                    groups.clear();
1894                    prefix_buf.fill(b' ');
1895                }
1896            }
1897            prev_start = cur_start;
1898            prev_end = cur_end;
1899            prev_len = cur_len;
1900            count = 1;
1901        }
1902
1903        if cur_end < data_len {
1904            cur_start = cur_end + 1;
1905        } else {
1906            break;
1907        }
1908    }
1909
1910    let should_print = if is_default {
1911        true
1912    } else {
1913        match config.mode {
1914            OutputMode::RepeatedOnly => count > 1,
1915            OutputMode::UniqueOnly => count == 1,
1916            _ => true,
1917        }
1918    };
1919    if should_print {
1920        let idx = groups.len();
1921        let prefix_off = idx * PREFIX_SLOT;
1922        let prefix_len =
1923            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
1924        groups.push((prefix_len, prev_start, prev_end));
1925    }
1926    if !groups.is_empty() {
1927        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
1928    }
1929
1930    Ok(())
1931}
1932
1933/// Output a group for standard modes (bytes path).
1934#[inline(always)]
1935fn output_group_bytes(
1936    writer: &mut impl Write,
1937    content: &[u8],
1938    full: &[u8],
1939    count: u64,
1940    config: &UniqConfig,
1941    term: u8,
1942) -> io::Result<()> {
1943    let should_print = match config.mode {
1944        OutputMode::Default => true,
1945        OutputMode::RepeatedOnly => count > 1,
1946        OutputMode::UniqueOnly => count == 1,
1947        _ => true,
1948    };
1949
1950    if should_print {
1951        if config.count {
1952            write_count_line(writer, count, content, term)?;
1953        } else {
1954            writer.write_all(full)?;
1955            // Add terminator if the original line didn't have one
1956            if full.len() == content.len() {
1957                writer.write_all(&[term])?;
1958            }
1959        }
1960    }
1961
1962    Ok(())
1963}
1964
1965/// Process --all-repeated / -D mode on byte slices.
1966fn process_all_repeated_bytes(
1967    data: &[u8],
1968    writer: &mut impl Write,
1969    config: &UniqConfig,
1970    method: AllRepeatedMethod,
1971    term: u8,
1972) -> io::Result<()> {
1973    let mut lines = LineIter::new(data, term);
1974
1975    let first = match lines.next() {
1976        Some(v) => v,
1977        None => return Ok(()),
1978    };
1979
1980    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
1981    // For all-repeated we need to buffer group lines since we only print if count > 1
1982    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
1983    group_lines.push(first);
1984    let mut first_group_printed = false;
1985
1986    let fast = !needs_key_extraction(config) && !config.ignore_case;
1987
1988    for (cur_content, cur_full) in lines {
1989        let prev_content = group_lines.last().unwrap().0;
1990        let equal = if fast {
1991            lines_equal_fast(prev_content, cur_content)
1992        } else {
1993            lines_equal(prev_content, cur_content, config)
1994        };
1995
1996        if equal {
1997            group_lines.push((cur_content, cur_full));
1998        } else {
1999            // Flush group
2000            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
2001            group_lines.clear();
2002            group_lines.push((cur_content, cur_full));
2003        }
2004    }
2005
2006    // Flush last group
2007    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
2008
2009    Ok(())
2010}
2011
2012/// Flush a group for --all-repeated mode (bytes path).
2013fn flush_all_repeated_bytes(
2014    writer: &mut impl Write,
2015    group: &[(&[u8], &[u8])],
2016    method: AllRepeatedMethod,
2017    first_group_printed: &mut bool,
2018    term: u8,
2019) -> io::Result<()> {
2020    if group.len() <= 1 {
2021        return Ok(()); // Not a duplicate group
2022    }
2023
2024    match method {
2025        AllRepeatedMethod::Prepend => {
2026            writer.write_all(&[term])?;
2027        }
2028        AllRepeatedMethod::Separate => {
2029            if *first_group_printed {
2030                writer.write_all(&[term])?;
2031            }
2032        }
2033        AllRepeatedMethod::None => {}
2034    }
2035
2036    for &(content, full) in group {
2037        writer.write_all(full)?;
2038        if full.len() == content.len() {
2039            writer.write_all(&[term])?;
2040        }
2041    }
2042
2043    *first_group_printed = true;
2044    Ok(())
2045}
2046
2047/// Process --group mode on byte slices.
2048fn process_group_bytes(
2049    data: &[u8],
2050    writer: &mut impl Write,
2051    config: &UniqConfig,
2052    method: GroupMethod,
2053    term: u8,
2054) -> io::Result<()> {
2055    let mut lines = LineIter::new(data, term);
2056
2057    let (prev_content, prev_full) = match lines.next() {
2058        Some(v) => v,
2059        None => return Ok(()),
2060    };
2061
2062    // Prepend/Both: separator before first group
2063    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
2064        writer.write_all(&[term])?;
2065    }
2066
2067    // Write first line
2068    writer.write_all(prev_full)?;
2069    if prev_full.len() == prev_content.len() {
2070        writer.write_all(&[term])?;
2071    }
2072
2073    let mut prev_content = prev_content;
2074    let fast = !needs_key_extraction(config) && !config.ignore_case;
2075
2076    for (cur_content, cur_full) in lines {
2077        let equal = if fast {
2078            lines_equal_fast(prev_content, cur_content)
2079        } else {
2080            lines_equal(prev_content, cur_content, config)
2081        };
2082
2083        if !equal {
2084            // New group — write separator
2085            writer.write_all(&[term])?;
2086        }
2087
2088        writer.write_all(cur_full)?;
2089        if cur_full.len() == cur_content.len() {
2090            writer.write_all(&[term])?;
2091        }
2092
2093        prev_content = cur_content;
2094    }
2095
2096    // Append/Both: separator after last group
2097    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
2098        writer.write_all(&[term])?;
2099    }
2100
2101    Ok(())
2102}
2103
2104// ============================================================================
2105// Streaming processing (for stdin / pipe input)
2106// ============================================================================
2107
2108/// Main streaming uniq processor.
2109/// Reads from `input`, writes to `output`.
2110pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
2111    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
2112    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
2113    let term = if config.zero_terminated { b'\0' } else { b'\n' };
2114
2115    match config.mode {
2116        OutputMode::Group(method) => {
2117            process_group_stream(reader, &mut writer, config, method, term)?;
2118        }
2119        OutputMode::AllRepeated(method) => {
2120            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
2121        }
2122        _ => {
2123            process_standard_stream(reader, &mut writer, config, term)?;
2124        }
2125    }
2126
2127    writer.flush()?;
2128    Ok(())
2129}
2130
2131/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
2132fn process_standard_stream<R: BufRead, W: Write>(
2133    mut reader: R,
2134    writer: &mut W,
2135    config: &UniqConfig,
2136    term: u8,
2137) -> io::Result<()> {
2138    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
2139    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
2140
2141    // Read first line
2142    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
2143        return Ok(()); // empty input
2144    }
2145    let mut count: u64 = 1;
2146
2147    loop {
2148        current_line.clear();
2149        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
2150
2151        if bytes_read == 0 {
2152            // End of input — output the last group
2153            output_group_stream(writer, &prev_line, count, config, term)?;
2154            break;
2155        }
2156
2157        if compare_lines_stream(&prev_line, &current_line, config, term) {
2158            count += 1;
2159        } else {
2160            output_group_stream(writer, &prev_line, count, config, term)?;
2161            std::mem::swap(&mut prev_line, &mut current_line);
2162            count = 1;
2163        }
2164    }
2165
2166    Ok(())
2167}
2168
2169/// Compare two lines (with terminators) in streaming mode.
2170#[inline(always)]
2171fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
2172    let a_stripped = strip_term(a, term);
2173    let b_stripped = strip_term(b, term);
2174    lines_equal(a_stripped, b_stripped, config)
2175}
2176
/// Strip terminator from end of line.
/// Returns the input unchanged when it does not end with `term`.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    line.strip_suffix(&[term]).unwrap_or(line)
}
2186
2187/// Output a group in streaming mode.
2188#[inline(always)]
2189fn output_group_stream(
2190    writer: &mut impl Write,
2191    line: &[u8],
2192    count: u64,
2193    config: &UniqConfig,
2194    term: u8,
2195) -> io::Result<()> {
2196    let should_print = match config.mode {
2197        OutputMode::Default => true,
2198        OutputMode::RepeatedOnly => count > 1,
2199        OutputMode::UniqueOnly => count == 1,
2200        _ => true,
2201    };
2202
2203    if should_print {
2204        let content = strip_term(line, term);
2205        if config.count {
2206            write_count_line(writer, count, content, term)?;
2207        } else {
2208            writer.write_all(content)?;
2209            writer.write_all(&[term])?;
2210        }
2211    }
2212
2213    Ok(())
2214}
2215
2216/// Process --all-repeated / -D mode (streaming).
2217fn process_all_repeated_stream<R: BufRead, W: Write>(
2218    mut reader: R,
2219    writer: &mut W,
2220    config: &UniqConfig,
2221    method: AllRepeatedMethod,
2222    term: u8,
2223) -> io::Result<()> {
2224    let mut group: Vec<Vec<u8>> = Vec::new();
2225    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
2226    let mut first_group_printed = false;
2227
2228    current_line.clear();
2229    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
2230        return Ok(());
2231    }
2232    group.push(current_line.clone());
2233
2234    loop {
2235        current_line.clear();
2236        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
2237
2238        if bytes_read == 0 {
2239            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
2240            break;
2241        }
2242
2243        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
2244            group.push(current_line.clone());
2245        } else {
2246            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
2247            group.clear();
2248            group.push(current_line.clone());
2249        }
2250    }
2251
2252    Ok(())
2253}
2254
2255/// Flush a group for --all-repeated mode (streaming).
2256fn flush_all_repeated_stream(
2257    writer: &mut impl Write,
2258    group: &[Vec<u8>],
2259    method: AllRepeatedMethod,
2260    first_group_printed: &mut bool,
2261    term: u8,
2262) -> io::Result<()> {
2263    if group.len() <= 1 {
2264        return Ok(());
2265    }
2266
2267    match method {
2268        AllRepeatedMethod::Prepend => {
2269            writer.write_all(&[term])?;
2270        }
2271        AllRepeatedMethod::Separate => {
2272            if *first_group_printed {
2273                writer.write_all(&[term])?;
2274            }
2275        }
2276        AllRepeatedMethod::None => {}
2277    }
2278
2279    for line in group {
2280        let content = strip_term(line, term);
2281        writer.write_all(content)?;
2282        writer.write_all(&[term])?;
2283    }
2284
2285    *first_group_printed = true;
2286    Ok(())
2287}
2288
2289/// Process --group mode (streaming).
2290fn process_group_stream<R: BufRead, W: Write>(
2291    mut reader: R,
2292    writer: &mut W,
2293    config: &UniqConfig,
2294    method: GroupMethod,
2295    term: u8,
2296) -> io::Result<()> {
2297    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
2298    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
2299
2300    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
2301        return Ok(());
2302    }
2303
2304    // Prepend/Both: separator before first group
2305    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
2306        writer.write_all(&[term])?;
2307    }
2308
2309    let content = strip_term(&prev_line, term);
2310    writer.write_all(content)?;
2311    writer.write_all(&[term])?;
2312
2313    loop {
2314        current_line.clear();
2315        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
2316
2317        if bytes_read == 0 {
2318            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
2319                writer.write_all(&[term])?;
2320            }
2321            break;
2322        }
2323
2324        if !compare_lines_stream(&prev_line, &current_line, config, term) {
2325            writer.write_all(&[term])?;
2326        }
2327
2328        let content = strip_term(&current_line, term);
2329        writer.write_all(content)?;
2330        writer.write_all(&[term])?;
2331
2332        std::mem::swap(&mut prev_line, &mut current_line);
2333    }
2334
2335    Ok(())
2336}
2337
/// Read a line terminated by the given byte (newline or NUL).
/// Returns number of bytes read (0 = EOF).
/// The terminator, when present, is appended to `buf` along with the data.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    // read_until appends up to and including `term`; a final unterminated
    // line still yields its byte count, so only true EOF returns 0.
    let bytes = reader.read_until(term, buf)?;
    Ok(bytes)
}