use futures_util::StreamExt;
use crate::client::CellosClient;
use crate::exit::{CtlError, CtlResult};
use crate::model::CloudEvent;
pub async fn run(
client: &CellosClient,
cell: &str,
follow: bool,
tail: Option<usize>,
) -> CtlResult<()> {
let mut path = format!("/v1/cells/{}/events", urlencode(cell));
let mut params: Vec<String> = Vec::new();
if follow {
params.push("follow=true".into());
}
if let Some(n) = tail {
params.push(format!("tail={n}"));
}
if !params.is_empty() {
path.push('?');
path.push_str(¶ms.join("&"));
}
if !follow {
let resp = client.get_stream(&path).await?;
let body = resp.text().await?;
let trimmed = body.trim_start();
if trimmed.starts_with('[') {
let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
for ev in arr {
print_event(&ev);
}
} else {
for line in body.lines() {
if line.trim().is_empty() {
continue;
}
let ev: CloudEvent = serde_json::from_str(line)
.map_err(|e| CtlError::api(format!("parse event: {e}")))?;
print_event(&ev);
}
}
return Ok(());
}
let resp = client.get_stream(&path).await?;
let mut stream = resp.bytes_stream();
let mut buf: Vec<u8> = Vec::new();
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
eprintln!();
return Ok(());
}
chunk = stream.next() => {
match chunk {
Some(Ok(bytes)) => {
buf.extend_from_slice(&bytes);
while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
let line: Vec<u8> = buf.drain(..=pos).collect();
let line = &line[..line.len().saturating_sub(1)];
if line.is_empty() {
continue;
}
match serde_json::from_slice::<CloudEvent>(line) {
Ok(ev) => print_event(&ev),
Err(_) => {
if let Ok(s) = std::str::from_utf8(line) {
println!("{s}");
}
}
}
}
}
Some(Err(e)) => {
return Err(CtlError::api(format!("stream: {e}")));
}
None => return Ok(()),
}
}
}
}
}
fn print_event(ev: &CloudEvent) {
let ts = ev.time.as_deref().unwrap_or("-");
let kind = ev.event_type.as_deref().unwrap_or("event");
let subject = ev.subject.as_deref().unwrap_or("-");
let data = ev
.data
.as_ref()
.map(|v| serde_json::to_string(v).unwrap_or_default())
.unwrap_or_default();
if data.is_empty() {
println!("{ts} {kind} {subject}");
} else {
println!("{ts} {kind} {subject} {data}");
}
}
fn urlencode(s: &str) -> String {
url::form_urlencoded::byte_serialize(s.as_bytes()).collect()
}