#![allow(clippy::non_send_fields_in_send_ty)]
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{self, size_of, MaybeUninit};
use std::pin::Pin;
use std::task::{self, Poll};
use std::{io, ptr};
use crate::cancel::{Cancel, CancelOp, CancelResult};
use crate::extract::{Extract, Extractor};
use crate::fd::{AsyncFd, Descriptor, File};
use crate::io::{
Buf, BufIdx, BufMut, BufMutSlice, BufSlice, ReadBuf, ReadBufPool, ReadNBuf, SkipBuf,
};
use crate::op::{op_async_iter, op_future, poll_state, OpState};
use crate::{libc, man_link, SubmissionQueue};
/// Creates the [`Socket`] operation for a new socket.
///
/// The `domain`, `type`, `protocol` and `flags` arguments are stored unchanged
/// and only handed to the submission queue once the returned value is polled.
#[doc = man_link!(socket(2))]
pub const fn socket<D: Descriptor>(
    sq: SubmissionQueue,
    domain: libc::c_int,
    r#type: libc::c_int,
    protocol: libc::c_int,
    flags: libc::c_int,
) -> Socket<D> {
    // All arguments are kept together until the operation is submitted.
    let args = (domain, r#type, protocol, flags);
    Socket {
        kind: PhantomData,
        state: OpState::NotStarted(args),
        sq: Some(sq),
    }
}
// Socket related operations on `AsyncFd`. Every method only constructs and
// returns the matching operation type defined further down in this file.
impl<D: Descriptor> AsyncFd<D> {
    /// Builds a [`Connect`] operation for `address`.
    ///
    /// The address is converted into a `Box<A>` before it is stored in the
    /// operation.
    #[doc = man_link!(connect(2))]
    pub fn connect<'fd, A>(&'fd self, address: impl Into<Box<A>>) -> Connect<'fd, A, D>
    where
        A: SocketAddress,
    {
        Connect::new(self, address.into(), ())
    }

    /// Builds a [`Send`] operation for `buf` using the `IORING_OP_SEND` opcode.
    #[doc = man_link!(send(2))]
    pub const fn send<'fd, B>(&'fd self, buf: B, flags: libc::c_int) -> Send<'fd, B, D>
    where
        B: Buf,
    {
        let op = libc::IORING_OP_SEND as u8;
        Send::new(self, buf, (op, flags))
    }

    /// Same as [`AsyncFd::send`], but uses the `IORING_OP_SEND_ZC` opcode.
    pub const fn send_zc<'fd, B>(&'fd self, buf: B, flags: libc::c_int) -> Send<'fd, B, D>
    where
        B: Buf,
    {
        let op = libc::IORING_OP_SEND_ZC as u8;
        Send::new(self, buf, (op, flags))
    }

    /// Builds a [`SendAll`] operation, which keeps sending until `buf` has no
    /// bytes left.
    pub const fn send_all<'fd, B>(&'fd self, buf: B) -> SendAll<'fd, B, D>
    where
        B: Buf,
    {
        SendAll::new(self, buf)
    }

    /// Builds a [`SendMsg`] operation for `bufs` without a destination
    /// address, using the `IORING_OP_SENDMSG` opcode.
    #[doc = man_link!(sendmsg(2))]
    pub fn send_vectored<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
        flags: libc::c_int,
    ) -> SendMsg<'fd, B, NoAddress, N, D>
    where
        B: BufSlice<N>,
    {
        let op = libc::IORING_OP_SENDMSG as u8;
        self.sendmsg(op, bufs, NoAddress, flags)
    }

    /// Same as [`AsyncFd::send_vectored`], but uses the
    /// `IORING_OP_SENDMSG_ZC` opcode.
    pub fn send_vectored_zc<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
        flags: libc::c_int,
    ) -> SendMsg<'fd, B, NoAddress, N, D>
    where
        B: BufSlice<N>,
    {
        let op = libc::IORING_OP_SENDMSG_ZC as u8;
        self.sendmsg(op, bufs, NoAddress, flags)
    }

    /// Builds a [`SendAllVectored`] operation, which keeps sending until all
    /// of `bufs` has been sent.
    pub fn send_all_vectored<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
    ) -> SendAllVectored<'fd, B, N, D>
    where
        B: BufSlice<N>,
    {
        SendAllVectored::new(self, bufs)
    }

    /// Builds a [`SendTo`] operation that sends `buf` to `address` using the
    /// `IORING_OP_SEND` opcode.
    #[doc = man_link!(send(2))]
    pub const fn sendto<'fd, B, A>(
        &'fd self,
        buf: B,
        address: A,
        flags: libc::c_int,
    ) -> SendTo<'fd, B, A, D>
    where
        B: Buf,
        A: SocketAddress,
    {
        let op = libc::IORING_OP_SEND as u8;
        SendTo::new(self, buf, address, (op, flags))
    }

    /// Same as [`AsyncFd::sendto`], but uses the `IORING_OP_SEND_ZC` opcode.
    pub const fn sendto_zc<'fd, B, A>(
        &'fd self,
        buf: B,
        address: A,
        flags: libc::c_int,
    ) -> SendTo<'fd, B, A, D>
    where
        B: Buf,
        A: SocketAddress,
    {
        let op = libc::IORING_OP_SEND_ZC as u8;
        SendTo::new(self, buf, address, (op, flags))
    }

    /// Builds a [`SendMsg`] operation that sends `bufs` to `address` using the
    /// `IORING_OP_SENDMSG` opcode.
    #[doc = man_link!(sendmsg(2))]
    pub fn sendto_vectored<'fd, B, A, const N: usize>(
        &'fd self,
        bufs: B,
        address: A,
        flags: libc::c_int,
    ) -> SendMsg<'fd, B, A, N, D>
    where
        B: BufSlice<N>,
        A: SocketAddress,
    {
        let op = libc::IORING_OP_SENDMSG as u8;
        self.sendmsg(op, bufs, address, flags)
    }

    /// Same as [`AsyncFd::sendto_vectored`], but uses the
    /// `IORING_OP_SENDMSG_ZC` opcode.
    pub fn sendto_vectored_zc<'fd, B, A, const N: usize>(
        &'fd self,
        bufs: B,
        address: A,
        flags: libc::c_int,
    ) -> SendMsg<'fd, B, A, N, D>
    where
        B: BufSlice<N>,
        A: SocketAddress,
    {
        let op = libc::IORING_OP_SENDMSG_ZC as u8;
        self.sendmsg(op, bufs, address, flags)
    }

    /// Shared constructor for all `sendmsg` based operations.
    ///
    /// Starts from a zeroed message header and the iovecs of `bufs`; the
    /// header is filled in when the operation is set up.
    fn sendmsg<'fd, B, A, const N: usize>(
        &'fd self,
        op: u8,
        bufs: B,
        address: A,
        flags: libc::c_int,
    ) -> SendMsg<'fd, B, A, N, D>
    where
        B: BufSlice<N>,
        A: SocketAddress,
    {
        // The iovecs must be taken before `bufs` is moved into the operation.
        let iovecs = unsafe { bufs.as_iovecs() };
        let header = unsafe { mem::zeroed() };
        SendMsg::new(self, bufs, address, header, iovecs, (op, flags))
    }

    /// Builds a [`Recv`] operation that receives into `buf`.
    #[doc = man_link!(recv(2))]
    pub const fn recv<'fd, B>(&'fd self, buf: B, flags: libc::c_int) -> Recv<'fd, B, D>
    where
        B: BufMut,
    {
        Recv::new(self, buf, flags)
    }

    /// Builds a [`MultishotRecv`] operation that takes its buffers from `pool`.
    pub const fn multishot_recv<'fd>(
        &'fd self,
        pool: ReadBufPool,
        flags: libc::c_int,
    ) -> MultishotRecv<'fd, D> {
        MultishotRecv::new(self, pool, flags)
    }

    /// Builds a [`RecvN`] operation, which keeps receiving until at least `n`
    /// bytes have been read into `buf`.
    pub const fn recv_n<'fd, B>(&'fd self, buf: B, n: usize) -> RecvN<'fd, B, D>
    where
        B: BufMut,
    {
        RecvN::new(self, buf, n)
    }

    /// Builds a [`RecvVectored`] operation that receives into `bufs`.
    #[doc = man_link!(recvmsg(2))]
    pub fn recv_vectored<'fd, B, const N: usize>(
        &'fd self,
        mut bufs: B,
        flags: libc::c_int,
    ) -> RecvVectored<'fd, B, N, D>
    where
        B: BufMutSlice<N>,
    {
        // The iovecs must be taken before `bufs` is moved into the operation.
        let iovecs = unsafe { bufs.as_iovecs_mut() };
        let header = Box::new(unsafe { mem::zeroed() });
        RecvVectored::new(self, bufs, header, iovecs, flags)
    }

    /// Builds a [`RecvNVectored`] operation, which keeps receiving until at
    /// least `n` bytes have been read into `bufs`.
    pub fn recv_n_vectored<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
        n: usize,
    ) -> RecvNVectored<'fd, B, N, D>
    where
        B: BufMutSlice<N>,
    {
        RecvNVectored::new(self, bufs, n)
    }

    /// Builds a [`RecvFrom`] operation that receives into `buf` and also
    /// returns the sender's address.
    #[doc = man_link!(recvmsg(2))]
    pub fn recvfrom<'fd, B, A>(&'fd self, mut buf: B, flags: libc::c_int) -> RecvFrom<'fd, B, A, D>
    where
        B: BufMut,
        A: SocketAddress,
    {
        // A single iovec describing the writable part of `buf`.
        let (base, len) = unsafe { buf.parts_mut() };
        let iovec = libc::iovec {
            iov_base: base.cast(),
            iov_len: len as _,
        };
        // Zeroed message header plus uninitialised storage for the address.
        let header = Box::new((unsafe { mem::zeroed() }, MaybeUninit::uninit()));
        RecvFrom::new(self, buf, header, iovec, flags)
    }

    /// Builds a [`RecvFromVectored`] operation that receives into `bufs` and
    /// also returns the sender's address.
    #[doc = man_link!(recvmsg(2))]
    pub fn recvfrom_vectored<'fd, B, A, const N: usize>(
        &'fd self,
        mut bufs: B,
        flags: libc::c_int,
    ) -> RecvFromVectored<'fd, B, A, N, D>
    where
        B: BufMutSlice<N>,
        A: SocketAddress,
    {
        let iovecs = unsafe { bufs.as_iovecs_mut() };
        // Zeroed message header plus uninitialised storage for the address.
        let header = Box::new((unsafe { mem::zeroed() }, MaybeUninit::uninit()));
        RecvFromVectored::new(self, bufs, header, iovecs, flags)
    }

    /// Builds a [`Shutdown`] operation, translating `how` into the matching
    /// `SHUT_*` constant.
    #[doc = man_link!(shutdown(2))]
    pub const fn shutdown<'fd>(&'fd self, how: std::net::Shutdown) -> Shutdown<'fd, D> {
        Shutdown::new(
            self,
            match how {
                std::net::Shutdown::Read => libc::SHUT_RD,
                std::net::Shutdown::Write => libc::SHUT_WR,
                std::net::Shutdown::Both => libc::SHUT_RDWR,
            },
        )
    }

    /// Builds an [`Accept`] operation using the close-on-exec flag of `D`.
    #[doc = man_link!(accept(2))]
    pub fn accept<'fd, A>(&'fd self) -> Accept<'fd, A, D> {
        // `D::cloexec_flag` is passed as accept4 flags, so the two constants
        // are checked to be equal at compile time.
        const _: () = assert!(libc::SOCK_CLOEXEC == libc::O_CLOEXEC);
        let flags = D::cloexec_flag();
        self.accept4(flags)
    }

    /// Builds an [`Accept`] operation with the given `flags`.
    #[doc = man_link!(accept4(2))]
    pub fn accept4<'fd, A>(&'fd self, flags: libc::c_int) -> Accept<'fd, A, D> {
        // Uninitialised address storage and a length that is set during setup.
        Accept::new(self, Box::new((MaybeUninit::uninit(), 0)), flags)
    }

    /// Builds a [`MultishotAccept`] operation using the close-on-exec flag of
    /// `D`.
    pub fn multishot_accept<'fd>(&'fd self) -> MultishotAccept<'fd, D> {
        const _: () = assert!(libc::SOCK_CLOEXEC == libc::O_CLOEXEC);
        let flags = D::cloexec_flag();
        self.multishot_accept4(flags)
    }

    /// Builds a [`MultishotAccept`] operation with the given `flags`.
    pub const fn multishot_accept4<'fd>(&'fd self, flags: libc::c_int) -> MultishotAccept<'fd, D> {
        MultishotAccept::new(self, flags)
    }

    /// Builds a [`SocketOption`] operation that reads the option `optname` at
    /// `level` into a value of type `T`.
    #[doc = man_link!(getsockopt(2))]
    #[doc(alias = "getsockopt")]
    #[allow(clippy::cast_sign_loss)]
    pub fn socket_option<'fd, T>(
        &'fd self,
        level: libc::c_int,
        optname: libc::c_int,
    ) -> SocketOption<'fd, T, D> {
        let args = (level as libc::__u32, optname as libc::__u32);
        SocketOption::new(self, Box::new(MaybeUninit::uninit()), args)
    }

    /// Builds a [`SetSocketOption`] operation that sets the option `optname`
    /// at `level` to `optvalue`.
    #[doc = man_link!(setsockopt(2))]
    #[doc(alias = "setsockopt")]
    #[allow(clippy::cast_sign_loss)]
    pub fn set_socket_option<'fd, T>(
        &'fd self,
        level: libc::c_int,
        optname: libc::c_int,
        optvalue: T,
    ) -> SetSocketOption<'fd, T, D> {
        let args = (level as libc::__u32, optname as libc::__u32);
        SetSocketOption::new(self, Box::new(optvalue), args)
    }
}
/// [`Future`] that creates a new socket, returned by [`socket`].
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
pub struct Socket<D: Descriptor = File> {
// Submission queue used to submit the operation; taken out (`None`) once it
// has been handed to the created `AsyncFd`.
sq: Option<SubmissionQueue>,
// Operation state; before submission it holds `(domain, type, protocol, flags)`.
state: OpState<(libc::c_int, libc::c_int, libc::c_int, libc::c_int)>,
// Marker for the descriptor kind `D` of the resulting `AsyncFd`.
kind: PhantomData<D>,
}
impl<D: Descriptor + Unpin> Future for Socket<D> {
    type Output = io::Result<AsyncFd<D>>;

    /// Submits the socket operation if needed, then polls for its completion.
    ///
    /// On success the submission queue is moved out of `self` into the
    /// returned [`AsyncFd`].
    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let op_index = poll_state!(
            Socket,
            self.state,
            self.sq.as_ref().unwrap(),
            ctx,
            |submission, (domain, r#type, protocol, flags)| unsafe {
                submission.socket(domain, r#type, protocol, flags);
                D::create_flags(submission);
            },
        );

        let Poll::Ready(result) = self.sq.as_ref().unwrap().poll_op(ctx, op_index) else {
            return Poll::Pending;
        };
        // The operation finished, successfully or not.
        self.state = OpState::Done;
        Poll::Ready(result.map(|(_, fd)| unsafe {
            AsyncFd::from_raw(fd, self.sq.take().unwrap())
        }))
    }
}
// Operation type behind `AsyncFd::connect`, generated by `op_future!`.
//
// `setup` passes the raw pointer and length of the boxed address to
// `submission.connect`. The result is expected to be `0`; the `extract`
// variant hands the boxed address back to the caller.
op_future! {
fn AsyncFd::connect -> (),
struct Connect<'fd, A: SocketAddress> {
// Boxed address; presumably boxed so the pointer given to the kernel stays
// valid while the operation is in flight.
address: Box<A>,
},
setup_state: _unused: (),
setup: |submission, fd, (address,), ()| unsafe {
let (ptr, len) = SocketAddress::as_ptr(&**address);
submission.connect(fd.fd(), ptr, len);
},
map_result: |result| Ok(debug_assert!(result == 0)),
extract: |this, (address,), res| -> Box<A> {
debug_assert!(res == 0);
Ok(address)
},
}
// Operation type behind `AsyncFd::send` and `AsyncFd::send_zc`, generated by
// `op_future!`.
//
// The setup state carries the opcode and the send flags. `setup` passes the
// pointer and length from `Buf::parts` to `submission.send`. The result is the
// completion value converted to `usize`; the `extract` variant also returns
// the buffer.
op_future! {
fn AsyncFd::send -> usize,
struct Send<'fd, B: Buf> {
// Buffer holding the bytes to send.
buf: B,
},
// Handled by `op_future!` (definition not visible here).
drop_using: Box,
setup_state: flags: (u8, libc::c_int),
setup: |submission, fd, (buf,), (op, flags)| unsafe {
let (ptr, len) = buf.parts();
submission.send(op, fd.fd(), ptr, len, flags);
},
map_result: |n| {
#[allow(clippy::cast_sign_loss)] Ok(n as usize)
},
extract: |this, (buf,), n| -> (B, usize) {
#[allow(clippy::cast_sign_loss)] Ok((buf, n as usize))
},
}
/// [`Future`] behind [`AsyncFd::send_all`]: repeats [`Send`] operations until
/// the buffer has no bytes left.
#[derive(Debug)]
pub struct SendAll<'fd, B, D: Descriptor = File> {
// Current send operation; the buffer is wrapped in `SkipBuf` to skip the
// bytes that were already sent.
send: Extractor<Send<'fd, SkipBuf<B>, D>>,
}
impl<'fd, B: Buf, D: Descriptor> SendAll<'fd, B, D> {
    /// Creates the operation with nothing skipped yet and no send flags.
    const fn new(fd: &'fd AsyncFd<D>, buf: B) -> SendAll<'fd, B, D> {
        SendAll {
            send: Extractor {
                fut: fd.send(SkipBuf { buf, skip: 0 }, 0),
            },
        }
    }

    /// Drives the send operations until the whole buffer is sent.
    ///
    /// Returns the original buffer on success and a `WriteZero` error if a
    /// send reports zero bytes.
    fn inner_poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<io::Result<B>> {
        // SAFETY: `send` is only polled or replaced in place, never moved out.
        let mut send = unsafe { self.map_unchecked_mut(|this| &mut this.send) };
        loop {
            let (mut buf, n) = match send.as_mut().poll(ctx) {
                Poll::Ready(Ok(sent)) => sent,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                Poll::Pending => return Poll::Pending,
            };
            if n == 0 {
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
            }

            // Skip what was just sent and check whether anything remains.
            buf.skip += n as u32;
            let (_, remaining) = unsafe { buf.parts() };
            if remaining == 0 {
                return Poll::Ready(Ok(buf.buf));
            }

            // Start a new send for the remainder and poll it right away.
            let fd = send.fut.fd;
            send.set(fd.send(buf, 0).extract());
        }
    }
}
// Cancellation is delegated to the send operation that is currently in flight.
impl<'fd, B, D: Descriptor> Cancel for SendAll<'fd, B, D> {
fn try_cancel(&mut self) -> CancelResult {
self.send.try_cancel()
}
fn cancel(&mut self) -> CancelOp {
self.send.cancel()
}
}
impl<'fd, B: Buf, D: Descriptor> Future for SendAll<'fd, B, D> {
    type Output = io::Result<()>;

    /// Polls the operation; the buffer is dropped once everything is sent.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.inner_poll(ctx).map_ok(drop)
    }
}
// Marker impl: `Extractor<SendAll>` (see its `Future` impl) returns the buffer.
impl<'fd, B: Buf, D: Descriptor> Extract for SendAll<'fd, B, D> {}
impl<'fd, B: Buf, D: Descriptor> Future for Extractor<SendAll<'fd, B, D>> {
    type Output = io::Result<B>;

    /// Polls the wrapped [`SendAll`], returning the buffer once all is sent.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: projecting to the wrapped future, which is not moved.
        let send_all = unsafe { self.map_unchecked_mut(|extractor| &mut extractor.fut) };
        send_all.inner_poll(ctx)
    }
}
// Operation type behind `AsyncFd::sendto` and `AsyncFd::sendto_zc`, generated
// by `op_future!`.
//
// `setup` passes the buffer pointer/length and the address pointer/length to
// `submission.sendto`. The result is the completion value as `usize`; the
// `extract` variant also returns the buffer (the address is discarded).
op_future! {
fn AsyncFd::sendto -> usize,
struct SendTo<'fd, B: Buf, A: SocketAddress> {
// Buffer holding the bytes to send.
buf: B,
// Destination address.
address: A,
},
// Handled by `op_future!` (definition not visible here).
drop_using: Box,
setup_state: flags: (u8, libc::c_int),
setup: |submission, fd, (buf, address), (op, flags)| unsafe {
let (buf, buf_len) = buf.parts();
let (addr, addr_len) = SocketAddress::as_ptr(address);
submission.sendto(op, fd.fd(), buf, buf_len, addr, addr_len, flags);
},
map_result: |n| {
#[allow(clippy::cast_sign_loss)] Ok(n as usize)
},
extract: |this, (buf, _), n| -> (B, usize) {
#[allow(clippy::cast_sign_loss)] Ok((buf, n as usize))
},
}
// Operation type behind the vectored send methods (`send_vectored`,
// `sendto_vectored` and their `_zc` variants), generated by `op_future!`.
//
// `setup` points the message header at the stored iovecs and the address,
// then submits it with `submission.sendmsg`. The result is the completion
// value as `usize`; the `extract` variant also returns the buffers.
op_future! {
fn AsyncFd::send_vectored -> usize,
struct SendMsg<'fd, B: BufSlice<N>, A: SocketAddress; const N: usize> {
// Buffers holding the bytes to send.
bufs: B,
// Destination address (`NoAddress` for the non-`sendto` variants).
address: A,
// Message header; filled in during `setup`.
msg: libc::msghdr,
// One iovec per buffer, referenced by `msg.msg_iov`.
iovecs: [libc::iovec; N],
},
// Handled by `op_future!` (definition not visible here).
drop_using: Box,
// `msg` points into `iovecs` and `address`, presumably the reason for `!Unpin`.
impl !Unpin,
setup_state: flags: (u8, libc::c_int),
setup: |submission, fd, (_, address, msg, iovecs), (op, flags)| unsafe {
msg.msg_iov = iovecs.as_mut_ptr();
msg.msg_iovlen = N;
let (addr, addr_len) = SocketAddress::as_ptr(address);
msg.msg_name = addr.cast_mut().cast();
msg.msg_namelen = addr_len;
submission.sendmsg(op, fd.fd(), &*msg, flags);
},
map_result: |n| {
#[allow(clippy::cast_sign_loss)] Ok(n as usize)
},
extract: |this, (buf, _, _, _), n| -> (B, usize) {
#[allow(clippy::cast_sign_loss)] Ok((buf, n as usize))
},
}
/// [`Future`] behind [`AsyncFd::send_all_vectored`]: repeats [`SendMsg`]
/// operations until all buffers have been sent.
#[derive(Debug)]
pub struct SendAllVectored<'fd, B, const N: usize, D: Descriptor = File> {
// Current send operation.
send: Extractor<SendMsg<'fd, B, NoAddress, N, D>>,
// Total number of bytes sent so far, across all completed operations.
skip: u64,
}
impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> SendAllVectored<'fd, B, N, D> {
    /// Creates the operation with nothing sent yet and no send flags.
    fn new(fd: &'fd AsyncFd<D>, bufs: B) -> SendAllVectored<'fd, B, N, D> {
        SendAllVectored {
            send: fd.send_vectored(bufs, 0).extract(),
            skip: 0,
        }
    }

    /// Drives the send operations until all buffers are sent.
    ///
    /// Returns the original buffers on success and a `WriteZero` error if a
    /// send reports zero bytes.
    fn inner_poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<io::Result<B>> {
        // SAFETY: `this.send` is only polled or replaced in place below, never
        // moved out.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        let mut send = unsafe { Pin::new_unchecked(&mut this.send) };
        match send.as_mut().poll(ctx) {
            Poll::Ready(Ok((_, 0))) => Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
            Poll::Ready(Ok((bufs, n))) => {
                this.skip += n as u64;

                // `as_iovecs` describes the complete buffers again, so every
                // byte sent so far (`this.skip`) has to be skipped.
                let mut iovecs = unsafe { bufs.as_iovecs() };
                let mut skip = this.skip;
                for iovec in &mut iovecs {
                    if iovec.iov_len as u64 <= skip {
                        // This buffer was sent entirely.
                        skip -= iovec.iov_len as u64;
                        iovec.iov_len = 0;
                    } else {
                        // Partially sent buffer: continue at the first unsent
                        // byte instead of resending the start of the buffer.
                        let sent = skip as usize;
                        // SAFETY: `sent < iov_len`, so the offset pointer stays
                        // within the buffer described by this iovec.
                        iovec.iov_base = unsafe { iovec.iov_base.cast::<u8>().add(sent).cast() };
                        iovec.iov_len -= sent;
                        break;
                    }
                }

                // Everything is sent only if no iovec has bytes left. Checking
                // just the last one would stop early when the final buffer is
                // empty and would panic for `N == 0`.
                if iovecs.iter().all(|iovec| iovec.iov_len == 0) {
                    return Poll::Ready(Ok(bufs));
                }

                // Start a new send for the remainder and poll it right away.
                let msg = unsafe { mem::zeroed() };
                let op = libc::IORING_OP_SENDMSG as u8;
                send.set(
                    SendMsg::new(send.fut.fd, bufs, NoAddress, msg, iovecs, (op, 0)).extract(),
                );
                unsafe { Pin::new_unchecked(this) }.inner_poll(ctx)
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}
// Cancellation is delegated to the send operation that is currently in flight.
impl<'fd, B, const N: usize, D: Descriptor> Cancel for SendAllVectored<'fd, B, N, D> {
fn try_cancel(&mut self) -> CancelResult {
self.send.try_cancel()
}
fn cancel(&mut self) -> CancelOp {
self.send.cancel()
}
}
impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Future for SendAllVectored<'fd, B, N, D> {
    type Output = io::Result<()>;

    /// Polls the operation; the buffers are dropped once everything is sent.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.inner_poll(ctx).map_ok(drop)
    }
}
// Marker impl: `Extractor<SendAllVectored>` (see its `Future` impl) returns the buffers.
impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Extract for SendAllVectored<'fd, B, N, D> {}
impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Future
    for Extractor<SendAllVectored<'fd, B, N, D>>
{
    type Output = io::Result<B>;

    /// Polls the wrapped [`SendAllVectored`], returning the buffers once all
    /// is sent.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: projecting to the wrapped future, which is not moved.
        let send_all = unsafe { self.map_unchecked_mut(|extractor| &mut extractor.fut) };
        send_all.inner_poll(ctx)
    }
}
// Operation type behind `AsyncFd::recv`, generated by `op_future!`.
//
// `setup` passes the pointer and length from `BufMut::parts_mut` to
// `submission.recv` and enables buffer selection when the buffer reports a
// buffer group. On completion the buffer is told how many bytes were written
// (and which buffer index was used) and is then returned.
op_future! {
fn AsyncFd::recv -> B,
struct Recv<'fd, B: BufMut> {
// Buffer to receive into.
buf: B,
},
// Handled by `op_future!` (definition not visible here).
drop_using: Box,
setup_state: flags: libc::c_int,
setup: |submission, fd, (buf,), flags| unsafe {
let (ptr, len) = buf.parts_mut();
submission.recv(fd.fd(), ptr, len, flags);
if let Some(buf_group) = buf.buffer_group() {
submission.set_buffer_select(buf_group.0);
}
},
map_result: |this, (mut buf,), buf_idx, n| {
#[allow(clippy::cast_sign_loss)] unsafe { buf.buffer_init(BufIdx(buf_idx), n as u32) };
Ok(buf)
},
}
// Operation type behind `AsyncFd::multishot_recv`, generated by
// `op_async_iter!`.
//
// `setup` submits a multishot receive using the group id of the buffer pool.
// Each completion is turned into a `ReadBuf` taken from the pool.
op_async_iter! {
fn AsyncFd::multishot_recv -> ReadBuf,
struct MultishotRecv<'fd> {
// Pool the receive buffers are taken from.
buf_pool: ReadBufPool,
},
setup_state: flags: libc::c_int,
setup: |submission, this, flags| unsafe {
submission.multishot_recv(this.fd.fd(), flags, this.buf_pool.group_id().0);
},
map_result: |this, buf_idx, n| {
// A completion of zero bytes marks the operation as done.
if n == 0 {
this.state = crate::op::OpState::Done;
}
#[allow(clippy::cast_sign_loss)] unsafe { this.buf_pool.new_buffer(BufIdx(buf_idx), n as u32) }
},
}
/// [`Future`] behind [`AsyncFd::recv_n`]: repeats [`Recv`] operations until
/// the requested number of bytes has been received.
#[derive(Debug)]
pub struct RecvN<'fd, B, D: Descriptor = File> {
// Current receive operation; `ReadNBuf` records the size of the last read.
recv: Recv<'fd, ReadNBuf<B>, D>,
// Number of bytes that still have to be received.
left: usize,
}
impl<'fd, B: BufMut, D: Descriptor> RecvN<'fd, B, D> {
    /// Creates the operation: a first receive without flags and `n` bytes
    /// still to go.
    const fn new(fd: &'fd AsyncFd<D>, buf: B, n: usize) -> RecvN<'fd, B, D> {
        RecvN {
            recv: fd.recv(ReadNBuf { buf, last_read: 0 }, 0),
            left: n,
        }
    }
}
// Cancellation is delegated to the receive operation that is currently in flight.
impl<'fd, B, D: Descriptor> Cancel for RecvN<'fd, B, D> {
fn try_cancel(&mut self) -> CancelResult {
self.recv.try_cancel()
}
fn cancel(&mut self) -> CancelOp {
self.recv.cancel()
}
}
impl<'fd, B: BufMut, D: Descriptor> Future for RecvN<'fd, B, D> {
    type Output = io::Result<B>;

    /// Drives the receive operations until enough bytes have been read.
    ///
    /// Returns an `UnexpectedEof` error if a receive reads zero bytes before
    /// the requested amount is reached.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `this.recv` is only polled or replaced in place, never moved
        // out.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        let mut recv = unsafe { Pin::new_unchecked(&mut this.recv) };
        loop {
            let buf = match recv.as_mut().poll(ctx) {
                Poll::Ready(Ok(buf)) => buf,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                Poll::Pending => return Poll::Pending,
            };

            let read = buf.last_read;
            if read == 0 {
                return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
            }
            if read >= this.left {
                return Poll::Ready(Ok(buf.buf));
            }

            // Not enough yet: receive again and poll the new operation.
            this.left -= read;
            let fd = recv.fd;
            recv.set(fd.recv(buf, 0));
        }
    }
}
// Operation type behind `AsyncFd::recv_vectored`, generated by `op_future!`.
//
// `setup` points the message header at the stored iovecs and submits it with
// `submission.recvmsg`. On completion the buffers are told how many bytes were
// written and are returned together with the header's `msg_flags`.
op_future! {
fn AsyncFd::recv_vectored -> (B, libc::c_int),
struct RecvVectored<'fd, B: BufMutSlice<N>; const N: usize> {
// Buffers to receive into.
bufs: B,
// Message header; filled in during `setup`.
msg: Box<libc::msghdr>,
// One iovec per buffer, referenced by `msg.msg_iov`.
iovecs: [libc::iovec; N],
},
// Handled by `op_future!` (definition not visible here).
drop_using: Box,
// `msg` points into `iovecs`, presumably the reason for `!Unpin`.
impl !Unpin,
setup_state: flags: libc::c_int,
setup: |submission, fd, (_, msg, iovecs), flags| unsafe {
msg.msg_iov = iovecs.as_mut_ptr();
msg.msg_iovlen = N;
submission.recvmsg(fd.fd(), &**msg, flags);
},
map_result: |this, (mut bufs, msg, _), n| {
#[allow(clippy::cast_sign_loss)] unsafe { bufs.set_init(n as usize) };
Ok((bufs, msg.msg_flags))
},
}
/// [`Future`] behind [`AsyncFd::recv_n_vectored`]: repeats [`RecvVectored`]
/// operations until the requested number of bytes has been received.
#[derive(Debug)]
pub struct RecvNVectored<'fd, B, const N: usize, D: Descriptor = File> {
// Current receive operation; `ReadNBuf` records the size of the last read.
recv: RecvVectored<'fd, ReadNBuf<B>, N, D>,
// Number of bytes that still have to be received.
left: usize,
}
impl<'fd, B: BufMutSlice<N>, const N: usize, D: Descriptor> RecvNVectored<'fd, B, N, D> {
    /// Creates the operation: a first vectored receive without flags and `n`
    /// bytes still to go.
    fn new(fd: &'fd AsyncFd<D>, buf: B, n: usize) -> RecvNVectored<'fd, B, N, D> {
        RecvNVectored {
            recv: fd.recv_vectored(ReadNBuf { buf, last_read: 0 }, 0),
            left: n,
        }
    }
}
// Cancellation is delegated to the receive operation that is currently in flight.
impl<'fd, B, const N: usize, D: Descriptor> Cancel for RecvNVectored<'fd, B, N, D> {
fn try_cancel(&mut self) -> CancelResult {
self.recv.try_cancel()
}
fn cancel(&mut self) -> CancelOp {
self.recv.cancel()
}
}
impl<'fd, B: BufMutSlice<N>, const N: usize, D: Descriptor> Future for RecvNVectored<'fd, B, N, D> {
    type Output = io::Result<B>;

    /// Drives the vectored receive operations until enough bytes have been
    /// read.
    ///
    /// Returns an `UnexpectedEof` error if a receive reads zero bytes before
    /// the requested amount is reached.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `this.recv` is only polled or replaced in place, never moved
        // out.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        let mut recv = unsafe { Pin::new_unchecked(&mut this.recv) };
        loop {
            let bufs = match recv.as_mut().poll(ctx) {
                Poll::Ready(Ok((bufs, _))) => bufs,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                Poll::Pending => return Poll::Pending,
            };

            let read = bufs.last_read;
            if read == 0 {
                return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
            }
            if read >= this.left {
                return Poll::Ready(Ok(bufs.buf));
            }

            // Not enough yet: receive again and poll the new operation.
            this.left -= read;
            let fd = recv.fd;
            recv.set(fd.recv_vectored(bufs, 0));
        }
    }
}
// Operation type behind `AsyncFd::recvfrom`, generated by `op_future!`.
//
// `setup` points the message header at the single iovec and at the address
// storage, submits it with `submission.recvmsg` and enables buffer selection
// when the buffer reports a buffer group. On completion the buffer is told how
// many bytes were written, the address is initialised from `msg_namelen` and
// both are returned together with the header's `msg_flags`.
op_future! {
fn AsyncFd::recvfrom -> (B, A, libc::c_int),
struct RecvFrom<'fd, B: BufMut, A: SocketAddress> {
// Buffer to receive into.
buf: B,
// Message header plus storage for the sender's address.
msg: Box<(libc::msghdr, MaybeUninit<A>)>,
// Single iovec describing `buf`, referenced by the header's `msg_iov`.
iovec: libc::iovec,
},
// Handled by `op_future!` (definition not visible here).
drop_using: Box,
// The header points at `iovec`, presumably the reason for `!Unpin`.
impl !Unpin,
setup_state: flags: libc::c_int,
setup: |submission, fd, (buf, msg, iovec), flags| unsafe {
let address = &mut msg.1;
let msg = &mut msg.0;
msg.msg_iov = &mut *iovec;
msg.msg_iovlen = 1;
let (addr, addr_len) = SocketAddress::as_mut_ptr(address);
msg.msg_name = addr.cast();
msg.msg_namelen = addr_len;
submission.recvmsg(fd.fd(), &*msg, flags);
if let Some(buf_group) = buf.buffer_group() {
submission.set_buffer_select(buf_group.0);
}
},
map_result: |this, (mut buf, msg, _), buf_idx, n| {
#[allow(clippy::cast_sign_loss)] unsafe { buf.buffer_init(BufIdx(buf_idx), n as u32) };
let address = unsafe { SocketAddress::init(msg.1, msg.0.msg_namelen) };
Ok((buf, address, msg.0.msg_flags))
},
}
// Operation type behind `AsyncFd::recvfrom_vectored`, generated by
// `op_future!`.
//
// `setup` points the message header at the stored iovecs and at the address
// storage, then submits it with `submission.recvmsg`. On completion the
// buffers are told how many bytes were written, the address is initialised
// from `msg_namelen` and both are returned together with `msg_flags`.
op_future! {
fn AsyncFd::recvfrom_vectored -> (B, A, libc::c_int),
struct RecvFromVectored<'fd, B: BufMutSlice<N>, A: SocketAddress; const N: usize> {
// Buffers to receive into.
bufs: B,
// Message header plus storage for the sender's address.
msg: Box<(libc::msghdr, MaybeUninit<A>)>,
// One iovec per buffer, referenced by the header's `msg_iov`.
iovecs: [libc::iovec; N],
},
// Handled by `op_future!` (definition not visible here).
drop_using: Box,
// The header points into `iovecs`, presumably the reason for `!Unpin`.
impl !Unpin,
setup_state: flags: libc::c_int,
setup: |submission, fd, (_, msg, iovecs), flags| unsafe {
let address = &mut msg.1;
let msg = &mut msg.0;
msg.msg_iov = iovecs.as_mut_ptr();
msg.msg_iovlen = N;
let (addr, addr_len) = SocketAddress::as_mut_ptr(address);
msg.msg_name = addr.cast();
msg.msg_namelen = addr_len;
submission.recvmsg(fd.fd(), &*msg, flags);
},
map_result: |this, (mut bufs, msg, _), n| {
#[allow(clippy::cast_sign_loss)] unsafe { bufs.set_init(n as usize) };
let address = unsafe { SocketAddress::init(msg.1, msg.0.msg_namelen) };
Ok((bufs, address, msg.0.msg_flags))
},
}
// Operation type behind `AsyncFd::shutdown`, generated by `op_future!`.
//
// The setup state is the `SHUT_*` value selected in `AsyncFd::shutdown`; it is
// passed to `submission.shutdown`. The result is expected to be `0`.
op_future! {
fn AsyncFd::shutdown -> (),
struct Shutdown<'fd> {
},
setup_state: flags: libc::c_int,
setup: |submission, fd, (), how| unsafe {
submission.shutdown(fd.fd(), how);
},
map_result: |n| Ok(debug_assert!(n == 0)),
}
// Operation type behind `AsyncFd::accept` and `AsyncFd::accept4`, generated by
// `op_future!`.
//
// `setup` stores the capacity of the address storage in the length field and
// passes pointers to both to `submission.accept`. On completion the returned
// descriptor is wrapped in a new `AsyncFd` sharing this fd's submission queue,
// and the address is initialised using the stored length.
op_future! {
fn AsyncFd::accept -> (AsyncFd<D>, A),
struct Accept<'fd, A: SocketAddress> {
// Storage for the peer address and its length.
address: Box<(MaybeUninit<A>, libc::socklen_t)>,
},
setup_state: flags: libc::c_int,
setup: |submission, fd, (address,), flags| unsafe {
let (ptr, len) = SocketAddress::as_mut_ptr(&mut address.0);
address.1 = len;
submission.accept(fd.fd(), ptr, &mut address.1, flags);
submission.set_async();
D::create_flags(submission);
},
map_result: |this, (address,), fd| {
let sq = this.fd.sq.clone();
let stream = unsafe { AsyncFd::from_raw(fd, sq) };
let len = address.1;
let address = unsafe { SocketAddress::init(address.0, len) };
Ok((stream, address))
},
}
// Operation type behind `AsyncFd::multishot_accept` and
// `AsyncFd::multishot_accept4`, generated by `op_async_iter!`.
//
// `setup` submits a multishot accept. Each completion's descriptor is wrapped
// in a new `AsyncFd` sharing this fd's submission queue.
op_async_iter! {
fn AsyncFd::multishot_accept -> AsyncFd<D>,
struct MultishotAccept<'fd> {
},
setup_state: flags: libc::c_int,
setup: |submission, this, flags| unsafe {
submission.multishot_accept(this.fd.fd(), flags);
submission.set_async();
D::create_flags(submission);
},
map_result: |this, _flags, fd| {
let sq = this.fd.sq.clone();
unsafe { AsyncFd::from_raw(fd, sq) }
},
}
// Operation type behind `AsyncFd::socket_option`, generated by `op_future!`.
//
// `setup` submits a `SOCKET_URING_OP_GETSOCKOPT` command that writes up to
// `size_of::<T>()` bytes into the boxed value. On completion the value is
// assumed to be initialised and returned.
op_future! {
fn AsyncFd::socket_option -> T,
struct SocketOption<'fd, T> {
// Storage the option value is written into.
value: Box<MaybeUninit<T>>,
},
setup_state: flags: (libc::__u32, libc::__u32),
setup: |submission, fd, (value,), (level, optname)| unsafe {
let optvalue = ptr::addr_of_mut!(**value).cast();
let optlen = size_of::<T>() as u32;
submission.uring_command(libc::SOCKET_URING_OP_GETSOCKOPT, fd.fd(), level, optname, optvalue, optlen);
},
map_result: |this, (value,), optlen| {
// Only checked in debug builds: the returned length should match `T`.
debug_assert!(optlen == (size_of::<T>() as i32));
Ok(unsafe { MaybeUninit::assume_init(*value) })
},
}
// Operation type behind `AsyncFd::set_socket_option`, generated by
// `op_future!`.
//
// `setup` submits a `SOCKET_URING_OP_SETSOCKOPT` command pointing at the boxed
// value with length `size_of::<T>()`. The result is expected to be `0`; the
// `extract` variant hands the boxed value back to the caller.
op_future! {
fn AsyncFd::set_socket_option -> (),
struct SetSocketOption<'fd, T> {
// Option value to set.
value: Box<T>,
},
setup_state: flags: (libc::__u32, libc::__u32),
setup: |submission, fd, (value,), (level, optname)| unsafe {
let optvalue = ptr::addr_of_mut!(**value).cast();
let optlen = size_of::<T>() as u32;
submission.uring_command(libc::SOCKET_URING_OP_SETSOCKOPT, fd.fd(), level, optname, optvalue, optlen);
},
map_result: |result| Ok(debug_assert!(result == 0)),
extract: |this, (value,), res| -> Box<T> {
debug_assert!(res == 0);
Ok(value)
},
}
/// Socket address types that can be passed to, or filled in by, the socket
/// operations in this file as a raw `sockaddr` pointer plus a length.
pub trait SocketAddress: Sized {
/// Returns a raw pointer to the address and the address length.
///
/// # Safety
///
/// NOTE(review): the exact contract is not spelled out in this file;
/// presumably the pointer may only be used while `self` is alive.
unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t);
/// Returns a raw mutable pointer to the (possibly uninitialised) address
/// storage and the number of bytes available in it.
///
/// # Safety
///
/// NOTE(review): presumably the pointer may only be used while `address`
/// is alive and not moved.
unsafe fn as_mut_ptr(address: &mut MaybeUninit<Self>)
-> (*mut libc::sockaddr, libc::socklen_t);
/// Turns the storage into an initialised address of `length` bytes.
///
/// # Safety
///
/// The implementations in this file call `assume_init` on `address`, so the
/// storage must have been filled in before calling this.
unsafe fn init(address: MaybeUninit<Self>, length: libc::socklen_t) -> Self;
}
/// Generic `sockaddr` together with the number of bytes in use.
impl SocketAddress for (libc::sockaddr, libc::socklen_t) {
    /// Returns a pointer to the address and the stored length.
    unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
        (ptr::addr_of!(self.0).cast(), self.1)
    }

    /// Returns a pointer to the address storage and its full capacity.
    ///
    /// Only the `sockaddr` part is exposed; the length field is set by `init`.
    unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
        (
            ptr::addr_of_mut!((*this.as_mut_ptr()).0).cast(),
            size_of::<libc::sockaddr>() as _,
        )
    }

    /// Stores `length` next to the address and returns the initialised pair.
    ///
    /// # Safety
    ///
    /// The `sockaddr` part of `this` must have been filled in.
    unsafe fn init(mut this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
        debug_assert!(length >= size_of::<libc::sa_family_t>() as _);
        // `as_mut_ptr` never exposes the length field, so nothing has written
        // it yet. Write it through a raw pointer *before* `assume_init`, so no
        // uninitialised integer is ever treated as initialised.
        ptr::addr_of_mut!((*this.as_mut_ptr()).1).write(length);
        this.assume_init()
    }
}
/// `sockaddr_storage` together with the number of bytes in use.
impl SocketAddress for (libc::sockaddr_storage, libc::socklen_t) {
    /// Returns a pointer to the address and the stored length.
    unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
        (ptr::addr_of!(self.0).cast(), self.1)
    }

    /// Returns a pointer to the address storage and its full capacity.
    ///
    /// Only the storage part is exposed; the length field is set by `init`.
    unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
        (
            ptr::addr_of_mut!((*this.as_mut_ptr()).0).cast(),
            size_of::<libc::sockaddr_storage>() as _,
        )
    }

    /// Stores `length` next to the address and returns the initialised pair.
    ///
    /// # Safety
    ///
    /// The storage part of `this` must have been filled in.
    unsafe fn init(mut this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
        debug_assert!(length >= size_of::<libc::sa_family_t>() as _);
        // `as_mut_ptr` never exposes the length field, so nothing has written
        // it yet. Write it through a raw pointer *before* `assume_init`, so no
        // uninitialised integer is ever treated as initialised.
        ptr::addr_of_mut!((*this.as_mut_ptr()).1).write(length);
        this.assume_init()
    }
}
impl SocketAddress for libc::sockaddr_in {
unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
(
(self as *const libc::sockaddr_in).cast(),
size_of::<Self>() as _,
)
}
unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
(this.as_mut_ptr().cast(), size_of::<Self>() as _)
}
unsafe fn init(this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
debug_assert!(length == size_of::<Self>() as _);
this.assume_init()
}
}
impl SocketAddress for libc::sockaddr_in6 {
unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
(
(self as *const libc::sockaddr_in6).cast(),
size_of::<Self>() as _,
)
}
unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
(this.as_mut_ptr().cast(), size_of::<Self>() as _)
}
unsafe fn init(this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
debug_assert!(length == size_of::<Self>() as _);
this.assume_init()
}
}
/// Unix socket address together with the number of bytes in use.
impl SocketAddress for (libc::sockaddr_un, libc::socklen_t) {
    /// Returns a pointer to the address and the stored length.
    unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
        (ptr::addr_of!(self.0).cast(), self.1)
    }

    /// Returns a pointer to the address storage and its full capacity.
    ///
    /// Only the `sockaddr_un` part is exposed; the length field is set by
    /// `init`.
    unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
        (
            ptr::addr_of_mut!((*this.as_mut_ptr()).0).cast(),
            size_of::<libc::sockaddr_un>() as _,
        )
    }

    /// Stores `length` next to the address and returns the initialised pair.
    ///
    /// # Safety
    ///
    /// The `sockaddr_un` part of `this` must have been filled in.
    unsafe fn init(mut this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
        debug_assert!(length >= size_of::<libc::sa_family_t>() as _);
        // `as_mut_ptr` never exposes the length field, so nothing has written
        // it yet. Write it through a raw pointer *before* `assume_init`, so no
        // uninitialised integer is ever treated as initialised.
        ptr::addr_of_mut!((*this.as_mut_ptr()).1).write(length);
        this.assume_init()
    }
}
/// [`SocketAddress`] implementation for operations without an address: it
/// always yields a null pointer and a length of zero.
#[derive(Debug)]
pub struct NoAddress;
impl SocketAddress for NoAddress {
    /// Always a null pointer with length zero.
    unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
        (ptr::null(), 0)
    }

    /// Always a null pointer with length zero; the storage is not touched.
    unsafe fn as_mut_ptr(_: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
        (ptr::null_mut(), 0)
    }

    /// Returns the address; `length` is only checked to be zero in debug
    /// builds.
    unsafe fn init(address: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
        debug_assert!(length == 0);
        address.assume_init()
    }
}