Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
3/// Write a large contiguous buffer, retrying on partial writes.
4#[inline]
5fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
6    writer.write_all(buf)
7}
8
9/// How to delimit groups when using --all-repeated
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum AllRepeatedMethod {
12    None,
13    Prepend,
14    Separate,
15}
16
17/// How to delimit groups when using --group
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum GroupMethod {
20    Separate,
21    Prepend,
22    Append,
23    Both,
24}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Check if config requires field/char skipping or char limiting.
119#[inline(always)]
120fn needs_key_extraction(config: &UniqConfig) -> bool {
121    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
122}
123
124/// Fast path comparison: no field/char extraction needed, no case folding.
125/// Uses pointer+length equality shortcut and 8-byte prefix rejection.
126#[inline(always)]
127fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
128    let alen = a.len();
129    if alen != b.len() {
130        return false;
131    }
132    if alen == 0 {
133        return true;
134    }
135    // 8-byte prefix check: reject most non-equal lines without full memcmp
136    if alen >= 8 {
137        let a8 = unsafe { (a.as_ptr() as *const u64).read_unaligned() };
138        let b8 = unsafe { (b.as_ptr() as *const u64).read_unaligned() };
139        if a8 != b8 {
140            return false;
141        }
142    }
143    a == b
144}
145
146/// Write a count-prefixed line in GNU uniq format.
147/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
148/// Combines prefix + line + term into a single write for short lines (< 240 bytes).
149#[inline(always)]
150fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
151    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
152    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
153    let digits = itoa_right_aligned_into(&mut prefix, count);
154    let width = digits.max(7); // minimum 7 chars
155    let prefix_len = width + 1; // +1 for trailing space
156    prefix[width] = b' ';
157
158    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
159    let total = prefix_len + line.len() + 1;
160    if total <= 256 {
161        let mut buf = [0u8; 256];
162        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
163        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
164        buf[prefix_len + line.len()] = term;
165        out.write_all(&buf[..total])
166    } else {
167        out.write_all(&prefix[..prefix_len])?;
168        out.write_all(line)?;
169        out.write_all(&[term])
170    }
171}
172
173/// Write u64 decimal right-aligned into prefix buffer.
174/// Buffer is pre-filled with spaces. Returns number of digits written.
175#[inline(always)]
176fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
177    if val == 0 {
178        buf[6] = b'0';
179        return 7; // 6 spaces + '0' = 7 chars
180    }
181    // Write digits right-to-left from position 27 (leaving room for trailing space)
182    let mut pos = 27;
183    while val > 0 {
184        pos -= 1;
185        buf[pos] = b'0' + (val % 10) as u8;
186        val /= 10;
187    }
188    let num_digits = 27 - pos;
189    if num_digits >= 7 {
190        // Number is wide enough, shift to front
191        buf.copy_within(pos..27, 0);
192        num_digits
193    } else {
194        // Right-align in 7-char field: spaces then digits
195        let pad = 7 - num_digits;
196        buf.copy_within(pos..27, pad);
197        // buf[0..pad] is already spaces from initialization
198        7
199    }
200}
201
202// ============================================================================
203// High-performance mmap-based processing (for byte slices, zero-copy)
204// ============================================================================
205
206/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
207pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
208    // 16MB output buffer for fewer flush syscalls on large inputs
209    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
210    let term = if config.zero_terminated { b'\0' } else { b'\n' };
211
212    match config.mode {
213        OutputMode::Group(method) => {
214            process_group_bytes(data, &mut writer, config, method, term)?;
215        }
216        OutputMode::AllRepeated(method) => {
217            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
218        }
219        _ => {
220            process_standard_bytes(data, &mut writer, config, term)?;
221        }
222    }
223
224    writer.flush()?;
225    Ok(())
226}
227
228/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
229/// Uses memchr for SIMD-accelerated line boundary detection.
230struct LineIter<'a> {
231    data: &'a [u8],
232    pos: usize,
233    term: u8,
234}
235
236impl<'a> LineIter<'a> {
237    #[inline(always)]
238    fn new(data: &'a [u8], term: u8) -> Self {
239        Self { data, pos: 0, term }
240    }
241}
242
243impl<'a> Iterator for LineIter<'a> {
244    /// (line content without terminator, full line including terminator for output)
245    type Item = (&'a [u8], &'a [u8]);
246
247    #[inline(always)]
248    fn next(&mut self) -> Option<Self::Item> {
249        if self.pos >= self.data.len() {
250            return None;
251        }
252
253        let remaining = &self.data[self.pos..];
254        match memchr::memchr(self.term, remaining) {
255            Some(idx) => {
256                let line_start = self.pos;
257                let line_end = self.pos + idx; // without terminator
258                let full_end = self.pos + idx + 1; // with terminator
259                self.pos = full_end;
260                Some((
261                    &self.data[line_start..line_end],
262                    &self.data[line_start..full_end],
263                ))
264            }
265            None => {
266                // Last line without terminator
267                let line_start = self.pos;
268                self.pos = self.data.len();
269                let line = &self.data[line_start..];
270                Some((line, line))
271            }
272        }
273    }
274}
275
276/// Get line content (without terminator) from pre-computed positions.
277/// `content_end` is the end of actual content (excludes trailing terminator if present).
278#[inline(always)]
279fn line_content_at<'a>(
280    data: &'a [u8],
281    line_starts: &[usize],
282    idx: usize,
283    content_end: usize,
284) -> &'a [u8] {
285    let start = line_starts[idx];
286    let end = if idx + 1 < line_starts.len() {
287        line_starts[idx + 1] - 1 // exclude terminator
288    } else {
289        content_end // last line: pre-computed to exclude trailing terminator
290    };
291    &data[start..end]
292}
293
294/// Get full line (with terminator) from pre-computed positions.
295#[inline(always)]
296fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
297    let start = line_starts[idx];
298    let end = if idx + 1 < line_starts.len() {
299        line_starts[idx + 1] // include terminator
300    } else {
301        data.len()
302    };
303    &data[start..end]
304}
305
306/// Linear scan for the end of a duplicate group.
307/// Returns the index of the first line that differs from line_starts[group_start].
308/// Must use linear scan (not binary search) because uniq input may NOT be sorted —
309/// equal lines can appear in non-adjacent groups separated by different lines.
310#[inline]
311fn linear_scan_group_end(
312    data: &[u8],
313    line_starts: &[usize],
314    group_start: usize,
315    num_lines: usize,
316    content_end: usize,
317) -> usize {
318    let key = line_content_at(data, line_starts, group_start, content_end);
319    let mut i = group_start + 1;
320    while i < num_lines {
321        if !lines_equal_fast(key, line_content_at(data, line_starts, i, content_end)) {
322            return i;
323        }
324        i += 1;
325    }
326    i
327}
328
329/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
330/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
331/// General path: pre-computed line positions with binary search for groups.
332fn process_standard_bytes(
333    data: &[u8],
334    writer: &mut impl Write,
335    config: &UniqConfig,
336    term: u8,
337) -> io::Result<()> {
338    if data.is_empty() {
339        return Ok(());
340    }
341
342    let fast = !needs_key_extraction(config) && !config.ignore_case;
343
344    // Ultra-fast path: default mode, no count, no key extraction.
345    // Single-pass: scan with memchr, compare adjacent lines inline.
346    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
347    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
348        return process_default_fast_singlepass(data, writer, term);
349    }
350
351    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
352    if fast
353        && !config.count
354        && matches!(
355            config.mode,
356            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
357        )
358    {
359        return process_filter_fast_singlepass(data, writer, config, term);
360    }
361
362    // General path: pre-computed line positions for binary search on groups
363    let estimated_lines = (data.len() / 40).max(64);
364    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
365    line_starts.push(0);
366    for pos in memchr::memchr_iter(term, data) {
367        if pos + 1 < data.len() {
368            line_starts.push(pos + 1);
369        }
370    }
371    let num_lines = line_starts.len();
372    if num_lines == 0 {
373        return Ok(());
374    }
375
376    // Pre-compute content end: if data ends with terminator, exclude it for last line
377    let content_end = if data.last() == Some(&term) {
378        data.len() - 1
379    } else {
380        data.len()
381    };
382
383    // Ultra-fast path: default mode, no count, no key extraction
384    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
385        // Write first line
386        let first_full = line_full_at(data, &line_starts, 0);
387        let first_content = line_content_at(data, &line_starts, 0, content_end);
388        write_all_raw(writer, first_full)?;
389        if first_full.len() == first_content.len() {
390            writer.write_all(&[term])?;
391        }
392
393        let mut i = 1;
394        while i < num_lines {
395            let prev = line_content_at(data, &line_starts, i - 1, content_end);
396            let cur = line_content_at(data, &line_starts, i, content_end);
397
398            if lines_equal_fast(prev, cur) {
399                // Duplicate detected — linear scan for end of group
400                let group_end =
401                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
402                i = group_end;
403                continue;
404            }
405
406            // Unique line — write it
407            let cur_full = line_full_at(data, &line_starts, i);
408            write_all_raw(writer, cur_full)?;
409            if cur_full.len() == cur.len() {
410                writer.write_all(&[term])?;
411            }
412            i += 1;
413        }
414        return Ok(());
415    }
416
417    // General path with count tracking
418    let mut i = 0;
419    while i < num_lines {
420        let content = line_content_at(data, &line_starts, i, content_end);
421        let full = line_full_at(data, &line_starts, i);
422
423        let group_end = if fast
424            && i + 1 < num_lines
425            && lines_equal_fast(
426                content,
427                line_content_at(data, &line_starts, i + 1, content_end),
428            ) {
429            // Duplicate detected — linear scan for end
430            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
431        } else if !fast
432            && i + 1 < num_lines
433            && lines_equal(
434                content,
435                line_content_at(data, &line_starts, i + 1, content_end),
436                config,
437            )
438        {
439            // Slow path linear scan with key extraction
440            let mut j = i + 2;
441            while j < num_lines {
442                if !lines_equal(
443                    content,
444                    line_content_at(data, &line_starts, j, content_end),
445                    config,
446                ) {
447                    break;
448                }
449                j += 1;
450            }
451            j
452        } else {
453            i + 1
454        };
455
456        let count = (group_end - i) as u64;
457        output_group_bytes(writer, content, full, count, config, term)?;
458        i = group_end;
459    }
460
461    Ok(())
462}
463
464/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
465/// No pre-computed positions, no binary search, no Vec allocation.
466/// Outputs each line that differs from the previous.
467///
468/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
469/// independently, then cross-chunk boundaries are resolved.
470fn process_default_fast_singlepass(
471    data: &[u8],
472    writer: &mut impl Write,
473    term: u8,
474) -> io::Result<()> {
475    // Parallel path for large files
476    if data.len() >= 4 * 1024 * 1024 {
477        return process_default_parallel(data, writer, term);
478    }
479
480    process_default_sequential(data, writer, term)
481}
482
483/// Sequential single-pass dedup with zero-copy output.
484/// Instead of copying data to a buffer, tracks contiguous output runs and writes
485/// directly from the original data. For all-unique data, this is a single write_all.
486fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
487    let mut prev_start: usize = 0;
488    let mut prev_end: usize; // exclusive, without terminator
489
490    // Find end of first line
491    match memchr::memchr(term, data) {
492        Some(pos) => {
493            prev_end = pos;
494        }
495        None => {
496            // Single line, no terminator
497            writer.write_all(data)?;
498            return writer.write_all(&[term]);
499        }
500    }
501
502    // run_start tracks the beginning of the current contiguous output region.
503    // When a duplicate is found, we flush the run up to the duplicate and skip it.
504    let mut run_start: usize = 0;
505    let mut cur_start = prev_end + 1;
506    let mut last_output_end = prev_end + 1; // exclusive end including terminator
507
508    while cur_start < data.len() {
509        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
510            Some(offset) => cur_start + offset,
511            None => data.len(), // last line without terminator
512        };
513
514        let prev_content = &data[prev_start..prev_end];
515        let cur_content = &data[cur_start..cur_end];
516
517        if lines_equal_fast(prev_content, cur_content) {
518            // Duplicate — flush the current run up to this line, then skip it
519            if run_start < cur_start {
520                writer.write_all(&data[run_start..cur_start])?;
521            }
522            // Start new run after this duplicate
523            if cur_end < data.len() {
524                run_start = cur_end + 1;
525            } else {
526                run_start = cur_end;
527            }
528        } else {
529            // Different line — extend the current run
530            prev_start = cur_start;
531            prev_end = cur_end;
532            last_output_end = if cur_end < data.len() {
533                cur_end + 1
534            } else {
535                cur_end
536            };
537        }
538
539        if cur_end < data.len() {
540            cur_start = cur_end + 1;
541        } else {
542            break;
543        }
544    }
545
546    // Flush remaining run
547    if run_start < data.len() {
548        writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
549    }
550
551    // Ensure trailing terminator
552    if !data.is_empty() && *data.last().unwrap() != term {
553        writer.write_all(&[term])?;
554    }
555
556    Ok(())
557}
558
559/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
560/// positions in each chunk in parallel, then write output runs directly from
561/// the original data. No per-chunk buffer allocation needed.
562fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
563    use rayon::prelude::*;
564
565    let num_threads = rayon::current_num_threads().max(1);
566    let chunk_target = data.len() / num_threads;
567
568    // Find chunk boundaries aligned to line terminators
569    let mut boundaries = Vec::with_capacity(num_threads + 1);
570    boundaries.push(0usize);
571    for i in 1..num_threads {
572        let target = i * chunk_target;
573        if target >= data.len() {
574            break;
575        }
576        if let Some(p) = memchr::memchr(term, &data[target..]) {
577            let b = target + p + 1;
578            if b > *boundaries.last().unwrap() && b <= data.len() {
579                boundaries.push(b);
580            }
581        }
582    }
583    boundaries.push(data.len());
584
585    let n_chunks = boundaries.len() - 1;
586    if n_chunks <= 1 {
587        return process_default_sequential(data, writer, term);
588    }
589
590    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
591    struct ChunkResult {
592        /// Byte ranges in the original data to output (contiguous runs)
593        runs: Vec<(usize, usize)>,
594        /// First line in chunk (absolute offsets into data, content without term)
595        first_line_start: usize,
596        first_line_end: usize,
597        /// Last *output* line in chunk (content without term)
598        last_line_start: usize,
599        last_line_end: usize,
600    }
601
602    let results: Vec<ChunkResult> = boundaries
603        .windows(2)
604        .collect::<Vec<_>>()
605        .par_iter()
606        .map(|w| {
607            let chunk_start = w[0];
608            let chunk_end = w[1];
609            let chunk = &data[chunk_start..chunk_end];
610
611            let first_term = match memchr::memchr(term, chunk) {
612                Some(pos) => pos,
613                None => {
614                    return ChunkResult {
615                        runs: vec![(chunk_start, chunk_end)],
616                        first_line_start: chunk_start,
617                        first_line_end: chunk_end,
618                        last_line_start: chunk_start,
619                        last_line_end: chunk_end,
620                    };
621                }
622            };
623
624            let first_line_start = chunk_start;
625            let first_line_end = chunk_start + first_term;
626
627            let mut runs: Vec<(usize, usize)> = Vec::new();
628            let mut run_start = chunk_start;
629            let mut prev_start = 0usize;
630            let mut prev_end = first_term;
631            let mut last_out_start = chunk_start;
632            let mut last_out_end = first_line_end;
633
634            let mut cur_start = first_term + 1;
635            while cur_start < chunk.len() {
636                let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
637                    Some(offset) => cur_start + offset,
638                    None => chunk.len(),
639                };
640
641                if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
642                    // Duplicate — flush current run up to this line
643                    let abs_cur = chunk_start + cur_start;
644                    if run_start < abs_cur {
645                        runs.push((run_start, abs_cur));
646                    }
647                    // New run starts after this duplicate
648                    run_start = chunk_start
649                        + if cur_end < chunk.len() {
650                            cur_end + 1
651                        } else {
652                            cur_end
653                        };
654                } else {
655                    last_out_start = chunk_start + cur_start;
656                    last_out_end = chunk_start + cur_end;
657                }
658                prev_start = cur_start;
659                prev_end = cur_end;
660
661                if cur_end < chunk.len() {
662                    cur_start = cur_end + 1;
663                } else {
664                    break;
665                }
666            }
667
668            // Close final run
669            if run_start < chunk_end {
670                runs.push((run_start, chunk_end));
671            }
672
673            ChunkResult {
674                runs,
675                first_line_start,
676                first_line_end,
677                last_line_start: last_out_start,
678                last_line_end: last_out_end,
679            }
680        })
681        .collect();
682
683    // Write results, adjusting cross-chunk boundaries
684    for (i, result) in results.iter().enumerate() {
685        let skip_first = if i > 0 {
686            let prev = &results[i - 1];
687            let prev_last = &data[prev.last_line_start..prev.last_line_end];
688            let cur_first = &data[result.first_line_start..result.first_line_end];
689            lines_equal_fast(prev_last, cur_first)
690        } else {
691            false
692        };
693
694        let skip_end = if skip_first {
695            // Skip bytes up to and including the first line's terminator
696            result.first_line_end + 1
697        } else {
698            0
699        };
700
701        for &(rs, re) in &result.runs {
702            let actual_start = rs.max(skip_end);
703            if actual_start < re {
704                writer.write_all(&data[actual_start..re])?;
705            }
706        }
707    }
708
709    // Ensure trailing terminator
710    if !data.is_empty() && *data.last().unwrap() != term {
711        writer.write_all(&[term])?;
712    }
713
714    Ok(())
715}
716
717/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
718fn process_filter_fast_singlepass(
719    data: &[u8],
720    writer: &mut impl Write,
721    config: &UniqConfig,
722    term: u8,
723) -> io::Result<()> {
724    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
725    let mut outbuf = Vec::with_capacity(data.len() / 2);
726
727    let prev_start: usize = 0;
728    let prev_end: usize = match memchr::memchr(term, data) {
729        Some(pos) => pos,
730        None => {
731            // Single line: unique (count=1)
732            if !repeated {
733                outbuf.extend_from_slice(data);
734                outbuf.push(term);
735            }
736            return writer.write_all(&outbuf);
737        }
738    };
739
740    let mut prev_start_mut = prev_start;
741    let mut prev_end_mut = prev_end;
742    let mut count: u64 = 1;
743    let mut cur_start = prev_end + 1;
744
745    while cur_start < data.len() {
746        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
747            Some(offset) => cur_start + offset,
748            None => data.len(),
749        };
750
751        let prev_content = &data[prev_start_mut..prev_end_mut];
752        let cur_content = &data[cur_start..cur_end];
753
754        if lines_equal_fast(prev_content, cur_content) {
755            count += 1;
756        } else {
757            // Output previous group based on mode
758            let should_print = if repeated { count > 1 } else { count == 1 };
759            if should_print {
760                if prev_end_mut < data.len() && data.get(prev_end_mut) == Some(&term) {
761                    outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut + 1]);
762                } else {
763                    outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut]);
764                    outbuf.push(term);
765                }
766            }
767            prev_start_mut = cur_start;
768            prev_end_mut = cur_end;
769            count = 1;
770        }
771
772        if cur_end < data.len() {
773            cur_start = cur_end + 1;
774        } else {
775            break;
776        }
777    }
778
779    // Output last group
780    let should_print = if repeated { count > 1 } else { count == 1 };
781    if should_print {
782        if prev_end_mut < data.len() && data.get(prev_end_mut) == Some(&term) {
783            outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut + 1]);
784        } else {
785            outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut]);
786            outbuf.push(term);
787        }
788    }
789
790    writer.write_all(&outbuf)
791}
792
793/// Output a group for standard modes (bytes path).
794#[inline(always)]
795fn output_group_bytes(
796    writer: &mut impl Write,
797    content: &[u8],
798    full: &[u8],
799    count: u64,
800    config: &UniqConfig,
801    term: u8,
802) -> io::Result<()> {
803    let should_print = match config.mode {
804        OutputMode::Default => true,
805        OutputMode::RepeatedOnly => count > 1,
806        OutputMode::UniqueOnly => count == 1,
807        _ => true,
808    };
809
810    if should_print {
811        if config.count {
812            write_count_line(writer, count, content, term)?;
813        } else {
814            writer.write_all(full)?;
815            // Add terminator if the original line didn't have one
816            if full.len() == content.len() {
817                writer.write_all(&[term])?;
818            }
819        }
820    }
821
822    Ok(())
823}
824
825/// Process --all-repeated / -D mode on byte slices.
826fn process_all_repeated_bytes(
827    data: &[u8],
828    writer: &mut impl Write,
829    config: &UniqConfig,
830    method: AllRepeatedMethod,
831    term: u8,
832) -> io::Result<()> {
833    let mut lines = LineIter::new(data, term);
834
835    let first = match lines.next() {
836        Some(v) => v,
837        None => return Ok(()),
838    };
839
840    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
841    // For all-repeated we need to buffer group lines since we only print if count > 1
842    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
843    group_lines.push(first);
844    let mut first_group_printed = false;
845
846    let fast = !needs_key_extraction(config) && !config.ignore_case;
847
848    for (cur_content, cur_full) in lines {
849        let prev_content = group_lines.last().unwrap().0;
850        let equal = if fast {
851            lines_equal_fast(prev_content, cur_content)
852        } else {
853            lines_equal(prev_content, cur_content, config)
854        };
855
856        if equal {
857            group_lines.push((cur_content, cur_full));
858        } else {
859            // Flush group
860            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
861            group_lines.clear();
862            group_lines.push((cur_content, cur_full));
863        }
864    }
865
866    // Flush last group
867    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
868
869    Ok(())
870}
871
872/// Flush a group for --all-repeated mode (bytes path).
873fn flush_all_repeated_bytes(
874    writer: &mut impl Write,
875    group: &[(&[u8], &[u8])],
876    method: AllRepeatedMethod,
877    first_group_printed: &mut bool,
878    term: u8,
879) -> io::Result<()> {
880    if group.len() <= 1 {
881        return Ok(()); // Not a duplicate group
882    }
883
884    match method {
885        AllRepeatedMethod::Prepend => {
886            writer.write_all(&[term])?;
887        }
888        AllRepeatedMethod::Separate => {
889            if *first_group_printed {
890                writer.write_all(&[term])?;
891            }
892        }
893        AllRepeatedMethod::None => {}
894    }
895
896    for &(content, full) in group {
897        writer.write_all(full)?;
898        if full.len() == content.len() {
899            writer.write_all(&[term])?;
900        }
901    }
902
903    *first_group_printed = true;
904    Ok(())
905}
906
907/// Process --group mode on byte slices.
908fn process_group_bytes(
909    data: &[u8],
910    writer: &mut impl Write,
911    config: &UniqConfig,
912    method: GroupMethod,
913    term: u8,
914) -> io::Result<()> {
915    let mut lines = LineIter::new(data, term);
916
917    let (prev_content, prev_full) = match lines.next() {
918        Some(v) => v,
919        None => return Ok(()),
920    };
921
922    // Prepend/Both: separator before first group
923    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
924        writer.write_all(&[term])?;
925    }
926
927    // Write first line
928    writer.write_all(prev_full)?;
929    if prev_full.len() == prev_content.len() {
930        writer.write_all(&[term])?;
931    }
932
933    let mut prev_content = prev_content;
934    let fast = !needs_key_extraction(config) && !config.ignore_case;
935
936    for (cur_content, cur_full) in lines {
937        let equal = if fast {
938            lines_equal_fast(prev_content, cur_content)
939        } else {
940            lines_equal(prev_content, cur_content, config)
941        };
942
943        if !equal {
944            // New group — write separator
945            writer.write_all(&[term])?;
946        }
947
948        writer.write_all(cur_full)?;
949        if cur_full.len() == cur_content.len() {
950            writer.write_all(&[term])?;
951        }
952
953        prev_content = cur_content;
954    }
955
956    // Append/Both: separator after last group
957    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
958        writer.write_all(&[term])?;
959    }
960
961    Ok(())
962}
963
964// ============================================================================
965// Streaming processing (for stdin / pipe input)
966// ============================================================================
967
968/// Main streaming uniq processor.
969/// Reads from `input`, writes to `output`.
970pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
971    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
972    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
973    let term = if config.zero_terminated { b'\0' } else { b'\n' };
974
975    match config.mode {
976        OutputMode::Group(method) => {
977            process_group_stream(reader, &mut writer, config, method, term)?;
978        }
979        OutputMode::AllRepeated(method) => {
980            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
981        }
982        _ => {
983            process_standard_stream(reader, &mut writer, config, term)?;
984        }
985    }
986
987    writer.flush()?;
988    Ok(())
989}
990
991/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
992fn process_standard_stream<R: BufRead, W: Write>(
993    mut reader: R,
994    writer: &mut W,
995    config: &UniqConfig,
996    term: u8,
997) -> io::Result<()> {
998    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
999    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1000
1001    // Read first line
1002    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1003        return Ok(()); // empty input
1004    }
1005    let mut count: u64 = 1;
1006
1007    loop {
1008        current_line.clear();
1009        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1010
1011        if bytes_read == 0 {
1012            // End of input — output the last group
1013            output_group_stream(writer, &prev_line, count, config, term)?;
1014            break;
1015        }
1016
1017        if compare_lines_stream(&prev_line, &current_line, config, term) {
1018            count += 1;
1019        } else {
1020            output_group_stream(writer, &prev_line, count, config, term)?;
1021            std::mem::swap(&mut prev_line, &mut current_line);
1022            count = 1;
1023        }
1024    }
1025
1026    Ok(())
1027}
1028
1029/// Compare two lines (with terminators) in streaming mode.
1030#[inline(always)]
1031fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
1032    let a_stripped = strip_term(a, term);
1033    let b_stripped = strip_term(b, term);
1034    lines_equal(a_stripped, b_stripped, config)
1035}
1036
1037/// Strip terminator from end of line.
1038#[inline(always)]
1039fn strip_term(line: &[u8], term: u8) -> &[u8] {
1040    if line.last() == Some(&term) {
1041        &line[..line.len() - 1]
1042    } else {
1043        line
1044    }
1045}
1046
1047/// Output a group in streaming mode.
1048#[inline(always)]
1049fn output_group_stream(
1050    writer: &mut impl Write,
1051    line: &[u8],
1052    count: u64,
1053    config: &UniqConfig,
1054    term: u8,
1055) -> io::Result<()> {
1056    let should_print = match config.mode {
1057        OutputMode::Default => true,
1058        OutputMode::RepeatedOnly => count > 1,
1059        OutputMode::UniqueOnly => count == 1,
1060        _ => true,
1061    };
1062
1063    if should_print {
1064        let content = strip_term(line, term);
1065        if config.count {
1066            write_count_line(writer, count, content, term)?;
1067        } else {
1068            writer.write_all(content)?;
1069            writer.write_all(&[term])?;
1070        }
1071    }
1072
1073    Ok(())
1074}
1075
1076/// Process --all-repeated / -D mode (streaming).
1077fn process_all_repeated_stream<R: BufRead, W: Write>(
1078    mut reader: R,
1079    writer: &mut W,
1080    config: &UniqConfig,
1081    method: AllRepeatedMethod,
1082    term: u8,
1083) -> io::Result<()> {
1084    let mut group: Vec<Vec<u8>> = Vec::new();
1085    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1086    let mut first_group_printed = false;
1087
1088    current_line.clear();
1089    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
1090        return Ok(());
1091    }
1092    group.push(current_line.clone());
1093
1094    loop {
1095        current_line.clear();
1096        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1097
1098        if bytes_read == 0 {
1099            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1100            break;
1101        }
1102
1103        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
1104            group.push(current_line.clone());
1105        } else {
1106            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1107            group.clear();
1108            group.push(current_line.clone());
1109        }
1110    }
1111
1112    Ok(())
1113}
1114
1115/// Flush a group for --all-repeated mode (streaming).
1116fn flush_all_repeated_stream(
1117    writer: &mut impl Write,
1118    group: &[Vec<u8>],
1119    method: AllRepeatedMethod,
1120    first_group_printed: &mut bool,
1121    term: u8,
1122) -> io::Result<()> {
1123    if group.len() <= 1 {
1124        return Ok(());
1125    }
1126
1127    match method {
1128        AllRepeatedMethod::Prepend => {
1129            writer.write_all(&[term])?;
1130        }
1131        AllRepeatedMethod::Separate => {
1132            if *first_group_printed {
1133                writer.write_all(&[term])?;
1134            }
1135        }
1136        AllRepeatedMethod::None => {}
1137    }
1138
1139    for line in group {
1140        let content = strip_term(line, term);
1141        writer.write_all(content)?;
1142        writer.write_all(&[term])?;
1143    }
1144
1145    *first_group_printed = true;
1146    Ok(())
1147}
1148
1149/// Process --group mode (streaming).
1150fn process_group_stream<R: BufRead, W: Write>(
1151    mut reader: R,
1152    writer: &mut W,
1153    config: &UniqConfig,
1154    method: GroupMethod,
1155    term: u8,
1156) -> io::Result<()> {
1157    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1158    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1159
1160    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1161        return Ok(());
1162    }
1163
1164    // Prepend/Both: separator before first group
1165    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1166        writer.write_all(&[term])?;
1167    }
1168
1169    let content = strip_term(&prev_line, term);
1170    writer.write_all(content)?;
1171    writer.write_all(&[term])?;
1172
1173    loop {
1174        current_line.clear();
1175        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1176
1177        if bytes_read == 0 {
1178            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1179                writer.write_all(&[term])?;
1180            }
1181            break;
1182        }
1183
1184        if !compare_lines_stream(&prev_line, &current_line, config, term) {
1185            writer.write_all(&[term])?;
1186        }
1187
1188        let content = strip_term(&current_line, term);
1189        writer.write_all(content)?;
1190        writer.write_all(&[term])?;
1191
1192        std::mem::swap(&mut prev_line, &mut current_line);
1193    }
1194
1195    Ok(())
1196}
1197
1198/// Read a line terminated by the given byte (newline or NUL).
1199/// Returns number of bytes read (0 = EOF).
1200#[inline(always)]
1201fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
1202    reader.read_until(term, buf)
1203}