use anyhow::{Context, Result};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use crate::sessions::Tmux;
use crate::{frontmatter, snapshot};
/// Options controlling a parallel run of tasks (consumed by [`run`]).
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Task prompts; each entry is executed by its own `claude` process in a
    /// separate tmux pane.
    pub tasks: Vec<String>,
    /// Model name forwarded to `claude` as `--model <name>`; the flag is
    /// omitted when this is `None`.
    pub model: Option<String>,
    /// Not read by `run` in this module, hence the silenced dead-code lint.
    // NOTE(review): intended semantics are not visible here — confirm against
    // the CLI definition that populates this struct.
    #[allow(dead_code)]
    pub no_git: bool,
    /// When `true`, tasks run directly in the project root instead of a
    /// per-task worktree, and no diff is collected for them.
    pub no_worktree: bool,
    /// Seconds to wait for all tasks before the remaining panes are killed.
    pub timeout_secs: u64,
    /// When `true`, the plan is printed and nothing is spawned.
    pub dry_run: bool,
}
/// Bookkeeping for one spawned task while it is running.
#[derive(Debug)]
struct TaskState {
    /// Zero-based position of the task in `ParallelConfig::tasks`.
    index: usize,
    /// The task prompt text.
    description: String,
    /// Directory the task runs in: its worktree, or the project root when
    /// worktrees are disabled.
    worktree_path: PathBuf,
    /// Identifier of the tmux pane running the task.
    pane_id: String,
    /// Set once the pane is no longer alive or has been killed on timeout.
    completed: bool,
}
/// Everything collected for one task after it finished (or was killed).
#[derive(Debug, Clone, PartialEq, Eq)]
struct TaskResult {
    /// Zero-based position of the task in `ParallelConfig::tasks`.
    index: usize,
    /// The task prompt text.
    description: String,
    /// Text taken from the task's result file; `None` when the file was
    /// empty or could not be read.
    output: Option<String>,
    /// Non-empty diff of the task's worktree; `None` when worktrees are
    /// disabled, the diff failed, or nothing changed.
    diff: Option<String>,
    /// Non-empty contents of the task's stderr log, if any.
    error: Option<String>,
}
/// File name used for a task's prompt: `run` writes the task text to it and
/// redirects it to the `claude` command's stdin.
const PROMPT_FILENAME: &str = ".agent-doc-prompt.txt";
/// File name that receives the `claude` command's stdout (the command is
/// invoked with `--output-format json`).
const RESULT_FILENAME: &str = ".agent-doc-result.json";
/// File name that receives the `claude` command's stderr.
const LOG_FILENAME: &str = ".agent-doc-result.log";
/// Pause between successive checks of whether the task panes are still alive.
const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Returns the scratch-file name task `index` should use for `base`.
///
/// When each task has a private working directory (`shared_cwd == false`)
/// the name is `base` unchanged. When several tasks run in the same
/// directory, the one-based task number is inserted before the extension
/// (`.agent-doc-result.json` becomes `.agent-doc-result-1.json`) so tasks
/// cannot overwrite each other's prompt, result, or log files.
fn task_file_name(base: &str, index: usize, shared_cwd: bool) -> String {
    if !shared_cwd {
        return base.to_string();
    }
    match base.rsplit_once('.') {
        // A leading-dot-only name (".foo") has an empty stem; append instead.
        Some((stem, ext)) if !stem.is_empty() => format!("{}-{}.{}", stem, index + 1, ext),
        _ => format!("{}-{}", base, index + 1),
    }
}

/// Runs every task in `config.tasks` concurrently, each as a `claude -p`
/// process in its own tmux pane, then prints a markdown report to stdout.
///
/// Steps:
/// 1. Locate the project root for `file` and parse the file's frontmatter.
/// 2. For each task, pick a working directory (a fresh worktree, or the
///    project root when `config.no_worktree` is set), write the prompt file
///    there, start a pane and send it the `claude` command.
/// 3. Poll until every pane has gone away or `config.timeout_secs` elapsed;
///    on timeout the remaining panes are killed.
/// 4. Read each task's result file, stderr log and (worktree mode only) diff,
///    and print the combined report.
///
/// Progress and warnings go to stderr with a `[parallel]` prefix. With
/// `config.dry_run` the plan is printed and nothing is spawned.
///
/// # Errors
///
/// Fails if `file` does not exist or cannot be canonicalized or read, if no
/// project root is found, if the frontmatter does not parse, or if creating a
/// worktree, writing a prompt, creating a pane, or sending the command fails.
/// Failures while stashing or killing panes and while reading results are
/// reported as warnings only.
pub fn run(file: &Path, config: ParallelConfig) -> Result<()> {
    if !file.exists() {
        anyhow::bail!("file not found: {}", file.display());
    }
    let canonical = file
        .canonicalize()
        .with_context(|| format!("failed to canonicalize {}", file.display()))?;
    let project_root = snapshot::find_project_root(&canonical)
        .ok_or_else(|| anyhow::anyhow!("no .agent-doc/ project root found for {}", file.display()))?;
    let content = std::fs::read_to_string(file)
        .with_context(|| format!("failed to read {}", file.display()))?;
    let (fm, _body) = frontmatter::parse(&content)?;

    // Ask tmux for the current session name; fall back to "claude" when the
    // query fails or yields nothing.
    let session_name = {
        let tmux = Tmux::default_server();
        tmux.cmd()
            .args(["display-message", "-p", "#{session_name}"])
            .output()
            .ok()
            .filter(|o| o.status.success())
            .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
            .filter(|s| !s.is_empty())
            .unwrap_or_else(|| "claude".to_string())
    };
    let agent_doc_session = fm.session.as_deref().unwrap_or("deep").to_string();

    let total = config.tasks.len();
    eprintln!("[parallel] Project root: {}", project_root.display());
    eprintln!("[parallel] Tmux session: {}", session_name);
    eprintln!("[parallel] Tasks: {}", total);
    for (i, task) in config.tasks.iter().enumerate() {
        eprintln!("[parallel] {}: {}", i + 1, task);
    }
    if config.dry_run {
        eprintln!("[parallel] Dry run — exiting without spawning tasks.");
        return Ok(());
    }
    if config.tasks.is_empty() {
        eprintln!("[parallel] No tasks provided.");
        return Ok(());
    }

    // Without worktrees every task runs in the project root, so the scratch
    // files need per-task names or the tasks would clobber one another. A
    // single task keeps the plain names.
    let shared_cwd = config.no_worktree && total > 1;

    let tmux = Tmux::default_server();
    // The environment prefix depends only on the frontmatter, not on the task.
    let env_prefix = crate::env::shell_export_prefix(&fm.env);
    let mut task_states: Vec<TaskState> = Vec::with_capacity(total);
    for (i, task) in config.tasks.iter().enumerate() {
        let task_cwd = if config.no_worktree {
            eprintln!("[parallel] Task {}/{} (no worktree, using project root)", i + 1, total);
            project_root.clone()
        } else {
            eprintln!("[parallel] Creating worktree for task {}/{}...", i + 1, total);
            let worktree_info = crate::worktree::create(&project_root, &agent_doc_session, i)?;
            eprintln!("[parallel] Worktree: {}", worktree_info.path.display());
            worktree_info.path
        };

        let prompt_name = task_file_name(PROMPT_FILENAME, i, shared_cwd);
        let result_name = task_file_name(RESULT_FILENAME, i, shared_cwd);
        let log_name = task_file_name(LOG_FILENAME, i, shared_cwd);

        let prompt_path = task_cwd.join(&prompt_name);
        std::fs::write(&prompt_path, task)
            .with_context(|| format!("failed to write prompt to {}", prompt_path.display()))?;

        let pane_id = tmux
            .auto_start(&session_name, &task_cwd)
            .with_context(|| format!("failed to create tmux pane for task {}", i + 1))?;
        eprintln!("[parallel] Pane: {}", pane_id);

        // The file names are relative: the command relies on the pane having
        // been started in `task_cwd`. The trailing `exit` closes the pane's
        // shell when claude finishes, which is what the polling loop detects.
        let mut cmd_parts = vec!["claude -p --output-format json".to_string()];
        if let Some(model) = &config.model {
            cmd_parts.push(format!("--model {}", model));
        }
        cmd_parts.push(format!(
            "< {} > {} 2>{}; exit",
            prompt_name, result_name, log_name
        ));
        let cmd_str = format!("{}{}", env_prefix, cmd_parts.join(" "));
        tmux.send_keys(&pane_id, &cmd_str)
            .with_context(|| format!("failed to send keys to pane {} for task {}", pane_id, i + 1))?;

        // Stashing is best-effort; the task runs regardless.
        if let Err(e) = tmux.stash_pane(&pane_id, &session_name) {
            eprintln!("[parallel] Warning: failed to stash pane {}: {}", pane_id, e);
        }

        task_states.push(TaskState {
            index: i,
            description: task.clone(),
            worktree_path: task_cwd,
            pane_id,
            completed: false,
        });
    }

    eprintln!("[parallel] Polling for task completion...");
    let start = Instant::now();
    let timeout = Duration::from_secs(config.timeout_secs);
    // Number of tasks whose pane had to be killed because of the timeout.
    let mut timed_out = 0usize;
    while task_states.iter().any(|t| !t.completed) {
        if start.elapsed() > timeout {
            eprintln!(
                "[parallel] Timeout reached ({} seconds). Killing remaining panes...",
                config.timeout_secs
            );
            for task in task_states.iter_mut().filter(|t| !t.completed) {
                eprintln!("[parallel] Killing pane {} (task {})", task.pane_id, task.index + 1);
                if let Err(e) = tmux.kill_pane(&task.pane_id) {
                    eprintln!("[parallel] Warning: failed to kill pane {}: {}", task.pane_id, e);
                }
                // Marked as completed so results are still collected below.
                task.completed = true;
                timed_out += 1;
            }
            break;
        }

        for task in task_states.iter_mut().filter(|t| !t.completed) {
            if !tmux.pane_alive(&task.pane_id) {
                task.completed = true;
                eprintln!("[parallel] Task {}/{} complete.", task.index + 1, total);
            }
        }

        // Only wait when something is still running.
        if task_states.iter().any(|t| !t.completed) {
            std::thread::sleep(POLL_INTERVAL);
        }
    }

    eprintln!("[parallel] Collecting results...");
    let mut results: Vec<TaskResult> = Vec::with_capacity(task_states.len());
    for task in &task_states {
        let result_path = task
            .worktree_path
            .join(task_file_name(RESULT_FILENAME, task.index, shared_cwd));
        let log_path = task
            .worktree_path
            .join(task_file_name(LOG_FILENAME, task.index, shared_cwd));

        // Read the stderr log once; it is both echoed on failure and reported.
        let error = std::fs::read_to_string(&log_path)
            .ok()
            .filter(|s| !s.trim().is_empty());

        let output = match std::fs::read_to_string(&result_path) {
            // Prefer the extracted `result` text; fall back to the raw file.
            Ok(content) if !content.trim().is_empty() => {
                Some(extract_result_text(&content).unwrap_or(content))
            }
            Ok(_) => None,
            Err(e) => {
                eprintln!(
                    "[parallel] Warning: could not read result for task {}: {}",
                    task.index + 1,
                    e
                );
                if let Some(log) = &error {
                    eprintln!("[parallel] Log: {}", log.trim());
                }
                None
            }
        };

        let diff = if config.no_worktree {
            None
        } else {
            crate::worktree::diff(&task.worktree_path)
                .ok()
                .filter(|s| !s.trim().is_empty())
        };

        results.push(TaskResult {
            index: task.index,
            description: task.description.clone(),
            output,
            diff,
            error,
        });
    }

    let markdown = format_results(&results);
    println!("{}", markdown);
    if timed_out == 0 {
        eprintln!("[parallel] Done. {} tasks completed.", results.len());
    } else {
        eprintln!(
            "[parallel] Done. {} of {} tasks completed; {} killed after timeout.",
            results.len() - timed_out,
            results.len(),
            timed_out
        );
    }
    Ok(())
}
fn extract_result_text(json_str: &str) -> Option<String> {
for line in json_str.lines().rev() {
let line = line.trim();
if line.is_empty() {
continue;
}
if let Ok(val) = serde_json::from_str::<serde_json::Value>(line)
&& let Some(result) = val.get("result").and_then(|v| v.as_str()) {
return Some(result.to_string());
}
}
None
}
fn format_results(results: &[TaskResult]) -> String {
let mut out = String::new();
out.push_str("## Deep Results\n\n");
for result in results {
out.push_str(&format!("### Task {} — {}\n\n", result.index + 1, result.description));
if let Some(ref output) = result.output {
out.push_str(output);
out.push_str("\n\n");
} else {
out.push_str("*No output captured.*\n\n");
}
if let Some(ref diff) = result.diff {
out.push_str("<details>\n<summary>Git diff</summary>\n\n```diff\n");
out.push_str(diff);
out.push_str("\n```\n\n</details>\n\n");
}
if let Some(ref error) = result.error {
out.push_str("<details>\n<summary>Stderr log</summary>\n\n```\n");
out.push_str(error);
out.push_str("\n```\n\n</details>\n\n");
}
}
out
}