tokio_pty_process_stream/
process.rs1use futures::future::Future as _;
2use snafu::ResultExt as _;
3use std::os::unix::io::AsRawFd as _;
4use tokio::io::{AsyncRead as _, AsyncWrite as _};
5use tokio_pty_process::{CommandExt as _, PtyMaster as _};
6
7const READ_BUFFER_SIZE: usize = 4 * 1024;
8
9#[derive(Debug, PartialEq, Eq)]
11pub enum Event {
12 CommandStart { cmd: String, args: Vec<String> },
14
15 Output { data: Vec<u8> },
20
21 CommandExit { status: std::process::ExitStatus },
23
24 Resize { size: (u16, u16) },
26}
27
28struct State {
29 pty: Option<tokio_pty_process::AsyncPtyMaster>,
30 process: Option<tokio_pty_process::Child>,
31}
32
33impl State {
34 fn new() -> Self {
35 Self {
36 pty: None,
37 process: None,
38 }
39 }
40
41 fn pty(&self) -> &tokio_pty_process::AsyncPtyMaster {
42 self.pty.as_ref().unwrap()
43 }
44
45 fn pty_mut(&mut self) -> &mut tokio_pty_process::AsyncPtyMaster {
46 self.pty.as_mut().unwrap()
47 }
48
49 fn process(&mut self) -> &mut tokio_pty_process::Child {
50 self.process.as_mut().unwrap()
51 }
52}
53
54pub struct Process<R: tokio::io::AsyncRead> {
62 state: State,
63 input: R,
64 input_buf: std::collections::VecDeque<u8>,
65 cmd: String,
66 args: Vec<String>,
67 buf: [u8; READ_BUFFER_SIZE],
68 started: bool,
69 exited: bool,
70 needs_resize: Option<(u16, u16)>,
71 stdin_closed: bool,
72 stdout_closed: bool,
73}
74
75impl<R: tokio::io::AsyncRead + 'static> Process<R> {
76 pub fn new(cmd: &str, args: &[String], input: R) -> Self {
86 Self {
87 state: State::new(),
88 input,
89 input_buf: std::collections::VecDeque::new(),
90 cmd: cmd.to_string(),
91 args: args.to_vec(),
92 buf: [0; READ_BUFFER_SIZE],
93 started: false,
94 exited: false,
95 needs_resize: None,
96 stdin_closed: false,
97 stdout_closed: false,
98 }
99 }
100
101 pub fn resize(&mut self, rows: u16, cols: u16) {
105 self.needs_resize = Some((rows, cols));
106 }
107
108 pub fn input(&mut self) -> &mut R {
114 &mut self.input
115 }
116}
117
118impl<R: tokio::io::AsyncRead + 'static> Process<R> {
119 const POLL_FNS:
120 &'static [&'static dyn for<'a> Fn(
121 &'a mut Self,
122 )
123 -> component_future::Poll<
124 Option<Event>,
125 crate::error::Error,
126 >] = &[
127 &Self::poll_command_start,
132 &Self::poll_command_exit,
133 &Self::poll_resize,
134 &Self::poll_read_stdin,
135 &Self::poll_write_stdin,
136 &Self::poll_read_stdout,
137 ];
138
139 fn poll_resize(
140 &mut self,
141 ) -> component_future::Poll<Option<Event>, crate::error::Error> {
142 if let Some((rows, cols)) = &self.needs_resize {
143 component_future::try_ready!(self
144 .state
145 .pty()
146 .resize(*rows, *cols)
147 .context(crate::error::ResizePty));
148 log::debug!("resize({}x{})", cols, rows);
149 self.needs_resize = None;
150 Ok(component_future::Async::DidWork)
151 } else {
152 Ok(component_future::Async::NothingToDo)
153 }
154 }
155
156 fn poll_command_start(
157 &mut self,
158 ) -> component_future::Poll<Option<Event>, crate::error::Error> {
159 if self.started {
160 return Ok(component_future::Async::NothingToDo);
161 }
162
163 if self.state.pty.is_none() {
164 self.state.pty = Some(
165 tokio_pty_process::AsyncPtyMaster::open()
166 .context(crate::error::OpenPty)?,
167 );
168 log::debug!(
169 "openpty({})",
170 self.state.pty.as_ref().unwrap().as_raw_fd()
171 );
172 }
173
174 if self.state.process.is_none() {
175 self.state.process = Some(
176 std::process::Command::new(&self.cmd)
177 .args(&self.args)
178 .spawn_pty_async(self.state.pty())
179 .context(crate::error::SpawnProcess {
180 cmd: self.cmd.clone(),
181 })?,
182 );
183 log::debug!(
184 "spawn({})",
185 self.state.process.as_ref().unwrap().id()
186 );
187 }
188
189 self.started = true;
190 Ok(component_future::Async::Ready(Some(Event::CommandStart {
191 cmd: self.cmd.clone(),
192 args: self.args.clone(),
193 })))
194 }
195
196 fn poll_read_stdin(
197 &mut self,
198 ) -> component_future::Poll<Option<Event>, crate::error::Error> {
199 if self.exited || self.stdin_closed {
200 return Ok(component_future::Async::NothingToDo);
201 }
202
203 let n = component_future::try_ready!(self
204 .input
205 .poll_read(&mut self.buf)
206 .context(crate::error::ReadTerminal));
207 log::debug!("read_stdin({})", n);
208 if n > 0 {
209 self.input_buf.extend(self.buf[..n].iter());
210 } else {
211 self.input_buf.push_back(b'\x04');
212 self.stdin_closed = true;
213 }
214 Ok(component_future::Async::DidWork)
215 }
216
217 fn poll_write_stdin(
218 &mut self,
219 ) -> component_future::Poll<Option<Event>, crate::error::Error> {
220 if self.exited || self.input_buf.is_empty() {
221 return Ok(component_future::Async::NothingToDo);
222 }
223
224 let (a, b) = self.input_buf.as_slices();
225 let buf = if a.is_empty() { b } else { a };
226 let n = component_future::try_ready!(self
227 .state
228 .pty_mut()
229 .poll_write(buf)
230 .context(crate::error::WritePty));
231 log::debug!("write_stdin({})", n);
232 for _ in 0..n {
233 self.input_buf.pop_front();
234 }
235 Ok(component_future::Async::DidWork)
236 }
237
238 fn poll_read_stdout(
239 &mut self,
240 ) -> component_future::Poll<Option<Event>, crate::error::Error> {
241 match self
242 .state
243 .pty_mut()
244 .poll_read(&mut self.buf)
245 .context(crate::error::ReadPty)
246 {
247 Ok(futures::Async::Ready(n)) => {
248 log::debug!("read_stdout({})", n);
249 let bytes = self.buf[..n].to_vec();
250 Ok(component_future::Async::Ready(Some(Event::Output {
251 data: bytes,
252 })))
253 }
254 Ok(futures::Async::NotReady) => {
255 Ok(component_future::Async::NotReady)
256 }
257 Err(e) => {
258 if let crate::error::Error::ReadPty { source } = &e {
261 if source.kind() == std::io::ErrorKind::Other {
262 log::debug!("read_stdout(eof)");
263 self.stdout_closed = true;
264 return Ok(component_future::Async::DidWork);
265 }
266 }
267 Err(e)
268 }
269 }
270 }
271
272 fn poll_command_exit(
273 &mut self,
274 ) -> component_future::Poll<Option<Event>, crate::error::Error> {
275 if self.exited {
276 return Ok(component_future::Async::Ready(None));
277 }
278 if !self.stdout_closed {
279 return Ok(component_future::Async::NothingToDo);
280 }
281
282 let status = component_future::try_ready!(self
283 .state
284 .process()
285 .poll()
286 .context(crate::error::ProcessExitPoll));
287 log::debug!("exit({})", status);
288 self.exited = true;
289 Ok(component_future::Async::Ready(Some(Event::CommandExit {
290 status,
291 })))
292 }
293}
294
295#[must_use = "streams do nothing unless polled"]
296impl<R: tokio::io::AsyncRead + 'static> futures::stream::Stream
297 for Process<R>
298{
299 type Item = Event;
300 type Error = crate::error::Error;
301
302 fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
303 component_future::poll_stream(self, Self::POLL_FNS)
304 }
305}
306
307#[cfg(test)]
308mod test {
309 use super::*;
310 use futures::sink::Sink as _;
311 use futures::stream::Stream as _;
312
313 #[test]
314 fn test_simple() {
315 let (wres, rres) = tokio::sync::mpsc::channel(100);
316 let wres2 = wres.clone();
317 let mut wres = wres.wait();
318 let buf = std::io::Cursor::new(b"hello world\n");
319 let fut = Process::new("cat", &[], buf)
320 .for_each(move |e| {
321 wres.send(Ok(e)).unwrap();
322 Ok(())
323 })
324 .map_err(|e| {
325 wres2.wait().send(Err(e)).unwrap();
326 });
327 tokio::run(fut);
328
329 let mut rres = rres.wait();
330
331 let event = rres.next();
332 let event = event.unwrap();
333 let event = event.unwrap();
334 let event = event.unwrap();
335 assert_eq!(
336 event,
337 Event::CommandStart {
338 cmd: "cat".to_string(),
339 args: vec![]
340 }
341 );
342
343 let mut output: Vec<u8> = vec![];
344 let mut exited = false;
345 for event in rres {
346 assert!(!exited);
347 let event = event.unwrap();
348 let event = event.unwrap();
349 match event {
350 Event::CommandStart { .. } => {
351 panic!("unexpected CommandStart")
352 }
353 Event::Output { data } => {
354 output.extend(data.iter());
355 }
356 Event::CommandExit { status } => {
357 assert!(status.success());
358 exited = true;
359 }
360 Event::Resize { .. } => {}
361 }
362 }
363 assert!(exited);
364 assert_eq!(output, b"hello world\r\nhello world\r\n");
365 }
366}