1use reqwest::Client;
28use serde::{de::DeserializeOwned, Deserialize, Serialize};
29use thiserror::Error;
30use tracing::{info, warn};
31
/// Page size, in records, requested per call when paging through a dataset.
pub const DEFAULT_LIMIT: u32 = 1_000;
34
35#[derive(Error, Debug)]
36pub enum SocrataError {
37 #[error("API Request failed: {0}")]
38 RequestError(#[from] reqwest::Error),
39 #[error("API Request returned status {0}: {1}")]
40 ApiError(reqwest::StatusCode, String),
41 #[error("Failed to parse response: {0}")]
42 ParseError(#[from] serde_json::Error),
43}
44
45#[derive(Debug, Clone, Deserialize, Serialize)]
47#[serde(rename_all = "camelCase")]
48pub struct ColumnMetadata {
49 pub name: String,
50 pub field_name: String,
51 pub data_type_name: String,
52 #[serde(default)]
53 pub description: Option<String>,
54}
55
56#[derive(Debug, Clone, Deserialize, Serialize)]
58#[serde(rename_all = "camelCase")]
59pub struct DatasetMetadata {
60 pub id: String,
61 pub name: String,
62 #[serde(default)]
63 pub description: Option<String>,
64 #[serde(default)]
65 pub category: Option<String>,
66 #[serde(default)]
67 pub columns: Vec<ColumnMetadata>,
68 #[serde(default)]
69 pub rows_updated_at: Option<u64>,
70 #[serde(default)]
71 pub publication_stage: Option<String>,
72}
73
74#[derive(Debug, Clone, Deserialize, Serialize)]
76pub struct CatalogResource {
77 pub name: String,
78 pub id: String,
79 #[serde(default)]
80 pub description: Option<String>,
81 #[serde(default)]
82 pub domain_category: Option<String>,
83 #[serde(rename = "type", default)]
84 pub resource_type: Option<String>,
85}
86
87#[derive(Debug, Clone, Deserialize)]
89pub struct CatalogEntry {
90 pub resource: CatalogResource,
91}
92
93#[derive(Debug, Clone, Deserialize)]
95pub struct CatalogResponse {
96 pub results: Vec<CatalogEntry>,
97 #[serde(rename = "resultSetSize")]
98 pub result_set_size: u32,
99}
100
101pub struct SocrataClient {
103 client: Client,
104 base_url: String,
105 app_token: Option<String>,
106}
107
108impl SocrataClient {
109 pub fn new(base_url: &str, app_token: Option<String>) -> Self {
115 Self {
116 client: Client::new(),
117 base_url: base_url.to_string(),
118 app_token,
119 }
120 }
121
122 fn add_auth(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
124 if let Some(token) = &self.app_token {
125 request.header("X-App-Token", token)
126 } else {
127 warn!("No App Token provided. Rate limits may apply.");
128 request
129 }
130 }
131
132 pub async fn fetch<T: DeserializeOwned>(
141 &self,
142 dataset_id: &str,
143 limit: u32,
144 offset: u32,
145 order: Option<&str>,
146 where_clause: Option<&str>,
147 ) -> Result<Vec<T>, SocrataError> {
148 let mut url = format!(
149 "{}/resource/{}.json?$limit={}&$offset={}",
150 self.base_url, dataset_id, limit, offset
151 );
152
153 if let Some(ord) = order {
154 url.push_str(&format!("&$order={}", ord));
155 }
156
157 if let Some(clause) = where_clause {
158 url.push_str(&format!("&$where={}", clause));
159 }
160
161 info!("SODA Request: {}", url);
162
163 let request = self.add_auth(self.client.get(&url));
164 let response = request.send().await?;
165
166 if !response.status().is_success() {
167 let status = response.status();
168 let body = response.text().await.unwrap_or_default();
169 return Err(SocrataError::ApiError(status, body));
170 }
171
172 let data: Vec<T> = response.json().await?;
173 Ok(data)
174 }
175
176 pub async fn get_all<T: DeserializeOwned>(
186 &self,
187 dataset_id: &str,
188 order: Option<&str>,
189 where_clause: Option<&str>,
190 ) -> Result<Vec<T>, SocrataError> {
191 let mut all_results: Vec<T> = Vec::new();
192 let mut offset = 0u32;
193
194 loop {
195 let page: Vec<T> = self.fetch(dataset_id, DEFAULT_LIMIT, offset, order, where_clause).await?;
196 let page_len = page.len() as u32;
197
198 if page.is_empty() {
199 break;
200 }
201
202 all_results.extend(page);
203
204 if page_len < DEFAULT_LIMIT {
205 break;
207 }
208
209 offset += DEFAULT_LIMIT;
210 info!("Fetched {} records so far...", all_results.len());
211 }
212
213 info!("Total records fetched: {}", all_results.len());
214 Ok(all_results)
215 }
216
217 pub async fn get_metadata(&self, dataset_id: &str) -> Result<DatasetMetadata, SocrataError> {
222 let url = format!("{}/api/views/{}.json", self.base_url, dataset_id);
223
224 info!("Fetching metadata: {}", url);
225
226 let request = self.add_auth(self.client.get(&url));
227 let response = request.send().await?;
228
229 if !response.status().is_success() {
230 let status = response.status();
231 let body = response.text().await.unwrap_or_default();
232 return Err(SocrataError::ApiError(status, body));
233 }
234
235 let metadata: DatasetMetadata = response.json().await?;
236 Ok(metadata)
237 }
238
239 pub async fn datasets(&self, limit: u32, offset: u32) -> Result<Vec<CatalogResource>, SocrataError> {
247 let mut url = format!("{}/api/catalog/v1?offset={}", self.base_url, offset);
248
249 if limit > 0 {
250 url.push_str(&format!("&limit={}", limit));
251 }
252
253 info!("Fetching catalog: {}", url);
254
255 let request = self.add_auth(self.client.get(&url));
256 let response = request.send().await?;
257
258 if !response.status().is_success() {
259 let status = response.status();
260 let body = response.text().await.unwrap_or_default();
261 return Err(SocrataError::ApiError(status, body));
262 }
263
264 let catalog: CatalogResponse = response.json().await?;
265 let resources: Vec<CatalogResource> = catalog.results.into_iter().map(|e| e.resource).collect();
266
267 info!("Found {} datasets", resources.len());
268 Ok(resources)
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// A client built with a token keeps the base URL and stores the token.
    #[test]
    fn test_client_creation() {
        let token = Some(String::from("test_token"));
        let client = SocrataClient::new("https://www.datos.gov.co", token);

        assert_eq!(client.base_url, "https://www.datos.gov.co");
        assert!(client.app_token.is_some());
    }

    /// A client built without a token stores `None`.
    #[test]
    fn test_client_without_token() {
        let token: Option<String> = None;
        let client = SocrataClient::new("https://example.com", token);

        assert!(client.app_token.is_none());
    }
}
289