//! gdelt 0.1.0
//!
//! CLI for GDELT Project - optimized for agentic usage with local data caching
//! Data file downloader for GDELT.
//!
//! Downloads and extracts GDELT data files with progress tracking
//! and parallel download support.

#![allow(dead_code)]

use crate::api::client::GdeltClient;
use crate::config;
use crate::data::masterfile::{FileEntry, FileType};
use crate::db::cache::{CacheDb, DownloadRecord};
use crate::error::{GdeltError, Result};
use chrono::Utc;
use flate2::read::GzDecoder;
use futures::stream::{self, StreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::path::{Path, PathBuf};
use tokio::sync::Semaphore;
use tracing::{debug, info, instrument, warn};
use zip::ZipArchive;

/// Downloader for GDELT data files.
///
/// Fetches compressed data files over HTTP, extracts them to a local
/// download directory, and supports bounded parallel downloads.
pub struct Downloader {
    // HTTP client used for all remote fetches.
    client: GdeltClient,
    // Directory where compressed and extracted files are written.
    download_dir: PathBuf,
    // Maximum number of concurrent downloads.
    parallel: usize,
}

impl Downloader {
    /// Create a new downloader using the default download directory
    /// (`<data_dir>/downloads`), creating it on disk if it does not exist.
    ///
    /// Returns an error if the platform data directory cannot be resolved
    /// or the directory cannot be created.
    pub fn new(client: GdeltClient, parallel: usize) -> Result<Self> {
        let base = config::data_dir().ok_or_else(|| {
            GdeltError::Download("Could not determine data directory".into())
        })?;
        let download_dir = base.join("downloads");

        std::fs::create_dir_all(&download_dir)?;

        Ok(Self {
            client,
            download_dir,
            parallel,
        })
    }

    /// Set download directory (builder-style; consumes and returns `self`).
    ///
    /// Overrides the default `<data_dir>/downloads` location chosen by
    /// [`Downloader::new`]. NOTE(review): unlike `new`, this does not create
    /// the directory — the caller must ensure it exists.
    pub fn with_download_dir(mut self, dir: PathBuf) -> Self {
        self.download_dir = dir;
        self
    }

    /// Download multiple files concurrently.
    ///
    /// At most `self.parallel` downloads run at once. Per-file and overall
    /// progress bars are rendered when `progress` is provided. Individual
    /// download failures do not abort the batch — they are reported through
    /// the `success`/`error` fields of the returned `DownloadResult`s.
    /// Successful downloads are recorded in `cache`.
    #[instrument(skip(self, files, cache, progress))]
    pub async fn download_files(
        &self,
        files: &[FileEntry],
        cache: &CacheDb,
        progress: Option<&MultiProgress>,
    ) -> Result<Vec<DownloadResult>> {
        info!("Starting download of {} files", files.len());

        // `buffer_unordered(self.parallel)` below already bounds concurrency;
        // the semaphore is kept as an explicit, defensive second bound.
        let semaphore = std::sync::Arc::new(Semaphore::new(self.parallel));
        let client = &self.client;
        let download_dir = &self.download_dir;

        // Overall bar: one tick per completed file.
        let overall_pb = progress.map(|mp| {
            let pb = mp.add(ProgressBar::new(files.len() as u64));
            pb.set_style(
                ProgressStyle::default_bar()
                    .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} files ({eta})")
                    .unwrap()
                    .progress_chars("#>-"),
            );
            pb
        });

        let results: Vec<DownloadResult> = stream::iter(files.iter())
            .map(|file| {
                let permit = semaphore.clone();
                let client = client.clone();
                let download_dir = download_dir.clone();
                // Per-file bar sized by the expected byte count.
                let file_pb = progress.map(|mp| {
                    let pb = mp.add(ProgressBar::new(file.size));
                    pb.set_style(
                        ProgressStyle::default_bar()
                            .template("{spinner:.green} {msg} [{bar:30.cyan/blue}] {bytes}/{total_bytes}")
                            .unwrap()
                            .progress_chars("#>-"),
                    );
                    pb.set_message(file.filename().to_string());
                    pb
                });
                let overall = overall_pb.clone();

                async move {
                    // The semaphore is never closed, so acquire cannot fail.
                    let _permit = permit
                        .acquire()
                        .await
                        .expect("semaphore closed unexpectedly");

                    let result =
                        Self::download_single(&client, file, &download_dir, file_pb.as_ref()).await;

                    if let Some(ref pb) = file_pb {
                        pb.finish_and_clear();
                    }
                    if let Some(ref pb) = overall {
                        pb.inc(1);
                    }

                    result
                }
            })
            .buffer_unordered(self.parallel)
            .collect()
            .await;

        if let Some(pb) = overall_pb {
            pb.finish_with_message("Download complete");
        }

        // Record successful downloads in the cache. A cache write failure is
        // logged but does not fail the batch — the files are already on disk.
        for result in &results {
            if result.success {
                let record = DownloadRecord {
                    file_id: result.file_id.clone(),
                    file_type: result.file_type.as_str().to_string(),
                    remote_url: result.url.clone(),
                    local_path: result.local_path.to_string_lossy().to_string(),
                    file_size: Some(result.size as i64),
                    checksum: result.hash.clone(),
                    date_from: result.date_int,
                    date_to: result.date_int,
                    downloaded_at: Utc::now(),
                    imported_at: None,
                };
                if let Err(e) = cache.record_download(&record) {
                    warn!("Failed to record download: {}", e);
                }
            }
        }

        Ok(results)
    }

    /// Download a single file
    async fn download_single(
        client: &GdeltClient,
        file: &FileEntry,
        download_dir: &Path,
        progress: Option<&ProgressBar>,
    ) -> DownloadResult {
        let filename = file.filename();
        let compressed_path = download_dir.join(filename);

        // Determine extracted filename
        let extracted_name = filename
            .replace(".zip", "")
            .replace(".gz", "")
            .replace(".CSV", ".csv");
        let extracted_path = download_dir.join(&extracted_name);

        // Skip if already extracted
        if extracted_path.exists() {
            debug!("File already exists: {}", extracted_path.display());
            return DownloadResult {
                file_id: file.file_id(),
                url: file.url.clone(),
                local_path: extracted_path,
                success: true,
                error: None,
                size: file.size,
                hash: Some(file.hash.clone()),
                file_type: file.file_type,
                date_int: file.date_int(),
            };
        }

        // Download the file
        debug!("Downloading: {}", file.url);
        let bytes = match client.get_bytes(&file.url).await {
            Ok(b) => b,
            Err(e) => {
                return DownloadResult {
                    file_id: file.file_id(),
                    url: file.url.clone(),
                    local_path: compressed_path,
                    success: false,
                    error: Some(e.to_string()),
                    size: 0,
                    hash: None,
                    file_type: file.file_type,
                    date_int: file.date_int(),
                };
            }
        };

        if let Some(pb) = progress {
            pb.set_position(bytes.len() as u64);
        }

        // Save compressed file
        if let Err(e) = std::fs::write(&compressed_path, &bytes) {
            return DownloadResult {
                file_id: file.file_id(),
                url: file.url.clone(),
                local_path: compressed_path,
                success: false,
                error: Some(format!("Failed to write file: {}", e)),
                size: bytes.len() as u64,
                hash: None,
                file_type: file.file_type,
                date_int: file.date_int(),
            };
        }

        // Extract the file
        let extract_result = if filename.ends_with(".zip") {
            Self::extract_zip(&compressed_path, &extracted_path)
        } else if filename.ends_with(".gz") {
            Self::extract_gzip(&compressed_path, &extracted_path)
        } else {
            // No extraction needed
            Ok(())
        };

        if let Err(e) = extract_result {
            return DownloadResult {
                file_id: file.file_id(),
                url: file.url.clone(),
                local_path: compressed_path,
                success: false,
                error: Some(format!("Extraction failed: {}", e)),
                size: bytes.len() as u64,
                hash: Some(file.hash.clone()),
                file_type: file.file_type,
                date_int: file.date_int(),
            };
        }

        // Clean up compressed file
        let _ = std::fs::remove_file(&compressed_path);

        DownloadResult {
            file_id: file.file_id(),
            url: file.url.clone(),
            local_path: extracted_path,
            success: true,
            error: None,
            size: bytes.len() as u64,
            hash: Some(file.hash.clone()),
            file_type: file.file_type,
            date_int: file.date_int(),
        }
    }

    /// Extract the first entry of a ZIP archive to `output_path`.
    ///
    /// GDELT ZIP archives contain exactly one CSV file, so only the entry
    /// at index 0 is extracted. Returns an error for unreadable or empty
    /// archives, or if the output file cannot be written.
    fn extract_zip(zip_path: &Path, output_path: &Path) -> Result<()> {
        let reader = std::fs::File::open(zip_path)?;
        let mut archive = ZipArchive::new(reader)
            .map_err(|e| GdeltError::Download(format!("Invalid ZIP file: {}", e)))?;

        if archive.is_empty() {
            return Err(GdeltError::Download("Empty ZIP file".into()));
        }

        // Copy the single CSV entry straight into the destination file.
        let mut entry = archive
            .by_index(0)
            .map_err(|e| GdeltError::Download(format!("Failed to read ZIP entry: {}", e)))?;
        let mut dest = std::fs::File::create(output_path)?;
        std::io::copy(&mut entry, &mut dest)?;

        Ok(())
    }

    /// Decompress a GZIP file to `output_path`.
    ///
    /// Streams the decoded bytes directly into the destination file.
    fn extract_gzip(gz_path: &Path, output_path: &Path) -> Result<()> {
        let mut decoder = GzDecoder::new(std::fs::File::open(gz_path)?);
        let mut dest = std::fs::File::create(output_path)?;
        std::io::copy(&mut decoder, &mut dest)?;
        Ok(())
    }

    /// Get the directory downloads are written to.
    pub fn download_dir(&self) -> &Path {
        self.download_dir.as_path()
    }

    /// List downloaded `.csv` files in the download directory, sorted by path.
    ///
    /// Non-file entries and files with other extensions are skipped.
    /// Directory-read errors are propagated to the caller.
    pub fn list_downloaded(&self) -> Result<Vec<PathBuf>> {
        // Collect all entry paths first so any read error propagates.
        let paths: Vec<PathBuf> = std::fs::read_dir(&self.download_dir)?
            .map(|entry| entry.map(|e| e.path()))
            .collect::<std::io::Result<Vec<_>>>()?;

        let mut files: Vec<PathBuf> = paths
            .into_iter()
            .filter(|p| p.is_file() && p.extension().map(|e| e == "csv").unwrap_or(false))
            .collect();

        files.sort();
        Ok(files)
    }

    /// Delete downloaded files, optionally filtered by file type.
    ///
    /// Matching is by filename substring (e.g. "export" for events);
    /// `Some(FileType::Unknown)` or `None` deletes every file in the
    /// download directory. Returns the number of files removed; the first
    /// I/O error aborts and is propagated.
    pub fn delete_files(&self, file_type: Option<FileType>) -> Result<u64> {
        let mut count: u64 = 0;

        for entry in std::fs::read_dir(&self.download_dir)? {
            let path = entry?.path();

            if !path.is_file() {
                continue;
            }

            let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

            // A single substring test per type suffices: the previous
            // `.export.` / `.gkg.` / `.mentions.` checks were subsumed by
            // the plain substring checks that followed them.
            let matches = match file_type {
                Some(FileType::Events) => filename.contains("export"),
                Some(FileType::Gkg) => filename.contains("gkg"),
                Some(FileType::Mentions) => filename.contains("mentions"),
                Some(FileType::Unknown) | None => true,
            };

            if matches {
                std::fs::remove_file(&path)?;
                count += 1;
            }
        }

        Ok(count)
    }
}

/// Result of a download operation
#[derive(Debug, Clone)]
pub struct DownloadResult {
    // Identifier of the file, taken from `FileEntry::file_id()`.
    pub file_id: String,
    // Remote URL the file was (or would have been) fetched from.
    pub url: String,
    // Local path: the extracted file on success, the compressed path on failure.
    pub local_path: PathBuf,
    // True when download and extraction both completed.
    pub success: bool,
    // Human-readable failure description; `None` on success.
    pub error: Option<String>,
    // Size in bytes (downloaded payload, or `FileEntry::size` when skipped).
    pub size: u64,
    // Checksum from the master file entry, when known.
    pub hash: Option<String>,
    // Kind of GDELT file (events / gkg / mentions / unknown).
    pub file_type: FileType,
    // Date encoded as an integer by `FileEntry::date_int()`, when available.
    pub date_int: Option<i64>,
}

impl DownloadResult {
    /// Check if download succeeded
    ///
    /// Convenience accessor mirroring the `success` field.
    pub fn is_success(&self) -> bool {
        self.success
    }
}