tokio_pty_process_stream/
process.rs

1use futures::future::Future as _;
2use snafu::ResultExt as _;
3use std::os::unix::io::AsRawFd as _;
4use tokio::io::{AsyncRead as _, AsyncWrite as _};
5use tokio_pty_process::{CommandExt as _, PtyMaster as _};
6
/// Size in bytes (4 KiB) of the scratch buffer used for each individual read
/// from the input object and from the pty.
const READ_BUFFER_SIZE: usize = 4096;
8
/// Represents events generated by the process.
///
/// Events are plain data and can be cloned freely, e.g. to forward the same
/// event to several consumers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// Emitted once the command has been successfully spawned.
    CommandStart {
        /// The command that was spawned.
        cmd: String,
        /// The arguments the command was spawned with.
        args: Vec<String>,
    },

    /// Emitted every time the command produces output. Note that when a
    /// process is running under a pty, both stdout and stderr are attached
    /// to the same pty, so there is no way to tell them apart when reading
    /// the pty's output.
    Output {
        /// The raw bytes read from the pty.
        data: Vec<u8>,
    },

    /// Emitted when the command has exited.
    CommandExit {
        /// The exit status reported for the child process.
        status: std::process::ExitStatus,
    },

    /// Emitted by a `ResizingProcess` when a resize event happens.
    Resize {
        /// The new terminal size.
        ///
        /// NOTE(review): presumably `(rows, cols)`, matching the argument
        /// order of `Process::resize` - confirm against `ResizingProcess`.
        size: (u16, u16),
    },
}
27
28struct State {
29    pty: Option<tokio_pty_process::AsyncPtyMaster>,
30    process: Option<tokio_pty_process::Child>,
31}
32
33impl State {
34    fn new() -> Self {
35        Self {
36            pty: None,
37            process: None,
38        }
39    }
40
41    fn pty(&self) -> &tokio_pty_process::AsyncPtyMaster {
42        self.pty.as_ref().unwrap()
43    }
44
45    fn pty_mut(&mut self) -> &mut tokio_pty_process::AsyncPtyMaster {
46        self.pty.as_mut().unwrap()
47    }
48
49    fn process(&mut self) -> &mut tokio_pty_process::Child {
50        self.process.as_mut().unwrap()
51    }
52}
53
54/// A spawned process.
55///
56/// Wraps `AsyncPtyMaster` and `Child` from `tokio-pty-process` to provide a
57/// view of the process as a single stream which emits events. In particular,
58/// the stream will return an event when the process starts, when it writes
59/// output to the pty, and when it exits. See the `Event` type for more
60/// details.
61pub struct Process<R: tokio::io::AsyncRead> {
62    state: State,
63    input: R,
64    input_buf: std::collections::VecDeque<u8>,
65    cmd: String,
66    args: Vec<String>,
67    buf: [u8; READ_BUFFER_SIZE],
68    started: bool,
69    exited: bool,
70    needs_resize: Option<(u16, u16)>,
71    stdin_closed: bool,
72    stdout_closed: bool,
73}
74
75impl<R: tokio::io::AsyncRead + 'static> Process<R> {
76    /// Creates a new process stream.
77    ///
78    /// The process is not spawned and the pty is not opened until `poll` is
79    /// called.
80    ///
81    /// Takes as input the command and arguments to run, as well as the
82    /// `AsyncRead` object to read input from. Typically you will pass in
83    /// something connected to stdin here, although other options may be more
84    /// useful for automation or testing.
85    pub fn new(cmd: &str, args: &[String], input: R) -> Self {
86        Self {
87            state: State::new(),
88            input,
89            input_buf: std::collections::VecDeque::new(),
90            cmd: cmd.to_string(),
91            args: args.to_vec(),
92            buf: [0; READ_BUFFER_SIZE],
93            started: false,
94            exited: false,
95            needs_resize: None,
96            stdin_closed: false,
97            stdout_closed: false,
98        }
99    }
100
101    /// Requests a change to the pty's terminal size.
102    ///
103    /// This will only be applied on the next call to `poll`.
104    pub fn resize(&mut self, rows: u16, cols: u16) {
105        self.needs_resize = Some((rows, cols));
106    }
107
108    /// Returns a mutable reference to the input object provided in the
109    /// constructor.
110    ///
111    /// This can be useful if you are driving the input manually, rather than
112    /// just hooking it up directly to stdin.
113    pub fn input(&mut self) -> &mut R {
114        &mut self.input
115    }
116}
117
118impl<R: tokio::io::AsyncRead + 'static> Process<R> {
119    const POLL_FNS:
120        &'static [&'static dyn for<'a> Fn(
121            &'a mut Self,
122        )
123            -> component_future::Poll<
124            Option<Event>,
125            crate::error::Error,
126        >] = &[
127        // order is important here - checking command_exit first so that we
128        // don't try to read from a process that has already exited, which
129        // causes an error. also, poll_resize needs to happen after
130        // poll_command_start, or else the pty might not be initialized.
131        &Self::poll_command_start,
132        &Self::poll_command_exit,
133        &Self::poll_resize,
134        &Self::poll_read_stdin,
135        &Self::poll_write_stdin,
136        &Self::poll_read_stdout,
137    ];
138
139    fn poll_resize(
140        &mut self,
141    ) -> component_future::Poll<Option<Event>, crate::error::Error> {
142        if let Some((rows, cols)) = &self.needs_resize {
143            component_future::try_ready!(self
144                .state
145                .pty()
146                .resize(*rows, *cols)
147                .context(crate::error::ResizePty));
148            log::debug!("resize({}x{})", cols, rows);
149            self.needs_resize = None;
150            Ok(component_future::Async::DidWork)
151        } else {
152            Ok(component_future::Async::NothingToDo)
153        }
154    }
155
156    fn poll_command_start(
157        &mut self,
158    ) -> component_future::Poll<Option<Event>, crate::error::Error> {
159        if self.started {
160            return Ok(component_future::Async::NothingToDo);
161        }
162
163        if self.state.pty.is_none() {
164            self.state.pty = Some(
165                tokio_pty_process::AsyncPtyMaster::open()
166                    .context(crate::error::OpenPty)?,
167            );
168            log::debug!(
169                "openpty({})",
170                self.state.pty.as_ref().unwrap().as_raw_fd()
171            );
172        }
173
174        if self.state.process.is_none() {
175            self.state.process = Some(
176                std::process::Command::new(&self.cmd)
177                    .args(&self.args)
178                    .spawn_pty_async(self.state.pty())
179                    .context(crate::error::SpawnProcess {
180                        cmd: self.cmd.clone(),
181                    })?,
182            );
183            log::debug!(
184                "spawn({})",
185                self.state.process.as_ref().unwrap().id()
186            );
187        }
188
189        self.started = true;
190        Ok(component_future::Async::Ready(Some(Event::CommandStart {
191            cmd: self.cmd.clone(),
192            args: self.args.clone(),
193        })))
194    }
195
196    fn poll_read_stdin(
197        &mut self,
198    ) -> component_future::Poll<Option<Event>, crate::error::Error> {
199        if self.exited || self.stdin_closed {
200            return Ok(component_future::Async::NothingToDo);
201        }
202
203        let n = component_future::try_ready!(self
204            .input
205            .poll_read(&mut self.buf)
206            .context(crate::error::ReadTerminal));
207        log::debug!("read_stdin({})", n);
208        if n > 0 {
209            self.input_buf.extend(self.buf[..n].iter());
210        } else {
211            self.input_buf.push_back(b'\x04');
212            self.stdin_closed = true;
213        }
214        Ok(component_future::Async::DidWork)
215    }
216
217    fn poll_write_stdin(
218        &mut self,
219    ) -> component_future::Poll<Option<Event>, crate::error::Error> {
220        if self.exited || self.input_buf.is_empty() {
221            return Ok(component_future::Async::NothingToDo);
222        }
223
224        let (a, b) = self.input_buf.as_slices();
225        let buf = if a.is_empty() { b } else { a };
226        let n = component_future::try_ready!(self
227            .state
228            .pty_mut()
229            .poll_write(buf)
230            .context(crate::error::WritePty));
231        log::debug!("write_stdin({})", n);
232        for _ in 0..n {
233            self.input_buf.pop_front();
234        }
235        Ok(component_future::Async::DidWork)
236    }
237
238    fn poll_read_stdout(
239        &mut self,
240    ) -> component_future::Poll<Option<Event>, crate::error::Error> {
241        match self
242            .state
243            .pty_mut()
244            .poll_read(&mut self.buf)
245            .context(crate::error::ReadPty)
246        {
247            Ok(futures::Async::Ready(n)) => {
248                log::debug!("read_stdout({})", n);
249                let bytes = self.buf[..n].to_vec();
250                Ok(component_future::Async::Ready(Some(Event::Output {
251                    data: bytes,
252                })))
253            }
254            Ok(futures::Async::NotReady) => {
255                Ok(component_future::Async::NotReady)
256            }
257            Err(e) => {
258                // XXX this seems to be how eof is returned, but this seems...
259                // wrong? i feel like there has to be a better way to do this
260                if let crate::error::Error::ReadPty { source } = &e {
261                    if source.kind() == std::io::ErrorKind::Other {
262                        log::debug!("read_stdout(eof)");
263                        self.stdout_closed = true;
264                        return Ok(component_future::Async::DidWork);
265                    }
266                }
267                Err(e)
268            }
269        }
270    }
271
272    fn poll_command_exit(
273        &mut self,
274    ) -> component_future::Poll<Option<Event>, crate::error::Error> {
275        if self.exited {
276            return Ok(component_future::Async::Ready(None));
277        }
278        if !self.stdout_closed {
279            return Ok(component_future::Async::NothingToDo);
280        }
281
282        let status = component_future::try_ready!(self
283            .state
284            .process()
285            .poll()
286            .context(crate::error::ProcessExitPoll));
287        log::debug!("exit({})", status);
288        self.exited = true;
289        Ok(component_future::Async::Ready(Some(Event::CommandExit {
290            status,
291        })))
292    }
293}
294
295#[must_use = "streams do nothing unless polled"]
296impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream
297    for Process<R>
298{
299    type Item = Event;
300    type Error = crate::error::Error;
301
302    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
303        component_future::poll_stream(self, Self::POLL_FNS)
304    }
305}
306
#[cfg(test)]
mod test {
    use super::*;
    use futures::sink::Sink as _;
    use futures::stream::Stream as _;

    /// Runs `cat` with a fixed line of input and checks the exact sequence
    /// of events: one `CommandStart`, then output, then a successful
    /// `CommandExit` with nothing after it.
    #[test]
    fn test_simple() {
        // every event (or the stream's error) is forwarded over this channel
        // so that it can be inspected after the runtime has shut down
        let (tx, rx) = tokio::sync::mpsc::channel(100);
        let err_tx = tx.clone();
        let mut tx = tx.wait();
        let input = std::io::Cursor::new(b"hello world\n");
        let process = Process::new("cat", &[], input);
        let fut = process
            .for_each(move |event| {
                tx.send(Ok(event)).unwrap();
                Ok(())
            })
            .map_err(|err| {
                err_tx.wait().send(Err(err)).unwrap();
            });
        tokio::run(fut);

        let mut events = rx.wait();

        // unwrap layers: channel item present, receive ok, stream ok
        let first = events.next().unwrap().unwrap().unwrap();
        assert_eq!(
            first,
            Event::CommandStart {
                cmd: "cat".to_string(),
                args: vec![]
            }
        );

        let mut output: Vec<u8> = vec![];
        let mut exited = false;
        for event in events {
            // nothing may arrive after the exit event
            assert!(!exited);
            match event.unwrap().unwrap() {
                Event::CommandStart { .. } => {
                    panic!("unexpected CommandStart")
                }
                Event::Output { data } => {
                    output.extend_from_slice(&data);
                }
                Event::CommandExit { status } => {
                    assert!(status.success());
                    exited = true;
                }
                Event::Resize { .. } => {}
            }
        }
        assert!(exited);
        // the line shows up twice with `\r\n` endings - presumably once as
        // the pty's echo of the input and once as the output of `cat`
        assert_eq!(output, b"hello world\r\nhello world\r\n");
    }
}
366}