// pipestreamr 0.2.0
//
// Rust SDK and CLI for the PipeStreamr unified events API.
// Documentation
use uuid::Uuid;

use crate::error::Error;
use crate::models::*;

/// Client for the PipeStreamr public API.
///
/// Bundles the API base URL, the API key, and a `reqwest::Client` that is
/// reused for every request made through this value. Cloning is supported so
/// the client can be handed to several tasks; the `Debug` output deliberately
/// omits the API key.
///
/// # Example
/// ```no_run
/// use pipestreamr::PipeStreamrClient;
///
/// # async fn run() -> Result<(), pipestreamr::Error> {
/// let client = PipeStreamrClient::new("https://api.pipestreamr.com", "ps_your_api_key");
/// let events = client.list_events(&Default::default()).await?;
/// println!("{} events", events.events.len());
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct PipeStreamrClient {
    /// Root URL that endpoint paths such as `/api/v1/events` are appended to.
    base_url: String,
    /// Secret key sent as a bearer token on authenticated requests.
    api_key: String,
    /// HTTP client shared by all requests issued from this value.
    http: reqwest::Client,
}

// Written by hand rather than derived: a derived impl would print `api_key`,
// leaking the secret into logs and panic messages.
impl std::fmt::Debug for PipeStreamrClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PipeStreamrClient")
            .field("base_url", &self.base_url)
            .field("api_key", &"<redacted>")
            .finish_non_exhaustive()
    }
}

impl PipeStreamrClient {
    pub fn new(base_url: &str, api_key: &str) -> Self {
        Self {
            base_url: base_url.trim_end_matches('/').to_string(),
            api_key: api_key.to_string(),
            http: reqwest::Client::new(),
        }
    }

    async fn check_response(&self, resp: reqwest::Response) -> Result<reqwest::Response, Error> {
        if resp.status().is_success() {
            return Ok(resp);
        }
        let status = resp.status().as_u16();
        let message = resp
            .json::<serde_json::Value>()
            .await
            .ok()
            .and_then(|v| v.get("error")?.as_str().map(String::from))
            .unwrap_or_else(|| "Unknown error".into());
        Err(Error::Api { status, message })
    }

    /// List events with optional filters and cursor-based pagination.
    pub async fn list_events(&self, query: &EventQuery) -> Result<EventListResponse, Error> {
        let mut params: Vec<(&str, String)> = Vec::new();
        if let Some(ref t) = query.event_type {
            params.push(("type", t.clone()));
        }
        if let Some(ref p) = query.platform {
            params.push(("platform", p.clone()));
        }
        if let Some(sev) = query.severity {
            params.push(("severity", sev.to_string()));
        }
        if let Some(ref svc) = query.service {
            params.push(("service", svc.clone()));
        }
        if let Some(ref f) = query.from {
            params.push(("from", f.clone()));
        }
        if let Some(ref s) = query.search {
            params.push(("search", s.clone()));
        }
        if let Some(s) = query.since {
            params.push(("since", s.to_rfc3339()));
        }
        if let Some(u) = query.until {
            params.push(("until", u.to_rfc3339()));
        }
        if let Some(l) = query.limit {
            params.push(("limit", l.to_string()));
        }
        if let Some(ref c) = query.cursor {
            params.push(("cursor", c.clone()));
        }

        let resp = self
            .http
            .get(format!("{}/api/v1/events", self.base_url))
            .bearer_auth(&self.api_key)
            .query(&params)
            .send()
            .await?;
        let resp = self.check_response(resp).await?;
        Ok(resp.json().await?)
    }

    /// Get a single event by ID.
    pub async fn get_event(&self, id: Uuid) -> Result<EventResponse, Error> {
        let resp = self
            .http
            .get(format!("{}/api/v1/events/{}", self.base_url, id))
            .bearer_auth(&self.api_key)
            .send()
            .await?;
        let resp = self.check_response(resp).await?;
        Ok(resp.json().await?)
    }

    /// Get message statistics (total, today, by platform).
    pub async fn stats(&self) -> Result<StatsResponse, Error> {
        let resp = self
            .http
            .get(format!("{}/api/v1/stats", self.base_url))
            .bearer_auth(&self.api_key)
            .send()
            .await?;
        let resp = self.check_response(resp).await?;
        Ok(resp.json().await?)
    }

    /// Check if the API server is healthy.
    pub async fn health(&self) -> Result<bool, Error> {
        let resp = self
            .http
            .get(format!("{}/health", self.base_url))
            .send()
            .await?;
        Ok(resp.status().is_success())
    }
}