Skip to main content

nuviz_cli/watcher/
tail.rs

1use std::fs::File;
2use std::io::{BufRead, BufReader, Seek, SeekFrom};
3use std::path::{Path, PathBuf};
4
5use crate::data::metrics::MetricRecord;
6
/// Incrementally reads new lines from a JSONL file.
///
/// Tracks the file position so that subsequent calls to `read_new()`
/// return only lines appended since the last read.
#[derive(Debug)]
pub struct TailReader {
    /// Location of the JSONL file being followed.
    path: PathBuf,
    /// Byte offset from the start of the file at which the next read begins.
    position: u64,
}
15
16impl TailReader {
17    pub fn new(path: &Path) -> Self {
18        Self {
19            path: path.to_path_buf(),
20            position: 0,
21        }
22    }
23
24    /// Create a TailReader starting from the end of the current file.
25    #[allow(dead_code)]
26    pub fn from_end(path: &Path) -> Self {
27        let position = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
28        Self {
29            path: path.to_path_buf(),
30            position,
31        }
32    }
33
34    /// Read all new lines since the last read.
35    pub fn read_new(&mut self) -> Vec<MetricRecord> {
36        let file = match File::open(&self.path) {
37            Ok(f) => f,
38            Err(_) => return Vec::new(),
39        };
40
41        let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
42
43        // File was truncated — reset position
44        if file_len < self.position {
45            self.position = 0;
46        }
47
48        // No new data
49        if file_len == self.position {
50            return Vec::new();
51        }
52
53        let mut reader = BufReader::new(file);
54        if self.position > 0 && reader.seek(SeekFrom::Start(self.position)).is_err() {
55            return Vec::new();
56        }
57
58        let mut records = Vec::new();
59        let mut line = String::new();
60
61        loop {
62            line.clear();
63            match reader.read_line(&mut line) {
64                Ok(0) => break, // EOF
65                Ok(n) => {
66                    self.position += n as u64;
67                    let trimmed = line.trim();
68                    if !trimmed.is_empty() {
69                        if let Ok(record) = serde_json::from_str::<MetricRecord>(trimmed) {
70                            records.push(record);
71                        }
72                    }
73                }
74                Err(_) => break,
75            }
76        }
77
78        records
79    }
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Makes a fresh temporary directory and returns it together with the
    /// path of a (not yet created) `metrics.jsonl` inside it. The directory
    /// guard must be kept alive for as long as the path is used.
    fn temp_metrics_file() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("metrics.jsonl");
        (dir, path)
    }

    /// Creates (or truncates) `path` and writes every entry of `lines`,
    /// each followed by a newline.
    fn write_lines(path: &Path, lines: &[&str]) {
        let mut out = File::create(path).unwrap();
        for entry in lines {
            writeln!(out, "{}", entry).unwrap();
        }
    }

    /// Opens `path` in append mode and writes every entry of `lines`,
    /// each followed by a newline.
    fn append_lines(path: &Path, lines: &[&str]) {
        let mut out = std::fs::OpenOptions::new()
            .append(true)
            .open(path)
            .unwrap();
        for entry in lines {
            writeln!(out, "{}", entry).unwrap();
        }
    }

    /// Successive reads return only what was appended since the previous one.
    #[test]
    fn test_read_new_lines() {
        let (_dir, path) = temp_metrics_file();
        write_lines(
            &path,
            &[r#"{"step":0,"timestamp":1.0,"metrics":{"loss":1.0}}"#],
        );

        let mut reader = TailReader::new(&path);
        let initial = reader.read_new();
        assert_eq!(initial.len(), 1);
        assert_eq!(initial[0].step, 0);

        // Nothing was appended, so a second read yields nothing.
        let unchanged = reader.read_new();
        assert!(unchanged.is_empty());

        append_lines(
            &path,
            &[
                r#"{"step":1,"timestamp":2.0,"metrics":{"loss":0.5}}"#,
                r#"{"step":2,"timestamp":3.0,"metrics":{"loss":0.3}}"#,
            ],
        );

        let appended = reader.read_new();
        assert_eq!(appended.len(), 2);
        assert_eq!(appended[0].step, 1);
        assert_eq!(appended[1].step, 2);
    }

    /// A reader built with `from_end` ignores content that already existed.
    #[test]
    fn test_from_end_skips_existing() {
        let (_dir, path) = temp_metrics_file();
        write_lines(
            &path,
            &[r#"{"step":0,"timestamp":1.0,"metrics":{"loss":1.0}}"#],
        );

        let mut reader = TailReader::from_end(&path);
        let existing = reader.read_new();
        assert!(existing.is_empty());

        append_lines(
            &path,
            &[r#"{"step":1,"timestamp":2.0,"metrics":{"loss":0.5}}"#],
        );

        let fresh = reader.read_new();
        assert_eq!(fresh.len(), 1);
        assert_eq!(fresh[0].step, 1);
    }

    /// Reading a path that does not exist yields no records.
    #[test]
    fn test_missing_file() {
        let mut reader = TailReader::new(Path::new("/nonexistent/metrics.jsonl"));
        let records = reader.read_new();
        assert!(records.is_empty());
    }

    /// After the file shrinks, reading restarts from the beginning.
    #[test]
    fn test_truncated_file_resets() {
        let (_dir, path) = temp_metrics_file();
        write_lines(
            &path,
            &[
                r#"{"step":0,"timestamp":1.0,"metrics":{"loss":1.0}}"#,
                r#"{"step":1,"timestamp":2.0,"metrics":{"loss":0.5}}"#,
            ],
        );

        let mut reader = TailReader::new(&path);
        reader.read_new(); // Consume everything written so far.

        // Truncate the file (simulating rotation), then append one shorter
        // batch of content.
        File::create(&path).unwrap();
        append_lines(
            &path,
            &[r#"{"step":0,"timestamp":3.0,"metrics":{"loss":0.9}}"#],
        );

        let after_truncate = reader.read_new();
        assert_eq!(after_truncate.len(), 1);
        assert_eq!(after_truncate[0].step, 0);
    }

    /// Lines that are not valid records are dropped; valid neighbours survive.
    #[test]
    fn test_malformed_lines_skipped() {
        let (_dir, path) = temp_metrics_file();
        write_lines(
            &path,
            &[
                r#"{"step":0,"timestamp":1.0,"metrics":{"loss":1.0}}"#,
                "this is garbage",
                r#"{"step":2,"timestamp":3.0,"metrics":{"loss":0.3}}"#,
            ],
        );

        let mut reader = TailReader::new(&path);
        let records = reader.read_new();
        assert_eq!(records.len(), 2);
    }
}