Skip to main content

scud/commands/
watch.rs

1//! Watch command - monitor running swarms via ZMQ
2
3use anyhow::Result;
4use clap::Parser;
5use std::path::PathBuf;
6use zeromq::{Socket, SocketRecv};
7
8use crate::commands::swarm::publisher::{discover_sessions, ZmqEvent};
9
10#[derive(Parser, Debug)]
11pub struct WatchArgs {
12    /// Session ID to watch (discovers automatically if not specified)
13    #[arg(long)]
14    pub session: Option<String>,
15
16    /// Tag to filter sessions
17    #[arg(long)]
18    pub tag: Option<String>,
19
20    /// Project root directory
21    #[arg(long)]
22    pub project_root: Option<PathBuf>,
23
24    /// Output format: text, json
25    #[arg(long, default_value = "text")]
26    pub format: String,
27}
28
29pub async fn run(args: WatchArgs) -> Result<()> {
30    let project_root = args
31        .project_root
32        .unwrap_or_else(|| std::env::current_dir().unwrap());
33
34    // Discover sessions
35    let sessions = discover_sessions(&project_root);
36
37    // Filter sessions based on criteria
38    let filtered_sessions: Vec<_> = sessions
39        .into_iter()
40        .filter(|s| {
41            // Filter by tag if specified
42            if let Some(ref tag) = args.tag {
43                if s.tag != *tag {
44                    return false;
45                }
46            }
47            // Filter by session if specified
48            if let Some(ref session) = args.session {
49                if s.session_id != *session {
50                    return false;
51                }
52            }
53            true
54        })
55        .collect();
56
57    if filtered_sessions.is_empty() {
58        println!("No running swarms found.");
59        if args.tag.is_some() {
60            println!("Try without --tag to see all sessions.");
61        }
62        return Ok(());
63    }
64
65    // If multiple sessions, list them
66    if filtered_sessions.len() > 1 && args.session.is_none() {
67        println!("Multiple swarms running. Specify --session to watch one:");
68        for session in &filtered_sessions {
69            println!("  {} (tag: {})", session.session_id, session.tag);
70        }
71        return Ok(());
72    }
73
74    let session = filtered_sessions.into_iter().next().unwrap();
75    println!("Watching swarm: {}", session.session_id);
76    println!("Connected to: {}", session.pub_endpoint);
77    println!("---");
78
79    // Connect and subscribe to ZMQ
80    let mut socket = zeromq::SubSocket::new();
81    socket.connect(&session.pub_endpoint).await?;
82    socket.subscribe("").await?;
83
84    // Receive and print events
85    loop {
86        match socket.recv().await {
87            Ok(msg) => {
88                // Get the first frame from the multi-part message
89                if let Some(frame) = msg.iter().next() {
90                    if let Ok(text) = std::str::from_utf8(frame) {
91                        if args.format == "json" {
92                            println!("{}", text);
93                        } else if let Ok(event) = serde_json::from_str::<ZmqEvent>(text) {
94                            print_event(&event);
95                        }
96                    }
97                }
98            }
99            Err(e) => {
100                eprintln!("Connection lost: {}", e);
101                break;
102            }
103        }
104    }
105
106    Ok(())
107}
108
109fn print_event(event: &ZmqEvent) {
110    match event {
111        ZmqEvent::SwarmStarted {
112            tag, total_waves, ..
113        } => {
114            println!("[SWARM] Started tag='{}' waves={}", tag, total_waves);
115        }
116        ZmqEvent::WaveStarted { wave, tasks, task_count } => {
117            println!(
118                "[WAVE {}] Started with {} tasks",
119                wave,
120                task_count
121            );
122            if !tasks.is_empty() {
123                println!("  Tasks: {:?}", tasks);
124            }
125        }
126        ZmqEvent::TaskStarted { task_id } => {
127            println!("[TASK {}] Started", task_id);
128        }
129        ZmqEvent::TaskSpawned { task_id } => {
130            println!("[TASK {}] Spawned", task_id);
131        }
132        ZmqEvent::TaskOutput { task_id, text } => {
133            println!("[{}] {}", task_id, text);
134        }
135        ZmqEvent::TaskCompleted {
136            task_id,
137            success,
138            duration_ms,
139        } => {
140            let status = if *success { "completed" } else { "FAILED" };
141            let duration = duration_ms
142                .map(|d| format!(" ({}ms)", d))
143                .unwrap_or_default();
144            println!("[TASK {}] {}{}", task_id, status, duration);
145        }
146        ZmqEvent::TaskFailed { task_id, reason } => {
147            println!("[TASK {}] FAILED: {}", task_id, reason);
148        }
149        ZmqEvent::ValidationStarted => {
150            println!("[VALIDATION] Running...");
151        }
152        ZmqEvent::ValidationCompleted { passed, output } => {
153            let status = if *passed { "PASSED" } else { "FAILED" };
154            println!("[VALIDATION] {}: {}", status, output);
155        }
156        ZmqEvent::ValidationPassed => {
157            println!("[VALIDATION] PASSED");
158        }
159        ZmqEvent::ValidationFailed { failures } => {
160            println!("[VALIDATION] FAILED:");
161            for failure in failures {
162                println!("  - {}", failure);
163            }
164        }
165        ZmqEvent::WaveCompleted { wave, duration_ms } => {
166            let duration = duration_ms
167                .map(|d| format!(" ({}ms)", d))
168                .unwrap_or_default();
169            println!("[WAVE {}] Completed{}", wave, duration);
170        }
171        ZmqEvent::SwarmCompleted { success } => {
172            let status = if *success { "SUCCESS" } else { "FAILED" };
173            println!("[SWARM] Completed: {}", status);
174        }
175        ZmqEvent::SwarmPaused => {
176            println!("[SWARM] Paused");
177        }
178        ZmqEvent::SwarmResumed => {
179            println!("[SWARM] Resumed");
180        }
181        ZmqEvent::Heartbeat { timestamp } => {
182            println!("[HEARTBEAT] {}", timestamp);
183        }
184        ZmqEvent::ToolCall { task_id, tool, input_summary } => {
185            let summary = input_summary.as_ref().map(|s| format!(" ({})", s)).unwrap_or_default();
186            println!("[TOOL {}] {} called{}", task_id, tool, summary);
187        }
188        ZmqEvent::ToolResult { task_id, tool, success, duration_ms } => {
189            let status = if *success { "success" } else { "failed" };
190            let duration = duration_ms.map(|d| format!(" ({}ms)", d)).unwrap_or_default();
191            println!("[TOOL {}] {} {}{}", task_id, tool, status, duration);
192        }
193        ZmqEvent::FileRead { task_id, path } => {
194            println!("[FILE {}] Read: {}", task_id, path);
195        }
196        ZmqEvent::FileWrite { task_id, path, lines_changed } => {
197            let lines = lines_changed.map(|l| format!(" ({} lines)", l)).unwrap_or_default();
198            println!("[FILE {}] Write: {}{}", task_id, path, lines);
199        }
200        ZmqEvent::DependencyMet { task_id, dependency_id } => {
201            println!("[DEP {}] Met: {}", task_id, dependency_id);
202        }
203        ZmqEvent::TaskUnblocked { task_id, by_task_id } => {
204            println!("[BLOCK {}] Unblocked by: {}", task_id, by_task_id);
205        }
206        ZmqEvent::RepairStarted { attempt, task_ids } => {
207            println!("[REPAIR] Started attempt {} for tasks: {:?}", attempt, task_ids);
208        }
209        ZmqEvent::RepairCompleted { attempt, success } => {
210            let status = if *success { "succeeded" } else { "failed" };
211            println!("[REPAIR] Attempt {} {}", attempt, status);
212        }
213    }
214}