hyperi_rustlib/worker/pool.rs
1// Project: hyperi-rustlib
2// File: src/worker/pool.rs
3// Purpose: Rayon pool + semaphore management, process_batch(), fan_out_async()
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use std::sync::Arc;
10
11use parking_lot::{Condvar, Mutex, RwLock};
12use rayon::ThreadPool;
13
14use super::config::WorkerPoolConfig;
15
16/// Adaptive worker pool with hybrid rayon (CPU) + tokio (async I/O) execution.
17///
18/// Provides two APIs:
19/// - [`process_batch`](Self::process_batch) -- CPU-bound work via rayon
20/// (JSON parsing, transforms, compression, CEL evaluation)
21/// - [`fan_out_async`](Self::fan_out_async) -- async I/O via tokio
22/// (enrichment, external APIs, storage writes)
23///
24/// The pool auto-scales active threads based on CPU/memory pressure using
25/// watermark bands. All thresholds are config-cascade overridable and emitted
26/// as gauge metrics.
27pub struct AdaptiveWorkerPool {
28 pub(crate) config: Arc<RwLock<WorkerPoolConfig>>,
29 rayon_pool: ThreadPool,
30 pub(crate) semaphore: Arc<Semaphore>,
31 #[cfg(feature = "memory")]
32 pub(crate) memory_guard: parking_lot::Mutex<Option<Arc<crate::memory::MemoryGuard>>>,
33 #[cfg(feature = "scaling")]
34 pub(crate) scaling_pressure: parking_lot::Mutex<Option<Arc<crate::scaling::ScalingPressure>>>,
35}
36
37/// Concurrency limiter for throttling rayon thread usage.
38///
39/// Rayon pools cannot be resized, so a limiter controls how many threads
40/// actively pick up work. Models explicit `target` (the scaler's desired
41/// concurrency) and `leased` (permits currently held by in-flight work);
42/// available headroom is the derived `target - leased`. A thread that cannot
43/// lease (`leased >= target`) PARKS on a condvar -- it does not spin -- so the
44/// throttle conserves CPU exactly when the scaler is trying to.
45///
46/// Why this shape (vs the old `available`/`max_permits` atomic): the scaler
47/// sets a *target*, and `active_threads()` reports *leased* (true in-flight),
48/// so an idle pool reports zero active and a downscale cannot be undone by
49/// guard drops refilling toward `max` -- drops only decrement `leased`, and
50/// no new lease is admitted while `leased >= target`.
51pub(crate) struct Semaphore {
52 state: Mutex<SemState>,
53 /// Signalled when a permit frees or the target grows.
54 available: Condvar,
55 /// Architectural ceiling (rayon pool size); `target` never exceeds it.
56 max_permits: usize,
57}
58
/// Mutable limiter bookkeeping; only ever touched while `Semaphore::state` is locked.
struct SemState {
    /// Desired concurrency as set by the scaler, clamped to `[1, max_permits]`.
    target: usize,
    /// Number of permits held right now by in-flight work.
    leased: usize,
}
65
66impl Semaphore {
67 fn new(initial_target: usize, max_permits: usize) -> Self {
68 let max_permits = max_permits.max(1);
69 Self {
70 state: Mutex::new(SemState {
71 target: initial_target.clamp(1, max_permits),
72 leased: 0,
73 }),
74 available: Condvar::new(),
75 max_permits,
76 }
77 }
78
79 /// Lease a permit, parking until `leased < target`. Releases on drop.
80 fn acquire(&self) -> SemaphoreGuard<'_> {
81 let mut st = self.state.lock();
82 while st.leased >= st.target {
83 self.available.wait(&mut st);
84 }
85 st.leased += 1;
86 SemaphoreGuard { semaphore: self }
87 }
88
89 /// Set the target concurrency (called by the scaler). Clamped to
90 /// `[1, max_permits]`. Growing the target wakes parked acquirers so they
91 /// re-check; shrinking simply stops new leases until `leased` falls below
92 /// the new target -- in-flight work drains naturally.
93 pub(crate) fn set_target(&self, target: usize) {
94 let clamped = target.clamp(1, self.max_permits);
95 let mut st = self.state.lock();
96 let grew = clamped > st.target;
97 st.target = clamped;
98 drop(st);
99 if grew {
100 self.available.notify_all();
101 }
102 }
103
104 /// Current target concurrency.
105 pub(crate) fn target(&self) -> usize {
106 self.state.lock().target
107 }
108
109 /// Permits currently leased (in-flight work).
110 pub(crate) fn leased(&self) -> usize {
111 self.state.lock().leased
112 }
113
114 /// Headroom: how many more permits can be leased right now.
115 pub(crate) fn available(&self) -> usize {
116 let st = self.state.lock();
117 st.target.saturating_sub(st.leased)
118 }
119}
120
121struct SemaphoreGuard<'a> {
122 semaphore: &'a Semaphore,
123}
124
125impl Drop for SemaphoreGuard<'_> {
126 fn drop(&mut self) {
127 let mut st = self.semaphore.state.lock();
128 st.leased = st.leased.saturating_sub(1);
129 drop(st);
130 // Wake one parked acquirer; the freed permit is now leasable
131 // (subject to the current target, which it re-checks).
132 self.semaphore.available.notify_one();
133 }
134}
135
136/// Policy for [`AdaptiveWorkerPool::fan_out_async_with_policy`].
137///
138/// The plain [`fan_out_async`](AdaptiveWorkerPool::fan_out_async) helper has no
139/// timeout or cancellation, and a single hung future stalls its whole chunk. At
140/// fleet scale, user code eventually passes a future that ignores deadlines, so
141/// reusable external-I/O fan-out needs a deadline + cancellation contract
142/// (Codex review 2026-06-03).
143#[derive(Debug, Clone, Default)]
144pub struct FanOutPolicy {
145 /// Per-item timeout. `None` = no timeout (caller must bound the future).
146 pub per_item_timeout: Option<std::time::Duration>,
147 /// Cancellation token. When cancelled, no NEW items are spawned; in-flight
148 /// items are still awaited. Remaining unspawned items report `Cancelled`.
149 pub cancel: Option<tokio_util::sync::CancellationToken>,
150}
151
/// Outcome for one input item of [`AdaptiveWorkerPool::fan_out_async_with_policy`].
///
/// The result vector is index-aligned with the input, so every item ends up
/// with exactly one of these variants.
#[derive(Debug)]
pub enum FanOutResult<R, E> {
    /// The item's future finished and returned `Ok`.
    Ok(R),
    /// The item's future finished and returned `Err`.
    Err(E),
    /// The item's future did not finish within `per_item_timeout`.
    TimedOut,
    /// The item's task could not be joined (reported as a panic); an
    /// `error`-level event is logged.
    Panicked,
    /// The item was never started because the cancellation token had fired.
    Cancelled,
}
166
167impl AdaptiveWorkerPool {
168 /// Create a new worker pool with the given configuration, validating it.
169 ///
170 /// Resolves `max_threads = 0` to the detected CPU count, then validates the
171 /// resolved config (rejects `min_threads > max_threads`, bad watermark
172 /// ordering, zero `async_concurrency`, etc.) before building. This prevents
173 /// invalid runtime state that would otherwise panic later in the scaler's
174 /// `clamp(min, max)` (Codex review 2026-06-03).
175 ///
176 /// # Errors
177 ///
178 /// Returns `ConfigError` if the resolved config is invalid.
179 pub fn try_new(config: WorkerPoolConfig) -> Result<Self, crate::config::ConfigError> {
180 let mut resolved = config;
181 resolved.resolve_max_threads();
182 resolved.validate()?;
183 Ok(Self::build(resolved))
184 }
185
186 /// Create a new worker pool with the given configuration.
187 ///
188 /// Resolves `max_threads = 0` to the detected CPU count.
189 ///
190 /// # Panics
191 ///
192 /// Panics immediately with a clear message if the config is invalid (e.g.
193 /// `min_threads > max_threads`). Use [`try_new`](Self::try_new) for fallible
194 /// construction from untrusted config.
195 #[must_use]
196 pub fn new(config: WorkerPoolConfig) -> Self {
197 Self::try_new(config).expect("invalid WorkerPoolConfig (use try_new to handle the error)")
198 }
199
200 /// Build the pool from an already-resolved, validated config.
201 fn build(resolved: WorkerPoolConfig) -> Self {
202 let max_threads = resolved.max_threads;
203 let min_threads = resolved.min_threads;
204
205 let rayon_pool = rayon::ThreadPoolBuilder::new()
206 .num_threads(max_threads)
207 .thread_name(|i| format!("worker-{i}"))
208 .build()
209 .expect("Failed to create rayon thread pool");
210
211 let semaphore = Arc::new(Semaphore::new(min_threads, max_threads));
212
213 Self {
214 config: Arc::new(RwLock::new(resolved)),
215 rayon_pool,
216 semaphore,
217 #[cfg(feature = "memory")]
218 memory_guard: parking_lot::Mutex::new(None),
219 #[cfg(feature = "scaling")]
220 scaling_pressure: parking_lot::Mutex::new(None),
221 }
222 }
223
224 /// Create a new worker pool from the config cascade.
225 ///
226 /// # Errors
227 ///
228 /// Returns an error if the config cascade is not initialised or validation fails.
229 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
230 let config = WorkerPoolConfig::from_cascade(key)?;
231 Self::try_new(config)
232 }
233
234 /// Process a batch of items in parallel using rayon (CPU-bound work).
235 ///
236 /// Each item is processed by the provided closure on a rayon worker thread.
237 /// A semaphore limits how many threads are active simultaneously (controlled
238 /// by the scaling controller). Results are returned in input order.
239 ///
240 /// Use this for: JSON parsing, transforms, compression, CEL evaluation, routing.
241 /// Do NOT use for work that needs `.await` -- use [`fan_out_async`](Self::fan_out_async).
242 pub fn process_batch<T, R, E, F>(&self, items: &[T], f: F) -> Vec<Result<R, E>>
243 where
244 T: Sync,
245 R: Send,
246 E: Send,
247 F: Fn(&T) -> Result<R, E> + Sync,
248 {
249 let sem = &self.semaphore;
250 self.rayon_pool.install(|| {
251 use rayon::prelude::*;
252 items
253 .par_iter()
254 .map(|item| {
255 let _permit = sem.acquire();
256 f(item)
257 })
258 .collect()
259 })
260 }
261
262 /// Fan out async work across tokio tasks with bounded concurrency.
263 ///
264 /// Each item is processed by the provided async closure on a tokio task.
265 /// Concurrency is limited by `async_concurrency` config.
266 ///
267 /// # Return contract
268 ///
269 /// The returned `Vec` has the same length as `items` and entries
270 /// correspond by index (input-order preserved):
271 ///
272 /// - `Some(Ok(r))` -- task completed successfully with result `r`
273 /// - `Some(Err(e))` -- task returned `Err(e)`
274 /// - `None` -- task panicked; the panic was logged at `error` level
275 /// with the input index. The wrapping `Option` exists so the
276 /// panic doesn't silently shorten the result vector (which was
277 /// the previous behaviour and violated the input-order contract).
278 ///
279 /// Use this for: enrichment lookups, external API calls, storage writes.
280 pub async fn fan_out_async<T, R, E, F, Fut>(
281 &self,
282 items: &[T],
283 f: F,
284 ) -> Vec<Option<Result<R, E>>>
285 where
286 T: Sync + Send,
287 R: Send + 'static,
288 E: Send + 'static,
289 F: Fn(&T) -> Fut + Send + Sync,
290 Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
291 {
292 let concurrency = self.config.read().async_concurrency;
293 let mut results: Vec<Option<Result<R, E>>> = (0..items.len()).map(|_| None).collect();
294
295 // Process in chunks of `concurrency` to limit in-flight tasks
296 for chunk_start in (0..items.len()).step_by(concurrency) {
297 let chunk_end = (chunk_start + concurrency).min(items.len());
298 let mut handles = Vec::with_capacity(chunk_end - chunk_start);
299
300 for (idx, item) in items
301 .iter()
302 .enumerate()
303 .skip(chunk_start)
304 .take(chunk_end - chunk_start)
305 {
306 let fut = f(item);
307 handles.push((idx, tokio::spawn(fut)));
308 }
309
310 for (idx, handle) in handles {
311 match handle.await {
312 Ok(result) => results[idx] = Some(result),
313 Err(join_err) => {
314 // Leave results[idx] = None; caller can detect
315 // the panic without shrinking the output vec.
316 tracing::error!(error = %join_err, idx, "fan_out_async task panicked");
317 }
318 }
319 }
320 }
321
322 results
323 }
324
325 /// Concurrency-bounded async fan-out with per-item timeout + cancellation.
326 ///
327 /// Unlike [`fan_out_async`](Self::fan_out_async), this streams completions
328 /// (a `JoinSet`) so a slow item never blocks faster ones, applies an
329 /// optional per-item timeout, and stops spawning new work when the policy's
330 /// cancellation token fires. Results are returned in input order; each is a
331 /// [`FanOutResult`]. Emits `dfe_fanout_*` metrics (in-flight gauge, timeout/
332 /// panic counters, batch-duration histogram) when the `metrics` feature is on.
333 pub async fn fan_out_async_with_policy<T, R, E, F, Fut>(
334 &self,
335 items: &[T],
336 policy: &FanOutPolicy,
337 f: F,
338 ) -> Vec<FanOutResult<R, E>>
339 where
340 T: Sync + Send,
341 R: Send + 'static,
342 E: Send + 'static,
343 F: Fn(&T) -> Fut + Send + Sync,
344 Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
345 {
346 let concurrency = self.config.read().async_concurrency.max(1);
347 let mut results: Vec<FanOutResult<R, E>> =
348 (0..items.len()).map(|_| FanOutResult::Cancelled).collect();
349
350 #[cfg(feature = "metrics")]
351 let started = std::time::Instant::now();
352
353 let mut set: tokio::task::JoinSet<(usize, FanOutResult<R, E>)> =
354 tokio::task::JoinSet::new();
355 // task Id -> input idx, so a panicked task (JoinError carries no payload)
356 // still maps back to its slot.
357 let mut id_to_idx: std::collections::HashMap<tokio::task::Id, usize> =
358 std::collections::HashMap::new();
359 let mut next = 0;
360 let cancelled = || policy.cancel.as_ref().is_some_and(|c| c.is_cancelled());
361
362 loop {
363 // Seed up to the concurrency limit (unless cancelled).
364 while set.len() < concurrency && next < items.len() && !cancelled() {
365 let fut = f(&items[next]);
366 let timeout = policy.per_item_timeout;
367 let idx = next;
368 let handle = set.spawn(async move {
369 let outcome = match timeout {
370 Some(d) => match tokio::time::timeout(d, fut).await {
371 Ok(Ok(r)) => FanOutResult::Ok(r),
372 Ok(Err(e)) => FanOutResult::Err(e),
373 Err(_) => FanOutResult::TimedOut,
374 },
375 None => match fut.await {
376 Ok(r) => FanOutResult::Ok(r),
377 Err(e) => FanOutResult::Err(e),
378 },
379 };
380 (idx, outcome)
381 });
382 id_to_idx.insert(handle.id(), idx);
383 next += 1;
384 }
385
386 #[cfg(feature = "metrics")]
387 ::metrics::gauge!("dfe_fanout_inflight").set(set.len() as f64);
388
389 let Some(joined) = set.join_next().await else {
390 break; // nothing in flight -- done (or cancelled with none left)
391 };
392 match joined {
393 Ok((idx, outcome)) => {
394 #[cfg(feature = "metrics")]
395 if matches!(outcome, FanOutResult::TimedOut) {
396 ::metrics::counter!("dfe_fanout_timeout_total").increment(1);
397 }
398 results[idx] = outcome;
399 }
400 Err(join_err) => {
401 // Panicked task -- map its task Id back to the input slot.
402 if let Some(&idx) = id_to_idx.get(&join_err.id()) {
403 results[idx] = FanOutResult::Panicked;
404 }
405 #[cfg(feature = "metrics")]
406 ::metrics::counter!("dfe_fanout_panic_total").increment(1);
407 tracing::error!(error = %join_err, "fan_out_async_with_policy task panicked");
408 }
409 }
410
411 // If cancelled and nothing left in flight, stop (remaining stay Cancelled).
412 if cancelled() && set.is_empty() {
413 break;
414 }
415 }
416
417 #[cfg(feature = "metrics")]
418 {
419 ::metrics::gauge!("dfe_fanout_inflight").set(0.0);
420 ::metrics::histogram!("dfe_fanout_batch_duration_seconds")
421 .record(started.elapsed().as_secs_f64());
422 }
423
424 results
425 }
426
427 /// Map owned items in parallel under the concurrency limiter.
428 ///
429 /// Like [`process_batch`](Self::process_batch) but takes OWNED items and a
430 /// closure that consumes each (so the transform may mutate its own copy),
431 /// returning an arbitrary `R` per item. Crucially the semaphore IS applied
432 /// per item, so this path obeys the scaler target -- unlike
433 /// [`install`](Self::install). Results are collected in input order.
434 ///
435 /// Used by `BatchEngine`'s parsed mid-tier transform so the parsed path is
436 /// throttled identically to the raw path.
437 pub fn map_owned<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
438 where
439 T: Send,
440 R: Send,
441 F: Fn(T) -> R + Sync,
442 {
443 let sem = &self.semaphore;
444 self.rayon_pool.install(|| {
445 use rayon::prelude::*;
446 items
447 .into_par_iter()
448 .map(|item| {
449 let _permit = sem.acquire();
450 f(item)
451 })
452 .collect()
453 })
454 }
455
456 /// Execute a closure on the rayon thread pool.
457 ///
458 /// Provides direct access to the rayon pool for operations that need
459 /// `par_iter_mut` or other rayon primitives not covered by the throttled
460 /// helpers. The semaphore is NOT applied -- callers manage their own
461 /// concurrency. Prefer [`process_batch`](Self::process_batch) or
462 /// [`map_owned`](Self::map_owned), which respect the scaler target; reach
463 /// for `install` only for genuine rayon-primitive escape hatches.
464 pub fn install<R: Send>(&self, f: impl FnOnce() -> R + Send) -> R {
465 self.rayon_pool.install(f)
466 }
467
468 /// Register worker pool metrics with the `MetricsManager`.
469 ///
470 /// Registers operational metrics and emits threshold gauges with current values.
471 /// Call this once during startup after creating the pool.
472 pub fn register_metrics(&self, manager: &crate::metrics::MetricsManager) {
473 let config = self.config.read();
474 super::metrics::register(manager, &config);
475 }
476
477 /// Start the background scaling controller.
478 ///
479 /// The controller samples CPU/memory every `scale_interval_secs` and adjusts
480 /// the semaphore permits based on watermark bands. Stops on cancellation.
481 pub fn start_scaling_loop(self: &Arc<Self>, cancel: tokio_util::sync::CancellationToken) {
482 let controller = super::scaler::ScalingController::new(self.clone());
483 tokio::spawn(controller.run(cancel));
484 }
485
486 /// Attach a `MemoryGuard` for dual-source memory pressure reading.
487 #[cfg(feature = "memory")]
488 pub fn set_memory_guard(&self, guard: Arc<crate::memory::MemoryGuard>) {
489 *self.memory_guard.lock() = Some(guard);
490 }
491
492 /// Attach a `ScalingPressure` for bidirectional pressure integration.
493 #[cfg(feature = "scaling")]
494 pub fn set_scaling_pressure(&self, pressure: Arc<crate::scaling::ScalingPressure>) {
495 *self.scaling_pressure.lock() = Some(pressure);
496 }
497
498 /// Number of permits currently leased -- true in-flight worker count.
499 ///
500 /// An idle pool reports 0 regardless of the scaler target. This is the
501 /// telemetry-grade "active" count (vs [`target_threads`](Self::target_threads),
502 /// the scaler's desired ceiling).
503 #[must_use]
504 pub fn active_threads(&self) -> usize {
505 self.semaphore.leased()
506 }
507
508 /// Current scaler target concurrency (the admission ceiling).
509 #[must_use]
510 pub fn target_threads(&self) -> usize {
511 self.semaphore.target()
512 }
513
514 /// Headroom: permits that could be leased right now (`target - leased`).
515 #[must_use]
516 pub fn available_threads(&self) -> usize {
517 self.semaphore.available()
518 }
519
520 /// Maximum thread count (pool size).
521 #[must_use]
522 pub fn max_threads(&self) -> usize {
523 self.config.read().max_threads
524 }
525}
526
#[cfg(test)]
mod semaphore_tests {
    // Unit tests for the target/leased concurrency limiter.
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use super::Semaphore;

    #[test]
    fn idle_reports_zero_leased() {
        let s = Semaphore::new(2, 8);
        assert_eq!(s.leased(), 0);
        assert_eq!(s.target(), 2);
        assert_eq!(s.available(), 2);
    }

    #[test]
    fn lease_and_drop_track_leased() {
        let s = Semaphore::new(4, 8);
        {
            let _g1 = s.acquire();
            let _g2 = s.acquire();
            assert_eq!(s.leased(), 2);
            assert_eq!(s.available(), 2);
        }
        assert_eq!(s.leased(), 0, "drops release leases");
        assert_eq!(s.available(), 4);
    }

    #[test]
    fn downscale_does_not_overshoot_on_drop() {
        // Lease the full target, shrink the target while leased, then drain.
        // The old model refilled `available` toward max_permits on drop,
        // undoing the downscale; the new model derives available from the
        // target, so post-drain available == target, not max.
        let s = Semaphore::new(8, 8);
        let guards: Vec<_> = (0..8).map(|_| s.acquire()).collect();
        assert_eq!(s.leased(), 8);
        s.set_target(2);
        assert_eq!(s.target(), 2);
        assert_eq!(
            s.available(),
            0,
            "leased (8) exceeds target (2): no headroom"
        );
        drop(guards);
        assert_eq!(s.leased(), 0);
        assert_eq!(
            s.available(),
            2,
            "available equals target after drain, not max_permits"
        );
    }

    #[test]
    fn set_target_clamps_to_one_and_max() {
        let s = Semaphore::new(4, 8);
        s.set_target(0);
        assert_eq!(s.target(), 1, "target floored at 1 to avoid deadlock");
        s.set_target(100);
        assert_eq!(s.target(), 8, "target capped at max_permits");
    }

    #[test]
    fn contention_never_exceeds_target() {
        // 8 threads hammer a target=2 limiter; leased must never exceed 2.
        let s = Arc::new(Semaphore::new(2, 2));
        let max_seen = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let s = Arc::clone(&s);
                let max_seen = Arc::clone(&max_seen);
                std::thread::spawn(move || {
                    for _ in 0..50 {
                        let _g = s.acquire();
                        max_seen.fetch_max(s.leased(), Ordering::Relaxed);
                        std::thread::yield_now();
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert!(
            max_seen.load(Ordering::Relaxed) <= 2,
            "leased never exceeded target=2"
        );
        assert_eq!(s.leased(), 0);
    }

    #[test]
    fn grow_target_wakes_parked_acquirer() {
        // target=1 with one lease held: a second acquirer cannot proceed, so it
        // must still be parked before the target grows. Raising the target is
        // then what releases it -- a wakeup from set_target, not a spin.
        let s = Arc::new(Semaphore::new(1, 4));
        let held = s.acquire();
        assert_eq!(s.leased(), 1);
        let s2 = Arc::clone(&s);
        let handle = std::thread::spawn(move || {
            let _g = s2.acquire();
            s2.leased()
        });
        std::thread::sleep(Duration::from_millis(50));
        assert!(
            !handle.is_finished(),
            "acquirer must stay parked while leased (1) >= target (1)"
        );
        s.set_target(2);
        let observed = handle.join().unwrap();
        // `held` is still alive, so the woken acquirer sees both leases. (The
        // old `observed >= 1` check was always true: the thread holds its own.)
        assert_eq!(observed, 2, "parked acquirer proceeded after target grew");
        drop(held);
        assert_eq!(s.leased(), 0);
    }

    #[test]
    fn drop_wakes_parked_acquirer() {
        // target=1 with one lease held: releasing that lease (not a target
        // change) must wake the parked acquirer.
        let s = Arc::new(Semaphore::new(1, 4));
        let held = s.acquire();
        let s2 = Arc::clone(&s);
        let handle = std::thread::spawn(move || {
            let _g = s2.acquire();
            s2.leased()
        });
        std::thread::sleep(Duration::from_millis(50));
        assert!(
            !handle.is_finished(),
            "acquirer must stay parked while the only permit is held"
        );
        drop(held);
        assert_eq!(
            handle.join().unwrap(),
            1,
            "woken acquirer holds the only lease"
        );
        assert_eq!(s.leased(), 0);
    }
}