use clap::{Parser, Subcommand};
use anyhow::Result;
use serde::Deserialize;
use std::path::PathBuf;
mod client;
mod init;
mod worker;
mod lifecycle;
#[cfg(feature = "monitor")]
mod monitor;
#[cfg(feature = "monitor")]
mod wizard;
use client::CollabClient;
/// Settings read from a `.collab.toml` file.
///
/// Every field is optional so a file may provide only some of them;
/// `load_config` fills the gaps of the project-local file from the global one.
#[derive(Debug, Deserialize, Default)]
struct Config {
/// Server URL; `main` falls back to it when neither `--server` nor
/// `$COLLAB_SERVER` is set.
host: Option<String>,
/// Instance ID; `main` falls back to it when neither `--instance` nor
/// `$COLLAB_INSTANCE` is set.
instance: Option<String>,
/// Token handed to `CollabClient`; `$COLLAB_TOKEN` takes precedence over it.
token: Option<String>,
}
/// Builds the effective file configuration.
///
/// The project-local `.collab.toml` (found by walking up from the current
/// directory) is read first, then the one in the home directory. When both
/// parse, each field is taken from the local file and falls back to the
/// global file; when neither is available the default (all `None`) is returned.
fn load_config() -> Config {
    let project = local_config_path().and_then(|path| read_config(&path));
    let user = config_path().and_then(|path| read_config(&path));

    match (project, user) {
        // No project file: the global file alone (or nothing) decides.
        (None, fallback) => fallback.unwrap_or_default(),
        (Some(primary), None) => primary,
        // Both present: merge field by field, project-local wins.
        (Some(primary), Some(fallback)) => Config {
            host: primary.host.or(fallback.host),
            instance: primary.instance.or(fallback.instance),
            token: primary.token.or(fallback.token),
        },
    }
}
/// Reads and parses one `.collab.toml` file.
///
/// Returns `None` when the file cannot be read (typically because it does not
/// exist) or when it is not valid TOML for [`Config`]. A parse failure is
/// reported on stderr so that a typo in the file does not silently disable
/// the whole configuration; an unreadable file stays silent because a missing
/// config is the normal case.
///
/// Accepts any `&Path`; existing callers passing `&PathBuf` keep working
/// through deref coercion.
fn read_config(path: &std::path::Path) -> Option<Config> {
    let contents = std::fs::read_to_string(path).ok()?;
    match toml::from_str::<Config>(&contents) {
        Ok(config) => Some(config),
        Err(err) => {
            eprintln!(
                "warning: ignoring invalid config file {}: {}",
                path.display(),
                err
            );
            None
        }
    }
}
/// Finds the nearest project-local `.collab.toml`.
///
/// Starts in the current directory and walks towards the filesystem root,
/// returning the first `.collab.toml` that is a regular file. The walk stops
/// after checking the home directory, or at the root when the home directory
/// is not an ancestor or cannot be determined.
///
/// Returns `None` when the current directory is unavailable or no file is found.
fn local_config_path() -> Option<PathBuf> {
    // An unknown home directory only removes the stop marker; it must not
    // disable the search itself (this mirrors how `load_dotenv` walks).
    let home = home_dir();
    let mut dir = std::env::current_dir().ok()?;
    loop {
        let candidate = dir.join(".collab.toml");
        // `is_file` rather than `exists`: a directory with that name is not a config.
        if candidate.is_file() {
            return Some(candidate);
        }
        if home.as_deref() == Some(dir.as_path()) {
            return None;
        }
        if !dir.pop() {
            return None;
        }
    }
}
/// Path of the global config file, `.collab.toml` inside the home directory.
///
/// Returns `None` when the home directory cannot be determined. The file is
/// not checked for existence.
fn config_path() -> Option<PathBuf> {
    let home = home_dir()?;
    Some(home.join(".collab.toml"))
}
/// Resolves the user's home directory from environment variables.
///
/// On Windows `USERPROFILE` is preferred, with `HOMEDRIVE` + `HOMEPATH`
/// concatenated as the fallback; everywhere else `HOME` is used. Returns
/// `None` when the relevant variables are unset or not valid Unicode.
fn home_dir() -> Option<PathBuf> {
    let env_value = |name: &str| std::env::var(name).ok();

    if cfg!(windows) {
        match env_value("USERPROFILE") {
            Some(profile) => Some(PathBuf::from(profile)),
            None => {
                let drive = env_value("HOMEDRIVE")?;
                let rest = env_value("HOMEPATH")?;
                Some(PathBuf::from(format!("{}{}", drive, rest)))
            }
        }
    } else {
        env_value("HOME").map(PathBuf::from)
    }
}
/// Loads the nearest `.env` file into the process environment.
///
/// Walks from the current directory towards the root and stops at the first
/// `.env` that is a regular file (whether or not it could be read), after
/// checking the home directory, or at the filesystem root. Does nothing when
/// the current directory is unavailable.
fn load_dotenv() {
    let home = home_dir();
    let mut dir = match std::env::current_dir() {
        Ok(d) => d,
        Err(_) => return,
    };
    loop {
        let candidate = dir.join(".env");
        if candidate.is_file() {
            if let Ok(contents) = std::fs::read_to_string(&candidate) {
                apply_dotenv(&contents);
            }
            // Only the nearest `.env` is considered, even if it was unreadable.
            return;
        }
        if home.as_deref() == Some(dir.as_path()) {
            return;
        }
        if !dir.pop() {
            return;
        }
    }
}

/// Applies `KEY=VALUE` lines to the environment without overriding variables
/// that are already set.
///
/// Blank lines, `#` comments and lines without `=` are skipped. Surrounding
/// whitespace and any leading/trailing `"` or `'` characters are stripped
/// from the value.
fn apply_dotenv(contents: &str) {
    for line in contents.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        if let Some((key, val)) = line.split_once('=') {
            let key = key.trim();
            let val = val.trim().trim_matches('"').trim_matches('\'');
            // `set_var` panics on an empty key or on NUL in key/value; a
            // malformed line such as `=value` must not crash the CLI.
            if key.is_empty() || key.contains('\0') || val.contains('\0') {
                continue;
            }
            // `var_os` also recognises existing non-Unicode values, which
            // `var(..).is_err()` would have treated as unset and overwritten.
            if std::env::var_os(key).is_none() {
                std::env::set_var(key, val);
            }
        }
    }
}
// Top-level command line definition, parsed by clap.
// NOTE: plain `//` comments are used on purpose in this type. clap's derive
// turns `///` doc comments into help text, which would change the CLI output.
#[derive(Parser)]
#[command(name = "collab", version)]
#[command(about = "Collaboration tool for Claude Code instances", long_about = None)]
#[command(args_conflicts_with_subcommands = false)]
struct Cli {
// `--server`: highest-priority source of the server URL; `main` falls back to
// `$COLLAB_SERVER`, then the config file `host`, then `http://localhost:8000`.
// `global = true` lets it be given after the subcommand as well.
#[arg(long, global = true)]
server: Option<String>,
// `-i` / `--instance`: highest-priority source of the instance ID; `main`
// falls back to `$COLLAB_INSTANCE`, then the config file `instance`.
#[arg(short, long, global = true)]
instance: Option<String>,
// The subcommand to run; required (not an `Option`).
#[command(subcommand)]
command: Commands,
}
// Actions of the `todo` subcommand.
// NOTE: plain `//` comments only; clap would turn `///` into help text.
#[derive(Subcommand)]
enum TodoAction {
// Dispatched to `client.todo_add(instance, description)`; a leading `@` on
// the instance is stripped in `main`.
Add {
#[arg(value_name = "@INSTANCE")]
instance: String,
#[arg(value_name = "DESCRIPTION")]
description: String,
},
// Dispatched to `client.todo_list(target)`. The target is `--for` when given,
// otherwise the positional instance, otherwise `None`; a leading `@` is stripped.
List {
#[arg(value_name = "@INSTANCE")]
instance: Option<String>,
#[arg(long = "for", value_name = "INSTANCE")]
for_instance: Option<String>,
},
// Dispatched to `client.todo_done(hash)`.
Done {
#[arg(value_name = "HASH")]
hash: String,
},
}
// All `collab` subcommands.
// NOTE: plain `//` comments only; clap would turn `///` into help text.
// Variants fall into two groups in `main`: those handled before the instance ID
// is resolved (Init, Start, Stop, Restart, LifecycleStatus, Roster, ConfigPath,
// Usage) and those that require an instance ID (everything else).
#[derive(Subcommand)]
enum Commands {
// Dispatched to `client.list_messages(!all, from, since)`.
// NOTE(review): presumably `--all` turns off an unread-only filter — confirm in `client`.
List {
#[arg(short, long)]
all: bool,
#[arg(short, long, value_name = "@INSTANCE")]
from: Option<String>,
#[arg(long, value_name = "HASH")]
since: Option<String>,
},
// Dispatched to `client.reply_to_latest(sender, message)`.
Reply {
#[arg(value_name = "@INSTANCE")]
sender: String,
#[arg(value_name = "MESSAGE")]
message: String,
},
// Dispatched to `client.show_message(hash)`.
Show {
#[arg(value_name = "HASH")]
hash: String,
},
// Dispatched to `client.show_status()`.
Status,
// Dispatched to `client.add_message`. A leading `@` on the recipient is
// stripped and `--refs` is split on commas (each piece trimmed).
Add {
#[arg(value_name = "@INSTANCE")]
recipient: String,
#[arg(value_name = "MESSAGE")]
message: String,
#[arg(short, long, value_name = "HASH1,HASH2")]
refs: Option<String>,
},
// Dispatched to `client.broadcast`; `--refs` is split like in `Add`.
Broadcast {
#[arg(value_name = "MESSAGE")]
message: String,
#[arg(short, long, value_name = "HASH1,HASH2")]
refs: Option<String>,
},
// Dispatched to `client.stream_messages(role)`.
Stream {
#[arg(short, long, value_name = "DESCRIPTION")]
role: Option<String>,
},
// Dispatched to `client.show_history(filter)`; a leading `@` is stripped.
History {
#[arg(value_name = "@INSTANCE")]
filter: Option<String>,
},
// Dispatched to `client.show_roster()`; runs without an instance ID.
Roster,
// Only available with the `monitor` feature; runs `monitor::run` on its own thread.
// NOTE(review): the unit of `interval` is not visible here — presumably seconds.
#[cfg(feature = "monitor")]
Monitor {
#[arg(short, long, default_value = "2")]
interval: u64,
},
// Prints the local (if found) and global config file paths.
ConfigPath,
// Prints a per-worker summary of the `usage.log` next to the manifest.
Usage,
// Todo management; see `TodoAction`.
Todo {
#[command(subcommand)]
action: TodoAction,
},
// With FILE: `init::run_from_yaml(file, output)`. Without FILE: the interactive
// wizard, which requires the `monitor` feature.
Init {
#[arg(value_name = "FILE")]
file: Option<PathBuf>,
#[arg(short, long, value_name = "DIR")]
output: Option<String>,
},
// Runs a `worker::WorkerHarness`. Defaults applied in `main`: workdir = current
// directory, model = empty string, auto_reply = true, batch_wait = 2000.
// `--cli-template` overrides the template from the manifest entry.
Worker {
#[arg(long, value_name = "PATH")]
workdir: Option<PathBuf>,
#[arg(long, value_name = "MODEL")]
model: Option<String>,
#[arg(long, value_name = "TEMPLATE")]
cli_template: Option<String>,
#[arg(long)]
auto_reply: Option<bool>,
#[arg(long, value_name = "MS")]
batch_wait: Option<u64>,
},
// Start/Stop/Restart take a TARGET that `parse_target` accepts: `all`,
// `@name` or `name` (alphanumerics, `-`, `_`).
Start {
#[arg(value_name = "TARGET")]
target: String,
},
Stop {
#[arg(value_name = "TARGET")]
target: String,
},
Restart {
#[arg(value_name = "TARGET")]
target: String,
},
// Prints the contents of the `workers.pids` file next to the manifest.
LifecycleStatus,
}
/// Program entry point: resolves settings, then dispatches the subcommand.
///
/// Settings are resolved with the precedence CLI flag, environment variable,
/// config file, built-in default. Subcommands that do not need an instance ID
/// (init, worker has its own check, lifecycle commands, roster, config-path,
/// usage) are handled first and return early; the remaining ones require an
/// instance ID and share one `CollabClient`.
///
/// The early handlers move fields out of `cli.command` inside `if let`
/// patterns and always return, which is why the statement order matters here.
#[tokio::main]
async fn main() -> Result<()> {
// Load `.env` before anything reads the `COLLAB_*` environment variables.
load_dotenv();
let cli = Cli::parse();
let file_config = load_config();
// Server URL: --server, then $COLLAB_SERVER, then config `host`, then localhost.
let server = cli.server
.or_else(|| std::env::var("COLLAB_SERVER").ok())
.or(file_config.host.clone())
.unwrap_or_else(|| "http://localhost:8000".to_string());
// Instance ID stays optional here; commands that need it check further down.
let instance = cli.instance
.or_else(|| std::env::var("COLLAB_INSTANCE").ok())
.or(file_config.instance.clone());
// The token has no CLI flag: $COLLAB_TOKEN, then config `token`.
let token = std::env::var("COLLAB_TOKEN").ok().or(file_config.token.clone());
// --- init: generate from a YAML file, or run the interactive wizard ---
if let Commands::Init { file, output } = cli.command {
match file {
Some(path) => {
init::run_from_yaml(&path, output.as_deref())?;
}
None => {
// The wizard is compiled only with the `monitor` feature.
#[cfg(feature = "monitor")]
{
match wizard::run()? {
Some(config) => init::generate(&config, output.as_deref())?,
None => println!("Wizard cancelled."),
}
}
#[cfg(not(feature = "monitor"))]
{
anyhow::bail!(
"Interactive wizard requires the 'monitor' feature.\n\
Provide a YAML file instead: collab init workers.yaml"
);
}
}
}
return Ok(());
}
// --- worker: run the worker harness until it finishes ---
if let Commands::Worker { workdir, model, cli_template, auto_reply, batch_wait } = cli.command {
// Defaults for omitted flags; an unavailable current directory becomes an empty path.
let workdir = workdir.unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
let model = model.unwrap_or_default();
let auto_reply = auto_reply.unwrap_or(true);
let batch_wait = batch_wait.unwrap_or(2000);
let instance_id = instance.ok_or_else(|| {
anyhow::anyhow!(
"Instance ID required. Set via --instance, $COLLAB_INSTANCE, or ~/.collab.toml"
)
})?;
// The manifest is optional for a worker: if it is missing or unreadable the
// worker runs with no hand-off targets, no teammates and no manifest templates.
let (hands_off_to, teammates, manifest_cli_template, manifest_cli_template_light) = match find_manifest() {
Ok(manifest_path) => {
match lifecycle::read_manifest(&manifest_path) {
Ok(manifest) => {
// Settings specific to this worker come from the entry with a matching name.
let entry = manifest.iter().find(|w| w.name == instance_id);
let hands_off = entry
.map(|w| w.hands_off_to.clone())
.unwrap_or_default();
let tmpl = entry
.and_then(|w| w.cli_template.clone());
let tmpl_light = entry
.and_then(|w| w.cli_template_light.clone());
// Teammates are every manifest entry as (name, role), including this worker.
let team: Vec<(String, String)> = manifest.iter()
.map(|w| (w.name.clone(), w.role.clone()))
.collect();
(hands_off, team, tmpl, tmpl_light)
}
Err(_) => (vec![], vec![], None, None),
}
}
Err(_) => (vec![], vec![], None, None),
};
// A template given on the command line overrides the manifest's.
let resolved_cli_template = cli_template.or(manifest_cli_template);
let harness = worker::WorkerHarness::new(
CollabClient::new(&server, &instance_id, token.as_deref()),
instance_id,
workdir,
model,
resolved_cli_template,
manifest_cli_template_light,
auto_reply,
batch_wait,
hands_off_to,
teammates,
);
harness.run().await?;
return Ok(());
}
// --- lifecycle commands: none of them needs an instance ID ---
if let Commands::Start { target } = cli.command {
return lifecycle_start(&target, &server, token.as_deref()).await;
}
if let Commands::Stop { target } = cli.command {
return lifecycle_stop(&target, &server, token.as_deref()).await;
}
if let Commands::Restart { target } = cli.command {
return lifecycle_restart(&target, &server, token.as_deref()).await;
}
if matches!(cli.command, Commands::LifecycleStatus) {
return lifecycle_status().await;
}
// --- roster: the client is created with an empty instance ID ---
if matches!(cli.command, Commands::Roster) {
let client = CollabClient::new(&server, "", token.as_deref());
client.show_roster().await?;
return Ok(());
}
// --- config-path: print where configuration is read from ---
if matches!(cli.command, Commands::ConfigPath) {
if let Some(local) = local_config_path() {
println!("local: {}", local.display());
}
match config_path() {
Some(path) => println!("global: {}", path.display()),
None => println!("Could not determine home directory"),
}
return Ok(());
}
// --- usage: summarise the tab-separated usage log per worker ---
if matches!(cli.command, Commands::Usage) {
// The log lives next to the manifest; without a manifest fall back to a
// path relative to the current directory.
let log_path = find_manifest()
.map(|p| p.parent().unwrap().join("usage.log"))
.unwrap_or_else(|_| std::path::PathBuf::from(".collab/usage.log"));
if !log_path.exists() {
println!("No usage data yet. Workers log to {} after each invocation.", log_path.display());
return Ok(());
}
let content = std::fs::read_to_string(&log_path)?;
// Per-worker tuple layout:
// (input, output, duration, calls, model, light calls, full calls, cost)
let mut per_worker: std::collections::HashMap<String, (u64, u64, u64, u32, String, u32, u32, f64)> = std::collections::HashMap::new();
let mut total_input: u64 = 0;
let mut total_output: u64 = 0;
let mut total_duration: u64 = 0;
let mut total_calls: u32 = 0;
let mut total_light: u32 = 0;
let mut total_full: u32 = 0;
let mut total_cost: f64 = 0.0;
let mut any_cost = false;
for line in content.lines() {
// Skip empty and oversized lines rather than trying to parse them.
if line.len() > 1024 || line.is_empty() { continue; }
// Columns: [1] worker, [2] duration, [3] input, [4] output, and optionally
// [5] model, [6] tier, [7] cost. Column [0] is not read here.
let cols: Vec<&str> = line.split('\t').collect();
if cols.len() >= 5 {
let worker = cols[1];
// Only accept plausible worker names: at most 64 bytes of alphanumerics, `-`, `_`.
if worker.len() > 64 || !worker.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_') {
continue;
}
let worker = worker.to_string();
// Unparseable numbers count as zero instead of rejecting the line.
let dur: u64 = cols[2].parse().unwrap_or(0);
let inp: u64 = cols[3].parse().unwrap_or(0);
let out: u64 = cols[4].parse().unwrap_or(0);
let model = if cols.len() >= 6 { cols[5].to_string() } else { "?".to_string() };
// Lines without a tier column count as "full".
let tier = if cols.len() >= 7 { cols[6] } else { "full" };
let cost: f64 = if cols.len() >= 8 { cols[7].parse().unwrap_or(0.0) } else { 0.0 };
// Any positive cost switches the report to the "actual" header and adds a Cost column.
if cost > 0.0 { any_cost = true; }
total_input += inp;
total_output += out;
total_duration += dur;
total_calls += 1;
total_cost += cost;
if tier == "light" { total_light += 1; } else { total_full += 1; }
let entry = per_worker.entry(worker).or_insert((0, 0, 0, 0, model.clone(), 0, 0, 0.0));
entry.0 += inp;
entry.1 += out;
entry.2 += dur;
entry.3 += 1;
// The model shown is the one from the most recent line for this worker.
entry.4 = model;
if tier == "light" { entry.5 += 1; } else { entry.6 += 1; }
entry.7 += cost;
}
}
// Formats a duration as H:MM:SS or MM:SS, treating the value as seconds.
let fmt_time = |secs: u64| -> String {
let h = secs / 3600;
let m = (secs % 3600) / 60;
let s = secs % 60;
if h > 0 { format!("{:>2}:{:02}:{:02}", h, m, s) }
else { format!(" {:02}:{:02}", m, s) }
};
let header = if any_cost { "Token usage (actual)\n" } else { "Token usage (estimated ~4 chars/token)\n" };
println!("{}", header);
// Open todo counts are fetched per worker; a failed fetch leaves no entry,
// which is displayed as "?" in the table.
let client = CollabClient::new(&server, "", token.as_deref());
let mut todo_counts: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
for worker_name in per_worker.keys() {
if let Ok(todos) = client.fetch_todos(worker_name).await {
todo_counts.insert(worker_name.clone(), todos.len());
}
}
let total_todos: usize = todo_counts.values().sum();
// Largest consumers (input + output) first.
let mut workers: Vec<_> = per_worker.iter().collect();
workers.sort_by(|a, b| (b.1.0 + b.1.1).cmp(&(a.1.0 + a.1.1)));
let cost_col = if any_cost { " Cost" } else { "" };
// The "CLI" column shows the model value taken from the log.
println!("{:<20} {:>8} {:>8} {:>6} {:>8} {:<10} {:<10} {:<6}{}", "Worker", "Input", "Output", "Calls", "Time", "CLI", "Tiers", "Todos", cost_col);
println!("{}", "─".repeat(if any_cost { 96 } else { 88 }));
for (name, (inp, out, dur, calls, model, light, full, cost)) in &workers {
let tier_str = format!("{}F/{}L", full, light);
let todo_str = match todo_counts.get(*name) {
Some(0) => "—".to_string(),
Some(n) => format!("{}", n),
None => "?".to_string(),
};
let cost_str = if any_cost { format!(" ${:.4}", cost) } else { String::new() };
// Token counts are shown in thousands (integer division, so they round down).
println!("{:<20} {:>7}K {:>7}K {:>6} {:>8} {:<10} {:<10} {:<6}{}", name, inp / 1000, out / 1000, calls, fmt_time(*dur), model, tier_str, todo_str, cost_str);
}
println!("{}", "─".repeat(if any_cost { 96 } else { 88 }));
let total_tier_str = format!("{}F/{}L", total_full, total_light);
let total_todo_str = if total_todos > 0 { format!("{}", total_todos) } else { "—".to_string() };
let total_cost_str = if any_cost { format!(" ${:.4}", total_cost) } else { String::new() };
println!("{:<20} {:>7}K {:>7}K {:>6} {:>8} {:<10} {:<10} {:<6}{}", "TOTAL", total_input / 1000, total_output / 1000, total_calls, fmt_time(total_duration), "", total_tier_str, total_todo_str, total_cost_str);
return Ok(());
}
// --- everything below talks to the server on behalf of one instance ---
let instance_id = instance.ok_or_else(|| {
anyhow::anyhow!(
"Instance ID required. Set via --instance, $COLLAB_INSTANCE, or ~/.collab.toml\n\
\n\
Example ~/.collab.toml:\n\
host = \"http://localhost:8000\"\n\
instance = \"worker1\""
)
})?;
let client = CollabClient::new(&server, &instance_id, token.as_deref());
// Best-effort heartbeat: a failure here must not block the actual command.
let _ = client.heartbeat(None).await;
match cli.command {
Commands::List { all, from, since } => {
client.list_messages(!all, from.as_deref(), since.as_deref()).await?;
}
Commands::Reply { sender, message } => {
client.reply_to_latest(&sender, &message).await?;
}
Commands::Show { hash } => {
client.show_message(&hash).await?;
}
Commands::Status => {
client.show_status().await?;
}
Commands::Add { recipient, message, refs } => {
// Accept both `@name` and `name`.
let recipient = recipient.trim_start_matches('@');
// `--refs a,b` becomes a list of trimmed hashes.
let ref_hashes = refs.map(|r| {
r.split(',').map(|s| s.trim().to_string()).collect()
});
client.add_message(recipient, &message, ref_hashes).await?;
}
Commands::Stream { role } => {
client.stream_messages(role).await?;
}
Commands::Broadcast { message, refs } => {
let ref_hashes = refs.map(|r| {
r.split(',').map(|s| s.trim().to_string()).collect()
});
client.broadcast(&message, ref_hashes).await?;
}
Commands::History { filter } => {
let filter_id = filter.as_deref().map(|s| s.trim_start_matches('@'));
client.show_history(filter_id).await?;
}
Commands::Todo { action } => match action {
TodoAction::Add { instance, description } => {
let instance = instance.trim_start_matches('@');
client.todo_add(instance, &description).await?;
}
TodoAction::List { instance, for_instance } => {
// `--for` wins over the positional instance when both are given.
let target = for_instance.as_deref().or(instance.as_deref());
let target = target.map(|s| s.trim_start_matches('@'));
client.todo_list(target).await?;
}
TodoAction::Done { hash } => {
client.todo_done(&hash).await?;
}
},
#[cfg(feature = "monitor")]
Commands::Monitor { interval } => {
// The monitor runs on a dedicated OS thread and this task waits for it
// synchronously; a panic in that thread is turned into an error.
let server2 = server.clone();
let instance2 = instance_id.clone();
let token2 = token.clone();
std::thread::spawn(move || {
monitor::run(&server2, &instance2, interval, token2.as_deref())
})
.join()
.unwrap_or_else(|_| Err(anyhow::anyhow!("monitor panicked")))?;
}
// These variants all returned from the early handlers above.
Commands::Roster | Commands::ConfigPath | Commands::Usage | Commands::Init { .. } | Commands::Start { .. } | Commands::Stop { .. } | Commands::Restart { .. } | Commands::LifecycleStatus => unreachable!(),
// Catch-all for `Worker`, which is not listed above but also returned early.
// NOTE(review): the `allow` attribute is duplicated; one copy is enough.
#[allow(unreachable_patterns)]
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
Ok(())
}
/// Parses the TARGET argument of the lifecycle commands.
///
/// Accepted forms (surrounding whitespace is ignored):
/// - `all` — returned as the single entry `"all"`, which callers treat as
///   "every worker";
/// - `@name` or `name` — a single worker name made of alphanumerics, `-`
///   and `_`; the leading `@` is dropped.
///
/// # Errors
///
/// Returns an error when the name is empty (`""` or `"@"`) or contains any
/// other character. Empty names used to slip through because `all()` is
/// vacuously true on an empty string.
fn parse_target(target: &str) -> Result<Vec<String>> {
    let target = target.trim();
    if target == "all" {
        return Ok(vec!["all".to_string()]);
    }

    let is_valid_name = |name: &str| {
        !name.is_empty()
            && name
                .chars()
                .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
    };

    match target.strip_prefix('@') {
        Some(name) if is_valid_name(name) => Ok(vec![name.to_string()]),
        Some(name) => Err(anyhow::anyhow!("Invalid instance name: {}", name)),
        None if is_valid_name(target) => Ok(vec![target.to_string()]),
        None => Err(anyhow::anyhow!("Invalid target: {}", target)),
    }
}
/// Starts the workers selected by `target` (`all`, `@name` or `name`).
///
/// Steps:
/// 1. Locate and read the manifest; the PID file `workers.pids` lives next to it.
/// 2. Drop PID entries whose process no longer exists and remember the ones
///    that are still alive.
/// 3. Spawn every selected manifest entry that is not already running and
///    record its PID and an informational command line in the PID file.
///
/// A worker that is already running is skipped. Spawning it again would
/// overwrite its PID entry and leave the first process untracked, so it could
/// no longer be stopped through `collab stop`.
///
/// # Errors
///
/// Fails on an invalid target, a missing/unreadable manifest, an unreadable
/// PID file, or when spawning or recording a worker fails. Workers started
/// before the failure keep running and stay recorded.
async fn lifecycle_start(target: &str, server: &str, token: Option<&str>) -> Result<()> {
    let targets = parse_target(target)?;
    let manifest_path = find_manifest()?;
    let manifest = lifecycle::read_manifest(&manifest_path)?;
    let pids_file = manifest_path
        .parent()
        .expect("manifest path always has a parent directory")
        .join("workers.pids");

    // Worker name -> PID for entries whose process is still alive.
    let mut running = std::collections::HashMap::new();
    if pids_file.exists() {
        let content = std::fs::read_to_string(&pids_file)?;
        // A corrupt PID file is treated as empty rather than blocking a start.
        let state: std::collections::HashMap<String, lifecycle::WorkerState> =
            serde_json::from_str(&content).unwrap_or_default();
        for (name, ws) in &state {
            if lifecycle::process_exists(ws.pid) {
                running.insert(name.clone(), ws.pid);
            } else {
                println!("⚠ Cleaning up stale PID for {} (PID {} no longer running)", name, ws.pid);
                lifecycle::remove_worker_pid(&pids_file, name)?;
            }
        }
    }

    let workers = if targets[0] == "all" {
        manifest
    } else {
        manifest
            .into_iter()
            .filter(|w| targets.contains(&w.name))
            .collect()
    };
    if workers.is_empty() {
        println!("No matching workers found");
        return Ok(());
    }

    let mut started = 0usize;
    for worker in workers {
        if let Some(pid) = running.get(&worker.name) {
            println!("• {} is already running (PID {}), skipping", worker.name, pid);
            continue;
        }

        let workdir = std::path::PathBuf::from(&worker.output_dir);
        let child = lifecycle::spawn_worker(
            &worker.name,
            &workdir,
            &worker.model,
            &worker.name,
            server,
            token,
            worker.cli_template.as_deref(),
        )?;
        let pid = child.id();

        // Command line stored for display in `lifecycle-status`.
        let mut cmd = format!("collab worker --workdir {} --model {}", worker.output_dir, worker.model);
        if let Some(tmpl) = &worker.cli_template {
            cmd.push_str(&format!(" --cli-template {:?}", tmpl));
        }
        lifecycle::save_worker_pid(&pids_file, &worker.name, pid, &cmd)?;

        // The handle is released without waiting; the PID file tracks the worker from here on.
        drop(child);
        started += 1;
    }

    if started == 0 {
        println!("No workers started: all matching workers are already running");
    } else {
        println!("✓ Workers started. Check status with: collab lifecycle-status");
    }
    Ok(())
}
/// Stops the workers selected by `target` (`all`, `@name` or `name`).
///
/// The manifest is located and read (its content is not used, but a missing
/// or unreadable manifest is an error). Workers are looked up in the
/// `workers.pids` file next to the manifest; each match is killed and removed
/// from that file. `server` and `token` are accepted for signature parity
/// with the other lifecycle functions and are not used.
///
/// # Errors
///
/// Fails on an invalid target, a manifest problem, an unreadable PID file, or
/// when killing a process or updating the PID file fails; remaining workers
/// are then left untouched.
async fn lifecycle_stop(target: &str, server: &str, token: Option<&str>) -> Result<()> {
    let requested = parse_target(target)?;
    let manifest_path = find_manifest()?;
    let _manifest = lifecycle::read_manifest(&manifest_path)?;
    let pids_file = manifest_path.parent().unwrap().join("workers.pids");

    // Without a PID file nothing has been started from this project.
    if !pids_file.exists() {
        println!("No running workers found");
        return Ok(());
    }
    let raw = std::fs::read_to_string(&pids_file)?;
    // An unparseable PID file is treated as "nothing recorded".
    let mut tracked: std::collections::HashMap<String, lifecycle::WorkerState> =
        serde_json::from_str(&raw).unwrap_or_default();

    // Either every recorded worker, or only the requested ones that are recorded.
    let mut names: Vec<String> = Vec::new();
    if requested[0] == "all" {
        names.extend(tracked.keys().cloned());
    } else {
        for name in &requested {
            if tracked.contains_key(name) {
                names.push(name.clone());
            }
        }
    }
    if names.is_empty() {
        println!("No matching running workers found");
        return Ok(());
    }

    for name in &names {
        let worker_state = match tracked.remove(name) {
            Some(found) => found,
            None => continue,
        };
        lifecycle::kill_process(worker_state.pid, name)?;
        lifecycle::remove_worker_pid(&pids_file, name)?;
    }

    println!("✓ Workers stopped");
    Ok(())
}
/// Restarts the selected workers: stop, pause for 500 ms, then start.
///
/// An error from the stop phase aborts the restart before anything is started.
/// The pause is a blocking sleep on the current thread.
async fn lifecycle_restart(target: &str, server: &str, token: Option<&str>) -> Result<()> {
    let pause = std::time::Duration::from_millis(500);

    lifecycle_stop(target, server, token).await?;
    std::thread::sleep(pause);
    lifecycle_start(target, server, token).await
}
/// Prints the workers recorded in the `workers.pids` file next to the manifest.
///
/// Prints "No workers running" when the PID file is missing or records no
/// workers (previously an empty file content such as `{}` produced a bare
/// "Running workers:" header). Entries are listed sorted by name so the
/// output is stable between runs. The listing reflects the PID file only;
/// whether each process is still alive is not checked here.
///
/// # Errors
///
/// Fails when the manifest cannot be found, or when the PID file cannot be
/// read or is not valid JSON.
async fn lifecycle_status() -> Result<()> {
    let manifest_path = find_manifest()?;
    let pids_file = manifest_path
        .parent()
        .expect("manifest path always has a parent directory")
        .join("workers.pids");
    if !pids_file.exists() {
        println!("No workers running");
        return Ok(());
    }

    let content = std::fs::read_to_string(&pids_file)?;
    let state: std::collections::HashMap<String, lifecycle::WorkerState> =
        serde_json::from_str(&content)?;
    if state.is_empty() {
        println!("No workers running");
        return Ok(());
    }

    // HashMap iteration order is arbitrary; sort for a deterministic listing.
    let mut entries: Vec<_> = state.iter().collect();
    entries.sort_by(|a, b| a.0.cmp(b.0));

    println!("Running workers:");
    for (name, worker_state) in entries {
        println!(" {} (PID: {})", name, worker_state.pid);
        println!(" Started: {}", worker_state.started_at);
        println!(" Command: {}", worker_state.command);
    }
    Ok(())
}
/// Locates the worker manifest `.collab/workers.json`.
///
/// Checks the current directory and then each of its ancestors up to the
/// filesystem root, returning the first path that exists.
///
/// # Errors
///
/// Fails when the current directory is unavailable or no manifest is found.
fn find_manifest() -> Result<std::path::PathBuf> {
    let start = std::env::current_dir()?;
    start
        .ancestors()
        .map(|dir| dir.join(".collab/workers.json"))
        .find(|candidate| candidate.exists())
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Manifest not found. Run 'collab init workers.yml' in your project directory"
            )
        })
}