// Source: tokio_timer/timer/entry.rs

1use atomic::AtomicU64;
2use timer::{HandlePriv, Inner};
3use Error;
4
5use crossbeam_utils::CachePadded;
6use futures::task::AtomicTask;
7use futures::Poll;
8
9use std::cell::UnsafeCell;
10use std::ptr;
11use std::sync::atomic::AtomicBool;
12use std::sync::atomic::Ordering::{Relaxed, SeqCst};
13use std::sync::{Arc, Weak};
14use std::time::{Duration, Instant};
15use std::u64;
16
17/// Internal state shared between a `Delay` instance and the timer.
18///
19/// This struct is used as a node in two intrusive data structures:
20///
21/// * An atomic stack used to signal to the timer thread that the entry state
22///   has changed. The timer thread will observe the entry on this stack and
23///   perform any actions as necessary.
24///
25/// * A doubly linked list used **only** by the timer thread. Each slot in the
26///   timer wheel is a head pointer to the list of entries that must be
27///   processed during that timer tick.
28#[derive(Debug)]
29pub(crate) struct Entry {
30    /// Only accessed from `Registration`.
31    time: CachePadded<UnsafeCell<Time>>,
32
33    /// Timer internals. Using a weak pointer allows the timer to shutdown
34    /// without all `Delay` instances having completed.
35    ///
36    /// When `None`, the entry has not yet been linked with a timer instance.
37    inner: Option<Weak<Inner>>,
38
39    /// Tracks the entry state. This value contains the following information:
40    ///
41    /// * The deadline at which the entry must be "fired".
42    /// * A flag indicating if the entry has already been fired.
43    /// * Whether or not the entry transitioned to the error state.
44    ///
45    /// When an `Entry` is created, `state` is initialized to the instant at
46    /// which the entry must be fired. When a timer is reset to a different
47    /// instant, this value is changed.
48    state: AtomicU64,
49
50    /// Task to notify once the deadline is reached.
51    task: AtomicTask,
52
53    /// True when the entry is queued in the "process" stack. This value
54    /// is set before pushing the value and unset after popping the value.
55    ///
56    /// TODO: This could possibly be rolled up into `state`.
57    pub(super) queued: AtomicBool,
58
59    /// Next entry in the "process" linked list.
60    ///
61    /// Access to this field is coordinated by the `queued` flag.
62    ///
63    /// Represents a strong Arc ref.
64    pub(super) next_atomic: UnsafeCell<*mut Entry>,
65
66    /// When the entry expires, relative to the `start` of the timer
67    /// (Inner::start). This is only used by the timer.
68    ///
69    /// A `Delay` instance can be reset to a different deadline by the thread
70    /// that owns the `Delay` instance. In this case, the timer thread will not
71    /// immediately know that this has happened. The timer thread must know the
72    /// last deadline that it saw as it uses this value to locate the entry in
73    /// its wheel.
74    ///
75    /// Once the timer thread observes that the instant has changed, it updates
76    /// the wheel and sets this value. The idea is that this value eventually
77    /// converges to the value of `state` as the timer thread makes updates.
78    when: UnsafeCell<Option<u64>>,
79
80    /// Next entry in the State's linked list.
81    ///
82    /// This is only accessed by the timer
83    pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
84
85    /// Previous entry in the State's linked list.
86    ///
87    /// This is only accessed by the timer and is used to unlink a canceled
88    /// entry.
89    ///
90    /// This is a weak reference.
91    pub(super) prev_stack: UnsafeCell<*const Entry>,
92}
93
/// Timing information kept on behalf of a `Delay`.
#[derive(Debug)]
pub(crate) struct Time {
    /// Instant the entry is registered against; `Entry::register_with` and
    /// `Entry::reset` normalize it into the timer-relative `when` value.
    pub(crate) deadline: Instant,
    /// Duration stored alongside the deadline. It is not read anywhere in
    /// this file.
    pub(crate) duration: Duration,
}
100
/// Bit of `Entry::state` that marks a timer entry as elapsed (top bit).
const ELAPSED: u64 = 1 << 63;

/// Value of `Entry::state` for an entry in the error state.
///
/// Every bit is set, so the `ELAPSED` bit is included: an errored entry is
/// also reported as elapsed by `is_elapsed`.
const ERROR: u64 = u64::MAX;
106
107// ===== impl Entry =====
108
109impl Entry {
110    pub fn new(deadline: Instant, duration: Duration) -> Entry {
111        Entry {
112            time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })),
113            inner: None,
114            task: AtomicTask::new(),
115            state: AtomicU64::new(0),
116            queued: AtomicBool::new(false),
117            next_atomic: UnsafeCell::new(ptr::null_mut()),
118            when: UnsafeCell::new(None),
119            next_stack: UnsafeCell::new(None),
120            prev_stack: UnsafeCell::new(ptr::null_mut()),
121        }
122    }
123
124    /// Only called by `Registration`
125    pub fn time_ref(&self) -> &Time {
126        unsafe { &*self.time.get() }
127    }
128
129    /// Only called by `Registration`
130    pub fn time_mut(&self) -> &mut Time {
131        unsafe { &mut *self.time.get() }
132    }
133
134    /// Returns `true` if the `Entry` is currently associated with a timer
135    /// instance.
136    pub fn is_registered(&self) -> bool {
137        self.inner.is_some()
138    }
139
140    /// Only called by `Registration`
141    pub fn register(me: &mut Arc<Self>) {
142        let handle = match HandlePriv::try_current() {
143            Ok(handle) => handle,
144            Err(_) => {
145                // Could not associate the entry with a timer, transition the
146                // state to error
147                Arc::get_mut(me).unwrap().transition_to_error();
148
149                return;
150            }
151        };
152
153        Entry::register_with(me, handle)
154    }
155
156    /// Only called by `Registration`
157    pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
158        assert!(!me.is_registered(), "only register an entry once");
159
160        let deadline = me.time_ref().deadline;
161
162        let inner = match handle.inner() {
163            Some(inner) => inner,
164            None => {
165                // Could not associate the entry with a timer, transition the
166                // state to error
167                Arc::get_mut(me).unwrap().transition_to_error();
168
169                return;
170            }
171        };
172
173        // Increment the number of active timeouts
174        if inner.increment().is_err() {
175            Arc::get_mut(me).unwrap().transition_to_error();
176
177            return;
178        }
179
180        // Associate the entry with the timer
181        Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());
182
183        let when = inner.normalize_deadline(deadline);
184
185        // Relaxed OK: At this point, there are no other threads that have
186        // access to this entry.
187        if when <= inner.elapsed() {
188            me.state.store(ELAPSED, Relaxed);
189            return;
190        } else {
191            me.state.store(when, Relaxed);
192        }
193
194        if inner.queue(me).is_err() {
195            // The timer has shutdown, transition the entry to the error state.
196            me.error();
197        }
198    }
199
200    fn transition_to_error(&mut self) {
201        self.inner = Some(Weak::new());
202        self.state = AtomicU64::new(ERROR);
203    }
204
205    /// The current entry state as known by the timer. This is not the value of
206    /// `state`, but lets the timer know how to converge its state to `state`.
207    pub fn when_internal(&self) -> Option<u64> {
208        unsafe { (*self.when.get()) }
209    }
210
211    pub fn set_when_internal(&self, when: Option<u64>) {
212        unsafe {
213            (*self.when.get()) = when;
214        }
215    }
216
217    /// Called by `Timer` to load the current value of `state` for processing
218    pub fn load_state(&self) -> Option<u64> {
219        let state = self.state.load(SeqCst);
220
221        if is_elapsed(state) {
222            None
223        } else {
224            Some(state)
225        }
226    }
227
228    pub fn is_elapsed(&self) -> bool {
229        let state = self.state.load(SeqCst);
230        is_elapsed(state)
231    }
232
233    pub fn fire(&self, when: u64) {
234        let mut curr = self.state.load(SeqCst);
235
236        loop {
237            if is_elapsed(curr) || curr > when {
238                return;
239            }
240
241            let next = ELAPSED | curr;
242            let actual = self.state.compare_and_swap(curr, next, SeqCst);
243
244            if curr == actual {
245                break;
246            }
247
248            curr = actual;
249        }
250
251        self.task.notify();
252    }
253
254    pub fn error(&self) {
255        // Only transition to the error state if not currently elapsed
256        let mut curr = self.state.load(SeqCst);
257
258        loop {
259            if is_elapsed(curr) {
260                return;
261            }
262
263            let next = ERROR;
264
265            let actual = self.state.compare_and_swap(curr, next, SeqCst);
266
267            if curr == actual {
268                break;
269            }
270
271            curr = actual;
272        }
273
274        self.task.notify();
275    }
276
277    pub fn cancel(entry: &Arc<Entry>) {
278        let state = entry.state.fetch_or(ELAPSED, SeqCst);
279
280        if is_elapsed(state) {
281            // Nothing more to do
282            return;
283        }
284
285        // If registered with a timer instance, try to upgrade the Arc.
286        let inner = match entry.upgrade_inner() {
287            Some(inner) => inner,
288            None => return,
289        };
290
291        let _ = inner.queue(entry);
292    }
293
294    pub fn poll_elapsed(&self) -> Poll<(), Error> {
295        use futures::Async::NotReady;
296
297        let mut curr = self.state.load(SeqCst);
298
299        if is_elapsed(curr) {
300            if curr == ERROR {
301                return Err(Error::shutdown());
302            } else {
303                return Ok(().into());
304            }
305        }
306
307        self.task.register();
308
309        curr = self.state.load(SeqCst).into();
310
311        if is_elapsed(curr) {
312            if curr == ERROR {
313                return Err(Error::shutdown());
314            } else {
315                return Ok(().into());
316            }
317        }
318
319        Ok(NotReady)
320    }
321
322    /// Only called by `Registration`
323    pub fn reset(entry: &mut Arc<Entry>) {
324        if !entry.is_registered() {
325            return;
326        }
327
328        let inner = match entry.upgrade_inner() {
329            Some(inner) => inner,
330            None => return,
331        };
332
333        let deadline = entry.time_ref().deadline;
334        let when = inner.normalize_deadline(deadline);
335        let elapsed = inner.elapsed();
336
337        let mut curr = entry.state.load(SeqCst);
338        let mut notify;
339
340        loop {
341            // In these two cases, there is no work to do when resetting the
342            // timer. If the `Entry` is in an error state, then it cannot be
343            // used anymore. If resetting the entry to the current value, then
344            // the reset is a noop.
345            if curr == ERROR || curr == when {
346                return;
347            }
348
349            let next;
350
351            if when <= elapsed {
352                next = ELAPSED;
353                notify = !is_elapsed(curr);
354            } else {
355                next = when;
356                notify = true;
357            }
358
359            let actual = entry.state.compare_and_swap(curr, next, SeqCst);
360
361            if curr == actual {
362                break;
363            }
364
365            curr = actual;
366        }
367
368        if notify {
369            let _ = inner.queue(entry);
370        }
371    }
372
373    fn upgrade_inner(&self) -> Option<Arc<Inner>> {
374        self.inner.as_ref().and_then(|inner| inner.upgrade())
375    }
376}
377
378fn is_elapsed(state: u64) -> bool {
379    state & ELAPSED == ELAPSED
380}
381
382impl Drop for Entry {
383    fn drop(&mut self) {
384        let inner = match self.upgrade_inner() {
385            Some(inner) => inner,
386            None => return,
387        };
388
389        inner.decrement();
390    }
391}
392
393unsafe impl Send for Entry {}
394unsafe impl Sync for Entry {}