Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Push the whole of `buf` into `writer`.
///
/// Thin wrapper around [`Write::write_all`], which keeps writing until every
/// byte has been accepted or an error is reported.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}
8
/// Group-delimiting policy for `--all-repeated`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AllRepeatedMethod {
    /// No delimiter is written around duplicate groups.
    None,
    /// A delimiter is written before every printed duplicate group.
    Prepend,
    /// A delimiter is written only between consecutive printed duplicate groups.
    Separate,
}
16
/// Group-delimiting policy for `--group`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupMethod {
    /// Delimiter between groups only.
    Separate,
    /// Delimiter before every group, including the first.
    Prepend,
    /// Delimiter after every group, including the last.
    Append,
    /// Delimiter before the first group, between groups, and after the last.
    Both,
}
25
26/// Output mode for uniq
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29    /// Default: print unique lines and first of each duplicate group
30    Default,
31    /// -d: print only first line of duplicate groups
32    RepeatedOnly,
33    /// -D / --all-repeated: print ALL duplicate lines
34    AllRepeated(AllRepeatedMethod),
35    /// -u: print only lines that are NOT duplicated
36    UniqueOnly,
37    /// --group: show all items with group separators
38    Group(GroupMethod),
39}
40
41/// Configuration for uniq processing
42#[derive(Debug, Clone)]
43pub struct UniqConfig {
44    pub mode: OutputMode,
45    pub count: bool,
46    pub ignore_case: bool,
47    pub skip_fields: usize,
48    pub skip_chars: usize,
49    pub check_chars: Option<usize>,
50    pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54    fn default() -> Self {
55        Self {
56            mode: OutputMode::Default,
57            count: false,
58            ignore_case: false,
59            skip_fields: 0,
60            skip_chars: 0,
61            check_chars: None,
62            zero_terminated: false,
63        }
64    }
65}
66
67/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
68/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
69#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71    let mut start = 0;
72    let len = line.len();
73
74    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
75    for _ in 0..config.skip_fields {
76        // Skip blanks (space and tab)
77        while start < len && (line[start] == b' ' || line[start] == b'\t') {
78            start += 1;
79        }
80        // Skip non-blanks (field content)
81        while start < len && line[start] != b' ' && line[start] != b'\t' {
82            start += 1;
83        }
84    }
85
86    // Skip N characters
87    if config.skip_chars > 0 {
88        let remaining = len - start;
89        let skip = config.skip_chars.min(remaining);
90        start += skip;
91    }
92
93    let slice = &line[start..];
94
95    // Limit comparison to N characters
96    if let Some(w) = config.check_chars {
97        if w < slice.len() {
98            return &slice[..w];
99        }
100    }
101
102    slice
103}
104
105/// Compare two lines (without terminators) using the config's comparison rules.
106#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108    let sa = get_compare_slice(a, config);
109    let sb = get_compare_slice(b, config);
110
111    if config.ignore_case {
112        sa.eq_ignore_ascii_case(sb)
113    } else {
114        sa == sb
115    }
116}
117
118/// Check if config requires field/char skipping or char limiting.
119#[inline(always)]
120fn needs_key_extraction(config: &UniqConfig) -> bool {
121    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
122}
123
/// Byte-exact line comparison for the fast path (no key extraction, no case
/// folding).
///
/// Returns `true` only when both slices have the same length and identical
/// bytes. Lines of different length are rejected immediately; lines of at
/// least eight bytes are additionally rejected on a mismatching 8-byte prefix
/// before the full comparison runs.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Lengths are equal here, so `b` has at least eight bytes whenever `a`
    // does. Comparing fixed-size sub-slices keeps the cheap prefix rejection
    // without any `unsafe` pointer reads.
    if a.len() >= 8 && a[..8] != b[..8] {
        return false;
    }
    a == b
}
145
146/// Write a count-prefixed line in GNU uniq format.
147/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
148/// Uses a single write_all by building the prefix in a stack buffer.
149#[inline(always)]
150fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
151    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
152    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
153    let digits = itoa_right_aligned_into(&mut prefix, count);
154    let width = digits.max(7); // minimum 7 chars
155    let prefix_len = width + 1; // +1 for trailing space
156    prefix[width] = b' ';
157    // Write prefix + line + term in as few calls as possible
158    out.write_all(&prefix[..prefix_len])?;
159    out.write_all(line)?;
160    out.write_all(&[term])?;
161    Ok(())
162}
163
/// Render `val` in decimal at the start of `buf`, right-aligned in a field of
/// at least seven bytes.
///
/// `buf` is expected to arrive filled with spaces. The return value is the
/// width of the rendered field: seven for numbers of up to seven digits, the
/// digit count otherwise. Bytes past the returned width are unspecified.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    const SCRATCH_END: usize = 27;
    const MIN_WIDTH: usize = 7;

    if val == 0 {
        buf[MIN_WIDTH - 1] = b'0';
        return MIN_WIDTH;
    }

    // Produce the digits least-significant first, growing leftwards from the
    // end of the scratch area.
    let mut cursor = SCRATCH_END;
    loop {
        cursor -= 1;
        buf[cursor] = b'0' + (val % 10) as u8;
        val /= 10;
        if val == 0 {
            break;
        }
    }

    // Move the digits so they end at column `max(MIN_WIDTH, digits)`; the
    // columns before them still hold the caller's spaces.
    let digits = SCRATCH_END - cursor;
    let dest = MIN_WIDTH.saturating_sub(digits);
    buf.copy_within(cursor..SCRATCH_END, dest);
    digits.max(MIN_WIDTH)
}
192
193// ============================================================================
194// High-performance mmap-based processing (for byte slices, zero-copy)
195// ============================================================================
196
197/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
198pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
199    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, output);
200    let term = if config.zero_terminated { b'\0' } else { b'\n' };
201
202    match config.mode {
203        OutputMode::Group(method) => {
204            process_group_bytes(data, &mut writer, config, method, term)?;
205        }
206        OutputMode::AllRepeated(method) => {
207            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
208        }
209        _ => {
210            process_standard_bytes(data, &mut writer, config, term)?;
211        }
212    }
213
214    writer.flush()?;
215    Ok(())
216}
217
218/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
219/// Uses memchr for SIMD-accelerated line boundary detection.
220struct LineIter<'a> {
221    data: &'a [u8],
222    pos: usize,
223    term: u8,
224}
225
226impl<'a> LineIter<'a> {
227    #[inline(always)]
228    fn new(data: &'a [u8], term: u8) -> Self {
229        Self { data, pos: 0, term }
230    }
231}
232
233impl<'a> Iterator for LineIter<'a> {
234    /// (line content without terminator, full line including terminator for output)
235    type Item = (&'a [u8], &'a [u8]);
236
237    #[inline(always)]
238    fn next(&mut self) -> Option<Self::Item> {
239        if self.pos >= self.data.len() {
240            return None;
241        }
242
243        let remaining = &self.data[self.pos..];
244        match memchr::memchr(self.term, remaining) {
245            Some(idx) => {
246                let line_start = self.pos;
247                let line_end = self.pos + idx; // without terminator
248                let full_end = self.pos + idx + 1; // with terminator
249                self.pos = full_end;
250                Some((
251                    &self.data[line_start..line_end],
252                    &self.data[line_start..full_end],
253                ))
254            }
255            None => {
256                // Last line without terminator
257                let line_start = self.pos;
258                self.pos = self.data.len();
259                let line = &self.data[line_start..];
260                Some((line, line))
261            }
262        }
263    }
264}
265
266/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
267fn process_standard_bytes(
268    data: &[u8],
269    writer: &mut impl Write,
270    config: &UniqConfig,
271    term: u8,
272) -> io::Result<()> {
273    let mut lines = LineIter::new(data, term);
274
275    let (prev_content, prev_full) = match lines.next() {
276        Some(v) => v,
277        None => return Ok(()), // empty input
278    };
279
280    let fast = !needs_key_extraction(config) && !config.ignore_case;
281
282    // Ultra-fast path: default mode, no count, no key extraction
283    // Zero-copy: writes contiguous spans directly from mmap data, no intermediate Vec
284    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
285        let data_base = data.as_ptr() as usize;
286        let mut prev_content = prev_content;
287
288        // Write first line
289        write_all_raw(writer, prev_full)?;
290        if prev_full.len() == prev_content.len() {
291            writer.write_all(&[term])?;
292        }
293
294        // Track contiguous output spans in mmap data
295        let mut span_start: usize = usize::MAX; // sentinel = no active span
296        let mut span_end: usize = 0;
297
298        for (cur_content, cur_full) in lines {
299            if lines_equal_fast(prev_content, cur_content) {
300                // Duplicate — flush any active span, skip line
301                if span_start != usize::MAX {
302                    write_all_raw(writer, &data[span_start..span_end])?;
303                    span_start = usize::MAX;
304                }
305                prev_content = cur_content;
306                continue;
307            }
308
309            let cur_offset = cur_full.as_ptr() as usize - data_base;
310
311            if span_start == usize::MAX {
312                // Start new span
313                span_start = cur_offset;
314                span_end = cur_offset + cur_full.len();
315            } else if cur_offset == span_end {
316                // Extend contiguous span (common case — unique lines are adjacent)
317                span_end += cur_full.len();
318            } else {
319                // Non-contiguous — flush and start new span
320                write_all_raw(writer, &data[span_start..span_end])?;
321                span_start = cur_offset;
322                span_end = cur_offset + cur_full.len();
323            }
324
325            // Handle last line without terminator
326            if cur_full.len() == cur_content.len() {
327                write_all_raw(writer, &data[span_start..span_end])?;
328                writer.write_all(&[term])?;
329                span_start = usize::MAX;
330            }
331
332            prev_content = cur_content;
333        }
334
335        // Flush remaining span
336        if span_start != usize::MAX {
337            write_all_raw(writer, &data[span_start..span_end])?;
338        }
339        return Ok(());
340    }
341
342    // General path with count tracking
343    let mut prev_content = prev_content;
344    let mut prev_full = prev_full;
345    let mut count: u64 = 1;
346
347    for (cur_content, cur_full) in lines {
348        let equal = if fast {
349            lines_equal_fast(prev_content, cur_content)
350        } else {
351            lines_equal(prev_content, cur_content, config)
352        };
353
354        if equal {
355            count += 1;
356        } else {
357            // Output previous group
358            output_group_bytes(writer, prev_content, prev_full, count, config, term)?;
359            prev_content = cur_content;
360            prev_full = cur_full;
361            count = 1;
362        }
363    }
364
365    // Output last group
366    output_group_bytes(writer, prev_content, prev_full, count, config, term)?;
367    Ok(())
368}
369
370/// Output a group for standard modes (bytes path).
371#[inline(always)]
372fn output_group_bytes(
373    writer: &mut impl Write,
374    content: &[u8],
375    full: &[u8],
376    count: u64,
377    config: &UniqConfig,
378    term: u8,
379) -> io::Result<()> {
380    let should_print = match config.mode {
381        OutputMode::Default => true,
382        OutputMode::RepeatedOnly => count > 1,
383        OutputMode::UniqueOnly => count == 1,
384        _ => true,
385    };
386
387    if should_print {
388        if config.count {
389            write_count_line(writer, count, content, term)?;
390        } else {
391            writer.write_all(full)?;
392            // Add terminator if the original line didn't have one
393            if full.len() == content.len() {
394                writer.write_all(&[term])?;
395            }
396        }
397    }
398
399    Ok(())
400}
401
402/// Process --all-repeated / -D mode on byte slices.
403fn process_all_repeated_bytes(
404    data: &[u8],
405    writer: &mut impl Write,
406    config: &UniqConfig,
407    method: AllRepeatedMethod,
408    term: u8,
409) -> io::Result<()> {
410    let mut lines = LineIter::new(data, term);
411
412    let first = match lines.next() {
413        Some(v) => v,
414        None => return Ok(()),
415    };
416
417    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
418    // For all-repeated we need to buffer group lines since we only print if count > 1
419    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
420    group_lines.push(first);
421    let mut first_group_printed = false;
422
423    let fast = !needs_key_extraction(config) && !config.ignore_case;
424
425    for (cur_content, cur_full) in lines {
426        let prev_content = group_lines.last().unwrap().0;
427        let equal = if fast {
428            lines_equal_fast(prev_content, cur_content)
429        } else {
430            lines_equal(prev_content, cur_content, config)
431        };
432
433        if equal {
434            group_lines.push((cur_content, cur_full));
435        } else {
436            // Flush group
437            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
438            group_lines.clear();
439            group_lines.push((cur_content, cur_full));
440        }
441    }
442
443    // Flush last group
444    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
445
446    Ok(())
447}
448
449/// Flush a group for --all-repeated mode (bytes path).
450fn flush_all_repeated_bytes(
451    writer: &mut impl Write,
452    group: &[(&[u8], &[u8])],
453    method: AllRepeatedMethod,
454    first_group_printed: &mut bool,
455    term: u8,
456) -> io::Result<()> {
457    if group.len() <= 1 {
458        return Ok(()); // Not a duplicate group
459    }
460
461    match method {
462        AllRepeatedMethod::Prepend => {
463            writer.write_all(&[term])?;
464        }
465        AllRepeatedMethod::Separate => {
466            if *first_group_printed {
467                writer.write_all(&[term])?;
468            }
469        }
470        AllRepeatedMethod::None => {}
471    }
472
473    for &(content, full) in group {
474        writer.write_all(full)?;
475        if full.len() == content.len() {
476            writer.write_all(&[term])?;
477        }
478    }
479
480    *first_group_printed = true;
481    Ok(())
482}
483
484/// Process --group mode on byte slices.
485fn process_group_bytes(
486    data: &[u8],
487    writer: &mut impl Write,
488    config: &UniqConfig,
489    method: GroupMethod,
490    term: u8,
491) -> io::Result<()> {
492    let mut lines = LineIter::new(data, term);
493
494    let (prev_content, prev_full) = match lines.next() {
495        Some(v) => v,
496        None => return Ok(()),
497    };
498
499    // Prepend/Both: separator before first group
500    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
501        writer.write_all(&[term])?;
502    }
503
504    // Write first line
505    writer.write_all(prev_full)?;
506    if prev_full.len() == prev_content.len() {
507        writer.write_all(&[term])?;
508    }
509
510    let mut prev_content = prev_content;
511    let fast = !needs_key_extraction(config) && !config.ignore_case;
512
513    for (cur_content, cur_full) in lines {
514        let equal = if fast {
515            lines_equal_fast(prev_content, cur_content)
516        } else {
517            lines_equal(prev_content, cur_content, config)
518        };
519
520        if !equal {
521            // New group — write separator
522            writer.write_all(&[term])?;
523        }
524
525        writer.write_all(cur_full)?;
526        if cur_full.len() == cur_content.len() {
527            writer.write_all(&[term])?;
528        }
529
530        prev_content = cur_content;
531    }
532
533    // Append/Both: separator after last group
534    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
535        writer.write_all(&[term])?;
536    }
537
538    Ok(())
539}
540
541// ============================================================================
542// Streaming processing (for stdin / pipe input)
543// ============================================================================
544
545/// Main streaming uniq processor.
546/// Reads from `input`, writes to `output`.
547pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
548    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
549    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, output);
550    let term = if config.zero_terminated { b'\0' } else { b'\n' };
551
552    match config.mode {
553        OutputMode::Group(method) => {
554            process_group_stream(reader, &mut writer, config, method, term)?;
555        }
556        OutputMode::AllRepeated(method) => {
557            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
558        }
559        _ => {
560            process_standard_stream(reader, &mut writer, config, term)?;
561        }
562    }
563
564    writer.flush()?;
565    Ok(())
566}
567
568/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
569fn process_standard_stream<R: BufRead, W: Write>(
570    mut reader: R,
571    writer: &mut W,
572    config: &UniqConfig,
573    term: u8,
574) -> io::Result<()> {
575    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
576    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
577
578    // Read first line
579    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
580        return Ok(()); // empty input
581    }
582    let mut count: u64 = 1;
583
584    loop {
585        current_line.clear();
586        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
587
588        if bytes_read == 0 {
589            // End of input — output the last group
590            output_group_stream(writer, &prev_line, count, config, term)?;
591            break;
592        }
593
594        if compare_lines_stream(&prev_line, &current_line, config, term) {
595            count += 1;
596        } else {
597            output_group_stream(writer, &prev_line, count, config, term)?;
598            std::mem::swap(&mut prev_line, &mut current_line);
599            count = 1;
600        }
601    }
602
603    Ok(())
604}
605
606/// Compare two lines (with terminators) in streaming mode.
607#[inline(always)]
608fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
609    let a_stripped = strip_term(a, term);
610    let b_stripped = strip_term(b, term);
611    lines_equal(a_stripped, b_stripped, config)
612}
613
/// Return `line` without a single trailing `term` byte, if it ends in one.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, head)) if last == term => head,
        _ => line,
    }
}
623
624/// Output a group in streaming mode.
625#[inline(always)]
626fn output_group_stream(
627    writer: &mut impl Write,
628    line: &[u8],
629    count: u64,
630    config: &UniqConfig,
631    term: u8,
632) -> io::Result<()> {
633    let should_print = match config.mode {
634        OutputMode::Default => true,
635        OutputMode::RepeatedOnly => count > 1,
636        OutputMode::UniqueOnly => count == 1,
637        _ => true,
638    };
639
640    if should_print {
641        let content = strip_term(line, term);
642        if config.count {
643            write_count_line(writer, count, content, term)?;
644        } else {
645            writer.write_all(content)?;
646            writer.write_all(&[term])?;
647        }
648    }
649
650    Ok(())
651}
652
653/// Process --all-repeated / -D mode (streaming).
654fn process_all_repeated_stream<R: BufRead, W: Write>(
655    mut reader: R,
656    writer: &mut W,
657    config: &UniqConfig,
658    method: AllRepeatedMethod,
659    term: u8,
660) -> io::Result<()> {
661    let mut group: Vec<Vec<u8>> = Vec::new();
662    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
663    let mut first_group_printed = false;
664
665    current_line.clear();
666    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
667        return Ok(());
668    }
669    group.push(current_line.clone());
670
671    loop {
672        current_line.clear();
673        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
674
675        if bytes_read == 0 {
676            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
677            break;
678        }
679
680        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
681            group.push(current_line.clone());
682        } else {
683            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
684            group.clear();
685            group.push(current_line.clone());
686        }
687    }
688
689    Ok(())
690}
691
692/// Flush a group for --all-repeated mode (streaming).
693fn flush_all_repeated_stream(
694    writer: &mut impl Write,
695    group: &[Vec<u8>],
696    method: AllRepeatedMethod,
697    first_group_printed: &mut bool,
698    term: u8,
699) -> io::Result<()> {
700    if group.len() <= 1 {
701        return Ok(());
702    }
703
704    match method {
705        AllRepeatedMethod::Prepend => {
706            writer.write_all(&[term])?;
707        }
708        AllRepeatedMethod::Separate => {
709            if *first_group_printed {
710                writer.write_all(&[term])?;
711            }
712        }
713        AllRepeatedMethod::None => {}
714    }
715
716    for line in group {
717        let content = strip_term(line, term);
718        writer.write_all(content)?;
719        writer.write_all(&[term])?;
720    }
721
722    *first_group_printed = true;
723    Ok(())
724}
725
726/// Process --group mode (streaming).
727fn process_group_stream<R: BufRead, W: Write>(
728    mut reader: R,
729    writer: &mut W,
730    config: &UniqConfig,
731    method: GroupMethod,
732    term: u8,
733) -> io::Result<()> {
734    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
735    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
736
737    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
738        return Ok(());
739    }
740
741    // Prepend/Both: separator before first group
742    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
743        writer.write_all(&[term])?;
744    }
745
746    let content = strip_term(&prev_line, term);
747    writer.write_all(content)?;
748    writer.write_all(&[term])?;
749
750    loop {
751        current_line.clear();
752        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
753
754        if bytes_read == 0 {
755            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
756                writer.write_all(&[term])?;
757            }
758            break;
759        }
760
761        if !compare_lines_stream(&prev_line, &current_line, config, term) {
762            writer.write_all(&[term])?;
763        }
764
765        let content = strip_term(&current_line, term);
766        writer.write_all(content)?;
767        writer.write_all(&[term])?;
768
769        std::mem::swap(&mut prev_line, &mut current_line);
770    }
771
772    Ok(())
773}
774
/// Append the next `term`-delimited line from `reader` to `buf`.
///
/// The terminator, when present, is included in `buf`. The return value is
/// the number of bytes appended; zero signals end of input.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    BufRead::read_until(reader, term, buf)
}