hyperi_rustlib/governor/budget.rs
1// Project: hyperi-rustlib
2// File: src/governor/budget.rs
3// Purpose: Byte-budget controller: AIMD lever with memory HARD override
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Byte-budget controller: the self-regulation lever.
10//!
11//! Sizes the inbound byte budget so the stage runs at a target
12//! utilisation `rho ~= 0.7` -- busy enough to be efficient, with enough
13//! headroom that a burst does not blow the buffer. The loop is AIMD
14//! (additive-increase / multiplicative-decrease), the classic congestion
15//! lever:
16//!
17//! - `rho = EMA(process_time) / EMA(ingest_interval)` -- how much of the
18//! inter-arrival gap the stage spends processing. `rho < target` means
19//! slack (we can pull more); `rho > target` means we are falling
20//! behind.
21//! - **slack** (`rho < target`): additive-increase the budget by
22//! `ai_step`, capped at `max_bytes`.
23//! - **behind** (`rho > target`): multiplicative-decrease the budget by
24//! `md_factor` (`< 1`).
25//! - **memory HARD override**: if the pressure latch says hold (or the
26//! memory source reads high), multiplicative-decrease IMMEDIATELY,
27//! regardless of rho. Memory never waits for the rho loop.
28//! - **floor**: the budget never drops below `min_bytes` (derived from
29//! `floor_records`) and never reaches `0`. The [`record_cap`] poll
30//! safety cap stays `>= 1`.
31//!
32//! [`record_cap`]: ByteBudgetController::record_cap
33//!
34//! It starts BIG (`start_bytes`) and lets the decrease loop find the
35//! right level, rather than starting small and ramping -- a cold pipeline
36//! should not be artificially throttled.
37//!
38//! Additive and default-off (the `governor` feature). NOT wired into any
39//! driver or recv loop here; that lands in a later phase.
40
41use std::sync::Arc;
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::time::Duration;
44
45use super::source::UnifiedPressure;
46
/// Target utilisation `rho`: keep the stage roughly 70% busy, leaving ~30%
/// headroom so a burst does not overrun the buffer.
const DEFAULT_TARGET_RHO: f64 = 0.7;

/// Default EMA weight given to the newest process-time / ingest-interval
/// sample; the previous average keeps the remaining `1 - alpha`.
const DEFAULT_EMA_ALPHA: f64 = 0.3;
52
/// Configuration for the [`ByteBudgetController`].
///
/// All byte values are in bytes. The controller starts at `start_bytes`
/// and moves between `min_bytes` (derived from `floor_records`) and
/// `max_bytes`.
///
/// Out-of-range values are not rejected. When a controller is built, invalid
/// tuning values (`target_rho`, `md_factor`, `ema_alpha`) are replaced by
/// their defaults, and the byte bounds are clamped so that
/// `min_bytes <= start_bytes <= max_bytes`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ByteBudgetConfig {
    /// Initial budget. Starts BIG so a cold pipeline is not throttled.
    /// Clamped into `[min_bytes, max_bytes]` at construction.
    pub start_bytes: u64,
    /// Hard ceiling on the budget (additive-increase saturates here).
    /// Raised to `min_bytes` if configured lower.
    pub max_bytes: u64,
    /// Floor in records: the budget never drops below
    /// `floor_records * nominal_record_bytes` (and never below 1 byte).
    pub floor_records: u64,
    /// Nominal per-record size used to derive the byte floor from
    /// `floor_records`. Record sizes vary at runtime; this is only the
    /// floor estimate, not a live measurement.
    pub nominal_record_bytes: u64,
    /// Target utilisation `rho`; must be finite and in `(0, 1)`. The default
    /// is `0.7`.
    pub target_rho: f64,
    /// Additive-increase step: bytes added per slack observation.
    pub ai_step: u64,
    /// Multiplicative-decrease factor; must be finite and in `(0, 1)`. The
    /// default is `0.5` (halve the budget when behind).
    pub md_factor: f64,
    /// EMA smoothing factor for the timing signals; must be finite and in
    /// `(0, 1]`. Larger values weight the newest sample more heavily; `1.0`
    /// makes the average equal the latest sample.
    pub ema_alpha: f64,
    /// Poll-safety cap on record count, independent of the byte budget, so
    /// a tiny-record flood cannot blow the count even within budget.
    /// Raised to `1` if configured as `0`.
    pub record_cap: usize,
}
83
84impl Default for ByteBudgetConfig {
85 fn default() -> Self {
86 Self {
87 // 8 MiB start, 64 MiB ceiling -- generous defaults; a real
88 // deployment tunes these via config in a later phase.
89 start_bytes: 8 * 1024 * 1024,
90 max_bytes: 64 * 1024 * 1024,
91 floor_records: 1,
92 nominal_record_bytes: 1024,
93 target_rho: DEFAULT_TARGET_RHO,
94 ai_step: 256 * 1024,
95 md_factor: 0.5,
96 ema_alpha: DEFAULT_EMA_ALPHA,
97 record_cap: 2000,
98 }
99 }
100}
101
102impl ByteBudgetConfig {
103 /// The derived absolute byte floor: `floor_records * nominal_record_bytes`,
104 /// clamped to at least `1` (the budget is never `0`).
105 #[must_use]
106 fn min_bytes(&self) -> u64 {
107 self.floor_records
108 .saturating_mul(self.nominal_record_bytes)
109 .max(1)
110 }
111
112 /// Sanitise the config so the control loop cannot misbehave: clamp
113 /// `target_rho` and `ema_alpha` into their open ranges, force
114 /// `md_factor` into `(0, 1)`, and ensure `record_cap >= 1`,
115 /// `max_bytes >= min_bytes`, and `start_bytes` inside `[min, max]`.
116 fn sanitised(mut self) -> Self {
117 if !self.target_rho.is_finite() || self.target_rho <= 0.0 || self.target_rho >= 1.0 {
118 self.target_rho = DEFAULT_TARGET_RHO;
119 }
120 if !self.ema_alpha.is_finite() || self.ema_alpha <= 0.0 || self.ema_alpha > 1.0 {
121 self.ema_alpha = DEFAULT_EMA_ALPHA;
122 }
123 if !self.md_factor.is_finite() || self.md_factor <= 0.0 || self.md_factor >= 1.0 {
124 self.md_factor = 0.5;
125 }
126 self.record_cap = self.record_cap.max(1);
127 let min = self.min_bytes();
128 self.max_bytes = self.max_bytes.max(min);
129 self.start_bytes = self.start_bytes.clamp(min, self.max_bytes);
130 self
131 }
132}
133
/// Lock-free `f64` cell: the value lives in an [`AtomicU64`] as its IEEE-754
/// bit pattern.
///
/// This keeps the controller `Sync` with interior mutability and no lock. A
/// `Cell<f64>` is not `Sync`, and making a wrapper around one `Sync` would
/// need `unsafe`, which the crate forbids.
struct AtomicF64 {
    bits: AtomicU64,
}

impl AtomicF64 {
    /// Create a cell holding `value`.
    fn new(value: f64) -> Self {
        let bits = AtomicU64::new(f64::to_bits(value));
        Self { bits }
    }

    /// Read the stored value (relaxed ordering).
    fn load(&self) -> f64 {
        let raw = self.bits.load(Ordering::Relaxed);
        f64::from_bits(raw)
    }

    /// Overwrite the stored value (relaxed ordering).
    fn store(&self, value: f64) {
        let raw = value.to_bits();
        self.bits.store(raw, Ordering::Relaxed);
    }
}
150
151/// AIMD byte-budget lever with a memory HARD override.
152///
153/// See the [module docs](crate::governor) for the algorithm. `observe()`
154/// is the control step; `byte_budget()` and `record_cap()` are the cheap reads
155/// the recv loop consults. All state is interior-mutable and `Sync`.
156pub struct ByteBudgetController {
157 cfg: ByteBudgetConfig,
158 pressure: Arc<UnifiedPressure>,
159 /// EMA of observed batch process time, in seconds.
160 ema_process_s: AtomicF64,
161 /// EMA of observed ingest inter-arrival interval, in seconds.
162 ema_ingest_s: AtomicF64,
163 /// Whether any timing observation has been folded in yet.
164 seeded: std::sync::atomic::AtomicBool,
165 /// Current byte budget.
166 budget: AtomicU64,
167}
168
169impl ByteBudgetController {
170 /// Build a controller from config and a shared pressure latch.
171 ///
172 /// The config is sanitised (ranges clamped, floors enforced); the
173 /// budget starts at the sanitised `start_bytes`.
174 #[must_use]
175 pub fn new(cfg: ByteBudgetConfig, pressure: Arc<UnifiedPressure>) -> Self {
176 let cfg = cfg.sanitised();
177 Self {
178 budget: AtomicU64::new(cfg.start_bytes),
179 ema_process_s: AtomicF64::new(0.0),
180 ema_ingest_s: AtomicF64::new(0.0),
181 seeded: std::sync::atomic::AtomicBool::new(false),
182 cfg,
183 pressure,
184 }
185 }
186
187 /// Fold one observation into the control loop and update the budget.
188 ///
189 /// `batch_bytes` is currently informational (the loop drives off
190 /// timing, not size); `process_time` is how long the batch took to
191 /// process and `ingest_interval` is the gap since the previous
192 /// batch's arrival.
193 ///
194 /// Steps, in order:
195 /// 1. EMA-smooth `process_time` and `ingest_interval`.
196 /// 2. Compute `rho = ema_process / ema_ingest`. A zero (or
197 /// sub-resolution) `ingest_interval` means arrivals are
198 /// back-to-back faster than we process -- treat rho as high
199 /// (behind), which is the safe direction (shrink).
200 /// 3. If memory says hold (the HARD override), multiplicative-decrease
201 /// and return -- memory never waits for the rho loop.
202 /// 4. Otherwise AIMD on rho vs `target_rho`.
203 /// 5. Clamp to `[min_bytes, max_bytes]`.
204 pub fn observe(&self, batch_bytes: u64, process_time: Duration, ingest_interval: Duration) {
205 let _ = batch_bytes; // reserved for a future size-aware refinement
206
207 let alpha = self.cfg.ema_alpha;
208 let proc_s = process_time.as_secs_f64();
209 let ingest_s = ingest_interval.as_secs_f64();
210
211 // Step 1: EMA. Seed on the first observation so the average is not
212 // dragged from a 0.0 cold start.
213 if self.seeded.swap(true, Ordering::Relaxed) {
214 let new_proc = alpha.mul_add(proc_s, (1.0 - alpha) * self.ema_process_s.load());
215 let new_ingest = alpha.mul_add(ingest_s, (1.0 - alpha) * self.ema_ingest_s.load());
216 self.ema_process_s.store(new_proc);
217 self.ema_ingest_s.store(new_ingest);
218 } else {
219 self.ema_process_s.store(proc_s);
220 self.ema_ingest_s.store(ingest_s);
221 }
222
223 // Step 3: memory HARD override takes precedence over rho entirely.
224 if self.pressure.should_hold() {
225 self.multiplicative_decrease();
226 return;
227 }
228
229 // Step 2: rho. Guard div-by-zero: a non-positive ingest EMA means
230 // arrivals outrun processing -> treat as "behind" (shrink).
231 let ema_ingest = self.ema_ingest_s.load();
232 let ema_process = self.ema_process_s.load();
233 let behind = if ema_ingest <= f64::EPSILON {
234 // Back-to-back arrivals with any processing cost -> behind.
235 // Pure-zero processing AND zero ingest -> nothing to do.
236 ema_process > 0.0
237 } else {
238 (ema_process / ema_ingest) > self.cfg.target_rho
239 };
240
241 // Step 4: AIMD.
242 if behind {
243 self.multiplicative_decrease();
244 } else {
245 self.additive_increase();
246 }
247 }
248
249 /// Additive-increase: budget += ai_step, saturating at `max_bytes`.
250 fn additive_increase(&self) {
251 let cur = self.budget.load(Ordering::Relaxed);
252 let next = cur.saturating_add(self.cfg.ai_step).min(self.cfg.max_bytes);
253 self.budget.store(next, Ordering::Relaxed);
254 }
255
256 /// Multiplicative-decrease: budget *= md_factor, clamped to the floor
257 /// and never `0`.
258 fn multiplicative_decrease(&self) {
259 let cur = self.budget.load(Ordering::Relaxed);
260 // f64 math then back to u64; budgets are well under 2^52 so this is
261 // lossless in the operating range. `floor()` then clamp.
262 #[allow(
263 clippy::cast_precision_loss,
264 clippy::cast_sign_loss,
265 clippy::cast_possible_truncation
266 )]
267 let scaled = (cur as f64 * self.cfg.md_factor).floor() as u64;
268 let next = scaled.max(self.cfg.min_bytes());
269 self.budget.store(next, Ordering::Relaxed);
270 }
271
272 /// Current byte budget. Always `>= min_bytes`, never `0`.
273 #[must_use]
274 pub fn byte_budget(&self) -> u64 {
275 self.budget.load(Ordering::Relaxed)
276 }
277
278 /// Poll-safety record cap (recv max count), independent of the byte
279 /// budget. Always `>= 1` so a tiny-record flood cannot blow the count
280 /// even when many records fit inside the byte budget.
281 #[must_use]
282 pub fn record_cap(&self) -> usize {
283 self.cfg.record_cap
284 }
285
286 /// The shared pressure governor this controller drives off. Lets a caller
287 /// (e.g. the governed driver) read the combined
288 /// [`level`](UnifiedPressure::level) for the `pressure_ratio` gauge without
289 /// holding a second `Arc`.
290 #[must_use]
291 pub fn pressure(&self) -> &Arc<UnifiedPressure> {
292 &self.pressure
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::governor::source::{Hysteresis, Pressure, PressureSource};
    use std::sync::atomic::AtomicU64 as StdAtomicU64;

    /// Scriptable HARD source so the test can force `should_hold()`.
    struct MockSource {
        value: StdAtomicU64,
    }
    impl MockSource {
        fn new(value: f64) -> Self {
            Self {
                value: StdAtomicU64::new(value.to_bits()),
            }
        }
        fn set(&self, value: f64) {
            self.value.store(value.to_bits(), Ordering::Relaxed);
        }
    }
    impl PressureSource for MockSource {
        fn name(&self) -> &'static str {
            "mock"
        }
        fn sample(&self) -> Pressure {
            Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
        }
        fn is_hard(&self) -> bool {
            true
        }
    }

    fn controller(
        cfg: ByteBudgetConfig,
        src: &Arc<MockSource>,
    ) -> (ByteBudgetController, Arc<UnifiedPressure>) {
        let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
        let pressure = Arc::new(UnifiedPressure::new(
            vec![Arc::clone(src) as Arc<dyn PressureSource>],
            hyst,
        ));
        (
            ByteBudgetController::new(cfg, Arc::clone(&pressure)),
            pressure,
        )
    }

    fn test_cfg() -> ByteBudgetConfig {
        ByteBudgetConfig {
            start_bytes: 10_000,
            max_bytes: 100_000,
            floor_records: 1,
            nominal_record_bytes: 1000, // min_bytes = 1000
            target_rho: 0.7,
            ai_step: 5_000,
            md_factor: 0.5,
            ema_alpha: 1.0, // alpha=1 -> EMA == latest sample (deterministic)
            record_cap: 2000,
        }
    }

    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    #[test]
    fn starts_big_at_start_bytes() {
        let src = Arc::new(MockSource::new(0.0));
        let (ctl, _p) = controller(test_cfg(), &src);
        assert_eq!(ctl.byte_budget(), 10_000);
        assert!(ctl.record_cap() >= 1);
        assert_eq!(ctl.record_cap(), 2000);
    }

    /// rho < 0.7 (slack) -> budget grows additively, monotone up, capped.
    #[test]
    fn slack_grows_budget_additively_and_caps() {
        let src = Arc::new(MockSource::new(0.0)); // no memory pressure
        let (ctl, _p) = controller(test_cfg(), &src);

        // process 10ms, ingest 100ms -> rho = 0.1 < 0.7 -> slack.
        let mut last = ctl.byte_budget();
        for _ in 0..50 {
            ctl.observe(500, ms(10), ms(100));
            let now = ctl.byte_budget();
            assert!(now >= last, "budget must be monotone up under slack");
            last = now;
        }
        // ai_step 5000, start 10_000, cap 100_000 -> saturates at the cap.
        assert_eq!(ctl.byte_budget(), 100_000, "additive-increase caps at max");
    }

    /// rho > 0.7 (behind) -> budget shrinks multiplicatively toward floor.
    #[test]
    fn behind_shrinks_budget_multiplicatively() {
        let src = Arc::new(MockSource::new(0.0));
        let (ctl, _p) = controller(test_cfg(), &src);

        // process 90ms, ingest 100ms -> rho = 0.9 > 0.7 -> behind.
        let first = ctl.byte_budget();
        ctl.observe(500, ms(90), ms(100));
        let after = ctl.byte_budget();
        assert!(after < first, "behind must shrink the budget");
        // 10_000 * 0.5 = 5_000.
        assert_eq!(after, 5_000);

        // Keep going -> shrinks toward the 1000-byte floor, never below.
        for _ in 0..20 {
            ctl.observe(500, ms(90), ms(100));
        }
        assert_eq!(ctl.byte_budget(), 1_000, "shrink clamps to min_bytes");
        assert!(ctl.byte_budget() >= 1, "never zero");
    }

    /// THE adversarial test: memory HARD override beats rho.
    ///
    /// Even with rho deep in slack (would grow), forcing memory pressure
    /// high (`should_hold()` true) must multiplicative-decrease the budget
    /// IMMEDIATELY, toward the floor, never to zero.
    #[test]
    fn memory_pressure_overrides_rho_and_shrinks_to_floor() {
        let src = Arc::new(MockSource::new(0.0));
        let (ctl, _p) = controller(test_cfg(), &src);

        // Grow a bit first under slack so there is room to shrink.
        ctl.observe(500, ms(10), ms(100)); // -> 15_000
        ctl.observe(500, ms(10), ms(100)); // -> 20_000
        assert_eq!(ctl.byte_budget(), 20_000);

        // Now SLAM memory high. rho is still deep slack (10ms/100ms) but
        // the HARD override must win and SHRINK.
        src.set(0.95);
        let before = ctl.byte_budget();
        ctl.observe(500, ms(10), ms(100));
        let after = ctl.byte_budget();
        assert!(
            after < before,
            "memory override must shrink even when rho says slack"
        );
        assert_eq!(after, 10_000, "20_000 * 0.5 under override");

        // Sustained pressure drives toward the floor, never zero.
        for _ in 0..20 {
            ctl.observe(500, ms(10), ms(100));
        }
        assert_eq!(ctl.byte_budget(), 1_000, "override clamps to floor");
        assert!(ctl.byte_budget() >= 1);
        assert!(ctl.record_cap() >= 1);
    }

    /// ingest_interval == 0 must not panic or divide-by-zero, and must be
    /// treated as "behind" (shrink) -- the safe direction.
    #[test]
    fn zero_ingest_interval_is_safe_and_treated_as_behind() {
        let src = Arc::new(MockSource::new(0.0));
        let (ctl, _p) = controller(test_cfg(), &src);

        let before = ctl.byte_budget();
        // Zero ingest interval, non-zero processing -> behind -> shrink.
        ctl.observe(500, ms(5), Duration::ZERO);
        let after = ctl.byte_budget();
        assert!(after <= before, "zero ingest must not grow the budget");
        assert_eq!(after, 5_000, "treated as behind -> multiplicative-decrease");
        assert!(ctl.byte_budget() >= 1);

        // Both zero -> no processing cost, no arrivals: must not panic and
        // must not collapse the budget below the floor.
        let cur = ctl.byte_budget();
        ctl.observe(0, Duration::ZERO, Duration::ZERO);
        // alpha=1 so ema_process becomes 0 -> not behind -> additive-increase.
        assert!(ctl.byte_budget() >= cur, "both-zero is no-pressure slack");
    }

    /// Config sanitisation: garbage ranges fall back to safe defaults and
    /// the budget never starts at or reaches zero.
    #[test]
    fn config_is_sanitised() {
        let src = Arc::new(MockSource::new(0.0));
        let bad = ByteBudgetConfig {
            start_bytes: 0, // below floor
            max_bytes: 0,   // below floor
            floor_records: 2,
            nominal_record_bytes: 500, // min_bytes = 1000
            target_rho: 5.0,           // out of range -> default 0.7
            ai_step: 1_000,
            md_factor: 2.0, // out of range -> default 0.5
            ema_alpha: 0.0, // out of range -> default
            record_cap: 0,  // -> clamped to 1
        };
        let (ctl, _p) = controller(bad, &src);
        assert_eq!(ctl.byte_budget(), 1_000, "start clamped up to min_bytes");
        assert!(ctl.record_cap() >= 1);
    }

    /// Out-of-range tuning values must be REPLACED by their defaults, not
    /// used as-is. The byte bounds here are valid, so the controller has
    /// room to move and the effect of each tuning value is observable.
    #[test]
    fn invalid_tuning_falls_back_to_defaults() {
        let src = Arc::new(MockSource::new(0.0));
        let cfg = ByteBudgetConfig {
            target_rho: 5.0, // invalid -> 0.7
            md_factor: 2.0,  // invalid -> 0.5
            ema_alpha: 0.0,  // invalid -> 0.3
            ..test_cfg()
        };
        let (ctl, _p) = controller(cfg, &src);

        // rho = 0.9 (first sample seeds the EMA). It counts as behind only
        // if target_rho was reset below 0.9, and 10_000 * 0.5 = 5_000 only
        // if md_factor was reset to 0.5. A raw 5.0 target would grow the
        // budget; a raw 2.0 factor would double it.
        ctl.observe(500, ms(90), ms(100));
        assert_eq!(ctl.byte_budget(), 5_000, "target_rho and md_factor reset");

        // alpha = 0.3: ema_process = 0.7 * 0.090 = 0.063 and ema_ingest
        // = 0.100, so rho ~= 0.63 < 0.7 -> slack -> +ai_step. A frozen EMA
        // (raw alpha 0.0) would keep rho at 0.9 and shrink instead.
        ctl.observe(500, Duration::ZERO, ms(100));
        assert_eq!(ctl.byte_budget(), 10_000, "ema_alpha reset to default");
    }
}