1use serde::{Deserialize, Serialize};
7use std::path::Path;
8#[cfg(feature = "zmq")]
9use std::path::PathBuf;
10#[cfg(feature = "zmq")]
11use std::sync::atomic::{AtomicBool, Ordering};
12#[cfg(feature = "zmq")]
13use std::sync::Arc;
14#[cfg(feature = "zmq")]
15use zmq;
16
17#[derive(Debug, Clone)]
19pub struct DiscoveredSession {
20 pub session_id: String,
21 pub tag: String,
22 pub pub_endpoint: String,
23 pub rep_endpoint: String,
24 pub session_dir: std::path::PathBuf,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(tag = "command", rename_all = "snake_case")]
30pub enum ControlCommand {
31 Pause,
32 Resume,
33 Stop,
34 Status,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct ControlResponse {
40 pub success: bool,
41 pub message: String,
42 #[serde(skip_serializing_if = "Option::is_none")]
43 pub status: Option<SwarmStatus>,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct SwarmStatus {
49 pub state: String, pub current_wave: usize,
51 pub total_waves: usize,
52 pub tasks_completed: usize,
53 pub tasks_total: usize,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58#[serde(tag = "event", rename_all = "snake_case")]
59pub enum ZmqEvent {
60 SwarmStarted {
61 session_id: String,
62 tag: String,
63 total_waves: usize,
64 },
65 WaveStarted {
66 wave: usize,
67 tasks: Vec<String>,
68 task_count: usize,
69 },
70 TaskStarted {
71 task_id: String,
72 },
73 TaskSpawned {
74 task_id: String,
75 },
76 TaskOutput {
77 task_id: String,
78 text: String,
79 },
80 TaskCompleted {
81 task_id: String,
82 success: bool,
83 duration_ms: Option<u64>,
84 },
85 TaskFailed {
86 task_id: String,
87 reason: String,
88 },
89 ValidationStarted,
90 ValidationCompleted {
91 passed: bool,
92 output: String,
93 },
94 ValidationPassed,
95 ValidationFailed {
96 failures: Vec<String>,
97 },
98 WaveCompleted {
99 wave: usize,
100 duration_ms: Option<u64>,
101 },
102 SwarmCompleted {
103 success: bool,
104 },
105 SwarmPaused,
106 SwarmResumed,
107 Heartbeat {
108 timestamp: String,
109 },
110 ToolCall {
111 task_id: String,
112 tool: String,
113 input_summary: Option<String>,
114 },
115 ToolResult {
116 task_id: String,
117 tool: String,
118 success: bool,
119 duration_ms: Option<u64>,
120 },
121 FileRead {
122 task_id: String,
123 path: String,
124 },
125 FileWrite {
126 task_id: String,
127 path: String,
128 lines_changed: Option<u32>,
129 },
130 DependencyMet {
131 task_id: String,
132 dependency_id: String,
133 },
134 TaskUnblocked {
135 task_id: String,
136 by_task_id: String,
137 },
138 RepairStarted {
139 attempt: usize,
140 task_ids: Vec<String>,
141 },
142 RepairCompleted {
143 attempt: usize,
144 success: bool,
145 },
146}
147
148pub fn discover_endpoints(session_dir: &Path) -> Option<(String, String)> {
150 let pub_path = session_dir.join("zmq-pub.addr");
151 let rep_path = session_dir.join("zmq-rep.addr");
152
153 if pub_path.exists() && rep_path.exists() {
154 if let (Ok(pub_addr), Ok(rep_addr)) = (
155 std::fs::read_to_string(&pub_path),
156 std::fs::read_to_string(&rep_path),
157 ) {
158 Some((pub_addr.trim().to_string(), rep_addr.trim().to_string()))
159 } else {
160 None
161 }
162 } else {
163 None
164 }
165}
166
167pub fn discover_sessions(project_root: &Path) -> Vec<DiscoveredSession> {
169 let swarm_dir = project_root.join(".scud/swarm");
170 let mut sessions = vec![];
171
172 if let Ok(entries) = std::fs::read_dir(&swarm_dir) {
173 for entry in entries.flatten() {
174 let path = entry.path();
175 if path.is_dir() {
176 if let Some((pub_addr, rep_addr)) = discover_endpoints(&path) {
177 let session_id = path
179 .file_name()
180 .and_then(|n| n.to_str())
181 .unwrap_or("unknown")
182 .to_string();
183
184 let tag = session_id
185 .split('-')
186 .next()
187 .unwrap_or("unknown")
188 .to_string();
189
190 sessions.push(DiscoveredSession {
191 session_id,
192 tag,
193 pub_endpoint: pub_addr,
194 rep_endpoint: rep_addr,
195 session_dir: path,
196 });
197 }
198 }
199 }
200 }
201
202 sessions
203}
204
205#[cfg(feature = "zmq")]
207pub struct EventPublisher {
208 pub_socket: zmq::Socket,
209 rep_socket: zmq::Socket,
210 pub_endpoint: String,
211 rep_endpoint: String,
212 session_dir: PathBuf,
213 _context: zmq::Context, }
215
216#[cfg(feature = "zmq")]
217impl EventPublisher {
218 pub fn new(session_dir: &Path) -> Result<Self> {
222 let context = zmq::Context::new();
223
224 let pub_endpoint = "tcp://127.0.0.1:5555".to_string();
226 let rep_endpoint = "tcp://127.0.0.1:5556".to_string();
227
228 let pub_socket = context.socket(zmq::PUB)?;
230 pub_socket.bind(&pub_endpoint)?;
231 tracing::info!("ZMQ PUB bound to {}", pub_endpoint);
232
233 let rep_socket = context.socket(zmq::REP)?;
235 rep_socket.bind(&rep_endpoint)?;
236 tracing::info!("ZMQ REP bound to {}", rep_endpoint);
237
238 let publisher = Self {
239 pub_socket,
240 rep_socket,
241 pub_endpoint,
242 rep_endpoint,
243 session_dir: session_dir.to_path_buf(),
244 _context: context,
245 };
246
247 publisher.write_discovery_files()?;
249
250 Ok(publisher)
251 }
252
253 fn write_discovery_files(&self) -> Result<()> {
255 std::fs::create_dir_all(&self.session_dir)?;
256
257 let pub_path = self.session_dir.join("zmq-pub.addr");
258 std::fs::write(&pub_path, &self.pub_endpoint)?;
259 tracing::debug!("Wrote PUB address to {:?}", pub_path);
260
261 let rep_path = self.session_dir.join("zmq-rep.addr");
262 std::fs::write(&rep_path, &self.rep_endpoint)?;
263 tracing::debug!("Wrote REP address to {:?}", rep_path);
264
265 Ok(())
266 }
267
268 pub fn publish(&self, event: &ZmqEvent) -> Result<()> {
270 let json = serde_json::to_string(event)?;
271 self.pub_socket.send(&json, 0)?;
272 Ok(())
273 }
274
275 pub fn rep_socket(&self) -> &zmq::Socket {
277 &self.rep_socket
278 }
279
280 pub fn pub_endpoint(&self) -> &str {
282 &self.pub_endpoint
283 }
284
285 pub fn rep_endpoint(&self) -> &str {
287 &self.rep_endpoint
288 }
289
290 pub fn handle_control_request(
292 &self,
293 pause_flag: &Arc<AtomicBool>,
294 stop_flag: &Arc<AtomicBool>,
295 status_fn: &dyn Fn() -> SwarmStatus,
296 ) -> Result<Option<String>> {
297 match self.rep_socket.recv_string(0) {
299 Ok(Ok(request)) => {
300 let response = match serde_json::from_str::<ControlCommand>(&request) {
301 Ok(ControlCommand::Pause) => {
302 pause_flag.store(true, Ordering::SeqCst);
303 ControlResponse {
304 success: true,
305 message: "Swarm paused".into(),
306 status: None,
307 }
308 }
309 Ok(ControlCommand::Resume) => {
310 pause_flag.store(false, Ordering::SeqCst);
311 ControlResponse {
312 success: true,
313 message: "Swarm resumed".into(),
314 status: None,
315 }
316 }
317 Ok(ControlCommand::Stop) => {
318 stop_flag.store(true, Ordering::SeqCst);
319 ControlResponse {
320 success: true,
321 message: "Swarm stopping".into(),
322 status: None,
323 }
324 }
325 Ok(ControlCommand::Status) => ControlResponse {
326 success: true,
327 message: "Status retrieved".into(),
328 status: Some(status_fn()),
329 },
330 Err(e) => ControlResponse {
331 success: false,
332 message: format!("Invalid command: {}", e),
333 status: None,
334 },
335 };
336
337 let response_json = serde_json::to_string(&response).unwrap_or_else(|_| {
338 r#"{"success":false,"message":"Serialization error"}"#.to_string()
339 });
340 self.rep_socket.send(&response_json, 0)?;
341 Ok(Some(request))
342 }
343 Ok(Err(_)) => {
344 Ok(None)
346 }
347 Err(zmq::Error::EAGAIN) => {
348 Ok(None)
350 }
351 Err(e) => Err(anyhow::anyhow!("REP socket error: {}", e)),
352 }
353 }
354
355 pub fn cleanup(&self) {
357 let _ = std::fs::remove_file(self.session_dir.join("zmq-pub.addr"));
358 let _ = std::fs::remove_file(self.session_dir.join("zmq-rep.addr"));
359 }
360}
361
362#[cfg(feature = "zmq")]
363impl Drop for EventPublisher {
364 fn drop(&mut self) {
365 self.cleanup();
366 }
367}