use std::time::Duration;
use crate::resil::{
AdaptiveShedderConfig, RollingWindow, ShedderError, ShedderSnapshot, WindowOutcome,
};
/// Minimum round-trip time used when the rolling window reports no minimum
/// average latency.
const DEFAULT_MIN_RT: Duration = Duration::from_millis(1_000);
/// Top of the CPU usage scale; `cpu_usage_millis` readings are interpreted on
/// `0..=CPU_MAX_MILLIS`, and larger readings are treated as saturated.
const CPU_MAX_MILLIS: u32 = 1_000;
/// Smallest overload factor ever applied, so some of the estimated capacity
/// stays admissible even when CPU usage is saturated.
const OVERLOAD_FACTOR_LOWER_BOUND: f64 = 0.1;
/// Weight the previous value keeps in the in-flight exponential moving average.
const FLYING_BETA: f64 = 0.9;
#[derive(Debug)]
pub(crate) struct ShedderState {
config: AdaptiveShedderConfig,
in_flight: usize,
avg_in_flight: f64,
overload_started_at: Option<std::time::Instant>,
dropped_recently: bool,
last_cpu_usage_millis: u32,
window: RollingWindow,
}
impl ShedderState {
pub(crate) fn new(config: AdaptiveShedderConfig) -> Self {
Self {
window: RollingWindow::new(config.window),
config: AdaptiveShedderConfig {
max_in_flight: config.max_in_flight.max(1),
overload_in_flight_percent: config.overload_in_flight_percent.min(100),
cpu_threshold_millis: config.cpu_threshold_millis.min(1000),
min_overload_factor_percent: config.min_overload_factor_percent.min(100),
..config
},
in_flight: 0,
avg_in_flight: 0.0,
overload_started_at: None,
dropped_recently: false,
last_cpu_usage_millis: 0,
}
}
pub(crate) fn allow(&mut self, cpu_usage_millis: u32) -> Result<(), ShedderError> {
self.last_cpu_usage_millis = cpu_usage_millis;
if self.should_drop(cpu_usage_millis) {
self.window.record(WindowOutcome::Drop);
self.dropped_recently = true;
return Err(ShedderError);
}
self.in_flight += 1;
Ok(())
}
pub(crate) fn record_success(&mut self, latency: Duration) {
self.in_flight = self.in_flight.saturating_sub(1);
self.update_average_in_flight();
self.window
.record_with_latency(WindowOutcome::Success, latency);
}
pub(crate) fn record_failure(&mut self, latency: Duration) {
self.in_flight = self.in_flight.saturating_sub(1);
self.update_average_in_flight();
self.window
.record_with_latency(WindowOutcome::Failure, latency);
}
pub(crate) fn snapshot(&self) -> ShedderSnapshot {
ShedderSnapshot {
in_flight: self.in_flight,
avg_in_flight: self.avg_in_flight,
cpu_usage_millis: self.last_cpu_usage_millis,
window: self.window.snapshot(),
}
}
fn should_drop(&mut self, cpu_usage_millis: u32) -> bool {
if self.in_flight >= self.config.max_in_flight {
return true;
}
let snapshot = self.window.snapshot();
if snapshot.total() < self.config.min_request_count {
return false;
}
let average_latency = snapshot.average_latency();
let overloaded = self.system_overloaded(cpu_usage_millis)
|| self.still_hot()
|| average_latency.is_some_and(|latency| latency >= self.config.max_latency);
if !overloaded {
self.overload_started_at = None;
self.dropped_recently = false;
return false;
}
self.overload_started_at
.get_or_insert_with(std::time::Instant::now);
self.high_throughput(cpu_usage_millis)
}
fn high_throughput(&self, cpu_usage_millis: u32) -> bool {
let max_flight = self.max_flight() * self.overload_factor(cpu_usage_millis);
self.avg_in_flight > max_flight && self.in_flight as f64 > max_flight
}
fn max_flight(&self) -> f64 {
let max_pass = self.window.max_successes_per_bucket().max(1) as f64;
let min_rt = self.window.min_average_latency().unwrap_or(DEFAULT_MIN_RT);
let bucket_duration = self
.config
.window
.bucket_duration
.max(Duration::from_millis(1));
let window_scale = 1.0 / bucket_duration.as_secs_f64() / 1000.0;
(max_pass * min_rt.as_secs_f64() * 1000.0 * window_scale).max(1.0)
}
fn overload_factor(&self, cpu_usage_millis: u32) -> f64 {
if cpu_usage_millis < self.config.cpu_threshold_millis {
return 1.0;
}
let threshold = self.config.cpu_threshold_millis.min(CPU_MAX_MILLIS - 1);
let denominator = CPU_MAX_MILLIS.saturating_sub(threshold).max(1) as f64;
let raw = CPU_MAX_MILLIS.saturating_sub(cpu_usage_millis.min(CPU_MAX_MILLIS)) as f64
/ denominator;
raw.clamp(self.min_overload_factor(), 1.0)
}
fn min_overload_factor(&self) -> f64 {
(self.config.min_overload_factor_percent as f64 / 100.0)
.clamp(OVERLOAD_FACTOR_LOWER_BOUND, 1.0)
}
fn system_overloaded(&mut self, cpu_usage_millis: u32) -> bool {
if cpu_usage_millis >= self.config.cpu_threshold_millis {
self.overload_started_at
.get_or_insert_with(std::time::Instant::now);
true
} else {
false
}
}
fn still_hot(&self) -> bool {
self.dropped_recently
&& self
.overload_started_at
.is_some_and(|started| started.elapsed() < self.config.cool_off)
}
fn update_average_in_flight(&mut self) {
self.avg_in_flight = self.avg_in_flight * FLYING_BETA + self.in_flight as f64 * 0.1;
}
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::resil::AdaptiveShedderConfig;

    /// Builds a state from `config` whose window already holds one 1 ms success
    /// and which reports two requests in flight, both instantaneously and on
    /// average.
    fn primed_state(config: AdaptiveShedderConfig) -> ShedderState {
        let mut state = ShedderState::new(config);
        state
            .window
            .record_with_latency(WindowOutcome::Success, Duration::from_millis(1));
        state.in_flight = 2;
        state.avg_in_flight = 2.0;
        state
    }

    #[test]
    fn latency_overload_can_reject() {
        // A 1 ns latency budget makes the recorded 1 ms sample exceed
        // `max_latency`, so the state is overloaded even at zero CPU usage.
        let config = AdaptiveShedderConfig {
            max_in_flight: 10,
            min_request_count: 1,
            max_latency: Duration::from_nanos(1),
            ..AdaptiveShedderConfig::default()
        };
        let mut state = primed_state(config);
        assert!(state.should_drop(0));
    }

    #[test]
    fn high_cpu_and_throughput_can_reject() {
        // 950 is above the 800 threshold, so CPU alone marks the system overloaded.
        let config = AdaptiveShedderConfig {
            max_in_flight: 10,
            min_request_count: 1,
            cpu_threshold_millis: 800,
            ..AdaptiveShedderConfig::default()
        };
        let mut state = primed_state(config);
        assert!(state.should_drop(950));
    }
}