use std::time::{Duration, Instant};
/// Interval constant for the adaptive batch sizer (value: 10).
///
/// NOTE(review): nothing in this file reads this constant. Presumably it is the
/// number of batches (or rows) between pressure checks in the caller — confirm
/// the unit against the call site before relying on it.
pub const ADAPTIVE_SAMPLE_INTERVAL: usize = 10;
/// Floor that [`next_adaptive_batch_size`] will not shrink a batch below.
pub const ADAPTIVE_MIN_BATCH: usize = 500;

/// Computes the next batch size from the current one and a pressure signal.
///
/// * `under_pressure == true`: shrink to three quarters of `current` (rounded
///   down), but not below [`ADAPTIVE_MIN_BATCH`]. `base` is not consulted on
///   this path. If `current` is already below the floor it is held unchanged,
///   so pressure can never *increase* the batch size.
/// * `under_pressure == false`: grow by a quarter of `current` (by at least
///   one, so very small sizes still make progress) and cap the result at
///   `base`.
///
/// The arithmetic never forms `3 * current` or `5 * current`, so it cannot
/// overflow for any `usize` input.
pub fn next_adaptive_batch_size(current: usize, base: usize, under_pressure: bool) -> usize {
    if under_pressure {
        // floor(3 * current / 4), split into quotient and remainder parts so the
        // intermediate product stays within `usize`.
        let shrunk = current / 4 * 3 + current % 4 * 3 / 4;
        // Apply the floor, but never lift the batch above where it already is.
        shrunk.max(ADAPTIVE_MIN_BATCH.min(current))
    } else {
        // floor(5 * current / 4) == current + current / 4. The `max(1)` keeps
        // sizes below 4 from stalling short of `base`.
        let step = (current / 4).max(1);
        current.saturating_add(step).min(base)
    }
}
/// Default governor sampling interval, in milliseconds.
///
/// `sample_interval_ms_from_env` falls back to this value when
/// `RIVET_GOVERNOR_INTERVAL_MS` is unset, does not parse as a `u64`, or is zero.
pub const GOVERNOR_SAMPLE_INTERVAL_MS: u64 = 1500;
/// Steps a parallelism level by one in response to a pressure signal.
///
/// The bounds are normalised first: the lower bound is `min` raised to at
/// least 1, and the upper bound is `max` raised to at least that lower bound.
/// `current` is clamped into that range before stepping.
///
/// * `under_pressure == true`: one less, but not below the lower bound.
/// * `under_pressure == false`: one more, but not above the upper bound.
///
/// The increment saturates, so an upper bound of `usize::MAX` is safe.
pub fn next_parallel(current: usize, min: usize, max: usize, under_pressure: bool) -> usize {
    let lo = min.max(1);
    let hi = max.max(lo);
    let cur = current.clamp(lo, hi);
    if under_pressure {
        cur.saturating_sub(1).max(lo)
    } else {
        // `cur + 1` would overflow when `cur == hi == usize::MAX`.
        cur.saturating_add(1).min(hi)
    }
}
/// Pure decision state for the parallelism governor: remembers the previous
/// pressure reading and the current parallelism level, bounded by a floor and
/// a ceiling.
#[derive(Debug)]
pub struct GovernorState {
    /// Last reading passed to [`GovernorState::observe`], if any.
    prev: Option<u64>,
    /// Parallelism level currently in effect.
    current: usize,
    /// Lowest level `observe` will step down to (at least 1).
    floor: usize,
    /// Highest level `observe` will step up to (at least `floor`).
    ceiling: usize,
}

impl GovernorState {
    /// Creates a state with normalised bounds: the ceiling is raised to at
    /// least 1, the floor is clamped into `1..=ceiling`, and `start` is clamped
    /// into `floor..=ceiling`. No previous reading is recorded yet.
    pub fn new(start: usize, floor: usize, ceiling: usize) -> Self {
        let upper = ceiling.max(1);
        let lower = floor.clamp(1, upper);
        let initial = start.clamp(lower, upper);
        Self {
            prev: None,
            current: initial,
            floor: lower,
            ceiling: upper,
        }
    }

    /// Current parallelism level (test-only accessor).
    #[cfg(test)]
    pub fn current(&self) -> usize {
        self.current
    }

    /// Feeds one pressure reading and returns `Some((from, to))` when the
    /// parallelism level changed, `None` otherwise.
    ///
    /// A `None` sample is ignored entirely: the stored reading and the level
    /// are left untouched. A reading strictly greater than the stored one
    /// counts as pressure; the first reading (nothing stored yet) and any
    /// equal or lower reading do not. The level then moves one step via
    /// `next_parallel`.
    pub fn observe(&mut self, sample: Option<u64>) -> Option<(usize, usize)> {
        let reading = sample?;
        // Store the new reading and compare against the one it displaced.
        let rising = matches!(self.prev.replace(reading), Some(before) if reading > before);
        let from = self.current;
        let to = next_parallel(from, self.floor, self.ceiling, rising);
        self.current = to;
        (to != from).then_some((from, to))
    }
}
/// Poll period, in milliseconds, that `Governor::run` sleeps between checks of
/// its stop predicate. `Governor::new` lowers it to the sample interval when
/// that interval is shorter than this value.
pub const GOVERNOR_POLL_MS: u64 = 200;
/// A provider of pressure readings that `Governor::run` samples.
///
/// Implementors must be `Send`.
pub trait PressureSource: Send {
/// Takes one pressure reading.
///
/// `None` means no reading is available; the governor in this file then
/// leaves its state unchanged. Readings are only compared with the previous
/// one (a strictly higher value counts as pressure), so this trait does not
/// fix a unit for them.
fn sample_pressure(&mut self) -> Option<u64>;
}
/// Lets a boxed `crate::source::Source` trait object be used as a
/// [`PressureSource`] by forwarding to the `Source` trait's own
/// `sample_pressure`.
impl PressureSource for Box<dyn crate::source::Source> {
fn sample_pressure(&mut self) -> Option<u64> {
// Fully qualified so the call targets `Source::sample_pressure` on the boxed
// value rather than this trait's method of the same name.
crate::source::Source::sample_pressure(self.as_mut())
}
}
/// Timed driver around [`GovernorState`]: periodically samples a
/// [`PressureSource`] and reports every change in the parallelism level.
#[derive(Debug)]
pub struct Governor {
    /// Decision state fed by each sample.
    state: GovernorState,
    /// Minimum time between two pressure samples taken by [`Governor::run`].
    sample_interval: Duration,
    /// How long [`Governor::run`] sleeps between checks of the stop predicate.
    poll_interval: Duration,
}

impl Governor {
    /// Builds a governor starting at `start`, bounded by `floor` and `ceiling`
    /// (normalised by [`GovernorState::new`]).
    ///
    /// The sample interval comes from `sample_interval_ms_from_env`; the poll
    /// interval is `GOVERNOR_POLL_MS`, lowered to the sample interval when that
    /// is shorter.
    pub fn new(start: usize, floor: usize, ceiling: usize) -> Self {
        let sample_ms = sample_interval_ms_from_env();
        let poll_ms = GOVERNOR_POLL_MS.min(sample_ms);
        Self {
            state: GovernorState::new(start, floor, ceiling),
            sample_interval: Duration::from_millis(sample_ms),
            poll_interval: Duration::from_millis(poll_ms),
        }
    }

    /// Test-only constructor that takes both intervals explicitly instead of
    /// reading the environment.
    #[cfg(test)]
    pub fn with_intervals(
        start: usize,
        floor: usize,
        ceiling: usize,
        sample_interval: Duration,
        poll_interval: Duration,
    ) -> Self {
        Self {
            state: GovernorState::new(start, floor, ceiling),
            sample_interval,
            poll_interval,
        }
    }

    /// Feeds a single sample to the underlying state and returns
    /// `Some((from, to))` if the parallelism level changed.
    pub fn tick(&mut self, sample: Option<u64>) -> Option<(usize, usize)> {
        self.state.observe(sample)
    }

    /// Runs the sampling loop on the calling thread until `stop()` returns
    /// `true`.
    ///
    /// Each iteration sleeps for the poll interval, re-checks `stop`, and, once
    /// at least the sample interval has elapsed since the previous sample,
    /// takes a reading from `source` and passes it to [`Governor::tick`].
    /// `on_decision(from, to)` is invoked for every change in level.
    ///
    /// `stop` is checked before the first sleep, so a predicate that is already
    /// `true` makes this return without sleeping or sampling.
    pub fn run<S, Stop, Decide>(&mut self, source: &mut S, stop: Stop, mut on_decision: Decide)
    where
        S: PressureSource + ?Sized,
        Stop: Fn() -> bool,
        Decide: FnMut(usize, usize),
    {
        let mut last_sample = Instant::now();
        while !stop() {
            // Sleep in short poll quanta so a stop request is noticed promptly
            // even when the sample interval is long.
            std::thread::sleep(self.poll_interval);
            if stop() {
                break;
            }
            if last_sample.elapsed() < self.sample_interval {
                continue;
            }
            // Reset the timer before sampling so time spent in the source
            // counts toward the next interval.
            last_sample = Instant::now();
            if let Some((from, to)) = self.tick(source.sample_pressure()) {
                on_decision(from, to);
            }
        }
    }
}
fn sample_interval_ms_from_env() -> u64 {
std::env::var("RIVET_GOVERNOR_INTERVAL_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.filter(|&n| n > 0)
.unwrap_or(GOVERNOR_SAMPLE_INTERVAL_MS)
}
// Unit tests for the adaptive batch sizer, the parallelism step function, the
// governor state machine, and the timed `Governor::run` loop.
#[cfg(test)]
mod tests {
use super::*;
// --- next_adaptive_batch_size ---
// Under pressure the batch drops to three quarters of its current size.
#[test]
fn adaptive_shrinks_by_25_percent_under_pressure() {
assert_eq!(next_adaptive_batch_size(10_000, 10_000, true), 7_500);
assert_eq!(next_adaptive_batch_size(8_000, 10_000, true), 6_000);
}
// Without pressure the batch grows by a quarter of its current size.
#[test]
fn adaptive_grows_by_25_percent_when_idle() {
assert_eq!(next_adaptive_batch_size(4_000, 10_000, false), 5_000);
}
// Growth is capped at `base`, and a batch already at `base` stays there.
#[test]
fn adaptive_recovery_caps_at_base_ceiling() {
assert_eq!(next_adaptive_batch_size(9_000, 10_000, false), 10_000);
assert_eq!(next_adaptive_batch_size(10_000, 10_000, false), 10_000);
}
// Shrinking stops at ADAPTIVE_MIN_BATCH: 600 * 3 / 4 = 450 is lifted to the floor.
#[test]
fn adaptive_shrink_respects_min_floor() {
assert_eq!(
next_adaptive_batch_size(600, 10_000, true),
ADAPTIVE_MIN_BATCH
);
assert_eq!(
next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 10_000, true),
ADAPTIVE_MIN_BATCH
);
}
// With `base` (100) below the floor, a batch at the floor still stays at the
// floor under pressure: `base` plays no part on the shrink path.
#[test]
fn adaptive_pressure_path_ignores_base_uses_only_floor() {
assert_eq!(
next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 100, true),
ADAPTIVE_MIN_BATCH
);
}
// Fifty pressured steps reach the floor; fifty idle steps then climb back to `base`.
#[test]
fn adaptive_steady_state_oscillation_stays_bounded() {
let base = 5_000;
let mut s = base;
for _ in 0..50 {
s = next_adaptive_batch_size(s, base, true);
}
assert_eq!(
s, ADAPTIVE_MIN_BATCH,
"sustained pressure must converge to floor"
);
for _ in 0..50 {
s = next_adaptive_batch_size(s, base, false);
}
assert_eq!(s, base, "sustained recovery must converge to base ceiling");
}
// --- next_parallel ---
// Pressure removes exactly one unit of parallelism.
#[test]
fn next_parallel_sheds_one_under_pressure() {
assert_eq!(next_parallel(8, 1, 8, true), 7);
assert_eq!(next_parallel(4, 1, 8, true), 3);
}
// No pressure adds exactly one unit.
#[test]
fn next_parallel_recovers_one_when_idle() {
assert_eq!(next_parallel(4, 1, 8, false), 5);
}
// A value already at `min` is not reduced further.
#[test]
fn next_parallel_shrink_respects_min_floor() {
assert_eq!(next_parallel(2, 2, 8, true), 2, "already at min stays");
assert_eq!(next_parallel(1, 1, 8, true), 1, "never below 1");
}
// A value already at `max` is not increased further.
#[test]
fn next_parallel_grow_respects_max_ceiling() {
assert_eq!(next_parallel(8, 1, 8, false), 8, "already at max stays");
}
// A `min` of 0 is treated as 1.
#[test]
fn next_parallel_min_floored_at_one() {
assert_eq!(next_parallel(1, 0, 8, true), 1);
}
// Repeated pressure settles at `min`; repeated idle steps settle at `max`.
#[test]
fn next_parallel_steady_state_converges_to_bounds() {
let (min, max) = (2, 6);
let mut p = max;
for _ in 0..20 {
p = next_parallel(p, min, max, true);
}
assert_eq!(p, min, "sustained pressure converges to min");
for _ in 0..20 {
p = next_parallel(p, min, max, false);
}
assert_eq!(p, max, "sustained recovery converges to max");
}
// --- GovernorState ---
// `new` clamps the start value into the bounds; floor and ceiling of 0 both become 1.
#[test]
fn governor_state_clamps_start_into_bounds() {
assert_eq!(GovernorState::new(99, 2, 6).current(), 6);
assert_eq!(GovernorState::new(0, 2, 6).current(), 2);
assert_eq!(GovernorState::new(5, 0, 0).current(), 1);
}
// The first sample has no predecessor, so it is not treated as pressure;
// starting at the ceiling, that leaves the level unchanged.
#[test]
fn governor_state_first_sample_only_sets_baseline_then_recovers() {
let mut g = GovernorState::new(6, 2, 6);
assert_eq!(g.observe(Some(100)), None, "at ceiling, idle ⇒ no change");
assert_eq!(g.current(), 6);
}
// Each reading higher than the previous one sheds one unit.
#[test]
fn governor_state_backs_off_under_rising_pressure() {
let mut g = GovernorState::new(6, 2, 6);
assert_eq!(g.observe(Some(100)), None); assert_eq!(g.observe(Some(200)), Some((6, 5)), "rising ⇒ shed one");
assert_eq!(g.observe(Some(300)), Some((5, 4)));
assert_eq!(g.current(), 4);
}
// A first reading, and a reading equal to the previous one, both add one unit.
#[test]
fn governor_state_recovers_when_pressure_flat() {
let mut g = GovernorState::new(3, 2, 6);
assert_eq!(
g.observe(Some(100)),
Some((3, 4)),
"flat/idle ⇒ recover one"
);
assert_eq!(g.observe(Some(100)), Some((4, 5)));
}
// A `None` sample changes nothing and keeps the stored reading (200), so the
// following 300 is still seen as a rise.
#[test]
fn governor_state_none_sample_holds_flat_and_keeps_baseline() {
let mut g = GovernorState::new(4, 2, 6);
assert_eq!(g.observe(Some(200)), Some((4, 5))); assert_eq!(g.observe(None), None, "no sample ⇒ no change");
assert_eq!(
g.observe(Some(300)),
Some((5, 4)),
"rising vs preserved baseline"
);
}
// --- Governor ---
// Test double: replays a fixed queue of samples and counts, through a shared
// atomic, how many times it has been sampled.
struct VecSource {
samples: std::collections::VecDeque<Option<u64>>,
sample_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}
impl VecSource {
fn new(
samples: impl IntoIterator<Item = Option<u64>>,
sample_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
) -> Self {
Self {
samples: samples.into_iter().collect(),
sample_count,
}
}
}
impl PressureSource for VecSource {
fn sample_pressure(&mut self) -> Option<u64> {
// Count every call, including those made after the queue has run dry.
self.sample_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// An exhausted queue yields `None`, the same as a queued `None` entry.
self.samples.pop_front().unwrap_or(None)
}
}
// `Governor::tick` must return exactly what `GovernorState::observe` returns
// for the same sample sequence and the same starting bounds.
#[test]
fn governor_tick_mirrors_governor_state_observe() {
let samples = [Some(100u64), Some(200), Some(150), None, Some(400)];
let mut g =
Governor::with_intervals(6, 2, 6, Duration::from_millis(1), Duration::from_millis(1));
let mut s = GovernorState::new(6, 2, 6);
for sample in samples {
assert_eq!(g.tick(sample), s.observe(sample));
}
}
// The stop predicate fires once the source has been sampled five times. The
// first sample only sets the baseline (already at the ceiling, so no change);
// each of the four rising samples that follow sheds one unit, ending at the floor.
#[test]
fn governor_run_emits_decisions_for_every_rising_sample_until_stop() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
let sample_count = Arc::new(AtomicUsize::new(0));
let mut source = VecSource::new(
[
Some(100),
Some(200), Some(300), Some(400), Some(500), ],
Arc::clone(&sample_count),
);
let mut gov =
Governor::with_intervals(6, 2, 6, Duration::from_millis(1), Duration::from_millis(1));
let stop_count = Arc::clone(&sample_count);
let stop = move || stop_count.load(Ordering::Relaxed) >= 5;
let mut decisions: Vec<(usize, usize)> = Vec::new();
gov.run(&mut source, stop, |from, to| {
decisions.push((from, to));
});
assert_eq!(decisions, vec![(6, 5), (5, 4), (4, 3), (3, 2)]);
}
// With a stop predicate that is already true, `run` must return before
// sleeping; the 40 ms bound is below the 50 ms sample interval configured here.
#[test]
fn governor_run_stops_promptly_within_one_poll_quantum() {
let sample_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let mut source = VecSource::new([Some(100)], sample_count);
let mut gov =
Governor::with_intervals(6, 2, 6, Duration::from_millis(50), Duration::from_millis(5));
let start = std::time::Instant::now();
gov.run(&mut source, || true, |_, _| {});
assert!(
start.elapsed() < Duration::from_millis(40),
"run() must exit on stop without sleeping a full sample interval"
);
}
}