gdelt 0.1.0

CLI for GDELT Project - optimized for agentic usage with local data caching
//! Data importer for loading CSV files into DuckDB.
//!
//! Handles efficient bulk import of GDELT data files.

#![allow(dead_code)]

use crate::data::masterfile::FileType;
use crate::db::cache::CacheDb;
use crate::db::duckdb::AnalyticsDb;
use crate::error::{GdeltError, Result};
use indicatif::ProgressBar;
use std::path::Path;
use tracing::{debug, info, instrument, warn};

/// Data importer
pub struct Importer<'a> {
    db: &'a AnalyticsDb,
    cache: &'a CacheDb,
}

impl<'a> Importer<'a> {
    /// Create a new importer
    pub fn new(db: &'a AnalyticsDb, cache: &'a CacheDb) -> Self {
        Self { db, cache }
    }

    /// Import a single file
    #[instrument(skip(self, progress))]
    pub fn import_file(
        &self,
        path: &Path,
        file_type: FileType,
        progress: Option<&ProgressBar>,
    ) -> Result<ImportResult> {
        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        info!("Importing file: {} as {:?}", filename, file_type);

        if let Some(pb) = progress {
            pb.set_message(filename.to_string());
        }

        let result = match file_type {
            FileType::Events => self.import_events(path),
            FileType::Gkg => self.import_gkg(path),
            FileType::Mentions => self.import_mentions(path),
            FileType::Unknown => {
                // Try to detect from filename
                if filename.contains("export") {
                    self.import_events(path)
                } else if filename.contains("gkg") {
                    self.import_gkg(path)
                } else if filename.contains("mentions") {
                    self.import_mentions(path)
                } else {
                    return Err(GdeltError::Import(format!(
                        "Cannot determine file type for: {}",
                        filename
                    )));
                }
            }
        };

        match result {
            Ok(count) => {
                // Mark as imported in cache
                let file_id = path.file_stem().and_then(|n| n.to_str()).unwrap_or("");
                if let Err(e) = self.cache.mark_imported(file_id) {
                    warn!("Failed to mark file as imported: {}", e);
                }

                Ok(ImportResult {
                    file: path.to_path_buf(),
                    file_type,
                    rows_imported: count,
                    success: true,
                    error: None,
                })
            }
            Err(e) => Ok(ImportResult {
                file: path.to_path_buf(),
                file_type,
                rows_imported: 0,
                success: false,
                error: Some(e.to_string()),
            }),
        }
    }

    /// Import events from CSV
    fn import_events(&self, path: &Path) -> Result<u64> {
        let path_str = path.to_string_lossy();
        debug!("Importing events from: {}", path_str);

        // Use DuckDB's CSV reader for efficient bulk import
        let sql = format!(
            r#"
            INSERT INTO events
            SELECT
                column0 as global_event_id,
                column1 as sql_date,
                column2 as month_year,
                column3 as year,
                column4 as fraction_date,
                NULLIF(column5, '') as actor1_code,
                NULLIF(column6, '') as actor1_name,
                NULLIF(column7, '') as actor1_country_code,
                NULLIF(column8, '') as actor1_known_group_code,
                NULLIF(column9, '') as actor1_ethnic_code,
                NULLIF(column10, '') as actor1_religion1_code,
                NULLIF(column11, '') as actor1_religion2_code,
                NULLIF(column12, '') as actor1_type1_code,
                NULLIF(column13, '') as actor1_type2_code,
                NULLIF(column14, '') as actor1_type3_code,
                NULLIF(column15, '') as actor2_code,
                NULLIF(column16, '') as actor2_name,
                NULLIF(column17, '') as actor2_country_code,
                NULLIF(column18, '') as actor2_known_group_code,
                NULLIF(column19, '') as actor2_ethnic_code,
                NULLIF(column20, '') as actor2_religion1_code,
                NULLIF(column21, '') as actor2_religion2_code,
                NULLIF(column22, '') as actor2_type1_code,
                NULLIF(column23, '') as actor2_type2_code,
                NULLIF(column24, '') as actor2_type3_code,
                column25 = '1' as is_root_event,
                column26 as event_code,
                NULLIF(column27, '') as event_base_code,
                NULLIF(column28, '') as event_root_code,
                TRY_CAST(column29 AS TINYINT) as quad_class,
                TRY_CAST(column30 AS DOUBLE) as goldstein_scale,
                TRY_CAST(column31 AS INTEGER) as num_mentions,
                TRY_CAST(column32 AS INTEGER) as num_sources,
                TRY_CAST(column33 AS INTEGER) as num_articles,
                TRY_CAST(column34 AS DOUBLE) as avg_tone,
                TRY_CAST(column35 AS INTEGER) as actor1_geo_type,
                NULLIF(column36, '') as actor1_geo_fullname,
                NULLIF(column37, '') as actor1_geo_country_code,
                NULLIF(column38, '') as actor1_geo_adm1_code,
                NULLIF(column39, '') as actor1_geo_adm2_code,
                TRY_CAST(column40 AS DOUBLE) as actor1_geo_lat,
                TRY_CAST(column41 AS DOUBLE) as actor1_geo_long,
                NULLIF(column42, '') as actor1_geo_feature_id,
                TRY_CAST(column43 AS INTEGER) as actor2_geo_type,
                NULLIF(column44, '') as actor2_geo_fullname,
                NULLIF(column45, '') as actor2_geo_country_code,
                NULLIF(column46, '') as actor2_geo_adm1_code,
                NULLIF(column47, '') as actor2_geo_adm2_code,
                TRY_CAST(column48 AS DOUBLE) as actor2_geo_lat,
                TRY_CAST(column49 AS DOUBLE) as actor2_geo_long,
                NULLIF(column50, '') as actor2_geo_feature_id,
                TRY_CAST(column51 AS INTEGER) as action_geo_type,
                NULLIF(column52, '') as action_geo_fullname,
                NULLIF(column53, '') as action_geo_country_code,
                NULLIF(column54, '') as action_geo_adm1_code,
                NULLIF(column55, '') as action_geo_adm2_code,
                TRY_CAST(column56 AS DOUBLE) as action_geo_lat,
                TRY_CAST(column57 AS DOUBLE) as action_geo_long,
                NULLIF(column58, '') as action_geo_feature_id,
                TRY_CAST(column59 AS BIGINT) as date_added,
                NULLIF(column60, '') as source_url
            FROM read_csv('{}', delim='\t', header=false, ignore_errors=true)
            ON CONFLICT DO NOTHING
            "#,
            path_str
        );

        let result = self.db.query(&sql)?;
        Ok(result.rows.len() as u64)
    }

    /// Import GKG from CSV
    fn import_gkg(&self, path: &Path) -> Result<u64> {
        let path_str = path.to_string_lossy();
        debug!("Importing GKG from: {}", path_str);

        let sql = format!(
            r#"
            INSERT INTO gkg
            SELECT
                column0 as gkg_record_id,
                TRY_CAST(column1 AS BIGINT) as date,
                TRY_CAST(column2 AS INTEGER) as source_collection_identifier,
                NULLIF(column3, '') as source_common_name,
                column4 as document_identifier,
                string_split(NULLIF(column7, ''), ';') as themes,
                string_split(NULLIF(column9, ''), ';') as locations,
                string_split(NULLIF(column11, ''), ';') as persons,
                string_split(NULLIF(column13, ''), ';') as organizations,
                TRY_CAST(split_part(column15, ',', 1) AS DOUBLE) as tone,
                TRY_CAST(split_part(column15, ',', 2) AS DOUBLE) as positive_score,
                TRY_CAST(split_part(column15, ',', 3) AS DOUBLE) as negative_score,
                TRY_CAST(split_part(column15, ',', 4) AS DOUBLE) as polarity,
                TRY_CAST(split_part(column15, ',', 7) AS INTEGER) as word_count,
                NULLIF(column18, '') as sharing_image
            FROM read_csv('{}', delim='\t', header=false, ignore_errors=true)
            ON CONFLICT DO NOTHING
            "#,
            path_str
        );

        let result = self.db.query(&sql)?;
        Ok(result.rows.len() as u64)
    }

    /// Import mentions from CSV
    fn import_mentions(&self, path: &Path) -> Result<u64> {
        let path_str = path.to_string_lossy();
        debug!("Importing mentions from: {}", path_str);

        let sql = format!(
            r#"
            INSERT INTO mentions
            SELECT
                TRY_CAST(column0 AS BIGINT) as global_event_id,
                TRY_CAST(column2 AS BIGINT) as mention_time_date,
                TRY_CAST(column3 AS INTEGER) as mention_type,
                NULLIF(column4, '') as mention_source_name,
                NULLIF(column5, '') as mention_identifier,
                TRY_CAST(column6 AS INTEGER) as sentence_id,
                TRY_CAST(column11 AS DOUBLE) as confidence,
                TRY_CAST(column12 AS INTEGER) as mention_doc_len,
                TRY_CAST(column13 AS DOUBLE) as mention_doc_tone
            FROM read_csv('{}', delim='\t', header=false, ignore_errors=true)
            "#,
            path_str
        );

        let result = self.db.query(&sql)?;
        Ok(result.rows.len() as u64)
    }

    /// Import all files in a directory
    pub fn import_directory(
        &self,
        dir: &Path,
        file_type: Option<FileType>,
        progress: Option<&ProgressBar>,
    ) -> Result<Vec<ImportResult>> {
        let mut results = Vec::new();

        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();

            if !path.is_file() {
                continue;
            }

            let ext = path.extension().and_then(|e| e.to_str());
            if ext != Some("csv") {
                continue;
            }

            let detected_type = self.detect_file_type(&path);

            // Filter by type if specified
            if let Some(filter_type) = file_type {
                if detected_type != filter_type {
                    continue;
                }
            }

            let result = self.import_file(&path, detected_type, progress)?;
            results.push(result);
        }

        Ok(results)
    }

    /// Detect file type from filename
    fn detect_file_type(&self, path: &Path) -> FileType {
        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        let filename_lower = filename.to_lowercase();

        if filename_lower.contains("export") {
            FileType::Events
        } else if filename_lower.contains("gkg") {
            FileType::Gkg
        } else if filename_lower.contains("mentions") {
            FileType::Mentions
        } else {
            FileType::Unknown
        }
    }
}

/// Result of an import operation
#[derive(Debug, Clone)]
pub struct ImportResult {
    pub file: std::path::PathBuf,
    pub file_type: FileType,
    pub rows_imported: u64,
    pub success: bool,
    pub error: Option<String>,
}

impl ImportResult {
    /// Check if import succeeded
    pub fn is_success(&self) -> bool {
        self.success
    }
}

/// Import summary
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct ImportSummary {
    pub total_files: u64,
    pub successful: u64,
    pub failed: u64,
    pub total_rows: u64,
    pub events_rows: u64,
    pub gkg_rows: u64,
    pub mentions_rows: u64,
}

impl ImportSummary {
    /// Create summary from import results
    pub fn from_results(results: &[ImportResult]) -> Self {
        let mut summary = Self::default();

        for result in results {
            summary.total_files += 1;
            if result.success {
                summary.successful += 1;
                summary.total_rows += result.rows_imported;

                match result.file_type {
                    FileType::Events => summary.events_rows += result.rows_imported,
                    FileType::Gkg => summary.gkg_rows += result.rows_imported,
                    FileType::Mentions => summary.mentions_rows += result.rows_imported,
                    FileType::Unknown => {}
                }
            } else {
                summary.failed += 1;
            }
        }

        summary
    }
}