Skip to main content

ralph_workflow/pipeline/idle_timeout/
readers.rs

1use super::{touch_activity, SharedActivityTimestamp};
2
3/// A reader wrapper that updates an activity timestamp on every read.
4///
5/// Wraps any `Read` implementation and updates a shared atomic timestamp
6/// whenever data is successfully read. This allows external monitoring of
7/// read activity for idle timeout detection.
8pub struct ActivityTrackingReader<R: std::io::Read> {
9    inner: R,
10    activity_timestamp: SharedActivityTimestamp,
11}
12
13impl<R: std::io::Read> ActivityTrackingReader<R> {
14    /// Create a new activity-tracking reader.
15    ///
16    /// The provided timestamp will be updated to the current time
17    /// whenever data is successfully read from the inner reader.
18    pub fn new(inner: R, activity_timestamp: SharedActivityTimestamp) -> Self {
19        touch_activity(&activity_timestamp);
20        Self {
21            inner,
22            activity_timestamp,
23        }
24    }
25}
26
27impl<R: std::io::Read> std::io::Read for ActivityTrackingReader<R> {
28    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
29        let n = std::io::Read::read(&mut self.inner, buf)?;
30        if n > 0 {
31            touch_activity(&self.activity_timestamp);
32        }
33        Ok(n)
34    }
35}
36
37/// A reader wrapper for stderr that updates an activity timestamp on every read.
38///
39/// This is similar to `ActivityTrackingReader` but designed specifically for
40/// stderr tracking in a separate thread. It shares the same activity timestamp
41/// as the stdout tracker, ensuring that any output (stdout OR stderr) prevents
42/// idle timeout kills.
43pub struct StderrActivityTracker<R: std::io::Read> {
44    inner: R,
45    activity_timestamp: SharedActivityTimestamp,
46}
47
48impl<R: std::io::Read> StderrActivityTracker<R> {
49    pub const fn new(inner: R, activity_timestamp: SharedActivityTimestamp) -> Self {
50        Self {
51            inner,
52            activity_timestamp,
53        }
54    }
55}
56
57impl<R: std::io::Read> std::io::Read for StderrActivityTracker<R> {
58    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
59        let n = std::io::Read::read(&mut self.inner, buf)?;
60        if n > 0 {
61            touch_activity(&self.activity_timestamp);
62        }
63        Ok(n)
64    }
65}