tokio_seqpacket/socket.rs
1use filedesc::FileDesc;
2use std::io::{IoSlice, IoSliceMut};
3use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, IntoRawFd, OwnedFd};
4use std::path::Path;
5use std::task::{Context, Poll};
6use tokio::io::unix::AsyncFd;
7
8use crate::ancillary::{AncillaryMessageReader, AncillaryMessageWriter};
9use crate::{sys, UCred};
10
11/// Unix seqpacket socket.
12///
13/// Note that there are no functions to get the local or remote address of the connection.
14/// That is because connected Unix sockets are always anonymous,
15/// which means that the address contains no useful information.
16pub struct UnixSeqpacket {
17 io: AsyncFd<FileDesc>,
18}
19
20impl std::fmt::Debug for UnixSeqpacket {
21 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
22 f.debug_struct("UnixSeqpacket")
23 .field("fd", &self.io.get_ref().as_raw_fd())
24 .finish()
25 }
26}
27
28impl AsFd for UnixSeqpacket {
29 fn as_fd(&self) -> BorrowedFd<'_> {
30 self.io.get_ref().as_fd()
31 }
32}
33
34impl TryFrom<OwnedFd> for UnixSeqpacket {
35 type Error = std::io::Error;
36
37 fn try_from(fd: OwnedFd) -> Result<Self, Self::Error> {
38 Self::new(FileDesc::new(fd))
39 }
40}
41
42impl From<UnixSeqpacket> for OwnedFd {
43 fn from(socket: UnixSeqpacket) -> Self {
44 socket.io.into_inner().into_fd()
45 }
46}
47
48impl UnixSeqpacket {
49 pub(crate) fn new(socket: FileDesc) -> std::io::Result<Self> {
50 let io = AsyncFd::new(socket)?;
51 Ok(Self { io })
52 }
53
54 /// Connect a new seqpacket socket to the given address.
55 pub async fn connect<P: AsRef<Path>>(address: P) -> std::io::Result<Self> {
56 let socket = sys::local_seqpacket_socket()?;
57 if let Err(e) = sys::connect(&socket, address) {
58 if e.kind() != std::io::ErrorKind::WouldBlock {
59 return Err(e);
60 }
61 }
62
63 let socket = Self::new(socket)?;
64 socket.io.writable().await?.retain_ready();
65 Ok(socket)
66 }
67
68 /// Create a pair of connected seqpacket sockets.
69 pub fn pair() -> std::io::Result<(Self, Self)> {
70 let (a, b) = sys::local_seqpacket_pair()?;
71 Ok((Self::new(a)?, Self::new(b)?))
72 }
73
74 /// Wrap a raw file descriptor as [`UnixSeqpacket`].
75 ///
76 /// Registration of the file descriptor with the tokio runtime may fail.
77 /// For that reason, this function returns a [`std::io::Result`].
78 ///
79 /// # Safety
80 /// This function is unsafe because the socket assumes it is the sole owner of the file descriptor.
81 /// Usage of this function could accidentally allow violating this contract
82 /// which can cause memory unsafety in code that relies on it being true.
83 pub unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> std::io::Result<Self> {
84 Self::new(FileDesc::from_raw_fd(fd))
85 }
86
87 /// Get the raw file descriptor of the socket.
88 ///
89 /// This is a shortcut for `seqpacket.as_async_fd().as_raw_fd()`.
90 /// See [`as_async_fd`](Self::as_async_fd).
91 pub fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
92 self.io.as_raw_fd()
93 }
94
95 /// Deregister the socket from the tokio runtime and return the inner file descriptor.
96 pub fn into_raw_fd(self) -> std::os::unix::io::RawFd {
97 self.io.into_inner().into_raw_fd()
98 }
99
100 #[doc(hidden)]
101 #[deprecated(
102 since = "0.4.0",
103 note = "all I/O functions now take a shared reference to self, so splitting is no longer necessary"
104 )]
105 pub fn split(&self) -> (&Self, &Self) {
106 (self, self)
107 }
108
109 /// Get the async file descriptor of this object.
110 ///
111 /// This can be useful for applications that want to do low-level socket calls, such as
112 /// [`sendmsg`](libc::sendmsg), but still want to use async and need to know when the socket is
113 /// ready to be used.
114 ///
115 /// Example:
116 /// ```
117 /// # async fn f() -> std::io::Result<()> {
118 /// let seqpacket = tokio_seqpacket::UnixSeqpacket::connect("/tmp/example.sock").await?;
119 /// seqpacket.as_async_fd().writable().await?.retain_ready();
120 /// # Ok(()) }
121 /// ```
122 pub fn as_async_fd(&self) -> &AsyncFd<FileDesc> {
123 &self.io
124 }
125
126 /// Get the effective credentials of the process which called `connect` or `pair`.
127 ///
128 /// Note that this is not necessarily the process that currently has the file descriptor
129 /// of the other side of the connection.
130 pub fn peer_cred(&self) -> std::io::Result<UCred> {
131 UCred::from_socket_peer(&self.io)
132 }
133
134 /// Get and clear the value of the `SO_ERROR` option.
135 pub fn take_error(&self) -> std::io::Result<Option<std::io::Error>> {
136 sys::take_socket_error(self.io.get_ref())
137 }
138
139 /// Try to send data on the socket to the connected peer without blocking.
140 ///
141 /// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writeable.
142 ///
143 /// Note that unlike [`Self::send`], only the last task calling this function will be woken up.
144 /// For that reason, it is preferable to use the async functions rather than polling functions when possible.
145 pub fn poll_send(&self, cx: &mut Context, buffer: &[u8]) -> Poll<std::io::Result<usize>> {
146 loop {
147 let mut ready_guard = ready!(self.io.poll_write_ready(cx)?);
148
149 match ready_guard.try_io(|inner| sys::send(inner.get_ref(), buffer)) {
150 Ok(result) => return Poll::Ready(result),
151 Err(_would_block) => continue,
152 }
153 }
154 }
155
156 /// Try to send data on the socket to the connected peer without blocking.
157 ///
158 /// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writeable.
159 ///
160 /// Note that unlike [`Self::send_vectored`], only the last task calling this function will be woken up.
161 /// For that reason, it is preferable to use the async functions rather than polling functions when possible.
162 pub fn poll_send_vectored(&self, cx: &mut Context, buffer: &[IoSlice]) -> Poll<std::io::Result<usize>> {
163 self.poll_send_vectored_with_ancillary(cx, buffer, &mut AncillaryMessageWriter::new(&mut []))
164 }
165
166 /// Try to send data with ancillary data on the socket to the connected peer without blocking.
167 ///
168 /// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writeable.
169 ///
170 /// Note that unlike [`Self::send_vectored_with_ancillary`], only the last task calling this function will be woken up.
171 /// For that reason, it is preferable to use the async functions rather than polling functions when possible.
172 pub fn poll_send_vectored_with_ancillary(
173 &self,
174 cx: &mut Context,
175 buffer: &[IoSlice],
176 ancillary: &mut AncillaryMessageWriter,
177 ) -> Poll<std::io::Result<usize>> {
178 loop {
179 let mut ready_guard = ready!(self.io.poll_write_ready(cx)?);
180 match ready_guard.try_io(|inner| sys::send_msg(inner.get_ref(), buffer, ancillary)) {
181 Ok(result) => return Poll::Ready(result),
182 Err(_would_block) => continue,
183 }
184 }
185 }
186
187 /// Send data on the socket to the connected peer.
188 ///
189 /// This function is safe to call concurrently from different tasks.
190 /// All calling tasks will try to complete the asynchronous action,
191 /// although the order in which they complete is not guaranteed.
192 pub async fn send(&self, buffer: &[u8]) -> std::io::Result<usize> {
193 loop {
194 let mut ready_guard = self.io.writable().await?;
195
196 match ready_guard.try_io(|inner| sys::send(inner.get_ref(), buffer)) {
197 Ok(result) => return result,
198 Err(_would_block) => continue,
199 }
200 }
201 }
202
203 /// Send data on the socket to the connected peer.
204 ///
205 /// This function is safe to call concurrently from different tasks.
206 /// All calling tasks will try to complete the asynchronous action,
207 /// although the order in which they complete is not guaranteed.
208 pub async fn send_vectored(&self, buffer: &[IoSlice<'_>]) -> std::io::Result<usize> {
209 self.send_vectored_with_ancillary(buffer, &mut AncillaryMessageWriter::new(&mut []))
210 .await
211 }
212
213 /// Send data with ancillary data on the socket to the connected peer.
214 ///
215 /// This function is safe to call concurrently from different tasks.
216 /// All calling tasks will try to complete the asynchronous action,
217 /// although the order in which they complete is not guaranteed.
218 pub async fn send_vectored_with_ancillary(
219 &self,
220 buffer: &[IoSlice<'_>],
221 ancillary: &mut AncillaryMessageWriter<'_>,
222 ) -> std::io::Result<usize> {
223 loop {
224 let mut ready_guard = self.io.writable().await?;
225 match ready_guard.try_io(|inner| sys::send_msg(inner.get_ref(), buffer, ancillary)) {
226 Ok(result) => return result,
227 Err(_would_block) => continue,
228 }
229 }
230 }
231
232 /// Try to receive data on the socket from the connected peer without blocking.
233 ///
234 /// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
235 ///
236 /// Note that unlike [`Self::recv`], only the last task calling this function will be woken up.
237 /// For that reason, it is preferable to use the async functions rather than polling functions when possible.
238 pub fn poll_recv(&self, cx: &mut Context, buffer: &mut [u8]) -> Poll<std::io::Result<usize>> {
239 loop {
240 let mut ready_guard = ready!(self.io.poll_read_ready(cx)?);
241 match ready_guard.try_io(|inner| sys::recv(inner.get_ref(), buffer)) {
242 Ok(result) => return Poll::Ready(result),
243 Err(_would_block) => continue,
244 }
245 }
246 }
247
248 /// Try to receive data on the socket from the connected peer without blocking.
249 ///
250 /// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
251 ///
252 /// Note that unlike [`Self::recv_vectored`], only the last task calling this function will be woken up.
253 /// For that reason, it is preferable to use the async functions rather than polling functions when possible.
254 pub fn poll_recv_vectored(&self, cx: &mut Context, buffer: &mut [IoSliceMut]) -> Poll<std::io::Result<usize>> {
255 let (read, _ancillary) = ready!(self.poll_recv_vectored_with_ancillary(cx, buffer, &mut []))?;
256 Poll::Ready(Ok(read))
257 }
258
259 /// Try to receive data with ancillary data on the socket from the connected peer without blocking.
260 ///
261 /// Any file descriptors received in the anicallary data will have the `close-on-exec` flag set.
262 /// If the OS supports it, this is done atomically with the reception of the message.
263 /// However, on Illumos and Solaris, the `close-on-exec` flag is set in a separate step after receiving the message.
264 ///
265 /// Note that you should always wrap or close any file descriptors received this way.
266 /// If you do not, the received file descriptors will stay open until the process is terminated.
267 ///
268 /// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
269 ///
270 /// Note that unlike [`Self::recv_vectored_with_ancillary`], only the last task calling this function will be woken up.
271 /// For that reason, it is preferable to use the async functions rather than polling functions when possible.
272 pub fn poll_recv_vectored_with_ancillary<'a>(
273 &self,
274 cx: &mut Context,
275 buffer: &mut [IoSliceMut],
276 ancillary_buffer: &'a mut [u8],
277 ) -> Poll<std::io::Result<(usize, AncillaryMessageReader<'a>)>> {
278 loop {
279 let mut ready_guard = ready!(self.io.poll_read_ready(cx)?);
280
281 let (read, ancillary_reader) = match ready_guard.try_io(|inner| sys::recv_msg(inner.get_ref(), buffer, ancillary_buffer)) {
282 Ok(x) => x?,
283 Err(_would_block) => continue,
284 };
285
286 // SAFETY: We have to work around a borrow checker bug:
287 // It doesn't know that we return in this branch, so the loop terminates.
288 // It thinks we will do another mutable borrow in the next loop iteration.
289 // TODO: Remove this transmute once the borrow checker is smart enough.
290 return Poll::Ready(Ok((read, unsafe { transmute_lifetime(ancillary_reader) })));
291 }
292 }
293
294 /// Receive data on the socket from the connected peer.
295 ///
296 /// This function is safe to call concurrently from different tasks.
297 /// All calling tasks will try to complete the asynchronous action,
298 /// although the order in which they complete is not guaranteed.
299 pub async fn recv(&self, buffer: &mut [u8]) -> std::io::Result<usize> {
300 loop {
301 let mut ready_guard = self.io.readable().await?;
302 match ready_guard.try_io(|inner| sys::recv(inner.get_ref(), buffer)) {
303 Ok(result) => return result,
304 Err(_would_block) => continue,
305 }
306 }
307 }
308
309 /// Receive data on the socket from the connected peer.
310 ///
311 /// This function is safe to call concurrently from different tasks.
312 /// All calling tasks will try to complete the asynchronous action,
313 /// although the order in which they complete is not guaranteed.
314 pub async fn recv_vectored(&self, buffer: &mut [IoSliceMut<'_>]) -> std::io::Result<usize> {
315 let (read, _ancillary) = self.recv_vectored_with_ancillary(buffer, &mut [])
316 .await?;
317 Ok(read)
318 }
319
320 /// Receive data with ancillary data on the socket from the connected peer.
321 ///
322 /// Any file descriptors received in the anicallary data will have the `close-on-exec` flag set.
323 /// If the OS supports it, this is done atomically with the reception of the message.
324 /// However, on Illumos and Solaris, the `close-on-exec` flag is set in a separate step after receiving the message.
325 ///
326 /// Note that you should always wrap or close any file descriptors received this way.
327 /// If you do not, the received file descriptors will stay open until the process is terminated.
328 ///
329 /// This function is safe to call concurrently from different tasks.
330 /// All calling tasks will try to complete the asynchronous action,
331 /// although the order in which they complete is not guaranteed.
332 pub async fn recv_vectored_with_ancillary<'a>(
333 &self,
334 buffer: &mut [IoSliceMut<'_>],
335 ancillary_buffer: &'a mut [u8],
336 ) -> std::io::Result<(usize, AncillaryMessageReader<'a>)> {
337 loop {
338 let mut ready_guard = self.io.readable().await?;
339
340 let (read, ancillary_reader) = match ready_guard.try_io(|inner| sys::recv_msg(inner.get_ref(), buffer, ancillary_buffer)) {
341 Ok(x) => x?,
342 Err(_would_block) => continue,
343 };
344
345 // SAFETY: We have to work around a borrow checker bug:
346 // It doesn't know that we return in this branch, so the loop terminates.
347 // It thinks we will do another mutable borrow in the next loop iteration.
348 // TODO: Remove this transmute once the borrow checker is smart enough.
349 return Ok((read, unsafe { transmute_lifetime(ancillary_reader) }));
350 }
351 }
352
353 /// Shuts down the read, write, or both halves of this connection.
354 ///
355 /// This function will cause all pending and future I/O calls on the
356 /// specified portions to immediately return with an appropriate value
357 /// (see the documentation of `Shutdown`).
358 pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
359 sys::shutdown(self.io.get_ref(), how)
360 }
361}
362
363impl AsRawFd for UnixSeqpacket {
364 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
365 self.as_raw_fd()
366 }
367}
368
369impl IntoRawFd for UnixSeqpacket {
370 fn into_raw_fd(self) -> std::os::unix::io::RawFd {
371 self.into_raw_fd()
372 }
373}
374
375/// Transmute the lifetime of a `AncillaryMessageReader`.
376///
377/// Exists to ensure we do not accidentally transmute more than we intend to.
378///
379/// # Safety
380/// All the safety requirements of [`std::mem::transmute`] should be uphold.
381#[allow(clippy::needless_lifetimes)]
382unsafe fn transmute_lifetime<'a, 'b>(input: AncillaryMessageReader<'a>) -> AncillaryMessageReader<'b> {
383 std::mem::transmute(input)
384}