#![allow(dead_code)]
use crate::data::masterfile::FileType;
use crate::db::cache::CacheDb;
use crate::db::duckdb::AnalyticsDb;
use crate::error::{GdeltError, Result};
use indicatif::ProgressBar;
use std::path::Path;
use tracing::{debug, info, instrument, warn};
/// Bulk-loads downloaded GDELT CSV files into the analytics database.
///
/// Holds borrowed handles for the lifetime `'a`, so an `Importer` is a
/// short-lived view over connections owned by the caller.
pub struct Importer<'a> {
// Target DuckDB analytics database that rows are inserted into.
db: &'a AnalyticsDb,
// Local cache used to remember which files were already imported.
cache: &'a CacheDb,
}
impl<'a> Importer<'a> {
    /// Creates an importer over the given analytics and cache databases.
    pub fn new(db: &'a AnalyticsDb, cache: &'a CacheDb) -> Self {
        Self { db, cache }
    }

    /// Escapes `path` for embedding inside a single-quoted SQL string literal.
    ///
    /// Doubles any embedded single quote so a path such as `it's.csv` cannot
    /// break — or inject into — the generated `read_csv('...')` call.
    fn sql_escaped_path(path: &Path) -> String {
        path.to_string_lossy().replace('\'', "''")
    }

    /// Imports a single GDELT CSV file as the given `file_type`.
    ///
    /// When `file_type` is `Unknown`, the type is inferred from the filename
    /// (case-insensitively, via `detect_file_type`); if it still cannot be
    /// determined, `GdeltError::Import` is returned.
    ///
    /// On success the file is marked in the cache best-effort (a cache
    /// failure is only logged). Import failures do NOT propagate as `Err`;
    /// they are captured in the returned `ImportResult` so that directory
    /// imports can continue past bad files.
    #[instrument(skip(self, progress))]
    pub fn import_file(
        &self,
        path: &Path,
        file_type: FileType,
        progress: Option<&ProgressBar>,
    ) -> Result<ImportResult> {
        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        info!("Importing file: {} as {:?}", filename, file_type);
        if let Some(pb) = progress {
            pb.set_message(filename.to_string());
        }
        let result = match file_type {
            FileType::Events => self.import_events(path),
            FileType::Gkg => self.import_gkg(path),
            FileType::Mentions => self.import_mentions(path),
            // Fall back to filename-based detection. Delegating to
            // detect_file_type keeps the matching case-insensitive and
            // consistent with import_directory (the previous inline check
            // was case-sensitive and missed e.g. "EXPORT" names).
            FileType::Unknown => match self.detect_file_type(path) {
                FileType::Events => self.import_events(path),
                FileType::Gkg => self.import_gkg(path),
                FileType::Mentions => self.import_mentions(path),
                FileType::Unknown => {
                    return Err(GdeltError::Import(format!(
                        "Cannot determine file type for: {}",
                        filename
                    )));
                }
            },
        };
        match result {
            Ok(count) => {
                // The cache keys imported files by file stem; a cache write
                // failure is non-fatal (the import itself succeeded).
                let file_id = path.file_stem().and_then(|n| n.to_str()).unwrap_or("");
                if let Err(e) = self.cache.mark_imported(file_id) {
                    warn!("Failed to mark file as imported: {}", e);
                }
                Ok(ImportResult {
                    file: path.to_path_buf(),
                    file_type,
                    rows_imported: count,
                    success: true,
                    error: None,
                })
            }
            Err(e) => Ok(ImportResult {
                file: path.to_path_buf(),
                file_type,
                rows_imported: 0,
                success: false,
                error: Some(e.to_string()),
            }),
        }
    }

    /// Loads a GDELT 2.0 events export (61 tab-separated columns) into the
    /// `events` table. Empty strings become NULL; numeric fields use
    /// TRY_CAST so malformed values import as NULL instead of failing.
    fn import_events(&self, path: &Path) -> Result<u64> {
        debug!("Importing events from: {}", path.display());
        let path_str = Self::sql_escaped_path(path);
        let sql = format!(
            r#"
INSERT INTO events
SELECT
column0 as global_event_id,
column1 as sql_date,
column2 as month_year,
column3 as year,
column4 as fraction_date,
NULLIF(column5, '') as actor1_code,
NULLIF(column6, '') as actor1_name,
NULLIF(column7, '') as actor1_country_code,
NULLIF(column8, '') as actor1_known_group_code,
NULLIF(column9, '') as actor1_ethnic_code,
NULLIF(column10, '') as actor1_religion1_code,
NULLIF(column11, '') as actor1_religion2_code,
NULLIF(column12, '') as actor1_type1_code,
NULLIF(column13, '') as actor1_type2_code,
NULLIF(column14, '') as actor1_type3_code,
NULLIF(column15, '') as actor2_code,
NULLIF(column16, '') as actor2_name,
NULLIF(column17, '') as actor2_country_code,
NULLIF(column18, '') as actor2_known_group_code,
NULLIF(column19, '') as actor2_ethnic_code,
NULLIF(column20, '') as actor2_religion1_code,
NULLIF(column21, '') as actor2_religion2_code,
NULLIF(column22, '') as actor2_type1_code,
NULLIF(column23, '') as actor2_type2_code,
NULLIF(column24, '') as actor2_type3_code,
column25 = '1' as is_root_event,
column26 as event_code,
NULLIF(column27, '') as event_base_code,
NULLIF(column28, '') as event_root_code,
TRY_CAST(column29 AS TINYINT) as quad_class,
TRY_CAST(column30 AS DOUBLE) as goldstein_scale,
TRY_CAST(column31 AS INTEGER) as num_mentions,
TRY_CAST(column32 AS INTEGER) as num_sources,
TRY_CAST(column33 AS INTEGER) as num_articles,
TRY_CAST(column34 AS DOUBLE) as avg_tone,
TRY_CAST(column35 AS INTEGER) as actor1_geo_type,
NULLIF(column36, '') as actor1_geo_fullname,
NULLIF(column37, '') as actor1_geo_country_code,
NULLIF(column38, '') as actor1_geo_adm1_code,
NULLIF(column39, '') as actor1_geo_adm2_code,
TRY_CAST(column40 AS DOUBLE) as actor1_geo_lat,
TRY_CAST(column41 AS DOUBLE) as actor1_geo_long,
NULLIF(column42, '') as actor1_geo_feature_id,
TRY_CAST(column43 AS INTEGER) as actor2_geo_type,
NULLIF(column44, '') as actor2_geo_fullname,
NULLIF(column45, '') as actor2_geo_country_code,
NULLIF(column46, '') as actor2_geo_adm1_code,
NULLIF(column47, '') as actor2_geo_adm2_code,
TRY_CAST(column48 AS DOUBLE) as actor2_geo_lat,
TRY_CAST(column49 AS DOUBLE) as actor2_geo_long,
NULLIF(column50, '') as actor2_geo_feature_id,
TRY_CAST(column51 AS INTEGER) as action_geo_type,
NULLIF(column52, '') as action_geo_fullname,
NULLIF(column53, '') as action_geo_country_code,
NULLIF(column54, '') as action_geo_adm1_code,
NULLIF(column55, '') as action_geo_adm2_code,
TRY_CAST(column56 AS DOUBLE) as action_geo_lat,
TRY_CAST(column57 AS DOUBLE) as action_geo_long,
NULLIF(column58, '') as action_geo_feature_id,
TRY_CAST(column59 AS BIGINT) as date_added,
NULLIF(column60, '') as source_url
FROM read_csv('{}', delim='\t', header=false, ignore_errors=true)
ON CONFLICT DO NOTHING
"#,
            path_str
        );
        let result = self.db.query(&sql)?;
        // NOTE(review): for an INSERT, DuckDB normally returns a single row
        // containing the affected-row count rather than one row per inserted
        // record, so `rows.len()` may always be 1 here — confirm against
        // AnalyticsDb::query and read the count value if so.
        Ok(result.rows.len() as u64)
    }

    /// Loads a GDELT GKG 2.1 file into the `gkg` table. Semicolon-delimited
    /// list columns are split into arrays; the V1.5 tone vector (column15)
    /// is unpacked into its scalar components.
    fn import_gkg(&self, path: &Path) -> Result<u64> {
        debug!("Importing GKG from: {}", path.display());
        let path_str = Self::sql_escaped_path(path);
        let sql = format!(
            r#"
INSERT INTO gkg
SELECT
column0 as gkg_record_id,
TRY_CAST(column1 AS BIGINT) as date,
TRY_CAST(column2 AS INTEGER) as source_collection_identifier,
NULLIF(column3, '') as source_common_name,
column4 as document_identifier,
string_split(NULLIF(column7, ''), ';') as themes,
string_split(NULLIF(column9, ''), ';') as locations,
string_split(NULLIF(column11, ''), ';') as persons,
string_split(NULLIF(column13, ''), ';') as organizations,
TRY_CAST(split_part(column15, ',', 1) AS DOUBLE) as tone,
TRY_CAST(split_part(column15, ',', 2) AS DOUBLE) as positive_score,
TRY_CAST(split_part(column15, ',', 3) AS DOUBLE) as negative_score,
TRY_CAST(split_part(column15, ',', 4) AS DOUBLE) as polarity,
TRY_CAST(split_part(column15, ',', 7) AS INTEGER) as word_count,
NULLIF(column18, '') as sharing_image
FROM read_csv('{}', delim='\t', header=false, ignore_errors=true)
ON CONFLICT DO NOTHING
"#,
            path_str
        );
        let result = self.db.query(&sql)?;
        // NOTE(review): same caveat as import_events — verify that
        // `rows.len()` reflects imported rows for an INSERT statement.
        Ok(result.rows.len() as u64)
    }

    /// Loads a GDELT mentions file into the `mentions` table (a subset of
    /// the 16 source columns is kept).
    fn import_mentions(&self, path: &Path) -> Result<u64> {
        debug!("Importing mentions from: {}", path.display());
        let path_str = Self::sql_escaped_path(path);
        let sql = format!(
            r#"
INSERT INTO mentions
SELECT
TRY_CAST(column0 AS BIGINT) as global_event_id,
TRY_CAST(column2 AS BIGINT) as mention_time_date,
TRY_CAST(column3 AS INTEGER) as mention_type,
NULLIF(column4, '') as mention_source_name,
NULLIF(column5, '') as mention_identifier,
TRY_CAST(column6 AS INTEGER) as sentence_id,
TRY_CAST(column11 AS DOUBLE) as confidence,
TRY_CAST(column12 AS INTEGER) as mention_doc_len,
TRY_CAST(column13 AS DOUBLE) as mention_doc_tone
FROM read_csv('{}', delim='\t', header=false, ignore_errors=true)
"#,
            path_str
        );
        let result = self.db.query(&sql)?;
        // NOTE(review): same caveat as import_events — verify that
        // `rows.len()` reflects imported rows for an INSERT statement.
        Ok(result.rows.len() as u64)
    }

    /// Imports every CSV file directly inside `dir` (non-recursive).
    ///
    /// When `file_type` is `Some`, only files whose detected type matches
    /// the filter are imported. Per-file import failures are reported via
    /// the returned `ImportResult`s; only directory/IO errors are `Err`.
    pub fn import_directory(
        &self,
        dir: &Path,
        file_type: Option<FileType>,
        progress: Option<&ProgressBar>,
    ) -> Result<Vec<ImportResult>> {
        let mut results = Vec::new();
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if !path.is_file() {
                continue;
            }
            // Accept ".csv" in any case — GDELT downloads commonly use
            // the uppercase ".CSV" extension.
            let is_csv = path
                .extension()
                .and_then(|e| e.to_str())
                .map_or(false, |e| e.eq_ignore_ascii_case("csv"));
            if !is_csv {
                continue;
            }
            let detected_type = self.detect_file_type(&path);
            if let Some(filter_type) = file_type {
                if detected_type != filter_type {
                    continue;
                }
            }
            let result = self.import_file(&path, detected_type, progress)?;
            results.push(result);
        }
        Ok(results)
    }

    /// Infers the GDELT file type from the filename, case-insensitively
    /// (GDELT names embed "export", "gkg", or "mentions").
    fn detect_file_type(&self, path: &Path) -> FileType {
        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        let filename_lower = filename.to_lowercase();
        if filename_lower.contains("export") {
            FileType::Events
        } else if filename_lower.contains("gkg") {
            FileType::Gkg
        } else if filename_lower.contains("mentions") {
            FileType::Mentions
        } else {
            FileType::Unknown
        }
    }
}
/// Per-file outcome of an import attempt.
///
/// Import failures are reported through `success`/`error` rather than as
/// `Err`, so callers can aggregate results across many files.
#[derive(Debug, Clone)]
pub struct ImportResult {
/// Path of the file that was processed.
pub file: std::path::PathBuf,
/// File type the import was attempted as.
pub file_type: FileType,
/// Rows reported imported; 0 when the import failed.
pub rows_imported: u64,
/// Whether the import completed without error.
pub success: bool,
/// Stringified import error when `success` is false.
pub error: Option<String>,
}
impl ImportResult {
    /// Reports whether this file was imported without error.
    pub fn is_success(&self) -> bool {
        let Self { success, .. } = self;
        *success
    }
}
/// Aggregate totals over a batch of `ImportResult`s.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct ImportSummary {
/// Number of files processed (successful + failed).
pub total_files: u64,
/// Files that imported without error.
pub successful: u64,
/// Files whose import failed.
pub failed: u64,
/// Rows imported across all successful files.
pub total_rows: u64,
/// Rows imported from events files.
pub events_rows: u64,
/// Rows imported from GKG files.
pub gkg_rows: u64,
/// Rows imported from mentions files.
pub mentions_rows: u64,
}
impl ImportSummary {
    /// Folds a batch of per-file results into aggregate totals.
    ///
    /// Failed files count toward `total_files` and `failed` only; rows from
    /// successful files of `Unknown` type are included in `total_rows` but
    /// not attributed to any per-type counter.
    pub fn from_results(results: &[ImportResult]) -> Self {
        results.iter().fold(Self::default(), |mut acc, item| {
            acc.total_files += 1;
            if !item.success {
                acc.failed += 1;
                return acc;
            }
            acc.successful += 1;
            acc.total_rows += item.rows_imported;
            match item.file_type {
                FileType::Events => acc.events_rows += item.rows_imported,
                FileType::Gkg => acc.gkg_rows += item.rows_imported,
                FileType::Mentions => acc.mentions_rows += item.rows_imported,
                FileType::Unknown => {}
            }
            acc
        })
    }
}