async_os_pipe/lib.rs
1use std::io::{IoSlice, IoSliceMut, Result};
2use std::pin::{pin, Pin};
3use std::task::{Context, Poll};
4
5use bytes::BufMut;
6#[doc(no_inline)]
7use sys::{pipe as sys_pipe, LeftStream, RightStream};
8use tokio::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
9
/// The two platform-specific stream types that a [`Stream`] can wrap.
///
/// [`pipe`] stores the first stream it obtains in `Left` and the second in
/// `Right`.
enum EitherStream {
    /// Wraps a `LeftStream` from the platform-specific `sys` module.
    Left(LeftStream),
    /// Wraps a `RightStream` from the platform-specific `sys` module.
    Right(RightStream),
}
14
15/// Cross platform bidirectional stream that implements [`AsyncRead`] and [`AsyncWrite`].
16pub struct Stream(EitherStream);
17
18/// Crates a pair of connected cross-platform [Stream]'s.
19///
20/// ```rust
21/// # use async_os_pipe::pipe;
22/// # use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _,};
23/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
24/// let (mut tx, mut rx) = pipe().await.unwrap();
25///
26/// tx.write_all(b"hello world!\n").await.unwrap();
27///
28/// let mut buf = vec![0; 13];
29/// rx.read_exact(&mut buf).await.unwrap();
30///
31/// assert_eq!(buf, b"hello world!\n");
32/// # });
33/// ```
34pub async fn pipe() -> Result<(Stream, Stream)> {
35 let (tx, rx) = sys_pipe().await?;
36 Ok((
37 Stream(EitherStream::Left(tx)),
38 Stream(EitherStream::Right(rx)),
39 ))
40}
41
// Forwards a method call to whichever platform stream a `Stream` currently
// wraps.
//
// * `multiplex!(pinned, stream.call(..))` is for `Pin<&mut Stream>` receivers:
//   it takes `&mut` access to the inner stream via `get_mut` and re-pins that
//   reference with `pin!` before making the call.
// * `multiplex!(stream.call(..))` is for `&Stream` receivers: the call is made
//   on a shared reference to the inner stream.
//
// Everything after the first `.` is pasted verbatim after the inner stream, so
// trailing tokens such as `.await` are forwarded as well.
macro_rules! multiplex {
    (pinned, $stream:ident.$($call:tt)+) => {
        match &mut $stream.get_mut().0 {
            EitherStream::Left(inner) => pin!(inner).$($call)+,
            EitherStream::Right(inner) => pin!(inner).$($call)+,
        }
    };
    ($stream:ident.$($call:tt)+) => {
        match &$stream.0 {
            EitherStream::Left(inner) => inner.$($call)+,
            EitherStream::Right(inner) => inner.$($call)+,
        }
    };
}
56
57impl AsyncRead for Stream {
58 fn poll_read(
59 self: Pin<&mut Self>,
60 cx: &mut Context<'_>,
61 buf: &mut ReadBuf<'_>,
62 ) -> Poll<Result<()>> {
63 multiplex!(pinned, self.poll_read(cx, buf))
64 }
65}
66
67impl AsyncWrite for Stream {
68 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
69 multiplex!(pinned, self.poll_write(cx, buf))
70 }
71
72 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
73 multiplex!(pinned, self.poll_flush(cx))
74 }
75
76 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
77 multiplex!(pinned, self.poll_shutdown(cx))
78 }
79
80 fn is_write_vectored(&self) -> bool {
81 multiplex!(self.is_write_vectored())
82 }
83
84 fn poll_write_vectored(
85 self: Pin<&mut Self>,
86 cx: &mut Context<'_>,
87 buf: &[IoSlice<'_>],
88 ) -> Poll<Result<usize>> {
89 multiplex!(pinned, self.poll_write_vectored(cx, buf))
90 }
91}
92
93impl Stream {
94 /// Reads or writes from the stream using a user-provided IO operation.
95 ///
96 /// See [`UnixStream::async_io`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.async_io).
97 pub async fn async_io<R>(&self, interest: Interest, f: impl FnMut() -> Result<R>) -> Result<R> {
98 multiplex!(self.async_io(interest, f).await)
99 }
100
101 /// Waits for the stream to become readable.
102 ///
103 /// See [`UnixStream::readable`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.readable).
104 pub async fn readable(&self) -> Result<()> {
105 multiplex!(self.readable().await)
106 }
107
108 /// Waits for any of the requested ready states.
109 ///
110 /// See [`UnixStream::ready`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.ready).
111 pub async fn ready(&self, interest: Interest) -> Result<Ready> {
112 multiplex!(self.ready(interest).await)
113 }
114
115 /// Polls for read readiness.
116 ///
117 /// See [`UnixStream::poll_read_ready`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.poll_read_ready).
118 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
119 multiplex!(self.poll_read_ready(cx))
120 }
121
122 /// Tries to read or write from the socket using a user-provided IO operation.
123 ///
124 /// See [`UnixStream::try_io`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_io).
125 pub fn try_io<R>(&self, interest: Interest, f: impl FnOnce() -> Result<R>) -> Result<R> {
126 multiplex!(self.try_io(interest, f))
127 }
128
129 /// Try to read data from the stream into the provided buffer, returning how many bytes were read.
130 ///
131 /// See [`UnixStream::try_read`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_read).
132 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize> {
133 multiplex!(self.try_read(buf))
134 }
135
136 /// Tries to read data from the stream into the provided buffer, advancing the buffer’s internal cursor, returning how many bytes were read.
137 ///
138 /// See [`UnixStream::try_read_buf`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_read_buf).
139 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> Result<usize> {
140 multiplex!(self.try_read_buf(buf))
141 }
142
143 /// Tries to read data from the stream into the provided buffers, returning how many bytes were read.
144 ///
145 /// See [`UnixStream::try_read_vectored`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_read_vectored).
146 pub fn try_read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
147 multiplex!(self.try_read_vectored(bufs))
148 }
149
150 /// Waits for the stream to become writable.
151 ///
152 /// See [`UnixStream::writable`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.writable).
153 pub async fn writable(&self) -> Result<()> {
154 multiplex!(self.writable().await)
155 }
156
157 /// Polls for write readiness.
158 ///
159 /// See [`UnixStream::poll_write_ready`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.poll_write_ready).
160 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
161 multiplex!(self.poll_write_ready(cx))
162 }
163
164 /// Tries to write a buffer to the stream, returning how many bytes were written.
165 ///
166 /// See [`UnixStream::try_write`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_write).
167 pub fn try_write(&self, buf: &[u8]) -> Result<usize> {
168 multiplex!(self.try_write(buf))
169 }
170
171 /// Tries to write several buffers to the stream, returning how many bytes were written.
172 ///
173 /// See [`UnixStream::try_write_vectored`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.try_write_vectored).
174 pub fn try_write_vectored(&self, buf: &[IoSlice<'_>]) -> Result<usize> {
175 multiplex!(self.try_write_vectored(buf))
176 }
177}
178
// Platform-specific backend that supplies `pipe`, `LeftStream` and
// `RightStream`. The `path` attribute points the module at `sys/unix.rs` when
// compiling for Unix and at `sys/windows.rs` when compiling for Windows.
#[cfg_attr(unix, path = "sys/unix.rs")]
#[cfg_attr(windows, path = "sys/windows.rs")]
mod sys;