reifydb_engine/flow/compiler/operator/
join.rs1use reifydb_core::{
5 common::JoinType::{self, Inner, Left},
6 interface::catalog::flow::FlowNodeId,
7 row::JoinTtl,
8};
9use reifydb_rql::{
10 expression::Expression,
11 flow::node::FlowNodeType,
12 nodes::{JoinInnerNode, JoinLeftNode},
13 query::QueryPlan,
14};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_value::Result;
17
18use crate::flow::compiler::{CompileOperator, FlowCompiler};
19
20pub(crate) struct JoinCompiler {
21 pub join_type: JoinType,
22 pub left: Box<QueryPlan>,
23 pub right: Box<QueryPlan>,
24 pub on: Vec<Expression>,
25 pub alias: Option<String>,
26 pub ttl: Option<JoinTtl>,
27 pub snapshot: bool,
28}
29
30impl From<JoinInnerNode> for JoinCompiler {
31 fn from(node: JoinInnerNode) -> Self {
32 Self {
33 join_type: Inner,
34 left: node.left,
35 right: node.right,
36 on: node.on,
37 alias: node.alias.map(|f| f.text().to_string()),
38 ttl: node.ttl,
39 snapshot: node.snapshot,
40 }
41 }
42}
43
44impl From<JoinLeftNode> for JoinCompiler {
45 fn from(node: JoinLeftNode) -> Self {
46 Self {
47 join_type: Left,
48 left: node.left,
49 right: node.right,
50 on: node.on,
51 alias: node.alias.map(|f| f.text().to_string()),
52 ttl: node.ttl,
53 snapshot: node.snapshot,
54 }
55 }
56}
57
58fn extract_source_name(plan: &QueryPlan) -> Option<String> {
59 match plan {
60 QueryPlan::TableScan(node) => Some(node.source.def().name.clone()),
61 QueryPlan::ViewScan(node) => Some(node.source.def().name().to_string()),
62 QueryPlan::RingBufferScan(node) => Some(node.source.def().name.clone()),
63 QueryPlan::DictionaryScan(node) => Some(node.source.def().name.clone()),
64
65 QueryPlan::Filter(node) => extract_source_name(&node.input),
66 QueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_source_name(p)),
67 QueryPlan::Take(node) => extract_source_name(&node.input),
68 _ => None,
69 }
70}
71
72fn collect_equal_conditions(expr: &Expression, out: &mut Vec<Expression>) {
73 match expr {
74 Expression::And(and) => {
75 collect_equal_conditions(&and.left, out);
76 collect_equal_conditions(&and.right, out);
77 }
78 other => out.push(other.clone()),
79 }
80}
81
82fn extract_join_keys(conditions: &[Expression]) -> (Vec<Expression>, Vec<Expression>) {
83 let mut left_keys = Vec::new();
84 let mut right_keys = Vec::new();
85
86 let mut flat = Vec::new();
87 for condition in conditions {
88 collect_equal_conditions(condition, &mut flat);
89 }
90
91 for condition in flat {
92 match condition {
93 Expression::Equal(eq) => {
94 left_keys.push(*eq.left.clone());
95 right_keys.push(*eq.right.clone());
96 }
97 _ => {
98 left_keys.push(condition.clone());
99 right_keys.push(condition.clone());
100 }
101 }
102 }
103
104 (left_keys, right_keys)
105}
106
107impl CompileOperator for JoinCompiler {
108 fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
109 let source_name = extract_source_name(&self.right);
110
111 let left_node = compiler.compile_plan(txn, *self.left)?;
112 let right_node = compiler.compile_plan(txn, *self.right)?;
113
114 let (left_keys, right_keys) = extract_join_keys(&self.on);
115
116 let effective_alias = self.alias.or(source_name).or_else(|| Some("other".to_string()));
117
118 let node_id = compiler.add_node(
119 txn,
120 FlowNodeType::Join {
121 join_type: self.join_type,
122 left: left_keys,
123 right: right_keys,
124 alias: effective_alias,
125 snapshot: self.snapshot,
126 },
127 )?;
128
129 compiler.write_operator_settings_join(txn, node_id, self.ttl)?;
130
131 compiler.add_edge(txn, &left_node, &node_id)?;
132 compiler.add_edge(txn, &right_node, &node_id)?;
133
134 Ok(node_id)
135 }
136}