reddb_server/storage/query/executors/parallel_scan.rs

//! Parallel scan coordinator — Phase 4 P5 building block.
//!
//! Generic worker-pool helper for chunked collection scans.
//! Mirrors PG's `nodeGather.c` + `execParallel.c` at a much
//! smaller scale: splits an input into roughly equal chunks,
//! processes each chunk on a separate OS thread, and funnels
//! results back to the caller in a single ordered Vec.
//!
//! The module intentionally doesn't depend on `rayon` —
//! reddb avoids large transitive dep graphs and `rayon` is
//! overkill for the single-query-level parallelism we need
//! here. A plain `std::thread::spawn` + `mpsc::channel`
//! approach gives us enough throughput for the scan sizes
//! that matter (1k–10M rows per query).
//!
//! Usage pattern:
//!
//! ```text
//! let out = parallel_scan(
//!     &records,
//!     default_parallelism(),
//!     |chunk| chunk.iter().filter(|r| matches(r)).cloned().collect(),
//! );
//! ```
//!
//! The closure runs on each chunk; its output is a Vec that
//! the coordinator concatenates in chunk order, preserving
//! input ordering for the merged result.
//!
//! This module is **not yet wired** into any executor. The
//! full-scan path in `runtime/query_exec/table.rs` is the
//! obvious first call site once the planner learns to flag
//! scans as "parallel-eligible" (size > threshold, no
//! side-effecting filters).
//!
//! ## Parallelism decisions
//!
//! Parallelism overhead dominates at small sizes — thread
//! spawn + channel setup costs a few microseconds per thread.
//! The coordinator falls back to sequential execution when:
//!
//! - `chunk_count <= 1` (no parallelism available)
//! - Input length < `MIN_PARALLEL_ROWS` (tuned constant,
//!   currently 4096)
//!
//! Above the threshold the input is sliced into roughly
//! `chunk_count` equal pieces and each piece ships to a
//! worker thread. The main thread joins and concatenates.

use std::sync::mpsc;
use std::thread;

/// Minimum input length below which parallel execution falls
/// back to sequential. Tuned so the overhead of thread spawn
/// + channel setup (~3-5 µs) doesn't dominate the filter cost.
pub const MIN_PARALLEL_ROWS: usize = 4096;

/// Execute `worker` across `input` in parallel using
/// `chunk_count` threads. Preserves input order in the
/// output: chunk 0's results come first, chunk 1's next, etc.
///
/// - `T` — the input row type. Must be `Send + Sync + Clone`
///   so chunks can be copied onto worker threads.
/// - `U` — the output row type from a single worker. Must be
///   `Send`.
/// - `F` — the worker closure. Must be `Send + Sync + 'static`
///   and take a slice reference to the chunk.
///
/// Falls back to sequential execution when the input is
/// smaller than `MIN_PARALLEL_ROWS` or when `chunk_count <= 1`.
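///
/// A minimal sketch of the call shape (an `ignore`d doctest, since
/// the crate path isn't assumed here): filter even numbers across
/// four workers.
///
/// ```ignore
/// let rows: Vec<u64> = (0..10_000).collect();
/// let evens = parallel_scan(&rows, 4, |chunk| {
///     chunk.iter().copied().filter(|n| n % 2 == 0).collect()
/// });
/// assert_eq!(evens.len(), 5_000);
/// ```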
pub fn parallel_scan<T, U, F>(input: &[T], chunk_count: usize, worker: F) -> Vec<U>
where
    T: Send + Sync + Clone + 'static,
    U: Send + 'static,
    F: Fn(&[T]) -> Vec<U> + Send + Sync + 'static,
{
    if chunk_count <= 1 || input.len() < MIN_PARALLEL_ROWS {
        return worker(input);
    }

    // Divide the input into roughly-equal slices. With ceiling
    // division the boundaries may not fall on clean chunk_size
    // multiples, so the last chunk may be shorter than the rest.
    // Each chunk is cloned into an owned Vec so it can cross the
    // `'static` boundary into `thread::spawn`.
    let chunk_size = input.len().div_ceil(chunk_count);
    let mut chunks: Vec<Vec<T>> = Vec::with_capacity(chunk_count);
    let mut idx = 0;
    while idx < input.len() {
        let end = (idx + chunk_size).min(input.len());
        chunks.push(input[idx..end].to_vec());
        idx = end;
    }
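    // Worked example: 10 rows across 3 workers gives
    // chunk_size = ceil(10 / 3) = 4, i.e. chunks of 4, 4 and 2 rows.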

    // Spawn one worker per chunk. Each worker sends its
    // output through the mpsc channel along with its chunk
    // index so the main thread can concatenate in order.
    let (tx, rx) = mpsc::channel();
    let worker = std::sync::Arc::new(worker);
    let mut handles = Vec::with_capacity(chunks.len());
    for (chunk_idx, chunk) in chunks.into_iter().enumerate() {
        let tx = tx.clone();
        let worker = worker.clone();
        let handle = thread::spawn(move || {
            let result = worker(&chunk);
            // Send (chunk_idx, result) so the coordinator can
            // put the results back in order. A send only fails
            // if the receiver has been dropped, in which case
            // the result is no longer wanted; drop it silently.
            let _ = tx.send((chunk_idx, result));
        });
        handles.push(handle);
    }
    // Drop the parent tx so the channel closes when the last
    // worker finishes.
    drop(tx);

    // Collect all results into an indexed Vec, then flatten
    // in chunk order.
    let mut indexed: Vec<Option<Vec<U>>> = (0..handles.len()).map(|_| None).collect();
    while let Ok((idx, result)) = rx.recv() {
        indexed[idx] = Some(result);
    }
    // Join every worker so panics propagate deterministically
    // instead of silently dropping that chunk's rows.
    for handle in handles {
        if let Err(panic) = handle.join() {
            std::panic::resume_unwind(panic);
        }
    }

    // Flatten in chunk order.
    let mut out: Vec<U> = Vec::new();
    for chunk_result in indexed.into_iter().flatten() {
        out.extend(chunk_result);
    }
    out
}
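
// NOTE(sketch): since Rust 1.63, `std::thread::scope` lets workers
// borrow `input` directly, which would remove the per-chunk
// `to_vec()` clone and relax the `Clone + 'static` bounds. A minimal
// hypothetical variant under those assumptions, not wired in:
#[allow(dead_code)]
fn parallel_scan_scoped<T, U, F>(input: &[T], chunk_count: usize, worker: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&[T]) -> Vec<U> + Sync,
{
    if chunk_count <= 1 || input.len() < MIN_PARALLEL_ROWS {
        return worker(input);
    }
    let chunk_size = input.len().div_ceil(chunk_count);
    let worker = &worker;
    thread::scope(|s| {
        // Spawn in chunk order and join in the same order, so the
        // flattened output preserves input order without indexing.
        let handles: Vec<_> = input
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || worker(chunk)))
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("scan worker panicked"))
            .collect()
    })
}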

/// Count-only variant of `parallel_scan` that avoids
/// materialising the intermediate Vec. The worker returns
/// a `u64` (the number of matching rows in its chunk), and
/// the coordinator sums them.
///
/// Intended for `SELECT COUNT(*) FROM t WHERE filter`, where
/// the full row payload is irrelevant.
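///
/// A hedged sketch of the call shape (`ignore`d doctest; the
/// `records` input and `row_matches` predicate are illustrative,
/// not part of this module):
///
/// ```ignore
/// let n = parallel_count(&records, default_parallelism(), |chunk| {
///     chunk.iter().filter(|r| row_matches(r)).count() as u64
/// });
/// ```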
pub fn parallel_count<T, F>(input: &[T], chunk_count: usize, counter: F) -> u64
where
    T: Send + Sync + Clone + 'static,
    F: Fn(&[T]) -> u64 + Send + Sync + 'static,
{
    if chunk_count <= 1 || input.len() < MIN_PARALLEL_ROWS {
        return counter(input);
    }
    // Same chunking scheme as `parallel_scan`. A sum is
    // order-independent, so no chunk index travels with the result.
    let chunk_size = input.len().div_ceil(chunk_count);
    let mut chunks: Vec<Vec<T>> = Vec::with_capacity(chunk_count);
    let mut idx = 0;
    while idx < input.len() {
        let end = (idx + chunk_size).min(input.len());
        chunks.push(input[idx..end].to_vec());
        idx = end;
    }
    let (tx, rx) = mpsc::channel();
    let counter = std::sync::Arc::new(counter);
    let mut handles = Vec::with_capacity(chunks.len());
    for chunk in chunks {
        let tx = tx.clone();
        let counter = counter.clone();
        let handle = thread::spawn(move || {
            let n = counter(&chunk);
            let _ = tx.send(n);
        });
        handles.push(handle);
    }
    drop(tx);
    let mut total = 0u64;
    while let Ok(n) = rx.recv() {
        total += n;
    }
    // Propagate worker panics instead of silently undercounting.
    for handle in handles {
        if let Err(panic) = handle.join() {
            std::panic::resume_unwind(panic);
        }
    }
    total
}

/// Number of worker threads to use by default. Clamped to
/// `available_parallelism().min(8)` — more than 8 workers for
/// a single query tends to thrash the buffer pool. Falls back
/// to 1 when the parallelism query fails.
pub fn default_parallelism() -> usize {
    std::thread::available_parallelism()
        .map(|n| n.get().min(8))
        .unwrap_or(1)
}

/// Phase 3.4 wiring entry point. Calls `parallel_scan` with a
/// `default_parallelism()` worker count, saving the caller from
/// manually threading the count through. Intended for the runtime
/// scan executor once its planner cost model flags a scan as
/// parallel-eligible (input length >= `MIN_PARALLEL_ROWS`).
pub fn parallel_scan_default<T, U, F>(input: &[T], worker: F) -> Vec<U>
where
    T: Send + Sync + Clone + 'static,
    U: Send + 'static,
    F: Fn(&[T]) -> Vec<U> + Send + Sync + 'static,
{
    parallel_scan(input, default_parallelism(), worker)
}

/// Phase 3.4 wiring entry point for COUNT(*) over a filtered
/// scan. Same default parallelism as `parallel_scan_default`.
pub fn parallel_count_default<T, F>(input: &[T], counter: F) -> u64
where
    T: Send + Sync + Clone + 'static,
    F: Fn(&[T]) -> u64 + Send + Sync + 'static,
{
    parallel_count(input, default_parallelism(), counter)
}
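
// A minimal test sketch for this coordinator: order preservation,
// the count variant, and the sequential fallback. Inputs are sized
// past MIN_PARALLEL_ROWS so the parallel path actually runs.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn scan_preserves_input_order() {
        let rows: Vec<u64> = (0..10_000).collect();
        // Identity worker: the merged output must equal the input.
        let out = parallel_scan(&rows, 4, |chunk| chunk.to_vec());
        assert_eq!(out, rows);
    }

    #[test]
    fn count_matches_sequential_filter() {
        let rows: Vec<u64> = (0..10_000).collect();
        let n = parallel_count(&rows, 4, |chunk| {
            chunk.iter().filter(|&&r| r % 2 == 0).count() as u64
        });
        assert_eq!(n, 5_000);
    }

    #[test]
    fn small_inputs_fall_back_to_sequential() {
        // Below MIN_PARALLEL_ROWS the worker runs once on the whole input.
        let rows: Vec<u64> = (0..16).collect();
        let out = parallel_scan(&rows, 4, |chunk| chunk.to_vec());
        assert_eq!(out, rows);
    }
}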