1use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
2
/// Writes every byte of `buf` to `writer`.
///
/// Thin forwarding wrapper around [`Write::write_all`]; any error it reports
/// is returned unchanged.
#[inline]
fn write_all_raw(writer: &mut impl Write, buf: &[u8]) -> io::Result<()> {
    Write::write_all(writer, buf)
}
8
/// Delimiting strategy used when every line of each repeated group is printed.
///
/// The delimiter is a single line terminator written ahead of a printed group.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllRepeatedMethod {
    /// Never write a delimiter.
    None,
    /// Write a delimiter before every printed group, including the first one.
    Prepend,
    /// Write a delimiter before every printed group except the first one.
    Separate,
}
16
/// Delimiting strategy for group mode, where every input line is printed.
///
/// A single line terminator is always written between two adjacent groups;
/// the variants only decide whether one is also written at the very start
/// and/or the very end of non-empty output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupMethod {
    /// Delimiters between groups only.
    Separate,
    /// Additionally write a delimiter before the first group.
    Prepend,
    /// Additionally write a delimiter after the last group.
    Append,
    /// Additionally write a delimiter before the first and after the last group.
    Both,
}
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum OutputMode {
29 Default,
31 RepeatedOnly,
33 AllRepeated(AllRepeatedMethod),
35 UniqueOnly,
37 Group(GroupMethod),
39}
40
41#[derive(Debug, Clone)]
43pub struct UniqConfig {
44 pub mode: OutputMode,
45 pub count: bool,
46 pub ignore_case: bool,
47 pub skip_fields: usize,
48 pub skip_chars: usize,
49 pub check_chars: Option<usize>,
50 pub zero_terminated: bool,
51}
52
53impl Default for UniqConfig {
54 fn default() -> Self {
55 Self {
56 mode: OutputMode::Default,
57 count: false,
58 ignore_case: false,
59 skip_fields: 0,
60 skip_chars: 0,
61 check_chars: None,
62 zero_terminated: false,
63 }
64 }
65}
66
67#[inline(always)]
70fn get_compare_slice<'a>(line: &'a [u8], config: &UniqConfig) -> &'a [u8] {
71 let mut start = 0;
72 let len = line.len();
73
74 for _ in 0..config.skip_fields {
76 while start < len && (line[start] == b' ' || line[start] == b'\t') {
78 start += 1;
79 }
80 while start < len && line[start] != b' ' && line[start] != b'\t' {
82 start += 1;
83 }
84 }
85
86 if config.skip_chars > 0 {
88 let remaining = len - start;
89 let skip = config.skip_chars.min(remaining);
90 start += skip;
91 }
92
93 let slice = &line[start..];
94
95 if let Some(w) = config.check_chars {
97 if w < slice.len() {
98 return &slice[..w];
99 }
100 }
101
102 slice
103}
104
105#[inline(always)]
107fn lines_equal(a: &[u8], b: &[u8], config: &UniqConfig) -> bool {
108 let sa = get_compare_slice(a, config);
109 let sb = get_compare_slice(b, config);
110
111 if config.ignore_case {
112 sa.eq_ignore_ascii_case(sb)
113 } else {
114 sa == sb
115 }
116}
117
118#[inline(always)]
120fn needs_key_extraction(config: &UniqConfig) -> bool {
121 config.skip_fields > 0 || config.skip_chars > 0 || config.check_chars.is_some()
122}
123
/// Byte-exact equality of two lines with cheap early rejections.
///
/// Lines of different length are rejected immediately. For lines of at least
/// eight bytes the first eight bytes are compared before the full comparison,
/// so lines that differ near the start are rejected without scanning the rest.
#[inline(always)]
fn lines_equal_fast(a: &[u8], b: &[u8]) -> bool {
    const PREFIX: usize = 8;

    if a.len() != b.len() {
        return false;
    }
    // Both slices have the same length here, so one bound check covers both.
    // The fixed-length slice comparison is safe and needs no raw-pointer reads.
    if a.len() >= PREFIX && a[..PREFIX] != b[..PREFIX] {
        return false;
    }
    a == b
}
145
146#[inline(always)]
150fn write_count_line(out: &mut impl Write, count: u64, line: &[u8], term: u8) -> io::Result<()> {
151 let mut prefix = [b' '; 28]; let digits = itoa_right_aligned_into(&mut prefix, count);
154 let width = digits.max(7); let prefix_len = width + 1; prefix[width] = b' ';
157
158 let total = prefix_len + line.len() + 1;
160 if total <= 256 {
161 let mut buf = [0u8; 256];
162 buf[..prefix_len].copy_from_slice(&prefix[..prefix_len]);
163 buf[prefix_len..prefix_len + line.len()].copy_from_slice(line);
164 buf[prefix_len + line.len()] = term;
165 out.write_all(&buf[..total])
166 } else {
167 out.write_all(&prefix[..prefix_len])?;
168 out.write_all(line)?;
169 out.write_all(&[term])
170 }
171}
172
/// Formats `val` in decimal at the start of `buf`, right-aligned in a field
/// of at least seven bytes, and returns the field width.
///
/// Values with fewer than seven digits are left-padded with spaces and the
/// returned width is 7; longer values occupy exactly as many bytes as they
/// have digits. The padding is written here, so the result does not depend
/// on what `buf` contained beforehand. Bytes of `buf` past the returned
/// width are scratch space and are left in an unspecified state.
#[inline(always)]
fn itoa_right_aligned_into(buf: &mut [u8; 28], mut val: u64) -> usize {
    const MIN_WIDTH: usize = 7;
    // Digits are produced backwards, ending just before this index. A u64 has
    // at most 20 digits, so the scratch area never starts before index 7.
    const SCRATCH_END: usize = 27;

    let mut pos = SCRATCH_END;
    // Runs at least once, so zero is rendered as a single '0'.
    loop {
        pos -= 1;
        buf[pos] = b'0' + (val % 10) as u8;
        val /= 10;
        if val == 0 {
            break;
        }
    }

    let num_digits = SCRATCH_END - pos;
    let pad = MIN_WIDTH.saturating_sub(num_digits);
    buf.copy_within(pos..SCRATCH_END, pad);
    buf[..pad].fill(b' ');
    pad + num_digits
}
201
202pub fn process_uniq_bytes(data: &[u8], output: impl Write, config: &UniqConfig) -> io::Result<()> {
208 let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
210 let term = if config.zero_terminated { b'\0' } else { b'\n' };
211
212 match config.mode {
213 OutputMode::Group(method) => {
214 process_group_bytes(data, &mut writer, config, method, term)?;
215 }
216 OutputMode::AllRepeated(method) => {
217 process_all_repeated_bytes(data, &mut writer, config, method, term)?;
218 }
219 _ => {
220 process_standard_bytes(data, &mut writer, config, term)?;
221 }
222 }
223
224 writer.flush()?;
225 Ok(())
226}
227
228struct LineIter<'a> {
231 data: &'a [u8],
232 pos: usize,
233 term: u8,
234}
235
236impl<'a> LineIter<'a> {
237 #[inline(always)]
238 fn new(data: &'a [u8], term: u8) -> Self {
239 Self { data, pos: 0, term }
240 }
241}
242
243impl<'a> Iterator for LineIter<'a> {
244 type Item = (&'a [u8], &'a [u8]);
246
247 #[inline(always)]
248 fn next(&mut self) -> Option<Self::Item> {
249 if self.pos >= self.data.len() {
250 return None;
251 }
252
253 let remaining = &self.data[self.pos..];
254 match memchr::memchr(self.term, remaining) {
255 Some(idx) => {
256 let line_start = self.pos;
257 let line_end = self.pos + idx; let full_end = self.pos + idx + 1; self.pos = full_end;
260 Some((
261 &self.data[line_start..line_end],
262 &self.data[line_start..full_end],
263 ))
264 }
265 None => {
266 let line_start = self.pos;
268 self.pos = self.data.len();
269 let line = &self.data[line_start..];
270 Some((line, line))
271 }
272 }
273 }
274}
275
/// Returns line `idx` without its terminator.
///
/// `line_starts` holds the start offset of every line in `data`. A line that
/// has a successor ends one byte before the successor's start; the last line
/// ends at `content_end`.
///
/// # Panics
///
/// Panics if `idx` is out of range for `line_starts` or the resulting range
/// is out of bounds for `data`.
#[inline(always)]
fn line_content_at<'a>(
    data: &'a [u8],
    line_starts: &[usize],
    idx: usize,
    content_end: usize,
) -> &'a [u8] {
    let begin = line_starts[idx];
    let stop = match line_starts.get(idx + 1) {
        // The byte just before the next line's start is this line's terminator.
        Some(&next_begin) => next_begin - 1,
        None => content_end,
    };
    &data[begin..stop]
}
293
/// Returns line `idx` including its terminator, if it has one.
///
/// The line spans from its own start offset up to the start of the next
/// line, or to the end of `data` for the last line.
///
/// # Panics
///
/// Panics if `idx` is out of range for `line_starts` or the resulting range
/// is out of bounds for `data`.
#[inline(always)]
fn line_full_at<'a>(data: &'a [u8], line_starts: &[usize], idx: usize) -> &'a [u8] {
    let begin = line_starts[idx];
    let stop = line_starts.get(idx + 1).copied().unwrap_or(data.len());
    &data[begin..stop]
}
305
306#[inline]
311fn linear_scan_group_end(
312 data: &[u8],
313 line_starts: &[usize],
314 group_start: usize,
315 num_lines: usize,
316 content_end: usize,
317) -> usize {
318 let key = line_content_at(data, line_starts, group_start, content_end);
319 let mut i = group_start + 1;
320 while i < num_lines {
321 if !lines_equal_fast(key, line_content_at(data, line_starts, i, content_end)) {
322 return i;
323 }
324 i += 1;
325 }
326 i
327}
328
329fn process_standard_bytes(
333 data: &[u8],
334 writer: &mut impl Write,
335 config: &UniqConfig,
336 term: u8,
337) -> io::Result<()> {
338 if data.is_empty() {
339 return Ok(());
340 }
341
342 let fast = !needs_key_extraction(config) && !config.ignore_case;
343
344 let estimated_lines = (data.len() / 40).max(64);
347 let mut line_starts: Vec<usize> = Vec::with_capacity(estimated_lines);
348 line_starts.push(0); for pos in memchr::memchr_iter(term, data) {
350 if pos + 1 < data.len() {
351 line_starts.push(pos + 1);
352 }
353 }
354 let num_lines = line_starts.len();
355 if num_lines == 0 {
356 return Ok(());
357 }
358
359 let content_end = if data.last() == Some(&term) {
361 data.len() - 1
362 } else {
363 data.len()
364 };
365
366 if fast && !config.count && matches!(config.mode, OutputMode::Default) {
368 let first_full = line_full_at(data, &line_starts, 0);
370 let first_content = line_content_at(data, &line_starts, 0, content_end);
371 write_all_raw(writer, first_full)?;
372 if first_full.len() == first_content.len() {
373 writer.write_all(&[term])?;
374 }
375
376 let mut i = 1;
377 while i < num_lines {
378 let prev = line_content_at(data, &line_starts, i - 1, content_end);
379 let cur = line_content_at(data, &line_starts, i, content_end);
380
381 if lines_equal_fast(prev, cur) {
382 let group_end =
384 linear_scan_group_end(data, &line_starts, i - 1, num_lines, content_end);
385 i = group_end;
386 continue;
387 }
388
389 let cur_full = line_full_at(data, &line_starts, i);
391 write_all_raw(writer, cur_full)?;
392 if cur_full.len() == cur.len() {
393 writer.write_all(&[term])?;
394 }
395 i += 1;
396 }
397 return Ok(());
398 }
399
400 let mut i = 0;
402 while i < num_lines {
403 let content = line_content_at(data, &line_starts, i, content_end);
404 let full = line_full_at(data, &line_starts, i);
405
406 let group_end = if fast
408 && i + 1 < num_lines
409 && lines_equal_fast(
410 content,
411 line_content_at(data, &line_starts, i + 1, content_end),
412 ) {
413 linear_scan_group_end(data, &line_starts, i, num_lines, content_end)
415 } else if !fast
416 && i + 1 < num_lines
417 && lines_equal(
418 content,
419 line_content_at(data, &line_starts, i + 1, content_end),
420 config,
421 )
422 {
423 let mut j = i + 2;
425 while j < num_lines {
426 if !lines_equal(
427 content,
428 line_content_at(data, &line_starts, j, content_end),
429 config,
430 ) {
431 break;
432 }
433 j += 1;
434 }
435 j
436 } else {
437 i + 1
438 };
439
440 let count = (group_end - i) as u64;
441 output_group_bytes(writer, content, full, count, config, term)?;
442 i = group_end;
443 }
444
445 Ok(())
446}
447
448#[inline(always)]
450fn output_group_bytes(
451 writer: &mut impl Write,
452 content: &[u8],
453 full: &[u8],
454 count: u64,
455 config: &UniqConfig,
456 term: u8,
457) -> io::Result<()> {
458 let should_print = match config.mode {
459 OutputMode::Default => true,
460 OutputMode::RepeatedOnly => count > 1,
461 OutputMode::UniqueOnly => count == 1,
462 _ => true,
463 };
464
465 if should_print {
466 if config.count {
467 write_count_line(writer, count, content, term)?;
468 } else {
469 writer.write_all(full)?;
470 if full.len() == content.len() {
472 writer.write_all(&[term])?;
473 }
474 }
475 }
476
477 Ok(())
478}
479
480fn process_all_repeated_bytes(
482 data: &[u8],
483 writer: &mut impl Write,
484 config: &UniqConfig,
485 method: AllRepeatedMethod,
486 term: u8,
487) -> io::Result<()> {
488 let mut lines = LineIter::new(data, term);
489
490 let first = match lines.next() {
491 Some(v) => v,
492 None => return Ok(()),
493 };
494
495 let mut group_lines: Vec<(&[u8], &[u8])> = Vec::with_capacity(64);
498 group_lines.push(first);
499 let mut first_group_printed = false;
500
501 let fast = !needs_key_extraction(config) && !config.ignore_case;
502
503 for (cur_content, cur_full) in lines {
504 let prev_content = group_lines.last().unwrap().0;
505 let equal = if fast {
506 lines_equal_fast(prev_content, cur_content)
507 } else {
508 lines_equal(prev_content, cur_content, config)
509 };
510
511 if equal {
512 group_lines.push((cur_content, cur_full));
513 } else {
514 flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
516 group_lines.clear();
517 group_lines.push((cur_content, cur_full));
518 }
519 }
520
521 flush_all_repeated_bytes(writer, &group_lines, method, &mut first_group_printed, term)?;
523
524 Ok(())
525}
526
527fn flush_all_repeated_bytes(
529 writer: &mut impl Write,
530 group: &[(&[u8], &[u8])],
531 method: AllRepeatedMethod,
532 first_group_printed: &mut bool,
533 term: u8,
534) -> io::Result<()> {
535 if group.len() <= 1 {
536 return Ok(()); }
538
539 match method {
540 AllRepeatedMethod::Prepend => {
541 writer.write_all(&[term])?;
542 }
543 AllRepeatedMethod::Separate => {
544 if *first_group_printed {
545 writer.write_all(&[term])?;
546 }
547 }
548 AllRepeatedMethod::None => {}
549 }
550
551 for &(content, full) in group {
552 writer.write_all(full)?;
553 if full.len() == content.len() {
554 writer.write_all(&[term])?;
555 }
556 }
557
558 *first_group_printed = true;
559 Ok(())
560}
561
562fn process_group_bytes(
564 data: &[u8],
565 writer: &mut impl Write,
566 config: &UniqConfig,
567 method: GroupMethod,
568 term: u8,
569) -> io::Result<()> {
570 let mut lines = LineIter::new(data, term);
571
572 let (prev_content, prev_full) = match lines.next() {
573 Some(v) => v,
574 None => return Ok(()),
575 };
576
577 if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
579 writer.write_all(&[term])?;
580 }
581
582 writer.write_all(prev_full)?;
584 if prev_full.len() == prev_content.len() {
585 writer.write_all(&[term])?;
586 }
587
588 let mut prev_content = prev_content;
589 let fast = !needs_key_extraction(config) && !config.ignore_case;
590
591 for (cur_content, cur_full) in lines {
592 let equal = if fast {
593 lines_equal_fast(prev_content, cur_content)
594 } else {
595 lines_equal(prev_content, cur_content, config)
596 };
597
598 if !equal {
599 writer.write_all(&[term])?;
601 }
602
603 writer.write_all(cur_full)?;
604 if cur_full.len() == cur_content.len() {
605 writer.write_all(&[term])?;
606 }
607
608 prev_content = cur_content;
609 }
610
611 if matches!(method, GroupMethod::Append | GroupMethod::Both) {
613 writer.write_all(&[term])?;
614 }
615
616 Ok(())
617}
618
619pub fn process_uniq<R: Read, W: Write>(input: R, output: W, config: &UniqConfig) -> io::Result<()> {
626 let reader = BufReader::with_capacity(8 * 1024 * 1024, input);
627 let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, output);
628 let term = if config.zero_terminated { b'\0' } else { b'\n' };
629
630 match config.mode {
631 OutputMode::Group(method) => {
632 process_group_stream(reader, &mut writer, config, method, term)?;
633 }
634 OutputMode::AllRepeated(method) => {
635 process_all_repeated_stream(reader, &mut writer, config, method, term)?;
636 }
637 _ => {
638 process_standard_stream(reader, &mut writer, config, term)?;
639 }
640 }
641
642 writer.flush()?;
643 Ok(())
644}
645
646fn process_standard_stream<R: BufRead, W: Write>(
648 mut reader: R,
649 writer: &mut W,
650 config: &UniqConfig,
651 term: u8,
652) -> io::Result<()> {
653 let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
654 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
655
656 if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
658 return Ok(()); }
660 let mut count: u64 = 1;
661
662 loop {
663 current_line.clear();
664 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
665
666 if bytes_read == 0 {
667 output_group_stream(writer, &prev_line, count, config, term)?;
669 break;
670 }
671
672 if compare_lines_stream(&prev_line, ¤t_line, config, term) {
673 count += 1;
674 } else {
675 output_group_stream(writer, &prev_line, count, config, term)?;
676 std::mem::swap(&mut prev_line, &mut current_line);
677 count = 1;
678 }
679 }
680
681 Ok(())
682}
683
684#[inline(always)]
686fn compare_lines_stream(a: &[u8], b: &[u8], config: &UniqConfig, term: u8) -> bool {
687 let a_stripped = strip_term(a, term);
688 let b_stripped = strip_term(b, term);
689 lines_equal(a_stripped, b_stripped, config)
690}
691
/// Returns `line` without a single trailing `term` byte, or `line` unchanged
/// when it does not end with `term`.
#[inline(always)]
fn strip_term(line: &[u8], term: u8) -> &[u8] {
    match line.split_last() {
        Some((&last, head)) if last == term => head,
        _ => line,
    }
}
701
702#[inline(always)]
704fn output_group_stream(
705 writer: &mut impl Write,
706 line: &[u8],
707 count: u64,
708 config: &UniqConfig,
709 term: u8,
710) -> io::Result<()> {
711 let should_print = match config.mode {
712 OutputMode::Default => true,
713 OutputMode::RepeatedOnly => count > 1,
714 OutputMode::UniqueOnly => count == 1,
715 _ => true,
716 };
717
718 if should_print {
719 let content = strip_term(line, term);
720 if config.count {
721 write_count_line(writer, count, content, term)?;
722 } else {
723 writer.write_all(content)?;
724 writer.write_all(&[term])?;
725 }
726 }
727
728 Ok(())
729}
730
731fn process_all_repeated_stream<R: BufRead, W: Write>(
733 mut reader: R,
734 writer: &mut W,
735 config: &UniqConfig,
736 method: AllRepeatedMethod,
737 term: u8,
738) -> io::Result<()> {
739 let mut group: Vec<Vec<u8>> = Vec::new();
740 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
741 let mut first_group_printed = false;
742
743 current_line.clear();
744 if read_line_term(&mut reader, &mut current_line, term)? == 0 {
745 return Ok(());
746 }
747 group.push(current_line.clone());
748
749 loop {
750 current_line.clear();
751 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
752
753 if bytes_read == 0 {
754 flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
755 break;
756 }
757
758 if compare_lines_stream(group.last().unwrap(), ¤t_line, config, term) {
759 group.push(current_line.clone());
760 } else {
761 flush_all_repeated_stream(writer, &group, method, &mut first_group_printed, term)?;
762 group.clear();
763 group.push(current_line.clone());
764 }
765 }
766
767 Ok(())
768}
769
770fn flush_all_repeated_stream(
772 writer: &mut impl Write,
773 group: &[Vec<u8>],
774 method: AllRepeatedMethod,
775 first_group_printed: &mut bool,
776 term: u8,
777) -> io::Result<()> {
778 if group.len() <= 1 {
779 return Ok(());
780 }
781
782 match method {
783 AllRepeatedMethod::Prepend => {
784 writer.write_all(&[term])?;
785 }
786 AllRepeatedMethod::Separate => {
787 if *first_group_printed {
788 writer.write_all(&[term])?;
789 }
790 }
791 AllRepeatedMethod::None => {}
792 }
793
794 for line in group {
795 let content = strip_term(line, term);
796 writer.write_all(content)?;
797 writer.write_all(&[term])?;
798 }
799
800 *first_group_printed = true;
801 Ok(())
802}
803
804fn process_group_stream<R: BufRead, W: Write>(
806 mut reader: R,
807 writer: &mut W,
808 config: &UniqConfig,
809 method: GroupMethod,
810 term: u8,
811) -> io::Result<()> {
812 let mut prev_line: Vec<u8> = Vec::with_capacity(4096);
813 let mut current_line: Vec<u8> = Vec::with_capacity(4096);
814
815 if read_line_term(&mut reader, &mut prev_line, term)? == 0 {
816 return Ok(());
817 }
818
819 if matches!(method, GroupMethod::Prepend | GroupMethod::Both) {
821 writer.write_all(&[term])?;
822 }
823
824 let content = strip_term(&prev_line, term);
825 writer.write_all(content)?;
826 writer.write_all(&[term])?;
827
828 loop {
829 current_line.clear();
830 let bytes_read = read_line_term(&mut reader, &mut current_line, term)?;
831
832 if bytes_read == 0 {
833 if matches!(method, GroupMethod::Append | GroupMethod::Both) {
834 writer.write_all(&[term])?;
835 }
836 break;
837 }
838
839 if !compare_lines_stream(&prev_line, ¤t_line, config, term) {
840 writer.write_all(&[term])?;
841 }
842
843 let content = strip_term(¤t_line, term);
844 writer.write_all(content)?;
845 writer.write_all(&[term])?;
846
847 std::mem::swap(&mut prev_line, &mut current_line);
848 }
849
850 Ok(())
851}
852
/// Appends the next line from `reader`, including its terminator when
/// present, to `buf` and returns the number of bytes read.
///
/// Thin wrapper around [`BufRead::read_until`]; a return value of `0` means
/// the end of input was reached.
#[inline(always)]
fn read_line_term<R: BufRead>(reader: &mut R, buf: &mut Vec<u8>, term: u8) -> io::Result<usize> {
    R::read_until(reader, term, buf)
}