Skip to main content

hyperi_rustlib/governor/
config.rs

1// Project:   hyperi-rustlib
2// File:      src/governor/config.rs
3// Purpose:   SelfRegulationConfig -- cascade-overridable governor settings
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Cascade-overridable configuration for the self-regulation governor.
10//!
11//! [`SelfRegulationConfig`] is the single config section that turns the
12//! data-plane governor ON (the default) or OFF. It is a sibling to
13//! [`MemoryGuardConfig`](crate::memory::MemoryGuardConfig) /
14//! [`ScalingPressureConfig`](crate::ScalingPressureConfig): loaded from the
15//! 8-layer cascade under the `self_regulation` key and registered in the
16//! config registry so the `/config` admin endpoint and hot-reload see it.
17//!
18//! # Default-ON, opt-out
19//!
20//! `enabled` defaults to `true`. When the `governor` feature is compiled in,
21//! the runtime builds the pressure governor and threads it into the
22//! transports + driver. To turn it OFF (byte-identical to pre-governor
23//! behaviour), set:
24//!
25//! ```yaml
26//! self_regulation:
27//!   enabled: false
28//! ```
29//!
30//! When `enabled = false` the runtime constructs NOTHING -- no pressure, no
31//! gate, no byte-budget controller -- so every `Option` stays `None` and the
32//! data path is the original whole-batch loop.
33
34use serde::{Deserialize, Serialize};
35
36use super::{ByteBudgetConfig, Hysteresis};
37
38/// Sizing profile for the self-regulation byte budget.
39///
40/// Mirrors the three Kafka `SelfRegulationProfile` names so a single
41/// `self_regulation.profile` config value reads the same regardless of
42/// transport. It tunes the AIMD byte-budget envelope (start / ceiling /
43/// step), not the Kafka client knobs.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum SelfRegulationProfile {
47    /// Maximum throughput: large start budget + high ceiling, coarse steps.
48    #[default]
49    Throughput,
50    /// Balanced: moderate budget envelope.
51    Balanced,
52    /// Low latency: small budget envelope so blocks stay small + bursty.
53    LowLatency,
54}
55
56impl SelfRegulationProfile {
57    /// The byte-budget envelope for this profile. The hysteresis band and
58    /// target utilisation are profile-independent (set in
59    /// [`SelfRegulationConfig`]); this only sizes the AIMD lever.
60    #[must_use]
61    fn byte_budget_envelope(self) -> (u64, u64, u64, usize) {
62        // (start_bytes, max_bytes, ai_step, record_cap)
63        match self {
64            Self::Throughput => (16 * 1024 * 1024, 128 * 1024 * 1024, 512 * 1024, 2000),
65            Self::Balanced => (8 * 1024 * 1024, 64 * 1024 * 1024, 256 * 1024, 1000),
66            Self::LowLatency => (1024 * 1024, 16 * 1024 * 1024, 128 * 1024, 500),
67        }
68    }
69}
70
71/// Default for [`SelfRegulationConfig::enabled`] -- the governor is ON by
72/// default (opt-out via `self_regulation.enabled = false`).
73const fn default_enabled() -> bool {
74    true
75}
76
77fn default_pause_above() -> f64 {
78    0.80
79}
80
81fn default_resume_below() -> f64 {
82    0.65
83}
84
85fn default_target_rho() -> f64 {
86    0.7
87}
88
89fn default_md_factor() -> f64 {
90    0.5
91}
92
93/// Cascade-overridable settings for the self-regulation governor.
94///
95/// Loaded under the `self_regulation` key. All fields have sensible defaults
96/// so an app that sets nothing gets a fully working, default-ON governor.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(default)]
99pub struct SelfRegulationConfig {
100    /// Master switch. `true` (default) -> the runtime builds the governor and
101    /// threads it into the transports + driver. `false` -> nothing is built
102    /// (byte-identical to pre-governor behaviour).
103    pub enabled: bool,
104
105    /// Sizing profile for the AIMD byte budget.
106    pub profile: SelfRegulationProfile,
107
108    /// Hysteresis: arm the inbound hold when combined pressure reaches this.
109    pub pause_above: f64,
110
111    /// Hysteresis: release the inbound hold when pressure drops to this.
112    /// Must be strictly less than `pause_above` (validated; an invalid band
113    /// falls back to the defaults).
114    pub resume_below: f64,
115
116    /// Target utilisation `rho` for the byte-budget AIMD loop, in `(0, 1)`.
117    pub target_rho: f64,
118
119    /// Multiplicative-decrease factor for the byte budget, in `(0, 1)`.
120    pub md_factor: f64,
121}
122
123impl Default for SelfRegulationConfig {
124    fn default() -> Self {
125        Self {
126            enabled: default_enabled(),
127            profile: SelfRegulationProfile::default(),
128            pause_above: default_pause_above(),
129            resume_below: default_resume_below(),
130            target_rho: default_target_rho(),
131            md_factor: default_md_factor(),
132        }
133    }
134}
135
136impl SelfRegulationConfig {
137    /// Load from the config cascade under the `self_regulation` key, registering
138    /// the section so the `/config` admin endpoint + hot-reload see it.
139    ///
140    /// Falls back to [`SelfRegulationConfig::default()`] (default-ON) when the
141    /// cascade is not initialised or the key is absent.
142    #[must_use]
143    pub fn from_cascade() -> Self {
144        #[cfg(feature = "config")]
145        {
146            // `or_warn`: absent `self_regulation` key -> default-ON (silent);
147            // present-but-malformed -> WARN + default (was silently swallowed
148            // pre-2.8.11). Absent-key default-ON behaviour is unchanged.
149            if let Some(cfg) = crate::config::try_get()
150                && let Some(value) = cfg.unmarshal_key_registered_or_warn::<Self>("self_regulation")
151            {
152                return value;
153            }
154        }
155        Self::default()
156    }
157
158    /// Build the [`Hysteresis`] band from the config.
159    ///
160    /// An inverted / non-finite band falls back to the defaults
161    /// (`0.80 / 0.65`) rather than failing -- a bad knob must not wedge the
162    /// governor.
163    #[must_use]
164    pub fn hysteresis(&self) -> Hysteresis {
165        Hysteresis::new(self.pause_above, self.resume_below).unwrap_or_else(|e| {
166            tracing::warn!(
167                error = %e,
168                "invalid self_regulation hysteresis band; using defaults 0.80/0.65"
169            );
170            Hysteresis::new(default_pause_above(), default_resume_below())
171                .expect("default band is valid")
172        })
173    }
174
175    /// Build the [`ByteBudgetConfig`] from the profile envelope + overridable
176    /// `target_rho` / `md_factor`. The controller sanitises ranges itself.
177    #[must_use]
178    pub fn byte_budget_config(&self) -> ByteBudgetConfig {
179        let (start_bytes, max_bytes, ai_step, record_cap) = self.profile.byte_budget_envelope();
180        ByteBudgetConfig {
181            start_bytes,
182            max_bytes,
183            ai_step,
184            record_cap,
185            target_rho: self.target_rho,
186            md_factor: self.md_factor,
187            ..ByteBudgetConfig::default()
188        }
189    }
190}
191
192#[cfg(test)]
193mod tests {
194    use super::*;
195
196    #[test]
197    fn default_is_enabled() {
198        let cfg = SelfRegulationConfig::default();
199        assert!(cfg.enabled, "governor is ON by default (opt-out)");
200        assert_eq!(cfg.profile, SelfRegulationProfile::Throughput);
201    }
202
203    #[test]
204    fn from_cascade_falls_back_to_default_on() {
205        // No cascade initialised in this unit test -> default (enabled).
206        let cfg = SelfRegulationConfig::from_cascade();
207        assert!(cfg.enabled);
208    }
209
210    #[test]
211    fn hysteresis_uses_config_band() {
212        let cfg = SelfRegulationConfig {
213            pause_above: 0.9,
214            resume_below: 0.5,
215            ..Default::default()
216        };
217        let h = cfg.hysteresis();
218        assert!((h.pause_above - 0.9).abs() < 1e-9);
219        assert!((h.resume_below - 0.5).abs() < 1e-9);
220    }
221
222    #[test]
223    fn inverted_band_falls_back_to_defaults() {
224        let cfg = SelfRegulationConfig {
225            pause_above: 0.3,
226            resume_below: 0.8, // inverted
227            ..Default::default()
228        };
229        let h = cfg.hysteresis();
230        assert!((h.pause_above - 0.80).abs() < 1e-9);
231        assert!((h.resume_below - 0.65).abs() < 1e-9);
232    }
233
234    /// An out-of-`[0,1]` band (here a negative resume that could never release
235    /// the latch) must fall back to the safe defaults, not wedge the governor.
236    #[test]
237    fn out_of_range_band_falls_back_to_defaults() {
238        let cfg = SelfRegulationConfig {
239            pause_above: 0.9,
240            resume_below: -0.2, // below the pressure clamp floor
241            ..Default::default()
242        };
243        let h = cfg.hysteresis();
244        assert!((h.pause_above - 0.80).abs() < 1e-9);
245        assert!((h.resume_below - 0.65).abs() < 1e-9);
246    }
247
248    #[test]
249    fn profile_sizes_the_byte_budget() {
250        let tp = SelfRegulationConfig {
251            profile: SelfRegulationProfile::Throughput,
252            ..Default::default()
253        }
254        .byte_budget_config();
255        let ll = SelfRegulationConfig {
256            profile: SelfRegulationProfile::LowLatency,
257            ..Default::default()
258        }
259        .byte_budget_config();
260        assert!(
261            tp.start_bytes > ll.start_bytes,
262            "throughput starts bigger than low-latency"
263        );
264        assert!(tp.max_bytes > ll.max_bytes);
265    }
266
267    #[cfg(feature = "config")]
268    #[test]
269    fn serde_roundtrip_and_disabled_parse() {
270        let yaml = "enabled: false\nprofile: low_latency\n";
271        let cfg: SelfRegulationConfig = serde_yaml_ng::from_str(yaml).unwrap();
272        assert!(!cfg.enabled);
273        assert_eq!(cfg.profile, SelfRegulationProfile::LowLatency);
274        // Defaults fill the rest.
275        assert!((cfg.pause_above - 0.80).abs() < 1e-9);
276    }
277
278    /// The governor profile must serialise as snake_case so the
279    /// `self_regulation.profile` cascade key reads identically to the Kafka
280    /// sizing profile (rustlib<->pylib config-consistency rule).
281    #[cfg(feature = "config")]
282    #[test]
283    fn profile_serialises_snake_case() {
284        let j = serde_json::to_string(&SelfRegulationProfile::LowLatency).unwrap();
285        assert_eq!(j, "\"low_latency\"");
286        let j = serde_json::to_string(&SelfRegulationProfile::Throughput).unwrap();
287        assert_eq!(j, "\"throughput\"");
288        let j = serde_json::to_string(&SelfRegulationProfile::Balanced).unwrap();
289        assert_eq!(j, "\"balanced\"");
290    }
291}