1use futures_util::{SinkExt, StreamExt};
29use tokio_tungstenite::tungstenite::client::IntoClientRequest;
30use tokio_tungstenite::tungstenite::handshake::client::Request as WsRequest;
31use tokio_tungstenite::tungstenite::http::{header as ws_header, HeaderValue as WsHeaderValue};
32use tokio_tungstenite::tungstenite::Message;
33
34use crate::client::CellosClient;
35use crate::exit::{CtlError, CtlResult};
36use crate::model::CloudEvent;
37
38#[derive(Debug, serde::Deserialize)]
43struct EventsResponse {
44 #[serde(default)]
45 events: Vec<EventEnvelope>,
46 #[serde(default)]
47 #[allow(dead_code)] cursor: u64,
49}
50
51#[derive(Debug, serde::Deserialize)]
52struct EventEnvelope {
53 #[serde(default)]
54 #[allow(dead_code)] seq: u64,
56 event: CloudEvent,
57}
58
59pub async fn run(
60 client: &CellosClient,
61 formation: Option<&str>,
62 follow: bool,
63 since: Option<u64>,
64 limit: Option<usize>,
65) -> CtlResult<()> {
66 if !follow {
67 return one_shot(client, formation, since, limit).await;
68 }
69 if limit.is_some() {
70 eprintln!("cellctl: warning: --limit ignored with --follow");
74 }
75 follow_ws(client, formation, since).await
76}
77
78async fn one_shot(
79 client: &CellosClient,
80 formation: Option<&str>,
81 since: Option<u64>,
82 limit: Option<usize>,
83) -> CtlResult<()> {
84 let path = one_shot_path(formation, since, limit);
85 let resp = client.get_stream(&path).await?;
86 let body = resp.text().await?;
87 let trimmed = body.trim_start();
88
89 if trimmed.starts_with('{') {
94 let resp: EventsResponse = serde_json::from_str(trimmed)
95 .map_err(|e| CtlError::api(format!("parse events response: {e}")))?;
96 for env in &resp.events {
97 print_event(&env.event);
98 }
99 return Ok(());
100 }
101
102 if trimmed.starts_with('[') {
107 let arr: Vec<CloudEvent> = serde_json::from_str(trimmed)?;
108 for ev in arr {
109 print_event(&ev);
110 }
111 } else {
112 for line in body.lines() {
113 if line.trim().is_empty() {
114 continue;
115 }
116 let ev: CloudEvent = serde_json::from_str(line)
117 .map_err(|e| CtlError::api(format!("parse event: {e}")))?;
118 print_event(&ev);
119 }
120 }
121 Ok(())
122}
123
124fn one_shot_path(formation: Option<&str>, since: Option<u64>, limit: Option<usize>) -> String {
128 let mut path = String::from("/v1/events");
129 let mut first = true;
130 let mut push = |k: &str, v: String, first: &mut bool| {
131 path.push(if *first { '?' } else { '&' });
132 *first = false;
133 path.push_str(k);
134 path.push('=');
135 path.push_str(&v);
136 };
137 if let Some(f) = formation {
138 push("formation", urlencode(f), &mut first);
139 }
140 if let Some(s) = since {
141 push("since", s.to_string(), &mut first);
142 }
143 if let Some(l) = limit {
144 push("limit", l.to_string(), &mut first);
145 }
146 path
147}
148
149async fn follow_ws(
150 client: &CellosClient,
151 formation: Option<&str>,
152 since: Option<u64>,
153) -> CtlResult<()> {
154 let path = ws_path(formation, since);
155 let url = client.ws_url(&path)?;
156
157 let request = build_ws_request(&url, client.bearer_token())?;
164
165 let (ws_stream, _resp) = tokio_tungstenite::connect_async(request)
166 .await
167 .map_err(|e| CtlError::api(format!("ws connect {url}: {e}")))?;
168
169 let (mut tx, mut rx) = ws_stream.split();
170
171 loop {
172 tokio::select! {
173 _ = tokio::signal::ctrl_c() => {
174 let _ = tx.send(Message::Close(None)).await;
175 eprintln!();
176 return Ok(());
177 }
178 msg = rx.next() => match msg {
179 Some(Ok(Message::Text(t))) => {
180 render_ws_frame(&t);
181 }
182 Some(Ok(Message::Binary(b))) => {
183 if let Ok(s) = std::str::from_utf8(&b) {
184 render_ws_frame(s);
185 }
186 }
187 Some(Ok(Message::Ping(payload))) => {
188 let _ = tx.send(Message::Pong(payload)).await;
189 }
190 Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
191 Some(Ok(Message::Close(_))) | None => return Ok(()),
192 Some(Err(e)) => {
193 return Err(CtlError::api(format!("ws: {e}")));
194 }
195 }
196 }
197 }
198}
199
200fn render_ws_frame(text: &str) {
205 if let Ok(v) = serde_json::from_str::<serde_json::Value>(text) {
207 if let Some(inner) = v.get("event") {
208 if let Ok(ev) = serde_json::from_value::<CloudEvent>(inner.clone()) {
209 print_event(&ev);
210 return;
211 }
212 }
213 if let Ok(ev) = serde_json::from_value::<CloudEvent>(v) {
215 print_event(&ev);
216 return;
217 }
218 }
219 println!("{text}");
221}
222
223fn build_ws_request(url: &str, bearer: Option<&str>) -> CtlResult<WsRequest> {
227 let mut request = url
228 .into_client_request()
229 .map_err(|e| CtlError::usage(format!("ws build request {url}: {e}")))?;
230 if let Some(tok) = bearer {
231 let value = WsHeaderValue::from_str(&format!("Bearer {tok}"))
232 .map_err(|e| CtlError::usage(format!("bad bearer token: {e}")))?;
233 request
234 .headers_mut()
235 .insert(ws_header::AUTHORIZATION, value);
236 }
237 Ok(request)
238}
239
240fn ws_path(formation: Option<&str>, since: Option<u64>) -> String {
241 let mut path = String::from("/ws/events");
242 let mut first = true;
243 let mut push = |k: &str, v: String, first: &mut bool| {
244 path.push(if *first { '?' } else { '&' });
245 *first = false;
246 path.push_str(k);
247 path.push('=');
248 path.push_str(&v);
249 };
250 if let Some(f) = formation {
251 push("formation", urlencode(f), &mut first);
252 }
253 if let Some(s) = since {
254 push("since", s.to_string(), &mut first);
255 }
256 path
257}
258
259fn print_event(ev: &CloudEvent) {
260 let ts = ev.time.as_deref().unwrap_or("-");
261 let kind = ev.event_type.as_deref().unwrap_or("event");
262 let subject = ev.subject.as_deref().unwrap_or("-");
263 let data = ev
264 .data
265 .as_ref()
266 .map(|v| serde_json::to_string(v).unwrap_or_default())
267 .unwrap_or_default();
268 if data.is_empty() {
269 println!("{ts} {kind} {subject}");
270 } else {
271 println!("{ts} {kind} {subject} {data}");
272 }
273}
274
275fn urlencode(s: &str) -> String {
276 percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect()
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282
283 #[test]
288 fn build_ws_request_installs_bearer_when_token_present() {
289 let req = build_ws_request("ws://127.0.0.1:8080/ws/events", Some("s3cr3t"))
290 .expect("build ws request");
291 let auth = req
292 .headers()
293 .get(ws_header::AUTHORIZATION)
294 .expect("AUTHORIZATION header must be present");
295 assert_eq!(
296 auth.to_str().expect("ascii header"),
297 "Bearer s3cr3t",
298 "EVT-002: WS upgrade must carry the Bearer token"
299 );
300 }
301
302 #[test]
305 fn build_ws_request_no_bearer_when_token_absent() {
306 let req =
307 build_ws_request("ws://127.0.0.1:8080/ws/events", None).expect("build ws request");
308 assert!(
309 req.headers().get(ws_header::AUTHORIZATION).is_none(),
310 "no AUTHORIZATION header when no token is configured",
311 );
312 }
313
314 #[test]
318 fn build_ws_request_accepts_wss_scheme() {
319 let req = build_ws_request("wss://cellos.example.com/ws/events", Some("t"))
320 .expect("build wss request");
321 assert!(req.headers().get(ws_header::AUTHORIZATION).is_some());
322 }
323
324 #[test]
328 fn one_shot_path_composes_all_known_params() {
329 let p = one_shot_path(Some("demo"), Some(42), Some(50));
330 assert!(p.starts_with("/v1/events?"), "got {p}");
331 assert!(p.contains("formation=demo"), "got {p}");
332 assert!(p.contains("since=42"), "got {p}");
333 assert!(p.contains("limit=50"), "got {p}");
334 }
335
336 #[test]
337 fn one_shot_path_no_params() {
338 assert_eq!(one_shot_path(None, None, None), "/v1/events");
339 }
340
341 #[test]
342 fn ws_path_threads_since() {
343 let p = ws_path(None, Some(7));
344 assert_eq!(p, "/ws/events?since=7");
345 }
346}