use crate::column::{OutCol, ScopedValue};
use crate::comp::CompMaker;
use crate::prelude::*;
use crate::util::Outfile;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
/// Strategy used to join the input files.
///
/// `Quick` is the default. The joiner in this file only runs `Quick`; any
/// other variant makes the join fail with "Only quick supported".
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum JoinType {
    /// The default strategy, and the only one the joiner in this file runs.
    #[default]
    Quick,
    /// Declared but rejected by the joiner in this file.
    Full,
    /// Declared but rejected by the joiner in this file.
    Poly,
    /// Declared but rejected by the joiner in this file.
    Binsearch,
    /// Declared but rejected by the joiner in this file.
    Hash,
}
/// Destination for the lines of one input file that found no partner.
#[derive(Clone, Debug, Default)]
pub struct NoMatch {
    /// One-based number of the input file: the joiner rejects values below 1
    /// or above the number of input files, and subtracts one to index them.
    pub file_num: usize,
    /// Name handed to the writer factory when the output is opened.
    pub file_name: String,
}

impl NoMatch {
    /// Builds a `NoMatch` from a file number and an owned copy of `file_name`.
    #[must_use]
    pub fn new(file_num: usize, file_name: &str) -> Self {
        let file_name = String::from(file_name);
        Self { file_num, file_name }
    }
}
/// A set of output columns taken from a single input file.
#[derive(Clone, Debug, Default)]
pub struct OutColSpec {
    /// Index of the input file. The joiner uses it directly as an index into
    /// its reader list and rejects values that are not below the reader count.
    file: usize,
    /// The columns to emit; resolved against that file's column names
    /// before the join starts.
    cols: ColumnSet,
}

impl OutColSpec {
    /// Pairs a file index with the columns to take from that file.
    #[must_use]
    pub const fn new(file: usize, cols: ColumnSet) -> Self {
        OutColSpec { file, cols }
    }
}
/// All settings for one join run.
///
/// The derived `Default` leaves every field at its type's default value;
/// `JoinConfig::new` additionally sets `match_out`, `out_delim` and
/// `lookback_limit`.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct JoinConfig {
/// Join strategy. Only `JoinType::Quick` is accepted by the joiner in this file.
pub join_type: JoinType,
/// Input files, in order; each one is opened with `Reader::new_open2`.
/// The join fails unless there are at least two.
pub infiles: Vec<String>,
/// Name of the output for joined lines, passed to `get_writer`.
/// `new()` sets it to "-" (presumably standard output; confirm in `get_writer`).
pub match_out: String,
/// Outputs for lines that found no partner, at most one entry per input file.
pub unmatch_out: Vec<NoMatch>,
/// Explicit output columns. When empty, the joiner emits every column of the
/// first file followed by the non-key columns of each other file.
pub col_specs: Vec<OutColSpec>,
/// NOTE(review): not read anywhere in the visible part of this file.
pub skip_check: bool,
/// Byte written between output columns; `new()` sets it to a tab.
pub out_delim: u8,
/// `new()` sets this to 100.
/// NOTE(review): not read anywhere in the visible part of this file.
pub lookback_limit: u32,
/// Key specifications, each passed to `CompMaker::make_line_comp`.
/// When empty, the single specification "1" is used instead.
pub keys: Vec<String>,
/// NOTE(review): not read anywhere in the visible part of this file.
pub dups: DupColHandling,
/// NOTE(review): not read anywhere in the visible part of this file.
pub use_other: bool,
/// NOTE(review): not read anywhere in the visible part of this file.
pub empty: ScopedValue,
}
/// One output column: a single column of one particular input file.
#[derive(Clone, Debug, Default)]
struct OneOutCol {
    /// Index into the reader slice passed to `write` / `write_head`.
    file: usize,
    /// Column description; only its `num` field is read in this file.
    col: OutCol,
}

impl OneOutCol {
    /// Builds an output column from a file index and a copy of `col`.
    fn new(file: usize, col: &OutCol) -> Self {
        let col = col.clone();
        Self { file, col }
    }

    /// Builds an output column from a file index and a bare column number.
    const fn new_plain(file: usize, col: usize) -> Self {
        OneOutCol { file, col: OutCol::from_num(col) }
    }

    /// Writes this column's value from the current line of reader `f[self.file]`.
    ///
    /// Panics if `self.file` is not a valid index into `f`.
    fn write(&self, mut w: impl Write, f: &[Reader]) -> Result<()> {
        let reader = &f[self.file];
        w.write_all(reader.curr().get(self.col.num))?;
        Ok(())
    }

    /// Writes this column's name from the header of reader `f[self.file]`.
    ///
    /// Panics if `self.file` is not a valid index into `f`.
    fn write_head(&self, mut w: impl Write, f: &[Reader]) -> Result<()> {
        let reader = &f[self.file];
        w.write_all(reader.header().get(self.col.num).as_bytes())?;
        Ok(())
    }
}
impl JoinConfig {
    /// Creates a configuration with the usual starting values: joined lines
    /// go to "-", output columns are tab-separated, and the lookback limit
    /// is 100. Every other field keeps its `Default` value.
    #[must_use]
    pub fn new() -> Self {
        Self {
            lookback_limit: 100,
            out_delim: b'\t',
            match_out: String::from("-"),
            ..Self::default()
        }
    }

    /// Runs the join described by this configuration.
    ///
    /// # Errors
    ///
    /// Fails if the joiner cannot be set up (for example the match output
    /// cannot be opened) or if the join itself reports an error.
    pub fn join(&self) -> Result<()> {
        let mut joiner = Joiner::new(self)?;
        joiner.join(self)
    }
}
/// An output file behind `Rc<RefCell<..>>`, so that several holders can
/// write to the same `Outfile`.
type SharedOutFile = Rc<RefCell<Outfile>>;
/// Working state for one join run.
#[derive(Default)]
struct Joiner {
/// One reader per input file, in the order of `JoinConfig::infiles`.
r: Vec<Reader>,
/// Line comparators built from the configured keys.
comp: LineCompList,
/// Output for joined (matching) lines.
yes_match: Outfile,
/// Per input file (same indexing as `r`): where to write lines without a
/// partner, or `None` to drop them.
no_match: Vec<Option<SharedOutFile>>,
/// The columns written for each joined line, in output order.
out_cols: Vec<OneOutCol>,
/// Shared writers opened so far, keyed by name, so that requests for the
/// same name reuse one writer.
files: HashMap<String, SharedOutFile>,
}
impl Joiner {
/// Returns the shared writer registered under `name`, opening it on first use.
///
/// The boolean is `true` only when this call created the writer, which lets
/// the caller do first-time work (such as writing a header) exactly once.
///
/// # Errors
///
/// Propagates the error from `get_writer` when a new writer cannot be opened.
fn get_shared_writer(&mut self, name: &str) -> Result<(bool, SharedOutFile)> {
    // One lookup instead of `contains_key` + `get` + `unwrap`: no second
    // hash of the key and no panic path.
    if let Some(existing) = self.files.get(name) {
        return Ok((false, Rc::clone(existing)));
    }
    let writer = Rc::new(RefCell::new(get_writer(name)?));
    self.files.insert(name.to_string(), Rc::clone(&writer));
    Ok((true, writer))
}
fn new(config: &JoinConfig) -> Result<Self> {
Ok(Self { yes_match: get_writer(&config.match_out)?, ..Default::default() })
}
/// Prepares the join described by `config` and runs it.
///
/// Steps, in order: open one reader per input file, attach the requested
/// no-match outputs, build the key comparators, choose the output columns,
/// write the header line (only when the first input has a header), and then
/// dispatch on `config.join_type`.
///
/// # Errors
///
/// Fails when fewer than two inputs are given, a no-match request names a
/// file number outside `1..=inputs` or repeats a file, an output column
/// refers to a missing input, no output columns result, a strategy other
/// than `Quick` is selected, or any open/lookup/write step fails.
fn join(&mut self, config: &JoinConfig) -> Result<()> {
    let num_files = config.infiles.len();
    if num_files < 2 {
        return err!("Join requires at least two input files, {} found", num_files);
    }

    // One reader per input, and one (initially empty) no-match slot each.
    for name in &config.infiles {
        let reader = Reader::new_open2(name)?;
        self.r.push(reader);
    }
    self.no_match.extend((0..num_files).map(|_| None));

    // Attach the requested no-match outputs; file numbers are one-based.
    for spec in &config.unmatch_out {
        if !(1..=num_files).contains(&spec.file_num) {
            return err!(
                "Join had {} input files, but requested non matching lines from file {}",
                num_files,
                spec.file_num
            );
        }
        let idx = spec.file_num - 1;
        if self.no_match[idx].is_some() {
            return err!("Multiple uses of --also for file {}", spec.file_num);
        }
        let (is_new, writer) = self.get_shared_writer(&spec.file_name)?;
        if is_new {
            // Only the first user of a shared output writes its header.
            self.r[idx].write_header(&mut writer.borrow_mut().0)?;
        }
        self.no_match[idx] = Some(writer);
    }

    // Key comparators: the configured keys, or "1" when none were given.
    for key in &config.keys {
        self.comp.push(CompMaker::make_line_comp(key)?);
    }
    if config.keys.is_empty() {
        self.comp.push(CompMaker::make_line_comp("1")?);
    }
    let reader_count = self.r.len();
    for (i, reader) in self.r.iter().enumerate() {
        self.comp.lookup_n(&reader.names(), i, reader_count)?;
    }

    if config.col_specs.is_empty() {
        // Default layout: every column of the first file, then the columns
        // of each later file that are not used as keys.
        for file in 0..self.r.len() {
            let key_cols = self.comp.used_cols(file);
            let width = self.r[file].names().len();
            self.out_cols.extend(
                (0..width)
                    .filter(|col| file == 0 || !key_cols.contains(col))
                    .map(|col| OneOutCol::new_plain(file, col)),
            );
        }
    } else {
        for spec in &config.col_specs {
            // Work on a copy so that resolving names leaves `config` untouched.
            let mut cols = spec.cols.clone();
            if spec.file >= self.r.len() {
                return err!(
                    "{} input files, but file {} referred to as an output column",
                    self.r.len(),
                    spec.file
                );
            }
            cols.lookup(&self.r[spec.file].names())?;
            for col in cols.get_cols() {
                self.out_cols.push(OneOutCol::new(spec.file, col));
            }
        }
    }
    if self.out_cols.is_empty() {
        return err!("No output columns specified");
    }

    if self.r[0].has_header() {
        let delim = [config.out_delim];
        self.yes_match.write_all(b" CDX")?;
        for col in &self.out_cols {
            self.yes_match.write_all(&delim)?;
            col.write_head(&mut *self.yes_match, &self.r)?;
        }
        self.yes_match.0.write_all(b"\n")?;
    }

    match config.join_type {
        JoinType::Quick => self.join_quick(config),
        JoinType::Full | JoinType::Poly | JoinType::Binsearch | JoinType::Hash => {
            err!("Only quick supported")
        }
    }
}
/// Merge-style join of the first two readers, `self.r[0]` and `self.r[1]`.
///
/// The current lines of both readers are compared with `self.comp`:
/// * `Equal`: the output columns are written to `yes_match`, then reader 0
///   advances. Reader 1 advances only once reader 0 no longer compares
///   equal, so consecutive equal keys in reader 0 all pair with the same
///   reader 1 line.
/// * `Less`: reader 0's line has no partner; it goes to `no_match[0]` when
///   that is set, and reader 0 advances.
/// * `Greater`: the mirror case for reader 1 and `no_match[1]`.
///
/// Once either reader runs out, the remaining lines of both readers are
/// drained into their no-match outputs (when set).
///
/// NOTE(review): `get_line()` returning `true` is treated here as "no more
/// input"; confirm against `Reader`.
/// NOTE(review): a merge like this presumably needs both inputs ordered
/// consistently with `self.comp`; confirm with callers.
/// NOTE(review): only readers 0 and 1 are consulted; further input files
/// are not read here.
/// NOTE(review): a key repeated in reader 1 is not paired again; after the
/// first match the later line is handled by the `Greater` branch.
fn join_quick(&mut self, config: &JoinConfig) -> Result<()> {
// Enter the merge loop only while both readers still have a current line.
if !self.r[0].is_done() && !self.r[1].is_done() {
// `cmp` is carried between iterations; every branch refreshes it after advancing.
let mut cmp = self.comp.comp_cols_n(self.r[0].curr(), self.r[1].curr(), 0, 1);
'outer: loop {
match cmp {
// Keys match: emit one joined line per equal reader-0 line.
Ordering::Equal => loop {
// The first column is written without a leading delimiter.
// NOTE(review): index 0 relies on `out_cols` being non-empty (checked in `join`).
self.out_cols[0].write(&mut *self.yes_match, &self.r)?;
for x in &self.out_cols[1..] {
self.yes_match.write_all(&[config.out_delim])?;
x.write(&mut *self.yes_match, &self.r)?;
}
self.yes_match.write_all(b"\n")?;
// Reader 0 is exhausted: also step reader 1 past the line that was just
// matched, so that the drain loops below do not report it as unmatched.
if self.r[0].get_line()? {
self.r[1].get_line()?;
break 'outer;
}
cmp = self.comp.comp_cols_n(self.r[0].curr(), self.r[1].curr(), 0, 1);
if cmp != Ordering::Equal {
// Reader 0 moved on to another key, so the matched reader-1 line is done.
if self.r[1].get_line()? {
break 'outer;
}
cmp = self.comp.comp_cols_n(self.r[0].curr(), self.r[1].curr(), 0, 1);
// Leave the inner loop; the outer `match` handles the fresh `cmp`.
break;
}
},
// Reader 0 is behind: its current line has no partner.
Ordering::Less => {
if let Some(x) = &mut self.no_match[0] {
self.r[0].write(&mut x.borrow_mut().0)?;
}
// This `break` leaves the outer loop (no inner loop is active here).
if self.r[0].get_line()? {
break;
}
cmp = self.comp.comp_cols_n(self.r[0].curr(), self.r[1].curr(), 0, 1);
}
// Reader 1 is behind: its current line has no partner.
Ordering::Greater => {
if let Some(x) = &mut self.no_match[1] {
self.r[1].write(&mut x.borrow_mut().0)?;
}
// This `break` leaves the outer loop (no inner loop is active here).
if self.r[1].get_line()? {
break;
}
cmp = self.comp.comp_cols_n(self.r[0].curr(), self.r[1].curr(), 0, 1);
}
}
}
}
// Drain what is left of reader 0; none of it can match any more.
while !self.r[0].is_done() {
if let Some(x) = &mut self.no_match[0] {
self.r[0].write(&mut x.borrow_mut().0)?;
}
self.r[0].get_line()?;
}
// Drain what is left of reader 1 in the same way.
while !self.r[1].is_done() {
if let Some(x) = &mut self.no_match[1] {
self.r[1].write(&mut x.borrow_mut().0)?;
}
self.r[1].get_line()?;
}
Ok(())
}
}