1use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use crate::memory::MemoryGuard;
15
16#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
22pub struct Pressure(f64);
23
24impl Pressure {
25 #[must_use]
27 pub fn new(value: f64) -> Self {
28 let v = if value.is_nan() {
31 0.0
32 } else {
33 value.clamp(0.0, 1.0)
34 };
35 Self(v)
36 }
37
38 #[must_use]
40 pub fn get(&self) -> f64 {
41 self.0
42 }
43}
44
45pub trait PressureSource: Send + Sync {
52 fn name(&self) -> &'static str;
54
55 fn sample(&self) -> Pressure;
57
58 fn weight(&self) -> f64 {
61 1.0
62 }
63
64 fn is_hard(&self) -> bool {
67 false
68 }
69}
70
71pub struct MemoryPressureSource(Arc<MemoryGuard>);
76
77impl MemoryPressureSource {
78 #[must_use]
80 pub fn new(guard: Arc<MemoryGuard>) -> Self {
81 Self(guard)
82 }
83}
84
85impl PressureSource for MemoryPressureSource {
86 fn name(&self) -> &'static str {
87 "memory"
88 }
89
90 fn sample(&self) -> Pressure {
91 Pressure::new(self.0.pressure_ratio())
92 }
93
94 fn weight(&self) -> f64 {
95 1.0
96 }
97
98 fn is_hard(&self) -> bool {
99 true
100 }
101}
102
103#[derive(Debug, Clone, Copy)]
109pub struct Hysteresis {
110 pub pause_above: f64,
112 pub resume_below: f64,
114}
115
116impl Hysteresis {
117 pub fn new(pause_above: f64, resume_below: f64) -> Result<Self, String> {
124 if !pause_above.is_finite() || !resume_below.is_finite() {
125 return Err(format!(
126 "hysteresis bounds must be finite, got pause_above={pause_above}, \
127 resume_below={resume_below}"
128 ));
129 }
130 if pause_above <= resume_below {
131 return Err(format!(
132 "hysteresis requires pause_above > resume_below, got \
133 pause_above={pause_above}, resume_below={resume_below}"
134 ));
135 }
136 Ok(Self {
137 pause_above,
138 resume_below,
139 })
140 }
141}
142
143#[derive(Debug, Clone)]
145pub struct SourceReading {
146 pub name: &'static str,
148 pub raw: f64,
150 pub weight: f64,
152 pub is_hard: bool,
154 pub effective: f64,
157}
158
159#[derive(Debug, Clone)]
161pub struct UnifiedPressureSnapshot {
162 pub sources: Vec<SourceReading>,
164 pub hard_max: f64,
166 pub soft_max: f64,
168 pub level: f64,
170 pub paused: bool,
172}
173
174pub struct UnifiedPressure {
180 sources: Vec<Arc<dyn PressureSource>>,
181 hyst: Hysteresis,
182 paused: AtomicBool,
183}
184
185impl UnifiedPressure {
186 #[must_use]
188 pub fn new(sources: Vec<Arc<dyn PressureSource>>, hyst: Hysteresis) -> Self {
189 Self {
190 sources,
191 hyst,
192 paused: AtomicBool::new(false),
193 }
194 }
195
196 pub fn add_source(&mut self, source: Arc<dyn PressureSource>) {
203 self.sources.push(source);
204 }
205
206 #[must_use]
212 pub fn level(&self) -> f64 {
213 let mut hard_max = 0.0_f64;
214 let mut soft_max = 0.0_f64;
215 for src in &self.sources {
216 let raw = src.sample().get();
217 if src.is_hard() {
218 hard_max = hard_max.max(raw);
219 } else {
220 soft_max = soft_max.max(raw * src.weight());
221 }
222 }
223 hard_max.max(soft_max)
224 }
225
226 #[must_use]
232 pub fn should_hold(&self) -> bool {
233 let level = self.level();
234 let paused = self.paused.load(Ordering::Acquire);
235 if paused {
236 if level <= self.hyst.resume_below {
237 self.paused.store(false, Ordering::Release);
238 return false;
239 }
240 true
241 } else {
242 if level >= self.hyst.pause_above {
243 self.paused.store(true, Ordering::Release);
244 return true;
245 }
246 false
247 }
248 }
249
250 #[must_use]
252 pub fn snapshot(&self) -> UnifiedPressureSnapshot {
253 let mut readings = Vec::with_capacity(self.sources.len());
254 let mut hard_max = 0.0_f64;
255 let mut soft_max = 0.0_f64;
256 for src in &self.sources {
257 let raw = src.sample().get();
258 let weight = src.weight();
259 let is_hard = src.is_hard();
260 let effective = if is_hard { raw } else { raw * weight };
261 if is_hard {
262 hard_max = hard_max.max(raw);
263 } else {
264 soft_max = soft_max.max(effective);
265 }
266 readings.push(SourceReading {
267 name: src.name(),
268 raw,
269 weight,
270 is_hard,
271 effective,
272 });
273 }
274 UnifiedPressureSnapshot {
275 sources: readings,
276 hard_max,
277 soft_max,
278 level: hard_max.max(soft_max),
279 paused: self.paused.load(Ordering::Acquire),
280 }
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287 use std::sync::atomic::AtomicU64;
288
289 struct MockSource {
295 name: &'static str,
296 value: AtomicU64,
297 weight: f64,
298 hard: bool,
299 }
300
301 impl MockSource {
302 fn new(name: &'static str, value: f64, weight: f64, hard: bool) -> Self {
303 Self {
304 name,
305 value: AtomicU64::new(value.to_bits()),
306 weight,
307 hard,
308 }
309 }
310
311 fn set(&self, value: f64) {
312 self.value.store(value.to_bits(), Ordering::Relaxed);
313 }
314 }
315
316 impl PressureSource for MockSource {
317 fn name(&self) -> &'static str {
318 self.name
319 }
320 fn sample(&self) -> Pressure {
321 Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
322 }
323 fn weight(&self) -> f64 {
324 self.weight
325 }
326 fn is_hard(&self) -> bool {
327 self.hard
328 }
329 }
330
331 fn approx(a: f64, b: f64) -> bool {
332 (a - b).abs() < 1e-9
333 }
334
335 #[test]
336 fn pressure_clamps_and_handles_nan() {
337 assert!(approx(Pressure::new(-1.0).get(), 0.0));
338 assert!(approx(Pressure::new(2.0).get(), 1.0));
339 assert!(approx(Pressure::new(0.5).get(), 0.5));
340 assert!(approx(Pressure::new(f64::NAN).get(), 0.0));
343 assert!(approx(Pressure::new(f64::INFINITY).get(), 1.0));
344 assert!(approx(Pressure::new(f64::NEG_INFINITY).get(), 0.0));
345 }
346
347 #[test]
348 fn hysteresis_rejects_inverted_band() {
349 assert!(Hysteresis::new(0.80, 0.65).is_ok());
350 assert!(Hysteresis::new(0.65, 0.80).is_err());
351 assert!(Hysteresis::new(0.80, 0.80).is_err());
352 assert!(Hysteresis::new(f64::NAN, 0.5).is_err());
353 }
354
355 #[test]
368 fn adversarial_combine_and_hysteresis() {
369 let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
370
371 let mem = Arc::new(MockSource::new("memory", 0.50, 1.0, true));
373 let cpu = Arc::new(MockSource::new("cpu", 1.0, 0.5, false));
374
375 let governor = UnifiedPressure::new(
376 vec![
377 Arc::clone(&mem) as Arc<dyn PressureSource>,
378 Arc::clone(&cpu) as Arc<dyn PressureSource>,
379 ],
380 hyst,
381 );
382
383 assert!(
388 approx(governor.level(), 0.50),
389 "level should be 0.50, got {}",
390 governor.level()
391 );
392 assert!(
393 !governor.should_hold(),
394 "saturated soft signal must not mask/force a hold"
395 );
396
397 mem.set(0.85);
399 assert!(approx(governor.level(), 0.85), "hard 0.85 dominates");
400 assert!(
401 governor.should_hold(),
402 "rising edge above pause_above latches"
403 );
404
405 mem.set(0.70);
407 assert!(approx(governor.level(), 0.70));
408 assert!(
409 governor.should_hold(),
410 "0.70 is inside the hysteresis band -> latch stays held"
411 );
412
413 mem.set(0.60);
415 assert!(approx(governor.level(), 0.60));
416 assert!(
417 !governor.should_hold(),
418 "falling edge below resume_below releases the latch"
419 );
420
421 mem.set(0.85);
423 assert!(
424 governor.should_hold(),
425 "latch must re-arm cleanly with no sticky state"
426 );
427
428 mem.set(0.10);
432 let mut governor = governor;
433 let queue = Arc::new(MockSource::new("queue_depth", 0.0, 0.5, false));
434 governor.add_source(Arc::clone(&queue) as Arc<dyn PressureSource>);
435
436 cpu.set(0.0);
438 assert!(!governor.should_hold(), "all sources low -> released");
439
440 queue.set(1.0);
444 assert!(
445 approx(governor.level(), 0.50),
446 "new soft source weighted in"
447 );
448 assert!(
449 !governor.should_hold(),
450 "weighted third soft source still cannot force a hold"
451 );
452
453 mem.set(0.90);
455 assert!(approx(governor.level(), 0.90), "hard signal unmasked");
456 assert!(
457 governor.should_hold(),
458 "hard signal re-arms over soft sources"
459 );
460 }
461
462 #[test]
463 fn snapshot_reports_per_source_breakdown() {
464 let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
465 let mem = Arc::new(MockSource::new("memory", 0.70, 1.0, true));
466 let cpu = Arc::new(MockSource::new("cpu", 0.40, 0.5, false));
467 let governor = UnifiedPressure::new(
468 vec![
469 mem as Arc<dyn PressureSource>,
470 cpu as Arc<dyn PressureSource>,
471 ],
472 hyst,
473 );
474
475 let snap = governor.snapshot();
476 assert_eq!(snap.sources.len(), 2);
477 assert!(approx(snap.hard_max, 0.70));
478 assert!(approx(snap.soft_max, 0.20)); assert!(approx(snap.level, 0.70));
480 assert!(!snap.paused);
481
482 let cpu_reading = snap
483 .sources
484 .iter()
485 .find(|r| r.name == "cpu")
486 .expect("cpu present");
487 assert!(!cpu_reading.is_hard);
488 assert!(approx(cpu_reading.effective, 0.20));
489 }
490
491 #[test]
492 fn memory_pressure_source_wraps_guard_as_hard() {
493 use crate::memory::{MemoryGuard, MemoryGuardConfig};
494
495 let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
496 limit_bytes: 1000,
497 pressure_threshold: 0.80,
498 ..Default::default()
499 }));
500 guard.add_bytes(700); let src = MemoryPressureSource::new(Arc::clone(&guard));
502
503 assert_eq!(src.name(), "memory");
504 assert!(src.is_hard());
505 assert!(approx(src.weight(), 1.0));
506 assert!(
507 approx(src.sample().get(), 0.70),
508 "sample should mirror guard.pressure_ratio(), got {}",
509 src.sample().get()
510 );
511 }
512}