futures_timer/native/
delay.rs1use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::atomic::AtomicUsize;
10use std::sync::atomic::Ordering::SeqCst;
11use std::sync::{Arc, Mutex};
12use std::task::{Context, Poll};
13use std::time::{Duration, Instant};
14
15use super::arc_list::Node;
16use super::AtomicWaker;
17use super::{ScheduledTimer, TimerHandle};
18
19pub struct Delay {
27 state: Option<Arc<Node<ScheduledTimer>>>,
28}
29
30impl Delay {
31 #[inline]
36 pub fn new(dur: Duration) -> Delay {
37 Delay::new_handle(
38 Instant::now()
39 .checked_add(dur)
40 .unwrap_or_else(Self::far_future),
41 Default::default(),
42 )
43 }
44
45 pub(crate) fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
50 let inner = match handle.inner.upgrade() {
51 Some(i) => i,
52 None => return Delay { state: None },
53 };
54 let state = Arc::new(Node::new(ScheduledTimer {
55 at: Mutex::new(Some(at)),
56 state: AtomicUsize::new(0),
57 waker: AtomicWaker::new(),
58 inner: handle.inner,
59 slot: Mutex::new(None),
60 }));
61
62 if inner.list.push(&state).is_err() {
66 return Delay { state: None };
67 }
68
69 inner.waker.wake();
70 Delay { state: Some(state) }
71 }
72
73 #[inline]
76 pub fn reset(&mut self, dur: Duration) {
77 if self._reset(dur).is_err() {
78 self.state = None
79 }
80 }
81
82 fn far_future() -> Instant {
83 Instant::now() + Duration::from_secs(86400 * 365 * 30)
85 }
86
87 fn _reset(&mut self, dur: Duration) -> Result<(), ()> {
88 let state = match self.state {
89 Some(ref state) => state,
90 None => return Err(()),
91 };
92 if let Some(timeouts) = state.inner.upgrade() {
93 let mut bits = state.state.load(SeqCst);
94 loop {
95 if bits & 0b10 != 0 {
97 return Err(());
98 }
99 let new = bits.wrapping_add(0b100) & !0b11;
100 match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
101 Ok(_) => break,
102 Err(s) => bits = s,
103 }
104 }
105 *state.at.lock().unwrap() = Some(
106 Instant::now()
107 .checked_add(dur)
108 .unwrap_or_else(Self::far_future),
109 );
110 timeouts.list.push(state)?;
113 timeouts.waker.wake();
114 }
115
116 Ok(())
117 }
118}
119
120impl Future for Delay {
121 type Output = ();
122
123 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124 let state = match self.state {
125 Some(ref state) => state,
126 None => panic!("timer has gone away"),
127 };
128
129 if state.state.load(SeqCst) & 1 != 0 {
130 return Poll::Ready(());
131 }
132
133 state.waker.register(cx.waker());
134
135 match state.state.load(SeqCst) {
139 n if n & 0b01 != 0 => Poll::Ready(()),
140 n if n & 0b10 != 0 => panic!("timer has gone away"),
141 _ => Poll::Pending,
142 }
143 }
144}
145
146impl Drop for Delay {
147 fn drop(&mut self) {
148 let state = match self.state {
149 Some(ref s) => s,
150 None => return,
151 };
152 if let Some(timeouts) = state.inner.upgrade() {
153 *state.at.lock().unwrap() = None;
154 if timeouts.list.push(state).is_ok() {
155 timeouts.waker.wake();
156 }
157 }
158 }
159}
160
161impl fmt::Debug for Delay {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
163 f.debug_struct("Delay").finish()
164 }
165}