1#![cfg(feature = "diagnostics")]
39
40use crate::diagnostics::dump_diagnostics;
41use crate::scheduler::strand_registry;
42use std::sync::Once;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::time::{Duration, SystemTime, UNIX_EPOCH};
45
/// Id of the strand the watchdog most recently reported as stuck.
///
/// `0` means no strand has been reported yet (it is also the value restored by
/// `reset_triggered` in tests). `handle_stuck_strand` compares against this to
/// decide whether to emit a full diagnostics dump.
static WATCHDOG_TRIGGERED_STRAND: AtomicU64 = AtomicU64::new(0);

/// Ensures `install_watchdog` reads its configuration and spawns the watchdog
/// thread at most once per process.
static WATCHDOG_INIT: Once = Once::new();
49
/// Runtime configuration for the stuck-strand watchdog.
///
/// Usually built with [`WatchdogConfig::from_env`]. The watchdog only runs
/// when `threshold_secs` is non-zero (see [`WatchdogConfig::is_enabled`]).
#[derive(Debug, Clone)]
pub struct WatchdogConfig {
    /// A strand running for strictly more than this many seconds is reported
    /// as stuck. `0` disables the watchdog.
    pub threshold_secs: u64,
    /// Seconds the watchdog thread sleeps between scans.
    pub interval_secs: u64,
    /// What to do when a stuck strand is found.
    pub action: WatchdogAction,
}

/// Reaction taken when the watchdog finds a stuck strand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatchdogAction {
    /// Report the stuck strand on stderr and keep running.
    Warn,
    /// Report the stuck strand on stderr, then exit the process with status 1.
    Exit,
}

impl Default for WatchdogConfig {
    /// A disabled watchdog (`threshold_secs == 0`) that would scan every
    /// [`WatchdogConfig::DEFAULT_INTERVAL_SECS`] seconds and only warn.
    fn default() -> Self {
        Self {
            threshold_secs: 0,
            interval_secs: Self::DEFAULT_INTERVAL_SECS,
            action: WatchdogAction::Warn,
        }
    }
}

impl WatchdogConfig {
    /// Scan interval used when `SEQ_WATCHDOG_INTERVAL` is unset or invalid.
    const DEFAULT_INTERVAL_SECS: u64 = 5;

    /// Builds a configuration from the process environment.
    ///
    /// * `SEQ_WATCHDOG_SECS` — stuck threshold in seconds; unset/invalid → `0`
    ///   (watchdog disabled).
    /// * `SEQ_WATCHDOG_INTERVAL` — scan interval in seconds; must be `> 0`;
    ///   unset/invalid → 5.
    /// * `SEQ_WATCHDOG_ACTION` — `exit` or `warn` (case-insensitive);
    ///   anything else → `warn`.
    ///
    /// Values are trimmed of surrounding whitespace before parsing. A value
    /// that is set but unusable is reported on stderr and replaced by its
    /// fallback. This way a typo does not silently disable the watchdog.
    pub fn from_env() -> Self {
        let threshold_secs = Self::env_u64("SEQ_WATCHDOG_SECS").unwrap_or(0);

        let interval_secs = match Self::env_u64("SEQ_WATCHDOG_INTERVAL") {
            Some(0) => {
                eprintln!(
                    "[watchdog] WARNING: SEQ_WATCHDOG_INTERVAL must be greater than 0; using {}s",
                    Self::DEFAULT_INTERVAL_SECS
                );
                Self::DEFAULT_INTERVAL_SECS
            }
            Some(secs) => secs,
            None => Self::DEFAULT_INTERVAL_SECS,
        };

        let action = match std::env::var("SEQ_WATCHDOG_ACTION") {
            // Unset or not valid Unicode: keep the default, as before.
            Err(_) => WatchdogAction::Warn,
            Ok(raw) => match raw.trim().to_lowercase().as_str() {
                "exit" => WatchdogAction::Exit,
                "warn" | "" => WatchdogAction::Warn,
                _ => {
                    eprintln!(
                        "[watchdog] WARNING: unknown SEQ_WATCHDOG_ACTION={:?}; using warn",
                        raw
                    );
                    WatchdogAction::Warn
                }
            },
        };

        Self {
            threshold_secs,
            interval_secs,
            action,
        }
    }

    /// Returns `true` when a non-zero stuck threshold is configured.
    pub fn is_enabled(&self) -> bool {
        self.threshold_secs > 0
    }

    /// Reads environment variable `key` as a `u64`, ignoring surrounding
    /// whitespace.
    ///
    /// Returns `None` when the variable is unset, empty, not valid Unicode, or
    /// not a non-negative integer. Only the last case is reported on stderr,
    /// because it is almost certainly a user mistake.
    fn env_u64(key: &str) -> Option<u64> {
        let raw = std::env::var(key).ok()?;
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            return None;
        }
        match trimmed.parse() {
            Ok(value) => Some(value),
            Err(_) => {
                eprintln!(
                    "[watchdog] WARNING: ignoring {}={:?} (expected a non-negative integer)",
                    key, raw
                );
                None
            }
        }
    }
}
114
115pub fn install_watchdog() {
122 WATCHDOG_INIT.call_once(|| {
123 let config = WatchdogConfig::from_env();
124
125 if !config.is_enabled() {
126 return;
127 }
128
129 eprintln!(
130 "[watchdog] Enabled: threshold={}s, interval={}s, action={:?}",
131 config.threshold_secs, config.interval_secs, config.action
132 );
133
134 if let Err(e) = std::thread::Builder::new()
135 .name("seq-watchdog".to_string())
136 .spawn(move || watchdog_loop(config))
137 {
138 eprintln!("[watchdog] WARNING: Failed to start watchdog thread: {}", e);
139 }
140 });
141}
142
143fn watchdog_loop(config: WatchdogConfig) {
145 let interval = Duration::from_secs(config.interval_secs);
146
147 loop {
148 std::thread::sleep(interval);
149
150 if let Some((strand_id, running_secs)) = check_for_stuck_strands(config.threshold_secs) {
151 handle_stuck_strand(strand_id, running_secs, &config);
152 }
153 }
154}
155
156fn check_for_stuck_strands(threshold_secs: u64) -> Option<(u64, u64)> {
161 let now = SystemTime::now()
163 .duration_since(UNIX_EPOCH)
164 .ok()
165 .map(|d| d.as_secs())?;
166
167 let registry = strand_registry();
168 let mut worst: Option<(u64, u64)> = None;
169
170 for (strand_id, spawn_time) in registry.active_strands() {
171 if spawn_time == 0 {
172 continue;
173 }
174
175 let running_secs = now.saturating_sub(spawn_time);
176
177 if running_secs > threshold_secs {
178 match worst {
179 None => worst = Some((strand_id, running_secs)),
180 Some((_, prev_secs)) if running_secs > prev_secs => {
181 worst = Some((strand_id, running_secs));
182 }
183 _ => {}
184 }
185 }
186 }
187
188 worst
189}
190
191fn handle_stuck_strand(strand_id: u64, running_secs: u64, config: &WatchdogConfig) {
193 let prev_strand = WATCHDOG_TRIGGERED_STRAND.swap(strand_id, Ordering::Relaxed);
195 let is_new_strand = prev_strand != strand_id;
196
197 use std::io::Write;
198 let mut stderr = std::io::stderr().lock();
199
200 let _ = writeln!(stderr);
201 let _ = writeln!(
202 stderr,
203 "WATCHDOG: Strand #{} running for {}s (threshold: {}s)",
204 strand_id, running_secs, config.threshold_secs
205 );
206
207 if prev_strand == 0 || is_new_strand {
209 dump_diagnostics();
210 }
211
212 match config.action {
213 WatchdogAction::Warn => {
214 if prev_strand != 0 && !is_new_strand {
215 let _ = writeln!(stderr, " (strand still stuck, diagnostics suppressed)");
216 }
217 }
218 WatchdogAction::Exit => {
219 let _ = writeln!(stderr, " Exiting due to SEQ_WATCHDOG_ACTION=exit");
220 std::process::exit(1);
221 }
222 }
223}
224
/// Test helper: forgets which strand the watchdog last reported.
///
/// After this call, the next stuck strand is treated as new by
/// `handle_stuck_strand` and triggers a full diagnostics dump.
#[cfg(test)]
pub fn reset_triggered() {
    let _previous = WATCHDOG_TRIGGERED_STRAND.swap(0, Ordering::Relaxed);
}
230
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serialises the tests in this module that touch the `SEQ_WATCHDOG_*`
    /// environment variables. The process environment is shared by every test
    /// thread.
    static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Every environment variable read by `WatchdogConfig::from_env`.
    const WATCHDOG_ENV_KEYS: [&str; 3] = [
        "SEQ_WATCHDOG_SECS",
        "SEQ_WATCHDOG_INTERVAL",
        "SEQ_WATCHDOG_ACTION",
    ];

    /// Takes `ENV_TEST_MUTEX`, recovering from poisoning. Without this, one
    /// failing env test would make every later one fail with a `PoisonError`.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_TEST_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Captures the watchdog environment variables and restores them on drop.
    /// This also happens while unwinding from a failed assertion, so a
    /// failing test cannot leak its settings into other tests.
    ///
    /// Bind it *after* the `lock_env()` guard. Locals drop in reverse order,
    /// so the environment is restored while the lock is still held.
    struct EnvSnapshot {
        saved: Vec<(&'static str, Option<String>)>,
    }

    impl EnvSnapshot {
        fn capture() -> Self {
            Self {
                saved: WATCHDOG_ENV_KEYS
                    .iter()
                    .map(|&key| (key, std::env::var(key).ok()))
                    .collect(),
            }
        }
    }

    impl Drop for EnvSnapshot {
        fn drop(&mut self) {
            for (key, orig) in self.saved.drain(..) {
                // SAFETY: snapshots are created and dropped while the owning
                // test holds ENV_TEST_MUTEX, so no other test in this module
                // touches the environment concurrently.
                unsafe { restore_env(key, orig) };
            }
        }
    }

    /// Sets `key` to `value` in the process environment.
    ///
    /// # Safety
    ///
    /// No other thread may read or write the environment concurrently. Within
    /// this module, that is ensured by holding `ENV_TEST_MUTEX` via `lock_env`.
    unsafe fn set_env(key: &str, value: &str) {
        // SAFETY: upheld by the caller per this function's contract.
        unsafe { std::env::set_var(key, value) };
    }

    /// Restores `key` to `orig`, removing the variable when `orig` is `None`.
    ///
    /// # Safety
    ///
    /// Same contract as [`set_env`].
    unsafe fn restore_env(key: &str, orig: Option<String>) {
        // SAFETY: upheld by the caller per this function's contract.
        unsafe {
            match orig {
                Some(v) => std::env::set_var(key, v),
                None => std::env::remove_var(key),
            }
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = WatchdogConfig::default();
        assert_eq!(config.threshold_secs, 0);
        assert_eq!(config.interval_secs, 5);
        assert_eq!(config.action, WatchdogAction::Warn);
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_config_enabled() {
        let config = WatchdogConfig {
            threshold_secs: 30,
            interval_secs: 10,
            action: WatchdogAction::Exit,
        };
        assert!(config.is_enabled());
    }

    #[test]
    fn test_check_no_stuck_strands() {
        // No strand can have run for more than u64::MAX seconds, so the scan
        // must report nothing regardless of what else is running.
        assert_eq!(check_for_stuck_strands(u64::MAX), None);
    }

    #[test]
    fn test_from_env_all_values() {
        let _lock = lock_env();
        let _env = EnvSnapshot::capture();

        // SAFETY: ENV_TEST_MUTEX is held for the whole test.
        unsafe {
            set_env("SEQ_WATCHDOG_SECS", "30");
            set_env("SEQ_WATCHDOG_INTERVAL", "10");
            set_env("SEQ_WATCHDOG_ACTION", "exit");
        }

        let config = WatchdogConfig::from_env();
        assert_eq!(config.threshold_secs, 30);
        assert_eq!(config.interval_secs, 10);
        assert_eq!(config.action, WatchdogAction::Exit);
        assert!(config.is_enabled());
    }

    #[test]
    fn test_from_env_warn_action() {
        let _lock = lock_env();
        let _env = EnvSnapshot::capture();

        // SAFETY: ENV_TEST_MUTEX is held for the whole test.
        unsafe {
            set_env("SEQ_WATCHDOG_ACTION", "warn");
        }

        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }

    #[test]
    fn test_from_env_invalid_values() {
        let _lock = lock_env();
        let _env = EnvSnapshot::capture();

        // SAFETY: ENV_TEST_MUTEX is held for the whole test.
        unsafe {
            set_env("SEQ_WATCHDOG_SECS", "not_a_number");
            // An interval of 0 is rejected in favour of the default.
            set_env("SEQ_WATCHDOG_INTERVAL", "0");
        }

        let config = WatchdogConfig::from_env();
        // An unparsable threshold falls back to 0 (disabled).
        assert_eq!(config.threshold_secs, 0);
        assert_eq!(config.interval_secs, 5);
    }

    #[test]
    fn test_from_env_unknown_action_defaults_to_warn() {
        let _lock = lock_env();
        let _env = EnvSnapshot::capture();

        // SAFETY: ENV_TEST_MUTEX is held for the whole test.
        unsafe {
            set_env("SEQ_WATCHDOG_ACTION", "unknown_action");
        }

        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }
}