#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
#![warn(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![doc(
html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
#![doc(
html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
use std::{
io,
task::{Poll, Waker},
time::Duration,
};
use compio_buf::BufResult;
use compio_log::instrument;
mod macros;
mod key;
pub use key::Key;
mod asyncify;
pub use asyncify::*;
pub mod op;
mod fd;
pub use fd::*;
mod driver_type;
pub use driver_type::*;
mod buffer_pool;
pub use buffer_pool::*;
mod sys;
pub use sys::*;
use crate::key::ErasedKey;
mod sys_slice;
#[derive(Debug)]
pub enum PushEntry<K, R> {
Pending(K),
Ready(R),
}
impl<K, R> PushEntry<K, R> {
pub const fn is_ready(&self) -> bool {
matches!(self, Self::Ready(_))
}
pub fn take_ready(self) -> Option<R> {
match self {
Self::Pending(_) => None,
Self::Ready(res) => Some(res),
}
}
pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
match self {
Self::Pending(k) => PushEntry::Pending(f(k)),
Self::Ready(r) => PushEntry::Ready(r),
}
}
pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
match self {
Self::Pending(k) => PushEntry::Pending(k),
Self::Ready(r) => PushEntry::Ready(f(r)),
}
}
}
pub struct Proactor {
driver: Driver,
}
impl Proactor {
pub fn new() -> io::Result<Self> {
Self::builder().build()
}
pub fn builder() -> ProactorBuilder {
ProactorBuilder::new()
}
fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
Ok(Self {
driver: Driver::new(builder)?,
})
}
pub fn default_extra(&self) -> Extra {
self.driver.default_extra().into()
}
pub fn driver_type(&self) -> DriverType {
self.driver.driver_type()
}
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
self.driver.attach(fd)
}
pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
instrument!(compio_log::Level::DEBUG, "cancel", ?key);
key.set_cancelled();
if key.has_result() {
Some(key.take_result())
} else {
self.driver.cancel(key);
None
}
}
pub fn push<T: sys::OpCode + 'static>(
&mut self,
op: T,
) -> PushEntry<Key<T>, BufResult<usize, T>> {
self.push_with_extra(op, self.default_extra())
}
pub fn push_with_extra<T: sys::OpCode + 'static>(
&mut self,
op: T,
extra: Extra,
) -> PushEntry<Key<T>, BufResult<usize, T>> {
let key = Key::new(op, extra);
match self.driver.push(key.clone().erase()) {
Poll::Pending => PushEntry::Pending(key),
Poll::Ready(res) => {
key.set_result(res);
PushEntry::Ready(key.take_result())
}
}
}
pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
self.driver.poll(timeout)
}
pub fn pop<T>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
instrument!(compio_log::Level::DEBUG, "pop", ?key);
if key.has_result() {
PushEntry::Ready(key.take_result())
} else {
PushEntry::Pending(key)
}
}
pub fn pop_with_extra<T>(
&mut self,
key: Key<T>,
) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
instrument!(compio_log::Level::DEBUG, "pop", ?key);
if key.has_result() {
let extra = key.swap_extra(self.default_extra());
let res = key.take_result();
PushEntry::Ready((res, extra))
} else {
PushEntry::Pending(key)
}
}
pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: &Waker) {
op.set_waker(waker);
}
pub fn waker(&self) -> Waker {
self.driver.waker()
}
pub fn create_buffer_pool(
&mut self,
buffer_len: u16,
buffer_size: usize,
) -> io::Result<BufferPool> {
self.driver.create_buffer_pool(buffer_len, buffer_size)
}
pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
unsafe { self.driver.release_buffer_pool(buffer_pool) }
}
pub fn register_personality(&self) -> io::Result<u16> {
fn unsupported() -> io::Error {
io::Error::new(
io::ErrorKind::Unsupported,
"Personality is only supported on io-uring driver",
)
}
#[cfg(io_uring)]
match self.driver.as_iour() {
Some(iour) => iour.register_personality(),
None => Err(unsupported()),
}
#[cfg(not(io_uring))]
Err(unsupported())
}
pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
fn unsupported(_: u16) -> io::Error {
io::Error::new(
io::ErrorKind::Unsupported,
"Personality is only supported on io-uring driver",
)
}
#[cfg(io_uring)]
match self.driver.as_iour() {
Some(iour) => iour.unregister_personality(personality),
None => Err(unsupported(personality)),
}
#[cfg(not(io_uring))]
Err(unsupported(personality))
}
}
impl AsRawFd for Proactor {
fn as_raw_fd(&self) -> RawFd {
self.driver.as_raw_fd()
}
}
#[derive(Debug)]
pub(crate) struct Entry {
key: ErasedKey,
result: io::Result<usize>,
#[cfg(io_uring)]
flags: u32,
}
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
impl Entry {
pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
#[cfg(not(io_uring))]
{
Self { key, result }
}
#[cfg(io_uring)]
{
Self {
key,
result,
flags: 0,
}
}
}
#[allow(dead_code)]
pub fn user_data(&self) -> usize {
self.key.as_raw()
}
#[allow(dead_code)]
pub fn into_key(self) -> ErasedKey {
self.key
}
#[cfg(io_uring)]
pub fn flags(&self) -> u32 {
self.flags
}
#[cfg(io_uring)]
pub(crate) fn set_flags(&mut self, flags: u32) {
self.flags = flags;
}
pub fn notify(self) {
#[cfg(io_uring)]
self.key.borrow().extra_mut().set_flags(self.flags());
self.key.set_result(self.result);
}
}
#[derive(Debug, Clone)]
enum ThreadPoolBuilder {
Create { limit: usize, recv_limit: Duration },
Reuse(AsyncifyPool),
}
impl Default for ThreadPoolBuilder {
fn default() -> Self {
Self::new()
}
}
impl ThreadPoolBuilder {
pub fn new() -> Self {
Self::Create {
limit: 256,
recv_limit: Duration::from_secs(60),
}
}
pub fn create_or_reuse(&self) -> AsyncifyPool {
match self {
Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
Self::Reuse(pool) => pool.clone(),
}
}
}
#[derive(Debug, Clone)]
pub struct ProactorBuilder {
capacity: u32,
pool_builder: ThreadPoolBuilder,
sqpoll_idle: Option<Duration>,
coop_taskrun: bool,
taskrun_flag: bool,
eventfd: Option<RawFd>,
driver_type: Option<DriverType>,
}
unsafe impl Send for ProactorBuilder {}
unsafe impl Sync for ProactorBuilder {}
impl Default for ProactorBuilder {
fn default() -> Self {
Self::new()
}
}
impl ProactorBuilder {
pub fn new() -> Self {
Self {
capacity: 1024,
pool_builder: ThreadPoolBuilder::new(),
sqpoll_idle: None,
coop_taskrun: false,
taskrun_flag: false,
eventfd: None,
driver_type: None,
}
}
pub fn capacity(&mut self, capacity: u32) -> &mut Self {
self.capacity = capacity;
self
}
pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
*limit = value;
}
self
}
pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
*recv_limit = timeout;
}
self
}
pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
self.pool_builder = ThreadPoolBuilder::Reuse(pool);
self
}
pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
self.reuse_thread_pool(self.create_or_get_thread_pool());
self
}
pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
self.pool_builder.create_or_reuse()
}
pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
self.sqpoll_idle = Some(idle);
self
}
pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
self.coop_taskrun = enable;
self
}
pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
self.taskrun_flag = enable;
self
}
pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
self.eventfd = Some(fd);
self
}
pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
self.driver_type = Some(t);
self
}
pub fn build(&self) -> io::Result<Proactor> {
Proactor::with_builder(self)
}
}