Skip to main content

roder_tasks/
log_buffer.rs

1use std::collections::VecDeque;
2
3use roder_api::tasks::TaskOutputStream;
4use time::OffsetDateTime;
5
6#[derive(Debug, Clone, PartialEq, Eq)]
7pub struct TaskLogEntry {
8    pub stream: TaskOutputStream,
9    pub chunk: String,
10    pub timestamp: OffsetDateTime,
11}
12
13#[derive(Debug, Clone)]
14pub struct BoundedLogBuffer {
15    max_bytes: usize,
16    entries: VecDeque<TaskLogEntry>,
17    current_bytes: usize,
18    dropped_bytes: u64,
19}
20
21impl BoundedLogBuffer {
22    pub fn new(max_bytes: usize) -> Self {
23        Self {
24            max_bytes,
25            entries: VecDeque::new(),
26            current_bytes: 0,
27            dropped_bytes: 0,
28        }
29    }
30
31    pub fn push(&mut self, stream: TaskOutputStream, mut chunk: String) -> u64 {
32        let mut newly_dropped = 0_u64;
33        if self.max_bytes == 0 {
34            newly_dropped = chunk.len() as u64;
35            self.dropped_bytes += newly_dropped;
36            return newly_dropped;
37        }
38
39        let chunk_len = chunk.len();
40        if chunk_len > self.max_bytes {
41            let drop_len = chunk_len - self.max_bytes;
42            chunk = chunk[drop_len..].to_string();
43            newly_dropped += drop_len as u64;
44        }
45
46        self.current_bytes += chunk.len();
47        self.entries.push_back(TaskLogEntry {
48            stream,
49            chunk,
50            timestamp: OffsetDateTime::now_utc(),
51        });
52
53        while self.current_bytes > self.max_bytes {
54            let Some(front) = self.entries.pop_front() else {
55                break;
56            };
57            let front_len = front.chunk.len();
58            self.current_bytes = self.current_bytes.saturating_sub(front_len);
59            newly_dropped += front_len as u64;
60        }
61
62        self.dropped_bytes += newly_dropped;
63        newly_dropped
64    }
65
66    pub fn entries(&self) -> Vec<TaskLogEntry> {
67        self.entries.iter().cloned().collect()
68    }
69
70    pub fn dropped_bytes(&self) -> u64 {
71        self.dropped_bytes
72    }
73
74    pub fn current_bytes(&self) -> usize {
75        self.current_bytes
76    }
77}
78
#[cfg(test)]
mod tests {
    use super::*;

    /// Once the byte budget is exceeded, the oldest entries are evicted first
    /// and their size is reported as dropped.
    #[test]
    fn bounded_log_buffer_drops_old_entries() {
        let mut buffer = BoundedLogBuffer::new(8);
        assert_eq!(buffer.push(TaskOutputStream::Stdout, "abc".to_string()), 0);
        assert_eq!(buffer.push(TaskOutputStream::Stdout, "def".to_string()), 0);
        // 9 bytes would exceed the 8-byte budget, so "abc" is evicted.
        assert_eq!(buffer.push(TaskOutputStream::Stdout, "ghi".to_string()), 3);

        let entries = buffer.entries();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].chunk, "def");
        assert_eq!(entries[1].chunk, "ghi");
        assert_eq!(buffer.dropped_bytes(), 3);
        assert_eq!(buffer.current_bytes(), 6);
    }

    /// A single chunk larger than the budget keeps only its tail.
    #[test]
    fn bounded_log_buffer_trims_oversized_chunks() {
        let mut buffer = BoundedLogBuffer::new(4);
        assert_eq!(
            buffer.push(TaskOutputStream::Stderr, "abcdef".to_string()),
            2
        );

        let entries = buffer.entries();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].chunk, "cdef");
        assert_eq!(buffer.dropped_bytes(), 2);
        assert_eq!(buffer.current_bytes(), 4);
    }

    /// A zero-byte budget stores nothing and counts every pushed byte as
    /// dropped.
    #[test]
    fn bounded_log_buffer_with_zero_capacity_drops_everything() {
        let mut buffer = BoundedLogBuffer::new(0);
        assert_eq!(buffer.push(TaskOutputStream::Stdout, "abc".to_string()), 3);
        assert_eq!(buffer.push(TaskOutputStream::Stderr, "de".to_string()), 2);

        assert!(buffer.entries().is_empty());
        assert_eq!(buffer.dropped_bytes(), 5);
        assert_eq!(buffer.current_bytes(), 0);
    }
}
112}