use reifydb_core::{
common::JoinType::{self, Inner, Left},
interface::catalog::flow::FlowNodeId,
};
use reifydb_rql::{
expression::Expression,
flow::node::FlowNodeType,
nodes::{JoinInnerNode, JoinLeftNode},
query::QueryPlan,
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::Result;
use crate::flow::compiler::{CompileOperator, FlowCompiler};
/// Intermediate description of a join that is compiled into a flow `Join` node.
///
/// Instances are created from `JoinInnerNode` or `JoinLeftNode` through the
/// `From` implementations in this file.
pub(crate) struct JoinCompiler {
/// Kind of join; the conversions in this file set it to `Inner` or `Left`.
pub join_type: JoinType,
/// Query plan for the left input of the join.
pub left: Box<QueryPlan>,
/// Query plan for the right input of the join.
pub right: Box<QueryPlan>,
/// Join condition expressions, copied from the originating node's `on` field.
pub on: Vec<Expression>,
/// Optional alias, taken from the text of the originating node's `alias`.
pub alias: Option<String>,
}
impl From<JoinInnerNode> for JoinCompiler {
fn from(node: JoinInnerNode) -> Self {
Self {
join_type: Inner,
left: node.left,
right: node.right,
on: node.on,
alias: node.alias.map(|f| f.text().to_string()),
}
}
}
impl From<JoinLeftNode> for JoinCompiler {
fn from(node: JoinLeftNode) -> Self {
Self {
join_type: Left,
left: node.left,
right: node.right,
on: node.on,
alias: node.alias.map(|f| f.text().to_string()),
}
}
}
/// Looks up the name of the source that feeds `plan`.
///
/// Table, view, ring-buffer and dictionary scans report the name of their own
/// source definition. Filter, map and take nodes delegate to their input plan.
/// Any other plan kind, and a map node without an input, yields `None`.
fn extract_source_name(plan: &QueryPlan) -> Option<String> {
    let name = match plan {
        // Scan nodes carry the source definition directly.
        QueryPlan::TableScan(scan) => scan.source.def().name.clone(),
        QueryPlan::ViewScan(scan) => scan.source.def().name().to_string(),
        QueryPlan::RingBufferScan(scan) => scan.source.def().name.clone(),
        QueryPlan::DictionaryScan(scan) => scan.source.def().name.clone(),
        // Pass-through nodes: keep walking down the input chain.
        QueryPlan::Filter(filter) => return extract_source_name(&filter.input),
        QueryPlan::Map(map_node) => return extract_source_name(map_node.input.as_ref()?),
        QueryPlan::Take(take) => return extract_source_name(&take.input),
        _ => return None,
    };
    Some(name)
}
/// Flattens a tree of `And` expressions into its leaf conditions.
///
/// `And` nodes are descended left side first, so leaves are appended to `out`
/// in left-to-right order. Every non-`And` expression is cloned into `out`
/// as-is; despite the function's name, leaves are not restricted to
/// equality expressions.
fn collect_equal_conditions(expr: &Expression, out: &mut Vec<Expression>) {
    if let Expression::And(conjunction) = expr {
        collect_equal_conditions(&conjunction.left, out);
        collect_equal_conditions(&conjunction.right, out);
    } else {
        out.push(expr.clone());
    }
}
/// Splits join conditions into parallel left-side and right-side key lists.
///
/// Each condition is first flattened across `And` nodes. An `Equal` leaf then
/// contributes its left operand to the first list and its right operand to the
/// second; any other leaf is placed, unchanged, in both lists. The two returned
/// vectors always have the same length and follow the left-to-right order of
/// the flattened conditions.
fn extract_join_keys(conditions: &[Expression]) -> (Vec<Expression>, Vec<Expression>) {
    let mut flat = Vec::new();
    for condition in conditions {
        collect_equal_conditions(condition, &mut flat);
    }

    // `flat` owns its expressions, so operands are moved out of each leaf
    // instead of being cloned a second time; only the fallback case, which
    // needs the same expression on both sides, still requires one clone.
    flat.into_iter()
        .map(|condition| match condition {
            Expression::Equal(eq) => (*eq.left, *eq.right),
            other => (other.clone(), other),
        })
        .unzip()
}
impl CompileOperator for JoinCompiler {
    /// Compiles this join into the flow and returns the id of the new join node.
    ///
    /// Steps, in order:
    /// 1. Look up the right input's source name before that plan is consumed.
    /// 2. Compile the left plan, then the right plan, via `compile_plan`.
    /// 3. Derive the left/right join keys from the `on` conditions.
    /// 4. Add a `FlowNodeType::Join` node and register an edge between each
    ///    compiled input node and the join node.
    ///
    /// The alias stored on the join node is the explicit alias if present,
    /// otherwise the right input's source name, otherwise `"other"`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `compile_plan`, `add_node` or `add_edge`.
    fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
        let Self {
            join_type,
            left,
            right,
            on,
            alias,
        } = self;

        // Must happen before `right` is moved into `compile_plan`.
        let right_source = extract_source_name(&right);

        let left_id = compiler.compile_plan(txn, *left)?;
        let right_id = compiler.compile_plan(txn, *right)?;

        let (left_keys, right_keys) = extract_join_keys(&on);
        let resolved_alias = alias.or(right_source).unwrap_or_else(|| "other".to_string());

        let join = FlowNodeType::Join {
            join_type,
            left: left_keys,
            right: right_keys,
            alias: Some(resolved_alias),
        };
        let join_id = compiler.add_node(txn, join)?;

        compiler.add_edge(txn, &left_id, &join_id)?;
        compiler.add_edge(txn, &right_id, &join_id)?;

        Ok(join_id)
    }
}