Skip to main content

ralph_workflow/pipeline/idle_timeout/
monitor.rs

1//! Idle-timeout monitor thread.
2
3use super::kill::{
4    force_kill_best_effort, kill_process, KillConfig, KillResult, DEFAULT_KILL_CONFIG,
5};
6use super::{is_idle_timeout_exceeded, SharedActivityTimestamp};
7use crate::executor::{AgentChild, ProcessExecutor};
8use std::sync::Arc;
9use std::time::Duration;
10
/// Result of idle timeout monitoring.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MonitorResult {
    /// Process completed normally (not killed by monitor).
    ProcessCompleted,
    /// Idle timeout was exceeded and termination was initiated.
    ///
    /// In the common case the subprocess exits promptly after SIGTERM/SIGKILL, and by the
    /// time this result is returned the process is already gone.
    ///
    /// In pathological cases (e.g. a stuck/unresponsive subprocess, or one that does not
    /// terminate even after repeated SIGKILL attempts), the monitor returns `TimedOut` after
    /// a bounded enforcement window so the pipeline can regain control; the process may then
    /// still be running. A detached background reaper keeps making best-effort SIGKILL
    /// attempts, but the reaper is itself time-bounded. It stops at whichever comes first:
    /// - the process is observed to have exited;
    /// - the monitor's stop flag is set;
    /// - the kill configuration's post-SIGKILL hard cap elapses.
    ///
    /// Callers therefore must not assume the process is dead when they receive this variant.
    ///
    /// The `escalated` flag indicates whether SIGKILL/taskkill was required:
    /// - `false`: Process terminated after SIGTERM within grace period
    /// - `true`: SIGKILL/taskkill was sent because the process did not exit after SIGTERM
    TimedOut { escalated: bool },
}
32
/// How often the idle monitor wakes up to re-check activity when no interval is given
/// (1000 ms, i.e. one second).
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(1_000);
35
36/// Monitors activity and kills a process if idle timeout is exceeded.
37pub fn monitor_idle_timeout(
38    activity_timestamp: SharedActivityTimestamp,
39    child: Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
40    timeout_secs: u64,
41    should_stop: Arc<std::sync::atomic::AtomicBool>,
42    executor: Arc<dyn ProcessExecutor>,
43) -> MonitorResult {
44    monitor_idle_timeout_with_interval_and_kill_config(
45        activity_timestamp,
46        child,
47        timeout_secs,
48        should_stop,
49        executor,
50        DEFAULT_CHECK_INTERVAL,
51        DEFAULT_KILL_CONFIG,
52    )
53}
54
55/// Like [`monitor_idle_timeout`] but with a configurable check interval.
56pub fn monitor_idle_timeout_with_interval(
57    activity_timestamp: SharedActivityTimestamp,
58    child: Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
59    timeout_secs: u64,
60    should_stop: Arc<std::sync::atomic::AtomicBool>,
61    executor: Arc<dyn ProcessExecutor>,
62    check_interval: Duration,
63) -> MonitorResult {
64    monitor_idle_timeout_with_interval_and_kill_config(
65        activity_timestamp,
66        child,
67        timeout_secs,
68        should_stop,
69        executor,
70        check_interval,
71        DEFAULT_KILL_CONFIG,
72    )
73}
74
75pub fn monitor_idle_timeout_with_interval_and_kill_config(
76    activity_timestamp: SharedActivityTimestamp,
77    child: Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
78    timeout_secs: u64,
79    should_stop: Arc<std::sync::atomic::AtomicBool>,
80    executor: Arc<dyn ProcessExecutor>,
81    check_interval: Duration,
82    kill_config: KillConfig,
83) -> MonitorResult {
84    use std::sync::atomic::Ordering;
85
86    #[derive(Debug, Clone, Copy)]
87    struct TimeoutEnforcementState {
88        pid: u32,
89        escalated: bool,
90        last_sigkill_sent_at: Option<std::time::Instant>,
91        triggered_at: std::time::Instant,
92    }
93
94    let mut timeout_triggered: Option<TimeoutEnforcementState> = None;
95
96    loop {
97        // Fast-path teardown: if the process completed and we have not already
98        // triggered idle-timeout enforcement, stop immediately.
99        if timeout_triggered.is_none() && should_stop.load(Ordering::Acquire) {
100            return MonitorResult::ProcessCompleted;
101        }
102
103        std::thread::sleep(check_interval);
104
105        if let Some(mut state) = timeout_triggered.take() {
106            let status = {
107                let mut locked_child = child
108                    .lock()
109                    .expect("child process mutex poisoned - indicates panic in another thread");
110                locked_child.try_wait()
111            };
112
113            match status {
114                Ok(Some(_)) => {
115                    return MonitorResult::TimedOut {
116                        escalated: state.escalated,
117                    }
118                }
119                Ok(None) | Err(_) => {
120                    let now = std::time::Instant::now();
121
122                    // Be robust to future changes: if we ever enter the enforcement state
123                    // without having escalated yet, force escalation now.
124                    if !state.escalated {
125                        let _ = force_kill_best_effort(state.pid, executor.as_ref());
126                        state.escalated = true;
127                        state.last_sigkill_sent_at = Some(now);
128                    } else {
129                        let should_resend = match state.last_sigkill_sent_at {
130                            None => true,
131                            Some(t) => {
132                                now.duration_since(t) >= kill_config.sigkill_resend_interval()
133                            }
134                        };
135                        if should_resend {
136                            let _ = force_kill_best_effort(state.pid, executor.as_ref());
137                            state.last_sigkill_sent_at = Some(now);
138                        }
139                    }
140
141                    // After a bounded enforcement window, return TimedOut so the
142                    // main pipeline can regain control. A detached reaper keeps
143                    // trying to kill until the process is observed dead.
144                    if now.duration_since(state.triggered_at) >= kill_config.post_sigkill_hard_cap()
145                        && state.escalated
146                    {
147                        let child_for_reaper = Arc::clone(&child);
148                        let executor_for_reaper = Arc::clone(&executor);
149                        let should_stop_for_reaper = Arc::clone(&should_stop);
150                        let config_for_reaper = kill_config;
151                        let pid = state.pid;
152                        std::thread::spawn(move || {
153                            // Bound the reaper's lifetime to avoid leaking threads across
154                            // repeated timeouts. If the process is truly unkillable, a bounded
155                            // best-effort reaper is the least-bad option.
156                            let deadline = std::time::Instant::now()
157                                + config_for_reaper.post_sigkill_hard_cap();
158                            let mut last_kill_sent_at: Option<std::time::Instant> = None;
159
160                            while std::time::Instant::now() < deadline {
161                                if should_stop_for_reaper.load(Ordering::Acquire) {
162                                    return;
163                                }
164
165                                let status = {
166                                    let mut locked_child = child_for_reaper.lock()
167                                        .expect("child process mutex poisoned - indicates panic in another thread");
168                                    locked_child.try_wait()
169                                };
170
171                                match status {
172                                    Ok(Some(_)) => return,
173                                    Ok(None) | Err(_) => {
174                                        let now = std::time::Instant::now();
175                                        let should_resend = match last_kill_sent_at {
176                                            None => true,
177                                            Some(t) => {
178                                                now.duration_since(t)
179                                                    >= config_for_reaper.sigkill_resend_interval()
180                                            }
181                                        };
182                                        if should_resend {
183                                            let _ = force_kill_best_effort(
184                                                pid,
185                                                executor_for_reaper.as_ref(),
186                                            );
187                                            last_kill_sent_at = Some(now);
188                                        }
189                                        std::thread::sleep(config_for_reaper.poll_interval());
190                                    }
191                                }
192                            }
193                        });
194
195                        return MonitorResult::TimedOut {
196                            escalated: state.escalated,
197                        };
198                    }
199
200                    timeout_triggered = Some(state);
201                    continue;
202                }
203            }
204        }
205
206        if !is_idle_timeout_exceeded(&activity_timestamp, timeout_secs) {
207            continue;
208        }
209
210        let child_id = {
211            let mut locked_child = child
212                .lock()
213                .expect("child process mutex poisoned - indicates panic in another thread");
214            if let Ok(Some(_)) = locked_child.try_wait() {
215                return MonitorResult::ProcessCompleted;
216            }
217            locked_child.id()
218        };
219
220        let kill_result = kill_process(child_id, executor.as_ref(), Some(&child), kill_config);
221        match kill_result {
222            KillResult::TerminatedByTerm => return MonitorResult::TimedOut { escalated: false },
223            KillResult::TerminatedByKill => return MonitorResult::TimedOut { escalated: true },
224            KillResult::SignalsSentAwaitingExit { escalated } => {
225                timeout_triggered = Some(TimeoutEnforcementState {
226                    pid: child_id,
227                    escalated,
228                    triggered_at: std::time::Instant::now(),
229                    last_sigkill_sent_at: escalated.then_some(std::time::Instant::now()),
230                });
231            }
232            KillResult::Failed => {
233                if should_stop.load(Ordering::Acquire) {
234                    return MonitorResult::ProcessCompleted;
235                }
236            }
237        }
238    }
239}