// coreutils_rs/uniq/core.rs
1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Write a large contiguous buffer in full.
///
/// `Write::write_all` already loops over partial writes (and retries on
/// `ErrorKind::Interrupted`), so this is a thin named seam kept for the
/// zero-copy output paths.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}
8
/// Write all IoSlices to the writer, handling partial writes correctly.
///
/// Issues a single `write_vectored`, retrying on `ErrorKind::Interrupted`
/// (matching `Write::write_all` semantics — previously an EINTR surfaced as
/// a spurious hard error). If the writer accepted only part of the data,
/// falls back to per-slice `write_all` for the remainder.
fn write_all_vectored(writer: &mut impl Write, slices: &[io::IoSlice<'_>]) -> io::Result<()> {
    let expected: usize = slices.iter().map(|s| s.len()).sum();

    // Retry the vectored write on EINTR instead of propagating it.
    let n = loop {
        match writer.write_vectored(slices) {
            Ok(n) => break n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };

    if n >= expected {
        return Ok(());
    }
    if n == 0 && expected > 0 {
        return Err(io::Error::new(
            io::ErrorKind::WriteZero,
            "write_vectored returned 0",
        ));
    }
    // Slow path: partial write — fall back to write_all per remaining slice.
    // `consumed` counts bytes already written: skip whole slices it covers,
    // write the tail of the partially-written slice, then everything after.
    let mut consumed = n;
    for slice in slices {
        if consumed == 0 {
            writer.write_all(slice)?;
        } else if consumed >= slice.len() {
            consumed -= slice.len();
        } else {
            writer.write_all(&slice[consumed..])?;
            consumed = 0;
        }
    }
    Ok(())
}
36
/// How to delimit groups when using --all-repeated (-D).
/// Variant names mirror the GNU `--all-repeated={none,prepend,separate}`
/// option values; the actual delimiter emission lives in the output routines.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    /// `--all-repeated=none`: no delimiter between groups.
    None,
    /// `--all-repeated=prepend`: delimiter before each group.
    Prepend,
    /// `--all-repeated=separate`: delimiter between adjacent groups.
    Separate,
}
44
/// How to delimit groups when using --group.
/// Variant names mirror the GNU `--group={separate,prepend,append,both}`
/// option values; delimiter emission is handled by the group output routine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    /// `--group=separate`: delimiter between adjacent groups.
    Separate,
    /// `--group=prepend`: delimiter before each group.
    Prepend,
    /// `--group=append`: delimiter after each group.
    Append,
    /// `--group=both`: delimiter before and after each group.
    Both,
}
53
/// Output mode for uniq: selects which lines of each run of equal lines
/// are emitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    /// Default: print unique lines and first of each duplicate group
    Default,
    /// -d: print only first line of duplicate groups
    RepeatedOnly,
    /// -D / --all-repeated: print ALL duplicate lines, delimited per
    /// [`AllRepeatedMethod`]
    AllRepeated(AllRepeatedMethod),
    /// -u: print only lines that are NOT duplicated
    UniqueOnly,
    /// --group: show all items with group separators placed per [`GroupMethod`]
    Group(GroupMethod),
}
68
/// Configuration for uniq processing
#[derive(Debug, Clone)]
pub struct UniqConfig {
    // Which lines of each group to emit (default, -d, -D, -u, --group).
    pub mode: OutputMode,
    // -c: prefix each emitted line with its occurrence count.
    pub count: bool,
    // -i: ASCII case-insensitive comparison (eq_ignore_ascii_case).
    pub ignore_case: bool,
    // -f N: skip the first N blank-delimited fields before comparing.
    pub skip_fields: usize,
    // -s N: then skip the first N bytes before comparing.
    pub skip_chars: usize,
    // -w N: compare at most N bytes of the key; None = unlimited.
    pub check_chars: Option<usize>,
    // -z: lines are NUL-terminated instead of newline-terminated.
    pub zero_terminated: bool,
}
80
impl Default for UniqConfig {
    /// Default configuration: plain adjacent-line dedup (`uniq` with no
    /// flags) — whole-line, case-sensitive comparison, newline-terminated.
    fn default() -> Self {
        Self {
            mode: OutputMode::Default,
            count: false,
            ignore_case: false,
            skip_fields: 0,
            skip_chars: 0,
            check_chars: None,
            zero_terminated: false,
        }
    }
}
94
95/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
96/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
97#[inline(always)]
98fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
99    let mut start = 0;
100    let len = line.len();
101
102    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
103    // Early-exit if already past end of line to avoid O(skip_fields) loop
104    let mut fields_remaining = config.skip_fields;
105    while fields_remaining > 0 && start < len {
106        // Skip blanks (space and tab)
107        while start < len && (line[start] == b' ' || line[start] == b'\t') {
108            start += 1;
109        }
110        // Skip non-blanks (field content)
111        while start < len && line[start] != b' ' && line[start] != b'\t' {
112            start += 1;
113        }
114        fields_remaining -= 1;
115    }
116
117    // Skip N characters
118    if config.skip_chars > 0 {
119        let remaining = len - start;
120        let skip = config.skip_chars.min(remaining);
121        start += skip;
122    }
123
124    let slice = &line[start..];
125
126    // Limit comparison to N characters
127    if let Some(w) = config.check_chars {
128        if w < slice.len() {
129            return &slice[..w];
130        }
131    }
132
133    slice
134}
135
136/// Compare two lines (without terminators) using the config's comparison rules.
137#[inline(always)]
138fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
139    let sa = get_compare_slice(a, config);
140    let sb = get_compare_slice(b, config);
141
142    if config.ignore_case {
143        sa.eq_ignore_ascii_case(sb)
144    } else {
145        sa == sb
146    }
147}
148
/// Fast case-insensitive comparison: no field/char extraction, just
/// ASCII case folding. A length mismatch rejects immediately; equal
/// empty slices match trivially.
#[inline(always)]
fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len() && (a.is_empty() || a.eq_ignore_ascii_case(b))
}
162
163/// Check if config requires field/char skipping or char limiting.
164#[inline(always)]
165fn needs_key_extraction(config: &UniqConfig) -> bool {
166    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
167}
168
/// Fast path comparison: no field/char extraction needed, no case folding.
/// Uses pointer+length equality shortcut and multi-word prefix rejection.
/// For short lines (<= 32 bytes, common in many-dups data), avoids the
/// full memcmp call overhead by doing direct word comparisons.
/// For medium lines (33-256 bytes), uses a tight u64 loop covering the
/// full line without falling through to memcmp.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    // Short-line fast path: compare via word loads to avoid memcmp call overhead
    if alen <= 8 {
        // For < 8 bytes: byte-by-byte via slice (compiler vectorizes this)
        return a == b;
    }
    // SAFETY: here alen == a.len() == b.len() and alen > 8, so reads at
    // offset 0 and at alen-8 are in bounds, and every other unaligned u64
    // read is guarded by the surrounding `alen <= N` branch or by the loop
    // condition keeping `off + 8 <= alen` (resp. `off + 32 <= alen`).
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // 8-byte prefix check: reject most non-equal lines without full memcmp
        let a8 = (ap as *const u64).read_unaligned();
        let b8 = (bp as *const u64).read_unaligned();
        if a8 != b8 {
            return false;
        }
        // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 17-32 bytes: check first 16 + last 16 (overlapping) to avoid memcmp
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 33-256 bytes: tight u64 loop covering the full line.
        // Compare 32 bytes per iteration (4 u64 loads), then handle tail.
        // This avoids the function call overhead of memcmp for medium lines.
        if alen <= 256 {
            let mut off = 8usize; // first 8 bytes already compared
            // Compare 32 bytes at a time
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                // XOR all pairs and OR together: zero if all equal.
                // (In Rust `|` binds tighter than `!=`, so this parses as
                // `(.. | .. | ..) != 0` — intended, unlike the C precedence.)
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            // Compare remaining 8 bytes at a time
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            // Compare tail (overlapping last 8 bytes)
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // Longer lines (>256): prefix passed, fall through to full memcmp
    a == b
}
257
/// Compare two equal-length lines starting from byte 8.
/// Caller has already checked: lengths are equal, both >= 9 bytes, first 8 bytes match.
/// This avoids redundant checks when the calling loop already did prefix rejection.
#[inline(always)]
fn lines_equal_after_prefix(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    debug_assert!(alen == b.len());
    debug_assert!(alen > 8);
    // SAFETY: per the caller contract (asserted above in debug builds),
    // alen == b.len() and alen > 8, so reads at alen-8 are in bounds and
    // the loop guards keep every unaligned u64 read within [0, alen).
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // Check last 8 bytes first (overlapping for 9-16 byte lines)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // 33-256 bytes: same 32-bytes-per-iteration u64 loop as
        // `lines_equal_fast`, starting after the already-verified prefix.
        if alen <= 256 {
            let mut off = 8usize;
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                // XOR pairs, OR together: zero iff all four words match.
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            // Overlapping last-8-bytes tail read covers any remainder.
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // >256 bytes: use memcmp via slice comparison (skipping the already-compared prefix)
    a[8..] == b[8..]
}
320
/// Write a count-prefixed line in GNU uniq format.
/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
/// Combines prefix + line + term into a single write whenever the whole
/// record (prefix + line + terminator) fits in a 256-byte stack buffer.
///
/// Optimized with lookup table for counts 1-9 (most common case in many-dups data)
/// and fast-path for counts < 10M (always fits in 7 chars, no copy_within needed).
#[inline(always)]
fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
    // Ultra-fast path for common small counts: pre-built prefix strings
    // Avoids all the itoa/copy_within overhead for the most common case.
    if count <= 9 {
        // "      N " where N is 1-9 (7 chars + space = 8 bytes prefix)
        let prefix: &[u8] = match count {
            1 => b"      1 ",
            2 => b"      2 ",
            3 => b"      3 ",
            4 => b"      4 ",
            5 => b"      5 ",
            6 => b"      6 ",
            7 => b"      7 ",
            8 => b"      8 ",
            9 => b"      9 ",
            // count == 0 cannot reach here: groups always have >= 1 line,
            // and the guard above covers 1..=9.
            _ => unreachable!(),
        };
        let total = 8 + line.len() + 1;
        if total <= 256 {
            let mut buf = [0u8; 256];
            // SAFETY: total = 8 + line.len() + 1 <= 256 was just checked, so
            // the 8-byte prefix, the line bytes, and the terminator occupy
            // disjoint in-bounds ranges of `buf`.
            unsafe {
                std::ptr::copy_nonoverlapping(prefix.as_ptr(), buf.as_mut_ptr(), 8);
                std::ptr::copy_nonoverlapping(line.as_ptr(), buf.as_mut_ptr().add(8), line.len());
                *buf.as_mut_ptr().add(8 + line.len()) = term;
            }
            return out.write_all(&buf[..total]);
        } else {
            // Line too long for the stack buffer: three separate writes.
            out.write_all(prefix)?;
            out.write_all(line)?;
            return out.write_all(&[term]);
        }
    }

    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
    let digits = itoa_right_aligned_into(&mut prefix, count);
    let width = digits.max(7); // minimum 7 chars
    let prefix_len = width + 1; // +1 for trailing space
    prefix[width] = b' ';

    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
    let total = prefix_len + line.len() + 1;
    if total <= 256 {
        let mut buf = [0u8; 256];
        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
        buf[prefix_len + line.len()] = term;
        out.write_all(&buf[..total])
    } else {
        out.write_all(&prefix[..prefix_len])?;
        out.write_all(line)?;
        out.write_all(&[term])
    }
}
382
/// Write u64 decimal right-aligned into the prefix buffer.
/// The buffer must be pre-filled with spaces by the caller. Returns the
/// width of the formatted field (number of digits, minimum 7).
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    if val == 0 {
        // "      0": the six leading spaces come from the pre-filled buffer.
        buf[6] = b'0';
        return 7;
    }
    // Emit digits right-to-left into the tail of the buffer; slot 27 is
    // left free for the caller's trailing space.
    let mut cursor = 27usize;
    while val != 0 {
        cursor -= 1;
        buf[cursor] = b'0' + (val % 10) as u8;
        val /= 10;
    }
    let ndigits = 27 - cursor;
    if ndigits >= 7 {
        // Wider than the 7-char field: move the digits flush left.
        buf.copy_within(cursor..27, 0);
        ndigits
    } else {
        // Right-align inside the 7-char field; the bytes before the digits
        // are still spaces from initialization.
        buf.copy_within(cursor..27, 7 - ndigits);
        7
    }
}
411
412// ============================================================================
413// High-performance mmap-based processing (for byte slices, zero-copy)
414// ============================================================================
415
416/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
417pub fn process_uniq_bytes(
418    data: &[u8],
419    mut output: impl Write,
420    config: &UniqConfig,
421) -> io::Result<()> {
422    let term = if config.zero_terminated { b'\0' } else { b'\n' };
423
424    // Zero-copy fast path: bypass BufWriter for standard modes with IoSlice output.
425    // Default mode: writes contiguous runs directly from mmap data via writev.
426    // Filter modes (-d/-u): IoSlice batching (512 lines per writev).
427    // Count mode (-c): IoSlice batching (340 groups per writev, prefix arena + mmap data).
428    // Without BufWriter, writes go directly via writev/vmsplice (zero-copy for data slices).
429    let fast = !needs_key_extraction(config) && !config.ignore_case;
430    if fast
431        && matches!(
432            config.mode,
433            OutputMode::Default | OutputMode::RepeatedOnly | OutputMode::UniqueOnly
434        )
435    {
436        return process_standard_bytes(data, &mut output, config, term);
437    }
438
439    // General path with BufWriter for modes that need formatting/buffering.
440    // 16MB buffer — optimal for L3 cache utilization on modern CPUs.
441    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
442
443    match config.mode {
444        OutputMode::Group(method) => {
445            process_group_bytes(data, &mut writer, config, method, term)?;
446        }
447        OutputMode::AllRepeated(method) => {
448            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
449        }
450        _ => {
451            process_standard_bytes(data, &mut writer, config, term)?;
452        }
453    }
454
455    writer.flush()?;
456    Ok(())
457}
458
459/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
460/// Uses memchr for SIMD-accelerated line boundary detection.
461struct LineIter<'a> {
462    data: &'a [u8],
463    pos: usize,
464    term: u8,
465}
466
467impl<'a> LineIter<'a> {
468    #[inline(always)]
469    fn new(data: &'a [u8], term: u8) -> Self {
470        Self { data, pos: 0, term }
471    }
472}
473
474impl<'a> Iterator for LineIter<'a> {
475    /// (line content without terminator, full line including terminator for output)
476    type Item = (&'a [u8], &'a [u8]);
477
478    #[inline(always)]
479    fn next(&mut self) -> Option<Self::Item> {
480        if self.pos >= self.data.len() {
481            return None;
482        }
483
484        let remaining = &self.data[self.pos..];
485        match memchr::memchr(self.term, remaining) {
486            Some(idx) => {
487                let line_start = self.pos;
488                let line_end = self.pos + idx; // without terminator
489                let full_end = self.pos + idx + 1; // with terminator
490                self.pos = full_end;
491                Some((
492                    &self.data[line_start..line_end],
493                    &self.data[line_start..full_end],
494                ))
495            }
496            None => {
497                // Last line without terminator
498                let line_start = self.pos;
499                self.pos = self.data.len();
500                let line = &self.data[line_start..];
501                Some((line, line))
502            }
503        }
504    }
505}
506
/// Get line content (without terminator) from pre-computed positions.
/// `content_end` is the end of actual content (excludes the trailing
/// terminator of the final line if one is present).
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let end = match line_starts.get(idx + 1) {
        // Interior line: next line's start minus its terminator byte.
        Some(&next_start) => next_start - 1,
        // Last line: caller pre-computed the terminator-free end.
        None => content_end,
    };
    &data[line_starts[idx]..end]
}
524
/// Get full line (with terminator, when one exists) from pre-computed positions.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    // The full line runs to the next line's start (terminator included);
    // the last line runs to the end of the data.
    let end = line_starts.get(idx + 1).copied().unwrap_or(data.len());
    &data[line_starts[idx]..end]
}
536
/// Skip a run of identical lines using doubling memcmp.
/// When a duplicate is found at `dup_start` (the caller has already verified
/// that one `pattern_len`-byte copy starts there), this verifies progressively
/// larger blocks of identical copies with memcmp (SIMD-accelerated) and
/// returns the byte offset just past the last verified duplicate copy.
///
/// For 50K identical 6-byte lines: ~16 memcmp calls (~600KB total) vs 50K
/// per-line comparisons — a large win at memcmp's SIMD throughput.
///
/// Correctness: the doubling trick verifies every byte by induction —
/// once Block[0..N] is verified, Block[N..2N] == Block[0..N] extends the
/// verified region to Block[0..2N].
#[inline]
fn skip_dup_run(data: &[u8], dup_start: usize, pattern_start: usize, pattern_len: usize) -> usize {
    let total = data.len();
    // Doubling only pays off with at least two more full copies available.
    if pattern_len == 0 || dup_start + 2 * pattern_len > total {
        return dup_start + pattern_len.min(total - dup_start);
    }

    // One copy (the one at dup_start) is verified by the caller.
    let mut verified = dup_start + pattern_len;
    let mut copies = 1usize;

    // Phase 1: geometric doubling — compare the verified block against the
    // next block of the same size; each success doubles the verified region.
    loop {
        let chunk = copies * pattern_len;
        let next = verified + chunk;
        if next > total {
            // Tail too short for a full double: test all remaining whole
            // copies in a single comparison.
            let whole = ((total - verified) / pattern_len) * pattern_len;
            if whole > 0 && data[dup_start..dup_start + whole] == data[verified..verified + whole] {
                verified += whole;
            }
            break;
        }
        if data[dup_start..dup_start + chunk] == data[verified..next] {
            verified = next;
            copies *= 2;
        } else {
            break;
        }
    }

    // Phase 2: linear scan for the boundary — at most `copies` iterations
    // (the size of the last failed block).
    let pattern = &data[pattern_start..pattern_start + pattern_len];
    while verified + pattern_len <= total && &data[verified..verified + pattern_len] == pattern {
        verified += pattern_len;
    }

    verified
}
598
599/// Linear scan for the end of a duplicate group.
600/// Returns the index of the first line that differs from line_starts[group_start].
601/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
602/// equal lines can appear in non-adjacent groups separated by different lines.
603/// Caches key length for fast length-mismatch rejection.
604#[inline]
605fn linear_scan_group_end(
606    data: &[u8],
607    line_starts: &[usize],
608    group_start: usize,
609    num_lines: usize,
610    content_end: usize,
611) -> usize {
612    let key = line_content_at(data, line_starts, group_start, content_end);
613    let key_len = key.len();
614    let mut i = group_start + 1;
615    while i < num_lines {
616        let candidate = line_content_at(data, line_starts, i, content_end);
617        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
618            return i;
619        }
620        i += 1;
621    }
622    i
623}
624
/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
/// General path: pre-computed line positions with linear scan for groups.
fn process_standard_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    // `fast`: whole-line byte comparison suffices (no -f/-s/-w, no -i).
    // `fast_ci`: whole-line comparison, but ASCII case-insensitive (-i only).
    let fast = !needs_key_extraction(config) && !config.ignore_case;
    let fast_ci = !needs_key_extraction(config) && config.ignore_case;

    // Ultra-fast path: default mode, no count, no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines inline.
    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_fast_singlepass(data, writer, term);
    }

    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
    if fast
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_fast_singlepass(data, writer, config, term);
    }

    // Ultra-fast path: count mode with no key extraction.
    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
    // Avoids the line_starts Vec allocation (20MB+ for large files).
    if fast && config.count {
        return process_count_fast_singlepass(data, writer, config, term);
    }

    // Fast path for case-insensitive (-i) mode with no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
    // Avoids the general path's line_starts Vec allocation.
    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_ci_singlepass(data, writer, term);
    }

    if fast_ci
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_ci_singlepass(data, writer, config, term);
    }

    if fast_ci && config.count {
        return process_count_ci_singlepass(data, writer, config, term);
    }

    // General path: pre-computed line positions (~40 bytes/line capacity
    // heuristic); a terminator at the very end starts no new line.
    let estimated_lines = (data.len() / 40).max(64);
    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
    line_starts.push(0);
    for pos in memchr::memchr_iter(term, data) {
        if pos + 1 < data.len() {
            line_starts.push(pos + 1);
        }
    }
    let num_lines = line_starts.len();
    // NOTE(review): this guard is unreachable — line_starts always holds at
    // least the 0 pushed above; kept as a defensive check.
    if num_lines == 0 {
        return Ok(());
    }

    // Pre-compute content end: if data ends with terminator, exclude it for last line
    let content_end = if data.last() == Some(&term) {
        data.len() - 1
    } else {
        data.len()
    };

    // Ultra-fast path: default mode, no count, no key extraction
    // NOTE(review): unreachable — the identical condition already returned
    // via process_default_fast_singlepass near the top of this function.
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        // Write first line
        let first_full = line_full_at(data, &line_starts, 0);
        let first_content = line_content_at(data, &line_starts, 0, content_end);
        write_all_raw(writer, first_full)?;
        // Equal lengths mean the line had no terminator; emit one ourselves.
        if first_full.len() == first_content.len() {
            writer.write_all(&[term])?;
        }

        let mut i = 1;
        while i < num_lines {
            let prev = line_content_at(data, &line_starts, i - 1, content_end);
            let cur = line_content_at(data, &line_starts, i, content_end);

            if lines_equal_fast(prev, cur) {
                // Duplicate detected — linear scan for end of group
                let group_end =
                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
                i = group_end;
                continue;
            }

            // Unique line — write it
            let cur_full = line_full_at(data, &line_starts, i);
            write_all_raw(writer, cur_full)?;
            if cur_full.len() == cur.len() {
                writer.write_all(&[term])?;
            }
            i += 1;
        }
        return Ok(());
    }

    // General path with count tracking: find each group's extent, then let
    // output_group_bytes decide what to emit for the active mode/-c flag.
    let mut i = 0;
    while i < num_lines {
        let content = line_content_at(data, &line_starts, i, content_end);
        let full = line_full_at(data, &line_starts, i);

        let group_end = if fast
            && i + 1 < num_lines
            && lines_equal_fast(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
            ) {
            // Duplicate detected — linear scan for end
            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
        } else if !fast
            && i + 1 < num_lines
            && lines_equal(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
                config,
            )
        {
            // Slow path linear scan with key extraction
            let mut j = i + 2;
            while j < num_lines {
                if !lines_equal(
                    content,
                    line_content_at(data, &line_starts, j, content_end),
                    config,
                ) {
                    break;
                }
                j += 1;
            }
            j
        } else {
            // Next line differs (or this is the last line): group of one.
            i + 1
        };

        let count = (group_end - i) as u64;
        output_group_bytes(writer, content, full, count, config, term)?;
        i = group_end;
    }

    Ok(())
}
788
789/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
790/// No pre-computed positions, no binary search, no Vec allocation.
791/// Outputs each line that differs from the previous.
792///
793/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
794/// independently, then cross-chunk boundaries are resolved.
795fn process_default_fast_singlepass(
796    data: &[u8],
797    writer: &mut impl Write,
798    term: u8,
799) -> io::Result<()> {
800    // Parallel path for large files — kick in at 4MB.
801    // Lower thresholds (e.g. 2MB) hurt performance on 10MB files because
802    // the parallel overhead dominates for smaller chunks.
803    if data.len() >= 4 * 1024 * 1024 {
804        return process_default_parallel(data, writer, term);
805    }
806
807    process_default_sequential(data, writer, term)
808}
809
810/// Sequential single-pass dedup with zero-copy output.
811/// Instead of copying data to a buffer, tracks contiguous output runs and writes
812/// directly from the original data. For all-unique data, this is a single write_all.
813///
814/// Optimized for the "many duplicates" case: caches the previous line's length
815/// and first-8-byte prefix for fast rejection of non-duplicates without
816/// calling the full comparison function.
817///
818/// Uses raw pointer arithmetic throughout to avoid bounds checking in the hot loop.
819fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
820    let data_len = data.len();
821    let base = data.as_ptr();
822    let mut prev_start: usize = 0;
823
824    // Find end of first line
825    let first_end: usize = match memchr::memchr(term, data) {
826        Some(pos) => pos,
827        None => {
828            // Single line, no terminator
829            writer.write_all(data)?;
830            return writer.write_all(&[term]);
831        }
832    };
833
834    // Cache previous line metadata for fast comparison
835    let mut prev_len = first_end - prev_start;
836    let mut prev_prefix: u64 = if prev_len >= 8 {
837        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
838    } else {
839        0
840    };
841
842    // run_start tracks the beginning of the current contiguous output region.
843    // When a duplicate is found, we save the run as an IoSlice and skip the dup.
844    // Runs are batched and written with writev to reduce syscall overhead.
845    const BATCH: usize = 256;
846    let term_byte: [u8; 1] = [term];
847    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
848    let mut run_start: usize = 0;
849    let mut cur_start = first_end + 1;
850    let mut last_output_end = first_end + 1; // exclusive end including terminator
851
852    while cur_start < data_len {
853        // Speculative line-end detection: if the previous line had length L,
854        // check if data[cur_start + L] is the terminator. This avoids the
855        // memchr SIMD call for repetitive data where all lines have the same length.
856        // Falls back to memchr if the speculation is wrong.
857        let cur_end = {
858            let speculative = cur_start + prev_len;
859            if speculative < data_len && unsafe { *base.add(speculative) } == term {
860                speculative
861            } else {
862                match memchr::memchr(term, unsafe {
863                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
864                }) {
865                    Some(offset) => cur_start + offset,
866                    None => data_len,
867                }
868            }
869        };
870
871        let cur_len = cur_end - cur_start;
872
873        // Fast reject: if lengths differ, lines are definitely not equal.
874        // This branch structure is ordered by frequency: length mismatch is
875        // most common for unique data, prefix mismatch next, full compare last.
876        let is_dup = if cur_len != prev_len {
877            false
878        } else if cur_len == 0 {
879            true
880        } else if cur_len >= 8 {
881            // Compare cached 8-byte prefix first
882            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
883            if cur_prefix != prev_prefix {
884                false
885            } else if cur_len <= 8 {
886                true // prefix covers entire line
887            } else if cur_len <= 16 {
888                // Check last 8 bytes (overlapping)
889                unsafe {
890                    let a_tail =
891                        (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
892                    let b_tail = (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
893                    a_tail == b_tail
894                }
895            } else if cur_len <= 32 {
896                // Check bytes 8-16 and last 8 bytes
897                unsafe {
898                    let a16 = (base.add(prev_start + 8) as *const u64).read_unaligned();
899                    let b16 = (base.add(cur_start + 8) as *const u64).read_unaligned();
900                    if a16 != b16 {
901                        false
902                    } else {
903                        let a_tail =
904                            (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
905                        let b_tail =
906                            (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
907                        a_tail == b_tail
908                    }
909                }
910            } else if cur_len <= 256 {
911                // 33-256 bytes: tight u64 loop with XOR-OR batching.
912                // Compares 32 bytes per iteration (4 u64 loads), reducing
913                // branch mispredictions vs individual comparisons.
914                unsafe {
915                    let ap = base.add(prev_start);
916                    let bp = base.add(cur_start);
917                    let mut off = 8usize; // first 8 bytes already compared via prefix
918                    let mut eq = true;
919                    while off + 32 <= cur_len {
920                        let a0 = (ap.add(off) as *const u64).read_unaligned();
921                        let b0 = (bp.add(off) as *const u64).read_unaligned();
922                        let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
923                        let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
924                        let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
925                        let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
926                        let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
927                        let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
928                        if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
929                            eq = false;
930                            break;
931                        }
932                        off += 32;
933                    }
934                    if eq {
935                        while off + 8 <= cur_len {
936                            let aw = (ap.add(off) as *const u64).read_unaligned();
937                            let bw = (bp.add(off) as *const u64).read_unaligned();
938                            if aw != bw {
939                                eq = false;
940                                break;
941                            }
942                            off += 8;
943                        }
944                    }
945                    if eq && off < cur_len {
946                        let a_tail = (ap.add(cur_len - 8) as *const u64).read_unaligned();
947                        let b_tail = (bp.add(cur_len - 8) as *const u64).read_unaligned();
948                        eq = a_tail == b_tail;
949                    }
950                    eq
951                }
952            } else {
953                // For longer lines (>256), use unsafe slice comparison
954                unsafe {
955                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
956                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
957                    a == b
958                }
959            }
960        } else {
961            // Short line < 8 bytes — direct byte comparison
962            unsafe {
963                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
964                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
965                a == b
966            }
967        };
968
969        if is_dup {
970            // Duplicate found — use doubling memcmp to skip entire run of identical lines.
971            // For 50K identical lines, this takes ~12µs vs ~250µs per-line comparison.
972            let pattern_len = prev_len + 1; // line content + terminator
973            if run_start < cur_start {
974                slices.push(io::IoSlice::new(&data[run_start..cur_start]));
975                if slices.len() >= BATCH {
976                    write_all_vectored(writer, &slices)?;
977                    slices.clear();
978                }
979            }
980            // Skip all identical copies using doubling memcmp
981            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
982            run_start = skip_end;
983            cur_start = skip_end;
984            // prev_start/prev_len/prev_prefix unchanged (still the group representative)
985            continue;
986        } else {
987            // Different line — update cached comparison state
988            prev_start = cur_start;
989            prev_len = cur_len;
990            prev_prefix = if cur_len >= 8 {
991                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
992            } else {
993                0
994            };
995            last_output_end = if cur_end < data_len {
996                cur_end + 1
997            } else {
998                cur_end
999            };
1000        }
1001
1002        if cur_end < data_len {
1003            cur_start = cur_end + 1;
1004        } else {
1005            break;
1006        }
1007    }
1008
1009    // Flush remaining run
1010    if run_start < data_len {
1011        slices.push(io::IoSlice::new(
1012            &data[run_start..last_output_end.max(run_start)],
1013        ));
1014    }
1015
1016    // Ensure trailing terminator
1017    if data_len > 0 && unsafe { *base.add(data_len - 1) } != term {
1018        slices.push(io::IoSlice::new(&term_byte));
1019    }
1020
1021    if !slices.is_empty() {
1022        write_all_vectored(writer, &slices)?;
1023    }
1024
1025    Ok(())
1026}
1027
/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
/// positions in each chunk in parallel, then write output runs directly from
/// the original data. No per-chunk buffer allocation needed.
///
/// Each chunk is deduplicated independently (adjacent-line semantics hold
/// inside a chunk because chunk boundaries land just past a terminator, so no
/// line straddles two chunks). The only cross-chunk case — the first line of a
/// chunk duplicating the last *output* line of the previous chunk — is
/// resolved sequentially during the write phase.
fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    use rayon::prelude::*;

    let num_threads = rayon::current_num_threads().max(1);
    let chunk_target = data.len() / num_threads;

    // Find chunk boundaries aligned to line terminators: each boundary is the
    // byte just after a terminator, so every chunk starts at a line start.
    let mut boundaries = Vec::with_capacity(num_threads + 1);
    boundaries.push(0usize);
    for i in 1..num_threads {
        let target = i * chunk_target;
        if target >= data.len() {
            break;
        }
        if let Some(p) = memchr::memchr(term, &data[target..]) {
            let b = target + p + 1;
            // Keep boundaries strictly increasing (very long lines can make
            // several targets resolve to the same terminator).
            if b > *boundaries.last().unwrap() && b <= data.len() {
                boundaries.push(b);
            }
        }
    }
    boundaries.push(data.len());

    let n_chunks = boundaries.len() - 1;
    if n_chunks <= 1 {
        // Degenerate split (e.g. one huge line): no parallelism to exploit.
        return process_default_sequential(data, writer, term);
    }

    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
    struct ChunkResult {
        /// Byte ranges in the original data to output (contiguous runs)
        runs: Vec<(usize, usize)>,
        /// First line in chunk (absolute offsets into data, content without term)
        first_line_start: usize,
        first_line_end: usize,
        /// Last *output* line in chunk (content without term)
        last_line_start: usize,
        last_line_end: usize,
    }

    let results: Vec<ChunkResult> = boundaries
        .windows(2)
        .collect::<Vec<_>>()
        .par_iter()
        .map(|w| {
            let chunk_start = w[0];
            let chunk_end = w[1];
            let chunk = &data[chunk_start..chunk_end];

            let first_term = match memchr::memchr(term, chunk) {
                Some(pos) => pos,
                None => {
                    // Chunk is a single unterminated line (only possible for
                    // the last chunk): pass it through as one run.
                    return ChunkResult {
                        runs: vec![(chunk_start, chunk_end)],
                        first_line_start: chunk_start,
                        first_line_end: chunk_end,
                        last_line_start: chunk_start,
                        last_line_end: chunk_end,
                    };
                }
            };

            let first_line_start = chunk_start;
            let first_line_end = chunk_start + first_term;

            let mut runs: Vec<(usize, usize)> = Vec::new();
            let mut run_start = chunk_start;
            let mut prev_start = 0usize;
            let mut _prev_end = first_term;
            let mut last_out_start = chunk_start;
            let mut last_out_end = first_line_end;

            let mut prev_len = first_term;
            let chunk_base = chunk.as_ptr();
            let chunk_len = chunk.len();
            // Cache previous line's prefix for fast rejection
            let mut prev_prefix: u64 = if prev_len >= 8 {
                unsafe { (chunk_base as *const u64).read_unaligned() }
            } else {
                0
            };
            let mut cur_start = first_term + 1;
            while cur_start < chunk_len {
                // Speculative line-end: check if next line has same length
                // before falling back to a memchr scan (same trick as the
                // sequential path).
                let cur_end = {
                    let spec = cur_start + prev_len;
                    if spec < chunk_len && unsafe { *chunk_base.add(spec) } == term {
                        spec
                    } else {
                        match memchr::memchr(term, unsafe {
                            std::slice::from_raw_parts(
                                chunk_base.add(cur_start),
                                chunk_len - cur_start,
                            )
                        }) {
                            Some(offset) => cur_start + offset,
                            None => chunk_len,
                        }
                    }
                };

                let cur_len = cur_end - cur_start;
                // Fast reject: length + prefix + full comparison
                let is_dup = if cur_len != prev_len {
                    false
                } else if cur_len == 0 {
                    true
                } else if cur_len >= 8 {
                    let cur_prefix =
                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() };
                    if cur_prefix != prev_prefix {
                        false
                    } else if cur_len <= 8 {
                        true
                    } else {
                        unsafe {
                            let a =
                                std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                            let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
                            lines_equal_after_prefix(a, b)
                        }
                    }
                } else {
                    unsafe {
                        let a = std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                        let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
                        a == b
                    }
                };

                if is_dup {
                    // Duplicate — use doubling memcmp to skip entire run.
                    // NOTE(review): assumes skip_dup_run makes forward progress
                    // even when the chunk ends in a duplicate line without its
                    // terminator — confirm against skip_dup_run.
                    let pattern_len = prev_len + 1;
                    let abs_cur = chunk_start + cur_start;
                    if run_start < abs_cur {
                        runs.push((run_start, abs_cur));
                    }
                    let skip_end = skip_dup_run(chunk, cur_start, prev_start, pattern_len);
                    run_start = chunk_start + skip_end;
                    cur_start = skip_end;
                    // prev_start/prev_len/prev_prefix unchanged
                    continue;
                } else {
                    // New distinct line: it becomes the chunk's provisional
                    // "last output line" used for cross-chunk comparison.
                    last_out_start = chunk_start + cur_start;
                    last_out_end = chunk_start + cur_end;
                    prev_len = cur_len;
                    prev_prefix = if cur_len >= 8 {
                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() }
                    } else {
                        0
                    };
                }
                prev_start = cur_start;
                _prev_end = cur_end;

                if cur_end < chunk_len {
                    cur_start = cur_end + 1;
                } else {
                    break;
                }
            }

            // Close final run
            if run_start < chunk_end {
                runs.push((run_start, chunk_end));
            }

            ChunkResult {
                runs,
                first_line_start,
                first_line_end,
                last_line_start: last_out_start,
                last_line_end: last_out_end,
            }
        })
        .collect();

    // Write results, adjusting cross-chunk boundaries.
    // Batch output runs via write_vectored to reduce syscall count.
    const BATCH: usize = 256;
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
    for (i, result) in results.iter().enumerate() {
        // A chunk's first line is suppressed when it equals the previous
        // chunk's last output line (adjacent-duplicate across the boundary).
        // This is correct even when the previous chunk's last *physical* line
        // was itself a skipped duplicate: its content equals the retained
        // representative, so the comparison is unchanged.
        let skip_first = if i > 0 {
            let prev = &results[i - 1];
            let prev_last = &data[prev.last_line_start..prev.last_line_end];
            let cur_first = &data[result.first_line_start..result.first_line_end];
            lines_equal_fast(prev_last, cur_first)
        } else {
            false
        };

        let skip_end = if skip_first {
            // Skip bytes up to and including the first line's terminator
            result.first_line_end + 1
        } else {
            0
        };

        for &(rs, re) in &result.runs {
            let actual_start = rs.max(skip_end);
            if actual_start < re {
                slices.push(io::IoSlice::new(&data[actual_start..re]));
                if slices.len() >= BATCH {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
        }
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    // Ensure trailing terminator.
    // NOTE(review): like the sequential path, this appends unconditionally
    // when the input lacks a final terminator; if the last chunk ended in a
    // skipped duplicate run, the retained line was already terminated and
    // this could add a blank line — verify against skip_dup_run's handling
    // of an unterminated trailing duplicate.
    if !data.is_empty() && *data.last().unwrap() != term {
        writer.write_all(&[term])?;
    }

    Ok(())
}
1251
/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
/// Zero-copy: writes directly from mmap data through BufWriter.
/// Uses speculative line-end detection and 8-byte prefix caching for fast
/// duplicate detection without full memcmp.
///
/// Group semantics: `count` accumulates the size of the current group of
/// adjacent equal lines; when a different line closes the group, the group's
/// first line is emitted if it qualifies (count > 1 for -d, count == 1 for -u).
fn process_filter_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    // Only the output mode is consulted here; -d prints duplicate-group
    // leaders, anything else on this path prints unique lines (-u).
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
    let data_len = data.len();
    let base = data.as_ptr();

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line: unique (count=1) — printed only for -u,
            // with a terminator appended.
            if !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    // State for the previous (group-representative) line: offsets, cached
    // length, and cached first-8-byte prefix for cheap rejection.
    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Batch output using IoSlice write_vectored to reduce syscall overhead.
    // Each output line needs 2 slices: content + terminator.
    const BATCH: usize = 512;
    let term_slice: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);

    while cur_start < data_len {
        // Speculative line-end detection: guess the current line has the same
        // length as the previous one; fall back to memchr when wrong.
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Fast reject using length + 8-byte prefix.
        // After prefix match, use lines_equal_after_prefix which skips
        // the already-checked length/prefix/empty checks.
        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true
            } else {
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    lines_equal_after_prefix(a, b)
                }
            }
        } else {
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            // Use doubling memcmp to skip entire duplicate run.
            // NOTE(review): the floor division assumes skip_dup_run advances
            // in whole (line + terminator) steps; if it can consume a trailing
            // duplicate that lacks its terminator, that copy would be dropped
            // from the count — confirm against skip_dup_run.
            let pattern_len = prev_len + 1;
            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
            let skipped = (skip_end - cur_start) / pattern_len;
            count += skipped as u64;
            cur_start = skip_end;
            continue;
        } else {
            // Group closed by a different line: emit its representative if
            // the mode's count predicate holds.
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
                slices.push(io::IoSlice::new(&term_slice));
                if slices.len() >= BATCH * 2 {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group (always re-terminated, even if the input's final
    // line had no terminator).
    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
        slices.push(io::IoSlice::new(&term_slice));
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}
1389
/// Fast single-pass for count mode (-c) with all standard output modes.
/// Zero line_starts allocation: scans with memchr, counts groups inline,
/// and writes count-prefixed lines directly.
/// Uses cached length comparison for fast duplicate rejection.
/// Uses raw pointer arithmetic to avoid bounds checking.
///
/// Zero-copy output: uses writev (IoSlice) to write count prefixes from a
/// small arena + line content directly from mmap'd data + terminator bytes.
/// This avoids copying line content into an intermediate buffer entirely.
///
/// Optimizations:
/// - Speculative line-end detection: if all lines have the same length (common
///   in repetitive data), we can skip the memchr SIMD scan entirely by checking
///   if data[cur_start + prev_len] is the terminator.
/// - Cached 8-byte prefix rejection: avoids full comparison for most non-equal lines.
/// - IoSlice writev batching: eliminates memcpy of line content.
fn process_count_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let data_len = data.len();
    let base = data.as_ptr();
    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line: count=1, so only -d (RepeatedOnly) suppresses it.
            let should_print = match config.mode {
                OutputMode::Default => true,
                OutputMode::RepeatedOnly => false,
                OutputMode::UniqueOnly => true,
                _ => true,
            };
            if should_print {
                write_count_line(writer, 1, data, term)?;
            }
            return Ok(());
        }
    };

    // State for the current group's representative line: offsets, cached
    // length, and cached first-8-byte prefix for cheap rejection.
    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Zero-copy writev batching: accumulate groups as (prefix_offset, prefix_len,
    // line_start, line_end) tuples, with prefixes stored in a flat byte buffer.
    // Build IoSlice arrays at flush time to avoid borrow conflicts.
    // Line content points directly into mmap'd data — zero copy.
    // PREFIX_SLOT must stay in sync with the identically-named const inside
    // flush_count_groups; format_count_prefix_into relies on each slot being
    // pre-filled with spaces.
    const BATCH: usize = 340;
    const PREFIX_SLOT: usize = 28; // max prefix size per group
    let term_slice: [u8; 1] = [term];
    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
    // Each group: (prefix_len, line_start_in_data, line_end_in_data)
    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);

    while cur_start < data_len {
        // Speculative line-end detection; falls back to memchr when the
        // current line's length differs from the previous one.
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Fast reject: length, then cached 8-byte prefix, then full compare.
        let is_dup = if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true
            } else {
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    lines_equal_after_prefix(a, b)
                }
            }
        } else {
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            // Use doubling memcmp to skip entire duplicate run.
            // NOTE(review): the floor division assumes skip_dup_run advances
            // in whole (line + terminator) steps; a trailing duplicate without
            // its terminator would be dropped from the count — confirm against
            // skip_dup_run.
            let pattern_len = prev_len + 1;
            let skip_end = skip_dup_run(data, cur_start, prev_start, pattern_len);
            let skipped = (skip_end - cur_start) / pattern_len;
            count += skipped as u64;
            cur_start = skip_end;
            continue;
        } else {
            // Group closed: queue "<count> <line>" if the mode's predicate holds.
            let should_print = match config.mode {
                OutputMode::RepeatedOnly => count > 1,
                OutputMode::UniqueOnly => count == 1,
                _ => true,
            };
            if should_print {
                let idx = groups.len();
                let prefix_off = idx * PREFIX_SLOT;
                let prefix_len = format_count_prefix_into(
                    count,
                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
                );
                groups.push((prefix_len, prev_start, prev_end));

                if groups.len() >= BATCH {
                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
                    groups.clear();
                    // Re-fill prefix_buf with spaces for next batch
                    prefix_buf.fill(b' ');
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group (its slot in prefix_buf is still space-filled because
    // slots are consumed sequentially and the buffer is refilled after each flush).
    let should_print = match config.mode {
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };
    if should_print {
        let idx = groups.len();
        let prefix_off = idx * PREFIX_SLOT;
        let prefix_len =
            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
        groups.push((prefix_len, prev_start, prev_end));
    }
    if !groups.is_empty() {
        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
    }

    Ok(())
}
1562
1563/// Flush batched count groups using write_vectored (writev).
1564/// Builds IoSlice arrays from the prefix buffer and mmap'd data.
1565#[inline]
1566fn flush_count_groups(
1567    writer: &mut impl Write,
1568    prefix_buf: &[u8],
1569    groups: &[(usize, usize, usize)],
1570    term_slice: &[u8; 1],
1571    data: &[u8],
1572) -> io::Result<()> {
1573    const PREFIX_SLOT: usize = 28;
1574    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(groups.len() * 3);
1575    for (i, &(prefix_len, line_start, line_end)) in groups.iter().enumerate() {
1576        let prefix_off = i * PREFIX_SLOT;
1577        slices.push(io::IoSlice::new(
1578            &prefix_buf[prefix_off..prefix_off + prefix_len],
1579        ));
1580        slices.push(io::IoSlice::new(&data[line_start..line_end]));
1581        slices.push(io::IoSlice::new(term_slice));
1582    }
1583    write_all_vectored(writer, &slices)
1584}
1585
/// Format a count prefix into a buffer slot, returning the prefix length.
/// GNU format: "%7lu " — right-aligned count in 7-char field, followed by space.
/// Buffer must be pre-filled with spaces and at least 28 bytes.
#[inline(always)]
fn format_count_prefix_into(count: u64, buf: &mut [u8]) -> usize {
    // Fast path for single-digit counts: the slot is already space-filled by
    // the caller, so only the digit (column 6 of the 7-char field) and the
    // trailing separator space need to be written.
    if count <= 9 {
        buf[6] = b'0' + count as u8;
        buf[7] = b' ';
        return 8;
    }
    // Use itoa on a temp array, then copy
    let mut tmp = [b' '; 28];
    // NOTE(review): assumes itoa_right_aligned_into right-aligns the digits
    // within the leading 7-byte field (mirroring the fast path above) and
    // returns the digit count — the helper is defined elsewhere; confirm.
    let digits = itoa_right_aligned_into(&mut tmp, count);
    // Counts wider than 7 digits widen the field rather than truncating,
    // matching printf("%7lu") semantics.
    let width = digits.max(7);
    tmp[width] = b' ';
    let len = width + 1;
    buf[..len].copy_from_slice(&tmp[..len]);
    len
}
1605
1606/// Fast single-pass for case-insensitive (-i) default mode.
1607/// Uses run-tracking zero-copy output and write_vectored batching.
1608/// Includes speculative line-end detection and length-based early rejection.
1609fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
1610    let data_len = data.len();
1611    let base = data.as_ptr();
1612
1613    let first_end = match memchr::memchr(term, data) {
1614        Some(pos) => pos,
1615        None => {
1616            writer.write_all(data)?;
1617            return writer.write_all(&[term]);
1618        }
1619    };
1620
1621    let mut prev_start: usize = 0;
1622    let mut prev_len = first_end;
1623
1624    // Run-tracking: flush contiguous regions from the original data.
1625    let mut run_start: usize = 0;
1626    let mut cur_start = first_end + 1;
1627    let mut _last_output_end = first_end + 1;
1628
1629    while cur_start < data_len {
1630        // Speculative line-end detection
1631        let cur_end = {
1632            let speculative = cur_start + prev_len;
1633            if speculative < data_len && unsafe { *base.add(speculative) } == term {
1634                speculative
1635            } else {
1636                match memchr::memchr(term, unsafe {
1637                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
1638                }) {
1639                    Some(offset) => cur_start + offset,
1640                    None => data_len,
1641                }
1642            }
1643        };
1644
1645        let cur_len = cur_end - cur_start;
1646
1647        // Length-based early rejection before expensive case-insensitive compare
1648        let is_dup = cur_len == prev_len
1649            && unsafe {
1650                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
1651                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
1652                a.eq_ignore_ascii_case(b)
1653            };
1654
1655        if is_dup {
1656            // Duplicate — flush current run up to this line, skip it
1657            if run_start < cur_start {
1658                writer.write_all(&data[run_start..cur_start])?;
1659            }
1660            run_start = if cur_end < data_len {
1661                cur_end + 1
1662            } else {
1663                cur_end
1664            };
1665        } else {
1666            prev_start = cur_start;
1667            prev_len = cur_len;
1668            _last_output_end = if cur_end < data_len {
1669                cur_end + 1
1670            } else {
1671                cur_end
1672            };
1673        }
1674
1675        if cur_end < data_len {
1676            cur_start = cur_end + 1;
1677        } else {
1678            break;
1679        }
1680    }
1681
1682    // Flush remaining run
1683    if run_start < data_len {
1684        writer.write_all(&data[run_start..data_len])?;
1685    }
1686    // Ensure trailing terminator
1687    if !data.is_empty() && data[data_len - 1] != term {
1688        writer.write_all(&[term])?;
1689    }
1690
1691    Ok(())
1692}
1693
1694/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
1695/// Zero-copy: writes directly from mmap data through BufWriter.
1696/// Uses speculative line-end detection and length-based early rejection.
1697fn process_filter_ci_singlepass(
1698    data: &[u8],
1699    writer: &mut impl Write,
1700    config: &UniqConfig,
1701    term: u8,
1702) -> io::Result<()> {
1703    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
1704    let data_len = data.len();
1705    let base = data.as_ptr();
1706
1707    let first_term = match memchr::memchr(term, data) {
1708        Some(pos) => pos,
1709        None => {
1710            if !repeated {
1711                writer.write_all(data)?;
1712                writer.write_all(&[term])?;
1713            }
1714            return Ok(());
1715        }
1716    };
1717
1718    let mut prev_start: usize = 0;
1719    let mut prev_end: usize = first_term;
1720    let mut prev_len = prev_end;
1721    let mut count: u64 = 1;
1722    let mut cur_start = first_term + 1;
1723
1724    // Batch output using IoSlice write_vectored
1725    const BATCH: usize = 512;
1726    let term_slice: [u8; 1] = [term];
1727    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);
1728
1729    while cur_start < data_len {
1730        // Speculative line-end detection
1731        let cur_end = {
1732            let speculative = cur_start + prev_len;
1733            if speculative < data_len && unsafe { *base.add(speculative) } == term {
1734                speculative
1735            } else {
1736                match memchr::memchr(term, unsafe {
1737                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
1738                }) {
1739                    Some(offset) => cur_start + offset,
1740                    None => data_len,
1741                }
1742            }
1743        };
1744
1745        let cur_len = cur_end - cur_start;
1746        // Length check + case-insensitive comparison
1747        let is_dup = cur_len == prev_len
1748            && lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]);
1749
1750        if is_dup {
1751            count += 1;
1752        } else {
1753            let should_print = if repeated { count > 1 } else { count == 1 };
1754            if should_print {
1755                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
1756                slices.push(io::IoSlice::new(&term_slice));
1757                if slices.len() >= BATCH * 2 {
1758                    write_all_vectored(writer, &slices)?;
1759                    slices.clear();
1760                }
1761            }
1762            prev_start = cur_start;
1763            prev_end = cur_end;
1764            prev_len = cur_len;
1765            count = 1;
1766        }
1767
1768        if cur_end < data_len {
1769            cur_start = cur_end + 1;
1770        } else {
1771            break;
1772        }
1773    }
1774
1775    let should_print = if repeated { count > 1 } else { count == 1 };
1776    if should_print {
1777        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
1778        slices.push(io::IoSlice::new(&term_slice));
1779    }
1780    if !slices.is_empty() {
1781        write_all_vectored(writer, &slices)?;
1782    }
1783
1784    Ok(())
1785}
1786
1787/// Fast single-pass for case-insensitive (-i) count (-c) mode.
1788/// Writes directly to BufWriter — no batch_buf allocation needed.
1789fn process_count_ci_singlepass(
1790    data: &[u8],
1791    writer: &mut impl Write,
1792    config: &UniqConfig,
1793    term: u8,
1794) -> io::Result<()> {
1795    let first_term = match memchr::memchr(term, data) {
1796        Some(pos) => pos,
1797        None => {
1798            let should_print = match config.mode {
1799                OutputMode::Default => true,
1800                OutputMode::RepeatedOnly => false,
1801                OutputMode::UniqueOnly => true,
1802                _ => true,
1803            };
1804            if should_print {
1805                write_count_line(writer, 1, data, term)?;
1806            }
1807            return Ok(());
1808        }
1809    };
1810
1811    let is_default = matches!(config.mode, OutputMode::Default);
1812
1813    let mut prev_start: usize = 0;
1814    let mut prev_end: usize = first_term;
1815    let mut count: u64 = 1;
1816    let mut cur_start = first_term + 1;
1817
1818    // Zero-copy writev batching: same approach as process_count_fast_singlepass
1819    const BATCH: usize = 340;
1820    const PREFIX_SLOT: usize = 28;
1821    let term_slice: [u8; 1] = [term];
1822    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
1823    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);
1824
1825    let base = data.as_ptr();
1826    let data_len = data.len();
1827    let mut prev_len = prev_end - prev_start;
1828
1829    while cur_start < data_len {
1830        // Speculative line-end detection
1831        let cur_end = {
1832            let speculative = cur_start + prev_len;
1833            if speculative < data_len && unsafe { *base.add(speculative) } == term {
1834                speculative
1835            } else {
1836                match memchr::memchr(term, unsafe {
1837                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
1838                }) {
1839                    Some(offset) => cur_start + offset,
1840                    None => data_len,
1841                }
1842            }
1843        };
1844
1845        let cur_len = cur_end - cur_start;
1846        // Length-based early rejection before expensive case-insensitive compare
1847        let is_dup = cur_len == prev_len
1848            && data[prev_start..prev_end].eq_ignore_ascii_case(&data[cur_start..cur_end]);
1849
1850        if is_dup {
1851            count += 1;
1852        } else {
1853            let should_print = if is_default {
1854                true
1855            } else {
1856                match config.mode {
1857                    OutputMode::RepeatedOnly => count > 1,
1858                    OutputMode::UniqueOnly => count == 1,
1859                    _ => true,
1860                }
1861            };
1862            if should_print {
1863                let idx = groups.len();
1864                let prefix_off = idx * PREFIX_SLOT;
1865                let prefix_len = format_count_prefix_into(
1866                    count,
1867                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
1868                );
1869                groups.push((prefix_len, prev_start, prev_end));
1870
1871                if groups.len() >= BATCH {
1872                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
1873                    groups.clear();
1874                    prefix_buf.fill(b' ');
1875                }
1876            }
1877            prev_start = cur_start;
1878            prev_end = cur_end;
1879            prev_len = cur_len;
1880            count = 1;
1881        }
1882
1883        if cur_end < data_len {
1884            cur_start = cur_end + 1;
1885        } else {
1886            break;
1887        }
1888    }
1889
1890    let should_print = if is_default {
1891        true
1892    } else {
1893        match config.mode {
1894            OutputMode::RepeatedOnly => count > 1,
1895            OutputMode::UniqueOnly => count == 1,
1896            _ => true,
1897        }
1898    };
1899    if should_print {
1900        let idx = groups.len();
1901        let prefix_off = idx * PREFIX_SLOT;
1902        let prefix_len =
1903            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
1904        groups.push((prefix_len, prev_start, prev_end));
1905    }
1906    if !groups.is_empty() {
1907        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
1908    }
1909
1910    Ok(())
1911}
1912
1913/// Output a group for standard modes (bytes path).
1914#[inline(always)]
1915fn output_group_bytes(
1916    writer: &mut impl Write,
1917    content: &[u8],
1918    full: &[u8],
1919    count: u64,
1920    config: &UniqConfig,
1921    term: u8,
1922) -> io::Result<()> {
1923    let should_print = match config.mode {
1924        OutputMode::Default => true,
1925        OutputMode::RepeatedOnly => count > 1,
1926        OutputMode::UniqueOnly => count == 1,
1927        _ => true,
1928    };
1929
1930    if should_print {
1931        if config.count {
1932            write_count_line(writer, count, content, term)?;
1933        } else {
1934            writer.write_all(full)?;
1935            // Add terminator if the original line didn't have one
1936            if full.len() == content.len() {
1937                writer.write_all(&[term])?;
1938            }
1939        }
1940    }
1941
1942    Ok(())
1943}
1944
1945/// Process --all-repeated / -D mode on byte slices.
1946fn process_all_repeated_bytes(
1947    data: &[u8],
1948    writer: &mut impl Write,
1949    config: &UniqConfig,
1950    method: AllRepeatedMethod,
1951    term: u8,
1952) -> io::Result<()> {
1953    let mut lines = LineIter::new(data, term);
1954
1955    let first = match lines.next() {
1956        Some(v) => v,
1957        None => return Ok(()),
1958    };
1959
1960    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
1961    // For all-repeated we need to buffer group lines since we only print if count > 1
1962    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
1963    group_lines.push(first);
1964    let mut first_group_printed = false;
1965
1966    let fast = !needs_key_extraction(config) && !config.ignore_case;
1967
1968    for (cur_content, cur_full) in lines {
1969        let prev_content = group_lines.last().unwrap().0;
1970        let equal = if fast {
1971            lines_equal_fast(prev_content, cur_content)
1972        } else {
1973            lines_equal(prev_content, cur_content, config)
1974        };
1975
1976        if equal {
1977            group_lines.push((cur_content, cur_full));
1978        } else {
1979            // Flush group
1980            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1981            group_lines.clear();
1982            group_lines.push((cur_content, cur_full));
1983        }
1984    }
1985
1986    // Flush last group
1987    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1988
1989    Ok(())
1990}
1991
1992/// Flush a group for --all-repeated mode (bytes path).
1993fn flush_all_repeated_bytes(
1994    writer: &mut impl Write,
1995    group: &[(&[u8], &[u8])],
1996    method: AllRepeatedMethod,
1997    first_group_printed: &mut bool,
1998    term: u8,
1999) -> io::Result<()> {
2000    if group.len() <= 1 {
2001        return Ok(()); // Not a duplicate group
2002    }
2003
2004    match method {
2005        AllRepeatedMethod::Prepend => {
2006            writer.write_all(&[term])?;
2007        }
2008        AllRepeatedMethod::Separate => {
2009            if *first_group_printed {
2010                writer.write_all(&[term])?;
2011            }
2012        }
2013        AllRepeatedMethod::None => {}
2014    }
2015
2016    for &(content, full) in group {
2017        writer.write_all(full)?;
2018        if full.len() == content.len() {
2019            writer.write_all(&[term])?;
2020        }
2021    }
2022
2023    *first_group_printed = true;
2024    Ok(())
2025}
2026
2027/// Process --group mode on byte slices.
2028fn process_group_bytes(
2029    data: &[u8],
2030    writer: &mut impl Write,
2031    config: &UniqConfig,
2032    method: GroupMethod,
2033    term: u8,
2034) -> io::Result<()> {
2035    let mut lines = LineIter::new(data, term);
2036
2037    let (prev_content, prev_full) = match lines.next() {
2038        Some(v) => v,
2039        None => return Ok(()),
2040    };
2041
2042    // Prepend/Both: separator before first group
2043    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
2044        writer.write_all(&[term])?;
2045    }
2046
2047    // Write first line
2048    writer.write_all(prev_full)?;
2049    if prev_full.len() == prev_content.len() {
2050        writer.write_all(&[term])?;
2051    }
2052
2053    let mut prev_content = prev_content;
2054    let fast = !needs_key_extraction(config) && !config.ignore_case;
2055
2056    for (cur_content, cur_full) in lines {
2057        let equal = if fast {
2058            lines_equal_fast(prev_content, cur_content)
2059        } else {
2060            lines_equal(prev_content, cur_content, config)
2061        };
2062
2063        if !equal {
2064            // New group — write separator
2065            writer.write_all(&[term])?;
2066        }
2067
2068        writer.write_all(cur_full)?;
2069        if cur_full.len() == cur_content.len() {
2070            writer.write_all(&[term])?;
2071        }
2072
2073        prev_content = cur_content;
2074    }
2075
2076    // Append/Both: separator after last group
2077    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
2078        writer.write_all(&[term])?;
2079    }
2080
2081    Ok(())
2082}
2083
2084// ============================================================================
2085// Streaming processing (for stdin / pipe input)
2086// ============================================================================
2087
2088/// Main streaming uniq processor.
2089/// Reads from `input`, writes to `output`.
2090pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
2091    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
2092    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
2093    let term = if config.zero_terminated { b'\0' } else { b'\n' };
2094
2095    match config.mode {
2096        OutputMode::Group(method) => {
2097            process_group_stream(reader, &mut writer, config, method, term)?;
2098        }
2099        OutputMode::AllRepeated(method) => {
2100            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
2101        }
2102        _ => {
2103            process_standard_stream(reader, &mut writer, config, term)?;
2104        }
2105    }
2106
2107    writer.flush()?;
2108    Ok(())
2109}
2110
2111/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
2112fn process_standard_stream<R: BufRead, W: Write>(
2113    mut reader: R,
2114    writer: &mut W,
2115    config: &UniqConfig,
2116    term: u8,
2117) -> io::Result<()> {
2118    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
2119    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
2120
2121    // Read first line
2122    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
2123        return Ok(()); // empty input
2124    }
2125    let mut count: u64 = 1;
2126
2127    loop {
2128        current_line.clear();
2129        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
2130
2131        if bytes_read == 0 {
2132            // End of input — output the last group
2133            output_group_stream(writer, &prev_line, count, config, term)?;
2134            break;
2135        }
2136
2137        if compare_lines_stream(&prev_line, &current_line, config, term) {
2138            count += 1;
2139        } else {
2140            output_group_stream(writer, &prev_line, count, config, term)?;
2141            std::mem::swap(&mut prev_line, &mut current_line);
2142            count = 1;
2143        }
2144    }
2145
2146    Ok(())
2147}
2148
2149/// Compare two lines (with terminators) in streaming mode.
2150#[inline(always)]
2151fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
2152    let a_stripped = strip_term(a, term);
2153    let b_stripped = strip_term(b, term);
2154    lines_equal(a_stripped, b_stripped, config)
2155}
2156
/// Strip terminator from end of line.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    // Drop at most one trailing terminator byte; anything else is kept.
    match line.split_last() {
        Some((&last, rest)) if last == term => rest,
        _ => line,
    }
}
2166
2167/// Output a group in streaming mode.
2168#[inline(always)]
2169fn output_group_stream(
2170    writer: &mut impl Write,
2171    line: &[u8],
2172    count: u64,
2173    config: &UniqConfig,
2174    term: u8,
2175) -> io::Result<()> {
2176    let should_print = match config.mode {
2177        OutputMode::Default => true,
2178        OutputMode::RepeatedOnly => count > 1,
2179        OutputMode::UniqueOnly => count == 1,
2180        _ => true,
2181    };
2182
2183    if should_print {
2184        let content = strip_term(line, term);
2185        if config.count {
2186            write_count_line(writer, count, content, term)?;
2187        } else {
2188            writer.write_all(content)?;
2189            writer.write_all(&[term])?;
2190        }
2191    }
2192
2193    Ok(())
2194}
2195
2196/// Process --all-repeated / -D mode (streaming).
2197fn process_all_repeated_stream<R: BufRead, W: Write>(
2198    mut reader: R,
2199    writer: &mut W,
2200    config: &UniqConfig,
2201    method: AllRepeatedMethod,
2202    term: u8,
2203) -> io::Result<()> {
2204    let mut group: Vec<Vec<u8>> = Vec::new();
2205    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
2206    let mut first_group_printed = false;
2207
2208    current_line.clear();
2209    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
2210        return Ok(());
2211    }
2212    group.push(current_line.clone());
2213
2214    loop {
2215        current_line.clear();
2216        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
2217
2218        if bytes_read == 0 {
2219            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
2220            break;
2221        }
2222
2223        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
2224            group.push(current_line.clone());
2225        } else {
2226            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
2227            group.clear();
2228            group.push(current_line.clone());
2229        }
2230    }
2231
2232    Ok(())
2233}
2234
2235/// Flush a group for --all-repeated mode (streaming).
2236fn flush_all_repeated_stream(
2237    writer: &mut impl Write,
2238    group: &[Vec<u8>],
2239    method: AllRepeatedMethod,
2240    first_group_printed: &mut bool,
2241    term: u8,
2242) -> io::Result<()> {
2243    if group.len() <= 1 {
2244        return Ok(());
2245    }
2246
2247    match method {
2248        AllRepeatedMethod::Prepend => {
2249            writer.write_all(&[term])?;
2250        }
2251        AllRepeatedMethod::Separate => {
2252            if *first_group_printed {
2253                writer.write_all(&[term])?;
2254            }
2255        }
2256        AllRepeatedMethod::None => {}
2257    }
2258
2259    for line in group {
2260        let content = strip_term(line, term);
2261        writer.write_all(content)?;
2262        writer.write_all(&[term])?;
2263    }
2264
2265    *first_group_printed = true;
2266    Ok(())
2267}
2268
2269/// Process --group mode (streaming).
2270fn process_group_stream<R: BufRead, W: Write>(
2271    mut reader: R,
2272    writer: &mut W,
2273    config: &UniqConfig,
2274    method: GroupMethod,
2275    term: u8,
2276) -> io::Result<()> {
2277    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
2278    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
2279
2280    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
2281        return Ok(());
2282    }
2283
2284    // Prepend/Both: separator before first group
2285    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
2286        writer.write_all(&[term])?;
2287    }
2288
2289    let content = strip_term(&prev_line, term);
2290    writer.write_all(content)?;
2291    writer.write_all(&[term])?;
2292
2293    loop {
2294        current_line.clear();
2295        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
2296
2297        if bytes_read == 0 {
2298            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
2299                writer.write_all(&[term])?;
2300            }
2301            break;
2302        }
2303
2304        if !compare_lines_stream(&prev_line, &current_line, config, term) {
2305            writer.write_all(&[term])?;
2306        }
2307
2308        let content = strip_term(&current_line, term);
2309        writer.write_all(content)?;
2310        writer.write_all(&[term])?;
2311
2312        std::mem::swap(&mut prev_line, &mut current_line);
2313    }
2314
2315    Ok(())
2316}
2317
/// Read a line terminated by the given byte (newline or NUL).
/// Returns number of bytes read (0 = EOF).
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    // Thin wrapper over read_until; the terminator (if present) stays in `buf`.
    let n = reader.read_until(term, buf)?;
    Ok(n)
}