use std::{
cell::RefCell,
future::Future,
mem::transmute,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::{sync_channel, Receiver},
Arc,
},
task::{ready, Context, Poll, Wake, Waker},
};
use slab::Slab;
use crate::{
futures::event::{Event, EventWaiter},
reactor::Reactor,
};
struct TaskId(AtomicUsize);
impl Wake for TaskId {
fn wake(self: Arc<TaskId>) {
EXEC.with(|exec| {
let mut exec = exec.borrow_mut();
if let Some(task) = exec.waiting.try_remove(self.0.load(Ordering::Relaxed)) {
exec.run_q.push(task);
}
});
}
}
struct Task {
id: Arc<TaskId>,
future: Pin<Box<dyn Future<Output = ()>>>,
}
pub struct Executor {
waiting: Slab<Task>,
run_q: Vec<Task>,
}
thread_local! {
static EXEC: RefCell<Executor> = const { RefCell::new(
Executor {
waiting: Slab::new(),
run_q: Vec::new(),
}
)}
}
pub struct TaskJoiner<'a, T> {
rx: Receiver<T>,
_evt: Event,
finished: EventWaiter<'a>,
}
impl<'a, T> TaskJoiner<'a, T> {
pub fn join(self) -> T {
self.rx.recv().unwrap()
}
}
impl<'a, T> Future for TaskJoiner<'a, T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.finished).poll(cx)).unwrap();
Poll::Ready(self.rx.recv().unwrap())
}
}
impl Executor {
pub fn spawn<'a, Fut, T>(f: Fut) -> TaskJoiner<'a, T>
where
Fut: Future<Output = T> + 'static,
T: Send + 'static,
{
let (tx, rx) = sync_channel(1);
let mut evt = Event::new().unwrap();
let evt2 = evt.clone();
let fut = async move {
let value = f.await;
let _ = evt2.notify_one();
let _ = tx.send(value);
};
let task = Task {
id: Arc::new(TaskId(AtomicUsize::new(0))),
future: Box::pin(fut),
};
EXEC.with(|exec| {
exec.borrow_mut().run_q.push(task);
});
let waiter: EventWaiter<'static> = unsafe { transmute(evt.wait()) };
TaskJoiner {
rx,
_evt: evt,
finished: waiter,
}
}
pub fn block_on<Fut, T>(f: Fut) -> T
where
Fut: Future<Output = T> + 'static,
T: Send + 'static,
{
let joiner = Self::spawn(f);
Self::executor_loop();
joiner.join()
}
pub fn run() {
Self::executor_loop()
}
fn executor_loop() {
EXEC.with(|exec| loop {
if exec.borrow().run_q.is_empty() {
Reactor::react();
}
let mut task = exec.borrow_mut().run_q.pop().unwrap();
let waker = Waker::from(task.id.clone());
let mut cx = Context::from_waker(&waker);
match task.future.as_mut().poll(&mut cx) {
Poll::Ready(()) => {}
Poll::Pending => {
let waiting = &mut exec.borrow_mut().waiting;
let slot = waiting.vacant_entry();
task.id.0.store(slot.key(), Ordering::Relaxed);
slot.insert(task);
}
}
if exec.borrow().run_q.is_empty() && exec.borrow().waiting.is_empty() {
return;
}
});
}
}