Skip to main content

cairn_cli/cli/
watch.rs

1use clap::{Args, Subcommand};
2use reqwest_eventsource::Event;
3use futures_util::stream::StreamExt;
4
5use crate::client::BackpacClient;
6use crate::errors::CairnError;
7use super::{output_json, Cli};
8
9#[derive(Args, Debug)]
10pub struct WatchArgs {
11    #[command(subcommand)]
12    pub command: WatchCommands,
13}
14
15#[derive(Subcommand, Debug)]
16pub enum WatchCommands {
17    /// Watch an execution intent for real-time status updates via SSE.
18    Intent {
19        /// Execution intent ID to watch
20        id: String,
21    },
22    
23    /// Watch the agent stream for real-time notifications and inbound Proofs of Intent.
24    Agent,
25}
26
27impl WatchArgs {
28    pub async fn execute(&self, cli: &Cli) -> Result<(), CairnError> {
29        let client = BackpacClient::new(cli.jwt.as_deref(), cli.api_url.as_deref());
30
31        match &self.command {
32            WatchCommands::Intent { id } => {
33                let path = format!("/v1/intents/{}/stream", id);
34                let mut retries = 0;
35                let max_retries = 5;
36
37                loop {
38                    let mut es = client.stream(&path)?;
39                    let mut should_retry = false;
40                    
41                    while let Some(event) = es.next().await {
42                        match event {
43                            Ok(Event::Open) => {
44                                retries = 0; // Reset retries on successful connection
45                                if !cli.quiet {
46                                    eprintln!("[SSE] Connected to intent stream for ID: {}", id);
47                                }
48                            }
49                            Ok(Event::Message(message)) => {
50                                if let Ok(json) = serde_json::from_str::<serde_json::Value>(&message.data) {
51                                    if json.get("type").and_then(|t| t.as_str()) == Some("close") {
52                                        if !cli.quiet {
53                                            eprintln!("[SSE] Stream closed by server.");
54                                        }
55                                        should_retry = false; // Clean shutdown
56                                        break;
57                                    }
58                                    output_json(&json, &cli.output);
59                                } else {
60                                    if !cli.quiet {
61                                        println!("{}", message.data);
62                                    }
63                                }
64                            }
65                            Err(reqwest_eventsource::Error::StreamEnded) => {
66                                if !cli.quiet {
67                                    eprintln!("[SSE] Stream ended.");
68                                }
69                                should_retry = false;
70                                break;
71                            }
72                            Err(err) => {
73                                if !cli.quiet {
74                                    eprintln!("[SSE] Stream error: {}. Attempting reconnect...", err);
75                                }
76                                should_retry = true;
77                                break;
78                            }
79                        }
80                    }
81
82                    if !should_retry {
83                        break;
84                    }
85
86                    retries += 1;
87                    if retries > max_retries {
88                        return Err(CairnError::General(format!("SSE failed after {} retries", max_retries)));
89                    }
90
91                    let backoff = std::time::Duration::from_millis(500 * (1 << (retries - 1)));
92                    tokio::time::sleep(backoff).await;
93                }
94                Ok(())
95            }
96
97            WatchCommands::Agent => {
98                let mut retries = 0;
99                let max_retries = 5;
100
101                loop {
102                    let mut es = client.stream("/v1/agents/stream")?;
103                    let mut should_retry = false;
104                    
105                    while let Some(event) = es.next().await {
106                        match event {
107                            Ok(Event::Open) => {
108                                retries = 0; // Reset on success
109                                if !cli.quiet {
110                                    eprintln!("[SSE] Connected to agent event stream.");
111                                }
112                            }
113                            Ok(Event::Message(message)) => {
114                                if let Ok(json) = serde_json::from_str::<serde_json::Value>(&message.data) {
115                                    output_json(&json, &cli.output);
116                                } else {
117                                    if !cli.quiet {
118                                        println!("{}", message.data);
119                                    }
120                                }
121                            }
122                            Err(reqwest_eventsource::Error::StreamEnded) => {
123                                if !cli.quiet {
124                                    eprintln!("[SSE] Stream ended.");
125                                }
126                                should_retry = false;
127                                break;
128                            }
129                            Err(err) => {
130                                if !cli.quiet {
131                                    eprintln!("[SSE] Stream error: {}. Attempting reconnect...", err);
132                                }
133                                should_retry = true;
134                                break;
135                            }
136                        }
137                    }
138
139                    if !should_retry {
140                        break;
141                    }
142
143                    retries += 1;
144                    if retries > max_retries {
145                        return Err(CairnError::General(format!("SSE failed after {} retries", max_retries)));
146                    }
147
148                    let backoff = std::time::Duration::from_millis(500 * (1 << (retries - 1)));
149                    tokio::time::sleep(backoff).await;
150                }
151                Ok(())
152            }
153        }
154    }
155}