1use std::cmp::Ordering;
2use std::io::{self, Write};
3
/// How `join` reacts when an input turns out not to be sorted on its join key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderCheck {
    /// Report the first out-of-order record of each input on stderr and keep
    /// joining.
    Default,
    /// Report the first out-of-order record, write the output produced so far,
    /// and stop.
    Strict,
    /// Perform no order checking at all.
    None,
}
11
/// One column of an explicit output layout.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputSpec {
    /// The join key of the record (or record pair) being printed.
    JoinField,
    /// `FileField(file, field)`: zero-based field `field` of input `file`,
    /// where `0` is the first input and `1` the second.
    FileField(usize, usize),
}
20
/// Options controlling how `join` pairs and prints records.
pub struct JoinConfig {
    /// Zero-based index of the join field in the first input.
    pub field1: usize,
    /// Zero-based index of the join field in the second input.
    pub field2: usize,
    /// Also print records of the first input that have no partner.
    pub print_unpaired1: bool,
    /// Also print records of the second input that have no partner.
    pub print_unpaired2: bool,
    /// Print unpairable records of the first input and suppress paired output.
    pub only_unpaired1: bool,
    /// Print unpairable records of the second input and suppress paired output.
    pub only_unpaired2: bool,
    /// Bytes written in place of a field that an output layout asks for but the
    /// record does not supply; nothing is written when `None`.
    pub empty_filler: Option<Vec<u8>>,
    /// Compare join keys ignoring ASCII case.
    pub case_insensitive: bool,
    /// Explicit output layout; takes precedence over `auto_format`.
    pub output_format: Option<Vec<OutputSpec>>,
    /// Derive the output layout from the number of fields in the first record
    /// of each input.
    pub auto_format: bool,
    /// Field separator byte for input and output. With `None`, input fields are
    /// separated by runs of spaces and tabs and output fields by one space.
    pub separator: Option<u8>,
    /// What to do when an input is not sorted on its join key.
    pub order_check: OrderCheck,
    /// Treat the first record of each input as a header: when both inputs have
    /// one the two are joined without comparing keys, and header records are
    /// excluded from matching and order checking.
    pub header: bool,
    /// Records are terminated by NUL instead of newline, on input and output.
    pub zero_terminated: bool,
}
52
53impl Default for JoinConfig {
54 fn default() -> Self {
55 Self {
56 field1: 0,
57 field2: 0,
58 print_unpaired1: false,
59 print_unpaired2: false,
60 only_unpaired1: false,
61 only_unpaired2: false,
62 empty_filler: None,
63 case_insensitive: false,
64 output_format: None,
65 auto_format: false,
66 separator: None,
67 order_check: OrderCheck::Default,
68 header: false,
69 zero_terminated: false,
70 }
71 }
72}
73
74fn split_lines<'a>(data: &'a [u8], delim: u8) -> Vec<&'a [u8]> {
77 if data.is_empty() {
78 return Vec::new();
79 }
80 let est_lines = data.len() / 40 + 1;
82 let mut lines = Vec::with_capacity(est_lines);
83 let mut start = 0;
84 for pos in memchr::memchr_iter(delim, data) {
85 lines.push(&data[start..pos]);
86 start = pos + 1;
87 }
88 if start < data.len() {
89 lines.push(&data[start..]);
90 }
91 lines
92}
93
/// Splits `line` into fields separated by runs of spaces and tabs.
///
/// Leading and trailing blanks are ignored, so no returned field is empty.
fn split_fields_whitespace<'a>(line: &'a [u8]) -> Vec<&'a [u8]> {
    let mut fields = Vec::with_capacity(8);
    fields.extend(
        line.split(|&byte| byte == b' ' || byte == b'\t')
            // Adjacent blanks produce empty pieces; they are not fields.
            .filter(|field| !field.is_empty()),
    );
    fields
}
115
116fn split_fields_char<'a>(line: &'a [u8], sep: u8) -> Vec<&'a [u8]> {
119 let mut fields = Vec::with_capacity(8);
120 let mut start = 0;
121 for pos in memchr::memchr_iter(sep, line) {
122 fields.push(&line[start..pos]);
123 start = pos + 1;
124 }
125 fields.push(&line[start..]);
126 fields
127}
128
129#[inline]
131fn split_fields<'a>(line: &'a [u8], separator: Option<u8>) -> Vec<&'a [u8]> {
132 if let Some(sep) = separator {
133 split_fields_char(line, sep)
134 } else {
135 split_fields_whitespace(line)
136 }
137}
138
139#[inline]
141fn extract_field<'a>(line: &'a [u8], field_index: usize, separator: Option<u8>) -> &'a [u8] {
142 if let Some(sep) = separator {
143 let mut count = 0;
144 let mut start = 0;
145 for pos in memchr::memchr_iter(sep, line) {
146 if count == field_index {
147 return &line[start..pos];
148 }
149 count += 1;
150 start = pos + 1;
151 }
152 if count == field_index {
153 return &line[start..];
154 }
155 b""
156 } else {
157 let mut count = 0;
158 let mut i = 0;
159 let len = line.len();
160 while i < len {
161 while i < len && (line[i] == b' ' || line[i] == b'\t') {
162 i += 1;
163 }
164 if i >= len {
165 break;
166 }
167 let start = i;
168 while i < len && line[i] != b' ' && line[i] != b'\t' {
169 i += 1;
170 }
171 if count == field_index {
172 return &line[start..i];
173 }
174 count += 1;
175 }
176 b""
177 }
178}
179
/// Orders two join keys bytewise, optionally ignoring ASCII case.
///
/// When one key is a prefix of the other, the shorter key sorts first.
#[inline]
fn compare_keys(a: &[u8], b: &[u8], case_insensitive: bool) -> Ordering {
    if !case_insensitive {
        return a.cmp(b);
    }
    // Lexicographic comparison of the ASCII-lowercased bytes.
    a.iter()
        .map(u8::to_ascii_lowercase)
        .cmp(b.iter().map(u8::to_ascii_lowercase))
}
195
196fn write_paired_default_zerocopy(
199 line1: &[u8],
200 line2: &[u8],
201 join_key: &[u8],
202 field1: usize,
203 field2: usize,
204 separator: Option<u8>,
205 out_sep: u8,
206 delim: u8,
207 buf: &mut Vec<u8>,
208) {
209 buf.extend_from_slice(join_key);
210 write_other_fields(line1, field1, separator, out_sep, buf);
211 write_other_fields(line2, field2, separator, out_sep, buf);
212 buf.push(delim);
213}
214
215#[inline]
218fn write_other_fields(
219 line: &[u8],
220 skip_field: usize,
221 separator: Option<u8>,
222 out_sep: u8,
223 buf: &mut Vec<u8>,
224) {
225 if let Some(sep) = separator {
226 let mut field_idx = 0;
227 let mut start = 0;
228 for pos in memchr::memchr_iter(sep, line) {
229 if field_idx != skip_field {
230 buf.push(out_sep);
231 buf.extend_from_slice(&line[start..pos]);
232 }
233 field_idx += 1;
234 start = pos + 1;
235 }
236 if field_idx != skip_field {
238 buf.push(out_sep);
239 buf.extend_from_slice(&line[start..]);
240 }
241 } else {
242 let mut field_idx = 0;
244 let mut i = 0;
245 let len = line.len();
246 while i < len {
247 while i < len && (line[i] == b' ' || line[i] == b'\t') {
248 i += 1;
249 }
250 if i >= len {
251 break;
252 }
253 let start = i;
254 while i < len && line[i] != b' ' && line[i] != b'\t' {
255 i += 1;
256 }
257 if field_idx != skip_field {
258 buf.push(out_sep);
259 buf.extend_from_slice(&line[start..i]);
260 }
261 field_idx += 1;
262 }
263 }
264}
265
266fn write_paired_format(
268 fields1: &[&[u8]],
269 fields2: &[&[u8]],
270 join_key: &[u8],
271 specs: &[OutputSpec],
272 empty: &[u8],
273 out_sep: u8,
274 delim: u8,
275 buf: &mut Vec<u8>,
276) {
277 for (i, spec) in specs.iter().enumerate() {
278 if i > 0 {
279 buf.push(out_sep);
280 }
281 match spec {
282 OutputSpec::JoinField => buf.extend_from_slice(join_key),
283 OutputSpec::FileField(file_num, field_idx) => {
284 let fields = if *file_num == 0 { fields1 } else { fields2 };
285 if let Some(f) = fields.get(*field_idx) {
286 buf.extend_from_slice(f);
287 } else {
288 buf.extend_from_slice(empty);
289 }
290 }
291 }
292 }
293 buf.push(delim);
294}
295
296fn write_unpaired_default_zerocopy(
298 line: &[u8],
299 join_field: usize,
300 separator: Option<u8>,
301 out_sep: u8,
302 delim: u8,
303 buf: &mut Vec<u8>,
304) {
305 let key = extract_field(line, join_field, separator);
306 buf.extend_from_slice(key);
307 write_other_fields(line, join_field, separator, out_sep, buf);
308 buf.push(delim);
309}
310
311fn write_unpaired_format(
313 fields: &[&[u8]],
314 file_num: usize,
315 join_field: usize,
316 specs: &[OutputSpec],
317 empty: &[u8],
318 out_sep: u8,
319 delim: u8,
320 buf: &mut Vec<u8>,
321) {
322 let key = fields.get(join_field).copied().unwrap_or(b"");
323 for (i, spec) in specs.iter().enumerate() {
324 if i > 0 {
325 buf.push(out_sep);
326 }
327 match spec {
328 OutputSpec::JoinField => buf.extend_from_slice(key),
329 OutputSpec::FileField(fnum, fidx) => {
330 if *fnum == file_num {
331 if let Some(f) = fields.get(*fidx) {
332 buf.extend_from_slice(f);
333 } else {
334 buf.extend_from_slice(empty);
335 }
336 } else {
337 buf.extend_from_slice(empty);
338 }
339 }
340 }
341 }
342 buf.push(delim);
343}
344
345pub fn join(
347 data1: &[u8],
348 data2: &[u8],
349 config: &JoinConfig,
350 tool_name: &str,
351 file1_name: &str,
352 file2_name: &str,
353 out: &mut impl Write,
354) -> io::Result<bool> {
355 let delim = if config.zero_terminated { b'\0' } else { b'\n' };
356 let out_sep = config.separator.unwrap_or(b' ');
357 let empty = config.empty_filler.as_deref().unwrap_or(b"");
358 let ci = config.case_insensitive;
359
360 let print_paired = !config.only_unpaired1 && !config.only_unpaired2;
361 let show_unpaired1 = config.print_unpaired1 || config.only_unpaired1;
362 let show_unpaired2 = config.print_unpaired2 || config.only_unpaired2;
363
364 let lines1 = split_lines(data1, delim);
365 let lines2 = split_lines(data2, delim);
366
367 let keys1: Vec<&[u8]> = lines1
371 .iter()
372 .map(|l| extract_field(l, config.field1, config.separator))
373 .collect();
374 let keys2: Vec<&[u8]> = lines2
375 .iter()
376 .map(|l| extract_field(l, config.field2, config.separator))
377 .collect();
378
379 let mut i1 = 0usize;
380 let mut i2 = 0usize;
381 let mut had_order_error = false;
382 let mut warned1 = false;
383 let mut warned2 = false;
384
385 const FLUSH_THRESHOLD: usize = 4 * 1024 * 1024;
386 let mut buf = Vec::with_capacity((data1.len() + data2.len()).min(FLUSH_THRESHOLD * 2));
387
388 let auto_specs: Option<Vec<OutputSpec>> = if config.auto_format {
390 let fc1 = if !lines1.is_empty() {
391 split_fields(lines1[0], config.separator).len()
392 } else {
393 1
394 };
395 let fc2 = if !lines2.is_empty() {
396 split_fields(lines2[0], config.separator).len()
397 } else {
398 1
399 };
400 let mut specs = Vec::new();
401 specs.push(OutputSpec::JoinField);
402 for i in 0..fc1 {
403 if i != config.field1 {
404 specs.push(OutputSpec::FileField(0, i));
405 }
406 }
407 for i in 0..fc2 {
408 if i != config.field2 {
409 specs.push(OutputSpec::FileField(1, i));
410 }
411 }
412 Some(specs)
413 } else {
414 None
415 };
416
417 let format = config.output_format.as_deref().or(auto_specs.as_deref());
418
419 if config.header && !lines1.is_empty() && !lines2.is_empty() {
421 let key = extract_field(lines1[0], config.field1, config.separator);
422
423 if let Some(specs) = format {
424 let fields1 = split_fields(lines1[0], config.separator);
425 let fields2 = split_fields(lines2[0], config.separator);
426 write_paired_format(
427 &fields1, &fields2, key, specs, empty, out_sep, delim, &mut buf,
428 );
429 } else {
430 write_paired_default_zerocopy(
431 lines1[0],
432 lines2[0],
433 key,
434 config.field1,
435 config.field2,
436 config.separator,
437 out_sep,
438 delim,
439 &mut buf,
440 );
441 }
442 i1 = 1;
443 i2 = 1;
444 } else if config.header {
445 if !lines1.is_empty() {
447 i1 = 1;
448 }
449 if !lines2.is_empty() {
450 i2 = 1;
451 }
452 }
453
454 while i1 < lines1.len() && i2 < lines2.len() {
455 debug_assert!(i1 < keys1.len() && i2 < keys2.len());
456 let key1 = unsafe { *keys1.get_unchecked(i1) };
459 let key2 = unsafe { *keys2.get_unchecked(i2) };
460
461 if config.order_check != OrderCheck::None {
463 if !warned1 && i1 > (if config.header { 1 } else { 0 }) {
464 let prev_key = keys1[i1 - 1];
465 if compare_keys(key1, prev_key, ci) == Ordering::Less {
466 had_order_error = true;
467 warned1 = true;
468 eprintln!(
469 "{}: {}:{}: is not sorted: {}",
470 tool_name,
471 file1_name,
472 i1 + 1,
473 String::from_utf8_lossy(lines1[i1])
474 );
475 if config.order_check == OrderCheck::Strict {
476 out.write_all(&buf)?;
477 return Ok(true);
478 }
479 }
480 }
481 if !warned2 && i2 > (if config.header { 1 } else { 0 }) {
482 let prev_key = keys2[i2 - 1];
483 if compare_keys(key2, prev_key, ci) == Ordering::Less {
484 had_order_error = true;
485 warned2 = true;
486 eprintln!(
487 "{}: {}:{}: is not sorted: {}",
488 tool_name,
489 file2_name,
490 i2 + 1,
491 String::from_utf8_lossy(lines2[i2])
492 );
493 if config.order_check == OrderCheck::Strict {
494 out.write_all(&buf)?;
495 return Ok(true);
496 }
497 }
498 }
499 }
500
501 match compare_keys(key1, key2, ci) {
502 Ordering::Less => {
503 if show_unpaired1 {
504 if let Some(specs) = format {
505 let fields1 = split_fields(lines1[i1], config.separator);
506 write_unpaired_format(
507 &fields1,
508 0,
509 config.field1,
510 specs,
511 empty,
512 out_sep,
513 delim,
514 &mut buf,
515 );
516 } else {
517 write_unpaired_default_zerocopy(
518 lines1[i1],
519 config.field1,
520 config.separator,
521 out_sep,
522 delim,
523 &mut buf,
524 );
525 }
526 }
527 i1 += 1;
528 if show_unpaired1 && buf.len() >= FLUSH_THRESHOLD {
529 out.write_all(&buf)?;
530 buf.clear();
531 }
532 }
533 Ordering::Greater => {
534 if show_unpaired2 {
535 if let Some(specs) = format {
536 let fields2 = split_fields(lines2[i2], config.separator);
537 write_unpaired_format(
538 &fields2,
539 1,
540 config.field2,
541 specs,
542 empty,
543 out_sep,
544 delim,
545 &mut buf,
546 );
547 } else {
548 write_unpaired_default_zerocopy(
549 lines2[i2],
550 config.field2,
551 config.separator,
552 out_sep,
553 delim,
554 &mut buf,
555 );
556 }
557 }
558 i2 += 1;
559
560 if buf.len() >= FLUSH_THRESHOLD {
562 out.write_all(&buf)?;
563 buf.clear();
564 }
565 }
566 Ordering::Equal => {
567 let group_start = i2;
569 let current_key = key2;
570 i2 += 1;
571 while i2 < lines2.len() {
572 debug_assert!(i2 < keys2.len());
573 let next_key = unsafe { *keys2.get_unchecked(i2) };
575 if compare_keys(next_key, current_key, ci) != Ordering::Equal {
576 break;
577 }
578 i2 += 1;
579 }
580
581 let group_size = i2 - group_start;
583 let group2_fields: Vec<Vec<&[u8]>> = if print_paired && format.is_some() {
584 (group_start..i2)
585 .map(|j| split_fields(lines2[j], config.separator))
586 .collect()
587 } else {
588 Vec::new()
589 };
590 let group2_suffixes: Vec<Vec<u8>> =
593 if print_paired && format.is_none() && group_size > 1 {
594 (group_start..i2)
595 .map(|j| {
596 let mut s = Vec::with_capacity(lines2[j].len());
597 write_other_fields(
598 lines2[j],
599 config.field2,
600 config.separator,
601 out_sep,
602 &mut s,
603 );
604 s
605 })
606 .collect()
607 } else {
608 Vec::new()
609 };
610
611 loop {
613 if print_paired {
614 let key = extract_field(lines1[i1], config.field1, config.separator);
615 if let Some(specs) = format {
616 let fields1 = split_fields(lines1[i1], config.separator);
617 for fields2 in &group2_fields {
618 write_paired_format(
619 &fields1, fields2, key, specs, empty, out_sep, delim, &mut buf,
620 );
621 }
622 } else if !group2_suffixes.is_empty() {
623 let mut prefix = Vec::with_capacity(lines1[i1].len() + 2);
625 prefix.extend_from_slice(key);
626 write_other_fields(
627 lines1[i1],
628 config.field1,
629 config.separator,
630 out_sep,
631 &mut prefix,
632 );
633 for suffix in &group2_suffixes {
634 buf.extend_from_slice(&prefix);
635 buf.extend_from_slice(suffix);
636 buf.push(delim);
637 }
638 } else {
639 for j in group_start..i2 {
641 write_paired_default_zerocopy(
642 lines1[i1],
643 lines2[j],
644 key,
645 config.field1,
646 config.field2,
647 config.separator,
648 out_sep,
649 delim,
650 &mut buf,
651 );
652 }
653 }
654 }
655 if buf.len() >= FLUSH_THRESHOLD {
657 out.write_all(&buf)?;
658 buf.clear();
659 }
660 i1 += 1;
661 if i1 >= lines1.len() {
662 break;
663 }
664 debug_assert!(i1 < keys1.len());
665 let next_key = unsafe { *keys1.get_unchecked(i1) };
667 let cmp = compare_keys(next_key, current_key, ci);
668 if cmp != Ordering::Equal {
669 if config.order_check != OrderCheck::None
671 && !warned1
672 && cmp == Ordering::Less
673 {
674 had_order_error = true;
675 warned1 = true;
676 eprintln!(
677 "{}: {}:{}: is not sorted: {}",
678 tool_name,
679 file1_name,
680 i1 + 1,
681 String::from_utf8_lossy(lines1[i1])
682 );
683 if config.order_check == OrderCheck::Strict {
684 out.write_all(&buf)?;
685 return Ok(true);
686 }
687 }
688 break;
689 }
690 }
691 }
692 }
693 }
694
695 while i1 < lines1.len() {
697 if config.order_check != OrderCheck::None
699 && !warned1
700 && i1 > (if config.header { 1 } else { 0 })
701 {
702 let key1 = keys1[i1];
703 let prev_key = keys1[i1 - 1];
704 if compare_keys(key1, prev_key, ci) == Ordering::Less {
705 had_order_error = true;
706 warned1 = true;
707 eprintln!(
708 "{}: {}:{}: is not sorted: {}",
709 tool_name,
710 file1_name,
711 i1 + 1,
712 String::from_utf8_lossy(lines1[i1])
713 );
714 if config.order_check == OrderCheck::Strict {
715 out.write_all(&buf)?;
716 return Ok(true);
717 }
718 }
719 }
720 if show_unpaired1 {
721 if let Some(specs) = format {
722 let fields1 = split_fields(lines1[i1], config.separator);
723 write_unpaired_format(
724 &fields1,
725 0,
726 config.field1,
727 specs,
728 empty,
729 out_sep,
730 delim,
731 &mut buf,
732 );
733 } else {
734 write_unpaired_default_zerocopy(
735 lines1[i1],
736 config.field1,
737 config.separator,
738 out_sep,
739 delim,
740 &mut buf,
741 );
742 }
743 }
744 i1 += 1;
745 }
746
747 while i2 < lines2.len() {
749 if config.order_check != OrderCheck::None
751 && !warned2
752 && i2 > (if config.header { 1 } else { 0 })
753 {
754 let key2 = keys2[i2];
755 let prev_key = keys2[i2 - 1];
756 if compare_keys(key2, prev_key, ci) == Ordering::Less {
757 had_order_error = true;
758 warned2 = true;
759 eprintln!(
760 "{}: {}:{}: is not sorted: {}",
761 tool_name,
762 file2_name,
763 i2 + 1,
764 String::from_utf8_lossy(lines2[i2])
765 );
766 if config.order_check == OrderCheck::Strict {
767 out.write_all(&buf)?;
768 return Ok(true);
769 }
770 }
771 }
772 if show_unpaired2 {
773 if let Some(specs) = format {
774 let fields2 = split_fields(lines2[i2], config.separator);
775 write_unpaired_format(
776 &fields2,
777 1,
778 config.field2,
779 specs,
780 empty,
781 out_sep,
782 delim,
783 &mut buf,
784 );
785 } else {
786 write_unpaired_default_zerocopy(
787 lines2[i2],
788 config.field2,
789 config.separator,
790 out_sep,
791 delim,
792 &mut buf,
793 );
794 }
795 }
796 i2 += 1;
797 }
798
799 out.write_all(&buf)?;
800 Ok(had_order_error)
801}