Skip to main content

reifydb_engine/flow/compiler/operator/
join.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use reifydb_core::{
5	common::JoinType::{self, Inner, Left},
6	interface::catalog::flow::FlowNodeId,
7	row::JoinTtl,
8};
9use reifydb_rql::{
10	expression::Expression,
11	flow::node::FlowNodeType,
12	nodes::{JoinInnerNode, JoinLeftNode},
13	query::QueryPlan,
14};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_value::Result;
17
18use crate::flow::compiler::{CompileOperator, FlowCompiler};
19
20pub(crate) struct JoinCompiler {
21	pub join_type: JoinType,
22	pub left: Box<QueryPlan>,
23	pub right: Box<QueryPlan>,
24	pub on: Vec<Expression>,
25	pub alias: Option<String>,
26	pub ttl: Option<JoinTtl>,
27	pub snapshot: bool,
28}
29
30impl From<JoinInnerNode> for JoinCompiler {
31	fn from(node: JoinInnerNode) -> Self {
32		Self {
33			join_type: Inner,
34			left: node.left,
35			right: node.right,
36			on: node.on,
37			alias: node.alias.map(|f| f.text().to_string()),
38			ttl: node.ttl,
39			snapshot: node.snapshot,
40		}
41	}
42}
43
44impl From<JoinLeftNode> for JoinCompiler {
45	fn from(node: JoinLeftNode) -> Self {
46		Self {
47			join_type: Left,
48			left: node.left,
49			right: node.right,
50			on: node.on,
51			alias: node.alias.map(|f| f.text().to_string()),
52			ttl: node.ttl,
53			snapshot: node.snapshot,
54		}
55	}
56}
57
58fn extract_source_name(plan: &QueryPlan) -> Option<String> {
59	match plan {
60		QueryPlan::TableScan(node) => Some(node.source.def().name.clone()),
61		QueryPlan::ViewScan(node) => Some(node.source.def().name().to_string()),
62		QueryPlan::RingBufferScan(node) => Some(node.source.def().name.clone()),
63		QueryPlan::DictionaryScan(node) => Some(node.source.def().name.clone()),
64
65		QueryPlan::Filter(node) => extract_source_name(&node.input),
66		QueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_source_name(p)),
67		QueryPlan::Take(node) => extract_source_name(&node.input),
68		_ => None,
69	}
70}
71
72fn collect_equal_conditions(expr: &Expression, out: &mut Vec<Expression>) {
73	match expr {
74		Expression::And(and) => {
75			collect_equal_conditions(&and.left, out);
76			collect_equal_conditions(&and.right, out);
77		}
78		other => out.push(other.clone()),
79	}
80}
81
82fn extract_join_keys(conditions: &[Expression]) -> (Vec<Expression>, Vec<Expression>) {
83	let mut left_keys = Vec::new();
84	let mut right_keys = Vec::new();
85
86	let mut flat = Vec::new();
87	for condition in conditions {
88		collect_equal_conditions(condition, &mut flat);
89	}
90
91	for condition in flat {
92		match condition {
93			Expression::Equal(eq) => {
94				left_keys.push(*eq.left.clone());
95				right_keys.push(*eq.right.clone());
96			}
97			_ => {
98				left_keys.push(condition.clone());
99				right_keys.push(condition.clone());
100			}
101		}
102	}
103
104	(left_keys, right_keys)
105}
106
107impl CompileOperator for JoinCompiler {
108	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
109		let source_name = extract_source_name(&self.right);
110
111		let left_node = compiler.compile_plan(txn, *self.left)?;
112		let right_node = compiler.compile_plan(txn, *self.right)?;
113
114		let (left_keys, right_keys) = extract_join_keys(&self.on);
115
116		let effective_alias = self.alias.or(source_name).or_else(|| Some("other".to_string()));
117
118		let node_id = compiler.add_node(
119			txn,
120			FlowNodeType::Join {
121				join_type: self.join_type,
122				left: left_keys,
123				right: right_keys,
124				alias: effective_alias,
125				snapshot: self.snapshot,
126			},
127		)?;
128
129		compiler.write_operator_settings_join(txn, node_id, self.ttl)?;
130
131		compiler.add_edge(txn, &left_node, &node_id)?;
132		compiler.add_edge(txn, &right_node, &node_id)?;
133
134		Ok(node_id)
135	}
136}