use crate::logging_event::LogEventV1;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::PathBuf;
use std::time::{Duration, SystemTime};
/// Selection criteria applied to log events.
///
/// Every field is optional; a `None` field places no restriction on events.
/// The default value (all `None`) therefore accepts every event.
#[derive(Debug, Clone, Default)]
pub struct LogFilter {
/// When set, only events whose `agent` equals this string exactly are accepted.
pub agent: Option<String>,
/// When set, only events whose `level` equals this string (ASCII case-insensitively) are accepted.
pub level: Option<String>,
/// When set, only events whose timestamp lies within this duration before "now" are accepted.
pub since: Option<Duration>,
/// When set, `LogReader::read_filtered` keeps only the last `limit` accepted events.
/// `LogFilter::matches` and `LogReader::follow` do not consult this field.
pub limit: Option<usize>,
}
impl LogFilter {
    /// Reports whether `event` satisfies the agent, level and time-window criteria.
    ///
    /// Criteria that are `None` accept every event. The agent comparison is exact,
    /// the level comparison ignores ASCII case, and the time window is measured
    /// back from the current time. `limit` is not consulted here.
    pub fn matches(&self, event: &LogEventV1) -> bool {
        let agent_ok = match self.agent.as_deref() {
            Some(wanted) => event.agent.as_deref() == Some(wanted),
            None => true,
        };
        // Closures keep the later checks lazy, so they only run when the
        // earlier criteria have already passed.
        let level_ok = || match self.level.as_deref() {
            Some(wanted) => event.level.eq_ignore_ascii_case(wanted),
            None => true,
        };
        let recent_enough = || match self.since {
            Some(window) => is_within_since(&event.ts, window),
            None => true,
        };
        agent_ok && level_ok() && recent_enough()
    }
}
/// Returns `true` when the timestamp `ts` is no older than `since` before now.
///
/// `ts` is parsed as a `DateTime<Utc>`; a timestamp that fails to parse is
/// treated as outside the window (`false`).
///
/// A `since` too large for chrono to represent describes a window reaching
/// further back than any representable cutoff, so every parseable timestamp
/// is considered inside it.
fn is_within_since(ts: &str, since: Duration) -> bool {
    let Ok(event_time) = ts.parse::<DateTime<Utc>>() else {
        return false;
    };
    let Ok(window) = chrono::Duration::from_std(since) else {
        // Falling back to a zero-length window here would put the cutoff at
        // "now" and reject almost everything, the opposite of what an
        // enormous look-back asks for.
        return true;
    };
    // If subtracting the window underflows chrono's range, clamp the cutoff to the Unix epoch.
    let cutoff = Utc::now()
        .checked_sub_signed(window)
        .unwrap_or(DateTime::<Utc>::from(SystemTime::UNIX_EPOCH));
    event_time >= cutoff
}
/// Reads `LogEventV1` records from a log file containing one JSON document per line,
/// yielding only the events accepted by its `LogFilter`.
pub struct LogReader {
// Location of the log file; it is allowed not to exist yet.
path: PathBuf,
// Criteria every returned or delivered event must satisfy.
filter: LogFilter,
}
impl LogReader {
    /// Creates a reader for the log file at `path` that only yields events accepted by `filter`.
    pub fn new(path: PathBuf, filter: LogFilter) -> Self {
        Self { path, filter }
    }

    /// Reads the whole log file and returns the events accepted by the filter, in file order.
    ///
    /// Blank lines and lines that do not deserialize into a `LogEventV1` are skipped.
    /// When the filter has a `limit`, only the last `limit` accepted events are returned.
    /// A file that does not exist yields an empty vector rather than an error.
    ///
    /// # Errors
    ///
    /// Returns an error when the file exists but cannot be opened, or when reading a
    /// line from it fails.
    pub fn read_filtered(&self) -> Result<Vec<LogEventV1>> {
        if !self.path.exists() {
            return Ok(Vec::new());
        }
        let file = File::open(&self.path)
            .with_context(|| format!("Failed to open log file: {}", self.path.display()))?;
        let reader = BufReader::new(file);
        let mut matched: Vec<LogEventV1> = Vec::new();
        for line in reader.lines() {
            let line =
                line.with_context(|| format!("Failed to read log file: {}", self.path.display()))?;
            let line = line.trim();
            if line.is_empty() {
                continue;
            }
            // Malformed lines are skipped on purpose so one bad record does not hide the rest.
            let Ok(event) = serde_json::from_str::<LogEventV1>(line) else {
                continue;
            };
            if self.filter.matches(&event) {
                matched.push(event);
            }
        }
        if let Some(limit) = self.filter.limit {
            if matched.len() > limit {
                // Drop the oldest surplus events in place instead of cloning the tail.
                let excess = matched.len() - limit;
                matched.drain(..excess);
            }
        }
        Ok(matched)
    }

    /// Tails the log file, invoking `callback` for every newly appended event that
    /// passes the filter.
    ///
    /// The method waits (polling every 500 ms) for the file to exist, starts at its
    /// current end, and then polls for appended data. Content already present when
    /// following starts is not delivered. If the file becomes shorter than the current
    /// read position it is re-opened and read from the start. Blank and malformed
    /// lines are skipped. The filter's `limit` is not applied.
    ///
    /// Only newline-terminated lines are consumed. An unterminated trailing line (a
    /// record still being written) is left in place and re-read on the next poll, so
    /// it is delivered once complete instead of being lost.
    ///
    /// `callback` returns `true` to keep following and `false` to stop, in which case
    /// this method returns `Ok(())`. It blocks the calling thread until then.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be opened or re-opened, when seeking
    /// fails, or when reading a line fails.
    pub fn follow<F>(&self, mut callback: F) -> Result<()>
    where
        F: FnMut(&LogEventV1) -> bool,
    {
        const POLL_INTERVAL: Duration = Duration::from_millis(500);

        let mut file = loop {
            if self.path.exists() {
                break File::open(&self.path).with_context(|| {
                    format!("Failed to open log file: {}", self.path.display())
                })?;
            }
            std::thread::sleep(POLL_INTERVAL);
        };
        // Start at the current end so only data appended from now on is reported.
        let mut pos = file
            .seek(SeekFrom::End(0))
            .context("Failed to seek to end of log file")?;
        let mut line = String::new();
        loop {
            std::thread::sleep(POLL_INTERVAL);
            // The file may be briefly missing; just try again next tick.
            let metadata = match std::fs::metadata(&self.path) {
                Ok(m) => m,
                Err(_) => continue,
            };
            if metadata.len() < pos {
                // The file shrank below our offset: re-open it and read from the top.
                file = File::open(&self.path).with_context(|| {
                    format!("Failed to re-open log file: {}", self.path.display())
                })?;
                pos = 0;
            }
            file.seek(SeekFrom::Start(pos))
                .context("Failed to seek log file")?;
            let mut reader = BufReader::new(&file);
            loop {
                line.clear();
                let bytes = reader
                    .read_line(&mut line)
                    .context("Failed to read log file")?;
                if bytes == 0 || !line.ends_with('\n') {
                    // End of data, or a line whose tail has not been written yet.
                    // `pos` is not advanced past a partial line, so the next poll
                    // re-reads it in full.
                    break;
                }
                pos += bytes as u64;
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    continue;
                }
                let Ok(event) = serde_json::from_str::<LogEventV1>(trimmed) else {
                    continue;
                };
                if self.filter.matches(&event) && !callback(&event) {
                    return Ok(());
                }
            }
        }
    }
}
/// ANSI SGR escape sequences used to colorize the level column of human-readable output.
mod ansi {
/// Red foreground (SGR 31).
pub const RED: &str = "\x1b[31m";
/// Yellow foreground (SGR 33).
pub const YELLOW: &str = "\x1b[33m";
/// Cyan foreground (SGR 36).
pub const CYAN: &str = "\x1b[36m";
/// Dim / faint intensity (SGR 2).
pub const DIM: &str = "\x1b[2m";
/// Resets all attributes (SGR 0).
pub const RESET: &str = "\x1b[0m";
}
/// Returns `true` when the process's standard output is attached to a terminal.
fn stdout_is_tty() -> bool {
    let stdout = std::io::stdout();
    std::io::IsTerminal::is_terminal(&stdout)
}
/// Renders `event` as a single human-readable line.
///
/// The level is upper-cased and left-padded to five columns; when stdout is a
/// terminal it is wrapped in an ANSI color chosen by level.
///
/// * `send` events: `{ts} {LEVEL} [{source_binary}{/agent}] send …`, where the
///   tail comes from `format_send_action`.
/// * every other event:
///   `{ts} {LEVEL} [{source_binary}{/agent} pid={pid}{/ppid=N}] {action}{suffix}{ -> target}`,
///   where `suffix` is `: {error}` if an error is present, otherwise ` ({outcome})`
///   if an outcome is present, and the target part is omitted for an empty target.
pub fn format_event_human(event: &LogEventV1) -> String {
    let level_label = format!("{:<5}", event.level.to_uppercase());
    let level_label = if stdout_is_tty() {
        let color = match event.level.to_lowercase().as_str() {
            "error" => ansi::RED,
            "warn" => ansi::YELLOW,
            "debug" => ansi::CYAN,
            "trace" => ansi::DIM,
            _ => ansi::RESET,
        };
        format!("{color}{level_label}{}", ansi::RESET)
    } else {
        level_label
    };
    let agent_suffix = event
        .agent
        .as_ref()
        .map(|a| format!("/{a}"))
        .unwrap_or_default();

    // `send` events use their own compact layout without the emitter pid/ppid.
    if event.action == "send" {
        return format!(
            "{} {} [{}{}] {}",
            event.ts,
            level_label,
            event.source_binary,
            agent_suffix,
            format_send_action(event)
        );
    }

    // An error takes precedence over an outcome when both are present.
    let msg_suffix = match (&event.error, &event.outcome) {
        (Some(err), _) => format!(": {err}"),
        (None, Some(outcome)) => format!(" ({outcome})"),
        (None, None) => String::new(),
    };
    let ppid_suffix = match event.fields.get("ppid").and_then(|v| v.as_u64()) {
        Some(ppid) => format!("/ppid={ppid}"),
        None => String::new(),
    };
    let target_suffix = if event.target.is_empty() {
        String::new()
    } else {
        format!(" -> {}", event.target)
    };
    format!(
        "{} {} [{}{} pid={}{}] {}{}{}",
        event.ts,
        level_label,
        event.source_binary,
        agent_suffix,
        event.pid,
        ppid_suffix,
        event.action,
        msg_suffix,
        target_suffix,
    )
}
/// Builds the action text for a `send` event:
/// `send {sender}@{team} [{pid}] -> {recipient}@{team} [{pid}]`, optionally followed
/// by the quoted message preview.
///
/// Values come from the event's `fields`. The sender falls back to the event's
/// `agent` / `team`, the recipient falls back to the `agent@team` halves of
/// `target` (when it contains `@`), and anything still missing is shown as `-`.
/// The preview is appended only when `message_preview` is a non-empty string.
fn format_send_action(event: &LogEventV1) -> String {
    let dash = || "-".to_string();
    let pid_slot = |key: &str| field_u64(event, key).map_or_else(dash, |pid| pid.to_string());
    // `target` of the form "agent@team" supplies recipient fallbacks.
    let target_parts = event.target.split_once('@');

    let sender_agent = field_string(event, "sender_agent")
        .or_else(|| event.agent.clone())
        .unwrap_or_else(dash);
    let sender_team = field_string(event, "sender_team")
        .or_else(|| event.team.clone())
        .unwrap_or_else(dash);
    let recipient_agent = field_string(event, "recipient_agent")
        .or_else(|| target_parts.map(|(agent, _)| agent.to_string()))
        .unwrap_or_else(dash);
    let recipient_team = field_string(event, "recipient_team")
        .or_else(|| target_parts.map(|(_, team)| team.to_string()))
        .unwrap_or_else(dash);

    let header = format!(
        "send {}@{} [{}] -> {}@{} [{}]",
        sender_agent,
        sender_team,
        pid_slot("sender_pid"),
        recipient_agent,
        recipient_team,
        pid_slot("recipient_pid")
    );
    match field_string(event, "message_preview") {
        Some(preview) if !preview.is_empty() => format!("{header} \"{preview}\""),
        _ => header,
    }
}
/// Returns an owned copy of `event.fields[key]` when it is present and a string value.
fn field_string(event: &LogEventV1, key: &str) -> Option<String> {
    let text = event.fields.get(key)?.as_str()?;
    Some(text.to_string())
}
/// Returns `event.fields[key]` as a `u64` when it is present and `as_u64()` yields a value.
fn field_u64(event: &LogEventV1, key: &str) -> Option<u64> {
    let value = event.fields.get(key)?;
    value.as_u64()
}
pub fn parse_since(s: &str) -> Result<Duration> {
let s = s.trim();
if s.is_empty() {
anyhow::bail!("empty duration string");
}
let (num_part, unit) = s.split_at(s.len() - 1);
let n: u64 = num_part
.parse()
.with_context(|| format!("invalid duration value '{num_part}' in '{s}'"))?;
match unit {
"s" => Ok(Duration::from_secs(n)),
"m" => Ok(Duration::from_secs(n * 60)),
"h" => Ok(Duration::from_secs(n * 3600)),
other => {
anyhow::bail!("unknown duration unit '{other}' in '{s}'; expected 's', 'm', or 'h'")
}
}
}
// Unit tests covering duration parsing, event filtering, the result limit,
// human-readable formatting, and follow (tail) mode.
#[cfg(test)]
mod tests {
use super::*;
use crate::logging_event::{LogEventV1, new_log_event};
use chrono::{Duration as ChronoDuration, Utc};
use std::io::Write;
use tempfile::{NamedTempFile, TempDir};
// Builds a test event with the given agent and level via the `LogEventV1` builder.
fn make_event_with_agent(agent: &str, level: &str) -> LogEventV1 {
LogEventV1::builder("atm", "test_action", "atm::test")
.level(level)
.agent(agent)
.build()
}
// Builds a test event with the given level and overrides its timestamp with `ts`.
fn make_event_at_ts(ts: &str, level: &str) -> LogEventV1 {
let mut ev = new_log_event("atm", "test_action", "atm::test", level);
ev.ts = ts.to_string();
ev
}
// Serializes each event as one JSON line into a fresh temp file and returns the
// file handle; callers read it back through `f.path()`.
fn write_events_to_file(events: &[LogEventV1]) -> NamedTempFile {
let mut f = NamedTempFile::new().expect("temp file");
for event in events {
let line = serde_json::to_string(event).expect("serialize");
writeln!(f, "{line}").expect("write line");
}
f.flush().expect("flush");
f
}
// `parse_since`: the `s` unit is taken as seconds.
#[test]
fn test_parse_since_seconds() {
assert_eq!(parse_since("90s").unwrap(), Duration::from_secs(90));
}
// `parse_since`: the `m` unit multiplies by 60.
#[test]
fn test_parse_since_minutes() {
assert_eq!(parse_since("30m").unwrap(), Duration::from_secs(1800));
}
// `parse_since`: the `h` unit multiplies by 3600.
#[test]
fn test_parse_since_hours() {
assert_eq!(parse_since("2h").unwrap(), Duration::from_secs(7200));
}
// `parse_since`: units other than s/m/h are rejected.
#[test]
fn test_parse_since_invalid_unit() {
assert!(parse_since("5d").is_err());
}
// `parse_since`: an empty string is rejected.
#[test]
fn test_parse_since_empty_string() {
assert!(parse_since("").is_err());
}
// `parse_since`: a non-numeric value part is rejected.
#[test]
fn test_parse_since_non_numeric() {
assert!(parse_since("xm").is_err());
}
// Filtering by agent keeps only the events whose agent matches exactly.
#[test]
fn test_filter_by_agent() {
let events = vec![
make_event_with_agent("team-lead", "info"),
make_event_with_agent("team-lead", "info"),
make_event_with_agent("arch-ctm", "info"),
];
let f = write_events_to_file(&events);
let filter = LogFilter {
agent: Some("team-lead".to_string()),
..Default::default()
};
let reader = LogReader::new(f.path().to_path_buf(), filter);
let results = reader.read_filtered().expect("read_filtered");
assert_eq!(results.len(), 2);
for ev in &results {
assert_eq!(ev.agent.as_deref(), Some("team-lead"));
}
}
// Filtering by level keeps only the events at that level.
#[test]
fn test_filter_by_level() {
let events = vec![
make_event_with_agent("a", "info"),
make_event_with_agent("a", "warn"),
make_event_with_agent("a", "error"),
make_event_with_agent("a", "warn"),
];
let f = write_events_to_file(&events);
let filter = LogFilter {
level: Some("warn".to_string()),
..Default::default()
};
let reader = LogReader::new(f.path().to_path_buf(), filter);
let results = reader.read_filtered().expect("read_filtered");
assert_eq!(results.len(), 2);
for ev in &results {
assert_eq!(ev.level, "warn");
}
}
// The level comparison ignores ASCII case: filter "info" matches an "INFO" event.
#[test]
fn test_filter_by_level_case_insensitive() {
let events = vec![
make_event_with_agent("a", "INFO"),
make_event_with_agent("a", "warn"),
];
let f = write_events_to_file(&events);
let filter = LogFilter {
level: Some("info".to_string()),
..Default::default()
};
let reader = LogReader::new(f.path().to_path_buf(), filter);
let results = reader.read_filtered().expect("read_filtered");
assert_eq!(results.len(), 1);
}
// With a one-hour window, events stamped 30 minutes and 1 minute ago are kept
// and the event stamped 2 hours ago is dropped.
#[test]
fn test_filter_since() {
let now = Utc::now();
let two_hours_ago =
(now - ChronoDuration::hours(2)).to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let thirty_mins_ago =
(now - ChronoDuration::minutes(30)).to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let one_min_ago =
(now - ChronoDuration::minutes(1)).to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let events = vec![
make_event_at_ts(&two_hours_ago, "info"), make_event_at_ts(&thirty_mins_ago, "info"), make_event_at_ts(&one_min_ago, "info"), ];
let f = write_events_to_file(&events);
let filter = LogFilter {
since: Some(Duration::from_secs(3600)), ..Default::default()
};
let reader = LogReader::new(f.path().to_path_buf(), filter);
let results = reader.read_filtered().expect("read_filtered");
assert_eq!(
results.len(),
2,
"only events within the last hour should match"
);
}
// `limit` keeps the last N matching events, preserving their file order.
#[test]
fn test_limit() {
let events: Vec<LogEventV1> = (0..10)
.map(|i| {
let mut ev = new_log_event("atm", &format!("action_{i}"), "atm::test", "info");
ev.action = format!("action_{i}");
ev
})
.collect();
let f = write_events_to_file(&events);
let filter = LogFilter {
limit: Some(3),
..Default::default()
};
let reader = LogReader::new(f.path().to_path_buf(), filter);
let results = reader.read_filtered().expect("read_filtered");
assert_eq!(results.len(), 3, "limit=3 should return last 3 events");
assert_eq!(results[0].action, "action_7");
assert_eq!(results[1].action, "action_8");
assert_eq!(results[2].action, "action_9");
}
// A non-`send` event renders its timestamp, upper-cased level, action and a `pid=` entry.
#[test]
fn test_format_human() {
let mut ev = new_log_event("atm", "send_message", "atm::send", "info");
ev.ts = "2026-02-23T10:30:00Z".to_string();
ev.agent = Some("team-lead".to_string());
let formatted = format_event_human(&ev);
assert!(
formatted.contains("2026-02-23T10:30:00Z"),
"must contain timestamp"
);
assert!(formatted.contains("INFO"), "must contain level");
assert!(formatted.contains("send_message"), "must contain action");
assert!(
formatted.contains("pid="),
"must include pid in human-rendered output"
);
}
// An event carrying an error renders it as a ": {error}" suffix.
#[test]
fn test_format_human_error_suffix() {
let mut ev = new_log_event("atm", "dispatch_error", "atm::send", "error");
ev.ts = "2026-02-23T10:30:02Z".to_string();
ev.agent = Some("arch-ctm".to_string());
ev.error = Some("connection refused".to_string());
let formatted = format_event_human(&ev);
assert!(
formatted.contains(": connection refused"),
"must contain error suffix"
);
}
// An event carrying an outcome (and no error) renders it in parentheses.
#[test]
fn test_format_human_outcome_suffix() {
let mut ev = new_log_event("atm", "send_message", "atm::send", "info");
ev.ts = "2026-02-23T10:30:00Z".to_string();
ev.outcome = Some("ok".to_string());
let formatted = format_event_human(&ev);
assert!(formatted.contains("(ok)"), "must contain outcome suffix");
}
// Without an agent, the bracketed prefix goes straight from the binary name to `pid=`.
#[test]
fn test_format_human_no_agent_suffix() {
let ev = new_log_event("atm-daemon", "daemon_start", "atm_daemon::main", "info");
let formatted = format_event_human(&ev);
assert!(
formatted.contains("[atm-daemon pid="),
"no agent suffix when agent is None; got: {formatted}"
);
}
// A non-empty target is appended as " -> {target}".
// NOTE(review): presumably the third argument of `new_log_event` populates `target` - confirm.
#[test]
fn test_format_human_includes_target_suffix() {
let ev = new_log_event("atm", "send_message", "atm::send", "info");
let formatted = format_event_human(&ev);
assert!(
formatted.contains("-> atm::send"),
"formatted event should include target suffix"
);
}
// A numeric `ppid` field is rendered as "ppid=N" for non-`send` events.
#[test]
fn test_format_human_includes_ppid_when_present() {
let mut ev = new_log_event("atm", "send_message", "atm::send", "info");
ev.fields
.insert("ppid".to_string(), serde_json::Value::Number(123u64.into()));
let formatted = format_event_human(&ev);
assert!(
formatted.contains("ppid=123"),
"formatted event should include ppid when available"
);
}
// A `send` event with all sender/recipient fields set renders the
// "send a@team [pid] -> b@team [pid] \"preview\"" layout.
#[test]
fn test_format_human_send_uses_normalized_sender_recipient_layout() {
let mut ev = new_log_event("atm", "send", "atm::send", "info");
ev.fields.insert(
"sender_agent".to_string(),
serde_json::Value::String("team-lead".to_string()),
);
ev.fields.insert(
"sender_team".to_string(),
serde_json::Value::String("atm-dev".to_string()),
);
ev.fields.insert(
"sender_pid".to_string(),
serde_json::Value::Number(44201u64.into()),
);
ev.fields.insert(
"recipient_agent".to_string(),
serde_json::Value::String("arch-ctm".to_string()),
);
ev.fields.insert(
"recipient_team".to_string(),
serde_json::Value::String("atm-dev".to_string()),
);
ev.fields.insert(
"recipient_pid".to_string(),
serde_json::Value::Number(8009u64.into()),
);
ev.fields.insert(
"message_preview".to_string(),
serde_json::Value::String("test message...".to_string()),
);
let rendered = format_event_human(&ev);
assert!(rendered.contains(
"send team-lead@atm-dev [44201] -> arch-ctm@atm-dev [8009] \"test message...\""
));
}
// A `send` event without sender/recipient pids shows "-" in both pid slots and
// never prints the emitter's `pid=` / `ppid=` entries, even when a `ppid` field exists.
#[test]
fn test_format_human_send_uses_dash_for_missing_pids() {
let mut ev = new_log_event("atm", "send", "atm::send", "info");
ev.agent = Some("arch-ctm".to_string());
ev.fields
.insert("ppid".to_string(), serde_json::Value::Number(123u64.into()));
ev.fields.insert(
"sender_agent".to_string(),
serde_json::Value::String("team-lead".to_string()),
);
ev.fields.insert(
"sender_team".to_string(),
serde_json::Value::String("atm-dev".to_string()),
);
ev.fields.insert(
"recipient_agent".to_string(),
serde_json::Value::String("arch-ctm".to_string()),
);
ev.fields.insert(
"recipient_team".to_string(),
serde_json::Value::String("atm-dev".to_string()),
);
let rendered = format_event_human(&ev);
assert!(rendered.contains("send team-lead@atm-dev [-] -> arch-ctm@atm-dev [-]"));
assert!(
!rendered.contains("pid="),
"send lines should only show sender/recipient PID slots"
);
assert!(
!rendered.contains("ppid="),
"send lines should not include emitter ppid in prefix"
);
}
// `read_filtered` on a path that does not exist returns an empty result, not an error.
#[test]
fn test_nonexistent_file_returns_empty() {
let tmp = TempDir::new().expect("temp dir");
let path = tmp.path().join("no-such-file.jsonl");
let filter = LogFilter::default();
let reader = LogReader::new(path, filter);
let results = reader
.read_filtered()
.expect("should return Ok on missing file");
assert!(results.is_empty(), "missing file should return empty vec");
}
// `follow` delivers only events appended after it starts and stops once the
// callback returns `false`.
#[test]
fn test_follow_mode() {
use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
let tmp = TempDir::new().expect("temp dir");
let log_path = tmp.path().join("atm.log.jsonl");
// Seed the file with one event that exists before following begins.
{
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.expect("open file");
let ev = new_log_event("atm", "before_follow", "atm::test", "info");
writeln!(file, "{}", serde_json::to_string(&ev).unwrap()).unwrap();
}
let collected: Arc<Mutex<Vec<LogEventV1>>> = Arc::new(Mutex::new(Vec::new()));
let collected_clone = collected.clone();
let log_path_clone = log_path.clone();
let stop_writer = Arc::new(AtomicBool::new(false));
let stop_writer_clone = Arc::clone(&stop_writer);
// Writer: waits 700 ms, then appends up to 200 events, 50 ms apart,
// until the follower raises the stop flag.
let writer_thread = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(700));
let mut file = std::fs::OpenOptions::new()
.append(true)
.open(&log_path_clone)
.expect("open for appending");
for i in 0..200u32 {
if stop_writer_clone.load(Ordering::Relaxed) {
break;
}
let mut ev =
new_log_event("atm", &format!("follow_event_{i}"), "atm::test", "info");
ev.action = format!("follow_event_{i}");
writeln!(file, "{}", serde_json::to_string(&ev).unwrap()).unwrap();
file.flush().unwrap();
std::thread::sleep(Duration::from_millis(50));
}
});
let (done_tx, done_rx) = mpsc::channel::<()>();
let filter = LogFilter::default();
let reader = LogReader::new(log_path.clone(), filter);
// Follower: collects events and stops (returning `false`) after the third one.
let follow_thread = std::thread::spawn(move || {
reader
.follow(|event| {
let mut guard = collected_clone.lock().unwrap();
guard.push(event.clone());
if guard.len() >= 3 {
stop_writer.store(true, Ordering::Relaxed);
false
} else {
true
}
})
.expect("follow should succeed");
let _ = done_tx.send(());
});
// Fail the test instead of hanging if `follow` never returns.
done_rx
.recv_timeout(Duration::from_secs(10))
.expect("follow thread did not finish within 10s — possible deadlock");
follow_thread.join().expect("follow thread joined");
writer_thread.join().expect("writer thread joined");
let guard = collected.lock().unwrap();
assert_eq!(
guard.len(),
3,
"follow should have yielded exactly 3 new events"
);
// The first delivered event is an appended one, not the pre-existing "before_follow" line.
assert!(
guard[0].action.starts_with("follow_event_"),
"actions should be follow events"
);
}
// Lines that are not JSON, or are JSON that does not deserialize into a
// `LogEventV1`, are skipped by `read_filtered`.
#[test]
fn test_malformed_lines_skipped() {
let tmp = TempDir::new().expect("temp dir");
let path = tmp.path().join("atm.log.jsonl");
{
let mut f = std::fs::File::create(&path).expect("create");
writeln!(f, "not valid json").unwrap();
writeln!(f, "{{\"garbage\": true}}").unwrap();
let ev = new_log_event("atm", "real_event", "atm::test", "info");
writeln!(f, "{}", serde_json::to_string(&ev).unwrap()).unwrap();
}
let filter = LogFilter::default();
let reader = LogReader::new(path, filter);
let results = reader.read_filtered().expect("read_filtered");
assert_eq!(results.len(), 1, "only the valid event should be returned");
assert_eq!(results[0].action, "real_event");
}
}