async_send_fd/
tokio_stream.rs

1use std::{
2    io::{Error, ErrorKind},
3    os::unix::{
4        io::{AsRawFd, RawFd},
5        net::UnixStream as OsUnixStream,
6        prelude::{FromRawFd, IntoRawFd},
7    },
8    process,
9};
10
11use tokio::{
12    io::Interest,
13    net::{
14        unix::{OwnedReadHalf, OwnedWriteHalf, ReadHalf, WriteHalf},
15        UnixStream,
16    },
17};
18
19use passfd::FdPassingExt;
20
21use crate::{AsyncRecvFd, AsyncSendFd};
22
23/// A trait to send raw file descriptors
24pub trait AsyncSendTokioStream {
25    fn send_stream(
26        &self,
27        fd: UnixStream,
28    ) -> impl std::future::Future<Output = Result<(), Error>> + Send;
29}
30
31/// A trait to receive raw file descriptors
32pub trait AsyncRecvTokioStream {
33    fn recv_stream(&self) -> impl std::future::Future<Output = Result<UnixStream, Error>> + Send;
34}
35
36impl AsyncRecvFd for UnixStream {
37    async fn recv_fd(&self) -> Result<RawFd, Error> {
38        loop {
39            self.readable().await?;
40
41            match self.try_io(Interest::READABLE, || self.as_raw_fd().recv_fd()) {
42                Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
43                    continue;
44                }
45                r => return r,
46            }
47        }
48    }
49}
50
51impl AsyncSendFd for UnixStream {
52    async fn send_fd(&self, fd: RawFd) -> Result<(), Error> {
53        loop {
54            self.writable().await?;
55
56            match self.try_io(Interest::WRITABLE, || self.as_raw_fd().send_fd(fd)) {
57                Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
58                    continue;
59                }
60                Ok(_) => {
61                    // Try to close the FD if sent into another process
62                    if let Some(pid) = self.peer_cred().ok().and_then(|creds| creds.pid()) {
63                        if pid as u32 != process::id() {
64                            nix::unistd::close(fd)?;
65                        }
66                    }
67
68                    return Ok(());
69                }
70                r => return r,
71            }
72        }
73    }
74}
75
76impl AsyncSendTokioStream for UnixStream {
77    async fn send_stream(&self, stream: UnixStream) -> Result<(), Error> {
78        let fd = stream.into_std()?.into_raw_fd();
79
80        self.send_fd(fd).await
81    }
82}
83
84impl AsyncRecvTokioStream for UnixStream {
85    async fn recv_stream(&self) -> Result<UnixStream, Error> {
86        let fd = self.recv_fd().await?;
87
88        let os_stream = unsafe { OsUnixStream::from_raw_fd(fd) };
89        UnixStream::from_std(os_stream)
90    }
91}
92
93impl AsyncRecvFd for ReadHalf<'_> {
94    async fn recv_fd(&self) -> Result<RawFd, Error> {
95        self.as_ref().recv_fd().await
96    }
97}
98
99impl AsyncRecvTokioStream for ReadHalf<'_> {
100    async fn recv_stream(&self) -> Result<UnixStream, Error> {
101        self.as_ref().recv_stream().await
102    }
103}
104
105impl AsyncSendFd for WriteHalf<'_> {
106    async fn send_fd(&self, fd: RawFd) -> Result<(), Error> {
107        self.as_ref().send_fd(fd).await
108    }
109}
110
111impl AsyncSendTokioStream for WriteHalf<'_> {
112    async fn send_stream(&self, stream: UnixStream) -> Result<(), Error> {
113        self.as_ref().send_stream(stream).await
114    }
115}
116
117impl AsyncRecvFd for OwnedReadHalf {
118    async fn recv_fd(&self) -> Result<RawFd, Error> {
119        self.as_ref().recv_fd().await
120    }
121}
122
123impl AsyncRecvTokioStream for OwnedReadHalf {
124    async fn recv_stream(&self) -> Result<UnixStream, Error> {
125        self.as_ref().recv_stream().await
126    }
127}
128
129impl AsyncSendFd for OwnedWriteHalf {
130    async fn send_fd(&self, fd: RawFd) -> Result<(), Error> {
131        self.as_ref().send_fd(fd).await
132    }
133}
134
135impl AsyncSendTokioStream for OwnedWriteHalf {
136    async fn send_stream(&self, stream: UnixStream) -> Result<(), Error> {
137        self.as_ref().send_stream(stream).await
138    }
139}