Skip to main content

hyperi_rustlib/governor/
runtime.rs

1// Project:   hyperi-rustlib
2// File:      src/governor/runtime.rs
3// Purpose:   SelfRegulationGovernor -- the built, wired-in governor bundle
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! The constructed governor bundle the runtime threads into transports + driver.
10//!
11//! [`SelfRegulationGovernor`] is what [`SelfRegulationConfig::build`] produces
12//! when `enabled = true`: ONE shared [`UnifiedPressure`] (memory-only HARD
13//! source today) plus ONE [`ByteBudgetController`] (the AIMD lever) over that
14//! same pressure. The runtime constructs it BEFORE the transports and the
15//! engine driver so the pressure can be threaded into:
16//!
17//! - the inbound gate of each receive transport (Kafka pause-partitions,
18//!   HTTP/gRPC 503), via [`pressure`](Self::pressure);
19//! - the byte-budget lever feeding the streaming driver's sub-block size +
20//!   recv `max`, via [`budget`](Self::budget).
21//!
//! When `enabled = false` NOTHING is built: [`SelfRegulationConfig::build`]
//! returns `None` (a caller may equally skip the call altogether), so all the
//! downstream `Option`s stay `None` and the data path is byte-identical to
//! pre-governor behaviour.
25
26use std::sync::Arc;
27
28use crate::memory::MemoryGuard;
29
30use super::config::SelfRegulationConfig;
31use super::{ByteBudgetController, MemoryPressureSource, PressureSource, UnifiedPressure};
32
33/// The constructed self-regulation governor: shared pressure + byte budget.
34///
35/// Built once by the runtime when self-regulation is enabled, then cloned
36/// (cheap `Arc` bumps) into the transports and the driver.
37#[derive(Clone)]
38pub struct SelfRegulationGovernor {
39    pressure: Arc<UnifiedPressure>,
40    budget: Arc<ByteBudgetController>,
41}
42
43impl SelfRegulationGovernor {
44    /// The shared pressure governor. Thread a clone into each receive
45    /// transport's inbound gate (`InboundGate::new(governor.pressure(), ...)`)
46    /// and into the HTTP/gRPC `with_pressure(Some(...))` hooks.
47    #[must_use]
48    pub fn pressure(&self) -> Arc<UnifiedPressure> {
49        Arc::clone(&self.pressure)
50    }
51
52    /// The shared byte-budget controller (AIMD lever). The streaming driver
53    /// reads [`byte_budget`](ByteBudgetController::byte_budget) per block for
54    /// the sub-block size + recv `max`, and calls
55    /// [`observe`](ByteBudgetController::observe) per block.
56    #[must_use]
57    pub fn budget(&self) -> Arc<ByteBudgetController> {
58        Arc::clone(&self.budget)
59    }
60
61    /// Build an [`InboundGate`](super::InboundGate) for a given source label
62    /// and actuator, wrapped in an [`ObservingActuator`](super::ObservingActuator)
63    /// so pause/resume edges emit metrics + brake-reason logs.
64    ///
65    /// This is the one-call form of the gate dance: pass the transport's own
66    /// actuator (e.g. `KafkaTransport::gate_actuator()`) and a label, get back
67    /// a gate over THIS governor's shared pressure. Hand the result to
68    /// `KafkaTransport::with_inbound_gate(...)`.
69    #[must_use]
70    pub fn inbound_gate(
71        &self,
72        source: &'static str,
73        actuator: Box<dyn super::GateActuator>,
74    ) -> super::InboundGate {
75        super::InboundGate::new(
76            self.pressure(),
77            Box::new(super::ObservingActuator::new(source, actuator)),
78        )
79    }
80
81    /// Attach a self-regulation inbound gate to a Kafka receive transport
82    /// (`transport-kafka` feature) -- the full
83    /// `gate_actuator -> InboundGate -> with_inbound_gate` dance in one call.
84    ///
85    /// Pauses the consumer's ASSIGNED partitions under pressure (member stays
86    /// in the group -- no rebalance) with pause/resume edges observed via
87    /// [`ObservingActuator`](super::ObservingActuator). Returns the transport
88    /// with the gate attached.
89    #[cfg(feature = "transport-kafka")]
90    #[must_use]
91    pub fn attach_kafka_gate(
92        &self,
93        transport: crate::transport::kafka::KafkaTransport,
94    ) -> crate::transport::kafka::KafkaTransport {
95        let gate = self.inbound_gate("kafka", transport.gate_actuator());
96        transport.with_inbound_gate(gate)
97    }
98}
99
100impl SelfRegulationConfig {
101    /// Construct the [`SelfRegulationGovernor`] over a shared memory guard.
102    ///
103    /// Returns `None` when `enabled = false` -- the caller then builds nothing
104    /// and the data path stays byte-identical to pre-governor behaviour.
105    ///
106    /// Construction order matters: this is called BEFORE the transports + the
107    /// engine driver so the produced pressure / budget can be threaded in.
108    #[must_use]
109    pub fn build(&self, memory_guard: Arc<MemoryGuard>) -> Option<SelfRegulationGovernor> {
110        if !self.enabled {
111            tracing::info!("self-regulation governor disabled (self_regulation.enabled=false)");
112            return None;
113        }
114
115        // ONE pressure governor over a single HARD memory source. New SOFT
116        // sources (CPU, queue depth) plug in later via `add_source` with zero
117        // change to the gate / budget APIs.
118        let sources: Vec<Arc<dyn PressureSource>> =
119            vec![Arc::new(MemoryPressureSource::new(memory_guard)) as Arc<dyn PressureSource>];
120        let pressure = Arc::new(UnifiedPressure::new(sources, self.hysteresis()));
121
122        // ONE byte-budget controller (AIMD lever) over the SAME pressure, so
123        // its memory HARD override consults the same latch the gate does.
124        let budget = Arc::new(ByteBudgetController::new(
125            self.byte_budget_config(),
126            Arc::clone(&pressure),
127        ));
128
129        tracing::info!(
130            profile = ?self.profile,
131            pause_above = self.pause_above,
132            resume_below = self.resume_below,
133            start_byte_budget = budget.byte_budget(),
134            "self-regulation governor enabled (default-on)"
135        );
136
137        Some(SelfRegulationGovernor { pressure, budget })
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::{MemoryGuard, MemoryGuardConfig};

    /// Fresh memory guard capped at 1 MiB; every other setting is default.
    fn guard() -> Arc<MemoryGuard> {
        let config = MemoryGuardConfig {
            limit_bytes: 1024 * 1024,
            ..Default::default()
        };
        Arc::new(MemoryGuard::new(config))
    }

    /// A config with `enabled = false` must yield no governor at all.
    #[test]
    fn disabled_builds_nothing() {
        let disabled = SelfRegulationConfig {
            enabled: false,
            ..Default::default()
        };
        let built = disabled.build(guard());
        assert!(
            built.is_none(),
            "disabled -> no governor constructed (Options stay None)"
        );
    }

    /// The default config is enabled and yields usable pressure + budget.
    #[test]
    fn enabled_builds_pressure_and_budget() {
        let cfg = SelfRegulationConfig::default();
        let governor = cfg.build(guard()).expect("enabled by default");

        // Nothing has been added to the guard, so the level sits below the
        // pause threshold (the gate would admit).
        let level = governor.pressure().level();
        assert!(level < cfg.pause_above);

        // The starting byte budget must be non-zero.
        let budget_bytes = governor.budget().byte_budget();
        assert!(budget_bytes >= 1);
    }

    /// Bytes added to the guard show up in the governor's pressure level.
    #[test]
    fn pressure_reflects_memory_guard() {
        let shared_guard = guard();
        shared_guard.add_bytes(900 * 1024); // ~88% of the 1 MiB limit

        let governor = SelfRegulationConfig::default()
            .build(Arc::clone(&shared_guard))
            .expect("enabled");

        // The HARD memory source feeds the pressure level directly.
        let level = governor.pressure().level();
        assert!(
            level > 0.5,
            "high memory should raise the pressure level, got {}",
            level
        );
    }
}