// coreutils_rs/uniq/core.rs

use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

/// Write the whole of `buf` to `writer`.
///
/// Delegates to [`Write::write_all`], which keeps issuing writes until every
/// byte has been accepted (retrying partial writes) or an error occurs.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}

/// How to delimit groups when using --all-repeated.
///
/// The delimiter is a single terminator byte (the same `term` byte used to end
/// lines), written by `flush_all_repeated_bytes` / `flush_all_repeated_stream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    /// No delimiters are written between printed groups.
    None,
    /// A delimiter is written before every printed group, including the first.
    Prepend,
    /// A delimiter is written between consecutive printed groups only.
    Separate,
}

/// How to delimit groups when using --group.
///
/// In --group mode every input line is printed; the delimiter is a single
/// terminator byte written by `process_group_bytes` / `process_group_stream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    /// A delimiter between adjacent groups only.
    Separate,
    /// A delimiter before every group, including the first.
    Prepend,
    /// A delimiter after every group, including the last.
    Append,
    /// A delimiter before the first group, between groups, and after the last.
    Both,
}

/// Output mode for uniq.
///
/// `Group` and `AllRepeated` are handled by dedicated processors; the remaining
/// modes share the "standard" processing path, which reports each run of
/// adjacent equal lines once.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    /// Default: print unique lines and first of each duplicate group
    Default,
    /// -d: print only first line of duplicate groups
    RepeatedOnly,
    /// -D / --all-repeated: print ALL duplicate lines, delimited per the method
    AllRepeated(AllRepeatedMethod),
    /// -u: print only lines that are NOT duplicated
    UniqueOnly,
    /// --group: show all items with group separators placed per the method
    Group(GroupMethod),
}

/// Configuration for uniq processing.
#[derive(Debug, Clone)]
pub struct UniqConfig {
    /// Which lines are printed and how groups are delimited.
    pub mode: OutputMode,
    /// Prefix each printed line with the size of its group (standard modes only).
    pub count: bool,
    /// Compare keys with ASCII case-insensitive equality.
    pub ignore_case: bool,
    /// Number of leading blank-separated fields skipped before comparing.
    pub skip_fields: usize,
    /// Number of bytes skipped after the skipped fields before comparing.
    pub skip_chars: usize,
    /// If set, compare at most this many bytes of the key.
    pub check_chars: Option<usize>,
    /// Lines end with a NUL byte instead of a newline.
    pub zero_terminated: bool,
}

53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Check if config requires field/char skipping or char limiting.
119#[inline(always)]
120fn needs_key_extraction(config: &UniqConfig) -> bool {
121    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
122}
123
/// Exact bytewise equality for the fast path (no key extraction, no case folding).
///
/// Rejects lines of different length first. For lines of at least 8 bytes it
/// then compares the first 8 bytes, which rejects most unequal lines before
/// the full comparison. The prefix check uses a safe slice comparison.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    if a.len() >= 8 && a[..8] != b[..8] {
        return false;
    }
    a == b
}

146/// Write a count-prefixed line in GNU uniq format.
147/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
148/// Combines prefix + line + term into a single write for short lines (< 240 bytes).
149#[inline(always)]
150fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
151    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
152    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
153    let digits = itoa_right_aligned_into(&mut prefix, count);
154    let width = digits.max(7); // minimum 7 chars
155    let prefix_len = width + 1; // +1 for trailing space
156    prefix[width] = b' ';
157
158    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
159    let total = prefix_len + line.len() + 1;
160    if total <= 256 {
161        let mut buf = [0u8; 256];
162        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
163        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
164        buf[prefix_len + line.len()] = term;
165        out.write_all(&buf[..total])
166    } else {
167        out.write_all(&prefix[..prefix_len])?;
168        out.write_all(line)?;
169        out.write_all(&[term])
170    }
171}
172
/// Format `val` in decimal at the front of `buf`, right-aligned in a field of
/// at least 7 columns.
///
/// `buf` must be pre-filled with spaces; padding relies on that.
///
/// Returns the field width:
/// - 7 when `val` has fewer than 7 digits; the digits then end at index 6 and
///   `buf[..7]` is spaces followed by digits.
/// - otherwise the digit count, with the digits in `buf[..width]`.
///
/// Bytes at and after the returned width may still hold stray digits from the
/// scratch area. The caller is expected to overwrite `buf[width]` and read only
/// `buf[..=width]`.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    if val == 0 {
        buf[6] = b'0';
        return 7; // 6 spaces + '0' = 7 chars
    }
    // Produce digits right-to-left into buf[pos..27] (least significant at index 26).
    let mut pos = 27;
    while val > 0 {
        pos -= 1;
        buf[pos] = b'0' + (val % 10) as u8;
        val /= 10;
    }
    let num_digits = 27 - pos;
    if num_digits >= 7 {
        // Number is wide enough, shift to front
        buf.copy_within(pos..27, 0);
        num_digits
    } else {
        // Right-align in 7-char field: spaces then digits
        let pad = 7 - num_digits;
        buf.copy_within(pos..27, pad);
        // buf[0..pad] is already spaces from initialization
        7
    }
}

// ============================================================================
// High-performance mmap-based processing (for byte slices, zero-copy)
// ============================================================================

206/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
207pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
208    // 16MB output buffer for fewer flush syscalls on large inputs
209    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
210    let term = if config.zero_terminated { b'\0' } else { b'\n' };
211
212    match config.mode {
213        OutputMode::Group(method) => {
214            process_group_bytes(data, &mut writer, config, method, term)?;
215        }
216        OutputMode::AllRepeated(method) => {
217            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
218        }
219        _ => {
220            process_standard_bytes(data, &mut writer, config, term)?;
221        }
222    }
223
224    writer.flush()?;
225    Ok(())
226}
227
228/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
229/// Uses memchr for SIMD-accelerated line boundary detection.
230struct LineIter<'a> {
231    data: &'a [u8],
232    pos: usize,
233    term: u8,
234}
235
236impl<'a> LineIter<'a> {
237    #[inline(always)]
238    fn new(data: &'a [u8], term: u8) -> Self {
239        Self { data, pos: 0, term }
240    }
241}
242
243impl<'a> Iterator for LineIter<'a> {
244    /// (line content without terminator, full line including terminator for output)
245    type Item = (&'a [u8], &'a [u8]);
246
247    #[inline(always)]
248    fn next(&mut self) -> Option<Self::Item> {
249        if self.pos >= self.data.len() {
250            return None;
251        }
252
253        let remaining = &self.data[self.pos..];
254        match memchr::memchr(self.term, remaining) {
255            Some(idx) => {
256                let line_start = self.pos;
257                let line_end = self.pos + idx; // without terminator
258                let full_end = self.pos + idx + 1; // with terminator
259                self.pos = full_end;
260                Some((
261                    &self.data[line_start..line_end],
262                    &self.data[line_start..full_end],
263                ))
264            }
265            None => {
266                // Last line without terminator
267                let line_start = self.pos;
268                self.pos = self.data.len();
269                let line = &self.data[line_start..];
270                Some((line, line))
271            }
272        }
273    }
274}
275
/// Content of line `idx` (terminator excluded), using pre-computed line starts.
///
/// For every line except the last, the content ends just before the next
/// line's start, i.e. before its terminator. The last line ends at
/// `content_end`, which the caller has already adjusted to exclude a trailing
/// terminator.
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let end = match line_starts.get(idx + 1) {
        Some(&next_start) => next_start - 1,
        None => content_end,
    };
    &data[line_starts[idx]..end]
}

/// Full text of line `idx` including its terminator, using pre-computed line starts.
///
/// The last line extends to the end of `data`; it includes the terminator only
/// if `data` ends with one.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let end = line_starts.get(idx + 1).copied().unwrap_or(data.len());
    &data[line_starts[idx]..end]
}

306/// Linear scan for the end of a duplicate group.
307/// Returns the index of the first line that differs from line_starts[group_start].
308/// Must use linear scan (not binary search) because uniq input may NOT be sorted —
309/// equal lines can appear in non-adjacent groups separated by different lines.
310#[inline]
311fn linear_scan_group_end(
312    data: &[u8],
313    line_starts: &[usize],
314    group_start: usize,
315    num_lines: usize,
316    content_end: usize,
317) -> usize {
318    let key = line_content_at(data, line_starts, group_start, content_end);
319    let mut i = group_start + 1;
320    while i < num_lines {
321        if !lines_equal_fast(key, line_content_at(data, line_starts, i, content_end)) {
322            return i;
323        }
324        i += 1;
325    }
326    i
327}
328
329/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
330/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
331/// General path: pre-computed line positions with binary search for groups.
332fn process_standard_bytes(
333    data: &[u8],
334    writer: &mut impl Write,
335    config: &UniqConfig,
336    term: u8,
337) -> io::Result<()> {
338    if data.is_empty() {
339        return Ok(());
340    }
341
342    let fast = !needs_key_extraction(config) && !config.ignore_case;
343
344    // Ultra-fast path: default mode, no count, no key extraction.
345    // Single-pass: scan with memchr, compare adjacent lines inline.
346    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
347    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
348        return process_default_fast_singlepass(data, writer, term);
349    }
350
351    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
352    if fast
353        && !config.count
354        && matches!(
355            config.mode,
356            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
357        )
358    {
359        return process_filter_fast_singlepass(data, writer, config, term);
360    }
361
362    // General path: pre-computed line positions for binary search on groups
363    let estimated_lines = (data.len() / 40).max(64);
364    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
365    line_starts.push(0);
366    for pos in memchr::memchr_iter(term, data) {
367        if pos + 1 < data.len() {
368            line_starts.push(pos + 1);
369        }
370    }
371    let num_lines = line_starts.len();
372    if num_lines == 0 {
373        return Ok(());
374    }
375
376    // Pre-compute content end: if data ends with terminator, exclude it for last line
377    let content_end = if data.last() == Some(&term) {
378        data.len() - 1
379    } else {
380        data.len()
381    };
382
383    // Ultra-fast path: default mode, no count, no key extraction
384    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
385        // Write first line
386        let first_full = line_full_at(data, &line_starts, 0);
387        let first_content = line_content_at(data, &line_starts, 0, content_end);
388        write_all_raw(writer, first_full)?;
389        if first_full.len() == first_content.len() {
390            writer.write_all(&[term])?;
391        }
392
393        let mut i = 1;
394        while i < num_lines {
395            let prev = line_content_at(data, &line_starts, i - 1, content_end);
396            let cur = line_content_at(data, &line_starts, i, content_end);
397
398            if lines_equal_fast(prev, cur) {
399                // Duplicate detected — linear scan for end of group
400                let group_end =
401                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
402                i = group_end;
403                continue;
404            }
405
406            // Unique line — write it
407            let cur_full = line_full_at(data, &line_starts, i);
408            write_all_raw(writer, cur_full)?;
409            if cur_full.len() == cur.len() {
410                writer.write_all(&[term])?;
411            }
412            i += 1;
413        }
414        return Ok(());
415    }
416
417    // General path with count tracking
418    let mut i = 0;
419    while i < num_lines {
420        let content = line_content_at(data, &line_starts, i, content_end);
421        let full = line_full_at(data, &line_starts, i);
422
423        let group_end = if fast
424            && i + 1 < num_lines
425            && lines_equal_fast(
426                content,
427                line_content_at(data, &line_starts, i + 1, content_end),
428            ) {
429            // Duplicate detected — linear scan for end
430            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
431        } else if !fast
432            && i + 1 < num_lines
433            && lines_equal(
434                content,
435                line_content_at(data, &line_starts, i + 1, content_end),
436                config,
437            )
438        {
439            // Slow path linear scan with key extraction
440            let mut j = i + 2;
441            while j < num_lines {
442                if !lines_equal(
443                    content,
444                    line_content_at(data, &line_starts, j, content_end),
445                    config,
446                ) {
447                    break;
448                }
449                j += 1;
450            }
451            j
452        } else {
453            i + 1
454        };
455
456        let count = (group_end - i) as u64;
457        output_group_bytes(writer, content, full, count, config, term)?;
458        i = group_end;
459    }
460
461    Ok(())
462}
463
464/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
465/// No pre-computed positions, no binary search, no Vec allocation.
466/// Outputs each line that differs from the previous.
467fn process_default_fast_singlepass(
468    data: &[u8],
469    writer: &mut impl Write,
470    term: u8,
471) -> io::Result<()> {
472    // Batch output into a contiguous buffer for fewer write() syscalls.
473    let mut outbuf = Vec::with_capacity(data.len());
474
475    let mut prev_start: usize = 0;
476    let mut prev_end: usize; // exclusive, without terminator
477
478    // Find end of first line
479    match memchr::memchr(term, data) {
480        Some(pos) => {
481            prev_end = pos;
482            // Write first line (always output)
483            outbuf.extend_from_slice(&data[0..pos + 1]);
484        }
485        None => {
486            // Single line, no terminator
487            outbuf.extend_from_slice(data);
488            outbuf.push(term);
489            return writer.write_all(&outbuf);
490        }
491    }
492
493    let mut cur_start = prev_end + 1;
494
495    while cur_start < data.len() {
496        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
497            Some(offset) => cur_start + offset,
498            None => data.len(), // last line without terminator
499        };
500
501        let prev_content = &data[prev_start..prev_end];
502        let cur_content = &data[cur_start..cur_end];
503
504        if !lines_equal_fast(prev_content, cur_content) {
505            // Different line — output it
506            if cur_end < data.len() {
507                outbuf.extend_from_slice(&data[cur_start..cur_end + 1]);
508            } else {
509                outbuf.extend_from_slice(&data[cur_start..cur_end]);
510                outbuf.push(term);
511            }
512            prev_start = cur_start;
513            prev_end = cur_end;
514        }
515
516        if cur_end < data.len() {
517            cur_start = cur_end + 1;
518        } else {
519            break;
520        }
521    }
522
523    writer.write_all(&outbuf)
524}
525
526/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
527fn process_filter_fast_singlepass(
528    data: &[u8],
529    writer: &mut impl Write,
530    config: &UniqConfig,
531    term: u8,
532) -> io::Result<()> {
533    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
534    let mut outbuf = Vec::with_capacity(data.len() / 2);
535
536    let prev_start: usize = 0;
537    let prev_end: usize = match memchr::memchr(term, data) {
538        Some(pos) => pos,
539        None => {
540            // Single line: unique (count=1)
541            if !repeated {
542                outbuf.extend_from_slice(data);
543                outbuf.push(term);
544            }
545            return writer.write_all(&outbuf);
546        }
547    };
548
549    let mut prev_start_mut = prev_start;
550    let mut prev_end_mut = prev_end;
551    let mut count: u64 = 1;
552    let mut cur_start = prev_end + 1;
553
554    while cur_start < data.len() {
555        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
556            Some(offset) => cur_start + offset,
557            None => data.len(),
558        };
559
560        let prev_content = &data[prev_start_mut..prev_end_mut];
561        let cur_content = &data[cur_start..cur_end];
562
563        if lines_equal_fast(prev_content, cur_content) {
564            count += 1;
565        } else {
566            // Output previous group based on mode
567            let should_print = if repeated { count > 1 } else { count == 1 };
568            if should_print {
569                if prev_end_mut < data.len() && data.get(prev_end_mut) == Some(&term) {
570                    outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut + 1]);
571                } else {
572                    outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut]);
573                    outbuf.push(term);
574                }
575            }
576            prev_start_mut = cur_start;
577            prev_end_mut = cur_end;
578            count = 1;
579        }
580
581        if cur_end < data.len() {
582            cur_start = cur_end + 1;
583        } else {
584            break;
585        }
586    }
587
588    // Output last group
589    let should_print = if repeated { count > 1 } else { count == 1 };
590    if should_print {
591        if prev_end_mut < data.len() && data.get(prev_end_mut) == Some(&term) {
592            outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut + 1]);
593        } else {
594            outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut]);
595            outbuf.push(term);
596        }
597    }
598
599    writer.write_all(&outbuf)
600}
601
602/// Output a group for standard modes (bytes path).
603#[inline(always)]
604fn output_group_bytes(
605    writer: &mut impl Write,
606    content: &[u8],
607    full: &[u8],
608    count: u64,
609    config: &UniqConfig,
610    term: u8,
611) -> io::Result<()> {
612    let should_print = match config.mode {
613        OutputMode::Default => true,
614        OutputMode::RepeatedOnly => count > 1,
615        OutputMode::UniqueOnly => count == 1,
616        _ => true,
617    };
618
619    if should_print {
620        if config.count {
621            write_count_line(writer, count, content, term)?;
622        } else {
623            writer.write_all(full)?;
624            // Add terminator if the original line didn't have one
625            if full.len() == content.len() {
626                writer.write_all(&[term])?;
627            }
628        }
629    }
630
631    Ok(())
632}
633
634/// Process --all-repeated / -D mode on byte slices.
635fn process_all_repeated_bytes(
636    data: &[u8],
637    writer: &mut impl Write,
638    config: &UniqConfig,
639    method: AllRepeatedMethod,
640    term: u8,
641) -> io::Result<()> {
642    let mut lines = LineIter::new(data, term);
643
644    let first = match lines.next() {
645        Some(v) => v,
646        None => return Ok(()),
647    };
648
649    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
650    // For all-repeated we need to buffer group lines since we only print if count > 1
651    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
652    group_lines.push(first);
653    let mut first_group_printed = false;
654
655    let fast = !needs_key_extraction(config) && !config.ignore_case;
656
657    for (cur_content, cur_full) in lines {
658        let prev_content = group_lines.last().unwrap().0;
659        let equal = if fast {
660            lines_equal_fast(prev_content, cur_content)
661        } else {
662            lines_equal(prev_content, cur_content, config)
663        };
664
665        if equal {
666            group_lines.push((cur_content, cur_full));
667        } else {
668            // Flush group
669            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
670            group_lines.clear();
671            group_lines.push((cur_content, cur_full));
672        }
673    }
674
675    // Flush last group
676    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
677
678    Ok(())
679}
680
681/// Flush a group for --all-repeated mode (bytes path).
682fn flush_all_repeated_bytes(
683    writer: &mut impl Write,
684    group: &[(&[u8], &[u8])],
685    method: AllRepeatedMethod,
686    first_group_printed: &mut bool,
687    term: u8,
688) -> io::Result<()> {
689    if group.len() <= 1 {
690        return Ok(()); // Not a duplicate group
691    }
692
693    match method {
694        AllRepeatedMethod::Prepend => {
695            writer.write_all(&[term])?;
696        }
697        AllRepeatedMethod::Separate => {
698            if *first_group_printed {
699                writer.write_all(&[term])?;
700            }
701        }
702        AllRepeatedMethod::None => {}
703    }
704
705    for &(content, full) in group {
706        writer.write_all(full)?;
707        if full.len() == content.len() {
708            writer.write_all(&[term])?;
709        }
710    }
711
712    *first_group_printed = true;
713    Ok(())
714}
715
716/// Process --group mode on byte slices.
717fn process_group_bytes(
718    data: &[u8],
719    writer: &mut impl Write,
720    config: &UniqConfig,
721    method: GroupMethod,
722    term: u8,
723) -> io::Result<()> {
724    let mut lines = LineIter::new(data, term);
725
726    let (prev_content, prev_full) = match lines.next() {
727        Some(v) => v,
728        None => return Ok(()),
729    };
730
731    // Prepend/Both: separator before first group
732    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
733        writer.write_all(&[term])?;
734    }
735
736    // Write first line
737    writer.write_all(prev_full)?;
738    if prev_full.len() == prev_content.len() {
739        writer.write_all(&[term])?;
740    }
741
742    let mut prev_content = prev_content;
743    let fast = !needs_key_extraction(config) && !config.ignore_case;
744
745    for (cur_content, cur_full) in lines {
746        let equal = if fast {
747            lines_equal_fast(prev_content, cur_content)
748        } else {
749            lines_equal(prev_content, cur_content, config)
750        };
751
752        if !equal {
753            // New group — write separator
754            writer.write_all(&[term])?;
755        }
756
757        writer.write_all(cur_full)?;
758        if cur_full.len() == cur_content.len() {
759            writer.write_all(&[term])?;
760        }
761
762        prev_content = cur_content;
763    }
764
765    // Append/Both: separator after last group
766    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
767        writer.write_all(&[term])?;
768    }
769
770    Ok(())
771}
772
// ============================================================================
// Streaming processing (for stdin / pipe input)
// ============================================================================

777/// Main streaming uniq processor.
778/// Reads from `input`, writes to `output`.
779pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
780    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
781    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
782    let term = if config.zero_terminated { b'\0' } else { b'\n' };
783
784    match config.mode {
785        OutputMode::Group(method) => {
786            process_group_stream(reader, &mut writer, config, method, term)?;
787        }
788        OutputMode::AllRepeated(method) => {
789            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
790        }
791        _ => {
792            process_standard_stream(reader, &mut writer, config, term)?;
793        }
794    }
795
796    writer.flush()?;
797    Ok(())
798}
799
800/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
801fn process_standard_stream<R: BufRead, W: Write>(
802    mut reader: R,
803    writer: &mut W,
804    config: &UniqConfig,
805    term: u8,
806) -> io::Result<()> {
807    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
808    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
809
810    // Read first line
811    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
812        return Ok(()); // empty input
813    }
814    let mut count: u64 = 1;
815
816    loop {
817        current_line.clear();
818        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
819
820        if bytes_read == 0 {
821            // End of input — output the last group
822            output_group_stream(writer, &prev_line, count, config, term)?;
823            break;
824        }
825
826        if compare_lines_stream(&prev_line, &current_line, config, term) {
827            count += 1;
828        } else {
829            output_group_stream(writer, &prev_line, count, config, term)?;
830            std::mem::swap(&mut prev_line, &mut current_line);
831            count = 1;
832        }
833    }
834
835    Ok(())
836}
837
838/// Compare two lines (with terminators) in streaming mode.
839#[inline(always)]
840fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
841    let a_stripped = strip_term(a, term);
842    let b_stripped = strip_term(b, term);
843    lines_equal(a_stripped, b_stripped, config)
844}
845
/// Remove a single trailing `term` byte from `line`, if present.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, rest)) if last == term => rest,
        _ => line,
    }
}

856/// Output a group in streaming mode.
857#[inline(always)]
858fn output_group_stream(
859    writer: &mut impl Write,
860    line: &[u8],
861    count: u64,
862    config: &UniqConfig,
863    term: u8,
864) -> io::Result<()> {
865    let should_print = match config.mode {
866        OutputMode::Default => true,
867        OutputMode::RepeatedOnly => count > 1,
868        OutputMode::UniqueOnly => count == 1,
869        _ => true,
870    };
871
872    if should_print {
873        let content = strip_term(line, term);
874        if config.count {
875            write_count_line(writer, count, content, term)?;
876        } else {
877            writer.write_all(content)?;
878            writer.write_all(&[term])?;
879        }
880    }
881
882    Ok(())
883}
884
885/// Process --all-repeated / -D mode (streaming).
886fn process_all_repeated_stream<R: BufRead, W: Write>(
887    mut reader: R,
888    writer: &mut W,
889    config: &UniqConfig,
890    method: AllRepeatedMethod,
891    term: u8,
892) -> io::Result<()> {
893    let mut group: Vec<Vec<u8>> = Vec::new();
894    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
895    let mut first_group_printed = false;
896
897    current_line.clear();
898    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
899        return Ok(());
900    }
901    group.push(current_line.clone());
902
903    loop {
904        current_line.clear();
905        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
906
907        if bytes_read == 0 {
908            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
909            break;
910        }
911
912        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
913            group.push(current_line.clone());
914        } else {
915            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
916            group.clear();
917            group.push(current_line.clone());
918        }
919    }
920
921    Ok(())
922}
923
924/// Flush a group for --all-repeated mode (streaming).
925fn flush_all_repeated_stream(
926    writer: &mut impl Write,
927    group: &[Vec<u8>],
928    method: AllRepeatedMethod,
929    first_group_printed: &mut bool,
930    term: u8,
931) -> io::Result<()> {
932    if group.len() <= 1 {
933        return Ok(());
934    }
935
936    match method {
937        AllRepeatedMethod::Prepend => {
938            writer.write_all(&[term])?;
939        }
940        AllRepeatedMethod::Separate => {
941            if *first_group_printed {
942                writer.write_all(&[term])?;
943            }
944        }
945        AllRepeatedMethod::None => {}
946    }
947
948    for line in group {
949        let content = strip_term(line, term);
950        writer.write_all(content)?;
951        writer.write_all(&[term])?;
952    }
953
954    *first_group_printed = true;
955    Ok(())
956}
957
958/// Process --group mode (streaming).
959fn process_group_stream<R: BufRead, W: Write>(
960    mut reader: R,
961    writer: &mut W,
962    config: &UniqConfig,
963    method: GroupMethod,
964    term: u8,
965) -> io::Result<()> {
966    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
967    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
968
969    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
970        return Ok(());
971    }
972
973    // Prepend/Both: separator before first group
974    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
975        writer.write_all(&[term])?;
976    }
977
978    let content = strip_term(&prev_line, term);
979    writer.write_all(content)?;
980    writer.write_all(&[term])?;
981
982    loop {
983        current_line.clear();
984        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
985
986        if bytes_read == 0 {
987            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
988                writer.write_all(&[term])?;
989            }
990            break;
991        }
992
993        if !compare_lines_stream(&prev_line, &current_line, config, term) {
994            writer.write_all(&[term])?;
995        }
996
997        let content = strip_term(&current_line, term);
998        writer.write_all(content)?;
999        writer.write_all(&[term])?;
1000
1001        std::mem::swap(&mut prev_line, &mut current_line);
1002    }
1003
1004    Ok(())
1005}
1006
/// Append the next record ending in `term` (newline or NUL) to `buf`.
///
/// The terminator is kept when present. Returns the number of bytes appended;
/// 0 means end of input.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    BufRead::read_until(reader, term, buf)
}