Skip to main content

hyperi_rustlib/worker/
pool.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/pool.rs
3// Purpose:   Rayon pool + semaphore management, process_batch(), fan_out_async()
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use std::sync::Arc;
10
11use parking_lot::{Condvar, Mutex, RwLock};
12use rayon::ThreadPool;
13
14use super::config::WorkerPoolConfig;
15
16/// Adaptive worker pool with hybrid rayon (CPU) + tokio (async I/O) execution.
17///
18/// Provides two APIs:
19/// - [`process_batch`](Self::process_batch) -- CPU-bound work via rayon
20///   (JSON parsing, transforms, compression, CEL evaluation)
21/// - [`fan_out_async`](Self::fan_out_async) -- async I/O via tokio
22///   (enrichment, external APIs, storage writes)
23///
24/// The pool auto-scales active threads based on CPU/memory pressure using
25/// watermark bands. All thresholds are config-cascade overridable and emitted
26/// as gauge metrics.
27pub struct AdaptiveWorkerPool {
28    pub(crate) config: Arc<RwLock<WorkerPoolConfig>>,
29    rayon_pool: ThreadPool,
30    pub(crate) semaphore: Arc<Semaphore>,
31    #[cfg(feature = "memory")]
32    pub(crate) memory_guard: parking_lot::Mutex<Option<Arc<crate::memory::MemoryGuard>>>,
33    #[cfg(feature = "scaling")]
34    pub(crate) scaling_pressure: parking_lot::Mutex<Option<Arc<crate::scaling::ScalingPressure>>>,
35}
36
37/// Concurrency limiter for throttling rayon thread usage.
38///
39/// Rayon pools cannot be resized, so a limiter controls how many threads
40/// actively pick up work. Models explicit `target` (the scaler's desired
41/// concurrency) and `leased` (permits currently held by in-flight work);
42/// available headroom is the derived `target - leased`. A thread that cannot
43/// lease (`leased >= target`) PARKS on a condvar -- it does not spin -- so the
44/// throttle conserves CPU exactly when the scaler is trying to.
45///
46/// Why this shape (vs the old `available`/`max_permits` atomic): the scaler
47/// sets a *target*, and `active_threads()` reports *leased* (true in-flight),
48/// so an idle pool reports zero active and a downscale cannot be undone by
49/// guard drops refilling toward `max` -- drops only decrement `leased`, and
50/// no new lease is admitted while `leased >= target`.
51pub(crate) struct Semaphore {
52    state: Mutex<SemState>,
53    /// Signalled when a permit frees or the target grows.
54    available: Condvar,
55    /// Architectural ceiling (rayon pool size); `target` never exceeds it.
56    max_permits: usize,
57}
58
/// Mutable limiter state, only accessed through `Semaphore::state`'s mutex.
struct SemState {
    /// How many permits in-flight work is holding right now.
    leased: usize,
    /// Concurrency the scaler currently wants; kept within `[1, max_permits]`.
    target: usize,
}
65
66impl Semaphore {
67    fn new(initial_target: usize, max_permits: usize) -> Self {
68        let max_permits = max_permits.max(1);
69        Self {
70            state: Mutex::new(SemState {
71                target: initial_target.clamp(1, max_permits),
72                leased: 0,
73            }),
74            available: Condvar::new(),
75            max_permits,
76        }
77    }
78
79    /// Lease a permit, parking until `leased < target`. Releases on drop.
80    fn acquire(&self) -> SemaphoreGuard<'_> {
81        let mut st = self.state.lock();
82        while st.leased >= st.target {
83            self.available.wait(&mut st);
84        }
85        st.leased += 1;
86        SemaphoreGuard { semaphore: self }
87    }
88
89    /// Set the target concurrency (called by the scaler). Clamped to
90    /// `[1, max_permits]`. Growing the target wakes parked acquirers so they
91    /// re-check; shrinking simply stops new leases until `leased` falls below
92    /// the new target -- in-flight work drains naturally.
93    pub(crate) fn set_target(&self, target: usize) {
94        let clamped = target.clamp(1, self.max_permits);
95        let mut st = self.state.lock();
96        let grew = clamped > st.target;
97        st.target = clamped;
98        drop(st);
99        if grew {
100            self.available.notify_all();
101        }
102    }
103
104    /// Current target concurrency.
105    pub(crate) fn target(&self) -> usize {
106        self.state.lock().target
107    }
108
109    /// Permits currently leased (in-flight work).
110    pub(crate) fn leased(&self) -> usize {
111        self.state.lock().leased
112    }
113
114    /// Headroom: how many more permits can be leased right now.
115    pub(crate) fn available(&self) -> usize {
116        let st = self.state.lock();
117        st.target.saturating_sub(st.leased)
118    }
119}
120
121struct SemaphoreGuard<'a> {
122    semaphore: &'a Semaphore,
123}
124
125impl Drop for SemaphoreGuard<'_> {
126    fn drop(&mut self) {
127        let mut st = self.semaphore.state.lock();
128        st.leased = st.leased.saturating_sub(1);
129        drop(st);
130        // Wake one parked acquirer; the freed permit is now leasable
131        // (subject to the current target, which it re-checks).
132        self.semaphore.available.notify_one();
133    }
134}
135
136/// Policy for [`AdaptiveWorkerPool::fan_out_async_with_policy`].
137///
138/// The plain [`fan_out_async`](AdaptiveWorkerPool::fan_out_async) helper has no
139/// timeout or cancellation, and a single hung future stalls its whole chunk. At
140/// fleet scale, user code eventually passes a future that ignores deadlines, so
141/// reusable external-I/O fan-out needs a deadline + cancellation contract
142/// (Codex review 2026-06-03).
143#[derive(Debug, Clone, Default)]
144pub struct FanOutPolicy {
145    /// Per-item timeout. `None` = no timeout (caller must bound the future).
146    pub per_item_timeout: Option<std::time::Duration>,
147    /// Cancellation token. When cancelled, no NEW items are spawned; in-flight
148    /// items are still awaited. Remaining unspawned items report `Cancelled`.
149    pub cancel: Option<tokio_util::sync::CancellationToken>,
150}
151
/// Outcome for one item processed by
/// [`AdaptiveWorkerPool::fan_out_async_with_policy`].
///
/// Derives `Clone`, `PartialEq` and `Eq` (whenever `R` and `E` do), so callers
/// and tests can compare or duplicate outcomes directly instead of
/// hand-matching every variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FanOutResult<R, E> {
    /// The future completed with `Ok`; carries its value.
    Ok(R),
    /// The future completed with `Err`; carries its error.
    Err(E),
    /// The future did not finish within `per_item_timeout`.
    TimedOut,
    /// The spawned task panicked (logged at `error`).
    Panicked,
    /// The item was never spawned because the cancellation token fired first.
    Cancelled,
}
166
167impl AdaptiveWorkerPool {
168    /// Create a new worker pool with the given configuration, validating it.
169    ///
170    /// Resolves `max_threads = 0` to the detected CPU count, then validates the
171    /// resolved config (rejects `min_threads > max_threads`, bad watermark
172    /// ordering, zero `async_concurrency`, etc.) before building. This prevents
173    /// invalid runtime state that would otherwise panic later in the scaler's
174    /// `clamp(min, max)` (Codex review 2026-06-03).
175    ///
176    /// # Errors
177    ///
178    /// Returns `ConfigError` if the resolved config is invalid.
179    pub fn try_new(config: WorkerPoolConfig) -> Result<Self, crate::config::ConfigError> {
180        let mut resolved = config;
181        resolved.resolve_max_threads();
182        resolved.validate()?;
183        Ok(Self::build(resolved))
184    }
185
186    /// Create a new worker pool with the given configuration.
187    ///
188    /// Resolves `max_threads = 0` to the detected CPU count.
189    ///
190    /// # Panics
191    ///
192    /// Panics immediately with a clear message if the config is invalid (e.g.
193    /// `min_threads > max_threads`). Use [`try_new`](Self::try_new) for fallible
194    /// construction from untrusted config.
195    #[must_use]
196    pub fn new(config: WorkerPoolConfig) -> Self {
197        Self::try_new(config).expect("invalid WorkerPoolConfig (use try_new to handle the error)")
198    }
199
200    /// Build the pool from an already-resolved, validated config.
201    fn build(resolved: WorkerPoolConfig) -> Self {
202        let max_threads = resolved.max_threads;
203        let min_threads = resolved.min_threads;
204
205        let rayon_pool = rayon::ThreadPoolBuilder::new()
206            .num_threads(max_threads)
207            .thread_name(|i| format!("worker-{i}"))
208            .build()
209            .expect("Failed to create rayon thread pool");
210
211        let semaphore = Arc::new(Semaphore::new(min_threads, max_threads));
212
213        Self {
214            config: Arc::new(RwLock::new(resolved)),
215            rayon_pool,
216            semaphore,
217            #[cfg(feature = "memory")]
218            memory_guard: parking_lot::Mutex::new(None),
219            #[cfg(feature = "scaling")]
220            scaling_pressure: parking_lot::Mutex::new(None),
221        }
222    }
223
224    /// Create a new worker pool from the config cascade.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if the config cascade is not initialised or validation fails.
229    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
230        let config = WorkerPoolConfig::from_cascade(key)?;
231        Self::try_new(config)
232    }
233
234    /// Process a batch of items in parallel using rayon (CPU-bound work).
235    ///
236    /// Each item is processed by the provided closure on a rayon worker thread.
237    /// A semaphore limits how many threads are active simultaneously (controlled
238    /// by the scaling controller). Results are returned in input order.
239    ///
240    /// Use this for: JSON parsing, transforms, compression, CEL evaluation, routing.
241    /// Do NOT use for work that needs `.await` -- use [`fan_out_async`](Self::fan_out_async).
242    pub fn process_batch<T, R, E, F>(&self, items: &[T], f: F) -> Vec<Result<R, E>>
243    where
244        T: Sync,
245        R: Send,
246        E: Send,
247        F: Fn(&T) -> Result<R, E> + Sync,
248    {
249        let sem = &self.semaphore;
250        self.rayon_pool.install(|| {
251            use rayon::prelude::*;
252            items
253                .par_iter()
254                .map(|item| {
255                    let _permit = sem.acquire();
256                    f(item)
257                })
258                .collect()
259        })
260    }
261
262    /// Fan out async work across tokio tasks with bounded concurrency.
263    ///
264    /// Each item is processed by the provided async closure on a tokio task.
265    /// Concurrency is limited by `async_concurrency` config.
266    ///
267    /// # Return contract
268    ///
269    /// The returned `Vec` has the same length as `items` and entries
270    /// correspond by index (input-order preserved):
271    ///
272    /// - `Some(Ok(r))` -- task completed successfully with result `r`
273    /// - `Some(Err(e))` -- task returned `Err(e)`
274    /// - `None` -- task panicked; the panic was logged at `error` level
275    ///   with the input index. The wrapping `Option` exists so the
276    ///   panic doesn't silently shorten the result vector (which was
277    ///   the previous behaviour and violated the input-order contract).
278    ///
279    /// Use this for: enrichment lookups, external API calls, storage writes.
280    pub async fn fan_out_async<T, R, E, F, Fut>(
281        &self,
282        items: &[T],
283        f: F,
284    ) -> Vec<Option<Result<R, E>>>
285    where
286        T: Sync + Send,
287        R: Send + 'static,
288        E: Send + 'static,
289        F: Fn(&T) -> Fut + Send + Sync,
290        Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
291    {
292        let concurrency = self.config.read().async_concurrency;
293        let mut results: Vec<Option<Result<R, E>>> = (0..items.len()).map(|_| None).collect();
294
295        // Process in chunks of `concurrency` to limit in-flight tasks
296        for chunk_start in (0..items.len()).step_by(concurrency) {
297            let chunk_end = (chunk_start + concurrency).min(items.len());
298            let mut handles = Vec::with_capacity(chunk_end - chunk_start);
299
300            for (idx, item) in items
301                .iter()
302                .enumerate()
303                .skip(chunk_start)
304                .take(chunk_end - chunk_start)
305            {
306                let fut = f(item);
307                handles.push((idx, tokio::spawn(fut)));
308            }
309
310            for (idx, handle) in handles {
311                match handle.await {
312                    Ok(result) => results[idx] = Some(result),
313                    Err(join_err) => {
314                        // Leave results[idx] = None; caller can detect
315                        // the panic without shrinking the output vec.
316                        tracing::error!(error = %join_err, idx, "fan_out_async task panicked");
317                    }
318                }
319            }
320        }
321
322        results
323    }
324
325    /// Concurrency-bounded async fan-out with per-item timeout + cancellation.
326    ///
327    /// Unlike [`fan_out_async`](Self::fan_out_async), this streams completions
328    /// (a `JoinSet`) so a slow item never blocks faster ones, applies an
329    /// optional per-item timeout, and stops spawning new work when the policy's
330    /// cancellation token fires. Results are returned in input order; each is a
331    /// [`FanOutResult`]. Emits `dfe_fanout_*` metrics (in-flight gauge, timeout/
332    /// panic counters, batch-duration histogram) when the `metrics` feature is on.
333    pub async fn fan_out_async_with_policy<T, R, E, F, Fut>(
334        &self,
335        items: &[T],
336        policy: &FanOutPolicy,
337        f: F,
338    ) -> Vec<FanOutResult<R, E>>
339    where
340        T: Sync + Send,
341        R: Send + 'static,
342        E: Send + 'static,
343        F: Fn(&T) -> Fut + Send + Sync,
344        Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
345    {
346        let concurrency = self.config.read().async_concurrency.max(1);
347        let mut results: Vec<FanOutResult<R, E>> =
348            (0..items.len()).map(|_| FanOutResult::Cancelled).collect();
349
350        #[cfg(feature = "metrics")]
351        let started = std::time::Instant::now();
352
353        let mut set: tokio::task::JoinSet<(usize, FanOutResult<R, E>)> =
354            tokio::task::JoinSet::new();
355        // task Id -> input idx, so a panicked task (JoinError carries no payload)
356        // still maps back to its slot.
357        let mut id_to_idx: std::collections::HashMap<tokio::task::Id, usize> =
358            std::collections::HashMap::new();
359        let mut next = 0;
360        let cancelled = || policy.cancel.as_ref().is_some_and(|c| c.is_cancelled());
361
362        loop {
363            // Seed up to the concurrency limit (unless cancelled).
364            while set.len() < concurrency && next < items.len() && !cancelled() {
365                let fut = f(&items[next]);
366                let timeout = policy.per_item_timeout;
367                let idx = next;
368                let handle = set.spawn(async move {
369                    let outcome = match timeout {
370                        Some(d) => match tokio::time::timeout(d, fut).await {
371                            Ok(Ok(r)) => FanOutResult::Ok(r),
372                            Ok(Err(e)) => FanOutResult::Err(e),
373                            Err(_) => FanOutResult::TimedOut,
374                        },
375                        None => match fut.await {
376                            Ok(r) => FanOutResult::Ok(r),
377                            Err(e) => FanOutResult::Err(e),
378                        },
379                    };
380                    (idx, outcome)
381                });
382                id_to_idx.insert(handle.id(), idx);
383                next += 1;
384            }
385
386            #[cfg(feature = "metrics")]
387            ::metrics::gauge!("dfe_fanout_inflight").set(set.len() as f64);
388
389            let Some(joined) = set.join_next().await else {
390                break; // nothing in flight -- done (or cancelled with none left)
391            };
392            match joined {
393                Ok((idx, outcome)) => {
394                    #[cfg(feature = "metrics")]
395                    if matches!(outcome, FanOutResult::TimedOut) {
396                        ::metrics::counter!("dfe_fanout_timeout_total").increment(1);
397                    }
398                    results[idx] = outcome;
399                }
400                Err(join_err) => {
401                    // Panicked task -- map its task Id back to the input slot.
402                    if let Some(&idx) = id_to_idx.get(&join_err.id()) {
403                        results[idx] = FanOutResult::Panicked;
404                    }
405                    #[cfg(feature = "metrics")]
406                    ::metrics::counter!("dfe_fanout_panic_total").increment(1);
407                    tracing::error!(error = %join_err, "fan_out_async_with_policy task panicked");
408                }
409            }
410
411            // If cancelled and nothing left in flight, stop (remaining stay Cancelled).
412            if cancelled() && set.is_empty() {
413                break;
414            }
415        }
416
417        #[cfg(feature = "metrics")]
418        {
419            ::metrics::gauge!("dfe_fanout_inflight").set(0.0);
420            ::metrics::histogram!("dfe_fanout_batch_duration_seconds")
421                .record(started.elapsed().as_secs_f64());
422        }
423
424        results
425    }
426
427    /// Map owned items in parallel under the concurrency limiter.
428    ///
429    /// Like [`process_batch`](Self::process_batch) but takes OWNED items and a
430    /// closure that consumes each (so the transform may mutate its own copy),
431    /// returning an arbitrary `R` per item. Crucially the semaphore IS applied
432    /// per item, so this path obeys the scaler target -- unlike
433    /// [`install`](Self::install). Results are collected in input order.
434    ///
435    /// Used by `BatchEngine`'s parsed mid-tier transform so the parsed path is
436    /// throttled identically to the raw path.
437    pub fn map_owned<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
438    where
439        T: Send,
440        R: Send,
441        F: Fn(T) -> R + Sync,
442    {
443        let sem = &self.semaphore;
444        self.rayon_pool.install(|| {
445            use rayon::prelude::*;
446            items
447                .into_par_iter()
448                .map(|item| {
449                    let _permit = sem.acquire();
450                    f(item)
451                })
452                .collect()
453        })
454    }
455
456    /// Execute a closure on the rayon thread pool.
457    ///
458    /// Provides direct access to the rayon pool for operations that need
459    /// `par_iter_mut` or other rayon primitives not covered by the throttled
460    /// helpers. The semaphore is NOT applied -- callers manage their own
461    /// concurrency. Prefer [`process_batch`](Self::process_batch) or
462    /// [`map_owned`](Self::map_owned), which respect the scaler target; reach
463    /// for `install` only for genuine rayon-primitive escape hatches.
464    pub fn install<R: Send>(&self, f: impl FnOnce() -> R + Send) -> R {
465        self.rayon_pool.install(f)
466    }
467
468    /// Register worker pool metrics with the `MetricsManager`.
469    ///
470    /// Registers operational metrics and emits threshold gauges with current values.
471    /// Call this once during startup after creating the pool.
472    pub fn register_metrics(&self, manager: &crate::metrics::MetricsManager) {
473        let config = self.config.read();
474        super::metrics::register(manager, &config);
475    }
476
477    /// Start the background scaling controller.
478    ///
479    /// The controller samples CPU/memory every `scale_interval_secs` and adjusts
480    /// the semaphore permits based on watermark bands. Stops on cancellation.
481    pub fn start_scaling_loop(self: &Arc<Self>, cancel: tokio_util::sync::CancellationToken) {
482        let controller = super::scaler::ScalingController::new(self.clone());
483        tokio::spawn(controller.run(cancel));
484    }
485
486    /// Attach a `MemoryGuard` for dual-source memory pressure reading.
487    #[cfg(feature = "memory")]
488    pub fn set_memory_guard(&self, guard: Arc<crate::memory::MemoryGuard>) {
489        *self.memory_guard.lock() = Some(guard);
490    }
491
492    /// Attach a `ScalingPressure` for bidirectional pressure integration.
493    #[cfg(feature = "scaling")]
494    pub fn set_scaling_pressure(&self, pressure: Arc<crate::scaling::ScalingPressure>) {
495        *self.scaling_pressure.lock() = Some(pressure);
496    }
497
498    /// Number of permits currently leased -- true in-flight worker count.
499    ///
500    /// An idle pool reports 0 regardless of the scaler target. This is the
501    /// telemetry-grade "active" count (vs [`target_threads`](Self::target_threads),
502    /// the scaler's desired ceiling).
503    #[must_use]
504    pub fn active_threads(&self) -> usize {
505        self.semaphore.leased()
506    }
507
508    /// Current scaler target concurrency (the admission ceiling).
509    #[must_use]
510    pub fn target_threads(&self) -> usize {
511        self.semaphore.target()
512    }
513
514    /// Headroom: permits that could be leased right now (`target - leased`).
515    #[must_use]
516    pub fn available_threads(&self) -> usize {
517        self.semaphore.available()
518    }
519
520    /// Maximum thread count (pool size).
521    #[must_use]
522    pub fn max_threads(&self) -> usize {
523        self.config.read().max_threads
524    }
525}
526
#[cfg(test)]
mod semaphore_tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::Semaphore;

    #[test]
    fn idle_reports_zero_leased() {
        let s = Semaphore::new(2, 8);
        assert_eq!(s.leased(), 0);
        assert_eq!(s.target(), 2);
        assert_eq!(s.available(), 2);
    }

    #[test]
    fn new_clamps_degenerate_inputs() {
        // max_permits = 0 is floored to 1, and the initial target is clamped
        // into [1, max_permits].
        let s = Semaphore::new(0, 0);
        assert_eq!(s.target(), 1, "initial target floored at 1");
        s.set_target(10);
        assert_eq!(s.target(), 1, "ceiling floored at 1 when max_permits = 0");

        let s = Semaphore::new(50, 4);
        assert_eq!(s.target(), 4, "initial target capped at max_permits");
    }

    #[test]
    fn lease_and_drop_track_leased() {
        let s = Semaphore::new(4, 8);
        {
            let _g1 = s.acquire();
            let _g2 = s.acquire();
            assert_eq!(s.leased(), 2);
            assert_eq!(s.available(), 2);
        }
        assert_eq!(s.leased(), 0, "drops release leases");
        assert_eq!(s.available(), 4);
    }

    #[test]
    fn downscale_does_not_overshoot_on_drop() {
        // Lease the full target, shrink the target while leased, then drain.
        // The old model refilled `available` toward max_permits on drop,
        // undoing the downscale. The new model derives available from the
        // target, so post-drain available == target, not max.
        let s = Semaphore::new(8, 8);
        let guards: Vec<_> = (0..8).map(|_| s.acquire()).collect();
        assert_eq!(s.leased(), 8);
        s.set_target(2);
        assert_eq!(s.target(), 2);
        assert_eq!(
            s.available(),
            0,
            "leased (8) exceeds target (2): no headroom"
        );
        drop(guards);
        assert_eq!(s.leased(), 0);
        assert_eq!(
            s.available(),
            2,
            "available equals target after drain, not max_permits"
        );
    }

    #[test]
    fn set_target_clamps_to_one_and_max() {
        let s = Semaphore::new(4, 8);
        s.set_target(0);
        assert_eq!(s.target(), 1, "target floored at 1 to avoid deadlock");
        s.set_target(100);
        assert_eq!(s.target(), 8, "target capped at max_permits");
    }

    #[test]
    fn contention_never_exceeds_target() {
        // 8 threads hammer a target=2 limiter; leased must never exceed 2.
        let s = Arc::new(Semaphore::new(2, 2));
        let max_seen = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let s = Arc::clone(&s);
                let max_seen = Arc::clone(&max_seen);
                std::thread::spawn(move || {
                    for _ in 0..50 {
                        let _g = s.acquire();
                        max_seen.fetch_max(s.leased(), Ordering::Relaxed);
                        std::thread::yield_now();
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert!(
            max_seen.load(Ordering::Relaxed) <= 2,
            "leased never exceeded target=2"
        );
        assert_eq!(s.leased(), 0);
    }

    #[test]
    fn grow_target_wakes_parked_acquirer() {
        // target=1 and one lease held, so a second acquirer must park until
        // the target grows. This proves the wakeup comes from set_target.
        let s = Arc::new(Semaphore::new(1, 4));
        let held = s.acquire();
        assert_eq!(s.leased(), 1);
        let s2 = Arc::clone(&s);
        let handle = std::thread::spawn(move || {
            let _g = s2.acquire();
            s2.leased()
        });
        std::thread::sleep(std::time::Duration::from_millis(50));
        // Deterministic regardless of scheduling: with leased == target the
        // acquirer cannot have completed yet.
        assert!(
            !handle.is_finished(),
            "acquirer must stay parked while leased >= target"
        );
        s.set_target(2);
        let observed = handle.join().unwrap();
        // `held` is still live, so the woken thread sees both leases.
        assert_eq!(
            observed, 2,
            "parked acquirer leased the new slot alongside `held`"
        );
        drop(held);
        assert_eq!(s.leased(), 0);
    }
}