async_send_fd/
tokio_stream.rs1use std::{
2 io::{Error, ErrorKind},
3 os::unix::{
4 io::{AsRawFd, RawFd},
5 net::UnixStream as OsUnixStream,
6 prelude::{FromRawFd, IntoRawFd},
7 },
8 process,
9};
10
11use tokio::{
12 io::Interest,
13 net::{
14 unix::{OwnedReadHalf, OwnedWriteHalf, ReadHalf, WriteHalf},
15 UnixStream,
16 },
17};
18
19use passfd::FdPassingExt;
20
21use crate::{AsyncRecvFd, AsyncSendFd};
22
23pub trait AsyncSendTokioStream {
25 fn send_stream(
26 &self,
27 fd: UnixStream,
28 ) -> impl std::future::Future<Output = Result<(), Error>> + Send;
29}
30
31pub trait AsyncRecvTokioStream {
33 fn recv_stream(&self) -> impl std::future::Future<Output = Result<UnixStream, Error>> + Send;
34}
35
36impl AsyncRecvFd for UnixStream {
37 async fn recv_fd(&self) -> Result<RawFd, Error> {
38 loop {
39 self.readable().await?;
40
41 match self.try_io(Interest::READABLE, || self.as_raw_fd().recv_fd()) {
42 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
43 continue;
44 }
45 r => return r,
46 }
47 }
48 }
49}
50
51impl AsyncSendFd for UnixStream {
52 async fn send_fd(&self, fd: RawFd) -> Result<(), Error> {
53 loop {
54 self.writable().await?;
55
56 match self.try_io(Interest::WRITABLE, || self.as_raw_fd().send_fd(fd)) {
57 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
58 continue;
59 }
60 Ok(_) => {
61 if let Some(pid) = self.peer_cred().ok().and_then(|creds| creds.pid()) {
63 if pid as u32 != process::id() {
64 nix::unistd::close(fd)?;
65 }
66 }
67
68 return Ok(());
69 }
70 r => return r,
71 }
72 }
73 }
74}
75
76impl AsyncSendTokioStream for UnixStream {
77 async fn send_stream(&self, stream: UnixStream) -> Result<(), Error> {
78 let fd = stream.into_std()?.into_raw_fd();
79
80 self.send_fd(fd).await
81 }
82}
83
84impl AsyncRecvTokioStream for UnixStream {
85 async fn recv_stream(&self) -> Result<UnixStream, Error> {
86 let fd = self.recv_fd().await?;
87
88 let os_stream = unsafe { OsUnixStream::from_raw_fd(fd) };
89 UnixStream::from_std(os_stream)
90 }
91}
92
93impl AsyncRecvFd for ReadHalf<'_> {
94 async fn recv_fd(&self) -> Result<RawFd, Error> {
95 self.as_ref().recv_fd().await
96 }
97}
98
99impl AsyncRecvTokioStream for ReadHalf<'_> {
100 async fn recv_stream(&self) -> Result<UnixStream, Error> {
101 self.as_ref().recv_stream().await
102 }
103}
104
105impl AsyncSendFd for WriteHalf<'_> {
106 async fn send_fd(&self, fd: RawFd) -> Result<(), Error> {
107 self.as_ref().send_fd(fd).await
108 }
109}
110
111impl AsyncSendTokioStream for WriteHalf<'_> {
112 async fn send_stream(&self, stream: UnixStream) -> Result<(), Error> {
113 self.as_ref().send_stream(stream).await
114 }
115}
116
117impl AsyncRecvFd for OwnedReadHalf {
118 async fn recv_fd(&self) -> Result<RawFd, Error> {
119 self.as_ref().recv_fd().await
120 }
121}
122
123impl AsyncRecvTokioStream for OwnedReadHalf {
124 async fn recv_stream(&self) -> Result<UnixStream, Error> {
125 self.as_ref().recv_stream().await
126 }
127}
128
129impl AsyncSendFd for OwnedWriteHalf {
130 async fn send_fd(&self, fd: RawFd) -> Result<(), Error> {
131 self.as_ref().send_fd(fd).await
132 }
133}
134
135impl AsyncSendTokioStream for OwnedWriteHalf {
136 async fn send_stream(&self, stream: UnixStream) -> Result<(), Error> {
137 self.as_ref().send_stream(stream).await
138 }
139}