1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
use {
crate::*,
anyhow::Result,
crossbeam::channel::{bounded, select, unbounded, Receiver, Sender},
std::{
io::{BufRead, BufReader},
process::Stdio,
thread,
},
};
/// An executor running a cargo (or similar) command in a separate
/// thread when asked to, and sending the lines of output in a channel.
/// The end of each run is signalled by a `CommandExecInfo::End` message
/// (failures are reported as `CommandExecInfo::Error` messages).
/// Channel sizes are designed to avoid useless computations.
pub struct Executor {
/// Receiving side of the channel on which the executor thread sends
/// the output lines, the errors and the end-of-run notification
pub line_receiver: Receiver<CommandExecInfo>,
/// Channel used by `start` to ask for a new run (created with a capacity of 1)
task_sender: Sender<Task>,
/// Channel used by `die` to stop the thread (created with a capacity of 0)
stop_sender: Sender<()>, // signal for stopping the thread
/// Handle of the executor thread, joined by `die`
thread: thread::JoinHandle<()>,
}
/// The settings of one execution of the command, as requested
/// through `Executor::start`.
///
/// Equality is total (the only field is a `bool`), so `Eq` is derived
/// along with `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Task {
    /// Whether the command must be run with `RUST_BACKTRACE` set to `1`
    /// (it's set to `0` otherwise)
    pub backtrace: bool,
}
impl Executor {
/// launch the commands, send the lines of its stderr on the
/// line channel.
/// If `with_stdout` capture and send also its stdout.
pub fn new(mission: &Mission) -> Result<Self> {
let mut command = mission.get_command();
let with_stdout = mission.need_stdout();
let (task_sender, task_receiver) = bounded::<Task>(1);
let (stop_sender, stop_receiver) = bounded(0);
let (line_sender, line_receiver) = unbounded();
command
.stderr(Stdio::piped())
.stdout(if with_stdout { Stdio::piped() } else { Stdio::null() });
let thread = thread::spawn(move || {
loop {
select! {
recv(task_receiver) -> task => {
let task = match task {
Ok(task) => task,
_ => { break; }
};
debug!("starting task {:?}", task);
command.env(
"RUST_BACKTRACE",
if task.backtrace { "1" } else { "0" },
);
let child = command.spawn();
let mut child = match child {
Ok(child) => child,
Err(e) => {
if let Err(e) = line_sender.send(CommandExecInfo::Error(
format!("command launch failed: {}", e)
)) {
debug!("error when sending launch error: {}", e);
break;
}
continue;
}
};
let stderr = match child.stderr.take() {
Some(stderr) => stderr,
None => {
if let Err(e) = line_sender.send(CommandExecInfo::Error(
"taking stderr failed".to_string()
)) {
debug!("error when sending stderr error: {}", e);
break;
}
continue;
}
};
// if we need stdout, we listen for in on a separate thread
// (if somebody knows of an efficient and clean cross-platform way
// to listen for both stderr and stdout on the same thread, please tell me)
let out_thread = if with_stdout {
let stdout = match child.stdout.take() {
Some(stdout) => stdout,
None => {
if let Err(e) = line_sender.send(CommandExecInfo::Error(
"taking stdout failed".to_string()
)) {
debug!("error when sending stdout error: {}", e);
break;
}
continue;
}
};
let line_sender = line_sender.clone();
Some(thread::spawn(move|| {
let mut buf = String::new();
let mut buf_reader = BufReader::new(stdout);
loop {
let r = match buf_reader.read_line(&mut buf) {
Err(e) => CommandExecInfo::Error(e.to_string()),
Ok(0) => {
// finished
break;
}
Ok(_) => {
// debug!("STDOUT : {:?}", &buf);
CommandExecInfo::Line(CommandOutputLine {
origin: CommandStream::StdOut,
content: TLine::from_tty(buf.trim_end()),
})
}
};
if let Err(e) = line_sender.send(r) {
debug!("error when sending stdout line: {}", e);
break;
}
buf.clear();
}
}))
} else {
None
};
let mut buf = String::new();
let mut buf_reader = BufReader::new(stderr);
loop {
if let Ok(()) = stop_receiver.try_recv() {
debug!("stopping during execution");
match child.kill() {
Ok(()) => debug!("command stopped"),
_ => debug!("command already stopped"),
}
return;
}
let r = match buf_reader.read_line(&mut buf) {
Err(e) => CommandExecInfo::Error(e.to_string()),
Ok(0) => {
// finished
break;
}
Ok(_) => {
// debug!("STDERR : {:?}", &buf);
CommandExecInfo::Line(CommandOutputLine {
origin: CommandStream::StdErr,
content: TLine::from_tty(buf.trim_end()),
})
}
};
if let Err(e) = line_sender.send(r) {
debug!("error when sending stderr line: {}", e);
break;
}
buf.clear();
}
let status = match child.wait() {
Ok(exit_status) => {
debug!("exit_status: {:?}", &exit_status);
Some(exit_status)
}
Err(e) => {
warn!("error in child: {:?}", e);
None
}
};
if let Err(e) = line_sender.send(CommandExecInfo::End { status }) {
debug!("error when sending line: {}", e);
break;
}
debug!("finished command execution");
if let Some(thread) = out_thread {
debug!("waiting for out listening thread to join");
thread.join().unwrap();
debug!("out listening thread joined");
}
}
recv(stop_receiver) -> _ => {
debug!("leaving thread");
return;
}
}
}
});
Ok(Self {
line_receiver,
task_sender,
stop_sender,
thread,
})
}
/// notify the executor a computation is necessary
pub fn start(&self, task: Task) -> Result<()> {
self.task_sender.try_send(task)?;
Ok(())
}
pub fn die(self) -> Result<()> {
debug!("received kill order");
self.stop_sender.send(()).unwrap();
self.thread.join().unwrap();
Ok(())
}
}