// cellos-ctl 0.5.3
//
// cellctl — kubectl-style CLI for CellOS execution cells and formations. Thin HTTP client over cellos-server with apply/get/describe/logs/events/webui.
// Documentation
//! `cellctl events [--formation NAME] [--follow] [--since SEQ] [--limit N]`
//!
//! Two delivery paths, same render:
//!
//! * **One-shot** (no `--follow`) — HTTP `GET /v1/events` returns a
//!   bounded page of recent events from JetStream. Suitable for
//!   environments where WebSocket isn't viable (corporate proxies,
//!   kubectl-style scripted pulls). Response shape mirrors the WS
//!   envelope so the wire is uniform: `{events: [{seq, event}], cursor}`.
//!
//! * **Follow** (`--follow`) — WebSocket `/ws/events`. Live tail of
//!   CloudEvents as the server forwards them from JetStream. Optional
//!   `--since` resumes at a cursor without re-emitting frames the
//!   client has already seen (ADR-0015 §D3).
//!
//! ## EVT-002 — WebSocket Bearer auth
//!
//! Browsers can't set custom WebSocket headers, so the web view's WS
//! path falls back to a localhost-proxy auth model (see ADR-0017).
//! cellctl is a CLI — it CAN set headers, so it does: the upgrade
//! request is built via `IntoClientRequest::into_client_request` and
//! `Authorization: Bearer <token>` is installed directly. The
//! `Sec-WebSocket-Protocol: bearer.<token>` subprotocol convention was
//! considered and rejected — token-bearing subprotocols are a
//! workaround for the browser limitation, not the right tool when you
//! have a direct header path.

use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::handshake::client::Request as WsRequest;
use tokio_tungstenite::tungstenite::http::{header as ws_header, HeaderValue as WsHeaderValue};
use tokio_tungstenite::tungstenite::Message;

use crate::client::CellosClient;
use crate::exit::{CtlError, CtlResult};
use crate::model::CloudEvent;

/// Wire shape returned by `GET /v1/events`. Mirrors
/// `cellos_server::routes::events::EventsResponse`. We keep the type
/// private to this module — every cellctl command renders directly to
/// stdout, there is no shared client-side projection.
///
/// Both fields carry `#[serde(default)]`, so a body that omits either
/// key still deserializes (to an empty list / `0` respectively).
#[derive(Debug, serde::Deserialize)]
struct EventsResponse {
    /// Page of `{seq, event}` envelopes; empty when the key is absent.
    #[serde(default)]
    events: Vec<EventEnvelope>,
    /// Paging cursor reported by the server. Deserialized but not read
    /// by the rendering code in this module, hence the `dead_code` allow.
    #[serde(default)]
    #[allow(dead_code)] // kept for debug output / future paging
    cursor: u64,
}

/// One element of `EventsResponse::events`: a CloudEvent paired with
/// its sequence number.
#[derive(Debug, serde::Deserialize)]
struct EventEnvelope {
    /// Sequence number attached to the event; defaults to `0` when the
    /// key is absent.
    #[serde(default)]
    #[allow(dead_code)] // not rendered today: `print_event` only receives the inner CloudEvent
    seq: u64,
    /// The wrapped event. Required — there is no serde default for it.
    event: CloudEvent,
}

/// Entry point for `cellctl events`.
///
/// Without `follow` this performs a single HTTP pull (`one_shot`);
/// with `follow` it opens the WebSocket live tail (`follow_ws`).
/// `limit` only applies to the HTTP pull — when combined with `follow`
/// it is ignored and a warning is written to stderr.
pub async fn run(
    client: &CellosClient,
    formation: Option<&str>,
    follow: bool,
    since: Option<u64>,
    limit: Option<usize>,
) -> CtlResult<()> {
    if follow {
        // A live tail has no page size, so `--limit` cannot be honoured.
        // This is a warning rather than a usage error so that a command
        // line copied from a one-shot invocation keeps working.
        if limit.is_some() {
            eprintln!("cellctl: warning: --limit ignored with --follow");
        }
        follow_ws(client, formation, since).await
    } else {
        one_shot(client, formation, since, limit).await
    }
}

/// Fetch one bounded page from `GET /v1/events` and print every event.
///
/// Three body shapes are accepted:
///
/// * the canonical EVT-001 envelope `{events: [{seq, event}], cursor}`,
///   parsed strictly so a malformed envelope is reported, not masked;
/// * a legacy bare JSON array of CloudEvents;
/// * legacy NDJSON — one CloudEvent object per line.
///
/// # Errors
///
/// Propagates whatever the client returns for the request / body read,
/// and fails when the body cannot be decoded as the shape it was
/// classified as.
async fn one_shot(
    client: &CellosClient,
    formation: Option<&str>,
    since: Option<u64>,
    limit: Option<usize>,
) -> CtlResult<()> {
    let path = one_shot_path(formation, since, limit);
    let resp = client.get_stream(&path).await?;
    let body = resp.text().await?;
    let trimmed = body.trim_start();

    // Legacy shape 1: bare JSON array of CloudEvents.
    if trimmed.starts_with('[') {
        let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
        for ev in &arr {
            print_event(ev);
        }
        return Ok(());
    }

    // Canonical envelope. NDJSON lines also begin with `{`, so the
    // leading byte alone cannot pick the parser: classify the body
    // first, then parse the envelope strictly.
    if trimmed.starts_with('{') && !is_legacy_ndjson(trimmed) {
        let page: EventsResponse = serde_json::from_str(trimmed)
            .map_err(|e| CtlError::api(format!("parse events response: {e}")))?;
        for env in &page.events {
            print_event(&env.event);
        }
        return Ok(());
    }

    // Legacy shape 2: NDJSON. Kept so a cellctl pointed at an older
    // server (or a test fixture) still renders something useful.
    for line in body.lines() {
        if line.trim().is_empty() {
            continue;
        }
        let ev: CloudEvent = serde_json::from_str(line)
            .map_err(|e| CtlError::api(format!("parse event: {e}")))?;
        print_event(&ev);
    }
    Ok(())
}

/// Decide whether a body whose first non-blank byte is `{` is legacy
/// NDJSON rather than the canonical envelope.
///
/// It is NDJSON when it holds more than one top-level JSON document, or
/// when its single object is non-empty and carries neither of the
/// envelope keys (`events`, `cursor`) — i.e. it is one bare event.
/// Anything else, including a body that fails to parse at all, is left
/// to the strict envelope parse so the caller surfaces the error.
fn is_legacy_ndjson(trimmed: &str) -> bool {
    let mut docs = serde_json::Deserializer::from_str(trimmed).into_iter::<serde_json::Value>();
    let first = match docs.next() {
        Some(Ok(value)) => value,
        _ => return false,
    };
    // A second well-formed document means line-delimited events. Trailing
    // garbage (`Some(Err(_))`) does not count: that is a broken envelope.
    if matches!(docs.next(), Some(Ok(_))) {
        return true;
    }
    match first.as_object() {
        Some(obj) => {
            !obj.is_empty() && !obj.contains_key("events") && !obj.contains_key("cursor")
        }
        None => false,
    }
}

/// Compose the relative URL for the one-shot endpoint.
///
/// Starts from `/v1/events` and appends `formation`, `since` and
/// `limit` — in that order, each only when supplied — as a query
/// string. The formation name is passed through `urlencode`. Kept as a
/// pure function so the query composition is testable without a server.
fn one_shot_path(formation: Option<&str>, since: Option<u64>, limit: Option<usize>) -> String {
    let query: Vec<String> = formation
        .map(|name| format!("formation={}", urlencode(name)))
        .into_iter()
        .chain(since.map(|seq| format!("since={seq}")))
        .chain(limit.map(|count| format!("limit={count}")))
        .collect();

    if query.is_empty() {
        String::from("/v1/events")
    } else {
        format!("/v1/events?{}", query.join("&"))
    }
}

/// Live-tail events over the `/ws/events` WebSocket until the server
/// closes the stream, the stream ends, or the user presses Ctrl-C.
///
/// `formation` and `since` are forwarded as query parameters (see
/// `ws_path`). Text frames, and binary frames that are valid UTF-8, are
/// rendered through `render_ws_frame`.
///
/// # Errors
///
/// Fails when the URL or upgrade request cannot be built, when the
/// WebSocket handshake fails, or when the stream yields a protocol
/// error mid-tail.
async fn follow_ws(
    client: &CellosClient,
    formation: Option<&str>,
    since: Option<u64>,
) -> CtlResult<()> {
    let path = ws_path(formation, since);
    let url = client.ws_url(&path)?;

    // EVT-002 fix: build an explicit upgrade request so we can install
    // `Authorization: Bearer <token>` before handing it to tungstenite.
    // The previous code passed a raw URL string to `connect_async`,
    // which has no way to carry a custom header — the server's
    // `require_bearer` check then rejected with 401 and the user saw a
    // confusing "ws: HTTP error: 401" diagnostic.
    let request = build_ws_request(&url, client.bearer_token())?;

    let (ws_stream, _resp) = tokio_tungstenite::connect_async(request)
        .await
        .map_err(|e| CtlError::api(format!("ws connect {url}: {e}")))?;

    let (mut tx, mut rx) = ws_stream.split();

    loop {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                // Best-effort close frame; we are exiting regardless.
                let _ = tx.send(Message::Close(None)).await;
                // Blank line so the shell prompt does not land after `^C`.
                eprintln!();
                return Ok(());
            }
            msg = rx.next() => match msg {
                Some(Ok(Message::Text(t))) => {
                    render_ws_frame(&t);
                }
                Some(Ok(Message::Binary(b))) => match std::str::from_utf8(&b) {
                    Ok(s) => render_ws_frame(s),
                    // Text frames that fail to parse are still printed raw,
                    // so an undecodable binary frame must not vanish without
                    // a trace either: report it on stderr.
                    Err(e) => eprintln!(
                        "cellctl: warning: skipped non-UTF-8 binary ws frame ({} bytes): {e}",
                        b.len()
                    ),
                },
                Some(Ok(Message::Ping(payload))) => {
                    // Best-effort reply; a failed send will surface as a
                    // read error on a later iteration.
                    let _ = tx.send(Message::Pong(payload)).await;
                }
                Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
                Some(Ok(Message::Close(_))) | None => return Ok(()),
                Some(Err(e)) => {
                    return Err(CtlError::api(format!("ws: {e}")));
                }
            }
        }
    }
}

/// Print a single inbound WS frame.
///
/// Frames are expected to be the server's `{seq, event}` envelope (see
/// `cellos-server::ws::build_envelope`); the envelope is unwrapped so
/// the output matches what the one-shot endpoint prints. Decoding is
/// attempted in this order:
///
/// 1. the `event` member of the frame, as a CloudEvent;
/// 2. the whole frame as a bare CloudEvent (older bridges; defensive);
/// 3. otherwise the raw text is echoed, so no frame is ever lost.
fn render_ws_frame(text: &str) {
    let decoded = serde_json::from_str::<serde_json::Value>(text)
        .ok()
        .and_then(|frame| {
            let enveloped = frame
                .get("event")
                .cloned()
                .and_then(|inner| serde_json::from_value::<CloudEvent>(inner).ok());
            enveloped.or_else(|| serde_json::from_value::<CloudEvent>(frame).ok())
        });

    match decoded {
        Some(ev) => print_event(&ev),
        None => println!("{text}"),
    }
}

/// Turn `url` into a WebSocket upgrade request, adding an
/// `Authorization: Bearer <token>` header when `bearer` is `Some`.
///
/// Split out from `follow_ws` so the header installation can be pinned
/// by unit tests without a live server (EVT-002 regression).
///
/// # Errors
///
/// Returns a usage error when `url` cannot be converted into a client
/// request, or when the token yields an invalid header value.
fn build_ws_request(url: &str, bearer: Option<&str>) -> CtlResult<WsRequest> {
    let mut upgrade = match url.into_client_request() {
        Ok(req) => req,
        Err(e) => return Err(CtlError::usage(format!("ws build request {url}: {e}"))),
    };

    // No token configured: send the request without credentials.
    let tok = match bearer {
        Some(tok) => tok,
        None => return Ok(upgrade),
    };

    match WsHeaderValue::from_str(&format!("Bearer {tok}")) {
        Ok(value) => {
            upgrade
                .headers_mut()
                .insert(ws_header::AUTHORIZATION, value);
            Ok(upgrade)
        }
        Err(e) => Err(CtlError::usage(format!("bad bearer token: {e}"))),
    }
}

fn ws_path(formation: Option<&str>, since: Option<u64>) -> String {
    let mut path = String::from("/ws/events");
    let mut first = true;
    let mut push = |k: &str, v: String, first: &mut bool| {
        path.push(if *first { '?' } else { '&' });
        *first = false;
        path.push_str(k);
        path.push('=');
        path.push_str(&v);
    };
    if let Some(f) = formation {
        push("formation", urlencode(f), &mut first);
    }
    if let Some(s) = since {
        push("since", s.to_string(), &mut first);
    }
    path
}

/// Write one event to stdout as a single line:
/// `<time>  <type>  <subject>[  <data as compact JSON>]`.
///
/// Missing `time` / `subject` render as `-`, a missing type renders as
/// `event`. The data column is omitted when there is no data or when
/// its JSON serialization comes out empty.
fn print_event(ev: &CloudEvent) {
    let payload = match ev.data.as_ref() {
        Some(value) => serde_json::to_string(value).unwrap_or_default(),
        None => String::new(),
    };

    let mut line = format!(
        "{}  {}  {}",
        ev.time.as_deref().unwrap_or("-"),
        ev.event_type.as_deref().unwrap_or("event"),
        ev.subject.as_deref().unwrap_or("-"),
    );
    if !payload.is_empty() {
        line.push_str("  ");
        line.push_str(&payload);
    }
    println!("{line}");
}

/// Encode `s` for use as a query-string value using the `url` crate's
/// `application/x-www-form-urlencoded` byte serializer.
fn urlencode(s: &str) -> String {
    let mut encoded = String::with_capacity(s.len());
    encoded.extend(url::form_urlencoded::byte_serialize(s.as_bytes()));
    encoded
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Plain-text upgrade target shared by the header tests.
    const LOCAL_WS: &str = "ws://127.0.0.1:8080/ws/events";

    /// EVT-002 regression pin: with a token configured, the upgrade
    /// request has to carry `Authorization: Bearer <token>`. The old
    /// code handed a bare URL to `connect_async` and the server
    /// answered 401.
    #[test]
    fn build_ws_request_installs_bearer_when_token_present() {
        let req = build_ws_request(LOCAL_WS, Some("s3cr3t")).expect("build ws request");
        let header_text = req
            .headers()
            .get(ws_header::AUTHORIZATION)
            .expect("AUTHORIZATION header must be present")
            .to_str()
            .expect("ascii header");
        assert_eq!(
            header_text, "Bearer s3cr3t",
            "EVT-002: WS upgrade must carry the Bearer token"
        );
    }

    /// Without a token no header is installed; the server is expected
    /// to reject the connect with 401, which is the intended contract.
    #[test]
    fn build_ws_request_no_bearer_when_token_absent() {
        let req = build_ws_request(LOCAL_WS, None).expect("build ws request");
        let auth = req.headers().get(ws_header::AUTHORIZATION);
        assert!(
            auth.is_none(),
            "no AUTHORIZATION header when no token is configured",
        );
    }

    /// A `wss://` (TLS) target must be handled exactly like `ws://`, so
    /// a refactor that hard-codes the scheme fails here.
    #[test]
    fn build_ws_request_accepts_wss_scheme() {
        let req = build_ws_request("wss://cellos.example.com/ws/events", Some("t"))
            .expect("build wss request");
        let auth = req.headers().get(ws_header::AUTHORIZATION);
        assert!(auth.is_some());
    }

    /// Every documented query parameter has to reach the wire; pinned
    /// so none can be dropped silently.
    #[test]
    fn one_shot_path_composes_all_known_params() {
        let p = one_shot_path(Some("demo"), Some(42), Some(50));
        assert!(p.starts_with("/v1/events?"), "got {p}");
        for fragment in ["formation=demo", "since=42", "limit=50"] {
            assert!(p.contains(fragment), "got {p}");
        }
    }

    /// No parameters means no query string at all.
    #[test]
    fn one_shot_path_no_params() {
        let p = one_shot_path(None, None, None);
        assert_eq!(p, "/v1/events");
    }

    /// `since` alone becomes the first (and only) query parameter.
    #[test]
    fn ws_path_threads_since() {
        assert_eq!(ws_path(None, Some(7)), "/ws/events?since=7");
    }
}