use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use crate::template;
pub const DEFAULT_HEARTBEAT_TEMPLATE: &str = "[{ELAPSED}s]";
pub fn secs(option: &str, value: f64) -> Result<Duration, String> {
if !value.is_finite() || value <= 0.0 {
return Err(format!(
"invalid {option} '{value}': must be a positive number of seconds"
));
}
Duration::try_from_secs_f64(value).map_err(|e| format!("invalid {option} '{value}': {e}"))
}
pub fn limit_label(limit: Duration) -> String {
let v = limit.as_secs_f64();
if v == v.trunc() {
format!("{}s", v as u64)
} else {
format!("{v}s")
}
}
pub struct Watchdog {
disarmed: Arc<AtomicBool>,
}
impl Watchdog {
pub fn arm(tool: &'static str, limit: Duration) -> Watchdog {
let disarmed = Arc::new(AtomicBool::new(false));
let flag = disarmed.clone();
std::thread::spawn(move || {
std::thread::sleep(limit);
if !flag.load(Ordering::SeqCst) {
eprintln!("{tool}: timed out after {}; aborted", limit_label(limit));
std::process::exit(2);
}
});
Watchdog { disarmed }
}
pub fn disarm(&self) {
self.disarmed.store(true, Ordering::SeqCst);
}
}
impl Drop for Watchdog {
fn drop(&mut self) {
self.disarm();
}
}
pub fn watchdog(tool: &'static str, timeout: Option<f64>) -> Result<Option<Watchdog>, String> {
match timeout {
Some(v) => Ok(Some(Watchdog::arm(tool, secs("--timeout", v)?))),
None => Ok(None),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum PulseTo {
Stderr,
Stdout,
}
#[derive(Default)]
pub struct PulseState {
pairs: Mutex<Vec<(String, String)>>,
}
impl PulseState {
pub fn new() -> Arc<PulseState> {
Arc::new(PulseState::default())
}
pub fn set(&self, key: &str, value: &str) {
let mut pairs = self.pairs.lock().unwrap();
match pairs.iter_mut().find(|(k, _)| k == key) {
Some((_, v)) => *v = value.to_string(),
None => pairs.push((key.to_string(), value.to_string())),
}
}
fn snapshot(&self) -> Vec<(String, String)> {
self.pairs.lock().unwrap().clone()
}
}
#[derive(clap::Args, Debug)]
pub struct HeartbeatOpts {
#[arg(long, value_name = "SECS")]
pub heartbeat: Option<f64>,
#[arg(long, value_name = "TEMPLATE")]
pub heartbeat_emit: Option<String>,
#[arg(long, value_enum, default_value_t = PulseTo::Stderr)]
pub heartbeat_to: PulseTo,
}
impl HeartbeatOpts {
pub fn start(
&self,
tool: &str,
state: Arc<PulseState>,
) -> Result<Option<Heartbeat>, String> {
let Some(every) = self.heartbeat else {
return Ok(None);
};
let interval = secs("--heartbeat", every)?;
state.set("TOOL", tool);
let template = self
.heartbeat_emit
.clone()
.unwrap_or_else(|| DEFAULT_HEARTBEAT_TEMPLATE.to_string());
Ok(Some(Heartbeat::start(
interval,
template,
self.heartbeat_to,
state,
)))
}
}
pub struct Heartbeat {
stop: Arc<(Mutex<bool>, Condvar)>,
handle: Option<std::thread::JoinHandle<()>>,
}
impl Heartbeat {
pub fn start(
interval: Duration,
template: String,
to: PulseTo,
state: Arc<PulseState>,
) -> Heartbeat {
let stop = Arc::new((Mutex::new(false), Condvar::new()));
let shared = stop.clone();
let handle = std::thread::spawn(move || {
let started = Instant::now();
let (lock, cvar) = &*shared;
let mut stopped = lock.lock().unwrap();
loop {
let tick_start = Instant::now();
while !*stopped {
let elapsed = tick_start.elapsed();
if elapsed >= interval {
break;
}
let (guard, _) = cvar.wait_timeout(stopped, interval - elapsed).unwrap();
stopped = guard;
}
if *stopped {
break;
}
let elapsed_s = started.elapsed().as_secs().to_string();
let mut pairs = vec![("ELAPSED".to_string(), elapsed_s)];
pairs.extend(state.snapshot());
let refs: Vec<(&str, &str)> = pairs
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
let line = template::render(&template, &refs);
match to {
PulseTo::Stdout => println!("{line}"),
PulseTo::Stderr => eprintln!("{line}"),
}
}
});
Heartbeat {
stop,
handle: Some(handle),
}
}
}
impl Drop for Heartbeat {
fn drop(&mut self) {
let (lock, cvar) = &*self.stop;
*lock.lock().unwrap() = true;
cvar.notify_all();
if let Some(h) = self.handle.take() {
let _ = h.join();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn secs_accepts_positive_fractions_only() {
assert_eq!(
secs("--timeout", 0.25).unwrap(),
Duration::from_millis(250)
);
assert!(secs("--timeout", 0.0).is_err());
assert!(secs("--timeout", -1.0).is_err());
assert!(secs("--timeout", f64::NAN).is_err());
}
#[test]
fn limit_label_drops_trailing_zeroes() {
assert_eq!(limit_label(Duration::from_secs(2)), "2s");
assert_eq!(limit_label(Duration::from_millis(1500)), "1.5s");
}
#[test]
fn pulse_state_set_replaces_existing_keys() {
let state = PulseState::new();
state.set("ITEM", "a");
state.set("ITEM", "b");
state.set("INDEX", "1");
let snap = state.snapshot();
assert_eq!(snap.len(), 2);
assert!(snap.contains(&("ITEM".to_string(), "b".to_string())));
}
#[test]
fn watchdog_disarmed_by_drop_does_not_kill() {
let w = Watchdog::arm("pulse-test", Duration::from_millis(20));
drop(w);
std::thread::sleep(Duration::from_millis(60));
}
#[test]
fn heartbeat_stops_on_drop() {
let state = PulseState::new();
let hb = Heartbeat::start(
Duration::from_millis(5),
"[{ELAPSED}s]".to_string(),
PulseTo::Stderr,
state,
);
std::thread::sleep(Duration::from_millis(12));
drop(hb); }
}