reddb_server/storage/query/executors/parallel_scan.rs
1//! Parallel scan coordinator — Fase 4 P5 building block.
2//!
3//! Generic worker-pool helper for chunked collection scans.
4//! Mirrors PG's `nodeGather.c` + `execParallel.c` at a much
5//! smaller scale: splits an input into equal chunks, processes
6//! each chunk on a separate OS thread, and funnels results
7//! back to the caller in a single ordered Vec.
8//!
9//! The module intentionally doesn't depend on `rayon` —
10//! reddb avoids large transitive dep graphs and `rayon` is
11//! overkill for the single-query-level parallelism we need
12//! here. A plain `std::thread::spawn` + `mpsc::channel`
13//! approach gives us enough throughput for the scan sizes
14//! that matter (1k-10m rows per query).
15//!
16//! Usage pattern:
17//!
18//! ```text
19//! let out = parallel_scan(
20//! &records,
//! default_parallelism(),
22//! |chunk| chunk.iter().filter(|r| matches(r)).cloned().collect(),
23//! );
24//! ```
25//!
26//! The closure runs on each chunk; its output is a Vec that
27//! the coordinator concatenates in chunk order, preserving
28//! input ordering for the merged result.
29//!
30//! This module is **not yet wired** into any executor. The
31//! full-scan path in `runtime/query_exec/table.rs` is the
32//! obvious first call site once the planner learns to flag
33//! scans as "parallel-eligible" (size > threshold, no
34//! side-effecting filters).
35//!
36//! ## Parallelism decisions
37//!
//! Parallelism overhead dominates at small sizes — thread
//! spawn + channel setup costs a few microseconds per thread
//! (see `MIN_PARALLEL_ROWS`). The coordinator falls back to
//! sequential execution when:
//!
//! - `chunk_count <= 1` (no parallelism available)
//! - Input length < `MIN_PARALLEL_ROWS` (tuned constant,
//! currently 4096)
45//!
46//! Above the threshold the input is sliced into roughly
47//! `chunk_count` equal pieces and each piece ships to a
48//! worker thread. The main thread joins and concatenates.
49
50use std::sync::mpsc;
51use std::thread;
52
/// Row-count threshold for going parallel.
///
/// `parallel_scan` and `parallel_count` run their closure inline on the
/// calling thread whenever the input holds fewer rows than this. Tuned so the
/// overhead of thread spawn + channel setup (~3-5 µs according to the
/// original tuning note — NOTE(review): not re-measured here) doesn't
/// dominate the filter cost.
pub const MIN_PARALLEL_ROWS: usize = 4096;
57
58/// Execute `worker` across `input` in parallel using
59/// `chunk_count` threads. Preserves input order in the
60/// output: chunk 0's results come first, chunk 1's next, etc.
61///
62/// - `T` — the input row type. Must be `Send + Sync` so chunks
63/// can be passed to worker threads.
64/// - `U` — the output row type from a single worker. Must be
65/// `Send`.
66/// - `F` — the worker closure. Must be `Send + Sync + 'static`
67/// and take a slice reference to the chunk.
68///
69/// Falls back to sequential execution when the input is
70/// smaller than `MIN_PARALLEL_ROWS` or when `chunk_count <= 1`.
71pub fn parallel_scan<T, U, F>(input: &[T], chunk_count: usize, worker: F) -> Vec<U>
72where
73 T: Send + Sync + Clone + 'static,
74 U: Send + 'static,
75 F: Fn(&[T]) -> Vec<U> + Send + Sync + 'static,
76{
77 if chunk_count <= 1 || input.len() < MIN_PARALLEL_ROWS {
78 return worker(input);
79 }
80
81 // Divide the input into roughly-equal slices. Slice
82 // boundaries may not fall on clean chunk_size multiples
83 // so the last chunk absorbs the remainder.
84 let chunk_size = input.len().div_ceil(chunk_count);
85 let mut chunks: Vec<Vec<T>> = Vec::with_capacity(chunk_count);
86 let mut idx = 0;
87 while idx < input.len() {
88 let end = (idx + chunk_size).min(input.len());
89 chunks.push(input[idx..end].to_vec());
90 idx = end;
91 }
92
93 // Spawn one worker per chunk. Each worker sends its
94 // output through the mpsc channel along with its chunk
95 // index so the main thread can concatenate in order.
96 let (tx, rx) = mpsc::channel();
97 let worker = std::sync::Arc::new(worker);
98 let mut handles = Vec::with_capacity(chunks.len());
99 for (chunk_idx, chunk) in chunks.into_iter().enumerate() {
100 let tx = tx.clone();
101 let worker = worker.clone();
102 let handle = thread::spawn(move || {
103 let result = worker(&chunk);
104 // Send (chunk_idx, result) so the coordinator can
105 // put the results back in order. We ignore errors
106 // because the receiver side outlives every sender
107 // — if it's gone, drop silently.
108 let _ = tx.send((chunk_idx, result));
109 });
110 handles.push(handle);
111 }
112 // Drop the parent tx so the channel closes when the last
113 // worker finishes.
114 drop(tx);
115
116 // Collect all results into an indexed Vec, then flatten
117 // in chunk order.
118 let mut indexed: Vec<Option<Vec<U>>> = (0..handles.len()).map(|_| None).collect();
119 while let Ok((idx, result)) = rx.recv() {
120 indexed[idx] = Some(result);
121 }
122 // Join every worker so panics propagate deterministically.
123 for handle in handles {
124 let _ = handle.join();
125 }
126
127 // Flatten in chunk order.
128 let mut out: Vec<U> = Vec::new();
129 for chunk_result in indexed.into_iter().flatten() {
130 out.extend(chunk_result);
131 }
132 out
133}
134
135/// Count-only variant of `parallel_scan` that avoids
136/// materialising the intermediate Vec. The worker returns
137/// a usize (number of matching rows in its chunk), and the
138/// coordinator sums them.
139///
140/// Used by `SELECT COUNT(*) FROM t WHERE filter` where the
141/// full row payload is irrelevant.
142pub fn parallel_count<T, F>(input: &[T], chunk_count: usize, counter: F) -> u64
143where
144 T: Send + Sync + Clone + 'static,
145 F: Fn(&[T]) -> u64 + Send + Sync + 'static,
146{
147 if chunk_count <= 1 || input.len() < MIN_PARALLEL_ROWS {
148 return counter(input);
149 }
150 let chunk_size = input.len().div_ceil(chunk_count);
151 let mut chunks: Vec<Vec<T>> = Vec::with_capacity(chunk_count);
152 let mut idx = 0;
153 while idx < input.len() {
154 let end = (idx + chunk_size).min(input.len());
155 chunks.push(input[idx..end].to_vec());
156 idx = end;
157 }
158 let (tx, rx) = mpsc::channel();
159 let counter = std::sync::Arc::new(counter);
160 let mut handles = Vec::with_capacity(chunks.len());
161 for chunk in chunks {
162 let tx = tx.clone();
163 let counter = counter.clone();
164 let handle = thread::spawn(move || {
165 let n = counter(&chunk);
166 let _ = tx.send(n);
167 });
168 handles.push(handle);
169 }
170 drop(tx);
171 let mut total = 0u64;
172 while let Ok(n) = rx.recv() {
173 total += n;
174 }
175 for handle in handles {
176 let _ = handle.join();
177 }
178 total
179}
180
/// Default worker count for a single query.
///
/// Uses the parallelism reported by `std::thread::available_parallelism()`,
/// capped at 8 — per the original tuning note, more than 8 workers for one
/// query tends to thrash the buffer pool. Falls back to 1 (sequential) when
/// the platform cannot report its parallelism.
pub fn default_parallelism() -> usize {
    const MAX_WORKERS: usize = 8;
    match std::thread::available_parallelism() {
        Ok(cores) => cores.get().min(MAX_WORKERS),
        Err(_) => 1,
    }
}
189
190/// Phase 3.4 wiring entry point. Calls `parallel_scan` with
191/// `default_parallelism()` worker count. Used by the runtime
192/// scan executor when its planner cost model decides parallel
193/// is profitable (input size > MIN_PARALLEL_ROWS). Saves the
194/// caller from manually threading the worker count through.
195pub fn parallel_scan_default<T, U, F>(input: &[T], worker: F) -> Vec<U>
196where
197 T: Send + Sync + Clone + 'static,
198 U: Send + 'static,
199 F: Fn(&[T]) -> Vec<U> + Send + Sync + 'static,
200{
201 parallel_scan(input, default_parallelism(), worker)
202}
203
204/// Phase 3.4 wiring entry point for COUNT(*) over a filtered
205/// scan. Same default parallelism as `parallel_scan_default`.
206pub fn parallel_count_default<T, F>(input: &[T], counter: F) -> u64
207where
208 T: Send + Sync + Clone + 'static,
209 F: Fn(&[T]) -> u64 + Send + Sync + 'static,
210{
211 parallel_count(input, default_parallelism(), counter)
212}
213
214// ──────── Issue #768 / S9 — pull-based scan iterator ────────
215
/// Default number of rows processed per pulled batch.
///
/// Design rationale carried over from the original note (NOTE(review): the
/// page-size and S1 claims depend on code outside this file — confirm before
/// retuning): sized so the per-batch working set stays well under one engine
/// page worth of row payload for typical row widths, while amortising the
/// per-batch closure-call overhead. The streaming chunk producer (S1)
/// re-buffers these into page-aligned wire chunks downstream, so this is
/// purely the executor's internal pull granularity.
pub const DEFAULT_SCAN_BATCH_ROWS: usize = 256;
223
/// Lazily-evaluated, pull-based counterpart to [`parallel_scan`].
///
/// Where [`parallel_scan`] eagerly processes the whole input and concatenates
/// every worker's output into a single `Vec<U>`, `ScanBatches` borrows the
/// input and yields **one processed batch per `next()` call**. Only the
/// output of the batch currently being pulled is materialised, so a consumer
/// that drains each batch before pulling the next keeps memory proportional
/// to one batch rather than to the full result set. (The original design
/// notes name the S1
/// [`ChunkProducer`](crate::server::output_stream::ChunkProducer) as that
/// consumer.)
///
/// Ordering is preserved: batch *k* covers input rows
/// `[k·batch_rows, (k+1)·batch_rows)`, so flattening the yielded batches
/// visits the input in the same order as `parallel_scan` does — the parity
/// contract the S9 golden tests assert.
///
/// Parallelism note: the worker runs on the thread that calls `next()`; no
/// threads are spawned here. Bounded read-ahead parallelism (a Gather-style
/// sync channel) is described in the original notes as a future enhancement.
///
/// The iterator reports an exact [`Iterator::size_hint`] (the number of
/// batches still to come) and is fused: once exhausted it keeps returning
/// `None`.
pub struct ScanBatches<'a, T, U, F>
where
    F: Fn(&[T]) -> Vec<U>,
{
    /// Rows being scanned; borrowed for the iterator's whole lifetime.
    input: &'a [T],
    /// Index of the first row that has not been handed to `worker` yet.
    cursor: usize,
    /// Maximum number of rows per batch. A stored 0 is treated as 1.
    batch_rows: usize,
    /// Closure applied to each batch slice.
    worker: F,
    /// Ties `U` to the type without owning a `U`.
    _marker: std::marker::PhantomData<fn() -> U>,
}

impl<'a, T, U, F> Iterator for ScanBatches<'a, T, U, F>
where
    F: Fn(&[T]) -> Vec<U>,
{
    type Item = Vec<U>;

    /// Runs the worker over the next `batch_rows` input rows (fewer for the
    /// final batch) and returns its output, or `None` once every row has been
    /// consumed.
    fn next(&mut self) -> Option<Vec<U>> {
        let input: &'a [T] = self.input;
        if self.cursor >= input.len() {
            return None;
        }
        // Clamp here as well as in the constructor so the cursor always
        // advances, and saturate so a huge batch size cannot wrap around.
        let step = self.batch_rows.max(1);
        let end = self.cursor.saturating_add(step).min(input.len());
        let batch = &input[self.cursor..end];
        self.cursor = end;
        Some((self.worker)(batch))
    }

    /// Exact number of batches still to be yielded, letting `collect()` and
    /// friends preallocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let rows_left = self.input.len().saturating_sub(self.cursor);
        let batches_left = rows_left.div_ceil(self.batch_rows.max(1));
        (batches_left, Some(batches_left))
    }
}

/// After the cursor reaches the end of the input, `next()` returns `None`
/// on every subsequent call.
impl<'a, T, U, F> std::iter::FusedIterator for ScanBatches<'a, T, U, F> where
    F: Fn(&[T]) -> Vec<U>
{
}
277
278/// Construct a pull-based [`ScanBatches`] over `input`, applying
279/// `worker` to each `batch_rows`-sized slice on demand. A
280/// `batch_rows` of 0 is clamped to 1 so the iterator always makes
281/// progress.
282pub fn parallel_scan_stream<T, U, F>(
283 input: &[T],
284 batch_rows: usize,
285 worker: F,
286) -> ScanBatches<'_, T, U, F>
287where
288 F: Fn(&[T]) -> Vec<U>,
289{
290 ScanBatches {
291 input,
292 cursor: 0,
293 batch_rows: batch_rows.max(1),
294 worker,
295 _marker: std::marker::PhantomData,
296 }
297}
298
299/// Per-row flattening helper over [`parallel_scan_stream`]. Yields
300/// one `U` at a time — the natural shape for a record-at-a-time
301/// streaming driver — while keeping the same lazy, bounded-memory
302/// pull semantics (at most one batch is materialised at a time).
303pub fn parallel_scan_rows<'a, T, U, F>(
304 input: &'a [T],
305 batch_rows: usize,
306 worker: F,
307) -> impl Iterator<Item = U> + 'a
308where
309 T: 'a,
310 U: 'a,
311 F: Fn(&[T]) -> Vec<U> + 'a,
312{
313 parallel_scan_stream(input, batch_rows, worker).flat_map(|batch| batch.into_iter())
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Worker count forced above 1 so the threaded path is taken even on a
    /// host where `default_parallelism()` reports a single core.
    const THREADED_WORKERS: usize = 4;

    /// Fixture size comfortably above `MIN_PARALLEL_ROWS` and deliberately
    /// not a multiple of `THREADED_WORKERS`, so the final chunk is short.
    const THREADED_ROWS: u64 = (MIN_PARALLEL_ROWS as u64) * 2 + 7;

    fn copy_worker(chunk: &[u64]) -> Vec<u64> {
        chunk.to_vec()
    }

    #[test]
    fn scan_stream_yields_batches_in_order_and_matches_eager_collect() {
        // Acceptance #3 / #5: parity with the materialising path on a
        // small fixture — flattening the pulled batches reproduces the
        // exact Vec `parallel_scan` would have built. Note this fixture is
        // below MIN_PARALLEL_ROWS, so the eager side runs sequentially; the
        // threaded side is covered by the `*_threaded_path` tests below.
        let input: Vec<u64> = (0..1000).collect();
        let eager = parallel_scan(&input, default_parallelism(), copy_worker);
        let streamed: Vec<u64> = parallel_scan_rows(&input, 64, copy_worker).collect();
        assert_eq!(eager, streamed);
        assert_eq!(streamed, input);
    }

    #[test]
    fn scan_stream_applies_filter_worker_with_parity() {
        // A filtering worker (the WHERE-clause shape) must stream the
        // same surviving rows, in the same order, as the eager path.
        let input: Vec<u64> = (0..500).collect();
        let even =
            |chunk: &[u64]| -> Vec<u64> { chunk.iter().copied().filter(|n| n % 2 == 0).collect() };
        let eager = parallel_scan(&input, default_parallelism(), even);
        let streamed: Vec<u64> = parallel_scan_rows(&input, 16, even).collect();
        assert_eq!(eager, streamed);
        assert!(streamed.iter().all(|n| n % 2 == 0));
    }

    #[test]
    fn scan_stream_batch_rows_zero_is_clamped_to_one() {
        let input: Vec<u64> = (0..5).collect();
        let batches: Vec<Vec<u64>> = parallel_scan_stream(&input, 0, copy_worker).collect();
        assert_eq!(batches.len(), 5, "batch_rows 0 must clamp to 1 row/batch");
        assert_eq!(batches.concat(), input);
    }

    #[test]
    fn scan_stream_materialises_at_most_one_batch_at_a_time() {
        // Acceptance #1: bounded memory. The worker asserts it is never
        // handed more than `batch_rows` rows, so no call path can
        // smuggle the full input through in one materialised slice.
        let input: Vec<u64> = (0..10_000).collect();
        const BATCH: usize = 128;
        let bounded = |chunk: &[u64]| -> Vec<u64> {
            assert!(
                chunk.len() <= BATCH,
                "worker saw {} rows, exceeding batch cap {BATCH}",
                chunk.len()
            );
            chunk.to_vec()
        };
        let total: usize = parallel_scan_rows(&input, BATCH, bounded).count();
        assert_eq!(total, input.len());
    }

    #[test]
    fn scan_stream_over_empty_input_yields_nothing() {
        let input: Vec<u64> = Vec::new();
        assert_eq!(parallel_scan_stream(&input, 8, copy_worker).count(), 0);
        assert_eq!(parallel_scan_rows(&input, 8, copy_worker).count(), 0);
    }

    #[test]
    fn eager_scan_threaded_path_preserves_input_order() {
        // Large enough, with an explicit worker count, that the input really
        // is split across threads; the merged output must still be in input
        // order.
        let input: Vec<u64> = (0..THREADED_ROWS).collect();
        let eager = parallel_scan(&input, THREADED_WORKERS, copy_worker);
        assert_eq!(eager, input);
    }

    #[test]
    fn scan_stream_matches_eager_collect_on_threaded_path() {
        // Parity contract against the path that actually spawns workers.
        let input: Vec<u64> = (0..THREADED_ROWS).collect();
        let even =
            |chunk: &[u64]| -> Vec<u64> { chunk.iter().copied().filter(|n| n % 2 == 0).collect() };
        let eager = parallel_scan(&input, THREADED_WORKERS, even);
        let streamed: Vec<u64> =
            parallel_scan_rows(&input, DEFAULT_SCAN_BATCH_ROWS, even).collect();
        assert_eq!(eager, streamed);
    }

    #[test]
    fn parallel_count_agrees_with_sequential_count() {
        let input: Vec<u64> = (0..THREADED_ROWS).collect();
        let count_even =
            |chunk: &[u64]| -> u64 { chunk.iter().filter(|n| **n % 2 == 0).count() as u64 };
        let expected = count_even(input.as_slice());
        // Threaded path.
        assert_eq!(parallel_count(&input, THREADED_WORKERS, count_even), expected);
        // Sequential fallback (chunk_count <= 1).
        assert_eq!(parallel_count(&input, 1, count_even), expected);
    }
}
375}