use std::{future::Future, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::sync::Mutex;
use crate::resil::{
SharedCpuUsageProvider, WindowConfig, WindowSnapshot, cpu::default_cpu_provider,
shedder_state::ShedderState,
};
/// Tuning parameters for an [`AdaptiveShedder`].
///
/// The whole value is handed to `ShedderState::new`, which is where each
/// field is actually interpreted. Notes marked `NOTE(review)` are assumptions
/// drawn from the field names and should be confirmed against that type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdaptiveShedderConfig {
    /// Cap on concurrently admitted requests. The tests in this module show
    /// that with a value of `1` a second `allow` is rejected while the first
    /// guard is still alive.
    pub max_in_flight: usize,
    /// NOTE(review): presumably the minimum number of windowed samples needed
    /// before adaptive decisions apply — confirm in `ShedderState`.
    pub min_request_count: u64,
    /// NOTE(review): presumably the latency above which the service is
    /// considered slow — confirm in `ShedderState`.
    pub max_latency: Duration,
    /// NOTE(review): looks like a percentage of `max_in_flight`; the exact
    /// role in the overload check is not visible here.
    pub overload_in_flight_percent: u8,
    /// Settings for the statistics window kept by the shedder state.
    pub window: WindowConfig,
    /// CPU threshold, in the same unit the CPU provider reports
    /// (`cpu_usage_millis`). NOTE(review): unit looks like per-mille — confirm.
    pub cpu_threshold_millis: u32,
    /// NOTE(review): presumably how long shedding persists after an overload
    /// is detected — confirm in `ShedderState`.
    pub cool_off: Duration,
    /// NOTE(review): looks like a lower bound, in percent, on an overload
    /// scaling factor — confirm in `ShedderState`.
    pub min_overload_factor_percent: u8,
}
impl Default for AdaptiveShedderConfig {
    /// Returns the baseline configuration: 1024 in-flight slots, a minimum of
    /// 20 requests, a 250 ms latency ceiling, 80 % overload in-flight
    /// percentage, the default window, a CPU threshold of 900, a one-second
    /// cool-off and a 10 % minimum overload factor.
    fn default() -> Self {
        let max_latency = Duration::from_millis(250);
        let cool_off = Duration::from_secs(1);

        Self {
            max_in_flight: 1024,
            min_request_count: 20,
            max_latency,
            overload_in_flight_percent: 80,
            window: WindowConfig::default(),
            cpu_threshold_millis: 900,
            cool_off,
            min_overload_factor_percent: 10,
        }
    }
}
/// Point-in-time view of the shedder's internal state, as returned by
/// `AdaptiveShedder::snapshot`.
#[derive(Debug, Clone, PartialEq)]
pub struct ShedderSnapshot {
    /// Number of requests in flight when the snapshot was taken.
    /// NOTE(review): exact counting rules live in `ShedderState` — confirm.
    pub in_flight: usize,
    /// NOTE(review): presumably a smoothed average of the in-flight count;
    /// the averaging method is not visible in this file.
    pub avg_in_flight: f64,
    /// CPU reading held by the state. The tests in this module show it equals
    /// the provider's value after an `allow` call.
    pub cpu_usage_millis: u32,
    /// Snapshot of the statistics window.
    pub window: WindowSnapshot,
}
#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)]
#[error("service overloaded")]
pub struct ShedderError;
/// Cheaply cloneable handle to a shared load-shedding state.
///
/// All clones refer to the same `ShedderState`, protected by an async mutex.
pub struct AdaptiveShedder {
    /// Shared admission and statistics state; every access goes through the lock.
    state: Arc<Mutex<ShedderState>>,
    /// Source of the CPU reading sampled on each `allow` call.
    cpu: SharedCpuUsageProvider,
}
impl Clone for AdaptiveShedder {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
cpu: self.cpu.clone(),
}
}
}
impl std::fmt::Debug for AdaptiveShedder {
    /// Formats as an opaque `AdaptiveShedder { .. }`; no fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("AdaptiveShedder");
        repr.finish_non_exhaustive()
    }
}
impl AdaptiveShedder {
    /// Creates a shedder that reads CPU usage from `default_cpu_provider()`.
    pub fn new(config: AdaptiveShedderConfig) -> Self {
        let provider = default_cpu_provider();
        Self::with_cpu_usage_provider(config, provider)
    }

    /// Creates a shedder that reads CPU usage from the supplied provider.
    ///
    /// Useful for injecting a deterministic provider, as this module's tests do.
    pub fn with_cpu_usage_provider(
        config: AdaptiveShedderConfig,
        cpu: SharedCpuUsageProvider,
    ) -> Self {
        let initial_state = ShedderState::new(config);
        let state = Arc::new(Mutex::new(initial_state));
        Self { state, cpu }
    }

    /// Asks for permission to process one request.
    ///
    /// The CPU reading is sampled before the state lock is taken and passed to
    /// `ShedderState::allow`.
    ///
    /// # Errors
    ///
    /// Returns [`ShedderError`] when the state rejects the request.
    pub async fn allow(&self) -> Result<ShedderGuard, ShedderError> {
        let cpu_now = self.cpu.cpu_usage_millis();
        // The lock guard is a temporary: it is released at the end of this
        // statement, before the latency clock starts.
        self.state.lock().await.allow(cpu_now)?;

        let guard = ShedderGuard {
            shedder: self.clone(),
            started_at: std::time::Instant::now(),
            completed: false,
        };
        Ok(guard)
    }

    /// Runs `future` under admission control.
    ///
    /// Success is recorded once the future completes, whatever its output is.
    /// If the future is dropped or unwinds first, the guard's `Drop`
    /// implementation handles the unfinished request.
    ///
    /// # Errors
    ///
    /// Returns [`ShedderError`] without polling `future` when admission is refused.
    pub async fn run<F, T>(&self, future: F) -> Result<T, ShedderError>
    where
        F: Future<Output = T>,
    {
        match self.allow().await {
            Err(rejected) => Err(rejected),
            Ok(guard) => {
                let output = future.await;
                guard.record_success().await;
                Ok(output)
            }
        }
    }

    /// Returns the current state snapshot, taken under the state lock.
    pub async fn snapshot(&self) -> ShedderSnapshot {
        self.state.lock().await.snapshot()
    }

    /// Forwards a successful outcome with the given latency to the shared state.
    async fn record_success(&self, latency: Duration) {
        self.state.lock().await.record_success(latency);
    }

    /// Forwards a failed outcome with the given latency to the shared state.
    async fn record_failure(&self, latency: Duration) {
        self.state.lock().await.record_failure(latency);
    }
}
/// Handle for one admitted request, created by `AdaptiveShedder::allow`.
///
/// Finish it with `record_success` or `record_failure`. A guard that is
/// dropped without either call is treated as a failed request by its `Drop`
/// implementation, which is why discarding it silently is flagged by
/// `#[must_use]`.
#[must_use = "dropping the guard without reporting an outcome counts the request as a failure"]
#[derive(Debug)]
pub struct ShedderGuard {
    /// Handle to the shedder that admitted the request.
    shedder: AdaptiveShedder,
    /// Moment of admission; request latency is measured from here.
    started_at: std::time::Instant,
    /// Set once an outcome has been reported, so `Drop` does not report again.
    completed: bool,
}
impl ShedderGuard {
    /// Consumes the guard and reports the request as successful, using the
    /// time elapsed since admission as its latency.
    pub async fn record_success(mut self) {
        let latency = self.started_at.elapsed();
        self.shedder.record_success(latency).await;
        // Flag completion only after the state update: if this future is
        // dropped while waiting for the lock, `Drop` still sees an unfinished
        // guard.
        self.completed = true;
    }

    /// Consumes the guard and reports the request as failed, using the time
    /// elapsed since admission as its latency.
    pub async fn record_failure(mut self) {
        let latency = self.started_at.elapsed();
        self.shedder.record_failure(latency).await;
        // Same ordering as `record_success`: mark only once the report landed.
        self.completed = true;
    }
}
impl Drop for ShedderGuard {
    /// Reports an unfinished request as a failure when the guard is dropped.
    ///
    /// Nothing happens if an outcome was already recorded. Otherwise the
    /// failure is recorded synchronously when the state lock is free. That
    /// path works outside a Tokio runtime and takes effect before `drop`
    /// returns. Only when the lock is contended is the report deferred to a
    /// task spawned on the current runtime.
    fn drop(&mut self) {
        if self.completed {
            return;
        }
        let latency = self.started_at.elapsed();

        // Fast path: `try_lock` never blocks or awaits, so it is safe in `drop`
        // and needs no runtime.
        if let Ok(mut state) = self.shedder.state.try_lock() {
            state.record_failure(latency);
            return;
        }

        // Contended lock: `drop` cannot await, so hand the report to a task
        // when a runtime is reachable from this thread.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let shedder = self.shedder.clone();
            handle.spawn(async move {
                shedder.record_failure(latency).await;
            });
        }
        // With the lock contended and no runtime available the outcome is
        // still lost. This is best effort; blocking or panicking inside `drop`
        // would be worse.
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::resil::CpuUsageProvider;

    /// CPU provider stub that always reports the wrapped reading.
    struct FixedCpu(u32);

    impl CpuUsageProvider for FixedCpu {
        fn cpu_usage_millis(&self) -> u32 {
            let Self(reading) = self;
            *reading
        }
    }

    /// Shedder with a single in-flight slot and otherwise default settings.
    fn single_slot_shedder() -> AdaptiveShedder {
        let config = AdaptiveShedderConfig {
            max_in_flight: 1,
            ..AdaptiveShedderConfig::default()
        };
        AdaptiveShedder::new(config)
    }

    /// A second request is refused while the only slot is still held.
    #[tokio::test]
    async fn rejects_when_in_flight_is_full() {
        let shedder = single_slot_shedder();
        let _held = shedder.allow().await.expect("first");

        let second = shedder.allow().await;
        assert!(second.is_err());
    }

    /// Once the first request reports success, the slot is available again.
    #[tokio::test]
    async fn recovers_after_request_finishes() {
        let shedder = single_slot_shedder();

        let first = shedder.allow().await.expect("first");
        first.record_success().await;

        assert!(shedder.allow().await.is_ok());
    }

    /// The CPU reading sampled by `allow` shows up in the snapshot.
    #[tokio::test]
    async fn cpu_snapshot_is_recorded() {
        let shedder = AdaptiveShedder::with_cpu_usage_provider(
            AdaptiveShedderConfig::default(),
            Arc::new(FixedCpu(950)),
        );

        let guard = shedder.allow().await.expect("allowed");
        guard.record_success().await;

        let snapshot = shedder.snapshot().await;
        assert_eq!(snapshot.cpu_usage_millis, 950);
    }
}