// binocular/preview/structured_log/watcher.rs

use super::{parse_line, LogEntry, LogFormat};
use crate::infra::channel::Sender;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
7
/// How long the watcher thread sleeps between successive checks of the file for appended data.
///
/// Despite the `_MS` suffix this is a full [`Duration`], not a bare millisecond count.
const POLL_MS: Duration = Duration::from_millis(250);
9
10pub fn spawn_log_watcher(
11    path: String,
12    format: LogFormat,
13    start_offset: u64,
14    stop: Arc<AtomicBool>,
15    tx: impl Sender<(String, Vec<LogEntry>)> + 'static,
16) {
17    std::thread::spawn(move || {
18        let Ok(file) = std::fs::File::open(&path) else {
19            return;
20        };
21        let mut reader = BufReader::new(file);
22        let _ = reader.seek(SeekFrom::Start(start_offset));
23
24        let mut line_buf = String::new();
25
26        loop {
27            if stop.load(Ordering::Relaxed) {
28                break;
29            }
30
31            std::thread::sleep(POLL_MS);
32
33            if stop.load(Ordering::Relaxed) {
34                break;
35            }
36
37            let mut new_entries: Vec<LogEntry> = Vec::new();
38
39            loop {
40                line_buf.clear();
41                match reader.read_line(&mut line_buf) {
42                    Ok(0) => break, // EOF. no new data yet
43                    Ok(_) => {
44                        if line_buf.ends_with('\n') {
45                            // Complete line
46                            if let Some(entry) = parse_line(line_buf.trim_end(), &format) {
47                                new_entries.push(entry);
48                            }
49                        } else {
50                            // Incomplete line, seek back and wait for more data.
51                            let back = line_buf.len() as i64;
52                            let _ = reader.seek(SeekFrom::Current(-back));
53                            break;
54                        }
55                    }
56                    Err(_) => break,
57                }
58            }
59
60            if !new_entries.is_empty() {
61                if tx.send((path.clone(), new_entries)).is_err() {
62                    break; // Receiver gone, app is closing.
63                }
64            }
65        }
66    });
67}