#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
#![warn(missing_docs)]
#[cfg(all(
target_os = "linux",
not(feature = "io-uring"),
not(feature = "polling")
))]
compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
use std::{io, task::Poll, time::Duration};
use compio_buf::BufResult;
use compio_log::{instrument, trace};
use slab::Slab;
mod key;
pub use key::Key;
pub mod op;
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(all())))]
mod unix;
mod asyncify;
pub use asyncify::*;
cfg_if::cfg_if! {
if #[cfg(windows)] {
#[path = "iocp/mod.rs"]
mod sys;
} else if #[cfg(all(target_os = "linux", feature = "polling", feature = "io-uring"))] {
#[path = "fusion/mod.rs"]
mod sys;
} else if #[cfg(all(target_os = "linux", feature = "io-uring"))] {
#[path = "iour/mod.rs"]
mod sys;
} else if #[cfg(unix)] {
#[path = "poll/mod.rs"]
mod sys;
}
}
pub use sys::*;
#[cfg(windows)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
(BOOL, $e:expr) => {
$crate::syscall!($e, == 0)
};
(SOCKET, $e:expr) => {
$crate::syscall!($e, != 0)
};
(HANDLE, $e:expr) => {
$crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
};
($e:expr, $op: tt $rhs: expr) => {{
#[allow(unused_unsafe)]
let res = unsafe { $e };
if res $op $rhs {
Err(::std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
#[cfg(unix)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
(break $e:expr) => {
match $crate::syscall!($e) {
Ok(fd) => ::std::task::Poll::Ready(Ok(fd as usize)),
Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
=> ::std::task::Poll::Pending,
Err(e) => ::std::task::Poll::Ready(Err(e)),
}
};
($e:expr, $f:ident($fd:expr)) => {
match $crate::syscall!(break $e) {
::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
::std::task::Poll::Ready(Err(e)) => Err(e),
}
};
($e:expr) => {{
#[allow(unused_unsafe)]
let res = unsafe { $e };
if res == -1 {
Err(::std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
#[macro_export]
#[doc(hidden)]
macro_rules! impl_raw_fd {
($t:ty, $inner:ident) => {
impl $crate::AsRawFd for $t {
fn as_raw_fd(&self) -> $crate::RawFd {
self.$inner.as_raw_fd()
}
}
impl $crate::FromRawFd for $t {
unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
Self {
$inner: $crate::FromRawFd::from_raw_fd(fd),
}
}
}
impl $crate::IntoRawFd for $t {
fn into_raw_fd(self) -> $crate::RawFd {
self.$inner.into_raw_fd()
}
}
};
}
pub enum PushEntry<K, R> {
Pending(K),
Ready(R),
}
impl<K, R> PushEntry<K, R> {
pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
match self {
Self::Pending(k) => PushEntry::Pending(f(k)),
Self::Ready(r) => PushEntry::Ready(r),
}
}
pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
match self {
Self::Pending(k) => PushEntry::Pending(k),
Self::Ready(r) => PushEntry::Ready(f(r)),
}
}
}
pub struct Proactor {
driver: Driver,
ops: Slab<RawOp>,
}
impl Proactor {
pub fn new() -> io::Result<Self> {
Self::builder().build()
}
pub fn builder() -> ProactorBuilder {
ProactorBuilder::new()
}
fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
Ok(Self {
driver: Driver::new(builder)?,
ops: Slab::with_capacity(builder.capacity as _),
})
}
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
self.driver.attach(fd)
}
pub fn cancel(&mut self, user_data: usize) {
instrument!(compio_log::Level::DEBUG, "cancel", user_data);
if let Some(op) = self.ops.get_mut(user_data) {
if op.set_cancelled() {
trace!("cancel and remove {}", user_data);
self.ops.remove(user_data);
return;
}
}
self.driver.cancel(user_data, &mut self.ops);
}
pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
let entry = self.ops.vacant_entry();
let user_data = entry.key();
let op = self.driver.create_op(user_data, op);
let op = entry.insert(op);
match self.driver.push(user_data, op) {
Poll::Pending => PushEntry::Pending(unsafe { Key::new(user_data) }),
Poll::Ready(res) => {
let mut op = self.ops.remove(user_data);
op.set_result(res);
PushEntry::Ready(unsafe { op.into_inner::<T>() })
}
}
}
pub fn poll(
&mut self,
timeout: Option<Duration>,
entries: &mut impl Extend<usize>,
) -> io::Result<()> {
unsafe {
self.driver
.poll(timeout, OutEntries::new(entries, &mut self.ops))?;
}
Ok(())
}
pub fn pop<T: OpCode>(&mut self, user_data: Key<T>) -> BufResult<usize, T> {
instrument!(compio_log::Level::DEBUG, "pop", ?user_data);
let op = self
.ops
.try_remove(*user_data)
.expect("the entry should be valid");
trace!("poped {}", *user_data);
unsafe { op.into_inner::<T>() }
}
pub fn has_result(&self, user_data: usize) -> bool {
self.ops
.get(user_data)
.map(|op| op.has_result())
.unwrap_or_default()
}
pub fn handle(&self) -> io::Result<NotifyHandle> {
self.driver.handle()
}
}
impl AsRawFd for Proactor {
fn as_raw_fd(&self) -> RawFd {
self.driver.as_raw_fd()
}
}
#[derive(Debug)]
pub(crate) struct Entry {
user_data: usize,
result: io::Result<usize>,
}
impl Entry {
pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
Self { user_data, result }
}
pub fn user_data(&self) -> usize {
self.user_data
}
pub fn into_result(self) -> io::Result<usize> {
self.result
}
}
struct OutEntries<'a, 'b, E> {
entries: &'b mut E,
registry: &'a mut Slab<RawOp>,
}
impl<'a, 'b, E> OutEntries<'a, 'b, E> {
pub fn new(entries: &'b mut E, registry: &'a mut Slab<RawOp>) -> Self {
Self { entries, registry }
}
#[allow(dead_code)]
pub fn registry(&mut self) -> &mut Slab<RawOp> {
self.registry
}
}
impl<E: Extend<usize>> Extend<Entry> for OutEntries<'_, '_, E> {
fn extend<T: IntoIterator<Item = Entry>>(&mut self, iter: T) {
self.entries.extend(iter.into_iter().filter_map(|e| {
let user_data = e.user_data();
if self.registry[user_data].set_result(e.into_result()) {
self.registry.remove(user_data);
None
} else {
Some(user_data)
}
}))
}
}
#[derive(Debug, Clone)]
enum ThreadPoolBuilder {
Create { limit: usize, recv_limit: Duration },
Reuse(AsyncifyPool),
}
impl Default for ThreadPoolBuilder {
fn default() -> Self {
Self::new()
}
}
impl ThreadPoolBuilder {
pub fn new() -> Self {
Self::Create {
limit: 256,
recv_limit: Duration::from_secs(60),
}
}
pub fn create_or_reuse(&self) -> AsyncifyPool {
match self {
Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
Self::Reuse(pool) => pool.clone(),
}
}
}
#[derive(Debug, Clone)]
pub struct ProactorBuilder {
capacity: u32,
pool_builder: ThreadPoolBuilder,
}
impl Default for ProactorBuilder {
fn default() -> Self {
Self::new()
}
}
impl ProactorBuilder {
pub fn new() -> Self {
Self {
capacity: 1024,
pool_builder: ThreadPoolBuilder::new(),
}
}
pub fn capacity(&mut self, capacity: u32) -> &mut Self {
self.capacity = capacity;
self
}
pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
*limit = value;
}
self
}
pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
*recv_limit = timeout;
}
self
}
pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
self.pool_builder = ThreadPoolBuilder::Reuse(pool);
self
}
pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
self.reuse_thread_pool(self.create_or_get_thread_pool());
self
}
pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
self.pool_builder.create_or_reuse()
}
pub fn build(&self) -> io::Result<Proactor> {
Proactor::with_builder(self)
}
}