use std::collections::HashMap;
use std::time::Duration;
use ceres_core::HttpConfig;
use ceres_core::error::AppError;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use reqwest::{Client, StatusCode, Url};
use serde::Deserialize;
use serde_json::{Value, json};
use tokio::time::sleep;
use crate::dcat::{DcatDataset, extract_dataset, is_dataset_node};
/// Pause inserted between consecutive page requests.
const PAGE_DELAY: Duration = Duration::from_millis(300);
/// Per-request timeout configured on the HTTP client built in `SparqlDcatClient::new`.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
/// Cap applied to the computed exponential backoff after an HTTP 429 that
/// carries no usable `Retry-After` header.
const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);
/// Base delay for page-level retries; it is doubled for each further attempt.
const PAGE_RETRY_BASE_DELAY: Duration = Duration::from_secs(10);
/// Number of page-level retries performed after the first failed attempt of a SPARQL page.
const MAX_PAGE_RETRIES: u32 = 3;
/// Page size a newly constructed client starts with.
const DEFAULT_PAGE_SIZE: usize = 1000;
/// Floor below which the page size is not reduced when the server looks overloaded.
const MIN_PAGE_SIZE: usize = 50;
/// Page size used for publisher-partition queries.
const PARTITION_PAGE_SIZE: usize = 500;
/// Offset at which scanning of a still-full partition is abandoned and counted as overflowed.
const PARTITION_OVERFLOW_OFFSET: usize = 9_500;
/// Page size used for data.europa.eu Registry catalogue requests.
const REGISTRY_PAGE_SIZE: usize = 100;
/// Number of retries performed after the first failed attempt of a Registry page.
const MAX_REGISTRY_PAGE_RETRIES: u32 = 3;
/// Top-level shape of a SPARQL JSON results document.
///
/// Only the `results` member is deserialized.
#[derive(Debug, Deserialize)]
struct SparqlResponse {
results: SparqlResults,
}
/// The `results` member of a SPARQL JSON response.
#[derive(Debug, Deserialize)]
struct SparqlResults {
// One map per result row, keyed by variable name.
bindings: Vec<HashMap<String, SparqlValue>>,
}
/// A single bound value in a SPARQL result row.
#[derive(Debug, Deserialize)]
struct SparqlValue {
// Lexical value of the binding (a URI or literal text).
value: String,
// Language tag taken from the JSON key `xml:lang`; `None` when the key is absent.
#[serde(rename = "xml:lang")]
lang: Option<String>,
}
/// Partition currently being harvested, owning its publisher IRI so it can be
/// stored in the stream state between polls.
enum OwnedPartition {
// Datasets whose `dct:publisher` is the contained IRI.
Publisher(String),
// Datasets that have no `dct:publisher` at all.
NoPublisher,
}
/// Borrowed counterpart of `OwnedPartition`, passed to the page-fetching query builder.
enum PartitionRef<'a> {
// Datasets whose `dct:publisher` is the borrowed IRI.
Publisher(&'a str),
// Datasets that have no `dct:publisher` at all.
NoPublisher,
}
/// Result of parsing one data.europa.eu Registry metadata page.
struct RegistryPage {
// Datasets that were successfully extracted from the page.
datasets: Vec<DcatDataset>,
// Number of dataset entries seen on the page, including ones that could not be
// extracted; this count (not `datasets.len()`) is used to detect the last page.
entries: usize,
}
/// Client that harvests DCAT dataset metadata from a portal, either through its
/// SPARQL endpoint or, for data.europa.eu, through the Registry REST API.
#[derive(Clone)]
pub struct SparqlDcatClient {
// Shared HTTP client (user agent and timeout are set in `new`).
client: Client,
// URL that SPARQL queries are POSTed to.
endpoint_url: Url,
// Portal base URL; also the base for Registry API URLs.
base_url: Url,
// Preferred language tag for titles and descriptions.
language: String,
// LIMIT used by the non-partitioned SPARQL pagination.
page_size: usize,
}
impl SparqlDcatClient {
/// Builds a client for the portal at `base_url_str`.
///
/// * `language` - preferred language tag; must be non-empty and consist only of
///   ASCII letters, digits and `-`.
/// * `sparql_endpoint` - explicit SPARQL endpoint URL; when `None`, the endpoint
///   is derived by joining `sparql` onto the base URL.
///
/// # Errors
///
/// Returns `AppError::ConfigError` for a bad language tag or endpoint URL,
/// `AppError::InvalidPortalUrl` for an unparsable base URL, `AppError::Generic`
/// if the default endpoint cannot be derived, and `AppError::ClientError` if the
/// HTTP client cannot be built.
pub fn new(
    base_url_str: &str,
    language: &str,
    sparql_endpoint: Option<&str>,
) -> Result<Self, AppError> {
    // The tag is interpolated into SPARQL text later, so keep the alphabet tight.
    let tag_is_valid = !language.is_empty()
        && language
            .bytes()
            .all(|byte| byte == b'-' || byte.is_ascii_alphanumeric());
    if !tag_is_valid {
        return Err(AppError::ConfigError(format!(
            "Invalid language tag '{language}'. Expected a BCP47 tag (e.g., 'en', 'nb', 'fr')."
        )));
    }

    let base_url = match Url::parse(base_url_str) {
        Ok(parsed) => parsed,
        Err(_) => return Err(AppError::InvalidPortalUrl(base_url_str.to_string())),
    };

    let endpoint_url = match sparql_endpoint {
        Some(ep) => Url::parse(ep)
            .map_err(|_| AppError::ConfigError(format!("Invalid sparql_endpoint URL: {ep}")))?,
        None => base_url.join("sparql").map_err(|e| {
            AppError::Generic(format!("Failed to build SPARQL endpoint URL: {e}"))
        })?,
    };

    let builder = Client::builder()
        .user_agent("Ceres/0.1 (semantic-search-bot)")
        .timeout(REQUEST_TIMEOUT);
    let client = builder
        .build()
        .map_err(|e| AppError::ClientError(e.to_string()))?;

    Ok(Self {
        client,
        endpoint_url,
        base_url,
        language: language.to_owned(),
        page_size: DEFAULT_PAGE_SIZE,
    })
}
/// Returns the constant portal type tag `"dcat"`.
pub fn portal_type(&self) -> &'static str {
"dcat"
}
/// Returns the portal base URL as a string slice.
pub fn base_url(&self) -> &str {
self.base_url.as_str()
}
/// Lists the identifiers of all `dcat:Dataset` resources known to the endpoint.
///
/// Pages through the endpoint with LIMIT/OFFSET using the client's page size and
/// reduces every dataset URI to its last path segment. Paging stops on an empty
/// page or a page shorter than the page size.
///
/// # Errors
///
/// Propagates the first query failure.
pub async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
    let mut collected = Vec::new();
    let mut next_offset = 0usize;
    loop {
        let sparql = format!(
            "SELECT DISTINCT ?dataset \
WHERE {{ ?dataset a <http://www.w3.org/ns/dcat#Dataset> }} \
LIMIT {} OFFSET {}",
            self.page_size, next_offset
        );
        let rows = self.execute_query(&sparql).await?;
        if rows.is_empty() {
            break;
        }
        let returned = rows.len();
        collected.extend(
            rows.iter()
                .filter_map(|row| row.get("dataset"))
                .map(|cell| extract_last_segment(&cell.value)),
        );
        // A short page means the endpoint has nothing further to return.
        if returned < self.page_size {
            break;
        }
        next_offset += self.page_size;
        sleep(PAGE_DELAY).await;
    }
    Ok(collected)
}
/// Single-dataset lookup is not implemented for SPARQL DCAT portals.
///
/// The `_id` argument is ignored and the call always returns
/// `AppError::Generic` pointing callers at `search_all_datasets()`.
pub async fn get_dataset(&self, _id: &str) -> Result<DcatDataset, AppError> {
Err(AppError::Generic(
"get_dataset is not implemented for SPARQL DCAT portals. \
Use search_all_datasets() instead."
.to_string(),
))
}
/// Fetches datasets modified after `since`.
///
/// Builds the modification-date FILTER and runs the regular paginated SPARQL
/// harvest with it. Per that filter, datasets without a `?modified` binding are
/// included as well.
pub async fn search_modified_since(
&self,
since: DateTime<Utc>,
) -> Result<Vec<DcatDataset>, AppError> {
let filter = Self::modified_since_filter(since);
self.paginate_sparql(Some(&filter)).await
}
/// Renders the SPARQL FILTER that keeps rows whose `?modified` is unbound or
/// strictly later than `since`, compared as an `xsd:dateTime` literal in
/// RFC 3339 form.
fn modified_since_filter(since: DateTime<Utc>) -> String {
    let timestamp = since.to_rfc3339();
    format!(
        "FILTER (!BOUND(?modified) || ?modified > \"{}\"^^<http://www.w3.org/2001/XMLSchema#dateTime>)",
        timestamp
    )
}
/// Fetches all datasets by running the paginated SPARQL harvest without any extra filter.
pub async fn search_all_datasets(&self) -> Result<Vec<DcatDataset>, AppError> {
self.paginate_sparql(None).await
}
/// Asks the endpoint how many distinct `dcat:Dataset` resources it holds.
///
/// # Errors
///
/// Propagates query failures; returns `AppError::Generic` when the first row has
/// no `count` binding or the binding is not a valid `usize`.
pub async fn dataset_count(&self) -> Result<usize, AppError> {
    let query = "SELECT (COUNT(DISTINCT ?dataset) AS ?count) \
WHERE { ?dataset a <http://www.w3.org/ns/dcat#Dataset> }";
    let rows = self.execute_query(query).await?;
    let parsed = match rows.first().and_then(|row| row.get("count")) {
        Some(cell) => cell.value.parse::<usize>().ok(),
        None => None,
    };
    match parsed {
        Some(total) => Ok(total),
        None => Err(AppError::Generic(
            "Failed to parse dataset count".to_string(),
        )),
    }
}
/// Streams datasets page by page, partitioned by publisher.
///
/// Portals whose host is `data.europa.eu` are delegated to the Registry-based
/// stream. Otherwise the stream enumerates publisher IRIs once, walks each
/// publisher partition with LIMIT/OFFSET pages of `PARTITION_PAGE_SIZE`, and
/// finally walks the partition of datasets that have no publisher.
///
/// Every item is one page of datasets (possibly empty). A page that keeps failing
/// after `MAX_PAGE_RETRIES` retries causes the rest of its partition to be
/// skipped; a partition that is still full at `PARTITION_OVERFLOW_OFFSET` is
/// abandoned. If either happened, the last item of the stream is an `Err`
/// summarising the incomplete harvest. A failure while enumerating publishers is
/// yielded as an `Err` and ends the stream.
pub fn paginate_sparql_stream(&self) -> BoxStream<'_, Result<Vec<DcatDataset>, AppError>> {
if self.uses_data_europa_registry() {
return self.paginate_data_europa_registry_stream();
}
// State threaded through `unfold`; one value is consumed and returned per poll.
struct PartitionState {
// Publisher IRIs still to visit; consumed from the back with `pop`.
remaining: Vec<String>,
// Partition currently being paged, if any.
current: Option<OwnedPartition>,
// OFFSET of the next page within `current`.
offset: usize,
// Whether the no-publisher partition has already been scheduled.
did_no_publisher: bool,
// Whether publisher enumeration has been attempted.
enumerated: bool,
// Running total of datasets yielded so far.
fetched: usize,
// Partitions abandoned after exhausting retries.
skipped_partitions: usize,
// Partitions abandoned at the overflow offset.
overflow_partitions: usize,
// Set once all partitions are processed (or enumeration failed).
done: bool,
}
let initial = PartitionState {
remaining: Vec::new(),
current: None,
offset: 0,
did_no_publisher: false,
enumerated: false,
fetched: 0,
skipped_partitions: 0,
overflow_partitions: 0,
done: false,
};
Box::pin(futures::stream::unfold(initial, move |mut state| {
async move {
// Terminal phase: emit the "incomplete" error once, then end the stream.
if state.done {
let incomplete = state.skipped_partitions + state.overflow_partitions;
if incomplete > 0 {
return Some((
Err(AppError::Generic(format!(
"SPARQL harvest incomplete: {} partitions skipped, {} partitions overflowed (>{} offset), fetched {} datasets",
state.skipped_partitions,
state.overflow_partitions,
PARTITION_OVERFLOW_OFFSET,
state.fetched
))),
// Zero the counters so the next poll returns `None` instead of repeating the error.
PartitionState {
skipped_partitions: 0,
overflow_partitions: 0,
..state
},
));
}
return None;
}
// First poll only: discover the publisher partitions.
if !state.enumerated {
state.enumerated = true;
match self.enumerate_publishers().await {
Ok(mut pubs) => {
tracing::info!(
publishers = pubs.len(),
"SPARQL partition enumeration complete"
);
// Reversed so that `pop` visits publishers in enumeration order.
pubs.reverse();
state.remaining = pubs;
}
Err(e) => {
return Some((
Err(e),
PartitionState {
done: true,
..state
},
));
}
}
}
// Pick the next partition: publishers first, then the no-publisher bucket.
if state.current.is_none() {
if let Some(pub_uri) = state.remaining.pop() {
state.current = Some(OwnedPartition::Publisher(pub_uri));
state.offset = 0;
} else if !state.did_no_publisher {
state.did_no_publisher = true;
state.current = Some(OwnedPartition::NoPublisher);
state.offset = 0;
} else {
state.done = true;
// Yield an empty page so the stream is polled once more and the
// terminal branch above can report the incomplete harvest.
return if state.skipped_partitions + state.overflow_partitions > 0 {
Some((Ok(vec![]), state))
} else {
None
};
}
}
// `current` was set above whenever it was `None`, so this cannot panic.
let partition_ref = match state.current.as_ref().unwrap() {
OwnedPartition::Publisher(p) => PartitionRef::Publisher(p.as_str()),
OwnedPartition::NoPublisher => PartitionRef::NoPublisher,
};
let mut last_err = None;
let mut page_result = None;
// One initial attempt plus MAX_PAGE_RETRIES retries with doubling backoff.
for attempt in 0..=MAX_PAGE_RETRIES {
match self
.fetch_partition_page(&partition_ref, state.offset, PARTITION_PAGE_SIZE)
.await
{
Ok(datasets) => {
page_result = Some(datasets);
break;
}
Err(e) => {
if attempt < MAX_PAGE_RETRIES {
let delay = PAGE_RETRY_BASE_DELAY * 2_u32.pow(attempt);
tracing::warn!(
offset = state.offset,
attempt = attempt + 1,
delay_secs = delay.as_secs(),
error = %e,
"SPARQL partition page failed; retrying after backoff"
);
sleep(delay).await;
}
last_err = Some(e);
}
}
}
let datasets = match page_result {
Some(d) => d,
None => {
// Every attempt failed, so `last_err` was set on the final iteration.
let e = last_err.unwrap();
let label = match &partition_ref {
PartitionRef::Publisher(p) => *p,
PartitionRef::NoPublisher => "<no-publisher>",
};
tracing::warn!(
partition = label,
offset = state.offset,
error = %e,
"SPARQL partition failed after retries; skipping remainder"
);
// Give up on the rest of this partition but keep harvesting the others.
state.skipped_partitions += 1;
state.current = None;
sleep(PAGE_RETRY_BASE_DELAY).await;
return Some((Ok(vec![]), state));
}
};
let count = datasets.len();
state.fetched += count;
tracing::debug!(
offset = state.offset,
page_datasets = count,
total_fetched = state.fetched,
"SPARQL partition page fetched"
);
// A page holding fewer datasets than the page size ends the partition.
let partition_full = count >= PARTITION_PAGE_SIZE;
state.offset += PARTITION_PAGE_SIZE;
if !partition_full {
state.current = None;
} else if state.offset >= PARTITION_OVERFLOW_OFFSET {
let label = match &partition_ref {
PartitionRef::Publisher(p) => *p,
PartitionRef::NoPublisher => "<no-publisher>",
};
tracing::warn!(
partition = label,
offset = state.offset,
"SPARQL partition exceeded overflow offset; sub-partitioning not implemented, stopping scan"
);
state.overflow_partitions += 1;
state.current = None;
}
sleep(PAGE_DELAY).await;
Some((Ok(datasets), state))
}
}))
}
/// Reports whether the portal host is `data.europa.eu` (compared ASCII
/// case-insensitively); a base URL without a host yields `false`.
fn uses_data_europa_registry(&self) -> bool {
    match self.base_url.host_str() {
        Some(host) => host.eq_ignore_ascii_case("data.europa.eu"),
        None => false,
    }
}
/// Streams datasets from the data.europa.eu Registry API, catalogue by catalogue.
///
/// The stream enumerates catalogue identifiers once, then pages through each
/// catalogue's datasets with `REGISTRY_PAGE_SIZE` entries per request. Every item
/// is one page of datasets (possibly empty). A page that keeps failing after
/// `MAX_REGISTRY_PAGE_RETRIES` retries causes the rest of its catalogue to be
/// skipped; if that happened, the last item of the stream is an `Err` summarising
/// the incomplete harvest. A failure while enumerating catalogues is yielded as an
/// `Err` and ends the stream.
fn paginate_data_europa_registry_stream(
&self,
) -> BoxStream<'_, Result<Vec<DcatDataset>, AppError>> {
// State threaded through `unfold`; one value is consumed and returned per poll.
struct RegistryState {
// Catalogue ids still to visit; consumed from the back with `pop`.
remaining_catalogues: Vec<String>,
// Catalogue currently being paged, if any.
current_catalogue: Option<String>,
// Offset of the next page within the current catalogue.
offset: usize,
// Whether catalogue enumeration has been attempted.
enumerated: bool,
// Running total of datasets yielded so far.
fetched: usize,
// Catalogues abandoned after exhausting retries.
skipped_catalogues: usize,
// Set once all catalogues are processed (or enumeration failed).
done: bool,
}
let initial = RegistryState {
remaining_catalogues: Vec::new(),
current_catalogue: None,
offset: 0,
enumerated: false,
fetched: 0,
skipped_catalogues: 0,
done: false,
};
Box::pin(futures::stream::unfold(
initial,
move |mut state| async move {
// Terminal phase: emit the "incomplete" error once, then end the stream.
if state.done {
if state.skipped_catalogues > 0 {
return Some((
Err(AppError::Generic(format!(
"data.europa.eu Registry harvest incomplete: {} catalogues skipped, fetched {} datasets",
state.skipped_catalogues, state.fetched
))),
// Zero the counter so the next poll returns `None` instead of repeating the error.
RegistryState {
skipped_catalogues: 0,
..state
},
));
}
return None;
}
// First poll only: discover the catalogues.
if !state.enumerated {
state.enumerated = true;
match self.enumerate_registry_catalogues().await {
Ok(mut catalogues) => {
tracing::info!(
catalogues = catalogues.len(),
"data.europa.eu Registry catalogue enumeration complete"
);
// Reversed so that `pop` visits catalogues in enumeration order.
catalogues.reverse();
state.remaining_catalogues = catalogues;
}
Err(e) => {
return Some((
Err(e),
RegistryState {
done: true,
..state
},
));
}
}
}
// Pick the next catalogue, or finish when none are left.
if state.current_catalogue.is_none() {
if let Some(catalogue) = state.remaining_catalogues.pop() {
state.current_catalogue = Some(catalogue);
state.offset = 0;
} else {
state.done = true;
// Yield an empty page so the stream is polled once more and the
// terminal branch above can report the incomplete harvest.
return if state.skipped_catalogues > 0 {
Some((Ok(vec![]), state))
} else {
None
};
}
}
// `current_catalogue` was set above whenever it was `None`, so this cannot panic.
let catalogue = state.current_catalogue.as_deref().unwrap();
let mut last_err = None;
let mut page_result = None;
// One initial attempt plus MAX_REGISTRY_PAGE_RETRIES retries with doubling backoff.
for attempt in 0..=MAX_REGISTRY_PAGE_RETRIES {
match self
.fetch_registry_catalogue_page(catalogue, state.offset, REGISTRY_PAGE_SIZE)
.await
{
Ok(page) => {
page_result = Some(page);
break;
}
Err(e) => {
if attempt < MAX_REGISTRY_PAGE_RETRIES {
let delay = PAGE_RETRY_BASE_DELAY * 2_u32.pow(attempt);
tracing::warn!(
catalogue,
offset = state.offset,
attempt = attempt + 1,
delay_secs = delay.as_secs(),
error = %e,
"data.europa.eu Registry page failed; retrying after backoff"
);
sleep(delay).await;
}
last_err = Some(e);
}
}
}
let page = match page_result {
Some(page) => page,
None => {
// Every attempt failed, so `last_err` was set on the final iteration.
let e = last_err.unwrap();
tracing::warn!(
catalogue,
offset = state.offset,
error = %e,
"data.europa.eu Registry catalogue failed after retries; skipping remainder"
);
// Give up on the rest of this catalogue but keep harvesting the others.
state.skipped_catalogues += 1;
state.current_catalogue = None;
sleep(PAGE_RETRY_BASE_DELAY).await;
return Some((Ok(vec![]), state));
}
};
let count = page.datasets.len();
state.fetched += count;
tracing::debug!(
catalogue,
offset = state.offset,
page_entries = page.entries,
page_datasets = count,
total_fetched = state.fetched,
"data.europa.eu Registry page fetched"
);
// End-of-catalogue is decided on the entry count, not on how many datasets
// could actually be extracted from those entries.
if page.entries < REGISTRY_PAGE_SIZE {
state.current_catalogue = None;
} else {
state.offset += REGISTRY_PAGE_SIZE;
}
sleep(PAGE_DELAY).await;
Some((Ok(page.datasets), state))
},
))
}
/// Collects all catalogue identifiers from the Registry `catalogues` endpoint.
///
/// Requests batches of 5000 identifiers and stops at the first batch shorter than
/// that. Empty identifiers are dropped, but still count towards the batch length
/// used for the stop condition.
///
/// # Errors
///
/// Propagates URL-building and request failures; a body that is not a JSON array
/// of strings yields `AppError::ClientError`.
async fn enumerate_registry_catalogues(&self) -> Result<Vec<String>, AppError> {
    let batch = 5000usize;
    let mut ids = Vec::new();
    let mut cursor = 0usize;
    loop {
        let mut url = self.registry_url(&["catalogues"])?;
        {
            let mut params = url.query_pairs_mut();
            params.append_pair("limit", &batch.to_string());
            params.append_pair("offset", &cursor.to_string());
            params.append_pair("valueType", "identifiers");
        }
        let resp = self.request_url_with_retry(url).await?;
        let page = match resp.json::<Vec<String>>().await {
            Ok(list) => list,
            Err(e) => {
                return Err(AppError::ClientError(format!(
                    "data.europa.eu Registry returned non-JSON catalogue list: {e}"
                )));
            }
        };
        let received = page.len();
        for id in page {
            if !id.is_empty() {
                ids.push(id);
            }
        }
        if received < batch {
            break;
        }
        cursor += batch;
        sleep(PAGE_DELAY).await;
    }
    Ok(ids)
}
/// Downloads and parses one metadata page of a catalogue's datasets.
///
/// * `catalogue` - catalogue identifier, used as a path segment.
/// * `offset` / `limit` - paging window sent as query parameters.
///
/// # Errors
///
/// Propagates URL-building and request failures; a non-JSON body yields
/// `AppError::ClientError`.
async fn fetch_registry_catalogue_page(
    &self,
    catalogue: &str,
    offset: usize,
    limit: usize,
) -> Result<RegistryPage, AppError> {
    let mut url = self.registry_url(&["catalogues", catalogue, "datasets"])?;
    {
        let mut params = url.query_pairs_mut();
        params.append_pair("limit", &limit.to_string());
        params.append_pair("offset", &offset.to_string());
        params.append_pair("valueType", "metadata");
        params.append_pair("hydra", "false");
    }
    let resp = self.request_url_with_retry(url).await?;
    match resp.json::<Value>().await {
        Ok(body) => Ok(self.extract_registry_page(body)),
        Err(e) => Err(AppError::ClientError(format!(
            "data.europa.eu Registry returned non-JSON metadata page: {e}"
        ))),
    }
}
/// Builds a Registry API URL by joining `api/hub/repo/` onto the base URL and
/// appending each of `segments` as its own path segment.
///
/// # Errors
///
/// Returns `AppError::Generic` when the join fails or the resulting URL does not
/// support path segments.
fn registry_url(&self, segments: &[&str]) -> Result<Url, AppError> {
    let mut url = match self.base_url.join("api/hub/repo/") {
        Ok(joined) => joined,
        Err(e) => {
            return Err(AppError::Generic(format!(
                "Failed to build data.europa.eu Registry URL: {e}"
            )));
        }
    };
    match url.path_segments_mut() {
        Ok(mut path) => {
            // Remove the empty trailing segment left by the joined path's final slash.
            path.pop_if_empty();
            for &segment in segments {
                path.push(segment);
            }
        }
        Err(()) => {
            return Err(AppError::Generic(
                "Registry URL cannot be a base URL".to_string(),
            ));
        }
    }
    Ok(url)
}
/// Turns a Registry JSON-LD response body into a `RegistryPage`.
///
/// The top-level `@graph` array is scanned. Hydra view nodes are ignored. A node
/// that is itself a dataset counts as one entry. A node carrying its own nested
/// `@graph` array also counts as one entry; the first dataset node inside it is
/// extracted and the whole wrapping node is stored as the dataset's `raw` value.
/// A body without a top-level `@graph` array yields an empty page.
fn extract_registry_page(&self, body: Value) -> RegistryPage {
    let mut page = RegistryPage {
        datasets: Vec::new(),
        entries: 0,
    };
    let graph = match body.get("@graph").and_then(|v| v.as_array()) {
        Some(nodes) => nodes,
        None => return page,
    };
    for node in graph.iter().filter(|candidate| !is_hydra_view_node(candidate)) {
        if is_dataset_node(node) {
            // Counted even when extraction fails, so paging stays aligned.
            page.entries += 1;
            if let Some(dataset) = extract_dataset(node, &self.language) {
                page.datasets.push(dataset);
            }
        } else if let Some(members) = node.get("@graph").and_then(|v| v.as_array()) {
            page.entries += 1;
            let extracted = members
                .iter()
                .find(|member| is_dataset_node(member))
                .and_then(|member| extract_dataset(member, &self.language));
            if let Some(mut dataset) = extracted {
                dataset.raw = node.clone();
                page.datasets.push(dataset);
            }
        }
    }
    page
}
/// Lists the distinct `dct:publisher` values attached to datasets.
///
/// Pages through the endpoint using the client's page size. Only values that
/// start with `http://` or `https://` are kept. Enumeration stops on an empty or
/// short page, or, with a warning, once the offset reaches 100 000.
///
/// # Errors
///
/// Propagates the first query failure.
async fn enumerate_publishers(&self) -> Result<Vec<String>, AppError> {
    let page_limit = self.page_size;
    let mut found = Vec::new();
    let mut cursor = 0usize;
    loop {
        let sparql = format!(
            "PREFIX dcat: <http://www.w3.org/ns/dcat#>\n\
PREFIX dct: <http://purl.org/dc/terms/>\n\
\n\
SELECT DISTINCT ?pub WHERE {{\n\
?d a dcat:Dataset ; dct:publisher ?pub .\n\
}}\n\
LIMIT {limit} OFFSET {offset}",
            limit = page_limit,
            offset = cursor
        );
        let rows = self.execute_query(&sparql).await?;
        if rows.is_empty() {
            break;
        }
        let returned = rows.len();
        found.extend(
            rows.iter()
                .filter_map(|row| row.get("pub"))
                .map(|cell| cell.value.as_str())
                .filter(|iri| iri.starts_with("http://") || iri.starts_with("https://"))
                .map(str::to_owned),
        );
        if returned < page_limit {
            break;
        }
        cursor += page_limit;
        sleep(PAGE_DELAY).await;
        if cursor >= 100_000 {
            tracing::warn!(
                offset = cursor,
                publishers = found.len(),
                "Publisher enumeration exceeded 100k offset; truncating"
            );
            break;
        }
    }
    Ok(found)
}
/// Fetches one LIMIT/OFFSET page of datasets belonging to a single partition.
///
/// * `partition` - either a publisher IRI or the "no publisher" bucket.
/// * `offset` / `limit` - paging window placed in the query.
///
/// The query selects datasets that have a title in one of the accepted languages
/// and optionally binds description, identifier and modification date.
///
/// # Errors
///
/// Returns `AppError::Generic` for a publisher IRI that is not http(s) or that
/// contains `<` or `>`; otherwise propagates query failures.
async fn fetch_partition_page(
    &self,
    partition: &PartitionRef<'_>,
    offset: usize,
    limit: usize,
) -> Result<Vec<DcatDataset>, AppError> {
    let publisher_clause = match partition {
        PartitionRef::Publisher(uri) => {
            // The IRI is spliced into the query between angle brackets, so refuse
            // anything that could close the IRI reference early.
            if !(uri.starts_with("http://") || uri.starts_with("https://"))
                || uri.contains('<')
                || uri.contains('>')
            {
                return Err(AppError::Generic(format!("Invalid publisher IRI: {uri}")));
            }
            format!("?dataset dct:publisher <{uri}> .")
        }
        PartitionRef::NoPublisher => {
            // This is a plain string, not a `format!` template, so braces must
            // not be doubled here.
            "FILTER NOT EXISTS { ?dataset dct:publisher ?anyPub }".to_string()
        }
    };
    let title_filter = self
        .lang_filter_alternatives()
        .iter()
        .map(|l| format!("lang(?title) = \"{l}\""))
        .collect::<Vec<_>>()
        .join(" || ");
    let query = format!(
        "PREFIX dcat: <http://www.w3.org/ns/dcat#>\n\
PREFIX dct: <http://purl.org/dc/terms/>\n\
\n\
SELECT ?dataset ?title ?description ?identifier ?modified\n\
WHERE {{\n\
?dataset a dcat:Dataset .\n\
{publisher_clause}\n\
?dataset dct:title ?title .\n\
FILTER ({title_filter})\n\
OPTIONAL {{ ?dataset dct:description ?description }}\n\
OPTIONAL {{ ?dataset dct:identifier ?identifier }}\n\
OPTIONAL {{ ?dataset dct:modified ?modified }}\n\
}}\n\
LIMIT {limit} OFFSET {offset}"
    );
    let bindings = self.execute_query(&query).await?;
    Ok(self.bindings_to_datasets(bindings))
}
/// Harvests datasets through plain LIMIT/OFFSET pagination of the SPARQL endpoint.
///
/// `extra_filter`, when given, is inserted verbatim into the WHERE clause.
///
/// Failure handling per page:
/// * If the error looks like server overload and the page size is above
///   `MIN_PAGE_SIZE`, the page size is halved (never below the minimum) and the
///   same offset is requested again.
/// * Otherwise the page is retried up to `MAX_PAGE_RETRIES` times with doubling
///   backoff.
/// * When retries are exhausted, the error is returned if nothing has been
///   collected yet; otherwise the datasets collected so far are returned as `Ok`.
async fn paginate_sparql(
&self,
extra_filter: Option<&str>,
) -> Result<Vec<DcatDataset>, AppError> {
let mut all_datasets = Vec::new();
let mut offset = 0usize;
let mut current_page_size = self.page_size;
'pages: loop {
let query =
self.build_dataset_query_with_limit(offset, current_page_size, extra_filter);
// The labelled block evaluates to the bindings of the first successful attempt.
let bindings = 'retry: {
for page_attempt in 0..=MAX_PAGE_RETRIES {
match self.execute_query(&query).await {
Ok(b) => break 'retry b,
Err(e) => {
if is_server_overload(&e) && current_page_size > MIN_PAGE_SIZE {
let new_size = (current_page_size / 2).max(MIN_PAGE_SIZE);
tracing::warn!(
offset,
old_page_size = current_page_size,
new_page_size = new_size,
error = %e,
"SPARQL server overload; reducing page size and retrying"
);
current_page_size = new_size;
sleep(PAGE_RETRY_BASE_DELAY).await;
// Restart the outer loop so the query is rebuilt with the smaller
// LIMIT; this also resets the attempt counter.
continue 'pages;
}
// Nothing collected and no retries left: surface the error.
if all_datasets.is_empty() && page_attempt == MAX_PAGE_RETRIES {
return Err(e);
}
if page_attempt < MAX_PAGE_RETRIES {
let delay = PAGE_RETRY_BASE_DELAY * 2_u32.pow(page_attempt);
tracing::warn!(
offset,
attempt = page_attempt + 1,
delay_secs = delay.as_secs(),
error = %e,
"SPARQL page fetch failed; retrying after backoff"
);
sleep(delay).await;
continue;
}
// Retries exhausted after some pages succeeded: keep what we have.
tracing::warn!(
offset,
collected = all_datasets.len(),
error = %e,
"SPARQL page fetch failed after retries; returning partial results"
);
break 'pages;
}
}
}
// Every iteration of the `for` above breaks or continues to a label, so this
// is not expected to run; it gives the block a diverging tail.
break 'pages;
};
if bindings.is_empty() {
break;
}
// The stop condition uses the raw row count, not the number of datasets
// produced from those rows.
let count = bindings.len();
let datasets = self.bindings_to_datasets(bindings);
tracing::debug!(
offset,
page_size = current_page_size,
page_datasets = datasets.len(),
total_fetched = all_datasets.len() + datasets.len(),
"SPARQL page fetched"
);
all_datasets.extend(datasets);
if count < current_page_size {
break;
}
offset += current_page_size;
sleep(PAGE_DELAY).await;
}
Ok(all_datasets)
}
/// Test-only convenience: builds the dataset query using the client's own page size as LIMIT.
#[cfg(test)]
fn build_dataset_query(&self, offset: usize, extra_filter: Option<&str>) -> String {
self.build_dataset_query_with_limit(offset, self.page_size, extra_filter)
}
/// Renders the paginated SELECT query for datasets.
///
/// * `offset` / `limit` - paging window.
/// * `extra_filter` - optional SPARQL fragment placed at the end of the WHERE
///   clause; nothing is added when `None`.
///
/// Titles are restricted to the accepted language alternatives; description,
/// identifier and modification date are optional bindings.
fn build_dataset_query_with_limit(
    &self,
    offset: usize,
    limit: usize,
    extra_filter: Option<&str>,
) -> String {
    // Build the `a || b || c` disjunction by hand.
    let mut language_clause = String::new();
    for (position, tag) in self.lang_filter_alternatives().into_iter().enumerate() {
        if position > 0 {
            language_clause.push_str(" || ");
        }
        language_clause.push_str(&format!("lang(?title) = \"{l}\"", l = tag));
    }
    let trailing = match extra_filter {
        Some(fragment) => fragment,
        None => "",
    };
    format!(
        "PREFIX dcat: <http://www.w3.org/ns/dcat#>\n\
PREFIX dct: <http://purl.org/dc/terms/>\n\
\n\
SELECT ?dataset ?title ?description ?identifier ?modified\n\
WHERE {{\n\
?dataset a dcat:Dataset .\n\
?dataset dct:title ?title .\n\
FILTER ({title_filter})\n\
OPTIONAL {{ ?dataset dct:description ?description }}\n\
OPTIONAL {{ ?dataset dct:identifier ?identifier }}\n\
OPTIONAL {{ ?dataset dct:modified ?modified }}\n\
{extra}\n\
}}\n\
LIMIT {limit} OFFSET {offset}",
        title_filter = language_clause,
        extra = trailing,
    )
}
/// Sends `query` to the SPARQL endpoint and returns the result rows.
///
/// # Errors
///
/// Propagates request failures; a body that does not deserialize as a SPARQL JSON
/// result yields `AppError::ClientError`.
async fn execute_query(
    &self,
    query: &str,
) -> Result<Vec<HashMap<String, SparqlValue>>, AppError> {
    let response = self.request_with_retry(query).await?;
    match response.json::<SparqlResponse>().await {
        Ok(parsed) => Ok(parsed.results.bindings),
        Err(e) => Err(AppError::ClientError(format!(
            "SPARQL endpoint returned non-JSON response: {e}"
        ))),
    }
}
/// POSTs a SPARQL query as a form-encoded body, retrying transient failures.
///
/// The retry budget and base delay come from `HttpConfig::default()`.
///
/// * HTTP 429: waits for the `Retry-After` seconds when the header parses as an
///   integer, otherwise for an exponential backoff capped at `MAX_RETRY_DELAY`,
///   then retries.
/// * HTTP 5xx: retries after a delay that grows linearly with the attempt number.
/// * Timeouts and connection failures: retried the same way as 5xx.
/// * Any other status or transport error is returned immediately.
///
/// # Errors
///
/// Returns `AppError::RateLimitExceeded`, `AppError::Timeout`,
/// `AppError::NetworkError` or `AppError::ClientError` depending on the last
/// failure observed.
async fn request_with_retry(&self, query: &str) -> Result<reqwest::Response, AppError> {
    let http_config = HttpConfig::default();
    let max_retries = http_config.max_retries;
    let base_delay = http_config.retry_base_delay;
    // The encoded body is identical for every attempt, so build it once.
    let form_body = url::form_urlencoded::Serializer::new(String::new())
        .append_pair("query", query)
        .finish();
    // Only returned as-is when the retry budget is zero and the loop never runs.
    let mut last_error = AppError::Generic("No attempts made".to_string());
    for attempt in 1..=max_retries {
        let outcome = self
            .client
            .post(self.endpoint_url.clone())
            .header("Accept", "application/sparql-results+json")
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body.clone())
            .send()
            .await;
        match outcome {
            Ok(resp) => {
                let status = resp.status();
                if status.is_success() {
                    return Ok(resp);
                }
                if status == StatusCode::TOO_MANY_REQUESTS {
                    last_error = AppError::RateLimitExceeded;
                    if attempt < max_retries {
                        let delay = resp
                            .headers()
                            .get("retry-after")
                            .and_then(|v| v.to_str().ok())
                            .and_then(|v| v.parse::<u64>().ok())
                            .map(Duration::from_secs)
                            .unwrap_or_else(|| {
                                (base_delay * 2_u32.pow(attempt)).min(MAX_RETRY_DELAY)
                            });
                        sleep(delay).await;
                        continue;
                    }
                    return Err(AppError::RateLimitExceeded);
                }
                if status.is_server_error() {
                    last_error = AppError::ClientError(format!(
                        "Server error: HTTP {}",
                        status.as_u16()
                    ));
                    if attempt < max_retries {
                        sleep(base_delay * attempt).await;
                        continue;
                    }
                }
                // Non-retryable status, or a server error on the final attempt.
                return Err(AppError::ClientError(format!(
                    "HTTP {} from {}",
                    status.as_u16(),
                    self.endpoint_url
                )));
            }
            Err(e) => {
                let transient = e.is_timeout() || e.is_connect();
                last_error = if e.is_timeout() {
                    AppError::Timeout(REQUEST_TIMEOUT.as_secs())
                } else if e.is_connect() {
                    AppError::NetworkError(format!("Connection failed: {e}"))
                } else {
                    AppError::ClientError(e.to_string())
                };
                if transient && attempt < max_retries {
                    sleep(base_delay * attempt).await;
                    continue;
                }
                // Stop here: falling through to the next iteration would re-send
                // a non-transient failure immediately, without any backoff.
                return Err(last_error);
            }
        }
    }
    Err(last_error)
}
/// GETs `url` asking for JSON-LD/JSON, retrying transient failures.
///
/// The retry budget and base delay come from `HttpConfig::default()`.
///
/// * HTTP 429: waits for the `Retry-After` seconds when the header parses as an
///   integer, otherwise for an exponential backoff capped at `MAX_RETRY_DELAY`,
///   then retries.
/// * HTTP 5xx: retries after a delay that grows linearly with the attempt number.
/// * Timeouts and connection failures: retried the same way as 5xx.
/// * Any other status or transport error is returned immediately.
///
/// # Errors
///
/// Returns `AppError::RateLimitExceeded`, `AppError::Timeout`,
/// `AppError::NetworkError` or `AppError::ClientError` depending on the last
/// failure observed.
async fn request_url_with_retry(&self, url: Url) -> Result<reqwest::Response, AppError> {
    let http_config = HttpConfig::default();
    let max_retries = http_config.max_retries;
    let base_delay = http_config.retry_base_delay;
    // Only returned as-is when the retry budget is zero and the loop never runs.
    let mut last_error = AppError::Generic("No attempts made".to_string());
    for attempt in 1..=max_retries {
        let outcome = self
            .client
            .get(url.clone())
            .header("Accept", "application/ld+json, application/json")
            .send()
            .await;
        match outcome {
            Ok(resp) => {
                let status = resp.status();
                if status.is_success() {
                    return Ok(resp);
                }
                if status == StatusCode::TOO_MANY_REQUESTS {
                    last_error = AppError::RateLimitExceeded;
                    if attempt < max_retries {
                        let delay = resp
                            .headers()
                            .get("retry-after")
                            .and_then(|v| v.to_str().ok())
                            .and_then(|v| v.parse::<u64>().ok())
                            .map(Duration::from_secs)
                            .unwrap_or_else(|| {
                                (base_delay * 2_u32.pow(attempt)).min(MAX_RETRY_DELAY)
                            });
                        sleep(delay).await;
                        continue;
                    }
                    return Err(AppError::RateLimitExceeded);
                }
                if status.is_server_error() {
                    last_error = AppError::ClientError(format!(
                        "Server error: HTTP {}",
                        status.as_u16()
                    ));
                    if attempt < max_retries {
                        sleep(base_delay * attempt).await;
                        continue;
                    }
                }
                // Non-retryable status, or a server error on the final attempt.
                return Err(AppError::ClientError(format!(
                    "HTTP {} from {}",
                    status.as_u16(),
                    url
                )));
            }
            Err(e) => {
                let transient = e.is_timeout() || e.is_connect();
                last_error = if e.is_timeout() {
                    AppError::Timeout(REQUEST_TIMEOUT.as_secs())
                } else if e.is_connect() {
                    AppError::NetworkError(format!("Connection failed: {e}"))
                } else {
                    AppError::ClientError(e.to_string())
                };
                if transient && attempt < max_retries {
                    sleep(base_delay * attempt).await;
                    continue;
                }
                // Stop here: falling through to the next iteration would re-send
                // a non-transient failure immediately, without any backoff.
                return Err(last_error);
            }
        }
    }
    Err(last_error)
}
/// Lists the language tags accepted for titles, in preference order.
///
/// The list starts with the configured language, followed by the other Norwegian
/// tags when the configured language is one of `nb`, `nn` or `no`, then `en`
/// (unless that is already the configured language), and finally the empty tag
/// for untagged literals.
fn lang_filter_alternatives(&self) -> Vec<&str> {
    let lang = self.language.as_str();
    let mut alts = vec![lang];
    if matches!(lang, "nb" | "nn" | "no") {
        for &sibling in &["nb", "nn", "no"] {
            if sibling != lang {
                alts.push(sibling);
            }
        }
    }
    // Avoid listing `en` twice, which would only add a redundant FILTER disjunct.
    if lang != "en" {
        alts.push("en");
    }
    alts.push("");
    alts
}
/// Returns `true` when both tags belong to the Norwegian group `nb`, `nn`, `no`
/// (two identical tags from that group also count).
fn are_sibling_languages(a: &str, b: &str) -> bool {
    let is_norwegian = |tag: &str| tag == "nb" || tag == "nn" || tag == "no";
    is_norwegian(a) && is_norwegian(b)
}
/// Scores a language tag by how well it matches the configured language:
/// 3 for an exact match, 2 for a sibling language, 1 for `en`, and 0 for
/// anything else, including a missing tag.
fn lang_rank(&self, lang: Option<&str>) -> u8 {
    let Some(tag) = lang else {
        return 0;
    };
    if tag == self.language {
        3
    } else if Self::are_sibling_languages(tag, &self.language) {
        2
    } else if tag == "en" {
        1
    } else {
        0
    }
}
/// Picks the non-empty value bound to `key` whose language ranks highest.
///
/// When several candidates share the top rank, the one occurring last in
/// `bindings` is returned. Returns `None` if no row has a non-empty value for
/// `key`.
fn best_value<'a>(
    &self,
    bindings: &'a [HashMap<String, SparqlValue>],
    key: &str,
) -> Option<&'a SparqlValue> {
    let candidates = bindings
        .iter()
        .filter_map(|row| row.get(key))
        .filter(|cell| !cell.value.is_empty());
    // `max_by_key` resolves ties in favour of the last element, like `max_by`.
    candidates.max_by_key(|cell| self.lang_rank(cell.lang.as_deref()))
}
/// Collapses SPARQL result rows into one `DcatDataset` per dataset URI.
///
/// Rows are grouped by their `dataset` binding (rows without one are ignored).
/// For each group the best-ranked title and description are chosen
/// independently; groups without a non-empty title are dropped. The identifier is
/// the first non-empty `identifier` binding, falling back to the last path
/// segment of the dataset URI. Datasets whose identifier was already produced by
/// an earlier group are dropped.
///
/// Groups are processed in the order their URI first appears in `bindings`, so
/// both the output order and the winner among datasets sharing an identifier are
/// deterministic.
fn bindings_to_datasets(
    &self,
    bindings: Vec<HashMap<String, SparqlValue>>,
) -> Vec<DcatDataset> {
    // `slot_by_uri` maps a dataset URI to its index in `grouped`; the vector
    // keeps first-seen order, which a bare HashMap would not.
    let mut slot_by_uri: HashMap<String, usize> = HashMap::new();
    let mut grouped: Vec<(String, Vec<HashMap<String, SparqlValue>>)> = Vec::new();
    for binding in bindings {
        let Some(dataset_uri) = binding.get("dataset").map(|v| v.value.clone()) else {
            continue;
        };
        let slot = match slot_by_uri.get(&dataset_uri) {
            Some(&existing) => existing,
            None => {
                let fresh = grouped.len();
                slot_by_uri.insert(dataset_uri.clone(), fresh);
                grouped.push((dataset_uri, Vec::new()));
                fresh
            }
        };
        grouped[slot].1.push(binding);
    }

    let mut datasets = Vec::with_capacity(grouped.len());
    let mut seen_identifiers: std::collections::HashSet<String> =
        std::collections::HashSet::new();
    for (dataset_uri, rows) in &grouped {
        let Some(title_val) = self.best_value(rows, "title") else {
            continue;
        };
        let title = title_val.value.clone();
        let description = self
            .best_value(rows, "description")
            .map(|v| v.value.clone());
        let identifier = rows
            .iter()
            .filter_map(|b| b.get("identifier"))
            .map(|v| v.value.clone())
            .find(|s| !s.is_empty())
            .unwrap_or_else(|| extract_last_segment(dataset_uri));
        if !seen_identifiers.insert(identifier.clone()) {
            continue;
        }
        // Keep the row that carried the chosen title as the raw payload. Every
        // group holds at least one row, so indexing `rows[0]` cannot panic.
        let raw_binding = rows
            .iter()
            .find(|b| b.get("title").map(|v| v.value.as_str()) == Some(title.as_str()))
            .unwrap_or(&rows[0]);
        let raw = binding_to_json(raw_binding);
        datasets.push(DcatDataset {
            id_uri: dataset_uri.clone(),
            identifier,
            title,
            description,
            raw,
        });
    }
    datasets
}
}
/// Heuristically decides whether an error indicates an overloaded server, based
/// on its display text: HTTP 500/502/503/504 or a timeout wording.
fn is_server_overload(e: &AppError) -> bool {
    const MARKERS: [&str; 6] = [
        "HTTP 500",
        "HTTP 502",
        "HTTP 503",
        "HTTP 504",
        "timed out",
        "timeout",
    ];
    let rendered = e.to_string();
    MARKERS.iter().any(|marker| rendered.contains(*marker))
}
/// Tells whether a JSON-LD node is a Hydra `PartialCollectionView`.
///
/// The node's `@type` may be a single string or an array; both the prefixed and
/// the full-IRI spelling of the type are recognised. Non-string array members
/// are ignored.
fn is_hydra_view_node(node: &Value) -> bool {
    const VIEW_TYPES: [&str; 2] = [
        "hydra:PartialCollectionView",
        "http://www.w3.org/ns/hydra/core#PartialCollectionView",
    ];
    let names_view = |candidate: &Value| match candidate.as_str() {
        Some(text) => VIEW_TYPES.iter().any(|known| *known == text),
        None => false,
    };
    match node.get("@type") {
        Some(single @ Value::String(_)) => names_view(single),
        Some(Value::Array(items)) => items.iter().any(names_view),
        _ => false,
    }
}
/// Returns the text after the last `/` of `uri`, ignoring any trailing slashes.
/// A string without a `/` (after trimming) is returned whole.
fn extract_last_segment(uri: &str) -> String {
    let trimmed = uri.trim_end_matches('/');
    let tail = match trimmed.rsplit_once('/') {
        Some((_, last)) => last,
        None => trimmed,
    };
    tail.to_owned()
}
/// Converts one SPARQL result row into a JSON object.
///
/// Each variable becomes a key whose value is an object holding `value` and, when
/// the binding has a language tag, `xml:lang`.
fn binding_to_json(binding: &HashMap<String, SparqlValue>) -> Value {
    let fields: serde_json::Map<String, Value> = binding
        .iter()
        .map(|(name, cell)| {
            let mut entry = serde_json::Map::new();
            entry.insert("value".to_string(), Value::String(cell.value.clone()));
            if let Some(tag) = cell.lang.as_ref() {
                entry.insert("xml:lang".to_string(), Value::String(tag.clone()));
            }
            (name.clone(), Value::Object(entry))
        })
        .collect();
    Value::Object(fields)
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_sparql_json() -> &'static str {
r#"{
"head": { "vars": ["dataset", "title", "description", "identifier", "modified"] },
"results": {
"bindings": [
{
"dataset": { "type": "uri", "value": "https://data.europa.eu/dataset/1234" },
"title": { "type": "literal", "value": "Air Quality Data", "xml:lang": "en" },
"description": { "type": "literal", "value": "Air quality measurements", "xml:lang": "en" },
"identifier": { "type": "literal", "value": "air-quality-1234" }
},
{
"dataset": { "type": "uri", "value": "https://data.europa.eu/dataset/5678" },
"title": { "type": "literal", "value": "Population Census", "xml:lang": "en" }
}
]
}
}"#
}
#[test]
fn parse_sparql_response() {
let resp: SparqlResponse = serde_json::from_str(sample_sparql_json()).unwrap();
assert_eq!(resp.results.bindings.len(), 2);
let first = &resp.results.bindings[0];
assert_eq!(
first["dataset"].value,
"https://data.europa.eu/dataset/1234"
);
assert_eq!(first["title"].value, "Air Quality Data");
assert_eq!(first["title"].lang.as_deref(), Some("en"));
assert_eq!(first["identifier"].value, "air-quality-1234");
let second = &resp.results.bindings[1];
assert_eq!(
second["dataset"].value,
"https://data.europa.eu/dataset/5678"
);
assert!(second.get("description").is_none());
}
#[test]
fn bindings_to_datasets_basic() {
let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();
let resp: SparqlResponse = serde_json::from_str(sample_sparql_json()).unwrap();
let datasets = client.bindings_to_datasets(resp.results.bindings);
assert_eq!(datasets.len(), 2);
let air = datasets
.iter()
.find(|d| d.id_uri == "https://data.europa.eu/dataset/1234")
.unwrap();
assert_eq!(air.title, "Air Quality Data");
assert_eq!(air.description.as_deref(), Some("Air quality measurements"));
assert_eq!(air.identifier, "air-quality-1234");
let pop = datasets
.iter()
.find(|d| d.id_uri == "https://data.europa.eu/dataset/5678")
.unwrap();
assert_eq!(pop.title, "Population Census");
assert!(pop.description.is_none());
assert_eq!(pop.identifier, "5678"); }
#[test]
fn bindings_deduplicates_by_uri() {
let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();
let json = r#"{
"results": {
"bindings": [
{
"dataset": { "type": "uri", "value": "https://example.org/d/1" },
"title": { "type": "literal", "value": "First Title", "xml:lang": "en" }
},
{
"dataset": { "type": "uri", "value": "https://example.org/d/1" },
"title": { "type": "literal", "value": "Duplicate Title", "xml:lang": "en" }
}
]
}
}"#;
let resp: SparqlResponse = serde_json::from_str(json).unwrap();
let datasets = client.bindings_to_datasets(resp.results.bindings);
assert_eq!(datasets.len(), 1);
}
#[test]
fn bindings_prefers_requested_language() {
let client = SparqlDcatClient::new("https://data.europa.eu", "nb", None).unwrap();
let json = r#"{
"results": {
"bindings": [
{
"dataset": { "type": "uri", "value": "https://example.org/d/1" },
"title": { "type": "literal", "value": "English Title", "xml:lang": "en" },
"description": { "type": "literal", "value": "English desc", "xml:lang": "en" }
},
{
"dataset": { "type": "uri", "value": "https://example.org/d/1" },
"title": { "type": "literal", "value": "Norsk Tittel", "xml:lang": "nb" },
"description": { "type": "literal", "value": "Norsk beskrivelse", "xml:lang": "nb" }
}
]
}
}"#;
let resp: SparqlResponse = serde_json::from_str(json).unwrap();
let datasets = client.bindings_to_datasets(resp.results.bindings);
assert_eq!(datasets.len(), 1);
assert_eq!(datasets[0].title, "Norsk Tittel");
assert_eq!(
datasets[0].description.as_deref(),
Some("Norsk beskrivelse")
);
}
#[test]
fn bindings_skip_empty_title() {
let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();
let json = r#"{
"results": {
"bindings": [
{
"dataset": { "type": "uri", "value": "https://example.org/d/1" },
"title": { "type": "literal", "value": "" }
},
{
"dataset": { "type": "uri", "value": "https://example.org/d/2" },
"title": { "type": "literal", "value": "Valid Title" }
}
]
}
}"#;
let resp: SparqlResponse = serde_json::from_str(json).unwrap();
let datasets = client.bindings_to_datasets(resp.results.bindings);
assert_eq!(datasets.len(), 1);
assert_eq!(datasets[0].title, "Valid Title");
}
fn binding(
uri: &str,
key: &str,
value: &str,
lang: Option<&str>,
) -> HashMap<String, SparqlValue> {
let mut b = HashMap::new();
b.insert(
"dataset".to_string(),
SparqlValue {
value: uri.to_string(),
lang: None,
},
);
b.insert(
key.to_string(),
SparqlValue {
value: value.to_string(),
lang: lang.map(str::to_string),
},
);
b
}
#[test]
fn lang_selection_is_order_independent() {
let client = SparqlDcatClient::new("https://example.org", "it", None).unwrap();
let en_first = vec![
binding("https://example.org/d/1", "title", "English", Some("en")),
binding("https://example.org/d/1", "title", "Italiano", Some("it")),
];
let it_first = vec![
binding("https://example.org/d/1", "title", "Italiano", Some("it")),
binding("https://example.org/d/1", "title", "English", Some("en")),
];
assert_eq!(
client.best_value(&en_first, "title").unwrap().value,
"Italiano"
);
assert_eq!(
client.best_value(&it_first, "title").unwrap().value,
"Italiano"
);
}
#[test]
fn lang_falls_back_to_sibling_then_en_then_untagged() {
let client = SparqlDcatClient::new("https://example.org", "nn", None).unwrap();
let rows = vec![
binding("https://example.org/d/1", "title", "Untagged", None),
binding("https://example.org/d/1", "title", "English", Some("en")),
binding("https://example.org/d/1", "title", "Bokmål", Some("nb")),
];
assert_eq!(client.best_value(&rows, "title").unwrap().value, "Bokmål");
let rows = vec![
binding("https://example.org/d/2", "title", "Untagged", None),
binding("https://example.org/d/2", "title", "English", Some("en")),
];
assert_eq!(client.best_value(&rows, "title").unwrap().value, "English");
}
/// Two rows for the same dataset: a French title with an English description,
/// and an English title alone. With `fr` configured, the resulting dataset
/// must carry the French title together with the English description.
#[test]
fn title_and_description_selected_independently() {
    let payload = r#"{
"results": {
"bindings": [
{
"dataset": { "type": "uri", "value": "https://example.org/d/1" },
"title": { "type": "literal", "value": "Titre FR", "xml:lang": "fr" },
"description": { "type": "literal", "value": "English description", "xml:lang": "en" }
},
{
"dataset": { "type": "uri", "value": "https://example.org/d/1" },
"title": { "type": "literal", "value": "English Title", "xml:lang": "en" }
}
]
}
}"#;
    let client = SparqlDcatClient::new("https://example.org", "fr", None).unwrap();
    let parsed: SparqlResponse = serde_json::from_str(payload).unwrap();

    let datasets = client.bindings_to_datasets(parsed.results.bindings);
    assert_eq!(datasets.len(), 1);

    let dataset = &datasets[0];
    assert_eq!(dataset.title, "Titre FR");
    assert_eq!(dataset.description.as_deref(), Some("English description"));
}
/// For a client configured with `nb`, `lang_rank` must score `nb` as 3,
/// `nn` as 2, `en` as 1, and both an unrelated tag and a missing tag as 0.
#[test]
fn lang_rank_orders_preferred_sibling_en_other() {
    let client = SparqlDcatClient::new("https://example.org", "nb", None).unwrap();

    // (language tag, expected rank), checked in descending rank order.
    let expectations = [
        (Some("nb"), 3),
        (Some("nn"), 2),
        (Some("en"), 1),
        (Some("de"), 0),
        (None, 0),
    ];
    for (tag, rank) in expectations {
        assert_eq!(client.lang_rank(tag), rank);
    }
}
/// The last path segment of a URI without a trailing slash is returned.
#[test]
fn extract_last_segment_basic() {
    let segment = extract_last_segment("https://data.europa.eu/dataset/1234");
    assert_eq!(segment, "1234");
}
/// A trailing slash on the URI must not change the extracted segment.
#[test]
fn extract_last_segment_trailing_slash() {
    let segment = extract_last_segment("https://data.europa.eu/dataset/1234/");
    assert_eq!(segment, "1234");
}
/// The default dataset query (offset 0, no extra filter, language `en`) must
/// mention the dataset class and title predicate, page with
/// `LIMIT 1000 OFFSET 0`, filter the title on `en`, and contain neither an
/// `ORDER BY` clause nor a language filter on the description.
#[test]
fn build_dataset_query_basic() {
    let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();
    let query = client.build_dataset_query(0, None);
    let has = |needle: &str| query.contains(needle);

    assert!(has("dcat:Dataset"));
    assert!(has("dct:title"));
    assert!(!has("ORDER BY"));
    assert!(has("LIMIT 1000 OFFSET 0"));
    assert!(has("lang(?title) = \"en\""));
    assert!(!has("lang(?description)"));
}
/// A non-zero offset must appear in the paging clause, and a client configured
/// with `nb` must emit title language filters for `nb`, `nn` and `no`.
#[test]
fn build_dataset_query_with_offset() {
    let client = SparqlDcatClient::new("https://data.europa.eu", "nb", None).unwrap();
    let query = client.build_dataset_query(2000, None);
    let has = |needle: &str| query.contains(needle);

    assert!(has("LIMIT 1000 OFFSET 2000"));
    assert!(has("lang(?title) = \"nb\""));
    assert!(has("lang(?title) = \"nn\""));
    assert!(has("lang(?title) = \"no\""));
}
/// An extra filter passed to `build_dataset_query` must appear verbatim in
/// the generated query text.
#[test]
fn build_dataset_query_with_filter() {
    let modified_filter = "FILTER (?modified > \"2026-01-01T00:00:00Z\"^^<http://www.w3.org/2001/XMLSchema#dateTime>)";
    let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();

    let query = client.build_dataset_query(0, Some(modified_filter));
    assert!(query.contains(modified_filter));
}
/// The "modified since" filter must contain both a `!BOUND(?modified)` term
/// and the timestamp comparison, with the `!BOUND` term appearing first in
/// the filter text.
#[test]
fn modified_since_filter_allows_unbound_modified() {
    let cutoff = chrono::DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")
        .unwrap()
        .with_timezone(&Utc);
    let filter = SparqlDcatClient::modified_since_filter(cutoff);

    // Both terms must be present...
    assert!(filter.contains("!BOUND(?modified)"));
    assert!(filter.contains("?modified > \"2026-01-01T00:00:00+00:00\""));

    // ...and the unbound check must come before the comparison.
    let unbound_at = filter.find("!BOUND(?modified)").unwrap();
    let comparison_at = filter.find("?modified >").unwrap();
    assert!(unbound_at < comparison_at);
}
/// Without an explicit SPARQL endpoint the client reports portal type `dcat`,
/// a base URL with a trailing slash, and an endpoint of `<base>/sparql`.
#[test]
fn sparql_client_new_valid() {
    let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();

    let portal_type = client.portal_type();
    assert_eq!(portal_type, "dcat");

    let base = client.base_url();
    assert_eq!(base, "https://data.europa.eu/");

    let endpoint = client.endpoint_url.as_str();
    assert_eq!(endpoint, "https://data.europa.eu/sparql");
}
/// When a SPARQL endpoint is supplied, it is used as the endpoint URL while
/// the base URL still reflects the portal address.
#[test]
fn sparql_client_new_with_custom_endpoint() {
    let custom_endpoint = Some("https://sparql.fellesdatakatalog.digdir.no");
    let client = SparqlDcatClient::new("https://data.norge.no", "nb", custom_endpoint).unwrap();

    let base = client.base_url();
    assert_eq!(base, "https://data.norge.no/");

    let endpoint = client.endpoint_url.as_str();
    assert_eq!(endpoint, "https://sparql.fellesdatakatalog.digdir.no/");
}
/// A base URL that cannot be parsed must yield `AppError::InvalidPortalUrl`.
#[test]
fn sparql_client_new_invalid_url() {
    assert!(matches!(
        SparqlDcatClient::new("not-a-url", "en", None),
        Err(AppError::InvalidPortalUrl(_))
    ));
}
/// Language tags containing characters other than ASCII alphanumerics and
/// `-`, as well as the empty tag, must be rejected with
/// `AppError::ConfigError`.
#[test]
fn sparql_client_rejects_invalid_language() {
    // First a tag with quote/brace characters, then the empty tag.
    for tag in ["en\"}", ""] {
        let outcome = SparqlDcatClient::new("https://data.europa.eu", tag, None);
        assert!(matches!(outcome, Err(AppError::ConfigError(_))));
    }
}
/// Two rows whose dataset URIs differ but end in the same segment
/// (`shared-id`) must collapse into a single dataset with that identifier.
#[test]
fn bindings_deduplicates_by_identifier() {
    let payload = r#"{
"results": {
"bindings": [
{
"dataset": { "type": "uri", "value": "https://example.org/a/shared-id" },
"title": { "type": "literal", "value": "First", "xml:lang": "en" }
},
{
"dataset": { "type": "uri", "value": "https://example.org/b/shared-id" },
"title": { "type": "literal", "value": "Second", "xml:lang": "en" }
}
]
}
}"#;
    let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();
    let parsed: SparqlResponse = serde_json::from_str(payload).unwrap();

    let datasets = client.bindings_to_datasets(parsed.results.bindings);
    assert_eq!(datasets.len(), 1);

    let only = &datasets[0];
    assert_eq!(only.identifier, "shared-id");
}
/// A registry page whose top-level `@graph` holds a collection view and one
/// entry with its own nested `@graph` (a distribution plus a dataset) must
/// yield one entry and one dataset, taking identifier, English title and
/// description from the nested dataset node.
#[test]
fn registry_page_extracts_nested_dataset_graphs() {
    // Paging metadata node that sits next to the actual entries.
    let collection_view = json!({
        "@type": "hydra:PartialCollectionView",
        "hydra:totalItems": {"@value": "1"}
    });
    // Non-dataset node inside the nested graph.
    let distribution_node = json!({
        "@id": "http://data.europa.eu/88u/distribution/example",
        "@type": "dcat:Distribution"
    });
    // The dataset node the extraction is expected to find.
    let dataset_node = json!({
        "@id": "http://data.europa.eu/88u/dataset/example",
        "@type": "dcat:Dataset",
        "dct:identifier": {"@value": "example-id"},
        "dct:title": [
            {"@language": "de", "@value": "Deutscher Titel"},
            {"@language": "en-t-de-t0-mtec", "@value": "English Title"}
        ],
        "dct:description": {"@language": "en", "@value": "Description"}
    });
    let body = json!({
        "@graph": [
            collection_view,
            {
                "@id": "http://data.europa.eu/88u/dataset/example",
                "@graph": [distribution_node, dataset_node]
            }
        ]
    });

    let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();
    let page = client.extract_registry_page(body);
    assert_eq!(page.entries, 1);
    assert_eq!(page.datasets.len(), 1);

    let dataset = &page.datasets[0];
    assert_eq!(dataset.identifier, "example-id");
    assert_eq!(dataset.title, "English Title");
    assert_eq!(dataset.description.as_deref(), Some("Description"));
}
/// Converting a row to JSON must keep every entry's `value` and expose the
/// language tag under the `xml:lang` key.
#[test]
fn binding_to_json_preserves_fields() {
    let row = HashMap::from([
        (
            "dataset".to_string(),
            SparqlValue {
                value: "https://example.org/d/1".to_string(),
                lang: None,
            },
        ),
        (
            "title".to_string(),
            SparqlValue {
                value: "Test Title".to_string(),
                lang: Some("en".to_string()),
            },
        ),
    ]);

    let rendered = binding_to_json(&row);
    let fields = rendered.as_object().unwrap();
    assert_eq!(fields["dataset"]["value"], "https://example.org/d/1");
    assert_eq!(fields["title"]["value"], "Test Title");
    assert_eq!(fields["title"]["xml:lang"], "en");
}
/// Live smoke test against data.europa.eu. Ignored by default because it
/// needs network access.
///
/// Checks that the reported dataset count exceeds 100, that a full search
/// returns at least one dataset, and that the first dataset has a non-empty
/// title and `id_uri`.
#[tokio::test]
#[ignore = "requires network access to data.europa.eu"]
async fn test_sparql_smoke_europa() {
    let client = SparqlDcatClient::new("https://data.europa.eu", "en", None).unwrap();

    let count = client.dataset_count().await.unwrap();
    assert!(count > 100, "Expected >100 datasets, got {}", count);

    let datasets = client.search_all_datasets().await.unwrap();
    assert!(
        !datasets.is_empty(),
        "Expected at least some datasets from data.europa.eu"
    );

    // Spot-check the first record only.
    let head = &datasets[0];
    assert!(!head.title.is_empty(), "First dataset title is empty");
    assert!(!head.id_uri.is_empty(), "First dataset id_uri is empty");
}
}