#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/tokio-pty-process/0.4.0")]
use async_trait::async_trait;
use futures::Future;
use io::{Read, Write};
use libc::{c_int, c_ushort};
use mio::event::Evented;
use mio::unix::{EventedFd, UnixReady};
use mio::{PollOpt, Ready, Token};
use std::ffi::{CStr, OsStr, OsString};
use std::fmt;
use std::fs::{File, OpenOptions};
use std::io::{self};
use std::mem;
use std::os::unix::prelude::*;
use std::os::unix::process::CommandExt as StdUnixCommandExt;
use std::{
pin::Pin,
process::{self, ExitStatus},
task::{Context, Poll},
};
use tokio::io::PollEvented;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::signal::unix::{signal, Signal, SignalKind};
mod split;
pub use split::{AsyncPtyMasterReadHalf, AsyncPtyMasterWriteHalf};
/// Newtype around the [`File`] that backs the PTY master.
///
/// This file implements `Read`, `Write` and mio's `Evented` for the wrapper.
#[derive(Debug)]
struct AsyncPtyFile(File);

impl AsyncPtyFile {
    /// Wraps `inner` as-is; the file's flags and descriptor are left untouched.
    pub fn new(inner: File) -> Self {
        Self(inner)
    }
}
impl Read for AsyncPtyFile {
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
self.0.read(bytes)
}
}
impl Write for AsyncPtyFile {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.0.write(bytes)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
/// mio registration for the wrapped descriptor.
///
/// Both `register` and `reregister` add hang-up readiness to whatever interest
/// set the caller asked for.
impl Evented for AsyncPtyFile {
    fn register(
        &self,
        poll: &mio::Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let fd = self.0.as_raw_fd();
        let wanted = interest | UnixReady::hup();
        EventedFd(&fd).register(poll, token, wanted, opts)
    }

    fn reregister(
        &self,
        poll: &mio::Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let fd = self.0.as_raw_fd();
        let wanted = interest | UnixReady::hup();
        EventedFd(&fd).reregister(poll, token, wanted, opts)
    }

    fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
        let fd = self.0.as_raw_fd();
        EventedFd(&fd).deregister(poll)
    }
}
/// The master side of a pseudo-terminal, registered with tokio's reactor.
///
/// This file implements `AsyncRead`, `AsyncWrite` and `AsRawFd` for it.
pub struct AsyncPtyMaster(PollEvented<AsyncPtyFile>);

impl AsyncPtyMaster {
    /// Opens a new PTY master in non-blocking mode.
    ///
    /// The descriptor is obtained with `posix_openpt`, then `grantpt` and
    /// `unlockpt` are applied before it is wrapped in a `PollEvented`.
    ///
    /// # Errors
    ///
    /// Returns the OS error of whichever libc call failed, or the error from
    /// `PollEvented::new`. The descriptor is closed on every error path.
    pub fn open() -> Result<Self, io::Error> {
        // On FreeBSD `O_NONBLOCK` is applied with `fcntl` after opening instead
        // of being passed to `posix_openpt` (presumably because it is not
        // accepted there -- confirm against the FreeBSD manual).
        const APPLY_NONBLOCK_AFTER_OPEN: bool = cfg!(target_os = "freebsd");

        let open_flags = if APPLY_NONBLOCK_AFTER_OPEN {
            libc::O_RDWR | libc::O_NOCTTY
        } else {
            libc::O_RDWR | libc::O_NOCTTY | libc::O_NONBLOCK
        };

        // SAFETY: `posix_openpt` takes no pointers.
        let fd = unsafe { libc::posix_openpt(open_flags) };
        if fd < 0 {
            return Err(io::Error::last_os_error());
        }

        // Take ownership right away so the descriptor is closed if any of the
        // following steps fails (previously it leaked on those paths).
        // SAFETY: `fd` was just returned by `posix_openpt` and has no other owner.
        let inner = unsafe { File::from_raw_fd(fd) };

        // SAFETY: `fd` stays valid for the whole block because `inner` owns it.
        // Each error value is built from `errno` before `inner` is dropped.
        unsafe {
            if libc::grantpt(fd) != 0 {
                return Err(io::Error::last_os_error());
            }
            if libc::unlockpt(fd) != 0 {
                return Err(io::Error::last_os_error());
            }
            if APPLY_NONBLOCK_AFTER_OPEN {
                let flags = libc::fcntl(fd, libc::F_GETFL, 0);
                if flags < 0 {
                    return Err(io::Error::last_os_error());
                }
                if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) == -1 {
                    return Err(io::Error::last_os_error());
                }
            }
        }

        let evented = PollEvented::new(AsyncPtyFile::new(inner))?;
        Ok(AsyncPtyMaster(evented))
    }

    /// Consumes the master and returns its read and write halves, as produced
    /// by the `split` module.
    pub fn split(self) -> (AsyncPtyMasterReadHalf, AsyncPtyMasterWriteHalf) {
        split::split(self)
    }

    /// Opens the slave device that belongs to this master as a plain
    /// (synchronous) read/write [`File`].
    ///
    /// The slave is opened with `O_NOCTTY` so that opening it can never make
    /// the PTY the *calling* process's controlling terminal; the spawned child
    /// claims it explicitly with `TIOCSCTTY`.
    fn open_sync_pty_slave(&self) -> Result<File, io::Error> {
        let mut buf: [libc::c_char; 512] = [0; 512];
        let fd = self.as_raw_fd();

        #[cfg(not(any(target_os = "macos", target_os = "freebsd")))]
        {
            // SAFETY: `buf` is a valid, writable buffer of `buf.len()` bytes.
            let rc = unsafe { libc::ptsname_r(fd, buf.as_mut_ptr(), buf.len()) };
            if rc != 0 {
                // `ptsname_r` reports failure through its return value; not
                // every libc also sets `errno`. Fall back to `errno` only for
                // implementations that return a negative value.
                return Err(if rc > 0 {
                    io::Error::from_raw_os_error(rc)
                } else {
                    io::Error::last_os_error()
                });
            }
        }
        #[cfg(any(target_os = "macos", target_os = "freebsd"))]
        // SAFETY: `st` is checked for NULL before use and `buf` is large
        // enough for the bounded copy below.
        unsafe {
            let st = libc::ptsname(fd);
            if st.is_null() {
                return Err(io::Error::last_os_error());
            }
            if libc::strlen(st) >= buf.len() {
                return Err(io::Error::from_raw_os_error(libc::ERANGE));
            }
            // Copy at most `len - 1` bytes so the zero-initialised final byte
            // always terminates the string.
            libc::strncpy(buf.as_mut_ptr(), st, buf.len() - 1);
        }

        // SAFETY: `buf` is NUL-terminated on every path that reaches here.
        let ptsname = OsStr::from_bytes(unsafe { CStr::from_ptr(buf.as_ptr()) }.to_bytes());
        OpenOptions::new()
            .read(true)
            .write(true)
            .custom_flags(libc::O_NOCTTY)
            .open(ptsname)
    }
}
impl AsRawFd for AsyncPtyMaster {
fn as_raw_fd(&self) -> RawFd {
self.0.get_ref().0.as_raw_fd()
}
}
impl AsyncRead for AsyncPtyMaster {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<io::Result<usize>> {
AsyncRead::poll_read(Pin::new(&mut self.0), cx, buf)
}
}
impl AsyncWrite for AsyncPtyMaster {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
}
}
/// A child process spawned on a PTY.
///
/// As a future (see the `Future` impl in this file) it resolves to the
/// child's exit status. Unless [`Child::forget`] is called, dropping the
/// handle attempts to kill the process.
#[must_use = "futures do nothing unless polled"]
pub struct Child {
// Standard-library handle of the spawned process.
inner: process::Child,
// Whether `Drop` should try to kill the process.
kill_on_drop: bool,
// Set once `poll_exit` has collected the exit status.
reaped: bool,
// SIGCHLD stream polled by `poll_exit` to learn when to check again.
sigchld: Signal,
}
/// Debug output lists the pid and every field; the signal stream is shown as
/// the placeholder `".."`.
impl fmt::Debug for Child {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let pid = self.inner.id();
        let mut out = f.debug_struct("Child");
        out.field("pid", &pid);
        out.field("inner", &self.inner);
        out.field("kill_on_drop", &self.kill_on_drop);
        out.field("reaped", &self.reaped);
        out.field("sigchld", &"..");
        out.finish()
    }
}
impl Child {
    /// Wraps a freshly spawned process and subscribes to SIGCHLD.
    ///
    /// # Panics
    ///
    /// Panics if the SIGCHLD stream cannot be created.
    fn new(inner: process::Child) -> Child {
        let sigchld = signal(SignalKind::child()).expect("could not get sigchld signal");
        Child {
            inner,
            kill_on_drop: true,
            reaped: false,
            sigchld,
        }
    }

    /// Returns the OS-assigned process identifier of the child.
    pub fn id(&self) -> u32 {
        self.inner.id()
    }

    /// Kills the child via the standard-library handle.
    ///
    /// Once the exit status has been collected by `poll_exit` this is a no-op
    /// that returns `Ok(())`.
    pub fn kill(&mut self) -> io::Result<()> {
        if !self.reaped {
            return self.inner.kill();
        }
        Ok(())
    }

    /// Consumes the handle without killing the process: clears the
    /// kill-on-drop flag before the value is dropped.
    pub fn forget(mut self) {
        self.kill_on_drop = false;
    }

    /// Polls for the child's exit status.
    ///
    /// Checks with a non-blocking `waitpid` first and, while the child is
    /// still running, polls the SIGCHLD stream; returns `Poll::Pending` when
    /// that stream has nothing ready.
    ///
    /// # Panics
    ///
    /// Panics if called again after it has already returned an exit status.
    pub fn poll_exit(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<ExitStatus>> {
        assert!(!self.reaped);
        loop {
            match self.try_wait() {
                // Still running: fall through and wait for the next SIGCHLD.
                Ok(None) => {}
                Ok(Some(status)) => {
                    self.reaped = true;
                    return Poll::Ready(Ok(status));
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
            if self.sigchld.poll_recv(cx).is_pending() {
                return Poll::Pending;
            }
            // A signal was delivered; loop to check the child again.
        }
    }

    /// Non-blocking `waitpid` on this child's pid, retried on `EINTR`.
    ///
    /// Returns `Ok(None)` while the child has not changed state.
    fn try_wait(&self) -> io::Result<Option<ExitStatus>> {
        let pid = self.id() as c_int;
        let mut raw_status = 0;
        loop {
            let ret = unsafe { libc::waitpid(pid, &mut raw_status, libc::WNOHANG) };
            if ret == 0 {
                return Ok(None);
            }
            if ret > 0 {
                assert_eq!(ret, pid);
                return Ok(Some(ExitStatus::from_raw(raw_status)));
            }
            let err = io::Error::last_os_error();
            if err.kind() != io::ErrorKind::Interrupted {
                return Err(err);
            }
        }
    }
}
impl Future for Child {
type Output = std::io::Result<ExitStatus>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_exit(cx)
}
}
/// Best-effort kill on drop; the result of the kill attempt is ignored.
impl Drop for Child {
    fn drop(&mut self) {
        if !self.kill_on_drop {
            return;
        }
        let _ = self.kill();
    }
}
/// Future adapter around a value that can report its PTY file descriptor.
///
/// Polling it calls `as_async_pty_fd` on the wrapped value.
pub struct AsyncPtyFd<T: AsAsyncPtyFd>(T);

impl<T: AsAsyncPtyFd> AsyncPtyFd<T> {
    /// Wraps `inner` so that its descriptor can be awaited.
    pub fn from(inner: T) -> Self {
        Self(inner)
    }
}
impl<T: AsAsyncPtyFd> Future for AsyncPtyFd<T> {
type Output = RawFd;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.0.as_async_pty_fd(cx)
}
}
/// Types that can hand out the raw file descriptor of a PTY master, possibly
/// only once it becomes available.
pub trait AsAsyncPtyFd {
/// Polls for the descriptor; `Poll::Ready` carries the raw fd.
fn as_async_pty_fd(&self, cx: &mut Context<'_>) -> Poll<RawFd>;
}
/// The master always has its descriptor, so this is ready immediately and the
/// task context is unused.
impl AsAsyncPtyFd for AsyncPtyMaster {
    fn as_async_pty_fd(&self, _cx: &mut Context<'_>) -> Poll<RawFd> {
        let fd = AsRawFd::as_raw_fd(self);
        Poll::Ready(fd)
    }
}
/// Async convenience API for querying and changing a PTY's window size.
///
/// A blanket implementation in this file covers every `Send + Sync` type that
/// implements [`PollPtyMaster`].
#[async_trait]
pub trait PtyMaster: PollPtyMaster {
/// Sets the window size; the blanket implementation reads `dimensions` as
/// `(columns, rows)`.
async fn resize(&self, dimensions: (u16, u16)) -> Result<(), io::Error>;
/// Returns the window size; the blanket implementation yields
/// `(columns, rows)`.
async fn size(&self) -> Result<(u16, u16), io::Error>;
}
/// Blanket implementation built on the `Resize` and `GetSize` futures.
#[async_trait]
impl<T> PtyMaster for T
where
    T: PollPtyMaster + Send + Sync,
{
    async fn resize(&self, dimensions: (u16, u16)) -> Result<(), io::Error> {
        // The tuple is (columns, rows).
        let (cols, rows) = dimensions;
        Resize {
            pty: self,
            rows,
            cols,
        }
        .await
    }

    async fn size(&self) -> Result<(u16, u16), io::Error> {
        let query = GetSize(self);
        query.await
    }
}
/// Poll-based PTY master operations.
///
/// This file provides a blanket implementation for every [`AsAsyncPtyFd`] type.
pub trait PollPtyMaster {
/// Polls for the path of the slave device belonging to this master.
fn poll_ptsname(&self, cx: &mut Context<'_>) -> Poll<Result<OsString, io::Error>>;
/// Polls to set the window size. Note the argument order: `rows`, then `cols`.
fn poll_resize(
&self,
cx: &mut Context<'_>,
rows: c_ushort,
cols: c_ushort,
) -> Poll<Result<(), io::Error>>;
/// Polls for the window size; the blanket implementation returns
/// `(columns, rows)`.
fn poll_winsize(&self, cx: &mut Context<'_>) -> Poll<Result<(c_ushort, c_ushort), io::Error>>;
}
/// Implements the PTY operations for anything that can supply a PTY
/// descriptor. Each method first waits for the descriptor and then performs a
/// single libc call on it.
impl<T: AsAsyncPtyFd> PollPtyMaster for T {
    /// Resolves to the slave device path (`ptsname_r`, or `ptsname` on macOS
    /// and FreeBSD).
    fn poll_ptsname(&self, cx: &mut Context<'_>) -> Poll<Result<OsString, io::Error>> {
        let mut buf: [libc::c_char; 512] = [0; 512];
        let fd = futures::ready!(self.as_async_pty_fd(cx));

        #[cfg(not(any(target_os = "macos", target_os = "freebsd")))]
        {
            // SAFETY: `buf` is a valid, writable buffer of `buf.len()` bytes.
            let rc = unsafe { libc::ptsname_r(fd, buf.as_mut_ptr(), buf.len()) };
            if rc != 0 {
                // `ptsname_r` reports failure through its return value; not
                // every libc also sets `errno`. Fall back to `errno` only for
                // implementations that return a negative value.
                let err = if rc > 0 {
                    io::Error::from_raw_os_error(rc)
                } else {
                    io::Error::last_os_error()
                };
                return Poll::Ready(Err(err));
            }
        }
        #[cfg(any(target_os = "macos", target_os = "freebsd"))]
        // SAFETY: `st` is checked for NULL before use and `buf` is large
        // enough for the bounded copy below.
        unsafe {
            let st = libc::ptsname(fd);
            if st.is_null() {
                return Poll::Ready(Err(io::Error::last_os_error()));
            }
            if libc::strlen(st) >= buf.len() {
                return Poll::Ready(Err(io::Error::from_raw_os_error(libc::ERANGE)));
            }
            // Copy at most `len - 1` bytes so the zero-initialised final byte
            // always terminates the string.
            libc::strncpy(buf.as_mut_ptr(), st, buf.len() - 1);
        }

        // SAFETY: `buf` is NUL-terminated on every path that reaches here.
        let ptsname = OsStr::from_bytes(unsafe { CStr::from_ptr(buf.as_ptr()) }.to_bytes());
        Poll::Ready(Ok(ptsname.to_os_string()))
    }

    /// Resolves to the current window size as `(columns, rows)`, read with
    /// `TIOCGWINSZ`.
    fn poll_winsize(&self, cx: &mut Context<'_>) -> Poll<Result<(c_ushort, c_ushort), io::Error>> {
        let fd = futures::ready!(self.as_async_pty_fd(cx));
        // SAFETY: `winsize` is plain integer data, so all-zero is a valid value.
        let mut winsz: libc::winsize = unsafe { std::mem::zeroed() };
        // SAFETY: `winsz` is a valid out-pointer for the duration of the call.
        if unsafe { libc::ioctl(fd, libc::TIOCGWINSZ.into(), &mut winsz) } != 0 {
            return Poll::Ready(Err(io::Error::last_os_error()));
        }
        Poll::Ready(Ok((winsz.ws_col, winsz.ws_row)))
    }

    /// Sets the window size with `TIOCSWINSZ`; the pixel dimensions are set
    /// to zero. Note the argument order: `rows`, then `cols`.
    fn poll_resize(
        &self,
        cx: &mut Context<'_>,
        rows: c_ushort,
        cols: c_ushort,
    ) -> Poll<Result<(), io::Error>> {
        let fd = futures::ready!(self.as_async_pty_fd(cx));
        let winsz = libc::winsize {
            ws_row: rows,
            ws_col: cols,
            ws_xpixel: 0,
            ws_ypixel: 0,
        };
        // SAFETY: `winsz` is a valid pointer for the duration of the call.
        if unsafe { libc::ioctl(fd, libc::TIOCSWINSZ.into(), &winsz) } != 0 {
            return Poll::Ready(Err(io::Error::last_os_error()));
        }
        Poll::Ready(Ok(()))
    }
}
/// Private helper trait that holds the shared spawn implementation used by
/// both public `CommandExt` methods.
trait CommandExtInternal {
/// Spawns the command on the given PTY; `raw` selects whether the slave is
/// switched to raw mode in the child before exec.
fn spawn_pty_async_full(&mut self, ptymaster: &AsyncPtyMaster, raw: bool) -> io::Result<Child>;
}
impl CommandExtInternal for process::Command {
fn spawn_pty_async_full(&mut self, ptymaster: &AsyncPtyMaster, raw: bool) -> io::Result<Child> {
let master_fd = ptymaster.as_raw_fd();
let slave = ptymaster.open_sync_pty_slave()?;
let slave_fd = slave.as_raw_fd();
self.stdin(slave.try_clone()?);
self.stdout(slave.try_clone()?);
self.stderr(slave);
unsafe {
self.pre_exec(move || {
if raw {
let mut attrs: libc::termios = mem::zeroed();
if libc::tcgetattr(slave_fd, &mut attrs as _) != 0 {
return Err(io::Error::last_os_error());
}
libc::cfmakeraw(&mut attrs as _);
if libc::tcsetattr(slave_fd, libc::TCSANOW, &attrs as _) != 0 {
return Err(io::Error::last_os_error());
}
}
if libc::close(master_fd) != 0 {
return Err(io::Error::last_os_error());
}
if libc::setsid() < 0 {
return Err(io::Error::last_os_error());
}
if libc::ioctl(0, libc::TIOCSCTTY.into(), 1) != 0 {
return Err(io::Error::last_os_error());
}
Ok(())
});
}
Ok(Child::new(self.spawn()?))
}
}
/// Extension methods for spawning a `std::process::Command` attached to a PTY.
pub trait CommandExt {
/// Spawns the command with the PTY's slave side as its stdin, stdout and
/// stderr, and returns a [`Child`] handle for it.
fn spawn_pty_async(&mut self, ptymaster: &AsyncPtyMaster) -> io::Result<Child>;
/// Like [`CommandExt::spawn_pty_async`], but the slave is additionally put
/// into raw mode (`cfmakeraw`) in the child before exec.
fn spawn_pty_async_raw(&mut self, ptymaster: &AsyncPtyMaster) -> io::Result<Child>;
}
impl CommandExt for process::Command {
fn spawn_pty_async(&mut self, ptymaster: &AsyncPtyMaster) -> io::Result<Child> {
self.spawn_pty_async_full(ptymaster, false)
}
fn spawn_pty_async_raw(&mut self, ptymaster: &AsyncPtyMaster) -> io::Result<Child> {
self.spawn_pty_async_full(ptymaster, true)
}
}
/// Future that resolves to the window size reported by `poll_winsize`.
struct GetSize<'a, T: PtyMaster + Send>(&'a T);

impl<'a, T: PtyMaster + Send> Future for GetSize<'a, T> {
    type Output = io::Result<(c_ushort, c_ushort)>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let GetSize(pty) = &*self;
        pty.poll_winsize(cx)
    }
}
/// Future that applies a window size through `poll_resize`.
struct Resize<'a, T: PtyMaster + Send> {
    /// PTY whose window size is changed.
    pub pty: &'a T,
    /// Requested number of rows.
    pub rows: c_ushort,
    /// Requested number of columns.
    pub cols: c_ushort,
}

impl<'a, T: PtyMaster + Send> Future for Resize<'a, T> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let request = &*self;
        request.pty.poll_resize(cx, request.rows, request.cols)
    }
}
#[cfg(test)]
mod tests {
    extern crate errno;
    extern crate libc;

    use super::*;
    use futures::executor::block_on;

    /// Spawns `cat` on the PTY. The size tests only do this on macOS
    /// (presumably the window-size ioctls need an attached slave there --
    /// TODO confirm).
    #[cfg(target_os = "macos")]
    fn spawn_cat(master: &AsyncPtyMaster) -> Child {
        std::process::Command::new("cat")
            .spawn_pty_async(master)
            .expect("Could not spawn child")
    }

    /// A raw `read` on a freshly opened master must fail with `EWOULDBLOCK`.
    #[tokio::test]
    async fn basic_nonblocking() {
        let master = AsyncPtyMaster::open().unwrap();
        let fd = master.as_raw_fd();

        let mut buf = [0u8; 128];
        let rval = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, 128) };
        let errno: i32 = errno::errno().into();

        assert_eq!(rval, -1);
        assert_eq!(errno, libc::EWOULDBLOCK as i32);
    }

    /// Resizing through the raw futures is visible through `GetSize`.
    #[tokio::test]
    async fn test_winsize() {
        let master = AsyncPtyMaster::open().expect("Could not open the PTY");
        #[cfg(target_os = "macos")]
        let mut child = spawn_cat(&master);

        let request = Resize {
            pty: &master,
            cols: 80,
            rows: 50,
        };
        block_on(request).expect("Could not resize the PTY");

        let (cols, rows) = block_on(GetSize(&master)).expect("Could not get PTY size");
        assert_eq!(cols, 80);
        assert_eq!(rows, 50);

        #[cfg(target_os = "macos")]
        child.kill().expect("Could not kill child");
    }

    /// `PtyMaster::size` succeeds on a fresh PTY.
    #[tokio::test]
    async fn test_size() {
        let master = AsyncPtyMaster::open().expect("Could not open the PTY");
        #[cfg(target_os = "macos")]
        let mut child = spawn_cat(&master);

        // `size` yields (columns, rows).
        let (_cols, _rows) = master.size().await.expect("Could not get PTY size");

        #[cfg(target_os = "macos")]
        child.kill().expect("Could not kill child");
    }

    /// `PtyMaster::resize` succeeds on a fresh PTY.
    #[tokio::test]
    async fn test_resize() {
        let master = AsyncPtyMaster::open().expect("Could not open the PTY");
        #[cfg(target_os = "macos")]
        let mut child = spawn_cat(&master);

        master.resize((80, 50)).await.expect("resize failed");

        #[cfg(target_os = "macos")]
        child.kill().expect("Could not kill child");
    }

    /// An `AsyncPtyFd` wrapping a master can be awaited.
    #[tokio::test]
    async fn test_from_fd() {
        let master = AsyncPtyMaster::open().expect("Could not open the PTY");
        let _fd = AsyncPtyFd::from(master).await;
    }
}