Skip to main content

hyperi_rustlib/worker/
pool.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/pool.rs
3// Purpose:   Rayon pool + semaphore management, process_batch(), fan_out_async()
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use std::sync::Arc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::time::Instant;
12
13use parking_lot::RwLock;
14use rayon::ThreadPool;
15
16use super::config::WorkerPoolConfig;
17
18/// Adaptive worker pool with hybrid rayon (CPU) + tokio (async I/O) execution.
19///
20/// Provides two APIs:
21/// - [`process_batch`](Self::process_batch) -- CPU-bound work via rayon
22///   (JSON parsing, transforms, compression, CEL evaluation)
23/// - [`fan_out_async`](Self::fan_out_async) -- async I/O via tokio
24///   (enrichment, external APIs, storage writes)
25///
26/// The pool auto-scales active threads based on CPU/memory pressure using
27/// watermark bands. All thresholds are config-cascade overridable and emitted
28/// as gauge metrics.
29pub struct AdaptiveWorkerPool {
30    pub(crate) config: Arc<RwLock<WorkerPoolConfig>>,
31    rayon_pool: ThreadPool,
32    pub(crate) semaphore: Arc<Semaphore>,
33    #[cfg(feature = "memory")]
34    pub(crate) memory_guard: parking_lot::Mutex<Option<Arc<crate::memory::MemoryGuard>>>,
35    #[cfg(feature = "scaling")]
36    pub(crate) scaling_pressure: parking_lot::Mutex<Option<Arc<crate::scaling::ScalingPressure>>>,
37}
38
39/// Counting semaphore for throttling rayon thread usage.
40///
41/// Rayon pools cannot be resized, so we use a semaphore to control how many
42/// threads actively pick up work. Threads that cannot acquire a permit sleep
43/// on [`std::thread::yield_now`].
44pub(crate) struct Semaphore {
45    permits: AtomicUsize,
46    max_permits: usize,
47}
48
49impl Semaphore {
50    fn new(initial_permits: usize, max_permits: usize) -> Self {
51        Self {
52            permits: AtomicUsize::new(initial_permits),
53            max_permits,
54        }
55    }
56
57    /// Acquire a permit (blocking). Returns a guard that releases on drop.
58    fn acquire(&self) -> SemaphoreGuard<'_> {
59        let start = Instant::now();
60        loop {
61            let current = self.permits.load(Ordering::Acquire);
62            if current > 0
63                && self
64                    .permits
65                    .compare_exchange_weak(
66                        current,
67                        current - 1,
68                        Ordering::AcqRel,
69                        Ordering::Relaxed,
70                    )
71                    .is_ok()
72            {
73                return SemaphoreGuard {
74                    semaphore: self,
75                    wait_duration: start.elapsed(),
76                };
77            }
78            std::thread::yield_now();
79        }
80    }
81
82    /// Set the number of available permits (called by scaler).
83    pub(crate) fn set_permits(&self, count: usize) {
84        let clamped = count.min(self.max_permits);
85        self.permits.store(clamped, Ordering::Release);
86    }
87
88    /// Current number of available (unacquired) permits.
89    pub(crate) fn available_permits(&self) -> usize {
90        self.permits.load(Ordering::Relaxed)
91    }
92}
93
94struct SemaphoreGuard<'a> {
95    semaphore: &'a Semaphore,
96    #[allow(dead_code)]
97    wait_duration: std::time::Duration,
98}
99
100impl Drop for SemaphoreGuard<'_> {
101    fn drop(&mut self) {
102        // Release the permit, but never let the available count exceed
103        // the current cap. `set_permits` (called by the scaler when
104        // shrinking) writes a smaller value while N guards may still be
105        // outstanding; if each unconditionally added 1 on drop, the
106        // post-drain `available` would overshoot the new cap and admit
107        // too much work.
108        //
109        // CAS loop reads the current available count, adds 1 clamped to
110        // max_permits, and retries on contention with another dropping
111        // guard.
112        let max = self.semaphore.max_permits;
113        let mut cur = self.semaphore.permits.load(Ordering::Acquire);
114        loop {
115            let new = (cur + 1).min(max);
116            match self.semaphore.permits.compare_exchange_weak(
117                cur,
118                new,
119                Ordering::AcqRel,
120                Ordering::Acquire,
121            ) {
122                Ok(_) => return,
123                Err(actual) => cur = actual,
124            }
125        }
126    }
127}
128
129impl AdaptiveWorkerPool {
130    /// Create a new worker pool with the given configuration.
131    ///
132    /// Resolves `max_threads = 0` to the detected CPU count.
133    /// Creates a fixed rayon thread pool and a semaphore starting at `min_threads`.
134    #[must_use]
135    pub fn new(config: WorkerPoolConfig) -> Self {
136        let mut resolved = config;
137        resolved.resolve_max_threads();
138
139        let max_threads = resolved.max_threads;
140        let min_threads = resolved.min_threads;
141
142        let rayon_pool = rayon::ThreadPoolBuilder::new()
143            .num_threads(max_threads)
144            .thread_name(|i| format!("worker-{i}"))
145            .build()
146            .expect("Failed to create rayon thread pool");
147
148        let semaphore = Arc::new(Semaphore::new(min_threads, max_threads));
149
150        Self {
151            config: Arc::new(RwLock::new(resolved)),
152            rayon_pool,
153            semaphore,
154            #[cfg(feature = "memory")]
155            memory_guard: parking_lot::Mutex::new(None),
156            #[cfg(feature = "scaling")]
157            scaling_pressure: parking_lot::Mutex::new(None),
158        }
159    }
160
161    /// Create a new worker pool from the config cascade.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the config cascade is not initialised or validation fails.
166    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
167        let config = WorkerPoolConfig::from_cascade(key)?;
168        Ok(Self::new(config))
169    }
170
171    /// Process a batch of items in parallel using rayon (CPU-bound work).
172    ///
173    /// Each item is processed by the provided closure on a rayon worker thread.
174    /// A semaphore limits how many threads are active simultaneously (controlled
175    /// by the scaling controller). Results are returned in input order.
176    ///
177    /// Use this for: JSON parsing, transforms, compression, CEL evaluation, routing.
178    /// Do NOT use for work that needs `.await` -- use [`fan_out_async`](Self::fan_out_async).
179    pub fn process_batch<T, R, E, F>(&self, items: &[T], f: F) -> Vec<Result<R, E>>
180    where
181        T: Sync,
182        R: Send,
183        E: Send,
184        F: Fn(&T) -> Result<R, E> + Sync,
185    {
186        let sem = &self.semaphore;
187        self.rayon_pool.install(|| {
188            use rayon::prelude::*;
189            items
190                .par_iter()
191                .map(|item| {
192                    let _permit = sem.acquire();
193                    f(item)
194                })
195                .collect()
196        })
197    }
198
199    /// Fan out async work across tokio tasks with bounded concurrency.
200    ///
201    /// Each item is processed by the provided async closure on a tokio task.
202    /// Concurrency is limited by `async_concurrency` config.
203    ///
204    /// # Return contract
205    ///
206    /// The returned `Vec` has the same length as `items` and entries
207    /// correspond by index (input-order preserved):
208    ///
209    /// - `Some(Ok(r))` -- task completed successfully with result `r`
210    /// - `Some(Err(e))` -- task returned `Err(e)`
211    /// - `None` -- task panicked; the panic was logged at `error` level
212    ///   with the input index. The wrapping `Option` exists so the
213    ///   panic doesn't silently shorten the result vector (which was
214    ///   the previous behaviour and violated the input-order contract).
215    ///
216    /// Use this for: enrichment lookups, external API calls, storage writes.
217    pub async fn fan_out_async<T, R, E, F, Fut>(
218        &self,
219        items: &[T],
220        f: F,
221    ) -> Vec<Option<Result<R, E>>>
222    where
223        T: Sync + Send,
224        R: Send + 'static,
225        E: Send + 'static,
226        F: Fn(&T) -> Fut + Send + Sync,
227        Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
228    {
229        let concurrency = self.config.read().async_concurrency;
230        let mut results: Vec<Option<Result<R, E>>> = (0..items.len()).map(|_| None).collect();
231
232        // Process in chunks of `concurrency` to limit in-flight tasks
233        for chunk_start in (0..items.len()).step_by(concurrency) {
234            let chunk_end = (chunk_start + concurrency).min(items.len());
235            let mut handles = Vec::with_capacity(chunk_end - chunk_start);
236
237            for (idx, item) in items
238                .iter()
239                .enumerate()
240                .skip(chunk_start)
241                .take(chunk_end - chunk_start)
242            {
243                let fut = f(item);
244                handles.push((idx, tokio::spawn(fut)));
245            }
246
247            for (idx, handle) in handles {
248                match handle.await {
249                    Ok(result) => results[idx] = Some(result),
250                    Err(join_err) => {
251                        // Leave results[idx] = None; caller can detect
252                        // the panic without shrinking the output vec.
253                        tracing::error!(error = %join_err, idx, "fan_out_async task panicked");
254                    }
255                }
256            }
257        }
258
259        results
260    }
261
262    /// Execute a closure on the rayon thread pool.
263    ///
264    /// Provides direct access to the rayon pool for operations that need
265    /// `par_iter_mut` or other rayon primitives not covered by `process_batch`.
266    /// The semaphore is NOT applied -- callers manage their own concurrency.
267    ///
268    /// Used by `BatchEngine` for the mutable transform phase.
269    pub fn install<R: Send>(&self, f: impl FnOnce() -> R + Send) -> R {
270        self.rayon_pool.install(f)
271    }
272
273    /// Register worker pool metrics with the `MetricsManager`.
274    ///
275    /// Registers operational metrics and emits threshold gauges with current values.
276    /// Call this once during startup after creating the pool.
277    pub fn register_metrics(&self, manager: &crate::metrics::MetricsManager) {
278        let config = self.config.read();
279        super::metrics::register(manager, &config);
280    }
281
282    /// Start the background scaling controller.
283    ///
284    /// The controller samples CPU/memory every `scale_interval_secs` and adjusts
285    /// the semaphore permits based on watermark bands. Stops on cancellation.
286    pub fn start_scaling_loop(self: &Arc<Self>, cancel: tokio_util::sync::CancellationToken) {
287        let controller = super::scaler::ScalingController::new(self.clone());
288        tokio::spawn(controller.run(cancel));
289    }
290
291    /// Attach a `MemoryGuard` for dual-source memory pressure reading.
292    #[cfg(feature = "memory")]
293    pub fn set_memory_guard(&self, guard: Arc<crate::memory::MemoryGuard>) {
294        *self.memory_guard.lock() = Some(guard);
295    }
296
297    /// Attach a `ScalingPressure` for bidirectional pressure integration.
298    #[cfg(feature = "scaling")]
299    pub fn set_scaling_pressure(&self, pressure: Arc<crate::scaling::ScalingPressure>) {
300        *self.scaling_pressure.lock() = Some(pressure);
301    }
302
303    /// Current number of active worker threads (permits in use).
304    #[must_use]
305    pub fn active_threads(&self) -> usize {
306        let cfg = self.config.read();
307        cfg.max_threads
308            .saturating_sub(self.semaphore.available_permits())
309    }
310
311    /// Maximum thread count (pool size).
312    #[must_use]
313    pub fn max_threads(&self) -> usize {
314        self.config.read().max_threads
315    }
316}