Skip to main content

scud/commands/
watch.rs

1//! Watch command - monitor running swarms via ZMQ
2
3use anyhow::Result;
4use clap::Parser;
5use std::path::PathBuf;
6
7#[cfg(feature = "zeromq")]
8use zeromq::{Socket, SocketRecv};
9
10use crate::commands::swarm::publisher::{discover_sessions, ZmqEvent};
11
12#[derive(Parser, Debug)]
13pub struct WatchArgs {
14    /// Session ID to watch (discovers automatically if not specified)
15    #[arg(long)]
16    pub session: Option<String>,
17
18    /// Tag to filter sessions
19    #[arg(long)]
20    pub tag: Option<String>,
21
22    /// Project root directory
23    #[arg(long)]
24    pub project_root: Option<PathBuf>,
25
26    /// Output format: text, json
27    #[arg(long, default_value = "text")]
28    pub format: String,
29}
30
31pub async fn run(args: WatchArgs) -> Result<()> {
32    let project_root = args
33        .project_root
34        .unwrap_or_else(|| std::env::current_dir().unwrap());
35
36    // Discover sessions
37    let sessions = discover_sessions(&project_root);
38
39    // Filter sessions based on criteria
40    let filtered_sessions: Vec<_> = sessions
41        .into_iter()
42        .filter(|s| {
43            // Filter by tag if specified
44            if let Some(ref tag) = args.tag {
45                if s.tag != *tag {
46                    return false;
47                }
48            }
49            // Filter by session if specified
50            if let Some(ref session) = args.session {
51                if s.session_id != *session {
52                    return false;
53                }
54            }
55            true
56        })
57        .collect();
58
59    if filtered_sessions.is_empty() {
60        println!("No running swarms found.");
61        if args.tag.is_some() {
62            println!("Try without --tag to see all sessions.");
63        }
64        return Ok(());
65    }
66
67    // If multiple sessions, list them
68    if filtered_sessions.len() > 1 && args.session.is_none() {
69        println!("Multiple swarms running. Specify --session to watch one:");
70        for session in &filtered_sessions {
71            println!("  {} (tag: {})", session.session_id, session.tag);
72        }
73        return Ok(());
74    }
75
76    let session = filtered_sessions.into_iter().next().unwrap();
77    println!("Watching swarm: {}", session.session_id);
78    println!("Connected to: {}", session.pub_endpoint);
79    println!("---");
80
81    #[cfg(feature = "zeromq")]
82    {
83        // Connect and subscribe to ZMQ
84        let mut socket = zeromq::SubSocket::new();
85        socket.connect(&session.pub_endpoint).await?;
86        socket.subscribe("").await?;
87
88        // Receive and print events
89        loop {
90            match socket.recv().await {
91                Ok(msg) => {
92                    // Get the first frame from the multi-part message
93                    if let Some(frame) = msg.iter().next() {
94                        if let Ok(text) = std::str::from_utf8(frame) {
95                            if args.format == "json" {
96                                println!("{}", text);
97                            } else if let Ok(event) = serde_json::from_str::<ZmqEvent>(text) {
98                                print_event(&event);
99                            }
100                        }
101                    }
102                }
103                Err(e) => {
104                    eprintln!("Connection lost: {}", e);
105                    break;
106                }
107            }
108        }
109    }
110
111    #[cfg(not(feature = "zeromq"))]
112    {
113        anyhow::bail!(
114            "Watch command requires the 'zeromq' feature. Rebuild with: cargo build --features zeromq"
115        );
116    }
117
118    #[allow(unreachable_code)]
119    Ok(())
120}
121
122fn print_event(event: &ZmqEvent) {
123    match event {
124        ZmqEvent::SwarmStarted {
125            tag, total_waves, ..
126        } => {
127            println!("[SWARM] Started tag='{}' waves={}", tag, total_waves);
128        }
129        ZmqEvent::WaveStarted {
130            wave,
131            tasks,
132            task_count,
133        } => {
134            println!("[WAVE {}] Started with {} tasks", wave, task_count);
135            if !tasks.is_empty() {
136                println!("  Tasks: {:?}", tasks);
137            }
138        }
139        ZmqEvent::TaskStarted { task_id } => {
140            println!("[TASK {}] Started", task_id);
141        }
142        ZmqEvent::TaskSpawned { task_id } => {
143            println!("[TASK {}] Spawned", task_id);
144        }
145        ZmqEvent::TaskOutput { task_id, text } => {
146            println!("[{}] {}", task_id, text);
147        }
148        ZmqEvent::TaskCompleted {
149            task_id,
150            success,
151            duration_ms,
152        } => {
153            let status = if *success { "completed" } else { "FAILED" };
154            let duration = duration_ms
155                .map(|d| format!(" ({}ms)", d))
156                .unwrap_or_default();
157            println!("[TASK {}] {}{}", task_id, status, duration);
158        }
159        ZmqEvent::TaskFailed { task_id, reason } => {
160            println!("[TASK {}] FAILED: {}", task_id, reason);
161        }
162        ZmqEvent::ValidationStarted => {
163            println!("[VALIDATION] Running...");
164        }
165        ZmqEvent::ValidationCompleted { passed, output } => {
166            let status = if *passed { "PASSED" } else { "FAILED" };
167            println!("[VALIDATION] {}: {}", status, output);
168        }
169        ZmqEvent::ValidationPassed => {
170            println!("[VALIDATION] PASSED");
171        }
172        ZmqEvent::ValidationFailed { failures } => {
173            println!("[VALIDATION] FAILED:");
174            for failure in failures {
175                println!("  - {}", failure);
176            }
177        }
178        ZmqEvent::WaveCompleted { wave, duration_ms } => {
179            let duration = duration_ms
180                .map(|d| format!(" ({}ms)", d))
181                .unwrap_or_default();
182            println!("[WAVE {}] Completed{}", wave, duration);
183        }
184        ZmqEvent::SwarmCompleted { success } => {
185            let status = if *success { "SUCCESS" } else { "FAILED" };
186            println!("[SWARM] Completed: {}", status);
187        }
188        ZmqEvent::SwarmPaused => {
189            println!("[SWARM] Paused");
190        }
191        ZmqEvent::SwarmResumed => {
192            println!("[SWARM] Resumed");
193        }
194        ZmqEvent::Heartbeat { timestamp } => {
195            println!("[HEARTBEAT] {}", timestamp);
196        }
197        ZmqEvent::ToolCall {
198            task_id,
199            tool,
200            input_summary,
201        } => {
202            let summary = input_summary
203                .as_ref()
204                .map(|s| format!(" ({})", s))
205                .unwrap_or_default();
206            println!("[TOOL {}] {} called{}", task_id, tool, summary);
207        }
208        ZmqEvent::ToolResult {
209            task_id,
210            tool,
211            success,
212            duration_ms,
213        } => {
214            let status = if *success { "success" } else { "failed" };
215            let duration = duration_ms
216                .map(|d| format!(" ({}ms)", d))
217                .unwrap_or_default();
218            println!("[TOOL {}] {} {}{}", task_id, tool, status, duration);
219        }
220        ZmqEvent::FileRead { task_id, path } => {
221            println!("[FILE {}] Read: {}", task_id, path);
222        }
223        ZmqEvent::FileWrite {
224            task_id,
225            path,
226            lines_changed,
227        } => {
228            let lines = lines_changed
229                .map(|l| format!(" ({} lines)", l))
230                .unwrap_or_default();
231            println!("[FILE {}] Write: {}{}", task_id, path, lines);
232        }
233        ZmqEvent::DependencyMet {
234            task_id,
235            dependency_id,
236        } => {
237            println!("[DEP {}] Met: {}", task_id, dependency_id);
238        }
239        ZmqEvent::TaskUnblocked {
240            task_id,
241            by_task_id,
242        } => {
243            println!("[BLOCK {}] Unblocked by: {}", task_id, by_task_id);
244        }
245        ZmqEvent::RepairStarted { attempt, task_ids } => {
246            println!(
247                "[REPAIR] Started attempt {} for tasks: {:?}",
248                attempt, task_ids
249            );
250        }
251        ZmqEvent::RepairCompleted { attempt, success } => {
252            let status = if *success { "succeeded" } else { "failed" };
253            println!("[REPAIR] Attempt {} {}", attempt, status);
254        }
255    }
256}