1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Copies `buf` to `writer` verbatim and in full; no terminator is added.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}
8
/// How groups are delimited when every line of each repeated group is printed.
///
/// The delimiter is an empty line, i.e. a lone line terminator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    /// Groups are printed back to back with no delimiter.
    None,
    /// A delimiter is written before every printed group.
    Prepend,
    /// A delimiter is written between consecutive printed groups.
    Separate,
}
16
/// Where empty delimiter lines go when all input lines are printed grouped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    /// A delimiter between consecutive groups only.
    Separate,
    /// A delimiter before every group, including the first.
    Prepend,
    /// A delimiter between groups and one after the last group.
    Append,
    /// A delimiter before every group and one after the last group.
    Both,
}
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29 Default,
31 RepeatedOnly,
33 AllRepeated(AllRepeatedMethod),
35 UniqueOnly,
37 Group(GroupMethod),
39}
40
41#[derive(Debug, Clone)]
43pub struct UniqConfig {
44 pub mode: OutputMode,
45 pub count: bool,
46 pub ignore_case: bool,
47 pub skip_fields: usize,
48 pub skip_chars: usize,
49 pub check_chars: Option<usize>,
50 pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54 fn default() -> Self {
55 Self {
56 mode: OutputMode::Default,
57 count: false,
58 ignore_case: false,
59 skip_fields: 0,
60 skip_chars: 0,
61 check_chars: None,
62 zero_terminated: false,
63 }
64 }
65}
66
67#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71 let mut start = 0;
72 let len = line.len();
73
74 for _ in 0..config.skip_fields {
76 while start < len && (line[start] == b' ' || line[start] == b'\t') {
78 start += 1;
79 }
80 while start < len && line[start] != b' ' && line[start] != b'\t' {
82 start += 1;
83 }
84 }
85
86 if config.skip_chars > 0 {
88 let remaining = len - start;
89 let skip = config.skip_chars.min(remaining);
90 start += skip;
91 }
92
93 let slice = &line[start..];
94
95 if let Some(w) = config.check_chars {
97 if w < slice.len() {
98 return &slice[..w];
99 }
100 }
101
102 slice
103}
104
105#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108 let sa = get_compare_slice(a, config);
109 let sb = get_compare_slice(b, config);
110
111 if config.ignore_case {
112 sa.eq_ignore_ascii_case(sb)
113 } else {
114 sa == sb
115 }
116}
117
118#[inline(always)]
120fn needs_key_extraction(config: &UniqConfig) -> bool {
121 config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
122}
123
/// Exact byte-for-byte equality of two line contents.
///
/// Lengths are checked first. Then, when both lines have at least eight bytes, the
/// first eight bytes are compared. This rejects most distinct lines before the full
/// comparison runs.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    // Number of leading bytes compared before the full comparison.
    const PREFIX: usize = 8;

    if a.len() != b.len() {
        return false;
    }
    // A fixed-size slice comparison gives the same cheap prefix rejection as the former
    // `unsafe` unaligned `u64` reads, without any unsafe code.
    if a.len() >= PREFIX && a[..PREFIX] != b[..PREFIX] {
        return false;
    }
    a == b
}
145
146#[inline(always)]
150fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
151 let mut prefix = [b' '; 28]; let digits = itoa_right_aligned_into(&mut prefix, count);
154 let width = digits.max(7); let prefix_len = width + 1; prefix[width] = b' ';
157
158 let total = prefix_len + line.len() + 1;
160 if total <= 256 {
161 let mut buf = [0u8; 256];
162 buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
163 buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
164 buf[prefix_len + line.len()] = term;
165 out.write_all(&buf[..total])
166 } else {
167 out.write_all(&prefix[..prefix_len])?;
168 out.write_all(line)?;
169 out.write_all(&[term])
170 }
171}
172
/// Formats `val` in decimal at the front of `buf`, right-aligned to at least 7 columns,
/// and returns the number of columns used (`max(digits, 7)`).
///
/// The caller is expected to pass a buffer pre-filled with spaces; the padding columns
/// are left untouched. Digits are first produced ending at index 27 and then moved into
/// place, so bytes beyond the returned width may hold leftover digits.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    const MIN_WIDTH: usize = 7;
    const END: usize = 27;

    if val == 0 {
        buf[MIN_WIDTH - 1] = b'0';
        return MIN_WIDTH;
    }

    let mut start = END;
    loop {
        start -= 1;
        buf[start] = b'0' + (val % 10) as u8;
        val /= 10;
        if val == 0 {
            break;
        }
    }

    let digits = END - start;
    let width = digits.max(MIN_WIDTH);
    buf.copy_within(start..END, width - digits);
    width
}
201
202pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
208 let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
210 let term = if config.zero_terminated { b'\0' } else { b'\n' };
211
212 match config.mode {
213 OutputMode::Group(method) => {
214 process_group_bytes(data, &mut writer, config, method, term)?;
215 }
216 OutputMode::AllRepeated(method) => {
217 process_all_repeated_bytes(data, &mut writer, config, method, term)?;
218 }
219 _ => {
220 process_standard_bytes(data, &mut writer, config, term)?;
221 }
222 }
223
224 writer.flush()?;
225 Ok(())
226}
227
228struct LineIter<'a> {
231 data: &'a [u8],
232 pos: usize,
233 term: u8,
234}
235
236impl<'a> LineIter<'a> {
237 #[inline(always)]
238 fn new(data: &'a [u8], term: u8) -> Self {
239 Self { data, pos: 0, term }
240 }
241}
242
243impl<'a> Iterator for LineIter<'a> {
244 type Item = (&'a [u8], &'a [u8]);
246
247 #[inline(always)]
248 fn next(&mut self) -> Option<Self::Item> {
249 if self.pos >= self.data.len() {
250 return None;
251 }
252
253 let remaining = &self.data[self.pos..];
254 match memchr::memchr(self.term, remaining) {
255 Some(idx) => {
256 let line_start = self.pos;
257 let line_end = self.pos + idx; let full_end = self.pos + idx + 1; self.pos = full_end;
260 Some((
261 &self.data[line_start..line_end],
262 &self.data[line_start..full_end],
263 ))
264 }
265 None => {
266 let line_start = self.pos;
268 self.pos = self.data.len();
269 let line = &self.data[line_start..];
270 Some((line, line))
271 }
272 }
273 }
274}
275
/// Content (terminator excluded) of line `idx`, given each line's start offset.
///
/// A non-final line ends one byte before the next line's start, where its terminator
/// sits. The final line ends at `content_end`.
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let end = line_starts
        .get(idx + 1)
        .map_or(content_end, |&next_start| next_start - 1);
    &data[line_starts[idx]..end]
}
293
/// Raw bytes of line `idx`, including its terminator when it has one.
///
/// The line runs up to the next line's start, or to the end of `data` for the last line.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let start = line_starts[idx];
    match line_starts.get(idx + 1) {
        Some(&next_start) => &data[start..next_start],
        None => &data[start..],
    }
}
305
306#[inline]
311fn linear_scan_group_end(
312 data: &[u8],
313 line_starts: &[usize],
314 group_start: usize,
315 num_lines: usize,
316 content_end: usize,
317) -> usize {
318 let key = line_content_at(data, line_starts, group_start, content_end);
319 let mut i = group_start + 1;
320 while i < num_lines {
321 if !lines_equal_fast(key, line_content_at(data, line_starts, i, content_end)) {
322 return i;
323 }
324 i += 1;
325 }
326 i
327}
328
329fn process_standard_bytes(
333 data: &[u8],
334 writer: &mut impl Write,
335 config: &UniqConfig,
336 term: u8,
337) -> io::Result<()> {
338 if data.is_empty() {
339 return Ok(());
340 }
341
342 let fast = !needs_key_extraction(config) && !config.ignore_case;
343
344 if fast && !config.count && matches!(config.mode, OutputMode::Default) {
348 return process_default_fast_singlepass(data, writer, term);
349 }
350
351 if fast
353 && !config.count
354 && matches!(
355 config.mode,
356 OutputMode::RepeatedOnly | OutputMode::UniqueOnly
357 )
358 {
359 return process_filter_fast_singlepass(data, writer, config, term);
360 }
361
362 let estimated_lines = (data.len() / 40).max(64);
364 let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
365 line_starts.push(0);
366 for pos in memchr::memchr_iter(term, data) {
367 if pos + 1 < data.len() {
368 line_starts.push(pos + 1);
369 }
370 }
371 let num_lines = line_starts.len();
372 if num_lines == 0 {
373 return Ok(());
374 }
375
376 let content_end = if data.last() == Some(&term) {
378 data.len() - 1
379 } else {
380 data.len()
381 };
382
383 if fast && !config.count && matches!(config.mode, OutputMode::Default) {
385 let first_full = line_full_at(data, &line_starts, 0);
387 let first_content = line_content_at(data, &line_starts, 0, content_end);
388 write_all_raw(writer, first_full)?;
389 if first_full.len() == first_content.len() {
390 writer.write_all(&[term])?;
391 }
392
393 let mut i = 1;
394 while i < num_lines {
395 let prev = line_content_at(data, &line_starts, i - 1, content_end);
396 let cur = line_content_at(data, &line_starts, i, content_end);
397
398 if lines_equal_fast(prev, cur) {
399 let group_end =
401 linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
402 i = group_end;
403 continue;
404 }
405
406 let cur_full = line_full_at(data, &line_starts, i);
408 write_all_raw(writer, cur_full)?;
409 if cur_full.len() == cur.len() {
410 writer.write_all(&[term])?;
411 }
412 i += 1;
413 }
414 return Ok(());
415 }
416
417 let mut i = 0;
419 while i < num_lines {
420 let content = line_content_at(data, &line_starts, i, content_end);
421 let full = line_full_at(data, &line_starts, i);
422
423 let group_end = if fast
424 && i + 1 < num_lines
425 && lines_equal_fast(
426 content,
427 line_content_at(data, &line_starts, i + 1, content_end),
428 ) {
429 linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
431 } else if !fast
432 && i + 1 < num_lines
433 && lines_equal(
434 content,
435 line_content_at(data, &line_starts, i + 1, content_end),
436 config,
437 )
438 {
439 let mut j = i + 2;
441 while j < num_lines {
442 if !lines_equal(
443 content,
444 line_content_at(data, &line_starts, j, content_end),
445 config,
446 ) {
447 break;
448 }
449 j += 1;
450 }
451 j
452 } else {
453 i + 1
454 };
455
456 let count = (group_end - i) as u64;
457 output_group_bytes(writer, content, full, count, config, term)?;
458 i = group_end;
459 }
460
461 Ok(())
462}
463
464fn process_default_fast_singlepass(
471 data: &[u8],
472 writer: &mut impl Write,
473 term: u8,
474) -> io::Result<()> {
475 if data.len() >= 4 * 1024 * 1024 {
477 return process_default_parallel(data, writer, term);
478 }
479
480 process_default_sequential(data, writer, term)
481}
482
483fn process_default_sequential(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
487 let mut prev_start: usize = 0;
488 let mut prev_end: usize; match memchr::memchr(term, data) {
492 Some(pos) => {
493 prev_end = pos;
494 }
495 None => {
496 writer.write_all(data)?;
498 return writer.write_all(&[term]);
499 }
500 }
501
502 let mut run_start: usize = 0;
505 let mut cur_start = prev_end + 1;
506 let mut last_output_end = prev_end + 1; while cur_start < data.len() {
509 let cur_end = match memchr::memchr(term, &data[cur_start..]) {
510 Some(offset) => cur_start + offset,
511 None => data.len(), };
513
514 let prev_content = &data[prev_start..prev_end];
515 let cur_content = &data[cur_start..cur_end];
516
517 if lines_equal_fast(prev_content, cur_content) {
518 if run_start < cur_start {
520 writer.write_all(&data[run_start..cur_start])?;
521 }
522 if cur_end < data.len() {
524 run_start = cur_end + 1;
525 } else {
526 run_start = cur_end;
527 }
528 } else {
529 prev_start = cur_start;
531 prev_end = cur_end;
532 last_output_end = if cur_end < data.len() {
533 cur_end + 1
534 } else {
535 cur_end
536 };
537 }
538
539 if cur_end < data.len() {
540 cur_start = cur_end + 1;
541 } else {
542 break;
543 }
544 }
545
546 if run_start < data.len() {
548 writer.write_all(&data[run_start..last_output_end.max(run_start)])?;
549 }
550
551 if !data.is_empty() && *data.last().unwrap() != term {
553 writer.write_all(&[term])?;
554 }
555
556 Ok(())
557}
558
559fn process_default_parallel(data: &[u8], writer: &mut impl Write, term: u8) -> io::Result<()> {
563 use rayon::prelude::*;
564
565 let num_threads = rayon::current_num_threads().max(1);
566 let chunk_target = data.len() / num_threads;
567
568 let mut boundaries = Vec::with_capacity(num_threads + 1);
570 boundaries.push(0usize);
571 for i in 1..num_threads {
572 let target = i * chunk_target;
573 if target >= data.len() {
574 break;
575 }
576 if let Some(p) = memchr::memchr(term, &data[target..]) {
577 let b = target + p + 1;
578 if b > *boundaries.last().unwrap() && b <= data.len() {
579 boundaries.push(b);
580 }
581 }
582 }
583 boundaries.push(data.len());
584
585 let n_chunks = boundaries.len() - 1;
586 if n_chunks <= 1 {
587 return process_default_sequential(data, writer, term);
588 }
589
590 struct ChunkResult {
592 runs: Vec<(usize, usize)>,
594 first_line_start: usize,
596 first_line_end: usize,
597 last_line_start: usize,
599 last_line_end: usize,
600 }
601
602 let results: Vec<ChunkResult> = boundaries
603 .windows(2)
604 .collect::<Vec<_>>()
605 .par_iter()
606 .map(|w| {
607 let chunk_start = w[0];
608 let chunk_end = w[1];
609 let chunk = &data[chunk_start..chunk_end];
610
611 let first_term = match memchr::memchr(term, chunk) {
612 Some(pos) => pos,
613 None => {
614 return ChunkResult {
615 runs: vec![(chunk_start, chunk_end)],
616 first_line_start: chunk_start,
617 first_line_end: chunk_end,
618 last_line_start: chunk_start,
619 last_line_end: chunk_end,
620 };
621 }
622 };
623
624 let first_line_start = chunk_start;
625 let first_line_end = chunk_start + first_term;
626
627 let mut runs: Vec<(usize, usize)> = Vec::new();
628 let mut run_start = chunk_start;
629 let mut prev_start = 0usize;
630 let mut prev_end = first_term;
631 let mut last_out_start = chunk_start;
632 let mut last_out_end = first_line_end;
633
634 let mut cur_start = first_term + 1;
635 while cur_start < chunk.len() {
636 let cur_end = match memchr::memchr(term, &chunk[cur_start..]) {
637 Some(offset) => cur_start + offset,
638 None => chunk.len(),
639 };
640
641 if lines_equal_fast(&chunk[prev_start..prev_end], &chunk[cur_start..cur_end]) {
642 let abs_cur = chunk_start + cur_start;
644 if run_start < abs_cur {
645 runs.push((run_start, abs_cur));
646 }
647 run_start = chunk_start
649 + if cur_end < chunk.len() {
650 cur_end + 1
651 } else {
652 cur_end
653 };
654 } else {
655 last_out_start = chunk_start + cur_start;
656 last_out_end = chunk_start + cur_end;
657 }
658 prev_start = cur_start;
659 prev_end = cur_end;
660
661 if cur_end < chunk.len() {
662 cur_start = cur_end + 1;
663 } else {
664 break;
665 }
666 }
667
668 if run_start < chunk_end {
670 runs.push((run_start, chunk_end));
671 }
672
673 ChunkResult {
674 runs,
675 first_line_start,
676 first_line_end,
677 last_line_start: last_out_start,
678 last_line_end: last_out_end,
679 }
680 })
681 .collect();
682
683 for (i, result) in results.iter().enumerate() {
685 let skip_first = if i > 0 {
686 let prev = &results[i - 1];
687 let prev_last = &data[prev.last_line_start..prev.last_line_end];
688 let cur_first = &data[result.first_line_start..result.first_line_end];
689 lines_equal_fast(prev_last, cur_first)
690 } else {
691 false
692 };
693
694 let skip_end = if skip_first {
695 result.first_line_end + 1
697 } else {
698 0
699 };
700
701 for &(rs, re) in &result.runs {
702 let actual_start = rs.max(skip_end);
703 if actual_start < re {
704 writer.write_all(&data[actual_start..re])?;
705 }
706 }
707 }
708
709 if !data.is_empty() && *data.last().unwrap() != term {
711 writer.write_all(&[term])?;
712 }
713
714 Ok(())
715}
716
717fn process_filter_fast_singlepass(
719 data: &[u8],
720 writer: &mut impl Write,
721 config: &UniqConfig,
722 term: u8,
723) -> io::Result<()> {
724 let repeated = matches!(config.mode, OutputMode::RepeatedOnly);
725 let mut outbuf = Vec::with_capacity(data.len() / 2);
726
727 let prev_start: usize = 0;
728 let prev_end: usize = match memchr::memchr(term, data) {
729 Some(pos) => pos,
730 None => {
731 if !repeated {
733 outbuf.extend_from_slice(data);
734 outbuf.push(term);
735 }
736 return writer.write_all(&outbuf);
737 }
738 };
739
740 let mut prev_start_mut = prev_start;
741 let mut prev_end_mut = prev_end;
742 let mut count: u64 = 1;
743 let mut cur_start = prev_end + 1;
744
745 while cur_start < data.len() {
746 let cur_end = match memchr::memchr(term, &data[cur_start..]) {
747 Some(offset) => cur_start + offset,
748 None => data.len(),
749 };
750
751 let prev_content = &data[prev_start_mut..prev_end_mut];
752 let cur_content = &data[cur_start..cur_end];
753
754 if lines_equal_fast(prev_content, cur_content) {
755 count += 1;
756 } else {
757 let should_print = if repeated { count > 1 } else { count == 1 };
759 if should_print {
760 if prev_end_mut < data.len() && data.get(prev_end_mut) == Some(&term) {
761 outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut + 1]);
762 } else {
763 outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut]);
764 outbuf.push(term);
765 }
766 }
767 prev_start_mut = cur_start;
768 prev_end_mut = cur_end;
769 count = 1;
770 }
771
772 if cur_end < data.len() {
773 cur_start = cur_end + 1;
774 } else {
775 break;
776 }
777 }
778
779 let should_print = if repeated { count > 1 } else { count == 1 };
781 if should_print {
782 if prev_end_mut < data.len() && data.get(prev_end_mut) == Some(&term) {
783 outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut + 1]);
784 } else {
785 outbuf.extend_from_slice(&data[prev_start_mut..prev_end_mut]);
786 outbuf.push(term);
787 }
788 }
789
790 writer.write_all(&outbuf)
791}
792
793#[inline(always)]
795fn output_group_bytes(
796 writer: &mut impl Write,
797 content: &[u8],
798 full: &[u8],
799 count: u64,
800 config: &UniqConfig,
801 term: u8,
802) -> io::Result<()> {
803 let should_print = match config.mode {
804 OutputMode::Default => true,
805 OutputMode::RepeatedOnly => count > 1,
806 OutputMode::UniqueOnly => count == 1,
807 _ => true,
808 };
809
810 if should_print {
811 if config.count {
812 write_count_line(writer, count, content, term)?;
813 } else {
814 writer.write_all(full)?;
815 if full.len() == content.len() {
817 writer.write_all(&[term])?;
818 }
819 }
820 }
821
822 Ok(())
823}
824
825fn process_all_repeated_bytes(
827 data: &[u8],
828 writer: &mut impl Write,
829 config: &UniqConfig,
830 method: AllRepeatedMethod,
831 term: u8,
832) -> io::Result<()> {
833 let mut lines = LineIter::new(data, term);
834
835 let first = match lines.next() {
836 Some(v) => v,
837 None => return Ok(()),
838 };
839
840 let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
843 group_lines.push(first);
844 let mut first_group_printed = false;
845
846 let fast = !needs_key_extraction(config) && !config.ignore_case;
847
848 for (cur_content, cur_full) in lines {
849 let prev_content = group_lines.last().unwrap().0;
850 let equal = if fast {
851 lines_equal_fast(prev_content, cur_content)
852 } else {
853 lines_equal(prev_content, cur_content, config)
854 };
855
856 if equal {
857 group_lines.push((cur_content, cur_full));
858 } else {
859 flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
861 group_lines.clear();
862 group_lines.push((cur_content, cur_full));
863 }
864 }
865
866 flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
868
869 Ok(())
870}
871
872fn flush_all_repeated_bytes(
874 writer: &mut impl Write,
875 group: &[(&[u8], &[u8])],
876 method: AllRepeatedMethod,
877 first_group_printed: &mut bool,
878 term: u8,
879) -> io::Result<()> {
880 if group.len() <= 1 {
881 return Ok(()); }
883
884 match method {
885 AllRepeatedMethod::Prepend => {
886 writer.write_all(&[term])?;
887 }
888 AllRepeatedMethod::Separate => {
889 if *first_group_printed {
890 writer.write_all(&[term])?;
891 }
892 }
893 AllRepeatedMethod::None => {}
894 }
895
896 for &(content, full) in group {
897 writer.write_all(full)?;
898 if full.len() == content.len() {
899 writer.write_all(&[term])?;
900 }
901 }
902
903 *first_group_printed = true;
904 Ok(())
905}
906
907fn process_group_bytes(
909 data: &[u8],
910 writer: &mut impl Write,
911 config: &UniqConfig,
912 method: GroupMethod,
913 term: u8,
914) -> io::Result<()> {
915 let mut lines = LineIter::new(data, term);
916
917 let (prev_content, prev_full) = match lines.next() {
918 Some(v) => v,
919 None => return Ok(()),
920 };
921
922 if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
924 writer.write_all(&[term])?;
925 }
926
927 writer.write_all(prev_full)?;
929 if prev_full.len() == prev_content.len() {
930 writer.write_all(&[term])?;
931 }
932
933 let mut prev_content = prev_content;
934 let fast = !needs_key_extraction(config) && !config.ignore_case;
935
936 for (cur_content, cur_full) in lines {
937 let equal = if fast {
938 lines_equal_fast(prev_content, cur_content)
939 } else {
940 lines_equal(prev_content, cur_content, config)
941 };
942
943 if !equal {
944 writer.write_all(&[term])?;
946 }
947
948 writer.write_all(cur_full)?;
949 if cur_full.len() == cur_content.len() {
950 writer.write_all(&[term])?;
951 }
952
953 prev_content = cur_content;
954 }
955
956 if matches!(method, GroupMethod::Append | GroupMethod::Both) {
958 writer.write_all(&[term])?;
959 }
960
961 Ok(())
962}
963
964pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
971 let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
972 let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
973 let term = if config.zero_terminated { b'\0' } else { b'\n' };
974
975 match config.mode {
976 OutputMode::Group(method) => {
977 process_group_stream(reader, &mut writer, config, method, term)?;
978 }
979 OutputMode::AllRepeated(method) => {
980 process_all_repeated_stream(reader, &mut writer, config, method, term)?;
981 }
982 _ => {
983 process_standard_stream(reader, &mut writer, config, term)?;
984 }
985 }
986
987 writer.flush()?;
988 Ok(())
989}
990
991fn process_standard_stream<R: BufRead, W: Write>(
993 mut reader: R,
994 writer: &mut W,
995 config: &UniqConfig,
996 term: u8,
997) -> io::Result<()> {
998 let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
999 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1000
1001 if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1003 return Ok(()); }
1005 let mut count: u64 = 1;
1006
1007 loop {
1008 current_line.clear();
1009 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1010
1011 if bytes_read == 0 {
1012 output_group_stream(writer, &prev_line, count, config, term)?;
1014 break;
1015 }
1016
1017 if compare_lines_stream(&prev_line, ¤t_line, config, term) {
1018 count += 1;
1019 } else {
1020 output_group_stream(writer, &prev_line, count, config, term)?;
1021 std::mem::swap(&mut prev_line, &mut current_line);
1022 count = 1;
1023 }
1024 }
1025
1026 Ok(())
1027}
1028
1029#[inline(always)]
1031fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
1032 let a_stripped = strip_term(a, term);
1033 let b_stripped = strip_term(b, term);
1034 lines_equal(a_stripped, b_stripped, config)
1035}
1036
/// Removes a single trailing `term` byte from `line`, if one is present.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, body)) if last == term => body,
        _ => line,
    }
}
1046
1047#[inline(always)]
1049fn output_group_stream(
1050 writer: &mut impl Write,
1051 line: &[u8],
1052 count: u64,
1053 config: &UniqConfig,
1054 term: u8,
1055) -> io::Result<()> {
1056 let should_print = match config.mode {
1057 OutputMode::Default => true,
1058 OutputMode::RepeatedOnly => count > 1,
1059 OutputMode::UniqueOnly => count == 1,
1060 _ => true,
1061 };
1062
1063 if should_print {
1064 let content = strip_term(line, term);
1065 if config.count {
1066 write_count_line(writer, count, content, term)?;
1067 } else {
1068 writer.write_all(content)?;
1069 writer.write_all(&[term])?;
1070 }
1071 }
1072
1073 Ok(())
1074}
1075
1076fn process_all_repeated_stream<R: BufRead, W: Write>(
1078 mut reader: R,
1079 writer: &mut W,
1080 config: &UniqConfig,
1081 method: AllRepeatedMethod,
1082 term: u8,
1083) -> io::Result<()> {
1084 let mut group: Vec<Vec<u8>> = Vec::new();
1085 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1086 let mut first_group_printed = false;
1087
1088 current_line.clear();
1089 if read_line_term(&mut reader, &mut current_line, term)? == 0 {
1090 return Ok(());
1091 }
1092 group.push(current_line.clone());
1093
1094 loop {
1095 current_line.clear();
1096 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1097
1098 if bytes_read == 0 {
1099 flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1100 break;
1101 }
1102
1103 if compare_lines_stream(group.last().unwrap(), ¤t_line, config, term) {
1104 group.push(current_line.clone());
1105 } else {
1106 flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
1107 group.clear();
1108 group.push(current_line.clone());
1109 }
1110 }
1111
1112 Ok(())
1113}
1114
1115fn flush_all_repeated_stream(
1117 writer: &mut impl Write,
1118 group: &[Vec<u8>],
1119 method: AllRepeatedMethod,
1120 first_group_printed: &mut bool,
1121 term: u8,
1122) -> io::Result<()> {
1123 if group.len() <= 1 {
1124 return Ok(());
1125 }
1126
1127 match method {
1128 AllRepeatedMethod::Prepend => {
1129 writer.write_all(&[term])?;
1130 }
1131 AllRepeatedMethod::Separate => {
1132 if *first_group_printed {
1133 writer.write_all(&[term])?;
1134 }
1135 }
1136 AllRepeatedMethod::None => {}
1137 }
1138
1139 for line in group {
1140 let content = strip_term(line, term);
1141 writer.write_all(content)?;
1142 writer.write_all(&[term])?;
1143 }
1144
1145 *first_group_printed = true;
1146 Ok(())
1147}
1148
1149fn process_group_stream<R: BufRead, W: Write>(
1151 mut reader: R,
1152 writer: &mut W,
1153 config: &UniqConfig,
1154 method: GroupMethod,
1155 term: u8,
1156) -> io::Result<()> {
1157 let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
1158 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
1159
1160 if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
1161 return Ok(());
1162 }
1163
1164 if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
1166 writer.write_all(&[term])?;
1167 }
1168
1169 let content = strip_term(&prev_line, term);
1170 writer.write_all(content)?;
1171 writer.write_all(&[term])?;
1172
1173 loop {
1174 current_line.clear();
1175 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
1176
1177 if bytes_read == 0 {
1178 if matches!(method, GroupMethod::Append | GroupMethod::Both) {
1179 writer.write_all(&[term])?;
1180 }
1181 break;
1182 }
1183
1184 if !compare_lines_stream(&prev_line, ¤t_line, config, term) {
1185 writer.write_all(&[term])?;
1186 }
1187
1188 let content = strip_term(¤t_line, term);
1189 writer.write_all(content)?;
1190 writer.write_all(&[term])?;
1191
1192 std::mem::swap(&mut prev_line, &mut current_line);
1193 }
1194
1195 Ok(())
1196}
1197
/// Appends the next `term`-delimited line (terminator included, if present) to `buf`.
///
/// Returns the number of bytes appended; `0` means end of input.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    BufRead::read_until(reader, term, buf)
}