simple_async_local_executor/
lib.rs1#![warn(missing_docs)]
2
3use core::fmt;
34use std::{
35 cell::{Cell, RefCell},
36 future::Future,
37 hash::{Hash, Hasher},
38 pin::Pin,
39 ptr,
40 rc::Rc,
41 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
42};
43
44#[cfg(feature = "futures")]
45use futures::future::FusedFuture;
46use slab::Slab;
47
48fn dummy_raw_waker() -> RawWaker {
54 fn no_op(_: *const ()) {}
55 fn clone(_: *const ()) -> RawWaker {
56 dummy_raw_waker()
57 }
58
59 let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
60 RawWaker::new(std::ptr::null::<()>(), vtable)
61}
62fn dummy_waker() -> Waker {
63 unsafe { Waker::from_raw(dummy_raw_waker()) }
64}
65
66#[derive(Clone)]
67struct EventHandleInner {
68 index: usize,
69 executor: Rc<ExecutorInner>,
70}
71impl fmt::Debug for EventHandleInner {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 self.index.fmt(f)
74 }
75}
76impl Eq for EventHandleInner {}
77impl PartialEq for EventHandleInner {
78 fn eq(&self, other: &Self) -> bool {
79 self.index == other.index && ptr::eq(self.executor.as_ref(), other.executor.as_ref())
80 }
81}
82impl Hash for EventHandleInner {
83 fn hash<H: Hasher>(&self, state: &mut H) {
84 self.index.hash(state);
85 (self.executor.as_ref() as *const ExecutorInner).hash(state);
86 }
87}
88impl Drop for EventHandleInner {
89 fn drop(&mut self) {
90 self.executor.release_event_handle(self);
91 }
92}
93#[derive(Clone, Debug, PartialEq, Eq, Hash)]
95pub struct EventHandle(Rc<EventHandleInner>);
96
97type SharedBool = Rc<Cell<bool>>;
98
99pub struct EventFuture {
101 ready: SharedBool,
102 _handle: EventHandle,
103 done: bool,
104}
105impl Future for EventFuture {
106 type Output = ();
107 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
108 if self.ready.get() {
109 self.done = true;
110 Poll::Ready(())
111 } else {
112 Poll::Pending
113 }
114 }
115}
116#[cfg(feature = "futures")]
117impl FusedFuture for EventFuture {
118 fn is_terminated(&self) -> bool {
119 self.done
120 }
121}
122
123struct Task {
124 future: Pin<Box<dyn Future<Output = ()>>>,
125}
126impl Task {
127 pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
128 Task {
129 future: Box::pin(future),
130 }
131 }
132 fn poll(&mut self, context: &mut Context) -> Poll<()> {
133 self.future.as_mut().poll(context)
134 }
135}
136
137#[derive(Default)]
138struct ExecutorInner {
139 task_queue: RefCell<Vec<Task>>,
140 new_tasks: RefCell<Vec<Task>>,
141 events: RefCell<Slab<SharedBool>>,
142}
143impl ExecutorInner {
144 fn release_event_handle(&self, event: &EventHandleInner) {
145 self.events.borrow_mut().remove(event.index);
146 }
147}
148
149#[derive(Clone, Default)]
158pub struct Executor {
159 inner: Rc<ExecutorInner>,
160}
161impl Executor {
162 pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
173 self.inner.new_tasks.borrow_mut().push(Task::new(future));
174 }
175 pub fn create_event_handle(&self) -> EventHandle {
177 let mut events = self.inner.events.borrow_mut();
178 let index = events.insert(Rc::new(Cell::new(false)));
179 EventHandle(Rc::new(EventHandleInner {
180 index,
181 executor: self.inner.clone(),
182 }))
183 }
184 pub fn notify_event(&self, handle: &EventHandle) {
189 self.inner.events.borrow_mut()[handle.0.index].replace(true);
190 }
191 pub fn event(&self, handle: &EventHandle) -> EventFuture {
196 let ready = self.inner.events.borrow_mut()[handle.0.index].clone();
197 EventFuture {
198 ready,
199 _handle: handle.clone(),
200 done: false,
201 }
202 }
203 pub fn step(&self) -> bool {
221 let waker = dummy_waker();
223 let mut context = Context::from_waker(&waker);
224 let mut tasks = self.inner.task_queue.borrow_mut();
226 tasks.append(&mut self.inner.new_tasks.borrow_mut());
227 let mut uncompleted_tasks = Vec::new();
229 let mut any_left = false;
230 for mut task in tasks.drain(..) {
231 match task.poll(&mut context) {
232 Poll::Ready(()) => {} Poll::Pending => {
234 uncompleted_tasks.push(task);
235 any_left = true;
236 }
237 }
238 }
239 *tasks = uncompleted_tasks;
241 for (_, event) in self.inner.events.borrow_mut().iter_mut() {
243 event.replace(false);
244 }
245 any_left
246 }
247}
248
249#[cfg(test)]
250mod tests {
251 use super::*;
252
253 async fn nop() {}
254
255 #[test]
256 fn test_nop() {
257 let executor = Executor::default();
258 executor.spawn(nop());
259 assert_eq!(executor.step(), false);
260 }
261
262 #[test]
263 fn test_event() {
264 let executor = Executor::default();
265 let events = [
266 executor.create_event_handle(),
267 executor.create_event_handle(),
268 ];
269
270 async fn wait_event(events: [EventHandle; 2], executor: Executor) {
271 println!("before awaits");
272 executor.event(&events[0]).await;
273 println!("between awaits");
274 executor.event(&events[1]).await;
275 println!("after awaits");
276 }
277
278 executor.spawn(wait_event(events.clone(), executor.clone()));
279 println!("spawned");
280 assert_eq!(executor.step(), true);
281 assert_eq!(executor.inner.task_queue.borrow().len(), 1);
282 println!("step 1");
283 assert_eq!(executor.step(), true);
284 println!("step 2");
285 executor.notify_event(&events[0]);
286 println!("notified 1");
287 assert_eq!(executor.step(), true);
288 executor.notify_event(&events[1]);
289 println!("notified 2");
290 assert_eq!(executor.step(), false);
291 println!("step 3");
292 assert_eq!(executor.inner.task_queue.borrow().len(), 0);
293 }
294
295 #[test]
296 #[cfg(feature = "futures")]
297 fn test_select() {
298 use futures::select;
299 let first_event_id = Rc::new(Cell::new(2));
300
301 async fn wait_event(
302 events: [EventHandle; 2],
303 event_loop: Executor,
304 first_event_id: Rc<Cell<usize>>,
305 ) {
306 println!("before select");
307 let mut fut0 = event_loop.event(&events[0]);
308 let mut fut1 = event_loop.event(&events[1]);
309 select! {
310 () = fut0 => { println!("event 0 fired first"); first_event_id.set(0); },
311 () = fut1 => { println!("event 1 fired first"); first_event_id.set(1); }
312 }
313 println!("after select");
314 }
315
316 for i in 0..2 {
317 println!("Testing event {}", i);
318 let executor = Executor::default();
319 {
320 let events = [
321 executor.create_event_handle(),
322 executor.create_event_handle(),
323 ];
324 executor.spawn(wait_event(
325 events.clone(),
326 executor.clone(),
327 first_event_id.clone(),
328 ));
329 println!("spawned");
330 assert_eq!(executor.step(), true);
331 assert_eq!(executor.inner.task_queue.borrow().len(), 1);
332 println!("step 1");
333 assert_eq!(executor.step(), true);
334 println!("step 2");
335 executor.notify_event(&events[i]);
336 println!("notified");
337 assert_eq!(executor.step(), false);
338 println!("step 3");
339 assert_eq!(first_event_id.get(), i);
340 assert_eq!(executor.inner.task_queue.borrow().len(), 0);
341 }
342 assert_eq!(executor.inner.events.borrow().len(), 0);
343 }
344 }
345}