Skip to main content

ej_io/
runner.rs

1//! High-level async process runner with event handling.
2
3use std::{
4    io::{self, BufRead, Read},
5    process::ExitStatus,
6    sync::{
7        Arc,
8        atomic::{AtomicBool, Ordering},
9    },
10    time::Duration,
11};
12
13use tokio::{
14    io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader},
15    sync::mpsc::Sender,
16    task::{self, JoinHandle},
17    time::sleep,
18};
19use tracing::{error, info};
20
21use crate::process::{
22    ProcessStatus, capture_exit_status, get_process_status, spawn_process, stop_child,
23};
24
/// Events emitted over the channel while a process is being run.
///
/// The type is cheap to duplicate and compare, so it derives `Clone` and `Eq`
/// in addition to `Debug` and `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunEvent {
    /// The process could not be created; carries the debug-formatted error.
    ProcessCreationFailed(String),
    /// The process was created successfully.
    ProcessCreated,
    /// The process ended. `true` means an exit status was obtained and it
    /// reported success; `false` covers every other outcome.
    ProcessEnd(bool),
    /// Output text read from the process's stdout or stderr, lossily decoded
    /// as UTF-8.
    ProcessNewOutputLine(String),
}
37
/// High-level async process runner with event-driven output handling.
///
/// Holds only the command and its arguments; the process itself is created
/// each time the runner is executed.
#[derive(Debug, Clone)]
pub struct Runner {
    /// Command to execute.
    command: String,
    /// Command line arguments, passed to the command in order.
    args: Vec<String>,
}
45
46impl Runner {
47    /// Create a new runner with command and arguments.
48    ///
49    /// # Examples
50    ///
51    /// ```rust
52    /// use ej_io::runner::Runner;
53    ///
54    /// let runner = Runner::new("ls", vec!["-la", "/tmp"]);
55    /// ```
56    pub fn new(command: impl Into<String>, args: Vec<impl Into<String>>) -> Self {
57        Self {
58            command: command.into(),
59            args: args.into_iter().map(|a| a.into()).collect(),
60        }
61    }
62
63    /// Create a new runner with just a command (no arguments).
64    ///
65    /// # Examples
66    ///
67    /// ```rust
68    /// use ej_io::runner::Runner;
69    ///
70    /// let runner = Runner::new_without_args("pwd");
71    /// ```
72    pub fn new_without_args(command: impl Into<String>) -> Self {
73        Self {
74            command: command.into(),
75            args: Vec::new(),
76        }
77    }
78    /// Get the full command string with arguments.
79    ///
80    /// # Examples
81    ///
82    /// ```rust
83    /// use ej_io::runner::Runner;
84    ///
85    /// let runner = Runner::new("ls", vec!["-la"]);
86    /// assert_eq!(runner.get_full_command(), "ls -la");
87    /// ```
88    pub fn get_full_command(&self) -> String {
89        format!("{} {}", &self.command, &self.args.join(" "))
90    }
91    async fn read_stream<T: AsyncRead + Unpin>(tx: Sender<RunEvent>, mut stream: T) {
92        let mut buffer = [0; 1024];
93        loop {
94            let read_result = stream.read(&mut buffer).await;
95            match read_result {
96                Ok(0) => break,
97                Ok(n) => {
98                    let data = String::from_utf8_lossy(&buffer[..n]);
99                    let _ = tx
100                        .send(RunEvent::ProcessNewOutputLine(data.to_string()))
101                        .await;
102                }
103                Err(_) => break,
104            }
105        }
106    }
107    async fn launch_stream_reader<T>(tx: Sender<RunEvent>, stream: T) -> JoinHandle<()>
108    where
109        T: AsyncRead + Unpin + Send + 'static,
110    {
111        task::spawn(async move { Runner::read_stream(tx, stream).await })
112    }
113
114    /// Asynchronously run the process with event monitoring.
115    ///
116    /// Starts the process and monitors its execution asynchronously, sending events via the provided tokio channel.
117    /// Reads stdout and stderr concurrently until the process finishes or is stopped.
118    ///
119    /// # Arguments
120    ///
121    /// * `tx` - Tokio async channel sender for RunEvent notifications
122    /// * `should_stop` - Atomic flag to signal process termination
123    ///
124    /// # Returns
125    ///
126    /// Returns an `Option<ExitStatus>` - `None` if the process failed to start or was terminated,
127    /// `Some(ExitStatus)` if the process completed normally.
128    ///
129    /// # Examples
130    ///
131    /// ```rust
132    /// use ej_io::runner::{Runner, RunEvent};
133    /// use std::sync::{Arc, atomic::AtomicBool};
134    /// use tokio::sync::mpsc;
135    ///
136    /// #[tokio::main]
137    /// async fn main() {
138    ///     let runner = Runner::new("echo", vec!["Hello"]);
139    ///     let (tx, mut rx) = mpsc::channel(100);
140    ///     let should_stop = Arc::new(AtomicBool::new(false));
141    ///
142    ///     let exit_status = runner.run(tx, should_stop).await;
143    /// }
144    /// ```
145    pub async fn run(
146        &self,
147        tx: Sender<RunEvent>,
148        should_stop: Arc<AtomicBool>,
149    ) -> Option<ExitStatus> {
150        let mut process = spawn_process(&self.command, self.args.clone())
151            .map_err(async |err| {
152                let _ = tx
153                    .send(RunEvent::ProcessCreationFailed(format!("{:?}", err)))
154                    .await;
155            })
156            .ok()?;
157
158        let _ = tx.send(RunEvent::ProcessCreated).await;
159
160        let stdout_task = if let Some(stdout) = process.stdout.take() {
161            info!("Launching stdout reader function");
162            Some(Runner::launch_stream_reader(tx.clone(), stdout))
163        } else {
164            error!("Failed to launch stdout reader function");
165            None
166        };
167
168        let stderr_task = if let Some(stderr) = process.stderr.take() {
169            info!("Launching stderr reader function");
170            Some(Runner::launch_stream_reader(tx.clone(), stderr))
171        } else {
172            error!("Failed to launch stderr reader function");
173            None
174        };
175
176        let process_task = task::spawn(async move {
177            loop {
178                if should_stop.load(Ordering::Relaxed) {
179                    if stop_child(&mut process).await.is_ok() {
180                        return capture_exit_status(&mut process).await.ok();
181                    }
182                    return None;
183                }
184
185                // Check process status
186                match get_process_status(&mut process).await {
187                    Err(_) => return None,
188                    Ok(ProcessStatus::Done(status)) => return Some(status),
189                    Ok(ProcessStatus::Running) => {
190                        sleep(Duration::from_millis(100)).await;
191                    }
192                }
193            }
194        });
195
196        let (process_result, stdout_result, stderr_result) = tokio::join!(
197            process_task,
198            async {
199                if let Some(task) = stdout_task {
200                    task.await;
201                }
202            },
203            async {
204                if let Some(task) = stderr_task {
205                    task.await;
206                }
207            }
208        );
209        let exit_status = process_result.ok().flatten();
210        let success = exit_status.map_or(false, |status| status.success());
211        let _ = tx.send(RunEvent::ProcessEnd(success)).await;
212        exit_status
213    }
214}
215
216#[cfg(test)]
217mod test {
218    use std::{env, os};
219
220    use tokio::{
221        process::Command,
222        sync::mpsc::{Receiver, channel},
223    };
224
225    use super::*;
226
227    async fn compile_program(c_file: &str, target: &str) {
228        let output = Command::new("gcc")
229            .arg(c_file)
230            .arg("-o")
231            .arg(target)
232            .output()
233            .await
234            .expect("Couldn't compile program");
235    }
236    async fn launch_program(
237        target: &str,
238        stop: Arc<AtomicBool>,
239    ) -> (JoinHandle<Option<ExitStatus>>, Receiver<RunEvent>) {
240        let runner = Runner::new_without_args(target.to_string());
241
242        let (tx, rx) = channel(10);
243        let thread_stop = stop.clone();
244
245        (
246            task::spawn(async move {
247                let exit = runner.run(tx, thread_stop).await;
248                assert!(exit.is_some());
249                exit
250            }),
251            rx,
252        )
253    }
254    async fn run_blocking_program(target: &str) {
255        tokio::time::sleep(Duration::from_secs(1)).await;
256        let stop = Arc::new(AtomicBool::new(false));
257        let (handler, _) = launch_program(target, stop.clone()).await;
258        // Stop should kill the process no matter the condition it is in
259        stop.store(true, Ordering::Relaxed);
260        handler
261            .await
262            .expect("Couldn't join thread")
263            .expect("Couldn't get child exit status");
264    }
265    async fn compile_and_run_blocking_program(c_file: &str, target: &str) {
266        compile_program(c_file, target).await;
267        run_blocking_program(target).await;
268        let _ = std::fs::remove_file(target);
269    }
270    #[tokio::test]
271    async fn test_stuck_stdin() {
272        // This code blocks reading stdin forever
273        let c_file = "./tests/assets/wait_stdin.c";
274        let target = "./wait_stdin";
275        compile_and_run_blocking_program(c_file, target).await;
276    }
277
278    #[tokio::test]
279    async fn test_infinite_loop() {
280        // This code does while(1)
281        let c_file = "./tests/assets/infinite_loop.c";
282        let target = "./infinite_loop";
283        compile_and_run_blocking_program(c_file, target).await;
284    }
285
286    #[tokio::test]
287    async fn test_infinite_loop_with_sig_mapped() {
288        // This code enters an inifinite loop and ignores sigterm and sigint
289        let c_file = "./tests/assets/infinite_loop_map_signals.c";
290        let target = "./infinit_loop_map_signals";
291        compile_and_run_blocking_program(c_file, target).await;
292    }
293    #[tokio::test]
294    async fn test_infinite_loop_with_timeouts() {
295        // This code loops forever and prints Hello * every second
296        let c_file = "./tests/assets/infinite_loop.c";
297        let target = "./infinite_loop_stdout";
298
299        compile_program(c_file, target).await;
300
301        let stop = Arc::new(AtomicBool::new(false));
302        let (handler, mut rx) = launch_program(target, stop.clone()).await;
303
304        // Give the program some time to start
305        tokio::time::sleep(Duration::from_millis(1000)).await;
306
307        // Wait for process creation with timeout
308        let event = tokio::time::timeout(Duration::from_secs(5), rx.recv())
309            .await
310            .expect("To receive message before timeout")
311            .expect("To have a message");
312
313        assert_eq!(event, RunEvent::ProcessCreated);
314
315        for i in 1..=4 {
316            let event = tokio::time::timeout(Duration::from_secs(2), rx.recv())
317                .await
318                .expect("To receive message before timeout")
319                .expect("To have a message");
320
321            assert_eq!(
322                event,
323                RunEvent::ProcessNewOutputLine(format!("Hello {}\n", i))
324            );
325        }
326
327        stop.store(true, Ordering::Relaxed);
328
329        // Wait for handler to complete with timeout
330        let join_result = tokio::time::timeout(Duration::from_secs(5), handler).await;
331
332        join_result
333            .expect("Timeout waiting for handler to complete")
334            .expect("Couldn't join thread")
335            .expect("Couldn't get child exit status");
336
337        let _ = std::fs::remove_file(target);
338    }
339}