1#![cfg_attr(not(feature = "async-io"), doc = "# #[cfg(never)]")]
47use std::io::{self, Error, ErrorKind, IoSlice, Read, Result, StdinLock, StdoutLock, Write};
71use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
72
73use rustix::fs::{fcntl_getfl, fcntl_setfl, fstat, FileType, OFlags};
74
75#[derive(Debug)]
76struct NonBlocking<T: AsFd> {
77 inner: T,
78 prev_flags: OFlags,
79}
80
81impl<T: AsFd> NonBlocking<T> {
82 fn new(inner: T) -> Result<Self> {
83 let ft = FileType::from_raw_mode(fstat(&inner)?.st_mode);
84 if !matches!(
85 ft,
86 FileType::Fifo | FileType::Socket | FileType::CharacterDevice
87 ) {
88 return Err(Error::new(
89 ErrorKind::Other,
90 format!("File type {ft:?} is not pipe-like"),
91 ));
92 }
93
94 let prev_flags = fcntl_getfl(&inner)?;
95 fcntl_setfl(&inner, prev_flags | OFlags::NONBLOCK)?;
96 Ok(Self { inner, prev_flags })
97 }
98}
99
100impl<T: AsFd> Drop for NonBlocking<T> {
101 fn drop(&mut self) {
102 let _: std::result::Result<_, _> = fcntl_setfl(&self.inner, self.prev_flags);
103 }
104}
105
/// Locked process stdin whose descriptor has been switched to non-blocking
/// mode.
///
/// Construction goes through `NonBlocking::new`, which only accepts FIFOs,
/// sockets and character devices. The flags that were in effect before are
/// written back when this value is dropped.
#[derive(Debug)]
pub struct PipeStdin {
    // The stdin lock is held for as long as this value lives.
    inner: NonBlocking<StdinLock<'static>>,
}
111
112impl PipeStdin {
113 pub fn lock() -> Result<Self> {
120 let inner = NonBlocking::new(io::stdin().lock())?;
121 Ok(Self { inner })
122 }
123}
124
125impl AsFd for PipeStdin {
126 fn as_fd(&self) -> BorrowedFd<'_> {
127 self.inner.inner.as_fd()
128 }
129}
130
131impl AsRawFd for PipeStdin {
132 fn as_raw_fd(&self) -> RawFd {
133 self.inner.inner.as_raw_fd()
134 }
135}
136
// SAFETY: the `Read` impls for `PipeStdin` in this file only issue
// `read`/`readv` on the borrowed descriptor; nothing here closes or replaces it.
// NOTE(review): `IoSafe` presumably requires exactly that (I/O impls must not
// drop the underlying descriptor) — confirm against the `async-io` docs.
#[cfg(feature = "async-io")]
unsafe impl async_io::IoSafe for PipeStdin {}
140
141impl Read for &'_ PipeStdin {
144 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
145 rustix::io::read(self, buf).map_err(Into::into)
146 }
147
148 fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> Result<usize> {
149 rustix::io::readv(self, bufs).map_err(Into::into)
150 }
151}
152
153impl Read for PipeStdin {
154 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
155 <&PipeStdin>::read(&mut &*self, buf)
156 }
157
158 fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> Result<usize> {
159 <&PipeStdin>::read_vectored(&mut &*self, bufs)
160 }
161}
162
/// Locked process stdout whose descriptor has been switched to non-blocking
/// mode.
///
/// Construction goes through `NonBlocking::new`, which only accepts FIFOs,
/// sockets and character devices. The flags that were in effect before are
/// written back when this value is dropped.
#[derive(Debug)]
pub struct PipeStdout {
    // The stdout lock is held for as long as this value lives.
    inner: NonBlocking<StdoutLock<'static>>,
}
168
169impl PipeStdout {
170 pub fn lock() -> Result<Self> {
176 let inner = NonBlocking::new(io::stdout().lock())?;
177 Ok(Self { inner })
178 }
179}
180
181impl AsFd for PipeStdout {
182 fn as_fd(&self) -> BorrowedFd<'_> {
183 self.inner.inner.as_fd()
184 }
185}
186
187impl AsRawFd for PipeStdout {
188 fn as_raw_fd(&self) -> RawFd {
189 self.inner.inner.as_raw_fd()
190 }
191}
192
// SAFETY: the `Write` impls for `PipeStdout` in this file only issue
// `write`/`writev` on the borrowed descriptor; nothing here closes or replaces it.
// NOTE(review): `IoSafe` presumably requires exactly that (I/O impls must not
// drop the underlying descriptor) — confirm against the `async-io` docs.
#[cfg(feature = "async-io")]
unsafe impl async_io::IoSafe for PipeStdout {}
196
197impl Write for &'_ PipeStdout {
199 fn write(&mut self, buf: &[u8]) -> Result<usize> {
200 rustix::io::write(self, buf).map_err(Into::into)
201 }
202
203 fn flush(&mut self) -> Result<()> {
204 Ok(())
205 }
206
207 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
208 rustix::io::writev(self, bufs).map_err(Into::into)
209 }
210}
211
212impl Write for PipeStdout {
213 fn write(&mut self, buf: &[u8]) -> Result<usize> {
214 <&PipeStdout>::write(&mut &*self, buf)
215 }
216
217 fn flush(&mut self) -> Result<()> {
218 <&PipeStdout>::flush(&mut &*self)
219 }
220
221 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
222 <&PipeStdout>::write_vectored(&mut &*self, bufs)
223 }
224}
225
/// Tokio integration: wraps [`PipeStdin`] / [`PipeStdout`] in
/// `tokio::io::unix::AsyncFd` and implements both the `futures` and the
/// `tokio` async I/O traits on top of readiness polling.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
mod tokio_impl {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::ready;
    use tokio::io::unix::AsyncFd;
    use tokio::io::{Interest, ReadBuf};

    use super::*;

    /// Non-blocking stdin registered for read readiness with Tokio.
    pub struct TokioPipeStdin {
        inner: AsyncFd<PipeStdin>,
    }

    impl futures::AsyncRead for TokioPipeStdin {
        /// Waits for read readiness, then attempts a non-blocking `read`.
        ///
        /// When the attempt reports "would block", `try_io` clears the cached
        /// readiness and the loop polls for readiness again.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<usize>> {
            loop {
                let mut guard = ready!(self.inner.poll_read_ready(cx))?;
                match guard.try_io(|inner| inner.get_ref().read(buf)) {
                    Ok(ret) => return Poll::Ready(ret),
                    Err(_would_block) => continue,
                }
            }
        }

        /// Vectored variant of `poll_read`, using `readv` on the descriptor.
        fn poll_read_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &mut [io::IoSliceMut<'_>],
        ) -> Poll<Result<usize>> {
            loop {
                let mut guard = ready!(self.inner.poll_read_ready(cx))?;
                match guard.try_io(|inner| inner.get_ref().read_vectored(bufs)) {
                    Ok(ret) => return Poll::Ready(ret),
                    Err(_would_block) => continue,
                }
            }
        }
    }

    impl tokio::io::AsyncRead for TokioPipeStdin {
        /// Reads into the unfilled part of `buf` once the descriptor is
        /// readable and marks the received bytes as initialized and filled.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            let len = loop {
                let mut guard = ready!(self.inner.poll_read_ready(cx))?;
                match guard.try_io(|inner| {
                    // SAFETY: the read only writes bytes into the buffer; it
                    // never de-initializes bytes that were already initialized.
                    let (written, _) = rustix::io::read(inner, unsafe { buf.unfilled_mut() })?;
                    Ok(written.len())
                }) {
                    Ok(ret) => break ret?,
                    Err(_would_block) => continue,
                }
            };
            // SAFETY: the read above stored `len` bytes at the start of the
            // unfilled region, so those bytes are initialized. Without this,
            // `advance` panics for a `ReadBuf` backed by uninitialized memory,
            // because the filled region may not exceed the initialized one.
            unsafe { buf.assume_init(len) };
            buf.advance(len);
            Poll::Ready(Ok(()))
        }
    }

    impl PipeStdin {
        /// Locks stdin in non-blocking mode and registers it with Tokio.
        ///
        /// Shorthand for [`PipeStdin::lock`] followed by
        /// [`PipeStdin::try_into_tokio`].
        ///
        /// # Errors
        ///
        /// Fails if locking or the `AsyncFd` registration fails.
        pub fn lock_tokio() -> Result<TokioPipeStdin> {
            Self::lock()?.try_into_tokio()
        }

        /// Registers this stdin for read readiness with Tokio.
        ///
        /// NOTE(review): `AsyncFd` registration is expected to need an active
        /// Tokio runtime with I/O enabled — confirm against the Tokio docs.
        ///
        /// # Errors
        ///
        /// Fails if the `AsyncFd` registration fails.
        pub fn try_into_tokio(self) -> Result<TokioPipeStdin> {
            let inner = AsyncFd::with_interest(self, Interest::READABLE)?;
            Ok(TokioPipeStdin { inner })
        }
    }

    /// Non-blocking stdout registered for write readiness with Tokio.
    pub struct TokioPipeStdout {
        inner: AsyncFd<PipeStdout>,
    }

    impl futures::AsyncWrite for TokioPipeStdout {
        /// Waits for write readiness, then attempts a non-blocking `write`.
        ///
        /// When the attempt reports "would block", `try_io` clears the cached
        /// readiness and the loop polls for readiness again.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize>> {
            loop {
                let mut guard = ready!(self.inner.poll_write_ready(cx))?;
                match guard.try_io(|inner| inner.get_ref().write(buf)) {
                    Ok(result) => return Poll::Ready(result),
                    Err(_would_block) => continue,
                }
            }
        }

        /// Vectored variant of `poll_write`, using `writev` on the descriptor.
        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<Result<usize>> {
            loop {
                let mut guard = ready!(self.inner.poll_write_ready(cx))?;
                match guard.try_io(|inner| inner.get_ref().write_vectored(bufs)) {
                    Ok(result) => return Poll::Ready(result),
                    Err(_would_block) => continue,
                }
            }
        }

        /// Nothing to flush: writes go straight to the descriptor.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
            Poll::Ready(Ok(()))
        }

        /// Closing is a no-op; the descriptor itself is left open.
        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    impl tokio::io::AsyncWrite for TokioPipeStdout {
        /// Delegates to the `futures::AsyncWrite` implementation.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize>> {
            <Self as futures::AsyncWrite>::poll_write(self, cx, buf)
        }

        /// Delegates to the `futures::AsyncWrite` implementation so Tokio
        /// callers get a real `writev` rather than the trait's default
        /// single-buffer fallback.
        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<Result<usize>> {
            <Self as futures::AsyncWrite>::poll_write_vectored(self, cx, bufs)
        }

        /// Vectored writes are backed by `writev`.
        fn is_write_vectored(&self) -> bool {
            true
        }

        /// Nothing to flush: writes go straight to the descriptor.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
            Poll::Ready(Ok(()))
        }

        /// Shutdown is a no-op; the descriptor itself is left open.
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    impl PipeStdout {
        /// Locks stdout in non-blocking mode and registers it with Tokio.
        ///
        /// Shorthand for [`PipeStdout::lock`] followed by
        /// [`PipeStdout::try_into_tokio`].
        ///
        /// # Errors
        ///
        /// Fails if locking or the `AsyncFd` registration fails.
        pub fn lock_tokio() -> Result<TokioPipeStdout> {
            Self::lock()?.try_into_tokio()
        }

        /// Registers this stdout for write readiness with Tokio.
        ///
        /// NOTE(review): `AsyncFd` registration is expected to need an active
        /// Tokio runtime with I/O enabled — confirm against the Tokio docs.
        ///
        /// # Errors
        ///
        /// Fails if the `AsyncFd` registration fails.
        pub fn try_into_tokio(self) -> Result<TokioPipeStdout> {
            let inner = AsyncFd::with_interest(self, Interest::WRITABLE)?;
            Ok(TokioPipeStdout { inner })
        }
    }
}