#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures_lite::pin;
use scoped_tls::scoped_thread_local;
use waker_fn::waker_fn;
#[cfg(feature = "async-io")]
use async_io::parking;
#[cfg(not(feature = "async-io"))]
use parking;
// Thread-local slot holding the `Executor` whose `enter()` closure is currently running on
// this thread. It is read by `Spawner::current()` and `Task::spawn()`.
scoped_thread_local!(static EX: Executor);
// Thread-local slot holding the `LocalExecutor` whose `run()` call is currently in progress
// on this thread. It is read by `Task::spawn()` and `Task::local()`.
scoped_thread_local!(static LOCAL_EX: LocalExecutor);
/// An executor for `Send` futures.
///
/// This is a thin wrapper around a reference-counted `multitask::Executor`; the same
/// underlying executor is shared with every [`Spawner`] created from it.
#[derive(Debug)]
pub struct Executor {
/// The shared underlying executor that tasks are spawned onto.
ex: Arc<multitask::Executor>,
}
impl Executor {
    /// Creates a new executor wrapping a fresh `multitask::Executor`.
    pub fn new() -> Executor {
        let inner = multitask::Executor::new();
        Executor {
            ex: Arc::new(inner),
        }
    }

    /// Returns a [`Spawner`] that shares this executor's underlying `multitask::Executor`.
    pub fn spawner(&self) -> Spawner {
        let ex = Arc::clone(&self.ex);
        Spawner { ex }
    }

    /// Spawns a `Send + 'static` future onto this executor and returns its [`Task`] handle.
    pub fn spawn<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {
        let handle = self.ex.spawn(future);
        Task(handle)
    }

    /// Runs `f` with this executor installed as the current one for the calling thread.
    ///
    /// While `f` runs, [`Spawner::current()`] and [`Task::spawn()`] resolve to this executor.
    ///
    /// # Panics
    ///
    /// Panics if the calling thread is already inside an `Executor`.
    pub fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
        assert!(
            !EX.is_set(),
            "cannot call `Executor::enter()` if already inside an `Executor`"
        );
        EX.set(self, f)
    }

    /// Drives `future` to completion on the calling thread and returns its output.
    ///
    /// Between polls of `future`, tasks of this executor are ticked on the same thread.
    /// The whole call happens inside [`Executor::enter()`].
    ///
    /// # Panics
    ///
    /// Panics if the calling thread is already inside an `Executor`.
    pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
        self.enter(|| {
            let (parker, unparker) = parking::pair();

            // The ticker is given a callback that unparks this thread.
            // NOTE(review): presumably the executor invokes it when new work is available
            // for this ticker — confirm against the `multitask` documentation.
            let ticker_unparker = unparker.clone();
            let ticker = self.ex.ticker(move || ticker_unparker.unpark());

            pin!(future);

            // Waking the main future also unparks this thread.
            let waker = waker_fn(move || unparker.unpark());
            let cx = &mut Context::from_waker(&waker);

            loop {
                if let Poll::Ready(output) = future.as_mut().poll(cx) {
                    break output;
                }

                // Tick at most 200 times, stopping at the first tick that returns `false`.
                let ticker_idle = (0..200).any(|_| !ticker.tick());

                if ticker_idle {
                    // A tick returned `false` (presumably no task was runnable), so block
                    // until the waker or the ticker callback unparks this thread.
                    parker.park();
                } else {
                    // All 200 ticks returned `true`: park with a zero timeout before
                    // re-polling the main future.
                    // NOTE(review): with the `async-io` feature this presumably lets the
                    // reactor process pending I/O events — confirm.
                    parker.park_timeout(Duration::from_secs(0));
                }
            }
        })
    }
}
impl Default for Executor {
fn default() -> Executor {
Executor::new()
}
}
/// A handle for spawning `Send` futures onto an [`Executor`].
///
/// It holds a reference-counted pointer to the same `multitask::Executor` as the
/// [`Executor`] it was created from.
#[derive(Debug)]
pub struct Spawner {
/// The shared underlying executor that tasks are spawned onto.
ex: Arc<multitask::Executor>,
}
impl Spawner {
    /// Returns a spawner for the [`Executor`] the calling thread is currently inside.
    ///
    /// # Panics
    ///
    /// Panics if the calling thread is not inside an `Executor`, that is, not within
    /// `Executor::enter()` or `Executor::run()`.
    pub fn current() -> Spawner {
        if !EX.is_set() {
            panic!("`Spawner::current()` must be called from an `Executor`");
        }
        EX.with(|current| current.spawner())
    }

    /// Spawns a `Send + 'static` future onto the associated executor and returns its
    /// [`Task`] handle.
    pub fn spawn<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {
        let handle = self.ex.spawn(future);
        Task(handle)
    }
}
/// An executor whose `spawn` accepts futures that are not required to be `Send`.
///
/// It wraps a `multitask::LocalExecutor` together with the parker that `run()` blocks on
/// while waiting for work.
#[derive(Debug)]
pub struct LocalExecutor {
/// The underlying local executor that tasks are spawned onto.
ex: multitask::LocalExecutor,
/// Parker used by `run()` to block the thread; its unparker is handed to `ex` in `new()`.
parker: parking::Parker,
}
impl LocalExecutor {
    /// Creates a new local executor.
    ///
    /// The underlying `multitask::LocalExecutor` is given a callback that unparks the
    /// parker stored in this executor.
    pub fn new() -> LocalExecutor {
        let (parker, unparker) = parking::pair();
        let ex = multitask::LocalExecutor::new(move || unparker.unpark());
        LocalExecutor { ex, parker }
    }

    /// Spawns a `'static` future (not necessarily `Send`) onto this executor and returns
    /// its [`Task`] handle.
    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
        let handle = self.ex.spawn(future);
        Task(handle)
    }

    /// Drives `future` to completion on the calling thread and returns its output.
    ///
    /// Between polls of `future`, tasks of this executor are ticked. For the duration of
    /// the call, this executor is the one that [`Task::local()`] resolves to on this thread.
    pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
        pin!(future);

        // Waking the main future unparks this executor's parker.
        let unparker = self.parker.unparker();
        let waker = waker_fn(move || unparker.unpark());
        let cx = &mut Context::from_waker(&waker);

        LOCAL_EX.set(self, || loop {
            if let Poll::Ready(output) = future.as_mut().poll(cx) {
                break output;
            }

            // Tick at most 200 times, stopping at the first tick that returns `false`.
            let executor_idle = (0..200).any(|_| !self.ex.tick());

            if executor_idle {
                // A tick returned `false` (presumably no task was runnable), so block
                // until something unparks this thread.
                self.parker.park();
            } else {
                // All 200 ticks returned `true`: park with a zero timeout before
                // re-polling the main future.
                // NOTE(review): with the `async-io` feature this presumably lets the
                // reactor process pending I/O events — confirm.
                self.parker.park_timeout(Duration::from_secs(0));
            }
        })
    }
}
impl Default for LocalExecutor {
fn default() -> LocalExecutor {
LocalExecutor::new()
}
}
/// A handle to a spawned task, wrapping a `multitask::Task`.
///
/// Awaiting the handle yields the task's output. As the `must_use` message states, a task
/// is canceled when its handle is dropped; call `detach()` to keep it running in the
/// background instead.
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(multitask::Task<T>);
impl<T> Task<T> {
    /// Spawns a `Send + 'static` future onto the executor the calling thread is inside.
    ///
    /// The current [`Executor`] is preferred; if there is none, the current
    /// [`LocalExecutor`] is used instead.
    ///
    /// # Panics
    ///
    /// Panics if the calling thread is inside neither an `Executor` nor a `LocalExecutor`.
    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T>
    where
        T: Send + 'static,
    {
        if EX.is_set() {
            return EX.with(|current| current.spawn(future));
        }
        if LOCAL_EX.is_set() {
            return LOCAL_EX.with(|current| current.spawn(future));
        }
        panic!("`Task::spawn()` must be called from an `Executor` or `LocalExecutor`")
    }

    /// Spawns a `'static` future (not necessarily `Send`) onto the [`LocalExecutor`] the
    /// calling thread is inside.
    ///
    /// # Panics
    ///
    /// Panics if the calling thread is not inside a `LocalExecutor`.
    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T>
    where
        T: 'static,
    {
        if !LOCAL_EX.is_set() {
            panic!("`Task::local()` must be called from a `LocalExecutor`");
        }
        LOCAL_EX.with(|current| current.spawn(future))
    }

    /// Detaches the task so that it keeps running in the background after this handle
    /// is gone.
    pub fn detach(self) {
        self.0.detach();
    }

    /// Cancels the task and waits for the cancellation to finish.
    ///
    /// Forwards to the wrapped `multitask::Task::cancel()`.
    /// NOTE(review): presumably returns `Some(output)` if the task had already completed
    /// and `None` otherwise — confirm against the `multitask` documentation.
    pub async fn cancel(self) -> Option<T> {
        let inner = self.0;
        inner.cancel().await
    }
}
/// Awaiting a [`Task`] polls the wrapped `multitask::Task` and yields its output.
impl<T> Future for Task<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // Re-pin the inner task and forward the poll to it.
        let inner = Pin::new(&mut self.0);
        inner.poll(cx)
    }
}