hyperi_rustlib/worker/pool.rs
1// Project: hyperi-rustlib
2// File: src/worker/pool.rs
3// Purpose: Rayon pool + semaphore management, process_batch(), fan_out_async()
4// Language: Rust
5//
6// License: FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use std::sync::Arc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::time::Instant;
12
13use parking_lot::RwLock;
14use rayon::ThreadPool;
15
16use super::config::WorkerPoolConfig;
17
18/// Adaptive worker pool with hybrid rayon (CPU) + tokio (async I/O) execution.
19///
20/// Provides two APIs:
21/// - [`process_batch`](Self::process_batch) -- CPU-bound work via rayon
22/// (JSON parsing, transforms, compression, CEL evaluation)
23/// - [`fan_out_async`](Self::fan_out_async) -- async I/O via tokio
24/// (enrichment, external APIs, storage writes)
25///
26/// The pool auto-scales active threads based on CPU/memory pressure using
27/// watermark bands. All thresholds are config-cascade overridable and emitted
28/// as gauge metrics.
29pub struct AdaptiveWorkerPool {
30 pub(crate) config: Arc<RwLock<WorkerPoolConfig>>,
31 rayon_pool: ThreadPool,
32 pub(crate) semaphore: Arc<Semaphore>,
33 #[cfg(feature = "memory")]
34 pub(crate) memory_guard: parking_lot::Mutex<Option<Arc<crate::memory::MemoryGuard>>>,
35 #[cfg(feature = "scaling")]
36 pub(crate) scaling_pressure: parking_lot::Mutex<Option<Arc<crate::scaling::ScalingPressure>>>,
37}
38
/// Counting semaphore that throttles how many rayon workers run work at once.
///
/// Rayon pools cannot be resized, so the pool is built at `max_threads` and this
/// semaphore caps how many of those threads may hold a permit simultaneously.
///
/// State is kept as two counters rather than a single "available" count:
///
/// - `limit` -- the current cap, written by the scaler via `set_permits`
/// - `in_use` -- permits currently held by live [`SemaphoreGuard`]s
///
/// Keeping them separate means that lowering the cap while guards are
/// outstanding sticks: returning a permit only decrements `in_use`, so the
/// scaler's shrink is not undone as the outstanding guards drain.
///
/// A thread that cannot get a permit first calls `std::thread::yield_now` a
/// bounded number of times, then sleeps with exponential backoff (capped at
/// 1 ms). Throttled threads therefore stop consuming CPU instead of spinning.
pub(crate) struct Semaphore {
    /// Permits currently held by outstanding guards.
    in_use: AtomicUsize,
    /// Current cap on simultaneously held permits (always `<= max_permits`).
    limit: AtomicUsize,
    /// Hard upper bound for `limit` (the rayon pool size).
    max_permits: usize,
}

impl Semaphore {
    /// Failed attempts that only `yield_now` before the waiter starts sleeping.
    const SPIN_LIMIT: u32 = 64;
    /// First sleep once yielding gives up.
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_micros(10);
    /// Longest single sleep; bounds how late a waiter notices a freed permit.
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_millis(1);

    /// Create a semaphore whose cap starts at `initial_permits`, clamped to `max_permits`.
    fn new(initial_permits: usize, max_permits: usize) -> Self {
        Self {
            in_use: AtomicUsize::new(0),
            limit: AtomicUsize::new(initial_permits.min(max_permits)),
            max_permits,
        }
    }

    /// Try to take one permit without waiting. Returns `true` on success.
    fn try_acquire(&self) -> bool {
        let limit = self.limit.load(Ordering::Acquire);
        let mut held = self.in_use.load(Ordering::Acquire);
        while held < limit {
            match self.in_use.compare_exchange_weak(
                held,
                held + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                // Lost a race (or a spurious failure): retry against the fresh count.
                Err(actual) => held = actual,
            }
        }
        false
    }

    /// Acquire a permit, blocking until one is free. Returns a guard that
    /// releases the permit on drop.
    ///
    /// Blocks indefinitely while the cap is 0.
    fn acquire(&self) -> SemaphoreGuard<'_> {
        let start = Instant::now();
        let mut spins = 0_u32;
        let mut backoff = Self::INITIAL_BACKOFF;
        while !self.try_acquire() {
            if spins < Self::SPIN_LIMIT {
                spins += 1;
                std::thread::yield_now();
            } else {
                std::thread::sleep(backoff);
                backoff = (backoff * 2).min(Self::MAX_BACKOFF);
            }
        }
        SemaphoreGuard {
            semaphore: self,
            wait_duration: start.elapsed(),
        }
    }

    /// Set the cap on simultaneously held permits (called by the scaler).
    ///
    /// The value is clamped to `max_permits`. Lowering the cap below the number
    /// of permits currently held does not revoke them. New acquisitions wait
    /// until enough holders drop their guards.
    pub(crate) fn set_permits(&self, count: usize) {
        self.limit
            .store(count.min(self.max_permits), Ordering::Release);
    }

    /// Number of permits that could be acquired right now: cap minus held,
    /// saturating at 0.
    ///
    /// The two counters are read separately, so this is a snapshot for
    /// metrics and scaling heuristics, not a synchronization primitive.
    pub(crate) fn available_permits(&self) -> usize {
        self.limit
            .load(Ordering::Relaxed)
            .saturating_sub(self.in_use.load(Ordering::Relaxed))
    }
}

/// Permit returned by [`Semaphore::acquire`]; the permit is released on drop.
struct SemaphoreGuard<'a> {
    semaphore: &'a Semaphore,
    /// Time spent waiting for this permit.
    #[allow(dead_code)] // not read anywhere yet; retained for wait-time instrumentation
    wait_duration: std::time::Duration,
}

impl Drop for SemaphoreGuard<'_> {
    fn drop(&mut self) {
        // Returning a permit only decrements the held count. The cap (`limit`)
        // is untouched, so a shrink applied by `set_permits` while this guard
        // was outstanding stays in effect once the guard drains. No clamping is
        // needed: `in_use` was incremented exactly once for this guard.
        self.semaphore.in_use.fetch_sub(1, Ordering::AcqRel);
    }
}
128
129impl AdaptiveWorkerPool {
130 /// Create a new worker pool with the given configuration.
131 ///
132 /// Resolves `max_threads = 0` to the detected CPU count.
133 /// Creates a fixed rayon thread pool and a semaphore starting at `min_threads`.
134 #[must_use]
135 pub fn new(config: WorkerPoolConfig) -> Self {
136 let mut resolved = config;
137 resolved.resolve_max_threads();
138
139 let max_threads = resolved.max_threads;
140 let min_threads = resolved.min_threads;
141
142 let rayon_pool = rayon::ThreadPoolBuilder::new()
143 .num_threads(max_threads)
144 .thread_name(|i| format!("worker-{i}"))
145 .build()
146 .expect("Failed to create rayon thread pool");
147
148 let semaphore = Arc::new(Semaphore::new(min_threads, max_threads));
149
150 Self {
151 config: Arc::new(RwLock::new(resolved)),
152 rayon_pool,
153 semaphore,
154 #[cfg(feature = "memory")]
155 memory_guard: parking_lot::Mutex::new(None),
156 #[cfg(feature = "scaling")]
157 scaling_pressure: parking_lot::Mutex::new(None),
158 }
159 }
160
161 /// Create a new worker pool from the config cascade.
162 ///
163 /// # Errors
164 ///
165 /// Returns an error if the config cascade is not initialised or validation fails.
166 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
167 let config = WorkerPoolConfig::from_cascade(key)?;
168 Ok(Self::new(config))
169 }
170
171 /// Process a batch of items in parallel using rayon (CPU-bound work).
172 ///
173 /// Each item is processed by the provided closure on a rayon worker thread.
174 /// A semaphore limits how many threads are active simultaneously (controlled
175 /// by the scaling controller). Results are returned in input order.
176 ///
177 /// Use this for: JSON parsing, transforms, compression, CEL evaluation, routing.
178 /// Do NOT use for work that needs `.await` -- use [`fan_out_async`](Self::fan_out_async).
179 pub fn process_batch<T, R, E, F>(&self, items: &[T], f: F) -> Vec<Result<R, E>>
180 where
181 T: Sync,
182 R: Send,
183 E: Send,
184 F: Fn(&T) -> Result<R, E> + Sync,
185 {
186 let sem = &self.semaphore;
187 self.rayon_pool.install(|| {
188 use rayon::prelude::*;
189 items
190 .par_iter()
191 .map(|item| {
192 let _permit = sem.acquire();
193 f(item)
194 })
195 .collect()
196 })
197 }
198
199 /// Fan out async work across tokio tasks with bounded concurrency.
200 ///
201 /// Each item is processed by the provided async closure on a tokio task.
202 /// Concurrency is limited by `async_concurrency` config.
203 ///
204 /// # Return contract
205 ///
206 /// The returned `Vec` has the same length as `items` and entries
207 /// correspond by index (input-order preserved):
208 ///
209 /// - `Some(Ok(r))` -- task completed successfully with result `r`
210 /// - `Some(Err(e))` -- task returned `Err(e)`
211 /// - `None` -- task panicked; the panic was logged at `error` level
212 /// with the input index. The wrapping `Option` exists so the
213 /// panic doesn't silently shorten the result vector (which was
214 /// the previous behaviour and violated the input-order contract).
215 ///
216 /// Use this for: enrichment lookups, external API calls, storage writes.
217 pub async fn fan_out_async<T, R, E, F, Fut>(
218 &self,
219 items: &[T],
220 f: F,
221 ) -> Vec<Option<Result<R, E>>>
222 where
223 T: Sync + Send,
224 R: Send + 'static,
225 E: Send + 'static,
226 F: Fn(&T) -> Fut + Send + Sync,
227 Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
228 {
229 let concurrency = self.config.read().async_concurrency;
230 let mut results: Vec<Option<Result<R, E>>> = (0..items.len()).map(|_| None).collect();
231
232 // Process in chunks of `concurrency` to limit in-flight tasks
233 for chunk_start in (0..items.len()).step_by(concurrency) {
234 let chunk_end = (chunk_start + concurrency).min(items.len());
235 let mut handles = Vec::with_capacity(chunk_end - chunk_start);
236
237 for (idx, item) in items
238 .iter()
239 .enumerate()
240 .skip(chunk_start)
241 .take(chunk_end - chunk_start)
242 {
243 let fut = f(item);
244 handles.push((idx, tokio::spawn(fut)));
245 }
246
247 for (idx, handle) in handles {
248 match handle.await {
249 Ok(result) => results[idx] = Some(result),
250 Err(join_err) => {
251 // Leave results[idx] = None; caller can detect
252 // the panic without shrinking the output vec.
253 tracing::error!(error = %join_err, idx, "fan_out_async task panicked");
254 }
255 }
256 }
257 }
258
259 results
260 }
261
262 /// Execute a closure on the rayon thread pool.
263 ///
264 /// Provides direct access to the rayon pool for operations that need
265 /// `par_iter_mut` or other rayon primitives not covered by `process_batch`.
266 /// The semaphore is NOT applied -- callers manage their own concurrency.
267 ///
268 /// Used by `BatchEngine` for the mutable transform phase.
269 pub fn install<R: Send>(&self, f: impl FnOnce() -> R + Send) -> R {
270 self.rayon_pool.install(f)
271 }
272
273 /// Register worker pool metrics with the `MetricsManager`.
274 ///
275 /// Registers operational metrics and emits threshold gauges with current values.
276 /// Call this once during startup after creating the pool.
277 pub fn register_metrics(&self, manager: &crate::metrics::MetricsManager) {
278 let config = self.config.read();
279 super::metrics::register(manager, &config);
280 }
281
282 /// Start the background scaling controller.
283 ///
284 /// The controller samples CPU/memory every `scale_interval_secs` and adjusts
285 /// the semaphore permits based on watermark bands. Stops on cancellation.
286 pub fn start_scaling_loop(self: &Arc<Self>, cancel: tokio_util::sync::CancellationToken) {
287 let controller = super::scaler::ScalingController::new(self.clone());
288 tokio::spawn(controller.run(cancel));
289 }
290
291 /// Attach a `MemoryGuard` for dual-source memory pressure reading.
292 #[cfg(feature = "memory")]
293 pub fn set_memory_guard(&self, guard: Arc<crate::memory::MemoryGuard>) {
294 *self.memory_guard.lock() = Some(guard);
295 }
296
297 /// Attach a `ScalingPressure` for bidirectional pressure integration.
298 #[cfg(feature = "scaling")]
299 pub fn set_scaling_pressure(&self, pressure: Arc<crate::scaling::ScalingPressure>) {
300 *self.scaling_pressure.lock() = Some(pressure);
301 }
302
303 /// Current number of active worker threads (permits in use).
304 #[must_use]
305 pub fn active_threads(&self) -> usize {
306 let cfg = self.config.read();
307 cfg.max_threads
308 .saturating_sub(self.semaphore.available_permits())
309 }
310
311 /// Maximum thread count (pool size).
312 #[must_use]
313 pub fn max_threads(&self) -> usize {
314 self.config.read().max_threads
315 }
316}