Skip to main content

futures_timer/native/
delay.rs

//! Support for creating futures that represent timeouts.
//!
//! This module contains the `Delay` type, which is a future that will resolve
//! at a particular point in the future.
5
6use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::atomic::AtomicUsize;
10use std::sync::atomic::Ordering::SeqCst;
11use std::sync::{Arc, Mutex};
12use std::task::{Context, Poll};
13use std::time::{Duration, Instant};
14
15use super::arc_list::Node;
16use super::AtomicWaker;
17use super::{ScheduledTimer, TimerHandle};
18
19/// A future representing the notification that an elapsed duration has
20/// occurred.
21///
22/// This is created through the `Delay::new` method indicating when the future should fire.
23/// Note that these futures are not intended for high resolution timers, but rather they will
24/// likely fire some granularity after the exact instant that they're otherwise indicated to fire
25/// at.
26pub struct Delay {
27    state: Option<Arc<Node<ScheduledTimer>>>,
28}
29
30impl Delay {
31    /// Creates a new future which will fire at `dur` time into the future.
32    ///
33    /// The returned object will be bound to the default timer for this thread.
34    /// The default timer will be spun up in a helper thread on first use.
35    #[inline]
36    pub fn new(dur: Duration) -> Delay {
37        Delay::new_handle(
38            Instant::now()
39                .checked_add(dur)
40                .unwrap_or_else(Self::far_future),
41            Default::default(),
42        )
43    }
44
45    /// Creates a new future which will fire at the time specified by `at`.
46    ///
47    /// The returned instance of `Delay` will be bound to the timer specified by
48    /// the `handle` argument.
49    pub(crate) fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
50        let inner = match handle.inner.upgrade() {
51            Some(i) => i,
52            None => return Delay { state: None },
53        };
54        let state = Arc::new(Node::new(ScheduledTimer {
55            at: Mutex::new(Some(at)),
56            state: AtomicUsize::new(0),
57            waker: AtomicWaker::new(),
58            inner: handle.inner,
59            slot: Mutex::new(None),
60        }));
61
62        // If we fail to actually push our node then we've become an inert
63        // timer, meaning that we'll want to immediately return an error from
64        // `poll`.
65        if inner.list.push(&state).is_err() {
66            return Delay { state: None };
67        }
68
69        inner.waker.wake();
70        Delay { state: Some(state) }
71    }
72
73    /// Resets this timeout to an new timeout which will fire at the time
74    /// specified by `at`.
75    #[inline]
76    pub fn reset(&mut self, dur: Duration) {
77        if self._reset(dur).is_err() {
78            self.state = None
79        }
80    }
81
82    fn far_future() -> Instant {
83        // 30 years from now, inspired by Tokio.
84        Instant::now() + Duration::from_secs(86400 * 365 * 30)
85    }
86
87    fn _reset(&mut self, dur: Duration) -> Result<(), ()> {
88        let state = match self.state {
89            Some(ref state) => state,
90            None => return Err(()),
91        };
92        if let Some(timeouts) = state.inner.upgrade() {
93            let mut bits = state.state.load(SeqCst);
94            loop {
95                // If we've been invalidated, cancel this reset
96                if bits & 0b10 != 0 {
97                    return Err(());
98                }
99                let new = bits.wrapping_add(0b100) & !0b11;
100                match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
101                    Ok(_) => break,
102                    Err(s) => bits = s,
103                }
104            }
105            *state.at.lock().unwrap() = Some(
106                Instant::now()
107                    .checked_add(dur)
108                    .unwrap_or_else(Self::far_future),
109            );
110            // If we fail to push our node then we've become an inert timer, so
111            // we'll want to clear our `state` field accordingly
112            timeouts.list.push(state)?;
113            timeouts.waker.wake();
114        }
115
116        Ok(())
117    }
118}
119
120impl Future for Delay {
121    type Output = ();
122
123    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124        let state = match self.state {
125            Some(ref state) => state,
126            None => panic!("timer has gone away"),
127        };
128
129        if state.state.load(SeqCst) & 1 != 0 {
130            return Poll::Ready(());
131        }
132
133        state.waker.register(cx.waker());
134
135        // Now that we've registered, do the full check of our own internal
136        // state. If we've fired the first bit is set, and if we've been
137        // invalidated the second bit is set.
138        match state.state.load(SeqCst) {
139            n if n & 0b01 != 0 => Poll::Ready(()),
140            n if n & 0b10 != 0 => panic!("timer has gone away"),
141            _ => Poll::Pending,
142        }
143    }
144}
145
146impl Drop for Delay {
147    fn drop(&mut self) {
148        let state = match self.state {
149            Some(ref s) => s,
150            None => return,
151        };
152        if let Some(timeouts) = state.inner.upgrade() {
153            *state.at.lock().unwrap() = None;
154            if timeouts.list.push(state).is_ok() {
155                timeouts.waker.wake();
156            }
157        }
158    }
159}
160
161impl fmt::Debug for Delay {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
163        f.debug_struct("Delay").finish()
164    }
165}