Skip to main content

reifydb_engine/vm/volcano/join/
common.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use postcard::to_stdvec;
7use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns};
8use reifydb_runtime::hash::{Hash128, xxh3_128};
9use reifydb_transaction::transaction::Transaction;
10use reifydb_type::{fragment::Fragment, value::Value};
11
12use crate::{
13	Result,
14	expression::{compile::CompiledExpr, context::EvalContext},
15	vm::volcano::query::{QueryContext, QueryNode},
16};
17
18/// Load and merge all batches from a node into a single Columns
19pub(crate) fn load_and_merge_all<'a>(
20	node: &mut Box<dyn QueryNode>,
21	rx: &mut Transaction<'a>,
22	ctx: &mut QueryContext,
23) -> Result<Columns> {
24	let mut result: Option<Columns> = None;
25
26	while let Some(columns) = node.next(rx, ctx)? {
27		if let Some(mut acc) = result.take() {
28			acc.append_columns(columns)?;
29			result = Some(acc);
30		} else {
31			result = Some(columns);
32		}
33	}
34	let result = result.unwrap_or_else(Columns::empty);
35	Ok(result)
36}
37
/// Result of resolving column names for joins.
///
/// Produced by `resolve_column_names`; holds the final output column names in
/// output order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedColumnNames {
	/// Output column names: the left side's names first, followed by the
	/// (prefixed and de-duplicated) names of the right side.
	pub qualified_names: Vec<String>,
}
42
43/// Resolve column name conflicts between left and right tables
44pub fn resolve_column_names(
45	left_columns: &Columns,
46	right_columns: &Columns,
47	alias: &Option<Fragment>,
48	excluded_right_indices: Option<&[usize]>,
49) -> ResolvedColumnNames {
50	let mut qualified_names = Vec::new();
51
52	// Add left columns (never prefixed)
53	for col in left_columns.iter() {
54		qualified_names.push(col.name().text().to_string());
55	}
56
57	// Add right columns with ALWAYS-prefix behavior
58	for (idx, col) in right_columns.iter().enumerate() {
59		// Skip excluded columns (used in natural join)
60		if let Some(excluded) = excluded_right_indices
61			&& excluded.contains(&idx)
62		{
63			continue;
64		}
65
66		let col_name = col.name().text();
67
68		// ALWAYS prefix right columns with alias (should always be Some now)
69		let alias_text = alias.as_ref().map(|a| a.text()).unwrap_or("other");
70		let prefixed_name = format!("{}_{}", alias_text, col_name);
71
72		// Check for secondary conflict (prefixed name already exists)
73		let mut final_name = prefixed_name.clone();
74		if qualified_names.contains(&final_name) {
75			let mut counter = 2;
76			loop {
77				let candidate = format!("{}_{}", prefixed_name, counter);
78				if !qualified_names.contains(&candidate) {
79					final_name = candidate;
80					break;
81				}
82				counter += 1;
83			}
84		}
85
86		qualified_names.push(final_name);
87	}
88
89	ResolvedColumnNames {
90		qualified_names,
91	}
92}
93
94/// Build evaluation columns for join conditions
95pub fn build_eval_columns(
96	left_columns: &Columns,
97	right_columns: &Columns,
98	left_row: &[Value],
99	right_row: &[Value],
100	alias: &Option<Fragment>,
101) -> Vec<ColumnWithName> {
102	let mut eval_columns = Vec::new();
103
104	for (idx, col) in left_columns.iter().enumerate() {
105		let data = match &left_row[idx] {
106			Value::None {
107				..
108			} => ColumnBuffer::typed_none(&col.get_type()),
109			value => ColumnBuffer::from(value.clone()),
110		};
111		eval_columns.push(ColumnWithName::new(col.name().clone(), data));
112	}
113
114	for (idx, col) in right_columns.iter().enumerate() {
115		let data = match &right_row[idx] {
116			Value::None {
117				..
118			} => ColumnBuffer::typed_none(&col.get_type()),
119			value => ColumnBuffer::from(value.clone()),
120		};
121		if let Some(alias) = alias {
122			let aliased_name = Fragment::internal(format!("{}.{}", alias.text(), col.name().text()));
123			eval_columns.push(ColumnWithName {
124				name: aliased_name,
125				data,
126			});
127		} else {
128			eval_columns.push(ColumnWithName::new(col.name().clone(), data));
129		}
130	}
131
132	eval_columns
133}
134
135/// Common context holder for join nodes
136pub struct JoinContext {
137	pub context: Option<Arc<QueryContext>>,
138	pub compiled: Vec<CompiledExpr>,
139}
140
141impl Default for JoinContext {
142	fn default() -> Self {
143		Self::new()
144	}
145}
146
147impl JoinContext {
148	pub fn new() -> Self {
149		Self {
150			context: None,
151			compiled: vec![],
152		}
153	}
154
155	pub fn set(&mut self, ctx: &QueryContext) {
156		self.context = Some(Arc::new(ctx.clone()));
157	}
158
159	pub fn get(&self) -> &Arc<QueryContext> {
160		self.context.as_ref().expect("Join context not initialized")
161	}
162
163	pub fn is_initialized(&self) -> bool {
164		self.context.is_some()
165	}
166}
167
168/// Compute a hash over the values at the given column indices for a single row.
169/// Returns `None` if any key value is `Undefined` (NULL != NULL semantics).
170/// The `buf` parameter is a reusable scratch buffer to avoid per-row allocation.
171pub(crate) fn compute_join_hash(
172	columns: &Columns,
173	col_indices: &[usize],
174	row_idx: usize,
175	buf: &mut Vec<u8>,
176) -> Option<Hash128> {
177	buf.clear();
178	for &idx in col_indices {
179		let value = columns[idx].get_value(row_idx);
180		if matches!(value, Value::None { .. }) {
181			return None;
182		}
183		let bytes = to_stdvec(&value).ok()?;
184		buf.extend_from_slice(&bytes);
185	}
186	Some(xxh3_128(buf))
187}
188
189/// Check actual key equality between two rows by column indices.
190pub(crate) fn keys_equal_by_index(
191	left: &Columns,
192	left_row: usize,
193	left_indices: &[usize],
194	right: &Columns,
195	right_row: usize,
196	right_indices: &[usize],
197) -> bool {
198	for (&li, &ri) in left_indices.iter().zip(right_indices.iter()) {
199		let lv = left[li].get_value(left_row);
200		let rv = right[ri].get_value(right_row);
201		if lv != rv {
202			return false;
203		}
204	}
205	true
206}
207
208/// Evaluate compiled join condition predicates for a (left_row, right_row) pair.
209pub(crate) fn eval_join_condition(
210	compiled: &[CompiledExpr],
211	left_columns: &Columns,
212	right_columns: &Columns,
213	left_row: &[Value],
214	right_row: &[Value],
215	alias: &Option<Fragment>,
216	ctx: &QueryContext,
217) -> bool {
218	if compiled.is_empty() {
219		return true;
220	}
221	let eval_columns = build_eval_columns(left_columns, right_columns, left_row, right_row, alias);
222	let session = EvalContext::from_query(ctx);
223	let exec_ctx = session.with_eval_join(Columns::new(eval_columns));
224	compiled.iter().all(|compiled_expr| {
225		let col = compiled_expr.execute(&exec_ctx).unwrap();
226		matches!(col.data().get_value(0), Value::Boolean(true))
227	})
228}