1use anyhow::Result;
4use clap::Parser;
5use std::path::PathBuf;
6use zeromq::{Socket, SocketRecv};
7
8use crate::commands::swarm::publisher::{discover_sessions, ZmqEvent};
9
10#[derive(Parser, Debug)]
11pub struct WatchArgs {
12 #[arg(long)]
14 pub session: Option<String>,
15
16 #[arg(long)]
18 pub tag: Option<String>,
19
20 #[arg(long)]
22 pub project_root: Option<PathBuf>,
23
24 #[arg(long, default_value = "text")]
26 pub format: String,
27}
28
29pub async fn run(args: WatchArgs) -> Result<()> {
30 let project_root = args
31 .project_root
32 .unwrap_or_else(|| std::env::current_dir().unwrap());
33
34 let sessions = discover_sessions(&project_root);
36
37 let filtered_sessions: Vec<_> = sessions
39 .into_iter()
40 .filter(|s| {
41 if let Some(ref tag) = args.tag {
43 if s.tag != *tag {
44 return false;
45 }
46 }
47 if let Some(ref session) = args.session {
49 if s.session_id != *session {
50 return false;
51 }
52 }
53 true
54 })
55 .collect();
56
57 if filtered_sessions.is_empty() {
58 println!("No running swarms found.");
59 if args.tag.is_some() {
60 println!("Try without --tag to see all sessions.");
61 }
62 return Ok(());
63 }
64
65 if filtered_sessions.len() > 1 && args.session.is_none() {
67 println!("Multiple swarms running. Specify --session to watch one:");
68 for session in &filtered_sessions {
69 println!(" {} (tag: {})", session.session_id, session.tag);
70 }
71 return Ok(());
72 }
73
74 let session = filtered_sessions.into_iter().next().unwrap();
75 println!("Watching swarm: {}", session.session_id);
76 println!("Connected to: {}", session.pub_endpoint);
77 println!("---");
78
79 let mut socket = zeromq::SubSocket::new();
81 socket.connect(&session.pub_endpoint).await?;
82 socket.subscribe("").await?;
83
84 loop {
86 match socket.recv().await {
87 Ok(msg) => {
88 if let Some(frame) = msg.iter().next() {
90 if let Ok(text) = std::str::from_utf8(&frame) {
91 if args.format == "json" {
92 println!("{}", text);
93 } else if let Ok(event) = serde_json::from_str::<ZmqEvent>(text) {
94 print_event(&event);
95 }
96 }
97 }
98 }
99 Err(e) => {
100 eprintln!("Connection lost: {}", e);
101 break;
102 }
103 }
104 }
105
106 Ok(())
107}
108
109fn print_event(event: &ZmqEvent) {
110 match event {
111 ZmqEvent::SwarmStarted {
112 tag, total_waves, ..
113 } => {
114 println!("[SWARM] Started tag='{}' waves={}", tag, total_waves);
115 }
116 ZmqEvent::WaveStarted { wave, tasks, task_count } => {
117 println!(
118 "[WAVE {}] Started with {} tasks",
119 wave,
120 task_count
121 );
122 if !tasks.is_empty() {
123 println!(" Tasks: {:?}", tasks);
124 }
125 }
126 ZmqEvent::TaskStarted { task_id } => {
127 println!("[TASK {}] Started", task_id);
128 }
129 ZmqEvent::TaskSpawned { task_id } => {
130 println!("[TASK {}] Spawned", task_id);
131 }
132 ZmqEvent::TaskOutput { task_id, text } => {
133 println!("[{}] {}", task_id, text);
134 }
135 ZmqEvent::TaskCompleted {
136 task_id,
137 success,
138 duration_ms,
139 } => {
140 let status = if *success { "completed" } else { "FAILED" };
141 let duration = duration_ms
142 .map(|d| format!(" ({}ms)", d))
143 .unwrap_or_default();
144 println!("[TASK {}] {}{}", task_id, status, duration);
145 }
146 ZmqEvent::TaskFailed { task_id, reason } => {
147 println!("[TASK {}] FAILED: {}", task_id, reason);
148 }
149 ZmqEvent::ValidationStarted => {
150 println!("[VALIDATION] Running...");
151 }
152 ZmqEvent::ValidationCompleted { passed, output } => {
153 let status = if *passed { "PASSED" } else { "FAILED" };
154 println!("[VALIDATION] {}: {}", status, output);
155 }
156 ZmqEvent::ValidationPassed => {
157 println!("[VALIDATION] PASSED");
158 }
159 ZmqEvent::ValidationFailed { failures } => {
160 println!("[VALIDATION] FAILED:");
161 for failure in failures {
162 println!(" - {}", failure);
163 }
164 }
165 ZmqEvent::WaveCompleted { wave, duration_ms } => {
166 let duration = duration_ms
167 .map(|d| format!(" ({}ms)", d))
168 .unwrap_or_default();
169 println!("[WAVE {}] Completed{}", wave, duration);
170 }
171 ZmqEvent::SwarmCompleted { success } => {
172 let status = if *success { "SUCCESS" } else { "FAILED" };
173 println!("[SWARM] Completed: {}", status);
174 }
175 ZmqEvent::SwarmPaused => {
176 println!("[SWARM] Paused");
177 }
178 ZmqEvent::SwarmResumed => {
179 println!("[SWARM] Resumed");
180 }
181 ZmqEvent::Heartbeat { timestamp } => {
182 println!("[HEARTBEAT] {}", timestamp);
183 }
184 ZmqEvent::ToolCall { task_id, tool, input_summary } => {
185 let summary = input_summary.as_ref().map(|s| format!(" ({})", s)).unwrap_or_default();
186 println!("[TOOL {}] {} called{}", task_id, tool, summary);
187 }
188 ZmqEvent::ToolResult { task_id, tool, success, duration_ms } => {
189 let status = if *success { "success" } else { "failed" };
190 let duration = duration_ms.map(|d| format!(" ({}ms)", d)).unwrap_or_default();
191 println!("[TOOL {}] {} {}{}", task_id, tool, status, duration);
192 }
193 ZmqEvent::FileRead { task_id, path } => {
194 println!("[FILE {}] Read: {}", task_id, path);
195 }
196 ZmqEvent::FileWrite { task_id, path, lines_changed } => {
197 let lines = lines_changed.map(|l| format!(" ({} lines)", l)).unwrap_or_default();
198 println!("[FILE {}] Write: {}{}", task_id, path, lines);
199 }
200 ZmqEvent::DependencyMet { task_id, dependency_id } => {
201 println!("[DEP {}] Met: {}", task_id, dependency_id);
202 }
203 ZmqEvent::TaskUnblocked { task_id, by_task_id } => {
204 println!("[BLOCK {}] Unblocked by: {}", task_id, by_task_id);
205 }
206 ZmqEvent::RepairStarted { attempt, task_ids } => {
207 println!("[REPAIR] Started attempt {} for tasks: {:?}", attempt, task_ids);
208 }
209 ZmqEvent::RepairCompleted { attempt, success } => {
210 let status = if *success { "succeeded" } else { "failed" };
211 println!("[REPAIR] Attempt {} {}", attempt, status);
212 }
213 }
214}