Skip to main content

hyperi_rustlib/governor/
source.rs

1// Project:   hyperi-rustlib
2// File:      src/governor/source.rs
3// Purpose:   Pressure seam + memory source for the self-regulation governor
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pressure seam: normalised readings, sources, and the unified latch.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use crate::memory::MemoryGuard;
15
16/// A normalised pressure reading, clamped to `[0.0, 1.0]` on construction.
17///
18/// `NaN` collapses to `0.0` (treat an unreadable source as no pressure,
19/// never as max pressure -- a `NaN` masquerading as `1.0` would wedge the
20/// governor into a permanent hold).
21#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
22pub struct Pressure(f64);
23
24impl Pressure {
25    /// Construct a reading, clamping to `[0.0, 1.0]`. `NaN` becomes `0.0`.
26    #[must_use]
27    pub fn new(value: f64) -> Self {
28        // `clamp` panics on NaN, and `f64::max(NaN, 0.0)` returns 0.0 only
29        // for the left-NaN form -- be explicit so the intent is obvious.
30        let v = if value.is_nan() {
31            0.0
32        } else {
33            value.clamp(0.0, 1.0)
34        };
35        Self(v)
36    }
37
38    /// The clamped reading in `[0.0, 1.0]`.
39    #[must_use]
40    pub fn get(&self) -> f64 {
41        self.0
42    }
43}
44
45/// A source of normalised pressure feeding the unified governor.
46///
47/// Implementors are wrappers over the real signal (memory guard, and
48/// later CPU, queue depth, etc.). Keeping the trait here -- not in the
49/// signal's own module -- keeps `memory` a leaf with no governor
50/// dependency.
51pub trait PressureSource: Send + Sync {
52    /// Stable identifier for diagnostics (e.g. `"memory"`).
53    fn name(&self) -> &'static str;
54
55    /// Sample the current pressure.
56    fn sample(&self) -> Pressure;
57
58    /// Sensitivity weight applied to SOFT signals in the combine. HARD
59    /// signals ignore this (they are never down-weighted). Default `1.0`.
60    fn weight(&self) -> f64 {
61        1.0
62    }
63
64    /// HARD signals are never masked or down-weighted -- their raw reading
65    /// always competes for the combined level. Default `false`.
66    fn is_hard(&self) -> bool {
67        false
68    }
69}
70
71/// HARD pressure source backed by the [`MemoryGuard`].
72///
73/// A thin wrapper so `memory` stays a leaf module: the trait
74/// implementation lives here, in the governor, not in `guard.rs`.
75pub struct MemoryPressureSource(Arc<MemoryGuard>);
76
77impl MemoryPressureSource {
78    /// Wrap a shared memory guard as a pressure source.
79    #[must_use]
80    pub fn new(guard: Arc<MemoryGuard>) -> Self {
81        Self(guard)
82    }
83}
84
85impl PressureSource for MemoryPressureSource {
86    fn name(&self) -> &'static str {
87        "memory"
88    }
89
90    fn sample(&self) -> Pressure {
91        Pressure::new(self.0.pressure_ratio())
92    }
93
94    fn weight(&self) -> f64 {
95        1.0
96    }
97
98    fn is_hard(&self) -> bool {
99        true
100    }
101}
102
103/// Hysteresis band for the pause/resume latch.
104///
105/// `pause_above` must be strictly greater than `resume_below`, otherwise
106/// there is no band to hold the latch and it degenerates to a single
107/// threshold (flapping). [`Self::new`] validates this.
108#[derive(Debug, Clone, Copy)]
109pub struct Hysteresis {
110    /// Arm the latch (start holding) when the level reaches this.
111    pub pause_above: f64,
112    /// Release the latch (stop holding) when the level drops to this.
113    pub resume_below: f64,
114}
115
116impl Hysteresis {
117    /// Construct a band, validating `pause_above > resume_below` and that
118    /// both bounds sit inside `[0.0, 1.0]`.
119    ///
120    /// The range check matters because [`Pressure`] samples are clamped to
121    /// `[0.0, 1.0]`: a `resume_below < 0.0` could never release the latch
122    /// (permanent stuck-pause) and a `pause_above > 1.0` could never arm it
123    /// (brake silently disabled).
124    ///
125    /// # Errors
126    ///
127    /// Returns `Err` if the bounds are non-finite, outside `[0.0, 1.0]`, or
128    /// `pause_above` is not strictly greater than `resume_below`.
129    pub fn new(pause_above: f64, resume_below: f64) -> Result<Self, String> {
130        if !pause_above.is_finite() || !resume_below.is_finite() {
131            return Err(format!(
132                "hysteresis bounds must be finite, got pause_above={pause_above}, \
133                 resume_below={resume_below}"
134            ));
135        }
136        if !(0.0..=1.0).contains(&pause_above) || !(0.0..=1.0).contains(&resume_below) {
137            return Err(format!(
138                "hysteresis bounds must be within [0.0, 1.0] (pressure levels are \
139                 clamped to that range), got pause_above={pause_above}, \
140                 resume_below={resume_below}"
141            ));
142        }
143        if pause_above <= resume_below {
144            return Err(format!(
145                "hysteresis requires pause_above > resume_below, got \
146                 pause_above={pause_above}, resume_below={resume_below}"
147            ));
148        }
149        Ok(Self {
150            pause_above,
151            resume_below,
152        })
153    }
154}
155
156/// A per-source diagnostic line in a [`UnifiedPressureSnapshot`].
157#[derive(Debug, Clone)]
158pub struct SourceReading {
159    /// Source identifier.
160    pub name: &'static str,
161    /// Raw clamped sample.
162    pub raw: f64,
163    /// Weight the source declares.
164    pub weight: f64,
165    /// Whether the source is HARD (raw, never masked).
166    pub is_hard: bool,
167    /// The value that competed for the combined level (raw for HARD,
168    /// `raw * weight` for SOFT).
169    pub effective: f64,
170}
171
172/// Point-in-time breakdown of the governor for diagnostics / metrics.
173#[derive(Debug, Clone)]
174pub struct UnifiedPressureSnapshot {
175    /// Per-source readings.
176    pub sources: Vec<SourceReading>,
177    /// Max raw reading across HARD sources (`0.0` if none).
178    pub hard_max: f64,
179    /// Max weighted reading across SOFT sources (`0.0` if none).
180    pub soft_max: f64,
181    /// Combined level (`hard_max.max(soft_max)`).
182    pub level: f64,
183    /// Latched hold state at snapshot time.
184    pub paused: bool,
185}
186
187/// Combines pressure sources into one level under a hysteretic latch.
188///
189/// See the [module docs](crate::governor) for the design invariants. The
190/// latch state is an [`AtomicBool`] so [`should_hold`](Self::should_hold)
191/// is a cheap, `Sync` hot-path check.
192pub struct UnifiedPressure {
193    sources: Vec<Arc<dyn PressureSource>>,
194    hyst: Hysteresis,
195    paused: AtomicBool,
196}
197
198impl UnifiedPressure {
199    /// Build a governor over the given sources and hysteresis band.
200    #[must_use]
201    pub fn new(sources: Vec<Arc<dyn PressureSource>>, hyst: Hysteresis) -> Self {
202        Self {
203            sources,
204            hyst,
205            paused: AtomicBool::new(false),
206        }
207    }
208
209    /// Add a source after construction.
210    ///
211    /// Proves the seam accepts a new signal kind (e.g. a future CPU
212    /// source) with zero change to the gate API -- existing callers of
213    /// [`level`](Self::level) / [`should_hold`](Self::should_hold) are
214    /// untouched.
215    pub fn add_source(&mut self, source: Arc<dyn PressureSource>) {
216        self.sources.push(source);
217    }
218
219    /// Combined pressure level in `[0.0, 1.0]`.
220    ///
221    /// `hard_max` = max raw reading over HARD sources (never weighted,
222    /// never masked). `soft_max` = max of `sample * weight` over SOFT
223    /// sources. `level = hard_max.max(soft_max)`.
224    ///
225    /// Soft `weight()` is clamped to `[0.0, 1.0]` (non-finite -> `1.0`) at the
226    /// combine: `add_source` is the advertised extension seam, so a future
227    /// soft source returning a stray weight cannot push `level()` outside its
228    /// documented range or silently neutralise itself with a negative/NaN
229    /// weight. One branch per soft source per `evaluate()` (not per record);
230    /// with no soft source wired today the cost is nil.
231    #[must_use]
232    pub fn level(&self) -> f64 {
233        let mut hard_max = 0.0_f64;
234        let mut soft_max = 0.0_f64;
235        for src in &self.sources {
236            let raw = src.sample().get();
237            if src.is_hard() {
238                hard_max = hard_max.max(raw);
239            } else {
240                let w = src.weight();
241                let w = if w.is_finite() {
242                    w.clamp(0.0, 1.0)
243                } else {
244                    1.0
245                };
246                soft_max = soft_max.max(raw * w);
247            }
248        }
249        hard_max.max(soft_max)
250    }
251
252    /// Hysteretic hold latch over [`level`](Self::level).
253    ///
254    /// - Held and `level <= resume_below` -> release, return `false`.
255    /// - Not held and `level >= pause_above` -> arm, return `true`.
256    /// - Otherwise -> return the current latch state (the band holds it).
257    #[must_use]
258    pub fn should_hold(&self) -> bool {
259        let level = self.level();
260        let paused = self.paused.load(Ordering::Acquire);
261        if paused {
262            if level <= self.hyst.resume_below {
263                self.paused.store(false, Ordering::Release);
264                return false;
265            }
266            true
267        } else {
268            if level >= self.hyst.pause_above {
269                self.paused.store(true, Ordering::Release);
270                return true;
271            }
272            false
273        }
274    }
275
276    /// Per-source breakdown plus the combined level and latch state.
277    #[must_use]
278    pub fn snapshot(&self) -> UnifiedPressureSnapshot {
279        let mut readings = Vec::with_capacity(self.sources.len());
280        let mut hard_max = 0.0_f64;
281        let mut soft_max = 0.0_f64;
282        for src in &self.sources {
283            let raw = src.sample().get();
284            let weight = src.weight();
285            let is_hard = src.is_hard();
286            let effective = if is_hard { raw } else { raw * weight };
287            if is_hard {
288                hard_max = hard_max.max(raw);
289            } else {
290                soft_max = soft_max.max(effective);
291            }
292            readings.push(SourceReading {
293                name: src.name(),
294                raw,
295                weight,
296                is_hard,
297                effective,
298            });
299        }
300        UnifiedPressureSnapshot {
301            sources: readings,
302            hard_max,
303            soft_max,
304            level: hard_max.max(soft_max),
305            paused: self.paused.load(Ordering::Acquire),
306        }
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use std::sync::atomic::AtomicU64;
314
315    /// Scriptable test double: a source whose reading can be set at
316    /// runtime so a single `UnifiedPressure` can be driven through a
317    /// rising/falling sequence. Stores the reading as bit-pattern `u64`
318    /// so it stays `Sync` without a lock (crate forbids `unsafe`, so a
319    /// `Cell` would not be `Sync`).
320    struct MockSource {
321        name: &'static str,
322        value: AtomicU64,
323        weight: f64,
324        hard: bool,
325    }
326
327    impl MockSource {
328        fn new(name: &'static str, value: f64, weight: f64, hard: bool) -> Self {
329            Self {
330                name,
331                value: AtomicU64::new(value.to_bits()),
332                weight,
333                hard,
334            }
335        }
336
337        fn set(&self, value: f64) {
338            self.value.store(value.to_bits(), Ordering::Relaxed);
339        }
340    }
341
342    impl PressureSource for MockSource {
343        fn name(&self) -> &'static str {
344            self.name
345        }
346        fn sample(&self) -> Pressure {
347            Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
348        }
349        fn weight(&self) -> f64 {
350            self.weight
351        }
352        fn is_hard(&self) -> bool {
353            self.hard
354        }
355    }
356
357    fn approx(a: f64, b: f64) -> bool {
358        (a - b).abs() < 1e-9
359    }
360
361    #[test]
362    fn pressure_clamps_and_handles_nan() {
363        assert!(approx(Pressure::new(-1.0).get(), 0.0));
364        assert!(approx(Pressure::new(2.0).get(), 1.0));
365        assert!(approx(Pressure::new(0.5).get(), 0.5));
366        // NaN must collapse to 0.0, NOT to 1.0 -- a NaN reading must never
367        // wedge the governor into a permanent hold.
368        assert!(approx(Pressure::new(f64::NAN).get(), 0.0));
369        assert!(approx(Pressure::new(f64::INFINITY).get(), 1.0));
370        assert!(approx(Pressure::new(f64::NEG_INFINITY).get(), 0.0));
371    }
372
373    #[test]
374    fn hysteresis_rejects_inverted_band() {
375        assert!(Hysteresis::new(0.80, 0.65).is_ok());
376        assert!(Hysteresis::new(0.65, 0.80).is_err());
377        assert!(Hysteresis::new(0.80, 0.80).is_err());
378        assert!(Hysteresis::new(f64::NAN, 0.5).is_err());
379    }
380
381    /// Out-of-`[0,1]` bands must be rejected: `Pressure` clamps every sample to
382    /// `[0,1]`, so a band outside that range is unreachable in one direction --
383    /// `resume_below < 0.0` can never release (permanent stuck-pause) and
384    /// `pause_above > 1.0` can never arm (brake silently disabled). Both are
385    /// the failure modes the never-OOM/no-flap contract forbids.
386    #[test]
387    fn hysteresis_rejects_out_of_range_band() {
388        // resume_below below the clamp floor -> latch could never release.
389        assert!(Hysteresis::new(0.5, -0.1).is_err());
390        // pause_above above the clamp ceiling -> latch could never arm.
391        assert!(Hysteresis::new(1.5, 0.65).is_err());
392        // Both ends in range, valid ordering -> still ok (no regression).
393        assert!(Hysteresis::new(1.0, 0.0).is_ok());
394        assert!(Hysteresis::new(0.80, 0.65).is_ok());
395    }
396
397    /// A future SOFT source returning a stray weight must not push `level()`
398    /// outside `[0,1]` or silently neutralise itself. `add_source` is the
399    /// advertised extension seam, so the combine clamps each soft weight to
400    /// `[0,1]` (non-finite -> `1.0`).
401    #[test]
402    fn soft_weight_is_clamped_at_combine() {
403        // weight > 1 at full pressure -> clamped to 1.0, level == 1.0 (not 5.0).
404        let over = Arc::new(MockSource::new("over", 1.0, 5.0, false));
405        let p = UnifiedPressure::new(
406            vec![Arc::clone(&over) as Arc<dyn PressureSource>],
407            Hysteresis::new(0.80, 0.65).expect("band"),
408        );
409        assert!(
410            approx(p.level(), 1.0),
411            "level must stay <= 1.0, got {}",
412            p.level()
413        );
414
415        // NaN weight -> 1.0 multiplier (not dropped by f64::max(NaN)).
416        let nan_w = Arc::new(MockSource::new("nan", 0.5, f64::NAN, false));
417        let p2 = UnifiedPressure::new(
418            vec![Arc::clone(&nan_w) as Arc<dyn PressureSource>],
419            Hysteresis::new(0.80, 0.65).expect("band"),
420        );
421        assert!(
422            approx(p2.level(), 0.5),
423            "NaN weight -> 1.0, got {}",
424            p2.level()
425        );
426
427        // Negative weight -> clamped to 0.0, contributes nothing.
428        let neg = Arc::new(MockSource::new("neg", 1.0, -2.0, false));
429        let p3 = UnifiedPressure::new(
430            vec![Arc::clone(&neg) as Arc<dyn PressureSource>],
431            Hysteresis::new(0.80, 0.65).expect("band"),
432        );
433        assert!(
434            approx(p3.level(), 0.0),
435            "negative weight -> 0, got {}",
436            p3.level()
437        );
438    }
439
440    /// The adversarial proving test.
441    ///
442    /// Drives one `UnifiedPressure` through the full pause/resume cycle and
443    /// proves the two riskiest invariants:
444    ///   1. a saturated SOFT signal at weight 0.5 cannot force a hold the
445    ///      HARD signal would not (no soft-masks-hard, no spurious hold);
446    ///   2. the hysteresis latch arms on the rising edge, holds inside the
447    ///      band, releases on the falling edge, and re-arms cleanly (no
448    ///      sticky state).
449    ///
450    /// Step 6 proves a third soft source plugs in via `add_source` with
451    /// zero change to the gate API.
452    #[test]
453    fn adversarial_combine_and_hysteresis() {
454        let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
455
456        // HARD memory source + a SOFT "cpu" source at weight 0.5.
457        let mem = Arc::new(MockSource::new("memory", 0.50, 1.0, true));
458        let cpu = Arc::new(MockSource::new("cpu", 1.0, 0.5, false));
459
460        let governor = UnifiedPressure::new(
461            vec![
462                Arc::clone(&mem) as Arc<dyn PressureSource>,
463                Arc::clone(&cpu) as Arc<dyn PressureSource>,
464            ],
465            hyst,
466        );
467
468        // Step 1: memory=0.50, cpu=1.0 (saturated SOFT).
469        // soft = 1.0 * 0.5 = 0.50; hard = 0.50; level = max(0.50, 0.50) = 0.50.
470        // A saturated SOFT signal at weight 0.5 CANNOT force a hold the HARD
471        // signal would not. level < pause_above -> no hold.
472        assert!(
473            approx(governor.level(), 0.50),
474            "level should be 0.50, got {}",
475            governor.level()
476        );
477        assert!(
478            !governor.should_hold(),
479            "saturated soft signal must not mask/force a hold"
480        );
481
482        // Step 2: memory rises to 0.85 -> rising edge latches.
483        mem.set(0.85);
484        assert!(approx(governor.level(), 0.85), "hard 0.85 dominates");
485        assert!(
486            governor.should_hold(),
487            "rising edge above pause_above latches"
488        );
489
490        // Step 3: memory falls to 0.70 -> inside band (> resume_below) -> holds.
491        mem.set(0.70);
492        assert!(approx(governor.level(), 0.70));
493        assert!(
494            governor.should_hold(),
495            "0.70 is inside the hysteresis band -> latch stays held"
496        );
497
498        // Step 4: memory falls to 0.60 -> below resume_below -> releases.
499        mem.set(0.60);
500        assert!(approx(governor.level(), 0.60));
501        assert!(
502            !governor.should_hold(),
503            "falling edge below resume_below releases the latch"
504        );
505
506        // Step 5: memory back to 0.85 -> latch re-arms (no sticky state).
507        mem.set(0.85);
508        assert!(
509            governor.should_hold(),
510            "latch must re-arm cleanly with no sticky state"
511        );
512
513        // Step 6: add a THIRD soft source via add_source -- proves the seam
514        // accepts a new signal kind with zero gate-API change. Release first
515        // so we can observe the new source's effect cleanly.
516        mem.set(0.10);
517        let mut governor = governor;
518        let queue = Arc::new(MockSource::new("queue_depth", 0.0, 0.5, false));
519        governor.add_source(Arc::clone(&queue) as Arc<dyn PressureSource>);
520
521        // Drop out of the band first (everything low) so the latch releases.
522        cpu.set(0.0);
523        assert!(!governor.should_hold(), "all sources low -> released");
524
525        // Now saturate the new SOFT source: 1.0 * 0.5 = 0.50, still under
526        // pause_above. Same gate API, same behaviour -- a weighted soft
527        // source cannot force a hold on its own.
528        queue.set(1.0);
529        assert!(
530            approx(governor.level(), 0.50),
531            "new soft source weighted in"
532        );
533        assert!(
534            !governor.should_hold(),
535            "weighted third soft source still cannot force a hold"
536        );
537
538        // And the HARD signal still gets through unmasked over the new source.
539        mem.set(0.90);
540        assert!(approx(governor.level(), 0.90), "hard signal unmasked");
541        assert!(
542            governor.should_hold(),
543            "hard signal re-arms over soft sources"
544        );
545    }
546
547    #[test]
548    fn snapshot_reports_per_source_breakdown() {
549        let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
550        let mem = Arc::new(MockSource::new("memory", 0.70, 1.0, true));
551        let cpu = Arc::new(MockSource::new("cpu", 0.40, 0.5, false));
552        let governor = UnifiedPressure::new(
553            vec![
554                mem as Arc<dyn PressureSource>,
555                cpu as Arc<dyn PressureSource>,
556            ],
557            hyst,
558        );
559
560        let snap = governor.snapshot();
561        assert_eq!(snap.sources.len(), 2);
562        assert!(approx(snap.hard_max, 0.70));
563        assert!(approx(snap.soft_max, 0.20)); // 0.40 * 0.5
564        assert!(approx(snap.level, 0.70));
565        assert!(!snap.paused);
566
567        let cpu_reading = snap
568            .sources
569            .iter()
570            .find(|r| r.name == "cpu")
571            .expect("cpu present");
572        assert!(!cpu_reading.is_hard);
573        assert!(approx(cpu_reading.effective, 0.20));
574    }
575
576    #[test]
577    fn memory_pressure_source_wraps_guard_as_hard() {
578        use crate::memory::{MemoryGuard, MemoryGuardConfig};
579
580        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
581            limit_bytes: 1000,
582            pressure_threshold: 0.80,
583            ..Default::default()
584        }));
585        guard.add_bytes(700); // 70%
586        let src = MemoryPressureSource::new(Arc::clone(&guard));
587
588        assert_eq!(src.name(), "memory");
589        assert!(src.is_hard());
590        assert!(approx(src.weight(), 1.0));
591        assert!(
592            approx(src.sample().get(), 0.70),
593            "sample should mirror guard.pressure_ratio(), got {}",
594            src.sample().get()
595        );
596    }
597}