Skip to main content

hyperi_rustlib/governor/
source.rs

1// Project:   hyperi-rustlib
2// File:      src/governor/source.rs
3// Purpose:   Pressure seam + memory source for the self-regulation governor
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pressure seam: normalised readings, sources, and the unified latch.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use crate::memory::MemoryGuard;
15
16/// A normalised pressure reading, clamped to `[0.0, 1.0]` on construction.
17///
18/// `NaN` collapses to `0.0` (treat an unreadable source as no pressure,
19/// never as max pressure -- a `NaN` masquerading as `1.0` would wedge the
20/// governor into a permanent hold).
21#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
22pub struct Pressure(f64);
23
24impl Pressure {
25    /// Construct a reading, clamping to `[0.0, 1.0]`. `NaN` becomes `0.0`.
26    #[must_use]
27    pub fn new(value: f64) -> Self {
28        // `clamp` panics on NaN, and `f64::max(NaN, 0.0)` returns 0.0 only
29        // for the left-NaN form -- be explicit so the intent is obvious.
30        let v = if value.is_nan() {
31            0.0
32        } else {
33            value.clamp(0.0, 1.0)
34        };
35        Self(v)
36    }
37
38    /// The clamped reading in `[0.0, 1.0]`.
39    #[must_use]
40    pub fn get(&self) -> f64 {
41        self.0
42    }
43}
44
45/// A source of normalised pressure feeding the unified governor.
46///
47/// Implementors are wrappers over the real signal (memory guard, and
48/// later CPU, queue depth, etc.). Keeping the trait here -- not in the
49/// signal's own module -- keeps `memory` a leaf with no governor
50/// dependency.
51pub trait PressureSource: Send + Sync {
52    /// Stable identifier for diagnostics (e.g. `"memory"`).
53    fn name(&self) -> &'static str;
54
55    /// Sample the current pressure.
56    fn sample(&self) -> Pressure;
57
58    /// Sensitivity weight applied to SOFT signals in the combine. HARD
59    /// signals ignore this (they are never down-weighted). Default `1.0`.
60    fn weight(&self) -> f64 {
61        1.0
62    }
63
64    /// HARD signals are never masked or down-weighted -- their raw reading
65    /// always competes for the combined level. Default `false`.
66    fn is_hard(&self) -> bool {
67        false
68    }
69}
70
71/// HARD pressure source backed by the [`MemoryGuard`].
72///
73/// A thin wrapper so `memory` stays a leaf module: the trait
74/// implementation lives here, in the governor, not in `guard.rs`.
75pub struct MemoryPressureSource(Arc<MemoryGuard>);
76
77impl MemoryPressureSource {
78    /// Wrap a shared memory guard as a pressure source.
79    #[must_use]
80    pub fn new(guard: Arc<MemoryGuard>) -> Self {
81        Self(guard)
82    }
83}
84
85impl PressureSource for MemoryPressureSource {
86    fn name(&self) -> &'static str {
87        "memory"
88    }
89
90    fn sample(&self) -> Pressure {
91        Pressure::new(self.0.pressure_ratio())
92    }
93
94    fn weight(&self) -> f64 {
95        1.0
96    }
97
98    fn is_hard(&self) -> bool {
99        true
100    }
101}
102
103/// Hysteresis band for the pause/resume latch.
104///
105/// `pause_above` must be strictly greater than `resume_below`, otherwise
106/// there is no band to hold the latch and it degenerates to a single
107/// threshold (flapping). [`Self::new`] validates this.
108#[derive(Debug, Clone, Copy)]
109pub struct Hysteresis {
110    /// Arm the latch (start holding) when the level reaches this.
111    pub pause_above: f64,
112    /// Release the latch (stop holding) when the level drops to this.
113    pub resume_below: f64,
114}
115
116impl Hysteresis {
117    /// Construct a band, validating `pause_above > resume_below`.
118    ///
119    /// # Errors
120    ///
121    /// Returns `Err` if the bounds are non-finite or `pause_above` is not
122    /// strictly greater than `resume_below`.
123    pub fn new(pause_above: f64, resume_below: f64) -> Result<Self, String> {
124        if !pause_above.is_finite() || !resume_below.is_finite() {
125            return Err(format!(
126                "hysteresis bounds must be finite, got pause_above={pause_above}, \
127                 resume_below={resume_below}"
128            ));
129        }
130        if pause_above <= resume_below {
131            return Err(format!(
132                "hysteresis requires pause_above > resume_below, got \
133                 pause_above={pause_above}, resume_below={resume_below}"
134            ));
135        }
136        Ok(Self {
137            pause_above,
138            resume_below,
139        })
140    }
141}
142
143/// A per-source diagnostic line in a [`UnifiedPressureSnapshot`].
144#[derive(Debug, Clone)]
145pub struct SourceReading {
146    /// Source identifier.
147    pub name: &'static str,
148    /// Raw clamped sample.
149    pub raw: f64,
150    /// Weight the source declares.
151    pub weight: f64,
152    /// Whether the source is HARD (raw, never masked).
153    pub is_hard: bool,
154    /// The value that competed for the combined level (raw for HARD,
155    /// `raw * weight` for SOFT).
156    pub effective: f64,
157}
158
159/// Point-in-time breakdown of the governor for diagnostics / metrics.
160#[derive(Debug, Clone)]
161pub struct UnifiedPressureSnapshot {
162    /// Per-source readings.
163    pub sources: Vec<SourceReading>,
164    /// Max raw reading across HARD sources (`0.0` if none).
165    pub hard_max: f64,
166    /// Max weighted reading across SOFT sources (`0.0` if none).
167    pub soft_max: f64,
168    /// Combined level (`hard_max.max(soft_max)`).
169    pub level: f64,
170    /// Latched hold state at snapshot time.
171    pub paused: bool,
172}
173
174/// Combines pressure sources into one level under a hysteretic latch.
175///
176/// See the [module docs](crate::governor) for the design invariants. The
177/// latch state is an [`AtomicBool`] so [`should_hold`](Self::should_hold)
178/// is a cheap, `Sync` hot-path check.
179pub struct UnifiedPressure {
180    sources: Vec<Arc<dyn PressureSource>>,
181    hyst: Hysteresis,
182    paused: AtomicBool,
183}
184
185impl UnifiedPressure {
186    /// Build a governor over the given sources and hysteresis band.
187    #[must_use]
188    pub fn new(sources: Vec<Arc<dyn PressureSource>>, hyst: Hysteresis) -> Self {
189        Self {
190            sources,
191            hyst,
192            paused: AtomicBool::new(false),
193        }
194    }
195
196    /// Add a source after construction.
197    ///
198    /// Proves the seam accepts a new signal kind (e.g. a future CPU
199    /// source) with zero change to the gate API -- existing callers of
200    /// [`level`](Self::level) / [`should_hold`](Self::should_hold) are
201    /// untouched.
202    pub fn add_source(&mut self, source: Arc<dyn PressureSource>) {
203        self.sources.push(source);
204    }
205
206    /// Combined pressure level in `[0.0, 1.0]`.
207    ///
208    /// `hard_max` = max raw reading over HARD sources (never weighted,
209    /// never masked). `soft_max` = max of `sample * weight` over SOFT
210    /// sources. `level = hard_max.max(soft_max)`.
211    #[must_use]
212    pub fn level(&self) -> f64 {
213        let mut hard_max = 0.0_f64;
214        let mut soft_max = 0.0_f64;
215        for src in &self.sources {
216            let raw = src.sample().get();
217            if src.is_hard() {
218                hard_max = hard_max.max(raw);
219            } else {
220                soft_max = soft_max.max(raw * src.weight());
221            }
222        }
223        hard_max.max(soft_max)
224    }
225
226    /// Hysteretic hold latch over [`level`](Self::level).
227    ///
228    /// - Held and `level <= resume_below` -> release, return `false`.
229    /// - Not held and `level >= pause_above` -> arm, return `true`.
230    /// - Otherwise -> return the current latch state (the band holds it).
231    #[must_use]
232    pub fn should_hold(&self) -> bool {
233        let level = self.level();
234        let paused = self.paused.load(Ordering::Acquire);
235        if paused {
236            if level <= self.hyst.resume_below {
237                self.paused.store(false, Ordering::Release);
238                return false;
239            }
240            true
241        } else {
242            if level >= self.hyst.pause_above {
243                self.paused.store(true, Ordering::Release);
244                return true;
245            }
246            false
247        }
248    }
249
250    /// Per-source breakdown plus the combined level and latch state.
251    #[must_use]
252    pub fn snapshot(&self) -> UnifiedPressureSnapshot {
253        let mut readings = Vec::with_capacity(self.sources.len());
254        let mut hard_max = 0.0_f64;
255        let mut soft_max = 0.0_f64;
256        for src in &self.sources {
257            let raw = src.sample().get();
258            let weight = src.weight();
259            let is_hard = src.is_hard();
260            let effective = if is_hard { raw } else { raw * weight };
261            if is_hard {
262                hard_max = hard_max.max(raw);
263            } else {
264                soft_max = soft_max.max(effective);
265            }
266            readings.push(SourceReading {
267                name: src.name(),
268                raw,
269                weight,
270                is_hard,
271                effective,
272            });
273        }
274        UnifiedPressureSnapshot {
275            sources: readings,
276            hard_max,
277            soft_max,
278            level: hard_max.max(soft_max),
279            paused: self.paused.load(Ordering::Acquire),
280        }
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use super::*;
287    use std::sync::atomic::AtomicU64;
288
289    /// Scriptable test double: a source whose reading can be set at
290    /// runtime so a single `UnifiedPressure` can be driven through a
291    /// rising/falling sequence. Stores the reading as bit-pattern `u64`
292    /// so it stays `Sync` without a lock (crate forbids `unsafe`, so a
293    /// `Cell` would not be `Sync`).
294    struct MockSource {
295        name: &'static str,
296        value: AtomicU64,
297        weight: f64,
298        hard: bool,
299    }
300
301    impl MockSource {
302        fn new(name: &'static str, value: f64, weight: f64, hard: bool) -> Self {
303            Self {
304                name,
305                value: AtomicU64::new(value.to_bits()),
306                weight,
307                hard,
308            }
309        }
310
311        fn set(&self, value: f64) {
312            self.value.store(value.to_bits(), Ordering::Relaxed);
313        }
314    }
315
316    impl PressureSource for MockSource {
317        fn name(&self) -> &'static str {
318            self.name
319        }
320        fn sample(&self) -> Pressure {
321            Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
322        }
323        fn weight(&self) -> f64 {
324            self.weight
325        }
326        fn is_hard(&self) -> bool {
327            self.hard
328        }
329    }
330
331    fn approx(a: f64, b: f64) -> bool {
332        (a - b).abs() < 1e-9
333    }
334
335    #[test]
336    fn pressure_clamps_and_handles_nan() {
337        assert!(approx(Pressure::new(-1.0).get(), 0.0));
338        assert!(approx(Pressure::new(2.0).get(), 1.0));
339        assert!(approx(Pressure::new(0.5).get(), 0.5));
340        // NaN must collapse to 0.0, NOT to 1.0 -- a NaN reading must never
341        // wedge the governor into a permanent hold.
342        assert!(approx(Pressure::new(f64::NAN).get(), 0.0));
343        assert!(approx(Pressure::new(f64::INFINITY).get(), 1.0));
344        assert!(approx(Pressure::new(f64::NEG_INFINITY).get(), 0.0));
345    }
346
347    #[test]
348    fn hysteresis_rejects_inverted_band() {
349        assert!(Hysteresis::new(0.80, 0.65).is_ok());
350        assert!(Hysteresis::new(0.65, 0.80).is_err());
351        assert!(Hysteresis::new(0.80, 0.80).is_err());
352        assert!(Hysteresis::new(f64::NAN, 0.5).is_err());
353    }
354
355    /// The adversarial proving test.
356    ///
357    /// Drives one `UnifiedPressure` through the full pause/resume cycle and
358    /// proves the two riskiest invariants:
359    ///   1. a saturated SOFT signal at weight 0.5 cannot force a hold the
360    ///      HARD signal would not (no soft-masks-hard, no spurious hold);
361    ///   2. the hysteresis latch arms on the rising edge, holds inside the
362    ///      band, releases on the falling edge, and re-arms cleanly (no
363    ///      sticky state).
364    ///
365    /// Step 6 proves a third soft source plugs in via `add_source` with
366    /// zero change to the gate API.
367    #[test]
368    fn adversarial_combine_and_hysteresis() {
369        let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
370
371        // HARD memory source + a SOFT "cpu" source at weight 0.5.
372        let mem = Arc::new(MockSource::new("memory", 0.50, 1.0, true));
373        let cpu = Arc::new(MockSource::new("cpu", 1.0, 0.5, false));
374
375        let governor = UnifiedPressure::new(
376            vec![
377                Arc::clone(&mem) as Arc<dyn PressureSource>,
378                Arc::clone(&cpu) as Arc<dyn PressureSource>,
379            ],
380            hyst,
381        );
382
383        // Step 1: memory=0.50, cpu=1.0 (saturated SOFT).
384        // soft = 1.0 * 0.5 = 0.50; hard = 0.50; level = max(0.50, 0.50) = 0.50.
385        // A saturated SOFT signal at weight 0.5 CANNOT force a hold the HARD
386        // signal would not. level < pause_above -> no hold.
387        assert!(
388            approx(governor.level(), 0.50),
389            "level should be 0.50, got {}",
390            governor.level()
391        );
392        assert!(
393            !governor.should_hold(),
394            "saturated soft signal must not mask/force a hold"
395        );
396
397        // Step 2: memory rises to 0.85 -> rising edge latches.
398        mem.set(0.85);
399        assert!(approx(governor.level(), 0.85), "hard 0.85 dominates");
400        assert!(
401            governor.should_hold(),
402            "rising edge above pause_above latches"
403        );
404
405        // Step 3: memory falls to 0.70 -> inside band (> resume_below) -> holds.
406        mem.set(0.70);
407        assert!(approx(governor.level(), 0.70));
408        assert!(
409            governor.should_hold(),
410            "0.70 is inside the hysteresis band -> latch stays held"
411        );
412
413        // Step 4: memory falls to 0.60 -> below resume_below -> releases.
414        mem.set(0.60);
415        assert!(approx(governor.level(), 0.60));
416        assert!(
417            !governor.should_hold(),
418            "falling edge below resume_below releases the latch"
419        );
420
421        // Step 5: memory back to 0.85 -> latch re-arms (no sticky state).
422        mem.set(0.85);
423        assert!(
424            governor.should_hold(),
425            "latch must re-arm cleanly with no sticky state"
426        );
427
428        // Step 6: add a THIRD soft source via add_source -- proves the seam
429        // accepts a new signal kind with zero gate-API change. Release first
430        // so we can observe the new source's effect cleanly.
431        mem.set(0.10);
432        let mut governor = governor;
433        let queue = Arc::new(MockSource::new("queue_depth", 0.0, 0.5, false));
434        governor.add_source(Arc::clone(&queue) as Arc<dyn PressureSource>);
435
436        // Drop out of the band first (everything low) so the latch releases.
437        cpu.set(0.0);
438        assert!(!governor.should_hold(), "all sources low -> released");
439
440        // Now saturate the new SOFT source: 1.0 * 0.5 = 0.50, still under
441        // pause_above. Same gate API, same behaviour -- a weighted soft
442        // source cannot force a hold on its own.
443        queue.set(1.0);
444        assert!(
445            approx(governor.level(), 0.50),
446            "new soft source weighted in"
447        );
448        assert!(
449            !governor.should_hold(),
450            "weighted third soft source still cannot force a hold"
451        );
452
453        // And the HARD signal still gets through unmasked over the new source.
454        mem.set(0.90);
455        assert!(approx(governor.level(), 0.90), "hard signal unmasked");
456        assert!(
457            governor.should_hold(),
458            "hard signal re-arms over soft sources"
459        );
460    }
461
462    #[test]
463    fn snapshot_reports_per_source_breakdown() {
464        let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
465        let mem = Arc::new(MockSource::new("memory", 0.70, 1.0, true));
466        let cpu = Arc::new(MockSource::new("cpu", 0.40, 0.5, false));
467        let governor = UnifiedPressure::new(
468            vec![
469                mem as Arc<dyn PressureSource>,
470                cpu as Arc<dyn PressureSource>,
471            ],
472            hyst,
473        );
474
475        let snap = governor.snapshot();
476        assert_eq!(snap.sources.len(), 2);
477        assert!(approx(snap.hard_max, 0.70));
478        assert!(approx(snap.soft_max, 0.20)); // 0.40 * 0.5
479        assert!(approx(snap.level, 0.70));
480        assert!(!snap.paused);
481
482        let cpu_reading = snap
483            .sources
484            .iter()
485            .find(|r| r.name == "cpu")
486            .expect("cpu present");
487        assert!(!cpu_reading.is_hard);
488        assert!(approx(cpu_reading.effective, 0.20));
489    }
490
491    #[test]
492    fn memory_pressure_source_wraps_guard_as_hard() {
493        use crate::memory::{MemoryGuard, MemoryGuardConfig};
494
495        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
496            limit_bytes: 1000,
497            pressure_threshold: 0.80,
498            ..Default::default()
499        }));
500        guard.add_bytes(700); // 70%
501        let src = MemoryPressureSource::new(Arc::clone(&guard));
502
503        assert_eq!(src.name(), "memory");
504        assert!(src.is_hard());
505        assert!(approx(src.weight(), 1.0));
506        assert!(
507            approx(src.sample().get(), 0.70),
508            "sample should mirror guard.pressure_ratio(), got {}",
509            src.sample().get()
510        );
511    }
512}