io_tubes/tubes/
process.rs

1use std::{
2    ffi::OsStr,
3    io::{self, Error, ErrorKind},
4    pin::Pin,
5    process::Stdio,
6    task::{Context, Poll},
7};
8use tokio::{
9    io::{AsyncRead, AsyncWrite, ReadBuf},
10    process::{Child, ChildStdin, ChildStdout, Command},
11};
12
13/// A tube-like struct that allows easy access to spawned process's stdin and stdout.
14#[derive(Debug)]
15pub struct ProcessTube {
16    inner: Child,
17    stdin: ChildStdin,
18    stdout: ChildStdout,
19}
20
21impl ProcessTube {
22    /// Create a new ProcessTube by launching a program
23    pub fn new<S: AsRef<OsStr>>(program: S) -> io::Result<Self> {
24        Command::new(program).try_into()
25    }
26
27    /// Create a new ProcessTube using the specified command
28    pub fn from_command(cmd: Command) -> io::Result<Self> {
29        cmd.try_into()
30    }
31}
32
33impl TryFrom<Command> for ProcessTube {
34    type Error = io::Error;
35
36    fn try_from(mut value: Command) -> Result<Self, Self::Error> {
37        value
38            .stdin(Stdio::piped())
39            .stdout(Stdio::piped())
40            .spawn()?
41            .try_into()
42    }
43}
44
45impl TryFrom<Child> for ProcessTube {
46    type Error = io::Error;
47
48    fn try_from(mut inner: Child) -> Result<Self, Self::Error> {
49        let stdin = inner.stdin.take().ok_or_else(|| {
50            Error::new(ErrorKind::BrokenPipe, "Unable to extract stdin from child")
51        })?;
52        let stdout = inner.stdout.take().ok_or_else(|| {
53            Error::new(ErrorKind::BrokenPipe, "Unable to extract stdout from child")
54        })?;
55        Ok(ProcessTube {
56            inner,
57            stdin,
58            stdout,
59        })
60    }
61}
62
63impl From<ProcessTube> for Child {
64    fn from(mut tube: ProcessTube) -> Self {
65        tube.inner.stdin = Some(tube.stdin);
66        tube.inner.stdout = Some(tube.stdout);
67        tube.inner
68    }
69}
70
71impl AsyncRead for ProcessTube {
72    fn poll_read(
73        self: Pin<&mut Self>,
74        cx: &mut Context,
75        buf: &mut ReadBuf,
76    ) -> Poll<io::Result<()>> {
77        Pin::new(&mut self.get_mut().stdout).poll_read(cx, buf)
78    }
79}
80
81impl AsyncWrite for ProcessTube {
82    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
83        Pin::new(&mut self.get_mut().stdin).poll_write(cx, buf)
84    }
85
86    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
87        Pin::new(&mut self.get_mut().stdin).poll_flush(cx)
88    }
89
90    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
91        Pin::new(&mut self.get_mut().stdin).poll_shutdown(cx)
92    }
93
94    fn poll_write_vectored(
95        self: Pin<&mut Self>,
96        cx: &mut Context,
97        bufs: &[io::IoSlice],
98    ) -> Poll<Result<usize, io::Error>> {
99        Pin::new(&mut self.get_mut().stdin).poll_write_vectored(cx, bufs)
100    }
101
102    fn is_write_vectored(&self) -> bool {
103        self.stdin.is_write_vectored()
104    }
105}