1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
extern crate alloc;
use std::alloc::{Layout, dealloc};
use std::future::Future;
use std::mem::{self, ManuallyDrop};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, Waker};
use crate::loom_exports::debug_or_loom_assert;
use crate::loom_exports::sync::atomic::{self, AtomicU64, Ordering};
use super::util::RunOnDrop;
use super::{CLOSED, POLLING, REF_MASK, WAKE_MASK};
use super::{Task, raw_waker_vtable};
/// Virtual table for a `Runnable`.
///
/// Holds the type-erased entry points for the monomorphized `run` and
/// `cancel` functions of one concrete `Task<F, S, T>` instantiation; the
/// `*const ()` argument is the type-erased task pointer.
#[derive(Debug)]
struct VTable {
    // Polls the task's future once (monomorphized `run::<F, S, T>`).
    run: unsafe fn(*const ()),
    // Cancels the task, dropping its future (monomorphized `cancel::<F, S, T>`).
    cancel: unsafe fn(*const ()),
}
/// Polls the inner future.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F, S, T>` allocated with the global
/// allocator, and this call must be made through the single live `Runnable`
/// for that task — see the invariants listed on `Runnable::new_unchecked`.
unsafe fn run<F, S, T>(ptr: *const ())
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
    S: Fn(Runnable, T) + Send + Sync + 'static,
    T: Clone + Send + Sync + 'static,
{
    let this = unsafe { &*(ptr as *const Task<F, S, T>) };

    // At this point, the task cannot be in the `Completed` phase, otherwise
    // it would not have been scheduled in the first place. It could,
    // however, have been cancelled and transitioned from `Polling` to
    // `Wind-down` after it was already scheduled. It is possible that in
    // such case the `CLOSED` flag may not be visible when loading the
    // state, but this is not a problem: when a task is cancelled while
    // already scheduled (i.e. while the wake count is non-zero), its future
    // is kept alive so even if the state loaded is stale, the worst that
    // can happen is that the future will be unnecessarily polled.
    //
    // It is worth mentioning that, in order to detect if the task was
    // woken while polled, other executors reset a notification flag with
    // an RMW when entering `run`. The idea here is to avoid such RMW and
    // instead load a wake count. Only once the task has been polled, an RMW
    // checks the wake count again to detect if the task was notified in the
    // meantime. This method may be slightly more prone to spurious false
    // positives but is much faster (1 vs 2 RMWs) and still prevents the
    // occurrence of lost wake-ups.

    // Load the state.
    //
    // Ordering: the below Acquire load synchronizes with the Release
    // operation at the end of the call to `run` by the previous `Runnable`
    // and ensures that the new state of the future stored by the previous
    // call to `run` is visible. This synchronization exists because the RMW
    // in the call to `Task::wake` or `Task::wake_by_ref` that scheduled
    // this `Runnable` establishes a Release sequence. This load also
    // synchronizes with the Release operation in `wake` and ensures that
    // all memory operations performed by their callers are visible. Since
    // this is a simple load, it may be stale and some wake requests may not
    // be visible yet, but the post-polling RMW will later check if all wake
    // requests were serviced.
    let mut state = this.state.load(Ordering::Acquire);

    let mut wake_count = state & WAKE_MASK;

    debug_or_loom_assert!(state & POLLING == POLLING);

    loop {
        // Drop the future if the phase has transitioned to `Wind-down`.
        if state & CLOSED == CLOSED {
            unsafe { cancel::<F, S, T>(ptr) };

            return;
        }

        // Poll the task. The waker is wrapped in `ManuallyDrop` so that its
        // reference count is not decremented when it goes out of scope.
        let raw_waker = RawWaker::new(ptr, raw_waker_vtable::<F, S, T>());
        let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) });
        let cx = &mut Context::from_waker(&waker);
        let fut = unsafe { Pin::new_unchecked(this.core.with_mut(|c| &mut *(*c).future)) };

        // Set a panic guard to cancel the task if the future panics when
        // polled.
        let panic_guard = RunOnDrop::new(|| unsafe { cancel::<F, S, T>(ptr) });

        let poll_state = fut.poll(cx);
        // Polling succeeded: defuse the guard.
        mem::forget(panic_guard);

        if let Poll::Ready(output) = poll_state {
            // Set a panic guard to close the task if the future or the output
            // panic when dropped. Miri complains if a reference to `this` is
            // captured and `mem::forget` is called on the guard after
            // deallocation, which is why the state is taken by pointer.
            let state_ptr = &this.state as *const AtomicU64;
            let panic_guard = RunOnDrop::new(|| {
                // Clear the `POLLING` flag while setting the `CLOSED` flag
                // to enter the `Closed` phase.
                //
                // Ordering: Release ordering on success is necessary to
                // ensure that all memory operations on the future or the
                // output are visible when the last reference deallocates
                // the task.
                let state = unsafe {
                    (*state_ptr)
                        .fetch_update(Ordering::Release, Ordering::Relaxed, |s| {
                            Some((s | CLOSED) & !POLLING)
                        })
                        .unwrap()
                };
                // Deallocate if there are no more references to the task.
                if state & REF_MASK == 0 {
                    // Ensure that all atomic accesses to the state are
                    // visible.
                    //
                    // Ordering: this Acquire fence synchronizes with all
                    // Release operations that decrement the number of
                    // references to the task.
                    atomic::fence(Ordering::Acquire);

                    unsafe { dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>()) };
                }
            });

            // Drop the future and publish its output.
            this.core.with_mut(|c| unsafe {
                ManuallyDrop::drop(&mut (*c).future);
                (*c).output = ManuallyDrop::new(output);
            });

            // Clear the `POLLING` flag to enter the `Completed` phase,
            // unless the task has concurrently transitioned to the
            // `Wind-down` phase or unless this `Runnable` is the last
            // reference to the task.
            if this
                .state
                .fetch_update(Ordering::Release, Ordering::Relaxed, |s| {
                    if s & CLOSED == CLOSED || s & REF_MASK == 0 {
                        None
                    } else {
                        Some(s & !POLLING)
                    }
                })
                .is_ok()
            {
                // The output was successfully published: defuse the guard.
                mem::forget(panic_guard);
                return;
            }

            // The task is in the `Wind-down` phase or this `Runnable`
            // was the last reference, so the output must be dropped.
            this.core
                .with_mut(|c| unsafe { ManuallyDrop::drop(&mut (*c).output) });
            mem::forget(panic_guard);

            // Clear the `POLLING` flag to enter the `Closed` phase. This is
            // not actually necessary if the `Runnable` is the last
            // reference, but that should be a very rare occurrence.
            //
            // Ordering: Release ordering is necessary to ensure that the
            // drop of the output is visible when the last reference
            // deallocates the task.
            state = this.state.fetch_and(!POLLING, Ordering::Release);

            // Deallocate the task if there are no task references left.
            if state & REF_MASK == 0 {
                // Ensure that all atomic accesses to the state are visible.
                //
                // Ordering: this Acquire fence synchronizes with all
                // Release operations that decrement the number of
                // references to the task.
                atomic::fence(Ordering::Acquire);

                unsafe { dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>()) };
            }

            return;
        }

        // The future is `Pending`: try to reset the wake count.
        //
        // Ordering: a Release ordering is required in case the wake count
        // is successfully cleared; it synchronizes, via a Release sequence,
        // with the Acquire load upon entering `Runnable::run` the next time
        // it is called. Acquire ordering is in turn necessary in case the
        // wake count has changed and the future must be polled again; it
        // synchronizes with the Release RMW in `wake` and ensures that all
        // memory operations performed by their callers are visible when the
        // polling loop is repeated.
        state = this.state.fetch_sub(wake_count, Ordering::AcqRel);
        debug_or_loom_assert!(state > wake_count);
        // Wake requests that arrived while the future was being polled.
        wake_count = (state & WAKE_MASK) - wake_count;

        // Return now if the wake count has been successfully cleared,
        // provided that the task was not concurrently cancelled.
        if wake_count == 0 && state & CLOSED == 0 {
            // If there are no task references left, cancel and deallocate
            // the task since it can never be scheduled again.
            if state & REF_MASK == 0 {
                // The guard deallocates the task even if the future panics
                // when dropped.
                let _drop_guard = RunOnDrop::new(|| {
                    unsafe { dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>()) };
                });

                // Drop the future.
                this.core
                    .with_mut(|c| unsafe { ManuallyDrop::drop(&mut (*c).future) });
            }

            return;
        }
    }
}
/// Cancels the task, dropping the inner future.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F, S, T>` allocated with the global
/// allocator whose future is still live; this is called either from `run`
/// (when the `CLOSED` flag is observed or the future panics) or when a
/// `Runnable` is dropped without being run.
unsafe fn cancel<F, S, T>(ptr: *const ())
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
    S: Fn(Runnable, T) + Send + Sync + 'static,
    T: Clone + Send + Sync + 'static,
{
    let this = unsafe { &*(ptr as *const Task<F, S, T>) };

    // Ensure that the modifications of the future by the previous
    // `Runnable` are visible.
    //
    // Ordering: this Acquire fence synchronizes with the Release operation
    // at the end of the call to `run` by the previous `Runnable` and
    // ensures that the new state of the future stored by the previous call
    // to `run` is visible. This synchronization exists because the wake
    // count RMW in the call to `Task::wake` that created this `Runnable`
    // establishes a Release sequence.
    atomic::fence(Ordering::Acquire);

    // Set a drop guard to enter the `Closed` phase whether or not the
    // future panics when dropped.
    let _drop_guard = RunOnDrop::new(|| {
        // Clear the `POLLING` flag while setting the `CLOSED` flag to enter
        // the `Closed` phase.
        //
        // Ordering: Release ordering on success is necessary to ensure that
        // all memory operations on the future are visible when the last
        // reference deallocates the task.
        let state = this
            .state
            .fetch_update(Ordering::Release, Ordering::Relaxed, |s| {
                Some((s | CLOSED) & !POLLING)
            })
            .unwrap();

        // Deallocate if there are no more references to the task.
        if state & REF_MASK == 0 {
            // Ensure that all atomic accesses to the state are visible.
            //
            // Ordering: this Acquire fence synchronizes with all Release
            // operations that decrement the number of references to the
            // task.
            atomic::fence(Ordering::Acquire);

            unsafe { dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>()) };
        }
    });

    // Drop the future.
    this.core
        .with_mut(|c| unsafe { ManuallyDrop::drop(&mut (*c).future) });
}
/// Handle to a scheduled task.
///
/// Dropping the runnable directly instead of calling `run` cancels the task.
#[derive(Debug)]
pub(crate) struct Runnable {
    // Type-erased pointer to the `Task<F, S, T>` allocation.
    task: *const (),
    // Monomorphized `run`/`cancel` entry points matching the erased task type.
    vtable: &'static VTable,
}
impl Runnable {
/// Creates a `Runnable`.
///
/// Safety: this is safe provided that:
///
/// - the task pointer points to a live task allocated with the global
/// allocator,
/// - there is not other live `Runnable` for this task,
/// - the wake count is non-zero,
/// - the `POLLING` flag is set and the `CLOSED` flag is cleared,
/// - the task contains a live future.
pub(super) unsafe fn new_unchecked<F, S, T>(task: *const Task<F, S, T>) -> Self
where
F: Future + Send + 'static,
F::Output: Send + 'static,
S: Fn(Runnable, T) + Send + Sync + 'static,
T: Clone + Send + Sync + 'static,
{
Self {
task: task as *const (),
vtable: &VTable {
run: run::<F, S, T>,
cancel: cancel::<F, S, T>,
},
}
}
/// Polls the wrapped future.
pub(crate) fn run(self) {
// Prevent the drop handler from being called, as it would call `cancel`
// on the inner field.
let this = ManuallyDrop::new(self);
// Poll the future.
unsafe { (this.vtable.run)(this.task) }
}
}
impl Drop for Runnable {
    fn drop(&mut self) {
        // A `Runnable` dropped without being run cancels its task.
        let cancel = self.vtable.cancel;

        unsafe { cancel(self.task) }
    }
}
// SAFETY: the raw task pointer is only dereferenced through the vtable's
// `run`/`cancel` functions, whose bounds require `F`, `F::Output`, `S` and
// `T` to be `Send` (and `S`/`T` to be `Sync`), so moving the handle across
// threads is sound.
unsafe impl Send for Runnable {}
// NOTE(review): unwind safety is presumably justified by the panic guards in
// `run`/`cancel`, which close the task if the future panics — confirm that no
// partially-updated task state can be observed after a caught panic.
impl UnwindSafe for Runnable {}
impl RefUnwindSafe for Runnable {}