use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::process::{Command, Child};
use std::fs;
use std::collections::HashMap;
use chrono::Utc;
#[cfg(unix)]
use std::os::unix::fs::MetadataExt;
/// Bookkeeping record for a single worker process.
///
/// `save_worker_pid` stores one of these per worker name in the JSON PID
/// state file, and `remove_worker_pid` deletes it again.
#[derive(Debug, Serialize, Deserialize)]
pub struct WorkerState {
/// Operating-system process ID recorded for the worker.
pub pid: u32,
/// Time the record was written; `save_worker_pid` stores an RFC 3339 UTC timestamp.
pub started_at: String,
/// Command string supplied by the caller when the record was saved (stored verbatim).
pub command: String,
}
/// One worker definition as stored in the JSON manifest parsed by `read_manifest`.
///
/// Fields marked `#[serde(default)]` may be omitted from the manifest; they
/// then deserialize to `None` / an empty list.
// NOTE(review): the per-field descriptions below are inferred from the field
// names only; nothing in this file consumes them, so confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerManifestEntry {
/// Worker name (presumably the identifier used elsewhere for this worker).
pub name: String,
/// Role assigned to the worker (free-form string as far as this file shows).
pub role: String,
/// Path to the codebase the worker operates on (presumably its working directory).
pub codebase_path: String,
/// Model identifier for the worker.
pub model: String,
/// Directory for the worker's output.
pub output_dir: String,
/// Optional shared data directory; absent when omitted from the manifest.
#[serde(default)]
pub shared_data_dir: Option<String>,
/// Optional CLI template; absent when omitted from the manifest.
#[serde(default)]
pub cli_template: Option<String>,
/// Optional "light" CLI template variant; absent when omitted from the manifest.
#[serde(default)]
pub cli_template_light: Option<String>,
/// Names this worker hands off to (presumably other workers' `name` values); empty when omitted.
#[serde(default)]
pub hands_off_to: Vec<String>,
}
/// Resolves `path` to its canonical form and checks that it is a directory.
///
/// Canonicalization resolves symlinks and relative components, so the returned
/// path is absolute and refers to an existing filesystem entry.
///
/// # Errors
///
/// Returns an error when the path cannot be canonicalized (for example it does
/// not exist), when its metadata cannot be read, or when it is not a directory.
fn validate_workdir(path: &Path) -> Result<PathBuf> {
    let canonical = fs::canonicalize(path)
        .map_err(|e| anyhow!("Cannot access workdir '{}': {}", path.display(), e))?;
    let metadata = fs::metadata(&canonical)
        .map_err(|e| anyhow!("Cannot stat '{}': {}", canonical.display(), e))?;
    if !metadata.is_dir() {
        return Err(anyhow!("'{}' is not a directory", canonical.display()));
    }
    // A former unix-only permission probe lived here, but its `if` had an empty
    // body and therefore never affected the result. It was removed so the
    // function does not suggest a permission check that is not performed.
    Ok(canonical)
}
/// Starts a `collab worker` child process and returns its handle.
///
/// The working directory is validated and canonicalized first. `model` must be
/// at most 20 bytes long and, like `instance_name`, may contain only
/// alphanumeric characters, `-` and `_`. The instance name, server and
/// (optional) token are passed to the child through the `COLLAB_INSTANCE`,
/// `COLLAB_SERVER` and `COLLAB_TOKEN` environment variables. On unix the child
/// is placed in its own process group.
///
/// `name` is used only in the printed status line and in error messages.
///
/// # Errors
///
/// Returns an error when the working directory is invalid, when `model` or
/// `instance_name` fails validation, or when the process cannot be spawned.
pub fn spawn_worker(
    name: &str,
    workdir: &Path,
    model: &str,
    instance_name: &str,
    server: &str,
    token: Option<&str>,
    cli_template: Option<&str>,
) -> Result<Child> {
    /// Characters accepted in model and instance identifiers.
    fn is_ident_char(ch: char) -> bool {
        ch.is_alphanumeric() || matches!(ch, '-' | '_')
    }

    let resolved_dir = validate_workdir(workdir)?;

    let model_ok = model.len() <= 20 && model.chars().all(is_ident_char);
    if !model_ok {
        return Err(anyhow!("Invalid model name: '{}'", model));
    }
    if instance_name.chars().any(|ch| !is_ident_char(ch)) {
        return Err(anyhow!("Invalid instance name: '{}'", instance_name));
    }

    let mut worker_cmd = Command::new("collab");
    worker_cmd.arg("worker");
    worker_cmd.arg("--workdir").arg(&resolved_dir);
    worker_cmd.arg("--model").arg(model);
    if let Some(template) = cli_template {
        worker_cmd.arg("--cli-template").arg(template);
    }

    worker_cmd
        .env("COLLAB_INSTANCE", instance_name)
        .env("COLLAB_SERVER", server);
    if let Some(secret) = token {
        worker_cmd.env("COLLAB_TOKEN", secret);
    }

    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt;
        // Process group 0 means "new group whose ID equals the child's PID".
        worker_cmd.process_group(0);
    }

    match worker_cmd.spawn() {
        Ok(child) => {
            println!("✓ Started worker {} (PID: {})", name, child.id());
            Ok(child)
        }
        Err(e) => Err(anyhow!("Failed to spawn collab worker for '{}': {}", name, e)),
    }
}
/// Loads the worker manifest at `manifest_path` and parses it as a JSON list
/// of [`WorkerManifestEntry`] values.
///
/// On unix a warning is printed to stderr when the file's mode grants read
/// access to group or others; loading continues regardless.
///
/// # Errors
///
/// Returns an error when the file does not exist, when its metadata or
/// contents cannot be read, or when the contents are not valid manifest JSON.
pub fn read_manifest(manifest_path: &Path) -> Result<Vec<WorkerManifestEntry>> {
    let shown = manifest_path.display();

    if !manifest_path.exists() {
        return Err(anyhow!(
            "Manifest not found: {}\nRun 'collab init workers.yml' first",
            shown
        ));
    }

    #[cfg(unix)]
    {
        // Read bits for "group" and "other".
        const GROUP_OTHER_READ: u32 = 0o044;
        let file_mode = fs::metadata(manifest_path)?.mode();
        if (file_mode & GROUP_OTHER_READ) != 0 {
            eprintln!("⚠ Warning: {} has group/other read permissions", shown);
        }
    }

    let raw = match fs::read_to_string(manifest_path) {
        Ok(text) => text,
        Err(e) => return Err(anyhow!("Cannot read manifest {}: {}", shown, e)),
    };

    let entries: Vec<WorkerManifestEntry> = serde_json::from_str(&raw)
        .map_err(|e| anyhow!("Invalid JSON in manifest {}: {}", shown, e))?;
    Ok(entries)
}
/// Records (or replaces) the entry for worker `name` in the JSON PID state file.
///
/// The existing file, if any, is loaded, the entry for `name` is set to the
/// given `pid` and `command` with the current UTC time in RFC 3339 form, and
/// the whole map is written back pretty-printed.
///
/// If the existing file cannot be parsed, a warning is printed to stderr and
/// the state is rebuilt from scratch, so the new entry is still recorded.
///
/// On unix the file is created with mode `0600` and a pre-existing file is
/// tightened to `0600` before the new contents are written, so the contents
/// are never written to a file with wider permissions.
///
/// # Errors
///
/// Returns an error when the existing file cannot be read, when serialization
/// fails, or when the file cannot be opened, re-permissioned or written.
pub fn save_worker_pid(pids_file: &Path, name: &str, pid: u32, command: &str) -> Result<()> {
    let mut state: HashMap<String, WorkerState> = if pids_file.exists() {
        let content = fs::read_to_string(pids_file)?;
        match serde_json::from_str::<HashMap<String, WorkerState>>(&content) {
            Ok(existing) => existing,
            Err(e) => {
                // Best effort: keep going, but make the loss of the other
                // workers' records visible instead of dropping them silently.
                eprintln!(
                    "⚠ Warning: discarding unparseable worker state in {}: {}",
                    pids_file.display(),
                    e
                );
                HashMap::new()
            }
        }
    } else {
        HashMap::new()
    };

    state.insert(
        name.to_string(),
        WorkerState {
            pid,
            started_at: Utc::now().to_rfc3339(),
            command: command.to_string(),
        },
    );
    let json = serde_json::to_string_pretty(&state)?;

    #[cfg(unix)]
    {
        use std::io::Write;
        use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};

        // `mode` only takes effect when the file is created...
        let mut file = fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .mode(0o600)
            .open(pids_file)?;
        // ...so also tighten a file that already existed, before writing to it.
        file.set_permissions(fs::Permissions::from_mode(0o600))?;
        file.write_all(json.as_bytes())?;
    }
    #[cfg(not(unix))]
    {
        fs::write(pids_file, json)?;
    }
    Ok(())
}
pub fn process_exists(pid: u32) -> bool {
#[cfg(unix)]
{
unsafe {
libc::kill(pid as i32, 0) == 0
}
}
#[cfg(not(unix))]
{
true
}
}
#[cfg(unix)]
fn process_group_exists(pgid: u32) -> bool {
let Ok(entries) = std::fs::read_dir("/proc") else {
return unsafe { libc::killpg(pgid as i32, 0) == 0 };
};
for entry in entries.flatten() {
let name = entry.file_name();
let s = name.to_string_lossy();
if s.chars().all(|c| c.is_ascii_digit()) {
let stat = std::fs::read_to_string(entry.path().join("stat")).unwrap_or_default();
let fields: Vec<&str> = stat.split_whitespace().collect();
if fields.get(4).map(|g| *g == pgid.to_string()).unwrap_or(false) {
return true;
}
}
}
false
}
/// Stops the worker whose process group leader has the given PID.
///
/// If the process is not found, a notice is printed and nothing else happens.
/// On unix, `SIGTERM` is sent to the whole process group; the group is then
/// polled every 200 ms, and if it still exists after 3 seconds it is sent
/// `SIGKILL`. On other platforms no signal is sent.
///
/// On unix, PIDs 0 and 1 and values that do not fit in `pid_t` are refused
/// with a printed notice. `killpg` with such a group ID does not address a
/// single worker group: 0 means the caller's own group, and values <= 1 are
/// outside the range for which `killpg` is defined.
///
/// `name` is used only in the printed messages.
///
/// # Errors
///
/// Currently always returns `Ok(())`; failures are reported on stdout.
pub fn kill_process(pid: u32, name: &str) -> Result<()> {
    #[cfg(unix)]
    {
        if pid <= 1 || pid > libc::pid_t::MAX as u32 {
            println!(
                "⚠ Refusing to signal {}: PID {} is not a valid worker process group",
                name, pid
            );
            return Ok(());
        }
    }

    if !process_exists(pid) {
        println!("⚠ Process {} (PID {}) not found", name, pid);
        return Ok(());
    }

    #[cfg(unix)]
    {
        use std::time::{Duration, Instant};

        const GRACE_PERIOD: Duration = Duration::from_secs(3);
        const POLL_INTERVAL: Duration = Duration::from_millis(200);

        // Lossless: the range was checked at the top of the function.
        let pgid = pid as libc::pid_t;

        // SAFETY: `killpg` takes plain integers and dereferences no pointers;
        // `pgid` is > 1, so it names one specific process group.
        unsafe {
            libc::killpg(pgid, libc::SIGTERM);
        }

        let deadline = Instant::now() + GRACE_PERIOD;
        loop {
            std::thread::sleep(POLL_INTERVAL);
            if !process_group_exists(pid) {
                break;
            }
            if Instant::now() >= deadline {
                // The group ignored SIGTERM for the whole grace period.
                // SAFETY: same as for the SIGTERM call above.
                unsafe {
                    libc::killpg(pgid, libc::SIGKILL);
                }
                // Give the kernel a moment to tear the processes down.
                std::thread::sleep(POLL_INTERVAL);
                break;
            }
        }
    }

    println!("✓ Stopped worker {} (PID {})", name, pid);
    Ok(())
}
/// Deletes the entry for worker `name` from the JSON PID state file.
///
/// Does nothing when the file does not exist. A file that cannot be parsed is
/// treated as containing no entries. When no entries remain after the removal
/// the file itself is deleted; otherwise the remaining entries are written
/// back pretty-printed and, on unix, the file mode is set to `0600`.
///
/// # Errors
///
/// Returns an error when the file cannot be read, removed, written or
/// re-permissioned, or when serialization fails.
pub fn remove_worker_pid(pids_file: &Path, name: &str) -> Result<()> {
    if pids_file.exists() {
        let raw = fs::read_to_string(pids_file)?;
        let mut workers: HashMap<String, WorkerState> =
            serde_json::from_str(&raw).unwrap_or_default();
        workers.remove(name);

        if workers.is_empty() {
            // Nothing left to track: drop the file rather than keep an empty map.
            fs::remove_file(pids_file)?;
            return Ok(());
        }

        let serialized = serde_json::to_string_pretty(&workers)?;
        fs::write(pids_file, serialized)?;
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            fs::set_permissions(pids_file, fs::Permissions::from_mode(0o600))?;
        }
    }
    Ok(())
}