// coreutils_rs/uniq/core.rs

use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

/// Write a large contiguous buffer, retrying on partial writes.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    writer.write_all(buf)
}

/// Write all IoSlices to the writer, handling partial writes correctly.
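///
/// Illustrative use (hypothetical caller; `Vec<u8>` stands in for any `Write`
/// sink):
///
/// ```ignore
/// let mut out: Vec<u8> = Vec::new();
/// let slices = [io::IoSlice::new(b"foo\n"), io::IoSlice::new(b"bar\n")];
/// write_all_vectored(&mut out, &slices).unwrap();
/// assert_eq!(out, b"foo\nbar\n");
/// ```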
fn write_all_vectored(writer: &mut impl Write, slices: &[io::IoSlice<'_>]) -> io::Result<()> {
    let n = writer.write_vectored(slices)?;
    let expected: usize = slices.iter().map(|s| s.len()).sum();
    if n >= expected {
        return Ok(());
    }
    if n == 0 && expected > 0 {
        return Err(io::Error::new(
            io::ErrorKind::WriteZero,
            "write_vectored returned 0",
        ));
    }
    // Slow path: partial write — fall back to write_all per remaining slice.
    let mut consumed = n;
    for slice in slices {
        if consumed == 0 {
            writer.write_all(slice)?;
        } else if consumed >= slice.len() {
            consumed -= slice.len();
        } else {
            writer.write_all(&slice[consumed..])?;
            consumed = 0;
        }
    }
    Ok(())
}

/// How to delimit groups when using --all-repeated
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    None,
    Prepend,
    Separate,
}

/// How to delimit groups when using --group
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    Separate,
    Prepend,
    Append,
    Both,
}

/// Output mode for uniq
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    /// Default: print unique lines and first of each duplicate group
    Default,
    /// -d: print only first line of duplicate groups
    RepeatedOnly,
    /// -D / --all-repeated: print ALL duplicate lines
    AllRepeated(AllRepeatedMethod),
    /// -u: print only lines that are NOT duplicated
    UniqueOnly,
    /// --group: show all items with group separators
    Group(GroupMethod),
}

/// Configuration for uniq processing
#[derive(Debug, Clone)]
pub struct UniqConfig {
    pub mode: OutputMode,
    pub count: bool,
    pub ignore_case: bool,
    pub skip_fields: usize,
    pub skip_chars: usize,
    pub check_chars: Option<usize>,
    pub zero_terminated: bool,
}

impl Default for UniqConfig {
    fn default() -> Self {
        Self {
            mode: OutputMode::Default,
            count: false,
            ignore_case: false,
            skip_fields: 0,
            skip_chars: 0,
            check_chars: None,
            zero_terminated: false,
        }
    }
}

/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
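///
/// Illustrative sketch (private helper, so not a runnable doctest):
///
/// ```ignore
/// let cfg = UniqConfig { skip_fields: 1, skip_chars: 1, ..Default::default() };
/// // Field 1 = "  foo" (leading blanks + non-blanks); then 1 char (" ") is skipped.
/// assert_eq!(get_compare_slice(b"  foo bar", &cfg), b"bar");
/// ```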
#[inline(always)]
fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
    let mut start = 0;
    let len = line.len();

    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
    for _ in 0..config.skip_fields {
        // Skip blanks (space and tab)
        while start < len && (line[start] == b' ' || line[start] == b'\t') {
            start += 1;
        }
        // Skip non-blanks (field content)
        while start < len && line[start] != b' ' && line[start] != b'\t' {
            start += 1;
        }
    }

    // Skip N characters
    if config.skip_chars > 0 {
        let remaining = len - start;
        let skip = config.skip_chars.min(remaining);
        start += skip;
    }

    let slice = &line[start..];

    // Limit comparison to N characters
    if let Some(w) = config.check_chars {
        if w < slice.len() {
            return &slice[..w];
        }
    }

    slice
}

/// Compare two lines (without terminators) using the config's comparison rules.
#[inline(always)]
fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
    let sa = get_compare_slice(a, config);
    let sb = get_compare_slice(b, config);

    if config.ignore_case {
        sa.eq_ignore_ascii_case(sb)
    } else {
        sa == sb
    }
}

/// Fast case-insensitive comparison: no field/char extraction, just case-insensitive.
/// Uses a length check (and an empty-line shortcut) before the full comparison.
#[inline(always)]
fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    a.eq_ignore_ascii_case(b)
}

/// Check if config requires field/char skipping or char limiting.
#[inline(always)]
fn needs_key_extraction(config: &UniqConfig) -> bool {
    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
}

/// Fast path comparison: no field/char extraction needed, no case folding.
/// Uses a length-equality shortcut and multi-word prefix rejection.
/// For short lines (<= 32 bytes, common in many-dups data), avoids the
/// full memcmp call overhead by doing direct word comparisons.
/// For medium lines (33-256 bytes), uses a tight u64 loop covering the
/// full line without falling through to memcmp.
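///
/// For example, two equal 12-byte lines are confirmed with two overlapping
/// u64 loads (bytes 0..8 and bytes 4..12), which together cover all 12 bytes
/// without a memcmp call.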
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    if alen != b.len() {
        return false;
    }
    if alen == 0 {
        return true;
    }
    if alen <= 8 {
        // Short lines (at most 8 bytes): direct slice comparison, which the
        // compiler lowers to a couple of word compares
        return a == b;
    }
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // 8-byte prefix check: reject most non-equal lines without full memcmp
        let a8 = (ap as *const u64).read_unaligned();
        let b8 = (bp as *const u64).read_unaligned();
        if a8 != b8 {
            return false;
        }
        // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 17-32 bytes: check first 16 + last 16 (overlapping) to avoid memcmp
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        // For 33-256 bytes: tight u64 loop covering the full line.
        // Compare 32 bytes per iteration (4 u64 loads), then handle tail.
        // This avoids the function call overhead of memcmp for medium lines.
        if alen <= 256 {
            let mut off = 8usize; // first 8 bytes already compared
            // Compare 32 bytes at a time
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                // XOR all pairs and OR together: zero if all equal
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            // Compare remaining 8 bytes at a time
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            // Compare tail (overlapping last 8 bytes)
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // Longer lines (>256): prefix passed, fall through to full memcmp
    a == b
}

/// Compare two equal-length lines starting from byte 8.
/// Caller has already checked: lengths are equal, both >= 9 bytes, first 8 bytes match.
/// This avoids redundant checks when the calling loop already did prefix rejection.
#[inline(always)]
fn lines_equal_after_prefix(a: &[u8], b: &[u8]) -> bool {
    let alen = a.len();
    debug_assert!(alen == b.len());
    debug_assert!(alen > 8);
    unsafe {
        let ap = a.as_ptr();
        let bp = b.as_ptr();
        // Check last 8 bytes first (overlapping for 9-16 byte lines)
        if alen <= 16 {
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 32 {
            let a16 = (ap.add(8) as *const u64).read_unaligned();
            let b16 = (bp.add(8) as *const u64).read_unaligned();
            if a16 != b16 {
                return false;
            }
            let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
            let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
            return a_tail == b_tail;
        }
        if alen <= 256 {
            let mut off = 8usize;
            while off + 32 <= alen {
                let a0 = (ap.add(off) as *const u64).read_unaligned();
                let b0 = (bp.add(off) as *const u64).read_unaligned();
                let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                    return false;
                }
                off += 32;
            }
            while off + 8 <= alen {
                let aw = (ap.add(off) as *const u64).read_unaligned();
                let bw = (bp.add(off) as *const u64).read_unaligned();
                if aw != bw {
                    return false;
                }
                off += 8;
            }
            if off < alen {
                let a_tail = (ap.add(alen - 8) as *const u64).read_unaligned();
                let b_tail = (bp.add(alen - 8) as *const u64).read_unaligned();
                return a_tail == b_tail;
            }
            return true;
        }
    }
    // >256 bytes: use memcmp via slice comparison (skipping the already-compared prefix)
    a[8..] == b[8..]
}

/// Write a count-prefixed line in GNU uniq format.
/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
/// Combines prefix + line + term into a single write when the whole record
/// fits in a 256-byte stack buffer.
///
/// Optimized with pre-built prefix strings for counts 1-9 (the most common
/// case in many-dups data); larger counts go through itoa_right_aligned_into.
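///
/// For example, `count = 3`, `line = b"apple"`, `term = b'\n'` produces
/// (count right-aligned in a 7-character field, then one space):
///
/// ```text
///       3 apple
/// ```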
#[inline(always)]
fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
    // Ultra-fast path for common small counts: pre-built prefix strings
    // avoid all the itoa/copy_within overhead for the most common case.
    if count <= 9 {
        // "      N " where N is 1-9 (7 chars + space = 8 bytes prefix)
        let prefix: &[u8] = match count {
            1 => b"      1 ",
            2 => b"      2 ",
            3 => b"      3 ",
            4 => b"      4 ",
            5 => b"      5 ",
            6 => b"      6 ",
            7 => b"      7 ",
            8 => b"      8 ",
            9 => b"      9 ",
            _ => unreachable!(),
        };
        let total = 8 + line.len() + 1;
        if total <= 256 {
            let mut buf = [0u8; 256];
            unsafe {
                std::ptr::copy_nonoverlapping(prefix.as_ptr(), buf.as_mut_ptr(), 8);
                std::ptr::copy_nonoverlapping(line.as_ptr(), buf.as_mut_ptr().add(8), line.len());
                *buf.as_mut_ptr().add(8 + line.len()) = term;
            }
            return out.write_all(&buf[..total]);
        } else {
            out.write_all(prefix)?;
            out.write_all(line)?;
            return out.write_all(&[term]);
        }
    }

    // Build prefix "      N " in a stack buffer (u64 max needs 20 digits; 28 leaves slack)
    let mut prefix = [b' '; 28];
    let width = itoa_right_aligned_into(&mut prefix, count); // field width, minimum 7
    let prefix_len = width + 1; // +1 for trailing space
    prefix[width] = b' ';

    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
    let total = prefix_len + line.len() + 1;
    if total <= 256 {
        let mut buf = [0u8; 256];
        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
        buf[prefix_len + line.len()] = term;
        out.write_all(&buf[..total])
    } else {
        out.write_all(&prefix[..prefix_len])?;
        out.write_all(line)?;
        out.write_all(&[term])
    }
}

/// Write u64 decimal right-aligned into the prefix buffer.
/// The buffer is pre-filled with spaces. Returns the field width occupied:
/// the number of digits, but at least 7 to match the GNU "%7lu" field.
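///
/// For example, `val = 42` leaves the buffer starting with `"     42"` and
/// returns 7; `val = 12345678` leaves `"12345678"` and returns 8.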
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    if val == 0 {
        buf[6] = b'0';
        return 7; // 6 spaces + '0' = 7 chars
    }
    // Write digits right-to-left from position 27 (leaving room for trailing space)
    let mut pos = 27;
    while val > 0 {
        pos -= 1;
        buf[pos] = b'0' + (val % 10) as u8;
        val /= 10;
    }
    let num_digits = 27 - pos;
    if num_digits >= 7 {
        // Number is wide enough, shift to front
        buf.copy_within(pos..27, 0);
        num_digits
    } else {
        // Right-align in 7-char field: spaces then digits
        let pad = 7 - num_digits;
        buf.copy_within(pos..27, pad);
        // buf[0..pad] is already spaces from initialization
        7
    }
}

// ============================================================================
// High-performance mmap-based processing (for byte slices, zero-copy)
// ============================================================================

/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
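///
/// Illustrative use (sketch, not a doctest; `&mut Vec<u8>` stands in for any
/// `Write` sink):
///
/// ```ignore
/// let cfg = UniqConfig { count: true, ..Default::default() };
/// let mut out = Vec::new();
/// process_uniq_bytes(b"a\na\nb\n", &mut out, &cfg).unwrap();
/// assert_eq!(out, b"      2 a\n      1 b\n");
/// ```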
pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
    // 16MB output buffer — optimal for L3 cache utilization on modern CPUs.
    // 32MB can cause cache thrashing; 16MB stays within L3 while still
    // reducing write() syscall count significantly.
    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
    let term = if config.zero_terminated { b'\0' } else { b'\n' };

    match config.mode {
        OutputMode::Group(method) => {
            process_group_bytes(data, &mut writer, config, method, term)?;
        }
        OutputMode::AllRepeated(method) => {
            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
        }
        _ => {
            process_standard_bytes(data, &mut writer, config, term)?;
        }
    }

    writer.flush()?;
    Ok(())
}

/// Iterator over lines in a byte slice, yielding (line without terminator,
/// full line including terminator).
/// Uses memchr for SIMD-accelerated line boundary detection.
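///
/// Illustrative iteration (private type, sketch only):
///
/// ```ignore
/// let mut it = LineIter::new(b"a\nbb", b'\n');
/// assert_eq!(it.next(), Some((&b"a"[..], &b"a\n"[..])));
/// assert_eq!(it.next(), Some((&b"bb"[..], &b"bb"[..]))); // last line: no terminator
/// assert_eq!(it.next(), None);
/// ```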
struct LineIter<'a> {
    data: &'a [u8],
    pos: usize,
    term: u8,
}

impl<'a> LineIter<'a> {
    #[inline(always)]
    fn new(data: &'a [u8], term: u8) -> Self {
        Self { data, pos: 0, term }
    }
}

impl<'a> Iterator for LineIter<'a> {
    /// (line content without terminator, full line including terminator for output)
    type Item = (&'a [u8], &'a [u8]);

    #[inline(always)]
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }

        let remaining = &self.data[self.pos..];
        match memchr::memchr(self.term, remaining) {
            Some(idx) => {
                let line_start = self.pos;
                let line_end = self.pos + idx; // without terminator
                let full_end = self.pos + idx + 1; // with terminator
                self.pos = full_end;
                Some((
                    &self.data[line_start..line_end],
                    &self.data[line_start..full_end],
                ))
            }
            None => {
                // Last line without terminator
                let line_start = self.pos;
                self.pos = self.data.len();
                let line = &self.data[line_start..];
                Some((line, line))
            }
        }
    }
}

/// Get line content (without terminator) from pre-computed positions.
/// `content_end` is the end of actual content (excludes trailing terminator if present).
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let start = line_starts[idx];
    let end = if idx + 1 < line_starts.len() {
        line_starts[idx + 1] - 1 // exclude terminator
    } else {
        content_end // last line: pre-computed to exclude trailing terminator
    };
    &data[start..end]
}

/// Get full line (with terminator) from pre-computed positions.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let start = line_starts[idx];
    let end = if idx + 1 < line_starts.len() {
        line_starts[idx + 1] // include terminator
    } else {
        data.len()
    };
    &data[start..end]
}

/// Linear scan for the end of a duplicate group.
/// Returns the index of the first line that differs from line_starts[group_start].
/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
/// equal lines can appear in non-adjacent groups separated by different lines.
/// Caches key length for fast length-mismatch rejection.
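///
/// For example, with lines `["x", "x", "y", "x"]` and `group_start = 0`, the
/// scan returns 2: the adjacent second `"x"` extends the group, but the
/// non-adjacent `"x"` at index 3 does not.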
#[inline]
fn linear_scan_group_end(
    data: &[u8],
    line_starts: &[usize],
    group_start: usize,
    num_lines: usize,
    content_end: usize,
) -> usize {
    let key = line_content_at(data, line_starts, group_start, content_end);
    let key_len = key.len();
    let mut i = group_start + 1;
    while i < num_lines {
        let candidate = line_content_at(data, line_starts, i, content_end);
        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
            return i;
        }
        i += 1;
    }
    i
}

/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
/// General path: pre-computed line positions with linear scanning for group ends.
fn process_standard_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    let fast = !needs_key_extraction(config) && !config.ignore_case;
    let fast_ci = !needs_key_extraction(config) && config.ignore_case;

    // Ultra-fast path: default mode, no count, no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines inline.
    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_fast_singlepass(data, writer, term);
    }

    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
    if fast
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_fast_singlepass(data, writer, config, term);
    }

    // Ultra-fast path: count mode with no key extraction.
    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
    // Avoids the line_starts Vec allocation (20MB+ for large files).
    if fast && config.count {
        return process_count_fast_singlepass(data, writer, config, term);
    }

    // Fast path for case-insensitive (-i) mode with no key extraction.
    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
    // Avoids the general path's line_starts Vec allocation.
    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
        return process_default_ci_singlepass(data, writer, term);
    }

    if fast_ci
        && !config.count
        && matches!(
            config.mode,
            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
        )
    {
        return process_filter_ci_singlepass(data, writer, config, term);
    }

    if fast_ci && config.count {
        return process_count_ci_singlepass(data, writer, config, term);
    }

    // General path: pre-computed line positions with linear scanning for group ends
    let estimated_lines = (data.len() / 40).max(64);
    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
    line_starts.push(0);
    for pos in memchr::memchr_iter(term, data) {
        if pos + 1 < data.len() {
            line_starts.push(pos + 1);
        }
    }
    let num_lines = line_starts.len();
    if num_lines == 0 {
        return Ok(());
    }

    // Pre-compute content end: if data ends with terminator, exclude it for last line
    let content_end = if data.last() == Some(&term) {
        data.len() - 1
    } else {
        data.len()
    };

    // Fallback fast path for default mode without count. Unreachable in
    // practice: the single-pass path above already returns for this case.
    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
        // Write first line
        let first_full = line_full_at(data, &line_starts, 0);
        let first_content = line_content_at(data, &line_starts, 0, content_end);
        write_all_raw(writer, first_full)?;
        if first_full.len() == first_content.len() {
            writer.write_all(&[term])?;
        }

        let mut i = 1;
        while i < num_lines {
            let prev = line_content_at(data, &line_starts, i - 1, content_end);
            let cur = line_content_at(data, &line_starts, i, content_end);

            if lines_equal_fast(prev, cur) {
                // Duplicate detected — linear scan for end of group
                let group_end =
                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
                i = group_end;
                continue;
            }

            // Unique line — write it
            let cur_full = line_full_at(data, &line_starts, i);
            write_all_raw(writer, cur_full)?;
            if cur_full.len() == cur.len() {
                writer.write_all(&[term])?;
            }
            i += 1;
        }
        return Ok(());
    }

    // General path with count tracking
    let mut i = 0;
    while i < num_lines {
        let content = line_content_at(data, &line_starts, i, content_end);
        let full = line_full_at(data, &line_starts, i);

        let group_end = if fast
            && i + 1 < num_lines
            && lines_equal_fast(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
            ) {
            // Duplicate detected — linear scan for end
            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
        } else if !fast
            && i + 1 < num_lines
            && lines_equal(
                content,
                line_content_at(data, &line_starts, i + 1, content_end),
                config,
            )
        {
            // Slow path linear scan with key extraction
            let mut j = i + 2;
            while j < num_lines {
                if !lines_equal(
                    content,
                    line_content_at(data, &line_starts, j, content_end),
                    config,
                ) {
                    break;
                }
                j += 1;
            }
            j
        } else {
            i + 1
        };

        let count = (group_end - i) as u64;
        output_group_bytes(writer, content, full, count, config, term)?;
        i = group_end;
    }

    Ok(())
}

/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
/// No pre-computed positions, no binary search, no Vec allocation.
/// Outputs each line that differs from the previous.
///
/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
/// independently, then cross-chunk boundaries are resolved.
fn process_default_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    term: u8,
) -> io::Result<()> {
    // Parallel path for large files — kick in at 4MB.
    // Lower thresholds (e.g. 2MB) hurt performance on 10MB files because
    // the parallel overhead dominates for smaller chunks.
    if data.len() >= 4 * 1024 * 1024 {
        return process_default_parallel(data, writer, term);
    }

    process_default_sequential(data, writer, term)
}

/// Sequential single-pass dedup with zero-copy output.
/// Instead of copying data to a buffer, tracks contiguous output runs and writes
/// directly from the original data. For all-unique data, this is a single write_all.
///
/// Optimized for the "many duplicates" case: caches the previous line's length
/// and first-8-byte prefix for fast rejection of non-duplicates without
/// calling the full comparison function.
///
/// Uses raw pointer arithmetic throughout to avoid bounds checking in the hot loop.
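///
/// Run tracking, illustrated on `b"a\na\nb\n"`: the duplicate second `a`
/// splits the output into two runs, `data[0..2]` (`"a\n"`) and `data[4..6]`
/// (`"b\n"`), each written straight from the input with no copying.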
fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    let data_len = data.len();
    let base = data.as_ptr();
    let mut prev_start: usize = 0;

    // Find end of first line
    let first_end: usize = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line, no terminator
            writer.write_all(data)?;
            return writer.write_all(&[term]);
        }
    };

    // Cache previous line metadata for fast comparison
    let mut prev_len = first_end - prev_start;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };

    // run_start tracks the beginning of the current contiguous output region.
    // When a duplicate is found, we flush the run up to the duplicate and skip it.
    let mut run_start: usize = 0;
    let mut cur_start = first_end + 1;
    let mut last_output_end = first_end + 1; // exclusive end including terminator

    while cur_start < data_len {
        // Speculative duplicate detection: if the previous line had length L
        // and the next L bytes are byte-equal to it and followed by the
        // terminator, this line is a duplicate. Requiring byte equality keeps
        // the speculation sound: the previous line contains no terminator, so
        // an equal span cannot swallow a real line boundary. (Checking only
        // data[cur_start + L] == term would mis-split lines whenever a
        // shorter line happens to be followed by a terminator at that
        // offset.) On a miss, fall back to memchr to find the line end.
        let speculative = cur_start + prev_len;
        let spec_dup = speculative < data_len
            && unsafe { *base.add(speculative) } == term
            && unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), prev_len);
                lines_equal_fast(a, b)
            };
        let cur_end = if spec_dup {
            speculative
        } else {
            match memchr::memchr(term, unsafe {
                std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
            }) {
                Some(offset) => cur_start + offset,
                None => data_len,
            }
        };

        let cur_len = cur_end - cur_start;

        // Fast reject: if lengths differ, lines are definitely not equal.
        // This branch structure is ordered by frequency: length mismatch is
        // most common for unique data, prefix mismatch next, full compare last.
        let is_dup = if spec_dup {
            true
        } else if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            // Compare cached 8-byte prefix first
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true // prefix covers entire line
            } else if cur_len <= 16 {
                // Check last 8 bytes (overlapping)
                unsafe {
                    let a_tail =
                        (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
                    let b_tail = (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
                    a_tail == b_tail
                }
            } else if cur_len <= 32 {
                // Check bytes 8-16 and last 8 bytes
                unsafe {
                    let a16 = (base.add(prev_start + 8) as *const u64).read_unaligned();
                    let b16 = (base.add(cur_start + 8) as *const u64).read_unaligned();
                    if a16 != b16 {
                        false
                    } else {
                        let a_tail =
                            (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
                        let b_tail =
                            (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
                        a_tail == b_tail
                    }
                }
            } else if cur_len <= 256 {
                // 33-256 bytes: tight u64 loop with XOR-OR batching.
                // Compares 32 bytes per iteration (4 u64 loads), reducing
                // branch mispredictions vs individual comparisons.
                unsafe {
                    let ap = base.add(prev_start);
                    let bp = base.add(cur_start);
                    let mut off = 8usize; // first 8 bytes already compared via prefix
                    let mut eq = true;
                    while off + 32 <= cur_len {
                        let a0 = (ap.add(off) as *const u64).read_unaligned();
                        let b0 = (bp.add(off) as *const u64).read_unaligned();
                        let a1 = (ap.add(off + 8) as *const u64).read_unaligned();
                        let b1 = (bp.add(off + 8) as *const u64).read_unaligned();
                        let a2 = (ap.add(off + 16) as *const u64).read_unaligned();
                        let b2 = (bp.add(off + 16) as *const u64).read_unaligned();
                        let a3 = (ap.add(off + 24) as *const u64).read_unaligned();
                        let b3 = (bp.add(off + 24) as *const u64).read_unaligned();
                        if (a0 ^ b0) | (a1 ^ b1) | (a2 ^ b2) | (a3 ^ b3) != 0 {
                            eq = false;
                            break;
                        }
                        off += 32;
                    }
                    if eq {
                        while off + 8 <= cur_len {
                            let aw = (ap.add(off) as *const u64).read_unaligned();
                            let bw = (bp.add(off) as *const u64).read_unaligned();
                            if aw != bw {
                                eq = false;
                                break;
                            }
                            off += 8;
                        }
                    }
                    if eq && off < cur_len {
                        let a_tail = (ap.add(cur_len - 8) as *const u64).read_unaligned();
                        let b_tail = (bp.add(cur_len - 8) as *const u64).read_unaligned();
                        eq = a_tail == b_tail;
                    }
                    eq
                }
            } else {
                // For longer lines (>256), use unsafe slice comparison
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    a == b
                }
            }
        } else {
            // Short line < 8 bytes — direct byte comparison
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            // Duplicate — flush the current run up to this line, then skip it
            if run_start < cur_start {
                writer.write_all(&data[run_start..cur_start])?;
            }
            // Start new run after this duplicate
            run_start = if cur_end < data_len {
                cur_end + 1
            } else {
                cur_end
            };
        } else {
            // Different line — update cached comparison state
            prev_start = cur_start;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            last_output_end = if cur_end < data_len {
                cur_end + 1
            } else {
                cur_end
            };
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Flush remaining run
    if run_start < data_len {
        writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
    }

    // Append a terminator only when the final output line reaches the end of
    // the data and the data itself lacks a trailing terminator. (When the
    // last line was a skipped duplicate, the output already ends with a
    // terminator, and appending another would emit a spurious empty line.)
    if last_output_end == data_len && data_len > 0 && unsafe { *base.add(data_len - 1) } != term {
        writer.write_all(&[term])?;
    }

    Ok(())
}

/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
/// positions in each chunk in parallel, then write output runs directly from
/// the original data. No per-chunk buffer allocation needed.
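///
/// Cross-chunk resolution example: if one chunk's last output line is `foo`
/// and the next chunk's first line is also `foo`, the writer skips the second
/// chunk's first line, so a duplicate group spanning the boundary still
/// collapses to a single line.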
fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    use rayon::prelude::*;

    let num_threads = rayon::current_num_threads().max(1);
    let chunk_target = data.len() / num_threads;

    // Find chunk boundaries aligned to line terminators
    let mut boundaries = Vec::with_capacity(num_threads + 1);
    boundaries.push(0usize);
    for i in 1..num_threads {
        let target = i * chunk_target;
        if target >= data.len() {
            break;
        }
        if let Some(p) = memchr::memchr(term, &data[target..]) {
            let b = target + p + 1;
            if b > *boundaries.last().unwrap() && b <= data.len() {
                boundaries.push(b);
            }
        }
    }
    boundaries.push(data.len());

    let n_chunks = boundaries.len() - 1;
    if n_chunks <= 1 {
        return process_default_sequential(data, writer, term);
    }

    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
    struct ChunkResult {
        /// Byte ranges in the original data to output (contiguous runs)
        runs: Vec<(usize, usize)>,
        /// First line in chunk (absolute offsets into data, content without term)
        first_line_start: usize,
        first_line_end: usize,
        /// Last *output* line in chunk (content without term)
        last_line_start: usize,
        last_line_end: usize,
    }

    let results: Vec<ChunkResult> = boundaries
        .windows(2)
        .collect::<Vec<_>>()
        .par_iter()
        .map(|w| {
            let chunk_start = w[0];
            let chunk_end = w[1];
            let chunk = &data[chunk_start..chunk_end];

            let first_term = match memchr::memchr(term, chunk) {
                Some(pos) => pos,
                None => {
                    return ChunkResult {
                        runs: vec![(chunk_start, chunk_end)],
                        first_line_start: chunk_start,
                        first_line_end: chunk_end,
                        last_line_start: chunk_start,
                        last_line_end: chunk_end,
                    };
                }
            };

            let first_line_start = chunk_start;
            let first_line_end = chunk_start + first_term;

            let mut runs: Vec<(usize, usize)> = Vec::new();
            let mut run_start = chunk_start;
            let mut prev_start = 0usize;
            let mut _prev_end = first_term;
            let mut last_out_start = chunk_start;
            let mut last_out_end = first_line_end;

            let mut prev_len = first_term;
            let chunk_base = chunk.as_ptr();
            let chunk_len = chunk.len();
            // Cache previous line's prefix for fast rejection
            let mut prev_prefix: u64 = if prev_len >= 8 {
                unsafe { (chunk_base as *const u64).read_unaligned() }
            } else {
                0
            };
            let mut cur_start = first_term + 1;
            while cur_start < chunk_len {
                // Speculative duplicate detection, sound because it requires
                // byte equality with the previous line (which contains no
                // terminator); on a miss, fall back to memchr
                let spec = cur_start + prev_len;
                let spec_dup = spec < chunk_len
                    && unsafe { *chunk_base.add(spec) } == term
                    && unsafe {
                        let a = std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                        let b = std::slice::from_raw_parts(chunk_base.add(cur_start), prev_len);
                        lines_equal_fast(a, b)
                    };
                let cur_end = if spec_dup {
                    spec
                } else {
                    match memchr::memchr(term, unsafe {
                        std::slice::from_raw_parts(
                            chunk_base.add(cur_start),
                            chunk_len - cur_start,
                        )
                    }) {
                        Some(offset) => cur_start + offset,
                        None => chunk_len,
                    }
                };

                let cur_len = cur_end - cur_start;
                // Fast reject: length + prefix + full comparison
                let is_dup = if spec_dup {
                    true
                } else if cur_len != prev_len {
                    false
                } else if cur_len == 0 {
                    true
                } else if cur_len >= 8 {
                    let cur_prefix =
                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() };
                    if cur_prefix != prev_prefix {
                        false
                    } else if cur_len <= 8 {
                        true
                    } else {
                        unsafe {
                            let a =
                                std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                            let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
                            lines_equal_after_prefix(a, b)
                        }
                    }
                } else {
                    unsafe {
                        let a = std::slice::from_raw_parts(chunk_base.add(prev_start), prev_len);
                        let b = std::slice::from_raw_parts(chunk_base.add(cur_start), cur_len);
                        a == b
                    }
                };

                if is_dup {
                    // Duplicate — flush current run up to this line
                    let abs_cur = chunk_start + cur_start;
                    if run_start < abs_cur {
                        runs.push((run_start, abs_cur));
                    }
                    // New run starts after this duplicate
                    run_start = chunk_start
                        + if cur_end < chunk_len {
                            cur_end + 1
                        } else {
                            cur_end
                        };
                } else {
                    last_out_start = chunk_start + cur_start;
                    last_out_end = chunk_start + cur_end;
                    prev_len = cur_len;
                    prev_prefix = if cur_len >= 8 {
                        unsafe { (chunk_base.add(cur_start) as *const u64).read_unaligned() }
                    } else {
                        0
                    };
                }
                prev_start = cur_start;
                _prev_end = cur_end;

                if cur_end < chunk_len {
                    cur_start = cur_end + 1;
                } else {
                    break;
                }
            }

            // Close final run
            if run_start < chunk_end {
                runs.push((run_start, chunk_end));
            }

            ChunkResult {
                runs,
                first_line_start,
                first_line_end,
                last_line_start: last_out_start,
                last_line_end: last_out_end,
            }
        })
        .collect();

    // Write results, adjusting cross-chunk boundaries.
    // Batch output runs via write_vectored to reduce syscall count.
    const BATCH: usize = 256;
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH);
    let mut last_written_end = 0usize;
    for (i, result) in results.iter().enumerate() {
        let skip_first = if i > 0 {
            let prev = &results[i - 1];
            let prev_last = &data[prev.last_line_start..prev.last_line_end];
            let cur_first = &data[result.first_line_start..result.first_line_end];
            lines_equal_fast(prev_last, cur_first)
        } else {
            false
        };

        let skip_end = if skip_first {
            // Skip bytes up to and including the first line's terminator
            result.first_line_end + 1
        } else {
            0
        };

        for &(rs, re) in &result.runs {
            let actual_start = rs.max(skip_end);
            if actual_start < re {
                slices.push(io::IoSlice::new(&data[actual_start..re]));
                last_written_end = re;
                if slices.len() >= BATCH {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
        }
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    // Append a terminator only when the final output run reaches the end of
    // the data and the data itself lacks a trailing terminator (a duplicate
    // skipped at EOF leaves the output already terminated).
    if last_written_end == data.len() && !data.is_empty() && *data.last().unwrap() != term {
        writer.write_all(&[term])?;
    }

    Ok(())
}

/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
/// Zero-copy: writes directly from mmap data through BufWriter.
/// Uses speculative duplicate detection and 8-byte prefix caching to avoid
/// a full memcmp for most line pairs.
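///
/// For example, on `b"a\na\nb\n"`: RepeatedOnly (-d) emits only `a\n`, while
/// UniqueOnly (-u) emits only `b\n`.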
fn process_filter_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
    let data_len = data.len();
    let base = data.as_ptr();

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line: unique (count=1)
            if !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Batch output using IoSlice write_vectored to reduce syscall overhead.
    // Each output line needs 2 slices: content + terminator.
    const BATCH: usize = 512;
    let term_slice: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);

    while cur_start < data_len {
        // Speculative duplicate detection (sound: requires byte equality with
        // the previous line, which contains no terminator)
        let speculative = cur_start + prev_len;
        let spec_dup = speculative < data_len
            && unsafe { *base.add(speculative) } == term
            && unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), prev_len);
                lines_equal_fast(a, b)
            };
        let cur_end = if spec_dup {
            speculative
        } else {
            match memchr::memchr(term, unsafe {
                std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
            }) {
                Some(offset) => cur_start + offset,
                None => data_len,
            }
        };

        let cur_len = cur_end - cur_start;

        // Fast reject using length + 8-byte prefix.
        // After a prefix match, lines_equal_after_prefix skips the
        // already-checked length/prefix/empty cases.
        let is_dup = if spec_dup {
            true
        } else if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true
            } else {
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    lines_equal_after_prefix(a, b)
                }
            }
        } else {
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            count += 1;
        } else {
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
                slices.push(io::IoSlice::new(&term_slice));
                if slices.len() >= BATCH * 2 {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group
    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
        slices.push(io::IoSlice::new(&term_slice));
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}

/// Fast single-pass for count mode (-c) with all standard output modes.
/// Zero line_starts allocation: scans with memchr, counts groups inline,
/// and writes count-prefixed lines directly.
/// Uses cached length comparison for fast duplicate rejection.
/// Uses raw pointer arithmetic to avoid bounds checking.
///
/// Zero-copy output: uses writev (IoSlice) to write count prefixes from a
/// small arena + line content directly from mmap'd data + terminator bytes.
/// This avoids copying line content into an intermediate buffer entirely.
///
/// Optimizations:
/// - Speculative duplicate detection: when the next prev_len bytes are
///   byte-equal to the previous line and followed by the terminator (common
///   in repetitive data), the memchr SIMD scan is skipped entirely; the
///   equality requirement keeps the speculation sound.
/// - Cached 8-byte prefix rejection: avoids full comparison for most non-equal lines.
/// - IoSlice writev batching: eliminates memcpy of line content.
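///
/// Each batched group becomes three IoSlices: the count prefix (from the
/// arena), the line bytes (straight from the input), and the one-byte
/// terminator. For `b"a\na\nb\n"` in Default mode the flush writes
/// `"      2 a\n      1 b\n"` via a single write_vectored call (with a
/// partial-write fallback).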
fn process_count_fast_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let data_len = data.len();
    let base = data.as_ptr();
    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single line: count=1
            let should_print = match config.mode {
                OutputMode::Default => true,
                OutputMode::RepeatedOnly => false,
                OutputMode::UniqueOnly => true,
                _ => true,
            };
            if should_print {
                write_count_line(writer, 1, data, term)?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut prev_prefix: u64 = if prev_len >= 8 {
        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
    } else {
        0
    };
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Zero-copy writev batching: accumulate groups as (prefix_len, line_start,
    // line_end) tuples; each group's prefix lives in a fixed 28-byte slot of a
    // flat byte buffer, addressed by the group's index within the batch.
    // IoSlice arrays are built at flush time to avoid borrow conflicts.
    // Line content points directly into mmap'd data — zero copy.
    const BATCH: usize = 340;
    const PREFIX_SLOT: usize = 28; // max prefix size per group
    let term_slice: [u8; 1] = [term];
    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
    // Each group: (prefix_len, line_start_in_data, line_end_in_data)
    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);

    while cur_start < data_len {
        // Speculative duplicate detection (sound: requires byte equality with
        // the previous line, which contains no terminator)
        let speculative = cur_start + prev_len;
        let spec_dup = speculative < data_len
            && unsafe { *base.add(speculative) } == term
            && unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), prev_len);
                lines_equal_fast(a, b)
            };
        let cur_end = if spec_dup {
            speculative
        } else {
            match memchr::memchr(term, unsafe {
                std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
            }) {
                Some(offset) => cur_start + offset,
                None => data_len,
            }
        };

        let cur_len = cur_end - cur_start;

        let is_dup = if spec_dup {
            true
        } else if cur_len != prev_len {
            false
        } else if cur_len == 0 {
            true
        } else if cur_len >= 8 {
            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
            if cur_prefix != prev_prefix {
                false
            } else if cur_len <= 8 {
                true
            } else {
                unsafe {
                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                    lines_equal_after_prefix(a, b)
                }
            }
        } else {
            unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a == b
            }
        };

        if is_dup {
            count += 1;
        } else {
            let should_print = match config.mode {
                OutputMode::RepeatedOnly => count > 1,
                OutputMode::UniqueOnly => count == 1,
                _ => true,
            };
            if should_print {
                let idx = groups.len();
                let prefix_off = idx * PREFIX_SLOT;
                let prefix_len = format_count_prefix_into(
                    count,
                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
                );
                groups.push((prefix_len, prev_start, prev_end));

                if groups.len() >= BATCH {
                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
                    groups.clear();
                    // Re-fill prefix_buf with spaces for next batch
                    prefix_buf.fill(b' ');
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            prev_prefix = if cur_len >= 8 {
                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
            } else {
                0
            };
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Output last group
    let should_print = match config.mode {
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };
    if should_print {
        let idx = groups.len();
        let prefix_off = idx * PREFIX_SLOT;
        let prefix_len =
            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
        groups.push((prefix_len, prev_start, prev_end));
    }
    if !groups.is_empty() {
        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
    }

    Ok(())
}

/// Flush batched count groups using write_vectored (writev).
/// Builds IoSlice arrays from the prefix buffer and mmap'd data.
#[inline]
fn flush_count_groups(
    writer: &mut impl Write,
    prefix_buf: &[u8],
    groups: &[(usize, usize, usize)],
    term_slice: &[u8; 1],
    data: &[u8],
) -> io::Result<()> {
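    // On-the-wire layout per group: [count prefix][line bytes][terminator],
    // e.g. "      2 hello\n", stitched together from three IoSlices with no
    // copying. The local PREFIX_SLOT must stay in sync with the callers' slot size.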
    const PREFIX_SLOT: usize = 28;
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(groups.len() * 3);
    for (i, &(prefix_len, line_start, line_end)) in groups.iter().enumerate() {
        let prefix_off = i * PREFIX_SLOT;
        slices.push(io::IoSlice::new(
            &prefix_buf[prefix_off..prefix_off + prefix_len],
        ));
        slices.push(io::IoSlice::new(&data[line_start..line_end]));
        slices.push(io::IoSlice::new(term_slice));
    }
    write_all_vectored(writer, &slices)
}

/// Format a count prefix into a buffer slot, returning the prefix length.
/// GNU format: "%7lu " — right-aligned count in a 7-char field, followed by a space.
/// Buffer must be pre-filled with spaces and at least 28 bytes long.
#[inline(always)]
fn format_count_prefix_into(count: u64, buf: &mut [u8]) -> usize {
    if count <= 9 {
        buf[6] = b'0' + count as u8;
        buf[7] = b' ';
        return 8;
    }
    // Use itoa on a temp array, then copy
    let mut tmp = [b' '; 28];
    let digits = itoa_right_aligned_into(&mut tmp, count);
    let width = digits.max(7);
    tmp[width] = b' ';
    let len = width + 1;
    buf[..len].copy_from_slice(&tmp[..len]);
    len
}
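
// A minimal sanity check of the "%7lu " layout above (a sketch, assuming this
// file is built with tests; the slot must be pre-filled with spaces, as the
// doc comment requires).
#[cfg(test)]
mod count_prefix_tests {
    #[test]
    fn single_digit_count_is_right_aligned() {
        let mut buf = [b' '; 28];
        let len = super::format_count_prefix_into(3, &mut buf);
        assert_eq!(&buf[..len], b"      3 "); // six spaces, the digit, one space
    }
}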

/// Fast single-pass for case-insensitive (-i) default mode.
/// Uses run-tracking zero-copy output: contiguous kept regions are flushed
/// directly from the input buffer, with speculative line-end detection and
/// length-based early rejection.
fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
    let data_len = data.len();
    let base = data.as_ptr();

    let first_end = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // No terminator at all: empty input prints nothing; a single
            // unterminated line is echoed with a terminator appended.
            if data.is_empty() {
                return Ok(());
            }
            writer.write_all(data)?;
            return writer.write_all(&[term]);
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_len = first_end;

    // Run-tracking: flush contiguous kept regions straight from the original data.
    let mut run_start: usize = 0;
    let mut cur_start = first_end + 1;

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;

        // Length-based early rejection before the expensive case-insensitive compare
        let is_dup = cur_len == prev_len
            && unsafe {
                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
                a.eq_ignore_ascii_case(b)
            };

        if is_dup {
            // Duplicate — flush the current run up to this line, then skip it
            if run_start < cur_start {
                writer.write_all(&data[run_start..cur_start])?;
            }
            run_start = if cur_end < data_len {
                cur_end + 1
            } else {
                cur_end
            };
        } else {
            prev_start = cur_start;
            prev_len = cur_len;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    // Flush the final run; append a terminator only when that run is non-empty
    // and its last line lacked one (this avoids emitting a spurious terminator
    // when the final line was a skipped duplicate).
    if run_start < data_len {
        writer.write_all(&data[run_start..data_len])?;
        if data[data_len - 1] != term {
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}
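
// A small end-to-end check of the case-insensitive default path (a sketch,
// assuming a test build; any Write sink works, so a Vec stands in for the
// real BufWriter here).
#[cfg(test)]
mod default_ci_tests {
    #[test]
    fn keeps_first_of_case_insensitive_run() {
        let mut out = Vec::new();
        super::process_default_ci_singlepass(b"a\nA\nb\n", &mut out, b'\n').unwrap();
        assert_eq!(out, b"a\nb\n"); // "A" duplicates "a" and is skipped
    }
}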

/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
/// Zero-copy: writes directly from mmap data through BufWriter.
/// Uses speculative line-end detection and length-based early rejection.
fn process_filter_ci_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
    let data_len = data.len();
    let base = data.as_ptr();

    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line (or empty input, which prints nothing).
            // A lone line is unique: print it for -u, suppress it for -d.
            if !data.is_empty() && !repeated {
                writer.write_all(data)?;
                writer.write_all(&[term])?;
            }
            return Ok(());
        }
    };

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut prev_len = prev_end;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Batch output using IoSlice write_vectored
    const BATCH: usize = 512;
    let term_slice: [u8; 1] = [term];
    let mut slices: Vec<io::IoSlice<'_>> = Vec::with_capacity(BATCH * 2);

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;
        // Length check + case-insensitive comparison
        let is_dup = cur_len == prev_len
            && lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]);

        if is_dup {
            count += 1;
        } else {
            let should_print = if repeated { count > 1 } else { count == 1 };
            if should_print {
                slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
                slices.push(io::IoSlice::new(&term_slice));
                if slices.len() >= BATCH * 2 {
                    write_all_vectored(writer, &slices)?;
                    slices.clear();
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    let should_print = if repeated { count > 1 } else { count == 1 };
    if should_print {
        slices.push(io::IoSlice::new(&data[prev_start..prev_end]));
        slices.push(io::IoSlice::new(&term_slice));
    }
    if !slices.is_empty() {
        write_all_vectored(writer, &slices)?;
    }

    Ok(())
}
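
// Sketch test for the -i filter path (assumes a test build, and assumes
// lines_equal_case_insensitive — defined elsewhere in this file — matches
// eq_ignore_ascii_case semantics). RepeatedOnly keeps the first line of each
// duplicate group; UniqueOnly would instead keep the singletons.
#[cfg(test)]
mod filter_ci_tests {
    use super::{process_filter_ci_singlepass, OutputMode, UniqConfig};

    #[test]
    fn repeated_only_keeps_group_heads() {
        let config = UniqConfig {
            mode: OutputMode::RepeatedOnly,
            ignore_case: true,
            ..UniqConfig::default()
        };
        let mut out = Vec::new();
        process_filter_ci_singlepass(b"Foo\nfoo\nbar\n", &mut out, &config, b'\n').unwrap();
        assert_eq!(out, b"Foo\n"); // "bar" is a singleton and is dropped
    }
}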

/// Fast single-pass for case-insensitive (-i) count (-c) mode.
/// Uses the same zero-copy writev batching as the case-sensitive count path,
/// so no intermediate copy buffer is needed.
fn process_count_ci_singlepass(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let first_term = match memchr::memchr(term, data) {
        Some(pos) => pos,
        None => {
            // Single unterminated line (or empty input, which prints nothing)
            if data.is_empty() {
                return Ok(());
            }
            // A lone line has count 1: printed in every mode except -d
            if !matches!(config.mode, OutputMode::RepeatedOnly) {
                write_count_line(writer, 1, data, term)?;
            }
            return Ok(());
        }
    };

    let is_default = matches!(config.mode, OutputMode::Default);

    let mut prev_start: usize = 0;
    let mut prev_end: usize = first_term;
    let mut count: u64 = 1;
    let mut cur_start = first_term + 1;

    // Zero-copy writev batching: same approach as process_count_fast_singlepass
    const BATCH: usize = 340;
    const PREFIX_SLOT: usize = 28;
    let term_slice: [u8; 1] = [term];
    let mut prefix_buf = vec![b' '; BATCH * PREFIX_SLOT];
    let mut groups: Vec<(usize, usize, usize)> = Vec::with_capacity(BATCH);

    let base = data.as_ptr();
    let data_len = data.len();
    let mut prev_len = prev_end - prev_start;

    while cur_start < data_len {
        // Speculative line-end detection
        let cur_end = {
            let speculative = cur_start + prev_len;
            if speculative < data_len && unsafe { *base.add(speculative) } == term {
                speculative
            } else {
                match memchr::memchr(term, unsafe {
                    std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
                }) {
                    Some(offset) => cur_start + offset,
                    None => data_len,
                }
            }
        };

        let cur_len = cur_end - cur_start;
        // Length-based early rejection before the expensive case-insensitive compare
        let is_dup = cur_len == prev_len
            && data[prev_start..prev_end].eq_ignore_ascii_case(&data[cur_start..cur_end]);

        if is_dup {
            count += 1;
        } else {
            let should_print = if is_default {
                true
            } else {
                match config.mode {
                    OutputMode::RepeatedOnly => count > 1,
                    OutputMode::UniqueOnly => count == 1,
                    _ => true,
                }
            };
            if should_print {
                let idx = groups.len();
                let prefix_off = idx * PREFIX_SLOT;
                let prefix_len = format_count_prefix_into(
                    count,
                    &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT],
                );
                groups.push((prefix_len, prev_start, prev_end));

                if groups.len() >= BATCH {
                    flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
                    groups.clear();
                    prefix_buf.fill(b' ');
                }
            }
            prev_start = cur_start;
            prev_end = cur_end;
            prev_len = cur_len;
            count = 1;
        }

        if cur_end < data_len {
            cur_start = cur_end + 1;
        } else {
            break;
        }
    }

    let should_print = if is_default {
        true
    } else {
        match config.mode {
            OutputMode::RepeatedOnly => count > 1,
            OutputMode::UniqueOnly => count == 1,
            _ => true,
        }
    };
    if should_print {
        let idx = groups.len();
        let prefix_off = idx * PREFIX_SLOT;
        let prefix_len =
            format_count_prefix_into(count, &mut prefix_buf[prefix_off..prefix_off + PREFIX_SLOT]);
        groups.push((prefix_len, prev_start, prev_end));
    }
    if !groups.is_empty() {
        flush_count_groups(writer, &prefix_buf, &groups, &term_slice, data)?;
    }

    Ok(())
}
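
// Companion sketch for the -c -i path (assumes a test build; the expected
// bytes follow the GNU "%7lu " prefix produced by format_count_prefix_into).
#[cfg(test)]
mod count_ci_tests {
    use super::{process_count_ci_singlepass, UniqConfig};

    #[test]
    fn counts_case_insensitive_groups() {
        let config = UniqConfig {
            count: true,
            ignore_case: true,
            ..UniqConfig::default()
        };
        let mut out = Vec::new();
        process_count_ci_singlepass(b"a\nA\nb\n", &mut out, &config, b'\n').unwrap();
        assert_eq!(out, b"      2 a\n      1 b\n");
    }
}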

/// Output a group for standard modes (bytes path).
#[inline(always)]
fn output_group_bytes(
    writer: &mut impl Write,
    content: &[u8],
    full: &[u8],
    count: u64,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };

    if should_print {
        if config.count {
            write_count_line(writer, count, content, term)?;
        } else {
            writer.write_all(full)?;
            // Add a terminator if the original line didn't have one
            if full.len() == content.len() {
                writer.write_all(&[term])?;
            }
        }
    }

    Ok(())
}

/// Process --all-repeated / -D mode on byte slices.
fn process_all_repeated_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    method: AllRepeatedMethod,
    term: u8,
) -> io::Result<()> {
    let mut lines = LineIter::new(data, term);

    let first = match lines.next() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Buffer the (content, full_line) pairs of the current group: for
    // --all-repeated a group can only be printed once we know it holds
    // more than one line.
    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
    group_lines.push(first);
    let mut first_group_printed = false;

    let fast = !needs_key_extraction(config) && !config.ignore_case;

    for (cur_content, cur_full) in lines {
        let prev_content = group_lines.last().unwrap().0;
        let equal = if fast {
            lines_equal_fast(prev_content, cur_content)
        } else {
            lines_equal(prev_content, cur_content, config)
        };

        if equal {
            group_lines.push((cur_content, cur_full));
        } else {
            // Flush group
            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
            group_lines.clear();
            group_lines.push((cur_content, cur_full));
        }
    }

    // Flush last group
    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;

    Ok(())
}

/// Flush a group for --all-repeated mode (bytes path).
fn flush_all_repeated_bytes(
    writer: &mut impl Write,
    group: &[(&[u8], &[u8])],
    method: AllRepeatedMethod,
    first_group_printed: &mut bool,
    term: u8,
) -> io::Result<()> {
    if group.len() <= 1 {
        return Ok(()); // Not a duplicate group
    }

    match method {
        AllRepeatedMethod::Prepend => {
            writer.write_all(&[term])?;
        }
        AllRepeatedMethod::Separate => {
            if *first_group_printed {
                writer.write_all(&[term])?;
            }
        }
        AllRepeatedMethod::None => {}
    }

    for &(content, full) in group {
        writer.write_all(full)?;
        if full.len() == content.len() {
            writer.write_all(&[term])?;
        }
    }

    *first_group_printed = true;
    Ok(())
}
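
// Example of the Separate method (illustrative): for input "a\na\nb\nb\nc\n",
// --all-repeated=separate emits "a\na\n\nb\nb\n" — one blank separator line
// between duplicate groups, none before the first group or after the last,
// and the singleton "c" is dropped entirely.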

/// Process --group mode on byte slices.
fn process_group_bytes(
    data: &[u8],
    writer: &mut impl Write,
    config: &UniqConfig,
    method: GroupMethod,
    term: u8,
) -> io::Result<()> {
    let mut lines = LineIter::new(data, term);

    let (prev_content, prev_full) = match lines.next() {
        Some(v) => v,
        None => return Ok(()),
    };

    // Prepend/Both: separator before the first group
    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    // Write the first line
    writer.write_all(prev_full)?;
    if prev_full.len() == prev_content.len() {
        writer.write_all(&[term])?;
    }

    let mut prev_content = prev_content;
    let fast = !needs_key_extraction(config) && !config.ignore_case;

    for (cur_content, cur_full) in lines {
        let equal = if fast {
            lines_equal_fast(prev_content, cur_content)
        } else {
            lines_equal(prev_content, cur_content, config)
        };

        if !equal {
            // New group — write a separator
            writer.write_all(&[term])?;
        }

        writer.write_all(cur_full)?;
        if cur_full.len() == cur_content.len() {
            writer.write_all(&[term])?;
        }

        prev_content = cur_content;
    }

    // Append/Both: separator after the last group
    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    Ok(())
}
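
// Example (illustrative): --group=both on "a\na\nb\n" yields "\na\na\n\nb\n\n"
// — a separator before the first group, between groups, and after the last.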

// ============================================================================
// Streaming processing (for stdin / pipe input)
// ============================================================================

/// Main streaming uniq processor.
/// Reads from `input`, writes to `output`.
pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
    let term = if config.zero_terminated { b'\0' } else { b'\n' };

    match config.mode {
        OutputMode::Group(method) => {
            process_group_stream(reader, &mut writer, config, method, term)?;
        }
        OutputMode::AllRepeated(method) => {
            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
        }
        _ => {
            process_standard_stream(reader, &mut writer, config, term)?;
        }
    }

    writer.flush()?;
    Ok(())
}
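
// End-to-end sketch of the streaming entry point (assumes a test build; any
// Read/Write pair works, so a byte slice and a Vec stand in for stdio here).
// The expected bytes assume write_count_line, defined earlier in this file,
// emits the same GNU "%7lu " prefix as format_count_prefix_into.
#[cfg(test)]
mod process_uniq_tests {
    use super::{process_uniq, UniqConfig};

    #[test]
    fn counts_adjacent_duplicates() {
        let config = UniqConfig { count: true, ..UniqConfig::default() };
        let mut out = Vec::new();
        process_uniq(&b"a\na\nb\n"[..], &mut out, &config).unwrap();
        assert_eq!(out, b"      2 a\n      1 b\n");
    }
}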

/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
fn process_standard_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);

    // Read the first line
    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
        return Ok(()); // empty input
    }
    let mut count: u64 = 1;

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            // End of input — output the last group
            output_group_stream(writer, &prev_line, count, config, term)?;
            break;
        }

        if compare_lines_stream(&prev_line, &current_line, config, term) {
            count += 1;
        } else {
            output_group_stream(writer, &prev_line, count, config, term)?;
            std::mem::swap(&mut prev_line, &mut current_line);
            count = 1;
        }
    }

    Ok(())
}

/// Compare two lines (with terminators) in streaming mode.
#[inline(always)]
fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
    let a_stripped = strip_term(a, term);
    let b_stripped = strip_term(b, term);
    lines_equal(a_stripped, b_stripped, config)
}

/// Strip the terminator from the end of a line.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    if line.last() == Some(&term) {
        &line[..line.len() - 1]
    } else {
        line
    }
}

/// Output a group in streaming mode.
#[inline(always)]
fn output_group_stream(
    writer: &mut impl Write,
    line: &[u8],
    count: u64,
    config: &UniqConfig,
    term: u8,
) -> io::Result<()> {
    let should_print = match config.mode {
        OutputMode::Default => true,
        OutputMode::RepeatedOnly => count > 1,
        OutputMode::UniqueOnly => count == 1,
        _ => true,
    };

    if should_print {
        let content = strip_term(line, term);
        if config.count {
            write_count_line(writer, count, content, term)?;
        } else {
            writer.write_all(content)?;
            writer.write_all(&[term])?;
        }
    }

    Ok(())
}

/// Process --all-repeated / -D mode (streaming).
fn process_all_repeated_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    method: AllRepeatedMethod,
    term: u8,
) -> io::Result<()> {
    let mut group: Vec<Vec<u8>> = Vec::new();
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
    let mut first_group_printed = false;

    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
        return Ok(());
    }
    group.push(current_line.clone());

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
            break;
        }

        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
            group.push(current_line.clone());
        } else {
            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
            group.clear();
            group.push(current_line.clone());
        }
    }

    Ok(())
}

/// Flush a group for --all-repeated mode (streaming).
fn flush_all_repeated_stream(
    writer: &mut impl Write,
    group: &[Vec<u8>],
    method: AllRepeatedMethod,
    first_group_printed: &mut bool,
    term: u8,
) -> io::Result<()> {
    if group.len() <= 1 {
        return Ok(());
    }

    match method {
        AllRepeatedMethod::Prepend => {
            writer.write_all(&[term])?;
        }
        AllRepeatedMethod::Separate => {
            if *first_group_printed {
                writer.write_all(&[term])?;
            }
        }
        AllRepeatedMethod::None => {}
    }

    for line in group {
        let content = strip_term(line, term);
        writer.write_all(content)?;
        writer.write_all(&[term])?;
    }

    *first_group_printed = true;
    Ok(())
}

/// Process --group mode (streaming).
fn process_group_stream<R: BufRead, W: Write>(
    mut reader: R,
    writer: &mut W,
    config: &UniqConfig,
    method: GroupMethod,
    term: u8,
) -> io::Result<()> {
    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
    let mut current_line: Vec<u8> = Vec::with_capacity(4096);

    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
        return Ok(());
    }

    // Prepend/Both: separator before the first group
    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
        writer.write_all(&[term])?;
    }

    let content = strip_term(&prev_line, term);
    writer.write_all(content)?;
    writer.write_all(&[term])?;

    loop {
        current_line.clear();
        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;

        if bytes_read == 0 {
            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
                writer.write_all(&[term])?;
            }
            break;
        }

        if !compare_lines_stream(&prev_line, &current_line, config, term) {
            writer.write_all(&[term])?;
        }

        let content = strip_term(&current_line, term);
        writer.write_all(content)?;
        writer.write_all(&[term])?;

        std::mem::swap(&mut prev_line, &mut current_line);
    }

    Ok(())
}

/// Read a line terminated by the given byte (newline or NUL).
/// Returns the number of bytes read (0 = EOF). The terminator, when present,
/// is appended to `buf` and included in the count; callers strip it with
/// `strip_term` before comparing or printing.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    reader.read_until(term, buf)
}