Skip to main content

deno_io/
pipe.rs

1// Copyright 2018-2026 the Deno authors. MIT license.
2use std::io;
3use std::pin::Pin;
4use std::process::Stdio;
5
6pub type RawPipeHandle = super::RawIoHandle;
7
8// The synchronous read end of a unidirectional pipe.
9pub struct PipeRead {
10  file: std::fs::File,
11}
12
13// The asynchronous read end of a unidirectional pipe.
14pub struct AsyncPipeRead {
15  #[cfg(windows)]
16  /// We use a `ChildStdout` here as it's a much better fit for a Windows named pipe on Windows. We
17  /// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future
18  /// if those can be created from raw handles down the road.
19  read: tokio::process::ChildStdout,
20  #[cfg(not(windows))]
21  read: tokio::net::unix::pipe::Receiver,
22}
23
24// The synchronous write end of a unidirectional pipe.
25pub struct PipeWrite {
26  file: std::fs::File,
27}
28
29// The asynchronous write end of a unidirectional pipe.
30pub struct AsyncPipeWrite {
31  #[cfg(windows)]
32  /// We use a `ChildStdin` here as it's a much better fit for a Windows named pipe on Windows. We
33  /// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future
34  /// if those can be created from raw handles down the road.
35  write: tokio::process::ChildStdin,
36  #[cfg(not(windows))]
37  write: tokio::net::unix::pipe::Sender,
38}
39
40impl PipeRead {
41  /// Converts this sync reader into an async reader. May fail if the Tokio runtime is
42  /// unavailable.
43  #[cfg(windows)]
44  pub fn into_async(self) -> io::Result<AsyncPipeRead> {
45    let owned: std::os::windows::io::OwnedHandle = self.file.into();
46    let stdout = std::process::ChildStdout::from(owned);
47    Ok(AsyncPipeRead {
48      read: tokio::process::ChildStdout::from_std(stdout)?,
49    })
50  }
51
52  /// Converts this sync reader into an async reader. May fail if the Tokio runtime is
53  /// unavailable.
54  #[cfg(not(windows))]
55  pub fn into_async(self) -> io::Result<AsyncPipeRead> {
56    Ok(AsyncPipeRead {
57      read: tokio::net::unix::pipe::Receiver::from_file(self.file)?,
58    })
59  }
60
61  /// Creates a new [`PipeRead`] instance that shares the same underlying file handle
62  /// as the existing [`PipeRead`] instance.
63  pub fn try_clone(&self) -> io::Result<Self> {
64    Ok(Self {
65      file: self.file.try_clone()?,
66    })
67  }
68}
69
70impl AsyncPipeRead {
71  /// Converts this async reader into an sync reader. May fail if the Tokio runtime is
72  /// unavailable.
73  #[cfg(windows)]
74  pub fn into_sync(self) -> io::Result<PipeRead> {
75    let owned = self.read.into_owned_handle()?;
76    Ok(PipeRead { file: owned.into() })
77  }
78
79  /// Converts this async reader into an sync reader. May fail if the Tokio runtime is
80  /// unavailable.
81  #[cfg(not(windows))]
82  pub fn into_sync(self) -> io::Result<PipeRead> {
83    let file = self.read.into_nonblocking_fd()?.into();
84    Ok(PipeRead { file })
85  }
86}
87
88impl std::io::Read for PipeRead {
89  fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
90    self.file.read(buf)
91  }
92
93  fn read_vectored(
94    &mut self,
95    bufs: &mut [io::IoSliceMut<'_>],
96  ) -> io::Result<usize> {
97    self.file.read_vectored(bufs)
98  }
99}
100
101impl tokio::io::AsyncRead for AsyncPipeRead {
102  fn poll_read(
103    self: Pin<&mut Self>,
104    cx: &mut std::task::Context<'_>,
105    buf: &mut tokio::io::ReadBuf<'_>,
106  ) -> std::task::Poll<io::Result<()>> {
107    Pin::new(&mut self.get_mut().read).poll_read(cx, buf)
108  }
109}
110
111impl PipeWrite {
112  /// Converts this sync writer into an async writer. May fail if the Tokio runtime is
113  /// unavailable.
114  #[cfg(windows)]
115  pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
116    let owned: std::os::windows::io::OwnedHandle = self.file.into();
117    let stdin = std::process::ChildStdin::from(owned);
118    Ok(AsyncPipeWrite {
119      write: tokio::process::ChildStdin::from_std(stdin)?,
120    })
121  }
122
123  /// Converts this sync writer into an async writer. May fail if the Tokio runtime is
124  /// unavailable.
125  #[cfg(not(windows))]
126  pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
127    Ok(AsyncPipeWrite {
128      write: tokio::net::unix::pipe::Sender::from_file(self.file)?,
129    })
130  }
131
132  /// Creates a new [`PipeWrite`] instance that shares the same underlying file handle
133  /// as the existing [`PipeWrite`] instance.
134  pub fn try_clone(&self) -> io::Result<Self> {
135    Ok(Self {
136      file: self.file.try_clone()?,
137    })
138  }
139}
140
141impl AsyncPipeWrite {
142  /// Converts this async writer into an sync writer. May fail if the Tokio runtime is
143  /// unavailable.
144  #[cfg(windows)]
145  pub fn into_sync(self) -> io::Result<PipeWrite> {
146    let owned = self.write.into_owned_handle()?;
147    Ok(PipeWrite { file: owned.into() })
148  }
149
150  /// Converts this async writer into an sync writer. May fail if the Tokio runtime is
151  /// unavailable.
152  #[cfg(not(windows))]
153  pub fn into_sync(self) -> io::Result<PipeWrite> {
154    let file = self.write.into_nonblocking_fd()?.into();
155    Ok(PipeWrite { file })
156  }
157}
158
159impl std::io::Write for PipeWrite {
160  fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
161    self.file.write(buf)
162  }
163
164  fn flush(&mut self) -> io::Result<()> {
165    self.file.flush()
166  }
167
168  fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
169    self.file.write_vectored(bufs)
170  }
171}
172
173impl tokio::io::AsyncWrite for AsyncPipeWrite {
174  #[inline(always)]
175  fn poll_write(
176    self: std::pin::Pin<&mut Self>,
177    cx: &mut std::task::Context<'_>,
178    buf: &[u8],
179  ) -> std::task::Poll<Result<usize, io::Error>> {
180    Pin::new(&mut self.get_mut().write).poll_write(cx, buf)
181  }
182
183  #[inline(always)]
184  fn poll_flush(
185    self: Pin<&mut Self>,
186    cx: &mut std::task::Context<'_>,
187  ) -> std::task::Poll<Result<(), io::Error>> {
188    Pin::new(&mut self.get_mut().write).poll_flush(cx)
189  }
190
191  #[inline(always)]
192  fn poll_shutdown(
193    self: Pin<&mut Self>,
194    cx: &mut std::task::Context<'_>,
195  ) -> std::task::Poll<Result<(), io::Error>> {
196    Pin::new(&mut self.get_mut().write).poll_shutdown(cx)
197  }
198
199  #[inline(always)]
200  fn is_write_vectored(&self) -> bool {
201    self.write.is_write_vectored()
202  }
203
204  #[inline(always)]
205  fn poll_write_vectored(
206    self: Pin<&mut Self>,
207    cx: &mut std::task::Context<'_>,
208    bufs: &[io::IoSlice<'_>],
209  ) -> std::task::Poll<Result<usize, io::Error>> {
210    Pin::new(&mut self.get_mut().write).poll_write_vectored(cx, bufs)
211  }
212}
213
214impl From<PipeRead> for Stdio {
215  fn from(val: PipeRead) -> Self {
216    Stdio::from(val.file)
217  }
218}
219
220impl From<PipeWrite> for Stdio {
221  fn from(val: PipeWrite) -> Self {
222    Stdio::from(val.file)
223  }
224}
225
226impl From<PipeRead> for std::fs::File {
227  fn from(val: PipeRead) -> Self {
228    val.file
229  }
230}
231
232impl From<PipeWrite> for std::fs::File {
233  fn from(val: PipeWrite) -> Self {
234    val.file
235  }
236}
237
238#[cfg(not(windows))]
239impl From<PipeRead> for std::os::unix::io::OwnedFd {
240  fn from(val: PipeRead) -> Self {
241    val.file.into()
242  }
243}
244
245#[cfg(not(windows))]
246impl From<PipeWrite> for std::os::unix::io::OwnedFd {
247  fn from(val: PipeWrite) -> Self {
248    val.file.into()
249  }
250}
251
252#[cfg(windows)]
253impl From<PipeRead> for std::os::windows::io::OwnedHandle {
254  fn from(val: PipeRead) -> Self {
255    val.file.into()
256  }
257}
258
259#[cfg(windows)]
260impl From<PipeWrite> for std::os::windows::io::OwnedHandle {
261  fn from(val: PipeWrite) -> Self {
262    val.file.into()
263  }
264}
265
266/// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles,
267/// but either side may be promoted to an async-capable reader/writer.
268///
269/// On Windows, we use a named pipe because that's the only way to get reliable async I/O
270/// support. On Unix platforms, we use the `os_pipe` library, which uses `pipe2` under the hood
271/// (or `pipe` on OSX).
272pub fn pipe() -> io::Result<(PipeRead, PipeWrite)> {
273  pipe_impl()
274}
275
276/// Creates a unidirectional pipe on top of a named pipe (which is technically bidirectional).
277#[cfg(windows)]
278pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
279  // SAFETY: We're careful with handles here
280  unsafe {
281    use std::os::windows::io::FromRawHandle;
282    use std::os::windows::io::OwnedHandle;
283    let (server, client) = crate::winpipe::create_named_pipe()?;
284    let read = std::fs::File::from(OwnedHandle::from_raw_handle(client));
285    let write = std::fs::File::from(OwnedHandle::from_raw_handle(server));
286    Ok((PipeRead { file: read }, PipeWrite { file: write }))
287  }
288}
289
290/// Creates a unidirectional pipe for unix platforms.
291#[cfg(not(windows))]
292pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
293  use std::os::unix::io::OwnedFd;
294  let (read, write) = os_pipe::pipe()?;
295  let read = std::fs::File::from(Into::<OwnedFd>::into(read));
296  let write = std::fs::File::from(Into::<OwnedFd>::into(write));
297  Ok((PipeRead { file: read }, PipeWrite { file: write }))
298}
299
300#[cfg(test)]
301mod tests {
302  use std::io::Read;
303  use std::io::Write;
304
305  use tokio::io::AsyncReadExt;
306  use tokio::io::AsyncWriteExt;
307
308  use super::*;
309
310  #[test]
311  fn test_pipe() {
312    let (mut read, mut write) = pipe().unwrap();
313    // Write to the server and read from the client
314    write.write_all(b"hello").unwrap();
315    let mut buf: [u8; 5] = Default::default();
316    read.read_exact(&mut buf).unwrap();
317    assert_eq!(&buf, b"hello");
318  }
319
320  #[tokio::test]
321  async fn test_async_pipe() {
322    let (read, write) = pipe().unwrap();
323    let mut read = read.into_async().unwrap();
324    let mut write = write.into_async().unwrap();
325
326    write.write_all(b"hello").await.unwrap();
327    let mut buf: [u8; 5] = Default::default();
328    read.read_exact(&mut buf).await.unwrap();
329    assert_eq!(&buf, b"hello");
330  }
331
332  /// Test a round-trip through async mode and back.
333  #[tokio::test]
334  async fn test_pipe_transmute() {
335    let (mut read, mut write) = pipe().unwrap();
336
337    // Sync
338    write.write_all(b"hello").unwrap();
339    let mut buf: [u8; 5] = Default::default();
340    read.read_exact(&mut buf).unwrap();
341    assert_eq!(&buf, b"hello");
342
343    let mut read = read.into_async().unwrap();
344    let mut write = write.into_async().unwrap();
345
346    // Async
347    write.write_all(b"hello").await.unwrap();
348    let mut buf: [u8; 5] = Default::default();
349    read.read_exact(&mut buf).await.unwrap();
350    assert_eq!(&buf, b"hello");
351
352    let mut read = read.into_sync().unwrap();
353    let mut write = write.into_sync().unwrap();
354
355    // Sync
356    write.write_all(b"hello").unwrap();
357    let mut buf: [u8; 5] = Default::default();
358    read.read_exact(&mut buf).unwrap();
359    assert_eq!(&buf, b"hello");
360  }
361
362  #[tokio::test]
363  async fn test_async_pipe_is_nonblocking() {
364    let (read, write) = pipe().unwrap();
365    let mut read = read.into_async().unwrap();
366    let mut write = write.into_async().unwrap();
367
368    let a = tokio::spawn(async move {
369      let mut buf: [u8; 5] = Default::default();
370      read.read_exact(&mut buf).await.unwrap();
371      assert_eq!(&buf, b"hello");
372    });
373    let b = tokio::spawn(async move {
374      write.write_all(b"hello").await.unwrap();
375    });
376
377    a.await.unwrap();
378    b.await.unwrap();
379  }
380}