vibesql_executor/select/morsel/
join.rs

1//! Morsel-driven parallel join operations.
2//!
3//! This module provides parallel hash join probe operations using morsel-driven
4//! execution with work-stealing for dynamic load balancing.
5
6use std::sync::{Arc, Mutex};
7
8use ahash::AHashMap;
9use crossbeam_deque::{Injector, Worker};
10use vibesql_storage::Row;
11use vibesql_types::SqlValue;
12
13use super::{config::MorselConfig, create_morsels, morsel_debug_enabled, steal_morsel, Morsel};
14
15/// Shared, mutex-guarded list of per-morsel join matches: `(morsel start index, [(build_idx, probe_idx)])`.
16type JoinPairResults = Arc<Mutex<Vec<(usize, Vec<(usize, usize)>)>>>;
17
18/// Morsel-driven parallel probe for hash join with SqlValue keys.
19///
20/// Probes rows against a hash table using work-stealing for dynamic load balancing.
21/// Returns index pairs (build_idx, probe_idx) for matched rows.
22///
23/// # Arguments
24///
25/// - `probe_rows`: Rows to probe against the hash table
26/// - `probe_col_idx`: Column index to use as the probe key
27/// - `hash_table`: Hash table mapping SqlValue keys to build row indices
28/// - `config`: Morsel configuration
29///
30/// # Returns
31///
32/// Vector of (build_idx, probe_idx) pairs for matched rows.
33pub fn morsel_parallel_probe_sqlvalue(
34    probe_rows: &[Row],
35    probe_col_idx: usize,
36    hash_table: &AHashMap<SqlValue, Vec<usize>>,
37    config: &MorselConfig,
38) -> Vec<(usize, usize)> {
39    if probe_rows.is_empty() {
40        return Vec::new();
41    }
42
43    // Use join-probe-specific morsel size
44    let morsel_size = config.join_probe_size;
45
46    // For small datasets, process directly
47    if probe_rows.len() < morsel_size {
48        let mut pairs = Vec::with_capacity(probe_rows.len());
49        for (probe_idx, probe_row) in probe_rows.iter().enumerate() {
50            let key = &probe_row.values[probe_col_idx];
51            if *key == SqlValue::Null {
52                continue;
53            }
54            if let Some(build_indices) = hash_table.get(key) {
55                for &build_idx in build_indices {
56                    pairs.push((build_idx, probe_idx));
57                }
58            }
59        }
60        return pairs;
61    }
62
63    // Create morsels
64    let morsels = create_morsels(probe_rows.len(), morsel_size);
65    let morsel_count = morsels.len();
66
67    if morsel_debug_enabled() {
68        eprintln!(
69            "[MORSEL] Probe: {} morsels for {} rows (size={})",
70            morsel_count,
71            probe_rows.len(),
72            morsel_size
73        );
74    }
75
76    // Create global injector queue
77    let injector: Injector<Morsel> = Injector::new();
78    for morsel in morsels {
79        injector.push(morsel);
80    }
81
82    // Results storage shared across threads
83    let results: JoinPairResults = Arc::new(Mutex::new(Vec::with_capacity(morsel_count)));
84
85    // Process morsels in parallel
86    rayon::scope(|s| {
87        let num_threads = rayon::current_num_threads();
88
89        for _ in 0..num_threads {
90            let injector_ref = &injector;
91            let results_ref = results.clone();
92
93            s.spawn(move |_| {
94                let worker: Worker<Morsel> = Worker::new_fifo();
95                let mut local_pairs = Vec::with_capacity(morsel_size);
96
97                while let Some(m) = steal_morsel(injector_ref, &worker) {
98                    let start_idx = m.start_idx();
99                    let morsel_rows = m.rows(probe_rows);
100                    local_pairs.clear();
101
102                    for (local_idx, probe_row) in morsel_rows.iter().enumerate() {
103                        let probe_idx = start_idx + local_idx;
104                        let key = &probe_row.values[probe_col_idx];
105                        if *key == SqlValue::Null {
106                            continue;
107                        }
108                        if let Some(build_indices) = hash_table.get(key) {
109                            for &build_idx in build_indices {
110                                local_pairs.push((build_idx, probe_idx));
111                            }
112                        }
113                    }
114
115                    if !local_pairs.is_empty() {
116                        results_ref.lock().unwrap().push((start_idx, local_pairs.clone()));
117                    }
118                }
119            });
120        }
121    });
122
123    // Extract results after scope completes
124    let mut sorted_results = Arc::try_unwrap(results)
125        .expect("all threads should have completed")
126        .into_inner()
127        .expect("mutex not poisoned");
128
129    // Sort by start index to maintain order, then flatten
130    sorted_results.sort_by_key(|(start_idx, _)| *start_idx);
131
132    let total: usize = sorted_results.iter().map(|(_, pairs)| pairs.len()).sum();
133    let mut final_pairs = Vec::with_capacity(total);
134    for (_, pairs) in sorted_results {
135        final_pairs.extend(pairs);
136    }
137
138    if morsel_debug_enabled() {
139        eprintln!(
140            "[MORSEL] Probe complete: {} morsels, {} matches",
141            morsel_count,
142            final_pairs.len()
143        );
144    }
145
146    final_pairs
147}