//! Asynchronous Unix seqpacket socket support.
use filedesc::FileDesc;
use std::io::{IoSlice, IoSliceMut};
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, IntoRawFd, OwnedFd};
use std::path::Path;
use std::task::{Context, Poll};
use tokio::io::unix::AsyncFd;
use crate::ancillary::{AncillaryMessageReader, AncillaryMessageWriter};
use crate::{sys, UCred};
/// Unix seqpacket socket.
///
/// Note that there are no functions to get the local or remote address of the connection.
/// That is because connected Unix sockets are always anonymous,
/// which means that the address contains no useful information.
pub struct UnixSeqpacket {
io: AsyncFd<FileDesc>,
}
impl std::fmt::Debug for UnixSeqpacket {
	/// Format the socket as `UnixSeqpacket { fd: <raw fd> }`.
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		let raw_fd = self.io.get_ref().as_raw_fd();
		f.debug_struct("UnixSeqpacket").field("fd", &raw_fd).finish()
	}
}
impl AsFd for UnixSeqpacket {
	/// Borrow the underlying file descriptor of the socket.
	fn as_fd(&self) -> BorrowedFd<'_> {
		let inner = self.io.get_ref();
		inner.as_fd()
	}
}
impl TryFrom<OwnedFd> for UnixSeqpacket {
type Error = std::io::Error;
fn try_from(fd: OwnedFd) -> Result<Self, Self::Error> {
Self::new(FileDesc::new(fd))
}
}
impl From<UnixSeqpacket> for OwnedFd {
	/// Deregister the socket from the tokio runtime and take ownership of the file descriptor.
	fn from(socket: UnixSeqpacket) -> Self {
		let file_desc = socket.io.into_inner();
		file_desc.into_fd()
	}
}
impl UnixSeqpacket {
	/// Wrap a connected, non-blocking socket and register it with the tokio runtime.
	pub(crate) fn new(socket: FileDesc) -> std::io::Result<Self> {
		// `AsyncFd::new` registers the file descriptor with the tokio reactor.
		// This fails if there is no active tokio runtime with I/O enabled.
		let io = AsyncFd::new(socket)?;
		Ok(Self { io })
	}

	/// Connect a new seqpacket socket to the given address.
	pub async fn connect<P: AsRef<Path>>(address: P) -> std::io::Result<Self> {
		let socket = sys::local_seqpacket_socket()?;
		// The socket is non-blocking, so `connect` may report `WouldBlock`
		// while the connection attempt is still in progress.
		// Any other error means the connect failed outright.
		if let Err(e) = sys::connect(&socket, address) {
			if e.kind() != std::io::ErrorKind::WouldBlock {
				return Err(e);
			}
		}
		let socket = Self::new(socket)?;
		// Wait for the socket to become writable, which signals that an
		// in-progress connect has finished. `retain_ready` keeps the readiness
		// state, since no actual I/O was performed here.
		// NOTE(review): an asynchronous connect failure delivered through
		// `SO_ERROR` is not surfaced here; callers can check `take_error()` —
		// TODO confirm that this is the intended contract.
		socket.io.writable().await?.retain_ready();
		Ok(socket)
	}

	/// Create a pair of connected seqpacket sockets.
	pub fn pair() -> std::io::Result<(Self, Self)> {
		let (a, b) = sys::local_seqpacket_pair()?;
		Ok((Self::new(a)?, Self::new(b)?))
	}

	/// Wrap a raw file descriptor as [`UnixSeqpacket`].
	///
	/// Registration of the file descriptor with the tokio runtime may fail.
	/// For that reason, this function returns a [`std::io::Result`].
	///
	/// # Safety
	/// This function is unsafe because the socket assumes it is the sole owner of the file descriptor.
	/// Usage of this function could accidentally allow violating this contract
	/// which can cause memory unsafety in code that relies on it being true.
	pub unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> std::io::Result<Self> {
		Self::new(FileDesc::from_raw_fd(fd))
	}

	/// Get the raw file descriptor of the socket.
	///
	/// This is a shortcut for `seqpacket.as_async_fd().as_raw_fd()`.
	/// See [`as_async_fd`](Self::as_async_fd).
	pub fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
		self.io.as_raw_fd()
	}

	/// Deregister the socket from the tokio runtime and return the inner file descriptor.
	pub fn into_raw_fd(self) -> std::os::unix::io::RawFd {
		self.io.into_inner().into_raw_fd()
	}

	#[doc(hidden)]
	#[deprecated(
		since = "0.4.0",
		note = "all I/O functions now take a shared reference to self, so splitting is no longer necessary"
	)]
	pub fn split(&self) -> (&Self, &Self) {
		// Kept only for backwards compatibility: both halves are the same object.
		(self, self)
	}

	/// Get the async file descriptor of this object.
	///
	/// This can be useful for applications that want to do low-level socket calls, such as
	/// [`sendmsg`](libc::sendmsg), but still want to use async and need to know when the socket is
	/// ready to be used.
	///
	/// Example:
	/// ```
	/// # async fn f() -> std::io::Result<()> {
	/// let seqpacket = tokio_seqpacket::UnixSeqpacket::connect("/tmp/example.sock").await?;
	/// seqpacket.as_async_fd().writable().await?.retain_ready();
	/// # Ok(()) }
	/// ```
	pub fn as_async_fd(&self) -> &AsyncFd<FileDesc> {
		&self.io
	}

	/// Get the effective credentials of the process which called `connect` or `pair`.
	///
	/// Note that this is not necessarily the process that currently has the file descriptor
	/// of the other side of the connection.
	pub fn peer_cred(&self) -> std::io::Result<UCred> {
		UCred::from_socket_peer(&self.io)
	}

	/// Get and clear the value of the `SO_ERROR` option.
	pub fn take_error(&self) -> std::io::Result<Option<std::io::Error>> {
		sys::take_socket_error(self.io.get_ref())
	}

	/// Try to send data on the socket to the connected peer without blocking.
	///
	/// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writeable.
	///
	/// Note that unlike [`Self::send`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_send(&self, cx: &mut Context, buffer: &[u8]) -> Poll<std::io::Result<usize>> {
		loop {
			// Returns `Poll::Pending` (scheduling a wake-up) until the reactor
			// reports the socket as writable.
			let mut ready_guard = ready!(self.io.poll_write_ready(cx)?);
			// `try_io` clears the readiness state when the operation reports
			// `WouldBlock` (stale readiness); in that case, poll again.
			match ready_guard.try_io(|inner| sys::send(inner.get_ref(), buffer)) {
				Ok(result) => return Poll::Ready(result),
				Err(_would_block) => continue,
			}
		}
	}

	/// Try to send data on the socket to the connected peer without blocking.
	///
	/// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writeable.
	///
	/// Note that unlike [`Self::send_vectored`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_send_vectored(&self, cx: &mut Context, buffer: &[IoSlice]) -> Poll<std::io::Result<usize>> {
		// Delegate to the ancillary variant with an empty ancillary buffer.
		self.poll_send_vectored_with_ancillary(cx, buffer, &mut AncillaryMessageWriter::new(&mut []))
	}

	/// Try to send data with ancillary data on the socket to the connected peer without blocking.
	///
	/// If the socket is not ready yet, the current task is scheduled to wake up when the socket becomes writeable.
	///
	/// Note that unlike [`Self::send_vectored_with_ancillary`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_send_vectored_with_ancillary(
		&self,
		cx: &mut Context,
		buffer: &[IoSlice],
		ancillary: &mut AncillaryMessageWriter,
	) -> Poll<std::io::Result<usize>> {
		loop {
			// Returns `Poll::Pending` (scheduling a wake-up) until the reactor
			// reports the socket as writable.
			let mut ready_guard = ready!(self.io.poll_write_ready(cx)?);
			// `try_io` clears the readiness state when the operation reports
			// `WouldBlock` (stale readiness); in that case, poll again.
			match ready_guard.try_io(|inner| sys::send_msg(inner.get_ref(), buffer, ancillary)) {
				Ok(result) => return Poll::Ready(result),
				Err(_would_block) => continue,
			}
		}
	}

	/// Send data on the socket to the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn send(&self, buffer: &[u8]) -> std::io::Result<usize> {
		loop {
			// `writable()` (unlike `poll_write_ready`) wakes every waiting task,
			// so this is safe to call concurrently.
			let mut ready_guard = self.io.writable().await?;
			// `try_io` clears the readiness state when the operation reports
			// `WouldBlock` (stale readiness); in that case, wait again.
			match ready_guard.try_io(|inner| sys::send(inner.get_ref(), buffer)) {
				Ok(result) => return result,
				Err(_would_block) => continue,
			}
		}
	}

	/// Send data on the socket to the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn send_vectored(&self, buffer: &[IoSlice<'_>]) -> std::io::Result<usize> {
		// Delegate to the ancillary variant with an empty ancillary buffer.
		self.send_vectored_with_ancillary(buffer, &mut AncillaryMessageWriter::new(&mut []))
			.await
	}

	/// Send data with ancillary data on the socket to the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn send_vectored_with_ancillary(
		&self,
		buffer: &[IoSlice<'_>],
		ancillary: &mut AncillaryMessageWriter<'_>,
	) -> std::io::Result<usize> {
		loop {
			let mut ready_guard = self.io.writable().await?;
			// `try_io` clears the readiness state when the operation reports
			// `WouldBlock` (stale readiness); in that case, wait again.
			match ready_guard.try_io(|inner| sys::send_msg(inner.get_ref(), buffer, ancillary)) {
				Ok(result) => return result,
				Err(_would_block) => continue,
			}
		}
	}

	/// Try to receive data on the socket from the connected peer without blocking.
	///
	/// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
	///
	/// Note that unlike [`Self::recv`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_recv(&self, cx: &mut Context, buffer: &mut [u8]) -> Poll<std::io::Result<usize>> {
		loop {
			// Returns `Poll::Pending` (scheduling a wake-up) until the reactor
			// reports the socket as readable.
			let mut ready_guard = ready!(self.io.poll_read_ready(cx)?);
			// `try_io` clears the readiness state when the operation reports
			// `WouldBlock` (stale readiness); in that case, poll again.
			match ready_guard.try_io(|inner| sys::recv(inner.get_ref(), buffer)) {
				Ok(result) => return Poll::Ready(result),
				Err(_would_block) => continue,
			}
		}
	}

	/// Try to receive data on the socket from the connected peer without blocking.
	///
	/// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
	///
	/// Note that unlike [`Self::recv_vectored`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_recv_vectored(&self, cx: &mut Context, buffer: &mut [IoSliceMut]) -> Poll<std::io::Result<usize>> {
		// Delegate to the ancillary variant with an empty ancillary buffer,
		// discarding the (necessarily empty) ancillary reader.
		let (read, _ancillary) = ready!(self.poll_recv_vectored_with_ancillary(cx, buffer, &mut []))?;
		Poll::Ready(Ok(read))
	}

	/// Try to receive data with ancillary data on the socket from the connected peer without blocking.
	///
	/// Any file descriptors received in the ancillary data will have the `close-on-exec` flag set.
	/// If the OS supports it, this is done atomically with the reception of the message.
	/// However, on Illumos and Solaris, the `close-on-exec` flag is set in a separate step after receiving the message.
	///
	/// Note that you should always wrap or close any file descriptors received this way.
	/// If you do not, the received file descriptors will stay open until the process is terminated.
	///
	/// If there is no data ready yet, the current task is scheduled to wake up when the socket becomes readable.
	///
	/// Note that unlike [`Self::recv_vectored_with_ancillary`], only the last task calling this function will be woken up.
	/// For that reason, it is preferable to use the async functions rather than polling functions when possible.
	pub fn poll_recv_vectored_with_ancillary<'a>(
		&self,
		cx: &mut Context,
		buffer: &mut [IoSliceMut],
		ancillary_buffer: &'a mut [u8],
	) -> Poll<std::io::Result<(usize, AncillaryMessageReader<'a>)>> {
		loop {
			let mut ready_guard = ready!(self.io.poll_read_ready(cx)?);
			// `try_io` clears the readiness state when the operation reports
			// `WouldBlock` (stale readiness); in that case, poll again.
			let (read, ancillary_reader) = match ready_guard.try_io(|inner| sys::recv_msg(inner.get_ref(), buffer, ancillary_buffer)) {
				Ok(x) => x?,
				Err(_would_block) => continue,
			};
			// SAFETY: We have to work around a borrow checker bug:
			// It doesn't know that we return in this branch, so the loop terminates.
			// It thinks we will do another mutable borrow in the next loop iteration.
			// TODO: Remove this transmute once the borrow checker is smart enough.
			return Poll::Ready(Ok((read, unsafe { transmute_lifetime(ancillary_reader) })));
		}
	}

	/// Receive data on the socket from the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn recv(&self, buffer: &mut [u8]) -> std::io::Result<usize> {
		loop {
			// `readable()` (unlike `poll_read_ready`) wakes every waiting task,
			// so this is safe to call concurrently.
			let mut ready_guard = self.io.readable().await?;
			// `try_io` clears the readiness state when the operation reports
			// `WouldBlock` (stale readiness); in that case, wait again.
			match ready_guard.try_io(|inner| sys::recv(inner.get_ref(), buffer)) {
				Ok(result) => return result,
				Err(_would_block) => continue,
			}
		}
	}

	/// Receive data on the socket from the connected peer.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn recv_vectored(&self, buffer: &mut [IoSliceMut<'_>]) -> std::io::Result<usize> {
		// Delegate to the ancillary variant with an empty ancillary buffer,
		// discarding the (necessarily empty) ancillary reader.
		let (read, _ancillary) = self.recv_vectored_with_ancillary(buffer, &mut [])
			.await?;
		Ok(read)
	}

	/// Receive data with ancillary data on the socket from the connected peer.
	///
	/// Any file descriptors received in the ancillary data will have the `close-on-exec` flag set.
	/// If the OS supports it, this is done atomically with the reception of the message.
	/// However, on Illumos and Solaris, the `close-on-exec` flag is set in a separate step after receiving the message.
	///
	/// Note that you should always wrap or close any file descriptors received this way.
	/// If you do not, the received file descriptors will stay open until the process is terminated.
	///
	/// This function is safe to call concurrently from different tasks.
	/// All calling tasks will try to complete the asynchronous action,
	/// although the order in which they complete is not guaranteed.
	pub async fn recv_vectored_with_ancillary<'a>(
		&self,
		buffer: &mut [IoSliceMut<'_>],
		ancillary_buffer: &'a mut [u8],
	) -> std::io::Result<(usize, AncillaryMessageReader<'a>)> {
		loop {
			let mut ready_guard = self.io.readable().await?;
			// `try_io` clears the readiness state when the operation reports
			// `WouldBlock` (stale readiness); in that case, wait again.
			let (read, ancillary_reader) = match ready_guard.try_io(|inner| sys::recv_msg(inner.get_ref(), buffer, ancillary_buffer)) {
				Ok(x) => x?,
				Err(_would_block) => continue,
			};
			// SAFETY: We have to work around a borrow checker bug:
			// It doesn't know that we return in this branch, so the loop terminates.
			// It thinks we will do another mutable borrow in the next loop iteration.
			// TODO: Remove this transmute once the borrow checker is smart enough.
			return Ok((read, unsafe { transmute_lifetime(ancillary_reader) }));
		}
	}

	/// Shuts down the read, write, or both halves of this connection.
	///
	/// This function will cause all pending and future I/O calls on the
	/// specified portions to immediately return with an appropriate value
	/// (see the documentation of `Shutdown`).
	pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
		sys::shutdown(self.io.get_ref(), how)
	}
}
impl AsRawFd for UnixSeqpacket {
	/// Get the raw file descriptor of the socket.
	fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
		// This resolves to the inherent `UnixSeqpacket::as_raw_fd` method
		// (inherent methods take precedence over trait methods),
		// so it is not infinite recursion.
		Self::as_raw_fd(self)
	}
}
impl IntoRawFd for UnixSeqpacket {
	/// Deregister the socket from the tokio runtime and return the inner file descriptor.
	fn into_raw_fd(self) -> std::os::unix::io::RawFd {
		// This resolves to the inherent `UnixSeqpacket::into_raw_fd` method
		// (inherent methods take precedence over trait methods),
		// so it is not infinite recursion.
		Self::into_raw_fd(self)
	}
}
/// Change the lifetime parameter of an [`AncillaryMessageReader`].
///
/// This wrapper exists so that we only ever transmute exactly this type,
/// never anything broader by accident.
///
/// # Safety
/// All the safety requirements of [`std::mem::transmute`] must be upheld.
/// In particular, the returned reader must not be used beyond the lifetime
/// of the buffer it actually borrows from.
#[allow(clippy::needless_lifetimes)]
unsafe fn transmute_lifetime<'src, 'dst>(input: AncillaryMessageReader<'src>) -> AncillaryMessageReader<'dst> {
	std::mem::transmute(input)
}