1use std::time::Duration;
24
25use ceres_core::HttpConfig;
26use ceres_core::error::AppError;
27use ceres_core::models::NewDataset;
28use chrono::{DateTime, Utc};
29use futures::stream::BoxStream;
30use reqwest::{Client, StatusCode, Url};
31use serde_json::Value;
32use tokio::time::sleep;
33
/// Pause observed before requesting the next catalog page, to stay polite
/// towards the portal.
const PAGE_DELAY: Duration = Duration::from_millis(200);

/// Per-request timeout configured on the HTTP client; also the value reported
/// in timeout errors.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(90);

/// Ceiling for the exponentially growing delay used when retrying after an
/// HTTP 429 (Too Many Requests) response.
const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);
45
46#[derive(Debug, Clone)]
48pub struct DcatDataset {
49 pub id_uri: String,
51 pub identifier: String,
53 pub title: String,
55 pub description: Option<String>,
57 pub raw: Value,
59}
60
61#[derive(Clone)]
63pub struct DcatClient {
64 client: Client,
65 base_url: Url,
66 language: String,
67}
68
69impl DcatClient {
70 pub fn new(base_url_str: &str, language: &str) -> Result<Self, AppError> {
82 let base_url = Url::parse(base_url_str)
83 .map_err(|_| AppError::InvalidPortalUrl(base_url_str.to_string()))?;
84
85 let client = Client::builder()
86 .user_agent("Ceres/0.1 (semantic-search-bot)")
87 .timeout(REQUEST_TIMEOUT)
88 .build()
89 .map_err(|e| AppError::ClientError(e.to_string()))?;
90
91 Ok(Self {
92 client,
93 base_url,
94 language: language.to_string(),
95 })
96 }
97
98 pub fn portal_type(&self) -> &'static str {
100 "dcat"
101 }
102
103 pub fn base_url(&self) -> &str {
105 self.base_url.as_str()
106 }
107
108 pub async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
113 let datasets = self.search_all_datasets().await?;
114 Ok(datasets.into_iter().map(|d| d.identifier).collect())
115 }
116
117 pub async fn get_dataset(&self, _id: &str) -> Result<DcatDataset, AppError> {
120 Err(AppError::Generic(
121 "get_dataset is not implemented for DCAT portals. \
122 Use search_all_datasets() instead — it fetches complete metadata \
123 for all datasets in a single paginated catalog request."
124 .to_string(),
125 ))
126 }
127
128 pub async fn search_modified_since(
133 &self,
134 since: DateTime<Utc>,
135 ) -> Result<Vec<DcatDataset>, AppError> {
136 let since_str = since.to_rfc3339();
137 self.paginate_catalog(Some(&since_str)).await
138 }
139
140 pub async fn search_all_datasets(&self) -> Result<Vec<DcatDataset>, AppError> {
142 self.paginate_catalog(None).await
143 }
144
145 pub fn into_new_dataset(
150 data: DcatDataset,
151 portal_url: &str,
152 _url_template: Option<&str>,
153 language: &str,
154 ) -> NewDataset {
155 let content_hash = NewDataset::compute_content_hash_with_language(
156 &data.title,
157 data.description.as_deref(),
158 language,
159 );
160
161 NewDataset {
162 original_id: data.identifier,
163 source_portal: portal_url.to_string(),
164 url: data.id_uri,
165 title: data.title,
166 description: data.description,
167 embedding: None,
168 metadata: data.raw,
169 content_hash,
170 }
171 }
172
173 async fn paginate_catalog(
188 &self,
189 modified_since: Option<&str>,
190 ) -> Result<Vec<DcatDataset>, AppError> {
191 let mut all_datasets = Vec::new();
192 let mut next_url = Some(self.build_first_page_url(modified_since)?);
193 let mut page = 0u32;
194
195 while let Some(url) = next_url {
196 page += 1;
197
198 let graph = match self.fetch_graph(&url).await {
199 Ok(g) => g,
200 Err(e) => {
201 if all_datasets.is_empty() {
204 return Err(e);
205 }
206 tracing::warn!(
207 page,
208 collected = all_datasets.len(),
209 error = %e,
210 "Page fetch failed; returning partial results"
211 );
212 break;
213 }
214 };
215
216 for node in &graph {
218 if is_dataset_node(node)
219 && let Some(dataset) = extract_dataset(node, &self.language)
220 {
221 all_datasets.push(dataset);
222 }
223 }
224
225 next_url =
227 match extract_hydra_next(&graph) {
228 Some(next) => {
229 sleep(PAGE_DELAY).await;
230 Some(Url::parse(&next).map_err(|e| {
231 AppError::Generic(format!("Invalid hydra:next URL: {e}"))
232 })?)
233 }
234 None => None,
235 };
236 }
237
238 Ok(all_datasets)
239 }
240
241 pub fn paginate_catalog_stream(
246 &self,
247 modified_since: Option<String>,
248 ) -> BoxStream<'_, Result<Vec<DcatDataset>, AppError>> {
249 struct PaginationState {
250 next_url: Option<Url>,
251 page: u32,
252 }
253
254 let first_url = match self.build_first_page_url(modified_since.as_deref()) {
255 Ok(url) => url,
256 Err(err) => {
257 return Box::pin(futures::stream::once(async move { Err(err) }));
258 }
259 };
260
261 let initial = PaginationState {
262 next_url: Some(first_url),
263 page: 0,
264 };
265
266 Box::pin(futures::stream::unfold(
267 initial,
268 move |mut state| async move {
269 let url = state.next_url.take()?;
270 state.page += 1;
271
272 let graph = match self.fetch_graph(&url).await {
273 Ok(g) => g,
274 Err(e) => {
275 if state.page == 1 {
276 return Some((
278 Err(e),
279 PaginationState {
280 next_url: None,
281 page: state.page,
282 },
283 ));
284 }
285 tracing::warn!(
286 page = state.page,
287 error = %e,
288 "Page fetch failed; stopping stream"
289 );
290 return None;
291 }
292 };
293 let mut datasets = Vec::new();
295 for node in &graph {
296 if is_dataset_node(node)
297 && let Some(dataset) = extract_dataset(node, &self.language)
298 {
299 datasets.push(dataset);
300 }
301 }
302
303 state.next_url = match extract_hydra_next(&graph) {
305 Some(next) => {
306 sleep(PAGE_DELAY).await;
307 match Url::parse(&next) {
308 Ok(u) => Some(u),
309 Err(e) => {
310 tracing::warn!(error = %e, "Invalid hydra:next URL; stopping stream");
311 None
312 }
313 }
314 }
315 None => None,
316 };
317
318 Some((Ok(datasets), state))
319 },
320 ))
321 }
322
323 fn build_first_page_url(&self, modified_since: Option<&str>) -> Result<Url, AppError> {
325 let mut url = self
326 .base_url
327 .join("api/1/site/catalog.jsonld")
328 .map_err(|e| AppError::Generic(e.to_string()))?;
329
330 {
331 let mut pairs = url.query_pairs_mut();
332 pairs
333 .append_pair("page", "1")
334 .append_pair("page_size", "100");
335 if let Some(since) = modified_since {
336 pairs.append_pair("modified_since", since);
337 }
338 }
339
340 Ok(url)
341 }
342
343 async fn fetch_graph(&self, url: &Url) -> Result<Vec<Value>, AppError> {
345 let resp = self.request_with_retry(url).await?;
346
347 let body: Value = resp.json().await.map_err(|e| {
348 AppError::ClientError(format!("Portal returned non-JSON response: {e}"))
349 })?;
350
351 match body.get("@graph") {
352 Some(Value::Array(graph)) => Ok(graph.clone()),
353 Some(_) => Err(AppError::ClientError(
354 "DCAT response @graph is not an array".to_string(),
355 )),
356 None => Err(AppError::ClientError(
357 "DCAT response missing @graph".to_string(),
358 )),
359 }
360 }
361
362 async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
364 let http_config = HttpConfig::default();
365 let max_retries = http_config.max_retries;
366 let base_delay = http_config.retry_base_delay;
367 let mut last_error = AppError::Generic("No attempts made".to_string());
368
369 for attempt in 1..=max_retries {
370 match self.client.get(url.clone()).send().await {
371 Ok(resp) => {
372 let status = resp.status();
373
374 if status.is_success() {
375 return Ok(resp);
376 }
377
378 if status == StatusCode::TOO_MANY_REQUESTS {
379 last_error = AppError::RateLimitExceeded;
380 if attempt < max_retries {
381 let delay = resp
382 .headers()
383 .get("retry-after")
384 .and_then(|v| v.to_str().ok())
385 .and_then(|v| v.parse::<u64>().ok())
386 .map(Duration::from_secs)
387 .unwrap_or_else(|| {
388 (base_delay * 2_u32.pow(attempt)).min(MAX_RETRY_DELAY)
389 });
390 sleep(delay).await;
391 continue;
392 }
393 return Err(AppError::RateLimitExceeded);
394 }
395
396 if status.is_server_error() {
397 last_error = AppError::ClientError(format!(
398 "Server error: HTTP {}",
399 status.as_u16()
400 ));
401 if attempt < max_retries {
402 sleep(base_delay * attempt).await;
403 continue;
404 }
405 }
406
407 return Err(AppError::ClientError(format!(
408 "HTTP {} from {}",
409 status.as_u16(),
410 url
411 )));
412 }
413 Err(e) => {
414 if e.is_timeout() {
415 last_error = AppError::Timeout(REQUEST_TIMEOUT.as_secs());
416 } else if e.is_connect() {
417 last_error = AppError::NetworkError(format!("Connection failed: {e}"));
418 } else {
419 last_error = AppError::ClientError(e.to_string());
420 }
421 if attempt < max_retries && (e.is_timeout() || e.is_connect()) {
422 sleep(base_delay * attempt).await;
423 continue;
424 }
425 }
426 }
427 }
428
429 Err(last_error)
430 }
431}
432
433pub fn resolve_jsonld_text(value: &Value, language: &str) -> String {
448 match value {
449 Value::String(s) => s.clone(),
450 Value::Object(obj) => {
451 obj.get("@value")
453 .and_then(|v| v.as_str())
454 .unwrap_or("")
455 .to_string()
456 }
457 Value::Array(arr) => {
458 let find_lang = |lang: &str, allow_base_match: bool| -> Option<&str> {
460 arr.iter().find_map(|item| {
461 let obj = item.as_object()?;
462 let item_lang = obj.get("@language")?.as_str()?;
463 let matches = item_lang.eq_ignore_ascii_case(lang)
464 || (allow_base_match
465 && item_lang
466 .split('-')
467 .next()
468 .map(|base| base.eq_ignore_ascii_case(lang))
469 .unwrap_or(false));
470 if matches {
471 obj.get("@value")?.as_str()
472 } else {
473 None
474 }
475 })
476 };
477
478 if let Some(s) = find_lang(language, false).or_else(|| find_lang(language, true)) {
479 return s.to_string();
480 }
481 if language != "en"
482 && let Some(s) = find_lang("en", false).or_else(|| find_lang("en", true))
483 {
484 return s.to_string();
485 }
486 arr.iter()
488 .find_map(|item| {
489 item.as_object()
490 .and_then(|o| o.get("@value"))
491 .and_then(|v| v.as_str())
492 .filter(|s| !s.is_empty())
493 })
494 .unwrap_or("")
495 .to_string()
496 }
497 _ => String::new(),
498 }
499}
500
501pub fn is_dataset_node(node: &Value) -> bool {
507 let type_value = match node.get("@type") {
508 Some(v) => v,
509 None => return false,
510 };
511
512 let is_dataset_type = |s: &str| {
513 matches!(
514 s,
515 "Dataset" | "dcat:Dataset" | "http://www.w3.org/ns/dcat#Dataset"
516 )
517 };
518
519 match type_value {
520 Value::String(s) => is_dataset_type(s.as_str()),
521 Value::Array(arr) => arr
522 .iter()
523 .any(|v| v.as_str().map(is_dataset_type).unwrap_or(false)),
524 _ => false,
525 }
526}
527
528pub fn extract_hydra_next(graph: &[Value]) -> Option<String> {
535 for node in graph {
536 let Some(obj) = node.as_object() else {
537 continue;
538 };
539
540 let is_pcv = obj.get("@type").map(|t| match t {
542 Value::String(s) => {
543 s == "hydra:PartialCollectionView"
544 || s == "http://www.w3.org/ns/hydra/core#PartialCollectionView"
545 }
546 Value::Array(arr) => arr.iter().any(|v| {
547 v.as_str()
548 .map(|s| {
549 s == "hydra:PartialCollectionView"
550 || s == "http://www.w3.org/ns/hydra/core#PartialCollectionView"
551 })
552 .unwrap_or(false)
553 }),
554 _ => false,
555 });
556
557 if is_pcv != Some(true) {
558 continue;
559 }
560
561 for key in &["next", "hydra:next", "http://www.w3.org/ns/hydra/core#next"] {
563 if let Some(next) = obj.get(*key) {
564 if let Some(s) = next.as_str() {
565 return Some(s.to_string());
566 }
567 if let Some(id) = next.get("@id").and_then(|v| v.as_str()) {
569 return Some(id.to_string());
570 }
571 }
572 }
573 }
574
575 None
576}
577
578fn get_jsonld_property<'a>(node: &'a Value, keys: &[&str]) -> Option<&'a Value> {
579 keys.iter().find_map(|key| node.get(*key))
580}
581
582pub fn extract_dataset(node: &Value, language: &str) -> Option<DcatDataset> {
589 let id_uri = node.get("@id")?.as_str()?.to_string();
590 if id_uri.is_empty() {
591 return None;
592 }
593
594 let identifier =
596 get_jsonld_property(node, &["identifier", "dct:identifier", "dcat:identifier"])
597 .and_then(|v| match v {
598 Value::String(s) => Some(s.clone()),
599 Value::Object(o) => o.get("@value").and_then(|v| v.as_str()).map(String::from),
600 _ => None,
601 })
602 .filter(|s| !s.is_empty())
603 .unwrap_or_else(|| {
604 id_uri
605 .trim_end_matches('/')
606 .rsplit('/')
607 .next()
608 .unwrap_or(&id_uri)
609 .to_string()
610 });
611
612 let title = get_jsonld_property(node, &["title", "dct:title"])
614 .map(|v| resolve_jsonld_text(v, language))
615 .filter(|s| !s.is_empty())?;
616
617 let description = get_jsonld_property(node, &["description", "dct:description"])
619 .map(|v| resolve_jsonld_text(v, language))
620 .filter(|s| !s.is_empty());
621
622 Some(DcatDataset {
623 id_uri,
624 identifier,
625 title,
626 description,
627 raw: node.clone(),
628 })
629}
630
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// URI shared by the `abc123` dataset fixtures.
    const ABC123_URI: &str = "https://data.public.lu/datasets/abc123/";

    /// Builds a `DcatDataset` fixture for the `abc123` dataset.
    fn abc123_dataset(title: &str, description: Option<&str>, raw: Value) -> DcatDataset {
        DcatDataset {
            id_uri: ABC123_URI.to_string(),
            identifier: "abc123".to_string(),
            title: title.to_string(),
            description: description.map(str::to_string),
            raw,
        }
    }

    /// Builds a one-node graph holding a `hydra:PartialCollectionView` whose
    /// next link is stored under `key`.
    fn view_graph(key: &str, target: &str) -> Vec<Value> {
        let mut node = json!({"@type": "hydra:PartialCollectionView"});
        node[key] = json!(target);
        vec![node]
    }

    // ---- resolve_jsonld_text ----

    #[test]
    fn resolve_plain_string() {
        let resolved = resolve_jsonld_text(&json!("Dataset Title"), "en");
        assert_eq!(resolved, "Dataset Title");
    }

    #[test]
    fn resolve_single_lang_object() {
        let literal = json!({"@language": "fr", "@value": "Titre du jeu de données"});
        assert_eq!(
            resolve_jsonld_text(&literal, "fr"),
            "Titre du jeu de données"
        );
    }

    #[test]
    fn resolve_array_with_lang_match() {
        let literals = json!([
            {"@language": "en", "@value": "English Title"},
            {"@language": "fr", "@value": "Titre Français"}
        ]);
        assert_eq!(resolve_jsonld_text(&literals, "fr"), "Titre Français");
    }

    #[test]
    fn resolve_array_fallback_to_en() {
        let literals = json!([
            {"@language": "en", "@value": "English Title"},
            {"@language": "de", "@value": "Deutscher Titel"}
        ]);
        assert_eq!(resolve_jsonld_text(&literals, "fr"), "English Title");
    }

    #[test]
    fn resolve_array_matches_translated_language_tags() {
        let literals = json!([
            {"@language": "de", "@value": "Deutscher Titel"},
            {"@language": "en-t-de-t0-mtec", "@value": "English machine title"}
        ]);
        assert_eq!(
            resolve_jsonld_text(&literals, "en"),
            "English machine title"
        );
    }

    #[test]
    fn resolve_array_prefers_exact_language_over_translated_tag() {
        let literals = json!([
            {"@language": "en-t-de-t0-mtec", "@value": "English machine title"},
            {"@language": "en", "@value": "English Title"}
        ]);
        assert_eq!(resolve_jsonld_text(&literals, "en"), "English Title");
    }

    #[test]
    fn resolve_array_fallback_to_first() {
        let literals = json!([
            {"@language": "de", "@value": "Deutscher Titel"},
            {"@language": "nl", "@value": "Nederlandse Titel"}
        ]);
        assert_eq!(resolve_jsonld_text(&literals, "fr"), "Deutscher Titel");
    }

    #[test]
    fn resolve_null_returns_empty() {
        assert_eq!(resolve_jsonld_text(&Value::Null, "en"), "");
    }

    // ---- is_dataset_node ----

    #[test]
    fn is_dataset_node_string_form() {
        assert!(is_dataset_node(&json!({
            "@type": "Dataset",
            "@id": "https://example.org/d/1"
        })));
    }

    #[test]
    fn is_dataset_node_array_form() {
        assert!(is_dataset_node(&json!({
            "@type": ["Dataset", "dcat:Dataset"],
            "@id": "https://example.org/d/1"
        })));
    }

    #[test]
    fn is_dataset_node_full_uri() {
        assert!(is_dataset_node(&json!({
            "@type": "http://www.w3.org/ns/dcat#Dataset",
            "@id": "https://example.org/d/1"
        })));
    }

    #[test]
    fn is_dataset_node_distribution_false() {
        assert!(!is_dataset_node(&json!({"@type": "Distribution"})));
    }

    #[test]
    fn is_dataset_node_missing_type_false() {
        assert!(!is_dataset_node(&json!({"@id": "https://example.org/d/1"})));
    }

    // ---- extract_hydra_next ----

    #[test]
    fn extract_hydra_next_with_next_key() {
        let target = "https://example.org/catalog.jsonld?page=2&page_size=100";
        let graph = view_graph("next", target);
        assert_eq!(extract_hydra_next(&graph), Some(target.to_string()));
    }

    #[test]
    fn extract_hydra_next_with_hydra_prefix() {
        let target = "https://example.org/catalog.jsonld?page=2";
        let graph = view_graph("hydra:next", target);
        assert_eq!(extract_hydra_next(&graph), Some(target.to_string()));
    }

    #[test]
    fn extract_hydra_next_absent_returns_none() {
        let graph = vec![
            json!({"@type": "Dataset", "title": "foo"}),
            json!({"@type": ["Catalog", "hydra:Collection"], "totalItems": 100}),
        ];
        assert_eq!(extract_hydra_next(&graph), None);
    }

    #[test]
    fn extract_hydra_next_skips_non_object_nodes() {
        let target = "https://example.org/?page=2";
        let mut graph = vec![json!("https://example.org/context"), json!(42)];
        graph.extend(view_graph("next", target));
        assert_eq!(extract_hydra_next(&graph), Some(target.to_string()));
    }

    // ---- extract_dataset ----

    #[test]
    fn extract_dataset_basic() {
        let node = json!({
            "@id": "https://data.public.lu/datasets/abc123/",
            "@type": "Dataset",
            "identifier": "abc123",
            "title": "Test Dataset",
            "description": "A test description"
        });
        let dataset = extract_dataset(&node, "en").expect("node is a complete dataset");
        assert_eq!(dataset.id_uri, "https://data.public.lu/datasets/abc123/");
        assert_eq!(dataset.identifier, "abc123");
        assert_eq!(dataset.title, "Test Dataset");
        assert_eq!(dataset.description.as_deref(), Some("A test description"));
    }

    #[test]
    fn extract_dataset_supports_prefixed_dcat_ap_keys() {
        let node = json!({
            "@id": "http://data.europa.eu/88u/dataset/example",
            "@type": "dcat:Dataset",
            "dct:identifier": {"@value": "example-id"},
            "dct:title": [
                {"@language": "de", "@value": "Deutscher Titel"},
                {"@language": "en-t-de-t0-mtec", "@value": "English Title"}
            ],
            "dct:description": {"@language": "en", "@value": "Description"}
        });

        let dataset = extract_dataset(&node, "en").expect("node is a complete dataset");
        assert_eq!(dataset.identifier, "example-id");
        assert_eq!(dataset.title, "English Title");
        assert_eq!(dataset.description.as_deref(), Some("Description"));
    }

    #[test]
    fn extract_dataset_identifier_fallback_to_id_segment() {
        let node = json!({
            "@id": "https://data.public.lu/datasets/my-dataset/",
            "@type": "Dataset",
            "title": "My Dataset"
        });
        let dataset = extract_dataset(&node, "en").expect("node is a complete dataset");
        assert_eq!(dataset.identifier, "my-dataset");
    }

    #[test]
    fn extract_dataset_missing_id_returns_none() {
        let node = json!({"@type": "Dataset", "title": "No ID"});
        assert!(extract_dataset(&node, "en").is_none());
    }

    #[test]
    fn extract_dataset_missing_title_returns_none() {
        let node = json!({"@id": "https://example.org/d/1", "@type": "Dataset"});
        assert!(extract_dataset(&node, "en").is_none());
    }

    // ---- into_new_dataset ----

    #[test]
    fn into_new_dataset_mapping() {
        let dataset = abc123_dataset(
            "Test Dataset",
            Some("A description"),
            json!({"@id": "https://data.public.lu/datasets/abc123/"}),
        );

        let new_dataset =
            DcatClient::into_new_dataset(dataset, "https://data.public.lu", None, "fr");

        assert_eq!(new_dataset.original_id, "abc123");
        assert_eq!(new_dataset.source_portal, "https://data.public.lu");
        assert_eq!(new_dataset.url, "https://data.public.lu/datasets/abc123/");
        assert_eq!(new_dataset.title, "Test Dataset");
        assert_eq!(new_dataset.description.as_deref(), Some("A description"));
        assert!(new_dataset.embedding.is_none());

        let expected_hash = NewDataset::compute_content_hash_with_language(
            "Test Dataset",
            Some("A description"),
            "fr",
        );
        assert_eq!(new_dataset.content_hash, expected_hash);
    }

    #[test]
    fn into_new_dataset_url_template_ignored() {
        let dataset = abc123_dataset("Test", None, json!({}));

        let new_dataset = DcatClient::into_new_dataset(
            dataset,
            "https://data.public.lu",
            Some("https://example.org/{id}"),
            "en",
        );

        // The template must not override the URL taken from the dataset URI.
        assert_eq!(new_dataset.url, "https://data.public.lu/datasets/abc123/");
    }

    // ---- DcatClient::new ----

    #[test]
    fn dcat_client_new_valid() {
        let client = DcatClient::new("https://data.public.lu", "fr");
        assert!(client.is_ok());
    }

    #[test]
    fn dcat_client_new_invalid_url() {
        let outcome = DcatClient::new("not-a-url", "en");
        assert!(matches!(outcome, Err(AppError::InvalidPortalUrl(_))));
    }

    // ---- live smoke test ----

    #[tokio::test]
    #[ignore = "requires network access to data.public.lu"]
    async fn test_dcat_smoke_luxembourg() {
        let client = DcatClient::new("https://data.public.lu", "fr").unwrap();
        let datasets = client.search_all_datasets().await.unwrap();
        let total = datasets.len();
        assert!(total > 100, "Expected >100 datasets, got {}", total);

        let first = &datasets[0];
        assert!(!first.title.is_empty(), "First dataset title is empty");
        assert!(!first.id_uri.is_empty(), "First dataset id_uri is empty");
        assert!(
            !first.identifier.is_empty(),
            "First dataset identifier is empty"
        );
    }
}