use crate::find_and_parse_arun_toml;
use crate::{Error, Result};
use crate::{Runner, ShouldRun};
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
use sysinfo::{Pid, Process, ProcessRefreshKind, System};
use tokio::process::Child;
use tokio::time::sleep;
/// Delay, in milliseconds, between two consecutive polls of the spawned child
/// processes in the `run_runners` watch loop.
const WATCH_CHILD_DELAY: u64 = 3000;
#[tokio::main]
pub async fn run(run_ref: &str) -> Result<()> {
let mut parts = run_ref.splitn(2, '.');
let part1 = parts.next().ok_or_else(|| Error::RunRefNoParts(run_ref.to_string()))?;
let part2 = parts.next();
let config = find_and_parse_arun_toml(Path::new("./"))?;
let runners = if let Some(part2) = part2 {
config.get_grouped_runner(part1, part2).map(|r| vec![r])
}
else {
config
.get_runners(part1)
.or_else(|| config.get_solo_runner(part1).map(|r| vec![r]))
};
if let Some(runners) = runners {
run_runners(runners).await?;
} else {
println!("No runners found for '{run_ref}'");
}
Ok(())
}
/// Executes each runner in order, then watches the child processes that were
/// left running until there is nothing left to watch.
///
/// For every runner, `should_run` is evaluated against `"."`; a skipped runner
/// is reported on stdout together with the reason. When `exec` hands back a
/// `Child` (presumably a runner meant to keep running alongside the others -
/// confirm against `Runner::exec`), it is kept and polled every
/// `WATCH_CHILD_DELAY` milliseconds.
///
/// The watch loop ends when either:
/// - a watched child flagged `end_all_on_exit` has exited, in which case every
///   child still running is terminated together with its process tree; or
/// - every watched child has exited on its own.
///
/// # Errors
///
/// Propagates errors from `should_run`, `exec`, `Child::try_wait` and
/// `terminate_process_tree`.
async fn run_runners(runners: Vec<&Runner>) -> Result<()> {
    // A spawned child that has to be watched once all runners were started.
    struct RunnerConcurrentSpawn {
        name: String,
        child: Child,
        end_all_on_exit: bool,
    }

    let root_dir = Path::new(".");
    let mut children_to_watch: Vec<RunnerConcurrentSpawn> = Vec::new();

    for runner in runners.iter() {
        println!("==== Running runner: {}", runner.name);
        match runner.should_run(root_dir)? {
            ShouldRun::No(reason) => println!("Skip running runner '{}' because {reason}", runner.name),
            ShouldRun::Yes => {
                if let Some(child) = runner.exec().await? {
                    children_to_watch.push(RunnerConcurrentSpawn {
                        name: runner.name.to_string(),
                        child,
                        end_all_on_exit: runner.end_all_on_exit,
                    });
                }
            }
        }
    }

    if children_to_watch.is_empty() {
        return Ok(());
    }

    let mut sys = System::new();
    loop {
        let mut end_all = false;
        let mut all_exited = true;

        for RunnerConcurrentSpawn {
            child, end_all_on_exit, ..
        } in children_to_watch.iter_mut()
        {
            if child.try_wait()?.is_some() {
                if *end_all_on_exit {
                    end_all = true;
                }
            } else {
                all_exited = false;
            }
        }

        if end_all {
            // Only children that are still alive need to be torn down.
            for RunnerConcurrentSpawn { name, child, .. } in children_to_watch.iter_mut() {
                if child.try_wait()?.is_none() {
                    terminate_process_tree(&mut sys, name, child).await?;
                }
            }
            break;
        }

        // Without this exit the loop would poll forever once every child has
        // finished and none of them carried `end_all_on_exit`.
        if all_exited {
            break;
        }

        sleep(Duration::from_millis(WATCH_CHILD_DELAY)).await;
    }

    Ok(())
}
/// Kills the child process of runner `name` along with every descendant
/// process found for it.
///
/// The process table in `sys` is refreshed and the descendants are collected
/// *before* the child itself is killed; the descendants are killed afterwards,
/// looked up in that same snapshot.
///
/// This is best effort: a failure to kill the child is only reported as a
/// warning on stdout, and the outcome of each descendant kill is ignored.
/// When the child has no process id available, nothing is done.
///
/// Always returns `Ok(())`.
async fn terminate_process_tree(sys: &mut System, name: &str, proc: &mut Child) -> Result<()> {
    // No id means there is no process to look up, so there is nothing to kill.
    let root_pid = match proc.id() {
        Some(raw_id) => Pid::from_u32(raw_id),
        None => return Ok(()),
    };

    sys.refresh_processes_specifics(ProcessRefreshKind::everything().without_cpu());
    let descendants = find_descendant(sys.processes(), &root_pid);

    if let Err(ex) = proc.kill().await {
        println!("Warning - error while stopping runner {name}. Cause: {ex}");
    }

    let snapshot: &System = sys;
    descendants
        .into_iter()
        .filter_map(|(pid, _)| snapshot.process(pid))
        .for_each(|process| {
            let _ = process.kill();
        });

    Ok(())
}
/// Collects every process in `sys_processes` that descends, directly or
/// transitively, from `root_pid`.
///
/// Returns `(pid, process name)` pairs; `root_pid` itself is not included
/// unless the table lists it as a child of one of its own descendants. The
/// order of the result is unspecified (it comes out of a `HashMap`).
fn find_descendant(sys_processes: &HashMap<Pid, Process>, root_pid: &Pid) -> Vec<(Pid, String)> {
    let mut found: HashMap<Pid, String> = HashMap::new();

    // Sweep the whole table repeatedly until a full pass adds nothing new:
    // a process may be visited before its parent has been recorded.
    let mut grew = true;
    while grew {
        grew = false;
        for (pid, process) in sys_processes {
            if found.contains_key(pid) {
                continue;
            }
            let is_descendant = match process.parent() {
                Some(parent_pid) => parent_pid == *root_pid || found.contains_key(&parent_pid),
                None => false,
            };
            if is_descendant {
                found.insert(*pid, process.name().to_string());
                grew = true;
            }
        }
    }

    found.into_iter().collect()
}