use axum::{
body::Body,
extract::{MatchedPath, Request},
middleware::Next,
response::{IntoResponse, Response},
};
use crate::{
resil::{AdaptiveShedderConfig, ShedderRegistry, WindowConfig},
rest::{
RestError, RestResilienceConfig,
middleware::{MiddlewareMetrics, record_resilience_event},
},
};
/// Load-shedding middleware for REST handlers.
///
/// Looks up (or creates) the shedder registered under the
/// `service:method:route` key of the incoming request and asks it for
/// admission. A rejected request is answered immediately with
/// [`RestError::Overloaded`]. An admitted request is forwarded to `next`, and
/// the result is reported back through the admission guard: a server-error
/// status is recorded as a failure, any other status as a success.
///
/// Every admission decision is also reported via [`record_resilience_event`]
/// under the `"shedder"` component, as either `"pass"` or `"drop"`.
pub async fn shedding_middleware(
    registry: ShedderRegistry,
    service: String,
    config: RestResilienceConfig,
    metrics: MiddlewareMetrics,
    request: Request<Body>,
    next: Next,
) -> Response {
    let shedder = registry
        .get_or_insert(shedding_key(&service, &request), shedder_config(&config))
        .await;

    // Handle the rejection path first so the rest of the function only deals
    // with admitted requests.
    let permit = match shedder.allow().await {
        Err(_) => {
            record_resilience_event(&metrics, "shedder", "drop");
            return RestError::Overloaded.into_response();
        }
        Ok(permit) => {
            record_resilience_event(&metrics, "shedder", "pass");
            permit
        }
    };

    let response = next.run(request).await;

    // Only server-side errors count against the shedder; every other status
    // is reported as a success.
    let failed = response.status().is_server_error();
    if failed {
        permit.record_failure().await;
    } else {
        permit.record_success().await;
    }

    response
}
/// Builds the [`AdaptiveShedderConfig`] for a shedder from the REST
/// resilience settings.
///
/// The in-flight limit is taken from `shedding_max_in_flight` when set,
/// otherwise from `max_concurrency`, otherwise it is `1024`. Fields not set
/// here keep the values from `AdaptiveShedderConfig::default()`.
fn shedder_config(config: &RestResilienceConfig) -> AdaptiveShedderConfig {
    // The dedicated shedding limit wins over the general concurrency limit.
    let max_in_flight = match (config.shedding_max_in_flight, config.max_concurrency) {
        (Some(limit), _) | (None, Some(limit)) => limit,
        (None, None) => 1024,
    };

    let window = WindowConfig {
        buckets: config.shedding_window_buckets,
        bucket_duration: config.shedding_window_bucket_duration,
    };

    AdaptiveShedderConfig {
        max_in_flight,
        min_request_count: config.shedding_min_request_count,
        max_latency: config.shedding_max_latency,
        cpu_threshold_millis: config.shedding_cpu_threshold_millis,
        cool_off: config.shedding_cool_off,
        window,
        ..AdaptiveShedderConfig::default()
    }
}
/// Builds the registry key for a request, in the form
/// `service:method:route`.
///
/// The route is read from the [`MatchedPath`] request extension; when that
/// extension is absent the literal `"unknown"` is used instead.
fn shedding_key(service: &str, request: &Request<Body>) -> String {
    let route = match request.extensions().get::<MatchedPath>() {
        Some(matched) => matched.as_str(),
        None => "unknown",
    };

    [service, request.method().as_str(), route].join(":")
}