Skip to main content

coding_tools/
pulse.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 Jonathan Shook

//! Run-bounding and liveness, shared by every tool: the `--timeout` watchdog
//! that keeps any run bounded, and the `--heartbeat` pulse that gives an agent
//! a sign of life during a long one.
//!
//! Two enforcement styles, chosen per tool:
//!
//! * [`Watchdog`] — a hard self-bound for the tools that do their own work
//!   (`ct-search`, `ct-view`, `ct-tree`, `ct-edit`, `ct-patch`): when the limit
//!   passes, the process prints a one-line message and exits `2`. The mutating
//!   tools [`disarm`](Watchdog::disarm) it before their write phase, so a
//!   timeout can never interrupt a file write halfway.
//! * The child-running tools (`ct-test`, `ct-each`) instead bound the **child**
//!   through [`supervise`](crate::supervise), folding a timeout into the
//!   verdict rather than aborting — see that module.
//!
//! The [`Heartbeat`] is a small thread that prints a templated line every
//! interval — minimal by default (`[{ELAPSED}s]`), token-customisable with
//! `--heartbeat-emit`, and routable to stdout or stderr with `--heartbeat-to`.
//! Dynamic tokens (e.g. `ct-each`'s current `{ITEM}`) flow through a shared
//! [`PulseState`].

use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex, PoisonError};
use std::time::{Duration, Instant};

use crate::template;
30
/// Template used when `--heartbeat-emit` is not given: just the elapsed whole
/// seconds in brackets, kept deliberately terse.
pub const DEFAULT_HEARTBEAT_TEMPLATE: &str = "[{ELAPSED}s]";
33
/// Convert a positive seconds value (fractional allowed) into a [`Duration`].
///
/// `option` is the flag being parsed (e.g. `--timeout`) and only appears in
/// error messages.
///
/// # Errors
///
/// Returns a human-readable message when `value` is NaN, infinite, zero or
/// negative, too large to fit in a [`Duration`], or so small that it rounds
/// to a zero-length duration. A zero interval would make a heartbeat spin
/// and a timeout fire immediately.
///
/// # Examples
///
/// ```
/// use coding_tools::pulse::secs;
/// use std::time::Duration;
///
/// assert_eq!(secs("--timeout", 1.5).unwrap(), Duration::from_millis(1500));
/// assert!(secs("--timeout", 0.0).is_err());
/// assert!(secs("--heartbeat", -3.0).is_err());
/// assert!(secs("--heartbeat", 1e-12).is_err());
/// ```
pub fn secs(option: &str, value: f64) -> Result<Duration, String> {
    if !value.is_finite() || value <= 0.0 {
        return Err(format!(
            "invalid {option} '{value}': must be a positive number of seconds"
        ));
    }
    let duration = Duration::try_from_secs_f64(value)
        .map_err(|e| format!("invalid {option} '{value}': {e}"))?;
    // The conversion rounds to whole nanoseconds, so a tiny positive value can
    // still come back as a zero-length duration; that is not "positive".
    if duration.is_zero() {
        return Err(format!(
            "invalid {option} '{value}': must be at least one nanosecond"
        ));
    }
    Ok(duration)
}
54
/// Render a duration limit for messages: `2s`, `1.5s` — no trailing zeros.
///
/// Whole-second limits are detected with exact integer arithmetic
/// (`subsec_nanos() == 0`) and printed from [`Duration::as_secs`], so even very
/// large limits render their exact second count. Fractional limits use the
/// shortest `f64` rendering of the seconds value.
pub fn limit_label(limit: Duration) -> String {
    if limit.subsec_nanos() == 0 {
        format!("{}s", limit.as_secs())
    } else {
        format!("{}s", limit.as_secs_f64())
    }
}
64
// ----- Watchdog -----------------------------------------------------------------
66
67/// A hard `--timeout` bound for a self-contained (non-child-running) tool.
68///
69/// [`arm`](Watchdog::arm) spawns a thread that, once the limit passes, prints
70/// `<tool>: timed out after <limit>; aborted` to stderr and exits the process
71/// with status `2` (the suite's usage/runtime-error code). Dropping the guard
72/// — or calling [`disarm`](Watchdog::disarm) — defuses it, so a run that
73/// finishes in time (or is about to start un-interruptible work, like
74/// `ct-edit`'s write phase) is never killed.
75pub struct Watchdog {
76    disarmed: Arc<AtomicBool>,
77}
78
79impl Watchdog {
80    /// Arm a timeout for `tool`; the returned guard must stay alive while the
81    /// bound should be enforced.
82    pub fn arm(tool: &'static str, limit: Duration) -> Watchdog {
83        let disarmed = Arc::new(AtomicBool::new(false));
84        let flag = disarmed.clone();
85        std::thread::spawn(move || {
86            std::thread::sleep(limit);
87            if !flag.load(Ordering::SeqCst) {
88                eprintln!("{tool}: timed out after {}; aborted", limit_label(limit));
89                std::process::exit(2);
90            }
91        });
92        Watchdog { disarmed }
93    }
94
95    /// Defuse the watchdog; after this the limit is never enforced.
96    pub fn disarm(&self) {
97        self.disarmed.store(true, Ordering::SeqCst);
98    }
99}
100
101impl Drop for Watchdog {
102    fn drop(&mut self) {
103        self.disarm();
104    }
105}
106
107/// Arm a [`Watchdog`] from a raw `--timeout` value, if one was given. The
108/// returned guard must be held for the span the bound should cover.
109pub fn watchdog(tool: &'static str, timeout: Option<f64>) -> Result<Option<Watchdog>, String> {
110    match timeout {
111        Some(v) => Ok(Some(Watchdog::arm(tool, secs("--timeout", v)?))),
112        None => Ok(None),
113    }
114}
115
// ----- Heartbeat ----------------------------------------------------------------
117
118/// Stream selector for `--heartbeat-to`.
119#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
120pub enum PulseTo {
121    /// Write pulses to standard error (the default; never pollutes `--emit`).
122    Stderr,
123    /// Write pulses to standard output.
124    Stdout,
125}
126
/// Token values a heartbeat renders on each pulse, updatable while the run is
/// in progress (e.g. `ct-each` keeps `{ITEM}`/`{INDEX}`/`{DONE}`/`{TOTAL}`
/// current as it advances).
///
/// Held as `(key, value)` pairs in first-insertion order behind a mutex, so
/// the owning tool and the pulse thread can share it.
#[derive(Default)]
pub struct PulseState {
    pairs: Mutex<Vec<(String, String)>>,
}

impl PulseState {
    /// Create an empty state, already wrapped in an [`Arc`] so it can be
    /// handed to the pulse thread.
    pub fn new() -> Arc<PulseState> {
        let empty = PulseState::default();
        Arc::new(empty)
    }

    /// Set one token's current value: overwrite it if `key` is already
    /// present, otherwise append it.
    pub fn set(&self, key: &str, value: &str) {
        let mut entries = self.pairs.lock().unwrap();
        let existing = entries.iter().position(|(name, _)| name == key);
        if let Some(index) = existing {
            entries[index].1 = value.to_owned();
        } else {
            entries.push((key.to_owned(), value.to_owned()));
        }
    }

    /// Copy out every current `(key, value)` pair, in insertion order.
    fn snapshot(&self) -> Vec<(String, String)> {
        let entries = self.pairs.lock().unwrap();
        entries.to_vec()
    }
}
153
154/// The shared `--heartbeat` option group, `#[command(flatten)]`-ed into every
155/// leaf tool's CLI so the flags are named and documented identically.
156#[derive(clap::Args, Debug)]
157pub struct HeartbeatOpts {
158    /// Print a liveness pulse every SECS seconds (fractional allowed) while the run is in progress.
159    #[arg(long, value_name = "SECS")]
160    pub heartbeat: Option<f64>,
161
162    /// Heartbeat line template. Tokens: {ELAPSED} (whole seconds so far), {TOOL}, plus per-tool tokens. Default: "[{ELAPSED}s]".
163    #[arg(long, value_name = "TEMPLATE")]
164    pub heartbeat_emit: Option<String>,
165
166    /// Stream heartbeat pulses are written to.
167    #[arg(long, value_enum, default_value_t = PulseTo::Stderr)]
168    pub heartbeat_to: PulseTo,
169}
170
171impl HeartbeatOpts {
172    /// Start the pulse if `--heartbeat` was given. `state` carries the dynamic
173    /// tokens; the `{TOOL}` token is set here. Returns the guard that stops the
174    /// pulse on drop (`None` when no heartbeat was requested).
175    pub fn start(
176        &self,
177        tool: &str,
178        state: Arc<PulseState>,
179    ) -> Result<Option<Heartbeat>, String> {
180        let Some(every) = self.heartbeat else {
181            return Ok(None);
182        };
183        let interval = secs("--heartbeat", every)?;
184        state.set("TOOL", tool);
185        let template = self
186            .heartbeat_emit
187            .clone()
188            .unwrap_or_else(|| DEFAULT_HEARTBEAT_TEMPLATE.to_string());
189        Ok(Some(Heartbeat::start(
190            interval,
191            template,
192            self.heartbeat_to,
193            state,
194        )))
195    }
196}
197
198/// A running heartbeat: a thread printing one templated line per interval.
199/// Dropping the guard stops the pulse promptly (before drop returns), so no
200/// pulse can land after a tool's final output.
201pub struct Heartbeat {
202    stop: Arc<(Mutex<bool>, Condvar)>,
203    handle: Option<std::thread::JoinHandle<()>>,
204}
205
206impl Heartbeat {
207    /// Start pulsing every `interval`, rendering `template` from `{ELAPSED}`
208    /// plus the current `state` tokens, onto the `to` stream.
209    pub fn start(
210        interval: Duration,
211        template: String,
212        to: PulseTo,
213        state: Arc<PulseState>,
214    ) -> Heartbeat {
215        let stop = Arc::new((Mutex::new(false), Condvar::new()));
216        let shared = stop.clone();
217        let handle = std::thread::spawn(move || {
218            let started = Instant::now();
219            let (lock, cvar) = &*shared;
220            let mut stopped = lock.lock().unwrap();
221            loop {
222                // Sleep one interval, waking early only when stopped.
223                let tick_start = Instant::now();
224                while !*stopped {
225                    let elapsed = tick_start.elapsed();
226                    if elapsed >= interval {
227                        break;
228                    }
229                    let (guard, _) = cvar.wait_timeout(stopped, interval - elapsed).unwrap();
230                    stopped = guard;
231                }
232                if *stopped {
233                    break;
234                }
235                let elapsed_s = started.elapsed().as_secs().to_string();
236                let mut pairs = vec![("ELAPSED".to_string(), elapsed_s)];
237                pairs.extend(state.snapshot());
238                let refs: Vec<(&str, &str)> = pairs
239                    .iter()
240                    .map(|(k, v)| (k.as_str(), v.as_str()))
241                    .collect();
242                let line = template::render(&template, &refs);
243                match to {
244                    PulseTo::Stdout => println!("{line}"),
245                    PulseTo::Stderr => eprintln!("{line}"),
246                }
247            }
248        });
249        Heartbeat {
250            stop,
251            handle: Some(handle),
252        }
253    }
254}
255
256impl Drop for Heartbeat {
257    fn drop(&mut self) {
258        let (lock, cvar) = &*self.stop;
259        *lock.lock().unwrap() = true;
260        cvar.notify_all();
261        if let Some(h) = self.handle.take() {
262            let _ = h.join();
263        }
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn secs_accepts_positive_fractions_only() {
        let quarter = secs("--timeout", 0.25).expect("0.25s is a valid timeout");
        assert_eq!(quarter, Duration::from_millis(250));
        for rejected in [0.0, -1.0, f64::NAN] {
            assert!(
                secs("--timeout", rejected).is_err(),
                "{rejected} should be rejected"
            );
        }
    }

    #[test]
    fn limit_label_drops_trailing_zeroes() {
        let cases = [
            (Duration::from_secs(2), "2s"),
            (Duration::from_millis(1500), "1.5s"),
        ];
        for (limit, expected) in cases {
            assert_eq!(limit_label(limit), expected);
        }
    }

    #[test]
    fn pulse_state_set_replaces_existing_keys() {
        let state = PulseState::new();
        for (key, value) in [("ITEM", "a"), ("ITEM", "b"), ("INDEX", "1")] {
            state.set(key, value);
        }
        let current = state.snapshot();
        assert_eq!(current.len(), 2);
        let latest_item = ("ITEM".to_string(), "b".to_string());
        assert!(current.contains(&latest_item));
    }

    #[test]
    fn watchdog_disarmed_by_drop_does_not_kill() {
        // If dropping the guard failed to defuse it, the watchdog would call
        // exit(2) and take the whole test binary down with it.
        drop(Watchdog::arm("pulse-test", Duration::from_millis(20)));
        std::thread::sleep(Duration::from_millis(60));
    }

    #[test]
    fn heartbeat_stops_on_drop() {
        let pulse = Heartbeat::start(
            Duration::from_millis(5),
            DEFAULT_HEARTBEAT_TEMPLATE.to_string(),
            PulseTo::Stderr,
            PulseState::new(),
        );
        std::thread::sleep(Duration::from_millis(12));
        // Dropping must join the pulse thread promptly, not hang the test.
        drop(pulse);
    }
}