violet/
executor.rs

1use std::{
2    cell::RefCell,
3    marker::PhantomData,
4    pin::Pin,
5    rc::Rc,
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc,
9    },
10    task::{Context, Poll, Waker},
11};
12
13use futures::task::{waker, ArcWake, AtomicWaker};
14use parking_lot::Mutex;
15use slotmap::{new_key_type, SlotMap};
16
17use crate::{effect::Effect, Frame};
18
19new_key_type! {struct TaskId; }
20
21struct Task<Data> {
22    effect: Pin<Box<dyn Effect<Data>>>,
23    _marker: PhantomData<Data>,
24}
25
26impl<Data> Task<Data> {
27    fn new(effect: Pin<Box<dyn Effect<Data>>>) -> Self {
28        Self {
29            effect,
30            _marker: PhantomData,
31        }
32    }
33
34    pub fn poll(&mut self, context: &mut Context, data: &mut Data) -> Poll<()> {
35        self.effect.as_mut().poll(context, data)
36    }
37}
38
39struct TaskWaker {
40    id: TaskId,
41    shared: Arc<Shared>,
42}
43
44impl ArcWake for TaskWaker {
45    fn wake_by_ref(t: &Arc<Self>) {
46        t.shared.push_ready(t.id);
47    }
48}
49
50/// Is Send + Sync
51struct Shared {
52    woken: AtomicBool,
53    executor_waker: AtomicWaker,
54    ready: Mutex<Vec<TaskId>>,
55}
56
57impl Shared {
58    fn push_ready(&self, id: TaskId) {
59        self.ready.lock().push(id);
60        self.woken.store(true, Ordering::Relaxed);
61        self.executor_waker.wake();
62    }
63}
64
65/// Allows executing futures
66pub struct Executor<Data = Frame> {
67    tasks: SlotMap<TaskId, (Task<Data>, Waker)>,
68    processing: Vec<TaskId>,
69
70    shared: Arc<Shared>,
71    /// New tasks
72    incoming: Rc<RefCell<Vec<Task<Data>>>>,
73}
74
75pub struct Spawner<Data> {
76    incoming: std::rc::Weak<RefCell<Vec<Task<Data>>>>,
77}
78
79impl<Data> Spawner<Data> {
80    pub fn spawn(&self, effect: impl 'static + Effect<Data>) {
81        let incoming = self.incoming.upgrade().expect("Executor dropped");
82        let task = Task::new(Box::pin(effect));
83        incoming.borrow_mut().push(task);
84    }
85}
86
87impl<Data> Executor<Data> {
88    pub fn new() -> Self {
89        let shared = Arc::new(Shared {
90            executor_waker: AtomicWaker::new(),
91            ready: Default::default(),
92            woken: AtomicBool::new(false),
93        });
94
95        let incoming = Default::default();
96
97        Self {
98            tasks: SlotMap::with_key(),
99            shared,
100            processing: Vec::new(),
101            incoming,
102        }
103    }
104
105    /// Returns a thread local spawner
106    pub fn spawner(&self) -> Spawner<Data> {
107        Spawner {
108            incoming: Rc::downgrade(&self.incoming),
109        }
110    }
111
112    pub fn poll_tick(&mut self, data: &mut Data, cx: &mut Context<'_>) -> Poll<()> {
113        self.shared.executor_waker.register(cx.waker());
114
115        if self
116            .shared
117            .woken
118            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
119            .is_ok()
120        {
121            tracing::info!("Executor ready");
122            self.tick(data);
123            Poll::Ready(())
124        } else {
125            Poll::Pending
126        }
127    }
128
129    pub fn tick(&mut self, data: &mut Data) {
130        {
131            assert!(self.processing.is_empty());
132            core::mem::swap(&mut *self.shared.ready.lock(), &mut self.processing);
133        }
134
135        // Add new tasks
136        self.processing
137            .extend(self.incoming.borrow_mut().drain(..).map(|task| {
138                self.tasks.insert_with_key(|id| {
139                    let waker = waker(Arc::new(TaskWaker {
140                        id,
141                        shared: self.shared.clone(),
142                    }));
143
144                    (task, waker)
145                })
146            }));
147
148        for id in self.processing.drain(..) {
149            let (task, waker) = self.tasks.get_mut(id).unwrap();
150            let mut context = Context::from_waker(&*waker);
151            tracing::debug!(?id, "Polling task");
152
153            if task.poll(&mut context, data).is_ready() {
154                tracing::debug!(?id, "Task completed");
155                self.tasks.remove(id);
156            }
157        }
158    }
159}
160
161impl<Data> Default for Executor<Data> {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
#[cfg(test)]
mod tests {

    use super::*;

    use crate::effect::FutureEffect;

    /// A task waiting on a channel stays pending until a value is sent, and
    /// writes the received value into the data on the following tick.
    #[test]
    fn single_test() {
        let (sender, receiver) = flume::unbounded();

        let mut executor = Executor::new();

        executor
            .spawner()
            .spawn(FutureEffect::new(receiver.into_recv_async(), |slot, received| {
                *slot = Some(received.unwrap());
            }));

        let mut state = None;

        // First tick: nothing has been sent yet, so the data is untouched.
        executor.tick(&mut state);
        assert_eq!(state, None);

        sender.send(5).unwrap();

        // Second tick: the value sent above has been delivered.
        executor.tick(&mut state);
        assert_eq!(state, Some(5));
    }
}
196}