simple_async_local_executor/
lib.rs

1#![warn(missing_docs)]
2
3//! Single-threaded polling-based executor suitable for use in games, embedded systems or WASM.
4//!
//! This crate provides an executor to run async functions in a single-threaded
//! environment with deterministic execution. To do so, the executor provides an
//! [`Executor::step()`] method that polls all non-blocked tasks exactly once. The executor also
//! provides events that can be waited upon. These events are referred to by the [`EventHandle`]
//! type, which is instantiated by calling [`Executor::create_event_handle()`], and can be waited
//! on by creating an [`EventFuture`] with the [`Executor::event()`] method. They can be activated
//! by calling the [`Executor::notify_event()`] method.
12//!
13//! # Example
14//! ```
15//! # use simple_async_local_executor::*;
16//! let executor = Executor::default();
17//! let events = [executor.create_event_handle(), executor.create_event_handle()];
18//!
19//! async fn wait_event(events: [EventHandle; 2], executor: Executor) {
20//!     executor.event(&events[0]).await;
21//!     executor.event(&events[1]).await;
22//! }
23//!
24//! executor.spawn(wait_event(events.clone(), executor.clone()));
25//! assert_eq!(executor.step(), true);
26//! assert_eq!(executor.step(), true);
27//! executor.notify_event(&events[0]);
28//! assert_eq!(executor.step(), true);
29//! executor.notify_event(&events[1]);
30//! assert_eq!(executor.step(), false);
31//! ```
32
33use core::fmt;
34use std::{
35    cell::{Cell, RefCell},
36    future::Future,
37    hash::{Hash, Hasher},
38    pin::Pin,
39    ptr,
40    rc::Rc,
41    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
42};
43
44#[cfg(feature = "futures")]
45use futures::future::FusedFuture;
46use slab::Slab;
47
48// Useful reading for people interested in writing executors:
49// - https://os.phil-opp.com/async-await/
50// - https://rust-lang.github.io/async-book/02_execution/01_chapter.html
51// - https://github.com/rust-lang/rfcs/blob/master/text/2592-futures.md#rationale-drawbacks-and-alternatives-to-the-wakeup-design-waker
52
/// Builds a [`RawWaker`] whose clone, wake, wake-by-ref and drop hooks all do nothing.
///
/// The data pointer is null and is never read by any of the hooks.
fn dummy_raw_waker() -> RawWaker {
    fn ignore(_data: *const ()) {}
    fn duplicate(_data: *const ()) -> RawWaker {
        dummy_raw_waker()
    }

    // A single shared vtable: cloning hands out another inert waker, every
    // other operation is a no-op.
    static VTABLE: RawWakerVTable = RawWakerVTable::new(duplicate, ignore, ignore, ignore);
    RawWaker::new(std::ptr::null(), &VTABLE)
}
62fn dummy_waker() -> Waker {
63    unsafe { Waker::from_raw(dummy_raw_waker()) }
64}
65
66#[derive(Clone)]
67struct EventHandleInner {
68    index: usize,
69    executor: Rc<ExecutorInner>,
70}
71impl fmt::Debug for EventHandleInner {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        self.index.fmt(f)
74    }
75}
76impl Eq for EventHandleInner {}
77impl PartialEq for EventHandleInner {
78    fn eq(&self, other: &Self) -> bool {
79        self.index == other.index && ptr::eq(self.executor.as_ref(), other.executor.as_ref())
80    }
81}
82impl Hash for EventHandleInner {
83    fn hash<H: Hasher>(&self, state: &mut H) {
84        self.index.hash(state);
85        (self.executor.as_ref() as *const ExecutorInner).hash(state);
86    }
87}
88impl Drop for EventHandleInner {
89    fn drop(&mut self) {
90        self.executor.release_event_handle(self);
91    }
92}
93/// A handle for an event, can be kept and cloned around
94#[derive(Clone, Debug, PartialEq, Eq, Hash)]
95pub struct EventHandle(Rc<EventHandleInner>);
96
/// Flag shared between the executor's event table and the futures awaiting that event.
type SharedBool = Rc<Cell<bool>>;
98
99/// A future to await an event
100pub struct EventFuture {
101    ready: SharedBool,
102    _handle: EventHandle,
103    done: bool,
104}
105impl Future for EventFuture {
106    type Output = ();
107    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
108        if self.ready.get() {
109            self.done = true;
110            Poll::Ready(())
111        } else {
112            Poll::Pending
113        }
114    }
115}
#[cfg(feature = "futures")]
impl FusedFuture for EventFuture {
    /// Reports whether this future has already returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        let Self { done, .. } = self;
        *done
    }
}
122
/// A spawned future, boxed and pinned so that futures of different concrete
/// types can be stored in the same queue.
struct Task {
    future: Pin<Box<dyn Future<Output = ()>>>,
}
impl Task {
    /// Moves `future` to the heap and pins it there.
    pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
        let future: Pin<Box<dyn Future<Output = ()>>> = Box::pin(future);
        Task { future }
    }
    /// Polls the wrapped future once and forwards its result.
    fn poll(&mut self, context: &mut Context) -> Poll<()> {
        Future::poll(self.future.as_mut(), context)
    }
}
136
137#[derive(Default)]
138struct ExecutorInner {
139    task_queue: RefCell<Vec<Task>>,
140    new_tasks: RefCell<Vec<Task>>,
141    events: RefCell<Slab<SharedBool>>,
142}
143impl ExecutorInner {
144    fn release_event_handle(&self, event: &EventHandleInner) {
145        self.events.borrow_mut().remove(event.index);
146    }
147}
148
149/// Single-threaded polling-based executor
150///
151/// This is a thin-wrapper (using [`Rc`]) around the real executor, so that this struct can be
152/// cloned and passed around.
153///
154/// See the [module documentation] for more details.
155///
156/// [module documentation]: index.html
157#[derive(Clone, Default)]
158pub struct Executor {
159    inner: Rc<ExecutorInner>,
160}
161impl Executor {
162    /// Spawn a new task to be run by this executor.
163    ///
164    /// # Example
165    /// ```
166    /// # use simple_async_local_executor::*;
167    /// async fn nop() {}
168    /// let executor = Executor::default();
169    /// executor.spawn(nop());
170    /// assert_eq!(executor.step(), false);
171    /// ```
172    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
173        self.inner.new_tasks.borrow_mut().push(Task::new(future));
174    }
175    /// Create an event handle, that can be used to [await](Executor::event()) and [notify](Executor::notify_event()) an event.
176    pub fn create_event_handle(&self) -> EventHandle {
177        let mut events = self.inner.events.borrow_mut();
178        let index = events.insert(Rc::new(Cell::new(false)));
179        EventHandle(Rc::new(EventHandleInner {
180            index,
181            executor: self.inner.clone(),
182        }))
183    }
184    /// Notify an event.
185    ///
186    /// All tasks currently waiting on this event will
187    /// progress at the next call to [`step`](Executor::step()).
188    pub fn notify_event(&self, handle: &EventHandle) {
189        self.inner.events.borrow_mut()[handle.0.index].replace(true);
190    }
191    /// Create an event future.
192    ///
193    /// Once this future is awaited, its task will be blocked until the next [`step`](Executor::step())
194    /// after [`notify_event`](Executor::notify_event()) is called with this `handle`.
195    pub fn event(&self, handle: &EventHandle) -> EventFuture {
196        let ready = self.inner.events.borrow_mut()[handle.0.index].clone();
197        EventFuture {
198            ready,
199            _handle: handle.clone(),
200            done: false,
201        }
202    }
203    /// Run each non-blocked task exactly once.
204    ///
205    /// Return whether there are any non-completed tasks.
206    ///
207    /// # Example
208    /// ```
209    /// # use simple_async_local_executor::*;
210    /// let executor = Executor::default();
211    /// let event = executor.create_event_handle();
212    /// async fn wait_event(event: EventHandle, executor: Executor) {
213    ///     executor.event(&event).await;
214    /// }
215    /// executor.spawn(wait_event(event.clone(), executor.clone()));
216    /// assert_eq!(executor.step(), true); // still one task in the queue
217    /// executor.notify_event(&event);
218    /// assert_eq!(executor.step(), false); // no more task in the queue
219    /// ```
220    pub fn step(&self) -> bool {
221        // dummy waker and context
222        let waker = dummy_waker();
223        let mut context = Context::from_waker(&waker);
224        // append new tasks to all tasks
225        let mut tasks = self.inner.task_queue.borrow_mut();
226        tasks.append(&mut self.inner.new_tasks.borrow_mut());
227        // go through all tasks, and keep uncompleted ones
228        let mut uncompleted_tasks = Vec::new();
229        let mut any_left = false;
230        for mut task in tasks.drain(..) {
231            match task.poll(&mut context) {
232                Poll::Ready(()) => {} // task done
233                Poll::Pending => {
234                    uncompleted_tasks.push(task);
235                    any_left = true;
236                }
237            }
238        }
239        // replace all tasks with uncompleted ones
240        *tasks = uncompleted_tasks;
241        // clear events
242        for (_, event) in self.inner.events.borrow_mut().iter_mut() {
243            event.replace(false);
244        }
245        any_left
246    }
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// Future that completes on its first poll.
    async fn nop() {}

    /// Number of tasks currently held in the executor's queue.
    fn queued_tasks(executor: &Executor) -> usize {
        executor.inner.task_queue.borrow().len()
    }

    /// Allocates two fresh events on `executor`.
    fn event_pair(executor: &Executor) -> [EventHandle; 2] {
        [
            executor.create_event_handle(),
            executor.create_event_handle(),
        ]
    }

    /// A task that finishes immediately leaves nothing behind after one step.
    #[test]
    fn test_nop() {
        let executor = Executor::default();
        executor.spawn(nop());
        assert_eq!(executor.step(), false);
    }

    /// A task awaiting two events in sequence advances only after each notification.
    #[test]
    fn test_event() {
        async fn wait_event(events: [EventHandle; 2], executor: Executor) {
            println!("before awaits");
            executor.event(&events[0]).await;
            println!("between awaits");
            executor.event(&events[1]).await;
            println!("after awaits");
        }

        let executor = Executor::default();
        let events = event_pair(&executor);

        executor.spawn(wait_event(events.clone(), executor.clone()));
        println!("spawned");

        // Without any notification the task stays parked.
        assert_eq!(executor.step(), true);
        assert_eq!(queued_tasks(&executor), 1);
        println!("step 1");
        assert_eq!(executor.step(), true);
        println!("step 2");

        // The first event unblocks the first await only.
        executor.notify_event(&events[0]);
        println!("notified 1");
        assert_eq!(executor.step(), true);

        // The second event lets the task run to completion.
        executor.notify_event(&events[1]);
        println!("notified 2");
        assert_eq!(executor.step(), false);
        println!("step 3");
        assert_eq!(queued_tasks(&executor), 0);
    }

    /// `select!` over two event futures resolves with whichever event is notified.
    #[test]
    #[cfg(feature = "futures")]
    fn test_select() {
        use futures::select;

        async fn wait_event(
            events: [EventHandle; 2],
            event_loop: Executor,
            first_event_id: Rc<Cell<usize>>,
        ) {
            println!("before select");
            let mut fut0 = event_loop.event(&events[0]);
            let mut fut1 = event_loop.event(&events[1]);
            select! {
                () = fut0 => { println!("event 0 fired first"); first_event_id.set(0); },
                () = fut1 => { println!("event 1 fired first"); first_event_id.set(1); }
            }
            println!("after select");
        }

        // Starts at 2, which is neither of the two valid event ids.
        let first_event_id = Rc::new(Cell::new(2));

        for i in 0..2 {
            println!("Testing event {}", i);
            let executor = Executor::default();
            {
                let events = event_pair(&executor);
                executor.spawn(wait_event(
                    events.clone(),
                    executor.clone(),
                    first_event_id.clone(),
                ));
                println!("spawned");
                assert_eq!(executor.step(), true);
                assert_eq!(queued_tasks(&executor), 1);
                println!("step 1");
                assert_eq!(executor.step(), true);
                println!("step 2");
                executor.notify_event(&events[i]);
                println!("notified");
                assert_eq!(executor.step(), false);
                println!("step 3");
                assert_eq!(first_event_id.get(), i);
                assert_eq!(queued_tasks(&executor), 0);
            }
            // All handles are gone here, so every event slot must have been released.
            assert_eq!(executor.inner.events.borrow().len(), 0);
        }
    }
}
345}