tokio_pipe/
lib.rs

1#![doc(html_root_url = "https://docs.rs/tokio-pipe/0.2.12")]
2//! Asynchronous pipe(2) library using tokio.
3//!
4//! # Example
5//!
6//! ```
7//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
8//!
9//! #[tokio::main]
10//! async fn main() -> anyhow::Result<()> {
11//!     let (mut r, mut w) = tokio_pipe::pipe()?;
12//!
13//!     w.write_all(b"HELLO, WORLD!").await?;
14//!
15//!     let mut buf = [0; 16];
16//!     let len = r.read(&mut buf[..]).await?;
17//!
18//!     assert_eq!(&buf[..len], &b"HELLO, WORLD!"[..]);
19//!     Ok(())
20//! }
21//! ```
22use std::cmp;
23use std::convert::TryFrom;
24use std::ffi::c_void;
25use std::fmt;
26use std::io;
27use std::mem;
28use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
29use std::pin::Pin;
30#[cfg(target_os = "linux")]
31use std::ptr;
32use std::task::{Context, Poll};
33
34use tokio::io::unix::AsyncFd;
35use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
36
37#[cfg(target_os = "linux")]
38pub use libc::off64_t;
39
40pub use libc::PIPE_BUF;
41
// Upper bound on the byte count handed to a single `read`/`write` call.
// NOTE(review): presumably macOS rejects lengths of `INT_MAX` or more, hence
// the smaller cap on that target — confirm against the platform docs.
#[cfg(target_os = "macos")]
const MAX_LEN: usize = <libc::c_int>::MAX as usize - 1;

// On every other target the cap is `ssize_t::MAX`.
#[cfg(not(target_os = "macos"))]
const MAX_LEN: usize = <libc::ssize_t>::MAX as usize;
47
/// Evaluates a libc call and yields its return value; if the call returned
/// `-1`, returns early from the enclosing function with the last OS error.
macro_rules! try_libc {
    ($e: expr) => {
        match $e {
            -1 => return Err(io::Error::last_os_error()),
            value => value,
        }
    };
}
57
/// Converts the return value of a libc call into an `io::Result`: `-1`
/// becomes `Err` carrying the last OS error, anything else becomes `Ok`.
macro_rules! cvt {
    ($e:expr) => {
        match $e {
            -1 => Err(io::Error::last_os_error()),
            value => Ok(value),
        }
    };
}
68
/// Unwraps a `Poll::Ready` value, or returns `Poll::Pending` from the
/// enclosing function when the operation is not ready yet.
macro_rules! ready {
    ($e:expr) => {
        match $e {
            Poll::Ready(value) => value,
            Poll::Pending => return Poll::Pending,
        }
    };
}
77
/// Returns `true` when `err` has kind [`io::ErrorKind::WouldBlock`].
fn is_wouldblock(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock)
}
81
82unsafe fn set_nonblocking(fd: RawFd) {
83    let status_flags = libc::fcntl(fd, libc::F_GETFL);
84    if (status_flags & libc::O_NONBLOCK) == 0 {
85        libc::fcntl(fd, libc::F_SETFL, status_flags | libc::O_NONBLOCK);
86    }
87}
88
89unsafe fn set_nonblocking_checked(fd: RawFd, status_flags: libc::c_int) -> Result<(), io::Error> {
90    if (status_flags & libc::O_NONBLOCK) == 0 {
91        let res = libc::fcntl(fd, libc::F_SETFL, status_flags | libc::O_NONBLOCK);
92        try_libc!(res);
93    }
94
95    Ok(())
96}
97
98/// Return whether (reader is ready, writer is ready).
99///
100/// Readiness for reader/writer does not just mean readable/writable,
101/// they are also considered as ready if they aqre disconnected or an
102/// exceptional condition has occured (`libc::POLLERR`).
103#[cfg(target_os = "linux")]
104unsafe fn test_read_write_readiness(reader: RawFd, writer: RawFd) -> io::Result<(bool, bool)> {
105    use libc::{poll, pollfd, POLLERR, POLLHUP, POLLIN, POLLNVAL, POLLOUT};
106
107    let mut fds = [
108        pollfd {
109            fd: reader,
110            events: POLLIN,
111            revents: 0,
112        },
113        pollfd {
114            fd: writer,
115            events: POLLOUT,
116            revents: 0,
117        },
118    ];
119
120    // Specify timeout to 0 so that it returns immediately.
121    try_libc!(poll(&mut fds[0], 2, 0));
122
123    let is_read_ready = match fds[0].revents {
124        POLLERR | POLLHUP | POLLIN => true,
125        POLLNVAL => {
126            return Err(io::Error::new(
127                io::ErrorKind::InvalidInput,
128                "fd of reader is invalid",
129            ))
130        }
131        _ => false,
132    };
133
134    let is_writer_ready = match fds[1].revents {
135        POLLERR | POLLHUP | POLLOUT => true,
136        POLLNVAL => {
137            return Err(io::Error::new(
138                io::ErrorKind::InvalidInput,
139                "fd of writer is invalid",
140            ))
141        }
142        _ => false,
143    };
144
145    Ok((is_read_ready, is_writer_ready))
146}
147
148fn check_pipe(fd: RawFd) -> Result<(), io::Error> {
149    let mut stat = mem::MaybeUninit::<libc::stat>::uninit();
150
151    try_libc!(unsafe { libc::fstat(fd, stat.as_mut_ptr()) });
152
153    let stat = unsafe { stat.assume_init() };
154    if (stat.st_mode & libc::S_IFMT) == libc::S_IFIFO {
155        Ok(())
156    } else {
157        Err(io::Error::new(io::ErrorKind::Other, "Fd is not a pipe"))
158    }
159}
160
161fn get_status_flags(fd: RawFd) -> Result<libc::c_int, io::Error> {
162    Ok(try_libc!(unsafe { libc::fcntl(fd, libc::F_GETFL) }))
163}
164
// Owning wrapper around a raw pipe fd; the fd is closed when the wrapper is
// dropped.
// needs impl AsRawFd for RawFd (^v1.48)
#[derive(Debug)]
// Optimize size of `Option<PipeFd>` by manually specifying the range.
// Shamelessly taken from [`io-lifetimes::OwnedFd`](https://github.com/sunfishcode/io-lifetimes/blob/8669b5a9fc1d0604d1105f6e39c77fa633ac9c71/src/types.rs#L99).
#[cfg_attr(rustc_attrs, rustc_layout_scalar_valid_range_start(0))]
// libstd/os/raw/mod.rs assures me that every libstd-supported platform has a
// 32-bit c_int.
//
// Below is -2, in two's complement, but that only works out
// because c_int is 32 bits.
#[cfg_attr(rustc_attrs, rustc_layout_scalar_valid_range_end(0xFF_FF_FF_FE))]
struct PipeFd(RawFd);
177
178impl PipeFd {
179    /// * `fd` - PipeFd would take the ownership of this fd.
180    /// * `readable` - true for the read end, false for the write end
181    fn from_raw_fd_checked(fd: RawFd, readable: bool) -> Result<Self, io::Error> {
182        let (access_mode, errmsg) = if readable {
183            (libc::O_RDONLY, "Fd is not the read end")
184        } else {
185            (libc::O_WRONLY, "Fd is not the write end")
186        };
187
188        check_pipe(fd)?;
189        let status_flags = get_status_flags(fd)?;
190        if (status_flags & libc::O_ACCMODE) == access_mode {
191            unsafe { set_nonblocking_checked(fd, status_flags) }?;
192            Ok(Self(fd))
193        } else {
194            Err(io::Error::new(io::ErrorKind::Other, errmsg))
195        }
196    }
197}
198
199impl AsRawFd for PipeFd {
200    fn as_raw_fd(&self) -> RawFd {
201        self.0
202    }
203}
204
205impl Drop for PipeFd {
206    fn drop(&mut self) {
207        let _ = unsafe { libc::close(self.0) };
208    }
209}
210
211/// A buffer that can be written atomically
212#[derive(Copy, Clone, Debug)]
213pub struct AtomicWriteBuffer<'a>(&'a [u8]);
214impl<'a> AtomicWriteBuffer<'a> {
215    /// If buffer is more than PIPE_BUF, then return None.
216    pub fn new(buffer: &'a [u8]) -> Option<Self> {
217        if buffer.len() <= PIPE_BUF {
218            Some(Self(buffer))
219        } else {
220            None
221        }
222    }
223
224    pub fn into_inner(self) -> &'a [u8] {
225        self.0
226    }
227}
228
229/// `IoSlice`s that can be written atomically
230#[derive(Copy, Clone, Debug)]
231pub struct AtomicWriteIoSlices<'a, 'b>(&'a [io::IoSlice<'b>], usize);
232impl<'a, 'b> AtomicWriteIoSlices<'a, 'b> {
233    /// If total length is more than PIPE_BUF, then return None.
234    pub fn new(buffers: &'a [io::IoSlice<'b>]) -> Option<Self> {
235        let mut total_len = 0;
236
237        for buffer in buffers {
238            total_len += buffer.len();
239
240            if total_len > PIPE_BUF {
241                return None;
242            }
243        }
244
245        Some(Self(buffers, total_len))
246    }
247
248    pub fn get_total_len(self) -> usize {
249        self.1
250    }
251
252    pub fn into_inner(self) -> &'a [io::IoSlice<'b>] {
253        self.0
254    }
255}
256
/// Shared-reference implementation behind `tee`: repeatedly issues a
/// non-blocking `tee(2)` from `pipe_in` to `pipe_out` until it either succeeds
/// or fails with an error other than `WouldBlock`.
///
/// Returns the value reported by `tee(2)` on success.
#[cfg(target_os = "linux")]
async fn tee_impl(pipe_in: &PipeRead, pipe_out: &PipeWrite, len: usize) -> io::Result<usize> {
    // There is only one reader and one writer, so each side only needs to be
    // polled once up front; the guards are refreshed below when necessary.
    let mut read_ready = pipe_in.0.readable().await?;
    let mut write_ready = pipe_out.0.writable().await?;

    loop {
        let ret = unsafe {
            libc::tee(
                pipe_in.as_raw_fd(),
                pipe_out.as_raw_fd(),
                len,
                libc::SPLICE_F_NONBLOCK,
            )
        };
        match cvt!(ret) {
            Err(e) if is_wouldblock(&e) => {
                // Since tokio might use epoll's edge-triggered mode, we cannot blindly
                // clear the readiness, otherwise it would block forever.
                //
                // So what we do instead is to use test_read_write_readiness, which
                // uses poll to test for readiness.
                //
                // Poll always uses level-triggered mode and it does not require
                // any registration at all.
                let (read_readiness, write_readiness) = unsafe {
                    test_read_write_readiness(pipe_in.as_raw_fd(), pipe_out.as_raw_fd())?
                };

                // Only the side that poll reports as not ready has its cached
                // readiness cleared and is awaited again.
                if !read_readiness {
                    read_ready.clear_ready();
                    read_ready = pipe_in.0.readable().await?;
                }

                if !write_readiness {
                    write_ready.clear_ready();
                    write_ready = pipe_out.0.writable().await?;
                }
            }
            Err(e) => break Err(e),
            Ok(ret) => break Ok(ret as usize),
        }
    }
}
301
302/// Duplicates up to len bytes of data from pipe_in to pipe_out.
303///
304/// It does not consume the data that is duplicated from pipe_in; therefore, that data
305/// can be copied by a subsequent splice.
306#[cfg(target_os = "linux")]
307pub async fn tee(
308    pipe_in: &mut PipeRead,
309    pipe_out: &mut PipeWrite,
310    len: usize,
311) -> io::Result<usize> {
312    tee_impl(pipe_in, pipe_out, len).await
313}
314
/// Converts an optional mutable reference into a raw pointer, using a null
/// pointer for `None`.
#[cfg(target_os = "linux")]
fn as_ptr<T>(option: Option<&mut T>) -> *mut T {
    option.map_or(ptr::null_mut(), |target| target as *mut T)
}
322
/// Shared implementation of the splice entry points: repeatedly issues a
/// non-blocking `splice(2)` from `fd_in` to `fd_out` until it either succeeds
/// or fails with an error other than `WouldBlock`.
///
/// * `off_in` / `off_out` - optional offsets passed to `splice(2)` by
///   pointer; `None` is passed as a null pointer.
/// * `len` - maximum number of bytes to transfer.
/// * `has_more_data` - adds `SPLICE_F_MORE` to the flags when true.
///
/// Returns the value reported by `splice(2)` on success.
#[cfg(target_os = "linux")]
async fn splice_impl(
    fd_in: &mut AsyncFd<impl AsRawFd>,
    mut off_in: Option<&mut off64_t>,
    fd_out: &AsyncFd<impl AsRawFd>,
    mut off_out: Option<&mut off64_t>,
    len: usize,
    has_more_data: bool,
) -> io::Result<usize> {
    // There is only one reader and one writer, so each side only needs to be
    // polled once up front; the guards are refreshed below when necessary.
    let mut read_ready = fd_in.readable().await?;
    let mut write_ready = fd_out.writable().await?;

    // Prepare args for the syscall
    let flags = libc::SPLICE_F_NONBLOCK
        | if has_more_data {
            libc::SPLICE_F_MORE
        } else {
            0
        };

    loop {
        let ret = unsafe {
            libc::splice(
                fd_in.as_raw_fd(),
                as_ptr(off_in.as_deref_mut()),
                fd_out.as_raw_fd(),
                as_ptr(off_out.as_deref_mut()),
                len,
                flags,
            )
        };
        match cvt!(ret) {
            Err(e) if is_wouldblock(&e) => {
                // Since tokio might use epoll's edge-triggered mode, we cannot blindly
                // clear the readiness, otherwise it would block forever.
                //
                // So what we do instead is to use test_read_write_readiness, which
                // uses poll to test for readiness.
                //
                // Poll always uses level-triggered mode and it does not require
                // any registration at all.
                let (read_readiness, write_readiness) =
                    unsafe { test_read_write_readiness(fd_in.as_raw_fd(), fd_out.as_raw_fd())? };

                // Only the side that poll reports as not ready has its cached
                // readiness cleared and is awaited again.
                if !read_readiness {
                    read_ready.clear_ready();
                    read_ready = fd_in.readable().await?;
                }

                if !write_readiness {
                    write_ready.clear_ready();
                    write_ready = fd_out.writable().await?;
                }
            }
            Err(e) => break Err(e),
            Ok(ret) => break Ok(ret as usize),
        }
    }
}
383
384/// Moves data between pipes without copying between kernel address space and
385/// user address space.
386///
387/// It transfers up to len bytes of data from pipe_in to pipe_out.
388#[cfg(target_os = "linux")]
389pub async fn splice(
390    pipe_in: &mut PipeRead,
391    pipe_out: &mut PipeWrite,
392    len: usize,
393) -> io::Result<usize> {
394    splice_impl(&mut pipe_in.0, None, &pipe_out.0, None, len, false).await
395}
396
/// Read end of a pipe: a `PipeFd` registered with tokio through `AsyncFd`.
///
/// Implements `AsyncRead`.
pub struct PipeRead(AsyncFd<PipeFd>);
399
400impl TryFrom<RawFd> for PipeRead {
401    type Error = io::Error;
402
403    fn try_from(fd: RawFd) -> Result<Self, Self::Error> {
404        Self::from_raw_fd_checked(fd)
405    }
406}
407
408impl PipeRead {
409    fn new(fd: RawFd) -> Result<Self, io::Error> {
410        Self::from_pipefd(PipeFd(fd))
411    }
412
413    fn from_pipefd(pipe_fd: PipeFd) -> Result<Self, io::Error> {
414        Ok(Self(AsyncFd::new(pipe_fd)?))
415    }
416
417    /// * `fd` - PipeRead would take the ownership of this fd.
418    pub fn from_raw_fd_checked(fd: RawFd) -> Result<Self, io::Error> {
419        Self::from_pipefd(PipeFd::from_raw_fd_checked(fd, true)?)
420    }
421
422    /// Moves data between pipe and fd without copying between kernel address space and
423    /// user address space.
424    ///
425    /// It transfers up to len bytes of data from self to asyncfd_out.
426    ///
427    ///  * `asyncfd_out` - must be have O_NONBLOCK set,
428    ///    otherwise this function might block.
429    ///  * `off_out` - If it is not None, then it would be updated on success.
430    ///  * `has_more_data` - If there is more data to be sent to off_out.
431    ///    This is a helpful hint for socket (see also the description of MSG_MORE
432    ///    in send(2), and the description of TCP_CORK in tcp(7)).
433    #[cfg(target_os = "linux")]
434    pub async fn splice_to(
435        &mut self,
436        asyncfd_out: &AsyncFd<impl AsRawFd>,
437        off_out: Option<&mut off64_t>,
438        len: usize,
439        has_more_data: bool,
440    ) -> io::Result<usize> {
441        splice_impl(&mut self.0, None, asyncfd_out, off_out, len, has_more_data).await
442    }
443}
444
445impl AsyncRead for PipeRead {
446    fn poll_read(
447        mut self: Pin<&mut Self>,
448        cx: &mut Context<'_>,
449        buf: &mut ReadBuf<'_>,
450    ) -> Poll<io::Result<()>> {
451        let fd = self.0.as_raw_fd();
452
453        loop {
454            let pinned = Pin::new(&mut self.0);
455            let mut ready = ready!(pinned.poll_read_ready(cx))?;
456            let ret = unsafe {
457                libc::read(
458                    fd,
459                    buf.unfilled_mut() as *mut _ as *mut c_void,
460                    cmp::min(buf.remaining(), MAX_LEN),
461                )
462            };
463            match cvt!(ret) {
464                Err(e) if is_wouldblock(&e) => {
465                    ready.clear_ready();
466                }
467                Err(e) => return Poll::Ready(Err(e)),
468                Ok(ret) => {
469                    let ret = ret as usize;
470                    unsafe {
471                        buf.assume_init(ret);
472                    };
473                    buf.advance(ret);
474                    return Poll::Ready(Ok(()));
475                }
476            }
477        }
478    }
479}
480
481impl AsRawFd for PipeRead {
482    fn as_raw_fd(&self) -> RawFd {
483        self.0.as_raw_fd()
484    }
485}
486
487impl IntoRawFd for PipeRead {
488    fn into_raw_fd(self) -> RawFd {
489        let inner = self.0.into_inner();
490        let fd = inner.0;
491        mem::forget(inner);
492        fd
493    }
494}
495
496impl FromRawFd for PipeRead {
497    unsafe fn from_raw_fd(fd: RawFd) -> Self {
498        set_nonblocking(fd);
499        Self::new(fd).unwrap()
500    }
501}
502
503impl fmt::Debug for PipeRead {
504    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
505        write!(f, "PipeRead({})", self.as_raw_fd())
506    }
507}
508
/// Write end of a pipe: a `PipeFd` registered with tokio through `AsyncFd`.
///
/// Implements `AsyncWrite`.
pub struct PipeWrite(AsyncFd<PipeFd>);
511
512impl TryFrom<RawFd> for PipeWrite {
513    type Error = io::Error;
514
515    fn try_from(fd: RawFd) -> Result<Self, Self::Error> {
516        Self::from_raw_fd_checked(fd)
517    }
518}
519
520impl PipeWrite {
521    fn new(fd: RawFd) -> Result<Self, io::Error> {
522        Self::from_pipefd(PipeFd(fd))
523    }
524
525    fn from_pipefd(pipe_fd: PipeFd) -> Result<Self, io::Error> {
526        Ok(Self(AsyncFd::new(pipe_fd)?))
527    }
528
529    /// * `fd` - PipeWrite would take the ownership of this fd.
530    pub fn from_raw_fd_checked(fd: RawFd) -> Result<Self, io::Error> {
531        Self::from_pipefd(PipeFd::from_raw_fd_checked(fd, false)?)
532    }
533}
534
535impl AsRawFd for PipeWrite {
536    fn as_raw_fd(&self) -> RawFd {
537        self.0.as_raw_fd()
538    }
539}
540
541impl IntoRawFd for PipeWrite {
542    fn into_raw_fd(self) -> RawFd {
543        let inner = self.0.into_inner();
544        let fd = inner.0;
545        mem::forget(inner);
546        fd
547    }
548}
549
550impl FromRawFd for PipeWrite {
551    unsafe fn from_raw_fd(fd: RawFd) -> Self {
552        set_nonblocking(fd);
553        Self::new(fd).unwrap()
554    }
555}
556
557impl PipeWrite {
558    fn poll_write_impl(
559        self: Pin<&Self>,
560        cx: &mut Context<'_>,
561        buf: &[u8],
562    ) -> Poll<Result<usize, io::Error>> {
563        let fd = self.0.as_raw_fd();
564
565        loop {
566            let pinned = Pin::new(&self.0);
567            let mut ready = ready!(pinned.poll_write_ready(cx))?;
568            let ret = unsafe {
569                libc::write(
570                    fd,
571                    buf.as_ptr() as *mut c_void,
572                    cmp::min(buf.len(), MAX_LEN),
573                )
574            };
575            match cvt!(ret) {
576                Err(e) if is_wouldblock(&e) => {
577                    ready.clear_ready();
578                }
579                Err(e) => return Poll::Ready(Err(e)),
580                Ok(ret) => return Poll::Ready(Ok(ret as usize)),
581            }
582        }
583    }
584
585    /// Write buf atomically to the pipe, using guarantees provided in POSIX.1
586    pub fn poll_write_atomic(
587        self: Pin<&Self>,
588        cx: &mut Context<'_>,
589        buf: AtomicWriteBuffer,
590    ) -> Poll<Result<usize, io::Error>> {
591        self.poll_write_impl(cx, buf.0)
592    }
593
594    fn poll_write_vectored_impl(
595        self: Pin<&Self>,
596        cx: &mut Context<'_>,
597        bufs: &[io::IoSlice<'_>],
598    ) -> Poll<Result<usize, io::Error>> {
599        let fd = self.0.as_raw_fd();
600
601        loop {
602            let pinned = Pin::new(&self.0);
603            let mut ready = ready!(pinned.poll_write_ready(cx))?;
604            let ret =
605                unsafe { libc::writev(fd, bufs.as_ptr() as *const libc::iovec, bufs.len() as i32) };
606            match cvt!(ret) {
607                Err(e) if is_wouldblock(&e) => {
608                    ready.clear_ready();
609                }
610                Err(e) => return Poll::Ready(Err(e)),
611                Ok(ret) => return Poll::Ready(Ok(ret as usize)),
612            }
613        }
614    }
615
616    pub fn poll_write_vectored_atomic(
617        self: Pin<&Self>,
618        cx: &mut Context<'_>,
619        bufs: AtomicWriteIoSlices<'_, '_>,
620    ) -> Poll<Result<usize, io::Error>> {
621        self.poll_write_vectored_impl(cx, bufs.0)
622    }
623
624    /// Moves data between fd and pipe without copying between kernel address space and
625    /// user address space.
626    ///
627    /// It transfers up to len bytes of data from asyncfd_in to self.
628    ///
629    ///  * `asyncfd_in` - must be have O_NONBLOCK set,
630    ///    otherwise this function might block.
631    ///    There must not be other reader for that fd (or its duplicates).
632    ///  * `off_in` - If it is not None, then it would be updated on success.
633    #[cfg(target_os = "linux")]
634    pub async fn splice_from(
635        &mut self,
636        asyncfd_in: &mut AsyncFd<impl AsRawFd>,
637        off_in: Option<&mut off64_t>,
638        len: usize,
639    ) -> io::Result<usize> {
640        splice_impl(asyncfd_in, off_in, &self.0, None, len, false).await
641    }
642}
643
644impl AsyncWrite for PipeWrite {
645    fn poll_write(
646        self: Pin<&mut Self>,
647        cx: &mut Context<'_>,
648        buf: &[u8],
649    ) -> Poll<Result<usize, io::Error>> {
650        self.as_ref().poll_write_impl(cx, buf)
651    }
652
653    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
654        Poll::Ready(Ok(()))
655    }
656
657    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
658        Poll::Ready(Ok(()))
659    }
660
661    fn poll_write_vectored(
662        self: Pin<&mut Self>,
663        cx: &mut Context<'_>,
664        bufs: &[io::IoSlice<'_>],
665    ) -> Poll<Result<usize, io::Error>> {
666        self.as_ref().poll_write_vectored_impl(cx, bufs)
667    }
668
669    fn is_write_vectored(&self) -> bool {
670        true
671    }
672}
673
674impl fmt::Debug for PipeWrite {
675    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
676        write!(f, "PipeRead({})", self.as_raw_fd())
677    }
678}
679
680#[cfg(any(target_os = "linux", target_os = "solaris"))]
681fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
682    let mut pipefd = [0; 2];
683    let ret = unsafe { libc::pipe2(pipefd.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
684    if ret == -1 {
685        return Err(io::Error::last_os_error());
686    }
687    Ok((pipefd[0], pipefd[1]))
688}
689
/// Creates a pipe with `pipe`, then sets `FD_CLOEXEC` and `O_NONBLOCK` on
/// both ends with `fcntl`.
///
/// Returns `(read end, write end)`. If configuring either end fails, both fds
/// are closed before the error is returned so that they do not leak.
#[cfg(not(any(target_os = "linux", target_os = "solaris")))]
fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
    /// Adds `FD_CLOEXEC` and `O_NONBLOCK` to `fd`, preserving its other flags.
    fn configure(fd: RawFd) -> io::Result<()> {
        let fd_flags = try_libc!(unsafe { libc::fcntl(fd, libc::F_GETFD) });
        try_libc!(unsafe { libc::fcntl(fd, libc::F_SETFD, fd_flags | libc::FD_CLOEXEC) });
        let status_flags = try_libc!(unsafe { libc::fcntl(fd, libc::F_GETFL) });
        try_libc!(unsafe { libc::fcntl(fd, libc::F_SETFL, status_flags | libc::O_NONBLOCK) });
        Ok(())
    }

    let mut pipefd: [RawFd; 2] = [0; 2];
    try_libc!(unsafe { libc::pipe(pipefd.as_mut_ptr()) });

    if let Err(err) = pipefd.iter().try_for_each(|&fd| configure(fd)) {
        // `err` already captured errno, so `close` may clobber it freely.
        for &fd in &pipefd {
            let _ = unsafe { libc::close(fd) };
        }
        return Err(err);
    }

    Ok((pipefd[0], pipefd[1]))
}
702
703/// Open pipe
704pub fn pipe() -> io::Result<(PipeRead, PipeWrite)> {
705    let (r, w) = sys_pipe()?;
706    Ok((PipeRead::new(r)?, PipeWrite::new(w)?))
707}
708
709#[cfg(test)]
710mod tests {
711    use super::*;
712
713    use std::fs::File;
714
715    use tokio::io::{AsyncReadExt, AsyncWriteExt};
716
717    #[cfg(target_os = "linux")]
718    use tokio::time::{sleep, Duration};
719
720    #[tokio::test]
721    async fn test() {
722        let (mut r, mut w) = pipe().unwrap();
723
724        let w_task = tokio::spawn(async move {
725            for n in 0..=65535 {
726                w.write_u32(n).await.unwrap();
727            }
728            //w.shutdown().await.unwrap();
729        });
730
731        let r_task = tokio::spawn(async move {
732            let mut n = 0u32;
733            let mut buf = [0; 4 * 128];
734            while n < 65535 {
735                r.read_exact(&mut buf).await.unwrap();
736                for x in buf.chunks(4) {
737                    assert_eq!(x, n.to_be_bytes());
738                    n += 1;
739                }
740            }
741        });
742        tokio::try_join!(w_task, r_task).unwrap();
743    }
744
745    #[tokio::test]
746    async fn test_write_after_shutdown() {
747        let (r, mut w) = pipe().unwrap();
748        w.shutdown().await.unwrap();
749        let result = w.write(b"ok").await;
750        assert!(result.is_ok());
751
752        drop(r)
753    }
754
755    #[tokio::test]
756    async fn test_read_to_end() -> io::Result<()> {
757        let (mut r, mut w) = pipe()?;
758        let t = tokio::spawn(async move {
759            w.write_all(&b"Hello, World!"[..]).await?;
760            io::Result::Ok(())
761        });
762
763        let mut buf = vec![];
764        r.read_to_end(&mut buf).await?;
765        assert_eq!(&b"Hello, World!"[..], &buf[..]);
766
767        t.await?
768    }
769
770    #[tokio::test]
771    async fn test_from_child_stdio() -> io::Result<()> {
772        use std::process::Stdio;
773        use tokio::process::Command;
774
775        let (mut r, w) = pipe()?;
776
777        let script = r#"#!/usr/bin/env python3
778import os
779with os.fdopen(1, 'wb') as w:
780    w.write(b"Hello, World!")
781"#;
782
783        let mut command = Command::new("python");
784        command
785            .args(&["-c", script])
786            .stdout(unsafe { Stdio::from_raw_fd(w.as_raw_fd()) });
787        unsafe {
788            // suppress posix_spawn
789            command.pre_exec(|| Ok(()));
790        }
791        let mut child = command.spawn()?;
792        drop(w);
793
794        let mut buf = vec![];
795        r.read_to_end(&mut buf).await?;
796        assert_eq!(&b"Hello, World!"[..], &buf[..]);
797
798        child.wait().await?;
799        Ok(())
800    }
801
802    #[tokio::test]
803    async fn test_from_child_no_stdio() -> io::Result<()> {
804        use tokio::process::Command;
805
806        let (mut r, w) = pipe()?;
807
808        let script = r#"#!/usr/bin/env python3
809import os
810with os.fdopen(3, 'wb') as w:
811    w.write(b"Hello, World!")
812"#;
813
814        let mut command = Command::new("python");
815        command.args(&["-c", script]);
816        unsafe {
817            let w = w.as_raw_fd();
818            command.pre_exec(move || {
819                if w == 3 {
820                    // drop CLOEXEC
821                    let flags = libc::fcntl(w, libc::F_SETFD);
822                    if flags == -1 {
823                        return Err(io::Error::last_os_error());
824                    }
825                    if flags & libc::FD_CLOEXEC != 0
826                        && libc::fcntl(w, libc::F_SETFD, flags ^ libc::FD_CLOEXEC) == -1
827                    {
828                        return Err(io::Error::last_os_error());
829                    }
830                } else {
831                    let r = libc::dup2(w, 3);
832                    if r == -1 {
833                        return Err(io::Error::last_os_error());
834                    }
835                }
836                Ok(())
837            });
838        }
839        let mut child = command.spawn()?;
840        drop(w);
841
842        let mut buf = vec![];
843        r.read_to_end(&mut buf).await?;
844        assert_eq!(&b"Hello, World!"[..], &buf[..]);
845
846        child.wait().await?;
847        Ok(())
848    }
849
850    #[cfg(target_os = "linux")]
851    #[tokio::test]
852    async fn test_tee() {
853        let (mut r1, mut w1) = pipe().unwrap();
854        let (mut r2, mut w2) = pipe().unwrap();
855
856        for n in 0..1024 {
857            w1.write_u32(n).await.unwrap();
858        }
859
860        tee(&mut r1, &mut w2, 4096).await.unwrap();
861
862        let r2_task = tokio::spawn(async move {
863            let mut n = 0u32;
864            let mut buf = [0; 4 * 128];
865            while n < 1024 {
866                r2.read_exact(&mut buf).await.unwrap();
867                for x in buf.chunks(4) {
868                    assert_eq!(x, n.to_be_bytes());
869                    n += 1;
870                }
871            }
872        });
873
874        let r1_task = tokio::spawn(async move {
875            let mut n = 0u32;
876            let mut buf = [0; 4 * 128];
877            while n < 1024 {
878                r1.read_exact(&mut buf).await.unwrap();
879                for x in buf.chunks(4) {
880                    assert_eq!(x, n.to_be_bytes());
881                    n += 1;
882                }
883            }
884        });
885
886        tokio::try_join!(r1_task, r2_task).unwrap();
887    }
888
889    #[cfg(target_os = "linux")]
890    #[tokio::test]
891    async fn test_tee_no_inf_loop() {
892        let (mut r1, mut w1) = pipe().unwrap();
893        let (mut r2, mut w2) = pipe().unwrap();
894
895        let w1_task = tokio::spawn(async move {
896            sleep(Duration::from_millis(100)).await;
897
898            for n in 0..1024 {
899                w1.write_u32(n).await.unwrap();
900            }
901        });
902
903        for n in 0..1024 {
904            w2.write_u32(n).await.unwrap();
905        }
906
907        let r2_task = tokio::spawn(async move {
908            sleep(Duration::from_millis(200)).await;
909
910            let mut n = 0u32;
911            let mut buf = [0; 4 * 128];
912            while n < 1024 {
913                r2.read_exact(&mut buf).await.unwrap();
914                for x in buf.chunks(4) {
915                    assert_eq!(x, n.to_be_bytes());
916                    n += 1;
917                }
918            }
919        });
920
921        tee(&mut r1, &mut w2, 4096).await.unwrap();
922
923        tokio::try_join!(w1_task, r2_task).unwrap();
924    }
925
926    #[cfg(target_os = "linux")]
927    #[tokio::test]
928    async fn test_splice() {
929        let (mut r1, mut w1) = pipe().unwrap();
930        let (mut r2, mut w2) = pipe().unwrap();
931
932        for n in 0..1024 {
933            w1.write_u32(n).await.unwrap();
934        }
935
936        splice(&mut r1, &mut w2, 4096).await.unwrap();
937
938        let mut n = 0u32;
939        let mut buf = [0; 4 * 128];
940        while n < 1024 {
941            r2.read_exact(&mut buf).await.unwrap();
942            for x in buf.chunks(4) {
943                assert_eq!(x, n.to_be_bytes());
944                n += 1;
945            }
946        }
947    }
948
949    #[cfg(target_os = "linux")]
950    #[tokio::test]
951    async fn test_splice_no_inf_loop() {
952        let (mut r1, mut w1) = pipe().unwrap();
953        let (mut r2, mut w2) = pipe().unwrap();
954
955        let w1_task = tokio::spawn(async move {
956            sleep(Duration::from_millis(100)).await;
957
958            for n in 0..1024 {
959                w1.write_u32(n).await.unwrap();
960            }
961        });
962
963        for n in 0..1024 {
964            w2.write_u32(n).await.unwrap();
965        }
966
967        let r2_task = tokio::spawn(async move {
968            sleep(Duration::from_millis(200)).await;
969
970            let mut n = 0u32;
971            let mut buf = [0; 4 * 128];
972            while n < 1024 {
973                r2.read_exact(&mut buf).await.unwrap();
974                for x in buf.chunks(4) {
975                    assert_eq!(x, n.to_be_bytes());
976                    n += 1;
977                }
978            }
979        });
980
981        splice(&mut r1, &mut w2, 4096).await.unwrap();
982
983        tokio::try_join!(w1_task, r2_task).unwrap();
984    }
985
986    fn as_ioslice<T>(v: &[T]) -> io::IoSlice<'_> {
987        io::IoSlice::new(unsafe {
988            std::slice::from_raw_parts(v.as_ptr() as *const u8, v.len() * std::mem::size_of::<T>())
989        })
990    }
991
992    #[tokio::test]
993    async fn test_writev() {
994        let (mut r, mut w) = pipe().unwrap();
995
996        let w_task = tokio::spawn(async move {
997            let buffer1: Vec<u32> = (0..512).collect();
998            let buffer2: Vec<u32> = (512..1024).collect();
999
1000            w.write_vectored(&[as_ioslice(&buffer1), as_ioslice(&buffer2)])
1001                .await
1002                .unwrap();
1003        });
1004
1005        let r_task = tokio::spawn(async move {
1006            let mut n = 0u32;
1007            let mut buf = [0; 4 * 128];
1008            while n < 1024 {
1009                r.read_exact(&mut buf).await.unwrap();
1010                for x in buf.chunks(4) {
1011                    assert_eq!(x, n.to_ne_bytes());
1012                    n += 1;
1013                }
1014            }
1015        });
1016        tokio::try_join!(w_task, r_task).unwrap();
1017    }
1018
1019    #[tokio::test]
1020    async fn test_piperead_from_raw_fd_checked_success() {
1021        let (r, _w) = pipe().unwrap();
1022        let _r = PipeRead::from_raw_fd_checked(r.into_raw_fd()).unwrap();
1023    }
1024
1025    #[tokio::test]
1026    async fn test_piperead_from_raw_fd_checked_failure_not_read_end() {
1027        let (_r, w) = pipe().unwrap();
1028        let error = PipeRead::from_raw_fd_checked(w.into_raw_fd())
1029            .unwrap_err()
1030            .into_inner()
1031            .unwrap();
1032
1033        assert_eq!(format!("{}", error), "Fd is not the read end");
1034    }
1035
1036    #[tokio::test]
1037    async fn test_piperead_from_raw_fd_checked_failure_not_pipe() {
1038        let fd = File::open("/dev/null").unwrap().into_raw_fd();
1039        let error = PipeRead::from_raw_fd_checked(fd)
1040            .unwrap_err()
1041            .into_inner()
1042            .unwrap();
1043
1044        assert_eq!(format!("{}", error), "Fd is not a pipe");
1045    }
1046
1047    #[tokio::test]
1048    async fn test_pipewrite_from_raw_fd_checked_success() {
1049        let (_r, w) = pipe().unwrap();
1050        let _w = PipeWrite::from_raw_fd_checked(w.into_raw_fd()).unwrap();
1051    }
1052
1053    #[tokio::test]
1054    async fn test_pipewrite_from_raw_fd_checked_failure_not_write_end() {
1055        let (r, _w) = pipe().unwrap();
1056        let error = PipeWrite::from_raw_fd_checked(r.into_raw_fd())
1057            .unwrap_err()
1058            .into_inner()
1059            .unwrap();
1060
1061        assert_eq!(format!("{}", error), "Fd is not the write end");
1062    }
1063
1064    #[tokio::test]
1065    async fn test_pipewrite_from_raw_fd_checked_failure_not_pipe() {
1066        let fd = File::open("/dev/null").unwrap().into_raw_fd();
1067        let error = PipeWrite::from_raw_fd_checked(fd)
1068            .unwrap_err()
1069            .into_inner()
1070            .unwrap();
1071
1072        assert_eq!(format!("{}", error), "Fd is not a pipe");
1073    }
1074
1075    #[test]
1076    fn test_atomic_write_io_slices() {
1077        let bytes: Vec<u8> = (0..PIPE_BUF + 20)
1078            .map(|i| (i % (u8::MAX as usize)) as u8)
1079            .collect();
1080        let mut io_slices = Vec::<io::IoSlice<'_>>::new();
1081
1082        for i in 0..bytes.len() {
1083            io_slices.push(io::IoSlice::new(&bytes[i..i + 1]));
1084        }
1085
1086        for i in 0..PIPE_BUF {
1087            let slices = AtomicWriteIoSlices::new(&io_slices[..i]).unwrap();
1088            assert_eq!(slices.get_total_len(), i);
1089        }
1090
1091        for i in PIPE_BUF + 1..bytes.len() {
1092            assert!(AtomicWriteIoSlices::new(&io_slices[..i]).is_none());
1093        }
1094    }
1095}