socrata_sdk/
lib.rs

//! # Socrata SDK
//!
//! A high-performance, asynchronous Rust client for the Socrata Open Data API (SODA).
//!
//! ## Features
//! - **Async/Await**: Built on `reqwest` and `tokio`.
//! - **Generic**: Works with any type implementing `serde::de::DeserializeOwned`.
//! - **Pagination**: Automatic iteration over large datasets with `get_all()`.
//!
//! ## Example
//! ```ignore
//! use socrata_sdk::SocrataClient;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize)]
//! struct Contract { id: String }
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     let client = SocrataClient::new("https://www.datos.gov.co", None);
//!     let contracts: Vec<Contract> = client.fetch("abcd-1234", 100, 0, None, None).await?;
//!     println!("Fetched {} contracts", contracts.len());
//!     Ok(())
//! }
//! ```
26
27use reqwest::Client;
28use serde::{de::DeserializeOwned, Deserialize, Serialize};
29use thiserror::Error;
30use tracing::{info, warn};
31
/// Page size, in records, requested per call while paginating in `get_all()`.
///
/// The value mirrors the page size Socrata applies when no `$limit` is given.
pub const DEFAULT_LIMIT: u32 = 1000;
34
35#[derive(Error, Debug)]
36pub enum SocrataError {
37    #[error("API Request failed: {0}")]
38    RequestError(#[from] reqwest::Error),
39    #[error("API Request returned status {0}: {1}")]
40    ApiError(reqwest::StatusCode, String),
41    #[error("Failed to parse response: {0}")]
42    ParseError(#[from] serde_json::Error),
43}
44
45/// Metadata about a Socrata dataset column.
46#[derive(Debug, Clone, Deserialize, Serialize)]
47#[serde(rename_all = "camelCase")]
48pub struct ColumnMetadata {
49    pub name: String,
50    pub field_name: String,
51    pub data_type_name: String,
52    #[serde(default)]
53    pub description: Option<String>,
54}
55
56/// Metadata about a Socrata dataset.
57#[derive(Debug, Clone, Deserialize, Serialize)]
58#[serde(rename_all = "camelCase")]
59pub struct DatasetMetadata {
60    pub id: String,
61    pub name: String,
62    #[serde(default)]
63    pub description: Option<String>,
64    #[serde(default)]
65    pub category: Option<String>,
66    #[serde(default)]
67    pub columns: Vec<ColumnMetadata>,
68    #[serde(default)]
69    pub rows_updated_at: Option<u64>,
70    #[serde(default)]
71    pub publication_stage: Option<String>,
72}
73
74/// Information about a dataset from the catalog API.
75#[derive(Debug, Clone, Deserialize, Serialize)]
76pub struct CatalogResource {
77    pub name: String,
78    pub id: String,
79    #[serde(default)]
80    pub description: Option<String>,
81    #[serde(default)]
82    pub domain_category: Option<String>,
83    #[serde(rename = "type", default)]
84    pub resource_type: Option<String>,
85}
86
87/// Wrapper for catalog API response.
88#[derive(Debug, Clone, Deserialize)]
89pub struct CatalogEntry {
90    pub resource: CatalogResource,
91}
92
93/// Response from the catalog API.
94#[derive(Debug, Clone, Deserialize)]
95pub struct CatalogResponse {
96    pub results: Vec<CatalogEntry>,
97    #[serde(rename = "resultSetSize")]
98    pub result_set_size: u32,
99}
100
101/// A client for the Socrata Open Data API (SODA).
102pub struct SocrataClient {
103    client: Client,
104    base_url: String,
105    app_token: Option<String>,
106}
107
108impl SocrataClient {
109    /// Creates a new `SocrataClient`.
110    ///
111    /// # Arguments
112    /// * `base_url` - The base domain of the Socrata instance (e.g., "https://www.datos.gov.co")
113    /// * `app_token` - Optional Socrata App Token for higher rate limits.
114    pub fn new(base_url: &str, app_token: Option<String>) -> Self {
115        Self {
116            client: Client::new(),
117            base_url: base_url.to_string(),
118            app_token,
119        }
120    }
121
122    /// Add authentication headers to a request.
123    fn add_auth(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
124        if let Some(token) = &self.app_token {
125            request.header("X-App-Token", token)
126        } else {
127            warn!("No App Token provided. Rate limits may apply.");
128            request
129        }
130    }
131
132    /// Fetch data from a specific dataset ID using SODA API.
133    ///
134    /// # Arguments
135    /// * `dataset_id` - The 4x4 ID of the dataset (e.g. "jbjy-vk9h")
136    /// * `limit` - Number of records to return
137    /// * `offset` - Number of records to skip
138    /// * `order` - SoQL order clause (e.g. "date DESC")
139    /// * `where_clause` - Optional SoQL where clause
140    pub async fn fetch<T: DeserializeOwned>(
141        &self,
142        dataset_id: &str,
143        limit: u32,
144        offset: u32,
145        order: Option<&str>,
146        where_clause: Option<&str>,
147    ) -> Result<Vec<T>, SocrataError> {
148        let mut url = format!(
149            "{}/resource/{}.json?$limit={}&$offset={}",
150            self.base_url, dataset_id, limit, offset
151        );
152
153        if let Some(ord) = order {
154            url.push_str(&format!("&$order={}", ord));
155        }
156
157        if let Some(clause) = where_clause {
158            url.push_str(&format!("&$where={}", clause));
159        }
160
161        info!("SODA Request: {}", url);
162
163        let request = self.add_auth(self.client.get(&url));
164        let response = request.send().await?;
165
166        if !response.status().is_success() {
167            let status = response.status();
168            let body = response.text().await.unwrap_or_default();
169            return Err(SocrataError::ApiError(status, body));
170        }
171
172        let data: Vec<T> = response.json().await?;
173        Ok(data)
174    }
175
176    /// Fetch ALL data from a dataset, automatically paginating through results.
177    ///
178    /// This method handles pagination internally and returns all records.
179    /// For very large datasets, consider using `fetch()` with explicit pagination.
180    ///
181    /// # Arguments
182    /// * `dataset_id` - The 4x4 ID of the dataset
183    /// * `order` - Optional SoQL order clause
184    /// * `where_clause` - Optional SoQL where clause
185    pub async fn get_all<T: DeserializeOwned>(
186        &self,
187        dataset_id: &str,
188        order: Option<&str>,
189        where_clause: Option<&str>,
190    ) -> Result<Vec<T>, SocrataError> {
191        let mut all_results: Vec<T> = Vec::new();
192        let mut offset = 0u32;
193
194        loop {
195            let page: Vec<T> = self.fetch(dataset_id, DEFAULT_LIMIT, offset, order, where_clause).await?;
196            let page_len = page.len() as u32;
197
198            if page.is_empty() {
199                break;
200            }
201
202            all_results.extend(page);
203
204            if page_len < DEFAULT_LIMIT {
205                // Last page (incomplete page means no more data)
206                break;
207            }
208
209            offset += DEFAULT_LIMIT;
210            info!("Fetched {} records so far...", all_results.len());
211        }
212
213        info!("Total records fetched: {}", all_results.len());
214        Ok(all_results)
215    }
216
217    /// Retrieve metadata about a specific dataset.
218    ///
219    /// # Arguments
220    /// * `dataset_id` - The 4x4 ID of the dataset
221    pub async fn get_metadata(&self, dataset_id: &str) -> Result<DatasetMetadata, SocrataError> {
222        let url = format!("{}/api/views/{}.json", self.base_url, dataset_id);
223
224        info!("Fetching metadata: {}", url);
225
226        let request = self.add_auth(self.client.get(&url));
227        let response = request.send().await?;
228
229        if !response.status().is_success() {
230            let status = response.status();
231            let body = response.text().await.unwrap_or_default();
232            return Err(SocrataError::ApiError(status, body));
233        }
234
235        let metadata: DatasetMetadata = response.json().await?;
236        Ok(metadata)
237    }
238
239    /// List all datasets in the domain.
240    ///
241    /// Uses the Socrata Catalog API to discover available datasets.
242    ///
243    /// # Arguments
244    /// * `limit` - Maximum number of datasets to return (0 = all)
245    /// * `offset` - Number of datasets to skip
246    pub async fn datasets(&self, limit: u32, offset: u32) -> Result<Vec<CatalogResource>, SocrataError> {
247        let mut url = format!("{}/api/catalog/v1?offset={}", self.base_url, offset);
248
249        if limit > 0 {
250            url.push_str(&format!("&limit={}", limit));
251        }
252
253        info!("Fetching catalog: {}", url);
254
255        let request = self.add_auth(self.client.get(&url));
256        let response = request.send().await?;
257
258        if !response.status().is_success() {
259            let status = response.status();
260            let body = response.text().await.unwrap_or_default();
261            return Err(SocrataError::ApiError(status, body));
262        }
263
264        let catalog: CatalogResponse = response.json().await?;
265        let resources: Vec<CatalogResource> = catalog.results.into_iter().map(|e| e.resource).collect();
266
267        info!("Found {} datasets", resources.len());
268        Ok(resources)
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// A client built with a token stores both the base URL and that exact token.
    #[test]
    fn test_client_creation() {
        let client = SocrataClient::new("https://www.datos.gov.co", Some("test_token".to_string()));
        assert_eq!(client.base_url, "https://www.datos.gov.co");
        assert_eq!(client.app_token.as_deref(), Some("test_token"));
    }

    /// A client built without a token stores the base URL and no token.
    #[test]
    fn test_client_without_token() {
        let client = SocrataClient::new("https://example.com", None);
        assert_eq!(client.base_url, "https://example.com");
        assert!(client.app_token.is_none());
    }
}
289