hyperi_rustlib/governor/source.rs
1// Project: hyperi-rustlib
2// File: src/governor/source.rs
3// Purpose: Pressure seam + memory source for the self-regulation governor
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Pressure seam: normalised readings, sources, and the unified latch.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use crate::memory::MemoryGuard;
15
16/// A normalised pressure reading, clamped to `[0.0, 1.0]` on construction.
17///
18/// `NaN` collapses to `0.0` (treat an unreadable source as no pressure,
19/// never as max pressure -- a `NaN` masquerading as `1.0` would wedge the
20/// governor into a permanent hold).
21#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
22pub struct Pressure(f64);
23
24impl Pressure {
25 /// Construct a reading, clamping to `[0.0, 1.0]`. `NaN` becomes `0.0`.
26 #[must_use]
27 pub fn new(value: f64) -> Self {
28 // `clamp` panics on NaN, and `f64::max(NaN, 0.0)` returns 0.0 only
29 // for the left-NaN form -- be explicit so the intent is obvious.
30 let v = if value.is_nan() {
31 0.0
32 } else {
33 value.clamp(0.0, 1.0)
34 };
35 Self(v)
36 }
37
38 /// The clamped reading in `[0.0, 1.0]`.
39 #[must_use]
40 pub fn get(&self) -> f64 {
41 self.0
42 }
43}
44
45/// A source of normalised pressure feeding the unified governor.
46///
47/// Implementors are wrappers over the real signal (memory guard, and
48/// later CPU, queue depth, etc.). Keeping the trait here -- not in the
49/// signal's own module -- keeps `memory` a leaf with no governor
50/// dependency.
51pub trait PressureSource: Send + Sync {
52 /// Stable identifier for diagnostics (e.g. `"memory"`).
53 fn name(&self) -> &'static str;
54
55 /// Sample the current pressure.
56 fn sample(&self) -> Pressure;
57
58 /// Sensitivity weight applied to SOFT signals in the combine. HARD
59 /// signals ignore this (they are never down-weighted). Default `1.0`.
60 fn weight(&self) -> f64 {
61 1.0
62 }
63
64 /// HARD signals are never masked or down-weighted -- their raw reading
65 /// always competes for the combined level. Default `false`.
66 fn is_hard(&self) -> bool {
67 false
68 }
69}
70
71/// HARD pressure source backed by the [`MemoryGuard`].
72///
73/// A thin wrapper so `memory` stays a leaf module: the trait
74/// implementation lives here, in the governor, not in `guard.rs`.
75pub struct MemoryPressureSource(Arc<MemoryGuard>);
76
77impl MemoryPressureSource {
78 /// Wrap a shared memory guard as a pressure source.
79 #[must_use]
80 pub fn new(guard: Arc<MemoryGuard>) -> Self {
81 Self(guard)
82 }
83}
84
85impl PressureSource for MemoryPressureSource {
86 fn name(&self) -> &'static str {
87 "memory"
88 }
89
90 fn sample(&self) -> Pressure {
91 Pressure::new(self.0.pressure_ratio())
92 }
93
94 fn weight(&self) -> f64 {
95 1.0
96 }
97
98 fn is_hard(&self) -> bool {
99 true
100 }
101}
102
103/// Hysteresis band for the pause/resume latch.
104///
105/// `pause_above` must be strictly greater than `resume_below`, otherwise
106/// there is no band to hold the latch and it degenerates to a single
107/// threshold (flapping). [`Self::new`] validates this.
108#[derive(Debug, Clone, Copy)]
109pub struct Hysteresis {
110 /// Arm the latch (start holding) when the level reaches this.
111 pub pause_above: f64,
112 /// Release the latch (stop holding) when the level drops to this.
113 pub resume_below: f64,
114}
115
116impl Hysteresis {
117 /// Construct a band, validating `pause_above > resume_below` and that
118 /// both bounds sit inside `[0.0, 1.0]`.
119 ///
120 /// The range check matters because [`Pressure`] samples are clamped to
121 /// `[0.0, 1.0]`: a `resume_below < 0.0` could never release the latch
122 /// (permanent stuck-pause) and a `pause_above > 1.0` could never arm it
123 /// (brake silently disabled).
124 ///
125 /// # Errors
126 ///
127 /// Returns `Err` if the bounds are non-finite, outside `[0.0, 1.0]`, or
128 /// `pause_above` is not strictly greater than `resume_below`.
129 pub fn new(pause_above: f64, resume_below: f64) -> Result<Self, String> {
130 if !pause_above.is_finite() || !resume_below.is_finite() {
131 return Err(format!(
132 "hysteresis bounds must be finite, got pause_above={pause_above}, \
133 resume_below={resume_below}"
134 ));
135 }
136 if !(0.0..=1.0).contains(&pause_above) || !(0.0..=1.0).contains(&resume_below) {
137 return Err(format!(
138 "hysteresis bounds must be within [0.0, 1.0] (pressure levels are \
139 clamped to that range), got pause_above={pause_above}, \
140 resume_below={resume_below}"
141 ));
142 }
143 if pause_above <= resume_below {
144 return Err(format!(
145 "hysteresis requires pause_above > resume_below, got \
146 pause_above={pause_above}, resume_below={resume_below}"
147 ));
148 }
149 Ok(Self {
150 pause_above,
151 resume_below,
152 })
153 }
154}
155
156/// A per-source diagnostic line in a [`UnifiedPressureSnapshot`].
157#[derive(Debug, Clone)]
158pub struct SourceReading {
159 /// Source identifier.
160 pub name: &'static str,
161 /// Raw clamped sample.
162 pub raw: f64,
163 /// Weight the source declares.
164 pub weight: f64,
165 /// Whether the source is HARD (raw, never masked).
166 pub is_hard: bool,
167 /// The value that competed for the combined level (raw for HARD,
168 /// `raw * weight` for SOFT).
169 pub effective: f64,
170}
171
172/// Point-in-time breakdown of the governor for diagnostics / metrics.
173#[derive(Debug, Clone)]
174pub struct UnifiedPressureSnapshot {
175 /// Per-source readings.
176 pub sources: Vec<SourceReading>,
177 /// Max raw reading across HARD sources (`0.0` if none).
178 pub hard_max: f64,
179 /// Max weighted reading across SOFT sources (`0.0` if none).
180 pub soft_max: f64,
181 /// Combined level (`hard_max.max(soft_max)`).
182 pub level: f64,
183 /// Latched hold state at snapshot time.
184 pub paused: bool,
185}
186
187/// Combines pressure sources into one level under a hysteretic latch.
188///
189/// See the [module docs](crate::governor) for the design invariants. The
190/// latch state is an [`AtomicBool`] so [`should_hold`](Self::should_hold)
191/// is a cheap, `Sync` hot-path check.
192pub struct UnifiedPressure {
193 sources: Vec<Arc<dyn PressureSource>>,
194 hyst: Hysteresis,
195 paused: AtomicBool,
196}
197
198impl UnifiedPressure {
199 /// Build a governor over the given sources and hysteresis band.
200 #[must_use]
201 pub fn new(sources: Vec<Arc<dyn PressureSource>>, hyst: Hysteresis) -> Self {
202 Self {
203 sources,
204 hyst,
205 paused: AtomicBool::new(false),
206 }
207 }
208
209 /// Add a source after construction.
210 ///
211 /// Proves the seam accepts a new signal kind (e.g. a future CPU
212 /// source) with zero change to the gate API -- existing callers of
213 /// [`level`](Self::level) / [`should_hold`](Self::should_hold) are
214 /// untouched.
215 pub fn add_source(&mut self, source: Arc<dyn PressureSource>) {
216 self.sources.push(source);
217 }
218
219 /// Combined pressure level in `[0.0, 1.0]`.
220 ///
221 /// `hard_max` = max raw reading over HARD sources (never weighted,
222 /// never masked). `soft_max` = max of `sample * weight` over SOFT
223 /// sources. `level = hard_max.max(soft_max)`.
224 ///
225 /// Soft `weight()` is clamped to `[0.0, 1.0]` (non-finite -> `1.0`) at the
226 /// combine: `add_source` is the advertised extension seam, so a future
227 /// soft source returning a stray weight cannot push `level()` outside its
228 /// documented range or silently neutralise itself with a negative/NaN
229 /// weight. One branch per soft source per `evaluate()` (not per record);
230 /// with no soft source wired today the cost is nil.
231 #[must_use]
232 pub fn level(&self) -> f64 {
233 let mut hard_max = 0.0_f64;
234 let mut soft_max = 0.0_f64;
235 for src in &self.sources {
236 let raw = src.sample().get();
237 if src.is_hard() {
238 hard_max = hard_max.max(raw);
239 } else {
240 let w = src.weight();
241 let w = if w.is_finite() {
242 w.clamp(0.0, 1.0)
243 } else {
244 1.0
245 };
246 soft_max = soft_max.max(raw * w);
247 }
248 }
249 hard_max.max(soft_max)
250 }
251
252 /// Hysteretic hold latch over [`level`](Self::level).
253 ///
254 /// - Held and `level <= resume_below` -> release, return `false`.
255 /// - Not held and `level >= pause_above` -> arm, return `true`.
256 /// - Otherwise -> return the current latch state (the band holds it).
257 #[must_use]
258 pub fn should_hold(&self) -> bool {
259 let level = self.level();
260 let paused = self.paused.load(Ordering::Acquire);
261 if paused {
262 if level <= self.hyst.resume_below {
263 self.paused.store(false, Ordering::Release);
264 return false;
265 }
266 true
267 } else {
268 if level >= self.hyst.pause_above {
269 self.paused.store(true, Ordering::Release);
270 return true;
271 }
272 false
273 }
274 }
275
276 /// Per-source breakdown plus the combined level and latch state.
277 #[must_use]
278 pub fn snapshot(&self) -> UnifiedPressureSnapshot {
279 let mut readings = Vec::with_capacity(self.sources.len());
280 let mut hard_max = 0.0_f64;
281 let mut soft_max = 0.0_f64;
282 for src in &self.sources {
283 let raw = src.sample().get();
284 let weight = src.weight();
285 let is_hard = src.is_hard();
286 let effective = if is_hard { raw } else { raw * weight };
287 if is_hard {
288 hard_max = hard_max.max(raw);
289 } else {
290 soft_max = soft_max.max(effective);
291 }
292 readings.push(SourceReading {
293 name: src.name(),
294 raw,
295 weight,
296 is_hard,
297 effective,
298 });
299 }
300 UnifiedPressureSnapshot {
301 sources: readings,
302 hard_max,
303 soft_max,
304 level: hard_max.max(soft_max),
305 paused: self.paused.load(Ordering::Acquire),
306 }
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313 use std::sync::atomic::AtomicU64;
314
315 /// Scriptable test double: a source whose reading can be set at
316 /// runtime so a single `UnifiedPressure` can be driven through a
317 /// rising/falling sequence. Stores the reading as bit-pattern `u64`
318 /// so it stays `Sync` without a lock (crate forbids `unsafe`, so a
319 /// `Cell` would not be `Sync`).
320 struct MockSource {
321 name: &'static str,
322 value: AtomicU64,
323 weight: f64,
324 hard: bool,
325 }
326
327 impl MockSource {
328 fn new(name: &'static str, value: f64, weight: f64, hard: bool) -> Self {
329 Self {
330 name,
331 value: AtomicU64::new(value.to_bits()),
332 weight,
333 hard,
334 }
335 }
336
337 fn set(&self, value: f64) {
338 self.value.store(value.to_bits(), Ordering::Relaxed);
339 }
340 }
341
342 impl PressureSource for MockSource {
343 fn name(&self) -> &'static str {
344 self.name
345 }
346 fn sample(&self) -> Pressure {
347 Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
348 }
349 fn weight(&self) -> f64 {
350 self.weight
351 }
352 fn is_hard(&self) -> bool {
353 self.hard
354 }
355 }
356
357 fn approx(a: f64, b: f64) -> bool {
358 (a - b).abs() < 1e-9
359 }
360
361 #[test]
362 fn pressure_clamps_and_handles_nan() {
363 assert!(approx(Pressure::new(-1.0).get(), 0.0));
364 assert!(approx(Pressure::new(2.0).get(), 1.0));
365 assert!(approx(Pressure::new(0.5).get(), 0.5));
366 // NaN must collapse to 0.0, NOT to 1.0 -- a NaN reading must never
367 // wedge the governor into a permanent hold.
368 assert!(approx(Pressure::new(f64::NAN).get(), 0.0));
369 assert!(approx(Pressure::new(f64::INFINITY).get(), 1.0));
370 assert!(approx(Pressure::new(f64::NEG_INFINITY).get(), 0.0));
371 }
372
373 #[test]
374 fn hysteresis_rejects_inverted_band() {
375 assert!(Hysteresis::new(0.80, 0.65).is_ok());
376 assert!(Hysteresis::new(0.65, 0.80).is_err());
377 assert!(Hysteresis::new(0.80, 0.80).is_err());
378 assert!(Hysteresis::new(f64::NAN, 0.5).is_err());
379 }
380
381 /// Out-of-`[0,1]` bands must be rejected: `Pressure` clamps every sample to
382 /// `[0,1]`, so a band outside that range is unreachable in one direction --
383 /// `resume_below < 0.0` can never release (permanent stuck-pause) and
384 /// `pause_above > 1.0` can never arm (brake silently disabled). Both are
385 /// the failure modes the never-OOM/no-flap contract forbids.
386 #[test]
387 fn hysteresis_rejects_out_of_range_band() {
388 // resume_below below the clamp floor -> latch could never release.
389 assert!(Hysteresis::new(0.5, -0.1).is_err());
390 // pause_above above the clamp ceiling -> latch could never arm.
391 assert!(Hysteresis::new(1.5, 0.65).is_err());
392 // Both ends in range, valid ordering -> still ok (no regression).
393 assert!(Hysteresis::new(1.0, 0.0).is_ok());
394 assert!(Hysteresis::new(0.80, 0.65).is_ok());
395 }
396
397 /// A future SOFT source returning a stray weight must not push `level()`
398 /// outside `[0,1]` or silently neutralise itself. `add_source` is the
399 /// advertised extension seam, so the combine clamps each soft weight to
400 /// `[0,1]` (non-finite -> `1.0`).
401 #[test]
402 fn soft_weight_is_clamped_at_combine() {
403 // weight > 1 at full pressure -> clamped to 1.0, level == 1.0 (not 5.0).
404 let over = Arc::new(MockSource::new("over", 1.0, 5.0, false));
405 let p = UnifiedPressure::new(
406 vec![Arc::clone(&over) as Arc<dyn PressureSource>],
407 Hysteresis::new(0.80, 0.65).expect("band"),
408 );
409 assert!(
410 approx(p.level(), 1.0),
411 "level must stay <= 1.0, got {}",
412 p.level()
413 );
414
415 // NaN weight -> 1.0 multiplier (not dropped by f64::max(NaN)).
416 let nan_w = Arc::new(MockSource::new("nan", 0.5, f64::NAN, false));
417 let p2 = UnifiedPressure::new(
418 vec![Arc::clone(&nan_w) as Arc<dyn PressureSource>],
419 Hysteresis::new(0.80, 0.65).expect("band"),
420 );
421 assert!(
422 approx(p2.level(), 0.5),
423 "NaN weight -> 1.0, got {}",
424 p2.level()
425 );
426
427 // Negative weight -> clamped to 0.0, contributes nothing.
428 let neg = Arc::new(MockSource::new("neg", 1.0, -2.0, false));
429 let p3 = UnifiedPressure::new(
430 vec![Arc::clone(&neg) as Arc<dyn PressureSource>],
431 Hysteresis::new(0.80, 0.65).expect("band"),
432 );
433 assert!(
434 approx(p3.level(), 0.0),
435 "negative weight -> 0, got {}",
436 p3.level()
437 );
438 }
439
440 /// The adversarial proving test.
441 ///
442 /// Drives one `UnifiedPressure` through the full pause/resume cycle and
443 /// proves the two riskiest invariants:
444 /// 1. a saturated SOFT signal at weight 0.5 cannot force a hold the
445 /// HARD signal would not (no soft-masks-hard, no spurious hold);
446 /// 2. the hysteresis latch arms on the rising edge, holds inside the
447 /// band, releases on the falling edge, and re-arms cleanly (no
448 /// sticky state).
449 ///
450 /// Step 6 proves a third soft source plugs in via `add_source` with
451 /// zero change to the gate API.
452 #[test]
453 fn adversarial_combine_and_hysteresis() {
454 let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
455
456 // HARD memory source + a SOFT "cpu" source at weight 0.5.
457 let mem = Arc::new(MockSource::new("memory", 0.50, 1.0, true));
458 let cpu = Arc::new(MockSource::new("cpu", 1.0, 0.5, false));
459
460 let governor = UnifiedPressure::new(
461 vec![
462 Arc::clone(&mem) as Arc<dyn PressureSource>,
463 Arc::clone(&cpu) as Arc<dyn PressureSource>,
464 ],
465 hyst,
466 );
467
468 // Step 1: memory=0.50, cpu=1.0 (saturated SOFT).
469 // soft = 1.0 * 0.5 = 0.50; hard = 0.50; level = max(0.50, 0.50) = 0.50.
470 // A saturated SOFT signal at weight 0.5 CANNOT force a hold the HARD
471 // signal would not. level < pause_above -> no hold.
472 assert!(
473 approx(governor.level(), 0.50),
474 "level should be 0.50, got {}",
475 governor.level()
476 );
477 assert!(
478 !governor.should_hold(),
479 "saturated soft signal must not mask/force a hold"
480 );
481
482 // Step 2: memory rises to 0.85 -> rising edge latches.
483 mem.set(0.85);
484 assert!(approx(governor.level(), 0.85), "hard 0.85 dominates");
485 assert!(
486 governor.should_hold(),
487 "rising edge above pause_above latches"
488 );
489
490 // Step 3: memory falls to 0.70 -> inside band (> resume_below) -> holds.
491 mem.set(0.70);
492 assert!(approx(governor.level(), 0.70));
493 assert!(
494 governor.should_hold(),
495 "0.70 is inside the hysteresis band -> latch stays held"
496 );
497
498 // Step 4: memory falls to 0.60 -> below resume_below -> releases.
499 mem.set(0.60);
500 assert!(approx(governor.level(), 0.60));
501 assert!(
502 !governor.should_hold(),
503 "falling edge below resume_below releases the latch"
504 );
505
506 // Step 5: memory back to 0.85 -> latch re-arms (no sticky state).
507 mem.set(0.85);
508 assert!(
509 governor.should_hold(),
510 "latch must re-arm cleanly with no sticky state"
511 );
512
513 // Step 6: add a THIRD soft source via add_source -- proves the seam
514 // accepts a new signal kind with zero gate-API change. Release first
515 // so we can observe the new source's effect cleanly.
516 mem.set(0.10);
517 let mut governor = governor;
518 let queue = Arc::new(MockSource::new("queue_depth", 0.0, 0.5, false));
519 governor.add_source(Arc::clone(&queue) as Arc<dyn PressureSource>);
520
521 // Drop out of the band first (everything low) so the latch releases.
522 cpu.set(0.0);
523 assert!(!governor.should_hold(), "all sources low -> released");
524
525 // Now saturate the new SOFT source: 1.0 * 0.5 = 0.50, still under
526 // pause_above. Same gate API, same behaviour -- a weighted soft
527 // source cannot force a hold on its own.
528 queue.set(1.0);
529 assert!(
530 approx(governor.level(), 0.50),
531 "new soft source weighted in"
532 );
533 assert!(
534 !governor.should_hold(),
535 "weighted third soft source still cannot force a hold"
536 );
537
538 // And the HARD signal still gets through unmasked over the new source.
539 mem.set(0.90);
540 assert!(approx(governor.level(), 0.90), "hard signal unmasked");
541 assert!(
542 governor.should_hold(),
543 "hard signal re-arms over soft sources"
544 );
545 }
546
547 #[test]
548 fn snapshot_reports_per_source_breakdown() {
549 let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
550 let mem = Arc::new(MockSource::new("memory", 0.70, 1.0, true));
551 let cpu = Arc::new(MockSource::new("cpu", 0.40, 0.5, false));
552 let governor = UnifiedPressure::new(
553 vec![
554 mem as Arc<dyn PressureSource>,
555 cpu as Arc<dyn PressureSource>,
556 ],
557 hyst,
558 );
559
560 let snap = governor.snapshot();
561 assert_eq!(snap.sources.len(), 2);
562 assert!(approx(snap.hard_max, 0.70));
563 assert!(approx(snap.soft_max, 0.20)); // 0.40 * 0.5
564 assert!(approx(snap.level, 0.70));
565 assert!(!snap.paused);
566
567 let cpu_reading = snap
568 .sources
569 .iter()
570 .find(|r| r.name == "cpu")
571 .expect("cpu present");
572 assert!(!cpu_reading.is_hard);
573 assert!(approx(cpu_reading.effective, 0.20));
574 }
575
576 #[test]
577 fn memory_pressure_source_wraps_guard_as_hard() {
578 use crate::memory::{MemoryGuard, MemoryGuardConfig};
579
580 let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
581 limit_bytes: 1000,
582 pressure_threshold: 0.80,
583 ..Default::default()
584 }));
585 guard.add_bytes(700); // 70%
586 let src = MemoryPressureSource::new(Arc::clone(&guard));
587
588 assert_eq!(src.name(), "memory");
589 assert!(src.is_hard());
590 assert!(approx(src.weight(), 1.0));
591 assert!(
592 approx(src.sample().get(), 0.70),
593 "sample should mirror guard.pressure_ratio(), got {}",
594 src.sample().get()
595 );
596 }
597}