use std::borrow::Cow;
use std::collections::HashMap;
use std::fs;
use std::io::Read as _;
use std::path::Path;
use std::process::{Child, ExitStatus, Stdio};
use anyhow::Context;
use worktrunk::git::{Repository, WorktrunkError};
use worktrunk::shell_exec::ShellConfig;
use super::command_executor::{expand_shell_template, wait_first_error};
use super::pipeline_spec::{PipelineSpec, PipelineStepSpec};
use super::process::HookLog;
/// Runs a pipeline whose JSON-encoded `PipelineSpec` is read from stdin.
///
/// Steps execute in the order they appear in the spec. A `Single` step is
/// spawned and waited on before the next step starts; a `Concurrent` step is
/// delegated to `run_concurrent_group`. Every command gets its own log file
/// (see `create_command_log`) and receives its step context, serialized as
/// JSON, on stdin.
///
/// # Errors
///
/// Returns an error if stdin cannot be read, the spec fails to deserialize,
/// the repository at `spec.worktree_path` cannot be opened, the log directory
/// or a log file cannot be created, a template fails to expand, a command
/// cannot be spawned or waited on, or a command exits unsuccessfully. The
/// first error aborts the pipeline, so later steps are not started.
pub fn run_pipeline() -> anyhow::Result<()> {
    let mut contents = String::new();
    std::io::stdin()
        .read_to_string(&mut contents)
        .context("failed to read pipeline spec from stdin")?;
    let spec: PipelineSpec =
        serde_json::from_str(&contents).context("failed to deserialize pipeline spec")?;
    let repo =
        Repository::at(&spec.worktree_path).context("failed to open repository for pipeline")?;
    fs::create_dir_all(&spec.log_dir)
        .with_context(|| format!("failed to create log directory: {}", spec.log_dir.display()))?;

    // Running count of commands started so far, shared across single steps and
    // concurrent groups; unnamed commands use it to derive their log name.
    let mut cmd_index = 0usize;
    for step in &spec.steps {
        match step {
            PipelineStepSpec::Single { template, name } => {
                let log_name = command_log_name(name.as_deref(), cmd_index);
                let log_file = create_command_log(&spec, &log_name)?;
                let step_ctx = step_context(&spec.context, name.as_deref());
                let label = name.as_deref().unwrap_or("pipeline step");
                let expanded = expand_shell_template(template, &step_ctx, &repo, label)?;
                let step_json = serde_json::to_string(&*step_ctx)
                    .context("failed to serialize step context")?;
                let mut child =
                    spawn_shell_command(&expanded, &spec.worktree_path, &step_json, log_file)?;
                let status = child.wait().context("failed to wait for child process")?;
                if !status.success() {
                    // Label the failure with the step name when there is one,
                    // falling back to the expanded command, exactly as
                    // `run_concurrent_group` does for its commands.
                    return Err(failure_error(
                        &status,
                        name.as_deref().unwrap_or(&expanded),
                    ));
                }
                cmd_index += 1;
            }
            PipelineStepSpec::Concurrent { commands } => {
                run_concurrent_group(commands, &spec, &repo, &mut cmd_index)?;
            }
        }
    }
    Ok(())
}
/// Builds the template context for one step.
///
/// Without a `name` the base context is returned borrowed, avoiding a copy.
/// With a `name` the base context is cloned and the name is stored under the
/// `hook_name` key, replacing any value already present for that key.
fn step_context<'a>(
    base: &'a HashMap<String, String>,
    name: Option<&str>,
) -> Cow<'a, HashMap<String, String>> {
    let mut context = Cow::Borrowed(base);
    if let Some(hook_name) = name {
        // `to_mut` clones the borrowed map on first use, so `base` is untouched.
        context
            .to_mut()
            .insert(String::from("hook_name"), String::from(hook_name));
    }
    context
}
/// Spawns `expanded` through the configured shell and returns the running child.
///
/// The child runs in `worktree_path`, with stdout and stderr both redirected
/// to `log_file`. `context_json` is written to the child's stdin, after which
/// the pipe is closed. The caller is responsible for waiting on the child.
///
/// # Errors
///
/// Fails if the shell configuration is unavailable, the log file handle
/// cannot be duplicated, or the process cannot be spawned.
fn spawn_shell_command(
    expanded: &str,
    worktree_path: &Path,
    context_json: &str,
    log_file: fs::File,
) -> anyhow::Result<Child> {
    use std::io::Write;

    let shell = ShellConfig::get()?;
    // stdout and stderr each need their own handle onto the same log file.
    let stderr_log = log_file
        .try_clone()
        .context("failed to clone log file handle")?;
    let mut process = shell
        .command(expanded)
        .current_dir(worktree_path)
        .stdin(Stdio::piped())
        .stdout(Stdio::from(log_file))
        .stderr(Stdio::from(stderr_log))
        .spawn()
        .with_context(|| format!("failed to spawn: {expanded}"))?;
    if let Some(mut pipe) = process.stdin.take() {
        // The write result is intentionally discarded.
        // NOTE(review): presumably because a command that never reads stdin
        // may close the pipe early — confirm this is the intended best-effort.
        let _ = pipe.write_all(context_json.as_bytes());
        // `pipe` is dropped here, which closes the child's stdin.
    }
    Ok(process)
}
/// Runs every command of a concurrent group, failing if one of them fails.
///
/// Each command gets its own log file, has its template expanded against its
/// step context, and is spawned with that context serialized as JSON on
/// stdin. `cmd_index` is advanced once per spawned command.
///
/// When `super::force_serial_concurrent()` returns true, each command is
/// waited on right after it is spawned and the first unsuccessful exit is
/// returned immediately. Otherwise all commands are spawned up front and a
/// lazy iterator that waits on each child is handed to `wait_first_error`,
/// whose result is returned.
///
/// # Errors
///
/// Fails if a log file cannot be created, a template cannot be expanded, the
/// context cannot be serialized, a command cannot be spawned or waited on, or
/// a command exits unsuccessfully.
fn run_concurrent_group(
    commands: &[super::pipeline_spec::PipelineCommandSpec],
    spec: &PipelineSpec,
    repo: &Repository,
    cmd_index: &mut usize,
) -> anyhow::Result<()> {
    let run_serially = super::force_serial_concurrent();
    // Nothing is queued in serial mode, so skip the allocation there.
    let capacity = if run_serially { 0 } else { commands.len() };
    let mut in_flight = Vec::with_capacity(capacity);

    for command in commands {
        let name = command.name.as_deref();
        let log_file = create_command_log(spec, &command_log_name(name, *cmd_index))?;
        let context = step_context(&spec.context, name);
        let expanded = expand_shell_template(
            &command.template,
            &context,
            repo,
            name.unwrap_or("pipeline step"),
        )?;
        let context_json =
            serde_json::to_string(&*context).context("failed to serialize step context")?;
        let mut child =
            spawn_shell_command(&expanded, &spec.worktree_path, &context_json, log_file)?;
        *cmd_index += 1;

        if !run_serially {
            in_flight.push((command.name.clone(), expanded, child));
            continue;
        }

        let status = child
            .wait()
            .with_context(|| format!("failed to wait for: {expanded}"))?;
        if !status.success() {
            return Err(failure_error(&status, name.unwrap_or(&expanded)));
        }
    }

    // Empty in serial mode; otherwise each queued child is waited on as the
    // iterator is consumed.
    let outcomes = in_flight.into_iter().map(
        |(name, expanded, mut child)| -> anyhow::Result<()> {
            let status = child
                .wait()
                .with_context(|| format!("failed to wait for: {expanded}"))?;
            if status.success() {
                Ok(())
            } else {
                Err(failure_error(&status, name.as_deref().unwrap_or(&expanded)))
            }
        },
    );
    wait_first_error(outcomes)
}
/// Returns the log name for a command: its own name when it has one,
/// otherwise `cmd-{index}`.
fn command_log_name(name: Option<&str>, index: usize) -> String {
    name.map_or_else(|| format!("cmd-{index}"), str::to_owned)
}
/// Creates the log file for the command called `name`.
///
/// The path comes from a `HookLog` built from the spec's source and hook type
/// together with `name`, resolved against `spec.log_dir` and `spec.branch`.
/// `File::create` truncates a file that already exists at that path.
///
/// # Errors
///
/// Fails, naming the path, if the file cannot be created.
fn create_command_log(spec: &PipelineSpec, name: &str) -> anyhow::Result<fs::File> {
    let descriptor = HookLog::hook(spec.source, spec.hook_type, name);
    let destination = descriptor.path(&spec.log_dir, &spec.branch);
    let file = fs::File::create(&destination)
        .with_context(|| format!("failed to create log file: {}", destination.display()))?;
    Ok(file)
}
fn failure_error(status: &ExitStatus, label: &str) -> anyhow::Error {
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(sig) = status.signal() {
let message = format!(
"pipeline step terminated by {}: {label}",
format_signal(sig)
);
return WorktrunkError::ChildProcessExited {
code: 128 + sig,
message,
signal: Some(sig),
}
.into();
}
}
let code = status.code().unwrap_or(1);
let message = format!("command failed with exit code {code}: {label}");
WorktrunkError::ChildProcessExited {
code,
message,
signal: None,
}
.into()
}
/// Renders a signal number for error messages: `signal N (NAME)` when `nix`
/// recognizes the number, plain `signal N` otherwise.
#[cfg(unix)]
fn format_signal(sig: i32) -> String {
    if let Ok(signal) = nix::sys::signal::Signal::try_from(sig) {
        format!("signal {sig} ({signal})")
    } else {
        format!("signal {sig}")
    }
}
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use std::os::unix::process::ExitStatusExt;
    use worktrunk::git::interrupt_exit_code;

    /// Extracts `(code, signal, message)` from an error expected to wrap
    /// `WorktrunkError::ChildProcessExited`; panics on anything else.
    fn downcast_child_exit(err: &anyhow::Error) -> (i32, Option<i32>, String) {
        if let Some(WorktrunkError::ChildProcessExited {
            code,
            message,
            signal,
        }) = err.downcast_ref::<WorktrunkError>()
        {
            return (*code, *signal, message.clone());
        }
        panic!("expected ChildProcessExited, got {err:?}")
    }

    /// Checks every field of the error produced for a child killed by `sig`.
    fn assert_signal_failure(sig: i32, expected_code: i32, expected_msg: &str) {
        // The signal number itself is used as the raw wait status here.
        let err = failure_error(&ExitStatus::from_raw(sig), "my-step");
        let (code, signal, message) = downcast_child_exit(&err);
        assert_eq!(signal, Some(sig), "signal field for {sig}");
        assert_eq!(code, expected_code, "exit code for {sig}");
        assert_eq!(message, expected_msg, "message for {sig}");
        assert_eq!(
            interrupt_exit_code(&err),
            Some(expected_code),
            "interrupt_exit_code for {sig}",
        );
    }

    #[test]
    fn signal_exit_reports_named_signal_and_shell_exit_code() {
        assert_signal_failure(
            15,
            143,
            "pipeline step terminated by signal 15 (SIGTERM): my-step",
        );
        assert_signal_failure(
            2,
            130,
            "pipeline step terminated by signal 2 (SIGINT): my-step",
        );
        assert_signal_failure(
            9,
            137,
            "pipeline step terminated by signal 9 (SIGKILL): my-step",
        );
    }

    #[test]
    fn non_signal_exit_preserves_child_code() {
        // Raw status `2 << 8` is expected to decode as a normal exit with code 2.
        let err = failure_error(&ExitStatus::from_raw(2 << 8), "my-step");
        let (code, signal, message) = downcast_child_exit(&err);
        assert_eq!(signal, None);
        assert_eq!(code, 2);
        assert_eq!(message, "command failed with exit code 2: my-step");
        assert_eq!(interrupt_exit_code(&err), None);
    }
}