use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;
use crate::core::error::CoreError;
/// Process-wide latch: `true` while a [`MemoryGovernor`] exists. Set with a
/// `swap` in `MemoryGovernor::start` and cleared when the governor is dropped,
/// so at most one governor is active at a time.
static ARMED: AtomicBool = AtomicBool::new(false);
/// Set once a sample from the governor's source exceeds the budget; read by
/// [`checkpoint`]. Cleared on `start` and again when the governor is dropped.
static TRIPPED: AtomicBool = AtomicBool::new(false);
/// The sample value that caused the trip (0 while not tripped). Writers store
/// this *before* setting `TRIPPED`.
static PEAK_AT_TRIP: AtomicU64 = AtomicU64::new(0);
/// The budget handed to the most recent `MemoryGovernor::start` call. It is
/// not reset on drop; the next `start` overwrites it.
static BUDGET: AtomicU64 = AtomicU64::new(0);
/// Cooperative cancellation point for budget-governed work.
///
/// Returns `Ok(())` while the governor has not tripped (including when no
/// governor is armed at all).
///
/// # Errors
///
/// Returns [`CoreError::BudgetExceeded`] once the governor has tripped, carrying
/// the sample that exceeded the budget (`needed`) and the configured budget
/// (`budget`).
pub fn checkpoint() -> Result<(), CoreError> {
    // Acquire pairs with the writer's Release store to `TRIPPED`. The writer
    // publishes `PEAK_AT_TRIP` (and `BUDGET`) *before* setting the flag, so
    // once we observe `true` here the loads below are guaranteed to see those
    // values. A Relaxed load would allow reading the flag as set while still
    // observing a stale peak of 0.
    if !TRIPPED.load(Ordering::Acquire) {
        return Ok(());
    }

    Err(CoreError::BudgetExceeded {
        needed: PEAK_AT_TRIP.load(Ordering::Acquire),
        budget: BUDGET.load(Ordering::Acquire),
    })
}
/// Errors returned when a memory governor cannot be started.
///
/// Derives `Clone`, `PartialEq` and `Eq` so callers and tests can compare and
/// copy the error instead of pattern-matching just to check a variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GovernorError {
    /// Another governor currently holds the process-wide latch; only one may
    /// be active at a time.
    AlreadyActive,
}
impl std::fmt::Display for GovernorError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GovernorError::AlreadyActive => write!(f, "a memory governor is already active"),
}
}
}
// Marker impl so `GovernorError` can be used wherever a `std::error::Error`
// is expected (e.g. boxed as `dyn Error`). All trait methods keep their
// defaults, so there is no underlying `source()`.
impl std::error::Error for GovernorError {}
/// Handle to the process-wide memory governor and its polling thread.
///
/// The governor stays active only while this value is alive: dropping it
/// signals the polling thread to stop, joins it, and clears the trip state.
/// Discarding the value returned by `start` therefore disarms the governor
/// immediately, which is why the type is `#[must_use]`.
#[must_use = "the governor is disarmed as soon as this value is dropped"]
#[derive(Debug)]
pub struct MemoryGovernor {
    /// Shutdown flag shared with the polling thread.
    stop: Arc<AtomicBool>,
    /// Join handle of the polling thread; taken (left as `None`) on drop.
    handle: Option<JoinHandle<()>>,
}
impl MemoryGovernor {
    /// Arms the process-wide governor and spawns its polling thread.
    ///
    /// `rss_source` is sampled once synchronously and then repeatedly on a
    /// background thread, sleeping `poll` between samples. The value it returns
    /// is presumably the process's resident memory in bytes (the code only
    /// compares it against `budget_bytes`). The first sample strictly greater
    /// than `budget_bytes` is recorded and trips the governor, after which the
    /// background thread exits. The thread also exits once the returned handle
    /// is dropped.
    ///
    /// # Errors
    ///
    /// Returns [`GovernorError::AlreadyActive`] if another governor currently
    /// holds the latch.
    ///
    /// # Panics
    ///
    /// Propagates a panic from the initial `rss_source` call or from
    /// `std::thread::spawn`. In either case the global latch and trip state are
    /// released first, so a later `start` can still succeed.
    pub fn start<F>(budget_bytes: u64, poll: Duration, rss_source: F) -> Result<Self, GovernorError>
    where
        F: Fn() -> u64 + Send + 'static,
    {
        // Releases the global state if `start` unwinds after taking the latch
        // but before a `MemoryGovernor` (whose own `Drop` does this) exists.
        // Without it a single panic would leave `ARMED` stuck at `true` and
        // every later `start` would fail with `AlreadyActive`.
        struct DisarmOnUnwind;
        impl Drop for DisarmOnUnwind {
            fn drop(&mut self) {
                TRIPPED.store(false, Ordering::Release);
                PEAK_AT_TRIP.store(0, Ordering::Release);
                ARMED.store(false, Ordering::Release);
            }
        }

        // `swap` both tests and takes the latch atomically.
        if ARMED.swap(true, Ordering::AcqRel) {
            return Err(GovernorError::AlreadyActive);
        }
        let unwind_guard = DisarmOnUnwind;

        // Start from a clean slate; `BUDGET` may still hold a previous value.
        TRIPPED.store(false, Ordering::Release);
        PEAK_AT_TRIP.store(0, Ordering::Release);
        BUDGET.store(budget_bytes, Ordering::Release);

        // Probe once on the caller's thread so an already-exceeded budget is
        // visible to `checkpoint` before `start` returns.
        let initial = rss_source();
        if initial > budget_bytes {
            // Publish the value before the flag so readers of the flag see it.
            PEAK_AT_TRIP.store(initial, Ordering::Release);
            TRIPPED.store(true, Ordering::Release);
        }

        let stop = Arc::new(AtomicBool::new(false));
        let worker_stop = Arc::clone(&stop);
        let handle = std::thread::spawn(move || loop {
            let rss = rss_source();
            if rss > budget_bytes {
                PEAK_AT_TRIP.store(rss, Ordering::Release);
                TRIPPED.store(true, Ordering::Release);
                break;
            }
            if worker_stop.load(Ordering::Acquire) {
                break;
            }
            std::thread::sleep(poll);
        });

        // From here on `MemoryGovernor`'s own `Drop` owns the teardown.
        std::mem::forget(unwind_guard);

        Ok(Self {
            stop,
            handle: Some(handle),
        })
    }
}
/// Tears the governor down: stops and joins the polling thread, then clears
/// the global trip state and releases the latch so a new governor can start.
impl Drop for MemoryGovernor {
    fn drop(&mut self) {
        // Ask the poller to exit, then wait for it. The join can block while
        // the poller finishes its current sleep.
        self.stop.store(true, Ordering::Release);

        // A panic inside the poller surfaces as `Err` from `join`; it is
        // ignored on purpose so teardown always completes.
        let _ = self.handle.take().map(JoinHandle::join);

        // Reset only after the poller is gone so it cannot re-trip afterwards.
        TRIPPED.store(false, Ordering::Release);
        PEAK_AT_TRIP.store(0, Ordering::Release);
        // Release the latch last: a new governor starts from a clean state.
        ARMED.store(false, Ordering::Release);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With no governor tripped, `checkpoint` succeeds; separately, the
    /// `AlreadyActive` error renders its human-readable message.
    #[test]
    fn checkpoint_ok_when_unarmed_and_governor_error_displays() {
        let verdict = checkpoint();
        assert!(verdict.is_ok());

        let rendered = GovernorError::AlreadyActive.to_string();
        assert!(rendered.contains("already active"));
    }
}