Skip to main content

ad_plugins/
par_util.rs

1#[cfg(feature = "parallel")]
2use std::sync::OnceLock;
3#[cfg(feature = "parallel")]
4use std::sync::atomic::{AtomicUsize, Ordering};
5
/// Smallest element count for which parallel processing is considered
/// worthwhile; below this, rayon's overhead is not justified.
///
/// The bound is inclusive: exactly `PAR_THRESHOLD` elements already qualifies.
pub const PAR_THRESHOLD: usize = 4096;
8
/// Number of CPU cores left free for driver threads, the tokio runtime, etc.
///
/// By default the rayon pool is sized to `available_cores - RESERVED_CORES`
/// threads, and never fewer than 1.
const RESERVED_CORES: usize = 2;
12
13/// Returns true if the data size warrants parallel processing.
14pub fn should_parallelize(num_elements: usize) -> bool {
15    num_elements >= PAR_THRESHOLD
16}
17
/// Process-wide rayon thread pool, created lazily on first use.
///
/// Plugins in non-blocking mode each have their own data thread, so several
/// plugins may submit rayon work at the same time. Sharing a single pool
/// gives them work-stealing without over-subscription.
///
/// By default the pool has `available_cores - RESERVED_CORES` threads, leaving
/// headroom for port driver data threads, autoconnect tasks, and the tokio
/// runtime. Call [`set_num_threads`] before the first `thread_pool()` access
/// to override that size.
#[cfg(feature = "parallel")]
static POOL: OnceLock<rayon::ThreadPool> = OnceLock::new();
29
/// Thread count requested through [`set_num_threads`].
///
/// A value of 0 means "no override: use the default formula".
#[cfg(feature = "parallel")]
static NUM_THREADS_OVERRIDE: AtomicUsize = AtomicUsize::new(0);
33
/// Overrides the number of rayon worker threads used by the shared pool.
///
/// The override is only read when the pool is built, so this must be called
/// before any plugin processes an array. Once the pool has been initialized
/// the call has no effect.
///
/// Passing `0` selects the default sizing formula.
#[cfg(feature = "parallel")]
pub fn set_num_threads(n: usize) {
    // Relaxed: the override is a standalone value; no other memory is
    // published through it.
    NUM_THREADS_OVERRIDE.store(n, Ordering::Relaxed);
}
42
/// Returns the shared rayon thread pool, building it on the first call.
///
/// The thread count is decided once, when the pool is built:
/// - the value passed to [`set_num_threads`], if it is non-zero;
/// - otherwise the available parallelism minus `RESERVED_CORES`, but never
///   fewer than 1. If the available parallelism cannot be queried it is
///   treated as 1.
///
/// Every later call returns the same pool, whatever the override is by then.
///
/// # Panics
///
/// Panics if rayon fails to build the thread pool.
#[cfg(feature = "parallel")]
pub fn thread_pool() -> &'static rayon::ThreadPool {
    POOL.get_or_init(|| {
        let requested = NUM_THREADS_OVERRIDE.load(Ordering::Relaxed);
        let num_threads = if requested > 0 {
            requested
        } else {
            let available = std::thread::available_parallelism()
                .map_or(1, |cores| cores.get());
            // saturating_sub keeps machines with <= RESERVED_CORES cores from
            // underflowing; max(1) guarantees at least one worker.
            available.saturating_sub(RESERVED_CORES).max(1)
        };
        rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build()
            .expect("failed to create rayon thread pool")
    })
}