1use std::{
2 cell::RefCell,
3 marker::PhantomData,
4 pin::Pin,
5 rc::Rc,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9 },
10 task::{Context, Poll, Waker},
11};
12
13use futures::task::{waker, ArcWake, AtomicWaker};
14use parking_lot::Mutex;
15use slotmap::{new_key_type, SlotMap};
16
17use crate::{effect::Effect, Frame};
18
19new_key_type! {struct TaskId; }
20
21struct Task<Data> {
22 effect: Pin<Box<dyn Effect<Data>>>,
23 _marker: PhantomData<Data>,
24}
25
26impl<Data> Task<Data> {
27 fn new(effect: Pin<Box<dyn Effect<Data>>>) -> Self {
28 Self {
29 effect,
30 _marker: PhantomData,
31 }
32 }
33
34 pub fn poll(&mut self, context: &mut Context, data: &mut Data) -> Poll<()> {
35 self.effect.as_mut().poll(context, data)
36 }
37}
38
39struct TaskWaker {
40 id: TaskId,
41 shared: Arc<Shared>,
42}
43
44impl ArcWake for TaskWaker {
45 fn wake_by_ref(t: &Arc<Self>) {
46 t.shared.push_ready(t.id);
47 }
48}
49
50struct Shared {
52 woken: AtomicBool,
53 executor_waker: AtomicWaker,
54 ready: Mutex<Vec<TaskId>>,
55}
56
57impl Shared {
58 fn push_ready(&self, id: TaskId) {
59 self.ready.lock().push(id);
60 self.woken.store(true, Ordering::Relaxed);
61 self.executor_waker.wake();
62 }
63}
64
65pub struct Executor<Data = Frame> {
67 tasks: SlotMap<TaskId, (Task<Data>, Waker)>,
68 processing: Vec<TaskId>,
69
70 shared: Arc<Shared>,
71 incoming: Rc<RefCell<Vec<Task<Data>>>>,
73}
74
75pub struct Spawner<Data> {
76 incoming: std::rc::Weak<RefCell<Vec<Task<Data>>>>,
77}
78
79impl<Data> Spawner<Data> {
80 pub fn spawn(&self, effect: impl 'static + Effect<Data>) {
81 let incoming = self.incoming.upgrade().expect("Executor dropped");
82 let task = Task::new(Box::pin(effect));
83 incoming.borrow_mut().push(task);
84 }
85}
86
87impl<Data> Executor<Data> {
88 pub fn new() -> Self {
89 let shared = Arc::new(Shared {
90 executor_waker: AtomicWaker::new(),
91 ready: Default::default(),
92 woken: AtomicBool::new(false),
93 });
94
95 let incoming = Default::default();
96
97 Self {
98 tasks: SlotMap::with_key(),
99 shared,
100 processing: Vec::new(),
101 incoming,
102 }
103 }
104
105 pub fn spawner(&self) -> Spawner<Data> {
107 Spawner {
108 incoming: Rc::downgrade(&self.incoming),
109 }
110 }
111
112 pub fn poll_tick(&mut self, data: &mut Data, cx: &mut Context<'_>) -> Poll<()> {
113 self.shared.executor_waker.register(cx.waker());
114
115 if self
116 .shared
117 .woken
118 .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
119 .is_ok()
120 {
121 tracing::info!("Executor ready");
122 self.tick(data);
123 Poll::Ready(())
124 } else {
125 Poll::Pending
126 }
127 }
128
129 pub fn tick(&mut self, data: &mut Data) {
130 {
131 assert!(self.processing.is_empty());
132 core::mem::swap(&mut *self.shared.ready.lock(), &mut self.processing);
133 }
134
135 self.processing
137 .extend(self.incoming.borrow_mut().drain(..).map(|task| {
138 self.tasks.insert_with_key(|id| {
139 let waker = waker(Arc::new(TaskWaker {
140 id,
141 shared: self.shared.clone(),
142 }));
143
144 (task, waker)
145 })
146 }));
147
148 for id in self.processing.drain(..) {
149 let (task, waker) = self.tasks.get_mut(id).unwrap();
150 let mut context = Context::from_waker(&*waker);
151 tracing::debug!(?id, "Polling task");
152
153 if task.poll(&mut context, data).is_ready() {
154 tracing::debug!(?id, "Task completed");
155 self.tasks.remove(id);
156 }
157 }
158 }
159}
160
161impl<Data> Default for Executor<Data> {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167#[cfg(test)]
168mod tests {
169
170 use crate::effect::FutureEffect;
171
172 use super::*;
173
174 #[test]
175 fn single_test() {
176 let (tx, rx) = flume::unbounded();
177
178 let mut ex = Executor::new();
179
180 let spawner = ex.spawner();
181
182 spawner.spawn(FutureEffect::new(rx.into_recv_async(), |data, val| {
183 *data = Some(val.unwrap());
184 }));
185
186 let mut data = None;
187
188 ex.tick(&mut data);
189 assert_eq!(data, None);
190
191 tx.send(5).unwrap();
192
193 ex.tick(&mut data);
194 assert_eq!(data, Some(5));
195 }
196}