cellos-ctl 0.5.2

cellctl — kubectl-style CLI for CellOS execution cells and formations. Thin HTTP client over cellos-server with apply/get/describe/logs/events/webui.
Documentation
//! `cellctl logs <cell> [--follow] [--tail N]`
//!
//! Per Session 16: cellctl logs is the cell's CloudEvent stream formatted for
//! readability — "like kubectl logs but events instead of log lines."
//!
//! Single API call per command (Feynman's rule):
//!   - one-shot: `GET /v1/cells/<id>/events?tail=N`
//!   - follow:   same path with `?follow=true`; the server keeps the body open
//!     and emits newline-delimited JSON (one CloudEvent per line)

use futures_util::StreamExt;

use crate::client::CellosClient;
use crate::exit::{CtlError, CtlResult};
use crate::model::CloudEvent;

pub async fn run(
    client: &CellosClient,
    cell: &str,
    follow: bool,
    tail: Option<usize>,
) -> CtlResult<()> {
    let mut path = format!("/v1/cells/{}/events", urlencode(cell));
    let mut params: Vec<String> = Vec::new();
    if follow {
        params.push("follow=true".into());
    }
    if let Some(n) = tail {
        params.push(format!("tail={n}"));
    }
    if !params.is_empty() {
        path.push('?');
        path.push_str(&params.join("&"));
    }

    if !follow {
        // One-shot. Accept either a JSON array or NDJSON.
        let resp = client.get_stream(&path).await?;
        let body = resp.text().await?;
        let trimmed = body.trim_start();
        if trimmed.starts_with('[') {
            let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
            for ev in arr {
                print_event(&ev);
            }
        } else {
            for line in body.lines() {
                if line.trim().is_empty() {
                    continue;
                }
                let ev: CloudEvent = serde_json::from_str(line)
                    .map_err(|e| CtlError::api(format!("parse event: {e}")))?;
                print_event(&ev);
            }
        }
        return Ok(());
    }

    // Follow mode: read chunks, split on newlines, print as JSON arrives.
    let resp = client.get_stream(&path).await?;
    let mut stream = resp.bytes_stream();
    let mut buf: Vec<u8> = Vec::new();

    loop {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                eprintln!();
                return Ok(());
            }
            chunk = stream.next() => {
                match chunk {
                    Some(Ok(bytes)) => {
                        buf.extend_from_slice(&bytes);
                        // Drain complete lines.
                        while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
                            let line: Vec<u8> = buf.drain(..=pos).collect();
                            let line = &line[..line.len().saturating_sub(1)];
                            if line.is_empty() {
                                continue;
                            }
                            match serde_json::from_slice::<CloudEvent>(line) {
                                Ok(ev) => print_event(&ev),
                                Err(_) => {
                                    // Not JSON — print raw so the user still sees something.
                                    if let Ok(s) = std::str::from_utf8(line) {
                                        println!("{s}");
                                    }
                                }
                            }
                        }
                    }
                    Some(Err(e)) => {
                        return Err(CtlError::api(format!("stream: {e}")));
                    }
                    None => return Ok(()),
                }
            }
        }
    }
}

fn print_event(ev: &CloudEvent) {
    let ts = ev.time.as_deref().unwrap_or("-");
    let kind = ev.event_type.as_deref().unwrap_or("event");
    let subject = ev.subject.as_deref().unwrap_or("-");
    let data = ev
        .data
        .as_ref()
        .map(|v| serde_json::to_string(v).unwrap_or_default())
        .unwrap_or_default();
    if data.is_empty() {
        println!("{ts}  {kind}  {subject}");
    } else {
        println!("{ts}  {kind}  {subject}  {data}");
    }
}

fn urlencode(s: &str) -> String {
    url::form_urlencoded::byte_serialize(s.as_bytes()).collect()
}