hyperi_rustlib/governor/config.rs
1// Project: hyperi-rustlib
2// File: src/governor/config.rs
3// Purpose: SelfRegulationConfig -- cascade-overridable governor settings
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Cascade-overridable configuration for the self-regulation governor.
10//!
11//! [`SelfRegulationConfig`] is the single config section that turns the
12//! data-plane governor ON (the default) or OFF. It is a sibling to
13//! [`MemoryGuardConfig`](crate::memory::MemoryGuardConfig) /
14//! [`ScalingPressureConfig`](crate::ScalingPressureConfig): loaded from the
15//! 8-layer cascade under the `self_regulation` key and registered in the
16//! config registry so the `/config` admin endpoint and hot-reload see it.
17//!
18//! # Default-ON, opt-out
19//!
20//! `enabled` defaults to `true`. When the `governor` feature is compiled in,
21//! the runtime builds the pressure governor and threads it into the
22//! transports + driver. To turn it OFF (byte-identical to pre-governor
23//! behaviour), set:
24//!
25//! ```yaml
26//! self_regulation:
//!   enabled: false
28//! ```
29//!
30//! When `enabled = false` the runtime constructs NOTHING -- no pressure, no
31//! gate, no byte-budget controller -- so every `Option` stays `None` and the
32//! data path is the original whole-batch loop.
33
34use serde::{Deserialize, Serialize};
35
36use super::{ByteBudgetConfig, Hysteresis};
37
38/// Sizing profile for the self-regulation byte budget.
39///
40/// Mirrors the three Kafka `SelfRegulationProfile` names so a single
41/// `self_regulation.profile` config value reads the same regardless of
42/// transport. It tunes the AIMD byte-budget envelope (start / ceiling /
43/// step), not the Kafka client knobs.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum SelfRegulationProfile {
47 /// Maximum throughput: large start budget + high ceiling, coarse steps.
48 #[default]
49 Throughput,
50 /// Balanced: moderate budget envelope.
51 Balanced,
52 /// Low latency: small budget envelope so blocks stay small + bursty.
53 LowLatency,
54}
55
56impl SelfRegulationProfile {
57 /// The byte-budget envelope for this profile. The hysteresis band and
58 /// target utilisation are profile-independent (set in
59 /// [`SelfRegulationConfig`]); this only sizes the AIMD lever.
60 #[must_use]
61 fn byte_budget_envelope(self) -> (u64, u64, u64, usize) {
62 // (start_bytes, max_bytes, ai_step, record_cap)
63 match self {
64 Self::Throughput => (16 * 1024 * 1024, 128 * 1024 * 1024, 512 * 1024, 2000),
65 Self::Balanced => (8 * 1024 * 1024, 64 * 1024 * 1024, 256 * 1024, 1000),
66 Self::LowLatency => (1024 * 1024, 16 * 1024 * 1024, 128 * 1024, 500),
67 }
68 }
69}
70
/// Supplies the default value of [`SelfRegulationConfig::enabled`].
///
/// The governor is opt-out: it stays ON unless
/// `self_regulation.enabled = false` is configured.
const fn default_enabled() -> bool {
    true
}
76
/// Default for [`SelfRegulationConfig::pause_above`]: the combined-pressure
/// level at which the inbound hold is armed.
///
/// `const` so it matches `default_enabled` and can be used in const contexts.
const fn default_pause_above() -> f64 {
    0.80
}
80
/// Default for [`SelfRegulationConfig::resume_below`]: the pressure level at
/// which the inbound hold is released.
///
/// `const` so it matches `default_enabled` and can be used in const contexts.
const fn default_resume_below() -> f64 {
    0.65
}
84
/// Default for [`SelfRegulationConfig::target_rho`]: the target utilisation
/// `rho` of the byte-budget AIMD loop.
///
/// `const` so it matches `default_enabled` and can be used in const contexts.
const fn default_target_rho() -> f64 {
    0.7
}
88
/// Default for [`SelfRegulationConfig::md_factor`]: the multiplicative-decrease
/// factor applied to the byte budget.
///
/// `const` so it matches `default_enabled` and can be used in const contexts.
const fn default_md_factor() -> f64 {
    0.5
}
92
93/// Cascade-overridable settings for the self-regulation governor.
94///
95/// Loaded under the `self_regulation` key. All fields have sensible defaults
96/// so an app that sets nothing gets a fully working, default-ON governor.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(default)]
99pub struct SelfRegulationConfig {
100 /// Master switch. `true` (default) -> the runtime builds the governor and
101 /// threads it into the transports + driver. `false` -> nothing is built
102 /// (byte-identical to pre-governor behaviour).
103 pub enabled: bool,
104
105 /// Sizing profile for the AIMD byte budget.
106 pub profile: SelfRegulationProfile,
107
108 /// Hysteresis: arm the inbound hold when combined pressure reaches this.
109 pub pause_above: f64,
110
111 /// Hysteresis: release the inbound hold when pressure drops to this.
112 /// Must be strictly less than `pause_above` (validated; an invalid band
113 /// falls back to the defaults).
114 pub resume_below: f64,
115
116 /// Target utilisation `rho` for the byte-budget AIMD loop, in `(0, 1)`.
117 pub target_rho: f64,
118
119 /// Multiplicative-decrease factor for the byte budget, in `(0, 1)`.
120 pub md_factor: f64,
121}
122
123impl Default for SelfRegulationConfig {
124 fn default() -> Self {
125 Self {
126 enabled: default_enabled(),
127 profile: SelfRegulationProfile::default(),
128 pause_above: default_pause_above(),
129 resume_below: default_resume_below(),
130 target_rho: default_target_rho(),
131 md_factor: default_md_factor(),
132 }
133 }
134}
135
136impl SelfRegulationConfig {
137 /// Load from the config cascade under the `self_regulation` key, registering
138 /// the section so the `/config` admin endpoint + hot-reload see it.
139 ///
140 /// Falls back to [`SelfRegulationConfig::default()`] (default-ON) when the
141 /// cascade is not initialised or the key is absent.
142 #[must_use]
143 pub fn from_cascade() -> Self {
144 #[cfg(feature = "config")]
145 {
146 // `or_warn`: absent `self_regulation` key -> default-ON (silent);
147 // present-but-malformed -> WARN + default (was silently swallowed
148 // pre-2.8.11). Absent-key default-ON behaviour is unchanged.
149 if let Some(cfg) = crate::config::try_get()
150 && let Some(value) = cfg.unmarshal_key_registered_or_warn::<Self>("self_regulation")
151 {
152 return value;
153 }
154 }
155 Self::default()
156 }
157
158 /// Build the [`Hysteresis`] band from the config.
159 ///
160 /// An inverted / non-finite band falls back to the defaults
161 /// (`0.80 / 0.65`) rather than failing -- a bad knob must not wedge the
162 /// governor.
163 #[must_use]
164 pub fn hysteresis(&self) -> Hysteresis {
165 Hysteresis::new(self.pause_above, self.resume_below).unwrap_or_else(|e| {
166 tracing::warn!(
167 error = %e,
168 "invalid self_regulation hysteresis band; using defaults 0.80/0.65"
169 );
170 Hysteresis::new(default_pause_above(), default_resume_below())
171 .expect("default band is valid")
172 })
173 }
174
175 /// Build the [`ByteBudgetConfig`] from the profile envelope + overridable
176 /// `target_rho` / `md_factor`. The controller sanitises ranges itself.
177 #[must_use]
178 pub fn byte_budget_config(&self) -> ByteBudgetConfig {
179 let (start_bytes, max_bytes, ai_step, record_cap) = self.profile.byte_budget_envelope();
180 ByteBudgetConfig {
181 start_bytes,
182 max_bytes,
183 ai_step,
184 record_cap,
185 target_rho: self.target_rho,
186 md_factor: self.md_factor,
187 ..ByteBudgetConfig::default()
188 }
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// With nothing configured the governor is ON and uses the throughput
    /// profile.
    #[test]
    fn default_is_enabled() {
        let cfg = SelfRegulationConfig::default();
        assert!(cfg.enabled, "governor is ON by default (opt-out)");
        assert_eq!(cfg.profile, SelfRegulationProfile::Throughput);
    }

    #[test]
    fn from_cascade_falls_back_to_default_on() {
        // No cascade initialised in this unit test -> default (enabled).
        let cfg = SelfRegulationConfig::from_cascade();
        assert!(cfg.enabled);
    }

    /// A valid band is passed through to the `Hysteresis` unchanged.
    #[test]
    fn hysteresis_uses_config_band() {
        let cfg = SelfRegulationConfig {
            pause_above: 0.9,
            resume_below: 0.5,
            ..Default::default()
        };
        let h = cfg.hysteresis();
        assert!((h.pause_above - 0.9).abs() < 1e-9);
        assert!((h.resume_below - 0.5).abs() < 1e-9);
    }

    #[test]
    fn inverted_band_falls_back_to_defaults() {
        let cfg = SelfRegulationConfig {
            pause_above: 0.3,
            resume_below: 0.8, // inverted
            ..Default::default()
        };
        let h = cfg.hysteresis();
        assert!((h.pause_above - 0.80).abs() < 1e-9);
        assert!((h.resume_below - 0.65).abs() < 1e-9);
    }

    /// An out-of-`[0,1]` band (here a negative resume that could never release
    /// the latch) must fall back to the safe defaults, not wedge the governor.
    #[test]
    fn out_of_range_band_falls_back_to_defaults() {
        let cfg = SelfRegulationConfig {
            pause_above: 0.9,
            resume_below: -0.2, // below the pressure clamp floor
            ..Default::default()
        };
        let h = cfg.hysteresis();
        assert!((h.pause_above - 0.80).abs() < 1e-9);
        assert!((h.resume_below - 0.65).abs() < 1e-9);
    }

    /// The envelope shrinks monotonically from throughput through balanced to
    /// low-latency.
    #[test]
    fn profile_sizes_the_byte_budget() {
        let budget_for = |profile: SelfRegulationProfile| {
            SelfRegulationConfig {
                profile,
                ..Default::default()
            }
            .byte_budget_config()
        };
        let tp = budget_for(SelfRegulationProfile::Throughput);
        let bal = budget_for(SelfRegulationProfile::Balanced);
        let ll = budget_for(SelfRegulationProfile::LowLatency);

        assert!(
            tp.start_bytes > ll.start_bytes,
            "throughput starts bigger than low-latency"
        );
        assert!(tp.max_bytes > ll.max_bytes);
        assert!(
            tp.start_bytes > bal.start_bytes && bal.start_bytes > ll.start_bytes,
            "balanced start budget sits between throughput and low-latency"
        );
        assert!(
            tp.max_bytes > bal.max_bytes && bal.max_bytes > ll.max_bytes,
            "balanced ceiling sits between throughput and low-latency"
        );
    }

    /// `target_rho` / `md_factor` overrides must reach the byte-budget config
    /// regardless of the selected profile.
    #[test]
    fn byte_budget_uses_configured_rho_and_md_factor() {
        let budget = SelfRegulationConfig {
            target_rho: 0.6,
            md_factor: 0.25,
            ..Default::default()
        }
        .byte_budget_config();
        assert!((budget.target_rho - 0.6).abs() < 1e-9);
        assert!((budget.md_factor - 0.25).abs() < 1e-9);
    }

    #[cfg(feature = "config")]
    #[test]
    fn serde_roundtrip_and_disabled_parse() {
        let yaml = "enabled: false\nprofile: low_latency\n";
        let cfg: SelfRegulationConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(!cfg.enabled);
        assert_eq!(cfg.profile, SelfRegulationProfile::LowLatency);
        // Defaults fill the rest.
        assert!((cfg.pause_above - 0.80).abs() < 1e-9);

        // Actual roundtrip: serialise the parsed config and read it back.
        let rendered = serde_yaml_ng::to_string(&cfg).unwrap();
        let reparsed: SelfRegulationConfig = serde_yaml_ng::from_str(&rendered).unwrap();
        assert_eq!(reparsed.enabled, cfg.enabled);
        assert_eq!(reparsed.profile, cfg.profile);
        assert!((reparsed.pause_above - cfg.pause_above).abs() < 1e-9);
        assert!((reparsed.resume_below - cfg.resume_below).abs() < 1e-9);
        assert!((reparsed.target_rho - cfg.target_rho).abs() < 1e-9);
        assert!((reparsed.md_factor - cfg.md_factor).abs() < 1e-9);
    }

    /// The governor profile must serialise as snake_case so the
    /// `self_regulation.profile` cascade key reads identically to the Kafka
    /// sizing profile (rustlib<->pylib config-consistency rule).
    #[cfg(feature = "config")]
    #[test]
    fn profile_serialises_snake_case() {
        let j = serde_json::to_string(&SelfRegulationProfile::LowLatency).unwrap();
        assert_eq!(j, "\"low_latency\"");
        let j = serde_json::to_string(&SelfRegulationProfile::Throughput).unwrap();
        assert_eq!(j, "\"throughput\"");
        let j = serde_json::to_string(&SelfRegulationProfile::Balanced).unwrap();
        assert_eq!(j, "\"balanced\"");
    }
}
291}