#![allow(dead_code)]
use crate::config;
use crate::error::{GdeltError, Result};
use duckdb::{Connection, Row};
use std::path::PathBuf;
use tracing::{debug, info, instrument};
/// Handle to the local DuckDB analytics database holding GDELT data
/// (events, GKG records, mentions, and a small metadata table).
pub struct AnalyticsDb {
// Open DuckDB connection; every query on this handle goes through it.
conn: Connection,
// Filesystem location of the database file (":memory:" for in-memory DBs).
db_path: PathBuf,
}
impl AnalyticsDb {
/// Opens (creating on first use) the database at the default location,
/// `<data dir>/gdelt.duckdb`.
///
/// # Errors
/// Returns `GdeltError::Database` when the platform data directory cannot
/// be determined, plus anything [`Self::open_at`] can return.
#[instrument]
pub fn open() -> Result<Self> {
    match config::data_dir() {
        Some(dir) => Self::open_at(dir.join("gdelt.duckdb")),
        None => Err(GdeltError::Database(
            "Could not determine data directory".into(),
        )),
    }
}
/// Opens (creating on first use) a database at an explicit path and
/// ensures the schema exists.
///
/// Parent directories are created as needed before DuckDB touches the file.
pub fn open_at(db_path: PathBuf) -> Result<Self> {
    // DuckDB will not create intermediate directories itself.
    if let Some(parent) = db_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let db = Self {
        conn: Connection::open(&db_path)?,
        db_path,
    };
    db.init_schema()?;
    Ok(db)
}
pub fn open_memory() -> Result<Self> {
let conn = Connection::open_in_memory()?;
let db = Self {
conn,
db_path: PathBuf::from(":memory:"),
};
db.init_schema()?;
Ok(db)
}
/// Creates all tables and secondary indexes used by this database.
///
/// Every statement uses `IF NOT EXISTS`, so this is idempotent and is run
/// unconditionally from `open_at` / `open_memory`.
fn init_schema(&self) -> Result<()> {
debug!("Initializing DuckDB schema");
// Events: one row per GDELT event record, keyed by the global event id.
// Column layout mirrors the GDELT 2.0 events export.
self.conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS events (
global_event_id BIGINT PRIMARY KEY,
sql_date INTEGER NOT NULL,
month_year INTEGER NOT NULL,
year INTEGER NOT NULL,
fraction_date DOUBLE,
actor1_code VARCHAR,
actor1_name VARCHAR,
actor1_country_code VARCHAR,
actor1_known_group_code VARCHAR,
actor1_ethnic_code VARCHAR,
actor1_religion1_code VARCHAR,
actor1_religion2_code VARCHAR,
actor1_type1_code VARCHAR,
actor1_type2_code VARCHAR,
actor1_type3_code VARCHAR,
actor2_code VARCHAR,
actor2_name VARCHAR,
actor2_country_code VARCHAR,
actor2_known_group_code VARCHAR,
actor2_ethnic_code VARCHAR,
actor2_religion1_code VARCHAR,
actor2_religion2_code VARCHAR,
actor2_type1_code VARCHAR,
actor2_type2_code VARCHAR,
actor2_type3_code VARCHAR,
is_root_event BOOLEAN,
event_code VARCHAR NOT NULL,
event_base_code VARCHAR,
event_root_code VARCHAR,
quad_class TINYINT,
goldstein_scale DOUBLE,
num_mentions INTEGER,
num_sources INTEGER,
num_articles INTEGER,
avg_tone DOUBLE,
actor1_geo_type INTEGER,
actor1_geo_fullname VARCHAR,
actor1_geo_country_code VARCHAR,
actor1_geo_adm1_code VARCHAR,
actor1_geo_adm2_code VARCHAR,
actor1_geo_lat DOUBLE,
actor1_geo_long DOUBLE,
actor1_geo_feature_id VARCHAR,
actor2_geo_type INTEGER,
actor2_geo_fullname VARCHAR,
actor2_geo_country_code VARCHAR,
actor2_geo_adm1_code VARCHAR,
actor2_geo_adm2_code VARCHAR,
actor2_geo_lat DOUBLE,
actor2_geo_long DOUBLE,
actor2_geo_feature_id VARCHAR,
action_geo_type INTEGER,
action_geo_fullname VARCHAR,
action_geo_country_code VARCHAR,
action_geo_adm1_code VARCHAR,
action_geo_adm2_code VARCHAR,
action_geo_lat DOUBLE,
action_geo_long DOUBLE,
action_geo_feature_id VARCHAR,
date_added BIGINT,
source_url VARCHAR
);
CREATE INDEX IF NOT EXISTS idx_events_date ON events(sql_date);
CREATE INDEX IF NOT EXISTS idx_events_actor1 ON events(actor1_code);
CREATE INDEX IF NOT EXISTS idx_events_actor2 ON events(actor2_code);
CREATE INDEX IF NOT EXISTS idx_events_event_code ON events(event_code);
CREATE INDEX IF NOT EXISTS idx_events_quad_class ON events(quad_class);
CREATE INDEX IF NOT EXISTS idx_events_country ON events(action_geo_country_code);
"#,
)?;
// GKG (Global Knowledge Graph) records; list-valued fields (themes,
// locations, persons, organizations) are stored as VARCHAR arrays.
self.conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS gkg (
gkg_record_id VARCHAR PRIMARY KEY,
date BIGINT,
source_collection_identifier INTEGER,
source_common_name VARCHAR,
document_identifier VARCHAR NOT NULL,
themes VARCHAR[],
locations VARCHAR[],
persons VARCHAR[],
organizations VARCHAR[],
tone DOUBLE,
positive_score DOUBLE,
negative_score DOUBLE,
polarity DOUBLE,
word_count INTEGER,
sharing_image VARCHAR
);
CREATE INDEX IF NOT EXISTS idx_gkg_date ON gkg(date);
"#,
)?;
// Mentions: many rows may reference the same event, so there is no
// primary key — lookups go through the global_event_id index.
self.conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS mentions (
global_event_id BIGINT NOT NULL,
mention_time_date BIGINT,
mention_type INTEGER,
mention_source_name VARCHAR,
mention_identifier VARCHAR,
sentence_id INTEGER,
confidence DOUBLE,
mention_doc_len INTEGER,
mention_doc_tone DOUBLE
);
CREATE INDEX IF NOT EXISTS idx_mentions_event ON mentions(global_event_id);
CREATE INDEX IF NOT EXISTS idx_mentions_date ON mentions(mention_time_date);
"#,
)?;
// Generic key/value metadata; updated_at defaults to the insert time.
self.conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS metadata (
key VARCHAR PRIMARY KEY,
value VARCHAR,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"#,
)?;
Ok(())
}
/// Gathers row counts, date coverage, and on-disk size.
///
/// The file size falls back to 0 when the path cannot be stat'ed (notably
/// for in-memory databases).
pub fn stats(&self) -> Result<DbStats> {
    // Shared count helper; table names are fixed internal strings.
    let count = |table: &str| -> Result<i64> {
        let sql = format!("SELECT COUNT(*) FROM {}", table);
        Ok(self.conn.query_row(&sql, [], |row| row.get(0))?)
    };
    let events_count = count("events")?;
    let gkg_count = count("gkg")?;
    let mentions_count = count("mentions")?;
    Ok(DbStats {
        events_count: events_count as u64,
        gkg_count: gkg_count as u64,
        mentions_count: mentions_count as u64,
        events_date_range: self.get_date_range("events", "sql_date")?,
        gkg_date_range: self.get_date_range("gkg", "date")?,
        file_size_bytes: std::fs::metadata(&self.db_path)
            .map(|m| m.len())
            .unwrap_or(0),
    })
}
/// Returns `(MIN(column), MAX(column))` for `table`, or `None` when the
/// table is empty.
///
/// `table` and `column` are interpolated into the SQL, so callers must
/// pass trusted internal identifiers only (as `stats` does).
fn get_date_range(&self, table: &str, column: &str) -> Result<Option<(i64, i64)>> {
    let query = format!("SELECT MIN({}), MAX({}) FROM {}", column, column, table);
    let (min, max): (Option<i64>, Option<i64>) = self
        .conn
        .query_row(&query, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
    // MIN/MAX are NULL exactly when the table has no rows; collapse the
    // pair to a single Option.
    Ok(min.zip(max))
}
pub fn query(&self, sql: &str) -> Result<QueryResult> {
self.query_with_params(sql, &[])
}
/// Runs `sql` with positional parameters and materializes every row,
/// converting each cell via `row_to_json_value` (i64 → f64 → String →
/// bool, else JSON null).
pub fn query_with_params(&self, sql: &str, params: &[&dyn duckdb::ToSql]) -> Result<QueryResult> {
    let mut stmt = self.conn.prepare(sql)?;
    let mut query_rows = stmt.query(params)?;
    // Column metadata is reached through `Rows::as_ref` (the underlying
    // statement), matching how the duckdb crate exposes it post-query.
    let column_count = query_rows.as_ref().map_or(0, |r| r.column_count());
    let column_names: Vec<String> = if column_count == 0 {
        Vec::new()
    } else {
        query_rows
            .as_ref()
            .map(|r| r.column_names().iter().map(|s| s.to_string()).collect())
            .unwrap_or_default()
    };
    let mut rows = Vec::new();
    while let Some(row) = query_rows.next()? {
        let values = (0..column_count)
            .map(|i| row_to_json_value(row, i))
            .collect::<Result<Vec<_>>>()?;
        rows.push(values);
    }
    Ok(QueryResult {
        columns: column_names,
        rows,
    })
}
/// Queries `events` with a dynamically assembled WHERE clause.
///
/// All present filters are AND-ed together. IMPORTANT: parameters are
/// bound positionally, so each `conditions.push` must be immediately
/// paired with its `params_vec.push` calls in the same order — keep these
/// in sync when editing. Results are newest-first, paginated by
/// `filters.limit` / `filters.offset`.
///
/// NOTE(review): `limit == 0` (the `Default`) produces `LIMIT 0`, i.e. an
/// empty result — callers must set a positive limit.
pub fn query_events(&self, filters: EventFilters) -> Result<Vec<EventRow>> {
let mut conditions = Vec::new();
// Boxed values keep ownership of the bound parameters until execution.
let mut params_vec: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
// Actor filter matches either side of the event, hence two placeholders.
if let Some(ref actor) = filters.actor {
conditions.push("(actor1_code = ? OR actor2_code = ?)");
params_vec.push(Box::new(actor.clone()));
params_vec.push(Box::new(actor.clone()));
}
// A trailing '*' turns the event code into a prefix (LIKE) match.
if let Some(ref event_code) = filters.event_code {
if event_code.ends_with('*') {
let prefix = event_code.trim_end_matches('*');
conditions.push("event_code LIKE ?");
params_vec.push(Box::new(format!("{}%", prefix)));
} else {
conditions.push("event_code = ?");
params_vec.push(Box::new(event_code.clone()));
}
}
if let Some(quad_class) = filters.quad_class {
conditions.push("quad_class = ?");
params_vec.push(Box::new(quad_class as i32));
}
if let Some(ref country) = filters.country {
conditions.push("action_geo_country_code = ?");
params_vec.push(Box::new(country.clone()));
}
// Dates normalize to YYYYMMDD ints; unparseable input becomes 0 (see
// parse_date_to_int), which disables the lower bound but makes the
// upper bound match nothing.
if let Some(ref start) = filters.start_date {
conditions.push("sql_date >= ?");
params_vec.push(Box::new(parse_date_to_int(start)));
}
if let Some(ref end) = filters.end_date {
conditions.push("sql_date <= ?");
params_vec.push(Box::new(parse_date_to_int(end)));
}
if let Some(goldstein_min) = filters.goldstein_min {
conditions.push("goldstein_scale >= ?");
params_vec.push(Box::new(goldstein_min));
}
if let Some(goldstein_max) = filters.goldstein_max {
conditions.push("goldstein_scale <= ?");
params_vec.push(Box::new(goldstein_max));
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!("WHERE {}", conditions.join(" AND "))
};
let sql = format!(
r#"
SELECT global_event_id, sql_date, actor1_code, actor1_name, actor1_country_code,
actor2_code, actor2_name, actor2_country_code, event_code, event_root_code,
quad_class, goldstein_scale, num_mentions, num_sources, num_articles,
avg_tone, action_geo_fullname, action_geo_country_code, source_url
FROM events
{}
ORDER BY sql_date DESC
LIMIT ? OFFSET ?
"#,
where_clause
);
// LIMIT/OFFSET placeholders come after every WHERE placeholder, so
// their parameters are pushed last.
params_vec.push(Box::new(filters.limit as i64));
params_vec.push(Box::new(filters.offset as i64));
let mut stmt = self.conn.prepare(&sql)?;
let params_refs: Vec<&dyn duckdb::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
let mut rows = stmt.query(params_refs.as_slice())?;
let mut results = Vec::new();
// Column indexes below mirror the SELECT list order above.
while let Some(row) = rows.next()? {
results.push(EventRow {
global_event_id: row.get(0)?,
sql_date: row.get(1)?,
actor1_code: row.get(2)?,
actor1_name: row.get(3)?,
actor1_country_code: row.get(4)?,
actor2_code: row.get(5)?,
actor2_name: row.get(6)?,
actor2_country_code: row.get(7)?,
event_code: row.get(8)?,
event_root_code: row.get(9)?,
quad_class: row.get(10)?,
goldstein_scale: row.get(11)?,
num_mentions: row.get(12)?,
num_sources: row.get(13)?,
num_articles: row.get(14)?,
avg_tone: row.get(15)?,
action_geo_fullname: row.get(16)?,
action_geo_country_code: row.get(17)?,
source_url: row.get(18)?,
});
}
Ok(results)
}
/// Looks up a single GKG record by its document URL.
///
/// List columns are flattened with `array_to_string` in SQL and split
/// back apart here, so empty entries do not survive the round trip.
pub fn find_gkg_by_url(&self, url: &str) -> Result<Option<GkgRecord>> {
    let mut stmt = self.conn.prepare(
        r#"
SELECT gkg_record_id, date, source_common_name, document_identifier,
array_to_string(themes, '|') as themes_str,
array_to_string(persons, '|') as persons_str,
array_to_string(organizations, '|') as orgs_str,
array_to_string(locations, '|') as locs_str,
tone, positive_score, negative_score, polarity, word_count
FROM gkg
WHERE document_identifier = ?
LIMIT 1
"#,
    )?;
    let mut rows = stmt.query([url])?;
    // At most one row thanks to LIMIT 1; bail out early when absent.
    let row = match rows.next()? {
        Some(row) => row,
        None => return Ok(None),
    };
    let themes: Option<String> = row.get(4)?;
    let persons: Option<String> = row.get(5)?;
    let organizations: Option<String> = row.get(6)?;
    let locations: Option<String> = row.get(7)?;
    Ok(Some(GkgRecord {
        gkg_record_id: row.get(0)?,
        date: row.get(1)?,
        source_common_name: row.get(2)?,
        document_identifier: row.get(3)?,
        themes: parse_pipe_list(&themes),
        persons: parse_pipe_list(&persons),
        organizations: parse_pipe_list(&organizations),
        locations: parse_pipe_list(&locations),
        tone: row.get(8)?,
        positive_score: row.get(9)?,
        negative_score: row.get(10)?,
        polarity: row.get(11)?,
        word_count: row.get(12)?,
    }))
}
/// Batch variant of [`Self::find_gkg_by_url`]: one lookup per URL, keyed
/// by URL in the returned map. URLs without a GKG record are absent.
pub fn find_gkg_by_urls(&self, urls: &[&str]) -> Result<std::collections::HashMap<String, GkgRecord>> {
    urls.iter()
        .try_fold(std::collections::HashMap::new(), |mut found, url| {
            if let Some(record) = self.find_gkg_by_url(url)? {
                found.insert((*url).to_string(), record);
            }
            Ok(found)
        })
}
/// Fetches a single event by its GDELT global event id.
pub fn lookup_event(&self, event_id: i64) -> Result<Option<EventRow>> {
    let mut stmt = self.conn.prepare(
        r#"
SELECT global_event_id, sql_date, actor1_code, actor1_name, actor1_country_code,
actor2_code, actor2_name, actor2_country_code, event_code, event_root_code,
quad_class, goldstein_scale, num_mentions, num_sources, num_articles,
avg_tone, action_geo_fullname, action_geo_country_code, source_url
FROM events
WHERE global_event_id = ?
"#,
    )?;
    let mut rows = stmt.query([event_id])?;
    let row = match rows.next()? {
        Some(row) => row,
        None => return Ok(None),
    };
    // Column indexes mirror the SELECT list above.
    Ok(Some(EventRow {
        global_event_id: row.get(0)?,
        sql_date: row.get(1)?,
        actor1_code: row.get(2)?,
        actor1_name: row.get(3)?,
        actor1_country_code: row.get(4)?,
        actor2_code: row.get(5)?,
        actor2_name: row.get(6)?,
        actor2_country_code: row.get(7)?,
        event_code: row.get(8)?,
        event_root_code: row.get(9)?,
        quad_class: row.get(10)?,
        goldstein_scale: row.get(11)?,
        num_mentions: row.get(12)?,
        num_sources: row.get(13)?,
        num_articles: row.get(14)?,
        avg_tone: row.get(15)?,
        action_geo_fullname: row.get(16)?,
        action_geo_country_code: row.get(17)?,
        source_url: row.get(18)?,
    }))
}
/// Bulk-loads a raw GDELT 2.0 events export (tab-separated, headerless)
/// into `events`, skipping rows whose `global_event_id` already exists.
///
/// Returns the number of rows inserted.
///
/// Fixes over the previous version: the column map now covers all 61
/// columns of `events` (the old 5-column map made `INSERT INTO events
/// SELECT *` fail with an arity mismatch), and single quotes in the path
/// are escaped so paths containing `'` cannot break out of the SQL
/// string literal.
///
/// # Errors
/// Any DuckDB error while reading the file or inserting rows.
pub fn import_events_file(&self, path: &std::path::Path) -> Result<u64> {
    let path_str = path.to_string_lossy();
    info!("Importing events from: {}", path_str);
    // '' is the SQL escape for a literal single quote.
    let escaped_path = path_str.replace('\'', "''");
    // Column names/types must match the `events` schema in table order;
    // DuckDB's read_csv interprets '\t' in `delim` as a tab.
    let sql = format!(
        r#"
INSERT INTO events
SELECT * FROM read_csv_auto('{}', delim='\t', header=false,
columns={{
'global_event_id': 'BIGINT',
'sql_date': 'INTEGER',
'month_year': 'INTEGER',
'year': 'INTEGER',
'fraction_date': 'DOUBLE',
'actor1_code': 'VARCHAR',
'actor1_name': 'VARCHAR',
'actor1_country_code': 'VARCHAR',
'actor1_known_group_code': 'VARCHAR',
'actor1_ethnic_code': 'VARCHAR',
'actor1_religion1_code': 'VARCHAR',
'actor1_religion2_code': 'VARCHAR',
'actor1_type1_code': 'VARCHAR',
'actor1_type2_code': 'VARCHAR',
'actor1_type3_code': 'VARCHAR',
'actor2_code': 'VARCHAR',
'actor2_name': 'VARCHAR',
'actor2_country_code': 'VARCHAR',
'actor2_known_group_code': 'VARCHAR',
'actor2_ethnic_code': 'VARCHAR',
'actor2_religion1_code': 'VARCHAR',
'actor2_religion2_code': 'VARCHAR',
'actor2_type1_code': 'VARCHAR',
'actor2_type2_code': 'VARCHAR',
'actor2_type3_code': 'VARCHAR',
'is_root_event': 'BOOLEAN',
'event_code': 'VARCHAR',
'event_base_code': 'VARCHAR',
'event_root_code': 'VARCHAR',
'quad_class': 'TINYINT',
'goldstein_scale': 'DOUBLE',
'num_mentions': 'INTEGER',
'num_sources': 'INTEGER',
'num_articles': 'INTEGER',
'avg_tone': 'DOUBLE',
'actor1_geo_type': 'INTEGER',
'actor1_geo_fullname': 'VARCHAR',
'actor1_geo_country_code': 'VARCHAR',
'actor1_geo_adm1_code': 'VARCHAR',
'actor1_geo_adm2_code': 'VARCHAR',
'actor1_geo_lat': 'DOUBLE',
'actor1_geo_long': 'DOUBLE',
'actor1_geo_feature_id': 'VARCHAR',
'actor2_geo_type': 'INTEGER',
'actor2_geo_fullname': 'VARCHAR',
'actor2_geo_country_code': 'VARCHAR',
'actor2_geo_adm1_code': 'VARCHAR',
'actor2_geo_adm2_code': 'VARCHAR',
'actor2_geo_lat': 'DOUBLE',
'actor2_geo_long': 'DOUBLE',
'actor2_geo_feature_id': 'VARCHAR',
'action_geo_type': 'INTEGER',
'action_geo_fullname': 'VARCHAR',
'action_geo_country_code': 'VARCHAR',
'action_geo_adm1_code': 'VARCHAR',
'action_geo_adm2_code': 'VARCHAR',
'action_geo_lat': 'DOUBLE',
'action_geo_long': 'DOUBLE',
'action_geo_feature_id': 'VARCHAR',
'date_added': 'BIGINT',
'source_url': 'VARCHAR'
}})
ON CONFLICT DO NOTHING
"#,
        escaped_path
    );
    let count = self.conn.execute(&sql, [])?;
    Ok(count as u64)
}
/// Reclaims unused space in the database file via `VACUUM`.
pub fn vacuum(&self) -> Result<()> {
    self.conn.execute_batch("VACUUM;").map_err(Into::into)
}
/// Recomputes table statistics (`ANALYZE`) for the query planner.
pub fn analyze(&self) -> Result<()> {
    self.conn.execute_batch("ANALYZE;").map_err(Into::into)
}
pub fn export(&self, table: &str, path: &std::path::Path, format: ExportFormat) -> Result<()> {
let path_str = path.to_string_lossy();
let sql = match format {
ExportFormat::Parquet => format!("COPY {} TO '{}' (FORMAT PARQUET)", table, path_str),
ExportFormat::Csv => format!("COPY {} TO '{}' (FORMAT CSV, HEADER)", table, path_str),
ExportFormat::Json => format!("COPY {} TO '{}' (FORMAT JSON)", table, path_str),
};
self.conn.execute(&sql, [])?;
Ok(())
}
/// Location of the backing database file (":memory:" for in-memory DBs).
pub fn path(&self) -> &std::path::Path {
    self.db_path.as_path()
}
}
/// Best-effort conversion of one result cell into a JSON value.
///
/// Probes the cell as i64, then f64, then String, then bool — the first
/// representation the driver yields wins — and falls back to JSON null.
/// The probe order is deliberately preserved from the original code.
fn row_to_json_value(row: &Row, idx: usize) -> Result<serde_json::Value> {
    let value = row
        .get::<_, i64>(idx)
        .ok()
        .map(serde_json::Value::from)
        .or_else(|| row.get::<_, f64>(idx).ok().map(|v| serde_json::json!(v)))
        .or_else(|| row.get::<_, String>(idx).ok().map(serde_json::Value::String))
        .or_else(|| row.get::<_, bool>(idx).ok().map(serde_json::Value::Bool))
        .unwrap_or(serde_json::Value::Null);
    Ok(value)
}
/// Converts "YYYY-MM-DD" or "YYYYMMDD" into GDELT's integer date form
/// (e.g. 20240102). Unparseable input degrades to 0.
fn parse_date_to_int(date_str: &str) -> i32 {
    let digits: String = date_str.chars().filter(|&c| c != '-').collect();
    digits.parse().unwrap_or(0)
}
/// Splits a '|'-joined list into trimmed, non-empty entries.
/// `None` and the empty string both produce an empty Vec.
fn parse_pipe_list(s: &Option<String>) -> Vec<String> {
    s.as_deref()
        .filter(|text| !text.is_empty())
        .map(|text| {
            text.split('|')
                .map(str::trim)
                .filter(|part| !part.is_empty())
                .map(str::to_string)
                .collect()
        })
        .unwrap_or_default()
}
/// Summary statistics produced by [`AnalyticsDb::stats`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct DbStats {
/// Total rows in `events`.
pub events_count: u64,
/// Total rows in `gkg`.
pub gkg_count: u64,
/// Total rows in `mentions`.
pub mentions_count: u64,
/// (min, max) of `events.sql_date`, or `None` when the table is empty.
pub events_date_range: Option<(i64, i64)>,
/// (min, max) of `gkg.date`, or `None` when the table is empty.
pub gkg_date_range: Option<(i64, i64)>,
/// On-disk size of the database file; 0 when the path cannot be
/// stat'ed (e.g. in-memory databases).
pub file_size_bytes: u64,
}
/// Result of an ad-hoc SQL query: column names plus rows of loosely typed
/// JSON values (see `row_to_json_value` for the cell conversion rules).
#[derive(Debug, Clone, serde::Serialize)]
pub struct QueryResult {
/// Column names in SELECT order; parallel to each inner row vector.
pub columns: Vec<String>,
/// Row-major cell values; each inner Vec has `columns.len()` entries.
pub rows: Vec<Vec<serde_json::Value>>,
}
/// Optional filter set for [`AnalyticsDb::query_events`]; all present
/// filters are combined with AND.
///
/// NOTE(review): `Default` leaves `limit` at 0, which `query_events`
/// translates into `LIMIT 0` — i.e. no rows. Set `limit` explicitly.
#[derive(Debug, Clone, Default)]
pub struct EventFilters {
/// Matches `actor1_code` or `actor2_code` exactly.
pub actor: Option<String>,
/// Exact event code, or a prefix match when it ends with '*'.
pub event_code: Option<String>,
/// GDELT quad class (stored as TINYINT; compared as an integer).
pub quad_class: Option<u8>,
/// Matches `action_geo_country_code` exactly.
pub country: Option<String>,
/// Inclusive lower bound; "YYYY-MM-DD" or "YYYYMMDD".
pub start_date: Option<String>,
/// Inclusive upper bound; "YYYY-MM-DD" or "YYYYMMDD".
pub end_date: Option<String>,
/// Inclusive lower bound on `goldstein_scale`.
pub goldstein_min: Option<f64>,
/// Inclusive upper bound on `goldstein_scale`.
pub goldstein_max: Option<f64>,
/// Maximum rows to return (SQL LIMIT).
pub limit: u32,
/// Rows to skip before returning (SQL OFFSET).
pub offset: u32,
}
/// One row of the `events` projection shared by `query_events` and
/// `lookup_event`. `Option` fields mirror nullable columns.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventRow {
pub global_event_id: i64,
/// Event date as a YYYYMMDD integer (compared against
/// `parse_date_to_int` output in filters).
pub sql_date: i32,
pub actor1_code: Option<String>,
pub actor1_name: Option<String>,
pub actor1_country_code: Option<String>,
pub actor2_code: Option<String>,
pub actor2_name: Option<String>,
pub actor2_country_code: Option<String>,
pub event_code: String,
pub event_root_code: Option<String>,
pub quad_class: Option<i32>,
pub goldstein_scale: Option<f64>,
pub num_mentions: Option<i32>,
pub num_sources: Option<i32>,
pub num_articles: Option<i32>,
pub avg_tone: Option<f64>,
pub action_geo_fullname: Option<String>,
pub action_geo_country_code: Option<String>,
pub source_url: Option<String>,
}
/// Output formats accepted by [`AnalyticsDb::export`], mapping directly
/// onto DuckDB `COPY ... (FORMAT ...)` options.
#[derive(Debug, Clone, Copy)]
pub enum ExportFormat {
/// `COPY ... (FORMAT PARQUET)`
Parquet,
/// `COPY ... (FORMAT CSV, HEADER)`
Csv,
/// `COPY ... (FORMAT JSON)`
Json,
}
/// A GKG row as returned by `find_gkg_by_url`, with the array columns
/// (themes/persons/organizations/locations) split into vectors.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GkgRecord {
pub gkg_record_id: String,
/// Raw BIGINT from `gkg.date` — presumably a packed timestamp integer;
/// format not verified here, TODO confirm against the ingest path.
pub date: Option<i64>,
pub source_common_name: Option<String>,
/// Document URL; the lookup key used by `find_gkg_by_url`.
pub document_identifier: String,
pub themes: Vec<String>,
pub persons: Vec<String>,
pub organizations: Vec<String>,
pub locations: Vec<String>,
pub tone: Option<f64>,
pub positive_score: Option<f64>,
pub negative_score: Option<f64>,
pub polarity: Option<f64>,
pub word_count: Option<i32>,
}
pub mod validation {
    //! Validators for user-supplied query inputs (dates, codes, patterns).
    //! Each function returns a normalized value on success or a
    //! `GdeltError::Validation` describing the problem.
    use crate::error::{GdeltError, Result};

    /// Upper length bound shared by patterns, themes, and entity names.
    const MAX_PATTERN_LENGTH: usize = 256;

    /// Internal shortcut for building the module's uniform error.
    fn invalid(message: String) -> GdeltError {
        GdeltError::Validation(message)
    }

    /// Accepts `YYYY-MM-DD` or `YYYYMMDD` and returns the numeric form.
    /// Only the shape is checked — month/day ranges are not verified.
    pub fn validate_date(date: &str) -> Result<i32> {
        let normalized = date.replace('-', "");
        if normalized.len() != 8 {
            return Err(invalid(format!(
                "Invalid date format '{}': expected YYYY-MM-DD or YYYYMMDD",
                date
            )));
        }
        normalized
            .parse::<i32>()
            .map_err(|_| invalid(format!("Invalid date '{}': not a valid number", date)))
    }

    /// Length- and charset-checks a search pattern; returned unchanged.
    pub fn validate_pattern(pattern: &str) -> Result<String> {
        if pattern.len() > MAX_PATTERN_LENGTH {
            return Err(invalid(format!(
                "Pattern too long: {} chars (max {})",
                pattern.len(),
                MAX_PATTERN_LENGTH
            )));
        }
        let allowed = |c: char| c.is_alphanumeric() || matches!(c, ' ' | '-' | '_' | '.');
        if pattern.chars().all(allowed) {
            Ok(pattern.to_string())
        } else {
            Err(invalid(format!(
                "Invalid pattern '{}': only alphanumeric, spaces, hyphens, underscores, and dots allowed",
                pattern
            )))
        }
    }

    /// Normalizes a 2-3 letter country code to uppercase.
    pub fn validate_country_code(code: &str) -> Result<String> {
        if !(2..=3).contains(&code.len()) {
            return Err(invalid(format!(
                "Invalid country code '{}': must be 2-3 characters",
                code
            )));
        }
        if code.chars().any(|c| !c.is_ascii_alphabetic()) {
            return Err(invalid(format!(
                "Invalid country code '{}': must be alphabetic",
                code
            )));
        }
        Ok(code.to_uppercase())
    }

    /// Normalizes an actor code (ASCII alphanumeric + '_') to uppercase.
    pub fn validate_actor_code(code: &str) -> Result<String> {
        if code.len() > 50 {
            return Err(invalid(format!(
                "Actor code too long: {} chars (max 50)",
                code.len()
            )));
        }
        if code.chars().any(|c| !(c.is_ascii_alphanumeric() || c == '_')) {
            return Err(invalid(format!(
                "Invalid actor code '{}': only alphanumeric and underscore allowed",
                code
            )));
        }
        Ok(code.to_uppercase())
    }

    /// Normalizes a theme identifier (ASCII alphanumeric + '_') to uppercase.
    pub fn validate_theme(theme: &str) -> Result<String> {
        if theme.len() > MAX_PATTERN_LENGTH {
            return Err(invalid(format!(
                "Theme too long: {} chars (max {})",
                theme.len(),
                MAX_PATTERN_LENGTH
            )));
        }
        if theme.chars().any(|c| !(c.is_ascii_alphanumeric() || c == '_')) {
            return Err(invalid(format!(
                "Invalid theme '{}': only alphanumeric and underscore allowed",
                theme
            )));
        }
        Ok(theme.to_uppercase())
    }

    /// Checks a person/organization name; returned unchanged (no case fold).
    /// Allows alphanumerics plus space, hyphen, dot, apostrophe, and comma.
    pub fn validate_entity_name(name: &str) -> Result<String> {
        if name.len() > MAX_PATTERN_LENGTH {
            return Err(invalid(format!(
                "Entity name too long: {} chars (max {})",
                name.len(),
                MAX_PATTERN_LENGTH
            )));
        }
        let allowed = |c: char| c.is_alphanumeric() || matches!(c, ' ' | '-' | '.' | '\'' | ',');
        if name.chars().any(|c| !allowed(c)) {
            return Err(invalid(format!(
                "Invalid entity name '{}': contains disallowed characters",
                name
            )));
        }
        Ok(name.to_string())
    }
}