Skip to main content

scud/commands/swarm/
publisher.rs

1//! ZeroMQ event publisher for real-time swarm monitoring
2//!
3//! Publishes swarm events via PUB socket and accepts control commands via REP socket.
4//! Clients can discover socket addresses via files in `.scud/swarm/<session>/`.
5
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8#[cfg(feature = "zmq")]
9use std::path::PathBuf;
10#[cfg(feature = "zmq")]
11use std::sync::atomic::{AtomicBool, Ordering};
12#[cfg(feature = "zmq")]
13use std::sync::Arc;
14#[cfg(feature = "zmq")]
15use zmq;
16
/// Discovered swarm session with ZMQ endpoints.
///
/// Produced by [`discover_sessions`] for each session directory under
/// `.scud/swarm/` whose discovery files (`zmq-pub.addr`, `zmq-rep.addr`)
/// could be read.
#[derive(Debug, Clone)]
pub struct DiscoveredSession {
    /// Name of the session directory under `.scud/swarm/`.
    pub session_id: String,
    /// Tag derived from `session_id`: the text before its first `-`.
    pub tag: String,
    /// Endpoint of the session's PUB (event) socket, read from `zmq-pub.addr`.
    pub pub_endpoint: String,
    /// Endpoint of the session's REP (control) socket, read from `zmq-rep.addr`.
    pub rep_endpoint: String,
    /// Session directory the endpoints were read from.
    pub session_dir: std::path::PathBuf,
}
26
/// Control command format.
///
/// Sent by clients to the session's REP socket as an internally tagged JSON
/// object whose `command` field is the snake_case variant name, e.g.
/// `{"command":"pause"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "command", rename_all = "snake_case")]
pub enum ControlCommand {
    /// Sets the pause flag handed to `EventPublisher::handle_control_request`.
    Pause,
    /// Clears the pause flag.
    Resume,
    /// Sets the stop flag.
    Stop,
    /// Asks for a [`SwarmStatus`] snapshot, returned in the reply's `status` field.
    Status,
}
36
/// Control response format.
///
/// Serialized to JSON and sent back on the REP socket in answer to a control
/// request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ControlResponse {
    /// `true` when the request parsed as a known [`ControlCommand`] and was applied.
    pub success: bool,
    /// Human-readable outcome, or the parse error for an invalid request.
    pub message: String,
    /// Only set for [`ControlCommand::Status`]; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<SwarmStatus>,
}
45
/// Swarm status information.
///
/// Returned in [`ControlResponse::status`] in reply to a
/// [`ControlCommand::Status`] request; the values come from the caller-supplied
/// status callback.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmStatus {
    /// Lifecycle state as a free-form string; not validated here.
    pub state: String, // "running", "paused", "completed"
    /// Wave currently executing (NOTE(review): 0- vs 1-based is not fixed here — confirm with the producer).
    pub current_wave: usize,
    /// Number of waves (presumably for the whole run — confirm with the producer).
    pub total_waves: usize,
    /// Number of tasks finished so far.
    pub tasks_completed: usize,
    /// Number of tasks overall.
    pub tasks_total: usize,
}
55
/// ZMQ event format.
///
/// Broadcast to subscribers on the PUB socket. Serialized as an internally
/// tagged JSON object: the `event` field holds the snake_case variant name and
/// the variant's fields sit alongside it, e.g.
/// `{"event":"task_started","task_id":"t1"}`. Unit variants carry only the tag,
/// e.g. `{"event":"swarm_paused"}`. `Option` fields serialize as `null` when unset.
///
/// NOTE(review): `duration_ms` fields are presumably milliseconds (per the
/// name) and `Heartbeat::timestamp`'s format is chosen by the producer — confirm
/// against the code that constructs these events.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum ZmqEvent {
    // Swarm lifecycle.
    SwarmStarted {
        session_id: String,
        tag: String,
        total_waves: usize,
    },
    // Wave and task progress.
    WaveStarted {
        wave: usize,
        tasks: Vec<String>,
        task_count: usize,
    },
    TaskStarted {
        task_id: String,
    },
    TaskSpawned {
        task_id: String,
    },
    TaskOutput {
        task_id: String,
        text: String,
    },
    TaskCompleted {
        task_id: String,
        success: bool,
        duration_ms: Option<u64>,
    },
    TaskFailed {
        task_id: String,
        reason: String,
    },
    // Validation.
    ValidationStarted,
    ValidationCompleted {
        passed: bool,
        output: String,
    },
    ValidationPassed,
    ValidationFailed {
        failures: Vec<String>,
    },
    WaveCompleted {
        wave: usize,
        duration_ms: Option<u64>,
    },
    SwarmCompleted {
        success: bool,
    },
    SwarmPaused,
    SwarmResumed,
    Heartbeat {
        timestamp: String,
    },
    // Per-task tool and file activity.
    ToolCall {
        task_id: String,
        tool: String,
        input_summary: Option<String>,
    },
    ToolResult {
        task_id: String,
        tool: String,
        success: bool,
        duration_ms: Option<u64>,
    },
    FileRead {
        task_id: String,
        path: String,
    },
    FileWrite {
        task_id: String,
        path: String,
        lines_changed: Option<u32>,
    },
    // Dependency resolution.
    DependencyMet {
        task_id: String,
        dependency_id: String,
    },
    TaskUnblocked {
        task_id: String,
        by_task_id: String,
    },
    // Repair attempts.
    RepairStarted {
        attempt: usize,
        task_ids: Vec<String>,
    },
    RepairCompleted {
        attempt: usize,
        success: bool,
    },
}
147
/// Discover ZMQ endpoints for a specific session directory.
///
/// Reads the discovery files `zmq-pub.addr` and `zmq-rep.addr` inside
/// `session_dir` and returns `(pub_endpoint, rep_endpoint)` with surrounding
/// whitespace trimmed.
///
/// Returns `None` in three cases:
/// - either file is missing or unreadable;
/// - either file is not valid UTF-8;
/// - either file is blank.
///
/// A blank file can be observed while a publisher is still writing it
/// (`std::fs::write` truncates before writing). It is therefore treated as
/// "not published yet" rather than as an empty endpoint.
pub fn discover_endpoints(session_dir: &Path) -> Option<(String, String)> {
    // Read directly instead of checking `exists()` first: the check-then-read
    // pair races with the publisher creating/removing the files, and a failed
    // read already covers the missing-file case.
    let read_addr = |file_name: &str| -> Option<String> {
        let contents = std::fs::read_to_string(session_dir.join(file_name)).ok()?;
        let addr = contents.trim();
        (!addr.is_empty()).then(|| addr.to_string())
    };

    let pub_addr = read_addr("zmq-pub.addr")?;
    let rep_addr = read_addr("zmq-rep.addr")?;
    Some((pub_addr, rep_addr))
}
166
167/// Discover all running swarm sessions
168pub fn discover_sessions(project_root: &Path) -> Vec<DiscoveredSession> {
169    let swarm_dir = project_root.join(".scud/swarm");
170    let mut sessions = vec![];
171
172    if let Ok(entries) = std::fs::read_dir(&swarm_dir) {
173        for entry in entries.flatten() {
174            let path = entry.path();
175            if path.is_dir() {
176                if let Some((pub_addr, rep_addr)) = discover_endpoints(&path) {
177                    // Try to get tag from session name (format: tag-timestamp)
178                    let session_id = path
179                        .file_name()
180                        .and_then(|n| n.to_str())
181                        .unwrap_or("unknown")
182                        .to_string();
183
184                    let tag = session_id
185                        .split('-')
186                        .next()
187                        .unwrap_or("unknown")
188                        .to_string();
189
190                    sessions.push(DiscoveredSession {
191                        session_id,
192                        tag,
193                        pub_endpoint: pub_addr,
194                        rep_endpoint: rep_addr,
195                        session_dir: path,
196                    });
197                }
198            }
199        }
200    }
201
202    sessions
203}
204
/// ZMQ Publisher for swarm events.
///
/// Owns the two sockets of one swarm session:
/// - a PUB socket that events are sent on (`publish`);
/// - a REP socket that control requests arrive on (`handle_control_request`).
///
/// Both endpoints are advertised via `zmq-pub.addr` / `zmq-rep.addr` in
/// `session_dir`, and dropping the publisher removes those files.
#[cfg(feature = "zmq")]
pub struct EventPublisher {
    /// PUB socket that serialized [`ZmqEvent`]s are sent on.
    pub_socket: zmq::Socket,
    /// REP socket that control requests are received and answered on.
    rep_socket: zmq::Socket,
    /// Endpoint the PUB socket is bound to; written to `zmq-pub.addr`.
    pub_endpoint: String,
    /// Endpoint the REP socket is bound to; written to `zmq-rep.addr`.
    rep_endpoint: String,
    /// Directory holding the discovery files.
    session_dir: PathBuf,
    // Declared last on purpose: struct fields drop in declaration order, so the
    // sockets above are closed before the context is released.
    _context: zmq::Context, // Keep context alive
}
215
#[cfg(feature = "zmq")]
impl EventPublisher {
    /// Create and bind publisher sockets.
    ///
    /// Binds the PUB socket to `tcp://127.0.0.1:5555` and the REP socket to
    /// `tcp://127.0.0.1:5556` when those ports are free, so a single session
    /// keeps its well-known addresses. If a port is already taken (for example
    /// by another swarm session), that socket falls back to an OS-assigned port
    /// on localhost. The endpoints actually bound are then written to
    /// `zmq-pub.addr` / `zmq-rep.addr` in `session_dir` for clients to discover.
    ///
    /// # Errors
    ///
    /// Fails if either of these fails:
    /// - creating or binding a socket;
    /// - writing the discovery files.
    pub fn new(session_dir: &Path) -> anyhow::Result<Self> {
        let context = zmq::Context::new();

        // Create PUB socket for events
        let pub_socket = context.socket(zmq::PUB)?;
        let pub_endpoint = Self::bind_preferring(&pub_socket, "tcp://127.0.0.1:5555")?;
        tracing::info!("ZMQ PUB bound to {}", pub_endpoint);

        // Create REP socket for control
        let rep_socket = context.socket(zmq::REP)?;
        let rep_endpoint = Self::bind_preferring(&rep_socket, "tcp://127.0.0.1:5556")?;
        tracing::info!("ZMQ REP bound to {}", rep_endpoint);

        let publisher = Self {
            pub_socket,
            rep_socket,
            pub_endpoint,
            rep_endpoint,
            session_dir: session_dir.to_path_buf(),
            _context: context,
        };

        // Written last: if this fails, dropping `publisher` removes any file
        // that was already written.
        publisher.write_discovery_files()?;

        Ok(publisher)
    }

    /// Bind `socket` to `preferred`, or to an OS-assigned localhost port if
    /// `preferred` is already in use.
    ///
    /// Returns the endpoint that was actually bound.
    fn bind_preferring(socket: &zmq::Socket, preferred: &str) -> anyhow::Result<String> {
        match socket.bind(preferred) {
            Ok(()) => Ok(preferred.to_string()),
            Err(zmq::Error::EADDRINUSE) => {
                tracing::warn!("{} is in use; binding to an ephemeral port instead", preferred);
                // `*` asks libzmq for an ephemeral port; ZMQ_LAST_ENDPOINT
                // reports which one was chosen.
                socket.bind("tcp://127.0.0.1:*")?;
                socket
                    .get_last_endpoint()?
                    .map_err(|_| anyhow::anyhow!("ZMQ reported a non-UTF-8 endpoint"))
            }
            Err(e) => Err(e.into()),
        }
    }

    /// Write socket addresses to the discovery files in `session_dir`.
    fn write_discovery_files(&self) -> anyhow::Result<()> {
        std::fs::create_dir_all(&self.session_dir)?;

        let pub_path = self.session_dir.join("zmq-pub.addr");
        std::fs::write(&pub_path, &self.pub_endpoint)?;
        tracing::debug!("Wrote PUB address to {:?}", pub_path);

        let rep_path = self.session_dir.join("zmq-rep.addr");
        std::fs::write(&rep_path, &self.rep_endpoint)?;
        tracing::debug!("Wrote REP address to {:?}", rep_path);

        Ok(())
    }

    /// Publish an event to all subscribers as a single JSON message.
    ///
    /// # Errors
    ///
    /// Fails if the event cannot be serialized or the send fails.
    pub fn publish(&self, event: &ZmqEvent) -> anyhow::Result<()> {
        let json = serde_json::to_string(event)?;
        self.pub_socket.send(&json, 0)?;
        Ok(())
    }

    /// Get the REP socket for control command handling.
    ///
    /// A REP socket must answer each received request before it can receive
    /// the next one; callers that read from it directly must keep that
    /// alternation.
    pub fn rep_socket(&self) -> &zmq::Socket {
        &self.rep_socket
    }

    /// Get the PUB endpoint address.
    pub fn pub_endpoint(&self) -> &str {
        &self.pub_endpoint
    }

    /// Get the REP endpoint address.
    pub fn rep_endpoint(&self) -> &str {
        &self.rep_endpoint
    }

    /// Handle REP socket requests (control commands) without blocking.
    ///
    /// Every received request gets a [`ControlResponse`] reply.
    ///
    /// Returns:
    /// - `Ok(Some(request))` with the raw text when a UTF-8 request was
    ///   handled, including requests that failed to parse;
    /// - `Ok(None)` when nothing is pending, or when a non-UTF-8 request was
    ///   rejected.
    ///
    /// # Errors
    ///
    /// Fails on any other socket error, or if the reply cannot be sent.
    pub fn handle_control_request(
        &self,
        pause_flag: &Arc<AtomicBool>,
        stop_flag: &Arc<AtomicBool>,
        status_fn: &dyn Fn() -> SwarmStatus,
    ) -> anyhow::Result<Option<String>> {
        // DONTWAIT makes this a poll. With flags = 0 the call would block until
        // a client sent something, and the EAGAIN arm below could never fire.
        match self.rep_socket.recv_string(zmq::DONTWAIT) {
            Ok(Ok(request)) => {
                let response = Self::execute_command(&request, pause_flag, stop_flag, status_fn);
                self.send_response(&response)?;
                Ok(Some(request))
            }
            Ok(Err(_)) => {
                // Non-UTF-8 data still counts as a request. Without a reply the
                // REP socket cannot receive again (EFSM) and the control
                // channel would stay wedged.
                self.send_response(&ControlResponse {
                    success: false,
                    message: "Invalid command: request is not valid UTF-8".into(),
                    status: None,
                })?;
                Ok(None)
            }
            Err(zmq::Error::EAGAIN) => {
                // No message available
                Ok(None)
            }
            Err(e) => Err(anyhow::anyhow!("REP socket error: {}", e)),
        }
    }

    /// Parse `request` as a [`ControlCommand`], apply it to the flags, and build the reply.
    fn execute_command(
        request: &str,
        pause_flag: &AtomicBool,
        stop_flag: &AtomicBool,
        status_fn: &dyn Fn() -> SwarmStatus,
    ) -> ControlResponse {
        match serde_json::from_str::<ControlCommand>(request) {
            Ok(ControlCommand::Pause) => {
                pause_flag.store(true, Ordering::SeqCst);
                ControlResponse {
                    success: true,
                    message: "Swarm paused".into(),
                    status: None,
                }
            }
            Ok(ControlCommand::Resume) => {
                pause_flag.store(false, Ordering::SeqCst);
                ControlResponse {
                    success: true,
                    message: "Swarm resumed".into(),
                    status: None,
                }
            }
            Ok(ControlCommand::Stop) => {
                stop_flag.store(true, Ordering::SeqCst);
                ControlResponse {
                    success: true,
                    message: "Swarm stopping".into(),
                    status: None,
                }
            }
            Ok(ControlCommand::Status) => ControlResponse {
                success: true,
                message: "Status retrieved".into(),
                status: Some(status_fn()),
            },
            Err(e) => ControlResponse {
                success: false,
                message: format!("Invalid command: {}", e),
                status: None,
            },
        }
    }

    /// Serialize `response` and send it as the reply on the REP socket.
    fn send_response(&self, response: &ControlResponse) -> anyhow::Result<()> {
        let response_json = serde_json::to_string(response).unwrap_or_else(|_| {
            r#"{"success":false,"message":"Serialization error"}"#.to_string()
        });
        self.rep_socket.send(&response_json, 0)?;
        Ok(())
    }

    /// Clean up discovery files on shutdown.
    ///
    /// Best-effort: removal errors (e.g. files already gone) are ignored, so
    /// calling this more than once is harmless.
    pub fn cleanup(&self) {
        let _ = std::fs::remove_file(self.session_dir.join("zmq-pub.addr"));
        let _ = std::fs::remove_file(self.session_dir.join("zmq-rep.addr"));
    }
}
361
#[cfg(feature = "zmq")]
impl Drop for EventPublisher {
    /// Remove this publisher's discovery files when it goes out of scope, so
    /// clients scanning the session directory no longer find its endpoints.
    fn drop(&mut self) {
        EventPublisher::cleanup(self)
    }
}