1use futures_util::StreamExt;
12
13use crate::client::CellosClient;
14use crate::exit::{CtlError, CtlResult};
15use crate::model::CloudEvent;
16
17pub async fn run(
18 client: &CellosClient,
19 cell: &str,
20 follow: bool,
21 tail: Option<usize>,
22) -> CtlResult<()> {
23 let mut path = format!("/v1/cells/{}/events", urlencode(cell));
24 let mut params: Vec<String> = Vec::new();
25 if follow {
26 params.push("follow=true".into());
27 }
28 if let Some(n) = tail {
29 params.push(format!("tail={n}"));
30 }
31 if !params.is_empty() {
32 path.push('?');
33 path.push_str(¶ms.join("&"));
34 }
35
36 if !follow {
37 let resp = client.get_stream(&path).await?;
39 let body = resp.text().await?;
40 let trimmed = body.trim_start();
41 if trimmed.starts_with('[') {
42 let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
43 for ev in arr {
44 print_event(&ev);
45 }
46 } else {
47 for line in body.lines() {
48 if line.trim().is_empty() {
49 continue;
50 }
51 let ev: CloudEvent = serde_json::from_str(line)
52 .map_err(|e| CtlError::api(format!("parse event: {e}")))?;
53 print_event(&ev);
54 }
55 }
56 return Ok(());
57 }
58
59 let resp = client.get_stream(&path).await?;
61 let mut stream = resp.bytes_stream();
62 let mut buf: Vec<u8> = Vec::new();
63
64 loop {
65 tokio::select! {
66 _ = tokio::signal::ctrl_c() => {
67 eprintln!();
68 return Ok(());
69 }
70 chunk = stream.next() => {
71 match chunk {
72 Some(Ok(bytes)) => {
73 buf.extend_from_slice(&bytes);
74 while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
76 let line: Vec<u8> = buf.drain(..=pos).collect();
77 let line = &line[..line.len().saturating_sub(1)];
78 if line.is_empty() {
79 continue;
80 }
81 match serde_json::from_slice::<CloudEvent>(line) {
82 Ok(ev) => print_event(&ev),
83 Err(_) => {
84 if let Ok(s) = std::str::from_utf8(line) {
86 println!("{s}");
87 }
88 }
89 }
90 }
91 }
92 Some(Err(e)) => {
93 return Err(CtlError::api(format!("stream: {e}")));
94 }
95 None => return Ok(()),
96 }
97 }
98 }
99 }
100}
101
102fn print_event(ev: &CloudEvent) {
103 let ts = ev.time.as_deref().unwrap_or("-");
104 let kind = ev.event_type.as_deref().unwrap_or("event");
105 let subject = ev.subject.as_deref().unwrap_or("-");
106 let data = ev
107 .data
108 .as_ref()
109 .map(|v| serde_json::to_string(v).unwrap_or_default())
110 .unwrap_or_default();
111 if data.is_empty() {
112 println!("{ts} {kind} {subject}");
113 } else {
114 println!("{ts} {kind} {subject} {data}");
115 }
116}
117
118fn urlencode(s: &str) -> String {
119 percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect()
120}