reifydb_engine/vm/volcano/join/
common.rs1use std::sync::Arc;
5
6use postcard::to_stdvec;
7use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns};
8use reifydb_runtime::hash::{Hash128, xxh3_128};
9use reifydb_transaction::transaction::Transaction;
10use reifydb_type::{fragment::Fragment, value::Value};
11
12use crate::{
13 Result,
14 expression::{compile::CompiledExpr, context::EvalContext},
15 vm::volcano::query::{QueryContext, QueryNode},
16};
17
18pub(crate) fn load_and_merge_all<'a>(
20 node: &mut Box<dyn QueryNode>,
21 rx: &mut Transaction<'a>,
22 ctx: &mut QueryContext,
23) -> Result<Columns> {
24 let mut result: Option<Columns> = None;
25
26 while let Some(columns) = node.next(rx, ctx)? {
27 if let Some(mut acc) = result.take() {
28 acc.append_columns(columns)?;
29 result = Some(acc);
30 } else {
31 result = Some(columns);
32 }
33 }
34 let result = result.unwrap_or_else(Columns::empty);
35 Ok(result)
36}
37
38pub struct ResolvedColumnNames {
40 pub qualified_names: Vec<String>,
41}
42
43pub fn resolve_column_names(
45 left_columns: &Columns,
46 right_columns: &Columns,
47 alias: &Option<Fragment>,
48 excluded_right_indices: Option<&[usize]>,
49) -> ResolvedColumnNames {
50 let mut qualified_names = Vec::new();
51
52 for col in left_columns.iter() {
54 qualified_names.push(col.name().text().to_string());
55 }
56
57 for (idx, col) in right_columns.iter().enumerate() {
59 if let Some(excluded) = excluded_right_indices
61 && excluded.contains(&idx)
62 {
63 continue;
64 }
65
66 let col_name = col.name().text();
67
68 let alias_text = alias.as_ref().map(|a| a.text()).unwrap_or("other");
70 let prefixed_name = format!("{}_{}", alias_text, col_name);
71
72 let mut final_name = prefixed_name.clone();
74 if qualified_names.contains(&final_name) {
75 let mut counter = 2;
76 loop {
77 let candidate = format!("{}_{}", prefixed_name, counter);
78 if !qualified_names.contains(&candidate) {
79 final_name = candidate;
80 break;
81 }
82 counter += 1;
83 }
84 }
85
86 qualified_names.push(final_name);
87 }
88
89 ResolvedColumnNames {
90 qualified_names,
91 }
92}
93
94pub fn build_eval_columns(
96 left_columns: &Columns,
97 right_columns: &Columns,
98 left_row: &[Value],
99 right_row: &[Value],
100 alias: &Option<Fragment>,
101) -> Vec<ColumnWithName> {
102 let mut eval_columns = Vec::new();
103
104 for (idx, col) in left_columns.iter().enumerate() {
105 let data = match &left_row[idx] {
106 Value::None {
107 ..
108 } => ColumnBuffer::typed_none(&col.get_type()),
109 value => ColumnBuffer::from(value.clone()),
110 };
111 eval_columns.push(ColumnWithName::new(col.name().clone(), data));
112 }
113
114 for (idx, col) in right_columns.iter().enumerate() {
115 let data = match &right_row[idx] {
116 Value::None {
117 ..
118 } => ColumnBuffer::typed_none(&col.get_type()),
119 value => ColumnBuffer::from(value.clone()),
120 };
121 if let Some(alias) = alias {
122 let aliased_name = Fragment::internal(format!("{}.{}", alias.text(), col.name().text()));
123 eval_columns.push(ColumnWithName {
124 name: aliased_name,
125 data,
126 });
127 } else {
128 eval_columns.push(ColumnWithName::new(col.name().clone(), data));
129 }
130 }
131
132 eval_columns
133}
134
135pub struct JoinContext {
137 pub context: Option<Arc<QueryContext>>,
138 pub compiled: Vec<CompiledExpr>,
139}
140
141impl Default for JoinContext {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147impl JoinContext {
148 pub fn new() -> Self {
149 Self {
150 context: None,
151 compiled: vec![],
152 }
153 }
154
155 pub fn set(&mut self, ctx: &QueryContext) {
156 self.context = Some(Arc::new(ctx.clone()));
157 }
158
159 pub fn get(&self) -> &Arc<QueryContext> {
160 self.context.as_ref().expect("Join context not initialized")
161 }
162
163 pub fn is_initialized(&self) -> bool {
164 self.context.is_some()
165 }
166}
167
168pub(crate) fn compute_join_hash(
172 columns: &Columns,
173 col_indices: &[usize],
174 row_idx: usize,
175 buf: &mut Vec<u8>,
176) -> Option<Hash128> {
177 buf.clear();
178 for &idx in col_indices {
179 let value = columns[idx].get_value(row_idx);
180 if matches!(value, Value::None { .. }) {
181 return None;
182 }
183 let bytes = to_stdvec(&value).ok()?;
184 buf.extend_from_slice(&bytes);
185 }
186 Some(xxh3_128(buf))
187}
188
189pub(crate) fn keys_equal_by_index(
191 left: &Columns,
192 left_row: usize,
193 left_indices: &[usize],
194 right: &Columns,
195 right_row: usize,
196 right_indices: &[usize],
197) -> bool {
198 for (&li, &ri) in left_indices.iter().zip(right_indices.iter()) {
199 let lv = left[li].get_value(left_row);
200 let rv = right[ri].get_value(right_row);
201 if lv != rv {
202 return false;
203 }
204 }
205 true
206}
207
208pub(crate) fn eval_join_condition(
210 compiled: &[CompiledExpr],
211 left_columns: &Columns,
212 right_columns: &Columns,
213 left_row: &[Value],
214 right_row: &[Value],
215 alias: &Option<Fragment>,
216 ctx: &QueryContext,
217) -> bool {
218 if compiled.is_empty() {
219 return true;
220 }
221 let eval_columns = build_eval_columns(left_columns, right_columns, left_row, right_row, alias);
222 let session = EvalContext::from_query(ctx);
223 let exec_ctx = session.with_eval_join(Columns::new(eval_columns));
224 compiled.iter().all(|compiled_expr| {
225 let col = compiled_expr.execute(&exec_ctx).unwrap();
226 matches!(col.data().get_value(0), Value::Boolean(true))
227 })
228}