seq_runtime/
watchdog.rs

1//! Watchdog timer for detecting stuck strands
2//!
3//! Monitors strand execution time and triggers alerts when strands run too long
4//! without yielding. This helps detect infinite loops and runaway computation.
5//!
6//! ## Configuration (Environment Variables)
7//!
8//! | Variable | Default | Description |
9//! |----------|---------|-------------|
10//! | `SEQ_WATCHDOG_SECS` | `0` (disabled) | Threshold in seconds for "stuck" strand |
//! | `SEQ_WATCHDOG_INTERVAL` | `5` | Interval between checks, in seconds (`0` or unparsable values fall back to `5`) |
12//! | `SEQ_WATCHDOG_ACTION` | `warn` | Action: `warn` (dump diagnostics) or `exit` (terminate) |
13//!
14//! ## Example
15//!
16//! ```bash
17//! # Enable watchdog with 30 second threshold, check every 10 seconds
18//! SEQ_WATCHDOG_SECS=30 SEQ_WATCHDOG_INTERVAL=10 ./my-program
19//!
20//! # Enable watchdog that exits on stuck strand
21//! SEQ_WATCHDOG_SECS=60 SEQ_WATCHDOG_ACTION=exit ./my-program
22//! ```
23//!
24//! ## Design
25//!
26//! The watchdog runs on a dedicated thread and periodically scans the strand
27//! registry. It compares each strand's spawn time against the current time
28//! to detect strands that have been running longer than the threshold.
29//!
30//! This piggybacks on the existing strand registry infrastructure, requiring
31//! no additional tracking overhead on the hot path.
32//!
33//! ## Feature Flag
34//!
35//! This module requires the `diagnostics` feature (enabled by default).
36//! When disabled, the watchdog is not compiled.
37
38#![cfg(feature = "diagnostics")]
39
40use crate::diagnostics::dump_diagnostics;
41use crate::scheduler::strand_registry;
42use std::sync::Once;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::time::{Duration, SystemTime, UNIX_EPOCH};
45
/// Id of the strand most recently reported by the watchdog.
///
/// `0` means no strand has been reported yet.
static WATCHDOG_TRIGGERED_STRAND: AtomicU64 = AtomicU64::new(0);
/// One-shot latch ensuring the watchdog installation logic runs at most once.
static WATCHDOG_INIT: Once = Once::new();
49
/// Watchdog configuration
///
/// Normally built with [`WatchdogConfig::from_env`]. A `threshold_secs` of `0`
/// means the watchdog is disabled (see [`WatchdogConfig::is_enabled`]).
#[derive(Debug, Clone)]
pub struct WatchdogConfig {
    /// Threshold in seconds for considering a strand "stuck" (`0` = disabled)
    pub threshold_secs: u64,
    /// How often to check (in seconds)
    pub interval_secs: u64,
    /// Action to take when a stuck strand is detected
    pub action: WatchdogAction,
}

/// Action to take when watchdog detects a stuck strand
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatchdogAction {
    /// Log a warning and dump diagnostics (default)
    Warn,
    /// Dump diagnostics and exit the process
    Exit,
}

impl Default for WatchdogConfig {
    /// Disabled watchdog (`threshold_secs == 0`), 5 second interval, `Warn` action.
    fn default() -> Self {
        Self {
            threshold_secs: 0, // Disabled by default
            interval_secs: 5,
            action: WatchdogAction::Warn,
        }
    }
}

impl WatchdogConfig {
    /// Load configuration from environment variables
    ///
    /// * `SEQ_WATCHDOG_SECS` - stuck threshold; missing or unparsable values
    ///   fall back to the default (`0`, disabled).
    /// * `SEQ_WATCHDOG_INTERVAL` - seconds between checks; missing, unparsable
    ///   or `0` values fall back to the default (`5`).
    /// * `SEQ_WATCHDOG_ACTION` - `exit` (ASCII case-insensitive) selects
    ///   [`WatchdogAction::Exit`]; anything else selects [`WatchdogAction::Warn`].
    ///
    /// Surrounding whitespace in any of the values is ignored, so a value such
    /// as `"30\n"` copied from a file is still honoured instead of silently
    /// disabling the watchdog.
    pub fn from_env() -> Self {
        // Single source of truth for fallbacks: the `Default` impl.
        let defaults = Self::default();

        let threshold_secs =
            Self::env_u64("SEQ_WATCHDOG_SECS").unwrap_or(defaults.threshold_secs);

        // A zero interval would make the watchdog thread spin, so treat it as unset.
        let interval_secs = Self::env_u64("SEQ_WATCHDOG_INTERVAL")
            .filter(|&secs| secs > 0)
            .unwrap_or(defaults.interval_secs);

        let action = std::env::var("SEQ_WATCHDOG_ACTION")
            .ok()
            .map_or(defaults.action, |raw| Self::parse_action(&raw));

        Self {
            threshold_secs,
            interval_secs,
            action,
        }
    }

    /// Check if watchdog is enabled
    pub fn is_enabled(&self) -> bool {
        self.threshold_secs > 0
    }

    /// Read `key` from the environment and parse it as a `u64`.
    ///
    /// Returns `None` when the variable is unset, not valid Unicode, or not a
    /// number after trimming surrounding whitespace.
    fn env_u64(key: &str) -> Option<u64> {
        let raw = std::env::var(key).ok()?;
        raw.trim().parse().ok()
    }

    /// Map a raw `SEQ_WATCHDOG_ACTION` value to an action.
    ///
    /// Only `exit` (ignoring ASCII case and surrounding whitespace) selects
    /// `Exit`; every other value is treated as `Warn`.
    fn parse_action(raw: &str) -> WatchdogAction {
        if raw.trim().eq_ignore_ascii_case("exit") {
            WatchdogAction::Exit
        } else {
            WatchdogAction::Warn
        }
    }
}
114
115/// Install the watchdog timer
116///
117/// This spawns a dedicated thread that periodically checks for stuck strands.
118/// Safe to call multiple times (idempotent via Once).
119///
120/// The watchdog is only started if `SEQ_WATCHDOG_SECS` is set to a positive value.
121pub fn install_watchdog() {
122    WATCHDOG_INIT.call_once(|| {
123        let config = WatchdogConfig::from_env();
124
125        if !config.is_enabled() {
126            return;
127        }
128
129        eprintln!(
130            "[watchdog] Enabled: threshold={}s, interval={}s, action={:?}",
131            config.threshold_secs, config.interval_secs, config.action
132        );
133
134        if let Err(e) = std::thread::Builder::new()
135            .name("seq-watchdog".to_string())
136            .spawn(move || watchdog_loop(config))
137        {
138            eprintln!("[watchdog] WARNING: Failed to start watchdog thread: {}", e);
139        }
140    });
141}
142
143/// Main watchdog loop
144fn watchdog_loop(config: WatchdogConfig) {
145    let interval = Duration::from_secs(config.interval_secs);
146
147    loop {
148        std::thread::sleep(interval);
149
150        if let Some((strand_id, running_secs)) = check_for_stuck_strands(config.threshold_secs) {
151            handle_stuck_strand(strand_id, running_secs, &config);
152        }
153    }
154}
155
156/// Check the strand registry for any strands exceeding the threshold
157///
158/// Returns Some((strand_id, running_seconds)) for the longest-running stuck strand,
159/// or None if all strands are within threshold or system time is invalid.
160fn check_for_stuck_strands(threshold_secs: u64) -> Option<(u64, u64)> {
161    // Return None if system time is invalid (avoids false positives)
162    let now = SystemTime::now()
163        .duration_since(UNIX_EPOCH)
164        .ok()
165        .map(|d| d.as_secs())?;
166
167    let registry = strand_registry();
168    let mut worst: Option<(u64, u64)> = None;
169
170    for (strand_id, spawn_time) in registry.active_strands() {
171        if spawn_time == 0 {
172            continue;
173        }
174
175        let running_secs = now.saturating_sub(spawn_time);
176
177        if running_secs > threshold_secs {
178            match worst {
179                None => worst = Some((strand_id, running_secs)),
180                Some((_, prev_secs)) if running_secs > prev_secs => {
181                    worst = Some((strand_id, running_secs));
182                }
183                _ => {}
184            }
185        }
186    }
187
188    worst
189}
190
191/// Handle detection of a stuck strand
192fn handle_stuck_strand(strand_id: u64, running_secs: u64, config: &WatchdogConfig) {
193    // Track which strand triggered the watchdog to detect new stuck strands
194    let prev_strand = WATCHDOG_TRIGGERED_STRAND.swap(strand_id, Ordering::Relaxed);
195    let is_new_strand = prev_strand != strand_id;
196
197    use std::io::Write;
198    let mut stderr = std::io::stderr().lock();
199
200    let _ = writeln!(stderr);
201    let _ = writeln!(
202        stderr,
203        "WATCHDOG: Strand #{} running for {}s (threshold: {}s)",
204        strand_id, running_secs, config.threshold_secs
205    );
206
207    // Dump diagnostics on first trigger OR when a different strand gets stuck
208    if prev_strand == 0 || is_new_strand {
209        dump_diagnostics();
210    }
211
212    match config.action {
213        WatchdogAction::Warn => {
214            if prev_strand != 0 && !is_new_strand {
215                let _ = writeln!(stderr, "    (strand still stuck, diagnostics suppressed)");
216            }
217        }
218        WatchdogAction::Exit => {
219            let _ = writeln!(stderr, "    Exiting due to SEQ_WATCHDOG_ACTION=exit");
220            std::process::exit(1);
221        }
222    }
223}
224
/// Test-only helper: forget which strand last tripped the watchdog.
///
/// Stores `0` (the "none yet" value) into `WATCHDOG_TRIGGERED_STRAND`.
#[cfg(test)]
pub fn reset_triggered() {
    const NO_STRAND: u64 = 0;
    WATCHDOG_TRIGGERED_STRAND.store(NO_STRAND, Ordering::Relaxed);
}
230
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    // Serialize env var tests to avoid race conditions
    static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Environment variables read by `WatchdogConfig::from_env`.
    const WATCHDOG_VARS: [&str; 3] = [
        "SEQ_WATCHDOG_SECS",
        "SEQ_WATCHDOG_INTERVAL",
        "SEQ_WATCHDOG_ACTION",
    ];

    /// RAII guard for tests that mutate the watchdog environment variables.
    ///
    /// Holds `ENV_TEST_MUTEX` for its whole lifetime and snapshots the
    /// watchdog variables on creation. On drop - including when a failed
    /// assertion unwinds the test - the snapshot is restored, so one failing
    /// test cannot leak modified variables into the next.
    struct EnvSandbox {
        saved: Vec<(&'static str, Option<String>)>,
        // Released only after `Drop::drop` has restored the variables.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvSandbox {
        /// Take the env lock and snapshot the current watchdog variables.
        fn enter() -> Self {
            // A test that panicked while holding the lock poisons it; the
            // protected data is `()`, so recovering is always fine and keeps
            // one failure from cascading into every other env test.
            let lock = ENV_TEST_MUTEX
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());

            let saved = WATCHDOG_VARS
                .iter()
                .map(|&key| (key, std::env::var(key).ok()))
                .collect();

            Self { saved, _lock: lock }
        }

        /// Set `key` to `value` for the duration of the sandbox.
        fn set(&self, key: &str, value: &str) {
            // SAFETY: `self` proves ENV_TEST_MUTEX is held, so no other env
            // test in this module touches the environment concurrently.
            unsafe { std::env::set_var(key, value) };
        }
    }

    impl Drop for EnvSandbox {
        fn drop(&mut self) {
            for (key, original) in self.saved.drain(..) {
                // SAFETY: the mutex guard stored in `self` is still alive here.
                unsafe {
                    match original {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = WatchdogConfig::default();
        assert_eq!(config.threshold_secs, 0);
        assert_eq!(config.interval_secs, 5);
        assert_eq!(config.action, WatchdogAction::Warn);
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_config_enabled() {
        let config = WatchdogConfig {
            threshold_secs: 30,
            interval_secs: 10,
            action: WatchdogAction::Exit,
        };
        assert!(config.is_enabled());
    }

    #[test]
    fn test_check_no_stuck_strands() {
        // May or may not find strands depending on test execution order,
        // so just verify the scan itself doesn't panic.
        let _ = check_for_stuck_strands(30);
    }

    #[test]
    fn test_from_env_all_values() {
        let env = EnvSandbox::enter();
        env.set("SEQ_WATCHDOG_SECS", "30");
        env.set("SEQ_WATCHDOG_INTERVAL", "10");
        env.set("SEQ_WATCHDOG_ACTION", "exit");

        let config = WatchdogConfig::from_env();
        assert_eq!(config.threshold_secs, 30);
        assert_eq!(config.interval_secs, 10);
        assert_eq!(config.action, WatchdogAction::Exit);
        assert!(config.is_enabled());
    }

    #[test]
    fn test_from_env_warn_action() {
        let env = EnvSandbox::enter();
        env.set("SEQ_WATCHDOG_ACTION", "warn");

        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }

    #[test]
    fn test_from_env_invalid_values() {
        let env = EnvSandbox::enter();
        env.set("SEQ_WATCHDOG_SECS", "not_a_number");
        env.set("SEQ_WATCHDOG_INTERVAL", "0"); // 0 should use default

        let config = WatchdogConfig::from_env();
        assert_eq!(config.threshold_secs, 0); // Default on parse failure
        assert_eq!(config.interval_secs, 5); // Default when 0
    }

    #[test]
    fn test_from_env_unknown_action_defaults_to_warn() {
        let env = EnvSandbox::enter();
        env.set("SEQ_WATCHDOG_ACTION", "unknown_action");

        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }
}