ceres_client/
ckan.rs

1//! CKAN client for harvesting datasets from CKAN-compatible open data portals.
2//!
3//! # Future Extensions
4//!
5//! TODO: Add support for other portal types (roadmap v0.2):
6//! - Socrata API (used by many US cities): <https://dev.socrata.com/>
7//! - DCAT-AP harvester for EU portals: <https://joinup.ec.europa.eu/collection/semantic-interoperability-community-semic/solution/dcat-application-profile-data-portals-europe>
8//!
9//! Consider creating a `PortalClient` trait that abstracts over different portal types:
10//! ```ignore
11//! pub trait PortalClient {
12//!     async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError>;
13//!     async fn get_dataset(&self, id: &str) -> Result<NewDataset, AppError>;
14//! }
15//! ```
16
17use ceres_core::error::AppError;
18use ceres_core::models::NewDataset;
19use ceres_core::HttpConfig;
20use reqwest::{Client, StatusCode, Url};
21use serde::Deserialize;
22use serde_json::Value;
23use tokio::time::sleep;
24
25/// Generic wrapper for CKAN API responses.
26///
27/// CKAN API reference: <https://docs.ckan.org/en/2.9/api/>
28///
29/// CKAN always returns responses with the structure:
30/// ```json
31/// {
32///     "success": bool,
33///     "result": T
34/// }
35/// ```
36#[derive(Deserialize, Debug)]
37struct CkanResponse<T> {
38    success: bool,
39    result: T,
40}
41
42/// Data Transfer Object for CKAN dataset details.
43///
44/// This structure represents the core fields returned by the CKAN `package_show` API.
45/// Additional fields returned by CKAN are captured in the `extras` map.
46///
47/// # Examples
48///
49/// ```
50/// use ceres_client::ckan::CkanDataset;
51///
52/// let json = r#"{
53///     "id": "dataset-123",
54///     "name": "my-dataset",
55///     "title": "My Dataset",
56///     "notes": "Description of the dataset",
57///     "organization": {"name": "test-org"}
58/// }"#;
59///
60/// let dataset: CkanDataset = serde_json::from_str(json).unwrap();
61/// assert_eq!(dataset.id, "dataset-123");
62/// assert_eq!(dataset.title, "My Dataset");
63/// assert!(dataset.extras.contains_key("organization"));
64/// ```
65#[derive(Deserialize, Debug, Clone)]
66pub struct CkanDataset {
67    /// Unique identifier for the dataset
68    pub id: String,
69    /// URL-friendly name/slug of the dataset
70    pub name: String,
71    /// Human-readable title of the dataset
72    pub title: String,
73    /// Optional description/notes about the dataset
74    pub notes: Option<String>,
75    /// All other fields returned by CKAN (e.g., organization, tags, resources)
76    #[serde(flatten)]
77    pub extras: serde_json::Map<String, Value>,
78}
79
80/// HTTP client for interacting with CKAN open data portals.
81///
82/// CKAN (Comprehensive Knowledge Archive Network) is an open-source data management
83/// system used by many government open data portals worldwide.
84///
85/// # Examples
86///
87/// ```no_run
88/// use ceres_client::CkanClient;
89///
90/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
91/// let client = CkanClient::new("https://dati.gov.it")?;
92/// let dataset_ids = client.list_package_ids().await?;
93/// println!("Found {} datasets", dataset_ids.len());
94/// # Ok(())
95/// # }
96/// ```
97#[derive(Clone)]
98pub struct CkanClient {
99    client: Client,
100    base_url: Url,
101}
102
103impl CkanClient {
104    /// Creates a new CKAN client for the specified portal.
105    ///
106    /// # Arguments
107    ///
108    /// * `base_url_str` - The base URL of the CKAN portal (e.g., <https://dati.gov.it>)
109    ///
110    /// # Returns
111    ///
112    /// Returns a configured `CkanClient` instance.
113    ///
114    /// # Errors
115    ///
116    /// Returns `AppError::Generic` if the URL is invalid or malformed.
117    /// Returns `AppError::ClientError` if the HTTP client cannot be built.
118    // TODO(validation): Add optional portal validation on construction
119    // Could probe /api/3/action/site_read to verify it's a valid CKAN portal.
120    // Add: pub async fn new_validated(url: &str) -> Result<Self, AppError>
121    pub fn new(base_url_str: &str) -> Result<Self, AppError> {
122        let base_url = Url::parse(base_url_str)
123            .map_err(|_| AppError::Generic(format!("Invalid CKAN URL: {}", base_url_str)))?;
124
125        let http_config = HttpConfig::default();
126        let client = Client::builder()
127            // TODO(config): Make User-Agent configurable or use version from Cargo.toml
128            .user_agent("Ceres/0.1 (semantic-search-bot)")
129            .timeout(http_config.timeout)
130            .build()
131            .map_err(|e| AppError::ClientError(e.to_string()))?;
132
133        Ok(Self { client, base_url })
134    }
135
136    /// Fetches the complete list of dataset IDs from the CKAN portal.
137    ///
138    /// This method calls the CKAN `package_list` API endpoint, which returns
139    /// all dataset identifiers available in the portal.
140    ///
141    /// # Returns
142    ///
143    /// A vector of dataset ID strings.
144    ///
145    /// # Errors
146    ///
147    /// Returns `AppError::ClientError` if the HTTP request fails.
148    /// Returns `AppError::Generic` if the CKAN API returns an error.
149    ///
150    /// # Performance Note
151    ///
152    /// TODO(performance): Add pagination for large portals
153    /// Large portals can have 100k+ datasets. CKAN supports limit/offset params.
154    /// Consider: `list_package_ids_paginated(limit: usize, offset: usize)`
155    /// Or streaming: `list_package_ids_stream() -> impl Stream<Item = ...>`
156    pub async fn list_package_ids(&self) -> Result<Vec<String>, AppError> {
157        let url = self
158            .base_url
159            .join("api/3/action/package_list")
160            .map_err(|e| AppError::Generic(e.to_string()))?;
161
162        let resp = self.request_with_retry(&url).await?;
163
164        let ckan_resp: CkanResponse<Vec<String>> = resp
165            .json()
166            .await
167            .map_err(|e| AppError::ClientError(e.to_string()))?;
168
169        if !ckan_resp.success {
170            return Err(AppError::Generic(
171                "CKAN API returned success: false".to_string(),
172            ));
173        }
174
175        Ok(ckan_resp.result)
176    }
177
178    /// Fetches the full details of a specific dataset by ID.
179    ///
180    /// This method calls the CKAN `package_show` API endpoint to retrieve
181    /// complete metadata for a single dataset.
182    ///
183    /// # Arguments
184    ///
185    /// * `id` - The unique identifier or name slug of the dataset
186    ///
187    /// # Returns
188    ///
189    /// A `CkanDataset` containing the dataset's metadata.
190    pub async fn show_package(&self, id: &str) -> Result<CkanDataset, AppError> {
191        let mut url = self
192            .base_url
193            .join("api/3/action/package_show")
194            .map_err(|e| AppError::Generic(e.to_string()))?;
195
196        url.query_pairs_mut().append_pair("id", id);
197
198        let resp = self.request_with_retry(&url).await?;
199
200        let ckan_resp: CkanResponse<CkanDataset> = resp
201            .json()
202            .await
203            .map_err(|e| AppError::ClientError(e.to_string()))?;
204
205        if !ckan_resp.success {
206            return Err(AppError::Generic(format!(
207                "CKAN failed to show package {}",
208                id
209            )));
210        }
211
212        Ok(ckan_resp.result)
213    }
214
215    // TODO(observability): Add detailed retry logging
216    // Should log: (1) Attempt number and delay, (2) Reason for retry,
217    // (3) Final error if all retries exhausted. Use tracing crate.
218    async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
219        let http_config = HttpConfig::default();
220        let max_retries = http_config.max_retries;
221        let base_delay = http_config.retry_base_delay;
222        let mut last_error = AppError::Generic("No attempts made".to_string());
223
224        for attempt in 1..=max_retries {
225            match self.client.get(url.clone()).send().await {
226                Ok(resp) => {
227                    let status = resp.status();
228
229                    if status.is_success() {
230                        return Ok(resp);
231                    }
232
233                    if status == StatusCode::TOO_MANY_REQUESTS {
234                        last_error = AppError::RateLimitExceeded;
235                        if attempt < max_retries {
236                            let delay = base_delay * 2_u32.pow(attempt);
237                            sleep(delay).await;
238                            continue;
239                        }
240                    }
241
242                    if status.is_server_error() {
243                        last_error = AppError::ClientError(format!(
244                            "Server error: HTTP {}",
245                            status.as_u16()
246                        ));
247                        if attempt < max_retries {
248                            let delay = base_delay * attempt;
249                            sleep(delay).await;
250                            continue;
251                        }
252                    }
253
254                    return Err(AppError::ClientError(format!(
255                        "HTTP {} from {}",
256                        status.as_u16(),
257                        url
258                    )));
259                }
260                Err(e) => {
261                    if e.is_timeout() {
262                        last_error = AppError::Timeout(http_config.timeout.as_secs());
263                    } else if e.is_connect() {
264                        last_error = AppError::NetworkError(format!("Connection failed: {}", e));
265                    } else {
266                        last_error = AppError::ClientError(e.to_string());
267                    }
268
269                    if attempt < max_retries && (e.is_timeout() || e.is_connect()) {
270                        let delay = base_delay * attempt;
271                        sleep(delay).await;
272                        continue;
273                    }
274                }
275            }
276        }
277
278        Err(last_error)
279    }
280
281    /// Converts a CKAN dataset into Ceres' internal `NewDataset` model.
282    ///
283    /// This helper method transforms CKAN-specific data structures into the format
284    /// used by Ceres for database storage.
285    ///
286    /// # Arguments
287    ///
288    /// * `dataset` - The CKAN dataset to convert
289    /// * `portal_url` - The base URL of the CKAN portal
290    ///
291    /// # Returns
292    ///
293    /// A `NewDataset` ready to be inserted into the database.
294    ///
295    /// # Examples
296    ///
297    /// ```
298    /// use ceres_client::CkanClient;
299    /// use ceres_client::ckan::CkanDataset;
300    ///
301    /// let ckan_dataset = CkanDataset {
302    ///     id: "abc-123".to_string(),
303    ///     name: "air-quality-data".to_string(),
304    ///     title: "Air Quality Monitoring".to_string(),
305    ///     notes: Some("Data from air quality sensors".to_string()),
306    ///     extras: serde_json::Map::new(),
307    /// };
308    ///
309    /// let new_dataset = CkanClient::into_new_dataset(
310    ///     ckan_dataset,
311    ///     "https://dati.gov.it"
312    /// );
313    ///
314    /// assert_eq!(new_dataset.original_id, "abc-123");
315    /// assert_eq!(new_dataset.url, "https://dati.gov.it/dataset/air-quality-data");
316    /// assert_eq!(new_dataset.title, "Air Quality Monitoring");
317    /// ```
318    pub fn into_new_dataset(dataset: CkanDataset, portal_url: &str) -> NewDataset {
319        let landing_page = format!(
320            "{}/dataset/{}",
321            portal_url.trim_end_matches('/'),
322            dataset.name
323        );
324
325        let metadata_json = serde_json::Value::Object(dataset.extras.clone());
326
327        // Compute content hash for delta detection
328        let content_hash =
329            NewDataset::compute_content_hash(&dataset.title, dataset.notes.as_deref());
330
331        NewDataset {
332            original_id: dataset.id,
333            source_portal: portal_url.to_string(),
334            url: landing_page,
335            title: dataset.title,
336            description: dataset.notes,
337            embedding: None,
338            metadata: metadata_json,
339            content_hash,
340        }
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_with_valid_url() {
        let client = CkanClient::new("https://dati.gov.it")
            .expect("a well-formed https URL must be accepted");
        assert_eq!(client.base_url.as_str(), "https://dati.gov.it/");
    }

    #[test]
    fn test_new_with_invalid_url() {
        match CkanClient::new("not-a-valid-url") {
            Err(AppError::Generic(msg)) => assert!(msg.contains("Invalid CKAN URL")),
            _ => panic!("Expected AppError::Generic"),
        }
    }

    #[test]
    fn test_into_new_dataset_basic() {
        let source = CkanDataset {
            id: "dataset-123".to_string(),
            name: "my-dataset".to_string(),
            title: "My Dataset".to_string(),
            notes: Some("This is a test dataset".to_string()),
            extras: serde_json::Map::new(),
        };
        // Hash the input up front so `source` can be moved into the conversion.
        let expected_hash =
            NewDataset::compute_content_hash(&source.title, source.notes.as_deref());

        let converted = CkanClient::into_new_dataset(source, "https://dati.gov.it");

        assert_eq!(converted.original_id, "dataset-123");
        assert_eq!(converted.source_portal, "https://dati.gov.it");
        assert_eq!(converted.url, "https://dati.gov.it/dataset/my-dataset");
        assert_eq!(converted.title, "My Dataset");
        assert!(converted.embedding.is_none());
        assert_eq!(converted.content_hash, expected_hash);
        assert_eq!(converted.content_hash.len(), 64);
    }

    #[test]
    fn test_ckan_response_deserialization() {
        let body = r#"{
            "success": true,
            "result": ["dataset-1", "dataset-2", "dataset-3"]
        }"#;

        let parsed: CkanResponse<Vec<String>> = serde_json::from_str(body).unwrap();
        assert!(parsed.success);
        assert_eq!(parsed.result.len(), 3);
    }

    #[test]
    fn test_ckan_dataset_deserialization() {
        let body = r#"{
            "id": "test-id",
            "name": "test-name",
            "title": "Test Title",
            "notes": "Test notes",
            "organization": {
                "name": "test-org"
            }
        }"#;

        let parsed: CkanDataset = serde_json::from_str(body).unwrap();
        assert_eq!(parsed.id, "test-id");
        assert_eq!(parsed.name, "test-name");
        assert!(parsed.extras.contains_key("organization"));
    }
}