1#![cfg_attr(not(feature = "async-io"), doc = "# #[cfg(never)]")]
47use std::io::{self, Error, ErrorKind, IoSlice, Read, Result, StdinLock, StdoutLock, Write};
71use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
72
73use rustix::fs::{fcntl_getfl, fcntl_setfl, fstat, FileType, OFlags};
74
75#[derive(Debug)]
76struct NonBlocking<T: AsFd> {
77 inner: T,
78 prev_flags: OFlags,
79}
80
81impl<T: AsFd> NonBlocking<T> {
82 fn new(inner: T) -> Result<Self> {
83 let ft = FileType::from_raw_mode(fstat(&inner)?.st_mode);
84 if !matches!(
85 ft,
86 FileType::Fifo | FileType::Socket | FileType::CharacterDevice
87 ) {
88 return Err(Error::new(
89 ErrorKind::Other,
90 format!("File type {ft:?} is not pipe-like"),
91 ));
92 }
93
94 let prev_flags = fcntl_getfl(&inner)?;
95 fcntl_setfl(&inner, prev_flags | OFlags::NONBLOCK)?;
96 Ok(Self { inner, prev_flags })
97 }
98}
99
100impl<T: AsFd> Drop for NonBlocking<T> {
101 fn drop(&mut self) {
102 let _: std::result::Result<_, _> = fcntl_setfl(&self.inner, self.prev_flags);
103 }
104}
105
106#[derive(Debug)]
108pub struct PipeStdin {
109 inner: NonBlocking<StdinLock<'static>>,
110}
111
112impl PipeStdin {
113 pub fn lock() -> Result<Self> {
120 let inner = NonBlocking::new(io::stdin().lock())?;
121 Ok(Self { inner })
122 }
123}
124
125impl AsFd for PipeStdin {
126 fn as_fd(&self) -> BorrowedFd<'_> {
127 self.inner.inner.as_fd()
128 }
129}
130
131impl AsRawFd for PipeStdin {
132 fn as_raw_fd(&self) -> RawFd {
133 self.inner.inner.as_raw_fd()
134 }
135}
136
137#[cfg(feature = "async-io")]
139unsafe impl async_io::IoSafe for PipeStdin {}
140
141impl Read for &'_ PipeStdin {
144 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
145 rustix::io::read(self, buf).map_err(Into::into)
146 }
147
148 fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> Result<usize> {
149 rustix::io::readv(self, bufs).map_err(Into::into)
150 }
151}
152
153impl Read for PipeStdin {
154 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
155 <&PipeStdin>::read(&mut &*self, buf)
156 }
157
158 fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> Result<usize> {
159 <&PipeStdin>::read_vectored(&mut &*self, bufs)
160 }
161}
162
163#[derive(Debug)]
165pub struct PipeStdout {
166 inner: NonBlocking<StdoutLock<'static>>,
167}
168
169impl PipeStdout {
170 pub fn lock() -> Result<Self> {
176 let inner = NonBlocking::new(io::stdout().lock())?;
177 Ok(Self { inner })
178 }
179}
180
181impl AsFd for PipeStdout {
182 fn as_fd(&self) -> BorrowedFd<'_> {
183 self.inner.inner.as_fd()
184 }
185}
186
187impl AsRawFd for PipeStdout {
188 fn as_raw_fd(&self) -> RawFd {
189 self.inner.inner.as_raw_fd()
190 }
191}
192
193#[cfg(feature = "async-io")]
195unsafe impl async_io::IoSafe for PipeStdout {}
196
197impl Write for &'_ PipeStdout {
199 fn write(&mut self, buf: &[u8]) -> Result<usize> {
200 rustix::io::write(self, buf).map_err(Into::into)
201 }
202
203 fn flush(&mut self) -> Result<()> {
204 Ok(())
205 }
206
207 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
208 rustix::io::writev(self, bufs).map_err(Into::into)
209 }
210}
211
212impl Write for PipeStdout {
213 fn write(&mut self, buf: &[u8]) -> Result<usize> {
214 <&PipeStdout>::write(&mut &*self, buf)
215 }
216
217 fn flush(&mut self) -> Result<()> {
218 <&PipeStdout>::flush(&mut &*self)
219 }
220
221 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
222 <&PipeStdout>::write_vectored(&mut &*self, bufs)
223 }
224}
225
226#[cfg(feature = "tokio")]
229#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
230mod tokio_impl {
231 use std::pin::Pin;
232 use std::task::{Context, Poll};
233
234 use futures::ready;
235 use tokio::io::unix::AsyncFd;
236 use tokio::io::{Interest, ReadBuf};
237
238 use super::*;
239
240 pub struct TokioPipeStdin {
241 inner: AsyncFd<PipeStdin>,
242 }
243
244 impl futures::AsyncRead for TokioPipeStdin {
245 fn poll_read(
246 self: Pin<&mut Self>,
247 cx: &mut Context<'_>,
248 buf: &mut [u8],
249 ) -> Poll<Result<usize>> {
250 loop {
251 let mut guard = ready!(self.inner.poll_read_ready(cx))?;
252 match guard.try_io(|inner| inner.get_ref().read(buf)) {
253 Ok(ret) => return Poll::Ready(ret),
254 Err(_would_block) => continue,
255 }
256 }
257 }
258
259 fn poll_read_vectored(
260 self: Pin<&mut Self>,
261 cx: &mut Context<'_>,
262 bufs: &mut [io::IoSliceMut<'_>],
263 ) -> Poll<Result<usize>> {
264 loop {
265 let mut guard = ready!(self.inner.poll_read_ready(cx))?;
266 match guard.try_io(|inner| inner.get_ref().read_vectored(bufs)) {
267 Ok(ret) => return Poll::Ready(ret),
268 Err(_would_block) => continue,
269 }
270 }
271 }
272 }
273
274 impl tokio::io::AsyncRead for TokioPipeStdin {
275 fn poll_read(
276 self: Pin<&mut Self>,
277 cx: &mut Context<'_>,
278 buf: &mut ReadBuf<'_>,
279 ) -> Poll<io::Result<()>> {
280 let len = ready!(<Self as futures::AsyncRead>::poll_read(
281 self,
282 cx,
283 buf.initialize_unfilled()
284 ))?;
285 buf.advance(len);
286 Poll::Ready(Ok(()))
287 }
288 }
289
290 impl PipeStdin {
291 pub fn lock_tokio() -> Result<TokioPipeStdin> {
297 Self::lock()?.try_into_tokio()
298 }
299
300 pub fn try_into_tokio(self) -> Result<TokioPipeStdin> {
306 let inner = AsyncFd::with_interest(self, Interest::READABLE)?;
307 Ok(TokioPipeStdin { inner })
308 }
309 }
310
311 pub struct TokioPipeStdout {
312 inner: AsyncFd<PipeStdout>,
313 }
314
315 impl futures::AsyncWrite for TokioPipeStdout {
316 fn poll_write(
317 self: Pin<&mut Self>,
318 cx: &mut Context<'_>,
319 buf: &[u8],
320 ) -> Poll<Result<usize>> {
321 loop {
322 let mut guard = ready!(self.inner.poll_write_ready(cx))?;
323 match guard.try_io(|inner| inner.get_ref().write(buf)) {
324 Ok(result) => return Poll::Ready(result),
325 Err(_would_block) => continue,
326 }
327 }
328 }
329
330 fn poll_write_vectored(
331 self: Pin<&mut Self>,
332 cx: &mut Context<'_>,
333 bufs: &[IoSlice<'_>],
334 ) -> Poll<Result<usize>> {
335 loop {
336 let mut guard = ready!(self.inner.poll_write_ready(cx))?;
337 match guard.try_io(|inner| inner.get_ref().write_vectored(bufs)) {
338 Ok(result) => return Poll::Ready(result),
339 Err(_would_block) => continue,
340 }
341 }
342 }
343
344 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
345 Poll::Ready(Ok(()))
346 }
347
348 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
349 Poll::Ready(Ok(()))
350 }
351 }
352
353 impl tokio::io::AsyncWrite for TokioPipeStdout {
354 fn poll_write(
355 self: Pin<&mut Self>,
356 cx: &mut Context<'_>,
357 buf: &[u8],
358 ) -> Poll<Result<usize>> {
359 <Self as futures::AsyncWrite>::poll_write(self, cx, buf)
360 }
361
362 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
363 Poll::Ready(Ok(()))
364 }
365
366 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
367 Poll::Ready(Ok(()))
368 }
369 }
370
371 impl PipeStdout {
372 pub fn lock_tokio() -> Result<TokioPipeStdout> {
378 Self::lock()?.try_into_tokio()
379 }
380
381 pub fn try_into_tokio(self) -> Result<TokioPipeStdout> {
387 let inner = AsyncFd::with_interest(self, Interest::WRITABLE)?;
388 Ok(TokioPipeStdout { inner })
389 }
390 }
391}