use {
super::JoinType,
crate::{
macros::try_option, recipe::PlannedRecipe, types::Row, JoinError, NullOrd, Result, Value,
},
rayon::prelude::*,
std::{cmp::Ordering, fmt::Debug},
};
macro_rules! unwrap_or_break {
($unwrap: expr) => {
match $unwrap {
Some(value) => value,
None => {
break;
}
}
};
}
#[derive(Debug)]
pub enum JoinMethod {
General(PlannedRecipe),
ColumnEqColumn {
plane_index: usize,
plane_trust_ordered: bool,
self_index: usize,
self_trust_ordered: bool,
},
All,
FirstTable,
Ignore,
}
impl JoinMethod {
pub fn run(
self,
join: &JoinType,
left_width: usize,
right_width: usize,
mut plane_rows: Vec<Row>,
mut self_rows: Vec<Row>,
) -> Result<Vec<Row>> {
Ok(match self {
JoinMethod::Ignore => plane_rows,
JoinMethod::FirstTable => self_rows,
JoinMethod::All => self_rows
.into_iter()
.fold(vec![], |mut result_rows, self_row| {
let joined_rows = plane_rows.clone().into_iter().map(|mut plane_row| {
plane_row.extend(self_row.clone());
plane_row
});
result_rows.extend(joined_rows);
result_rows
}),
JoinMethod::General(recipe) => {
let unfolded_rows = plane_rows
.into_par_iter()
.map(|left_row| {
let inner_rows = self_rows
.iter()
.enumerate()
.filter_map(|(index, right_row)| {
if try_option!(recipe.confirm_join_constraint(&left_row, right_row))
{
Some(Ok((
index,
join_parts(left_row.clone(), right_row.clone()),
)))
} else {
None
}
})
.collect::<Result<Vec<(usize, Row)>>>()?;
Ok(if inner_rows.is_empty() && join.includes_left() {
(
vec![],
vec![join_parts(left_row, vec![Value::Null; right_width])],
)
} else {
inner_rows.into_iter().unzip()
})
})
.collect::<Result<Vec<(Vec<usize>, Vec<Row>)>>>()?;
let (mut used_right_indexes, mut rows): (Vec<usize>, Vec<Row>) = unfolded_rows
.into_iter()
.reduce(
|mut all: (Vec<usize>, Vec<Row>), set: (Vec<usize>, Vec<Row>)| {
all.0.extend(set.0);
all.1.extend(set.1);
all
},
)
.unwrap_or((vec![], vec![]));
used_right_indexes.par_sort_unstable();
used_right_indexes.dedup();
self_rows.iter().enumerate().for_each(|(index, row)| {
if !used_right_indexes.iter().any(|used| used == &index)
&& join.includes_right()
{
rows.push(join_parts(vec![Value::Null; left_width], row.clone()))
}
});
rows
}
JoinMethod::ColumnEqColumn {
plane_index,
plane_trust_ordered,
self_index,
self_trust_ordered,
} => {
if !plane_trust_ordered {
plane_rows.par_sort_unstable_by(|row_l, row_r| {
row_l
.get(plane_index)
.and_then(|row_l| {
row_r
.get(plane_index)
.and_then(|row_r| row_l.null_cmp(row_r))
})
.unwrap_or(Ordering::Equal)
});
}
let mut left_partitions = plane_rows
.into_iter()
.fold(
vec![],
|mut partitions: Vec<(Value, Vec<Row>)>, row: Row| {
let value = row.get(plane_index).unwrap().clone(); if let Some(last) = partitions.last_mut() {
if last.0 == value {
last.1.push(row);
} else {
partitions.push((value, vec![row]));
}
partitions
} else {
vec![(value, vec![row])]
}
},
)
.into_iter()
.peekable();
if !self_trust_ordered {
self_rows.par_sort_unstable_by(|row_l, row_r| {
row_l
.get(self_index)
.and_then(|row_l| {
row_r
.get(self_index)
.and_then(|row_r| row_l.null_cmp(row_r))
})
.unwrap_or(Ordering::Equal)
});
}
let mut right_partitions = self_rows
.into_iter()
.fold(
vec![],
|mut partitions: Vec<(Value, Vec<Row>)>, row: Row| {
let value = row.get(self_index).unwrap().clone(); if let Some(last) = partitions.last_mut() {
if last.0 == value {
last.1.push(row);
} else {
partitions.push((value, vec![row]));
}
partitions
} else {
vec![(value, vec![row])]
}
},
)
.into_iter()
.peekable();
let mut left_results = vec![];
let mut inner_results = vec![];
let mut right_results = vec![];
loop {
match unwrap_or_break!(left_partitions.peek())
.0
.null_cmp(&unwrap_or_break!(right_partitions.peek()).0)
{
Some(Ordering::Less) => {
left_results
.push(left_partitions.next().ok_or(JoinError::Unreachable)?);
}
Some(Ordering::Equal) => {
inner_results.push((
left_partitions.next().ok_or(JoinError::Unreachable)?,
right_partitions.next().ok_or(JoinError::Unreachable)?,
));
}
None => {
left_results
.push(left_partitions.next().ok_or(JoinError::Unreachable)?);
right_results
.push(right_partitions.next().ok_or(JoinError::Unreachable)?);
}
Some(Ordering::Greater) => {
right_results
.push(right_partitions.next().ok_or(JoinError::Unreachable)?);
}
}
}
left_results.extend(left_partitions);
right_results.extend(right_partitions);
let left_rows = left_results
.into_par_iter()
.map(|(_, left_rows)| {
left_rows
.into_iter()
.map(|left| join_parts(left, vec![Value::Null; right_width]))
.collect::<Vec<Row>>()
})
.reduce(Vec::new, |mut all, set| {
all.extend(set);
all
});
let mut inner_rows = inner_results
.into_par_iter()
.map(|((_, left_rows), (_, right_rows))| {
left_rows
.into_iter()
.map(|left| {
right_rows
.clone()
.into_iter()
.map(|right| join_parts(left.clone(), right))
.collect()
})
.reduce(|mut all: Vec<Row>, set| {
all.extend(set);
all
})
.unwrap_or_default()
})
.reduce(Vec::new, |mut all: Vec<Row>, set| {
all.extend(set);
all
});
let right_rows = right_results
.into_par_iter()
.map(|(_, right_rows)| {
right_rows
.into_iter()
.map(|right| join_parts(vec![Value::Null; left_width], right))
.collect::<Vec<Row>>()
})
.reduce(Vec::new, |mut all, set| {
all.extend(set);
all
});
if join.includes_left() {
inner_rows.extend(left_rows)
};
if join.includes_right() {
inner_rows.extend(right_rows)
};
inner_rows
}
})
}
}
fn join_parts(mut left: Vec<Value>, right: Vec<Value>) -> Vec<Value> {
left.extend(right);
left
}