coreutils_rs/uniq/core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
3/// Write a contiguous buffer in full; `write_all` already loops over partial writes.
4#[inline]
5fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
6    writer.write_all(buf)
7}
8
9/// How to delimit groups when using --all-repeated
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum AllRepeatedMethod {
12    None,
13    Prepend,
14    Separate,
15}
16
17/// How to delimit groups when using --group
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum GroupMethod {
20    Separate,
21    Prepend,
22    Append,
23    Both,
24}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
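
// Illustrative sketch (not part of the original tool): building a config for a
// `uniq -c -f 1` style run via struct update syntax over the Default impl above.
#[cfg(test)]
mod config_example {
    use super::*;

    #[test]
    fn builds_a_count_config_with_field_skip() {
        let config = UniqConfig {
            count: true,
            skip_fields: 1,
            ..UniqConfig::default()
        };
        assert!(matches!(config.mode, OutputMode::Default));
        assert!(!config.zero_terminated);
    }
}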
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
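
// A minimal illustrative check of the key-extraction rules above; the byte
// strings and expected slices are my own examples, not GNU test-suite data.
#[cfg(test)]
mod compare_slice_example {
    use super::*;

    #[test]
    fn skips_fields_then_chars_then_limits_width() {
        // Equivalent of `uniq -f 1 -s 2 -w 3`.
        let config = UniqConfig {
            skip_fields: 1,
            skip_chars: 2,
            check_chars: Some(3),
            ..UniqConfig::default()
        };
        // Field 1 is "  alpha"; skipping 2 more chars lands on "eta gamma";
        // the width limit keeps the first 3 bytes.
        assert_eq!(get_compare_slice(b"  alpha beta gamma", &config), &b"eta"[..]);
    }
}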
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
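
// Illustrative -i behaviour of the helper above: the key is extracted first,
// then compared with ASCII case folding.
#[cfg(test)]
mod lines_equal_example {
    use super::*;

    #[test]
    fn ignore_case_applies_to_the_extracted_key() {
        let config = UniqConfig {
            ignore_case: true,
            skip_fields: 1,
            ..UniqConfig::default()
        };
        assert!(lines_equal(b"1 Apple", b"2 APPLE", &config));
        assert!(!lines_equal(b"1 Apple", b"1 Apples", &config));
    }
}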
117
118/// Fast case-insensitive comparison: no field/char extraction, just ASCII case folding.
119/// Uses a length check to reject mismatches before the full comparison.
120#[inline(always)]
121fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
122    let alen = a.len();
123    if alen != b.len() {
124        return false;
125    }
126    if alen == 0 {
127        return true;
128    }
129    a.eq_ignore_ascii_case(b)
130}
131
132/// Check if config requires field/char skipping or char limiting.
133#[inline(always)]
134fn needs_key_extraction(config: &UniqConfig) -> bool {
135    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
136}
137
138/// Fast path comparison: no field/char extraction needed, no case folding.
139/// Uses a length check plus unaligned word-at-a-time prefix/suffix rejection.
140/// For short lines (<= 32 bytes, common in many-dups data), avoids the
141/// full memcmp call overhead by doing direct word comparisons.
142#[inline(always)]
143fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
144    let alen = a.len();
145    if alen != b.len() {
146        return false;
147    }
148    if alen == 0 {
149        return true;
150    }
151    // Lines of at most 8 bytes: plain slice equality is already cheap here
152    if alen <= 8 {
153        // For <= 8 bytes: byte-by-byte via slice (the compiler vectorizes this)
154        return a == b;
155    }
156    // 8-byte prefix check: reject most non-equal lines without full memcmp
157    let a8 = unsafe { (a.as_ptr() as *const u64).read_unaligned() };
158    let b8 = unsafe { (b.as_ptr() as *const u64).read_unaligned() };
159    if a8 != b8 {
160        return false;
161    }
162    // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
163    if alen <= 16 {
164        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
165        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
166        return a_tail == b_tail;
167    }
168    // For 17-32 bytes: check first 16 + last 16 (overlapping) to avoid memcmp
169    if alen <= 32 {
170        let a16 = unsafe { (a.as_ptr().add(8) as *const u64).read_unaligned() };
171        let b16 = unsafe { (b.as_ptr().add(8) as *const u64).read_unaligned() };
172        if a16 != b16 {
173            return false;
174        }
175        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
176        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
177        return a_tail == b_tail;
178    }
179    // Longer lines: prefix passed, fall through to full memcmp
180    a == b
181}
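
// Quick illustrative check that the unaligned-load shortcuts above agree with
// plain slice equality across the <=8, 9-16, 17-32, and >32 byte buckets.
#[cfg(test)]
mod equal_fast_example {
    use super::*;

    #[test]
    fn agrees_with_plain_slice_equality() {
        let cases: [(&[u8], &[u8]); 6] = [
            (&b"abc"[..], &b"abc"[..]),
            (&b"abc"[..], &b"abd"[..]),
            (&b"12345678"[..], &b"12345678"[..]),
            (&b"123456789abcdef0"[..], &b"123456789abcdeF0"[..]),
            (
                &b"0123456789abcdefghijklmnopqrstuv"[..],
                &b"0123456789abcdefghijklmnopqrstuv"[..],
            ),
            (
                &b"this line is longer than thirty-two bytes, promise."[..],
                &b"this line is longer than thirty-two bytes, promise!"[..],
            ),
        ];
        for &(a, b) in cases.iter() {
            assert_eq!(lines_equal_fast(a, b), a == b);
        }
    }
}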
182
183/// Write a count-prefixed line in GNU uniq format.
184/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
185/// Combines prefix + line + term into a single write when the total fits in 256 bytes.
186#[inline(always)]
187fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
188    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
189    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
190    let digits = itoa_right_aligned_into(&mut prefix, count);
191    let width = digits.max(7); // minimum 7 chars
192    let prefix_len = width + 1; // +1 for trailing space
193    prefix[width] = b' ';
194
195    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
196    let total = prefix_len + line.len() + 1;
197    if total <= 256 {
198        let mut buf = [0u8; 256];
199        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
200        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
201        buf[prefix_len + line.len()] = term;
202        out.write_all(&buf[..total])
203    } else {
204        out.write_all(&prefix[..prefix_len])?;
205        out.write_all(line)?;
206        out.write_all(&[term])
207    }
208}
209
210/// Write u64 decimal right-aligned into prefix buffer.
211/// Buffer is pre-filled with spaces. Returns the emitted field width (at least 7).
212#[inline(always)]
213fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
214    if val == 0 {
215        buf[6] = b'0';
216        return 7; // 6 spaces + '0' = 7 chars
217    }
218    // Write digits right-to-left from position 27 (leaving room for trailing space)
219    let mut pos = 27;
220    while val > 0 {
221        pos -= 1;
222        buf[pos] = b'0' + (val % 10) as u8;
223        val /= 10;
224    }
225    let num_digits = 27 - pos;
226    if num_digits >= 7 {
227        // Number is wide enough, shift to front
228        buf.copy_within(pos..27, 0);
229        num_digits
230    } else {
231        // Right-align in 7-char field: spaces then digits
232        let pad = 7 - num_digits;
233        buf.copy_within(pos..27, pad);
234        // buf[0..pad] is already spaces from initialization
235        7
236    }
237}
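
// Illustrative check of the "%7lu " layout produced by the two helpers above;
// the expected byte strings follow the format described in their doc comments.
#[cfg(test)]
mod count_format_example {
    use super::*;

    #[test]
    fn right_aligns_counts_in_a_seven_char_field() {
        let mut out: Vec<u8> = Vec::new();
        write_count_line(&mut out, 3, b"apple", b'\n').unwrap();
        write_count_line(&mut out, 1234567890, b"banana", b'\n').unwrap();
        assert_eq!(&out[..], &b"      3 apple\n1234567890 banana\n"[..]);
    }
}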
238
239// ============================================================================
240// High-performance mmap-based processing (for byte slices, zero-copy)
241// ============================================================================
242
243/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
244pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
245    // 16MB output buffer for fewer flush syscalls on large inputs
246    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
247    let term = if config.zero_terminated { b'\0' } else { b'\n' };
248
249    match config.mode {
250        OutputMode::Group(method) => {
251            process_group_bytes(data, &mut writer, config, method, term)?;
252        }
253        OutputMode::AllRepeated(method) => {
254            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
255        }
256        _ => {
257            process_standard_bytes(data, &mut writer, config, term)?;
258        }
259    }
260
261    writer.flush()?;
262    Ok(())
263}
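
// Sketch of how a caller might drive the zero-copy entry point; the input
// bytes here are illustrative and stand in for an mmap'd file.
#[cfg(test)]
mod bytes_entrypoint_example {
    use super::*;

    #[test]
    fn dedups_adjacent_lines_from_a_byte_slice() {
        let input = b"apple\napple\nbanana\nbanana\nbanana\ncherry\n";
        let mut out: Vec<u8> = Vec::new();
        process_uniq_bytes(input, &mut out, &UniqConfig::default()).unwrap();
        assert_eq!(&out[..], &b"apple\nbanana\ncherry\n"[..]);
    }
}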
264
265/// Iterator over lines in a byte slice, yielding (content without terminator, full line with terminator).
266/// Uses memchr for SIMD-accelerated line boundary detection.
267struct LineIter<'a> {
268    data: &'a [u8],
269    pos: usize,
270    term: u8,
271}
272
273impl<'a> LineIter<'a> {
274    #[inline(always)]
275    fn new(data: &'a [u8], term: u8) -> Self {
276        Self { data, pos: 0, term }
277    }
278}
279
280impl<'a> Iterator for LineIter<'a> {
281    /// (line content without terminator, full line including terminator for output)
282    type Item = (&'a [u8], &'a [u8]);
283
284    #[inline(always)]
285    fn next(&mut self) -> Option<Self::Item> {
286        if self.pos >= self.data.len() {
287            return None;
288        }
289
290        let remaining = &self.data[self.pos..];
291        match memchr::memchr(self.term, remaining) {
292            Some(idx) => {
293                let line_start = self.pos;
294                let line_end = self.pos + idx; // without terminator
295                let full_end = self.pos + idx + 1; // with terminator
296                self.pos = full_end;
297                Some((
298                    &self.data[line_start..line_end],
299                    &self.data[line_start..full_end],
300                ))
301            }
302            None => {
303                // Last line without terminator
304                let line_start = self.pos;
305                self.pos = self.data.len();
306                let line = &self.data[line_start..];
307                Some((line, line))
308            }
309        }
310    }
311}
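
// Small illustration of the (content, full line) pairs yielded above,
// including the unterminated final line case.
#[cfg(test)]
mod line_iter_example {
    use super::*;

    #[test]
    fn yields_content_and_full_line_pairs() {
        let mut it = LineIter::new(b"a\nbb", b'\n');
        assert_eq!(it.next(), Some((&b"a"[..], &b"a\n"[..])));
        assert_eq!(it.next(), Some((&b"bb"[..], &b"bb"[..])));
        assert_eq!(it.next(), None);
    }
}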
312
313/// Get line content (without terminator) from pre-computed positions.
314/// `content_end` is the end of actual content (excludes trailing terminator if present).
315#[inline(always)]
316fn line_content_at<'a>(
317    data: &'a [u8],
318    line_starts: &[usize],
319    idx: usize,
320    content_end: usize,
321) -> &'a [u8] {
322    let start = line_starts[idx];
323    let end = if idx + 1 < line_starts.len() {
324        line_starts[idx + 1] - 1 // exclude terminator
325    } else {
326        content_end // last line: pre-computed to exclude trailing terminator
327    };
328    &data[start..end]
329}
330
331/// Get full line (with terminator) from pre-computed positions.
332#[inline(always)]
333fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
334    let start = line_starts[idx];
335    let end = if idx + 1 < line_starts.len() {
336        line_starts[idx + 1] // include terminator
337    } else {
338        data.len()
339    };
340    &data[start..end]
341}
342
343/// Linear scan for the end of a duplicate group.
344/// Returns the index of the first line that differs from line_starts[group_start].
345/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
346/// equal lines can appear in non-adjacent groups separated by different lines.
347/// Caches key length for fast length-mismatch rejection.
348#[inline]
349fn linear_scan_group_end(
350    data: &[u8],
351    line_starts: &[usize],
352    group_start: usize,
353    num_lines: usize,
354    content_end: usize,
355) -> usize {
356    let key = line_content_at(data, line_starts, group_start, content_end);
357    let key_len = key.len();
358    let mut i = group_start + 1;
359    while i < num_lines {
360        let candidate = line_content_at(data, line_starts, i, content_end);
361        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
362            return i;
363        }
364        i += 1;
365    }
366    i
367}
368
369/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
370/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
371/// General path: pre-computed line positions with linear scans over duplicate groups.
372fn process_standard_bytes(
373    data: &[u8],
374    writer: &mut impl Write,
375    config: &UniqConfig,
376    term: u8,
377) -> io::Result<()> {
378    if data.is_empty() {
379        return Ok(());
380    }
381
382    let fast = !needs_key_extraction(config) && !config.ignore_case;
383    let fast_ci = !needs_key_extraction(config) && config.ignore_case;
384
385    // Ultra-fast path: default mode, no count, no key extraction.
386    // Single-pass: scan with memchr, compare adjacent lines inline.
387    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
388    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
389        return process_default_fast_singlepass(data, writer, term);
390    }
391
392    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
393    if fast
394        && !config.count
395        && matches!(
396            config.mode,
397            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
398        )
399    {
400        return process_filter_fast_singlepass(data, writer, config, term);
401    }
402
403    // Ultra-fast path: count mode with no key extraction.
404    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
405    // Avoids the line_starts Vec allocation (20MB+ for large files).
406    if fast && config.count {
407        return process_count_fast_singlepass(data, writer, config, term);
408    }
409
410    // Fast path for case-insensitive (-i) mode with no key extraction.
411    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
412    // Avoids the general path's line_starts Vec allocation.
413    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
414        return process_default_ci_singlepass(data, writer, term);
415    }
416
417    if fast_ci
418        && !config.count
419        && matches!(
420            config.mode,
421            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
422        )
423    {
424        return process_filter_ci_singlepass(data, writer, config, term);
425    }
426
427    if fast_ci && config.count {
428        return process_count_ci_singlepass(data, writer, config, term);
429    }
430
431    // General path: pre-computed line positions with linear scans over groups
432    let estimated_lines = (data.len() / 40).max(64);
433    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
434    line_starts.push(0);
435    for pos in memchr::memchr_iter(term, data) {
436        if pos + 1 < data.len() {
437            line_starts.push(pos + 1);
438        }
439    }
440    let num_lines = line_starts.len();
441    if num_lines == 0 {
442        return Ok(());
443    }
444
445    // Pre-compute content end: if data ends with terminator, exclude it for last line
446    let content_end = if data.last() == Some(&term) {
447        data.len() - 1
448    } else {
449        data.len()
450    };
451
452    // Ultra-fast path: default mode, no count, no key extraction
453    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
454        // Write first line
455        let first_full = line_full_at(data, &line_starts, 0);
456        let first_content = line_content_at(data, &line_starts, 0, content_end);
457        write_all_raw(writer, first_full)?;
458        if first_full.len() == first_content.len() {
459            writer.write_all(&[term])?;
460        }
461
462        let mut i = 1;
463        while i < num_lines {
464            let prev = line_content_at(data, &line_starts, i - 1, content_end);
465            let cur = line_content_at(data, &line_starts, i, content_end);
466
467            if lines_equal_fast(prev, cur) {
468                // Duplicate detected — linear scan for end of group
469                let group_end =
470                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
471                i = group_end;
472                continue;
473            }
474
475            // Unique line — write it
476            let cur_full = line_full_at(data, &line_starts, i);
477            write_all_raw(writer, cur_full)?;
478            if cur_full.len() == cur.len() {
479                writer.write_all(&[term])?;
480            }
481            i += 1;
482        }
483        return Ok(());
484    }
485
486    // General path with count tracking
487    let mut i = 0;
488    while i < num_lines {
489        let content = line_content_at(data, &line_starts, i, content_end);
490        let full = line_full_at(data, &line_starts, i);
491
492        let group_end = if fast
493            && i + 1 < num_lines
494            && lines_equal_fast(
495                content,
496                line_content_at(data, &line_starts, i + 1, content_end),
497            ) {
498            // Duplicate detected — linear scan for end
499            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
500        } else if !fast
501            && i + 1 < num_lines
502            && lines_equal(
503                content,
504                line_content_at(data, &line_starts, i + 1, content_end),
505                config,
506            )
507        {
508            // Slow path linear scan with key extraction
509            let mut j = i + 2;
510            while j < num_lines {
511                if !lines_equal(
512                    content,
513                    line_content_at(data, &line_starts, j, content_end),
514                    config,
515                ) {
516                    break;
517                }
518                j += 1;
519            }
520            j
521        } else {
522            i + 1
523        };
524
525        let count = (group_end - i) as u64;
526        output_group_bytes(writer, content, full, count, config, term)?;
527        i = group_end;
528    }
529
530    Ok(())
531}
532
533/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
534/// No pre-computed positions, no binary search, no Vec allocation.
535/// Outputs each line that differs from the previous.
536///
537/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
538/// independently, then cross-chunk boundaries are resolved.
539fn process_default_fast_singlepass(
540    data: &[u8],
541    writer: &mut impl Write,
542    term: u8,
543) -> io::Result<()> {
544    // Parallel path for large files
545    if data.len() >= 4 * 1024 * 1024 {
546        return process_default_parallel(data, writer, term);
547    }
548
549    process_default_sequential(data, writer, term)
550}
551
552/// Sequential single-pass dedup with zero-copy output.
553/// Instead of copying data to a buffer, tracks contiguous output runs and writes
554/// directly from the original data. For all-unique data, this is a single write_all.
555///
556/// Optimized for the "many duplicates" case: caches the previous line's length
557/// and first-8-byte prefix for fast rejection of non-duplicates without
558/// calling the full comparison function.
559fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
560    let mut prev_start: usize = 0;
561    let mut prev_end: usize; // exclusive, without terminator
562
563    // Find end of first line
564    match memchr::memchr(term, data) {
565        Some(pos) => {
566            prev_end = pos;
567        }
568        None => {
569            // Single line, no terminator
570            writer.write_all(data)?;
571            return writer.write_all(&[term]);
572        }
573    }
574
575    // Cache previous line metadata for fast comparison
576    let mut prev_len = prev_end - prev_start;
577    let mut prev_prefix: u64 = if prev_len >= 8 {
578        unsafe { (data.as_ptr().add(prev_start) as *const u64).read_unaligned() }
579    } else {
580        0
581    };
582
583    // run_start tracks the beginning of the current contiguous output region.
584    // When a duplicate is found, we flush the run up to the duplicate and skip it.
585    let mut run_start: usize = 0;
586    let mut cur_start = prev_end + 1;
587    let mut last_output_end = prev_end + 1; // exclusive end including terminator
588
589    while cur_start < data.len() {
590        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
591            Some(offset) => cur_start + offset,
592            None => data.len(), // last line without terminator
593        };
594
595        let cur_len = cur_end - cur_start;
596
597        // Fast reject: if lengths differ, lines are definitely not equal
598        let is_dup = if cur_len != prev_len {
599            false
600        } else if cur_len == 0 {
601            true
602        } else if cur_len >= 8 {
603            // Compare cached 8-byte prefix first
604            let cur_prefix =
605                unsafe { (data.as_ptr().add(cur_start) as *const u64).read_unaligned() };
606            if cur_prefix != prev_prefix {
607                false
608            } else if cur_len <= 8 {
609                true // prefix covers entire line
610            } else if cur_len <= 16 {
611                // Check last 8 bytes (overlapping)
612                let a_tail = unsafe {
613                    (data.as_ptr().add(prev_start + prev_len - 8) as *const u64).read_unaligned()
614                };
615                let b_tail = unsafe {
616                    (data.as_ptr().add(cur_start + cur_len - 8) as *const u64).read_unaligned()
617                };
618                a_tail == b_tail
619            } else if cur_len <= 32 {
620                // Check bytes 8-16 and last 8 bytes
621                let a16 =
622                    unsafe { (data.as_ptr().add(prev_start + 8) as *const u64).read_unaligned() };
623                let b16 =
624                    unsafe { (data.as_ptr().add(cur_start + 8) as *const u64).read_unaligned() };
625                if a16 != b16 {
626                    false
627                } else {
628                    let a_tail = unsafe {
629                        (data.as_ptr().add(prev_start + prev_len - 8) as *const u64)
630                            .read_unaligned()
631                    };
632                    let b_tail = unsafe {
633                        (data.as_ptr().add(cur_start + cur_len - 8) as *const u64).read_unaligned()
634                    };
635                    a_tail == b_tail
636                }
637            } else {
638                data[prev_start..prev_end] == data[cur_start..cur_end]
639            }
640        } else {
641            // Short line < 8 bytes
642            data[prev_start..prev_end] == data[cur_start..cur_end]
643        };
644
645        if is_dup {
646            // Duplicate — flush the current run up to this line, then skip it
647            if run_start < cur_start {
648                writer.write_all(&data[run_start..cur_start])?;
649            }
650            // Start new run after this duplicate
651            if cur_end < data.len() {
652                run_start = cur_end + 1;
653            } else {
654                run_start = cur_end;
655            }
656        } else {
657            // Different line — update cached comparison state
658            prev_start = cur_start;
659            prev_end = cur_end;
660            prev_len = cur_len;
661            prev_prefix = if cur_len >= 8 {
662                unsafe { (data.as_ptr().add(cur_start) as *const u64).read_unaligned() }
663            } else {
664                0
665            };
666            last_output_end = if cur_end < data.len() {
667                cur_end + 1
668            } else {
669                cur_end
670            };
671        }
672
673        if cur_end < data.len() {
674            cur_start = cur_end + 1;
675        } else {
676            break;
677        }
678    }
679
680    // Flush the remaining run. Only this final run can end without a
681    // terminator (an unterminated last line that was emitted), so add the
682    // terminator here; a suppressed duplicate never needs an extra one.
683    if run_start < data.len() {
684        writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
685        if *data.last().unwrap() != term {
686            writer.write_all(&[term])?;
687        }
688    }
689
690    Ok(())
691}
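
// Illustrative checks of the sequential fast path above, including the edge
// case of an unterminated final line; expected outputs follow the convention
// used elsewhere in this file of always terminating printed lines.
#[cfg(test)]
mod default_sequential_example {
    use super::*;

    #[test]
    fn skips_adjacent_duplicates_and_terminates_output_exactly_once() {
        let mut out: Vec<u8> = Vec::new();
        process_default_sequential(b"a\na\nb\nc\nc", &mut out, b'\n').unwrap();
        assert_eq!(&out[..], &b"a\nb\nc\n"[..]);

        let mut out2: Vec<u8> = Vec::new();
        process_default_sequential(b"a\nb", &mut out2, b'\n').unwrap();
        assert_eq!(&out2[..], &b"a\nb\n"[..]);
    }
}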
692
693/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
694/// positions in each chunk in parallel, then write output runs directly from
695/// the original data. No per-chunk buffer allocation needed.
696fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
697    use rayon::prelude::*;
698
699    let num_threads = rayon::current_num_threads().max(1);
700    let chunk_target = data.len() / num_threads;
701
702    // Find chunk boundaries aligned to line terminators
703    let mut boundaries = Vec::with_capacity(num_threads + 1);
704    boundaries.push(0usize);
705    for i in 1..num_threads {
706        let target = i * chunk_target;
707        if target >= data.len() {
708            break;
709        }
710        if let Some(p) = memchr::memchr(term, &data[target..]) {
711            let b = target + p + 1;
712            if b > *boundaries.last().unwrap() && b <= data.len() {
713                boundaries.push(b);
714            }
715        }
716    }
717    boundaries.push(data.len());
718
719    let n_chunks = boundaries.len() - 1;
720    if n_chunks <= 1 {
721        return process_default_sequential(data, writer, term);
722    }
723
724    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
725    struct ChunkResult {
726        /// Byte ranges in the original data to output (contiguous runs)
727        runs: Vec<(usize, usize)>,
728        /// First line in chunk (absolute offsets into data, content without term)
729        first_line_start: usize,
730        first_line_end: usize,
731        /// Last *output* line in chunk (content without term)
732        last_line_start: usize,
733        last_line_end: usize,
734    }
735
736    let results: Vec<ChunkResult> = boundaries
737        .windows(2)
738        .collect::<Vec<_>>()
739        .par_iter()
740        .map(|w| {
741            let chunk_start = w[0];
742            let chunk_end = w[1];
743            let chunk = &data[chunk_start..chunk_end];
744
745            let first_term = match memchr::memchr(term, chunk) {
746                Some(pos) => pos,
747                None => {
748                    return ChunkResult {
749                        runs: vec![(chunk_start, chunk_end)],
750                        first_line_start: chunk_start,
751                        first_line_end: chunk_end,
752                        last_line_start: chunk_start,
753                        last_line_end: chunk_end,
754                    };
755                }
756            };
757
758            let first_line_start = chunk_start;
759            let first_line_end = chunk_start + first_term;
760
761            let mut runs: Vec<(usize, usize)> = Vec::new();
762            let mut run_start = chunk_start;
763            let mut prev_start = 0usize;
764            let mut prev_end = first_term;
765            let mut last_out_start = chunk_start;
766            let mut last_out_end = first_line_end;
767
768            let mut cur_start = first_term + 1;
769            while cur_start < chunk.len() {
770                let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
771                    Some(offset) => cur_start + offset,
772                    None => chunk.len(),
773                };
774
775                if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
776                    // Duplicate — flush current run up to this line
777                    let abs_cur = chunk_start + cur_start;
778                    if run_start < abs_cur {
779                        runs.push((run_start, abs_cur));
780                    }
781                    // New run starts after this duplicate
782                    run_start = chunk_start
783                        + if cur_end < chunk.len() {
784                            cur_end + 1
785                        } else {
786                            cur_end
787                        };
788                } else {
789                    last_out_start = chunk_start + cur_start;
790                    last_out_end = chunk_start + cur_end;
791                }
792                prev_start = cur_start;
793                prev_end = cur_end;
794
795                if cur_end < chunk.len() {
796                    cur_start = cur_end + 1;
797                } else {
798                    break;
799                }
800            }
801
802            // Close final run
803            if run_start < chunk_end {
804                runs.push((run_start, chunk_end));
805            }
806
807            ChunkResult {
808                runs,
809                first_line_start,
810                first_line_end,
811                last_line_start: last_out_start,
812                last_line_end: last_out_end,
813            }
814        })
815        .collect();
816
817    // Write results, adjusting cross-chunk boundaries
818    let mut last_written_end = 0usize;
819    for (i, result) in results.iter().enumerate() {
820        let skip_first = if i > 0 {
821            let prev = &results[i - 1];
822            let prev_last = &data[prev.last_line_start..prev.last_line_end];
823            let cur_first = &data[result.first_line_start..result.first_line_end];
824            lines_equal_fast(prev_last, cur_first)
825        } else {
826            false
827        };
828        // Skip bytes up to and including the first line's terminator
829        let skip_end = if skip_first {
830            result.first_line_end + 1
831        } else {
832            0
833        };
834        for &(rs, re) in &result.runs {
835            let actual_start = rs.max(skip_end);
836            if actual_start < re {
837                writer.write_all(&data[actual_start..re])?;
838                last_written_end = re;
839            }
840        }
841    }
842
843    // Add a trailing terminator only if the final emitted line lacked one
844    if last_written_end > 0 && data[last_written_end - 1] != term {
845        writer.write_all(&[term])?;
846    }
847
848    Ok(())
849}
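
// Illustrative check that cross-chunk boundary resolution collapses a run of
// identical lines spanning chunk boundaries; the internal helper is called
// directly so the input can stay small regardless of the 4 MiB threshold.
#[cfg(test)]
mod default_parallel_example {
    use super::*;

    #[test]
    fn collapses_duplicates_across_chunk_boundaries() {
        let input: Vec<u8> = b"x\n".repeat(1000);
        let mut out: Vec<u8> = Vec::new();
        process_default_parallel(&input, &mut out, b'\n').unwrap();
        assert_eq!(&out[..], &b"x\n"[..]);
    }
}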
850
851/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
852/// Zero-copy: writes directly from input data, no output buffer allocation.
853/// Uses cached prefix comparison for fast duplicate detection.
854fn process_filter_fast_singlepass(
855    data: &[u8],
856    writer: &mut impl Write,
857    config: &UniqConfig,
858    term: u8,
859) -> io::Result<()> {
860    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
861
862    let first_term = match memchr::memchr(term, data) {
863        Some(pos) => pos,
864        None => {
865            // Single line: unique (count=1)
866            if !repeated {
867                writer.write_all(data)?;
868                writer.write_all(&[term])?;
869            }
870            return Ok(());
871        }
872    };
873
874    let mut prev_start: usize = 0;
875    let mut prev_end: usize = first_term;
876    let mut prev_len = prev_end;
877    let mut count: u64 = 1;
878    let mut cur_start = first_term + 1;
879
880    while cur_start < data.len() {
881        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
882            Some(offset) => cur_start + offset,
883            None => data.len(),
884        };
885
886        let cur_len = cur_end - cur_start;
887        let is_dup = cur_len == prev_len
888            && lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]);
889
890        if is_dup {
891            count += 1;
892        } else {
893            // Output previous group -- write directly from input data (zero-copy)
894            let should_print = if repeated { count > 1 } else { count == 1 };
895            if should_print {
896                if prev_end < data.len() && data[prev_end] == term {
897                    writer.write_all(&data[prev_start..prev_end + 1])?;
898                } else {
899                    writer.write_all(&data[prev_start..prev_end])?;
900                    writer.write_all(&[term])?;
901                }
902            }
903            prev_start = cur_start;
904            prev_end = cur_end;
905            prev_len = cur_len;
906            count = 1;
907        }
908
909        if cur_end < data.len() {
910            cur_start = cur_end + 1;
911        } else {
912            break;
913        }
914    }
915
916    // Output last group
917    let should_print = if repeated { count > 1 } else { count == 1 };
918    if should_print {
919        if prev_end < data.len() && data[prev_end] == term {
920            writer.write_all(&data[prev_start..prev_end + 1])?;
921        } else {
922            writer.write_all(&data[prev_start..prev_end])?;
923            writer.write_all(&[term])?;
924        }
925    }
926
927    Ok(())
928}
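
// Illustrative -d / -u behaviour of the filter fast path, driven through the
// public byte-slice entry point.
#[cfg(test)]
mod filter_modes_example {
    use super::*;

    #[test]
    fn repeated_only_and_unique_only_split_the_input() {
        let input = b"a\na\nb\nc\nc\nd\n";

        let mut dups: Vec<u8> = Vec::new();
        let repeated = UniqConfig {
            mode: OutputMode::RepeatedOnly,
            ..UniqConfig::default()
        };
        process_uniq_bytes(input, &mut dups, &repeated).unwrap();
        assert_eq!(&dups[..], &b"a\nc\n"[..]);

        let mut uniques: Vec<u8> = Vec::new();
        let unique_only = UniqConfig {
            mode: OutputMode::UniqueOnly,
            ..UniqConfig::default()
        };
        process_uniq_bytes(input, &mut uniques, &unique_only).unwrap();
        assert_eq!(&uniques[..], &b"b\nd\n"[..]);
    }
}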
929
930/// Fast single-pass for count mode (-c) with all standard output modes.
931/// Zero line_starts allocation: scans with memchr, counts groups inline,
932/// and writes count-prefixed lines directly.
933/// Uses cached length comparison for fast duplicate rejection.
934fn process_count_fast_singlepass(
935    data: &[u8],
936    writer: &mut impl Write,
937    config: &UniqConfig,
938    term: u8,
939) -> io::Result<()> {
940    let first_term = match memchr::memchr(term, data) {
941        Some(pos) => pos,
942        None => {
943            // Single line: count=1
944            let should_print = match config.mode {
945                OutputMode::Default => true,
946                OutputMode::RepeatedOnly => false,
947                OutputMode::UniqueOnly => true,
948                _ => true,
949            };
950            if should_print {
951                write_count_line(writer, 1, data, term)?;
952            }
953            return Ok(());
954        }
955    };
956
957    let mut prev_start: usize = 0;
958    let mut prev_end: usize = first_term;
959    let mut prev_len = prev_end;
960    let mut count: u64 = 1;
961    let mut cur_start = first_term + 1;
962
963    while cur_start < data.len() {
964        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
965            Some(offset) => cur_start + offset,
966            None => data.len(),
967        };
968
969        let cur_len = cur_end - cur_start;
970        let is_dup = cur_len == prev_len
971            && lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]);
972
973        if is_dup {
974            count += 1;
975        } else {
976            // Output previous group with count
977            let should_print = match config.mode {
978                OutputMode::Default => true,
979                OutputMode::RepeatedOnly => count > 1,
980                OutputMode::UniqueOnly => count == 1,
981                _ => true,
982            };
983            if should_print {
984                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
985            }
986            prev_start = cur_start;
987            prev_end = cur_end;
988            prev_len = cur_len;
989            count = 1;
990        }
991
992        if cur_end < data.len() {
993            cur_start = cur_end + 1;
994        } else {
995            break;
996        }
997    }
998
999    // Output last group
1000    let should_print = match config.mode {
1001        OutputMode::Default => true,
1002        OutputMode::RepeatedOnly => count > 1,
1003        OutputMode::UniqueOnly => count == 1,
1004        _ => true,
1005    };
1006    if should_print {
1007        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1008    }
1009
1010    Ok(())
1011}
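
// Illustrative `uniq -c` output through the count fast path above.
#[cfg(test)]
mod count_mode_example {
    use super::*;

    #[test]
    fn prefixes_each_group_with_a_right_aligned_count() {
        let input = b"x\nx\nx\ny\n";
        let mut out: Vec<u8> = Vec::new();
        let config = UniqConfig {
            count: true,
            ..UniqConfig::default()
        };
        process_uniq_bytes(input, &mut out, &config).unwrap();
        assert_eq!(&out[..], &b"      3 x\n      1 y\n"[..]);
    }
}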
1012
1013/// Fast single-pass for case-insensitive (-i) default mode.
1014/// Same logic as process_default_sequential but uses eq_ignore_ascii_case.
1015fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
1016    let mut prev_start: usize = 0;
1017    let mut prev_end: usize;
1018
1019    match memchr::memchr(term, data) {
1020        Some(pos) => {
1021            prev_end = pos;
1022        }
1023        None => {
1024            writer.write_all(data)?;
1025            return writer.write_all(&[term]);
1026        }
1027    }
1028
1029    // Write first line
1030    writer.write_all(&data[..prev_end + 1])?;
1031
1032    let mut cur_start = prev_end + 1;
1033
1034    while cur_start < data.len() {
1035        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1036            Some(offset) => cur_start + offset,
1037            None => data.len(),
1038        };
1039
1040        let prev_content = &data[prev_start..prev_end];
1041        let cur_content = &data[cur_start..cur_end];
1042
1043        if !lines_equal_case_insensitive(prev_content, cur_content) {
1044            // Different line — write it
1045            if cur_end < data.len() {
1046                writer.write_all(&data[cur_start..cur_end + 1])?;
1047            } else {
1048                writer.write_all(&data[cur_start..cur_end])?;
1049                writer.write_all(&[term])?;
1050            }
1051            prev_start = cur_start;
1052            prev_end = cur_end;
1053        }
1054
1055        if cur_end < data.len() {
1056            cur_start = cur_end + 1;
1057        } else {
1058            break;
1059        }
1060    }
1061
1062    Ok(())
1063}
1064
1065/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
1066fn process_filter_ci_singlepass(
1067    data: &[u8],
1068    writer: &mut impl Write,
1069    config: &UniqConfig,
1070    term: u8,
1071) -> io::Result<()> {
1072    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
1073
1074    let first_term = match memchr::memchr(term, data) {
1075        Some(pos) => pos,
1076        None => {
1077            if !repeated {
1078                writer.write_all(data)?;
1079                writer.write_all(&[term])?;
1080            }
1081            return Ok(());
1082        }
1083    };
1084
1085    let mut prev_start: usize = 0;
1086    let mut prev_end: usize = first_term;
1087    let mut count: u64 = 1;
1088    let mut cur_start = first_term + 1;
1089
1090    while cur_start < data.len() {
1091        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1092            Some(offset) => cur_start + offset,
1093            None => data.len(),
1094        };
1095
1096        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1097            count += 1;
1098        } else {
1099            let should_print = if repeated { count > 1 } else { count == 1 };
1100            if should_print {
1101                if prev_end < data.len() && data[prev_end] == term {
1102                    writer.write_all(&data[prev_start..prev_end + 1])?;
1103                } else {
1104                    writer.write_all(&data[prev_start..prev_end])?;
1105                    writer.write_all(&[term])?;
1106                }
1107            }
1108            prev_start = cur_start;
1109            prev_end = cur_end;
1110            count = 1;
1111        }
1112
1113        if cur_end < data.len() {
1114            cur_start = cur_end + 1;
1115        } else {
1116            break;
1117        }
1118    }
1119
1120    let should_print = if repeated { count > 1 } else { count == 1 };
1121    if should_print {
1122        if prev_end < data.len() && data[prev_end] == term {
1123            writer.write_all(&data[prev_start..prev_end + 1])?;
1124        } else {
1125            writer.write_all(&data[prev_start..prev_end])?;
1126            writer.write_all(&[term])?;
1127        }
1128    }
1129
1130    Ok(())
1131}
1132
1133/// Fast single-pass for case-insensitive (-i) count (-c) mode.
1134fn process_count_ci_singlepass(
1135    data: &[u8],
1136    writer: &mut impl Write,
1137    config: &UniqConfig,
1138    term: u8,
1139) -> io::Result<()> {
1140    let first_term = match memchr::memchr(term, data) {
1141        Some(pos) => pos,
1142        None => {
1143            let should_print = match config.mode {
1144                OutputMode::Default => true,
1145                OutputMode::RepeatedOnly => false,
1146                OutputMode::UniqueOnly => true,
1147                _ => true,
1148            };
1149            if should_print {
1150                write_count_line(writer, 1, data, term)?;
1151            }
1152            return Ok(());
1153        }
1154    };
1155
1156    let mut prev_start: usize = 0;
1157    let mut prev_end: usize = first_term;
1158    let mut count: u64 = 1;
1159    let mut cur_start = first_term + 1;
1160
1161    while cur_start < data.len() {
1162        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1163            Some(offset) => cur_start + offset,
1164            None => data.len(),
1165        };
1166
1167        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1168            count += 1;
1169        } else {
1170            let should_print = match config.mode {
1171                OutputMode::Default => true,
1172                OutputMode::RepeatedOnly => count > 1,
1173                OutputMode::UniqueOnly => count == 1,
1174                _ => true,
1175            };
1176            if should_print {
1177                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1178            }
1179            prev_start = cur_start;
1180            prev_end = cur_end;
1181            count = 1;
1182        }
1183
1184        if cur_end < data.len() {
1185            cur_start = cur_end + 1;
1186        } else {
1187            break;
1188        }
1189    }
1190
1191    let should_print = match config.mode {
1192        OutputMode::Default => true,
1193        OutputMode::RepeatedOnly => count > 1,
1194        OutputMode::UniqueOnly => count == 1,
1195        _ => true,
1196    };
1197    if should_print {
1198        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1199    }
1200
1201    Ok(())
1202}
1203
1204/// Output a group for standard modes (bytes path).
1205#[inline(always)]
1206fn output_group_bytes(
1207    writer: &mut impl Write,
1208    content: &[u8],
1209    full: &[u8],
1210    count: u64,
1211    config: &UniqConfig,
1212    term: u8,
1213) -> io::Result<()> {
1214    let should_print = match config.mode {
1215        OutputMode::Default => true,
1216        OutputMode::RepeatedOnly => count > 1,
1217        OutputMode::UniqueOnly => count == 1,
1218        _ => true,
1219    };
1220
1221    if should_print {
1222        if config.count {
1223            write_count_line(writer, count, content, term)?;
1224        } else {
1225            writer.write_all(full)?;
1226            // Add terminator if the original line didn't have one
1227            if full.len() == content.len() {
1228                writer.write_all(&[term])?;
1229            }
1230        }
1231    }
1232
1233    Ok(())
1234}
1235
1236/// Process --all-repeated / -D mode on byte slices.
1237fn process_all_repeated_bytes(
1238    data: &[u8],
1239    writer: &mut impl Write,
1240    config: &UniqConfig,
1241    method: AllRepeatedMethod,
1242    term: u8,
1243) -> io::Result<()> {
1244    let mut lines = LineIter::new(data, term);
1245
1246    let first = match lines.next() {
1247        Some(v) => v,
1248        None => return Ok(()),
1249    };
1250
1251    // Collect the current group as (content, full_line) pairs
1252    // For all-repeated we need to buffer group lines since we only print if count > 1
1253    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
1254    group_lines.push(first);
1255    let mut first_group_printed = false;
1256
1257    let fast = !needs_key_extraction(config) && !config.ignore_case;
1258
1259    for (cur_content, cur_full) in lines {
1260        let prev_content = group_lines.last().unwrap().0;
1261        let equal = if fast {
1262            lines_equal_fast(prev_content, cur_content)
1263        } else {
1264            lines_equal(prev_content, cur_content, config)
1265        };
1266
1267        if equal {
1268            group_lines.push((cur_content, cur_full));
1269        } else {
1270            // Flush group
1271            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1272            group_lines.clear();
1273            group_lines.push((cur_content, cur_full));
1274        }
1275    }
1276
1277    // Flush last group
1278    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1279
1280    Ok(())
1281}
1282
1283/// Flush a group for --all-repeated mode (bytes path).
1284fn flush_all_repeated_bytes(
1285    writer: &mut impl Write,
1286    group: &[(&[u8], &[u8])],
1287    method: AllRepeatedMethod,
1288    first_group_printed: &mut bool,
1289    term: u8,
1290) -> io::Result<()> {
1291    if group.len() <= 1 {
1292        return Ok(()); // Not a duplicate group
1293    }
1294
1295    match method {
1296        AllRepeatedMethod::Prepend => {
1297            writer.write_all(&[term])?;
1298        }
1299        AllRepeatedMethod::Separate => {
1300            if *first_group_printed {
1301                writer.write_all(&[term])?;
1302            }
1303        }
1304        AllRepeatedMethod::None => {}
1305    }
1306
1307    for &(content, full) in group {
1308        writer.write_all(full)?;
1309        if full.len() == content.len() {
1310            writer.write_all(&[term])?;
1311        }
1312    }
1313
1314    *first_group_printed = true;
1315    Ok(())
1316}
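
// Illustrative --all-repeated=separate output of the two functions above:
// every line of a duplicate group is kept, unique lines are dropped, and
// groups are blank-line separated.
#[cfg(test)]
mod all_repeated_example {
    use super::*;

    #[test]
    fn keeps_every_line_of_duplicate_groups_only() {
        let input = b"a\na\nb\nc\nc\n";
        let mut out: Vec<u8> = Vec::new();
        let config = UniqConfig {
            mode: OutputMode::AllRepeated(AllRepeatedMethod::Separate),
            ..UniqConfig::default()
        };
        process_uniq_bytes(input, &mut out, &config).unwrap();
        assert_eq!(&out[..], &b"a\na\n\nc\nc\n"[..]);
    }
}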
1317
1318/// Process --group mode on byte slices.
1319fn process_group_bytes(
1320    data: &[u8],
1321    writer: &mut impl Write,
1322    config: &UniqConfig,
1323    method: GroupMethod,
1324    term: u8,
1325) -> io::Result<()> {
1326    let mut lines = LineIter::new(data, term);
1327
1328    let (prev_content, prev_full) = match lines.next() {
1329        Some(v) => v,
1330        None => return Ok(()),
1331    };
1332
1333    // Prepend/Both: separator before first group
1334    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1335        writer.write_all(&[term])?;
1336    }
1337
1338    // Write first line
1339    writer.write_all(prev_full)?;
1340    if prev_full.len() == prev_content.len() {
1341        writer.write_all(&[term])?;
1342    }
1343
1344    let mut prev_content = prev_content;
1345    let fast = !needs_key_extraction(config) && !config.ignore_case;
1346
1347    for (cur_content, cur_full) in lines {
1348        let equal = if fast {
1349            lines_equal_fast(prev_content, cur_content)
1350        } else {
1351            lines_equal(prev_content, cur_content, config)
1352        };
1353
1354        if !equal {
1355            // New group — write separator
1356            writer.write_all(&[term])?;
1357        }
1358
1359        writer.write_all(cur_full)?;
1360        if cur_full.len() == cur_content.len() {
1361            writer.write_all(&[term])?;
1362        }
1363
1364        prev_content = cur_content;
1365    }
1366
1367    // Append/Both: separator after last group
1368    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1369        writer.write_all(&[term])?;
1370    }
1371
1372    Ok(())
1373}
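
// Illustrative --group=separate output: all lines are kept and a blank line
// separates consecutive groups.
#[cfg(test)]
mod group_mode_example {
    use super::*;

    #[test]
    fn separates_groups_with_an_empty_line() {
        let input = b"a\na\nb\n";
        let mut out: Vec<u8> = Vec::new();
        let config = UniqConfig {
            mode: OutputMode::Group(GroupMethod::Separate),
            ..UniqConfig::default()
        };
        process_uniq_bytes(input, &mut out, &config).unwrap();
        assert_eq!(&out[..], &b"a\na\n\nb\n"[..]);
    }
}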
1374
1375// ============================================================================
1376// Streaming processing (for stdin / pipe input)
1377// ============================================================================
1378
1379/// Main streaming uniq processor.
1380/// Reads from `input`, writes to `output`.
1381pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
1382    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
1383    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
1384    let term = if config.zero_terminated { b'\0' } else { b'\n' };
1385
1386    match config.mode {
1387        OutputMode::Group(method) => {
1388            process_group_stream(reader, &mut writer, config, method, term)?;
1389        }
1390        OutputMode::AllRepeated(method) => {
1391            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
1392        }
1393        _ => {
1394            process_standard_stream(reader, &mut writer, config, term)?;
1395        }
1396    }
1397
1398    writer.flush()?;
1399    Ok(())
1400}
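
// Sketch of driving the streaming entry point from an in-memory reader,
// standing in for stdin or a pipe.
#[cfg(test)]
mod stream_entrypoint_example {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn dedups_adjacent_lines_from_a_reader() {
        let input = Cursor::new(&b"a\na\nb\nb\nb\nc\n"[..]);
        let mut out: Vec<u8> = Vec::new();
        process_uniq(input, &mut out, &UniqConfig::default()).unwrap();
        assert_eq!(&out[..], &b"a\nb\nc\n"[..]);
    }
}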
1401
1402/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
1403fn process_standard_stream<R: BufRead, W: Write>(
1404    mut reader: R,
1405    writer: &mut W,
1406    config: &UniqConfig,
1407    term: u8,
1408) -> io::Result<()> {
1409    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1410    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1411
1412    // Read first line
1413    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1414        return Ok(()); // empty input
1415    }
1416    let mut count: u64 = 1;
1417
1418    loop {
1419        current_line.clear();
1420        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1421
1422        if bytes_read == 0 {
1423            // End of input — output the last group
1424            output_group_stream(writer, &prev_line, count, config, term)?;
1425            break;
1426        }
1427
1428        if compare_lines_stream(&prev_line, &current_line, config, term) {
1429            count += 1;
1430        } else {
1431            output_group_stream(writer, &prev_line, count, config, term)?;
1432            std::mem::swap(&mut prev_line, &mut current_line);
1433            count = 1;
1434        }
1435    }
1436
1437    Ok(())
1438}
1439
1440/// Compare two lines (with terminators) in streaming mode.
1441#[inline(always)]
1442fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
1443    let a_stripped = strip_term(a, term);
1444    let b_stripped = strip_term(b, term);
1445    lines_equal(a_stripped, b_stripped, config)
1446}
1447
1448/// Strip terminator from end of line.
1449#[inline(always)]
1450fn strip_term(line: &[u8], term: u8) -> &[u8] {
1451    if line.last() == Some(&term) {
1452        &line[..line.len() - 1]
1453    } else {
1454        line
1455    }
1456}
1457
1458/// Output a group in streaming mode.
1459#[inline(always)]
1460fn output_group_stream(
1461    writer: &mut impl Write,
1462    line: &[u8],
1463    count: u64,
1464    config: &UniqConfig,
1465    term: u8,
1466) -> io::Result<()> {
1467    let should_print = match config.mode {
1468        OutputMode::Default => true,
1469        OutputMode::RepeatedOnly => count > 1,
1470        OutputMode::UniqueOnly => count == 1,
1471        _ => true,
1472    };
1473
1474    if should_print {
1475        let content = strip_term(line, term);
1476        if config.count {
1477            write_count_line(writer, count, content, term)?;
1478        } else {
1479            writer.write_all(content)?;
1480            writer.write_all(&[term])?;
1481        }
1482    }
1483
1484    Ok(())
1485}
1486
1487/// Process --all-repeated / -D mode (streaming).
1488fn process_all_repeated_stream<R: BufRead, W: Write>(
1489    mut reader: R,
1490    writer: &mut W,
1491    config: &UniqConfig,
1492    method: AllRepeatedMethod,
1493    term: u8,
1494) -> io::Result<()> {
1495    let mut group: Vec<Vec<u8>> = Vec::new();
1496    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1497    let mut first_group_printed = false;
1498
1499    current_line.clear();
1500    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
1501        return Ok(());
1502    }
1503    group.push(current_line.clone());
1504
1505    loop {
1506        current_line.clear();
1507        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1508
1509        if bytes_read == 0 {
1510            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1511            break;
1512        }
1513
1514        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
1515            group.push(current_line.clone());
1516        } else {
1517            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1518            group.clear();
1519            group.push(current_line.clone());
1520        }
1521    }
1522
1523    Ok(())
1524}
1525
1526/// Flush a group for --all-repeated mode (streaming).
1527fn flush_all_repeated_stream(
1528    writer: &mut impl Write,
1529    group: &[Vec<u8>],
1530    method: AllRepeatedMethod,
1531    first_group_printed: &mut bool,
1532    term: u8,
1533) -> io::Result<()> {
1534    if group.len() <= 1 {
1535        return Ok(());
1536    }
1537
1538    match method {
1539        AllRepeatedMethod::Prepend => {
1540            writer.write_all(&[term])?;
1541        }
1542        AllRepeatedMethod::Separate => {
1543            if *first_group_printed {
1544                writer.write_all(&[term])?;
1545            }
1546        }
1547        AllRepeatedMethod::None => {}
1548    }
1549
1550    for line in group {
1551        let content = strip_term(line, term);
1552        writer.write_all(content)?;
1553        writer.write_all(&[term])?;
1554    }
1555
1556    *first_group_printed = true;
1557    Ok(())
1558}
1559
1560/// Process --group mode (streaming).
1561fn process_group_stream<R: BufRead, W: Write>(
1562    mut reader: R,
1563    writer: &mut W,
1564    config: &UniqConfig,
1565    method: GroupMethod,
1566    term: u8,
1567) -> io::Result<()> {
1568    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1569    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1570
1571    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1572        return Ok(());
1573    }
1574
1575    // Prepend/Both: separator before first group
1576    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1577        writer.write_all(&[term])?;
1578    }
1579
1580    let content = strip_term(&prev_line, term);
1581    writer.write_all(content)?;
1582    writer.write_all(&[term])?;
1583
1584    loop {
1585        current_line.clear();
1586        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1587
1588        if bytes_read == 0 {
1589            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1590                writer.write_all(&[term])?;
1591            }
1592            break;
1593        }
1594
1595        if !compare_lines_stream(&prev_line, &current_line, config, term) {
1596            writer.write_all(&[term])?;
1597        }
1598
1599        let content = strip_term(&current_line, term);
1600        writer.write_all(content)?;
1601        writer.write_all(&[term])?;
1602
1603        std::mem::swap(&mut prev_line, &mut current_line);
1604    }
1605
1606    Ok(())
1607}
1608
1609/// Read a line terminated by the given byte (newline or NUL).
1610/// Returns number of bytes read (0 = EOF).
1611#[inline(always)]
1612fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
1613    reader.read_until(term, buf)
1614}