Skip to main content

ceres_client/
ckan.rs

//! CKAN client for harvesting datasets from CKAN-compatible open data portals.
//!
//! [`CkanClient`] talks to the CKAN Action API (`api/3/action/...`) and also
//! implements the portal-agnostic `ceres_core::traits::PortalClient` trait;
//! [`CkanClientFactory`] implements `ceres_core::traits::PortalClientFactory`.
//!
//! # Future Extensions
//!
//! TODO: Add support for other portal types (roadmap v0.3+), each as a further
//! `PortalClient` implementation:
//! - Socrata API (used by many US cities): <https://dev.socrata.com/>
//! - DCAT-AP harvester for EU portals: <https://joinup.ec.europa.eu/collection/semantic-interoperability-community-semic/solution/dcat-application-profile-data-portals-europe>
16
17use ceres_core::HttpConfig;
18use ceres_core::error::AppError;
19use ceres_core::models::NewDataset;
20use chrono::{DateTime, Utc};
21use reqwest::{Client, StatusCode, Url};
22use serde::Deserialize;
23use serde_json::Value;
24use tokio::time::sleep;
25
26/// Generic wrapper for CKAN API responses.
27///
28/// CKAN API reference: <https://docs.ckan.org/en/2.9/api/>
29///
30/// CKAN always returns responses with the structure:
31/// ```json
32/// {
33///     "success": bool,
34///     "result": T
35/// }
36/// ```
37#[derive(Deserialize, Debug)]
38struct CkanResponse<T> {
39    success: bool,
40    result: T,
41}
42
43/// Response structure for CKAN package_search API.
44#[derive(Deserialize, Debug)]
45struct PackageSearchResult {
46    count: usize,
47    results: Vec<CkanDataset>,
48}
49
50/// Data Transfer Object for CKAN dataset details.
51///
52/// This structure represents the core fields returned by the CKAN `package_show` API.
53/// Additional fields returned by CKAN are captured in the `extras` map.
54///
55/// # Examples
56///
57/// ```
58/// use ceres_client::ckan::CkanDataset;
59///
60/// let json = r#"{
61///     "id": "dataset-123",
62///     "name": "my-dataset",
63///     "title": "My Dataset",
64///     "notes": "Description of the dataset",
65///     "organization": {"name": "test-org"}
66/// }"#;
67///
68/// let dataset: CkanDataset = serde_json::from_str(json).unwrap();
69/// assert_eq!(dataset.id, "dataset-123");
70/// assert_eq!(dataset.title, "My Dataset");
71/// assert!(dataset.extras.contains_key("organization"));
72/// ```
73#[derive(Deserialize, Debug, Clone)]
74pub struct CkanDataset {
75    /// Unique identifier for the dataset
76    pub id: String,
77    /// URL-friendly name/slug of the dataset
78    pub name: String,
79    /// Human-readable title of the dataset
80    pub title: String,
81    /// Optional description/notes about the dataset
82    pub notes: Option<String>,
83    /// All other fields returned by CKAN (e.g., organization, tags, resources)
84    #[serde(flatten)]
85    pub extras: serde_json::Map<String, Value>,
86}
87
88/// HTTP client for interacting with CKAN open data portals.
89///
90/// CKAN (Comprehensive Knowledge Archive Network) is an open-source data management
91/// system used by many government open data portals worldwide.
92///
93/// # Examples
94///
95/// ```no_run
96/// use ceres_client::CkanClient;
97///
98/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
99/// let client = CkanClient::new("https://dati.gov.it")?;
100/// let dataset_ids = client.list_package_ids().await?;
101/// println!("Found {} datasets", dataset_ids.len());
102/// # Ok(())
103/// # }
104/// ```
105#[derive(Clone)]
106pub struct CkanClient {
107    client: Client,
108    base_url: Url,
109}
110
111impl CkanClient {
112    /// Creates a new CKAN client for the specified portal.
113    ///
114    /// # Arguments
115    ///
116    /// * `base_url_str` - The base URL of the CKAN portal (e.g., <https://dati.gov.it>)
117    ///
118    /// # Returns
119    ///
120    /// Returns a configured `CkanClient` instance.
121    ///
122    /// # Errors
123    ///
124    /// Returns `AppError::Generic` if the URL is invalid or malformed.
125    /// Returns `AppError::ClientError` if the HTTP client cannot be built.
126    // TODO(validation): Add optional portal validation on construction
127    // Could probe /api/3/action/site_read to verify it's a valid CKAN portal.
128    // Add: pub async fn new_validated(url: &str) -> Result<Self, AppError>
129    pub fn new(base_url_str: &str) -> Result<Self, AppError> {
130        let base_url = Url::parse(base_url_str)
131            .map_err(|_| AppError::Generic(format!("Invalid CKAN URL: {}", base_url_str)))?;
132
133        let http_config = HttpConfig::default();
134        let client = Client::builder()
135            // TODO(config): Make User-Agent configurable or use version from Cargo.toml
136            .user_agent("Ceres/0.1 (semantic-search-bot)")
137            .timeout(http_config.timeout)
138            .build()
139            .map_err(|e| AppError::ClientError(e.to_string()))?;
140
141        Ok(Self { client, base_url })
142    }
143
144    /// Fetches the complete list of dataset IDs from the CKAN portal.
145    ///
146    /// This method calls the CKAN `package_list` API endpoint, which returns
147    /// all dataset identifiers available in the portal.
148    ///
149    /// # Returns
150    ///
151    /// A vector of dataset ID strings.
152    ///
153    /// # Errors
154    ///
155    /// Returns `AppError::ClientError` if the HTTP request fails.
156    /// Returns `AppError::Generic` if the CKAN API returns an error.
157    ///
158    /// # Performance Note
159    ///
160    /// TODO(performance): Add pagination for large portals
161    /// Large portals can have 100k+ datasets. CKAN supports limit/offset params.
162    /// Consider: `list_package_ids_paginated(limit: usize, offset: usize)`
163    /// Or streaming: `list_package_ids_stream() -> impl Stream<Item = ...>`
164    pub async fn list_package_ids(&self) -> Result<Vec<String>, AppError> {
165        let url = self
166            .base_url
167            .join("api/3/action/package_list")
168            .map_err(|e| AppError::Generic(e.to_string()))?;
169
170        let resp = self.request_with_retry(&url).await?;
171
172        let ckan_resp: CkanResponse<Vec<String>> = resp
173            .json()
174            .await
175            .map_err(|e| AppError::ClientError(e.to_string()))?;
176
177        if !ckan_resp.success {
178            return Err(AppError::Generic(
179                "CKAN API returned success: false".to_string(),
180            ));
181        }
182
183        Ok(ckan_resp.result)
184    }
185
186    /// Fetches the full details of a specific dataset by ID.
187    ///
188    /// This method calls the CKAN `package_show` API endpoint to retrieve
189    /// complete metadata for a single dataset.
190    ///
191    /// # Arguments
192    ///
193    /// * `id` - The unique identifier or name slug of the dataset
194    ///
195    /// # Returns
196    ///
197    /// A `CkanDataset` containing the dataset's metadata.
198    pub async fn show_package(&self, id: &str) -> Result<CkanDataset, AppError> {
199        let mut url = self
200            .base_url
201            .join("api/3/action/package_show")
202            .map_err(|e| AppError::Generic(e.to_string()))?;
203
204        url.query_pairs_mut().append_pair("id", id);
205
206        let resp = self.request_with_retry(&url).await?;
207
208        let ckan_resp: CkanResponse<CkanDataset> = resp
209            .json()
210            .await
211            .map_err(|e| AppError::ClientError(e.to_string()))?;
212
213        if !ckan_resp.success {
214            return Err(AppError::Generic(format!(
215                "CKAN failed to show package {}",
216                id
217            )));
218        }
219
220        Ok(ckan_resp.result)
221    }
222
223    /// Searches for datasets modified since a given timestamp.
224    ///
225    /// Uses CKAN's `package_search` API with a `metadata_modified` filter to fetch
226    /// only datasets that have been updated since the last sync. This enables
227    /// incremental harvesting with ~99% fewer API calls in steady state.
228    ///
229    /// # Arguments
230    ///
231    /// * `since` - Only return datasets modified after this timestamp
232    ///
233    /// # Returns
234    ///
235    /// A vector of `CkanDataset` containing all datasets modified since the given time.
236    /// Unlike `list_package_ids()` + `show_package()`, this returns complete dataset
237    /// objects in a single paginated query.
238    ///
239    /// # Errors
240    ///
241    /// Returns `AppError::ClientError` if the HTTP request fails.
242    /// Returns `AppError::Generic` if the CKAN API returns an error or doesn't support
243    /// the `package_search` endpoint (some older CKAN instances).
244    pub async fn search_modified_since(
245        &self,
246        since: DateTime<Utc>,
247    ) -> Result<Vec<CkanDataset>, AppError> {
248        const PAGE_SIZE: usize = 1000;
249        let mut all_datasets = Vec::new();
250        let mut start: usize = 0;
251
252        // Format timestamp for Solr query: YYYY-MM-DDTHH:MM:SSZ
253        let since_str = since.format("%Y-%m-%dT%H:%M:%SZ").to_string();
254        let fq = format!("metadata_modified:[{} TO *]", since_str);
255
256        loop {
257            let mut url = self
258                .base_url
259                .join("api/3/action/package_search")
260                .map_err(|e| AppError::Generic(e.to_string()))?;
261
262            url.query_pairs_mut()
263                .append_pair("fq", &fq)
264                .append_pair("rows", &PAGE_SIZE.to_string())
265                .append_pair("start", &start.to_string())
266                .append_pair("sort", "metadata_modified asc");
267
268            let resp = self.request_with_retry(&url).await?;
269
270            let ckan_resp: CkanResponse<PackageSearchResult> = resp
271                .json()
272                .await
273                .map_err(|e| AppError::ClientError(e.to_string()))?;
274
275            if !ckan_resp.success {
276                return Err(AppError::Generic(
277                    "CKAN package_search returned success: false".to_string(),
278                ));
279            }
280
281            let page_count = ckan_resp.result.results.len();
282            all_datasets.extend(ckan_resp.result.results);
283
284            // Check if we've fetched all results
285            if start + page_count >= ckan_resp.result.count || page_count < PAGE_SIZE {
286                break;
287            }
288
289            start += PAGE_SIZE;
290        }
291
292        Ok(all_datasets)
293    }
294
295    // TODO(observability): Add detailed retry logging
296    // Should log: (1) Attempt number and delay, (2) Reason for retry,
297    // (3) Final error if all retries exhausted. Use tracing crate.
298    async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
299        let http_config = HttpConfig::default();
300        let max_retries = http_config.max_retries;
301        let base_delay = http_config.retry_base_delay;
302        let mut last_error = AppError::Generic("No attempts made".to_string());
303
304        for attempt in 1..=max_retries {
305            match self.client.get(url.clone()).send().await {
306                Ok(resp) => {
307                    let status = resp.status();
308
309                    if status.is_success() {
310                        return Ok(resp);
311                    }
312
313                    if status == StatusCode::TOO_MANY_REQUESTS {
314                        last_error = AppError::RateLimitExceeded;
315                        if attempt < max_retries {
316                            let delay = base_delay * 2_u32.pow(attempt);
317                            sleep(delay).await;
318                            continue;
319                        }
320                    }
321
322                    if status.is_server_error() {
323                        last_error = AppError::ClientError(format!(
324                            "Server error: HTTP {}",
325                            status.as_u16()
326                        ));
327                        if attempt < max_retries {
328                            let delay = base_delay * attempt;
329                            sleep(delay).await;
330                            continue;
331                        }
332                    }
333
334                    return Err(AppError::ClientError(format!(
335                        "HTTP {} from {}",
336                        status.as_u16(),
337                        url
338                    )));
339                }
340                Err(e) => {
341                    if e.is_timeout() {
342                        last_error = AppError::Timeout(http_config.timeout.as_secs());
343                    } else if e.is_connect() {
344                        last_error = AppError::NetworkError(format!("Connection failed: {}", e));
345                    } else {
346                        last_error = AppError::ClientError(e.to_string());
347                    }
348
349                    if attempt < max_retries && (e.is_timeout() || e.is_connect()) {
350                        let delay = base_delay * attempt;
351                        sleep(delay).await;
352                        continue;
353                    }
354                }
355            }
356        }
357
358        Err(last_error)
359    }
360
361    /// Converts a CKAN dataset into Ceres' internal `NewDataset` model.
362    ///
363    /// This helper method transforms CKAN-specific data structures into the format
364    /// used by Ceres for database storage.
365    ///
366    /// # Arguments
367    ///
368    /// * `dataset` - The CKAN dataset to convert
369    /// * `portal_url` - The base URL of the CKAN portal
370    ///
371    /// # Returns
372    ///
373    /// A `NewDataset` ready to be inserted into the database.
374    ///
375    /// # Examples
376    ///
377    /// ```
378    /// use ceres_client::CkanClient;
379    /// use ceres_client::ckan::CkanDataset;
380    ///
381    /// let ckan_dataset = CkanDataset {
382    ///     id: "abc-123".to_string(),
383    ///     name: "air-quality-data".to_string(),
384    ///     title: "Air Quality Monitoring".to_string(),
385    ///     notes: Some("Data from air quality sensors".to_string()),
386    ///     extras: serde_json::Map::new(),
387    /// };
388    ///
389    /// let new_dataset = CkanClient::into_new_dataset(
390    ///     ckan_dataset,
391    ///     "https://dati.gov.it"
392    /// );
393    ///
394    /// assert_eq!(new_dataset.original_id, "abc-123");
395    /// assert_eq!(new_dataset.url, "https://dati.gov.it/dataset/air-quality-data");
396    /// assert_eq!(new_dataset.title, "Air Quality Monitoring");
397    /// ```
398    pub fn into_new_dataset(dataset: CkanDataset, portal_url: &str) -> NewDataset {
399        let landing_page = format!(
400            "{}/dataset/{}",
401            portal_url.trim_end_matches('/'),
402            dataset.name
403        );
404
405        let metadata_json = serde_json::Value::Object(dataset.extras.clone());
406
407        // Compute content hash for delta detection
408        let content_hash =
409            NewDataset::compute_content_hash(&dataset.title, dataset.notes.as_deref());
410
411        NewDataset {
412            original_id: dataset.id,
413            source_portal: portal_url.to_string(),
414            url: landing_page,
415            title: dataset.title,
416            description: dataset.notes,
417            embedding: None,
418            metadata: metadata_json,
419            content_hash,
420        }
421    }
422}
423
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_with_valid_url() {
        let client = CkanClient::new("https://dati.gov.it").unwrap();
        // Url normalizes an empty path to "/".
        assert_eq!(client.base_url.as_str(), "https://dati.gov.it/");
    }

    #[test]
    fn test_new_with_invalid_url() {
        match CkanClient::new("not-a-valid-url") {
            Err(AppError::Generic(msg)) => assert!(msg.contains("Invalid CKAN URL")),
            _ => panic!("Expected AppError::Generic"),
        }
    }

    #[test]
    fn test_into_new_dataset_basic() {
        let source = CkanDataset {
            id: "dataset-123".to_string(),
            name: "my-dataset".to_string(),
            title: "My Dataset".to_string(),
            notes: Some("This is a test dataset".to_string()),
            extras: serde_json::Map::new(),
        };
        let expected_hash =
            NewDataset::compute_content_hash(&source.title, source.notes.as_deref());

        let converted = CkanClient::into_new_dataset(source, "https://dati.gov.it");

        assert_eq!(converted.original_id, "dataset-123");
        assert_eq!(converted.source_portal, "https://dati.gov.it");
        assert_eq!(converted.url, "https://dati.gov.it/dataset/my-dataset");
        assert_eq!(converted.title, "My Dataset");
        assert!(converted.embedding.is_none());

        // Hash must match the model's own hashing of title + notes (hex SHA-256 length).
        assert_eq!(converted.content_hash, expected_hash);
        assert_eq!(converted.content_hash.len(), 64);
    }

    #[test]
    fn test_ckan_response_deserialization() {
        let payload = r#"{
            "success": true,
            "result": ["dataset-1", "dataset-2", "dataset-3"]
        }"#;

        let parsed: CkanResponse<Vec<String>> = serde_json::from_str(payload).unwrap();
        assert!(parsed.success);
        assert_eq!(parsed.result.len(), 3);
    }

    #[test]
    fn test_ckan_dataset_deserialization() {
        let payload = r#"{
            "id": "test-id",
            "name": "test-name",
            "title": "Test Title",
            "notes": "Test notes",
            "organization": {
                "name": "test-org"
            }
        }"#;

        let parsed: CkanDataset = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed.id, "test-id");
        assert_eq!(parsed.name, "test-name");
        // Unknown keys land in the flattened extras map.
        assert!(parsed.extras.contains_key("organization"));
    }
}
504
505// =============================================================================
506// Trait Implementations: PortalClient and PortalClientFactory
507// =============================================================================
508
509impl ceres_core::traits::PortalClient for CkanClient {
510    type PortalData = CkanDataset;
511
512    async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
513        self.list_package_ids().await
514    }
515
516    async fn get_dataset(&self, id: &str) -> Result<Self::PortalData, AppError> {
517        self.show_package(id).await
518    }
519
520    fn into_new_dataset(data: Self::PortalData, portal_url: &str) -> NewDataset {
521        CkanClient::into_new_dataset(data, portal_url)
522    }
523
524    async fn search_modified_since(
525        &self,
526        since: DateTime<Utc>,
527    ) -> Result<Vec<Self::PortalData>, AppError> {
528        self.search_modified_since(since).await
529    }
530}
531
/// Stateless factory that builds a [`CkanClient`] for a given portal URL.
#[derive(Clone, Debug, Default)]
pub struct CkanClientFactory;
535
536impl CkanClientFactory {
537    /// Creates a new CKAN client factory.
538    pub fn new() -> Self {
539        Self
540    }
541}
542
543impl ceres_core::traits::PortalClientFactory for CkanClientFactory {
544    type Client = CkanClient;
545
546    fn create(&self, portal_url: &str) -> Result<Self::Client, AppError> {
547        CkanClient::new(portal_url)
548    }
549}