1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use crate::{error::Error, plan};
use anyhow::Result;
use std::{collections::HashMap, process::Stdio};
use threadpool::ThreadPool;
/// Per-stream switches controlling what happens to a child process's output.
///
/// Each flag covers one stream; when a flag is `false` the engine redirects
/// the corresponding stream of every command it starts to a null sink.
#[derive(Debug, Clone)]
pub(crate) struct OutputMode {
    /// Keep the child's standard error (`false` discards it).
    pub stderr: bool,
    /// Keep the child's standard output (`false` discards it).
    pub stdout: bool,
}
/// Executes an execution plan by running its commands as child processes.
pub(crate) struct ExecutionEngine {
    /// Output handling applied to every child process this engine starts.
    pub output: OutputMode,
}
impl ExecutionEngine {
pub fn new(output: OutputMode) -> Self {
Self { output }
}
pub async fn execute(&self, plan: plan::ExecutionPlan, workers: usize) -> Result<()> {
struct Work {
workdir: Option<String>,
env: HashMap<String, String>,
shell: plan::Shell,
command: String,
}
for stage in &plan.stages {
let pool = ThreadPool::new(workers);
let (signal_tx, signal_rx) = std::sync::mpsc::channel::<Result<()>>();
let mut signal_cnt = 0;
let nodes = stage.nodes.iter().map(|v| plan.nodes.get(v).unwrap());
for node in nodes {
for matrix in &node.invocations {
let mut work = Vec::<Work>::new();
for task in &node.tasks {
let workdir = if let Some(workdir) = &task.workdir {
Some(workdir.to_owned())
} else if let Some(workdir) = &node.workdir {
Some(workdir.to_owned())
} else {
None
};
let shell = if let Some(shell) = &task.shell {
shell.to_owned()
} else if let Some(shell) = &node.shell {
shell.to_owned()
} else {
crate::plan::Shell {
program: "sh".to_owned(),
args: vec!["-c".to_owned()],
}
};
let mut env = plan.env.clone();
env.extend(node.env.clone());
env.extend(matrix.env.clone());
env.extend(task.env.clone());
signal_cnt += 1;
work.push(Work {
command: task.cmd.clone(),
env,
shell,
workdir,
})
}
let t_tx = signal_tx.clone();
let output = self.output.clone();
// executes matrix entry
pool.execute(move || {
let res = move || -> Result<()> {
for w in work {
let mut cmd_proc = std::process::Command::new(w.shell.program);
cmd_proc.args(w.shell.args);
cmd_proc.envs(w.env);
if let Some(w) = w.workdir {
cmd_proc.current_dir(w);
}
cmd_proc.arg(&w.command);
cmd_proc.stdin(Stdio::null());
if !output.stdout {
cmd_proc.stdout(Stdio::null());
}
if !output.stderr {
cmd_proc.stderr(Stdio::null());
}
let _ = cmd_proc.spawn()?;
let output = cmd_proc.output()?;
match output.status.code().unwrap() {
| 0 => Ok(()),
| v => Err(Error::ChildProcess(format!(
"command: {} failed to execute with code {}",
w.command, v
))),
}?
}
Ok(())
}();
t_tx.send(res).expect("send failed");
});
}
}
let errs = signal_rx
.iter()
.take(signal_cnt)
.filter(|x| x.is_err())
.map(|x| x.expect_err("expecting an err"))
.collect::<Vec<_>>();
if errs.len() > 0 {
return Err(Error::Many(errs).into());
// abort at this stage
}
}
Ok(())
}
}