hyperi_rustlib/governor/runtime.rs
1// Project: hyperi-rustlib
2// File: src/governor/runtime.rs
3// Purpose: SelfRegulationGovernor -- the built, wired-in governor bundle
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! The constructed governor bundle the runtime threads into transports + driver.
10//!
11//! [`SelfRegulationGovernor`] is what [`SelfRegulationConfig::build`] produces
12//! when `enabled = true`: ONE shared [`UnifiedPressure`] (memory-only HARD
13//! source today) plus ONE [`ByteBudgetController`] (the AIMD lever) over that
14//! same pressure. The runtime constructs it BEFORE the transports and the
15//! engine driver so the pressure can be threaded into:
16//!
17//! - the inbound gate of each receive transport (Kafka pause-partitions,
18//! HTTP/gRPC 503), via [`pressure`](Self::pressure);
19//! - the byte-budget lever feeding the streaming driver's sub-block size +
20//! recv `max`, via [`budget`](Self::budget).
21//!
22//! When `enabled = false` the runtime builds NOTHING -- it never calls
23//! [`SelfRegulationConfig::build`] -- so all the downstream `Option`s stay
24//! `None` and the data path is byte-identical to pre-governor behaviour.
25
26use std::sync::Arc;
27
28use crate::memory::MemoryGuard;
29
30use super::config::SelfRegulationConfig;
31use super::{ByteBudgetController, MemoryPressureSource, PressureSource, UnifiedPressure};
32
33/// The constructed self-regulation governor: shared pressure + byte budget.
34///
35/// Built once by the runtime when self-regulation is enabled, then cloned
36/// (cheap `Arc` bumps) into the transports and the driver.
37#[derive(Clone)]
38pub struct SelfRegulationGovernor {
39 pressure: Arc<UnifiedPressure>,
40 budget: Arc<ByteBudgetController>,
41}
42
43impl SelfRegulationGovernor {
44 /// The shared pressure governor. Thread a clone into each receive
45 /// transport's inbound gate (`InboundGate::new(governor.pressure(), ...)`)
46 /// and into the HTTP/gRPC `with_pressure(Some(...))` hooks.
47 #[must_use]
48 pub fn pressure(&self) -> Arc<UnifiedPressure> {
49 Arc::clone(&self.pressure)
50 }
51
52 /// The shared byte-budget controller (AIMD lever). The streaming driver
53 /// reads [`byte_budget`](ByteBudgetController::byte_budget) per block for
54 /// the sub-block size + recv `max`, and calls
55 /// [`observe`](ByteBudgetController::observe) per block.
56 #[must_use]
57 pub fn budget(&self) -> Arc<ByteBudgetController> {
58 Arc::clone(&self.budget)
59 }
60
61 /// Build an [`InboundGate`](super::InboundGate) for a given source label
62 /// and actuator, wrapped in an [`ObservingActuator`](super::ObservingActuator)
63 /// so pause/resume edges emit metrics + brake-reason logs.
64 ///
65 /// This is the one-call form of the gate dance: pass the transport's own
66 /// actuator (e.g. `KafkaTransport::gate_actuator()`) and a label, get back
67 /// a gate over THIS governor's shared pressure. Hand the result to
68 /// `KafkaTransport::with_inbound_gate(...)`.
69 #[must_use]
70 pub fn inbound_gate(
71 &self,
72 source: &'static str,
73 actuator: Box<dyn super::GateActuator>,
74 ) -> super::InboundGate {
75 super::InboundGate::new(
76 self.pressure(),
77 Box::new(super::ObservingActuator::new(source, actuator)),
78 )
79 }
80
81 /// Attach a self-regulation inbound gate to a Kafka receive transport
82 /// (`transport-kafka` feature) -- the full
83 /// `gate_actuator -> InboundGate -> with_inbound_gate` dance in one call.
84 ///
85 /// Pauses the consumer's ASSIGNED partitions under pressure (member stays
86 /// in the group -- no rebalance) with pause/resume edges observed via
87 /// [`ObservingActuator`](super::ObservingActuator). Returns the transport
88 /// with the gate attached.
89 #[cfg(feature = "transport-kafka")]
90 #[must_use]
91 pub fn attach_kafka_gate(
92 &self,
93 transport: crate::transport::kafka::KafkaTransport,
94 ) -> crate::transport::kafka::KafkaTransport {
95 let gate = self.inbound_gate("kafka", transport.gate_actuator());
96 transport.with_inbound_gate(gate)
97 }
98}
99
100impl SelfRegulationConfig {
101 /// Construct the [`SelfRegulationGovernor`] over a shared memory guard.
102 ///
103 /// Returns `None` when `enabled = false` -- the caller then builds nothing
104 /// and the data path stays byte-identical to pre-governor behaviour.
105 ///
106 /// Construction order matters: this is called BEFORE the transports + the
107 /// engine driver so the produced pressure / budget can be threaded in.
108 #[must_use]
109 pub fn build(&self, memory_guard: Arc<MemoryGuard>) -> Option<SelfRegulationGovernor> {
110 if !self.enabled {
111 tracing::info!("self-regulation governor disabled (self_regulation.enabled=false)");
112 return None;
113 }
114
115 // ONE pressure governor over a single HARD memory source. New SOFT
116 // sources (CPU, queue depth) plug in later via `add_source` with zero
117 // change to the gate / budget APIs.
118 let sources: Vec<Arc<dyn PressureSource>> =
119 vec![Arc::new(MemoryPressureSource::new(memory_guard)) as Arc<dyn PressureSource>];
120 let pressure = Arc::new(UnifiedPressure::new(sources, self.hysteresis()));
121
122 // ONE byte-budget controller (AIMD lever) over the SAME pressure, so
123 // its memory HARD override consults the same latch the gate does.
124 let budget = Arc::new(ByteBudgetController::new(
125 self.byte_budget_config(),
126 Arc::clone(&pressure),
127 ));
128
129 tracing::info!(
130 profile = ?self.profile,
131 pause_above = self.pause_above,
132 resume_below = self.resume_below,
133 start_byte_budget = budget.byte_budget(),
134 "self-regulation governor enabled (default-on)"
135 );
136
137 Some(SelfRegulationGovernor { pressure, budget })
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::{MemoryGuard, MemoryGuardConfig};

    /// Fresh memory guard capped at 1 MiB, all other settings at defaults.
    fn one_mib_guard() -> Arc<MemoryGuard> {
        let config = MemoryGuardConfig {
            limit_bytes: 1024 * 1024,
            ..Default::default()
        };
        Arc::new(MemoryGuard::new(config))
    }

    /// A disabled config must not produce a governor.
    #[test]
    fn disabled_builds_nothing() {
        let cfg = SelfRegulationConfig {
            enabled: false,
            ..Default::default()
        };
        let built = cfg.build(one_mib_guard());
        assert!(
            built.is_none(),
            "disabled -> no governor constructed (Options stay None)"
        );
    }

    /// The default config builds a governor with a usable pressure + budget.
    #[test]
    fn enabled_builds_pressure_and_budget() {
        let cfg = SelfRegulationConfig::default();
        let gov = cfg.build(one_mib_guard()).expect("enabled by default");

        // Nothing has been added to the guard, so the level is expected to
        // sit below the pause threshold and the budget to be non-zero.
        let level = gov.pressure().level();
        assert!(level < cfg.pause_above);

        let start_budget = gov.budget().byte_budget();
        assert!(start_budget >= 1);
    }

    /// Bytes accounted on the guard must show up in the pressure level.
    #[test]
    fn pressure_reflects_memory_guard() {
        let shared_guard = one_mib_guard();
        shared_guard.add_bytes(900 * 1024); // ~88% of the 1 MiB limit

        let cfg = SelfRegulationConfig::default();
        let gov = cfg.build(Arc::clone(&shared_guard)).expect("enabled");

        // The HARD memory source feeds the pressure level directly.
        let level = gov.pressure().level();
        assert!(
            level > 0.5,
            "high memory should raise the pressure level, got {}",
            level
        );
    }
}
188}