
coreutils_rs/uniq/core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
3/// Write a large contiguous buffer in one call; `write_all` itself retries on partial writes.
4#[inline]
5fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
6    writer.write_all(buf)
7}
8
9/// How to delimit groups when using --all-repeated
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum AllRepeatedMethod {
12    None,
13    Prepend,
14    Separate,
15}
16
17/// How to delimit groups when using --group
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum GroupMethod {
20    Separate,
21    Prepend,
22    Append,
23    Both,
24}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
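
// Illustrative sketch (not used elsewhere in this file): a config roughly equivalent
// to `uniq -c -f 1 -w 8`: count occurrences, skip the first whitespace-delimited
// field, and compare at most 8 characters of what remains.
#[allow(dead_code)]
fn example_count_config() -> UniqConfig {
    UniqConfig {
        mode: OutputMode::Default,
        count: true,
        skip_fields: 1,
        check_chars: Some(8),
        ..UniqConfig::default()
    }
}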
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
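
// Sketch of the field/char-skip semantics described above (assumed to mirror
// GNU `uniq -f 1 -s 1`): one field (leading blanks plus a run of non-blanks) is
// dropped, then one more character, before comparison.
#[cfg(test)]
mod compare_slice_sketch {
    use super::*;

    #[test]
    fn skips_one_field_then_one_char() {
        let cfg = UniqConfig {
            skip_fields: 1,
            skip_chars: 1,
            ..UniqConfig::default()
        };
        // "  id42 value": skip blanks + "id42", then skip the separating space.
        assert_eq!(get_compare_slice(b"  id42 value", &cfg), &b"value"[..]);
    }
}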
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Fast case-insensitive comparison: no field/char extraction, just a straight case-insensitive compare.
119/// Rejects on length mismatch before the full comparison.
120#[inline(always)]
121fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
122    let alen = a.len();
123    if alen != b.len() {
124        return false;
125    }
126    if alen == 0 {
127        return true;
128    }
129    a.eq_ignore_ascii_case(b)
130}
131
132/// Check if config requires field/char skipping or char limiting.
133#[inline(always)]
134fn needs_key_extraction(config: &UniqConfig) -> bool {
135    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
136}
137
138/// Fast path comparison: no field/char extraction needed, no case folding.
139/// Uses a length check and multi-word prefix rejection before any full comparison.
140/// For short lines (<= 32 bytes, common in many-dups data), avoids the
141/// full memcmp call overhead by doing direct word comparisons.
142#[inline(always)]
143fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
144    let alen = a.len();
145    if alen != b.len() {
146        return false;
147    }
148    if alen == 0 {
149        return true;
150    }
151    // Short-line fast path
152    if alen <= 8 {
153        // For <= 8 bytes: plain slice equality is already cheap at this length
154        return a == b;
155    }
156    // 8-byte prefix check: reject most non-equal lines without full memcmp
157    let a8 = unsafe { (a.as_ptr() as *const u64).read_unaligned() };
158    let b8 = unsafe { (b.as_ptr() as *const u64).read_unaligned() };
159    if a8 != b8 {
160        return false;
161    }
162    // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
163    if alen <= 16 {
164        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
165        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
166        return a_tail == b_tail;
167    }
168    // For 17-32 bytes: check first 16 + last 16 (overlapping) to avoid memcmp
169    if alen <= 32 {
170        let a16 = unsafe { (a.as_ptr().add(8) as *const u64).read_unaligned() };
171        let b16 = unsafe { (b.as_ptr().add(8) as *const u64).read_unaligned() };
172        if a16 != b16 {
173            return false;
174        }
175        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
176        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
177        return a_tail == b_tail;
178    }
179    // Longer lines: prefix passed, fall through to full memcmp
180    a == b
181}
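
// Sketch: the word-load shortcuts above are purely an optimisation, so for each
// length bucket (<= 8, 9-16, 17-32, > 32 bytes) the result should agree with
// plain slice equality.
#[cfg(test)]
mod lines_equal_fast_sketch {
    use super::*;

    #[test]
    fn agrees_with_slice_equality() {
        let pairs = [
            ("a", "a"),
            ("abcdefgh", "abcdefgi"),
            ("0123456789abcdef", "0123456789abcdef"),
            ("0123456789abcdef0123456789abcdeX", "0123456789abcdef0123456789abcdef"),
            (
                "a longer line that falls through to the final full comparison",
                "a longer line that falls through to the final full comparison",
            ),
        ];
        for &(a, b) in pairs.iter() {
            let (a, b) = (a.as_bytes(), b.as_bytes());
            assert_eq!(lines_equal_fast(a, b), a == b);
        }
    }
}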
182
183/// Write a count-prefixed line in GNU uniq format.
184/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
185/// Combines prefix + line + terminator into a single write when the total fits in 256 bytes.
186#[inline(always)]
187fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
188    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
189    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
190    let digits = itoa_right_aligned_into(&mut prefix, count);
191    let width = digits.max(7); // minimum 7 chars
192    let prefix_len = width + 1; // +1 for trailing space
193    prefix[width] = b' ';
194
195    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
196    let total = prefix_len + line.len() + 1;
197    if total <= 256 {
198        let mut buf = [0u8; 256];
199        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
200        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
201        buf[prefix_len + line.len()] = term;
202        out.write_all(&buf[..total])
203    } else {
204        out.write_all(&prefix[..prefix_len])?;
205        out.write_all(line)?;
206        out.write_all(&[term])
207    }
208}
209
210/// Write u64 decimal right-aligned into prefix buffer.
211/// Buffer is pre-filled with spaces. Returns number of digits written.
212#[inline(always)]
213fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
214    if val == 0 {
215        buf[6] = b'0';
216        return 7; // 6 spaces + '0' = 7 chars
217    }
218    // Write digits right-to-left from position 27 (leaving room for trailing space)
219    let mut pos = 27;
220    while val > 0 {
221        pos -= 1;
222        buf[pos] = b'0' + (val % 10) as u8;
223        val /= 10;
224    }
225    let num_digits = 27 - pos;
226    if num_digits >= 7 {
227        // Number is wide enough, shift to front
228        buf.copy_within(pos..27, 0);
229        num_digits
230    } else {
231        // Right-align in 7-char field: spaces then digits
232        let pad = 7 - num_digits;
233        buf.copy_within(pos..27, pad);
234        // buf[0..pad] is already spaces from initialization
235        7
236    }
237}
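
// Sketch of the expected `-c` formatting (GNU's "%7lu ": the count right-aligned
// in a 7-character field, then a space, then the line), for both a small count
// and one wider than the field.
#[cfg(test)]
mod count_format_sketch {
    use super::*;

    #[test]
    fn matches_seven_wide_gnu_field() {
        let mut out = Vec::new();
        write_count_line(&mut out, 42, b"hello", b'\n').unwrap();
        assert_eq!(out.as_slice(), &b"     42 hello\n"[..]);

        let mut out = Vec::new();
        write_count_line(&mut out, 12_345_678, b"x", b'\n').unwrap();
        assert_eq!(out.as_slice(), &b"12345678 x\n"[..]);
    }
}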
238
239// ============================================================================
240// High-performance mmap-based processing (for byte slices, zero-copy)
241// ============================================================================
242
243/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
244pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
245    // 16MB output buffer: large enough to cut the write() syscall count sharply
246    // while typically staying within L3 cache on modern CPUs; 32MB can cause
247    // cache thrashing.
248    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
249    let term = if config.zero_terminated { b'\0' } else { b'\n' };
250
251    match config.mode {
252        OutputMode::Group(method) => {
253            process_group_bytes(data, &mut writer, config, method, term)?;
254        }
255        OutputMode::AllRepeated(method) => {
256            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
257        }
258        _ => {
259            process_standard_bytes(data, &mut writer, config, term)?;
260        }
261    }
262
263    writer.flush()?;
264    Ok(())
265}
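
// Usage sketch for the zero-copy entry point (assumed typical call site: a
// memory-mapped file): default dedup of adjacent duplicate lines into a Vec.
#[cfg(test)]
mod bytes_entry_point_sketch {
    use super::*;

    #[test]
    fn dedups_adjacent_lines() {
        let mut out = Vec::new();
        process_uniq_bytes(b"alpha\nalpha\nbeta\nbeta\nbeta\ngamma\n", &mut out, &UniqConfig::default()).unwrap();
        assert_eq!(out.as_slice(), &b"alpha\nbeta\ngamma\n"[..]);
    }
}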
266
267/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
268/// Uses memchr for SIMD-accelerated line boundary detection.
269struct LineIter<'a> {
270    data: &'a [u8],
271    pos: usize,
272    term: u8,
273}
274
275impl<'a> LineIter<'a> {
276    #[inline(always)]
277    fn new(data: &'a [u8], term: u8) -> Self {
278        Self { data, pos: 0, term }
279    }
280}
281
282impl<'a> Iterator for LineIter<'a> {
283    /// (line content without terminator, full line including terminator for output)
284    type Item = (&'a [u8], &'a [u8]);
285
286    #[inline(always)]
287    fn next(&mut self) -> Option<Self::Item> {
288        if self.pos >= self.data.len() {
289            return None;
290        }
291
292        let remaining = &self.data[self.pos..];
293        match memchr::memchr(self.term, remaining) {
294            Some(idx) => {
295                let line_start = self.pos;
296                let line_end = self.pos + idx; // without terminator
297                let full_end = self.pos + idx + 1; // with terminator
298                self.pos = full_end;
299                Some((
300                    &self.data[line_start..line_end],
301                    &self.data[line_start..full_end],
302                ))
303            }
304            None => {
305                // Last line without terminator
306                let line_start = self.pos;
307                self.pos = self.data.len();
308                let line = &self.data[line_start..];
309                Some((line, line))
310            }
311        }
312    }
313}
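
// Sketch: LineIter yields (content, full line including terminator); the final
// line is yielded with content == full when it has no terminator.
#[cfg(test)]
mod line_iter_sketch {
    use super::*;

    #[test]
    fn yields_content_and_full_line() {
        let mut it = LineIter::new(b"ab\ncd", b'\n');
        assert_eq!(it.next(), Some((&b"ab"[..], &b"ab\n"[..])));
        assert_eq!(it.next(), Some((&b"cd"[..], &b"cd"[..])));
        assert_eq!(it.next(), None);
    }
}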
314
315/// Get line content (without terminator) from pre-computed positions.
316/// `content_end` is the end of actual content (excludes trailing terminator if present).
317#[inline(always)]
318fn line_content_at<'a>(
319    data: &'a [u8],
320    line_starts: &[usize],
321    idx: usize,
322    content_end: usize,
323) -> &'a [u8] {
324    let start = line_starts[idx];
325    let end = if idx + 1 < line_starts.len() {
326        line_starts[idx + 1] - 1 // exclude terminator
327    } else {
328        content_end // last line: pre-computed to exclude trailing terminator
329    };
330    &data[start..end]
331}
332
333/// Get full line (with terminator) from pre-computed positions.
334#[inline(always)]
335fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
336    let start = line_starts[idx];
337    let end = if idx + 1 < line_starts.len() {
338        line_starts[idx + 1] // include terminator
339    } else {
340        data.len()
341    };
342    &data[start..end]
343}
344
345/// Linear scan for the end of a duplicate group.
346/// Returns the index of the first line that differs from line_starts[group_start].
347/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
348/// equal lines can appear in non-adjacent groups separated by different lines.
349/// Caches key length for fast length-mismatch rejection.
350#[inline]
351fn linear_scan_group_end(
352    data: &[u8],
353    line_starts: &[usize],
354    group_start: usize,
355    num_lines: usize,
356    content_end: usize,
357) -> usize {
358    let key = line_content_at(data, line_starts, group_start, content_end);
359    let key_len = key.len();
360    let mut i = group_start + 1;
361    while i < num_lines {
362        let candidate = line_content_at(data, line_starts, i, content_end);
363        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
364            return i;
365        }
366        i += 1;
367    }
368    i
369}
370
371/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
372/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
373/// General path: pre-computed line positions with a linear scan to find each group's end.
374fn process_standard_bytes(
375    data: &[u8],
376    writer: &mut impl Write,
377    config: &UniqConfig,
378    term: u8,
379) -> io::Result<()> {
380    if data.is_empty() {
381        return Ok(());
382    }
383
384    let fast = !needs_key_extraction(config) && !config.ignore_case;
385    let fast_ci = !needs_key_extraction(config) && config.ignore_case;
386
387    // Ultra-fast path: default mode, no count, no key extraction.
388    // Single-pass: scan with memchr, compare adjacent lines inline.
389    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
390    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
391        return process_default_fast_singlepass(data, writer, term);
392    }
393
394    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
395    if fast
396        && !config.count
397        && matches!(
398            config.mode,
399            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
400        )
401    {
402        return process_filter_fast_singlepass(data, writer, config, term);
403    }
404
405    // Ultra-fast path: count mode with no key extraction.
406    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
407    // Avoids the line_starts Vec allocation (20MB+ for large files).
408    if fast && config.count {
409        return process_count_fast_singlepass(data, writer, config, term);
410    }
411
412    // Fast path for case-insensitive (-i) mode with no key extraction.
413    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
414    // Avoids the general path's line_starts Vec allocation.
415    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
416        return process_default_ci_singlepass(data, writer, term);
417    }
418
419    if fast_ci
420        && !config.count
421        && matches!(
422            config.mode,
423            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
424        )
425    {
426        return process_filter_ci_singlepass(data, writer, config, term);
427    }
428
429    if fast_ci && config.count {
430        return process_count_ci_singlepass(data, writer, config, term);
431    }
432
433    // General path: pre-computed line positions; group ends found by linear scan
434    let estimated_lines = (data.len() / 40).max(64);
435    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
436    line_starts.push(0);
437    for pos in memchr::memchr_iter(term, data) {
438        if pos + 1 < data.len() {
439            line_starts.push(pos + 1);
440        }
441    }
442    let num_lines = line_starts.len();
443    if num_lines == 0 {
444        return Ok(());
445    }
446
447    // Pre-compute content end: if data ends with terminator, exclude it for last line
448    let content_end = if data.last() == Some(&term) {
449        data.len() - 1
450    } else {
451        data.len()
452    };
453
454    // Default mode, no count, no key extraction (in practice unreachable: the single-pass fast path above already returns for this case)
455    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
456        // Write first line
457        let first_full = line_full_at(data, &line_starts, 0);
458        let first_content = line_content_at(data, &line_starts, 0, content_end);
459        write_all_raw(writer, first_full)?;
460        if first_full.len() == first_content.len() {
461            writer.write_all(&[term])?;
462        }
463
464        let mut i = 1;
465        while i < num_lines {
466            let prev = line_content_at(data, &line_starts, i - 1, content_end);
467            let cur = line_content_at(data, &line_starts, i, content_end);
468
469            if lines_equal_fast(prev, cur) {
470                // Duplicate detected — linear scan for end of group
471                let group_end =
472                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
473                i = group_end;
474                continue;
475            }
476
477            // Unique line — write it
478            let cur_full = line_full_at(data, &line_starts, i);
479            write_all_raw(writer, cur_full)?;
480            if cur_full.len() == cur.len() {
481                writer.write_all(&[term])?;
482            }
483            i += 1;
484        }
485        return Ok(());
486    }
487
488    // General path with count tracking
489    let mut i = 0;
490    while i < num_lines {
491        let content = line_content_at(data, &line_starts, i, content_end);
492        let full = line_full_at(data, &line_starts, i);
493
494        let group_end = if fast
495            && i + 1 < num_lines
496            && lines_equal_fast(
497                content,
498                line_content_at(data, &line_starts, i + 1, content_end),
499            ) {
500            // Duplicate detected — linear scan for end
501            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
502        } else if !fast
503            && i + 1 < num_lines
504            && lines_equal(
505                content,
506                line_content_at(data, &line_starts, i + 1, content_end),
507                config,
508            )
509        {
510            // Slow path linear scan with key extraction
511            let mut j = i + 2;
512            while j < num_lines {
513                if !lines_equal(
514                    content,
515                    line_content_at(data, &line_starts, j, content_end),
516                    config,
517                ) {
518                    break;
519                }
520                j += 1;
521            }
522            j
523        } else {
524            i + 1
525        };
526
527        let count = (group_end - i) as u64;
528        output_group_bytes(writer, content, full, count, config, term)?;
529        i = group_end;
530    }
531
532    Ok(())
533}
534
535/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
536/// No pre-computed positions, no binary search, no Vec allocation.
537/// Outputs each line that differs from the previous.
538///
539/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
540/// independently, then cross-chunk boundaries are resolved.
541fn process_default_fast_singlepass(
542    data: &[u8],
543    writer: &mut impl Write,
544    term: u8,
545) -> io::Result<()> {
546    // Parallel path for large files — kick in at 4MB.
547    // Lower thresholds (e.g. 2MB) hurt performance on 10MB files because
548    // the parallel overhead dominates for smaller chunks.
549    if data.len() >= 4 * 1024 * 1024 {
550        return process_default_parallel(data, writer, term);
551    }
552
553    process_default_sequential(data, writer, term)
554}
555
556/// Sequential single-pass dedup with zero-copy output.
557/// Instead of copying data to a buffer, tracks contiguous output runs and writes
558/// directly from the original data. For all-unique data, this is a single write_all.
559///
560/// Optimized for the "many duplicates" case: caches the previous line's length
561/// and first-8-byte prefix for fast rejection of non-duplicates without
562/// calling the full comparison function.
563///
564/// Uses raw pointer arithmetic throughout to avoid bounds checking in the hot loop.
565fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
566    let data_len = data.len();
567    let base = data.as_ptr();
568    let mut prev_start: usize = 0;
569
570    // Find end of first line
571    let first_end: usize = match memchr::memchr(term, data) {
572        Some(pos) => pos,
573        None => {
574            // Single line, no terminator
575            writer.write_all(data)?;
576            return writer.write_all(&[term]);
577        }
578    };
579
580    // Cache previous line metadata for fast comparison
581    let mut prev_len = first_end - prev_start;
582    let mut prev_prefix: u64 = if prev_len >= 8 {
583        unsafe { (base.add(prev_start) as *const u64).read_unaligned() }
584    } else {
585        0
586    };
587
588    // run_start tracks the beginning of the current contiguous output region.
589    // When a duplicate is found, we flush the run up to the duplicate and skip it.
590    let mut run_start: usize = 0;
591    let mut cur_start = first_end + 1;
592    let mut last_output_end = first_end + 1; // exclusive end including terminator
593
594    while cur_start < data_len {
595        let cur_end = match memchr::memchr(term, unsafe {
596            std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
597        }) {
598            Some(offset) => cur_start + offset,
599            None => data_len, // last line without terminator
600        };
601
602        let cur_len = cur_end - cur_start;
603
604        // Fast reject: if lengths differ, lines are definitely not equal.
605        // This branch structure is ordered by frequency: length mismatch is
606        // most common for unique data, prefix mismatch next, full compare last.
607        let is_dup = if cur_len != prev_len {
608            false
609        } else if cur_len == 0 {
610            true
611        } else if cur_len >= 8 {
612            // Compare cached 8-byte prefix first
613            let cur_prefix = unsafe { (base.add(cur_start) as *const u64).read_unaligned() };
614            if cur_prefix != prev_prefix {
615                false
616            } else if cur_len <= 8 {
617                true // prefix covers entire line
618            } else if cur_len <= 16 {
619                // Check last 8 bytes (overlapping)
620                unsafe {
621                    let a_tail =
622                        (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
623                    let b_tail = (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
624                    a_tail == b_tail
625                }
626            } else if cur_len <= 32 {
627                // Check bytes 8-16 and last 8 bytes
628                unsafe {
629                    let a16 = (base.add(prev_start + 8) as *const u64).read_unaligned();
630                    let b16 = (base.add(cur_start + 8) as *const u64).read_unaligned();
631                    if a16 != b16 {
632                        false
633                    } else {
634                        let a_tail =
635                            (base.add(prev_start + prev_len - 8) as *const u64).read_unaligned();
636                        let b_tail =
637                            (base.add(cur_start + cur_len - 8) as *const u64).read_unaligned();
638                        a_tail == b_tail
639                    }
640                }
641            } else {
642                // For longer lines, use unsafe slice comparison to skip bounds checks
643                unsafe {
644                    let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
645                    let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
646                    a == b
647                }
648            }
649        } else {
650            // Short line < 8 bytes — direct byte comparison
651            unsafe {
652                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
653                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
654                a == b
655            }
656        };
657
658        if is_dup {
659            // Duplicate — flush the current run up to this line, then skip it
660            if run_start < cur_start {
661                writer.write_all(&data[run_start..cur_start])?;
662            }
663            // Start new run after this duplicate
664            run_start = if cur_end < data_len {
665                cur_end + 1
666            } else {
667                cur_end
668            };
669        } else {
670            // Different line — update cached comparison state
671            prev_start = cur_start;
672            prev_len = cur_len;
673            prev_prefix = if cur_len >= 8 {
674                unsafe { (base.add(cur_start) as *const u64).read_unaligned() }
675            } else {
676                0
677            };
678            last_output_end = if cur_end < data_len {
679                cur_end + 1
680            } else {
681                cur_end
682            };
683        }
684
685        if cur_end < data_len {
686            cur_start = cur_end + 1;
687        } else {
688            break;
689        }
690    }
691
692    // Flush remaining run
693    if run_start < data_len {
694        writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
695    }
696
697    // Ensure trailing terminator, but only if the last line written was the final, unterminated input line (a skipped trailing duplicate leaves output already terminated)
698    if last_output_end == data_len && data_len > 0 && unsafe { *base.add(data_len - 1) } != term {
699        writer.write_all(&[term])?;
700    }
701
702    Ok(())
703}
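
// Sketch of the trailing-terminator handling above: when the input does not end
// with a terminator, one is appended only if the final line was actually emitted
// rather than skipped as a duplicate.
#[cfg(test)]
mod sequential_tail_sketch {
    use super::*;

    #[test]
    fn unterminated_final_line() {
        let mut out = Vec::new();
        process_default_sequential(b"a\nb", &mut out, b'\n').unwrap();
        assert_eq!(out.as_slice(), &b"a\nb\n"[..]);

        let mut out = Vec::new();
        process_default_sequential(b"a\nb\nb", &mut out, b'\n').unwrap();
        assert_eq!(out.as_slice(), &b"a\nb\n"[..]);
    }
}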
704
705/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
706/// positions in each chunk in parallel, then write output runs directly from
707/// the original data. No per-chunk buffer allocation needed.
708fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
709    use rayon::prelude::*;
710
711    let num_threads = rayon::current_num_threads().max(1);
712    let chunk_target = data.len() / num_threads;
713
714    // Find chunk boundaries aligned to line terminators
715    let mut boundaries = Vec::with_capacity(num_threads + 1);
716    boundaries.push(0usize);
717    for i in 1..num_threads {
718        let target = i * chunk_target;
719        if target >= data.len() {
720            break;
721        }
722        if let Some(p) = memchr::memchr(term, &data[target..]) {
723            let b = target + p + 1;
724            if b > *boundaries.last().unwrap() && b <= data.len() {
725                boundaries.push(b);
726            }
727        }
728    }
729    boundaries.push(data.len());
730
731    let n_chunks = boundaries.len() - 1;
732    if n_chunks <= 1 {
733        return process_default_sequential(data, writer, term);
734    }
735
736    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
737    struct ChunkResult {
738        /// Byte ranges in the original data to output (contiguous runs)
739        runs: Vec<(usize, usize)>,
740        /// First line in chunk (absolute offsets into data, content without term)
741        first_line_start: usize,
742        first_line_end: usize,
743        /// Last *output* line in chunk (content without term)
744        last_line_start: usize,
745        last_line_end: usize,
746    }
747
748    let results: Vec<ChunkResult> = boundaries
749        .windows(2)
750        .collect::<Vec<_>>()
751        .par_iter()
752        .map(|w| {
753            let chunk_start = w[0];
754            let chunk_end = w[1];
755            let chunk = &data[chunk_start..chunk_end];
756
757            let first_term = match memchr::memchr(term, chunk) {
758                Some(pos) => pos,
759                None => {
760                    return ChunkResult {
761                        runs: vec![(chunk_start, chunk_end)],
762                        first_line_start: chunk_start,
763                        first_line_end: chunk_end,
764                        last_line_start: chunk_start,
765                        last_line_end: chunk_end,
766                    };
767                }
768            };
769
770            let first_line_start = chunk_start;
771            let first_line_end = chunk_start + first_term;
772
773            let mut runs: Vec<(usize, usize)> = Vec::new();
774            let mut run_start = chunk_start;
775            let mut prev_start = 0usize;
776            let mut prev_end = first_term;
777            let mut last_out_start = chunk_start;
778            let mut last_out_end = first_line_end;
779
780            let mut cur_start = first_term + 1;
781            while cur_start < chunk.len() {
782                let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
783                    Some(offset) => cur_start + offset,
784                    None => chunk.len(),
785                };
786
787                if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
788                    // Duplicate — flush current run up to this line
789                    let abs_cur = chunk_start + cur_start;
790                    if run_start < abs_cur {
791                        runs.push((run_start, abs_cur));
792                    }
793                    // New run starts after this duplicate
794                    run_start = chunk_start
795                        + if cur_end < chunk.len() {
796                            cur_end + 1
797                        } else {
798                            cur_end
799                        };
800                } else {
801                    last_out_start = chunk_start + cur_start;
802                    last_out_end = chunk_start + cur_end;
803                }
804                prev_start = cur_start;
805                prev_end = cur_end;
806
807                if cur_end < chunk.len() {
808                    cur_start = cur_end + 1;
809                } else {
810                    break;
811                }
812            }
813
814            // Close final run
815            if run_start < chunk_end {
816                runs.push((run_start, chunk_end));
817            }
818
819            ChunkResult {
820                runs,
821                first_line_start,
822                first_line_end,
823                last_line_start: last_out_start,
824                last_line_end: last_out_end,
825            }
826        })
827        .collect();
828
829    // Write results, adjusting cross-chunk boundaries
830    for (i, result) in results.iter().enumerate() {
831        let skip_first = if i > 0 {
832            let prev = &results[i - 1];
833            let prev_last = &data[prev.last_line_start..prev.last_line_end];
834            let cur_first = &data[result.first_line_start..result.first_line_end];
835            lines_equal_fast(prev_last, cur_first)
836        } else {
837            false
838        };
839
840        let skip_end = if skip_first {
841            // Skip bytes up to and including the first line's terminator
842            result.first_line_end + 1
843        } else {
844            0
845        };
846
847        for &(rs, re) in &result.runs {
848            let actual_start = rs.max(skip_end);
849            if actual_start < re {
850                writer.write_all(&data[actual_start..re])?;
851            }
852        }
853    }
854
855    // Append a trailing terminator only if the final output run ends at the unterminated
856    // end of the data (a trailing line skipped as a duplicate leaves output already terminated)
857    let wrote_tail = results.last().and_then(|r| r.runs.last()).map_or(false, |&(_, re)| re == data.len());
858    if wrote_tail && !data.is_empty() && *data.last().unwrap() != term { writer.write_all(&[term])?; }
859
860    Ok(())
861}
862
863/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
864/// Zero-copy: writes directly from input data, no output buffer allocation.
865/// Uses cached prefix comparison for fast duplicate detection.
866/// Uses raw pointer arithmetic throughout to avoid bounds checking.
867fn process_filter_fast_singlepass(
868    data: &[u8],
869    writer: &mut impl Write,
870    config: &UniqConfig,
871    term: u8,
872) -> io::Result<()> {
873    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
874    let data_len = data.len();
875    let base = data.as_ptr();
876
877    let first_term = match memchr::memchr(term, data) {
878        Some(pos) => pos,
879        None => {
880            // Single line: unique (count=1)
881            if !repeated {
882                writer.write_all(data)?;
883                writer.write_all(&[term])?;
884            }
885            return Ok(());
886        }
887    };
888
889    let mut prev_start: usize = 0;
890    let mut prev_end: usize = first_term;
891    let mut prev_len = prev_end;
892    let mut count: u64 = 1;
893    let mut cur_start = first_term + 1;
894
895    while cur_start < data_len {
896        let cur_end = match memchr::memchr(term, unsafe {
897            std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
898        }) {
899            Some(offset) => cur_start + offset,
900            None => data_len,
901        };
902
903        let cur_len = cur_end - cur_start;
904        let is_dup = cur_len == prev_len
905            && unsafe {
906                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
907                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
908                lines_equal_fast(a, b)
909            };
910
911        if is_dup {
912            count += 1;
913        } else {
914            // Output previous group -- write directly from input data (zero-copy)
915            let should_print = if repeated { count > 1 } else { count == 1 };
916            if should_print {
917                if prev_end < data_len && unsafe { *base.add(prev_end) } == term {
918                    writer.write_all(&data[prev_start..prev_end + 1])?;
919                } else {
920                    writer.write_all(&data[prev_start..prev_end])?;
921                    writer.write_all(&[term])?;
922                }
923            }
924            prev_start = cur_start;
925            prev_end = cur_end;
926            prev_len = cur_len;
927            count = 1;
928        }
929
930        if cur_end < data_len {
931            cur_start = cur_end + 1;
932        } else {
933            break;
934        }
935    }
936
937    // Output last group
938    let should_print = if repeated { count > 1 } else { count == 1 };
939    if should_print {
940        if prev_end < data_len && unsafe { *base.add(prev_end) } == term {
941            writer.write_all(&data[prev_start..prev_end + 1])?;
942        } else {
943            writer.write_all(&data[prev_start..prev_end])?;
944            writer.write_all(&[term])?;
945        }
946    }
947
948    Ok(())
949}
950
951/// Fast single-pass for count mode (-c) with all standard output modes.
952/// Zero line_starts allocation: scans with memchr, counts groups inline,
953/// and writes count-prefixed lines directly.
954/// Uses cached length comparison for fast duplicate rejection.
955/// Uses raw pointer arithmetic to avoid bounds checking.
956fn process_count_fast_singlepass(
957    data: &[u8],
958    writer: &mut impl Write,
959    config: &UniqConfig,
960    term: u8,
961) -> io::Result<()> {
962    let data_len = data.len();
963    let base = data.as_ptr();
964
965    let first_term = match memchr::memchr(term, data) {
966        Some(pos) => pos,
967        None => {
968            // Single line: count=1
969            let should_print = match config.mode {
970                OutputMode::Default => true,
971                OutputMode::RepeatedOnly => false,
972                OutputMode::UniqueOnly => true,
973                _ => true,
974            };
975            if should_print {
976                write_count_line(writer, 1, data, term)?;
977            }
978            return Ok(());
979        }
980    };
981
982    let mut prev_start: usize = 0;
983    let mut prev_end: usize = first_term;
984    let mut prev_len = prev_end;
985    let mut count: u64 = 1;
986    let mut cur_start = first_term + 1;
987
988    while cur_start < data_len {
989        let cur_end = match memchr::memchr(term, unsafe {
990            std::slice::from_raw_parts(base.add(cur_start), data_len - cur_start)
991        }) {
992            Some(offset) => cur_start + offset,
993            None => data_len,
994        };
995
996        let cur_len = cur_end - cur_start;
997        let is_dup = cur_len == prev_len
998            && unsafe {
999                let a = std::slice::from_raw_parts(base.add(prev_start), prev_len);
1000                let b = std::slice::from_raw_parts(base.add(cur_start), cur_len);
1001                lines_equal_fast(a, b)
1002            };
1003
1004        if is_dup {
1005            count += 1;
1006        } else {
1007            // Output previous group with count
1008            let should_print = match config.mode {
1009                OutputMode::Default => true,
1010                OutputMode::RepeatedOnly => count > 1,
1011                OutputMode::UniqueOnly => count == 1,
1012                _ => true,
1013            };
1014            if should_print {
1015                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1016            }
1017            prev_start = cur_start;
1018            prev_end = cur_end;
1019            prev_len = cur_len;
1020            count = 1;
1021        }
1022
1023        if cur_end < data_len {
1024            cur_start = cur_end + 1;
1025        } else {
1026            break;
1027        }
1028    }
1029
1030    // Output last group
1031    let should_print = match config.mode {
1032        OutputMode::Default => true,
1033        OutputMode::RepeatedOnly => count > 1,
1034        OutputMode::UniqueOnly => count == 1,
1035        _ => true,
1036    };
1037    if should_print {
1038        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1039    }
1040
1041    Ok(())
1042}
1043
1044/// Fast single-pass for case-insensitive (-i) default mode.
1045/// Same logic as process_default_sequential but uses eq_ignore_ascii_case.
1046fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
1047    let mut prev_start: usize = 0;
1048    let mut prev_end: usize;
1049
1050    match memchr::memchr(term, data) {
1051        Some(pos) => {
1052            prev_end = pos;
1053        }
1054        None => {
1055            writer.write_all(data)?;
1056            return writer.write_all(&[term]);
1057        }
1058    }
1059
1060    // Write first line
1061    writer.write_all(&data[..prev_end + 1])?;
1062
1063    let mut cur_start = prev_end + 1;
1064
1065    while cur_start < data.len() {
1066        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1067            Some(offset) => cur_start + offset,
1068            None => data.len(),
1069        };
1070
1071        let prev_content = &data[prev_start..prev_end];
1072        let cur_content = &data[cur_start..cur_end];
1073
1074        if !lines_equal_case_insensitive(prev_content, cur_content) {
1075            // Different line — write it
1076            if cur_end < data.len() {
1077                writer.write_all(&data[cur_start..cur_end + 1])?;
1078            } else {
1079                writer.write_all(&data[cur_start..cur_end])?;
1080                writer.write_all(&[term])?;
1081            }
1082            prev_start = cur_start;
1083            prev_end = cur_end;
1084        }
1085
1086        if cur_end < data.len() {
1087            cur_start = cur_end + 1;
1088        } else {
1089            break;
1090        }
1091    }
1092
1093    Ok(())
1094}
1095
1096/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
1097fn process_filter_ci_singlepass(
1098    data: &[u8],
1099    writer: &mut impl Write,
1100    config: &UniqConfig,
1101    term: u8,
1102) -> io::Result<()> {
1103    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
1104
1105    let first_term = match memchr::memchr(term, data) {
1106        Some(pos) => pos,
1107        None => {
1108            if !repeated {
1109                writer.write_all(data)?;
1110                writer.write_all(&[term])?;
1111            }
1112            return Ok(());
1113        }
1114    };
1115
1116    let mut prev_start: usize = 0;
1117    let mut prev_end: usize = first_term;
1118    let mut count: u64 = 1;
1119    let mut cur_start = first_term + 1;
1120
1121    while cur_start < data.len() {
1122        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1123            Some(offset) => cur_start + offset,
1124            None => data.len(),
1125        };
1126
1127        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1128            count += 1;
1129        } else {
1130            let should_print = if repeated { count > 1 } else { count == 1 };
1131            if should_print {
1132                if prev_end < data.len() && data[prev_end] == term {
1133                    writer.write_all(&data[prev_start..prev_end + 1])?;
1134                } else {
1135                    writer.write_all(&data[prev_start..prev_end])?;
1136                    writer.write_all(&[term])?;
1137                }
1138            }
1139            prev_start = cur_start;
1140            prev_end = cur_end;
1141            count = 1;
1142        }
1143
1144        if cur_end < data.len() {
1145            cur_start = cur_end + 1;
1146        } else {
1147            break;
1148        }
1149    }
1150
1151    let should_print = if repeated { count > 1 } else { count == 1 };
1152    if should_print {
1153        if prev_end < data.len() && data[prev_end] == term {
1154            writer.write_all(&data[prev_start..prev_end + 1])?;
1155        } else {
1156            writer.write_all(&data[prev_start..prev_end])?;
1157            writer.write_all(&[term])?;
1158        }
1159    }
1160
1161    Ok(())
1162}
1163
1164/// Fast single-pass for case-insensitive (-i) count (-c) mode.
1165fn process_count_ci_singlepass(
1166    data: &[u8],
1167    writer: &mut impl Write,
1168    config: &UniqConfig,
1169    term: u8,
1170) -> io::Result<()> {
1171    let first_term = match memchr::memchr(term, data) {
1172        Some(pos) => pos,
1173        None => {
1174            let should_print = match config.mode {
1175                OutputMode::Default => true,
1176                OutputMode::RepeatedOnly => false,
1177                OutputMode::UniqueOnly => true,
1178                _ => true,
1179            };
1180            if should_print {
1181                write_count_line(writer, 1, data, term)?;
1182            }
1183            return Ok(());
1184        }
1185    };
1186
1187    let mut prev_start: usize = 0;
1188    let mut prev_end: usize = first_term;
1189    let mut count: u64 = 1;
1190    let mut cur_start = first_term + 1;
1191
1192    while cur_start < data.len() {
1193        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1194            Some(offset) => cur_start + offset,
1195            None => data.len(),
1196        };
1197
1198        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1199            count += 1;
1200        } else {
1201            let should_print = match config.mode {
1202                OutputMode::Default => true,
1203                OutputMode::RepeatedOnly => count > 1,
1204                OutputMode::UniqueOnly => count == 1,
1205                _ => true,
1206            };
1207            if should_print {
1208                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1209            }
1210            prev_start = cur_start;
1211            prev_end = cur_end;
1212            count = 1;
1213        }
1214
1215        if cur_end < data.len() {
1216            cur_start = cur_end + 1;
1217        } else {
1218            break;
1219        }
1220    }
1221
1222    let should_print = match config.mode {
1223        OutputMode::Default => true,
1224        OutputMode::RepeatedOnly => count > 1,
1225        OutputMode::UniqueOnly => count == 1,
1226        _ => true,
1227    };
1228    if should_print {
1229        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1230    }
1231
1232    Ok(())
1233}
1234
1235/// Output a group for standard modes (bytes path).
1236#[inline(always)]
1237fn output_group_bytes(
1238    writer: &mut impl Write,
1239    content: &[u8],
1240    full: &[u8],
1241    count: u64,
1242    config: &UniqConfig,
1243    term: u8,
1244) -> io::Result<()> {
1245    let should_print = match config.mode {
1246        OutputMode::Default => true,
1247        OutputMode::RepeatedOnly => count > 1,
1248        OutputMode::UniqueOnly => count == 1,
1249        _ => true,
1250    };
1251
1252    if should_print {
1253        if config.count {
1254            write_count_line(writer, count, content, term)?;
1255        } else {
1256            writer.write_all(full)?;
1257            // Add terminator if the original line didn't have one
1258            if full.len() == content.len() {
1259                writer.write_all(&[term])?;
1260            }
1261        }
1262    }
1263
1264    Ok(())
1265}
1266
1267/// Process --all-repeated / -D mode on byte slices.
1268fn process_all_repeated_bytes(
1269    data: &[u8],
1270    writer: &mut impl Write,
1271    config: &UniqConfig,
1272    method: AllRepeatedMethod,
1273    term: u8,
1274) -> io::Result<()> {
1275    let mut lines = LineIter::new(data, term);
1276
1277    let first = match lines.next() {
1278        Some(v) => v,
1279        None => return Ok(()),
1280    };
1281
1282    // Buffer the current group's lines as (content, full-line-with-terminator) slices.
1283    // For all-repeated the whole group must be buffered since it is only printed if its size > 1.
1284    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
1285    group_lines.push(first);
1286    let mut first_group_printed = false;
1287
1288    let fast = !needs_key_extraction(config) && !config.ignore_case;
1289
1290    for (cur_content, cur_full) in lines {
1291        let prev_content = group_lines.last().unwrap().0;
1292        let equal = if fast {
1293            lines_equal_fast(prev_content, cur_content)
1294        } else {
1295            lines_equal(prev_content, cur_content, config)
1296        };
1297
1298        if equal {
1299            group_lines.push((cur_content, cur_full));
1300        } else {
1301            // Flush group
1302            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1303            group_lines.clear();
1304            group_lines.push((cur_content, cur_full));
1305        }
1306    }
1307
1308    // Flush last group
1309    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1310
1311    Ok(())
1312}
1313
1314/// Flush a group for --all-repeated mode (bytes path).
1315fn flush_all_repeated_bytes(
1316    writer: &mut impl Write,
1317    group: &[(&[u8], &[u8])],
1318    method: AllRepeatedMethod,
1319    first_group_printed: &mut bool,
1320    term: u8,
1321) -> io::Result<()> {
1322    if group.len() <= 1 {
1323        return Ok(()); // Not a duplicate group
1324    }
1325
1326    match method {
1327        AllRepeatedMethod::Prepend => {
1328            writer.write_all(&[term])?;
1329        }
1330        AllRepeatedMethod::Separate => {
1331            if *first_group_printed {
1332                writer.write_all(&[term])?;
1333            }
1334        }
1335        AllRepeatedMethod::None => {}
1336    }
1337
1338    for &(content, full) in group {
1339        writer.write_all(full)?;
1340        if full.len() == content.len() {
1341            writer.write_all(&[term])?;
1342        }
1343    }
1344
1345    *first_group_printed = true;
1346    Ok(())
1347}
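
// Sketch of `--all-repeated=separate` behaviour as implemented above: every line
// of each duplicate group is printed, groups are separated by one delimiter, and
// unique lines are dropped entirely.
#[cfg(test)]
mod all_repeated_sketch {
    use super::*;

    #[test]
    fn prints_whole_duplicate_groups() {
        let cfg = UniqConfig {
            mode: OutputMode::AllRepeated(AllRepeatedMethod::Separate),
            ..UniqConfig::default()
        };
        let mut out = Vec::new();
        process_uniq_bytes(b"a\na\nb\nc\nc\n", &mut out, &cfg).unwrap();
        assert_eq!(out.as_slice(), &b"a\na\n\nc\nc\n"[..]);
    }
}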
1348
1349/// Process --group mode on byte slices.
1350fn process_group_bytes(
1351    data: &[u8],
1352    writer: &mut impl Write,
1353    config: &UniqConfig,
1354    method: GroupMethod,
1355    term: u8,
1356) -> io::Result<()> {
1357    let mut lines = LineIter::new(data, term);
1358
1359    let (prev_content, prev_full) = match lines.next() {
1360        Some(v) => v,
1361        None => return Ok(()),
1362    };
1363
1364    // Prepend/Both: separator before first group
1365    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1366        writer.write_all(&[term])?;
1367    }
1368
1369    // Write first line
1370    writer.write_all(prev_full)?;
1371    if prev_full.len() == prev_content.len() {
1372        writer.write_all(&[term])?;
1373    }
1374
1375    let mut prev_content = prev_content;
1376    let fast = !needs_key_extraction(config) && !config.ignore_case;
1377
1378    for (cur_content, cur_full) in lines {
1379        let equal = if fast {
1380            lines_equal_fast(prev_content, cur_content)
1381        } else {
1382            lines_equal(prev_content, cur_content, config)
1383        };
1384
1385        if !equal {
1386            // New group — write separator
1387            writer.write_all(&[term])?;
1388        }
1389
1390        writer.write_all(cur_full)?;
1391        if cur_full.len() == cur_content.len() {
1392            writer.write_all(&[term])?;
1393        }
1394
1395        prev_content = cur_content;
1396    }
1397
1398    // Append/Both: separator after last group
1399    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1400        writer.write_all(&[term])?;
1401    }
1402
1403    Ok(())
1404}
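
// Sketch of `--group=separate` behaviour as implemented above: all lines are
// kept and consecutive groups are delimited by one empty line.
#[cfg(test)]
mod group_mode_sketch {
    use super::*;

    #[test]
    fn separates_groups_with_blank_lines() {
        let cfg = UniqConfig {
            mode: OutputMode::Group(GroupMethod::Separate),
            ..UniqConfig::default()
        };
        let mut out = Vec::new();
        process_uniq_bytes(b"a\na\nb\n", &mut out, &cfg).unwrap();
        assert_eq!(out.as_slice(), &b"a\na\n\nb\n"[..]);
    }
}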
1405
1406// ============================================================================
1407// Streaming processing (for stdin / pipe input)
1408// ============================================================================
1409
1410/// Main streaming uniq processor.
1411/// Reads from `input`, writes to `output`.
1412pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
1413    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
1414    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
1415    let term = if config.zero_terminated { b'\0' } else { b'\n' };
1416
1417    match config.mode {
1418        OutputMode::Group(method) => {
1419            process_group_stream(reader, &mut writer, config, method, term)?;
1420        }
1421        OutputMode::AllRepeated(method) => {
1422            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
1423        }
1424        _ => {
1425            process_standard_stream(reader, &mut writer, config, term)?;
1426        }
1427    }
1428
1429    writer.flush()?;
1430    Ok(())
1431}
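
// Usage sketch for the streaming entry point (e.g. data piped in on stdin, here
// stood in for by an in-memory cursor), using `-d` / repeated-only mode.
#[cfg(test)]
mod stream_entry_point_sketch {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn repeated_only_from_a_reader() {
        let cfg = UniqConfig {
            mode: OutputMode::RepeatedOnly,
            ..UniqConfig::default()
        };
        let mut out = Vec::new();
        process_uniq(Cursor::new(&b"x\nx\ny\nz\nz\nz\n"[..]), &mut out, &cfg).unwrap();
        assert_eq!(out.as_slice(), &b"x\nz\n"[..]);
    }
}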
1432
1433/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
1434fn process_standard_stream<R: BufRead, W: Write>(
1435    mut reader: R,
1436    writer: &mut W,
1437    config: &UniqConfig,
1438    term: u8,
1439) -> io::Result<()> {
1440    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1441    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1442
1443    // Read first line
1444    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1445        return Ok(()); // empty input
1446    }
1447    let mut count: u64 = 1;
1448
1449    loop {
1450        current_line.clear();
1451        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1452
1453        if bytes_read == 0 {
1454            // End of input — output the last group
1455            output_group_stream(writer, &prev_line, count, config, term)?;
1456            break;
1457        }
1458
1459        if compare_lines_stream(&prev_line, &current_line, config, term) {
1460            count += 1;
1461        } else {
1462            output_group_stream(writer, &prev_line, count, config, term)?;
1463            std::mem::swap(&mut prev_line, &mut current_line);
1464            count = 1;
1465        }
1466    }
1467
1468    Ok(())
1469}
1470
1471/// Compare two lines (with terminators) in streaming mode.
1472#[inline(always)]
1473fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
1474    let a_stripped = strip_term(a, term);
1475    let b_stripped = strip_term(b, term);
1476    lines_equal(a_stripped, b_stripped, config)
1477}
1478
1479/// Strip terminator from end of line.
1480#[inline(always)]
1481fn strip_term(line: &[u8], term: u8) -> &[u8] {
1482    if line.last() == Some(&term) {
1483        &line[..line.len() - 1]
1484    } else {
1485        line
1486    }
1487}
1488
1489/// Output a group in streaming mode.
1490#[inline(always)]
1491fn output_group_stream(
1492    writer: &mut impl Write,
1493    line: &[u8],
1494    count: u64,
1495    config: &UniqConfig,
1496    term: u8,
1497) -> io::Result<()> {
1498    let should_print = match config.mode {
1499        OutputMode::Default => true,
1500        OutputMode::RepeatedOnly => count > 1,
1501        OutputMode::UniqueOnly => count == 1,
1502        _ => true,
1503    };
1504
1505    if should_print {
1506        let content = strip_term(line, term);
1507        if config.count {
1508            write_count_line(writer, count, content, term)?;
1509        } else {
1510            writer.write_all(content)?;
1511            writer.write_all(&[term])?;
1512        }
1513    }
1514
1515    Ok(())
1516}
1517
1518/// Process --all-repeated / -D mode (streaming).
1519fn process_all_repeated_stream<R: BufRead, W: Write>(
1520    mut reader: R,
1521    writer: &mut W,
1522    config: &UniqConfig,
1523    method: AllRepeatedMethod,
1524    term: u8,
1525) -> io::Result<()> {
1526    let mut group: Vec<Vec<u8>> = Vec::new();
1527    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1528    let mut first_group_printed = false;
1529
1530    current_line.clear();
1531    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
1532        return Ok(());
1533    }
1534    group.push(current_line.clone());
1535
1536    loop {
1537        current_line.clear();
1538        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1539
1540        if bytes_read == 0 {
1541            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1542            break;
1543        }
1544
1545        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
1546            group.push(current_line.clone());
1547        } else {
1548            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1549            group.clear();
1550            group.push(current_line.clone());
1551        }
1552    }
1553
1554    Ok(())
1555}
1556
1557/// Flush a group for --all-repeated mode (streaming).
1558fn flush_all_repeated_stream(
1559    writer: &mut impl Write,
1560    group: &[Vec<u8>],
1561    method: AllRepeatedMethod,
1562    first_group_printed: &mut bool,
1563    term: u8,
1564) -> io::Result<()> {
1565    if group.len() <= 1 {
1566        return Ok(());
1567    }
1568
1569    match method {
1570        AllRepeatedMethod::Prepend => {
1571            writer.write_all(&[term])?;
1572        }
1573        AllRepeatedMethod::Separate => {
1574            if *first_group_printed {
1575                writer.write_all(&[term])?;
1576            }
1577        }
1578        AllRepeatedMethod::None => {}
1579    }
1580
1581    for line in group {
1582        let content = strip_term(line, term);
1583        writer.write_all(content)?;
1584        writer.write_all(&[term])?;
1585    }
1586
1587    *first_group_printed = true;
1588    Ok(())
1589}
1590
1591/// Process --group mode (streaming).
1592fn process_group_stream<R: BufRead, W: Write>(
1593    mut reader: R,
1594    writer: &mut W,
1595    config: &UniqConfig,
1596    method: GroupMethod,
1597    term: u8,
1598) -> io::Result<()> {
1599    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1600    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1601
1602    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1603        return Ok(());
1604    }
1605
1606    // Prepend/Both: separator before first group
1607    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1608        writer.write_all(&[term])?;
1609    }
1610
1611    let content = strip_term(&prev_line, term);
1612    writer.write_all(content)?;
1613    writer.write_all(&[term])?;
1614
1615    loop {
1616        current_line.clear();
1617        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1618
1619        if bytes_read == 0 {
1620            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1621                writer.write_all(&[term])?;
1622            }
1623            break;
1624        }
1625
1626        if !compare_lines_stream(&prev_line, &current_line, config, term) {
1627            writer.write_all(&[term])?;
1628        }
1629
1630        let content = strip_term(&current_line, term);
1631        writer.write_all(content)?;
1632        writer.write_all(&[term])?;
1633
1634        std::mem::swap(&mut prev_line, &mut current_line);
1635    }
1636
1637    Ok(())
1638}
1639
1640/// Read a line terminated by the given byte (newline or NUL).
1641/// Returns number of bytes read (0 = EOF).
1642#[inline(always)]
1643fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
1644    reader.read_until(term, buf)
1645}