1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use {
crate::{
error::Error,
plan,
},
anyhow::Result,
std::{
collections::HashMap,
process::Stdio,
},
threadpool::ThreadPool,
};
#[derive(Debug, Clone)]
pub(crate) struct OutputMode {
pub stderr: bool,
pub stdout: bool,
}
pub(crate) struct ExecutionEngine {
pub output: OutputMode,
}
impl ExecutionEngine {
pub fn new(output: OutputMode) -> Self {
Self { output }
}
pub fn execute(&self, plan: &plan::ExecutionPlan, workers: usize) -> Result<()> {
struct Work {
workdir: Option<String>,
env: HashMap<String, String>,
shell: plan::Shell,
command: String,
}
for stage in &plan.stages {
let pool = ThreadPool::new(workers);
let (signal_tx, signal_rx) = std::sync::mpsc::channel::<Result<()>>();
let mut signal_cnt = 0;
let nodes = stage.nodes.iter().map(|v| plan.nodes.get(v).unwrap());
for node in nodes {
for matrix in &node.invocations {
let mut work = Vec::<Work>::new();
for task in &node.tasks {
let workdir = if let Some(workdir) = &task.workdir {
Some(workdir.to_owned())
} else if let Some(workdir) = &node.workdir {
Some(workdir.to_owned())
} else {
None
};
let shell = if let Some(shell) = &task.shell {
shell.to_owned()
} else if let Some(shell) = &node.shell {
shell.to_owned()
} else {
crate::plan::Shell {
program: "sh".to_owned(),
args: vec!["-c".to_owned()],
}
};
let mut env = plan.env.clone();
env.extend(node.env.clone());
env.extend(matrix.env.clone());
env.extend(task.env.clone());
signal_cnt += 1;
work.push(Work {
command: task.cmd.clone(),
env,
shell,
workdir,
})
}
let output = self.output.clone();
// executes matrix entry
for w in work {
let t_tx = signal_tx.clone();
pool.execute(move || {
let res = move || -> Result<()> {
let mut cmd_proc = std::process::Command::new(w.shell.program);
cmd_proc.args(w.shell.args);
cmd_proc.envs(w.env);
if let Some(w) = w.workdir {
cmd_proc.current_dir(w);
}
cmd_proc.arg(&w.command);
cmd_proc.stdin(Stdio::null());
if !output.stdout {
cmd_proc.stdout(Stdio::null());
}
if !output.stderr {
cmd_proc.stderr(Stdio::null());
}
let output = cmd_proc.spawn()?.wait_with_output()?;
match output.status.code().unwrap() {
| 0 => Ok(()),
| v => {
Err(Error::ChildProcess(format!(
"command: {} failed to execute with code {}",
w.command, v
)))
},
}?;
Ok(())
}();
t_tx.send(res).expect("send failed");
});
}
}
}
let errs = signal_rx
.iter()
.take(signal_cnt)
.filter(|x| x.is_err())
.map(|x| x.expect_err("expecting an err"))
.collect::<Vec<_>>();
if errs.len() > 0 {
return Err(Error::Many(errs).into());
// abort at this stage
}
}
Ok(())
}
}