Skip to main content

reifydb_engine/vm/volcano/join/common.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use postcard::to_stdvec;
7use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns};
8use reifydb_runtime::hash::{Hash128, xxh3_128};
9use reifydb_transaction::transaction::Transaction;
10use reifydb_type::{fragment::Fragment, value::Value};
11
12use crate::{
13	Result,
14	expression::{compile::CompiledExpr, context::EvalContext},
15	vm::volcano::query::{QueryContext, QueryNode},
16};
17
18pub(crate) fn load_and_merge_all<'a>(
19	node: &mut Box<dyn QueryNode>,
20	rx: &mut Transaction<'a>,
21	ctx: &mut QueryContext,
22) -> Result<Columns> {
23	let mut result: Option<Columns> = None;
24
25	while let Some(columns) = node.next(rx, ctx)? {
26		if let Some(mut acc) = result.take() {
27			acc.append_columns(columns)?;
28			result = Some(acc);
29		} else {
30			result = Some(columns);
31		}
32	}
33	let result = result.unwrap_or_else(Columns::empty);
34	Ok(result)
35}
36
/// Output column names of a join, as computed by [`resolve_column_names`].
///
/// Names are in output order: all left-side names first (verbatim), followed by
/// the non-excluded right-side names (prefixed and made unique).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedColumnNames {
	/// One name per output column, in output order.
	pub qualified_names: Vec<String>,
}
40
41pub fn resolve_column_names(
42	left_columns: &Columns,
43	right_columns: &Columns,
44	alias: &Option<Fragment>,
45	excluded_right_indices: Option<&[usize]>,
46) -> ResolvedColumnNames {
47	let mut qualified_names = Vec::new();
48
49	for col in left_columns.iter() {
50		qualified_names.push(col.name().text().to_string());
51	}
52
53	for (idx, col) in right_columns.iter().enumerate() {
54		if let Some(excluded) = excluded_right_indices
55			&& excluded.contains(&idx)
56		{
57			continue;
58		}
59
60		let col_name = col.name().text();
61
62		let alias_text = alias.as_ref().map(|a| a.text()).unwrap_or("other");
63		let prefixed_name = format!("{}_{}", alias_text, col_name);
64
65		let mut final_name = prefixed_name.clone();
66		if qualified_names.contains(&final_name) {
67			let mut counter = 2;
68			loop {
69				let candidate = format!("{}_{}", prefixed_name, counter);
70				if !qualified_names.contains(&candidate) {
71					final_name = candidate;
72					break;
73				}
74				counter += 1;
75			}
76		}
77
78		qualified_names.push(final_name);
79	}
80
81	ResolvedColumnNames {
82		qualified_names,
83	}
84}
85
86pub fn build_eval_columns(
87	left_columns: &Columns,
88	right_columns: &Columns,
89	left_row: &[Value],
90	right_row: &[Value],
91	alias: &Option<Fragment>,
92) -> Vec<ColumnWithName> {
93	let mut eval_columns = Vec::new();
94
95	for (idx, col) in left_columns.iter().enumerate() {
96		let data = match &left_row[idx] {
97			Value::None {
98				..
99			} => ColumnBuffer::typed_none(&col.get_type()),
100			value => ColumnBuffer::from(value.clone()),
101		};
102		eval_columns.push(ColumnWithName::new(col.name().clone(), data));
103	}
104
105	for (idx, col) in right_columns.iter().enumerate() {
106		let data = match &right_row[idx] {
107			Value::None {
108				..
109			} => ColumnBuffer::typed_none(&col.get_type()),
110			value => ColumnBuffer::from(value.clone()),
111		};
112		if let Some(alias) = alias {
113			let aliased_name = Fragment::internal(format!("{}.{}", alias.text(), col.name().text()));
114			eval_columns.push(ColumnWithName {
115				name: aliased_name,
116				data,
117			});
118		} else {
119			eval_columns.push(ColumnWithName::new(col.name().clone(), data));
120		}
121	}
122
123	eval_columns
124}
125
126pub struct JoinContext {
127	pub context: Option<Arc<QueryContext>>,
128	pub compiled: Vec<CompiledExpr>,
129}
130
131impl Default for JoinContext {
132	fn default() -> Self {
133		Self::new()
134	}
135}
136
137impl JoinContext {
138	pub fn new() -> Self {
139		Self {
140			context: None,
141			compiled: vec![],
142		}
143	}
144
145	pub fn set(&mut self, ctx: &QueryContext) {
146		self.context = Some(Arc::new(ctx.clone()));
147	}
148
149	pub fn get(&self) -> &Arc<QueryContext> {
150		self.context.as_ref().expect("Join context not initialized")
151	}
152
153	pub fn is_initialized(&self) -> bool {
154		self.context.is_some()
155	}
156}
157
158pub(crate) fn compute_join_hash(
159	columns: &Columns,
160	col_indices: &[usize],
161	row_idx: usize,
162	buf: &mut Vec<u8>,
163) -> Option<Hash128> {
164	buf.clear();
165	for &idx in col_indices {
166		let value = columns[idx].get_value(row_idx);
167		if matches!(value, Value::None { .. }) {
168			return None;
169		}
170		let bytes = to_stdvec(&value).ok()?;
171		buf.extend_from_slice(&bytes);
172	}
173	Some(xxh3_128(buf))
174}
175
176pub(crate) fn keys_equal_by_index(
177	left: &Columns,
178	left_row: usize,
179	left_indices: &[usize],
180	right: &Columns,
181	right_row: usize,
182	right_indices: &[usize],
183) -> bool {
184	for (&li, &ri) in left_indices.iter().zip(right_indices.iter()) {
185		let lv = left[li].get_value(left_row);
186		let rv = right[ri].get_value(right_row);
187		if lv != rv {
188			return false;
189		}
190	}
191	true
192}
193
194pub(crate) fn eval_join_condition(
195	compiled: &[CompiledExpr],
196	left_columns: &Columns,
197	right_columns: &Columns,
198	left_row: &[Value],
199	right_row: &[Value],
200	alias: &Option<Fragment>,
201	ctx: &QueryContext,
202) -> bool {
203	if compiled.is_empty() {
204		return true;
205	}
206	let eval_columns = build_eval_columns(left_columns, right_columns, left_row, right_row, alias);
207	let session = EvalContext::from_query(ctx);
208	let exec_ctx = session.with_eval_join(Columns::new(eval_columns));
209	compiled.iter().all(|compiled_expr| {
210		let col = compiled_expr.execute(&exec_ctx).unwrap();
211		matches!(col.data().get_value(0), Value::Boolean(true))
212	})
213}