#![allow(dead_code)]
use crate::api::client::GdeltClient;
use crate::config;
use crate::data::masterfile::{FileEntry, FileType};
use crate::db::cache::{CacheDb, DownloadRecord};
use crate::error::{GdeltError, Result};
use chrono::Utc;
use flate2::read::GzDecoder;
use futures::stream::{self, StreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::path::{Path, PathBuf};
use tokio::sync::Semaphore;
use tracing::{debug, info, instrument, warn};
use zip::ZipArchive;
/// Downloads remote data files into a local directory and unpacks them there.
pub struct Downloader {
/// Client used to fetch the raw bytes of each file by URL.
client: GdeltClient,
/// Directory that receives the downloaded archives and the extracted files.
download_dir: PathBuf,
/// Upper bound on the number of downloads that run at the same time.
parallel: usize,
}
impl Downloader {
/// Builds a downloader that stores files in the `downloads` folder of the
/// configured data directory, creating that folder if it is missing.
///
/// # Errors
///
/// Returns `GdeltError::Download` when `config::data_dir()` yields no
/// directory, and propagates the I/O error if the folder cannot be created.
pub fn new(client: GdeltClient, parallel: usize) -> Result<Self> {
    let data_root = match config::data_dir() {
        Some(root) => root,
        None => {
            return Err(GdeltError::Download(
                "Could not determine data directory".into(),
            ));
        }
    };

    let download_dir = data_root.join("downloads");
    std::fs::create_dir_all(&download_dir)?;

    let downloader = Self {
        client,
        download_dir,
        parallel,
    };
    Ok(downloader)
}
pub fn with_download_dir(mut self, dir: PathBuf) -> Self {
self.download_dir = dir;
self
}
/// Downloads every entry of `files` concurrently and records each success in `cache`.
///
/// At most `parallel` downloads (but always at least one) are in flight at a
/// time. When `progress` is given, one overall bar plus one bar per in-flight
/// file is added to it.
///
/// The returned vector holds one [`DownloadResult`] per input entry, in
/// completion order rather than input order. A failed download does not make
/// this function fail: it is reported through the `success` / `error` fields
/// of its result. Failures to write a cache record are logged as warnings and
/// otherwise ignored.
#[instrument(skip(self, files, cache, progress))]
pub async fn download_files(
    &self,
    files: &[FileEntry],
    cache: &CacheDb,
    progress: Option<&MultiProgress>,
) -> Result<Vec<DownloadResult>> {
    info!("Starting download of {} files", files.len());

    // A limit of zero would mean a semaphore without permits and a buffer
    // without slots, so no download could ever start and this call would
    // never finish. Treat zero as "one at a time".
    let parallel = self.parallel.max(1);

    let semaphore = std::sync::Arc::new(Semaphore::new(parallel));
    let client = &self.client;
    let download_dir = &self.download_dir;

    let overall_pb = progress.map(|mp| {
        let pb = mp.add(ProgressBar::new(files.len() as u64));
        pb.set_style(
            ProgressStyle::default_bar()
                .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} files ({eta})")
                .expect("static progress template is valid")
                .progress_chars("#>-"),
        );
        pb
    });

    let results: Vec<DownloadResult> = stream::iter(files)
        .map(|file| {
            let permit = semaphore.clone();
            let client = client.clone();
            let download_dir = download_dir.clone();

            // The closure runs only when the buffer pulls the next entry, so
            // a per-file bar exists only while its download is in flight.
            let file_pb = progress.map(|mp| {
                let pb = mp.add(ProgressBar::new(file.size));
                pb.set_style(
                    ProgressStyle::default_bar()
                        .template("{spinner:.green} {msg} [{bar:30.cyan/blue}] {bytes}/{total_bytes}")
                        .expect("static progress template is valid")
                        .progress_chars("#>-"),
                );
                pb.set_message(file.filename().to_string());
                pb
            });
            let overall = overall_pb.clone();

            async move {
                // The semaphore is owned by this function and never closed,
                // so acquiring a permit cannot fail.
                let _permit = permit
                    .acquire()
                    .await
                    .expect("download semaphore is never closed");

                let result =
                    Self::download_single(&client, file, &download_dir, file_pb.as_ref()).await;

                if let Some(ref pb) = file_pb {
                    pb.finish_and_clear();
                }
                if let Some(ref pb) = overall {
                    pb.inc(1);
                }
                result
            }
        })
        .buffer_unordered(parallel)
        .collect()
        .await;

    if let Some(pb) = overall_pb {
        pb.finish_with_message("Download complete");
    }

    // Only successful downloads are written to the cache database.
    for result in results.iter().filter(|r| r.success) {
        let record = DownloadRecord {
            file_id: result.file_id.clone(),
            file_type: result.file_type.as_str().to_string(),
            remote_url: result.url.clone(),
            local_path: result.local_path.to_string_lossy().to_string(),
            file_size: Some(result.size as i64),
            checksum: result.hash.clone(),
            date_from: result.date_int,
            date_to: result.date_int,
            downloaded_at: Utc::now(),
            imported_at: None,
        };
        if let Err(e) = cache.record_download(&record) {
            warn!("Failed to record download: {}", e);
        }
    }

    Ok(results)
}
async fn download_single(
client: &GdeltClient,
file: &FileEntry,
download_dir: &Path,
progress: Option<&ProgressBar>,
) -> DownloadResult {
let filename = file.filename();
let compressed_path = download_dir.join(filename);
let extracted_name = filename
.replace(".zip", "")
.replace(".gz", "")
.replace(".CSV", ".csv");
let extracted_path = download_dir.join(&extracted_name);
if extracted_path.exists() {
debug!("File already exists: {}", extracted_path.display());
return DownloadResult {
file_id: file.file_id(),
url: file.url.clone(),
local_path: extracted_path,
success: true,
error: None,
size: file.size,
hash: Some(file.hash.clone()),
file_type: file.file_type,
date_int: file.date_int(),
};
}
debug!("Downloading: {}", file.url);
let bytes = match client.get_bytes(&file.url).await {
Ok(b) => b,
Err(e) => {
return DownloadResult {
file_id: file.file_id(),
url: file.url.clone(),
local_path: compressed_path,
success: false,
error: Some(e.to_string()),
size: 0,
hash: None,
file_type: file.file_type,
date_int: file.date_int(),
};
}
};
if let Some(pb) = progress {
pb.set_position(bytes.len() as u64);
}
if let Err(e) = std::fs::write(&compressed_path, &bytes) {
return DownloadResult {
file_id: file.file_id(),
url: file.url.clone(),
local_path: compressed_path,
success: false,
error: Some(format!("Failed to write file: {}", e)),
size: bytes.len() as u64,
hash: None,
file_type: file.file_type,
date_int: file.date_int(),
};
}
let extract_result = if filename.ends_with(".zip") {
Self::extract_zip(&compressed_path, &extracted_path)
} else if filename.ends_with(".gz") {
Self::extract_gzip(&compressed_path, &extracted_path)
} else {
Ok(())
};
if let Err(e) = extract_result {
return DownloadResult {
file_id: file.file_id(),
url: file.url.clone(),
local_path: compressed_path,
success: false,
error: Some(format!("Extraction failed: {}", e)),
size: bytes.len() as u64,
hash: Some(file.hash.clone()),
file_type: file.file_type,
date_int: file.date_int(),
};
}
let _ = std::fs::remove_file(&compressed_path);
DownloadResult {
file_id: file.file_id(),
url: file.url.clone(),
local_path: extracted_path,
success: true,
error: None,
size: bytes.len() as u64,
hash: Some(file.hash.clone()),
file_type: file.file_type,
date_int: file.date_int(),
}
}
/// Unpacks the first entry of the ZIP archive at `zip_path` into `output_path`.
///
/// Only the entry at index 0 is extracted; any further entries are ignored.
/// If copying the entry fails, the partially written output file is removed
/// so that no truncated file is left behind.
///
/// # Errors
///
/// Returns `GdeltError::Download` when the archive is invalid, empty, or its
/// first entry cannot be read, and propagates I/O errors from opening,
/// creating, or copying the files.
fn extract_zip(zip_path: &Path, output_path: &Path) -> Result<()> {
    let file = std::fs::File::open(zip_path)?;
    let mut archive = ZipArchive::new(file)
        .map_err(|e| GdeltError::Download(format!("Invalid ZIP file: {}", e)))?;
    if archive.is_empty() {
        return Err(GdeltError::Download("Empty ZIP file".into()));
    }

    let mut inner = archive
        .by_index(0)
        .map_err(|e| GdeltError::Download(format!("Failed to read ZIP entry: {}", e)))?;

    let mut output = std::fs::File::create(output_path)?;
    if let Err(e) = std::io::copy(&mut inner, &mut output) {
        // Close the handle before deleting; some platforms refuse to remove
        // a file that is still open.
        drop(output);
        let _ = std::fs::remove_file(output_path);
        return Err(e.into());
    }
    Ok(())
}
/// Decompresses the gzip file at `gz_path` into `output_path`.
///
/// If decompression fails part-way, the partially written output file is
/// removed so that no truncated file is left behind.
///
/// # Errors
///
/// Propagates I/O errors from opening the input, creating the output, or
/// decoding/copying the data.
fn extract_gzip(gz_path: &Path, output_path: &Path) -> Result<()> {
    let file = std::fs::File::open(gz_path)?;
    let mut decoder = GzDecoder::new(file);

    let mut output = std::fs::File::create(output_path)?;
    if let Err(e) = std::io::copy(&mut decoder, &mut output) {
        // Close the handle before deleting; some platforms refuse to remove
        // a file that is still open.
        drop(output);
        let _ = std::fs::remove_file(output_path);
        return Err(e.into());
    }
    Ok(())
}
/// Returns the directory that downloaded and extracted files are stored in.
pub fn download_dir(&self) -> &Path {
    self.download_dir.as_path()
}
/// Lists the regular files in the download directory whose extension is
/// exactly `csv`, sorted by path.
///
/// # Errors
///
/// Propagates any I/O error raised while reading the directory.
pub fn list_downloaded(&self) -> Result<Vec<PathBuf>> {
    let mut csv_paths: Vec<PathBuf> = Vec::new();

    for dir_entry in std::fs::read_dir(&self.download_dir)? {
        let candidate = dir_entry?.path();
        if !candidate.is_file() {
            continue;
        }

        let is_csv = match candidate.extension() {
            Some(ext) => ext == "csv",
            None => false,
        };
        if is_csv {
            csv_paths.push(candidate);
        }
    }

    csv_paths.sort();
    Ok(csv_paths)
}
/// Deletes regular files from the download directory and returns how many
/// were removed.
///
/// With `Some(Events)`, `Some(Gkg)` or `Some(Mentions)` only files whose name
/// contains `export`, `gkg` or `mentions` respectively are removed. With
/// `Some(Unknown)` or `None` every regular file in the directory is removed.
///
/// # Errors
///
/// Propagates the first I/O error hit while reading the directory or
/// removing a file; files removed before that point stay removed.
pub fn delete_files(&self, file_type: Option<FileType>) -> Result<u64> {
    // Substring a file name must contain to be selected; `None` selects all.
    let marker: Option<&str> = match file_type {
        Some(FileType::Events) => Some("export"),
        Some(FileType::Gkg) => Some("gkg"),
        Some(FileType::Mentions) => Some("mentions"),
        Some(FileType::Unknown) | None => None,
    };

    let mut removed: u64 = 0;
    for dir_entry in std::fs::read_dir(&self.download_dir)? {
        let target = dir_entry?.path();
        if !target.is_file() {
            continue;
        }

        // Names that are not valid UTF-8 are treated as the empty string.
        let name = target.file_name().and_then(|n| n.to_str()).unwrap_or("");
        let selected = match marker {
            Some(needle) => name.contains(needle),
            None => true,
        };
        if selected {
            std::fs::remove_file(&target)?;
            removed += 1;
        }
    }

    Ok(removed)
}
}
/// Outcome of attempting to download (and unpack) a single file.
#[derive(Debug, Clone)]
pub struct DownloadResult {
/// Identifier of the file, as reported by the source entry.
pub file_id: String,
/// Remote URL the file was requested from.
pub url: String,
/// Path of the extracted file on success; on failure, the path of the
/// downloaded (compressed) file instead.
pub local_path: PathBuf,
/// `true` when the file is available locally, `false` when any step failed.
pub success: bool,
/// Human-readable failure description; `None` on success.
pub error: Option<String>,
/// Number of bytes downloaded. When the file already existed locally this is
/// the size recorded in the source entry; it is 0 when the download failed.
pub size: u64,
/// Hash copied from the source entry; `None` when the download or the write
/// to disk failed.
pub hash: Option<String>,
/// Kind of file, copied from the source entry.
pub file_type: FileType,
/// Date value of the source entry, if it has one.
pub date_int: Option<i64>,
}
impl DownloadResult {
/// Returns `true` when the download succeeded (the value of the `success` field).
pub fn is_success(&self) -> bool {
self.success
}
}