use crate::common::CompactArc;
use crate::core::value::NULL_VALUE;
use crate::core::{Result, Row};
use crate::executor::hash_table::{hash_row_keys, verify_key_equality, JoinHashTable};
use crate::executor::operator::{ColumnInfo, Operator, RowRef};
/// Pre-rendered `build_<i>` names for the first 32 synthesized build-side columns.
const BUILD_COLUMN_NAMES: [&str; 32] = [
    "build_0", "build_1", "build_2", "build_3", "build_4", "build_5", "build_6", "build_7",
    "build_8", "build_9", "build_10", "build_11", "build_12", "build_13", "build_14", "build_15",
    "build_16", "build_17", "build_18", "build_19", "build_20", "build_21", "build_22", "build_23",
    "build_24", "build_25", "build_26", "build_27", "build_28", "build_29", "build_30", "build_31",
];

/// Returns the synthesized column name `build_<i>` for build-side column `i`.
///
/// Indices covered by [`BUILD_COLUMN_NAMES`] are copied from the table; larger
/// indices are formatted on demand. Both paths yield the same text.
#[inline]
fn get_build_column_name(i: usize) -> String {
    match BUILD_COLUMN_NAMES.get(i) {
        Some(name) => (*name).to_owned(),
        None => format!("build_{}", i),
    }
}
/// Identifies one of the two inputs of a join.
///
/// `HashJoinOperator` stores a `JoinSide` to record which input is materialized
/// into the hash table (the build side); the other input is the probe side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinSide {
/// The left input.
Left,
/// The right input.
Right,
}
/// The kind of join an operator performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    /// Inner join; also the fallback of [`JoinType::parse`] when no keyword is found.
    Inner,
    /// Left outer join.
    Left,
    /// Right outer join.
    Right,
    /// Full outer join.
    Full,
    /// Cross join.
    Cross,
    /// Semi join.
    Semi,
    /// Anti join.
    Anti,
}

impl JoinType {
    /// Derives a join type from free-form text such as `"LEFT OUTER"`.
    ///
    /// The text is scanned byte by byte from the start. The first position at
    /// which one of the keywords `left`, `right`, `full`, `cross`, `semi` or
    /// `anti` begins (ASCII case-insensitive, matched as a substring) decides
    /// the result. If no keyword occurs anywhere, `JoinType::Inner` is returned.
    pub fn parse(s: &str) -> Self {
        // Every keyword starts with a different letter, so at most one entry
        // can match at any given position and the table order is irrelevant.
        const KEYWORDS: [(&str, JoinType); 6] = [
            ("left", JoinType::Left),
            ("right", JoinType::Right),
            ("full", JoinType::Full),
            ("cross", JoinType::Cross),
            ("semi", JoinType::Semi),
            ("anti", JoinType::Anti),
        ];

        let bytes = s.as_bytes();
        for start in 0..bytes.len() {
            let tail = &bytes[start..];
            let hit = KEYWORDS.iter().find(|&&(word, _)| {
                // `get` yields `None` when fewer bytes remain than the keyword needs.
                tail.get(..word.len())
                    .map_or(false, |head| head.eq_ignore_ascii_case(word.as_bytes()))
            });
            if let Some(&(_, join_type)) = hit {
                return join_type;
            }
        }
        JoinType::Inner
    }

    /// Alias for [`JoinType::parse`]; infallible, unlike `std::str::FromStr`.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Self {
        Self::parse(s)
    }

    /// Reports whether probe rows without a match must be emitted.
    ///
    /// Always `true` for `Full`; `swapped` for `Right`; `!swapped` for `Left`
    /// and `Anti`; `false` for `Inner`, `Cross` and `Semi`.
    // NOTE(review): `swapped` presumably means the build/probe roles are
    // exchanged relative to the default assignment - confirm against callers.
    pub fn needs_unmatched_probe(&self, swapped: bool) -> bool {
        match *self {
            JoinType::Full => true,
            JoinType::Right => swapped,
            JoinType::Left | JoinType::Anti => !swapped,
            JoinType::Inner | JoinType::Cross | JoinType::Semi => false,
        }
    }

    /// Reports whether build rows without a match must be emitted.
    ///
    /// Always `true` for `Full`; `swapped` for `Left`; `!swapped` for `Right`;
    /// `false` for `Inner`, `Cross`, `Semi` and `Anti`.
    pub fn needs_unmatched_build(&self, swapped: bool) -> bool {
        match *self {
            JoinType::Full => true,
            JoinType::Left => swapped,
            JoinType::Right => !swapped,
            JoinType::Inner | JoinType::Cross | JoinType::Semi | JoinType::Anti => false,
        }
    }

    /// Returns `true` only for `JoinType::Semi`.
    pub fn is_semi(&self) -> bool {
        *self == JoinType::Semi
    }

    /// Returns `true` only for `JoinType::Anti`.
    pub fn is_anti(&self) -> bool {
        *self == JoinType::Anti
    }
}
/// Hash join operator.
///
/// One input (the build side) is held as materialized rows plus a
/// `JoinHashTable`; rows from the other input (the probe side) are looked up in
/// that table one at a time by `next`.
pub struct HashJoinOperator {
/// Left input operator (an `EmptyOperator` placeholder when a prebuilt left build side is used).
left: Box<dyn Operator>,
/// Right input operator (an `EmptyOperator` placeholder for self-joins and prebuilt right build sides).
right: Box<dyn Operator>,
/// Kind of join being executed.
join_type: JoinType,
/// Which input is the build side; the other one is probed.
build_side: JoinSide,
/// Join key column positions within left rows.
left_key_indices: Vec<usize>,
/// Join key column positions within right rows.
right_key_indices: Vec<usize>,
/// Materialized build-side rows; the handle is cloned into emitted composite rows.
build_rows: CompactArc<Vec<Row>>,
/// Hash table over `build_rows`; `None` until `open` builds it unless supplied prebuilt.
hash_table: Option<JoinHashTable>,
/// Output schema.
schema: Vec<ColumnInfo>,
/// Number of columns on the left side; used to size NULL padding rows.
left_col_count: usize,
/// Number of columns on the right side; used to size NULL padding rows.
right_col_count: usize,
/// Probe row currently being matched; moved out once it has been emitted for the last time.
current_probe_row: Option<Row>,
/// Key hash of the current probe row (stored by `next`; not read elsewhere in this file).
current_probe_hash: u64,
/// Cursor into `current_matches`.
current_match_idx: usize,
/// Candidate build-row indices returned by the hash table for the current probe row.
current_matches: Vec<usize>,
/// Whether any candidate passed key verification for the current probe row.
probe_had_match: bool,
/// Per-build-row "was matched" flags; left empty when unmatched build rows are not needed.
build_matched: Vec<bool>,
/// Set once the probe side is exhausted and unmatched build rows are being emitted.
returning_unmatched_build: bool,
/// Cursor over `build_rows` while emitting unmatched build rows.
unmatched_build_idx: usize,
/// True when the operator was created by `self_join` (build rows double as probe rows).
is_self_join: bool,
/// Cursor over `build_rows` used as the probe stream in a self-join.
self_join_probe_idx: usize,
/// Cached all-NULL row with the build side's column count.
cached_null_build: Option<Row>,
/// Cached all-NULL row with the probe side's column count.
cached_null_probe: Option<Row>,
/// Set by `open`; `next` returns an error while this is false.
opened: bool,
/// Set when the probe side returns no more rows (not read elsewhere in this file).
probe_exhausted: bool,
}
impl HashJoinOperator {
pub fn new(
left: Box<dyn Operator>,
right: Box<dyn Operator>,
join_type: JoinType,
left_key_indices: Vec<usize>,
right_key_indices: Vec<usize>,
build_side: JoinSide,
) -> Self {
let mut schema = Vec::new();
if join_type.is_semi() || join_type.is_anti() {
schema.extend(left.schema().iter().cloned());
} else {
schema.extend(left.schema().iter().cloned());
schema.extend(right.schema().iter().cloned());
}
let left_col_count = left.schema().len();
let right_col_count = right.schema().len();
Self {
left,
right,
join_type,
build_side,
left_key_indices,
right_key_indices,
build_rows: CompactArc::new(Vec::new()),
hash_table: None,
schema,
left_col_count,
right_col_count,
current_probe_row: None,
current_probe_hash: 0,
current_match_idx: 0,
current_matches: Vec::new(),
probe_had_match: false,
build_matched: Vec::new(),
returning_unmatched_build: false,
unmatched_build_idx: 0,
is_self_join: false,
self_join_probe_idx: 0,
cached_null_build: None,
cached_null_probe: None,
opened: false,
probe_exhausted: false,
}
}
/// Creates a hash join whose build side is already materialized and hashed.
///
/// `build_rows` and `hash_table` are used as-is. Because a hash table is
/// present from the start, `open` only opens the probe operator. The slot of
/// the build input is filled with an `EmptyOperator` placeholder.
///
/// Build-side columns have no schema of their own here, so they are named
/// `build_0`, `build_1`, ... and their count is taken from the first build
/// row. When `build_rows` is empty, no build columns appear in the schema.
/// `build_is_left` decides whether the build columns come before or after the
/// probe columns, and which key index list belongs to which side.
pub fn with_prebuilt(
    probe: Box<dyn Operator>,
    build_rows: CompactArc<Vec<Row>>,
    hash_table: crate::executor::hash_table::JoinHashTable,
    join_type: JoinType,
    probe_key_indices: Vec<usize>,
    build_key_indices: Vec<usize>,
    build_is_left: bool,
) -> Self {
    let probe_col_count = probe.schema().len();
    let build_col_count = if build_rows.is_empty() {
        0
    } else {
        build_rows[0].len()
    };

    // Assemble the output schema in left-then-right order.
    let mut schema = Vec::with_capacity(build_col_count + probe_col_count);
    {
        let build_columns =
            (0..build_col_count).map(|i| ColumnInfo::new(get_build_column_name(i)));
        let probe_columns = probe.schema().iter().cloned();
        if build_is_left {
            schema.extend(build_columns);
            schema.extend(probe_columns);
        } else {
            schema.extend(probe_columns);
            schema.extend(build_columns);
        }
    }

    let (left_col_count, right_col_count) = if build_is_left {
        (build_col_count, probe_col_count)
    } else {
        (probe_col_count, build_col_count)
    };
    let (left_key_indices, right_key_indices) = if build_is_left {
        (build_key_indices, probe_key_indices)
    } else {
        (probe_key_indices, build_key_indices)
    };
    let build_side = if build_is_left {
        JoinSide::Left
    } else {
        JoinSide::Right
    };

    // Matched flags are only allocated when unmatched build rows must be
    // emitted after the probe side is exhausted.
    let track_build = match join_type {
        JoinType::Full => true,
        JoinType::Left => build_is_left,
        JoinType::Right => !build_is_left,
        JoinType::Inner | JoinType::Cross | JoinType::Semi | JoinType::Anti => false,
    };
    let build_matched = if track_build {
        vec![false; build_rows.len()]
    } else {
        Vec::new()
    };

    // The build input is never read, so its slot gets a placeholder.
    let placeholder: Box<dyn Operator> =
        Box::new(crate::executor::operator::EmptyOperator::new());
    let (left, right) = if build_is_left {
        (placeholder, probe)
    } else {
        (probe, placeholder)
    };

    Self {
        left,
        right,
        join_type,
        build_side,
        left_key_indices,
        right_key_indices,
        build_rows,
        hash_table: Some(hash_table),
        schema,
        left_col_count,
        right_col_count,
        current_probe_row: None,
        current_probe_hash: 0,
        current_match_idx: 0,
        current_matches: Vec::new(),
        probe_had_match: false,
        build_matched,
        returning_unmatched_build: false,
        unmatched_build_idx: 0,
        is_self_join: false,
        self_join_probe_idx: 0,
        cached_null_build: None,
        cached_null_probe: None,
        opened: false,
        probe_exhausted: false,
    }
}
pub fn self_join(
input: Box<dyn Operator>,
join_type: JoinType,
left_key_indices: Vec<usize>,
right_key_indices: Vec<usize>,
) -> Self {
let mut schema = Vec::new();
schema.extend(input.schema().iter().cloned());
schema.extend(input.schema().iter().cloned());
let col_count = input.schema().len();
Self {
left: input,
right: Box::new(crate::executor::operator::EmptyOperator::new()),
join_type,
build_side: JoinSide::Left, left_key_indices,
right_key_indices,
build_rows: CompactArc::new(Vec::new()),
hash_table: None,
schema,
left_col_count: col_count,
right_col_count: col_count,
current_probe_row: None,
current_probe_hash: 0,
current_match_idx: 0,
current_matches: Vec::new(),
probe_had_match: false,
build_matched: Vec::new(),
returning_unmatched_build: false,
unmatched_build_idx: 0,
is_self_join: true,
self_join_probe_idx: 0,
cached_null_build: None,
cached_null_probe: None,
opened: false,
probe_exhausted: false,
}
}
/// Returns the key column positions that apply to build-side rows.
fn build_key_indices(&self) -> &[usize] {
    if self.build_side == JoinSide::Left {
        self.left_key_indices.as_slice()
    } else {
        self.right_key_indices.as_slice()
    }
}
/// Returns the key column positions that apply to probe-side rows, i.e. the
/// key list of the input opposite to the build side.
fn probe_key_indices(&self) -> &[usize] {
    if self.build_side == JoinSide::Left {
        self.right_key_indices.as_slice()
    } else {
        self.left_key_indices.as_slice()
    }
}
#[inline]
fn null_build_row(&mut self) -> Row {
if let Some(ref row) = self.cached_null_build {
return row.clone();
}
let count = match self.build_side {
JoinSide::Left => self.left_col_count,
JoinSide::Right => self.right_col_count,
};
let row = Row::from_values(vec![NULL_VALUE; count]);
self.cached_null_build = Some(row.clone());
row
}
#[inline]
fn null_probe_row(&mut self) -> Row {
if let Some(ref row) = self.cached_null_probe {
return row.clone();
}
let count = match self.build_side {
JoinSide::Left => self.right_col_count,
JoinSide::Right => self.left_col_count,
};
let row = Row::from_values(vec![NULL_VALUE; count]);
self.cached_null_probe = Some(row.clone());
row
}
/// Builds an output row from an owned probe row and the build row at
/// `build_idx`, referenced through a clone of the shared `build_rows` handle.
///
/// The final argument tells `RowRef::direct_build_composite` whether the
/// probe row is the left input, which is the case when the build side is the
/// right input.
#[inline]
fn combine_rows_direct(&self, probe_row: Row, build_idx: usize) -> RowRef {
    let shared_build = CompactArc::clone(&self.build_rows);
    RowRef::direct_build_composite(
        probe_row,
        shared_build,
        build_idx,
        self.build_side == JoinSide::Right,
    )
}
/// Builds a composite output row from two owned rows, placing them in
/// left-then-right order according to which input is the build side.
#[inline]
fn combine_rows_ref(&self, probe_row: Row, build_row: Row) -> RowRef {
    let (first, second) = match self.build_side {
        JoinSide::Left => (build_row, probe_row),
        JoinSide::Right => (probe_row, build_row),
    };
    RowRef::Composite(crate::executor::operator::CompositeRow::new(first, second))
}
/// Fetches the next probe row as an owned `Row`, or `None` when the probe
/// stream is exhausted.
///
/// For a self-join the probe stream is a replay of `build_rows`; otherwise
/// rows are pulled from the input opposite to the build side, and any error
/// from that input is propagated.
fn next_probe_row(&mut self) -> Result<Option<Row>> {
    if !self.is_self_join {
        let probe_input = match self.build_side {
            JoinSide::Left => &mut self.right,
            JoinSide::Right => &mut self.left,
        };
        return Ok(probe_input.next()?.map(|row_ref| row_ref.into_owned()));
    }

    let cursor = self.self_join_probe_idx;
    if cursor < self.build_rows.len() {
        self.self_join_probe_idx = cursor + 1;
        Ok(Some(self.build_rows[cursor].clone()))
    } else {
        Ok(None)
    }
}
}
impl Operator for HashJoinOperator {
/// Prepares the operator for iteration.
///
/// If a hash table is already present, only the probe input is opened.
/// Otherwise the left input is opened, the right input is opened unless this
/// is a self-join, and the build input is drained into memory and hashed on
/// its key columns.
///
/// Matched flags for build rows are allocated for full joins, for left/right
/// joins whose build side is the preserved side, and for every non-inner
/// self-join.
///
/// # Errors
/// Propagates any error returned while opening or reading the inputs.
fn open(&mut self) -> Result<()> {
    if self.hash_table.is_some() {
        match self.build_side {
            JoinSide::Left => self.right.open()?,
            JoinSide::Right => self.left.open()?,
        }
        self.opened = true;
        return Ok(());
    }

    self.left.open()?;
    if !self.is_self_join {
        self.right.open()?;
    }

    // Drain the build input completely before hashing it.
    let mut materialized = Vec::new();
    {
        let source = match self.build_side {
            JoinSide::Left => &mut self.left,
            JoinSide::Right => &mut self.right,
        };
        while let Some(row_ref) = source.next()? {
            materialized.push(row_ref.into_owned());
        }
    }

    let key_columns = self.build_key_indices().to_vec();
    let table = JoinHashTable::build(&materialized, &key_columns);

    let track_build = match self.join_type {
        JoinType::Full => true,
        JoinType::Inner => false,
        JoinType::Left => self.is_self_join || self.build_side == JoinSide::Left,
        JoinType::Right => self.is_self_join || self.build_side == JoinSide::Right,
        JoinType::Cross | JoinType::Semi | JoinType::Anti => self.is_self_join,
    };
    if track_build {
        self.build_matched = vec![false; materialized.len()];
    }

    self.build_rows = CompactArc::new(materialized);
    self.hash_table = Some(table);
    self.opened = true;
    Ok(())
}
/// Produces the next joined row, or `Ok(None)` when the join is exhausted.
///
/// Each call first drains the remaining hash candidates of the current probe
/// row, then emits the probe row alone (anti join) or NULL-padded (outer
/// join) if it never matched, and finally advances to the next probe row.
/// Once the probe side is exhausted and build tracking is active, the
/// unmatched build rows are emitted padded with a NULL probe row.
///
/// # Errors
/// Returns an internal error when called before `open`, and propagates errors
/// from the probe input.
fn next(&mut self) -> Result<Option<RowRef>> {
if !self.opened {
return Err(crate::core::Error::internal(
"HashJoinOperator::next called before open",
));
}
// Final phase: the probe side is done, emit build rows that never matched.
if self.returning_unmatched_build {
while self.unmatched_build_idx < self.build_rows.len() {
let idx = self.unmatched_build_idx;
self.unmatched_build_idx += 1;
if !self.build_matched[idx] {
let build_row = self.build_rows[idx].clone();
let null_probe = self.null_probe_row();
return Ok(Some(self.combine_rows_ref(null_probe, build_row)));
}
}
return Ok(None);
}
loop {
// Drain the remaining candidates of the current probe row.
while self.current_match_idx < self.current_matches.len() {
let build_idx = self.current_matches[self.current_match_idx];
self.current_match_idx += 1;
let build_row = &self.build_rows[build_idx];
// Candidates come from a lookup by hash value, so the key columns are
// compared explicitly before a candidate is accepted.
if verify_key_equality(
self.current_probe_row.as_ref().unwrap(),
build_row,
self.probe_key_indices(),
self.build_key_indices(),
) {
self.probe_had_match = true;
// Semi join: emit the probe row once and skip its remaining candidates.
if self.join_type.is_semi() {
let probe_row = self.current_probe_row.take().unwrap();
self.current_match_idx = self.current_matches.len();
return Ok(Some(RowRef::Owned(probe_row)));
}
// Anti join: a match disqualifies the probe row; skip its remaining candidates.
if self.join_type.is_anti() {
self.current_match_idx = self.current_matches.len();
continue;
}
// An empty `build_matched` means build rows are not being tracked.
if !self.build_matched.is_empty() {
self.build_matched[build_idx] = true;
}
// Move the probe row out on the last candidate; otherwise clone it because
// later candidates may still need it.
let probe_row = if self.current_match_idx >= self.current_matches.len() {
self.current_probe_row.take().unwrap()
} else {
self.current_probe_row.as_ref().unwrap().clone()
};
return Ok(Some(self.combine_rows_direct(probe_row, build_idx)));
}
}
// Anti join: a probe row with no verified match is emitted as-is.
if self.join_type.is_anti() && !self.probe_had_match {
if let Some(probe_row) = self.current_probe_row.take() {
return Ok(Some(RowRef::Owned(probe_row)));
}
}
// Outer joins preserving the probe side: emit an unmatched probe row padded
// with a NULL build row.
let needs_unmatched_probe = matches!(self.join_type, JoinType::Full)
|| (matches!(self.join_type, JoinType::Right) && self.build_side == JoinSide::Left)
|| (matches!(self.join_type, JoinType::Left) && self.build_side == JoinSide::Right);
if needs_unmatched_probe && !self.probe_had_match {
if let Some(probe_row) = self.current_probe_row.take() {
let null_build = self.null_build_row();
return Ok(Some(self.combine_rows_ref(probe_row, null_build)));
}
}
// Advance to the next probe row and load its hash candidates.
let next_probe = self.next_probe_row()?;
match next_probe {
Some(probe_row) => {
let probe_key_indices = self.probe_key_indices();
let hash = hash_row_keys(&probe_row, probe_key_indices);
let hash_table = self.hash_table.as_ref().unwrap();
self.current_matches.clear();
self.current_matches.extend(hash_table.probe(hash));
self.current_probe_row = Some(probe_row);
self.current_probe_hash = hash;
self.current_match_idx = 0;
self.probe_had_match = false;
}
None => {
self.probe_exhausted = true;
// With build tracking active, switch to the unmatched-build phase; the
// re-entrant call takes the `returning_unmatched_build` branch at the top.
if !self.build_matched.is_empty() {
self.returning_unmatched_build = true;
self.unmatched_build_idx = 0;
return self.next();
}
return Ok(None);
}
}
}
}
/// Closes the input operators.
///
/// Both inputs are always asked to close, even when the first one fails, so
/// that an error on the left side cannot leave the right side open. For a
/// self-join the right slot is skipped, mirroring `open`, which never opens
/// it in that case.
///
/// # Errors
/// Returns the left input's error if it failed to close, otherwise the right
/// input's error, if any.
fn close(&mut self) -> Result<()> {
    let left_result = self.left.close();
    let right_result = if self.is_self_join {
        Ok(())
    } else {
        self.right.close()
    };
    // `and` yields the left error when present, else the right result.
    left_result.and(right_result)
}
/// Returns the output schema computed when the operator was constructed.
fn schema(&self) -> &[ColumnInfo] {
    self.schema.as_slice()
}
/// Estimates the number of output rows from the inputs' own estimates.
///
/// Returns `None` if either input has no estimate. Otherwise:
/// inner and semi joins use the smaller estimate, left and anti joins use the
/// left estimate, right joins use the right estimate, full joins use the sum
/// and cross joins use the product.
///
/// The sum and product saturate at `usize::MAX` instead of overflowing, since
/// large inputs could otherwise panic (debug) or wrap around (release).
fn estimated_rows(&self) -> Option<usize> {
    let left_est = self.left.estimated_rows()?;
    let right_est = self.right.estimated_rows()?;
    Some(match self.join_type {
        JoinType::Inner | JoinType::Semi => left_est.min(right_est),
        JoinType::Left | JoinType::Anti => left_est,
        JoinType::Right => right_est,
        JoinType::Full => left_est.saturating_add(right_est),
        JoinType::Cross => left_est.saturating_mul(right_est),
    })
}
/// Returns a fixed display name for the operator that includes the join type.
fn name(&self) -> &str {
    let label: &'static str = match self.join_type {
        JoinType::Inner => "HashJoin (INNER)",
        JoinType::Left => "HashJoin (LEFT)",
        JoinType::Right => "HashJoin (RIGHT)",
        JoinType::Full => "HashJoin (FULL)",
        JoinType::Cross => "HashJoin (CROSS)",
        JoinType::Semi => "HashJoin (SEMI)",
        JoinType::Anti => "HashJoin (ANTI)",
    };
    label
}
}
// Unit tests for `HashJoinOperator`, driven by in-memory `MaterializedOperator` inputs.
#[cfg(test)]
mod tests {
use super::*;
use crate::core::Value;
use crate::executor::operator::MaterializedOperator;
// Converts nested integer vectors into rows of integer values.
fn make_rows(data: Vec<Vec<i64>>) -> Vec<Row> {
data.into_iter()
.map(|vals| Row::from_values(vals.into_iter().map(Value::integer).collect()))
.collect()
}
// Wraps integer data and column names in a boxed `MaterializedOperator`.
fn make_operator(data: Vec<Vec<i64>>, cols: Vec<&str>) -> Box<dyn Operator> {
let rows = make_rows(data);
let schema = cols.into_iter().map(ColumnInfo::new).collect();
Box::new(MaterializedOperator::new(rows, schema))
}
// Opens the operator, drains it into owned rows, then closes it.
fn collect_results(op: &mut dyn Operator) -> Result<Vec<Row>> {
let mut results = Vec::new();
op.open()?;
while let Some(row_ref) = op.next()? {
results.push(row_ref.into_owned());
}
op.close()?;
Ok(results)
}
// Inner join on column 0: ids 1 and 3 exist on both sides, so two rows come out.
#[test]
fn test_inner_join() {
let left = make_operator(
vec![vec![1, 10], vec![2, 20], vec![3, 30]],
vec!["id", "value"],
);
let right = make_operator(vec![vec![1, 100], vec![3, 300]], vec!["id", "data"]);
let mut join = HashJoinOperator::new(
left,
right,
JoinType::Inner,
vec![0], vec![0], JoinSide::Right,
);
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 2);
// The first output row holds the left columns followed by the right columns.
let row1 = &results[0];
assert_eq!(row1.get(0), Some(&Value::integer(1)));
assert_eq!(row1.get(1), Some(&Value::integer(10)));
assert_eq!(row1.get(2), Some(&Value::integer(1)));
assert_eq!(row1.get(3), Some(&Value::integer(100)));
}
// Left join: every left row is kept; unmatched ones carry NULL right columns.
#[test]
fn test_left_join() {
let left = make_operator(
vec![vec![1, 10], vec![2, 20], vec![3, 30]],
vec!["id", "value"],
);
let right = make_operator(vec![vec![1, 100]], vec!["id", "data"]);
let mut join = HashJoinOperator::new(
left,
right,
JoinType::Left,
vec![0],
vec![0],
JoinSide::Right,
);
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 3);
// id 2 has no partner on the right side.
let row2 = results
.iter()
.find(|r| r.get(0) == Some(&Value::integer(2)))
.unwrap();
assert!(row2.get(2).unwrap().is_null());
assert!(row2.get(3).unwrap().is_null());
}
// Self-join on column 1: two rows share age 10 (2 x 2 pairs) and one row has age 20
// (1 pair), giving 5 rows.
#[test]
fn test_self_join() {
let input = make_operator(
vec![vec![1, 10], vec![2, 10], vec![3, 20]],
vec!["id", "age"],
);
let mut join = HashJoinOperator::self_join(
input,
JoinType::Inner,
vec![1], vec![1], );
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 5);
}
// Inner join against an empty build side yields no rows.
#[test]
fn test_empty_build() {
let left = make_operator(vec![vec![1, 10], vec![2, 20]], vec!["id", "value"]);
let right = make_operator(vec![], vec!["id", "data"]);
let mut join = HashJoinOperator::new(
left,
right,
JoinType::Inner,
vec![0],
vec![0],
JoinSide::Right,
);
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 0);
}
// Two-column key: only (1, 10) and (1, 20) exist on both sides.
#[test]
fn test_multi_key_join() {
let left = make_operator(
vec![vec![1, 10, 100], vec![1, 20, 200], vec![2, 10, 300]],
vec!["a", "b", "val"],
);
let right = make_operator(
vec![vec![1, 10, 1000], vec![1, 20, 2000]],
vec!["a", "b", "data"],
);
let mut join = HashJoinOperator::new(
left,
right,
JoinType::Inner,
vec![0, 1], vec![0, 1], JoinSide::Right,
);
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 2);
}
// Semi join: each left row with at least one match appears once (id 1 matches two
// right rows), and only the left columns are in the schema.
#[test]
fn test_semi_join() {
let left = make_operator(
vec![vec![1, 100], vec![2, 200], vec![3, 300]],
vec!["id", "value"],
);
let right = make_operator(
vec![vec![1, 10], vec![1, 20], vec![3, 30]],
vec!["user_id", "order_id"],
);
let mut join = HashJoinOperator::new(
left,
right,
JoinType::Semi,
vec![0], vec![0], JoinSide::Right,
);
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(join.schema().len(), 2);
let ids: Vec<i64> = results
.iter()
.map(|r| r.get(0).unwrap().as_int64().unwrap())
.collect();
assert!(ids.contains(&1));
assert!(ids.contains(&3));
assert!(!ids.contains(&2));
}
// Anti join: only the left row without a match (id 2) is returned, left columns only.
#[test]
fn test_anti_join() {
let left = make_operator(
vec![vec![1, 100], vec![2, 200], vec![3, 300]],
vec!["id", "value"],
);
let right = make_operator(vec![vec![1, 10], vec![3, 30]], vec!["user_id", "order_id"]);
let mut join = HashJoinOperator::new(
left,
right,
JoinType::Anti,
vec![0], vec![0], JoinSide::Right,
);
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(join.schema().len(), 2);
let row = &results[0];
assert_eq!(row.get(0), Some(&Value::integer(2)));
assert_eq!(row.get(1), Some(&Value::integer(200)));
}
// Anti join with an empty right side returns every left row.
#[test]
fn test_anti_join_empty_right() {
let left = make_operator(
vec![vec![1, 100], vec![2, 200], vec![3, 300]],
vec!["id", "value"],
);
let right = make_operator(vec![], vec!["user_id", "order_id"]);
let mut join = HashJoinOperator::new(
left,
right,
JoinType::Anti,
vec![0],
vec![0],
JoinSide::Right,
);
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 3);
}
// Semi join with an empty right side returns nothing.
#[test]
fn test_semi_join_empty_right() {
let left = make_operator(
vec![vec![1, 100], vec![2, 200], vec![3, 300]],
vec!["id", "value"],
);
let right = make_operator(vec![], vec!["user_id", "order_id"]);
let mut join = HashJoinOperator::new(
left,
right,
JoinType::Semi,
vec![0],
vec![0],
JoinSide::Right,
);
let results = collect_results(&mut join).unwrap();
assert_eq!(results.len(), 0);
}
}