1use std::io;
3use std::pin::Pin;
4use std::process::Stdio;
5
/// Alias for the parent module's `RawIoHandle`, exposed under a pipe-specific
/// name.
pub type RawPipeHandle = super::RawIoHandle;
7
/// The synchronous read end of a pipe.
///
/// Wraps a [`std::fs::File`]; the `std::io::Read` implementation for this type
/// forwards straight to that file.
#[derive(Debug)]
pub struct PipeRead {
  /// File object that owns the read side of the pipe.
  file: std::fs::File,
}
12
13pub struct AsyncPipeRead {
15 #[cfg(windows)]
16 read: tokio::process::ChildStdout,
20 #[cfg(not(windows))]
21 read: tokio::net::unix::pipe::Receiver,
22}
23
/// The synchronous write end of a pipe.
///
/// Wraps a [`std::fs::File`]; the `std::io::Write` implementation for this
/// type forwards straight to that file.
#[derive(Debug)]
pub struct PipeWrite {
  /// File object that owns the write side of the pipe.
  file: std::fs::File,
}
28
29pub struct AsyncPipeWrite {
31 #[cfg(windows)]
32 write: tokio::process::ChildStdin,
36 #[cfg(not(windows))]
37 write: tokio::net::unix::pipe::Sender,
38}
39
40impl PipeRead {
41 #[cfg(windows)]
44 pub fn into_async(self) -> io::Result<AsyncPipeRead> {
45 let owned: std::os::windows::io::OwnedHandle = self.file.into();
46 let stdout = std::process::ChildStdout::from(owned);
47 Ok(AsyncPipeRead {
48 read: tokio::process::ChildStdout::from_std(stdout)?,
49 })
50 }
51
52 #[cfg(not(windows))]
55 pub fn into_async(self) -> io::Result<AsyncPipeRead> {
56 Ok(AsyncPipeRead {
57 read: tokio::net::unix::pipe::Receiver::from_file(self.file)?,
58 })
59 }
60
61 pub fn try_clone(&self) -> io::Result<Self> {
64 Ok(Self {
65 file: self.file.try_clone()?,
66 })
67 }
68}
69
70impl AsyncPipeRead {
71 #[cfg(windows)]
74 pub fn into_sync(self) -> io::Result<PipeRead> {
75 let owned = self.read.into_owned_handle()?;
76 Ok(PipeRead { file: owned.into() })
77 }
78
79 #[cfg(not(windows))]
82 pub fn into_sync(self) -> io::Result<PipeRead> {
83 let file = self.read.into_nonblocking_fd()?.into();
84 Ok(PipeRead { file })
85 }
86}
87
88impl std::io::Read for PipeRead {
89 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
90 self.file.read(buf)
91 }
92
93 fn read_vectored(
94 &mut self,
95 bufs: &mut [io::IoSliceMut<'_>],
96 ) -> io::Result<usize> {
97 self.file.read_vectored(bufs)
98 }
99}
100
101impl tokio::io::AsyncRead for AsyncPipeRead {
102 fn poll_read(
103 self: Pin<&mut Self>,
104 cx: &mut std::task::Context<'_>,
105 buf: &mut tokio::io::ReadBuf<'_>,
106 ) -> std::task::Poll<io::Result<()>> {
107 Pin::new(&mut self.get_mut().read).poll_read(cx, buf)
108 }
109}
110
111impl PipeWrite {
112 #[cfg(windows)]
115 pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
116 let owned: std::os::windows::io::OwnedHandle = self.file.into();
117 let stdin = std::process::ChildStdin::from(owned);
118 Ok(AsyncPipeWrite {
119 write: tokio::process::ChildStdin::from_std(stdin)?,
120 })
121 }
122
123 #[cfg(not(windows))]
126 pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
127 Ok(AsyncPipeWrite {
128 write: tokio::net::unix::pipe::Sender::from_file(self.file)?,
129 })
130 }
131
132 pub fn try_clone(&self) -> io::Result<Self> {
135 Ok(Self {
136 file: self.file.try_clone()?,
137 })
138 }
139}
140
141impl AsyncPipeWrite {
142 #[cfg(windows)]
145 pub fn into_sync(self) -> io::Result<PipeWrite> {
146 let owned = self.write.into_owned_handle()?;
147 Ok(PipeWrite { file: owned.into() })
148 }
149
150 #[cfg(not(windows))]
153 pub fn into_sync(self) -> io::Result<PipeWrite> {
154 let file = self.write.into_nonblocking_fd()?.into();
155 Ok(PipeWrite { file })
156 }
157}
158
159impl std::io::Write for PipeWrite {
160 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
161 self.file.write(buf)
162 }
163
164 fn flush(&mut self) -> io::Result<()> {
165 self.file.flush()
166 }
167
168 fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
169 self.file.write_vectored(bufs)
170 }
171}
172
173impl tokio::io::AsyncWrite for AsyncPipeWrite {
174 #[inline(always)]
175 fn poll_write(
176 self: std::pin::Pin<&mut Self>,
177 cx: &mut std::task::Context<'_>,
178 buf: &[u8],
179 ) -> std::task::Poll<Result<usize, io::Error>> {
180 Pin::new(&mut self.get_mut().write).poll_write(cx, buf)
181 }
182
183 #[inline(always)]
184 fn poll_flush(
185 self: Pin<&mut Self>,
186 cx: &mut std::task::Context<'_>,
187 ) -> std::task::Poll<Result<(), io::Error>> {
188 Pin::new(&mut self.get_mut().write).poll_flush(cx)
189 }
190
191 #[inline(always)]
192 fn poll_shutdown(
193 self: Pin<&mut Self>,
194 cx: &mut std::task::Context<'_>,
195 ) -> std::task::Poll<Result<(), io::Error>> {
196 Pin::new(&mut self.get_mut().write).poll_shutdown(cx)
197 }
198
199 #[inline(always)]
200 fn is_write_vectored(&self) -> bool {
201 self.write.is_write_vectored()
202 }
203
204 #[inline(always)]
205 fn poll_write_vectored(
206 self: Pin<&mut Self>,
207 cx: &mut std::task::Context<'_>,
208 bufs: &[io::IoSlice<'_>],
209 ) -> std::task::Poll<Result<usize, io::Error>> {
210 Pin::new(&mut self.get_mut().write).poll_write_vectored(cx, bufs)
211 }
212}
213
214impl From<PipeRead> for Stdio {
215 fn from(val: PipeRead) -> Self {
216 Stdio::from(val.file)
217 }
218}
219
220impl From<PipeWrite> for Stdio {
221 fn from(val: PipeWrite) -> Self {
222 Stdio::from(val.file)
223 }
224}
225
226impl From<PipeRead> for std::fs::File {
227 fn from(val: PipeRead) -> Self {
228 val.file
229 }
230}
231
232impl From<PipeWrite> for std::fs::File {
233 fn from(val: PipeWrite) -> Self {
234 val.file
235 }
236}
237
238#[cfg(not(windows))]
239impl From<PipeRead> for std::os::unix::io::OwnedFd {
240 fn from(val: PipeRead) -> Self {
241 val.file.into()
242 }
243}
244
245#[cfg(not(windows))]
246impl From<PipeWrite> for std::os::unix::io::OwnedFd {
247 fn from(val: PipeWrite) -> Self {
248 val.file.into()
249 }
250}
251
/// Turns a [`PipeRead`] into the owned Windows handle of its file.
#[cfg(windows)]
impl From<PipeRead> for std::os::windows::io::OwnedHandle {
  fn from(PipeRead { file }: PipeRead) -> Self {
    Self::from(file)
  }
}
258
/// Turns a [`PipeWrite`] into the owned Windows handle of its file.
#[cfg(windows)]
impl From<PipeWrite> for std::os::windows::io::OwnedHandle {
  fn from(PipeWrite { file }: PipeWrite) -> Self {
    Self::from(file)
  }
}
265
/// Creates a pipe and returns its `(read, write)` ends as synchronous handles.
///
/// This is a thin wrapper over the platform-specific `pipe_impl`: the Windows
/// build goes through `crate::winpipe::create_named_pipe`, every other target
/// through `os_pipe::pipe`. Either end can later be converted with its
/// `into_async` method.
///
/// # Errors
/// Propagates the `io::Error` returned by `pipe_impl`.
pub fn pipe() -> io::Result<(PipeRead, PipeWrite)> {
  pipe_impl()
}
275
/// Windows implementation of [`pipe`].
///
/// Asks `crate::winpipe::create_named_pipe` for a `(server, client)` handle
/// pair; the `client` handle becomes the read end and the `server` handle the
/// write end.
#[cfg(windows)]
pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
  use std::fs::File;
  use std::os::windows::io::FromRawHandle;
  use std::os::windows::io::OwnedHandle;

  // SAFETY: NOTE(review) — relies on `create_named_pipe` returning two valid
  // handles that nothing else owns or closes; confirm against `crate::winpipe`.
  unsafe {
    let (server, client) = crate::winpipe::create_named_pipe()?;
    let reader = PipeRead {
      file: File::from(OwnedHandle::from_raw_handle(client)),
    };
    let writer = PipeWrite {
      file: File::from(OwnedHandle::from_raw_handle(server)),
    };
    Ok((reader, writer))
  }
}
289
290#[cfg(not(windows))]
292pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
293 use std::os::unix::io::OwnedFd;
294 let (read, write) = os_pipe::pipe()?;
295 let read = std::fs::File::from(Into::<OwnedFd>::into(read));
296 let write = std::fs::File::from(Into::<OwnedFd>::into(write));
297 Ok((PipeRead { file: read }, PipeWrite { file: write }))
298}
299
#[cfg(test)]
mod tests {
  use std::io::Read;
  use std::io::Write;

  use tokio::io::AsyncReadExt;
  use tokio::io::AsyncWriteExt;

  use super::*;

  /// Payload pushed through the pipe by every test.
  const GREETING: &[u8; 5] = b"hello";

  // Round-trips the payload through the synchronous ends.
  #[test]
  fn test_pipe() {
    let (mut rx, mut tx) = pipe().unwrap();
    tx.write_all(GREETING).unwrap();
    let mut received = [0u8; 5];
    rx.read_exact(&mut received).unwrap();
    assert_eq!(&received, GREETING);
  }

  // Round-trips the payload through the asynchronous ends.
  #[tokio::test]
  async fn test_async_pipe() {
    let (rx, tx) = pipe().unwrap();
    let mut rx = rx.into_async().unwrap();
    let mut tx = tx.into_async().unwrap();

    tx.write_all(GREETING).await.unwrap();
    let mut received = [0u8; 5];
    rx.read_exact(&mut received).await.unwrap();
    assert_eq!(&received, GREETING);
  }

  // Walks one pipe through sync -> async -> sync, round-tripping the payload
  // at every stage.
  #[tokio::test]
  async fn test_pipe_transmute() {
    let (mut rx, mut tx) = pipe().unwrap();

    // Stage 1: freshly created synchronous ends.
    tx.write_all(GREETING).unwrap();
    let mut received = [0u8; 5];
    rx.read_exact(&mut received).unwrap();
    assert_eq!(&received, GREETING);

    // Stage 2: both ends promoted to async.
    let mut rx = rx.into_async().unwrap();
    let mut tx = tx.into_async().unwrap();

    tx.write_all(GREETING).await.unwrap();
    let mut received = [0u8; 5];
    rx.read_exact(&mut received).await.unwrap();
    assert_eq!(&received, GREETING);

    // Stage 3: both ends converted back to sync.
    let mut rx = rx.into_sync().unwrap();
    let mut tx = tx.into_sync().unwrap();

    tx.write_all(GREETING).unwrap();
    let mut received = [0u8; 5];
    rx.read_exact(&mut received).unwrap();
    assert_eq!(&received, GREETING);
  }

  // Starts the reader task before the writer task and expects both to finish.
  #[tokio::test]
  async fn test_async_pipe_is_nonblocking() {
    let (rx, tx) = pipe().unwrap();
    let mut rx = rx.into_async().unwrap();
    let mut tx = tx.into_async().unwrap();

    let reader_task = tokio::spawn(async move {
      let mut received = [0u8; 5];
      rx.read_exact(&mut received).await.unwrap();
      assert_eq!(&received, GREETING);
    });
    let writer_task = tokio::spawn(async move {
      tx.write_all(GREETING).await.unwrap();
    });

    reader_task.await.unwrap();
    writer_task.await.unwrap();
  }
}