#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
#![warn(missing_docs)]
// Build-time guard: when targeting Linux with neither the `io-uring` nor the `polling`
// feature enabled, stop compilation with an explicit message. The backend selection further
// down in this file still has a generic `unix` branch, so without this guard such a build
// would silently pick that branch instead of reporting the missing feature.
#[cfg(all(
    target_os = "linux",
    not(feature = "io-uring"),
    not(feature = "polling")
))]
compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
use std::{io, task::Poll, time::Duration};
use compio_buf::BufResult;
use compio_log::{instrument, trace};
use slab::Slab;
mod key;
pub use key::Key;
pub mod op;
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(all())))]
mod unix;
mod asyncify;
pub use asyncify::*;
// Select exactly one backend and mount it as the private module `sys`.
// The branches are evaluated top to bottom and the first match wins:
//   * Windows                                   -> `iocp/mod.rs`
//   * Linux with both `polling` and `io-uring`  -> `fusion/mod.rs`
//   * Linux with `io-uring` only                -> `iour/mod.rs`
//   * any other unix configuration              -> `poll/mod.rs`
// There is no fallback branch: on a target that is neither Windows nor unix no `sys` module is
// declared here.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        #[path = "iocp/mod.rs"]
        mod sys;
    } else if #[cfg(all(target_os = "linux", feature = "polling", feature = "io-uring"))] {
        // Must be tested before the `io-uring`-only branch below, which would also match.
        #[path = "fusion/mod.rs"]
        mod sys;
    } else if #[cfg(all(target_os = "linux", feature = "io-uring"))] {
        #[path = "iour/mod.rs"]
        mod sys;
    } else if #[cfg(unix)] {
        #[path = "poll/mod.rs"]
        mod sys;
    }
}
pub use sys::*;
/// Evaluates a raw system call expression and turns its return value into a
/// `std::io::Result` (Windows flavour).
///
/// Forms:
/// * `syscall!(BOOL, expr)` - the call failed if it returned `0`.
/// * `syscall!(SOCKET, expr)` - the call failed if it returned anything other than `0`.
/// * `syscall!(HANDLE, expr)` - the call failed if it returned `INVALID_HANDLE_VALUE`.
/// * `syscall!(expr, <op> <rhs>)` - general form: the call failed if `result <op> <rhs>` is true.
///
/// On failure the result is `Err(std::io::Error::last_os_error())`; otherwise it is
/// `Ok(result)` carrying the raw return value.
#[cfg(windows)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
    // The keyword arms come first and simply forward to the general arm with the failure
    // predicate filled in.
    (BOOL, $e:expr) => {
        $crate::syscall!($e, == 0)
    };
    (SOCKET, $e:expr) => {
        $crate::syscall!($e, != 0)
    };
    (HANDLE, $e:expr) => {
        $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
    };
    // General arm: `$op $rhs` is the comparison that identifies a failed call.
    ($e:expr, $op: tt $rhs: expr) => {{
        // The expression is wrapped in `unsafe` so callers can pass FFI calls directly; the
        // `allow` silences the lint when the call site is already inside an unsafe context.
        #[allow(unused_unsafe)]
        let res = unsafe { $e };
        if res $op $rhs {
            // The error code is read immediately after the call, before anything else runs.
            Err(::std::io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}
/// Evaluates a raw system call expression and turns its return value into a
/// `std::io::Result` (unix flavour).
///
/// Forms:
/// * `syscall!(expr)` - the call failed if it returned `-1`; yields
///   `Err(std::io::Error::last_os_error())` in that case and `Ok(result)` otherwise.
/// * `syscall!(break expr)` - like the plain form, but produces a `std::task::Poll`:
///   success becomes `Poll::Ready(Ok(result as usize))`, an error whose kind is `WouldBlock`
///   or whose OS code is `EINPROGRESS` becomes `Poll::Pending`, and any other error becomes
///   `Poll::Ready(Err(..))`.
/// * `syscall!(expr, variant(fd))` - runs the `break` form and maps it to an
///   `io::Result<Decision>`: pending becomes `Ok(Decision::variant(fd))`, success becomes
///   `Ok(Decision::Completed(result))`, and errors are returned as `Err`.
#[cfg(unix)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
    // Arms are tried in order, so this one must stay ahead of the catch-all `$e:expr` arm,
    // which would otherwise accept `break <expr>` as an ordinary expression.
    (break $e:expr) => {
        match $crate::syscall!($e) {
            Ok(fd) => ::std::task::Poll::Ready(Ok(fd as usize)),
            // "Would block" and "in progress" are not failures here: they are reported as
            // `Pending` rather than as an error.
            Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
                   => ::std::task::Poll::Pending,
            Err(e) => ::std::task::Poll::Ready(Err(e)),
        }
    };
    // `$f` names a variant of the backend's `Decision` type that takes `$fd` as its payload.
    ($e:expr, $f:ident($fd:expr)) => {
        match $crate::syscall!(break $e) {
            ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
            ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
            ::std::task::Poll::Ready(Err(e)) => Err(e),
        }
    };
    // Base form used by the two arms above.
    ($e:expr) => {{
        // The expression is wrapped in `unsafe` so callers can pass FFI calls directly; the
        // `allow` silences the lint when the call site is already inside an unsafe context.
        #[allow(unused_unsafe)]
        let res = unsafe { $e };
        if res == -1 {
            // The error code is read immediately after the call, before anything else runs.
            Err(::std::io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}
/// Implements this crate's `AsRawFd`, `FromRawFd` and `IntoRawFd` for a wrapper type by
/// delegating to one of its fields.
///
/// `impl_raw_fd!(Type, field)` expects `Type` to be a struct whose only field is `field`
/// (the generated `from_raw_fd` builds the value with a struct literal that names just that
/// field), and expects the field's type to provide the same three operations.
#[macro_export]
#[doc(hidden)]
macro_rules! impl_raw_fd {
    ($t:ty, $inner:ident) => {
        impl $crate::AsRawFd for $t {
            fn as_raw_fd(&self) -> $crate::RawFd {
                // Resolved with method-call syntax, i.e. against whatever `as_raw_fd` is
                // visible for the field's type at the expansion site.
                self.$inner.as_raw_fd()
            }
        }
        impl $crate::FromRawFd for $t {
            // The safety contract is inherited unchanged from the field type's
            // `FromRawFd::from_raw_fd`, which this simply forwards to.
            unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
                Self {
                    $inner: $crate::FromRawFd::from_raw_fd(fd),
                }
            }
        }
        impl $crate::IntoRawFd for $t {
            fn into_raw_fd(self) -> $crate::RawFd {
                self.$inner.into_raw_fd()
            }
        }
    };
}
/// Outcome of submitting an operation: it is either still in flight or already finished.
pub enum PushEntry<K, R> {
    /// The operation has not finished yet; `K` is the key that identifies it.
    Pending(K),
    /// The operation finished immediately; `R` is its result.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Transforms the key of a [`PushEntry::Pending`] value with `f`.
    ///
    /// A [`PushEntry::Ready`] value is passed through untouched and `f` is not called.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        let key = match self {
            PushEntry::Ready(result) => return PushEntry::Ready(result),
            PushEntry::Pending(key) => key,
        };
        PushEntry::Pending(f(key))
    }

    /// Transforms the result of a [`PushEntry::Ready`] value with `f`.
    ///
    /// A [`PushEntry::Pending`] value is passed through untouched and `f` is not called.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        let result = match self {
            PushEntry::Pending(key) => return PushEntry::Pending(key),
            PushEntry::Ready(result) => result,
        };
        PushEntry::Ready(f(result))
    }
}
/// Low-level proactor: pairs the platform `Driver` with the table of operations that have
/// been pushed to it.
///
/// Every pushed operation is stored in a slab. Its slab index is the `user_data` value by
/// which the operation is referred to in [`Proactor::cancel`], [`Proactor::has_result`] and in
/// the values written by [`Proactor::poll`].
pub struct Proactor {
    /// Platform backend that operations are submitted to.
    driver: Driver,
    /// Operations that were pushed and not yet removed, indexed by `user_data`.
    ops: Slab<RawOp>,
}

impl Proactor {
    /// Creates a proactor from a [`ProactorBuilder`] with its default settings.
    ///
    /// # Errors
    ///
    /// Returns the error reported while constructing the driver.
    pub fn new() -> io::Result<Self> {
        ProactorBuilder::new().build()
    }

    /// Returns a fresh [`ProactorBuilder`] for configuring a proactor before building it.
    pub fn builder() -> ProactorBuilder {
        ProactorBuilder::new()
    }

    /// Builds the proactor described by `builder`: the driver is created first, then the
    /// operation table is pre-sized to the builder's capacity.
    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
        let driver = Driver::new(builder)?;
        let ops = Slab::with_capacity(builder.capacity as usize);
        Ok(Self { driver, ops })
    }

    /// Attaches `fd` to the driver by forwarding to the backend's `attach`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend.
    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
        self.driver.attach(fd)
    }

    /// Cancels the operation identified by `user_data`.
    ///
    /// If the operation is present and marking it as cancelled reports `true`, it is removed
    /// from the table and the driver is not involved. In every other case (including an
    /// unknown `user_data`) the request is forwarded to the driver.
    pub fn cancel(&mut self, user_data: usize) {
        instrument!(compio_log::Level::DEBUG, "cancel", user_data);
        // NOTE(review): a `true` return from `set_cancelled` presumably means the operation
        // had already completed, so only the bookkeeping entry is left to clean up - confirm
        // against `RawOp`.
        let removable = self
            .ops
            .get_mut(user_data)
            .map(|op| op.set_cancelled())
            .unwrap_or(false);
        if removable {
            trace!("cancel and remove {}", user_data);
            self.ops.remove(user_data);
        } else {
            self.driver.cancel(user_data, &mut self.ops);
        }
    }

    /// Submits `op` to the driver.
    ///
    /// Returns [`PushEntry::Pending`] with the key of the operation when the driver did not
    /// finish it right away, or [`PushEntry::Ready`] with the result and the operation itself
    /// when it did. In the ready case the operation is no longer tracked by the proactor.
    pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
        // Reserve the slot first: the driver needs the final index while creating the op.
        let slot = self.ops.vacant_entry();
        let user_data = slot.key();
        let raw = self.driver.create_op(user_data, op);
        let raw = slot.insert(raw);
        if let Poll::Ready(res) = self.driver.push(user_data, raw) {
            let mut finished = self.ops.remove(user_data);
            finished.set_result(res);
            // SAFETY: `finished` is the entry that was created from a `T` a few lines above,
            // so extracting it as `T` matches the type that was stored.
            return PushEntry::Ready(unsafe { finished.into_inner::<T>() });
        }
        // SAFETY: `user_data` is the slab index at which the op created from `T` was just
        // inserted. NOTE(review): the exact contract of `Key::new` is not visible here.
        PushEntry::Pending(unsafe { Key::new(user_data) })
    }

    /// Polls the driver, waiting at most `timeout` when one is given.
    ///
    /// The `user_data` of operations reported by the driver are appended to `entries`, except
    /// for those that the completion bookkeeping removes instead of reporting.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend's `poll`.
    pub fn poll(
        &mut self,
        timeout: Option<Duration>,
        entries: &mut impl Extend<usize>,
    ) -> io::Result<()> {
        let sink = OutEntries::new(entries, &mut self.ops);
        // NOTE(review): the safety contract of the backend's `poll` is not visible in this
        // file; the call is kept exactly as it was.
        unsafe {
            self.driver.poll(timeout, sink)?;
        }
        Ok(())
    }

    /// Removes the operation identified by `user_data` and returns its result together with
    /// the operation itself.
    ///
    /// # Panics
    ///
    /// Panics if no operation is stored under `user_data`.
    pub fn pop<T: OpCode>(&mut self, user_data: Key<T>) -> BufResult<usize, T> {
        instrument!(compio_log::Level::DEBUG, "pop", ?user_data);
        let index = *user_data;
        let op = self
            .ops
            .try_remove(index)
            .expect("the entry should be valid");
        trace!("poped {}", index);
        // SAFETY: relies on the typed key - `Key<T>` is presumably only handed out for an op
        // that was pushed as `T` (see `push`).
        unsafe { op.into_inner::<T>() }
    }

    /// Returns `true` when the operation stored under `user_data` reports having a result.
    ///
    /// Returns `false` when no operation is stored under `user_data`.
    pub fn has_result(&self, user_data: usize) -> bool {
        match self.ops.get(user_data) {
            Some(op) => op.has_result(),
            None => false,
        }
    }

    /// Returns a `NotifyHandle` obtained from the driver.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend.
    pub fn handle(&self) -> io::Result<NotifyHandle> {
        self.driver.handle()
    }
}
/// Exposes the raw descriptor of the underlying driver.
impl AsRawFd for Proactor {
    /// Returns whatever the driver reports as its raw descriptor.
    fn as_raw_fd(&self) -> RawFd {
        self.driver.as_raw_fd()
    }
}
/// A single completion record: which operation finished and with what outcome.
#[derive(Debug)]
pub(crate) struct Entry {
    /// Identifier of the operation this record belongs to.
    user_data: usize,
    /// Outcome carried by this record.
    result: io::Result<usize>,
}

impl Entry {
    /// Bundles an operation identifier with its outcome.
    pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
        Entry { user_data, result }
    }

    /// Returns the identifier of the operation this record belongs to.
    pub fn user_data(&self) -> usize {
        let Entry { user_data, .. } = self;
        *user_data
    }

    /// Consumes the record and returns the outcome it carried.
    pub fn into_result(self) -> io::Result<usize> {
        let Entry { result, .. } = self;
        result
    }
}
/// Adapter handed to the driver while polling: it accepts `Entry` values, records each
/// result in the operation table and forwards identifiers to the caller's collection.
struct OutEntries<'a, 'b, E> {
    /// Caller-supplied collection that receives the identifiers.
    entries: &'b mut E,
    /// Table of operations, indexed by the identifier carried in each entry.
    registry: &'a mut Slab<RawOp>,
}

impl<'a, 'b, E> OutEntries<'a, 'b, E> {
    /// Wraps the caller's collection together with the operation table.
    pub fn new(entries: &'b mut E, registry: &'a mut Slab<RawOp>) -> Self {
        OutEntries { entries, registry }
    }

    /// Gives access to the wrapped operation table.
    // Not every backend calls this, hence the lint exemption.
    #[allow(dead_code)]
    pub fn registry(&mut self) -> &mut Slab<RawOp> {
        &mut *self.registry
    }
}
impl<E: Extend<usize>> Extend<Entry> for OutEntries<'_, '_, E> {
    /// Stores the result of every incoming entry in the operation table and forwards the
    /// identifiers to the wrapped collection in a single `extend` call.
    ///
    /// An entry whose `set_result` call returns `true` is removed from the table and is not
    /// forwarded.
    ///
    /// # Panics
    ///
    /// Panics if an entry refers to an identifier that is not present in the table.
    fn extend<T: IntoIterator<Item = Entry>>(&mut self, iter: T) {
        let registry = &mut *self.registry;
        let reported = iter.into_iter().filter_map(|entry| {
            let user_data = entry.user_data();
            // NOTE(review): `true` presumably means nobody is waiting for this operation any
            // more (e.g. it was cancelled) - confirm against `RawOp::set_result`.
            let discard = registry[user_data].set_result(entry.into_result());
            if discard {
                registry.remove(user_data);
                None
            } else {
                Some(user_data)
            }
        });
        self.entries.extend(reported)
    }
}
/// Describes where the `AsyncifyPool` of a proactor comes from.
#[derive(Debug, Clone)]
enum ThreadPoolBuilder {
    /// Create a new pool from these two settings, which are passed to `AsyncifyPool::new`.
    Create { limit: usize, recv_limit: Duration },
    /// Use a clone of an already existing pool.
    Reuse(AsyncifyPool),
}

impl Default for ThreadPoolBuilder {
    /// Same as [`ThreadPoolBuilder::new`].
    fn default() -> Self {
        ThreadPoolBuilder::new()
    }
}

impl ThreadPoolBuilder {
    /// `limit` used when nothing else is configured.
    const DEFAULT_LIMIT: usize = 256;
    /// `recv_limit` used when nothing else is configured.
    const DEFAULT_RECV_LIMIT: Duration = Duration::from_secs(60);

    /// Returns the default configuration: create a new pool with a limit of 256 and a
    /// receive limit of 60 seconds.
    pub fn new() -> Self {
        Self::Create {
            limit: Self::DEFAULT_LIMIT,
            recv_limit: Self::DEFAULT_RECV_LIMIT,
        }
    }

    /// Produces the pool described by this value: a newly created pool for `Create`, a clone
    /// of the stored pool for `Reuse`.
    pub fn create_or_reuse(&self) -> AsyncifyPool {
        match *self {
            Self::Reuse(ref pool) => pool.clone(),
            Self::Create { limit, recv_limit } => AsyncifyPool::new(limit, recv_limit),
        }
    }
}
/// Configuration from which a [`Proactor`] is built.
#[derive(Debug, Clone)]
pub struct ProactorBuilder {
    /// Capacity used to pre-size the proactor's operation table.
    capacity: u32,
    /// Where the thread pool comes from: newly created or reused.
    pool_builder: ThreadPoolBuilder,
}

impl Default for ProactorBuilder {
    /// Same as [`ProactorBuilder::new`].
    fn default() -> Self {
        ProactorBuilder::new()
    }
}

impl ProactorBuilder {
    /// Creates a builder with a capacity of 1024 and the default thread pool settings.
    pub fn new() -> Self {
        let pool_builder = ThreadPoolBuilder::new();
        Self {
            capacity: 1024,
            pool_builder,
        }
    }

    /// Sets the capacity.
    ///
    /// The value pre-sizes the proactor's operation table. The driver is also constructed from
    /// this builder and may read it.
    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
        self.capacity = capacity;
        self
    }

    /// Sets the `limit` of the thread pool that will be created.
    ///
    /// Has no effect once a pool to reuse has been set.
    // NOTE(review): `limit` is presumably the maximum number of worker threads - confirm
    // against `AsyncifyPool::new`.
    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
        match self.pool_builder {
            ThreadPoolBuilder::Create { ref mut limit, .. } => *limit = value,
            ThreadPoolBuilder::Reuse(_) => {}
        }
        self
    }

    /// Sets the receive timeout of the thread pool that will be created.
    ///
    /// Has no effect once a pool to reuse has been set.
    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
        match self.pool_builder {
            ThreadPoolBuilder::Create {
                ref mut recv_limit, ..
            } => *recv_limit = timeout,
            ThreadPoolBuilder::Reuse(_) => {}
        }
        self
    }

    /// Makes the builder use `pool` instead of creating a new thread pool.
    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
        self
    }

    /// Resolves the thread pool now and stores it in the builder for reuse.
    ///
    /// Afterwards every call to [`ProactorBuilder::create_or_get_thread_pool`] on this builder
    /// (or on a clone of it) returns a clone of that stored pool.
    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
        let pool = self.create_or_get_thread_pool();
        self.reuse_thread_pool(pool)
    }

    /// Returns the thread pool described by this builder: a newly created pool, or a clone of
    /// the pool that was set for reuse.
    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
        self.pool_builder.create_or_reuse()
    }

    /// Builds a [`Proactor`] from the current settings.
    ///
    /// # Errors
    ///
    /// Returns the error reported while constructing the driver.
    pub fn build(&self) -> io::Result<Proactor> {
        Proactor::with_builder(self)
    }
}