Skip to main content

hyperi_rustlib/governor/
budget.rs

// Project:   hyperi-rustlib
// File:      src/governor/budget.rs
// Purpose:   Byte-budget controller: AIMD lever with memory HARD override
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Byte-budget controller: the self-regulation lever.
//!
//! Sizes the inbound byte budget so the stage runs at a target
//! utilisation `rho ~= 0.7` -- busy enough to be efficient, with enough
//! headroom that a burst does not blow the buffer. The loop is AIMD
//! (additive-increase / multiplicative-decrease), the classic
//! congestion-control lever:
//!
//! - `rho = EMA(process_time) / EMA(ingest_interval)` -- how much of the
//!   inter-arrival gap the stage spends processing. `rho <= target` means
//!   slack (we can pull more); `rho > target` means we are falling
//!   behind.
//! - **slack** (`rho <= target`): additive-increase the budget by
//!   `ai_step`, capped at `max_bytes`.
//! - **behind** (`rho > target`): multiplicative-decrease the budget by
//!   `md_factor` (`< 1`).
//! - **memory HARD override**: if the pressure latch says hold (or the
//!   memory source reads high), multiplicative-decrease IMMEDIATELY,
//!   regardless of rho. Memory never waits for the rho loop.
//! - **floor**: the budget never drops below `min_bytes` (derived from
//!   `floor_records`) and never reaches `0`. The [`record_cap`] poll
//!   safety cap stays `>= 1`.
//!
//! [`record_cap`]: ByteBudgetController::record_cap
//!
//! It starts BIG (`start_bytes`) and lets the decrease loop find the
//! right level, rather than starting small and ramping -- a cold pipeline
//! should not be artificially throttled.
//!
//! Additive and off by default (behind the `governor` feature). NOT wired
//! into any driver or recv loop here; that lands in a later phase.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;

use super::source::UnifiedPressure;

/// Default EMA smoothing factor for the process-time and ingest-interval
/// signals: the weight given to the newest sample on each fold.
const DEFAULT_EMA_ALPHA: f64 = 0.3;

/// Default target utilisation `rho`: aim for a stage that is ~70% busy,
/// leaving ~30% headroom for bursts.
const DEFAULT_TARGET_RHO: f64 = 0.7;

/// Tuning knobs for a [`ByteBudgetController`].
///
/// Every size is expressed in bytes. The live budget begins at
/// `start_bytes` and is kept between a floor (`floor_records *
/// nominal_record_bytes`, never below 1) and the `max_bytes` ceiling.
#[derive(Debug, Clone, Copy)]
pub struct ByteBudgetConfig {
    /// Budget on construction. Deliberately large so a cold pipeline is
    /// not throttled while the loop searches for the right level.
    pub start_bytes: u64,
    /// Upper bound on the budget; additive-increase stops here.
    pub max_bytes: u64,
    /// Minimum budget expressed in records. Together with
    /// `nominal_record_bytes` this yields the byte floor (itself at least 1).
    pub floor_records: u64,
    /// Assumed size of one record, used only to turn `floor_records` into a
    /// byte floor. Real records vary; this is an estimate, not a measurement.
    pub nominal_record_bytes: u64,
    /// Desired utilisation `rho`, valid in `(0, 1)`. Default `0.7`.
    pub target_rho: f64,
    /// Bytes added to the budget on each slack observation.
    pub ai_step: u64,
    /// Factor applied to the budget on each decrease, valid in `(0, 1)`.
    /// Default `0.5`.
    pub md_factor: f64,
    /// Smoothing weight for the timing EMAs, valid in `(0, 1]`.
    pub ema_alpha: f64,
    /// Upper bound on records per poll, applied independently of the byte
    /// budget so a flood of tiny records cannot explode the count.
    pub record_cap: usize,
}

84impl Default for ByteBudgetConfig {
85    fn default() -> Self {
86        Self {
87            // 8 MiB start, 64 MiB ceiling -- generous defaults; a real
88            // deployment tunes these via config in a later phase.
89            start_bytes: 8 * 1024 * 1024,
90            max_bytes: 64 * 1024 * 1024,
91            floor_records: 1,
92            nominal_record_bytes: 1024,
93            target_rho: DEFAULT_TARGET_RHO,
94            ai_step: 256 * 1024,
95            md_factor: 0.5,
96            ema_alpha: DEFAULT_EMA_ALPHA,
97            record_cap: 2000,
98        }
99    }
100}
101
102impl ByteBudgetConfig {
103    /// The derived absolute byte floor: `floor_records * nominal_record_bytes`,
104    /// clamped to at least `1` (the budget is never `0`).
105    #[must_use]
106    fn min_bytes(&self) -> u64 {
107        self.floor_records
108            .saturating_mul(self.nominal_record_bytes)
109            .max(1)
110    }
111
112    /// Sanitise the config so the control loop cannot misbehave: clamp
113    /// `target_rho` and `ema_alpha` into their open ranges, force
114    /// `md_factor` into `(0, 1)`, and ensure `record_cap >= 1`,
115    /// `max_bytes >= min_bytes`, and `start_bytes` inside `[min, max]`.
116    fn sanitised(mut self) -> Self {
117        if !self.target_rho.is_finite() || self.target_rho <= 0.0 || self.target_rho >= 1.0 {
118            self.target_rho = DEFAULT_TARGET_RHO;
119        }
120        if !self.ema_alpha.is_finite() || self.ema_alpha <= 0.0 || self.ema_alpha > 1.0 {
121            self.ema_alpha = DEFAULT_EMA_ALPHA;
122        }
123        if !self.md_factor.is_finite() || self.md_factor <= 0.0 || self.md_factor >= 1.0 {
124            self.md_factor = 0.5;
125        }
126        self.record_cap = self.record_cap.max(1);
127        let min = self.min_bytes();
128        self.max_bytes = self.max_bytes.max(min);
129        self.start_bytes = self.start_bytes.clamp(min, self.max_bytes);
130        self
131    }
132}
133
/// Lock-free `f64` cell: the value lives inside an [`AtomicU64`] as its
/// IEEE-754 bit pattern, so the controller keeps interior mutability while
/// remaining `Sync` without a lock (the crate forbids `unsafe`, and a plain
/// `Cell` would not be `Sync`). All accesses use `Relaxed` ordering.
struct AtomicF64(AtomicU64);

impl AtomicF64 {
    /// Wrap `initial` as its raw bit pattern.
    fn new(initial: f64) -> Self {
        let bits = initial.to_bits();
        Self(AtomicU64::new(bits))
    }

    /// Read the current value back out of its bit pattern.
    fn load(&self) -> f64 {
        let bits = self.0.load(Ordering::Relaxed);
        f64::from_bits(bits)
    }

    /// Overwrite the current value.
    fn store(&self, value: f64) {
        let bits = value.to_bits();
        self.0.store(bits, Ordering::Relaxed);
    }
}

151/// AIMD byte-budget lever with a memory HARD override.
152///
153/// See the [module docs](crate::governor) for the algorithm. `observe()`
154/// is the control step; `byte_budget()` and `record_cap()` are the cheap reads
155/// the recv loop consults. All state is interior-mutable and `Sync`.
156pub struct ByteBudgetController {
157    cfg: ByteBudgetConfig,
158    pressure: Arc<UnifiedPressure>,
159    /// EMA of observed batch process time, in seconds.
160    ema_process_s: AtomicF64,
161    /// EMA of observed ingest inter-arrival interval, in seconds.
162    ema_ingest_s: AtomicF64,
163    /// Whether any timing observation has been folded in yet.
164    seeded: std::sync::atomic::AtomicBool,
165    /// Current byte budget.
166    budget: AtomicU64,
167}
168
169impl ByteBudgetController {
170    /// Build a controller from config and a shared pressure latch.
171    ///
172    /// The config is sanitised (ranges clamped, floors enforced); the
173    /// budget starts at the sanitised `start_bytes`.
174    #[must_use]
175    pub fn new(cfg: ByteBudgetConfig, pressure: Arc<UnifiedPressure>) -> Self {
176        let cfg = cfg.sanitised();
177        Self {
178            budget: AtomicU64::new(cfg.start_bytes),
179            ema_process_s: AtomicF64::new(0.0),
180            ema_ingest_s: AtomicF64::new(0.0),
181            seeded: std::sync::atomic::AtomicBool::new(false),
182            cfg,
183            pressure,
184        }
185    }
186
187    /// Fold one observation into the control loop and update the budget.
188    ///
189    /// `batch_bytes` is currently informational (the loop drives off
190    /// timing, not size); `process_time` is how long the batch took to
191    /// process and `ingest_interval` is the gap since the previous
192    /// batch's arrival.
193    ///
194    /// Steps, in order:
195    /// 1. EMA-smooth `process_time` and `ingest_interval`.
196    /// 2. Compute `rho = ema_process / ema_ingest`. A zero (or
197    ///    sub-resolution) `ingest_interval` means arrivals are
198    ///    back-to-back faster than we process -- treat rho as high
199    ///    (behind), which is the safe direction (shrink).
200    /// 3. If memory says hold (the HARD override), multiplicative-decrease
201    ///    and return -- memory never waits for the rho loop.
202    /// 4. Otherwise AIMD on rho vs `target_rho`.
203    /// 5. Clamp to `[min_bytes, max_bytes]`.
204    pub fn observe(&self, batch_bytes: u64, process_time: Duration, ingest_interval: Duration) {
205        let _ = batch_bytes; // reserved for a future size-aware refinement
206
207        let alpha = self.cfg.ema_alpha;
208        let proc_s = process_time.as_secs_f64();
209        let ingest_s = ingest_interval.as_secs_f64();
210
211        // Step 1: EMA. Seed on the first observation so the average is not
212        // dragged from a 0.0 cold start.
213        if self.seeded.swap(true, Ordering::Relaxed) {
214            let new_proc = alpha.mul_add(proc_s, (1.0 - alpha) * self.ema_process_s.load());
215            let new_ingest = alpha.mul_add(ingest_s, (1.0 - alpha) * self.ema_ingest_s.load());
216            self.ema_process_s.store(new_proc);
217            self.ema_ingest_s.store(new_ingest);
218        } else {
219            self.ema_process_s.store(proc_s);
220            self.ema_ingest_s.store(ingest_s);
221        }
222
223        // Step 3: memory HARD override takes precedence over rho entirely.
224        if self.pressure.should_hold() {
225            self.multiplicative_decrease();
226            return;
227        }
228
229        // Step 2: rho. Guard div-by-zero: a non-positive ingest EMA means
230        // arrivals outrun processing -> treat as "behind" (shrink).
231        let ema_ingest = self.ema_ingest_s.load();
232        let ema_process = self.ema_process_s.load();
233        let behind = if ema_ingest <= f64::EPSILON {
234            // Back-to-back arrivals with any processing cost -> behind.
235            // Pure-zero processing AND zero ingest -> nothing to do.
236            ema_process > 0.0
237        } else {
238            (ema_process / ema_ingest) > self.cfg.target_rho
239        };
240
241        // Step 4: AIMD.
242        if behind {
243            self.multiplicative_decrease();
244        } else {
245            self.additive_increase();
246        }
247    }
248
249    /// Additive-increase: budget += ai_step, saturating at `max_bytes`.
250    fn additive_increase(&self) {
251        let cur = self.budget.load(Ordering::Relaxed);
252        let next = cur.saturating_add(self.cfg.ai_step).min(self.cfg.max_bytes);
253        self.budget.store(next, Ordering::Relaxed);
254    }
255
256    /// Multiplicative-decrease: budget *= md_factor, clamped to the floor
257    /// and never `0`.
258    fn multiplicative_decrease(&self) {
259        let cur = self.budget.load(Ordering::Relaxed);
260        // f64 math then back to u64; budgets are well under 2^52 so this is
261        // lossless in the operating range. `floor()` then clamp.
262        #[allow(
263            clippy::cast_precision_loss,
264            clippy::cast_sign_loss,
265            clippy::cast_possible_truncation
266        )]
267        let scaled = (cur as f64 * self.cfg.md_factor).floor() as u64;
268        let next = scaled.max(self.cfg.min_bytes());
269        self.budget.store(next, Ordering::Relaxed);
270    }
271
272    /// Current byte budget. Always `>= min_bytes`, never `0`.
273    #[must_use]
274    pub fn byte_budget(&self) -> u64 {
275        self.budget.load(Ordering::Relaxed)
276    }
277
278    /// Poll-safety record cap (recv max count), independent of the byte
279    /// budget. Always `>= 1` so a tiny-record flood cannot blow the count
280    /// even when many records fit inside the byte budget.
281    #[must_use]
282    pub fn record_cap(&self) -> usize {
283        self.cfg.record_cap
284    }
285
286    /// The shared pressure governor this controller drives off. Lets a caller
287    /// (e.g. the governed driver) read the combined
288    /// [`level`](UnifiedPressure::level) for the `pressure_ratio` gauge without
289    /// holding a second `Arc`.
290    #[must_use]
291    pub fn pressure(&self) -> &Arc<UnifiedPressure> {
292        &self.pressure
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::governor::source::{Hysteresis, Pressure, PressureSource};
    use std::sync::atomic::AtomicU64 as StdAtomicU64;

    /// HARD pressure source whose reading the test can change at will, so
    /// `should_hold()` can be forced on.
    struct MockSource {
        value: StdAtomicU64,
    }

    impl MockSource {
        fn new(value: f64) -> Self {
            let value = StdAtomicU64::new(value.to_bits());
            Self { value }
        }

        fn set(&self, value: f64) {
            self.value.store(value.to_bits(), Ordering::Relaxed);
        }

        fn current(&self) -> f64 {
            f64::from_bits(self.value.load(Ordering::Relaxed))
        }
    }

    impl PressureSource for MockSource {
        fn name(&self) -> &'static str {
            "mock"
        }

        fn sample(&self) -> Pressure {
            Pressure::new(self.current())
        }

        fn is_hard(&self) -> bool {
            true
        }
    }

    /// A mock source reading zero pressure.
    fn quiet_source() -> Arc<MockSource> {
        Arc::new(MockSource::new(0.0))
    }

    /// Wire `source` into a single-source pressure governor and build a
    /// controller on top of it. The governor is returned too so tests can
    /// keep it alive alongside the controller.
    fn controller(
        cfg: ByteBudgetConfig,
        source: &Arc<MockSource>,
    ) -> (ByteBudgetController, Arc<UnifiedPressure>) {
        let band = Hysteresis::new(0.80, 0.65).expect("valid band");
        let sources = vec![Arc::clone(source) as Arc<dyn PressureSource>];
        let pressure = Arc::new(UnifiedPressure::new(sources, band));
        let ctl = ByteBudgetController::new(cfg, Arc::clone(&pressure));
        (ctl, pressure)
    }

    /// Deterministic tuning: `ema_alpha = 1` makes each EMA equal the latest
    /// sample, and the byte floor works out to `1 * 1000 = 1000`.
    fn test_cfg() -> ByteBudgetConfig {
        ByteBudgetConfig {
            start_bytes: 10_000,
            max_bytes: 100_000,
            floor_records: 1,
            nominal_record_bytes: 1000,
            target_rho: 0.7,
            ai_step: 5_000,
            md_factor: 0.5,
            ema_alpha: 1.0,
            record_cap: 2000,
        }
    }

    fn millis(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    #[test]
    fn starts_big_at_start_bytes() {
        let source = quiet_source();
        let (ctl, _pressure) = controller(test_cfg(), &source);

        assert_eq!(ctl.byte_budget(), 10_000);
        assert!(ctl.record_cap() >= 1);
        assert_eq!(ctl.record_cap(), 2000);
    }

    /// Slack (rho = 10ms / 100ms = 0.1 < 0.7): the budget climbs by
    /// `ai_step` per observation and never exceeds `max_bytes`.
    #[test]
    fn slack_grows_budget_additively_and_caps() {
        let source = quiet_source();
        let (ctl, _pressure) = controller(test_cfg(), &source);

        let mut previous = ctl.byte_budget();
        for _ in 0..50 {
            ctl.observe(500, millis(10), millis(100));
            let current = ctl.byte_budget();
            assert!(current >= previous, "budget must be monotone up under slack");
            previous = current;
        }

        // 10_000 + 5_000 per step reaches the 100_000 ceiling well within 50 steps.
        assert_eq!(ctl.byte_budget(), 100_000, "additive-increase caps at max");
    }

    /// Behind (rho = 90ms / 100ms = 0.9 > 0.7): the budget halves each step
    /// until it bottoms out at the 1000-byte floor.
    #[test]
    fn behind_shrinks_budget_multiplicatively() {
        let source = quiet_source();
        let (ctl, _pressure) = controller(test_cfg(), &source);

        let before = ctl.byte_budget();
        ctl.observe(500, millis(90), millis(100));
        let after = ctl.byte_budget();
        assert!(after < before, "behind must shrink the budget");
        assert_eq!(after, 5_000);

        for _ in 0..20 {
            ctl.observe(500, millis(90), millis(100));
        }
        assert_eq!(ctl.byte_budget(), 1_000, "shrink clamps to min_bytes");
        assert!(ctl.byte_budget() >= 1, "never zero");
    }

    /// Adversarial case: the memory HARD override must beat rho. Even while
    /// rho sits deep in slack, forcing the HARD source high must halve the
    /// budget immediately and keep driving it to the floor (never zero).
    #[test]
    fn memory_pressure_overrides_rho_and_shrinks_to_floor() {
        let source = quiet_source();
        let (ctl, _pressure) = controller(test_cfg(), &source);

        // Two slack steps: 10_000 -> 15_000 -> 20_000.
        for _ in 0..2 {
            ctl.observe(500, millis(10), millis(100));
        }
        assert_eq!(ctl.byte_budget(), 20_000);

        // Memory spikes while rho still reads slack.
        source.set(0.95);
        let before = ctl.byte_budget();
        ctl.observe(500, millis(10), millis(100));
        let after = ctl.byte_budget();
        assert!(
            after < before,
            "memory override must shrink even when rho says slack"
        );
        assert_eq!(after, 10_000, "20_000 * 0.5 under override");

        for _ in 0..20 {
            ctl.observe(500, millis(10), millis(100));
        }
        assert_eq!(ctl.byte_budget(), 1_000, "override clamps to floor");
        assert!(ctl.byte_budget() >= 1);
        assert!(ctl.record_cap() >= 1);
    }

    /// A zero ingest interval must neither panic nor divide by zero; with a
    /// non-zero processing cost it counts as "behind" (the safe direction).
    #[test]
    fn zero_ingest_interval_is_safe_and_treated_as_behind() {
        let source = quiet_source();
        let (ctl, _pressure) = controller(test_cfg(), &source);

        let before = ctl.byte_budget();
        ctl.observe(500, millis(5), Duration::ZERO);
        let after = ctl.byte_budget();
        assert!(after <= before, "zero ingest must not grow the budget");
        assert_eq!(after, 5_000, "treated as behind -> multiplicative-decrease");
        assert!(ctl.byte_budget() >= 1);

        // Zero processing AND zero ingest: with alpha = 1 the process EMA
        // becomes 0, which reads as slack, so the budget may only grow.
        let floor_check = ctl.byte_budget();
        ctl.observe(0, Duration::ZERO, Duration::ZERO);
        assert!(ctl.byte_budget() >= floor_check, "both-zero is no-pressure slack");
    }

    /// Garbage config values fall back to safe defaults, and the budget
    /// starts at the byte floor rather than zero.
    #[test]
    fn config_is_sanitised() {
        let source = quiet_source();
        let bad = ByteBudgetConfig {
            start_bytes: 0,
            max_bytes: 0,
            floor_records: 2,
            nominal_record_bytes: 500, // floor = 2 * 500 = 1000
            target_rho: 5.0,
            ai_step: 1_000,
            md_factor: 2.0,
            ema_alpha: 0.0,
            record_cap: 0,
        };
        let (ctl, _pressure) = controller(bad, &source);

        assert_eq!(ctl.byte_budget(), 1_000, "start clamped up to min_bytes");
        assert!(ctl.record_cap() >= 1);
    }
}