Skip to main content

reifydb_engine/flow/compiler/operator/
join.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_catalog::store::ttl::create::create_operator_ttl;
5use reifydb_core::{
6	common::JoinType::{self, Inner, Left},
7	interface::catalog::flow::FlowNodeId,
8	row::Ttl,
9};
10use reifydb_rql::{
11	expression::Expression,
12	flow::node::FlowNodeType,
13	nodes::{JoinInnerNode, JoinLeftNode},
14	query::QueryPlan,
15};
16use reifydb_transaction::transaction::Transaction;
17use reifydb_type::Result;
18
19use crate::flow::compiler::{CompileOperator, FlowCompiler};
20
21pub(crate) struct JoinCompiler {
22	pub join_type: JoinType,
23	pub left: Box<QueryPlan>,
24	pub right: Box<QueryPlan>,
25	pub on: Vec<Expression>,
26	pub alias: Option<String>,
27	pub ttl: Option<Ttl>,
28}
29
30impl From<JoinInnerNode> for JoinCompiler {
31	fn from(node: JoinInnerNode) -> Self {
32		Self {
33			join_type: Inner,
34			left: node.left,
35			right: node.right,
36			on: node.on,
37			alias: node.alias.map(|f| f.text().to_string()),
38			ttl: node.ttl,
39		}
40	}
41}
42
43impl From<JoinLeftNode> for JoinCompiler {
44	fn from(node: JoinLeftNode) -> Self {
45		Self {
46			join_type: Left,
47			left: node.left,
48			right: node.right,
49			on: node.on,
50			alias: node.alias.map(|f| f.text().to_string()),
51			ttl: node.ttl,
52		}
53	}
54}
55
56fn extract_source_name(plan: &QueryPlan) -> Option<String> {
57	match plan {
58		QueryPlan::TableScan(node) => Some(node.source.def().name.clone()),
59		QueryPlan::ViewScan(node) => Some(node.source.def().name().to_string()),
60		QueryPlan::RingBufferScan(node) => Some(node.source.def().name.clone()),
61		QueryPlan::DictionaryScan(node) => Some(node.source.def().name.clone()),
62
63		QueryPlan::Filter(node) => extract_source_name(&node.input),
64		QueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_source_name(p)),
65		QueryPlan::Take(node) => extract_source_name(&node.input),
66		_ => None,
67	}
68}
69
70fn collect_equal_conditions(expr: &Expression, out: &mut Vec<Expression>) {
71	match expr {
72		Expression::And(and) => {
73			collect_equal_conditions(&and.left, out);
74			collect_equal_conditions(&and.right, out);
75		}
76		other => out.push(other.clone()),
77	}
78}
79
80fn extract_join_keys(conditions: &[Expression]) -> (Vec<Expression>, Vec<Expression>) {
81	let mut left_keys = Vec::new();
82	let mut right_keys = Vec::new();
83
84	let mut flat = Vec::new();
85	for condition in conditions {
86		collect_equal_conditions(condition, &mut flat);
87	}
88
89	for condition in flat {
90		match condition {
91			Expression::Equal(eq) => {
92				left_keys.push(*eq.left.clone());
93				right_keys.push(*eq.right.clone());
94			}
95			_ => {
96				left_keys.push(condition.clone());
97				right_keys.push(condition.clone());
98			}
99		}
100	}
101
102	(left_keys, right_keys)
103}
104
105impl CompileOperator for JoinCompiler {
106	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
107		let source_name = extract_source_name(&self.right);
108
109		let left_node = compiler.compile_plan(txn, *self.left)?;
110		let right_node = compiler.compile_plan(txn, *self.right)?;
111
112		let (left_keys, right_keys) = extract_join_keys(&self.on);
113
114		let effective_alias = self.alias.or(source_name).or_else(|| Some("other".to_string()));
115
116		let ttl = self.ttl.clone();
117		let node_id = compiler.add_node(
118			txn,
119			FlowNodeType::Join {
120				join_type: self.join_type,
121				left: left_keys,
122				right: right_keys,
123				alias: effective_alias,
124				ttl: self.ttl,
125			},
126		)?;
127
128		if let Some(ttl) = ttl
129			&& let Transaction::Admin(admin) = txn
130		{
131			create_operator_ttl(admin, node_id, &ttl)?;
132		}
133
134		compiler.add_edge(txn, &left_node, &node_id)?;
135		compiler.add_edge(txn, &right_node, &node_id)?;
136
137		Ok(node_id)
138	}
139}