cynos_query/executor/join/
hash.rs1use crate::executor::{Relation, RelationEntry, SharedTables};
4use alloc::rc::Rc;
5use alloc::sync::Arc;
6use alloc::vec::Vec;
7use cynos_core::{Row, Value};
8use core::hash::{Hash, Hasher};
9use hashbrown::HashMap;
10
11#[derive(Clone, Copy)]
14struct ValueRef<'a>(&'a Value);
15
16impl<'a> Hash for ValueRef<'a> {
17 #[inline]
18 fn hash<H: Hasher>(&self, state: &mut H) {
19 self.0.hash(state);
20 }
21}
22
23impl<'a> PartialEq for ValueRef<'a> {
24 #[inline]
25 fn eq(&self, other: &Self) -> bool {
26 self.0 == other.0
27 }
28}
29
30impl<'a> Eq for ValueRef<'a> {}
31
/// Hash-based equi-join operator over two relations.
///
/// Holds the position of the join-key column in each input and whether
/// unmatched rows of the left input must be kept (left outer join).
pub struct HashJoin {
    /// Column position of the join key within rows of the left input.
    left_key_index: usize,
    /// Column position of the join key within rows of the right input.
    right_key_index: usize,
    /// `true` for a left outer join, `false` for an inner join.
    is_outer_join: bool,
}
45
46impl HashJoin {
47 pub fn new(left_key_index: usize, right_key_index: usize, is_outer_join: bool) -> Self {
49 Self {
50 left_key_index,
51 right_key_index,
52 is_outer_join,
53 }
54 }
55
56 pub fn inner(left_key_index: usize, right_key_index: usize) -> Self {
58 Self::new(left_key_index, right_key_index, false)
59 }
60
61 pub fn left_outer(left_key_index: usize, right_key_index: usize) -> Self {
63 Self::new(left_key_index, right_key_index, true)
64 }
65
66 pub fn execute(&self, left: Relation, right: Relation) -> Relation {
68 let (build_rel, probe_rel, build_key_idx, probe_key_idx, swap) = if self.is_outer_join {
71 (&right, &left, self.right_key_index, self.left_key_index, true)
72 } else if left.len() <= right.len() {
73 (&left, &right, self.left_key_index, self.right_key_index, false)
74 } else {
75 (&right, &left, self.right_key_index, self.left_key_index, true)
76 };
77
78 let mut hash_table: HashMap<ValueRef<'_>, Vec<u32>> =
80 HashMap::with_capacity(build_rel.len());
81
82 for (idx, entry) in build_rel.entries.iter().enumerate() {
83 if let Some(key_value) = entry.get_field(build_key_idx) {
84 if !key_value.is_null() {
85 hash_table
86 .entry(ValueRef(key_value))
87 .or_default()
88 .push(idx as u32);
89 }
90 }
91 }
92
93 let build_col_count = build_rel
95 .entries
96 .first()
97 .map(|e| e.row.len())
98 .unwrap_or(0);
99 let probe_col_count = probe_rel
100 .entries
101 .first()
102 .map(|e| e.row.len())
103 .unwrap_or(0);
104 let total_col_count = if swap {
105 probe_col_count + build_col_count
106 } else {
107 build_col_count + probe_col_count
108 };
109
110 let combined_tables: SharedTables = if swap {
112 let mut t = probe_rel.tables.clone();
113 t.extend(build_rel.tables.iter().cloned());
114 Arc::from(t)
115 } else {
116 let mut t = build_rel.tables.clone();
117 t.extend(probe_rel.tables.iter().cloned());
118 Arc::from(t)
119 };
120
121 let avg_matches_per_key = if !hash_table.is_empty() {
123 build_rel.len() / hash_table.len()
124 } else {
125 1
126 };
127 let estimated_matches = probe_rel.len() * avg_matches_per_key;
128 let mut result_entries = Vec::with_capacity(estimated_matches);
129
130 for probe_entry in probe_rel.entries.iter() {
131 let key_value = probe_entry.get_field(probe_key_idx);
132 let mut matched = false;
133
134 if let Some(kv) = key_value {
135 if !kv.is_null() {
136 if let Some(build_indices) = hash_table.get(&ValueRef(kv)) {
137 matched = true;
138 for &build_idx in build_indices {
139 let build_entry = &build_rel.entries[build_idx as usize];
140
141 let mut values = Vec::with_capacity(total_col_count);
143 let combined_version = if swap {
145 values.extend(probe_entry.row.values().iter().cloned());
146 values.extend(build_entry.row.values().iter().cloned());
147 probe_entry.row.version().wrapping_add(build_entry.row.version())
148 } else {
149 values.extend(build_entry.row.values().iter().cloned());
150 values.extend(probe_entry.row.values().iter().cloned());
151 build_entry.row.version().wrapping_add(probe_entry.row.version())
152 };
153
154 result_entries.push(RelationEntry::new_combined(
155 Rc::new(Row::dummy_with_version(combined_version, values)),
156 Arc::clone(&combined_tables),
157 ));
158 }
159 }
160 }
161 }
162
163 if self.is_outer_join && !matched {
165 let mut values = Vec::with_capacity(total_col_count);
166 values.extend(probe_entry.row.values().iter().cloned());
167 values.resize(total_col_count, Value::Null);
168 let combined_version = probe_entry.row.version();
170
171 result_entries.push(RelationEntry::new_combined(
172 Rc::new(Row::dummy_with_version(combined_version, values)),
173 Arc::clone(&combined_tables),
174 ));
175 }
176 }
177
178 let combined_column_counts: Vec<usize> = if swap {
180 let mut counts = probe_rel.table_column_counts.clone();
181 counts.extend(build_rel.table_column_counts.iter().cloned());
182 counts
183 } else {
184 let mut counts = build_rel.table_column_counts.clone();
185 counts.extend(probe_rel.table_column_counts.iter().cloned());
186 counts
187 };
188
189 Relation {
190 entries: result_entries,
191 tables: combined_tables.to_vec(),
192 table_column_counts: combined_column_counts,
193 }
194 }
195}
196
/// Generic in-memory hash equi-join over two slices.
///
/// Builds a hash table from `left` keyed by `left_key`, then scans `right`
/// and, for every right item whose `right_key` is present in the table, calls
/// `output_fn(left_item, right_item)` once per matching left item.
///
/// The result lists right items in slice order; for one right item, its
/// matching left items appear in their original slice order. Items without a
/// partner on the other side produce no output (inner-join semantics).
///
/// Keys only need `Eq + Hash`; they are moved into the table, never cloned.
// `allow(dead_code)`: within this file the function is only exercised by the
// unit tests.
#[allow(dead_code)]
pub fn hash_join<L, R, K, O, LK, RK, OF>(
    left: &[L],
    right: &[R],
    left_key: LK,
    right_key: RK,
    output_fn: OF,
) -> Vec<O>
where
    K: Eq + core::hash::Hash,
    LK: Fn(&L) -> K,
    RK: Fn(&R) -> K,
    OF: Fn(&L, &R) -> O,
{
    // Build phase: group the left items by key.
    let mut buckets: HashMap<K, Vec<&L>> = HashMap::new();
    for build_item in left {
        buckets
            .entry(left_key(build_item))
            .or_default()
            .push(build_item);
    }

    // Probe phase: emit one output per (matching left item, right item) pair.
    let mut results = Vec::new();
    for probe_item in right {
        if let Some(bucket) = buckets.get(&right_key(probe_item)) {
            results.extend(
                bucket
                    .iter()
                    .map(|&build_item| output_fn(build_item, probe_item)),
            );
        }
    }

    results
}
232
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use cynos_core::Row;

    /// Number of result rows that contain at least one NULL column.
    fn rows_with_null(result: &Relation) -> usize {
        result
            .entries
            .iter()
            .filter(|entry| entry.row.values().iter().any(|value| value.is_null()))
            .count()
    }

    /// `true` when every result row has exactly `width` columns.
    fn all_rows_have_width(result: &Relation, width: usize) -> bool {
        result.entries.iter().all(|entry| entry.row.len() == width)
    }

    #[test]
    fn test_hash_join_inner() {
        let left_rows = vec![
            Row::new(0, vec![Value::Int64(1), Value::String("A".into())]),
            Row::new(1, vec![Value::Int64(2), Value::String("B".into())]),
            Row::new(2, vec![Value::Int64(3), Value::String("C".into())]),
        ];
        let right_rows = vec![
            Row::new(10, vec![Value::Int64(1), Value::String("X".into())]),
            Row::new(11, vec![Value::Int64(2), Value::String("Y".into())]),
            Row::new(12, vec![Value::Int64(4), Value::String("Z".into())]),
        ];

        let left = Relation::from_rows_owned(left_rows, vec!["left".into()]);
        let right = Relation::from_rows_owned(right_rows, vec!["right".into()]);

        let join = HashJoin::inner(0, 0);
        let result = join.execute(left, right);

        // Only keys 1 and 2 exist on both sides.
        assert_eq!(result.len(), 2);
        // Each output row is a full left row followed by a full right row.
        assert!(all_rows_have_width(&result, 4));
        assert_eq!(rows_with_null(&result), 0);
    }

    #[test]
    fn test_hash_join_left_outer() {
        let left_rows = vec![
            Row::new(0, vec![Value::Int64(1)]),
            Row::new(1, vec![Value::Int64(2)]),
            Row::new(2, vec![Value::Int64(3)]),
        ];
        let right_rows = vec![
            Row::new(10, vec![Value::Int64(1)]),
            Row::new(11, vec![Value::Int64(4)]),
        ];

        let left = Relation::from_rows_owned(left_rows, vec!["left".into()]);
        let right = Relation::from_rows_owned(right_rows, vec!["right".into()]);

        let join = HashJoin::left_outer(0, 0);
        let result = join.execute(left, right);

        // Every left row survives: key 1 matches, keys 2 and 3 do not.
        assert_eq!(result.len(), 3);
        // Unmatched rows are padded to the combined width with NULLs.
        assert!(all_rows_have_width(&result, 2));
        assert_eq!(rows_with_null(&result), 2);
    }

    #[test]
    fn test_hash_join_function() {
        let left = vec![(1, "A"), (2, "B"), (3, "C")];
        let right = vec![(1, "X"), (2, "Y"), (4, "Z")];

        let result = hash_join(
            &left,
            &right,
            |l| l.0,
            |r| r.0,
            |l, r| (l.1, r.1),
        );

        assert_eq!(result.len(), 2);
        assert!(result.contains(&("A", "X")));
        assert!(result.contains(&("B", "Y")));
    }

    #[test]
    fn test_hash_join_with_nulls() {
        let left_rows = vec![
            Row::new(0, vec![Value::Int64(1)]),
            Row::new(1, vec![Value::Null]),
        ];
        let right_rows = vec![
            Row::new(10, vec![Value::Int64(1)]),
            Row::new(11, vec![Value::Null]),
        ];

        let left = Relation::from_rows_owned(left_rows, vec!["left".into()]);
        let right = Relation::from_rows_owned(right_rows, vec!["right".into()]);

        let join = HashJoin::inner(0, 0);
        let result = join.execute(left, right);

        // NULL keys never match each other, so only the key-1 pair is joined.
        assert_eq!(result.len(), 1);
        assert_eq!(rows_with_null(&result), 0);
    }

    #[test]
    fn test_hash_join_inner_duplicate_keys() {
        // The left side is larger, so the hash table is built on the right
        // side; the output must still contain one row per matching pair.
        let left_rows = vec![
            Row::new(0, vec![Value::Int64(1), Value::String("A".into())]),
            Row::new(1, vec![Value::Int64(1), Value::String("B".into())]),
            Row::new(2, vec![Value::Int64(2), Value::String("C".into())]),
        ];
        let right_rows = vec![Row::new(10, vec![Value::Int64(1)])];

        let left = Relation::from_rows_owned(left_rows, vec!["left".into()]);
        let right = Relation::from_rows_owned(right_rows, vec!["right".into()]);

        let join = HashJoin::inner(0, 0);
        let result = join.execute(left, right);

        assert_eq!(result.len(), 2);
        assert!(all_rows_have_width(&result, 3));
    }
}