use crate::api::client::GdeltClient;
use crate::db::cache::{ArticleContent, CacheDb};
use crate::db::duckdb::{AnalyticsDb, GkgRecord};
use crate::error::{GdeltError, Result};
use chrono::Utc;
use scraper::{Html, Selector};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{debug, warn};
/// GKG-derived metadata attached to an article.
///
/// Values are copied field-for-field from a `GkgRecord` by
/// `GkgLookup::record_to_enrichment`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GkgEnrichment {
/// Themes copied from the GKG record.
pub themes: Vec<String>,
/// Persons copied from the GKG record.
pub persons: Vec<String>,
/// Organizations copied from the GKG record.
pub organizations: Vec<String>,
/// Locations copied from the GKG record.
pub locations: Vec<String>,
/// Tone scores; `None` when the record carries no average tone.
pub tone: Option<ToneData>,
/// Word count copied from the GKG record, if it has one.
pub word_count: Option<i32>,
/// Where this enrichment was obtained from.
pub source: GkgSource,
}
/// Tone scores taken from a GKG record.
///
/// Only built when the record has an average tone; the remaining scores are
/// optional and copied as-is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToneData {
/// Average tone (the record's `tone` value).
pub average: f64,
/// The record's `positive_score`, if present.
pub positive: Option<f64>,
/// The record's `negative_score`, if present.
pub negative: Option<f64>,
/// The record's `polarity`, if present.
pub polarity: Option<f64>,
}
/// Origin of a `GkgEnrichment`.
///
/// Serialized with lowercase variant names (`rename_all = "lowercase"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum GkgSource {
/// The record was found in the local analytics database.
Local,
/// NOTE(review): presumably a record obtained from a remote API; the code
/// visible here never constructs this variant — confirm against callers.
Api,
/// NOTE(review): presumably marks a URL without a GKG record; the code
/// visible here never constructs this variant — confirm against callers.
NotFound,
}
/// An article together with whatever enrichment could be gathered for it.
///
/// Produced by `EnrichmentService::enrich` and
/// `EnrichmentService::enrich_batch`.
#[derive(Debug, Clone, Serialize)]
pub struct EnrichedArticle {
/// URL of the article; also the key used for the GKG lookup.
pub url: String,
/// Caller-supplied title, passed through unchanged.
pub title: Option<String>,
/// Caller-supplied date, passed through unchanged.
pub date: Option<String>,
/// Caller-supplied source, passed through unchanged.
pub source: Option<String>,
/// Caller-supplied language, passed through unchanged.
pub language: Option<String>,
/// GKG metadata; `None` when no record was found for the URL.
pub gkg: Option<GkgEnrichment>,
/// Extracted article content; `None` when text was not requested or the
/// fetch failed.
pub full_text: Option<ArticleContent>,
}
/// Counters describing the outcome of a batch enrichment.
///
/// All counters start at zero (`Default`).
#[derive(Debug, Clone, Serialize, Default)]
pub struct EnrichmentStats {
/// Number of articles submitted to the batch.
pub total: usize,
/// Number of URLs for which a GKG record was found.
pub gkg_found: usize,
/// Number of found GKG records whose source is `GkgSource::Local`.
pub gkg_local: usize,
/// Number of found GKG records whose source is `GkgSource::Api`.
pub gkg_api: usize,
/// Number of enriched articles that carry extracted text.
pub text_fetched: usize,
/// NOTE(review): presumably the number of texts served from the cache; no
/// code visible in this file updates this counter — confirm.
pub text_cached: usize,
/// Count of failures during the batch.
/// NOTE(review): confirm which failures callers expect to be counted here.
pub errors: usize,
}
/// Looks up GKG records for article URLs in the analytics database.
pub struct GkgLookup {
/// Analytics database handle; `None` when `AnalyticsDb::open()` failed, in
/// which case every lookup returns no result.
db: Option<AnalyticsDb>,
}
impl GkgLookup {
pub fn new() -> Result<Self> {
let db = AnalyticsDb::open().ok();
Ok(Self { db })
}
pub fn lookup(&self, url: &str) -> Result<Option<GkgEnrichment>> {
if let Some(ref db) = self.db {
if let Some(record) = db.find_gkg_by_url(url)? {
return Ok(Some(Self::record_to_enrichment(record, GkgSource::Local)));
}
}
Ok(None)
}
pub fn lookup_batch(&self, urls: &[&str]) -> Result<HashMap<String, GkgEnrichment>> {
let mut results = HashMap::new();
if let Some(ref db) = self.db {
let records = db.find_gkg_by_urls(urls)?;
for (url, record) in records {
results.insert(url, Self::record_to_enrichment(record, GkgSource::Local));
}
}
Ok(results)
}
fn record_to_enrichment(record: GkgRecord, source: GkgSource) -> GkgEnrichment {
let tone = record.tone.map(|avg| ToneData {
average: avg,
positive: record.positive_score,
negative: record.negative_score,
polarity: record.polarity,
});
GkgEnrichment {
themes: record.themes,
persons: record.persons,
organizations: record.organizations,
locations: record.locations,
tone,
word_count: record.word_count,
source,
}
}
}
impl Default for GkgLookup {
    /// Builds a lookup via [`GkgLookup::new`], falling back to one without a
    /// database if construction fails.
    fn default() -> Self {
        match Self::new() {
            Ok(lookup) => lookup,
            Err(_) => Self { db: None },
        }
    }
}
/// Downloads article pages and extracts their title and text, with an
/// optional cache in front.
pub struct ArticleFetcher {
/// Client used to download the page body (`get_text`).
client: GdeltClient,
/// Article cache; `None` when `CacheDb::open()` failed, in which case
/// every fetch goes to the network and nothing is stored.
cache: Option<CacheDb>,
/// Value passed to `CacheDb::put_article` when storing a fetched article.
cache_ttl_days: i64,
}
impl ArticleFetcher {
pub fn new(client: GdeltClient) -> Result<Self> {
let cache = CacheDb::open().ok();
Ok(Self {
client,
cache,
cache_ttl_days: 7,
})
}
pub fn with_cache_ttl(client: GdeltClient, ttl_days: i64) -> Result<Self> {
let cache = CacheDb::open().ok();
Ok(Self {
client,
cache,
cache_ttl_days: ttl_days,
})
}
pub async fn fetch(&self, url: &str) -> Result<ArticleContent> {
if let Some(ref cache) = self.cache {
if let Ok(Some(cached)) = cache.get_article(url) {
debug!("Article cache hit for: {}", url);
return Ok(cached);
}
}
debug!("Fetching article: {}", url);
let html = self.client.get_text(url).await?;
let content = self.extract_content(url, &html)?;
if let Some(ref cache) = self.cache {
if let Err(e) = cache.put_article(&content, self.cache_ttl_days) {
warn!("Failed to cache article: {}", e);
}
}
Ok(content)
}
pub async fn fetch_batch(&self, urls: &[&str], concurrency: usize) -> Vec<Result<ArticleContent>> {
use futures::stream::{self, StreamExt};
stream::iter(urls.iter())
.map(|url| async move { self.fetch(url).await })
.buffer_unordered(concurrency)
.collect()
.await
}
fn extract_content(&self, url: &str, html: &str) -> Result<ArticleContent> {
let document = Html::parse_document(html);
let title = self.extract_title(&document);
let text = self.extract_text(&document);
let word_count = text.split_whitespace().count();
Ok(ArticleContent {
url: url.to_string(),
title,
text,
word_count,
fetched_at: Utc::now(),
})
}
fn extract_title(&self, document: &Html) -> Option<String> {
if let Ok(selector) = Selector::parse("meta[property='og:title']") {
if let Some(elem) = document.select(&selector).next() {
if let Some(content) = elem.value().attr("content") {
return Some(content.trim().to_string());
}
}
}
if let Ok(selector) = Selector::parse("title") {
if let Some(elem) = document.select(&selector).next() {
let title: String = elem.text().collect();
if !title.is_empty() {
return Some(title.trim().to_string());
}
}
}
None
}
fn extract_text(&self, document: &Html) -> String {
let article_selectors = [
"article",
"[role='main']",
"main",
".article-content",
".article-body",
".post-content",
".entry-content",
".content",
"#content",
".story-body",
".story-content",
];
for selector_str in &article_selectors {
if let Ok(selector) = Selector::parse(selector_str) {
if let Some(elem) = document.select(&selector).next() {
let text = self.clean_text(elem.text().collect::<String>());
if text.len() > 200 {
return text;
}
}
}
}
if let Ok(body_selector) = Selector::parse("body") {
if let Some(body) = document.select(&body_selector).next() {
return self.clean_text(body.text().collect::<String>());
}
}
String::new()
}
fn clean_text(&self, text: String) -> String {
text.split_whitespace()
.collect::<Vec<_>>()
.join(" ")
.trim()
.to_string()
}
}
/// Combines GKG lookups with optional full-text fetching to enrich articles.
pub struct EnrichmentService {
/// Source of GKG metadata for article URLs.
gkg_lookup: GkgLookup,
/// Fetcher used for full text; set by `with_fetcher`, `None` otherwise.
article_fetcher: Option<ArticleFetcher>,
}
impl EnrichmentService {
pub fn new() -> Result<Self> {
Ok(Self {
gkg_lookup: GkgLookup::new()?,
article_fetcher: None,
})
}
pub fn with_fetcher(client: GdeltClient) -> Result<Self> {
Ok(Self {
gkg_lookup: GkgLookup::new()?,
article_fetcher: Some(ArticleFetcher::new(client)?),
})
}
pub async fn enrich(
&self,
url: &str,
title: Option<String>,
date: Option<String>,
source: Option<String>,
language: Option<String>,
fetch_text: bool,
) -> Result<EnrichedArticle> {
let gkg = self.gkg_lookup.lookup(url)?;
let full_text = if fetch_text {
if let Some(ref fetcher) = self.article_fetcher {
match fetcher.fetch(url).await {
Ok(content) => Some(content),
Err(e) => {
warn!("Failed to fetch article {}: {}", url, e);
None
}
}
} else {
return Err(GdeltError::Config(
"Article fetcher not configured. Use EnrichmentService::with_fetcher()".into()
));
}
} else {
None
};
Ok(EnrichedArticle {
url: url.to_string(),
title,
date,
source,
language,
gkg,
full_text,
})
}
pub async fn enrich_batch(
&self,
articles: Vec<ArticleInput>,
fetch_text: bool,
concurrency: usize,
) -> (Vec<EnrichedArticle>, EnrichmentStats) {
use futures::stream::{self, StreamExt};
let mut stats = EnrichmentStats {
total: articles.len(),
..Default::default()
};
let urls: Vec<&str> = articles.iter().map(|a| a.url.as_str()).collect();
let gkg_results = self.gkg_lookup.lookup_batch(&urls).unwrap_or_default();
stats.gkg_found = gkg_results.len();
stats.gkg_local = gkg_results.values().filter(|g| g.source == GkgSource::Local).count();
stats.gkg_api = gkg_results.values().filter(|g| g.source == GkgSource::Api).count();
let enriched: Vec<EnrichedArticle> = if fetch_text && self.article_fetcher.is_some() {
let fetcher = self.article_fetcher.as_ref().unwrap();
stream::iter(articles.iter())
.map(|article| async {
let gkg = gkg_results.get(&article.url).cloned();
let full_text = match fetcher.fetch(&article.url).await {
Ok(content) => Some(content),
Err(e) => {
warn!("Failed to fetch {}: {}", article.url, e);
None
}
};
EnrichedArticle {
url: article.url.clone(),
title: article.title.clone(),
date: article.date.clone(),
source: article.source.clone(),
language: article.language.clone(),
gkg,
full_text,
}
})
.buffer_unordered(concurrency)
.collect()
.await
} else {
articles
.into_iter()
.map(|article| {
let gkg = gkg_results.get(&article.url).cloned();
EnrichedArticle {
url: article.url,
title: article.title,
date: article.date,
source: article.source,
language: article.language,
gkg,
full_text: None,
}
})
.collect()
};
stats.text_fetched = enriched.iter().filter(|a| a.full_text.is_some()).count();
(enriched, stats)
}
}
impl Default for EnrichmentService {
    /// Builds a service via [`EnrichmentService::new`], falling back to one
    /// with no database and no fetcher if construction fails.
    fn default() -> Self {
        match Self::new() {
            Ok(service) => service,
            Err(_) => Self {
                gkg_lookup: GkgLookup { db: None },
                article_fetcher: None,
            },
        }
    }
}
/// An article to enrich: its URL plus whatever metadata the caller already has.
#[derive(Debug, Clone)]
pub struct ArticleInput {
    /// URL of the article.
    pub url: String,
    /// Title, if known.
    pub title: Option<String>,
    /// Date, if known.
    pub date: Option<String>,
    /// Source, if known.
    pub source: Option<String>,
    /// Language, if known.
    pub language: Option<String>,
}

impl ArticleInput {
    /// Creates an input that carries only a URL; every metadata field is `None`.
    pub fn new(url: String) -> Self {
        Self::with_metadata(url, None, None, None, None)
    }

    /// Creates an input from a URL and the given metadata, stored as passed.
    pub fn with_metadata(
        url: String,
        title: Option<String>,
        date: Option<String>,
        source: Option<String>,
        language: Option<String>,
    ) -> Self {
        Self {
            language,
            source,
            date,
            title,
            url,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `ArticleInput::new` keeps the URL and leaves the title unset.
    #[test]
    fn test_article_input() {
        let url = "https://example.com".to_string();
        let input = ArticleInput::new(url);
        assert_eq!(input.url, "https://example.com");
        assert!(input.title.is_none());
    }

    /// Constructing a `GkgLookup` succeeds.
    #[test]
    fn test_gkg_lookup_new() {
        assert!(GkgLookup::new().is_ok());
    }
}