1use clap::{Args, Subcommand};
2use reqwest_eventsource::Event;
3use futures_util::stream::StreamExt;
4
5use crate::client::BackpacClient;
6use crate::errors::CairnError;
7use super::{output_json, Cli};
8
9#[derive(Args, Debug)]
10pub struct WatchArgs {
11 #[command(subcommand)]
12 pub command: WatchCommands,
13}
14
15#[derive(Subcommand, Debug)]
16pub enum WatchCommands {
17 Intent {
19 id: String,
21 },
22
23 Agent,
25}
26
27impl WatchArgs {
28 pub async fn execute(&self, cli: &Cli) -> Result<(), CairnError> {
29 let client = BackpacClient::new(cli.jwt.as_deref(), cli.api_url.as_deref(), cli.worker_url.as_deref());
30
31 match &self.command {
32 WatchCommands::Intent { id } => {
33 let path = format!("/v1/intents/{}/stream", id);
34 let mut retries = 0;
35 let max_retries = 5;
36
37 loop {
38 let mut es = client.stream(&path)?;
39 let mut should_retry = false;
40
41 while let Some(event) = es.next().await {
42 match event {
43 Ok(Event::Open) => {
44 retries = 0; if !cli.quiet {
46 eprintln!("[SSE] Connected to intent stream for ID: {}", id);
47 }
48 }
49 Ok(Event::Message(message)) => {
50 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&message.data) {
51 if json.get("type").and_then(|t| t.as_str()) == Some("close") {
52 if !cli.quiet {
53 eprintln!("[SSE] Stream closed by server.");
54 }
55 should_retry = false; break;
57 }
58 output_json(&json, &cli.output);
59 } else {
60 if !cli.quiet {
61 println!("{}", message.data);
62 }
63 }
64 }
65 Err(reqwest_eventsource::Error::StreamEnded) => {
66 if !cli.quiet {
67 eprintln!("[SSE] Stream ended.");
68 }
69 should_retry = false;
70 break;
71 }
72 Err(err) => {
73 if !cli.quiet {
74 eprintln!("[SSE] Stream error: {}. Attempting reconnect...", err);
75 }
76 should_retry = true;
77 break;
78 }
79 }
80 }
81
82 if !should_retry {
83 break;
84 }
85
86 retries += 1;
87 if retries > max_retries {
88 return Err(CairnError::General(format!("SSE failed after {} retries", max_retries)));
89 }
90
91 let backoff = std::time::Duration::from_millis(500 * (1 << (retries - 1)));
92 tokio::time::sleep(backoff).await;
93 }
94 Ok(())
95 }
96
97 WatchCommands::Agent => {
98 let mut retries = 0;
99 let max_retries = 5;
100
101 loop {
102 let mut es = client.stream("/v1/agents/stream")?;
103 let mut should_retry = false;
104
105 while let Some(event) = es.next().await {
106 match event {
107 Ok(Event::Open) => {
108 retries = 0; if !cli.quiet {
110 eprintln!("[SSE] Connected to agent event stream.");
111 }
112 }
113 Ok(Event::Message(message)) => {
114 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&message.data) {
115 output_json(&json, &cli.output);
116 } else {
117 if !cli.quiet {
118 println!("{}", message.data);
119 }
120 }
121 }
122 Err(reqwest_eventsource::Error::StreamEnded) => {
123 if !cli.quiet {
124 eprintln!("[SSE] Stream ended.");
125 }
126 should_retry = false;
127 break;
128 }
129 Err(err) => {
130 if !cli.quiet {
131 eprintln!("[SSE] Stream error: {}. Attempting reconnect...", err);
132 }
133 should_retry = true;
134 break;
135 }
136 }
137 }
138
139 if !should_retry {
140 break;
141 }
142
143 retries += 1;
144 if retries > max_retries {
145 return Err(CairnError::General(format!("SSE failed after {} retries", max_retries)));
146 }
147
148 let backoff = std::time::Duration::from_millis(500 * (1 << (retries - 1)));
149 tokio::time::sleep(backoff).await;
150 }
151 Ok(())
152 }
153 }
154 }
155}