io_streams/
streams.rs

1use crate::lockers::{StderrLocker, StdinLocker, StdoutLocker};
2#[cfg(feature = "char-device")]
3use char_device::CharDevice;
4use duplex::Duplex;
5use io_extras::grip::{AsRawGrip, AsRawReadWriteGrip, FromRawGrip, RawGrip};
6#[cfg(windows)]
7use io_extras::os::windows::{
8    AsHandleOrSocket, AsRawHandleOrSocket, AsRawReadWriteHandleOrSocket, AsReadWriteHandleOrSocket,
9    BorrowedHandleOrSocket, RawHandleOrSocket,
10};
11use io_extras::raw::{RawReadable, RawWriteable};
12use io_lifetimes::{FromFilelike, FromSocketlike, IntoFilelike, IntoSocketlike};
13use std::fmt::{self, Arguments, Debug};
14use std::fs::{File, OpenOptions};
15use std::io::{self, IoSlice, IoSliceMut, Read, Seek, Write};
16use std::net::TcpStream;
17#[cfg(unix)]
18use std::os::unix::{
19    io::{AsRawFd, RawFd},
20    net::UnixStream,
21};
22#[cfg(target_os = "wasi")]
23use std::os::wasi::io::{AsRawFd, RawFd};
24use system_interface::io::{Peek, ReadReady};
25#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
26use {
27    duplex::HalfDuplex,
28    socketpair::{socketpair_stream, SocketpairStream},
29};
30#[cfg(not(windows))]
31use {
32    io_extras::os::rustix::{AsRawReadWriteFd, AsReadWriteFd},
33    io_lifetimes::{AsFd, BorrowedFd},
34};
35#[cfg(not(target_os = "wasi"))]
36use {
37    // WASI doesn't support pipes yet
38    os_pipe::{pipe, PipeReader, PipeWriter},
39    std::{
40        io::{copy, Cursor},
41        process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, Stdio},
42        thread::{self, JoinHandle},
43    },
44};
45
46/// An unbuffered and unlocked input byte stream, implementing [`Read`],
47/// abstracted over the source of the input.
48///
49/// It primarily consists of a single file handle, and also contains any
50/// resources needed to safely hold the file handle live.
51///
52/// Since it is unbuffered, and since many input sources have high per-call
53/// overhead, it is often beneficial to wrap this in a [`BufReader`].
54///
55/// [`BufReader`]: std::io::BufReader
56pub struct StreamReader {
57    handle: RawReadable,
58    resources: ReadResources,
59}
60
61/// An unbuffered and unlocked output byte stream, implementing [`Write`],
62/// abstracted over the destination of the output.
63///
64/// It primarily consists of a single file handle, and also contains any
65/// resources needed to safely hold the file handle live.
66///
67/// Since it is unbuffered, and since many destinations have high per-call
68/// overhead, it is often beneficial to wrap this in a [`BufWriter`] or
69/// [`LineWriter`].
70///
71/// [`BufWriter`]: std::io::BufWriter
72/// [`LineWriter`]: std::io::LineWriter
73pub struct StreamWriter {
74    handle: RawWriteable,
75    resources: WriteResources,
76}
77
78/// An unbuffered and unlocked interactive combination input and output stream,
79/// implementing [`Read`] and [`Write`].
80///
81/// This may hold two file descriptors, one for reading and one for writing,
82/// such as stdin and stdout, or it may hold one file handle for both
83/// reading and writing, such as for a TCP socket.
84///
85/// There is no `file` constructor, even though [`File`] implements both `Read`
86/// and `Write`, because normal files are not interactive. However, there is a
87/// `char_device` constructor for [character device files].
88///
89/// [`File`]: std::fs::File
90/// [character device files]: https://docs.rs/char-device/latest/char_device/struct.CharDevice.html
91pub struct StreamDuplexer {
92    read_handle: RawReadable,
93    write_handle: RawWriteable,
94    resources: DuplexResources,
95}
96
97// The Windows [`HANDLE`] type may be transferred across and shared between
98// thread boundaries (despite containing a `*mut void`, which in general isn't
99// `Send` or `Sync`).
100//
101// [`HANDLE`]: std::os::windows::raw::HANDLE
102#[cfg(windows)]
103unsafe impl Send for StreamReader {}
104#[cfg(windows)]
105unsafe impl Sync for StreamReader {}
106#[cfg(windows)]
107unsafe impl Send for StreamWriter {}
108#[cfg(windows)]
109unsafe impl Sync for StreamWriter {}
110#[cfg(windows)]
111unsafe impl Send for StreamDuplexer {}
112#[cfg(windows)]
113unsafe impl Sync for StreamDuplexer {}
114
115/// Additional resources that need to be held in order to keep the stream live.
116enum ReadResources {
117    File(File),
118    TcpStream(TcpStream),
119    #[cfg(unix)]
120    UnixStream(UnixStream),
121    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
122    PipeReader(PipeReader),
123    Stdin(StdinLocker),
124    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
125    PipedThread(Option<(PipeReader, JoinHandle<io::Result<()>>)>),
126    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
127    Child(Child),
128    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
129    ChildStdout(ChildStdout),
130    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
131    ChildStderr(ChildStderr),
132}
133
134/// Additional resources that need to be held in order to keep the stream live.
135#[allow(dead_code)] // Fields are never explicitly used, but held because of their `Drop`.
136enum WriteResources {
137    File(File),
138    TcpStream(TcpStream),
139    #[cfg(unix)]
140    UnixStream(UnixStream),
141    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
142    PipeWriter(PipeWriter),
143    Stdout(StdoutLocker),
144    Stderr(StderrLocker),
145    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
146    PipedThread(Option<(PipeWriter, JoinHandle<io::Result<Box<dyn Write + Send>>>)>),
147    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
148    Child(Child),
149    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
150    ChildStdin(ChildStdin),
151}
152
153/// Additional resources that need to be held in order to keep the stream live.
154enum DuplexResources {
155    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
156    PipeReaderWriter((PipeReader, PipeWriter)),
157    StdinStdout((StdinLocker, StdoutLocker)),
158    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
159    Child(Child),
160    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
161    ChildStdoutStdin((ChildStdout, ChildStdin)),
162    #[cfg(feature = "char-device")]
163    CharDevice(CharDevice),
164    DevNull(File),
165    TcpStream(TcpStream),
166    #[cfg(unix)]
167    UnixStream(UnixStream),
168    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
169    SocketpairStream(SocketpairStream),
170    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
171    SocketedThreadFunc(Option<(SocketpairStream, JoinHandle<io::Result<SocketpairStream>>)>),
172    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
173    SocketedThread(
174        Option<(
175            SocketpairStream,
176            JoinHandle<io::Result<Box<dyn HalfDuplex + Send>>>,
177        )>,
178    ),
179    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
180    SocketedThreadReadReady(
181        Option<(
182            SocketpairStream,
183            JoinHandle<io::Result<Box<dyn HalfDuplexReadReady + Send>>>,
184        )>,
185    ),
186}
187
188impl StreamReader {
189    /// Read from standard input.
190    ///
191    /// Unlike [`std::io::stdin`], this `stdin` returns a stream which is
192    /// unbuffered and unlocked.
193    ///
194    /// Since it is unbuffered, it is often beneficial to wrap the resulting
195    /// `StreamReader` in a [`BufReader`].
196    ///
197    /// This acquires a [`std::io::StdinLock`] (in a non-recursive way) to
198    /// prevent accesses to `std::io::Stdin` while this is live, and fails if a
199    /// `StreamReader` or `StreamDuplexer` for standard input already exists.
200    ///
201    /// [`BufReader`]: std::io::BufReader
202    #[inline]
203    pub fn stdin() -> io::Result<Self> {
204        let stdin_locker = StdinLocker::new()?;
205
206        // Obtain stdin's handle.
207        #[cfg(not(windows))]
208        let handle = stdin_locker.as_raw_fd();
209
210        // On Windows, stdin may be connected to a UTF-16 console, which
211        // `RawHandleOrSocket` can take care of for us.
212        #[cfg(windows)]
213        let handle = RawHandleOrSocket::stdin();
214
215        Ok(Self::handle(handle, ReadResources::Stdin(stdin_locker)))
216    }
217
218    /// Read from an open file, taking ownership of it.
219    ///
220    /// This method can be passed a [`std::fs::File`] or similar `File` types.
221    #[inline]
222    #[must_use]
223    pub fn file<Filelike: IntoFilelike + Read + Write + Seek>(filelike: Filelike) -> Self {
224        // Safety: We don't implement `From`/`Into` to allow the inner `File`
225        // to be extracted, so we don't need to worry that we're granting
226        // ambient authorities here.
227        Self::_file(File::from_into_filelike(filelike))
228    }
229
230    #[inline]
231    #[must_use]
232    fn _file(file: File) -> Self {
233        let handle = file.as_raw_grip();
234        Self::handle(handle, ReadResources::File(file))
235    }
236
237    /// Read from an open TCP stream, taking ownership of it.
238    ///
239    /// This method can be passed a [`std::net::TcpStream`] or similar
240    /// `TcpStream` types.
241    #[inline]
242    #[must_use]
243    pub fn tcp_stream<Socketlike: IntoSocketlike>(socketlike: Socketlike) -> Self {
244        Self::_tcp_stream(TcpStream::from_into_socketlike(socketlike))
245    }
246
247    #[inline]
248    #[must_use]
249    fn _tcp_stream(tcp_stream: TcpStream) -> Self {
250        let handle = tcp_stream.as_raw_grip();
251        // Safety: We don't implement `From`/`Into` to allow the inner
252        // `TcpStream` to be extracted, so we don't need to worry that
253        // we're granting ambient authorities here.
254        Self::handle(handle, ReadResources::TcpStream(tcp_stream))
255    }
256
257    /// Read from an open Unix-domain socket, taking ownership of it.
258    #[cfg(unix)]
259    #[inline]
260    #[must_use]
261    pub fn unix_stream(unix_stream: UnixStream) -> Self {
262        let handle = unix_stream.as_raw_grip();
263        Self::handle(handle, ReadResources::UnixStream(unix_stream))
264    }
265
266    /// Read from the reading end of an open pipe, taking ownership of it.
267    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
268    #[inline]
269    #[must_use]
270    pub fn pipe_reader(pipe_reader: PipeReader) -> Self {
271        let handle = pipe_reader.as_raw_grip();
272        Self::handle(handle, ReadResources::PipeReader(pipe_reader))
273    }
274
275    /// Spawn the given command and read from its standard output.
276    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
277    pub fn read_from_command(mut command: Command) -> io::Result<Self> {
278        command.stdin(Stdio::null());
279        command.stdout(Stdio::piped());
280        let child = command.spawn()?;
281        let handle = child.stdout.as_ref().unwrap().as_raw_grip();
282        Ok(Self::handle(handle, ReadResources::Child(child)))
283    }
284
285    /// Read from a child process' standard output, taking ownership of it.
286    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
287    #[inline]
288    #[must_use]
289    pub fn child_stdout(child_stdout: ChildStdout) -> Self {
290        let handle = child_stdout.as_raw_grip();
291        Self::handle(handle, ReadResources::ChildStdout(child_stdout))
292    }
293
294    /// Read from a child process' standard error, taking ownership of it.
295    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
296    #[inline]
297    #[must_use]
298    pub fn child_stderr(child_stderr: ChildStderr) -> Self {
299        let handle = child_stderr.as_raw_grip();
300        Self::handle(handle, ReadResources::ChildStderr(child_stderr))
301    }
302
303    /// Read from a boxed `Read` implementation, taking ownership of it. This
304    /// works by creating a new thread to read the data and write it through a
305    /// pipe.
306    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
307    pub fn piped_thread(mut boxed_read: Box<dyn Read + Send>) -> io::Result<Self> {
308        let (pipe_reader, mut pipe_writer) = pipe()?;
309        let join_handle = thread::Builder::new()
310            .name("piped thread for boxed reader".to_owned())
311            .spawn(move || copy(&mut *boxed_read, &mut pipe_writer).map(|_size| ()))?;
312        let handle = pipe_reader.as_raw_grip();
313        Ok(Self::handle(
314            handle,
315            ReadResources::PipedThread(Some((pipe_reader, join_handle))),
316        ))
317    }
318
319    /// Read from the null device, which produces no data.
320    pub fn null() -> io::Result<Self> {
321        #[cfg(not(windows))]
322        {
323            Ok(Self::file(File::open("/dev/null")?))
324        }
325
326        #[cfg(windows)]
327        {
328            Ok(Self::file(File::open("nul")?))
329        }
330    }
331
332    /// Read from the given string.
333    #[inline]
334    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
335    pub fn str<S: AsRef<str>>(s: S) -> io::Result<Self> {
336        Self::bytes(s.as_ref().as_bytes())
337    }
338
339    /// Read from the given bytes.
340    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
341    pub fn bytes(bytes: &[u8]) -> io::Result<Self> {
342        // If we can write it to a new pipe without blocking, do so.
343        #[cfg(not(any(windows, target_os = "redox")))]
344        if bytes.len() <= rustix::pipe::PIPE_BUF {
345            let (pipe_reader, mut pipe_writer) = pipe()?;
346
347            pipe_writer.write_all(bytes)?;
348            pipe_writer.flush()?;
349            drop(pipe_writer);
350
351            let handle = pipe_reader.as_raw_grip();
352            return Ok(Self::handle(handle, ReadResources::PipeReader(pipe_reader)));
353        }
354
355        // Otherwise, launch a thread.
356        Self::piped_thread(Box::new(Cursor::new(bytes.to_vec())))
357    }
358
359    #[inline]
360    #[must_use]
361    fn handle(handle: RawGrip, resources: ReadResources) -> Self {
362        Self {
363            handle: unsafe { RawReadable::from_raw_grip(handle) },
364            resources,
365        }
366    }
367
368    fn map_err(&mut self, e: io::Error) -> io::Error {
369        match &mut self.resources {
370            #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
371            ReadResources::PipedThread(piped_thread) => {
372                let (pipe_reader, join_handle) = piped_thread.take().unwrap();
373                drop(pipe_reader);
374                join_handle.join().unwrap().unwrap_err()
375            }
376            _ => e,
377        }
378    }
379}
380
381impl StreamWriter {
382    /// Write to standard output.
383    ///
384    /// Unlike [`std::io::stdout`], this `stdout` returns a stream which is
385    /// unbuffered and unlocked.
386    ///
387    /// Since it is unbuffered, it is often beneficial to wrap the resulting
388    /// `StreamWriter` in a [`BufWriter`] or [`LineWriter`].
389    ///
390    /// This acquires a [`std::io::StdoutLock`] (in a non-recursive way) to
391    /// prevent accesses to `std::io::Stdout` while this is live, and fails if
392    /// a `StreamWriter` or `StreamDuplexer` for standard output already
393    /// exists.
394    ///
395    /// [`BufWriter`]: std::io::BufWriter
396    /// [`LineWriter`]: std::io::LineWriter
397    #[inline]
398    pub fn stdout() -> io::Result<Self> {
399        let stdout_locker = StdoutLocker::new()?;
400
401        // Obtain stdout's handle.
402        #[cfg(not(windows))]
403        let handle = stdout_locker.as_raw_fd();
404
405        // On Windows, stdout may be connected to a UTF-16 console, which
406        // `RawHandleOrSocket` can take care of for us.
407        #[cfg(windows)]
408        let handle = RawHandleOrSocket::stdout();
409
410        Ok(Self::handle(handle, WriteResources::Stdout(stdout_locker)))
411    }
412
413    /// Write to standard error.
414    ///
415    /// Like [`std::io::stderr`], this `stderr` returns a stream which is
416    /// unbuffered. However, unlike [`std::io::stderr`], the stream is also
417    /// unlocked.
418    ///
419    /// Since it is unbuffered, it is often beneficial to wrap the resulting
420    /// `StreamWriter` in a [`BufWriter`] or [`LineWriter`].
421    ///
422    /// This acquires a [`std::io::StderrLock`] (in a non-recursive way) to
423    /// prevent accesses to `std::io::Stderr` while this is live, and fails if
424    /// a `StreamWriter` or `StreamDuplexer` for standard output already
425    /// exists.
426    ///
427    /// [`BufWriter`]: std::io::BufWriter
428    /// [`LineWriter`]: std::io::LineWriter
429    #[inline]
430    pub fn stderr() -> io::Result<Self> {
431        let stderr_locker = StderrLocker::new()?;
432
433        // Obtain stdout's handle.
434        #[cfg(not(windows))]
435        let handle = stderr_locker.as_raw_fd();
436
437        // On Windows, stdout may be connected to a UTF-16 console, which
438        // `RawHandleOrSocket` can take care of for us.
439        #[cfg(windows)]
440        let handle = RawHandleOrSocket::stderr();
441
442        Ok(Self::handle(handle, WriteResources::Stderr(stderr_locker)))
443    }
444
445    /// Write to an open file, taking ownership of it.
446    ///
447    /// This method can be passed a [`std::fs::File`] or similar `File` types.
448    #[inline]
449    #[must_use]
450    pub fn file<Filelike: IntoFilelike + Read + Write + Seek>(filelike: Filelike) -> Self {
451        // Safety: We don't implement `From`/`Into` to allow the inner `File`
452        // to be extracted, so we don't need to worry that we're granting
453        // ambient authorities here.
454        Self::_file(File::from_into_filelike(filelike))
455    }
456
457    #[inline]
458    #[must_use]
459    fn _file(file: File) -> Self {
460        let handle = file.as_raw_grip();
461        Self::handle(handle, WriteResources::File(file))
462    }
463
464    /// Write to an open TCP stream, taking ownership of it.
465    ///
466    /// This method can be passed a [`std::net::TcpStream`] or similar
467    /// `TcpStream` types.
468    #[inline]
469    #[must_use]
470    pub fn tcp_stream<Socketlike: IntoSocketlike>(socketlike: Socketlike) -> Self {
471        // Safety: We don't implement `From`/`Into` to allow the inner
472        // `TcpStream` to be extracted, so we don't need to worry that we're
473        // granting ambient authorities here.
474        Self::_tcp_stream(TcpStream::from_into_socketlike(socketlike))
475    }
476
477    #[inline]
478    #[must_use]
479    fn _tcp_stream(tcp_stream: TcpStream) -> Self {
480        let handle = tcp_stream.as_raw_grip();
481        Self::handle(handle, WriteResources::TcpStream(tcp_stream))
482    }
483
484    /// Write to an open Unix-domain stream, taking ownership of it.
485    #[cfg(unix)]
486    #[inline]
487    #[must_use]
488    pub fn unix_stream(unix_stream: UnixStream) -> Self {
489        let handle = unix_stream.as_raw_grip();
490        Self::handle(handle, WriteResources::UnixStream(unix_stream))
491    }
492
493    /// Write to the writing end of an open pipe, taking ownership of it.
494    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
495    #[inline]
496    #[must_use]
497    pub fn pipe_writer(pipe_writer: PipeWriter) -> Self {
498        let handle = pipe_writer.as_raw_grip();
499        Self::handle(handle, WriteResources::PipeWriter(pipe_writer))
500    }
501
502    /// Spawn the given command and write to its standard input. Its standard
503    /// output is redirected to `Stdio::null()`.
504    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
505    pub fn write_to_command(mut command: Command) -> io::Result<Self> {
506        command.stdin(Stdio::piped());
507        command.stdout(Stdio::null());
508        let child = command.spawn()?;
509        let handle = child.stdin.as_ref().unwrap().as_raw_grip();
510        Ok(Self::handle(handle, WriteResources::Child(child)))
511    }
512
513    /// Write to the given child standard input, taking ownership of it.
514    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
515    #[inline]
516    #[must_use]
517    pub fn child_stdin(child_stdin: ChildStdin) -> Self {
518        let handle = child_stdin.as_raw_grip();
519        Self::handle(handle, WriteResources::ChildStdin(child_stdin))
520    }
521
522    /// Write to a boxed `Write` implementation, taking ownership of it. This
523    /// works by creating a new thread to read the data through a pipe and
524    /// write it.
525    ///
526    /// Writes to the pipe aren't synchronous with writes to the boxed `Write`
527    /// implementation. To ensure data is flushed all the way through the
528    /// thread and into the boxed `Write` implementation, call [`flush`]`()`,
529    /// which synchronizes with the thread to ensure that is has completed
530    /// writing all pending output.
531    ///
532    /// [`flush`]: https://doc.rust-lang.org/std/io/trait.Write.html#tymethod.flush
533    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
534    pub fn piped_thread(mut boxed_write: Box<dyn Write + Send>) -> io::Result<Self> {
535        let (mut pipe_reader, pipe_writer) = pipe()?;
536        let join_handle = thread::Builder::new()
537            .name("piped thread for boxed writer".to_owned())
538            .spawn(move || {
539                copy(&mut pipe_reader, &mut *boxed_write)?;
540                boxed_write.flush()?;
541                Ok(boxed_write)
542            })?;
543        let handle = pipe_writer.as_raw_grip();
544        Ok(Self::handle(
545            handle,
546            WriteResources::PipedThread(Some((pipe_writer, join_handle))),
547        ))
548    }
549
550    /// Write to the null device, which ignores all data.
551    pub fn null() -> io::Result<Self> {
552        #[cfg(not(windows))]
553        {
554            Ok(Self::file(File::create("/dev/null")?))
555        }
556
557        #[cfg(windows)]
558        {
559            Ok(Self::file(File::create("nul")?))
560        }
561    }
562
563    #[inline]
564    fn handle(handle: RawGrip, resources: WriteResources) -> Self {
565        Self {
566            handle: unsafe { RawWriteable::from_raw_grip(handle) },
567            resources,
568        }
569    }
570
571    fn map_err(&mut self, e: io::Error) -> io::Error {
572        match &mut self.resources {
573            #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
574            WriteResources::PipedThread(piped_thread) => {
575                let (pipe_writer, join_handle) = piped_thread.take().unwrap();
576                drop(pipe_writer);
577                join_handle.join().unwrap().map(|_| ()).unwrap_err()
578            }
579            _ => e,
580        }
581    }
582}
583
584impl StreamDuplexer {
585    /// Duplex with stdin and stdout, taking ownership of them.
586    ///
587    /// Unlike [`std::io::stdin`] and [`std::io::stdout`], this `stdin_stdout`
588    /// returns a stream which is unbuffered and unlocked.
589    ///
590    /// This acquires a [`std::io::StdinLock`] and a [`std::io::StdoutLock`]
591    /// (in non-recursive ways) to prevent accesses to [`std::io::Stdin`] and
592    /// [`std::io::Stdout`] while this is live, and fails if a `StreamReader`
593    /// for standard input, a `StreamWriter` for standard output, or a
594    /// `StreamDuplexer` for standard input and standard output already exist.
595    #[inline]
596    pub fn stdin_stdout() -> io::Result<Self> {
597        let stdin_locker = StdinLocker::new()?;
598        let stdout_locker = StdoutLocker::new()?;
599
600        // Obtain stdin's and stdout's handles.
601        #[cfg(not(windows))]
602        let (read, write) = (stdin_locker.as_raw_grip(), stdout_locker.as_raw_grip());
603
604        // On Windows, stdin and stdout may be connected to a UTF-16 console,
605        // which `RawHandleOrSocket` can take care of for us.
606        #[cfg(windows)]
607        let (read, write) = (RawHandleOrSocket::stdin(), RawHandleOrSocket::stdout());
608
609        Ok(Self::two_handles(
610            read,
611            write,
612            DuplexResources::StdinStdout((stdin_locker, stdout_locker)),
613        ))
614    }
615
616    /// Duplex with an open character device, taking ownership of it.
617    #[cfg(feature = "char-device")]
618    #[cfg_attr(docsrs, doc(cfg(feature = "char-device")))]
619    #[inline]
620    #[must_use]
621    pub fn char_device(char_device: CharDevice) -> Self {
622        let handle = char_device.as_raw_grip();
623        Self::handle(handle, DuplexResources::CharDevice(char_device))
624    }
625
626    /// Duplex with an open TCP stream, taking ownership of it.
627    ///
628    /// This method can be passed a [`std::net::TcpStream`] or similar
629    /// `TcpStream` types.
630    #[inline]
631    #[must_use]
632    pub fn tcp_stream<Socketlike: IntoSocketlike>(socketlike: Socketlike) -> Self {
633        Self::_tcp_stream(TcpStream::from_into_socketlike(socketlike))
634    }
635
636    #[inline]
637    #[must_use]
638    fn _tcp_stream(tcp_stream: TcpStream) -> Self {
639        let handle = tcp_stream.as_raw_grip();
640        // Safety: We don't implement `From`/`Into` to allow the inner
641        // `TcpStream` to be extracted, so we don't need to worry that
642        // we're granting ambient authorities here.
643        Self::handle(handle, DuplexResources::TcpStream(tcp_stream))
644    }
645
646    /// Duplex with an open Unix-domain stream, taking ownership of it.
647    #[cfg(unix)]
648    #[must_use]
649    pub fn unix_stream(unix_stream: UnixStream) -> Self {
650        let handle = unix_stream.as_raw_grip();
651        Self::handle(handle, DuplexResources::UnixStream(unix_stream))
652    }
653
654    /// Duplex with a pair of pipe streams, taking ownership of them.
655    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
656    #[inline]
657    #[must_use]
658    pub fn pipe_reader_writer(pipe_reader: PipeReader, pipe_writer: PipeWriter) -> Self {
659        let read = pipe_reader.as_raw_grip();
660        let write = pipe_writer.as_raw_grip();
661        Self::two_handles(
662            read,
663            write,
664            DuplexResources::PipeReaderWriter((pipe_reader, pipe_writer)),
665        )
666    }
667
668    /// Duplex with one end of a socketpair stream, taking ownership of it.
669    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
670    #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
671    #[must_use]
672    pub fn socketpair_stream(stream: SocketpairStream) -> Self {
673        let handle = stream.as_raw_grip();
674        Self::handle(handle, DuplexResources::SocketpairStream(stream))
675    }
676
677    /// Spawn the given command and duplex with its standard input and output.
678    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
679    pub fn duplex_with_command(mut command: Command) -> io::Result<Self> {
680        command.stdin(Stdio::piped());
681        command.stdout(Stdio::piped());
682        let child = command.spawn()?;
683        let read = child.stdout.as_ref().unwrap().as_raw_grip();
684        let write = child.stdin.as_ref().unwrap().as_raw_grip();
685        Ok(Self::two_handles(
686            read,
687            write,
688            DuplexResources::Child(child),
689        ))
690    }
691
692    /// Duplex with a child process' stdout and stdin, taking ownership of
693    /// them.
694    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
695    #[inline]
696    #[must_use]
697    pub fn child_stdout_stdin(child_stdout: ChildStdout, child_stdin: ChildStdin) -> Self {
698        let read = child_stdout.as_raw_grip();
699        let write = child_stdin.as_raw_grip();
700        Self::two_handles(
701            read,
702            write,
703            DuplexResources::ChildStdoutStdin((child_stdout, child_stdin)),
704        )
705    }
706
707    /// Duplex with a duplexer from on another thread through a socketpair.
708    ///
709    /// A socketpair is created, new thread is created, `boxed_duplex` is
710    /// read from and written to over the socketpair.
711    ///
712    /// Writes to the pipe aren't synchronous with writes to the boxed `Write`
713    /// implementation. To ensure data is flushed all the way through the
714    /// thread and into the boxed `Write` implementation, call `flush()`, which
715    /// synchronizes with the thread to ensure that is has completed writing
716    /// all pending output.
717    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
718    #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
719    pub fn socketed_thread_read_first(
720        mut boxed_duplex: Box<dyn HalfDuplex + Send>,
721    ) -> io::Result<Self> {
722        let (a, b) = socketpair_stream()?;
723        let join_handle = thread::Builder::new()
724            .name("socketed thread for boxed duplexer".to_owned())
725            .spawn(move || {
726                read_first(a, &mut *boxed_duplex)?;
727                Ok(boxed_duplex)
728            })?;
729        let handle = b.as_raw_grip();
730        Ok(Self::handle(
731            handle,
732            DuplexResources::SocketedThread(Some((b, join_handle))),
733        ))
734    }
735
736    /// Duplex with a duplexer from on another thread through a socketpair.
737    ///
738    /// A socketpair is created, new thread is created, `boxed_duplex` is
739    /// written to and read from over the socketpair.
740    ///
741    /// Writes to the pipe aren't synchronous with writes to the boxed `Write`
742    /// implementation. To ensure data is flushed all the way through the
743    /// thread and into the boxed `Write` implementation, call `flush()`, which
744    /// synchronizes with the thread to ensure that is has completed writing
745    /// all pending output.
746    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
747    #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
748    pub fn socketed_thread_write_first(
749        mut boxed_duplex: Box<dyn HalfDuplex + Send>,
750    ) -> io::Result<Self> {
751        let (a, b) = socketpair_stream()?;
752        let join_handle = thread::Builder::new()
753            .name("socketed thread for boxed duplexer".to_owned())
754            .spawn(move || {
755                write_first(a, &mut *boxed_duplex)?;
756                Ok(boxed_duplex)
757            })?;
758        let handle = b.as_raw_grip();
759        Ok(Self::handle(
760            handle,
761            DuplexResources::SocketedThread(Some((b, join_handle))),
762        ))
763    }
764
765    /// Duplex with a duplexer from on another thread through a socketpair.
766    ///
767    /// A socketpair is created, new thread is created, `boxed_duplex` is
768    /// written to and/or read from over the socketpair.
769    /// `ReadReady::num_ready_bytes` is used to determine whether to read from
770    /// or write to `boxed_duplex` first. This may be inefficient, so if you
771    /// know which direction should go first, use `socketed_thread_read_first`
772    /// or `socketed_thread_write_first` instead.
773    ///
774    /// Writes to the pipe aren't synchronous with writes to the boxed `Write`
775    /// implementation. To ensure data is flushed all the way through the
776    /// thread and into the boxed `Write` implementation, call `flush()`, which
777    /// synchronizes with the thread to ensure that is has completed writing
778    /// all pending output.
779    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
780    #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
781    pub fn socketed_thread(
782        mut boxed_duplex: Box<dyn HalfDuplexReadReady + Send>,
783    ) -> io::Result<Self> {
784        let (a, b) = socketpair_stream()?;
785        let join_handle = thread::Builder::new()
786            .name("socketed thread for boxed duplexer".to_owned())
787            .spawn(move || {
788                loop {
789                    if a.num_ready_bytes()? != 0 {
790                        write_first(a, &mut *boxed_duplex)?;
791                        break;
792                    }
793                    if boxed_duplex.num_ready_bytes()? != 0 {
794                        read_first(a, &mut *boxed_duplex)?;
795                        break;
796                    }
797
798                    // TODO: Implement a nicer way to wait for either of
799                    // the streams to be ready to read.
800                    std::thread::sleep(std::time::Duration::from_secs(1));
801                }
802                Ok(boxed_duplex)
803            })?;
804        let handle = b.as_raw_grip();
805        Ok(Self::handle(
806            handle,
807            DuplexResources::SocketedThreadReadReady(Some((b, join_handle))),
808        ))
809    }
810
811    /// Duplex with a function running on another thread through a socketpair.
812    ///
813    /// A socketpair is created, new thread is created, `func` is called in the
814    /// new thread and passed one of the ends of the socketstream.
815    ///
816    /// Writes to the pipe aren't synchronous with writes to the boxed `Write`
817    /// implementation. To ensure data is flushed all the way through the
818    /// thread and into the boxed `Write` implementation, call `flush()`, which
819    /// synchronizes with the thread to ensure that is has completed writing
820    /// all pending output.
821    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
822    #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
823    pub fn socketed_thread_func(
824        func: Box<dyn Send + FnOnce(SocketpairStream) -> io::Result<SocketpairStream>>,
825    ) -> io::Result<Self> {
826        let (a, b) = socketpair_stream()?;
827        let join_handle = thread::Builder::new()
828            .name("socketed thread for boxed duplexer".to_owned())
829            .spawn(move || func(a))?;
830        let handle = b.as_raw_grip();
831        Ok(Self::handle(
832            handle,
833            DuplexResources::SocketedThreadFunc(Some((b, join_handle))),
834        ))
835    }
836
837    /// Read and write with the null device, which ignores all data, and
838    /// produces no data.
839    pub fn null() -> io::Result<Self> {
840        #[cfg(not(windows))]
841        let file = OpenOptions::new()
842            .read(true)
843            .write(true)
844            .open("/dev/null")?;
845
846        #[cfg(windows)]
847        let file = OpenOptions::new().read(true).write(true).open("nul")?;
848
849        let handle = file.as_raw_grip();
850        Ok(Self::handle(handle, DuplexResources::DevNull(file)))
851    }
852
853    #[inline]
854    #[must_use]
855    fn handle(handle: RawGrip, resources: DuplexResources) -> Self {
856        Self {
857            read_handle: unsafe { RawReadable::from_raw_grip(handle) },
858            write_handle: unsafe { RawWriteable::from_raw_grip(handle) },
859            resources,
860        }
861    }
862
863    #[inline]
864    #[must_use]
865    fn two_handles(read: RawGrip, write: RawGrip, resources: DuplexResources) -> Self {
866        Self {
867            read_handle: unsafe { RawReadable::from_raw_grip(read) },
868            write_handle: unsafe { RawWriteable::from_raw_grip(write) },
869            resources,
870        }
871    }
872
873    fn map_err(&mut self, e: io::Error) -> io::Error {
874        match &mut self.resources {
875            _ => e,
876        }
877    }
878}
879
880impl Read for StreamReader {
881    #[inline]
882    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
883        match self.handle.read(buf) {
884            Ok(size) => Ok(size),
885            Err(e) => Err(self.map_err(e)),
886        }
887    }
888
889    #[inline]
890    fn read_vectored(&mut self, bufs: &mut [IoSliceMut]) -> io::Result<usize> {
891        match self.handle.read_vectored(bufs) {
892            Ok(size) => Ok(size),
893            Err(e) => Err(self.map_err(e)),
894        }
895    }
896
897    #[cfg(can_vector)]
898    #[inline]
899    fn is_read_vectored(&self) -> bool {
900        self.handle.is_read_vectored()
901    }
902
903    #[inline]
904    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
905        match self.handle.read_to_end(buf) {
906            Ok(size) => Ok(size),
907            Err(e) => Err(self.map_err(e)),
908        }
909    }
910
911    #[inline]
912    fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
913        match self.handle.read_to_string(buf) {
914            Ok(size) => Ok(size),
915            Err(e) => Err(self.map_err(e)),
916        }
917    }
918
919    #[inline]
920    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
921        match self.handle.read_exact(buf) {
922            Ok(()) => Ok(()),
923            Err(e) => Err(self.map_err(e)),
924        }
925    }
926}
927
928impl Peek for StreamReader {
929    fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
930        match &mut self.resources {
931            ReadResources::File(file) => Peek::peek(file, buf),
932            ReadResources::TcpStream(tcp_stream) => Peek::peek(tcp_stream, buf),
933            #[cfg(unix)]
934            ReadResources::UnixStream(unix_stream) => Peek::peek(unix_stream, buf),
935            _ => Ok(0),
936        }
937    }
938}
939
940impl ReadReady for StreamReader {
941    fn num_ready_bytes(&self) -> io::Result<u64> {
942        match &self.resources {
943            ReadResources::File(file) => ReadReady::num_ready_bytes(file),
944            ReadResources::TcpStream(tcp_stream) => ReadReady::num_ready_bytes(tcp_stream),
945            #[cfg(unix)]
946            ReadResources::UnixStream(unix_stream) => ReadReady::num_ready_bytes(unix_stream),
947            ReadResources::PipeReader(pipe_reader) => ReadReady::num_ready_bytes(pipe_reader),
948            ReadResources::Stdin(stdin) => ReadReady::num_ready_bytes(stdin),
949            #[cfg(not(target_os = "wasi"))]
950            ReadResources::PipedThread(piped_thread) => {
951                ReadReady::num_ready_bytes(&piped_thread.as_ref().unwrap().0)
952            }
953            #[cfg(not(target_os = "wasi"))]
954            ReadResources::Child(child) => {
955                ReadReady::num_ready_bytes(child.stdout.as_ref().unwrap())
956            }
957            #[cfg(not(target_os = "wasi"))]
958            ReadResources::ChildStdout(child_stdout) => ReadReady::num_ready_bytes(child_stdout),
959            #[cfg(not(target_os = "wasi"))]
960            ReadResources::ChildStderr(child_stderr) => ReadReady::num_ready_bytes(child_stderr),
961        }
962    }
963}
964
965impl Write for StreamWriter {
966    #[inline]
967    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
968        match self.handle.write(buf) {
969            Ok(size) => Ok(size),
970            Err(e) => Err(self.map_err(e)),
971        }
972    }
973
974    #[inline]
975    fn flush(&mut self) -> io::Result<()> {
976        match self.handle.flush() {
977            Ok(()) => {
978                // There's no way to send a flush event through a pipe, so for
979                // now, force a flush by closing the pipe, waiting for the
980                // thread to exit, recover the boxed writer, and then wrap it
981                // in a whole new piped thread.
982                #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
983                if let WriteResources::PipedThread(piped_thread) = &mut self.resources {
984                    let (mut pipe_writer, join_handle) = piped_thread.take().unwrap();
985                    pipe_writer.flush()?;
986                    drop(pipe_writer);
987                    let boxed_write = join_handle.join().unwrap().unwrap();
988                    *self = Self::piped_thread(boxed_write)?;
989                }
990                Ok(())
991            }
992            Err(e) => Err(self.map_err(e)),
993        }
994    }
995
996    #[inline]
997    fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
998        match self.handle.write_vectored(bufs) {
999            Ok(size) => Ok(size),
1000            Err(e) => Err(self.map_err(e)),
1001        }
1002    }
1003
1004    #[cfg(can_vector)]
1005    #[inline]
1006    fn is_write_vectored(&self) -> bool {
1007        self.handle.is_write_vectored()
1008    }
1009
1010    #[inline]
1011    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
1012        match self.handle.write_all(buf) {
1013            Ok(()) => Ok(()),
1014            Err(e) => Err(self.map_err(e)),
1015        }
1016    }
1017
1018    #[cfg(write_all_vectored)]
1019    #[inline]
1020    fn write_all_vectored(&mut self, bufs: &mut [IoSlice]) -> io::Result<()> {
1021        match self.handle.write_all_vectored(bufs) {
1022            Ok(()) => Ok(()),
1023            Err(e) => Err(self.map_err(e)),
1024        }
1025    }
1026
1027    #[inline]
1028    fn write_fmt(&mut self, fmt: Arguments) -> io::Result<()> {
1029        match self.handle.write_fmt(fmt) {
1030            Ok(()) => Ok(()),
1031            Err(e) => Err(self.map_err(e)),
1032        }
1033    }
1034}
1035
1036impl Read for StreamDuplexer {
1037    #[inline]
1038    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1039        match self.read_handle.read(buf) {
1040            Ok(size) => Ok(size),
1041            Err(e) => Err(self.map_err(e)),
1042        }
1043    }
1044
1045    #[inline]
1046    fn read_vectored(&mut self, bufs: &mut [IoSliceMut]) -> io::Result<usize> {
1047        match self.read_handle.read_vectored(bufs) {
1048            Ok(size) => Ok(size),
1049            Err(e) => Err(self.map_err(e)),
1050        }
1051    }
1052
1053    #[cfg(can_vector)]
1054    #[inline]
1055    fn is_read_vectored(&self) -> bool {
1056        self.read_handle.is_read_vectored()
1057    }
1058
1059    #[inline]
1060    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
1061        match self.read_handle.read_to_end(buf) {
1062            Ok(size) => Ok(size),
1063            Err(e) => Err(self.map_err(e)),
1064        }
1065    }
1066
1067    #[inline]
1068    fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
1069        match self.read_handle.read_to_string(buf) {
1070            Ok(size) => Ok(size),
1071            Err(e) => Err(self.map_err(e)),
1072        }
1073    }
1074
1075    #[inline]
1076    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
1077        match self.read_handle.read_exact(buf) {
1078            Ok(()) => Ok(()),
1079            Err(e) => Err(self.map_err(e)),
1080        }
1081    }
1082}
1083
1084impl Peek for StreamDuplexer {
1085    fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1086        match &mut self.resources {
1087            DuplexResources::TcpStream(tcp_stream) => Peek::peek(tcp_stream, buf),
1088            #[cfg(unix)]
1089            DuplexResources::UnixStream(unix_stream) => Peek::peek(unix_stream, buf),
1090            #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1091            DuplexResources::SocketpairStream(socketpair) => Peek::peek(socketpair, buf),
1092            #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1093            DuplexResources::SocketedThreadFunc(socketed_thread) => {
1094                Peek::peek(&mut socketed_thread.as_mut().unwrap().0, buf)
1095            }
1096            #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1097            DuplexResources::SocketedThreadReadReady(socketed_thread) => {
1098                Peek::peek(&mut socketed_thread.as_mut().unwrap().0, buf)
1099            }
1100            _ => Ok(0),
1101        }
1102    }
1103}
1104
1105impl ReadReady for StreamDuplexer {
1106    fn num_ready_bytes(&self) -> io::Result<u64> {
1107        match &self.resources {
1108            #[cfg(not(target_os = "wasi"))]
1109            DuplexResources::PipeReaderWriter((pipe_reader, _)) => {
1110                ReadReady::num_ready_bytes(pipe_reader)
1111            }
1112            DuplexResources::StdinStdout((stdin, _)) => ReadReady::num_ready_bytes(stdin),
1113            #[cfg(not(target_os = "wasi"))]
1114            DuplexResources::Child(child) => {
1115                ReadReady::num_ready_bytes(child.stdout.as_ref().unwrap())
1116            }
1117            #[cfg(not(target_os = "wasi"))]
1118            DuplexResources::ChildStdoutStdin((child_stdout, _)) => {
1119                ReadReady::num_ready_bytes(child_stdout)
1120            }
1121            #[cfg(feature = "char-device")]
1122            DuplexResources::CharDevice(char_device) => ReadReady::num_ready_bytes(char_device),
1123            DuplexResources::DevNull(file) => ReadReady::num_ready_bytes(file),
1124            DuplexResources::TcpStream(tcp_stream) => ReadReady::num_ready_bytes(tcp_stream),
1125            #[cfg(unix)]
1126            DuplexResources::UnixStream(unix_stream) => ReadReady::num_ready_bytes(unix_stream),
1127            #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1128            DuplexResources::SocketpairStream(socketpair_stream) => {
1129                ReadReady::num_ready_bytes(socketpair_stream)
1130            }
1131            #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1132            DuplexResources::SocketedThreadFunc(socketed_thread) => {
1133                ReadReady::num_ready_bytes(&socketed_thread.as_ref().unwrap().0)
1134            }
1135            #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1136            DuplexResources::SocketedThread(socketed_thread) => {
1137                ReadReady::num_ready_bytes(&socketed_thread.as_ref().unwrap().0)
1138            }
1139            #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1140            DuplexResources::SocketedThreadReadReady(socketed_thread) => {
1141                ReadReady::num_ready_bytes(&socketed_thread.as_ref().unwrap().0)
1142            }
1143        }
1144    }
1145}
1146
1147impl Write for StreamDuplexer {
1148    #[inline]
1149    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1150        match self.write_handle.write(buf) {
1151            Ok(size) => Ok(size),
1152            Err(e) => Err(self.map_err(e)),
1153        }
1154    }
1155
1156    #[inline]
1157    fn flush(&mut self) -> io::Result<()> {
1158        match self.write_handle.flush() {
1159            Ok(()) => Ok(()),
1160            Err(e) => Err(self.map_err(e)),
1161        }
1162    }
1163
1164    #[inline]
1165    fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
1166        match self.write_handle.write_vectored(bufs) {
1167            Ok(size) => Ok(size),
1168            Err(e) => Err(self.map_err(e)),
1169        }
1170    }
1171
1172    #[cfg(can_vector)]
1173    #[inline]
1174    fn is_write_vectored(&self) -> bool {
1175        self.write_handle.is_write_vectored()
1176    }
1177
1178    #[inline]
1179    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
1180        match self.write_handle.write_all(buf) {
1181            Ok(()) => Ok(()),
1182            Err(e) => Err(self.map_err(e)),
1183        }
1184    }
1185
1186    #[cfg(write_all_vectored)]
1187    #[inline]
1188    fn write_all_vectored(&mut self, bufs: &mut [IoSlice]) -> io::Result<()> {
1189        match self.write_handle.write_all_vectored(bufs) {
1190            Ok(()) => Ok(()),
1191            Err(e) => Err(self.map_err(e)),
1192        }
1193    }
1194
1195    #[inline]
1196    fn write_fmt(&mut self, fmt: Arguments) -> io::Result<()> {
1197        match self.write_handle.write_fmt(fmt) {
1198            Ok(()) => Ok(()),
1199            Err(e) => Err(self.map_err(e)),
1200        }
1201    }
1202}
1203
1204impl Duplex for StreamDuplexer {}
1205
1206#[cfg(not(windows))]
1207impl AsRawFd for StreamReader {
1208    #[inline]
1209    fn as_raw_fd(&self) -> RawFd {
1210        self.handle.as_raw_fd()
1211    }
1212}
1213
1214#[cfg(not(windows))]
1215impl AsRawFd for StreamWriter {
1216    #[inline]
1217    fn as_raw_fd(&self) -> RawFd {
1218        self.handle.as_raw_fd()
1219    }
1220}
1221
1222#[cfg(not(windows))]
1223impl AsRawReadWriteFd for StreamDuplexer {
1224    #[inline]
1225    fn as_raw_read_fd(&self) -> RawFd {
1226        self.read_handle.as_raw_fd()
1227    }
1228
1229    #[inline]
1230    fn as_raw_write_fd(&self) -> RawFd {
1231        self.write_handle.as_raw_fd()
1232    }
1233}
1234
1235#[cfg(windows)]
1236impl AsRawHandleOrSocket for StreamReader {
1237    #[inline]
1238    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
1239        self.handle.as_raw_handle_or_socket()
1240    }
1241}
1242
1243#[cfg(windows)]
1244impl AsRawHandleOrSocket for StreamWriter {
1245    #[inline]
1246    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
1247        self.handle.as_raw_handle_or_socket()
1248    }
1249}
1250
1251#[cfg(windows)]
1252impl AsRawReadWriteHandleOrSocket for StreamDuplexer {
1253    #[inline]
1254    fn as_raw_read_handle_or_socket(&self) -> RawHandleOrSocket {
1255        self.read_handle.as_raw_handle_or_socket()
1256    }
1257
1258    #[inline]
1259    fn as_raw_write_handle_or_socket(&self) -> RawHandleOrSocket {
1260        self.write_handle.as_raw_handle_or_socket()
1261    }
1262}
1263
1264#[cfg(not(windows))]
1265impl AsFd for StreamReader {
1266    #[inline]
1267    fn as_fd(&self) -> BorrowedFd<'_> {
1268        unsafe { BorrowedFd::borrow_raw(self.handle.as_raw_fd()) }
1269    }
1270}
1271
1272#[cfg(not(windows))]
1273impl AsFd for StreamWriter {
1274    #[inline]
1275    fn as_fd(&self) -> BorrowedFd<'_> {
1276        unsafe { BorrowedFd::borrow_raw(self.handle.as_raw_fd()) }
1277    }
1278}
1279
1280#[cfg(not(windows))]
1281impl AsReadWriteFd for StreamDuplexer {
1282    #[inline]
1283    fn as_read_fd(&self) -> BorrowedFd<'_> {
1284        unsafe { BorrowedFd::borrow_raw(self.read_handle.as_raw_fd()) }
1285    }
1286
1287    #[inline]
1288    fn as_write_fd(&self) -> BorrowedFd<'_> {
1289        unsafe { BorrowedFd::borrow_raw(self.write_handle.as_raw_fd()) }
1290    }
1291}
1292
1293#[cfg(windows)]
1294impl AsHandleOrSocket for StreamReader {
1295    #[inline]
1296    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
1297        unsafe { BorrowedHandleOrSocket::borrow_raw(self.handle.as_raw_handle_or_socket()) }
1298    }
1299}
1300
1301#[cfg(windows)]
1302impl AsHandleOrSocket for StreamWriter {
1303    #[inline]
1304    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
1305        unsafe { BorrowedHandleOrSocket::borrow_raw(self.handle.as_raw_handle_or_socket()) }
1306    }
1307}
1308
1309#[cfg(windows)]
1310impl AsReadWriteHandleOrSocket for StreamDuplexer {
1311    #[inline]
1312    fn as_read_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
1313        unsafe { BorrowedHandleOrSocket::borrow_raw(self.read_handle.as_raw_handle_or_socket()) }
1314    }
1315
1316    #[inline]
1317    fn as_write_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
1318        unsafe { BorrowedHandleOrSocket::borrow_raw(self.write_handle.as_raw_handle_or_socket()) }
1319    }
1320}
1321
1322impl Drop for ReadResources {
1323    fn drop(&mut self) {
1324        match self {
1325            #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
1326            Self::PipedThread(piped_thread) => {
1327                let (pipe_reader, join_handle) = piped_thread.take().unwrap();
1328                drop(pipe_reader);
1329                // If the thread was still writing, we may have just caused
1330                // it to fail with `BrokenPipe`; ignore such errors because
1331                // we're dropping the stream.
1332                match join_handle.join().unwrap() {
1333                    Ok(()) => (),
1334                    Err(e) if e.kind() == io::ErrorKind::BrokenPipe => (),
1335                    Err(e) => Err(e).unwrap(),
1336                }
1337            }
1338            _ => {}
1339        }
1340    }
1341}
1342
1343impl Drop for WriteResources {
1344    fn drop(&mut self) {
1345        match self {
1346            #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
1347            Self::PipedThread(piped_thread) => {
1348                if let Some((pipe_writer, join_handle)) = piped_thread.take() {
1349                    drop(pipe_writer);
1350                    join_handle.join().unwrap().unwrap();
1351                }
1352            }
1353            _ => {}
1354        }
1355    }
1356}
1357
1358impl Drop for DuplexResources {
1359    fn drop(&mut self) {
1360        match self {
1361            #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1362            Self::SocketedThreadFunc(socketed_thread) => {
1363                if let Some((socketpair, join_handle)) = socketed_thread.take() {
1364                    drop(socketpair);
1365                    join_handle.join().unwrap().unwrap();
1366                }
1367            }
1368            _ => {}
1369        }
1370    }
1371}
1372
1373impl Debug for StreamReader {
1374    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1375        // Just print the fd number; don't try to print the path or any
1376        // information about it, because this information is otherwise
1377        // unavailable to safe Rust code.
1378        f.debug_struct("StreamReader")
1379            .field("raw_grip", &self.as_raw_grip())
1380            .finish()
1381    }
1382}
1383
1384impl Debug for StreamWriter {
1385    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1386        // Just print the fd number; don't try to print the path or any
1387        // information about it, because this information is otherwise
1388        // unavailable to safe Rust code.
1389        f.debug_struct("StreamWriter")
1390            .field("raw_grip", &self.as_raw_grip())
1391            .finish()
1392    }
1393}
1394
1395impl Debug for StreamDuplexer {
1396    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1397        // Just print the fd numbers; don't try to print the path or any
1398        // information about it, because this information is otherwise
1399        // unavailable to safe Rust code.
1400        f.debug_struct("StreamDuplexer")
1401            .field("unsafe_readable", &self.as_raw_read_grip())
1402            .field("unsafe_writeable", &self.as_raw_write_grip())
1403            .finish()
1404    }
1405}
1406
1407/// A trait that combines [`HalfDuplex`] and [`ReadReady`]. Implemented via
1408/// blanket implementation.
1409#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1410#[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
1411pub trait HalfDuplexReadReady: HalfDuplex + ReadReady {}
1412
1413#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1414impl<T: HalfDuplex + ReadReady> HalfDuplexReadReady for T {}
1415
1416#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1417fn read_first<T: HalfDuplex + ?Sized>(
1418    mut a: SocketpairStream,
1419    boxed_duplex: &mut T,
1420) -> io::Result<()> {
1421    'thread: loop {
1422        let mut buf = vec![0_u8; crate::buffered::DEFAULT_BUF_SIZE];
1423
1424        loop {
1425            let n = match boxed_duplex.read(&mut buf) {
1426                Ok(0) => break 'thread,
1427                Ok(n) => n,
1428                Err(e) if e.kind() == io::ErrorKind::Interrupted => 0,
1429                Err(e) => return Err(e),
1430            };
1431            a.write_all(&buf[..n])?;
1432            if n < buf.len() {
1433                break;
1434            }
1435        }
1436
1437        loop {
1438            let n = match a.read(&mut buf) {
1439                Ok(0) => break 'thread,
1440                Ok(n) => n,
1441                Err(e) if e.kind() == io::ErrorKind::Interrupted => 0,
1442                Err(e) => return Err(e),
1443            };
1444            boxed_duplex.write_all(&buf[..n])?;
1445            if n < buf.len() {
1446                break;
1447            }
1448        }
1449    }
1450    Ok(())
1451}
1452
1453#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1454fn write_first<T: HalfDuplex + ?Sized>(
1455    mut a: SocketpairStream,
1456    boxed_duplex: &mut T,
1457) -> io::Result<()> {
1458    'thread: loop {
1459        let mut buf = [0_u8; crate::buffered::DEFAULT_BUF_SIZE];
1460
1461        loop {
1462            let n = match a.read(&mut buf) {
1463                Ok(0) => break 'thread,
1464                Ok(n) => n,
1465                Err(e) if e.kind() == io::ErrorKind::Interrupted => 0,
1466                Err(e) => return Err(e),
1467            };
1468            boxed_duplex.write_all(&buf[..n])?;
1469            if n < buf.len() {
1470                break;
1471            }
1472        }
1473
1474        loop {
1475            let n = match boxed_duplex.read(&mut buf) {
1476                Ok(0) => break 'thread,
1477                Ok(n) => n,
1478                Err(e) if e.kind() == io::ErrorKind::Interrupted => 0,
1479                Err(e) => return Err(e),
1480            };
1481            a.write_all(&buf[..n])?;
1482            if n < buf.len() {
1483                break;
1484            }
1485        }
1486    }
1487    Ok(())
1488}