use std::time::Duration;
use ceres_core::HttpConfig;
use ceres_core::error::AppError;
use ceres_core::models::NewDataset;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use reqwest::{Client, StatusCode, Url};
use serde_json::Value;
use tokio::time::sleep;
/// Pause inserted between consecutive catalog page requests.
const PAGE_DELAY: Duration = Duration::from_millis(200);
/// Per-request timeout configured on the HTTP client; also the value reported in timeout errors.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(90);
/// Upper bound on the exponential backoff delay computed for HTTP 429 retries.
const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);
/// A single dataset node extracted from a DCAT JSON-LD catalog page.
#[derive(Debug, Clone)]
pub struct DcatDataset {
/// The node's `@id` value (never empty for an extracted dataset).
pub id_uri: String,
/// The node's identifier property, or a value derived from `id_uri` when none is present.
pub identifier: String,
/// Title resolved for the client's language (never empty for an extracted dataset).
pub title: String,
/// Description resolved for the client's language; `None` when absent or empty.
pub description: Option<String>,
/// Full copy of the original JSON-LD node.
pub raw: Value,
}
/// HTTP client that harvests datasets from a portal's paginated DCAT JSON-LD catalog.
#[derive(Clone)]
pub struct DcatClient {
// HTTP client used for every catalog request.
client: Client,
// Portal root; the catalog endpoint path is joined onto it.
base_url: Url,
// Language tag handed to the text resolver when extracting titles/descriptions.
language: String,
}
impl DcatClient {
/// Creates a client for the DCAT portal rooted at `base_url_str`.
///
/// `language` is stored and later used to pick a translation when a title or
/// description is multilingual.
///
/// # Errors
///
/// * `AppError::InvalidPortalUrl` when `base_url_str` is not a parseable URL.
/// * `AppError::ClientError` when the underlying HTTP client cannot be built.
pub fn new(base_url_str: &str, language: &str) -> Result<Self, AppError> {
    let Ok(base_url) = Url::parse(base_url_str) else {
        return Err(AppError::InvalidPortalUrl(base_url_str.to_string()));
    };

    let builder = Client::builder()
        .user_agent("Ceres/0.1 (semantic-search-bot)")
        .timeout(REQUEST_TIMEOUT);
    let client = match builder.build() {
        Ok(built) => built,
        Err(build_err) => return Err(AppError::ClientError(build_err.to_string())),
    };

    Ok(Self {
        client,
        base_url,
        language: language.to_owned(),
    })
}
/// Returns the fixed portal-type tag for this client, `"dcat"`.
pub fn portal_type(&self) -> &'static str {
    const PORTAL_KIND: &str = "dcat";
    PORTAL_KIND
}
pub fn base_url(&self) -> &str {
self.base_url.as_str()
}
/// Lists the identifier of every dataset in the catalog.
///
/// This performs a full catalog harvest and keeps only the `identifier`
/// field of each dataset.
///
/// # Errors
///
/// Propagates any error returned by the full catalog harvest.
pub async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
    let catalog = self.search_all_datasets().await?;
    let mut ids = Vec::with_capacity(catalog.len());
    for dataset in catalog {
        ids.push(dataset.identifier);
    }
    Ok(ids)
}
/// Single-dataset lookup; not supported for DCAT portals.
///
/// # Errors
///
/// Always returns `AppError::Generic` with a message pointing callers at
/// `search_all_datasets()`. The `_id` argument is ignored.
pub async fn get_dataset(&self, _id: &str) -> Result<DcatDataset, AppError> {
    let reason = String::from(
        "get_dataset is not implemented for DCAT portals. \
         Use search_all_datasets() instead — it fetches complete metadata \
         for all datasets in a single paginated catalog request.",
    );
    Err(AppError::Generic(reason))
}
/// Harvests the catalog with a `modified_since` query parameter.
///
/// `since` is formatted as an RFC 3339 timestamp and sent on the first page
/// request. How the portal interprets the parameter is up to the server.
///
/// # Errors
///
/// Propagates any error from catalog pagination.
pub async fn search_modified_since(
    &self,
    since: DateTime<Utc>,
) -> Result<Vec<DcatDataset>, AppError> {
    let cutoff = since.to_rfc3339();
    let datasets = self.paginate_catalog(Some(cutoff.as_str())).await?;
    Ok(datasets)
}
/// Harvests every dataset in the catalog, without a modification filter.
///
/// # Errors
///
/// Propagates any error from catalog pagination.
pub async fn search_all_datasets(&self) -> Result<Vec<DcatDataset>, AppError> {
    let no_filter: Option<&str> = None;
    self.paginate_catalog(no_filter).await
}
pub fn into_new_dataset(
data: DcatDataset,
portal_url: &str,
_url_template: Option<&str>,
language: &str,
) -> NewDataset {
let content_hash = NewDataset::compute_content_hash_with_language(
&data.title,
data.description.as_deref(),
language,
);
NewDataset {
original_id: data.identifier,
source_portal: portal_url.to_string(),
url: data.id_uri,
title: data.title,
description: data.description,
embedding: None,
metadata: data.raw,
content_hash,
}
}
/// Walks the paginated catalog and collects every dataset node.
///
/// Starts at the first page (optionally filtered by `modified_since`) and
/// follows the page's `hydra:next` link until none is present.
///
/// Failure handling is best-effort:
/// * If a page fetch fails before anything was collected, the error is returned.
/// * If a later page fetch fails, a warning is logged and the datasets
///   collected so far are returned as `Ok`.
///
/// # Errors
///
/// Returns the first-page URL construction error, a fetch error when no
/// dataset has been collected yet, or `AppError::Generic` when a
/// `hydra:next` value cannot be resolved to a URL.
async fn paginate_catalog(
    &self,
    modified_since: Option<&str>,
) -> Result<Vec<DcatDataset>, AppError> {
    let mut all_datasets = Vec::new();
    let mut next_url = Some(self.build_first_page_url(modified_since)?);
    let mut page = 0u32;

    while let Some(url) = next_url.take() {
        page += 1;
        let graph = match self.fetch_graph(&url).await {
            Ok(g) => g,
            Err(e) => {
                if all_datasets.is_empty() {
                    return Err(e);
                }
                tracing::warn!(
                    page,
                    collected = all_datasets.len(),
                    error = %e,
                    "Page fetch failed; returning partial results"
                );
                break;
            }
        };

        all_datasets.extend(
            graph
                .iter()
                .filter(|node| is_dataset_node(node))
                .filter_map(|node| extract_dataset(node, &self.language)),
        );

        let Some(next) = extract_hydra_next(&graph) else {
            break;
        };

        // Resolve against the page we just fetched: JSON-LD allows relative
        // IRIs, and `join` returns absolute inputs unchanged.
        let resolved = url
            .join(&next)
            .map_err(|e| AppError::Generic(format!("Invalid hydra:next URL: {e}")))?;

        // A `next` link that points back at the current page (including an
        // empty string, which resolves to the page itself) would otherwise
        // loop forever.
        if resolved == url {
            tracing::warn!(
                page,
                url = %url,
                "hydra:next points at the current page; stopping pagination"
            );
            break;
        }

        sleep(PAGE_DELAY).await;
        next_url = Some(resolved);
    }

    Ok(all_datasets)
}
/// Streams the catalog one page at a time.
///
/// Each stream item is the list of datasets extracted from one page. The
/// stream ends when a page has no `hydra:next` link.
///
/// Failure handling:
/// * If the first-page URL cannot be built, the stream yields that single
///   error and ends.
/// * If the first page fetch fails, the stream yields the error and ends.
/// * If a later page fetch fails, a warning is logged and the stream ends
///   without yielding an error.
/// * If `hydra:next` cannot be resolved to a URL, or points back at the
///   current page, a warning is logged and the stream ends after yielding
///   the current page.
pub fn paginate_catalog_stream(
    &self,
    modified_since: Option<String>,
) -> BoxStream<'_, Result<Vec<DcatDataset>, AppError>> {
    struct PaginationState {
        next_url: Option<Url>,
        page: u32,
    }

    let first_url = match self.build_first_page_url(modified_since.as_deref()) {
        Ok(url) => url,
        Err(err) => {
            return Box::pin(futures::stream::once(async move { Err(err) }));
        }
    };
    let initial = PaginationState {
        next_url: Some(first_url),
        page: 0,
    };

    Box::pin(futures::stream::unfold(
        initial,
        move |mut state| async move {
            // `take` leaves `next_url` empty, so the stream terminates on the
            // following poll unless a new link is stored below.
            let url = state.next_url.take()?;
            state.page += 1;

            let graph = match self.fetch_graph(&url).await {
                Ok(g) => g,
                Err(e) => {
                    if state.page == 1 {
                        return Some((Err(e), state));
                    }
                    tracing::warn!(
                        page = state.page,
                        error = %e,
                        "Page fetch failed; stopping stream"
                    );
                    return None;
                }
            };

            let datasets: Vec<DcatDataset> = graph
                .iter()
                .filter(|node| is_dataset_node(node))
                .filter_map(|node| extract_dataset(node, &self.language))
                .collect();

            state.next_url = match extract_hydra_next(&graph) {
                // Resolve against the page we just fetched: JSON-LD allows
                // relative IRIs, and `join` returns absolute inputs unchanged.
                Some(next) => match url.join(&next) {
                    // A self-referencing link (or an empty string, which
                    // resolves to the page itself) would loop forever.
                    Ok(resolved) if resolved == url => {
                        tracing::warn!(
                            page = state.page,
                            url = %url,
                            "hydra:next points at the current page; stopping stream"
                        );
                        None
                    }
                    Ok(resolved) => {
                        sleep(PAGE_DELAY).await;
                        Some(resolved)
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "Invalid hydra:next URL; stopping stream");
                        None
                    }
                },
                None => None,
            };

            Some((Ok(datasets), state))
        },
    ))
}
/// Builds the URL of the first catalog page.
///
/// The endpoint path `api/1/site/catalog.jsonld` is joined onto the base URL
/// and the query `page=1&page_size=100` is appended, plus
/// `modified_since=<value>` when a filter is given.
///
/// Note: `Url::join` resolves the path like a relative link, so a base URL
/// whose path does not end in `/` has its last path segment replaced rather
/// than extended.
///
/// # Errors
///
/// Returns `AppError::Generic` when the endpoint path cannot be joined onto
/// the base URL.
fn build_first_page_url(&self, modified_since: Option<&str>) -> Result<Url, AppError> {
    let mut first_page = match self.base_url.join("api/1/site/catalog.jsonld") {
        Ok(joined) => joined,
        Err(join_err) => return Err(AppError::Generic(join_err.to_string())),
    };

    let mut query = first_page.query_pairs_mut();
    query.append_pair("page", "1");
    query.append_pair("page_size", "100");
    if let Some(since) = modified_since {
        query.append_pair("modified_since", since);
    }
    // Release the mutable borrow on the URL before returning it.
    drop(query);

    Ok(first_page)
}
/// Fetches one catalog page and returns the nodes of its top-level `@graph`.
///
/// # Errors
///
/// * Any error from the retrying HTTP request.
/// * `AppError::ClientError` when the body is not JSON, when `@graph` is
///   missing, or when `@graph` is not an array.
async fn fetch_graph(&self, url: &Url) -> Result<Vec<Value>, AppError> {
    let resp = self.request_with_retry(url).await?;
    let mut body: Value = resp.json().await.map_err(|e| {
        AppError::ClientError(format!("Portal returned non-JSON response: {e}"))
    })?;

    match body.get_mut("@graph") {
        // The body is owned and dropped right after, so move the node array
        // out instead of deep-cloning a whole page of JSON.
        Some(Value::Array(graph)) => Ok(std::mem::take(graph)),
        Some(_) => Err(AppError::ClientError(
            "DCAT response @graph is not an array".to_string(),
        )),
        None => Err(AppError::ClientError(
            "DCAT response missing @graph".to_string(),
        )),
    }
}
/// Sends a GET request, retrying transient failures.
///
/// The attempt count and base delay come from `HttpConfig::default()`.
///
/// Retry policy, per attempt:
/// * 2xx: the response is returned.
/// * 429: wait for the `Retry-After` seconds if the header holds an integer,
///   otherwise `base_delay * 2^attempt`; either way the wait is capped at
///   `MAX_RETRY_DELAY` so a server cannot stall the client arbitrarily.
/// * 5xx: wait `base_delay * attempt`, then retry.
/// * Timeout or connection failure: wait `base_delay * attempt`, then retry.
/// * Any other status or transport error: fail immediately.
///
/// # Errors
///
/// * `AppError::RateLimitExceeded` when the last attempt is still a 429.
/// * `AppError::ClientError` for a non-success status or a non-retryable
///   transport error.
/// * `AppError::Timeout` / `AppError::NetworkError` when the last attempt
///   timed out / failed to connect.
/// * `AppError::Generic` when the configuration allows zero attempts.
async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
    let http_config = HttpConfig::default();
    let max_retries = http_config.max_retries;
    let base_delay = http_config.retry_base_delay;

    for attempt in 1..=max_retries {
        let can_retry = attempt < max_retries;

        let resp = match self.client.get(url.clone()).send().await {
            Ok(resp) => resp,
            Err(e) => {
                let transient = e.is_timeout() || e.is_connect();
                if transient && can_retry {
                    sleep(base_delay * attempt).await;
                    continue;
                }
                // Non-transient errors must not be retried; previously they
                // fell through and were re-sent immediately with no delay.
                return Err(if e.is_timeout() {
                    AppError::Timeout(REQUEST_TIMEOUT.as_secs())
                } else if e.is_connect() {
                    AppError::NetworkError(format!("Connection failed: {e}"))
                } else {
                    AppError::ClientError(e.to_string())
                });
            }
        };

        let status = resp.status();
        if status.is_success() {
            return Ok(resp);
        }

        if status == StatusCode::TOO_MANY_REQUESTS {
            if !can_retry {
                return Err(AppError::RateLimitExceeded);
            }
            // Only the delay-seconds form of Retry-After is understood; an
            // HTTP-date value falls back to exponential backoff.
            let delay = resp
                .headers()
                .get("retry-after")
                .and_then(|v| v.to_str().ok())
                .and_then(|v| v.parse::<u64>().ok())
                .map(Duration::from_secs)
                // Saturating arithmetic: a large attempt number must not
                // overflow and panic.
                .unwrap_or_else(|| base_delay.saturating_mul(2_u32.saturating_pow(attempt)))
                .min(MAX_RETRY_DELAY);
            sleep(delay).await;
            continue;
        }

        if status.is_server_error() && can_retry {
            sleep(base_delay * attempt).await;
            continue;
        }

        return Err(AppError::ClientError(format!(
            "HTTP {} from {}",
            status.as_u16(),
            url
        )));
    }

    // Only reachable when the configured attempt count is zero.
    Err(AppError::Generic("No attempts made".to_string()))
}
}
/// Resolves a JSON-LD text value to a plain string, preferring `language`.
///
/// Accepted shapes:
/// * A plain string: returned as-is.
/// * A single value object: its `@value` string (the language tag is not checked).
/// * An array of value objects and/or plain strings. Selection order:
///   1. exact match on `language` (case-insensitive),
///   2. match on the primary subtag, e.g. `en-t-de-t0-mtec` for `en`,
///   3. the same two steps for `"en"`, when `language` is not `"en"`,
///   4. the first non-empty entry of any language, including plain strings.
///
/// Empty `@value` strings are skipped at every step, so an empty
/// translation never hides a usable one.
///
/// Returns an empty string when nothing usable is found or the value has
/// any other JSON type.
pub fn resolve_jsonld_text(value: &Value, language: &str) -> String {
    match value {
        Value::String(s) => s.clone(),
        Value::Object(obj) => obj
            .get("@value")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string(),
        Value::Array(arr) => {
            let find_lang = |lang: &str, allow_base_match: bool| -> Option<&str> {
                arr.iter().find_map(|item| {
                    let obj = item.as_object()?;
                    let item_lang = obj.get("@language")?.as_str()?;
                    let matches = item_lang.eq_ignore_ascii_case(lang)
                        || (allow_base_match
                            && item_lang
                                .split('-')
                                .next()
                                .is_some_and(|base| base.eq_ignore_ascii_case(lang)));
                    if !matches {
                        return None;
                    }
                    // An empty translation is treated as absent so that the
                    // fallbacks below still get a chance.
                    obj.get("@value")?.as_str().filter(|s| !s.is_empty())
                })
            };

            if let Some(s) = find_lang(language, false).or_else(|| find_lang(language, true)) {
                return s.to_string();
            }
            if language != "en"
                && let Some(s) = find_lang("en", false).or_else(|| find_lang("en", true))
            {
                return s.to_string();
            }

            // Last resort: first non-empty entry regardless of language.
            // Untagged plain strings are valid JSON-LD array members too.
            arr.iter()
                .find_map(|item| {
                    let text = match item {
                        Value::String(s) => Some(s.as_str()),
                        Value::Object(o) => o.get("@value").and_then(|v| v.as_str()),
                        _ => None,
                    };
                    text.filter(|s| !s.is_empty())
                })
                .unwrap_or("")
                .to_string()
        }
        _ => String::new(),
    }
}
/// Reports whether a JSON-LD node is typed as a DCAT dataset.
///
/// The node's `@type` may be a single string or an array of strings. It
/// matches when any entry is `Dataset`, `dcat:Dataset` or the full
/// `http://www.w3.org/ns/dcat#Dataset` IRI. A missing `@type`, or one of
/// any other JSON type, yields `false`.
pub fn is_dataset_node(node: &Value) -> bool {
    const DATASET_TYPES: [&str; 3] = [
        "Dataset",
        "dcat:Dataset",
        "http://www.w3.org/ns/dcat#Dataset",
    ];

    let names_dataset = |candidate: &Value| {
        candidate
            .as_str()
            .is_some_and(|name| DATASET_TYPES.contains(&name))
    };

    match node.get("@type") {
        Some(Value::Array(types)) => types.iter().any(|entry| names_dataset(entry)),
        // Non-string scalars and objects fail the `as_str` check above.
        Some(single) => names_dataset(single),
        None => false,
    }
}
/// Finds the next-page link in a catalog page's graph.
///
/// Scans the graph for nodes typed `hydra:PartialCollectionView` (prefixed
/// or full IRI; `@type` may be a string or an array). For the first such
/// node that carries a usable link, the link is read from `next`,
/// `hydra:next` or the full-IRI key, in that order.
///
/// A link value may be a plain string, an object with a string `@id`, or an
/// array of either (the shape used by expanded JSON-LD), in which case the
/// first usable entry wins.
///
/// Returns `None` when no view node carries a usable link. Non-object graph
/// entries are skipped.
pub fn extract_hydra_next(graph: &[Value]) -> Option<String> {
    const VIEW_TYPES: [&str; 2] = [
        "hydra:PartialCollectionView",
        "http://www.w3.org/ns/hydra/core#PartialCollectionView",
    ];
    const NEXT_KEYS: [&str; 3] = ["next", "hydra:next", "http://www.w3.org/ns/hydra/core#next"];

    /// True when a single `@type` entry names the partial collection view.
    fn is_view_type(entry: &Value) -> bool {
        entry.as_str().is_some_and(|name| VIEW_TYPES.contains(&name))
    }

    /// Pulls the link target out of a string, `{"@id": ...}` or array value.
    fn link_target(value: &Value) -> Option<&str> {
        match value {
            Value::String(s) => Some(s.as_str()),
            Value::Object(o) => o.get("@id").and_then(|id| id.as_str()),
            Value::Array(items) => items.iter().find_map(link_target),
            _ => None,
        }
    }

    graph
        .iter()
        .filter_map(|node| node.as_object())
        .filter(|obj| match obj.get("@type") {
            Some(Value::Array(types)) => types.iter().any(is_view_type),
            Some(single) => is_view_type(single),
            None => false,
        })
        .find_map(|obj| {
            NEXT_KEYS
                .iter()
                .find_map(|key| obj.get(*key).and_then(link_target))
        })
        .map(str::to_owned)
}
/// Returns the value of the first key in `keys` that is present on `node`.
///
/// Keys are tried in order; `None` is returned when none of them exists.
fn get_jsonld_property<'a>(node: &'a Value, keys: &[&str]) -> Option<&'a Value> {
    for key in keys {
        if let Some(found) = node.get(*key) {
            return Some(found);
        }
    }
    None
}
/// Builds a [`DcatDataset`] from a JSON-LD dataset node.
///
/// * `@id` must be a non-empty string; it becomes `id_uri`.
/// * The identifier is read from `identifier`, `dct:identifier` or
///   `dcat:identifier` (plain string or value object). When absent or
///   empty, it falls back to the last path segment of `@id`, and to the
///   whole `@id` if that segment is empty, so it is never empty.
/// * The title is read from `title` / `dct:title` and resolved for
///   `language`; it must be non-empty.
/// * The description is read from `description` / `dct:description`; an
///   empty or missing value becomes `None`.
///
/// Returns `None` when `@id` or the title is missing or empty.
pub fn extract_dataset(node: &Value, language: &str) -> Option<DcatDataset> {
    let id_uri = node.get("@id")?.as_str()?.to_string();
    if id_uri.is_empty() {
        return None;
    }

    let identifier =
        get_jsonld_property(node, &["identifier", "dct:identifier", "dcat:identifier"])
            .and_then(|v| match v {
                Value::String(s) => Some(s.clone()),
                Value::Object(o) => o.get("@value").and_then(|v| v.as_str()).map(String::from),
                _ => None,
            })
            .filter(|s| !s.is_empty())
            .or_else(|| {
                // `rsplit` always yields at least one item, so the emptiness
                // check is what actually guards the fallback (e.g. `@id` = "/").
                id_uri
                    .trim_end_matches('/')
                    .rsplit('/')
                    .next()
                    .filter(|segment| !segment.is_empty())
                    .map(str::to_owned)
            })
            .unwrap_or_else(|| id_uri.clone());

    let title = get_jsonld_property(node, &["title", "dct:title"])
        .map(|v| resolve_jsonld_text(v, language))
        .filter(|s| !s.is_empty())?;

    let description = get_jsonld_property(node, &["description", "dct:description"])
        .map(|v| resolve_jsonld_text(v, language))
        .filter(|s| !s.is_empty());

    Some(DcatDataset {
        id_uri,
        identifier,
        title,
        description,
        raw: node.clone(),
    })
}
// Unit tests for the JSON-LD helpers and the dataset conversion, plus one
// ignored network smoke test.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// --- resolve_jsonld_text ---------------------------------------------------
#[test]
fn resolve_plain_string() {
let v = json!("Dataset Title");
assert_eq!(resolve_jsonld_text(&v, "en"), "Dataset Title");
}
#[test]
fn resolve_single_lang_object() {
let v = json!({"@language": "fr", "@value": "Titre du jeu de données"});
assert_eq!(resolve_jsonld_text(&v, "fr"), "Titre du jeu de données");
}
#[test]
fn resolve_array_with_lang_match() {
let v = json!([
{"@language": "en", "@value": "English Title"},
{"@language": "fr", "@value": "Titre Français"}
]);
assert_eq!(resolve_jsonld_text(&v, "fr"), "Titre Français");
}
// Requested language is absent, so the English entry is used.
#[test]
fn resolve_array_fallback_to_en() {
let v = json!([
{"@language": "en", "@value": "English Title"},
{"@language": "de", "@value": "Deutscher Titel"}
]);
assert_eq!(resolve_jsonld_text(&v, "fr"), "English Title");
}
// A tag with extension subtags still matches on its primary subtag.
#[test]
fn resolve_array_matches_translated_language_tags() {
let v = json!([
{"@language": "de", "@value": "Deutscher Titel"},
{"@language": "en-t-de-t0-mtec", "@value": "English machine title"}
]);
assert_eq!(resolve_jsonld_text(&v, "en"), "English machine title");
}
// An exact tag match wins over a primary-subtag match, regardless of order.
#[test]
fn resolve_array_prefers_exact_language_over_translated_tag() {
let v = json!([
{"@language": "en-t-de-t0-mtec", "@value": "English machine title"},
{"@language": "en", "@value": "English Title"}
]);
assert_eq!(resolve_jsonld_text(&v, "en"), "English Title");
}
// Neither the requested language nor English is present: first entry wins.
#[test]
fn resolve_array_fallback_to_first() {
let v = json!([
{"@language": "de", "@value": "Deutscher Titel"},
{"@language": "nl", "@value": "Nederlandse Titel"}
]);
assert_eq!(resolve_jsonld_text(&v, "fr"), "Deutscher Titel");
}
#[test]
fn resolve_null_returns_empty() {
assert_eq!(resolve_jsonld_text(&Value::Null, "en"), "");
}
// --- is_dataset_node -------------------------------------------------------
#[test]
fn is_dataset_node_string_form() {
let node = json!({"@type": "Dataset", "@id": "https://example.org/d/1"});
assert!(is_dataset_node(&node));
}
#[test]
fn is_dataset_node_array_form() {
let node = json!({"@type": ["Dataset", "dcat:Dataset"], "@id": "https://example.org/d/1"});
assert!(is_dataset_node(&node));
}
#[test]
fn is_dataset_node_full_uri() {
let node =
json!({"@type": "http://www.w3.org/ns/dcat#Dataset", "@id": "https://example.org/d/1"});
assert!(is_dataset_node(&node));
}
#[test]
fn is_dataset_node_distribution_false() {
let node = json!({"@type": "Distribution"});
assert!(!is_dataset_node(&node));
}
#[test]
fn is_dataset_node_missing_type_false() {
let node = json!({"@id": "https://example.org/d/1"});
assert!(!is_dataset_node(&node));
}
// --- extract_hydra_next ----------------------------------------------------
#[test]
fn extract_hydra_next_with_next_key() {
let graph = vec![
json!({"@type": "hydra:PartialCollectionView", "next": "https://example.org/catalog.jsonld?page=2&page_size=100"}),
];
assert_eq!(
extract_hydra_next(&graph),
Some("https://example.org/catalog.jsonld?page=2&page_size=100".to_string())
);
}
#[test]
fn extract_hydra_next_with_hydra_prefix() {
let graph = vec![
json!({"@type": "hydra:PartialCollectionView", "hydra:next": "https://example.org/catalog.jsonld?page=2"}),
];
assert_eq!(
extract_hydra_next(&graph),
Some("https://example.org/catalog.jsonld?page=2".to_string())
);
}
// No partial-collection-view node in the graph means no next link.
#[test]
fn extract_hydra_next_absent_returns_none() {
let graph = vec![
json!({"@type": "Dataset", "title": "foo"}),
json!({"@type": ["Catalog", "hydra:Collection"], "totalItems": 100}),
];
assert_eq!(extract_hydra_next(&graph), None);
}
#[test]
fn extract_hydra_next_skips_non_object_nodes() {
let graph = vec![
json!("https://example.org/context"),
json!(42),
json!({"@type": "hydra:PartialCollectionView", "next": "https://example.org/?page=2"}),
];
assert_eq!(
extract_hydra_next(&graph),
Some("https://example.org/?page=2".to_string())
);
}
// --- extract_dataset -------------------------------------------------------
#[test]
fn extract_dataset_basic() {
let node = json!({
"@id": "https://data.public.lu/datasets/abc123/",
"@type": "Dataset",
"identifier": "abc123",
"title": "Test Dataset",
"description": "A test description"
});
let dataset = extract_dataset(&node, "en").unwrap();
assert_eq!(dataset.id_uri, "https://data.public.lu/datasets/abc123/");
assert_eq!(dataset.identifier, "abc123");
assert_eq!(dataset.title, "Test Dataset");
assert_eq!(dataset.description.as_deref(), Some("A test description"));
}
// Prefixed keys (`dct:*`) and value-object shapes are accepted.
#[test]
fn extract_dataset_supports_prefixed_dcat_ap_keys() {
let node = json!({
"@id": "http://data.europa.eu/88u/dataset/example",
"@type": "dcat:Dataset",
"dct:identifier": {"@value": "example-id"},
"dct:title": [
{"@language": "de", "@value": "Deutscher Titel"},
{"@language": "en-t-de-t0-mtec", "@value": "English Title"}
],
"dct:description": {"@language": "en", "@value": "Description"}
});
let dataset = extract_dataset(&node, "en").unwrap();
assert_eq!(dataset.identifier, "example-id");
assert_eq!(dataset.title, "English Title");
assert_eq!(dataset.description.as_deref(), Some("Description"));
}
// Without an identifier property, the last path segment of `@id` is used.
#[test]
fn extract_dataset_identifier_fallback_to_id_segment() {
let node = json!({
"@id": "https://data.public.lu/datasets/my-dataset/",
"@type": "Dataset",
"title": "My Dataset"
});
let dataset = extract_dataset(&node, "en").unwrap();
assert_eq!(dataset.identifier, "my-dataset");
}
#[test]
fn extract_dataset_missing_id_returns_none() {
let node = json!({"@type": "Dataset", "title": "No ID"});
assert!(extract_dataset(&node, "en").is_none());
}
#[test]
fn extract_dataset_missing_title_returns_none() {
let node = json!({"@id": "https://example.org/d/1", "@type": "Dataset"});
assert!(extract_dataset(&node, "en").is_none());
}
// --- DcatClient::into_new_dataset ------------------------------------------
#[test]
fn into_new_dataset_mapping() {
let dataset = DcatDataset {
id_uri: "https://data.public.lu/datasets/abc123/".to_string(),
identifier: "abc123".to_string(),
title: "Test Dataset".to_string(),
description: Some("A description".to_string()),
raw: json!({"@id": "https://data.public.lu/datasets/abc123/"}),
};
let new_dataset =
DcatClient::into_new_dataset(dataset, "https://data.public.lu", None, "fr");
assert_eq!(new_dataset.original_id, "abc123");
assert_eq!(new_dataset.source_portal, "https://data.public.lu");
assert_eq!(new_dataset.url, "https://data.public.lu/datasets/abc123/");
assert_eq!(new_dataset.title, "Test Dataset");
assert_eq!(new_dataset.description.as_deref(), Some("A description"));
assert!(new_dataset.embedding.is_none());
let expected_hash = NewDataset::compute_content_hash_with_language(
"Test Dataset",
Some("A description"),
"fr",
);
assert_eq!(new_dataset.content_hash, expected_hash);
}
// The URL template argument is ignored; the dataset URL stays the `@id`.
#[test]
fn into_new_dataset_url_template_ignored() {
let dataset = DcatDataset {
id_uri: "https://data.public.lu/datasets/abc123/".to_string(),
identifier: "abc123".to_string(),
title: "Test".to_string(),
description: None,
raw: json!({}),
};
let new_dataset = DcatClient::into_new_dataset(
dataset,
"https://data.public.lu",
Some("https://example.org/{id}"),
"en",
);
assert_eq!(new_dataset.url, "https://data.public.lu/datasets/abc123/");
}
// --- DcatClient::new -------------------------------------------------------
#[test]
fn dcat_client_new_valid() {
assert!(DcatClient::new("https://data.public.lu", "fr").is_ok());
}
#[test]
fn dcat_client_new_invalid_url() {
let result = DcatClient::new("not-a-url", "en");
assert!(matches!(result, Err(AppError::InvalidPortalUrl(_))));
}
// Live smoke test against a real portal; excluded from normal runs.
#[tokio::test]
#[ignore = "requires network access to data.public.lu"]
async fn test_dcat_smoke_luxembourg() {
let client = DcatClient::new("https://data.public.lu", "fr").unwrap();
let datasets = client.search_all_datasets().await.unwrap();
assert!(
datasets.len() > 100,
"Expected >100 datasets, got {}",
datasets.len()
);
let first = &datasets[0];
assert!(!first.title.is_empty(), "First dataset title is empty");
assert!(!first.id_uri.is_empty(), "First dataset id_uri is empty");
assert!(
!first.identifier.is_empty(),
"First dataset identifier is empty"
);
}
}