#![warn(missing_docs)]
use core::fmt;
use std::{
cell::{Cell, RefCell},
future::Future,
hash::{Hash, Hasher},
pin::Pin,
ptr,
rc::Rc,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
#[cfg(feature = "futures")]
use futures::future::FusedFuture;
use slab::Slab;
fn dummy_raw_waker() -> RawWaker {
fn no_op(_: *const ()) {}
fn clone(_: *const ()) -> RawWaker {
dummy_raw_waker()
}
let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
RawWaker::new(std::ptr::null::<()>(), vtable)
}
fn dummy_waker() -> Waker {
unsafe { Waker::from_raw(dummy_raw_waker()) }
}
#[derive(Clone)]
struct EventHandleInner {
index: usize,
executor: Rc<ExecutorInner>,
}
impl fmt::Debug for EventHandleInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.index.fmt(f)
}
}
impl Eq for EventHandleInner {}
impl PartialEq for EventHandleInner {
fn eq(&self, other: &Self) -> bool {
self.index == other.index && ptr::eq(self.executor.as_ref(), other.executor.as_ref())
}
}
impl Hash for EventHandleInner {
fn hash<H: Hasher>(&self, state: &mut H) {
self.index.hash(state);
(self.executor.as_ref() as *const ExecutorInner).hash(state);
}
}
impl Drop for EventHandleInner {
fn drop(&mut self) {
self.executor.release_event_handle(self);
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventHandle(Rc<EventHandleInner>);
type SharedBool = Rc<Cell<bool>>;
pub struct EventFuture {
ready: SharedBool,
_handle: EventHandle,
done: bool,
}
impl Future for EventFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
if self.ready.get() {
self.done = true;
Poll::Ready(())
} else {
Poll::Pending
}
}
}
#[cfg(feature = "futures")]
impl FusedFuture for EventFuture {
fn is_terminated(&self) -> bool {
self.done
}
}
struct Task {
future: Pin<Box<dyn Future<Output = ()>>>,
}
impl Task {
pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
Task {
future: Box::pin(future),
}
}
fn poll(&mut self, context: &mut Context) -> Poll<()> {
self.future.as_mut().poll(context)
}
}
#[derive(Default)]
struct ExecutorInner {
task_queue: RefCell<Vec<Task>>,
new_tasks: RefCell<Vec<Task>>,
events: RefCell<Slab<SharedBool>>,
}
impl ExecutorInner {
fn release_event_handle(&self, event: &EventHandleInner) {
self.events.borrow_mut().remove(event.index);
}
}
#[derive(Clone, Default)]
pub struct Executor {
inner: Rc<ExecutorInner>,
}
impl Executor {
pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
self.inner.new_tasks.borrow_mut().push(Task::new(future));
}
pub fn create_event_handle(&self) -> EventHandle {
let mut events = self.inner.events.borrow_mut();
let index = events.insert(Rc::new(Cell::new(false)));
EventHandle(Rc::new(EventHandleInner {
index,
executor: self.inner.clone(),
}))
}
pub fn notify_event(&self, handle: &EventHandle) {
self.inner.events.borrow_mut()[handle.0.index].replace(true);
}
pub fn event(&self, handle: &EventHandle) -> EventFuture {
let ready = self.inner.events.borrow_mut()[handle.0.index].clone();
EventFuture {
ready,
_handle: handle.clone(),
done: false,
}
}
pub fn step(&self) -> bool {
let waker = dummy_waker();
let mut context = Context::from_waker(&waker);
let mut tasks = self.inner.task_queue.borrow_mut();
tasks.append(&mut self.inner.new_tasks.borrow_mut());
let mut uncompleted_tasks = Vec::new();
let mut any_left = false;
for mut task in tasks.drain(..) {
match task.poll(&mut context) {
Poll::Ready(()) => {} Poll::Pending => {
uncompleted_tasks.push(task);
any_left = true;
}
}
}
*tasks = uncompleted_tasks;
for (_, event) in self.inner.events.borrow_mut().iter_mut() {
event.replace(false);
}
any_left
}
}
#[cfg(test)]
mod tests {
use super::*;
async fn nop() {}
#[test]
fn test_nop() {
let executor = Executor::default();
executor.spawn(nop());
assert_eq!(executor.step(), false);
}
#[test]
fn test_event() {
let executor = Executor::default();
let events = [
executor.create_event_handle(),
executor.create_event_handle(),
];
async fn wait_event(events: [EventHandle; 2], executor: Executor) {
println!("before awaits");
executor.event(&events[0]).await;
println!("between awaits");
executor.event(&events[1]).await;
println!("after awaits");
}
executor.spawn(wait_event(events.clone(), executor.clone()));
println!("spawned");
assert_eq!(executor.step(), true);
assert_eq!(executor.inner.task_queue.borrow().len(), 1);
println!("step 1");
assert_eq!(executor.step(), true);
println!("step 2");
executor.notify_event(&events[0]);
println!("notified 1");
assert_eq!(executor.step(), true);
executor.notify_event(&events[1]);
println!("notified 2");
assert_eq!(executor.step(), false);
println!("step 3");
assert_eq!(executor.inner.task_queue.borrow().len(), 0);
}
#[test]
#[cfg(feature = "futures")]
fn test_select() {
use futures::select;
let first_event_id = Rc::new(Cell::new(2));
async fn wait_event(
events: [EventHandle; 2],
event_loop: Executor,
first_event_id: Rc<Cell<usize>>,
) {
println!("before select");
let mut fut0 = event_loop.event(&events[0]);
let mut fut1 = event_loop.event(&events[1]);
select! {
() = fut0 => { println!("event 0 fired first"); first_event_id.set(0); },
() = fut1 => { println!("event 1 fired first"); first_event_id.set(1); }
}
println!("after select");
}
for i in 0..2 {
println!("Testing event {}", i);
let executor = Executor::default();
{
let events = [
executor.create_event_handle(),
executor.create_event_handle(),
];
executor.spawn(wait_event(
events.clone(),
executor.clone(),
first_event_id.clone(),
));
println!("spawned");
assert_eq!(executor.step(), true);
assert_eq!(executor.inner.task_queue.borrow().len(), 1);
println!("step 1");
assert_eq!(executor.step(), true);
println!("step 2");
executor.notify_event(&events[i]);
println!("notified");
assert_eq!(executor.step(), false);
println!("step 3");
assert_eq!(first_event_id.get(), i);
assert_eq!(executor.inner.task_queue.borrow().len(), 0);
}
assert_eq!(executor.inner.events.borrow().len(), 0);
}
}
}