use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::memory::MemoryGuard;
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Pressure(f64);
impl Pressure {
#[must_use]
pub fn new(value: f64) -> Self {
let v = if value.is_nan() {
0.0
} else {
value.clamp(0.0, 1.0)
};
Self(v)
}
#[must_use]
pub fn get(&self) -> f64 {
self.0
}
}
pub trait PressureSource: Send + Sync {
fn name(&self) -> &'static str;
fn sample(&self) -> Pressure;
fn weight(&self) -> f64 {
1.0
}
fn is_hard(&self) -> bool {
false
}
}
pub struct MemoryPressureSource(Arc<MemoryGuard>);
impl MemoryPressureSource {
#[must_use]
pub fn new(guard: Arc<MemoryGuard>) -> Self {
Self(guard)
}
}
impl PressureSource for MemoryPressureSource {
fn name(&self) -> &'static str {
"memory"
}
fn sample(&self) -> Pressure {
Pressure::new(self.0.pressure_ratio())
}
fn weight(&self) -> f64 {
1.0
}
fn is_hard(&self) -> bool {
true
}
}
#[derive(Debug, Clone, Copy)]
pub struct Hysteresis {
pub pause_above: f64,
pub resume_below: f64,
}
impl Hysteresis {
pub fn new(pause_above: f64, resume_below: f64) -> Result<Self, String> {
if !pause_above.is_finite() || !resume_below.is_finite() {
return Err(format!(
"hysteresis bounds must be finite, got pause_above={pause_above}, \
resume_below={resume_below}"
));
}
if pause_above <= resume_below {
return Err(format!(
"hysteresis requires pause_above > resume_below, got \
pause_above={pause_above}, resume_below={resume_below}"
));
}
Ok(Self {
pause_above,
resume_below,
})
}
}
#[derive(Debug, Clone)]
pub struct SourceReading {
pub name: &'static str,
pub raw: f64,
pub weight: f64,
pub is_hard: bool,
pub effective: f64,
}
#[derive(Debug, Clone)]
pub struct UnifiedPressureSnapshot {
pub sources: Vec<SourceReading>,
pub hard_max: f64,
pub soft_max: f64,
pub level: f64,
pub paused: bool,
}
pub struct UnifiedPressure {
sources: Vec<Arc<dyn PressureSource>>,
hyst: Hysteresis,
paused: AtomicBool,
}
impl UnifiedPressure {
#[must_use]
pub fn new(sources: Vec<Arc<dyn PressureSource>>, hyst: Hysteresis) -> Self {
Self {
sources,
hyst,
paused: AtomicBool::new(false),
}
}
pub fn add_source(&mut self, source: Arc<dyn PressureSource>) {
self.sources.push(source);
}
#[must_use]
pub fn level(&self) -> f64 {
let mut hard_max = 0.0_f64;
let mut soft_max = 0.0_f64;
for src in &self.sources {
let raw = src.sample().get();
if src.is_hard() {
hard_max = hard_max.max(raw);
} else {
soft_max = soft_max.max(raw * src.weight());
}
}
hard_max.max(soft_max)
}
#[must_use]
pub fn should_hold(&self) -> bool {
let level = self.level();
let paused = self.paused.load(Ordering::Acquire);
if paused {
if level <= self.hyst.resume_below {
self.paused.store(false, Ordering::Release);
return false;
}
true
} else {
if level >= self.hyst.pause_above {
self.paused.store(true, Ordering::Release);
return true;
}
false
}
}
#[must_use]
pub fn snapshot(&self) -> UnifiedPressureSnapshot {
let mut readings = Vec::with_capacity(self.sources.len());
let mut hard_max = 0.0_f64;
let mut soft_max = 0.0_f64;
for src in &self.sources {
let raw = src.sample().get();
let weight = src.weight();
let is_hard = src.is_hard();
let effective = if is_hard { raw } else { raw * weight };
if is_hard {
hard_max = hard_max.max(raw);
} else {
soft_max = soft_max.max(effective);
}
readings.push(SourceReading {
name: src.name(),
raw,
weight,
is_hard,
effective,
});
}
UnifiedPressureSnapshot {
sources: readings,
hard_max,
soft_max,
level: hard_max.max(soft_max),
paused: self.paused.load(Ordering::Acquire),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicU64;
struct MockSource {
name: &'static str,
value: AtomicU64,
weight: f64,
hard: bool,
}
impl MockSource {
fn new(name: &'static str, value: f64, weight: f64, hard: bool) -> Self {
Self {
name,
value: AtomicU64::new(value.to_bits()),
weight,
hard,
}
}
fn set(&self, value: f64) {
self.value.store(value.to_bits(), Ordering::Relaxed);
}
}
impl PressureSource for MockSource {
fn name(&self) -> &'static str {
self.name
}
fn sample(&self) -> Pressure {
Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
}
fn weight(&self) -> f64 {
self.weight
}
fn is_hard(&self) -> bool {
self.hard
}
}
fn approx(a: f64, b: f64) -> bool {
(a - b).abs() < 1e-9
}
#[test]
fn pressure_clamps_and_handles_nan() {
assert!(approx(Pressure::new(-1.0).get(), 0.0));
assert!(approx(Pressure::new(2.0).get(), 1.0));
assert!(approx(Pressure::new(0.5).get(), 0.5));
assert!(approx(Pressure::new(f64::NAN).get(), 0.0));
assert!(approx(Pressure::new(f64::INFINITY).get(), 1.0));
assert!(approx(Pressure::new(f64::NEG_INFINITY).get(), 0.0));
}
#[test]
fn hysteresis_rejects_inverted_band() {
assert!(Hysteresis::new(0.80, 0.65).is_ok());
assert!(Hysteresis::new(0.65, 0.80).is_err());
assert!(Hysteresis::new(0.80, 0.80).is_err());
assert!(Hysteresis::new(f64::NAN, 0.5).is_err());
}
#[test]
fn adversarial_combine_and_hysteresis() {
let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
let mem = Arc::new(MockSource::new("memory", 0.50, 1.0, true));
let cpu = Arc::new(MockSource::new("cpu", 1.0, 0.5, false));
let governor = UnifiedPressure::new(
vec![
Arc::clone(&mem) as Arc<dyn PressureSource>,
Arc::clone(&cpu) as Arc<dyn PressureSource>,
],
hyst,
);
assert!(
approx(governor.level(), 0.50),
"level should be 0.50, got {}",
governor.level()
);
assert!(
!governor.should_hold(),
"saturated soft signal must not mask/force a hold"
);
mem.set(0.85);
assert!(approx(governor.level(), 0.85), "hard 0.85 dominates");
assert!(
governor.should_hold(),
"rising edge above pause_above latches"
);
mem.set(0.70);
assert!(approx(governor.level(), 0.70));
assert!(
governor.should_hold(),
"0.70 is inside the hysteresis band -> latch stays held"
);
mem.set(0.60);
assert!(approx(governor.level(), 0.60));
assert!(
!governor.should_hold(),
"falling edge below resume_below releases the latch"
);
mem.set(0.85);
assert!(
governor.should_hold(),
"latch must re-arm cleanly with no sticky state"
);
mem.set(0.10);
let mut governor = governor;
let queue = Arc::new(MockSource::new("queue_depth", 0.0, 0.5, false));
governor.add_source(Arc::clone(&queue) as Arc<dyn PressureSource>);
cpu.set(0.0);
assert!(!governor.should_hold(), "all sources low -> released");
queue.set(1.0);
assert!(
approx(governor.level(), 0.50),
"new soft source weighted in"
);
assert!(
!governor.should_hold(),
"weighted third soft source still cannot force a hold"
);
mem.set(0.90);
assert!(approx(governor.level(), 0.90), "hard signal unmasked");
assert!(
governor.should_hold(),
"hard signal re-arms over soft sources"
);
}
#[test]
fn snapshot_reports_per_source_breakdown() {
let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
let mem = Arc::new(MockSource::new("memory", 0.70, 1.0, true));
let cpu = Arc::new(MockSource::new("cpu", 0.40, 0.5, false));
let governor = UnifiedPressure::new(
vec![
mem as Arc<dyn PressureSource>,
cpu as Arc<dyn PressureSource>,
],
hyst,
);
let snap = governor.snapshot();
assert_eq!(snap.sources.len(), 2);
assert!(approx(snap.hard_max, 0.70));
assert!(approx(snap.soft_max, 0.20)); assert!(approx(snap.level, 0.70));
assert!(!snap.paused);
let cpu_reading = snap
.sources
.iter()
.find(|r| r.name == "cpu")
.expect("cpu present");
assert!(!cpu_reading.is_hard);
assert!(approx(cpu_reading.effective, 0.20));
}
#[test]
fn memory_pressure_source_wraps_guard_as_hard() {
use crate::memory::{MemoryGuard, MemoryGuardConfig};
let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
limit_bytes: 1000,
pressure_threshold: 0.80,
..Default::default()
}));
guard.add_bytes(700); let src = MemoryPressureSource::new(Arc::clone(&guard));
assert_eq!(src.name(), "memory");
assert!(src.is_hard());
assert!(approx(src.weight(), 1.0));
assert!(
approx(src.sample().get(), 0.70),
"sample should mirror guard.pressure_ratio(), got {}",
src.sample().get()
);
}
}