ralph_workflow/pipeline/idle_timeout/
monitor.rs1use super::kill::{
4 force_kill_best_effort, kill_process, KillConfig, KillResult, DEFAULT_KILL_CONFIG,
5};
6use super::{is_idle_timeout_exceeded, SharedActivityTimestamp};
7use crate::executor::{AgentChild, ProcessExecutor};
8use std::sync::Arc;
9use std::time::Duration;
10
/// Outcome reported by the idle-timeout monitor functions in this module.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MonitorResult {
    /// The monitor stopped without enforcing a timeout: either the stop flag
    /// was raised, or the child had already exited when the idle timeout was
    /// detected.
    ProcessCompleted,
    /// The idle timeout fired and the monitor tried to terminate the child.
    TimedOut {
        /// `false` when the kill helper reported `KillResult::TerminatedByTerm`;
        /// `true` when it reported `KillResult::TerminatedByKill` or when the
        /// monitor had to issue a force kill itself.
        escalated: bool,
    },
}
32
/// Polling period (one second) used by [`monitor_idle_timeout`] between
/// successive checks of the stop flag and the activity timestamp.
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(1_000);
35
36pub fn monitor_idle_timeout(
38 activity_timestamp: SharedActivityTimestamp,
39 child: Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
40 timeout_secs: u64,
41 should_stop: Arc<std::sync::atomic::AtomicBool>,
42 executor: Arc<dyn ProcessExecutor>,
43) -> MonitorResult {
44 monitor_idle_timeout_with_interval_and_kill_config(
45 activity_timestamp,
46 child,
47 timeout_secs,
48 should_stop,
49 executor,
50 DEFAULT_CHECK_INTERVAL,
51 DEFAULT_KILL_CONFIG,
52 )
53}
54
55pub fn monitor_idle_timeout_with_interval(
57 activity_timestamp: SharedActivityTimestamp,
58 child: Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
59 timeout_secs: u64,
60 should_stop: Arc<std::sync::atomic::AtomicBool>,
61 executor: Arc<dyn ProcessExecutor>,
62 check_interval: Duration,
63) -> MonitorResult {
64 monitor_idle_timeout_with_interval_and_kill_config(
65 activity_timestamp,
66 child,
67 timeout_secs,
68 should_stop,
69 executor,
70 check_interval,
71 DEFAULT_KILL_CONFIG,
72 )
73}
74
75pub fn monitor_idle_timeout_with_interval_and_kill_config(
76 activity_timestamp: SharedActivityTimestamp,
77 child: Arc<std::sync::Mutex<Box<dyn AgentChild>>>,
78 timeout_secs: u64,
79 should_stop: Arc<std::sync::atomic::AtomicBool>,
80 executor: Arc<dyn ProcessExecutor>,
81 check_interval: Duration,
82 kill_config: KillConfig,
83) -> MonitorResult {
84 use std::sync::atomic::Ordering;
85
86 #[derive(Debug, Clone, Copy)]
87 struct TimeoutEnforcementState {
88 pid: u32,
89 escalated: bool,
90 last_sigkill_sent_at: Option<std::time::Instant>,
91 triggered_at: std::time::Instant,
92 }
93
94 let mut timeout_triggered: Option<TimeoutEnforcementState> = None;
95
96 loop {
97 if timeout_triggered.is_none() && should_stop.load(Ordering::Acquire) {
100 return MonitorResult::ProcessCompleted;
101 }
102
103 std::thread::sleep(check_interval);
104
105 if let Some(mut state) = timeout_triggered.take() {
106 let status = {
107 let mut locked_child = child
108 .lock()
109 .expect("child process mutex poisoned - indicates panic in another thread");
110 locked_child.try_wait()
111 };
112
113 match status {
114 Ok(Some(_)) => {
115 return MonitorResult::TimedOut {
116 escalated: state.escalated,
117 }
118 }
119 Ok(None) | Err(_) => {
120 let now = std::time::Instant::now();
121
122 if !state.escalated {
125 let _ = force_kill_best_effort(state.pid, executor.as_ref());
126 state.escalated = true;
127 state.last_sigkill_sent_at = Some(now);
128 } else {
129 let should_resend = match state.last_sigkill_sent_at {
130 None => true,
131 Some(t) => {
132 now.duration_since(t) >= kill_config.sigkill_resend_interval()
133 }
134 };
135 if should_resend {
136 let _ = force_kill_best_effort(state.pid, executor.as_ref());
137 state.last_sigkill_sent_at = Some(now);
138 }
139 }
140
141 if now.duration_since(state.triggered_at) >= kill_config.post_sigkill_hard_cap()
145 && state.escalated
146 {
147 let child_for_reaper = Arc::clone(&child);
148 let executor_for_reaper = Arc::clone(&executor);
149 let should_stop_for_reaper = Arc::clone(&should_stop);
150 let config_for_reaper = kill_config;
151 let pid = state.pid;
152 std::thread::spawn(move || {
153 let deadline = std::time::Instant::now()
157 + config_for_reaper.post_sigkill_hard_cap();
158 let mut last_kill_sent_at: Option<std::time::Instant> = None;
159
160 while std::time::Instant::now() < deadline {
161 if should_stop_for_reaper.load(Ordering::Acquire) {
162 return;
163 }
164
165 let status = {
166 let mut locked_child = child_for_reaper.lock()
167 .expect("child process mutex poisoned - indicates panic in another thread");
168 locked_child.try_wait()
169 };
170
171 match status {
172 Ok(Some(_)) => return,
173 Ok(None) | Err(_) => {
174 let now = std::time::Instant::now();
175 let should_resend = match last_kill_sent_at {
176 None => true,
177 Some(t) => {
178 now.duration_since(t)
179 >= config_for_reaper.sigkill_resend_interval()
180 }
181 };
182 if should_resend {
183 let _ = force_kill_best_effort(
184 pid,
185 executor_for_reaper.as_ref(),
186 );
187 last_kill_sent_at = Some(now);
188 }
189 std::thread::sleep(config_for_reaper.poll_interval());
190 }
191 }
192 }
193 });
194
195 return MonitorResult::TimedOut {
196 escalated: state.escalated,
197 };
198 }
199
200 timeout_triggered = Some(state);
201 continue;
202 }
203 }
204 }
205
206 if !is_idle_timeout_exceeded(&activity_timestamp, timeout_secs) {
207 continue;
208 }
209
210 let child_id = {
211 let mut locked_child = child
212 .lock()
213 .expect("child process mutex poisoned - indicates panic in another thread");
214 if let Ok(Some(_)) = locked_child.try_wait() {
215 return MonitorResult::ProcessCompleted;
216 }
217 locked_child.id()
218 };
219
220 let kill_result = kill_process(child_id, executor.as_ref(), Some(&child), kill_config);
221 match kill_result {
222 KillResult::TerminatedByTerm => return MonitorResult::TimedOut { escalated: false },
223 KillResult::TerminatedByKill => return MonitorResult::TimedOut { escalated: true },
224 KillResult::SignalsSentAwaitingExit { escalated } => {
225 timeout_triggered = Some(TimeoutEnforcementState {
226 pid: child_id,
227 escalated,
228 triggered_at: std::time::Instant::now(),
229 last_sigkill_sent_at: escalated.then_some(std::time::Instant::now()),
230 });
231 }
232 KillResult::Failed => {
233 if should_stop.load(Ordering::Acquire) {
234 return MonitorResult::ProcessCompleted;
235 }
236 }
237 }
238 }
239}