use super::{format, io, transform};
use anyhow::Result;
use chrono::{DateTime, Local};
use serde_json::Value;
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
/// List every MapReduce job recorded in global storage for the current
/// repository, with a best-effort status summary for each.
///
/// # Errors
/// Fails if the current directory, the repo name, or the storage root
/// cannot be resolved, or if the events directory cannot be read.
///
/// Returns an empty list when the repository has no events directory.
/// A job whose event files cannot be parsed still appears in the result
/// with `JobStatus::Unknown` and zeroed counters.
pub fn get_available_jobs() -> Result<Vec<format::JobInfo>> {
    let current_dir = std::env::current_dir()?;
    // NOTE: argument was corrupted to "¤t_dir" (mojibake of "&current_dir"); restored.
    let repo_name = crate::storage::extract_repo_name(&current_dir)?;
    let global_base = crate::storage::get_default_storage_dir()?;
    let global_events_dir = global_base.join("events").join(&repo_name);
    // No events directory means no jobs have ever run for this repo.
    if !global_events_dir.exists() {
        return Ok(Vec::new());
    }
    let entries = fs::read_dir(&global_events_dir)?;
    let jobs: Vec<format::JobInfo> = entries
        .filter_map(|e| e.ok())
        .filter(|e| e.path().is_dir())
        .map(|entry| {
            // Each subdirectory under the repo's events dir is named after a job id.
            let job_id = entry.file_name().to_string_lossy().to_string();
            // Unreadable job directories degrade to an Unknown placeholder
            // instead of failing the whole listing.
            read_job_status(&global_events_dir.join(&job_id)).unwrap_or_else(|_| format::JobInfo {
                id: job_id.clone(),
                status: format::JobStatus::Unknown,
                start_time: None,
                end_time: None,
                success_count: 0,
                failure_count: 0,
            })
        })
        .collect();
    Ok(jobs)
}
/// Reconstruct a job's status summary by replaying every event recorded
/// under `job_events_dir`.
///
/// The job id is taken from the directory name (falling back to
/// `"unknown"` when the path has no UTF-8 file-name component).
///
/// # Errors
/// Fails if the event files cannot be listed or read; individual lines
/// that are not valid JSON are silently skipped.
pub fn read_job_status(job_events_dir: &Path) -> Result<format::JobInfo> {
    let job_id = job_events_dir
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("unknown")
        .to_string();

    // Start from an "empty" summary and let each event mutate it in order.
    let mut info = format::JobInfo {
        id: job_id,
        status: format::JobStatus::Unknown,
        start_time: None,
        end_time: None,
        success_count: 0,
        failure_count: 0,
    };

    for file in io::find_event_files(job_events_dir)? {
        let content = fs::read_to_string(&file)?;
        // Each non-blank line is expected to hold one JSON event.
        for line in content.lines().filter(|l| !l.trim().is_empty()) {
            if let Ok(event) = serde_json::from_str::<Value>(line) {
                process_event_for_status(
                    &event,
                    &mut info.status,
                    &mut info.start_time,
                    &mut info.end_time,
                    &mut info.success_count,
                    &mut info.failure_count,
                );
            }
        }
    }

    Ok(info)
}
pub fn process_event_for_status(
event: &Value,
status: &mut format::JobStatus,
start_time: &mut Option<DateTime<Local>>,
end_time: &mut Option<DateTime<Local>>,
success_count: &mut u64,
failure_count: &mut u64,
) {
if event.get("JobStarted").is_some() {
*status = format::JobStatus::InProgress;
if let Some(ts) = transform::extract_timestamp(event) {
*start_time = Some(ts.with_timezone(&Local));
}
} else if let Some(completed) = event.get("JobCompleted") {
*status = format::JobStatus::Completed;
if let Some(ts) = transform::extract_timestamp(event) {
*end_time = Some(ts.with_timezone(&Local));
}
if let Some(s) = completed.get("success_count").and_then(|v| v.as_u64()) {
*success_count = s;
}
if let Some(f) = completed.get("failure_count").and_then(|v| v.as_u64()) {
*failure_count = f;
}
} else if event.get("JobFailed").is_some() {
*status = format::JobStatus::Failed;
if let Some(ts) = transform::extract_timestamp(event) {
*end_time = Some(ts.with_timezone(&Local));
}
}
}
/// Display event statistics aggregated across every job in global storage,
/// grouped by `group_by` and rendered in `output_format`.
pub async fn show_aggregated_stats(group_by: String, output_format: String) -> Result<()> {
    let files = io::get_all_event_files()?;
    if files.is_empty() {
        println!("No events found in global storage.");
        Ok(())
    } else {
        let events = io::read_events_from_files(&files)?;
        let (stats, total) = transform::calculate_event_statistics(events.into_iter(), &group_by);
        format::display_statistics_with_format(
            &transform::sort_statistics_by_count(stats),
            total,
            &group_by,
            &output_format,
            true,
        )
    }
}
/// Search all events in global storage for `pattern`, optionally
/// restricting the match to the given `fields`, and display the results.
pub async fn search_aggregated_events(pattern: String, fields: Option<Vec<String>>) -> Result<()> {
    let files = io::get_all_event_files()?;
    if files.is_empty() {
        println!("No events found in global storage.");
        Ok(())
    } else {
        let events = io::read_events_from_files(&files)?;
        let matches = transform::search_events_with_pattern(&events, &pattern, fields.as_deref())?;
        format::display_search_results(&matches, true)
    }
}
/// Export every event from global storage in the requested `format`
/// (`json`, `csv`, or `markdown`), writing to `output` when given and to
/// stdout otherwise.
///
/// # Errors
/// Fails on I/O errors, on any line that is not valid JSON (unlike
/// `read_job_status`, malformed lines here are a hard error), and on an
/// unsupported format name.
pub async fn export_aggregated_events(format: String, output: Option<PathBuf>) -> Result<()> {
    let event_files = io::get_all_event_files()?;
    if event_files.is_empty() {
        println!("No events found in global storage.");
        return Ok(());
    }

    // Stream each file line-by-line, collecting one JSON value per
    // non-blank line.
    let mut events = Vec::new();
    for path in event_files {
        let reader = BufReader::new(fs::File::open(&path)?);
        for line in reader.lines() {
            let line = line?;
            if line.trim().is_empty() {
                continue;
            }
            events.push(serde_json::from_str::<Value>(&line)?);
        }
    }

    let rendered = match format.as_str() {
        "json" => format::export_as_json(&events)?,
        "csv" => format::export_as_csv(&events)?,
        "markdown" => format::export_as_markdown(&events)?,
        other => return Err(anyhow::anyhow!("Unsupported format: {}", other)),
    };

    match output {
        Some(path) => {
            fs::write(path, rendered)?;
            println!(
                "Events exported successfully ({} events from all jobs)",
                events.len()
            );
        }
        None => println!("{}", rendered),
    }
    Ok(())
}
/// Decide whether a retention/analysis command should operate on global
/// storage: true when `--all-jobs` was requested or a specific job id was
/// supplied; otherwise the repo-local event log is used.
pub fn should_analyze_global_storage(all_jobs: bool, job_id: Option<&str>) -> bool {
    // Either flag independently selects global storage.
    job_id.is_some() || all_jobs
}
/// Compute what the retention `policy` would remove or archive.
///
/// When `--all-jobs` or a specific `job_id` selects global storage, the
/// analysis aggregates over the matching job directories; otherwise it
/// runs against the repo-local `.prodigy/events/mapreduce_events.jsonl`.
/// A missing events directory/file yields a default (empty) analysis.
///
/// # Errors
/// Fails if the repo name cannot be resolved, directories cannot be
/// listed, or the underlying retention analysis fails.
pub async fn analyze_retention_targets(
    all_jobs: bool,
    job_id: Option<&str>,
    policy: &crate::cook::execution::events::retention::RetentionPolicy,
) -> Result<crate::cook::execution::events::retention::RetentionAnalysis> {
    use crate::cook::execution::events::retention::{RetentionAnalysis, RetentionManager};
    let mut analysis_total = RetentionAnalysis::default();
    if should_analyze_global_storage(all_jobs, job_id) {
        let current_dir = std::env::current_dir()?;
        // NOTE: argument was corrupted to "¤t_dir" (mojibake of "&current_dir"); restored.
        let repo_name = crate::storage::extract_repo_name(&current_dir)?;
        let global_events_dir = io::build_global_events_path(&repo_name)?;
        if global_events_dir.exists() {
            let job_dirs = get_job_directories(&global_events_dir, job_id)?;
            analysis_total = aggregate_job_retention(job_dirs, policy).await?;
        }
    } else {
        // Repo-local fallback used when neither global-storage flag is set.
        let local_file = PathBuf::from(".prodigy/events/mapreduce_events.jsonl");
        if local_file.exists() {
            let retention = RetentionManager::new(policy.clone(), local_file);
            analysis_total = retention.analyze_retention().await?;
        }
    }
    Ok(analysis_total)
}
/// Sum the retention analysis over every event file in every given job
/// directory. Archive totals are only accumulated when the policy has
/// archiving enabled.
pub async fn aggregate_job_retention(
    job_dirs: Vec<PathBuf>,
    policy: &crate::cook::execution::events::retention::RetentionPolicy,
) -> Result<crate::cook::execution::events::retention::RetentionAnalysis> {
    use crate::cook::execution::events::retention::{RetentionAnalysis, RetentionManager};

    let mut combined = RetentionAnalysis::default();
    for dir in job_dirs {
        for event_file in io::find_event_files(&dir)? {
            // One manager per file; each analysis contributes to the totals.
            let analysis = RetentionManager::new(policy.clone(), event_file)
                .analyze_retention()
                .await?;
            combined.events_to_remove += analysis.events_to_remove;
            combined.space_to_save += analysis.space_to_save;
            if policy.archive_old_events {
                combined.events_to_archive += analysis.events_to_archive;
            }
        }
    }
    Ok(combined)
}
/// Resolve which job directories to operate on under `global_events_dir`.
///
/// With a specific `job_id`, returns just that job's directory (or an
/// empty list if it does not exist). Without one, returns every
/// subdirectory; unreadable entries are skipped.
pub fn get_job_directories(global_events_dir: &Path, job_id: Option<&str>) -> Result<Vec<PathBuf>> {
    match job_id {
        Some(id) => {
            let dir = global_events_dir.join(id);
            Ok(if dir.exists() { vec![dir] } else { Vec::new() })
        }
        None => {
            let mut dirs = Vec::new();
            // `.flatten()` drops entries that errored while reading the dir.
            for entry in fs::read_dir(global_events_dir)?.flatten() {
                let path = entry.path();
                if path.is_dir() {
                    dirs.push(path);
                }
            }
            Ok(dirs)
        }
    }
}
/// Unit tests for the pure helpers in this module: the global-storage
/// selection predicate and the event-to-status folding logic.
#[cfg(test)]
mod tests {
use super::*;
// `--all-jobs` alone selects global storage.
#[test]
fn test_should_analyze_global_storage_with_all_jobs() {
assert!(should_analyze_global_storage(true, None));
}
// A specific job id alone also selects global storage.
#[test]
fn test_should_analyze_global_storage_with_job_id() {
assert!(should_analyze_global_storage(false, Some("job-123")));
}
// With neither flag, the repo-local event log is used instead.
#[test]
fn test_should_analyze_global_storage_neither_flag() {
assert!(!should_analyze_global_storage(false, None));
}
// A JobStarted event flips the status to InProgress and records a start time.
#[test]
fn test_process_event_for_status_job_started() {
use serde_json::json;
let event = json!({
"JobStarted": {
"job_id": "test-123",
"timestamp": "2024-01-01T00:00:00Z"
}
});
let mut status = format::JobStatus::Unknown;
let mut start_time = None;
let mut end_time = None;
let mut success_count = 0;
let mut failure_count = 0;
process_event_for_status(
&event,
&mut status,
&mut start_time,
&mut end_time,
&mut success_count,
&mut failure_count,
);
assert!(matches!(status, format::JobStatus::InProgress));
assert!(start_time.is_some());
}
// A JobCompleted event records the end time and the final counters.
#[test]
fn test_process_event_for_status_job_completed() {
use serde_json::json;
let event = json!({
"JobCompleted": {
"job_id": "test-123",
"success_count": 5,
"failure_count": 2,
"timestamp": "2024-01-01T01:00:00Z"
}
});
let mut status = format::JobStatus::InProgress;
let mut start_time = None;
let mut end_time = None;
let mut success_count = 0;
let mut failure_count = 0;
process_event_for_status(
&event,
&mut status,
&mut start_time,
&mut end_time,
&mut success_count,
&mut failure_count,
);
assert!(matches!(status, format::JobStatus::Completed));
assert!(end_time.is_some());
assert_eq!(success_count, 5);
assert_eq!(failure_count, 2);
}
// A JobFailed event marks the job failed and records an end time.
#[test]
fn test_process_event_for_status_job_failed() {
use serde_json::json;
let event = json!({
"JobFailed": {
"job_id": "test-123",
"error": "Something went wrong",
"timestamp": "2024-01-01T01:00:00Z"
}
});
let mut status = format::JobStatus::InProgress;
let mut start_time = None;
let mut end_time = None;
let mut success_count = 0;
let mut failure_count = 0;
process_event_for_status(
&event,
&mut status,
&mut start_time,
&mut end_time,
&mut success_count,
&mut failure_count,
);
assert!(matches!(status, format::JobStatus::Failed));
assert!(end_time.is_some());
}
}