vride_runtime/
lib.rs

1use {
2    futures::{
3        future::{BoxFuture, FutureExt},
4        task::{waker_ref, ArcWake},
5    },
6    std::{
7        future::Future,
8        sync::mpsc::{sync_channel, Receiver, SyncSender},
9        sync::{Arc, Mutex},
10        task::{Context, Poll},
11        time::Duration,
12    }
13};
14
15/// Task executor that receives tasks off of a channel and runs them.
16pub struct Executor {
17    ready_queue: Receiver<Arc<Task>>,
18}
19
20impl Executor {
21    fn run(&self) {
22        while let Ok(task) = self.ready_queue.recv() {
23            // Take the future, and if it has not yet completed (is still Some),
24            // poll it in an attempt to complete it.
25            let mut future_slot = task.future.lock().unwrap();
26            if let Some(mut future) = future_slot.take() {
27                // Create a `LocalWaker` from the task itself
28                let waker = waker_ref(&task);
29                let context = &mut Context::from_waker(&*waker);
30                // `BoxFuture<T>` is a type alias for
31                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
32                // We can get a `Pin<&mut dyn Future + Send + 'static>`
33                // from it by calling the `Pin::as_mut` method.
34                if let Poll::Pending = future.as_mut().poll(context) {
35                    // We're not done processing the future, so put it
36                    // back in its task to be run again in the future.
37                    *future_slot = Some(future);
38                }
39            }
40        }
41    }
42}
43
44/// `Spawner` spawns new futures onto the task channel.
45#[derive(Clone)]
46pub struct Spawner {
47    task_sender: SyncSender<Arc<Task>>,
48}
49
50/// A future that can reschedule itself to be polled by an `Executor`.
51pub struct Task {
52    /// In-progress future that should be pushed to completion.
53    ///
54    /// The `Mutex` is not necessary for correctness, since we only have
55    /// one thread executing tasks at once. However, Rust isn't smart
56    /// enough to know that `future` is only mutated from one thread,
57    /// so we need to use the `Mutex` to prove thread-safety. A production
58    /// executor would not need this, and could use `UnsafeCell` instead.
59    future: Mutex<Option<BoxFuture<'static, ()>>>,
60
61    /// Handle to place the task itself back onto the task queue.
62    task_sender: SyncSender<Arc<Task>>,
63}
64
65pub fn new_executor_and_spawner() -> (Executor, Spawner) {
66    // Maximum number of tasks to allow queueing in the channel at once.
67    // This is just to make `sync_channel` happy, and wouldn't be present in
68    // a real executor.
69    const MAX_QUEUED_TASKS: usize = 10_000;
70    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
71    (Executor { ready_queue }, Spawner { task_sender })
72}
73
74impl Spawner {
75    fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
76        let future = future.boxed();
77        let task = Arc::new(Task {
78            future: Mutex::new(Some(future)),
79            task_sender: self.task_sender.clone(),
80        });
81        self.task_sender.send(task).expect("too many tasks queued");
82    }
83}
84
85impl ArcWake for Task {
86    fn wake_by_ref(arc_self: &Arc<Self>) {
87        // Implement `wake` by sending this task back onto the task channel
88        // so that it will be polled again by the executor.
89        let cloned = arc_self.clone();
90        arc_self
91            .task_sender
92            .send(cloned)
93            .expect("too many tasks queued");
94    }
95}