use std::sync::Arc;
use crate::memory::MemoryGuard;
use super::config::SelfRegulationConfig;
use super::{ByteBudgetController, MemoryPressureSource, PressureSource, UnifiedPressure};
/// Handle bundling the two shared pieces of the self-regulation machinery:
/// the unified pressure signal and the byte-budget controller.
///
/// Both fields are reference-counted, so cloning the governor produces
/// another handle to the same underlying `UnifiedPressure` and
/// `ByteBudgetController` instances rather than independent copies.
#[derive(Clone)]
pub struct SelfRegulationGovernor {
    /// Shared pressure signal; also handed to every inbound gate this
    /// governor creates.
    pressure: Arc<UnifiedPressure>,
    /// Shared byte-budget controller, constructed from the same pressure
    /// signal in `SelfRegulationConfig::build`.
    budget: Arc<ByteBudgetController>,
}
impl SelfRegulationGovernor {
    /// Returns a new shared handle to the unified pressure signal.
    #[must_use]
    pub fn pressure(&self) -> Arc<UnifiedPressure> {
        Arc::clone(&self.pressure)
    }

    /// Returns a new shared handle to the byte-budget controller.
    #[must_use]
    pub fn budget(&self) -> Arc<ByteBudgetController> {
        Arc::clone(&self.budget)
    }

    /// Builds an [`super::InboundGate`] that is driven by this governor's
    /// pressure signal.
    ///
    /// The supplied `actuator` is wrapped in a [`super::ObservingActuator`]
    /// tagged with `source` before being handed to the gate.
    #[must_use]
    pub fn inbound_gate(
        &self,
        source: &'static str,
        actuator: Box<dyn super::GateActuator>,
    ) -> super::InboundGate {
        // Take the pressure handle first, then wrap the actuator, matching
        // the argument evaluation order of a single nested call.
        let shared_pressure = Arc::clone(&self.pressure);
        let observed = Box::new(super::ObservingActuator::new(source, actuator));
        super::InboundGate::new(shared_pressure, observed)
    }

    /// Installs an inbound gate on a Kafka transport and returns the
    /// transport with the gate attached.
    ///
    /// The gate is created through [`Self::inbound_gate`] with the source
    /// label `"kafka"` and the actuator obtained from
    /// `transport.gate_actuator()`. Only compiled when the
    /// `transport-kafka` feature is enabled.
    #[cfg(feature = "transport-kafka")]
    #[must_use]
    pub fn attach_kafka_gate(
        &self,
        transport: crate::transport::kafka::KafkaTransport,
    ) -> crate::transport::kafka::KafkaTransport {
        // The actuator must be obtained before `transport` is consumed by
        // `with_inbound_gate`.
        let kafka_actuator = transport.gate_actuator();
        let kafka_gate = self.inbound_gate("kafka", kafka_actuator);
        transport.with_inbound_gate(kafka_gate)
    }
}
impl SelfRegulationConfig {
    /// Constructs the self-regulation governor described by this config.
    ///
    /// Returns `None` (after logging that the governor is disabled) when
    /// `enabled` is `false`. Otherwise builds a [`UnifiedPressure`] whose
    /// only source is a [`MemoryPressureSource`] over `memory_guard`, builds
    /// a [`ByteBudgetController`] sharing that pressure signal, logs the
    /// effective settings, and returns both wrapped in a
    /// [`SelfRegulationGovernor`].
    #[must_use]
    pub fn build(&self, memory_guard: Arc<MemoryGuard>) -> Option<SelfRegulationGovernor> {
        if !self.enabled {
            tracing::info!("self-regulation governor disabled (self_regulation.enabled=false)");
            return None;
        }

        // Memory is the single pressure source wired in here; it is stored
        // as a trait object because `UnifiedPressure` takes a list of them.
        let memory_source: Arc<dyn PressureSource> =
            Arc::new(MemoryPressureSource::new(memory_guard));
        let pressure = Arc::new(UnifiedPressure::new(
            vec![memory_source],
            self.hysteresis(),
        ));

        // The budget controller gets its own handle to the same pressure.
        let budget_config = self.byte_budget_config();
        let budget = Arc::new(ByteBudgetController::new(
            budget_config,
            Arc::clone(&pressure),
        ));

        tracing::info!(
            profile = ?self.profile,
            pause_above = self.pause_above,
            resume_below = self.resume_below,
            start_byte_budget = budget.byte_budget(),
            "self-regulation governor enabled (default-on)"
        );

        Some(SelfRegulationGovernor { pressure, budget })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::{MemoryGuard, MemoryGuardConfig};

    /// Memory guard with a 1 MiB limit and otherwise default settings.
    fn one_mib_guard() -> Arc<MemoryGuard> {
        let guard_config = MemoryGuardConfig {
            limit_bytes: 1024 * 1024,
            ..Default::default()
        };
        Arc::new(MemoryGuard::new(guard_config))
    }

    /// A config with `enabled: false` must not produce a governor.
    #[test]
    fn disabled_builds_nothing() {
        let config = SelfRegulationConfig {
            enabled: false,
            ..Default::default()
        };
        assert!(
            config.build(one_mib_guard()).is_none(),
            "disabled -> no governor constructed (Options stay None)"
        );
    }

    /// The default config builds a governor whose initial pressure is below
    /// the pause threshold and whose byte budget is at least 1.
    #[test]
    fn enabled_builds_pressure_and_budget() {
        let config = SelfRegulationConfig::default();
        let governor = config.build(one_mib_guard()).expect("enabled by default");
        assert!(governor.pressure().level() < config.pause_above);
        assert!(governor.budget().byte_budget() >= 1);
    }

    /// Bytes reported to the memory guard show up in the governor's
    /// pressure level.
    #[test]
    fn pressure_reflects_memory_guard() {
        let memory_guard = one_mib_guard();
        // Report 900 KiB to the guard (its limit is 1 MiB) before the
        // governor is built.
        memory_guard.add_bytes(900 * 1024);
        let governor = SelfRegulationConfig::default()
            .build(Arc::clone(&memory_guard))
            .expect("enabled");
        assert!(
            governor.pressure().level() > 0.5,
            "high memory should raise the pressure level, got {}",
            governor.pressure().level()
        );
    }
}