Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Writes the whole of `buf` to `writer`.
///
/// Delegates to [`Write::write_all`], which keeps writing until every byte
/// has been accepted or an error is returned.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}
8
/// Group-delimiting strategy used with `--all-repeated`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AllRepeatedMethod {
    /// Duplicate groups are emitted back to back with no delimiter.
    None,
    /// A terminator-only line is emitted before every duplicate group.
    Prepend,
    /// A terminator-only line is emitted between consecutive duplicate groups.
    Separate,
}
16
/// Group-delimiting strategy used with `--group`.
///
/// A terminator-only line is always emitted between adjacent groups; the
/// variants differ in whether one is also emitted at the edges of the output.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupMethod {
    /// Delimiters between groups only.
    Separate,
    /// Additionally emit a delimiter before the first group.
    Prepend,
    /// Additionally emit a delimiter after the last group.
    Append,
    /// Additionally emit a delimiter before the first and after the last group.
    Both,
}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Check if config requires field/char skipping or char limiting.
119#[inline(always)]
120fn needs_key_extraction(config: &UniqConfig) -> bool {
121    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
122}
123
/// Fast-path comparison: whole-line, case-sensitive byte equality.
///
/// Lines of different length are rejected immediately. For lines of at least
/// 8 bytes the first 8 bytes are compared before the full comparison, which
/// rejects most unequal lines cheaply. The result is always identical to
/// `a == b`.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Fixed-width prefix compare; done on safe sub-slices so no unaligned
    // raw-pointer reads are needed. Both slices are known to be >= 8 bytes here
    // because their lengths are equal.
    if a.len() >= 8 && a[..8] != b[..8] {
        return false;
    }
    a == b
}
145
146/// Write a count-prefixed line in GNU uniq format.
147/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
148/// Combines prefix + line + term into a single write for short lines (< 240 bytes).
149#[inline(always)]
150fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
151    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
152    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
153    let digits = itoa_right_aligned_into(&mut prefix, count);
154    let width = digits.max(7); // minimum 7 chars
155    let prefix_len = width + 1; // +1 for trailing space
156    prefix[width] = b' ';
157
158    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
159    let total = prefix_len + line.len() + 1;
160    if total <= 256 {
161        let mut buf = [0u8; 256];
162        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
163        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
164        buf[prefix_len + line.len()] = term;
165        out.write_all(&buf[..total])
166    } else {
167        out.write_all(&prefix[..prefix_len])?;
168        out.write_all(line)?;
169        out.write_all(&[term])
170    }
171}
172
/// Formats `val` in decimal at the start of `buf`, right-aligned in a field
/// of at least 7 characters.
///
/// `buf` is expected to arrive filled with spaces. Returns the field width:
/// 7 when the number has fewer than 7 digits, otherwise the digit count.
/// Bytes beyond the returned width are scratch space and may hold leftovers.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    const FIELD: usize = 7;
    const SCRATCH_END: usize = 27;

    if val == 0 {
        buf[FIELD - 1] = b'0';
        return FIELD;
    }

    // Produce digits least-significant first, filling the scratch area
    // backwards from SCRATCH_END.
    let mut cursor = SCRATCH_END;
    loop {
        cursor -= 1;
        buf[cursor] = b'0' + (val % 10) as u8;
        val /= 10;
        if val == 0 {
            break;
        }
    }

    // Short numbers land after the leading padding; wide ones start at 0.
    let digit_count = SCRATCH_END - cursor;
    let dest = FIELD.saturating_sub(digit_count);
    buf.copy_within(cursor..SCRATCH_END, dest);
    digit_count.max(FIELD)
}
201
202// ============================================================================
203// High-performance mmap-based processing (for byte slices, zero-copy)
204// ============================================================================
205
206/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
207pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
208    // 16MB output buffer for fewer flush syscalls on large inputs
209    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
210    let term = if config.zero_terminated { b'\0' } else { b'\n' };
211
212    match config.mode {
213        OutputMode::Group(method) => {
214            process_group_bytes(data, &mut writer, config, method, term)?;
215        }
216        OutputMode::AllRepeated(method) => {
217            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
218        }
219        _ => {
220            process_standard_bytes(data, &mut writer, config, term)?;
221        }
222    }
223
224    writer.flush()?;
225    Ok(())
226}
227
228/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
229/// Uses memchr for SIMD-accelerated line boundary detection.
230struct LineIter<'a> {
231    data: &'a [u8],
232    pos: usize,
233    term: u8,
234}
235
236impl<'a> LineIter<'a> {
237    #[inline(always)]
238    fn new(data: &'a [u8], term: u8) -> Self {
239        Self { data, pos: 0, term }
240    }
241}
242
243impl<'a> Iterator for LineIter<'a> {
244    /// (line content without terminator, full line including terminator for output)
245    type Item = (&'a [u8], &'a [u8]);
246
247    #[inline(always)]
248    fn next(&mut self) -> Option<Self::Item> {
249        if self.pos >= self.data.len() {
250            return None;
251        }
252
253        let remaining = &self.data[self.pos..];
254        match memchr::memchr(self.term, remaining) {
255            Some(idx) => {
256                let line_start = self.pos;
257                let line_end = self.pos + idx; // without terminator
258                let full_end = self.pos + idx + 1; // with terminator
259                self.pos = full_end;
260                Some((
261                    &self.data[line_start..line_end],
262                    &self.data[line_start..full_end],
263                ))
264            }
265            None => {
266                // Last line without terminator
267                let line_start = self.pos;
268                self.pos = self.data.len();
269                let line = &self.data[line_start..];
270                Some((line, line))
271            }
272        }
273    }
274}
275
/// Returns line `idx` without its terminator, given the start offset of every
/// line in `data`.
///
/// For every line but the last, the content ends one byte before the next
/// line's start. The last line extends to the end of `data`, so `data` must
/// not end with a terminator if the last line's content is to exclude it.
///
/// # Panics
/// Panics if `idx` is out of range for `line_starts`.
#[inline(always)]
fn line_content_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let begin = line_starts[idx];
    match line_starts.get(idx + 1) {
        // The byte just before the next start is this line's terminator.
        Some(&next_begin) => &data[begin..next_begin - 1],
        None => &data[begin..],
    }
}
287
/// Returns line `idx` including its terminator, given the start offset of
/// every line in `data`.
///
/// Every line but the last ends where the next one starts; the last line
/// extends to the end of `data`.
///
/// # Panics
/// Panics if `idx` is out of range for `line_starts`.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let begin = line_starts[idx];
    let finish = line_starts.get(idx + 1).copied().unwrap_or(data.len());
    &data[begin..finish]
}
299
/// Finds the end of the group of adjacent equal lines starting at
/// `group_start`.
///
/// Returns the index of the first line after `group_start` that differs from
/// it, or `num_lines` if every following line is equal. All lines in
/// `group_start..result` are byte-for-byte equal.
///
/// Despite the historical name this is a linear scan. A binary search is only
/// valid when equal lines cannot reappear after a different line (sorted
/// input), but uniq groups *adjacent* lines of arbitrary input: in
/// `a a b a a` a binary search would find the trailing `a`s and report one
/// group of five, silently dropping `b`.
///
/// `line_starts` holds the start offset of every line in `data`; line content
/// is sliced the same way `line_content_at` does.
#[inline]
fn binary_search_group_end(
    data: &[u8],
    line_starts: &[usize],
    group_start: usize,
    num_lines: usize,
) -> usize {
    // Content of line `idx` without its terminator; the last line extends to
    // the end of `data`.
    let content = |idx: usize| {
        let start = line_starts[idx];
        let end = line_starts
            .get(idx + 1)
            .map_or(data.len(), |&next_start| next_start - 1);
        &data[start..end]
    };

    let key = content(group_start);
    let mut end = group_start + 1;
    while end < num_lines && content(end) == key {
        end += 1;
    }
    end
}
323
324/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
325/// Uses pre-computed line positions for faster iteration and binary search
326/// for duplicate group sizes on repetitive data.
327fn process_standard_bytes(
328    data: &[u8],
329    writer: &mut impl Write,
330    config: &UniqConfig,
331    term: u8,
332) -> io::Result<()> {
333    if data.is_empty() {
334        return Ok(());
335    }
336
337    let fast = !needs_key_extraction(config) && !config.ignore_case;
338
339    // Pre-compute all line start positions in a single SIMD pass.
340    // memchr_iter precomputes SIMD state once (vs per-call recomputation in LineIter).
341    let estimated_lines = (data.len() / 40).max(64);
342    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
343    line_starts.push(0); // first line starts at offset 0
344    for pos in memchr::memchr_iter(term, data) {
345        if pos + 1 < data.len() {
346            line_starts.push(pos + 1);
347        }
348    }
349    let num_lines = line_starts.len();
350    if num_lines == 0 {
351        return Ok(());
352    }
353
354    // Ultra-fast path: default mode, no count, no key extraction
355    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
356        // Write first line
357        let first_full = line_full_at(data, &line_starts, 0);
358        let first_content = line_content_at(data, &line_starts, 0);
359        write_all_raw(writer, first_full)?;
360        if first_full.len() == first_content.len() {
361            writer.write_all(&[term])?;
362        }
363
364        let mut i = 1;
365        while i < num_lines {
366            let prev = line_content_at(data, &line_starts, i - 1);
367            let cur = line_content_at(data, &line_starts, i);
368
369            if lines_equal_fast(prev, cur) {
370                // Duplicate detected — binary search for end of group
371                let group_end = binary_search_group_end(data, &line_starts, i - 1, num_lines);
372                i = group_end;
373                continue;
374            }
375
376            // Unique line — write it
377            let cur_full = line_full_at(data, &line_starts, i);
378            write_all_raw(writer, cur_full)?;
379            if cur_full.len() == cur.len() {
380                writer.write_all(&[term])?;
381            }
382            i += 1;
383        }
384        return Ok(());
385    }
386
387    // General path with count tracking + binary search for duplicate groups
388    let mut i = 0;
389    while i < num_lines {
390        let content = line_content_at(data, &line_starts, i);
391        let full = line_full_at(data, &line_starts, i);
392
393        // Find group size: check next line, if equal use binary search
394        let group_end = if fast
395            && i + 1 < num_lines
396            && lines_equal_fast(content, line_content_at(data, &line_starts, i + 1))
397        {
398            // Duplicate detected — binary search for end
399            binary_search_group_end(data, &line_starts, i, num_lines)
400        } else if !fast
401            && i + 1 < num_lines
402            && lines_equal(content, line_content_at(data, &line_starts, i + 1), config)
403        {
404            // Slow path binary search with key extraction
405            let mut lo = i + 2;
406            let mut hi = num_lines;
407            while lo < hi {
408                let mid = lo + (hi - lo) / 2;
409                if lines_equal(content, line_content_at(data, &line_starts, mid), config) {
410                    lo = mid + 1;
411                } else {
412                    hi = mid;
413                }
414            }
415            lo
416        } else {
417            i + 1
418        };
419
420        let count = (group_end - i) as u64;
421        output_group_bytes(writer, content, full, count, config, term)?;
422        i = group_end;
423    }
424
425    Ok(())
426}
427
428/// Output a group for standard modes (bytes path).
429#[inline(always)]
430fn output_group_bytes(
431    writer: &mut impl Write,
432    content: &[u8],
433    full: &[u8],
434    count: u64,
435    config: &UniqConfig,
436    term: u8,
437) -> io::Result<()> {
438    let should_print = match config.mode {
439        OutputMode::Default => true,
440        OutputMode::RepeatedOnly => count > 1,
441        OutputMode::UniqueOnly => count == 1,
442        _ => true,
443    };
444
445    if should_print {
446        if config.count {
447            write_count_line(writer, count, content, term)?;
448        } else {
449            writer.write_all(full)?;
450            // Add terminator if the original line didn't have one
451            if full.len() == content.len() {
452                writer.write_all(&[term])?;
453            }
454        }
455    }
456
457    Ok(())
458}
459
460/// Process --all-repeated / -D mode on byte slices.
461fn process_all_repeated_bytes(
462    data: &[u8],
463    writer: &mut impl Write,
464    config: &UniqConfig,
465    method: AllRepeatedMethod,
466    term: u8,
467) -> io::Result<()> {
468    let mut lines = LineIter::new(data, term);
469
470    let first = match lines.next() {
471        Some(v) => v,
472        None => return Ok(()),
473    };
474
475    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
476    // For all-repeated we need to buffer group lines since we only print if count > 1
477    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
478    group_lines.push(first);
479    let mut first_group_printed = false;
480
481    let fast = !needs_key_extraction(config) && !config.ignore_case;
482
483    for (cur_content, cur_full) in lines {
484        let prev_content = group_lines.last().unwrap().0;
485        let equal = if fast {
486            lines_equal_fast(prev_content, cur_content)
487        } else {
488            lines_equal(prev_content, cur_content, config)
489        };
490
491        if equal {
492            group_lines.push((cur_content, cur_full));
493        } else {
494            // Flush group
495            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
496            group_lines.clear();
497            group_lines.push((cur_content, cur_full));
498        }
499    }
500
501    // Flush last group
502    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
503
504    Ok(())
505}
506
507/// Flush a group for --all-repeated mode (bytes path).
508fn flush_all_repeated_bytes(
509    writer: &mut impl Write,
510    group: &[(&[u8], &[u8])],
511    method: AllRepeatedMethod,
512    first_group_printed: &mut bool,
513    term: u8,
514) -> io::Result<()> {
515    if group.len() <= 1 {
516        return Ok(()); // Not a duplicate group
517    }
518
519    match method {
520        AllRepeatedMethod::Prepend => {
521            writer.write_all(&[term])?;
522        }
523        AllRepeatedMethod::Separate => {
524            if *first_group_printed {
525                writer.write_all(&[term])?;
526            }
527        }
528        AllRepeatedMethod::None => {}
529    }
530
531    for &(content, full) in group {
532        writer.write_all(full)?;
533        if full.len() == content.len() {
534            writer.write_all(&[term])?;
535        }
536    }
537
538    *first_group_printed = true;
539    Ok(())
540}
541
542/// Process --group mode on byte slices.
543fn process_group_bytes(
544    data: &[u8],
545    writer: &mut impl Write,
546    config: &UniqConfig,
547    method: GroupMethod,
548    term: u8,
549) -> io::Result<()> {
550    let mut lines = LineIter::new(data, term);
551
552    let (prev_content, prev_full) = match lines.next() {
553        Some(v) => v,
554        None => return Ok(()),
555    };
556
557    // Prepend/Both: separator before first group
558    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
559        writer.write_all(&[term])?;
560    }
561
562    // Write first line
563    writer.write_all(prev_full)?;
564    if prev_full.len() == prev_content.len() {
565        writer.write_all(&[term])?;
566    }
567
568    let mut prev_content = prev_content;
569    let fast = !needs_key_extraction(config) && !config.ignore_case;
570
571    for (cur_content, cur_full) in lines {
572        let equal = if fast {
573            lines_equal_fast(prev_content, cur_content)
574        } else {
575            lines_equal(prev_content, cur_content, config)
576        };
577
578        if !equal {
579            // New group — write separator
580            writer.write_all(&[term])?;
581        }
582
583        writer.write_all(cur_full)?;
584        if cur_full.len() == cur_content.len() {
585            writer.write_all(&[term])?;
586        }
587
588        prev_content = cur_content;
589    }
590
591    // Append/Both: separator after last group
592    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
593        writer.write_all(&[term])?;
594    }
595
596    Ok(())
597}
598
599// ============================================================================
600// Streaming processing (for stdin / pipe input)
601// ============================================================================
602
603/// Main streaming uniq processor.
604/// Reads from `input`, writes to `output`.
605pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
606    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
607    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
608    let term = if config.zero_terminated { b'\0' } else { b'\n' };
609
610    match config.mode {
611        OutputMode::Group(method) => {
612            process_group_stream(reader, &mut writer, config, method, term)?;
613        }
614        OutputMode::AllRepeated(method) => {
615            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
616        }
617        _ => {
618            process_standard_stream(reader, &mut writer, config, term)?;
619        }
620    }
621
622    writer.flush()?;
623    Ok(())
624}
625
626/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
627fn process_standard_stream<R: BufRead, W: Write>(
628    mut reader: R,
629    writer: &mut W,
630    config: &UniqConfig,
631    term: u8,
632) -> io::Result<()> {
633    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
634    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
635
636    // Read first line
637    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
638        return Ok(()); // empty input
639    }
640    let mut count: u64 = 1;
641
642    loop {
643        current_line.clear();
644        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
645
646        if bytes_read == 0 {
647            // End of input — output the last group
648            output_group_stream(writer, &prev_line, count, config, term)?;
649            break;
650        }
651
652        if compare_lines_stream(&prev_line, &current_line, config, term) {
653            count += 1;
654        } else {
655            output_group_stream(writer, &prev_line, count, config, term)?;
656            std::mem::swap(&mut prev_line, &mut current_line);
657            count = 1;
658        }
659    }
660
661    Ok(())
662}
663
664/// Compare two lines (with terminators) in streaming mode.
665#[inline(always)]
666fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
667    let a_stripped = strip_term(a, term);
668    let b_stripped = strip_term(b, term);
669    lines_equal(a_stripped, b_stripped, config)
670}
671
/// Returns `line` without its final byte if that byte is `term`; otherwise
/// returns `line` unchanged. At most one terminator is removed.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, body)) if last == term => body,
        _ => line,
    }
}
681
682/// Output a group in streaming mode.
683#[inline(always)]
684fn output_group_stream(
685    writer: &mut impl Write,
686    line: &[u8],
687    count: u64,
688    config: &UniqConfig,
689    term: u8,
690) -> io::Result<()> {
691    let should_print = match config.mode {
692        OutputMode::Default => true,
693        OutputMode::RepeatedOnly => count > 1,
694        OutputMode::UniqueOnly => count == 1,
695        _ => true,
696    };
697
698    if should_print {
699        let content = strip_term(line, term);
700        if config.count {
701            write_count_line(writer, count, content, term)?;
702        } else {
703            writer.write_all(content)?;
704            writer.write_all(&[term])?;
705        }
706    }
707
708    Ok(())
709}
710
711/// Process --all-repeated / -D mode (streaming).
712fn process_all_repeated_stream<R: BufRead, W: Write>(
713    mut reader: R,
714    writer: &mut W,
715    config: &UniqConfig,
716    method: AllRepeatedMethod,
717    term: u8,
718) -> io::Result<()> {
719    let mut group: Vec<Vec<u8>> = Vec::new();
720    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
721    let mut first_group_printed = false;
722
723    current_line.clear();
724    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
725        return Ok(());
726    }
727    group.push(current_line.clone());
728
729    loop {
730        current_line.clear();
731        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
732
733        if bytes_read == 0 {
734            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
735            break;
736        }
737
738        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
739            group.push(current_line.clone());
740        } else {
741            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
742            group.clear();
743            group.push(current_line.clone());
744        }
745    }
746
747    Ok(())
748}
749
750/// Flush a group for --all-repeated mode (streaming).
751fn flush_all_repeated_stream(
752    writer: &mut impl Write,
753    group: &[Vec<u8>],
754    method: AllRepeatedMethod,
755    first_group_printed: &mut bool,
756    term: u8,
757) -> io::Result<()> {
758    if group.len() <= 1 {
759        return Ok(());
760    }
761
762    match method {
763        AllRepeatedMethod::Prepend => {
764            writer.write_all(&[term])?;
765        }
766        AllRepeatedMethod::Separate => {
767            if *first_group_printed {
768                writer.write_all(&[term])?;
769            }
770        }
771        AllRepeatedMethod::None => {}
772    }
773
774    for line in group {
775        let content = strip_term(line, term);
776        writer.write_all(content)?;
777        writer.write_all(&[term])?;
778    }
779
780    *first_group_printed = true;
781    Ok(())
782}
783
784/// Process --group mode (streaming).
785fn process_group_stream<R: BufRead, W: Write>(
786    mut reader: R,
787    writer: &mut W,
788    config: &UniqConfig,
789    method: GroupMethod,
790    term: u8,
791) -> io::Result<()> {
792    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
793    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
794
795    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
796        return Ok(());
797    }
798
799    // Prepend/Both: separator before first group
800    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
801        writer.write_all(&[term])?;
802    }
803
804    let content = strip_term(&prev_line, term);
805    writer.write_all(content)?;
806    writer.write_all(&[term])?;
807
808    loop {
809        current_line.clear();
810        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
811
812        if bytes_read == 0 {
813            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
814                writer.write_all(&[term])?;
815            }
816            break;
817        }
818
819        if !compare_lines_stream(&prev_line, &current_line, config, term) {
820            writer.write_all(&[term])?;
821        }
822
823        let content = strip_term(&current_line, term);
824        writer.write_all(content)?;
825        writer.write_all(&[term])?;
826
827        std::mem::swap(&mut prev_line, &mut current_line);
828    }
829
830    Ok(())
831}
832
/// Appends the next line from `reader` to `buf`, up to and including `term`
/// (or up to EOF when the last line is unterminated).
///
/// Returns the number of bytes appended; 0 means end of input.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    BufRead::read_until(reader, term, buf)
}
838}