use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::Message;
use crate::client::CellosClient;
use crate::exit::{CtlError, CtlResult};
use crate::model::CloudEvent;
pub async fn run(client: &CellosClient, formation: Option<&str>, follow: bool) -> CtlResult<()> {
if !follow {
return one_shot(client, formation).await;
}
follow_ws(client, formation).await
}
async fn one_shot(client: &CellosClient, formation: Option<&str>) -> CtlResult<()> {
let mut path = "/v1/events".to_string();
if let Some(f) = formation {
path.push_str(&format!("?formation={}", urlencode(f)));
}
let resp = client.get_stream(&path).await?;
let body = resp.text().await?;
let trimmed = body.trim_start();
if trimmed.starts_with('[') {
let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
for ev in arr {
print_event(&ev);
}
} else {
for line in body.lines() {
if line.trim().is_empty() {
continue;
}
let ev: CloudEvent = serde_json::from_str(line)
.map_err(|e| CtlError::api(format!("parse event: {e}")))?;
print_event(&ev);
}
}
Ok(())
}
async fn follow_ws(client: &CellosClient, formation: Option<&str>) -> CtlResult<()> {
let mut path = "/ws/events".to_string();
if let Some(f) = formation {
path.push_str(&format!("?formation={}", urlencode(f)));
}
let url = client.ws_url(&path)?;
let (ws_stream, _resp) = tokio_tungstenite::connect_async(&url)
.await
.map_err(|e| CtlError::api(format!("ws connect {url}: {e}")))?;
let (mut tx, mut rx) = ws_stream.split();
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
let _ = tx.send(Message::Close(None)).await;
eprintln!();
return Ok(());
}
msg = rx.next() => match msg {
Some(Ok(Message::Text(t))) => {
match serde_json::from_str::<CloudEvent>(&t) {
Ok(ev) => print_event(&ev),
Err(_) => println!("{t}"),
}
}
Some(Ok(Message::Binary(b))) => {
if let Ok(s) = std::str::from_utf8(&b) {
match serde_json::from_str::<CloudEvent>(s) {
Ok(ev) => print_event(&ev),
Err(_) => println!("{s}"),
}
}
}
Some(Ok(Message::Ping(payload))) => {
let _ = tx.send(Message::Pong(payload)).await;
}
Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
Some(Ok(Message::Close(_))) | None => return Ok(()),
Some(Err(e)) => {
return Err(CtlError::api(format!("ws: {e}")));
}
}
}
}
}
fn print_event(ev: &CloudEvent) {
let ts = ev.time.as_deref().unwrap_or("-");
let kind = ev.event_type.as_deref().unwrap_or("event");
let subject = ev.subject.as_deref().unwrap_or("-");
let data = ev
.data
.as_ref()
.map(|v| serde_json::to_string(v).unwrap_or_default())
.unwrap_or_default();
if data.is_empty() {
println!("{ts} {kind} {subject}");
} else {
println!("{ts} {kind} {subject} {data}");
}
}
fn urlencode(s: &str) -> String {
url::form_urlencoded::byte_serialize(s.as_bytes()).collect()
}