// reddb_server/storage/query/executors/parallel_scan.rs
1//! Parallel scan coordinator — Fase 4 P5 building block.
2//!
3//! Generic worker-pool helper for chunked collection scans.
4//! Mirrors PG's `nodeGather.c` + `execParallel.c` at a much
5//! smaller scale: splits an input into equal chunks, processes
6//! each chunk on a separate OS thread, and funnels results
7//! back to the caller in a single ordered Vec.
8//!
9//! The module intentionally doesn't depend on `rayon` —
10//! reddb avoids large transitive dep graphs and `rayon` is
11//! overkill for the single-query-level parallelism we need
12//! here. A plain `std::thread::spawn` + `mpsc::channel`
13//! approach gives us enough throughput for the scan sizes
14//! that matter (1k-10m rows per query).
15//!
16//! Usage pattern:
17//!
//! ```text
//! let out = parallel_scan(
//!     &records,
//!     default_parallelism(),
//!     |chunk| chunk.iter().filter(|r| matches(r)).cloned().collect(),
//! );
//! ```
25//!
26//! The closure runs on each chunk; its output is a Vec that
27//! the coordinator concatenates in chunk order, preserving
28//! input ordering for the merged result.
29//!
30//! This module is **not yet wired** into any executor. The
31//! full-scan path in `runtime/query_exec/table.rs` is the
32//! obvious first call site once the planner learns to flag
33//! scans as "parallel-eligible" (size > threshold, no
34//! side-effecting filters).
35//!
36//! ## Parallelism decisions
37//!
//! Parallelism overhead dominates at small sizes — thread
//! spawn + channel setup is ~3-5 µs per thread. The coordinator
//! falls back to sequential execution when:
//!
//! - `chunk_count <= 1` (no parallelism available)
//! - Input length < `MIN_PARALLEL_ROWS` (tuned constant,
//!   default 4096)
45//!
46//! Above the threshold the input is sliced into roughly
47//! `chunk_count` equal pieces and each piece ships to a
48//! worker thread. The main thread joins and concatenates.
49
50use std::sync::mpsc;
51use std::thread;
52
/// Minimum input length below which parallel execution falls
/// back to sequential. Tuned so the overhead of thread spawn
/// + coordination (~3-5 µs per worker) doesn't dominate the
/// filter cost.
pub const MIN_PARALLEL_ROWS: usize = 4096;

/// Execute `worker` across `input` in parallel using up to
/// `chunk_count` threads. Preserves input order in the
/// output: chunk 0's results come first, chunk 1's next, etc.
///
/// - `T` — the input row type. Must be `Send + Sync` so chunks
///   can be borrowed by worker threads.
/// - `U` — the output row type from a single worker. Must be
///   `Send`.
/// - `F` — the worker closure. Takes a slice reference to the
///   chunk and returns that chunk's output rows.
///
/// Falls back to sequential execution when the input is
/// smaller than `MIN_PARALLEL_ROWS` or when `chunk_count <= 1`.
///
/// # Panics
///
/// Re-raises a panic from any worker on the calling thread.
/// (Previously worker panics were silently swallowed via
/// `let _ = handle.join()` and the panicking chunk's rows
/// were dropped from the result.)
pub fn parallel_scan<T, U, F>(input: &[T], chunk_count: usize, worker: F) -> Vec<U>
where
    T: Send + Sync + Clone + 'static,
    U: Send + 'static,
    F: Fn(&[T]) -> Vec<U> + Send + Sync + 'static,
{
    // Sequential fast path: thread overhead dominates for
    // small inputs or a single worker.
    if chunk_count <= 1 || input.len() < MIN_PARALLEL_ROWS {
        return worker(input);
    }

    // Roughly-equal slices: div_ceil guarantees at most
    // `chunk_count` chunks; the last chunk absorbs the
    // remainder. Same partition as the old manual loop.
    let chunk_size = input.len().div_ceil(chunk_count);

    // Scoped threads borrow the input slices and the worker
    // directly, so no per-chunk `to_vec()` copies and no
    // Arc/mpsc plumbing are needed. `scope` joins every
    // thread before returning, and a worker panic surfaces
    // through `join()` below instead of being dropped.
    let worker = &worker;
    let per_chunk: Vec<Vec<U>> = thread::scope(|s| {
        let handles: Vec<_> = input
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || worker(chunk)))
            .collect();
        // Joining in spawn order keeps chunk order, which
        // preserves input ordering in the merged result.
        handles
            .into_iter()
            .map(|h| h.join().expect("parallel_scan worker panicked"))
            .collect()
    });

    // Flatten in chunk order, reserving the exact output
    // size up front to avoid grow-and-copy.
    let total: usize = per_chunk.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    for part in per_chunk {
        out.extend(part);
    }
    out
}
134
135/// Count-only variant of `parallel_scan` that avoids
136/// materialising the intermediate Vec. The worker returns
137/// a usize (number of matching rows in its chunk), and the
138/// coordinator sums them.
139///
140/// Used by `SELECT COUNT(*) FROM t WHERE filter` where the
141/// full row payload is irrelevant.
142pub fn parallel_count<T, F>(input: &[T], chunk_count: usize, counter: F) -> u64
143where
144 T: Send + Sync + Clone + 'static,
145 F: Fn(&[T]) -> u64 + Send + Sync + 'static,
146{
147 if chunk_count <= 1 || input.len() < MIN_PARALLEL_ROWS {
148 return counter(input);
149 }
150 let chunk_size = input.len().div_ceil(chunk_count);
151 let mut chunks: Vec<Vec<T>> = Vec::with_capacity(chunk_count);
152 let mut idx = 0;
153 while idx < input.len() {
154 let end = (idx + chunk_size).min(input.len());
155 chunks.push(input[idx..end].to_vec());
156 idx = end;
157 }
158 let (tx, rx) = mpsc::channel();
159 let counter = std::sync::Arc::new(counter);
160 let mut handles = Vec::with_capacity(chunks.len());
161 for chunk in chunks {
162 let tx = tx.clone();
163 let counter = counter.clone();
164 let handle = thread::spawn(move || {
165 let n = counter(&chunk);
166 let _ = tx.send(n);
167 });
168 handles.push(handle);
169 }
170 drop(tx);
171 let mut total = 0u64;
172 while let Ok(n) = rx.recv() {
173 total += n;
174 }
175 for handle in handles {
176 let _ = handle.join();
177 }
178 total
179}
180
/// Default worker-thread count for a single query: the
/// machine's available parallelism clamped to 8 — more than
/// 8 workers per query tends to thrash the buffer pool.
/// Falls back to 1 when the parallelism level cannot be
/// determined.
pub fn default_parallelism() -> usize {
    match std::thread::available_parallelism() {
        Ok(n) => n.get().min(8),
        Err(_) => 1,
    }
}
189
190/// Phase 3.4 wiring entry point. Calls `parallel_scan` with
191/// `default_parallelism()` worker count. Used by the runtime
192/// scan executor when its planner cost model decides parallel
193/// is profitable (input size > MIN_PARALLEL_ROWS). Saves the
194/// caller from manually threading the worker count through.
195pub fn parallel_scan_default<T, U, F>(input: &[T], worker: F) -> Vec<U>
196where
197 T: Send + Sync + Clone + 'static,
198 U: Send + 'static,
199 F: Fn(&[T]) -> Vec<U> + Send + Sync + 'static,
200{
201 parallel_scan(input, default_parallelism(), worker)
202}
203
204/// Phase 3.4 wiring entry point for COUNT(*) over a filtered
205/// scan. Same default parallelism as `parallel_scan_default`.
206pub fn parallel_count_default<T, F>(input: &[T], counter: F) -> u64
207where
208 T: Send + Sync + Clone + 'static,
209 F: Fn(&[T]) -> u64 + Send + Sync + 'static,
210{
211 parallel_count(input, default_parallelism(), counter)
212}