// cairn_cli/cli/watch.rs

1use clap::{Args, Subcommand};
2use reqwest_eventsource::Event;
3use futures_util::stream::StreamExt;
4
5use crate::client::BackpacClient;
6use crate::errors::CairnError;
7use super::{output_json, Cli};
8
9#[derive(Args, Debug)]
10pub struct WatchArgs {
11    #[command(subcommand)]
12    pub command: WatchCommands,
13}
14
15#[derive(Subcommand, Debug)]
16pub enum WatchCommands {
17    /// Watch an execution intent for real-time status updates via SSE.
18    Intent {
19        /// Execution intent ID to watch
20        id: String,
21    },
22    
23    /// Watch the agent stream for real-time notifications and inbound Proofs of Intent.
24    Agent,
25}
26
27impl WatchArgs {
28    pub async fn execute(&self, cli: &Cli) -> Result<(), CairnError> {
29        let client = BackpacClient::new(cli.jwt.as_deref(), cli.api_url.as_deref());
30
31        match &self.command {
32            WatchCommands::Intent { id } => {
33                let path = format!("/v1/intents/{}/stream", id);
34                let mut es = client.stream(&path)?;
35                
36                while let Some(event) = es.next().await {
37                    match event {
38                        Ok(Event::Open) => {
39                            if !cli.quiet {
40                                eprintln!("[SSE] Connected to intent stream for ID: {}", id);
41                            }
42                        }
43                        Ok(Event::Message(message)) => {
44                            if let Ok(json) = serde_json::from_str::<serde_json::Value>(&message.data) {
45                                // Close event
46                                if json.get("type").and_then(|t| t.as_str()) == Some("close") {
47                                    if !cli.quiet {
48                                        eprintln!("[SSE] Stream closed by server.");
49                                    }
50                                    break;
51                                }
52                                output_json(&json, &cli.output);
53                            } else {
54                                if !cli.quiet {
55                                    println!("{}", message.data);
56                                }
57                            }
58                        }
59                        Err(reqwest_eventsource::Error::StreamEnded) => {
60                            if !cli.quiet {
61                                eprintln!("[SSE] Stream ended.");
62                            }
63                            break;
64                        }
65                        Err(err) => {
66                            return Err(CairnError::General(format!("SSE error: {}", err)));
67                        }
68                    }
69                }
70                Ok(())
71            }
72
73            WatchCommands::Agent => {
74                let mut es = client.stream("/v1/agents/stream")?;
75                
76                while let Some(event) = es.next().await {
77                    match event {
78                        Ok(Event::Open) => {
79                            if !cli.quiet {
80                                eprintln!("[SSE] Connected to agent event stream.");
81                            }
82                        }
83                        Ok(Event::Message(message)) => {
84                            if let Ok(json) = serde_json::from_str::<serde_json::Value>(&message.data) {
85                                output_json(&json, &cli.output);
86                            } else {
87                                if !cli.quiet {
88                                    println!("{}", message.data);
89                                }
90                            }
91                        }
92                        Err(reqwest_eventsource::Error::StreamEnded) => {
93                            if !cli.quiet {
94                                eprintln!("[SSE] Stream ended.");
95                            }
96                            break;
97                        }
98                        Err(err) => {
99                            return Err(CairnError::General(format!("SSE error: {}", err)));
100                        }
101                    }
102                }
103                Ok(())
104            }
105        }
106    }
107}