use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use std::time::Duration;
use super::capabilities::{GovernorBackoff, ProviderLimits};
/// Environment variable that switches the governor on; read by [`enabled`].
const RATE_GOVERNOR_ENABLED_ENV: &str = "HARN_LLM_RATE_GOVERNOR";
/// Concurrency ceiling used when the catalog row does not set `max_concurrency`.
const DEFAULT_MAX_CONCURRENCY: u32 = 8;
/// Concurrency floor used when the catalog row does not set `min_concurrency`.
const DEFAULT_MIN_CONCURRENCY: u32 = 1;
/// Default base (first-cycle, pre-jitter) OPEN-circuit backoff window, in milliseconds.
const DEFAULT_BACKOFF_BASE_MS: u64 = 1_000;
/// Default cap on the OPEN-circuit backoff window, in milliseconds.
const DEFAULT_BACKOFF_MAX_MS: u64 = 60_000;
/// Default factor by which the backoff window grows per consecutive OPEN cycle.
const DEFAULT_BACKOFF_MULTIPLIER: f64 = 2.0;
/// Number of consecutive throttle outcomes that trips the circuit to OPEN.
const THROTTLE_STREAK_TO_OPEN: u32 = 3;
/// Consecutive successes required before the adaptive concurrency limit grows by one.
const SUCCESSES_PER_INCREASE: u32 = 4;
/// Reports whether the rate governor is switched on through the
/// `HARN_LLM_RATE_GOVERNOR` environment variable.
///
/// The value is trimmed and compared case-insensitively against `1`, `true`,
/// `yes` and `on`. Anything else, an unset variable, or a non-UTF-8 value all
/// mean "disabled". The variable is re-read on every call.
pub fn enabled() -> bool {
    let Ok(raw) = std::env::var(RATE_GOVERNOR_ENABLED_ENV) else {
        return false;
    };
    let normalized = raw.trim().to_ascii_lowercase();
    ["1", "true", "yes", "on"].contains(&normalized.as_str())
}
/// Kind of provider back-pressure observed on a finished request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThrottleSignal {
    /// HTTP 429, or a rate-limit message in the response body.
    RateLimit429,
    /// HTTP 503/529, or an "overloaded" message in the response body.
    Overloaded,
    /// An empty completion returned while the provider was already throttled.
    EmptyUnderLoad,
}

impl ThrottleSignal {
    /// Stable snake_case identifier used in telemetry records.
    pub fn label(self) -> &'static str {
        match self {
            Self::RateLimit429 => "rate_limit_429",
            Self::Overloaded => "overloaded",
            Self::EmptyUnderLoad => "empty_under_load",
        }
    }

    /// Classifies a response as a throttle signal, if it is one.
    ///
    /// Checks run in priority order: rate limiting, then overload, then an
    /// empty completion. The empty-completion check only fires when
    /// `provider_already_throttled` is set, so an empty reply from a healthy
    /// provider is not mistaken for back-pressure.
    ///
    /// `body_lower` is matched verbatim against lowercase markers; presumably
    /// callers lowercase it beforehand (per its name) — verify at call sites.
    pub fn classify(
        http_status: Option<u16>,
        body_lower: &str,
        empty_completion: bool,
        provider_already_throttled: bool,
    ) -> Option<ThrottleSignal> {
        const RATE_LIMIT_MARKERS: [&str; 3] =
            ["rate_limit_error", "temporarily limiting requests", " 429 "];

        let rate_limited = http_status == Some(429)
            || body_lower.starts_with("429 ")
            || RATE_LIMIT_MARKERS
                .iter()
                .any(|marker| body_lower.contains(*marker));
        if rate_limited {
            return Some(Self::RateLimit429);
        }

        // `contains("overloaded")` also covers bodies carrying `overloaded_error`.
        let overloaded =
            matches!(http_status, Some(503 | 529)) || body_lower.contains("overloaded");
        if overloaded {
            return Some(Self::Overloaded);
        }

        (empty_completion && provider_already_throttled).then_some(Self::EmptyUnderLoad)
    }
}
/// Circuit-breaker state for one provider route.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Requests flow normally.
    Closed,
    /// Requests are rejected until the governor clock (milliseconds) reaches `until_ms`.
    Open { until_ms: u128 },
    /// The OPEN window has elapsed and a single probe request may be tried.
    HalfOpen,
}

impl CircuitState {
    /// Snake_case name used in snapshots and telemetry.
    fn label(self) -> &'static str {
        match self {
            Self::Closed => "closed",
            Self::HalfOpen => "half_open",
            Self::Open { .. } => "open",
        }
    }
}
/// Effective per-provider limits: the catalog row overlaid on module defaults.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ResolvedLimits {
    /// Ceiling of the adaptive concurrency limit.
    pub max_concurrency: u32,
    /// Floor the adaptive concurrency limit is never halved below.
    pub min_concurrency: u32,
    /// Optional requests-per-minute budget.
    pub rpm: Option<u32>,
    /// Optional tokens-per-minute budget.
    pub tpm: Option<u64>,
    /// When `true`, concurrency follows AIMD; otherwise it stays at `max_concurrency`.
    pub adaptive: bool,
    /// Base OPEN-circuit backoff window, in milliseconds.
    pub backoff_base_ms: u64,
    /// Cap on the OPEN-circuit backoff window, in milliseconds.
    pub backoff_max_ms: u64,
    /// Growth factor of the backoff window per OPEN cycle.
    pub backoff_multiplier: f64,
    /// Whether backoff windows are randomized.
    pub backoff_jitter: bool,
}

impl Default for ResolvedLimits {
    /// Adaptive concurrency between the default floor and ceiling, no RPM/TPM
    /// budgets, and jittered exponential backoff.
    fn default() -> Self {
        ResolvedLimits {
            adaptive: true,
            max_concurrency: DEFAULT_MAX_CONCURRENCY,
            min_concurrency: DEFAULT_MIN_CONCURRENCY,
            rpm: None,
            tpm: None,
            backoff_jitter: true,
            backoff_base_ms: DEFAULT_BACKOFF_BASE_MS,
            backoff_max_ms: DEFAULT_BACKOFF_MAX_MS,
            backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER,
        }
    }
}

impl ResolvedLimits {
    /// Builds limits from an optional catalog row, falling back to defaults.
    ///
    /// Concurrency bounds are floored at 1, and the floor is clamped to the
    /// ceiling. `rpm`/`tpm` are taken verbatim from the row, even when `None`.
    /// Backoff overrides are normalized by `apply_backoff`.
    pub fn from_catalog(row: Option<&ProviderLimits>) -> Self {
        let defaults = ResolvedLimits::default();
        let Some(row) = row else {
            return defaults;
        };

        let max_concurrency = row
            .max_concurrency
            .map_or(defaults.max_concurrency, |v| v.max(1));
        let min_concurrency = row
            .min_concurrency
            .map_or(defaults.min_concurrency, |v| v.max(1))
            .min(max_concurrency);

        let mut resolved = ResolvedLimits {
            max_concurrency,
            min_concurrency,
            rpm: row.rpm,
            tpm: row.tpm,
            adaptive: row.adaptive.unwrap_or(defaults.adaptive),
            ..defaults
        };
        if let Some(backoff) = row.backoff.as_ref() {
            apply_backoff(&mut resolved, backoff);
        }
        resolved
    }
}
/// Overlays catalog backoff overrides onto `resolved` and normalizes them.
///
/// * `base_ms` is floored at 1 ms.
/// * `max_ms` is copied when present. Afterwards `backoff_max_ms` is always
///   raised to at least `backoff_base_ms`. This holds even when only `base_ms`
///   is overridden, so a large base is not silently truncated by the default
///   cap. Previously the clamp ran only when `max_ms` was also set.
/// * `multiplier` must be finite and `>= 1.0`. Anything else, including NaN,
///   collapses to `1.0`, which gives constant (non-growing) backoff.
/// * `jitter` is copied through unchanged.
fn apply_backoff(resolved: &mut ResolvedLimits, b: &GovernorBackoff) {
    if let Some(v) = b.base_ms {
        resolved.backoff_base_ms = v.max(1);
    }
    if let Some(v) = b.max_ms {
        resolved.backoff_max_ms = v;
    }
    // Keep the invariant base <= max regardless of which fields were overridden.
    resolved.backoff_max_ms = resolved.backoff_max_ms.max(resolved.backoff_base_ms);
    if let Some(v) = b.multiplier {
        resolved.backoff_multiplier = if v.is_finite() && v >= 1.0 { v } else { 1.0 };
    }
    if let Some(v) = b.jitter {
        resolved.backoff_jitter = v;
    }
}
/// Sliding-window budget: at most `max` units may be charged within any
/// trailing `window_ms` span (one minute). Each admitted charge is kept with
/// its timestamp, so it is released exactly when it ages out.
#[derive(Debug)]
struct TokenBucket {
    /// Units allowed per window; always >= 1.
    max: u64,
    /// Window length in milliseconds.
    window_ms: u128,
    /// `(timestamp_ms, units)` charges in arrival order, oldest at the front.
    entries: std::collections::VecDeque<(u128, u64)>,
}

impl TokenBucket {
    /// Creates an empty one-minute bucket. A zero budget is raised to 1 so
    /// the bucket can always admit something.
    fn new(max: u64) -> Self {
        Self {
            max: max.max(1),
            window_ms: 60_000,
            entries: std::collections::VecDeque::new(),
        }
    }

    /// Drops charges that are `window_ms` or older relative to `now_ms`.
    ///
    /// Only inspects the front, so it assumes charges were recorded with a
    /// non-decreasing clock — NOTE(review): confirm the clock is monotonic.
    fn prune(&mut self, now_ms: u128) {
        while self
            .entries
            .front()
            .is_some_and(|(t, _)| now_ms.saturating_sub(*t) >= self.window_ms)
        {
            self.entries.pop_front();
        }
    }

    /// Units currently charged inside the window. Saturates instead of
    /// overflow-panicking in debug builds.
    fn used(&self) -> u64 {
        self.entries
            .iter()
            .fold(0u64, |acc, &(_, charged)| acc.saturating_add(charged))
    }

    /// Returns `None` when `units` fit the remaining budget at `now_ms`.
    /// Otherwise returns how long until enough earlier charges expire for
    /// them to fit.
    ///
    /// The wait targets the first charge whose expiry frees enough budget,
    /// not just the oldest one. A caller that sleeps the returned duration is
    /// therefore admitted on its next attempt, instead of repeatedly waking
    /// too early. Requests larger than the whole budget are clamped to `max`,
    /// so they wait for an empty window rather than forever. This does not
    /// record the charge; call [`TokenBucket::record`] once the request is
    /// admitted.
    fn wait_for(&mut self, now_ms: u128, units: u64) -> Option<Duration> {
        self.prune(now_ms);
        let charge = units.min(self.max);
        let demand = self.used().saturating_add(charge);
        if demand <= self.max {
            return None;
        }
        // Units that must age out before `charge` fits. Since `charge <= max`,
        // this never exceeds what is stored, so the scan always finds an entry.
        // The `back()` fallback is purely defensive.
        let excess = demand - self.max;
        let mut freed: u64 = 0;
        let releasing_at = self
            .entries
            .iter()
            .find_map(|&(stamped_at, charged)| {
                freed = freed.saturating_add(charged);
                (freed >= excess).then_some(stamped_at)
            })
            .or_else(|| self.entries.back().map(|&(stamped_at, _)| stamped_at))?;
        let remaining = self
            .window_ms
            .saturating_sub(now_ms.saturating_sub(releasing_at));
        Some(Duration::from_millis(
            u64::try_from(remaining).unwrap_or(u64::MAX),
        ))
    }

    /// Records an admitted charge at `now_ms`, clamped to `max` like
    /// `wait_for`. Zero-unit charges are not stored.
    fn record(&mut self, now_ms: u128, units: u64) {
        let charge = units.min(self.max);
        if charge > 0 {
            self.entries.push_back((now_ms, charge));
        }
    }
}
/// Adaptive admission controller for a single provider/org route.
///
/// Combines three mechanisms:
/// * an AIMD concurrency limit. It grows by one every
///   [`SUCCESSES_PER_INCREASE`] successes and halves on each throttle, within
///   the resolved min/max bounds;
/// * optional RPM/TPM sliding-window buckets;
/// * a circuit breaker. It opens after [`THROTTLE_STREAK_TO_OPEN`]
///   consecutive throttles and backs off exponentially between HALF_OPEN
///   probes.
#[derive(Debug)]
pub struct ProviderGovernor {
    /// Resolved configuration this governor was built with.
    limits: ResolvedLimits,
    /// Current adaptive concurrency ceiling.
    concurrency_limit: u32,
    /// Requests admitted by `decide` whose outcome has not been released yet.
    in_flight: u32,
    /// Successes since the last AIMD increase or throttle.
    consecutive_successes: u32,
    /// Throttles (while CLOSED) since the last success or circuit opening.
    consecutive_throttles: u32,
    /// OPEN cycles since the last successful probe; drives backoff growth.
    open_cycles: u32,
    /// Circuit-breaker state.
    circuit: CircuitState,
    /// Requests-per-minute budget, if configured.
    rpm_bucket: Option<TokenBucket>,
    /// Tokens-per-minute budget, if configured.
    tpm_bucket: Option<TokenBucket>,
    /// Most recent throttle signal, surfaced in snapshots.
    last_signal: Option<ThrottleSignal>,
    /// xorshift64* state for backoff jitter; never zero.
    jitter_state: u64,
}

impl ProviderGovernor {
    /// Builds a governor that starts CLOSED at full `max_concurrency`. It gets
    /// RPM/TPM buckets only for the budgets `limits` actually sets.
    fn new(limits: ResolvedLimits) -> Self {
        let rpm_bucket = limits.rpm.map(|r| TokenBucket::new(u64::from(r)));
        let tpm_bucket = limits.tpm.map(TokenBucket::new);
        Self {
            concurrency_limit: limits.max_concurrency,
            in_flight: 0,
            consecutive_successes: 0,
            consecutive_throttles: 0,
            open_cycles: 0,
            circuit: CircuitState::Closed,
            rpm_bucket,
            tpm_bucket,
            last_signal: None,
            // A fixed seed would make every process draw the identical jitter
            // sequence, re-synchronizing the very probes jitter should spread.
            jitter_state: Self::jitter_seed(),
            limits,
        }
    }

    /// Returns a non-zero xorshift seed that differs between processes and
    /// between governors.
    ///
    /// `RandomState` is randomly keyed, so hashing a constant through it
    /// yields a random value without adding dependencies.
    fn jitter_seed() -> u64 {
        use std::collections::hash_map::RandomState;
        use std::hash::{BuildHasher, Hasher};

        let mut hasher = RandomState::new().build_hasher();
        hasher.write_u64(0x9E37_79B9_7F4A_7C15);
        // xorshift maps 0 to 0 forever; forcing the low bit rules that out.
        hasher.finish() | 1
    }

    /// True while the circuit is OPEN or HALF_OPEN. Does not advance an
    /// expired OPEN window to HALF_OPEN; `tick_circuit` does that.
    fn is_throttled(&self) -> bool {
        !matches!(self.circuit, CircuitState::Closed)
    }

    /// Moves an OPEN circuit whose window has elapsed to HALF_OPEN and
    /// returns the (possibly updated) state.
    fn tick_circuit(&mut self, now_ms: u128) -> CircuitState {
        if let CircuitState::Open { until_ms } = self.circuit {
            if now_ms >= until_ms {
                self.circuit = CircuitState::HalfOpen;
            }
        }
        self.circuit
    }

    /// Decides whether a request estimated at `est_tokens` tokens may start
    /// at `now_ms`.
    ///
    /// * OPEN: rejected with the time left in the OPEN window.
    /// * HALF_OPEN: one probe is admitted once nothing else is in flight. Rate
    ///   buckets are neither checked nor charged for it.
    /// * CLOSED: the RPM then TPM budgets are checked, then the concurrency
    ///   limit. The buckets are charged only when the request is admitted.
    ///
    /// `Proceed` increments `in_flight`. Each admitted request is expected to
    /// be released through one of the `release_*` methods.
    fn decide(&mut self, now_ms: u128, est_tokens: u64) -> GateOutcome {
        match self.tick_circuit(now_ms) {
            CircuitState::Open { until_ms } => {
                let remaining = until_ms.saturating_sub(now_ms);
                return GateOutcome::CircuitOpen(Duration::from_millis(
                    remaining.min(u128::from(u64::MAX)) as u64,
                ));
            }
            CircuitState::HalfOpen => {
                // Wait for stragglers so the probe is the only request in flight.
                if self.in_flight > 0 {
                    return GateOutcome::Wait(Duration::from_millis(50));
                }
            }
            CircuitState::Closed => {}
        }
        if matches!(self.circuit, CircuitState::Closed) {
            if let Some(b) = self.rpm_bucket.as_mut() {
                if let Some(d) = b.wait_for(now_ms, 1) {
                    return GateOutcome::Wait(d);
                }
            }
            if let Some(b) = self.tpm_bucket.as_mut() {
                if let Some(d) = b.wait_for(now_ms, est_tokens) {
                    return GateOutcome::Wait(d);
                }
            }
        }
        let effective_limit = if matches!(self.circuit, CircuitState::HalfOpen) {
            1
        } else {
            self.concurrency_limit
        };
        if self.in_flight >= effective_limit {
            return GateOutcome::Wait(Duration::from_millis(25));
        }
        self.in_flight += 1;
        if matches!(self.circuit, CircuitState::Closed) {
            if let Some(b) = self.rpm_bucket.as_mut() {
                b.record(now_ms, 1);
            }
            if let Some(b) = self.tpm_bucket.as_mut() {
                b.record(now_ms, est_tokens);
            }
        }
        GateOutcome::Proceed
    }

    /// Releases a request that the provider served.
    ///
    /// A success in HALF_OPEN closes the circuit and resets backoff growth.
    /// Any success clears the throttle streak. When adaptive, every
    /// [`SUCCESSES_PER_INCREASE`] successes raise the concurrency limit by one,
    /// up to `max_concurrency`.
    fn release_success(&mut self, _now_ms: u128) {
        self.in_flight = self.in_flight.saturating_sub(1);
        if matches!(self.circuit, CircuitState::HalfOpen) {
            self.circuit = CircuitState::Closed;
            self.open_cycles = 0;
        }
        self.consecutive_throttles = 0;
        if self.limits.adaptive {
            self.consecutive_successes = self.consecutive_successes.saturating_add(1);
            if self.consecutive_successes >= SUCCESSES_PER_INCREASE {
                self.consecutive_successes = 0;
                if self.concurrency_limit < self.limits.max_concurrency {
                    self.concurrency_limit += 1;
                }
            }
        }
    }

    /// Releases a request that the provider throttled with `signal`.
    ///
    /// Always records the signal and resets the success streak. When
    /// adaptive, it also halves the concurrency limit, flooring it at
    /// `min_concurrency`. Circuit handling depends on the current state:
    /// * HALF_OPEN: the probe failed, so re-open with the next, longer backoff.
    /// * OPEN: nothing is admitted while OPEN, so this request started before
    ///   the circuit opened. It must not escalate the backoff again, otherwise
    ///   one burst of concurrent 429s would ratchet `open_cycles` to the cap.
    ///   A longer `retry_after_ms` still extends the current window.
    /// * CLOSED: extend the throttle streak, and open the circuit once the
    ///   streak reaches [`THROTTLE_STREAK_TO_OPEN`].
    fn release_throttle(
        &mut self,
        now_ms: u128,
        signal: ThrottleSignal,
        retry_after_ms: Option<u64>,
    ) {
        self.in_flight = self.in_flight.saturating_sub(1);
        self.last_signal = Some(signal);
        self.consecutive_successes = 0;
        if self.limits.adaptive {
            self.concurrency_limit =
                (self.concurrency_limit / 2).max(self.limits.min_concurrency);
        }
        let state = self.circuit;
        match state {
            CircuitState::HalfOpen => self.open_circuit(now_ms, retry_after_ms),
            CircuitState::Open { until_ms } => {
                let retry_floor =
                    retry_after_ms.map(|ms| now_ms.saturating_add(u128::from(ms)));
                if let Some(extended) = retry_floor.filter(|&floor| floor > until_ms) {
                    self.circuit = CircuitState::Open { until_ms: extended };
                }
            }
            CircuitState::Closed => {
                self.consecutive_throttles = self.consecutive_throttles.saturating_add(1);
                if self.consecutive_throttles >= THROTTLE_STREAK_TO_OPEN {
                    self.open_circuit(now_ms, retry_after_ms);
                }
            }
        }
    }

    /// Releases a request whose outcome was neither a success nor a throttle.
    ///
    /// Circuit and AIMD state are left untouched. If this was the HALF_OPEN
    /// probe, the next `decide` admits a fresh probe once nothing else is in
    /// flight.
    fn release_neutral(&mut self, _now_ms: u128) {
        self.in_flight = self.in_flight.saturating_sub(1);
    }

    /// Opens the circuit for one backoff window starting at `now_ms`, and
    /// advances the cycle counter that drives exponential growth.
    fn open_circuit(&mut self, now_ms: u128, retry_after_ms: Option<u64>) {
        self.open_cycles = self.open_cycles.saturating_add(1);
        let window = self.backoff_window_ms(retry_after_ms);
        self.circuit = CircuitState::Open {
            until_ms: now_ms.saturating_add(u128::from(window)),
        };
        self.consecutive_throttles = 0;
    }

    /// Length of the next OPEN window, in milliseconds.
    ///
    /// The window is `base * multiplier^(open_cycles - 1)`, capped at
    /// `backoff_max_ms`. With jitter enabled, it is drawn pseudo-randomly
    /// from `[window / 2, window]`. A `retry_after_ms` hint acts as a floor
    /// and is not subject to the cap.
    fn backoff_window_ms(&mut self, retry_after_ms: Option<u64>) -> u64 {
        // Saturate rather than wrap: a huge cycle count must never become a
        // negative exponent, which would collapse the window toward zero.
        let exp = i32::try_from(self.open_cycles.saturating_sub(1)).unwrap_or(i32::MAX);
        let mut window =
            self.limits.backoff_base_ms as f64 * self.limits.backoff_multiplier.powi(exp);
        if !window.is_finite() {
            window = self.limits.backoff_max_ms as f64;
        }
        // Float-to-int `as` saturates, so an oversized product becomes u64::MAX before the cap.
        let mut window = (window as u64).min(self.limits.backoff_max_ms);
        if self.limits.backoff_jitter && window > 0 {
            let half = window / 2;
            let span = window - half;
            let draw = if span > 0 {
                self.next_jitter() % (span + 1)
            } else {
                0
            };
            window = half + draw;
        }
        window.max(retry_after_ms.unwrap_or(0))
    }

    /// Advances the xorshift64* generator and returns its next output.
    fn next_jitter(&mut self) -> u64 {
        let mut x = self.jitter_state;
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.jitter_state = x;
        x.wrapping_mul(0x2545_F491_4F6C_DD1D)
    }

    /// Point-in-time view of this governor for telemetry. It reports the
    /// stored circuit state without ticking it.
    pub fn snapshot(&self) -> GovernorSnapshot {
        GovernorSnapshot {
            concurrency_limit: self.concurrency_limit,
            max_concurrency: self.limits.max_concurrency,
            min_concurrency: self.limits.min_concurrency,
            in_flight: self.in_flight,
            circuit_state: self.circuit.label(),
            consecutive_throttles: self.consecutive_throttles,
            open_cycles: self.open_cycles,
            last_signal: self.last_signal.map(|s| s.label()),
            rpm: self.limits.rpm,
            tpm: self.limits.tpm,
            adaptive: self.limits.adaptive,
        }
    }
}
/// Result of asking the governor whether a request may start.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GateOutcome {
    /// Admitted. When the governor is enabled, the request now counts as in flight.
    Proceed,
    /// Not admitted yet; try again after roughly this long.
    Wait(Duration),
    /// The circuit is OPEN; this is the time left in the OPEN window.
    CircuitOpen(Duration),
}

/// Point-in-time, telemetry-friendly view of one governor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GovernorSnapshot {
    /// Current adaptive concurrency ceiling.
    pub concurrency_limit: u32,
    /// Configured upper bound for `concurrency_limit`.
    pub max_concurrency: u32,
    /// Configured lower bound for `concurrency_limit`.
    pub min_concurrency: u32,
    /// Requests admitted but not yet released.
    pub in_flight: u32,
    /// `"closed"`, `"open"` or `"half_open"`.
    pub circuit_state: &'static str,
    /// Current throttle streak.
    pub consecutive_throttles: u32,
    /// OPEN cycles since the last successful probe.
    pub open_cycles: u32,
    /// Label of the most recent throttle signal, if any.
    pub last_signal: Option<&'static str>,
    /// Configured requests-per-minute budget.
    pub rpm: Option<u32>,
    /// Configured tokens-per-minute budget.
    pub tpm: Option<u64>,
    /// Whether AIMD is enabled.
    pub adaptive: bool,
}

impl GovernorSnapshot {
    /// Serializes every field, under its own name, into a flat JSON object.
    pub fn to_json(&self) -> serde_json::Value {
        // Exhaustive destructuring: adding a field without serializing it fails to compile.
        let GovernorSnapshot {
            concurrency_limit,
            max_concurrency,
            min_concurrency,
            in_flight,
            circuit_state,
            consecutive_throttles,
            open_cycles,
            last_signal,
            rpm,
            tpm,
            adaptive,
        } = self;
        serde_json::json!({
            "concurrency_limit": concurrency_limit,
            "max_concurrency": max_concurrency,
            "min_concurrency": min_concurrency,
            "in_flight": in_flight,
            "circuit_state": circuit_state,
            "consecutive_throttles": consecutive_throttles,
            "open_cycles": open_cycles,
            "last_signal": last_signal,
            "rpm": rpm,
            "tpm": tpm,
            "adaptive": adaptive,
        })
    }
}
/// Derives a stable, non-reversible-at-a-glance identifier for an API key, so
/// governors can be partitioned per key without logging the key itself.
///
/// The trimmed key is hashed with 64-bit FNV-1a and rendered as `k` followed
/// by 16 lowercase hex digits. A blank key maps to `"default"`. FNV is not a
/// cryptographic hash; the result is a grouping fingerprint, not a secret.
pub fn org_key_id(api_key: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let key = api_key.trim();
    if key.is_empty() {
        return String::from("default");
    }
    let digest = key
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    format!("k{digest:016x}")
}
/// Registry key for a provider/org pair: the provider name is trimmed and
/// ASCII-lowercased, and the org key is used verbatim, joined by `::`.
fn route_key(provider: &str, org_key: &str) -> String {
    let provider = provider.trim().to_ascii_lowercase();
    format!("{provider}::{org_key}")
}
/// Process-wide table of governors keyed by `route_key(provider, org_key)`.
#[derive(Default)]
struct Registry {
    governors: HashMap<String, ProviderGovernor>,
}

/// Lazily-initialized global registry shared by every entry point in this
/// module. Callers recover a poisoned lock instead of propagating the poison.
fn registry() -> &'static Mutex<Registry> {
    static REGISTRY: OnceLock<Mutex<Registry>> = OnceLock::new();
    REGISTRY.get_or_init(Mutex::default)
}
/// Runs `f` on the governor for `provider`/`org_key` while holding the
/// registry lock. On first use, the governor is created from the provider's
/// catalog limits.
///
/// `f` runs under the global lock, so it should be short. It must not
/// re-enter functions that take the same lock: `std::sync::Mutex` is not
/// reentrant.
fn with_governor<R>(
    provider: &str,
    org_key: &str,
    f: impl FnOnce(&mut ProviderGovernor) -> R,
) -> R {
    let mut guard = registry()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let governor = guard
        .governors
        .entry(route_key(provider, org_key))
        .or_insert_with(|| {
            let catalog_row = super::capabilities::provider_limits_for(provider);
            ProviderGovernor::new(ResolvedLimits::from_catalog(catalog_row.as_ref()))
        });
    f(governor)
}
/// Asks the governor for `provider`/`org_key` whether a request estimated at
/// `est_tokens` tokens may start now. When the governor is disabled, it
/// always returns [`GateOutcome::Proceed`] and touches no state.
pub fn gate(provider: &str, org_key: &str, est_tokens: u64) -> GateOutcome {
    if enabled() {
        let now_ms = crate::clock_mock::instant_now().as_millis();
        with_governor(provider, org_key, |governor| {
            governor.decide(now_ms, est_tokens)
        })
    } else {
        GateOutcome::Proceed
    }
}

/// True when the governor is enabled and this route's circuit is not CLOSED.
/// Creates the route's governor if it does not exist yet.
pub fn provider_already_throttled(provider: &str, org_key: &str) -> bool {
    enabled() && with_governor(provider, org_key, |governor| governor.is_throttled())
}
/// Outcome of an admitted request, reported back through [`record_outcome`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GovernorOutcome {
    /// The provider served the request.
    Served,
    /// The provider pushed back; `retry_after_ms` is its retry hint, if any.
    Throttled {
        signal: ThrottleSignal,
        retry_after_ms: Option<u64>,
    },
    /// Neither success nor throttle; only frees the in-flight slot.
    Neutral,
}

/// Reports the outcome of a request previously admitted by [`gate`].
/// Does nothing while the governor is disabled.
pub fn record_outcome(provider: &str, org_key: &str, outcome: GovernorOutcome) {
    if !enabled() {
        return;
    }
    let now_ms = crate::clock_mock::instant_now().as_millis();
    with_governor(provider, org_key, |governor| match outcome {
        GovernorOutcome::Throttled {
            signal,
            retry_after_ms,
        } => governor.release_throttle(now_ms, signal, retry_after_ms),
        GovernorOutcome::Neutral => governor.release_neutral(now_ms),
        GovernorOutcome::Served => governor.release_success(now_ms),
    });
}
/// Snapshot of an existing governor, or `None` if the route has never been
/// gated. Works whether or not the governor is currently enabled.
pub fn snapshot(provider: &str, org_key: &str) -> Option<GovernorSnapshot> {
    let guard = registry()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    guard
        .governors
        .get(&route_key(provider, org_key))
        .map(|governor| governor.snapshot())
}

/// True when the governor is enabled and this route's circuit is OPEN at the
/// current time. An elapsed OPEN window is advanced to HALF_OPEN first. Never
/// creates a governor.
pub fn circuit_is_open(provider: &str, org_key: &str) -> bool {
    if !enabled() {
        return false;
    }
    let now_ms = crate::clock_mock::instant_now().as_millis();
    let mut guard = registry()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    guard
        .governors
        .get_mut(&route_key(provider, org_key))
        .is_some_and(|governor| {
            matches!(governor.tick_circuit(now_ms), CircuitState::Open { .. })
        })
}
pub fn all_snapshots() -> Vec<(String, GovernorSnapshot)> {
let reg = registry()
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let mut out: Vec<(String, GovernorSnapshot)> = reg
.governors
.iter()
.map(|(k, g)| (k.clone(), g.snapshot()))
.collect();
out.sort_by(|a, b| a.0.cmp(&b.0));
out
}
/// Limits a new governor for `provider` would be built with, resolved from the
/// capability catalog without touching the registry.
pub fn resolved_limits_for(provider: &str) -> ResolvedLimits {
    let catalog_row = super::capabilities::provider_limits_for(provider);
    ResolvedLimits::from_catalog(catalog_row.as_ref())
}

/// Providers that have a limits row in the capability catalog, as reported by
/// `capabilities::provider_limit_providers`.
pub fn configured_limit_providers() -> Vec<String> {
    super::capabilities::provider_limit_providers()
}
/// Structured `provider_throttle` telemetry record for one throttle event.
pub fn build_throttle_record(
    provider: &str,
    org_key: &str,
    signal: ThrottleSignal,
    http_status: Option<u16>,
    retry_after_ms: Option<u64>,
    timestamp: String,
) -> serde_json::Value {
    let signal_type = signal.label();
    serde_json::json!({
        "type": "provider_throttle",
        "timestamp": timestamp,
        "provider": provider,
        "org_key_id": org_key,
        "signal_type": signal_type,
        "http_status": http_status,
        "retry_after_ms": retry_after_ms,
    })
}

/// Structured `governor_state` telemetry record: the snapshot's fields plus
/// `type`, `timestamp`, `provider` and `org_key_id`.
pub fn build_state_record(
    provider: &str,
    org_key: &str,
    snapshot: &GovernorSnapshot,
    timestamp: String,
) -> serde_json::Value {
    let mut record = snapshot.to_json();
    if let serde_json::Value::Object(fields) = &mut record {
        let envelope = [
            ("type", serde_json::json!("governor_state")),
            ("timestamp", serde_json::json!(timestamp)),
            ("provider", serde_json::json!(provider)),
            ("org_key_id", serde_json::json!(org_key)),
        ];
        for (key, value) in envelope {
            fields.insert(key.to_owned(), value);
        }
    }
    record
}
/// Drops every registered governor so subsequent calls start from fresh state.
pub fn reset_for_tests() {
    registry()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .governors
        .clear();
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock_mock::{install_override, MockClock};

    /// Serializes tests that touch the process-global env flag and registry.
    static TEST_ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Holds `TEST_ENV_LOCK`, turns the governor flag on and clears the
    /// registry. On drop it restores the previous flag value and clears the
    /// registry again.
    struct GovernorEnabledGuard {
        _lock: std::sync::MutexGuard<'static, ()>,
        prev: Option<String>,
    }

    impl GovernorEnabledGuard {
        fn on() -> Self {
            let lock = TEST_ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
            let prev = std::env::var(RATE_GOVERNOR_ENABLED_ENV).ok();
            std::env::set_var(RATE_GOVERNOR_ENABLED_ENV, "1");
            reset_for_tests();
            Self { _lock: lock, prev }
        }
    }

    impl Drop for GovernorEnabledGuard {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(v) => std::env::set_var(RATE_GOVERNOR_ENABLED_ENV, v),
                None => std::env::remove_var(RATE_GOVERNOR_ENABLED_ENV),
            }
            reset_for_tests();
        }
    }

    /// Default limits with the given concurrency ceiling and floor.
    fn limits(max: u32, min: u32) -> ResolvedLimits {
        ResolvedLimits {
            max_concurrency: max,
            min_concurrency: min,
            ..Default::default()
        }
    }

    #[test]
    fn classify_429_from_status() {
        assert_eq!(
            ThrottleSignal::classify(Some(429), "", false, false),
            Some(ThrottleSignal::RateLimit429)
        );
    }

    #[test]
    fn classify_429_from_anthropic_body() {
        assert_eq!(
            ThrottleSignal::classify(None, "type: rate_limit_error", false, false),
            Some(ThrottleSignal::RateLimit429)
        );
        assert_eq!(
            ThrottleSignal::classify(None, "server temporarily limiting requests", false, false),
            Some(ThrottleSignal::RateLimit429)
        );
    }

    #[test]
    fn classify_overloaded_from_status_and_body() {
        assert_eq!(
            ThrottleSignal::classify(Some(529), "", false, false),
            Some(ThrottleSignal::Overloaded)
        );
        assert_eq!(
            ThrottleSignal::classify(Some(503), "", false, false),
            Some(ThrottleSignal::Overloaded)
        );
        assert_eq!(
            ThrottleSignal::classify(None, "overloaded_error", false, false),
            Some(ThrottleSignal::Overloaded)
        );
    }

    #[test]
    fn empty_completion_is_throttle_only_under_load() {
        assert_eq!(
            ThrottleSignal::classify(None, "", true, false),
            None,
            "empty on healthy provider must not be a throttle signal"
        );
        assert_eq!(
            ThrottleSignal::classify(None, "", true, true),
            Some(ThrottleSignal::EmptyUnderLoad)
        );
    }

    #[test]
    fn clean_success_has_no_signal() {
        assert_eq!(
            ThrottleSignal::classify(Some(200), "ok", false, false),
            None
        );
    }

    #[test]
    fn default_limits_are_conservative() {
        let l = ResolvedLimits::default();
        assert_eq!(l.max_concurrency, DEFAULT_MAX_CONCURRENCY);
        assert_eq!(l.min_concurrency, DEFAULT_MIN_CONCURRENCY);
        assert!(l.adaptive);
    }

    #[test]
    fn catalog_overlay_clamps_min_below_max() {
        let row = ProviderLimits {
            max_concurrency: Some(4),
            min_concurrency: Some(10),
            ..Default::default()
        };
        let l = ResolvedLimits::from_catalog(Some(&row));
        assert_eq!(l.max_concurrency, 4);
        assert_eq!(l.min_concurrency, 4, "min clamped to max");
    }

    #[test]
    fn aimd_additive_increase_toward_max() {
        let mut g = ProviderGovernor::new(limits(4, 1));
        g.concurrency_limit = 1;
        for _ in 0..SUCCESSES_PER_INCREASE {
            g.in_flight = 1;
            g.release_success(0);
        }
        assert_eq!(g.concurrency_limit, 2);
        for _ in 0..(SUCCESSES_PER_INCREASE * 20) {
            g.in_flight = 1;
            g.release_success(0);
        }
        assert_eq!(g.concurrency_limit, 4, "capped at max_concurrency");
    }

    #[test]
    fn aimd_multiplicative_decrease_halves_to_floor() {
        let mut g = ProviderGovernor::new(limits(16, 2));
        assert_eq!(g.concurrency_limit, 16);
        g.in_flight = 1;
        g.release_throttle(0, ThrottleSignal::RateLimit429, None);
        assert_eq!(g.concurrency_limit, 8);
        g.in_flight = 1;
        g.release_throttle(0, ThrottleSignal::RateLimit429, None);
        assert_eq!(g.concurrency_limit, 4);
        g.in_flight = 1;
        g.release_throttle(0, ThrottleSignal::RateLimit429, None);
        assert_eq!(g.concurrency_limit, 2, "floored at min");
        g.in_flight = 1;
        g.release_throttle(0, ThrottleSignal::RateLimit429, None);
        assert_eq!(g.concurrency_limit, 2, "never below min");
    }

    #[test]
    fn non_adaptive_pins_limit_at_max() {
        let mut l = limits(8, 1);
        l.adaptive = false;
        let mut g = ProviderGovernor::new(l);
        g.in_flight = 1;
        g.release_throttle(0, ThrottleSignal::RateLimit429, None);
        assert_eq!(g.concurrency_limit, 8, "fixed semaphore when non-adaptive");
    }

    #[test]
    fn circuit_opens_after_throttle_streak() {
        let mut g = ProviderGovernor::new(limits(8, 1));
        assert_eq!(g.circuit, CircuitState::Closed);
        for _ in 0..THROTTLE_STREAK_TO_OPEN {
            g.in_flight = 1;
            g.release_throttle(1_000, ThrottleSignal::RateLimit429, None);
        }
        assert!(matches!(g.circuit, CircuitState::Open { .. }));
    }

    #[test]
    fn circuit_honors_retry_after_floor() {
        let mut g = ProviderGovernor::new(limits(8, 1));
        for _ in 0..THROTTLE_STREAK_TO_OPEN {
            g.in_flight = 1;
            g.release_throttle(1_000, ThrottleSignal::RateLimit429, Some(30_000));
        }
        match g.circuit {
            CircuitState::Open { until_ms } => {
                assert!(
                    until_ms >= 1_000 + 30_000,
                    "OPEN window must not wake before Retry-After"
                );
            }
            other => panic!("expected Open, got {other:?}"),
        }
    }

    #[test]
    fn half_open_probe_success_closes_circuit() {
        let mut g = ProviderGovernor::new(limits(8, 1));
        for _ in 0..THROTTLE_STREAK_TO_OPEN {
            g.in_flight = 1;
            g.release_throttle(1_000, ThrottleSignal::RateLimit429, Some(5_000));
        }
        assert!(matches!(g.decide(2_000, 0), GateOutcome::CircuitOpen(_)));
        let now = 1_000 + 5_000 + 1;
        assert_eq!(g.decide(now, 0), GateOutcome::Proceed);
        assert_eq!(g.circuit, CircuitState::HalfOpen);
        // Only one probe at a time while HALF_OPEN.
        assert!(matches!(g.decide(now, 0), GateOutcome::Wait(_)));
        g.release_success(now);
        assert_eq!(g.circuit, CircuitState::Closed);
        assert_eq!(g.open_cycles, 0);
    }

    #[test]
    fn half_open_probe_throttle_reopens_with_grown_backoff() {
        let mut g = ProviderGovernor::new(limits(8, 1));
        for _ in 0..THROTTLE_STREAK_TO_OPEN {
            g.in_flight = 1;
            g.release_throttle(1_000, ThrottleSignal::RateLimit429, None);
        }
        let first_open = match g.circuit {
            CircuitState::Open { until_ms } => until_ms,
            other => panic!("expected Open, got {other:?}"),
        };
        let now = first_open + 1;
        assert_eq!(g.decide(now, 0), GateOutcome::Proceed);
        g.release_throttle(now, ThrottleSignal::RateLimit429, None);
        match g.circuit {
            CircuitState::Open { until_ms } => {
                let first_window = first_open - 1_000;
                let second_window = until_ms - now;
                assert!(
                    second_window >= first_window,
                    "exp backoff must not shrink: {second_window} >= {first_window}"
                );
            }
            other => panic!("expected re-Open, got {other:?}"),
        }
    }

    #[test]
    fn backoff_grows_exponentially_up_to_max() {
        let mut l = limits(8, 1);
        l.backoff_base_ms = 1_000;
        l.backoff_max_ms = 8_000;
        l.backoff_multiplier = 2.0;
        l.backoff_jitter = false;
        let mut g = ProviderGovernor::new(l);
        g.open_cycles = 1;
        assert_eq!(g.backoff_window_ms(None), 1_000);
        g.open_cycles = 2;
        assert_eq!(g.backoff_window_ms(None), 2_000);
        g.open_cycles = 3;
        assert_eq!(g.backoff_window_ms(None), 4_000);
        g.open_cycles = 4;
        assert_eq!(g.backoff_window_ms(None), 8_000);
        g.open_cycles = 5;
        assert_eq!(g.backoff_window_ms(None), 8_000, "capped at max");
    }

    /// Jittered windows are drawn from the upper half `[window / 2, window]`.
    #[test]
    fn full_jitter_stays_within_window() {
        let mut l = limits(8, 1);
        l.backoff_base_ms = 10_000;
        l.backoff_max_ms = 10_000;
        l.backoff_jitter = true;
        let mut g = ProviderGovernor::new(l);
        g.open_cycles = 1;
        for _ in 0..1_000 {
            let w = g.backoff_window_ms(None);
            assert!((5_000..=10_000).contains(&w), "jitter out of band: {w}");
        }
    }

    #[test]
    fn tpm_bucket_blocks_over_limit_then_recovers() {
        let mut b = TokenBucket::new(100);
        assert_eq!(b.wait_for(0, 60), None);
        b.record(0, 60);
        assert!(
            b.wait_for(0, 60).is_some(),
            "second charge exceeds the window and must wait"
        );
        assert_eq!(b.wait_for(0, 40), None);
        b.record(0, 40);
        assert!(b.wait_for(0, 1).is_some());
        assert_eq!(b.wait_for(60_001, 60), None);
    }

    #[test]
    fn rpm_bucket_limits_requests_per_window() {
        let mut b = TokenBucket::new(2);
        assert_eq!(b.wait_for(0, 1), None);
        b.record(0, 1);
        assert_eq!(b.wait_for(0, 1), None);
        b.record(0, 1);
        assert!(b.wait_for(0, 1).is_some(), "3rd request in window waits");
    }

    #[test]
    fn gate_no_op_when_flag_off() {
        let _lock = TEST_ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let prev = std::env::var(RATE_GOVERNOR_ENABLED_ENV).ok();
        std::env::remove_var(RATE_GOVERNOR_ENABLED_ENV);
        reset_for_tests();
        assert_eq!(gate("anthropic", "default", 0), GateOutcome::Proceed);
        record_outcome("anthropic", "default", GovernorOutcome::Served);
        assert!(snapshot("anthropic", "default").is_none());
        if let Some(v) = prev {
            std::env::set_var(RATE_GOVERNOR_ENABLED_ENV, v);
        }
    }

    #[test]
    fn org_key_id_fingerprints_without_leaking_secret() {
        let id = org_key_id("sk-ant-super-secret-1234567890");
        assert!(!id.contains("secret"));
        assert!(!id.contains("sk-ant"));
        assert_eq!(org_key_id(""), "default");
        assert_ne!(org_key_id("key-a"), org_key_id("key-b"));
        assert_eq!(org_key_id("key-a"), org_key_id("key-a"));
    }

    #[test]
    fn throttle_record_carries_structured_fields() {
        let rec = build_throttle_record(
            "anthropic",
            "kdeadbeef",
            ThrottleSignal::RateLimit429,
            Some(429),
            Some(30_000),
            "1.234".to_string(),
        );
        assert_eq!(rec["type"], "provider_throttle");
        assert_eq!(rec["provider"], "anthropic");
        assert_eq!(rec["org_key_id"], "kdeadbeef");
        assert_eq!(rec["signal_type"], "rate_limit_429");
        assert_eq!(rec["http_status"], 429);
        assert_eq!(rec["retry_after_ms"], 30_000);
    }

    /// End-to-end: a throttled provider backs off and recovers while a healthy
    /// provider on another route keeps flowing.
    #[test]
    fn mechanism_fitness_backoff_recover_without_starving_healthy() {
        let _env = GovernorEnabledGuard::on();
        let _clock = install_override(MockClock::at_wall_ms(0));
        let bad = ("anthropic", "orgA");
        let good = ("openai", "orgB");
        for _ in 0..20 {
            assert_eq!(gate(good.0, good.1, 0), GateOutcome::Proceed);
            record_outcome(good.0, good.1, GovernorOutcome::Served);
        }
        let good_snap = snapshot(good.0, good.1).expect("healthy governor exists");
        assert_eq!(good_snap.circuit_state, "closed");
        assert!(
            good_snap.concurrency_limit >= good_snap.min_concurrency,
            "healthy provider not starved"
        );
        assert!(!circuit_is_open(good.0, good.1));
        for _ in 0..THROTTLE_STREAK_TO_OPEN {
            assert_eq!(gate(bad.0, bad.1, 0), GateOutcome::Proceed);
            record_outcome(
                bad.0,
                bad.1,
                GovernorOutcome::Throttled {
                    signal: ThrottleSignal::RateLimit429,
                    retry_after_ms: Some(5_000),
                },
            );
        }
        let bad_snap = snapshot(bad.0, bad.1).expect("bad governor exists");
        assert_eq!(bad_snap.circuit_state, "open");
        assert!(
            bad_snap.concurrency_limit < bad_snap.max_concurrency,
            "AIMD decreased concurrency on the throttled provider"
        );
        assert!(
            circuit_is_open(bad.0, bad.1),
            "auto-response seam sees OPEN"
        );
        assert!(matches!(gate(bad.0, bad.1, 0), GateOutcome::CircuitOpen(_)));
        assert_eq!(gate(good.0, good.1, 0), GateOutcome::Proceed);
        record_outcome(good.0, good.1, GovernorOutcome::Served);
        assert!(!circuit_is_open(good.0, good.1));
        crate::clock_mock::advance(Duration::from_secs(6));
        assert_eq!(
            gate(bad.0, bad.1, 0),
            GateOutcome::Proceed,
            "probe admitted"
        );
        record_outcome(bad.0, bad.1, GovernorOutcome::Served);
        let recovered = snapshot(bad.0, bad.1).expect("bad governor exists");
        assert_eq!(recovered.circuit_state, "closed", "recovered to CLOSED");
        assert!(!circuit_is_open(bad.0, bad.1));
    }

    /// A provider that never throttles stays CLOSED with nothing left in flight.
    #[test]
    fn mechanism_fitness_no_throttle_path_unthrottled() {
        let _env = GovernorEnabledGuard::on();
        let _clock = install_override(MockClock::at_wall_ms(0));
        for _ in 0..100 {
            assert_eq!(gate("openai", "org", 0), GateOutcome::Proceed);
            record_outcome("openai", "org", GovernorOutcome::Served);
        }
        let snap = snapshot("openai", "org").unwrap();
        assert_eq!(snap.circuit_state, "closed");
        assert_eq!(snap.in_flight, 0);
    }
}