Skip to main content

coreutils_rs/uniq/
core.rs

1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Delimiting strategy for duplicate groups printed by `--all-repeated`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AllRepeatedMethod {
    /// Write no delimiter at all.
    None,
    /// Write a delimiter ahead of every duplicate group.
    Prepend,
    /// Write a delimiter only between consecutive duplicate groups.
    Separate,
}
10
/// Delimiting strategy for groups printed by `--group`.
///
/// A delimiter is always written between adjacent groups; the variants differ
/// in whether one is also written before the first and/or after the last group.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupMethod {
    /// Delimiters between groups only.
    Separate,
    /// Additionally write a delimiter before the first group.
    Prepend,
    /// Additionally write a delimiter after the last group.
    Append,
    /// Additionally write a delimiter before the first and after the last group.
    Both,
}
19
20/// Output mode for uniq
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum OutputMode {
23    /// Default: print unique lines and first of each duplicate group
24    Default,
25    /// -d: print only first line of duplicate groups
26    RepeatedOnly,
27    /// -D / --all-repeated: print ALL duplicate lines
28    AllRepeated(AllRepeatedMethod),
29    /// -u: print only lines that are NOT duplicated
30    UniqueOnly,
31    /// --group: show all items with group separators
32    Group(GroupMethod),
33}
34
35/// Configuration for uniq processing
36#[derive(Debug, Clone)]
37pub struct UniqConfig {
38    pub mode: OutputMode,
39    pub count: bool,
40    pub ignore_case: bool,
41    pub skip_fields: usize,
42    pub skip_chars: usize,
43    pub check_chars: Option<usize>,
44    pub zero_terminated: bool,
45}
46
47impl Default for UniqConfig {
48    fn default() -> Self {
49        Self {
50            mode: OutputMode::Default,
51            count: false,
52            ignore_case: false,
53            skip_fields: 0,
54            skip_chars: 0,
55            check_chars: None,
56            zero_terminated: false,
57        }
58    }
59}
60
61/// Extract the comparison key from a line according to skip_fields, skip_chars, check_chars.
62/// Matches GNU uniq field-skip semantics exactly: for each field, skip blanks then non-blanks.
63#[inline(always)]
64fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
65    let mut start = 0;
66    let len = line.len();
67
68    // Skip N fields (GNU: each field = run of blanks + run of non-blanks)
69    for _ in 0..config.skip_fields {
70        // Skip blanks (space and tab)
71        while start < len && (line[start] == b' ' || line[start] == b'\t') {
72            start += 1;
73        }
74        // Skip non-blanks (field content)
75        while start < len && line[start] != b' ' && line[start] != b'\t' {
76            start += 1;
77        }
78    }
79
80    // Skip N characters
81    if config.skip_chars > 0 {
82        let remaining = len - start;
83        let skip = config.skip_chars.min(remaining);
84        start += skip;
85    }
86
87    let slice = &line[start..];
88
89    // Limit comparison to N characters
90    if let Some(w) = config.check_chars {
91        if w < slice.len() {
92            return &slice[..w];
93        }
94    }
95
96    slice
97}
98
99/// Compare two lines (without terminators) using the config's comparison rules.
100#[inline(always)]
101fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
102    let sa = get_compare_slice(a, config);
103    let sb = get_compare_slice(b, config);
104
105    if config.ignore_case {
106        sa.eq_ignore_ascii_case(sb)
107    } else {
108        sa == sb
109    }
110}
111
112/// Check if config requires field/char skipping or char limiting.
113#[inline(always)]
114fn needs_key_extraction(config: &UniqConfig) -> bool {
115    config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
116}
117
/// Whole-line, case-sensitive equality, used when no key extraction or case
/// folding is configured.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    a.eq(b)
}
123
124/// Write a count-prefixed line in GNU uniq format.
125/// GNU format: "%7lu " — right-aligned in 7-char field, followed by space.
126#[inline(always)]
127fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
128    // Fast path for small counts (vast majority of cases)
129    // Manually format to avoid write!() macro overhead
130    let mut buf = [b' '; 20]; // Enough for u64 max
131    let count_len = {
132        let count_str = itoa_right_aligned(&mut buf, count);
133        count_str.len()
134    };
135    let start = 20 - count_len;
136    if count_len < 7 {
137        // Pad with spaces to width 7
138        let pad = 7 - count_len;
139        out.write_all(&buf[..pad])?; // spaces
140    }
141    out.write_all(&buf[start..])?;
142    out.write_all(b" ")?;
143    out.write_all(line)?;
144    out.write_all(&[term])?;
145    Ok(())
146}
147
/// Render `val` in decimal at the tail of `buf`.
///
/// Only the bytes holding digits are overwritten; the returned slice is exactly
/// that run of digits, ending at the last byte of `buf`.
#[inline(always)]
fn itoa_right_aligned(buf: &mut [u8; 20], mut val: u64) -> &[u8] {
    let mut cursor = buf.len();
    // Emit at least one digit so that zero renders as "0".
    loop {
        cursor -= 1;
        buf[cursor] = b'0' + (val % 10) as u8;
        val /= 10;
        if val == 0 {
            break;
        }
    }
    &buf[cursor..]
}
164
165// ============================================================================
166// High-performance mmap-based processing (for byte slices, zero-copy)
167// ============================================================================
168
169/// Process uniq from a byte slice (mmap'd file). Zero-copy, no per-line allocation.
170pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
171    let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, output);
172    let term = if config.zero_terminated { b'\0' } else { b'\n' };
173
174    match config.mode {
175        OutputMode::Group(method) => {
176            process_group_bytes(data, &mut writer, config, method, term)?;
177        }
178        OutputMode::AllRepeated(method) => {
179            process_all_repeated_bytes(data, &mut writer, config, method, term)?;
180        }
181        _ => {
182            process_standard_bytes(data, &mut writer, config, term)?;
183        }
184    }
185
186    writer.flush()?;
187    Ok(())
188}
189
/// Borrowing line splitter over a byte slice.
///
/// Each item is a pair of sub-slices of the input: the line without its
/// terminator, and the same line including the terminator when one is present.
struct LineIter<'a> {
    /// Entire input being split.
    data: &'a [u8],
    /// Offset in `data` of the first byte not yet yielded.
    pos: usize,
    /// Byte that ends each line.
    term: u8,
}

impl<'a> LineIter<'a> {
    /// Create a splitter positioned at the start of `data`.
    #[inline(always)]
    fn new(data: &'a [u8], term: u8) -> Self {
        LineIter { term, data, pos: 0 }
    }
}
204
205impl<'a> Iterator for LineIter<'a> {
206    /// (line content without terminator, full line including terminator for output)
207    type Item = (&'a [u8], &'a [u8]);
208
209    #[inline(always)]
210    fn next(&mut self) -> Option<Self::Item> {
211        if self.pos >= self.data.len() {
212            return None;
213        }
214
215        let remaining = &self.data[self.pos..];
216        match memchr::memchr(self.term, remaining) {
217            Some(idx) => {
218                let line_start = self.pos;
219                let line_end = self.pos + idx; // without terminator
220                let full_end = self.pos + idx + 1; // with terminator
221                self.pos = full_end;
222                Some((
223                    &self.data[line_start..line_end],
224                    &self.data[line_start..full_end],
225                ))
226            }
227            None => {
228                // Last line without terminator
229                let line_start = self.pos;
230                self.pos = self.data.len();
231                let line = &self.data[line_start..];
232                Some((line, line))
233            }
234        }
235    }
236}
237
/// Size in bytes (4 MiB) at which the batched output buffer is handed to the writer.
const UNIQ_BUF_SIZE: usize = 4 * (1 << 20);
240
241/// Standard processing for Default, RepeatedOnly, UniqueOnly on byte slices.
242fn process_standard_bytes(
243    data: &[u8],
244    writer: &mut impl Write,
245    config: &UniqConfig,
246    term: u8,
247) -> io::Result<()> {
248    let mut lines = LineIter::new(data, term);
249
250    let (prev_content, prev_full) = match lines.next() {
251        Some(v) => v,
252        None => return Ok(()), // empty input
253    };
254
255    let fast = !needs_key_extraction(config) && !config.ignore_case;
256
257    // Ultra-fast path: default mode, no count, no key extraction
258    // Uses batched Vec output to avoid per-line write_all overhead
259    if fast && !config.count && matches!(config.mode, OutputMode::Default) {
260        let mut prev_content = prev_content;
261        let mut out_buf = Vec::with_capacity(UNIQ_BUF_SIZE);
262        out_buf.extend_from_slice(prev_full);
263        if prev_full.len() == prev_content.len() {
264            out_buf.push(term);
265        }
266        for (cur_content, cur_full) in lines {
267            if !lines_equal_fast(prev_content, cur_content) {
268                out_buf.extend_from_slice(cur_full);
269                if cur_full.len() == cur_content.len() {
270                    out_buf.push(term);
271                }
272                if out_buf.len() >= UNIQ_BUF_SIZE {
273                    writer.write_all(&out_buf)?;
274                    out_buf.clear();
275                }
276                prev_content = cur_content;
277            }
278        }
279        if !out_buf.is_empty() {
280            writer.write_all(&out_buf)?;
281        }
282        return Ok(());
283    }
284
285    // General path with count tracking
286    let mut prev_content = prev_content;
287    let mut prev_full = prev_full;
288    let mut count: u64 = 1;
289
290    for (cur_content, cur_full) in lines {
291        let equal = if fast {
292            lines_equal_fast(prev_content, cur_content)
293        } else {
294            lines_equal(prev_content, cur_content, config)
295        };
296
297        if equal {
298            count += 1;
299        } else {
300            // Output previous group
301            output_group_bytes(writer, prev_content, prev_full, count, config, term)?;
302            prev_content = cur_content;
303            prev_full = cur_full;
304            count = 1;
305        }
306    }
307
308    // Output last group
309    output_group_bytes(writer, prev_content, prev_full, count, config, term)?;
310    Ok(())
311}
312
313/// Output a group for standard modes (bytes path).
314#[inline(always)]
315fn output_group_bytes(
316    writer: &mut impl Write,
317    content: &[u8],
318    full: &[u8],
319    count: u64,
320    config: &UniqConfig,
321    term: u8,
322) -> io::Result<()> {
323    let should_print = match config.mode {
324        OutputMode::Default => true,
325        OutputMode::RepeatedOnly => count > 1,
326        OutputMode::UniqueOnly => count == 1,
327        _ => true,
328    };
329
330    if should_print {
331        if config.count {
332            write_count_line(writer, count, content, term)?;
333        } else {
334            writer.write_all(full)?;
335            // Add terminator if the original line didn't have one
336            if full.len() == content.len() {
337                writer.write_all(&[term])?;
338            }
339        }
340    }
341
342    Ok(())
343}
344
345/// Process --all-repeated / -D mode on byte slices.
346fn process_all_repeated_bytes(
347    data: &[u8],
348    writer: &mut impl Write,
349    config: &UniqConfig,
350    method: AllRepeatedMethod,
351    term: u8,
352) -> io::Result<()> {
353    let mut lines = LineIter::new(data, term);
354
355    let first = match lines.next() {
356        Some(v) => v,
357        None => return Ok(()),
358    };
359
360    // Collect groups as (start_offset, line_count, first_line_content, lines_vec)
361    // For all-repeated we need to buffer group lines since we only print if count > 1
362    let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
363    group_lines.push(first);
364    let mut first_group_printed = false;
365
366    let fast = !needs_key_extraction(config) && !config.ignore_case;
367
368    for (cur_content, cur_full) in lines {
369        let prev_content = group_lines.last().unwrap().0;
370        let equal = if fast {
371            lines_equal_fast(prev_content, cur_content)
372        } else {
373            lines_equal(prev_content, cur_content, config)
374        };
375
376        if equal {
377            group_lines.push((cur_content, cur_full));
378        } else {
379            // Flush group
380            flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
381            group_lines.clear();
382            group_lines.push((cur_content, cur_full));
383        }
384    }
385
386    // Flush last group
387    flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
388
389    Ok(())
390}
391
392/// Flush a group for --all-repeated mode (bytes path).
393fn flush_all_repeated_bytes(
394    writer: &mut impl Write,
395    group: &[(&[u8], &[u8])],
396    method: AllRepeatedMethod,
397    first_group_printed: &mut bool,
398    term: u8,
399) -> io::Result<()> {
400    if group.len() <= 1 {
401        return Ok(()); // Not a duplicate group
402    }
403
404    match method {
405        AllRepeatedMethod::Prepend => {
406            writer.write_all(&[term])?;
407        }
408        AllRepeatedMethod::Separate => {
409            if *first_group_printed {
410                writer.write_all(&[term])?;
411            }
412        }
413        AllRepeatedMethod::None => {}
414    }
415
416    for &(content, full) in group {
417        writer.write_all(full)?;
418        if full.len() == content.len() {
419            writer.write_all(&[term])?;
420        }
421    }
422
423    *first_group_printed = true;
424    Ok(())
425}
426
427/// Process --group mode on byte slices.
428fn process_group_bytes(
429    data: &[u8],
430    writer: &mut impl Write,
431    config: &UniqConfig,
432    method: GroupMethod,
433    term: u8,
434) -> io::Result<()> {
435    let mut lines = LineIter::new(data, term);
436
437    let (prev_content, prev_full) = match lines.next() {
438        Some(v) => v,
439        None => return Ok(()),
440    };
441
442    // Prepend/Both: separator before first group
443    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
444        writer.write_all(&[term])?;
445    }
446
447    // Write first line
448    writer.write_all(prev_full)?;
449    if prev_full.len() == prev_content.len() {
450        writer.write_all(&[term])?;
451    }
452
453    let mut prev_content = prev_content;
454    let fast = !needs_key_extraction(config) && !config.ignore_case;
455
456    for (cur_content, cur_full) in lines {
457        let equal = if fast {
458            lines_equal_fast(prev_content, cur_content)
459        } else {
460            lines_equal(prev_content, cur_content, config)
461        };
462
463        if !equal {
464            // New group — write separator
465            writer.write_all(&[term])?;
466        }
467
468        writer.write_all(cur_full)?;
469        if cur_full.len() == cur_content.len() {
470            writer.write_all(&[term])?;
471        }
472
473        prev_content = cur_content;
474    }
475
476    // Append/Both: separator after last group
477    if matches!(method, GroupMethod::Append | GroupMethod::Both) {
478        writer.write_all(&[term])?;
479    }
480
481    Ok(())
482}
483
484// ============================================================================
485// Streaming processing (for stdin / pipe input)
486// ============================================================================
487
488/// Main streaming uniq processor.
489/// Reads from `input`, writes to `output`.
490pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
491    let reader = BufReader::with_capacity(4 * 1024 * 1024, input);
492    let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, output);
493    let term = if config.zero_terminated { b'\0' } else { b'\n' };
494
495    match config.mode {
496        OutputMode::Group(method) => {
497            process_group_stream(reader, &mut writer, config, method, term)?;
498        }
499        OutputMode::AllRepeated(method) => {
500            process_all_repeated_stream(reader, &mut writer, config, method, term)?;
501        }
502        _ => {
503            process_standard_stream(reader, &mut writer, config, term)?;
504        }
505    }
506
507    writer.flush()?;
508    Ok(())
509}
510
511/// Standard processing for Default, RepeatedOnly, UniqueOnly modes (streaming).
512fn process_standard_stream<R: BufRead, W: Write>(
513    mut reader: R,
514    writer: &mut W,
515    config: &UniqConfig,
516    term: u8,
517) -> io::Result<()> {
518    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
519    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
520
521    // Read first line
522    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
523        return Ok(()); // empty input
524    }
525    let mut count: u64 = 1;
526
527    loop {
528        current_line.clear();
529        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
530
531        if bytes_read == 0 {
532            // End of input — output the last group
533            output_group_stream(writer, &prev_line, count, config, term)?;
534            break;
535        }
536
537        if compare_lines_stream(&prev_line, &current_line, config, term) {
538            count += 1;
539        } else {
540            output_group_stream(writer, &prev_line, count, config, term)?;
541            std::mem::swap(&mut prev_line, &mut current_line);
542            count = 1;
543        }
544    }
545
546    Ok(())
547}
548
549/// Compare two lines (with terminators) in streaming mode.
550#[inline(always)]
551fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
552    let a_stripped = strip_term(a, term);
553    let b_stripped = strip_term(b, term);
554    lines_equal(a_stripped, b_stripped, config)
555}
556
/// Return `line` without its final byte when that byte equals `term`;
/// otherwise return `line` unchanged. At most one byte is removed.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, body)) if last == term => body,
        _ => line,
    }
}
566
567/// Output a group in streaming mode.
568#[inline(always)]
569fn output_group_stream(
570    writer: &mut impl Write,
571    line: &[u8],
572    count: u64,
573    config: &UniqConfig,
574    term: u8,
575) -> io::Result<()> {
576    let should_print = match config.mode {
577        OutputMode::Default => true,
578        OutputMode::RepeatedOnly => count > 1,
579        OutputMode::UniqueOnly => count == 1,
580        _ => true,
581    };
582
583    if should_print {
584        let content = strip_term(line, term);
585        if config.count {
586            write_count_line(writer, count, content, term)?;
587        } else {
588            writer.write_all(content)?;
589            writer.write_all(&[term])?;
590        }
591    }
592
593    Ok(())
594}
595
596/// Process --all-repeated / -D mode (streaming).
597fn process_all_repeated_stream<R: BufRead, W: Write>(
598    mut reader: R,
599    writer: &mut W,
600    config: &UniqConfig,
601    method: AllRepeatedMethod,
602    term: u8,
603) -> io::Result<()> {
604    let mut group: Vec<Vec<u8>> = Vec::new();
605    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
606    let mut first_group_printed = false;
607
608    current_line.clear();
609    if read_line_term(&mut reader, &mut current_line, term)? == 0 {
610        return Ok(());
611    }
612    group.push(current_line.clone());
613
614    loop {
615        current_line.clear();
616        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
617
618        if bytes_read == 0 {
619            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
620            break;
621        }
622
623        if compare_lines_stream(group.last().unwrap(), &current_line, config, term) {
624            group.push(current_line.clone());
625        } else {
626            flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
627            group.clear();
628            group.push(current_line.clone());
629        }
630    }
631
632    Ok(())
633}
634
635/// Flush a group for --all-repeated mode (streaming).
636fn flush_all_repeated_stream(
637    writer: &mut impl Write,
638    group: &[Vec<u8>],
639    method: AllRepeatedMethod,
640    first_group_printed: &mut bool,
641    term: u8,
642) -> io::Result<()> {
643    if group.len() <= 1 {
644        return Ok(());
645    }
646
647    match method {
648        AllRepeatedMethod::Prepend => {
649            writer.write_all(&[term])?;
650        }
651        AllRepeatedMethod::Separate => {
652            if *first_group_printed {
653                writer.write_all(&[term])?;
654            }
655        }
656        AllRepeatedMethod::None => {}
657    }
658
659    for line in group {
660        let content = strip_term(line, term);
661        writer.write_all(content)?;
662        writer.write_all(&[term])?;
663    }
664
665    *first_group_printed = true;
666    Ok(())
667}
668
669/// Process --group mode (streaming).
670fn process_group_stream<R: BufRead, W: Write>(
671    mut reader: R,
672    writer: &mut W,
673    config: &UniqConfig,
674    method: GroupMethod,
675    term: u8,
676) -> io::Result<()> {
677    let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
678    let mut current_line: Vec<u8> = Vec::with_capacity(4096);
679
680    if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
681        return Ok(());
682    }
683
684    // Prepend/Both: separator before first group
685    if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
686        writer.write_all(&[term])?;
687    }
688
689    let content = strip_term(&prev_line, term);
690    writer.write_all(content)?;
691    writer.write_all(&[term])?;
692
693    loop {
694        current_line.clear();
695        let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
696
697        if bytes_read == 0 {
698            if matches!(method, GroupMethod::Append | GroupMethod::Both) {
699                writer.write_all(&[term])?;
700            }
701            break;
702        }
703
704        if !compare_lines_stream(&prev_line, &current_line, config, term) {
705            writer.write_all(&[term])?;
706        }
707
708        let content = strip_term(&current_line, term);
709        writer.write_all(content)?;
710        writer.write_all(&[term])?;
711
712        std::mem::swap(&mut prev_line, &mut current_line);
713    }
714
715    Ok(())
716}
717
/// Append the next line from `reader` to `buf`, reading up to and including
/// the first `term` byte (or to end of input if none is found).
///
/// Returns the number of bytes appended; `0` signals end of input.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    R::read_until(reader, term, buf)
}