use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use super::source::UnifiedPressure;
/// Smoothing weight used by `Default` and substituted when a configured
/// `ema_alpha` fails validation.
const DEFAULT_EMA_ALPHA: f64 = 0.3;
/// Process/ingest ratio threshold used by `Default` and substituted when a
/// configured `target_rho` fails validation.
const DEFAULT_TARGET_RHO: f64 = 0.7;
/// Shrink factor used by `Default` and substituted when a configured
/// `md_factor` fails validation.
const DEFAULT_MD_FACTOR: f64 = 0.5;

/// Tuning parameters for [`ByteBudgetController`].
///
/// The fields are plain data and may hold nonsensical values; the controller
/// runs them through `sanitised` when it is constructed.
#[derive(Debug, Clone, Copy)]
pub struct ByteBudgetConfig {
    /// Budget the controller starts with; clamped between the byte floor and
    /// `max_bytes` when sanitised.
    pub start_bytes: u64,
    /// Ceiling the budget is capped at when it grows; raised to the byte
    /// floor when sanitised if it is smaller.
    pub max_bytes: u64,
    /// Number of nominal records the budget must always be able to hold.
    pub floor_records: u64,
    /// Size assumed for one record; `floor_records * nominal_record_bytes`
    /// (at least 1) is the byte floor.
    pub nominal_record_bytes: u64,
    /// Threshold for the smoothed process-time / ingest-interval ratio above
    /// which the controller shrinks the budget. Valid strictly between 0 and 1.
    pub target_rho: f64,
    /// Amount added to the budget on each growth step.
    pub ai_step: u64,
    /// Factor the budget is multiplied by on each shrink step. Valid strictly
    /// between 0 and 1.
    pub md_factor: f64,
    /// Weight of the newest sample in the moving averages. Valid in `(0, 1]`.
    pub ema_alpha: f64,
    /// Record-count cap reported by the controller; raised to at least 1
    /// when sanitised.
    pub record_cap: usize,
}

impl Default for ByteBudgetConfig {
    /// Returns the stock configuration: 8 MiB start, 64 MiB ceiling, a floor
    /// of one 1 KiB record, 256 KiB growth steps and a record cap of 2000.
    fn default() -> Self {
        const KIB: u64 = 1024;
        const MIB: u64 = 1024 * KIB;

        Self {
            start_bytes: 8 * MIB,
            max_bytes: 64 * MIB,
            floor_records: 1,
            nominal_record_bytes: KIB,
            target_rho: DEFAULT_TARGET_RHO,
            ai_step: 256 * KIB,
            md_factor: DEFAULT_MD_FACTOR,
            ema_alpha: DEFAULT_EMA_ALPHA,
            record_cap: 2000,
        }
    }
}

impl ByteBudgetConfig {
    /// Smallest budget the controller may ever use.
    ///
    /// This is `floor_records * nominal_record_bytes`, saturating on overflow
    /// and never less than one byte.
    #[must_use]
    fn min_bytes(&self) -> u64 {
        let floor = self.floor_records.saturating_mul(self.nominal_record_bytes);
        std::cmp::max(1, floor)
    }

    /// Returns a copy of the configuration with every out-of-range value
    /// repaired.
    ///
    /// * `target_rho` and `md_factor` outside `(0, 1)`, and `ema_alpha`
    ///   outside `(0, 1]`, are replaced by their defaults. NaN and infinite
    ///   values fail these range tests and are replaced too.
    /// * `record_cap` is raised to at least 1.
    /// * `max_bytes` is raised to the byte floor, then `start_bytes` is
    ///   clamped into `[floor, max_bytes]`.
    #[must_use]
    fn sanitised(self) -> Self {
        // Positive range tests: any comparison with NaN is false, so NaN
        // falls through to the default just like an out-of-range number.
        let target_rho = if self.target_rho > 0.0 && self.target_rho < 1.0 {
            self.target_rho
        } else {
            DEFAULT_TARGET_RHO
        };
        let ema_alpha = if self.ema_alpha > 0.0 && self.ema_alpha <= 1.0 {
            self.ema_alpha
        } else {
            DEFAULT_EMA_ALPHA
        };
        let md_factor = if self.md_factor > 0.0 && self.md_factor < 1.0 {
            self.md_factor
        } else {
            DEFAULT_MD_FACTOR
        };

        // The ceiling is lifted to the floor first so the clamp below always
        // sees `floor <= max_bytes` (clamp panics otherwise).
        let floor = self.min_bytes();
        let max_bytes = self.max_bytes.max(floor);

        Self {
            target_rho,
            ema_alpha,
            md_factor,
            record_cap: self.record_cap.max(1),
            max_bytes,
            start_bytes: self.start_bytes.clamp(floor, max_bytes),
            ..self
        }
    }
}
/// An `f64` that can be read and written through a shared reference.
///
/// The value is kept as its IEEE-754 bit pattern inside an [`AtomicU64`], so
/// every float (including NaN payloads and signed zero) round-trips exactly.
/// All accesses use `Ordering::Relaxed`.
struct AtomicF64 {
    bits: AtomicU64,
}

impl AtomicF64 {
    /// Creates a cell holding `value`.
    fn new(value: f64) -> Self {
        let bits = AtomicU64::new(f64::to_bits(value));
        Self { bits }
    }

    /// Returns the most recently stored value (relaxed load).
    fn load(&self) -> f64 {
        let raw = self.bits.load(Ordering::Relaxed);
        f64::from_bits(raw)
    }

    /// Replaces the stored value with `value` (relaxed store).
    fn store(&self, value: f64) {
        let raw = value.to_bits();
        self.bits.store(raw, Ordering::Relaxed);
    }
}
/// Additive-increase / multiplicative-decrease (AIMD) controller for a byte
/// budget.
///
/// Each [`observe`](Self::observe) call folds one batch's timings into two
/// exponential moving averages and then moves the budget by exactly one step:
/// down when the shared pressure signal asks to hold or when processing is
/// falling behind ingest, up otherwise. The budget never leaves the range
/// between the configured byte floor and `max_bytes`.
///
/// All mutable state is atomic, so every method takes `&self`. Budget steps
/// are applied as a single atomic read-modify-write, so overlapping
/// `observe` calls cannot lose a step. The two moving averages are updated
/// with separate relaxed loads and stores, so overlapping calls may blend
/// their samples.
pub struct ByteBudgetController {
    /// Sanitised configuration; immutable after construction.
    cfg: ByteBudgetConfig,
    /// Shared pressure signal consulted on every observation.
    pressure: Arc<UnifiedPressure>,
    /// Moving average of batch processing time, in seconds.
    ema_process_s: AtomicF64,
    /// Moving average of the interval between ingests, in seconds.
    ema_ingest_s: AtomicF64,
    /// `false` until the first observation has seeded both averages.
    seeded: std::sync::atomic::AtomicBool,
    /// Current budget in bytes.
    budget: AtomicU64,
}

impl ByteBudgetController {
    /// Builds a controller from `cfg` and a shared pressure signal.
    ///
    /// `cfg` is sanitised first, so out-of-range values are replaced rather
    /// than rejected. The budget starts at the sanitised `start_bytes`.
    #[must_use]
    pub fn new(cfg: ByteBudgetConfig, pressure: Arc<UnifiedPressure>) -> Self {
        let cfg = cfg.sanitised();
        Self {
            budget: AtomicU64::new(cfg.start_bytes),
            ema_process_s: AtomicF64::new(0.0),
            ema_ingest_s: AtomicF64::new(0.0),
            seeded: std::sync::atomic::AtomicBool::new(false),
            cfg,
            pressure,
        }
    }

    /// Records one processed batch and adjusts the budget by one AIMD step.
    ///
    /// * `batch_bytes` - size of the batch; not used by the current policy.
    /// * `process_time` - how long the batch took to process.
    /// * `ingest_interval` - time between this ingest and the previous one.
    ///
    /// The budget shrinks when `pressure.should_hold()` is true, regardless
    /// of timings, or when the smoothed `process / ingest` ratio exceeds
    /// `target_rho`. Otherwise it grows by `ai_step`.
    pub fn observe(&self, batch_bytes: u64, process_time: Duration, ingest_interval: Duration) {
        let _ = batch_bytes;
        self.record_sample(process_time.as_secs_f64(), ingest_interval.as_secs_f64());

        // The pressure check comes first and short-circuits: a hold request
        // overrides whatever the timing ratio says.
        if self.pressure.should_hold() || self.is_behind() {
            self.multiplicative_decrease();
        } else {
            self.additive_increase();
        }
    }

    /// Folds one timing sample (in seconds) into the moving averages.
    ///
    /// The very first sample seeds both averages directly, so they do not
    /// start biased towards their initial zero.
    fn record_sample(&self, proc_s: f64, ingest_s: f64) {
        if self.seeded.swap(true, Ordering::Relaxed) {
            let alpha = self.cfg.ema_alpha;
            let new_proc = alpha.mul_add(proc_s, (1.0 - alpha) * self.ema_process_s.load());
            let new_ingest = alpha.mul_add(ingest_s, (1.0 - alpha) * self.ema_ingest_s.load());
            self.ema_process_s.store(new_proc);
            self.ema_ingest_s.store(new_ingest);
        } else {
            self.ema_process_s.store(proc_s);
            self.ema_ingest_s.store(ingest_s);
        }
    }

    /// Reports whether processing is falling behind ingest.
    ///
    /// With a (near-)zero ingest interval the ratio is undefined, so any
    /// non-zero processing time counts as behind and a zero one as slack.
    fn is_behind(&self) -> bool {
        let ema_ingest = self.ema_ingest_s.load();
        let ema_process = self.ema_process_s.load();
        if ema_ingest <= f64::EPSILON {
            ema_process > 0.0
        } else {
            (ema_process / ema_ingest) > self.cfg.target_rho
        }
    }

    /// Grows the budget by `ai_step`, saturating and capped at `max_bytes`.
    fn additive_increase(&self) {
        let step = self.cfg.ai_step;
        let ceiling = self.cfg.max_bytes;
        // One atomic read-modify-write, so a concurrent step is never
        // overwritten. The closure always returns `Some`, so the result is
        // always `Ok` and can be ignored.
        let _ = self
            .budget
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_add(step).min(ceiling))
            });
    }

    /// Shrinks the budget by `md_factor`, never below the byte floor.
    fn multiplicative_decrease(&self) {
        let factor = self.cfg.md_factor;
        let floor = self.cfg.min_bytes();
        // Same atomic read-modify-write as `additive_increase`.
        let _ = self
            .budget
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                // `factor` is sanitised into (0, 1), so the product is
                // non-negative and no larger than `cur`; the float-to-int
                // cast therefore cannot wrap or overflow.
                #[allow(
                    clippy::cast_precision_loss,
                    clippy::cast_sign_loss,
                    clippy::cast_possible_truncation
                )]
                let scaled = (cur as f64 * factor).floor() as u64;
                Some(scaled.max(floor))
            });
    }

    /// Current budget in bytes.
    #[must_use]
    pub fn byte_budget(&self) -> u64 {
        self.budget.load(Ordering::Relaxed)
    }

    /// Configured record-count cap (at least 1).
    #[must_use]
    pub fn record_cap(&self) -> usize {
        self.cfg.record_cap
    }

    /// Shared pressure signal this controller consults.
    #[must_use]
    pub fn pressure(&self) -> &Arc<UnifiedPressure> {
        &self.pressure
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::governor::source::{Hysteresis, Pressure, PressureSource};
    use std::sync::atomic::AtomicU64 as StdAtomicU64;

    /// Pressure source whose reading is set directly by the test.
    struct MockSource {
        /// Bit pattern of the current `f64` reading.
        value: StdAtomicU64,
    }

    impl MockSource {
        fn new(value: f64) -> Self {
            let value = StdAtomicU64::new(value.to_bits());
            Self { value }
        }

        /// Changes the reading returned by subsequent `sample` calls.
        fn set(&self, value: f64) {
            let bits = value.to_bits();
            self.value.store(bits, Ordering::Relaxed);
        }
    }

    impl PressureSource for MockSource {
        fn name(&self) -> &'static str {
            "mock"
        }

        fn sample(&self) -> Pressure {
            let bits = self.value.load(Ordering::Relaxed);
            Pressure::new(f64::from_bits(bits))
        }

        fn is_hard(&self) -> bool {
            true
        }
    }

    /// Builds a controller whose only pressure source is `src`, using a
    /// hysteresis band constructed from 0.80 and 0.65.
    fn controller(
        cfg: ByteBudgetConfig,
        src: &Arc<MockSource>,
    ) -> (ByteBudgetController, Arc<UnifiedPressure>) {
        let band = Hysteresis::new(0.80, 0.65).expect("valid band");
        let sources = vec![Arc::clone(src) as Arc<dyn PressureSource>];
        let pressure = Arc::new(UnifiedPressure::new(sources, band));
        let ctl = ByteBudgetController::new(cfg, Arc::clone(&pressure));
        (ctl, pressure)
    }

    /// Small round-number config. `ema_alpha` is 1.0, so each observation
    /// fully replaces the moving averages and every step is predictable.
    fn test_cfg() -> ByteBudgetConfig {
        ByteBudgetConfig {
            start_bytes: 10_000,
            max_bytes: 100_000,
            floor_records: 1,
            nominal_record_bytes: 1000,
            target_rho: 0.7,
            ai_step: 5_000,
            md_factor: 0.5,
            ema_alpha: 1.0,
            record_cap: 2000,
        }
    }

    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    /// Feeds `times` identical 500-byte observations into `ctl`.
    fn observe_n(ctl: &ByteBudgetController, times: usize, process: Duration, ingest: Duration) {
        for _ in 0..times {
            ctl.observe(500, process, ingest);
        }
    }

    #[test]
    fn starts_big_at_start_bytes() {
        let source = Arc::new(MockSource::new(0.0));
        let (ctl, _pressure) = controller(test_cfg(), &source);

        assert_eq!(ctl.byte_budget(), 10_000);
        assert!(ctl.record_cap() >= 1);
        assert_eq!(ctl.record_cap(), 2000);
    }

    #[test]
    fn slack_grows_budget_additively_and_caps() {
        let source = Arc::new(MockSource::new(0.0));
        let (ctl, _pressure) = controller(test_cfg(), &source);

        // 10 ms of work per 100 ms of ingest is well under the 0.7 target.
        let mut previous = ctl.byte_budget();
        for _ in 0..50 {
            ctl.observe(500, ms(10), ms(100));
            let current = ctl.byte_budget();
            assert!(current >= previous, "budget must be monotone up under slack");
            previous = current;
        }

        assert_eq!(ctl.byte_budget(), 100_000, "additive-increase caps at max");
    }

    #[test]
    fn behind_shrinks_budget_multiplicatively() {
        let source = Arc::new(MockSource::new(0.0));
        let (ctl, _pressure) = controller(test_cfg(), &source);

        // 90 ms of work per 100 ms of ingest exceeds the 0.7 target.
        let initial = ctl.byte_budget();
        ctl.observe(500, ms(90), ms(100));
        let shrunk = ctl.byte_budget();
        assert!(shrunk < initial, "behind must shrink the budget");
        assert_eq!(shrunk, 5_000);

        observe_n(&ctl, 20, ms(90), ms(100));
        assert_eq!(ctl.byte_budget(), 1_000, "shrink clamps to min_bytes");
        assert!(ctl.byte_budget() >= 1, "never zero");
    }

    #[test]
    fn memory_pressure_overrides_rho_and_shrinks_to_floor() {
        let source = Arc::new(MockSource::new(0.0));
        let (ctl, _pressure) = controller(test_cfg(), &source);

        // Two slack observations: 10_000 -> 15_000 -> 20_000.
        observe_n(&ctl, 2, ms(10), ms(100));
        assert_eq!(ctl.byte_budget(), 20_000);

        // Raise the mock reading; the timings below still indicate slack.
        source.set(0.95);
        let before = ctl.byte_budget();
        ctl.observe(500, ms(10), ms(100));
        let after = ctl.byte_budget();
        assert!(
            after < before,
            "memory override must shrink even when rho says slack"
        );
        assert_eq!(after, 10_000, "20_000 * 0.5 under override");

        observe_n(&ctl, 20, ms(10), ms(100));
        assert_eq!(ctl.byte_budget(), 1_000, "override clamps to floor");
        assert!(ctl.byte_budget() >= 1);
        assert!(ctl.record_cap() >= 1);
    }

    #[test]
    fn zero_ingest_interval_is_safe_and_treated_as_behind() {
        let source = Arc::new(MockSource::new(0.0));
        let (ctl, _pressure) = controller(test_cfg(), &source);

        let before = ctl.byte_budget();
        ctl.observe(500, ms(5), Duration::ZERO);
        let after = ctl.byte_budget();
        assert!(after <= before, "zero ingest must not grow the budget");
        assert_eq!(after, 5_000, "treated as behind -> multiplicative-decrease");
        assert!(ctl.byte_budget() >= 1);

        // With both timings at zero there is no work at all, so no shrink.
        let baseline = ctl.byte_budget();
        ctl.observe(0, Duration::ZERO, Duration::ZERO);
        assert!(ctl.byte_budget() >= baseline, "both-zero is no-pressure slack");
    }

    #[test]
    fn config_is_sanitised() {
        let source = Arc::new(MockSource::new(0.0));
        let invalid = ByteBudgetConfig {
            start_bytes: 0,
            max_bytes: 0,
            floor_records: 2,
            nominal_record_bytes: 500,
            target_rho: 5.0,
            ai_step: 1_000,
            md_factor: 2.0,
            ema_alpha: 0.0,
            record_cap: 0,
        };
        let (ctl, _pressure) = controller(invalid, &source);

        assert_eq!(ctl.byte_budget(), 1_000, "start clamped up to min_bytes");
        assert!(ctl.record_cap() >= 1);
    }
}