1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use std::{
cell::RefCell,
collections::VecDeque,
future::Future,
rc::Rc,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
task::{Context, Poll},
time::Duration,
};
use waker_fn::waker_fn;
use super::reactor::Reactor;
use crate::{
local::TaskLocalStorage,
task::{Task, TaskMetadata},
};
/// Shorthand for [`async_task::Runnable`] specialized to this crate's [`TaskMetadata`].
///
/// This is the element type stored in the executor's run queue.
type Runnable = async_task::Runnable<TaskMetadata>;
thread_local! {
/// Per-thread [`Executor`] instance, const-initialized with [`Executor::new`].
pub(crate) static EXECUTOR: Executor = const { Executor::new() };
}
/// Executor state: a queue of runnable tasks plus a [`Reactor`].
///
/// All state sits behind [`RefCell`]s so the methods can take `&self`.
pub(crate) struct Executor {
/// FIFO queue of tasks that are ready to run. Tasks are pushed to the back when scheduled
/// and popped from the front by `tick`.
queue: RefCell<VecDeque<Runnable>>,
/// Reactor that is ticked at the start of every `tick` and exposed through `with_reactor`.
reactor: RefCell<Reactor>,
/// Optional shared handle to a [`TaskLocalStorage`]; starts out as `None`.
// NOTE(review): presumably the storage of the task currently being run, installed by
// `TaskLocalStorage::scope` -- nothing in this file writes to it, so confirm against
// the `local` module.
pub(crate) tls: RefCell<Option<Rc<TaskLocalStorage>>>,
}
impl Executor {
pub const fn new() -> Self {
Self {
queue: RefCell::new(VecDeque::new()),
reactor: RefCell::new(Reactor::new()),
tls: RefCell::new(None),
}
}
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
let metadata = TaskMetadata {
tls: Rc::new(TaskLocalStorage::new()),
};
// SAFETY: `runnable` will never be moved off this thread or shared with another thread
// because of the `!Send + !Sync` bounds on `Self`. Both `future` and `schedule` are
// `'static` so they cannot be used after being freed.
//
// TODO: Makesure that the waker can never be sent off the thread.
let (runnable, task) = unsafe {
async_task::Builder::new()
.metadata(metadata)
.spawn_unchecked(
move |_| future,
|runnable| {
self.queue.borrow_mut().push_back(runnable);
},
)
};
runnable.schedule();
task
}
/// Run the provided closure with the reactor.
/// Used to ensure the thread safety of the executor.
pub(crate) fn with_reactor(&self, f: impl FnOnce(&mut Reactor)) {
f(&mut self.reactor.borrow_mut());
}
/// Wakes any expired sleepers, then polls a single task. If all tasks were sleeping,
/// returns how long it will be until one is awake.
pub(crate) fn tick(&self) -> Option<Duration> {
let next_wake = self.reactor.borrow_mut().tick();
let runnable = {
let mut queue = self.queue.borrow_mut();
queue.pop_front()
};
if let Some(runnable) = runnable {
TaskLocalStorage::scope(runnable.metadata().tls.clone(), || {
runnable.run();
});
None
} else {
Some(next_wake)
}
}
pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_fn({
let woken = woken.clone();
move || woken.store(true, Ordering::Relaxed)
});
let mut cx = Context::from_waker(&waker);
futures_util::pin_mut!(future);
let mut was_woken = true;
loop {
if was_woken && let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
return output;
}
unsafe {
vex_sdk::vexTasksRun();
}
let next_wake = self.tick();
// This is updated only after the tick because another task could have woken up the
// future in that time.
was_woken = woken.swap(false, Ordering::Relaxed);
// Yield to OS on desktop platforms to avoid high CPU usage while all tasks are
// sleeping. On VEXos, this behavior is disabled so that devices are updated
// as fast as possible.
if cfg!(not(target_os = "vexos"))
&& let Some(next_wake) = next_wake
&& !was_woken
{
// We should still be polling vexTasksRun fairly often.
const MAX_YIELD: Duration = Duration::from_millis(5);
// N.B. `next_wake` takes into account when the future passed into
// this function will wake, if it is sleeping.
let time_to_yield = Duration::min(MAX_YIELD, next_wake);
std::thread::sleep(time_to_yield);
}
}
}
}
#[cfg(test)]
mod test {
    use vex_sdk_mock as _;

    use super::*;

    /// A spawned task can be driven to completion by `block_on`, which yields its output.
    #[test]
    fn spawns_task() {
        let executor = Executor::new();

        let task = executor.spawn(async { 1 });
        let output = executor.block_on(task);

        assert_eq!(output, 1);
    }
}