Skip to main content

exocore_apps_sdk/
executor.rs

1//! Simple executor to be used inside of an application runtime. The executor is
2//! polled when needed by the runtime.
3//!
4//! This is partially copied and adapted from https://rust-lang.github.io/async-book/02_execution/04_executor.html.
5
6use std::{
7    future::Future,
8    sync::{
9        mpsc::{sync_channel, Receiver, SyncSender},
10        Arc, Mutex,
11    },
12    task::Context,
13};
14
15use futures::{
16    future::{BoxFuture, FutureExt},
17    task::{waker_ref, ArcWake},
18};
19
/// Capacity of the bounded channel that serves as the executor's ready queue,
/// i.e. the maximum number of task handles that can be waiting to be polled.
const MAX_QUEUED_TASKS: usize = 100_000;
21
22/// Spawns a task onto the executor.
23pub fn spawn(future: impl Future<Output = ()> + 'static + Send) {
24    EXECUTOR.spawner.spawn(future);
25}
26
27pub(crate) fn poll_executor() {
28    let executor = EXECUTOR.executor.lock().unwrap();
29    executor.poll();
30}
31
32lazy_static! {
33    static ref EXECUTOR: ExecutorPair = init();
34}
35
36struct ExecutorPair {
37    executor: Mutex<Executor>,
38    spawner: Spawner,
39}
40
41fn init() -> ExecutorPair {
42    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
43    let executor = Executor { ready_queue };
44    let spawner = Spawner { task_sender };
45
46    ExecutorPair {
47        executor: Mutex::new(executor),
48        spawner,
49    }
50}
51
52/// Task executor that receives tasks off of a channel and runs them.
53struct Executor {
54    ready_queue: Receiver<Arc<Task>>,
55}
56
57impl Executor {
58    fn poll(&self) {
59        while let Ok(task) = self.ready_queue.try_recv() {
60            // Take the future, and if it has not yet completed (is still Some),
61            // poll it in an attempt to complete it.
62            let mut future_slot = task.future.lock().unwrap();
63            if let Some(mut future) = future_slot.take() {
64                // Create a `LocalWaker` from the task itself
65                let waker = waker_ref(&task);
66                let context = &mut Context::from_waker(&waker);
67                // `BoxFuture<T>` is a type alias for
68                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
69                // We can get a `Pin<&mut dyn Future + Send + 'static>`
70                // from it by calling the `Pin::as_mut` method.
71                if future.as_mut().poll(context).is_pending() {
72                    // We're not done processing the future, so put it
73                    // back in its task to be run again in the future.
74                    *future_slot = Some(future);
75                }
76            }
77        }
78    }
79}
80
81/// `Spawner` spawns new futures onto the task channel.
82#[derive(Clone)]
83struct Spawner {
84    task_sender: SyncSender<Arc<Task>>,
85}
86
87impl Spawner {
88    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
89        let future = future.boxed();
90        let task = Arc::new(Task {
91            future: Mutex::new(Some(future)),
92            task_sender: self.task_sender.clone(),
93        });
94        self.task_sender.send(task).expect("too many tasks queued");
95    }
96}
97
98/// A future that can reschedule itself to be polled by an `Executor`.
99struct Task {
100    /// In-progress future that should be pushed to completion.
101    ///
102    /// The `Mutex` is not necessary for correctness, since we only have
103    /// one thread executing tasks at once. However, Rust isn't smart
104    /// enough to know that `future` is only mutated from one thread,
105    /// so we need to use the `Mutex` to prove thread-safety. A production
106    /// executor would not need this, and could use `UnsafeCell` instead.
107    future: Mutex<Option<BoxFuture<'static, ()>>>,
108
109    /// Handle to place the task itself back onto the task queue.
110    task_sender: SyncSender<Arc<Task>>,
111}
112
113impl ArcWake for Task {
114    fn wake_by_ref(arc_self: &Arc<Self>) {
115        // Implement `wake` by sending this task back onto the task channel
116        // so that it will be polled again by the executor.
117        let cloned = arc_self.clone();
118        arc_self
119            .task_sender
120            .send(cloned)
121            .expect("too many tasks queued");
122    }
123}
124
#[cfg(test)]
mod tests {
    use futures::channel::oneshot;

    use super::*;

    /// Two tasks chained through oneshot channels: the first sends a value,
    /// the second forwards it. Nothing may run before the executor is polled.
    #[test]
    fn simple_two_tasks_channels() {
        let (first_tx, mut first_rx) = oneshot::channel();
        spawn(async move {
            first_tx.send("hello").unwrap();
        });

        // The executor has not been polled, so the first channel is still empty.
        assert!(first_rx.try_recv().unwrap().is_none());

        // Second task: wait for the first channel's value and relay it to a
        // second channel.
        let (second_tx, mut second_rx) = oneshot::channel();
        spawn(async move {
            let message = first_rx.await.unwrap();
            second_tx.send(message).unwrap();
        });

        // Still nothing executed, so the second channel is empty as well.
        assert!(second_rx.try_recv().unwrap().is_none());

        // One poll runs both tasks: the value goes through the first channel
        // and is forwarded to the second.
        poll_executor();

        // The relayed value is now available on the second channel.
        assert_eq!(second_rx.try_recv().unwrap(), Some("hello"));
    }
}