1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
use std::fmt;
use std::io::ErrorKind;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
use std::sync::{Arc, PoisonError};
use std::time::Duration;
use crate::cancel::Cancel;
use crate::coroutine_impl::{co_cancel_data, run_coroutine, CoroutineImpl, EventSource};
use crate::scheduler::get_scheduler;
use crate::std::sync::atomic_dur::AtomicDuration;
use crate::std::sync::AtomicOption;
use crate::timeout_list::TimeoutHandle;
use crate::yield_now::{get_co_para, yield_now, yield_with};
/// Error returned by a park operation (see `Park::park_timeout`).
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ParkError {
    /// the waiting coroutine was canceled before it was unparked
    Canceled,
    /// the configured timeout elapsed before the coroutine was unparked
    Timeout,
}
impl<T> From<PoisonError<T>> for ParkError {
fn from(_: PoisonError<T>) -> Self {
Self::Timeout
}
}
// RAII guard handed out by `Park::delay_drop`; its `Drop` impl bumps the
// park state by 0x02 to signal that the kernel-side work has finished.
pub struct DropGuard<'a>(&'a Park);
/// A parking primitive for a single waiting coroutine (spmc style:
/// one parker, any number of unparkers).
pub struct Park {
    // the coroutine that is waiting on this park instance (empty when none)
    wait_co: Arc<AtomicOption<CoroutineImpl>>,
    // when the low bit is odd the Park doesn't need to block;
    // the low bit is the "unparked" flag, the higher bits form a generation
    // counter used to detect whether the kernel-side drop has completed
    state: AtomicUsize,
    // controls how cancellation is handled; usually initialized once
    // (true = check and propagate cancel, false = caller checks instead)
    check_cancel: AtomicBool,
    // timeout setting; `None` means park forever
    timeout: AtomicDuration,
    // raw pointer to the registered timer handle; null when no timer is set
    timeout_handle: AtomicPtr<TimeoutHandle<Arc<AtomicOption<CoroutineImpl>>>>,
    // flag set while the kernel side (subscribe) is still running
    wait_kernel: AtomicBool,
}
impl Default for Park {
fn default() -> Self {
Park::new()
}
}
// this is the park resource type (spmc style)
impl Park {
    /// Create a new `Park`: empty coroutine slot, zero state, cancel
    /// checking enabled, no timeout, no timer handle.
    pub fn new() -> Self {
        Park {
            wait_co: Arc::new(AtomicOption::none()),
            state: AtomicUsize::new(0),
            check_cancel: true.into(),
            timeout: AtomicDuration::new(None),
            timeout_handle: AtomicPtr::new(ptr::null_mut()),
            wait_kernel: AtomicBool::new(false),
        }
    }

    // ignore cancel: if `ignore` is true, the caller has to do the cancel
    // check instead (internally stored inverted as `check_cancel`)
    pub fn ignore_cancel(&self, ignore: bool) {
        self.check_cancel.store(!ignore, Ordering::Relaxed);
    }

    // Swap in `handle` as the current timer handle (null when `None`) and
    // return the previously installed handle, if any, reconstructed from
    // its raw pointer so the caller regains ownership.
    #[inline]
    fn set_timeout_handle(
        &self,
        handle: Option<TimeoutHandle<Arc<AtomicOption<CoroutineImpl>>>>,
    ) -> Option<TimeoutHandle<Arc<AtomicOption<CoroutineImpl>>>> {
        let ptr = match handle {
            None => ptr::null_mut(),
            Some(h) => h.into_ptr(),
        };
        let old_ptr = self.timeout_handle.swap(ptr, Ordering::Relaxed);
        if old_ptr.is_null() {
            None
        } else {
            // the non-null pointer was produced by `into_ptr` above,
            // so converting it back reclaims ownership of the handle
            Some(unsafe { TimeoutHandle::from_ptr(old_ptr) })
        }
    }

    // return true if we need to park the coroutine:
    // when the low state bit is set, we consume (clear) it and indicate
    // "no need to block"; when it is clear, a real park is required
    #[inline]
    fn check_park(&self) -> bool {
        let mut state = self.state.load(Ordering::Acquire);
        if state & 1 == 0 {
            return true;
        }
        loop {
            match self.state.compare_exchange_weak(
                state,
                state - 1,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return false, // successfully consumed the flag, no need to block
                Err(x) => {
                    // someone else cleared the flag first: we must block
                    if x & 1 == 0 {
                        return true;
                    }
                    state = x;
                }
            }
        }
    }

    // unpark the underlying coroutine if any; `b_sync` selects whether the
    // coroutine is run inline or pushed to the scheduler's ready queue
    #[inline]
    pub(crate) fn unpark_impl(&self, b_sync: bool) {
        let mut state = self.state.load(Ordering::Acquire);
        if state & 1 == 1 {
            // the flag is already set, nothing to do here
            return;
        }
        loop {
            match self.state.compare_exchange_weak(
                state,
                state + 1,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                // we won the race to set the flag, so we wake the waiter
                Ok(_) => return self.wake_up(b_sync),
                Err(x) => {
                    if x & 1 == 1 {
                        break; // already set by someone else, do nothing
                    }
                    state = x;
                }
            }
        }
    }

    // unpark the underlying coroutine if any, pushing it to the ready queue
    #[inline]
    pub fn unpark(&self) {
        self.unpark_impl(false);
    }

    // remove the timeout handle after returning back to user space
    #[inline]
    fn remove_timeout_handle(&self) {
        if let Some(h) = self.set_timeout_handle(None) {
            if h.is_link() {
                get_scheduler().del_timer(h);
            }
            // when the timer fired, the node was already unlinked;
            // just drop the handle to release its memory
        }
    }

    // take the registered coroutine (if any) and resume it, either
    // inline (b_sync) or via the scheduler
    #[inline]
    fn wake_up(&self, b_sync: bool) {
        if let Some(co) = self.wait_co.take() {
            if b_sync {
                run_coroutine(co);
            } else {
                get_scheduler().schedule(co);
            }
        }
    }

    /// park current coroutine with specified timeout
    /// if timeout happens, return Err(ParkError::Timeout)
    /// if cancellation detected, return Err(ParkError::Canceled)
    pub fn park_timeout(&self, dur: Option<Duration>) -> Result<(), ParkError> {
        self.timeout.swap(dur);

        // if the unpark flag is already set, consume it and return at once
        if !self.check_park() {
            return Ok(());
        }

        // before a new yield, wait for any previous kernel-side work to finish
        if self.wait_kernel.swap(false, Ordering::AcqRel) {
            // spin until the generation bit flips, yielding between checks
            while self.state.load(Ordering::Acquire) & 0x02 == 0x02 {
                yield_now();
            }
        } else {
            // no kernel in flight: clear the generation bit
            self.state.fetch_and(!0x02, Ordering::Release);
        }

        // what if the flag is set between here and the yield?
        // `subscribe` re-checks it after registering the coroutine
        yield_with(self);

        // clear the trigger flag left over from the wake-up
        self.check_park();

        // remove the timer handle so a stale timer can't fire later
        self.remove_timeout_handle();

        // translate the coroutine parameter (set by the waker) into an error:
        // TimedOut -> Timeout, Other -> Canceled
        if let Some(err) = get_co_para() {
            match err.kind() {
                ErrorKind::TimedOut => return Err(ParkError::Timeout),
                ErrorKind::Other => return Err(ParkError::Canceled),
                _ => unreachable!("unexpected return error kind"),
            }
        }

        Ok(())
    }

    // mark the kernel as entered and return a guard whose drop signals
    // completion by advancing the state's generation bits
    fn delay_drop(&self) -> DropGuard {
        self.wait_kernel.store(true, Ordering::Release);
        DropGuard(self)
    }
}
impl<'a> Drop for DropGuard<'a> {
    fn drop(&mut self) {
        // kernel-side work is done: advance the generation bits by 2 so
        // anyone spinning on the 0x02 bit can make progress
        let park = self.0;
        park.state.fetch_add(0x02, Ordering::Release);
    }
}
impl Drop for Park {
    fn drop(&mut self) {
        // if the kernel was never entered there is nothing to wait for
        if !self.wait_kernel.load(Ordering::Acquire) {
            return;
        }
        // wait for the kernel side to finish (the DropGuard flips the
        // 0x02 generation bit when it completes), yielding in between
        while self.state.load(Ordering::Acquire) & 0x02 == 0x02 {
            yield_now();
        }
        // release any still-installed timer handle
        self.set_timeout_handle(None);
    }
}
impl EventSource for Park {
    // register the coroutine to the park; runs on the kernel side after
    // the coroutine has yielded in `park_timeout`
    fn subscribe(&mut self, co: CoroutineImpl) {
        let cancel = co_cancel_data(&co);

        // if parks are reused, a previous timer might wake us up spuriously
        // when it is not deleted in time, so install the new timer (if a
        // timeout was requested) and stash its handle for later removal
        let timeout_handle = self
            .timeout
            .swap(None)
            .map(|dur| get_scheduler().add_timer(dur, self.wait_co.clone()));
        self.set_timeout_handle(timeout_handle);

        // signal "kernel done" via the generation bits when leaving this fn
        let _g = self.delay_drop();

        // register the coroutine so an unpark can find and resume it
        self.wait_co.swap(co);

        // re-check the unpark flag: it may have been set before the yield;
        // it is only cleared once, after the coroutine resumes
        if self.state.load(Ordering::Acquire) & 1 == 1 {
            // this may recurse into subscribe via the resumed coroutine;
            // normally the recursion depth is not too deep
            return self.wake_up(true);
        }

        // register the cancel data so a cancel can reach the parked coroutine
        cancel.set_co(self.wait_co.clone());
        // re-check the cancel status in case a cancel raced with us
        if cancel.is_canceled() {
            let _ = cancel.cancel();
        }
    }

    // called when yielding back to the coroutine: when cancel checking is
    // enabled we propagate the cancellation (check_cancel may panic/unwind)
    fn yield_back(&self, cancel: &'static Cancel) {
        // advance the generation bits by 2 to move to the next generation
        self.state.fetch_add(0x02, Ordering::Release);
        if self.check_cancel.load(Ordering::Relaxed) {
            cancel.check_cancel();
        }
    }
}
impl fmt::Debug for Park {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // only the coroutine slot and the raw state word are useful when
        // debugging; the remaining fields are transient bookkeeping
        let co = &self.wait_co;
        let state = &self.state;
        write!(f, "Park {{ co: {:?}, state: {:?} }}", co, state)
    }
}