use std::time::Duration;
use ceres_core::HttpConfig;
use ceres_core::LocalizedField;
use ceres_core::error::AppError;
use ceres_core::models::NewDataset;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use reqwest::{Client, StatusCode, Url};
use serde::Deserialize;
use serde_json::Value;
use tokio::time::sleep;
/// Generic envelope used to deserialize the JSON body of the CKAN action
/// endpoints called in this file (`package_list`, `package_show`,
/// `package_search`).
#[derive(Deserialize, Debug)]
struct CkanResponse<T> {
/// Flag reported by the server; callers in this file turn `false` into an error.
success: bool,
/// Endpoint-specific payload.
result: T,
}
/// `result` payload of a `package_search` response.
#[derive(Deserialize, Debug)]
struct PackageSearchResult {
/// Total reported by the server; the pagination code uses it as the overall
/// number of matches, not the size of this page.
count: usize,
/// Datasets contained in this page.
results: Vec<CkanDataset>,
}
/// A dataset (package) as returned by the CKAN API.
///
/// `title` and `notes` are [`LocalizedField`]s, so they deserialize from either
/// a plain string or a per-language object (both shapes are exercised by the
/// tests in this file).
#[derive(Deserialize, Debug, Clone)]
pub struct CkanDataset {
/// Portal-side identifier of the dataset.
pub id: String,
/// Dataset name; used to build the default landing-page URL.
pub name: String,
/// Title, possibly localized.
pub title: LocalizedField,
/// Description, possibly localized; absent on some portals.
pub notes: Option<LocalizedField>,
/// Every other top-level JSON field that is not named above
/// (for example `organization` or `description`).
#[serde(flatten)]
pub extras: serde_json::Map<String, Value>,
}
/// HTTP client bound to a single CKAN portal.
#[derive(Clone)]
pub struct CkanClient {
/// Underlying `reqwest` client, built once in [`CkanClient::new`].
client: Client,
/// Base URL that all `api/3/action/...` endpoints are joined onto.
base_url: Url,
}
impl CkanClient {
/// Initial pause between two consecutive `package_search` page requests.
const PAGE_DELAY: Duration = Duration::from_secs(1);
/// Upper bound for the exponential back-off used by `request_with_retry`
/// when a 429 response carries no usable `retry-after` header.
const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);
/// Base cool-down slept by `fetch_search_page` after a rate-limited page;
/// it is multiplied by the 1-based page attempt number.
const PAGE_RATE_LIMIT_COOLDOWN: Duration = Duration::from_secs(60);
/// Number of extra page-level attempts made after a rate-limit failure.
const PAGE_RATE_LIMIT_RETRIES: u32 = 3;
/// Smallest page size the pagination code will shrink to before giving up.
const MIN_PAGE_SIZE: usize = 10;
fn is_page_size_reducible(err: &AppError) -> bool {
matches!(err, AppError::Timeout(_) | AppError::NetworkError(_))
}
/// Creates a client for the CKAN portal at `base_url_str`.
///
/// The base URL's path is normalized to end with `/`. `Url::join` treats the
/// last path component of a base without a trailing slash as a "file" and
/// drops it, so without normalization a portal mounted at
/// `https://host/ckan` would have its API calls sent to
/// `https://host/api/3/action/...`.
///
/// # Errors
///
/// * [`AppError::InvalidPortalUrl`] if `base_url_str` cannot be parsed.
/// * [`AppError::ClientError`] if the HTTP client cannot be built.
pub fn new(base_url_str: &str) -> Result<Self, AppError> {
    let mut base_url = Url::parse(base_url_str)
        .map_err(|_| AppError::InvalidPortalUrl(base_url_str.to_string()))?;

    // Make the base behave like a directory so `join` appends instead of
    // replacing the final path segment.
    if !base_url.cannot_be_a_base() && !base_url.path().ends_with('/') {
        let directory_path = format!("{}/", base_url.path());
        base_url.set_path(&directory_path);
    }

    let http_config = HttpConfig::default();
    let client = Client::builder()
        .user_agent("Ceres/0.1 (semantic-search-bot)")
        .timeout(http_config.timeout)
        .build()
        .map_err(|e| AppError::ClientError(e.to_string()))?;

    Ok(Self { client, base_url })
}
/// Fetches the list of package identifiers from the `package_list` action.
///
/// # Errors
///
/// Propagates errors from `request_with_retry`, returns
/// [`AppError::ClientError`] if the body cannot be decoded, and
/// [`AppError::Generic`] if the URL cannot be built or the server reports
/// `success: false`.
pub async fn list_package_ids(&self) -> Result<Vec<String>, AppError> {
    let endpoint = match self.base_url.join("api/3/action/package_list") {
        Ok(joined) => joined,
        Err(join_err) => return Err(AppError::Generic(join_err.to_string())),
    };

    let response = self.request_with_retry(&endpoint).await?;
    let envelope = response
        .json::<CkanResponse<Vec<String>>>()
        .await
        .map_err(|decode_err| AppError::ClientError(decode_err.to_string()))?;

    if envelope.success {
        Ok(envelope.result)
    } else {
        Err(AppError::Generic(
            "CKAN API returned success: false".to_string(),
        ))
    }
}
/// Fetches a single dataset through the `package_show` action.
///
/// `id` is sent as the `id` query parameter.
///
/// # Errors
///
/// Propagates errors from `request_with_retry`, returns
/// [`AppError::ClientError`] if the body cannot be decoded, and
/// [`AppError::Generic`] if the URL cannot be built or the server reports
/// `success: false`.
pub async fn show_package(&self, id: &str) -> Result<CkanDataset, AppError> {
    let mut endpoint = match self.base_url.join("api/3/action/package_show") {
        Ok(joined) => joined,
        Err(join_err) => return Err(AppError::Generic(join_err.to_string())),
    };
    endpoint.query_pairs_mut().append_pair("id", id);

    let response = self.request_with_retry(&endpoint).await?;
    let envelope = response
        .json::<CkanResponse<CkanDataset>>()
        .await
        .map_err(|decode_err| AppError::ClientError(decode_err.to_string()))?;

    if envelope.success {
        Ok(envelope.result)
    } else {
        Err(AppError::Generic(format!(
            "CKAN failed to show package {}",
            id
        )))
    }
}
/// Returns every dataset whose `metadata_modified` is at or after `since`.
///
/// Builds a `metadata_modified:[<since> TO *]` filter query, with `since`
/// rendered as `%Y-%m-%dT%H:%M:%SZ`, and runs the paginated search with it.
pub async fn search_modified_since(
    &self,
    since: DateTime<Utc>,
) -> Result<Vec<CkanDataset>, AppError> {
    let filter_query = format!(
        "metadata_modified:[{} TO *]",
        since.format("%Y-%m-%dT%H:%M:%SZ")
    );
    self.paginated_search(Some(filter_query.as_str())).await
}
/// Returns every dataset of the portal by running the paginated search
/// without a filter query.
pub async fn search_all_datasets(&self) -> Result<Vec<CkanDataset>, AppError> {
self.paginated_search(None).await
}
/// Collects all `package_search` results for the optional filter query `fq`.
///
/// Pages are requested starting at 1000 rows. When a page fails with a
/// timeout or network error the same offset is retried with a quarter of the
/// page size, down to `MIN_PAGE_SIZE`; a failure at the minimum size is
/// reported as [`AppError::Generic`]. Any other error is returned unchanged.
///
/// The offset advances by the number of rows actually received, and the loop
/// stops on an empty page or once the reported total is reached. A short page
/// alone is not treated as the end: a server may cap `rows` below the
/// requested size, and stopping there would silently drop the remaining
/// datasets.
async fn paginated_search(&self, fq: Option<&str>) -> Result<Vec<CkanDataset>, AppError> {
    let mut page_size: usize = 1000;
    let mut all_datasets = Vec::new();
    let mut start: usize = 0;
    let mut page_delay = Self::PAGE_DELAY;

    loop {
        match self
            .fetch_search_page(fq, start, page_size, &mut page_delay)
            .await
        {
            Ok((datasets, total_count)) => {
                let page_count = datasets.len();
                all_datasets.extend(datasets);
                start += page_count;
                // An empty page guarantees termination even if the reported
                // total is never reached.
                if page_count == 0 || start >= total_count {
                    break;
                }
            }
            Err(e) if page_size > Self::MIN_PAGE_SIZE && Self::is_page_size_reducible(&e) => {
                page_size = (page_size / 4).max(Self::MIN_PAGE_SIZE);
                tracing::warn!(
                    new_page_size = page_size,
                    offset = start,
                    error = %e,
                    "Page failed, reducing page size and retrying"
                );
                // Retry the same offset immediately with the smaller page.
                continue;
            }
            Err(e) if Self::is_page_size_reducible(&e) => {
                return Err(AppError::Generic(format!(
                    "package_search unreliable even at page_size={page_size}: {e}"
                )));
            }
            Err(e) => return Err(e),
        }

        // Pause between successful pages.
        sleep(page_delay).await;
    }

    Ok(all_datasets)
}
/// Streaming variant of the paginated search: yields one `Vec<CkanDataset>`
/// per fetched page instead of collecting everything first.
///
/// Page-size reduction on timeouts and network errors works as in
/// `paginated_search`. The stream ends after the first error it yields.
///
/// The offset advances by the number of rows actually received, and the
/// stream finishes on an empty page or once the reported total is reached. A
/// short page alone is not treated as the end, because a server may cap
/// `rows` below the requested size.
fn paginated_search_stream(
    &self,
    fq: Option<String>,
) -> BoxStream<'_, Result<Vec<CkanDataset>, AppError>> {
    /// State carried between two polls of the unfolded stream.
    struct PaginationState {
        start: usize,
        page_size: usize,
        page_delay: Duration,
        done: bool,
        fq: Option<String>,
    }

    let initial = PaginationState {
        start: 0,
        page_size: 1000,
        page_delay: Self::PAGE_DELAY,
        done: false,
        fq,
    };

    Box::pin(futures::stream::unfold(
        initial,
        move |mut state| async move {
            if state.done {
                return None;
            }
            loop {
                match self
                    .fetch_search_page(
                        state.fq.as_deref(),
                        state.start,
                        state.page_size,
                        &mut state.page_delay,
                    )
                    .await
                {
                    Ok((datasets, total_count)) => {
                        let page_count = datasets.len();
                        state.start += page_count;
                        // An empty page guarantees termination even if the
                        // reported total is never reached.
                        state.done = page_count == 0 || state.start >= total_count;
                        if !state.done {
                            sleep(state.page_delay).await;
                        }
                        return Some((Ok(datasets), state));
                    }
                    Err(e)
                        if state.page_size > Self::MIN_PAGE_SIZE
                            && Self::is_page_size_reducible(&e) =>
                    {
                        state.page_size = (state.page_size / 4).max(Self::MIN_PAGE_SIZE);
                        tracing::warn!(
                            new_page_size = state.page_size,
                            offset = state.start,
                            error = %e,
                            "Page failed, reducing page size and retrying"
                        );
                        // Retry the same offset with the smaller page.
                        continue;
                    }
                    Err(e) if Self::is_page_size_reducible(&e) => {
                        state.done = true;
                        return Some((
                            Err(AppError::Generic(format!(
                                "package_search unreliable even at page_size={}: {e}",
                                state.page_size
                            ))),
                            state,
                        ));
                    }
                    Err(e) => {
                        state.done = true;
                        return Some((Err(e), state));
                    }
                }
            }
        },
    ))
}
/// Returns the total number of datasets reported by `package_search`.
///
/// Requests a page of zero rows at offset zero, so only the count is used.
pub async fn dataset_count(&self) -> Result<usize, AppError> {
    // `fetch_search_page` needs a delay slot to adjust; its value is discarded.
    let mut scratch_delay = Self::PAGE_DELAY;
    self.fetch_search_page(None, 0, 0, &mut scratch_delay)
        .await
        .map(|(_datasets, total)| total)
}
async fn fetch_search_page(
&self,
fq: Option<&str>,
start: usize,
page_size: usize,
page_delay: &mut Duration,
) -> Result<(Vec<CkanDataset>, usize), AppError> {
let mut url = self
.base_url
.join("api/3/action/package_search")
.map_err(|e| AppError::Generic(e.to_string()))?;
{
let mut pairs = url.query_pairs_mut();
if let Some(filter) = fq {
pairs.append_pair("fq", filter);
}
pairs
.append_pair("rows", &page_size.to_string())
.append_pair("start", &start.to_string())
.append_pair("sort", "metadata_modified asc");
}
let mut page_result = None;
for page_attempt in 0..=Self::PAGE_RATE_LIMIT_RETRIES {
match self.request_with_retry(&url).await {
Ok(resp) => {
page_result = Some(Ok(resp));
break;
}
Err(AppError::RateLimitExceeded)
if page_attempt < Self::PAGE_RATE_LIMIT_RETRIES =>
{
let cooldown = Self::PAGE_RATE_LIMIT_COOLDOWN * (page_attempt + 1);
sleep(cooldown).await;
*page_delay = (*page_delay * 2).min(Duration::from_secs(5));
}
Err(e) => {
page_result = Some(Err(e));
break;
}
}
}
let resp = match page_result {
Some(Ok(resp)) => resp,
Some(Err(e)) => return Err(e),
None => return Err(AppError::RateLimitExceeded),
};
let ckan_resp: CkanResponse<PackageSearchResult> = resp
.json()
.await
.map_err(|e| AppError::ClientError(e.to_string()))?;
if !ckan_resp.success {
return Err(AppError::Generic(
"CKAN package_search returned success: false".to_string(),
));
}
Ok((ckan_resp.result.results, ckan_resp.result.count))
}
/// Lower bound on the number of attempts in `request_with_retry`: the loop
/// runs for the larger of this value and `HttpConfig::max_retries`.
const RATE_LIMIT_MAX_RETRIES: u32 = 10;
/// Sends a GET request to `url`, retrying transient failures.
///
/// Retry policy per attempt (1-based, at most
/// `max(RATE_LIMIT_MAX_RETRIES, HttpConfig::max_retries)` attempts):
///
/// * **HTTP 429** – retried until the attempt budget is exhausted. The delay
///   is the `retry-after` header when it parses as whole seconds, otherwise
///   `base_delay * 2^attempt` capped at `MAX_RETRY_DELAY`.
/// * **HTTP 5xx** – retried with a linear back-off while
///   `attempt < HttpConfig::max_retries`.
/// * **Timeouts and connection failures** – same linear back-off and limit as
///   5xx.
/// * **Anything else** – returned immediately.
///
/// Once a transport error is not retryable, or its retry budget is used up,
/// the function returns at once instead of continuing to send requests
/// without any delay.
///
/// # Errors
///
/// * [`AppError::RateLimitExceeded`] if the server keeps answering 429.
/// * [`AppError::Timeout`] or [`AppError::NetworkError`] for transport
///   failures that outlast the retry budget.
/// * [`AppError::ClientError`] for other transport errors and non-success
///   HTTP statuses.
async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
    let http_config = HttpConfig::default();
    let max_retries = http_config.max_retries;
    let base_delay = http_config.retry_base_delay;
    let effective_max = Self::RATE_LIMIT_MAX_RETRIES.max(max_retries);

    for attempt in 1..=effective_max {
        let resp = match self.client.get(url.clone()).send().await {
            Ok(resp) => resp,
            Err(e) => {
                let transient = e.is_timeout() || e.is_connect();
                if transient && attempt < max_retries {
                    sleep(base_delay * attempt).await;
                    continue;
                }
                // Non-retryable, or out of retries: stop here.
                return Err(if e.is_timeout() {
                    AppError::Timeout(http_config.timeout.as_secs())
                } else if e.is_connect() {
                    AppError::NetworkError(format!("Connection failed: {}", e))
                } else {
                    AppError::ClientError(e.to_string())
                });
            }
        };

        let status = resp.status();
        if status.is_success() {
            return Ok(resp);
        }

        if status == StatusCode::TOO_MANY_REQUESTS {
            if attempt >= effective_max {
                return Err(AppError::RateLimitExceeded);
            }
            let delay = resp
                .headers()
                .get("retry-after")
                .and_then(|v| v.to_str().ok())
                .and_then(|v| v.parse::<u64>().ok())
                .map(Duration::from_secs)
                .unwrap_or_else(|| {
                    // Saturating arithmetic: a large configured retry count
                    // must not overflow the exponent or the duration.
                    base_delay
                        .saturating_mul(2_u32.saturating_pow(attempt))
                        .min(Self::MAX_RETRY_DELAY)
                });
            sleep(delay).await;
            continue;
        }

        if status.is_server_error() && attempt < max_retries {
            sleep(base_delay * attempt).await;
            continue;
        }

        return Err(AppError::ClientError(format!(
            "HTTP {} from {}",
            status.as_u16(),
            url
        )));
    }

    // Only reachable if the attempt range is empty.
    Err(AppError::Generic("No attempts made".to_string()))
}
pub fn into_new_dataset(
dataset: CkanDataset,
portal_url: &str,
url_template: Option<&str>,
language: &str,
) -> NewDataset {
let landing_page = match url_template {
Some(template) => template
.replace("{id}", &dataset.id)
.replace("{name}", &dataset.name),
None => format!(
"{}/dataset/{}",
portal_url.trim_end_matches('/'),
dataset.name
),
};
let metadata_json = serde_json::Value::Object(dataset.extras.clone());
let title = dataset.title.resolve(language);
let description = dataset
.notes
.map(|n| n.resolve(language))
.or_else(|| {
dataset
.extras
.get("description")
.and_then(|v| serde_json::from_value::<LocalizedField>(v.clone()).ok())
.map(|f| f.resolve(language))
})
.filter(|d| !d.is_empty());
let content_hash = NewDataset::compute_content_hash_with_language(
&title,
description.as_deref(),
language,
);
NewDataset {
original_id: dataset.id,
source_portal: portal_url.to_string(),
url: landing_page,
title,
description,
embedding: None,
metadata: metadata_json,
content_hash,
}
}
}
// Unit tests for URL handling, dataset conversion, JSON deserialization and
// the page-size-reduction predicate. None of them performs network I/O.
#[cfg(test)]
mod tests {
use super::*;
// A bare host parses and is stored with a trailing slash.
#[test]
fn test_new_with_valid_url() {
let result = CkanClient::new("https://dati.gov.it");
assert!(result.is_ok());
let client = result.unwrap();
assert_eq!(client.base_url.as_str(), "https://dati.gov.it/");
}
// An unparsable URL is reported as `InvalidPortalUrl` carrying the input.
#[test]
fn test_new_with_invalid_url() {
let result = CkanClient::new("not-a-valid-url");
assert!(result.is_err());
if let Err(AppError::InvalidPortalUrl(url)) = result {
assert_eq!(url, "not-a-valid-url");
} else {
panic!("Expected AppError::InvalidPortalUrl");
}
}
// Without a template the landing page is `<portal>/dataset/<name>` and the
// content hash is derived from title, description and language.
#[test]
fn test_into_new_dataset_basic() {
let ckan_dataset = CkanDataset {
id: "dataset-123".to_string(),
name: "my-dataset".to_string(),
title: LocalizedField::Plain("My Dataset".to_string()),
notes: Some(LocalizedField::Plain("This is a test dataset".to_string())),
extras: serde_json::Map::new(),
};
let portal_url = "https://dati.gov.it";
let new_dataset = CkanClient::into_new_dataset(ckan_dataset, portal_url, None, "en");
assert_eq!(new_dataset.original_id, "dataset-123");
assert_eq!(new_dataset.source_portal, "https://dati.gov.it");
assert_eq!(new_dataset.url, "https://dati.gov.it/dataset/my-dataset");
assert_eq!(new_dataset.title, "My Dataset");
assert!(new_dataset.embedding.is_none());
let expected_hash = NewDataset::compute_content_hash_with_language(
"My Dataset",
Some("This is a test dataset"),
"en",
);
assert_eq!(new_dataset.content_hash, expected_hash);
assert_eq!(new_dataset.content_hash.len(), 64);
}
// The `{id}` placeholder of a URL template is substituted.
#[test]
fn test_into_new_dataset_with_url_template() {
let ckan_dataset = CkanDataset {
id: "52db43b1-4d6a-446c-a3fc-b2e470fe5a45".to_string(),
name: "raccolta-differenziata".to_string(),
title: LocalizedField::Plain("Raccolta Differenziata".to_string()),
notes: Some(LocalizedField::Plain(
"Percentuale raccolta differenziata".to_string(),
)),
extras: serde_json::Map::new(),
};
let portal_url = "https://dati.gov.it/opendata/";
let template = "https://www.dati.gov.it/view-dataset/dataset?id={id}";
let new_dataset =
CkanClient::into_new_dataset(ckan_dataset, portal_url, Some(template), "en");
assert_eq!(
new_dataset.url,
"https://www.dati.gov.it/view-dataset/dataset?id=52db43b1-4d6a-446c-a3fc-b2e470fe5a45"
);
assert_eq!(new_dataset.source_portal, "https://dati.gov.it/opendata/");
}
// The `{name}` placeholder of a URL template is substituted.
#[test]
fn test_into_new_dataset_url_template_with_name() {
let ckan_dataset = CkanDataset {
id: "abc-123".to_string(),
name: "air-quality-data".to_string(),
title: LocalizedField::Plain("Air Quality".to_string()),
notes: None,
extras: serde_json::Map::new(),
};
let template = "https://example.com/datasets/{name}/view";
let new_dataset =
CkanClient::into_new_dataset(ckan_dataset, "https://example.com", Some(template), "en");
assert_eq!(
new_dataset.url,
"https://example.com/datasets/air-quality-data/view"
);
}
// The response envelope deserializes with a list payload.
#[test]
fn test_ckan_response_deserialization() {
let json = r#"{
"success": true,
"result": ["dataset-1", "dataset-2", "dataset-3"]
}"#;
let response: CkanResponse<Vec<String>> = serde_json::from_str(json).unwrap();
assert!(response.success);
assert_eq!(response.result.len(), 3);
}
// Plain-string fields deserialize and unknown fields land in `extras`.
#[test]
fn test_ckan_dataset_deserialization() {
let json = r#"{
"id": "test-id",
"name": "test-name",
"title": "Test Title",
"notes": "Test notes",
"organization": {
"name": "test-org"
}
}"#;
let dataset: CkanDataset = serde_json::from_str(json).unwrap();
assert_eq!(dataset.id, "test-id");
assert_eq!(dataset.name, "test-name");
assert_eq!(dataset.title.resolve("en"), "Test Title");
assert!(dataset.extras.contains_key("organization"));
}
// Per-language objects deserialize; a missing language ("it") resolves to
// the English text in this fixture.
#[test]
fn test_ckan_dataset_multilingual_deserialization() {
let json = r#"{
"id": "swiss-123",
"name": "swiss-dataset",
"title": {"en": "English Title", "de": "Deutscher Titel", "fr": "Titre Francais"},
"notes": {"en": "English description", "de": "Deutsche Beschreibung"}
}"#;
let dataset: CkanDataset = serde_json::from_str(json).unwrap();
assert_eq!(dataset.id, "swiss-123");
assert_eq!(dataset.title.resolve("en"), "English Title");
assert_eq!(dataset.title.resolve("de"), "Deutscher Titel");
assert_eq!(dataset.title.resolve("it"), "English Title"); assert_eq!(
dataset.notes.as_ref().unwrap().resolve("de"),
"Deutsche Beschreibung"
);
}
// Conversion resolves title and description in the requested language.
#[test]
fn test_into_new_dataset_multilingual() {
let json = r#"{
"id": "swiss-dataset",
"name": "test-multilingual",
"title": {"en": "English Title", "de": "Deutscher Titel"},
"notes": {"en": "English description", "de": "Deutsche Beschreibung"}
}"#;
let dataset: CkanDataset = serde_json::from_str(json).unwrap();
let new_ds =
CkanClient::into_new_dataset(dataset, "https://ckan.opendata.swiss", None, "de");
assert_eq!(new_ds.title, "Deutscher Titel");
assert_eq!(
new_ds.description,
Some("Deutsche Beschreibung".to_string())
);
}
// Without `notes`, the `description` entry from `extras` is used.
#[test]
fn test_into_new_dataset_description_fallback() {
let json = r#"{
"id": "swiss-no-notes",
"name": "dataset-without-notes",
"title": {"en": "English Title", "de": "Deutscher Titel"},
"description": {"en": "English desc", "de": "Deutsche Beschreibung"}
}"#;
let dataset: CkanDataset = serde_json::from_str(json).unwrap();
assert!(dataset.notes.is_none());
let new_ds =
CkanClient::into_new_dataset(dataset, "https://ckan.opendata.swiss", None, "en");
assert_eq!(new_ds.description, Some("English desc".to_string()));
}
// An empty fallback description is normalized to `None`.
#[test]
fn test_into_new_dataset_description_fallback_empty() {
let json = r#"{
"id": "swiss-empty-desc",
"name": "dataset-empty-desc",
"title": "Some Title",
"description": {"en": "", "de": "", "fr": ""}
}"#;
let dataset: CkanDataset = serde_json::from_str(json).unwrap();
let new_ds =
CkanClient::into_new_dataset(dataset, "https://ckan.opendata.swiss", None, "en");
assert_eq!(new_ds.description, None);
}
// `notes` wins over an `extras` description when both are present.
#[test]
fn test_into_new_dataset_notes_takes_priority() {
let json = r#"{
"id": "both-fields",
"name": "dataset-both",
"title": "Title",
"notes": "Notes description",
"description": {"en": "Extras description"}
}"#;
let dataset: CkanDataset = serde_json::from_str(json).unwrap();
let new_ds = CkanClient::into_new_dataset(dataset, "https://example.com", None, "en");
assert_eq!(new_ds.description, Some("Notes description".to_string()));
}
// Timeouts trigger page-size reduction.
#[test]
fn test_is_page_size_reducible_timeout() {
assert!(CkanClient::is_page_size_reducible(&AppError::Timeout(30)));
}
// Client errors (such as body decoding failures) do not.
#[test]
fn test_is_page_size_reducible_client_error() {
let err = AppError::ClientError("error decoding response body".to_string());
assert!(!CkanClient::is_page_size_reducible(&err));
}
// Network errors trigger page-size reduction.
#[test]
fn test_is_page_size_reducible_network_error() {
let err = AppError::NetworkError("connection reset".to_string());
assert!(CkanClient::is_page_size_reducible(&err));
}
// Rate-limit and generic errors do not trigger page-size reduction.
#[test]
fn test_is_page_size_reducible_non_reducible() {
assert!(!CkanClient::is_page_size_reducible(
&AppError::RateLimitExceeded
));
assert!(!CkanClient::is_page_size_reducible(&AppError::Generic(
"something".to_string()
)));
}
}
/// `PortalClient` implementation for CKAN.
///
/// Every method forwards to the inherent `CkanClient` method. The calls use
/// explicit `CkanClient::` paths so it is visible that the inherent method is
/// the target, not the trait method of the same name.
impl ceres_core::traits::PortalClient for CkanClient {
    type PortalData = CkanDataset;

    /// Identifier of this portal implementation.
    fn portal_type(&self) -> &'static str {
        "ckan"
    }

    /// Base URL of the portal, as stored in the client.
    fn base_url(&self) -> &str {
        self.base_url.as_str()
    }

    /// Lists dataset identifiers via `package_list`.
    async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
        CkanClient::list_package_ids(self).await
    }

    /// Fetches one dataset via `package_show`.
    async fn get_dataset(&self, id: &str) -> Result<Self::PortalData, AppError> {
        CkanClient::show_package(self, id).await
    }

    /// Converts portal data into a `NewDataset`.
    fn into_new_dataset(
        data: Self::PortalData,
        portal_url: &str,
        url_template: Option<&str>,
        language: &str,
    ) -> NewDataset {
        CkanClient::into_new_dataset(data, portal_url, url_template, language)
    }

    /// Returns datasets modified at or after `since`.
    async fn search_modified_since(
        &self,
        since: DateTime<Utc>,
    ) -> Result<Vec<Self::PortalData>, AppError> {
        CkanClient::search_modified_since(self, since).await
    }

    /// Returns all datasets of the portal.
    async fn search_all_datasets(&self) -> Result<Vec<Self::PortalData>, AppError> {
        CkanClient::search_all_datasets(self).await
    }

    /// Streams all datasets of the portal, one page per item.
    fn search_all_datasets_stream(&self) -> BoxStream<'_, Result<Vec<Self::PortalData>, AppError>> {
        CkanClient::paginated_search_stream(self, None)
    }

    /// Returns the total number of datasets reported by the portal.
    async fn dataset_count(&self) -> Result<usize, AppError> {
        CkanClient::dataset_count(self).await
    }
}
/// Stateless factory that builds [`CkanClient`] instances.
#[derive(Debug, Clone, Default)]
pub struct CkanClientFactory;
// No inherent methods; the factory is used through `PortalClientFactory`.
impl CkanClientFactory {}
/// `PortalClientFactory` implementation that only knows how to build CKAN
/// clients.
impl ceres_core::traits::PortalClientFactory for CkanClientFactory {
    type Client = CkanClient;

    /// Builds a [`CkanClient`] for `portal_url` when `portal_type` is CKAN.
    ///
    /// The language, profile and SPARQL endpoint arguments are ignored.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::ConfigError`] for any other portal type, and
    /// whatever `CkanClient::new` returns for an unusable URL.
    fn create(
        &self,
        portal_url: &str,
        portal_type: ceres_core::config::PortalType,
        _language: &str,
        _profile: Option<&str>,
        _sparql_endpoint: Option<&str>,
    ) -> Result<Self::Client, AppError> {
        if matches!(portal_type, ceres_core::config::PortalType::Ckan) {
            return CkanClient::new(portal_url);
        }

        Err(AppError::ConfigError(format!(
            "CkanClientFactory can only create CKAN clients, but portal type {:?} was requested for URL {}",
            portal_type, portal_url
        )))
    }
}