use std::collections::HashSet;
use anyhow::Result;
use crate::chain::{Chain, ChainItem, ChainItemKind, ChainMode, FailurePolicy};
use crate::resolver::ResolutionOverrides;
use crate::types::{DetectionWarning, ProjectContext};
pub(crate) fn run_chain(
ctx: &ProjectContext,
overrides: &ResolutionOverrides,
chain: &Chain,
) -> Result<i32> {
let mut warnings: HashSet<DetectionWarning> = HashSet::new();
for item in &chain.items {
if let ChainItemKind::Task(name) = &item.kind {
crate::cmd::run::precheck_task(ctx, overrides, name)?;
}
}
let result = match chain.mode {
ChainMode::Sequential => run_sequential(ctx, overrides, chain, &mut warnings),
ChainMode::Parallel => run_parallel(ctx, overrides, chain, &mut warnings),
};
crate::cmd::emit_collected_warnings(&warnings, overrides);
result
}
/// Runs the chain's items one after another, in order.
///
/// Returns `Ok(0)` when every item reports exit code `0`. Otherwise the
/// result is the first non-zero exit code seen:
///
/// * with `FailurePolicy::KeepGoing` the remaining items still run and the
///   first failing code is returned at the end;
/// * with any other policy the function returns as soon as an item fails.
///
/// # Errors
///
/// Propagates the first error returned by `dispatch_item`; no further items
/// are run after such an error.
fn run_sequential(
    ctx: &ProjectContext,
    overrides: &ResolutionOverrides,
    chain: &Chain,
    warnings: &mut HashSet<DetectionWarning>,
) -> Result<i32> {
    let stop_on_failure = !matches!(chain.failure, FailurePolicy::KeepGoing);
    // Stays 0 until the first failing item; later failures do not overwrite it.
    let mut exit_code = 0;

    for item in &chain.items {
        let code = dispatch_item(ctx, overrides, item, warnings)?;
        if code == 0 {
            continue;
        }
        if stop_on_failure {
            return Ok(code);
        }
        if exit_code == 0 {
            exit_code = code;
        }
    }

    Ok(exit_code)
}
/// Runs every item of `chain` concurrently and returns the first non-zero exit
/// code observed while polling, or `0` when every child exits with code `0`.
///
/// Each task is started through `cmd::run::dispatch_task_piped`. Its stdout
/// and stderr are handed to reader threads (`spawn_readers`) together with a
/// per-item prefix produced by `render_prefix`. The children are then polled
/// with `try_wait` every 50 ms until none remain, after which the reader
/// threads are joined.
///
/// Under `FailurePolicy::KillOnFail`, once any child has exited non-zero, each
/// child that is still running is killed the next time it is polled.
///
/// # Errors
///
/// * The chain contains an `Install` item. This is checked before any process
///   is started, so no task is spawned only to be killed again.
/// * `dispatch_task_piped` fails for some item.
/// * Polling a child with `try_wait` fails.
///
/// On the last two paths every child that may still be alive is killed and
/// reaped, and the reader threads are joined, before the error is returned.
///
/// # Panics
///
/// Panics if a child returned by `dispatch_task_piped` has no captured stdout
/// or stderr handle.
fn run_parallel(
    ctx: &ProjectContext,
    overrides: &ResolutionOverrides,
    chain: &Chain,
    warnings: &mut HashSet<DetectionWarning>,
) -> Result<i32> {
    use std::process::Child;
    use std::sync::Arc;

    use crate::chain::mux::{LineSink, StdioSink, prefix_width, render_prefix, spawn_readers};

    // Validate the whole chain first: rejecting an install item only when the
    // spawn loop reaches it would start (and then kill) every task before it.
    let has_install = chain
        .items
        .iter()
        .any(|item| matches!(item.kind, ChainItemKind::Install { .. }));
    if has_install {
        anyhow::bail!("install items cannot run in parallel chains");
    }

    let names: Vec<&str> = chain.items.iter().map(ChainItem::display_name).collect();
    let width = prefix_width(&names);
    let colorize = colored::control::SHOULD_COLORIZE.should_colorize();
    let sink: Arc<dyn LineSink> = Arc::new(StdioSink);

    let mut children: Vec<(String, Child)> = Vec::with_capacity(chain.items.len());
    let mut reader_handles = Vec::new();

    // The closure lets `?` short-circuit the spawn loop while still leaving
    // the already-started children available for cleanup below.
    let spawn_outcome: Result<()> = (|| {
        for item in &chain.items {
            let prefix = render_prefix(item.display_name(), width, colorize);
            let mut child = match &item.kind {
                ChainItemKind::Task(name) => crate::cmd::run::dispatch_task_piped(
                    ctx,
                    overrides,
                    name,
                    &item.args,
                    Some(warnings),
                )?,
                // Already rejected above; kept so the match stays exhaustive.
                ChainItemKind::Install { .. } => {
                    anyhow::bail!("install items cannot run in parallel chains")
                }
            };
            let stdout: Box<dyn std::io::Read + Send> =
                Box::new(child.stdout.take().expect("stdout piped"));
            let stderr: Box<dyn std::io::Read + Send> =
                Box::new(child.stderr.take().expect("stderr piped"));
            reader_handles.extend(spawn_readers(
                vec![
                    (prefix.clone(), false, stdout),
                    (prefix.clone(), true, stderr),
                ],
                &sink,
            ));
            children.push((item.display_name().to_string(), child));
        }
        Ok(())
    })();
    if let Err(e) = spawn_outcome {
        kill_and_reap(children);
        for h in reader_handles {
            let _ = h.join();
        }
        return Err(e);
    }

    let kill_on_fail = matches!(chain.failure, FailurePolicy::KillOnFail);
    let mut first_failure: Option<i32> = None;
    let mut remaining: Vec<(String, Child)> = children;

    while !remaining.is_empty() {
        let mut next: Vec<(String, Child)> = Vec::with_capacity(remaining.len());
        // A named iterator keeps the not-yet-polled children reachable if
        // polling fails part-way through this pass.
        let mut pending = std::mem::take(&mut remaining).into_iter();
        let mut poll_error = None;

        for (name, mut child) in pending.by_ref() {
            match child.try_wait() {
                Ok(Some(status)) => {
                    let code = crate::cmd::exit_code(status);
                    if code != 0 {
                        first_failure.get_or_insert(code);
                    }
                }
                Ok(None) if kill_on_fail && first_failure.is_some() => {
                    let _ = child.kill();
                    let _ = child.wait();
                }
                Ok(None) => next.push((name, child)),
                Err(e) => {
                    // Keep this child in the teardown set too; its state is
                    // unknown, so a best-effort kill is the safe choice.
                    next.push((name, child));
                    poll_error = Some(e);
                    break;
                }
            }
        }

        if let Some(e) = poll_error {
            // Dropping a `Child` does not terminate it, so returning here
            // without cleanup would leave the other processes running.
            kill_and_reap(next.into_iter().chain(pending));
            for h in reader_handles {
                let _ = h.join();
            }
            return Err(e.into());
        }

        remaining = next;
        if !remaining.is_empty() {
            std::thread::sleep(std::time::Duration::from_millis(50));
        }
    }

    for h in reader_handles {
        let _ = h.join();
    }
    Ok(first_failure.unwrap_or(0))
}

/// Best-effort teardown: kills each child and waits on it so that no process
/// is left running or unreaped. Errors from `kill` and `wait` are ignored.
fn kill_and_reap(children: impl IntoIterator<Item = (String, std::process::Child)>) {
    for (_, mut child) in children {
        let _ = child.kill();
        let _ = child.wait();
    }
}
/// Executes a single chain item in the foreground and returns the result of
/// the underlying command.
///
/// * `Task` items are forwarded to `cmd::run::run` with the item's arguments.
/// * `Install` items are forwarded to `cmd::install::install_pms` with the
///   item's `frozen` flag.
///
/// In both cases `warnings` is passed along so the callee can record
/// detection warnings into the caller's set.
fn dispatch_item(
    ctx: &ProjectContext,
    overrides: &ResolutionOverrides,
    item: &ChainItem,
    warnings: &mut HashSet<DetectionWarning>,
) -> Result<i32> {
    match item.kind {
        ChainItemKind::Install { frozen } => {
            crate::cmd::install::install_pms(ctx, frozen, Some(warnings))
        }
        ChainItemKind::Task(ref task) => {
            crate::cmd::run::run(ctx, overrides, task, &item.args, Some(warnings))
        }
    }
}