1use anyhow::Result;
4use clap::Parser;
5use std::path::PathBuf;
6
7#[cfg(feature = "zeromq")]
8use zeromq::{Socket, SocketRecv};
9
10use crate::commands::swarm::publisher::{discover_sessions, ZmqEvent};
11
12#[derive(Parser, Debug)]
13pub struct WatchArgs {
14 #[arg(long)]
16 pub session: Option<String>,
17
18 #[arg(long)]
20 pub tag: Option<String>,
21
22 #[arg(long)]
24 pub project_root: Option<PathBuf>,
25
26 #[arg(long, default_value = "text")]
28 pub format: String,
29}
30
31pub async fn run(args: WatchArgs) -> Result<()> {
32 let project_root = args
33 .project_root
34 .unwrap_or_else(|| std::env::current_dir().unwrap());
35
36 let sessions = discover_sessions(&project_root);
38
39 let filtered_sessions: Vec<_> = sessions
41 .into_iter()
42 .filter(|s| {
43 if let Some(ref tag) = args.tag {
45 if s.tag != *tag {
46 return false;
47 }
48 }
49 if let Some(ref session) = args.session {
51 if s.session_id != *session {
52 return false;
53 }
54 }
55 true
56 })
57 .collect();
58
59 if filtered_sessions.is_empty() {
60 println!("No running swarms found.");
61 if args.tag.is_some() {
62 println!("Try without --tag to see all sessions.");
63 }
64 return Ok(());
65 }
66
67 if filtered_sessions.len() > 1 && args.session.is_none() {
69 println!("Multiple swarms running. Specify --session to watch one:");
70 for session in &filtered_sessions {
71 println!(" {} (tag: {})", session.session_id, session.tag);
72 }
73 return Ok(());
74 }
75
76 let session = filtered_sessions.into_iter().next().unwrap();
77 println!("Watching swarm: {}", session.session_id);
78 println!("Connected to: {}", session.pub_endpoint);
79 println!("---");
80
81 #[cfg(feature = "zeromq")]
82 {
83 let mut socket = zeromq::SubSocket::new();
85 socket.connect(&session.pub_endpoint).await?;
86 socket.subscribe("").await?;
87
88 loop {
90 match socket.recv().await {
91 Ok(msg) => {
92 if let Some(frame) = msg.iter().next() {
94 if let Ok(text) = std::str::from_utf8(frame) {
95 if args.format == "json" {
96 println!("{}", text);
97 } else if let Ok(event) = serde_json::from_str::<ZmqEvent>(text) {
98 print_event(&event);
99 }
100 }
101 }
102 }
103 Err(e) => {
104 eprintln!("Connection lost: {}", e);
105 break;
106 }
107 }
108 }
109 }
110
111 #[cfg(not(feature = "zeromq"))]
112 {
113 anyhow::bail!(
114 "Watch command requires the 'zeromq' feature. Rebuild with: cargo build --features zeromq"
115 );
116 }
117
118 #[allow(unreachable_code)]
119 Ok(())
120}
121
122fn print_event(event: &ZmqEvent) {
123 match event {
124 ZmqEvent::SwarmStarted {
125 tag, total_waves, ..
126 } => {
127 println!("[SWARM] Started tag='{}' waves={}", tag, total_waves);
128 }
129 ZmqEvent::WaveStarted {
130 wave,
131 tasks,
132 task_count,
133 } => {
134 println!("[WAVE {}] Started with {} tasks", wave, task_count);
135 if !tasks.is_empty() {
136 println!(" Tasks: {:?}", tasks);
137 }
138 }
139 ZmqEvent::TaskStarted { task_id } => {
140 println!("[TASK {}] Started", task_id);
141 }
142 ZmqEvent::TaskSpawned { task_id } => {
143 println!("[TASK {}] Spawned", task_id);
144 }
145 ZmqEvent::TaskOutput { task_id, text } => {
146 println!("[{}] {}", task_id, text);
147 }
148 ZmqEvent::TaskCompleted {
149 task_id,
150 success,
151 duration_ms,
152 } => {
153 let status = if *success { "completed" } else { "FAILED" };
154 let duration = duration_ms
155 .map(|d| format!(" ({}ms)", d))
156 .unwrap_or_default();
157 println!("[TASK {}] {}{}", task_id, status, duration);
158 }
159 ZmqEvent::TaskFailed { task_id, reason } => {
160 println!("[TASK {}] FAILED: {}", task_id, reason);
161 }
162 ZmqEvent::ValidationStarted => {
163 println!("[VALIDATION] Running...");
164 }
165 ZmqEvent::ValidationCompleted { passed, output } => {
166 let status = if *passed { "PASSED" } else { "FAILED" };
167 println!("[VALIDATION] {}: {}", status, output);
168 }
169 ZmqEvent::ValidationPassed => {
170 println!("[VALIDATION] PASSED");
171 }
172 ZmqEvent::ValidationFailed { failures } => {
173 println!("[VALIDATION] FAILED:");
174 for failure in failures {
175 println!(" - {}", failure);
176 }
177 }
178 ZmqEvent::WaveCompleted { wave, duration_ms } => {
179 let duration = duration_ms
180 .map(|d| format!(" ({}ms)", d))
181 .unwrap_or_default();
182 println!("[WAVE {}] Completed{}", wave, duration);
183 }
184 ZmqEvent::SwarmCompleted { success } => {
185 let status = if *success { "SUCCESS" } else { "FAILED" };
186 println!("[SWARM] Completed: {}", status);
187 }
188 ZmqEvent::SwarmPaused => {
189 println!("[SWARM] Paused");
190 }
191 ZmqEvent::SwarmResumed => {
192 println!("[SWARM] Resumed");
193 }
194 ZmqEvent::Heartbeat { timestamp } => {
195 println!("[HEARTBEAT] {}", timestamp);
196 }
197 ZmqEvent::ToolCall {
198 task_id,
199 tool,
200 input_summary,
201 } => {
202 let summary = input_summary
203 .as_ref()
204 .map(|s| format!(" ({})", s))
205 .unwrap_or_default();
206 println!("[TOOL {}] {} called{}", task_id, tool, summary);
207 }
208 ZmqEvent::ToolResult {
209 task_id,
210 tool,
211 success,
212 duration_ms,
213 } => {
214 let status = if *success { "success" } else { "failed" };
215 let duration = duration_ms
216 .map(|d| format!(" ({}ms)", d))
217 .unwrap_or_default();
218 println!("[TOOL {}] {} {}{}", task_id, tool, status, duration);
219 }
220 ZmqEvent::FileRead { task_id, path } => {
221 println!("[FILE {}] Read: {}", task_id, path);
222 }
223 ZmqEvent::FileWrite {
224 task_id,
225 path,
226 lines_changed,
227 } => {
228 let lines = lines_changed
229 .map(|l| format!(" ({} lines)", l))
230 .unwrap_or_default();
231 println!("[FILE {}] Write: {}{}", task_id, path, lines);
232 }
233 ZmqEvent::DependencyMet {
234 task_id,
235 dependency_id,
236 } => {
237 println!("[DEP {}] Met: {}", task_id, dependency_id);
238 }
239 ZmqEvent::TaskUnblocked {
240 task_id,
241 by_task_id,
242 } => {
243 println!("[BLOCK {}] Unblocked by: {}", task_id, by_task_id);
244 }
245 ZmqEvent::RepairStarted { attempt, task_ids } => {
246 println!(
247 "[REPAIR] Started attempt {} for tasks: {:?}",
248 attempt, task_ids
249 );
250 }
251 ZmqEvent::RepairCompleted { attempt, success } => {
252 let status = if *success { "succeeded" } else { "failed" };
253 println!("[REPAIR] Attempt {} {}", attempt, status);
254 }
255 }
256}