hyperi_rustlib/worker/pool.rs
// Project: hyperi-rustlib
// File: src/worker/pool.rs
// Purpose: Rayon pool + semaphore management, process_batch(), map_owned(),
//          fan_out_async(), fan_out_async_with_policy()
// Language: Rust
//
// License: BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use std::sync::Arc;
10
11use parking_lot::{Condvar, Mutex, RwLock};
12use rayon::ThreadPool;
13
14use super::config::WorkerPoolConfig;
15
16/// Adaptive worker pool with hybrid rayon (CPU) + tokio (async I/O) execution.
17///
18/// Provides two APIs:
19/// - [`process_batch`](Self::process_batch) -- CPU-bound work via rayon
20/// (JSON parsing, transforms, compression, CEL evaluation)
21/// - [`fan_out_async`](Self::fan_out_async) -- async I/O via tokio
22/// (enrichment, external APIs, storage writes)
23///
24/// The pool auto-scales active threads based on CPU/memory pressure using
25/// watermark bands. All thresholds are config-cascade overridable and emitted
26/// as gauge metrics.
27pub struct AdaptiveWorkerPool {
28 pub(crate) config: Arc<RwLock<WorkerPoolConfig>>,
29 rayon_pool: ThreadPool,
30 pub(crate) semaphore: Arc<Semaphore>,
31 #[cfg(feature = "memory")]
32 pub(crate) memory_guard: parking_lot::Mutex<Option<Arc<crate::memory::MemoryGuard>>>,
33 #[cfg(feature = "scaling")]
34 pub(crate) scaling_pressure: parking_lot::Mutex<Option<Arc<crate::scaling::ScalingPressure>>>,
35}
36
37/// Concurrency limiter for throttling rayon thread usage.
38///
39/// Rayon pools cannot be resized, so a limiter controls how many threads
40/// actively pick up work. Models explicit `target` (the scaler's desired
41/// concurrency) and `leased` (permits currently held by in-flight work);
42/// available headroom is the derived `target - leased`. A thread that cannot
43/// lease (`leased >= target`) PARKS on a condvar -- it does not spin -- so the
44/// throttle conserves CPU exactly when the scaler is trying to.
45///
46/// Why this shape (vs the old `available`/`max_permits` atomic): the scaler
47/// sets a *target*, and `active_threads()` reports *leased* (true in-flight),
48/// so an idle pool reports zero active and a downscale cannot be undone by
49/// guard drops refilling toward `max` -- drops only decrement `leased`, and
50/// no new lease is admitted while `leased >= target`.
51pub(crate) struct Semaphore {
52 state: Mutex<SemState>,
53 /// Signalled when a permit frees or the target grows.
54 available: Condvar,
55 /// Architectural ceiling (rayon pool size); `target` never exceeds it.
56 max_permits: usize,
57}
58
struct SemState {
    /// Desired concurrency set by the scaler; always within `[1, max_permits]`.
    target: usize,
    /// Number of permits held right now by running work.
    leased: usize,
}
65
66impl Semaphore {
67 fn new(initial_target: usize, max_permits: usize) -> Self {
68 let max_permits = max_permits.max(1);
69 Self {
70 state: Mutex::new(SemState {
71 target: initial_target.clamp(1, max_permits),
72 leased: 0,
73 }),
74 available: Condvar::new(),
75 max_permits,
76 }
77 }
78
79 /// Lease a permit, parking until `leased < target`. Releases on drop.
80 fn acquire(&self) -> SemaphoreGuard<'_> {
81 let mut st = self.state.lock();
82 while st.leased >= st.target {
83 self.available.wait(&mut st);
84 }
85 st.leased += 1;
86 SemaphoreGuard { semaphore: self }
87 }
88
89 /// Set the target concurrency (called by the scaler). Clamped to
90 /// `[1, max_permits]`. Growing the target wakes parked acquirers so they
91 /// re-check; shrinking simply stops new leases until `leased` falls below
92 /// the new target -- in-flight work drains naturally.
93 pub(crate) fn set_target(&self, target: usize) {
94 let clamped = target.clamp(1, self.max_permits);
95 let mut st = self.state.lock();
96 let grew = clamped > st.target;
97 st.target = clamped;
98 drop(st);
99 if grew {
100 self.available.notify_all();
101 }
102 }
103
104 /// Current target concurrency.
105 pub(crate) fn target(&self) -> usize {
106 self.state.lock().target
107 }
108
109 /// Permits currently leased (in-flight work).
110 pub(crate) fn leased(&self) -> usize {
111 self.state.lock().leased
112 }
113
114 /// Headroom: how many more permits can be leased right now.
115 pub(crate) fn available(&self) -> usize {
116 let st = self.state.lock();
117 st.target.saturating_sub(st.leased)
118 }
119}
120
121struct SemaphoreGuard<'a> {
122 semaphore: &'a Semaphore,
123}
124
125impl Drop for SemaphoreGuard<'_> {
126 fn drop(&mut self) {
127 let mut st = self.semaphore.state.lock();
128 st.leased = st.leased.saturating_sub(1);
129 drop(st);
130 // Wake one parked acquirer; the freed permit is now leasable
131 // (subject to the current target, which it re-checks).
132 self.semaphore.available.notify_one();
133 }
134}
135
136/// Policy for [`AdaptiveWorkerPool::fan_out_async_with_policy`].
137///
138/// The plain [`fan_out_async`](AdaptiveWorkerPool::fan_out_async) helper has no
139/// timeout or cancellation, and a single hung future stalls its whole chunk. At
140/// fleet scale, user code eventually passes a future that ignores deadlines, so
141/// reusable external-I/O fan-out needs a deadline + cancellation contract.
142#[derive(Debug, Clone, Default)]
143pub struct FanOutPolicy {
144 /// Per-item timeout. `None` = no timeout (caller must bound the future).
145 pub per_item_timeout: Option<std::time::Duration>,
146 /// Cancellation token. When cancelled, no NEW items are spawned; in-flight
147 /// items are still awaited. Remaining unspawned items report `Cancelled`.
148 pub cancel: Option<tokio_util::sync::CancellationToken>,
149}
150
/// Outcome for one input item of
/// [`AdaptiveWorkerPool::fan_out_async_with_policy`], in input order.
#[derive(Debug)]
pub enum FanOutResult<R, E> {
    /// The item's future resolved to `Ok`; carries the value.
    Ok(R),
    /// The item's future resolved to `Err`; carries the error.
    Err(E),
    /// The future did not finish within `per_item_timeout`.
    TimedOut,
    /// The spawned task panicked; the panic is logged at `error` level.
    Panicked,
    /// The item was never started because the cancellation token fired first.
    Cancelled,
}
165
166impl AdaptiveWorkerPool {
167 /// Create a new worker pool with the given configuration, validating it.
168 ///
169 /// Resolves `max_threads = 0` to the detected CPU count, then validates the
170 /// resolved config (rejects `min_threads > max_threads`, bad watermark
171 /// ordering, zero `async_concurrency`, etc.) before building. This prevents
172 /// invalid runtime state that would otherwise panic later in the scaler's
173 /// `clamp(min, max)`.
174 ///
175 /// # Errors
176 ///
177 /// Returns `ConfigError` if the resolved config is invalid.
178 pub fn try_new(config: WorkerPoolConfig) -> Result<Self, crate::config::ConfigError> {
179 let mut resolved = config;
180 resolved.resolve_max_threads();
181 resolved.validate()?;
182 Ok(Self::build(resolved))
183 }
184
185 /// Create a new worker pool with the given configuration.
186 ///
187 /// Resolves `max_threads = 0` to the detected CPU count.
188 ///
189 /// # Panics
190 ///
191 /// Panics immediately with a clear message if the config is invalid (e.g.
192 /// `min_threads > max_threads`). Use [`try_new`](Self::try_new) for fallible
193 /// construction from untrusted config.
194 #[must_use]
195 pub fn new(config: WorkerPoolConfig) -> Self {
196 Self::try_new(config).expect("invalid WorkerPoolConfig (use try_new to handle the error)")
197 }
198
199 /// Build the pool from an already-resolved, validated config.
200 fn build(resolved: WorkerPoolConfig) -> Self {
201 let max_threads = resolved.max_threads;
202 let min_threads = resolved.min_threads;
203
204 let rayon_pool = rayon::ThreadPoolBuilder::new()
205 .num_threads(max_threads)
206 .thread_name(|i| format!("worker-{i}"))
207 .build()
208 .expect("Failed to create rayon thread pool");
209
210 let semaphore = Arc::new(Semaphore::new(min_threads, max_threads));
211
212 Self {
213 config: Arc::new(RwLock::new(resolved)),
214 rayon_pool,
215 semaphore,
216 #[cfg(feature = "memory")]
217 memory_guard: parking_lot::Mutex::new(None),
218 #[cfg(feature = "scaling")]
219 scaling_pressure: parking_lot::Mutex::new(None),
220 }
221 }
222
223 /// Create a new worker pool from the config cascade.
224 ///
225 /// # Errors
226 ///
227 /// Returns an error if the config cascade is not initialised or validation fails.
228 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
229 let config = WorkerPoolConfig::from_cascade(key)?;
230 Self::try_new(config)
231 }
232
233 /// Process a batch of items in parallel using rayon (CPU-bound work).
234 ///
235 /// Each item is processed by the provided closure on a rayon worker thread.
236 /// A semaphore limits how many threads are active simultaneously (controlled
237 /// by the scaling controller). Results are returned in input order.
238 ///
239 /// Use this for: JSON parsing, transforms, compression, CEL evaluation, routing.
240 /// Do NOT use for work that needs `.await` -- use [`fan_out_async`](Self::fan_out_async).
241 pub fn process_batch<T, R, E, F>(&self, items: &[T], f: F) -> Vec<Result<R, E>>
242 where
243 T: Sync,
244 R: Send,
245 E: Send,
246 F: Fn(&T) -> Result<R, E> + Sync,
247 {
248 let sem = &self.semaphore;
249 self.rayon_pool.install(|| {
250 use rayon::prelude::*;
251 items
252 .par_iter()
253 .map(|item| {
254 let _permit = sem.acquire();
255 f(item)
256 })
257 .collect()
258 })
259 }
260
261 /// Fan out async work across tokio tasks with bounded concurrency.
262 ///
263 /// Each item is processed by the provided async closure on a tokio task.
264 /// Concurrency is limited by `async_concurrency` config.
265 ///
266 /// # Return contract
267 ///
268 /// The returned `Vec` has the same length as `items` and entries
269 /// correspond by index (input-order preserved):
270 ///
271 /// - `Some(Ok(r))` -- task completed successfully with result `r`
272 /// - `Some(Err(e))` -- task returned `Err(e)`
273 /// - `None` -- task panicked; the panic was logged at `error` level
274 /// with the input index. The wrapping `Option` exists so the
275 /// panic doesn't silently shorten the result vector (which was
276 /// the previous behaviour and violated the input-order contract).
277 ///
278 /// Use this for: enrichment lookups, external API calls, storage writes.
279 pub async fn fan_out_async<T, R, E, F, Fut>(
280 &self,
281 items: &[T],
282 f: F,
283 ) -> Vec<Option<Result<R, E>>>
284 where
285 T: Sync + Send,
286 R: Send + 'static,
287 E: Send + 'static,
288 F: Fn(&T) -> Fut + Send + Sync,
289 Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
290 {
291 let concurrency = self.config.read().async_concurrency;
292 let mut results: Vec<Option<Result<R, E>>> = (0..items.len()).map(|_| None).collect();
293
294 // Process in chunks of `concurrency` to limit in-flight tasks
295 for chunk_start in (0..items.len()).step_by(concurrency) {
296 let chunk_end = (chunk_start + concurrency).min(items.len());
297 let mut handles = Vec::with_capacity(chunk_end - chunk_start);
298
299 for (idx, item) in items
300 .iter()
301 .enumerate()
302 .skip(chunk_start)
303 .take(chunk_end - chunk_start)
304 {
305 let fut = f(item);
306 handles.push((idx, tokio::spawn(fut)));
307 }
308
309 for (idx, handle) in handles {
310 match handle.await {
311 Ok(result) => results[idx] = Some(result),
312 Err(join_err) => {
313 // Leave results[idx] = None; caller can detect
314 // the panic without shrinking the output vec.
315 tracing::error!(error = %join_err, idx, "fan_out_async task panicked");
316 }
317 }
318 }
319 }
320
321 results
322 }
323
324 /// Concurrency-bounded async fan-out with per-item timeout + cancellation.
325 ///
326 /// Unlike [`fan_out_async`](Self::fan_out_async), this streams completions
327 /// (a `JoinSet`) so a slow item never blocks faster ones, applies an
328 /// optional per-item timeout, and stops spawning new work when the policy's
329 /// cancellation token fires. Results are returned in input order; each is a
330 /// [`FanOutResult`]. Emits `dfe_fanout_*` metrics (in-flight gauge, timeout/
331 /// panic counters, batch-duration histogram) when the `metrics` feature is on.
332 pub async fn fan_out_async_with_policy<T, R, E, F, Fut>(
333 &self,
334 items: &[T],
335 policy: &FanOutPolicy,
336 f: F,
337 ) -> Vec<FanOutResult<R, E>>
338 where
339 T: Sync + Send,
340 R: Send + 'static,
341 E: Send + 'static,
342 F: Fn(&T) -> Fut + Send + Sync,
343 Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
344 {
345 let concurrency = self.config.read().async_concurrency.max(1);
346 let mut results: Vec<FanOutResult<R, E>> =
347 (0..items.len()).map(|_| FanOutResult::Cancelled).collect();
348
349 #[cfg(feature = "metrics")]
350 let started = std::time::Instant::now();
351
352 let mut set: tokio::task::JoinSet<(usize, FanOutResult<R, E>)> =
353 tokio::task::JoinSet::new();
354 // task Id -> input idx, so a panicked task (JoinError carries no payload)
355 // still maps back to its slot.
356 let mut id_to_idx: std::collections::HashMap<tokio::task::Id, usize> =
357 std::collections::HashMap::new();
358 let mut next = 0;
359 let cancelled = || policy.cancel.as_ref().is_some_and(|c| c.is_cancelled());
360
361 loop {
362 // Seed up to the concurrency limit (unless cancelled).
363 while set.len() < concurrency && next < items.len() && !cancelled() {
364 let fut = f(&items[next]);
365 let timeout = policy.per_item_timeout;
366 let idx = next;
367 let handle = set.spawn(async move {
368 let outcome = match timeout {
369 Some(d) => match tokio::time::timeout(d, fut).await {
370 Ok(Ok(r)) => FanOutResult::Ok(r),
371 Ok(Err(e)) => FanOutResult::Err(e),
372 Err(_) => FanOutResult::TimedOut,
373 },
374 None => match fut.await {
375 Ok(r) => FanOutResult::Ok(r),
376 Err(e) => FanOutResult::Err(e),
377 },
378 };
379 (idx, outcome)
380 });
381 id_to_idx.insert(handle.id(), idx);
382 next += 1;
383 }
384
385 #[cfg(feature = "metrics")]
386 ::metrics::gauge!("dfe_fanout_inflight").set(set.len() as f64);
387
388 let Some(joined) = set.join_next().await else {
389 break; // nothing in flight -- done (or cancelled with none left)
390 };
391 match joined {
392 Ok((idx, outcome)) => {
393 #[cfg(feature = "metrics")]
394 if matches!(outcome, FanOutResult::TimedOut) {
395 ::metrics::counter!("dfe_fanout_timeout_total").increment(1);
396 }
397 results[idx] = outcome;
398 }
399 Err(join_err) => {
400 // Panicked task -- map its task Id back to the input slot.
401 if let Some(&idx) = id_to_idx.get(&join_err.id()) {
402 results[idx] = FanOutResult::Panicked;
403 }
404 #[cfg(feature = "metrics")]
405 ::metrics::counter!("dfe_fanout_panic_total").increment(1);
406 tracing::error!(error = %join_err, "fan_out_async_with_policy task panicked");
407 }
408 }
409
410 // If cancelled and nothing left in flight, stop (remaining stay Cancelled).
411 if cancelled() && set.is_empty() {
412 break;
413 }
414 }
415
416 #[cfg(feature = "metrics")]
417 {
418 ::metrics::gauge!("dfe_fanout_inflight").set(0.0);
419 ::metrics::histogram!("dfe_fanout_batch_duration_seconds")
420 .record(started.elapsed().as_secs_f64());
421 }
422
423 results
424 }
425
426 /// Map owned items in parallel under the concurrency limiter.
427 ///
428 /// Like [`process_batch`](Self::process_batch) but takes OWNED items and a
429 /// closure that consumes each (so the transform may mutate its own copy),
430 /// returning an arbitrary `R` per item. Crucially the semaphore IS applied
431 /// per item, so this path obeys the scaler target -- unlike
432 /// [`install`](Self::install). Results are collected in input order.
433 ///
434 /// Used by `BatchEngine`'s parsed mid-tier transform so the parsed path is
435 /// throttled identically to the raw path.
436 pub fn map_owned<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
437 where
438 T: Send,
439 R: Send,
440 F: Fn(T) -> R + Sync,
441 {
442 let sem = &self.semaphore;
443 self.rayon_pool.install(|| {
444 use rayon::prelude::*;
445 items
446 .into_par_iter()
447 .map(|item| {
448 let _permit = sem.acquire();
449 f(item)
450 })
451 .collect()
452 })
453 }
454
455 /// Execute a closure on the rayon thread pool.
456 ///
457 /// Provides direct access to the rayon pool for operations that need
458 /// `par_iter_mut` or other rayon primitives not covered by the throttled
459 /// helpers. The semaphore is NOT applied -- callers manage their own
460 /// concurrency. Prefer [`process_batch`](Self::process_batch) or
461 /// [`map_owned`](Self::map_owned), which respect the scaler target; reach
462 /// for `install` only for genuine rayon-primitive escape hatches.
463 pub fn install<R: Send>(&self, f: impl FnOnce() -> R + Send) -> R {
464 self.rayon_pool.install(f)
465 }
466
467 /// Register worker pool metrics with the `MetricsManager`.
468 ///
469 /// Registers operational metrics and emits threshold gauges with current values.
470 /// Call this once during startup after creating the pool.
471 pub fn register_metrics(&self, manager: &crate::metrics::MetricsManager) {
472 let config = self.config.read();
473 super::metrics::register(manager, &config);
474 }
475
476 /// Start the background scaling controller.
477 ///
478 /// The controller samples CPU/memory every `scale_interval_secs` and adjusts
479 /// the semaphore permits based on watermark bands. Stops on cancellation.
480 pub fn start_scaling_loop(self: &Arc<Self>, cancel: tokio_util::sync::CancellationToken) {
481 let controller = super::scaler::ScalingController::new(self.clone());
482 tokio::spawn(controller.run(cancel));
483 }
484
485 /// Attach a `MemoryGuard` for dual-source memory pressure reading.
486 #[cfg(feature = "memory")]
487 pub fn set_memory_guard(&self, guard: Arc<crate::memory::MemoryGuard>) {
488 *self.memory_guard.lock() = Some(guard);
489 }
490
491 /// Attach a `ScalingPressure` for bidirectional pressure integration.
492 #[cfg(feature = "scaling")]
493 pub fn set_scaling_pressure(&self, pressure: Arc<crate::scaling::ScalingPressure>) {
494 *self.scaling_pressure.lock() = Some(pressure);
495 }
496
497 /// Number of permits currently leased -- true in-flight worker count.
498 ///
499 /// An idle pool reports 0 regardless of the scaler target. This is the
500 /// telemetry-grade "active" count (vs [`target_threads`](Self::target_threads),
501 /// the scaler's desired ceiling).
502 #[must_use]
503 pub fn active_threads(&self) -> usize {
504 self.semaphore.leased()
505 }
506
507 /// Current scaler target concurrency (the admission ceiling).
508 #[must_use]
509 pub fn target_threads(&self) -> usize {
510 self.semaphore.target()
511 }
512
513 /// Headroom: permits that could be leased right now (`target - leased`).
514 #[must_use]
515 pub fn available_threads(&self) -> usize {
516 self.semaphore.available()
517 }
518
519 /// Maximum thread count (pool size).
520 #[must_use]
521 pub fn max_threads(&self) -> usize {
522 self.config.read().max_threads
523 }
524}
525
#[cfg(test)]
mod semaphore_tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::Semaphore;

    #[test]
    fn idle_reports_zero_leased() {
        let s = Semaphore::new(2, 8);
        assert_eq!(s.leased(), 0);
        assert_eq!(s.target(), 2);
        assert_eq!(s.available(), 2);
    }

    #[test]
    fn lease_and_drop_track_leased() {
        let s = Semaphore::new(4, 8);
        {
            let _g1 = s.acquire();
            let _g2 = s.acquire();
            assert_eq!(s.leased(), 2);
            assert_eq!(s.available(), 2);
        }
        assert_eq!(s.leased(), 0, "drops release leases");
        assert_eq!(s.available(), 4);
    }

    #[test]
    fn downscale_does_not_overshoot_on_drop() {
        // Lease the full target, shrink the target while leased, then drain.
        // A refill-toward-max model would undo the downscale on drop. Here
        // available is derived from the target, so post-drain available ==
        // target, not max.
        let s = Semaphore::new(8, 8);
        let guards: Vec<_> = (0..8).map(|_| s.acquire()).collect();
        assert_eq!(s.leased(), 8);
        s.set_target(2);
        assert_eq!(s.target(), 2);
        assert_eq!(
            s.available(),
            0,
            "leased (8) exceeds target (2): no headroom"
        );
        drop(guards);
        assert_eq!(s.leased(), 0);
        assert_eq!(
            s.available(),
            2,
            "available equals target after drain, not max_permits"
        );
    }

    #[test]
    fn shrink_admits_only_after_leased_falls_below_target() {
        // After shrinking, a freed permit must not restore headroom until
        // `leased` has actually dropped below the new target.
        let s = Semaphore::new(2, 4);
        let g1 = s.acquire();
        let g2 = s.acquire();
        s.set_target(1);
        drop(g1);
        assert_eq!(s.leased(), 1);
        assert_eq!(s.available(), 0, "leased == shrunken target: no headroom");
        drop(g2);
        assert_eq!(s.available(), 1);
        // Must not park: leased (0) < target (1).
        let _g3 = s.acquire();
        assert_eq!(s.leased(), 1);
        assert_eq!(s.available(), 0);
    }

    #[test]
    fn set_target_clamps_to_one_and_max() {
        let s = Semaphore::new(4, 8);
        s.set_target(0);
        assert_eq!(s.target(), 1, "target floored at 1 to avoid deadlock");
        s.set_target(100);
        assert_eq!(s.target(), 8, "target capped at max_permits");
    }

    #[test]
    fn contention_never_exceeds_target() {
        // 8 threads hammer a target=2 limiter; leased must never exceed 2.
        let s = Arc::new(Semaphore::new(2, 2));
        let max_seen = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let s = Arc::clone(&s);
                let max_seen = Arc::clone(&max_seen);
                std::thread::spawn(move || {
                    for _ in 0..50 {
                        let _g = s.acquire();
                        max_seen.fetch_max(s.leased(), Ordering::Relaxed);
                        std::thread::yield_now();
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        let peak = max_seen.load(Ordering::Relaxed);
        assert!(peak >= 1, "at least one lease was observed");
        assert!(peak <= 2, "leased never exceeded target=2");
        assert_eq!(s.leased(), 0);
    }

    #[test]
    fn grow_target_wakes_parked_acquirer() {
        // target=1 with one lease held: a second acquirer parks until the
        // target grows. This proves the wakeup comes from set_target, not a spin.
        let s = Arc::new(Semaphore::new(1, 4));
        let held = s.acquire();
        assert_eq!(s.leased(), 1);
        let s2 = Arc::clone(&s);
        let handle = std::thread::spawn(move || {
            let _g = s2.acquire();
            s2.leased()
        });
        std::thread::sleep(std::time::Duration::from_millis(50));
        s.set_target(2);
        let observed = handle.join().unwrap();
        // `held` is still leased and the woken thread holds its own permit,
        // so exactly two leases exist when it samples.
        assert_eq!(observed, 2, "parked acquirer proceeded after target grew");
        drop(held);
        assert_eq!(s.leased(), 0);
    }
}