1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
//! Simple executor to be used inside of an application runtime. The executor is
//! polled when needed by the runtime.
//!
//! This is partially copied and adapted from the executor chapter of the async book:
//! <https://rust-lang.github.io/async-book/02_execution/04_executor.html>.

use std::{
    future::Future,
    sync::{
        mpsc::{sync_channel, Receiver, SyncSender},
        Arc, Mutex,
    },
    task::Context,
};

use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};

/// Capacity of the bounded channel that carries tasks waiting to be polled
/// (`init` passes it to `sync_channel`).
const MAX_QUEUED_TASKS: usize = 100_000;

/// Queues `future` as a new task on the global executor.
///
/// The future is not polled by this call; it is only placed on the task
/// queue and gets polled when the executor itself is polled.
///
/// # Panics
///
/// Panics if the task cannot be handed to the executor's queue.
pub fn spawn(future: impl Future<Output = ()> + 'static + Send) {
    let spawner = &EXECUTOR.spawner;
    spawner.spawn(future);
}

/// Polls the global executor, running queued tasks until the task queue is
/// empty (tasks queued while this call is in progress are picked up too).
///
/// The executor mutex stays locked for the whole call.
///
/// # Panics
///
/// Panics if the executor mutex is poisoned.
pub(crate) fn poll_executor() {
    // The guard is a temporary of this statement, so the lock is released
    // right after `poll` returns.
    EXECUTOR.executor.lock().unwrap().poll();
}

// Process-wide executor state shared by `spawn` and `poll_executor`.
// `lazy_static!` defers the call to `init` until `EXECUTOR` is first used.
lazy_static! {
    static ref EXECUTOR: ExecutorPair = init();
}

/// Bundles the two halves of the executor: the polling side and the spawning
/// side, both connected to the same task channel.
struct ExecutorPair {
    /// Polling half; locked by `poll_executor` for the duration of each poll.
    executor: Mutex<Executor>,
    /// Spawning half used by `spawn` to enqueue new tasks.
    spawner: Spawner,
}

/// Creates the bounded task channel and wires its two ends into a fresh
/// executor/spawner pair.
///
/// The channel holds at most `MAX_QUEUED_TASKS` queued tasks.
fn init() -> ExecutorPair {
    let (tx, rx) = sync_channel(MAX_QUEUED_TASKS);

    ExecutorPair {
        executor: Mutex::new(Executor { ready_queue: rx }),
        spawner: Spawner { task_sender: tx },
    }
}

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    /// Receiving end of the task channel; every item is a task that was
    /// either just spawned or woken and therefore needs to be polled.
    ready_queue: Receiver<Arc<Task>>,
}

impl Executor {
    /// Drains the ready queue, polling each task that comes off it.
    ///
    /// Returns once no task is available on the queue. A task that is woken
    /// while this method runs is re-queued and polled again in the same call.
    fn poll(&self) {
        // `try_iter` never blocks: it stops as soon as the queue has nothing
        // to hand out.
        for task in self.ready_queue.try_iter() {
            // The slot stays locked for the whole poll of this task.
            let mut slot = task.future.lock().unwrap();

            // An empty slot means the future already ran to completion, so
            // this wake-up is stale and there is nothing to do.
            if let Some(mut fut) = slot.take() {
                // Build a waker that re-queues this very task, and a task
                // context around it.
                let waker = waker_ref(&task);
                let mut cx = Context::from_waker(&waker);

                // `BoxFuture<T>` is `Pin<Box<dyn Future<Output = T> + Send>>`;
                // `as_mut` yields the `Pin<&mut dyn Future>` needed by `poll`.
                let still_pending = fut.as_mut().poll(&mut cx).is_pending();
                if still_pending {
                    // Not finished yet: store the future back so the next
                    // wake-up can continue driving it.
                    *slot = Some(fut);
                }
            }
        }
    }
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    /// Sending end of the task channel. A clone of it is stored in every
    /// spawned `Task` so that the task can re-queue itself when woken.
    task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    /// Boxes `future`, wraps it in a new `Task` and queues that task so the
    /// executor polls it.
    ///
    /// This call never blocks.
    ///
    /// # Panics
    ///
    /// Panics with "too many tasks queued" if the task channel is full or its
    /// receiving end has been dropped.
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(future.boxed())),
            // Each task carries its own sender so it can re-queue itself.
            task_sender: self.task_sender.clone(),
        });

        // `SyncSender::send` blocks while the bounded channel is full, so a
        // full queue would never reach the `expect` below. A spawn made from
        // inside a running task would then wait forever, because only the
        // executor (busy polling that task) drains the queue. `try_send`
        // reports a full queue as an error instead.
        self.task_sender
            .try_send(task)
            .expect("too many tasks queued");
    }
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The slot is `None` while the executor is polling the future and after
    /// the future has completed; the executor only stores the future back
    /// when a poll leaves it pending.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
    /// Wakes the task by sending it back onto the task channel so that the
    /// executor polls it again.
    ///
    /// This call never blocks.
    ///
    /// # Panics
    ///
    /// Panics with "too many tasks queued" if the task channel is full or its
    /// receiving end has been dropped.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let task = Arc::clone(arc_self);

        // A waker must not block. `SyncSender::send` waits for free space in
        // the bounded channel, so a wake issued while the executor is polling
        // a task with a full queue would hang: the executor is the only
        // consumer and cannot drain the queue at that point. `try_send` fails
        // immediately instead, which is what the panic message describes.
        arc_self
            .task_sender
            .try_send(task)
            .expect("too many tasks queued");
    }
}

#[cfg(test)]
mod tests {
    use futures::channel::oneshot;

    use super::*;

    /// Chains two spawned tasks through oneshot channels and checks that
    /// neither of them runs before the executor is polled.
    #[test]
    fn simple_two_tasks_channels() {
        // First task: produces the message.
        let (first_tx, mut first_rx) = oneshot::channel();
        spawn(async move {
            first_tx.send("hello").unwrap();
        });

        // Spawning only queues the task, so the first channel is still empty.
        let first_state = first_rx.try_recv().unwrap();
        assert!(first_state.is_none());

        // Second task: waits for the message on the first channel and
        // forwards it to the second channel.
        let (second_tx, mut second_rx) = oneshot::channel();
        spawn(async move {
            let message = first_rx.await.unwrap();
            second_tx.send(message).unwrap();
        });

        // Still nothing has been polled, so the second channel is empty too.
        let second_state = second_rx.try_recv().unwrap();
        assert!(second_state.is_none());

        // One poll runs both tasks: the first sends, the second forwards.
        poll_executor();

        // The forwarded message is now available on the second channel.
        let forwarded = second_rx.try_recv().unwrap();
        assert_eq!(forwarded, Some("hello"));
    }
}