Skip to main content

hyperi_rustlib/worker/
pool.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/pool.rs
3// Purpose:   Rayon pool + semaphore management, process_batch(), fan_out_async()
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use std::sync::Arc;
10
11use parking_lot::{Condvar, Mutex, RwLock};
12use rayon::ThreadPool;
13
14use super::config::WorkerPoolConfig;
15
16/// Adaptive worker pool with hybrid rayon (CPU) + tokio (async I/O) execution.
17///
18/// Provides two APIs:
19/// - [`process_batch`](Self::process_batch) -- CPU-bound work via rayon
20///   (JSON parsing, transforms, compression, CEL evaluation)
21/// - [`fan_out_async`](Self::fan_out_async) -- async I/O via tokio
22///   (enrichment, external APIs, storage writes)
23///
24/// The pool auto-scales active threads based on CPU/memory pressure using
25/// watermark bands. All thresholds are config-cascade overridable and emitted
26/// as gauge metrics.
27pub struct AdaptiveWorkerPool {
28    pub(crate) config: Arc<RwLock<WorkerPoolConfig>>,
29    rayon_pool: ThreadPool,
30    pub(crate) semaphore: Arc<Semaphore>,
31    #[cfg(feature = "memory")]
32    pub(crate) memory_guard: parking_lot::Mutex<Option<Arc<crate::memory::MemoryGuard>>>,
33    #[cfg(feature = "scaling")]
34    pub(crate) scaling_pressure: parking_lot::Mutex<Option<Arc<crate::scaling::ScalingPressure>>>,
35}
36
37/// Concurrency limiter for throttling rayon thread usage.
38///
39/// Rayon pools cannot be resized, so a limiter controls how many threads
40/// actively pick up work. Models explicit `target` (the scaler's desired
41/// concurrency) and `leased` (permits currently held by in-flight work);
42/// available headroom is the derived `target - leased`. A thread that cannot
43/// lease (`leased >= target`) PARKS on a condvar -- it does not spin -- so the
44/// throttle conserves CPU exactly when the scaler is trying to.
45///
46/// Why this shape (vs the old `available`/`max_permits` atomic): the scaler
47/// sets a *target*, and `active_threads()` reports *leased* (true in-flight),
48/// so an idle pool reports zero active and a downscale cannot be undone by
49/// guard drops refilling toward `max` -- drops only decrement `leased`, and
50/// no new lease is admitted while `leased >= target`.
51pub(crate) struct Semaphore {
52    state: Mutex<SemState>,
53    /// Signalled when a permit frees or the target grows.
54    available: Condvar,
55    /// Architectural ceiling (rayon pool size); `target` never exceeds it.
56    max_permits: usize,
57}
58
/// Mutable limiter state, only ever accessed under `Semaphore::state`.
struct SemState {
    /// Desired concurrency set by the scaler; always within `1..=max_permits`.
    target: usize,
    /// Number of permits held right now by running work.
    leased: usize,
}
65
66impl Semaphore {
67    fn new(initial_target: usize, max_permits: usize) -> Self {
68        let max_permits = max_permits.max(1);
69        Self {
70            state: Mutex::new(SemState {
71                target: initial_target.clamp(1, max_permits),
72                leased: 0,
73            }),
74            available: Condvar::new(),
75            max_permits,
76        }
77    }
78
79    /// Lease a permit, parking until `leased < target`. Releases on drop.
80    fn acquire(&self) -> SemaphoreGuard<'_> {
81        let mut st = self.state.lock();
82        while st.leased >= st.target {
83            self.available.wait(&mut st);
84        }
85        st.leased += 1;
86        SemaphoreGuard { semaphore: self }
87    }
88
89    /// Set the target concurrency (called by the scaler). Clamped to
90    /// `[1, max_permits]`. Growing the target wakes parked acquirers so they
91    /// re-check; shrinking simply stops new leases until `leased` falls below
92    /// the new target -- in-flight work drains naturally.
93    pub(crate) fn set_target(&self, target: usize) {
94        let clamped = target.clamp(1, self.max_permits);
95        let mut st = self.state.lock();
96        let grew = clamped > st.target;
97        st.target = clamped;
98        drop(st);
99        if grew {
100            self.available.notify_all();
101        }
102    }
103
104    /// Current target concurrency.
105    pub(crate) fn target(&self) -> usize {
106        self.state.lock().target
107    }
108
109    /// Permits currently leased (in-flight work).
110    pub(crate) fn leased(&self) -> usize {
111        self.state.lock().leased
112    }
113
114    /// Headroom: how many more permits can be leased right now.
115    pub(crate) fn available(&self) -> usize {
116        let st = self.state.lock();
117        st.target.saturating_sub(st.leased)
118    }
119}
120
121struct SemaphoreGuard<'a> {
122    semaphore: &'a Semaphore,
123}
124
125impl Drop for SemaphoreGuard<'_> {
126    fn drop(&mut self) {
127        let mut st = self.semaphore.state.lock();
128        st.leased = st.leased.saturating_sub(1);
129        drop(st);
130        // Wake one parked acquirer; the freed permit is now leasable
131        // (subject to the current target, which it re-checks).
132        self.semaphore.available.notify_one();
133    }
134}
135
136/// Policy for [`AdaptiveWorkerPool::fan_out_async_with_policy`].
137///
138/// The plain [`fan_out_async`](AdaptiveWorkerPool::fan_out_async) helper has no
139/// timeout or cancellation, and a single hung future stalls its whole chunk. At
140/// fleet scale, user code eventually passes a future that ignores deadlines, so
141/// reusable external-I/O fan-out needs a deadline + cancellation contract.
142#[derive(Debug, Clone, Default)]
143pub struct FanOutPolicy {
144    /// Per-item timeout. `None` = no timeout (caller must bound the future).
145    pub per_item_timeout: Option<std::time::Duration>,
146    /// Cancellation token. When cancelled, no NEW items are spawned; in-flight
147    /// items are still awaited. Remaining unspawned items report `Cancelled`.
148    pub cancel: Option<tokio_util::sync::CancellationToken>,
149}
150
/// Outcome recorded for each input by
/// [`AdaptiveWorkerPool::fan_out_async_with_policy`].
#[derive(Debug)]
pub enum FanOutResult<R, E> {
    /// The item's future finished and returned `Ok`.
    Ok(R),
    /// The item's future finished and returned `Err`.
    Err(E),
    /// The item ran past `per_item_timeout` and was abandoned.
    TimedOut,
    /// The item's task failed to complete (panic); logged at `error`.
    Panicked,
    /// The item was never started because cancellation fired first.
    Cancelled,
}
165
166impl AdaptiveWorkerPool {
167    /// Create a new worker pool with the given configuration, validating it.
168    ///
169    /// Resolves `max_threads = 0` to the detected CPU count, then validates the
170    /// resolved config (rejects `min_threads > max_threads`, bad watermark
171    /// ordering, zero `async_concurrency`, etc.) before building. This prevents
172    /// invalid runtime state that would otherwise panic later in the scaler's
173    /// `clamp(min, max)`.
174    ///
175    /// # Errors
176    ///
177    /// Returns `ConfigError` if the resolved config is invalid.
178    pub fn try_new(config: WorkerPoolConfig) -> Result<Self, crate::config::ConfigError> {
179        let mut resolved = config;
180        resolved.resolve_max_threads();
181        resolved.validate()?;
182        Ok(Self::build(resolved))
183    }
184
185    /// Create a new worker pool with the given configuration.
186    ///
187    /// Resolves `max_threads = 0` to the detected CPU count.
188    ///
189    /// # Panics
190    ///
191    /// Panics immediately with a clear message if the config is invalid (e.g.
192    /// `min_threads > max_threads`). Use [`try_new`](Self::try_new) for fallible
193    /// construction from untrusted config.
194    #[must_use]
195    pub fn new(config: WorkerPoolConfig) -> Self {
196        Self::try_new(config).expect("invalid WorkerPoolConfig (use try_new to handle the error)")
197    }
198
199    /// Build the pool from an already-resolved, validated config.
200    fn build(resolved: WorkerPoolConfig) -> Self {
201        let max_threads = resolved.max_threads;
202        let min_threads = resolved.min_threads;
203
204        let rayon_pool = rayon::ThreadPoolBuilder::new()
205            .num_threads(max_threads)
206            .thread_name(|i| format!("worker-{i}"))
207            .build()
208            .expect("Failed to create rayon thread pool");
209
210        let semaphore = Arc::new(Semaphore::new(min_threads, max_threads));
211
212        Self {
213            config: Arc::new(RwLock::new(resolved)),
214            rayon_pool,
215            semaphore,
216            #[cfg(feature = "memory")]
217            memory_guard: parking_lot::Mutex::new(None),
218            #[cfg(feature = "scaling")]
219            scaling_pressure: parking_lot::Mutex::new(None),
220        }
221    }
222
223    /// Create a new worker pool from the config cascade.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the config cascade is not initialised or validation fails.
228    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
229        let config = WorkerPoolConfig::from_cascade(key)?;
230        Self::try_new(config)
231    }
232
233    /// Process a batch of items in parallel using rayon (CPU-bound work).
234    ///
235    /// Each item is processed by the provided closure on a rayon worker thread.
236    /// A semaphore limits how many threads are active simultaneously (controlled
237    /// by the scaling controller). Results are returned in input order.
238    ///
239    /// Use this for: JSON parsing, transforms, compression, CEL evaluation, routing.
240    /// Do NOT use for work that needs `.await` -- use [`fan_out_async`](Self::fan_out_async).
241    pub fn process_batch<T, R, E, F>(&self, items: &[T], f: F) -> Vec<Result<R, E>>
242    where
243        T: Sync,
244        R: Send,
245        E: Send,
246        F: Fn(&T) -> Result<R, E> + Sync,
247    {
248        let sem = &self.semaphore;
249        self.rayon_pool.install(|| {
250            use rayon::prelude::*;
251            items
252                .par_iter()
253                .map(|item| {
254                    let _permit = sem.acquire();
255                    f(item)
256                })
257                .collect()
258        })
259    }
260
261    /// Fan out async work across tokio tasks with bounded concurrency.
262    ///
263    /// Each item is processed by the provided async closure on a tokio task.
264    /// Concurrency is limited by `async_concurrency` config.
265    ///
266    /// # Return contract
267    ///
268    /// The returned `Vec` has the same length as `items` and entries
269    /// correspond by index (input-order preserved):
270    ///
271    /// - `Some(Ok(r))` -- task completed successfully with result `r`
272    /// - `Some(Err(e))` -- task returned `Err(e)`
273    /// - `None` -- task panicked; the panic was logged at `error` level
274    ///   with the input index. The wrapping `Option` exists so the
275    ///   panic doesn't silently shorten the result vector (which was
276    ///   the previous behaviour and violated the input-order contract).
277    ///
278    /// Use this for: enrichment lookups, external API calls, storage writes.
279    pub async fn fan_out_async<T, R, E, F, Fut>(
280        &self,
281        items: &[T],
282        f: F,
283    ) -> Vec<Option<Result<R, E>>>
284    where
285        T: Sync + Send,
286        R: Send + 'static,
287        E: Send + 'static,
288        F: Fn(&T) -> Fut + Send + Sync,
289        Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
290    {
291        let concurrency = self.config.read().async_concurrency;
292        let mut results: Vec<Option<Result<R, E>>> = (0..items.len()).map(|_| None).collect();
293
294        // Process in chunks of `concurrency` to limit in-flight tasks
295        for chunk_start in (0..items.len()).step_by(concurrency) {
296            let chunk_end = (chunk_start + concurrency).min(items.len());
297            let mut handles = Vec::with_capacity(chunk_end - chunk_start);
298
299            for (idx, item) in items
300                .iter()
301                .enumerate()
302                .skip(chunk_start)
303                .take(chunk_end - chunk_start)
304            {
305                let fut = f(item);
306                handles.push((idx, tokio::spawn(fut)));
307            }
308
309            for (idx, handle) in handles {
310                match handle.await {
311                    Ok(result) => results[idx] = Some(result),
312                    Err(join_err) => {
313                        // Leave results[idx] = None; caller can detect
314                        // the panic without shrinking the output vec.
315                        tracing::error!(error = %join_err, idx, "fan_out_async task panicked");
316                    }
317                }
318            }
319        }
320
321        results
322    }
323
324    /// Concurrency-bounded async fan-out with per-item timeout + cancellation.
325    ///
326    /// Unlike [`fan_out_async`](Self::fan_out_async), this streams completions
327    /// (a `JoinSet`) so a slow item never blocks faster ones, applies an
328    /// optional per-item timeout, and stops spawning new work when the policy's
329    /// cancellation token fires. Results are returned in input order; each is a
330    /// [`FanOutResult`]. Emits `dfe_fanout_*` metrics (in-flight gauge, timeout/
331    /// panic counters, batch-duration histogram) when the `metrics` feature is on.
332    pub async fn fan_out_async_with_policy<T, R, E, F, Fut>(
333        &self,
334        items: &[T],
335        policy: &FanOutPolicy,
336        f: F,
337    ) -> Vec<FanOutResult<R, E>>
338    where
339        T: Sync + Send,
340        R: Send + 'static,
341        E: Send + 'static,
342        F: Fn(&T) -> Fut + Send + Sync,
343        Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
344    {
345        let concurrency = self.config.read().async_concurrency.max(1);
346        let mut results: Vec<FanOutResult<R, E>> =
347            (0..items.len()).map(|_| FanOutResult::Cancelled).collect();
348
349        #[cfg(feature = "metrics")]
350        let started = std::time::Instant::now();
351
352        let mut set: tokio::task::JoinSet<(usize, FanOutResult<R, E>)> =
353            tokio::task::JoinSet::new();
354        // task Id -> input idx, so a panicked task (JoinError carries no payload)
355        // still maps back to its slot.
356        let mut id_to_idx: std::collections::HashMap<tokio::task::Id, usize> =
357            std::collections::HashMap::new();
358        let mut next = 0;
359        let cancelled = || policy.cancel.as_ref().is_some_and(|c| c.is_cancelled());
360
361        loop {
362            // Seed up to the concurrency limit (unless cancelled).
363            while set.len() < concurrency && next < items.len() && !cancelled() {
364                let fut = f(&items[next]);
365                let timeout = policy.per_item_timeout;
366                let idx = next;
367                let handle = set.spawn(async move {
368                    let outcome = match timeout {
369                        Some(d) => match tokio::time::timeout(d, fut).await {
370                            Ok(Ok(r)) => FanOutResult::Ok(r),
371                            Ok(Err(e)) => FanOutResult::Err(e),
372                            Err(_) => FanOutResult::TimedOut,
373                        },
374                        None => match fut.await {
375                            Ok(r) => FanOutResult::Ok(r),
376                            Err(e) => FanOutResult::Err(e),
377                        },
378                    };
379                    (idx, outcome)
380                });
381                id_to_idx.insert(handle.id(), idx);
382                next += 1;
383            }
384
385            #[cfg(feature = "metrics")]
386            ::metrics::gauge!("dfe_fanout_inflight").set(set.len() as f64);
387
388            let Some(joined) = set.join_next().await else {
389                break; // nothing in flight -- done (or cancelled with none left)
390            };
391            match joined {
392                Ok((idx, outcome)) => {
393                    #[cfg(feature = "metrics")]
394                    if matches!(outcome, FanOutResult::TimedOut) {
395                        ::metrics::counter!("dfe_fanout_timeout_total").increment(1);
396                    }
397                    results[idx] = outcome;
398                }
399                Err(join_err) => {
400                    // Panicked task -- map its task Id back to the input slot.
401                    if let Some(&idx) = id_to_idx.get(&join_err.id()) {
402                        results[idx] = FanOutResult::Panicked;
403                    }
404                    #[cfg(feature = "metrics")]
405                    ::metrics::counter!("dfe_fanout_panic_total").increment(1);
406                    tracing::error!(error = %join_err, "fan_out_async_with_policy task panicked");
407                }
408            }
409
410            // If cancelled and nothing left in flight, stop (remaining stay Cancelled).
411            if cancelled() && set.is_empty() {
412                break;
413            }
414        }
415
416        #[cfg(feature = "metrics")]
417        {
418            ::metrics::gauge!("dfe_fanout_inflight").set(0.0);
419            ::metrics::histogram!("dfe_fanout_batch_duration_seconds")
420                .record(started.elapsed().as_secs_f64());
421        }
422
423        results
424    }
425
426    /// Map owned items in parallel under the concurrency limiter.
427    ///
428    /// Like [`process_batch`](Self::process_batch) but takes OWNED items and a
429    /// closure that consumes each (so the transform may mutate its own copy),
430    /// returning an arbitrary `R` per item. Crucially the semaphore IS applied
431    /// per item, so this path obeys the scaler target -- unlike
432    /// [`install`](Self::install). Results are collected in input order.
433    ///
434    /// Used by `BatchEngine`'s parsed mid-tier transform so the parsed path is
435    /// throttled identically to the raw path.
436    pub fn map_owned<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
437    where
438        T: Send,
439        R: Send,
440        F: Fn(T) -> R + Sync,
441    {
442        let sem = &self.semaphore;
443        self.rayon_pool.install(|| {
444            use rayon::prelude::*;
445            items
446                .into_par_iter()
447                .map(|item| {
448                    let _permit = sem.acquire();
449                    f(item)
450                })
451                .collect()
452        })
453    }
454
455    /// Execute a closure on the rayon thread pool.
456    ///
457    /// Provides direct access to the rayon pool for operations that need
458    /// `par_iter_mut` or other rayon primitives not covered by the throttled
459    /// helpers. The semaphore is NOT applied -- callers manage their own
460    /// concurrency. Prefer [`process_batch`](Self::process_batch) or
461    /// [`map_owned`](Self::map_owned), which respect the scaler target; reach
462    /// for `install` only for genuine rayon-primitive escape hatches.
463    pub fn install<R: Send>(&self, f: impl FnOnce() -> R + Send) -> R {
464        self.rayon_pool.install(f)
465    }
466
467    /// Register worker pool metrics with the `MetricsManager`.
468    ///
469    /// Registers operational metrics and emits threshold gauges with current values.
470    /// Call this once during startup after creating the pool.
471    pub fn register_metrics(&self, manager: &crate::metrics::MetricsManager) {
472        let config = self.config.read();
473        super::metrics::register(manager, &config);
474    }
475
476    /// Start the background scaling controller.
477    ///
478    /// The controller samples CPU/memory every `scale_interval_secs` and adjusts
479    /// the semaphore permits based on watermark bands. Stops on cancellation.
480    pub fn start_scaling_loop(self: &Arc<Self>, cancel: tokio_util::sync::CancellationToken) {
481        let controller = super::scaler::ScalingController::new(self.clone());
482        tokio::spawn(controller.run(cancel));
483    }
484
485    /// Attach a `MemoryGuard` for dual-source memory pressure reading.
486    #[cfg(feature = "memory")]
487    pub fn set_memory_guard(&self, guard: Arc<crate::memory::MemoryGuard>) {
488        *self.memory_guard.lock() = Some(guard);
489    }
490
491    /// Attach a `ScalingPressure` for bidirectional pressure integration.
492    #[cfg(feature = "scaling")]
493    pub fn set_scaling_pressure(&self, pressure: Arc<crate::scaling::ScalingPressure>) {
494        *self.scaling_pressure.lock() = Some(pressure);
495    }
496
497    /// Number of permits currently leased -- true in-flight worker count.
498    ///
499    /// An idle pool reports 0 regardless of the scaler target. This is the
500    /// telemetry-grade "active" count (vs [`target_threads`](Self::target_threads),
501    /// the scaler's desired ceiling).
502    #[must_use]
503    pub fn active_threads(&self) -> usize {
504        self.semaphore.leased()
505    }
506
507    /// Current scaler target concurrency (the admission ceiling).
508    #[must_use]
509    pub fn target_threads(&self) -> usize {
510        self.semaphore.target()
511    }
512
513    /// Headroom: permits that could be leased right now (`target - leased`).
514    #[must_use]
515    pub fn available_threads(&self) -> usize {
516        self.semaphore.available()
517    }
518
519    /// Maximum thread count (pool size).
520    #[must_use]
521    pub fn max_threads(&self) -> usize {
522        self.config.read().max_threads
523    }
524}
525
#[cfg(test)]
mod semaphore_tests {
    // Unit tests for the target/leased admission limiter.
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::Semaphore;

    #[test]
    fn idle_reports_zero_leased() {
        let s = Semaphore::new(2, 8);
        assert_eq!(s.leased(), 0);
        assert_eq!(s.target(), 2);
        assert_eq!(s.available(), 2);
    }

    #[test]
    fn lease_and_drop_track_leased() {
        let s = Semaphore::new(4, 8);
        {
            let _g1 = s.acquire();
            let _g2 = s.acquire();
            assert_eq!(s.leased(), 2);
            assert_eq!(s.available(), 2);
        }
        assert_eq!(s.leased(), 0, "drops release leases");
        assert_eq!(s.available(), 4);
    }

    #[test]
    fn downscale_does_not_overshoot_on_drop() {
        // Lease the full target, shrink the target while leased, then drain.
        // The old model refilled `available` toward max_permits on drop,
        // undoing the downscale; the new model derives available from the
        // target, so post-drain available == target, not max.
        let s = Semaphore::new(8, 8);
        let guards: Vec<_> = (0..8).map(|_| s.acquire()).collect();
        assert_eq!(s.leased(), 8);
        s.set_target(2);
        assert_eq!(s.target(), 2);
        assert_eq!(
            s.available(),
            0,
            "leased (8) exceeds target (2): no headroom"
        );
        drop(guards);
        assert_eq!(s.leased(), 0);
        assert_eq!(
            s.available(),
            2,
            "available equals target after drain, not max_permits"
        );
    }

    #[test]
    fn set_target_clamps_to_one_and_max() {
        let s = Semaphore::new(4, 8);
        s.set_target(0);
        assert_eq!(s.target(), 1, "target floored at 1 to avoid deadlock");
        s.set_target(100);
        assert_eq!(s.target(), 8, "target capped at max_permits");
    }

    #[test]
    fn contention_never_exceeds_target() {
        // 8 threads hammer a target=2 limiter; leased must never exceed 2.
        let s = Arc::new(Semaphore::new(2, 2));
        let max_seen = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let s = Arc::clone(&s);
                let max_seen = Arc::clone(&max_seen);
                std::thread::spawn(move || {
                    for _ in 0..50 {
                        let _g = s.acquire();
                        max_seen.fetch_max(s.leased(), Ordering::Relaxed);
                        std::thread::yield_now();
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert!(
            max_seen.load(Ordering::Relaxed) <= 2,
            "leased never exceeded target=2"
        );
        assert_eq!(s.leased(), 0);
    }

    #[test]
    fn grow_target_wakes_parked_acquirer() {
        // target=1 with one lease held: a second acquirer must block until the
        // target grows, then lease alongside the held permit.
        let s = Arc::new(Semaphore::new(1, 4));
        let held = s.acquire();
        assert_eq!(s.leased(), 1);
        let s2 = Arc::clone(&s);
        let handle = std::thread::spawn(move || {
            let _g = s2.acquire();
            s2.leased()
        });
        std::thread::sleep(std::time::Duration::from_millis(50));
        assert_eq!(
            s.leased(),
            1,
            "second acquirer must not lease while target=1 is saturated"
        );
        s.set_target(2);
        let observed = handle.join().unwrap();
        assert_eq!(
            observed, 2,
            "parked acquirer proceeded after target grew, alongside the held permit"
        );
        drop(held);
        assert_eq!(s.leased(), 0);
    }

    #[test]
    fn drop_wakes_parked_acquirer() {
        // target=1 with one lease held: releasing that lease must hand the
        // permit to the blocked acquirer without any target change.
        let s = Arc::new(Semaphore::new(1, 4));
        let held = s.acquire();
        let s2 = Arc::clone(&s);
        let handle = std::thread::spawn(move || {
            let _g = s2.acquire();
            s2.leased()
        });
        std::thread::sleep(std::time::Duration::from_millis(50));
        assert_eq!(s.leased(), 1, "acquirer blocked while the only permit is held");
        drop(held);
        assert_eq!(
            handle.join().unwrap(),
            1,
            "freed permit was taken by the parked acquirer"
        );
        assert_eq!(s.target(), 1, "target untouched by lease traffic");
        assert_eq!(s.leased(), 0);
    }
}