vride_runtime/lib.rs
1use {
2 futures::{
3 future::{BoxFuture, FutureExt},
4 task::{waker_ref, ArcWake},
5 },
6 std::{
7 future::Future,
8 sync::mpsc::{sync_channel, Receiver, SyncSender},
9 sync::{Arc, Mutex},
10 task::{Context, Poll},
11 time::Duration,
12 }
13};
14
15/// Task executor that receives tasks off of a channel and runs them.
16pub struct Executor {
17 ready_queue: Receiver<Arc<Task>>,
18}
19
20impl Executor {
21 fn run(&self) {
22 while let Ok(task) = self.ready_queue.recv() {
23 // Take the future, and if it has not yet completed (is still Some),
24 // poll it in an attempt to complete it.
25 let mut future_slot = task.future.lock().unwrap();
26 if let Some(mut future) = future_slot.take() {
27 // Create a `LocalWaker` from the task itself
28 let waker = waker_ref(&task);
29 let context = &mut Context::from_waker(&*waker);
30 // `BoxFuture<T>` is a type alias for
31 // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
32 // We can get a `Pin<&mut dyn Future + Send + 'static>`
33 // from it by calling the `Pin::as_mut` method.
34 if let Poll::Pending = future.as_mut().poll(context) {
35 // We're not done processing the future, so put it
36 // back in its task to be run again in the future.
37 *future_slot = Some(future);
38 }
39 }
40 }
41 }
42}
43
44/// `Spawner` spawns new futures onto the task channel.
45#[derive(Clone)]
46pub struct Spawner {
47 task_sender: SyncSender<Arc<Task>>,
48}
49
50/// A future that can reschedule itself to be polled by an `Executor`.
51pub struct Task {
52 /// In-progress future that should be pushed to completion.
53 ///
54 /// The `Mutex` is not necessary for correctness, since we only have
55 /// one thread executing tasks at once. However, Rust isn't smart
56 /// enough to know that `future` is only mutated from one thread,
57 /// so we need to use the `Mutex` to prove thread-safety. A production
58 /// executor would not need this, and could use `UnsafeCell` instead.
59 future: Mutex<Option<BoxFuture<'static, ()>>>,
60
61 /// Handle to place the task itself back onto the task queue.
62 task_sender: SyncSender<Arc<Task>>,
63}
64
65pub fn new_executor_and_spawner() -> (Executor, Spawner) {
66 // Maximum number of tasks to allow queueing in the channel at once.
67 // This is just to make `sync_channel` happy, and wouldn't be present in
68 // a real executor.
69 const MAX_QUEUED_TASKS: usize = 10_000;
70 let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
71 (Executor { ready_queue }, Spawner { task_sender })
72}
73
74impl Spawner {
75 fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
76 let future = future.boxed();
77 let task = Arc::new(Task {
78 future: Mutex::new(Some(future)),
79 task_sender: self.task_sender.clone(),
80 });
81 self.task_sender.send(task).expect("too many tasks queued");
82 }
83}
84
85impl ArcWake for Task {
86 fn wake_by_ref(arc_self: &Arc<Self>) {
87 // Implement `wake` by sending this task back onto the task channel
88 // so that it will be polled again by the executor.
89 let cloned = arc_self.clone();
90 arc_self
91 .task_sender
92 .send(cloned)
93 .expect("too many tasks queued");
94 }
95}