1use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::{Arc, Condvar, Mutex};
27use std::time::{Duration, Instant};
28
29use crate::template;
30
31pub const DEFAULT_HEARTBEAT_TEMPLATE: &str = "[{ELAPSED}s]";
33
34pub fn secs(option: &str, value: f64) -> Result<Duration, String> {
47 if !value.is_finite() || value <= 0.0 {
48 return Err(format!(
49 "invalid {option} '{value}': must be a positive number of seconds"
50 ));
51 }
52 Duration::try_from_secs_f64(value).map_err(|e| format!("invalid {option} '{value}': {e}"))
53}
54
55pub fn limit_label(limit: Duration) -> String {
57 let v = limit.as_secs_f64();
58 if v == v.trunc() {
59 format!("{}s", v as u64)
60 } else {
61 format!("{v}s")
62 }
63}
64
65pub struct Watchdog {
76 disarmed: Arc<AtomicBool>,
77}
78
79impl Watchdog {
80 pub fn arm(tool: &'static str, limit: Duration) -> Watchdog {
83 let disarmed = Arc::new(AtomicBool::new(false));
84 let flag = disarmed.clone();
85 std::thread::spawn(move || {
86 std::thread::sleep(limit);
87 if !flag.load(Ordering::SeqCst) {
88 eprintln!("{tool}: timed out after {}; aborted", limit_label(limit));
89 std::process::exit(2);
90 }
91 });
92 Watchdog { disarmed }
93 }
94
95 pub fn disarm(&self) {
97 self.disarmed.store(true, Ordering::SeqCst);
98 }
99}
100
101impl Drop for Watchdog {
102 fn drop(&mut self) {
103 self.disarm();
104 }
105}
106
107pub fn watchdog(tool: &'static str, timeout: Option<f64>) -> Result<Option<Watchdog>, String> {
110 match timeout {
111 Some(v) => Ok(Some(Watchdog::arm(tool, secs("--timeout", v)?))),
112 None => Ok(None),
113 }
114}
115
116#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
120pub enum PulseTo {
121 Stderr,
123 Stdout,
125}
126
127#[derive(Default)]
130pub struct PulseState {
131 pairs: Mutex<Vec<(String, String)>>,
132}
133
134impl PulseState {
135 pub fn new() -> Arc<PulseState> {
137 Arc::new(PulseState::default())
138 }
139
140 pub fn set(&self, key: &str, value: &str) {
142 let mut pairs = self.pairs.lock().unwrap();
143 match pairs.iter_mut().find(|(k, _)| k == key) {
144 Some((_, v)) => *v = value.to_string(),
145 None => pairs.push((key.to_string(), value.to_string())),
146 }
147 }
148
149 fn snapshot(&self) -> Vec<(String, String)> {
150 self.pairs.lock().unwrap().clone()
151 }
152}
153
154#[derive(clap::Args, Debug)]
157pub struct HeartbeatOpts {
158 #[arg(long, value_name = "SECS")]
160 pub heartbeat: Option<f64>,
161
162 #[arg(long, value_name = "TEMPLATE")]
164 pub heartbeat_emit: Option<String>,
165
166 #[arg(long, value_enum, default_value_t = PulseTo::Stderr)]
168 pub heartbeat_to: PulseTo,
169}
170
171impl HeartbeatOpts {
172 pub fn start(
176 &self,
177 tool: &str,
178 state: Arc<PulseState>,
179 ) -> Result<Option<Heartbeat>, String> {
180 let Some(every) = self.heartbeat else {
181 return Ok(None);
182 };
183 let interval = secs("--heartbeat", every)?;
184 state.set("TOOL", tool);
185 let template = self
186 .heartbeat_emit
187 .clone()
188 .unwrap_or_else(|| DEFAULT_HEARTBEAT_TEMPLATE.to_string());
189 Ok(Some(Heartbeat::start(
190 interval,
191 template,
192 self.heartbeat_to,
193 state,
194 )))
195 }
196}
197
198pub struct Heartbeat {
202 stop: Arc<(Mutex<bool>, Condvar)>,
203 handle: Option<std::thread::JoinHandle<()>>,
204}
205
206impl Heartbeat {
207 pub fn start(
210 interval: Duration,
211 template: String,
212 to: PulseTo,
213 state: Arc<PulseState>,
214 ) -> Heartbeat {
215 let stop = Arc::new((Mutex::new(false), Condvar::new()));
216 let shared = stop.clone();
217 let handle = std::thread::spawn(move || {
218 let started = Instant::now();
219 let (lock, cvar) = &*shared;
220 let mut stopped = lock.lock().unwrap();
221 loop {
222 let tick_start = Instant::now();
224 while !*stopped {
225 let elapsed = tick_start.elapsed();
226 if elapsed >= interval {
227 break;
228 }
229 let (guard, _) = cvar.wait_timeout(stopped, interval - elapsed).unwrap();
230 stopped = guard;
231 }
232 if *stopped {
233 break;
234 }
235 let elapsed_s = started.elapsed().as_secs().to_string();
236 let mut pairs = vec![("ELAPSED".to_string(), elapsed_s)];
237 pairs.extend(state.snapshot());
238 let refs: Vec<(&str, &str)> = pairs
239 .iter()
240 .map(|(k, v)| (k.as_str(), v.as_str()))
241 .collect();
242 let line = template::render(&template, &refs);
243 match to {
244 PulseTo::Stdout => println!("{line}"),
245 PulseTo::Stderr => eprintln!("{line}"),
246 }
247 }
248 });
249 Heartbeat {
250 stop,
251 handle: Some(handle),
252 }
253 }
254}
255
256impl Drop for Heartbeat {
257 fn drop(&mut self) {
258 let (lock, cvar) = &*self.stop;
259 *lock.lock().unwrap() = true;
260 cvar.notify_all();
261 if let Some(h) = self.handle.take() {
262 let _ = h.join();
263 }
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270
271 #[test]
272 fn secs_accepts_positive_fractions_only() {
273 assert_eq!(
274 secs("--timeout", 0.25).unwrap(),
275 Duration::from_millis(250)
276 );
277 assert!(secs("--timeout", 0.0).is_err());
278 assert!(secs("--timeout", -1.0).is_err());
279 assert!(secs("--timeout", f64::NAN).is_err());
280 }
281
282 #[test]
283 fn limit_label_drops_trailing_zeroes() {
284 assert_eq!(limit_label(Duration::from_secs(2)), "2s");
285 assert_eq!(limit_label(Duration::from_millis(1500)), "1.5s");
286 }
287
288 #[test]
289 fn pulse_state_set_replaces_existing_keys() {
290 let state = PulseState::new();
291 state.set("ITEM", "a");
292 state.set("ITEM", "b");
293 state.set("INDEX", "1");
294 let snap = state.snapshot();
295 assert_eq!(snap.len(), 2);
296 assert!(snap.contains(&("ITEM".to_string(), "b".to_string())));
297 }
298
299 #[test]
300 fn watchdog_disarmed_by_drop_does_not_kill() {
301 let w = Watchdog::arm("pulse-test", Duration::from_millis(20));
304 drop(w);
305 std::thread::sleep(Duration::from_millis(60));
306 }
307
308 #[test]
309 fn heartbeat_stops_on_drop() {
310 let state = PulseState::new();
311 let hb = Heartbeat::start(
312 Duration::from_millis(5),
313 "[{ELAPSED}s]".to_string(),
314 PulseTo::Stderr,
315 state,
316 );
317 std::thread::sleep(Duration::from_millis(12));
318 drop(hb); }
320}