Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// How to delimit groups when using --all-repeated
///
/// The delimiter is a single line-terminator byte written by the
/// all-repeated flush routines before a group of duplicate lines.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    /// Never write a delimiter; duplicate groups are printed back to back.
    None,
    /// Write a delimiter before every printed duplicate group.
    Prepend,
    /// Write a delimiter before every printed duplicate group except the first one.
    Separate,
}
10
/// How to delimit groups when using --group
///
/// In every variant a single line-terminator byte is written between two
/// adjacent groups; the variants only differ in whether an extra delimiter is
/// written before the first group and/or after the last one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    /// Delimiter between groups only.
    Separate,
    /// Delimiter between groups and also before the first group.
    Prepend,
    /// Delimiter between groups and also after the last group.
    Append,
    /// Delimiter between groups, before the first group and after the last group.
    Both,
}
19
/// Output mode for uniq
///
/// Selects which lines of each run of adjacent equal lines are printed.
/// The payload of the delimiting variants chooses where group delimiters go.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    /// Default: print unique lines and first of each duplicate group
    Default,
    /// -d: print only first line of duplicate groups
    RepeatedOnly,
    /// -D / --all-repeated: print ALL duplicate lines
    /// (groups with a single line are not printed)
    AllRepeated(AllRepeatedMethod),
    /// -u: print only lines that are NOT duplicated
    UniqueOnly,
    /// --group: show all items with group separators
    Group(GroupMethod),
}
34
/// Configuration for uniq processing
///
/// Shared by the byte-slice and the streaming entry points.
#[derive(Debug, Clone)]
pub struct UniqConfig {
    /// Which lines of each group are printed and how groups are delimited.
    pub mode: OutputMode,
    /// Prefix each printed line with its occurrence count. Only the
    /// Default / RepeatedOnly / UniqueOnly output paths read this flag.
    pub count: bool,
    /// Compare keys ignoring ASCII case.
    pub ignore_case: bool,
    /// Number of leading blank-delimited fields excluded from the comparison key.
    pub skip_fields: usize,
    /// Number of bytes excluded from the key, applied after field skipping.
    pub skip_chars: usize,
    /// Maximum number of key bytes compared; `None` compares the rest of the line.
    pub check_chars: Option<usize>,
    /// Lines are terminated by NUL (`\0`) instead of newline.
    pub zero_terminated: bool,
}
46
47impl Default for UniqConfig {
48    fn default() -> Self {
49        Self {
50            mode: OutputMode::Default,
51            count: false,
52            ignore_case: false,
53            skip_fields: 0,
54            skip_chars: 0,
55            check_chars: None,
56            zero_terminated: false,
57        }
58    }
59}
60
61/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
62/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
63#[inline(always)]
64fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
65    let mut start = 0;
66    let len = line.len();
67
68    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
69    for _ in 0..config.skip_fields {
70        // Skip blanks (space and tab)
71        while start < len && (line[start] == b' ' || line[start] == b'\t') {
72            start += 1;
73        }
74        // Skip non-blanks (field content)
75        while start < len && line[start] != b' ' && line[start] != b'\t' {
76            start += 1;
77        }
78    }
79
80    // Skip N characters
81    if config.skip_chars > 0 {
82        let remaining = len - start;
83        let skip = config.skip_chars.min(remaining);
84        start += skip;
85    }
86
87    let slice = &line[start..];
88
89    // Limit comparison to N characters
90    if let Some(w) = config.check_chars {
91        if w < slice.len() {
92            return &slice[..w];
93        }
94    }
95
96    slice
97}
98
99/// Compare two lines (without terminators) using the config's comparison rules.
100#[inline(always)]
101fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
102    let sa = get_compare_slice(a, config);
103    let sb = get_compare_slice(b, config);
104
105    if config.ignore_case {
106        sa.eq_ignore_ascii_case(sb)
107    } else {
108        sa == sb
109    }
110}
111
112/// Check if config requires field/char skipping or char limiting.
113#[inline(always)]
114fn needs_key_extraction(config: &UniqConfig) -> bool {
115    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
116}
117
/// Whole-line, case-sensitive comparison used when neither key extraction nor
/// case folding is requested.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    a.eq(b)
}
123
124/// Write a count-prefixed line in GNU uniq format.
125/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
126/// Uses a single write_all by building the prefix in a stack buffer.
127#[inline(always)]
128fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
129    // Build prefix "     N " in a stack buffer (max 21 bytes for u64 + spaces)
130    let mut prefix = [b' '; 28]; // Enough for u64 max + padding + space
131    let digits = itoa_right_aligned_into(&mut prefix, count);
132    let width = digits.max(7); // minimum 7 chars
133    let prefix_len = width + 1; // +1 for trailing space
134    prefix[width] = b' ';
135    // Write prefix + line + term in as few calls as possible
136    out.write_all(&prefix[..prefix_len])?;
137    out.write_all(line)?;
138    out.write_all(&[term])?;
139    Ok(())
140}
141
/// Format `val` in decimal at the front of `buf`, right-aligned in a field of
/// at least seven characters.
///
/// `buf` is expected to arrive filled with spaces. Returns the width of the
/// formatted field: 7 for numbers of up to seven digits, otherwise the digit
/// count. Bytes past the returned width are scratch space and may contain
/// leftover digits.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    const MIN_WIDTH: usize = 7;
    // Digits are produced right-to-left just below this index.
    const SCRATCH_END: usize = 27;

    if val == 0 {
        buf[MIN_WIDTH - 1] = b'0';
        return MIN_WIDTH;
    }

    let mut cursor = SCRATCH_END;
    loop {
        cursor -= 1;
        buf[cursor] = b'0' + (val % 10) as u8;
        val /= 10;
        if val == 0 {
            break;
        }
    }

    let digit_count = SCRATCH_END - cursor;
    // Short numbers are padded on the left; wide ones start at column 0.
    let dest = MIN_WIDTH.saturating_sub(digit_count);
    buf.copy_within(cursor..SCRATCH_END, dest);

    digit_count.max(MIN_WIDTH)
}
170
171// ============================================================================
172// High-performance mmap-based processing (for byte slices, zero-copy)
173// ============================================================================
174
175/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
176pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
177    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, output);
178    let term = if config.zero_terminated { b'\0' } else { b'\n' };
179
180    match config.mode {
181        OutputMode::Group(method) => {
182            process_group_bytes(data, &mut writer, config, method, term)?;
183        }
184        OutputMode::AllRepeated(method) => {
185            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
186        }
187        _ => {
188            process_standard_bytes(data, &mut writer, config, term)?;
189        }
190    }
191
192    writer.flush()?;
193    Ok(())
194}
195
196/// Iterator over lines in a byte slice, yielding (line_without_terminator, has_terminator).
197/// Uses memchr for SIMD-accelerated line boundary detection.
198struct LineIter<'a> {
199    data: &'a [u8],
200    pos: usize,
201    term: u8,
202}
203
204impl<'a> LineIter<'a> {
205    #[inline(always)]
206    fn new(data: &'a [u8], term: u8) -> Self {
207        Self { data, pos: 0, term }
208    }
209}
210
211impl<'a> Iterator for LineIter<'a> {
212    /// (line content without terminator, full line including terminator for output)
213    type Item = (&'a [u8], &'a [u8]);
214
215    #[inline(always)]
216    fn next(&mut self) -> Option<Self::Item> {
217        if self.pos >= self.data.len() {
218            return None;
219        }
220
221        let remaining = &self.data[self.pos..];
222        match memchr::memchr(self.term, remaining) {
223            Some(idx) => {
224                let line_start = self.pos;
225                let line_end = self.pos + idx; // without terminator
226                let full_end = self.pos + idx + 1; // with terminator
227                self.pos = full_end;
228                Some((
229                    &self.data[line_start..line_end],
230                    &self.data[line_start..full_end],
231                ))
232            }
233            None => {
234                // Last line without terminator
235                let line_start = self.pos;
236                self.pos = self.data.len();
237                let line = &self.data[line_start..];
238                Some((line, line))
239            }
240        }
241    }
242}
243
244/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
245fn process_standard_bytes(
246    data: &[u8],
247    writer: &mut impl Write,
248    config: &UniqConfig,
249    term: u8,
250) -> io::Result<()> {
251    let mut lines = LineIter::new(data, term);
252
253    let (prev_content, prev_full) = match lines.next() {
254        Some(v) => v,
255        None => return Ok(()), // empty input
256    };
257
258    let fast = !needs_key_extraction(config) && !config.ignore_case;
259
260    // Ultra-fast path: default mode, no count, no key extraction
261    // Zero-copy: writes contiguous spans directly from mmap data, no intermediate Vec
262    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
263        let data_base = data.as_ptr() as usize;
264        let mut prev_content = prev_content;
265
266        // Write first line
267        writer.write_all(prev_full)?;
268        if prev_full.len() == prev_content.len() {
269            writer.write_all(&[term])?;
270        }
271
272        // Track contiguous output spans in mmap data
273        let mut span_start: usize = usize::MAX; // sentinel = no active span
274        let mut span_end: usize = 0;
275
276        for (cur_content, cur_full) in lines {
277            if lines_equal_fast(prev_content, cur_content) {
278                // Duplicate — flush any active span, skip line
279                if span_start != usize::MAX {
280                    writer.write_all(&data[span_start..span_end])?;
281                    span_start = usize::MAX;
282                }
283                prev_content = cur_content;
284                continue;
285            }
286
287            let cur_offset = cur_full.as_ptr() as usize - data_base;
288
289            if span_start == usize::MAX {
290                // Start new span
291                span_start = cur_offset;
292                span_end = cur_offset + cur_full.len();
293            } else if cur_offset == span_end {
294                // Extend contiguous span (common case — unique lines are adjacent)
295                span_end += cur_full.len();
296            } else {
297                // Non-contiguous — flush and start new span
298                writer.write_all(&data[span_start..span_end])?;
299                span_start = cur_offset;
300                span_end = cur_offset + cur_full.len();
301            }
302
303            // Handle last line without terminator
304            if cur_full.len() == cur_content.len() {
305                writer.write_all(&data[span_start..span_end])?;
306                writer.write_all(&[term])?;
307                span_start = usize::MAX;
308            }
309
310            prev_content = cur_content;
311        }
312
313        // Flush remaining span
314        if span_start != usize::MAX {
315            writer.write_all(&data[span_start..span_end])?;
316        }
317        return Ok(());
318    }
319
320    // General path with count tracking
321    let mut prev_content = prev_content;
322    let mut prev_full = prev_full;
323    let mut count: u64 = 1;
324
325    for (cur_content, cur_full) in lines {
326        let equal = if fast {
327            lines_equal_fast(prev_content, cur_content)
328        } else {
329            lines_equal(prev_content, cur_content, config)
330        };
331
332        if equal {
333            count += 1;
334        } else {
335            // Output previous group
336            output_group_bytes(writer, prev_content, prev_full, count, config, term)?;
337            prev_content = cur_content;
338            prev_full = cur_full;
339            count = 1;
340        }
341    }
342
343    // Output last group
344    output_group_bytes(writer, prev_content, prev_full, count, config, term)?;
345    Ok(())
346}
347
348/// Output a group for standard modes (bytes path).
349#[inline(always)]
350fn output_group_bytes(
351    writer: &mut impl Write,
352    content: &[u8],
353    full: &[u8],
354    count: u64,
355    config: &UniqConfig,
356    term: u8,
357) -> io::Result<()> {
358    let should_print = match config.mode {
359        OutputMode::Default => true,
360        OutputMode::RepeatedOnly => count > 1,
361        OutputMode::UniqueOnly => count == 1,
362        _ => true,
363    };
364
365    if should_print {
366        if config.count {
367            write_count_line(writer, count, content, term)?;
368        } else {
369            writer.write_all(full)?;
370            // Add terminator if the original line didn't have one
371            if full.len() == content.len() {
372                writer.write_all(&[term])?;
373            }
374        }
375    }
376
377    Ok(())
378}
379
380/// Process --all-repeated / -D mode on byte slices.
381fn process_all_repeated_bytes(
382    data: &[u8],
383    writer: &mut impl Write,
384    config: &UniqConfig,
385    method: AllRepeatedMethod,
386    term: u8,
387) -> io::Result<()> {
388    let mut lines = LineIter::new(data, term);
389
390    let first = match lines.next() {
391        Some(v) => v,
392        None => return Ok(()),
393    };
394
395    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
396    // For all-repeated we need to buffer group lines since we only print if count > 1
397    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
398    group_lines.push(first);
399    let mut first_group_printed = false;
400
401    let fast = !needs_key_extraction(config) && !config.ignore_case;
402
403    for (cur_content, cur_full) in lines {
404        let prev_content = group_lines.last().unwrap().0;
405        let equal = if fast {
406            lines_equal_fast(prev_content, cur_content)
407        } else {
408            lines_equal(prev_content, cur_content, config)
409        };
410
411        if equal {
412            group_lines.push((cur_content, cur_full));
413        } else {
414            // Flush group
415            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
416            group_lines.clear();
417            group_lines.push((cur_content, cur_full));
418        }
419    }
420
421    // Flush last group
422    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
423
424    Ok(())
425}
426
427/// Flush a group for --all-repeated mode (bytes path).
428fn flush_all_repeated_bytes(
429    writer: &mut impl Write,
430    group: &[(&[u8], &[u8])],
431    method: AllRepeatedMethod,
432    first_group_printed: &mut bool,
433    term: u8,
434) -> io::Result<()> {
435    if group.len() <= 1 {
436        return Ok(()); // Not a duplicate group
437    }
438
439    match method {
440        AllRepeatedMethod::Prepend => {
441            writer.write_all(&[term])?;
442        }
443        AllRepeatedMethod::Separate => {
444            if *first_group_printed {
445                writer.write_all(&[term])?;
446            }
447        }
448        AllRepeatedMethod::None => {}
449    }
450
451    for &(content, full) in group {
452        writer.write_all(full)?;
453        if full.len() == content.len() {
454            writer.write_all(&[term])?;
455        }
456    }
457
458    *first_group_printed = true;
459    Ok(())
460}
461
462/// Process --group mode on byte slices.
463fn process_group_bytes(
464    data: &[u8],
465    writer: &mut impl Write,
466    config: &UniqConfig,
467    method: GroupMethod,
468    term: u8,
469) -> io::Result<()> {
470    let mut lines = LineIter::new(data, term);
471
472    let (prev_content, prev_full) = match lines.next() {
473        Some(v) => v,
474        None => return Ok(()),
475    };
476
477    // Prepend/Both: separator before first group
478    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
479        writer.write_all(&[term])?;
480    }
481
482    // Write first line
483    writer.write_all(prev_full)?;
484    if prev_full.len() == prev_content.len() {
485        writer.write_all(&[term])?;
486    }
487
488    let mut prev_content = prev_content;
489    let fast = !needs_key_extraction(config) && !config.ignore_case;
490
491    for (cur_content, cur_full) in lines {
492        let equal = if fast {
493            lines_equal_fast(prev_content, cur_content)
494        } else {
495            lines_equal(prev_content, cur_content, config)
496        };
497
498        if !equal {
499            // New group — write separator
500            writer.write_all(&[term])?;
501        }
502
503        writer.write_all(cur_full)?;
504        if cur_full.len() == cur_content.len() {
505            writer.write_all(&[term])?;
506        }
507
508        prev_content = cur_content;
509    }
510
511    // Append/Both: separator after last group
512    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
513        writer.write_all(&[term])?;
514    }
515
516    Ok(())
517}
518
519// ============================================================================
520// Streaming processing (for stdin / pipe input)
521// ============================================================================
522
523/// Main streaming uniq processor.
524/// Reads from `input`, writes to `output`.
525pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
526    let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
527    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, output);
528    let term = if config.zero_terminated { b'\0' } else { b'\n' };
529
530    match config.mode {
531        OutputMode::Group(method) => {
532            process_group_stream(reader, &mut writer, config, method, term)?;
533        }
534        OutputMode::AllRepeated(method) => {
535            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
536        }
537        _ => {
538            process_standard_stream(reader, &mut writer, config, term)?;
539        }
540    }
541
542    writer.flush()?;
543    Ok(())
544}
545
546/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
547fn process_standard_stream<R: BufRead, W: Write>(
548    mut reader: R,
549    writer: &mut W,
550    config: &UniqConfig,
551    term: u8,
552) -> io::Result<()> {
553    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
554    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
555
556    // Read first line
557    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
558        return Ok(()); // empty input
559    }
560    let mut count: u64 = 1;
561
562    loop {
563        current_line.clear();
564        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
565
566        if bytes_read == 0 {
567            // End of input — output the last group
568            output_group_stream(writer, &prev_line, count, config, term)?;
569            break;
570        }
571
572        if compare_lines_stream(&prev_line, &current_line, config, term) {
573            count += 1;
574        } else {
575            output_group_stream(writer, &prev_line, count, config, term)?;
576            std::mem::swap(&mut prev_line, &mut current_line);
577            count = 1;
578        }
579    }
580
581    Ok(())
582}
583
584/// Compare two lines (with terminators) in streaming mode.
585#[inline(always)]
586fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
587    let a_stripped = strip_term(a, term);
588    let b_stripped = strip_term(b, term);
589    lines_equal(a_stripped, b_stripped, config)
590}
591
/// Return `line` without its final byte when that byte is `term`; otherwise
/// return `line` unchanged. At most one terminator is removed.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, body)) if last == term => body,
        _ => line,
    }
}
601
602/// Output a group in streaming mode.
603#[inline(always)]
604fn output_group_stream(
605    writer: &mut impl Write,
606    line: &[u8],
607    count: u64,
608    config: &UniqConfig,
609    term: u8,
610) -> io::Result<()> {
611    let should_print = match config.mode {
612        OutputMode::Default => true,
613        OutputMode::RepeatedOnly => count > 1,
614        OutputMode::UniqueOnly => count == 1,
615        _ => true,
616    };
617
618    if should_print {
619        let content = strip_term(line, term);
620        if config.count {
621            write_count_line(writer, count, content, term)?;
622        } else {
623            writer.write_all(content)?;
624            writer.write_all(&[term])?;
625        }
626    }
627
628    Ok(())
629}
630
631/// Process --all-repeated / -D mode (streaming).
632fn process_all_repeated_stream<R: BufRead, W: Write>(
633    mut reader: R,
634    writer: &mut W,
635    config: &UniqConfig,
636    method: AllRepeatedMethod,
637    term: u8,
638) -> io::Result<()> {
639    let mut group: Vec<Vec<u8>> = Vec::new();
640    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
641    let mut first_group_printed = false;
642
643    current_line.clear();
644    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
645        return Ok(());
646    }
647    group.push(current_line.clone());
648
649    loop {
650        current_line.clear();
651        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
652
653        if bytes_read == 0 {
654            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
655            break;
656        }
657
658        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
659            group.push(current_line.clone());
660        } else {
661            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
662            group.clear();
663            group.push(current_line.clone());
664        }
665    }
666
667    Ok(())
668}
669
670/// Flush a group for --all-repeated mode (streaming).
671fn flush_all_repeated_stream(
672    writer: &mut impl Write,
673    group: &[Vec<u8>],
674    method: AllRepeatedMethod,
675    first_group_printed: &mut bool,
676    term: u8,
677) -> io::Result<()> {
678    if group.len() <= 1 {
679        return Ok(());
680    }
681
682    match method {
683        AllRepeatedMethod::Prepend => {
684            writer.write_all(&[term])?;
685        }
686        AllRepeatedMethod::Separate => {
687            if *first_group_printed {
688                writer.write_all(&[term])?;
689            }
690        }
691        AllRepeatedMethod::None => {}
692    }
693
694    for line in group {
695        let content = strip_term(line, term);
696        writer.write_all(content)?;
697        writer.write_all(&[term])?;
698    }
699
700    *first_group_printed = true;
701    Ok(())
702}
703
704/// Process --group mode (streaming).
705fn process_group_stream<R: BufRead, W: Write>(
706    mut reader: R,
707    writer: &mut W,
708    config: &UniqConfig,
709    method: GroupMethod,
710    term: u8,
711) -> io::Result<()> {
712    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
713    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
714
715    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
716        return Ok(());
717    }
718
719    // Prepend/Both: separator before first group
720    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
721        writer.write_all(&[term])?;
722    }
723
724    let content = strip_term(&prev_line, term);
725    writer.write_all(content)?;
726    writer.write_all(&[term])?;
727
728    loop {
729        current_line.clear();
730        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
731
732        if bytes_read == 0 {
733            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
734                writer.write_all(&[term])?;
735            }
736            break;
737        }
738
739        if !compare_lines_stream(&prev_line, &current_line, config, term) {
740            writer.write_all(&[term])?;
741        }
742
743        let content = strip_term(&current_line, term);
744        writer.write_all(content)?;
745        writer.write_all(&[term])?;
746
747        std::mem::swap(&mut prev_line, &mut current_line);
748    }
749
750    Ok(())
751}
752
/// Append the next line from `reader` to `buf`, up to and including the
/// terminator byte `term` (newline or NUL) when one is found.
/// Returns the number of bytes appended; 0 means end of input.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    BufRead::read_until(reader, term, buf)
}
758}