Skip to main content

cellos_ctl/cmd/
logs.rs

1//! `cellctl logs <cell> [--follow] [--tail N]`
2//!
3//! Per Session 16: cellctl logs is the cell's CloudEvent stream formatted for
4//! readability — "like kubectl logs but events instead of log lines."
5//!
6//! Single API call per command (Feynman's rule):
7//!   - one-shot: `GET /v1/cells/<id>/events?tail=N`
8//!   - follow:   same path with `?follow=true`; the server keeps the body open
9//!     and emits newline-delimited JSON (one CloudEvent per line)
10
11use futures_util::StreamExt;
12
13use crate::client::CellosClient;
14use crate::exit::{CtlError, CtlResult};
15use crate::model::CloudEvent;
16
17pub async fn run(
18    client: &CellosClient,
19    cell: &str,
20    follow: bool,
21    tail: Option<usize>,
22) -> CtlResult<()> {
23    let mut path = format!("/v1/cells/{}/events", urlencode(cell));
24    let mut params: Vec<String> = Vec::new();
25    if follow {
26        params.push("follow=true".into());
27    }
28    if let Some(n) = tail {
29        params.push(format!("tail={n}"));
30    }
31    if !params.is_empty() {
32        path.push('?');
33        path.push_str(&params.join("&"));
34    }
35
36    if !follow {
37        // One-shot. Accept either a JSON array or NDJSON.
38        let resp = client.get_stream(&path).await?;
39        let body = resp.text().await?;
40        let trimmed = body.trim_start();
41        if trimmed.starts_with('[') {
42            let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
43            for ev in arr {
44                print_event(&ev);
45            }
46        } else {
47            for line in body.lines() {
48                if line.trim().is_empty() {
49                    continue;
50                }
51                let ev: CloudEvent = serde_json::from_str(line)
52                    .map_err(|e| CtlError::api(format!("parse event: {e}")))?;
53                print_event(&ev);
54            }
55        }
56        return Ok(());
57    }
58
59    // Follow mode: read chunks, split on newlines, print as JSON arrives.
60    let resp = client.get_stream(&path).await?;
61    let mut stream = resp.bytes_stream();
62    let mut buf: Vec<u8> = Vec::new();
63
64    loop {
65        tokio::select! {
66            _ = tokio::signal::ctrl_c() => {
67                eprintln!();
68                return Ok(());
69            }
70            chunk = stream.next() => {
71                match chunk {
72                    Some(Ok(bytes)) => {
73                        buf.extend_from_slice(&bytes);
74                        // Drain complete lines.
75                        while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
76                            let line: Vec<u8> = buf.drain(..=pos).collect();
77                            let line = &line[..line.len().saturating_sub(1)];
78                            if line.is_empty() {
79                                continue;
80                            }
81                            match serde_json::from_slice::<CloudEvent>(line) {
82                                Ok(ev) => print_event(&ev),
83                                Err(_) => {
84                                    // Not JSON — print raw so the user still sees something.
85                                    if let Ok(s) = std::str::from_utf8(line) {
86                                        println!("{s}");
87                                    }
88                                }
89                            }
90                        }
91                    }
92                    Some(Err(e)) => {
93                        return Err(CtlError::api(format!("stream: {e}")));
94                    }
95                    None => return Ok(()),
96                }
97            }
98        }
99    }
100}
101
102fn print_event(ev: &CloudEvent) {
103    let ts = ev.time.as_deref().unwrap_or("-");
104    let kind = ev.event_type.as_deref().unwrap_or("event");
105    let subject = ev.subject.as_deref().unwrap_or("-");
106    let data = ev
107        .data
108        .as_ref()
109        .map(|v| serde_json::to_string(v).unwrap_or_default())
110        .unwrap_or_default();
111    if data.is_empty() {
112        println!("{ts}  {kind}  {subject}");
113    } else {
114        println!("{ts}  {kind}  {subject}  {data}");
115    }
116}
117
118fn urlencode(s: &str) -> String {
119    percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect()
120}