// binocular/preview/structured_log/watcher.rs
use super::{parse_line, LogEntry, LogFormat};
use crate::infra::channel::Sender;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Pause between successive polls of the watched log file for newly appended data.
///
/// Despite the `_MS` suffix this is a [`Duration`] (250 ms), not a raw millisecond count.
const POLL_MS: Duration = Duration::from_millis(250);

10pub fn spawn_log_watcher(
11 path: String,
12 format: LogFormat,
13 start_offset: u64,
14 stop: Arc<AtomicBool>,
15 tx: impl Sender<(String, Vec<LogEntry>)> + 'static,
16) {
17 std::thread::spawn(move || {
18 let Ok(file) = std::fs::File::open(&path) else {
19 return;
20 };
21 let mut reader = BufReader::new(file);
22 let _ = reader.seek(SeekFrom::Start(start_offset));
23
24 let mut line_buf = String::new();
25
26 loop {
27 if stop.load(Ordering::Relaxed) {
28 break;
29 }
30
31 std::thread::sleep(POLL_MS);
32
33 if stop.load(Ordering::Relaxed) {
34 break;
35 }
36
37 let mut new_entries: Vec<LogEntry> = Vec::new();
38
39 loop {
40 line_buf.clear();
41 match reader.read_line(&mut line_buf) {
42 Ok(0) => break, Ok(_) => {
44 if line_buf.ends_with('\n') {
45 if let Some(entry) = parse_line(line_buf.trim_end(), &format) {
47 new_entries.push(entry);
48 }
49 } else {
50 let back = line_buf.len() as i64;
52 let _ = reader.seek(SeekFrom::Current(-back));
53 break;
54 }
55 }
56 Err(_) => break,
57 }
58 }
59
60 if !new_entries.is_empty() {
61 if tx.send((path.clone(), new_entries)).is_err() {
62 break; }
64 }
65 }
66 });
67}