#[macro_use]
extern crate log;
use std::sync::atomic;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
// Watchdog states held in `VigilShared::state`. Every watcher tick that finds
// no heartbeat advances LIVE -> TEST -> RISK -> DEAD; `Vigil::notify` resets
// the state to LIVE at any time.

/// No heartbeat has been received yet; the watcher waits without escalating.
const INIT: usize = 0;
/// A heartbeat arrived since the watcher last checked.
const LIVE: usize = INIT + 1;
/// The watcher consumed the last heartbeat and is waiting for the next one.
const TEST: usize = LIVE + 1;
/// A tick found no heartbeat since the previous tick (`missed_test_cb` fired).
const RISK: usize = TEST + 1;
/// A second consecutive tick found no heartbeat; every later tick in this
/// state fires `stall_detected_cb`.
const DEAD: usize = RISK + 1;
/// Handle to a watchdog created by [`Vigil::create`].
///
/// Call [`Vigil::notify`] regularly as a heartbeat. If heartbeats stop, the
/// background watcher thread escalates and fires the configured callbacks.
/// Dropping this handle asks the watcher thread to terminate.
pub struct Vigil {
// State shared with the watcher thread (interval, current state, stop flag).
shared: Arc<VigilShared>,
}
impl Vigil {
pub fn create(
interval_ms: usize,
missed_test_cb: Option<Callback>,
at_risk_cb: Option<Callback>,
stall_detected_cb: Option<Callback>,
) -> (Self, thread::JoinHandle<()>) {
let shared = Arc::new(VigilShared {
tick_interval: atomic::AtomicUsize::new(interval_ms),
state: atomic::AtomicUsize::new(INIT),
terminated: atomic::AtomicBool::new(false),
});
let callbacks = VigilCallbacks {
missed_test_cb,
at_risk_cb,
stall_detected_cb,
};
let thread = thread::spawn({
let shared = shared.clone();
move || shared.watch(callbacks)
});
(Vigil { shared }, thread)
}
pub fn notify(&self) {
self.shared.state.store(LIVE, atomic::Ordering::Relaxed);
}
pub fn set_interval(&self, interval_ms: usize) {
self.shared
.tick_interval
.store(interval_ms, atomic::Ordering::Relaxed);
self.notify();
}
}
impl Drop for Vigil {
    /// Signals the watcher thread to stop; it does not wait for the thread.
    ///
    /// The watcher checks the flag at the start of each iteration, so it
    /// exits only after its current sleep (up to one interval) completes.
    fn drop(&mut self) {
        let stop_flag = &self.shared.terminated;
        stop_flag.store(true, atomic::Ordering::Relaxed);
    }
}
/// Boxed callback run on the watcher thread when heartbeats are missed.
///
/// Exported because it appears in the public signature of `Vigil::create`;
/// callers can now name the parameter type. It must be `Send` because the
/// callbacks are moved onto the spawned watcher thread.
pub type Callback = Box<dyn Fn() + Send + 'static>;
/// State shared between a [`Vigil`] handle and its watcher thread.
///
/// Every access to these fields in this file uses `Relaxed` ordering, and
/// each field is read and written independently of the others.
struct VigilShared {
// Sleep between watcher ticks, in milliseconds; updated by `set_interval`.
tick_interval: atomic::AtomicUsize,
// Current watchdog state; written as one of INIT, LIVE, TEST, RISK, DEAD.
state: atomic::AtomicUsize,
// Set when the `Vigil` handle is dropped; the watcher exits when it next checks.
terminated: atomic::AtomicBool,
}
/// Optional user callbacks, one per escalation stage; moved onto the watcher thread.
struct VigilCallbacks {
// Fired when a tick finds no heartbeat since the previous tick (TEST -> RISK).
missed_test_cb: Option<Callback>,
// Fired when the following tick still finds no heartbeat (RISK -> DEAD).
at_risk_cb: Option<Callback>,
// Fired on every tick while the state remains DEAD.
stall_detected_cb: Option<Callback>,
}
impl VigilShared {
fn watch(&self, callbacks: VigilCallbacks) {
loop {
if self.terminated.load(atomic::Ordering::Relaxed) {
info!("Vigil is terminating");
break;
}
match self.state.load(atomic::Ordering::Relaxed) {
INIT => info!("Liveness not initialized... waiting"),
LIVE => {
info!("Software is live - Re-testing");
self.state.store(TEST, atomic::Ordering::Relaxed);
}
TEST => {
warn!("Software missed a test - Temporary glitch/slowdown?");
let _ = self.state.compare_exchange(
TEST,
RISK,
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
);
if let Some(ref cb) = callbacks.missed_test_cb {
cb();
}
}
RISK => {
error!("Software missed multiple tests - Stall detected?");
let _ = self.state.compare_exchange(
RISK,
DEAD,
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
);
if let Some(ref cb) = callbacks.at_risk_cb {
cb();
}
}
DEAD => {
error!("Software is still unresponsive - Likely stalled");
if let Some(ref cb) = callbacks.stall_detected_cb {
cb();
}
}
v => {
warn!("Liveness check had unexpected value {}, resetting", v);
self.state.store(INIT, atomic::Ordering::Relaxed);
}
}
let interval_ms = self.tick_interval.load(atomic::Ordering::Relaxed) as u64;
thread::sleep(Duration::from_millis(interval_ms));
}
}
}
// Timing-based tests. Each one drives a real watcher thread with real sleeps,
// so the expected final `status` depends on the relative timing of the 50 ms
// heartbeat loop, the watcher's tick interval and the silent window.
// NOTE(review): these may be flaky on a heavily loaded machine; consider
// widening the margins if they fail intermittently.
#[cfg(test)]
mod tests {
use super::*;
/// Builds the three watchdog callbacks. Each one records the escalation stage
/// that fired by storing a state constant into `status`:
/// missed test -> `TEST`, at risk -> `RISK`, stall detected -> `DEAD`.
/// `status` therefore holds the stage of the most recent callback, or keeps
/// its initial value if no callback fired.
fn create_callbacks(status: Arc<atomic::AtomicUsize>) -> (Callback, Callback, Callback) {
(
Box::new({
let status = status.clone();
move || status.store(TEST, atomic::Ordering::Relaxed)
}),
Box::new({
let status = status.clone();
move || status.store(RISK, atomic::Ordering::Relaxed)
}),
Box::new({
let status = status;
move || status.store(DEAD, atomic::Ordering::Relaxed)
}),
)
}
// `test!(name, sleep_ms, interval_ms, expected)` generates a #[test] that:
//  1. sends heartbeats every 50 ms for ~450 ms with a 100 ms tick interval,
//  2. switches to `interval_ms` (which also counts as a heartbeat), then
//     stays silent for `sleep_ms`,
//  3. restores the 100 ms interval and resumes heartbeats for ~450 ms,
//  4. asserts that `status` equals `expected` (INIT means no callback fired).
// The three-argument form uses a 100 ms interval for the silent window.
macro_rules! test {
($name:ident, $sleep:expr, $interval:expr, $status:expr) => {
#[test]
fn $name() {
let status = Arc::new(atomic::AtomicUsize::new(INIT));
let (a, b, c) = create_callbacks(status.clone());
let (vigil, thread) = Vigil::create(100, Some(a), Some(b), Some(c));
// Warm-up: 9 heartbeats, 50 ms apart, at a 100 ms tick interval.
for _ in 1..10 {
std::thread::sleep(Duration::from_millis(50));
vigil.notify();
}
// set_interval also notifies, so the silent window starts from LIVE.
vigil.set_interval($interval);
std::thread::sleep(Duration::from_millis($sleep));
vigil.set_interval(100);
// Resume heartbeats so no further callbacks fire before the check.
for _ in 1..10 {
std::thread::sleep(Duration::from_millis(50));
vigil.notify();
}
let status = status.load(atomic::Ordering::Relaxed);
assert_eq!($status, status);
// Dropping the handle signals termination; join then waits for the
// watcher to finish its current sleep and exit.
drop(vigil);
thread.join().unwrap();
}
};
($name:ident, $sleep:expr, $status:expr) => {
test!($name, $sleep, 100, $status);
};
}
// Heartbeats never stop for a full tick: no callback should fire.
test!(no_false_positives, 0, INIT);
// ~200 ms of silence at 100 ms ticks: the last callback should be "missed test".
test!(miss_single_test, 200, TEST);
// ~300 ms of silence: escalation should reach the "at risk" callback.
test!(miss_multiple_tests, 300, RISK);
// ~500 ms of silence: escalation should reach the "stall detected" callback.
test!(complete_stall, 500, DEAD);
// 500 ms of silence with a 750 ms interval set beforehand: the watcher should
// not complete a second tick before heartbeats resume, so no callback fires.
test!(predicted_stall, 500, 750, INIT);
}