1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
use alloc::sync::Arc;
use core::{
cell::Cell,
future::Future,
mem::offset_of,
pin::Pin,
sync::atomic::Ordering,
task::{Context, Poll},
};
mod atomic_waker;
mod queue;
mod task;
mod vtable;
mod waker;
use crate::{
runtime::schedular::task::{ErasedTask, Task},
util::Defer,
};
use queue::Queue;
use self::task::ErasedTaskPtr;
/// The outcome of a call to [`Schedular::poll`].
#[derive(Debug)]
pub enum SchedularPoll {
    /// The schedular should yield control back to the root schedular,
    /// either because tasks are busy-requeueing themselves or the queue was
    /// momentarily inconsistent; the outer waker has already been re-woken.
    ShouldYield,
    /// There were no tasks at all; nothing to be done.
    Empty,
    /// Tasks exist, but none were scheduled, so no work could be done.
    Pending,
    /// Some tasks were polled and made progress, but none completed this call.
    PendingProgress,
}
/// A schedular for driving a set of futures whose lifetimes have been erased
/// (see [`Schedular::push`] for the resulting safety obligations).
pub struct Schedular {
    // Number of tasks currently in the all-task list;
    // incremented in `push`, decremented in `pop_task_all`.
    len: Cell<usize>,
    // Concurrent queue of tasks that have been woken and should be polled.
    // Shared (via Weak references) with each task so wakers can re-schedule it.
    should_poll: Arc<Queue>,
    // Head of the intrusive doubly-linked list holding every live task.
    // `None` iff the schedular is empty.
    all_next: Cell<Option<ErasedTaskPtr>>,
    // Tail of the same list; `None` iff the list is empty.
    all_prev: Cell<Option<ErasedTaskPtr>>,
}
impl Schedular {
    /// Create a new schedular.
    pub fn new() -> Self {
        let queue = Arc::new(Queue::new());
        // SAFETY: the queue was just allocated behind an Arc we exclusively
        // hold and is never moved out of it, so pinning here is sound.
        unsafe {
            Pin::new_unchecked(&*queue).init();
        }
        Schedular {
            len: Cell::new(0),
            should_poll: queue,
            all_prev: Cell::new(None),
            all_next: Cell::new(None),
        }
    }

    /// Returns if there are no pending tasks.
    pub fn is_empty(&self) -> bool {
        // The all-task list is empty exactly when it has no head.
        self.all_next.get().is_none()
    }

    /// Schedule a new future to be driven by this schedular.
    ///
    /// # Safety
    /// This function erases any lifetime associated with the future.
    /// Caller must ensure that either the future completes or is dropped
    /// (e.g. via [`Self::clear`] or dropping the schedular) before any
    /// lifetime it borrows ends.
    pub unsafe fn push<F>(&self, f: F)
    where
        F: Future<Output = ()>,
    {
        let queue = Arc::downgrade(&self.should_poll);
        // These offsets should always be the same as Task has a repr(C)
        // layout; this is what makes erasing `F` behind `Task<u8>` sound.
        assert_eq!(offset_of!(Task<F>, head), offset_of!(Task<u8>, head));
        assert_eq!(offset_of!(Task<F>, body), offset_of!(Task<u8>, body));
        let task = Arc::new(Task::new(queue, f));
        // One count for the all list and one for the should_poll list.
        let task = ErasedTask::new(task);
        self.push_task_to_all(task.clone());
        let task_ptr = ErasedTask::into_ptr(task);
        Pin::new_unchecked(&*self.should_poll).push(task_ptr.as_node_ptr());
        self.len.set(self.len.get() + 1);
    }

    /// Add a new task to the all task list.
    /// The all task list owns a reference to the task while it is in the list.
    unsafe fn push_task_to_all(&self, task: ErasedTask) {
        // Transfer the arc count into the raw pointer stored by the list.
        let task = ErasedTask::into_ptr(task);
        // Splice the task in at the head of the doubly-linked list.
        task.body().next.set(self.all_next.get());
        if let Some(x) = self.all_next.get() {
            x.body().prev.set(Some(task));
        }
        self.all_next.set(Some(task));
        // If the list was empty, this task is also the new tail.
        if self.all_prev.get().is_none() {
            self.all_prev.set(Some(task));
        }
    }

    /// Removes the task from the all task list,
    /// dropping the ownership the list has.
    unsafe fn pop_task_all(&self, task: ErasedTaskPtr) {
        // Mark the task as queued; presumably this stops wakers from pushing
        // it onto `should_poll` again (waker logic lives in the `waker`
        // module — NOTE(review): confirm against waker.rs).
        task.body().queued.store(true, Ordering::Release);
        // `done` guards the future drop so it happens exactly once even if
        // this function is called twice for the same task.
        if !task.body().done.replace(true) {
            task.task_drop();
        }
        // Detach the task from the all list.
        if let Some(next) = task.body().next.get() {
            next.body().prev.set(task.body().prev.get())
        } else {
            // Task was the tail; the tail becomes its predecessor.
            self.all_prev.set(task.body().prev.get());
        }
        if let Some(prev) = task.body().prev.get() {
            prev.body().next.set(task.body().next.get())
        } else {
            // Task was the head; the head becomes its successor.
            self.all_next.set(task.body().next.get());
        }
        // Reconstruct and drop the ErasedTask, releasing the arc count the
        // all list owned. The task is now freed or only owned by wakers.
        let _ = unsafe { ErasedTask::from_ptr(task) };
        self.len.set(self.len.get() - 1);
    }

    /// Drive every scheduled task once, returning how far the schedular got.
    ///
    /// # Safety
    /// NOTE(review): no contract was documented here; presumably the same
    /// lifetime requirements as [`Self::push`] apply — confirm with callers.
    pub unsafe fn poll(&self, cx: &mut Context) -> SchedularPoll {
        // A task's ownership is shared among a number of different places:
        // - The all-task list
        // - One or multiple wakers
        // - The should_poll list if scheduled.
        //
        // When a task is retrieved from the should_poll list we transfer its
        // arc count to a waker. When a waker is cloned it also increments the
        // arc count. If the waker is then woken up the count is transferred
        // back to the should_poll list.
        if self.is_empty() {
            // No tasks, nothing to be done.
            return SchedularPoll::Empty;
        }
        // Register the outer waker so this schedular is woken when a task is
        // scheduled while we are not polling.
        self.should_poll.waker().register(cx.waker());
        // How many tasks we have polled during this call.
        let mut iteration = 0;
        // How many polled tasks immediately re-queued themselves.
        let mut yielded = 0;
        loop {
            // Popped a task, ownership taken from the queue.
            let cur = match Pin::new_unchecked(&*self.should_poll).pop() {
                queue::Pop::Empty => {
                    if iteration > 0 {
                        return SchedularPoll::PendingProgress;
                    } else {
                        return SchedularPoll::Pending;
                    }
                }
                queue::Pop::Value(x) => x,
                queue::Pop::Inconsistant => {
                    // The queue is momentarily inconsistent (presumably a
                    // concurrent push is mid-flight); wake ourselves so we
                    // retry and yield to the parent in the meantime.
                    cx.waker().wake_by_ref();
                    return SchedularPoll::ShouldYield;
                }
            };
            // Take ownership of the task from the schedular.
            let cur_ptr = ErasedTaskPtr::from_nonnull(cur.cast());
            let cur = ErasedTask::from_ptr(cur_ptr);
            // Skip tasks whose future was already completed or cancelled;
            // dropping `cur` releases the queue's arc count.
            if cur.body().done.get() {
                continue;
            }
            // The task must have been marked queued when it was pushed;
            // clear the flag so its waker can re-schedule it.
            let prev = cur.body().queued.swap(false, Ordering::AcqRel);
            assert!(prev);
            // The waker owns the arc count of cur now until the end of the
            // scope, so we can use cur_ptr until the end of the scope; the
            // waker is only dropped then.
            let waker = waker::get(cur);
            let mut ctx = Context::from_waker(&waker);
            // If driving the task panics we still want to remove it from the
            // list, so handle removal with a drop guard.
            let remove = Defer::new((), |_| self.pop_task_all(cur_ptr));
            iteration += 1;
            match cur_ptr.task_drive(&mut ctx) {
                Poll::Ready(_) => {
                    // Nothing to do: the defer removes the task from the list.
                }
                Poll::Pending => {
                    // Task is not finished; don't remove it from the list.
                    remove.take();
                    // We had a pending; test whether the future immediately
                    // queued itself again while being polled.
                    yielded += cur_ptr.body().queued.load(Ordering::Relaxed) as usize;
                    // If we polled all the futures at least once, or more
                    // than two futures immediately re-queued themselves after
                    // being polled, yield back to the parent schedular
                    // (re-waking ourselves so polling resumes later).
                    if yielded > 2 || iteration > self.len.get() {
                        cx.waker().wake_by_ref();
                        return SchedularPoll::ShouldYield;
                    }
                }
            }
        }
    }

    /// Remove all tasks from the list.
    pub fn clear(&self) {
        // Clear all pending futures from the all list; this also drops each
        // future via `pop_task_all`.
        while let Some(c) = self.all_next.get() {
            unsafe { self.pop_task_all(c) }
        }
        // Drain the should_poll queue, releasing the arc count each popped
        // node still carries.
        loop {
            let cur = match unsafe { Pin::new_unchecked(&*self.should_poll).pop() } {
                queue::Pop::Empty => break,
                queue::Pop::Value(x) => x,
                queue::Pop::Inconsistant => {
                    // Momentarily inconsistent (see `poll`); back off briefly
                    // and retry until the queue settles.
                    #[cfg(feature = "std")]
                    std::thread::yield_now();
                    continue;
                }
            };
            // Reconstruct the ErasedTask so its refcount is dropped.
            unsafe { ErasedTask::from_ptr(ErasedTaskPtr::from_nonnull(cur.cast())) };
        }
    }
}
impl Drop for Schedular {
fn drop(&mut self) {
self.clear()
}
}