tokio_timer_patched/
timer.rs

1use {interval, Interval, Builder, wheel};
2use worker::Worker;
3use wheel::{Token, Wheel};
4
5use futures::{Future, Stream, Async, Poll};
6use futures::task::{self, Task};
7
8use std::{fmt, io};
9use std::error::Error;
10use std::time::{Duration, Instant};
11
12/// A facility for scheduling timeouts
13#[derive(Clone)]
14pub struct Timer {
15    worker: Worker,
16}
17
18/// A `Future` that does nothing and completes after the requested duration
19#[must_use = "futures do nothing unless polled"]
20#[derive(Debug)]
21pub struct Sleep {
22    timer: Timer,
23    when: Instant,
24    handle: Option<(Task, Token)>,
25}
26
27/// Allows a given `Future` to execute for a max duration
28#[must_use = "futures do nothing unless polled"]
29#[derive(Debug)]
30pub struct Timeout<T> {
31    future: Option<T>,
32    sleep: Sleep,
33}
34
35/// Allows a given `Stream` to take a max duration to yield the next value.
36#[derive(Debug)]
37pub struct TimeoutStream<T> {
38    stream: Option<T>,
39    duration: Duration,
40    sleep: Sleep,
41}
42
43/// The error type for timer operations.
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum TimerError {
46    /// The requested timeout exceeds the timer's `max_timeout` setting.
47    TooLong,
48    /// The timer has reached capacity and cannot support new timeouts.
49    NoCapacity,
50}
51
52/// The error type for timeout operations.
53#[derive(Clone)]
54pub enum TimeoutError<T> {
55    /// An error caused by the timer
56    Timer(T, TimerError),
57    /// The operation timed out
58    TimedOut(T),
59}
60
61pub fn build(builder: Builder) -> Timer {
62    let wheel = Wheel::new(&builder);
63    let worker = Worker::spawn(wheel, builder);
64
65    Timer { worker: worker }
66}
67
68/*
69 *
70 * ===== Timer =====
71 *
72 */
73
74impl Timer {
75    /// Returns a future that completes once the given instant has been reached
76    pub fn sleep(&self, duration: Duration) -> Sleep {
77        Sleep::new(self.clone(), duration)
78    }
79
80    /// Allow the given future to execute for at most `duration` time.
81    ///
82    /// If the given future completes within the given time, then the `Timeout`
83    /// future will complete with that result. If `duration` expires, the
84    /// `Timeout` future completes with a `TimeoutError`.
85    pub fn timeout<F, E>(&self, future: F, duration: Duration) -> Timeout<F>
86        where F: Future<Error = E>,
87              E: From<TimeoutError<F>>,
88    {
89        Timeout {
90            future: Some(future),
91            sleep: self.sleep(duration),
92        }
93    }
94
95    /// Allow the given stream to execute for at most `duration` time per
96    /// yielded value.
97    ///
98    /// If the given stream yields a value within the allocated duration, then
99    /// value is returned and the timeout is reset for the next value. If the
100    /// `duration` expires, then the stream will error with a `TimeoutError`.
101    pub fn timeout_stream<T, E>(&self, stream: T, duration: Duration) -> TimeoutStream<T>
102        where T: Stream<Error = E>,
103              E: From<TimeoutError<T>>,
104    {
105        TimeoutStream {
106            stream: Some(stream),
107            duration: duration,
108            sleep: self.sleep(duration),
109        }
110    }
111
112    /// Creates a new interval which will fire at `dur` time into the future,
113    /// and will repeat every `dur` interval after
114    pub fn interval(&self, dur: Duration) -> Interval {
115        interval::new(self.sleep(dur), dur)
116    }
117
118    /// Creates a new interval which will fire at the time specified by `at`,
119    /// and then will repeat every `dur` interval after
120    pub fn interval_at(&self, at: Instant, dur: Duration) -> Interval {
121        let now = Instant::now();
122
123        let sleep = if at > now {
124            self.sleep(at - now)
125        } else {
126            self.sleep(Duration::from_millis(0))
127        };
128
129        interval::new(sleep, dur)
130    }
131}
132
133impl Default for Timer {
134    fn default() -> Timer {
135        wheel().build()
136    }
137}
138
139impl fmt::Debug for Timer {
140    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
141        write!(fmt, "Timer")
142    }
143}
144
145/*
146 *
147 * ===== Sleep =====
148 *
149 */
150
151impl Sleep {
152    /// Create a new `Sleep`
153    fn new(timer: Timer, duration: Duration) -> Sleep {
154        Sleep {
155            timer: timer,
156            when: Instant::now() + duration,
157            handle: None,
158        }
159    }
160
161    /// Returns true if the `Sleep` is expired.
162    ///
163    /// A `Sleep` is expired when the requested duration has elapsed. In
164    /// practice, the `Sleep` can expire slightly before the requested duration
165    /// as the timer is not precise.
166    ///
167    /// See the crate docs for more detail.
168    pub fn is_expired(&self) -> bool {
169        Instant::now() >= self.when - *self.timer.worker.tolerance()
170    }
171
172    /// Returns the duration remaining
173    pub fn remaining(&self) -> Duration {
174        let now = Instant::now();
175
176        if now >= self.when {
177            Duration::from_millis(0)
178        } else {
179            self.when - now
180        }
181    }
182
183    /// Returns a ref to the timer backing this `Sleep`
184    pub fn timer(&self) -> &Timer {
185        &self.timer
186    }
187}
188
189impl Future for Sleep {
190    type Item = ();
191    type Error = TimerError;
192
193    fn poll(&mut self) -> Poll<(), TimerError> {
194        if self.is_expired() {
195            return Ok(Async::Ready(()));
196        }
197
198        // The `Sleep` has not expired, so perform any necessary operations
199        // with the timer worker in order to get notified after the requested
200        // instant.
201
202        let handle = match self.handle {
203            None => {
204                // An wakeup request has not yet been sent to the timer. Before
205                // doing so, check to ensure that the requested duration does
206                // not exceed the `max_timeout` duration
207                let now = Instant::now();
208                if self.when > now && (self.when - now) > *self.timer.worker.max_timeout() {
209                    return Err(TimerError::TooLong);
210                }
211
212                // Get the current task handle
213                let task = task::current();
214
215                match self.timer.worker.set_timeout(self.when, task.clone()) {
216                    Ok(token) => {
217                        (task, token)
218                    }
219                    Err(task) => {
220                        // The timer is overloaded, yield the current task
221                        task.notify();
222                        return Ok(Async::NotReady);
223                    }
224                }
225            }
226            Some((ref task, token)) => {
227                if task.will_notify_current() {
228                    // Nothing more to do, the notify on timeout has already
229                    // been registered
230                    return Ok(Async::NotReady);
231                }
232
233                let task = task::current();
234
235                // The timeout has been moved to another task, in this case the
236                // timer has to be notified
237                match self.timer.worker.move_timeout(token, self.when, task.clone()) {
238                    Ok(_) => (task, token),
239                    Err(task) => {
240                        // Overloaded timer, yield hte current task
241                        task.notify();
242                        return Ok(Async::NotReady);
243                    }
244                }
245            }
246        };
247
248        // Moved out here to make the borrow checker happy
249        self.handle = Some(handle);
250
251        Ok(Async::NotReady)
252    }
253}
254
255impl Drop for Sleep {
256    fn drop(&mut self) {
257        if let Some((_, token)) = self.handle {
258            self.timer.worker.cancel_timeout(token, self.when);
259        }
260    }
261}
262
263/*
264 *
265 * ===== Timeout ====
266 *
267 */
268
269impl<T> Timeout<T> {
270    /// Gets a reference to the underlying future in this timeout.
271    ///
272    /// # Panics
273    ///
274    /// This function panics if the underlying future has already been consumed.
275    pub fn get_ref(&self) -> &T {
276        self.future.as_ref().expect("the future has already been consumed")
277    }
278
279    /// Gets a mutable reference to the underlying future in this timeout.
280    ///
281    /// # Panics
282    ///
283    /// This function panics if the underlying future has already been consumed.
284    pub fn get_mut(&mut self) -> &mut T {
285        self.future.as_mut().expect("the future has already been consumed")
286    }
287
288    /// Consumes this timeout, returning the underlying future.
289    ///
290    /// # Panics
291    ///
292    /// This function panics if the underlying future has already been consumed.
293    pub fn into_inner(self) -> T {
294        self.future.expect("the future has already been consumed")
295    }
296}
297
298impl<F, E> Future for Timeout<F>
299    where F: Future<Error = E>,
300          E: From<TimeoutError<F>>,
301{
302    type Item = F::Item;
303    type Error = E;
304
305    fn poll(&mut self) -> Poll<F::Item, E> {
306        // First, try polling the future
307        match self.future {
308            Some(ref mut f) => {
309                match f.poll() {
310                    Ok(Async::NotReady) => {}
311                    v => return v,
312                }
313            }
314            None => panic!("cannot call poll once value is consumed"),
315        }
316
317        // Now check the timer
318        match self.sleep.poll() {
319            Ok(Async::NotReady) => Ok(Async::NotReady),
320            Ok(Async::Ready(_)) => {
321                // Timeout has elapsed, error the future
322                let f = self.future.take().unwrap();
323                Err(TimeoutError::TimedOut(f).into())
324            }
325            Err(e) => {
326                // Something went wrong with the underlying timeout
327                let f = self.future.take().unwrap();
328                Err(TimeoutError::Timer(f, e).into())
329            }
330        }
331    }
332}
333
334/*
335 *
336 * ===== TimeoutStream ====
337 *
338 */
339
340impl<T> TimeoutStream<T> {
341    /// Gets a reference to the underlying stream in this timeout.
342    ///
343    /// # Panics
344    ///
345    /// This function panics if the underlying stream has already been consumed.
346    pub fn get_ref(&self) -> &T {
347        self.stream.as_ref().expect("the stream has already been consumed")
348    }
349
350    /// Gets a mutable reference to the underlying stream in this timeout.
351    ///
352    /// # Panics
353    ///
354    /// This function panics if the underlying stream has already been consumed.
355    pub fn get_mut(&mut self) -> &mut T {
356        self.stream.as_mut().expect("the stream has already been consumed")
357    }
358
359    /// Consumes this timeout, returning the underlying stream.
360    ///
361    /// # Panics
362    ///
363    /// This function panics if the underlying stream has already been consumed.
364    pub fn into_inner(self) -> T {
365        self.stream.expect("the stream has already been consumed")
366    }
367}
368
369impl<T, E> Stream for TimeoutStream<T>
370    where T: Stream<Error = E>,
371          E: From<TimeoutError<T>>,
372{
373    type Item = T::Item;
374    type Error = E;
375
376    fn poll(&mut self) -> Poll<Option<T::Item>, E> {
377        // First, try polling the future
378        match self.stream {
379            Some(ref mut s) => {
380                match s.poll() {
381                    Ok(Async::NotReady) => {}
382                    Ok(Async::Ready(Some(v))) => {
383                        // Reset the timeout
384                        self.sleep = Sleep::new(self.sleep.timer.clone(), self.duration);
385
386                        // Return the value
387                        return Ok(Async::Ready(Some(v)));
388                    }
389                    v => return v,
390                }
391            }
392            None => panic!("cannot call poll once value is consumed"),
393        }
394
395        // Now check the timer
396        match self.sleep.poll() {
397            Ok(Async::NotReady) => Ok(Async::NotReady),
398            Ok(Async::Ready(_)) => {
399                // Timeout has elapsed, error the future
400                let s = self.stream.take().unwrap();
401                Err(TimeoutError::TimedOut(s).into())
402            }
403            Err(e) => {
404                // Something went wrong with the underlying timeout
405                let s = self.stream.take().unwrap();
406                Err(TimeoutError::Timer(s, e).into())
407            }
408        }
409    }
410}
411
412/*
413 *
414 * ===== Errors =====
415 *
416 */
417
418impl fmt::Display for TimerError {
419    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
420        write!(fmt, "{}", Error::description(self))
421    }
422}
423
424impl Error for TimerError {
425    fn description(&self) -> &str {
426        match *self {
427            TimerError::TooLong => "requested timeout too long",
428            TimerError::NoCapacity => "timer out of capacity",
429        }
430    }
431}
432
433impl<T> fmt::Display for TimeoutError<T> {
434    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
435        write!(fmt, "{}", Error::description(self))
436    }
437}
438
439impl<T> fmt::Debug for TimeoutError<T> {
440    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
441        write!(fmt, "{}", Error::description(self))
442    }
443}
444
445impl<T> Error for TimeoutError<T> {
446    fn description(&self) -> &str {
447        use self::TimerError::*;
448        use self::TimeoutError::*;
449
450        match *self {
451            Timer(_, TooLong) => "requested timeout too long",
452            Timer(_, NoCapacity) => "timer out of capacity",
453            TimedOut(_) => "the future timed out",
454        }
455    }
456}
457
458impl<T> From<TimeoutError<T>> for io::Error {
459    fn from(src: TimeoutError<T>) -> io::Error {
460        use self::TimerError::*;
461        use self::TimeoutError::*;
462
463        match src {
464            Timer(_, TooLong) => io::Error::new(io::ErrorKind::InvalidInput, "requested timeout too long"),
465            Timer(_, NoCapacity) => io::Error::new(io::ErrorKind::Other, "timer out of capacity"),
466            TimedOut(_) => io::Error::new(io::ErrorKind::TimedOut, "the future timed out"),
467        }
468    }
469}
470
471impl From<TimerError> for io::Error {
472    fn from(src: TimerError) -> io::Error {
473        io::Error::new(io::ErrorKind::Other, src)
474    }
475}
476
477impl From<TimerError> for () {
478    fn from(_: TimerError) -> () {
479    }
480}
481
482impl<T> From<TimeoutError<T>> for () {
483    fn from(_: TimeoutError<T>) -> () {
484    }
485}