interprocess_docfix/os/unix/udsocket/tokio/stream/
mod.rs1#[cfg(uds_supported)]
2use super::c_wrappers;
3use {
4 crate::os::unix::{
5 imports::*,
6 udsocket::{ToUdSocketPath, UdSocketPath, UdStream as SyncUdStream},
7 },
8 std::{
9 convert::TryFrom,
10 error::Error,
11 fmt::{self, Formatter},
12 io,
13 net::Shutdown,
14 pin::Pin,
15 task::{Context, Poll},
16 },
17};
18
19mod connect_future;
20mod read_half;
21mod write_half;
22use connect_future::*;
23pub use {read_half::*, write_half::*};
24
25#[derive(Debug)]
30pub struct UdStream(TokioUdStream);
31impl UdStream {
32 pub async fn connect(path: impl ToUdSocketPath<'_>) -> io::Result<Self> {
36 let path = path.to_socket_path()?;
37 Self::_connect(&path).await
38 }
39 async fn _connect(path: &UdSocketPath<'_>) -> io::Result<Self> {
40 let stream = ConnectFuture { path }.await?;
41 Self::from_sync(stream)
42 }
43
44 pub fn split(&mut self) -> (BorrowedReadHalf<'_>, BorrowedWriteHalf<'_>) {
48 let (read_tok, write_tok) = self.0.split();
49 (BorrowedReadHalf(read_tok), BorrowedWriteHalf(write_tok))
50 }
51 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
57 let (read_tok, write_tok) = self.0.into_split();
58 (OwnedReadHalf(read_tok), OwnedWriteHalf(write_tok))
59 }
60 pub fn reunite(read: OwnedReadHalf, write: OwnedWriteHalf) -> Result<Self, ReuniteError> {
62 let (read_tok, write_tok) = (read.0, write.0);
63 let stream_tok = read_tok.reunite(write_tok)?;
64 Ok(Self::from_tokio(stream_tok))
65 }
66
67 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
71 c_wrappers::shutdown(self.as_raw_fd().as_ref(), how)
72 }
73 #[cfg(any(doc, uds_peercred))]
75 #[cfg_attr( feature = "doc_cfg",
77 doc(cfg(any(
78 all(
79 target_os = "linux",
80 any(
81 target_env = "gnu",
82 target_env = "musl",
83 target_env = "musleabi",
84 target_env = "musleabihf"
85 )
86 ),
87 target_os = "emscripten",
88 target_os = "redox",
89 target_os = "haiku"
90 )))
91 )]
92 pub fn get_peer_credentials(&self) -> io::Result<ucred> {
93 c_wrappers::get_peer_ucred(self.as_raw_fd().as_ref())
94 }
95 fn pinproject(self: Pin<&mut Self>) -> Pin<&mut TokioUdStream> {
96 Pin::new(&mut self.get_mut().0)
97 }
98 tokio_wrapper_conversion_methods!(
99 sync SyncUdStream,
100 std StdUdStream,
101 tokio TokioUdStream);
102}
103tokio_wrapper_trait_impls!(
104 for UdStream,
105 sync SyncUdStream,
106 std StdUdStream,
107 tokio TokioUdStream);
108
#[cfg(feature = "tokio_support")]
impl TokioAsyncRead for UdStream {
    /// Polls the wrapped Tokio stream for data, filling `buf` with whatever it produces.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let inner = self.pinproject();
        inner.poll_read(cx, buf)
    }
}
#[cfg(feature = "tokio_support")]
impl FuturesAsyncRead for UdStream {
    /// Adapts the Tokio read of the wrapped stream to the `futures`-style signature.
    ///
    /// The caller's slice is wrapped in a `ReadBuf`, the Tokio `poll_read` is run on it, and on
    /// success the number of bytes it filled is reported.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut read_buf = ReadBuf::new(buf);
        // Spelled out with the trait name so it is obvious that the Tokio method of the inner
        // stream is being called, not this very method.
        let outcome = TokioAsyncRead::poll_read(self.pinproject(), cx, &mut read_buf);
        outcome.map(|res| res.map(|()| read_buf.filled().len()))
    }
}
#[cfg(feature = "tokio_support")]
impl TokioAsyncWrite for UdStream {
    /// Forwards the write to the wrapped Tokio stream.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.pinproject();
        inner.poll_write(cx, buf)
    }

    /// Forwards the flush to the wrapped Tokio stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.pinproject();
        inner.poll_flush(cx)
    }

    /// Forwards the shutdown request to the wrapped Tokio stream.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.pinproject();
        inner.poll_shutdown(cx)
    }
}
#[cfg(feature = "tokio_support")]
impl FuturesAsyncWrite for UdStream {
    /// Writes through the Tokio `poll_write` of the wrapped stream.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Trait-qualified call: the same method name exists on the trait being implemented.
        TokioAsyncWrite::poll_write(self.pinproject(), cx, buf)
    }

    /// Flushes through the Tokio `poll_flush` of the wrapped stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        TokioAsyncWrite::poll_flush(self.pinproject(), cx)
    }

    /// Closes the stream by shutting down both directions.
    ///
    /// This calls the inherent `shutdown` with `Shutdown::Both` and always completes
    /// immediately, so the task context is never used.
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(self.shutdown(Shutdown::Both))
    }
}
169
170#[derive(Debug)]
172pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
173impl Error for ReuniteError {}
174impl fmt::Display for ReuniteError {
175 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
176 f.write_str("tried to reunite halves of different streams")
177 }
178}
179impl From<TokioReuniteError> for ReuniteError {
180 fn from(TokioReuniteError(read, write): TokioReuniteError) -> Self {
181 let read = OwnedReadHalf::from_tokio(read);
182 let write = OwnedWriteHalf::from_tokio(write);
183 Self(read, write)
184 }
185}
186impl From<ReuniteError> for TokioReuniteError {
187 fn from(ReuniteError(read, write): ReuniteError) -> Self {
188 let read = read.into_tokio();
189 let write = write.into_tokio();
190 Self(read, write)
191 }
192}