tokio_seqpacket/
socket.rs

use filedesc::FileDesc;
use std::io::{IoSlice, IoSliceMut};
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, IntoRawFd, OwnedFd};
use std::path::Path;
use std::task::{ready, Context, Poll};
use tokio::io::unix::AsyncFd;

use crate::ancillary::{AncillaryMessageReader, AncillaryMessageWriter};
use crate::{sys, UCred};

/// Unix seqpacket socket.
///
/// Note that there are no functions to get the local or remote address of the connection.
/// That is because connected Unix sockets are always anonymous,
/// which means that the address contains no useful information.
pub struct UnixSeqpacket {
	io: AsyncFd<FileDesc>,
}

impl std::fmt::Debug for UnixSeqpacket {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		f.debug_struct("UnixSeqpacket")
			.field("fd", &self.io.get_ref().as_raw_fd())
			.finish()
	}
}

impl AsFd for UnixSeqpacket {
	fn as_fd(&self) -> BorrowedFd<'_> {
		self.io.get_ref().as_fd()
	}
}

impl TryFrom<OwnedFd> for UnixSeqpacket {
	type Error = std::io::Error;

	fn try_from(fd: OwnedFd) -> Result<Self, Self::Error> {
		Self::new(FileDesc::new(fd))
	}
}

impl From<UnixSeqpacket> for OwnedFd {
	fn from(socket: UnixSeqpacket) -> Self {
		socket.io.into_inner().into_fd()
	}
}

impl UnixSeqpacket {
	pub(crate) fn new(socket: FileDesc) -> std::io::Result<Self> {
		let io = AsyncFd::new(socket)?;
		Ok(Self { io })
	}

	/// Connect a new seqpacket socket to the given address.
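	///
	/// A minimal sketch of connecting and sending a message; `/tmp/example.sock` is a placeholder path:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// let socket = tokio_seqpacket::UnixSeqpacket::connect("/tmp/example.sock").await?;
	/// socket.send(b"hello").await?;
	/// # Ok(()) }
	/// ```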
	pub async fn connect<P: AsRef<Path>>(address: P) -> std::io::Result<Self> {
		let socket = sys::local_seqpacket_socket()?;
		if let Err(e) = sys::connect(&socket, address) {
			if e.kind() != std::io::ErrorKind::WouldBlock {
				return Err(e);
			}
		}

		let socket = Self::new(socket)?;
		socket.io.writable().await?.retain_ready();
		Ok(socket)
	}

	/// Create a pair of connected seqpacket sockets.
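	///
	/// A short round-trip sketch using such a pair:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// let (a, b) = tokio_seqpacket::UnixSeqpacket::pair()?;
	/// a.send(b"ping").await?;
	///
	/// let mut buffer = [0u8; 16];
	/// let len = b.recv(&mut buffer).await?;
	/// assert_eq!(&buffer[..len], b"ping");
	/// # Ok(()) }
	/// ```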
	pub fn pair() -> std::io::Result<(Self, Self)> {
		let (a, b) = sys::local_seqpacket_pair()?;
		Ok((Self::new(a)?, Self::new(b)?))
	}

	/// Wrap a raw file descriptor as [`UnixSeqpacket`].
	///
	/// Registration of the file descriptor with the tokio runtime may fail.
	/// For that reason, this function returns a [`std::io::Result`].
	///
	/// # Safety
	/// This function is unsafe because the socket assumes it is the sole owner of the file descriptor.
	/// Using this function makes it possible to accidentally violate that contract,
	/// which can cause memory unsafety in code that relies on it being true.
	pub unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> std::io::Result<Self> {
		Self::new(FileDesc::from_raw_fd(fd))
	}

	/// Get the raw file descriptor of the socket.
	///
	/// This is a shortcut for `seqpacket.as_async_fd().as_raw_fd()`.
	/// See [`as_async_fd`](Self::as_async_fd).
	pub fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
		self.io.as_raw_fd()
	}

	/// Deregister the socket from the tokio runtime and return the inner file descriptor.
	pub fn into_raw_fd(self) -> std::os::unix::io::RawFd {
		self.io.into_inner().into_raw_fd()
	}

	#[doc(hidden)]
	#[deprecated(
		since = "0.4.0",
		note = "all I/O functions now take a shared reference to self, so splitting is no longer necessary"
	)]
	pub fn split(&self) -> (&Self, &Self) {
		(self, self)
	}

	/// Get the async file descriptor of this object.
	///
	/// This can be useful for applications that want to do low-level socket calls, such as
	/// [`sendmsg`](libc::sendmsg), but still want to use async and need to know when the socket is
	/// ready to be used.
	///
	/// Example:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// let seqpacket = tokio_seqpacket::UnixSeqpacket::connect("/tmp/example.sock").await?;
	/// seqpacket.as_async_fd().writable().await?.retain_ready();
	/// # Ok(()) }
	/// ```
	pub fn as_async_fd(&self) -> &AsyncFd<FileDesc> {
		&self.io
	}

	/// Get the effective credentials of the process which called `connect` or `pair`.
	///
	/// Note that this is not necessarily the process that currently has the file descriptor
	/// of the other side of the connection.
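	///
	/// A minimal sketch using a socket pair created in the same process:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// let (a, _b) = tokio_seqpacket::UnixSeqpacket::pair()?;
	/// // Both sockets were created by this process, so the peer credentials are our own.
	/// let _peer_credentials = a.peer_cred()?;
	/// # Ok(()) }
	/// ```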
	pub fn peer_cred(&self) -> std::io::Result<UCred> {
		UCred::from_socket_peer(&self.io)
	}

	/// Get and clear the value of the `SO_ERROR` option.
	pub fn take_error(&self) -> std::io::Result<Option<std::io::Error>> {
		sys::take_socket_error(self.io.get_ref())
	}

	/// Try to send data on the socket to the connected peer without blocking.
	///
	/// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writable.
	///
	/// Note that unlike [`Self::send`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
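	///
	/// If you do need the polling form, one way to drive it from async code is
	/// [`std::future::poll_fn`] (available since Rust 1.64). A sketch, using a socket pair for illustration:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// let (a, _b) = tokio_seqpacket::UnixSeqpacket::pair()?;
	/// let sent = std::future::poll_fn(|cx| a.poll_send(cx, b"hello")).await?;
	/// assert_eq!(sent, 5);
	/// # Ok(()) }
	/// ```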
	pub fn poll_send(&self, cx: &mut Context, buffer: &[u8]) -> Poll<std::io::Result<usize>> {
		loop {
			let mut ready_guard = ready!(self.io.poll_write_ready(cx)?);

			match ready_guard.try_io(|inner| sys::send(inner.get_ref(), buffer)) {
				Ok(result) => return Poll::Ready(result),
				Err(_would_block) => continue,
			}
		}
	}

	/// Try to send data on the socket to the connected peer without blocking.
	///
	/// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writable.
	///
	/// Note that unlike [`Self::send_vectored`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_send_vectored(&self, cx: &mut Context, buffer: &[IoSlice]) -> Poll<std::io::Result<usize>> {
		self.poll_send_vectored_with_ancillary(cx, buffer, &mut AncillaryMessageWriter::new(&mut []))
	}

	/// Try to send data with ancillary data on the socket to the connected peer without blocking.
	///
	/// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writable.
	///
	/// Note that unlike [`Self::send_vectored_with_ancillary`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_send_vectored_with_ancillary(
		&self,
		cx: &mut Context,
		buffer: &[IoSlice],
		ancillary: &mut AncillaryMessageWriter,
	) -> Poll<std::io::Result<usize>> {
		loop {
			let mut ready_guard = ready!(self.io.poll_write_ready(cx)?);
			match ready_guard.try_io(|inner| sys::send_msg(inner.get_ref(), buffer, ancillary)) {
				Ok(result) => return Poll::Ready(result),
				Err(_would_block) => continue,
			}
		}
	}

	/// Send data on the socket to the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn send(&self, buffer: &[u8]) -> std::io::Result<usize> {
		loop {
			let mut ready_guard = self.io.writable().await?;

			match ready_guard.try_io(|inner| sys::send(inner.get_ref(), buffer)) {
				Ok(result) => return result,
				Err(_would_block) => continue,
			}
		}
	}

	/// Send data on the socket to the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
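	///
	/// A sketch of a gather write from two buffers, which the peer receives as a single packet:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// use std::io::IoSlice;
	///
	/// let (a, _b) = tokio_seqpacket::UnixSeqpacket::pair()?;
	/// let sent = a.send_vectored(&[IoSlice::new(b"hello "), IoSlice::new(b"world")]).await?;
	/// assert_eq!(sent, 11);
	/// # Ok(()) }
	/// ```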
	pub async fn send_vectored(&self, buffer: &[IoSlice<'_>]) -> std::io::Result<usize> {
		self.send_vectored_with_ancillary(buffer, &mut AncillaryMessageWriter::new(&mut []))
			.await
	}

	/// Send data with ancillary data on the socket to the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
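	///
	/// A rough sketch of the call shape; the ancillary buffer size is arbitrary, and the exact
	/// methods for adding control messages to the writer live in the crate's `ancillary` module:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// use std::io::IoSlice;
	/// use tokio_seqpacket::ancillary::AncillaryMessageWriter;
	///
	/// let (a, _b) = tokio_seqpacket::UnixSeqpacket::pair()?;
	/// let mut ancillary_buffer = [0u8; 128];
	/// let mut ancillary = AncillaryMessageWriter::new(&mut ancillary_buffer);
	/// // Add control messages (such as file descriptors) to `ancillary` here.
	/// a.send_vectored_with_ancillary(&[IoSlice::new(b"hello")], &mut ancillary).await?;
	/// # Ok(()) }
	/// ```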
	pub async fn send_vectored_with_ancillary(
		&self,
		buffer: &[IoSlice<'_>],
		ancillary: &mut AncillaryMessageWriter<'_>,
	) -> std::io::Result<usize> {
		loop {
			let mut ready_guard = self.io.writable().await?;
			match ready_guard.try_io(|inner| sys::send_msg(inner.get_ref(), buffer, ancillary)) {
				Ok(result) => return result,
				Err(_would_block) => continue,
			}
		}
	}

	/// Try to receive data on the socket from the connected peer without blocking.
	///
	/// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
	///
	/// Note that unlike [`Self::recv`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_recv(&self, cx: &mut Context, buffer: &mut [u8]) -> Poll<std::io::Result<usize>> {
		loop {
			let mut ready_guard = ready!(self.io.poll_read_ready(cx)?);
			match ready_guard.try_io(|inner| sys::recv(inner.get_ref(), buffer)) {
				Ok(result) => return Poll::Ready(result),
				Err(_would_block) => continue,
			}
		}
	}

	/// Try to receive data on the socket from the connected peer without blocking.
	///
	/// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
	///
	/// Note that unlike [`Self::recv_vectored`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_recv_vectored(&self, cx: &mut Context, buffer: &mut [IoSliceMut]) -> Poll<std::io::Result<usize>> {
		let (read, _ancillary) = ready!(self.poll_recv_vectored_with_ancillary(cx, buffer, &mut []))?;
		Poll::Ready(Ok(read))
	}

	/// Try to receive data with ancillary data on the socket from the connected peer without blocking.
	///
	/// Any file descriptors received in the ancillary data will have the `close-on-exec` flag set.
	/// If the OS supports it, this is done atomically with the reception of the message.
	/// However, on Illumos and Solaris, the `close-on-exec` flag is set in a separate step after receiving the message.
	///
	/// Note that you should always wrap or close any file descriptors received this way.
	/// If you do not, the received file descriptors will stay open until the process is terminated.
	///
	/// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
	///
	/// Note that unlike [`Self::recv_vectored_with_ancillary`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_recv_vectored_with_ancillary<'a>(
		&self,
		cx: &mut Context,
		buffer: &mut [IoSliceMut],
		ancillary_buffer: &'a mut [u8],
	) -> Poll<std::io::Result<(usize, AncillaryMessageReader<'a>)>> {
		loop {
			let mut ready_guard = ready!(self.io.poll_read_ready(cx)?);

			let (read, ancillary_reader) = match ready_guard.try_io(|inner| sys::recv_msg(inner.get_ref(), buffer, ancillary_buffer)) {
				Ok(x) => x?,
				Err(_would_block) => continue,
			};

			// SAFETY: We have to work around a borrow checker bug:
			// It doesn't know that we return in this branch, so the loop terminates.
			// It thinks we will do another mutable borrow in the next loop iteration.
			// TODO: Remove this transmute once the borrow checker is smart enough.
			return Poll::Ready(Ok((read, unsafe { transmute_lifetime(ancillary_reader) })));
		}
	}

	/// Receive data on the socket from the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn recv(&self, buffer: &mut [u8]) -> std::io::Result<usize> {
		loop {
			let mut ready_guard = self.io.readable().await?;
			match ready_guard.try_io(|inner| sys::recv(inner.get_ref(), buffer)) {
				Ok(result) => return result,
				Err(_would_block) => continue,
			}
		}
	}

	/// Receive data on the socket from the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn recv_vectored(&self, buffer: &mut [IoSliceMut<'_>]) -> std::io::Result<usize> {
		let (read, _ancillary) = self.recv_vectored_with_ancillary(buffer, &mut [])
			.await?;
		Ok(read)
	}

	/// Receive data with ancillary data on the socket from the connected peer.
	///
	/// Any file descriptors received in the ancillary data will have the `close-on-exec` flag set.
	/// If the OS supports it, this is done atomically with the reception of the message.
	/// However, on Illumos and Solaris, the `close-on-exec` flag is set in a separate step after receiving the message.
	///
	/// Note that you should always wrap or close any file descriptors received this way.
	/// If you do not, the received file descriptors will stay open until the process is terminated.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
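	///
	/// A sketch of the call shape; the buffer sizes are arbitrary, and inspecting the returned
	/// `AncillaryMessageReader` is covered by the crate's `ancillary` module:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// use std::io::IoSliceMut;
	///
	/// let (a, b) = tokio_seqpacket::UnixSeqpacket::pair()?;
	/// a.send(b"hello").await?;
	///
	/// let mut data = [0u8; 64];
	/// let mut ancillary_buffer = [0u8; 128];
	/// let mut buffers = [IoSliceMut::new(&mut data)];
	/// let (len, _ancillary) = b.recv_vectored_with_ancillary(&mut buffers, &mut ancillary_buffer).await?;
	/// assert_eq!(len, 5);
	/// # Ok(()) }
	/// ```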
	pub async fn recv_vectored_with_ancillary<'a>(
		&self,
		buffer: &mut [IoSliceMut<'_>],
		ancillary_buffer: &'a mut [u8],
	) -> std::io::Result<(usize, AncillaryMessageReader<'a>)> {
		loop {
			let mut ready_guard = self.io.readable().await?;

			let (read, ancillary_reader) = match ready_guard.try_io(|inner| sys::recv_msg(inner.get_ref(), buffer, ancillary_buffer)) {
				Ok(x) => x?,
				Err(_would_block) => continue,
			};

			// SAFETY: We have to work around a borrow checker bug:
			// It doesn't know that we return in this branch, so the loop terminates.
			// It thinks we will do another mutable borrow in the next loop iteration.
			// TODO: Remove this transmute once the borrow checker is smart enough.
			return Ok((read, unsafe { transmute_lifetime(ancillary_reader) }));
		}
	}

	/// Shuts down the read, write, or both halves of this connection.
	///
	/// This function will cause all pending and future I/O calls on the
	/// specified portions to immediately return with an appropriate value
	/// (see the documentation of `Shutdown`).
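	///
	/// For example, a minimal sketch that stops sending while still allowing receives:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// let (a, _b) = tokio_seqpacket::UnixSeqpacket::pair()?;
	/// a.shutdown(std::net::Shutdown::Write)?;
	/// # Ok(()) }
	/// ```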
	pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
		sys::shutdown(self.io.get_ref(), how)
	}
}

impl AsRawFd for UnixSeqpacket {
	fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
		self.as_raw_fd()
	}
}

impl IntoRawFd for UnixSeqpacket {
	fn into_raw_fd(self) -> std::os::unix::io::RawFd {
		self.into_raw_fd()
	}
}

/// Transmute the lifetime of an `AncillaryMessageReader`.
///
/// Exists to ensure we do not accidentally transmute more than we intend to.
///
/// # Safety
/// All the safety requirements of [`std::mem::transmute`] must be upheld.
#[allow(clippy::needless_lifetimes)]
unsafe fn transmute_lifetime<'a, 'b>(input: AncillaryMessageReader<'a>) -> AncillaryMessageReader<'b> {
	std::mem::transmute(input)
}