1use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::{Arc, Condvar, Mutex};
27use std::time::{Duration, Instant};
28
29use crate::template;
30
/// Template used for heartbeat lines when no custom template is supplied.
///
/// The heartbeat thread passes an `ELAPSED` pair (whole seconds since the heartbeat
/// started) to `template::render`, which is presumably what fills the `{ELAPSED}`
/// placeholder — confirm against the `template` module.
pub const DEFAULT_HEARTBEAT_TEMPLATE: &str = "[{ELAPSED}s]";
33
/// Validates a seconds value given for `option` and converts it into a [`Duration`].
///
/// `option` is only used to label error messages (for example `"--timeout"`).
///
/// # Errors
///
/// Returns a human-readable message when `value` is NaN, infinite, zero or negative,
/// too large to be represented as a [`Duration`], or so small that it would round to a
/// zero-length duration.
pub fn secs(option: &str, value: f64) -> Result<Duration, String> {
    if !value.is_finite() || value <= 0.0 {
        return Err(format!(
            "invalid {option} '{value}': must be a positive number of seconds"
        ));
    }
    let duration = Duration::try_from_secs_f64(value)
        .map_err(|e| format!("invalid {option} '{value}': {e}"))?;
    // A positive value below nanosecond resolution converts to `Duration::ZERO`. A zero
    // timeout would fire immediately and a zero interval would never wait, so reject it
    // here instead of handing callers a "positive" duration that is actually empty.
    if duration.is_zero() {
        return Err(format!(
            "invalid {option} '{value}': must be at least one nanosecond"
        ));
    }
    Ok(duration)
}
54
/// Renders `limit` as a number of seconds followed by `s`.
///
/// Whole-second durations are printed without a fractional part (`2s`); anything else
/// uses the default `f64` formatting (`1.5s`).
pub fn limit_label(limit: Duration) -> String {
    let seconds = limit.as_secs_f64();
    if seconds.fract() != 0.0 {
        return format!("{seconds}s");
    }
    // No fractional part: print as an integer so the label reads `2s`, not `2.0s`-style.
    let whole = seconds as u64;
    format!("{whole}s")
}
64
/// Guard for a process-wide timeout created by [`Watchdog::arm`].
///
/// Disarming the guard — explicitly through `disarm` or implicitly by dropping it —
/// keeps the timer thread from terminating the process.
pub struct Watchdog {
    /// `true` once the watchdog has been disarmed; shared with the timer thread.
    disarmed: Arc<AtomicBool>,
}
78
79impl Watchdog {
80 pub fn arm(tool: &'static str, limit: Duration) -> Watchdog {
83 let disarmed = Arc::new(AtomicBool::new(false));
84 let flag = disarmed.clone();
85 std::thread::spawn(move || {
86 std::thread::sleep(limit);
87 if !flag.load(Ordering::SeqCst) {
88 eprintln!("{tool}: timed out after {}; aborted", limit_label(limit));
89 std::process::exit(2);
90 }
91 });
92 Watchdog { disarmed }
93 }
94
95 pub fn disarm(&self) {
97 self.disarmed.store(true, Ordering::SeqCst);
98 }
99}
100
101impl Drop for Watchdog {
102 fn drop(&mut self) {
103 self.disarm();
104 }
105}
106
107pub fn watchdog(tool: &'static str, timeout: Option<f64>) -> Result<Option<Watchdog>, String> {
110 match timeout {
111 Some(v) => Ok(Some(Watchdog::arm(tool, secs("--timeout", v)?))),
112 None => Ok(None),
113 }
114}
115
// Destination stream for heartbeat lines.
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive macros turn `///`
// doc comments into help text, so adding doc comments here would change `--help` output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum PulseTo {
    // Heartbeat lines are written with `eprintln!`.
    Stderr,
    // Heartbeat lines are written with `println!`.
    Stdout,
}
126
/// Mutex-protected list of key/value pairs that heartbeat lines are rendered from.
///
/// Pairs keep their insertion order; setting a key that already exists overwrites its
/// value in place.
#[derive(Default)]
pub struct PulseState {
    /// Ordered `(key, value)` pairs.
    pairs: Mutex<Vec<(String, String)>>,
}

impl PulseState {
    /// Creates an empty state wrapped in an [`Arc`] so it can be shared across threads.
    pub fn new() -> Arc<PulseState> {
        Arc::default()
    }

    /// Stores `value` under `key`, replacing the previous value if the key is present.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn set(&self, key: &str, value: &str) {
        let mut pairs = self.pairs.lock().unwrap();
        if let Some(existing) = pairs.iter_mut().find(|pair| pair.0 == key) {
            existing.1 = value.to_owned();
        } else {
            pairs.push((key.to_owned(), value.to_owned()));
        }
    }

    /// Returns a copy of the current pairs, in insertion order.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn snapshot(&self) -> Vec<(String, String)> {
        let pairs = self.pairs.lock().unwrap();
        pairs.to_vec()
    }
}
153
// Command-line options that control the periodic heartbeat.
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive macros turn `///`
// doc comments into help text, so adding doc comments here would change `--help` output.
#[derive(clap::Args, Debug)]
pub struct HeartbeatOpts {
    // Interval between heartbeat lines, in seconds. `None` disables the heartbeat.
    #[arg(long, value_name = "SECS")]
    pub heartbeat: Option<f64>,

    // Template rendered for each heartbeat line; `DEFAULT_HEARTBEAT_TEMPLATE` is used
    // when this is `None`.
    #[arg(long, value_name = "TEMPLATE")]
    pub heartbeat_emit: Option<String>,

    // Stream the heartbeat lines are written to; defaults to stderr.
    #[arg(long, value_enum, default_value_t = PulseTo::Stderr)]
    pub heartbeat_to: PulseTo,
}
170
171impl HeartbeatOpts {
172 pub fn start(&self, tool: &str, state: Arc<PulseState>) -> Result<Option<Heartbeat>, String> {
176 let Some(every) = self.heartbeat else {
177 return Ok(None);
178 };
179 let interval = secs("--heartbeat", every)?;
180 state.set("TOOL", tool);
181 let template = self
182 .heartbeat_emit
183 .clone()
184 .unwrap_or_else(|| DEFAULT_HEARTBEAT_TEMPLATE.to_string());
185 Ok(Some(Heartbeat::start(
186 interval,
187 template,
188 self.heartbeat_to,
189 state,
190 )))
191 }
192}
193
/// Handle to a running heartbeat thread.
///
/// Dropping the handle asks the thread to stop and waits for it to exit.
pub struct Heartbeat {
    /// Stop flag plus the condition variable used to wake the thread when the flag is set.
    stop: Arc<(Mutex<bool>, Condvar)>,
    /// Join handle of the heartbeat thread; taken out (left as `None`) when it is joined.
    handle: Option<std::thread::JoinHandle<()>>,
}
201
202impl Heartbeat {
203 pub fn start(
206 interval: Duration,
207 template: String,
208 to: PulseTo,
209 state: Arc<PulseState>,
210 ) -> Heartbeat {
211 let stop = Arc::new((Mutex::new(false), Condvar::new()));
212 let shared = stop.clone();
213 let handle = std::thread::spawn(move || {
214 let started = Instant::now();
215 let (lock, cvar) = &*shared;
216 let mut stopped = lock.lock().unwrap();
217 loop {
218 let tick_start = Instant::now();
220 while !*stopped {
221 let elapsed = tick_start.elapsed();
222 if elapsed >= interval {
223 break;
224 }
225 let (guard, _) = cvar.wait_timeout(stopped, interval - elapsed).unwrap();
226 stopped = guard;
227 }
228 if *stopped {
229 break;
230 }
231 let elapsed_s = started.elapsed().as_secs().to_string();
232 let mut pairs = vec![("ELAPSED".to_string(), elapsed_s)];
233 pairs.extend(state.snapshot());
234 let refs: Vec<(&str, &str)> = pairs
235 .iter()
236 .map(|(k, v)| (k.as_str(), v.as_str()))
237 .collect();
238 let line = template::render(&template, &refs);
239 match to {
240 PulseTo::Stdout => println!("{line}"),
241 PulseTo::Stderr => eprintln!("{line}"),
242 }
243 }
244 });
245 Heartbeat {
246 stop,
247 handle: Some(handle),
248 }
249 }
250}
251
252impl Drop for Heartbeat {
253 fn drop(&mut self) {
254 let (lock, cvar) = &*self.stop;
255 *lock.lock().unwrap() = true;
256 cvar.notify_all();
257 if let Some(h) = self.handle.take() {
258 let _ = h.join();
259 }
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    // `secs` converts fractional positive values and rejects zero, negatives and NaN.
    #[test]
    fn secs_accepts_positive_fractions_only() {
        assert_eq!(secs("--timeout", 0.25).unwrap(), Duration::from_millis(250));
        assert!(secs("--timeout", 0.0).is_err());
        assert!(secs("--timeout", -1.0).is_err());
        assert!(secs("--timeout", f64::NAN).is_err());
    }

    // Whole seconds print without a fractional part; other values keep it.
    #[test]
    fn limit_label_drops_trailing_zeroes() {
        assert_eq!(limit_label(Duration::from_secs(2)), "2s");
        assert_eq!(limit_label(Duration::from_millis(1500)), "1.5s");
    }

    // Setting the same key twice keeps a single entry holding the latest value.
    #[test]
    fn pulse_state_set_replaces_existing_keys() {
        let state = PulseState::new();
        state.set("ITEM", "a");
        state.set("ITEM", "b");
        state.set("INDEX", "1");
        let snap = state.snapshot();
        assert_eq!(snap.len(), 2);
        assert!(snap.contains(&("ITEM".to_string(), "b".to_string())));
    }

    // There is no assertion here: if dropping the guard failed to disarm the watchdog,
    // the timer thread would call `process::exit(2)` and take the test process down.
    #[test]
    fn watchdog_disarmed_by_drop_does_not_kill() {
        let w = Watchdog::arm("pulse-test", Duration::from_millis(20));
        drop(w);
        // Sleep past the 20 ms limit so the timer thread has woken up and checked the flag.
        std::thread::sleep(Duration::from_millis(60));
    }

    // There is no assertion here: `drop` joins the heartbeat thread, so the test would
    // hang if the thread did not stop.
    #[test]
    fn heartbeat_stops_on_drop() {
        let state = PulseState::new();
        let hb = Heartbeat::start(
            Duration::from_millis(5),
            "[{ELAPSED}s]".to_string(),
            PulseTo::Stderr,
            state,
        );
        // Let the heartbeat thread run through a couple of ticks before stopping it.
        std::thread::sleep(Duration::from_millis(12));
        drop(hb);
    }
}