pty_process/
pty.rs

1#![allow(clippy::module_name_repetitions)]
2
3use std::io::{Read as _, Write as _};
4
5type AsyncPty = tokio::io::unix::AsyncFd<crate::sys::Pty>;
6
7/// Allocate and return a new pty and pts.
8///
9/// # Errors
10/// Returns an error if the pty failed to be allocated, or if we were
11/// unable to put it into non-blocking mode.
12pub fn open() -> crate::Result<(Pty, Pts)> {
13    let pty = crate::sys::Pty::open()?;
14    let pts = pty.pts()?;
15    pty.set_nonblocking()?;
16    let pty = tokio::io::unix::AsyncFd::new(pty)?;
17    Ok((Pty(pty), Pts(pts)))
18}
19
20/// An allocated pty
21pub struct Pty(AsyncPty);
22
23impl Pty {
24    /// Use the provided file descriptor as a pty.
25    ///
26    /// # Safety
27    /// The provided file descriptor must be valid, open, belong to a pty,
28    /// and put into nonblocking mode.
29    ///
30    /// # Errors
31    /// Returns an error if it fails to be registered with the async runtime.
32    pub unsafe fn from_fd(fd: std::os::fd::OwnedFd) -> crate::Result<Self> {
33        Ok(Self(tokio::io::unix::AsyncFd::new(unsafe {
34            crate::sys::Pty::from_fd(fd)
35        })?))
36    }
37
38    /// Change the terminal size associated with the pty.
39    ///
40    /// # Errors
41    /// Returns an error if we were unable to set the terminal size.
42    pub fn resize(&self, size: crate::Size) -> crate::Result<()> {
43        self.0.get_ref().set_term_size(size)
44    }
45
46    /// Splits a `Pty` into a read half and a write half, which can be used to
47    /// read from and write to the pty concurrently. Does not allocate, but
48    /// the returned halves cannot be moved to independent tasks.
49    pub fn split(&mut self) -> (ReadPty<'_>, WritePty<'_>) {
50        (ReadPty(&self.0), WritePty(&self.0))
51    }
52
53    /// Splits a `Pty` into a read half and a write half, which can be used to
54    /// read from and write to the pty concurrently. This method requires an
55    /// allocation, but the returned halves can be moved to independent tasks.
56    /// The original `Pty` instance can be recovered via the
57    /// [`OwnedReadPty::unsplit`] method.
58    #[must_use]
59    pub fn into_split(self) -> (OwnedReadPty, OwnedWritePty) {
60        let Self(pt) = self;
61        let read_pt = std::sync::Arc::new(pt);
62        let write_pt = std::sync::Arc::clone(&read_pt);
63        (OwnedReadPty(read_pt), OwnedWritePty(write_pt))
64    }
65}
66
67impl From<Pty> for std::os::fd::OwnedFd {
68    fn from(pty: Pty) -> Self {
69        pty.0.into_inner().into()
70    }
71}
72
73impl std::os::fd::AsFd for Pty {
74    fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
75        self.0.get_ref().as_fd()
76    }
77}
78
79impl std::os::fd::AsRawFd for Pty {
80    fn as_raw_fd(&self) -> std::os::fd::RawFd {
81        self.0.get_ref().as_raw_fd()
82    }
83}
84
85impl tokio::io::AsyncRead for Pty {
86    fn poll_read(
87        self: std::pin::Pin<&mut Self>,
88        cx: &mut std::task::Context<'_>,
89        buf: &mut tokio::io::ReadBuf,
90    ) -> std::task::Poll<std::io::Result<()>> {
91        loop {
92            let mut guard = match self.0.poll_read_ready(cx) {
93                std::task::Poll::Ready(guard) => guard,
94                std::task::Poll::Pending => return std::task::Poll::Pending,
95            }?;
96            // XXX should be able to optimize this once read_buf is stabilized
97            // in std
98            let b = buf.initialize_unfilled();
99            match guard.try_io(|inner| inner.get_ref().read(b)) {
100                Ok(Ok(bytes)) => {
101                    buf.advance(bytes);
102                    return std::task::Poll::Ready(Ok(()));
103                }
104                Ok(Err(e)) => return std::task::Poll::Ready(Err(e)),
105                Err(_would_block) => {}
106            }
107        }
108    }
109}
110
111impl tokio::io::AsyncWrite for Pty {
112    fn poll_write(
113        self: std::pin::Pin<&mut Self>,
114        cx: &mut std::task::Context<'_>,
115        buf: &[u8],
116    ) -> std::task::Poll<std::io::Result<usize>> {
117        loop {
118            let mut guard = match self.0.poll_write_ready(cx) {
119                std::task::Poll::Ready(guard) => guard,
120                std::task::Poll::Pending => return std::task::Poll::Pending,
121            }?;
122            match guard.try_io(|inner| inner.get_ref().write(buf)) {
123                Ok(result) => return std::task::Poll::Ready(result),
124                Err(_would_block) => {}
125            }
126        }
127    }
128
129    fn poll_flush(
130        self: std::pin::Pin<&mut Self>,
131        cx: &mut std::task::Context<'_>,
132    ) -> std::task::Poll<std::io::Result<()>> {
133        loop {
134            let mut guard = match self.0.poll_write_ready(cx) {
135                std::task::Poll::Ready(guard) => guard,
136                std::task::Poll::Pending => return std::task::Poll::Pending,
137            }?;
138            match guard.try_io(|inner| inner.get_ref().flush()) {
139                Ok(_) => return std::task::Poll::Ready(Ok(())),
140                Err(_would_block) => {}
141            }
142        }
143    }
144
145    fn poll_shutdown(
146        self: std::pin::Pin<&mut Self>,
147        _cx: &mut std::task::Context<'_>,
148    ) -> std::task::Poll<Result<(), std::io::Error>> {
149        std::task::Poll::Ready(Ok(()))
150    }
151}
152
153/// The child end of the pty
154///
155/// See [`open`] and [`Command::spawn`](crate::Command::spawn)
156pub struct Pts(pub(crate) crate::sys::Pts);
157
158impl Pts {
159    /// Use the provided file descriptor as a pts.
160    ///
161    /// # Safety
162    /// The provided file descriptor must be valid, open, and belong to the
163    /// child end of a pty.
164    #[must_use]
165    pub unsafe fn from_fd(fd: std::os::fd::OwnedFd) -> Self {
166        Self(unsafe { crate::sys::Pts::from_fd(fd) })
167    }
168}
169
170impl std::os::fd::AsFd for Pts {
171    fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
172        self.0.as_fd()
173    }
174}
175
176impl std::os::fd::AsRawFd for Pts {
177    fn as_raw_fd(&self) -> std::os::fd::RawFd {
178        self.0.as_raw_fd()
179    }
180}
181
182/// Borrowed read half of a [`Pty`]
183pub struct ReadPty<'a>(&'a AsyncPty);
184
185impl tokio::io::AsyncRead for ReadPty<'_> {
186    fn poll_read(
187        self: std::pin::Pin<&mut Self>,
188        cx: &mut std::task::Context<'_>,
189        buf: &mut tokio::io::ReadBuf,
190    ) -> std::task::Poll<std::io::Result<()>> {
191        loop {
192            let mut guard = match self.0.poll_read_ready(cx) {
193                std::task::Poll::Ready(guard) => guard,
194                std::task::Poll::Pending => return std::task::Poll::Pending,
195            }?;
196            // XXX should be able to optimize this once read_buf is stabilized
197            // in std
198            let b = buf.initialize_unfilled();
199            match guard.try_io(|inner| inner.get_ref().read(b)) {
200                Ok(Ok(bytes)) => {
201                    buf.advance(bytes);
202                    return std::task::Poll::Ready(Ok(()));
203                }
204                Ok(Err(e)) => return std::task::Poll::Ready(Err(e)),
205                Err(_would_block) => {}
206            }
207        }
208    }
209}
210
211/// Borrowed write half of a [`Pty`]
212pub struct WritePty<'a>(&'a AsyncPty);
213
214impl WritePty<'_> {
215    /// Change the terminal size associated with the pty.
216    ///
217    /// # Errors
218    /// Returns an error if we were unable to set the terminal size.
219    pub fn resize(&self, size: crate::Size) -> crate::Result<()> {
220        self.0.get_ref().set_term_size(size)
221    }
222}
223
224impl tokio::io::AsyncWrite for WritePty<'_> {
225    fn poll_write(
226        self: std::pin::Pin<&mut Self>,
227        cx: &mut std::task::Context<'_>,
228        buf: &[u8],
229    ) -> std::task::Poll<std::io::Result<usize>> {
230        loop {
231            let mut guard = match self.0.poll_write_ready(cx) {
232                std::task::Poll::Ready(guard) => guard,
233                std::task::Poll::Pending => return std::task::Poll::Pending,
234            }?;
235            match guard.try_io(|inner| inner.get_ref().write(buf)) {
236                Ok(result) => return std::task::Poll::Ready(result),
237                Err(_would_block) => {}
238            }
239        }
240    }
241
242    fn poll_flush(
243        self: std::pin::Pin<&mut Self>,
244        cx: &mut std::task::Context<'_>,
245    ) -> std::task::Poll<std::io::Result<()>> {
246        loop {
247            let mut guard = match self.0.poll_write_ready(cx) {
248                std::task::Poll::Ready(guard) => guard,
249                std::task::Poll::Pending => return std::task::Poll::Pending,
250            }?;
251            match guard.try_io(|inner| inner.get_ref().flush()) {
252                Ok(_) => return std::task::Poll::Ready(Ok(())),
253                Err(_would_block) => {}
254            }
255        }
256    }
257
258    fn poll_shutdown(
259        self: std::pin::Pin<&mut Self>,
260        _cx: &mut std::task::Context<'_>,
261    ) -> std::task::Poll<Result<(), std::io::Error>> {
262        std::task::Poll::Ready(Ok(()))
263    }
264}
265
266/// Owned read half of a [`Pty`]
267#[derive(Debug)]
268pub struct OwnedReadPty(std::sync::Arc<AsyncPty>);
269
270impl OwnedReadPty {
271    /// Attempt to join the two halves of a `Pty` back into a single instance.
272    /// The two halves must have originated from calling
273    /// [`into_split`](Pty::into_split) on a single instance.
274    ///
275    /// # Errors
276    /// Returns an error if the two halves came from different [`Pty`]
277    /// instances. The mismatched halves are returned as part of the error.
278    pub fn unsplit(self, write_half: OwnedWritePty) -> crate::Result<Pty> {
279        let Self(read_pt) = self;
280        let OwnedWritePty(write_pt) = write_half;
281        if std::sync::Arc::ptr_eq(&read_pt, &write_pt) {
282            drop(write_pt);
283            Ok(Pty(std::sync::Arc::try_unwrap(read_pt)
284                // it shouldn't be possible for more than two references to
285                // the same pty to exist
286                .unwrap_or_else(|_| unreachable!())))
287        } else {
288            Err(crate::Error::Unsplit(
289                Self(read_pt),
290                OwnedWritePty(write_pt),
291            ))
292        }
293    }
294}
295
296impl tokio::io::AsyncRead for OwnedReadPty {
297    fn poll_read(
298        self: std::pin::Pin<&mut Self>,
299        cx: &mut std::task::Context<'_>,
300        buf: &mut tokio::io::ReadBuf,
301    ) -> std::task::Poll<std::io::Result<()>> {
302        loop {
303            let mut guard = match self.0.poll_read_ready(cx) {
304                std::task::Poll::Ready(guard) => guard,
305                std::task::Poll::Pending => return std::task::Poll::Pending,
306            }?;
307            // XXX should be able to optimize this once read_buf is stabilized
308            // in std
309            let b = buf.initialize_unfilled();
310            match guard.try_io(|inner| inner.get_ref().read(b)) {
311                Ok(Ok(bytes)) => {
312                    buf.advance(bytes);
313                    return std::task::Poll::Ready(Ok(()));
314                }
315                Ok(Err(e)) => return std::task::Poll::Ready(Err(e)),
316                Err(_would_block) => {}
317            }
318        }
319    }
320}
321
322/// Owned write half of a [`Pty`]
323#[derive(Debug)]
324pub struct OwnedWritePty(std::sync::Arc<AsyncPty>);
325
326impl OwnedWritePty {
327    /// Change the terminal size associated with the pty.
328    ///
329    /// # Errors
330    /// Returns an error if we were unable to set the terminal size.
331    pub fn resize(&self, size: crate::Size) -> crate::Result<()> {
332        self.0.get_ref().set_term_size(size)
333    }
334}
335
336impl tokio::io::AsyncWrite for OwnedWritePty {
337    fn poll_write(
338        self: std::pin::Pin<&mut Self>,
339        cx: &mut std::task::Context<'_>,
340        buf: &[u8],
341    ) -> std::task::Poll<std::io::Result<usize>> {
342        loop {
343            let mut guard = match self.0.poll_write_ready(cx) {
344                std::task::Poll::Ready(guard) => guard,
345                std::task::Poll::Pending => return std::task::Poll::Pending,
346            }?;
347            match guard.try_io(|inner| inner.get_ref().write(buf)) {
348                Ok(result) => return std::task::Poll::Ready(result),
349                Err(_would_block) => {}
350            }
351        }
352    }
353
354    fn poll_flush(
355        self: std::pin::Pin<&mut Self>,
356        cx: &mut std::task::Context<'_>,
357    ) -> std::task::Poll<std::io::Result<()>> {
358        loop {
359            let mut guard = match self.0.poll_write_ready(cx) {
360                std::task::Poll::Ready(guard) => guard,
361                std::task::Poll::Pending => return std::task::Poll::Pending,
362            }?;
363            match guard.try_io(|inner| inner.get_ref().flush()) {
364                Ok(_) => return std::task::Poll::Ready(Ok(())),
365                Err(_would_block) => {}
366            }
367        }
368    }
369
370    fn poll_shutdown(
371        self: std::pin::Pin<&mut Self>,
372        _cx: &mut std::task::Context<'_>,
373    ) -> std::task::Poll<Result<(), std::io::Error>> {
374        std::task::Poll::Ready(Ok(()))
375    }
376}