use crate::common::{CompactArc, CompactVec};
use crate::core::value::NULL_VALUE;
use crate::core::{Result, Row, RowVec, Value};
use crate::executor::context::ExecutionContext;
use crate::executor::expression::RowFilter;
use crate::executor::hash_table::JoinHashTable;
use crate::executor::operator::{ColumnInfo, MaterializedOperator, Operator};
use crate::executor::operators::hash_join::{HashJoinOperator, JoinSide, JoinType};
use crate::executor::operators::merge_join::MergeJoinOperator;
use crate::executor::operators::nested_loop_join::NestedLoopJoinOperator;
use crate::executor::parallel::{
parallel_hash_join, ParallelConfig, DEFAULT_PARALLEL_JOIN_THRESHOLD,
};
use crate::executor::planner::{RuntimeJoinAlgorithm, RuntimeJoinDecision};
use crate::executor::utils::{extract_join_keys_and_residual, is_sorted_on_keys};
use crate::optimizer::bloom::RuntimeBloomFilter;
use crate::parser::ast::Expression;
/// Row-limit cutoff used when choosing between the parallel and the streaming
/// hash-join paths: a query whose `LIMIT` is at or below this value always takes
/// the streaming path, regardless of how large the build side is.
const STREAMING_LIMIT_THRESHOLD: u64 = 1000;
/// Output of a join: the produced rows plus the names of their columns.
#[derive(Debug)]
pub struct JoinResult {
/// Joined rows, each paired with a row id.
pub rows: RowVec,
/// Output column names: the left input's columns followed by the right input's.
pub columns: Vec<String>,
}
/// What the executor learned about a join condition before running it.
#[derive(Debug, Clone)]
pub struct JoinAnalysis {
/// Column indices into the left input that take part in equality keys.
pub left_key_indices: Vec<usize>,
/// Column indices into the right input that take part in equality keys.
pub right_key_indices: Vec<usize>,
/// Parts of the condition that are not covered by the equality keys and must
/// be evaluated separately against the joined row.
pub residual_conditions: Vec<Expression>,
/// Whether the left input is sorted on its keys.
/// NOTE(review): `JoinExecutor::analyze` always writes `false` here.
pub left_sorted: bool,
/// Whether the right input is sorted on its keys.
/// NOTE(review): `JoinExecutor::analyze` always writes `false` here.
pub right_sorted: bool,
/// Join type parsed from the request's join-type string.
pub join_type: JoinType,
/// Upper-cased copy of the request's join-type string; the executor tests it
/// with substring checks such as `contains("LEFT")`.
pub join_type_str: String,
}
/// The executor's final decision on how to run a join.
#[derive(Debug, Clone)]
struct JoinConfig {
/// Join algorithm to run.
algorithm: RuntimeJoinAlgorithm,
/// For hash joins: `true` builds the hash table from the left input,
/// `false` from the right input. Set to `false` for the other algorithms.
build_left: bool,
}
/// Input to [`JoinExecutor::execute`]: two fully materialized inputs to join.
pub struct JoinRequest<'a> {
/// Rows of the left input.
pub left_rows: CompactArc<Vec<Row>>,
/// Rows of the right input.
pub right_rows: CompactArc<Vec<Row>>,
/// Column names of the left input.
pub left_columns: &'a [String],
/// Column names of the right input.
pub right_columns: &'a [String],
/// Join condition; `None` means no condition (e.g. a cross join).
pub condition: Option<&'a Expression>,
/// Join type as text (the tests use "INNER", "LEFT" and "CROSS").
pub join_type: &'a str,
/// Optional cap on the number of rows to produce.
pub limit: Option<u64>,
/// Execution context handed to the residual-condition filters.
pub ctx: &'a ExecutionContext,
/// Optional algorithm decision made elsewhere; when present it replaces the
/// executor's own algorithm selection.
pub algorithm_hint: Option<&'a RuntimeJoinDecision>,
}
/// Input to [`JoinExecutor::execute_streaming`]: a materialized build side
/// joined against a probe side that is pulled from an operator.
pub struct StreamingJoinRequest<'a> {
/// Rows of the build side.
pub build_rows: CompactArc<Vec<Row>>,
/// Column names of the build side.
pub build_columns: &'a [String],
/// Operator that yields the probe-side rows.
pub probe_source: Box<dyn Operator>,
/// Column names of the probe side.
pub probe_columns: Vec<String>,
/// Join condition, if any.
pub condition: Option<&'a Expression>,
/// Join type as text.
pub join_type: &'a str,
/// `true` when the build side is the left input of the join, `false` when it
/// is the right input. Determines the output column order.
pub build_is_left: bool,
/// Optional cap on the number of rows to produce.
pub limit: Option<u64>,
/// Execution context handed to the residual-condition filters.
pub ctx: &'a ExecutionContext,
/// Optional runtime bloom filter.
/// NOTE(review): `execute_streaming` in this file never reads this field.
pub bloom_filter: Option<RuntimeBloomFilter>,
/// Optional hash table already built over `build_rows`; when present the
/// join operator is created from it instead of from a build-side operator.
pub pre_built_hash_table: Option<JoinHashTable>,
}
/// Stateless entry point that analyzes a join, picks an algorithm
/// (hash, merge or nested loop) and runs it.
pub struct JoinExecutor {}
impl JoinExecutor {
pub fn new() -> Self {
Self {}
}
/// Joins two fully materialized inputs.
///
/// The join condition is split into equality keys and residual conditions,
/// an algorithm is chosen (from `request.algorithm_hint` when present,
/// otherwise by `select_algorithm`), and the matching executor is run.
///
/// The returned columns are the left columns followed by the right columns.
///
/// # Errors
///
/// Propagates any error raised by the selected join operator or by the
/// evaluation of residual conditions.
pub fn execute(&self, request: JoinRequest<'_>) -> Result<JoinResult> {
    let mut all_columns = request.left_columns.to_vec();
    all_columns.extend(request.right_columns.iter().cloned());

    let analysis = self.analyze(
        request.left_columns,
        request.right_columns,
        request.condition,
        request.join_type,
    );

    let config = match request.algorithm_hint {
        Some(hint) => self.convert_runtime_decision(hint, &analysis),
        None => self.select_algorithm(&analysis, &request.left_rows, &request.right_rows),
    };

    let rows = match config.algorithm {
        RuntimeJoinAlgorithm::HashJoin => self.execute_hash_join(
            request.left_rows,
            request.right_rows,
            &analysis,
            request.left_columns,
            request.right_columns,
            config.build_left,
            request.limit,
            request.ctx,
        )?,
        RuntimeJoinAlgorithm::MergeJoin => {
            // The merge operator only receives the equality keys, so any
            // residual conditions have to be applied here; otherwise rows that
            // violate the non-equality part of the condition would be returned.
            let has_residual = !analysis.residual_conditions.is_empty();

            // With residuals pending, the limit must be applied after filtering.
            let merge_limit = if has_residual { None } else { request.limit };

            let merged = self.execute_merge_join(
                request.left_rows,
                request.right_rows,
                request.left_columns,
                request.right_columns,
                &analysis,
                merge_limit,
            )?;

            if has_residual {
                let mut filtered = self.apply_residual_post_join(
                    merged,
                    &analysis.residual_conditions,
                    &all_columns,
                    &analysis.join_type_str,
                    request.left_columns.len(),
                    request.right_columns.len(),
                    request.ctx,
                )?;
                if let Some(max) = request.limit {
                    filtered.truncate(usize::try_from(max).unwrap_or(usize::MAX));
                }
                filtered
            } else {
                merged
            }
        }
        RuntimeJoinAlgorithm::NestedLoop => self.execute_nested_loop(
            request.left_rows,
            request.right_rows,
            request.condition,
            request.left_columns,
            request.right_columns,
            &analysis.join_type_str,
            request.limit,
        )?,
    };

    Ok(JoinResult {
        rows,
        columns: all_columns,
    })
}
pub fn execute_streaming(&self, request: StreamingJoinRequest<'_>) -> Result<JoinResult> {
let (left_columns, right_columns) = if request.build_is_left {
(
request.build_columns.to_vec(),
request.probe_columns.clone(),
)
} else {
(
request.probe_columns.clone(),
request.build_columns.to_vec(),
)
};
let mut all_columns = left_columns.clone();
all_columns.extend(right_columns.iter().cloned());
let analysis = self.analyze(
&left_columns,
&right_columns,
request.condition,
request.join_type,
);
let probe_op = request.probe_source;
let build_side = if request.build_is_left {
JoinSide::Left
} else {
JoinSide::Right
};
let mut join_op = if let Some(hash_table) = request.pre_built_hash_table {
HashJoinOperator::with_prebuilt(
probe_op,
request.build_rows,
hash_table,
analysis.join_type,
analysis.left_key_indices.clone(),
analysis.right_key_indices.clone(),
request.build_is_left,
)
} else {
let build_schema: Vec<ColumnInfo> =
request.build_columns.iter().map(ColumnInfo::new).collect();
let build_rows_vec =
CompactArc::try_unwrap(request.build_rows).unwrap_or_else(|arc| (*arc).clone());
let build_op = Box::new(MaterializedOperator::new(build_rows_vec, build_schema));
let (left_op, right_op): (Box<dyn Operator>, Box<dyn Operator>) =
if request.build_is_left {
(build_op, probe_op)
} else {
(probe_op, build_op)
};
HashJoinOperator::new(
left_op,
right_op,
analysis.join_type,
analysis.left_key_indices.clone(),
analysis.right_key_indices.clone(),
build_side,
)
};
let is_inner = !analysis.join_type_str.contains("LEFT")
&& !analysis.join_type_str.contains("RIGHT")
&& !analysis.join_type_str.contains("FULL");
let residual_filters: Vec<RowFilter> = if is_inner {
analysis
.residual_conditions
.iter()
.map(|cond| RowFilter::new(cond, &all_columns).map(|f| f.with_context(request.ctx)))
.collect::<Result<Vec<_>>>()?
} else {
Vec::new()
};
let rows =
self.execute_operator_with_filter(&mut join_op, request.limit, &residual_filters)?;
let left_col_count = left_columns.len();
let right_col_count = right_columns.len();
let rows = if !is_inner && !analysis.residual_conditions.is_empty() {
self.apply_residual_post_join(
rows,
&analysis.residual_conditions,
&all_columns,
&analysis.join_type_str,
left_col_count,
right_col_count,
request.ctx,
)?
} else {
rows
};
Ok(JoinResult {
rows,
columns: all_columns,
})
}
/// Splits the join condition into equality-key column indices and residual
/// conditions, and records the parsed join type.
///
/// With no condition, all three lists are empty. The sortedness flags are
/// always reported as `false`; `join_type_str` is stored upper-cased.
fn analyze(
    &self,
    left_columns: &[String],
    right_columns: &[String],
    condition: Option<&Expression>,
    join_type_str: &str,
) -> JoinAnalysis {
    let join_type = JoinType::parse(join_type_str);

    let (left_key_indices, right_key_indices, residual_conditions) = match condition {
        Some(cond) => extract_join_keys_and_residual(cond, left_columns, right_columns),
        None => (Vec::new(), Vec::new(), Vec::new()),
    };

    JoinAnalysis {
        join_type,
        join_type_str: join_type_str.to_uppercase(),
        left_key_indices,
        right_key_indices,
        residual_conditions,
        left_sorted: false,
        right_sorted: false,
    }
}
/// Chooses a join algorithm when the caller supplied no hint.
///
/// * No equality keys: nested loop.
/// * Both inputs sorted on their keys: merge join.
/// * Otherwise: hash join. The build side is the right input for join types
///   containing LEFT or FULL, the left input for RIGHT, and for everything
///   else the left input when it has no more rows than the right one.
fn select_algorithm(
    &self,
    analysis: &JoinAnalysis,
    left_rows: &[Row],
    right_rows: &[Row],
) -> JoinConfig {
    if analysis.left_key_indices.is_empty() {
        return JoinConfig {
            algorithm: RuntimeJoinAlgorithm::NestedLoop,
            build_left: false,
        };
    }

    // The right side is only inspected once the left side is known to be sorted.
    let both_sorted = is_sorted_on_keys(left_rows, &analysis.left_key_indices)
        && is_sorted_on_keys(right_rows, &analysis.right_key_indices);
    if both_sorted {
        return JoinConfig {
            algorithm: RuntimeJoinAlgorithm::MergeJoin,
            build_left: false,
        };
    }

    let kind = analysis.join_type_str.as_str();
    let builds_right_only = kind.contains("LEFT") || kind.contains("FULL");
    let build_left =
        !builds_right_only && (kind.contains("RIGHT") || left_rows.len() <= right_rows.len());

    JoinConfig {
        algorithm: RuntimeJoinAlgorithm::HashJoin,
        build_left,
    }
}
/// Translates an externally supplied algorithm decision into a `JoinConfig`.
///
/// The algorithm is taken as-is. For hash joins the build side is the right
/// input for join types containing LEFT or FULL, the left input for RIGHT, and
/// otherwise the left input unless the decision asks to swap sides. For the
/// other algorithms `build_left` is `false`.
fn convert_runtime_decision(
    &self,
    decision: &RuntimeJoinDecision,
    analysis: &JoinAnalysis,
) -> JoinConfig {
    let is_hash_join = matches!(decision.algorithm, RuntimeJoinAlgorithm::HashJoin);

    let build_left = if is_hash_join {
        let kind = analysis.join_type_str.as_str();
        let builds_right_only = kind.contains("LEFT") || kind.contains("FULL");
        !builds_right_only && (kind.contains("RIGHT") || !decision.swap_sides)
    } else {
        false
    };

    JoinConfig {
        algorithm: decision.algorithm,
        build_left,
    }
}
/// Runs a hash join, dispatching to the parallel or the streaming variant.
///
/// The parallel variant is used only when the build side has at least
/// `DEFAULT_PARALLEL_JOIN_THRESHOLD` rows and the limit is either absent or
/// greater than `STREAMING_LIMIT_THRESHOLD`; everything else streams.
#[allow(clippy::too_many_arguments)]
fn execute_hash_join(
    &self,
    left_rows: CompactArc<Vec<Row>>,
    right_rows: CompactArc<Vec<Row>>,
    analysis: &JoinAnalysis,
    left_columns: &[String],
    right_columns: &[String],
    build_left: bool,
    limit: Option<u64>,
    ctx: &ExecutionContext,
) -> Result<RowVec> {
    let left_col_count = left_columns.len();
    let right_col_count = right_columns.len();
    let all_columns: Vec<String> = left_columns
        .iter()
        .chain(right_columns.iter())
        .cloned()
        .collect();

    let build_row_count = if build_left {
        left_rows.len()
    } else {
        right_rows.len()
    };
    let limit_allows_parallel = match limit {
        None => true,
        Some(l) => l > STREAMING_LIMIT_THRESHOLD,
    };

    if build_row_count < DEFAULT_PARALLEL_JOIN_THRESHOLD || !limit_allows_parallel {
        return self.execute_hash_join_streaming(
            left_rows,
            right_rows,
            analysis,
            left_columns,
            right_columns,
            left_col_count,
            right_col_count,
            &all_columns,
            build_left,
            limit,
            ctx,
        );
    }

    self.execute_hash_join_parallel(
        left_rows,
        right_rows,
        analysis,
        left_col_count,
        right_col_count,
        &all_columns,
        build_left,
        limit,
        ctx,
    )
}
/// Runs the hash join through `parallel_hash_join`, then applies residual
/// conditions and the row limit to the materialized result.
///
/// When the left input is the build side, the right input is passed as the
/// probe side and `swapped` is passed as `true`.
///
/// Residual conditions are applied as plain filters for joins whose type string
/// contains none of LEFT/RIGHT/FULL, and through `apply_residual_post_join`
/// otherwise. The limit is applied last, after residual evaluation.
///
/// # Errors
///
/// Propagates errors from building or evaluating the residual filters.
#[allow(clippy::too_many_arguments)]
fn execute_hash_join_parallel(
    &self,
    left_rows: CompactArc<Vec<Row>>,
    right_rows: CompactArc<Vec<Row>>,
    analysis: &JoinAnalysis,
    left_col_count: usize,
    right_col_count: usize,
    all_columns: &[String],
    build_left: bool,
    limit: Option<u64>,
    ctx: &ExecutionContext,
) -> Result<RowVec> {
    let config = ParallelConfig::default();

    let swapped = build_left;
    let (
        probe_slice,
        build_slice,
        probe_key_indices,
        build_key_indices,
        probe_col_count,
        build_col_count,
    ) = if swapped {
        (
            right_rows.as_slice(),
            left_rows.as_slice(),
            &analysis.right_key_indices,
            &analysis.left_key_indices,
            right_col_count,
            left_col_count,
        )
    } else {
        (
            left_rows.as_slice(),
            right_rows.as_slice(),
            &analysis.left_key_indices,
            &analysis.right_key_indices,
            left_col_count,
            right_col_count,
        )
    };

    let result = parallel_hash_join(
        probe_slice,
        build_slice,
        probe_key_indices,
        build_key_indices,
        analysis.join_type,
        probe_col_count,
        build_col_count,
        swapped,
        &config,
    );

    // Row ids are simply the position of each row in the join output.
    let mut rows: RowVec = result
        .rows
        .into_iter()
        .enumerate()
        .map(|(i, row)| (i as i64, row))
        .collect();

    if !analysis.residual_conditions.is_empty() {
        let is_outer = ["LEFT", "RIGHT", "FULL"]
            .iter()
            .any(|keyword| analysis.join_type_str.contains(*keyword));

        if is_outer {
            rows = self.apply_residual_post_join(
                rows,
                &analysis.residual_conditions,
                all_columns,
                &analysis.join_type_str,
                left_col_count,
                right_col_count,
                ctx,
            )?;
        } else {
            for cond in &analysis.residual_conditions {
                let filter = RowFilter::new(cond, all_columns)?.with_context(ctx);
                filter.retain_checked(&mut rows)?;
            }
        }
    }

    if let Some(max) = limit {
        // Saturate rather than wrap: on targets where `usize` is narrower than
        // `u64`, a plain `as` cast could turn a huge limit into a tiny one.
        rows.truncate(usize::try_from(max).unwrap_or(usize::MAX));
    }

    Ok(rows)
}
/// Runs the hash join through `HashJoinOperator`, pulling rows one at a time
/// so that a limit can stop the join early.
///
/// For joins whose type string contains none of LEFT/RIGHT/FULL, residual
/// conditions are checked while draining the operator. For the others they are
/// applied afterwards via `apply_residual_post_join`.
///
/// # Errors
///
/// Propagates errors from filter construction, the operator, or residual
/// evaluation.
#[allow(clippy::too_many_arguments)]
fn execute_hash_join_streaming(
    &self,
    left_rows: CompactArc<Vec<Row>>,
    right_rows: CompactArc<Vec<Row>>,
    analysis: &JoinAnalysis,
    left_columns: &[String],
    right_columns: &[String],
    left_col_count: usize,
    right_col_count: usize,
    all_columns: &[String],
    build_left: bool,
    limit: Option<u64>,
    ctx: &ExecutionContext,
) -> Result<RowVec> {
    let left_schema: Vec<ColumnInfo> = left_columns.iter().map(ColumnInfo::new).collect();
    let left_op = Box::new(MaterializedOperator::from_arc(left_rows, left_schema));

    let right_schema: Vec<ColumnInfo> = right_columns.iter().map(ColumnInfo::new).collect();
    let right_op = Box::new(MaterializedOperator::from_arc(right_rows, right_schema));

    let mut join_op = HashJoinOperator::new(
        left_op,
        right_op,
        analysis.join_type,
        analysis.left_key_indices.clone(),
        analysis.right_key_indices.clone(),
        if build_left {
            JoinSide::Left
        } else {
            JoinSide::Right
        },
    );

    let is_outer = ["LEFT", "RIGHT", "FULL"]
        .iter()
        .any(|keyword| analysis.join_type_str.contains(*keyword));

    // Only non-outer joins may drop rows while streaming.
    let mut residual_filters: Vec<RowFilter> = Vec::new();
    if !is_outer {
        for cond in &analysis.residual_conditions {
            residual_filters.push(RowFilter::new(cond, all_columns)?.with_context(ctx));
        }
    }

    let joined = self.execute_operator_with_filter(&mut join_op, limit, &residual_filters)?;

    if is_outer && !analysis.residual_conditions.is_empty() {
        return self.apply_residual_post_join(
            joined,
            &analysis.residual_conditions,
            all_columns,
            &analysis.join_type_str,
            left_col_count,
            right_col_count,
            ctx,
        );
    }

    Ok(joined)
}
fn execute_merge_join(
&self,
left_rows: CompactArc<Vec<Row>>,
right_rows: CompactArc<Vec<Row>>,
left_columns: &[String],
right_columns: &[String],
analysis: &JoinAnalysis,
limit: Option<u64>,
) -> Result<RowVec> {
let left_schema: Vec<ColumnInfo> = left_columns.iter().map(ColumnInfo::new).collect();
let right_schema: Vec<ColumnInfo> = right_columns.iter().map(ColumnInfo::new).collect();
let left_vec = CompactArc::try_unwrap(left_rows).unwrap_or_else(|arc| (*arc).clone());
let right_vec = CompactArc::try_unwrap(right_rows).unwrap_or_else(|arc| (*arc).clone());
let left_op = Box::new(MaterializedOperator::new(left_vec, left_schema));
let right_op = Box::new(MaterializedOperator::new(right_vec, right_schema));
let mut merge_op = MergeJoinOperator::new(
left_op,
right_op,
analysis.join_type,
analysis.left_key_indices.clone(),
analysis.right_key_indices.clone(),
);
self.execute_operator_with_filter(&mut merge_op, limit, &[])
}
#[allow(clippy::too_many_arguments)]
fn execute_nested_loop(
&self,
left_rows: CompactArc<Vec<Row>>,
right_rows: CompactArc<Vec<Row>>,
condition: Option<&Expression>,
left_columns: &[String],
right_columns: &[String],
join_type_str: &str,
limit: Option<u64>,
) -> Result<RowVec> {
let left_schema: Vec<ColumnInfo> = left_columns.iter().map(ColumnInfo::new).collect();
let right_schema: Vec<ColumnInfo> = right_columns.iter().map(ColumnInfo::new).collect();
let left_vec = CompactArc::try_unwrap(left_rows).unwrap_or_else(|arc| (*arc).clone());
let right_vec = CompactArc::try_unwrap(right_rows).unwrap_or_else(|arc| (*arc).clone());
let left_op = Box::new(MaterializedOperator::new(left_vec, left_schema));
let right_op = Box::new(MaterializedOperator::new(right_vec, right_schema));
let join_type = if join_type_str.contains("CROSS") {
JoinType::Cross
} else if join_type_str.contains("FULL") {
JoinType::Full
} else if join_type_str.contains("RIGHT") {
JoinType::Right
} else if join_type_str.contains("LEFT") {
JoinType::Left
} else {
JoinType::Inner
};
let mut nl_op =
NestedLoopJoinOperator::new(left_op, right_op, join_type, condition.cloned());
self.execute_operator_with_filter(&mut nl_op, limit, &[])
}
fn execute_operator_with_filter(
&self,
op: &mut dyn Operator,
limit: Option<u64>,
residual_filters: &[RowFilter],
) -> Result<RowVec> {
op.open()?;
let max_rows = limit.map(|l| l as usize).unwrap_or(usize::MAX);
let mut rows = RowVec::with_capacity(max_rows.min(1000));
let mut row_id = 0i64;
let has_filters = !residual_filters.is_empty();
while let Some(row_ref) = op.next()? {
let row = row_ref.into_owned();
if has_filters {
let pass = match residual_filters.len() {
1 => residual_filters[0].matches_checked(&row)?,
2 => {
residual_filters[0].matches_checked(&row)?
&& residual_filters[1].matches_checked(&row)?
}
3 => {
residual_filters[0].matches_checked(&row)?
&& residual_filters[1].matches_checked(&row)?
&& residual_filters[2].matches_checked(&row)?
}
4 => {
residual_filters[0].matches_checked(&row)?
&& residual_filters[1].matches_checked(&row)?
&& residual_filters[2].matches_checked(&row)?
&& residual_filters[3].matches_checked(&row)?
}
_ => {
let mut all_pass = true;
for f in residual_filters {
if !f.matches_checked(&row)? {
all_pass = false;
break;
}
}
all_pass
}
};
if !pass {
continue;
}
}
rows.push((row_id, row));
row_id += 1;
if rows.len() >= max_rows {
break;
}
}
op.close()?;
Ok(rows)
}
/// Applies residual conditions to rows that have already been joined.
///
/// Conditions are processed one after another. For each condition:
///
/// * join type contains none of LEFT/RIGHT/FULL: failing rows are removed;
/// * join type contains LEFT: a failing row keeps its first `left_col_count`
///   values and gets `right_col_count` NULLs in place of the right side;
/// * otherwise, join type contains RIGHT: a failing row gets `left_col_count`
///   NULLs followed by its values after the left columns;
/// * otherwise (FULL only): a failing row is kept unchanged.
///
/// NOTE(review): for FULL joins the residual therefore has no effect, and for
/// LEFT/RIGHT a failing match is padded even if another match for the same
/// outer row passes — confirm this is the intended semantics.
///
/// # Errors
///
/// Propagates errors from building or evaluating a filter.
#[allow(clippy::too_many_arguments)]
fn apply_residual_post_join(
    &self,
    mut rows: RowVec,
    residual: &[Expression],
    all_columns: &[String],
    join_type: &str,
    left_col_count: usize,
    right_col_count: usize,
    ctx: &ExecutionContext,
) -> Result<RowVec> {
    let null_out_right = join_type.contains("LEFT");
    let null_out_left = join_type.contains("RIGHT");
    let is_outer = null_out_right || null_out_left || join_type.contains("FULL");
    let width = left_col_count + right_col_count;

    for cond in residual {
        let filter = RowFilter::new(cond, all_columns)?.with_context(ctx);

        if !is_outer {
            filter.retain_checked(&mut rows)?;
            continue;
        }

        let mut kept = RowVec::with_capacity(rows.len());
        for (row_id, row) in rows {
            let replacement = if filter.matches_checked(&row)? {
                row
            } else if null_out_right {
                let mut values: CompactVec<Value> = CompactVec::with_capacity(width);
                values.extend(row.iter().take(left_col_count).cloned());
                values.extend(std::iter::repeat_n(NULL_VALUE, right_col_count));
                Row::from_compact_vec(values)
            } else if null_out_left {
                let mut values: CompactVec<Value> = CompactVec::with_capacity(width);
                values.extend(std::iter::repeat_n(NULL_VALUE, left_col_count));
                values.extend(row.iter().skip(left_col_count).cloned());
                Row::from_compact_vec(values)
            } else {
                row
            };
            kept.push((row_id, replacement));
        }
        rows = kept;
    }

    Ok(rows)
}
}
impl Default for JoinExecutor {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::ast::{Identifier, InfixExpression};
    use crate::parser::token::{Position, Token, TokenType};

    /// Builds rows of integer values, one inner vector per row.
    fn make_rows(data: Vec<Vec<i64>>) -> Vec<Row> {
        let mut rows = Vec::with_capacity(data.len());
        for vals in data {
            rows.push(Row::from_values(
                vals.into_iter().map(Value::integer).collect(),
            ));
        }
        rows
    }

    /// Turns string literals into owned column names.
    fn column_names(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Builds an identifier expression for a column reference.
    fn column_ref(name: &'static str) -> Box<Expression> {
        Box::new(Expression::Identifier(Identifier::new(
            Token::new(TokenType::Identifier, name, Position::default()),
            name.to_string(),
        )))
    }

    /// Builds the expression `left = right` over two column references.
    fn equality(left: &'static str, right: &'static str) -> Expression {
        Expression::Infix(InfixExpression::new(
            Token::new(TokenType::Operator, "=", Position::default()),
            column_ref(left),
            "=".to_string(),
            column_ref(right),
        ))
    }

    /// Runs a join with no algorithm hint and unwraps the result.
    fn run_join(
        left: Vec<Row>,
        right: Vec<Row>,
        left_cols: &[String],
        right_cols: &[String],
        condition: Option<&Expression>,
        join_type: &str,
        limit: Option<u64>,
    ) -> JoinResult {
        let executor = JoinExecutor::new();
        let ctx = ExecutionContext::new();
        let request = JoinRequest {
            left_rows: CompactArc::new(left),
            right_rows: CompactArc::new(right),
            left_columns: left_cols,
            right_columns: right_cols,
            condition,
            join_type,
            limit,
            ctx: &ctx,
            algorithm_hint: None,
        };
        executor.execute(request).unwrap()
    }

    #[test]
    fn test_inner_join() {
        let cond = equality("a.id", "b.id");
        let result = run_join(
            make_rows(vec![vec![1, 10], vec![2, 20], vec![3, 30]]),
            make_rows(vec![vec![1, 100], vec![3, 300]]),
            &column_names(&["a.id", "a.val"]),
            &column_names(&["b.id", "b.data"]),
            Some(&cond),
            "INNER",
            None,
        );
        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.columns.len(), 4);
    }

    #[test]
    fn test_left_join() {
        let cond = equality("a.id", "b.id");
        let result = run_join(
            make_rows(vec![vec![1, 10], vec![2, 20], vec![3, 30]]),
            make_rows(vec![vec![1, 100]]),
            &column_names(&["a.id", "a.val"]),
            &column_names(&["b.id", "b.data"]),
            Some(&cond),
            "LEFT",
            None,
        );
        assert_eq!(result.rows.len(), 3);
    }

    #[test]
    fn test_early_termination() {
        let cond = equality("a.id", "b.id");
        let result = run_join(
            make_rows(vec![vec![1], vec![2], vec![3]]),
            make_rows(vec![vec![1], vec![2], vec![3]]),
            &column_names(&["a.id"]),
            &column_names(&["b.id"]),
            Some(&cond),
            "INNER",
            Some(1),
        );
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_cross_join() {
        let result = run_join(
            make_rows(vec![vec![1], vec![2]]),
            make_rows(vec![vec![10], vec![20]]),
            &column_names(&["a.id"]),
            &column_names(&["b.val"]),
            None,
            "CROSS",
            None,
        );
        assert_eq!(result.rows.len(), 4);
    }
}