1use crate::lockers::{StderrLocker, StdinLocker, StdoutLocker};
2#[cfg(feature = "char-device")]
3use char_device::CharDevice;
4use duplex::Duplex;
5use io_extras::grip::{AsRawGrip, AsRawReadWriteGrip, FromRawGrip, RawGrip};
6#[cfg(windows)]
7use io_extras::os::windows::{
8 AsHandleOrSocket, AsRawHandleOrSocket, AsRawReadWriteHandleOrSocket, AsReadWriteHandleOrSocket,
9 BorrowedHandleOrSocket, RawHandleOrSocket,
10};
11use io_extras::raw::{RawReadable, RawWriteable};
12use io_lifetimes::{FromFilelike, FromSocketlike, IntoFilelike, IntoSocketlike};
13use std::fmt::{self, Arguments, Debug};
14use std::fs::{File, OpenOptions};
15use std::io::{self, IoSlice, IoSliceMut, Read, Seek, Write};
16use std::net::TcpStream;
17#[cfg(unix)]
18use std::os::unix::{
19 io::{AsRawFd, RawFd},
20 net::UnixStream,
21};
22#[cfg(target_os = "wasi")]
23use std::os::wasi::io::{AsRawFd, RawFd};
24use system_interface::io::{Peek, ReadReady};
25#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
26use {
27 duplex::HalfDuplex,
28 socketpair::{socketpair_stream, SocketpairStream},
29};
30#[cfg(not(windows))]
31use {
32 io_extras::os::rustix::{AsRawReadWriteFd, AsReadWriteFd},
33 io_lifetimes::{AsFd, BorrowedFd},
34};
35#[cfg(not(target_os = "wasi"))]
36use {
37 os_pipe::{pipe, PipeReader, PipeWriter},
39 std::{
40 io::{copy, Cursor},
41 process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, Stdio},
42 thread::{self, JoinHandle},
43 },
44};
45
/// A readable byte stream that can be backed by any of several kinds of
/// sources; see the constructors on this type for the supported sources.
pub struct StreamReader {
    /// Raw readable handle that all reads go through. It is created from the
    /// raw grip of whatever is stored in `resources`.
    handle: RawReadable,
    /// Owner of the underlying I/O object (file, socket, pipe, child, ...).
    resources: ReadResources,
}
60
/// A writable byte stream that can be backed by any of several kinds of
/// sinks; see the constructors on this type for the supported sinks.
pub struct StreamWriter {
    /// Raw writeable handle that all writes go through. It is created from the
    /// raw grip of whatever is stored in `resources`.
    handle: RawWriteable,
    /// Owner of the underlying I/O object (file, socket, pipe, child, ...).
    resources: WriteResources,
}
77
/// A byte stream that can be both read from and written to.
///
/// The read and write sides each have their own raw handle; for resources
/// that use a single handle for both directions the two handles are created
/// from the same raw grip.
pub struct StreamDuplexer {
    /// Raw readable handle used by the `Read` implementation.
    read_handle: RawReadable,
    /// Raw writeable handle used by the `Write` implementation.
    write_handle: RawWriteable,
    /// Owner of the underlying I/O object(s).
    resources: DuplexResources,
}
96
// Manual `Send`/`Sync` implementations, compiled on Windows only.
//
// NOTE(review): presumably these are needed because the raw handle type the
// streams hold on Windows is not automatically `Send`/`Sync` — confirm the
// safety argument against `io_extras`' `RawHandleOrSocket`.
#[cfg(windows)]
unsafe impl Send for StreamReader {}
#[cfg(windows)]
unsafe impl Sync for StreamReader {}
#[cfg(windows)]
unsafe impl Send for StreamWriter {}
#[cfg(windows)]
unsafe impl Sync for StreamWriter {}
#[cfg(windows)]
unsafe impl Send for StreamDuplexer {}
#[cfg(windows)]
unsafe impl Sync for StreamDuplexer {}
114
/// The object that owns the underlying resource of a `StreamReader`.
enum ReadResources {
    /// An owned file.
    File(File),
    /// An owned TCP stream.
    TcpStream(TcpStream),
    /// An owned Unix-domain stream.
    #[cfg(unix)]
    UnixStream(UnixStream),
    /// The read end of a pipe.
    #[cfg(not(target_os = "wasi"))]
    PipeReader(PipeReader),
    /// Standard input, held through a `StdinLocker`.
    Stdin(StdinLocker),
    /// The read end of a pipe plus the join handle of the thread feeding it.
    /// Wrapped in an `Option` so both can be moved out with `take`.
    #[cfg(not(target_os = "wasi"))]
    PipedThread(Option<(PipeReader, JoinHandle<io::Result<()>>)>),
    /// A spawned child process whose stdout is being read.
    #[cfg(not(target_os = "wasi"))]
    Child(Child),
    /// A child process's stdout.
    #[cfg(not(target_os = "wasi"))]
    ChildStdout(ChildStdout),
    /// A child process's stderr.
    #[cfg(not(target_os = "wasi"))]
    ChildStderr(ChildStderr),
}
133
/// The object that owns the underlying resource of a `StreamWriter`.
// NOTE(review): `dead_code` is presumably allowed because several payloads
// are stored only to keep the resource alive and are never read — confirm.
#[allow(dead_code)]
enum WriteResources {
    /// An owned file.
    File(File),
    /// An owned TCP stream.
    TcpStream(TcpStream),
    /// An owned Unix-domain stream.
    #[cfg(unix)]
    UnixStream(UnixStream),
    /// The write end of a pipe.
    #[cfg(not(target_os = "wasi"))]
    PipeWriter(PipeWriter),
    /// Standard output, held through a `StdoutLocker`.
    Stdout(StdoutLocker),
    /// Standard error, held through a `StderrLocker`.
    Stderr(StderrLocker),
    /// The write end of a pipe plus the join handle of the thread draining it
    /// into a boxed writer; the thread hands the boxed writer back on success.
    /// Wrapped in an `Option` so both can be moved out with `take`.
    #[cfg(not(target_os = "wasi"))]
    PipedThread(Option<(PipeWriter, JoinHandle<io::Result<Box<dyn Write + Send>>>)>),
    /// A spawned child process whose stdin is being written.
    #[cfg(not(target_os = "wasi"))]
    Child(Child),
    /// A child process's stdin.
    #[cfg(not(target_os = "wasi"))]
    ChildStdin(ChildStdin),
}
152
/// The object(s) that own the underlying resource of a `StreamDuplexer`.
enum DuplexResources {
    /// A pipe read end paired with a pipe write end.
    #[cfg(not(target_os = "wasi"))]
    PipeReaderWriter((PipeReader, PipeWriter)),
    /// Standard input paired with standard output, both held through lockers.
    StdinStdout((StdinLocker, StdoutLocker)),
    /// A spawned child process whose stdout is read and stdin is written.
    #[cfg(not(target_os = "wasi"))]
    Child(Child),
    /// A child process's stdout paired with its stdin.
    #[cfg(not(target_os = "wasi"))]
    ChildStdoutStdin((ChildStdout, ChildStdin)),
    /// A character device.
    #[cfg(feature = "char-device")]
    CharDevice(CharDevice),
    /// The platform's null device opened for both reading and writing.
    DevNull(File),
    /// An owned TCP stream.
    TcpStream(TcpStream),
    /// An owned Unix-domain stream.
    #[cfg(unix)]
    UnixStream(UnixStream),
    /// One end of a socketpair.
    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
    SocketpairStream(SocketpairStream),
    /// One end of a socketpair plus the join handle of a thread running a
    /// user-supplied function on the other end.
    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
    SocketedThreadFunc(Option<(SocketpairStream, JoinHandle<io::Result<SocketpairStream>>)>),
    /// One end of a socketpair plus the join handle of a thread servicing a
    /// boxed `HalfDuplex` on the other end.
    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
    SocketedThread(
        Option<(
            SocketpairStream,
            JoinHandle<io::Result<Box<dyn HalfDuplex + Send>>>,
        )>,
    ),
    /// Like `SocketedThread`, but for a boxed `HalfDuplexReadReady`.
    #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
    SocketedThreadReadReady(
        Option<(
            SocketpairStream,
            JoinHandle<io::Result<Box<dyn HalfDuplexReadReady + Send>>>,
        )>,
    ),
}
187
188impl StreamReader {
189 #[inline]
203 pub fn stdin() -> io::Result<Self> {
204 let stdin_locker = StdinLocker::new()?;
205
206 #[cfg(not(windows))]
208 let handle = stdin_locker.as_raw_fd();
209
210 #[cfg(windows)]
213 let handle = RawHandleOrSocket::stdin();
214
215 Ok(Self::handle(handle, ReadResources::Stdin(stdin_locker)))
216 }
217
218 #[inline]
222 #[must_use]
223 pub fn file<Filelike: IntoFilelike + Read + Write + Seek>(filelike: Filelike) -> Self {
224 Self::_file(File::from_into_filelike(filelike))
228 }
229
230 #[inline]
231 #[must_use]
232 fn _file(file: File) -> Self {
233 let handle = file.as_raw_grip();
234 Self::handle(handle, ReadResources::File(file))
235 }
236
237 #[inline]
242 #[must_use]
243 pub fn tcp_stream<Socketlike: IntoSocketlike>(socketlike: Socketlike) -> Self {
244 Self::_tcp_stream(TcpStream::from_into_socketlike(socketlike))
245 }
246
247 #[inline]
248 #[must_use]
249 fn _tcp_stream(tcp_stream: TcpStream) -> Self {
250 let handle = tcp_stream.as_raw_grip();
251 Self::handle(handle, ReadResources::TcpStream(tcp_stream))
255 }
256
257 #[cfg(unix)]
259 #[inline]
260 #[must_use]
261 pub fn unix_stream(unix_stream: UnixStream) -> Self {
262 let handle = unix_stream.as_raw_grip();
263 Self::handle(handle, ReadResources::UnixStream(unix_stream))
264 }
265
266 #[cfg(not(target_os = "wasi"))] #[inline]
269 #[must_use]
270 pub fn pipe_reader(pipe_reader: PipeReader) -> Self {
271 let handle = pipe_reader.as_raw_grip();
272 Self::handle(handle, ReadResources::PipeReader(pipe_reader))
273 }
274
275 #[cfg(not(target_os = "wasi"))] pub fn read_from_command(mut command: Command) -> io::Result<Self> {
278 command.stdin(Stdio::null());
279 command.stdout(Stdio::piped());
280 let child = command.spawn()?;
281 let handle = child.stdout.as_ref().unwrap().as_raw_grip();
282 Ok(Self::handle(handle, ReadResources::Child(child)))
283 }
284
285 #[cfg(not(target_os = "wasi"))] #[inline]
288 #[must_use]
289 pub fn child_stdout(child_stdout: ChildStdout) -> Self {
290 let handle = child_stdout.as_raw_grip();
291 Self::handle(handle, ReadResources::ChildStdout(child_stdout))
292 }
293
294 #[cfg(not(target_os = "wasi"))] #[inline]
297 #[must_use]
298 pub fn child_stderr(child_stderr: ChildStderr) -> Self {
299 let handle = child_stderr.as_raw_grip();
300 Self::handle(handle, ReadResources::ChildStderr(child_stderr))
301 }
302
303 #[cfg(not(target_os = "wasi"))] pub fn piped_thread(mut boxed_read: Box<dyn Read + Send>) -> io::Result<Self> {
308 let (pipe_reader, mut pipe_writer) = pipe()?;
309 let join_handle = thread::Builder::new()
310 .name("piped thread for boxed reader".to_owned())
311 .spawn(move || copy(&mut *boxed_read, &mut pipe_writer).map(|_size| ()))?;
312 let handle = pipe_reader.as_raw_grip();
313 Ok(Self::handle(
314 handle,
315 ReadResources::PipedThread(Some((pipe_reader, join_handle))),
316 ))
317 }
318
319 pub fn null() -> io::Result<Self> {
321 #[cfg(not(windows))]
322 {
323 Ok(Self::file(File::open("/dev/null")?))
324 }
325
326 #[cfg(windows)]
327 {
328 Ok(Self::file(File::open("nul")?))
329 }
330 }
331
332 #[inline]
334 #[cfg(not(target_os = "wasi"))] pub fn str<S: AsRef<str>>(s: S) -> io::Result<Self> {
336 Self::bytes(s.as_ref().as_bytes())
337 }
338
339 #[cfg(not(target_os = "wasi"))] pub fn bytes(bytes: &[u8]) -> io::Result<Self> {
342 #[cfg(not(any(windows, target_os = "redox")))]
344 if bytes.len() <= rustix::pipe::PIPE_BUF {
345 let (pipe_reader, mut pipe_writer) = pipe()?;
346
347 pipe_writer.write_all(bytes)?;
348 pipe_writer.flush()?;
349 drop(pipe_writer);
350
351 let handle = pipe_reader.as_raw_grip();
352 return Ok(Self::handle(handle, ReadResources::PipeReader(pipe_reader)));
353 }
354
355 Self::piped_thread(Box::new(Cursor::new(bytes.to_vec())))
357 }
358
359 #[inline]
360 #[must_use]
361 fn handle(handle: RawGrip, resources: ReadResources) -> Self {
362 Self {
363 handle: unsafe { RawReadable::from_raw_grip(handle) },
364 resources,
365 }
366 }
367
368 fn map_err(&mut self, e: io::Error) -> io::Error {
369 match &mut self.resources {
370 #[cfg(not(target_os = "wasi"))] ReadResources::PipedThread(piped_thread) => {
372 let (pipe_reader, join_handle) = piped_thread.take().unwrap();
373 drop(pipe_reader);
374 join_handle.join().unwrap().unwrap_err()
375 }
376 _ => e,
377 }
378 }
379}
380
381impl StreamWriter {
382 #[inline]
398 pub fn stdout() -> io::Result<Self> {
399 let stdout_locker = StdoutLocker::new()?;
400
401 #[cfg(not(windows))]
403 let handle = stdout_locker.as_raw_fd();
404
405 #[cfg(windows)]
408 let handle = RawHandleOrSocket::stdout();
409
410 Ok(Self::handle(handle, WriteResources::Stdout(stdout_locker)))
411 }
412
413 #[inline]
430 pub fn stderr() -> io::Result<Self> {
431 let stderr_locker = StderrLocker::new()?;
432
433 #[cfg(not(windows))]
435 let handle = stderr_locker.as_raw_fd();
436
437 #[cfg(windows)]
440 let handle = RawHandleOrSocket::stderr();
441
442 Ok(Self::handle(handle, WriteResources::Stderr(stderr_locker)))
443 }
444
445 #[inline]
449 #[must_use]
450 pub fn file<Filelike: IntoFilelike + Read + Write + Seek>(filelike: Filelike) -> Self {
451 Self::_file(File::from_into_filelike(filelike))
455 }
456
457 #[inline]
458 #[must_use]
459 fn _file(file: File) -> Self {
460 let handle = file.as_raw_grip();
461 Self::handle(handle, WriteResources::File(file))
462 }
463
464 #[inline]
469 #[must_use]
470 pub fn tcp_stream<Socketlike: IntoSocketlike>(socketlike: Socketlike) -> Self {
471 Self::_tcp_stream(TcpStream::from_into_socketlike(socketlike))
475 }
476
477 #[inline]
478 #[must_use]
479 fn _tcp_stream(tcp_stream: TcpStream) -> Self {
480 let handle = tcp_stream.as_raw_grip();
481 Self::handle(handle, WriteResources::TcpStream(tcp_stream))
482 }
483
484 #[cfg(unix)]
486 #[inline]
487 #[must_use]
488 pub fn unix_stream(unix_stream: UnixStream) -> Self {
489 let handle = unix_stream.as_raw_grip();
490 Self::handle(handle, WriteResources::UnixStream(unix_stream))
491 }
492
493 #[cfg(not(target_os = "wasi"))] #[inline]
496 #[must_use]
497 pub fn pipe_writer(pipe_writer: PipeWriter) -> Self {
498 let handle = pipe_writer.as_raw_grip();
499 Self::handle(handle, WriteResources::PipeWriter(pipe_writer))
500 }
501
502 #[cfg(not(target_os = "wasi"))] pub fn write_to_command(mut command: Command) -> io::Result<Self> {
506 command.stdin(Stdio::piped());
507 command.stdout(Stdio::null());
508 let child = command.spawn()?;
509 let handle = child.stdin.as_ref().unwrap().as_raw_grip();
510 Ok(Self::handle(handle, WriteResources::Child(child)))
511 }
512
513 #[cfg(not(target_os = "wasi"))] #[inline]
516 #[must_use]
517 pub fn child_stdin(child_stdin: ChildStdin) -> Self {
518 let handle = child_stdin.as_raw_grip();
519 Self::handle(handle, WriteResources::ChildStdin(child_stdin))
520 }
521
522 #[cfg(not(target_os = "wasi"))] pub fn piped_thread(mut boxed_write: Box<dyn Write + Send>) -> io::Result<Self> {
535 let (mut pipe_reader, pipe_writer) = pipe()?;
536 let join_handle = thread::Builder::new()
537 .name("piped thread for boxed writer".to_owned())
538 .spawn(move || {
539 copy(&mut pipe_reader, &mut *boxed_write)?;
540 boxed_write.flush()?;
541 Ok(boxed_write)
542 })?;
543 let handle = pipe_writer.as_raw_grip();
544 Ok(Self::handle(
545 handle,
546 WriteResources::PipedThread(Some((pipe_writer, join_handle))),
547 ))
548 }
549
550 pub fn null() -> io::Result<Self> {
552 #[cfg(not(windows))]
553 {
554 Ok(Self::file(File::create("/dev/null")?))
555 }
556
557 #[cfg(windows)]
558 {
559 Ok(Self::file(File::create("nul")?))
560 }
561 }
562
563 #[inline]
564 fn handle(handle: RawGrip, resources: WriteResources) -> Self {
565 Self {
566 handle: unsafe { RawWriteable::from_raw_grip(handle) },
567 resources,
568 }
569 }
570
571 fn map_err(&mut self, e: io::Error) -> io::Error {
572 match &mut self.resources {
573 #[cfg(not(target_os = "wasi"))] WriteResources::PipedThread(piped_thread) => {
575 let (pipe_writer, join_handle) = piped_thread.take().unwrap();
576 drop(pipe_writer);
577 join_handle.join().unwrap().map(|_| ()).unwrap_err()
578 }
579 _ => e,
580 }
581 }
582}
583
584impl StreamDuplexer {
585 #[inline]
596 pub fn stdin_stdout() -> io::Result<Self> {
597 let stdin_locker = StdinLocker::new()?;
598 let stdout_locker = StdoutLocker::new()?;
599
600 #[cfg(not(windows))]
602 let (read, write) = (stdin_locker.as_raw_grip(), stdout_locker.as_raw_grip());
603
604 #[cfg(windows)]
607 let (read, write) = (RawHandleOrSocket::stdin(), RawHandleOrSocket::stdout());
608
609 Ok(Self::two_handles(
610 read,
611 write,
612 DuplexResources::StdinStdout((stdin_locker, stdout_locker)),
613 ))
614 }
615
616 #[cfg(feature = "char-device")]
618 #[cfg_attr(docsrs, doc(cfg(feature = "char-device")))]
619 #[inline]
620 #[must_use]
621 pub fn char_device(char_device: CharDevice) -> Self {
622 let handle = char_device.as_raw_grip();
623 Self::handle(handle, DuplexResources::CharDevice(char_device))
624 }
625
626 #[inline]
631 #[must_use]
632 pub fn tcp_stream<Socketlike: IntoSocketlike>(socketlike: Socketlike) -> Self {
633 Self::_tcp_stream(TcpStream::from_into_socketlike(socketlike))
634 }
635
636 #[inline]
637 #[must_use]
638 fn _tcp_stream(tcp_stream: TcpStream) -> Self {
639 let handle = tcp_stream.as_raw_grip();
640 Self::handle(handle, DuplexResources::TcpStream(tcp_stream))
644 }
645
646 #[cfg(unix)]
648 #[must_use]
649 pub fn unix_stream(unix_stream: UnixStream) -> Self {
650 let handle = unix_stream.as_raw_grip();
651 Self::handle(handle, DuplexResources::UnixStream(unix_stream))
652 }
653
654 #[cfg(not(target_os = "wasi"))] #[inline]
657 #[must_use]
658 pub fn pipe_reader_writer(pipe_reader: PipeReader, pipe_writer: PipeWriter) -> Self {
659 let read = pipe_reader.as_raw_grip();
660 let write = pipe_writer.as_raw_grip();
661 Self::two_handles(
662 read,
663 write,
664 DuplexResources::PipeReaderWriter((pipe_reader, pipe_writer)),
665 )
666 }
667
668 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
670 #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
671 #[must_use]
672 pub fn socketpair_stream(stream: SocketpairStream) -> Self {
673 let handle = stream.as_raw_grip();
674 Self::handle(handle, DuplexResources::SocketpairStream(stream))
675 }
676
677 #[cfg(not(target_os = "wasi"))] pub fn duplex_with_command(mut command: Command) -> io::Result<Self> {
680 command.stdin(Stdio::piped());
681 command.stdout(Stdio::piped());
682 let child = command.spawn()?;
683 let read = child.stdout.as_ref().unwrap().as_raw_grip();
684 let write = child.stdin.as_ref().unwrap().as_raw_grip();
685 Ok(Self::two_handles(
686 read,
687 write,
688 DuplexResources::Child(child),
689 ))
690 }
691
692 #[cfg(not(target_os = "wasi"))] #[inline]
696 #[must_use]
697 pub fn child_stdout_stdin(child_stdout: ChildStdout, child_stdin: ChildStdin) -> Self {
698 let read = child_stdout.as_raw_grip();
699 let write = child_stdin.as_raw_grip();
700 Self::two_handles(
701 read,
702 write,
703 DuplexResources::ChildStdoutStdin((child_stdout, child_stdin)),
704 )
705 }
706
707 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
718 #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
719 pub fn socketed_thread_read_first(
720 mut boxed_duplex: Box<dyn HalfDuplex + Send>,
721 ) -> io::Result<Self> {
722 let (a, b) = socketpair_stream()?;
723 let join_handle = thread::Builder::new()
724 .name("socketed thread for boxed duplexer".to_owned())
725 .spawn(move || {
726 read_first(a, &mut *boxed_duplex)?;
727 Ok(boxed_duplex)
728 })?;
729 let handle = b.as_raw_grip();
730 Ok(Self::handle(
731 handle,
732 DuplexResources::SocketedThread(Some((b, join_handle))),
733 ))
734 }
735
736 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
747 #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
748 pub fn socketed_thread_write_first(
749 mut boxed_duplex: Box<dyn HalfDuplex + Send>,
750 ) -> io::Result<Self> {
751 let (a, b) = socketpair_stream()?;
752 let join_handle = thread::Builder::new()
753 .name("socketed thread for boxed duplexer".to_owned())
754 .spawn(move || {
755 write_first(a, &mut *boxed_duplex)?;
756 Ok(boxed_duplex)
757 })?;
758 let handle = b.as_raw_grip();
759 Ok(Self::handle(
760 handle,
761 DuplexResources::SocketedThread(Some((b, join_handle))),
762 ))
763 }
764
765 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
780 #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
781 pub fn socketed_thread(
782 mut boxed_duplex: Box<dyn HalfDuplexReadReady + Send>,
783 ) -> io::Result<Self> {
784 let (a, b) = socketpair_stream()?;
785 let join_handle = thread::Builder::new()
786 .name("socketed thread for boxed duplexer".to_owned())
787 .spawn(move || {
788 loop {
789 if a.num_ready_bytes()? != 0 {
790 write_first(a, &mut *boxed_duplex)?;
791 break;
792 }
793 if boxed_duplex.num_ready_bytes()? != 0 {
794 read_first(a, &mut *boxed_duplex)?;
795 break;
796 }
797
798 std::thread::sleep(std::time::Duration::from_secs(1));
801 }
802 Ok(boxed_duplex)
803 })?;
804 let handle = b.as_raw_grip();
805 Ok(Self::handle(
806 handle,
807 DuplexResources::SocketedThreadReadReady(Some((b, join_handle))),
808 ))
809 }
810
811 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
822 #[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
823 pub fn socketed_thread_func(
824 func: Box<dyn Send + FnOnce(SocketpairStream) -> io::Result<SocketpairStream>>,
825 ) -> io::Result<Self> {
826 let (a, b) = socketpair_stream()?;
827 let join_handle = thread::Builder::new()
828 .name("socketed thread for boxed duplexer".to_owned())
829 .spawn(move || func(a))?;
830 let handle = b.as_raw_grip();
831 Ok(Self::handle(
832 handle,
833 DuplexResources::SocketedThreadFunc(Some((b, join_handle))),
834 ))
835 }
836
837 pub fn null() -> io::Result<Self> {
840 #[cfg(not(windows))]
841 let file = OpenOptions::new()
842 .read(true)
843 .write(true)
844 .open("/dev/null")?;
845
846 #[cfg(windows)]
847 let file = OpenOptions::new().read(true).write(true).open("nul")?;
848
849 let handle = file.as_raw_grip();
850 Ok(Self::handle(handle, DuplexResources::DevNull(file)))
851 }
852
853 #[inline]
854 #[must_use]
855 fn handle(handle: RawGrip, resources: DuplexResources) -> Self {
856 Self {
857 read_handle: unsafe { RawReadable::from_raw_grip(handle) },
858 write_handle: unsafe { RawWriteable::from_raw_grip(handle) },
859 resources,
860 }
861 }
862
863 #[inline]
864 #[must_use]
865 fn two_handles(read: RawGrip, write: RawGrip, resources: DuplexResources) -> Self {
866 Self {
867 read_handle: unsafe { RawReadable::from_raw_grip(read) },
868 write_handle: unsafe { RawWriteable::from_raw_grip(write) },
869 resources,
870 }
871 }
872
873 fn map_err(&mut self, e: io::Error) -> io::Error {
874 match &mut self.resources {
875 _ => e,
876 }
877 }
878}
879
880impl Read for StreamReader {
881 #[inline]
882 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
883 match self.handle.read(buf) {
884 Ok(size) => Ok(size),
885 Err(e) => Err(self.map_err(e)),
886 }
887 }
888
889 #[inline]
890 fn read_vectored(&mut self, bufs: &mut [IoSliceMut]) -> io::Result<usize> {
891 match self.handle.read_vectored(bufs) {
892 Ok(size) => Ok(size),
893 Err(e) => Err(self.map_err(e)),
894 }
895 }
896
897 #[cfg(can_vector)]
898 #[inline]
899 fn is_read_vectored(&self) -> bool {
900 self.handle.is_read_vectored()
901 }
902
903 #[inline]
904 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
905 match self.handle.read_to_end(buf) {
906 Ok(size) => Ok(size),
907 Err(e) => Err(self.map_err(e)),
908 }
909 }
910
911 #[inline]
912 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
913 match self.handle.read_to_string(buf) {
914 Ok(size) => Ok(size),
915 Err(e) => Err(self.map_err(e)),
916 }
917 }
918
919 #[inline]
920 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
921 match self.handle.read_exact(buf) {
922 Ok(()) => Ok(()),
923 Err(e) => Err(self.map_err(e)),
924 }
925 }
926}
927
928impl Peek for StreamReader {
929 fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
930 match &mut self.resources {
931 ReadResources::File(file) => Peek::peek(file, buf),
932 ReadResources::TcpStream(tcp_stream) => Peek::peek(tcp_stream, buf),
933 #[cfg(unix)]
934 ReadResources::UnixStream(unix_stream) => Peek::peek(unix_stream, buf),
935 _ => Ok(0),
936 }
937 }
938}
939
940impl ReadReady for StreamReader {
941 fn num_ready_bytes(&self) -> io::Result<u64> {
942 match &self.resources {
943 ReadResources::File(file) => ReadReady::num_ready_bytes(file),
944 ReadResources::TcpStream(tcp_stream) => ReadReady::num_ready_bytes(tcp_stream),
945 #[cfg(unix)]
946 ReadResources::UnixStream(unix_stream) => ReadReady::num_ready_bytes(unix_stream),
947 ReadResources::PipeReader(pipe_reader) => ReadReady::num_ready_bytes(pipe_reader),
948 ReadResources::Stdin(stdin) => ReadReady::num_ready_bytes(stdin),
949 #[cfg(not(target_os = "wasi"))]
950 ReadResources::PipedThread(piped_thread) => {
951 ReadReady::num_ready_bytes(&piped_thread.as_ref().unwrap().0)
952 }
953 #[cfg(not(target_os = "wasi"))]
954 ReadResources::Child(child) => {
955 ReadReady::num_ready_bytes(child.stdout.as_ref().unwrap())
956 }
957 #[cfg(not(target_os = "wasi"))]
958 ReadResources::ChildStdout(child_stdout) => ReadReady::num_ready_bytes(child_stdout),
959 #[cfg(not(target_os = "wasi"))]
960 ReadResources::ChildStderr(child_stderr) => ReadReady::num_ready_bytes(child_stderr),
961 }
962 }
963}
964
965impl Write for StreamWriter {
966 #[inline]
967 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
968 match self.handle.write(buf) {
969 Ok(size) => Ok(size),
970 Err(e) => Err(self.map_err(e)),
971 }
972 }
973
974 #[inline]
975 fn flush(&mut self) -> io::Result<()> {
976 match self.handle.flush() {
977 Ok(()) => {
978 #[cfg(not(target_os = "wasi"))] if let WriteResources::PipedThread(piped_thread) = &mut self.resources {
984 let (mut pipe_writer, join_handle) = piped_thread.take().unwrap();
985 pipe_writer.flush()?;
986 drop(pipe_writer);
987 let boxed_write = join_handle.join().unwrap().unwrap();
988 *self = Self::piped_thread(boxed_write)?;
989 }
990 Ok(())
991 }
992 Err(e) => Err(self.map_err(e)),
993 }
994 }
995
996 #[inline]
997 fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
998 match self.handle.write_vectored(bufs) {
999 Ok(size) => Ok(size),
1000 Err(e) => Err(self.map_err(e)),
1001 }
1002 }
1003
1004 #[cfg(can_vector)]
1005 #[inline]
1006 fn is_write_vectored(&self) -> bool {
1007 self.handle.is_write_vectored()
1008 }
1009
1010 #[inline]
1011 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
1012 match self.handle.write_all(buf) {
1013 Ok(()) => Ok(()),
1014 Err(e) => Err(self.map_err(e)),
1015 }
1016 }
1017
1018 #[cfg(write_all_vectored)]
1019 #[inline]
1020 fn write_all_vectored(&mut self, bufs: &mut [IoSlice]) -> io::Result<()> {
1021 match self.handle.write_all_vectored(bufs) {
1022 Ok(()) => Ok(()),
1023 Err(e) => Err(self.map_err(e)),
1024 }
1025 }
1026
1027 #[inline]
1028 fn write_fmt(&mut self, fmt: Arguments) -> io::Result<()> {
1029 match self.handle.write_fmt(fmt) {
1030 Ok(()) => Ok(()),
1031 Err(e) => Err(self.map_err(e)),
1032 }
1033 }
1034}
1035
1036impl Read for StreamDuplexer {
1037 #[inline]
1038 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1039 match self.read_handle.read(buf) {
1040 Ok(size) => Ok(size),
1041 Err(e) => Err(self.map_err(e)),
1042 }
1043 }
1044
1045 #[inline]
1046 fn read_vectored(&mut self, bufs: &mut [IoSliceMut]) -> io::Result<usize> {
1047 match self.read_handle.read_vectored(bufs) {
1048 Ok(size) => Ok(size),
1049 Err(e) => Err(self.map_err(e)),
1050 }
1051 }
1052
1053 #[cfg(can_vector)]
1054 #[inline]
1055 fn is_read_vectored(&self) -> bool {
1056 self.read_handle.is_read_vectored()
1057 }
1058
1059 #[inline]
1060 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
1061 match self.read_handle.read_to_end(buf) {
1062 Ok(size) => Ok(size),
1063 Err(e) => Err(self.map_err(e)),
1064 }
1065 }
1066
1067 #[inline]
1068 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
1069 match self.read_handle.read_to_string(buf) {
1070 Ok(size) => Ok(size),
1071 Err(e) => Err(self.map_err(e)),
1072 }
1073 }
1074
1075 #[inline]
1076 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
1077 match self.read_handle.read_exact(buf) {
1078 Ok(()) => Ok(()),
1079 Err(e) => Err(self.map_err(e)),
1080 }
1081 }
1082}
1083
1084impl Peek for StreamDuplexer {
1085 fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1086 match &mut self.resources {
1087 DuplexResources::TcpStream(tcp_stream) => Peek::peek(tcp_stream, buf),
1088 #[cfg(unix)]
1089 DuplexResources::UnixStream(unix_stream) => Peek::peek(unix_stream, buf),
1090 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1091 DuplexResources::SocketpairStream(socketpair) => Peek::peek(socketpair, buf),
1092 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1093 DuplexResources::SocketedThreadFunc(socketed_thread) => {
1094 Peek::peek(&mut socketed_thread.as_mut().unwrap().0, buf)
1095 }
1096 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1097 DuplexResources::SocketedThreadReadReady(socketed_thread) => {
1098 Peek::peek(&mut socketed_thread.as_mut().unwrap().0, buf)
1099 }
1100 _ => Ok(0),
1101 }
1102 }
1103}
1104
1105impl ReadReady for StreamDuplexer {
1106 fn num_ready_bytes(&self) -> io::Result<u64> {
1107 match &self.resources {
1108 #[cfg(not(target_os = "wasi"))]
1109 DuplexResources::PipeReaderWriter((pipe_reader, _)) => {
1110 ReadReady::num_ready_bytes(pipe_reader)
1111 }
1112 DuplexResources::StdinStdout((stdin, _)) => ReadReady::num_ready_bytes(stdin),
1113 #[cfg(not(target_os = "wasi"))]
1114 DuplexResources::Child(child) => {
1115 ReadReady::num_ready_bytes(child.stdout.as_ref().unwrap())
1116 }
1117 #[cfg(not(target_os = "wasi"))]
1118 DuplexResources::ChildStdoutStdin((child_stdout, _)) => {
1119 ReadReady::num_ready_bytes(child_stdout)
1120 }
1121 #[cfg(feature = "char-device")]
1122 DuplexResources::CharDevice(char_device) => ReadReady::num_ready_bytes(char_device),
1123 DuplexResources::DevNull(file) => ReadReady::num_ready_bytes(file),
1124 DuplexResources::TcpStream(tcp_stream) => ReadReady::num_ready_bytes(tcp_stream),
1125 #[cfg(unix)]
1126 DuplexResources::UnixStream(unix_stream) => ReadReady::num_ready_bytes(unix_stream),
1127 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1128 DuplexResources::SocketpairStream(socketpair_stream) => {
1129 ReadReady::num_ready_bytes(socketpair_stream)
1130 }
1131 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1132 DuplexResources::SocketedThreadFunc(socketed_thread) => {
1133 ReadReady::num_ready_bytes(&socketed_thread.as_ref().unwrap().0)
1134 }
1135 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1136 DuplexResources::SocketedThread(socketed_thread) => {
1137 ReadReady::num_ready_bytes(&socketed_thread.as_ref().unwrap().0)
1138 }
1139 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1140 DuplexResources::SocketedThreadReadReady(socketed_thread) => {
1141 ReadReady::num_ready_bytes(&socketed_thread.as_ref().unwrap().0)
1142 }
1143 }
1144 }
1145}
1146
1147impl Write for StreamDuplexer {
1148 #[inline]
1149 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1150 match self.write_handle.write(buf) {
1151 Ok(size) => Ok(size),
1152 Err(e) => Err(self.map_err(e)),
1153 }
1154 }
1155
1156 #[inline]
1157 fn flush(&mut self) -> io::Result<()> {
1158 match self.write_handle.flush() {
1159 Ok(()) => Ok(()),
1160 Err(e) => Err(self.map_err(e)),
1161 }
1162 }
1163
1164 #[inline]
1165 fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
1166 match self.write_handle.write_vectored(bufs) {
1167 Ok(size) => Ok(size),
1168 Err(e) => Err(self.map_err(e)),
1169 }
1170 }
1171
1172 #[cfg(can_vector)]
1173 #[inline]
1174 fn is_write_vectored(&self) -> bool {
1175 self.write_handle.is_write_vectored()
1176 }
1177
1178 #[inline]
1179 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
1180 match self.write_handle.write_all(buf) {
1181 Ok(()) => Ok(()),
1182 Err(e) => Err(self.map_err(e)),
1183 }
1184 }
1185
1186 #[cfg(write_all_vectored)]
1187 #[inline]
1188 fn write_all_vectored(&mut self, bufs: &mut [IoSlice]) -> io::Result<()> {
1189 match self.write_handle.write_all_vectored(bufs) {
1190 Ok(()) => Ok(()),
1191 Err(e) => Err(self.map_err(e)),
1192 }
1193 }
1194
1195 #[inline]
1196 fn write_fmt(&mut self, fmt: Arguments) -> io::Result<()> {
1197 match self.write_handle.write_fmt(fmt) {
1198 Ok(()) => Ok(()),
1199 Err(e) => Err(self.map_err(e)),
1200 }
1201 }
1202}
1203
// Marks `StreamDuplexer` as a `Duplex` stream; the trait is implemented with
// an empty body here.
impl Duplex for StreamDuplexer {}
1205
1206#[cfg(not(windows))]
1207impl AsRawFd for StreamReader {
1208 #[inline]
1209 fn as_raw_fd(&self) -> RawFd {
1210 self.handle.as_raw_fd()
1211 }
1212}
1213
1214#[cfg(not(windows))]
1215impl AsRawFd for StreamWriter {
1216 #[inline]
1217 fn as_raw_fd(&self) -> RawFd {
1218 self.handle.as_raw_fd()
1219 }
1220}
1221
1222#[cfg(not(windows))]
1223impl AsRawReadWriteFd for StreamDuplexer {
1224 #[inline]
1225 fn as_raw_read_fd(&self) -> RawFd {
1226 self.read_handle.as_raw_fd()
1227 }
1228
1229 #[inline]
1230 fn as_raw_write_fd(&self) -> RawFd {
1231 self.write_handle.as_raw_fd()
1232 }
1233}
1234
/// Exposes the raw handle-or-socket of the underlying readable handle.
#[cfg(windows)]
impl AsRawHandleOrSocket for StreamReader {
    #[inline]
    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
        AsRawHandleOrSocket::as_raw_handle_or_socket(&self.handle)
    }
}
1242
/// Exposes the raw handle-or-socket of the underlying writeable handle.
#[cfg(windows)]
impl AsRawHandleOrSocket for StreamWriter {
    #[inline]
    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
        AsRawHandleOrSocket::as_raw_handle_or_socket(&self.handle)
    }
}
1250
/// Exposes the raw handle-or-socket of the read side and the write side.
#[cfg(windows)]
impl AsRawReadWriteHandleOrSocket for StreamDuplexer {
    #[inline]
    fn as_raw_read_handle_or_socket(&self) -> RawHandleOrSocket {
        AsRawHandleOrSocket::as_raw_handle_or_socket(&self.read_handle)
    }

    #[inline]
    fn as_raw_write_handle_or_socket(&self) -> RawHandleOrSocket {
        AsRawHandleOrSocket::as_raw_handle_or_socket(&self.write_handle)
    }
}
1263
1264#[cfg(not(windows))]
1265impl AsFd for StreamReader {
1266 #[inline]
1267 fn as_fd(&self) -> BorrowedFd<'_> {
1268 unsafe { BorrowedFd::borrow_raw(self.handle.as_raw_fd()) }
1269 }
1270}
1271
1272#[cfg(not(windows))]
1273impl AsFd for StreamWriter {
1274 #[inline]
1275 fn as_fd(&self) -> BorrowedFd<'_> {
1276 unsafe { BorrowedFd::borrow_raw(self.handle.as_raw_fd()) }
1277 }
1278}
1279
1280#[cfg(not(windows))]
1281impl AsReadWriteFd for StreamDuplexer {
1282 #[inline]
1283 fn as_read_fd(&self) -> BorrowedFd<'_> {
1284 unsafe { BorrowedFd::borrow_raw(self.read_handle.as_raw_fd()) }
1285 }
1286
1287 #[inline]
1288 fn as_write_fd(&self) -> BorrowedFd<'_> {
1289 unsafe { BorrowedFd::borrow_raw(self.write_handle.as_raw_fd()) }
1290 }
1291}
1292
/// Lends out the handle-or-socket of the underlying readable handle.
#[cfg(windows)]
impl AsHandleOrSocket for StreamReader {
    #[inline]
    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
        let raw = self.handle.as_raw_handle_or_socket();
        // SAFETY: NOTE(review): the handle is expected to be kept open by
        // `self.resources` for as long as `self` is borrowed — confirm.
        unsafe { BorrowedHandleOrSocket::borrow_raw(raw) }
    }
}
1300
/// Lends out the handle-or-socket of the underlying writeable handle.
#[cfg(windows)]
impl AsHandleOrSocket for StreamWriter {
    #[inline]
    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
        let raw = self.handle.as_raw_handle_or_socket();
        // SAFETY: NOTE(review): the handle is expected to be kept open by
        // `self.resources` for as long as `self` is borrowed — confirm.
        unsafe { BorrowedHandleOrSocket::borrow_raw(raw) }
    }
}
1308
/// Lends out the handle-or-socket of the read side and the write side.
#[cfg(windows)]
impl AsReadWriteHandleOrSocket for StreamDuplexer {
    #[inline]
    fn as_read_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
        let raw = self.read_handle.as_raw_handle_or_socket();
        // SAFETY: NOTE(review): the handle is expected to be kept open by
        // `self.resources` for as long as `self` is borrowed — confirm.
        unsafe { BorrowedHandleOrSocket::borrow_raw(raw) }
    }

    #[inline]
    fn as_write_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
        let raw = self.write_handle.as_raw_handle_or_socket();
        // SAFETY: NOTE(review): the handle is expected to be kept open by
        // `self.resources` for as long as `self` is borrowed — confirm.
        unsafe { BorrowedHandleOrSocket::borrow_raw(raw) }
    }
}
1321
1322impl Drop for ReadResources {
1323 fn drop(&mut self) {
1324 match self {
1325 #[cfg(not(target_os = "wasi"))] Self::PipedThread(piped_thread) => {
1327 let (pipe_reader, join_handle) = piped_thread.take().unwrap();
1328 drop(pipe_reader);
1329 match join_handle.join().unwrap() {
1333 Ok(()) => (),
1334 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => (),
1335 Err(e) => Err(e).unwrap(),
1336 }
1337 }
1338 _ => {}
1339 }
1340 }
1341}
1342
1343impl Drop for WriteResources {
1344 fn drop(&mut self) {
1345 match self {
1346 #[cfg(not(target_os = "wasi"))] Self::PipedThread(piped_thread) => {
1348 if let Some((pipe_writer, join_handle)) = piped_thread.take() {
1349 drop(pipe_writer);
1350 join_handle.join().unwrap().unwrap();
1351 }
1352 }
1353 _ => {}
1354 }
1355 }
1356}
1357
1358impl Drop for DuplexResources {
1359 fn drop(&mut self) {
1360 match self {
1361 #[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
1362 Self::SocketedThreadFunc(socketed_thread) => {
1363 if let Some((socketpair, join_handle)) = socketed_thread.take() {
1364 drop(socketpair);
1365 join_handle.join().unwrap().unwrap();
1366 }
1367 }
1368 _ => {}
1369 }
1370 }
1371}
1372
1373impl Debug for StreamReader {
1374 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1375 f.debug_struct("StreamReader")
1379 .field("raw_grip", &self.as_raw_grip())
1380 .finish()
1381 }
1382}
1383
1384impl Debug for StreamWriter {
1385 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1386 f.debug_struct("StreamWriter")
1390 .field("raw_grip", &self.as_raw_grip())
1391 .finish()
1392 }
1393}
1394
1395impl Debug for StreamDuplexer {
1396 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1397 f.debug_struct("StreamDuplexer")
1401 .field("unsafe_readable", &self.as_raw_read_grip())
1402 .field("unsafe_writeable", &self.as_raw_write_grip())
1403 .finish()
1404 }
1405}
1406
/// Combination of [`HalfDuplex`] and [`ReadReady`] under a single name.
///
/// The trait declares no items of its own. It is implemented automatically
/// for every sized type that satisfies both supertraits.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
#[cfg_attr(docsrs, doc(cfg(all(not(target_os = "wasi"), feature = "socketpair"))))]
pub trait HalfDuplexReadReady: HalfDuplex + ReadReady {}

/// Blanket implementation: anything that is both [`HalfDuplex`] and
/// [`ReadReady`] is a [`HalfDuplexReadReady`].
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
impl<T> HalfDuplexReadReady for T where T: HalfDuplex + ReadReady {}
1415
/// Relay data between `boxed_duplex` and the socketpair end `a` in
/// alternating phases, starting by reading from `boxed_duplex`.
///
/// Each round has two phases:
///
/// 1. Read from `boxed_duplex` and write everything read to `a`.
/// 2. Read from `a` and write everything read to `boxed_duplex`.
///
/// A phase ends at the first read that returns fewer bytes than the buffer
/// holds; the relay then switches direction.
///
/// # Returns
///
/// `Ok(())` once either side reports end-of-stream (a read of zero bytes).
///
/// # Errors
///
/// Propagates any read or write error other than
/// [`io::ErrorKind::Interrupted`], which is retried.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
fn read_first<T: HalfDuplex + ?Sized>(
    mut a: SocketpairStream,
    boxed_duplex: &mut T,
) -> io::Result<()> {
    // One scratch buffer serves every round and both directions. Only the
    // `..n` prefix filled by the latest read is ever forwarded, so it does
    // not need re-zeroing between rounds.
    let mut buf = vec![0_u8; crate::buffered::DEFAULT_BUF_SIZE];

    'thread: loop {
        // Phase 1: `boxed_duplex` -> `a`.
        loop {
            let n = match boxed_duplex.read(&mut buf) {
                Ok(0) => break 'thread,
                Ok(n) => n,
                // An interrupted read transferred nothing. Retry it rather
                // than treating it as a short read, which would switch
                // direction and block on the wrong side.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            a.write_all(&buf[..n])?;
            // A short read ends this phase.
            if n < buf.len() {
                break;
            }
        }

        // Phase 2: `a` -> `boxed_duplex`.
        loop {
            let n = match a.read(&mut buf) {
                Ok(0) => break 'thread,
                Ok(n) => n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            boxed_duplex.write_all(&buf[..n])?;
            if n < buf.len() {
                break;
            }
        }
    }
    Ok(())
}
1452
/// Relay data between the socketpair end `a` and `boxed_duplex` in
/// alternating phases, starting by reading from `a`.
///
/// Each round has two phases:
///
/// 1. Read from `a` and write everything read to `boxed_duplex`.
/// 2. Read from `boxed_duplex` and write everything read to `a`.
///
/// A phase ends at the first read that returns fewer bytes than the buffer
/// holds; the relay then switches direction.
///
/// # Returns
///
/// `Ok(())` once either side reports end-of-stream (a read of zero bytes).
///
/// # Errors
///
/// Propagates any read or write error other than
/// [`io::ErrorKind::Interrupted`], which is retried.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
fn write_first<T: HalfDuplex + ?Sized>(
    mut a: SocketpairStream,
    boxed_duplex: &mut T,
) -> io::Result<()> {
    // One scratch buffer serves every round and both directions. Only the
    // `..n` prefix filled by the latest read is ever forwarded, so it does
    // not need re-zeroing between rounds.
    let mut buf = [0_u8; crate::buffered::DEFAULT_BUF_SIZE];

    'thread: loop {
        // Phase 1: `a` -> `boxed_duplex`.
        loop {
            let n = match a.read(&mut buf) {
                Ok(0) => break 'thread,
                Ok(n) => n,
                // An interrupted read transferred nothing. Retry it rather
                // than treating it as a short read, which would switch
                // direction and block on the wrong side.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            boxed_duplex.write_all(&buf[..n])?;
            // A short read ends this phase.
            if n < buf.len() {
                break;
            }
        }

        // Phase 2: `boxed_duplex` -> `a`.
        loop {
            let n = match boxed_duplex.read(&mut buf) {
                Ok(0) => break 'thread,
                Ok(n) => n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            a.write_all(&buf[..n])?;
            if n < buf.len() {
                break;
            }
        }
    }
    Ok(())
}