ceres_client/ckan.rs
1//! CKAN client for harvesting datasets from CKAN-compatible open data portals.
2//!
3//! # Future Extensions
4//!
5//! TODO: Add support for other portal types (roadmap v0.3+):
6//! - Socrata API (used by many US cities): <https://dev.socrata.com/>
7//! - DCAT-AP harvester for EU portals: <https://joinup.ec.europa.eu/collection/semantic-interoperability-community-semic/solution/dcat-application-profile-data-portals-europe>
8//!
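//! New portal types should implement the `ceres_core::traits::PortalClient` trait,
//! which `CkanClient` already implements at the bottom of this file. As used here,
//! the trait has the following shape:
//! ```ignore
//! pub trait PortalClient {
//!     type PortalData;
//!     async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError>;
//!     async fn get_dataset(&self, id: &str) -> Result<Self::PortalData, AppError>;
//!     fn into_new_dataset(data: Self::PortalData, portal_url: &str) -> NewDataset;
//!     async fn search_modified_since(
//!         &self,
//!         since: DateTime<Utc>,
//!     ) -> Result<Vec<Self::PortalData>, AppError>;
//! }
//! ```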
16
17use ceres_core::HttpConfig;
18use ceres_core::error::AppError;
19use ceres_core::models::NewDataset;
20use chrono::{DateTime, Utc};
21use reqwest::{Client, StatusCode, Url};
22use serde::Deserialize;
23use serde_json::Value;
24use tokio::time::sleep;
25
26/// Generic wrapper for CKAN API responses.
27///
28/// CKAN API reference: <https://docs.ckan.org/en/2.9/api/>
29///
30/// CKAN always returns responses with the structure:
31/// ```json
32/// {
33/// "success": bool,
34/// "result": T
35/// }
36/// ```
37#[derive(Deserialize, Debug)]
38struct CkanResponse<T> {
39 success: bool,
40 result: T,
41}
42
43/// Response structure for CKAN package_search API.
44#[derive(Deserialize, Debug)]
45struct PackageSearchResult {
46 count: usize,
47 results: Vec<CkanDataset>,
48}
49
50/// Data Transfer Object for CKAN dataset details.
51///
52/// This structure represents the core fields returned by the CKAN `package_show` API.
53/// Additional fields returned by CKAN are captured in the `extras` map.
54///
55/// # Examples
56///
57/// ```
58/// use ceres_client::ckan::CkanDataset;
59///
60/// let json = r#"{
61/// "id": "dataset-123",
62/// "name": "my-dataset",
63/// "title": "My Dataset",
64/// "notes": "Description of the dataset",
65/// "organization": {"name": "test-org"}
66/// }"#;
67///
68/// let dataset: CkanDataset = serde_json::from_str(json).unwrap();
69/// assert_eq!(dataset.id, "dataset-123");
70/// assert_eq!(dataset.title, "My Dataset");
71/// assert!(dataset.extras.contains_key("organization"));
72/// ```
73#[derive(Deserialize, Debug, Clone)]
74pub struct CkanDataset {
75 /// Unique identifier for the dataset
76 pub id: String,
77 /// URL-friendly name/slug of the dataset
78 pub name: String,
79 /// Human-readable title of the dataset
80 pub title: String,
81 /// Optional description/notes about the dataset
82 pub notes: Option<String>,
83 /// All other fields returned by CKAN (e.g., organization, tags, resources)
84 #[serde(flatten)]
85 pub extras: serde_json::Map<String, Value>,
86}
87
88/// HTTP client for interacting with CKAN open data portals.
89///
90/// CKAN (Comprehensive Knowledge Archive Network) is an open-source data management
91/// system used by many government open data portals worldwide.
92///
93/// # Examples
94///
95/// ```no_run
96/// use ceres_client::CkanClient;
97///
98/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
99/// let client = CkanClient::new("https://dati.gov.it")?;
100/// let dataset_ids = client.list_package_ids().await?;
101/// println!("Found {} datasets", dataset_ids.len());
102/// # Ok(())
103/// # }
104/// ```
105#[derive(Clone)]
106pub struct CkanClient {
107 client: Client,
108 base_url: Url,
109}
110
111impl CkanClient {
112 /// Creates a new CKAN client for the specified portal.
113 ///
114 /// # Arguments
115 ///
116 /// * `base_url_str` - The base URL of the CKAN portal (e.g., <https://dati.gov.it>)
117 ///
118 /// # Returns
119 ///
120 /// Returns a configured `CkanClient` instance.
121 ///
122 /// # Errors
123 ///
124 /// Returns `AppError::Generic` if the URL is invalid or malformed.
125 /// Returns `AppError::ClientError` if the HTTP client cannot be built.
126 // TODO(validation): Add optional portal validation on construction
127 // Could probe /api/3/action/site_read to verify it's a valid CKAN portal.
128 // Add: pub async fn new_validated(url: &str) -> Result<Self, AppError>
129 pub fn new(base_url_str: &str) -> Result<Self, AppError> {
130 let base_url = Url::parse(base_url_str)
131 .map_err(|_| AppError::Generic(format!("Invalid CKAN URL: {}", base_url_str)))?;
132
133 let http_config = HttpConfig::default();
134 let client = Client::builder()
135 // TODO(config): Make User-Agent configurable or use version from Cargo.toml
136 .user_agent("Ceres/0.1 (semantic-search-bot)")
137 .timeout(http_config.timeout)
138 .build()
139 .map_err(|e| AppError::ClientError(e.to_string()))?;
140
141 Ok(Self { client, base_url })
142 }
143
144 /// Fetches the complete list of dataset IDs from the CKAN portal.
145 ///
146 /// This method calls the CKAN `package_list` API endpoint, which returns
147 /// all dataset identifiers available in the portal.
148 ///
149 /// # Returns
150 ///
151 /// A vector of dataset ID strings.
152 ///
153 /// # Errors
154 ///
155 /// Returns `AppError::ClientError` if the HTTP request fails.
156 /// Returns `AppError::Generic` if the CKAN API returns an error.
157 ///
158 /// # Performance Note
159 ///
160 /// TODO(performance): Add pagination for large portals
161 /// Large portals can have 100k+ datasets. CKAN supports limit/offset params.
162 /// Consider: `list_package_ids_paginated(limit: usize, offset: usize)`
163 /// Or streaming: `list_package_ids_stream() -> impl Stream<Item = ...>`
164 pub async fn list_package_ids(&self) -> Result<Vec<String>, AppError> {
165 let url = self
166 .base_url
167 .join("api/3/action/package_list")
168 .map_err(|e| AppError::Generic(e.to_string()))?;
169
170 let resp = self.request_with_retry(&url).await?;
171
172 let ckan_resp: CkanResponse<Vec<String>> = resp
173 .json()
174 .await
175 .map_err(|e| AppError::ClientError(e.to_string()))?;
176
177 if !ckan_resp.success {
178 return Err(AppError::Generic(
179 "CKAN API returned success: false".to_string(),
180 ));
181 }
182
183 Ok(ckan_resp.result)
184 }
185
186 /// Fetches the full details of a specific dataset by ID.
187 ///
188 /// This method calls the CKAN `package_show` API endpoint to retrieve
189 /// complete metadata for a single dataset.
190 ///
191 /// # Arguments
192 ///
193 /// * `id` - The unique identifier or name slug of the dataset
194 ///
195 /// # Returns
196 ///
197 /// A `CkanDataset` containing the dataset's metadata.
198 pub async fn show_package(&self, id: &str) -> Result<CkanDataset, AppError> {
199 let mut url = self
200 .base_url
201 .join("api/3/action/package_show")
202 .map_err(|e| AppError::Generic(e.to_string()))?;
203
204 url.query_pairs_mut().append_pair("id", id);
205
206 let resp = self.request_with_retry(&url).await?;
207
208 let ckan_resp: CkanResponse<CkanDataset> = resp
209 .json()
210 .await
211 .map_err(|e| AppError::ClientError(e.to_string()))?;
212
213 if !ckan_resp.success {
214 return Err(AppError::Generic(format!(
215 "CKAN failed to show package {}",
216 id
217 )));
218 }
219
220 Ok(ckan_resp.result)
221 }
222
223 /// Searches for datasets modified since a given timestamp.
224 ///
225 /// Uses CKAN's `package_search` API with a `metadata_modified` filter to fetch
226 /// only datasets that have been updated since the last sync. This enables
227 /// incremental harvesting with ~99% fewer API calls in steady state.
228 ///
229 /// # Arguments
230 ///
231 /// * `since` - Only return datasets modified after this timestamp
232 ///
233 /// # Returns
234 ///
235 /// A vector of `CkanDataset` containing all datasets modified since the given time.
236 /// Unlike `list_package_ids()` + `show_package()`, this returns complete dataset
237 /// objects in a single paginated query.
238 ///
239 /// # Errors
240 ///
241 /// Returns `AppError::ClientError` if the HTTP request fails.
242 /// Returns `AppError::Generic` if the CKAN API returns an error or doesn't support
243 /// the `package_search` endpoint (some older CKAN instances).
244 pub async fn search_modified_since(
245 &self,
246 since: DateTime<Utc>,
247 ) -> Result<Vec<CkanDataset>, AppError> {
248 const PAGE_SIZE: usize = 1000;
249 let mut all_datasets = Vec::new();
250 let mut start: usize = 0;
251
252 // Format timestamp for Solr query: YYYY-MM-DDTHH:MM:SSZ
253 let since_str = since.format("%Y-%m-%dT%H:%M:%SZ").to_string();
254 let fq = format!("metadata_modified:[{} TO *]", since_str);
255
256 loop {
257 let mut url = self
258 .base_url
259 .join("api/3/action/package_search")
260 .map_err(|e| AppError::Generic(e.to_string()))?;
261
262 url.query_pairs_mut()
263 .append_pair("fq", &fq)
264 .append_pair("rows", &PAGE_SIZE.to_string())
265 .append_pair("start", &start.to_string())
266 .append_pair("sort", "metadata_modified asc");
267
268 let resp = self.request_with_retry(&url).await?;
269
270 let ckan_resp: CkanResponse<PackageSearchResult> = resp
271 .json()
272 .await
273 .map_err(|e| AppError::ClientError(e.to_string()))?;
274
275 if !ckan_resp.success {
276 return Err(AppError::Generic(
277 "CKAN package_search returned success: false".to_string(),
278 ));
279 }
280
281 let page_count = ckan_resp.result.results.len();
282 all_datasets.extend(ckan_resp.result.results);
283
284 // Check if we've fetched all results
285 if start + page_count >= ckan_resp.result.count || page_count < PAGE_SIZE {
286 break;
287 }
288
289 start += PAGE_SIZE;
290 }
291
292 Ok(all_datasets)
293 }
294
295 // TODO(observability): Add detailed retry logging
296 // Should log: (1) Attempt number and delay, (2) Reason for retry,
297 // (3) Final error if all retries exhausted. Use tracing crate.
298 async fn request_with_retry(&self, url: &Url) -> Result<reqwest::Response, AppError> {
299 let http_config = HttpConfig::default();
300 let max_retries = http_config.max_retries;
301 let base_delay = http_config.retry_base_delay;
302 let mut last_error = AppError::Generic("No attempts made".to_string());
303
304 for attempt in 1..=max_retries {
305 match self.client.get(url.clone()).send().await {
306 Ok(resp) => {
307 let status = resp.status();
308
309 if status.is_success() {
310 return Ok(resp);
311 }
312
313 if status == StatusCode::TOO_MANY_REQUESTS {
314 last_error = AppError::RateLimitExceeded;
315 if attempt < max_retries {
316 let delay = base_delay * 2_u32.pow(attempt);
317 sleep(delay).await;
318 continue;
319 }
320 }
321
322 if status.is_server_error() {
323 last_error = AppError::ClientError(format!(
324 "Server error: HTTP {}",
325 status.as_u16()
326 ));
327 if attempt < max_retries {
328 let delay = base_delay * attempt;
329 sleep(delay).await;
330 continue;
331 }
332 }
333
334 return Err(AppError::ClientError(format!(
335 "HTTP {} from {}",
336 status.as_u16(),
337 url
338 )));
339 }
340 Err(e) => {
341 if e.is_timeout() {
342 last_error = AppError::Timeout(http_config.timeout.as_secs());
343 } else if e.is_connect() {
344 last_error = AppError::NetworkError(format!("Connection failed: {}", e));
345 } else {
346 last_error = AppError::ClientError(e.to_string());
347 }
348
349 if attempt < max_retries && (e.is_timeout() || e.is_connect()) {
350 let delay = base_delay * attempt;
351 sleep(delay).await;
352 continue;
353 }
354 }
355 }
356 }
357
358 Err(last_error)
359 }
360
361 /// Converts a CKAN dataset into Ceres' internal `NewDataset` model.
362 ///
363 /// This helper method transforms CKAN-specific data structures into the format
364 /// used by Ceres for database storage.
365 ///
366 /// # Arguments
367 ///
368 /// * `dataset` - The CKAN dataset to convert
369 /// * `portal_url` - The base URL of the CKAN portal
370 ///
371 /// # Returns
372 ///
373 /// A `NewDataset` ready to be inserted into the database.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// use ceres_client::CkanClient;
379 /// use ceres_client::ckan::CkanDataset;
380 ///
381 /// let ckan_dataset = CkanDataset {
382 /// id: "abc-123".to_string(),
383 /// name: "air-quality-data".to_string(),
384 /// title: "Air Quality Monitoring".to_string(),
385 /// notes: Some("Data from air quality sensors".to_string()),
386 /// extras: serde_json::Map::new(),
387 /// };
388 ///
389 /// let new_dataset = CkanClient::into_new_dataset(
390 /// ckan_dataset,
391 /// "https://dati.gov.it"
392 /// );
393 ///
394 /// assert_eq!(new_dataset.original_id, "abc-123");
395 /// assert_eq!(new_dataset.url, "https://dati.gov.it/dataset/air-quality-data");
396 /// assert_eq!(new_dataset.title, "Air Quality Monitoring");
397 /// ```
398 pub fn into_new_dataset(dataset: CkanDataset, portal_url: &str) -> NewDataset {
399 let landing_page = format!(
400 "{}/dataset/{}",
401 portal_url.trim_end_matches('/'),
402 dataset.name
403 );
404
405 let metadata_json = serde_json::Value::Object(dataset.extras.clone());
406
407 // Compute content hash for delta detection
408 let content_hash =
409 NewDataset::compute_content_hash(&dataset.title, dataset.notes.as_deref());
410
411 NewDataset {
412 original_id: dataset.id,
413 source_portal: portal_url.to_string(),
414 url: landing_page,
415 title: dataset.title,
416 description: dataset.notes,
417 embedding: None,
418 metadata: metadata_json,
419 content_hash,
420 }
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed portal URL is accepted and stored with a root path.
    #[test]
    fn test_new_with_valid_url() {
        let client = CkanClient::new("https://dati.gov.it").unwrap();
        assert_eq!(client.base_url.as_str(), "https://dati.gov.it/");
    }

    /// A string that is not a URL is rejected with `AppError::Generic`.
    #[test]
    fn test_new_with_invalid_url() {
        match CkanClient::new("not-a-valid-url") {
            Err(AppError::Generic(msg)) => assert!(msg.contains("Invalid CKAN URL")),
            _ => panic!("Expected AppError::Generic"),
        }
    }

    /// Field mapping, landing-page URL and content hash of the conversion.
    #[test]
    fn test_into_new_dataset_basic() {
        let source = CkanDataset {
            id: "dataset-123".to_string(),
            name: "my-dataset".to_string(),
            title: "My Dataset".to_string(),
            notes: Some("This is a test dataset".to_string()),
            extras: serde_json::Map::new(),
        };

        // Compute the reference hash up front, while the source is still borrowable.
        let expected_hash =
            NewDataset::compute_content_hash(&source.title, source.notes.as_deref());

        let converted = CkanClient::into_new_dataset(source, "https://dati.gov.it");

        assert_eq!(converted.original_id, "dataset-123");
        assert_eq!(converted.source_portal, "https://dati.gov.it");
        assert_eq!(converted.url, "https://dati.gov.it/dataset/my-dataset");
        assert_eq!(converted.title, "My Dataset");
        assert!(converted.embedding.is_none());

        // Verify content hash is computed correctly
        assert_eq!(converted.content_hash, expected_hash);
        assert_eq!(converted.content_hash.len(), 64);
    }

    /// The response envelope deserializes with a list payload.
    #[test]
    fn test_ckan_response_deserialization() {
        let body = r#"{
            "success": true,
            "result": ["dataset-1", "dataset-2", "dataset-3"]
        }"#;

        let CkanResponse { success, result } =
            serde_json::from_str::<CkanResponse<Vec<String>>>(body).unwrap();
        assert!(success);
        assert_eq!(result.len(), 3);
    }

    /// Known fields are mapped and unknown ones land in `extras`.
    #[test]
    fn test_ckan_dataset_deserialization() {
        let body = r#"{
            "id": "test-id",
            "name": "test-name",
            "title": "Test Title",
            "notes": "Test notes",
            "organization": {
                "name": "test-org"
            }
        }"#;

        let parsed = serde_json::from_str::<CkanDataset>(body).unwrap();
        assert_eq!(parsed.id, "test-id");
        assert_eq!(parsed.name, "test-name");
        assert!(parsed.extras.contains_key("organization"));
    }
}
504
505// =============================================================================
506// Trait Implementations: PortalClient and PortalClientFactory
507// =============================================================================
508
509impl ceres_core::traits::PortalClient for CkanClient {
510 type PortalData = CkanDataset;
511
512 async fn list_dataset_ids(&self) -> Result<Vec<String>, AppError> {
513 self.list_package_ids().await
514 }
515
516 async fn get_dataset(&self, id: &str) -> Result<Self::PortalData, AppError> {
517 self.show_package(id).await
518 }
519
520 fn into_new_dataset(data: Self::PortalData, portal_url: &str) -> NewDataset {
521 CkanClient::into_new_dataset(data, portal_url)
522 }
523
524 async fn search_modified_since(
525 &self,
526 since: DateTime<Utc>,
527 ) -> Result<Vec<Self::PortalData>, AppError> {
528 self.search_modified_since(since).await
529 }
530}
531
/// Stateless factory that builds [`CkanClient`] instances from portal URLs.
#[derive(Clone, Debug, Default)]
pub struct CkanClientFactory;
535
536impl CkanClientFactory {
537 /// Creates a new CKAN client factory.
538 pub fn new() -> Self {
539 Self
540 }
541}
542
543impl ceres_core::traits::PortalClientFactory for CkanClientFactory {
544 type Client = CkanClient;
545
546 fn create(&self, portal_url: &str) -> Result<Self::Client, AppError> {
547 CkanClient::new(portal_url)
548 }
549}