// socrata-sdk 0.1.0
//
// A Rust client for the Socrata Open Data API (SODA)
// Documentation
//! # Socrata SDK
//!
//! A high-performance, asynchronous Rust client for the Socrata Open Data API (SODA).
//!
//! ## Features
//! - **Async/Await**: Built on `reqwest` and `tokio`.
//! - **Generic**: Works with any `serde::Deserialize` type.
//! - **Pagination**: Automatic iteration over large datasets with `get_all()`.
//!
//! ## Example
//! ```ignore
//! use socrata_sdk::SocrataClient;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize)]
//! struct Contract { id: String }
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     let client = SocrataClient::new("https://www.datos.gov.co", None);
//!     let contracts: Vec<Contract> = client.fetch("abcd-1234", 100, 0, None, None).await?;
//!     println!("Fetched {} contracts", contracts.len());
//!     Ok(())
//! }
//! ```

use reqwest::Client;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use thiserror::Error;
use tracing::{info, warn};

/// Default page size for pagination (matches Socrata's default).
///
/// [`SocrataClient::get_all`] requests pages of this many records and treats a
/// page shorter than this as the end of the dataset.
pub const DEFAULT_LIMIT: u32 = 1000;

/// Errors returned by [`SocrataClient`] operations.
///
/// `reqwest::Error` and `serde_json::Error` convert into this type automatically
/// (via the `#[from]` attributes), so both can be propagated with `?`.
#[derive(Error, Debug)]
pub enum SocrataError {
    /// An error reported by `reqwest` while performing a request.
    #[error("API Request failed: {0}")]
    RequestError(#[from] reqwest::Error),
    /// The server answered with a non-success HTTP status.
    ///
    /// Carries the status code and the response body text (empty when the body
    /// could not be read).
    #[error("API Request returned status {0}: {1}")]
    ApiError(reqwest::StatusCode, String),
    /// An error reported by `serde_json`.
    #[error("Failed to parse response: {0}")]
    ParseError(#[from] serde_json::Error),
}

/// Metadata about a Socrata dataset column.
///
/// Fields are (de)serialized with camelCase JSON keys, e.g. `field_name` maps to
/// `fieldName` and `data_type_name` maps to `dataTypeName`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnMetadata {
    /// Column name (JSON key `name`); presumably the human-readable label — confirm.
    pub name: String,
    /// JSON key `fieldName`; presumably the identifier used in SoQL queries — confirm.
    pub field_name: String,
    /// JSON key `dataTypeName`; the column's data type as reported by the API.
    pub data_type_name: String,
    /// Optional column description; `None` when the key is absent.
    #[serde(default)]
    pub description: Option<String>,
}

/// Metadata about a Socrata dataset.
///
/// This is the type [`SocrataClient::get_metadata`] deserializes the
/// `/api/views/{id}.json` response into. Fields use camelCase JSON keys
/// (e.g. `rowsUpdatedAt`, `publicationStage`); every optional field falls back
/// to its default when the key is missing.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DatasetMetadata {
    /// Dataset identifier (JSON key `id`).
    pub id: String,
    /// Dataset name.
    pub name: String,
    /// Optional dataset description.
    #[serde(default)]
    pub description: Option<String>,
    /// Optional dataset category.
    #[serde(default)]
    pub category: Option<String>,
    /// Column descriptions; empty when the response has no `columns` key.
    #[serde(default)]
    pub columns: Vec<ColumnMetadata>,
    /// JSON key `rowsUpdatedAt`; presumably a Unix timestamp in seconds — TODO confirm.
    #[serde(default)]
    pub rows_updated_at: Option<u64>,
    /// JSON key `publicationStage`, as reported by the API.
    #[serde(default)]
    pub publication_stage: Option<String>,
}

/// Information about a dataset from the catalog API.
///
/// This is the `resource` object of a [`CatalogEntry`]. Fields are read from JSON
/// keys with the same snake_case names, except `resource_type`, which is read
/// from the `type` key.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct CatalogResource {
    /// Dataset name.
    pub name: String,
    /// Dataset identifier; presumably the 4x4 ID accepted by the data methods — confirm.
    pub id: String,
    /// Optional dataset description; `None` when the key is absent.
    #[serde(default)]
    pub description: Option<String>,
    /// JSON key `domain_category`; `None` when the key is absent.
    // NOTE(review): the catalog (Discovery) API appears to report `domain_category`
    // under each entry's `classification` object rather than under `resource`. If
    // so, this field is always `None` — confirm against a live response.
    #[serde(default)]
    pub domain_category: Option<String>,
    /// JSON key `type`; renamed because `type` is a Rust keyword.
    #[serde(rename = "type", default)]
    pub resource_type: Option<String>,
}

/// Wrapper for catalog API response.
///
/// One element of [`CatalogResponse::results`]; only the `resource` object is
/// deserialized, any other keys of the entry are ignored.
#[derive(Debug, Clone, Deserialize)]
pub struct CatalogEntry {
    /// The dataset described by this entry.
    pub resource: CatalogResource,
}

/// Response from the catalog API.
///
/// Top-level object deserialized by [`SocrataClient::datasets`].
#[derive(Debug, Clone, Deserialize)]
pub struct CatalogResponse {
    /// The catalog entries contained in this response.
    pub results: Vec<CatalogEntry>,
    /// JSON key `resultSetSize`; presumably the total number of matching entries,
    /// which may exceed `results.len()` — TODO confirm. Required: deserialization
    /// fails when the key is missing.
    #[serde(rename = "resultSetSize")]
    pub result_set_size: u32,
}

/// A client for the Socrata Open Data API (SODA).
///
/// Bundles an HTTP client, the base URL of one Socrata instance and an optional
/// app token. Construct it with [`SocrataClient::new`].
pub struct SocrataClient {
    /// HTTP client created once in `new` and used for every request.
    client: Client,
    /// Base URL that every request path (e.g. `/resource/...`) is appended to.
    base_url: String,
    /// App token sent as the `X-App-Token` header when present.
    app_token: Option<String>,
}

impl SocrataClient {
    /// Creates a new `SocrataClient`.
    ///
    /// # Arguments
    /// * `base_url` - The base domain of the Socrata instance (e.g., "https://www.datos.gov.co")
    /// * `app_token` - Optional Socrata App Token for higher rate limits.
    pub fn new(base_url: &str, app_token: Option<String>) -> Self {
        Self {
            client: Client::new(),
            base_url: base_url.to_string(),
            app_token,
        }
    }

    /// Add authentication headers to a request.
    fn add_auth(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
        if let Some(token) = &self.app_token {
            request.header("X-App-Token", token)
        } else {
            warn!("No App Token provided. Rate limits may apply.");
            request
        }
    }

    /// Fetch data from a specific dataset ID using SODA API.
    ///
    /// # Arguments
    /// * `dataset_id` - The 4x4 ID of the dataset (e.g. "jbjy-vk9h")
    /// * `limit` - Number of records to return
    /// * `offset` - Number of records to skip
    /// * `order` - SoQL order clause (e.g. "date DESC")
    /// * `where_clause` - Optional SoQL where clause
    pub async fn fetch<T: DeserializeOwned>(
        &self,
        dataset_id: &str,
        limit: u32,
        offset: u32,
        order: Option<&str>,
        where_clause: Option<&str>,
    ) -> Result<Vec<T>, SocrataError> {
        let mut url = format!(
            "{}/resource/{}.json?$limit={}&$offset={}",
            self.base_url, dataset_id, limit, offset
        );

        if let Some(ord) = order {
            url.push_str(&format!("&$order={}", ord));
        }

        if let Some(clause) = where_clause {
            url.push_str(&format!("&$where={}", clause));
        }

        info!("SODA Request: {}", url);

        let request = self.add_auth(self.client.get(&url));
        let response = request.send().await?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(SocrataError::ApiError(status, body));
        }

        let data: Vec<T> = response.json().await?;
        Ok(data)
    }

    /// Fetch ALL data from a dataset, automatically paginating through results.
    ///
    /// This method handles pagination internally and returns all records.
    /// For very large datasets, consider using `fetch()` with explicit pagination.
    ///
    /// # Arguments
    /// * `dataset_id` - The 4x4 ID of the dataset
    /// * `order` - Optional SoQL order clause
    /// * `where_clause` - Optional SoQL where clause
    pub async fn get_all<T: DeserializeOwned>(
        &self,
        dataset_id: &str,
        order: Option<&str>,
        where_clause: Option<&str>,
    ) -> Result<Vec<T>, SocrataError> {
        let mut all_results: Vec<T> = Vec::new();
        let mut offset = 0u32;

        loop {
            let page: Vec<T> = self.fetch(dataset_id, DEFAULT_LIMIT, offset, order, where_clause).await?;
            let page_len = page.len() as u32;

            if page.is_empty() {
                break;
            }

            all_results.extend(page);

            if page_len < DEFAULT_LIMIT {
                // Last page (incomplete page means no more data)
                break;
            }

            offset += DEFAULT_LIMIT;
            info!("Fetched {} records so far...", all_results.len());
        }

        info!("Total records fetched: {}", all_results.len());
        Ok(all_results)
    }

    /// Retrieve metadata about a specific dataset.
    ///
    /// # Arguments
    /// * `dataset_id` - The 4x4 ID of the dataset
    pub async fn get_metadata(&self, dataset_id: &str) -> Result<DatasetMetadata, SocrataError> {
        let url = format!("{}/api/views/{}.json", self.base_url, dataset_id);

        info!("Fetching metadata: {}", url);

        let request = self.add_auth(self.client.get(&url));
        let response = request.send().await?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(SocrataError::ApiError(status, body));
        }

        let metadata: DatasetMetadata = response.json().await?;
        Ok(metadata)
    }

    /// List all datasets in the domain.
    ///
    /// Uses the Socrata Catalog API to discover available datasets.
    ///
    /// # Arguments
    /// * `limit` - Maximum number of datasets to return (0 = all)
    /// * `offset` - Number of datasets to skip
    pub async fn datasets(&self, limit: u32, offset: u32) -> Result<Vec<CatalogResource>, SocrataError> {
        let mut url = format!("{}/api/catalog/v1?offset={}", self.base_url, offset);

        if limit > 0 {
            url.push_str(&format!("&limit={}", limit));
        }

        info!("Fetching catalog: {}", url);

        let request = self.add_auth(self.client.get(&url));
        let response = request.send().await?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(SocrataError::ApiError(status, body));
        }

        let catalog: CatalogResponse = response.json().await?;
        let resources: Vec<CatalogResource> = catalog.results.into_iter().map(|e| e.resource).collect();

        info!("Found {} datasets", resources.len());
        Ok(resources)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A client built with a token stores both the base URL and the token.
    #[test]
    fn test_client_creation() {
        let token = Some(String::from("test_token"));
        let sdk = SocrataClient::new("https://www.datos.gov.co", token);

        assert!(sdk.app_token.is_some());
        assert_eq!(sdk.base_url, "https://www.datos.gov.co");
    }

    /// A client built without a token stores no token.
    #[test]
    fn test_client_without_token() {
        let sdk = SocrataClient::new("https://example.com", None);

        assert!(sdk.app_token.is_none());
    }
}