use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;
use anyhow::{Context, Result};
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::mpsc;
use arcbox_protocol::agent::LogEntry;
/// Capacity, in bytes, of the buffered reader used for the initial read of the log file.
const READ_BUFFER_SIZE: usize = 8192;
/// Filters and formatting switches applied while following a log file.
#[derive(Debug, Clone)]
pub struct LogWatchOptions {
    /// Forward entries whose stream is `stdout`.
    pub stdout: bool,
    /// Forward entries whose stream is `stderr`.
    pub stderr: bool,
    /// Prefix each entry's payload with its timestamp rendered as text.
    pub timestamps: bool,
    /// Number of trailing lines of existing content to emit first; a value
    /// of zero or less emits every existing line.
    pub tail: i64,
    /// Lower time bound in whole seconds (compared against the entry's
    /// nanosecond timestamp divided by 1e9); only applied when positive.
    pub since: i64,
    /// Upper time bound in whole seconds, inclusive; only applied when positive.
    pub until: i64,
}

impl Default for LogWatchOptions {
    /// Both streams enabled, no timestamp prefix, no tail limit and no time bounds.
    fn default() -> Self {
        LogWatchOptions {
            // Stream selection.
            stdout: true,
            stderr: true,
            // Formatting.
            timestamps: false,
            // Zero disables each of the limits below.
            tail: 0,
            since: 0,
            until: 0,
        }
    }
}
pub async fn watch_log_file(
log_path: impl AsRef<Path>,
options: LogWatchOptions,
cancel: mpsc::Receiver<()>,
) -> Result<mpsc::Receiver<LogEntry>> {
let log_path = log_path.as_ref().to_path_buf();
let (tx, rx) = mpsc::channel::<LogEntry>(64);
tokio::spawn(async move {
if let Err(e) = run_watcher(log_path, options, tx, cancel).await {
tracing::error!("Log watcher error: {}", e);
}
});
Ok(rx)
}
async fn run_watcher(
log_path: std::path::PathBuf,
options: LogWatchOptions,
tx: mpsc::Sender<LogEntry>,
mut cancel: mpsc::Receiver<()>,
) -> Result<()> {
let mut file = loop {
match std::fs::File::open(&log_path) {
Ok(f) => break f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tokio::select! {
_ = cancel.recv() => {
tracing::debug!("Log watcher cancelled while waiting for file");
return Ok(());
}
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
continue;
}
}
}
Err(e) => return Err(e).context("failed to open log file"),
}
};
let initial_lines = read_with_tail(&mut file, options.tail)?;
for line in initial_lines {
let entry = parse_log_line(&line, &options);
if should_include_entry(&entry, &options) {
if tx.send(entry).await.is_err() {
return Ok(());
}
}
}
let mut pos = file.seek(SeekFrom::End(0))?;
let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<notify::Event>>(16);
let mut watcher = RecommendedWatcher::new(
move |res| {
let _ = notify_tx.blocking_send(res);
},
Config::default(),
)
.context("failed to create file watcher")?;
watcher
.watch(&log_path, RecursiveMode::NonRecursive)
.context("failed to watch log file")?;
tracing::debug!("Started watching log file: {:?}", log_path);
loop {
tokio::select! {
_ = cancel.recv() => {
tracing::debug!("Log watcher cancelled");
break;
}
Some(event) = notify_rx.recv() => {
match event {
Ok(event) => {
if event.kind.is_modify() || event.kind.is_create() {
if let Ok(new_lines) = read_new_content(&mut file, &mut pos) {
for line in new_lines {
let entry = parse_log_line(&line, &options);
if should_include_entry(&entry, &options) {
if tx.send(entry).await.is_err() {
return Ok(());
}
}
}
}
}
}
Err(e) => {
tracing::warn!("File watcher error: {}", e);
}
}
}
}
}
Ok(())
}
/// Reads every line from the current position of `file` to its end and
/// returns the last `tail` of them.
///
/// * `tail <= 0` returns all lines.
/// * Line terminators (`\n` or `\r\n`) are stripped; empty lines are kept.
/// * A final line without a terminator is returned as well.
/// * Bytes that are not valid UTF-8 are replaced with U+FFFD instead of
///   failing the whole read, since log output is arbitrary bytes.
///
/// Only the lines that will be returned are kept in memory when `tail` is
/// positive.
///
/// # Errors
/// Propagates I/O errors from reading the file.
fn read_with_tail(file: &mut std::fs::File, tail: i64) -> Result<Vec<String>> {
    // `None` means "no limit"; comparing in u64 avoids a narrowing cast.
    let limit = if tail > 0 { Some(tail as u64) } else { None };

    let mut reader = BufReader::with_capacity(READ_BUFFER_SIZE, &*file);
    let mut kept: std::collections::VecDeque<String> = std::collections::VecDeque::new();
    let mut raw: Vec<u8> = Vec::new();

    loop {
        raw.clear();
        if reader.read_until(b'\n', &mut raw)? == 0 {
            break;
        }
        // Strip exactly one line terminator: "\n" or "\r\n".
        if raw.last() == Some(&b'\n') {
            raw.pop();
            if raw.last() == Some(&b'\r') {
                raw.pop();
            }
        }
        if let Some(max) = limit {
            // Drop the oldest line so at most `tail` lines are retained.
            if kept.len() as u64 >= max {
                kept.pop_front();
            }
        }
        kept.push_back(String::from_utf8_lossy(&raw).into_owned());
    }

    Ok(Vec::from(kept))
}
/// Returns the complete, non-empty lines appended to `file` since byte offset
/// `*pos`, and advances `*pos` past them.
///
/// * Only lines terminated by `\n` are consumed. A trailing fragment that is
///   still being written stays in the file and is returned, whole, once its
///   newline arrives, so a line is never split into two entries.
/// * If the file is now shorter than `*pos` it was truncated; reading restarts
///   at offset 0.
/// * Trailing `\n` / `\r` characters are stripped and lines that are empty
///   afterwards are skipped.
/// * Bytes that are not valid UTF-8 are replaced with U+FFFD rather than
///   aborting the read.
///
/// # Errors
/// Propagates I/O errors; `*pos` is left unchanged in that case, so the same
/// content is retried on the next call.
fn read_new_content(file: &mut std::fs::File, pos: &mut u64) -> Result<Vec<String>> {
    // A file shorter than our offset has been truncated underneath us.
    if file.metadata()?.len() < *pos {
        *pos = 0;
    }
    file.seek(SeekFrom::Start(*pos))?;

    let mut reader = BufReader::new(&*file);
    let mut lines = Vec::new();
    let mut raw: Vec<u8> = Vec::new();
    // Track consumed bytes ourselves: the file's own cursor runs ahead of what
    // has been processed because of the reader's internal buffering.
    let mut consumed = *pos;

    loop {
        raw.clear();
        let read = reader.read_until(b'\n', &mut raw)?;
        // Stop at end of file, or at a fragment whose newline is not there yet.
        if read == 0 || raw.last() != Some(&b'\n') {
            break;
        }
        consumed += read as u64;

        let text = String::from_utf8_lossy(&raw);
        let trimmed = text.trim_end_matches(&['\n', '\r'][..]);
        if !trimmed.is_empty() {
            lines.push(trimmed.to_string());
        }
    }

    *pos = consumed;
    Ok(lines)
}
/// Turns one raw log line into a `LogEntry`.
///
/// The line is split by `parse_structured_line`. A missing timestamp falls
/// back to the current UTC time in nanoseconds (or 0 if that is not
/// representable), and a missing stream tag falls back to `stdout`. When
/// `options.timestamps` is set, the payload is prefixed with the entry's
/// timestamp rendered with nine fractional digits and a `Z` suffix.
fn parse_log_line(line: &str, options: &LogWatchOptions) -> LogEntry {
    let fallback_nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
    let (parsed_nanos, parsed_stream, message) = parse_structured_line(line);

    let timestamp = match parsed_nanos {
        Some(nanos) => nanos,
        None => fallback_nanos,
    };
    let stream = match parsed_stream {
        Some(name) => name,
        None => "stdout".to_string(),
    };

    let payload = if !options.timestamps {
        message
    } else {
        let rendered = chrono::DateTime::from_timestamp_nanos(timestamp)
            .format("%Y-%m-%dT%H:%M:%S%.9fZ");
        format!("{} {}", rendered, message)
    };

    LogEntry {
        stream,
        data: payload.into_bytes(),
        timestamp,
    }
}
/// Splits a raw line into `(timestamp, stream, message)`.
///
/// Recognised shapes, in order:
/// * `<rfc3339> stdout|stderr: <message>`
/// * `<rfc3339> <message>`
/// * `stdout|stderr: <message>`
/// * anything else, returned unchanged as the message.
///
/// The timestamp is in nanoseconds as produced by `timestamp_nanos_opt`, so it
/// can be `None` even when an RFC 3339 prefix was parsed.
fn parse_structured_line(line: &str) -> (Option<i64>, Option<String>, String) {
    /// Splits a leading `stdout: ` / `stderr: ` tag off `text`, if present.
    fn split_stream_tag(text: &str) -> Option<(&str, &str)> {
        text.split_once(": ")
            .filter(|(tag, _)| matches!(*tag, "stdout" | "stderr"))
    }

    // Peel off a leading RFC 3339 timestamp when the first word parses as one.
    let stamped = line.split_once(' ').and_then(|(head, tail)| {
        chrono::DateTime::parse_from_rfc3339(head)
            .ok()
            .map(|dt| (dt.timestamp_nanos_opt(), tail))
    });
    let (nanos, body) = match stamped {
        Some((nanos, tail)) => (nanos, tail),
        None => (None, line),
    };

    match split_stream_tag(body) {
        Some((tag, message)) => (nanos, Some(tag.to_string()), message.to_string()),
        None => (nanos, None, body.to_string()),
    }
}
/// Decides whether `entry` passes the stream and time filters in `options`.
///
/// Entries on `stdout` / `stderr` need the matching flag; any other stream
/// name always passes the stream check. The entry's nanosecond timestamp is
/// reduced to whole seconds and must lie within `since..=until`, where each
/// bound is only applied when it is positive.
fn should_include_entry(entry: &LogEntry, options: &LogWatchOptions) -> bool {
    let stream_enabled = match entry.stream.as_str() {
        "stdout" => options.stdout,
        "stderr" => options.stderr,
        _ => true,
    };

    let secs = entry.timestamp / 1_000_000_000;
    let not_too_old = options.since <= 0 || secs >= options.since;
    let not_too_new = options.until <= 0 || secs <= options.until;

    stream_enabled && not_too_old && not_too_new
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an entry with an empty payload for the filter tests.
    fn empty_entry(stream: &str, timestamp: i64) -> LogEntry {
        LogEntry {
            stream: stream.to_string(),
            data: vec![],
            timestamp,
        }
    }

    /// Parses `line` with default options.
    fn parse_default(line: &str) -> LogEntry {
        parse_log_line(line, &LogWatchOptions::default())
    }

    // A line without any prefix is attributed to stdout and kept verbatim.
    #[test]
    fn test_parse_plain_line() {
        let parsed = parse_default("hello world");
        assert_eq!(parsed.stream, "stdout");
        assert_eq!(String::from_utf8_lossy(&parsed.data), "hello world");
    }

    // A `stdout: ` / `stderr: ` prefix selects the stream and is removed.
    #[test]
    fn test_parse_line_with_stream_prefix() {
        let err_line = parse_default("stderr: error message");
        assert_eq!(err_line.stream, "stderr");
        assert_eq!(String::from_utf8_lossy(&err_line.data), "error message");

        let out_line = parse_default("stdout: info message");
        assert_eq!(out_line.stream, "stdout");
        assert_eq!(String::from_utf8_lossy(&out_line.data), "info message");
    }

    // A leading RFC 3339 timestamp is consumed and stored on the entry.
    #[test]
    fn test_parse_line_with_timestamp() {
        let parsed = parse_default("2024-01-15T10:30:00Z stdout: test message");
        assert_eq!(parsed.stream, "stdout");
        assert_eq!(String::from_utf8_lossy(&parsed.data), "test message");
        assert!(parsed.timestamp > 0);
    }

    // Disabling stderr filters out stderr entries only.
    #[test]
    fn test_should_include_entry_stdout_only() {
        let stdout_only = LogWatchOptions {
            stdout: true,
            stderr: false,
            ..Default::default()
        };

        assert!(should_include_entry(&empty_entry("stdout", 0), &stdout_only));
        assert!(!should_include_entry(&empty_entry("stderr", 0), &stdout_only));
    }

    // Bounds are in seconds; entry timestamps are in nanoseconds.
    #[test]
    fn test_should_include_entry_time_filter() {
        let window = LogWatchOptions {
            since: 1000,
            until: 2000,
            ..Default::default()
        };

        let too_early = empty_entry("stdout", 500_000_000_000);
        let inside = empty_entry("stdout", 1_500_000_000_000);
        let too_late = empty_entry("stdout", 2_500_000_000_000);

        assert!(!should_include_entry(&too_early, &window));
        assert!(should_include_entry(&inside, &window));
        assert!(!should_include_entry(&too_late, &window));
    }

    // Covers the plain, stream-tagged and timestamped shapes.
    #[test]
    fn test_parse_structured_line() {
        let (nanos, tag, text) = parse_structured_line("hello world");
        assert!(nanos.is_none());
        assert!(tag.is_none());
        assert_eq!(text, "hello world");

        let (nanos, tag, text) = parse_structured_line("stderr: error");
        assert!(nanos.is_none());
        assert_eq!(tag, Some("stderr".to_string()));
        assert_eq!(text, "error");

        let (nanos, tag, text) =
            parse_structured_line("2024-01-15T10:30:00+00:00 stdout: message");
        assert!(nanos.is_some());
        assert_eq!(tag, Some("stdout".to_string()));
        assert_eq!(text, "message");
    }
}