1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
3#[inline]
5fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
6 writer.write_all(buf)
7}
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum AllRepeatedMethod {
12 None,
13 Prepend,
14 Separate,
15}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum GroupMethod {
20 Separate,
21 Prepend,
22 Append,
23 Both,
24}
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29 Default,
31 RepeatedOnly,
33 AllRepeated(AllRepeatedMethod),
35 UniqueOnly,
37 Group(GroupMethod),
39}
40
41#[derive(Debug, Clone)]
43pub struct UniqConfig {
44 pub mode: OutputMode,
45 pub count: bool,
46 pub ignore_case: bool,
47 pub skip_fields: usize,
48 pub skip_chars: usize,
49 pub check_chars: Option<usize>,
50 pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54 fn default() -> Self {
55 Self {
56 mode: OutputMode::Default,
57 count: false,
58 ignore_case: false,
59 skip_fields: 0,
60 skip_chars: 0,
61 check_chars: None,
62 zero_terminated: false,
63 }
64 }
65}
66
67#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71 let mut start = 0;
72 let len = line.len();
73
74 for _ in 0..config.skip_fields {
76 while start < len && (line[start] == b' ' || line[start] == b'\t') {
78 start += 1;
79 }
80 while start < len && line[start] != b' ' && line[start] != b'\t' {
82 start += 1;
83 }
84 }
85
86 if config.skip_chars > 0 {
88 let remaining = len - start;
89 let skip = config.skip_chars.min(remaining);
90 start += skip;
91 }
92
93 let slice = &line[start..];
94
95 if let Some(w) = config.check_chars {
97 if w < slice.len() {
98 return &slice[..w];
99 }
100 }
101
102 slice
103}
104
105#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108 let sa = get_compare_slice(a, config);
109 let sb = get_compare_slice(b, config);
110
111 if config.ignore_case {
112 sa.eq_ignore_ascii_case(sb)
113 } else {
114 sa == sb
115 }
116}
117
118#[inline(always)]
121fn lines_equal_case_insensitive(a: &[u8], b: &[u8]) -> bool {
122 let alen = a.len();
123 if alen != b.len() {
124 return false;
125 }
126 if alen == 0 {
127 return true;
128 }
129 a.eq_ignore_ascii_case(b)
130}
131
132#[inline(always)]
134fn needs_key_extraction(config: &UniqConfig) -> bool {
135 config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
136}
137
138#[inline(always)]
141fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
142 let alen = a.len();
143 if alen != b.len() {
144 return false;
145 }
146 if alen == 0 {
147 return true;
148 }
149 if alen >= 8 {
151 let a8 = unsafe { (a.as_ptr() as *const u64).read_unaligned() };
152 let b8 = unsafe { (b.as_ptr() as *const u64).read_unaligned() };
153 if a8 != b8 {
154 return false;
155 }
156 }
157 a == b
158}
159
160#[inline(always)]
164fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
165 let mut prefix = [b' '; 28]; let digits = itoa_right_aligned_into(&mut prefix, count);
168 let width = digits.max(7); let prefix_len = width + 1; prefix[width] = b' ';
171
172 let total = prefix_len + line.len() + 1;
174 if total <= 256 {
175 let mut buf = [0u8; 256];
176 buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
177 buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
178 buf[prefix_len + line.len()] = term;
179 out.write_all(&buf[..total])
180 } else {
181 out.write_all(&prefix[..prefix_len])?;
182 out.write_all(line)?;
183 out.write_all(&[term])
184 }
185}
186
187#[inline(always)]
190fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
191 if val == 0 {
192 buf[6] = b'0';
193 return 7; }
195 let mut pos = 27;
197 while val > 0 {
198 pos -= 1;
199 buf[pos] = b'0' + (val % 10) as u8;
200 val /= 10;
201 }
202 let num_digits = 27 - pos;
203 if num_digits >= 7 {
204 buf.copy_within(pos..27, 0);
206 num_digits
207 } else {
208 let pad = 7 - num_digits;
210 buf.copy_within(pos..27, pad);
211 7
213 }
214}
215
216pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
222 let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
224 let term = if config.zero_terminated { b'\0' } else { b'\n' };
225
226 match config.mode {
227 OutputMode::Group(method) => {
228 process_group_bytes(data, &mut writer, config, method, term)?;
229 }
230 OutputMode::AllRepeated(method) => {
231 process_all_repeated_bytes(data, &mut writer, config, method, term)?;
232 }
233 _ => {
234 process_standard_bytes(data, &mut writer, config, term)?;
235 }
236 }
237
238 writer.flush()?;
239 Ok(())
240}
241
242struct LineIter<'a> {
245 data: &'a [u8],
246 pos: usize,
247 term: u8,
248}
249
250impl<'a> LineIter<'a> {
251 #[inline(always)]
252 fn new(data: &'a [u8], term: u8) -> Self {
253 Self { data, pos: 0, term }
254 }
255}
256
257impl<'a> Iterator for LineIter<'a> {
258 type Item = (&'a [u8], &'a [u8]);
260
261 #[inline(always)]
262 fn next(&mut self) -> Option<Self::Item> {
263 if self.pos >= self.data.len() {
264 return None;
265 }
266
267 let remaining = &self.data[self.pos..];
268 match memchr::memchr(self.term, remaining) {
269 Some(idx) => {
270 let line_start = self.pos;
271 let line_end = self.pos + idx; let full_end = self.pos + idx + 1; self.pos = full_end;
274 Some((
275 &self.data[line_start..line_end],
276 &self.data[line_start..full_end],
277 ))
278 }
279 None => {
280 let line_start = self.pos;
282 self.pos = self.data.len();
283 let line = &self.data[line_start..];
284 Some((line, line))
285 }
286 }
287 }
288}
289
290#[inline(always)]
293fn line_content_at<'a>(
294 data: &'a [u8],
295 line_starts: &[usize],
296 idx: usize,
297 content_end: usize,
298) -> &'a [u8] {
299 let start = line_starts[idx];
300 let end = if idx + 1 < line_starts.len() {
301 line_starts[idx + 1] - 1 } else {
303 content_end };
305 &data[start..end]
306}
307
308#[inline(always)]
310fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
311 let start = line_starts[idx];
312 let end = if idx + 1 < line_starts.len() {
313 line_starts[idx + 1] } else {
315 data.len()
316 };
317 &data[start..end]
318}
319
320#[inline]
325fn linear_scan_group_end(
326 data: &[u8],
327 line_starts: &[usize],
328 group_start: usize,
329 num_lines: usize,
330 content_end: usize,
331) -> usize {
332 let key = line_content_at(data, line_starts, group_start, content_end);
333 let mut i = group_start + 1;
334 while i < num_lines {
335 if !lines_equal_fast(key, line_content_at(data, line_starts, i, content_end)) {
336 return i;
337 }
338 i += 1;
339 }
340 i
341}
342
343fn process_standard_bytes(
347 data: &[u8],
348 writer: &mut impl Write,
349 config: &UniqConfig,
350 term: u8,
351) -> io::Result<()> {
352 if data.is_empty() {
353 return Ok(());
354 }
355
356 let fast = !needs_key_extraction(config) && !config.ignore_case;
357 let fast_ci = !needs_key_extraction(config) && config.ignore_case;
358
359 if fast && !config.count && matches!(config.mode, OutputMode::Default) {
363 return process_default_fast_singlepass(data, writer, term);
364 }
365
366 if fast
368 && !config.count
369 && matches!(
370 config.mode,
371 OutputMode::RepeatedOnly | OutputMode::UniqueOnly
372 )
373 {
374 return process_filter_fast_singlepass(data, writer, config, term);
375 }
376
377 if fast && config.count {
381 return process_count_fast_singlepass(data, writer, config, term);
382 }
383
384 if fast_ci && !config.count && matches!(config.mode, OutputMode::Default) {
388 return process_default_ci_singlepass(data, writer, term);
389 }
390
391 if fast_ci
392 && !config.count
393 && matches!(
394 config.mode,
395 OutputMode::RepeatedOnly | OutputMode::UniqueOnly
396 )
397 {
398 return process_filter_ci_singlepass(data, writer, config, term);
399 }
400
401 if fast_ci && config.count {
402 return process_count_ci_singlepass(data, writer, config, term);
403 }
404
405 let estimated_lines = (data.len() / 40).max(64);
407 let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
408 line_starts.push(0);
409 for pos in memchr::memchr_iter(term, data) {
410 if pos + 1 < data.len() {
411 line_starts.push(pos + 1);
412 }
413 }
414 let num_lines = line_starts.len();
415 if num_lines == 0 {
416 return Ok(());
417 }
418
419 let content_end = if data.last() == Some(&term) {
421 data.len() - 1
422 } else {
423 data.len()
424 };
425
426 if fast && !config.count && matches!(config.mode, OutputMode::Default) {
428 let first_full = line_full_at(data, &line_starts, 0);
430 let first_content = line_content_at(data, &line_starts, 0, content_end);
431 write_all_raw(writer, first_full)?;
432 if first_full.len() == first_content.len() {
433 writer.write_all(&[term])?;
434 }
435
436 let mut i = 1;
437 while i < num_lines {
438 let prev = line_content_at(data, &line_starts, i - 1, content_end);
439 let cur = line_content_at(data, &line_starts, i, content_end);
440
441 if lines_equal_fast(prev, cur) {
442 let group_end =
444 linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
445 i = group_end;
446 continue;
447 }
448
449 let cur_full = line_full_at(data, &line_starts, i);
451 write_all_raw(writer, cur_full)?;
452 if cur_full.len() == cur.len() {
453 writer.write_all(&[term])?;
454 }
455 i += 1;
456 }
457 return Ok(());
458 }
459
460 let mut i = 0;
462 while i < num_lines {
463 let content = line_content_at(data, &line_starts, i, content_end);
464 let full = line_full_at(data, &line_starts, i);
465
466 let group_end = if fast
467 && i + 1 < num_lines
468 && lines_equal_fast(
469 content,
470 line_content_at(data, &line_starts, i + 1, content_end),
471 ) {
472 linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
474 } else if !fast
475 && i + 1 < num_lines
476 && lines_equal(
477 content,
478 line_content_at(data, &line_starts, i + 1, content_end),
479 config,
480 )
481 {
482 let mut j = i + 2;
484 while j < num_lines {
485 if !lines_equal(
486 content,
487 line_content_at(data, &line_starts, j, content_end),
488 config,
489 ) {
490 break;
491 }
492 j += 1;
493 }
494 j
495 } else {
496 i + 1
497 };
498
499 let count = (group_end - i) as u64;
500 output_group_bytes(writer, content, full, count, config, term)?;
501 i = group_end;
502 }
503
504 Ok(())
505}
506
507fn process_default_fast_singlepass(
514 data: &[u8],
515 writer: &mut impl Write,
516 term: u8,
517) -> io::Result<()> {
518 if data.len() >= 4 * 1024 * 1024 {
520 return process_default_parallel(data, writer, term);
521 }
522
523 process_default_sequential(data, writer, term)
524}
525
526fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
530 let mut prev_start: usize = 0;
531 let mut prev_end: usize; match memchr::memchr(term, data) {
535 Some(pos) => {
536 prev_end = pos;
537 }
538 None => {
539 writer.write_all(data)?;
541 return writer.write_all(&[term]);
542 }
543 }
544
545 let mut run_start: usize = 0;
548 let mut cur_start = prev_end + 1;
549 let mut last_output_end = prev_end + 1; while cur_start < data.len() {
552 let cur_end = match memchr::memchr(term, &data[cur_start..]) {
553 Some(offset) => cur_start + offset,
554 None => data.len(), };
556
557 let prev_content = &data[prev_start..prev_end];
558 let cur_content = &data[cur_start..cur_end];
559
560 if lines_equal_fast(prev_content, cur_content) {
561 if run_start < cur_start {
563 writer.write_all(&data[run_start..cur_start])?;
564 }
565 if cur_end < data.len() {
567 run_start = cur_end + 1;
568 } else {
569 run_start = cur_end;
570 }
571 } else {
572 prev_start = cur_start;
574 prev_end = cur_end;
575 last_output_end = if cur_end < data.len() {
576 cur_end + 1
577 } else {
578 cur_end
579 };
580 }
581
582 if cur_end < data.len() {
583 cur_start = cur_end + 1;
584 } else {
585 break;
586 }
587 }
588
589 if run_start < data.len() {
591 writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
592 }
593
594 if !data.is_empty() && *data.last().unwrap() != term {
596 writer.write_all(&[term])?;
597 }
598
599 Ok(())
600}
601
602fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
606 use rayon::prelude::*;
607
608 let num_threads = rayon::current_num_threads().max(1);
609 let chunk_target = data.len() / num_threads;
610
611 let mut boundaries = Vec::with_capacity(num_threads + 1);
613 boundaries.push(0usize);
614 for i in 1..num_threads {
615 let target = i * chunk_target;
616 if target >= data.len() {
617 break;
618 }
619 if let Some(p) = memchr::memchr(term, &data[target..]) {
620 let b = target + p + 1;
621 if b > *boundaries.last().unwrap() && b <= data.len() {
622 boundaries.push(b);
623 }
624 }
625 }
626 boundaries.push(data.len());
627
628 let n_chunks = boundaries.len() - 1;
629 if n_chunks <= 1 {
630 return process_default_sequential(data, writer, term);
631 }
632
633 struct ChunkResult {
635 runs: Vec<(usize, usize)>,
637 first_line_start: usize,
639 first_line_end: usize,
640 last_line_start: usize,
642 last_line_end: usize,
643 }
644
645 let results: Vec<ChunkResult> = boundaries
646 .windows(2)
647 .collect::<Vec<_>>()
648 .par_iter()
649 .map(|w| {
650 let chunk_start = w[0];
651 let chunk_end = w[1];
652 let chunk = &data[chunk_start..chunk_end];
653
654 let first_term = match memchr::memchr(term, chunk) {
655 Some(pos) => pos,
656 None => {
657 return ChunkResult {
658 runs: vec![(chunk_start, chunk_end)],
659 first_line_start: chunk_start,
660 first_line_end: chunk_end,
661 last_line_start: chunk_start,
662 last_line_end: chunk_end,
663 };
664 }
665 };
666
667 let first_line_start = chunk_start;
668 let first_line_end = chunk_start + first_term;
669
670 let mut runs: Vec<(usize, usize)> = Vec::new();
671 let mut run_start = chunk_start;
672 let mut prev_start = 0usize;
673 let mut prev_end = first_term;
674 let mut last_out_start = chunk_start;
675 let mut last_out_end = first_line_end;
676
677 let mut cur_start = first_term + 1;
678 while cur_start < chunk.len() {
679 let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
680 Some(offset) => cur_start + offset,
681 None => chunk.len(),
682 };
683
684 if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
685 let abs_cur = chunk_start + cur_start;
687 if run_start < abs_cur {
688 runs.push((run_start, abs_cur));
689 }
690 run_start = chunk_start
692 + if cur_end < chunk.len() {
693 cur_end + 1
694 } else {
695 cur_end
696 };
697 } else {
698 last_out_start = chunk_start + cur_start;
699 last_out_end = chunk_start + cur_end;
700 }
701 prev_start = cur_start;
702 prev_end = cur_end;
703
704 if cur_end < chunk.len() {
705 cur_start = cur_end + 1;
706 } else {
707 break;
708 }
709 }
710
711 if run_start < chunk_end {
713 runs.push((run_start, chunk_end));
714 }
715
716 ChunkResult {
717 runs,
718 first_line_start,
719 first_line_end,
720 last_line_start: last_out_start,
721 last_line_end: last_out_end,
722 }
723 })
724 .collect();
725
726 for (i, result) in results.iter().enumerate() {
728 let skip_first = if i > 0 {
729 let prev = &results[i - 1];
730 let prev_last = &data[prev.last_line_start..prev.last_line_end];
731 let cur_first = &data[result.first_line_start..result.first_line_end];
732 lines_equal_fast(prev_last, cur_first)
733 } else {
734 false
735 };
736
737 let skip_end = if skip_first {
738 result.first_line_end + 1
740 } else {
741 0
742 };
743
744 for &(rs, re) in &result.runs {
745 let actual_start = rs.max(skip_end);
746 if actual_start < re {
747 writer.write_all(&data[actual_start..re])?;
748 }
749 }
750 }
751
752 if !data.is_empty() && *data.last().unwrap() != term {
754 writer.write_all(&[term])?;
755 }
756
757 Ok(())
758}
759
760fn process_filter_fast_singlepass(
763 data: &[u8],
764 writer: &mut impl Write,
765 config: &UniqConfig,
766 term: u8,
767) -> io::Result<()> {
768 let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
769
770 let first_term = match memchr::memchr(term, data) {
771 Some(pos) => pos,
772 None => {
773 if !repeated {
775 writer.write_all(data)?;
776 writer.write_all(&[term])?;
777 }
778 return Ok(());
779 }
780 };
781
782 let mut prev_start: usize = 0;
783 let mut prev_end: usize = first_term;
784 let mut count: u64 = 1;
785 let mut cur_start = first_term + 1;
786
787 while cur_start < data.len() {
788 let cur_end = match memchr::memchr(term, &data[cur_start..]) {
789 Some(offset) => cur_start + offset,
790 None => data.len(),
791 };
792
793 if lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
794 count += 1;
795 } else {
796 let should_print = if repeated { count > 1 } else { count == 1 };
798 if should_print {
799 if prev_end < data.len() && data[prev_end] == term {
800 writer.write_all(&data[prev_start..prev_end + 1])?;
801 } else {
802 writer.write_all(&data[prev_start..prev_end])?;
803 writer.write_all(&[term])?;
804 }
805 }
806 prev_start = cur_start;
807 prev_end = cur_end;
808 count = 1;
809 }
810
811 if cur_end < data.len() {
812 cur_start = cur_end + 1;
813 } else {
814 break;
815 }
816 }
817
818 let should_print = if repeated { count > 1 } else { count == 1 };
820 if should_print {
821 if prev_end < data.len() && data[prev_end] == term {
822 writer.write_all(&data[prev_start..prev_end + 1])?;
823 } else {
824 writer.write_all(&data[prev_start..prev_end])?;
825 writer.write_all(&[term])?;
826 }
827 }
828
829 Ok(())
830}
831
832fn process_count_fast_singlepass(
836 data: &[u8],
837 writer: &mut impl Write,
838 config: &UniqConfig,
839 term: u8,
840) -> io::Result<()> {
841 let first_term = match memchr::memchr(term, data) {
842 Some(pos) => pos,
843 None => {
844 let should_print = match config.mode {
846 OutputMode::Default => true,
847 OutputMode::RepeatedOnly => false,
848 OutputMode::UniqueOnly => true,
849 _ => true,
850 };
851 if should_print {
852 write_count_line(writer, 1, data, term)?;
853 }
854 return Ok(());
855 }
856 };
857
858 let mut prev_start: usize = 0;
859 let mut prev_end: usize = first_term;
860 let mut count: u64 = 1;
861 let mut cur_start = first_term + 1;
862
863 while cur_start < data.len() {
864 let cur_end = match memchr::memchr(term, &data[cur_start..]) {
865 Some(offset) => cur_start + offset,
866 None => data.len(),
867 };
868
869 if lines_equal_fast(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
870 count += 1;
871 } else {
872 let should_print = match config.mode {
874 OutputMode::Default => true,
875 OutputMode::RepeatedOnly => count > 1,
876 OutputMode::UniqueOnly => count == 1,
877 _ => true,
878 };
879 if should_print {
880 write_count_line(writer, count, &data[prev_start..prev_end], term)?;
881 }
882 prev_start = cur_start;
883 prev_end = cur_end;
884 count = 1;
885 }
886
887 if cur_end < data.len() {
888 cur_start = cur_end + 1;
889 } else {
890 break;
891 }
892 }
893
894 let should_print = match config.mode {
896 OutputMode::Default => true,
897 OutputMode::RepeatedOnly => count > 1,
898 OutputMode::UniqueOnly => count == 1,
899 _ => true,
900 };
901 if should_print {
902 write_count_line(writer, count, &data[prev_start..prev_end], term)?;
903 }
904
905 Ok(())
906}
907
908fn process_default_ci_singlepass(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
911 let mut prev_start: usize = 0;
912 let mut prev_end: usize;
913
914 match memchr::memchr(term, data) {
915 Some(pos) => {
916 prev_end = pos;
917 }
918 None => {
919 writer.write_all(data)?;
920 return writer.write_all(&[term]);
921 }
922 }
923
924 writer.write_all(&data[..prev_end + 1])?;
926
927 let mut cur_start = prev_end + 1;
928
929 while cur_start < data.len() {
930 let cur_end = match memchr::memchr(term, &data[cur_start..]) {
931 Some(offset) => cur_start + offset,
932 None => data.len(),
933 };
934
935 let prev_content = &data[prev_start..prev_end];
936 let cur_content = &data[cur_start..cur_end];
937
938 if !lines_equal_case_insensitive(prev_content, cur_content) {
939 if cur_end < data.len() {
941 writer.write_all(&data[cur_start..cur_end + 1])?;
942 } else {
943 writer.write_all(&data[cur_start..cur_end])?;
944 writer.write_all(&[term])?;
945 }
946 prev_start = cur_start;
947 prev_end = cur_end;
948 }
949
950 if cur_end < data.len() {
951 cur_start = cur_end + 1;
952 } else {
953 break;
954 }
955 }
956
957 Ok(())
958}
959
960fn process_filter_ci_singlepass(
962 data: &[u8],
963 writer: &mut impl Write,
964 config: &UniqConfig,
965 term: u8,
966) -> io::Result<()> {
967 let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
968
969 let first_term = match memchr::memchr(term, data) {
970 Some(pos) => pos,
971 None => {
972 if !repeated {
973 writer.write_all(data)?;
974 writer.write_all(&[term])?;
975 }
976 return Ok(());
977 }
978 };
979
980 let mut prev_start: usize = 0;
981 let mut prev_end: usize = first_term;
982 let mut count: u64 = 1;
983 let mut cur_start = first_term + 1;
984
985 while cur_start < data.len() {
986 let cur_end = match memchr::memchr(term, &data[cur_start..]) {
987 Some(offset) => cur_start + offset,
988 None => data.len(),
989 };
990
991 if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
992 count += 1;
993 } else {
994 let should_print = if repeated { count > 1 } else { count == 1 };
995 if should_print {
996 if prev_end < data.len() && data[prev_end] == term {
997 writer.write_all(&data[prev_start..prev_end + 1])?;
998 } else {
999 writer.write_all(&data[prev_start..prev_end])?;
1000 writer.write_all(&[term])?;
1001 }
1002 }
1003 prev_start = cur_start;
1004 prev_end = cur_end;
1005 count = 1;
1006 }
1007
1008 if cur_end < data.len() {
1009 cur_start = cur_end + 1;
1010 } else {
1011 break;
1012 }
1013 }
1014
1015 let should_print = if repeated { count > 1 } else { count == 1 };
1016 if should_print {
1017 if prev_end < data.len() && data[prev_end] == term {
1018 writer.write_all(&data[prev_start..prev_end + 1])?;
1019 } else {
1020 writer.write_all(&data[prev_start..prev_end])?;
1021 writer.write_all(&[term])?;
1022 }
1023 }
1024
1025 Ok(())
1026}
1027
1028fn process_count_ci_singlepass(
1030 data: &[u8],
1031 writer: &mut impl Write,
1032 config: &UniqConfig,
1033 term: u8,
1034) -> io::Result<()> {
1035 let first_term = match memchr::memchr(term, data) {
1036 Some(pos) => pos,
1037 None => {
1038 let should_print = match config.mode {
1039 OutputMode::Default => true,
1040 OutputMode::RepeatedOnly => false,
1041 OutputMode::UniqueOnly => true,
1042 _ => true,
1043 };
1044 if should_print {
1045 write_count_line(writer, 1, data, term)?;
1046 }
1047 return Ok(());
1048 }
1049 };
1050
1051 let mut prev_start: usize = 0;
1052 let mut prev_end: usize = first_term;
1053 let mut count: u64 = 1;
1054 let mut cur_start = first_term + 1;
1055
1056 while cur_start < data.len() {
1057 let cur_end = match memchr::memchr(term, &data[cur_start..]) {
1058 Some(offset) => cur_start + offset,
1059 None => data.len(),
1060 };
1061
1062 if lines_equal_case_insensitive(&data[prev_start..prev_end], &data[cur_start..cur_end]) {
1063 count += 1;
1064 } else {
1065 let should_print = match config.mode {
1066 OutputMode::Default => true,
1067 OutputMode::RepeatedOnly => count > 1,
1068 OutputMode::UniqueOnly => count == 1,
1069 _ => true,
1070 };
1071 if should_print {
1072 write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1073 }
1074 prev_start = cur_start;
1075 prev_end = cur_end;
1076 count = 1;
1077 }
1078
1079 if cur_end < data.len() {
1080 cur_start = cur_end + 1;
1081 } else {
1082 break;
1083 }
1084 }
1085
1086 let should_print = match config.mode {
1087 OutputMode::Default => true,
1088 OutputMode::RepeatedOnly => count > 1,
1089 OutputMode::UniqueOnly => count == 1,
1090 _ => true,
1091 };
1092 if should_print {
1093 write_count_line(writer, count, &data[prev_start..prev_end], term)?;
1094 }
1095
1096 Ok(())
1097}
1098
1099#[inline(always)]
1101fn output_group_bytes(
1102 writer: &mut impl Write,
1103 content: &[u8],
1104 full: &[u8],
1105 count: u64,
1106 config: &UniqConfig,
1107 term: u8,
1108) -> io::Result<()> {
1109 let should_print = match config.mode {
1110 OutputMode::Default => true,
1111 OutputMode::RepeatedOnly => count > 1,
1112 OutputMode::UniqueOnly => count == 1,
1113 _ => true,
1114 };
1115
1116 if should_print {
1117 if config.count {
1118 write_count_line(writer, count, content, term)?;
1119 } else {
1120 writer.write_all(full)?;
1121 if full.len() == content.len() {
1123 writer.write_all(&[term])?;
1124 }
1125 }
1126 }
1127
1128 Ok(())
1129}
1130
1131fn process_all_repeated_bytes(
1133 data: &[u8],
1134 writer: &mut impl Write,
1135 config: &UniqConfig,
1136 method: AllRepeatedMethod,
1137 term: u8,
1138) -> io::Result<()> {
1139 let mut lines = LineIter::new(data, term);
1140
1141 let first = match lines.next() {
1142 Some(v) => v,
1143 None => return Ok(()),
1144 };
1145
1146 let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
1149 group_lines.push(first);
1150 let mut first_group_printed = false;
1151
1152 let fast = !needs_key_extraction(config) && !config.ignore_case;
1153
1154 for (cur_content, cur_full) in lines {
1155 let prev_content = group_lines.last().unwrap().0;
1156 let equal = if fast {
1157 lines_equal_fast(prev_content, cur_content)
1158 } else {
1159 lines_equal(prev_content, cur_content, config)
1160 };
1161
1162 if equal {
1163 group_lines.push((cur_content, cur_full));
1164 } else {
1165 flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1167 group_lines.clear();
1168 group_lines.push((cur_content, cur_full));
1169 }
1170 }
1171
1172 flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
1174
1175 Ok(())
1176}
1177
1178fn flush_all_repeated_bytes(
1180 writer: &mut impl Write,
1181 group: &[(&[u8], &[u8])],
1182 method: AllRepeatedMethod,
1183 first_group_printed: &mut bool,
1184 term: u8,
1185) -> io::Result<()> {
1186 if group.len() <= 1 {
1187 return Ok(()); }
1189
1190 match method {
1191 AllRepeatedMethod::Prepend => {
1192 writer.write_all(&[term])?;
1193 }
1194 AllRepeatedMethod::Separate => {
1195 if *first_group_printed {
1196 writer.write_all(&[term])?;
1197 }
1198 }
1199 AllRepeatedMethod::None => {}
1200 }
1201
1202 for &(content, full) in group {
1203 writer.write_all(full)?;
1204 if full.len() == content.len() {
1205 writer.write_all(&[term])?;
1206 }
1207 }
1208
1209 *first_group_printed = true;
1210 Ok(())
1211}
1212
1213fn process_group_bytes(
1215 data: &[u8],
1216 writer: &mut impl Write,
1217 config: &UniqConfig,
1218 method: GroupMethod,
1219 term: u8,
1220) -> io::Result<()> {
1221 let mut lines = LineIter::new(data, term);
1222
1223 let (prev_content, prev_full) = match lines.next() {
1224 Some(v) => v,
1225 None => return Ok(()),
1226 };
1227
1228 if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1230 writer.write_all(&[term])?;
1231 }
1232
1233 writer.write_all(prev_full)?;
1235 if prev_full.len() == prev_content.len() {
1236 writer.write_all(&[term])?;
1237 }
1238
1239 let mut prev_content = prev_content;
1240 let fast = !needs_key_extraction(config) && !config.ignore_case;
1241
1242 for (cur_content, cur_full) in lines {
1243 let equal = if fast {
1244 lines_equal_fast(prev_content, cur_content)
1245 } else {
1246 lines_equal(prev_content, cur_content, config)
1247 };
1248
1249 if !equal {
1250 writer.write_all(&[term])?;
1252 }
1253
1254 writer.write_all(cur_full)?;
1255 if cur_full.len() == cur_content.len() {
1256 writer.write_all(&[term])?;
1257 }
1258
1259 prev_content = cur_content;
1260 }
1261
1262 if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1264 writer.write_all(&[term])?;
1265 }
1266
1267 Ok(())
1268}
1269
1270pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
1277 let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
1278 let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
1279 let term = if config.zero_terminated { b'\0' } else { b'\n' };
1280
1281 match config.mode {
1282 OutputMode::Group(method) => {
1283 process_group_stream(reader, &mut writer, config, method, term)?;
1284 }
1285 OutputMode::AllRepeated(method) => {
1286 process_all_repeated_stream(reader, &mut writer, config, method, term)?;
1287 }
1288 _ => {
1289 process_standard_stream(reader, &mut writer, config, term)?;
1290 }
1291 }
1292
1293 writer.flush()?;
1294 Ok(())
1295}
1296
1297fn process_standard_stream<R: BufRead, W: Write>(
1299 mut reader: R,
1300 writer: &mut W,
1301 config: &UniqConfig,
1302 term: u8,
1303) -> io::Result<()> {
1304 let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1305 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1306
1307 if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1309 return Ok(()); }
1311 let mut count: u64 = 1;
1312
1313 loop {
1314 current_line.clear();
1315 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1316
1317 if bytes_read == 0 {
1318 output_group_stream(writer, &prev_line, count, config, term)?;
1320 break;
1321 }
1322
1323 if compare_lines_stream(&prev_line, ¤t_line, config, term) {
1324 count += 1;
1325 } else {
1326 output_group_stream(writer, &prev_line, count, config, term)?;
1327 std::mem::swap(&mut prev_line, &mut current_line);
1328 count = 1;
1329 }
1330 }
1331
1332 Ok(())
1333}
1334
1335#[inline(always)]
1337fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
1338 let a_stripped = strip_term(a, term);
1339 let b_stripped = strip_term(b, term);
1340 lines_equal(a_stripped, b_stripped, config)
1341}
1342
1343#[inline(always)]
1345fn strip_term(line: &[u8], term: u8) -> &[u8] {
1346 if line.last() == Some(&term) {
1347 &line[..line.len() - 1]
1348 } else {
1349 line
1350 }
1351}
1352
1353#[inline(always)]
1355fn output_group_stream(
1356 writer: &mut impl Write,
1357 line: &[u8],
1358 count: u64,
1359 config: &UniqConfig,
1360 term: u8,
1361) -> io::Result<()> {
1362 let should_print = match config.mode {
1363 OutputMode::Default => true,
1364 OutputMode::RepeatedOnly => count > 1,
1365 OutputMode::UniqueOnly => count == 1,
1366 _ => true,
1367 };
1368
1369 if should_print {
1370 let content = strip_term(line, term);
1371 if config.count {
1372 write_count_line(writer, count, content, term)?;
1373 } else {
1374 writer.write_all(content)?;
1375 writer.write_all(&[term])?;
1376 }
1377 }
1378
1379 Ok(())
1380}
1381
1382fn process_all_repeated_stream<R: BufRead, W: Write>(
1384 mut reader: R,
1385 writer: &mut W,
1386 config: &UniqConfig,
1387 method: AllRepeatedMethod,
1388 term: u8,
1389) -> io::Result<()> {
1390 let mut group: Vec<Vec<u8>> = Vec::new();
1391 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1392 let mut first_group_printed = false;
1393
1394 current_line.clear();
1395 if read_line_term(&mut reader, &mut current_line, term)? == 0 {
1396 return Ok(());
1397 }
1398 group.push(current_line.clone());
1399
1400 loop {
1401 current_line.clear();
1402 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1403
1404 if bytes_read == 0 {
1405 flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1406 break;
1407 }
1408
1409 if compare_lines_stream(group.last().unwrap(), ¤t_line, config, term) {
1410 group.push(current_line.clone());
1411 } else {
1412 flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1413 group.clear();
1414 group.push(current_line.clone());
1415 }
1416 }
1417
1418 Ok(())
1419}
1420
1421fn flush_all_repeated_stream(
1423 writer: &mut impl Write,
1424 group: &[Vec<u8>],
1425 method: AllRepeatedMethod,
1426 first_group_printed: &mut bool,
1427 term: u8,
1428) -> io::Result<()> {
1429 if group.len() <= 1 {
1430 return Ok(());
1431 }
1432
1433 match method {
1434 AllRepeatedMethod::Prepend => {
1435 writer.write_all(&[term])?;
1436 }
1437 AllRepeatedMethod::Separate => {
1438 if *first_group_printed {
1439 writer.write_all(&[term])?;
1440 }
1441 }
1442 AllRepeatedMethod::None => {}
1443 }
1444
1445 for line in group {
1446 let content = strip_term(line, term);
1447 writer.write_all(content)?;
1448 writer.write_all(&[term])?;
1449 }
1450
1451 *first_group_printed = true;
1452 Ok(())
1453}
1454
1455fn process_group_stream<R: BufRead, W: Write>(
1457 mut reader: R,
1458 writer: &mut W,
1459 config: &UniqConfig,
1460 method: GroupMethod,
1461 term: u8,
1462) -> io::Result<()> {
1463 let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1464 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1465
1466 if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1467 return Ok(());
1468 }
1469
1470 if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1472 writer.write_all(&[term])?;
1473 }
1474
1475 let content = strip_term(&prev_line, term);
1476 writer.write_all(content)?;
1477 writer.write_all(&[term])?;
1478
1479 loop {
1480 current_line.clear();
1481 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1482
1483 if bytes_read == 0 {
1484 if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1485 writer.write_all(&[term])?;
1486 }
1487 break;
1488 }
1489
1490 if !compare_lines_stream(&prev_line, ¤t_line, config, term) {
1491 writer.write_all(&[term])?;
1492 }
1493
1494 let content = strip_term(¤t_line, term);
1495 writer.write_all(content)?;
1496 writer.write_all(&[term])?;
1497
1498 std::mem::swap(&mut prev_line, &mut current_line);
1499 }
1500
1501 Ok(())
1502}
1503
1504#[inline(always)]
1507fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
1508 reader.read_until(term, buf)
1509}