1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use anyhow::Result;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use interactive_process::InteractiveProcess;
use threadpool::ThreadPool;
use crate::{
error::Error,
output::{self, Controller},
plan,
};
pub(crate) struct ExecutionEngine {
pub output: Arc<Mutex<output::Controller>>,
}
impl ExecutionEngine {
pub fn new(prefix: String, silent: bool) -> Self {
Self {
output: Arc::new(Mutex::new(Controller::new(
!silent,
prefix,
Box::new(std::io::stdout()),
))),
}
}
pub async fn execute(&self, plan: plan::ExecutionPlan, workers: usize) -> Result<()> {
struct Work {
workdir: Option<String>,
env: HashMap<String, String>,
shell: plan::Shell,
command: String,
}
for stage in &plan.stages {
let pool = ThreadPool::new(workers);
let (signal_tx, signal_rx) = std::sync::mpsc::channel::<Result<()>>();
let mut signal_cnt = 0;
let nodes = stage.nodes.iter().map(|v| plan.nodes.get(v).unwrap());
for node in nodes {
for matrix in &node.invocations {
let t_tx = signal_tx.clone();
let t_output = self.output.clone();
let mut work = Vec::<Work>::new();
for task in &node.tasks {
let workdir = if let Some(workdir) = &task.workdir {
Some(workdir.to_owned())
} else if let Some(workdir) = &node.workdir {
Some(workdir.to_owned())
} else {
None
};
let shell = if let Some(shell) = &task.shell {
shell.to_owned()
} else if let Some(shell) = &node.shell {
shell.to_owned()
} else {
crate::plan::Shell {
program: "sh".to_owned(),
args: vec!["-c".to_owned()],
}
};
let mut env = plan.env.clone();
env.extend(node.env.clone());
env.extend(matrix.env.clone());
env.extend(task.env.clone());
signal_cnt += 1;
work.push(Work {
command: task.cmd.clone(),
env,
shell,
workdir,
})
}
// executes matrix entry
pool.execute(move || {
let res = move || -> Result<()> {
for w in work {
let mut cmd_proc = std::process::Command::new(w.shell.program);
cmd_proc.args(w.shell.args);
cmd_proc.envs(w.env);
if let Some(w) = w.workdir {
cmd_proc.current_dir(w);
}
cmd_proc.arg(&w.command);
let loc_out = t_output.clone();
let exit_status = InteractiveProcess::new(&mut cmd_proc, move |l| match l {
| Ok(v) => {
let mut lock = loc_out.lock().unwrap();
lock.print(&v).expect("could not print");
},
| Err(..) => {},
})?
.wait()?;
if let Some(code) = exit_status.code() {
if code != 0 {
let err_msg = format!("command \"{}\" failed with code {}", &w.command, code);
return Err(Error::ChildProcess(err_msg).into());
}
}
}
Ok(())
}();
match res {
| Ok(..) => t_tx.send(Ok(())).expect("send failed"),
| Err(e) => t_tx
// error formatting should be improved
.send(Err(Error::Generic(format!("{:?}", e)).into()))
.expect("send failed"),
}
});
}
}
let errs = signal_rx
.iter()
.take(signal_cnt)
.filter(|x| x.is_err())
.map(|x| x.expect_err("expect"))
.collect::<Vec<_>>();
if errs.len() > 0 {
return Err(Error::Many(errs).into()); // abort at this stage
}
}
Ok(())
}
}