1use std::time::Duration;
24
25use ceres_core::HttpConfig;
26use ceres_core::error::AppError;
27use ceres_core::models::NewDataset;
28use chrono::{DateTime, Utc};
29use futures::stream::BoxStream;
30use reqwest::{Client, StatusCode, Url};
31use serde_json::Value;
32use tokio::time::sleep;
33
/// Courtesy delay inserted between successive catalog page fetches.
const PAGE_DELAY: Duration = Duration::from_millis(200);

/// Per-request timeout applied to the shared HTTP client.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(90);

/// Upper bound for any retry backoff delay.
const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);
/// One dataset parsed out of a DCAT JSON-LD catalog page.
#[derive(Debug, Clone)]
pub struct DcatDataset {
    /// The node's `@id` URI (canonical dataset URL).
    pub id_uri: String,
    /// The `identifier` property, or the last path segment of `id_uri` as a fallback.
    pub identifier: String,
    /// Title resolved for the configured language.
    pub title: String,
    /// Description resolved for the configured language, if present and non-empty.
    pub description: Option<String>,
    /// The full original JSON-LD node, kept as metadata.
    pub raw: Value,
}
60
/// Client for harvesting datasets from a DCAT portal's paginated
/// `catalog.jsonld` endpoint.
#[derive(Clone)]
pub struct DcatClient {
    // Shared reqwest client (user-agent + timeout configured in `new`).
    client: Client,
    // Portal root; page URLs are joined onto this.
    base_url: Url,
    // Preferred language tag for resolving JSON-LD text values.
    language: String,
}
68
69impl DcatClient {
70 pub fn new(base_url_str: &str, language: &str) -> Result<Self, AppError> {
82 let base_url = Url::parse(base_url_str)
83 .map_err(|_| AppError::InvalidPortalUrl(base_url_str.to_string()))?;
84
85 let client = Client::builder()
86 .user_agent("Ceres/0.1 (semantic-search-bot)")
87 .timeout(REQUEST_TIMEOUT)
88 .build()
89 .map_err(|e| AppError::ClientError(e.to_string()))?;
90
91 Ok(Self {
92 client,
93 base_url,
94 language: language.to_string(),
95 })
96 }
97
98 pub fn portal_type(&self) -> &'static str {
100 "dcat"
101 }
102
103 pub fn base_url(&self) -> &str {
105 self.base_url.as_str()
106 }
107
108 pub async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
113 let datasets = self.search_all_datasets().await?;
114 Ok(datasets.into_iter().map(|d| d.identifier).collect())
115 }
116
117 pub async fn get_dataset(&self, _id: &str) -> Result<DcatDataset, AppError> {
120 Err(AppError::Generic(
121 "get_dataset is not implemented for DCAT portals. \
122 Use search_all_datasets() instead — it fetches complete metadata \
123 for all datasets in a single paginated catalog request."
124 .to_string(),
125 ))
126 }
127
128 pub async fn search_modified_since(
133 &self,
134 since: DateTime<Utc>,
135 ) -> Result<Vec<DcatDataset>, AppError> {
136 let since_str = since.to_rfc3339();
137 self.paginate_catalog(Some(&since_str)).await
138 }
139
140 pub async fn search_all_datasets(&self) -> Result<Vec<DcatDataset>, AppError> {
142 self.paginate_catalog(None).await
143 }
144
145 pub fn into_new_dataset(
150 data: DcatDataset,
151 portal_url: &str,
152 _url_template: Option<&str>,
153 language: &str,
154 ) -> NewDataset {
155 let content_hash = NewDataset::compute_content_hash_with_language(
156 &data.title,
157 data.description.as_deref(),
158 language,
159 );
160
161 NewDataset {
162 original_id: data.identifier,
163 source_portal: portal_url.to_string(),
164 url: data.id_uri,
165 title: data.title,
166 description: data.description,
167 embedding: None,
168 metadata: data.raw,
169 content_hash,
170 }
171 }
172
173 async fn paginate_catalog(
188 &self,
189 modified_since: Option<&str>,
190 ) -> Result<Vec<DcatDataset>, AppError> {
191 let mut all_datasets = Vec::new();
192 let mut next_url = Some(self.build_first_page_url(modified_since)?);
193 let mut page = 0u32;
194
195 while let Some(url) = next_url {
196 page += 1;
197
198 let graph = match self.fetch_graph(&url).await {
199 Ok(g) => g,
200 Err(e) => {
201 if all_datasets.is_empty() {
204 return Err(e);
205 }
206 tracing::warn!(
207 page,
208 collected = all_datasets.len(),
209 error = %e,
210 "Page fetch failed; returning partial results"
211 );
212 break;
213 }
214 };
215
216 for node in &graph {
218 if is_dataset_node(node)
219 && let Some(dataset) = extract_dataset(node, &self.language)
220 {
221 all_datasets.push(dataset);
222 }
223 }
224
225 next_url =
227 match extract_hydra_next(&graph) {
228 Some(next) => {
229 sleep(PAGE_DELAY).await;
230 Some(Url::parse(&next).map_err(|e| {
231 AppError::Generic(format!("Invalid hydra:next URL: {e}"))
232 })?)
233 }
234 None => None,
235 };
236 }
237
238 Ok(all_datasets)
239 }
240
    /// Streaming variant of the catalog crawl: yields one `Vec<DcatDataset>`
    /// per catalog page instead of accumulating everything in memory.
    ///
    /// Error behaviour: a failure on the very first page is yielded as a
    /// single `Err` item; a failure on any later page (or an unparseable
    /// `hydra:next` URL) is logged and the stream simply ends.
    pub fn paginate_catalog_stream(
        &self,
        modified_since: Option<String>,
    ) -> BoxStream<'_, Result<Vec<DcatDataset>, AppError>> {
        // State threaded through `unfold`: the URL of the next page to fetch
        // (None means the stream is finished) and a 1-based page counter used
        // for logging and first-page detection.
        struct PaginationState {
            next_url: Option<Url>,
            page: u32,
        }

        let first_url = match self.build_first_page_url(modified_since.as_deref()) {
            Ok(url) => url,
            Err(err) => {
                // Could not even build the first URL: emit one Err and end.
                return Box::pin(futures::stream::once(async move { Err(err) }));
            }
        };

        let initial = PaginationState {
            next_url: Some(first_url),
            page: 0,
        };

        Box::pin(futures::stream::unfold(
            initial,
            move |mut state| async move {
                // Taking the URL out ends the stream on the next poll unless
                // a new one is put back below.
                let url = state.next_url.take()?;
                state.page += 1;

                let graph = match self.fetch_graph(&url).await {
                    Ok(g) => g,
                    Err(e) => {
                        if state.page == 1 {
                            // First page failed: surface the error as an item,
                            // with no next URL so the stream then terminates.
                            return Some((
                                Err(e),
                                PaginationState {
                                    next_url: None,
                                    page: state.page,
                                },
                            ));
                        }
                        // Later page failed: log and end the stream; earlier
                        // pages were already yielded to the consumer.
                        tracing::warn!(
                            page = state.page,
                            error = %e,
                            "Page fetch failed; stopping stream"
                        );
                        return None;
                    }
                };
                let mut datasets = Vec::new();
                for node in &graph {
                    if is_dataset_node(node)
                        && let Some(dataset) = extract_dataset(node, &self.language)
                    {
                        datasets.push(dataset);
                    }
                }

                // Queue up the next page (with a courtesy delay) if the page
                // advertises a hydra:next link.
                state.next_url = match extract_hydra_next(&graph) {
                    Some(next) => {
                        sleep(PAGE_DELAY).await;
                        match Url::parse(&next) {
                            Ok(u) => Some(u),
                            Err(e) => {
                                tracing::warn!(error = %e, "Invalid hydra:next URL; stopping stream");
                                None
                            }
                        }
                    }
                    None => None,
                };

                Some((Ok(datasets), state))
            },
        ))
    }
322
323 fn build_first_page_url(&self, modified_since: Option<&str>) -> Result<Url, AppError> {
325 let mut url = self
326 .base_url
327 .join("api/1/site/catalog.jsonld")
328 .map_err(|e| AppError::Generic(e.to_string()))?;
329
330 {
331 let mut pairs = url.query_pairs_mut();
332 pairs
333 .append_pair("page", "1")
334 .append_pair("page_size", "100");
335 if let Some(since) = modified_since {
336 pairs.append_pair("modified_since", since);
337 }
338 }
339
340 Ok(url)
341 }
342
343 async fn fetch_graph(&self, url: &Url) -> Result<Vec<Value>, AppError> {
345 let resp = self.request_with_retry(url).await?;
346
347 let body: Value = resp.json().await.map_err(|e| {
348 AppError::ClientError(format!("Portal returned non-JSON response: {e}"))
349 })?;
350
351 match body.get("@graph") {
352 Some(Value::Array(graph)) => Ok(graph.clone()),
353 Some(_) => Err(AppError::ClientError(
354 "DCAT response @graph is not an array".to_string(),
355 )),
356 None => Err(AppError::ClientError(
357 "DCAT response missing @graph".to_string(),
358 )),
359 }
360 }
361
362 async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
364 let http_config = HttpConfig::default();
365 let max_retries = http_config.max_retries;
366 let base_delay = http_config.retry_base_delay;
367 let mut last_error = AppError::Generic("No attempts made".to_string());
368
369 for attempt in 1..=max_retries {
370 match self.client.get(url.clone()).send().await {
371 Ok(resp) => {
372 let status = resp.status();
373
374 if status.is_success() {
375 return Ok(resp);
376 }
377
378 if status == StatusCode::TOO_MANY_REQUESTS {
379 last_error = AppError::RateLimitExceeded;
380 if attempt < max_retries {
381 let delay = resp
382 .headers()
383 .get("retry-after")
384 .and_then(|v| v.to_str().ok())
385 .and_then(|v| v.parse::<u64>().ok())
386 .map(Duration::from_secs)
387 .unwrap_or_else(|| {
388 (base_delay * 2_u32.pow(attempt)).min(MAX_RETRY_DELAY)
389 });
390 sleep(delay).await;
391 continue;
392 }
393 return Err(AppError::RateLimitExceeded);
394 }
395
396 if status.is_server_error() {
397 last_error = AppError::ClientError(format!(
398 "Server error: HTTP {}",
399 status.as_u16()
400 ));
401 if attempt < max_retries {
402 sleep(base_delay * attempt).await;
403 continue;
404 }
405 }
406
407 return Err(AppError::ClientError(format!(
408 "HTTP {} from {}",
409 status.as_u16(),
410 url
411 )));
412 }
413 Err(e) => {
414 if e.is_timeout() {
415 last_error = AppError::Timeout(REQUEST_TIMEOUT.as_secs());
416 } else if e.is_connect() {
417 last_error = AppError::NetworkError(format!("Connection failed: {e}"));
418 } else {
419 last_error = AppError::ClientError(e.to_string());
420 }
421 if attempt < max_retries && (e.is_timeout() || e.is_connect()) {
422 sleep(base_delay * attempt).await;
423 continue;
424 }
425 }
426 }
427 }
428
429 Err(last_error)
430 }
431}
432
433pub fn resolve_jsonld_text(value: &Value, language: &str) -> String {
448 match value {
449 Value::String(s) => s.clone(),
450 Value::Object(obj) => {
451 obj.get("@value")
453 .and_then(|v| v.as_str())
454 .unwrap_or("")
455 .to_string()
456 }
457 Value::Array(arr) => {
458 let find_lang = |lang: &str| -> Option<&str> {
460 arr.iter().find_map(|item| {
461 let obj = item.as_object()?;
462 let item_lang = obj.get("@language")?.as_str()?;
463 if item_lang.eq_ignore_ascii_case(lang) {
464 obj.get("@value")?.as_str()
465 } else {
466 None
467 }
468 })
469 };
470
471 if let Some(s) = find_lang(language) {
472 return s.to_string();
473 }
474 if language != "en"
475 && let Some(s) = find_lang("en")
476 {
477 return s.to_string();
478 }
479 arr.iter()
481 .find_map(|item| {
482 item.as_object()
483 .and_then(|o| o.get("@value"))
484 .and_then(|v| v.as_str())
485 .filter(|s| !s.is_empty())
486 })
487 .unwrap_or("")
488 .to_string()
489 }
490 _ => String::new(),
491 }
492}
493
494pub fn is_dataset_node(node: &Value) -> bool {
500 let type_value = match node.get("@type") {
501 Some(v) => v,
502 None => return false,
503 };
504
505 let is_dataset_type = |s: &str| {
506 matches!(
507 s,
508 "Dataset" | "dcat:Dataset" | "http://www.w3.org/ns/dcat#Dataset"
509 )
510 };
511
512 match type_value {
513 Value::String(s) => is_dataset_type(s.as_str()),
514 Value::Array(arr) => arr
515 .iter()
516 .any(|v| v.as_str().map(is_dataset_type).unwrap_or(false)),
517 _ => false,
518 }
519}
520
521pub fn extract_hydra_next(graph: &[Value]) -> Option<String> {
528 for node in graph {
529 let Some(obj) = node.as_object() else {
530 continue;
531 };
532
533 let is_pcv = obj.get("@type").map(|t| match t {
535 Value::String(s) => {
536 s == "hydra:PartialCollectionView"
537 || s == "http://www.w3.org/ns/hydra/core#PartialCollectionView"
538 }
539 Value::Array(arr) => arr.iter().any(|v| {
540 v.as_str()
541 .map(|s| {
542 s == "hydra:PartialCollectionView"
543 || s == "http://www.w3.org/ns/hydra/core#PartialCollectionView"
544 })
545 .unwrap_or(false)
546 }),
547 _ => false,
548 });
549
550 if is_pcv != Some(true) {
551 continue;
552 }
553
554 for key in &["next", "hydra:next", "http://www.w3.org/ns/hydra/core#next"] {
556 if let Some(next) = obj.get(*key) {
557 if let Some(s) = next.as_str() {
558 return Some(s.to_string());
559 }
560 if let Some(id) = next.get("@id").and_then(|v| v.as_str()) {
562 return Some(id.to_string());
563 }
564 }
565 }
566 }
567
568 None
569}
570
571pub fn extract_dataset(node: &Value, language: &str) -> Option<DcatDataset> {
578 let id_uri = node.get("@id")?.as_str()?.to_string();
579 if id_uri.is_empty() {
580 return None;
581 }
582
583 let identifier = node
585 .get("identifier")
586 .and_then(|v| match v {
587 Value::String(s) => Some(s.clone()),
588 Value::Object(o) => o.get("@value").and_then(|v| v.as_str()).map(String::from),
589 _ => None,
590 })
591 .filter(|s| !s.is_empty())
592 .unwrap_or_else(|| {
593 id_uri
594 .trim_end_matches('/')
595 .rsplit('/')
596 .next()
597 .unwrap_or(&id_uri)
598 .to_string()
599 });
600
601 let title = node
603 .get("title")
604 .map(|v| resolve_jsonld_text(v, language))
605 .filter(|s| !s.is_empty())?;
606
607 let description = node
609 .get("description")
610 .map(|v| resolve_jsonld_text(v, language))
611 .filter(|s| !s.is_empty());
612
613 Some(DcatDataset {
614 id_uri,
615 identifier,
616 title,
617 description,
618 raw: node.clone(),
619 })
620}
621
// Unit tests for the pure JSON-LD helpers plus client construction, and one
// ignored network smoke test against data.public.lu.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // ---- resolve_jsonld_text ----

    #[test]
    fn resolve_plain_string() {
        let v = json!("Dataset Title");
        assert_eq!(resolve_jsonld_text(&v, "en"), "Dataset Title");
    }

    #[test]
    fn resolve_single_lang_object() {
        let v = json!({"@language": "fr", "@value": "Titre du jeu de données"});
        assert_eq!(resolve_jsonld_text(&v, "fr"), "Titre du jeu de données");
    }

    #[test]
    fn resolve_array_with_lang_match() {
        let v = json!([
            {"@language": "en", "@value": "English Title"},
            {"@language": "fr", "@value": "Titre Français"}
        ]);
        assert_eq!(resolve_jsonld_text(&v, "fr"), "Titre Français");
    }

    #[test]
    fn resolve_array_fallback_to_en() {
        let v = json!([
            {"@language": "en", "@value": "English Title"},
            {"@language": "de", "@value": "Deutscher Titel"}
        ]);
        assert_eq!(resolve_jsonld_text(&v, "fr"), "English Title");
    }

    #[test]
    fn resolve_array_fallback_to_first() {
        let v = json!([
            {"@language": "de", "@value": "Deutscher Titel"},
            {"@language": "nl", "@value": "Nederlandse Titel"}
        ]);
        assert_eq!(resolve_jsonld_text(&v, "fr"), "Deutscher Titel");
    }

    #[test]
    fn resolve_null_returns_empty() {
        assert_eq!(resolve_jsonld_text(&Value::Null, "en"), "");
    }

    // ---- is_dataset_node ----

    #[test]
    fn is_dataset_node_string_form() {
        let node = json!({"@type": "Dataset", "@id": "https://example.org/d/1"});
        assert!(is_dataset_node(&node));
    }

    #[test]
    fn is_dataset_node_array_form() {
        let node = json!({"@type": ["Dataset", "dcat:Dataset"], "@id": "https://example.org/d/1"});
        assert!(is_dataset_node(&node));
    }

    #[test]
    fn is_dataset_node_full_uri() {
        let node =
            json!({"@type": "http://www.w3.org/ns/dcat#Dataset", "@id": "https://example.org/d/1"});
        assert!(is_dataset_node(&node));
    }

    #[test]
    fn is_dataset_node_distribution_false() {
        let node = json!({"@type": "Distribution"});
        assert!(!is_dataset_node(&node));
    }

    #[test]
    fn is_dataset_node_missing_type_false() {
        let node = json!({"@id": "https://example.org/d/1"});
        assert!(!is_dataset_node(&node));
    }

    // ---- extract_hydra_next ----

    #[test]
    fn extract_hydra_next_with_next_key() {
        let graph = vec![
            json!({"@type": "hydra:PartialCollectionView", "next": "https://example.org/catalog.jsonld?page=2&page_size=100"}),
        ];
        assert_eq!(
            extract_hydra_next(&graph),
            Some("https://example.org/catalog.jsonld?page=2&page_size=100".to_string())
        );
    }

    #[test]
    fn extract_hydra_next_with_hydra_prefix() {
        let graph = vec![
            json!({"@type": "hydra:PartialCollectionView", "hydra:next": "https://example.org/catalog.jsonld?page=2"}),
        ];
        assert_eq!(
            extract_hydra_next(&graph),
            Some("https://example.org/catalog.jsonld?page=2".to_string())
        );
    }

    #[test]
    fn extract_hydra_next_absent_returns_none() {
        let graph = vec![
            json!({"@type": "Dataset", "title": "foo"}),
            json!({"@type": ["Catalog", "hydra:Collection"], "totalItems": 100}),
        ];
        assert_eq!(extract_hydra_next(&graph), None);
    }

    #[test]
    fn extract_hydra_next_skips_non_object_nodes() {
        let graph = vec![
            json!("https://example.org/context"),
            json!(42),
            json!({"@type": "hydra:PartialCollectionView", "next": "https://example.org/?page=2"}),
        ];
        assert_eq!(
            extract_hydra_next(&graph),
            Some("https://example.org/?page=2".to_string())
        );
    }

    // ---- extract_dataset ----

    #[test]
    fn extract_dataset_basic() {
        let node = json!({
            "@id": "https://data.public.lu/datasets/abc123/",
            "@type": "Dataset",
            "identifier": "abc123",
            "title": "Test Dataset",
            "description": "A test description"
        });
        let dataset = extract_dataset(&node, "en").unwrap();
        assert_eq!(dataset.id_uri, "https://data.public.lu/datasets/abc123/");
        assert_eq!(dataset.identifier, "abc123");
        assert_eq!(dataset.title, "Test Dataset");
        assert_eq!(dataset.description.as_deref(), Some("A test description"));
    }

    #[test]
    fn extract_dataset_identifier_fallback_to_id_segment() {
        let node = json!({
            "@id": "https://data.public.lu/datasets/my-dataset/",
            "@type": "Dataset",
            "title": "My Dataset"
        });
        let dataset = extract_dataset(&node, "en").unwrap();
        assert_eq!(dataset.identifier, "my-dataset");
    }

    #[test]
    fn extract_dataset_missing_id_returns_none() {
        let node = json!({"@type": "Dataset", "title": "No ID"});
        assert!(extract_dataset(&node, "en").is_none());
    }

    #[test]
    fn extract_dataset_missing_title_returns_none() {
        let node = json!({"@id": "https://example.org/d/1", "@type": "Dataset"});
        assert!(extract_dataset(&node, "en").is_none());
    }

    // ---- into_new_dataset ----

    #[test]
    fn into_new_dataset_mapping() {
        let dataset = DcatDataset {
            id_uri: "https://data.public.lu/datasets/abc123/".to_string(),
            identifier: "abc123".to_string(),
            title: "Test Dataset".to_string(),
            description: Some("A description".to_string()),
            raw: json!({"@id": "https://data.public.lu/datasets/abc123/"}),
        };

        let new_dataset =
            DcatClient::into_new_dataset(dataset, "https://data.public.lu", None, "fr");

        assert_eq!(new_dataset.original_id, "abc123");
        assert_eq!(new_dataset.source_portal, "https://data.public.lu");
        assert_eq!(new_dataset.url, "https://data.public.lu/datasets/abc123/");
        assert_eq!(new_dataset.title, "Test Dataset");
        assert_eq!(new_dataset.description.as_deref(), Some("A description"));
        assert!(new_dataset.embedding.is_none());

        let expected_hash = NewDataset::compute_content_hash_with_language(
            "Test Dataset",
            Some("A description"),
            "fr",
        );
        assert_eq!(new_dataset.content_hash, expected_hash);
    }

    #[test]
    fn into_new_dataset_url_template_ignored() {
        let dataset = DcatDataset {
            id_uri: "https://data.public.lu/datasets/abc123/".to_string(),
            identifier: "abc123".to_string(),
            title: "Test".to_string(),
            description: None,
            raw: json!({}),
        };

        let new_dataset = DcatClient::into_new_dataset(
            dataset,
            "https://data.public.lu",
            Some("https://example.org/{id}"),
            "en",
        );

        // The template is ignored: the DCAT @id URI wins.
        assert_eq!(new_dataset.url, "https://data.public.lu/datasets/abc123/");
    }

    // ---- DcatClient construction ----

    #[test]
    fn dcat_client_new_valid() {
        assert!(DcatClient::new("https://data.public.lu", "fr").is_ok());
    }

    #[test]
    fn dcat_client_new_invalid_url() {
        let result = DcatClient::new("not-a-url", "en");
        assert!(matches!(result, Err(AppError::InvalidPortalUrl(_))));
    }

    // Network smoke test; run with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires network access to data.public.lu"]
    async fn test_dcat_smoke_luxembourg() {
        let client = DcatClient::new("https://data.public.lu", "fr").unwrap();
        let datasets = client.search_all_datasets().await.unwrap();
        assert!(
            datasets.len() > 100,
            "Expected >100 datasets, got {}",
            datasets.len()
        );
        let first = &datasets[0];
        assert!(!first.title.is_empty(), "First dataset title is empty");
        assert!(!first.id_uri.is_empty(), "First dataset id_uri is empty");
        assert!(
            !first.identifier.is_empty(),
            "First dataset identifier is empty"
        );
    }
}