// Project:   hyperi-rustlib
// File:      src/governor/config.rs
// Purpose:   SelfRegulationConfig -- cascade-overridable governor settings
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Cascade-overridable configuration for the self-regulation governor.
//!
//! [`SelfRegulationConfig`] is the single config section that turns the
//! data-plane governor ON (the default) or OFF. It is a sibling to
//! [`MemoryGuardConfig`](crate::memory::MemoryGuardConfig) /
//! [`ScalingPressureConfig`](crate::ScalingPressureConfig): loaded from the
//! 8-layer cascade under the `self_regulation` key and registered in the
//! config registry so the `/config` admin endpoint and hot-reload see it.
//!
//! # Default-ON, opt-out
//!
//! `enabled` defaults to `true`. When the `governor` feature is compiled in,
//! the runtime builds the pressure governor and threads it into the
//! transports + driver. To turn it OFF (byte-identical to pre-governor
//! behaviour), set:
//!
//! ```yaml
//! self_regulation:
//!   enabled: false
//! ```
//!
//! When `enabled = false` the runtime constructs NOTHING -- no pressure, no
//! gate, no byte-budget controller -- so every `Option` stays `None` and the
//! data path is the original whole-batch loop.

use serde::{Deserialize, Serialize};

use super::{ByteBudgetConfig, Hysteresis};

38/// Sizing profile for the self-regulation byte budget.
39///
40/// Mirrors the three Kafka `SelfRegulationProfile` names so a single
41/// `self_regulation.profile` config value reads the same regardless of
42/// transport. It tunes the AIMD byte-budget envelope (start / ceiling /
43/// step), not the Kafka client knobs.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum SelfRegulationProfile {
47    /// Maximum throughput: large start budget + high ceiling, coarse steps.
48    #[default]
49    Throughput,
50    /// Balanced: moderate budget envelope.
51    Balanced,
52    /// Low latency: small budget envelope so blocks stay small + bursty.
53    LowLatency,
54}
55
56impl SelfRegulationProfile {
57    /// The byte-budget envelope for this profile. The hysteresis band and
58    /// target utilisation are profile-independent (set in
59    /// [`SelfRegulationConfig`]); this only sizes the AIMD lever.
60    #[must_use]
61    fn byte_budget_envelope(self) -> (u64, u64, u64, usize) {
62        // (start_bytes, max_bytes, ai_step, record_cap)
63        match self {
64            Self::Throughput => (16 * 1024 * 1024, 128 * 1024 * 1024, 512 * 1024, 2000),
65            Self::Balanced => (8 * 1024 * 1024, 64 * 1024 * 1024, 256 * 1024, 1000),
66            Self::LowLatency => (1024 * 1024, 16 * 1024 * 1024, 128 * 1024, 500),
67        }
68    }
69}
70
/// Default for [`SelfRegulationConfig::enabled`] -- the governor is ON by
/// default (opt-out via `self_regulation.enabled = false`).
const fn default_enabled() -> bool {
    true
}

/// Default for [`SelfRegulationConfig::pause_above`]: the upper edge of the
/// hysteresis band (`0.80`).
const fn default_pause_above() -> f64 {
    0.80
}

/// Default for [`SelfRegulationConfig::resume_below`]: the lower edge of the
/// hysteresis band (`0.65`).
const fn default_resume_below() -> f64 {
    0.65
}

/// Default for [`SelfRegulationConfig::target_rho`]: the byte-budget loop's
/// target utilisation (`0.7`).
const fn default_target_rho() -> f64 {
    0.7
}

/// Default for [`SelfRegulationConfig::md_factor`]: the byte budget's
/// multiplicative-decrease factor (`0.5`).
const fn default_md_factor() -> f64 {
    0.5
}

93/// Cascade-overridable settings for the self-regulation governor.
94///
95/// Loaded under the `self_regulation` key. All fields have sensible defaults
96/// so an app that sets nothing gets a fully working, default-ON governor.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(default)]
99pub struct SelfRegulationConfig {
100    /// Master switch. `true` (default) -> the runtime builds the governor and
101    /// threads it into the transports + driver. `false` -> nothing is built
102    /// (byte-identical to pre-governor behaviour).
103    pub enabled: bool,
104
105    /// Sizing profile for the AIMD byte budget.
106    pub profile: SelfRegulationProfile,
107
108    /// Hysteresis: arm the inbound hold when combined pressure reaches this.
109    pub pause_above: f64,
110
111    /// Hysteresis: release the inbound hold when pressure drops to this.
112    /// Must be strictly less than `pause_above` (validated; an invalid band
113    /// falls back to the defaults).
114    pub resume_below: f64,
115
116    /// Target utilisation `rho` for the byte-budget AIMD loop, in `(0, 1)`.
117    pub target_rho: f64,
118
119    /// Multiplicative-decrease factor for the byte budget, in `(0, 1)`.
120    pub md_factor: f64,
121}
122
123impl Default for SelfRegulationConfig {
124    fn default() -> Self {
125        Self {
126            enabled: default_enabled(),
127            profile: SelfRegulationProfile::default(),
128            pause_above: default_pause_above(),
129            resume_below: default_resume_below(),
130            target_rho: default_target_rho(),
131            md_factor: default_md_factor(),
132        }
133    }
134}
135
136impl SelfRegulationConfig {
137    /// Load from the config cascade under the `self_regulation` key, registering
138    /// the section so the `/config` admin endpoint + hot-reload see it.
139    ///
140    /// Falls back to [`SelfRegulationConfig::default()`] (default-ON) when the
141    /// cascade is not initialised or the key is absent.
142    #[must_use]
143    pub fn from_cascade() -> Self {
144        #[cfg(feature = "config")]
145        {
146            if let Some(cfg) = crate::config::try_get()
147                && let Ok(value) = cfg.unmarshal_key_registered::<Self>("self_regulation")
148            {
149                return value;
150            }
151        }
152        Self::default()
153    }
154
155    /// Build the [`Hysteresis`] band from the config.
156    ///
157    /// An inverted / non-finite band falls back to the defaults
158    /// (`0.80 / 0.65`) rather than failing -- a bad knob must not wedge the
159    /// governor.
160    #[must_use]
161    pub fn hysteresis(&self) -> Hysteresis {
162        Hysteresis::new(self.pause_above, self.resume_below).unwrap_or_else(|e| {
163            tracing::warn!(
164                error = %e,
165                "invalid self_regulation hysteresis band; using defaults 0.80/0.65"
166            );
167            Hysteresis::new(default_pause_above(), default_resume_below())
168                .expect("default band is valid")
169        })
170    }
171
172    /// Build the [`ByteBudgetConfig`] from the profile envelope + overridable
173    /// `target_rho` / `md_factor`. The controller sanitises ranges itself.
174    #[must_use]
175    pub fn byte_budget_config(&self) -> ByteBudgetConfig {
176        let (start_bytes, max_bytes, ai_step, record_cap) = self.profile.byte_budget_envelope();
177        ByteBudgetConfig {
178            start_bytes,
179            max_bytes,
180            ai_step,
181            record_cap,
182            target_rho: self.target_rho,
183            md_factor: self.md_factor,
184            ..ByteBudgetConfig::default()
185        }
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance for comparing `f64` config knobs.
    const EPS: f64 = 1e-9;

    #[test]
    fn default_is_enabled() {
        let cfg = SelfRegulationConfig::default();
        assert!(cfg.enabled, "governor is ON by default (opt-out)");
        assert_eq!(cfg.profile, SelfRegulationProfile::Throughput);
    }

    #[test]
    fn from_cascade_falls_back_to_default_on() {
        // No cascade initialised in this unit test -> default (enabled).
        let cfg = SelfRegulationConfig::from_cascade();
        assert!(cfg.enabled);
    }

    #[test]
    fn hysteresis_uses_config_band() {
        let cfg = SelfRegulationConfig {
            pause_above: 0.9,
            resume_below: 0.5,
            ..Default::default()
        };
        let h = cfg.hysteresis();
        assert!((h.pause_above - 0.9).abs() < EPS);
        assert!((h.resume_below - 0.5).abs() < EPS);
    }

    #[test]
    fn inverted_band_falls_back_to_defaults() {
        let cfg = SelfRegulationConfig {
            pause_above: 0.3,
            resume_below: 0.8, // inverted
            ..Default::default()
        };
        let h = cfg.hysteresis();
        assert!((h.pause_above - 0.80).abs() < EPS);
        assert!((h.resume_below - 0.65).abs() < EPS);
    }

    #[test]
    fn profile_sizes_the_byte_budget() {
        let budget_for = |profile| {
            SelfRegulationConfig {
                profile,
                ..Default::default()
            }
            .byte_budget_config()
        };
        let tp = budget_for(SelfRegulationProfile::Throughput);
        let bal = budget_for(SelfRegulationProfile::Balanced);
        let ll = budget_for(SelfRegulationProfile::LowLatency);

        assert!(
            tp.start_bytes > ll.start_bytes,
            "throughput starts bigger than low-latency"
        );
        assert!(tp.max_bytes > ll.max_bytes);
        // Balanced sits strictly between the two extremes.
        assert!(tp.start_bytes > bal.start_bytes && bal.start_bytes > ll.start_bytes);
        assert!(tp.max_bytes > bal.max_bytes && bal.max_bytes > ll.max_bytes);
    }

    #[cfg(feature = "config")]
    #[test]
    fn serde_roundtrip_and_disabled_parse() {
        let yaml = "enabled: false\nprofile: low_latency\n";
        let cfg: SelfRegulationConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(!cfg.enabled);
        assert_eq!(cfg.profile, SelfRegulationProfile::LowLatency);
        // Defaults fill the rest.
        assert!((cfg.pause_above - 0.80).abs() < EPS);

        // Serialise and re-parse: every field must survive the round trip.
        let rendered = serde_yaml_ng::to_string(&cfg).unwrap();
        let back: SelfRegulationConfig = serde_yaml_ng::from_str(&rendered).unwrap();
        assert_eq!(back.enabled, cfg.enabled);
        assert_eq!(back.profile, cfg.profile);
        assert!((back.pause_above - cfg.pause_above).abs() < EPS);
        assert!((back.resume_below - cfg.resume_below).abs() < EPS);
        assert!((back.target_rho - cfg.target_rho).abs() < EPS);
        assert!((back.md_factor - cfg.md_factor).abs() < EPS);
    }

    /// The governor profile must serialise as snake_case so the
    /// `self_regulation.profile` cascade key reads identically to the Kafka
    /// sizing profile (rustlib<->pylib config-consistency rule).
    #[cfg(feature = "config")]
    #[test]
    fn profile_serialises_snake_case() {
        let j = serde_json::to_string(&SelfRegulationProfile::LowLatency).unwrap();
        assert_eq!(j, "\"low_latency\"");
        let j = serde_json::to_string(&SelfRegulationProfile::Throughput).unwrap();
        assert_eq!(j, "\"throughput\"");
        let j = serde_json::to_string(&SelfRegulationProfile::Balanced).unwrap();
        assert_eq!(j, "\"balanced\"");
    }
}