1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use crate::layout_helpers::extend;
use crate::proc_stack::ProcStack;
use crate::proc_vtable::ProcVTable;
use crate::state::*;
use crossbeam_utils::Backoff;
use std::alloc::Layout;
use std::cell::Cell;
use std::fmt::{self, Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
/// The pdata of a proc.
///
/// This pdata is stored right at the beginning of every heap-allocated proc.
pub(crate) struct ProcData {
/// Current state of the proc.
///
/// Contains flags representing the current state and the reference count.
pub(crate) state: AtomicUsize,
/// The proc that is blocked on the `ProcHandle`.
///
/// This waker needs to be woken once the proc completes or is closed.
pub(crate) awaiter: Cell<Option<Waker>>,
/// The virtual table.
///
/// In addition to the actual waker virtual table, it also contains pointers to several other
/// methods necessary for bookkeeping the heap-allocated proc.
pub(crate) vtable: &'static ProcVTable,
}
impl ProcData {
/// Cancels the proc.
///
/// This method will only mark the proc as closed and will notify the awaiter, but it won't
/// reschedule the proc if it's not completed.
pub(crate) fn cancel(&self) {
let mut state = self.state.load(Ordering::Acquire);
loop {
// If the proc has been completed or closed, it can't be cancelled.
if state & (COMPLETED | CLOSED) != 0 {
break;
}
// Mark the proc as closed.
match self.state.compare_exchange_weak(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter that the proc has been closed.
if state & AWAITER != 0 {
self.notify();
}
break;
}
Err(s) => state = s,
}
}
}
/// Notifies the proc blocked on the proc.
///
/// If there is a registered waker, it will be removed from the pdata and woken.
#[inline]
pub(crate) fn notify(&self) {
if let Some(waker) = self.swap_awaiter(None) {
// We need a safeguard against panics because waking can panic.
waker.wake();
}
}
/// Notifies the proc blocked on the proc unless its waker matches `current`.
///
/// If there is a registered waker, it will be removed from the pdata.
#[inline]
pub(crate) fn notify_unless(&self, current: &Waker) {
if let Some(waker) = self.swap_awaiter(None) {
if !waker.will_wake(current) {
// We need a safeguard against panics because waking can panic.
waker.wake();
}
}
}
/// Swaps the awaiter and returns the previous value.
#[inline]
pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
let new_is_none = new.is_none();
// We're about to try acquiring the lock in a loop. If it's already being held by another
// thread, we'll have to spin for a while so it's best to employ a backoff strategy.
let backoff = Backoff::new();
loop {
// Acquire the lock. If we're storing an awaiter, then also set the awaiter flag.
let state = if new_is_none {
self.state.fetch_or(LOCKED, Ordering::Acquire)
} else {
self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire)
};
// If the lock was acquired, break from the loop.
if state & LOCKED == 0 {
break;
}
// Snooze for a little while because the lock is held by another thread.
backoff.snooze();
}
// Replace the awaiter.
let old = self.awaiter.replace(new);
// Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
if new_is_none {
self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release);
} else {
self.state.fetch_and(!LOCKED, Ordering::Release);
}
old
}
#[inline]
pub(crate) fn offset_stack() -> usize {
let layout_pdata = Layout::new::<ProcData>();
let layout_stack = Layout::new::<ProcStack>();
let (_, offset_stack) = extend(layout_pdata, layout_stack);
offset_stack
}
}
impl Debug for ProcData {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);
fmt.debug_struct("ProcData")
.field("scheduled", &(state & SCHEDULED != 0))
.field("running", &(state & RUNNING != 0))
.field("completed", &(state & COMPLETED != 0))
.field("closed", &(state & CLOSED != 0))
.field("handle", &(state & HANDLE != 0))
.field("awaiter", &(state & AWAITER != 0))
.field("locked", &(state & LOCKED != 0))
.field("ref_count", &(state / REFERENCE))
.finish()
}
}