use std::cmp::Ordering;
use std::io::{self, Write};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderCheck {
Default,
Strict,
None,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputSpec {
JoinField,
FileField(usize, usize),
}
pub struct JoinConfig {
pub field1: usize,
pub field2: usize,
pub print_unpaired1: bool,
pub print_unpaired2: bool,
pub only_unpaired1: bool,
pub only_unpaired2: bool,
pub empty_filler: Option<Vec<u8>>,
pub case_insensitive: bool,
pub output_format: Option<Vec<OutputSpec>>,
pub auto_format: bool,
pub separator: Option<u8>,
pub order_check: OrderCheck,
pub header: bool,
pub zero_terminated: bool,
}
impl Default for JoinConfig {
fn default() -> Self {
Self {
field1: 0,
field2: 0,
print_unpaired1: false,
print_unpaired2: false,
only_unpaired1: false,
only_unpaired2: false,
empty_filler: None,
case_insensitive: false,
output_format: None,
auto_format: false,
separator: None,
order_check: OrderCheck::Default,
header: false,
zero_terminated: false,
}
}
}
fn split_lines<'a>(data: &'a [u8], delim: u8) -> Vec<&'a [u8]> {
if data.is_empty() {
return Vec::new();
}
let est_lines = data.len() / 40 + 1;
let mut lines = Vec::with_capacity(est_lines);
let mut start = 0;
for pos in memchr::memchr_iter(delim, data) {
lines.push(&data[start..pos]);
start = pos + 1;
}
if start < data.len() {
lines.push(&data[start..]);
}
lines
}
fn split_fields_whitespace<'a>(line: &'a [u8]) -> Vec<&'a [u8]> {
let mut fields = Vec::with_capacity(8);
let mut i = 0;
let len = line.len();
while i < len {
while i < len && (line[i] == b' ' || line[i] == b'\t') {
i += 1;
}
if i >= len {
break;
}
let start = i;
while i < len && line[i] != b' ' && line[i] != b'\t' {
i += 1;
}
fields.push(&line[start..i]);
}
fields
}
fn split_fields_char<'a>(line: &'a [u8], sep: u8) -> Vec<&'a [u8]> {
let mut fields = Vec::with_capacity(8);
let mut start = 0;
for pos in memchr::memchr_iter(sep, line) {
fields.push(&line[start..pos]);
start = pos + 1;
}
fields.push(&line[start..]);
fields
}
#[inline]
fn split_fields<'a>(line: &'a [u8], separator: Option<u8>) -> Vec<&'a [u8]> {
if let Some(sep) = separator {
split_fields_char(line, sep)
} else {
split_fields_whitespace(line)
}
}
#[inline]
fn extract_field<'a>(line: &'a [u8], field_index: usize, separator: Option<u8>) -> &'a [u8] {
if let Some(sep) = separator {
let mut count = 0;
let mut start = 0;
for pos in memchr::memchr_iter(sep, line) {
if count == field_index {
return &line[start..pos];
}
count += 1;
start = pos + 1;
}
if count == field_index {
return &line[start..];
}
b""
} else {
let mut count = 0;
let mut i = 0;
let len = line.len();
while i < len {
while i < len && (line[i] == b' ' || line[i] == b'\t') {
i += 1;
}
if i >= len {
break;
}
let start = i;
while i < len && line[i] != b' ' && line[i] != b'\t' {
i += 1;
}
if count == field_index {
return &line[start..i];
}
count += 1;
}
b""
}
}
#[inline]
fn compare_keys(a: &[u8], b: &[u8], case_insensitive: bool) -> Ordering {
if case_insensitive {
for (&ca, &cb) in a.iter().zip(b.iter()) {
match ca.to_ascii_lowercase().cmp(&cb.to_ascii_lowercase()) {
Ordering::Equal => continue,
other => return other,
}
}
a.len().cmp(&b.len())
} else {
a.cmp(b)
}
}
fn write_paired_default_zerocopy(
line1: &[u8],
line2: &[u8],
join_key: &[u8],
field1: usize,
field2: usize,
separator: Option<u8>,
out_sep: u8,
delim: u8,
buf: &mut Vec<u8>,
) {
buf.extend_from_slice(join_key);
write_other_fields(line1, field1, separator, out_sep, buf);
write_other_fields(line2, field2, separator, out_sep, buf);
buf.push(delim);
}
#[inline]
fn write_other_fields(
line: &[u8],
skip_field: usize,
separator: Option<u8>,
out_sep: u8,
buf: &mut Vec<u8>,
) {
if let Some(sep) = separator {
let mut field_idx = 0;
let mut start = 0;
for pos in memchr::memchr_iter(sep, line) {
if field_idx != skip_field {
buf.push(out_sep);
buf.extend_from_slice(&line[start..pos]);
}
field_idx += 1;
start = pos + 1;
}
if field_idx != skip_field {
buf.push(out_sep);
buf.extend_from_slice(&line[start..]);
}
} else {
let mut field_idx = 0;
let mut i = 0;
let len = line.len();
while i < len {
while i < len && (line[i] == b' ' || line[i] == b'\t') {
i += 1;
}
if i >= len {
break;
}
let start = i;
while i < len && line[i] != b' ' && line[i] != b'\t' {
i += 1;
}
if field_idx != skip_field {
buf.push(out_sep);
buf.extend_from_slice(&line[start..i]);
}
field_idx += 1;
}
}
}
fn write_paired_format(
fields1: &[&[u8]],
fields2: &[&[u8]],
join_key: &[u8],
specs: &[OutputSpec],
empty: &[u8],
out_sep: u8,
delim: u8,
buf: &mut Vec<u8>,
) {
for (i, spec) in specs.iter().enumerate() {
if i > 0 {
buf.push(out_sep);
}
match spec {
OutputSpec::JoinField => buf.extend_from_slice(join_key),
OutputSpec::FileField(file_num, field_idx) => {
let fields = if *file_num == 0 { fields1 } else { fields2 };
if let Some(f) = fields.get(*field_idx) {
buf.extend_from_slice(f);
} else {
buf.extend_from_slice(empty);
}
}
}
}
buf.push(delim);
}
fn write_unpaired_default_zerocopy(
line: &[u8],
join_field: usize,
separator: Option<u8>,
out_sep: u8,
delim: u8,
buf: &mut Vec<u8>,
) {
let key = extract_field(line, join_field, separator);
buf.extend_from_slice(key);
write_other_fields(line, join_field, separator, out_sep, buf);
buf.push(delim);
}
fn write_unpaired_format(
fields: &[&[u8]],
file_num: usize,
join_field: usize,
specs: &[OutputSpec],
empty: &[u8],
out_sep: u8,
delim: u8,
buf: &mut Vec<u8>,
) {
let key = fields.get(join_field).copied().unwrap_or(b"");
for (i, spec) in specs.iter().enumerate() {
if i > 0 {
buf.push(out_sep);
}
match spec {
OutputSpec::JoinField => buf.extend_from_slice(key),
OutputSpec::FileField(fnum, fidx) => {
if *fnum == file_num {
if let Some(f) = fields.get(*fidx) {
buf.extend_from_slice(f);
} else {
buf.extend_from_slice(empty);
}
} else {
buf.extend_from_slice(empty);
}
}
}
}
buf.push(delim);
}
pub fn join(
data1: &[u8],
data2: &[u8],
config: &JoinConfig,
tool_name: &str,
file1_name: &str,
file2_name: &str,
out: &mut impl Write,
) -> io::Result<bool> {
let delim = if config.zero_terminated { b'\0' } else { b'\n' };
let out_sep = config.separator.unwrap_or(b' ');
let empty = config.empty_filler.as_deref().unwrap_or(b"");
let ci = config.case_insensitive;
let print_paired = !config.only_unpaired1 && !config.only_unpaired2;
let show_unpaired1 = config.print_unpaired1 || config.only_unpaired1;
let show_unpaired2 = config.print_unpaired2 || config.only_unpaired2;
let lines1 = split_lines(data1, delim);
let lines2 = split_lines(data2, delim);
let keys1: Vec<&[u8]> = lines1
.iter()
.map(|l| extract_field(l, config.field1, config.separator))
.collect();
let keys2: Vec<&[u8]> = lines2
.iter()
.map(|l| extract_field(l, config.field2, config.separator))
.collect();
let mut i1 = 0usize;
let mut i2 = 0usize;
let mut had_order_error = false;
let mut warned1 = false;
let mut warned2 = false;
const FLUSH_THRESHOLD: usize = 4 * 1024 * 1024;
let mut buf = Vec::with_capacity((data1.len() + data2.len()).min(FLUSH_THRESHOLD * 2));
let auto_specs: Option<Vec<OutputSpec>> = if config.auto_format {
let fc1 = if !lines1.is_empty() {
split_fields(lines1[0], config.separator).len()
} else {
1
};
let fc2 = if !lines2.is_empty() {
split_fields(lines2[0], config.separator).len()
} else {
1
};
let mut specs = Vec::new();
specs.push(OutputSpec::JoinField);
for i in 0..fc1 {
if i != config.field1 {
specs.push(OutputSpec::FileField(0, i));
}
}
for i in 0..fc2 {
if i != config.field2 {
specs.push(OutputSpec::FileField(1, i));
}
}
Some(specs)
} else {
None
};
let format = config.output_format.as_deref().or(auto_specs.as_deref());
if config.header && !lines1.is_empty() && !lines2.is_empty() {
let key = extract_field(lines1[0], config.field1, config.separator);
if let Some(specs) = format {
let fields1 = split_fields(lines1[0], config.separator);
let fields2 = split_fields(lines2[0], config.separator);
write_paired_format(
&fields1, &fields2, key, specs, empty, out_sep, delim, &mut buf,
);
} else {
write_paired_default_zerocopy(
lines1[0],
lines2[0],
key,
config.field1,
config.field2,
config.separator,
out_sep,
delim,
&mut buf,
);
}
i1 = 1;
i2 = 1;
} else if config.header {
if !lines1.is_empty() {
i1 = 1;
}
if !lines2.is_empty() {
i2 = 1;
}
}
while i1 < lines1.len() && i2 < lines2.len() {
debug_assert!(i1 < keys1.len() && i2 < keys2.len());
let key1 = unsafe { *keys1.get_unchecked(i1) };
let key2 = unsafe { *keys2.get_unchecked(i2) };
if config.order_check != OrderCheck::None {
if !warned1 && i1 > (if config.header { 1 } else { 0 }) {
let prev_key = keys1[i1 - 1];
if compare_keys(key1, prev_key, ci) == Ordering::Less {
had_order_error = true;
warned1 = true;
eprintln!(
"{}: {}:{}: is not sorted: {}",
tool_name,
file1_name,
i1 + 1,
String::from_utf8_lossy(lines1[i1])
);
if config.order_check == OrderCheck::Strict {
out.write_all(&buf)?;
return Ok(true);
}
}
}
if !warned2 && i2 > (if config.header { 1 } else { 0 }) {
let prev_key = keys2[i2 - 1];
if compare_keys(key2, prev_key, ci) == Ordering::Less {
had_order_error = true;
warned2 = true;
eprintln!(
"{}: {}:{}: is not sorted: {}",
tool_name,
file2_name,
i2 + 1,
String::from_utf8_lossy(lines2[i2])
);
if config.order_check == OrderCheck::Strict {
out.write_all(&buf)?;
return Ok(true);
}
}
}
}
match compare_keys(key1, key2, ci) {
Ordering::Less => {
if show_unpaired1 {
if let Some(specs) = format {
let fields1 = split_fields(lines1[i1], config.separator);
write_unpaired_format(
&fields1,
0,
config.field1,
specs,
empty,
out_sep,
delim,
&mut buf,
);
} else {
write_unpaired_default_zerocopy(
lines1[i1],
config.field1,
config.separator,
out_sep,
delim,
&mut buf,
);
}
}
i1 += 1;
if show_unpaired1 && buf.len() >= FLUSH_THRESHOLD {
out.write_all(&buf)?;
buf.clear();
}
}
Ordering::Greater => {
if show_unpaired2 {
if let Some(specs) = format {
let fields2 = split_fields(lines2[i2], config.separator);
write_unpaired_format(
&fields2,
1,
config.field2,
specs,
empty,
out_sep,
delim,
&mut buf,
);
} else {
write_unpaired_default_zerocopy(
lines2[i2],
config.field2,
config.separator,
out_sep,
delim,
&mut buf,
);
}
}
i2 += 1;
if buf.len() >= FLUSH_THRESHOLD {
out.write_all(&buf)?;
buf.clear();
}
}
Ordering::Equal => {
let group_start = i2;
let current_key = key2;
i2 += 1;
while i2 < lines2.len() {
debug_assert!(i2 < keys2.len());
let next_key = unsafe { *keys2.get_unchecked(i2) };
if compare_keys(next_key, current_key, ci) != Ordering::Equal {
break;
}
i2 += 1;
}
let group_size = i2 - group_start;
let group2_fields: Vec<Vec<&[u8]>> = if print_paired && format.is_some() {
(group_start..i2)
.map(|j| split_fields(lines2[j], config.separator))
.collect()
} else {
Vec::new()
};
let group2_suffixes: Vec<Vec<u8>> =
if print_paired && format.is_none() && group_size > 1 {
(group_start..i2)
.map(|j| {
let mut s = Vec::with_capacity(lines2[j].len());
write_other_fields(
lines2[j],
config.field2,
config.separator,
out_sep,
&mut s,
);
s
})
.collect()
} else {
Vec::new()
};
loop {
if print_paired {
let key = extract_field(lines1[i1], config.field1, config.separator);
if let Some(specs) = format {
let fields1 = split_fields(lines1[i1], config.separator);
for fields2 in &group2_fields {
write_paired_format(
&fields1, fields2, key, specs, empty, out_sep, delim, &mut buf,
);
}
} else if !group2_suffixes.is_empty() {
let mut prefix = Vec::with_capacity(lines1[i1].len() + 2);
prefix.extend_from_slice(key);
write_other_fields(
lines1[i1],
config.field1,
config.separator,
out_sep,
&mut prefix,
);
for suffix in &group2_suffixes {
buf.extend_from_slice(&prefix);
buf.extend_from_slice(suffix);
buf.push(delim);
}
} else {
for j in group_start..i2 {
write_paired_default_zerocopy(
lines1[i1],
lines2[j],
key,
config.field1,
config.field2,
config.separator,
out_sep,
delim,
&mut buf,
);
}
}
}
if buf.len() >= FLUSH_THRESHOLD {
out.write_all(&buf)?;
buf.clear();
}
i1 += 1;
if i1 >= lines1.len() {
break;
}
debug_assert!(i1 < keys1.len());
let next_key = unsafe { *keys1.get_unchecked(i1) };
let cmp = compare_keys(next_key, current_key, ci);
if cmp != Ordering::Equal {
if config.order_check != OrderCheck::None
&& !warned1
&& cmp == Ordering::Less
{
had_order_error = true;
warned1 = true;
eprintln!(
"{}: {}:{}: is not sorted: {}",
tool_name,
file1_name,
i1 + 1,
String::from_utf8_lossy(lines1[i1])
);
if config.order_check == OrderCheck::Strict {
out.write_all(&buf)?;
return Ok(true);
}
}
break;
}
}
}
}
}
while i1 < lines1.len() {
if config.order_check != OrderCheck::None
&& !warned1
&& i1 > (if config.header { 1 } else { 0 })
{
let key1 = keys1[i1];
let prev_key = keys1[i1 - 1];
if compare_keys(key1, prev_key, ci) == Ordering::Less {
had_order_error = true;
warned1 = true;
eprintln!(
"{}: {}:{}: is not sorted: {}",
tool_name,
file1_name,
i1 + 1,
String::from_utf8_lossy(lines1[i1])
);
if config.order_check == OrderCheck::Strict {
out.write_all(&buf)?;
return Ok(true);
}
}
}
if show_unpaired1 {
if let Some(specs) = format {
let fields1 = split_fields(lines1[i1], config.separator);
write_unpaired_format(
&fields1,
0,
config.field1,
specs,
empty,
out_sep,
delim,
&mut buf,
);
} else {
write_unpaired_default_zerocopy(
lines1[i1],
config.field1,
config.separator,
out_sep,
delim,
&mut buf,
);
}
}
i1 += 1;
}
while i2 < lines2.len() {
if config.order_check != OrderCheck::None
&& !warned2
&& i2 > (if config.header { 1 } else { 0 })
{
let key2 = keys2[i2];
let prev_key = keys2[i2 - 1];
if compare_keys(key2, prev_key, ci) == Ordering::Less {
had_order_error = true;
warned2 = true;
eprintln!(
"{}: {}:{}: is not sorted: {}",
tool_name,
file2_name,
i2 + 1,
String::from_utf8_lossy(lines2[i2])
);
if config.order_check == OrderCheck::Strict {
out.write_all(&buf)?;
return Ok(true);
}
}
}
if show_unpaired2 {
if let Some(specs) = format {
let fields2 = split_fields(lines2[i2], config.separator);
write_unpaired_format(
&fields2,
1,
config.field2,
specs,
empty,
out_sep,
delim,
&mut buf,
);
} else {
write_unpaired_default_zerocopy(
lines2[i2],
config.field2,
config.separator,
out_sep,
delim,
&mut buf,
);
}
}
i2 += 1;
}
out.write_all(&buf)?;
Ok(had_order_error)
}