use std::fs::{self, File};
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};

use crate::utils::{TimeFilter, TimeFilterExt, cmd_to_log_name};
use crate::{FileEvent, parse_log_line_jsonl};
/// Size, in bytes, of the window read backwards from a probe offset while looking
/// for the newline that precedes it during the binary search over a log file.
const SCAN_BACK_BYTES: u64 = 4 * 1024;
/// A query over the `*_log.jsonl` event logs kept in one directory.
pub struct Query {
    /// Directory scanned for `*_log.jsonl` files.
    log_dir: PathBuf,
    /// When set, only the log file derived from this command name is read.
    cmd_filter: Option<String>,
    /// When set, only events whose path starts with one of these prefixes are kept.
    path_filters: Option<Vec<PathBuf>>,
    /// Constraints that every reported event's time must satisfy.
    time_filters: Vec<TimeFilter>,
    /// Selects the `_local` JSONL rendering when printing (presumably local-time
    /// timestamps instead of UTC).
    local_time: bool,
}
impl Query {
    /// Creates a query over the logs in `log_dir`.
    ///
    /// * `cmd_filter` - when set, only that command's log file is read.
    /// * `path_filters` - when set, only events whose path starts with one of these
    ///   prefixes are reported.
    /// * `time_filters` - constraints that every reported event must satisfy.
    /// * `local_time` - selects the `_local` JSONL rendering when printing.
    pub fn new(
        log_dir: PathBuf,
        cmd_filter: Option<String>,
        path_filters: Option<Vec<PathBuf>>,
        time_filters: Vec<TimeFilter>,
        local_time: bool,
    ) -> Self {
        Self {
            log_dir,
            cmd_filter,
            path_filters,
            time_filters,
            local_time,
        }
    }

    /// Returns the path-prefix filters, if any were supplied.
    pub fn path_filters(&self) -> Option<&Vec<PathBuf>> {
        self.path_filters.as_ref()
    }

    /// Prints every event that matches the query, one JSONL line per event.
    ///
    /// Events are emitted file by file, in the order returned by
    /// [`Self::resolve_log_files`], and in on-disk order within each file.
    ///
    /// # Errors
    ///
    /// Returns an error if a log file cannot be opened or read, or if writing to
    /// stdout fails for any reason other than a closed pipe.
    pub async fn execute(&self) -> Result<()> {
        // NOTE(review): this `async fn` performs only blocking file I/O; if it runs on a
        // shared async runtime, consider moving the work onto a blocking thread.
        let log_files = self.resolve_log_files()?;
        if log_files.is_empty() {
            println!("No matching log files found");
            return Ok(());
        }
        let since_time = self.extract_since();
        let until_time = self.extract_until();
        let mut all_events = Vec::new();
        for log_file in &log_files {
            let events = self.read_events_from(log_file, since_time, until_time)?;
            // Filter while collecting so rejected events are never held in memory.
            all_events.extend(
                events
                    .into_iter()
                    .filter(|event| self.passes_path_filters(event)),
            );
        }
        self.output_events(&all_events)
    }

    /// Returns `true` when no path filters are set or `event.path` starts with one of them.
    fn passes_path_filters(&self, event: &FileEvent) -> bool {
        self.path_filters
            .as_ref()
            .is_none_or(|filters| filters.iter().any(|prefix| event.path.starts_with(prefix)))
    }

    /// Returns the tightest (latest) lower time bound among the time filters, if any.
    fn extract_since(&self) -> Option<DateTime<Utc>> {
        self.time_filters
            .iter()
            .filter(|f| f.is_lower_bound())
            .map(|f| f.time)
            .max()
    }

    /// Returns the tightest (earliest) upper time bound among the time filters, if any.
    fn extract_until(&self) -> Option<DateTime<Utc>> {
        self.time_filters
            .iter()
            .filter(|f| f.is_upper_bound())
            .map(|f| f.time)
            .min()
    }

    /// Reads the events in `log_path` that satisfy every time filter.
    ///
    /// When `since` is given, reading starts at the offset found by
    /// [`Self::find_first_event_after`]. When `until` is given, reading stops at the
    /// first event newer than it. Both shortcuts assume the file is ordered by event
    /// time (presumably true for an append-only log — TODO confirm). Blank lines, lines
    /// that are not valid UTF-8, and lines `parse_log_line_jsonl` rejects are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, inspected, seeked or read.
    pub fn read_events_from(
        &self,
        log_path: &Path,
        since: Option<DateTime<Utc>>,
        until: Option<DateTime<Utc>>,
    ) -> Result<Vec<FileEvent>> {
        let file = File::open(log_path)
            .with_context(|| format!("Failed to open log file {}", log_path.display()))?;
        let file_len = file
            .metadata()
            .with_context(|| format!("Failed to stat log file {}", log_path.display()))?
            .len();
        if file_len == 0 {
            return Ok(Vec::new());
        }
        let start_pos = match since {
            Some(since_time) => self.find_first_event_after(file_len, log_path, since_time)?,
            None => 0,
        };
        // Reuse the handle opened above instead of opening the file a second time.
        let mut reader = BufReader::new(file);
        reader.seek(SeekFrom::Start(start_pos))?;
        let mut events = Vec::new();
        let mut line = Vec::new();
        while reader.read_until(b'\n', &mut line)? > 0 {
            if let Some(event) = Self::parse_event_line(&line) {
                // Checked before the filters: an event past the upper bound presumably
                // fails that bound's filter too, so checking only for passing events
                // would never stop the scan early.
                if until.is_some_and(|u| event.time > u) {
                    break;
                }
                if self.time_filters.iter().all(|f| f.matches(event.time)) {
                    events.push(event);
                }
            }
            line.clear();
        }
        Ok(events)
    }

    /// Parses one raw log line, returning `None` for blank, non-UTF-8 or malformed lines.
    fn parse_event_line(raw: &[u8]) -> Option<FileEvent> {
        let text = std::str::from_utf8(raw).ok()?.trim();
        if text.is_empty() {
            None
        } else {
            parse_log_line_jsonl(text)
        }
    }

    /// Returns the byte offset of the first line in `log_path` whose event time is at or
    /// after `since`, or `file_len` when there is no such line.
    ///
    /// Binary-searches byte offsets, but each probe examines the whole line containing
    /// the midpoint and moves a bound to a line boundary, so the result is always the
    /// start of a line. The search assumes events are in non-decreasing time order
    /// (presumably guaranteed by an append-only writer — TODO confirm). Lines that
    /// cannot be parsed are treated as older than `since`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, seeked or read.
    pub fn find_first_event_after(
        &self,
        file_len: u64,
        log_path: &Path,
        since: DateTime<Utc>,
    ) -> Result<u64> {
        let file = File::open(log_path)
            .with_context(|| format!("Failed to open log file {}", log_path.display()))?;
        let mut reader = BufReader::new(file);
        let mut line = Vec::new();
        // Invariant: `low` and `high` are line starts (or `file_len`); every line that
        // starts before `low` is older than `since`, and the line at `high`, if any, is not.
        let mut low: u64 = 0;
        let mut high: u64 = file_len;
        while low < high {
            let mid = low + (high - low) / 2;
            let line_start = Self::line_start_containing(&mut reader, mid)?;
            reader.seek(SeekFrom::Start(line_start))?;
            line.clear();
            let read = reader.read_until(b'\n', &mut line)? as u64;
            if read == 0 {
                // Only reachable if the file shrank after `file_len` was measured.
                high = line_start;
                continue;
            }
            match Self::parse_event_line(&line) {
                Some(event) if event.time >= since => high = line_start,
                // Jump past the whole probed line. `max(mid + 1)` guarantees progress even
                // if the file changes underneath us; `min(high)` keeps `low <= high`.
                _ => low = (line_start + read).max(mid + 1).min(high),
            }
        }
        Ok(low)
    }

    /// Returns the offset of the first byte of the line that contains byte `pos`.
    ///
    /// Reads backwards from `pos` in `SCAN_BACK_BYTES` windows until a `\n` or the start
    /// of the file is found, so lines of any length are handled. The newline is searched
    /// for in raw bytes, so a multi-byte UTF-8 character cut by a window edge cannot
    /// shift the computed offset.
    fn line_start_containing<R: Read + Seek>(reader: &mut R, pos: u64) -> Result<u64> {
        let mut window = [0u8; SCAN_BACK_BYTES as usize];
        let mut window_end = pos;
        while window_end > 0 {
            let window_start = window_end.saturating_sub(SCAN_BACK_BYTES);
            // The length is at most SCAN_BACK_BYTES, so the cast cannot truncate.
            let chunk = &mut window[..(window_end - window_start) as usize];
            reader.seek(SeekFrom::Start(window_start))?;
            reader.read_exact(chunk)?;
            if let Some(newline) = chunk.iter().rposition(|&b| b == b'\n') {
                return Ok(window_start + newline as u64 + 1);
            }
            window_end = window_start;
        }
        Ok(0)
    }

    /// Prints the most recent matching event for each distinct path, newest first.
    ///
    /// Events with equal timestamps are ordered by path so the output is deterministic.
    ///
    /// # Errors
    ///
    /// Returns an error if a log file cannot be opened or read, or if writing to
    /// stdout fails for any reason other than a closed pipe.
    pub async fn execute_changes(&self) -> Result<()> {
        let log_files = self.resolve_log_files()?;
        if log_files.is_empty() {
            println!("No matching log files found");
            return Ok(());
        }
        let since_time = self.extract_since();
        let until_time = self.extract_until();
        let mut latest_by_path: std::collections::HashMap<PathBuf, FileEvent> =
            std::collections::HashMap::new();
        for log_file in &log_files {
            let events = self.read_events_from(log_file, since_time, until_time)?;
            for event in events
                .into_iter()
                .filter(|event| self.passes_path_filters(event))
            {
                match latest_by_path.entry(event.path.clone()) {
                    std::collections::hash_map::Entry::Occupied(mut slot) => {
                        // Strictly newer only: on equal timestamps the first event seen wins.
                        if event.time > slot.get().time {
                            slot.insert(event);
                        }
                    }
                    std::collections::hash_map::Entry::Vacant(slot) => {
                        slot.insert(event);
                    }
                }
            }
        }
        let mut latest: Vec<FileEvent> = latest_by_path.into_values().collect();
        // Newest first; ties broken by path because HashMap iteration order is arbitrary.
        latest.sort_by(|a, b| b.time.cmp(&a.time).then_with(|| a.path.cmp(&b.path)));
        self.output_events(&latest)
    }

    /// Writes each event to stdout as one JSONL line, using the `_local` rendering when
    /// `local_time` is set.
    ///
    /// Output is buffered under a single stdout lock. A closed downstream pipe (for
    /// example `| head`) ends output quietly instead of panicking as `println!` would.
    ///
    /// # Errors
    ///
    /// Returns any other stdout write or flush error.
    fn output_events(&self, events: &[FileEvent]) -> Result<()> {
        let mut out = std::io::BufWriter::new(std::io::stdout().lock());
        let written = events
            .iter()
            .try_for_each(|event| {
                let line = if self.local_time {
                    event.to_jsonl_string_local()
                } else {
                    event.to_jsonl_string()
                };
                writeln!(out, "{line}")
            })
            .and_then(|()| out.flush());
        match written {
            Err(err) if err.kind() == std::io::ErrorKind::BrokenPipe => Ok(()),
            other => other.context("Failed to write events to stdout"),
        }
    }

    /// Lists the log files the query should read.
    ///
    /// With a command filter, this is that command's log file if it exists as a regular
    /// file. Otherwise it is every regular file in the log directory whose name ends in
    /// `_log.jsonl`, sorted by path. A missing log directory yields an empty list.
    ///
    /// # Errors
    ///
    /// Returns an error if the log directory cannot be listed.
    pub fn resolve_log_files(&self) -> Result<Vec<PathBuf>> {
        let log_dir = &self.log_dir;
        if !log_dir.exists() {
            return Ok(Vec::new());
        }
        if let Some(cmd) = &self.cmd_filter {
            let log_path = log_dir.join(cmd_to_log_name(cmd));
            // `is_file` (not `exists`) matches the directory scan below, so a directory
            // with the log's name is not handed to the reader.
            return Ok(if log_path.is_file() {
                vec![log_path]
            } else {
                Vec::new()
            });
        }
        let entries = fs::read_dir(log_dir)
            .with_context(|| format!("Failed to read log directory {}", log_dir.display()))?;
        let mut files = Vec::new();
        for entry in entries {
            let path = entry?.path();
            let is_log_name = path
                .file_name()
                .and_then(|name| name.to_str())
                .is_some_and(|name| name.ends_with("_log.jsonl"));
            if is_log_name && path.is_file() {
                files.push(path);
            }
        }
        files.sort();
        Ok(files)
    }
}