use std::{
sync::Arc,
thread::JoinHandle,
time::{Duration, Instant}
};
use parking_lot::{Condvar, Mutex};
/// Time budget, in milliseconds, for a single processing run.
///
/// The monitor thread adds this to the run's start time to compute the
/// deadline after which it emits a warning; the value is also printed in that
/// warning message.
const MAX_PROC_MILLIS: u64 = 200;
/// State shared between the `WatchDog` handle and its monitor thread.
enum State {
/// Nothing is being timed; the monitor thread blocks on the condition
/// variable until it is notified.
Idle,
/// A processing run is in progress (set by `WatchDog::begin_process`).
Processing {
/// Instant at which the run began; the monitor's deadline is this plus
/// `MAX_PROC_MILLIS` milliseconds.
start_time: Instant
},
/// Set by the monitor thread when its wait in `Processing` times out; the
/// monitor then emits a warning and goes back to `Idle`.
Timeout,
/// Set by `WatchDog::kill`; makes the monitor thread leave its loop.
Term
}
/// Data protected by the mutex in `Shared`.
struct Inner {
/// Current watchdog state, written by the `WatchDog` methods and by the
/// monitor thread.
state: State
}
/// Synchronization primitives shared (through an `Arc`) between the
/// `WatchDog` handle and the monitor thread.
struct Shared {
/// Mutex-guarded watchdog state.
inner: Mutex<Inner>,
/// Condition variable notified after every state change made through the
/// `WatchDog` methods; the monitor thread waits on it.
signal: Condvar
}
/// Start a watchdog.
///
/// Creates the shared state (initially [`State::Idle`]), spawns a thread
/// running [`monitor_thread`] on a clone of it, and returns a [`WatchDog`]
/// handle that owns both the shared state and the thread's join handle.
pub(crate) fn run() -> WatchDog {
    let sh = Arc::new(Shared {
        inner: Mutex::new(Inner { state: State::Idle }),
        signal: Condvar::new(),
    });

    // The monitor thread gets its own reference to the shared state.
    let monitor_sh = Arc::clone(&sh);
    let jh = std::thread::spawn(move || monitor_thread(monitor_sh));

    WatchDog { sh, jh }
}
/// Handle to a running watchdog, as returned by `run`.
pub(crate) struct WatchDog {
/// State shared with the monitor thread.
sh: Arc<Shared>,
/// Join handle of the monitor thread; joined by `kill`.
jh: JoinHandle<()>
}
impl WatchDog {
pub(crate) fn begin_process(&self) {
let mut g = self.sh.inner.lock();
g.state = State::Processing {
start_time: Instant::now()
};
self.sh.signal.notify_one();
}
pub(crate) fn end_process(&self) {
let mut g = self.sh.inner.lock();
g.state = State::Idle;
self.sh.signal.notify_one();
}
pub(crate) fn kill(self) -> std::thread::Result<()> {
let mut g = self.sh.inner.lock();
g.state = State::Term;
self.sh.signal.notify_one();
drop(g);
self.jh.join()
}
}
/// Body of the watchdog's monitor thread.
///
/// Holds the shared-state lock for its whole lifetime, releasing it only
/// while blocked on the condition variable, and reacts to the current state:
///
/// * `Idle`: wait until notified of a state change.
/// * `Processing`: wait until notified or until `MAX_PROC_MILLIS`
///   milliseconds after the run's start time. If the deadline passes and the
///   same run is still in progress, switch to `Timeout`.
/// * `Timeout`: emit a warning (through `tracing` when that feature is
///   enabled, otherwise on stderr) and return to `Idle`.
/// * `Term`: leave the loop, ending the thread.
fn monitor_thread(sh: Arc<Shared>) {
    let mut g = sh.inner.lock();
    loop {
        match g.state {
            State::Idle => {
                sh.signal.wait(&mut g);
            }
            State::Processing { start_time } => {
                let deadline = start_time + Duration::from_millis(MAX_PROC_MILLIS);
                let timed_out = sh.signal.wait_until(&mut g, deadline).timed_out();

                // The mutex is re-acquired only after the timeout fires, so
                // another thread may have changed the state in between (run
                // finished, new run started, or termination requested). Only
                // flag a timeout if the run we were timing is still current;
                // otherwise a pending `Term` would be overwritten and the
                // thread joining this one would block forever.
                let same_run = matches!(
                    g.state,
                    State::Processing { start_time: current } if current == start_time
                );
                if timed_out && same_run {
                    g.state = State::Timeout;
                }
            }
            State::Timeout => {
                #[cfg(feature = "tracing")]
                tracing::warn!(
                    "Message processing held up the dispatcher more than {}ms",
                    MAX_PROC_MILLIS
                );
                #[cfg(not(feature = "tracing"))]
                eprintln!(
                    "Warning: Message processing held up the dispatcher more than {}ms",
                    MAX_PROC_MILLIS
                );
                // Warn once per run: go back to idle until the next state
                // change is signalled.
                g.state = State::Idle;
            }
            State::Term => break,
        }
    }
}