use std::collections::VecDeque;
use std::sync::{
Arc, LazyLock, Mutex,
atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
};
use std::thread;
use std::time::{Duration, Instant};
// Built-in defaults used when the matching `CASS_RESPONSIVENESS_*` variable is
// unset or rejected by `GovernorConfig::from_env`.

// Default lowest capacity percentage the governor may shrink to.
const DEFAULT_MIN_CAPACITY_PCT: u32 = 25;
// Default load-per-core above which a snapshot counts as pressured.
const DEFAULT_MAX_LOAD_PER_CORE: f32 = 1.25;
// Default load-per-core above which a snapshot counts as severe.
const DEFAULT_SEVERE_LOAD_PER_CORE: f32 = 1.75;
// Default PSI `some avg10` value above which a snapshot counts as pressured.
const DEFAULT_MAX_PSI_AVG10: f32 = 20.0;
// Default PSI `some avg10` value above which a snapshot counts as severe.
const DEFAULT_SEVERE_PSI_AVG10: f32 = 40.0;
// Default number of consecutive healthy ticks required before a growth step.
const DEFAULT_GROWTH_CONSECUTIVE_HEALTHY_TICKS: u32 = 3;
// Default governor tick interval, in seconds.
const DEFAULT_TICK_SECS: u64 = 2;
// In-flight byte cap used when available memory is unknown; also the lower
// bound of the memory-derived default.
const DEFAULT_MAX_INFLIGHT_BYTES: usize = 512 * 1024 * 1024;
// The memory-derived default cap is `available_bytes / this`.
const DEFAULT_MAX_INFLIGHT_MEMORY_FRACTION_DENOMINATOR: u64 = 32;
// Upper bound of the memory-derived default cap.
const DEFAULT_MAX_INFLIGHT_BYTES_CEILING: u64 = 16 * 1024 * 1024 * 1024;
// Default floor applied to a scaled in-flight byte budget.
const DEFAULT_MIN_INFLIGHT_BYTES: usize = 1024 * 1024;
// Maximum number of entries kept in the recent-decision history.
const TELEMETRY_DECISION_HISTORY: usize = 128;
// Default capacity of each calibration sample window.
const DEFAULT_CONFORMAL_K: usize = 256;
// Default minimum number of window samples required before a quantile is produced.
const DEFAULT_CONFORMAL_K_MIN: usize = 32;
// Default `alpha` passed to the quantile for the pressured threshold.
const DEFAULT_CONFORMAL_ALPHA_PRESSURED: f32 = 0.05;
// Default `alpha` passed to the quantile for the severe threshold.
const DEFAULT_CONFORMAL_ALPHA_SEVERE: f32 = 0.01;
// Default `delta` term of the Page-Hinkley drift detector.
const DEFAULT_DRIFT_DELTA: f32 = 0.01;
// Default `lambda` detection threshold of the Page-Hinkley drift detector.
const DEFAULT_DRIFT_LAMBDA: f32 = 0.5;
// A sample is an outlier when its distance from the window median exceeds this
// multiple of the window's median absolute deviation.
const MAD_OUTLIER_K: f32 = 3.5;
/// Selects where the governor's pressure thresholds come from.
///
/// Serialized with `snake_case` variant names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CalibrationMode {
/// Always use the configured thresholds; no calibration state is created.
Static,
/// Keep per-signal sample windows and, once they yield usable quantiles,
/// substitute those for the configured thresholds.
Conformal,
}
/// Tunables for the responsiveness governor; `from_env` builds one from
/// `CASS_RESPONSIVENESS_*` variables.
#[derive(Clone, Copy, Debug)]
pub(crate) struct GovernorConfig {
/// Parallelism reported by the standard library (1 when it cannot be determined).
pub available_parallelism: usize,
/// Cores subtracted from `available_parallelism` when computing the worker ceiling.
pub reserved_cores: usize,
/// Additional upper bound on the worker count.
pub max_workers: usize,
/// Upper bound applied to a requested in-flight byte budget before scaling.
pub max_inflight_bytes: usize,
/// Floor for a scaled in-flight byte budget (never raised above the capped request).
pub min_inflight_bytes: usize,
/// Lowest capacity percentage the governor shrinks to.
pub min_capacity_pct: u32,
/// Load-per-core above which a snapshot is pressured.
pub max_load_per_core: f32,
/// Load-per-core above which a snapshot is severe.
pub severe_load_per_core: f32,
/// PSI `some avg10` above which a snapshot is pressured.
pub max_psi_avg10: f32,
/// PSI `some avg10` above which a snapshot is severe.
pub severe_psi_avg10: f32,
/// Consecutive healthy ticks required before capacity grows one step.
pub growth_consecutive_healthy_ticks: u32,
/// Sleep interval between governor steps.
pub tick: Duration,
/// When true, capacity decisions always return 100% and no governor thread is started.
pub disabled: bool,
/// Whether thresholds are static or calibrated from observed samples.
pub calibration_mode: CalibrationMode,
/// Capacity of each calibration sample window.
pub conformal_k: usize,
/// Minimum number of window samples before a quantile is produced.
pub conformal_k_min: usize,
/// `alpha` used for the calibrated pressured threshold.
pub conformal_alpha_pressured: f32,
/// `alpha` used for the calibrated severe threshold.
pub conformal_alpha_severe: f32,
/// `delta` term of the Page-Hinkley drift detector.
pub drift_delta: f32,
/// `lambda` detection threshold of the Page-Hinkley drift detector.
pub drift_lambda: f32,
}
impl GovernorConfig {
    /// Builds the configuration from `CASS_RESPONSIVENESS_*` variables (looked up
    /// through `dotenvy::var`), falling back to the built-in defaults.
    ///
    /// A variable that is missing or fails to parse is treated as unset. Beyond
    /// that, each value is sanitized:
    ///
    /// * `RESERVED_CORES` is capped at `available_parallelism - 1`.
    /// * `MAX_WORKERS` must be positive and is capped at the worker ceiling.
    /// * `MAX_INFLIGHT_BYTES` / `MIN_INFLIGHT_BYTES` must be positive; the minimum
    ///   is capped at the maximum.
    /// * `MIN_CAPACITY_PCT` is clamped to `10..=100`.
    /// * `MAX_LOAD_PER_CORE`, `SEVERE_LOAD_PER_CORE`, `MAX_PSI_AVG10` and
    ///   `SEVERE_PSI_AVG10` fall back to their defaults when they parse as NaN.
    /// * `TICK_SECS` must be positive.
    /// * `CALIBRATION` selects static thresholds only for the (trimmed,
    ///   case-insensitive) value `static`; anything else means conformal.
    /// * `CONFORMAL_K` is clamped to `16..=4096`, `CONFORMAL_K_MIN` to
    ///   `4..=conformal_k`.
    /// * `CONFORMAL_ALPHA_PRESSURED` must lie in `(0, 0.5)`;
    ///   `CONFORMAL_ALPHA_SEVERE` must lie in `(0, alpha_pressured)`.
    /// * `DRIFT_DELTA` must lie in `(0, 10)`, `DRIFT_LAMBDA` in `(0, 100)`.
    pub fn from_env() -> Self {
        // Static thresholds are only ever compared with `>`. A NaN threshold makes
        // every such comparison false, which would silently switch the signal off,
        // so a NaN value is treated like an unset variable.
        let threshold = |key: &str, default: f32| -> f32 {
            env_f32(key).filter(|v| !v.is_nan()).unwrap_or(default)
        };

        let available_parallelism = available_parallelism();
        let reserved_cores = env_usize("CASS_RESPONSIVENESS_RESERVED_CORES")
            .unwrap_or_else(|| default_reserved_cores_for_available(available_parallelism))
            // Always leave at least one core for workers.
            .min(available_parallelism.saturating_sub(1));
        let worker_ceiling = worker_ceiling_for(available_parallelism, reserved_cores);
        let max_workers = env_usize("CASS_RESPONSIVENESS_MAX_WORKERS")
            .filter(|v| *v > 0)
            .map(|v| v.min(worker_ceiling))
            .unwrap_or(worker_ceiling)
            .max(1);

        let max_inflight_bytes = env_usize("CASS_RESPONSIVENESS_MAX_INFLIGHT_BYTES")
            .filter(|v| *v > 0)
            .unwrap_or_else(default_max_inflight_bytes);
        let min_inflight_bytes = env_usize("CASS_RESPONSIVENESS_MIN_INFLIGHT_BYTES")
            .filter(|v| *v > 0)
            .map(|v| v.min(max_inflight_bytes))
            .unwrap_or(DEFAULT_MIN_INFLIGHT_BYTES.min(max_inflight_bytes))
            .max(1);

        let min_capacity_pct = env_u32("CASS_RESPONSIVENESS_MIN_CAPACITY_PCT")
            .map(|v| v.clamp(10, 100))
            .unwrap_or(DEFAULT_MIN_CAPACITY_PCT);

        let max_load_per_core = threshold(
            "CASS_RESPONSIVENESS_MAX_LOAD_PER_CORE",
            DEFAULT_MAX_LOAD_PER_CORE,
        );
        let severe_load_per_core = threshold(
            "CASS_RESPONSIVENESS_SEVERE_LOAD_PER_CORE",
            DEFAULT_SEVERE_LOAD_PER_CORE,
        );
        let max_psi_avg10 = threshold("CASS_RESPONSIVENESS_MAX_PSI_AVG10", DEFAULT_MAX_PSI_AVG10);
        let severe_psi_avg10 = threshold(
            "CASS_RESPONSIVENESS_SEVERE_PSI_AVG10",
            DEFAULT_SEVERE_PSI_AVG10,
        );

        let growth_consecutive_healthy_ticks = env_u32("CASS_RESPONSIVENESS_GROWTH_TICKS")
            .unwrap_or(DEFAULT_GROWTH_CONSECUTIVE_HEALTHY_TICKS);
        let tick_secs = env_u32("CASS_RESPONSIVENESS_TICK_SECS")
            .map(|v| v as u64)
            .filter(|v| *v > 0)
            .unwrap_or(DEFAULT_TICK_SECS);
        let disabled = env_bool_truthy("CASS_RESPONSIVENESS_DISABLE");

        // Conformal is the default; only an explicit "static" opts out.
        let calibration_mode = match dotenvy::var("CASS_RESPONSIVENESS_CALIBRATION")
            .ok()
            .as_deref()
            .map(str::trim)
            .map(str::to_ascii_lowercase)
            .as_deref()
        {
            Some("static") => CalibrationMode::Static,
            _ => CalibrationMode::Conformal,
        };

        let conformal_k = env_u32("CASS_RESPONSIVENESS_CONFORMAL_K")
            .map(|v| (v as usize).clamp(16, 4096))
            .unwrap_or(DEFAULT_CONFORMAL_K);
        let conformal_k_min = env_u32("CASS_RESPONSIVENESS_CONFORMAL_K_MIN")
            .map(|v| (v as usize).clamp(4, conformal_k))
            .unwrap_or(DEFAULT_CONFORMAL_K_MIN.min(conformal_k));
        let conformal_alpha_pressured = env_f32("CASS_RESPONSIVENESS_CONFORMAL_ALPHA_PRESSURED")
            .filter(|v| v.is_finite() && *v > 0.0 && *v < 0.5)
            .unwrap_or(DEFAULT_CONFORMAL_ALPHA_PRESSURED);
        // The severe alpha must stay strictly below the pressured alpha; the
        // fallback halves the pressured alpha if the default would not be smaller.
        let conformal_alpha_severe = env_f32("CASS_RESPONSIVENESS_CONFORMAL_ALPHA_SEVERE")
            .filter(|v| v.is_finite() && *v > 0.0 && *v < conformal_alpha_pressured)
            .unwrap_or_else(|| DEFAULT_CONFORMAL_ALPHA_SEVERE.min(conformal_alpha_pressured * 0.5));

        let drift_delta = env_f32("CASS_RESPONSIVENESS_DRIFT_DELTA")
            .filter(|v| v.is_finite() && *v > 0.0 && *v < 10.0)
            .unwrap_or(DEFAULT_DRIFT_DELTA);
        let drift_lambda = env_f32("CASS_RESPONSIVENESS_DRIFT_LAMBDA")
            .filter(|v| v.is_finite() && *v > 0.0 && *v < 100.0)
            .unwrap_or(DEFAULT_DRIFT_LAMBDA);

        Self {
            available_parallelism,
            reserved_cores,
            max_workers,
            max_inflight_bytes,
            min_inflight_bytes,
            min_capacity_pct,
            max_load_per_core,
            severe_load_per_core,
            max_psi_avg10,
            severe_psi_avg10,
            growth_consecutive_healthy_ticks,
            tick: Duration::from_secs(tick_secs),
            disabled,
            calibration_mode,
            conformal_k,
            conformal_k_min,
            conformal_alpha_pressured,
            conformal_alpha_severe,
            drift_delta,
            drift_lambda,
        }
    }
}
/// One reading of the host health signals; a field is `None` when that signal
/// could not be read.
#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize)]
pub(crate) struct HealthSnapshot {
    /// Load average divided by the core count, when available.
    pub load_per_core: Option<f32>,
    /// PSI CPU `some avg10` value, when available.
    pub psi_cpu_some_avg10: Option<f32>,
}

impl HealthSnapshot {
    /// True when either available signal is strictly above its severe threshold.
    /// Missing signals never count as severe.
    pub fn is_severe(&self, cfg: &GovernorConfig) -> bool {
        let load_over = matches!(self.load_per_core, Some(load) if load > cfg.severe_load_per_core);
        let psi_over = matches!(self.psi_cpu_some_avg10, Some(psi) if psi > cfg.severe_psi_avg10);
        load_over || psi_over
    }

    /// True when either available signal is strictly above its pressured
    /// threshold. Missing signals never count as pressured.
    pub fn is_pressured(&self, cfg: &GovernorConfig) -> bool {
        let load_over = matches!(self.load_per_core, Some(load) if load > cfg.max_load_per_core);
        let psi_over = matches!(self.psi_cpu_some_avg10, Some(psi) if psi > cfg.max_psi_avg10);
        load_over || psi_over
    }
}
/// Why `next_capacity` produced its result. Serialized with `snake_case` names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum GovernorDecisionReason {
/// The governor is disabled; capacity is reported as 100%.
Disabled,
/// The snapshot was severe; capacity dropped to the configured minimum.
Severe,
/// The snapshot was pressured and capacity changed by one step.
Pressured,
/// The snapshot was pressured but the stepped value equals the previous capacity.
PressuredFloorHold,
/// Healthy tick, but the healthy streak is still below the growth requirement.
HealthyHold,
/// Healthy streak met the requirement and capacity grew by one step.
HealthyGrow,
/// Healthy streak met the requirement but capacity could not grow further.
HealthyCeilingHold,
}
/// Pure capacity policy: given the previous capacity, the current healthy streak
/// and a health snapshot, returns `(next_capacity_pct, next_streak, reason)`.
///
/// * disabled: always `(100, 0, Disabled)`;
/// * severe snapshot: jump to `cfg.min_capacity_pct` and reset the streak;
/// * pressured snapshot: step down by 25 points (not below the minimum) and
///   reset the streak;
/// * otherwise the streak is incremented, and once it reaches
///   `cfg.growth_consecutive_healthy_ticks` capacity steps up by 25 points
///   (capped at 100). A successful growth resets the streak; holding at the
///   ceiling keeps it.
pub(crate) fn next_capacity(
    prev_capacity_pct: u32,
    healthy_streak: u32,
    snapshot: &HealthSnapshot,
    cfg: &GovernorConfig,
) -> (u32, u32, GovernorDecisionReason) {
    const STEP_PCT: u32 = 25;
    const FULL_PCT: u32 = 100;

    if cfg.disabled {
        return (FULL_PCT, 0, GovernorDecisionReason::Disabled);
    }
    if snapshot.is_severe(cfg) {
        return (cfg.min_capacity_pct, 0, GovernorDecisionReason::Severe);
    }
    if snapshot.is_pressured(cfg) {
        let lowered = prev_capacity_pct
            .saturating_sub(STEP_PCT)
            .max(cfg.min_capacity_pct);
        let reason = if lowered != prev_capacity_pct {
            GovernorDecisionReason::Pressured
        } else {
            GovernorDecisionReason::PressuredFloorHold
        };
        return (lowered, 0, reason);
    }

    // Healthy tick from here on.
    let streak = healthy_streak.saturating_add(1);
    if streak < cfg.growth_consecutive_healthy_ticks {
        return (prev_capacity_pct, streak, GovernorDecisionReason::HealthyHold);
    }

    let raised = prev_capacity_pct.saturating_add(STEP_PCT).min(FULL_PCT);
    if raised > prev_capacity_pct {
        (raised, 0, GovernorDecisionReason::HealthyGrow)
    } else {
        (raised, streak, GovernorDecisionReason::HealthyCeilingHold)
    }
}
/// Scales `desired` workers by `capacity_pct` (clamped to `1..=100`), rounding
/// down but never below one worker. A request for zero workers stays zero.
pub(crate) fn scale_worker_count(desired: usize, capacity_pct: u32) -> usize {
    match desired {
        0 => 0,
        wanted => {
            let pct = capacity_pct.clamp(1, 100) as usize;
            (wanted.saturating_mul(pct) / 100).max(1)
        }
    }
}
/// Parallelism reported by the standard library, or 1 when the query fails.
fn available_parallelism() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
/// Default number of cores to reserve for a host with `available_parallelism`
/// cores: 0 for up to 1 core, 1 for 2-4, 2 for 5-16, and one eighth of the cores
/// (clamped to `2..=8`) beyond that.
pub(crate) fn default_reserved_cores_for_available(available_parallelism: usize) -> usize {
    if available_parallelism <= 1 {
        0
    } else if available_parallelism <= 4 {
        1
    } else if available_parallelism <= 16 {
        2
    } else {
        (available_parallelism / 8).clamp(2, 8)
    }
}
/// Number of cores left for workers after subtracting the reserved cores;
/// always at least one.
fn worker_ceiling_for(available_parallelism: usize, reserved_cores: usize) -> usize {
    let cores = std::cmp::max(available_parallelism, 1);
    let usable = cores.saturating_sub(reserved_cores);
    std::cmp::max(usable, 1)
}
/// Caps `desired` at the policy ceiling (cores minus reserved cores, further
/// limited by `cfg.max_workers`) and then scales it by `capacity_pct`.
/// A request for zero workers stays zero.
pub(crate) fn scale_worker_count_with_policy(
    desired: usize,
    capacity_pct: u32,
    cfg: &GovernorConfig,
) -> usize {
    if desired == 0 {
        return 0;
    }
    let core_ceiling = worker_ceiling_for(cfg.available_parallelism, cfg.reserved_cores);
    let worker_cap = cfg.max_workers.max(1);
    let allowed = desired.min(core_ceiling.min(worker_cap));
    scale_worker_count(allowed, capacity_pct)
}
/// Caps `desired_bytes` at `cfg.max_inflight_bytes`, scales it by `capacity_pct`
/// (clamped to `1..=100`), and keeps the result at or above
/// `cfg.min_inflight_bytes` without ever exceeding the capped request.
/// A request for zero bytes stays zero; any other request yields at least 1.
pub(crate) fn scale_inflight_byte_limit(
    desired_bytes: usize,
    capacity_pct: u32,
    cfg: &GovernorConfig,
) -> usize {
    if desired_bytes == 0 {
        return 0;
    }
    let budget_cap = cfg.max_inflight_bytes.max(1);
    let capped = desired_bytes.min(budget_cap);
    let pct = capacity_pct.clamp(1, 100) as usize;
    let scaled = capped.saturating_mul(pct) / 100;
    // The floor may not push the result above what was actually requested.
    let floor = cfg.min_inflight_bytes.min(capped);
    scaled.max(floor).max(1)
}
/// Source of health snapshots for the governor. Implementations must be
/// `Send + Sync`.
pub(crate) trait HealthReader: Send + Sync {
/// Returns the current health reading.
fn snapshot(&self) -> HealthSnapshot;
}
/// `HealthReader` backed by `/proc` on Linux; on other targets it reports no
/// signals at all.
pub(crate) struct ProcHealthReader {
    /// Core count used to normalize the load average (Linux only).
    #[cfg(target_os = "linux")]
    ncpu: usize,
}

impl ProcHealthReader {
    /// Creates a reader, capturing the current parallelism on Linux.
    pub fn new() -> Self {
        #[cfg(target_os = "linux")]
        let ncpu = available_parallelism();
        Self {
            #[cfg(target_os = "linux")]
            ncpu,
        }
    }
}

impl HealthReader for ProcHealthReader {
    /// Reads the load average (divided by the core count unless that count is
    /// zero) and the PSI CPU `some avg10` value.
    #[cfg(target_os = "linux")]
    fn snapshot(&self) -> HealthSnapshot {
        let load_per_core = match (read_loadavg(), self.ncpu) {
            (None, _) => None,
            (Some(load1), 0) => Some(load1),
            (Some(load1), cores) => Some(load1 / cores as f32),
        };
        HealthSnapshot {
            load_per_core,
            psi_cpu_some_avg10: read_psi_cpu_some_avg10(),
        }
    }

    /// No signals are available on non-Linux targets.
    #[cfg(not(target_os = "linux"))]
    fn snapshot(&self) -> HealthSnapshot {
        let unavailable: Option<f32> = None;
        HealthSnapshot {
            load_per_core: unavailable,
            psi_cpu_some_avg10: unavailable,
        }
    }
}
/// Parses the first whitespace-separated field of `/proc/loadavg` as `f32`.
/// Returns `None` when the file cannot be read or the field does not parse.
#[cfg(target_os = "linux")]
fn read_loadavg() -> Option<f32> {
    let contents = std::fs::read_to_string("/proc/loadavg").ok()?;
    contents
        .split_whitespace()
        .next()
        .and_then(|field| field.parse::<f32>().ok())
}
/// Reads the `avg10=` value from the first `some ` line of `/proc/pressure/cpu`
/// that carries one. Returns `None` when the file cannot be read, no such token
/// exists, or the first such token does not parse as `f32`.
#[cfg(target_os = "linux")]
fn read_psi_cpu_some_avg10() -> Option<f32> {
    let contents = std::fs::read_to_string("/proc/pressure/cpu").ok()?;
    let avg10 = contents
        .lines()
        .filter_map(|line| line.strip_prefix("some "))
        .flat_map(str::split_whitespace)
        .find_map(|token| token.strip_prefix("avg10="))?;
    avg10.parse::<f32>().ok()
}
/// One recorded governor decision, kept in the recent-decision history.
#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize)]
pub(crate) struct GovernorDecision {
/// Milliseconds elapsed since the governor was constructed.
pub at_elapsed_ms: u64,
/// Capacity percentage before this decision.
pub prev_capacity_pct: u32,
/// Capacity percentage after this decision.
pub next_capacity_pct: u32,
/// Why the policy produced `next_capacity_pct`.
pub reason: GovernorDecisionReason,
/// The health reading the decision was based on.
pub snapshot: HealthSnapshot,
}
/// Serializable view of the governor's state and counters.
#[derive(Clone, Debug, serde::Serialize)]
pub(crate) struct GovernorTelemetry {
/// Current capacity percentage (reported as 100 when disabled).
pub current_capacity_pct: u32,
/// Worker and byte-budget policy derived from the configuration.
pub resource_policy: ResourcePolicyTelemetry,
/// Current count of consecutive healthy ticks.
pub healthy_streak: u32,
/// Number of steps that lowered capacity.
pub shrink_count: u64,
/// Number of steps that raised capacity.
pub grow_count: u64,
/// Total number of governor steps executed.
pub ticks_total: u64,
/// Whether the governor is disabled.
pub disabled_via_env: bool,
/// Most recent health reading, if any step has run.
pub last_snapshot: Option<HealthSnapshot>,
/// Reason of the most recent decision, if any step has run.
pub last_reason: Option<GovernorDecisionReason>,
/// Recorded decisions, oldest first.
pub recent_decisions: Vec<GovernorDecision>,
/// Calibration details; `None` in static calibration mode.
pub calibration: Option<CalibrationTelemetry>,
}
/// Serializable summary of the worker and in-flight byte policy.
#[derive(Clone, Copy, Debug, serde::Serialize)]
pub(crate) struct ResourcePolicyTelemetry {
    /// Parallelism recorded in the configuration.
    pub available_parallelism: usize,
    /// Cores held back from workers.
    pub reserved_cores: usize,
    /// Configured worker cap.
    pub max_workers: usize,
    /// Smaller of the core-based ceiling and the worker cap.
    pub effective_worker_ceiling: usize,
    /// Configured in-flight byte cap.
    pub max_inflight_bytes: usize,
    /// Configured in-flight byte floor.
    pub min_inflight_bytes: usize,
}

impl ResourcePolicyTelemetry {
    /// Copies the policy fields out of `cfg` and computes the effective worker
    /// ceiling from them.
    fn from_config(cfg: &GovernorConfig) -> Self {
        let core_ceiling = worker_ceiling_for(cfg.available_parallelism, cfg.reserved_cores);
        let worker_cap = cfg.max_workers.max(1);
        Self {
            available_parallelism: cfg.available_parallelism,
            reserved_cores: cfg.reserved_cores,
            max_workers: cfg.max_workers,
            effective_worker_ceiling: core_ceiling.min(worker_cap),
            max_inflight_bytes: cfg.max_inflight_bytes,
            min_inflight_bytes: cfg.min_inflight_bytes,
        }
    }
}
/// Returns the element at rank `ceil((k + 1) * (1 - alpha))` (1-based, clamped
/// to the slice) of `scores`, where `k = scores.len()`.
///
/// `scores` is expected to be sorted ascending by the caller; this function
/// only indexes into it. Returns `None` for an empty slice or when `alpha` is
/// not a finite value strictly between 0 and 1.
pub(crate) fn conformal_quantile_sorted(scores: &[f32], alpha: f32) -> Option<f32> {
    let alpha_in_range = alpha.is_finite() && alpha > 0.0 && alpha < 1.0;
    if !alpha_in_range {
        return None;
    }
    // `None` here means the slice is empty.
    let last = scores.len().checked_sub(1)?;
    let rank = ((scores.len() as f32 + 1.0) * (1.0 - alpha)).ceil() as isize;
    let idx = (rank - 1).clamp(0, last as isize) as usize;
    scores.get(idx).copied()
}
/// Bounded FIFO window of finite samples used for calibration.
#[derive(Debug, Clone)]
struct CalibrationStream {
    /// Samples, oldest at the front.
    samples: VecDeque<f32>,
    /// Maximum number of samples retained (at least 1).
    k: usize,
}

impl CalibrationStream {
    /// Creates an empty window holding at most `k` samples (`k` of 0 is treated as 1).
    fn new(k: usize) -> Self {
        let capacity = k.max(1);
        Self {
            samples: VecDeque::with_capacity(capacity),
            k: capacity,
        }
    }

    /// Number of samples currently held.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.samples.len()
    }

    /// Drops every sample.
    fn clear(&mut self) {
        self.samples.clear();
    }

    /// Appends `value`, evicting the oldest sample when the window is full.
    /// Non-finite values are ignored.
    fn push(&mut self, value: f32) {
        if value.is_finite() {
            if self.samples.len() >= self.k {
                self.samples.pop_front();
            }
            self.samples.push_back(value);
        }
    }

    /// Sorts `values` ascending in place; incomparable pairs are treated as equal.
    fn sort_ascending(values: &mut [f32]) {
        values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    }

    /// Middle element of an ascending slice (mean of the two middle elements for
    /// an even length), or `None` when the slice is empty.
    fn midpoint_of_sorted(sorted: &[f32]) -> Option<f32> {
        let n = sorted.len();
        if n == 0 {
            return None;
        }
        let mid = n / 2;
        Some(if n.is_multiple_of(2) {
            (sorted[mid - 1] + sorted[mid]) / 2.0
        } else {
            sorted[mid]
        })
    }

    /// Median of the window, or `None` when it is empty.
    fn median(&self) -> Option<f32> {
        let mut ordered: Vec<f32> = self.samples.iter().copied().collect();
        Self::sort_ascending(&mut ordered);
        Self::midpoint_of_sorted(&ordered)
    }

    /// Median absolute deviation of the window around its median, or `None`
    /// when the window is empty.
    fn mad(&self) -> Option<f32> {
        let center = self.median()?;
        let mut deviations: Vec<f32> = self
            .samples
            .iter()
            .map(|sample| (sample - center).abs())
            .collect();
        Self::sort_ascending(&mut deviations);
        Self::midpoint_of_sorted(&deviations)
    }

    /// True when `v` lies more than `MAD_OUTLIER_K` MADs from the window median.
    /// The gate is inactive (always false) with fewer than 8 samples or when the
    /// MAD is exactly zero.
    fn is_outlier(&self, v: f32) -> bool {
        const MIN_SAMPLES_FOR_GATE: usize = 8;
        if self.samples.len() < MIN_SAMPLES_FOR_GATE {
            return false;
        }
        match (self.median(), self.mad()) {
            (Some(center), Some(spread)) if spread != 0.0 => {
                (v - center).abs() > MAD_OUTLIER_K * spread
            }
            _ => false,
        }
    }

    /// Conformal quantile of the window for `alpha`, or `None` when fewer than
    /// `k_min` samples are held (or the quantile itself is undefined).
    fn quantile(&self, alpha: f32, k_min: usize) -> Option<f32> {
        if self.samples.len() < k_min {
            return None;
        }
        let mut ordered: Vec<f32> = self.samples.iter().copied().collect();
        Self::sort_ascending(&mut ordered);
        conformal_quantile_sorted(&ordered, alpha)
    }
}
/// Page-Hinkley style detector for an upward shift in a stream of values.
#[derive(Debug, Clone)]
struct PageHinkley {
    /// Tolerance subtracted from every deviation.
    delta: f32,
    /// Detection threshold on `cumulative - min_cumulative`.
    lambda: f32,
    /// Number of finite observations seen since the last reset.
    n: u64,
    /// Running mean of the observations.
    running_mean: f32,
    /// Cumulative sum of `value - running_mean - delta`.
    cumulative: f32,
    /// Smallest value `cumulative` has reached.
    min_cumulative: f32,
}

impl PageHinkley {
    /// Creates a detector with zeroed statistics.
    fn new(delta: f32, lambda: f32) -> Self {
        Self {
            delta,
            lambda,
            n: 0,
            running_mean: 0.0,
            cumulative: 0.0,
            min_cumulative: 0.0,
        }
    }

    /// Zeroes the statistics while keeping `delta` and `lambda`.
    fn reset(&mut self) {
        *self = Self::new(self.delta, self.lambda);
    }

    /// Feeds one observation and returns true when the cumulative deviation has
    /// risen more than `lambda` above its minimum. Non-finite values are ignored
    /// and return false.
    fn observe(&mut self, v: f32) -> bool {
        if !v.is_finite() {
            return false;
        }
        self.n = self.n.saturating_add(1);
        let count = self.n as f32;
        self.running_mean += (v - self.running_mean) / count;
        self.cumulative += v - self.running_mean - self.delta;

        let reached_new_low = self.cumulative < self.min_cumulative;
        if reached_new_low {
            self.min_cumulative = self.cumulative;
        }
        let rise = self.cumulative - self.min_cumulative;
        rise > self.lambda
    }
}
/// Calibration state for a single health signal: a sample window plus a drift
/// detector.
#[derive(Debug, Clone)]
struct SignalCalibration {
    /// Accepted samples used for quantile estimation.
    window: CalibrationStream,
    /// Drift detector fed with every finite observation.
    drift: PageHinkley,
}

impl SignalCalibration {
    /// Builds the window and detector from the conformal and drift settings in `cfg`.
    fn new(cfg: &GovernorConfig) -> Self {
        let window = CalibrationStream::new(cfg.conformal_k);
        let drift = PageHinkley::new(cfg.drift_delta, cfg.drift_lambda);
        Self { window, drift }
    }

    /// Processes one observation and reports what happened to it.
    ///
    /// Every finite value is fed to the drift detector first; a detection clears
    /// the window and resets the detector. Otherwise the value is added to the
    /// window only on a healthy tick and only if it is not an outlier.
    fn observe(&mut self, v: f32, is_healthy_tick: bool) -> SignalObserveOutcome {
        if !v.is_finite() {
            return SignalObserveOutcome::NotFinite;
        }
        if self.drift.observe(v) {
            self.window.clear();
            self.drift.reset();
            return SignalObserveOutcome::DriftResetTriggered;
        }
        if !is_healthy_tick {
            SignalObserveOutcome::SkippedPressuredTick
        } else if self.window.is_outlier(v) {
            SignalObserveOutcome::RejectedOutlier
        } else {
            self.window.push(v);
            SignalObserveOutcome::Accepted
        }
    }
}
/// What happened to a single observation offered to a signal's calibration.
/// Serialized with `snake_case` names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SignalObserveOutcome {
/// The value was added to the calibration window.
Accepted,
/// The tick was not healthy, so the value was not added to the window.
SkippedPressuredTick,
/// The value was rejected by the outlier gate.
RejectedOutlier,
/// The drift detector fired; the window was cleared and the detector reset.
DriftResetTriggered,
/// The value was NaN or infinite and was ignored.
NotFinite,
}
#[derive(Debug, Clone)]
struct GovernorCalibration {
load: SignalCalibration,
psi: SignalCalibration,
drift_reset_count: u64,
outliers_rejected: u64,
observations_total: u64,
}
impl GovernorCalibration {
fn new(cfg: &GovernorConfig) -> Self {
Self {
load: SignalCalibration::new(cfg),
psi: SignalCalibration::new(cfg),
drift_reset_count: 0,
outliers_rejected: 0,
observations_total: 0,
}
}
fn observe_and_compute_thresholds(
&mut self,
snapshot: &HealthSnapshot,
is_healthy_tick: bool,
cfg: &GovernorConfig,
) -> Option<EffectiveThresholds> {
self.observations_total = self.observations_total.saturating_add(1);
if let Some(v) = snapshot.load_per_core {
let outcome = self.load.observe(v, is_healthy_tick);
match outcome {
SignalObserveOutcome::RejectedOutlier => {
self.outliers_rejected = self.outliers_rejected.saturating_add(1);
}
SignalObserveOutcome::DriftResetTriggered => {
self.drift_reset_count = self.drift_reset_count.saturating_add(1);
}
_ => {}
}
}
if let Some(v) = snapshot.psi_cpu_some_avg10 {
let outcome = self.psi.observe(v, is_healthy_tick);
match outcome {
SignalObserveOutcome::RejectedOutlier => {
self.outliers_rejected = self.outliers_rejected.saturating_add(1);
}
SignalObserveOutcome::DriftResetTriggered => {
self.drift_reset_count = self.drift_reset_count.saturating_add(1);
}
_ => {}
}
}
let load_pressured = self
.load
.window
.quantile(cfg.conformal_alpha_pressured, cfg.conformal_k_min);
let load_severe = self
.load
.window
.quantile(cfg.conformal_alpha_severe, cfg.conformal_k_min);
let psi_pressured = self
.psi
.window
.quantile(cfg.conformal_alpha_pressured, cfg.conformal_k_min);
let psi_severe = self
.psi
.window
.quantile(cfg.conformal_alpha_severe, cfg.conformal_k_min);
let (lp, ls, pp, ps) = (load_pressured?, load_severe?, psi_pressured?, psi_severe?);
if ls <= lp || ps <= pp {
return None;
}
Some(EffectiveThresholds {
pressured_load: lp,
severe_load: ls,
pressured_psi: pp,
severe_psi: ps,
})
}
#[cfg(test)]
fn telemetry(&self, cfg: &GovernorConfig) -> CalibrationTelemetry {
CalibrationTelemetry {
mode: cfg.calibration_mode,
load_window_len: self.load.window.len(),
psi_window_len: self.psi.window.len(),
conformal_k: cfg.conformal_k,
conformal_k_min: cfg.conformal_k_min,
conformal_alpha_pressured: cfg.conformal_alpha_pressured,
conformal_alpha_severe: cfg.conformal_alpha_severe,
drift_reset_count: self.drift_reset_count,
outliers_rejected: self.outliers_rejected,
observations_total: self.observations_total,
load_pressured_q: self
.load
.window
.quantile(cfg.conformal_alpha_pressured, cfg.conformal_k_min),
load_severe_q: self
.load
.window
.quantile(cfg.conformal_alpha_severe, cfg.conformal_k_min),
psi_pressured_q: self
.psi
.window
.quantile(cfg.conformal_alpha_pressured, cfg.conformal_k_min),
psi_severe_q: self
.psi
.window
.quantile(cfg.conformal_alpha_severe, cfg.conformal_k_min),
}
}
}
/// Calibrated thresholds that replace the configured ones for one decision.
#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize)]
pub(crate) struct EffectiveThresholds {
    /// Replacement for `max_load_per_core`.
    pub pressured_load: f32,
    /// Replacement for `severe_load_per_core`.
    pub severe_load: f32,
    /// Replacement for `max_psi_avg10`.
    pub pressured_psi: f32,
    /// Replacement for `severe_psi_avg10`.
    pub severe_psi: f32,
}

impl EffectiveThresholds {
    /// Returns a copy of `cfg` with the four threshold fields overridden and
    /// every other field unchanged.
    fn apply_to(self, cfg: &GovernorConfig) -> GovernorConfig {
        let mut overridden = *cfg;
        overridden.max_load_per_core = self.pressured_load;
        overridden.severe_load_per_core = self.severe_load;
        overridden.max_psi_avg10 = self.pressured_psi;
        overridden.severe_psi_avg10 = self.severe_psi;
        overridden
    }
}
/// Serializable view of the calibration settings, counters and quantiles.
#[derive(Clone, Debug, serde::Serialize)]
pub(crate) struct CalibrationTelemetry {
/// Calibration mode in effect.
pub mode: CalibrationMode,
/// Number of samples in the load window.
pub load_window_len: usize,
/// Number of samples in the PSI window.
pub psi_window_len: usize,
/// Configured window capacity.
pub conformal_k: usize,
/// Configured minimum sample count before quantiles are produced.
pub conformal_k_min: usize,
/// Configured `alpha` for the pressured threshold.
pub conformal_alpha_pressured: f32,
/// Configured `alpha` for the severe threshold.
pub conformal_alpha_severe: f32,
/// Number of drift resets across both signals.
pub drift_reset_count: u64,
/// Number of samples rejected as outliers across both signals.
pub outliers_rejected: u64,
/// Number of snapshots offered to the calibration.
pub observations_total: u64,
/// Current pressured quantile of the load window, if available.
pub load_pressured_q: Option<f32>,
/// Current severe quantile of the load window, if available.
pub load_severe_q: Option<f32>,
/// Current pressured quantile of the PSI window, if available.
pub psi_pressured_q: Option<f32>,
/// Current severe quantile of the PSI window, if available.
pub psi_severe_q: Option<f32>,
}
/// Mutable governor state that lives behind the governor's mutex.
struct GovernorRuntimeState {
    /// Bounded history of recorded decisions, oldest first.
    recent_decisions: VecDeque<GovernorDecision>,
    /// Health reading of the most recent step.
    last_snapshot: Option<HealthSnapshot>,
    /// Reason of the most recent decision.
    last_reason: Option<GovernorDecisionReason>,
    /// Calibration state; present only in conformal mode.
    calibration: Option<GovernorCalibration>,
}

impl GovernorRuntimeState {
    /// Creates empty state, allocating calibration only for conformal mode.
    fn new(cfg: &GovernorConfig) -> Self {
        Self {
            recent_decisions: VecDeque::with_capacity(TELEMETRY_DECISION_HISTORY),
            last_snapshot: None,
            last_reason: None,
            calibration: match cfg.calibration_mode {
                CalibrationMode::Static => None,
                CalibrationMode::Conformal => Some(GovernorCalibration::new(cfg)),
            },
        }
    }
}
/// The responsiveness governor: samples a `HealthReader` on a background
/// thread and publishes a capacity percentage through an atomic.
struct Governor {
// Configuration captured at construction time.
cfg: GovernorConfig,
// Published capacity percentage; starts at 100.
current_capacity: AtomicU32,
// Consecutive healthy ticks carried between steps.
healthy_streak: AtomicU32,
// Number of steps that lowered capacity.
shrink_count: AtomicU64,
// Number of steps that raised capacity.
grow_count: AtomicU64,
// Total number of steps executed.
ticks_total: AtomicU64,
// Set once the background thread has been (or is being) spawned.
started: AtomicBool,
// Source of health snapshots.
reader: Arc<dyn HealthReader>,
// Decision history, last reading and calibration state.
runtime: Mutex<GovernorRuntimeState>,
// Construction time; decisions record their offset from it.
started_at: Instant,
}
impl Governor {
/// Creates a governor at 100% capacity with zeroed counters. No thread is
/// started here; see `ensure_started`.
fn new(cfg: GovernorConfig, reader: Arc<dyn HealthReader>) -> Self {
Self {
cfg,
current_capacity: AtomicU32::new(100),
healthy_streak: AtomicU32::new(0),
shrink_count: AtomicU64::new(0),
grow_count: AtomicU64::new(0),
ticks_total: AtomicU64::new(0),
started: AtomicBool::new(false),
reader,
runtime: Mutex::new(GovernorRuntimeState::new(&cfg)),
started_at: Instant::now(),
}
}
/// Spawns the background thread if the governor is enabled and the thread has
/// not been started yet. If spawning fails, the `started` flag is cleared so a
/// later call can try again.
fn ensure_started(self: &Arc<Self>) {
if self.cfg.disabled {
return;
}
// Only the caller that flips `started` from false to true goes on to spawn.
if self
.started
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return;
}
let me = Arc::clone(self);
let spawn_result = thread::Builder::new()
.name("cass-responsiveness-governor".into())
.spawn(move || me.run());
if let Err(err) = spawn_result {
// Allow a later call to retry the spawn.
self.started.store(false, Ordering::Release);
tracing::warn!(
error = %err,
"failed to spawn cass responsiveness governor thread; capacity pinned at 100% until a later start succeeds"
);
}
}
/// Thread body: performs one step, sleeps for `cfg.tick`, and repeats without
/// an exit condition.
fn run(&self) {
loop {
self.step_once();
thread::sleep(self.cfg.tick);
}
}
/// Returns the configuration to use for this tick's decision.
///
/// In static mode (or when no calibration state exists) this is `self.cfg`
/// unchanged. Otherwise the snapshot is fed to the calibration under the
/// runtime lock and, when calibrated thresholds are available, a copy of the
/// configuration with those thresholds is returned.
fn apply_calibration(
&self,
snapshot: &HealthSnapshot,
is_healthy_tick: bool,
) -> GovernorConfig {
if self.cfg.calibration_mode == CalibrationMode::Static {
return self.cfg;
}
// A poisoned lock is recovered rather than propagated as a panic.
let mut runtime = self
.runtime
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let Some(cal) = runtime.calibration.as_mut() else {
return self.cfg;
};
match cal.observe_and_compute_thresholds(snapshot, is_healthy_tick, &self.cfg) {
Some(effective) => effective.apply_to(&self.cfg),
None => self.cfg,
}
}
/// Runs one governor step: read a snapshot, decide the next capacity, update
/// the atomics and counters, record telemetry, and log capacity changes.
fn step_once(&self) {
let snapshot = self.reader.snapshot();
let prev = self.current_capacity.load(Ordering::Relaxed);
let streak = self.healthy_streak.load(Ordering::Relaxed);
// Whether a tick is "healthy" for calibration purposes is judged against the
// static thresholds in `self.cfg`, not the calibrated ones.
let static_is_pressured = snapshot.is_pressured(&self.cfg);
let is_healthy_tick = !static_is_pressured;
let effective_cfg = self.apply_calibration(&snapshot, is_healthy_tick);
let (next, next_streak, reason) = next_capacity(prev, streak, &snapshot, &effective_cfg);
if next < prev {
self.shrink_count.fetch_add(1, Ordering::Relaxed);
} else if next > prev {
self.grow_count.fetch_add(1, Ordering::Relaxed);
}
self.ticks_total.fetch_add(1, Ordering::Relaxed);
self.current_capacity.store(next, Ordering::Relaxed);
self.healthy_streak.store(next_streak, Ordering::Relaxed);
// Only capacity changes and pressure-related decisions enter the history;
// healthy holds are not recorded.
let record_this_tick = next != prev
|| matches!(
reason,
GovernorDecisionReason::Severe
| GovernorDecisionReason::Pressured
| GovernorDecisionReason::PressuredFloorHold
);
let mut runtime = self
.runtime
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
runtime.last_snapshot = Some(snapshot);
runtime.last_reason = Some(reason);
if record_this_tick {
// Keep the history bounded by evicting the oldest entry first.
if runtime.recent_decisions.len() >= TELEMETRY_DECISION_HISTORY {
runtime.recent_decisions.pop_front();
}
runtime.recent_decisions.push_back(GovernorDecision {
at_elapsed_ms: self.started_at.elapsed().as_millis() as u64,
prev_capacity_pct: prev,
next_capacity_pct: next,
reason,
snapshot,
});
}
// Release the lock before logging.
drop(runtime);
if next != prev {
tracing::info!(
prev_capacity_pct = prev,
next_capacity_pct = next,
reason = ?reason,
load_per_core = ?snapshot.load_per_core,
psi_cpu_some_avg10 = ?snapshot.psi_cpu_some_avg10,
"cass responsiveness governor updated capacity"
);
}
}
/// Test-only snapshot of the governor's state, counters and calibration.
/// Capacity is reported as 100 when the governor is disabled, either through
/// the environment variable or through `cfg.disabled`.
#[cfg(test)]
fn telemetry(&self) -> GovernorTelemetry {
let runtime = self
.runtime
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let recent: Vec<_> = runtime.recent_decisions.iter().copied().collect();
let last_snapshot = runtime.last_snapshot;
let last_reason = runtime.last_reason;
let calibration = runtime
.calibration
.as_ref()
.map(|cal| cal.telemetry(&self.cfg));
// Everything needed from the locked state has been copied out.
drop(runtime);
let disabled = env_bool_truthy("CASS_RESPONSIVENESS_DISABLE") || self.cfg.disabled;
let current = if disabled {
100
} else {
self.current_capacity.load(Ordering::Relaxed)
};
GovernorTelemetry {
current_capacity_pct: current,
resource_policy: ResourcePolicyTelemetry::from_config(&self.cfg),
healthy_streak: self.healthy_streak.load(Ordering::Relaxed),
shrink_count: self.shrink_count.load(Ordering::Relaxed),
grow_count: self.grow_count.load(Ordering::Relaxed),
ticks_total: self.ticks_total.load(Ordering::Relaxed),
disabled_via_env: disabled,
last_snapshot,
last_reason,
recent_decisions: recent,
calibration,
}
}
}
/// Process-wide governor, built on first access from the environment-derived
/// configuration and a `/proc`-backed health reader.
static GOVERNOR: LazyLock<Arc<Governor>> = LazyLock::new(|| {
Arc::new(Governor::new(
GovernorConfig::from_env(),
Arc::new(ProcHealthReader::new()),
))
});
pub(crate) fn current_capacity_pct() -> u32 {
if disabled_via_env() {
return 100;
}
let g = GOVERNOR.clone();
g.ensure_started();
g.current_capacity.load(Ordering::Relaxed)
}
/// True when `CASS_RESPONSIVENESS_DISABLE` currently holds a truthy value.
/// The variable is looked up on every call.
pub(crate) fn disabled_via_env() -> bool {
env_bool_truthy("CASS_RESPONSIVENESS_DISABLE")
}
/// Worker count to use for a request of `desired` workers, after applying the
/// global governor's worker policy and current capacity.
pub(crate) fn effective_worker_count(desired: usize) -> usize {
let g = GOVERNOR.clone();
scale_worker_count_with_policy(desired, current_capacity_pct(), &g.cfg)
}
/// In-flight byte budget to use for a request of `desired_bytes`, after
/// applying the global governor's byte policy and current capacity.
pub(crate) fn effective_inflight_byte_limit(desired_bytes: usize) -> usize {
let g = GOVERNOR.clone();
scale_inflight_byte_limit(desired_bytes, current_capacity_pct(), &g.cfg)
}
/// Builds a telemetry value from the environment-derived configuration alone,
/// without touching the global governor: capacity is reported as 100, all
/// counters are zero, and no readings or decisions are included.
pub(crate) fn telemetry_snapshot_passive() -> GovernorTelemetry {
    let cfg = GovernorConfig::from_env();
    let resource_policy = ResourcePolicyTelemetry::from_config(&cfg);

    let calibration = match cfg.calibration_mode {
        CalibrationMode::Static => None,
        CalibrationMode::Conformal => {
            // Settings only: there are no windows, counters or quantiles to report.
            let empty = CalibrationTelemetry {
                mode: cfg.calibration_mode,
                load_window_len: 0,
                psi_window_len: 0,
                conformal_k: cfg.conformal_k,
                conformal_k_min: cfg.conformal_k_min,
                conformal_alpha_pressured: cfg.conformal_alpha_pressured,
                conformal_alpha_severe: cfg.conformal_alpha_severe,
                drift_reset_count: 0,
                outliers_rejected: 0,
                observations_total: 0,
                load_pressured_q: None,
                load_severe_q: None,
                psi_pressured_q: None,
                psi_severe_q: None,
            };
            Some(empty)
        }
    };

    GovernorTelemetry {
        current_capacity_pct: 100,
        resource_policy,
        healthy_streak: 0,
        shrink_count: 0,
        grow_count: 0,
        ticks_total: 0,
        disabled_via_env: cfg.disabled,
        last_snapshot: None,
        last_reason: None,
        recent_decisions: Vec::new(),
        calibration,
    }
}
/// Looks up `key` and parses its trimmed value as `u32`; `None` when the
/// variable is unavailable or does not parse.
fn env_u32(key: &str) -> Option<u32> {
    let raw = dotenvy::var(key).ok()?;
    raw.trim().parse().ok()
}
/// Looks up `key` and parses its trimmed value as `f32`; `None` when the
/// variable is unavailable or does not parse.
fn env_f32(key: &str) -> Option<f32> {
    let raw = dotenvy::var(key).ok()?;
    raw.trim().parse().ok()
}
/// Looks up `key` and parses its trimmed value as `usize`; `None` when the
/// variable is unavailable or does not parse.
fn env_usize(key: &str) -> Option<usize> {
    let raw = dotenvy::var(key).ok()?;
    raw.trim().parse().ok()
}
/// True when `key` is available and its trimmed, ASCII-lowercased value is one
/// of `1`, `true`, `yes` or `on`. Anything else, including a missing variable,
/// is false.
fn env_bool_truthy(key: &str) -> bool {
    let Ok(raw) = dotenvy::var(key) else {
        return false;
    };
    let normalized = raw.trim().to_ascii_lowercase();
    ["1", "true", "yes", "on"].contains(&normalized.as_str())
}
/// Default in-flight byte cap derived from the memory currently reported as
/// available (or the fixed default when that cannot be read).
fn default_max_inflight_bytes() -> usize {
default_max_inflight_bytes_for_available(available_memory_bytes())
}
/// Default in-flight byte cap for a host with `available_bytes` of available
/// memory: `available_bytes / DEFAULT_MAX_INFLIGHT_MEMORY_FRACTION_DENOMINATOR`,
/// kept between `DEFAULT_MAX_INFLIGHT_BYTES` and
/// `DEFAULT_MAX_INFLIGHT_BYTES_CEILING`. When the amount is unknown the result
/// is `DEFAULT_MAX_INFLIGHT_BYTES`.
fn default_max_inflight_bytes_for_available(available_bytes: Option<u64>) -> usize {
    match available_bytes {
        None => DEFAULT_MAX_INFLIGHT_BYTES,
        Some(available) => {
            // If the ceiling does not fit in `usize`, `usize::MAX` stands in for it.
            let ceiling =
                usize::try_from(DEFAULT_MAX_INFLIGHT_BYTES_CEILING).unwrap_or(usize::MAX);
            let share = (available / DEFAULT_MAX_INFLIGHT_MEMORY_FRACTION_DENOMINATOR)
                .min(DEFAULT_MAX_INFLIGHT_BYTES_CEILING);
            usize::try_from(share)
                .unwrap_or(ceiling)
                .clamp(DEFAULT_MAX_INFLIGHT_BYTES, ceiling)
        }
    }
}
/// `MemAvailable:` from `/proc/meminfo`, converted from KiB to bytes. `None`
/// when the file cannot be read or the field is missing or malformed.
pub(crate) fn available_memory_bytes() -> Option<u64> {
    std::fs::read_to_string("/proc/meminfo")
        .ok()
        .and_then(|meminfo| proc_kib_field_bytes(&meminfo, "MemAvailable:"))
}
/// `MemTotal:` from `/proc/meminfo`, converted from KiB to bytes. `None` when
/// the file cannot be read or the field is missing or malformed.
pub(crate) fn total_memory_bytes() -> Option<u64> {
    std::fs::read_to_string("/proc/meminfo")
        .ok()
        .and_then(|meminfo| proc_kib_field_bytes(&meminfo, "MemTotal:"))
}
/// `VmRSS:` from `/proc/self/status`, converted from KiB to bytes. `None` when
/// the file cannot be read or the field is missing or malformed.
pub(crate) fn process_resident_memory_bytes() -> Option<u64> {
    std::fs::read_to_string("/proc/self/status")
        .ok()
        .and_then(|status| proc_kib_field_bytes(&status, "VmRSS:"))
}
/// Finds the first line of `contents` that starts with `prefix`, parses the
/// first whitespace-separated token after the prefix as a KiB count, and
/// returns it in bytes.
///
/// Returns `None` when no line matches, when the first matching line has no
/// parseable number, or when the multiplication by 1024 overflows `u64`.
fn proc_kib_field_bytes(contents: &str, prefix: &str) -> Option<u64> {
    let value_part = contents
        .lines()
        .find_map(|line| line.strip_prefix(prefix))?;
    let kib: u64 = value_part.split_whitespace().next()?.parse().ok()?;
    kib.checked_mul(1024)
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
fn cfg() -> GovernorConfig {
GovernorConfig {
available_parallelism: 16,
reserved_cores: 2,
max_workers: 14,
max_inflight_bytes: DEFAULT_MAX_INFLIGHT_BYTES,
min_inflight_bytes: DEFAULT_MIN_INFLIGHT_BYTES,
min_capacity_pct: DEFAULT_MIN_CAPACITY_PCT,
max_load_per_core: DEFAULT_MAX_LOAD_PER_CORE,
severe_load_per_core: DEFAULT_SEVERE_LOAD_PER_CORE,
max_psi_avg10: DEFAULT_MAX_PSI_AVG10,
severe_psi_avg10: DEFAULT_SEVERE_PSI_AVG10,
growth_consecutive_healthy_ticks: DEFAULT_GROWTH_CONSECUTIVE_HEALTHY_TICKS,
tick: Duration::from_millis(1),
disabled: false,
calibration_mode: CalibrationMode::Static,
conformal_k: DEFAULT_CONFORMAL_K,
conformal_k_min: DEFAULT_CONFORMAL_K_MIN,
conformal_alpha_pressured: DEFAULT_CONFORMAL_ALPHA_PRESSURED,
conformal_alpha_severe: DEFAULT_CONFORMAL_ALPHA_SEVERE,
drift_delta: DEFAULT_DRIFT_DELTA,
drift_lambda: DEFAULT_DRIFT_LAMBDA,
}
}
fn healthy() -> HealthSnapshot {
HealthSnapshot {
load_per_core: Some(0.1),
psi_cpu_some_avg10: Some(0.0),
}
}
fn pressured() -> HealthSnapshot {
HealthSnapshot {
load_per_core: Some(1.5),
psi_cpu_some_avg10: Some(5.0),
}
}
fn severe() -> HealthSnapshot {
HealthSnapshot {
load_per_core: Some(3.0),
psi_cpu_some_avg10: Some(80.0),
}
}
struct ScriptedReader {
snapshots: std::sync::Mutex<std::collections::VecDeque<HealthSnapshot>>,
fallback: HealthSnapshot,
}
impl ScriptedReader {
fn new(script: Vec<HealthSnapshot>) -> Self {
let fallback = *script.last().unwrap_or(&HealthSnapshot {
load_per_core: None,
psi_cpu_some_avg10: None,
});
Self {
snapshots: std::sync::Mutex::new(script.into()),
fallback,
}
}
}
impl HealthReader for ScriptedReader {
fn snapshot(&self) -> HealthSnapshot {
let mut guard = self.snapshots.lock().expect("scripted reader mutex");
guard.pop_front().unwrap_or(self.fallback)
}
}
fn build_test_governor(cfg: GovernorConfig, script: Vec<HealthSnapshot>) -> Governor {
Governor::new(cfg, Arc::new(ScriptedReader::new(script)))
}
#[test]
fn disabled_config_always_returns_full_capacity() {
let mut c = cfg();
c.disabled = true;
let snap = HealthSnapshot {
load_per_core: Some(10.0),
psi_cpu_some_avg10: Some(90.0),
};
let (next, streak, reason) = next_capacity(50, 0, &snap, &c);
assert_eq!(next, 100);
assert_eq!(streak, 0);
assert_eq!(reason, GovernorDecisionReason::Disabled);
}
#[test]
fn healthy_snapshot_does_not_grow_before_streak_threshold() {
let c = cfg();
let h = healthy();
let (next, streak, reason) = next_capacity(50, 0, &h, &c);
assert_eq!(next, 50);
assert_eq!(streak, 1);
assert_eq!(reason, GovernorDecisionReason::HealthyHold);
let (next, streak, reason) = next_capacity(next, streak, &h, &c);
assert_eq!(next, 50);
assert_eq!(streak, 2);
assert_eq!(reason, GovernorDecisionReason::HealthyHold);
let (next, streak, reason) = next_capacity(next, streak, &h, &c);
assert_eq!(next, 75);
assert_eq!(streak, 0, "streak must reset after a growth step");
assert_eq!(reason, GovernorDecisionReason::HealthyGrow);
}
#[test]
fn healthy_at_ceiling_is_classified_as_ceiling_hold() {
let c = cfg();
let h = healthy();
let (next, streak, reason) = next_capacity(100, 2, &h, &c);
assert_eq!(next, 100);
assert_eq!(reason, GovernorDecisionReason::HealthyCeilingHold);
assert_eq!(streak, 3, "ceiling hold keeps streak rather than resetting");
}
#[test]
fn moderate_pressure_shrinks_immediately() {
let c = cfg();
let p = pressured();
let (next, streak, reason) = next_capacity(100, 2, &p, &c);
assert_eq!(next, 75);
assert_eq!(streak, 0);
assert_eq!(reason, GovernorDecisionReason::Pressured);
let (next, streak, reason) = next_capacity(next, streak, &p, &c);
assert_eq!(next, 50);
assert_eq!(streak, 0);
assert_eq!(reason, GovernorDecisionReason::Pressured);
let (next, _, reason) = next_capacity(DEFAULT_MIN_CAPACITY_PCT, 0, &p, &c);
assert_eq!(next, DEFAULT_MIN_CAPACITY_PCT);
assert_eq!(reason, GovernorDecisionReason::PressuredFloorHold);
}
#[test]
fn severe_pressure_drops_straight_to_floor() {
let c = cfg();
let s = severe();
let (next, streak, reason) = next_capacity(100, 2, &s, &c);
assert_eq!(next, DEFAULT_MIN_CAPACITY_PCT);
assert_eq!(streak, 0);
assert_eq!(reason, GovernorDecisionReason::Severe);
}
#[test]
fn scale_worker_count_never_below_one_and_never_above_desired() {
assert_eq!(scale_worker_count(0, 100), 0);
assert_eq!(scale_worker_count(16, 100), 16);
assert_eq!(scale_worker_count(16, 50), 8);
assert_eq!(scale_worker_count(16, 25), 4);
assert_eq!(scale_worker_count(1, 1), 1);
assert!(scale_worker_count(4, 100) <= 4);
}
/// Table-driven check of `default_reserved_cores_for_available` for 1, 2, 8
/// and 64 available cores.
#[test]
fn default_reserved_core_policy_preserves_interactive_headroom() {
    // (available cores, expected reserved cores)
    let cases = [(1, 0), (2, 1), (8, 2), (64, 8)];
    for (available, expected_reserved) in cases {
        assert_eq!(
            default_reserved_cores_for_available(available),
            expected_reserved
        );
    }
}
/// With 16 available cores, 4 reserved and `max_workers` of 20, checks the
/// worker count produced by `scale_worker_count_with_policy` for several
/// desired counts and capacity percentages.
#[test]
fn worker_policy_applies_reserved_cores_and_live_capacity() {
    let mut policy = cfg();
    policy.available_parallelism = 16;
    policy.reserved_cores = 4;
    policy.max_workers = 20;

    // (desired workers, capacity percent, expected result)
    let cases = [(64, 100, 12), (64, 50, 6), (2, 50, 1), (0, 50, 0)];
    for (desired, capacity_pct, expected) in cases {
        assert_eq!(
            scale_worker_count_with_policy(desired, capacity_pct, &policy),
            expected
        );
    }
}
/// With a 128 MiB maximum and an 8 MiB minimum configured, checks the limit
/// produced by `scale_inflight_byte_limit` for oversized, small and zero
/// requested budgets. Sizes are written as `n << 20`, i.e. `n` MiB.
#[test]
fn inflight_byte_policy_caps_and_scales_without_increasing_small_budgets() {
    let mut limits = cfg();
    limits.max_inflight_bytes = 128 << 20;
    limits.min_inflight_bytes = 8 << 20;

    // (requested bytes, capacity percent, expected limit)
    let cases = [
        (512 << 20, 100, 128 << 20),
        (512 << 20, 25, 32 << 20),
        (4 << 20, 25, 4 << 20),
        (0, 25, 0),
    ];
    for (requested, capacity_pct, expected) in cases {
        assert_eq!(
            scale_inflight_byte_limit(requested, capacity_pct, &limits),
            expected
        );
    }
}
/// Checks `default_max_inflight_bytes_for_available` for an unknown amount of
/// memory and for 2 GiB, 256 GiB and 1024 GiB hosts.
#[test]
fn default_inflight_byte_budget_scales_with_available_memory() {
    let gib = 1024_u64 * 1024 * 1024;

    let unknown_memory = default_max_inflight_bytes_for_available(None);
    assert_eq!(unknown_memory, DEFAULT_MAX_INFLIGHT_BYTES);

    let small_host = default_max_inflight_bytes_for_available(Some(2 * gib));
    assert_eq!(
        small_host, DEFAULT_MAX_INFLIGHT_BYTES,
        "small hosts keep the old conservative floor"
    );

    let large_host = default_max_inflight_bytes_for_available(Some(256 * gib));
    assert_eq!(
        large_host,
        8 * 1024 * 1024 * 1024,
        "256 GiB hosts can keep materially more work in flight"
    );

    let huge_host = default_max_inflight_bytes_for_available(Some(1024 * gib));
    // The ceiling constant is a u64; saturate if it does not fit in usize.
    let ceiling = usize::try_from(DEFAULT_MAX_INFLIGHT_BYTES_CEILING).unwrap_or(usize::MAX);
    assert_eq!(huge_host, ceiling, "very large hosts are still bounded");
}
/// Sets a dedicated probe environment variable to a series of truthy and falsy
/// spellings (and finally removes it) and checks what `env_bool_truthy`
/// reports for each. The variable's prior value, if any, is restored at the
/// end of the test.
#[test]
fn env_disable_signal_is_truthy_aware() {
    let probe = "__CASS_RESP_DISABLE_PARSE_PROBE__";
    let prior = std::env::var(probe).ok();

    // SAFETY: mutating the process environment is only sound while no other
    // thread accesses it concurrently.
    // NOTE(review): this test is not marked `#[serial]` — confirm that no
    // concurrently running test reads or writes the environment.
    let set_probe = |value: &str| unsafe { std::env::set_var(probe, value) };
    // SAFETY: same requirement as `set_probe` above.
    let clear_probe = || unsafe { std::env::remove_var(probe) };

    for truthy in ["1", "true", "True", "YES", "on"] {
        set_probe(truthy);
        assert!(
            env_bool_truthy(probe),
            "expected `{truthy}` to be recognized as truthy"
        );
    }

    for falsy in ["0", "false", "No", "off", ""] {
        set_probe(falsy);
        assert!(
            !env_bool_truthy(probe),
            "expected `{falsy}` to be recognized as falsy"
        );
    }

    clear_probe();
    assert!(!env_bool_truthy(probe), "absent env var must be falsy");

    // Put back whatever value the variable held before the test ran.
    if let Some(v) = prior {
        set_probe(&v);
    }
}
/// A snapshot with neither a load nor a PSI reading must classify as neither
/// severe nor pressured, and `next_capacity` is expected to treat it as a
/// `HealthyHold` (capacity unchanged at 80, streak 0 -> 1).
#[test]
fn snapshot_classification_tolerates_missing_signals() {
    let config = cfg();
    let blank = HealthSnapshot {
        psi_cpu_some_avg10: None,
        load_per_core: None,
    };

    let severe_flag = blank.is_severe(&config);
    assert!(!severe_flag);
    let pressured_flag = blank.is_pressured(&config);
    assert!(!pressured_flag);

    let (capacity, healthy_streak, decision) = next_capacity(80, 0, &blank, &config);
    assert_eq!(capacity, 80);
    assert_eq!(healthy_streak, 1);
    assert_eq!(decision, GovernorDecisionReason::HealthyHold);
}
/// Runs one severe tick followed by nine healthy ticks and checks the
/// resulting counters and the sequence of recorded decision reasons.
#[test]
fn telemetry_counts_shrink_and_grow_events() {
    let script: Vec<_> = std::iter::once(severe())
        .chain(std::iter::repeat_n(healthy(), 9))
        .collect();
    let tick_count = script.len();
    let gov = build_test_governor(cfg(), script);
    for _ in 0..tick_count {
        gov.step_once();
    }

    let tele = gov.telemetry();
    assert_eq!(
        tele.current_capacity_pct, 100,
        "should have recovered to ceiling after 9 healthy ticks"
    );
    assert_eq!(tele.shrink_count, 1, "one severe drop = one shrink");
    assert_eq!(
        tele.grow_count, 3,
        "recovery from 25 to 100 in 25pp steps = 3 grow events"
    );
    assert_eq!(tele.ticks_total, 10);

    let reasons: Vec<GovernorDecisionReason> = tele
        .recent_decisions
        .iter()
        .map(|decision| decision.reason)
        .collect();
    let expected = [
        GovernorDecisionReason::Severe,
        GovernorDecisionReason::HealthyGrow,
        GovernorDecisionReason::HealthyGrow,
        GovernorDecisionReason::HealthyGrow,
    ];
    assert_eq!(reasons, expected);
}
/// Drives `TELEMETRY_DECISION_HISTORY + 50` pressured ticks and checks that
/// the recorded decision list stops at `TELEMETRY_DECISION_HISTORY` entries
/// while `ticks_total` still counts every tick.
#[test]
fn telemetry_ring_buffer_is_bounded() {
    let count = TELEMETRY_DECISION_HISTORY + 50;
    let script = vec![pressured(); count];
    let gov = build_test_governor(cfg(), script);
    for _ in 0..count {
        gov.step_once();
    }

    let tele = gov.telemetry();
    let recorded = tele.recent_decisions.len();
    assert_eq!(
        recorded, TELEMETRY_DECISION_HISTORY,
        "ring buffer must saturate at its cap"
    );
    assert_eq!(tele.ticks_total, count as u64);

    // Only the two endpoints are compared: the newest entry must not be older
    // than the oldest one.
    let oldest = tele.recent_decisions.first().unwrap();
    let newest = tele.recent_decisions.last().unwrap();
    assert!(
        newest.at_elapsed_ms >= oldest.at_elapsed_ms,
        "ring buffer must preserve chronological order"
    );
}
/// Two healthy ticks from the starting capacity must leave the decision
/// history empty and the capacity at 100.
#[test]
fn telemetry_skips_healthy_hold_ticks() {
    let script = vec![healthy(); 2];
    let tick_count = script.len();
    let gov = build_test_governor(cfg(), script);
    for _ in 0..tick_count {
        gov.step_once();
    }

    let tele = gov.telemetry();
    assert!(
        tele.recent_decisions.is_empty(),
        "healthy-hold ticks should not pollute the ring buffer"
    );
    assert_eq!(tele.current_capacity_pct, 100);
}
/// Poisons the governor's `runtime` mutex by panicking in a helper thread
/// while the lock is held, then checks that stepping and telemetry still work.
#[test]
fn telemetry_survives_mutex_poison() {
    let script: Vec<_> = std::iter::repeat_n(pressured(), 4).collect();
    let gov = Arc::new(build_test_governor(cfg(), script));

    let poisoner = {
        let shared = Arc::clone(&gov);
        std::thread::spawn(move || {
            // The guard must stay bound (not `_`) so it is still held when the
            // panic unwinds; that is what poisons the mutex.
            let _held = shared
                .runtime
                .lock()
                .expect("fresh mutex should not be poisoned");
            panic!("intentional poison for regression test");
        })
    };
    // The join result is the helper's panic payload; it is deliberately ignored.
    let _ = poisoner.join();

    assert!(
        gov.runtime.is_poisoned(),
        "mutex must be poisoned after the helper thread's panic"
    );

    for _ in 0..4 {
        gov.step_once();
    }

    let tele = gov.telemetry();
    assert_eq!(
        tele.ticks_total, 4,
        "atomics update regardless of mutex state"
    );
    assert!(
        !tele.recent_decisions.is_empty(),
        "telemetry must continue to record after mutex poison, got: {tele:?}"
    );
    let first_reason = tele.recent_decisions.first().unwrap().reason;
    assert_eq!(
        first_reason,
        GovernorDecisionReason::Pressured,
        "first recorded decision after poison should still classify correctly"
    );
}
/// Serializes telemetry after one severe and one pressured tick and checks,
/// by substring search, that the JSON mentions every expected key and at
/// least one snake_case reason tag.
#[test]
fn telemetry_serializes_to_json_with_expected_keys() {
    let script = vec![severe(), pressured()];
    let tick_count = script.len();
    let gov = build_test_governor(cfg(), script);
    for _ in 0..tick_count {
        gov.step_once();
    }

    let tele = gov.telemetry();
    let json = serde_json::to_string(&tele).expect("telemetry serializes");

    let expected_keys = [
        "current_capacity_pct",
        "resource_policy",
        "reserved_cores",
        "max_inflight_bytes",
        "shrink_count",
        "grow_count",
        "ticks_total",
        "disabled_via_env",
        "last_snapshot",
        "last_reason",
        "recent_decisions",
        "healthy_streak",
    ];
    for key in expected_keys {
        assert!(
            json.contains(key),
            "expected JSON to contain `{key}`, got: {json}"
        );
    }

    let reason_tags = ["\"severe\"", "\"pressured\""];
    let has_reason_tag = reason_tags.iter().any(|tag| json.contains(*tag));
    assert!(
        has_reason_tag,
        "expected snake_case reason tag in JSON: {json}"
    );
}
/// Builds a test governor over `script`, steps it once per scripted snapshot,
/// and returns the governor together with the `current_capacity` value read
/// after each step.
fn run_script_and_trace(
    cfg: GovernorConfig,
    script: Vec<HealthSnapshot>,
) -> (Governor, Vec<u32>) {
    // Take the length before `script` is moved into the governor.
    let tick_count = script.len();
    let gov = build_test_governor(cfg, script);
    let capacities: Vec<u32> = (0..tick_count)
        .map(|_| {
            gov.step_once();
            gov.current_capacity.load(Ordering::Relaxed)
        })
        .collect();
    (gov, capacities)
}
/// Counts how many adjacent pairs in `capacities` hold different values.
/// Returns 0 for slices with fewer than two elements.
fn transitions(capacities: &[u32]) -> usize {
    let following = capacities.iter().skip(1);
    capacities
        .iter()
        .zip(following)
        .filter(|(previous, current)| previous != current)
        .count()
}
/// Alternates pressured (even ticks) and healthy (odd ticks) snapshots for 100
/// ticks and checks that capacity never grows, shrinks exactly three times,
/// and changes value at most three times.
#[test]
fn anti_flap_alternating_pressured_healthy_never_grows() {
    let script: Vec<_> = (0..100)
        .map(|i| if i % 2 == 0 { pressured() } else { healthy() })
        .collect();
    let (gov, capacities) = run_script_and_trace(cfg(), script);

    let tele = gov.telemetry();
    assert_eq!(
        tele.grow_count, 0,
        "alternating flap must never produce a grow event"
    );
    assert_eq!(tele.shrink_count, 3, "flap shrinks until floor, then holds");

    let t = transitions(&capacities);
    assert!(
        t <= 3,
        "alternating flap must not oscillate capacity; saw {t} transitions over {} ticks",
        capacities.len()
    );
}
/// Feeds 60 snapshots whose load alternates between 1.24 and 1.26 (PSI fixed
/// at 1.0) and checks that capacity changes value at most three times.
#[test]
fn anti_flap_near_threshold_jitter_does_not_oscillate() {
    let script: Vec<_> = (0..60)
        .map(|i| {
            let load = if i % 2 == 0 { 1.24 } else { 1.26 };
            HealthSnapshot {
                load_per_core: Some(load),
                psi_cpu_some_avg10: Some(1.0),
            }
        })
        .collect();
    let (_gov, capacities) = run_script_and_trace(cfg(), script);

    let t = transitions(&capacities);
    assert!(
        t <= 3,
        "threshold jitter must not cause capacity oscillation; saw {t} transitions"
    );
}
/// Replays three rounds of five severe ticks followed by nine healthy ticks
/// and checks the shrink/grow counters, the exact number of capacity
/// transitions, and that the transition rate stays at or below one in three.
#[test]
fn anti_flap_burst_recovery_respects_hysteresis() {
    let mut script = Vec::new();
    for _ in 0..3 {
        script.extend(std::iter::repeat_with(severe).take(5));
        script.extend(std::iter::repeat_with(healthy).take(9));
    }
    let (gov, capacities) = run_script_and_trace(cfg(), script);

    let tele = gov.telemetry();
    assert_eq!(tele.shrink_count, 3, "one shrink per severe burst");
    assert_eq!(
        tele.grow_count, 9,
        "each 9-tick healthy window produces 3 grow steps"
    );

    let t = transitions(&capacities);
    assert_eq!(t, 11);
    let rate = (t as f64) / (capacities.len() as f64);
    assert!(
        rate <= 1.0 / 3.0,
        "transition rate must respect the 3-tick hysteresis"
    );
}
/// Replays 120 ticks of a repeating pattern — `DEFAULT_GROWTH_CONSECUTIVE_HEALTHY_TICKS`
/// healthy snapshots followed by one severe snapshot — and checks that the
/// fraction of ticks on which capacity changed is at most 0.55.
#[test]
fn anti_flap_transition_rate_upper_bound() {
    let growth_ticks = DEFAULT_GROWTH_CONSECUTIVE_HEALTHY_TICKS as usize;
    // Within each period of `growth_ticks + 1` ticks, only the last is severe.
    let period = growth_ticks + 1;
    let script: Vec<_> = (0..120_usize)
        .map(|i| {
            if i % period == growth_ticks {
                severe()
            } else {
                healthy()
            }
        })
        .collect();
    let tick_count = script.len();
    let (_gov, capacities) = run_script_and_trace(cfg(), script);

    let rate = transitions(&capacities) as f64 / tick_count as f64;
    assert!(
        rate <= 0.55,
        "worst-case transition rate must stay bounded; saw {rate} over {tick_count} ticks"
    );
}
/// On the sorted window 0.0..=255.0, the α=0.05 quantile is expected to be
/// 244 and the α=0.01 quantile is expected to be strictly larger.
#[test]
fn conformal_quantile_index_matches_vovk_theorem_1() {
    let sorted: Vec<f32> = (0..256_u16).map(f32::from).collect();

    let q = conformal_quantile_sorted(&sorted, 0.05).unwrap();
    let error = (q - 244.0).abs();
    assert!(
        error < 1e-6,
        "K=256, α=0.05 expected sorted[244]=244 but got {q}"
    );

    let stricter = conformal_quantile_sorted(&sorted, 0.01).unwrap();
    assert!(stricter > q, "α=0.01 must produce q̂ ≥ α=0.05 q̂");
}
/// With only five samples and α=0.01, the quantile is expected to be the
/// largest element of the window.
#[test]
fn conformal_quantile_clamps_to_last_element_on_tiny_window() {
    let window = [0.0_f32, 1.0, 2.0, 3.0, 4.0];
    let quantile = conformal_quantile_sorted(&window, 0.01);
    assert_eq!(quantile.unwrap(), 4.0);
}
/// α values of 0, 1 and NaN must all yield `None`.
#[test]
fn conformal_quantile_rejects_pathological_alpha() {
    let window = [1.0_f32, 2.0, 3.0];
    for alpha in [0.0, 1.0, f32::NAN] {
        assert!(conformal_quantile_sorted(&window, alpha).is_none());
    }
}
/// Calibrates on 256 evenly spaced values (`i * 0.0039`), takes the α=0.05
/// quantile, and checks that between 90% and 100% of 1024 evenly spaced test
/// points in [0, 1) fall at or below it.
#[test]
fn conformal_coverage_on_iid_uniform_meets_guarantee() {
    let mut stream = CalibrationStream::new(256);
    for v in (0..256).map(|i| (i as f32) * 0.0039) {
        stream.push(v);
    }
    let q = stream.quantile(0.05, 32).unwrap();

    let test_n = 1024;
    let covered = (0..test_n)
        .filter(|&i| (i as f32) * (1.0 / test_n as f32) <= q)
        .count();
    let coverage = covered as f32 / test_n as f32;
    assert!(
        (0.90..=1.00).contains(&coverage),
        "observed coverage {coverage} outside [0.90, 1.00] window"
    );
}
/// Fills a 64-slot window with 32 samples of 1.0 followed by 32 samples of
/// 1.2, then checks that 10.0 and 1.5 are flagged as outliers while 1.15 is
/// not.
#[test]
fn mad_rejects_obvious_outlier_on_stationary_stream() {
    let mut window = CalibrationStream::new(64);
    for level in [1.0, 1.2] {
        for _ in 0..32 {
            window.push(level);
        }
    }

    assert!(window.is_outlier(10.0));
    assert!(window.is_outlier(1.5));
    assert!(!window.is_outlier(1.15));
}
/// With no samples pushed, nothing may be flagged as an outlier.
#[test]
fn mad_is_not_an_outlier_on_empty_window() {
    let empty_window = CalibrationStream::new(16);
    let flagged = empty_window.is_outlier(100.0);
    assert!(!flagged);
}
/// Feeds 10,000 pseudo-random values in roughly [-0.01, 0.01] (generated by a
/// fixed-seed linear congruential recurrence) and checks that the detector
/// trips fewer than 100 times, resetting it after every trip.
#[test]
fn page_hinkley_does_not_trip_on_stationary_stream() {
    let mut ph = PageHinkley::new(0.01, 0.5);
    let mut state: u32 = 12345;
    let mut trips = 0;
    for _ in 0..10_000 {
        state = state.wrapping_mul(1103515245).wrapping_add(12345);
        // Map the generator state to [0, 1], then rescale to [-0.01, 0.01].
        let unit = state as f32 / u32::MAX as f32;
        let r = unit * 0.02 - 0.01;
        if !ph.observe(r) {
            continue;
        }
        trips += 1;
        ph.reset();
    }
    assert!(
        trips < 100,
        "page-hinkley tripped {trips} times on stationary stream (expected < 100)"
    );
}
/// After 500 observations of 0.0, the detector must trip at least once within
/// the next 500 observations of 0.5. Observation stops at the first trip.
#[test]
fn page_hinkley_trips_on_clear_mean_shift() {
    let mut ph = PageHinkley::new(0.01, 0.5);
    for _ in 0..500 {
        ph.observe(0.0);
    }

    // `any` stops at the first `true`, so no further values are observed
    // once the detector has tripped.
    let tripped = (0..500).any(|_| ph.observe(0.5));
    if tripped {
        ph.reset();
    }
    assert!(
        tripped,
        "page-hinkley missed a clear 0.5-magnitude mean shift"
    );
}
/// Returns the base test configuration from `cfg()` switched to conformal
/// calibration with a 64-sample window and a 16-sample minimum.
fn conformal_cfg() -> GovernorConfig {
    let mut config = cfg();
    config.calibration_mode = CalibrationMode::Conformal;
    config.conformal_k = 64;
    config.conformal_k_min = 16;
    config.conformal_alpha_pressured = 0.05;
    config.conformal_alpha_severe = 0.01;
    config.drift_delta = 0.01;
    config.drift_lambda = 0.5;
    config
}
/// Runs 20 healthy ticks in conformal mode and checks that nothing shrank and
/// that calibration telemetry is present, reports conformal mode, and has a
/// non-empty load window.
#[test]
fn conformal_mode_static_behavior_until_k_min_reached() {
    let script = vec![healthy(); 20];
    let tick_count = script.len();
    let gov = build_test_governor(conformal_cfg(), script);
    for _ in 0..tick_count {
        gov.step_once();
    }

    let tele = gov.telemetry();
    assert_eq!(
        tele.shrink_count, 0,
        "no shrinks expected during healthy-only warm-up"
    );

    let calibration = tele
        .calibration
        .expect("conformal mode must emit calibration telemetry");
    assert_eq!(calibration.mode, CalibrationMode::Conformal);
    assert!(calibration.load_window_len > 0);
}
/// Feeds 96 pseudo-random snapshots into a `GovernorCalibration` and checks,
/// whenever thresholds are returned, that each severe threshold is strictly
/// above its pressured counterpart. Finally checks that both load quantiles
/// appear in the calibration telemetry.
#[test]
fn conformal_mode_never_inverts_severe_vs_pressured_invariant() {
    let cfg_conf = conformal_cfg();
    let mut cal = GovernorCalibration::new(&cfg_conf);
    let mut rng_state: u32 = 987654321;

    for _ in 0..96 {
        rng_state = rng_state.wrapping_mul(1103515245).wrapping_add(12345);
        let v = 0.05 + (rng_state as f32 / u32::MAX as f32);
        let snap = HealthSnapshot {
            load_per_core: Some(v),
            psi_cpu_some_avg10: Some(v * 10.0),
        };

        // No thresholds for this tick means there is nothing to compare.
        let Some(t) = cal.observe_and_compute_thresholds(&snap, true, &cfg_conf) else {
            continue;
        };
        assert!(
            t.severe_load > t.pressured_load,
            "PO-C2-3 violated for load: severe {} !> pressured {}",
            t.severe_load,
            t.pressured_load
        );
        assert!(
            t.severe_psi > t.pressured_psi,
            "PO-C2-3 violated for psi: severe {} !> pressured {}",
            t.severe_psi,
            t.pressured_psi
        );
    }

    let tele = cal.telemetry(&cfg_conf);
    let both_load_quantiles = tele.load_pressured_q.is_some() && tele.load_severe_q.is_some();
    assert!(
        both_load_quantiles,
        "expected both load quantiles to be emitted by the end of the loop"
    );
}
/// Runs 80 identical healthy ticks in conformal mode and checks that nothing
/// shrank and that calibration telemetry is still emitted.
#[test]
fn conformal_mode_falls_back_to_static_on_degenerate_window() {
    let script = vec![healthy(); 80];
    let tick_count = script.len();
    let gov = build_test_governor(conformal_cfg(), script);
    for _ in 0..tick_count {
        gov.step_once();
    }

    let tele = gov.telemetry();
    assert_eq!(tele.shrink_count, 0);
    assert!(tele.calibration.is_some());
}
/// Serializes telemetry after 40 healthy ticks in conformal mode and checks,
/// by substring search, that the JSON contains the calibration block's keys
/// and the `"mode":"conformal"` pair.
#[test]
fn conformal_telemetry_serializes_with_calibration_block() {
    let script = vec![healthy(); 40];
    let tick_count = script.len();
    let gov = build_test_governor(conformal_cfg(), script);
    for _ in 0..tick_count {
        gov.step_once();
    }

    let tele = gov.telemetry();
    let json = serde_json::to_string(&tele).expect("telemetry serialization");

    let expected_fragments = [
        "calibration",
        "\"mode\":\"conformal\"",
        "load_window_len",
        "psi_window_len",
        "conformal_k",
        "drift_reset_count",
        "outliers_rejected",
    ];
    for key in expected_fragments {
        assert!(
            json.contains(key),
            "expected JSON to contain `{key}`; got: {json}"
        );
    }
}
/// Generates `n` deterministic snapshots from a fixed-seed linear congruential
/// recurrence. Draws below 0.95 map to a load of `0.2 + u * 0.3`; the rest map
/// to `0.9 + u * 0.2`. PSI is always eight times the load.
fn idle_dev_box_trace(n: usize) -> Vec<HealthSnapshot> {
    let mut trace = Vec::with_capacity(n);
    let mut state: u32 = 42;
    for _ in 0..n {
        state = state.wrapping_mul(1103515245).wrapping_add(12345);
        let u = (state as f32 / u32::MAX as f32).clamp(0.0, 0.9999);
        let (base, slope) = if u < 0.95 { (0.2, 0.3) } else { (0.9, 0.2) };
        let load = base + u * slope;
        trace.push(HealthSnapshot {
            load_per_core: Some(load),
            psi_cpu_some_avg10: Some(load * 8.0),
        });
    }
    trace
}
/// Replays `script` through a fresh test governor and returns its telemetry.
///
/// The tick interval in `cfg` is overridden to 1 ms before the governor is
/// built, and `step_once` is called once per scripted snapshot.
fn run_replay(mut cfg: GovernorConfig, script: Vec<HealthSnapshot>) -> GovernorTelemetry {
    cfg.tick = Duration::from_millis(1);
    // Record the length first so the script can be moved into the governor
    // instead of being cloned just to keep its length available.
    let tick_count = script.len();
    let gov = build_test_governor(cfg, script);
    for _ in 0..tick_count {
        gov.step_once();
    }
    gov.telemetry()
}
/// Replays the same 2,048-tick idle trace under static and conformal
/// calibration, prints both sets of shrink/grow counters, and checks that the
/// conformal run shrank at most 1,024 times.
#[test]
#[serial]
fn conformal_vs_static_idle_dev_trace_is_not_materially_worse() {
    let script = idle_dev_box_trace(2_048);

    let mut static_cfg = cfg();
    static_cfg.calibration_mode = CalibrationMode::Static;

    let mut conf_cfg = cfg();
    conf_cfg.calibration_mode = CalibrationMode::Conformal;
    conf_cfg.conformal_k = 256;
    conf_cfg.conformal_k_min = 32;
    conf_cfg.conformal_alpha_pressured = 0.05;
    conf_cfg.conformal_alpha_severe = 0.01;
    conf_cfg.drift_delta = 0.01;
    conf_cfg.drift_lambda = 0.5;

    let static_tele = run_replay(static_cfg, script.clone());
    let conformal_tele = run_replay(conf_cfg, script);

    // The static counters are reported for comparison only; they are not asserted on.
    eprintln!(
        "replay trace: static shrinks={}, conformal shrinks={}, \
         static grows={}, conformal grows={}",
        static_tele.shrink_count,
        conformal_tele.shrink_count,
        static_tele.grow_count,
        conformal_tele.grow_count,
    );

    let conformal_shrinks = conformal_tele.shrink_count;
    assert!(
        conformal_shrinks <= 1024,
        "conformal shrink_count={} is more than 10x the α=0.05 theoretical FP budget — conformal calibration is misbehaving",
        conformal_tele.shrink_count
    );
}
/// Replays 128 severe ticks under static and conformal calibration and checks
/// that both runs end at the configured minimum capacity.
#[test]
#[serial]
fn conformal_vs_static_under_sustained_pressure_shrinks_similarly() {
    let severe_trace = vec![severe(); 128];

    let mut static_cfg = cfg();
    static_cfg.calibration_mode = CalibrationMode::Static;

    let mut conf_cfg = cfg();
    conf_cfg.calibration_mode = CalibrationMode::Conformal;
    conf_cfg.conformal_k = 256;
    conf_cfg.conformal_k_min = 32;
    conf_cfg.conformal_alpha_pressured = 0.05;
    conf_cfg.conformal_alpha_severe = 0.01;
    conf_cfg.drift_delta = 0.01;
    conf_cfg.drift_lambda = 0.5;

    let static_tele = run_replay(static_cfg, severe_trace.clone());
    let conformal_tele = run_replay(conf_cfg, severe_trace);

    assert_eq!(static_tele.current_capacity_pct, cfg().min_capacity_pct);
    assert_eq!(
        conformal_tele.current_capacity_pct,
        cfg().min_capacity_pct,
        "conformal must not be more permissive than static under severe load"
    );
}
}