use std::alloc::{self, alloc, dealloc};
use std::io;
use std::marker::PhantomData;
use std::os::fd::RawFd;
use std::ptr::{self, NonNull};
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::io::{Buf, BufMut, BufMutParts, BufMutSlice, BufSlice, NO_OFFSET, ReadBuf};
use crate::kqueue::fd::OpKind;
use crate::kqueue::op::{DirectOp, FdIter, FdOp, FdOpExtract};
use crate::{AsyncFd, SubmissionQueue, fd, syscall};
pub(crate) use crate::unix::{IoMutSlice, IoSlice};
pub(crate) use std::io::*;
/// Pool of fixed-size read buffers, all carved out of a single page-aligned
/// allocation. Buffers are handed out by `get_buf` and returned by `release`.
#[derive(Debug)]
pub(crate) struct ReadBufPool {
    // Number of buffers in the pool.
    pool_size: u16,
    // Size of each buffer in bytes.
    buf_size: u32,
    // Start of the single allocation of `pool_size * buf_size` bytes.
    bufs_addr: NonNull<u8>,
    // Bitmap tracking buffer usage, one bit per buffer: a *set* bit means the
    // buffer is in use, an unset bit means it is available (see `get_buf`,
    // which claims a buffer with `fetch_or`, and `release`, which clears the
    // bit with `fetch_and`).
    available: Box<[AtomicUsize]>,
}
impl ReadBufPool {
    /// Create a new pool of `pool_size` buffers of `buf_size` bytes each.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if either size is zero and `OutOfMemory` if the
    /// layout is invalid or the allocation fails.
    pub(crate) fn new(
        _: SubmissionQueue,
        pool_size: u16,
        buf_size: u32,
    ) -> io::Result<ReadBufPool> {
        // `alloc` with a zero-sized layout is undefined behaviour, reject
        // degenerate pools up front.
        if pool_size == 0 || buf_size == 0 {
            return Err(io::ErrorKind::InvalidInput.into());
        }
        let bufs_layout = alloc_layout_buffers(pool_size, buf_size, page_size())?;
        // SAFETY: the layout has a non-zero size, checked above.
        let Some(bufs_addr) = NonNull::new(unsafe { alloc(bufs_layout) }) else {
            return Err(io::ErrorKind::OutOfMemory.into());
        };

        // One bit per buffer, in `usize`-sized chunks, rounding up.
        let bits = usize::BITS as u16;
        let mut available_size = pool_size / bits;
        if !pool_size.is_multiple_of(bits) {
            available_size += 1;
        }
        // SAFETY: an all-zero bit pattern is a valid `AtomicUsize`; all-zeroes
        // means all buffers start out available.
        let mut available: Box<[AtomicUsize]> =
            unsafe { Box::new_zeroed_slice(available_size as usize).assume_init() };
        // If `pool_size` doesn't fill the last chunk exactly, permanently mark
        // the excess bits as "in use" so `get_buf` can never hand out memory
        // past the end of the allocation.
        // NOTE: this must test `pool_size`, not `available_size` — the old
        // `available_size` check skipped the marking whenever `available_size`
        // itself was a multiple of `usize::BITS` (e.g. `pool_size` = 4090).
        if !pool_size.is_multiple_of(bits) {
            // Compute in `usize`: `available_size * usize::BITS` can overflow
            // `u16` for pools larger than 65472 buffers.
            let excess = (available_size as usize * usize::BITS as usize) - pool_size as usize;
            *available[available_size as usize - 1].get_mut() =
                ((1usize << excess) - 1) << (pool_size % bits);
        }

        Ok(ReadBufPool {
            pool_size,
            buf_size,
            bufs_addr,
            available,
        })
    }

    /// Returns the size of each buffer in the pool in bytes.
    pub(crate) const fn buf_size(&self) -> usize {
        self.buf_size as usize
    }

    /// Claim an available buffer, returning a pointer to it and its capacity,
    /// or `None` if all buffers are currently in use.
    pub(crate) fn get_buf(&self) -> Option<(NonNull<u8>, u32)> {
        for (idx, available) in self.available.iter().enumerate() {
            let mut value = available.load(Ordering::Relaxed);
            // Index of the lowest unset bit, i.e. first available buffer.
            let mut i = value.trailing_ones();
            while i < usize::BITS {
                // Try to claim bit `i`; `value` becomes the chunk as it was
                // *before* the `fetch_or`.
                value = available.fetch_or(1 << i, Ordering::AcqRel);
                if is_unset(value, i as usize) {
                    // We claimed the bit; hand out the matching buffer.
                    let buf_id = (idx * usize::BITS as usize) + i as usize;
                    let len = self.buf_size();
                    // SAFETY: claimed bits always satisfy
                    // `buf_id < pool_size` (excess bits in the last chunk are
                    // pre-marked in `new`), so the offset stays inside the
                    // allocation.
                    let ptr = unsafe { self.bufs_addr.byte_offset((len * buf_id).cast_signed()) };
                    return Some((ptr, len as u32));
                }
                // Another thread claimed bit `i` first; advance to the next
                // unset bit in the (pre-`fetch_or`) value.
                i += (value >> i).trailing_ones();
            }
        }
        None
    }

    /// Return the buffer pointed to by `ptr` to the pool.
    ///
    /// # Safety
    ///
    /// `ptr` must point into a buffer previously handed out by `get_buf` on
    /// this same pool and must not be used after this call.
    pub(crate) unsafe fn release(&self, ptr: NonNull<[u8]>) {
        // SAFETY: per this function's contract `ptr` points into our
        // allocation, so the pointer arithmetic is in bounds.
        let buf_id = unsafe {
            ptr.cast::<u8>().offset_from(self.bufs_addr).cast_unsigned() / (self.buf_size as usize)
        };
        let idx = buf_id / usize::BITS as usize;
        let n = buf_id % usize::BITS as usize;
        // Clear the bit to mark the buffer as available again.
        let old_value = self.available[idx].fetch_and(!(1 << n), Ordering::AcqRel);
        // Releasing a buffer that wasn't taken indicates a double release.
        debug_assert!(!is_unset(old_value, n));
    }
}
impl ReadBuf {
    /// Get the buffer parts for a system call, claiming a buffer from the
    /// shared pool. If the pool is exhausted the returned parts hold a null
    /// pointer with zero length.
    pub(crate) fn parts_sys(&mut self) -> BufMutParts {
        let parts = match self.shared.get_buf() {
            Some((buf_ptr, buf_len)) => {
                // Track the claimed buffer so it can be released later.
                self.owned = Some(NonNull::slice_from_raw_parts(buf_ptr, 0));
                PoolBufParts {
                    ptr: buf_ptr.as_ptr(),
                    len: buf_len,
                }
            }
            None => PoolBufParts {
                ptr: ptr::null_mut(),
                len: 0,
            },
        };
        BufMutParts::Pool(parts)
    }
}
/// Raw parts of a buffer claimed from a `ReadBufPool` (see
/// `ReadBuf::parts_sys`); `ptr` is null and `len` zero when the pool had no
/// buffer available.
pub(crate) struct PoolBufParts {
    ptr: *mut u8,
    len: u32,
}
impl BufMutParts {
pub(crate) fn pool_ptr(self) -> io::Result<(*mut u8, u32, bool)> {
match self {
BufMutParts::Buf { len: 0, .. } => Err(io::Error::from_raw_os_error(libc::ENOBUFS)),
BufMutParts::Buf { ptr, len } => Ok((ptr, len, false)),
BufMutParts::Pool(PoolBufParts { ptr, len }) => Ok((ptr, len, true)),
}
}
}
// SAFETY: the pool owns its allocation and coordinates buffer hand-out via
// atomic operations on `available` (`get_buf` claims a bit with `fetch_or`,
// `release` clears it with `fetch_and`), so sharing between threads and
// sending across threads is sound. The manual impls are needed because the
// raw `NonNull<u8>` field suppresses the auto traits. NOTE(review): soundness
// also relies on each buffer being handed out at most once until released —
// which `get_buf`'s claim-by-CAS-like loop provides.
unsafe impl Sync for ReadBufPool {}
unsafe impl Send for ReadBufPool {}
impl Drop for ReadBufPool {
    fn drop(&mut self) {
        // Recompute the layout the same way `new` did; it was valid then so
        // it is valid now.
        let layout = alloc_layout_buffers(self.pool_size, self.buf_size, page_size()).unwrap();
        // SAFETY: `bufs_addr` was allocated in `new` with this exact layout
        // and is deallocated only here.
        unsafe { dealloc(self.bufs_addr.as_ptr(), layout) }
    }
}
/// Compute the allocation layout for `pool_size` buffers of `buf_size` bytes
/// each, aligned to `page_size` (which must be a power of two).
///
/// Returns an `OutOfMemory` error if the layout is invalid.
fn alloc_layout_buffers(
    pool_size: u16,
    buf_size: u32,
    page_size: usize,
) -> io::Result<alloc::Layout> {
    let total = pool_size as usize * buf_size as usize;
    alloc::Layout::from_size_align(total, page_size)
        .map_err(|_| io::Error::from(io::ErrorKind::OutOfMemory))
}
/// Returns the size of a memory page in bytes.
#[allow(clippy::cast_sign_loss)] // The page size is always positive.
fn page_size() -> usize {
    // SAFETY: `sysconf(3)` is always safe to call.
    let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    size as usize
}
/// Returns `true` if bit `n` (zero-indexed from the least significant bit) of
/// `value` is not set.
const fn is_unset(value: usize, n: usize) -> bool {
    (value & (1 << n)) == 0
}
/// `read(2)` / `pread(2)` operation reading into a single buffer.
pub(crate) struct ReadOp<B>(PhantomData<*const B>);

impl<B: BufMut> FdOp for ReadOp<B> {
    type Output = B;
    type Resources = B;
    // Offset within the file, or `NO_OFFSET`.
    type Args = u64;
    type OperationOutput = libc::ssize_t;

    const OP_KIND: OpKind = OpKind::Read;

    #[allow(clippy::cast_possible_wrap)]
    fn try_run(
        fd: &AsyncFd,
        buf: &mut Self::Resources,
        offset: &mut Self::Args,
    ) -> io::Result<Self::OperationOutput> {
        let (buf_ptr, buf_len, is_pool) = buf.parts().pool_ptr()?;
        // With an explicit offset use `pread(2)`, otherwise plain `read(2)`.
        let result = if *offset != NO_OFFSET {
            syscall!(pread(fd.fd(), buf_ptr.cast(), buf_len as _, *offset as _))
        } else {
            syscall!(read(fd.fd(), buf_ptr.cast(), buf_len as _))
        };
        if is_pool && result.is_err() {
            // The read failed; hand the claimed buffer back to the pool.
            buf.release();
        }
        result
    }

    #[allow(clippy::cast_sign_loss)]
    fn map_ok(_: &AsyncFd, mut buf: Self::Resources, n: Self::OperationOutput) -> Self::Output {
        // SAFETY: a successful read initialised `n` bytes of the buffer.
        unsafe { buf.set_init(n as _) };
        buf
    }
}
/// Repeated `read(2)` operation, reading into buffers from a `ReadBufPool`
/// until end-of-file.
pub(crate) struct MultishotReadOp;

impl FdIter for MultishotReadOp {
    type Output = crate::io::ReadBuf;
    type Resources = crate::io::ReadBufPool;
    type Args = ();
    type OperationOutput = (NonNull<u8>, libc::ssize_t);

    const OP_KIND: OpKind = OpKind::Read;

    fn try_run(
        fd: &AsyncFd,
        buf_pool: &mut Self::Resources,
        (): &mut Self::Args,
    ) -> io::Result<Self::OperationOutput> {
        // Claim a buffer to read into; fail if the pool is exhausted.
        let Some((buf_ptr, buf_len)) = buf_pool.shared.get_buf() else {
            return Err(io::Error::from_raw_os_error(libc::ENOBUFS));
        };
        syscall!(read(fd.fd(), buf_ptr.as_ptr().cast(), buf_len as _))
            .map(|n| (buf_ptr, n))
            .map_err(|err| {
                // The read failed; return the (still empty) buffer to the
                // pool.
                // SAFETY: the buffer was claimed from this same pool above.
                unsafe { buf_pool.shared.release(NonNull::slice_from_raw_parts(buf_ptr, 0)) };
                err
            })
    }

    fn is_complete((_, n): &Self::OperationOutput) -> bool {
        // Reading zero bytes indicates end-of-file, which ends the multishot
        // operation.
        *n == 0
    }

    fn map_next(
        _: &AsyncFd,
        buf_pool: &Self::Resources,
        (ptr, n): Self::OperationOutput,
    ) -> Self::Output {
        // SAFETY: `ptr` was handed out by `buf_pool` in `try_run` and a
        // successful read initialised `n` bytes of it.
        unsafe { buf_pool.new_buffer(ptr, n.cast_unsigned()) }
    }
}
/// `readv(2)` / `preadv(2)` operation reading into `N` buffers.
pub(crate) struct ReadVectoredOp<B, const N: usize>(PhantomData<*const B>);

impl<B: BufMutSlice<N>, const N: usize> FdOp for ReadVectoredOp<B, N> {
    type Output = B;
    type Resources = (B, [crate::io::IoMutSlice; N]);
    // Offset within the file, or `NO_OFFSET`.
    type Args = u64;
    type OperationOutput = libc::ssize_t;

    const OP_KIND: OpKind = OpKind::Read;

    #[allow(clippy::cast_possible_wrap)]
    fn try_run(
        fd: &AsyncFd,
        (_, iovecs): &mut Self::Resources,
        offset: &mut Self::Args,
    ) -> io::Result<Self::OperationOutput> {
        // With an explicit offset use `preadv(2)`, otherwise `readv(2)`.
        if *offset != NO_OFFSET {
            syscall!(preadv(
                fd.fd(),
                iovecs.as_ptr().cast(),
                iovecs.len() as _,
                *offset as _
            ))
        } else {
            syscall!(readv(fd.fd(), iovecs.as_ptr().cast(), iovecs.len() as _))
        }
    }

    #[allow(clippy::cast_sign_loss)]
    fn map_ok(
        _: &AsyncFd,
        (mut bufs, _): Self::Resources,
        n: Self::OperationOutput,
    ) -> Self::Output {
        // SAFETY: a successful read initialised `n` bytes across the buffers.
        unsafe { bufs.set_init(n as _) };
        bufs
    }
}
/// `write(2)` / `pwrite(2)` operation writing a single buffer.
pub(crate) struct WriteOp<B>(PhantomData<*const B>);

impl<B: Buf> FdOp for WriteOp<B> {
    type Output = usize;
    type Resources = B;
    // Offset within the file, or `NO_OFFSET`.
    type Args = u64;
    type OperationOutput = libc::ssize_t;

    const OP_KIND: OpKind = OpKind::Write;

    #[allow(clippy::cast_possible_wrap)]
    fn try_run(
        fd: &AsyncFd,
        buf: &mut Self::Resources,
        offset: &mut Self::Args,
    ) -> io::Result<Self::OperationOutput> {
        // SAFETY: `Buf::parts` contract, same as the other write operations.
        let (buf_ptr, buf_len) = unsafe { buf.parts() };
        // With an explicit offset use `pwrite(2)`, otherwise plain `write(2)`.
        if *offset != NO_OFFSET {
            syscall!(pwrite(fd.fd(), buf_ptr.cast(), buf_len as _, *offset as _))
        } else {
            syscall!(write(fd.fd(), buf_ptr.cast(), buf_len as _))
        }
    }

    fn map_ok(fd: &AsyncFd, buf: Self::Resources, n: Self::OperationOutput) -> Self::Output {
        // Drop the buffer, keeping only the number of bytes written.
        Self::map_ok_extract(fd, buf, n).1
    }
}

impl<B: Buf> FdOpExtract for WriteOp<B> {
    type ExtractOutput = (B, usize);

    fn map_ok_extract(
        _: &AsyncFd,
        buf: Self::Resources,
        n: Self::OperationOutput,
    ) -> Self::ExtractOutput {
        // `n` comes from a successful syscall, so it is non-negative.
        (buf, n.cast_unsigned())
    }
}
/// `writev(2)` / `pwritev(2)` operation writing `N` buffers.
pub(crate) struct WriteVectoredOp<B, const N: usize>(PhantomData<*const B>);

impl<B: BufSlice<N>, const N: usize> FdOp for WriteVectoredOp<B, N> {
    type Output = usize;
    type Resources = (B, [crate::io::IoSlice; N]);
    // Offset within the file, or `NO_OFFSET`.
    type Args = u64;
    type OperationOutput = libc::ssize_t;

    const OP_KIND: OpKind = OpKind::Write;

    #[allow(clippy::cast_possible_wrap)]
    fn try_run(
        fd: &AsyncFd,
        (_, iovecs): &mut Self::Resources,
        offset: &mut Self::Args,
    ) -> io::Result<Self::OperationOutput> {
        // With an explicit offset use `pwritev(2)`, otherwise `writev(2)`.
        if *offset != NO_OFFSET {
            syscall!(pwritev(
                fd.fd(),
                iovecs.as_ptr().cast(),
                iovecs.len() as _,
                *offset as _
            ))
        } else {
            syscall!(writev(fd.fd(), iovecs.as_ptr().cast(), iovecs.len() as _))
        }
    }

    fn map_ok(fd: &AsyncFd, resources: Self::Resources, n: Self::OperationOutput) -> Self::Output {
        // Drop the buffers, keeping only the number of bytes written.
        Self::map_ok_extract(fd, resources, n).1
    }
}

impl<B: BufSlice<N>, const N: usize> FdOpExtract for WriteVectoredOp<B, N> {
    type ExtractOutput = (B, usize);

    fn map_ok_extract(
        _: &AsyncFd,
        (bufs, _): Self::Resources,
        n: Self::OperationOutput,
    ) -> Self::ExtractOutput {
        // `n` comes from a successful syscall, so it is non-negative.
        (bufs, n.cast_unsigned())
    }
}
/// `close(2)` operation.
pub(crate) struct CloseOp;

impl DirectOp for CloseOp {
    type Output = ();
    type Resources = ();
    type Args = (RawFd, fd::Kind);

    fn run(
        _: &SubmissionQueue,
        (): Self::Resources,
        (fd, kind): Self::Args,
    ) -> io::Result<Self::Output> {
        // Irrefutable here: `File` is the only `fd::Kind` variant on this
        // platform. This breaks the build if another variant is ever added,
        // forcing this code to be revisited.
        let fd::Kind::File = kind;
        syscall!(close(fd)).map(|_| ())
    }
}