nuviz_cli/watcher/
tail.rs1use std::fs::File;
2use std::io::{BufRead, BufReader, Seek, SeekFrom};
3use std::path::{Path, PathBuf};
4
5use crate::data::metrics::MetricRecord;
6
/// Incremental reader over a line-oriented metrics file.
///
/// Remembers how far into the file it has already read so that each call to
/// `read_new` only returns records appended since the previous call.
#[derive(Debug)]
pub struct TailReader {
    /// Location of the file being followed.
    path: PathBuf,
    /// Byte offset from the start of the file at which the next read begins.
    position: u64,
}
15
16impl TailReader {
17 pub fn new(path: &Path) -> Self {
18 Self {
19 path: path.to_path_buf(),
20 position: 0,
21 }
22 }
23
24 #[allow(dead_code)]
26 pub fn from_end(path: &Path) -> Self {
27 let position = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
28 Self {
29 path: path.to_path_buf(),
30 position,
31 }
32 }
33
34 pub fn read_new(&mut self) -> Vec<MetricRecord> {
36 let file = match File::open(&self.path) {
37 Ok(f) => f,
38 Err(_) => return Vec::new(),
39 };
40
41 let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
42
43 if file_len < self.position {
45 self.position = 0;
46 }
47
48 if file_len == self.position {
50 return Vec::new();
51 }
52
53 let mut reader = BufReader::new(file);
54 if self.position > 0 && reader.seek(SeekFrom::Start(self.position)).is_err() {
55 return Vec::new();
56 }
57
58 let mut records = Vec::new();
59 let mut line = String::new();
60
61 loop {
62 line.clear();
63 match reader.read_line(&mut line) {
64 Ok(0) => break, Ok(n) => {
66 self.position += n as u64;
67 let trimmed = line.trim();
68 if !trimmed.is_empty() {
69 if let Ok(record) = serde_json::from_str::<MetricRecord>(trimmed) {
70 records.push(record);
71 }
72 }
73 }
74 Err(_) => break,
75 }
76 }
77
78 records
79 }
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Opens an existing file so that writes are appended to its end.
    fn open_for_append(target: &Path) -> File {
        std::fs::OpenOptions::new()
            .append(true)
            .open(target)
            .unwrap()
    }

    /// Records are returned once, and later appends are picked up incrementally.
    #[test]
    fn test_read_new_lines() {
        let tmp = tempfile::tempdir().unwrap();
        let metrics_path = tmp.path().join("metrics.jsonl");

        // Seed the file with a single record and close it before reading.
        let mut out = File::create(&metrics_path).unwrap();
        writeln!(
            out,
            r#"{{"step":0,"timestamp":1.0,"metrics":{{"loss":1.0}}}}"#
        )
        .unwrap();
        drop(out);

        let mut tail = TailReader::new(&metrics_path);
        let first = tail.read_new();
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].step, 0);

        // Nothing was appended, so a second poll yields nothing.
        let second = tail.read_new();
        assert!(second.is_empty());

        // Append two more records.
        let mut out = open_for_append(&metrics_path);
        writeln!(
            out,
            r#"{{"step":1,"timestamp":2.0,"metrics":{{"loss":0.5}}}}"#
        )
        .unwrap();
        writeln!(
            out,
            r#"{{"step":2,"timestamp":3.0,"metrics":{{"loss":0.3}}}}"#
        )
        .unwrap();
        drop(out);

        let third = tail.read_new();
        assert_eq!(third.len(), 2);
        assert_eq!(third[0].step, 1);
        assert_eq!(third[1].step, 2);
    }

    /// `from_end` ignores pre-existing content and only reports later appends.
    #[test]
    fn test_from_end_skips_existing() {
        let tmp = tempfile::tempdir().unwrap();
        let metrics_path = tmp.path().join("metrics.jsonl");

        let mut out = File::create(&metrics_path).unwrap();
        writeln!(
            out,
            r#"{{"step":0,"timestamp":1.0,"metrics":{{"loss":1.0}}}}"#
        )
        .unwrap();
        drop(out);

        let mut tail = TailReader::from_end(&metrics_path);
        let initial = tail.read_new();
        assert!(initial.is_empty());

        // Append one record after the reader was positioned at the end.
        let mut out = open_for_append(&metrics_path);
        writeln!(
            out,
            r#"{{"step":1,"timestamp":2.0,"metrics":{{"loss":0.5}}}}"#
        )
        .unwrap();
        drop(out);

        let appended = tail.read_new();
        assert_eq!(appended.len(), 1);
        assert_eq!(appended[0].step, 1);
    }

    /// A path that cannot be opened produces an empty result rather than a panic.
    #[test]
    fn test_missing_file() {
        let mut tail = TailReader::new(Path::new("/nonexistent/metrics.jsonl"));
        let found = tail.read_new();
        assert!(found.is_empty());
    }

    /// When the file shrinks below the stored offset, reading restarts from the top.
    #[test]
    fn test_truncated_file_resets() {
        let tmp = tempfile::tempdir().unwrap();
        let metrics_path = tmp.path().join("metrics.jsonl");

        let mut out = File::create(&metrics_path).unwrap();
        writeln!(
            out,
            r#"{{"step":0,"timestamp":1.0,"metrics":{{"loss":1.0}}}}"#
        )
        .unwrap();
        writeln!(
            out,
            r#"{{"step":1,"timestamp":2.0,"metrics":{{"loss":0.5}}}}"#
        )
        .unwrap();
        drop(out);

        // Consume both records so the stored offset sits at the old end of file.
        let mut tail = TailReader::new(&metrics_path);
        let _ = tail.read_new();

        // Re-creating the file truncates it to zero length.
        drop(File::create(&metrics_path).unwrap());

        let mut out = open_for_append(&metrics_path);
        writeln!(
            out,
            r#"{{"step":0,"timestamp":3.0,"metrics":{{"loss":0.9}}}}"#
        )
        .unwrap();
        drop(out);

        let after_reset = tail.read_new();
        assert_eq!(after_reset.len(), 1);
        assert_eq!(after_reset[0].step, 0);
    }

    /// Lines that are not valid records are dropped without affecting their neighbours.
    #[test]
    fn test_malformed_lines_skipped() {
        let tmp = tempfile::tempdir().unwrap();
        let metrics_path = tmp.path().join("metrics.jsonl");

        let mut out = File::create(&metrics_path).unwrap();
        writeln!(
            out,
            r#"{{"step":0,"timestamp":1.0,"metrics":{{"loss":1.0}}}}"#
        )
        .unwrap();
        writeln!(out, "this is garbage").unwrap();
        writeln!(
            out,
            r#"{{"step":2,"timestamp":3.0,"metrics":{{"loss":0.3}}}}"#
        )
        .unwrap();
        drop(out);

        let mut tail = TailReader::new(&metrics_path);
        let parsed = tail.read_new();
        assert_eq!(parsed.len(), 2);
    }
}