ralph_workflow/pipeline/idle_timeout/
monitor.rs1use super::kill::{
4 force_kill_best_effort, kill_process, KillConfig, KillResult, DEFAULT_KILL_CONFIG,
5};
6use super::{is_idle_timeout_exceeded, SharedActivityTimestamp, SharedFileActivityTracker};
7use crate::executor::{AgentChild, ChildProcessInfo, ProcessExecutor};
8use crate::workspace::Workspace;
9use std::sync::Arc;
10use std::time::Duration;
11
/// Inputs needed to treat recent AI-generated file writes as activity
/// when deciding whether the agent process is idle.
pub struct FileActivityConfig {
    /// Shared tracker queried for recent file modifications.
    pub tracker: SharedFileActivityTracker,
    /// Workspace whose files the tracker scans for recent activity.
    pub workspace: Arc<dyn Workspace>,
}
22
/// Tuning knobs for the idle-timeout monitor loop.
#[derive(Debug, Clone, Copy)]
pub struct MonitorConfig {
    /// How long the agent may go without observed activity before kill
    /// enforcement begins.
    pub timeout: Duration,
    /// How often the monitor wakes up to re-evaluate idleness.
    pub check_interval: Duration,
    /// Signal/escalation parameters used when terminating the process.
    pub kill_config: KillConfig,
    /// Number of consecutive idle checks required before the kill is issued.
    pub required_idle_confirmations: u32,
    /// When true, activity in the agent's child processes can defer the
    /// timeout.
    pub check_child_processes: bool,
}
51
52impl Default for MonitorConfig {
53 fn default() -> Self {
54 Self {
55 timeout: Duration::from_secs(super::IDLE_TIMEOUT_SECS),
56 check_interval: DEFAULT_CHECK_INTERVAL,
57 kill_config: DEFAULT_KILL_CONFIG,
58 required_idle_confirmations: 2,
59 check_child_processes: true,
60 }
61 }
62}
63
/// Outcome reported by the idle-timeout monitor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MonitorResult {
    /// Monitoring ended without enforcing the timeout: the process exited
    /// on its own or a stop was requested.
    ProcessCompleted,
    /// The idle timeout fired and kill signals were delivered.
    TimedOut {
        /// True when a forced kill was required (not just the polite
        /// termination signal).
        escalated: bool,
        /// Child-process snapshot from the last idle check that observed
        /// children, if any — lets callers report what the agent was
        /// running when it timed out.
        child_status_at_timeout: Option<ChildProcessInfo>,
    },
}
93
94const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_secs(30);
96
/// Sleeps for `check_interval`, waking at most every 100 ms to poll
/// `should_stop`.
///
/// Returns `true` if a stop was requested before the interval elapsed,
/// `false` once the full interval has passed without a stop request.
fn sleep_until_next_check_or_stop(
    should_stop: &std::sync::atomic::AtomicBool,
    check_interval: Duration,
) -> bool {
    use std::sync::atomic::Ordering;
    use std::time::Instant;

    // Never nap longer than 100 ms at a stretch so stop requests are
    // noticed promptly even for long check intervals.
    let max_nap = Duration::from_millis(100).min(check_interval);
    let wake_at = Instant::now() + check_interval;

    while !should_stop.load(Ordering::Acquire) {
        let left = wake_at.saturating_duration_since(Instant::now());
        if left.is_zero() {
            return false;
        }
        std::thread::sleep(left.min(max_nap));
    }
    true
}
121
122pub fn monitor_idle_timeout(
124 activity_timestamp: &SharedActivityTimestamp,
125 child: &Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
126 timeout: Duration,
127 should_stop: &Arc<std::sync::atomic::AtomicBool>,
128 executor: &Arc<dyn ProcessExecutor>,
129) -> MonitorResult {
130 monitor_idle_timeout_with_interval_and_kill_config(
131 activity_timestamp,
132 None, child,
134 should_stop,
135 executor,
136 MonitorConfig {
137 timeout,
138 check_interval: DEFAULT_CHECK_INTERVAL,
139 kill_config: DEFAULT_KILL_CONFIG,
140 ..Default::default()
141 },
142 )
143}
144
145pub fn monitor_idle_timeout_with_interval(
147 activity_timestamp: &SharedActivityTimestamp,
148 child: &Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
149 timeout: Duration,
150 should_stop: &Arc<std::sync::atomic::AtomicBool>,
151 executor: &Arc<dyn ProcessExecutor>,
152 check_interval: Duration,
153) -> MonitorResult {
154 monitor_idle_timeout_with_interval_and_kill_config(
155 activity_timestamp,
156 None, child,
158 should_stop,
159 executor,
160 MonitorConfig {
161 timeout,
162 check_interval,
163 kill_config: DEFAULT_KILL_CONFIG,
164 ..Default::default()
165 },
166 )
167}
168
169pub fn monitor_idle_timeout_with_interval_and_kill_config(
173 activity_timestamp: &SharedActivityTimestamp,
174 file_activity_config: Option<&FileActivityConfig>,
175 child: &Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
176 should_stop: &Arc<std::sync::atomic::AtomicBool>,
177 executor: &Arc<dyn ProcessExecutor>,
178 config: MonitorConfig,
179) -> MonitorResult {
180 monitor_idle_timeout_with_interval_and_kill_config_and_observer(
181 activity_timestamp,
182 file_activity_config,
183 child,
184 should_stop,
185 executor,
186 config,
187 None,
188 )
189}
190
191pub fn monitor_idle_timeout_with_interval_and_kill_config_and_observer(
192 activity_timestamp: &SharedActivityTimestamp,
193 file_activity_config: Option<&FileActivityConfig>,
194 child: &Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
195 should_stop: &Arc<std::sync::atomic::AtomicBool>,
196 executor: &Arc<dyn ProcessExecutor>,
197 config: MonitorConfig,
198 child_activity_suppressed: Option<&Arc<std::sync::Mutex<Option<ChildProcessInfo>>>>,
199) -> MonitorResult {
200 use std::sync::atomic::Ordering;
201
202 #[derive(Debug, Clone, Copy)]
203 struct TimeoutEnforcementState {
204 pid: u32,
205 escalated: bool,
206 last_sigkill_sent_at: Option<std::time::Instant>,
207 triggered_at: std::time::Instant,
208 }
209
210 let timeout = config.timeout;
211 let check_interval = config.check_interval;
212 let kill_config = config.kill_config;
213 let required_idle_confirmations = config.required_idle_confirmations;
214 let check_child_processes = config.check_child_processes;
215
216 let mut timeout_triggered: Option<TimeoutEnforcementState> = None;
217 let mut last_file_activity: Option<std::time::Instant> = None;
218 let mut consecutive_idle_count: u32 = 0;
219 let mut last_child_observation: Option<ChildProcessInfo> = None;
220 let mut last_child_info: Option<ChildProcessInfo> = None;
221 let mut child_startup_grace_available = true;
222
223 loop {
224 if timeout_triggered.is_none() && should_stop.load(Ordering::Acquire) {
227 return MonitorResult::ProcessCompleted;
228 }
229
230 if timeout_triggered.is_none()
231 && sleep_until_next_check_or_stop(should_stop.as_ref(), check_interval)
232 {
233 return MonitorResult::ProcessCompleted;
234 }
235
236 if let Some(mut state) = timeout_triggered.take() {
237 let status = {
238 let mut locked_child = child
239 .lock()
240 .expect("child process mutex poisoned - indicates panic in another thread");
241 locked_child.try_wait()
242 };
243
244 if let Ok(Some(_)) = status {
245 return MonitorResult::TimedOut {
246 escalated: state.escalated,
247 child_status_at_timeout: last_child_info,
248 };
249 }
250
251 let now = std::time::Instant::now();
252
253 if state.escalated {
256 let should_resend = state
257 .last_sigkill_sent_at
258 .is_none_or(|t| now.duration_since(t) >= kill_config.sigkill_resend_interval());
259 if should_resend {
260 let _ = force_kill_best_effort(state.pid, executor.as_ref());
261 state.last_sigkill_sent_at = Some(now);
262 }
263 } else {
264 let _ = force_kill_best_effort(state.pid, executor.as_ref());
265 state.escalated = true;
266 state.last_sigkill_sent_at = Some(now);
267 }
268
269 if now.duration_since(state.triggered_at) >= kill_config.post_sigkill_hard_cap()
273 && state.escalated
274 {
275 let child_for_reaper = Arc::clone(child);
276 let executor_for_reaper = Arc::clone(executor);
277 let should_stop_for_reaper = Arc::clone(should_stop);
278 let config_for_reaper = kill_config;
279 let pid = state.pid;
280 std::thread::spawn(move || {
281 let deadline =
285 std::time::Instant::now() + config_for_reaper.post_sigkill_hard_cap();
286 let mut last_kill_sent_at: Option<std::time::Instant> = None;
287
288 while std::time::Instant::now() < deadline {
289 if should_stop_for_reaper.load(Ordering::Acquire) {
290 return;
291 }
292
293 let status = {
294 let mut locked_child = child_for_reaper.lock().expect(
295 "child process mutex poisoned - indicates panic in another thread",
296 );
297 locked_child.try_wait()
298 };
299
300 if let Ok(Some(_)) = status {
301 return;
302 }
303 let now = std::time::Instant::now();
304 let should_resend = last_kill_sent_at.is_none_or(|t| {
305 now.duration_since(t) >= config_for_reaper.sigkill_resend_interval()
306 });
307 if should_resend {
308 let _ = force_kill_best_effort(pid, executor_for_reaper.as_ref());
309 last_kill_sent_at = Some(now);
310 }
311 std::thread::sleep(config_for_reaper.poll_interval());
312 }
313 });
314
315 return MonitorResult::TimedOut {
316 escalated: state.escalated,
317 child_status_at_timeout: last_child_info,
318 };
319 }
320
321 timeout_triggered = Some(state);
322 continue;
323 }
324
325 if !is_idle_timeout_exceeded(activity_timestamp, timeout) {
326 consecutive_idle_count = 0;
327 last_child_observation = None;
328 last_child_info = None;
329 child_startup_grace_available = true;
330 continue;
331 }
332
333 let time_since_output = super::time_since_activity(activity_timestamp);
335 eprintln!(
336 "Idle timeout exceeded: no output activity for {} seconds",
337 time_since_output.as_secs()
338 );
339
340 if let Some(config) = file_activity_config {
342 if last_file_activity.is_some_and(|t| t.elapsed() < timeout) {
347 consecutive_idle_count = 0;
348 last_child_observation = None;
349 last_child_info = None;
350 child_startup_grace_available = true;
351 eprintln!(
352 "Continuing monitoring: file activity was confirmed within the last timeout window"
353 );
354 continue;
355 }
356
357 let scan_overhead_buffer = Duration::from_secs(1);
367 let cap = timeout + check_interval + scan_overhead_buffer;
368 let actual_idle = super::time_since_activity(activity_timestamp);
369 let file_window = (actual_idle + scan_overhead_buffer).min(cap);
370
371 let locked_tracker = config
372 .tracker
373 .lock()
374 .expect("file activity tracker mutex poisoned - indicates panic in another thread");
375
376 match locked_tracker.check_for_recent_activity(config.workspace.as_ref(), file_window) {
377 Ok(true) => {
378 consecutive_idle_count = 0;
379 last_file_activity = Some(std::time::Instant::now());
380 last_child_observation = None;
381 last_child_info = None;
382 child_startup_grace_available = true;
383 eprintln!("AI-generated files were updated recently, continuing monitoring");
384 continue;
385 }
386 Ok(false) => {
387 eprintln!(
388 "No AI-generated file updates in the last {file_window:?}, proceeding with timeout"
389 );
390 }
391 Err(e) => {
392 eprintln!(
393 "Warning: file activity check failed (treating as no recent file activity, proceeding with timeout enforcement): {e}"
394 );
395 }
396 }
397 }
398
399 if !is_idle_timeout_exceeded(activity_timestamp, timeout) {
403 consecutive_idle_count = 0;
404 last_child_observation = None;
405 last_child_info = None;
406 child_startup_grace_available = true;
407 eprintln!("Output activity detected after file scan; continuing monitoring");
408 continue;
409 }
410
411 if check_child_processes {
418 let child_pid = {
419 let locked_child = child.lock().expect("child process mutex poisoned");
420 locked_child.id()
421 };
422 let info = executor.get_child_process_info(child_pid);
423 if info.has_children() {
424 last_child_info = Some(info);
425 let previous_observation = last_child_observation;
426 let first_active_child_observation = previous_observation.is_none()
427 && child_startup_grace_available
428 && info.has_currently_active_children();
429
430 if first_active_child_observation {
431 child_startup_grace_available = false;
432 consecutive_idle_count = 0;
433 last_child_observation = Some(info);
434 eprintln!(
435 "Agent has currently active child processes for the first time during idle timeout \
436 (pid {child_pid}, {} active of {} children, cpu {}ms, signature {}); granting startup grace",
437 info.active_child_count,
438 info.child_count,
439 info.cpu_time_ms,
440 info.descendant_pid_signature
441 );
442 continue;
443 }
444
445 if previous_observation.is_some_and(|prev| info.shows_fresh_progress_since(prev)) {
446 last_child_observation = Some(info);
447 if let Some(observed_activity) = child_activity_suppressed.as_ref() {
448 *observed_activity
449 .lock()
450 .expect("child activity observer mutex poisoned") = Some(info);
451 }
452 consecutive_idle_count = 0;
453 child_startup_grace_available = true;
454 eprintln!(
455 "Agent has currently active child processes (pid {child_pid}, \
456 {} active of {} children, cpu {}ms, signature {}); continuing monitoring",
457 info.active_child_count,
458 info.child_count,
459 info.cpu_time_ms,
460 info.descendant_pid_signature
461 );
462 continue;
463 }
464
465 if info.has_stalled_children() {
466 eprintln!(
467 "Agent has child processes (pid {child_pid}, {} total, 0 currently active, cpu {}ms, signature {}) \
468 but none show current work; treating as idle",
469 info.child_count,
470 info.cpu_time_ms,
471 info.descendant_pid_signature
472 );
473 } else if info.has_currently_active_children() {
474 eprintln!(
475 "Agent has child processes (pid {child_pid}, {} active of {} total, cpu {}ms, signature {}) \
476 but they showed no fresh progress since the last idle check; treating as idle",
477 info.active_child_count,
478 info.child_count,
479 info.cpu_time_ms,
480 info.descendant_pid_signature
481 );
482 }
483 last_child_observation = Some(info);
484 } else {
485 last_child_observation = None;
486 last_child_info = None;
487 child_startup_grace_available = true;
488 }
489 }
490
491 consecutive_idle_count += 1;
495 if consecutive_idle_count < required_idle_confirmations {
496 eprintln!(
497 "Idle confirmed {consecutive_idle_count}/{required_idle_confirmations} times; waiting for next check interval before kill"
498 );
499 continue;
500 }
501
502 let child_id = {
503 let mut locked_child = child
504 .lock()
505 .expect("child process mutex poisoned - indicates panic in another thread");
506 if let Ok(Some(_)) = locked_child.try_wait() {
507 return MonitorResult::ProcessCompleted;
508 }
509 locked_child.id()
510 };
511
512 let kill_result = kill_process(child_id, executor.as_ref(), Some(child), kill_config);
513 match kill_result {
514 KillResult::TerminatedByTerm => {
515 return MonitorResult::TimedOut {
516 escalated: false,
517 child_status_at_timeout: last_child_info,
518 }
519 }
520 KillResult::TerminatedByKill => {
521 return MonitorResult::TimedOut {
522 escalated: true,
523 child_status_at_timeout: last_child_info,
524 }
525 }
526 KillResult::SignalsSentAwaitingExit { escalated } => {
527 timeout_triggered = Some(TimeoutEnforcementState {
528 pid: child_id,
529 escalated,
530 triggered_at: std::time::Instant::now(),
531 last_sigkill_sent_at: escalated.then_some(std::time::Instant::now()),
532 });
533 }
534 KillResult::Failed => {
535 if should_stop.load(Ordering::Acquire) {
536 return MonitorResult::ProcessCompleted;
537 }
538 }
539 }
540 }
541}