use std::collections::HashSet;
use anyhow::Result;
use crate::chain::{Chain, ChainItem, ChainItemKind, ChainMode, FailurePolicy};
use crate::resolver::ResolutionOverrides;
use crate::types::{DetectionWarning, ProjectContext};
/// Executes `chain` and returns its exit code.
///
/// Every task item is handed to `precheck_task` before any item is dispatched, so a failing
/// pre-check propagates before anything runs. Warnings collected while the chain runs are passed
/// to `emit_collected_warnings` once the chain has finished, whether it succeeded or not.
///
/// # Errors
///
/// Propagates a failed pre-check and any error returned by the sequential or parallel runner.
pub(crate) fn run_chain(
    ctx: &ProjectContext,
    overrides: &ResolutionOverrides,
    chain: &Chain,
) -> Result<i32> {
    // Only task items have a pre-check; install items are skipped here.
    let task_names = chain.items.iter().filter_map(|item| match &item.kind {
        ChainItemKind::Task(name) => Some(name),
        ChainItemKind::Install { .. } => None,
    });
    for name in task_names {
        crate::cmd::run::precheck_task(ctx, overrides, name)?;
    }

    let mut collected = HashSet::<DetectionWarning>::new();
    let outcome = match chain.mode {
        ChainMode::Sequential => run_sequential(ctx, overrides, chain, &mut collected),
        ChainMode::Parallel => run_parallel(ctx, overrides, chain, &mut collected),
    };

    // Emit before returning so warnings are shown even when the chain itself failed.
    crate::cmd::emit_collected_warnings(&collected, overrides);
    outcome
}
/// Runs the chain's items one after another, in order.
///
/// Returns the exit code of the first item that exits non-zero, or `0` when every item succeeds.
/// Unless the failure policy is `FailurePolicy::KeepGoing`, the chain stops at that first failing
/// item; with `KeepGoing` the remaining items still run.
///
/// # Errors
///
/// Propagates any error from dispatching an item, regardless of the failure policy.
fn run_sequential(
    ctx: &ProjectContext,
    overrides: &ResolutionOverrides,
    chain: &Chain,
    warnings: &mut HashSet<DetectionWarning>,
) -> Result<i32> {
    let stop_at_first_failure = !matches!(chain.failure, FailurePolicy::KeepGoing);
    let mut exit_code: Option<i32> = None;

    for item in &chain.items {
        match dispatch_item(ctx, overrides, item, warnings)? {
            0 => {}
            failed if stop_at_first_failure => return Ok(failed),
            // Keep going, but remember only the earliest failure.
            failed => exit_code = exit_code.or(Some(failed)),
        }
    }

    Ok(exit_code.unwrap_or(0))
}
/// Picks the parallel runner for `chain`: grouped (buffered per task) or streaming.
///
/// Under GitHub Actions, grouping requires both `group_output` and `github_group_parallel` to be
/// set in `overrides`; elsewhere it follows `parallel_grouped`. The grouped runner is told whether
/// it is running under GitHub Actions.
fn run_parallel(
    ctx: &ProjectContext,
    overrides: &ResolutionOverrides,
    chain: &Chain,
    warnings: &mut HashSet<DetectionWarning>,
) -> Result<i32> {
    let on_github = actions_rs::env::is_github_actions();
    let use_groups = if on_github {
        overrides.group_output && overrides.github_group_parallel
    } else {
        overrides.parallel_grouped
    };

    if !use_groups {
        return run_parallel_streaming(ctx, overrides, chain, warnings);
    }
    run_parallel_grouped(ctx, overrides, chain, warnings, on_github)
}
/// Runs every task of `chain` concurrently, forwarding output while the tasks run.
///
/// Each child's stdout and stderr are handed to `spawn_readers` together with a shared
/// `StdioSink` and a prefix rendered from the item's display name. Children are then polled until
/// all have exited.
///
/// Returns the first non-zero exit code observed while polling, or `0` when every task succeeds.
/// Under `FailurePolicy::KillOnFail`, tasks still running once a failure has been observed are
/// killed and reaped.
///
/// # Errors
///
/// Fails if a task cannot be spawned, if the chain contains an install item, or if polling a
/// child's status fails. On every one of these paths all children spawned so far are killed and
/// reaped, and the reader threads are joined, before the error is returned.
///
/// # Panics
///
/// Panics if a spawned child does not expose piped stdout and stderr.
fn run_parallel_streaming(
    ctx: &ProjectContext,
    overrides: &ResolutionOverrides,
    chain: &Chain,
    warnings: &mut HashSet<DetectionWarning>,
) -> Result<i32> {
    use std::process::Child;
    use std::sync::Arc;

    use crate::chain::mux::{LineSink, StdioSink, prefix_width, render_prefix, spawn_readers};

    /// Kills and reaps every child, then joins the reader threads.
    ///
    /// `Child` has no `Drop` behaviour, so simply dropping the handles would leave the processes
    /// running and un-reaped.
    fn abort_all(
        children: impl IntoIterator<Item = (String, Child)>,
        readers: impl IntoIterator<Item = std::thread::JoinHandle<()>>,
    ) {
        for (_, mut child) in children {
            let _ = child.kill();
            let _ = child.wait();
        }
        for reader in readers {
            let _ = reader.join();
        }
    }

    let names: Vec<&str> = chain.items.iter().map(ChainItem::display_name).collect();
    let width = prefix_width(&names);
    let colorize = colored::control::SHOULD_COLORIZE.should_colorize();
    let sink: Arc<dyn LineSink> = Arc::new(StdioSink);
    let mut children: Vec<(String, Child)> = Vec::with_capacity(chain.items.len());
    let mut reader_handles = Vec::new();

    // Spawn inside a closure so that `?` leaves already-spawned children in `children`, where the
    // cleanup below can still reach them.
    let spawn_outcome: Result<()> = (|| {
        for item in &chain.items {
            let prefix = render_prefix(item.display_name(), width, colorize);
            let mut child = match &item.kind {
                ChainItemKind::Task(name) => crate::cmd::run::dispatch_task_piped(
                    ctx,
                    overrides,
                    name,
                    &item.args,
                    Some(warnings),
                )?,
                ChainItemKind::Install { .. } => {
                    anyhow::bail!("install items cannot run in parallel chains")
                }
            };
            let stdout: Box<dyn std::io::Read + Send> =
                Box::new(child.stdout.take().expect("stdout piped"));
            let stderr: Box<dyn std::io::Read + Send> =
                Box::new(child.stderr.take().expect("stderr piped"));
            reader_handles.extend(spawn_readers(
                vec![
                    (prefix.clone(), false, stdout),
                    (prefix.clone(), true, stderr),
                ],
                &sink,
            ));
            children.push((item.display_name().to_string(), child));
        }
        Ok(())
    })();
    if let Err(e) = spawn_outcome {
        abort_all(children, reader_handles);
        return Err(e);
    }

    let mut remaining: Vec<(String, Child)> = children;
    let mut first_failure: Option<i32> = None;
    let kill_on_fail = matches!(chain.failure, FailurePolicy::KillOnFail);
    while !remaining.is_empty() {
        let mut next: Vec<(String, Child)> = Vec::with_capacity(remaining.len());
        // An explicit iterator (rather than `for`) keeps the not-yet-polled children reachable so
        // the error path can clean them up too.
        let mut pending = std::mem::take(&mut remaining).into_iter();
        while let Some((name, mut child)) = pending.next() {
            let status = match child.try_wait() {
                Ok(status) => status,
                Err(e) => {
                    // Same cleanup as a failed spawn: nothing may be left running behind us.
                    abort_all(
                        std::iter::once((name, child)).chain(pending).chain(next),
                        reader_handles,
                    );
                    return Err(e.into());
                }
            };
            match status {
                Some(status) => {
                    let code = crate::cmd::exit_code(status);
                    if code != 0 {
                        first_failure.get_or_insert(code);
                    }
                }
                None if kill_on_fail && first_failure.is_some() => {
                    let _ = child.kill();
                    let _ = child.wait();
                }
                None => next.push((name, child)),
            }
        }
        remaining = next;
        if !remaining.is_empty() {
            std::thread::sleep(std::time::Duration::from_millis(50));
        }
    }

    for h in reader_handles {
        let _ = h.join();
    }
    Ok(first_failure.unwrap_or(0))
}
/// One spawned task of a grouped parallel chain, together with everything needed to flush its
/// buffered output once it is done.
struct GroupedTask {
    /// Display name of the chain item; used in the header when the task's output is flushed.
    name: String,
    /// Handle to the spawned task process.
    child: std::process::Child,
    /// Sink handed to the reader threads and later replayed to stdout/stderr.
    sink: std::sync::Arc<crate::chain::mux::BufferSink>,
    /// Threads returned by `spawn_readers` for the child's stdout and stderr.
    readers: Vec<std::thread::JoinHandle<()>>,
}
/// Longest time `wait_for_readers` is given for a task's reader threads to finish, both when
/// flushing a task's output and when cleaning up after a failed spawn.
const READER_DRAIN_GRACE: std::time::Duration = std::time::Duration::from_millis(500);
/// Runs every task of `chain` concurrently, buffering each task's output in its own `BufferSink`.
///
/// Children are polled until all have exited. When a task exits, or is killed under
/// `FailurePolicy::KillOnFail` after a failure has been observed, its buffered output is emitted
/// in one piece through `flush_task_group`. `gha_syntax` is forwarded to that function and selects
/// between a GitHub Actions log group and a plain header line.
///
/// Returns the first non-zero exit code observed while polling, or `0` when every task succeeds.
///
/// # Errors
///
/// Fails if a sink cannot be created, a task cannot be spawned, the chain contains an install
/// item, or polling a child's status fails. On every one of these paths all children spawned so
/// far are killed and reaped, their sinks are closed, and their readers get a bounded grace period
/// before the error is returned. Output that has not been flushed yet is not replayed.
///
/// # Panics
///
/// Panics if a spawned child does not expose piped stdout and stderr.
fn run_parallel_grouped(
    ctx: &ProjectContext,
    overrides: &ResolutionOverrides,
    chain: &Chain,
    warnings: &mut HashSet<DetectionWarning>,
    gha_syntax: bool,
) -> Result<i32> {
    use std::sync::Arc;

    use crate::chain::mux::{BufferSink, LineSink, spawn_readers};

    /// Kills and reaps each task's child, closes its sink and waits briefly for its readers.
    ///
    /// `Child` has no `Drop` behaviour, so simply dropping the tasks would leave the processes
    /// running and un-reaped.
    fn abort_all(tasks: impl IntoIterator<Item = GroupedTask>) {
        for mut task in tasks {
            let _ = task.child.kill();
            let _ = task.child.wait();
            task.sink.close();
            wait_for_readers(&mut task.readers, READER_DRAIN_GRACE);
        }
    }

    let mut tasks: Vec<GroupedTask> = Vec::with_capacity(chain.items.len());

    // Spawn inside a closure so that `?` leaves already-spawned tasks in `tasks`, where the
    // cleanup below can still reach them.
    let spawn_outcome: Result<()> = (|| {
        for item in &chain.items {
            let (name, mut child, sink) = match &item.kind {
                ChainItemKind::Task(task_name) => {
                    let sink = Arc::new(BufferSink::new()?);
                    let child = crate::cmd::run::dispatch_task_piped(
                        ctx,
                        overrides,
                        task_name,
                        &item.args,
                        Some(warnings),
                    )?;
                    (item.display_name().to_string(), child, sink)
                }
                ChainItemKind::Install { .. } => {
                    anyhow::bail!("install items cannot run in parallel chains")
                }
            };
            let stdout: Box<dyn std::io::Read + Send> =
                Box::new(child.stdout.take().expect("stdout piped"));
            let stderr: Box<dyn std::io::Read + Send> =
                Box::new(child.stderr.take().expect("stderr piped"));
            let dyn_sink: Arc<dyn LineSink> = sink.clone();
            // No per-line prefix: the whole task is emitted under one header instead.
            let readers = spawn_readers(
                vec![
                    (String::new(), false, stdout),
                    (String::new(), true, stderr),
                ],
                &dyn_sink,
            );
            tasks.push(GroupedTask {
                name,
                child,
                sink,
                readers,
            });
        }
        Ok(())
    })();
    if let Err(e) = spawn_outcome {
        abort_all(tasks);
        return Err(e);
    }

    let colorize = colored::control::SHOULD_COLORIZE.should_colorize();
    let mut remaining = tasks;
    let mut first_failure: Option<i32> = None;
    let kill_on_fail = matches!(chain.failure, FailurePolicy::KillOnFail);
    while !remaining.is_empty() {
        let mut next: Vec<GroupedTask> = Vec::with_capacity(remaining.len());
        // An explicit iterator (rather than `for`) keeps the not-yet-polled tasks reachable so
        // the error path can clean them up too.
        let mut pending = std::mem::take(&mut remaining).into_iter();
        while let Some(mut t) = pending.next() {
            let status = match t.child.try_wait() {
                Ok(status) => status,
                Err(e) => {
                    // Same cleanup as a failed spawn: nothing may be left running behind us.
                    abort_all(std::iter::once(t).chain(pending).chain(next));
                    return Err(e.into());
                }
            };
            match status {
                Some(status) => {
                    let code = crate::cmd::exit_code(status);
                    if code != 0 {
                        first_failure.get_or_insert(code);
                    }
                    flush_task_group(&t.name, gha_syntax, colorize, &t.sink, t.readers);
                }
                None if kill_on_fail && first_failure.is_some() => {
                    let _ = t.child.kill();
                    let _ = t.child.wait();
                    flush_task_group(&t.name, gha_syntax, colorize, &t.sink, t.readers);
                }
                None => next.push(t),
            }
        }
        remaining = next;
        if !remaining.is_empty() {
            std::thread::sleep(std::time::Duration::from_millis(50));
        }
    }
    Ok(first_failure.unwrap_or(0))
}
/// Emits one task's buffered output as a single block under a `runner: <name>` header.
///
/// The reader threads get up to `READER_DRAIN_GRACE` to finish, then the sink is closed and any
/// reader that has finished by then is joined; handles of readers still running are dropped when
/// this function returns. With `gha_syntax` the output is replayed inside a GitHub Actions log
/// group; otherwise a header line (colored and bold when `colorize` is set) is written to stdout
/// first. Write and replay errors are ignored.
fn flush_task_group(
    name: &str,
    gha_syntax: bool,
    colorize: bool,
    sink: &crate::chain::mux::BufferSink,
    mut readers: Vec<std::thread::JoinHandle<()>>,
) {
    use std::io::Write as _;

    wait_for_readers(&mut readers, READER_DRAIN_GRACE);
    sink.close();
    join_finished_readers(&mut readers);

    if gha_syntax {
        // The guard has to outlive the replay so the output lands inside the group.
        let _group = actions_rs::log::group_guard(format!("runner: {name}"));
        let _ = sink.replay_to(&mut std::io::stdout(), &mut std::io::stderr());
        return;
    }

    let plain = format!("runner: {name}");
    let banner = match colorize {
        true => {
            use colored::Colorize as _;
            plain
                .color(crate::chain::mux::color_for(name))
                .bold()
                .to_string()
        }
        false => plain,
    };

    let mut out = std::io::stdout().lock();
    let _ = writeln!(out, "{banner}");
    let _ = out.flush();
    // Release the stdout lock before the replay takes its own handles.
    drop(out);

    let _ = sink.replay_to(&mut std::io::stdout(), &mut std::io::stderr());
}
/// Joins reader threads as they finish, giving up once `grace` has elapsed.
///
/// Finished handles are removed from `readers`; whatever is still running when the deadline
/// passes stays in the vector. Polls at most every 10 ms and returns as soon as the vector is
/// empty.
fn wait_for_readers(readers: &mut Vec<std::thread::JoinHandle<()>>, grace: std::time::Duration) {
    use std::time::{Duration, Instant};

    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    let deadline = Instant::now() + grace;
    loop {
        join_finished_readers(readers);
        if readers.is_empty() {
            return;
        }
        match deadline.checked_duration_since(Instant::now()) {
            // Never sleep past the deadline.
            Some(left) if !left.is_zero() => std::thread::sleep(left.min(POLL_INTERVAL)),
            _ => return,
        }
    }
}
/// Joins and removes every handle in `readers` whose thread has already finished.
///
/// Handles of threads that are still running are left in the vector (their relative order may
/// change). Never blocks on a running thread; the result of each join is ignored.
fn join_finished_readers(readers: &mut Vec<std::thread::JoinHandle<()>>) {
    let mut cursor = 0;
    loop {
        match readers.get(cursor).map(|handle| handle.is_finished()) {
            // `swap_remove` moves the last handle into `cursor`, so re-examine the same slot.
            Some(true) => {
                let _ = readers.swap_remove(cursor).join();
            }
            Some(false) => cursor += 1,
            None => break,
        }
    }
}
fn dispatch_item(
ctx: &ProjectContext,
overrides: &ResolutionOverrides,
item: &ChainItem,
warnings: &mut HashSet<DetectionWarning>,
) -> Result<i32> {
match &item.kind {
ChainItemKind::Task(name) => {
crate::cmd::run::run(ctx, overrides, name, &item.args, Some(warnings))
}
ChainItemKind::Install { frozen } => {
crate::cmd::install::install_pms(ctx, overrides, *frozen, Some(warnings))
}
}
}