pipestreamr 0.2.0

Rust SDK and CLI for the PipeStreamr unified events API
Documentation
use std::process;

use chrono::{DateTime, Utc};
use clap::{Parser, Subcommand};
use uuid::Uuid;

use pipestreamr::{EventQuery, PipeStreamrClient};

#[derive(Parser)]
#[command(name = "pipestreamr", version, about = "CLI for the PipeStreamr API")]
struct Cli {
    /// API base URL
    #[arg(long, env = "PIPESTREAMR_URL", default_value = "https://api.pipestreamr.com")]
    url: String,

    /// API key (or set PIPESTREAMR_API_KEY)
    #[arg(long, env = "PIPESTREAMR_API_KEY")]
    api_key: Option<String>,

    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    /// List events (messages and/or logs)
    Events {
        /// Filter by type: "log" or "message"
        #[arg(long, name = "type")]
        event_type: Option<String>,
        /// Filter by platform (e.g. discord, telegram, gmail)
        #[arg(long)]
        platform: Option<String>,
        /// Minimum severity number (for logs)
        #[arg(long)]
        severity: Option<i16>,
        /// Filter by service name (for logs)
        #[arg(long)]
        service: Option<String>,
        /// Filter by sender ID (for messages)
        #[arg(long)]
        from: Option<String>,
        /// Full-text search
        #[arg(long)]
        search: Option<String>,
        /// Events after this time (RFC 3339)
        #[arg(long)]
        since: Option<DateTime<Utc>>,
        /// Events before this time (RFC 3339)
        #[arg(long)]
        until: Option<DateTime<Utc>>,
        /// Max results (1-100)
        #[arg(long)]
        limit: Option<i64>,
        /// Pagination cursor
        #[arg(long)]
        cursor: Option<String>,
    },
    /// Get a single event by ID
    Event {
        /// Event UUID
        id: Uuid,
    },
    /// Show message statistics
    Stats,
    /// Check API health
    Health,
}

#[tokio::main]
async fn main() {
    let cli = Cli::parse();

    let api_key = match cli.api_key {
        Some(k) => k,
        None => {
            eprintln!("Error: missing API key. Set PIPESTREAMR_API_KEY or pass --api-key");
            process::exit(1);
        }
    };

    let client = PipeStreamrClient::new(&cli.url, &api_key);

    let result: Result<String, pipestreamr::Error> = match cli.command {
        Command::Events {
            event_type,
            platform,
            severity,
            service,
            from,
            search,
            since,
            until,
            limit,
            cursor,
        } => {
            let query = EventQuery {
                event_type,
                platform,
                severity,
                service,
                from,
                search,
                since,
                until,
                limit,
                cursor,
            };
            client
                .list_events(&query)
                .await
                .map(|r| serde_json::to_string_pretty(&r).unwrap())
        }
        Command::Event { id } => client
            .get_event(id)
            .await
            .map(|r| serde_json::to_string_pretty(&r).unwrap()),
        Command::Stats => client
            .stats()
            .await
            .map(|r| serde_json::to_string_pretty(&r).unwrap()),
        Command::Health => client.health().await.map(|ok| {
            if ok {
                "API is healthy".to_string()
            } else {
                "API is unhealthy".to_string()
            }
        }),
    };

    match result {
        Ok(output) => println!("{output}"),
        Err(e) => {
            eprintln!("Error: {e}");
            process::exit(1);
        }
    }
}