ceres_client/ckan.rs
1//! CKAN client for harvesting datasets from CKAN-compatible open data portals.
2//!
3//! # Future Extensions
4//!
5//! TODO: Add support for other portal types (roadmap v0.2):
6//! - Socrata API (used by many US cities): <https://dev.socrata.com/>
7//! - DCAT-AP harvester for EU portals: <https://joinup.ec.europa.eu/collection/semantic-interoperability-community-semic/solution/dcat-application-profile-data-portals-europe>
8//!
9//! Consider creating a `PortalClient` trait that abstracts over different portal types:
10//! ```ignore
11//! pub trait PortalClient {
12//! async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError>;
13//! async fn get_dataset(&self, id: &str) -> Result<NewDataset, AppError>;
14//! }
15//! ```
16
17use ceres_core::error::AppError;
18use ceres_core::models::NewDataset;
19use ceres_core::HttpConfig;
20use reqwest::{Client, StatusCode, Url};
21use serde::Deserialize;
22use serde_json::Value;
23use tokio::time::sleep;
24
25/// Generic wrapper for CKAN API responses.
26///
27/// CKAN API reference: <https://docs.ckan.org/en/2.9/api/>
28///
29/// CKAN always returns responses with the structure:
30/// ```json
31/// {
32/// "success": bool,
33/// "result": T
34/// }
35/// ```
36#[derive(Deserialize, Debug)]
37struct CkanResponse<T> {
38 success: bool,
39 result: T,
40}
41
42/// Data Transfer Object for CKAN dataset details.
43///
44/// This structure represents the core fields returned by the CKAN `package_show` API.
45/// Additional fields returned by CKAN are captured in the `extras` map.
46///
47/// # Examples
48///
49/// ```
50/// use ceres_client::ckan::CkanDataset;
51///
52/// let json = r#"{
53/// "id": "dataset-123",
54/// "name": "my-dataset",
55/// "title": "My Dataset",
56/// "notes": "Description of the dataset",
57/// "organization": {"name": "test-org"}
58/// }"#;
59///
60/// let dataset: CkanDataset = serde_json::from_str(json).unwrap();
61/// assert_eq!(dataset.id, "dataset-123");
62/// assert_eq!(dataset.title, "My Dataset");
63/// assert!(dataset.extras.contains_key("organization"));
64/// ```
65#[derive(Deserialize, Debug, Clone)]
66pub struct CkanDataset {
67 /// Unique identifier for the dataset
68 pub id: String,
69 /// URL-friendly name/slug of the dataset
70 pub name: String,
71 /// Human-readable title of the dataset
72 pub title: String,
73 /// Optional description/notes about the dataset
74 pub notes: Option<String>,
75 /// All other fields returned by CKAN (e.g., organization, tags, resources)
76 #[serde(flatten)]
77 pub extras: serde_json::Map<String, Value>,
78}
79
80/// HTTP client for interacting with CKAN open data portals.
81///
82/// CKAN (Comprehensive Knowledge Archive Network) is an open-source data management
83/// system used by many government open data portals worldwide.
84///
85/// # Examples
86///
87/// ```no_run
88/// use ceres_client::CkanClient;
89///
90/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
91/// let client = CkanClient::new("https://dati.gov.it")?;
92/// let dataset_ids = client.list_package_ids().await?;
93/// println!("Found {} datasets", dataset_ids.len());
94/// # Ok(())
95/// # }
96/// ```
97#[derive(Clone)]
98pub struct CkanClient {
99 client: Client,
100 base_url: Url,
101}
102
103impl CkanClient {
104 /// Creates a new CKAN client for the specified portal.
105 ///
106 /// # Arguments
107 ///
108 /// * `base_url_str` - The base URL of the CKAN portal (e.g., <https://dati.gov.it>)
109 ///
110 /// # Returns
111 ///
112 /// Returns a configured `CkanClient` instance.
113 ///
114 /// # Errors
115 ///
116 /// Returns `AppError::Generic` if the URL is invalid or malformed.
117 /// Returns `AppError::ClientError` if the HTTP client cannot be built.
118 // TODO(validation): Add optional portal validation on construction
119 // Could probe /api/3/action/site_read to verify it's a valid CKAN portal.
120 // Add: pub async fn new_validated(url: &str) -> Result<Self, AppError>
121 pub fn new(base_url_str: &str) -> Result<Self, AppError> {
122 let base_url = Url::parse(base_url_str)
123 .map_err(|_| AppError::Generic(format!("Invalid CKAN URL: {}", base_url_str)))?;
124
125 let http_config = HttpConfig::default();
126 let client = Client::builder()
127 // TODO(config): Make User-Agent configurable or use version from Cargo.toml
128 .user_agent("Ceres/0.1 (semantic-search-bot)")
129 .timeout(http_config.timeout)
130 .build()
131 .map_err(|e| AppError::ClientError(e.to_string()))?;
132
133 Ok(Self { client, base_url })
134 }
135
136 /// Fetches the complete list of dataset IDs from the CKAN portal.
137 ///
138 /// This method calls the CKAN `package_list` API endpoint, which returns
139 /// all dataset identifiers available in the portal.
140 ///
141 /// # Returns
142 ///
143 /// A vector of dataset ID strings.
144 ///
145 /// # Errors
146 ///
147 /// Returns `AppError::ClientError` if the HTTP request fails.
148 /// Returns `AppError::Generic` if the CKAN API returns an error.
149 ///
150 /// # Performance Note
151 ///
152 /// TODO(performance): Add pagination for large portals
153 /// Large portals can have 100k+ datasets. CKAN supports limit/offset params.
154 /// Consider: `list_package_ids_paginated(limit: usize, offset: usize)`
155 /// Or streaming: `list_package_ids_stream() -> impl Stream<Item = ...>`
156 pub async fn list_package_ids(&self) -> Result<Vec<String>, AppError> {
157 let url = self
158 .base_url
159 .join("api/3/action/package_list")
160 .map_err(|e| AppError::Generic(e.to_string()))?;
161
162 let resp = self.request_with_retry(&url).await?;
163
164 let ckan_resp: CkanResponse<Vec<String>> = resp
165 .json()
166 .await
167 .map_err(|e| AppError::ClientError(e.to_string()))?;
168
169 if !ckan_resp.success {
170 return Err(AppError::Generic(
171 "CKAN API returned success: false".to_string(),
172 ));
173 }
174
175 Ok(ckan_resp.result)
176 }
177
178 /// Fetches the full details of a specific dataset by ID.
179 ///
180 /// This method calls the CKAN `package_show` API endpoint to retrieve
181 /// complete metadata for a single dataset.
182 ///
183 /// # Arguments
184 ///
185 /// * `id` - The unique identifier or name slug of the dataset
186 ///
187 /// # Returns
188 ///
189 /// A `CkanDataset` containing the dataset's metadata.
190 pub async fn show_package(&self, id: &str) -> Result<CkanDataset, AppError> {
191 let mut url = self
192 .base_url
193 .join("api/3/action/package_show")
194 .map_err(|e| AppError::Generic(e.to_string()))?;
195
196 url.query_pairs_mut().append_pair("id", id);
197
198 let resp = self.request_with_retry(&url).await?;
199
200 let ckan_resp: CkanResponse<CkanDataset> = resp
201 .json()
202 .await
203 .map_err(|e| AppError::ClientError(e.to_string()))?;
204
205 if !ckan_resp.success {
206 return Err(AppError::Generic(format!(
207 "CKAN failed to show package {}",
208 id
209 )));
210 }
211
212 Ok(ckan_resp.result)
213 }
214
215 // TODO(observability): Add detailed retry logging
216 // Should log: (1) Attempt number and delay, (2) Reason for retry,
217 // (3) Final error if all retries exhausted. Use tracing crate.
218 async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
219 let http_config = HttpConfig::default();
220 let max_retries = http_config.max_retries;
221 let base_delay = http_config.retry_base_delay;
222 let mut last_error = AppError::Generic("No attempts made".to_string());
223
224 for attempt in 1..=max_retries {
225 match self.client.get(url.clone()).send().await {
226 Ok(resp) => {
227 let status = resp.status();
228
229 if status.is_success() {
230 return Ok(resp);
231 }
232
233 if status == StatusCode::TOO_MANY_REQUESTS {
234 last_error = AppError::RateLimitExceeded;
235 if attempt < max_retries {
236 let delay = base_delay * 2_u32.pow(attempt);
237 sleep(delay).await;
238 continue;
239 }
240 }
241
242 if status.is_server_error() {
243 last_error = AppError::ClientError(format!(
244 "Server error: HTTP {}",
245 status.as_u16()
246 ));
247 if attempt < max_retries {
248 let delay = base_delay * attempt;
249 sleep(delay).await;
250 continue;
251 }
252 }
253
254 return Err(AppError::ClientError(format!(
255 "HTTP {} from {}",
256 status.as_u16(),
257 url
258 )));
259 }
260 Err(e) => {
261 if e.is_timeout() {
262 last_error = AppError::Timeout(http_config.timeout.as_secs());
263 } else if e.is_connect() {
264 last_error = AppError::NetworkError(format!("Connection failed: {}", e));
265 } else {
266 last_error = AppError::ClientError(e.to_string());
267 }
268
269 if attempt < max_retries && (e.is_timeout() || e.is_connect()) {
270 let delay = base_delay * attempt;
271 sleep(delay).await;
272 continue;
273 }
274 }
275 }
276 }
277
278 Err(last_error)
279 }
280
281 /// Converts a CKAN dataset into Ceres' internal `NewDataset` model.
282 ///
283 /// This helper method transforms CKAN-specific data structures into the format
284 /// used by Ceres for database storage.
285 ///
286 /// # Arguments
287 ///
288 /// * `dataset` - The CKAN dataset to convert
289 /// * `portal_url` - The base URL of the CKAN portal
290 ///
291 /// # Returns
292 ///
293 /// A `NewDataset` ready to be inserted into the database.
294 ///
295 /// # Examples
296 ///
297 /// ```
298 /// use ceres_client::CkanClient;
299 /// use ceres_client::ckan::CkanDataset;
300 ///
301 /// let ckan_dataset = CkanDataset {
302 /// id: "abc-123".to_string(),
303 /// name: "air-quality-data".to_string(),
304 /// title: "Air Quality Monitoring".to_string(),
305 /// notes: Some("Data from air quality sensors".to_string()),
306 /// extras: serde_json::Map::new(),
307 /// };
308 ///
309 /// let new_dataset = CkanClient::into_new_dataset(
310 /// ckan_dataset,
311 /// "https://dati.gov.it"
312 /// );
313 ///
314 /// assert_eq!(new_dataset.original_id, "abc-123");
315 /// assert_eq!(new_dataset.url, "https://dati.gov.it/dataset/air-quality-data");
316 /// assert_eq!(new_dataset.title, "Air Quality Monitoring");
317 /// ```
318 pub fn into_new_dataset(dataset: CkanDataset, portal_url: &str) -> NewDataset {
319 let landing_page = format!(
320 "{}/dataset/{}",
321 portal_url.trim_end_matches('/'),
322 dataset.name
323 );
324
325 let metadata_json = serde_json::Value::Object(dataset.extras.clone());
326
327 // Compute content hash for delta detection
328 let content_hash =
329 NewDataset::compute_content_hash(&dataset.title, dataset.notes.as_deref());
330
331 NewDataset {
332 original_id: dataset.id,
333 source_portal: portal_url.to_string(),
334 url: landing_page,
335 title: dataset.title,
336 description: dataset.notes,
337 embedding: None,
338 metadata: metadata_json,
339 content_hash,
340 }
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    // A well-formed URL is accepted and stored with a trailing slash.
    #[test]
    fn test_new_with_valid_url() {
        let client = CkanClient::new("https://dati.gov.it").expect("valid URL must be accepted");
        assert_eq!(client.base_url.as_str(), "https://dati.gov.it/");
    }

    // A string that is not an absolute URL is rejected with `AppError::Generic`.
    #[test]
    fn test_new_with_invalid_url() {
        match CkanClient::new("not-a-valid-url") {
            Err(AppError::Generic(msg)) => assert!(msg.contains("Invalid CKAN URL")),
            _ => panic!("Expected AppError::Generic"),
        }
    }

    // Field mapping and content hash of the CKAN -> NewDataset conversion.
    #[test]
    fn test_into_new_dataset_basic() {
        let portal_url = "https://dati.gov.it";
        let source = CkanDataset {
            id: "dataset-123".to_string(),
            name: "my-dataset".to_string(),
            title: "My Dataset".to_string(),
            notes: Some("This is a test dataset".to_string()),
            extras: serde_json::Map::new(),
        };

        let converted = CkanClient::into_new_dataset(source.clone(), portal_url);

        assert_eq!(converted.original_id, "dataset-123");
        assert_eq!(converted.source_portal, "https://dati.gov.it");
        assert_eq!(converted.url, "https://dati.gov.it/dataset/my-dataset");
        assert_eq!(converted.title, "My Dataset");
        assert!(converted.embedding.is_none());

        // The hash must match one computed directly from the same title and notes.
        let expected_hash = NewDataset::compute_content_hash(&source.title, source.notes.as_deref());
        assert_eq!(converted.content_hash, expected_hash);
        assert_eq!(converted.content_hash.len(), 64);
    }

    // The generic envelope decodes a `package_list`-style payload.
    #[test]
    fn test_ckan_response_deserialization() {
        let payload = r#"{
            "success": true,
            "result": ["dataset-1", "dataset-2", "dataset-3"]
        }"#;

        let parsed = serde_json::from_str::<CkanResponse<Vec<String>>>(payload).unwrap();
        assert!(parsed.success);
        assert_eq!(parsed.result.len(), 3);
    }

    // Known fields are mapped explicitly; unknown ones land in `extras`.
    #[test]
    fn test_ckan_dataset_deserialization() {
        let payload = r#"{
            "id": "test-id",
            "name": "test-name",
            "title": "Test Title",
            "notes": "Test notes",
            "organization": {
                "name": "test-org"
            }
        }"#;

        let parsed = serde_json::from_str::<CkanDataset>(payload).unwrap();
        assert_eq!(parsed.id, "test-id");
        assert_eq!(parsed.name, "test-name");
        assert!(parsed.extras.contains_key("organization"));
    }
}
423}