tokio_timer/timer/
entry.rs1use atomic::AtomicU64;
2use timer::{HandlePriv, Inner};
3use Error;
4
5use crossbeam_utils::CachePadded;
6use futures::task::AtomicTask;
7use futures::Poll;
8
9use std::cell::UnsafeCell;
10use std::ptr;
11use std::sync::atomic::AtomicBool;
12use std::sync::atomic::Ordering::{Relaxed, SeqCst};
13use std::sync::{Arc, Weak};
14use std::time::{Duration, Instant};
15use std::u64;
16
17#[derive(Debug)]
29pub(crate) struct Entry {
30 time: CachePadded<UnsafeCell<Time>>,
32
33 inner: Option<Weak<Inner>>,
38
39 state: AtomicU64,
49
50 task: AtomicTask,
52
53 pub(super) queued: AtomicBool,
58
59 pub(super) next_atomic: UnsafeCell<*mut Entry>,
65
66 when: UnsafeCell<Option<u64>>,
79
80 pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
84
85 pub(super) prev_stack: UnsafeCell<*const Entry>,
92}
93
94#[derive(Debug)]
96pub(crate) struct Time {
97 pub(crate) deadline: Instant,
98 pub(crate) duration: Duration,
99}
100
101const ELAPSED: u64 = 1 << 63;
103
104const ERROR: u64 = u64::MAX;
106
107impl Entry {
110 pub fn new(deadline: Instant, duration: Duration) -> Entry {
111 Entry {
112 time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })),
113 inner: None,
114 task: AtomicTask::new(),
115 state: AtomicU64::new(0),
116 queued: AtomicBool::new(false),
117 next_atomic: UnsafeCell::new(ptr::null_mut()),
118 when: UnsafeCell::new(None),
119 next_stack: UnsafeCell::new(None),
120 prev_stack: UnsafeCell::new(ptr::null_mut()),
121 }
122 }
123
124 pub fn time_ref(&self) -> &Time {
126 unsafe { &*self.time.get() }
127 }
128
129 pub fn time_mut(&self) -> &mut Time {
131 unsafe { &mut *self.time.get() }
132 }
133
134 pub fn is_registered(&self) -> bool {
137 self.inner.is_some()
138 }
139
140 pub fn register(me: &mut Arc<Self>) {
142 let handle = match HandlePriv::try_current() {
143 Ok(handle) => handle,
144 Err(_) => {
145 Arc::get_mut(me).unwrap().transition_to_error();
148
149 return;
150 }
151 };
152
153 Entry::register_with(me, handle)
154 }
155
156 pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
158 assert!(!me.is_registered(), "only register an entry once");
159
160 let deadline = me.time_ref().deadline;
161
162 let inner = match handle.inner() {
163 Some(inner) => inner,
164 None => {
165 Arc::get_mut(me).unwrap().transition_to_error();
168
169 return;
170 }
171 };
172
173 if inner.increment().is_err() {
175 Arc::get_mut(me).unwrap().transition_to_error();
176
177 return;
178 }
179
180 Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());
182
183 let when = inner.normalize_deadline(deadline);
184
185 if when <= inner.elapsed() {
188 me.state.store(ELAPSED, Relaxed);
189 return;
190 } else {
191 me.state.store(when, Relaxed);
192 }
193
194 if inner.queue(me).is_err() {
195 me.error();
197 }
198 }
199
200 fn transition_to_error(&mut self) {
201 self.inner = Some(Weak::new());
202 self.state = AtomicU64::new(ERROR);
203 }
204
205 pub fn when_internal(&self) -> Option<u64> {
208 unsafe { (*self.when.get()) }
209 }
210
211 pub fn set_when_internal(&self, when: Option<u64>) {
212 unsafe {
213 (*self.when.get()) = when;
214 }
215 }
216
217 pub fn load_state(&self) -> Option<u64> {
219 let state = self.state.load(SeqCst);
220
221 if is_elapsed(state) {
222 None
223 } else {
224 Some(state)
225 }
226 }
227
228 pub fn is_elapsed(&self) -> bool {
229 let state = self.state.load(SeqCst);
230 is_elapsed(state)
231 }
232
233 pub fn fire(&self, when: u64) {
234 let mut curr = self.state.load(SeqCst);
235
236 loop {
237 if is_elapsed(curr) || curr > when {
238 return;
239 }
240
241 let next = ELAPSED | curr;
242 let actual = self.state.compare_and_swap(curr, next, SeqCst);
243
244 if curr == actual {
245 break;
246 }
247
248 curr = actual;
249 }
250
251 self.task.notify();
252 }
253
254 pub fn error(&self) {
255 let mut curr = self.state.load(SeqCst);
257
258 loop {
259 if is_elapsed(curr) {
260 return;
261 }
262
263 let next = ERROR;
264
265 let actual = self.state.compare_and_swap(curr, next, SeqCst);
266
267 if curr == actual {
268 break;
269 }
270
271 curr = actual;
272 }
273
274 self.task.notify();
275 }
276
277 pub fn cancel(entry: &Arc<Entry>) {
278 let state = entry.state.fetch_or(ELAPSED, SeqCst);
279
280 if is_elapsed(state) {
281 return;
283 }
284
285 let inner = match entry.upgrade_inner() {
287 Some(inner) => inner,
288 None => return,
289 };
290
291 let _ = inner.queue(entry);
292 }
293
294 pub fn poll_elapsed(&self) -> Poll<(), Error> {
295 use futures::Async::NotReady;
296
297 let mut curr = self.state.load(SeqCst);
298
299 if is_elapsed(curr) {
300 if curr == ERROR {
301 return Err(Error::shutdown());
302 } else {
303 return Ok(().into());
304 }
305 }
306
307 self.task.register();
308
309 curr = self.state.load(SeqCst).into();
310
311 if is_elapsed(curr) {
312 if curr == ERROR {
313 return Err(Error::shutdown());
314 } else {
315 return Ok(().into());
316 }
317 }
318
319 Ok(NotReady)
320 }
321
322 pub fn reset(entry: &mut Arc<Entry>) {
324 if !entry.is_registered() {
325 return;
326 }
327
328 let inner = match entry.upgrade_inner() {
329 Some(inner) => inner,
330 None => return,
331 };
332
333 let deadline = entry.time_ref().deadline;
334 let when = inner.normalize_deadline(deadline);
335 let elapsed = inner.elapsed();
336
337 let mut curr = entry.state.load(SeqCst);
338 let mut notify;
339
340 loop {
341 if curr == ERROR || curr == when {
346 return;
347 }
348
349 let next;
350
351 if when <= elapsed {
352 next = ELAPSED;
353 notify = !is_elapsed(curr);
354 } else {
355 next = when;
356 notify = true;
357 }
358
359 let actual = entry.state.compare_and_swap(curr, next, SeqCst);
360
361 if curr == actual {
362 break;
363 }
364
365 curr = actual;
366 }
367
368 if notify {
369 let _ = inner.queue(entry);
370 }
371 }
372
373 fn upgrade_inner(&self) -> Option<Arc<Inner>> {
374 self.inner.as_ref().and_then(|inner| inner.upgrade())
375 }
376}
377
378fn is_elapsed(state: u64) -> bool {
379 state & ELAPSED == ELAPSED
380}
381
382impl Drop for Entry {
383 fn drop(&mut self) {
384 let inner = match self.upgrade_inner() {
385 Some(inner) => inner,
386 None => return,
387 };
388
389 inner.decrement();
390 }
391}
392
393unsafe impl Send for Entry {}
394unsafe impl Sync for Entry {}