1use std::time::Duration;
11
12use ceres_core::HttpConfig;
13use ceres_core::LocalizedField;
14use ceres_core::error::AppError;
15use ceres_core::models::NewDataset;
16use chrono::{DateTime, Utc};
17use futures::stream::BoxStream;
18use reqwest::{Client, StatusCode, Url};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::time::sleep;
22
/// Envelope wrapping every CKAN action API response handled by this client:
/// a `success` flag plus the action-specific `result` payload.
#[derive(Deserialize, Debug)]
struct CkanResponse<T> {
    /// Whether the CKAN action reported success; callers check this after decoding.
    success: bool,
    /// Action-specific payload (e.g. a list of ids, a dataset, or a search page).
    ///
    /// NOTE(review): this field is not optional, so a response body that lacks
    /// `result` fails deserialization instead of reaching the `success` check —
    /// confirm whether the portals' error bodies include `result`.
    result: T,
}
39
/// `result` payload of the `package_search` action: the total number of matches
/// plus the datasets on the requested page.
#[derive(Deserialize, Debug)]
struct PackageSearchResult {
    /// Total number of datasets matching the query across all pages; pagination
    /// compares it against `start + rows_received` to decide when to stop.
    count: usize,
    /// Datasets on the requested page (at most `rows` entries).
    results: Vec<CkanDataset>,
}
46
/// A CKAN dataset ("package") as returned by `package_show` / `package_search`.
///
/// Only the fields this crate interprets are typed; every other top-level key
/// of the JSON object is kept verbatim in [`CkanDataset::extras`].
#[derive(Deserialize, Debug, Clone)]
pub struct CkanDataset {
    /// Portal-assigned dataset identifier.
    pub id: String,
    /// URL-friendly dataset slug, used to build the default landing page URL.
    pub name: String,
    /// Dataset title: either a plain string or a per-language map.
    pub title: LocalizedField,
    /// Optional description, plain or per-language; may be absent.
    pub notes: Option<LocalizedField>,
    /// All remaining top-level JSON fields (organization, tags, a multilingual
    /// `description`, ...), captured via `#[serde(flatten)]`.
    #[serde(flatten)]
    pub extras: serde_json::Map<String, Value>,
}
100
/// HTTP client for a single CKAN portal.
///
/// Cloning is cheap relative to building a new client: the underlying
/// `reqwest::Client` is reused and the base URL is copied.
#[derive(Clone)]
pub struct CkanClient {
    /// HTTP client configured in `CkanClient::new` (user agent + request timeout).
    client: Client,
    /// Portal root URL as parsed from the caller's string; API endpoints are
    /// resolved relative to it.
    base_url: Url,
}
123
124impl CkanClient {
125 const PAGE_DELAY: Duration = Duration::from_secs(1);
127
128 const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);
130
131 const PAGE_RATE_LIMIT_COOLDOWN: Duration = Duration::from_secs(60);
134
135 const PAGE_RATE_LIMIT_RETRIES: u32 = 3;
137
138 const MIN_PAGE_SIZE: usize = 10;
143
144 fn is_page_size_reducible(err: &AppError) -> bool {
150 matches!(err, AppError::Timeout(_) | AppError::NetworkError(_))
151 }
152
153 pub fn new(base_url_str: &str) -> Result<Self, AppError> {
171 let base_url = Url::parse(base_url_str)
172 .map_err(|_| AppError::InvalidPortalUrl(base_url_str.to_string()))?;
173
174 let http_config = HttpConfig::default();
175 let client = Client::builder()
176 .user_agent("Ceres/0.1 (semantic-search-bot)")
178 .timeout(http_config.timeout)
179 .build()
180 .map_err(|e| AppError::ClientError(e.to_string()))?;
181
182 Ok(Self { client, base_url })
183 }
184
185 pub async fn list_package_ids(&self) -> Result<Vec<String>, AppError> {
206 let url = self
207 .base_url
208 .join("api/3/action/package_list")
209 .map_err(|e| AppError::Generic(e.to_string()))?;
210
211 let resp = self.request_with_retry(&url).await?;
212
213 let ckan_resp: CkanResponse<Vec<String>> = resp
214 .json()
215 .await
216 .map_err(|e| AppError::ClientError(e.to_string()))?;
217
218 if !ckan_resp.success {
219 return Err(AppError::Generic(
220 "CKAN API returned success: false".to_string(),
221 ));
222 }
223
224 Ok(ckan_resp.result)
225 }
226
227 pub async fn show_package(&self, id: &str) -> Result<CkanDataset, AppError> {
240 let mut url = self
241 .base_url
242 .join("api/3/action/package_show")
243 .map_err(|e| AppError::Generic(e.to_string()))?;
244
245 url.query_pairs_mut().append_pair("id", id);
246
247 let resp = self.request_with_retry(&url).await?;
248
249 let ckan_resp: CkanResponse<CkanDataset> = resp
250 .json()
251 .await
252 .map_err(|e| AppError::ClientError(e.to_string()))?;
253
254 if !ckan_resp.success {
255 return Err(AppError::Generic(format!(
256 "CKAN failed to show package {}",
257 id
258 )));
259 }
260
261 Ok(ckan_resp.result)
262 }
263
264 pub async fn search_modified_since(
286 &self,
287 since: DateTime<Utc>,
288 ) -> Result<Vec<CkanDataset>, AppError> {
289 let since_str = since.format("%Y-%m-%dT%H:%M:%SZ").to_string();
290 let fq = Some(format!("metadata_modified:[{} TO *]", since_str));
291 self.paginated_search(fq.as_deref()).await
292 }
293
294 pub async fn search_all_datasets(&self) -> Result<Vec<CkanDataset>, AppError> {
300 self.paginated_search(None).await
301 }
302
303 async fn paginated_search(&self, fq: Option<&str>) -> Result<Vec<CkanDataset>, AppError> {
314 let mut page_size: usize = 1000;
315 let mut all_datasets = Vec::new();
316 let mut start: usize = 0;
317 let mut page_delay = Self::PAGE_DELAY;
318
319 loop {
320 match self
321 .fetch_search_page(fq, start, page_size, &mut page_delay)
322 .await
323 {
324 Ok((datasets, total_count)) => {
325 let page_count = datasets.len();
326 all_datasets.extend(datasets);
327
328 if start + page_count >= total_count || page_count < page_size {
330 break;
331 }
332
333 start += page_size;
334 }
335 Err(e) if page_size > Self::MIN_PAGE_SIZE && Self::is_page_size_reducible(&e) => {
336 page_size = (page_size / 4).max(Self::MIN_PAGE_SIZE);
340 tracing::warn!(
341 new_page_size = page_size,
342 offset = start,
343 error = %e,
344 "Page failed, reducing page size and retrying"
345 );
346 continue;
347 }
348 Err(e) if Self::is_page_size_reducible(&e) => {
349 return Err(AppError::Generic(format!(
354 "package_search unreliable even at page_size={page_size}: {e}"
355 )));
356 }
357 Err(e) => return Err(e),
358 }
359
360 sleep(page_delay).await;
362 }
363
364 Ok(all_datasets)
365 }
366
367 fn paginated_search_stream(
373 &self,
374 fq: Option<String>,
375 ) -> BoxStream<'_, Result<Vec<CkanDataset>, AppError>> {
376 struct PaginationState {
377 start: usize,
378 page_size: usize,
379 page_delay: Duration,
380 done: bool,
381 fq: Option<String>,
382 }
383
384 let initial = PaginationState {
385 start: 0,
386 page_size: 1000,
387 page_delay: Self::PAGE_DELAY,
388 done: false,
389 fq,
390 };
391
392 Box::pin(futures::stream::unfold(
393 initial,
394 move |mut state| async move {
395 if state.done {
396 return None;
397 }
398
399 loop {
400 match self
401 .fetch_search_page(
402 state.fq.as_deref(),
403 state.start,
404 state.page_size,
405 &mut state.page_delay,
406 )
407 .await
408 {
409 Ok((datasets, total_count)) => {
410 let page_count = datasets.len();
411
412 if state.start + page_count >= total_count
413 || page_count < state.page_size
414 {
415 state.done = true;
416 } else {
417 state.start += state.page_size;
418 }
419
420 if !state.done {
422 sleep(state.page_delay).await;
423 }
424
425 return Some((Ok(datasets), state));
426 }
427 Err(e)
428 if state.page_size > Self::MIN_PAGE_SIZE
429 && Self::is_page_size_reducible(&e) =>
430 {
431 state.page_size = (state.page_size / 4).max(Self::MIN_PAGE_SIZE);
432 tracing::warn!(
433 new_page_size = state.page_size,
434 offset = state.start,
435 error = %e,
436 "Page failed, reducing page size and retrying"
437 );
438 continue; }
440 Err(e) if Self::is_page_size_reducible(&e) => {
441 state.done = true;
442 return Some((
443 Err(AppError::Generic(format!(
444 "package_search unreliable even at page_size={}: {e}",
445 state.page_size
446 ))),
447 state,
448 ));
449 }
450 Err(e) => {
451 state.done = true;
452 return Some((Err(e), state));
453 }
454 }
455 }
456 },
457 ))
458 }
459
460 pub async fn dataset_count(&self) -> Result<usize, AppError> {
462 let mut page_delay = Self::PAGE_DELAY;
463 let (_, total) = self.fetch_search_page(None, 0, 0, &mut page_delay).await?;
464 Ok(total)
465 }
466
467 async fn fetch_search_page(
472 &self,
473 fq: Option<&str>,
474 start: usize,
475 page_size: usize,
476 page_delay: &mut Duration,
477 ) -> Result<(Vec<CkanDataset>, usize), AppError> {
478 let mut url = self
479 .base_url
480 .join("api/3/action/package_search")
481 .map_err(|e| AppError::Generic(e.to_string()))?;
482
483 {
484 let mut pairs = url.query_pairs_mut();
485 if let Some(filter) = fq {
486 pairs.append_pair("fq", filter);
487 }
488 pairs
489 .append_pair("rows", &page_size.to_string())
490 .append_pair("start", &start.to_string())
491 .append_pair("sort", "metadata_modified asc");
492 }
493
494 let mut page_result = None;
497 for page_attempt in 0..=Self::PAGE_RATE_LIMIT_RETRIES {
498 match self.request_with_retry(&url).await {
499 Ok(resp) => {
500 page_result = Some(Ok(resp));
501 break;
502 }
503 Err(AppError::RateLimitExceeded)
504 if page_attempt < Self::PAGE_RATE_LIMIT_RETRIES =>
505 {
506 let cooldown = Self::PAGE_RATE_LIMIT_COOLDOWN * (page_attempt + 1);
507 sleep(cooldown).await;
508 *page_delay = (*page_delay * 2).min(Duration::from_secs(5));
509 }
510 Err(e) => {
511 page_result = Some(Err(e));
512 break;
513 }
514 }
515 }
516
517 let resp = match page_result {
518 Some(Ok(resp)) => resp,
519 Some(Err(e)) => return Err(e),
520 None => return Err(AppError::RateLimitExceeded),
521 };
522
523 let ckan_resp: CkanResponse<PackageSearchResult> = resp
524 .json()
525 .await
526 .map_err(|e| AppError::ClientError(e.to_string()))?;
527
528 if !ckan_resp.success {
529 return Err(AppError::Generic(
530 "CKAN package_search returned success: false".to_string(),
531 ));
532 }
533
534 Ok((ckan_resp.result.results, ckan_resp.result.count))
535 }
536
537 const RATE_LIMIT_MAX_RETRIES: u32 = 10;
541
542 async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
546 let http_config = HttpConfig::default();
547 let max_retries = http_config.max_retries;
548 let base_delay = http_config.retry_base_delay;
549 let mut last_error = AppError::Generic("No attempts made".to_string());
550 let effective_max = Self::RATE_LIMIT_MAX_RETRIES.max(max_retries);
552
553 for attempt in 1..=effective_max {
554 match self.client.get(url.clone()).send().await {
555 Ok(resp) => {
556 let status = resp.status();
557
558 if status.is_success() {
559 return Ok(resp);
560 }
561
562 if status == StatusCode::TOO_MANY_REQUESTS {
563 last_error = AppError::RateLimitExceeded;
564 if attempt < effective_max {
565 let delay = resp
567 .headers()
568 .get("retry-after")
569 .and_then(|v| v.to_str().ok())
570 .and_then(|v| v.parse::<u64>().ok())
571 .map(Duration::from_secs)
572 .unwrap_or_else(|| {
573 (base_delay * 2_u32.pow(attempt)).min(Self::MAX_RETRY_DELAY)
574 });
575 sleep(delay).await;
576 continue;
577 }
578
579 return Err(AppError::RateLimitExceeded);
580 }
581
582 if status.is_server_error() {
583 last_error = AppError::ClientError(format!(
584 "Server error: HTTP {}",
585 status.as_u16()
586 ));
587 if attempt < max_retries {
588 let delay = base_delay * attempt;
589 sleep(delay).await;
590 continue;
591 }
592 }
593
594 return Err(AppError::ClientError(format!(
595 "HTTP {} from {}",
596 status.as_u16(),
597 url
598 )));
599 }
600 Err(e) => {
601 if e.is_timeout() {
602 last_error = AppError::Timeout(http_config.timeout.as_secs());
603 } else if e.is_connect() {
604 last_error = AppError::NetworkError(format!("Connection failed: {}", e));
605 } else {
606 last_error = AppError::ClientError(e.to_string());
607 }
608
609 if attempt < max_retries && (e.is_timeout() || e.is_connect()) {
610 let delay = base_delay * attempt;
611 sleep(delay).await;
612 continue;
613 }
614 }
615 }
616 }
617
618 Err(last_error)
619 }
620
621 pub fn into_new_dataset(
665 dataset: CkanDataset,
666 portal_url: &str,
667 url_template: Option<&str>,
668 language: &str,
669 ) -> NewDataset {
670 let landing_page = match url_template {
671 Some(template) => template
672 .replace("{id}", &dataset.id)
673 .replace("{name}", &dataset.name),
674 None => format!(
675 "{}/dataset/{}",
676 portal_url.trim_end_matches('/'),
677 dataset.name
678 ),
679 };
680
681 let metadata_json = serde_json::Value::Object(dataset.extras.clone());
682
683 let title = dataset.title.resolve(language);
685 let description = dataset
688 .notes
689 .map(|n| n.resolve(language))
690 .or_else(|| {
691 dataset
692 .extras
693 .get("description")
694 .and_then(|v| serde_json::from_value::<LocalizedField>(v.clone()).ok())
695 .map(|f| f.resolve(language))
696 })
697 .filter(|d| !d.is_empty());
698
699 let content_hash = NewDataset::compute_content_hash_with_language(
701 &title,
702 description.as_deref(),
703 language,
704 );
705
706 NewDataset {
707 original_id: dataset.id,
708 source_portal: portal_url.to_string(),
709 url: landing_page,
710 title,
711 description,
712 embedding: None,
713 metadata: metadata_json,
714 content_hash,
715 }
716 }
717}
718
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a dataset whose title and optional notes are plain strings and
    /// which has no extra fields.
    fn plain_dataset(id: &str, name: &str, title: &str, notes: Option<&str>) -> CkanDataset {
        CkanDataset {
            id: id.to_string(),
            name: name.to_string(),
            title: LocalizedField::Plain(title.to_string()),
            notes: notes.map(|text| LocalizedField::Plain(text.to_string())),
            extras: serde_json::Map::new(),
        }
    }

    /// Deserializes a JSON fixture into a `CkanDataset`, panicking if malformed.
    fn parse_dataset(raw: &str) -> CkanDataset {
        serde_json::from_str(raw).unwrap()
    }

    #[test]
    fn test_new_with_valid_url() {
        let client = CkanClient::new("https://dati.gov.it").unwrap();
        // The parsed URL gains a root path "/".
        assert_eq!(client.base_url.as_str(), "https://dati.gov.it/");
    }

    #[test]
    fn test_new_with_invalid_url() {
        match CkanClient::new("not-a-valid-url") {
            Err(AppError::InvalidPortalUrl(url)) => assert_eq!(url, "not-a-valid-url"),
            _ => panic!("Expected AppError::InvalidPortalUrl"),
        }
    }

    #[test]
    fn test_into_new_dataset_basic() {
        let source = plain_dataset(
            "dataset-123",
            "my-dataset",
            "My Dataset",
            Some("This is a test dataset"),
        );

        let converted = CkanClient::into_new_dataset(source, "https://dati.gov.it", None, "en");

        assert_eq!(converted.original_id, "dataset-123");
        assert_eq!(converted.source_portal, "https://dati.gov.it");
        assert_eq!(converted.url, "https://dati.gov.it/dataset/my-dataset");
        assert_eq!(converted.title, "My Dataset");
        assert!(converted.embedding.is_none());

        let expected_hash = NewDataset::compute_content_hash_with_language(
            "My Dataset",
            Some("This is a test dataset"),
            "en",
        );
        assert_eq!(converted.content_hash, expected_hash);
        assert_eq!(converted.content_hash.len(), 64);
    }

    #[test]
    fn test_into_new_dataset_with_url_template() {
        let source = plain_dataset(
            "52db43b1-4d6a-446c-a3fc-b2e470fe5a45",
            "raccolta-differenziata",
            "Raccolta Differenziata",
            Some("Percentuale raccolta differenziata"),
        );
        let portal = "https://dati.gov.it/opendata/";
        let template = "https://www.dati.gov.it/view-dataset/dataset?id={id}";

        let converted = CkanClient::into_new_dataset(source, portal, Some(template), "en");

        assert_eq!(
            converted.url,
            "https://www.dati.gov.it/view-dataset/dataset?id=52db43b1-4d6a-446c-a3fc-b2e470fe5a45"
        );
        assert_eq!(converted.source_portal, "https://dati.gov.it/opendata/");
    }

    #[test]
    fn test_into_new_dataset_url_template_with_name() {
        let source = plain_dataset("abc-123", "air-quality-data", "Air Quality", None);
        let template = "https://example.com/datasets/{name}/view";

        let converted =
            CkanClient::into_new_dataset(source, "https://example.com", Some(template), "en");

        assert_eq!(
            converted.url,
            "https://example.com/datasets/air-quality-data/view"
        );
    }

    #[test]
    fn test_ckan_response_deserialization() {
        let raw = r#"{
            "success": true,
            "result": ["dataset-1", "dataset-2", "dataset-3"]
        }"#;

        let envelope: CkanResponse<Vec<String>> = serde_json::from_str(raw).unwrap();

        assert!(envelope.success);
        assert_eq!(envelope.result.len(), 3);
    }

    #[test]
    fn test_ckan_dataset_deserialization() {
        let parsed = parse_dataset(
            r#"{
                "id": "test-id",
                "name": "test-name",
                "title": "Test Title",
                "notes": "Test notes",
                "organization": {
                    "name": "test-org"
                }
            }"#,
        );

        assert_eq!(parsed.id, "test-id");
        assert_eq!(parsed.name, "test-name");
        assert_eq!(parsed.title.resolve("en"), "Test Title");
        // Untyped top-level keys land in `extras`.
        assert!(parsed.extras.contains_key("organization"));
    }

    #[test]
    fn test_ckan_dataset_multilingual_deserialization() {
        let parsed = parse_dataset(
            r#"{
                "id": "swiss-123",
                "name": "swiss-dataset",
                "title": {"en": "English Title", "de": "Deutscher Titel", "fr": "Titre Francais"},
                "notes": {"en": "English description", "de": "Deutsche Beschreibung"}
            }"#,
        );

        assert_eq!(parsed.id, "swiss-123");
        assert_eq!(parsed.title.resolve("en"), "English Title");
        assert_eq!(parsed.title.resolve("de"), "Deutscher Titel");
        // A language with no entry falls back to English.
        assert_eq!(parsed.title.resolve("it"), "English Title");

        let notes = parsed.notes.as_ref().unwrap();
        assert_eq!(notes.resolve("de"), "Deutsche Beschreibung");
    }

    #[test]
    fn test_into_new_dataset_multilingual() {
        let parsed = parse_dataset(
            r#"{
                "id": "swiss-dataset",
                "name": "test-multilingual",
                "title": {"en": "English Title", "de": "Deutscher Titel"},
                "notes": {"en": "English description", "de": "Deutsche Beschreibung"}
            }"#,
        );

        let converted =
            CkanClient::into_new_dataset(parsed, "https://ckan.opendata.swiss", None, "de");

        assert_eq!(converted.title, "Deutscher Titel");
        assert_eq!(
            converted.description,
            Some("Deutsche Beschreibung".to_string())
        );
    }

    #[test]
    fn test_into_new_dataset_description_fallback() {
        let parsed = parse_dataset(
            r#"{
                "id": "swiss-no-notes",
                "name": "dataset-without-notes",
                "title": {"en": "English Title", "de": "Deutscher Titel"},
                "description": {"en": "English desc", "de": "Deutsche Beschreibung"}
            }"#,
        );
        assert!(parsed.notes.is_none());

        let converted =
            CkanClient::into_new_dataset(parsed, "https://ckan.opendata.swiss", None, "en");

        assert_eq!(converted.description, Some("English desc".to_string()));
    }

    #[test]
    fn test_into_new_dataset_description_fallback_empty() {
        let parsed = parse_dataset(
            r#"{
                "id": "swiss-empty-desc",
                "name": "dataset-empty-desc",
                "title": "Some Title",
                "description": {"en": "", "de": "", "fr": ""}
            }"#,
        );

        let converted =
            CkanClient::into_new_dataset(parsed, "https://ckan.opendata.swiss", None, "en");

        assert_eq!(converted.description, None);
    }

    #[test]
    fn test_into_new_dataset_notes_takes_priority() {
        let parsed = parse_dataset(
            r#"{
                "id": "both-fields",
                "name": "dataset-both",
                "title": "Title",
                "notes": "Notes description",
                "description": {"en": "Extras description"}
            }"#,
        );

        let converted = CkanClient::into_new_dataset(parsed, "https://example.com", None, "en");

        assert_eq!(converted.description, Some("Notes description".to_string()));
    }

    #[test]
    fn test_is_page_size_reducible_timeout() {
        let timeout = AppError::Timeout(30);
        assert!(CkanClient::is_page_size_reducible(&timeout));
    }

    #[test]
    fn test_is_page_size_reducible_client_error() {
        let decode_failure = AppError::ClientError("error decoding response body".to_string());
        assert!(!CkanClient::is_page_size_reducible(&decode_failure));
    }

    #[test]
    fn test_is_page_size_reducible_network_error() {
        let reset = AppError::NetworkError("connection reset".to_string());
        assert!(CkanClient::is_page_size_reducible(&reset));
    }

    #[test]
    fn test_is_page_size_reducible_non_reducible() {
        let rate_limited = AppError::RateLimitExceeded;
        let generic = AppError::Generic("something".to_string());

        assert!(!CkanClient::is_page_size_reducible(&rate_limited));
        assert!(!CkanClient::is_page_size_reducible(&generic));
    }
}
958
959impl ceres_core::traits::PortalClient for CkanClient {
964 type PortalData = CkanDataset;
965
966 fn portal_type(&self) -> &'static str {
967 "ckan"
968 }
969
970 fn base_url(&self) -> &str {
971 self.base_url.as_str()
972 }
973
974 async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
975 self.list_package_ids().await
976 }
977
978 async fn get_dataset(&self, id: &str) -> Result<Self::PortalData, AppError> {
979 self.show_package(id).await
980 }
981
982 fn into_new_dataset(
983 data: Self::PortalData,
984 portal_url: &str,
985 url_template: Option<&str>,
986 language: &str,
987 ) -> NewDataset {
988 CkanClient::into_new_dataset(data, portal_url, url_template, language)
989 }
990
991 async fn search_modified_since(
992 &self,
993 since: DateTime<Utc>,
994 ) -> Result<Vec<Self::PortalData>, AppError> {
995 self.search_modified_since(since).await
996 }
997
998 async fn search_all_datasets(&self) -> Result<Vec<Self::PortalData>, AppError> {
999 self.search_all_datasets().await
1000 }
1001
1002 fn search_all_datasets_stream(&self) -> BoxStream<'_, Result<Vec<Self::PortalData>, AppError>> {
1003 self.paginated_search_stream(None)
1004 }
1005
1006 async fn dataset_count(&self) -> Result<usize, AppError> {
1007 self.dataset_count().await
1008 }
1009}
1010
/// Stateless factory that builds [`CkanClient`]s for CKAN portals.
///
/// It carries no configuration, so `CkanClientFactory::default()` (or the unit
/// literal `CkanClientFactory`) is all that is needed to obtain one.
#[derive(Debug, Clone, Default)]
pub struct CkanClientFactory;
1016
1017impl ceres_core::traits::PortalClientFactory for CkanClientFactory {
1018 type Client = CkanClient;
1019
1020 fn create(
1021 &self,
1022 portal_url: &str,
1023 portal_type: ceres_core::config::PortalType,
1024 _language: &str,
1025 ) -> Result<Self::Client, AppError> {
1026 match portal_type {
1027 ceres_core::config::PortalType::Ckan => CkanClient::new(portal_url),
1028 other => Err(AppError::ConfigError(format!(
1029 "CkanClientFactory can only create CKAN clients, but portal type {:?} was requested for URL {}",
1030 other, portal_url
1031 ))),
1032 }
1033 }
1034}