coreutils_rs/uniq/core.rs

use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

/// Write a large contiguous buffer, retrying on partial writes.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    writer.write_all(buf)
}

/// Write all IoSlices to the writer, handling partial writes correctly.
fn write_all_vectored(writer: &mut impl Write, slices: &[io::IoSlice<'_>]) -> io::Result<()> {
    let n = writer.write_vectored(slices)?;
    let expected: usize = slices.iter().map(|s| s.len()).sum();
    if n >= expected {
        return Ok(());
    }
    if n == 0 && expected > 0 {
        return Err(io::Error::new(
            io::ErrorKind::WriteZero,
            "write_vectored returned 0",
        ));
    }
    // Slow path: partial write — fall back to write_all per remaining slice.
    let mut consumed = n;
    for slice in slices {
        if consumed == 0 {
            writer.write_all(slice)?;
        } else if consumed >= slice.len() {
            consumed -= slice.len();
        } else {
            writer.write_all(&slice[consumed..])?;
            consumed = 0;
        }
    }
    Ok(())
}
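// A minimal test sketch for the partial-write fallback above (assumes the
// usual `cargo test` harness; the `Trickle` writer is purely illustrative):
// a writer that accepts only a few bytes per call must still see every byte
// of every slice exactly once.
#[cfg(test)]
mod write_vectored_tests {
    use super::*;

    /// Writer that accepts at most `limit` bytes per call, forcing
    /// `write_all_vectored` through its partial-write recovery path.
    struct Trickle {
        out: Vec<u8>,
        limit: usize,
    }

    impl Write for Trickle {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let n = buf.len().min(self.limit);
            self.out.extend_from_slice(&buf[..n]);
            Ok(n)
        }
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn partial_writes_are_completed() {
        let mut w = Trickle { out: Vec::new(), limit: 3 };
        let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
        write_all_vectored(&mut w, &bufs).unwrap();
        assert_eq!(w.out, b"hello world");
    }
}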

/// How to delimit groups when using --all-repeated
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    None,
    Prepend,
    Separate,
}

/// How to delimit groups when using --group
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    Separate,
    Prepend,
    Append,
    Both,
}

/// Output mode for uniq
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    /// Default: print unique lines and first of each duplicate group
    Default,
    /// -d: print only first line of duplicate groups
    RepeatedOnly,
    /// -D / --all-repeated: print ALL duplicate lines
    AllRepeated(AllRepeatedMethod),
    /// -u: print only lines that are NOT duplicated
    UniqueOnly,
    /// --group: show all items with group separators
    Group(GroupMethod),
}

/// Configuration for uniq processing
#[derive(Debug, Clone)]
pub struct UniqConfig {
    pub mode: OutputMode,
    pub count: bool,
    pub ignore_case: bool,
    pub skip_fields: usize,
    pub skip_chars: usize,
    pub check_chars: Option<usize>,
    pub zero_terminated: bool,
}

impl Default for UniqConfig {
    fn default() -> Self {
        Self {
            mode: OutputMode::Default,
            count: false,
            ignore_case: false,
            skip_fields: 0,
            skip_chars: 0,
            check_chars: None,
            zero_terminated: false,
        }
    }
}

/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
#[inline(always)]
fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
    let mut start = 0;
    let len = line.len();

    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
    for _ in 0..config.skip_fields {
        // Skip blanks (space and tab)
        while start < len && (line[start] == b' ' || line[start] == b'\t') {
            start += 1;
        }
        // Skip non-blanks (field content)
        while start < len && line[start] != b' ' && line[start] != b'\t' {
            start += 1;
        }
    }

    // Skip N characters
    if config.skip_chars > 0 {
        let remaining = len - start;
        let skip = config.skip_chars.min(remaining);
        start += skip;
    }

    let slice = &line[start..];

    // Limit comparison to N characters
    if let Some(w) = config.check_chars {
        if w < slice.len() {
            return &slice[..w];
        }
    }

    slice
}
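// A small sanity sketch of the GNU field-skip semantics documented above
// (illustrative values, standard test harness assumed): each skipped field
// consumes leading blanks plus the following non-blanks, then skip_chars and
// check_chars apply to whatever remains.
#[cfg(test)]
mod compare_slice_tests {
    use super::*;

    #[test]
    fn skips_fields_then_chars_then_limits() {
        let config = UniqConfig {
            skip_fields: 1,
            skip_chars: 2,
            check_chars: Some(3),
            ..UniqConfig::default()
        };
        // "  foo  barXX": skip the field "  foo", then 2 chars of "  barXX",
        // then compare at most 3 of the remaining chars -> "bar".
        assert_eq!(get_compare_slice(b"  foo  barXX", &config), &b"bar"[..]);
    }
}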

/// Compare two lines (without terminators) using the config's comparison rules.
#[inline(always)]
fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
    let sa = get_compare_slice(a, config);
    let sb = get_compare_slice(b, config);

    if config.ignore_case {
        sa.eq_ignore_ascii_case(sb)
    } else {
        sa == sb
    }
}

/// Fast case-insensitive comparison: no field/char extraction, just case-insensitive.
/// Uses a cheap length check to reject unequal lines before the full comparison.
#[inline(always)]
fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    a.eq_ignore_ascii_case(b)
}

/// Check if config requires field/char skipping or char limiting.
#[inline(always)]
fn needs_key_extraction(config: &UniqConfig) -> bool {
    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
}

/// Fast path comparison: no field/char extraction needed, no case folding.
/// Uses a length equality shortcut and multi-word prefix rejection.
/// For short lines (<= 32 bytes, common in many-dups data), avoids the
/// full memcmp call overhead by doing direct word comparisons.
/// For medium lines (33-256 bytes), uses a tight u64 loop covering the
/// full line without falling through to memcmp.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    // Short-line fast path: for 8 bytes or fewer, a direct slice comparison
    // is cheapest (the compiler inlines it without a memcmp call)
    if alen <= 8 {
        return a == b;
    }
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // 8-byte prefix check: reject most non-equal lines without full memcmp
        let a8 = (ap as *const u64).read_unaligned();
        let b8 = (bp as *const u64).read_unaligned();
        if a8 != b8 {
            return false;
        }
        // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 17-32 bytes: check first 16 + last 16 (overlapping) to avoid memcmp
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 33-256 bytes: tight u64 loop covering the full line.
        // Compare 32 bytes per iteration (4 u64 loads), then handle tail.
        // This avoids the function call overhead of memcmp for medium lines.
        if alen <= 256 {
            let mut off = 8usize; // first 8 bytes already compared
            // Compare 32 bytes at a time
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                // XOR all pairs and OR together: zero if all equal
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            // Compare remaining 8 bytes at a time
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            // Compare tail (overlapping last 8 bytes)
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // Longer lines (>256): prefix passed, fall through to full memcmp
    a == b
}
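// Agreement sketch for the comparator above: every size bucket (<= 8, <= 16,
// <= 32, <= 256, > 256) must match plain slice equality, both on identical
// inputs and on inputs differing only in the last byte. Illustrative test,
// standard harness assumed.
#[cfg(test)]
mod equal_fast_tests {
    use super::*;

    #[test]
    fn agrees_with_slice_equality() {
        for len in [0usize, 1, 7, 8, 9, 16, 17, 32, 33, 255, 256, 257, 300] {
            let a: Vec<u8> = (0..len).map(|i| (i % 251) as u8).collect();
            let mut b = a.clone();
            assert!(lines_equal_fast(&a, &b));
            if len > 0 {
                b[len - 1] ^= 1;
                assert_eq!(lines_equal_fast(&a, &b), a == b);
                assert!(!lines_equal_fast(&a, &b));
            }
        }
    }
}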

/// Compare two equal-length lines starting from byte 8.
/// Caller has already checked: lengths are equal, both >= 9 bytes, first 8 bytes match.
/// This avoids redundant checks when the calling loop already did prefix rejection.
#[inline(always)]
fn lines_equal_after_prefix(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    debug_assert!(alen == b.len());
    debug_assert!(alen > 8);
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // Check last 8 bytes first (overlapping for 9-16 byte lines)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 256 {
            let mut off = 8usize;
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // >256 bytes: use memcmp via slice comparison (skipping the already-compared prefix)
    a[8..] == b[8..]
}
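// Sketch of the stated contract: callers guarantee equal lengths (> 8) and a
// matching 8-byte prefix, so only bytes 8.. decide the result. Illustrative
// test values, standard harness assumed.
#[cfg(test)]
mod after_prefix_tests {
    use super::*;

    #[test]
    fn compares_only_past_the_prefix() {
        let a = b"prefix00suffix-A";
        let b = b"prefix00suffix-B";
        assert!(lines_equal_after_prefix(a, a));
        assert!(!lines_equal_after_prefix(a, b));
    }
}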

/// Write a count-prefixed line in GNU uniq format.
/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
/// Combines prefix + line + term into a single write whenever the combined
/// output fits in a 256-byte stack buffer.
///
/// Optimized with a lookup table of pre-built prefixes for counts 1-9 (the most
/// common case in many-dups data); larger counts go through a small
/// right-aligning integer formatter.
#[inline(always)]
fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
    // Ultra-fast path for common small counts: pre-built prefix strings
    // Avoids all the itoa/copy_within overhead for the most common case.
    if count <= 9 {
        // "      N " where N is 1-9 (7 chars + space = 8 bytes prefix)
        let prefix: &[u8] = match count {
            1 => b"      1 ",
            2 => b"      2 ",
            3 => b"      3 ",
            4 => b"      4 ",
            5 => b"      5 ",
            6 => b"      6 ",
            7 => b"      7 ",
            8 => b"      8 ",
            9 => b"      9 ",
            _ => unreachable!(),
        };
        let total = 8 + line.len() + 1;
        if total <= 256 {
            let mut buf = [0u8; 256];
            unsafe {
                std::ptr::copy_nonoverlapping(prefix.as_ptr(), buf.as_mut_ptr(), 8);
                std::ptr::copy_nonoverlapping(line.as_ptr(), buf.as_mut_ptr().add(8), line.len());
                *buf.as_mut_ptr().add(8 + line.len()) = term;
            }
            return out.write_all(&buf[..total]);
        } else {
            out.write_all(prefix)?;
            out.write_all(line)?;
            return out.write_all(&[term]);
        }
    }

    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
    let digits = itoa_right_aligned_into(&mut prefix, count);
    let width = digits.max(7); // minimum 7 chars
    let prefix_len = width + 1; // +1 for trailing space
    prefix[width] = b' ';

    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
    let total = prefix_len + line.len() + 1;
    if total <= 256 {
        let mut buf = [0u8; 256];
        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
        buf[prefix_len + line.len()] = term;
        out.write_all(&buf[..total])
    } else {
        out.write_all(&prefix[..prefix_len])?;
        out.write_all(line)?;
        out.write_all(&[term])
    }
}

/// Write u64 decimal right-aligned into the prefix buffer.
/// Buffer is pre-filled with spaces. Returns the field width used: the number
/// of digits, padded up to a minimum of 7.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    if val == 0 {
        buf[6] = b'0';
        return 7; // 6 spaces + '0' = 7 chars
    }
    // Write digits right-to-left from position 27 (leaving room for trailing space)
    let mut pos = 27;
    while val > 0 {
        pos -= 1;
        buf[pos] = b'0' + (val % 10) as u8;
        val /= 10;
    }
    let num_digits = 27 - pos;
    if num_digits >= 7 {
        // Number is wide enough, shift to front
        buf.copy_within(pos..27, 0);
        num_digits
    } else {
        // Right-align in 7-char field: spaces then digits
        let pad = 7 - num_digits;
        buf.copy_within(pos..27, pad);
        // buf[0..pad] is already spaces from initialization
        7
    }
}
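// Format sketch for the two functions above: counts are right-aligned in a
// 7-character field followed by one space (GNU "%7lu "), widening only past
// seven digits. Illustrative values, standard harness assumed.
#[cfg(test)]
mod count_format_tests {
    use super::*;

    #[test]
    fn gnu_count_prefix_format() {
        let mut out = Vec::new();
        write_count_line(&mut out, 3, b"abc", b'\n').unwrap();
        assert_eq!(out, b"      3 abc\n");

        out.clear();
        write_count_line(&mut out, 42, b"abc", b'\n').unwrap();
        assert_eq!(out, b"     42 abc\n");

        out.clear();
        write_count_line(&mut out, 12345678, b"x", b'\n').unwrap();
        assert_eq!(out, b"12345678 x\n");
    }
}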

// ============================================================================
// High-performance mmap-based processing (for byte slices, zero-copy)
// ============================================================================

/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
pub fn process_uniq_bytes(
    data: &[u8],
    mut output: impl Write,
    config: &UniqConfig,
) -> io::Result<()> {
    let term = if config.zero_terminated { b'\0' } else { b'\n' };

    // Zero-copy fast path: bypass BufWriter for standard modes with IoSlice output.
    // Default mode: writes contiguous runs directly from mmap data via writev.
    // Filter modes (-d/-u): IoSlice batching (512 lines per writev).
    // Count mode (-c): IoSlice batching (340 groups per writev, prefix arena + mmap data).
    // Without BufWriter, writes go straight to the output via writev, with no
    // intermediate copy of the data slices.
    let fast = !needs_key_extraction(config) && !config.ignore_case;
    if fast
        && matches!(
            config.mode,
            OutputMode::Default | OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_standard_bytes(data, &mut output, config, term);
    }

    // General path with BufWriter for modes that need formatting/buffering.
    // The 16MB buffer keeps flushes rare and amortizes write syscalls.
    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);

    match config.mode {
        OutputMode::Group(method) => {
            process_group_bytes(data, &mut writer, config, method, term)?;
        }
        OutputMode::AllRepeated(method) => {
            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
        }
        _ => {
            process_standard_bytes(data, &mut writer, config, term)?;
        }
    }

    writer.flush()?;
    Ok(())
}

/// Iterator over lines in a byte slice, yielding (line content without terminator,
/// full line including terminator).
/// Uses memchr for SIMD-accelerated line boundary detection.
struct LineIter<'a> {
    data: &'a [u8],
    pos: usize,
    term: u8,
}

impl<'a> LineIter<'a> {
    #[inline(always)]
    fn new(data: &'a [u8], term: u8) -> Self {
        Self { data, pos: 0, term }
    }
}

impl<'a> Iterator for LineIter<'a> {
    /// (line content without terminator, full line including terminator for output)
    type Item = (&'a [u8], &'a [u8]);

    #[inline(always)]
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }

        let remaining = &self.data[self.pos..];
        match memchr::memchr(self.term, remaining) {
            Some(idx) => {
                let line_start = self.pos;
                let line_end = self.pos + idx; // without terminator
                let full_end = self.pos + idx + 1; // with terminator
                self.pos = full_end;
                Some((
                    &self.data[line_start..line_end],
                    &self.data[line_start..full_end],
                ))
            }
            None => {
                // Last line without terminator
                let line_start = self.pos;
                self.pos = self.data.len();
                let line = &self.data[line_start..];
                Some((line, line))
            }
        }
    }
}
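// Behavior sketch for LineIter: each item pairs the line content with the
// full line including its terminator, and a final unterminated line yields
// the same slice twice. Illustrative test.
#[cfg(test)]
mod line_iter_tests {
    use super::*;

    #[test]
    fn yields_content_and_full_line() {
        let mut it = LineIter::new(b"a\nbb\nc", b'\n');
        assert_eq!(it.next(), Some((&b"a"[..], &b"a\n"[..])));
        assert_eq!(it.next(), Some((&b"bb"[..], &b"bb\n"[..])));
        assert_eq!(it.next(), Some((&b"c"[..], &b"c"[..])));
        assert_eq!(it.next(), None);
    }
}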

/// Get line content (without terminator) from pre-computed positions.
/// `content_end` is the end of actual content (excludes trailing terminator if present).
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let start = line_starts[idx];
    let end = if idx + 1 < line_starts.len() {
        line_starts[idx + 1] - 1 // exclude terminator
    } else {
        content_end // last line: pre-computed to exclude trailing terminator
    };
    &data[start..end]
}

/// Get full line (with terminator) from pre-computed positions.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let start = line_starts[idx];
    let end = if idx + 1 < line_starts.len() {
        line_starts[idx + 1] // include terminator
    } else {
        data.len()
    };
    &data[start..end]
}

/// Skip a run of identical lines using doubling memcmp.
/// When a duplicate is found at `dup_start`, this verifies progressively larger
/// blocks of identical `pattern_len`-byte copies using memcmp (SIMD-accelerated).
/// Returns the byte offset just past the last verified duplicate copy.
///
/// For 50K identical 6-byte lines: ~16 memcmp calls (~600KB total) vs 50K per-line
/// comparisons. At memcmp's SIMD throughput (~48GB/s), this takes ~12µs vs ~250µs.
///
/// Correctness: the doubling trick verifies every byte in the range by induction.
/// Block[0..N] verified → check Block[N..2N] == Block[0..N] → Block[0..2N] verified.
#[inline]
fn skip_dup_run(data: &[u8], dup_start: usize, pattern_start: usize, pattern_len: usize) -> usize {
    let data_len = data.len();
    // Need at least 2 more copies worth of data for doubling to help
    if pattern_len == 0 || dup_start + 2 * pattern_len > data_len {
        return dup_start + pattern_len.min(data_len - dup_start);
    }

    let mut verified_end = dup_start + pattern_len; // 1 copy verified

    // Phase 1: doubling — compare verified block vs next block of same size.
    // Each step doubles the verified region. Total bytes compared ≈ 2 × total region.
    let mut block_copies = 1usize;
    loop {
        let block_bytes = block_copies * pattern_len;
        let next_end = verified_end + block_bytes;
        if next_end > data_len {
            // Not enough room for a full doubling. Check remaining complete copies.
            let remaining = data_len - verified_end;
            let remaining_bytes = (remaining / pattern_len) * pattern_len;
            if remaining_bytes > 0
                && data[dup_start..dup_start + remaining_bytes]
                    == data[verified_end..verified_end + remaining_bytes]
            {
                verified_end += remaining_bytes;
            }
            break;
        }

        if data[dup_start..dup_start + block_bytes] == data[verified_end..next_end] {
            verified_end = next_end;
            block_copies *= 2;
        } else {
            break;
        }
    }

    // Phase 2: linear scan for remaining lines at the boundary.
    // At most `block_copies` iterations (the last failed block size).
    while verified_end + pattern_len <= data_len {
        if data[verified_end..verified_end + pattern_len]
            == data[pattern_start..pattern_start + pattern_len]
        {
            verified_end += pattern_len;
        } else {
            break;
        }
    }

    verified_end
}
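// Contract sketch for skip_dup_run: given a run of identical terminated
// lines, the returned offset is just past the last copy of the pattern.
// Illustrative data; standard harness assumed.
#[cfg(test)]
mod skip_dup_tests {
    use super::*;

    #[test]
    fn skips_entire_identical_run() {
        // 50 copies of "ab\n" followed by a different line.
        let mut data = b"ab\n".repeat(50);
        data.extend_from_slice(b"zz\n");
        // The copy at offset 0 is the pattern; duplicates start at offset 3.
        assert_eq!(skip_dup_run(&data, 3, 0, 3), 150);
    }
}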

/// Linear scan for the end of a duplicate group.
/// Returns the index of the first line that differs from line_starts[group_start].
/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
/// equal lines can appear in non-adjacent groups separated by different lines.
/// Caches key length for fast length-mismatch rejection.
#[inline]
fn linear_scan_group_end(
    data: &[u8],
    line_starts: &[usize],
    group_start: usize,
    num_lines: usize,
    content_end: usize,
) -> usize {
    let key = line_content_at(data, line_starts, group_start, content_end);
    let key_len = key.len();
    let mut i = group_start + 1;
    while i < num_lines {
        let candidate = line_content_at(data, line_starts, i, content_end);
        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
            return i;
        }
        i += 1;
    }
    i
}

/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
/// General path: pre-computed line positions with a linear scan over each group.
fn process_standard_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    let fast = !needs_key_extraction(config) && !config.ignore_case;
    let fast_ci = !needs_key_extraction(config) && config.ignore_case;

    // Ultra-fast path: default mode, no count, no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines inline.
    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_fast_singlepass(data, writer, term);
    }

    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
    if fast
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_fast_singlepass(data, writer, config, term);
    }

    // Ultra-fast path: count mode with no key extraction.
    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
    // Avoids the line_starts Vec allocation (20MB+ for large files).
    if fast && config.count {
        return process_count_fast_singlepass(data, writer, config, term);
    }

    // Fast path for case-insensitive (-i) mode with no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
    // Avoids the general path's line_starts Vec allocation.
    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_ci_singlepass(data, writer, term);
    }

    if fast_ci
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_ci_singlepass(data, writer, config, term);
    }

    if fast_ci && config.count {
        return process_count_ci_singlepass(data, writer, config, term);
    }

    // General path: pre-computed line positions, with a linear scan over each group
    let estimated_lines = (data.len() / 40).max(64);
    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
    line_starts.push(0);
    for pos in memchr::memchr_iter(term, data) {
        if pos + 1 < data.len() {
            line_starts.push(pos + 1);
        }
    }
    let num_lines = line_starts.len();
    if num_lines == 0 {
        return Ok(());
    }

    // Pre-compute content end: if data ends with terminator, exclude it for last line
    let content_end = if data.last() == Some(&term) {
        data.len() - 1
    } else {
        data.len()
    };
    // General path with count tracking
    let mut i = 0;
    while i < num_lines {
        let content = line_content_at(data, &line_starts, i, content_end);
        let full = line_full_at(data, &line_starts, i);

        let group_end = if fast
            && i + 1 < num_lines
            && lines_equal_fast(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
            ) {
            // Duplicate detected — linear scan for end of group
            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
        } else if !fast
            && i + 1 < num_lines
            && lines_equal(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
                config,
            )
        {
            // Slow path linear scan with key extraction
            let mut j = i + 2;
            while j < num_lines {
                if !lines_equal(
                    content,
                    line_content_at(data, &line_starts, j, content_end),
                    config,
                ) {
                    break;
                }
                j += 1;
            }
            j
        } else {
            i + 1
        };

        let count = (group_end - i) as u64;
        output_group_bytes(writer, content, full, count, config, term)?;
        i = group_end;
    }

    Ok(())
}

/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
/// No pre-computed positions, no binary search, no Vec allocation.
/// Outputs each line that differs from the previous.
///
/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
/// independently, then cross-chunk boundaries are resolved.
fn process_default_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    term: u8,
) -> io::Result<()> {
    // Parallel path for large files — kicks in at 4MB.
    // Lower thresholds (e.g. 2MB) hurt performance on 10MB files because
    // the parallel overhead dominates for smaller chunks.
    if data.len() >= 4 * 1024 * 1024 {
        return process_default_parallel(data, writer, term);
    }

    process_default_sequential(data, writer, term)
}

/// Sequential single-pass dedup with zero-copy output.
/// Instead of copying data to a buffer, tracks contiguous output runs and writes
/// directly from the original data. For all-unique data, this is a single write_all.
///
/// Optimized for the "many duplicates" case: caches the previous line's length
/// and first-8-byte prefix for fast rejection of non-duplicates without
/// calling the full comparison function.
///
/// Uses raw pointer arithmetic throughout to avoid bounds checking in the hot loop.
fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    let data_len = data.len();
    let base = data.as_ptr();
    let mut prev_start: usize = 0;

    // Find end of first line
    let first_end: usize = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line, no terminator
            writer.write_all(data)?;
            return writer.write_all(&[term]);
        }
    };

    // Cache previous line metadata for fast comparison
    let mut prev_len = first_end - prev_start;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };

    // run_start tracks the beginning of the current contiguous output region.
    // When a duplicate is found, we save the run as an IoSlice and skip the dup.
    // Runs are batched and written with writev to reduce syscall overhead.
    const BATCH: usize = 256;
    let term_byte: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
    let mut run_start: usize = 0;
    let mut cur_start = first_end + 1;
    let mut last_output_end = first_end + 1; // exclusive end including terminator

    while cur_start < data_len {
        // Speculative line-end detection: if the previous line had length L,
        // check if data[cur_start + L] is the terminator. This avoids the
        // memchr SIMD call for repetitive data where all lines have the same length.
        // Falls back to memchr if the speculation is wrong.
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Fast reject: if lengths differ, lines are definitely not equal.
        // This branch structure is ordered by frequency: length mismatch is
        // most common for unique data, prefix mismatch next, full compare last.
        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            // Compare cached 8-byte prefix first
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true // prefix covers entire line
            } else if cur_len <= 16 {
                // Check last 8 bytes (overlapping)
                unsafe {
                    let a_tail =
                        (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
                    let b_tail = (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
                    a_tail == b_tail
                }
            } else if cur_len <= 32 {
                // Check bytes 8-16 and last 8 bytes
                unsafe {
                    let a16 = (base.add(prev_start + 8) as *const u64).read_unaligned();
                    let b16 = (base.add(cur_start + 8) as *const u64).read_unaligned();
                    if a16 != b16 {
                        false
                    } else {
                        let a_tail =
                            (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
                        let b_tail =
                            (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
                        a_tail == b_tail
                    }
                }
            } else if cur_len <= 256 {
                // 33-256 bytes: tight u64 loop with XOR-OR batching.
                // Compares 32 bytes per iteration (4 u64 loads), reducing
                // branch mispredictions vs individual comparisons.
                unsafe {
                    let ap = base.add(prev_start);
                    let bp = base.add(cur_start);
                    let mut off = 8usize; // first 8 bytes already compared via prefix
                    let mut eq = true;
                    while off + 32 <= cur_len {
                        let a0 = (ap.add(off) as *const u64).read_unaligned();
                        let b0 = (bp.add(off) as *const u64).read_unaligned();
                        let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                        let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                        let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                        let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                        let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                        let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                        if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                            eq = false;
                            break;
                        }
                        off += 32;
                    }
                    if eq {
                        while off + 8 <= cur_len {
                            let aw = (ap.add(off) as *const u64).read_unaligned();
                            let bw = (bp.add(off) as *const u64).read_unaligned();
                            if aw != bw {
                                eq = false;
                                break;
                            }
                            off += 8;
                        }
                    }
                    if eq && off < cur_len {
                        let a_tail = (ap.add(cur_len - 8) as *const u64).read_unaligned();
                        let b_tail = (bp.add(cur_len - 8) as *const u64).read_unaligned();
                        eq = a_tail == b_tail;
                    }
                    eq
                }
            } else {
                // For longer lines (>256), use unsafe slice comparison
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    a == b
                }
            }
        } else {
            // Short line < 8 bytes — direct byte comparison
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            // Duplicate found — use doubling memcmp to skip entire run of identical lines.
            // For 50K identical lines, this takes ~12µs vs ~250µs per-line comparison.
            let pattern_len = prev_len + 1; // line content + terminator
            if run_start < cur_start {
                slices.push(io::IoSlice::new(&data[run_start..cur_start]));
                if slices.len() >= BATCH {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            // Skip all identical copies using doubling memcmp
            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
            run_start = skip_end;
            cur_start = skip_end;
            // prev_start/prev_len/prev_prefix unchanged (still the group representative)
            continue;
        } else {
            // Different line — update cached comparison state
            prev_start = cur_start;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            last_output_end = if cur_end < data_len {
                cur_end + 1
            } else {
                cur_end
            };
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Flush remaining run
    if run_start < data_len {
        slices.push(io::IoSlice::new(
            &data[run_start..last_output_end.max(run_start)],
        ));
    }

    // Ensure trailing terminator. Only needed when the final output run ends
    // with data's unterminated last line; after a trailing duplicate skip, the
    // output already ends at a terminator boundary and appending another
    // terminator would emit a spurious blank line.
    if run_start < data_len && unsafe { *base.add(data_len - 1) } != term {
        slices.push(io::IoSlice::new(&term_byte));
    }

    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}
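// End-to-end sketch of the sequential fast path above: adjacent duplicates
// collapse to their first occurrence, non-adjacent repeats are kept (uniq
// only deduplicates neighbors), and an unterminated final line gains a
// terminator. Illustrative input, standard harness assumed.
#[cfg(test)]
mod sequential_dedup_tests {
    use super::*;

    #[test]
    fn collapses_adjacent_duplicates() {
        let mut out = Vec::new();
        process_default_sequential(b"a\na\nb\nb\nb\na\nc", &mut out, b'\n').unwrap();
        assert_eq!(out, b"a\nb\na\nc\n");
    }
}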

/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
/// positions in each chunk in parallel, then write output runs directly from
/// the original data. No per-chunk buffer allocation needed.
fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    use rayon::prelude::*;

    let num_threads = rayon::current_num_threads().max(1);
    let chunk_target = data.len() / num_threads;

    // Find chunk boundaries aligned to line terminators
    let mut boundaries = Vec::with_capacity(num_threads + 1);
    boundaries.push(0usize);
    for i in 1..num_threads {
        let target = i * chunk_target;
        if target >= data.len() {
            break;
        }
        if let Some(p) = memchr::memchr(term, &data[target..]) {
            let b = target + p + 1;
            if b > *boundaries.last().unwrap() && b <= data.len() {
                boundaries.push(b);
            }
        }
    }
    boundaries.push(data.len());

    let n_chunks = boundaries.len() - 1;
    if n_chunks <= 1 {
        return process_default_sequential(data, writer, term);
    }

    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
    struct ChunkResult {
        /// Byte ranges in the original data to output (contiguous runs)
        runs: Vec<(usize, usize)>,
        /// First line in chunk (absolute offsets into data, content without term)
        first_line_start: usize,
        first_line_end: usize,
        /// Last *output* line in chunk (content without term)
        last_line_start: usize,
        last_line_end: usize,
    }

    let results: Vec<ChunkResult> = boundaries
        .windows(2)
        .collect::<Vec<_>>()
        .par_iter()
        .map(|w| {
            let chunk_start = w[0];
            let chunk_end = w[1];
            let chunk = &data[chunk_start..chunk_end];

            let first_term = match memchr::memchr(term, chunk) {
                Some(pos) => pos,
                None => {
                    return ChunkResult {
                        runs: vec![(chunk_start, chunk_end)],
                        first_line_start: chunk_start,
                        first_line_end: chunk_end,
                        last_line_start: chunk_start,
                        last_line_end: chunk_end,
                    };
                }
            };

            let first_line_start = chunk_start;
            let first_line_end = chunk_start + first_term;

            let mut runs: Vec<(usize, usize)> = Vec::new();
            let mut run_start = chunk_start;
            let mut prev_start = 0usize;
            let mut last_out_start = chunk_start;
            let mut last_out_end = first_line_end;

            let mut prev_len = first_term;
            let chunk_base = chunk.as_ptr();
            let chunk_len = chunk.len();
            // Cache previous line's prefix for fast rejection
            let mut prev_prefix: u64 = if prev_len >= 8 {
                unsafe { (chunk_base as *const u64).read_unaligned() }
            } else {
                0
            };
            let mut cur_start = first_term + 1;
            while cur_start < chunk_len {
                // Speculative line-end: check if next line has same length
                let cur_end = {
                    let spec = cur_start + prev_len;
                    if spec < chunk_len && unsafe { *chunk_base.add(spec) } == term {
                        spec
                    } else {
                        match memchr::memchr(term, unsafe {
                            std::slice::from_raw_parts(
                                chunk_base.add(cur_start),
                                chunk_len - cur_start,
                            )
                        }) {
                            Some(offset) => cur_start + offset,
                            None => chunk_len,
                        }
                    }
                };

                let cur_len = cur_end - cur_start;
                // Fast reject: length + prefix + full comparison
                let is_dup = if cur_len != prev_len {
                    false
                } else if cur_len == 0 {
                    true
                } else if cur_len >= 8 {
                    let cur_prefix =
                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() };
                    if cur_prefix != prev_prefix {
                        false
                    } else if cur_len <= 8 {
                        true
                    } else {
                        unsafe {
                            let a =
                                std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                            let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
                            lines_equal_after_prefix(a, b)
                        }
                    }
                } else {
                    unsafe {
                        let a = std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                        let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
                        a == b
                    }
                };

                if is_dup {
                    // Duplicate — use doubling memcmp to skip entire run
                    let pattern_len = prev_len + 1;
                    let abs_cur = chunk_start + cur_start;
                    if run_start < abs_cur {
                        runs.push((run_start, abs_cur));
                    }
                    let skip_end = skip_dup_run(chunk, cur_start, prev_start, pattern_len);
                    run_start = chunk_start + skip_end;
                    cur_start = skip_end;
                    // prev_start/prev_len/prev_prefix unchanged
                    continue;
                } else {
                    last_out_start = chunk_start + cur_start;
                    last_out_end = chunk_start + cur_end;
                    prev_len = cur_len;
                    prev_prefix = if cur_len >= 8 {
                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() }
                    } else {
                        0
                    };
                }
                prev_start = cur_start;

                if cur_end < chunk_len {
                    cur_start = cur_end + 1;
                } else {
                    break;
                }
            }

            // Close final run
            if run_start < chunk_end {
                runs.push((run_start, chunk_end));
            }

            ChunkResult {
                runs,
                first_line_start,
                first_line_end,
                last_line_start: last_out_start,
                last_line_end: last_out_end,
            }
        })
        .collect();

    // Write results, adjusting cross-chunk boundaries.
    // Batch output runs via write_vectored to reduce syscall count.
    const BATCH: usize = 256;
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
    // Track where the emitted output ends so the trailing-terminator check
    // below can tell whether data's last line was actually written.
    let mut out_end: usize = 0;
    for (i, result) in results.iter().enumerate() {
        let skip_first = if i > 0 {
            let prev = &results[i - 1];
            let prev_last = &data[prev.last_line_start..prev.last_line_end];
            let cur_first = &data[result.first_line_start..result.first_line_end];
            lines_equal_fast(prev_last, cur_first)
        } else {
            false
        };

        let skip_end = if skip_first {
            // Skip bytes up to and including the first line's terminator
            result.first_line_end + 1
        } else {
            0
        };

        for &(rs, re) in &result.runs {
            let actual_start = rs.max(skip_end);
            if actual_start < re {
                slices.push(io::IoSlice::new(&data[actual_start..re]));
                out_end = re;
                if slices.len() >= BATCH {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
        }
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    // Ensure trailing terminator. Only needed when the emitted output ends
    // with data's unterminated last line; if the tail was consumed as part of
    // a duplicate run, the output already ends at a terminator boundary.
    if out_end == data.len() && !data.is_empty() && *data.last().unwrap() != term {
        writer.write_all(&[term])?;
    }

    Ok(())
}
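// Agreement sketch: the parallel path must produce byte-identical output to
// the sequential path, including across chunk boundaries where a duplicate
// group may be split. Calling the function directly sidesteps the 4MB size
// gate; illustrative data, standard harness assumed.
#[cfg(test)]
mod parallel_dedup_tests {
    use super::*;

    #[test]
    fn agrees_with_sequential() {
        let data = b"x\nx\ny\nz\nz\n".repeat(1000);
        let mut seq = Vec::new();
        process_default_sequential(&data, &mut seq, b'\n').unwrap();
        let mut par = Vec::new();
        process_default_parallel(&data, &mut par, b'\n').unwrap();
        assert_eq!(seq, par);
    }
}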

/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
/// Zero-copy: writes directly from mmap data with batched writev.
/// Uses speculative line-end detection and 8-byte prefix caching for fast
/// duplicate detection without full memcmp.
fn process_filter_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
    let data_len = data.len();
    let base = data.as_ptr();

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line: unique (count=1)
            if !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Batch output using IoSlice write_vectored to reduce syscall overhead.
    // Each output line needs 2 slices: content + terminator.
    const BATCH: usize = 512;
    let term_slice: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Fast reject using length + 8-byte prefix.
        // After prefix match, use lines_equal_after_prefix which skips
        // the already-checked length/prefix/empty checks.
        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true
            } else {
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    lines_equal_after_prefix(a, b)
                }
            }
        } else {
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            // Use doubling memcmp to skip the entire duplicate run
            let pattern_len = prev_len + 1;
            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
            // Round up: a trailing unterminated duplicate occupies fewer than
            // pattern_len bytes but still counts as one full line.
            let skipped = (skip_end - cur_start + pattern_len - 1) / pattern_len;
            count += skipped as u64;
            cur_start = skip_end;
            continue;
        } else {
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
                slices.push(io::IoSlice::new(&term_slice));
                if slices.len() >= BATCH * 2 {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group
    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
        slices.push(io::IoSlice::new(&term_slice));
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}
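// Filter-mode sketch: on the same input, -d keeps one copy of each duplicated
// group and -u keeps only lines that occur exactly once. Illustrative input,
// standard harness assumed.
#[cfg(test)]
mod filter_mode_tests {
    use super::*;

    #[test]
    fn repeated_and_unique_filters() {
        let data = b"a\na\nb\nc\nc\nc\nd\n";

        let mut cfg = UniqConfig::default();
        cfg.mode = OutputMode::RepeatedOnly;
        let mut out = Vec::new();
        process_filter_fast_singlepass(data, &mut out, &cfg, b'\n').unwrap();
        assert_eq!(out, b"a\nc\n");

        cfg.mode = OutputMode::UniqueOnly;
        out.clear();
        process_filter_fast_singlepass(data, &mut out, &cfg, b'\n').unwrap();
        assert_eq!(out, b"b\nd\n");
    }
}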
1386
/// Fast single-pass for count mode (-c) covering all standard output modes.
/// Zero line_starts allocation: scans with memchr, counts groups inline,
/// and writes count-prefixed lines directly.
/// Uses cached length comparison for fast duplicate rejection.
/// Uses raw pointer arithmetic to avoid bounds checking.
///
/// Zero-copy output: uses writev (IoSlice) to write count prefixes from a
/// small arena + line content directly from mmap'd data + terminator bytes.
/// This avoids copying line content into an intermediate buffer entirely.
///
/// Optimizations:
/// - Speculative line-end detection: if all lines have the same length (common
///   in repetitive data), we can skip the memchr SIMD scan entirely by checking
///   whether data[cur_start + prev_len] is the terminator.
/// - Cached 8-byte prefix rejection: avoids a full comparison for most non-equal lines.
/// - IoSlice writev batching: eliminates memcpy of line content.
fn process_count_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let data_len = data.len();
    if data_len == 0 {
        // Empty input produces no groups (and must not print a count line).
        return Ok(());
    }
    let base = data.as_ptr();
    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line: count = 1, so only -d suppresses it.
            if !matches!(config.mode, OutputMode::RepeatedOnly) {
                write_count_line(writer, 1, data, term)?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Zero-copy writev batching: accumulate groups as (prefix_len, line_start,
    // line_end) tuples, with prefixes stored in a flat byte buffer.
    // Build IoSlice arrays at flush time to avoid borrow conflicts.
    // Line content points directly into mmap'd data — zero copy.
    const BATCH: usize = 340;
    const PREFIX_SLOT: usize = 28; // max prefix size per group
    let term_slice: [u8; 1] = [term];
    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
    // Each group: (prefix_len, line_start_in_data, line_end_in_data)
    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);

    while cur_start < data_len {
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len == 8 {
                true
            } else {
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    lines_equal_after_prefix(a, b)
                }
            }
        } else {
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            // Use doubling memcmp to skip the entire duplicate run at once
            let pattern_len = prev_len + 1;
            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
            let skipped = (skip_end - cur_start) / pattern_len;
            count += skipped as u64;
            cur_start = skip_end;
            continue;
        } else {
            let should_print = match config.mode {
                OutputMode::RepeatedOnly => count > 1,
                OutputMode::UniqueOnly => count == 1,
                _ => true,
            };
            if should_print {
                let idx = groups.len();
                let prefix_off = idx * PREFIX_SLOT;
                let prefix_len = format_count_prefix_into(
                    count,
                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
                );
                groups.push((prefix_len, prev_start, prev_end));

                if groups.len() >= BATCH {
                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
                    groups.clear();
                    // Re-fill prefix_buf with spaces for the next batch
                    prefix_buf.fill(b' ');
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group
    let should_print = match config.mode {
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };
    if should_print {
        let idx = groups.len();
        let prefix_off = idx * PREFIX_SLOT;
        let prefix_len =
            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
        groups.push((prefix_len, prev_start, prev_end));
    }
    if !groups.is_empty() {
        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
    }

    Ok(())
}

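// A hedged end-to-end check of the count fast path. It assumes the helpers
// referenced above (skip_dup_run, write_count_line) behave as their call
// sites imply; the expected bytes follow the GNU "%7lu " prefix produced by
// format_count_prefix_into.
#[cfg(test)]
mod count_fast_singlepass_tests {
    use super::{process_count_fast_singlepass, UniqConfig};

    #[test]
    fn counts_adjacent_duplicates_per_group() {
        let mut out = Vec::new();
        process_count_fast_singlepass(b"a\na\nb\n", &mut out, &UniqConfig::default(), b'\n')
            .unwrap();
        assert_eq!(out, b"      2 a\n      1 b\n");
    }
}
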
/// Flush batched count groups using write_vectored (writev).
/// Builds IoSlice arrays from the prefix buffer and mmap'd data.
#[inline]
fn flush_count_groups(
    writer: &mut impl Write,
    prefix_buf: &[u8],
    groups: &[(usize, usize, usize)],
    term_slice: &[u8; 1],
    data: &[u8],
) -> io::Result<()> {
    const PREFIX_SLOT: usize = 28;
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(groups.len() * 3);
    for (i, &(prefix_len, line_start, line_end)) in groups.iter().enumerate() {
        let prefix_off = i * PREFIX_SLOT;
        slices.push(io::IoSlice::new(
            &prefix_buf[prefix_off..prefix_off + prefix_len],
        ));
        slices.push(io::IoSlice::new(&data[line_start..line_end]));
        slices.push(io::IoSlice::new(term_slice));
    }
    write_all_vectored(writer, &slices)
}

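// A small, self-contained check of the writev layout flush_count_groups
// emits: for each group, the prefix slot bytes, then the line bytes straight
// from `data`, then the terminator. The prefixes here are hand-formatted.
#[cfg(test)]
mod flush_count_groups_tests {
    use super::flush_count_groups;

    #[test]
    fn writes_prefix_then_line_then_terminator_per_group() {
        let data = b"alpha\nbeta\n";
        let mut prefix_buf = vec![b' '; 2 * 28];
        // "      2 " for the first group, "      1 " for the second.
        prefix_buf[6] = b'2';
        prefix_buf[28 + 6] = b'1';
        let groups = vec![(8usize, 0usize, 5usize), (8, 6, 10)];
        let mut out = Vec::new();
        flush_count_groups(&mut out, &prefix_buf, &groups, &[b'\n'], data).unwrap();
        assert_eq!(out, b"      2 alpha\n      1 beta\n");
    }
}
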
/// Format a count prefix into a buffer slot, returning the prefix length.
/// GNU format: "%7lu " — right-aligned count in a 7-character field, followed by a space.
/// Buffer must be pre-filled with spaces and at least 28 bytes.
#[inline(always)]
fn format_count_prefix_into(count: u64, buf: &mut [u8]) -> usize {
    if count <= 9 {
        buf[6] = b'0' + count as u8;
        buf[7] = b' ';
        return 8;
    }
    // Use itoa on a temp array, then copy
    let mut tmp = [b' '; 28];
    let digits = itoa_right_aligned_into(&mut tmp, count);
    let width = digits.max(7);
    tmp[width] = b' ';
    let len = width + 1;
    buf[..len].copy_from_slice(&tmp[..len]);
    len
}

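// Minimal sanity check for the fast single-digit path above; the expected
// bytes are fully determined by the code (digit at index 6, space at 7).
#[cfg(test)]
mod count_prefix_tests {
    use super::format_count_prefix_into;

    #[test]
    fn single_digit_counts_fill_a_7_wide_field_plus_space() {
        let mut buf = [b' '; 28];
        let len = format_count_prefix_into(3, &mut buf);
        assert_eq!(&buf[..len], b"      3 ");
    }
}
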
/// Fast single-pass for case-insensitive (-i) default mode.
/// Run-tracking zero-copy output: duplicate-free stretches are flushed as
/// single contiguous writes straight from the input buffer.
/// Includes speculative line-end detection and length-based early rejection.
fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    let data_len = data.len();
    if data_len == 0 {
        return Ok(());
    }
    let base = data.as_ptr();

    let first_end = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line: echo it and append the terminator.
            writer.write_all(data)?;
            return writer.write_all(&[term]);
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_len = first_end;

    // Run-tracking: flush contiguous regions from the original data.
    let mut run_start: usize = 0;
    let mut cur_start = first_end + 1;

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Length-based early rejection before the case-insensitive compare
        let is_dup = cur_len == prev_len
            && unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a.eq_ignore_ascii_case(b)
            };

        if is_dup {
            // Duplicate — flush the current run up to this line, then skip it
            if run_start < cur_start {
                writer.write_all(&data[run_start..cur_start])?;
            }
            run_start = if cur_end < data_len {
                cur_end + 1
            } else {
                cur_end
            };
        } else {
            prev_start = cur_start;
            prev_len = cur_len;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Flush the remaining run
    if run_start < data_len {
        writer.write_all(&data[run_start..data_len])?;
        // Earlier flushes always end on a terminator; only this final run can
        // carry an unterminated last line, so append the terminator here.
        if data[data_len - 1] != term {
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}

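// Focused checks for the default -i fast path above; everything they rely on
// (eq_ignore_ascii_case, run flushing) is visible in this file.
#[cfg(test)]
mod default_ci_singlepass_tests {
    use super::process_default_ci_singlepass;

    #[test]
    fn collapses_runs_that_differ_only_in_ascii_case() {
        let mut out = Vec::new();
        process_default_ci_singlepass(b"Foo\nfoo\nFOO\nbar\n", &mut out, b'\n').unwrap();
        assert_eq!(out, b"Foo\nbar\n");
    }

    #[test]
    fn appends_the_missing_trailing_terminator() {
        let mut out = Vec::new();
        process_default_ci_singlepass(b"a\nb", &mut out, b'\n').unwrap();
        assert_eq!(out, b"a\nb\n");
    }
}
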
/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
/// Zero-copy: writes directly from mmap'd data through the writer.
/// Uses speculative line-end detection and length-based early rejection.
fn process_filter_ci_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
    let data_len = data.len();
    if data_len == 0 {
        return Ok(());
    }
    let base = data.as_ptr();

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line: count = 1, so only -d suppresses it.
            if !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Batch output using IoSlice write_vectored
    const BATCH: usize = 512;
    let term_slice: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;
        // Length check before the case-insensitive comparison
        let is_dup = cur_len == prev_len
            && lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]);

        if is_dup {
            count += 1;
        } else {
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
                slices.push(io::IoSlice::new(&term_slice));
                if slices.len() >= BATCH * 2 {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
        slices.push(io::IoSlice::new(&term_slice));
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}

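// Hedged checks for the -i filter path; they assume lines_equal_case_insensitive
// is plain ASCII-case-insensitive equality, as its name and the sibling paths imply.
#[cfg(test)]
mod filter_ci_singlepass_tests {
    use super::{process_filter_ci_singlepass, OutputMode, UniqConfig};

    #[test]
    fn repeated_only_keeps_case_insensitive_duplicate_groups() {
        let config = UniqConfig {
            mode: OutputMode::RepeatedOnly,
            ignore_case: true,
            ..Default::default()
        };
        let mut out = Vec::new();
        process_filter_ci_singlepass(b"A\na\nb\n", &mut out, &config, b'\n').unwrap();
        assert_eq!(out, b"A\n");
    }

    #[test]
    fn unique_only_drops_them() {
        let config = UniqConfig {
            mode: OutputMode::UniqueOnly,
            ignore_case: true,
            ..Default::default()
        };
        let mut out = Vec::new();
        process_filter_ci_singlepass(b"A\na\nb\n", &mut out, &config, b'\n').unwrap();
        assert_eq!(out, b"b\n");
    }
}
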
/// Fast single-pass for case-insensitive (-i) count (-c) mode.
/// Uses the same zero-copy writev batching as process_count_fast_singlepass,
/// plus a length-based early rejection before each case-insensitive compare.
fn process_count_ci_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }
    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line: count = 1, so only -d suppresses it.
            if !matches!(config.mode, OutputMode::RepeatedOnly) {
                write_count_line(writer, 1, data, term)?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Zero-copy writev batching: same approach as process_count_fast_singlepass
    const BATCH: usize = 340;
    const PREFIX_SLOT: usize = 28;
    let term_slice: [u8; 1] = [term];
    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);

    let base = data.as_ptr();
    let data_len = data.len();
    let mut prev_len = prev_end - prev_start;

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;
        // Length-based early rejection before the case-insensitive compare
        let is_dup = cur_len == prev_len
            && data[prev_start..prev_end].eq_ignore_ascii_case(&data[cur_start..cur_end]);

        if is_dup {
            count += 1;
        } else {
            let should_print = match config.mode {
                OutputMode::RepeatedOnly => count > 1,
                OutputMode::UniqueOnly => count == 1,
                _ => true,
            };
            if should_print {
                let idx = groups.len();
                let prefix_off = idx * PREFIX_SLOT;
                let prefix_len = format_count_prefix_into(
                    count,
                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
                );
                groups.push((prefix_len, prev_start, prev_end));

                if groups.len() >= BATCH {
                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
                    groups.clear();
                    prefix_buf.fill(b' ');
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    let should_print = match config.mode {
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };
    if should_print {
        let idx = groups.len();
        let prefix_off = idx * PREFIX_SLOT;
        let prefix_len =
            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
        groups.push((prefix_len, prev_start, prev_end));
    }
    if !groups.is_empty() {
        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
    }

    Ok(())
}

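// Check of the case-insensitive count path; the expected prefixes follow
// format_count_prefix_into, and write_count_line is never reached here.
#[cfg(test)]
mod count_ci_singlepass_tests {
    use super::{process_count_ci_singlepass, UniqConfig};

    #[test]
    fn groups_lines_that_differ_only_in_ascii_case() {
        let config = UniqConfig {
            ignore_case: true,
            count: true,
            ..Default::default()
        };
        let mut out = Vec::new();
        process_count_ci_singlepass(b"A\na\nB\n", &mut out, &config, b'\n').unwrap();
        assert_eq!(out, b"      2 A\n      1 B\n");
    }
}
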
/// Output a group for standard modes (bytes path).
#[inline(always)]
fn output_group_bytes(
    writer: &mut impl Write,
    content: &[u8],
    full: &[u8],
    count: u64,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };

    if should_print {
        if config.count {
            write_count_line(writer, count, content, term)?;
        } else {
            writer.write_all(full)?;
            // Add a terminator if the original line didn't have one
            if full.len() == content.len() {
                writer.write_all(&[term])?;
            }
        }
    }

    Ok(())
}

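// Mode filtering in output_group_bytes, shown on the plain (non -c) path.
#[cfg(test)]
mod output_group_bytes_tests {
    use super::{output_group_bytes, OutputMode, UniqConfig};

    #[test]
    fn unique_only_prints_groups_of_exactly_one() {
        let config = UniqConfig {
            mode: OutputMode::UniqueOnly,
            ..Default::default()
        };
        let mut out = Vec::new();
        // A group of two is suppressed by -u ...
        output_group_bytes(&mut out, b"a", b"a\n", 2, &config, b'\n').unwrap();
        assert!(out.is_empty());
        // ... while a singleton is printed; `full` already carries its terminator.
        output_group_bytes(&mut out, b"b", b"b\n", 1, &config, b'\n').unwrap();
        assert_eq!(out, b"b\n");
    }
}
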
/// Process --all-repeated / -D mode on byte slices.
fn process_all_repeated_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    method: AllRepeatedMethod,
    term: u8,
) -> io::Result<()> {
    let mut lines = LineIter::new(data, term);

    let first = match lines.next() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Buffer the current group as (content, full) slice pairs: we only know
    // whether to print a group (count > 1) once we see where it ends.
    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
    group_lines.push(first);
    let mut first_group_printed = false;

    let fast = !needs_key_extraction(config) && !config.ignore_case;

    for (cur_content, cur_full) in lines {
        let prev_content = group_lines.last().unwrap().0;
        let equal = if fast {
            lines_equal_fast(prev_content, cur_content)
        } else {
            lines_equal(prev_content, cur_content, config)
        };

        if equal {
            group_lines.push((cur_content, cur_full));
        } else {
            // Flush the finished group, then start a new one
            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
            group_lines.clear();
            group_lines.push((cur_content, cur_full));
        }
    }

    // Flush the last group
    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;

    Ok(())
}

/// Flush a group for --all-repeated mode (bytes path).
fn flush_all_repeated_bytes(
    writer: &mut impl Write,
    group: &[(&[u8], &[u8])],
    method: AllRepeatedMethod,
    first_group_printed: &mut bool,
    term: u8,
) -> io::Result<()> {
    if group.len() <= 1 {
        return Ok(()); // Not a duplicate group
    }

    match method {
        AllRepeatedMethod::Prepend => {
            writer.write_all(&[term])?;
        }
        AllRepeatedMethod::Separate => {
            if *first_group_printed {
                writer.write_all(&[term])?;
            }
        }
        AllRepeatedMethod::None => {}
    }

    for &(content, full) in group {
        writer.write_all(full)?;
        if full.len() == content.len() {
            writer.write_all(&[term])?;
        }
    }

    *first_group_printed = true;
    Ok(())
}

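// The group-flush contract for -D, checked directly: singleton groups are
// dropped, and Separate only emits a delimiter between printed groups.
#[cfg(test)]
mod flush_all_repeated_bytes_tests {
    use super::{flush_all_repeated_bytes, AllRepeatedMethod};

    #[test]
    fn separate_delimits_between_printed_groups_only() {
        let content: &[u8] = b"a";
        let full: &[u8] = b"a\n";
        let group = vec![(content, full), (content, full)];
        let mut out = Vec::new();
        let mut printed = false;
        flush_all_repeated_bytes(&mut out, &group, AllRepeatedMethod::Separate, &mut printed, b'\n')
            .unwrap();
        flush_all_repeated_bytes(&mut out, &group, AllRepeatedMethod::Separate, &mut printed, b'\n')
            .unwrap();
        assert_eq!(out, b"a\na\n\na\na\n");

        // A group of one is not a duplicate group and prints nothing.
        let singleton = vec![(b"b" as &[u8], b"b\n" as &[u8])];
        flush_all_repeated_bytes(&mut out, &singleton, AllRepeatedMethod::Separate, &mut printed, b'\n')
            .unwrap();
        assert_eq!(out, b"a\na\n\na\na\n");
    }
}
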
/// Process --group mode on byte slices.
fn process_group_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    method: GroupMethod,
    term: u8,
) -> io::Result<()> {
    let mut lines = LineIter::new(data, term);

    let (prev_content, prev_full) = match lines.next() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Prepend/Both: separator before the first group
    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    // Write the first line
    writer.write_all(prev_full)?;
    if prev_full.len() == prev_content.len() {
        writer.write_all(&[term])?;
    }

    let mut prev_content = prev_content;
    let fast = !needs_key_extraction(config) && !config.ignore_case;

    for (cur_content, cur_full) in lines {
        let equal = if fast {
            lines_equal_fast(prev_content, cur_content)
        } else {
            lines_equal(prev_content, cur_content, config)
        };

        if !equal {
            // New group — write a separator
            writer.write_all(&[term])?;
        }

        writer.write_all(cur_full)?;
        if cur_full.len() == cur_content.len() {
            writer.write_all(&[term])?;
        }

        prev_content = cur_content;
    }

    // Append/Both: separator after the last group
    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    Ok(())
}

// ============================================================================
// Streaming processing (for stdin / pipe input)
// ============================================================================

/// Main streaming uniq processor.
/// Reads from `input`, writes to `output`.
pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
    let term = if config.zero_terminated { b'\0' } else { b'\n' };

    match config.mode {
        OutputMode::Group(method) => {
            process_group_stream(reader, &mut writer, config, method, term)?;
        }
        OutputMode::AllRepeated(method) => {
            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
        }
        _ => {
            process_standard_stream(reader, &mut writer, config, term)?;
        }
    }

    writer.flush()?;
    Ok(())
}

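// End-to-end streaming check through process_uniq. This assumes lines_equal
// with a default config reduces to plain byte equality, as the fast-path
// gating above implies.
#[cfg(test)]
mod process_uniq_stream_tests {
    use super::{process_uniq, UniqConfig};

    #[test]
    fn default_mode_collapses_adjacent_duplicates_only() {
        let mut out = Vec::new();
        process_uniq(&b"a\na\nb\na\n"[..], &mut out, &UniqConfig::default()).unwrap();
        // The trailing "a" starts a new group: uniq is adjacency-based, not global.
        assert_eq!(out, b"a\nb\na\n");
    }
}
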
/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
fn process_standard_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);

    // Read the first line
    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
        return Ok(()); // empty input
    }
    let mut count: u64 = 1;

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            // End of input — output the last group
            output_group_stream(writer, &prev_line, count, config, term)?;
            break;
        }

        if compare_lines_stream(&prev_line, &current_line, config, term) {
            count += 1;
        } else {
            output_group_stream(writer, &prev_line, count, config, term)?;
            std::mem::swap(&mut prev_line, &mut current_line);
            count = 1;
        }
    }

    Ok(())
}

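// NUL-terminated records (-z) use the same streaming loop; only `term`
// changes. As above, this assumes lines_equal with a default config is
// plain byte equality.
#[cfg(test)]
mod standard_stream_zero_terminated_tests {
    use super::{process_standard_stream, UniqConfig};

    #[test]
    fn collapses_duplicate_nul_terminated_records() {
        let mut out = Vec::new();
        process_standard_stream(&b"x\0x\0y\0"[..], &mut out, &UniqConfig::default(), b'\0')
            .unwrap();
        assert_eq!(out, b"x\0y\0");
    }
}
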
/// Compare two lines (with terminators) in streaming mode.
#[inline(always)]
fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
    let a_stripped = strip_term(a, term);
    let b_stripped = strip_term(b, term);
    lines_equal(a_stripped, b_stripped, config)
}

/// Strip the terminator from the end of a line.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    if line.last() == Some(&term) {
        &line[..line.len() - 1]
    } else {
        line
    }
}

/// Output a group in streaming mode.
#[inline(always)]
fn output_group_stream(
    writer: &mut impl Write,
    line: &[u8],
    count: u64,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };

    if should_print {
        let content = strip_term(line, term);
        if config.count {
            write_count_line(writer, count, content, term)?;
        } else {
            writer.write_all(content)?;
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}

/// Process --all-repeated / -D mode (streaming).
fn process_all_repeated_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    method: AllRepeatedMethod,
    term: u8,
) -> io::Result<()> {
    let mut group: Vec<Vec<u8>> = Vec::new();
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
    let mut first_group_printed = false;

    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
        return Ok(());
    }
    group.push(current_line.clone());

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
            break;
        }

        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
            group.push(current_line.clone());
        } else {
            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
            group.clear();
            group.push(current_line.clone());
        }
    }

    Ok(())
}

/// Flush a group for --all-repeated mode (streaming).
fn flush_all_repeated_stream(
    writer: &mut impl Write,
    group: &[Vec<u8>],
    method: AllRepeatedMethod,
    first_group_printed: &mut bool,
    term: u8,
) -> io::Result<()> {
    if group.len() <= 1 {
        return Ok(());
    }

    match method {
        AllRepeatedMethod::Prepend => {
            writer.write_all(&[term])?;
        }
        AllRepeatedMethod::Separate => {
            if *first_group_printed {
                writer.write_all(&[term])?;
            }
        }
        AllRepeatedMethod::None => {}
    }

    for line in group {
        let content = strip_term(line, term);
        writer.write_all(content)?;
        writer.write_all(&[term])?;
    }

    *first_group_printed = true;
    Ok(())
}

/// Process --group mode (streaming).
fn process_group_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    method: GroupMethod,
    term: u8,
) -> io::Result<()> {
    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);

    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
        return Ok(());
    }

    // Prepend/Both: separator before the first group
    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    // Write the first line
    let content = strip_term(&prev_line, term);
    writer.write_all(content)?;
    writer.write_all(&[term])?;

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
                writer.write_all(&[term])?;
            }
            break;
        }

        if !compare_lines_stream(&prev_line, &current_line, config, term) {
            writer.write_all(&[term])?;
        }

        let content = strip_term(&current_line, term);
        writer.write_all(content)?;
        writer.write_all(&[term])?;

        std::mem::swap(&mut prev_line, &mut current_line);
    }

    Ok(())
}

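// --group in streaming form, driven through process_uniq. Assumes lines_equal
// with a default config is plain byte equality.
#[cfg(test)]
mod group_stream_tests {
    use super::{process_uniq, GroupMethod, OutputMode, UniqConfig};

    #[test]
    fn separate_prints_all_lines_with_blank_records_between_groups() {
        let config = UniqConfig {
            mode: OutputMode::Group(GroupMethod::Separate),
            ..Default::default()
        };
        let mut out = Vec::new();
        process_uniq(&b"a\na\nb\n"[..], &mut out, &config).unwrap();
        assert_eq!(out, b"a\na\n\nb\n");
    }
}
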
/// Read a line terminated by the given byte (newline or NUL).
/// Appends to `buf` (callers clear it between reads) and returns the number
/// of bytes read (0 = EOF).
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    reader.read_until(term, buf)
}