use datafusion::logical_expr::{Join, JoinType};
use datafusion::prelude::*;
use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::QueryOp;
use crate::control::planner::physical::PhysicalTask;
use crate::types::{TenantId, VShardId};
use super::search::extract_table_name;
pub(super) fn convert_join(join: &Join, tenant_id: TenantId) -> crate::Result<Vec<PhysicalTask>> {
let left_collection =
extract_table_name(&join.left).ok_or_else(|| crate::Error::PlanError {
detail: "JOIN left side must be a table scan".into(),
})?;
let right_collection =
extract_table_name(&join.right).ok_or_else(|| crate::Error::PlanError {
detail: "JOIN right side must be a table scan".into(),
})?;
let join_type_str = match join.join_type {
JoinType::Inner => "inner",
JoinType::Left => "left",
JoinType::Right => "right",
JoinType::Full => "full",
other => {
return Err(crate::Error::PlanError {
detail: format!("{other:?} JOIN is not supported"),
});
}
};
let mut on_keys = Vec::with_capacity(join.on.len());
for (left_expr, right_expr) in &join.on {
let left_col = match left_expr {
Expr::Column(col) => col.name.clone(),
_ => {
return Err(crate::Error::PlanError {
detail: "JOIN ON must be column = column".into(),
});
}
};
let right_col = match right_expr {
Expr::Column(col) => col.name.clone(),
_ => {
return Err(crate::Error::PlanError {
detail: "JOIN ON must be column = column".into(),
});
}
};
on_keys.push((left_col, right_col));
}
if on_keys.is_empty() {
let vshard = VShardId::from_collection(&left_collection);
let condition = if let Some(filter) = &join.filter {
let filters = super::extract::expr_to_scan_filters(filter);
rmp_serde::to_vec_named(&filters).map_err(|e| crate::Error::Serialization {
format: "msgpack".into(),
detail: format!("join condition serialization: {e}"),
})?
} else {
Vec::new()
};
return Ok(vec![PhysicalTask {
tenant_id,
vshard_id: vshard,
plan: PhysicalPlan::Query(QueryOp::NestedLoopJoin {
left_collection,
right_collection,
condition,
join_type: join_type_str.to_string(),
limit: 1000,
}),
}]);
}
let left_vshard = VShardId::from_collection(&left_collection);
let right_vshard = VShardId::from_collection(&right_collection);
if left_vshard == right_vshard {
return Ok(vec![PhysicalTask {
tenant_id,
vshard_id: left_vshard,
plan: PhysicalPlan::Query(QueryOp::HashJoin {
left_collection,
right_collection,
on: on_keys,
join_type: join_type_str.to_string(),
limit: 1000,
}),
}]);
}
Ok(vec![PhysicalTask {
tenant_id,
vshard_id: left_vshard,
plan: PhysicalPlan::Query(QueryOp::HashJoin {
left_collection,
right_collection,
on: on_keys,
join_type: join_type_str.to_string(),
limit: 1000,
}),
}])
}