Skip to main content

ralph_workflow/pipeline/idle_timeout/
readers.rs

1use super::{touch_activity, SharedActivityTimestamp};
2use std::io::{self, Read};
3
4/// A reader wrapper that updates an activity timestamp on every read.
5///
6/// Wraps any `Read` implementation and updates a shared atomic timestamp
7/// whenever data is successfully read. This allows external monitoring of
8/// read activity for idle timeout detection.
9pub struct ActivityTrackingReader<R: Read> {
10    inner: R,
11    activity_timestamp: SharedActivityTimestamp,
12}
13
14impl<R: Read> ActivityTrackingReader<R> {
15    /// Create a new activity-tracking reader.
16    ///
17    /// The provided timestamp will be updated to the current time
18    /// whenever data is successfully read from the inner reader.
19    pub fn new(inner: R, activity_timestamp: SharedActivityTimestamp) -> Self {
20        touch_activity(&activity_timestamp);
21        Self {
22            inner,
23            activity_timestamp,
24        }
25    }
26}
27
28impl<R: Read> Read for ActivityTrackingReader<R> {
29    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
30        let n = self.inner.read(buf)?;
31        if n > 0 {
32            touch_activity(&self.activity_timestamp);
33        }
34        Ok(n)
35    }
36}
37
38/// A reader wrapper for stderr that updates an activity timestamp on every read.
39///
40/// This is similar to `ActivityTrackingReader` but designed specifically for
41/// stderr tracking in a separate thread. It shares the same activity timestamp
42/// as the stdout tracker, ensuring that any output (stdout OR stderr) prevents
43/// idle timeout kills.
44pub struct StderrActivityTracker<R: Read> {
45    inner: R,
46    activity_timestamp: SharedActivityTimestamp,
47}
48
49impl<R: Read> StderrActivityTracker<R> {
50    pub fn new(inner: R, activity_timestamp: SharedActivityTimestamp) -> Self {
51        Self {
52            inner,
53            activity_timestamp,
54        }
55    }
56}
57
58impl<R: Read> Read for StderrActivityTracker<R> {
59    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
60        let n = self.inner.read(buf)?;
61        if n > 0 {
62            touch_activity(&self.activity_timestamp);
63        }
64        Ok(n)
65    }
66}