hyperi_rustlib/governor/config.rs
1// Project: hyperi-rustlib
2// File: src/governor/config.rs
3// Purpose: SelfRegulationConfig -- cascade-overridable governor settings
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Cascade-overridable configuration for the self-regulation governor.
10//!
11//! [`SelfRegulationConfig`] is the single config section that turns the
12//! data-plane governor ON (the default) or OFF. It is a sibling to
13//! [`MemoryGuardConfig`](crate::memory::MemoryGuardConfig) /
14//! [`ScalingPressureConfig`](crate::ScalingPressureConfig): loaded from the
15//! 8-layer cascade under the `self_regulation` key and registered in the
16//! config registry so the `/config` admin endpoint and hot-reload see it.
17//!
18//! # Default-ON, opt-out
19//!
20//! `enabled` defaults to `true`. When the `governor` feature is compiled in,
21//! the runtime builds the pressure governor and threads it into the
22//! transports + driver. To turn it OFF (byte-identical to pre-governor
23//! behaviour), set:
24//!
25//! ```yaml
//! self_regulation:
//!   enabled: false
28//! ```
29//!
30//! When `enabled = false` the runtime constructs NOTHING -- no pressure, no
31//! gate, no byte-budget controller -- so every `Option` stays `None` and the
32//! data path is the original whole-batch loop.
33
34use serde::{Deserialize, Serialize};
35
36use super::{ByteBudgetConfig, Hysteresis};
37
38/// Sizing profile for the self-regulation byte budget.
39///
40/// Mirrors the three Kafka `SelfRegulationProfile` names so a single
41/// `self_regulation.profile` config value reads the same regardless of
42/// transport. It tunes the AIMD byte-budget envelope (start / ceiling /
43/// step), not the Kafka client knobs.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum SelfRegulationProfile {
47 /// Maximum throughput: large start budget + high ceiling, coarse steps.
48 #[default]
49 Throughput,
50 /// Balanced: moderate budget envelope.
51 Balanced,
52 /// Low latency: small budget envelope so blocks stay small + bursty.
53 LowLatency,
54}
55
56impl SelfRegulationProfile {
57 /// The byte-budget envelope for this profile. The hysteresis band and
58 /// target utilisation are profile-independent (set in
59 /// [`SelfRegulationConfig`]); this only sizes the AIMD lever.
60 #[must_use]
61 fn byte_budget_envelope(self) -> (u64, u64, u64, usize) {
62 // (start_bytes, max_bytes, ai_step, record_cap)
63 match self {
64 Self::Throughput => (16 * 1024 * 1024, 128 * 1024 * 1024, 512 * 1024, 2000),
65 Self::Balanced => (8 * 1024 * 1024, 64 * 1024 * 1024, 256 * 1024, 1000),
66 Self::LowLatency => (1024 * 1024, 16 * 1024 * 1024, 128 * 1024, 500),
67 }
68 }
69}
70
/// Default value of [`SelfRegulationConfig::enabled`]: `true`.
///
/// The governor is opt-out -- it stays ON unless
/// `self_regulation.enabled = false` is set.
const fn default_enabled() -> bool {
    const ON_BY_DEFAULT: bool = true;
    ON_BY_DEFAULT
}
76
/// Default value of [`SelfRegulationConfig::pause_above`]: `0.80`.
///
/// Declared `const` (matching `default_enabled`) so the value is also usable
/// in constant contexts.
const fn default_pause_above() -> f64 {
    0.80
}
80
/// Default value of [`SelfRegulationConfig::resume_below`]: `0.65`.
///
/// Declared `const` (matching `default_enabled`) so the value is also usable
/// in constant contexts.
const fn default_resume_below() -> f64 {
    0.65
}
84
/// Default value of [`SelfRegulationConfig::target_rho`]: `0.7`.
///
/// Declared `const` (matching `default_enabled`) so the value is also usable
/// in constant contexts.
const fn default_target_rho() -> f64 {
    0.7
}
88
/// Default value of [`SelfRegulationConfig::md_factor`]: `0.5`.
///
/// Declared `const` (matching `default_enabled`) so the value is also usable
/// in constant contexts.
const fn default_md_factor() -> f64 {
    0.5
}
92
93/// Cascade-overridable settings for the self-regulation governor.
94///
95/// Loaded under the `self_regulation` key. All fields have sensible defaults
96/// so an app that sets nothing gets a fully working, default-ON governor.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(default)]
99pub struct SelfRegulationConfig {
100 /// Master switch. `true` (default) -> the runtime builds the governor and
101 /// threads it into the transports + driver. `false` -> nothing is built
102 /// (byte-identical to pre-governor behaviour).
103 pub enabled: bool,
104
105 /// Sizing profile for the AIMD byte budget.
106 pub profile: SelfRegulationProfile,
107
108 /// Hysteresis: arm the inbound hold when combined pressure reaches this.
109 pub pause_above: f64,
110
111 /// Hysteresis: release the inbound hold when pressure drops to this.
112 /// Must be strictly less than `pause_above` (validated; an invalid band
113 /// falls back to the defaults).
114 pub resume_below: f64,
115
116 /// Target utilisation `rho` for the byte-budget AIMD loop, in `(0, 1)`.
117 pub target_rho: f64,
118
119 /// Multiplicative-decrease factor for the byte budget, in `(0, 1)`.
120 pub md_factor: f64,
121}
122
123impl Default for SelfRegulationConfig {
124 fn default() -> Self {
125 Self {
126 enabled: default_enabled(),
127 profile: SelfRegulationProfile::default(),
128 pause_above: default_pause_above(),
129 resume_below: default_resume_below(),
130 target_rho: default_target_rho(),
131 md_factor: default_md_factor(),
132 }
133 }
134}
135
136impl SelfRegulationConfig {
137 /// Load from the config cascade under the `self_regulation` key, registering
138 /// the section so the `/config` admin endpoint + hot-reload see it.
139 ///
140 /// Falls back to [`SelfRegulationConfig::default()`] (default-ON) when the
141 /// cascade is not initialised or the key is absent.
142 #[must_use]
143 pub fn from_cascade() -> Self {
144 #[cfg(feature = "config")]
145 {
146 if let Some(cfg) = crate::config::try_get()
147 && let Ok(value) = cfg.unmarshal_key_registered::<Self>("self_regulation")
148 {
149 return value;
150 }
151 }
152 Self::default()
153 }
154
155 /// Build the [`Hysteresis`] band from the config.
156 ///
157 /// An inverted / non-finite band falls back to the defaults
158 /// (`0.80 / 0.65`) rather than failing -- a bad knob must not wedge the
159 /// governor.
160 #[must_use]
161 pub fn hysteresis(&self) -> Hysteresis {
162 Hysteresis::new(self.pause_above, self.resume_below).unwrap_or_else(|e| {
163 tracing::warn!(
164 error = %e,
165 "invalid self_regulation hysteresis band; using defaults 0.80/0.65"
166 );
167 Hysteresis::new(default_pause_above(), default_resume_below())
168 .expect("default band is valid")
169 })
170 }
171
172 /// Build the [`ByteBudgetConfig`] from the profile envelope + overridable
173 /// `target_rho` / `md_factor`. The controller sanitises ranges itself.
174 #[must_use]
175 pub fn byte_budget_config(&self) -> ByteBudgetConfig {
176 let (start_bytes, max_bytes, ai_step, record_cap) = self.profile.byte_budget_envelope();
177 ByteBudgetConfig {
178 start_bytes,
179 max_bytes,
180 ai_step,
181 record_cap,
182 target_rho: self.target_rho,
183 md_factor: self.md_factor,
184 ..ByteBudgetConfig::default()
185 }
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing configuration floats.
    const EPS: f64 = 1e-9;

    /// `true` when `a` and `b` differ by less than [`EPS`].
    fn approx(a: f64, b: f64) -> bool {
        (a - b).abs() < EPS
    }

    /// The default config is ON, uses the throughput profile, and carries the
    /// numeric defaults returned by the `default_*` functions.
    #[test]
    fn default_is_enabled() {
        let cfg = SelfRegulationConfig::default();
        assert!(cfg.enabled, "governor is ON by default (opt-out)");
        assert_eq!(cfg.profile, SelfRegulationProfile::Throughput);
        assert!(approx(cfg.pause_above, 0.80));
        assert!(approx(cfg.resume_below, 0.65));
        assert!(approx(cfg.target_rho, 0.7));
        assert!(approx(cfg.md_factor, 0.5));
    }

    #[test]
    fn from_cascade_falls_back_to_default_on() {
        // No cascade initialised in this unit test -> default (enabled).
        let cfg = SelfRegulationConfig::from_cascade();
        assert!(cfg.enabled);
    }

    #[test]
    fn hysteresis_uses_config_band() {
        let cfg = SelfRegulationConfig {
            pause_above: 0.9,
            resume_below: 0.5,
            ..Default::default()
        };
        let h = cfg.hysteresis();
        assert!(approx(h.pause_above, 0.9));
        assert!(approx(h.resume_below, 0.5));
    }

    #[test]
    fn inverted_band_falls_back_to_defaults() {
        let cfg = SelfRegulationConfig {
            pause_above: 0.3,
            resume_below: 0.8, // inverted
            ..Default::default()
        };
        let h = cfg.hysteresis();
        assert!(approx(h.pause_above, 0.80));
        assert!(approx(h.resume_below, 0.65));
    }

    /// Envelopes are strictly ordered throughput > balanced > low-latency,
    /// for both the starting budget and the ceiling.
    #[test]
    fn profile_sizes_the_byte_budget() {
        let budget_for = |profile: SelfRegulationProfile| {
            SelfRegulationConfig {
                profile,
                ..Default::default()
            }
            .byte_budget_config()
        };
        let tp = budget_for(SelfRegulationProfile::Throughput);
        let bal = budget_for(SelfRegulationProfile::Balanced);
        let ll = budget_for(SelfRegulationProfile::LowLatency);

        assert!(
            tp.start_bytes > ll.start_bytes,
            "throughput starts bigger than low-latency"
        );
        assert!(tp.max_bytes > ll.max_bytes);
        assert!(
            tp.start_bytes > bal.start_bytes && bal.start_bytes > ll.start_bytes,
            "balanced start budget sits between throughput and low-latency"
        );
        assert!(
            tp.max_bytes > bal.max_bytes && bal.max_bytes > ll.max_bytes,
            "balanced ceiling sits between throughput and low-latency"
        );
    }

    /// `target_rho` / `md_factor` overrides reach the byte-budget config
    /// unchanged (this layer applies no range checks of its own).
    #[test]
    fn byte_budget_passes_through_rho_and_md_factor() {
        let bb = SelfRegulationConfig {
            target_rho: 0.6,
            md_factor: 0.25,
            ..Default::default()
        }
        .byte_budget_config();
        assert!(approx(bb.target_rho, 0.6));
        assert!(approx(bb.md_factor, 0.25));
    }

    #[cfg(feature = "config")]
    #[test]
    fn serde_roundtrip_and_disabled_parse() {
        let yaml = "enabled: false\nprofile: low_latency\n";
        let cfg: SelfRegulationConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(!cfg.enabled);
        assert_eq!(cfg.profile, SelfRegulationProfile::LowLatency);
        // Defaults fill the rest.
        assert!(approx(cfg.pause_above, 0.80));

        // Serialise and parse back: every field must survive the round trip.
        let dumped = serde_yaml_ng::to_string(&cfg).unwrap();
        let back: SelfRegulationConfig = serde_yaml_ng::from_str(&dumped).unwrap();
        assert_eq!(back.enabled, cfg.enabled);
        assert_eq!(back.profile, cfg.profile);
        assert!(approx(back.pause_above, cfg.pause_above));
        assert!(approx(back.resume_below, cfg.resume_below));
        assert!(approx(back.target_rho, cfg.target_rho));
        assert!(approx(back.md_factor, cfg.md_factor));
    }

    /// The governor profile must serialise as snake_case so the
    /// `self_regulation.profile` cascade key reads identically to the Kafka
    /// sizing profile (rustlib<->pylib config-consistency rule).
    #[cfg(feature = "config")]
    #[test]
    fn profile_serialises_snake_case() {
        let j = serde_json::to_string(&SelfRegulationProfile::LowLatency).unwrap();
        assert_eq!(j, "\"low_latency\"");
        let j = serde_json::to_string(&SelfRegulationProfile::Throughput).unwrap();
        assert_eq!(j, "\"throughput\"");
        let j = serde_json::to_string(&SelfRegulationProfile::Balanced).unwrap();
        assert_eq!(j, "\"balanced\"");
    }
}