Skip to main content

cellos_ctl/cmd/
events.rs

1//! `cellctl events [--formation NAME] [--follow] [--since SEQ] [--limit N]`
2//!
3//! Two delivery paths, same render:
4//!
5//! * **One-shot** (no `--follow`) — HTTP `GET /v1/events` returns a
6//!   bounded page of recent events from JetStream. Suitable for
7//!   environments where WebSocket isn't viable (corporate proxies,
8//!   kubectl-style scripted pulls). Response shape mirrors the WS
9//!   envelope so the wire is uniform: `{events: [{seq, event}], cursor}`.
10//!
11//! * **Follow** (`--follow`) — WebSocket `/ws/events`. Live tail of
12//!   CloudEvents as the server forwards them from JetStream. Optional
13//!   `--since` resumes at a cursor without re-emitting frames the
14//!   client has already seen (ADR-0015 §D3).
15//!
16//! ## EVT-002 — WebSocket Bearer auth
17//!
18//! Browsers can't set custom WebSocket headers, so the web view's WS
19//! path falls back to a localhost-proxy auth model (see ADR-0017).
20//! cellctl is a CLI — it CAN set headers, so it does: the upgrade
21//! request is built via `IntoClientRequest::into_client_request` and
22//! `Authorization: Bearer <token>` is installed directly. The
23//! `Sec-WebSocket-Protocol: bearer.<token>` subprotocol convention was
24//! considered and rejected — token-bearing subprotocols are a
25//! workaround for the browser limitation, not the right tool when you
26//! have a direct header path.
27
28use futures_util::{SinkExt, StreamExt};
29use tokio_tungstenite::tungstenite::client::IntoClientRequest;
30use tokio_tungstenite::tungstenite::handshake::client::Request as WsRequest;
31use tokio_tungstenite::tungstenite::http::{header as ws_header, HeaderValue as WsHeaderValue};
32use tokio_tungstenite::tungstenite::Message;
33
34use crate::client::CellosClient;
35use crate::exit::{CtlError, CtlResult};
36use crate::model::CloudEvent;
37
38/// Wire shape returned by `GET /v1/events`. Mirrors
39/// `cellos_server::routes::events::EventsResponse`. We keep the type
40/// private to this module — every cellctl command renders directly to
41/// stdout, there is no shared client-side projection.
42#[derive(Debug, serde::Deserialize)]
43struct EventsResponse {
44    #[serde(default)]
45    events: Vec<EventEnvelope>,
46    #[serde(default)]
47    #[allow(dead_code)] // surfaced via debug logs / future paging
48    cursor: u64,
49}
50
51#[derive(Debug, serde::Deserialize)]
52struct EventEnvelope {
53    #[serde(default)]
54    #[allow(dead_code)] // rendered in `print_event` only via the inner CloudEvent today
55    seq: u64,
56    event: CloudEvent,
57}
58
59pub async fn run(
60    client: &CellosClient,
61    formation: Option<&str>,
62    follow: bool,
63    since: Option<u64>,
64    limit: Option<usize>,
65) -> CtlResult<()> {
66    if !follow {
67        return one_shot(client, formation, since, limit).await;
68    }
69    if limit.is_some() {
70        // `--limit` is meaningless with `--follow` (the WebSocket is a
71        // live tail, not a paginated pull). Warn rather than error so
72        // a copy-pasted command line still works.
73        eprintln!("cellctl: warning: --limit ignored with --follow");
74    }
75    follow_ws(client, formation, since).await
76}
77
78async fn one_shot(
79    client: &CellosClient,
80    formation: Option<&str>,
81    since: Option<u64>,
82    limit: Option<usize>,
83) -> CtlResult<()> {
84    let path = one_shot_path(formation, since, limit);
85    let resp = client.get_stream(&path).await?;
86    let body = resp.text().await?;
87    let trimmed = body.trim_start();
88
89    // EVT-001 canonical shape: `{events: [...], cursor: N}`. Parse it
90    // strictly first; fall through to the legacy shapes (bare array
91    // or NDJSON) only if the canonical parse fails, so we don't
92    // silently mask a malformed envelope.
93    if trimmed.starts_with('{') {
94        let resp: EventsResponse = serde_json::from_str(trimmed)
95            .map_err(|e| CtlError::api(format!("parse events response: {e}")))?;
96        for env in &resp.events {
97            print_event(&env.event);
98        }
99        return Ok(());
100    }
101
102    // Legacy fallbacks. The server's stable contract is the canonical
103    // envelope above; these branches exist so a cellctl built against
104    // an older server (or a future test fixture) still renders
105    // something useful rather than 1xx-erroring on a parse failure.
106    if trimmed.starts_with('[') {
107        let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
108        for ev in arr {
109            print_event(&ev);
110        }
111    } else {
112        for line in body.lines() {
113            if line.trim().is_empty() {
114                continue;
115            }
116            let ev: CloudEvent = serde_json::from_str(line)
117                .map_err(|e| CtlError::api(format!("parse event: {e}")))?;
118            print_event(&ev);
119        }
120    }
121    Ok(())
122}
123
124/// Build the relative path for the one-shot endpoint. Pulled out so
125/// the query-string composition can be unit-tested without a live
126/// server.
127fn one_shot_path(formation: Option<&str>, since: Option<u64>, limit: Option<usize>) -> String {
128    let mut path = String::from("/v1/events");
129    let mut first = true;
130    let mut push = |k: &str, v: String, first: &mut bool| {
131        path.push(if *first { '?' } else { '&' });
132        *first = false;
133        path.push_str(k);
134        path.push('=');
135        path.push_str(&v);
136    };
137    if let Some(f) = formation {
138        push("formation", urlencode(f), &mut first);
139    }
140    if let Some(s) = since {
141        push("since", s.to_string(), &mut first);
142    }
143    if let Some(l) = limit {
144        push("limit", l.to_string(), &mut first);
145    }
146    path
147}
148
149async fn follow_ws(
150    client: &CellosClient,
151    formation: Option<&str>,
152    since: Option<u64>,
153) -> CtlResult<()> {
154    let path = ws_path(formation, since);
155    let url = client.ws_url(&path)?;
156
157    // EVT-002 fix: build an explicit upgrade request so we can install
158    // `Authorization: Bearer <token>` before handing it to tungstenite.
159    // The previous code passed a raw URL string to `connect_async`,
160    // which has no way to carry a custom header — the server's
161    // `require_bearer` check then rejected with 401 and the user saw a
162    // confusing "ws: HTTP error: 401" diagnostic.
163    let request = build_ws_request(&url, client.bearer_token())?;
164
165    let (ws_stream, _resp) = tokio_tungstenite::connect_async(request)
166        .await
167        .map_err(|e| CtlError::api(format!("ws connect {url}: {e}")))?;
168
169    let (mut tx, mut rx) = ws_stream.split();
170
171    loop {
172        tokio::select! {
173            _ = tokio::signal::ctrl_c() => {
174                let _ = tx.send(Message::Close(None)).await;
175                eprintln!();
176                return Ok(());
177            }
178            msg = rx.next() => match msg {
179                Some(Ok(Message::Text(t))) => {
180                    render_ws_frame(&t);
181                }
182                Some(Ok(Message::Binary(b))) => {
183                    if let Ok(s) = std::str::from_utf8(&b) {
184                        render_ws_frame(s);
185                    }
186                }
187                Some(Ok(Message::Ping(payload))) => {
188                    let _ = tx.send(Message::Pong(payload)).await;
189                }
190                Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
191                Some(Ok(Message::Close(_))) | None => return Ok(()),
192                Some(Err(e)) => {
193                    return Err(CtlError::api(format!("ws: {e}")));
194                }
195            }
196        }
197    }
198}
199
200/// Render one inbound WS text frame. The server wraps every CloudEvent
201/// in a `{seq, event}` envelope (see `cellos-server::ws::build_envelope`);
202/// strip the envelope before printing so the operator sees the same
203/// event shape they get from the one-shot endpoint.
204fn render_ws_frame(text: &str) {
205    // Try the canonical envelope shape first.
206    if let Ok(v) = serde_json::from_str::<serde_json::Value>(text) {
207        if let Some(inner) = v.get("event") {
208            if let Ok(ev) = serde_json::from_value::<CloudEvent>(inner.clone()) {
209                print_event(&ev);
210                return;
211            }
212        }
213        // Bare CloudEvent (older bridges; defensive).
214        if let Ok(ev) = serde_json::from_value::<CloudEvent>(v) {
215            print_event(&ev);
216            return;
217        }
218    }
219    // Fall through: print as-is so the operator never silently loses a frame.
220    println!("{text}");
221}
222
223/// Build the WS upgrade `http::Request` with `Authorization: Bearer
224/// <token>` installed when a token is configured. Pulled out so it can
225/// be unit-tested without a live server (EVT-002 regression pin).
226fn build_ws_request(url: &str, bearer: Option<&str>) -> CtlResult<WsRequest> {
227    let mut request = url
228        .into_client_request()
229        .map_err(|e| CtlError::usage(format!("ws build request {url}: {e}")))?;
230    if let Some(tok) = bearer {
231        let value = WsHeaderValue::from_str(&format!("Bearer {tok}"))
232            .map_err(|e| CtlError::usage(format!("bad bearer token: {e}")))?;
233        request
234            .headers_mut()
235            .insert(ws_header::AUTHORIZATION, value);
236    }
237    Ok(request)
238}
239
240fn ws_path(formation: Option<&str>, since: Option<u64>) -> String {
241    let mut path = String::from("/ws/events");
242    let mut first = true;
243    let mut push = |k: &str, v: String, first: &mut bool| {
244        path.push(if *first { '?' } else { '&' });
245        *first = false;
246        path.push_str(k);
247        path.push('=');
248        path.push_str(&v);
249    };
250    if let Some(f) = formation {
251        push("formation", urlencode(f), &mut first);
252    }
253    if let Some(s) = since {
254        push("since", s.to_string(), &mut first);
255    }
256    path
257}
258
259fn print_event(ev: &CloudEvent) {
260    let ts = ev.time.as_deref().unwrap_or("-");
261    let kind = ev.event_type.as_deref().unwrap_or("event");
262    let subject = ev.subject.as_deref().unwrap_or("-");
263    let data = ev
264        .data
265        .as_ref()
266        .map(|v| serde_json::to_string(v).unwrap_or_default())
267        .unwrap_or_default();
268    if data.is_empty() {
269        println!("{ts}  {kind}  {subject}");
270    } else {
271        println!("{ts}  {kind}  {subject}  {data}");
272    }
273}
274
275fn urlencode(s: &str) -> String {
276    percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect()
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282
283    /// EVT-002 regression: the WS upgrade request MUST carry
284    /// `Authorization: Bearer <token>` when a token is configured.
285    /// Before this fix `connect_async(url)` was called bare and the
286    /// server returned 401.
287    #[test]
288    fn build_ws_request_installs_bearer_when_token_present() {
289        let req = build_ws_request("ws://127.0.0.1:8080/ws/events", Some("s3cr3t"))
290            .expect("build ws request");
291        let auth = req
292            .headers()
293            .get(ws_header::AUTHORIZATION)
294            .expect("AUTHORIZATION header must be present");
295        assert_eq!(
296            auth.to_str().expect("ascii header"),
297            "Bearer s3cr3t",
298            "EVT-002: WS upgrade must carry the Bearer token"
299        );
300    }
301
302    /// No token configured → no header installed. The server will
303    /// 401 the connect, which is the correct contract.
304    #[test]
305    fn build_ws_request_no_bearer_when_token_absent() {
306        let req =
307            build_ws_request("ws://127.0.0.1:8080/ws/events", None).expect("build ws request");
308        assert!(
309            req.headers().get(ws_header::AUTHORIZATION).is_none(),
310            "no AUTHORIZATION header when no token is configured",
311        );
312    }
313
314    /// `wss://` (TLS upgrade target) must be accepted just like
315    /// `ws://`. Pinning so a future refactor that hard-codes the
316    /// scheme breaks here.
317    #[test]
318    fn build_ws_request_accepts_wss_scheme() {
319        let req = build_ws_request("wss://cellos.example.com/ws/events", Some("t"))
320            .expect("build wss request");
321        assert!(req.headers().get(ws_header::AUTHORIZATION).is_some());
322    }
323
324    /// The one-shot path builder must produce the documented query
325    /// string. Pinning the shape so future changes can't silently
326    /// drop a parameter on the wire.
327    #[test]
328    fn one_shot_path_composes_all_known_params() {
329        let p = one_shot_path(Some("demo"), Some(42), Some(50));
330        assert!(p.starts_with("/v1/events?"), "got {p}");
331        assert!(p.contains("formation=demo"), "got {p}");
332        assert!(p.contains("since=42"), "got {p}");
333        assert!(p.contains("limit=50"), "got {p}");
334    }
335
336    #[test]
337    fn one_shot_path_no_params() {
338        assert_eq!(one_shot_path(None, None, None), "/v1/events");
339    }
340
341    #[test]
342    fn ws_path_threads_since() {
343        let p = ws_path(None, Some(7));
344        assert_eq!(p, "/ws/events?since=7");
345    }
346}