#![allow(dead_code)]
use crate::config;
use crate::error::{GdeltError, Result};
use chrono::{DateTime, Duration, Utc};
use rusqlite::{params, Connection};
use sha2::{Digest, Sha256};
use std::path::PathBuf;
use tracing::{debug, instrument};
/// SQLite-backed local cache: API responses, downloaded-file tracking,
/// sync state, settings, and fetched article text.
pub struct CacheDb {
    /// Open connection to the cache database (file-backed or in-memory).
    conn: Connection,
    /// Path the connection was opened at; `":memory:"` for in-memory DBs.
    db_path: PathBuf,
}
impl CacheDb {
    /// Open (or create) the cache database at the default location,
    /// `<cache_dir>/cache.sqlite`.
    ///
    /// # Errors
    /// Returns `GdeltError::Cache` when the platform cache directory cannot
    /// be determined, plus any I/O or SQLite error from `open_at`.
    #[instrument]
    pub fn open() -> Result<Self> {
        let db_path = config::cache_dir()
            .ok_or_else(|| GdeltError::Cache("Could not determine cache directory".into()))?
            .join("cache.sqlite");
        Self::open_at(db_path)
    }

    /// Open (or create) the cache database at an explicit path, creating
    /// parent directories as needed and initializing the schema.
    pub fn open_at(db_path: PathBuf) -> Result<Self> {
        if let Some(parent) = db_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let conn = Connection::open(&db_path)?;
        let db = Self { conn, db_path };
        db.init_schema()?;
        Ok(db)
    }

    /// Open an in-memory database (useful for tests). Note that `stats()`
    /// reports a db file size of 0 because there is no backing file.
    pub fn open_memory() -> Result<Self> {
        let conn = Connection::open_in_memory()?;
        let db = Self {
            conn,
            db_path: PathBuf::from(":memory:"),
        };
        db.init_schema()?;
        Ok(db)
    }

    /// Create all tables and indexes if they do not already exist.
    /// Idempotent; safe to call on every open.
    fn init_schema(&self) -> Result<()> {
        debug!("Initializing cache schema");
        self.conn.execute_batch(
            r#"
-- API response cache
CREATE TABLE IF NOT EXISTS api_cache (
cache_key TEXT PRIMARY KEY,
url TEXT NOT NULL,
response_body TEXT NOT NULL,
content_type TEXT,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_cache_expires ON api_cache(expires_at);
-- Downloaded files tracking
CREATE TABLE IF NOT EXISTS downloads (
file_id TEXT PRIMARY KEY,
file_type TEXT NOT NULL,
remote_url TEXT NOT NULL,
local_path TEXT NOT NULL,
file_size INTEGER,
checksum TEXT,
date_from INTEGER,
date_to INTEGER,
downloaded_at INTEGER NOT NULL,
imported_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_downloads_type ON downloads(file_type);
CREATE INDEX IF NOT EXISTS idx_downloads_date ON downloads(date_from, date_to);
-- Sync state
CREATE TABLE IF NOT EXISTS sync_state (
data_type TEXT PRIMARY KEY,
last_sync_at INTEGER NOT NULL,
last_file_date INTEGER,
next_expected_file TEXT
);
-- Settings
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
-- Article content cache (for fetched full text)
CREATE TABLE IF NOT EXISTS article_cache (
url_hash TEXT PRIMARY KEY,
url TEXT NOT NULL,
title TEXT,
text_content TEXT NOT NULL,
word_count INTEGER,
fetched_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_article_cache_expires ON article_cache(expires_at);
"#,
        )?;
        Ok(())
    }

    /// Look up a non-expired cached API response for `url`.
    /// Returns `Ok(None)` on a cache miss (no row, or the entry expired).
    pub fn get_cached(&self, url: &str) -> Result<Option<CacheEntry>> {
        let cache_key = Self::make_cache_key(url);
        let now = Utc::now().timestamp();
        let mut stmt = self.conn.prepare(
            "SELECT response_body, content_type, created_at, expires_at
             FROM api_cache
             WHERE cache_key = ? AND expires_at > ?",
        )?;
        let result = stmt.query_row(params![cache_key, now], |row| {
            Ok(CacheEntry {
                body: row.get(0)?,
                content_type: row.get(1)?,
                // Stored as Unix seconds; an out-of-range timestamp falls
                // back to "now" rather than failing the whole lookup.
                created_at: DateTime::from_timestamp(row.get::<_, i64>(2)?, 0)
                    .unwrap_or_else(Utc::now),
                expires_at: DateTime::from_timestamp(row.get::<_, i64>(3)?, 0)
                    .unwrap_or_else(Utc::now),
            })
        });
        match result {
            Ok(entry) => Ok(Some(entry)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Store an API response body for `url` with a time-to-live of
    /// `ttl_secs` seconds. Replaces any existing entry for the same URL.
    pub fn put_cached(&self, url: &str, body: &str, content_type: Option<&str>, ttl_secs: i64) -> Result<()> {
        let cache_key = Self::make_cache_key(url);
        let now = Utc::now();
        let expires_at = now + Duration::seconds(ttl_secs);
        self.conn.execute(
            "INSERT OR REPLACE INTO api_cache (cache_key, url, response_body, content_type, created_at, expires_at)
             VALUES (?, ?, ?, ?, ?, ?)",
            params![
                cache_key,
                url,
                body,
                content_type,
                now.timestamp(),
                expires_at.timestamp()
            ],
        )?;
        Ok(())
    }

    /// Delete expired API cache entries; returns the number removed.
    pub fn prune_expired(&self) -> Result<u64> {
        let now = Utc::now().timestamp();
        let count = self
            .conn
            .execute("DELETE FROM api_cache WHERE expires_at < ?", params![now])?;
        Ok(count as u64)
    }

    /// Insert or overwrite the tracking row for a downloaded file.
    pub fn record_download(&self, download: &DownloadRecord) -> Result<()> {
        self.conn.execute(
            "INSERT OR REPLACE INTO downloads
             (file_id, file_type, remote_url, local_path, file_size, checksum, date_from, date_to, downloaded_at, imported_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            params![
                download.file_id,
                download.file_type,
                download.remote_url,
                download.local_path,
                download.file_size,
                download.checksum,
                download.date_from,
                download.date_to,
                download.downloaded_at.timestamp(),
                download.imported_at.map(|d| d.timestamp())
            ],
        )?;
        Ok(())
    }

    /// Stamp a tracked download as imported now. No-op if `file_id`
    /// is not tracked.
    pub fn mark_imported(&self, file_id: &str) -> Result<()> {
        let now = Utc::now().timestamp();
        self.conn.execute(
            "UPDATE downloads SET imported_at = ? WHERE file_id = ?",
            params![now, file_id],
        )?;
        Ok(())
    }

    /// List tracked downloads, newest `date_from` first, optionally
    /// filtered by `file_type` (e.g. "events", "gkg", "mentions").
    pub fn get_downloads(&self, file_type: Option<&str>) -> Result<Vec<DownloadRecord>> {
        let sql = if file_type.is_some() {
            "SELECT file_id, file_type, remote_url, local_path, file_size, checksum,
                    date_from, date_to, downloaded_at, imported_at
             FROM downloads WHERE file_type = ? ORDER BY date_from DESC"
        } else {
            "SELECT file_id, file_type, remote_url, local_path, file_size, checksum,
                    date_from, date_to, downloaded_at, imported_at
             FROM downloads ORDER BY date_from DESC"
        };
        let mut stmt = self.conn.prepare(sql)?;
        let rows = if let Some(ft) = file_type {
            stmt.query_map(params![ft], row_to_download)?
        } else {
            stmt.query_map([], row_to_download)?
        };
        let mut results = Vec::new();
        for row in rows {
            results.push(row?);
        }
        Ok(results)
    }

    /// Aggregate counts and sizes over the `downloads` table.
    pub fn download_status(&self) -> Result<DownloadStatus> {
        let events_count: i64 = self.conn.query_row(
            "SELECT COUNT(*) FROM downloads WHERE file_type = 'events'",
            [],
            |row| row.get(0),
        )?;
        let gkg_count: i64 = self.conn.query_row(
            "SELECT COUNT(*) FROM downloads WHERE file_type = 'gkg'",
            [],
            |row| row.get(0),
        )?;
        let mentions_count: i64 = self.conn.query_row(
            "SELECT COUNT(*) FROM downloads WHERE file_type = 'mentions'",
            [],
            |row| row.get(0),
        )?;
        let pending_import: i64 = self.conn.query_row(
            "SELECT COUNT(*) FROM downloads WHERE imported_at IS NULL",
            [],
            |row| row.get(0),
        )?;
        let total_size: i64 = self.conn.query_row(
            "SELECT COALESCE(SUM(file_size), 0) FROM downloads",
            [],
            |row| row.get(0),
        )?;
        // MAX() over an empty table yields SQL NULL: read the column as
        // Option<i64> so NULL maps to None, and propagate genuine query
        // errors instead of swallowing them with .ok().
        let latest_date: Option<i64> =
            self.conn
                .query_row("SELECT MAX(date_to) FROM downloads", [], |row| row.get(0))?;
        Ok(DownloadStatus {
            events_files: events_count as u64,
            gkg_files: gkg_count as u64,
            mentions_files: mentions_count as u64,
            pending_import: pending_import as u64,
            total_size_bytes: total_size as u64,
            latest_date,
        })
    }

    /// Record that a sync for `data_type` completed now.
    ///
    /// Uses an UPSERT rather than `INSERT OR REPLACE`: REPLACE deletes the
    /// existing row and reinserts it, which would silently null out the
    /// `next_expected_file` column this statement does not set.
    pub fn update_sync_state(&self, data_type: &str, last_file_date: Option<i64>) -> Result<()> {
        let now = Utc::now().timestamp();
        self.conn.execute(
            "INSERT INTO sync_state (data_type, last_sync_at, last_file_date)
             VALUES (?, ?, ?)
             ON CONFLICT(data_type) DO UPDATE SET
                 last_sync_at = excluded.last_sync_at,
                 last_file_date = excluded.last_file_date",
            params![data_type, now, last_file_date],
        )?;
        Ok(())
    }

    /// Fetch the sync state for `data_type`; `Ok(None)` if never synced.
    pub fn get_sync_state(&self, data_type: &str) -> Result<Option<SyncState>> {
        let result = self.conn.query_row(
            "SELECT last_sync_at, last_file_date, next_expected_file
             FROM sync_state WHERE data_type = ?",
            params![data_type],
            |row| {
                Ok(SyncState {
                    data_type: data_type.to_string(),
                    last_sync_at: DateTime::from_timestamp(row.get::<_, i64>(0)?, 0)
                        .unwrap_or_else(Utc::now),
                    last_file_date: row.get(1)?,
                    next_expected_file: row.get(2)?,
                })
            },
        );
        match result {
            Ok(state) => Ok(Some(state)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Read a setting value; `Ok(None)` when the key is absent.
    pub fn get_setting(&self, key: &str) -> Result<Option<String>> {
        let result = self.conn.query_row(
            "SELECT value FROM settings WHERE key = ?",
            params![key],
            |row| row.get(0),
        );
        match result {
            Ok(v) => Ok(Some(v)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Write (or overwrite) a setting, stamping its update time.
    pub fn set_setting(&self, key: &str, value: &str) -> Result<()> {
        let now = Utc::now().timestamp();
        self.conn.execute(
            "INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES (?, ?, ?)",
            params![key, value, now],
        )?;
        Ok(())
    }

    /// Empty the API cache, download tracking, and sync state.
    /// Settings and the article cache are intentionally left intact
    /// (see `clear_article_cache` for the latter).
    pub fn clear(&self) -> Result<()> {
        self.conn.execute_batch(
            "DELETE FROM api_cache;
             DELETE FROM downloads;
             DELETE FROM sync_state;",
        )?;
        Ok(())
    }

    /// Summarize cache usage: entry counts, payload bytes, and the size
    /// of the database file itself (0 for in-memory databases).
    pub fn stats(&self) -> Result<CacheStats> {
        let api_cache_count: i64 = self
            .conn
            .query_row("SELECT COUNT(*) FROM api_cache", [], |row| row.get(0))?;
        let api_cache_size: i64 = self.conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(response_body)), 0) FROM api_cache",
            [],
            |row| row.get(0),
        )?;
        let downloads_count: i64 = self
            .conn
            .query_row("SELECT COUNT(*) FROM downloads", [], |row| row.get(0))?;
        // metadata() fails for ":memory:" (and unreadable paths); report 0.
        let file_size = std::fs::metadata(&self.db_path)
            .map(|m| m.len())
            .unwrap_or(0);
        Ok(CacheStats {
            api_cache_entries: api_cache_count as u64,
            api_cache_size_bytes: api_cache_size as u64,
            downloads_tracked: downloads_count as u64,
            db_file_size_bytes: file_size,
        })
    }

    /// Derive a stable primary key for a URL: lowercase hex SHA-256.
    fn make_cache_key(url: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(url.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Look up non-expired cached article text for `url`.
    /// Returns `Ok(None)` on a miss or an expired entry.
    pub fn get_article(&self, url: &str) -> Result<Option<ArticleContent>> {
        let url_hash = Self::make_cache_key(url);
        let now = Utc::now().timestamp();
        let mut stmt = self.conn.prepare(
            "SELECT url, title, text_content, word_count, fetched_at
             FROM article_cache
             WHERE url_hash = ? AND expires_at > ?",
        )?;
        let result = stmt.query_row(params![url_hash, now], |row| {
            Ok(ArticleContent {
                url: row.get(0)?,
                title: row.get(1)?,
                text: row.get(2)?,
                // word_count is nullable in the schema; treat NULL as 0.
                word_count: row.get::<_, Option<i32>>(3)?.unwrap_or(0) as usize,
                fetched_at: DateTime::from_timestamp(row.get::<_, i64>(4)?, 0)
                    .unwrap_or_else(Utc::now),
            })
        });
        match result {
            Ok(entry) => Ok(Some(entry)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Cache fetched article text for `content.url`, expiring after
    /// `ttl_days` days. Replaces any existing entry for the same URL.
    pub fn put_article(&self, content: &ArticleContent, ttl_days: i64) -> Result<()> {
        let url_hash = Self::make_cache_key(&content.url);
        let now = Utc::now();
        let expires_at = now + Duration::days(ttl_days);
        self.conn.execute(
            "INSERT OR REPLACE INTO article_cache
             (url_hash, url, title, text_content, word_count, fetched_at, expires_at)
             VALUES (?, ?, ?, ?, ?, ?, ?)",
            params![
                url_hash,
                content.url,
                content.title,
                content.text,
                content.word_count as i32,
                content.fetched_at.timestamp(),
                expires_at.timestamp()
            ],
        )?;
        Ok(())
    }

    /// Count cached articles, their total text bytes, and how many
    /// have already expired.
    pub fn article_cache_stats(&self) -> Result<ArticleCacheStats> {
        let count: i64 = self
            .conn
            .query_row("SELECT COUNT(*) FROM article_cache", [], |row| row.get(0))?;
        let size: i64 = self.conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(text_content)), 0) FROM article_cache",
            [],
            |row| row.get(0),
        )?;
        let expired: i64 = self.conn.query_row(
            "SELECT COUNT(*) FROM article_cache WHERE expires_at < ?",
            params![Utc::now().timestamp()],
            |row| row.get(0),
        )?;
        Ok(ArticleCacheStats {
            cached_articles: count as u64,
            total_size_bytes: size as u64,
            expired_count: expired as u64,
        })
    }

    /// Delete expired article entries; returns the number removed.
    pub fn prune_article_cache(&self) -> Result<u64> {
        let now = Utc::now().timestamp();
        let count = self
            .conn
            .execute("DELETE FROM article_cache WHERE expires_at < ?", params![now])?;
        Ok(count as u64)
    }

    /// Delete ALL article entries; returns the number removed.
    pub fn clear_article_cache(&self) -> Result<u64> {
        let count = self.conn.execute("DELETE FROM article_cache", [])?;
        Ok(count as u64)
    }

    /// True if `file_id` is tracked AND has been imported.
    pub fn is_file_processed(&self, file_id: &str) -> Result<bool> {
        let result: rusqlite::Result<i64> = self.conn.query_row(
            "SELECT 1 FROM downloads WHERE file_id = ? AND imported_at IS NOT NULL",
            params![file_id],
            |row| row.get(0),
        );
        match result {
            Ok(_) => Ok(true),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }

    /// Record a synthetic 'sync'-type download marked as downloaded and
    /// imported now, so `is_file_processed` reports it as done.
    pub fn mark_file_processed(&self, file_id: &str, file_size: u64) -> Result<()> {
        let now = Utc::now().timestamp();
        self.conn.execute(
            "INSERT OR REPLACE INTO downloads
             (file_id, file_type, remote_url, local_path, file_size, downloaded_at, imported_at)
             VALUES (?, 'sync', '', '', ?, ?, ?)",
            // SQLite stores INTEGER as i64; sizes >= 2^63 would wrap, which
            // is far beyond any realistic file size here.
            params![file_id, file_size as i64, now, now],
        )?;
        Ok(())
    }
}
/// Map a `downloads` row — selected in the canonical column order
/// (file_id, file_type, remote_url, local_path, file_size, checksum,
/// date_from, date_to, downloaded_at, imported_at) — into a
/// `DownloadRecord`, converting stored Unix seconds to `DateTime<Utc>`.
fn row_to_download(row: &rusqlite::Row) -> rusqlite::Result<DownloadRecord> {
    // Pull the timestamp columns out first so the conversions below
    // stay readable.
    let downloaded_ts: i64 = row.get(8)?;
    let imported_ts: Option<i64> = row.get(9)?;
    Ok(DownloadRecord {
        file_id: row.get(0)?,
        file_type: row.get(1)?,
        remote_url: row.get(2)?,
        local_path: row.get(3)?,
        file_size: row.get(4)?,
        checksum: row.get(5)?,
        date_from: row.get(6)?,
        date_to: row.get(7)?,
        // An out-of-range stored timestamp degrades to "now" rather than
        // failing the whole row.
        downloaded_at: DateTime::from_timestamp(downloaded_ts, 0).unwrap_or_else(Utc::now),
        imported_at: imported_ts.and_then(|ts| DateTime::from_timestamp(ts, 0)),
    })
}
/// A cached API response retrieved via `CacheDb::get_cached`.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// Raw response body as stored.
    pub body: String,
    /// Content-Type recorded at store time, if any.
    pub content_type: Option<String>,
    /// When the entry was written.
    pub created_at: DateTime<Utc>,
    /// When the entry stops being served.
    pub expires_at: DateTime<Utc>,
}
/// One tracked download in the `downloads` table.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DownloadRecord {
    /// Unique file identifier (primary key).
    pub file_id: String,
    /// Category of file, e.g. "events", "gkg", "mentions", or "sync".
    pub file_type: String,
    /// URL the file was fetched from.
    pub remote_url: String,
    /// Where the file was stored locally.
    pub local_path: String,
    /// Size in bytes, when known.
    pub file_size: Option<i64>,
    /// Optional checksum of the file contents.
    pub checksum: Option<String>,
    /// Start of the date range the file covers (stored as an integer).
    pub date_from: Option<i64>,
    /// End of the date range the file covers (stored as an integer).
    pub date_to: Option<i64>,
    /// When the download completed.
    pub downloaded_at: DateTime<Utc>,
    /// When the file was imported, or `None` if still pending.
    pub imported_at: Option<DateTime<Utc>>,
}
/// Aggregate view over the `downloads` table (see `CacheDb::download_status`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct DownloadStatus {
    /// Number of tracked "events" files.
    pub events_files: u64,
    /// Number of tracked "gkg" files.
    pub gkg_files: u64,
    /// Number of tracked "mentions" files.
    pub mentions_files: u64,
    /// Downloads not yet imported (imported_at IS NULL).
    pub pending_import: u64,
    /// Sum of known file sizes, in bytes.
    pub total_size_bytes: u64,
    /// Largest `date_to` across all downloads; `None` when the table is empty.
    pub latest_date: Option<i64>,
}
/// Per-data-type sync bookkeeping from the `sync_state` table.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SyncState {
    /// The data type this state belongs to (primary key).
    pub data_type: String,
    /// When the last sync completed.
    pub last_sync_at: DateTime<Utc>,
    /// Date stamp of the last file synced, if recorded.
    pub last_file_date: Option<i64>,
    /// Name of the next file expected from the feed, if known.
    pub next_expected_file: Option<String>,
}
/// Size/usage summary of the cache database (see `CacheDb::stats`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct CacheStats {
    /// Rows in `api_cache` (expired entries included).
    pub api_cache_entries: u64,
    /// Total bytes of cached API response bodies.
    pub api_cache_size_bytes: u64,
    /// Rows in the `downloads` table.
    pub downloads_tracked: u64,
    /// Size of the SQLite file on disk; 0 for in-memory databases.
    pub db_file_size_bytes: u64,
}
/// Fetched full-text article content, cached in `article_cache`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ArticleContent {
    /// Source URL of the article.
    pub url: String,
    /// Extracted title, if one was found.
    pub title: Option<String>,
    /// Extracted body text.
    pub text: String,
    /// Word count of `text`; 0 when stored as NULL.
    pub word_count: usize,
    /// When the article was fetched.
    pub fetched_at: DateTime<Utc>,
}
/// Usage summary of the article cache (see `CacheDb::article_cache_stats`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct ArticleCacheStats {
    /// Rows in `article_cache` (expired entries included).
    pub cached_articles: u64,
    /// Total bytes of cached article text.
    pub total_size_bytes: u64,
    /// Entries whose `expires_at` is already in the past.
    pub expired_count: u64,
}