use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use super::source::UnifiedPressure;
pub trait GateActuator: Send + Sync {
fn pause(&self);
fn resume(&self);
}
pub struct ObservingActuator {
inner: Box<dyn GateActuator>,
source: &'static str,
}
impl ObservingActuator {
#[must_use]
pub fn new(source: &'static str, inner: Box<dyn GateActuator>) -> Self {
Self { inner, source }
}
}
impl GateActuator for ObservingActuator {
fn pause(&self) {
#[cfg(feature = "metrics")]
{
::metrics::gauge!("inbound_paused").set(1.0);
::metrics::counter!("self_regulation_inbound_pauses_total").increment(1);
}
tracing::warn!(
source = self.source,
"self-regulation: inbound PAUSED under pressure (memory/back-pressure brake)"
);
self.inner.pause();
}
fn resume(&self) {
#[cfg(feature = "metrics")]
::metrics::gauge!("inbound_paused").set(0.0);
tracing::info!(
source = self.source,
"self-regulation: inbound RESUMED, pressure cleared"
);
self.inner.resume();
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopActuator;
impl GateActuator for NoopActuator {
fn pause(&self) {}
fn resume(&self) {}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Admit {
Yes,
Hold,
}
pub struct InboundGate {
pressure: Arc<UnifiedPressure>,
actuator: Box<dyn GateActuator>,
paused_edge: AtomicBool,
}
impl InboundGate {
#[must_use]
pub fn new(pressure: Arc<UnifiedPressure>, actuator: Box<dyn GateActuator>) -> Self {
Self {
pressure,
actuator,
paused_edge: AtomicBool::new(false),
}
}
pub fn evaluate(&self) -> Admit {
let hold = self.pressure.should_hold();
if hold {
if self
.paused_edge
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
self.actuator.pause();
}
Admit::Hold
} else {
if self
.paused_edge
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
self.actuator.resume();
}
Admit::Yes
}
}
#[must_use]
pub fn is_held(&self) -> bool {
self.paused_edge.load(Ordering::Acquire)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::governor::source::{Hysteresis, Pressure, PressureSource};
use std::sync::atomic::{AtomicU64, AtomicUsize};
struct MockSource {
value: AtomicU64,
hard: bool,
}
impl MockSource {
fn new(value: f64, hard: bool) -> Self {
Self {
value: AtomicU64::new(value.to_bits()),
hard,
}
}
fn set(&self, value: f64) {
self.value.store(value.to_bits(), Ordering::Relaxed);
}
}
impl PressureSource for MockSource {
fn name(&self) -> &'static str {
"mock"
}
fn sample(&self) -> Pressure {
Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
}
fn is_hard(&self) -> bool {
self.hard
}
}
struct CountingActuator {
pause_calls: AtomicUsize,
resume_calls: AtomicUsize,
}
impl CountingActuator {
fn new() -> Self {
Self {
pause_calls: AtomicUsize::new(0),
resume_calls: AtomicUsize::new(0),
}
}
fn pauses(&self) -> usize {
self.pause_calls.load(Ordering::Relaxed)
}
fn resumes(&self) -> usize {
self.resume_calls.load(Ordering::Relaxed)
}
}
impl GateActuator for CountingActuator {
fn pause(&self) {
self.pause_calls.fetch_add(1, Ordering::Relaxed);
}
fn resume(&self) {
self.resume_calls.fetch_add(1, Ordering::Relaxed);
}
}
struct SharedActuator(Arc<CountingActuator>);
impl GateActuator for SharedActuator {
fn pause(&self) {
self.0.pause();
}
fn resume(&self) {
self.0.resume();
}
}
fn governor_with(source: Arc<MockSource>) -> Arc<UnifiedPressure> {
let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
Arc::new(UnifiedPressure::new(
vec![source as Arc<dyn PressureSource>],
hyst,
))
}
#[test]
fn gate_drives_actuator_exactly_once_per_edge() {
let mem = Arc::new(MockSource::new(0.10, true));
let pressure = governor_with(Arc::clone(&mem));
let counter = Arc::new(CountingActuator::new());
let gate = InboundGate::new(
Arc::clone(&pressure),
Box::new(SharedActuator(Arc::clone(&counter))),
);
assert_eq!(gate.evaluate(), Admit::Yes);
assert!(!gate.is_held());
assert_eq!(counter.pauses(), 0);
assert_eq!(counter.resumes(), 0);
mem.set(0.90);
assert_eq!(gate.evaluate(), Admit::Hold);
assert!(gate.is_held());
assert_eq!(counter.pauses(), 1, "pause once on rising edge");
assert_eq!(counter.resumes(), 0);
for _ in 0..5 {
assert_eq!(gate.evaluate(), Admit::Hold);
}
assert_eq!(counter.pauses(), 1, "no re-pause while latched");
assert_eq!(counter.resumes(), 0);
mem.set(0.70);
assert_eq!(gate.evaluate(), Admit::Hold);
assert_eq!(counter.pauses(), 1, "band holds, no re-pause");
assert_eq!(counter.resumes(), 0);
mem.set(0.50);
assert_eq!(gate.evaluate(), Admit::Yes);
assert!(!gate.is_held());
assert_eq!(counter.pauses(), 1);
assert_eq!(counter.resumes(), 1, "resume once on falling edge");
for _ in 0..5 {
assert_eq!(gate.evaluate(), Admit::Yes);
}
assert_eq!(counter.pauses(), 1);
assert_eq!(counter.resumes(), 1, "no re-resume while released");
mem.set(0.95);
assert_eq!(gate.evaluate(), Admit::Hold);
assert!(gate.is_held());
assert_eq!(counter.pauses(), 2, "latch re-arms, pause fires again");
assert_eq!(counter.resumes(), 1);
}
#[test]
fn noop_actuator_gate_still_tracks_held_state() {
let mem = Arc::new(MockSource::new(0.10, true));
let pressure = governor_with(Arc::clone(&mem));
let gate = InboundGate::new(Arc::clone(&pressure), Box::new(NoopActuator));
assert_eq!(gate.evaluate(), Admit::Yes);
assert!(!gate.is_held());
mem.set(0.90);
assert_eq!(gate.evaluate(), Admit::Hold);
assert!(gate.is_held());
mem.set(0.10);
assert_eq!(gate.evaluate(), Admit::Yes);
assert!(!gate.is_held());
}
}