use ceres_core::config::PortalType;
use ceres_core::error::AppError;
use ceres_core::models::NewDataset;
use ceres_core::traits::{PortalClient, PortalClientFactory};
use chrono::{DateTime, Utc};
use futures::StreamExt;
use futures::stream::BoxStream;
use crate::ckan::{CkanClient, CkanDataset};
use crate::dcat::{DcatClient, DcatDataset};
use crate::sparql::SparqlDcatClient;
#[derive(Debug, Clone)]
pub enum PortalDataEnum {
Ckan(CkanDataset),
Dcat(DcatDataset),
}
#[derive(Clone)]
pub enum PortalClientEnum {
Ckan(CkanClient),
Dcat(DcatClient),
SparqlDcat(SparqlDcatClient),
}
impl PortalClient for PortalClientEnum {
type PortalData = PortalDataEnum;
fn portal_type(&self) -> &'static str {
match self {
Self::Ckan(c) => c.portal_type(),
Self::Dcat(c) => c.portal_type(),
Self::SparqlDcat(c) => c.portal_type(),
}
}
fn base_url(&self) -> &str {
match self {
Self::Ckan(c) => c.base_url(),
Self::Dcat(c) => c.base_url(),
Self::SparqlDcat(c) => c.base_url(),
}
}
async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
match self {
Self::Ckan(c) => c.list_dataset_ids().await,
Self::Dcat(c) => c.list_dataset_ids().await,
Self::SparqlDcat(c) => c.list_dataset_ids().await,
}
}
async fn get_dataset(&self, id: &str) -> Result<Self::PortalData, AppError> {
match self {
Self::Ckan(c) => c.get_dataset(id).await.map(PortalDataEnum::Ckan),
Self::Dcat(c) => c.get_dataset(id).await.map(PortalDataEnum::Dcat),
Self::SparqlDcat(c) => c.get_dataset(id).await.map(PortalDataEnum::Dcat),
}
}
fn into_new_dataset(
data: Self::PortalData,
portal_url: &str,
url_template: Option<&str>,
language: &str,
) -> NewDataset {
match data {
PortalDataEnum::Ckan(ckan_data) => {
CkanClient::into_new_dataset(ckan_data, portal_url, url_template, language)
}
PortalDataEnum::Dcat(dcat_data) => {
DcatClient::into_new_dataset(dcat_data, portal_url, url_template, language)
}
}
}
async fn search_modified_since(
&self,
since: DateTime<Utc>,
) -> Result<Vec<Self::PortalData>, AppError> {
match self {
Self::Ckan(c) => c
.search_modified_since(since)
.await
.map(|datasets| datasets.into_iter().map(PortalDataEnum::Ckan).collect()),
Self::Dcat(c) => c
.search_modified_since(since)
.await
.map(|datasets| datasets.into_iter().map(PortalDataEnum::Dcat).collect()),
Self::SparqlDcat(c) => c
.search_modified_since(since)
.await
.map(|datasets| datasets.into_iter().map(PortalDataEnum::Dcat).collect()),
}
}
async fn search_all_datasets(&self) -> Result<Vec<Self::PortalData>, AppError> {
match self {
Self::Ckan(c) => c
.search_all_datasets()
.await
.map(|datasets| datasets.into_iter().map(PortalDataEnum::Ckan).collect()),
Self::Dcat(c) => c
.search_all_datasets()
.await
.map(|datasets| datasets.into_iter().map(PortalDataEnum::Dcat).collect()),
Self::SparqlDcat(c) => c
.search_all_datasets()
.await
.map(|datasets| datasets.into_iter().map(PortalDataEnum::Dcat).collect()),
}
}
fn search_all_datasets_stream(&self) -> BoxStream<'_, Result<Vec<Self::PortalData>, AppError>> {
match self {
Self::Ckan(c) => Box::pin(StreamExt::map(
c.search_all_datasets_stream(),
|r: Result<Vec<CkanDataset>, AppError>| {
r.map(|datasets| datasets.into_iter().map(PortalDataEnum::Ckan).collect())
},
)),
Self::Dcat(c) => Box::pin(StreamExt::map(
c.paginate_catalog_stream(None),
|r: Result<Vec<DcatDataset>, AppError>| {
r.map(|datasets| datasets.into_iter().map(PortalDataEnum::Dcat).collect())
},
)),
Self::SparqlDcat(c) => Box::pin(StreamExt::map(
c.paginate_sparql_stream(),
|r: Result<Vec<DcatDataset>, AppError>| {
r.map(|datasets| datasets.into_iter().map(PortalDataEnum::Dcat).collect())
},
)),
}
}
async fn dataset_count(&self) -> Result<usize, AppError> {
match self {
Self::Ckan(c) => c.dataset_count().await,
Self::Dcat(_) => Err(AppError::Generic(
"dataset_count not supported for DCAT udata REST portals".to_string(),
)),
Self::SparqlDcat(c) => c.dataset_count().await,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct PortalClientFactoryEnum;
impl PortalClientFactoryEnum {
pub fn new() -> Self {
Self
}
}
impl PortalClientFactory for PortalClientFactoryEnum {
type Client = PortalClientEnum;
fn create(
&self,
portal_url: &str,
portal_type: PortalType,
language: &str,
profile: Option<&str>,
sparql_endpoint: Option<&str>,
) -> Result<Self::Client, AppError> {
match portal_type {
PortalType::Ckan => Ok(PortalClientEnum::Ckan(CkanClient::new(portal_url)?)),
PortalType::Dcat => match profile {
Some("sparql") => Ok(PortalClientEnum::SparqlDcat(SparqlDcatClient::new(
portal_url,
language,
sparql_endpoint,
)?)),
None | Some("" | "udata_rest") => Ok(PortalClientEnum::Dcat(DcatClient::new(
portal_url, language,
)?)),
Some(other) => Err(AppError::ConfigError(format!(
"Unsupported DCAT profile '{other}'. Supported profiles are 'sparql' and 'udata_rest'."
))),
},
other => Err(AppError::ConfigError(format!(
"Portal type '{}' is not yet supported.",
other
))),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_factory_creates_ckan_client() {
let factory = PortalClientFactoryEnum::new();
let client = factory.create(
"https://dati.comune.milano.it",
PortalType::Ckan,
"en",
None,
None,
);
assert!(client.is_ok());
let client = client.unwrap();
assert_eq!(client.portal_type(), "ckan");
assert_eq!(client.base_url(), "https://dati.comune.milano.it/");
}
#[test]
fn test_factory_creates_dcat_client() {
let factory = PortalClientFactoryEnum::new();
let client = factory.create("https://data.public.lu", PortalType::Dcat, "fr", None, None);
assert!(client.is_ok());
let client = client.unwrap();
assert_eq!(client.portal_type(), "dcat");
assert_eq!(client.base_url(), "https://data.public.lu/");
}
#[test]
fn test_factory_creates_sparql_dcat_client() {
let factory = PortalClientFactoryEnum::new();
let client = factory.create(
"https://data.europa.eu",
PortalType::Dcat,
"en",
Some("sparql"),
None,
);
assert!(client.is_ok());
let client = client.unwrap();
assert_eq!(client.portal_type(), "dcat");
assert_eq!(client.base_url(), "https://data.europa.eu/");
assert!(matches!(client, PortalClientEnum::SparqlDcat(_)));
}
#[test]
fn test_factory_creates_sparql_dcat_client_with_custom_endpoint() {
let factory = PortalClientFactoryEnum::new();
let client = factory.create(
"https://data.norge.no",
PortalType::Dcat,
"nb",
Some("sparql"),
Some("https://sparql.fellesdatakatalog.digdir.no"),
);
assert!(client.is_ok());
let client = client.unwrap();
assert_eq!(client.portal_type(), "dcat");
assert_eq!(client.base_url(), "https://data.norge.no/");
assert!(matches!(client, PortalClientEnum::SparqlDcat(_)));
}
#[test]
fn test_factory_dcat_default_profile_is_udata_rest() {
let factory = PortalClientFactoryEnum::new();
let client = factory
.create("https://data.public.lu", PortalType::Dcat, "fr", None, None)
.unwrap();
assert!(matches!(client, PortalClientEnum::Dcat(_)));
}
#[test]
fn test_factory_rejects_unknown_dcat_profile() {
let factory = PortalClientFactoryEnum::new();
let result = factory.create(
"https://data.public.lu",
PortalType::Dcat,
"en",
Some("spqarql"),
None,
);
assert!(result.is_err());
}
#[test]
fn test_factory_rejects_unsupported_type() {
let factory = PortalClientFactoryEnum::new();
let result = factory.create(
"https://data.cityofnewyork.us",
PortalType::Socrata,
"en",
None,
None,
);
assert!(result.is_err());
}
}