1use std::{
4 io::{self, BufRead, Read},
5 process::ExitStatus,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use tokio::{
14 io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader},
15 sync::mpsc::Sender,
16 task::{self, JoinHandle},
17 time::sleep,
18};
19use tracing::{error, info};
20
21use crate::process::{
22 ProcessStatus, capture_exit_status, get_process_status, spawn_process, stop_child,
23};
24
/// Events emitted on the channel handed to [`Runner::run`] while a child
/// process is being supervised.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunEvent {
    /// The child process could not be spawned; carries the `Debug` rendering
    /// of the spawn error.
    ProcessCreationFailed(String),
    /// The child process was spawned successfully.
    ProcessCreated,
    /// The run is over. The flag is `true` only when an exit status was
    /// obtained and it reports success.
    ProcessEnd(bool),
    /// Text read from the child's stdout or stderr, decoded lossily as UTF-8.
    ///
    /// The payload is whatever a read of the pipe produced, so it is not
    /// guaranteed to be exactly one line.
    ProcessNewOutputLine(String),
}
37
/// Describes an external command (program plus arguments) that can be
/// spawned and supervised.
#[derive(Debug, Clone)]
pub struct Runner {
    /// Program to execute.
    command: String,
    /// Arguments passed to the program, in order.
    args: Vec<String>,
}
45
46impl Runner {
47 pub fn new(command: impl Into<String>, args: Vec<impl Into<String>>) -> Self {
57 Self {
58 command: command.into(),
59 args: args.into_iter().map(|a| a.into()).collect(),
60 }
61 }
62
63 pub fn new_without_args(command: impl Into<String>) -> Self {
73 Self {
74 command: command.into(),
75 args: Vec::new(),
76 }
77 }
78 pub fn get_full_command(&self) -> String {
89 format!("{} {}", &self.command, &self.args.join(" "))
90 }
91 async fn read_stream<T: AsyncRead + Unpin>(tx: Sender<RunEvent>, mut stream: T) {
92 let mut buffer = [0; 1024];
93 loop {
94 let read_result = stream.read(&mut buffer).await;
95 match read_result {
96 Ok(0) => break,
97 Ok(n) => {
98 let data = String::from_utf8_lossy(&buffer[..n]);
99 let _ = tx
100 .send(RunEvent::ProcessNewOutputLine(data.to_string()))
101 .await;
102 }
103 Err(_) => break,
104 }
105 }
106 }
107 async fn launch_stream_reader<T>(tx: Sender<RunEvent>, stream: T) -> JoinHandle<()>
108 where
109 T: AsyncRead + Unpin + Send + 'static,
110 {
111 task::spawn(async move { Runner::read_stream(tx, stream).await })
112 }
113
114 pub async fn run(
146 &self,
147 tx: Sender<RunEvent>,
148 should_stop: Arc<AtomicBool>,
149 ) -> Option<ExitStatus> {
150 let mut process = spawn_process(&self.command, self.args.clone())
151 .map_err(async |err| {
152 let _ = tx
153 .send(RunEvent::ProcessCreationFailed(format!("{:?}", err)))
154 .await;
155 })
156 .ok()?;
157
158 let _ = tx.send(RunEvent::ProcessCreated).await;
159
160 let stdout_task = if let Some(stdout) = process.stdout.take() {
161 info!("Launching stdout reader function");
162 Some(Runner::launch_stream_reader(tx.clone(), stdout))
163 } else {
164 error!("Failed to launch stdout reader function");
165 None
166 };
167
168 let stderr_task = if let Some(stderr) = process.stderr.take() {
169 info!("Launching stderr reader function");
170 Some(Runner::launch_stream_reader(tx.clone(), stderr))
171 } else {
172 error!("Failed to launch stderr reader function");
173 None
174 };
175
176 let process_task = task::spawn(async move {
177 loop {
178 if should_stop.load(Ordering::Relaxed) {
179 if stop_child(&mut process).await.is_ok() {
180 return capture_exit_status(&mut process).await.ok();
181 }
182 return None;
183 }
184
185 match get_process_status(&mut process).await {
187 Err(_) => return None,
188 Ok(ProcessStatus::Done(status)) => return Some(status),
189 Ok(ProcessStatus::Running) => {
190 sleep(Duration::from_millis(100)).await;
191 }
192 }
193 }
194 });
195
196 let (process_result, stdout_result, stderr_result) = tokio::join!(
197 process_task,
198 async {
199 if let Some(task) = stdout_task {
200 task.await;
201 }
202 },
203 async {
204 if let Some(task) = stderr_task {
205 task.await;
206 }
207 }
208 );
209 let exit_status = process_result.ok().flatten();
210 let success = exit_status.map_or(false, |status| status.success());
211 let _ = tx.send(RunEvent::ProcessEnd(success)).await;
212 exit_status
213 }
214}
215
#[cfg(test)]
mod test {
    use tokio::{
        process::Command,
        sync::mpsc::{Receiver, channel},
    };

    use super::*;

    /// Removes the compiled test binary when dropped, so a failing assertion
    /// does not leave it behind in the working directory.
    struct BinaryCleanup<'a>(&'a str);

    impl Drop for BinaryCleanup<'_> {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(self.0);
        }
    }

    /// Compiles `c_file` into the executable `target` with `gcc`.
    ///
    /// Panics if `gcc` cannot be started or reports a failure, so a broken
    /// fixture is reported as a compile problem rather than a runner problem.
    async fn compile_program(c_file: &str, target: &str) {
        let output = Command::new("gcc")
            .arg(c_file)
            .arg("-o")
            .arg(target)
            .output()
            .await
            .expect("Couldn't compile program");

        assert!(
            output.status.success(),
            "gcc failed to compile {}: {}",
            c_file,
            String::from_utf8_lossy(&output.stderr)
        );
    }

    /// Runs `target` through a [`Runner`] on a spawned task.
    ///
    /// Returns the task handle (which asserts that an exit status was
    /// obtained) and the receiving end of the event channel.
    async fn launch_program(
        target: &str,
        stop: Arc<AtomicBool>,
    ) -> (JoinHandle<Option<ExitStatus>>, Receiver<RunEvent>) {
        let runner = Runner::new_without_args(target.to_string());

        let (tx, rx) = channel(10);
        let thread_stop = stop.clone();

        (
            task::spawn(async move {
                let exit = runner.run(tx, thread_stop).await;
                assert!(exit.is_some());
                exit
            }),
            rx,
        )
    }

    /// Launches a program that never exits on its own, requests a stop and
    /// checks that an exit status is still obtained.
    async fn run_blocking_program(target: &str) {
        // NOTE(review): this pause happens before the program is launched and
        // the stop flag is raised right after launching — confirm whether the
        // pause was meant to sit between launch and stop.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let stop = Arc::new(AtomicBool::new(false));
        // The receiver is dropped right away; the runner ignores send errors.
        let (handler, _) = launch_program(target, stop.clone()).await;
        stop.store(true, Ordering::Relaxed);
        handler
            .await
            .expect("Couldn't join thread")
            .expect("Couldn't get child exit status");
    }

    /// Compiles `c_file`, runs the stop scenario on it and removes the binary
    /// afterwards (also when the scenario panics).
    async fn compile_and_run_blocking_program(c_file: &str, target: &str) {
        compile_program(c_file, target).await;
        let _cleanup = BinaryCleanup(target);
        run_blocking_program(target).await;
    }

    #[tokio::test]
    async fn test_stuck_stdin() {
        let c_file = "./tests/assets/wait_stdin.c";
        let target = "./wait_stdin";
        compile_and_run_blocking_program(c_file, target).await;
    }

    #[tokio::test]
    async fn test_infinite_loop() {
        let c_file = "./tests/assets/infinite_loop.c";
        let target = "./infinite_loop";
        compile_and_run_blocking_program(c_file, target).await;
    }

    #[tokio::test]
    async fn test_infinite_loop_with_sig_mapped() {
        let c_file = "./tests/assets/infinite_loop_map_signals.c";
        let target = "./infinit_loop_map_signals";
        compile_and_run_blocking_program(c_file, target).await;
    }

    #[tokio::test]
    async fn test_infinite_loop_with_timeouts() {
        let c_file = "./tests/assets/infinite_loop.c";
        let target = "./infinite_loop_stdout";

        compile_program(c_file, target).await;
        let _cleanup = BinaryCleanup(target);

        let stop = Arc::new(AtomicBool::new(false));
        let (handler, mut rx) = launch_program(target, stop.clone()).await;

        tokio::time::sleep(Duration::from_millis(1000)).await;

        let event = tokio::time::timeout(Duration::from_secs(5), rx.recv())
            .await
            .expect("To receive message before timeout")
            .expect("To have a message");

        assert_eq!(event, RunEvent::ProcessCreated);

        // Presumably the fixture prints "Hello <n>" lines one at a time, slowly
        // enough for each to arrive as its own event — verify against the C
        // source.
        for i in 1..=4 {
            let event = tokio::time::timeout(Duration::from_secs(2), rx.recv())
                .await
                .expect("To receive message before timeout")
                .expect("To have a message");

            assert_eq!(
                event,
                RunEvent::ProcessNewOutputLine(format!("Hello {}\n", i))
            );
        }

        stop.store(true, Ordering::Relaxed);

        let join_result = tokio::time::timeout(Duration::from_secs(5), handler).await;

        join_result
            .expect("Timeout waiting for handler to complete")
            .expect("Couldn't join thread")
            .expect("Couldn't get child exit status");
    }
}