Skip to main content

ralph_workflow/pipeline/
idle_timeout.rs

1//! Idle timeout detection for agent subprocess execution.
2//!
3//! This module provides infrastructure to detect when an agent subprocess
4//! has stopped producing output, indicating it may be stuck (e.g., waiting
5//! for user input in unattended mode).
6//!
7//! # Design
8//!
9//! The idle timeout system uses a shared atomic timestamp that gets updated
10//! whenever data is read from the subprocess stdout. A monitor thread
11//! periodically checks this timestamp and can kill the subprocess if
12//! no output has been received for longer than the configured timeout.
13//!
14//! # Timeout Value
15//!
16//! The default timeout is 5 minutes (300 seconds), which is:
17//! - Long enough for complex tool operations and LLM reasoning
18//! - Short enough to detect truly stuck agents
19//! - Aligned with typical CI/CD step timeouts
20
21use std::io::{self, Read};
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, SystemTime, UNIX_EPOCH};
25
26use crate::executor::ProcessExecutor;
27
/// Default idle timeout, expressed in seconds (five minutes).
///
/// Rationale for the value:
/// - complex tool operations rarely stay silent for more than 2 minutes;
/// - LLM reasoning models may pause 30-60 seconds between outputs;
/// - five minutes without any output almost certainly means the agent is stuck.
pub const IDLE_TIMEOUT_SECS: u64 = 5 * 60;
35
/// Shared, thread-safe timestamp recording the last observed activity.
///
/// Holds milliseconds since the UNIX epoch. Create one with
/// [`new_activity_timestamp`], refresh it with [`touch_activity`], and read
/// its age with [`time_since_activity`].
pub type SharedActivityTimestamp = Arc<AtomicU64>;
40
41/// Creates a new shared activity timestamp initialized to the current time.
42pub fn new_activity_timestamp() -> SharedActivityTimestamp {
43    let now_ms = SystemTime::now()
44        .duration_since(UNIX_EPOCH)
45        .unwrap_or(Duration::ZERO)
46        .as_millis() as u64;
47    Arc::new(AtomicU64::new(now_ms))
48}
49
50/// Updates the shared activity timestamp to the current time.
51pub fn touch_activity(timestamp: &SharedActivityTimestamp) {
52    let now_ms = SystemTime::now()
53        .duration_since(UNIX_EPOCH)
54        .unwrap_or(Duration::ZERO)
55        .as_millis() as u64;
56    timestamp.store(now_ms, Ordering::Release);
57}
58
59/// Returns the duration since the last activity update.
60pub fn time_since_activity(timestamp: &SharedActivityTimestamp) -> Duration {
61    let last_ms = timestamp.load(Ordering::Acquire);
62    let now_ms = SystemTime::now()
63        .duration_since(UNIX_EPOCH)
64        .unwrap_or(Duration::ZERO)
65        .as_millis() as u64;
66
67    Duration::from_millis(now_ms.saturating_sub(last_ms))
68}
69
70/// Checks if the idle timeout has been exceeded.
71pub fn is_idle_timeout_exceeded(timestamp: &SharedActivityTimestamp, timeout_secs: u64) -> bool {
72    time_since_activity(timestamp) > Duration::from_secs(timeout_secs)
73}
74
75/// A reader wrapper that updates an activity timestamp on every read.
76///
77/// This wraps any `Read` implementation and updates a shared atomic timestamp
78/// whenever data is successfully read. This allows external monitoring of
79/// read activity for idle timeout detection.
80pub struct ActivityTrackingReader<R: Read> {
81    inner: R,
82    activity_timestamp: SharedActivityTimestamp,
83}
84
85impl<R: Read> ActivityTrackingReader<R> {
86    /// Create a new activity-tracking reader.
87    ///
88    /// The provided timestamp will be updated to the current time
89    /// whenever data is successfully read from the inner reader.
90    pub fn new(inner: R, activity_timestamp: SharedActivityTimestamp) -> Self {
91        // Initialize timestamp to now
92        touch_activity(&activity_timestamp);
93        Self {
94            inner,
95            activity_timestamp,
96        }
97    }
98}
99
100impl<R: Read> Read for ActivityTrackingReader<R> {
101    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
102        let n = self.inner.read(buf)?;
103        if n > 0 {
104            touch_activity(&self.activity_timestamp);
105        }
106        Ok(n)
107    }
108}
109
/// Outcome reported by the idle timeout monitor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MonitorResult {
    /// The process finished on its own; the monitor did not kill it.
    ProcessCompleted,
    /// The monitor killed the process because the idle timeout elapsed.
    TimedOut,
}
118
119/// Monitors activity and kills a process if idle timeout is exceeded.
120///
121/// This function runs in a loop, checking the activity timestamp periodically.
122/// If the timestamp indicates no activity for longer than `timeout_secs`,
123/// it will attempt to kill the process and return `MonitorResult::TimedOut`.
124///
125/// The function returns when either:
126/// - The process is killed due to timeout
127/// - The `should_stop` flag is set to true (process completed normally)
128///
129/// # Arguments
130///
131/// * `activity_timestamp` - Shared timestamp updated by the reader
132/// * `child_id` - Process ID to kill if timeout exceeded
133/// * `timeout_secs` - Maximum seconds of inactivity before killing
134/// * `should_stop` - Atomic flag to signal monitor should exit (set when process completes)
135/// * `executor` - Process executor for killing the subprocess
136///
137/// # Platform Notes
138///
139/// Uses `kill -TERM` command on Unix and `taskkill` on Windows via the ProcessExecutor trait.
140pub fn monitor_idle_timeout(
141    activity_timestamp: SharedActivityTimestamp,
142    child_id: u32,
143    timeout_secs: u64,
144    should_stop: Arc<std::sync::atomic::AtomicBool>,
145    executor: Arc<dyn ProcessExecutor>,
146) -> MonitorResult {
147    use std::sync::atomic::Ordering;
148
149    // Check every second
150    const CHECK_INTERVAL: Duration = Duration::from_secs(1);
151
152    loop {
153        std::thread::sleep(CHECK_INTERVAL);
154
155        // Check if we should stop (process completed normally)
156        if should_stop.load(Ordering::Acquire) {
157            return MonitorResult::ProcessCompleted;
158        }
159
160        // Check if idle timeout exceeded
161        if is_idle_timeout_exceeded(&activity_timestamp, timeout_secs) {
162            // Kill the process using platform-specific command
163            let killed = kill_process(child_id, executor.as_ref());
164            if killed {
165                return MonitorResult::TimedOut;
166            }
167            // If kill failed, process may have already exited - check should_stop again
168            if should_stop.load(Ordering::Acquire) {
169                return MonitorResult::ProcessCompleted;
170            }
171            // Kill failed for unknown reason, try again next iteration
172        }
173    }
174}
175
176/// Kill a process by PID using platform-specific commands via executor.
177///
178/// Returns true if the kill command succeeded, false otherwise.
179#[cfg(unix)]
180fn kill_process(pid: u32, executor: &dyn ProcessExecutor) -> bool {
181    // Use kill command to send SIGTERM via ProcessExecutor
182    executor
183        .execute("kill", &["-TERM", &pid.to_string()], &[], None)
184        .map(|o| o.status.success())
185        .unwrap_or(false)
186}
187
/// Kill a process by PID on Windows through the executor.
///
/// Runs `taskkill /F /PID <pid>` to force-terminate the process. Returns true
/// only when the command could be run and exited successfully; a failure to
/// run it or a non-success exit status both yield false.
#[cfg(windows)]
fn kill_process(pid: u32, executor: &dyn ProcessExecutor) -> bool {
    let pid_arg = pid.to_string();
    match executor.execute("taskkill", &["/F", "/PID", &pid_arg], &[], None) {
        Ok(output) => output.status.success(),
        Err(_) => false,
    }
}
199
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use std::thread;

    /// Age, in milliseconds, injected into a timestamp to make it look stale.
    const STALE_MS: u64 = 60_000;

    /// Upper bound for "the timestamp was just refreshed".
    ///
    /// Deliberately far above scheduler jitter and far below `STALE_MS`, so
    /// the assertions stay reliable on heavily loaded machines.
    const FRESH: Duration = Duration::from_secs(10);

    /// Lower bound for "the timestamp still carries the injected staleness".
    const STALE: Duration = Duration::from_secs(59);

    /// Rewinds `timestamp` so it looks last touched `age_ms` milliseconds ago.
    fn backdate(timestamp: &SharedActivityTimestamp, age_ms: u64) {
        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock must not be set before the UNIX epoch")
            .as_millis() as u64;
        timestamp.store(now_ms - age_ms, Ordering::Release);
    }

    #[test]
    fn test_new_activity_timestamp_is_recent() {
        let timestamp = new_activity_timestamp();
        assert!(time_since_activity(&timestamp) < FRESH);
    }

    #[test]
    fn test_touch_activity_updates_timestamp() {
        let timestamp = new_activity_timestamp();
        // Backdating replaces a real sleep: deterministic and instantaneous.
        backdate(&timestamp, STALE_MS);
        let before_touch = time_since_activity(&timestamp);

        // Touch should reset the elapsed time
        touch_activity(&timestamp);
        let after_touch = time_since_activity(&timestamp);

        assert!(before_touch >= STALE);
        assert!(after_touch < FRESH);
    }

    #[test]
    fn test_is_idle_timeout_exceeded_false_when_recent() {
        let timestamp = new_activity_timestamp();
        // Activity just now, so a 60 second timeout cannot have elapsed.
        assert!(!is_idle_timeout_exceeded(&timestamp, 60));
    }

    #[test]
    fn test_is_idle_timeout_exceeded_true_after_timeout() {
        let timestamp = new_activity_timestamp();
        backdate(&timestamp, 2_000);

        // Timeout of 1 second should be exceeded
        assert!(is_idle_timeout_exceeded(&timestamp, 1));
    }

    #[test]
    fn test_activity_tracking_reader_new_touches_timestamp() {
        let timestamp = new_activity_timestamp();
        backdate(&timestamp, STALE_MS);

        let _reader = ActivityTrackingReader::new(Cursor::new(Vec::new()), timestamp.clone());

        // Construction alone refreshes the timestamp.
        assert!(time_since_activity(&timestamp) < FRESH);
    }

    #[test]
    fn test_activity_tracking_reader_updates_on_read() {
        let cursor = Cursor::new(b"hello world".to_vec());
        let timestamp = new_activity_timestamp();
        let mut reader = ActivityTrackingReader::new(cursor, timestamp.clone());

        // Backdate AFTER creating the reader, since new() touches the timestamp.
        backdate(&timestamp, STALE_MS);
        assert!(time_since_activity(&timestamp) >= STALE);

        // Read some data
        let mut buf = [0u8; 5];
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);

        // After a productive read the timestamp is fresh again.
        assert!(time_since_activity(&timestamp) < FRESH);
    }

    #[test]
    fn test_activity_tracking_reader_no_update_on_zero_read() {
        let cursor = Cursor::new(Vec::new());
        let timestamp = new_activity_timestamp();
        let mut reader = ActivityTrackingReader::new(cursor, timestamp.clone());

        // Backdate AFTER creating the reader, since new() touches the timestamp.
        backdate(&timestamp, STALE_MS);

        // Read (should return 0, EOF)
        let mut buf = [0u8; 5];
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 0);

        // After zero-read, timestamp should NOT be updated
        assert!(time_since_activity(&timestamp) >= STALE);
    }

    #[test]
    fn test_activity_tracking_reader_passes_through_data() {
        let cursor = Cursor::new(b"hello world".to_vec());
        let timestamp = new_activity_timestamp();

        let mut reader = ActivityTrackingReader::new(cursor, timestamp);

        let mut buf = [0u8; 20];
        let n = reader.read(&mut buf).unwrap();

        assert_eq!(n, 11);
        assert_eq!(&buf[..n], b"hello world");
    }

    #[test]
    fn test_idle_timeout_constant_is_five_minutes() {
        assert_eq!(IDLE_TIMEOUT_SECS, 300);
    }

    #[test]
    fn test_monitor_result_variants() {
        // Ensure MonitorResult variants exist and are distinct
        assert_ne!(MonitorResult::ProcessCompleted, MonitorResult::TimedOut);
    }

    #[test]
    fn test_monitor_stops_when_signaled() {
        use std::sync::atomic::AtomicBool;

        let timestamp = new_activity_timestamp();
        let should_stop = Arc::new(AtomicBool::new(false));
        let should_stop_clone = should_stop.clone();

        // Placeholder PID. It is never signalled in this test: the 60 second
        // timeout cannot elapse and the executor is a mock. (On Unix, PID 0
        // addresses the caller's whole process group, so it must never be
        // handed to a real `kill`.)
        let fake_pid = 0u32;

        // Create a mock executor for the monitor
        let executor: Arc<dyn crate::executor::ProcessExecutor> =
            Arc::new(crate::executor::MockProcessExecutor::new());

        // Spawn monitor in a thread
        let handle = thread::spawn(move || {
            monitor_idle_timeout(timestamp, fake_pid, 60, should_stop_clone, executor)
        });

        // Signal stop after a short delay
        thread::sleep(Duration::from_millis(50));
        should_stop.store(true, std::sync::atomic::Ordering::Release);

        // Wait for monitor to complete
        let result = handle.join().expect("Monitor thread panicked");
        assert_eq!(result, MonitorResult::ProcessCompleted);
    }
}