1#![doc(html_root_url = "https://docs.rs/tokio-pipe/0.2.12")]
2use std::cmp;
23use std::convert::TryFrom;
24use std::ffi::c_void;
25use std::fmt;
26use std::io;
27use std::mem;
28use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
29use std::pin::Pin;
30#[cfg(target_os = "linux")]
31use std::ptr;
32use std::task::{Context, Poll};
33
34use tokio::io::unix::AsyncFd;
35use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
36
37#[cfg(target_os = "linux")]
38pub use libc::off64_t;
39
40pub use libc::PIPE_BUF;
41
42#[cfg(target_os = "macos")]
43const MAX_LEN: usize = <libc::c_int>::MAX as usize - 1;
44
45#[cfg(not(target_os = "macos"))]
46const MAX_LEN: usize = <libc::ssize_t>::MAX as usize;
47
48macro_rules! try_libc {
49 ($e: expr) => {{
50 let ret = $e;
51 if ret == -1 {
52 return Err(io::Error::last_os_error());
53 }
54 ret
55 }};
56}
57
58macro_rules! cvt {
59 ($e:expr) => {{
60 let ret = $e;
61 if ret == -1 {
62 Err(io::Error::last_os_error())
63 } else {
64 Ok(ret)
65 }
66 }};
67}
68
69macro_rules! ready {
70 ($e:expr) => {
71 match $e {
72 Poll::Pending => return Poll::Pending,
73 Poll::Ready(e) => e,
74 }
75 };
76}
77
78fn is_wouldblock(err: &io::Error) -> bool {
79 err.kind() == io::ErrorKind::WouldBlock
80}
81
82unsafe fn set_nonblocking(fd: RawFd) {
83 let status_flags = libc::fcntl(fd, libc::F_GETFL);
84 if (status_flags & libc::O_NONBLOCK) == 0 {
85 libc::fcntl(fd, libc::F_SETFL, status_flags | libc::O_NONBLOCK);
86 }
87}
88
89unsafe fn set_nonblocking_checked(fd: RawFd, status_flags: libc::c_int) -> Result<(), io::Error> {
90 if (status_flags & libc::O_NONBLOCK) == 0 {
91 let res = libc::fcntl(fd, libc::F_SETFL, status_flags | libc::O_NONBLOCK);
92 try_libc!(res);
93 }
94
95 Ok(())
96}
97
98#[cfg(target_os = "linux")]
104unsafe fn test_read_write_readiness(reader: RawFd, writer: RawFd) -> io::Result<(bool, bool)> {
105 use libc::{poll, pollfd, POLLERR, POLLHUP, POLLIN, POLLNVAL, POLLOUT};
106
107 let mut fds = [
108 pollfd {
109 fd: reader,
110 events: POLLIN,
111 revents: 0,
112 },
113 pollfd {
114 fd: writer,
115 events: POLLOUT,
116 revents: 0,
117 },
118 ];
119
120 try_libc!(poll(&mut fds[0], 2, 0));
122
123 let is_read_ready = match fds[0].revents {
124 POLLERR | POLLHUP | POLLIN => true,
125 POLLNVAL => {
126 return Err(io::Error::new(
127 io::ErrorKind::InvalidInput,
128 "fd of reader is invalid",
129 ))
130 }
131 _ => false,
132 };
133
134 let is_writer_ready = match fds[1].revents {
135 POLLERR | POLLHUP | POLLOUT => true,
136 POLLNVAL => {
137 return Err(io::Error::new(
138 io::ErrorKind::InvalidInput,
139 "fd of writer is invalid",
140 ))
141 }
142 _ => false,
143 };
144
145 Ok((is_read_ready, is_writer_ready))
146}
147
148fn check_pipe(fd: RawFd) -> Result<(), io::Error> {
149 let mut stat = mem::MaybeUninit::<libc::stat>::uninit();
150
151 try_libc!(unsafe { libc::fstat(fd, stat.as_mut_ptr()) });
152
153 let stat = unsafe { stat.assume_init() };
154 if (stat.st_mode & libc::S_IFMT) == libc::S_IFIFO {
155 Ok(())
156 } else {
157 Err(io::Error::new(io::ErrorKind::Other, "Fd is not a pipe"))
158 }
159}
160
161fn get_status_flags(fd: RawFd) -> Result<libc::c_int, io::Error> {
162 Ok(try_libc!(unsafe { libc::fcntl(fd, libc::F_GETFL) }))
163}
164
165#[derive(Debug)]
167#[cfg_attr(rustc_attrs, rustc_layout_scalar_valid_range_start(0))]
170#[cfg_attr(rustc_attrs, rustc_layout_scalar_valid_range_end(0xFF_FF_FF_FE))]
176struct PipeFd(RawFd);
177
178impl PipeFd {
179 fn from_raw_fd_checked(fd: RawFd, readable: bool) -> Result<Self, io::Error> {
182 let (access_mode, errmsg) = if readable {
183 (libc::O_RDONLY, "Fd is not the read end")
184 } else {
185 (libc::O_WRONLY, "Fd is not the write end")
186 };
187
188 check_pipe(fd)?;
189 let status_flags = get_status_flags(fd)?;
190 if (status_flags & libc::O_ACCMODE) == access_mode {
191 unsafe { set_nonblocking_checked(fd, status_flags) }?;
192 Ok(Self(fd))
193 } else {
194 Err(io::Error::new(io::ErrorKind::Other, errmsg))
195 }
196 }
197}
198
199impl AsRawFd for PipeFd {
200 fn as_raw_fd(&self) -> RawFd {
201 self.0
202 }
203}
204
205impl Drop for PipeFd {
206 fn drop(&mut self) {
207 let _ = unsafe { libc::close(self.0) };
208 }
209}
210
211#[derive(Copy, Clone, Debug)]
213pub struct AtomicWriteBuffer<'a>(&'a [u8]);
214impl<'a> AtomicWriteBuffer<'a> {
215 pub fn new(buffer: &'a [u8]) -> Option<Self> {
217 if buffer.len() <= PIPE_BUF {
218 Some(Self(buffer))
219 } else {
220 None
221 }
222 }
223
224 pub fn into_inner(self) -> &'a [u8] {
225 self.0
226 }
227}
228
229#[derive(Copy, Clone, Debug)]
231pub struct AtomicWriteIoSlices<'a, 'b>(&'a [io::IoSlice<'b>], usize);
232impl<'a, 'b> AtomicWriteIoSlices<'a, 'b> {
233 pub fn new(buffers: &'a [io::IoSlice<'b>]) -> Option<Self> {
235 let mut total_len = 0;
236
237 for buffer in buffers {
238 total_len += buffer.len();
239
240 if total_len > PIPE_BUF {
241 return None;
242 }
243 }
244
245 Some(Self(buffers, total_len))
246 }
247
248 pub fn get_total_len(self) -> usize {
249 self.1
250 }
251
252 pub fn into_inner(self) -> &'a [io::IoSlice<'b>] {
253 self.0
254 }
255}
256
257#[cfg(target_os = "linux")]
258async fn tee_impl(pipe_in: &PipeRead, pipe_out: &PipeWrite, len: usize) -> io::Result<usize> {
259 let mut read_ready = pipe_in.0.readable().await?;
261 let mut write_ready = pipe_out.0.writable().await?;
262
263 loop {
264 let ret = unsafe {
265 libc::tee(
266 pipe_in.as_raw_fd(),
267 pipe_out.as_raw_fd(),
268 len,
269 libc::SPLICE_F_NONBLOCK,
270 )
271 };
272 match cvt!(ret) {
273 Err(e) if is_wouldblock(&e) => {
274 let (read_readiness, write_readiness) = unsafe {
283 test_read_write_readiness(pipe_in.as_raw_fd(), pipe_out.as_raw_fd())?
284 };
285
286 if !read_readiness {
287 read_ready.clear_ready();
288 read_ready = pipe_in.0.readable().await?;
289 }
290
291 if !write_readiness {
292 write_ready.clear_ready();
293 write_ready = pipe_out.0.writable().await?;
294 }
295 }
296 Err(e) => break Err(e),
297 Ok(ret) => break Ok(ret as usize),
298 }
299 }
300}
301
302#[cfg(target_os = "linux")]
307pub async fn tee(
308 pipe_in: &mut PipeRead,
309 pipe_out: &mut PipeWrite,
310 len: usize,
311) -> io::Result<usize> {
312 tee_impl(pipe_in, pipe_out, len).await
313}
314
315#[cfg(target_os = "linux")]
316fn as_ptr<T>(option: Option<&mut T>) -> *mut T {
317 match option {
318 Some(some) => some,
319 None => ptr::null_mut(),
320 }
321}
322
323#[cfg(target_os = "linux")]
324async fn splice_impl(
325 fd_in: &mut AsyncFd<impl AsRawFd>,
326 mut off_in: Option<&mut off64_t>,
327 fd_out: &AsyncFd<impl AsRawFd>,
328 mut off_out: Option<&mut off64_t>,
329 len: usize,
330 has_more_data: bool,
331) -> io::Result<usize> {
332 let mut read_ready = fd_in.readable().await?;
334 let mut write_ready = fd_out.writable().await?;
335
336 let flags = libc::SPLICE_F_NONBLOCK
338 | if has_more_data {
339 libc::SPLICE_F_MORE
340 } else {
341 0
342 };
343
344 loop {
345 let ret = unsafe {
346 libc::splice(
347 fd_in.as_raw_fd(),
348 as_ptr(off_in.as_deref_mut()),
349 fd_out.as_raw_fd(),
350 as_ptr(off_out.as_deref_mut()),
351 len,
352 flags,
353 )
354 };
355 match cvt!(ret) {
356 Err(e) if is_wouldblock(&e) => {
357 let (read_readiness, write_readiness) =
366 unsafe { test_read_write_readiness(fd_in.as_raw_fd(), fd_out.as_raw_fd())? };
367
368 if !read_readiness {
369 read_ready.clear_ready();
370 read_ready = fd_in.readable().await?;
371 }
372
373 if !write_readiness {
374 write_ready.clear_ready();
375 write_ready = fd_out.writable().await?;
376 }
377 }
378 Err(e) => break Err(e),
379 Ok(ret) => break Ok(ret as usize),
380 }
381 }
382}
383
384#[cfg(target_os = "linux")]
389pub async fn splice(
390 pipe_in: &mut PipeRead,
391 pipe_out: &mut PipeWrite,
392 len: usize,
393) -> io::Result<usize> {
394 splice_impl(&mut pipe_in.0, None, &pipe_out.0, None, len, false).await
395}
396
397pub struct PipeRead(AsyncFd<PipeFd>);
399
400impl TryFrom<RawFd> for PipeRead {
401 type Error = io::Error;
402
403 fn try_from(fd: RawFd) -> Result<Self, Self::Error> {
404 Self::from_raw_fd_checked(fd)
405 }
406}
407
408impl PipeRead {
409 fn new(fd: RawFd) -> Result<Self, io::Error> {
410 Self::from_pipefd(PipeFd(fd))
411 }
412
413 fn from_pipefd(pipe_fd: PipeFd) -> Result<Self, io::Error> {
414 Ok(Self(AsyncFd::new(pipe_fd)?))
415 }
416
417 pub fn from_raw_fd_checked(fd: RawFd) -> Result<Self, io::Error> {
419 Self::from_pipefd(PipeFd::from_raw_fd_checked(fd, true)?)
420 }
421
422 #[cfg(target_os = "linux")]
434 pub async fn splice_to(
435 &mut self,
436 asyncfd_out: &AsyncFd<impl AsRawFd>,
437 off_out: Option<&mut off64_t>,
438 len: usize,
439 has_more_data: bool,
440 ) -> io::Result<usize> {
441 splice_impl(&mut self.0, None, asyncfd_out, off_out, len, has_more_data).await
442 }
443}
444
445impl AsyncRead for PipeRead {
446 fn poll_read(
447 mut self: Pin<&mut Self>,
448 cx: &mut Context<'_>,
449 buf: &mut ReadBuf<'_>,
450 ) -> Poll<io::Result<()>> {
451 let fd = self.0.as_raw_fd();
452
453 loop {
454 let pinned = Pin::new(&mut self.0);
455 let mut ready = ready!(pinned.poll_read_ready(cx))?;
456 let ret = unsafe {
457 libc::read(
458 fd,
459 buf.unfilled_mut() as *mut _ as *mut c_void,
460 cmp::min(buf.remaining(), MAX_LEN),
461 )
462 };
463 match cvt!(ret) {
464 Err(e) if is_wouldblock(&e) => {
465 ready.clear_ready();
466 }
467 Err(e) => return Poll::Ready(Err(e)),
468 Ok(ret) => {
469 let ret = ret as usize;
470 unsafe {
471 buf.assume_init(ret);
472 };
473 buf.advance(ret);
474 return Poll::Ready(Ok(()));
475 }
476 }
477 }
478 }
479}
480
481impl AsRawFd for PipeRead {
482 fn as_raw_fd(&self) -> RawFd {
483 self.0.as_raw_fd()
484 }
485}
486
487impl IntoRawFd for PipeRead {
488 fn into_raw_fd(self) -> RawFd {
489 let inner = self.0.into_inner();
490 let fd = inner.0;
491 mem::forget(inner);
492 fd
493 }
494}
495
496impl FromRawFd for PipeRead {
497 unsafe fn from_raw_fd(fd: RawFd) -> Self {
498 set_nonblocking(fd);
499 Self::new(fd).unwrap()
500 }
501}
502
503impl fmt::Debug for PipeRead {
504 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
505 write!(f, "PipeRead({})", self.as_raw_fd())
506 }
507}
508
509pub struct PipeWrite(AsyncFd<PipeFd>);
511
512impl TryFrom<RawFd> for PipeWrite {
513 type Error = io::Error;
514
515 fn try_from(fd: RawFd) -> Result<Self, Self::Error> {
516 Self::from_raw_fd_checked(fd)
517 }
518}
519
520impl PipeWrite {
521 fn new(fd: RawFd) -> Result<Self, io::Error> {
522 Self::from_pipefd(PipeFd(fd))
523 }
524
525 fn from_pipefd(pipe_fd: PipeFd) -> Result<Self, io::Error> {
526 Ok(Self(AsyncFd::new(pipe_fd)?))
527 }
528
529 pub fn from_raw_fd_checked(fd: RawFd) -> Result<Self, io::Error> {
531 Self::from_pipefd(PipeFd::from_raw_fd_checked(fd, false)?)
532 }
533}
534
535impl AsRawFd for PipeWrite {
536 fn as_raw_fd(&self) -> RawFd {
537 self.0.as_raw_fd()
538 }
539}
540
541impl IntoRawFd for PipeWrite {
542 fn into_raw_fd(self) -> RawFd {
543 let inner = self.0.into_inner();
544 let fd = inner.0;
545 mem::forget(inner);
546 fd
547 }
548}
549
550impl FromRawFd for PipeWrite {
551 unsafe fn from_raw_fd(fd: RawFd) -> Self {
552 set_nonblocking(fd);
553 Self::new(fd).unwrap()
554 }
555}
556
557impl PipeWrite {
558 fn poll_write_impl(
559 self: Pin<&Self>,
560 cx: &mut Context<'_>,
561 buf: &[u8],
562 ) -> Poll<Result<usize, io::Error>> {
563 let fd = self.0.as_raw_fd();
564
565 loop {
566 let pinned = Pin::new(&self.0);
567 let mut ready = ready!(pinned.poll_write_ready(cx))?;
568 let ret = unsafe {
569 libc::write(
570 fd,
571 buf.as_ptr() as *mut c_void,
572 cmp::min(buf.len(), MAX_LEN),
573 )
574 };
575 match cvt!(ret) {
576 Err(e) if is_wouldblock(&e) => {
577 ready.clear_ready();
578 }
579 Err(e) => return Poll::Ready(Err(e)),
580 Ok(ret) => return Poll::Ready(Ok(ret as usize)),
581 }
582 }
583 }
584
585 pub fn poll_write_atomic(
587 self: Pin<&Self>,
588 cx: &mut Context<'_>,
589 buf: AtomicWriteBuffer,
590 ) -> Poll<Result<usize, io::Error>> {
591 self.poll_write_impl(cx, buf.0)
592 }
593
594 fn poll_write_vectored_impl(
595 self: Pin<&Self>,
596 cx: &mut Context<'_>,
597 bufs: &[io::IoSlice<'_>],
598 ) -> Poll<Result<usize, io::Error>> {
599 let fd = self.0.as_raw_fd();
600
601 loop {
602 let pinned = Pin::new(&self.0);
603 let mut ready = ready!(pinned.poll_write_ready(cx))?;
604 let ret =
605 unsafe { libc::writev(fd, bufs.as_ptr() as *const libc::iovec, bufs.len() as i32) };
606 match cvt!(ret) {
607 Err(e) if is_wouldblock(&e) => {
608 ready.clear_ready();
609 }
610 Err(e) => return Poll::Ready(Err(e)),
611 Ok(ret) => return Poll::Ready(Ok(ret as usize)),
612 }
613 }
614 }
615
616 pub fn poll_write_vectored_atomic(
617 self: Pin<&Self>,
618 cx: &mut Context<'_>,
619 bufs: AtomicWriteIoSlices<'_, '_>,
620 ) -> Poll<Result<usize, io::Error>> {
621 self.poll_write_vectored_impl(cx, bufs.0)
622 }
623
624 #[cfg(target_os = "linux")]
634 pub async fn splice_from(
635 &mut self,
636 asyncfd_in: &mut AsyncFd<impl AsRawFd>,
637 off_in: Option<&mut off64_t>,
638 len: usize,
639 ) -> io::Result<usize> {
640 splice_impl(asyncfd_in, off_in, &self.0, None, len, false).await
641 }
642}
643
644impl AsyncWrite for PipeWrite {
645 fn poll_write(
646 self: Pin<&mut Self>,
647 cx: &mut Context<'_>,
648 buf: &[u8],
649 ) -> Poll<Result<usize, io::Error>> {
650 self.as_ref().poll_write_impl(cx, buf)
651 }
652
653 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
654 Poll::Ready(Ok(()))
655 }
656
657 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
658 Poll::Ready(Ok(()))
659 }
660
661 fn poll_write_vectored(
662 self: Pin<&mut Self>,
663 cx: &mut Context<'_>,
664 bufs: &[io::IoSlice<'_>],
665 ) -> Poll<Result<usize, io::Error>> {
666 self.as_ref().poll_write_vectored_impl(cx, bufs)
667 }
668
669 fn is_write_vectored(&self) -> bool {
670 true
671 }
672}
673
674impl fmt::Debug for PipeWrite {
675 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
676 write!(f, "PipeRead({})", self.as_raw_fd())
677 }
678}
679
680#[cfg(any(target_os = "linux", target_os = "solaris"))]
681fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
682 let mut pipefd = [0; 2];
683 let ret = unsafe { libc::pipe2(pipefd.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
684 if ret == -1 {
685 return Err(io::Error::last_os_error());
686 }
687 Ok((pipefd[0], pipefd[1]))
688}
689
690#[cfg(not(any(target_os = "linux", target_os = "solaris")))]
691fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
692 let mut pipefd = [0; 2];
693 try_libc!(unsafe { libc::pipe(pipefd.as_mut_ptr()) });
694 for fd in &pipefd {
695 let ret = try_libc!(unsafe { libc::fcntl(*fd, libc::F_GETFD) });
696 try_libc!(unsafe { libc::fcntl(*fd, libc::F_SETFD, ret | libc::FD_CLOEXEC) });
697 let ret = try_libc!(unsafe { libc::fcntl(*fd, libc::F_GETFL) });
698 try_libc!(unsafe { libc::fcntl(*fd, libc::F_SETFL, ret | libc::O_NONBLOCK) });
699 }
700 Ok((pipefd[0], pipefd[1]))
701}
702
703pub fn pipe() -> io::Result<(PipeRead, PipeWrite)> {
705 let (r, w) = sys_pipe()?;
706 Ok((PipeRead::new(r)?, PipeWrite::new(w)?))
707}
708
709#[cfg(test)]
710mod tests {
711 use super::*;
712
713 use std::fs::File;
714
715 use tokio::io::{AsyncReadExt, AsyncWriteExt};
716
717 #[cfg(target_os = "linux")]
718 use tokio::time::{sleep, Duration};
719
720 #[tokio::test]
721 async fn test() {
722 let (mut r, mut w) = pipe().unwrap();
723
724 let w_task = tokio::spawn(async move {
725 for n in 0..=65535 {
726 w.write_u32(n).await.unwrap();
727 }
728 });
730
731 let r_task = tokio::spawn(async move {
732 let mut n = 0u32;
733 let mut buf = [0; 4 * 128];
734 while n < 65535 {
735 r.read_exact(&mut buf).await.unwrap();
736 for x in buf.chunks(4) {
737 assert_eq!(x, n.to_be_bytes());
738 n += 1;
739 }
740 }
741 });
742 tokio::try_join!(w_task, r_task).unwrap();
743 }
744
745 #[tokio::test]
746 async fn test_write_after_shutdown() {
747 let (r, mut w) = pipe().unwrap();
748 w.shutdown().await.unwrap();
749 let result = w.write(b"ok").await;
750 assert!(result.is_ok());
751
752 drop(r)
753 }
754
755 #[tokio::test]
756 async fn test_read_to_end() -> io::Result<()> {
757 let (mut r, mut w) = pipe()?;
758 let t = tokio::spawn(async move {
759 w.write_all(&b"Hello, World!"[..]).await?;
760 io::Result::Ok(())
761 });
762
763 let mut buf = vec![];
764 r.read_to_end(&mut buf).await?;
765 assert_eq!(&b"Hello, World!"[..], &buf[..]);
766
767 t.await?
768 }
769
770 #[tokio::test]
771 async fn test_from_child_stdio() -> io::Result<()> {
772 use std::process::Stdio;
773 use tokio::process::Command;
774
775 let (mut r, w) = pipe()?;
776
777 let script = r#"#!/usr/bin/env python3
778import os
779with os.fdopen(1, 'wb') as w:
780 w.write(b"Hello, World!")
781"#;
782
783 let mut command = Command::new("python");
784 command
785 .args(&["-c", script])
786 .stdout(unsafe { Stdio::from_raw_fd(w.as_raw_fd()) });
787 unsafe {
788 command.pre_exec(|| Ok(()));
790 }
791 let mut child = command.spawn()?;
792 drop(w);
793
794 let mut buf = vec![];
795 r.read_to_end(&mut buf).await?;
796 assert_eq!(&b"Hello, World!"[..], &buf[..]);
797
798 child.wait().await?;
799 Ok(())
800 }
801
802 #[tokio::test]
803 async fn test_from_child_no_stdio() -> io::Result<()> {
804 use tokio::process::Command;
805
806 let (mut r, w) = pipe()?;
807
808 let script = r#"#!/usr/bin/env python3
809import os
810with os.fdopen(3, 'wb') as w:
811 w.write(b"Hello, World!")
812"#;
813
814 let mut command = Command::new("python");
815 command.args(&["-c", script]);
816 unsafe {
817 let w = w.as_raw_fd();
818 command.pre_exec(move || {
819 if w == 3 {
820 let flags = libc::fcntl(w, libc::F_SETFD);
822 if flags == -1 {
823 return Err(io::Error::last_os_error());
824 }
825 if flags & libc::FD_CLOEXEC != 0
826 && libc::fcntl(w, libc::F_SETFD, flags ^ libc::FD_CLOEXEC) == -1
827 {
828 return Err(io::Error::last_os_error());
829 }
830 } else {
831 let r = libc::dup2(w, 3);
832 if r == -1 {
833 return Err(io::Error::last_os_error());
834 }
835 }
836 Ok(())
837 });
838 }
839 let mut child = command.spawn()?;
840 drop(w);
841
842 let mut buf = vec![];
843 r.read_to_end(&mut buf).await?;
844 assert_eq!(&b"Hello, World!"[..], &buf[..]);
845
846 child.wait().await?;
847 Ok(())
848 }
849
850 #[cfg(target_os = "linux")]
851 #[tokio::test]
852 async fn test_tee() {
853 let (mut r1, mut w1) = pipe().unwrap();
854 let (mut r2, mut w2) = pipe().unwrap();
855
856 for n in 0..1024 {
857 w1.write_u32(n).await.unwrap();
858 }
859
860 tee(&mut r1, &mut w2, 4096).await.unwrap();
861
862 let r2_task = tokio::spawn(async move {
863 let mut n = 0u32;
864 let mut buf = [0; 4 * 128];
865 while n < 1024 {
866 r2.read_exact(&mut buf).await.unwrap();
867 for x in buf.chunks(4) {
868 assert_eq!(x, n.to_be_bytes());
869 n += 1;
870 }
871 }
872 });
873
874 let r1_task = tokio::spawn(async move {
875 let mut n = 0u32;
876 let mut buf = [0; 4 * 128];
877 while n < 1024 {
878 r1.read_exact(&mut buf).await.unwrap();
879 for x in buf.chunks(4) {
880 assert_eq!(x, n.to_be_bytes());
881 n += 1;
882 }
883 }
884 });
885
886 tokio::try_join!(r1_task, r2_task).unwrap();
887 }
888
889 #[cfg(target_os = "linux")]
890 #[tokio::test]
891 async fn test_tee_no_inf_loop() {
892 let (mut r1, mut w1) = pipe().unwrap();
893 let (mut r2, mut w2) = pipe().unwrap();
894
895 let w1_task = tokio::spawn(async move {
896 sleep(Duration::from_millis(100)).await;
897
898 for n in 0..1024 {
899 w1.write_u32(n).await.unwrap();
900 }
901 });
902
903 for n in 0..1024 {
904 w2.write_u32(n).await.unwrap();
905 }
906
907 let r2_task = tokio::spawn(async move {
908 sleep(Duration::from_millis(200)).await;
909
910 let mut n = 0u32;
911 let mut buf = [0; 4 * 128];
912 while n < 1024 {
913 r2.read_exact(&mut buf).await.unwrap();
914 for x in buf.chunks(4) {
915 assert_eq!(x, n.to_be_bytes());
916 n += 1;
917 }
918 }
919 });
920
921 tee(&mut r1, &mut w2, 4096).await.unwrap();
922
923 tokio::try_join!(w1_task, r2_task).unwrap();
924 }
925
926 #[cfg(target_os = "linux")]
927 #[tokio::test]
928 async fn test_splice() {
929 let (mut r1, mut w1) = pipe().unwrap();
930 let (mut r2, mut w2) = pipe().unwrap();
931
932 for n in 0..1024 {
933 w1.write_u32(n).await.unwrap();
934 }
935
936 splice(&mut r1, &mut w2, 4096).await.unwrap();
937
938 let mut n = 0u32;
939 let mut buf = [0; 4 * 128];
940 while n < 1024 {
941 r2.read_exact(&mut buf).await.unwrap();
942 for x in buf.chunks(4) {
943 assert_eq!(x, n.to_be_bytes());
944 n += 1;
945 }
946 }
947 }
948
949 #[cfg(target_os = "linux")]
950 #[tokio::test]
951 async fn test_splice_no_inf_loop() {
952 let (mut r1, mut w1) = pipe().unwrap();
953 let (mut r2, mut w2) = pipe().unwrap();
954
955 let w1_task = tokio::spawn(async move {
956 sleep(Duration::from_millis(100)).await;
957
958 for n in 0..1024 {
959 w1.write_u32(n).await.unwrap();
960 }
961 });
962
963 for n in 0..1024 {
964 w2.write_u32(n).await.unwrap();
965 }
966
967 let r2_task = tokio::spawn(async move {
968 sleep(Duration::from_millis(200)).await;
969
970 let mut n = 0u32;
971 let mut buf = [0; 4 * 128];
972 while n < 1024 {
973 r2.read_exact(&mut buf).await.unwrap();
974 for x in buf.chunks(4) {
975 assert_eq!(x, n.to_be_bytes());
976 n += 1;
977 }
978 }
979 });
980
981 splice(&mut r1, &mut w2, 4096).await.unwrap();
982
983 tokio::try_join!(w1_task, r2_task).unwrap();
984 }
985
986 fn as_ioslice<T>(v: &[T]) -> io::IoSlice<'_> {
987 io::IoSlice::new(unsafe {
988 std::slice::from_raw_parts(v.as_ptr() as *const u8, v.len() * std::mem::size_of::<T>())
989 })
990 }
991
992 #[tokio::test]
993 async fn test_writev() {
994 let (mut r, mut w) = pipe().unwrap();
995
996 let w_task = tokio::spawn(async move {
997 let buffer1: Vec<u32> = (0..512).collect();
998 let buffer2: Vec<u32> = (512..1024).collect();
999
1000 w.write_vectored(&[as_ioslice(&buffer1), as_ioslice(&buffer2)])
1001 .await
1002 .unwrap();
1003 });
1004
1005 let r_task = tokio::spawn(async move {
1006 let mut n = 0u32;
1007 let mut buf = [0; 4 * 128];
1008 while n < 1024 {
1009 r.read_exact(&mut buf).await.unwrap();
1010 for x in buf.chunks(4) {
1011 assert_eq!(x, n.to_ne_bytes());
1012 n += 1;
1013 }
1014 }
1015 });
1016 tokio::try_join!(w_task, r_task).unwrap();
1017 }
1018
1019 #[tokio::test]
1020 async fn test_piperead_from_raw_fd_checked_success() {
1021 let (r, _w) = pipe().unwrap();
1022 let _r = PipeRead::from_raw_fd_checked(r.into_raw_fd()).unwrap();
1023 }
1024
1025 #[tokio::test]
1026 async fn test_piperead_from_raw_fd_checked_failure_not_read_end() {
1027 let (_r, w) = pipe().unwrap();
1028 let error = PipeRead::from_raw_fd_checked(w.into_raw_fd())
1029 .unwrap_err()
1030 .into_inner()
1031 .unwrap();
1032
1033 assert_eq!(format!("{}", error), "Fd is not the read end");
1034 }
1035
1036 #[tokio::test]
1037 async fn test_piperead_from_raw_fd_checked_failure_not_pipe() {
1038 let fd = File::open("/dev/null").unwrap().into_raw_fd();
1039 let error = PipeRead::from_raw_fd_checked(fd)
1040 .unwrap_err()
1041 .into_inner()
1042 .unwrap();
1043
1044 assert_eq!(format!("{}", error), "Fd is not a pipe");
1045 }
1046
1047 #[tokio::test]
1048 async fn test_pipewrite_from_raw_fd_checked_success() {
1049 let (_r, w) = pipe().unwrap();
1050 let _w = PipeWrite::from_raw_fd_checked(w.into_raw_fd()).unwrap();
1051 }
1052
1053 #[tokio::test]
1054 async fn test_pipewrite_from_raw_fd_checked_failure_not_write_end() {
1055 let (r, _w) = pipe().unwrap();
1056 let error = PipeWrite::from_raw_fd_checked(r.into_raw_fd())
1057 .unwrap_err()
1058 .into_inner()
1059 .unwrap();
1060
1061 assert_eq!(format!("{}", error), "Fd is not the write end");
1062 }
1063
1064 #[tokio::test]
1065 async fn test_pipewrite_from_raw_fd_checked_failure_not_pipe() {
1066 let fd = File::open("/dev/null").unwrap().into_raw_fd();
1067 let error = PipeWrite::from_raw_fd_checked(fd)
1068 .unwrap_err()
1069 .into_inner()
1070 .unwrap();
1071
1072 assert_eq!(format!("{}", error), "Fd is not a pipe");
1073 }
1074
1075 #[test]
1076 fn test_atomic_write_io_slices() {
1077 let bytes: Vec<u8> = (0..PIPE_BUF + 20)
1078 .map(|i| (i % (u8::MAX as usize)) as u8)
1079 .collect();
1080 let mut io_slices = Vec::<io::IoSlice<'_>>::new();
1081
1082 for i in 0..bytes.len() {
1083 io_slices.push(io::IoSlice::new(&bytes[i..i + 1]));
1084 }
1085
1086 for i in 0..PIPE_BUF {
1087 let slices = AtomicWriteIoSlices::new(&io_slices[..i]).unwrap();
1088 assert_eq!(slices.get_total_len(), i);
1089 }
1090
1091 for i in PIPE_BUF + 1..bytes.len() {
1092 assert!(AtomicWriteIoSlices::new(&io_slices[..i]).is_none());
1093 }
1094 }
1095}