// pipestreamr 0.4.0
//
// Rust SDK and CLI for the PipeStreamr unified events API.
// Documentation
use uuid::Uuid;

use crate::error::Error;
use crate::models::*;

/// Client for the PipeStreamr public API.
///
/// Bundles the API base URL, the API key and a reusable HTTP client. Requests
/// that require authentication send the key as a bearer token.
///
/// # Example
/// ```no_run
/// use pipestreamr::PipeStreamrClient;
///
/// # async fn run() -> Result<(), pipestreamr::Error> {
/// let client = PipeStreamrClient::new("https://pipestreamr.com", "ps_your_api_key");
/// let events = client.list_events(&Default::default()).await?;
/// println!("{} events", events.events.len());
/// # Ok(())
/// # }
/// ```
pub struct PipeStreamrClient {
    /// Base URL every endpoint path is appended to; `new` stores it without trailing `/`.
    base_url: String,
    /// API key sent as a bearer token on authenticated requests.
    api_key: String,
    /// HTTP client reused for all requests made through this value.
    http: reqwest::Client,
}

impl PipeStreamrClient {
    /// Creates a client for the API rooted at `base_url`, authenticating with `api_key`.
    ///
    /// Any trailing `/` characters are stripped from `base_url` so endpoint paths
    /// can be appended with a single separator.
    pub fn new(base_url: &str, api_key: &str) -> Self {
        let base_url = base_url.trim_end_matches('/').to_owned();
        let api_key = api_key.to_owned();
        let http = reqwest::Client::new();
        Self {
            base_url,
            api_key,
            http,
        }
    }

    async fn check_response(&self, resp: reqwest::Response) -> Result<reqwest::Response, Error> {
        if resp.status().is_success() {
            return Ok(resp);
        }
        let status = resp.status().as_u16();
        let message = resp
            .json::<serde_json::Value>()
            .await
            .ok()
            .and_then(|v| v.get("error")?.as_str().map(String::from))
            .unwrap_or_else(|| "Unknown error".into());
        Err(Error::Api { status, message })
    }

    /// List projects belonging to the authenticated user.
    ///
    /// Issues an authenticated `GET {base_url}/api/v1/projects` and decodes the
    /// JSON body.
    ///
    /// # Errors
    /// Fails if the request cannot be sent, the server answers with a
    /// non-success status, or the body cannot be decoded.
    pub async fn list_projects(&self) -> Result<ProjectListResponse, Error> {
        let url = format!("{}/api/v1/projects", self.base_url);
        let request = self.http.get(url).bearer_auth(&self.api_key);
        let response = self.check_response(request.send().await?).await?;
        Ok(response.json::<ProjectListResponse>().await?)
    }

    /// List events with optional filters and cursor-based pagination.
    pub async fn list_events(&self, query: &EventQuery) -> Result<EventListResponse, Error> {
        let mut params: Vec<(&str, String)> = Vec::new();
        if let Some(ref pid) = query.project_id {
            params.push(("project_id", pid.to_string()));
        }
        if let Some(ref t) = query.event_type {
            params.push(("type", t.clone()));
        }
        if let Some(ref p) = query.platform {
            params.push(("platform", p.clone()));
        }
        if let Some(sev) = query.severity {
            params.push(("severity", sev.to_string()));
        }
        if let Some(ref svc) = query.service {
            params.push(("service", svc.clone()));
        }
        if let Some(ref f) = query.from {
            params.push(("from", f.clone()));
        }
        if let Some(ref s) = query.search {
            params.push(("search", s.clone()));
        }
        if let Some(s) = query.since {
            params.push(("since", s.to_rfc3339()));
        }
        if let Some(u) = query.until {
            params.push(("until", u.to_rfc3339()));
        }
        if let Some(l) = query.limit {
            params.push(("limit", l.to_string()));
        }
        if let Some(ref c) = query.cursor {
            params.push(("cursor", c.clone()));
        }

        let resp = self
            .http
            .get(format!("{}/api/v1/messages", self.base_url))
            .bearer_auth(&self.api_key)
            .query(&params)
            .send()
            .await?;
        let resp = self.check_response(resp).await?;
        Ok(resp.json().await?)
    }

    /// List all events matching the query, automatically following pagination cursors.
    ///
    /// Returns up to `max_events` total (default 1000). The query's `limit` field
    /// controls the page size (default 100 per page), and the query's `cursor`,
    /// if set, is where pagination starts.
    ///
    /// Fetching stops as soon as one of these holds:
    /// - the server reports that there are no more pages,
    /// - at least `max_events` events have been collected,
    /// - the server reports more pages but returns no cursor, or the same cursor
    ///   that was just used (following it would only re-fetch the same page).
    ///
    /// When `max_events` is `Some(0)` no request is made and an empty list is
    /// returned.
    ///
    /// # Errors
    /// Returns the first error produced while fetching a page; events from
    /// earlier pages are discarded in that case.
    pub async fn list_all_events(
        &self,
        query: &EventQuery,
        max_events: Option<usize>,
    ) -> Result<Vec<Event>, Error> {
        let max = max_events.unwrap_or(1000);
        if max == 0 {
            // Nothing can be returned, so do not spend a request on it.
            return Ok(Vec::new());
        }

        // The page size is the same for every request; compute it once.
        let page_size = Some(query.limit.unwrap_or(100));
        let mut all_events = Vec::new();
        let mut cursor: Option<String> = query.cursor.clone();

        loop {
            let page_query = EventQuery {
                project_id: query.project_id,
                event_type: query.event_type.clone(),
                platform: query.platform.clone(),
                severity: query.severity,
                service: query.service.clone(),
                from: query.from.clone(),
                search: query.search.clone(),
                since: query.since,
                until: query.until,
                limit: page_size,
                cursor: cursor.clone(),
            };
            let resp = self.list_events(&page_query).await?;
            all_events.extend(resp.events);

            if !resp.has_more || all_events.len() >= max {
                break;
            }

            // Only continue when the cursor actually advances. A missing or
            // unchanged cursor would request the same page again, producing
            // duplicates and potentially never terminating.
            match resp.next_cursor {
                Some(next) if cursor.as_deref() != Some(next.as_str()) => cursor = Some(next),
                _ => break,
            }
        }

        // The last page may overshoot the requested maximum.
        all_events.truncate(max);
        Ok(all_events)
    }

    /// Get a single event by ID.
    ///
    /// Issues an authenticated `GET {base_url}/api/v1/messages/{id}` and decodes
    /// the JSON body.
    ///
    /// # Errors
    /// Fails if the request cannot be sent, the server answers with a
    /// non-success status, or the body cannot be decoded.
    pub async fn get_event(&self, id: Uuid) -> Result<EventResponse, Error> {
        let url = format!("{}/api/v1/messages/{}", self.base_url, id);
        let request = self.http.get(url).bearer_auth(&self.api_key);
        let response = self.check_response(request.send().await?).await?;
        Ok(response.json::<EventResponse>().await?)
    }

    /// Get message statistics (total, today, by platform).
    ///
    /// When `project_id` is given it is sent as the `project_id` query parameter
    /// of an authenticated `GET {base_url}/api/v1/stats`; otherwise no filter
    /// parameter is sent.
    ///
    /// # Errors
    /// Fails if the request cannot be sent, the server answers with a
    /// non-success status, or the body cannot be decoded.
    pub async fn stats(&self, project_id: Option<Uuid>) -> Result<StatsResponse, Error> {
        // Zero or one parameter, depending on whether a project was given.
        let params: Vec<(&str, String)> = project_id
            .map(|id| ("project_id", id.to_string()))
            .into_iter()
            .collect();

        let url = format!("{}/api/v1/stats", self.base_url);
        let request = self
            .http
            .get(url)
            .bearer_auth(&self.api_key)
            .query(&params);
        let response = self.check_response(request.send().await?).await?;
        Ok(response.json::<StatsResponse>().await?)
    }

    /// Send log entries via the OTLP /v1/logs endpoint.
    ///
    /// The `project_id` parameter is required to specify which project receives the logs.
    /// Message fields (platform, from_id, etc.) are automatically mapped
    /// into OTLP attributes so Pipestreamr can parse them as message events.
    pub async fn send_logs(
        &self,
        service_name: &str,
        entries: &[LogEntry],
        project_id: Uuid,
    ) -> Result<(), Error> {
        let log_records: Vec<serde_json::Value> = entries
            .iter()
            .map(|e| {
                let mut record = serde_json::json!({
                    "body": { "stringValue": &e.body },
                });
                if let Some(ref sev) = e.severity_text {
                    record["severityText"] = serde_json::json!(sev);
                }

                // Collect all attributes: explicit attributes + message fields
                let mut kv: Vec<serde_json::Value> = Vec::new();

                // Add explicit attributes
                if let Some(ref attrs) = e.attributes {
                    if let Some(obj) = attrs.as_object() {
                        for (k, v) in obj {
                            kv.push(serde_json::json!({
                                "key": k,
                                "value": { "stringValue": v.to_string() }
                            }));
                        }
                    }
                }

                // Add message fields as attributes
                let fields: &[(&str, &Option<String>)] = &[
                    ("platform", &e.platform),
                    ("from_id", &e.from_id),
                    ("from_name", &e.from_name),
                    ("to_id", &e.to_id),
                    ("to_name", &e.to_name),
                    ("channel_id", &e.channel_id),
                    ("channel_name", &e.channel_name),
                    ("subject", &e.subject),
                    ("body_html", &e.body_html),
                    ("message_type", &e.message_type),
                    ("external_id", &e.external_id),
                ];
                for (key, val) in fields {
                    if let Some(v) = val {
                        kv.push(serde_json::json!({
                            "key": key,
                            "value": { "stringValue": v }
                        }));
                    }
                }
                if let Some(ref att) = e.attachments {
                    kv.push(serde_json::json!({
                        "key": "attachments",
                        "value": { "stringValue": att.to_string() }
                    }));
                }

                if !kv.is_empty() {
                    record["attributes"] = serde_json::json!(kv);
                }
                record
            })
            .collect();

        let payload = serde_json::json!({
            "resourceLogs": [{
                "resource": {
                    "attributes": [{
                        "key": "service.name",
                        "value": { "stringValue": service_name }
                    }]
                },
                "scopeLogs": [{
                    "logRecords": log_records
                }]
            }]
        });

        let resp = self
            .http
            .post(format!("{}/v1/logs", self.base_url))
            .bearer_auth(&self.api_key)
            .query(&[("project_id", project_id.to_string())])
            .json(&payload)
            .send()
            .await?;
        self.check_response(resp).await?;
        Ok(())
    }

    /// Check if the API server is healthy.
    ///
    /// Sends an unauthenticated `GET {base_url}/health`. Returns `Ok(true)` for
    /// a success status and `Ok(false)` for any other status.
    ///
    /// # Errors
    /// Fails if the request itself cannot be completed.
    pub async fn health(&self) -> Result<bool, Error> {
        let url = format!("{}/health", self.base_url);
        let status = self.http.get(url).send().await?.status();
        Ok(status.is_success())
    }
}