seq_runtime/
watchdog.rs

1//! Watchdog timer for detecting stuck strands
2//!
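//! Monitors how long each strand has been alive and triggers alerts when a strand
//! exceeds a configurable age threshold. This helps detect infinite loops and runaway
//! computation. Age is measured from the strand's spawn time, not from its last
//! yield, so a long-lived strand that yields normally is also reported once it
//! passes the threshold.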
5//!
6//! ## Configuration (Environment Variables)
7//!
8//! | Variable | Default | Description |
9//! |----------|---------|-------------|
10//! | `SEQ_WATCHDOG_SECS` | `0` (disabled) | Threshold in seconds for "stuck" strand |
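//! | `SEQ_WATCHDOG_INTERVAL` | `5` | Seconds between checks (`0` falls back to the default) |
//! | `SEQ_WATCHDOG_ACTION` | `warn` | Action: `warn` (dump diagnostics and keep running) or `exit` (dump diagnostics, then terminate with status 1); any other value is treated as `warn` |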
13//!
14//! ## Example
15//!
16//! ```bash
17//! # Enable watchdog with 30 second threshold, check every 10 seconds
18//! SEQ_WATCHDOG_SECS=30 SEQ_WATCHDOG_INTERVAL=10 ./my-program
19//!
20//! # Enable watchdog that exits on stuck strand
21//! SEQ_WATCHDOG_SECS=60 SEQ_WATCHDOG_ACTION=exit ./my-program
22//! ```
23//!
24//! ## Design
25//!
26//! The watchdog runs on a dedicated thread and periodically scans the strand
27//! registry. It compares each strand's spawn time against the current time
28//! to detect strands that have been running longer than the threshold.
29//!
30//! This piggybacks on the existing strand registry infrastructure, requiring
31//! no additional tracking overhead on the hot path.
32
33use crate::diagnostics::dump_diagnostics;
34use crate::scheduler::strand_registry;
35use std::sync::Once;
36use std::sync::atomic::{AtomicU64, Ordering};
37use std::time::{Duration, SystemTime, UNIX_EPOCH};
38
/// Guards the one-time setup performed by `install_watchdog`.
static WATCHDOG_INIT: Once = Once::new();

/// ID of the strand most recently reported as stuck.
///
/// `0` is the "nothing reported yet" sentinel; it is the initial value and the
/// value restored by the test-only reset helper.
static WATCHDOG_TRIGGERED_STRAND: AtomicU64 = AtomicU64::new(0);
42
43/// Watchdog configuration
44#[derive(Debug, Clone)]
45pub struct WatchdogConfig {
46    /// Threshold in seconds for considering a strand "stuck"
47    pub threshold_secs: u64,
48    /// How often to check (in seconds)
49    pub interval_secs: u64,
50    /// Action to take when a stuck strand is detected
51    pub action: WatchdogAction,
52}
53
/// Response applied when the watchdog finds a stuck strand (`SEQ_WATCHDOG_ACTION`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatchdogAction {
    /// Default: report the strand on stderr and keep running. Full diagnostics are
    /// dumped only when the reported strand differs from the previously reported one.
    Warn,
    /// Report the strand, dump diagnostics, then terminate the process with status 1.
    Exit,
}
62
63impl Default for WatchdogConfig {
64    fn default() -> Self {
65        Self {
66            threshold_secs: 0, // Disabled by default
67            interval_secs: 5,
68            action: WatchdogAction::Warn,
69        }
70    }
71}
72
73impl WatchdogConfig {
74    /// Load configuration from environment variables
75    pub fn from_env() -> Self {
76        let threshold_secs = std::env::var("SEQ_WATCHDOG_SECS")
77            .ok()
78            .and_then(|s| s.parse().ok())
79            .unwrap_or(0);
80
81        let interval_secs = std::env::var("SEQ_WATCHDOG_INTERVAL")
82            .ok()
83            .and_then(|s| s.parse().ok())
84            .filter(|&v| v > 0)
85            .unwrap_or(5);
86
87        let action = std::env::var("SEQ_WATCHDOG_ACTION")
88            .ok()
89            .map(|s| match s.to_lowercase().as_str() {
90                "exit" => WatchdogAction::Exit,
91                _ => WatchdogAction::Warn,
92            })
93            .unwrap_or(WatchdogAction::Warn);
94
95        Self {
96            threshold_secs,
97            interval_secs,
98            action,
99        }
100    }
101
102    /// Check if watchdog is enabled
103    pub fn is_enabled(&self) -> bool {
104        self.threshold_secs > 0
105    }
106}
107
108/// Install the watchdog timer
109///
110/// This spawns a dedicated thread that periodically checks for stuck strands.
111/// Safe to call multiple times (idempotent via Once).
112///
113/// The watchdog is only started if `SEQ_WATCHDOG_SECS` is set to a positive value.
114pub fn install_watchdog() {
115    WATCHDOG_INIT.call_once(|| {
116        let config = WatchdogConfig::from_env();
117
118        if !config.is_enabled() {
119            return;
120        }
121
122        eprintln!(
123            "[watchdog] Enabled: threshold={}s, interval={}s, action={:?}",
124            config.threshold_secs, config.interval_secs, config.action
125        );
126
127        if let Err(e) = std::thread::Builder::new()
128            .name("seq-watchdog".to_string())
129            .spawn(move || watchdog_loop(config))
130        {
131            eprintln!("[watchdog] WARNING: Failed to start watchdog thread: {}", e);
132        }
133    });
134}
135
136/// Main watchdog loop
137fn watchdog_loop(config: WatchdogConfig) {
138    let interval = Duration::from_secs(config.interval_secs);
139
140    loop {
141        std::thread::sleep(interval);
142
143        if let Some((strand_id, running_secs)) = check_for_stuck_strands(config.threshold_secs) {
144            handle_stuck_strand(strand_id, running_secs, &config);
145        }
146    }
147}
148
149/// Check the strand registry for any strands exceeding the threshold
150///
151/// Returns Some((strand_id, running_seconds)) for the longest-running stuck strand,
152/// or None if all strands are within threshold or system time is invalid.
153fn check_for_stuck_strands(threshold_secs: u64) -> Option<(u64, u64)> {
154    // Return None if system time is invalid (avoids false positives)
155    let now = SystemTime::now()
156        .duration_since(UNIX_EPOCH)
157        .ok()
158        .map(|d| d.as_secs())?;
159
160    let registry = strand_registry();
161    let mut worst: Option<(u64, u64)> = None;
162
163    for (strand_id, spawn_time) in registry.active_strands() {
164        if spawn_time == 0 {
165            continue;
166        }
167
168        let running_secs = now.saturating_sub(spawn_time);
169
170        if running_secs > threshold_secs {
171            match worst {
172                None => worst = Some((strand_id, running_secs)),
173                Some((_, prev_secs)) if running_secs > prev_secs => {
174                    worst = Some((strand_id, running_secs));
175                }
176                _ => {}
177            }
178        }
179    }
180
181    worst
182}
183
184/// Handle detection of a stuck strand
185fn handle_stuck_strand(strand_id: u64, running_secs: u64, config: &WatchdogConfig) {
186    // Track which strand triggered the watchdog to detect new stuck strands
187    let prev_strand = WATCHDOG_TRIGGERED_STRAND.swap(strand_id, Ordering::Relaxed);
188    let is_new_strand = prev_strand != strand_id;
189
190    use std::io::Write;
191    let mut stderr = std::io::stderr().lock();
192
193    let _ = writeln!(stderr);
194    let _ = writeln!(
195        stderr,
196        "WATCHDOG: Strand #{} running for {}s (threshold: {}s)",
197        strand_id, running_secs, config.threshold_secs
198    );
199
200    // Dump diagnostics on first trigger OR when a different strand gets stuck
201    if prev_strand == 0 || is_new_strand {
202        dump_diagnostics();
203    }
204
205    match config.action {
206        WatchdogAction::Warn => {
207            if prev_strand != 0 && !is_new_strand {
208                let _ = writeln!(stderr, "    (strand still stuck, diagnostics suppressed)");
209            }
210        }
211        WatchdogAction::Exit => {
212            let _ = writeln!(stderr, "    Exiting due to SEQ_WATCHDOG_ACTION=exit");
213            std::process::exit(1);
214        }
215    }
216}
217
/// Test-only helper: forget which strand the watchdog last reported.
///
/// After this, the next detection is treated as a first report and dumps
/// diagnostics again.
#[cfg(test)]
pub fn reset_triggered() {
    let none_reported: u64 = 0;
    WATCHDOG_TRIGGERED_STRAND.store(none_reported, Ordering::Relaxed);
}
223
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard};

    // Serializes the tests in this module that mutate process environment variables.
    // NOTE: this does not protect against tests in other modules that read the
    // environment concurrently.
    static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Every environment variable read by `WatchdogConfig::from_env`.
    const WATCHDOG_ENV_VARS: [&str; 3] = [
        "SEQ_WATCHDOG_SECS",
        "SEQ_WATCHDOG_INTERVAL",
        "SEQ_WATCHDOG_ACTION",
    ];

    /// Scoped, exclusive access to the watchdog environment variables.
    ///
    /// Holds `ENV_TEST_MUTEX` for its whole lifetime. On drop, it restores every
    /// variable in `WATCHDOG_ENV_VARS` to its value at creation time. Restoration
    /// runs in `Drop`, so it also happens when an assertion panics; a failing test
    /// cannot leak its settings into later tests.
    struct EnvGuard {
        // `var_os` snapshots so non-Unicode original values survive the round trip.
        saved: Vec<(&'static str, Option<OsString>)>,
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Lock the env mutex and snapshot the watchdog variables.
        fn acquire() -> Self {
            // A panicking test poisons the mutex. The protected data is `()`, so
            // recovering the guard is sound and prevents cascading failures.
            let lock = ENV_TEST_MUTEX
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let saved = WATCHDOG_ENV_VARS
                .iter()
                .map(|&key| (key, std::env::var_os(key)))
                .collect();
            Self { saved, _lock: lock }
        }

        /// Set `key` to `value` until this guard is dropped.
        fn set(&self, key: &str, value: &str) {
            // SAFETY: `self` holds ENV_TEST_MUTEX, so no other env-mutating test in
            // this module runs concurrently.
            unsafe { std::env::set_var(key, value) };
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for (key, original) in self.saved.drain(..) {
                // SAFETY: `_lock` is still held here; struct fields are dropped only
                // after `Drop::drop` returns.
                unsafe {
                    match original {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = WatchdogConfig::default();
        assert_eq!(config.threshold_secs, 0);
        assert_eq!(config.interval_secs, 5);
        assert_eq!(config.action, WatchdogAction::Warn);
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_config_enabled() {
        let config = WatchdogConfig {
            threshold_secs: 30,
            interval_secs: 10,
            action: WatchdogAction::Exit,
        };
        assert!(config.is_enabled());
    }

    #[test]
    fn test_check_no_stuck_strands() {
        // The shared registry may or may not contain strands depending on test
        // execution order, so the result is not asserted; this only checks that
        // scanning does not panic.
        let _ = check_for_stuck_strands(30);
    }

    #[test]
    fn test_from_env_all_values() {
        let env = EnvGuard::acquire();
        env.set("SEQ_WATCHDOG_SECS", "30");
        env.set("SEQ_WATCHDOG_INTERVAL", "10");
        env.set("SEQ_WATCHDOG_ACTION", "exit");

        let config = WatchdogConfig::from_env();
        assert_eq!(config.threshold_secs, 30);
        assert_eq!(config.interval_secs, 10);
        assert_eq!(config.action, WatchdogAction::Exit);
        assert!(config.is_enabled());
    }

    #[test]
    fn test_from_env_warn_action() {
        let env = EnvGuard::acquire();
        env.set("SEQ_WATCHDOG_ACTION", "warn");

        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }

    #[test]
    fn test_from_env_invalid_values() {
        let env = EnvGuard::acquire();
        env.set("SEQ_WATCHDOG_SECS", "not_a_number");
        env.set("SEQ_WATCHDOG_INTERVAL", "0"); // 0 should use default

        let config = WatchdogConfig::from_env();
        assert_eq!(config.threshold_secs, 0); // Default on parse failure
        assert_eq!(config.interval_secs, 5); // Default when 0
    }

    #[test]
    fn test_from_env_unknown_action_defaults_to_warn() {
        let env = EnvGuard::acquire();
        env.set("SEQ_WATCHDOG_ACTION", "unknown_action");

        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }
}