use std::fs::{self, File};
use std::io::{BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};

use crate::utils::{TimeFilter, TimeOp, cmd_to_log_name};
use crate::{FileEvent, parse_log_line_jsonl};
/// Size of the window read backwards from a probe offset in
/// `find_first_event_after` while looking for the start of the line that
/// contains that offset.
const SCAN_BACK_BYTES: u64 = 4096;
/// A query over the JSONL event logs: which log files to read and which
/// events from them to print.
pub struct Query {
    /// Directory scanned for `*_log.jsonl` files.
    log_dir: PathBuf,
    /// When set, only the log file named by `cmd_to_log_name(cmd)` is read.
    cmd_filter: Option<String>,
    /// When set, only events whose path starts with one of these prefixes are kept.
    path_filters: Option<Vec<PathBuf>>,
    /// Time predicates that must all hold for an event to be kept.
    time_filters: Vec<TimeFilter>,
}
impl Query {
    /// Creates a query over the `*_log.jsonl` files in `log_dir`.
    ///
    /// * `cmd_filter` - when set, only the log file named by `cmd_to_log_name(cmd)` is read.
    /// * `path_filters` - when set, only events whose path starts with one of these
    ///   prefixes (component-wise, per [`Path::starts_with`]) are printed.
    /// * `time_filters` - every filter must hold for an event to be printed.
    pub fn new(
        log_dir: PathBuf,
        cmd_filter: Option<String>,
        path_filters: Option<Vec<PathBuf>>,
        time_filters: Vec<TimeFilter>,
    ) -> Self {
        Self {
            log_dir,
            cmd_filter,
            path_filters,
            time_filters,
        }
    }

    /// Runs the query. It reads every matching log file, applies the time and
    /// path filters, and prints the surviving events to stdout, one JSON line each.
    ///
    /// Files are read in the order produced by [`Self::resolve_log_files`].
    /// Their events are concatenated without re-sorting. Prints
    /// `No matching log files found` when no log file matches.
    ///
    /// NOTE: despite being `async`, this performs blocking `std::fs` I/O and
    /// never awaits.
    ///
    /// # Errors
    /// Fails if the log directory or a log file cannot be read, or if writing
    /// to stdout fails (e.g. a closed pipe).
    pub async fn execute(&self) -> Result<()> {
        let log_files = self.resolve_log_files()?;
        if log_files.is_empty() {
            println!("No matching log files found");
            return Ok(());
        }
        let since_time = self.extract_since();
        let until_time = self.extract_until();
        let mut all_events = Vec::new();
        for log_file in &log_files {
            let mut events = self.read_events_from(log_file, since_time, until_time)?;
            // Filter per file so rejected events are dropped before the next
            // file is read, instead of accumulating until the end.
            if let Some(path_filters) = &self.path_filters {
                events.retain(|event| path_filters.iter().any(|pf| event.path.starts_with(pf)));
            }
            all_events.extend(events);
        }
        self.output_events(&all_events)
    }

    /// Returns the latest lower bound among the `>` / `>=` filters, or `None`
    /// if there is none. It is only used to seek into the log; the exact
    /// comparison is still applied to each event.
    fn extract_since(&self) -> Option<DateTime<Utc>> {
        self.time_filters
            .iter()
            .filter(|f| matches!(f.op, TimeOp::Gt | TimeOp::Ge))
            .map(|f| f.time)
            .max()
    }

    /// Returns the earliest upper bound among the `<` / `<=` filters, or
    /// `None` if there is none. It is used to stop reading a log early.
    fn extract_until(&self) -> Option<DateTime<Utc>> {
        self.time_filters
            .iter()
            .filter(|f| matches!(f.op, TimeOp::Lt | TimeOp::Le))
            .map(|f| f.time)
            .min()
    }

    /// Reads the events of one log file that satisfy every time filter.
    ///
    /// `since` and `until` only bound the region that is read:
    /// * Reading starts at the first line whose event time is `>= since`,
    ///   found by binary search.
    /// * Reading stops at the first parsed event later than `until`.
    ///
    /// Both bounds assume the log is in chronological order. Blank lines,
    /// non-UTF-8 lines and lines `parse_log_line_jsonl` rejects are skipped.
    ///
    /// # Errors
    /// Fails if the file cannot be opened, stat'ed, seeked or read.
    fn read_events_from(
        &self,
        log_path: &Path,
        since: Option<DateTime<Utc>>,
        until: Option<DateTime<Utc>>,
    ) -> Result<Vec<FileEvent>> {
        let file = File::open(log_path)
            .with_context(|| format!("Failed to open log file {}", log_path.display()))?;
        let file_len = file.metadata()?.len();
        if file_len == 0 {
            return Ok(Vec::new());
        }
        let start_pos = match since {
            Some(since_time) => self.find_first_event_after(file_len, log_path, since_time)?,
            None => 0,
        };
        let mut reader = BufReader::new(file);
        reader.seek(SeekFrom::Start(start_pos))?;
        let mut events = Vec::new();
        let mut line = Vec::new();
        loop {
            line.clear();
            if reader.read_until(b'\n', &mut line)? == 0 {
                break;
            }
            let Some(event) = Self::parse_event(&line) else {
                continue;
            };
            // Checked before the filters. In `execute`, `until` comes from the
            // `<`/`<=` filters themselves, so an event past it never passes
            // them, and checking afterwards would never stop the scan.
            if until.is_some_and(|u| event.time > u) {
                break;
            }
            if self.matches_time_filters(event.time) {
                events.push(event);
            }
        }
        Ok(events)
    }

    /// Returns `true` when `time` satisfies every configured time filter
    /// (vacuously `true` when there are none).
    fn matches_time_filters(&self, time: DateTime<Utc>) -> bool {
        self.time_filters.iter().all(|f| match f.op {
            TimeOp::Gt => time > f.time,
            TimeOp::Ge => time >= f.time,
            TimeOp::Lt => time < f.time,
            TimeOp::Le => time <= f.time,
            TimeOp::Eq => time == f.time,
        })
    }

    /// Parses one raw log line (a trailing newline is allowed).
    ///
    /// Returns `None` for blank lines, invalid UTF-8, or anything
    /// `parse_log_line_jsonl` rejects.
    fn parse_event(raw: &[u8]) -> Option<FileEvent> {
        let text = std::str::from_utf8(raw).ok()?.trim();
        if text.is_empty() {
            return None;
        }
        parse_log_line_jsonl(text)
    }

    /// Binary-searches `log_path` for the byte offset of the first line whose
    /// event time is `>= since`, assuming lines are in chronological order.
    ///
    /// Every probe offset is mapped to the start of its line. Each probe is
    /// judged by the first parseable event at or after that line, so blank or
    /// corrupt lines borrow the time of the next valid line. This keeps the
    /// predicate constant within a line, so the result is always a line start.
    /// It is `file_len` when every event is earlier than `since`.
    fn find_first_event_after(
        &self,
        file_len: u64,
        log_path: &Path,
        since: DateTime<Utc>,
    ) -> Result<u64> {
        let file = File::open(log_path)
            .with_context(|| format!("Failed to open log file {}", log_path.display()))?;
        let mut reader = BufReader::new(file);
        let mut low: u64 = 0;
        let mut high: u64 = file_len;
        while low < high {
            let mid = low + (high - low) / 2;
            let line_start = Self::line_start_at(&mut reader, mid)?;
            match Self::next_event_time(&mut reader, line_start)? {
                Some(t) if t < since => low = mid + 1,
                // At/after `since`, or only unparseable lines remain before EOF.
                _ => high = mid,
            }
        }
        Ok(low)
    }

    /// Returns the offset of the first byte of the line containing `pos`. That
    /// is one past the nearest `\n` strictly before `pos`, or 0 if there is none.
    ///
    /// Scans backwards in `SCAN_BACK_BYTES` windows until a newline is found,
    /// so lines of any length are handled. The scan works on raw bytes, so
    /// offsets stay exact even when a window starts inside a multi-byte UTF-8
    /// character.
    fn line_start_at<R: Read + Seek>(reader: &mut R, pos: u64) -> Result<u64> {
        let mut window = vec![0u8; SCAN_BACK_BYTES as usize];
        let mut end = pos;
        while end > 0 {
            let start = end.saturating_sub(SCAN_BACK_BYTES);
            // Bounded by SCAN_BACK_BYTES, so the cast cannot truncate.
            let chunk = &mut window[..(end - start) as usize];
            reader.seek(SeekFrom::Start(start))?;
            reader.read_exact(chunk)?;
            if let Some(i) = chunk.iter().rposition(|&b| b == b'\n') {
                return Ok(start + i as u64 + 1);
            }
            end = start;
        }
        Ok(0)
    }

    /// Seeks to `pos` (expected to be a line start) and returns the time of
    /// the first parseable event from there on. Unparseable lines are skipped.
    /// Returns `None` if EOF is reached first.
    fn next_event_time<R: BufRead + Seek>(
        reader: &mut R,
        pos: u64,
    ) -> Result<Option<DateTime<Utc>>> {
        reader.seek(SeekFrom::Start(pos))?;
        let mut line = Vec::new();
        loop {
            line.clear();
            if reader.read_until(b'\n', &mut line)? == 0 {
                return Ok(None);
            }
            if let Some(event) = Self::parse_event(&line) {
                return Ok(Some(event.time));
            }
        }
    }

    /// Writes each event to stdout as one JSON line.
    ///
    /// Stdout is locked once and buffered. Write errors, such as a closed pipe
    /// when the output is piped into `head`, are returned instead of panicking
    /// as `println!` would.
    fn output_events(&self, events: &[FileEvent]) -> Result<()> {
        let mut out = BufWriter::new(std::io::stdout().lock());
        for event in events {
            writeln!(out, "{}", event.to_jsonl_string())?;
        }
        out.flush()?;
        Ok(())
    }

    /// Lists the log files this query reads.
    ///
    /// * With a `cmd_filter`, returns the single file `cmd_to_log_name(cmd)`
    ///   in the log directory, if it exists as a regular file.
    /// * Without one, returns every regular file in the log directory whose
    ///   name ends in `_log.jsonl`, sorted by path.
    ///
    /// A missing log directory yields an empty list.
    ///
    /// # Errors
    /// Fails if the log directory exists but cannot be listed.
    pub fn resolve_log_files(&self) -> Result<Vec<PathBuf>> {
        let log_dir = &self.log_dir;
        if !log_dir.exists() {
            return Ok(Vec::new());
        }
        if let Some(cmd) = &self.cmd_filter {
            let log_path = log_dir.join(cmd_to_log_name(cmd));
            // `is_file` matches the check used for the directory scan below.
            return Ok(if log_path.is_file() {
                vec![log_path]
            } else {
                Vec::new()
            });
        }
        let mut files = Vec::new();
        let entries = fs::read_dir(log_dir)
            .with_context(|| format!("Failed to read log directory {}", log_dir.display()))?;
        for entry in entries {
            let path = entry?.path();
            let is_log_name = path
                .file_name()
                .and_then(|n| n.to_str())
                .is_some_and(|n| n.ends_with("_log.jsonl"));
            if is_log_name && path.is_file() {
                files.push(path);
            }
        }
        files.sort();
        Ok(files)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EventType;
    use chrono::Utc;
    use std::io::Write;

    /// Scratch directory under the system temp dir.
    ///
    /// The process id is appended so that concurrent test runs do not share
    /// it. It is recreated empty on construction and removed on drop, even
    /// when an assertion panics.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!("{name}_{}", std::process::id()));
            // Discard leftovers from an earlier aborted run.
            let _ = fs::remove_dir_all(&dir);
            fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Builds an event with neutral defaults. Tests override the fields they
    /// care about with struct-update syntax.
    fn event_at(time: DateTime<Utc>, event_type: EventType, path: &str) -> FileEvent {
        FileEvent {
            time,
            event_type,
            path: PathBuf::from(path),
            pid: 0,
            cmd: "test".into(),
            user: "root".into(),
            file_size: 0,
            ppid: 0,
            tgid: 0,
            chain: String::new(),
        }
    }

    /// Writes `events` as JSON lines to `dir/test.jsonl` and returns its path.
    fn create_log_file(dir: &Path, events: &[FileEvent]) -> PathBuf {
        let path = dir.join("test.jsonl");
        let mut f = fs::File::create(&path).unwrap();
        for event in events {
            writeln!(f, "{}", event.to_jsonl_string()).unwrap();
        }
        path
    }

    /// A query over `dir` with no command, path, or time filters.
    fn plain_query(dir: &Path) -> Query {
        Query::new(dir.to_path_buf(), None, None, Vec::new())
    }

    /// A query with only the given time filters (the log dir is never read).
    fn query_with_time_filters(filters: Vec<TimeFilter>) -> Query {
        Query::new(PathBuf::from("/tmp"), None, None, filters)
    }

    /// Drives a future that is expected to complete on its first poll.
    /// `Query::execute` contains no `.await`, so no async runtime is needed.
    fn poll_ready<F: std::future::Future>(fut: F) -> F::Output {
        let mut fut = std::pin::pin!(fut);
        let mut cx = std::task::Context::from_waker(std::task::Waker::noop());
        match std::future::Future::poll(fut.as_mut(), &mut cx) {
            std::task::Poll::Ready(out) => out,
            std::task::Poll::Pending => panic!("future did not complete on first poll"),
        }
    }

    /// Three events, one hour apart, at now-3h / now-2h / now-1h (pids 1..=3).
    fn three_hourly_events(now: DateTime<Utc>) -> Vec<FileEvent> {
        vec![
            FileEvent {
                pid: 1,
                ..event_at(now - chrono::Duration::hours(3), EventType::Create, "/a")
            },
            FileEvent {
                pid: 2,
                ..event_at(now - chrono::Duration::hours(2), EventType::Modify, "/b")
            },
            FileEvent {
                pid: 3,
                ..event_at(now - chrono::Duration::hours(1), EventType::Delete, "/c")
            },
        ]
    }

    /// Two events: an old one (now-2h, pid 100) and a recent one (now-30m, pid 200).
    fn old_and_recent_events(now: DateTime<Utc>) -> Vec<FileEvent> {
        vec![
            FileEvent {
                pid: 100,
                ..event_at(now - chrono::Duration::hours(2), EventType::Create, "/tmp/old")
            },
            FileEvent {
                pid: 200,
                file_size: 50,
                ..event_at(now - chrono::Duration::minutes(30), EventType::Modify, "/tmp/recent")
            },
        ]
    }

    #[test]
    fn test_read_events_basic() {
        let dir = ScratchDir::new("fsmon_query_test_basic");
        let events = vec![
            FileEvent {
                pid: 100,
                cmd: "touch".into(),
                ..event_at(Utc::now(), EventType::Create, "/tmp/test")
            },
            FileEvent {
                pid: 200,
                cmd: "vim".into(),
                file_size: 100,
                ..event_at(Utc::now(), EventType::Modify, "/tmp/test")
            },
        ];
        let log_path = create_log_file(dir.path(), &events);
        let result = plain_query(dir.path())
            .read_events_from(&log_path, None, None)
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].pid, 100);
        assert_eq!(result[1].pid, 200);
    }

    #[test]
    fn test_read_events_empty_file() {
        let dir = ScratchDir::new("fsmon_query_test_empty");
        let log_path = create_log_file(dir.path(), &[]);
        let result = plain_query(dir.path())
            .read_events_from(&log_path, None, None)
            .unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_read_events_skips_blank_and_malformed_lines() {
        let dir = ScratchDir::new("fsmon_query_test_malformed");
        let log_path = dir.path().join("test.jsonl");
        let first = FileEvent {
            pid: 1,
            ..event_at(Utc::now(), EventType::Create, "/a")
        };
        let second = FileEvent {
            pid: 2,
            ..event_at(Utc::now(), EventType::Modify, "/b")
        };
        let mut f = fs::File::create(&log_path).unwrap();
        writeln!(f, "{}", first.to_jsonl_string()).unwrap();
        writeln!(f, "not json").unwrap();
        writeln!(f).unwrap();
        writeln!(f, "{}", second.to_jsonl_string()).unwrap();
        drop(f);
        let result = plain_query(dir.path())
            .read_events_from(&log_path, None, None)
            .unwrap();
        let pids: Vec<_> = result.iter().map(|e| e.pid).collect();
        assert_eq!(pids, vec![1, 2]);
    }

    #[test]
    fn test_resolve_log_files_by_cmd() {
        let dir = ScratchDir::new("fsmon_query_test_resolve_cmd");
        let log_path = dir.path().join("openclaw_log.jsonl");
        let mut f = fs::File::create(&log_path).unwrap();
        writeln!(f, "{{\"time\":\"2025-01-01T00:00:00Z\",\"event_type\":\"CREATE\",\"path\":\"/a\",\"pid\":1,\"cmd\":\"openclaw\",\"user\":\"r\",\"file_size\":0,\"ppid\":0,\"tgid\":0,\"chain\":\"\"}}").unwrap();
        drop(f);
        let q = Query::new(dir.path().to_path_buf(), Some("openclaw".into()), None, vec![]);
        let files = q.resolve_log_files().unwrap();
        assert_eq!(files.len(), 1);
        assert!(files[0].to_string_lossy().contains("openclaw_log.jsonl"));
    }

    #[test]
    fn test_resolve_log_files_nonexistent_cmd() {
        let dir = ScratchDir::new("fsmon_query_test_nonexistent_cmd");
        let q = Query::new(dir.path().to_path_buf(), Some("nonexistent".into()), None, vec![]);
        let files = q.resolve_log_files().unwrap();
        assert!(files.is_empty(), "nonexistent cmd should yield no log files");
    }

    #[test]
    fn test_resolve_log_files_lists_sorted_log_files() {
        let dir = ScratchDir::new("fsmon_query_test_resolve_all");
        fs::File::create(dir.path().join("b_log.jsonl")).unwrap();
        fs::File::create(dir.path().join("a_log.jsonl")).unwrap();
        fs::File::create(dir.path().join("notes.txt")).unwrap();
        // A directory with a log-like name must be ignored.
        fs::create_dir(dir.path().join("c_log.jsonl")).unwrap();
        let files = plain_query(dir.path()).resolve_log_files().unwrap();
        let names: Vec<_> = files
            .iter()
            .map(|p| p.file_name().unwrap().to_string_lossy().into_owned())
            .collect();
        assert_eq!(names, vec!["a_log.jsonl", "b_log.jsonl"]);
    }

    #[test]
    fn test_resolve_log_files_missing_dir() {
        let dir = ScratchDir::new("fsmon_query_test_missing_dir");
        let q = plain_query(&dir.path().join("does_not_exist"));
        assert!(q.resolve_log_files().unwrap().is_empty());
    }

    #[test]
    fn test_read_events_with_since_filter() {
        let dir = ScratchDir::new("fsmon_query_test_since");
        let now = Utc::now();
        let log_path = create_log_file(dir.path(), &old_and_recent_events(now));
        let since = now - chrono::Duration::hours(1);
        let result = plain_query(dir.path())
            .read_events_from(&log_path, Some(since), None)
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].pid, 200);
    }

    #[test]
    fn test_read_events_until_filter() {
        let dir = ScratchDir::new("fsmon_query_test_until");
        let now = Utc::now();
        let log_path = create_log_file(dir.path(), &old_and_recent_events(now));
        let until = now - chrono::Duration::hours(1);
        let result = plain_query(dir.path())
            .read_events_from(&log_path, None, Some(until))
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].pid, 100);
    }

    #[test]
    fn test_read_events_time_range() {
        let dir = ScratchDir::new("fsmon_query_test_range");
        let now = Utc::now();
        let log_path = create_log_file(dir.path(), &three_hourly_events(now));
        let since = now - chrono::Duration::minutes(150);
        let until = now - chrono::Duration::minutes(90);
        let result = plain_query(dir.path())
            .read_events_from(&log_path, Some(since), Some(until))
            .unwrap();
        assert_eq!(result.len(), 1, "expected only t2 in range");
        assert_eq!(result[0].pid, 2);
    }

    #[test]
    fn test_read_events_with_matching_time_filters() {
        // Mirrors `execute`: the since/until bounds come from the filters.
        let dir = ScratchDir::new("fsmon_query_test_filters");
        let now = Utc::now();
        let log_path = create_log_file(dir.path(), &three_hourly_events(now));
        let since = now - chrono::Duration::minutes(150);
        let until = now - chrono::Duration::minutes(90);
        let q = Query::new(
            dir.path().to_path_buf(),
            None,
            None,
            vec![
                TimeFilter { op: TimeOp::Gt, time: since },
                TimeFilter { op: TimeOp::Lt, time: until },
            ],
        );
        let bounded = q
            .read_events_from(&log_path, q.extract_since(), q.extract_until())
            .unwrap();
        assert_eq!(bounded.len(), 1);
        assert_eq!(bounded[0].pid, 2);
        let unbounded = q.read_events_from(&log_path, None, None).unwrap();
        assert_eq!(unbounded.len(), 1);
        assert_eq!(unbounded[0].pid, 2);
    }

    #[test]
    fn test_extract_since_and_until() {
        let now = Utc::now();
        let t1 = now - chrono::Duration::hours(2);
        let t2 = now - chrono::Duration::hours(1);

        let q = query_with_time_filters(vec![TimeFilter { op: TimeOp::Gt, time: t1 }]);
        assert_eq!(q.extract_since(), Some(t1));
        assert_eq!(q.extract_until(), None);

        let q = query_with_time_filters(vec![TimeFilter { op: TimeOp::Lt, time: t2 }]);
        assert_eq!(q.extract_since(), None);
        assert_eq!(q.extract_until(), Some(t2));

        let q = query_with_time_filters(vec![
            TimeFilter { op: TimeOp::Gt, time: t1 },
            TimeFilter { op: TimeOp::Lt, time: t2 },
        ]);
        assert_eq!(q.extract_since(), Some(t1));
        assert_eq!(q.extract_until(), Some(t2));

        let q = query_with_time_filters(vec![]);
        assert_eq!(q.extract_since(), None);
        assert_eq!(q.extract_until(), None);
    }

    #[test]
    fn test_extract_since_takes_latest_ge() {
        let now = Utc::now();
        let t_early = now - chrono::Duration::hours(3);
        let t_late = now - chrono::Duration::hours(1);
        let q = query_with_time_filters(vec![
            TimeFilter { op: TimeOp::Ge, time: t_early },
            TimeFilter { op: TimeOp::Ge, time: t_late },
        ]);
        let s = q.extract_since().unwrap();
        assert_eq!(s, t_late, "should pick the later/more-restrictive time");
    }

    #[test]
    fn test_extract_until_takes_earliest_le() {
        let now = Utc::now();
        let t_early = now - chrono::Duration::hours(3);
        let t_late = now - chrono::Duration::hours(1);
        let q = query_with_time_filters(vec![
            TimeFilter { op: TimeOp::Le, time: t_late },
            TimeFilter { op: TimeOp::Le, time: t_early },
        ]);
        let u = q.extract_until().unwrap();
        assert_eq!(u, t_early, "should pick the earlier/more-restrictive time");
    }

    #[test]
    fn test_read_events_ignores_path_filter() {
        let dir = ScratchDir::new("fsmon_query_test_path_filter");
        let events = vec![
            FileEvent {
                pid: 1,
                cmd: "vim".into(),
                ..event_at(Utc::now(), EventType::Create, "/home/user/file.txt")
            },
            FileEvent {
                pid: 2,
                cmd: "bash".into(),
                file_size: 100,
                ..event_at(Utc::now(), EventType::Modify, "/tmp/cache.dat")
            },
        ];
        let log_path = create_log_file(dir.path(), &events);
        let q = Query::new(
            dir.path().to_path_buf(),
            None,
            Some(vec![PathBuf::from("/home")]),
            vec![],
        );
        let result = q.read_events_from(&log_path, None, None).unwrap();
        assert_eq!(
            result.len(),
            2,
            "read_events_from returns all events (path filter is applied in execute)"
        );
    }

    #[test]
    fn test_execute_with_path_filter() {
        let dir = ScratchDir::new("fsmon_query_test_exec_path");
        let log_path = dir.path().join("_global_log.jsonl");
        let mut f = fs::File::create(&log_path).unwrap();
        writeln!(f, "{{\"time\":\"2025-01-01T00:00:00Z\",\"event_type\":\"CREATE\",\"path\":\"/home/a\",\"pid\":1,\"cmd\":\"x\",\"user\":\"r\",\"file_size\":0,\"ppid\":0,\"tgid\":0,\"chain\":\"\"}}").unwrap();
        writeln!(f, "{{\"time\":\"2025-01-01T00:00:01Z\",\"event_type\":\"MODIFY\",\"path\":\"/tmp/b\",\"pid\":2,\"cmd\":\"y\",\"user\":\"r\",\"file_size\":10,\"ppid\":0,\"tgid\":0,\"chain\":\"\"}}").unwrap();
        drop(f);
        let q = Query::new(
            dir.path().to_path_buf(),
            None,
            Some(vec![PathBuf::from("/home")]),
            vec![],
        );
        assert_eq!(q.resolve_log_files().unwrap().len(), 1);
        // `execute` only prints, so this checks that the full pipeline
        // (resolve, read, path-filter, output) completes without error.
        assert!(poll_ready(q.execute()).is_ok());
    }

    #[test]
    fn test_resolve_log_files_by_cmd_returns_correct_file() {
        let dir = ScratchDir::new("fsmon_query_test_cmd_file");
        fs::File::create(dir.path().join("nginx_log.jsonl")).unwrap();
        let q = Query::new(dir.path().to_path_buf(), Some("nginx".into()), None, vec![]);
        let files = q.resolve_log_files().unwrap();
        assert_eq!(files.len(), 1);
        assert!(files[0].to_string_lossy().ends_with("nginx_log.jsonl"));
    }
}