use std::{
fmt::Debug,
sync::{Arc, RwLock, RwLockWriteGuard},
task::{Context, Poll},
time::Duration,
};
use anyhow::{Context as _, anyhow};
use async_trait::async_trait;
use ic_bn_lib_common::{
traits::shed::GetsSystemInfo,
types::shed::{ShedReason, ShedResponse, SystemOptions},
};
use systemstat::{Platform, System};
use tower::{Layer, Service, ServiceExt};
use tracing::{debug, error};
use super::{BoxFuture, ewma::EWMA};
use crate::Error;
#[derive(Clone)]
pub struct SystemInfo(Arc<System>);
impl SystemInfo {
pub fn new() -> Self {
Self(Arc::new(System::new()))
}
}
impl Default for SystemInfo {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl GetsSystemInfo for SystemInfo {
async fn cpu_usage(&self) -> Result<f64, Error> {
let cpu = self
.0
.cpu_load_aggregate()
.context("unable to measure CPU load")?;
tokio::time::sleep(Duration::from_millis(900)).await;
let cpu = cpu.done().context("unable to measure CPU load")?;
Ok(1.0 - cpu.idle as f64)
}
fn memory_usage(&self) -> Result<f64, Error> {
let mem = self.0.memory().context("unable to measure memory usage")?;
if mem.total.as_u64() == 0 {
return Err(anyhow!("total memory is zero").into());
}
Ok(1.0 - mem.free.as_u64() as f64 / mem.total.as_u64() as f64)
}
fn load_avg(&self) -> Result<(f64, f64, f64), Error> {
let la = self
.0
.load_average()
.context("unable to measure load average")?;
Ok((la.one as f64, la.five as f64, la.fifteen as f64))
}
}
#[derive(Debug)]
struct StateInner {
cpu: EWMA,
memory: EWMA,
load_avg: (EWMA, EWMA, EWMA),
shed_reason: Option<ShedReason>,
}
impl StateInner {
fn new(alpha: f64) -> Self {
Self {
cpu: EWMA::new(alpha),
memory: EWMA::new(alpha),
load_avg: (EWMA::new(alpha), EWMA::new(alpha), EWMA::new(alpha)),
shed_reason: None,
}
}
}
#[derive(Debug)]
pub struct State<S: GetsSystemInfo> {
opts: SystemOptions,
sys_info: S,
inner: RwLock<StateInner>,
}
impl<S: GetsSystemInfo> State<S> {
pub fn new(alpha: f64, opts: SystemOptions, sys_info: S) -> Self {
Self {
opts,
sys_info,
inner: RwLock::new(StateInner::new(alpha)),
}
}
async fn measure(&self) -> Result<(), Error> {
let cpu = self.sys_info.cpu_usage().await?;
let mem = self.sys_info.memory_usage()?;
let (l1, l5, l15) = self.sys_info.load_avg()?;
let mut inner = self.inner.write().unwrap();
inner.cpu.add(cpu);
inner.memory.add(mem);
inner.load_avg.0.add(l1);
inner.load_avg.1.add(l5);
inner.load_avg.2.add(l15);
inner.shed_reason = self.evaluate(&inner);
debug!(
"System load: CPU {cpu}, MEM {mem}, LAVG1: {l1}, LAVG5: {l5}, LAVG15: {l15}, Overload: {:?}",
inner.shed_reason
);
drop(inner); Ok(())
}
fn evaluate(&self, state: &RwLockWriteGuard<'_, StateInner>) -> Option<ShedReason> {
if self
.opts
.cpu
.map(|x| state.cpu.get().unwrap_or(0.0) > x)
.unwrap_or(false)
{
return Some(ShedReason::CPU);
}
if self
.opts
.memory
.map(|x| state.memory.get().unwrap_or(0.0) > x)
.unwrap_or(false)
{
return Some(ShedReason::Memory);
}
if self
.opts
.loadavg_1
.map(|x| state.load_avg.0.get().unwrap_or(0.0) > x)
.unwrap_or(false)
{
return Some(ShedReason::LoadAvg);
}
if self
.opts
.loadavg_5
.map(|x| state.load_avg.1.get().unwrap_or(0.0) > x)
.unwrap_or(false)
{
return Some(ShedReason::LoadAvg);
}
if self
.opts
.loadavg_15
.map(|x| state.load_avg.2.get().unwrap_or(0.0) > x)
.unwrap_or(false)
{
return Some(ShedReason::LoadAvg);
}
None
}
fn is_overloaded(&self) -> Option<ShedReason> {
self.inner.read().unwrap().shed_reason
}
async fn run(&self) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = self.measure().await {
error!("SystemLoadShedder: error: {e:#}");
}
}
}
}
#[derive(Debug, Clone)]
pub struct SystemLoadShedder<S: GetsSystemInfo, I> {
state: Arc<State<S>>,
inner: I,
}
impl<S: GetsSystemInfo, I> SystemLoadShedder<S, I> {
pub const fn new(inner: I, state: Arc<State<S>>) -> Self {
Self { state, inner }
}
}
impl<S: GetsSystemInfo, R, I> Service<R> for SystemLoadShedder<S, I>
where
R: Send + 'static,
I: Service<R> + Clone + Send + Sync + 'static,
I::Future: Send,
{
type Response = ShedResponse<I::Response>;
type Error = I::Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: R) -> Self::Future {
let shed_reason = self.state.is_overloaded();
if let Some(v) = shed_reason {
return Box::pin(async move { Ok(ShedResponse::Overload(v)) });
}
let inner = self.inner.clone();
Box::pin(async move {
let response = inner.oneshot(req).await;
Ok(ShedResponse::Inner(response?))
})
}
}
#[derive(Debug, Clone)]
pub struct SystemLoadShedderLayer<S: GetsSystemInfo>(Arc<State<S>>);
impl<S: GetsSystemInfo> SystemLoadShedderLayer<S> {
pub fn new(ewma_alpha: f64, opts: SystemOptions, sys_info: S) -> Self {
let state = Arc::new(State::new(ewma_alpha, opts, sys_info));
let state_bg = state.clone();
tokio::spawn(async move { state_bg.run().await });
Self(state)
}
}
impl<S: GetsSystemInfo, I: Clone + Send + Sync + 'static> Layer<I> for SystemLoadShedderLayer<S> {
type Service = SystemLoadShedder<S, I>;
fn layer(&self, inner: I) -> Self::Service {
SystemLoadShedder::new(inner, self.0.clone())
}
}
#[cfg(test)]
mod test {
use std::sync::Mutex;
use super::*;
#[derive(Clone, Debug)]
struct StubSystemInfoVal {
cpu: f64,
memory: f64,
l1: f64,
l5: f64,
l15: f64,
}
#[derive(Clone, Debug)]
struct StubSystemInfo {
v: Arc<Mutex<StubSystemInfoVal>>,
}
#[async_trait]
impl GetsSystemInfo for StubSystemInfo {
async fn cpu_usage(&self) -> Result<f64, Error> {
Ok(self.v.lock().unwrap().cpu)
}
fn memory_usage(&self) -> Result<f64, Error> {
Ok(self.v.lock().unwrap().memory)
}
fn load_avg(&self) -> Result<(f64, f64, f64), Error> {
let v = self.v.lock().unwrap();
Ok((v.l1, v.l5, v.l15))
}
}
#[derive(Debug, Clone)]
struct StubService;
impl Service<Duration> for StubService {
type Response = ();
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Duration) -> Self::Future {
let fut = async move {
tokio::time::sleep(req).await;
Ok(())
};
Box::pin(fut)
}
}
#[tokio::test]
async fn test_system_shedder() {
let inner = StubService;
let opts = SystemOptions {
cpu: Some(0.5),
memory: Some(0.5),
loadavg_1: Some(0.5),
loadavg_5: Some(0.5),
loadavg_15: Some(0.5),
};
let sys_info = StubSystemInfo {
v: Arc::new(Mutex::new(StubSystemInfoVal {
cpu: 0.0,
memory: 0.0,
l1: 0.0,
l5: 0.0,
l15: 0.0,
})),
};
let state = Arc::new(State::new(0.8, opts, sys_info.clone()));
let mut shedder = SystemLoadShedder::new(inner, state.clone());
let _ = state.measure().await;
let resp = shedder.call(Duration::ZERO).await.unwrap();
assert!(matches!(resp, ShedResponse::Inner(_)));
sys_info.v.lock().unwrap().cpu = 1.0;
let _ = state.measure().await;
let resp = shedder.call(Duration::ZERO).await.unwrap();
assert_eq!(resp, ShedResponse::Overload(ShedReason::CPU));
sys_info.v.lock().unwrap().cpu = 0.0;
sys_info.v.lock().unwrap().memory = 1.0;
let _ = state.measure().await;
let resp = shedder.call(Duration::ZERO).await.unwrap();
assert_eq!(resp, ShedResponse::Overload(ShedReason::Memory));
sys_info.v.lock().unwrap().memory = 0.0;
sys_info.v.lock().unwrap().l1 = 1.0;
let _ = state.measure().await;
let resp = shedder.call(Duration::ZERO).await.unwrap();
assert_eq!(resp, ShedResponse::Overload(ShedReason::LoadAvg));
sys_info.v.lock().unwrap().l1 = 0.0;
sys_info.v.lock().unwrap().l5 = 1.0;
let _ = state.measure().await;
let resp = shedder.call(Duration::ZERO).await.unwrap();
assert_eq!(resp, ShedResponse::Overload(ShedReason::LoadAvg));
sys_info.v.lock().unwrap().l5 = 0.0;
sys_info.v.lock().unwrap().l15 = 1.0;
let _ = state.measure().await;
let resp = shedder.call(Duration::ZERO).await.unwrap();
assert_eq!(resp, ShedResponse::Overload(ShedReason::LoadAvg));
sys_info.v.lock().unwrap().l15 = 0.0;
let _ = state.measure().await;
let resp = shedder.call(Duration::ZERO).await.unwrap();
assert!(matches!(resp, ShedResponse::Inner(_)));
}
}