use std::collections::HashMap;
use std::fs;
use std::io::Read as _;
use std::path::Path;
use std::process::{Child, Stdio};
use anyhow::{Context, bail};
use worktrunk::config::expand_template;
use worktrunk::git::Repository;
use worktrunk::shell_exec::ShellConfig;
use super::pipeline_spec::{PipelineSpec, PipelineStepSpec};
use super::process::HookLog;
/// Reads a JSON-encoded [`PipelineSpec`] from stdin and executes its steps in order.
///
/// A `Single` step is spawned and waited on before the next step starts; a
/// `Concurrent` step is delegated to [`run_concurrent_group`]. One counter is
/// shared across all steps so that unnamed commands receive distinct
/// `cmd-{index}` log names.
///
/// # Errors
///
/// Fails if stdin cannot be read, the spec does not deserialize, the
/// repository cannot be opened, the log directory cannot be created, or any
/// step fails to start, cannot be waited on, or exits unsuccessfully. The
/// first failing step aborts the remaining steps.
pub fn run_pipeline() -> anyhow::Result<()> {
    let mut raw_spec = String::new();
    std::io::stdin()
        .read_to_string(&mut raw_spec)
        .context("failed to read pipeline spec from stdin")?;
    let spec: PipelineSpec =
        serde_json::from_str(&raw_spec).context("failed to deserialize pipeline spec")?;

    let repo =
        Repository::at(&spec.worktree_path).context("failed to open repository for pipeline")?;

    fs::create_dir_all(&spec.log_dir)
        .with_context(|| format!("failed to create log directory: {}", spec.log_dir.display()))?;

    // Running count of commands started so far, across serial and concurrent steps.
    let mut next_index = 0usize;
    for step in &spec.steps {
        let (template, step_name) = match step {
            PipelineStepSpec::Concurrent { commands } => {
                run_concurrent_group(commands, &spec, &repo, &mut next_index)?;
                continue;
            }
            PipelineStepSpec::Single { template, name } => (template, name.as_deref()),
        };

        // The log file is created before the template is expanded, so a
        // failed expansion still leaves an (empty) log behind.
        let log_name = command_log_name(step_name, next_index);
        let log_file = create_command_log(&spec, &log_name)?;
        let expanded = expand_now(template, &spec, &repo, step_name)?;
        let step_json = build_step_context_json(&spec.context, step_name)?;

        let mut child = spawn_shell_command(&expanded, &spec.worktree_path, &step_json, log_file)?;
        let status = child.wait().context("failed to wait for child process")?;
        if status.success() {
            next_index += 1;
        } else {
            bail!(
                "command failed with {}: {}",
                format_exit(status.code()),
                expanded,
            );
        }
    }
    Ok(())
}
/// Expands `template` against the pipeline context at the moment of the call.
///
/// The variable set is every entry of `spec.context`, plus `hook_name` set to
/// `name` when a name is given (replacing any `hook_name` already present in
/// the context). The label handed to `expand_template` is the step name, or
/// `"pipeline step"` for unnamed steps.
///
/// # Errors
///
/// Propagates any error returned by `expand_template`.
fn expand_now(
    template: &str,
    spec: &PipelineSpec,
    repo: &Repository,
    name: Option<&str>,
) -> anyhow::Result<String> {
    // One extra slot for the optional `hook_name` entry.
    let mut vars: HashMap<&str, &str> = HashMap::with_capacity(spec.context.len() + 1);
    for (key, value) in &spec.context {
        vars.insert(key.as_str(), value.as_str());
    }
    if let Some(hook_name) = name {
        vars.insert("hook_name", hook_name);
    }

    let label = match name {
        Some(given) => given,
        None => "pipeline step",
    };
    // NOTE(review): the meaning of the `true` flag is defined by
    // `expand_template`, which is not visible from this file.
    let expanded = expand_template(template, &vars, true, repo, label)?;
    Ok(expanded)
}
/// Starts `expanded` through the configured shell and returns the running child.
///
/// The child runs with `worktree_path` as its working directory. Both stdout
/// and stderr are redirected to `log_file`, and `context_json` is written to
/// the child's stdin.
///
/// # Errors
///
/// Fails if the shell configuration is unavailable, the log file handle
/// cannot be duplicated, or the process cannot be spawned. A failure to write
/// the context to stdin is deliberately ignored.
fn spawn_shell_command(
    expanded: &str,
    worktree_path: &Path,
    context_json: &str,
    log_file: fs::File,
) -> anyhow::Result<Child> {
    use std::io::Write;

    let shell = ShellConfig::get()?;
    // stdout and stderr each need their own handle onto the same log file.
    let stderr_log = log_file
        .try_clone()
        .context("failed to clone log file handle")?;

    let mut command = shell.command(expanded);
    command
        .current_dir(worktree_path)
        .stdin(Stdio::piped())
        .stdout(Stdio::from(log_file))
        .stderr(Stdio::from(stderr_log));
    let mut child = command
        .spawn()
        .with_context(|| format!("failed to spawn: {expanded}"))?;

    // Best effort: the write result is discarded. The pipe handle is dropped
    // at the end of this block, which closes the child's stdin.
    if let Some(mut pipe) = child.stdin.take() {
        let _ = pipe.write_all(context_json.as_bytes());
    }
    Ok(child)
}
/// Starts every command of a concurrent group, then waits for all of them.
///
/// Commands are prepared and spawned in order (log file, template expansion,
/// context JSON, spawn). `cmd_index` is advanced once per successfully
/// spawned command so unnamed commands get distinct `cmd-{index}` log names.
///
/// Every child that was spawned is always waited on before this function
/// returns, even when a later command could not be started or an earlier
/// `wait` failed. Dropping a `Child` neither kills nor reaps the process, so
/// returning early would leave those commands running unsupervised.
///
/// # Errors
///
/// Checked in this order, after all spawned children have been waited on:
/// 1. the first error hit while preparing or spawning a command (no further
///    commands are started after it);
/// 2. the first error returned by waiting on a child;
/// 3. a combined error naming every command that exited unsuccessfully (by
///    its name, or by its expanded command line when unnamed).
fn run_concurrent_group(
    commands: &[super::pipeline_spec::PipelineCommandSpec],
    spec: &PipelineSpec,
    repo: &Repository,
    cmd_index: &mut usize,
) -> anyhow::Result<()> {
    let mut children = Vec::with_capacity(commands.len());
    let mut setup_error = None;

    for cmd in commands {
        let name = cmd.name.as_deref();
        let index = *cmd_index;
        // Run the fallible setup inside a closure so that a failure can be
        // recorded instead of returning while earlier children still run.
        let spawned = (|| -> anyhow::Result<_> {
            let log_name = command_log_name(name, index);
            let log_file = create_command_log(spec, &log_name)?;
            let expanded = expand_now(&cmd.template, spec, repo, name)?;
            let cmd_json = build_step_context_json(&spec.context, name)?;
            let child = spawn_shell_command(&expanded, &spec.worktree_path, &cmd_json, log_file)?;
            Ok((expanded, child))
        })();

        match spawned {
            Ok((expanded, child)) => {
                children.push((cmd.name.clone(), expanded, child));
                *cmd_index += 1;
            }
            Err(err) => {
                setup_error = Some(err);
                break;
            }
        }
    }

    let mut failures = Vec::new();
    let mut wait_error = None;
    for (name, expanded, mut child) in children {
        let waited = child
            .wait()
            .with_context(|| format!("failed to wait for: {expanded}"));
        match waited {
            Ok(status) if status.success() => {}
            Ok(_) => failures.push(name.unwrap_or(expanded)),
            Err(err) => {
                // Keep only the first wait error, but keep reaping the rest.
                if wait_error.is_none() {
                    wait_error = Some(err);
                }
            }
        }
    }

    if let Some(err) = setup_error {
        return Err(err);
    }
    if let Some(err) = wait_error {
        return Err(err);
    }
    if !failures.is_empty() {
        bail!("concurrent group had failures: {}", failures.join(", "));
    }
    Ok(())
}
/// Serializes the context for one step as a JSON object string.
///
/// When `name` is given, the output is a copy of `base_context` with
/// `hook_name` set to that name (overwriting any existing `hook_name` entry);
/// otherwise `base_context` is serialized unchanged. `base_context` itself is
/// never modified.
///
/// # Errors
///
/// Fails if JSON serialization fails.
fn build_step_context_json(
    base_context: &HashMap<String, String>,
    name: Option<&str>,
) -> anyhow::Result<String> {
    match name {
        None => serde_json::to_string(base_context).context("failed to serialize step context"),
        Some(hook_name) => {
            let mut with_name = base_context.clone();
            with_name.insert(String::from("hook_name"), hook_name.to_owned());
            serde_json::to_string(&with_name).context("failed to serialize step context")
        }
    }
}
/// Returns the log name for a command: its own name when it has one,
/// otherwise `cmd-{index}`.
fn command_log_name(name: Option<&str>, index: usize) -> String {
    name.map_or_else(|| format!("cmd-{index}"), str::to_owned)
}
fn create_command_log(spec: &PipelineSpec, name: &str) -> anyhow::Result<fs::File> {
let hook_log = HookLog::hook(spec.source, spec.hook_type, name);
let path = hook_log.path(&spec.log_dir, &spec.branch);
fs::File::create(&path)
.with_context(|| format!("failed to create log file: {}", path.display()))
}
/// Describes a process exit for error messages: `exit code N` when a code is
/// available, `signal` when there is none.
fn format_exit(code: Option<i32>) -> String {
    match code {
        Some(value) => format!("exit code {value}"),
        None => String::from("signal"),
    }
}