use super::*;
/// Picks the effective parallelism limit for a run.
///
/// `override_value`, when present, takes precedence over
/// `config.max_parallel`.
///
/// # Errors
///
/// Returns an error when the chosen value is zero.
pub(super) fn resolve_max_parallel(
    config: &OrchestrationConfig,
    override_value: Option<usize>,
) -> Result<usize, String> {
    match override_value.unwrap_or(config.max_parallel) {
        0 => Err(String::from("max_parallel must be >= 1")),
        limit => Ok(limit),
    }
}
pub(super) fn run_oneshot_parallel(
config: &OrchestrationConfig,
ordered: &[String],
max_parallel: usize,
global_continue_on_error: bool,
) -> Result<(String, Vec<TaskRunResult>, Option<String>), String> {
let logs_dir = PathBuf::from(DEFAULT_RUN_LOG_DIR);
std::fs::create_dir_all(&logs_dir)
.map_err(|err| format!("create log directory {} failed: {err}", logs_dir.display()))?;
let task_index = build_task_index(config)?;
let selected: HashSet<String> = ordered.iter().cloned().collect();
let order_pos: HashMap<String, usize> = ordered
.iter()
.enumerate()
.map(|(idx, name)| (name.clone(), idx))
.collect();
let mut indegree: HashMap<String, usize> =
selected.iter().map(|name| (name.clone(), 0usize)).collect();
let mut adjacency: HashMap<String, Vec<String>> = selected
.iter()
.map(|name| (name.clone(), Vec::new()))
.collect();
for name in ordered {
let task = &config.tasks[*task_index
.get(name)
.ok_or_else(|| format!("internal error: task not found: {name}"))?];
for dep in &task.depends_on {
if selected.contains(dep) {
*indegree
.get_mut(name)
.ok_or_else(|| format!("internal error: missing indegree for {name}"))? += 1;
adjacency
.get_mut(dep)
.ok_or_else(|| format!("internal error: missing adjacency for {dep}"))?
.push(name.clone());
}
}
}
let mut ready: VecDeque<String> = ordered
.iter()
.filter(|name| indegree.get(*name).copied().unwrap_or(0) == 0)
.cloned()
.collect();
let mut active: HashMap<String, RunningTask> = HashMap::new();
let mut finished = HashSet::new();
let mut results = Vec::new();
let mut failure_message = None;
let mut block_new_tasks = false;
while !ready.is_empty() || !active.is_empty() {
while !block_new_tasks && active.len() < max_parallel && !ready.is_empty() {
let name = ready
.pop_front()
.ok_or_else(|| String::from("internal queue error"))?;
let task = &config.tasks[*task_index
.get(&name)
.ok_or_else(|| format!("internal error: task not found: {name}"))?];
let running = spawn_oneshot_task(task, &logs_dir)?;
active.insert(name, running);
}
if active.is_empty() {
break;
}
let keys = active.keys().cloned().collect::<Vec<_>>();
let mut progressed = false;
for key in keys {
let done = if let Some(running) = active.get_mut(&key) {
running
.child
.try_wait()
.map_err(|err| format!("poll task {key} failed: {err}"))?
.is_some()
} else {
false
};
if !done {
continue;
}
let running = active
.remove(&key)
.ok_or_else(|| format!("internal error: missing running task {key}"))?;
let output = running
.child
.wait_with_output()
.map_err(|err| format!("collect output for task {key} failed: {err}"))?;
append_task_log(&running.log_file, &output.stdout, &output.stderr)?;
let success = output.status.success();
let task = &config.tasks[*task_index
.get(&key)
.ok_or_else(|| format!("internal error: task not found: {key}"))?];
results.push(TaskRunResult {
name: key.clone(),
status: if success {
String::from("pass")
} else {
String::from("fail")
},
exit_code: output.status.code(),
elapsed_ms: running.started_at.elapsed().as_millis() as u64,
log_file: Some(running.log_file.clone()),
});
finished.insert(key.clone());
progressed = true;
if !success && !global_continue_on_error && !task.continue_on_error {
block_new_tasks = true;
if failure_message.is_none() {
failure_message = Some(format!(
"task {} failed with status {:?}",
task.name, output.status
));
}
continue;
}
if let Some(dependents) = adjacency.get(&key) {
for next in dependents {
let value = indegree
.get_mut(next)
.ok_or_else(|| format!("internal error: missing indegree for {next}"))?;
*value = value.saturating_sub(1);
if *value == 0 && !ready.iter().any(|task_name| task_name == next) {
ready.push_back(next.clone());
}
}
let mut sorted = ready.into_iter().collect::<Vec<_>>();
sorted.sort_by_key(|name| order_pos.get(name).copied().unwrap_or(usize::MAX));
ready = sorted.into();
}
}
if !progressed {
std::thread::sleep(Duration::from_millis(10));
}
}
let mut skipped = ordered
.iter()
.filter(|name| !finished.contains(*name))
.cloned()
.collect::<Vec<_>>();
skipped.sort_by_key(|name| order_pos.get(name).copied().unwrap_or(usize::MAX));
for name in skipped {
results.push(TaskRunResult {
name,
status: String::from("skipped"),
exit_code: None,
elapsed_ms: 0,
log_file: None,
});
}
let status = if failure_message.is_some() {
String::from("fail")
} else {
String::from("pass")
};
Ok((status, results, failure_message))
}
/// Launches `task` as a child process with stdout and stderr piped.
///
/// The first element of `task.command` is the program and the remaining
/// elements are its arguments. `task.cwd` and `task.env`, when set, are
/// applied to the child. The returned [`RunningTask`] carries the child
/// handle, the instant taken right after spawning, and the log path
/// `<logs_dir>/<sanitized task name>.log`.
///
/// # Errors
///
/// Returns an error when the command is empty or the process cannot be
/// spawned.
fn spawn_oneshot_task(task: &OrchestrationTask, logs_dir: &Path) -> Result<RunningTask, String> {
    let (program, arguments) = match task.command.split_first() {
        Some(parts) => parts,
        None => return Err(format!("task {} has empty command", task.name)),
    };

    let mut command = Command::new(program);
    command
        .args(arguments)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    if let Some(dir) = task.cwd.as_ref() {
        command.current_dir(dir);
    }
    for (var, val) in &task.env {
        command.env(var, val);
    }

    match command.spawn() {
        Ok(child) => {
            let log_file = logs_dir.join(format!("{}.log", sanitize_name(&task.name)));
            Ok(RunningTask {
                child,
                started_at: Instant::now(),
                log_file,
            })
        }
        Err(err) => Err(format!("spawn task {} failed: {err}", task.name)),
    }
}
/// Writes a task's captured output to `log_file`.
///
/// The parent directory is created if needed. Despite the name, the file is
/// truncated on open, so it ends up holding exactly
/// `# STDOUT\n<stdout>\n# STDERR\n<stderr>`.
///
/// # Errors
///
/// Returns an error when the directory cannot be created, the file cannot be
/// opened, or a write fails.
fn append_task_log(log_file: &Path, stdout: &[u8], stderr: &[u8]) -> Result<(), String> {
    if let Some(dir) = log_file.parent() {
        std::fs::create_dir_all(dir)
            .map_err(|err| format!("create log directory {} failed: {err}", dir.display()))?;
    }

    let mut sink = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(log_file)
        .map_err(|err| format!("open log file {} failed: {err}", log_file.display()))?;

    // Section headers interleaved with the captured streams, in write order.
    let sections: [&[u8]; 4] = [b"# STDOUT\n", stdout, b"\n# STDERR\n", stderr];
    for section in sections.iter() {
        sink.write_all(section)
            .map_err(|err| format!("write log failed: {err}"))?;
    }
    Ok(())
}
/// Runs `task` to completion in the foreground and checks its exit status.
///
/// The first element of `task.command` is the program and the rest are its
/// arguments. Variables from `inherited_env` are applied first and then those
/// from `task.env`, so a task-level value wins when both define the same key.
/// No stdio redirection is configured here.
///
/// # Errors
///
/// Returns an error when the command is empty, the process cannot be run, or
/// it exits unsuccessfully.
pub(super) fn run_oneshot_task(
    task: &OrchestrationTask,
    inherited_env: &HashMap<String, String>,
) -> Result<(), String> {
    let (program, arguments) = match task.command.split_first() {
        Some(parts) => parts,
        None => return Err(format!("task {} has empty command", task.name)),
    };

    let mut command = Command::new(program);
    command.args(arguments);
    if let Some(dir) = task.cwd.as_ref() {
        command.current_dir(dir);
    }
    command.envs(inherited_env);
    for (var, val) in &task.env {
        command.env(var, val);
    }

    let exit = command
        .status()
        .map_err(|err| format!("run task {} failed: {err}", task.name))?;
    if exit.success() {
        Ok(())
    } else {
        Err(format!(
            "task {} failed with status {:?}",
            task.name, exit
        ))
    }
}
/// Starts `task` as a long-running child whose stdout and stderr are both
/// appended to `log_file`.
///
/// The log's parent directory is created if needed. After spawning, the
/// function sleeps for 60 ms and polls the child once; a child that has
/// already exited by then is reported as an error. Otherwise the running
/// [`Child`] is returned to the caller.
///
/// # Errors
///
/// Returns an error when the command is empty, the log file cannot be
/// prepared, the process cannot be spawned or polled, or it exits during the
/// start-up grace period.
pub(super) fn spawn_service_task(
    task: &OrchestrationTask,
    log_file: &Path,
) -> Result<Child, String> {
    let (program, arguments) = match task.command.split_first() {
        Some(parts) => parts,
        None => return Err(format!("task {} has empty command", task.name)),
    };

    if let Some(dir) = log_file.parent() {
        std::fs::create_dir_all(dir)
            .map_err(|err| format!("create log directory {} failed: {err}", dir.display()))?;
    }
    let out_sink = OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_file)
        .map_err(|err| format!("open service log {} failed: {err}", log_file.display()))?;
    // A second handle to the same file so both streams land in one log.
    let err_sink = out_sink
        .try_clone()
        .map_err(|err| format!("clone service log {} failed: {err}", log_file.display()))?;

    let mut command = Command::new(program);
    command
        .args(arguments)
        .stdout(Stdio::from(out_sink))
        .stderr(Stdio::from(err_sink));
    if let Some(dir) = task.cwd.as_ref() {
        command.current_dir(dir);
    }
    for (var, val) in &task.env {
        command.env(var, val);
    }

    let mut service = command
        .spawn()
        .map_err(|err| format!("spawn service task {} failed: {err}", task.name))?;

    // Give the process a moment so an immediate crash is caught here.
    std::thread::sleep(Duration::from_millis(60));
    let early_exit = service
        .try_wait()
        .map_err(|err| format!("poll service task {} failed: {err}", task.name))?;
    match early_exit {
        Some(status) => Err(format!(
            "service task {} exited too early with status {:?}",
            task.name, status
        )),
        None => Ok(service),
    }
}