use std::{
ffi::CString,
io,
marker::PhantomPinned,
os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
pin::Pin,
};
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use io_uring::{
opcode,
types::{Fd, FsyncFlags},
};
use pin_project_lite::pin_project;
use socket2::{SockAddr, SockAddrStorage, socklen_t};
use super::OpCode;
pub use crate::sys::unix_op::*;
use crate::{OpEntry, op::*, sys_slice::*, syscall};
// `Asyncify` wraps a one-shot closure; it never builds a submission entry and
// is always dispatched through `call_blocking`.
unsafe impl<
    D: std::marker::Send + 'static,
    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
> OpCode for Asyncify<F, D>
{
    /// Always returns [`OpEntry::Blocking`].
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        OpEntry::Blocking
    }

    /// Takes the stored closure, runs it, stores the data it hands back and
    /// returns the closure's I/O result.
    ///
    /// # Panics
    ///
    /// Panics if the closure was already taken by a previous call.
    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
        let this = self.project();
        let operation = this
            .f
            .take()
            .expect("the operate method could only be called once");
        let BufResult(outcome, payload) = operation();
        *this.data = Some(payload);
        outcome
    }
}
// `AsyncifyFd` wraps a one-shot closure that borrows the stored descriptor; it
// is always dispatched through `call_blocking`.
unsafe impl<
    S,
    D: std::marker::Send + 'static,
    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
> OpCode for AsyncifyFd<S, F, D>
{
    /// Always returns [`OpEntry::Blocking`].
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        OpEntry::Blocking
    }

    /// Takes the stored closure, invokes it with a reference to `fd`, stores
    /// the data it hands back and returns the closure's I/O result.
    ///
    /// # Panics
    ///
    /// Panics if the closure was already taken by a previous call.
    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
        let this = self.project();
        let operation = this
            .f
            .take()
            .expect("the operate method could only be called once");
        let BufResult(outcome, payload) = operation(this.fd);
        *this.data = Some(payload);
        outcome
    }
}
unsafe impl OpCode for OpenFile {
    /// Builds an `OpenAt` entry relative to the current working directory,
    /// always adding `O_CLOEXEC` to the requested flags.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let open_flags = self.flags | libc::O_CLOEXEC;
        let dir = Fd(libc::AT_FDCWD);
        let entry = opcode::OpenAt::new(dir, self.path.as_ptr())
            .flags(open_flags)
            .mode(self.mode)
            .build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl OpCode for CloseFile {
    /// Builds a `Close` entry for the raw descriptor held by this operation.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::Close::new(fd).build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
    /// Builds an `Ftruncate` entry for the descriptor using the stored size.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::Ftruncate::new(fd, self.size).build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
pin_project! {
    /// Operation that queries metadata for an already-open descriptor.
    pub struct FileStat<S> {
        // Descriptor-like handle whose metadata is queried.
        pub(crate) fd: S,
        // Output buffer that receives the stat result.
        pub(crate) stat: Statx,
    }
}
impl<S> FileStat<S> {
    /// Creates a [`FileStat`] for `fd` with a zero-filled result buffer.
    pub fn new(fd: S) -> Self {
        // SAFETY: presumably `Statx` is a plain C-layout record for which the
        // all-zero bit pattern is valid — TODO confirm against its definition.
        let stat = unsafe { std::mem::zeroed() };
        Self { fd, stat }
    }
}
unsafe impl<S: AsFd> OpCode for FileStat<S> {
    /// Builds a `Statx` entry that targets the descriptor itself by passing
    /// an empty path together with `AT_EMPTY_PATH`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        // A lone NUL byte, i.e. the empty C string.
        static EMPTY_NAME: &[u8] = b"\0";
        opcode::Statx::new(
            // A single `as_fd()` is enough; this matches `call_blocking`.
            Fd(this.fd.as_fd().as_raw_fd()),
            EMPTY_NAME.as_ptr().cast(),
            this.stat as *mut _ as _,
        )
        .flags(libc::AT_EMPTY_PATH)
        .mask(statx_mask())
        .build()
        .into()
    }

    /// Blocking variant: calls `statx` directly on the descriptor and writes
    /// the result into the stored buffer.
    #[cfg(gnulinux)]
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        let this = self.project();
        // A lone NUL byte, i.e. the empty C string.
        static EMPTY_NAME: &[u8] = b"\0";
        let res = syscall!(libc::statx(
            this.fd.as_fd().as_raw_fd(),
            EMPTY_NAME.as_ptr().cast(),
            libc::AT_EMPTY_PATH,
            statx_mask(),
            this.stat as *mut _ as _
        ))?;
        Ok(res as _)
    }

    /// Blocking variant for targets without `statx`: calls `fstat` and
    /// converts the result with `stat_to_statx`.
    #[cfg(not(gnulinux))]
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        let this = self.project();
        // SAFETY: the zeroed value is only a scratch buffer that `fstat`
        // overwrites before it is read.
        let mut stat = unsafe { std::mem::zeroed() };
        let res = syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut stat))?;
        *this.stat = stat_to_statx(stat);
        Ok(res as _)
    }
}
impl<S> IntoInner for FileStat<S> {
    type Inner = Stat;

    /// Consumes the operation and converts the collected record via
    /// `statx_to_stat`.
    fn into_inner(self) -> Self::Inner {
        let Self { stat, .. } = self;
        statx_to_stat(stat)
    }
}
/// Operation that queries metadata for a filesystem path.
pub struct PathStat {
    /// NUL-terminated path to inspect.
    pub(crate) path: CString,
    /// Output buffer that receives the stat result.
    pub(crate) stat: Statx,
    /// When `false`, `AT_SYMLINK_NOFOLLOW` / `lstat` is used so the link
    /// itself is inspected.
    pub(crate) follow_symlink: bool,
}
impl PathStat {
    /// Creates a [`PathStat`] for `path` with a zero-filled result buffer.
    pub fn new(path: CString, follow_symlink: bool) -> Self {
        // SAFETY: presumably `Statx` is a plain C-layout record for which the
        // all-zero bit pattern is valid — TODO confirm against its definition.
        let stat = unsafe { std::mem::zeroed() };
        Self {
            path,
            stat,
            follow_symlink,
        }
    }
}
unsafe impl OpCode for PathStat {
    /// Builds a `Statx` entry for the stored path, resolved relative to the
    /// current working directory. `AT_SYMLINK_NOFOLLOW` is added when
    /// symlinks must not be followed.
    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
        let flags = if self.follow_symlink {
            libc::AT_EMPTY_PATH
        } else {
            libc::AT_EMPTY_PATH | libc::AT_SYMLINK_NOFOLLOW
        };
        let entry = opcode::Statx::new(
            Fd(libc::AT_FDCWD),
            self.path.as_ptr(),
            std::ptr::addr_of_mut!(self.stat).cast(),
        )
        .flags(flags)
        .mask(statx_mask())
        .build();
        entry.into()
    }

    /// Blocking variant: calls `statx` directly with the same flags as
    /// `create_entry`.
    #[cfg(gnulinux)]
    fn call_blocking(mut self: Pin<&mut Self>) -> io::Result<usize> {
        let flags = if self.follow_symlink {
            libc::AT_EMPTY_PATH
        } else {
            libc::AT_EMPTY_PATH | libc::AT_SYMLINK_NOFOLLOW
        };
        let res = syscall!(libc::statx(
            libc::AT_FDCWD,
            self.path.as_ptr(),
            flags,
            statx_mask(),
            std::ptr::addr_of_mut!(self.stat).cast()
        ))?;
        Ok(res as _)
    }

    /// Blocking variant for targets without `statx`: uses `stat` or `lstat`
    /// depending on `follow_symlink`, then converts with `stat_to_statx`.
    #[cfg(not(gnulinux))]
    fn call_blocking(mut self: Pin<&mut Self>) -> io::Result<usize> {
        // SAFETY: the zeroed value is only a scratch buffer that the syscall
        // overwrites before it is read.
        let mut raw = unsafe { std::mem::zeroed() };
        let res = if self.follow_symlink {
            syscall!(libc::stat(self.path.as_ptr(), &mut raw))?
        } else {
            syscall!(libc::lstat(self.path.as_ptr(), &mut raw))?
        };
        self.stat = stat_to_statx(raw);
        Ok(res as _)
    }
}
impl IntoInner for PathStat {
    type Inner = Stat;

    /// Consumes the operation and converts the collected record via
    /// `statx_to_stat`.
    fn into_inner(self) -> Self::Inner {
        let Self { stat, .. } = self;
        statx_to_stat(stat)
    }
}
unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
    /// Builds a positioned `Read` entry into the buffer's system slice.
    /// Lengths that do not fit in `u32` are clamped to `u32::MAX`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let slice = this.buffer.sys_slice_mut();
        let len = u32::try_from(slice.len()).unwrap_or(u32::MAX);
        let entry = opcode::Read::new(fd, slice.ptr() as _, len)
            .offset(*this.offset)
            .build();
        entry.into()
    }
}
unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
    /// Builds a positioned `Readv` entry. The slice list is stored in
    /// `slices` first so the pointer handed to the entry refers to memory
    /// owned by this operation.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        *this.slices = this.buffer.sys_slices_mut();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let count = u32::try_from(this.slices.len()).unwrap_or(u32::MAX);
        let entry = opcode::Readv::new(fd, this.slices.as_ptr() as _, count)
            .offset(*this.offset)
            .build();
        entry.into()
    }
}
unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
    /// Builds a positioned `Write` entry over the buffer's initialized bytes.
    /// Lengths that do not fit in `u32` are clamped to `u32::MAX`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let data = self.buffer.as_init();
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let len = u32::try_from(data.len()).unwrap_or(u32::MAX);
        let entry = opcode::Write::new(fd, data.as_ptr(), len)
            .offset(self.offset)
            .build();
        entry.into()
    }
}
unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
    /// Builds a positioned `Writev` entry. The slice list is stored in
    /// `slices` first so the pointer handed to the entry refers to memory
    /// owned by this operation.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        *this.slices = this.buffer.as_ref().sys_slices();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let count = u32::try_from(this.slices.len()).unwrap_or(u32::MAX);
        let entry = opcode::Writev::new(fd, this.slices.as_ptr() as _, count)
            .offset(*this.offset)
            .build();
        entry.into()
    }
}
unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
    /// Builds a `Read` entry (no explicit offset) into the buffer's system
    /// slice. Lengths that do not fit in `u32` are clamped to `u32::MAX`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        // Read the raw descriptor before `project()` consumes the pin.
        let raw_fd = self.fd.as_fd().as_raw_fd();
        let this = self.project();
        let slice = this.buffer.sys_slice_mut();
        let len = u32::try_from(slice.len()).unwrap_or(u32::MAX);
        let entry = opcode::Read::new(Fd(raw_fd), slice.ptr() as _, len).build();
        entry.into()
    }
}
unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
    /// Builds a `Readv` entry (no explicit offset). The slice list is stored
    /// in `slices` first so the pointer handed to the entry refers to memory
    /// owned by this operation.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        *this.slices = this.buffer.sys_slices_mut();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let count = u32::try_from(this.slices.len()).unwrap_or(u32::MAX);
        let entry = opcode::Readv::new(fd, this.slices.as_ptr() as _, count).build();
        entry.into()
    }
}
unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
    /// Builds a `Write` entry (no explicit offset) over the buffer's
    /// initialized bytes. Lengths that do not fit in `u32` are clamped to
    /// `u32::MAX`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let data = self.buffer.as_init();
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let len = u32::try_from(data.len()).unwrap_or(u32::MAX);
        let entry = opcode::Write::new(fd, data.as_ptr(), len).build();
        entry.into()
    }
}
unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
    /// Builds a `Writev` entry (no explicit offset). The slice list is stored
    /// in `slices` first so the pointer handed to the entry refers to memory
    /// owned by this operation.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        *this.slices = this.buffer.as_ref().sys_slices();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let count = u32::try_from(this.slices.len()).unwrap_or(u32::MAX);
        let entry = opcode::Writev::new(fd, this.slices.as_ptr() as _, count).build();
        entry.into()
    }
}
unsafe impl<S: AsFd> OpCode for Sync<S> {
    /// Builds an `Fsync` entry; `DATASYNC` is set only when `datasync` is
    /// true, otherwise no fsync flags are passed.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let flags = if self.datasync {
            FsyncFlags::DATASYNC
        } else {
            FsyncFlags::empty()
        };
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::Fsync::new(fd).flags(flags).build();
        entry.into()
    }
}
unsafe impl OpCode for Unlink {
    /// Builds an `UnlinkAt` entry relative to the current working directory;
    /// `AT_REMOVEDIR` is passed when the target is a directory.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let flags = if self.dir { libc::AT_REMOVEDIR } else { 0 };
        let entry = opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
            .flags(flags)
            .build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl OpCode for CreateDir {
    /// Builds a `MkDirAt` entry relative to the current working directory
    /// with the stored mode.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let cwd = Fd(libc::AT_FDCWD);
        let entry = opcode::MkDirAt::new(cwd, self.path.as_ptr())
            .mode(self.mode)
            .build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl OpCode for Rename {
    /// Builds a `RenameAt` entry; both paths are resolved relative to the
    /// current working directory.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let from = self.old_path.as_ptr();
        let to = self.new_path.as_ptr();
        let entry =
            opcode::RenameAt::new(Fd(libc::AT_FDCWD), from, Fd(libc::AT_FDCWD), to).build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl OpCode for Symlink {
    /// Builds a `SymlinkAt` entry; `source` and `target` are passed in that
    /// order after the current-working-directory descriptor.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let source = self.source.as_ptr();
        let target = self.target.as_ptr();
        let entry = opcode::SymlinkAt::new(Fd(libc::AT_FDCWD), source, target).build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl OpCode for HardLink {
    /// Builds a `LinkAt` entry; both paths are resolved relative to the
    /// current working directory.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let source = self.source.as_ptr();
        let target = self.target.as_ptr();
        let entry =
            opcode::LinkAt::new(Fd(libc::AT_FDCWD), source, Fd(libc::AT_FDCWD), target).build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl OpCode for CreateSocket {
    /// Builds a `Socket` entry, always adding `SOCK_CLOEXEC` to the socket
    /// type.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let ty = self.socket_type | libc::SOCK_CLOEXEC;
        let entry = opcode::Socket::new(self.domain, ty, self.protocol).build();
        entry.into()
    }

    /// Blocking variant: calls `socket(2)` with the same arguments and
    /// returns the new descriptor as a `usize`.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        let fd = syscall!(libc::socket(
            self.domain,
            self.socket_type | libc::SOCK_CLOEXEC,
            self.protocol
        ))?;
        Ok(fd as _)
    }
}
unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
    /// Builds a `Shutdown` entry using the direction reported by `how()`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::Shutdown::new(fd, self.how()).build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl OpCode for CloseSocket {
    /// Builds a `Close` entry for the raw descriptor held by this operation.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::Close::new(fd).build();
        entry.into()
    }

    /// Delegates to the shared `call` implementation of this operation.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        self.call()
    }
}
unsafe impl<S: AsFd> OpCode for Accept<S> {
    /// Builds an `Accept` entry that writes the peer address into `buffer`
    /// and its length into `addr_len`; the accepted socket is opened with
    /// `SOCK_CLOEXEC`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        let listener = Fd(this.fd.as_fd().as_raw_fd());
        let addr = unsafe { this.buffer.view_as::<libc::sockaddr>() };
        let entry = opcode::Accept::new(listener, addr, this.addr_len)
            .flags(libc::SOCK_CLOEXEC)
            .build();
        entry.into()
    }

    /// Records the completion value as the accepted socket.
    ///
    /// # Safety
    ///
    /// `fd` is wrapped in an [`OwnedFd`] without any check, so it must be a
    /// valid, open descriptor that nothing else owns.
    unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
        let accepted = unsafe { OwnedFd::from_raw_fd(fd as _) };
        let this = self.project();
        *this.accepted_fd = Some(accepted);
    }
}
unsafe impl<S: AsFd> OpCode for Connect<S> {
    /// Builds a `Connect` entry pointing at the stored socket address.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let addr_len = self.addr.len();
        let entry = opcode::Connect::new(fd, self.addr.as_ptr().cast(), addr_len).build();
        entry.into()
    }
}
unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
    /// Builds a `Recv` entry into the buffer's system slice with the stored
    /// flags. Lengths that do not fit in `u32` are clamped to `u32::MAX`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        // Copy out the plain values before `project()` consumes the pin.
        let raw_fd = self.fd.as_fd().as_raw_fd();
        let recv_flags = self.flags;
        let this = self.project();
        let slice = this.buffer.sys_slice_mut();
        let len = u32::try_from(slice.len()).unwrap_or(u32::MAX);
        let entry = opcode::Recv::new(Fd(raw_fd), slice.ptr() as _, len)
            .flags(recv_flags)
            .build();
        entry.into()
    }
}
unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
    /// Calls `set_msg()` to prepare the message header, then builds a
    /// `RecvMsg` entry that points at it.
    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
        self.as_mut().set_msg();
        let this = self.project();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let entry = opcode::RecvMsg::new(fd, this.msg)
            .flags(*this.flags as _)
            .build();
        entry.into()
    }
}
unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
    /// Builds a `Send` entry over the buffer's initialized bytes with the
    /// stored flags. Lengths that do not fit in `u32` are clamped to
    /// `u32::MAX`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let data = self.buffer.as_init();
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let len = u32::try_from(data.len()).unwrap_or(u32::MAX);
        let entry = opcode::Send::new(fd, data.as_ptr(), len)
            .flags(self.flags)
            .build();
        entry.into()
    }
}
unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
    /// Calls `set_msg()` to prepare the message header, then builds a
    /// `SendMsg` entry that points at it.
    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
        self.as_mut().set_msg();
        let this = self.project();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let entry = opcode::SendMsg::new(fd, this.msg)
            .flags(*this.flags as _)
            .build();
        entry.into()
    }
}
/// State shared by the `recvfrom`-style operations: descriptor, source
/// address storage, the `msghdr` handed to the kernel and the receive flags.
struct RecvFromHeader<S> {
    /// Socket the message is received from.
    pub(crate) fd: S,
    /// Storage that `msg.msg_name` points at.
    pub(crate) addr: SockAddrStorage,
    /// Message header passed to `RecvMsg`; it holds pointers into `self`.
    pub(crate) msg: libc::msghdr,
    /// Flags forwarded to the `RecvMsg` entry.
    pub(crate) flags: i32,
    // Marks the type as `!Unpin`; `msg` stores raw pointers to sibling fields.
    _p: PhantomPinned,
}
impl<S> RecvFromHeader<S> {
    /// Creates a header with zeroed address storage and a zeroed `msghdr`.
    pub fn new(fd: S, flags: i32) -> Self {
        let addr = SockAddrStorage::zeroed();
        // SAFETY: `libc::msghdr` is a C struct of integers and raw pointers,
        // for which the all-zero bit pattern is valid.
        let msg = unsafe { std::mem::zeroed() };
        Self {
            fd,
            addr,
            msg,
            flags,
            _p: PhantomPinned,
        }
    }
}
impl<S: AsFd> RecvFromHeader<S> {
    /// Points the message header at the address storage and at `slices`,
    /// then builds a `RecvMsg` entry referencing that header.
    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
        let msg = &mut self.msg;
        msg.msg_name = &mut self.addr as *mut _ as _;
        msg.msg_namelen = self.addr.size_of() as _;
        msg.msg_iov = slices.as_mut_ptr() as _;
        msg.msg_iovlen = slices.len() as _;

        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::RecvMsg::new(fd, &mut self.msg)
            .flags(self.flags as _)
            .build();
        entry.into()
    }

    /// Consumes the header and returns the address storage together with the
    /// `msg_namelen` value currently stored in the message header.
    pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
        let Self { addr, msg, .. } = self;
        (addr, msg.msg_namelen)
    }
}
pin_project! {
    /// Receive a single buffer of data together with the source address.
    pub struct RecvFrom<T: IoBufMut, S> {
        // Descriptor, address storage and message header.
        header: RecvFromHeader<S>,
        #[pin]
        buffer: T,
        // Filled in by `create_entry`; the message header points at it.
        slice: Option<SysSlice>,
    }
}
impl<T: IoBufMut, S> RecvFrom<T, S> {
    /// Creates a [`RecvFrom`] for `fd` that receives into `buffer`.
    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
        let header = RecvFromHeader::new(fd, flags);
        Self {
            header,
            buffer,
            slice: None,
        }
    }
}
unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
    /// Stores the buffer's system slice in `slice` and lets the header build
    /// the entry over that single-element slice list.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        let stored = this.slice.insert(this.buffer.sys_slice_mut());
        let slices = std::slice::from_mut(stored);
        this.header.create_entry(slices)
    }
}
impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
    type Inner = (T, SockAddrStorage, socklen_t);

    /// Returns the buffer, the address storage and the address length taken
    /// from the header.
    fn into_inner(self) -> Self::Inner {
        let Self { header, buffer, .. } = self;
        let (addr, addr_len) = header.into_addr();
        (buffer, addr, addr_len)
    }
}
pin_project! {
    /// Receive data into a vectored buffer together with the source address.
    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
        // Descriptor, address storage and message header.
        header: RecvFromHeader<S>,
        #[pin]
        buffer: T,
        // Filled in by `create_entry`; the message header points at it.
        slice: Vec<SysSlice>,
    }
}
impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
    /// Creates a [`RecvFromVectored`] for `fd` that receives into `buffer`.
    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
        let header = RecvFromHeader::new(fd, flags);
        Self {
            header,
            buffer,
            slice: vec![],
        }
    }
}
unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
    /// Stores the buffer's system slices in `slice` and lets the header
    /// build the entry over them.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        let slices = this.slice;
        *slices = this.buffer.sys_slices_mut();
        this.header.create_entry(slices)
    }
}
impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
    type Inner = (T, SockAddrStorage, socklen_t);

    /// Returns the buffer, the address storage and the address length taken
    /// from the header.
    fn into_inner(self) -> Self::Inner {
        let Self { header, buffer, .. } = self;
        let (addr, addr_len) = header.into_addr();
        (buffer, addr, addr_len)
    }
}
/// State shared by the `sendto`-style operations: descriptor, destination
/// address, the `msghdr` handed to the kernel and the send flags.
struct SendToHeader<S> {
    /// Socket the message is sent on.
    pub(crate) fd: S,
    /// Destination address that `msg.msg_name` points at.
    pub(crate) addr: SockAddr,
    /// Message header passed to `SendMsg`; it holds pointers into `self`.
    pub(crate) msg: libc::msghdr,
    /// Flags forwarded to the `SendMsg` entry.
    pub(crate) flags: i32,
    // Marks the type as `!Unpin`; `msg` stores raw pointers to sibling fields.
    _p: PhantomPinned,
}
impl<S> SendToHeader<S> {
    /// Creates a header for sending to `addr` with a zeroed `msghdr`.
    pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
        // SAFETY: `libc::msghdr` is a C struct of integers and raw pointers,
        // for which the all-zero bit pattern is valid.
        let msg = unsafe { std::mem::zeroed() };
        Self {
            fd,
            addr,
            msg,
            flags,
            _p: PhantomPinned,
        }
    }
}
impl<S: AsFd> SendToHeader<S> {
    /// Points the message header at the destination address and at
    /// `slices`, then builds a `SendMsg` entry referencing that header.
    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
        let msg = &mut self.msg;
        msg.msg_name = self.addr.as_ptr() as _;
        msg.msg_namelen = self.addr.len();
        msg.msg_iov = slices.as_mut_ptr() as _;
        msg.msg_iovlen = slices.len() as _;

        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::SendMsg::new(fd, &self.msg)
            .flags(self.flags as _)
            .build();
        entry.into()
    }
}
pin_project! {
    /// Send a single buffer of data to a specified address.
    pub struct SendTo<T: IoBuf, S> {
        // Descriptor, destination address and message header.
        header: SendToHeader<S>,
        #[pin]
        buffer: T,
        // Filled in by `create_entry`; the message header points at it.
        slice: Option<SysSlice>,
    }
}
impl<T: IoBuf, S> SendTo<T, S> {
    /// Creates a [`SendTo`] that sends `buffer` on `fd` to `addr`.
    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
        let header = SendToHeader::new(fd, addr, flags);
        Self {
            header,
            buffer,
            slice: None,
        }
    }
}
unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
    /// Stores the buffer's system slice in `slice` and lets the header build
    /// the entry over that single-element slice list.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        let stored = this.slice.insert(this.buffer.as_ref().sys_slice());
        let slices = std::slice::from_mut(stored);
        this.header.create_entry(slices)
    }
}
impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
    type Inner = T;

    /// Consumes the operation and hands the buffer back.
    fn into_inner(self) -> Self::Inner {
        let Self { buffer, .. } = self;
        buffer
    }
}
pin_project! {
    /// Send data from a vectored buffer to a specified address.
    pub struct SendToVectored<T: IoVectoredBuf, S> {
        // Descriptor, destination address and message header.
        header: SendToHeader<S>,
        #[pin]
        buffer: T,
        // Filled in by `create_entry`; the message header points at it.
        slice: Vec<SysSlice>,
    }
}
impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
    /// Creates a [`SendToVectored`] that sends `buffer` on `fd` to `addr`.
    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
        let header = SendToHeader::new(fd, addr, flags);
        Self {
            header,
            buffer,
            slice: vec![],
        }
    }
}
unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
    /// Stores the buffer's system slices in `slice` and lets the header
    /// build the entry over them.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        let slices = this.slice;
        *slices = this.buffer.as_ref().sys_slices();
        this.header.create_entry(slices)
    }
}
impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
    type Inner = T;

    /// Consumes the operation and hands the buffer back.
    fn into_inner(self) -> Self::Inner {
        let Self { buffer, .. } = self;
        buffer
    }
}
unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
    /// Calls `set_msg()` to prepare the message header, then builds a
    /// `RecvMsg` entry that points at it.
    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
        self.as_mut().set_msg();
        let this = self.project();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let entry = opcode::RecvMsg::new(fd, this.msg)
            .flags(*this.flags as _)
            .build();
        entry.into()
    }
}
unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
    /// Calls `set_msg()` to prepare the message header, then builds a
    /// `SendMsg` entry that points at it.
    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
        self.as_mut().set_msg();
        let this = self.project();
        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let entry = opcode::SendMsg::new(fd, this.msg)
            .flags(*this.flags as _)
            .build();
        entry.into()
    }
}
unsafe impl<S: AsFd> OpCode for PollOnce<S> {
    /// Builds a `PollAdd` entry: `POLLIN` for readable interest, `POLLOUT`
    /// for writable interest.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let events = match self.interest {
            Interest::Readable => libc::POLLIN,
            Interest::Writable => libc::POLLOUT,
        };
        let entry = opcode::PollAdd::new(fd, events as _).build();
        entry.into()
    }
}
unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
    /// Builds a `Splice` entry between the two descriptors. Lengths that do
    /// not fit in `u32` are clamped to `u32::MAX`.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let source = Fd(self.fd_in.as_fd().as_raw_fd());
        let sink = Fd(self.fd_out.as_fd().as_raw_fd());
        let len = u32::try_from(self.len).unwrap_or(u32::MAX);
        let entry = opcode::Splice::new(source, self.offset_in, sink, self.offset_out, len)
            .flags(self.flags)
            .build();
        entry.into()
    }

    /// Blocking variant: calls `splice(2)` directly. A negative offset is
    /// passed to the syscall as a null pointer; otherwise a pointer to a
    /// local copy of the offset is passed.
    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
        let mut off_in = self.offset_in;
        let mut off_out = self.offset_out;

        let in_ptr = if off_in < 0 {
            std::ptr::null_mut()
        } else {
            &mut off_in
        };
        let out_ptr = if off_out < 0 {
            std::ptr::null_mut()
        } else {
            &mut off_out
        };

        let moved = syscall!(libc::splice(
            self.fd_in.as_fd().as_raw_fd(),
            in_ptr,
            self.fd_out.as_fd().as_raw_fd(),
            out_ptr,
            self.len,
            self.flags,
        ))?;
        Ok(moved as _)
    }
}
mod buf_ring {
use std::{
io,
marker::PhantomPinned,
os::fd::{AsFd, AsRawFd},
pin::Pin,
ptr,
};
use io_uring::{opcode, squeue::Flags, types::Fd};
use pin_project_lite::pin_project;
use socket2::{SockAddr, SockAddrStorage, socklen_t};
use super::OpCode;
use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
/// Positioned read that lets the kernel pick a buffer from a buffer group.
#[derive(Debug)]
pub struct ReadManagedAt<S> {
    /// Descriptor to read from.
    pub(crate) fd: S,
    /// File offset passed to the read entry.
    pub(crate) offset: u64,
    /// Buffer group id taken from the buffer pool at construction.
    buffer_group: u16,
    /// Requested read length.
    len: u32,
    _p: PhantomPinned,
}
impl<S> ReadManagedAt<S> {
    /// Creates a managed positioned read.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when `len` does not fit in a `u32`.
    pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
        #[cfg(fusion)]
        let buffer_pool = buffer_pool.as_io_uring();
        let buffer_group = buffer_pool.buffer_group();
        let len = u32::try_from(len).map_err(|_| {
            io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
        })?;
        Ok(Self {
            fd,
            offset,
            buffer_group,
            len,
            _p: PhantomPinned,
        })
    }
}
unsafe impl<S: AsFd> OpCode for ReadManagedAt<S> {
    /// Builds a positioned `Read` entry with a null buffer pointer, the
    /// stored buffer group and the `BUFFER_SELECT` submission flag.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::Read::new(fd, ptr::null_mut(), self.len)
            .offset(self.offset)
            .buf_group(self.buffer_group)
            .build()
            .flags(Flags::BUFFER_SELECT);
        entry.into()
    }
}
impl<S> TakeBuffer for ReadManagedAt<S> {
    type Buffer<'a> = BorrowedBuffer<'a>;
    type BufferPool = BufferPool;

    /// Turns a completion into a borrowed buffer. On an error completion
    /// `reuse_buffer(buffer_id)` is called before the error is returned.
    fn take_buffer(
        self,
        buffer_pool: &Self::BufferPool,
        result: io::Result<usize>,
        buffer_id: u16,
    ) -> io::Result<Self::Buffer<'_>> {
        #[cfg(fusion)]
        let buffer_pool = buffer_pool.as_io_uring();
        let filled = match result {
            Ok(filled) => filled,
            Err(err) => {
                buffer_pool.reuse_buffer(buffer_id);
                return Err(err);
            }
        };
        let res = unsafe { buffer_pool.get_buffer(buffer_id, filled) };
        #[cfg(fusion)]
        let res = res.map(BorrowedBuffer::new_io_uring);
        res
    }
}
/// Read (no explicit offset) that lets the kernel pick a buffer from a
/// buffer group.
pub struct ReadManaged<S> {
    /// Descriptor to read from.
    fd: S,
    /// Buffer group id taken from the buffer pool at construction.
    buffer_group: u16,
    /// Requested read length.
    len: u32,
    _p: PhantomPinned,
}
impl<S> ReadManaged<S> {
    /// Creates a managed read.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when `len` does not fit in a `u32`.
    pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
        #[cfg(fusion)]
        let buffer_pool = buffer_pool.as_io_uring();
        let buffer_group = buffer_pool.buffer_group();
        let len = u32::try_from(len).map_err(|_| {
            io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
        })?;
        Ok(Self {
            fd,
            buffer_group,
            len,
            _p: PhantomPinned,
        })
    }
}
unsafe impl<S: AsFd> OpCode for ReadManaged<S> {
    /// Builds a `Read` entry with a null buffer pointer, the stored buffer
    /// group and the `BUFFER_SELECT` submission flag.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::Read::new(fd, ptr::null_mut(), self.len)
            .buf_group(self.buffer_group)
            .build()
            .flags(Flags::BUFFER_SELECT);
        entry.into()
    }
}
impl<S> TakeBuffer for ReadManaged<S> {
    type Buffer<'a> = BorrowedBuffer<'a>;
    type BufferPool = BufferPool;

    /// Turns a completion into a borrowed buffer. On an error completion
    /// `reuse_buffer(buffer_id)` is called before the error is returned.
    fn take_buffer(
        self,
        buffer_pool: &Self::BufferPool,
        result: io::Result<usize>,
        buffer_id: u16,
    ) -> io::Result<Self::Buffer<'_>> {
        #[cfg(fusion)]
        let buffer_pool = buffer_pool.as_io_uring();
        let filled = match result {
            Ok(filled) => filled,
            Err(err) => {
                buffer_pool.reuse_buffer(buffer_id);
                return Err(err);
            }
        };
        let res = unsafe { buffer_pool.get_buffer(buffer_id, filled) };
        #[cfg(fusion)]
        let res = res.map(BorrowedBuffer::new_io_uring);
        res
    }
}
/// Socket receive that lets the kernel pick a buffer from a buffer group.
pub struct RecvManaged<S> {
    /// Socket to receive from.
    fd: S,
    /// Buffer group id taken from the buffer pool at construction.
    buffer_group: u16,
    /// Requested receive length.
    len: u32,
    /// Flags forwarded to the `Recv` entry.
    flags: i32,
    _p: PhantomPinned,
}
impl<S> RecvManaged<S> {
    /// Creates a managed receive.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when `len` does not fit in a `u32`.
    pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
        #[cfg(fusion)]
        let buffer_pool = buffer_pool.as_io_uring();
        let buffer_group = buffer_pool.buffer_group();
        let len = u32::try_from(len).map_err(|_| {
            io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
        })?;
        Ok(Self {
            fd,
            buffer_group,
            len,
            flags,
            _p: PhantomPinned,
        })
    }
}
unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
    /// Builds a `Recv` entry with a null buffer pointer, the stored receive
    /// flags and buffer group, and the `BUFFER_SELECT` submission flag.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let fd = Fd(self.fd.as_fd().as_raw_fd());
        let entry = opcode::Recv::new(fd, ptr::null_mut(), self.len)
            .flags(self.flags)
            .buf_group(self.buffer_group)
            .build()
            .flags(Flags::BUFFER_SELECT);
        entry.into()
    }
}
impl<S> TakeBuffer for RecvManaged<S> {
    type Buffer<'a> = BorrowedBuffer<'a>;
    type BufferPool = BufferPool;

    /// Turns a completion into a borrowed buffer. On an error completion
    /// `reuse_buffer(buffer_id)` is called before the error is returned.
    fn take_buffer(
        self,
        buffer_pool: &Self::BufferPool,
        result: io::Result<usize>,
        buffer_id: u16,
    ) -> io::Result<Self::Buffer<'_>> {
        #[cfg(fusion)]
        let buffer_pool = buffer_pool.as_io_uring();
        let filled = match result {
            Ok(filled) => filled,
            Err(err) => {
                buffer_pool.reuse_buffer(buffer_id);
                return Err(err);
            }
        };
        let res = unsafe { buffer_pool.get_buffer(buffer_id, filled) };
        #[cfg(fusion)]
        let res = res.map(BorrowedBuffer::new_io_uring);
        res
    }
}
pin_project! {
    /// Receive data and the source address, letting the kernel pick a buffer
    /// from a buffer group.
    pub struct RecvFromManaged<S> {
        fd: S,
        // Buffer group id taken from the buffer pool at construction.
        buffer_group: u16,
        flags: i32,
        // Storage that `msg.msg_name` points at.
        addr: SockAddrStorage,
        // Initial value for `msg.msg_namelen` (size of the address storage).
        addr_len: socklen_t,
        // Single iovec with a null base and the requested length.
        iovec: libc::iovec,
        // Message header passed to `RecvMsg`; it points at `addr` and `iovec`.
        msg: libc::msghdr,
        _p: PhantomPinned,
    }
}
impl<S> RecvFromManaged<S> {
    /// Creates a managed receive-from operation.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when `len` does not fit in a `u32`.
    pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
        #[cfg(fusion)]
        let buffer_pool = buffer_pool.as_io_uring();
        let len = u32::try_from(len).map_err(|_| {
            io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
        })?;
        let addr = SockAddrStorage::zeroed();
        let buffer_group = buffer_pool.buffer_group();
        let addr_len = addr.size_of() as _;
        // The base stays null; only the length is supplied here.
        let iovec = libc::iovec {
            iov_base: ptr::null_mut(),
            iov_len: len as _,
        };
        // SAFETY: `libc::msghdr` is a C struct of integers and raw pointers,
        // for which the all-zero bit pattern is valid.
        let msg = unsafe { std::mem::zeroed() };
        Ok(Self {
            fd,
            buffer_group,
            flags,
            addr_len,
            addr,
            iovec,
            msg,
            _p: PhantomPinned,
        })
    }
}
unsafe impl<S: AsFd> OpCode for RecvFromManaged<S> {
    /// Wires the message header to the address storage and the single
    /// iovec, then builds a `RecvMsg` entry with the stored buffer group and
    /// the `BUFFER_SELECT` submission flag.
    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
        let this = self.project();
        let msg = this.msg;
        msg.msg_name = this.addr as *mut _ as _;
        msg.msg_namelen = *this.addr_len;
        msg.msg_iov = this.iovec as *const _ as *mut _;
        msg.msg_iovlen = 1;

        let fd = Fd(this.fd.as_fd().as_raw_fd());
        let entry = opcode::RecvMsg::new(fd, msg)
            .flags(*this.flags as _)
            .buf_group(*this.buffer_group)
            .build()
            .flags(Flags::BUFFER_SELECT);
        entry.into()
    }
}
impl<S> TakeBuffer for RecvFromManaged<S> {
    type Buffer<'a> = (BorrowedBuffer<'a>, SockAddr);
    type BufferPool = BufferPool;

    /// Turns a completion into a borrowed buffer plus the sender's address.
    ///
    /// On an error completion `reuse_buffer(buffer_id)` is called before the
    /// error is returned.
    fn take_buffer(
        self,
        buffer_pool: &Self::BufferPool,
        result: io::Result<usize>,
        buffer_id: u16,
    ) -> io::Result<Self::Buffer<'_>> {
        #[cfg(fusion)]
        let buffer_pool = buffer_pool.as_io_uring();
        let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
        // Use the length from the message header, not `addr_len`: `addr_len`
        // only holds the storage capacity set in `new`, while recvmsg updates
        // `msg_namelen` to the size of the returned address. This matches
        // `RecvFromHeader::into_addr`.
        // SAFETY: `addr` is the storage `msg.msg_name` pointed at, and
        // `msg_namelen` is the address length associated with that storage.
        let addr = unsafe { SockAddr::new(self.addr, self.msg.msg_namelen) };
        let buffer = unsafe { buffer_pool.get_buffer(buffer_id, result) }?;
        #[cfg(fusion)]
        let buffer = BorrowedBuffer::new_io_uring(buffer);
        Ok((buffer, addr))
    }
}
}
pub use buf_ring::{ReadManaged, ReadManagedAt, RecvFromManaged, RecvManaged};