coreutils_rs/uniq/core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
3/// Write a large contiguous buffer, retrying on partial writes.
4#[inline]
5fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
6    writer.write_all(buf)
7}
8
9/// How to delimit groups when using --all-repeated
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum AllRepeatedMethod {
12    None,
13    Prepend,
14    Separate,
15}
16
17/// How to delimit groups when using --group
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum GroupMethod {
20    Separate,
21    Prepend,
22    Append,
23    Both,
24}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
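///
/// Illustrative usage sketch (added here for documentation, not part of the
/// original file): the equivalent of `uniq -c -i` is expressed as
///
/// ```ignore
/// let config = UniqConfig {
///     count: true,
///     ignore_case: true,
///     ..UniqConfig::default()
/// };
/// ```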
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
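///
/// Worked example (illustrative, not from the original source): with
/// `skip_fields = 1` and `skip_chars = 2`, the line `b"  alpha beta"` first
/// skips the leading blanks plus `alpha` (one field), then two more bytes
/// (the space and `b`), leaving `b"eta"` as the comparison key.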
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Fast case-insensitive comparison: no field/char extraction, just ASCII case folding.
119/// Uses a length check to reject mismatched lines before the full comparison.
120#[inline(always)]
121fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
122    let alen = a.len();
123    if alen != b.len() {
124        return false;
125    }
126    if alen == 0 {
127        return true;
128    }
129    a.eq_ignore_ascii_case(b)
130}
131
132/// Check if config requires field/char skipping or char limiting.
133#[inline(always)]
134fn needs_key_extraction(config: &UniqConfig) -> bool {
135    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
136}
137
138/// Fast path comparison: no field/char extraction needed, no case folding.
139/// Uses a length check and multi-word prefix rejection before the full comparison.
140/// For short lines (<= 32 bytes, common in many-dups data), avoids the
141/// full memcmp call overhead by doing direct word comparisons.
142#[inline(always)]
143fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
144    let alen = a.len();
145    if alen != b.len() {
146        return false;
147    }
148    if alen == 0 {
149        return true;
150    }
151    // Short-line fast path: lines of at most 8 bytes
152    if alen <= 8 {
153        // <= 8 bytes: plain slice equality (the compiler emits an efficient small compare)
154        return a == b;
155    }
156    // 8-byte prefix check: reject most non-equal lines without full memcmp
157    let a8 = unsafe { (a.as_ptr() as *const u64).read_unaligned() };
158    let b8 = unsafe { (b.as_ptr() as *const u64).read_unaligned() };
159    if a8 != b8 {
160        return false;
161    }
162    // Check last 8 bytes (overlapping for 9-16 byte lines, eliminating full memcmp)
163    if alen <= 16 {
164        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
165        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
166        return a_tail == b_tail;
167    }
168    // For 17-32 bytes: check first 16 + last 16 (overlapping) to avoid memcmp
169    if alen <= 32 {
170        let a16 = unsafe { (a.as_ptr().add(8) as *const u64).read_unaligned() };
171        let b16 = unsafe { (b.as_ptr().add(8) as *const u64).read_unaligned() };
172        if a16 != b16 {
173            return false;
174        }
175        let a_tail = unsafe { (a.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
176        let b_tail = unsafe { (b.as_ptr().add(alen - 8) as *const u64).read_unaligned() };
177        return a_tail == b_tail;
178    }
179    // Longer lines: prefix passed, fall through to full memcmp
180    a == b
181}
182
183/// Write a count-prefixed line in GNU uniq format.
184/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
185/// Combines prefix + line + term into a single write when the total fits in 256 bytes.
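///
/// Illustrative example (not in the original): `count = 3`, `line = b"abc"`,
/// `term = b'\n'` produces `"      3 abc\n"` (six spaces, the digit, one
/// space, then the line and terminator), matching `uniq -c` output.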
186#[inline(always)]
187fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
188    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
189    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
190    let digits = itoa_right_aligned_into(&mut prefix, count);
191    let width = digits.max(7); // minimum 7 chars
192    let prefix_len = width + 1; // +1 for trailing space
193    prefix[width] = b' ';
194
195    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
196    let total = prefix_len + line.len() + 1;
197    if total <= 256 {
198        let mut buf = [0u8; 256];
199        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
200        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
201        buf[prefix_len + line.len()] = term;
202        out.write_all(&buf[..total])
203    } else {
204        out.write_all(&prefix[..prefix_len])?;
205        out.write_all(line)?;
206        out.write_all(&[term])
207    }
208}
209
210/// Write u64 decimal right-aligned into the prefix buffer.
211/// Buffer is pre-filled with spaces. Returns the occupied field width (number of digits, but at least 7).
212#[inline(always)]
213fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
214    if val == 0 {
215        buf[6] = b'0';
216        return 7; // 6 spaces + '0' = 7 chars
217    }
218    // Write digits right-to-left from position 27 (leaving room for trailing space)
219    let mut pos = 27;
220    while val > 0 {
221        pos -= 1;
222        buf[pos] = b'0' + (val % 10) as u8;
223        val /= 10;
224    }
225    let num_digits = 27 - pos;
226    if num_digits >= 7 {
227        // Number is wide enough, shift to front
228        buf.copy_within(pos..27, 0);
229        num_digits
230    } else {
231        // Right-align in 7-char field: spaces then digits
232        let pad = 7 - num_digits;
233        buf.copy_within(pos..27, pad);
234        // buf[0..pad] is already spaces from initialization
235        7
236    }
237}
238
239// ============================================================================
240// High-performance mmap-based processing (for byte slices, zero-copy)
241// ============================================================================
242
243/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
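///
/// Usage sketch (illustrative; marked `ignore` so it is not compiled as a
/// doctest, since the public path to this module is not shown here):
///
/// ```ignore
/// let data = b"a\na\nb\n";
/// let mut out = Vec::new();
/// process_uniq_bytes(data, &mut out, &UniqConfig::default()).unwrap();
/// assert_eq!(out, b"a\nb\n".to_vec());
/// ```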
244pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
245    // 32MB output buffer for fewer flush syscalls on large inputs.
246    // Larger buffer reduces write() syscall count for 100MB+ inputs.
247    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
248    let term = if config.zero_terminated { b'\0' } else { b'\n' };
249
250    match config.mode {
251        OutputMode::Group(method) => {
252            process_group_bytes(data, &mut writer, config, method, term)?;
253        }
254        OutputMode::AllRepeated(method) => {
255            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
256        }
257        _ => {
258            process_standard_bytes(data, &mut writer, config, term)?;
259        }
260    }
261
262    writer.flush()?;
263    Ok(())
264}
265
266/// Iterator over lines in a byte slice, yielding (content without terminator, full line including terminator).
267/// Uses memchr for SIMD-accelerated line boundary detection.
268struct LineIter<'a> {
269    data: &'a [u8],
270    pos: usize,
271    term: u8,
272}
273
274impl<'a> LineIter<'a> {
275    #[inline(always)]
276    fn new(data: &'a [u8], term: u8) -> Self {
277        Self { data, pos: 0, term }
278    }
279}
280
281impl<'a> Iterator for LineIter<'a> {
282    /// (line content without terminator, full line including terminator for output)
283    type Item = (&'a [u8], &'a [u8]);
284
285    #[inline(always)]
286    fn next(&mut self) -> Option<Self::Item> {
287        if self.pos >= self.data.len() {
288            return None;
289        }
290
291        let remaining = &self.data[self.pos..];
292        match memchr::memchr(self.term, remaining) {
293            Some(idx) => {
294                let line_start = self.pos;
295                let line_end = self.pos + idx; // without terminator
296                let full_end = self.pos + idx + 1; // with terminator
297                self.pos = full_end;
298                Some((
299                    &self.data[line_start..line_end],
300                    &self.data[line_start..full_end],
301                ))
302            }
303            None => {
304                // Last line without terminator
305                let line_start = self.pos;
306                self.pos = self.data.len();
307                let line = &self.data[line_start..];
308                Some((line, line))
309            }
310        }
311    }
312}
313
314/// Get line content (without terminator) from pre-computed positions.
315/// `content_end` is the end of actual content (excludes trailing terminator if present).
316#[inline(always)]
317fn line_content_at<'a>(
318    data: &'a [u8],
319    line_starts: &[usize],
320    idx: usize,
321    content_end: usize,
322) -> &'a [u8] {
323    let start = line_starts[idx];
324    let end = if idx + 1 < line_starts.len() {
325        line_starts[idx + 1] - 1 // exclude terminator
326    } else {
327        content_end // last line: pre-computed to exclude trailing terminator
328    };
329    &data[start..end]
330}
331
332/// Get full line (with terminator) from pre-computed positions.
333#[inline(always)]
334fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
335    let start = line_starts[idx];
336    let end = if idx + 1 < line_starts.len() {
337        line_starts[idx + 1] // include terminator
338    } else {
339        data.len()
340    };
341    &data[start..end]
342}
343
344/// Linear scan for the end of a duplicate group.
345/// Returns the index of the first line that differs from line_starts[group_start].
346/// Must use linear scan (not binary search) because uniq input may NOT be sorted --
347/// equal lines can appear in non-adjacent groups separated by different lines.
348/// Caches key length for fast length-mismatch rejection.
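///
/// Example of why a linear scan is required (illustrative): for the unsorted
/// input `a a b a`, the final `a` starts a new group even though `a` appeared
/// earlier, so group boundaries can only be found by walking forward.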
349#[inline]
350fn linear_scan_group_end(
351    data: &[u8],
352    line_starts: &[usize],
353    group_start: usize,
354    num_lines: usize,
355    content_end: usize,
356) -> usize {
357    let key = line_content_at(data, line_starts, group_start, content_end);
358    let key_len = key.len();
359    let mut i = group_start + 1;
360    while i < num_lines {
361        let candidate = line_content_at(data, line_starts, i, content_end);
362        if candidate.len() != key_len || !lines_equal_fast(key, candidate) {
363            return i;
364        }
365        i += 1;
366    }
367    i
368}
369
370/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
371/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
372/// General path: pre-computed line positions with linear scans over duplicate groups.
373fn process_standard_bytes(
374    data: &[u8],
375    writer: &mut impl Write,
376    config: &UniqConfig,
377    term: u8,
378) -> io::Result<()> {
379    if data.is_empty() {
380        return Ok(());
381    }
382
383    let fast = !needs_key_extraction(config) && !config.ignore_case;
384    let fast_ci = !needs_key_extraction(config) && config.ignore_case;
385
386    // Ultra-fast path: default mode, no count, no key extraction.
387    // Single-pass: scan with memchr, compare adjacent lines inline.
388    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
389    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
390        return process_default_fast_singlepass(data, writer, term);
391    }
392
393    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
394    if fast
395        && !config.count
396        && matches!(
397            config.mode,
398            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
399        )
400    {
401        return process_filter_fast_singlepass(data, writer, config, term);
402    }
403
404    // Ultra-fast path: count mode with no key extraction.
405    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
406    // Avoids the line_starts Vec allocation (20MB+ for large files).
407    if fast && config.count {
408        return process_count_fast_singlepass(data, writer, config, term);
409    }
410
411    // Fast path for case-insensitive (-i) mode with no key extraction.
412    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
413    // Avoids the general path's line_starts Vec allocation.
414    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
415        return process_default_ci_singlepass(data, writer, term);
416    }
417
418    if fast_ci
419        && !config.count
420        && matches!(
421            config.mode,
422            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
423        )
424    {
425        return process_filter_ci_singlepass(data, writer, config, term);
426    }
427
428    if fast_ci && config.count {
429        return process_count_ci_singlepass(data, writer, config, term);
430    }
431
432    // General path: pre-computed line positions with linear scans over duplicate groups
433    let estimated_lines = (data.len() / 40).max(64);
434    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
435    line_starts.push(0);
436    for pos in memchr::memchr_iter(term, data) {
437        if pos + 1 < data.len() {
438            line_starts.push(pos + 1);
439        }
440    }
441    let num_lines = line_starts.len();
442    if num_lines == 0 {
443        return Ok(());
444    }
445
446    // Pre-compute content end: if data ends with terminator, exclude it for last line
447    let content_end = if data.last() == Some(&term) {
448        data.len() - 1
449    } else {
450        data.len()
451    };
452
453    // Fast default-mode branch (note: this condition already returned via the single-pass path above)
454    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
455        // Write first line
456        let first_full = line_full_at(data, &line_starts, 0);
457        let first_content = line_content_at(data, &line_starts, 0, content_end);
458        write_all_raw(writer, first_full)?;
459        if first_full.len() == first_content.len() {
460            writer.write_all(&[term])?;
461        }
462
463        let mut i = 1;
464        while i < num_lines {
465            let prev = line_content_at(data, &line_starts, i - 1, content_end);
466            let cur = line_content_at(data, &line_starts, i, content_end);
467
468            if lines_equal_fast(prev, cur) {
469                // Duplicate detected — linear scan for end of group
470                let group_end =
471                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
472                i = group_end;
473                continue;
474            }
475
476            // Unique line — write it
477            let cur_full = line_full_at(data, &line_starts, i);
478            write_all_raw(writer, cur_full)?;
479            if cur_full.len() == cur.len() {
480                writer.write_all(&[term])?;
481            }
482            i += 1;
483        }
484        return Ok(());
485    }
486
487    // General path with count tracking
488    let mut i = 0;
489    while i < num_lines {
490        let content = line_content_at(data, &line_starts, i, content_end);
491        let full = line_full_at(data, &line_starts, i);
492
493        let group_end = if fast
494            && i + 1 < num_lines
495            && lines_equal_fast(
496                content,
497                line_content_at(data, &line_starts, i + 1, content_end),
498            ) {
499            // Duplicate detected — linear scan for end
500            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
501        } else if !fast
502            && i + 1 < num_lines
503            && lines_equal(
504                content,
505                line_content_at(data, &line_starts, i + 1, content_end),
506                config,
507            )
508        {
509            // Slow path linear scan with key extraction
510            let mut j = i + 2;
511            while j < num_lines {
512                if !lines_equal(
513                    content,
514                    line_content_at(data, &line_starts, j, content_end),
515                    config,
516                ) {
517                    break;
518                }
519                j += 1;
520            }
521            j
522        } else {
523            i + 1
524        };
525
526        let count = (group_end - i) as u64;
527        output_group_bytes(writer, content, full, count, config, term)?;
528        i = group_end;
529    }
530
531    Ok(())
532}
533
534/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
535/// No pre-computed positions, no binary search, no Vec allocation.
536/// Outputs each line that differs from the previous.
537///
538/// For large inputs (>= 2MB), uses parallel chunk processing: each chunk is deduplicated
539/// independently, then cross-chunk boundaries are resolved.
540fn process_default_fast_singlepass(
541    data: &[u8],
542    writer: &mut impl Write,
543    term: u8,
544) -> io::Result<()> {
545    // Parallel path for large inputs — kicks in at 2MB for better CPU utilization
546    if data.len() >= 2 * 1024 * 1024 {
547        return process_default_parallel(data, writer, term);
548    }
549
550    process_default_sequential(data, writer, term)
551}
552
553/// Sequential single-pass dedup with zero-copy output.
554/// Instead of copying data to a buffer, tracks contiguous output runs and writes
555/// directly from the original data. For all-unique data, this is a single write_all.
556///
557/// Optimized for the "many duplicates" case: caches the previous line's length
558/// and first-8-byte prefix for fast rejection of non-duplicates without
559/// calling the full comparison function.
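///
/// Run-tracking example (illustrative): for the input `x\nx\ny\nz\n`, the
/// duplicate second `x` splits the output into two contiguous runs, bytes
/// `0..2` (`x\n`) and bytes `4..8` (`y\nz\n`), each written directly from the
/// input with no intermediate copy.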
560fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
561    let mut prev_start: usize = 0;
562    let mut prev_end: usize; // exclusive, without terminator
563
564    // Find end of first line
565    match memchr::memchr(term, data) {
566        Some(pos) => {
567            prev_end = pos;
568        }
569        None => {
570            // Single line, no terminator
571            writer.write_all(data)?;
572            return writer.write_all(&[term]);
573        }
574    }
575
576    // Cache previous line metadata for fast comparison
577    let mut prev_len = prev_end - prev_start;
578    let mut prev_prefix: u64 = if prev_len >= 8 {
579        unsafe { (data.as_ptr().add(prev_start) as *const u64).read_unaligned() }
580    } else {
581        0
582    };
583
584    // run_start tracks the beginning of the current contiguous output region.
585    // When a duplicate is found, we flush the run up to the duplicate and skip it.
586    let mut run_start: usize = 0;
587    let mut cur_start = prev_end + 1;
588    let mut last_output_end = prev_end + 1; // exclusive end including terminator
589
590    while cur_start < data.len() {
591        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
592            Some(offset) => cur_start + offset,
593            None => data.len(), // last line without terminator
594        };
595
596        let cur_len = cur_end - cur_start;
597
598        // Fast reject: if lengths differ, lines are definitely not equal
599        let is_dup = if cur_len != prev_len {
600            false
601        } else if cur_len == 0 {
602            true
603        } else if cur_len >= 8 {
604            // Compare cached 8-byte prefix first
605            let cur_prefix =
606                unsafe { (data.as_ptr().add(cur_start) as *const u64).read_unaligned() };
607            if cur_prefix != prev_prefix {
608                false
609            } else if cur_len <= 8 {
610                true // prefix covers entire line
611            } else if cur_len <= 16 {
612                // Check last 8 bytes (overlapping)
613                let a_tail = unsafe {
614                    (data.as_ptr().add(prev_start + prev_len - 8) as *const u64).read_unaligned()
615                };
616                let b_tail = unsafe {
617                    (data.as_ptr().add(cur_start + cur_len - 8) as *const u64).read_unaligned()
618                };
619                a_tail == b_tail
620            } else if cur_len <= 32 {
621                // Check bytes 8-16 and last 8 bytes
622                let a16 =
623                    unsafe { (data.as_ptr().add(prev_start + 8) as *const u64).read_unaligned() };
624                let b16 =
625                    unsafe { (data.as_ptr().add(cur_start + 8) as *const u64).read_unaligned() };
626                if a16 != b16 {
627                    false
628                } else {
629                    let a_tail = unsafe {
630                        (data.as_ptr().add(prev_start + prev_len - 8) as *const u64)
631                            .read_unaligned()
632                    };
633                    let b_tail = unsafe {
634                        (data.as_ptr().add(cur_start + cur_len - 8) as *const u64).read_unaligned()
635                    };
636                    a_tail == b_tail
637                }
638            } else {
639                data[prev_start..prev_end] == data[cur_start..cur_end]
640            }
641        } else {
642            // Short line < 8 bytes
643            data[prev_start..prev_end] == data[cur_start..cur_end]
644        };
645
646        if is_dup {
647            // Duplicate — flush the current run up to this line, then skip it
648            if run_start < cur_start {
649                writer.write_all(&data[run_start..cur_start])?;
650            }
651            // Start new run after this duplicate
652            if cur_end < data.len() {
653                run_start = cur_end + 1;
654            } else {
655                run_start = cur_end;
656            }
657        } else {
658            // Different line — update cached comparison state
659            prev_start = cur_start;
660            prev_end = cur_end;
661            prev_len = cur_len;
662            prev_prefix = if cur_len >= 8 {
663                unsafe { (data.as_ptr().add(cur_start) as *const u64).read_unaligned() }
664            } else {
665                0
666            };
667            last_output_end = if cur_end < data.len() {
668                cur_end + 1
669            } else {
670                cur_end
671            };
672        }
673
674        if cur_end < data.len() {
675            cur_start = cur_end + 1;
676        } else {
677            break;
678        }
679    }
680
681    // Flush remaining run
682    if run_start < data.len() {
683        writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
684    }
685
686    // Add the trailing terminator only if the final, unterminated line was actually written
687    if run_start < data.len() && data.last() != Some(&term) {
688        writer.write_all(&[term])?;
689    }
690
691    Ok(())
692}
693
694/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
695/// positions in each chunk in parallel, then write output runs directly from
696/// the original data. No per-chunk buffer allocation needed.
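///
/// Boundary example (illustrative): if one chunk's last output line is `foo`
/// and the next chunk begins with an equal `foo`, the second chunk's copy is
/// skipped at write time so the group stays collapsed across the split.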
697fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
698    use rayon::prelude::*;
699
700    let num_threads = rayon::current_num_threads().max(1);
701    let chunk_target = data.len() / num_threads;
702
703    // Find chunk boundaries aligned to line terminators
704    let mut boundaries = Vec::with_capacity(num_threads + 1);
705    boundaries.push(0usize);
706    for i in 1..num_threads {
707        let target = i * chunk_target;
708        if target >= data.len() {
709            break;
710        }
711        if let Some(p) = memchr::memchr(term, &data[target..]) {
712            let b = target + p + 1;
713            if b > *boundaries.last().unwrap() && b <= data.len() {
714                boundaries.push(b);
715            }
716        }
717    }
718    boundaries.push(data.len());
719
720    let n_chunks = boundaries.len() - 1;
721    if n_chunks <= 1 {
722        return process_default_sequential(data, writer, term);
723    }
724
725    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
726    struct ChunkResult {
727        /// Byte ranges in the original data to output (contiguous runs)
728        runs: Vec<(usize, usize)>,
729        /// First line in chunk (absolute offsets into data, content without term)
730        first_line_start: usize,
731        first_line_end: usize,
732        /// Last *output* line in chunk (content without term)
733        last_line_start: usize,
734        last_line_end: usize,
735    }
736
737    let results: Vec<ChunkResult> = boundaries
738        .windows(2)
739        .collect::<Vec<_>>()
740        .par_iter()
741        .map(|w| {
742            let chunk_start = w[0];
743            let chunk_end = w[1];
744            let chunk = &data[chunk_start..chunk_end];
745
746            let first_term = match memchr::memchr(term, chunk) {
747                Some(pos) => pos,
748                None => {
749                    return ChunkResult {
750                        runs: vec![(chunk_start, chunk_end)],
751                        first_line_start: chunk_start,
752                        first_line_end: chunk_end,
753                        last_line_start: chunk_start,
754                        last_line_end: chunk_end,
755                    };
756                }
757            };
758
759            let first_line_start = chunk_start;
760            let first_line_end = chunk_start + first_term;
761
762            let mut runs: Vec<(usize, usize)> = Vec::new();
763            let mut run_start = chunk_start;
764            let mut prev_start = 0usize;
765            let mut prev_end = first_term;
766            let mut last_out_start = chunk_start;
767            let mut last_out_end = first_line_end;
768
769            let mut cur_start = first_term + 1;
770            while cur_start < chunk.len() {
771                let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
772                    Some(offset) => cur_start + offset,
773                    None => chunk.len(),
774                };
775
776                if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
777                    // Duplicate — flush current run up to this line
778                    let abs_cur = chunk_start + cur_start;
779                    if run_start < abs_cur {
780                        runs.push((run_start, abs_cur));
781                    }
782                    // New run starts after this duplicate
783                    run_start = chunk_start
784                        + if cur_end < chunk.len() {
785                            cur_end + 1
786                        } else {
787                            cur_end
788                        };
789                } else {
790                    last_out_start = chunk_start + cur_start;
791                    last_out_end = chunk_start + cur_end;
792                }
793                prev_start = cur_start;
794                prev_end = cur_end;
795
796                if cur_end < chunk.len() {
797                    cur_start = cur_end + 1;
798                } else {
799                    break;
800                }
801            }
802
803            // Close final run
804            if run_start < chunk_end {
805                runs.push((run_start, chunk_end));
806            }
807
808            ChunkResult {
809                runs,
810                first_line_start,
811                first_line_end,
812                last_line_start: last_out_start,
813                last_line_end: last_out_end,
814            }
815        })
816        .collect();
817
818    // Write results, adjusting cross-chunk boundaries
819    for (i, result) in results.iter().enumerate() {
820        let skip_first = if i > 0 {
821            let prev = &results[i - 1];
822            let prev_last = &data[prev.last_line_start..prev.last_line_end];
823            let cur_first = &data[result.first_line_start..result.first_line_end];
824            lines_equal_fast(prev_last, cur_first)
825        } else {
826            false
827        };
828
829        let skip_end = if skip_first {
830            // Skip bytes up to and including the first line's terminator
831            result.first_line_end + 1
832        } else {
833            0
834        };
835
836        for &(rs, re) in &result.runs {
837            let actual_start = rs.max(skip_end);
838            if actual_start < re {
839                writer.write_all(&data[actual_start..re])?;
840            }
841        }
842    }
843
844    // Ensure trailing terminator
845    if !data.is_empty() && *data.last().unwrap() != term {
846        writer.write_all(&[term])?;
847    }
848
849    Ok(())
850}
851
852/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
853/// Zero-copy: writes directly from input data, no output buffer allocation.
854/// Uses cached prefix comparison for fast duplicate detection.
855fn process_filter_fast_singlepass(
856    data: &[u8],
857    writer: &mut impl Write,
858    config: &UniqConfig,
859    term: u8,
860) -> io::Result<()> {
861    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
862
863    let first_term = match memchr::memchr(term, data) {
864        Some(pos) => pos,
865        None => {
866            // Single line: unique (count=1)
867            if !repeated {
868                writer.write_all(data)?;
869                writer.write_all(&[term])?;
870            }
871            return Ok(());
872        }
873    };
874
875    let mut prev_start: usize = 0;
876    let mut prev_end: usize = first_term;
877    let mut prev_len = prev_end;
878    let mut count: u64 = 1;
879    let mut cur_start = first_term + 1;
880
881    while cur_start < data.len() {
882        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
883            Some(offset) => cur_start + offset,
884            None => data.len(),
885        };
886
887        let cur_len = cur_end - cur_start;
888        let is_dup = cur_len == prev_len
889            && lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]);
890
891        if is_dup {
892            count += 1;
893        } else {
894            // Output previous group -- write directly from input data (zero-copy)
895            let should_print = if repeated { count > 1 } else { count == 1 };
896            if should_print {
897                if prev_end < data.len() && data[prev_end] == term {
898                    writer.write_all(&data[prev_start..prev_end + 1])?;
899                } else {
900                    writer.write_all(&data[prev_start..prev_end])?;
901                    writer.write_all(&[term])?;
902                }
903            }
904            prev_start = cur_start;
905            prev_end = cur_end;
906            prev_len = cur_len;
907            count = 1;
908        }
909
910        if cur_end < data.len() {
911            cur_start = cur_end + 1;
912        } else {
913            break;
914        }
915    }
916
917    // Output last group
918    let should_print = if repeated { count > 1 } else { count == 1 };
919    if should_print {
920        if prev_end < data.len() && data[prev_end] == term {
921            writer.write_all(&data[prev_start..prev_end + 1])?;
922        } else {
923            writer.write_all(&data[prev_start..prev_end])?;
924            writer.write_all(&[term])?;
925        }
926    }
927
928    Ok(())
929}
930
931/// Fast single-pass for count mode (-c) with all standard output modes.
932/// Zero line_starts allocation: scans with memchr, counts groups inline,
933/// and writes count-prefixed lines directly.
934/// Uses cached length comparison for fast duplicate rejection.
935fn process_count_fast_singlepass(
936    data: &[u8],
937    writer: &mut impl Write,
938    config: &UniqConfig,
939    term: u8,
940) -> io::Result<()> {
941    let first_term = match memchr::memchr(term, data) {
942        Some(pos) => pos,
943        None => {
944            // Single line: count=1
945            let should_print = match config.mode {
946                OutputMode::Default => true,
947                OutputMode::RepeatedOnly => false,
948                OutputMode::UniqueOnly => true,
949                _ => true,
950            };
951            if should_print {
952                write_count_line(writer, 1, data, term)?;
953            }
954            return Ok(());
955        }
956    };
957
958    let mut prev_start: usize = 0;
959    let mut prev_end: usize = first_term;
960    let mut prev_len = prev_end;
961    let mut count: u64 = 1;
962    let mut cur_start = first_term + 1;
963
964    while cur_start < data.len() {
965        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
966            Some(offset) => cur_start + offset,
967            None => data.len(),
968        };
969
970        let cur_len = cur_end - cur_start;
971        let is_dup = cur_len == prev_len
972            && lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]);
973
974        if is_dup {
975            count += 1;
976        } else {
977            // Output previous group with count
978            let should_print = match config.mode {
979                OutputMode::Default => true,
980                OutputMode::RepeatedOnly => count > 1,
981                OutputMode::UniqueOnly => count == 1,
982                _ => true,
983            };
984            if should_print {
985                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
986            }
987            prev_start = cur_start;
988            prev_end = cur_end;
989            prev_len = cur_len;
990            count = 1;
991        }
992
993        if cur_end < data.len() {
994            cur_start = cur_end + 1;
995        } else {
996            break;
997        }
998    }
999
1000    // Output last group
1001    let should_print = match config.mode {
1002        OutputMode::Default => true,
1003        OutputMode::RepeatedOnly => count > 1,
1004        OutputMode::UniqueOnly => count == 1,
1005        _ => true,
1006    };
1007    if should_print {
1008        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1009    }
1010
1011    Ok(())
1012}
1013
1014/// Fast single-pass for case-insensitive (-i) default mode.
1015/// Same logic as process_default_sequential but uses eq_ignore_ascii_case.
1016fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
1017    let mut prev_start: usize = 0;
1018    let mut prev_end: usize;
1019
1020    match memchr::memchr(term, data) {
1021        Some(pos) => {
1022            prev_end = pos;
1023        }
1024        None => {
1025            writer.write_all(data)?;
1026            return writer.write_all(&[term]);
1027        }
1028    }
1029
1030    // Write first line
1031    writer.write_all(&data[..prev_end + 1])?;
1032
1033    let mut cur_start = prev_end + 1;
1034
1035    while cur_start < data.len() {
1036        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1037            Some(offset) => cur_start + offset,
1038            None => data.len(),
1039        };
1040
1041        let prev_content = &data[prev_start..prev_end];
1042        let cur_content = &data[cur_start..cur_end];
1043
1044        if !lines_equal_case_insensitive(prev_content, cur_content) {
1045            // Different line — write it
1046            if cur_end < data.len() {
1047                writer.write_all(&data[cur_start..cur_end + 1])?;
1048            } else {
1049                writer.write_all(&data[cur_start..cur_end])?;
1050                writer.write_all(&[term])?;
1051            }
1052            prev_start = cur_start;
1053            prev_end = cur_end;
1054        }
1055
1056        if cur_end < data.len() {
1057            cur_start = cur_end + 1;
1058        } else {
1059            break;
1060        }
1061    }
1062
1063    Ok(())
1064}
1065
1066/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
1067fn process_filter_ci_singlepass(
1068    data: &[u8],
1069    writer: &mut impl Write,
1070    config: &UniqConfig,
1071    term: u8,
1072) -> io::Result<()> {
1073    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
1074
1075    let first_term = match memchr::memchr(term, data) {
1076        Some(pos) => pos,
1077        None => {
1078            if !repeated {
1079                writer.write_all(data)?;
1080                writer.write_all(&[term])?;
1081            }
1082            return Ok(());
1083        }
1084    };
1085
1086    let mut prev_start: usize = 0;
1087    let mut prev_end: usize = first_term;
1088    let mut count: u64 = 1;
1089    let mut cur_start = first_term + 1;
1090
1091    while cur_start < data.len() {
1092        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1093            Some(offset) => cur_start + offset,
1094            None => data.len(),
1095        };
1096
1097        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1098            count += 1;
1099        } else {
1100            let should_print = if repeated { count > 1 } else { count == 1 };
1101            if should_print {
1102                if prev_end < data.len() && data[prev_end] == term {
1103                    writer.write_all(&data[prev_start..prev_end + 1])?;
1104                } else {
1105                    writer.write_all(&data[prev_start..prev_end])?;
1106                    writer.write_all(&[term])?;
1107                }
1108            }
1109            prev_start = cur_start;
1110            prev_end = cur_end;
1111            count = 1;
1112        }
1113
1114        if cur_end < data.len() {
1115            cur_start = cur_end + 1;
1116        } else {
1117            break;
1118        }
1119    }
1120
1121    let should_print = if repeated { count > 1 } else { count == 1 };
1122    if should_print {
1123        if prev_end < data.len() && data[prev_end] == term {
1124            writer.write_all(&data[prev_start..prev_end + 1])?;
1125        } else {
1126            writer.write_all(&data[prev_start..prev_end])?;
1127            writer.write_all(&[term])?;
1128        }
1129    }
1130
1131    Ok(())
1132}
1133
1134/// Fast single-pass for case-insensitive (-i) count (-c) mode.
1135fn process_count_ci_singlepass(
1136    data: &[u8],
1137    writer: &mut impl Write,
1138    config: &UniqConfig,
1139    term: u8,
1140) -> io::Result<()> {
1141    let first_term = match memchr::memchr(term, data) {
1142        Some(pos) => pos,
1143        None => {
1144            let should_print = match config.mode {
1145                OutputMode::Default => true,
1146                OutputMode::RepeatedOnly => false,
1147                OutputMode::UniqueOnly => true,
1148                _ => true,
1149            };
1150            if should_print {
1151                write_count_line(writer, 1, data, term)?;
1152            }
1153            return Ok(());
1154        }
1155    };
1156
1157    let mut prev_start: usize = 0;
1158    let mut prev_end: usize = first_term;
1159    let mut count: u64 = 1;
1160    let mut cur_start = first_term + 1;
1161
1162    while cur_start < data.len() {
1163        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1164            Some(offset) => cur_start + offset,
1165            None => data.len(),
1166        };
1167
1168        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1169            count += 1;
1170        } else {
1171            let should_print = match config.mode {
1172                OutputMode::Default => true,
1173                OutputMode::RepeatedOnly => count > 1,
1174                OutputMode::UniqueOnly => count == 1,
1175                _ => true,
1176            };
1177            if should_print {
1178                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1179            }
1180            prev_start = cur_start;
1181            prev_end = cur_end;
1182            count = 1;
1183        }
1184
1185        if cur_end < data.len() {
1186            cur_start = cur_end + 1;
1187        } else {
1188            break;
1189        }
1190    }
1191
1192    let should_print = match config.mode {
1193        OutputMode::Default => true,
1194        OutputMode::RepeatedOnly => count > 1,
1195        OutputMode::UniqueOnly => count == 1,
1196        _ => true,
1197    };
1198    if should_print {
1199        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1200    }
1201
1202    Ok(())
1203}
1204
1205/// Output a group for standard modes (bytes path).
1206#[inline(always)]
1207fn output_group_bytes(
1208    writer: &mut impl Write,
1209    content: &[u8],
1210    full: &[u8],
1211    count: u64,
1212    config: &UniqConfig,
1213    term: u8,
1214) -> io::Result<()> {
1215    let should_print = match config.mode {
1216        OutputMode::Default => true,
1217        OutputMode::RepeatedOnly => count > 1,
1218        OutputMode::UniqueOnly => count == 1,
1219        _ => true,
1220    };
1221
1222    if should_print {
1223        if config.count {
1224            write_count_line(writer, count, content, term)?;
1225        } else {
1226            writer.write_all(full)?;
1227            // Add terminator if the original line didn't have one
1228            if full.len() == content.len() {
1229                writer.write_all(&[term])?;
1230            }
1231        }
1232    }
1233
1234    Ok(())
1235}
1236
1237/// Process --all-repeated / -D mode on byte slices.
1238fn process_all_repeated_bytes(
1239    data: &[u8],
1240    writer: &mut impl Write,
1241    config: &UniqConfig,
1242    method: AllRepeatedMethod,
1243    term: u8,
1244) -> io::Result<()> {
1245    let mut lines = LineIter::new(data, term);
1246
1247    let first = match lines.next() {
1248        Some(v) => v,
1249        None => return Ok(()),
1250    };
1251
1252    // Buffer the current group's (content, full) line pairs
1253    // For all-repeated we need to buffer group lines since we only print if count > 1
1254    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
1255    group_lines.push(first);
1256    let mut first_group_printed = false;
1257
1258    let fast = !needs_key_extraction(config) && !config.ignore_case;
1259
1260    for (cur_content, cur_full) in lines {
1261        let prev_content = group_lines.last().unwrap().0;
1262        let equal = if fast {
1263            lines_equal_fast(prev_content, cur_content)
1264        } else {
1265            lines_equal(prev_content, cur_content, config)
1266        };
1267
1268        if equal {
1269            group_lines.push((cur_content, cur_full));
1270        } else {
1271            // Flush group
1272            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1273            group_lines.clear();
1274            group_lines.push((cur_content, cur_full));
1275        }
1276    }
1277
1278    // Flush last group
1279    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1280
1281    Ok(())
1282}
1283
1284/// Flush a group for --all-repeated mode (bytes path).
1285fn flush_all_repeated_bytes(
1286    writer: &mut impl Write,
1287    group: &[(&[u8], &[u8])],
1288    method: AllRepeatedMethod,
1289    first_group_printed: &mut bool,
1290    term: u8,
1291) -> io::Result<()> {
1292    if group.len() <= 1 {
1293        return Ok(()); // Not a duplicate group
1294    }
1295
1296    match method {
1297        AllRepeatedMethod::Prepend => {
1298            writer.write_all(&[term])?;
1299        }
1300        AllRepeatedMethod::Separate => {
1301            if *first_group_printed {
1302                writer.write_all(&[term])?;
1303            }
1304        }
1305        AllRepeatedMethod::None => {}
1306    }
1307
1308    for &(content, full) in group {
1309        writer.write_all(full)?;
1310        if full.len() == content.len() {
1311            writer.write_all(&[term])?;
1312        }
1313    }
1314
1315    *first_group_printed = true;
1316    Ok(())
1317}
1318
1319/// Process --group mode on byte slices.
1320fn process_group_bytes(
1321    data: &[u8],
1322    writer: &mut impl Write,
1323    config: &UniqConfig,
1324    method: GroupMethod,
1325    term: u8,
1326) -> io::Result<()> {
1327    let mut lines = LineIter::new(data, term);
1328
1329    let (prev_content, prev_full) = match lines.next() {
1330        Some(v) => v,
1331        None => return Ok(()),
1332    };
1333
1334    // Prepend/Both: separator before first group
1335    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1336        writer.write_all(&[term])?;
1337    }
1338
1339    // Write first line
1340    writer.write_all(prev_full)?;
1341    if prev_full.len() == prev_content.len() {
1342        writer.write_all(&[term])?;
1343    }
1344
1345    let mut prev_content = prev_content;
1346    let fast = !needs_key_extraction(config) && !config.ignore_case;
1347
1348    for (cur_content, cur_full) in lines {
1349        let equal = if fast {
1350            lines_equal_fast(prev_content, cur_content)
1351        } else {
1352            lines_equal(prev_content, cur_content, config)
1353        };
1354
1355        if !equal {
1356            // New group — write separator
1357            writer.write_all(&[term])?;
1358        }
1359
1360        writer.write_all(cur_full)?;
1361        if cur_full.len() == cur_content.len() {
1362            writer.write_all(&[term])?;
1363        }
1364
1365        prev_content = cur_content;
1366    }
1367
1368    // Append/Both: separator after last group
1369    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1370        writer.write_all(&[term])?;
1371    }
1372
1373    Ok(())
1374}
1375
1376// ============================================================================
1377// Streaming processing (for stdin / pipe input)
1378// ============================================================================
1379
1380/// Main streaming uniq processor.
1381/// Reads from `input`, writes to `output`.
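///
/// Usage sketch (illustrative; marked `ignore` so it is not compiled as a
/// doctest):
///
/// ```ignore
/// let input = &b"x\nx\ny\n"[..];
/// let mut out = Vec::new();
/// process_uniq(input, &mut out, &UniqConfig::default()).unwrap();
/// assert_eq!(out, b"x\ny\n".to_vec());
/// ```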
1382pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
1383    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
1384    let mut writer = BufWriter::with_capacity(32 * 1024 * 1024, output);
1385    let term = if config.zero_terminated { b'\0' } else { b'\n' };
1386
1387    match config.mode {
1388        OutputMode::Group(method) => {
1389            process_group_stream(reader, &mut writer, config, method, term)?;
1390        }
1391        OutputMode::AllRepeated(method) => {
1392            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
1393        }
1394        _ => {
1395            process_standard_stream(reader, &mut writer, config, term)?;
1396        }
1397    }
1398
1399    writer.flush()?;
1400    Ok(())
1401}
1402
1403/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
1404fn process_standard_stream<R: BufRead, W: Write>(
1405    mut reader: R,
1406    writer: &mut W,
1407    config: &UniqConfig,
1408    term: u8,
1409) -> io::Result<()> {
1410    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1411    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1412
1413    // Read first line
1414    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1415        return Ok(()); // empty input
1416    }
1417    let mut count: u64 = 1;
1418
1419    loop {
1420        current_line.clear();
1421        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1422
1423        if bytes_read == 0 {
1424            // End of input — output the last group
1425            output_group_stream(writer, &prev_line, count, config, term)?;
1426            break;
1427        }
1428
1429        if compare_lines_stream(&prev_line, &current_line, config, term) {
1430            count += 1;
1431        } else {
1432            output_group_stream(writer, &prev_line, count, config, term)?;
1433            std::mem::swap(&mut prev_line, &mut current_line);
1434            count = 1;
1435        }
1436    }
1437
1438    Ok(())
1439}
1440
1441/// Compare two lines (with terminators) in streaming mode.
1442#[inline(always)]
1443fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
1444    let a_stripped = strip_term(a, term);
1445    let b_stripped = strip_term(b, term);
1446    lines_equal(a_stripped, b_stripped, config)
1447}
1448
1449/// Strip terminator from end of line.
1450#[inline(always)]
1451fn strip_term(line: &[u8], term: u8) -> &[u8] {
1452    if line.last() == Some(&term) {
1453        &line[..line.len() - 1]
1454    } else {
1455        line
1456    }
1457}
1458
1459/// Output a group in streaming mode.
1460#[inline(always)]
1461fn output_group_stream(
1462    writer: &mut impl Write,
1463    line: &[u8],
1464    count: u64,
1465    config: &UniqConfig,
1466    term: u8,
1467) -> io::Result<()> {
1468    let should_print = match config.mode {
1469        OutputMode::Default => true,
1470        OutputMode::RepeatedOnly => count > 1,
1471        OutputMode::UniqueOnly => count == 1,
1472        _ => true,
1473    };
1474
1475    if should_print {
1476        let content = strip_term(line, term);
1477        if config.count {
1478            write_count_line(writer, count, content, term)?;
1479        } else {
1480            writer.write_all(content)?;
1481            writer.write_all(&[term])?;
1482        }
1483    }
1484
1485    Ok(())
1486}
1487
1488/// Process --all-repeated / -D mode (streaming).
1489fn process_all_repeated_stream<R: BufRead, W: Write>(
1490    mut reader: R,
1491    writer: &mut W,
1492    config: &UniqConfig,
1493    method: AllRepeatedMethod,
1494    term: u8,
1495) -> io::Result<()> {
1496    let mut group: Vec<Vec<u8>> = Vec::new();
1497    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1498    let mut first_group_printed = false;
1499
1500    current_line.clear();
1501    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
1502        return Ok(());
1503    }
1504    group.push(current_line.clone());
1505
1506    loop {
1507        current_line.clear();
1508        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1509
1510        if bytes_read == 0 {
1511            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1512            break;
1513        }
1514
1515        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
1516            group.push(current_line.clone());
1517        } else {
1518            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1519            group.clear();
1520            group.push(current_line.clone());
1521        }
1522    }
1523
1524    Ok(())
1525}
1526
1527/// Flush a group for --all-repeated mode (streaming).
1528fn flush_all_repeated_stream(
1529    writer: &mut impl Write,
1530    group: &[Vec<u8>],
1531    method: AllRepeatedMethod,
1532    first_group_printed: &mut bool,
1533    term: u8,
1534) -> io::Result<()> {
1535    if group.len() <= 1 {
1536        return Ok(());
1537    }
1538
1539    match method {
1540        AllRepeatedMethod::Prepend => {
1541            writer.write_all(&[term])?;
1542        }
1543        AllRepeatedMethod::Separate => {
1544            if *first_group_printed {
1545                writer.write_all(&[term])?;
1546            }
1547        }
1548        AllRepeatedMethod::None => {}
1549    }
1550
1551    for line in group {
1552        let content = strip_term(line, term);
1553        writer.write_all(content)?;
1554        writer.write_all(&[term])?;
1555    }
1556
1557    *first_group_printed = true;
1558    Ok(())
1559}
1560
1561/// Process --group mode (streaming).
1562fn process_group_stream<R: BufRead, W: Write>(
1563    mut reader: R,
1564    writer: &mut W,
1565    config: &UniqConfig,
1566    method: GroupMethod,
1567    term: u8,
1568) -> io::Result<()> {
1569    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1570    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1571
1572    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1573        return Ok(());
1574    }
1575
1576    // Prepend/Both: separator before first group
1577    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1578        writer.write_all(&[term])?;
1579    }
1580
1581    let content = strip_term(&prev_line, term);
1582    writer.write_all(content)?;
1583    writer.write_all(&[term])?;
1584
1585    loop {
1586        current_line.clear();
1587        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1588
1589        if bytes_read == 0 {
1590            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1591                writer.write_all(&[term])?;
1592            }
1593            break;
1594        }
1595
1596        if !compare_lines_stream(&prev_line, &current_line, config, term) {
1597            writer.write_all(&[term])?;
1598        }
1599
1600        let content = strip_term(&current_line, term);
1601        writer.write_all(content)?;
1602        writer.write_all(&[term])?;
1603
1604        std::mem::swap(&mut prev_line, &mut current_line);
1605    }
1606
1607    Ok(())
1608}
1609
1610/// Read a line terminated by the given byte (newline or NUL).
1611/// Returns number of bytes read (0 = EOF).
1612#[inline(always)]
1613fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
1614    reader.read_until(term, buf)
1615}
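
// The tests below are an illustrative sketch added for this write-up, not part
// of the original file. They only exercise behavior visible in the code above:
// adjacent-duplicate collapsing and the GNU-style 7-character count field.
#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn default_mode_collapses_adjacent_duplicates() {
        let mut out = Vec::new();
        process_uniq_bytes(b"a\na\nb\na\n", &mut out, &UniqConfig::default()).unwrap();
        // uniq only collapses *adjacent* duplicates, so the trailing "a" stays.
        assert_eq!(out, b"a\nb\na\n".to_vec());
    }

    #[test]
    fn count_mode_right_aligns_counts_in_a_seven_char_field() {
        let config = UniqConfig { count: true, ..UniqConfig::default() };
        let mut out = Vec::new();
        process_uniq_bytes(b"a\na\nb\n", &mut out, &config).unwrap();
        assert_eq!(out, b"      2 a\n      1 b\n".to_vec());
    }
}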