1use clap::{Args, Subcommand};
2use reqwest_eventsource::Event;
3use futures_util::stream::StreamExt;
4
5use crate::client::BackpacClient;
6use crate::errors::CairnError;
7use super::{output_json, Cli};
8
9#[derive(Args, Debug)]
10pub struct WatchArgs {
11 #[command(subcommand)]
12 pub command: WatchCommands,
13}
14
15#[derive(Subcommand, Debug)]
16pub enum WatchCommands {
17 Intent {
19 id: String,
21 },
22
23 Agent,
25}
26
27impl WatchArgs {
28 pub async fn execute(&self, cli: &Cli) -> Result<(), CairnError> {
29 let client = BackpacClient::new(cli.jwt.as_deref(), cli.api_url.as_deref());
30
31 match &self.command {
32 WatchCommands::Intent { id } => {
33 let path = format!("/v1/intents/{}/stream", id);
34 let mut es = client.stream(&path)?;
35
36 while let Some(event) = es.next().await {
37 match event {
38 Ok(Event::Open) => {
39 if !cli.quiet {
40 eprintln!("[SSE] Connected to intent stream for ID: {}", id);
41 }
42 }
43 Ok(Event::Message(message)) => {
44 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&message.data) {
45 if json.get("type").and_then(|t| t.as_str()) == Some("close") {
47 if !cli.quiet {
48 eprintln!("[SSE] Stream closed by server.");
49 }
50 break;
51 }
52 output_json(&json, &cli.output);
53 } else {
54 if !cli.quiet {
55 println!("{}", message.data);
56 }
57 }
58 }
59 Err(reqwest_eventsource::Error::StreamEnded) => {
60 if !cli.quiet {
61 eprintln!("[SSE] Stream ended.");
62 }
63 break;
64 }
65 Err(err) => {
66 return Err(CairnError::General(format!("SSE error: {}", err)));
67 }
68 }
69 }
70 Ok(())
71 }
72
73 WatchCommands::Agent => {
74 let mut es = client.stream("/v1/agents/stream")?;
75
76 while let Some(event) = es.next().await {
77 match event {
78 Ok(Event::Open) => {
79 if !cli.quiet {
80 eprintln!("[SSE] Connected to agent event stream.");
81 }
82 }
83 Ok(Event::Message(message)) => {
84 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&message.data) {
85 output_json(&json, &cli.output);
86 } else {
87 if !cli.quiet {
88 println!("{}", message.data);
89 }
90 }
91 }
92 Err(reqwest_eventsource::Error::StreamEnded) => {
93 if !cli.quiet {
94 eprintln!("[SSE] Stream ended.");
95 }
96 break;
97 }
98 Err(err) => {
99 return Err(CairnError::General(format!("SSE error: {}", err)));
100 }
101 }
102 }
103 Ok(())
104 }
105 }
106 }
107}