#![cfg(feature = "diagnostics")]
use crate::diagnostics::dump_diagnostics;
use crate::scheduler::strand_registry;
use std::sync::Once;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// One-time gate for watchdog start-up: `install_watchdog` runs its setup
/// through `call_once`, so the setup closure executes at most once.
static WATCHDOG_INIT: Once = Once::new();
/// ID of the strand most recently reported by `handle_stuck_strand`.
/// Starts at 0; each report swaps in the new ID and compares it with the
/// previous one to decide whether diagnostics are dumped again.
static WATCHDOG_TRIGGERED_STRAND: AtomicU64 = AtomicU64::new(0);
/// Settings that control the stuck-strand watchdog.
///
/// Obtain one from `WatchdogConfig::from_env()` or `WatchdogConfig::default()`.
#[derive(Clone, Debug)]
pub struct WatchdogConfig {
    /// Running time, in seconds, that a strand must exceed before it is
    /// reported. A value of `0` leaves the watchdog disabled.
    pub threshold_secs: u64,
    /// Pause, in seconds, between two consecutive scans for stuck strands.
    pub interval_secs: u64,
    /// What the watchdog does after it has reported a stuck strand.
    pub action: WatchdogAction,
}
/// Reaction taken by the watchdog after it has reported a stuck strand.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WatchdogAction {
    /// Report the strand on stderr and keep the process running.
    Warn,
    /// Report the strand on stderr, then terminate the process with exit code 1.
    Exit,
}
impl Default for WatchdogConfig {
fn default() -> Self {
Self {
threshold_secs: 0, interval_secs: 5,
action: WatchdogAction::Warn,
}
}
}
impl WatchdogConfig {
    /// Builds a configuration from the process environment.
    ///
    /// Recognized variables:
    ///
    /// - `SEQ_WATCHDOG_SECS`: stuck threshold in seconds. Missing or
    ///   unparsable values fall back to `0`, which disables the watchdog.
    /// - `SEQ_WATCHDOG_INTERVAL`: seconds between scans. Missing, unparsable
    ///   or zero values fall back to `5`.
    /// - `SEQ_WATCHDOG_ACTION`: `exit` (ASCII case-insensitive) selects
    ///   [`WatchdogAction::Exit`]; anything else selects
    ///   [`WatchdogAction::Warn`].
    ///
    /// Surrounding whitespace in a value is ignored, so a stray trailing
    /// space or newline does not silently disable the watchdog or downgrade
    /// `exit` to `warn`.
    pub fn from_env() -> Self {
        let threshold_secs = Self::env_u64("SEQ_WATCHDOG_SECS").unwrap_or(0);
        let interval_secs = Self::env_u64("SEQ_WATCHDOG_INTERVAL")
            // A zero interval would turn the watchdog loop into a busy spin.
            .filter(|&secs| secs > 0)
            .unwrap_or(5);
        let action = match std::env::var("SEQ_WATCHDOG_ACTION") {
            Ok(raw) if raw.trim().eq_ignore_ascii_case("exit") => WatchdogAction::Exit,
            _ => WatchdogAction::Warn,
        };
        Self {
            threshold_secs,
            interval_secs,
            action,
        }
    }

    /// Returns `true` when a non-zero threshold is configured.
    pub fn is_enabled(&self) -> bool {
        self.threshold_secs > 0
    }

    /// Reads `key` from the environment and parses it as a `u64`, ignoring
    /// surrounding whitespace. Returns `None` when the variable is unset,
    /// not valid Unicode, or not a non-negative integer.
    fn env_u64(key: &str) -> Option<u64> {
        std::env::var(key).ok()?.trim().parse().ok()
    }
}
/// Starts the background watchdog thread when the environment enables it.
///
/// The configuration is read with `WatchdogConfig::from_env()` inside a
/// `Once`, so only the first call does any work; later calls return
/// immediately, even if that first call found the watchdog disabled.
/// When enabled, a banner is printed to stderr and a thread named
/// `seq-watchdog` is spawned. A spawn failure is reported on stderr and
/// otherwise ignored.
pub fn install_watchdog() {
    WATCHDOG_INIT.call_once(|| {
        let config = WatchdogConfig::from_env();
        if config.is_enabled() {
            eprintln!(
                "[watchdog] Enabled: threshold={}s, interval={}s, action={:?}",
                config.threshold_secs, config.interval_secs, config.action
            );

            // The join handle is dropped on purpose: the watchdog runs detached.
            let spawned = std::thread::Builder::new()
                .name(String::from("seq-watchdog"))
                .spawn(move || watchdog_loop(config));
            if let Err(e) = spawned {
                eprintln!("[watchdog] WARNING: Failed to start watchdog thread: {}", e);
            }
        }
    });
}
/// Body of the watchdog thread; never returns.
///
/// Each round sleeps for `config.interval_secs` seconds, scans for a strand
/// that has exceeded `config.threshold_secs`, and hands any hit to
/// `handle_stuck_strand`.
fn watchdog_loop(config: WatchdogConfig) {
    let pause = Duration::from_secs(config.interval_secs);
    loop {
        std::thread::sleep(pause);
        let Some((strand_id, running_secs)) = check_for_stuck_strands(config.threshold_secs)
        else {
            continue;
        };
        handle_stuck_strand(strand_id, running_secs, &config);
    }
}
/// Finds the active strand that has been running the longest beyond the threshold.
///
/// Every entry of `strand_registry().active_strands()` is an
/// `(strand_id, spawn_time)` pair; `spawn_time` is compared against the
/// current wall-clock time expressed in seconds since the Unix epoch.
/// Entries whose `spawn_time` is `0` are skipped.
///
/// Returns `Some((strand_id, running_secs))` for the strand with the largest
/// running time strictly greater than `threshold_secs`; on a tie the strand
/// seen first wins. Returns `None` when no strand qualifies or when the
/// system clock reads earlier than the Unix epoch.
fn check_for_stuck_strands(threshold_secs: u64) -> Option<(u64, u64)> {
    let now_secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => return None,
    };

    let registry = strand_registry();
    let longest_running: Option<(u64, u64)> = registry
        .active_strands()
        .into_iter()
        .filter(|&(_, spawn_time)| spawn_time != 0)
        // `saturating_sub` keeps a spawn time in the future from underflowing.
        .map(|(strand_id, spawn_time)| (strand_id, now_secs.saturating_sub(spawn_time)))
        .filter(|&(_, running_secs)| running_secs > threshold_secs)
        .fold(None, |worst, candidate| match worst {
            // Keep the earlier entry unless the candidate ran strictly longer.
            Some((_, longest)) if longest >= candidate.1 => worst,
            _ => Some(candidate),
        });
    longest_running
}
/// Reports a stuck strand on stderr and applies the configured action.
///
/// The warning line is always written. `dump_diagnostics()` is called only
/// when this strand differs from the one reported last time, or when nothing
/// has been reported yet. With `WatchdogAction::Warn`, a repeat report prints
/// a short "diagnostics suppressed" note instead. With `WatchdogAction::Exit`,
/// the process terminates with exit code 1 after the report.
///
/// The stderr lock is held for the whole function, including the call to
/// `dump_diagnostics()`. Write errors on stderr are deliberately ignored.
fn handle_stuck_strand(strand_id: u64, running_secs: u64, config: &WatchdogConfig) {
    use std::io::Write;

    // Remember this strand and learn which one was reported before it.
    let previous = WATCHDOG_TRIGGERED_STRAND.swap(strand_id, Ordering::Relaxed);
    let first_report = previous == 0 || previous != strand_id;

    let mut err = std::io::stderr().lock();
    let _ = writeln!(err);
    let _ = writeln!(
        err,
        "WATCHDOG: Strand #{} running for {}s (threshold: {}s)",
        strand_id, running_secs, config.threshold_secs
    );

    if first_report {
        dump_diagnostics();
    }

    match config.action {
        WatchdogAction::Exit => {
            let _ = writeln!(err, " Exiting due to SEQ_WATCHDOG_ACTION=exit");
            std::process::exit(1);
        }
        WatchdogAction::Warn if first_report => {}
        WatchdogAction::Warn => {
            let _ = writeln!(err, " (strand still stuck, diagnostics suppressed)");
        }
    }
}
/// Test-only helper: clears the record of the last reported strand by
/// storing `0` into `WATCHDOG_TRIGGERED_STRAND`, so the next report is
/// treated as a first report.
#[cfg(test)]
pub fn reset_triggered() {
    WATCHDOG_TRIGGERED_STRAND.store(0, Ordering::Relaxed);
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests in this module that modify the process
    /// environment, which is global state shared by all test threads.
    static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Scoped override of environment variables.
    ///
    /// Holds `ENV_TEST_MUTEX` for its whole lifetime and restores every
    /// touched variable in `Drop`, so the environment is put back even when
    /// an assertion in the test panics.
    struct EnvOverride {
        /// Variable names paired with the value they had before the override.
        saved: Vec<(&'static str, Option<OsString>)>,
        /// Declared last so it is released only after restoration has run.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvOverride {
        /// Takes the environment lock and sets every `(name, value)` pair,
        /// remembering the previous values for restoration.
        fn set(vars: &[(&'static str, &str)]) -> Self {
            // A failed assertion in another test poisons the mutex. The `()`
            // payload cannot be left inconsistent, so recover the guard
            // instead of letting one failure cascade into every env test.
            let lock = ENV_TEST_MUTEX
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());

            let mut saved = Vec::with_capacity(vars.len());
            for &(key, value) in vars {
                saved.push((key, std::env::var_os(key)));
                // SAFETY: environment mutation in this module happens only
                // while `ENV_TEST_MUTEX` is held, which `lock` guarantees.
                unsafe { std::env::set_var(key, value) };
            }
            Self { saved, _lock: lock }
        }
    }

    impl Drop for EnvOverride {
        fn drop(&mut self) {
            for (key, previous) in self.saved.drain(..) {
                // SAFETY: `_lock` is still held here; fields are dropped only
                // after this method returns.
                unsafe {
                    match previous {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = WatchdogConfig::default();
        assert_eq!(config.threshold_secs, 0);
        assert_eq!(config.interval_secs, 5);
        assert_eq!(config.action, WatchdogAction::Warn);
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_config_enabled() {
        let config = WatchdogConfig {
            threshold_secs: 30,
            interval_secs: 10,
            action: WatchdogAction::Exit,
        };
        assert!(config.is_enabled());
    }

    #[test]
    fn test_check_no_stuck_strands() {
        // Smoke test: the scan must complete without panicking. The result
        // depends on whatever strands are registered, so it is not asserted.
        let _ = check_for_stuck_strands(30);
    }

    #[test]
    fn test_from_env_all_values() {
        let _env = EnvOverride::set(&[
            ("SEQ_WATCHDOG_SECS", "30"),
            ("SEQ_WATCHDOG_INTERVAL", "10"),
            ("SEQ_WATCHDOG_ACTION", "exit"),
        ]);
        let config = WatchdogConfig::from_env();
        assert_eq!(config.threshold_secs, 30);
        assert_eq!(config.interval_secs, 10);
        assert_eq!(config.action, WatchdogAction::Exit);
        assert!(config.is_enabled());
    }

    #[test]
    fn test_from_env_warn_action() {
        let _env = EnvOverride::set(&[("SEQ_WATCHDOG_ACTION", "warn")]);
        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }

    #[test]
    fn test_from_env_invalid_values() {
        let _env = EnvOverride::set(&[
            ("SEQ_WATCHDOG_SECS", "not_a_number"),
            ("SEQ_WATCHDOG_INTERVAL", "0"),
        ]);
        let config = WatchdogConfig::from_env();
        // An unparsable threshold disables the watchdog; a zero interval
        // falls back to the default.
        assert_eq!(config.threshold_secs, 0);
        assert_eq!(config.interval_secs, 5);
    }

    #[test]
    fn test_from_env_unknown_action_defaults_to_warn() {
        let _env = EnvOverride::set(&[("SEQ_WATCHDOG_ACTION", "unknown_action")]);
        let config = WatchdogConfig::from_env();
        assert_eq!(config.action, WatchdogAction::Warn);
    }
}