async_os_pipe/
lib.rs

1use std::io::{IoSlice, IoSliceMut, Result};
2use std::pin::{pin, Pin};
3use std::task::{Context, Poll};
4
5use bytes::BufMut;
6#[doc(no_inline)]
7use sys::{pipe as sys_pipe, LeftStream, RightStream};
8use tokio::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
9
10enum EitherStream {
11    Left(LeftStream),
12    Right(RightStream),
13}
14
15/// Cross platform bidirectional stream that implements [`AsyncRead`] and [`AsyncWrite`].
16pub struct Stream(EitherStream);
17
18/// Crates a pair of connected cross-platform [Stream]'s.
19///
20/// ```rust
21/// # use async_os_pipe::pipe;
22/// # use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _,};
23/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
24/// let (mut tx, mut rx) = pipe().await.unwrap();
25///
26/// tx.write_all(b"hello world!\n").await.unwrap();
27///
28/// let mut buf = vec![0; 13];
29/// rx.read_exact(&mut buf).await.unwrap();
30///
31/// assert_eq!(buf, b"hello world!\n");
32/// # });
33/// ```
34pub async fn pipe() -> Result<(Stream, Stream)> {
35    let (tx, rx) = sys_pipe().await?;
36    Ok((
37        Stream(EitherStream::Left(tx)),
38        Stream(EitherStream::Right(rx)),
39    ))
40}
41
// Forwards a method call to whichever platform stream a `Stream` currently wraps.
//
// Two invocation forms are supported:
//
// * `multiplex!(pinned, recv.call(args))` — for `Pin<&mut Stream>` receivers. The
//   pin is unwrapped with `get_mut()`, the inner `&mut` reference is re-pinned
//   with `pin!`, and the call is made on that pinned reference.
// * `multiplex!(recv.call(args))` — for shared receivers. `recv.0` is borrowed
//   immutably and the call is made directly on the inner stream.
//
// Everything after the first `.` is pasted verbatim after the inner stream, so
// trailing tokens such as `.await` are forwarded as well.
macro_rules! multiplex {
    (pinned, $this:ident.$($call:tt)+) => {
        match &mut $this.get_mut().0 {
            EitherStream::Left(stream) => pin!(stream).$($call)+,
            EitherStream::Right(stream) => pin!(stream).$($call)+,
        }
    };
    ($this:ident.$($call:tt)+) => {
        match &$this.0 {
            EitherStream::Left(stream) => stream.$($call)+,
            EitherStream::Right(stream) => stream.$($call)+,
        }
    };
}
56
57impl AsyncRead for Stream {
58    fn poll_read(
59        self: Pin<&mut Self>,
60        cx: &mut Context<'_>,
61        buf: &mut ReadBuf<'_>,
62    ) -> Poll<Result<()>> {
63        multiplex!(pinned, self.poll_read(cx, buf))
64    }
65}
66
67impl AsyncWrite for Stream {
68    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
69        multiplex!(pinned, self.poll_write(cx, buf))
70    }
71
72    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
73        multiplex!(pinned, self.poll_flush(cx))
74    }
75
76    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
77        multiplex!(pinned, self.poll_shutdown(cx))
78    }
79
80    fn is_write_vectored(&self) -> bool {
81        multiplex!(self.is_write_vectored())
82    }
83
84    fn poll_write_vectored(
85        self: Pin<&mut Self>,
86        cx: &mut Context<'_>,
87        buf: &[IoSlice<'_>],
88    ) -> Poll<Result<usize>> {
89        multiplex!(pinned, self.poll_write_vectored(cx, buf))
90    }
91}
92
93impl Stream {
94    /// Reads or writes from the stream using a user-provided IO operation.
95    ///
96    /// See [`UnixStream::async_io`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.async_io).
97    pub async fn async_io<R>(&self, interest: Interest, f: impl FnMut() -> Result<R>) -> Result<R> {
98        multiplex!(self.async_io(interest, f).await)
99    }
100
101    /// Waits for the stream to become readable.
102    ///
103    /// See [`UnixStream::readable`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.readable).
104    pub async fn readable(&self) -> Result<()> {
105        multiplex!(self.readable().await)
106    }
107
108    /// Waits for any of the requested ready states.
109    ///
110    /// See [`UnixStream::ready`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.ready).
111    pub async fn ready(&self, interest: Interest) -> Result<Ready> {
112        multiplex!(self.ready(interest).await)
113    }
114
115    /// Polls for read readiness.
116    ///
117    /// See [`UnixStream::poll_read_ready`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.poll_read_ready).
118    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
119        multiplex!(self.poll_read_ready(cx))
120    }
121
122    /// Tries to read or write from the socket using a user-provided IO operation.
123    ///
124    /// See [`UnixStream::try_io`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_io).
125    pub fn try_io<R>(&self, interest: Interest, f: impl FnOnce() -> Result<R>) -> Result<R> {
126        multiplex!(self.try_io(interest, f))
127    }
128
129    /// Try to read data from the stream into the provided buffer, returning how many bytes were read.
130    ///
131    /// See [`UnixStream::try_read`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_read).
132    pub fn try_read(&self, buf: &mut [u8]) -> Result<usize> {
133        multiplex!(self.try_read(buf))
134    }
135
136    /// Tries to read data from the stream into the provided buffer, advancing the buffer’s internal cursor, returning how many bytes were read.
137    ///
138    /// See [`UnixStream::try_read_buf`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_read_buf).
139    pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> Result<usize> {
140        multiplex!(self.try_read_buf(buf))
141    }
142
143    /// Tries to read data from the stream into the provided buffers, returning how many bytes were read.
144    ///
145    /// See [`UnixStream::try_read_vectored`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_read_vectored).
146    pub fn try_read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
147        multiplex!(self.try_read_vectored(bufs))
148    }
149
150    /// Waits for the stream to become writable.
151    ///
152    /// See [`UnixStream::writable`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.writable).
153    pub async fn writable(&self) -> Result<()> {
154        multiplex!(self.writable().await)
155    }
156
157    /// Polls for write readiness.
158    ///
159    /// See [`UnixStream::poll_write_ready`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.poll_write_ready).
160    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
161        multiplex!(self.poll_write_ready(cx))
162    }
163
164    /// Tries to write a buffer to the stream, returning how many bytes were written.
165    ///
166    /// See [`UnixStream::try_write`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_write).
167    pub fn try_write(&self, buf: &[u8]) -> Result<usize> {
168        multiplex!(self.try_write(buf))
169    }
170
171    /// Tries to write several buffers to the stream, returning how many bytes were written.
172    ///
173    /// See [`UnixStream::try_write_vectored`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_write_vectored).
174    pub fn try_write_vectored(&self, buf: &[IoSlice<'_>]) -> Result<usize> {
175        multiplex!(self.try_write_vectored(buf))
176    }
177}
178
179#[cfg_attr(unix, path = "sys/unix.rs")]
180#[cfg_attr(windows, path = "sys/windows.rs")]
181mod sys;