use crate::api::client::GdeltClient;
use crate::cli::args::{DataCommand, DownloadCommand, GlobalArgs, DataType};
use crate::cli::output::OutputWriter;
use crate::data::downloader::Downloader;
use crate::data::masterfile::{FileType, MasterFileList};
use crate::db::cache::CacheDb;
use crate::db::duckdb::AnalyticsDb;
use crate::error::{ExitStatus, GdeltError, Result};
use chrono::NaiveDate;
use indicatif::MultiProgress;
use serde_json::json;
use std::time::Duration;
/// Entry point for the `data` subcommand family.
///
/// Routes each [`DataCommand`] variant to its dedicated handler, unpacking the
/// variant's arguments where the handler takes them individually.
pub async fn handle_data(cmd: DataCommand, global: &GlobalArgs) -> Result<ExitStatus> {
    match cmd {
        DataCommand::Download(download) => handle_download(download, global).await,
        DataCommand::Sync(sync) => {
            handle_sync(sync.data_type, sync.quick, sync.days, global).await
        }
        DataCommand::Status(status) => handle_status(status.detailed, global).await,
        DataCommand::List(listing) => handle_list(listing.data_type, global).await,
        DataCommand::Delete(deletion) => handle_delete(deletion, global).await,
    }
}
async fn handle_download(cmd: DownloadCommand, global: &GlobalArgs) -> Result<ExitStatus> {
let client = GdeltClient::with_timeout(Duration::from_secs(global.timeout))?;
let cache = CacheDb::open()?;
let output = OutputWriter::new(global);
let (file_type, start, end, parallel) = match cmd {
DownloadCommand::Events(args) => (FileType::Events, args.start, args.end, args.parallel),
DownloadCommand::Gkg(args) => (FileType::Gkg, args.start, args.end, args.parallel),
DownloadCommand::Mentions(args) => (FileType::Mentions, args.start, args.end, args.parallel),
};
if global.dry_run {
let list = MasterFileList::fetch(&client, false).await?;
let files = list.filter_by_type(file_type);
let start_date = start.as_ref().and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok());
let end_date = end.as_ref().and_then(|e| NaiveDate::parse_from_str(e, "%Y-%m-%d").ok());
let filtered: Vec<_> = files.iter()
.filter(|f| {
if let Some(dt) = f.datetime {
let date = dt.date();
let after_start = start_date.map_or(true, |s| date >= s);
let before_end = end_date.map_or(true, |e| date <= e);
after_start && before_end
} else {
false
}
})
.collect();
let total_size: u64 = filtered.iter().map(|f| f.size).sum();
output.write_value(&json!({
"dry_run": true,
"file_type": file_type.as_str(),
"file_count": filtered.len(),
"total_size_bytes": total_size,
"total_size_mb": total_size / (1024 * 1024),
}))?;
return Ok(ExitStatus::Success);
}
let list = MasterFileList::fetch(&client, false).await?;
let start_date = start.as_ref().and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok());
let end_date = end.as_ref().and_then(|e| NaiveDate::parse_from_str(e, "%Y-%m-%d").ok());
let files: Vec<_> = list.filter_by_type(file_type)
.into_iter()
.filter(|f| {
if let Some(dt) = f.datetime {
let date = dt.date();
let after_start = start_date.map_or(true, |s| date >= s);
let before_end = end_date.map_or(true, |e| date <= e);
after_start && before_end
} else {
false
}
})
.cloned()
.collect();
if files.is_empty() {
output.write_value(&json!({
"status": "no_files",
"message": "No files found matching criteria",
}))?;
return Ok(ExitStatus::Success);
}
let downloader = Downloader::new(client, parallel as usize)?;
let mp = if !global.quiet {
Some(MultiProgress::new())
} else {
None
};
let results = downloader.download_files(&files, &cache, mp.as_ref()).await?;
let successful = results.iter().filter(|r| r.success).count();
let failed = results.iter().filter(|r| !r.success).count();
let total_size: u64 = results.iter().filter(|r| r.success).map(|r| r.size).sum();
output.write_value(&json!({
"status": "complete",
"file_type": file_type.as_str(),
"downloaded": successful,
"failed": failed,
"total_size_bytes": total_size,
}))?;
Ok(ExitStatus::Success)
}
async fn handle_sync(data_type: Option<DataType>, quick: bool, days: Option<u32>, global: &GlobalArgs) -> Result<ExitStatus> {
let client = GdeltClient::with_timeout(Duration::from_secs(global.timeout))?;
let cache = CacheDb::open()?;
let output = OutputWriter::new(global);
let days_to_sync = if quick {
1 } else {
days.unwrap_or(0) };
let file_types: Vec<FileType> = match data_type {
Some(DataType::Events) => vec![FileType::Events],
Some(DataType::Gkg) => vec![FileType::Gkg],
Some(DataType::Mentions) => vec![FileType::Mentions],
Some(DataType::All) | None => vec![FileType::Events, FileType::Gkg, FileType::Mentions],
};
if quick && !global.quiet {
eprintln!("Quick sync: fetching last 24 hours of data...");
} else if days.is_some() && !global.quiet {
eprintln!("Syncing last {} days of data...", days_to_sync);
}
let files: Vec<_> = if days_to_sync > 0 {
let list = MasterFileList::fetch(&client, false).await?;
let cutoff = chrono::Utc::now() - chrono::Duration::days(days_to_sync as i64);
let cutoff_date = cutoff.date_naive();
list.entries.iter()
.filter(|f| file_types.contains(&f.file_type))
.filter(|f| {
if let Some(dt) = f.datetime {
dt.date() >= cutoff_date
} else {
false
}
})
.cloned()
.collect()
} else {
let list = MasterFileList::fetch_latest(&client).await?;
list.entries.iter()
.filter(|f| file_types.contains(&f.file_type))
.cloned()
.collect()
};
if files.is_empty() {
output.write_value(&json!({
"status": "up_to_date",
"message": "No new files available",
}))?;
return Ok(ExitStatus::Success);
}
if global.dry_run {
let total_size: u64 = files.iter().map(|f| f.size).sum();
output.write_value(&json!({
"dry_run": true,
"mode": if quick { "quick" } else if days.is_some() { "days" } else { "latest" },
"days": days_to_sync,
"files_available": files.len(),
"total_size_mb": total_size / (1024 * 1024),
"files": files.iter().take(10).map(|f| json!({
"filename": f.filename(),
"type": f.file_type.as_str(),
"size": f.size,
})).collect::<Vec<_>>(),
}))?;
return Ok(ExitStatus::Success);
}
let downloader = Downloader::new(client, 4)?;
let mp = if !global.quiet {
Some(MultiProgress::new())
} else {
None
};
let results = downloader.download_files(&files, &cache, mp.as_ref()).await?;
let successful = results.iter().filter(|r| r.success).count();
let failed = results.iter().filter(|r| !r.success).count();
let total_size: u64 = results.iter().filter(|r| r.success).map(|r| r.size).sum();
output.write_value(&json!({
"status": "synced",
"mode": if quick { "quick" } else if days.is_some() { "days" } else { "latest" },
"files_downloaded": successful,
"files_failed": failed,
"total_size_bytes": total_size,
}))?;
Ok(ExitStatus::Success)
}
/// Handles `data status`.
///
/// Always reads the download status and cache statistics from the cache DB.
/// Without `detailed`, only the download status is written. With `detailed`,
/// the analytics database is also opened and all three sections are written
/// as one JSON object.
async fn handle_status(detailed: bool, global: &GlobalArgs) -> Result<ExitStatus> {
    let cache = CacheDb::open()?;
    let output = OutputWriter::new(global);
    let downloads = cache.download_status()?;
    let cache_summary = cache.stats()?;

    if !detailed {
        output.write_value(&downloads)?;
        return Ok(ExitStatus::Success);
    }

    let analytics = AnalyticsDb::open()?;
    let analytics_summary = analytics.stats()?;
    output.write_value(&json!({
        "downloads": downloads,
        "cache": cache_summary,
        "database": analytics_summary,
    }))?;
    Ok(ExitStatus::Success)
}
/// Handles `data list`: writes the downloads recorded in the cache.
///
/// `data_type` narrows the listing to one file kind. `None` or `All` lists
/// every recorded download. The output carries both the count and the records.
async fn handle_list(data_type: Option<DataType>, global: &GlobalArgs) -> Result<ExitStatus> {
    let cache = CacheDb::open()?;
    let output = OutputWriter::new(global);

    // A `None` filter asks the cache for every file type.
    let type_filter = data_type.and_then(|kind| match kind {
        DataType::Events => Some("events"),
        DataType::Gkg => Some("gkg"),
        DataType::Mentions => Some("mentions"),
        DataType::All => None,
    });

    let records = cache.get_downloads(type_filter)?;
    let payload = json!({
        "count": records.len(),
        "downloads": records,
    });
    output.write_value(&payload)?;
    Ok(ExitStatus::Success)
}
/// Handles `data delete`: removes downloaded files, optionally for one type.
///
/// Requires either `--yes` (perform the deletion) or `--dry-run` (preview a
/// count). `DataType::All` maps to `None`, which is passed to
/// `Downloader::delete_files`.
///
/// # Errors
///
/// Returns [`GdeltError::Validation`] when neither `--yes` nor `--dry-run` is
/// given. Client, downloader and output errors are propagated.
async fn handle_delete(args: crate::cli::args::DataDeleteArgs, global: &GlobalArgs) -> Result<ExitStatus> {
    let output = OutputWriter::new(global);

    // Refuse to act unless the user either confirmed or asked for a preview.
    let confirmed_or_preview = global.yes || global.dry_run;
    if !confirmed_or_preview {
        return Err(GdeltError::Validation(
            "Use --yes to confirm deletion or --dry-run to preview".into(),
        ));
    }

    let target = match args.data_type {
        DataType::Events => Some(FileType::Events),
        DataType::Gkg => Some(FileType::Gkg),
        DataType::Mentions => Some(FileType::Mentions),
        DataType::All => None,
    };

    let downloader = Downloader::new(GdeltClient::new()?, 1)?;

    if !global.dry_run {
        let removed = downloader.delete_files(target)?;
        output.write_value(&json!({
            "status": "deleted",
            "files_removed": removed,
        }))?;
        return Ok(ExitStatus::Success);
    }

    // NOTE(review): the preview counts everything `list_downloaded()` returns
    // and does not apply `target`, so for a single data type it may not match
    // what `delete_files(target)` would remove. Confirm and filter by type.
    let listed = downloader.list_downloaded()?;
    output.write_value(&json!({
        "dry_run": true,
        "files_to_delete": listed.len(),
    }))?;
    Ok(ExitStatus::Success)
}