roder_tasks/
log_buffer.rs1use std::collections::VecDeque;
2
3use roder_api::tasks::TaskOutputStream;
4use time::OffsetDateTime;
5
6#[derive(Debug, Clone, PartialEq, Eq)]
7pub struct TaskLogEntry {
8 pub stream: TaskOutputStream,
9 pub chunk: String,
10 pub timestamp: OffsetDateTime,
11}
12
13#[derive(Debug, Clone)]
14pub struct BoundedLogBuffer {
15 max_bytes: usize,
16 entries: VecDeque<TaskLogEntry>,
17 current_bytes: usize,
18 dropped_bytes: u64,
19}
20
21impl BoundedLogBuffer {
22 pub fn new(max_bytes: usize) -> Self {
23 Self {
24 max_bytes,
25 entries: VecDeque::new(),
26 current_bytes: 0,
27 dropped_bytes: 0,
28 }
29 }
30
31 pub fn push(&mut self, stream: TaskOutputStream, mut chunk: String) -> u64 {
32 let mut newly_dropped = 0_u64;
33 if self.max_bytes == 0 {
34 newly_dropped = chunk.len() as u64;
35 self.dropped_bytes += newly_dropped;
36 return newly_dropped;
37 }
38
39 let chunk_len = chunk.len();
40 if chunk_len > self.max_bytes {
41 let drop_len = chunk_len - self.max_bytes;
42 chunk = chunk[drop_len..].to_string();
43 newly_dropped += drop_len as u64;
44 }
45
46 self.current_bytes += chunk.len();
47 self.entries.push_back(TaskLogEntry {
48 stream,
49 chunk,
50 timestamp: OffsetDateTime::now_utc(),
51 });
52
53 while self.current_bytes > self.max_bytes {
54 let Some(front) = self.entries.pop_front() else {
55 break;
56 };
57 let front_len = front.chunk.len();
58 self.current_bytes = self.current_bytes.saturating_sub(front_len);
59 newly_dropped += front_len as u64;
60 }
61
62 self.dropped_bytes += newly_dropped;
63 newly_dropped
64 }
65
66 pub fn entries(&self) -> Vec<TaskLogEntry> {
67 self.entries.iter().cloned().collect()
68 }
69
70 pub fn dropped_bytes(&self) -> u64 {
71 self.dropped_bytes
72 }
73
74 pub fn current_bytes(&self) -> usize {
75 self.current_bytes
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::*;

    /// Exceeding the byte budget evicts whole entries, oldest first.
    #[test]
    fn bounded_log_buffer_drops_old_entries() {
        let mut buffer = BoundedLogBuffer::new(8);
        assert_eq!(buffer.push(TaskOutputStream::Stdout, "abc".to_string()), 0);
        assert_eq!(buffer.push(TaskOutputStream::Stdout, "def".to_string()), 0);
        assert_eq!(buffer.push(TaskOutputStream::Stdout, "ghi".to_string()), 3);

        let entries = buffer.entries();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].chunk, "def");
        assert_eq!(entries[1].chunk, "ghi");
        assert_eq!(buffer.dropped_bytes(), 3);
        assert_eq!(buffer.current_bytes(), 6);
    }

    /// A single chunk larger than the budget keeps only its tail.
    #[test]
    fn bounded_log_buffer_trims_oversized_chunks() {
        let mut buffer = BoundedLogBuffer::new(4);
        assert_eq!(
            buffer.push(TaskOutputStream::Stderr, "abcdef".to_string()),
            2
        );

        let entries = buffer.entries();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].chunk, "cdef");
        assert_eq!(buffer.dropped_bytes(), 2);
        assert_eq!(buffer.current_bytes(), 4);
    }

    /// A zero-byte budget stores nothing and reports every byte as dropped.
    #[test]
    fn bounded_log_buffer_with_zero_capacity_retains_nothing() {
        let mut buffer = BoundedLogBuffer::new(0);
        assert_eq!(buffer.push(TaskOutputStream::Stdout, "abc".to_string()), 3);

        assert!(buffer.entries().is_empty());
        assert_eq!(buffer.dropped_bytes(), 3);
        assert_eq!(buffer.current_bytes(), 0);
    }
}
112}