1use crate::diagnostics::dump_diagnostics;
34use crate::scheduler::strand_registry;
35use std::sync::Once;
36use std::sync::atomic::{AtomicU64, Ordering};
37use std::time::{Duration, SystemTime, UNIX_EPOCH};
38
/// Ensures the setup closure passed to `call_once` in `install_watchdog`
/// runs at most once per process.
static WATCHDOG_INIT: Once = Once::new();
/// ID of the strand most recently passed to `handle_stuck_strand`.
///
/// Starts at 0, which `handle_stuck_strand` treats as "nothing reported yet".
static WATCHDOG_TRIGGERED_STRAND: AtomicU64 = AtomicU64::new(0);
42
43#[derive(Debug, Clone)]
45pub struct WatchdogConfig {
46 pub threshold_secs: u64,
48 pub interval_secs: u64,
50 pub action: WatchdogAction,
52}
53
/// Reaction taken after the watchdog reports a stuck strand.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WatchdogAction {
    /// Report the strand on stderr and keep the process running.
    Warn,
    /// Report the strand on stderr, then terminate the process with exit code 1.
    Exit,
}
62
63impl Default for WatchdogConfig {
64 fn default() -> Self {
65 Self {
66 threshold_secs: 0, interval_secs: 5,
68 action: WatchdogAction::Warn,
69 }
70 }
71}
72
73impl WatchdogConfig {
74 pub fn from_env() -> Self {
76 let threshold_secs = std::env::var("SEQ_WATCHDOG_SECS")
77 .ok()
78 .and_then(|s| s.parse().ok())
79 .unwrap_or(0);
80
81 let interval_secs = std::env::var("SEQ_WATCHDOG_INTERVAL")
82 .ok()
83 .and_then(|s| s.parse().ok())
84 .filter(|&v| v > 0)
85 .unwrap_or(5);
86
87 let action = std::env::var("SEQ_WATCHDOG_ACTION")
88 .ok()
89 .map(|s| match s.to_lowercase().as_str() {
90 "exit" => WatchdogAction::Exit,
91 _ => WatchdogAction::Warn,
92 })
93 .unwrap_or(WatchdogAction::Warn);
94
95 Self {
96 threshold_secs,
97 interval_secs,
98 action,
99 }
100 }
101
102 pub fn is_enabled(&self) -> bool {
104 self.threshold_secs > 0
105 }
106}
107
108pub fn install_watchdog() {
115 WATCHDOG_INIT.call_once(|| {
116 let config = WatchdogConfig::from_env();
117
118 if !config.is_enabled() {
119 return;
120 }
121
122 eprintln!(
123 "[watchdog] Enabled: threshold={}s, interval={}s, action={:?}",
124 config.threshold_secs, config.interval_secs, config.action
125 );
126
127 if let Err(e) = std::thread::Builder::new()
128 .name("seq-watchdog".to_string())
129 .spawn(move || watchdog_loop(config))
130 {
131 eprintln!("[watchdog] WARNING: Failed to start watchdog thread: {}", e);
132 }
133 });
134}
135
136fn watchdog_loop(config: WatchdogConfig) {
138 let interval = Duration::from_secs(config.interval_secs);
139
140 loop {
141 std::thread::sleep(interval);
142
143 if let Some((strand_id, running_secs)) = check_for_stuck_strands(config.threshold_secs) {
144 handle_stuck_strand(strand_id, running_secs, &config);
145 }
146 }
147}
148
149fn check_for_stuck_strands(threshold_secs: u64) -> Option<(u64, u64)> {
154 let now = SystemTime::now()
156 .duration_since(UNIX_EPOCH)
157 .ok()
158 .map(|d| d.as_secs())?;
159
160 let registry = strand_registry();
161 let mut worst: Option<(u64, u64)> = None;
162
163 for (strand_id, spawn_time) in registry.active_strands() {
164 if spawn_time == 0 {
165 continue;
166 }
167
168 let running_secs = now.saturating_sub(spawn_time);
169
170 if running_secs > threshold_secs {
171 match worst {
172 None => worst = Some((strand_id, running_secs)),
173 Some((_, prev_secs)) if running_secs > prev_secs => {
174 worst = Some((strand_id, running_secs));
175 }
176 _ => {}
177 }
178 }
179 }
180
181 worst
182}
183
184fn handle_stuck_strand(strand_id: u64, running_secs: u64, config: &WatchdogConfig) {
186 let prev_strand = WATCHDOG_TRIGGERED_STRAND.swap(strand_id, Ordering::Relaxed);
188 let is_new_strand = prev_strand != strand_id;
189
190 use std::io::Write;
191 let mut stderr = std::io::stderr().lock();
192
193 let _ = writeln!(stderr);
194 let _ = writeln!(
195 stderr,
196 "WATCHDOG: Strand #{} running for {}s (threshold: {}s)",
197 strand_id, running_secs, config.threshold_secs
198 );
199
200 if prev_strand == 0 || is_new_strand {
202 dump_diagnostics();
203 }
204
205 match config.action {
206 WatchdogAction::Warn => {
207 if prev_strand != 0 && !is_new_strand {
208 let _ = writeln!(stderr, " (strand still stuck, diagnostics suppressed)");
209 }
210 }
211 WatchdogAction::Exit => {
212 let _ = writeln!(stderr, " Exiting due to SEQ_WATCHDOG_ACTION=exit");
213 std::process::exit(1);
214 }
215 }
216}
217
/// Test-only helper: resets `WATCHDOG_TRIGGERED_STRAND` to 0, the value
/// `handle_stuck_strand` treats as "nothing reported yet".
#[cfg(test)]
pub fn reset_triggered() {
    WATCHDOG_TRIGGERED_STRAND.store(0, Ordering::Relaxed);
}
223
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes every test that mutates process environment variables.
    static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Sets an environment variable.
    ///
    /// # Safety
    ///
    /// The caller must ensure no other thread reads or writes the process
    /// environment concurrently. Tests do so by holding `ENV_TEST_MUTEX`.
    unsafe fn set_env(key: &str, value: &str) {
        // SAFETY: upheld by the caller, see `# Safety` above.
        unsafe { std::env::set_var(key, value) };
    }

    /// Restores an environment variable to a previously captured state:
    /// `Some(v)` re-sets it, `None` removes it.
    ///
    /// # Safety
    ///
    /// Same contract as [`set_env`].
    unsafe fn restore_env(key: &str, orig: Option<String>) {
        // SAFETY: upheld by the caller, see `# Safety` above.
        unsafe {
            match orig {
                Some(v) => std::env::set_var(key, v),
                None => std::env::remove_var(key),
            }
        }
    }

    /// RAII guard that overrides environment variables for one test.
    ///
    /// It holds `ENV_TEST_MUTEX` for its whole lifetime and restores the
    /// original values on drop, so a failing assertion can neither leak
    /// overrides into later tests nor leave the environment half-restored.
    struct EnvOverride {
        saved: Vec<(&'static str, Option<String>)>,
        // Declared after `saved` and released only after `Drop::drop` has run,
        // so restoration happens while the lock is still held.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvOverride {
        /// Takes the env lock, records current values, and applies `vars`.
        fn apply(vars: &[(&'static str, &str)]) -> Self {
            // A poisoned mutex only means an earlier env test panicked; the
            // `()` payload cannot be inconsistent, so recover the guard
            // instead of cascading the failure.
            let lock = ENV_TEST_MUTEX
                .lock()
                .unwrap_or_else(PoisonError::into_inner);

            let saved = vars
                .iter()
                .map(|&(key, value)| {
                    let orig = std::env::var(key).ok();
                    // SAFETY: `ENV_TEST_MUTEX` is held, serializing all env
                    // mutation done by this test module.
                    unsafe { set_env(key, value) };
                    (key, orig)
                })
                .collect();

            Self { saved, _lock: lock }
        }
    }

    impl Drop for EnvOverride {
        fn drop(&mut self) {
            for (key, orig) in self.saved.drain(..) {
                // SAFETY: `_lock` is still held while `drop` runs.
                unsafe { restore_env(key, orig) };
            }
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = WatchdogConfig::default();
        assert_eq!(config.threshold_secs, 0);
        assert_eq!(config.interval_secs, 5);
        assert_eq!(config.action, WatchdogAction::Warn);
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_config_enabled() {
        let config = WatchdogConfig {
            threshold_secs: 30,
            interval_secs: 10,
            action: WatchdogAction::Exit,
        };
        assert!(config.is_enabled());
    }

    #[test]
    fn test_check_no_stuck_strands() {
        // Smoke test only: the result depends on whatever the shared strand
        // registry currently holds, so just make sure the call does not panic.
        let _ = check_for_stuck_strands(30);
    }

    #[test]
    fn test_from_env_all_values() {
        let _env = EnvOverride::apply(&[
            ("SEQ_WATCHDOG_SECS", "30"),
            ("SEQ_WATCHDOG_INTERVAL", "10"),
            ("SEQ_WATCHDOG_ACTION", "exit"),
        ]);

        let config = WatchdogConfig::from_env();
        assert_eq!(config.threshold_secs, 30);
        assert_eq!(config.interval_secs, 10);
        assert_eq!(config.action, WatchdogAction::Exit);
        assert!(config.is_enabled());
    }

    #[test]
    fn test_from_env_warn_action() {
        let _env = EnvOverride::apply(&[("SEQ_WATCHDOG_ACTION", "warn")]);

        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }

    #[test]
    fn test_from_env_invalid_values() {
        let _env = EnvOverride::apply(&[
            ("SEQ_WATCHDOG_SECS", "not_a_number"),
            ("SEQ_WATCHDOG_INTERVAL", "0"),
        ]);

        let config = WatchdogConfig::from_env();
        // Unparseable threshold falls back to 0 (disabled).
        assert_eq!(config.threshold_secs, 0);
        // A zero interval is rejected in favor of the default of 5.
        assert_eq!(config.interval_secs, 5);
    }

    #[test]
    fn test_from_env_unknown_action_defaults_to_warn() {
        let _env = EnvOverride::apply(&[("SEQ_WATCHDOG_ACTION", "unknown_action")]);

        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }
}