ralph_workflow/pipeline/
idle_timeout.rs1use std::io::{self, Read};
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, SystemTime, UNIX_EPOCH};
25
26use crate::executor::ProcessExecutor;
27
/// Idle timeout in seconds: 300 s, i.e. five minutes.
pub const IDLE_TIMEOUT_SECS: u64 = 5 * 60;

/// Reference-counted atomic cell holding the time of the most recent activity.
///
/// The functions in this module store the value as milliseconds since the
/// Unix epoch, read from the system wall clock.
pub type SharedActivityTimestamp = Arc<AtomicU64>;
40
41pub fn new_activity_timestamp() -> SharedActivityTimestamp {
43 let now_ms = SystemTime::now()
44 .duration_since(UNIX_EPOCH)
45 .unwrap_or(Duration::ZERO)
46 .as_millis() as u64;
47 Arc::new(AtomicU64::new(now_ms))
48}
49
50pub fn touch_activity(timestamp: &SharedActivityTimestamp) {
52 let now_ms = SystemTime::now()
53 .duration_since(UNIX_EPOCH)
54 .unwrap_or(Duration::ZERO)
55 .as_millis() as u64;
56 timestamp.store(now_ms, Ordering::Release);
57}
58
59pub fn time_since_activity(timestamp: &SharedActivityTimestamp) -> Duration {
61 let last_ms = timestamp.load(Ordering::Acquire);
62 let now_ms = SystemTime::now()
63 .duration_since(UNIX_EPOCH)
64 .unwrap_or(Duration::ZERO)
65 .as_millis() as u64;
66
67 Duration::from_millis(now_ms.saturating_sub(last_ms))
68}
69
70pub fn is_idle_timeout_exceeded(timestamp: &SharedActivityTimestamp, timeout_secs: u64) -> bool {
72 time_since_activity(timestamp) > Duration::from_secs(timeout_secs)
73}
74
75pub struct ActivityTrackingReader<R: Read> {
81 inner: R,
82 activity_timestamp: SharedActivityTimestamp,
83}
84
85impl<R: Read> ActivityTrackingReader<R> {
86 pub fn new(inner: R, activity_timestamp: SharedActivityTimestamp) -> Self {
91 touch_activity(&activity_timestamp);
93 Self {
94 inner,
95 activity_timestamp,
96 }
97 }
98}
99
100impl<R: Read> Read for ActivityTrackingReader<R> {
101 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
102 let n = self.inner.read(buf)?;
103 if n > 0 {
104 touch_activity(&self.activity_timestamp);
105 }
106 Ok(n)
107 }
108}
109
/// Outcome reported by [`monitor_idle_timeout`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MonitorResult {
    /// The stop flag was observed, so monitoring ended without a timeout.
    ProcessCompleted,
    /// The idle timeout was exceeded and the kill command reported success.
    TimedOut,
}
118
119pub fn monitor_idle_timeout(
141 activity_timestamp: SharedActivityTimestamp,
142 child_id: u32,
143 timeout_secs: u64,
144 should_stop: Arc<std::sync::atomic::AtomicBool>,
145 executor: Arc<dyn ProcessExecutor>,
146) -> MonitorResult {
147 use std::sync::atomic::Ordering;
148
149 const CHECK_INTERVAL: Duration = Duration::from_secs(1);
151
152 loop {
153 std::thread::sleep(CHECK_INTERVAL);
154
155 if should_stop.load(Ordering::Acquire) {
157 return MonitorResult::ProcessCompleted;
158 }
159
160 if is_idle_timeout_exceeded(&activity_timestamp, timeout_secs) {
162 let killed = kill_process(child_id, executor.as_ref());
164 if killed {
165 return MonitorResult::TimedOut;
166 }
167 if should_stop.load(Ordering::Acquire) {
169 return MonitorResult::ProcessCompleted;
170 }
171 }
173 }
174}
175
176#[cfg(unix)]
180fn kill_process(pid: u32, executor: &dyn ProcessExecutor) -> bool {
181 executor
183 .execute("kill", &["-TERM", &pid.to_string()], &[], None)
184 .map(|o| o.status.success())
185 .unwrap_or(false)
186}
187
/// Windows: asks `executor` to run `taskkill /F /PID <pid>`.
///
/// Returns `true` only when the command could be executed and its exit status
/// reports success; any execution error yields `false`. The trailing `&[]` and
/// `None` arguments are passed as-is; their meaning is defined by
/// `ProcessExecutor::execute`.
#[cfg(windows)]
fn kill_process(pid: u32, executor: &dyn ProcessExecutor) -> bool {
    let pid_arg = pid.to_string();
    executor
        .execute("taskkill", &["/F", "/PID", &pid_arg], &[], None)
        .map_or(false, |output| output.status.success())
}
199
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use std::thread;

    /// Wall-clock time `ms_ago` milliseconds in the past, expressed as
    /// milliseconds since the Unix epoch (the unit the shared timestamp stores).
    fn epoch_millis_ago(ms_ago: u64) -> u64 {
        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch")
            .as_millis() as u64;
        now_ms - ms_ago
    }

    #[test]
    fn test_new_activity_timestamp_is_recent() {
        let timestamp = new_activity_timestamp();
        let elapsed = time_since_activity(&timestamp);
        assert!(elapsed < Duration::from_millis(100));
    }

    #[test]
    fn test_touch_activity_updates_timestamp() {
        let timestamp = new_activity_timestamp();
        thread::sleep(Duration::from_millis(50));
        let before_touch = time_since_activity(&timestamp);

        touch_activity(&timestamp);
        let after_touch = time_since_activity(&timestamp);

        assert!(before_touch >= Duration::from_millis(50));
        assert!(after_touch < Duration::from_millis(10));
    }

    #[test]
    fn test_is_idle_timeout_exceeded_false_when_recent() {
        let timestamp = new_activity_timestamp();
        assert!(!is_idle_timeout_exceeded(&timestamp, 1));
    }

    #[test]
    fn test_is_idle_timeout_exceeded_true_after_timeout() {
        let timestamp = new_activity_timestamp();
        // Backdate the last activity by two seconds instead of sleeping.
        timestamp.store(epoch_millis_ago(2000), Ordering::Release);

        assert!(is_idle_timeout_exceeded(&timestamp, 1));
    }

    #[test]
    fn test_activity_tracking_reader_updates_on_read() {
        let data = b"hello world";
        let cursor = Cursor::new(data.to_vec());
        let timestamp = new_activity_timestamp();

        let mut reader = ActivityTrackingReader::new(cursor, timestamp.clone());

        // Backdate after construction, because `new` itself touches the timestamp.
        timestamp.store(epoch_millis_ago(1000), Ordering::Release);

        assert!(time_since_activity(&timestamp) >= Duration::from_millis(900));

        let mut buf = [0u8; 5];
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);

        assert!(time_since_activity(&timestamp) < Duration::from_millis(100));
    }

    #[test]
    fn test_activity_tracking_reader_no_update_on_zero_read() {
        let data = b"";
        let cursor = Cursor::new(data.to_vec());
        let timestamp = new_activity_timestamp();

        let mut reader = ActivityTrackingReader::new(cursor, timestamp.clone());

        // Backdate after construction, because `new` itself touches the
        // timestamp and would overwrite any earlier store.
        timestamp.store(epoch_millis_ago(1000), Ordering::Release);

        let mut buf = [0u8; 5];
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 0);

        // A zero-length read must leave the backdated value in place.
        assert!(time_since_activity(&timestamp) >= Duration::from_millis(900));
    }

    #[test]
    fn test_activity_tracking_reader_passes_through_data() {
        let data = b"hello world";
        let cursor = Cursor::new(data.to_vec());
        let timestamp = new_activity_timestamp();

        let mut reader = ActivityTrackingReader::new(cursor, timestamp);

        let mut buf = [0u8; 20];
        let n = reader.read(&mut buf).unwrap();

        assert_eq!(n, 11);
        assert_eq!(&buf[..n], b"hello world");
    }

    #[test]
    fn test_idle_timeout_constant_is_five_minutes() {
        assert_eq!(IDLE_TIMEOUT_SECS, 300);
    }

    #[test]
    fn test_monitor_result_variants() {
        assert_ne!(MonitorResult::ProcessCompleted, MonitorResult::TimedOut);
    }

    #[test]
    fn test_monitor_stops_when_signaled() {
        use std::sync::atomic::AtomicBool;

        let timestamp = new_activity_timestamp();
        let should_stop = Arc::new(AtomicBool::new(false));
        let should_stop_clone = should_stop.clone();

        // With a 60 s timeout and a stop signal sent after 50 ms, the monitor
        // returns before the idle timeout can trigger a kill of this pid.
        let fake_pid = 0u32;

        let executor: Arc<dyn crate::executor::ProcessExecutor> =
            Arc::new(crate::executor::MockProcessExecutor::new());

        let handle = thread::spawn(move || {
            monitor_idle_timeout(timestamp, fake_pid, 60, should_stop_clone, executor)
        });

        thread::sleep(Duration::from_millis(50));
        should_stop.store(true, Ordering::Release);

        let result = handle.join().expect("Monitor thread panicked");
        assert_eq!(result, MonitorResult::ProcessCompleted);
    }
}