Skip to main content

ralph_workflow/pipeline/idle_timeout/
monitor.rs

1//! Idle-timeout monitor thread.
2
3use super::kill::{
4    force_kill_best_effort, kill_process, KillConfig, KillResult, DEFAULT_KILL_CONFIG,
5};
6use super::{is_idle_timeout_exceeded, SharedActivityTimestamp, SharedFileActivityTracker};
7use crate::executor::{AgentChild, ChildProcessInfo, ProcessExecutor};
8use crate::workspace::Workspace;
9use std::sync::Arc;
10use std::time::Duration;
11
12/// Configuration for file activity monitoring during timeout detection.
13///
14/// When provided, the monitor will check for recent AI-generated file updates
15/// in addition to stdout/stderr activity.
16pub struct FileActivityConfig {
17    /// Shared file activity tracker.
18    pub tracker: SharedFileActivityTracker,
19    /// Workspace for reading file metadata.
20    pub workspace: Arc<dyn Workspace>,
21}
22
23/// Configuration for the idle timeout monitor.
24#[derive(Debug, Clone, Copy)]
25pub struct MonitorConfig {
26    /// Timeout duration.
27    pub timeout: Duration,
28    /// Check interval for the monitor loop.
29    pub check_interval: Duration,
30    /// Kill configuration for process termination.
31    pub kill_config: KillConfig,
32    /// Number of consecutive idle observations required before killing the process.
33    ///
34    /// Requiring more than one confirmation prevents false kills when the agent is
35    /// transiently quiet (e.g., waiting for an LLM API response, running a slow
36    /// compilation, or transitioning between work phases). Each additional
37    /// confirmation adds one `check_interval` of grace time before enforcement.
38    ///
39    /// Default: 2 (one extra `check_interval` of confirmation before kill).
40    pub required_idle_confirmations: u32,
41    /// Whether to check for active child processes before declaring the agent idle.
42    ///
43    /// When `true` (the default), the monitor queries the `ProcessExecutor` for
44    /// active child processes of the agent. If any are found the idle counter is
45    /// reset, preventing false kills when the agent is running a long subprocess
46    /// (e.g. `cargo test`, `npm install`, `cargo build`).
47    ///
48    /// Set to `false` in tests that deliberately do not want this check to run.
49    pub check_child_processes: bool,
50}
51
52impl Default for MonitorConfig {
53    fn default() -> Self {
54        Self {
55            timeout: Duration::from_secs(super::IDLE_TIMEOUT_SECS),
56            check_interval: DEFAULT_CHECK_INTERVAL,
57            kill_config: DEFAULT_KILL_CONFIG,
58            required_idle_confirmations: 2,
59            check_child_processes: true,
60        }
61    }
62}
63
64/// Result of idle timeout monitoring.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum MonitorResult {
67    /// Process completed normally (not killed by monitor).
68    ProcessCompleted,
69    /// Idle timeout was exceeded and termination was initiated.
70    ///
71    /// In the common case the subprocess exits promptly after SIGTERM/SIGKILL,
72    /// and by the time this result is returned the process is already gone.
73    ///
74    /// In pathological cases (e.g. a stuck/unresponsive subprocess or one that
75    /// does not terminate even after repeated SIGKILL attempts), the monitor may
76    /// return `TimedOut` after a bounded enforcement window so the pipeline can
77    /// regain control. When that happens, a background reaper continues best-effort
78    /// SIGKILL attempts until the process is observed dead.
79    ///
80    /// The `escalated` flag indicates whether SIGKILL/taskkill was required:
81    /// - `false`: Process terminated after SIGTERM within grace period
82    /// - `true`: Process did not respond to SIGTERM, required SIGKILL/taskkill
83    ///
84    /// `child_status_at_timeout` records the child-process state when the timeout
85    /// was enforced, enabling observability (AC #9):
86    /// - `None`: no children existed (or child-process checking was disabled)
87    /// - `Some(info)`: children were present (with stalled CPU) at kill time
88    TimedOut {
89        escalated: bool,
90        child_status_at_timeout: Option<ChildProcessInfo>,
91    },
92}
93
/// Polling period used by the idle monitor unless configured otherwise (30 s).
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(30_000);
96
/// Waits up to `check_interval`, waking at least every 100 ms to poll `should_stop`.
///
/// Returns `true` as soon as `should_stop` is observed set (including before
/// any sleeping). Returns `false` once the full interval has elapsed without
/// a stop request.
fn sleep_until_next_check_or_stop(
    should_stop: &std::sync::atomic::AtomicBool,
    check_interval: Duration,
) -> bool {
    use std::sync::atomic::Ordering;
    use std::time::Instant;

    // Never nap longer than 100 ms, so a stop request is noticed quickly.
    let max_nap = check_interval.min(Duration::from_millis(100));
    let wake_at = Instant::now() + check_interval;

    while !should_stop.load(Ordering::Acquire) {
        let left = wake_at.saturating_duration_since(Instant::now());
        if left.is_zero() {
            return false;
        }
        std::thread::sleep(max_nap.min(left));
    }
    true
}
121
122/// Monitors activity and kills a process if idle timeout is exceeded.
123pub fn monitor_idle_timeout(
124    activity_timestamp: &SharedActivityTimestamp,
125    child: &Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
126    timeout: Duration,
127    should_stop: &Arc<std::sync::atomic::AtomicBool>,
128    executor: &Arc<dyn ProcessExecutor>,
129) -> MonitorResult {
130    monitor_idle_timeout_with_interval_and_kill_config(
131        activity_timestamp,
132        None, // No file activity config
133        child,
134        should_stop,
135        executor,
136        MonitorConfig {
137            timeout,
138            check_interval: DEFAULT_CHECK_INTERVAL,
139            kill_config: DEFAULT_KILL_CONFIG,
140            ..Default::default()
141        },
142    )
143}
144
145/// Like [`monitor_idle_timeout`] but with a configurable check interval.
146pub fn monitor_idle_timeout_with_interval(
147    activity_timestamp: &SharedActivityTimestamp,
148    child: &Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
149    timeout: Duration,
150    should_stop: &Arc<std::sync::atomic::AtomicBool>,
151    executor: &Arc<dyn ProcessExecutor>,
152    check_interval: Duration,
153) -> MonitorResult {
154    monitor_idle_timeout_with_interval_and_kill_config(
155        activity_timestamp,
156        None, // No file activity config
157        child,
158        should_stop,
159        executor,
160        MonitorConfig {
161            timeout,
162            check_interval,
163            kill_config: DEFAULT_KILL_CONFIG,
164            ..Default::default()
165        },
166    )
167}
168
169/// # Panics
170///
171/// May panic if internal synchronization primitives (mutex, atomic) are in an invalid state.
172pub fn monitor_idle_timeout_with_interval_and_kill_config(
173    activity_timestamp: &SharedActivityTimestamp,
174    file_activity_config: Option<&FileActivityConfig>,
175    child: &Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
176    should_stop: &Arc<std::sync::atomic::AtomicBool>,
177    executor: &Arc<dyn ProcessExecutor>,
178    config: MonitorConfig,
179) -> MonitorResult {
180    monitor_idle_timeout_with_interval_and_kill_config_and_observer(
181        activity_timestamp,
182        file_activity_config,
183        child,
184        should_stop,
185        executor,
186        config,
187        None,
188    )
189}
190
191pub fn monitor_idle_timeout_with_interval_and_kill_config_and_observer(
192    activity_timestamp: &SharedActivityTimestamp,
193    file_activity_config: Option<&FileActivityConfig>,
194    child: &Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
195    should_stop: &Arc<std::sync::atomic::AtomicBool>,
196    executor: &Arc<dyn ProcessExecutor>,
197    config: MonitorConfig,
198    child_activity_suppressed: Option<&Arc<std::sync::Mutex<Option<ChildProcessInfo>>>>,
199) -> MonitorResult {
200    use std::sync::atomic::Ordering;
201
202    #[derive(Debug, Clone, Copy)]
203    struct TimeoutEnforcementState {
204        pid: u32,
205        escalated: bool,
206        last_sigkill_sent_at: Option<std::time::Instant>,
207        triggered_at: std::time::Instant,
208    }
209
210    let timeout = config.timeout;
211    let check_interval = config.check_interval;
212    let kill_config = config.kill_config;
213    let required_idle_confirmations = config.required_idle_confirmations;
214    let check_child_processes = config.check_child_processes;
215
216    let mut timeout_triggered: Option<TimeoutEnforcementState> = None;
217    let mut last_file_activity: Option<std::time::Instant> = None;
218    let mut consecutive_idle_count: u32 = 0;
219    let mut last_child_observation: Option<ChildProcessInfo> = None;
220    let mut last_child_info: Option<ChildProcessInfo> = None;
221    let mut child_startup_grace_available = true;
222
223    loop {
224        // Fast-path teardown: if the process completed and we have not already
225        // triggered idle-timeout enforcement, stop immediately.
226        if timeout_triggered.is_none() && should_stop.load(Ordering::Acquire) {
227            return MonitorResult::ProcessCompleted;
228        }
229
230        if timeout_triggered.is_none()
231            && sleep_until_next_check_or_stop(should_stop.as_ref(), check_interval)
232        {
233            return MonitorResult::ProcessCompleted;
234        }
235
236        if let Some(mut state) = timeout_triggered.take() {
237            let status = {
238                let mut locked_child = child
239                    .lock()
240                    .expect("child process mutex poisoned - indicates panic in another thread");
241                locked_child.try_wait()
242            };
243
244            if let Ok(Some(_)) = status {
245                return MonitorResult::TimedOut {
246                    escalated: state.escalated,
247                    child_status_at_timeout: last_child_info,
248                };
249            }
250
251            let now = std::time::Instant::now();
252
253            // Be robust to future changes: if we ever enter the enforcement state
254            // without having escalated yet, force escalation now.
255            if state.escalated {
256                let should_resend = state
257                    .last_sigkill_sent_at
258                    .is_none_or(|t| now.duration_since(t) >= kill_config.sigkill_resend_interval());
259                if should_resend {
260                    let _ = force_kill_best_effort(state.pid, executor.as_ref());
261                    state.last_sigkill_sent_at = Some(now);
262                }
263            } else {
264                let _ = force_kill_best_effort(state.pid, executor.as_ref());
265                state.escalated = true;
266                state.last_sigkill_sent_at = Some(now);
267            }
268
269            // After a bounded enforcement window, return TimedOut so the
270            // main pipeline can regain control. A detached reaper keeps
271            // trying to kill until the process is observed dead.
272            if now.duration_since(state.triggered_at) >= kill_config.post_sigkill_hard_cap()
273                && state.escalated
274            {
275                let child_for_reaper = Arc::clone(child);
276                let executor_for_reaper = Arc::clone(executor);
277                let should_stop_for_reaper = Arc::clone(should_stop);
278                let config_for_reaper = kill_config;
279                let pid = state.pid;
280                std::thread::spawn(move || {
281                    // Bound the reaper's lifetime to avoid leaking threads across
282                    // repeated timeouts. If the process is truly unkillable, a bounded
283                    // best-effort reaper is the least-bad option.
284                    let deadline =
285                        std::time::Instant::now() + config_for_reaper.post_sigkill_hard_cap();
286                    let mut last_kill_sent_at: Option<std::time::Instant> = None;
287
288                    while std::time::Instant::now() < deadline {
289                        if should_stop_for_reaper.load(Ordering::Acquire) {
290                            return;
291                        }
292
293                        let status = {
294                            let mut locked_child = child_for_reaper.lock().expect(
295                                "child process mutex poisoned - indicates panic in another thread",
296                            );
297                            locked_child.try_wait()
298                        };
299
300                        if let Ok(Some(_)) = status {
301                            return;
302                        }
303                        let now = std::time::Instant::now();
304                        let should_resend = last_kill_sent_at.is_none_or(|t| {
305                            now.duration_since(t) >= config_for_reaper.sigkill_resend_interval()
306                        });
307                        if should_resend {
308                            let _ = force_kill_best_effort(pid, executor_for_reaper.as_ref());
309                            last_kill_sent_at = Some(now);
310                        }
311                        std::thread::sleep(config_for_reaper.poll_interval());
312                    }
313                });
314
315                return MonitorResult::TimedOut {
316                    escalated: state.escalated,
317                    child_status_at_timeout: last_child_info,
318                };
319            }
320
321            timeout_triggered = Some(state);
322            continue;
323        }
324
325        if !is_idle_timeout_exceeded(activity_timestamp, timeout) {
326            consecutive_idle_count = 0;
327            last_child_observation = None;
328            last_child_info = None;
329            child_startup_grace_available = true;
330            continue;
331        }
332
333        // Log diagnostic information about timeout trigger
334        let time_since_output = super::time_since_activity(activity_timestamp);
335        eprintln!(
336            "Idle timeout exceeded: no output activity for {} seconds",
337            time_since_output.as_secs()
338        );
339
340        // Check file activity if config provided
341        if let Some(config) = file_activity_config {
342            // Fast path: if we confirmed file activity recently (monotonic clock),
343            // skip an expensive filesystem re-scan. This prevents the multi-iteration
344            // false positive where the same file falls outside the window on the next
345            // check because check_interval has elapsed.
346            if last_file_activity.is_some_and(|t| t.elapsed() < timeout) {
347                consecutive_idle_count = 0;
348                last_child_observation = None;
349                last_child_info = None;
350                child_startup_grace_available = true;
351                eprintln!(
352                    "Continuing monitoring: file activity was confirmed within the last timeout window"
353                );
354                continue;
355            }
356
357            // Widen the scan window to cover check_interval jitter plus scan overhead:
358            //   - A file written just before output stopped will be ~(timeout + check_interval)
359            //     old when the monitor first fires.
360            //   - The file scan itself takes time, so `actual_idle` computed before the scan
361            //     is slightly smaller than the true elapsed time at comparison; adding
362            //     `scan_overhead_buffer` compensates for that.
363            //   - `cap` bounds the maximum window so that after `last_file_activity` expires
364            //     and we re-scan, old files written long before output stopped do not
365            //     indefinitely prevent a correct kill.
366            let scan_overhead_buffer = Duration::from_secs(1);
367            let cap = timeout + check_interval + scan_overhead_buffer;
368            let actual_idle = super::time_since_activity(activity_timestamp);
369            let file_window = (actual_idle + scan_overhead_buffer).min(cap);
370
371            let locked_tracker = config
372                .tracker
373                .lock()
374                .expect("file activity tracker mutex poisoned - indicates panic in another thread");
375
376            match locked_tracker.check_for_recent_activity(config.workspace.as_ref(), file_window) {
377                Ok(true) => {
378                    consecutive_idle_count = 0;
379                    last_file_activity = Some(std::time::Instant::now());
380                    last_child_observation = None;
381                    last_child_info = None;
382                    child_startup_grace_available = true;
383                    eprintln!("AI-generated files were updated recently, continuing monitoring");
384                    continue;
385                }
386                Ok(false) => {
387                    eprintln!(
388                        "No AI-generated file updates in the last {file_window:?}, proceeding with timeout"
389                    );
390                }
391                Err(e) => {
392                    eprintln!(
393                        "Warning: file activity check failed (treating as no recent file activity, proceeding with timeout enforcement): {e}"
394                    );
395                }
396            }
397        }
398
399        // Re-check output timestamp: the agent may have produced output during the
400        // file scan. This closes the race window between "scan said no activity" and
401        // "kill is sent".
402        if !is_idle_timeout_exceeded(activity_timestamp, timeout) {
403            consecutive_idle_count = 0;
404            last_child_observation = None;
405            last_child_info = None;
406            child_startup_grace_available = true;
407            eprintln!("Output activity detected after file scan; continuing monitoring");
408            continue;
409        }
410
411        // Check for active child processes: the agent may have spawned a subprocess
412        // (e.g. cargo test, cargo build, npm install) that is still doing useful
413        // work even though there is no stdout/stderr output and no file-system
414        // activity in tracked locations. Mere descendant existence is not enough:
415        // suppression only applies when the executor reports currently active child
416        // work in the latest snapshot.
417        if check_child_processes {
418            let child_pid = {
419                let locked_child = child.lock().expect("child process mutex poisoned");
420                locked_child.id()
421            };
422            let info = executor.get_child_process_info(child_pid);
423            if info.has_children() {
424                last_child_info = Some(info);
425                let previous_observation = last_child_observation;
426                let first_active_child_observation = previous_observation.is_none()
427                    && child_startup_grace_available
428                    && info.has_currently_active_children();
429
430                if first_active_child_observation {
431                    child_startup_grace_available = false;
432                    consecutive_idle_count = 0;
433                    last_child_observation = Some(info);
434                    eprintln!(
435                        "Agent has currently active child processes for the first time during idle timeout \
436                         (pid {child_pid}, {} active of {} children, cpu {}ms, signature {}); granting startup grace",
437                        info.active_child_count,
438                        info.child_count,
439                        info.cpu_time_ms,
440                        info.descendant_pid_signature
441                    );
442                    continue;
443                }
444
445                if previous_observation.is_some_and(|prev| info.shows_fresh_progress_since(prev)) {
446                    last_child_observation = Some(info);
447                    if let Some(observed_activity) = child_activity_suppressed.as_ref() {
448                        *observed_activity
449                            .lock()
450                            .expect("child activity observer mutex poisoned") = Some(info);
451                    }
452                    consecutive_idle_count = 0;
453                    child_startup_grace_available = true;
454                    eprintln!(
455                        "Agent has currently active child processes (pid {child_pid}, \
456                         {} active of {} children, cpu {}ms, signature {}); continuing monitoring",
457                        info.active_child_count,
458                        info.child_count,
459                        info.cpu_time_ms,
460                        info.descendant_pid_signature
461                    );
462                    continue;
463                }
464
465                if info.has_stalled_children() {
466                    eprintln!(
467                        "Agent has child processes (pid {child_pid}, {} total, 0 currently active, cpu {}ms, signature {}) \
468                         but none show current work; treating as idle",
469                        info.child_count,
470                        info.cpu_time_ms,
471                        info.descendant_pid_signature
472                    );
473                } else if info.has_currently_active_children() {
474                    eprintln!(
475                        "Agent has child processes (pid {child_pid}, {} active of {} total, cpu {}ms, signature {}) \
476                         but they showed no fresh progress since the last idle check; treating as idle",
477                        info.active_child_count,
478                        info.child_count,
479                        info.cpu_time_ms,
480                        info.descendant_pid_signature
481                    );
482                }
483                last_child_observation = Some(info);
484            } else {
485                last_child_observation = None;
486                last_child_info = None;
487                child_startup_grace_available = true;
488            }
489        }
490
491        // Require multiple consecutive idle observations before killing to avoid
492        // false positives during transient quiet periods (LLM API waits, slow
493        // compilations, transitions between work phases, etc.).
494        consecutive_idle_count += 1;
495        if consecutive_idle_count < required_idle_confirmations {
496            eprintln!(
497                "Idle confirmed {consecutive_idle_count}/{required_idle_confirmations} times; waiting for next check interval before kill"
498            );
499            continue;
500        }
501
502        let child_id = {
503            let mut locked_child = child
504                .lock()
505                .expect("child process mutex poisoned - indicates panic in another thread");
506            if let Ok(Some(_)) = locked_child.try_wait() {
507                return MonitorResult::ProcessCompleted;
508            }
509            locked_child.id()
510        };
511
512        let kill_result = kill_process(child_id, executor.as_ref(), Some(child), kill_config);
513        match kill_result {
514            KillResult::TerminatedByTerm => {
515                return MonitorResult::TimedOut {
516                    escalated: false,
517                    child_status_at_timeout: last_child_info,
518                }
519            }
520            KillResult::TerminatedByKill => {
521                return MonitorResult::TimedOut {
522                    escalated: true,
523                    child_status_at_timeout: last_child_info,
524                }
525            }
526            KillResult::SignalsSentAwaitingExit { escalated } => {
527                timeout_triggered = Some(TimeoutEnforcementState {
528                    pid: child_id,
529                    escalated,
530                    triggered_at: std::time::Instant::now(),
531                    last_sigkill_sent_at: escalated.then_some(std::time::Instant::now()),
532                });
533            }
534            KillResult::Failed => {
535                if should_stop.load(Ordering::Acquire) {
536                    return MonitorResult::ProcessCompleted;
537                }
538            }
539        }
540    }
541}