Skip to main content

ceres_client/
ckan.rs

1//! CKAN client for harvesting datasets from CKAN-compatible open data portals.
2//!
3//! # Future Extensions
4//!
5//! The `PortalClient` trait (defined in `ceres_core::traits`) was introduced in PR #90,
6//! abstracting over portal types. Remaining portal implementations tracked in #61:
7//! - Socrata API (used by many US cities): <https://dev.socrata.com/>
8//! - DCAT-AP harvester for EU portals: <https://joinup.ec.europa.eu/collection/semantic-interoperability-community-semic/solution/dcat-application-profile-data-portals-europe>
9
10use std::time::Duration;
11
12use ceres_core::HttpConfig;
13use ceres_core::LocalizedField;
14use ceres_core::error::AppError;
15use ceres_core::models::NewDataset;
16use chrono::{DateTime, Utc};
17use futures::stream::BoxStream;
18use reqwest::{Client, StatusCode, Url};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::time::sleep;
22
/// Generic wrapper for CKAN API responses.
///
/// CKAN API reference: <https://docs.ckan.org/en/2.9/api/>
///
/// Successful CKAN action responses have the structure:
/// ```json
/// {
///     "success": bool,
///     "result": T
/// }
/// ```
///
/// Both fields are required by this type, so a payload without `result`
/// fails deserialization before `success` can be inspected.
// NOTE(review): CKAN error payloads presumably carry an `error` object instead
// of `result`; confirm whether those should be modelled explicitly.
#[derive(Deserialize, Debug)]
struct CkanResponse<T> {
    /// Flag reported by CKAN; callers in this file treat `false` as an error.
    success: bool,
    /// Action-specific payload (e.g. a list of IDs or a dataset object).
    result: T,
}
39
/// Response structure for CKAN package_search API.
///
/// This is the `result` payload of a `package_search` call, i.e. it is always
/// deserialized as `CkanResponse<PackageSearchResult>`.
#[derive(Deserialize, Debug)]
struct PackageSearchResult {
    /// Total number of datasets matching the query, as reported by CKAN.
    /// Used by the pagination code to decide when all pages were fetched.
    count: usize,
    /// Datasets contained in the current page.
    results: Vec<CkanDataset>,
}
46
/// Data Transfer Object for CKAN dataset details.
///
/// This structure represents the core fields returned by the CKAN `package_show` API;
/// the same shape is used for each entry of a `package_search` result page.
/// Additional fields returned by CKAN are captured in the `extras` map.
///
/// The `title` and `notes` fields use [`LocalizedField`] to support both plain
/// strings and multilingual objects (e.g., `{"en": "...", "de": "..."}`).
///
/// # Examples
///
/// ```
/// use ceres_client::ckan::CkanDataset;
///
/// // Plain string fields (most portals)
/// let json = r#"{
///     "id": "dataset-123",
///     "name": "my-dataset",
///     "title": "My Dataset",
///     "notes": "Description of the dataset",
///     "organization": {"name": "test-org"}
/// }"#;
///
/// let dataset: CkanDataset = serde_json::from_str(json).unwrap();
/// assert_eq!(dataset.id, "dataset-123");
/// assert_eq!(dataset.title.resolve("en"), "My Dataset");
/// assert!(dataset.extras.contains_key("organization"));
///
/// // Multilingual fields (e.g., Swiss portals)
/// let json = r#"{
///     "id": "dataset-456",
///     "name": "swiss-dataset",
///     "title": {"en": "English Title", "de": "Deutscher Titel"},
///     "notes": {"en": "English description", "de": "Deutsche Beschreibung"}
/// }"#;
///
/// let dataset: CkanDataset = serde_json::from_str(json).unwrap();
/// assert_eq!(dataset.title.resolve("de"), "Deutscher Titel");
/// assert_eq!(dataset.notes.as_ref().unwrap().resolve("en"), "English description");
/// ```
#[derive(Deserialize, Debug, Clone)]
pub struct CkanDataset {
    /// Unique identifier for the dataset
    pub id: String,
    /// URL-friendly name/slug of the dataset
    pub name: String,
    /// Human-readable title of the dataset (plain string or multilingual object)
    pub title: LocalizedField,
    /// Optional description/notes about the dataset (plain string or multilingual object)
    pub notes: Option<LocalizedField>,
    /// All other fields returned by CKAN (e.g., organization, tags, resources).
    ///
    /// `#[serde(flatten)]` collects every JSON key that is not one of the
    /// named fields above into this map.
    #[serde(flatten)]
    pub extras: serde_json::Map<String, Value>,
}
100
/// HTTP client for interacting with CKAN open data portals.
///
/// CKAN (Comprehensive Knowledge Archive Network) is an open-source data management
/// system used by many government open data portals worldwide.
///
/// All API endpoints are resolved relative to `base_url` (e.g.
/// `api/3/action/package_list`).
///
/// # Examples
///
/// ```no_run
/// use ceres_client::CkanClient;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let client = CkanClient::new("https://dati.gov.it")?;
/// let dataset_ids = client.list_package_ids().await?;
/// println!("Found {} datasets", dataset_ids.len());
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct CkanClient {
    /// Underlying `reqwest` client, configured in `new` with a User-Agent and timeout.
    client: Client,
    /// Parsed portal URL that every API path is joined onto.
    base_url: Url,
}
123
124impl CkanClient {
    /// Initial delay between paginated API requests to avoid rate limiting.
    ///
    /// The pagination loops start from this value; `fetch_search_page` doubles
    /// the caller's delay (capped at 5 seconds) whenever a page is rate-limited.
    const PAGE_DELAY: Duration = Duration::from_secs(1);

    /// Maximum backoff delay for rate-limited retries within `request_with_retry`.
    ///
    /// Only the computed exponential backoff is capped by this value; a delay
    /// taken from a `Retry-After` header is used as-is.
    const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);

    /// Base cooldown when a page request is rate-limited after exhausting low-level retries.
    ///
    /// The wait grows linearly with the page-level attempt: the first retry waits
    /// one cooldown, the second two cooldowns, and so on, before the same page
    /// is requested again.
    const PAGE_RATE_LIMIT_COOLDOWN: Duration = Duration::from_secs(60);

    /// Maximum number of page-level retries when a page is rate-limited.
    /// One initial attempt plus this many retries are made per page.
    const PAGE_RATE_LIMIT_RETRIES: u32 = 3;

    /// Minimum page size for adaptive reduction.
    /// Some portals truncate responses above ~1.5MB, so even 50 rows can fail
    /// if datasets at that offset are resource-heavy. 10 is low enough to
    /// handle any realistic dataset size while still making progress.
    const MIN_PAGE_SIZE: usize = 10;
143
144    /// Errors worth retrying with a smaller page size.
145    ///
146    /// Timeouts and body-related errors suggest the response is too large for
147    /// the portal to serve reliably. Network errors during body streaming also
148    /// qualify since a smaller page means less data to transfer.
149    fn is_page_size_reducible(err: &AppError) -> bool {
150        matches!(err, AppError::Timeout(_) | AppError::NetworkError(_))
151    }
152
153    /// Creates a new CKAN client for the specified portal.
154    ///
155    /// # Arguments
156    ///
157    /// * `base_url_str` - The base URL of the CKAN portal (e.g., <https://dati.gov.it>)
158    ///
159    /// # Returns
160    ///
161    /// Returns a configured `CkanClient` instance.
162    ///
163    /// # Errors
164    ///
165    /// Returns `AppError::Generic` if the URL is invalid or malformed.
166    /// Returns `AppError::ClientError` if the HTTP client cannot be built.
167    // TODO(validation): Add optional portal validation on construction
168    // Could probe /api/3/action/site_read to verify it's a valid CKAN portal.
169    // Add: pub async fn new_validated(url: &str) -> Result<Self, AppError>
170    pub fn new(base_url_str: &str) -> Result<Self, AppError> {
171        let base_url = Url::parse(base_url_str)
172            .map_err(|_| AppError::InvalidPortalUrl(base_url_str.to_string()))?;
173
174        let http_config = HttpConfig::default();
175        let client = Client::builder()
176            // TODO(config): Make User-Agent configurable or use version from Cargo.toml
177            .user_agent("Ceres/0.1 (semantic-search-bot)")
178            .timeout(http_config.timeout)
179            .build()
180            .map_err(|e| AppError::ClientError(e.to_string()))?;
181
182        Ok(Self { client, base_url })
183    }
184
185    /// Fetches the complete list of dataset IDs from the CKAN portal.
186    ///
187    /// This method calls the CKAN `package_list` API endpoint, which returns
188    /// all dataset identifiers available in the portal.
189    ///
190    /// # Returns
191    ///
192    /// A vector of dataset ID strings.
193    ///
194    /// # Errors
195    ///
196    /// Returns `AppError::ClientError` if the HTTP request fails.
197    /// Returns `AppError::Generic` if the CKAN API returns an error.
198    ///
199    /// # Performance Note
200    ///
201    /// TODO(performance): Add pagination for large portals
202    /// Large portals can have 100k+ datasets. CKAN supports limit/offset params.
203    /// Consider: `list_package_ids_paginated(limit: usize, offset: usize)`
204    /// Or streaming: `list_package_ids_stream() -> impl Stream<Item = ...>`
205    pub async fn list_package_ids(&self) -> Result<Vec<String>, AppError> {
206        let url = self
207            .base_url
208            .join("api/3/action/package_list")
209            .map_err(|e| AppError::Generic(e.to_string()))?;
210
211        let resp = self.request_with_retry(&url).await?;
212
213        let ckan_resp: CkanResponse<Vec<String>> = resp
214            .json()
215            .await
216            .map_err(|e| AppError::ClientError(e.to_string()))?;
217
218        if !ckan_resp.success {
219            return Err(AppError::Generic(
220                "CKAN API returned success: false".to_string(),
221            ));
222        }
223
224        Ok(ckan_resp.result)
225    }
226
227    /// Fetches the full details of a specific dataset by ID.
228    ///
229    /// This method calls the CKAN `package_show` API endpoint to retrieve
230    /// complete metadata for a single dataset.
231    ///
232    /// # Arguments
233    ///
234    /// * `id` - The unique identifier or name slug of the dataset
235    ///
236    /// # Returns
237    ///
238    /// A `CkanDataset` containing the dataset's metadata.
239    pub async fn show_package(&self, id: &str) -> Result<CkanDataset, AppError> {
240        let mut url = self
241            .base_url
242            .join("api/3/action/package_show")
243            .map_err(|e| AppError::Generic(e.to_string()))?;
244
245        url.query_pairs_mut().append_pair("id", id);
246
247        let resp = self.request_with_retry(&url).await?;
248
249        let ckan_resp: CkanResponse<CkanDataset> = resp
250            .json()
251            .await
252            .map_err(|e| AppError::ClientError(e.to_string()))?;
253
254        if !ckan_resp.success {
255            return Err(AppError::Generic(format!(
256                "CKAN failed to show package {}",
257                id
258            )));
259        }
260
261        Ok(ckan_resp.result)
262    }
263
264    /// Searches for datasets modified since a given timestamp.
265    ///
266    /// Uses CKAN's `package_search` API with a `metadata_modified` filter to fetch
267    /// only datasets that have been updated since the last sync. This enables
268    /// incremental harvesting with ~99% fewer API calls in steady state.
269    ///
270    /// # Arguments
271    ///
272    /// * `since` - Only return datasets modified after this timestamp
273    ///
274    /// # Returns
275    ///
276    /// A vector of `CkanDataset` containing all datasets modified since the given time.
277    /// Unlike `list_package_ids()` + `show_package()`, this returns complete dataset
278    /// objects in a single paginated query.
279    ///
280    /// # Errors
281    ///
282    /// Returns `AppError::ClientError` if the HTTP request fails.
283    /// Returns `AppError::Generic` if the CKAN API returns an error or doesn't support
284    /// the `package_search` endpoint (some older CKAN instances).
285    pub async fn search_modified_since(
286        &self,
287        since: DateTime<Utc>,
288    ) -> Result<Vec<CkanDataset>, AppError> {
289        let since_str = since.format("%Y-%m-%dT%H:%M:%SZ").to_string();
290        let fq = Some(format!("metadata_modified:[{} TO *]", since_str));
291        self.paginated_search(fq.as_deref()).await
292    }
293
294    /// Fetches all datasets from the portal using paginated `package_search`.
295    ///
296    /// This makes ~N/1000 API calls instead of N individual `package_show` calls,
297    /// which is critical for large portals like HDX (~40k datasets) that enforce
298    /// strict rate limits.
299    pub async fn search_all_datasets(&self) -> Result<Vec<CkanDataset>, AppError> {
300        self.paginated_search(None).await
301    }
302
303    /// Core paginated search with page-level rate limit resilience.
304    ///
305    /// Fetches all matching datasets using `package_search`, handling pagination
306    /// and rate limiting at two levels:
307    /// 1. Per-request retries via `request_with_retry` (immediate backoff)
308    /// 2. Per-page retries with longer cooldown when a page is persistently rate-limited
309    ///
310    /// # Arguments
311    ///
312    /// * `fq` - Optional Solr filter query (e.g., `metadata_modified:[... TO *]`)
313    async fn paginated_search(&self, fq: Option<&str>) -> Result<Vec<CkanDataset>, AppError> {
314        let mut page_size: usize = 1000;
315        let mut all_datasets = Vec::new();
316        let mut start: usize = 0;
317        let mut page_delay = Self::PAGE_DELAY;
318
319        loop {
320            match self
321                .fetch_search_page(fq, start, page_size, &mut page_delay)
322                .await
323            {
324                Ok((datasets, total_count)) => {
325                    let page_count = datasets.len();
326                    all_datasets.extend(datasets);
327
328                    // Check if we've fetched all results
329                    if start + page_count >= total_count || page_count < page_size {
330                        break;
331                    }
332
333                    start += page_size;
334                }
335                Err(e) if page_size > Self::MIN_PAGE_SIZE && Self::is_page_size_reducible(&e) => {
336                    // Timeout or truncated response — quarter the page size and
337                    // retry the same offset. Quartering converges faster than
338                    // halving (1000→250→62→15→10 vs 1000→500→250→125→62→31→15→10).
339                    page_size = (page_size / 4).max(Self::MIN_PAGE_SIZE);
340                    tracing::warn!(
341                        new_page_size = page_size,
342                        offset = start,
343                        error = %e,
344                        "Page failed, reducing page size and retrying"
345                    );
346                    continue;
347                }
348                Err(e) if Self::is_page_size_reducible(&e) => {
349                    // Already at MIN_PAGE_SIZE and still failing — this portal
350                    // can't reliably serve package_search (e.g. response size cap).
351                    // Return a non-retryable error so the caller falls back to
352                    // ID-by-ID fetching.
353                    return Err(AppError::Generic(format!(
354                        "package_search unreliable even at page_size={page_size}: {e}"
355                    )));
356                }
357                Err(e) => return Err(e),
358            }
359
360            // Polite delay between pages to avoid triggering rate limits
361            sleep(page_delay).await;
362        }
363
364        Ok(all_datasets)
365    }
366
367    /// Streams datasets page-by-page instead of accumulating into a single Vec.
368    ///
369    /// Each yielded item is one page of results (up to `page_size` datasets).
370    /// Memory is bounded to a single page at a time on the consumer side.
371    /// Uses the same adaptive page size and rate-limit logic as `paginated_search`.
372    fn paginated_search_stream(
373        &self,
374        fq: Option<String>,
375    ) -> BoxStream<'_, Result<Vec<CkanDataset>, AppError>> {
376        struct PaginationState {
377            start: usize,
378            page_size: usize,
379            page_delay: Duration,
380            done: bool,
381            fq: Option<String>,
382        }
383
384        let initial = PaginationState {
385            start: 0,
386            page_size: 1000,
387            page_delay: Self::PAGE_DELAY,
388            done: false,
389            fq,
390        };
391
392        Box::pin(futures::stream::unfold(
393            initial,
394            move |mut state| async move {
395                if state.done {
396                    return None;
397                }
398
399                loop {
400                    match self
401                        .fetch_search_page(
402                            state.fq.as_deref(),
403                            state.start,
404                            state.page_size,
405                            &mut state.page_delay,
406                        )
407                        .await
408                    {
409                        Ok((datasets, total_count)) => {
410                            let page_count = datasets.len();
411
412                            if state.start + page_count >= total_count
413                                || page_count < state.page_size
414                            {
415                                state.done = true;
416                            } else {
417                                state.start += state.page_size;
418                            }
419
420                            // Sleep between pages (skipped for last page)
421                            if !state.done {
422                                sleep(state.page_delay).await;
423                            }
424
425                            return Some((Ok(datasets), state));
426                        }
427                        Err(e)
428                            if state.page_size > Self::MIN_PAGE_SIZE
429                                && Self::is_page_size_reducible(&e) =>
430                        {
431                            state.page_size = (state.page_size / 4).max(Self::MIN_PAGE_SIZE);
432                            tracing::warn!(
433                                new_page_size = state.page_size,
434                                offset = state.start,
435                                error = %e,
436                                "Page failed, reducing page size and retrying"
437                            );
438                            continue; // retry same offset with smaller page
439                        }
440                        Err(e) if Self::is_page_size_reducible(&e) => {
441                            state.done = true;
442                            return Some((
443                                Err(AppError::Generic(format!(
444                                    "package_search unreliable even at page_size={}: {e}",
445                                    state.page_size
446                                ))),
447                                state,
448                            ));
449                        }
450                        Err(e) => {
451                            state.done = true;
452                            return Some((Err(e), state));
453                        }
454                    }
455                }
456            },
457        ))
458    }
459
460    /// Returns the total dataset count from a lightweight `rows=0` query.
461    pub async fn dataset_count(&self) -> Result<usize, AppError> {
462        let mut page_delay = Self::PAGE_DELAY;
463        let (_, total) = self.fetch_search_page(None, 0, 0, &mut page_delay).await?;
464        Ok(total)
465    }
466
467    /// Fetches a single page of search results with rate-limit resilience.
468    ///
469    /// Returns `(datasets, total_count)` where `total_count` is the total number
470    /// of matching datasets reported by CKAN (for pagination control).
471    async fn fetch_search_page(
472        &self,
473        fq: Option<&str>,
474        start: usize,
475        page_size: usize,
476        page_delay: &mut Duration,
477    ) -> Result<(Vec<CkanDataset>, usize), AppError> {
478        let mut url = self
479            .base_url
480            .join("api/3/action/package_search")
481            .map_err(|e| AppError::Generic(e.to_string()))?;
482
483        {
484            let mut pairs = url.query_pairs_mut();
485            if let Some(filter) = fq {
486                pairs.append_pair("fq", filter);
487            }
488            pairs
489                .append_pair("rows", &page_size.to_string())
490                .append_pair("start", &start.to_string())
491                .append_pair("sort", "metadata_modified asc");
492        }
493
494        // Page-level retry: if the request is rate-limited after exhausting
495        // low-level retries, wait a longer cooldown and try the page again.
496        let mut page_result = None;
497        for page_attempt in 0..=Self::PAGE_RATE_LIMIT_RETRIES {
498            match self.request_with_retry(&url).await {
499                Ok(resp) => {
500                    page_result = Some(Ok(resp));
501                    break;
502                }
503                Err(AppError::RateLimitExceeded)
504                    if page_attempt < Self::PAGE_RATE_LIMIT_RETRIES =>
505                {
506                    let cooldown = Self::PAGE_RATE_LIMIT_COOLDOWN * (page_attempt + 1);
507                    sleep(cooldown).await;
508                    *page_delay = (*page_delay * 2).min(Duration::from_secs(5));
509                }
510                Err(e) => {
511                    page_result = Some(Err(e));
512                    break;
513                }
514            }
515        }
516
517        let resp = match page_result {
518            Some(Ok(resp)) => resp,
519            Some(Err(e)) => return Err(e),
520            None => return Err(AppError::RateLimitExceeded),
521        };
522
523        let ckan_resp: CkanResponse<PackageSearchResult> = resp
524            .json()
525            .await
526            .map_err(|e| AppError::ClientError(e.to_string()))?;
527
528        if !ckan_resp.success {
529            return Err(AppError::Generic(
530                "CKAN package_search returned success: false".to_string(),
531            ));
532        }
533
534        Ok((ckan_resp.result.results, ckan_resp.result.count))
535    }
536
537    /// Maximum retries for rate-limited (429) responses.
538    /// Higher than normal retries because rate limits are transient.
539    /// With 500ms base and 30s cap: 1s, 2s, 4s, 8s, 16s, 30s, 30s, 30s, 30s = ~151s total wait.
540    const RATE_LIMIT_MAX_RETRIES: u32 = 10;
541
542    // TODO(observability): Add detailed retry logging
543    // Should log: (1) Attempt number and delay, (2) Reason for retry,
544    // (3) Final error if all retries exhausted. Use tracing crate.
545    async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
546        let http_config = HttpConfig::default();
547        let max_retries = http_config.max_retries;
548        let base_delay = http_config.retry_base_delay;
549        let mut last_error = AppError::Generic("No attempts made".to_string());
550        // Use higher retry count for 429s since they are transient
551        let effective_max = Self::RATE_LIMIT_MAX_RETRIES.max(max_retries);
552
553        for attempt in 1..=effective_max {
554            match self.client.get(url.clone()).send().await {
555                Ok(resp) => {
556                    let status = resp.status();
557
558                    if status.is_success() {
559                        return Ok(resp);
560                    }
561
562                    if status == StatusCode::TOO_MANY_REQUESTS {
563                        last_error = AppError::RateLimitExceeded;
564                        if attempt < effective_max {
565                            // Respect Retry-After header if present, otherwise exponential backoff (capped)
566                            let delay = resp
567                                .headers()
568                                .get("retry-after")
569                                .and_then(|v| v.to_str().ok())
570                                .and_then(|v| v.parse::<u64>().ok())
571                                .map(Duration::from_secs)
572                                .unwrap_or_else(|| {
573                                    (base_delay * 2_u32.pow(attempt)).min(Self::MAX_RETRY_DELAY)
574                                });
575                            sleep(delay).await;
576                            continue;
577                        }
578
579                        return Err(AppError::RateLimitExceeded);
580                    }
581
582                    if status.is_server_error() {
583                        last_error = AppError::ClientError(format!(
584                            "Server error: HTTP {}",
585                            status.as_u16()
586                        ));
587                        if attempt < max_retries {
588                            let delay = base_delay * attempt;
589                            sleep(delay).await;
590                            continue;
591                        }
592                    }
593
594                    return Err(AppError::ClientError(format!(
595                        "HTTP {} from {}",
596                        status.as_u16(),
597                        url
598                    )));
599                }
600                Err(e) => {
601                    if e.is_timeout() {
602                        last_error = AppError::Timeout(http_config.timeout.as_secs());
603                    } else if e.is_connect() {
604                        last_error = AppError::NetworkError(format!("Connection failed: {}", e));
605                    } else {
606                        last_error = AppError::ClientError(e.to_string());
607                    }
608
609                    if attempt < max_retries && (e.is_timeout() || e.is_connect()) {
610                        let delay = base_delay * attempt;
611                        sleep(delay).await;
612                        continue;
613                    }
614                }
615            }
616        }
617
618        Err(last_error)
619    }
620
621    /// Converts a CKAN dataset into Ceres' internal `NewDataset` model.
622    ///
623    /// This helper method transforms CKAN-specific data structures into the format
624    /// used by Ceres for database storage. Multilingual fields are resolved using
625    /// the specified language preference.
626    ///
627    /// # Arguments
628    ///
629    /// * `dataset` - The CKAN dataset to convert
630    /// * `portal_url` - The base URL of the CKAN portal
631    /// * `url_template` - Optional URL template with `{id}` and `{name}` placeholders
632    /// * `language` - Preferred language for resolving multilingual fields
633    ///
634    /// # Returns
635    ///
636    /// A `NewDataset` ready to be inserted into the database.
637    ///
638    /// # Examples
639    ///
640    /// ```
641    /// use ceres_client::CkanClient;
642    /// use ceres_client::ckan::CkanDataset;
643    /// use ceres_core::LocalizedField;
644    ///
645    /// let ckan_dataset = CkanDataset {
646    ///     id: "abc-123".to_string(),
647    ///     name: "air-quality-data".to_string(),
648    ///     title: LocalizedField::Plain("Air Quality Monitoring".to_string()),
649    ///     notes: Some(LocalizedField::Plain("Data from air quality sensors".to_string())),
650    ///     extras: serde_json::Map::new(),
651    /// };
652    ///
653    /// let new_dataset = CkanClient::into_new_dataset(
654    ///     ckan_dataset,
655    ///     "https://dati.gov.it",
656    ///     None,
657    ///     "en",
658    /// );
659    ///
660    /// assert_eq!(new_dataset.original_id, "abc-123");
661    /// assert_eq!(new_dataset.url, "https://dati.gov.it/dataset/air-quality-data");
662    /// assert_eq!(new_dataset.title, "Air Quality Monitoring");
663    /// ```
664    pub fn into_new_dataset(
665        dataset: CkanDataset,
666        portal_url: &str,
667        url_template: Option<&str>,
668        language: &str,
669    ) -> NewDataset {
670        let landing_page = match url_template {
671            Some(template) => template
672                .replace("{id}", &dataset.id)
673                .replace("{name}", &dataset.name),
674            None => format!(
675                "{}/dataset/{}",
676                portal_url.trim_end_matches('/'),
677                dataset.name
678            ),
679        };
680
681        let metadata_json = serde_json::Value::Object(dataset.extras.clone());
682
683        // Resolve multilingual fields using preferred language
684        let title = dataset.title.resolve(language);
685        // Some portals (e.g., opendata.swiss) store the description in a
686        // "description" field instead of "notes". Fall back to extras if needed.
687        let description = dataset
688            .notes
689            .map(|n| n.resolve(language))
690            .or_else(|| {
691                dataset
692                    .extras
693                    .get("description")
694                    .and_then(|v| serde_json::from_value::<LocalizedField>(v.clone()).ok())
695                    .map(|f| f.resolve(language))
696            })
697            .filter(|d| !d.is_empty());
698
699        // Content hash includes language for correct delta detection
700        let content_hash = NewDataset::compute_content_hash_with_language(
701            &title,
702            description.as_deref(),
703            language,
704        );
705
706        NewDataset {
707            original_id: dataset.id,
708            source_portal: portal_url.to_string(),
709            url: landing_page,
710            title,
711            description,
712            embedding: None,
713            metadata: metadata_json,
714            content_hash,
715        }
716    }
717}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722
723    #[test]
724    fn test_new_with_valid_url() {
725        let result = CkanClient::new("https://dati.gov.it");
726        assert!(result.is_ok());
727        let client = result.unwrap();
728        assert_eq!(client.base_url.as_str(), "https://dati.gov.it/");
729    }
730
731    #[test]
732    fn test_new_with_invalid_url() {
733        let result = CkanClient::new("not-a-valid-url");
734        assert!(result.is_err());
735
736        if let Err(AppError::InvalidPortalUrl(url)) = result {
737            assert_eq!(url, "not-a-valid-url");
738        } else {
739            panic!("Expected AppError::InvalidPortalUrl");
740        }
741    }
742
743    #[test]
744    fn test_into_new_dataset_basic() {
745        let ckan_dataset = CkanDataset {
746            id: "dataset-123".to_string(),
747            name: "my-dataset".to_string(),
748            title: LocalizedField::Plain("My Dataset".to_string()),
749            notes: Some(LocalizedField::Plain("This is a test dataset".to_string())),
750            extras: serde_json::Map::new(),
751        };
752
753        let portal_url = "https://dati.gov.it";
754        let new_dataset = CkanClient::into_new_dataset(ckan_dataset, portal_url, None, "en");
755
756        assert_eq!(new_dataset.original_id, "dataset-123");
757        assert_eq!(new_dataset.source_portal, "https://dati.gov.it");
758        assert_eq!(new_dataset.url, "https://dati.gov.it/dataset/my-dataset");
759        assert_eq!(new_dataset.title, "My Dataset");
760        assert!(new_dataset.embedding.is_none());
761
762        // Verify content hash is computed correctly (includes language)
763        let expected_hash = NewDataset::compute_content_hash_with_language(
764            "My Dataset",
765            Some("This is a test dataset"),
766            "en",
767        );
768        assert_eq!(new_dataset.content_hash, expected_hash);
769        assert_eq!(new_dataset.content_hash.len(), 64);
770    }
771
772    #[test]
773    fn test_into_new_dataset_with_url_template() {
774        let ckan_dataset = CkanDataset {
775            id: "52db43b1-4d6a-446c-a3fc-b2e470fe5a45".to_string(),
776            name: "raccolta-differenziata".to_string(),
777            title: LocalizedField::Plain("Raccolta Differenziata".to_string()),
778            notes: Some(LocalizedField::Plain(
779                "Percentuale raccolta differenziata".to_string(),
780            )),
781            extras: serde_json::Map::new(),
782        };
783
784        let portal_url = "https://dati.gov.it/opendata/";
785        let template = "https://www.dati.gov.it/view-dataset/dataset?id={id}";
786        let new_dataset =
787            CkanClient::into_new_dataset(ckan_dataset, portal_url, Some(template), "en");
788
789        assert_eq!(
790            new_dataset.url,
791            "https://www.dati.gov.it/view-dataset/dataset?id=52db43b1-4d6a-446c-a3fc-b2e470fe5a45"
792        );
793        assert_eq!(new_dataset.source_portal, "https://dati.gov.it/opendata/");
794    }
795
796    #[test]
797    fn test_into_new_dataset_url_template_with_name() {
798        let ckan_dataset = CkanDataset {
799            id: "abc-123".to_string(),
800            name: "air-quality-data".to_string(),
801            title: LocalizedField::Plain("Air Quality".to_string()),
802            notes: None,
803            extras: serde_json::Map::new(),
804        };
805
806        let template = "https://example.com/datasets/{name}/view";
807        let new_dataset =
808            CkanClient::into_new_dataset(ckan_dataset, "https://example.com", Some(template), "en");
809
810        assert_eq!(
811            new_dataset.url,
812            "https://example.com/datasets/air-quality-data/view"
813        );
814    }
815
816    #[test]
817    fn test_ckan_response_deserialization() {
818        let json = r#"{
819            "success": true,
820            "result": ["dataset-1", "dataset-2", "dataset-3"]
821        }"#;
822
823        let response: CkanResponse<Vec<String>> = serde_json::from_str(json).unwrap();
824        assert!(response.success);
825        assert_eq!(response.result.len(), 3);
826    }
827
828    #[test]
829    fn test_ckan_dataset_deserialization() {
830        let json = r#"{
831            "id": "test-id",
832            "name": "test-name",
833            "title": "Test Title",
834            "notes": "Test notes",
835            "organization": {
836                "name": "test-org"
837            }
838        }"#;
839
840        let dataset: CkanDataset = serde_json::from_str(json).unwrap();
841        assert_eq!(dataset.id, "test-id");
842        assert_eq!(dataset.name, "test-name");
843        assert_eq!(dataset.title.resolve("en"), "Test Title");
844        assert!(dataset.extras.contains_key("organization"));
845    }
846
847    #[test]
848    fn test_ckan_dataset_multilingual_deserialization() {
849        let json = r#"{
850            "id": "swiss-123",
851            "name": "swiss-dataset",
852            "title": {"en": "English Title", "de": "Deutscher Titel", "fr": "Titre Francais"},
853            "notes": {"en": "English description", "de": "Deutsche Beschreibung"}
854        }"#;
855
856        let dataset: CkanDataset = serde_json::from_str(json).unwrap();
857        assert_eq!(dataset.id, "swiss-123");
858        assert_eq!(dataset.title.resolve("en"), "English Title");
859        assert_eq!(dataset.title.resolve("de"), "Deutscher Titel");
860        assert_eq!(dataset.title.resolve("it"), "English Title"); // fallback to en
861        assert_eq!(
862            dataset.notes.as_ref().unwrap().resolve("de"),
863            "Deutsche Beschreibung"
864        );
865    }
866
867    #[test]
868    fn test_into_new_dataset_multilingual() {
869        let json = r#"{
870            "id": "swiss-dataset",
871            "name": "test-multilingual",
872            "title": {"en": "English Title", "de": "Deutscher Titel"},
873            "notes": {"en": "English description", "de": "Deutsche Beschreibung"}
874        }"#;
875        let dataset: CkanDataset = serde_json::from_str(json).unwrap();
876        let new_ds =
877            CkanClient::into_new_dataset(dataset, "https://ckan.opendata.swiss", None, "de");
878        assert_eq!(new_ds.title, "Deutscher Titel");
879        assert_eq!(
880            new_ds.description,
881            Some("Deutsche Beschreibung".to_string())
882        );
883    }
884
885    #[test]
886    fn test_into_new_dataset_description_fallback() {
887        // Swiss portal stores description in "description" field, not "notes"
888        let json = r#"{
889            "id": "swiss-no-notes",
890            "name": "dataset-without-notes",
891            "title": {"en": "English Title", "de": "Deutscher Titel"},
892            "description": {"en": "English desc", "de": "Deutsche Beschreibung"}
893        }"#;
894        let dataset: CkanDataset = serde_json::from_str(json).unwrap();
895        assert!(dataset.notes.is_none());
896        let new_ds =
897            CkanClient::into_new_dataset(dataset, "https://ckan.opendata.swiss", None, "en");
898        assert_eq!(new_ds.description, Some("English desc".to_string()));
899    }
900
901    #[test]
902    fn test_into_new_dataset_description_fallback_empty() {
903        // If "description" exists but all translations are empty, description should be None
904        let json = r#"{
905            "id": "swiss-empty-desc",
906            "name": "dataset-empty-desc",
907            "title": "Some Title",
908            "description": {"en": "", "de": "", "fr": ""}
909        }"#;
910        let dataset: CkanDataset = serde_json::from_str(json).unwrap();
911        let new_ds =
912            CkanClient::into_new_dataset(dataset, "https://ckan.opendata.swiss", None, "en");
913        assert_eq!(new_ds.description, None);
914    }
915
916    #[test]
917    fn test_into_new_dataset_notes_takes_priority() {
918        // When both "notes" and "description" exist, "notes" should win
919        let json = r#"{
920            "id": "both-fields",
921            "name": "dataset-both",
922            "title": "Title",
923            "notes": "Notes description",
924            "description": {"en": "Extras description"}
925        }"#;
926        let dataset: CkanDataset = serde_json::from_str(json).unwrap();
927        let new_ds = CkanClient::into_new_dataset(dataset, "https://example.com", None, "en");
928        assert_eq!(new_ds.description, Some("Notes description".to_string()));
929    }
930
931    #[test]
932    fn test_is_page_size_reducible_timeout() {
933        assert!(CkanClient::is_page_size_reducible(&AppError::Timeout(30)));
934    }
935
936    #[test]
937    fn test_is_page_size_reducible_client_error() {
938        let err = AppError::ClientError("error decoding response body".to_string());
939        assert!(!CkanClient::is_page_size_reducible(&err));
940    }
941
942    #[test]
943    fn test_is_page_size_reducible_network_error() {
944        let err = AppError::NetworkError("connection reset".to_string());
945        assert!(CkanClient::is_page_size_reducible(&err));
946    }
947
948    #[test]
949    fn test_is_page_size_reducible_non_reducible() {
950        assert!(!CkanClient::is_page_size_reducible(
951            &AppError::RateLimitExceeded
952        ));
953        assert!(!CkanClient::is_page_size_reducible(&AppError::Generic(
954            "something".to_string()
955        )));
956    }
957}
958
959// =============================================================================
960// Trait Implementations: PortalClient and PortalClientFactory
961// =============================================================================
962
963impl ceres_core::traits::PortalClient for CkanClient {
964    type PortalData = CkanDataset;
965
966    fn portal_type(&self) -> &'static str {
967        "ckan"
968    }
969
970    fn base_url(&self) -> &str {
971        self.base_url.as_str()
972    }
973
974    async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
975        self.list_package_ids().await
976    }
977
978    async fn get_dataset(&self, id: &str) -> Result<Self::PortalData, AppError> {
979        self.show_package(id).await
980    }
981
982    fn into_new_dataset(
983        data: Self::PortalData,
984        portal_url: &str,
985        url_template: Option<&str>,
986        language: &str,
987    ) -> NewDataset {
988        CkanClient::into_new_dataset(data, portal_url, url_template, language)
989    }
990
991    async fn search_modified_since(
992        &self,
993        since: DateTime<Utc>,
994    ) -> Result<Vec<Self::PortalData>, AppError> {
995        self.search_modified_since(since).await
996    }
997
998    async fn search_all_datasets(&self) -> Result<Vec<Self::PortalData>, AppError> {
999        self.search_all_datasets().await
1000    }
1001
1002    fn search_all_datasets_stream(&self) -> BoxStream<'_, Result<Vec<Self::PortalData>, AppError>> {
1003        self.paginated_search_stream(None)
1004    }
1005
1006    async fn dataset_count(&self) -> Result<usize, AppError> {
1007        self.dataset_count().await
1008    }
1009}
1010
/// Factory for creating CKAN portal clients.
///
/// This is a field-less unit struct: it carries no configuration of its own.
/// Construct it with `CkanClientFactory` or `CkanClientFactory::default()`.
/// It has no inherent methods; its behavior comes from the `PortalClientFactory`
/// implementation in this file.
#[derive(Debug, Clone, Default)]
pub struct CkanClientFactory;
1016
1017impl ceres_core::traits::PortalClientFactory for CkanClientFactory {
1018    type Client = CkanClient;
1019
1020    fn create(
1021        &self,
1022        portal_url: &str,
1023        portal_type: ceres_core::config::PortalType,
1024        _language: &str,
1025    ) -> Result<Self::Client, AppError> {
1026        match portal_type {
1027            ceres_core::config::PortalType::Ckan => CkanClient::new(portal_url),
1028            other => Err(AppError::ConfigError(format!(
1029                "CkanClientFactory can only create CKAN clients, but portal type {:?} was requested for URL {}",
1030                other, portal_url
1031            ))),
1032        }
1033    }
1034}