Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Push the whole of `buf` into `writer`.
///
/// Delegates to [`Write::write_all`], which keeps issuing writes until every
/// byte has been accepted or an error is reported.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}
8
/// Group-delimiting strategy chosen together with `--all-repeated`.
///
/// The variant names mirror the method names the option accepts; the code that
/// acts on them lives outside this definition.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AllRepeatedMethod {
    /// The `none` method.
    None,
    /// The `prepend` method.
    Prepend,
    /// The `separate` method.
    Separate,
}
16
/// Group-delimiting strategy chosen together with `--group`.
///
/// The variant names mirror the method names the option accepts; the code that
/// acts on them lives outside this definition.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupMethod {
    /// The `separate` method.
    Separate,
    /// The `prepend` method.
    Prepend,
    /// The `append` method.
    Append,
    /// The `both` method.
    Both,
}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
/// Whole-line comparison that ignores ASCII case.
///
/// No key extraction is performed. Lines of different length are rejected up
/// front, two empty lines are equal, and everything else is decided by
/// `eq_ignore_ascii_case`.
#[inline(always)]
fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len() && (a.is_empty() || a.eq_ignore_ascii_case(b))
}
131
132/// Check if config requires field/char skipping or char limiting.
133#[inline(always)]
134fn needs_key_extraction(config: &UniqConfig) -> bool {
135    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
136}
137
/// Exact byte-wise equality of two whole lines (no key extraction, no case
/// folding).
///
/// Cheap rejections run before the full comparison:
/// 1. differing lengths are unequal;
/// 2. for lines of at least 8 bytes, the first 8 bytes are compared on their
///    own so most unequal lines are rejected without scanning the rest.
///
/// The prefix check uses ordinary slice comparison, so no `unsafe` is needed;
/// the fixed 8-byte width lets the compiler lower it to a single word compare.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    const PREFIX: usize = 8;

    if a.len() != b.len() {
        return false;
    }
    // Lengths are equal here, so checking `a` alone bounds both slices.
    if a.len() >= PREFIX && a[..PREFIX] != b[..PREFIX] {
        return false;
    }
    a == b
}
159
160/// Write a count-prefixed line in GNU uniq format.
161/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
162/// Combines prefix + line + term into a single write for short lines (< 240 bytes).
163#[inline(always)]
164fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
165    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
166    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
167    let digits = itoa_right_aligned_into(&mut prefix, count);
168    let width = digits.max(7); // minimum 7 chars
169    let prefix_len = width + 1; // +1 for trailing space
170    prefix[width] = b' ';
171
172    // Single write for short lines (common case) — avoids 3 separate BufWriter calls
173    let total = prefix_len + line.len() + 1;
174    if total <= 256 {
175        let mut buf = [0u8; 256];
176        buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
177        buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
178        buf[prefix_len + line.len()] = term;
179        out.write_all(&buf[..total])
180    } else {
181        out.write_all(&prefix[..prefix_len])?;
182        out.write_all(line)?;
183        out.write_all(&[term])
184    }
185}
186
/// Render `val` in decimal at the front of `buf`, right-aligned in a field of
/// at least seven columns.
///
/// `buf` is expected to arrive filled with spaces; the padding columns are
/// never written here. Returns the field width used: 7 for numbers of up to
/// seven digits, otherwise the digit count.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    const FIELD: usize = 7;
    // Digits are first produced right-to-left ending just before this index.
    const SCRATCH_END: usize = 27;

    if val == 0 {
        buf[FIELD - 1] = b'0';
        return FIELD;
    }

    let mut cursor = SCRATCH_END;
    loop {
        cursor -= 1;
        buf[cursor] = b'0' + (val % 10) as u8;
        val /= 10;
        if val == 0 {
            break;
        }
    }

    let digit_count = SCRATCH_END - cursor;
    // Narrow numbers land after the padding; wide ones start at column 0.
    let dest = FIELD.saturating_sub(digit_count);
    buf.copy_within(cursor..SCRATCH_END, dest);
    digit_count.max(FIELD)
}
215
216// ============================================================================
217// High-performance mmap-based processing (for byte slices, zero-copy)
218// ============================================================================
219
220/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
221pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
222    // 16MB output buffer for fewer flush syscalls on large inputs
223    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
224    let term = if config.zero_terminated { b'\0' } else { b'\n' };
225
226    match config.mode {
227        OutputMode::Group(method) => {
228            process_group_bytes(data, &mut writer, config, method, term)?;
229        }
230        OutputMode::AllRepeated(method) => {
231            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
232        }
233        _ => {
234            process_standard_bytes(data, &mut writer, config, term)?;
235        }
236    }
237
238    writer.flush()?;
239    Ok(())
240}
241
242/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
243/// Uses memchr for SIMD-accelerated line boundary detection.
244struct LineIter<'a> {
245    data: &'a [u8],
246    pos: usize,
247    term: u8,
248}
249
250impl<'a> LineIter<'a> {
251    #[inline(always)]
252    fn new(data: &'a [u8], term: u8) -> Self {
253        Self { data, pos: 0, term }
254    }
255}
256
257impl<'a> Iterator for LineIter<'a> {
258    /// (line content without terminator, full line including terminator for output)
259    type Item = (&'a [u8], &'a [u8]);
260
261    #[inline(always)]
262    fn next(&mut self) -> Option<Self::Item> {
263        if self.pos >= self.data.len() {
264            return None;
265        }
266
267        let remaining = &self.data[self.pos..];
268        match memchr::memchr(self.term, remaining) {
269            Some(idx) => {
270                let line_start = self.pos;
271                let line_end = self.pos + idx; // without terminator
272                let full_end = self.pos + idx + 1; // with terminator
273                self.pos = full_end;
274                Some((
275                    &self.data[line_start..line_end],
276                    &self.data[line_start..full_end],
277                ))
278            }
279            None => {
280                // Last line without terminator
281                let line_start = self.pos;
282                self.pos = self.data.len();
283                let line = &self.data[line_start..];
284                Some((line, line))
285            }
286        }
287    }
288}
289
/// Slice out line `idx` without its terminator, using precomputed offsets.
///
/// `line_starts[k]` is the offset of line `k`'s first byte. For every line but
/// the last, the content ends one byte before the next line starts (that byte
/// being the terminator). For the last line the caller supplies `content_end`,
/// which already excludes a trailing terminator if there is one.
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let begin = line_starts[idx];
    let finish = match line_starts.get(idx + 1) {
        Some(&next_start) => next_start - 1,
        None => content_end,
    };
    &data[begin..finish]
}
307
/// Slice out line `idx` exactly as stored, terminator included when present.
///
/// The line runs from its own start offset up to the next line's start, or to
/// the end of `data` for the last line.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let begin = line_starts[idx];
    let finish = line_starts.get(idx + 1).copied().unwrap_or(data.len());
    &data[begin..finish]
}
319
320/// Linear scan for the end of a duplicate group.
321/// Returns the index of the first line that differs from line_starts[group_start].
322/// Must use linear scan (not binary search) because uniq input may NOT be sorted —
323/// equal lines can appear in non-adjacent groups separated by different lines.
324#[inline]
325fn linear_scan_group_end(
326    data: &[u8],
327    line_starts: &[usize],
328    group_start: usize,
329    num_lines: usize,
330    content_end: usize,
331) -> usize {
332    let key = line_content_at(data, line_starts, group_start, content_end);
333    let mut i = group_start + 1;
334    while i < num_lines {
335        if !lines_equal_fast(key, line_content_at(data, line_starts, i, content_end)) {
336            return i;
337        }
338        i += 1;
339    }
340    i
341}
342
343/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
344/// Ultra-fast path: single-pass inline scanning with memchr, no line_starts Vec.
345/// General path: pre-computed line positions with binary search for groups.
346fn process_standard_bytes(
347    data: &[u8],
348    writer: &mut impl Write,
349    config: &UniqConfig,
350    term: u8,
351) -> io::Result<()> {
352    if data.is_empty() {
353        return Ok(());
354    }
355
356    let fast = !needs_key_extraction(config) && !config.ignore_case;
357    let fast_ci = !needs_key_extraction(config) && config.ignore_case;
358
359    // Ultra-fast path: default mode, no count, no key extraction.
360    // Single-pass: scan with memchr, compare adjacent lines inline.
361    // Avoids the 20MB+ line_starts allocation + cache misses from random access.
362    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
363        return process_default_fast_singlepass(data, writer, term);
364    }
365
366    // Ultra-fast path: repeated-only or unique-only, no count, no key extraction
367    if fast
368        && !config.count
369        && matches!(
370            config.mode,
371            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
372        )
373    {
374        return process_filter_fast_singlepass(data, writer, config, term);
375    }
376
377    // Ultra-fast path: count mode with no key extraction.
378    // Single-pass: scan with memchr, count groups inline, emit count-prefixed lines.
379    // Avoids the line_starts Vec allocation (20MB+ for large files).
380    if fast && config.count {
381        return process_count_fast_singlepass(data, writer, config, term);
382    }
383
384    // Fast path for case-insensitive (-i) mode with no key extraction.
385    // Single-pass: scan with memchr, compare adjacent lines with eq_ignore_ascii_case.
386    // Avoids the general path's line_starts Vec allocation.
387    if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
388        return process_default_ci_singlepass(data, writer, term);
389    }
390
391    if fast_ci
392        && !config.count
393        && matches!(
394            config.mode,
395            OutputMode::RepeatedOnly | OutputMode::UniqueOnly
396        )
397    {
398        return process_filter_ci_singlepass(data, writer, config, term);
399    }
400
401    if fast_ci && config.count {
402        return process_count_ci_singlepass(data, writer, config, term);
403    }
404
405    // General path: pre-computed line positions for binary search on groups
406    let estimated_lines = (data.len() / 40).max(64);
407    let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
408    line_starts.push(0);
409    for pos in memchr::memchr_iter(term, data) {
410        if pos + 1 < data.len() {
411            line_starts.push(pos + 1);
412        }
413    }
414    let num_lines = line_starts.len();
415    if num_lines == 0 {
416        return Ok(());
417    }
418
419    // Pre-compute content end: if data ends with terminator, exclude it for last line
420    let content_end = if data.last() == Some(&term) {
421        data.len() - 1
422    } else {
423        data.len()
424    };
425
426    // Ultra-fast path: default mode, no count, no key extraction
427    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
428        // Write first line
429        let first_full = line_full_at(data, &line_starts, 0);
430        let first_content = line_content_at(data, &line_starts, 0, content_end);
431        write_all_raw(writer, first_full)?;
432        if first_full.len() == first_content.len() {
433            writer.write_all(&[term])?;
434        }
435
436        let mut i = 1;
437        while i < num_lines {
438            let prev = line_content_at(data, &line_starts, i - 1, content_end);
439            let cur = line_content_at(data, &line_starts, i, content_end);
440
441            if lines_equal_fast(prev, cur) {
442                // Duplicate detected — linear scan for end of group
443                let group_end =
444                    linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
445                i = group_end;
446                continue;
447            }
448
449            // Unique line — write it
450            let cur_full = line_full_at(data, &line_starts, i);
451            write_all_raw(writer, cur_full)?;
452            if cur_full.len() == cur.len() {
453                writer.write_all(&[term])?;
454            }
455            i += 1;
456        }
457        return Ok(());
458    }
459
460    // General path with count tracking
461    let mut i = 0;
462    while i < num_lines {
463        let content = line_content_at(data, &line_starts, i, content_end);
464        let full = line_full_at(data, &line_starts, i);
465
466        let group_end = if fast
467            && i + 1 < num_lines
468            && lines_equal_fast(
469                content,
470                line_content_at(data, &line_starts, i + 1, content_end),
471            ) {
472            // Duplicate detected — linear scan for end
473            linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
474        } else if !fast
475            && i + 1 < num_lines
476            && lines_equal(
477                content,
478                line_content_at(data, &line_starts, i + 1, content_end),
479                config,
480            )
481        {
482            // Slow path linear scan with key extraction
483            let mut j = i + 2;
484            while j < num_lines {
485                if !lines_equal(
486                    content,
487                    line_content_at(data, &line_starts, j, content_end),
488                    config,
489                ) {
490                    break;
491                }
492                j += 1;
493            }
494            j
495        } else {
496            i + 1
497        };
498
499        let count = (group_end - i) as u64;
500        output_group_bytes(writer, content, full, count, config, term)?;
501        i = group_end;
502    }
503
504    Ok(())
505}
506
507/// Ultra-fast single-pass default mode: scan with memchr, compare adjacent lines inline.
508/// No pre-computed positions, no binary search, no Vec allocation.
509/// Outputs each line that differs from the previous.
510///
511/// For large files (>4MB), uses parallel chunk processing: each chunk is deduplicated
512/// independently, then cross-chunk boundaries are resolved.
513fn process_default_fast_singlepass(
514    data: &[u8],
515    writer: &mut impl Write,
516    term: u8,
517) -> io::Result<()> {
518    // Parallel path for large files
519    if data.len() >= 4 * 1024 * 1024 {
520        return process_default_parallel(data, writer, term);
521    }
522
523    process_default_sequential(data, writer, term)
524}
525
526/// Sequential single-pass dedup with zero-copy output.
527/// Instead of copying data to a buffer, tracks contiguous output runs and writes
528/// directly from the original data. For all-unique data, this is a single write_all.
529fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
530    let mut prev_start: usize = 0;
531    let mut prev_end: usize; // exclusive, without terminator
532
533    // Find end of first line
534    match memchr::memchr(term, data) {
535        Some(pos) => {
536            prev_end = pos;
537        }
538        None => {
539            // Single line, no terminator
540            writer.write_all(data)?;
541            return writer.write_all(&[term]);
542        }
543    }
544
545    // run_start tracks the beginning of the current contiguous output region.
546    // When a duplicate is found, we flush the run up to the duplicate and skip it.
547    let mut run_start: usize = 0;
548    let mut cur_start = prev_end + 1;
549    let mut last_output_end = prev_end + 1; // exclusive end including terminator
550
551    while cur_start < data.len() {
552        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
553            Some(offset) => cur_start + offset,
554            None => data.len(), // last line without terminator
555        };
556
557        let prev_content = &data[prev_start..prev_end];
558        let cur_content = &data[cur_start..cur_end];
559
560        if lines_equal_fast(prev_content, cur_content) {
561            // Duplicate — flush the current run up to this line, then skip it
562            if run_start < cur_start {
563                writer.write_all(&data[run_start..cur_start])?;
564            }
565            // Start new run after this duplicate
566            if cur_end < data.len() {
567                run_start = cur_end + 1;
568            } else {
569                run_start = cur_end;
570            }
571        } else {
572            // Different line — extend the current run
573            prev_start = cur_start;
574            prev_end = cur_end;
575            last_output_end = if cur_end < data.len() {
576                cur_end + 1
577            } else {
578                cur_end
579            };
580        }
581
582        if cur_end < data.len() {
583            cur_start = cur_end + 1;
584        } else {
585            break;
586        }
587    }
588
589    // Flush remaining run
590    if run_start < data.len() {
591        writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
592    }
593
594    // Ensure trailing terminator
595    if !data.is_empty() && *data.last().unwrap() != term {
596        writer.write_all(&[term])?;
597    }
598
599    Ok(())
600}
601
602/// Parallel zero-copy dedup for large files: split into chunks, find duplicate
603/// positions in each chunk in parallel, then write output runs directly from
604/// the original data. No per-chunk buffer allocation needed.
605fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
606    use rayon::prelude::*;
607
608    let num_threads = rayon::current_num_threads().max(1);
609    let chunk_target = data.len() / num_threads;
610
611    // Find chunk boundaries aligned to line terminators
612    let mut boundaries = Vec::with_capacity(num_threads + 1);
613    boundaries.push(0usize);
614    for i in 1..num_threads {
615        let target = i * chunk_target;
616        if target >= data.len() {
617            break;
618        }
619        if let Some(p) = memchr::memchr(term, &data[target..]) {
620            let b = target + p + 1;
621            if b > *boundaries.last().unwrap() && b <= data.len() {
622                boundaries.push(b);
623            }
624        }
625    }
626    boundaries.push(data.len());
627
628    let n_chunks = boundaries.len() - 1;
629    if n_chunks <= 1 {
630        return process_default_sequential(data, writer, term);
631    }
632
633    // Each chunk produces: output runs (zero-copy refs to data) + first/last line info
634    struct ChunkResult {
635        /// Byte ranges in the original data to output (contiguous runs)
636        runs: Vec<(usize, usize)>,
637        /// First line in chunk (absolute offsets into data, content without term)
638        first_line_start: usize,
639        first_line_end: usize,
640        /// Last *output* line in chunk (content without term)
641        last_line_start: usize,
642        last_line_end: usize,
643    }
644
645    let results: Vec<ChunkResult> = boundaries
646        .windows(2)
647        .collect::<Vec<_>>()
648        .par_iter()
649        .map(|w| {
650            let chunk_start = w[0];
651            let chunk_end = w[1];
652            let chunk = &data[chunk_start..chunk_end];
653
654            let first_term = match memchr::memchr(term, chunk) {
655                Some(pos) => pos,
656                None => {
657                    return ChunkResult {
658                        runs: vec![(chunk_start, chunk_end)],
659                        first_line_start: chunk_start,
660                        first_line_end: chunk_end,
661                        last_line_start: chunk_start,
662                        last_line_end: chunk_end,
663                    };
664                }
665            };
666
667            let first_line_start = chunk_start;
668            let first_line_end = chunk_start + first_term;
669
670            let mut runs: Vec<(usize, usize)> = Vec::new();
671            let mut run_start = chunk_start;
672            let mut prev_start = 0usize;
673            let mut prev_end = first_term;
674            let mut last_out_start = chunk_start;
675            let mut last_out_end = first_line_end;
676
677            let mut cur_start = first_term + 1;
678            while cur_start < chunk.len() {
679                let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
680                    Some(offset) => cur_start + offset,
681                    None => chunk.len(),
682                };
683
684                if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
685                    // Duplicate — flush current run up to this line
686                    let abs_cur = chunk_start + cur_start;
687                    if run_start < abs_cur {
688                        runs.push((run_start, abs_cur));
689                    }
690                    // New run starts after this duplicate
691                    run_start = chunk_start
692                        + if cur_end < chunk.len() {
693                            cur_end + 1
694                        } else {
695                            cur_end
696                        };
697                } else {
698                    last_out_start = chunk_start + cur_start;
699                    last_out_end = chunk_start + cur_end;
700                }
701                prev_start = cur_start;
702                prev_end = cur_end;
703
704                if cur_end < chunk.len() {
705                    cur_start = cur_end + 1;
706                } else {
707                    break;
708                }
709            }
710
711            // Close final run
712            if run_start < chunk_end {
713                runs.push((run_start, chunk_end));
714            }
715
716            ChunkResult {
717                runs,
718                first_line_start,
719                first_line_end,
720                last_line_start: last_out_start,
721                last_line_end: last_out_end,
722            }
723        })
724        .collect();
725
726    // Write results, adjusting cross-chunk boundaries
727    for (i, result) in results.iter().enumerate() {
728        let skip_first = if i > 0 {
729            let prev = &results[i - 1];
730            let prev_last = &data[prev.last_line_start..prev.last_line_end];
731            let cur_first = &data[result.first_line_start..result.first_line_end];
732            lines_equal_fast(prev_last, cur_first)
733        } else {
734            false
735        };
736
737        let skip_end = if skip_first {
738            // Skip bytes up to and including the first line's terminator
739            result.first_line_end + 1
740        } else {
741            0
742        };
743
744        for &(rs, re) in &result.runs {
745            let actual_start = rs.max(skip_end);
746            if actual_start < re {
747                writer.write_all(&data[actual_start..re])?;
748            }
749        }
750    }
751
752    // Ensure trailing terminator
753    if !data.is_empty() && *data.last().unwrap() != term {
754        writer.write_all(&[term])?;
755    }
756
757    Ok(())
758}
759
760/// Fast single-pass for RepeatedOnly (-d) and UniqueOnly (-u) modes.
761/// Zero-copy: writes directly from input data, no output buffer allocation.
762fn process_filter_fast_singlepass(
763    data: &[u8],
764    writer: &mut impl Write,
765    config: &UniqConfig,
766    term: u8,
767) -> io::Result<()> {
768    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
769
770    let first_term = match memchr::memchr(term, data) {
771        Some(pos) => pos,
772        None => {
773            // Single line: unique (count=1)
774            if !repeated {
775                writer.write_all(data)?;
776                writer.write_all(&[term])?;
777            }
778            return Ok(());
779        }
780    };
781
782    let mut prev_start: usize = 0;
783    let mut prev_end: usize = first_term;
784    let mut count: u64 = 1;
785    let mut cur_start = first_term + 1;
786
787    while cur_start < data.len() {
788        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
789            Some(offset) => cur_start + offset,
790            None => data.len(),
791        };
792
793        if lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
794            count += 1;
795        } else {
796            // Output previous group — write directly from input data (zero-copy)
797            let should_print = if repeated { count > 1 } else { count == 1 };
798            if should_print {
799                if prev_end < data.len() && data[prev_end] == term {
800                    writer.write_all(&data[prev_start..prev_end + 1])?;
801                } else {
802                    writer.write_all(&data[prev_start..prev_end])?;
803                    writer.write_all(&[term])?;
804                }
805            }
806            prev_start = cur_start;
807            prev_end = cur_end;
808            count = 1;
809        }
810
811        if cur_end < data.len() {
812            cur_start = cur_end + 1;
813        } else {
814            break;
815        }
816    }
817
818    // Output last group
819    let should_print = if repeated { count > 1 } else { count == 1 };
820    if should_print {
821        if prev_end < data.len() && data[prev_end] == term {
822            writer.write_all(&data[prev_start..prev_end + 1])?;
823        } else {
824            writer.write_all(&data[prev_start..prev_end])?;
825            writer.write_all(&[term])?;
826        }
827    }
828
829    Ok(())
830}
831
832/// Fast single-pass for count mode (-c) with all standard output modes.
833/// Zero line_starts allocation: scans with memchr, counts groups inline,
834/// and writes count-prefixed lines directly.
835fn process_count_fast_singlepass(
836    data: &[u8],
837    writer: &mut impl Write,
838    config: &UniqConfig,
839    term: u8,
840) -> io::Result<()> {
841    let first_term = match memchr::memchr(term, data) {
842        Some(pos) => pos,
843        None => {
844            // Single line: count=1
845            let should_print = match config.mode {
846                OutputMode::Default => true,
847                OutputMode::RepeatedOnly => false,
848                OutputMode::UniqueOnly => true,
849                _ => true,
850            };
851            if should_print {
852                write_count_line(writer, 1, data, term)?;
853            }
854            return Ok(());
855        }
856    };
857
858    let mut prev_start: usize = 0;
859    let mut prev_end: usize = first_term;
860    let mut count: u64 = 1;
861    let mut cur_start = first_term + 1;
862
863    while cur_start < data.len() {
864        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
865            Some(offset) => cur_start + offset,
866            None => data.len(),
867        };
868
869        if lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
870            count += 1;
871        } else {
872            // Output previous group with count
873            let should_print = match config.mode {
874                OutputMode::Default => true,
875                OutputMode::RepeatedOnly => count > 1,
876                OutputMode::UniqueOnly => count == 1,
877                _ => true,
878            };
879            if should_print {
880                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
881            }
882            prev_start = cur_start;
883            prev_end = cur_end;
884            count = 1;
885        }
886
887        if cur_end < data.len() {
888            cur_start = cur_end + 1;
889        } else {
890            break;
891        }
892    }
893
894    // Output last group
895    let should_print = match config.mode {
896        OutputMode::Default => true,
897        OutputMode::RepeatedOnly => count > 1,
898        OutputMode::UniqueOnly => count == 1,
899        _ => true,
900    };
901    if should_print {
902        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
903    }
904
905    Ok(())
906}
907
908/// Fast single-pass for case-insensitive (-i) default mode.
909/// Same logic as process_default_sequential but uses eq_ignore_ascii_case.
910fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
911    let mut prev_start: usize = 0;
912    let mut prev_end: usize;
913
914    match memchr::memchr(term, data) {
915        Some(pos) => {
916            prev_end = pos;
917        }
918        None => {
919            writer.write_all(data)?;
920            return writer.write_all(&[term]);
921        }
922    }
923
924    // Write first line
925    writer.write_all(&data[..prev_end + 1])?;
926
927    let mut cur_start = prev_end + 1;
928
929    while cur_start < data.len() {
930        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
931            Some(offset) => cur_start + offset,
932            None => data.len(),
933        };
934
935        let prev_content = &data[prev_start..prev_end];
936        let cur_content = &data[cur_start..cur_end];
937
938        if !lines_equal_case_insensitive(prev_content, cur_content) {
939            // Different line — write it
940            if cur_end < data.len() {
941                writer.write_all(&data[cur_start..cur_end + 1])?;
942            } else {
943                writer.write_all(&data[cur_start..cur_end])?;
944                writer.write_all(&[term])?;
945            }
946            prev_start = cur_start;
947            prev_end = cur_end;
948        }
949
950        if cur_end < data.len() {
951            cur_start = cur_end + 1;
952        } else {
953            break;
954        }
955    }
956
957    Ok(())
958}
959
960/// Fast single-pass for case-insensitive (-i) repeated/unique-only modes.
961fn process_filter_ci_singlepass(
962    data: &[u8],
963    writer: &mut impl Write,
964    config: &UniqConfig,
965    term: u8,
966) -> io::Result<()> {
967    let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
968
969    let first_term = match memchr::memchr(term, data) {
970        Some(pos) => pos,
971        None => {
972            if !repeated {
973                writer.write_all(data)?;
974                writer.write_all(&[term])?;
975            }
976            return Ok(());
977        }
978    };
979
980    let mut prev_start: usize = 0;
981    let mut prev_end: usize = first_term;
982    let mut count: u64 = 1;
983    let mut cur_start = first_term + 1;
984
985    while cur_start < data.len() {
986        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
987            Some(offset) => cur_start + offset,
988            None => data.len(),
989        };
990
991        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
992            count += 1;
993        } else {
994            let should_print = if repeated { count > 1 } else { count == 1 };
995            if should_print {
996                if prev_end < data.len() && data[prev_end] == term {
997                    writer.write_all(&data[prev_start..prev_end + 1])?;
998                } else {
999                    writer.write_all(&data[prev_start..prev_end])?;
1000                    writer.write_all(&[term])?;
1001                }
1002            }
1003            prev_start = cur_start;
1004            prev_end = cur_end;
1005            count = 1;
1006        }
1007
1008        if cur_end < data.len() {
1009            cur_start = cur_end + 1;
1010        } else {
1011            break;
1012        }
1013    }
1014
1015    let should_print = if repeated { count > 1 } else { count == 1 };
1016    if should_print {
1017        if prev_end < data.len() && data[prev_end] == term {
1018            writer.write_all(&data[prev_start..prev_end + 1])?;
1019        } else {
1020            writer.write_all(&data[prev_start..prev_end])?;
1021            writer.write_all(&[term])?;
1022        }
1023    }
1024
1025    Ok(())
1026}
1027
1028/// Fast single-pass for case-insensitive (-i) count (-c) mode.
1029fn process_count_ci_singlepass(
1030    data: &[u8],
1031    writer: &mut impl Write,
1032    config: &UniqConfig,
1033    term: u8,
1034) -> io::Result<()> {
1035    let first_term = match memchr::memchr(term, data) {
1036        Some(pos) => pos,
1037        None => {
1038            let should_print = match config.mode {
1039                OutputMode::Default => true,
1040                OutputMode::RepeatedOnly => false,
1041                OutputMode::UniqueOnly => true,
1042                _ => true,
1043            };
1044            if should_print {
1045                write_count_line(writer, 1, data, term)?;
1046            }
1047            return Ok(());
1048        }
1049    };
1050
1051    let mut prev_start: usize = 0;
1052    let mut prev_end: usize = first_term;
1053    let mut count: u64 = 1;
1054    let mut cur_start = first_term + 1;
1055
1056    while cur_start < data.len() {
1057        let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1058            Some(offset) => cur_start + offset,
1059            None => data.len(),
1060        };
1061
1062        if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1063            count += 1;
1064        } else {
1065            let should_print = match config.mode {
1066                OutputMode::Default => true,
1067                OutputMode::RepeatedOnly => count > 1,
1068                OutputMode::UniqueOnly => count == 1,
1069                _ => true,
1070            };
1071            if should_print {
1072                write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1073            }
1074            prev_start = cur_start;
1075            prev_end = cur_end;
1076            count = 1;
1077        }
1078
1079        if cur_end < data.len() {
1080            cur_start = cur_end + 1;
1081        } else {
1082            break;
1083        }
1084    }
1085
1086    let should_print = match config.mode {
1087        OutputMode::Default => true,
1088        OutputMode::RepeatedOnly => count > 1,
1089        OutputMode::UniqueOnly => count == 1,
1090        _ => true,
1091    };
1092    if should_print {
1093        write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1094    }
1095
1096    Ok(())
1097}
1098
1099/// Output a group for standard modes (bytes path).
1100#[inline(always)]
1101fn output_group_bytes(
1102    writer: &mut impl Write,
1103    content: &[u8],
1104    full: &[u8],
1105    count: u64,
1106    config: &UniqConfig,
1107    term: u8,
1108) -> io::Result<()> {
1109    let should_print = match config.mode {
1110        OutputMode::Default => true,
1111        OutputMode::RepeatedOnly => count > 1,
1112        OutputMode::UniqueOnly => count == 1,
1113        _ => true,
1114    };
1115
1116    if should_print {
1117        if config.count {
1118            write_count_line(writer, count, content, term)?;
1119        } else {
1120            writer.write_all(full)?;
1121            // Add terminator if the original line didn't have one
1122            if full.len() == content.len() {
1123                writer.write_all(&[term])?;
1124            }
1125        }
1126    }
1127
1128    Ok(())
1129}
1130
1131/// Process --all-repeated / -D mode on byte slices.
1132fn process_all_repeated_bytes(
1133    data: &[u8],
1134    writer: &mut impl Write,
1135    config: &UniqConfig,
1136    method: AllRepeatedMethod,
1137    term: u8,
1138) -> io::Result<()> {
1139    let mut lines = LineIter::new(data, term);
1140
1141    let first = match lines.next() {
1142        Some(v) => v,
1143        None => return Ok(()),
1144    };
1145
1146    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
1147    // For all-repeated we need to buffer group lines since we only print if count > 1
1148    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
1149    group_lines.push(first);
1150    let mut first_group_printed = false;
1151
1152    let fast = !needs_key_extraction(config) && !config.ignore_case;
1153
1154    for (cur_content, cur_full) in lines {
1155        let prev_content = group_lines.last().unwrap().0;
1156        let equal = if fast {
1157            lines_equal_fast(prev_content, cur_content)
1158        } else {
1159            lines_equal(prev_content, cur_content, config)
1160        };
1161
1162        if equal {
1163            group_lines.push((cur_content, cur_full));
1164        } else {
1165            // Flush group
1166            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1167            group_lines.clear();
1168            group_lines.push((cur_content, cur_full));
1169        }
1170    }
1171
1172    // Flush last group
1173    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1174
1175    Ok(())
1176}
1177
1178/// Flush a group for --all-repeated mode (bytes path).
1179fn flush_all_repeated_bytes(
1180    writer: &mut impl Write,
1181    group: &[(&[u8], &[u8])],
1182    method: AllRepeatedMethod,
1183    first_group_printed: &mut bool,
1184    term: u8,
1185) -> io::Result<()> {
1186    if group.len() <= 1 {
1187        return Ok(()); // Not a duplicate group
1188    }
1189
1190    match method {
1191        AllRepeatedMethod::Prepend => {
1192            writer.write_all(&[term])?;
1193        }
1194        AllRepeatedMethod::Separate => {
1195            if *first_group_printed {
1196                writer.write_all(&[term])?;
1197            }
1198        }
1199        AllRepeatedMethod::None => {}
1200    }
1201
1202    for &(content, full) in group {
1203        writer.write_all(full)?;
1204        if full.len() == content.len() {
1205            writer.write_all(&[term])?;
1206        }
1207    }
1208
1209    *first_group_printed = true;
1210    Ok(())
1211}
1212
1213/// Process --group mode on byte slices.
1214fn process_group_bytes(
1215    data: &[u8],
1216    writer: &mut impl Write,
1217    config: &UniqConfig,
1218    method: GroupMethod,
1219    term: u8,
1220) -> io::Result<()> {
1221    let mut lines = LineIter::new(data, term);
1222
1223    let (prev_content, prev_full) = match lines.next() {
1224        Some(v) => v,
1225        None => return Ok(()),
1226    };
1227
1228    // Prepend/Both: separator before first group
1229    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1230        writer.write_all(&[term])?;
1231    }
1232
1233    // Write first line
1234    writer.write_all(prev_full)?;
1235    if prev_full.len() == prev_content.len() {
1236        writer.write_all(&[term])?;
1237    }
1238
1239    let mut prev_content = prev_content;
1240    let fast = !needs_key_extraction(config) && !config.ignore_case;
1241
1242    for (cur_content, cur_full) in lines {
1243        let equal = if fast {
1244            lines_equal_fast(prev_content, cur_content)
1245        } else {
1246            lines_equal(prev_content, cur_content, config)
1247        };
1248
1249        if !equal {
1250            // New group — write separator
1251            writer.write_all(&[term])?;
1252        }
1253
1254        writer.write_all(cur_full)?;
1255        if cur_full.len() == cur_content.len() {
1256            writer.write_all(&[term])?;
1257        }
1258
1259        prev_content = cur_content;
1260    }
1261
1262    // Append/Both: separator after last group
1263    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1264        writer.write_all(&[term])?;
1265    }
1266
1267    Ok(())
1268}
1269
1270// ============================================================================
1271// Streaming processing (for stdin / pipe input)
1272// ============================================================================
1273
1274/// Main streaming uniq processor.
1275/// Reads from `input`, writes to `output`.
1276pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
1277    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
1278    let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
1279    let term = if config.zero_terminated { b'\0' } else { b'\n' };
1280
1281    match config.mode {
1282        OutputMode::Group(method) => {
1283            process_group_stream(reader, &mut writer, config, method, term)?;
1284        }
1285        OutputMode::AllRepeated(method) => {
1286            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
1287        }
1288        _ => {
1289            process_standard_stream(reader, &mut writer, config, term)?;
1290        }
1291    }
1292
1293    writer.flush()?;
1294    Ok(())
1295}
1296
1297/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
1298fn process_standard_stream<R: BufRead, W: Write>(
1299    mut reader: R,
1300    writer: &mut W,
1301    config: &UniqConfig,
1302    term: u8,
1303) -> io::Result<()> {
1304    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1305    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1306
1307    // Read first line
1308    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1309        return Ok(()); // empty input
1310    }
1311    let mut count: u64 = 1;
1312
1313    loop {
1314        current_line.clear();
1315        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1316
1317        if bytes_read == 0 {
1318            // End of input — output the last group
1319            output_group_stream(writer, &prev_line, count, config, term)?;
1320            break;
1321        }
1322
1323        if compare_lines_stream(&prev_line, &current_line, config, term) {
1324            count += 1;
1325        } else {
1326            output_group_stream(writer, &prev_line, count, config, term)?;
1327            std::mem::swap(&mut prev_line, &mut current_line);
1328            count = 1;
1329        }
1330    }
1331
1332    Ok(())
1333}
1334
1335/// Compare two lines (with terminators) in streaming mode.
1336#[inline(always)]
1337fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
1338    let a_stripped = strip_term(a, term);
1339    let b_stripped = strip_term(b, term);
1340    lines_equal(a_stripped, b_stripped, config)
1341}
1342
/// Return `line` without its final byte when that byte equals `term`.
///
/// At most one terminator is removed; a line that does not end in `term`
/// (including an empty line) is returned unchanged.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, body)) if last == term => body,
        _ => line,
    }
}
1352
1353/// Output a group in streaming mode.
1354#[inline(always)]
1355fn output_group_stream(
1356    writer: &mut impl Write,
1357    line: &[u8],
1358    count: u64,
1359    config: &UniqConfig,
1360    term: u8,
1361) -> io::Result<()> {
1362    let should_print = match config.mode {
1363        OutputMode::Default => true,
1364        OutputMode::RepeatedOnly => count > 1,
1365        OutputMode::UniqueOnly => count == 1,
1366        _ => true,
1367    };
1368
1369    if should_print {
1370        let content = strip_term(line, term);
1371        if config.count {
1372            write_count_line(writer, count, content, term)?;
1373        } else {
1374            writer.write_all(content)?;
1375            writer.write_all(&[term])?;
1376        }
1377    }
1378
1379    Ok(())
1380}
1381
1382/// Process --all-repeated / -D mode (streaming).
1383fn process_all_repeated_stream<R: BufRead, W: Write>(
1384    mut reader: R,
1385    writer: &mut W,
1386    config: &UniqConfig,
1387    method: AllRepeatedMethod,
1388    term: u8,
1389) -> io::Result<()> {
1390    let mut group: Vec<Vec<u8>> = Vec::new();
1391    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1392    let mut first_group_printed = false;
1393
1394    current_line.clear();
1395    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
1396        return Ok(());
1397    }
1398    group.push(current_line.clone());
1399
1400    loop {
1401        current_line.clear();
1402        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1403
1404        if bytes_read == 0 {
1405            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1406            break;
1407        }
1408
1409        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
1410            group.push(current_line.clone());
1411        } else {
1412            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1413            group.clear();
1414            group.push(current_line.clone());
1415        }
1416    }
1417
1418    Ok(())
1419}
1420
1421/// Flush a group for --all-repeated mode (streaming).
1422fn flush_all_repeated_stream(
1423    writer: &mut impl Write,
1424    group: &[Vec<u8>],
1425    method: AllRepeatedMethod,
1426    first_group_printed: &mut bool,
1427    term: u8,
1428) -> io::Result<()> {
1429    if group.len() <= 1 {
1430        return Ok(());
1431    }
1432
1433    match method {
1434        AllRepeatedMethod::Prepend => {
1435            writer.write_all(&[term])?;
1436        }
1437        AllRepeatedMethod::Separate => {
1438            if *first_group_printed {
1439                writer.write_all(&[term])?;
1440            }
1441        }
1442        AllRepeatedMethod::None => {}
1443    }
1444
1445    for line in group {
1446        let content = strip_term(line, term);
1447        writer.write_all(content)?;
1448        writer.write_all(&[term])?;
1449    }
1450
1451    *first_group_printed = true;
1452    Ok(())
1453}
1454
1455/// Process --group mode (streaming).
1456fn process_group_stream<R: BufRead, W: Write>(
1457    mut reader: R,
1458    writer: &mut W,
1459    config: &UniqConfig,
1460    method: GroupMethod,
1461    term: u8,
1462) -> io::Result<()> {
1463    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1464    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1465
1466    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1467        return Ok(());
1468    }
1469
1470    // Prepend/Both: separator before first group
1471    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1472        writer.write_all(&[term])?;
1473    }
1474
1475    let content = strip_term(&prev_line, term);
1476    writer.write_all(content)?;
1477    writer.write_all(&[term])?;
1478
1479    loop {
1480        current_line.clear();
1481        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1482
1483        if bytes_read == 0 {
1484            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1485                writer.write_all(&[term])?;
1486            }
1487            break;
1488        }
1489
1490        if !compare_lines_stream(&prev_line, &current_line, config, term) {
1491            writer.write_all(&[term])?;
1492        }
1493
1494        let content = strip_term(&current_line, term);
1495        writer.write_all(content)?;
1496        writer.write_all(&[term])?;
1497
1498        std::mem::swap(&mut prev_line, &mut current_line);
1499    }
1500
1501    Ok(())
1502}
1503
/// Append the next `term`-delimited line (terminator included) to `buf`.
///
/// Thin wrapper over `BufRead::read_until`. Returns the number of bytes
/// appended; `0` signals end of input. `buf` is not cleared first.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    BufRead::read_until(reader, term, buf)
}