tokio_timer/
timer.rs

1use {interval, Interval, Builder, wheel};
2use worker::Worker;
3use wheel::{Token, Wheel};
4
5use futures::{Future, Stream, Async, Poll};
6use futures::task::{self, Task};
7
8use std::fmt;
9use std::error::Error;
10use std::time::{Duration, Instant};
11
12/// A facility for scheduling timeouts
13#[derive(Clone)]
14pub struct Timer {
15    worker: Worker,
16}
17
18/// A `Future` that does nothing and completes after the requested duration
19#[must_use = "futures do nothing unless polled"]
20#[derive(Debug)]
21pub struct Sleep {
22    timer: Timer,
23    when: Instant,
24    handle: Option<(Task, Token)>,
25}
26
27/// Allows a given `Future` to execute for a max duration
28#[must_use = "futures do nothing unless polled"]
29#[derive(Debug)]
30pub struct Timeout<T> {
31    future: Option<T>,
32    sleep: Sleep,
33}
34
35/// Allows a given `Stream` to take a max duration to yield the next value.
36#[derive(Debug)]
37pub struct TimeoutStream<T> {
38    stream: Option<T>,
39    duration: Duration,
40    sleep: Sleep,
41}
42
/// Errors reported by the timer itself.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TimerError {
    /// The requested timeout is longer than the timer's `max_timeout` setting.
    TooLong,
    /// The timer is at capacity and cannot accept any new timeouts.
    NoCapacity,
}
51
52/// The error type for timeout operations.
53#[derive(Clone)]
54pub enum TimeoutError<T, E> {
55    /// An error caused by the timer
56    Timer(T, TimerError),
57    /// The operation timed out
58    TimedOut(T),
59    /// Inner error
60    Inner(E),
61}
62
63pub fn build(builder: Builder) -> Timer {
64    let wheel = Wheel::new(&builder);
65    let worker = Worker::spawn(wheel, builder);
66
67    Timer { worker: worker }
68}
69
70/*
71 *
72 * ===== Timer =====
73 *
74 */
75
76impl Timer {
77    /// Returns a future that completes once the given instant has been reached
78    pub fn sleep(&self, duration: Duration) -> Sleep {
79        Sleep::new(self.clone(), duration)
80    }
81
82    /// Allow the given future to execute for at most `duration` time.
83    ///
84    /// If the given future completes within the given time, then the `Timeout`
85    /// future will complete with that result. If `duration` expires, the
86    /// `Timeout` future completes with a `TimeoutError`.
87    pub fn timeout<F>(&self, future: F, duration: Duration) -> Timeout<F>
88        where F: Future
89    {
90        Timeout {
91            future: Some(future),
92            sleep: self.sleep(duration),
93        }
94    }
95
96    /// Allow the given stream to execute for at most `duration` time per
97    /// yielded value.
98    ///
99    /// If the given stream yields a value within the allocated duration, then
100    /// value is returned and the timeout is reset for the next value. If the
101    /// `duration` expires, then the stream will error with a `TimeoutError`.
102    pub fn timeout_stream<T>(&self, stream: T, duration: Duration) -> TimeoutStream<T>
103        where T: Stream
104    {
105        TimeoutStream {
106            stream: Some(stream),
107            duration: duration,
108            sleep: self.sleep(duration),
109        }
110    }
111
112    /// Creates a new interval which will fire at `dur` time into the future,
113    /// and will repeat every `dur` interval after
114    pub fn interval(&self, dur: Duration) -> Interval {
115        interval::new(self.sleep(dur), dur)
116    }
117
118    /// Creates a new interval which will fire at the time specified by `at`,
119    /// and then will repeat every `dur` interval after
120    pub fn interval_at(&self, at: Instant, dur: Duration) -> Interval {
121        let now = Instant::now();
122
123        let sleep = if at > now {
124            self.sleep(at - now)
125        } else {
126            self.sleep(Duration::from_millis(0))
127        };
128
129        interval::new(sleep, dur)
130    }
131}
132
133impl Default for Timer {
134    fn default() -> Timer {
135        wheel().build()
136    }
137}
138
139impl fmt::Debug for Timer {
140    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
141        write!(fmt, "Timer")
142    }
143}
144
145/*
146 *
147 * ===== Sleep =====
148 *
149 */
150
151impl Sleep {
152    /// Create a new `Sleep`
153    fn new(timer: Timer, duration: Duration) -> Sleep {
154        Sleep {
155            timer: timer,
156            when: Instant::now() + duration,
157            handle: None,
158        }
159    }
160
161    /// Returns true if the `Sleep` is expired.
162    ///
163    /// A `Sleep` is expired when the requested duration has elapsed. In
164    /// practice, the `Sleep` can expire slightly before the requested duration
165    /// as the timer is not precise.
166    ///
167    /// See the crate docs for more detail.
168    pub fn is_expired(&self) -> bool {
169        Instant::now() >= self.when - *self.timer.worker.tolerance()
170    }
171
172    /// Returns the duration remaining
173    pub fn remaining(&self) -> Duration {
174        let now = Instant::now();
175
176        if now >= self.when {
177            Duration::from_millis(0)
178        } else {
179            self.when - now
180        }
181    }
182
183    /// Returns a ref to the timer backing this `Sleep`
184    pub fn timer(&self) -> &Timer {
185        &self.timer
186    }
187}
188
189impl Future for Sleep {
190    type Item = ();
191    type Error = TimerError;
192
193    fn poll(&mut self) -> Poll<(), TimerError> {
194        if self.is_expired() {
195            return Ok(Async::Ready(()));
196        }
197
198        // The `Sleep` has not expired, so perform any necessary operations
199        // with the timer worker in order to get notified after the requested
200        // instant.
201
202        let handle = match self.handle {
203            None => {
204                // An wakeup request has not yet been sent to the timer. Before
205                // doing so, check to ensure that the requested duration does
206                // not exceed the `max_timeout` duration
207                if (self.when - Instant::now()) > *self.timer.worker.max_timeout() {
208                    return Err(TimerError::TooLong);
209                }
210
211                // Get the current task handle
212                let task = task::current();
213
214                match self.timer.worker.set_timeout(self.when, task.clone()) {
215                    Ok(token) => {
216                        (task, token)
217                    }
218                    Err(task) => {
219                        // The timer is overloaded, yield the current task
220                        task.notify();
221                        return Ok(Async::NotReady);
222                    }
223                }
224            }
225            Some((ref task, token)) => {
226                if task.will_notify_current() {
227                    // Nothing more to do, the notify on timeout has already
228                    // been registered
229                    return Ok(Async::NotReady);
230                }
231
232                let task = task::current();
233
234                // The timeout has been moved to another task, in this case the
235                // timer has to be notified
236                match self.timer.worker.move_timeout(token, self.when, task.clone()) {
237                    Ok(_) => (task, token),
238                    Err(task) => {
239                        // Overloaded timer, yield hte current task
240                        task.notify();
241                        return Ok(Async::NotReady);
242                    }
243                }
244            }
245        };
246
247        // Moved out here to make the borrow checker happy
248        self.handle = Some(handle);
249
250        Ok(Async::NotReady)
251    }
252}
253
254impl Drop for Sleep {
255    fn drop(&mut self) {
256        if let Some((_, token)) = self.handle {
257            self.timer.worker.cancel_timeout(token, self.when);
258        }
259    }
260}
261
262/*
263 *
264 * ===== Timeout ====
265 *
266 */
267
268impl<T> Timeout<T> {
269    /// Gets a reference to the underlying future in this timeout.
270    ///
271    /// # Panics
272    ///
273    /// This function panics if the underlying future has already been consumed.
274    pub fn get_ref(&self) -> &T {
275        self.future.as_ref().expect("the future has already been consumed")
276    }
277
278    /// Gets a mutable reference to the underlying future in this timeout.
279    ///
280    /// # Panics
281    ///
282    /// This function panics if the underlying future has already been consumed.
283    pub fn get_mut(&mut self) -> &mut T {
284        self.future.as_mut().expect("the future has already been consumed")
285    }
286
287    /// Consumes this timeout, returning the underlying future.
288    ///
289    /// # Panics
290    ///
291    /// This function panics if the underlying future has already been consumed.
292    pub fn into_inner(self) -> T {
293        self.future.expect("the future has already been consumed")
294    }
295}
296
297impl<F, E> Future for Timeout<F>
298    where F: Future<Error = E>
299{
300    type Item = F::Item;
301    type Error = TimeoutError<F, E>;
302
303    fn poll(&mut self) -> Poll<F::Item, TimeoutError<F, E>> {
304        // First, try polling the future
305        match self.future {
306            Some(ref mut f) => {
307                match f.poll() {
308                    Ok(Async::NotReady) => {}
309                    Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
310                    Err(e) => return Err(TimeoutError::Inner(e)),
311                }
312            }
313            None => panic!("cannot call poll once value is consumed"),
314        }
315
316        // Now check the timer
317        match self.sleep.poll() {
318            Ok(Async::NotReady) => Ok(Async::NotReady),
319            Ok(Async::Ready(_)) => {
320                // Timeout has elapsed, error the future
321                let f = self.future.take().unwrap();
322                Err(TimeoutError::TimedOut(f).into())
323            }
324            Err(e) => {
325                // Something went wrong with the underlying timeout
326                let f = self.future.take().unwrap();
327                Err(TimeoutError::Timer(f, e).into())
328            }
329        }
330    }
331}
332
333/*
334 *
335 * ===== TimeoutStream ====
336 *
337 */
338
339impl<T> TimeoutStream<T> {
340    /// Gets a reference to the underlying stream in this timeout.
341    ///
342    /// # Panics
343    ///
344    /// This function panics if the underlying stream has already been consumed.
345    pub fn get_ref(&self) -> &T {
346        self.stream.as_ref().expect("the stream has already been consumed")
347    }
348
349    /// Gets a mutable reference to the underlying stream in this timeout.
350    ///
351    /// # Panics
352    ///
353    /// This function panics if the underlying stream has already been consumed.
354    pub fn get_mut(&mut self) -> &mut T {
355        self.stream.as_mut().expect("the stream has already been consumed")
356    }
357
358    /// Consumes this timeout, returning the underlying stream.
359    ///
360    /// # Panics
361    ///
362    /// This function panics if the underlying stream has already been consumed.
363    pub fn into_inner(self) -> T {
364        self.stream.expect("the stream has already been consumed")
365    }
366}
367
368impl<T, E> Stream for TimeoutStream<T>
369    where T: Stream<Error = E>
370{
371    type Item = T::Item;
372    type Error = TimeoutError<T, E>;
373
374    fn poll(&mut self) -> Poll<Option<T::Item>, TimeoutError<T, E>> {
375        // First, try polling the future
376        match self.stream {
377            Some(ref mut s) => {
378                match s.poll() {
379                    Ok(Async::NotReady) => {}
380                    Ok(Async::Ready(Some(v))) => {
381                        // Reset the timeout
382                        self.sleep = Sleep::new(self.sleep.timer.clone(), self.duration);
383
384                        // Return the value
385                        return Ok(Async::Ready(Some(v)));
386                    },
387                    Ok(Async::Ready(None)) => {
388                        return Ok(Async::Ready(None));
389                    },
390                    Err(e) => return Err(TimeoutError::Inner(e)),
391                }
392            }
393            None => panic!("cannot call poll once value is consumed"),
394        }
395
396        // Now check the timer
397        match self.sleep.poll() {
398            Ok(Async::NotReady) => Ok(Async::NotReady),
399            Ok(Async::Ready(_)) => {
400                // Timeout has elapsed, error the future
401                let s = self.stream.take().unwrap();
402                Err(TimeoutError::TimedOut(s).into())
403            }
404            Err(e) => {
405                // Something went wrong with the underlying timeout
406                let s = self.stream.take().unwrap();
407                Err(TimeoutError::Timer(s, e).into())
408            }
409        }
410    }
411}
412
413/*
414 *
415 * ===== Errors =====
416 *
417 */
418
419impl fmt::Display for TimerError {
420    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
421        write!(fmt, "{}", Error::description(self))
422    }
423}
424
425impl Error for TimerError {
426    fn description(&self) -> &str {
427        match *self {
428            TimerError::TooLong => "requested timeout too long",
429            TimerError::NoCapacity => "timer out of capacity",
430        }
431    }
432}
433
434impl<T, E: Error> fmt::Display for TimeoutError<T, E> {
435    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
436        write!(fmt, "{}", Error::description(self))
437    }
438}
439
440impl<T, E: Error> fmt::Debug for TimeoutError<T, E> {
441    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
442        write!(fmt, "{}", Error::description(self))
443    }
444}
445
446impl<T, E: Error> Error for TimeoutError<T, E> {
447    fn description(&self) -> &str {
448        use self::TimerError::*;
449        use self::TimeoutError::*;
450
451        match *self {
452            Timer(_, TooLong) => "requested timeout too long",
453            Timer(_, NoCapacity) => "timer out of capacity",
454            TimedOut(_) => "the future timed out",
455            Inner(_) => "inner error",
456        }
457    }
458}