use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::handshake::client::Request as WsRequest;
use tokio_tungstenite::tungstenite::http::{header as ws_header, HeaderValue as WsHeaderValue};
use tokio_tungstenite::tungstenite::Message;
use crate::client::CellosClient;
use crate::exit::{CtlError, CtlResult};
use crate::model::CloudEvent;
#[derive(Debug, serde::Deserialize)]
struct EventsResponse {
#[serde(default)]
events: Vec<EventEnvelope>,
#[serde(default)]
#[allow(dead_code)] cursor: u64,
}
#[derive(Debug, serde::Deserialize)]
struct EventEnvelope {
#[serde(default)]
#[allow(dead_code)] seq: u64,
event: CloudEvent,
}
pub async fn run(
client: &CellosClient,
formation: Option<&str>,
follow: bool,
since: Option<u64>,
limit: Option<usize>,
) -> CtlResult<()> {
if !follow {
return one_shot(client, formation, since, limit).await;
}
if limit.is_some() {
eprintln!("cellctl: warning: --limit ignored with --follow");
}
follow_ws(client, formation, since).await
}
async fn one_shot(
client: &CellosClient,
formation: Option<&str>,
since: Option<u64>,
limit: Option<usize>,
) -> CtlResult<()> {
let path = one_shot_path(formation, since, limit);
let resp = client.get_stream(&path).await?;
let body = resp.text().await?;
let trimmed = body.trim_start();
if trimmed.starts_with('{') {
let resp: EventsResponse = serde_json::from_str(trimmed)
.map_err(|e| CtlError::api(format!("parse events response: {e}")))?;
for env in &resp.events {
print_event(&env.event);
}
return Ok(());
}
if trimmed.starts_with('[') {
let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
for ev in arr {
print_event(&ev);
}
} else {
for line in body.lines() {
if line.trim().is_empty() {
continue;
}
let ev: CloudEvent = serde_json::from_str(line)
.map_err(|e| CtlError::api(format!("parse event: {e}")))?;
print_event(&ev);
}
}
Ok(())
}
fn one_shot_path(formation: Option<&str>, since: Option<u64>, limit: Option<usize>) -> String {
let mut path = String::from("/v1/events");
let mut first = true;
let mut push = |k: &str, v: String, first: &mut bool| {
path.push(if *first { '?' } else { '&' });
*first = false;
path.push_str(k);
path.push('=');
path.push_str(&v);
};
if let Some(f) = formation {
push("formation", urlencode(f), &mut first);
}
if let Some(s) = since {
push("since", s.to_string(), &mut first);
}
if let Some(l) = limit {
push("limit", l.to_string(), &mut first);
}
path
}
async fn follow_ws(
client: &CellosClient,
formation: Option<&str>,
since: Option<u64>,
) -> CtlResult<()> {
let path = ws_path(formation, since);
let url = client.ws_url(&path)?;
let request = build_ws_request(&url, client.bearer_token())?;
let (ws_stream, _resp) = tokio_tungstenite::connect_async(request)
.await
.map_err(|e| CtlError::api(format!("ws connect {url}: {e}")))?;
let (mut tx, mut rx) = ws_stream.split();
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
let _ = tx.send(Message::Close(None)).await;
eprintln!();
return Ok(());
}
msg = rx.next() => match msg {
Some(Ok(Message::Text(t))) => {
render_ws_frame(&t);
}
Some(Ok(Message::Binary(b))) => {
if let Ok(s) = std::str::from_utf8(&b) {
render_ws_frame(s);
}
}
Some(Ok(Message::Ping(payload))) => {
let _ = tx.send(Message::Pong(payload)).await;
}
Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
Some(Ok(Message::Close(_))) | None => return Ok(()),
Some(Err(e)) => {
return Err(CtlError::api(format!("ws: {e}")));
}
}
}
}
}
fn render_ws_frame(text: &str) {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(text) {
if let Some(inner) = v.get("event") {
if let Ok(ev) = serde_json::from_value::<CloudEvent>(inner.clone()) {
print_event(&ev);
return;
}
}
if let Ok(ev) = serde_json::from_value::<CloudEvent>(v) {
print_event(&ev);
return;
}
}
println!("{text}");
}
fn build_ws_request(url: &str, bearer: Option<&str>) -> CtlResult<WsRequest> {
let mut request = url
.into_client_request()
.map_err(|e| CtlError::usage(format!("ws build request {url}: {e}")))?;
if let Some(tok) = bearer {
let value = WsHeaderValue::from_str(&format!("Bearer {tok}"))
.map_err(|e| CtlError::usage(format!("bad bearer token: {e}")))?;
request
.headers_mut()
.insert(ws_header::AUTHORIZATION, value);
}
Ok(request)
}
fn ws_path(formation: Option<&str>, since: Option<u64>) -> String {
let mut path = String::from("/ws/events");
let mut first = true;
let mut push = |k: &str, v: String, first: &mut bool| {
path.push(if *first { '?' } else { '&' });
*first = false;
path.push_str(k);
path.push('=');
path.push_str(&v);
};
if let Some(f) = formation {
push("formation", urlencode(f), &mut first);
}
if let Some(s) = since {
push("since", s.to_string(), &mut first);
}
path
}
fn print_event(ev: &CloudEvent) {
let ts = ev.time.as_deref().unwrap_or("-");
let kind = ev.event_type.as_deref().unwrap_or("event");
let subject = ev.subject.as_deref().unwrap_or("-");
let data = ev
.data
.as_ref()
.map(|v| serde_json::to_string(v).unwrap_or_default())
.unwrap_or_default();
if data.is_empty() {
println!("{ts} {kind} {subject}");
} else {
println!("{ts} {kind} {subject} {data}");
}
}
fn urlencode(s: &str) -> String {
percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn build_ws_request_installs_bearer_when_token_present() {
let req = build_ws_request("ws://127.0.0.1:8080/ws/events", Some("s3cr3t"))
.expect("build ws request");
let auth = req
.headers()
.get(ws_header::AUTHORIZATION)
.expect("AUTHORIZATION header must be present");
assert_eq!(
auth.to_str().expect("ascii header"),
"Bearer s3cr3t",
"EVT-002: WS upgrade must carry the Bearer token"
);
}
#[test]
fn build_ws_request_no_bearer_when_token_absent() {
let req =
build_ws_request("ws://127.0.0.1:8080/ws/events", None).expect("build ws request");
assert!(
req.headers().get(ws_header::AUTHORIZATION).is_none(),
"no AUTHORIZATION header when no token is configured",
);
}
#[test]
fn build_ws_request_accepts_wss_scheme() {
let req = build_ws_request("wss://cellos.example.com/ws/events", Some("t"))
.expect("build wss request");
assert!(req.headers().get(ws_header::AUTHORIZATION).is_some());
}
#[test]
fn one_shot_path_composes_all_known_params() {
let p = one_shot_path(Some("demo"), Some(42), Some(50));
assert!(p.starts_with("/v1/events?"), "got {p}");
assert!(p.contains("formation=demo"), "got {p}");
assert!(p.contains("since=42"), "got {p}");
assert!(p.contains("limit=50"), "got {p}");
}
#[test]
fn one_shot_path_no_params() {
assert_eq!(one_shot_path(None, None, None), "/v1/events");
}
#[test]
fn ws_path_threads_since() {
let p = ws_path(None, Some(7));
assert_eq!(p, "/ws/events?since=7");
}
}