vibesql_executor/select/morsel/
join.rs1use std::sync::{Arc, Mutex};
7
8use ahash::AHashMap;
9use crossbeam_deque::{Injector, Worker};
10use vibesql_storage::Row;
11use vibesql_types::SqlValue;
12
13use super::{config::MorselConfig, create_morsels, morsel_debug_enabled, steal_morsel, Morsel};
14
/// Shared sink for the parallel probe's output.
///
/// Each entry is `(morsel start index, (build_idx, probe_idx) pairs found in that morsel)`;
/// entries arrive in completion order and are sorted by start index once all tasks finish.
type JoinPairResults = Arc<Mutex<Vec<(usize, Vec<(usize, usize)>)>>>;
18pub fn morsel_parallel_probe_sqlvalue(
34 probe_rows: &[Row],
35 probe_col_idx: usize,
36 hash_table: &AHashMap<SqlValue, Vec<usize>>,
37 config: &MorselConfig,
38) -> Vec<(usize, usize)> {
39 if probe_rows.is_empty() {
40 return Vec::new();
41 }
42
43 let morsel_size = config.join_probe_size;
45
46 if probe_rows.len() < morsel_size {
48 let mut pairs = Vec::with_capacity(probe_rows.len());
49 for (probe_idx, probe_row) in probe_rows.iter().enumerate() {
50 let key = &probe_row.values[probe_col_idx];
51 if *key == SqlValue::Null {
52 continue;
53 }
54 if let Some(build_indices) = hash_table.get(key) {
55 for &build_idx in build_indices {
56 pairs.push((build_idx, probe_idx));
57 }
58 }
59 }
60 return pairs;
61 }
62
63 let morsels = create_morsels(probe_rows.len(), morsel_size);
65 let morsel_count = morsels.len();
66
67 if morsel_debug_enabled() {
68 eprintln!(
69 "[MORSEL] Probe: {} morsels for {} rows (size={})",
70 morsel_count,
71 probe_rows.len(),
72 morsel_size
73 );
74 }
75
76 let injector: Injector<Morsel> = Injector::new();
78 for morsel in morsels {
79 injector.push(morsel);
80 }
81
82 let results: JoinPairResults = Arc::new(Mutex::new(Vec::with_capacity(morsel_count)));
84
85 rayon::scope(|s| {
87 let num_threads = rayon::current_num_threads();
88
89 for _ in 0..num_threads {
90 let injector_ref = &injector;
91 let results_ref = results.clone();
92
93 s.spawn(move |_| {
94 let worker: Worker<Morsel> = Worker::new_fifo();
95 let mut local_pairs = Vec::with_capacity(morsel_size);
96
97 while let Some(m) = steal_morsel(injector_ref, &worker) {
98 let start_idx = m.start_idx();
99 let morsel_rows = m.rows(probe_rows);
100 local_pairs.clear();
101
102 for (local_idx, probe_row) in morsel_rows.iter().enumerate() {
103 let probe_idx = start_idx + local_idx;
104 let key = &probe_row.values[probe_col_idx];
105 if *key == SqlValue::Null {
106 continue;
107 }
108 if let Some(build_indices) = hash_table.get(key) {
109 for &build_idx in build_indices {
110 local_pairs.push((build_idx, probe_idx));
111 }
112 }
113 }
114
115 if !local_pairs.is_empty() {
116 results_ref.lock().unwrap().push((start_idx, local_pairs.clone()));
117 }
118 }
119 });
120 }
121 });
122
123 let mut sorted_results = Arc::try_unwrap(results)
125 .expect("all threads should have completed")
126 .into_inner()
127 .expect("mutex not poisoned");
128
129 sorted_results.sort_by_key(|(start_idx, _)| *start_idx);
131
132 let total: usize = sorted_results.iter().map(|(_, pairs)| pairs.len()).sum();
133 let mut final_pairs = Vec::with_capacity(total);
134 for (_, pairs) in sorted_results {
135 final_pairs.extend(pairs);
136 }
137
138 if morsel_debug_enabled() {
139 eprintln!(
140 "[MORSEL] Probe complete: {} morsels, {} matches",
141 morsel_count,
142 final_pairs.len()
143 );
144 }
145
146 final_pairs
147}