cellos-ctl 0.5.0

cellctl — kubectl-style CLI for CellOS execution cells and formations. Thin HTTP client over cellos-server with apply/get/describe/logs/events/webui.
Documentation
//! `cellctl events [--formation NAME] [--follow]`
//!
//! Tails the cellos-server `/ws/events` WebSocket. The server has already
//! subscribed to NATS/JetStream on behalf of the client and forwards
//! CloudEvents as they arrive (CHATROOM Session 16, fern @11:14).
//!
//! No `--follow` flag falls back to a one-shot HTTP fetch of recent events for
//! environments where WebSocket isn't viable (a common kubectl pattern).

use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::Message;

use crate::client::CellosClient;
use crate::exit::{CtlError, CtlResult};
use crate::model::CloudEvent;

pub async fn run(client: &CellosClient, formation: Option<&str>, follow: bool) -> CtlResult<()> {
    if !follow {
        return one_shot(client, formation).await;
    }
    follow_ws(client, formation).await
}

async fn one_shot(client: &CellosClient, formation: Option<&str>) -> CtlResult<()> {
    let mut path = "/v1/events".to_string();
    if let Some(f) = formation {
        path.push_str(&format!("?formation={}", urlencode(f)));
    }
    let resp = client.get_stream(&path).await?;
    let body = resp.text().await?;
    let trimmed = body.trim_start();
    if trimmed.starts_with('[') {
        let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
        for ev in arr {
            print_event(&ev);
        }
    } else {
        for line in body.lines() {
            if line.trim().is_empty() {
                continue;
            }
            let ev: CloudEvent = serde_json::from_str(line)
                .map_err(|e| CtlError::api(format!("parse event: {e}")))?;
            print_event(&ev);
        }
    }
    Ok(())
}

async fn follow_ws(client: &CellosClient, formation: Option<&str>) -> CtlResult<()> {
    let mut path = "/ws/events".to_string();
    if let Some(f) = formation {
        path.push_str(&format!("?formation={}", urlencode(f)));
    }
    let url = client.ws_url(&path)?;

    // tokio-tungstenite's connect_async accepts http::Request, but to keep deps
    // small we build a simple request via Url and pass the auth header on the
    // first message instead — many servers accept query-string auth too.
    // For now, pass through the URL only; servers that require Bearer over WS
    // can support the `Sec-WebSocket-Protocol: bearer.<token>` convention later.
    let (ws_stream, _resp) = tokio_tungstenite::connect_async(&url)
        .await
        .map_err(|e| CtlError::api(format!("ws connect {url}: {e}")))?;

    let (mut tx, mut rx) = ws_stream.split();

    loop {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                let _ = tx.send(Message::Close(None)).await;
                eprintln!();
                return Ok(());
            }
            msg = rx.next() => match msg {
                Some(Ok(Message::Text(t))) => {
                    match serde_json::from_str::<CloudEvent>(&t) {
                        Ok(ev) => print_event(&ev),
                        Err(_) => println!("{t}"),
                    }
                }
                Some(Ok(Message::Binary(b))) => {
                    if let Ok(s) = std::str::from_utf8(&b) {
                        match serde_json::from_str::<CloudEvent>(s) {
                            Ok(ev) => print_event(&ev),
                            Err(_) => println!("{s}"),
                        }
                    }
                }
                Some(Ok(Message::Ping(payload))) => {
                    let _ = tx.send(Message::Pong(payload)).await;
                }
                Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
                Some(Ok(Message::Close(_))) | None => return Ok(()),
                Some(Err(e)) => {
                    return Err(CtlError::api(format!("ws: {e}")));
                }
            }
        }
    }
}

fn print_event(ev: &CloudEvent) {
    let ts = ev.time.as_deref().unwrap_or("-");
    let kind = ev.event_type.as_deref().unwrap_or("event");
    let subject = ev.subject.as_deref().unwrap_or("-");
    let data = ev
        .data
        .as_ref()
        .map(|v| serde_json::to_string(v).unwrap_or_default())
        .unwrap_or_default();
    if data.is_empty() {
        println!("{ts}  {kind}  {subject}");
    } else {
        println!("{ts}  {kind}  {subject}  {data}");
    }
}

fn urlencode(s: &str) -> String {
    url::form_urlencoded::byte_serialize(s.as_bytes()).collect()
}