#![cfg_attr(docsrs, feature(doc_cfg))]
#![allow(unused_features)]
#![warn(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![doc(
html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
#![doc(
html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
use std::{any::Any, fmt::Debug, ptr::NonNull, task::Waker};
use crate::queue::{TaskId, TaskQueue};
mod join_handle;
mod queue;
mod task;
mod util;
mod waker;
use compio_log::{instrument, trace};
use compio_send_wrapper::SendWrapper;
use crossbeam_queue::ArrayQueue;
pub use join_handle::{JoinError, JoinHandle, ResumeUnwind};
use util::panic_guard;
cfg_if::cfg_if! {
if #[cfg(loom)] {
use loom::cell::UnsafeCell;
use loom::hint;
use loom::thread::yield_now;
use loom::sync::atomic::*;
} else {
use std::hint;
use std::thread::yield_now;
use std::sync::atomic::*;
#[repr(transparent)]
struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
impl<T> UnsafeCell<T> {
pub fn new(value: T) -> Self {
Self(std::cell::UnsafeCell::new(value))
}
#[inline(always)]
pub fn with_mut<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut T) -> R,
{
f(self.0.get())
}
#[inline(always)]
pub fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(*const T) -> R,
{
f(self.0.get())
}
}
}
}
pub(crate) type PanicResult<T> = Result<T, Panic>;
pub(crate) type Panic = Box<dyn Any + Send + 'static>;
#[derive(Debug)]
pub struct Executor {
ptr: NonNull<Shared>,
config: ExecutorConfig,
}
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
pub sync_queue_size: usize,
pub local_queue_size: usize,
pub max_interval: u32,
pub waker: Option<Waker>,
}
impl Default for ExecutorConfig {
fn default() -> Self {
Self {
sync_queue_size: 64,
local_queue_size: 64,
max_interval: 61,
waker: None,
}
}
}
pub(crate) struct Shared {
waker: Option<Waker>,
sync: ArrayQueue<TaskId>,
queue: SendWrapper<TaskQueue>,
}
impl Executor {
pub fn new() -> Self {
Self::with_config(ExecutorConfig::default())
}
pub fn with_config(mut config: ExecutorConfig) -> Self {
let ptr = Box::into_raw(Box::new(Shared {
waker: config.waker.take(),
sync: ArrayQueue::new(config.sync_queue_size),
queue: SendWrapper::new(TaskQueue::new(config.local_queue_size)),
}));
Self {
config,
ptr: unsafe { NonNull::new_unchecked(ptr) },
}
}
pub fn spawn<F: Future + 'static>(&self, fut: F) -> JoinHandle<F::Output> {
let shared = self.shared();
let tracker = shared.queue.tracker();
let queue = unsafe { shared.queue.get_unchecked() };
let task = queue.insert(self.ptr, tracker, fut);
JoinHandle::new(task)
}
pub fn tick(&self) -> bool {
let queue = self.queue();
while let Some(id) = self.shared().sync.pop() {
queue.make_hot(id);
}
for id in queue.iter_hot().take(self.config.max_interval as _) {
queue.make_cold(id);
let task = queue.take(id).expect("Task was not reset back");
let res = unsafe { task.run() };
if res.is_ready() {
unsafe { task.drop() };
queue.remove(id);
} else {
queue.reset(id, task);
}
}
queue.has_hot()
}
#[doc(hidden)]
pub fn has_task(&self) -> bool {
self.queue().hot_head().is_some()
}
pub fn clear(&self) {
instrument!(compio_log::Level::TRACE, "Executor::drop");
trace!("Dropping Executor");
while self.shared().sync.pop().is_some() {}
unsafe { self.queue().clear() };
}
#[inline(always)]
fn shared(&self) -> &Shared {
unsafe { self.ptr.as_ref() }
}
#[inline(always)]
fn queue(&self) -> &TaskQueue {
unsafe { self.shared().queue.get_unchecked() }
}
}
impl Drop for Executor {
fn drop(&mut self) {
self.clear();
unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
}
}
impl Default for Executor {
fn default() -> Self {
Self::new()
}
}