use super::transform;
use super::EventFilter;
use anyhow::Result;
use serde_json::Value;
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use tracing::info;
pub fn find_event_files(dir: &Path) -> Result<Vec<PathBuf>> {
if !dir.exists() {
return Ok(Vec::new());
}
let mut files: Vec<_> = fs::read_dir(dir)?
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext == "jsonl")
.unwrap_or(false)
})
.map(|e| e.path())
.collect();
files.sort();
Ok(files)
}
pub fn get_all_event_files() -> Result<Vec<PathBuf>> {
let current_dir = std::env::current_dir()?;
let repo_name = crate::storage::extract_repo_name(¤t_dir)?;
let global_base = crate::storage::get_default_storage_dir()?;
let global_events_dir = global_base.join("events").join(&repo_name);
if !global_events_dir.exists() {
return Ok(Vec::new());
}
let mut all_files = Vec::new();
for entry in fs::read_dir(&global_events_dir)? {
let entry = entry?;
if entry.path().is_dir() {
let event_files = find_event_files(&entry.path())?;
all_files.extend(event_files);
}
}
all_files.sort();
Ok(all_files)
}
pub fn resolve_job_event_file(job_id: &str) -> Result<PathBuf> {
let current_dir = std::env::current_dir()?;
let repo_name = crate::storage::extract_repo_name(¤t_dir)?;
let global_base = crate::storage::get_default_storage_dir()?;
let job_events_dir = global_base.join("events").join(&repo_name).join(job_id);
if !job_events_dir.exists() {
return Err(anyhow::anyhow!("Job '{}' not found", job_id));
}
let event_files = find_event_files(&job_events_dir)?;
event_files
.into_iter()
.next_back()
.ok_or_else(|| anyhow::anyhow!("No event files found for job '{}'", job_id))
}
pub fn resolve_event_file_with_fallback(file: PathBuf, job_id: Option<&str>) -> Result<PathBuf> {
if file.exists() {
return Ok(file);
}
if let Some(job_id) = job_id {
if let Ok(resolved) = resolve_job_event_file(job_id) {
info!("Using global event file: {:?}", resolved);
return Ok(resolved);
}
}
Ok(file)
}
pub fn build_global_events_path(repo_name: &str) -> Result<PathBuf> {
let global_base = crate::storage::get_default_storage_dir()?;
Ok(global_base.join("events").join(repo_name))
}
pub fn determine_watch_path(file: &Path) -> PathBuf {
if file.exists() {
file.to_path_buf()
} else {
file.parent()
.map(|p| p.to_path_buf())
.unwrap_or_else(|| file.to_path_buf())
}
}
pub fn read_events_from_files(event_files: &[PathBuf]) -> Result<Vec<Value>> {
let mut all_events = Vec::new();
for file in event_files {
let content = fs::File::open(file)?;
let reader = BufReader::new(content);
for line in reader.lines() {
let line = line?;
if let Some(event) = transform::parse_event_line(&line) {
all_events.push(event);
}
}
}
Ok(all_events)
}
pub fn read_events_from_single_file(file: &PathBuf) -> Result<Vec<Value>> {
let file_handle = fs::File::open(file)?;
let reader = BufReader::new(file_handle);
let events = reader
.lines()
.map_while(Result::ok)
.filter_map(|line| transform::parse_event_line(&line))
.collect();
Ok(events)
}
pub fn read_and_filter_events(
file: &PathBuf,
filter: &EventFilter,
limit: usize,
) -> Result<Vec<Value>> {
let file_handle = fs::File::open(file)?;
let reader = BufReader::new(file_handle);
let events = reader
.lines()
.map_while(Result::ok)
.filter_map(|line| transform::parse_event_line(&line))
.filter(|event| filter.matches_event(event))
.take(limit)
.collect();
Ok(events)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::TempDir;
#[test]
fn test_find_event_files_empty_dir() {
let temp_dir = TempDir::new().unwrap();
let files = find_event_files(temp_dir.path()).unwrap();
assert!(files.is_empty());
}
#[test]
fn test_find_event_files_with_jsonl() {
let temp_dir = TempDir::new().unwrap();
fs::write(temp_dir.path().join("events1.jsonl"), "{}").unwrap();
fs::write(temp_dir.path().join("events2.jsonl"), "{}").unwrap();
fs::write(temp_dir.path().join("not-events.txt"), "{}").unwrap();
let files = find_event_files(temp_dir.path()).unwrap();
assert_eq!(files.len(), 2);
assert!(files[0].ends_with("events1.jsonl"));
assert!(files[1].ends_with("events2.jsonl"));
}
#[test]
fn test_find_event_files_nonexistent_dir() {
let files = find_event_files(Path::new("/nonexistent/path")).unwrap();
assert!(files.is_empty());
}
#[test]
fn test_determine_watch_path_existing_file() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.jsonl");
fs::write(&file_path, "{}").unwrap();
let watch_path = determine_watch_path(&file_path);
assert_eq!(watch_path, file_path);
}
#[test]
fn test_determine_watch_path_nonexistent_file() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("nonexistent.jsonl");
let watch_path = determine_watch_path(&file_path);
assert_eq!(watch_path, temp_dir.path());
}
#[test]
fn test_read_events_from_single_file() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("events.jsonl");
let mut file = fs::File::create(&file_path).unwrap();
writeln!(file, r#"{{"type":"test1"}}"#).unwrap();
writeln!(file, r#"{{"type":"test2"}}"#).unwrap();
let events = read_events_from_single_file(&file_path).unwrap();
assert_eq!(events.len(), 2);
}
#[test]
fn test_read_events_from_files() {
let temp_dir = TempDir::new().unwrap();
let file1 = temp_dir.path().join("events1.jsonl");
let file2 = temp_dir.path().join("events2.jsonl");
fs::write(&file1, r#"{"type":"test1"}"#).unwrap();
fs::write(&file2, r#"{"type":"test2"}"#).unwrap();
let events = read_events_from_files(&[file1, file2]).unwrap();
assert_eq!(events.len(), 2);
}
#[test]
fn test_build_global_events_path() {
let path = build_global_events_path("test-repo").unwrap();
assert!(path.ends_with("events/test-repo"));
}
}