use reqwest::Client;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use thiserror::Error;
use tracing::{info, warn};
/// Page size (`$limit`) that [`SocrataClient::get_all`] requests on each call.
pub const DEFAULT_LIMIT: u32 = 1_000;
#[derive(Error, Debug)]
pub enum SocrataError {
#[error("API Request failed: {0}")]
RequestError(#[from] reqwest::Error),
#[error("API Request returned status {0}: {1}")]
ApiError(reqwest::StatusCode, String),
#[error("Failed to parse response: {0}")]
ParseError(#[from] serde_json::Error),
}
/// One column entry, as deserialized into [`DatasetMetadata::columns`].
///
/// JSON keys are camelCase (`fieldName`, `dataTypeName`, ...) per the
/// `rename_all` attribute below.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnMetadata {
/// Human-readable column name (JSON `name`).
pub name: String,
/// Column identifier (JSON `fieldName`). NOTE(review): presumably the name to
/// use in SoQL `$order` / `$where` clauses; confirm against the API docs.
pub field_name: String,
/// Column type name (JSON `dataTypeName`).
pub data_type_name: String,
/// Optional column description; `None` when absent from the JSON.
#[serde(default)]
pub description: Option<String>,
}
/// Dataset-level metadata returned by [`SocrataClient::get_metadata`]
/// (the `/api/views/{id}.json` endpoint).
///
/// JSON keys are camelCase (`rowsUpdatedAt`, `publicationStage`, ...).
/// Only `id` and `name` are required; every other field falls back to its
/// default (`None` / empty) when missing from the payload.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DatasetMetadata {
/// Dataset identifier.
pub id: String,
/// Display name of the dataset.
pub name: String,
/// Free-text description, if any.
#[serde(default)]
pub description: Option<String>,
/// Category label, if any.
#[serde(default)]
pub category: Option<String>,
/// Column descriptions; empty when the payload has no `columns` key.
#[serde(default)]
pub columns: Vec<ColumnMetadata>,
/// Last row-update timestamp (JSON `rowsUpdatedAt`).
/// NOTE(review): presumably Unix epoch seconds; confirm.
#[serde(default)]
pub rows_updated_at: Option<u64>,
/// Publication stage (JSON `publicationStage`), if reported.
#[serde(default)]
pub publication_stage: Option<String>,
}
/// One asset from the catalog endpoint, as returned by [`SocrataClient::datasets`].
///
/// JSON keys match the Rust field names, except `resource_type`, which is read
/// from `type`. Only `name` and `id` are required; the optional fields become
/// `None` when absent.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct CatalogResource {
/// Display name of the asset.
pub name: String,
/// Asset identifier.
pub id: String,
/// Free-text description, if any.
#[serde(default)]
pub description: Option<String>,
/// Domain-assigned category; `None` when the field is missing from the JSON.
#[serde(default)]
pub domain_category: Option<String>,
/// Asset type (JSON key `type`, which is a Rust keyword).
#[serde(rename = "type", default)]
pub resource_type: Option<String>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct CatalogEntry {
pub resource: CatalogResource,
}
/// Top-level body of the `/api/catalog/v1` response consumed by
/// [`SocrataClient::datasets`].
#[derive(Debug, Clone, Deserialize)]
pub struct CatalogResponse {
/// Entries on this page of results.
pub results: Vec<CatalogEntry>,
/// Match count reported by the API (JSON `resultSetSize`).
/// NOTE(review): presumably the total across all pages, not just `results`; confirm.
#[serde(rename = "resultSetSize")]
pub result_set_size: u32,
}
/// Async client for a single Socrata (SODA) domain.
///
/// `Clone` is derived so one configured client can be handed to several tasks.
/// `reqwest::Client` shares its connection pool between clones, so cloning is
/// cheap. `Debug` is intentionally *not* derived, so the app token cannot end
/// up in debug logs.
#[derive(Clone)]
pub struct SocrataClient {
    /// HTTP client used for every request.
    client: Client,
    /// Domain root, e.g. `https://www.datos.gov.co`; endpoint paths are appended to it.
    base_url: String,
    /// Optional app token, sent as the `X-App-Token` header when present.
    app_token: Option<String>,
}
impl SocrataClient {
pub fn new(base_url: &str, app_token: Option<String>) -> Self {
Self {
client: Client::new(),
base_url: base_url.to_string(),
app_token,
}
}
fn add_auth(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
if let Some(token) = &self.app_token {
request.header("X-App-Token", token)
} else {
warn!("No App Token provided. Rate limits may apply.");
request
}
}
pub async fn fetch<T: DeserializeOwned>(
&self,
dataset_id: &str,
limit: u32,
offset: u32,
order: Option<&str>,
where_clause: Option<&str>,
) -> Result<Vec<T>, SocrataError> {
let mut url = format!(
"{}/resource/{}.json?$limit={}&$offset={}",
self.base_url, dataset_id, limit, offset
);
if let Some(ord) = order {
url.push_str(&format!("&$order={}", ord));
}
if let Some(clause) = where_clause {
url.push_str(&format!("&$where={}", clause));
}
info!("SODA Request: {}", url);
let request = self.add_auth(self.client.get(&url));
let response = request.send().await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(SocrataError::ApiError(status, body));
}
let data: Vec<T> = response.json().await?;
Ok(data)
}
pub async fn get_all<T: DeserializeOwned>(
&self,
dataset_id: &str,
order: Option<&str>,
where_clause: Option<&str>,
) -> Result<Vec<T>, SocrataError> {
let mut all_results: Vec<T> = Vec::new();
let mut offset = 0u32;
loop {
let page: Vec<T> = self.fetch(dataset_id, DEFAULT_LIMIT, offset, order, where_clause).await?;
let page_len = page.len() as u32;
if page.is_empty() {
break;
}
all_results.extend(page);
if page_len < DEFAULT_LIMIT {
break;
}
offset += DEFAULT_LIMIT;
info!("Fetched {} records so far...", all_results.len());
}
info!("Total records fetched: {}", all_results.len());
Ok(all_results)
}
pub async fn get_metadata(&self, dataset_id: &str) -> Result<DatasetMetadata, SocrataError> {
let url = format!("{}/api/views/{}.json", self.base_url, dataset_id);
info!("Fetching metadata: {}", url);
let request = self.add_auth(self.client.get(&url));
let response = request.send().await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(SocrataError::ApiError(status, body));
}
let metadata: DatasetMetadata = response.json().await?;
Ok(metadata)
}
pub async fn datasets(&self, limit: u32, offset: u32) -> Result<Vec<CatalogResource>, SocrataError> {
let mut url = format!("{}/api/catalog/v1?offset={}", self.base_url, offset);
if limit > 0 {
url.push_str(&format!("&limit={}", limit));
}
info!("Fetching catalog: {}", url);
let request = self.add_auth(self.client.get(&url));
let response = request.send().await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(SocrataError::ApiError(status, body));
}
let catalog: CatalogResponse = response.json().await?;
let resources: Vec<CatalogResource> = catalog.results.into_iter().map(|e| e.resource).collect();
info!("Found {} datasets", resources.len());
Ok(resources)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A client built with a token keeps both the base URL and the token.
    #[test]
    fn test_client_creation() {
        let token = Some(String::from("test_token"));
        let client = SocrataClient::new("https://www.datos.gov.co", token);
        assert_eq!(client.base_url, "https://www.datos.gov.co");
        assert!(matches!(client.app_token, Some(_)));
    }

    /// A client built without a token stores `None`.
    #[test]
    fn test_client_without_token() {
        let client = SocrataClient::new("https://example.com", None);
        assert!(client.app_token.is_none());
    }
}