Skip to main content

cynos_query/executor/join/
hash.rs

1//! Hash Join implementation.
2
3use crate::executor::{Relation, RelationEntry, SharedTables};
4use alloc::rc::Rc;
5use alloc::sync::Arc;
6use alloc::vec::Vec;
7use cynos_core::{Row, Value};
8use core::hash::{Hash, Hasher};
9use hashbrown::HashMap;
10
11/// A wrapper around Value reference that implements Hash and Eq for use as HashMap key.
12/// This avoids cloning Value during hash table operations.
13#[derive(Clone, Copy)]
14struct ValueRef<'a>(&'a Value);
15
16impl<'a> Hash for ValueRef<'a> {
17    #[inline]
18    fn hash<H: Hasher>(&self, state: &mut H) {
19        self.0.hash(state);
20    }
21}
22
23impl<'a> PartialEq for ValueRef<'a> {
24    #[inline]
25    fn eq(&self, other: &Self) -> bool {
26        self.0 == other.0
27    }
28}
29
30impl<'a> Eq for ValueRef<'a> {}
31
32/// Hash Join executor.
33///
34/// Implements the classic hash join algorithm:
35/// 1. Build phase: Create a hash table from the smaller relation
36/// 2. Probe phase: Scan the larger relation and probe the hash table
37pub struct HashJoin {
38    /// Column index for the left (build) relation.
39    left_key_index: usize,
40    /// Column index for the right (probe) relation.
41    right_key_index: usize,
42    /// Whether this is an outer join.
43    is_outer_join: bool,
44}
45
46impl HashJoin {
47    /// Creates a new hash join executor.
48    pub fn new(left_key_index: usize, right_key_index: usize, is_outer_join: bool) -> Self {
49        Self {
50            left_key_index,
51            right_key_index,
52            is_outer_join,
53        }
54    }
55
56    /// Creates an inner hash join.
57    pub fn inner(left_key_index: usize, right_key_index: usize) -> Self {
58        Self::new(left_key_index, right_key_index, false)
59    }
60
61    /// Creates a left outer hash join.
62    pub fn left_outer(left_key_index: usize, right_key_index: usize) -> Self {
63        Self::new(left_key_index, right_key_index, true)
64    }
65
66    /// Executes the hash join.
67    pub fn execute(&self, left: Relation, right: Relation) -> Relation {
68        // Determine which relation to use for build vs probe
69        // For outer join, we must use right for build (to preserve all left rows)
70        let (build_rel, probe_rel, build_key_idx, probe_key_idx, swap) = if self.is_outer_join {
71            (&right, &left, self.right_key_index, self.left_key_index, true)
72        } else if left.len() <= right.len() {
73            (&left, &right, self.left_key_index, self.right_key_index, false)
74        } else {
75            (&right, &left, self.right_key_index, self.left_key_index, true)
76        };
77
78        // Build phase: create hash table mapping key values to entry indices
79        let mut hash_table: HashMap<ValueRef<'_>, Vec<u32>> =
80            HashMap::with_capacity(build_rel.len());
81
82        for (idx, entry) in build_rel.entries.iter().enumerate() {
83            if let Some(key_value) = entry.get_field(build_key_idx) {
84                if !key_value.is_null() {
85                    hash_table
86                        .entry(ValueRef(key_value))
87                        .or_default()
88                        .push(idx as u32);
89                }
90            }
91        }
92
93        // Probe phase
94        let build_col_count = build_rel
95            .entries
96            .first()
97            .map(|e| e.row.len())
98            .unwrap_or(0);
99        let probe_col_count = probe_rel
100            .entries
101            .first()
102            .map(|e| e.row.len())
103            .unwrap_or(0);
104        let total_col_count = if swap {
105            probe_col_count + build_col_count
106        } else {
107            build_col_count + probe_col_count
108        };
109
110        // Pre-compute combined tables once (shared via Arc)
111        let combined_tables: SharedTables = if swap {
112            let mut t = probe_rel.tables.clone();
113            t.extend(build_rel.tables.iter().cloned());
114            Arc::from(t)
115        } else {
116            let mut t = build_rel.tables.clone();
117            t.extend(probe_rel.tables.iter().cloned());
118            Arc::from(t)
119        };
120
121        // Estimate result size for pre-allocation
122        let avg_matches_per_key = if !hash_table.is_empty() {
123            build_rel.len() / hash_table.len()
124        } else {
125            1
126        };
127        let estimated_matches = probe_rel.len() * avg_matches_per_key;
128        let mut result_entries = Vec::with_capacity(estimated_matches);
129
130        for probe_entry in probe_rel.entries.iter() {
131            let key_value = probe_entry.get_field(probe_key_idx);
132            let mut matched = false;
133
134            if let Some(kv) = key_value {
135                if !kv.is_null() {
136                    if let Some(build_indices) = hash_table.get(&ValueRef(kv)) {
137                        matched = true;
138                        for &build_idx in build_indices {
139                            let build_entry = &build_rel.entries[build_idx as usize];
140
141                            // Inline combine to avoid function call overhead
142                            let mut values = Vec::with_capacity(total_col_count);
143                            // Compute sum version for JOIN result
144                            let combined_version = if swap {
145                                values.extend(probe_entry.row.values().iter().cloned());
146                                values.extend(build_entry.row.values().iter().cloned());
147                                probe_entry.row.version().wrapping_add(build_entry.row.version())
148                            } else {
149                                values.extend(build_entry.row.values().iter().cloned());
150                                values.extend(probe_entry.row.values().iter().cloned());
151                                build_entry.row.version().wrapping_add(probe_entry.row.version())
152                            };
153
154                            result_entries.push(RelationEntry::new_combined(
155                                Rc::new(Row::dummy_with_version(combined_version, values)),
156                                Arc::clone(&combined_tables),
157                            ));
158                        }
159                    }
160                }
161            }
162
163            // For outer join, add unmatched probe entries with nulls
164            if self.is_outer_join && !matched {
165                let mut values = Vec::with_capacity(total_col_count);
166                values.extend(probe_entry.row.values().iter().cloned());
167                values.resize(total_col_count, Value::Null);
168                // For unmatched rows, use probe's version (the other side is NULL)
169                let combined_version = probe_entry.row.version();
170
171                result_entries.push(RelationEntry::new_combined(
172                    Rc::new(Row::dummy_with_version(combined_version, values)),
173                    Arc::clone(&combined_tables),
174                ));
175            }
176        }
177
178        // Compute combined table column counts
179        let combined_column_counts: Vec<usize> = if swap {
180            let mut counts = probe_rel.table_column_counts.clone();
181            counts.extend(build_rel.table_column_counts.iter().cloned());
182            counts
183        } else {
184            let mut counts = build_rel.table_column_counts.clone();
185            counts.extend(probe_rel.table_column_counts.iter().cloned());
186            counts
187        };
188
189        Relation {
190            entries: result_entries,
191            tables: combined_tables.to_vec(),
192            table_column_counts: combined_column_counts,
193        }
194    }
195}
196
/// Performs an inner hash join over two slices using key extraction functions.
///
/// A hash table is built from `left` (keyed by `left_key`), then every item of
/// `right` is probed with `right_key`. For each pair with equal keys,
/// `output_fn(left_item, right_item)` is appended to the result.
///
/// Results are ordered by `right` item first; for a single `right` item, the
/// matching `left` items appear in their original slice order.
///
/// Keys only need `Eq + Hash`; they are moved into the table, never cloned.
/// If either input is empty the result is empty and no key is extracted.
#[allow(dead_code)]
pub fn hash_join<L, R, K, O, LK, RK, OF>(
    left: &[L],
    right: &[R],
    left_key: LK,
    right_key: RK,
    output_fn: OF,
) -> Vec<O>
where
    K: Eq + core::hash::Hash,
    LK: Fn(&L) -> K,
    RK: Fn(&R) -> K,
    OF: Fn(&L, &R) -> O,
{
    // An inner join with an empty side cannot produce rows.
    if left.is_empty() || right.is_empty() {
        return Vec::new();
    }

    // Build phase: group left items by key. Sized for the all-distinct case.
    let mut hash_table: HashMap<K, Vec<&L>> = HashMap::with_capacity(left.len());
    for item in left {
        hash_table.entry(left_key(item)).or_default().push(item);
    }

    // Probe phase: emit one output per (left match, right item) pair.
    let mut results = Vec::new();
    for item in right {
        if let Some(matches) = hash_table.get(&right_key(item)) {
            results.extend(matches.iter().map(|&left_item| output_fn(left_item, item)));
        }
    }

    results
}
232
#[cfg(test)]
mod tests {
    use super::*;
    use cynos_core::Row;
    use alloc::vec;

    /// Wraps `rows` in a relation backed by a single table called `table`.
    fn relation(table: &'static str, rows: Vec<Row>) -> Relation {
        Relation::from_rows_owned(rows, vec![table.into()])
    }

    /// Inner join keeps only the rows whose keys appear on both sides.
    #[test]
    fn test_hash_join_inner() {
        let left = relation(
            "left",
            vec![
                Row::new(0, vec![Value::Int64(1), Value::String("A".into())]),
                Row::new(1, vec![Value::Int64(2), Value::String("B".into())]),
                Row::new(2, vec![Value::Int64(3), Value::String("C".into())]),
            ],
        );
        let right = relation(
            "right",
            vec![
                Row::new(10, vec![Value::Int64(1), Value::String("X".into())]),
                Row::new(11, vec![Value::Int64(2), Value::String("Y".into())]),
                Row::new(12, vec![Value::Int64(4), Value::String("Z".into())]),
            ],
        );

        let joined = HashJoin::inner(0, 0).execute(left, right);

        // Keys 1 and 2 are the only ones present on both sides.
        assert_eq!(joined.len(), 2);
    }

    /// Left outer join keeps every left row, matched or not.
    #[test]
    fn test_hash_join_left_outer() {
        let left = relation(
            "left",
            vec![
                Row::new(0, vec![Value::Int64(1)]),
                Row::new(1, vec![Value::Int64(2)]),
                Row::new(2, vec![Value::Int64(3)]),
            ],
        );
        let right = relation(
            "right",
            vec![
                Row::new(10, vec![Value::Int64(1)]),
                Row::new(11, vec![Value::Int64(4)]),
            ],
        );

        let joined = HashJoin::left_outer(0, 0).execute(left, right);

        // One matched row plus two unmatched left rows.
        assert_eq!(joined.len(), 3);
    }

    /// The generic slice-based join pairs items with equal keys.
    #[test]
    fn test_hash_join_function() {
        let left = vec![(1, "A"), (2, "B"), (3, "C")];
        let right = vec![(1, "X"), (2, "Y"), (4, "Z")];

        let pairs = hash_join(&left, &right, |l| l.0, |r| r.0, |l, r| (l.1, r.1));

        assert_eq!(pairs.len(), 2);
        for expected in [("A", "X"), ("B", "Y")].iter() {
            assert!(pairs.contains(expected));
        }
    }

    /// NULL join keys never match each other.
    #[test]
    fn test_hash_join_with_nulls() {
        let left = relation(
            "left",
            vec![
                Row::new(0, vec![Value::Int64(1)]),
                Row::new(1, vec![Value::Null]),
            ],
        );
        let right = relation(
            "right",
            vec![
                Row::new(10, vec![Value::Int64(1)]),
                Row::new(11, vec![Value::Null]),
            ],
        );

        let joined = HashJoin::inner(0, 0).execute(left, right);

        // Only the key-1 pair joins; the NULL rows are dropped.
        assert_eq!(joined.len(), 1);
    }
}