use serde::{Deserialize, Serialize};
use std::path::Path;
#[cfg(feature = "zmq")]
use std::path::PathBuf;
#[cfg(feature = "zmq")]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "zmq")]
use std::sync::Arc;
#[cfg(feature = "zmq")]
use zmq;
/// A swarm session found on disk together with the ZMQ endpoints it advertises.
///
/// The field notes below describe the values as populated by `discover_sessions`.
#[derive(Debug, Clone)]
pub struct DiscoveredSession {
/// Name of the session directory, or `"unknown"` when the name is unavailable
/// or not valid UTF-8.
pub session_id: String,
/// The part of `session_id` that precedes its first `-` (the whole id if it has none).
pub tag: String,
/// Trimmed contents of the session's `zmq-pub.addr` file.
pub pub_endpoint: String,
/// Trimmed contents of the session's `zmq-rep.addr` file.
pub rep_endpoint: String,
/// Directory in which the two address files were found.
pub session_dir: std::path::PathBuf,
}
/// Command accepted as a JSON request on the control (REP) socket.
///
/// Internally tagged: the JSON object carries a `command` field holding the
/// snake_case variant name, e.g. `{"command":"pause"}`.
///
/// The per-variant notes describe what `EventPublisher::handle_control_request`
/// does with each command.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "command", rename_all = "snake_case")]
pub enum ControlCommand {
/// Sets the caller-supplied pause flag to `true`.
Pause,
/// Sets the caller-supplied pause flag back to `false`.
Resume,
/// Sets the caller-supplied stop flag to `true`.
Stop,
/// Requests a `SwarmStatus` snapshot, returned in `ControlResponse::status`.
Status,
}
/// Reply sent back over the control (REP) socket for a `ControlCommand`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ControlResponse {
/// `true` when the command was recognized and applied; `false` when the
/// request could not be parsed as a `ControlCommand`.
pub success: bool,
/// Human-readable outcome, e.g. `"Swarm paused"` or `"Invalid command: ..."`.
pub message: String,
/// Status snapshot; only filled in for the `Status` command.
/// Left out of the serialized JSON entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<SwarmStatus>,
}
/// Progress snapshot returned in `ControlResponse::status` for a `Status` command.
///
/// The value is produced by the `status_fn` callback handed to
/// `EventPublisher::handle_control_request`; this file does not compute it.
///
/// Fields:
/// - `state`: free-form state label. The set of valid values is not defined in
///   this file — NOTE(review): confirm against the code that builds the status.
/// - `current_wave` / `total_waves`: presumably the wave being executed and the
///   number of waves planned — TODO confirm whether `current_wave` is 0- or 1-based.
/// - `tasks_completed` / `tasks_total`: presumably finished and overall task counts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmStatus {
pub state: String, pub current_wave: usize,
pub total_waves: usize,
pub tasks_completed: usize,
pub tasks_total: usize,
}
/// Event payload published on the PUB socket.
///
/// Internally tagged: every event serializes to a JSON object whose `event`
/// field is the snake_case variant name (e.g. `TaskStarted` becomes
/// `"task_started"`), with the variant's own fields alongside it.
/// `EventPublisher::publish` sends that JSON as a single message.
///
/// NOTE(review): this file only defines the payload shapes. When each event is
/// emitted is decided by callers that are not part of this file, so the
/// per-variant notes below are read off the names and should be confirmed
/// against the emitting code. Fields named `duration_ms` are presumably
/// durations in milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum ZmqEvent {
/// Tag `swarm_started`: presumably the start of a swarm run.
SwarmStarted {
session_id: String,
tag: String,
total_waves: usize,
},
/// Tag `wave_started`: carries the wave number plus its task ids and count.
WaveStarted {
wave: usize,
tasks: Vec<String>,
task_count: usize,
},
/// Tag `task_started`.
TaskStarted {
task_id: String,
},
/// Tag `task_spawned`. How this differs from `TaskStarted` is not visible here.
TaskSpawned {
task_id: String,
},
/// Tag `task_output`: a piece of text attributed to a task.
TaskOutput {
task_id: String,
text: String,
},
/// Tag `task_completed`: outcome of a task with an optional duration.
TaskCompleted {
task_id: String,
success: bool,
duration_ms: Option<u64>,
},
/// Tag `task_failed`: a task failure with its reason.
TaskFailed {
task_id: String,
reason: String,
},
/// Tag `validation_started`; carries no fields.
ValidationStarted,
/// Tag `validation_completed`: validation result with its output text.
ValidationCompleted {
passed: bool,
output: String,
},
/// Tag `validation_passed`; carries no fields.
ValidationPassed,
/// Tag `validation_failed`: list of failure descriptions.
ValidationFailed {
failures: Vec<String>,
},
/// Tag `wave_completed`: wave number with an optional duration.
WaveCompleted {
wave: usize,
duration_ms: Option<u64>,
},
/// Tag `swarm_completed`: overall outcome of the run.
SwarmCompleted {
success: bool,
},
/// Tag `swarm_paused`; carries no fields.
SwarmPaused,
/// Tag `swarm_resumed`; carries no fields.
SwarmResumed,
/// Tag `heartbeat`. The timestamp is a plain string; its format is not
/// defined in this file.
Heartbeat {
timestamp: String,
},
/// Tag `tool_call`: a tool invocation by a task, with an optional input summary.
ToolCall {
task_id: String,
tool: String,
input_summary: Option<String>,
},
/// Tag `tool_result`: outcome of a tool invocation with an optional duration.
ToolResult {
task_id: String,
tool: String,
success: bool,
duration_ms: Option<u64>,
},
/// Tag `file_read`: a path associated with a task.
FileRead {
task_id: String,
path: String,
},
/// Tag `file_write`: a path associated with a task, with an optional changed-line count.
FileWrite {
task_id: String,
path: String,
lines_changed: Option<u32>,
},
/// Tag `dependency_met`: links a task to one of its dependencies.
DependencyMet {
task_id: String,
dependency_id: String,
},
/// Tag `task_unblocked`: links a task to the task that unblocked it.
TaskUnblocked {
task_id: String,
by_task_id: String,
},
/// Tag `repair_started`: attempt number and the task ids involved.
RepairStarted {
attempt: usize,
task_ids: Vec<String>,
},
/// Tag `repair_completed`: attempt number and its outcome.
RepairCompleted {
attempt: usize,
success: bool,
},
}
/// Reads the PUB and REP endpoint addresses advertised inside `session_dir`.
///
/// The addresses are taken from the files `zmq-pub.addr` and `zmq-rep.addr`,
/// with surrounding whitespace trimmed.
///
/// Returns `Some((pub_endpoint, rep_endpoint))` only when both files can be
/// read and both contain a non-empty address. Returns `None` when either file
/// is missing, unreadable, or blank. A blank file can be observed while the
/// publisher is still writing it, and an empty endpoint cannot be connected to.
pub fn discover_endpoints(session_dir: &Path) -> Option<(String, String)> {
    // Reading directly (instead of checking `exists()` first) avoids a race
    // with a publisher that removes the file between the check and the read.
    let read_addr = |file_name: &str| -> Option<String> {
        let raw = std::fs::read_to_string(session_dir.join(file_name)).ok()?;
        let addr = raw.trim();
        if addr.is_empty() {
            None
        } else {
            Some(addr.to_string())
        }
    };

    let pub_addr = read_addr("zmq-pub.addr")?;
    let rep_addr = read_addr("zmq-rep.addr")?;
    Some((pub_addr, rep_addr))
}
/// Lists every session under `<project_root>/.scud/swarm` that advertises ZMQ endpoints.
///
/// Each sub-directory for which `discover_endpoints` yields an address pair
/// becomes one `DiscoveredSession`. The session id is the directory name
/// (`"unknown"` if it is unavailable or not valid UTF-8) and the tag is the
/// part of the id before its first `-`.
///
/// Returns an empty vector when the swarm directory cannot be read. Entries
/// that fail to be read are skipped. The order follows the directory listing.
pub fn discover_sessions(project_root: &Path) -> Vec<DiscoveredSession> {
    let swarm_dir = project_root.join(".scud/swarm");

    let listing = match std::fs::read_dir(&swarm_dir) {
        Ok(listing) => listing,
        Err(_) => return Vec::new(),
    };

    listing
        .flatten()
        .map(|entry| entry.path())
        .filter(|candidate| candidate.is_dir())
        .filter_map(|dir| {
            let (pub_endpoint, rep_endpoint) = discover_endpoints(&dir)?;

            let session_id = match dir.file_name().and_then(|name| name.to_str()) {
                Some(name) => name.to_string(),
                None => "unknown".to_string(),
            };
            let tag = match session_id.split('-').next() {
                Some(prefix) => prefix.to_string(),
                None => "unknown".to_string(),
            };

            Some(DiscoveredSession {
                session_id,
                tag,
                pub_endpoint,
                rep_endpoint,
                session_dir: dir,
            })
        })
        .collect()
}
/// Owns the ZMQ sockets of one swarm session: a PUB socket for events and a
/// REP socket for control requests.
///
/// Only compiled when the `zmq` feature is enabled.
#[cfg(feature = "zmq")]
pub struct EventPublisher {
/// Socket on which `publish` sends serialized `ZmqEvent`s.
pub_socket: zmq::Socket,
/// Socket on which `handle_control_request` receives commands and sends replies.
rep_socket: zmq::Socket,
/// Address the PUB socket is bound to; also written to `zmq-pub.addr`.
pub_endpoint: String,
/// Address the REP socket is bound to; also written to `zmq-rep.addr`.
rep_endpoint: String,
/// Directory that holds the two discovery files.
session_dir: PathBuf,
/// Context both sockets were created from. It is never read in this file;
/// being the last field, it is dropped after both sockets.
_context: zmq::Context, }
#[cfg(feature = "zmq")]
impl EventPublisher {
    /// Preferred address for the PUB socket.
    const DEFAULT_PUB_ENDPOINT: &'static str = "tcp://127.0.0.1:5555";
    /// Preferred address for the REP socket.
    const DEFAULT_REP_ENDPOINT: &'static str = "tcp://127.0.0.1:5556";
    /// Wildcard address that lets the OS pick a free loopback port.
    const EPHEMERAL_ENDPOINT: &'static str = "tcp://127.0.0.1:*";

    /// Creates the PUB and REP sockets, binds them, and advertises their
    /// addresses in `session_dir`.
    ///
    /// Each socket is first bound to its preferred fixed port. If that port is
    /// already in use (for example by another running session), the socket
    /// falls back to an OS-assigned loopback port, and the address actually
    /// bound is the one that gets advertised.
    ///
    /// # Errors
    ///
    /// Fails if a socket cannot be created or bound, or if the discovery files
    /// cannot be written. In the latter case the partially written files are
    /// removed again when the half-built publisher is dropped.
    pub fn new(session_dir: &Path) -> anyhow::Result<Self> {
        let context = zmq::Context::new();

        let pub_socket = context.socket(zmq::PUB)?;
        let pub_endpoint = Self::bind_preferred(&pub_socket, Self::DEFAULT_PUB_ENDPOINT)?;
        tracing::info!("ZMQ PUB bound to {}", pub_endpoint);

        let rep_socket = context.socket(zmq::REP)?;
        let rep_endpoint = Self::bind_preferred(&rep_socket, Self::DEFAULT_REP_ENDPOINT)?;
        tracing::info!("ZMQ REP bound to {}", rep_endpoint);

        let publisher = Self {
            pub_socket,
            rep_socket,
            pub_endpoint,
            rep_endpoint,
            session_dir: session_dir.to_path_buf(),
            _context: context,
        };
        publisher.write_discovery_files()?;
        Ok(publisher)
    }

    /// Binds `socket` to `preferred`, falling back to an OS-assigned port when
    /// the preferred address is already in use. Returns the address bound.
    fn bind_preferred(socket: &zmq::Socket, preferred: &str) -> anyhow::Result<String> {
        match socket.bind(preferred) {
            Ok(()) => Ok(preferred.to_string()),
            Err(zmq::Error::EADDRINUSE) => {
                tracing::warn!(
                    "ZMQ endpoint {} is already in use; falling back to an ephemeral port",
                    preferred
                );
                socket.bind(Self::EPHEMERAL_ENDPOINT)?;
                // The wildcard was resolved by the OS; ask the socket which
                // concrete address it ended up with.
                socket.get_last_endpoint()?.map_err(|raw| {
                    anyhow::anyhow!("ZMQ reported a non-UTF-8 endpoint: {:?}", raw)
                })
            }
            Err(e) => Err(e.into()),
        }
    }

    /// Writes `zmq-pub.addr` and `zmq-rep.addr` into the session directory,
    /// creating the directory first if needed.
    fn write_discovery_files(&self) -> anyhow::Result<()> {
        std::fs::create_dir_all(&self.session_dir)?;

        let pub_path = self.session_dir.join("zmq-pub.addr");
        std::fs::write(&pub_path, &self.pub_endpoint)?;
        tracing::debug!("Wrote PUB address to {:?}", pub_path);

        let rep_path = self.session_dir.join("zmq-rep.addr");
        std::fs::write(&rep_path, &self.rep_endpoint)?;
        tracing::debug!("Wrote REP address to {:?}", rep_path);

        Ok(())
    }

    /// Serializes `event` to JSON and sends it as one message on the PUB socket.
    ///
    /// # Errors
    ///
    /// Fails if serialization or the send fails.
    pub fn publish(&self, event: &ZmqEvent) -> anyhow::Result<()> {
        let json = serde_json::to_string(event)?;
        self.pub_socket.send(&json, 0)?;
        Ok(())
    }

    /// The REP socket, e.g. for polling or for setting socket options.
    pub fn rep_socket(&self) -> &zmq::Socket {
        &self.rep_socket
    }

    /// Address the PUB socket is bound to.
    pub fn pub_endpoint(&self) -> &str {
        &self.pub_endpoint
    }

    /// Address the REP socket is bound to.
    pub fn rep_endpoint(&self) -> &str {
        &self.rep_endpoint
    }

    /// Receives one control request, applies it, and sends the reply.
    ///
    /// `Pause`/`Resume` set and clear `pause_flag`, `Stop` sets `stop_flag`,
    /// and `Status` calls `status_fn` and returns its result in the reply.
    /// A request that is not a valid `ControlCommand` is answered with
    /// `success: false`.
    ///
    /// The receive uses flags `0`, so it blocks unless a receive timeout has
    /// been configured on the socket (NOTE(review): presumably done by the
    /// caller through `rep_socket()` — confirm). A timeout surfaces as
    /// `EAGAIN` and is reported as `Ok(None)`.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(request))` with the raw request text once a reply was sent,
    ///   including when the text was not a recognized command.
    /// - `Ok(None)` when no request was available, or when the request was not
    ///   valid UTF-8 (an error reply is still sent in that case).
    ///
    /// # Errors
    ///
    /// Fails on any other receive error, or if the reply cannot be sent.
    pub fn handle_control_request(
        &self,
        pause_flag: &Arc<AtomicBool>,
        stop_flag: &Arc<AtomicBool>,
        status_fn: &dyn Fn() -> SwarmStatus,
    ) -> anyhow::Result<Option<String>> {
        let request = match self.rep_socket.recv_string(0) {
            Ok(Ok(text)) => text,
            Ok(Err(_raw)) => {
                // A REP socket must answer every request before it can receive
                // the next one. Returning without a reply would leave it in
                // the wrong state and make every later receive fail.
                self.send_response(&ControlResponse {
                    success: false,
                    message: "Invalid command: request is not valid UTF-8".into(),
                    status: None,
                })?;
                return Ok(None);
            }
            Err(zmq::Error::EAGAIN) => return Ok(None),
            Err(e) => return Err(anyhow::anyhow!("REP socket error: {}", e)),
        };

        let response = match serde_json::from_str::<ControlCommand>(&request) {
            Ok(ControlCommand::Pause) => {
                pause_flag.store(true, Ordering::SeqCst);
                ControlResponse {
                    success: true,
                    message: "Swarm paused".into(),
                    status: None,
                }
            }
            Ok(ControlCommand::Resume) => {
                pause_flag.store(false, Ordering::SeqCst);
                ControlResponse {
                    success: true,
                    message: "Swarm resumed".into(),
                    status: None,
                }
            }
            Ok(ControlCommand::Stop) => {
                stop_flag.store(true, Ordering::SeqCst);
                ControlResponse {
                    success: true,
                    message: "Swarm stopping".into(),
                    status: None,
                }
            }
            Ok(ControlCommand::Status) => ControlResponse {
                success: true,
                message: "Status retrieved".into(),
                status: Some(status_fn()),
            },
            Err(e) => ControlResponse {
                success: false,
                message: format!("Invalid command: {}", e),
                status: None,
            },
        };

        self.send_response(&response)?;
        Ok(Some(request))
    }

    /// Serializes `response` and sends it on the REP socket, substituting a
    /// fixed error payload if serialization itself fails.
    fn send_response(&self, response: &ControlResponse) -> anyhow::Result<()> {
        let response_json = serde_json::to_string(response).unwrap_or_else(|_| {
            r#"{"success":false,"message":"Serialization error"}"#.to_string()
        });
        self.rep_socket.send(&response_json, 0)?;
        Ok(())
    }

    /// Removes the two discovery files from the session directory.
    ///
    /// Best effort: errors are ignored, so calling this more than once (it is
    /// also invoked on drop) is harmless.
    pub fn cleanup(&self) {
        let _ = std::fs::remove_file(self.session_dir.join("zmq-pub.addr"));
        let _ = std::fs::remove_file(self.session_dir.join("zmq-rep.addr"));
    }
}
#[cfg(feature = "zmq")]
impl Drop for EventPublisher {
    /// Removes the discovery files when the publisher is dropped, so that the
    /// session directory stops advertising endpoints that no longer exist.
    fn drop(&mut self) {
        EventPublisher::cleanup(self);
    }
}