pipestreamr 0.3.0

Rust SDK and CLI for the PipeStreamr unified events API
Documentation
use uuid::Uuid;

use crate::error::Error;
use crate::models::*;

/// Client for the PipeStreamr public API.
///
/// # Example
/// ```no_run
/// use pipestreamr::PipeStreamrClient;
///
/// # async fn run() -> Result<(), pipestreamr::Error> {
/// let client = PipeStreamrClient::new("https://api.pipestreamr.com", "ps_your_api_key");
/// let events = client.list_events(&Default::default()).await?;
/// println!("{} events", events.events.len());
/// # Ok(())
/// # }
/// ```
pub struct PipeStreamrClient {
    base_url: String,
    api_key: String,
    http: reqwest::Client,
}

impl PipeStreamrClient {
    pub fn new(base_url: &str, api_key: &str) -> Self {
        Self {
            base_url: base_url.trim_end_matches('/').to_string(),
            api_key: api_key.to_string(),
            http: reqwest::Client::new(),
        }
    }

    async fn check_response(&self, resp: reqwest::Response) -> Result<reqwest::Response, Error> {
        if resp.status().is_success() {
            return Ok(resp);
        }
        let status = resp.status().as_u16();
        let message = resp
            .json::<serde_json::Value>()
            .await
            .ok()
            .and_then(|v| v.get("error")?.as_str().map(String::from))
            .unwrap_or_else(|| "Unknown error".into());
        Err(Error::Api { status, message })
    }

    /// List events with optional filters and cursor-based pagination.
    pub async fn list_events(&self, query: &EventQuery) -> Result<EventListResponse, Error> {
        let mut params: Vec<(&str, String)> = Vec::new();
        if let Some(ref t) = query.event_type {
            params.push(("type", t.clone()));
        }
        if let Some(ref p) = query.platform {
            params.push(("platform", p.clone()));
        }
        if let Some(sev) = query.severity {
            params.push(("severity", sev.to_string()));
        }
        if let Some(ref svc) = query.service {
            params.push(("service", svc.clone()));
        }
        if let Some(ref f) = query.from {
            params.push(("from", f.clone()));
        }
        if let Some(ref s) = query.search {
            params.push(("search", s.clone()));
        }
        if let Some(s) = query.since {
            params.push(("since", s.to_rfc3339()));
        }
        if let Some(u) = query.until {
            params.push(("until", u.to_rfc3339()));
        }
        if let Some(l) = query.limit {
            params.push(("limit", l.to_string()));
        }
        if let Some(ref c) = query.cursor {
            params.push(("cursor", c.clone()));
        }

        let resp = self
            .http
            .get(format!("{}/api/v1/messages", self.base_url))
            .bearer_auth(&self.api_key)
            .query(&params)
            .send()
            .await?;
        let resp = self.check_response(resp).await?;
        Ok(resp.json().await?)
    }

    /// Get a single event by ID.
    pub async fn get_event(&self, id: Uuid) -> Result<EventResponse, Error> {
        let resp = self
            .http
            .get(format!("{}/api/v1/messages/{}", self.base_url, id))
            .bearer_auth(&self.api_key)
            .send()
            .await?;
        let resp = self.check_response(resp).await?;
        Ok(resp.json().await?)
    }

    /// Get message statistics (total, today, by platform).
    pub async fn stats(&self) -> Result<StatsResponse, Error> {
        let resp = self
            .http
            .get(format!("{}/api/v1/stats", self.base_url))
            .bearer_auth(&self.api_key)
            .send()
            .await?;
        let resp = self.check_response(resp).await?;
        Ok(resp.json().await?)
    }

    /// Send log entries via the OTLP /v1/logs endpoint.
    pub async fn send_logs(
        &self,
        service_name: &str,
        entries: &[LogEntry],
    ) -> Result<(), Error> {
        let log_records: Vec<serde_json::Value> = entries
            .iter()
            .map(|e| {
                let mut record = serde_json::json!({
                    "body": { "stringValue": &e.body },
                });
                if let Some(ref sev) = e.severity_text {
                    record["severityText"] = serde_json::json!(sev);
                }
                if let Some(ref attrs) = e.attributes {
                    if let Some(obj) = attrs.as_object() {
                        let kv: Vec<serde_json::Value> = obj
                            .iter()
                            .map(|(k, v)| {
                                serde_json::json!({
                                    "key": k,
                                    "value": { "stringValue": v.to_string() }
                                })
                            })
                            .collect();
                        record["attributes"] = serde_json::json!(kv);
                    }
                }
                record
            })
            .collect();

        let payload = serde_json::json!({
            "resourceLogs": [{
                "resource": {
                    "attributes": [{
                        "key": "service.name",
                        "value": { "stringValue": service_name }
                    }]
                },
                "scopeLogs": [{
                    "logRecords": log_records
                }]
            }]
        });

        let resp = self
            .http
            .post(format!("{}/v1/logs", self.base_url))
            .bearer_auth(&self.api_key)
            .json(&payload)
            .send()
            .await?;
        self.check_response(resp).await?;
        Ok(())
    }

    /// Check if the API server is healthy.
    pub async fn health(&self) -> Result<bool, Error> {
        let resp = self
            .http
            .get(format!("{}/health", self.base_url))
            .send()
            .await?;
        Ok(resp.status().is_success())
    }
}