ad_plugins/par_util.rs
1#[cfg(feature = "parallel")]
2use std::sync::OnceLock;
3#[cfg(feature = "parallel")]
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6/// Minimum element count to justify rayon overhead.
7pub const PAR_THRESHOLD: usize = 4096;
8
9/// Number of CPU cores reserved for driver threads, tokio runtime, etc.
10/// The rayon pool will use `available_cores - RESERVED_CORES` threads (minimum 1).
11const RESERVED_CORES: usize = 2;
12
13/// Returns true if the data size warrants parallel processing.
14pub fn should_parallelize(num_elements: usize) -> bool {
15 num_elements >= PAR_THRESHOLD
16}
17
18/// Shared rayon ThreadPool.
19///
20/// Plugins in non-blocking mode each have their own data thread, so multiple
21/// plugins may submit rayon work concurrently. A single shared pool ensures
22/// work-stealing without over-subscription.
23///
24/// The pool is sized to `available_cores - RESERVED_CORES` to leave headroom
25/// for port driver data threads, autoconnect tasks, and the tokio runtime.
26/// Call [`set_num_threads`] before the first `thread_pool()` access to override.
27#[cfg(feature = "parallel")]
28static POOL: OnceLock<rayon::ThreadPool> = OnceLock::new();
29
30/// User-specified thread count override. 0 means "use default formula".
31#[cfg(feature = "parallel")]
32static NUM_THREADS_OVERRIDE: AtomicUsize = AtomicUsize::new(0);
33
34/// Set the number of rayon worker threads before the pool is first used.
35///
36/// Must be called before any plugin processes an array. Has no effect if the
37/// pool has already been initialized.
38#[cfg(feature = "parallel")]
39pub fn set_num_threads(n: usize) {
40 NUM_THREADS_OVERRIDE.store(n, Ordering::Relaxed);
41}
42
43#[cfg(feature = "parallel")]
44pub fn thread_pool() -> &'static rayon::ThreadPool {
45 POOL.get_or_init(|| {
46 let user = NUM_THREADS_OVERRIDE.load(Ordering::Relaxed);
47 let num_threads = if user > 0 {
48 user
49 } else {
50 let available = std::thread::available_parallelism()
51 .map(|n| n.get())
52 .unwrap_or(1);
53 available.saturating_sub(RESERVED_CORES).max(1)
54 };
55 rayon::ThreadPoolBuilder::new()
56 .num_threads(num_threads)
57 .build()
58 .expect("failed to create rayon thread pool")
59 })
60}