use alloc::{sync::Arc, task::Wake, vec::Vec};
use core::{cell::Cell, fmt, future::Future};
use crate::prelude::*;
pub trait Spawn: Clone {
fn spawn_local(&self, f: impl Future<Output = ()> + 'static);
#[inline(always)]
fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
self.spawn_local(f)
}
}
#[doc = include_str!("../examples/executor.rs")]
#[doc = include_str!("../examples/spawn.rs")]
#[doc = include_str!("../examples/recursive.rs")]
#[doc = include_str!("../examples/resume.rs")]
pub struct Executor<P: Pool = DefaultPool>(Arc<P>);
impl Default for Executor {
fn default() -> Self {
Self::new(Default::default())
}
}
impl<P: Pool> Clone for Executor<P> {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
impl<P: Pool + fmt::Debug> fmt::Debug for Executor<P> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Executor").field(&self.0).finish()
}
}
impl<P: Pool> Executor<P> {
#[inline(always)]
pub fn new(pool: P) -> Self {
Self(Arc::new(pool))
}
#[inline(always)]
pub fn block_on(self, f: impl Future<Output = ()> + 'static) {
#[cfg(feature = "web")]
wasm_bindgen_futures::spawn_local(f);
#[cfg(not(feature = "web"))]
block_on(f, self.0);
}
}
impl<P: Pool> Spawn for Executor<P> {
#[inline(always)]
fn spawn_local(&self, f: impl Future<Output = ()> + 'static) {
self.0.push(Box::pin(f.fuse()))
}
}
#[doc = include_str!("../examples/pool.rs")]
pub trait Pool {
type Park: Park;
fn push(&self, task: LocalBoxNotifier<'static, ()>);
fn drain(&self, tasks: &mut Vec<LocalBoxNotifier<'static, ()>>) -> bool;
}
pub trait Park: Default + Send + Sync + 'static {
fn park(&self);
fn unpark(&self);
}
#[derive(Default)]
pub struct DefaultPool {
spawning_queue: Cell<Vec<LocalBoxNotifier<'static, ()>>>,
}
impl fmt::Debug for DefaultPool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let queue = self.spawning_queue.take();
f.debug_struct("DefaultPool")
.field("spawning_queue", &queue)
.finish()?;
self.spawning_queue.set(queue);
Ok(())
}
}
impl Pool for DefaultPool {
type Park = DefaultPark;
#[inline(always)]
fn push(&self, task: LocalBoxNotifier<'static>) {
let mut queue = self.spawning_queue.take();
queue.push(task);
self.spawning_queue.set(queue);
}
#[inline(always)]
fn drain(&self, tasks: &mut Vec<LocalBoxNotifier<'static>>) -> bool {
let mut queue = self.spawning_queue.take();
let mut drained = queue.drain(..).peekable();
let has_drained = drained.peek().is_some();
tasks.extend(drained);
self.spawning_queue.set(queue);
has_drained
}
}
#[cfg(not(feature = "std"))]
#[derive(Copy, Clone, Debug, Default)]
pub struct DefaultPark;
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct DefaultPark(std::sync::atomic::AtomicBool, std::thread::Thread);
#[cfg(feature = "std")]
impl Default for DefaultPark {
fn default() -> Self {
Self(
std::sync::atomic::AtomicBool::new(true),
std::thread::current(),
)
}
}
impl Park for DefaultPark {
#[inline(always)]
fn park(&self) {
#[cfg(feature = "std")]
while self.0.swap(true, std::sync::atomic::Ordering::SeqCst) {
std::thread::park();
}
#[cfg(not(feature = "std"))]
core::hint::spin_loop();
}
#[inline(always)]
fn unpark(&self) {
#[cfg(feature = "std")]
if self.0.swap(false, std::sync::atomic::Ordering::SeqCst) {
self.1.unpark();
}
}
}
struct Unpark<P: Park>(P);
impl<P: Park> Wake for Unpark<P> {
#[inline(always)]
fn wake(self: Arc<Self>) {
self.0.unpark();
}
#[inline(always)]
fn wake_by_ref(self: &Arc<Self>) {
self.0.unpark();
}
}
#[cfg(not(feature = "web"))]
fn block_on<P: Pool>(f: impl Future<Output = ()> + 'static, pool: Arc<P>) {
let f: LocalBoxNotifier<'_> = Box::pin(f.fuse());
let tasks = &mut Vec::new();
let parky = Arc::new(Unpark(<P as Pool>::Park::default()));
let waker = parky.clone().into();
let tasky = &mut Task::from_waker(&waker);
tasks.push(f);
while !tasks.is_empty() {
let poll = Pin::new(tasks.as_mut_slice()).poll_next(tasky);
let Ready((task_index, ())) = poll else {
if !pool.drain(tasks) {
parky.0.park();
}
continue;
};
tasks.swap_remove(task_index);
}
}