use super::*;
pub(crate) async fn handle_ipc_command(
command: WatcherCommand,
agents: Arc<Mutex<HashMap<String, ManagedAgent>>>,
) -> WatcherResponse {
match command {
WatcherCommand::Deploy { name } => {
if let Err(e) = validate_agent_name(&name) {
return WatcherResponse::Error { message: e };
}
let mut agents = agents.lock().await;
let config_path = watcher_dir().join(&name).join("config.toml");
if !config_path.exists() {
return WatcherResponse::Error {
message: format!("Agent '{}' not found. Run: watcher init {}", name, name)
};
}
let config = match AgentConfig::load(&config_path) {
Ok(config) => config,
Err(e) => return WatcherResponse::Error {
message: format!("Failed to load agent '{}': {}", name, e)
}
};
if let Some(agent) = agents.get_mut(&name) {
if agent.is_running() {
return WatcherResponse::Error {
message: format!("Agent '{}' is already running", name)
};
}
agent.stopped = false;
if agent.config.agent.trigger == "always" {
match spawn_agent(agent, "deploy restart").await {
Ok(()) => WatcherResponse::Ok {
message: format!("Agent '{}' deployed and started", name)
},
Err(e) => WatcherResponse::Error {
message: format!("Failed to start agent '{}': {}", name, e)
}
}
} else {
WatcherResponse::Ok {
message: format!("Agent '{}' deployed", name)
}
}
} else {
let mut agent = ManagedAgent::new(name.clone(), config_path, config);
if agent.config.agent.trigger == "always" {
match spawn_agent(&mut agent, "deploy start").await {
Ok(()) => {
agents.insert(name.clone(), agent);
WatcherResponse::Ok {
message: format!("Agent '{}' deployed and started", name)
}
},
Err(e) => WatcherResponse::Error {
message: format!("Failed to start agent '{}': {}", name, e)
}
}
} else {
agents.insert(name.clone(), agent);
WatcherResponse::Ok {
message: format!("Agent '{}' deployed", name)
}
}
}
}
WatcherCommand::Stop { name } => {
let mut agents = agents.lock().await;
if let Some(agent) = agents.get_mut(&name) {
agent.stopped = true;
if let Some(ref mut child) = agent.child {
let _ = child.kill().await;
let _ = child.wait().await;
}
WatcherResponse::Ok {
message: format!("Agent '{}' stopped", name)
}
} else {
WatcherResponse::Error {
message: format!("Agent '{}' not found or not running", name)
}
}
}
WatcherCommand::Status => {
let agents = agents.lock().await;
let agent_info: Vec<AgentStatusInfo> = agents.values()
.map(|agent| agent.to_status_info())
.collect();
WatcherResponse::Status { agents: agent_info }
}
WatcherCommand::AgentStatus { name } => {
let agents = agents.lock().await;
if let Some(agent) = agents.get(&name) {
WatcherResponse::AgentDetail {
info: agent.to_status_info()
}
} else {
WatcherResponse::Error {
message: format!("Agent '{}' not found", name)
}
}
}
}
}
pub(crate) async fn ipc_listener(agents: Arc<Mutex<HashMap<String, ManagedAgent>>>) {
let socket_path = watcher_dir().join("watcher.sock");
if socket_path.exists() {
if tokio::time::timeout(Duration::from_secs(2), UnixStream::connect(&socket_path)).await.is_ok() {
log("Another supervisor is already running");
std::process::exit(1);
} else {
log("Removing stale socket");
let _ = std::fs::remove_file(&socket_path);
}
}
let listener = match UnixListener::bind(&socket_path) {
Ok(listener) => listener,
Err(e) => {
log(&format!("Failed to bind IPC socket: {}", e));
return;
}
};
if let Err(e) = std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600)) {
log(&format!("Failed to set socket permissions: {}", e));
return;
}
log(&format!("IPC listening on {}", socket_path.display()));
let semaphore = Arc::new(Semaphore::new(10));
loop {
match listener.accept().await {
Ok((stream, _)) => {
let agents = agents.clone();
let permit = semaphore.clone().try_acquire_owned();
match permit {
Ok(permit) => {
tokio::spawn(async move {
let _ = handle_ipc_connection(stream, agents).await;
drop(permit); });
}
Err(_) => {
log("IPC: too many concurrent connections, dropping");
}
}
}
Err(e) => {
log(&format!("IPC accept error: {}", e));
break;
}
}
}
}
pub(crate) async fn handle_ipc_connection(
mut stream: UnixStream,
agents: Arc<Mutex<HashMap<String, ManagedAgent>>>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut reader = BufReader::new(&mut stream);
let mut line = String::new();
reader.read_line(&mut line).await?;
let command: WatcherCommand = serde_json::from_str(line.trim())?;
let response = handle_ipc_command(command, agents).await;
let response_json = serde_json::to_string(&response)?;
stream.write_all(response_json.as_bytes()).await?;
stream.write_all(b"\n").await?;
stream.flush().await?;
Ok(())
}
pub(crate) async fn send_ipc_command(command: WatcherCommand) -> Result<WatcherResponse, String> {
let socket_path = watcher_dir().join("watcher.sock");
if !socket_path.exists() {
return Err("Supervisor not running. Start with: watcher run".to_string());
}
let connect_result = tokio::time::timeout(
Duration::from_secs(5),
UnixStream::connect(&socket_path)
).await;
let mut stream = match connect_result {
Ok(Ok(stream)) => stream,
Ok(Err(_)) => {
return Err("Supervisor socket is stale. Remove it and restart: watcher run".to_string());
}
Err(_) => {
return Err("Supervisor not responding (timeout). Try: watcher run".to_string());
}
};
let command_json = serde_json::to_string(&command)
.map_err(|e| format!("Failed to serialize command: {}", e))?;
stream.write_all(command_json.as_bytes()).await
.map_err(|e| format!("Failed to send command: {}", e))?;
stream.write_all(b"\n").await
.map_err(|e| format!("Failed to send command: {}", e))?;
stream.flush().await
.map_err(|e| format!("Failed to send command: {}", e))?;
let mut reader = BufReader::new(&mut stream);
let mut response_line = String::new();
reader.read_line(&mut response_line).await
.map_err(|e| format!("Failed to read response: {}", e))?;
serde_json::from_str(response_line.trim())
.map_err(|e| format!("Failed to parse response: {}", e))
}