interprocess_docfix/os/unix/udsocket/tokio/stream/
mod.rs

1#[cfg(uds_supported)]
2use super::c_wrappers;
3use {
4    crate::os::unix::{
5        imports::*,
6        udsocket::{ToUdSocketPath, UdSocketPath, UdStream as SyncUdStream},
7    },
8    std::{
9        convert::TryFrom,
10        error::Error,
11        fmt::{self, Formatter},
12        io,
13        net::Shutdown,
14        pin::Pin,
15        task::{Context, Poll},
16    },
17};
18
19mod connect_future;
20mod read_half;
21mod write_half;
22use connect_future::*;
23pub use {read_half::*, write_half::*};
24
25/// A Unix domain socket byte stream, obtained either from [`UdStreamListener`](super::UdStreamListener) or by connecting to an existing server.
26///
27/// # Examples
28/// - [Basic client](https://github.com/kotauskas/interprocess/blob/main/examples/tokio_udstream/client.rs)
29#[derive(Debug)]
30pub struct UdStream(TokioUdStream);
31impl UdStream {
32    /// Connects to a Unix domain socket server at the specified path.
33    ///
34    /// See [`ToUdSocketPath`] for an example of using various string types to specify socket paths.
35    pub async fn connect(path: impl ToUdSocketPath<'_>) -> io::Result<Self> {
36        let path = path.to_socket_path()?;
37        Self::_connect(&path).await
38    }
39    async fn _connect(path: &UdSocketPath<'_>) -> io::Result<Self> {
40        let stream = ConnectFuture { path }.await?;
41        Self::from_sync(stream)
42    }
43
44    /// Borrows a stream into a read half and a write half, which can be used to read and write the stream concurrently.
45    ///
46    /// This method is more efficient than [`.into_split()`](Self::into_split), but the halves cannot be moved into independently spawned tasks.
47    pub fn split(&mut self) -> (BorrowedReadHalf<'_>, BorrowedWriteHalf<'_>) {
48        let (read_tok, write_tok) = self.0.split();
49        (BorrowedReadHalf(read_tok), BorrowedWriteHalf(write_tok))
50    }
51    /// Splits a stream into a read half and a write half, which can be used to read and write the stream concurrently.
52    ///
53    /// Unlike [`.split()`](Self::split), the owned halves can be moved to separate tasks, which comes at the cost of a heap allocation.
54    ///
55    /// Dropping either half will shut it down. This is equivalent to calling [`.shutdown()`](Self::shutdown) on the stream with the corresponding argument.
56    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
57        let (read_tok, write_tok) = self.0.into_split();
58        (OwnedReadHalf(read_tok), OwnedWriteHalf(write_tok))
59    }
60    /// Attempts to put two owned halves of a stream back together and recover the original stream. Succeeds only if the two halves originated from the same call to [`.into_split()`](Self::into_split).
61    pub fn reunite(read: OwnedReadHalf, write: OwnedWriteHalf) -> Result<Self, ReuniteError> {
62        let (read_tok, write_tok) = (read.0, write.0);
63        let stream_tok = read_tok.reunite(write_tok)?;
64        Ok(Self::from_tokio(stream_tok))
65    }
66
67    /// Shuts down the read, write, or both halves of the stream. See [`Shutdown`].
68    ///
69    /// Attempting to call this method with the same `how` argument multiple times may return `Ok(())` every time or it may return an error the second time it is called, depending on the platform. You must either avoid using the same value twice or ignore the error entirely.
70    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
71        c_wrappers::shutdown(self.as_raw_fd().as_ref(), how)
72    }
73    /// Fetches the credentials of the other end of the connection without using ancillary data. The returned structure contains the process identifier, user identifier and group identifier of the peer.
74    #[cfg(any(doc, uds_peercred))]
75    #[cfg_attr( // uds_peercred template
76        feature = "doc_cfg",
77        doc(cfg(any(
78            all(
79                target_os = "linux",
80                any(
81                    target_env = "gnu",
82                    target_env = "musl",
83                    target_env = "musleabi",
84                    target_env = "musleabihf"
85                )
86            ),
87            target_os = "emscripten",
88            target_os = "redox",
89            target_os = "haiku"
90        )))
91    )]
92    pub fn get_peer_credentials(&self) -> io::Result<ucred> {
93        c_wrappers::get_peer_ucred(self.as_raw_fd().as_ref())
94    }
95    fn pinproject(self: Pin<&mut Self>) -> Pin<&mut TokioUdStream> {
96        Pin::new(&mut self.get_mut().0)
97    }
98    tokio_wrapper_conversion_methods!(
99        sync SyncUdStream,
100        std StdUdStream,
101        tokio TokioUdStream);
102}
103tokio_wrapper_trait_impls!(
104    for UdStream,
105    sync SyncUdStream,
106    std StdUdStream,
107    tokio TokioUdStream);
108
#[cfg(feature = "tokio_support")]
impl TokioAsyncRead for UdStream {
    /// Forwards the read to the inner Tokio stream.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let inner = self.pinproject();
        TokioAsyncRead::poll_read(inner, cx, buf)
    }
}
#[cfg(feature = "tokio_support")]
impl FuturesAsyncRead for UdStream {
    /// Reads into `buf` through the inner Tokio stream and reports how many bytes were filled.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Tokio reads into a `ReadBuf`, so the plain slice is wrapped in one for the call.
        let mut read_buf = ReadBuf::new(buf);
        let outcome = TokioAsyncRead::poll_read(self.pinproject(), cx, &mut read_buf);
        // A successful read is translated into the length of the filled portion; errors and
        // `Pending` pass through untouched.
        outcome.map(|result| result.map(|()| read_buf.filled().len()))
    }
}
#[cfg(feature = "tokio_support")]
impl TokioAsyncWrite for UdStream {
    /// Forwards the write to the inner Tokio stream.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.pinproject();
        TokioAsyncWrite::poll_write(inner, cx, buf)
    }
    /// Forwards the flush to the inner Tokio stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.pinproject();
        TokioAsyncWrite::poll_flush(inner, cx)
    }
    /// Forwards the shutdown to the inner Tokio stream. See the `.shutdown()` method.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.pinproject();
        TokioAsyncWrite::poll_shutdown(inner, cx)
    }
}
#[cfg(feature = "tokio_support")]
impl FuturesAsyncWrite for UdStream {
    /// Forwards the write to the inner Tokio stream.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.pinproject();
        TokioAsyncWrite::poll_write(inner, cx, buf)
    }
    /// Forwards the flush to the inner Tokio stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.pinproject();
        TokioAsyncWrite::poll_flush(inner, cx)
    }
    /// Finishes immediately by shutting down both directions of the stream. See the `.shutdown()` method.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // The shutdown call is synchronous, so its result is reported as ready right away.
        Poll::Ready(self.shutdown(Shutdown::Both))
    }
}
169
170/// Error indicating that a read half and a write half were not from the same stream, and thus could not be reunited.
171#[derive(Debug)]
172pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
173impl Error for ReuniteError {}
174impl fmt::Display for ReuniteError {
175    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
176        f.write_str("tried to reunite halves of different streams")
177    }
178}
179impl From<TokioReuniteError> for ReuniteError {
180    fn from(TokioReuniteError(read, write): TokioReuniteError) -> Self {
181        let read = OwnedReadHalf::from_tokio(read);
182        let write = OwnedWriteHalf::from_tokio(write);
183        Self(read, write)
184    }
185}
186impl From<ReuniteError> for TokioReuniteError {
187    fn from(ReuniteError(read, write): ReuniteError) -> Self {
188        let read = read.into_tokio();
189        let write = write.into_tokio();
190        Self(read, write)
191    }
192}