Skip to main content

over_there/core/server/
proc.rs

1use log::error;
2use std::io;
3use std::pin::Pin;
4use std::process::Output;
5use std::sync::Arc;
6use tokio::{process::Child, runtime::Handle, sync::Mutex, task};
7
8#[derive(Copy, Clone, Debug)]
9pub struct ExitStatus {
10    pub id: u32,
11    pub is_success: bool,
12    pub exit_code: Option<i32>,
13}
14
15#[derive(Debug)]
16pub struct LocalProc {
17    id: u32,
18    inner: Child,
19    exit_status: Option<ExitStatus>,
20
21    supports_stdin: bool,
22    supports_stdout: bool,
23    supports_stderr: bool,
24
25    /// Handle to task that is processing stdout/stderr
26    io_handle: Option<task::JoinHandle<()>>,
27
28    /// Internal buffer of all stdout that has been acquired
29    stdout_buf: Arc<Mutex<Vec<u8>>>,
30
31    /// Internal buffer of all stderr that has been acquired
32    stderr_buf: Arc<Mutex<Vec<u8>>>,
33}
34
35impl LocalProc {
36    pub fn new(child: Child) -> Self {
37        Self {
38            id: child.id(),
39            exit_status: None,
40            supports_stdin: child.stdin.is_some(),
41            supports_stdout: child.stdout.is_some(),
42            supports_stderr: child.stderr.is_some(),
43            inner: child,
44            io_handle: None,
45            stdout_buf: Arc::new(Mutex::new(Vec::new())),
46            stderr_buf: Arc::new(Mutex::new(Vec::new())),
47        }
48    }
49
50    pub fn id(&self) -> u32 {
51        self.id
52    }
53
54    pub fn inner(&self) -> &Child {
55        &self.inner
56    }
57
58    pub async fn exit_status(&mut self) -> Option<ExitStatus> {
59        use futures::future::{poll_fn, Future};
60        use std::task::Poll;
61
62        match self.exit_status {
63            None => {
64                let exit_status =
65                    poll_fn(|ctx| match Pin::new(&mut self.inner).poll(ctx) {
66                        Poll::Ready(res) => Poll::Ready(Some(res)),
67                        Poll::Pending => Poll::Ready(None),
68                    })
69                    .await;
70
71                if let Some(status) = exit_status {
72                    self.exit_status = Some(ExitStatus {
73                        id: self.id,
74                        is_success: status.is_ok(),
75                        exit_code: status.ok().and_then(|s| s.code()),
76                    });
77                }
78
79                self.exit_status
80            }
81            x => x,
82        }
83    }
84
85    /// Spawns io-processing task for stdout/stderr
86    /// Will panic if not in tokio runtime
87    pub fn spawn(mut self) -> Self {
88        // Only spawn once
89        if self.io_handle.is_some() {
90            return self;
91        }
92
93        let handle = Handle::current();
94
95        let stdout = self.inner.stdout.take();
96        let stderr = self.inner.stderr.take();
97
98        let stdout_buf = Arc::clone(&self.stdout_buf);
99        let stderr_buf = Arc::clone(&self.stderr_buf);
100
101        let io_handle = handle.spawn(async move {
102            let _ = tokio::join!(
103                async {
104                    use tokio::io::AsyncReadExt;
105
106                    if let Some(mut stdout) = stdout {
107                        let mut buf = [0; 1024];
108
109                        loop {
110                            match stdout.read(&mut buf).await {
111                                Ok(size) if size > 0 => {
112                                    stdout_buf
113                                        .lock()
114                                        .await
115                                        .extend_from_slice(&buf[..size]);
116                                }
117                                Ok(_) => break,
118                                Err(x) => {
119                                    error!("stdout reader died: {}", x);
120                                    break;
121                                }
122                            }
123                        }
124                    }
125                },
126                async {
127                    use tokio::io::AsyncReadExt;
128
129                    if let Some(mut stderr) = stderr {
130                        let mut buf = [0; 1024];
131
132                        loop {
133                            match stderr.read(&mut buf).await {
134                                Ok(size) if size > 0 => {
135                                    stderr_buf
136                                        .lock()
137                                        .await
138                                        .extend_from_slice(&buf[..size]);
139                                }
140                                Ok(_) => break,
141                                Err(x) => {
142                                    error!("stderr reader died: {}", x);
143                                    break;
144                                }
145                            }
146                        }
147                    }
148                }
149            );
150        });
151
152        self.io_handle = Some(io_handle);
153
154        self
155    }
156
157    pub async fn write_stdin(&mut self, buf: &[u8]) -> io::Result<()> {
158        use tokio::io::AsyncWriteExt;
159
160        match self.inner.stdin.as_mut() {
161            Some(stdin) => {
162                let mut result = stdin.write_all(buf).await;
163                if result.is_ok() {
164                    result = stdin.flush().await;
165                }
166                result
167            }
168            None => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
169        }
170    }
171
172    pub async fn read_stdout(&mut self) -> io::Result<Vec<u8>> {
173        if self.supports_stdout {
174            Ok(self.stdout_buf.lock().await.drain(..).collect())
175        } else {
176            Err(io::Error::from(io::ErrorKind::BrokenPipe))
177        }
178    }
179
180    pub async fn read_stderr(&mut self) -> io::Result<Vec<u8>> {
181        if self.supports_stderr {
182            Ok(self.stderr_buf.lock().await.drain(..).collect())
183        } else {
184            Err(io::Error::from(io::ErrorKind::BrokenPipe))
185        }
186    }
187
188    pub fn kill(&mut self) -> io::Result<()> {
189        self.inner.kill()
190    }
191
192    pub async fn kill_and_wait(mut self) -> io::Result<Output> {
193        self.kill()?;
194        self.inner.wait_with_output().await
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use super::*;
201    use std::process::Stdio;
202    use std::time::Duration;
203    use tokio::process::Command;
204    use tokio::time::{delay_for, timeout};
205    use tokio::{fs, io};
206
207    #[tokio::test]
208    async fn test_id_should_return_child_id() {
209        let child = Command::new("cat")
210            .stdin(Stdio::null())
211            .stdout(Stdio::null())
212            .stderr(Stdio::null())
213            .spawn()
214            .unwrap();
215
216        let id = child.id();
217        let local_proc = LocalProc::new(child);
218        assert_eq!(id, local_proc.id());
219    }
220
221    #[tokio::test]
222    async fn test_write_stdin_should_return_an_error_if_not_piped() {
223        let child = Command::new("cat")
224            .stdin(Stdio::null())
225            .stdout(Stdio::null())
226            .stderr(Stdio::null())
227            .spawn()
228            .unwrap();
229
230        let mut local_proc = LocalProc::new(child);
231        match local_proc.write_stdin(&[1, 2, 3]).await {
232            Ok(_) => panic!("Successfully wrote to stdin when not piped"),
233            Err(x) => assert_eq!(x.kind(), io::ErrorKind::BrokenPipe),
234        }
235    }
236
237    #[tokio::test]
238    async fn test_write_stdin_should_write_contents_to_process() {
239        let f = tempfile::tempfile().unwrap();
240        let child = Command::new("cat")
241            .stdin(Stdio::piped())
242            .stdout(f.try_clone().unwrap())
243            .stderr(Stdio::null())
244            .spawn()
245            .unwrap();
246
247        let mut local_proc = LocalProc::new(child);
248        match local_proc.write_stdin(b"test").await {
249            Ok(_) => {
250                match timeout(Duration::from_millis(10), async {
251                    use std::io::SeekFrom;
252                    use tokio::io::AsyncReadExt;
253                    let mut f = fs::File::from_std(f);
254
255                    loop {
256                        let mut s = String::new();
257                        f.seek(SeekFrom::Start(0)).await.unwrap();
258                        f.read_to_string(&mut s).await.unwrap();
259                        if !s.is_empty() {
260                            break s;
261                        }
262
263                        task::yield_now().await;
264                    }
265                })
266                .await
267                {
268                    Ok(s) => assert_eq!(s, "test", "Unexpected output"),
269                    Err(x) => panic!("Failed to write to file: {}", x),
270                }
271            }
272            Err(_) => panic!("Failed to write to stdin"),
273        }
274    }
275
276    #[tokio::test]
277    async fn test_read_stdout_should_return_an_error_if_not_piped() {
278        let child = Command::new("echo")
279            .arg("test")
280            .stdin(Stdio::null())
281            .stdout(Stdio::null())
282            .stderr(Stdio::null())
283            .spawn()
284            .unwrap();
285
286        let mut local_proc = LocalProc::new(child);
287        match local_proc.read_stdout().await {
288            Ok(_) => {
289                panic!("Unexpectedly succeeded in reading stdout not piped")
290            }
291            Err(x) => assert_eq!(x.kind(), io::ErrorKind::BrokenPipe),
292        }
293    }
294
295    #[tokio::test]
296    async fn test_read_stdout_should_return_empty_content_if_none_available() {
297        let child = Command::new("echo")
298            .arg("test")
299            .stdin(Stdio::null())
300            .stdout(Stdio::piped())
301            .stderr(Stdio::null())
302            .spawn()
303            .unwrap();
304
305        // NOTE: Not spawning so we can ensure that no content is available
306        let mut local_proc = LocalProc::new(child);
307
308        match local_proc.read_stdout().await {
309            Ok(buf) => assert!(buf.is_empty()),
310            Err(x) => panic!("Unexpected error: {}", x),
311        }
312    }
313
314    #[tokio::test]
315    async fn test_read_stdout_should_not_return_content_returned_previously() {
316        let child = Command::new("echo")
317            .arg("test")
318            .stdin(Stdio::null())
319            .stdout(Stdio::piped())
320            .stderr(Stdio::null())
321            .spawn()
322            .unwrap();
323
324        let mut local_proc = LocalProc::new(child).spawn();
325
326        // Get first batch of bytes and discard
327        assert!(
328            !timeout(Duration::from_millis(10), async {
329                loop {
330                    match local_proc.read_stdout().await {
331                        Ok(buf) => {
332                            if !buf.is_empty() {
333                                break buf;
334                            }
335
336                            // NOTE: The read above is too quick as it only awaits
337                            //       for a lock, and thereby prevents switching
338                            //       to another task -- yield to enable switching
339                            task::yield_now().await;
340                        }
341                        Err(x) => panic!("Unexpected error: {}", x),
342                    }
343                }
344            })
345            .await
346            .unwrap()
347            .is_empty(),
348            "Failed to get first batch of content"
349        );
350
351        // Assert second batch is empty
352        assert!(
353            local_proc.read_stdout().await.unwrap().is_empty(),
354            "Unexpectedly got content when nothing should be left"
355        );
356    }
357
358    #[tokio::test]
359    async fn test_read_stdout_should_return_content_if_available() {
360        let child = Command::new("echo")
361            .arg("test")
362            .stdin(Stdio::null())
363            .stdout(Stdio::piped())
364            .stderr(Stdio::null())
365            .spawn()
366            .unwrap();
367
368        let mut local_proc = LocalProc::new(child).spawn();
369
370        let buf = timeout(Duration::from_millis(10), async {
371            loop {
372                match local_proc.read_stdout().await {
373                    Ok(buf) => {
374                        if !buf.is_empty() {
375                            break buf;
376                        }
377
378                        // NOTE: The read above is too quick as it only awaits
379                        //       for a lock, and thereby prevents switching
380                        //       to another task -- yield to enable switching
381                        task::yield_now().await;
382                    }
383                    Err(x) => panic!("Unexpected error: {}", x),
384                }
385            }
386        })
387        .await
388        .unwrap();
389
390        assert_eq!(buf, b"test\n");
391    }
392
393    #[tokio::test]
394    async fn test_read_stderr_should_return_an_error_if_not_piped() {
395        let child = Command::new("rev")
396            .arg("--aaa")
397            .stdin(Stdio::null())
398            .stdout(Stdio::null())
399            .stderr(Stdio::null())
400            .spawn()
401            .unwrap();
402
403        let mut local_proc = LocalProc::new(child);
404        match local_proc.read_stderr().await {
405            Ok(_) => {
406                panic!("Unexpectedly succeeded in reading stderr not piped")
407            }
408            Err(x) => assert_eq!(x.kind(), io::ErrorKind::BrokenPipe),
409        }
410    }
411
412    #[tokio::test]
413    async fn test_read_stderr_should_return_empty_content_if_none_available() {
414        let child = Command::new("rev")
415            .arg("--aaa")
416            .stdin(Stdio::null())
417            .stdout(Stdio::null())
418            .stderr(Stdio::piped())
419            .spawn()
420            .unwrap();
421
422        // NOTE: Not spawning so we can ensure that no content is available
423        let mut local_proc = LocalProc::new(child);
424
425        match local_proc.read_stderr().await {
426            Ok(buf) => assert!(buf.is_empty()),
427            Err(x) => panic!("Unexpected error: {}", x),
428        }
429    }
430
431    #[tokio::test]
432    async fn test_read_stderr_should_not_return_content_returned_previously() {
433        let child = Command::new("rev")
434            .arg("--aaa")
435            .stdin(Stdio::null())
436            .stdout(Stdio::null())
437            .stderr(Stdio::piped())
438            .spawn()
439            .unwrap();
440
441        let mut local_proc = LocalProc::new(child).spawn();
442
443        // Get first batch of bytes and discard
444        assert!(
445            !timeout(Duration::from_millis(10), async {
446                loop {
447                    match local_proc.read_stderr().await {
448                        Ok(buf) => {
449                            if !buf.is_empty() {
450                                break buf;
451                            }
452
453                            // NOTE: The read above is too quick as it only awaits
454                            //       for a lock, and thereby prevents switching
455                            //       to another task -- yield to enable switching
456                            task::yield_now().await;
457                        }
458                        Err(x) => panic!("Unexpected error: {}", x),
459                    }
460                }
461            })
462            .await
463            .unwrap()
464            .is_empty(),
465            "Failed to get first batch of content"
466        );
467
468        // Assert second batch is empty
469        assert!(
470            local_proc.read_stderr().await.unwrap().is_empty(),
471            "Unexpectedly got content when nothing should be left"
472        );
473    }
474
475    #[tokio::test]
476    async fn test_read_stderr_should_return_content_if_available() {
477        let child = Command::new("rev")
478            .arg("--aaa")
479            .stdin(Stdio::null())
480            .stdout(Stdio::null())
481            .stderr(Stdio::piped())
482            .spawn()
483            .unwrap();
484
485        let mut local_proc = LocalProc::new(child).spawn();
486
487        let buf = timeout(Duration::from_millis(10), async {
488            loop {
489                match local_proc.read_stderr().await {
490                    Ok(buf) => {
491                        if !buf.is_empty() {
492                            break buf;
493                        }
494
495                        // NOTE: The read above is too quick as it only awaits
496                        //       for a lock, and thereby prevents switching
497                        //       to another task -- yield to enable switching
498                        task::yield_now().await;
499                    }
500                    Err(x) => panic!("Unexpected error: {}", x),
501                }
502            }
503        })
504        .await
505        .unwrap();
506
507        assert!(buf.len() > 0);
508    }
509
510    #[tokio::test]
511    async fn test_exit_status_should_return_none_if_not_exited() {
512        let child = Command::new("sleep")
513            .arg("60")
514            .stdin(Stdio::null())
515            .stdout(Stdio::null())
516            .stderr(Stdio::null())
517            .spawn()
518            .unwrap();
519        let mut local_proc = LocalProc::new(child).spawn();
520        match local_proc.exit_status().await {
521            None => (),
522            Some(x) => panic!("Unexpected content: {:?}", x),
523        }
524    }
525
526    #[tokio::test]
527    async fn test_exit_status_should_return_some_status_if_exited() {
528        let child = Command::new("echo")
529            .stdin(Stdio::null())
530            .stdout(Stdio::null())
531            .stderr(Stdio::null())
532            .spawn()
533            .unwrap();
534
535        let id = child.id();
536        let mut local_proc = LocalProc::new(child).spawn();
537
538        // Give process some time to run and complete
539        delay_for(Duration::from_millis(10)).await;
540
541        match local_proc.exit_status().await {
542            Some(status) => assert_eq!(status.id, id),
543            None => panic!("Unexpectedly got no result"),
544        }
545    }
546
547    #[tokio::test]
548    async fn test_exit_status_should_support_being_called_multiple_times_after_exit(
549    ) {
550        let child = Command::new("echo")
551            .stdin(Stdio::null())
552            .stdout(Stdio::null())
553            .stderr(Stdio::null())
554            .spawn()
555            .unwrap();
556
557        let mut local_proc = LocalProc::new(child).spawn();
558
559        // Give process some time to run and complete
560        delay_for(Duration::from_millis(10)).await;
561
562        assert!(local_proc.exit_status().await.is_some());
563        assert!(local_proc.exit_status().await.is_some());
564    }
565
566    #[tokio::test]
567    async fn kill_should_send_kill_request_to_process_without_waiting() {
568        let child = Command::new("sleep")
569            .arg("60")
570            .stdin(Stdio::null())
571            .stdout(Stdio::null())
572            .stderr(Stdio::null())
573            .spawn()
574            .unwrap();
575
576        let mut local_proc = LocalProc::new(child).spawn();
577        match local_proc.kill() {
578            Ok(_) => (),
579            Err(x) => panic!("Unexpected error: {}", x),
580        }
581    }
582
583    #[tokio::test]
584    async fn test_kill_and_wait_should_kill_and_return_process_result() {
585        let child = Command::new("sleep")
586            .arg("60")
587            .stdin(Stdio::null())
588            .stdout(Stdio::null())
589            .stderr(Stdio::null())
590            .spawn()
591            .unwrap();
592
593        let local_proc = LocalProc::new(child).spawn();
594        match local_proc.kill_and_wait().await {
595            Ok(_) => (),
596            Err(x) => panic!("Unexpected error: {}", x),
597        }
598    }
599}