reifydb_engine/vm/volcano/join/
common.rs1use std::sync::Arc;
5
6use postcard::to_stdvec;
7use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns};
8use reifydb_runtime::hash::{Hash128, xxh3_128};
9use reifydb_transaction::transaction::Transaction;
10use reifydb_type::{fragment::Fragment, value::Value};
11
12use crate::{
13 Result,
14 expression::{compile::CompiledExpr, context::EvalContext},
15 vm::volcano::query::{QueryContext, QueryNode},
16};
17
18pub(crate) fn load_and_merge_all<'a>(
19 node: &mut Box<dyn QueryNode>,
20 rx: &mut Transaction<'a>,
21 ctx: &mut QueryContext,
22) -> Result<Columns> {
23 let mut result: Option<Columns> = None;
24
25 while let Some(columns) = node.next(rx, ctx)? {
26 if let Some(mut acc) = result.take() {
27 acc.append_columns(columns)?;
28 result = Some(acc);
29 } else {
30 result = Some(columns);
31 }
32 }
33 let result = result.unwrap_or_else(Columns::empty);
34 Ok(result)
35}
36
/// Output column names computed by `resolve_column_names` for a join result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedColumnNames {
	/// Left-hand column names followed by the prefixed, de-duplicated right-hand names,
	/// in output order.
	pub qualified_names: Vec<String>,
}
40
41pub fn resolve_column_names(
42 left_columns: &Columns,
43 right_columns: &Columns,
44 alias: &Option<Fragment>,
45 excluded_right_indices: Option<&[usize]>,
46) -> ResolvedColumnNames {
47 let mut qualified_names = Vec::new();
48
49 for col in left_columns.iter() {
50 qualified_names.push(col.name().text().to_string());
51 }
52
53 for (idx, col) in right_columns.iter().enumerate() {
54 if let Some(excluded) = excluded_right_indices
55 && excluded.contains(&idx)
56 {
57 continue;
58 }
59
60 let col_name = col.name().text();
61
62 let alias_text = alias.as_ref().map(|a| a.text()).unwrap_or("other");
63 let prefixed_name = format!("{}_{}", alias_text, col_name);
64
65 let mut final_name = prefixed_name.clone();
66 if qualified_names.contains(&final_name) {
67 let mut counter = 2;
68 loop {
69 let candidate = format!("{}_{}", prefixed_name, counter);
70 if !qualified_names.contains(&candidate) {
71 final_name = candidate;
72 break;
73 }
74 counter += 1;
75 }
76 }
77
78 qualified_names.push(final_name);
79 }
80
81 ResolvedColumnNames {
82 qualified_names,
83 }
84}
85
86pub fn build_eval_columns(
87 left_columns: &Columns,
88 right_columns: &Columns,
89 left_row: &[Value],
90 right_row: &[Value],
91 alias: &Option<Fragment>,
92) -> Vec<ColumnWithName> {
93 let mut eval_columns = Vec::new();
94
95 for (idx, col) in left_columns.iter().enumerate() {
96 let data = match &left_row[idx] {
97 Value::None {
98 ..
99 } => ColumnBuffer::typed_none(&col.get_type()),
100 value => ColumnBuffer::from(value.clone()),
101 };
102 eval_columns.push(ColumnWithName::new(col.name().clone(), data));
103 }
104
105 for (idx, col) in right_columns.iter().enumerate() {
106 let data = match &right_row[idx] {
107 Value::None {
108 ..
109 } => ColumnBuffer::typed_none(&col.get_type()),
110 value => ColumnBuffer::from(value.clone()),
111 };
112 if let Some(alias) = alias {
113 let aliased_name = Fragment::internal(format!("{}.{}", alias.text(), col.name().text()));
114 eval_columns.push(ColumnWithName {
115 name: aliased_name,
116 data,
117 });
118 } else {
119 eval_columns.push(ColumnWithName::new(col.name().clone(), data));
120 }
121 }
122
123 eval_columns
124}
125
126pub struct JoinContext {
127 pub context: Option<Arc<QueryContext>>,
128 pub compiled: Vec<CompiledExpr>,
129}
130
131impl Default for JoinContext {
132 fn default() -> Self {
133 Self::new()
134 }
135}
136
137impl JoinContext {
138 pub fn new() -> Self {
139 Self {
140 context: None,
141 compiled: vec![],
142 }
143 }
144
145 pub fn set(&mut self, ctx: &QueryContext) {
146 self.context = Some(Arc::new(ctx.clone()));
147 }
148
149 pub fn get(&self) -> &Arc<QueryContext> {
150 self.context.as_ref().expect("Join context not initialized")
151 }
152
153 pub fn is_initialized(&self) -> bool {
154 self.context.is_some()
155 }
156}
157
158pub(crate) fn compute_join_hash(
159 columns: &Columns,
160 col_indices: &[usize],
161 row_idx: usize,
162 buf: &mut Vec<u8>,
163) -> Option<Hash128> {
164 buf.clear();
165 for &idx in col_indices {
166 let value = columns[idx].get_value(row_idx);
167 if matches!(value, Value::None { .. }) {
168 return None;
169 }
170 let bytes = to_stdvec(&value).ok()?;
171 buf.extend_from_slice(&bytes);
172 }
173 Some(xxh3_128(buf))
174}
175
176pub(crate) fn keys_equal_by_index(
177 left: &Columns,
178 left_row: usize,
179 left_indices: &[usize],
180 right: &Columns,
181 right_row: usize,
182 right_indices: &[usize],
183) -> bool {
184 for (&li, &ri) in left_indices.iter().zip(right_indices.iter()) {
185 let lv = left[li].get_value(left_row);
186 let rv = right[ri].get_value(right_row);
187 if lv != rv {
188 return false;
189 }
190 }
191 true
192}
193
194pub(crate) fn eval_join_condition(
195 compiled: &[CompiledExpr],
196 left_columns: &Columns,
197 right_columns: &Columns,
198 left_row: &[Value],
199 right_row: &[Value],
200 alias: &Option<Fragment>,
201 ctx: &QueryContext,
202) -> bool {
203 if compiled.is_empty() {
204 return true;
205 }
206 let eval_columns = build_eval_columns(left_columns, right_columns, left_row, right_row, alias);
207 let session = EvalContext::from_query(ctx);
208 let exec_ctx = session.with_eval_join(Columns::new(eval_columns));
209 compiled.iter().all(|compiled_expr| {
210 let col = compiled_expr.execute(&exec_ctx).unwrap();
211 matches!(col.data().get_value(0), Value::Boolean(true))
212 })
213}