Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Writes the whole of `buf` to `writer`.
///
/// Delegates to [`Write::write_all`], which keeps calling `write` until every
/// byte has been accepted or an error is returned.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}
8
/// Delimiter placement between duplicate groups for `--all-repeated`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AllRepeatedMethod {
    /// Emit no delimiter at all.
    None,
    /// Emit a delimiter in front of every printed group.
    Prepend,
    /// Emit a delimiter only between consecutive printed groups.
    Separate,
}
16
/// Delimiter placement around groups for `--group`.
///
/// A delimiter is always written between two adjacent groups; the variants
/// differ only in whether one is also written before the first group and/or
/// after the last one.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupMethod {
    /// Delimiters between groups only.
    Separate,
    /// Additionally emit a delimiter before the first group.
    Prepend,
    /// Additionally emit a delimiter after the last group.
    Append,
    /// Emit a delimiter before the first group and after the last group.
    Both,
}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Check if config requires field/char skipping or char limiting.
119#[inline(always)]
120fn needs_key_extraction(config: &UniqConfig) -> bool {
121    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
122}
123
/// Exact byte-wise line comparison for the case where no key extraction and
/// no case folding is required.
///
/// Lines of different length are rejected before any bytes are compared.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len() && a == b
}
135
136/// Write a count-prefixed line in GNU uniq format.
137/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
138/// Uses a single write_all by building the prefix in a stack buffer.
139#[inline(always)]
140fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
141    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
142    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
143    let digits = itoa_right_aligned_into(&mut prefix, count);
144    let width = digits.max(7); // minimum 7 chars
145    let prefix_len = width + 1; // +1 for trailing space
146    prefix[width] = b' ';
147    // Write prefix + line + term in as few calls as possible
148    out.write_all(&prefix[..prefix_len])?;
149    out.write_all(line)?;
150    out.write_all(&[term])?;
151    Ok(())
152}
153
/// Formats `val` in decimal at the front of `buf`, right-aligned in a field of
/// at least 7 bytes.
///
/// `buf` must arrive filled with spaces: the padding in front of short numbers
/// is not written here. Returns the width of the field occupying `buf[..n]`:
/// 7 for numbers of up to seven digits, otherwise the digit count. Bytes past
/// the returned width may hold leftover scratch digits.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    const MIN_WIDTH: usize = 7;
    // Digits are first produced backwards, ending just before this index.
    const SCRATCH_END: usize = 27;

    if val == 0 {
        buf[MIN_WIDTH - 1] = b'0';
        return MIN_WIDTH;
    }

    let mut cursor = SCRATCH_END;
    loop {
        cursor -= 1;
        buf[cursor] = b'0' + (val % 10) as u8;
        val /= 10;
        if val == 0 {
            break;
        }
    }

    let digit_count = SCRATCH_END - cursor;
    // Short numbers land after the space padding, long ones at offset 0.
    let dest = MIN_WIDTH.saturating_sub(digit_count);
    buf.copy_within(cursor..SCRATCH_END, dest);
    digit_count.max(MIN_WIDTH)
}
182
183// ============================================================================
184// High-performance mmap-based processing (for byte slices, zero-copy)
185// ============================================================================
186
187/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
188pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
189    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, output);
190    let term = if config.zero_terminated { b'\0' } else { b'\n' };
191
192    match config.mode {
193        OutputMode::Group(method) => {
194            process_group_bytes(data, &mut writer, config, method, term)?;
195        }
196        OutputMode::AllRepeated(method) => {
197            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
198        }
199        _ => {
200            process_standard_bytes(data, &mut writer, config, term)?;
201        }
202    }
203
204    writer.flush()?;
205    Ok(())
206}
207
208/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
209/// Uses memchr for SIMD-accelerated line boundary detection.
210struct LineIter<'a> {
211    data: &'a [u8],
212    pos: usize,
213    term: u8,
214}
215
216impl<'a> LineIter<'a> {
217    #[inline(always)]
218    fn new(data: &'a [u8], term: u8) -> Self {
219        Self { data, pos: 0, term }
220    }
221}
222
223impl<'a> Iterator for LineIter<'a> {
224    /// (line content without terminator, full line including terminator for output)
225    type Item = (&'a [u8], &'a [u8]);
226
227    #[inline(always)]
228    fn next(&mut self) -> Option<Self::Item> {
229        if self.pos >= self.data.len() {
230            return None;
231        }
232
233        let remaining = &self.data[self.pos..];
234        match memchr::memchr(self.term, remaining) {
235            Some(idx) => {
236                let line_start = self.pos;
237                let line_end = self.pos + idx; // without terminator
238                let full_end = self.pos + idx + 1; // with terminator
239                self.pos = full_end;
240                Some((
241                    &self.data[line_start..line_end],
242                    &self.data[line_start..full_end],
243                ))
244            }
245            None => {
246                // Last line without terminator
247                let line_start = self.pos;
248                self.pos = self.data.len();
249                let line = &self.data[line_start..];
250                Some((line, line))
251            }
252        }
253    }
254}
255
256/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
257fn process_standard_bytes(
258    data: &[u8],
259    writer: &mut impl Write,
260    config: &UniqConfig,
261    term: u8,
262) -> io::Result<()> {
263    let mut lines = LineIter::new(data, term);
264
265    let (prev_content, prev_full) = match lines.next() {
266        Some(v) => v,
267        None => return Ok(()), // empty input
268    };
269
270    let fast = !needs_key_extraction(config) && !config.ignore_case;
271
272    // Ultra-fast path: default mode, no count, no key extraction
273    // Zero-copy: writes contiguous spans directly from mmap data, no intermediate Vec
274    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
275        let data_base = data.as_ptr() as usize;
276        let mut prev_content = prev_content;
277
278        // Write first line
279        write_all_raw(writer, prev_full)?;
280        if prev_full.len() == prev_content.len() {
281            writer.write_all(&[term])?;
282        }
283
284        // Track contiguous output spans in mmap data
285        let mut span_start: usize = usize::MAX; // sentinel = no active span
286        let mut span_end: usize = 0;
287
288        for (cur_content, cur_full) in lines {
289            if lines_equal_fast(prev_content, cur_content) {
290                // Duplicate — flush any active span, skip line
291                if span_start != usize::MAX {
292                    write_all_raw(writer, &data[span_start..span_end])?;
293                    span_start = usize::MAX;
294                }
295                prev_content = cur_content;
296                continue;
297            }
298
299            let cur_offset = cur_full.as_ptr() as usize - data_base;
300
301            if span_start == usize::MAX {
302                // Start new span
303                span_start = cur_offset;
304                span_end = cur_offset + cur_full.len();
305            } else if cur_offset == span_end {
306                // Extend contiguous span (common case — unique lines are adjacent)
307                span_end += cur_full.len();
308            } else {
309                // Non-contiguous — flush and start new span
310                write_all_raw(writer, &data[span_start..span_end])?;
311                span_start = cur_offset;
312                span_end = cur_offset + cur_full.len();
313            }
314
315            // Handle last line without terminator
316            if cur_full.len() == cur_content.len() {
317                write_all_raw(writer, &data[span_start..span_end])?;
318                writer.write_all(&[term])?;
319                span_start = usize::MAX;
320            }
321
322            prev_content = cur_content;
323        }
324
325        // Flush remaining span
326        if span_start != usize::MAX {
327            write_all_raw(writer, &data[span_start..span_end])?;
328        }
329        return Ok(());
330    }
331
332    // General path with count tracking
333    let mut prev_content = prev_content;
334    let mut prev_full = prev_full;
335    let mut count: u64 = 1;
336
337    for (cur_content, cur_full) in lines {
338        let equal = if fast {
339            lines_equal_fast(prev_content, cur_content)
340        } else {
341            lines_equal(prev_content, cur_content, config)
342        };
343
344        if equal {
345            count += 1;
346        } else {
347            // Output previous group
348            output_group_bytes(writer, prev_content, prev_full, count, config, term)?;
349            prev_content = cur_content;
350            prev_full = cur_full;
351            count = 1;
352        }
353    }
354
355    // Output last group
356    output_group_bytes(writer, prev_content, prev_full, count, config, term)?;
357    Ok(())
358}
359
360/// Output a group for standard modes (bytes path).
361#[inline(always)]
362fn output_group_bytes(
363    writer: &mut impl Write,
364    content: &[u8],
365    full: &[u8],
366    count: u64,
367    config: &UniqConfig,
368    term: u8,
369) -> io::Result<()> {
370    let should_print = match config.mode {
371        OutputMode::Default => true,
372        OutputMode::RepeatedOnly => count > 1,
373        OutputMode::UniqueOnly => count == 1,
374        _ => true,
375    };
376
377    if should_print {
378        if config.count {
379            write_count_line(writer, count, content, term)?;
380        } else {
381            writer.write_all(full)?;
382            // Add terminator if the original line didn't have one
383            if full.len() == content.len() {
384                writer.write_all(&[term])?;
385            }
386        }
387    }
388
389    Ok(())
390}
391
392/// Process --all-repeated / -D mode on byte slices.
393fn process_all_repeated_bytes(
394    data: &[u8],
395    writer: &mut impl Write,
396    config: &UniqConfig,
397    method: AllRepeatedMethod,
398    term: u8,
399) -> io::Result<()> {
400    let mut lines = LineIter::new(data, term);
401
402    let first = match lines.next() {
403        Some(v) => v,
404        None => return Ok(()),
405    };
406
407    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
408    // For all-repeated we need to buffer group lines since we only print if count > 1
409    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
410    group_lines.push(first);
411    let mut first_group_printed = false;
412
413    let fast = !needs_key_extraction(config) && !config.ignore_case;
414
415    for (cur_content, cur_full) in lines {
416        let prev_content = group_lines.last().unwrap().0;
417        let equal = if fast {
418            lines_equal_fast(prev_content, cur_content)
419        } else {
420            lines_equal(prev_content, cur_content, config)
421        };
422
423        if equal {
424            group_lines.push((cur_content, cur_full));
425        } else {
426            // Flush group
427            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
428            group_lines.clear();
429            group_lines.push((cur_content, cur_full));
430        }
431    }
432
433    // Flush last group
434    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
435
436    Ok(())
437}
438
439/// Flush a group for --all-repeated mode (bytes path).
440fn flush_all_repeated_bytes(
441    writer: &mut impl Write,
442    group: &[(&[u8], &[u8])],
443    method: AllRepeatedMethod,
444    first_group_printed: &mut bool,
445    term: u8,
446) -> io::Result<()> {
447    if group.len() <= 1 {
448        return Ok(()); // Not a duplicate group
449    }
450
451    match method {
452        AllRepeatedMethod::Prepend => {
453            writer.write_all(&[term])?;
454        }
455        AllRepeatedMethod::Separate => {
456            if *first_group_printed {
457                writer.write_all(&[term])?;
458            }
459        }
460        AllRepeatedMethod::None => {}
461    }
462
463    for &(content, full) in group {
464        writer.write_all(full)?;
465        if full.len() == content.len() {
466            writer.write_all(&[term])?;
467        }
468    }
469
470    *first_group_printed = true;
471    Ok(())
472}
473
474/// Process --group mode on byte slices.
475fn process_group_bytes(
476    data: &[u8],
477    writer: &mut impl Write,
478    config: &UniqConfig,
479    method: GroupMethod,
480    term: u8,
481) -> io::Result<()> {
482    let mut lines = LineIter::new(data, term);
483
484    let (prev_content, prev_full) = match lines.next() {
485        Some(v) => v,
486        None => return Ok(()),
487    };
488
489    // Prepend/Both: separator before first group
490    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
491        writer.write_all(&[term])?;
492    }
493
494    // Write first line
495    writer.write_all(prev_full)?;
496    if prev_full.len() == prev_content.len() {
497        writer.write_all(&[term])?;
498    }
499
500    let mut prev_content = prev_content;
501    let fast = !needs_key_extraction(config) && !config.ignore_case;
502
503    for (cur_content, cur_full) in lines {
504        let equal = if fast {
505            lines_equal_fast(prev_content, cur_content)
506        } else {
507            lines_equal(prev_content, cur_content, config)
508        };
509
510        if !equal {
511            // New group — write separator
512            writer.write_all(&[term])?;
513        }
514
515        writer.write_all(cur_full)?;
516        if cur_full.len() == cur_content.len() {
517            writer.write_all(&[term])?;
518        }
519
520        prev_content = cur_content;
521    }
522
523    // Append/Both: separator after last group
524    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
525        writer.write_all(&[term])?;
526    }
527
528    Ok(())
529}
530
531// ============================================================================
532// Streaming processing (for stdin / pipe input)
533// ============================================================================
534
535/// Main streaming uniq processor.
536/// Reads from `input`, writes to `output`.
537pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
538    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
539    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, output);
540    let term = if config.zero_terminated { b'\0' } else { b'\n' };
541
542    match config.mode {
543        OutputMode::Group(method) => {
544            process_group_stream(reader, &mut writer, config, method, term)?;
545        }
546        OutputMode::AllRepeated(method) => {
547            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
548        }
549        _ => {
550            process_standard_stream(reader, &mut writer, config, term)?;
551        }
552    }
553
554    writer.flush()?;
555    Ok(())
556}
557
558/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
559fn process_standard_stream<R: BufRead, W: Write>(
560    mut reader: R,
561    writer: &mut W,
562    config: &UniqConfig,
563    term: u8,
564) -> io::Result<()> {
565    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
566    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
567
568    // Read first line
569    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
570        return Ok(()); // empty input
571    }
572    let mut count: u64 = 1;
573
574    loop {
575        current_line.clear();
576        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
577
578        if bytes_read == 0 {
579            // End of input — output the last group
580            output_group_stream(writer, &prev_line, count, config, term)?;
581            break;
582        }
583
584        if compare_lines_stream(&prev_line, &current_line, config, term) {
585            count += 1;
586        } else {
587            output_group_stream(writer, &prev_line, count, config, term)?;
588            std::mem::swap(&mut prev_line, &mut current_line);
589            count = 1;
590        }
591    }
592
593    Ok(())
594}
595
596/// Compare two lines (with terminators) in streaming mode.
597#[inline(always)]
598fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
599    let a_stripped = strip_term(a, term);
600    let b_stripped = strip_term(b, term);
601    lines_equal(a_stripped, b_stripped, config)
602}
603
/// Returns `line` without its final byte when that byte equals `term`;
/// otherwise returns `line` unchanged. At most one byte is removed.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, head)) if last == term => head,
        _ => line,
    }
}
613
614/// Output a group in streaming mode.
615#[inline(always)]
616fn output_group_stream(
617    writer: &mut impl Write,
618    line: &[u8],
619    count: u64,
620    config: &UniqConfig,
621    term: u8,
622) -> io::Result<()> {
623    let should_print = match config.mode {
624        OutputMode::Default => true,
625        OutputMode::RepeatedOnly => count > 1,
626        OutputMode::UniqueOnly => count == 1,
627        _ => true,
628    };
629
630    if should_print {
631        let content = strip_term(line, term);
632        if config.count {
633            write_count_line(writer, count, content, term)?;
634        } else {
635            writer.write_all(content)?;
636            writer.write_all(&[term])?;
637        }
638    }
639
640    Ok(())
641}
642
643/// Process --all-repeated / -D mode (streaming).
644fn process_all_repeated_stream<R: BufRead, W: Write>(
645    mut reader: R,
646    writer: &mut W,
647    config: &UniqConfig,
648    method: AllRepeatedMethod,
649    term: u8,
650) -> io::Result<()> {
651    let mut group: Vec<Vec<u8>> = Vec::new();
652    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
653    let mut first_group_printed = false;
654
655    current_line.clear();
656    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
657        return Ok(());
658    }
659    group.push(current_line.clone());
660
661    loop {
662        current_line.clear();
663        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
664
665        if bytes_read == 0 {
666            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
667            break;
668        }
669
670        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
671            group.push(current_line.clone());
672        } else {
673            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
674            group.clear();
675            group.push(current_line.clone());
676        }
677    }
678
679    Ok(())
680}
681
682/// Flush a group for --all-repeated mode (streaming).
683fn flush_all_repeated_stream(
684    writer: &mut impl Write,
685    group: &[Vec<u8>],
686    method: AllRepeatedMethod,
687    first_group_printed: &mut bool,
688    term: u8,
689) -> io::Result<()> {
690    if group.len() <= 1 {
691        return Ok(());
692    }
693
694    match method {
695        AllRepeatedMethod::Prepend => {
696            writer.write_all(&[term])?;
697        }
698        AllRepeatedMethod::Separate => {
699            if *first_group_printed {
700                writer.write_all(&[term])?;
701            }
702        }
703        AllRepeatedMethod::None => {}
704    }
705
706    for line in group {
707        let content = strip_term(line, term);
708        writer.write_all(content)?;
709        writer.write_all(&[term])?;
710    }
711
712    *first_group_printed = true;
713    Ok(())
714}
715
716/// Process --group mode (streaming).
717fn process_group_stream<R: BufRead, W: Write>(
718    mut reader: R,
719    writer: &mut W,
720    config: &UniqConfig,
721    method: GroupMethod,
722    term: u8,
723) -> io::Result<()> {
724    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
725    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
726
727    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
728        return Ok(());
729    }
730
731    // Prepend/Both: separator before first group
732    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
733        writer.write_all(&[term])?;
734    }
735
736    let content = strip_term(&prev_line, term);
737    writer.write_all(content)?;
738    writer.write_all(&[term])?;
739
740    loop {
741        current_line.clear();
742        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
743
744        if bytes_read == 0 {
745            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
746                writer.write_all(&[term])?;
747            }
748            break;
749        }
750
751        if !compare_lines_stream(&prev_line, &current_line, config, term) {
752            writer.write_all(&[term])?;
753        }
754
755        let content = strip_term(&current_line, term);
756        writer.write_all(content)?;
757        writer.write_all(&[term])?;
758
759        std::mem::swap(&mut prev_line, &mut current_line);
760    }
761
762    Ok(())
763}
764
/// Appends the next record from `reader` to `buf`, up to and including the
/// first `term` byte (or up to end of input if none is found).
///
/// Returns the number of bytes appended; `0` signals end of input. `buf` is
/// not cleared first.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    BufRead::read_until(reader, term, buf)
}