1use {interval, Interval, Builder, wheel};
2use worker::Worker;
3use wheel::{Token, Wheel};
4
5use futures::{Future, Stream, Async, Poll};
6use futures::task::{self, Task};
7
8use std::{fmt, io};
9use std::error::Error;
10use std::time::{Duration, Instant};
11
12#[derive(Clone)]
14pub struct Timer {
15 worker: Worker,
16}
17
18#[must_use = "futures do nothing unless polled"]
20#[derive(Debug)]
21pub struct Sleep {
22 timer: Timer,
23 when: Instant,
24 handle: Option<(Task, Token)>,
25}
26
27#[must_use = "futures do nothing unless polled"]
29#[derive(Debug)]
30pub struct Timeout<T> {
31 future: Option<T>,
32 sleep: Sleep,
33}
34
35#[derive(Debug)]
37pub struct TimeoutStream<T> {
38 stream: Option<T>,
39 duration: Duration,
40 sleep: Sleep,
41}
42
/// Errors reported by the timer itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimerError {
    /// The requested timeout is too long. `Sleep::poll` returns this when the
    /// time left until the deadline exceeds the worker's `max_timeout()`.
    TooLong,
    /// The timer is out of capacity.
    NoCapacity,
}
51
52#[derive(Clone)]
54pub enum TimeoutError<T> {
55 Timer(T, TimerError),
57 TimedOut(T),
59}
60
61pub fn build(builder: Builder) -> Timer {
62 let wheel = Wheel::new(&builder);
63 let worker = Worker::spawn(wheel, builder);
64
65 Timer { worker: worker }
66}
67
68impl Timer {
75 pub fn sleep(&self, duration: Duration) -> Sleep {
77 Sleep::new(self.clone(), duration)
78 }
79
80 pub fn timeout<F, E>(&self, future: F, duration: Duration) -> Timeout<F>
86 where F: Future<Error = E>,
87 E: From<TimeoutError<F>>,
88 {
89 Timeout {
90 future: Some(future),
91 sleep: self.sleep(duration),
92 }
93 }
94
95 pub fn timeout_stream<T, E>(&self, stream: T, duration: Duration) -> TimeoutStream<T>
102 where T: Stream<Error = E>,
103 E: From<TimeoutError<T>>,
104 {
105 TimeoutStream {
106 stream: Some(stream),
107 duration: duration,
108 sleep: self.sleep(duration),
109 }
110 }
111
112 pub fn interval(&self, dur: Duration) -> Interval {
115 interval::new(self.sleep(dur), dur)
116 }
117
118 pub fn interval_at(&self, at: Instant, dur: Duration) -> Interval {
121 let now = Instant::now();
122
123 let sleep = if at > now {
124 self.sleep(at - now)
125 } else {
126 self.sleep(Duration::from_millis(0))
127 };
128
129 interval::new(sleep, dur)
130 }
131}
132
133impl Default for Timer {
134 fn default() -> Timer {
135 wheel().build()
136 }
137}
138
139impl fmt::Debug for Timer {
140 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
141 write!(fmt, "Timer")
142 }
143}
144
145impl Sleep {
152 fn new(timer: Timer, duration: Duration) -> Sleep {
154 Sleep {
155 timer: timer,
156 when: Instant::now() + duration,
157 handle: None,
158 }
159 }
160
161 pub fn is_expired(&self) -> bool {
169 Instant::now() >= self.when - *self.timer.worker.tolerance()
170 }
171
172 pub fn remaining(&self) -> Duration {
174 let now = Instant::now();
175
176 if now >= self.when {
177 Duration::from_millis(0)
178 } else {
179 self.when - now
180 }
181 }
182
183 pub fn timer(&self) -> &Timer {
185 &self.timer
186 }
187}
188
189impl Future for Sleep {
190 type Item = ();
191 type Error = TimerError;
192
193 fn poll(&mut self) -> Poll<(), TimerError> {
194 if self.is_expired() {
195 return Ok(Async::Ready(()));
196 }
197
198 let handle = match self.handle {
203 None => {
204 let now = Instant::now();
208 if self.when > now && (self.when - now) > *self.timer.worker.max_timeout() {
209 return Err(TimerError::TooLong);
210 }
211
212 let task = task::current();
214
215 match self.timer.worker.set_timeout(self.when, task.clone()) {
216 Ok(token) => {
217 (task, token)
218 }
219 Err(task) => {
220 task.notify();
222 return Ok(Async::NotReady);
223 }
224 }
225 }
226 Some((ref task, token)) => {
227 if task.will_notify_current() {
228 return Ok(Async::NotReady);
231 }
232
233 let task = task::current();
234
235 match self.timer.worker.move_timeout(token, self.when, task.clone()) {
238 Ok(_) => (task, token),
239 Err(task) => {
240 task.notify();
242 return Ok(Async::NotReady);
243 }
244 }
245 }
246 };
247
248 self.handle = Some(handle);
250
251 Ok(Async::NotReady)
252 }
253}
254
255impl Drop for Sleep {
256 fn drop(&mut self) {
257 if let Some((_, token)) = self.handle {
258 self.timer.worker.cancel_timeout(token, self.when);
259 }
260 }
261}
262
263impl<T> Timeout<T> {
270 pub fn get_ref(&self) -> &T {
276 self.future.as_ref().expect("the future has already been consumed")
277 }
278
279 pub fn get_mut(&mut self) -> &mut T {
285 self.future.as_mut().expect("the future has already been consumed")
286 }
287
288 pub fn into_inner(self) -> T {
294 self.future.expect("the future has already been consumed")
295 }
296}
297
298impl<F, E> Future for Timeout<F>
299 where F: Future<Error = E>,
300 E: From<TimeoutError<F>>,
301{
302 type Item = F::Item;
303 type Error = E;
304
305 fn poll(&mut self) -> Poll<F::Item, E> {
306 match self.future {
308 Some(ref mut f) => {
309 match f.poll() {
310 Ok(Async::NotReady) => {}
311 v => return v,
312 }
313 }
314 None => panic!("cannot call poll once value is consumed"),
315 }
316
317 match self.sleep.poll() {
319 Ok(Async::NotReady) => Ok(Async::NotReady),
320 Ok(Async::Ready(_)) => {
321 let f = self.future.take().unwrap();
323 Err(TimeoutError::TimedOut(f).into())
324 }
325 Err(e) => {
326 let f = self.future.take().unwrap();
328 Err(TimeoutError::Timer(f, e).into())
329 }
330 }
331 }
332}
333
334impl<T> TimeoutStream<T> {
341 pub fn get_ref(&self) -> &T {
347 self.stream.as_ref().expect("the stream has already been consumed")
348 }
349
350 pub fn get_mut(&mut self) -> &mut T {
356 self.stream.as_mut().expect("the stream has already been consumed")
357 }
358
359 pub fn into_inner(self) -> T {
365 self.stream.expect("the stream has already been consumed")
366 }
367}
368
369impl<T, E> Stream for TimeoutStream<T>
370 where T: Stream<Error = E>,
371 E: From<TimeoutError<T>>,
372{
373 type Item = T::Item;
374 type Error = E;
375
376 fn poll(&mut self) -> Poll<Option<T::Item>, E> {
377 match self.stream {
379 Some(ref mut s) => {
380 match s.poll() {
381 Ok(Async::NotReady) => {}
382 Ok(Async::Ready(Some(v))) => {
383 self.sleep = Sleep::new(self.sleep.timer.clone(), self.duration);
385
386 return Ok(Async::Ready(Some(v)));
388 }
389 v => return v,
390 }
391 }
392 None => panic!("cannot call poll once value is consumed"),
393 }
394
395 match self.sleep.poll() {
397 Ok(Async::NotReady) => Ok(Async::NotReady),
398 Ok(Async::Ready(_)) => {
399 let s = self.stream.take().unwrap();
401 Err(TimeoutError::TimedOut(s).into())
402 }
403 Err(e) => {
404 let s = self.stream.take().unwrap();
406 Err(TimeoutError::Timer(s, e).into())
407 }
408 }
409 }
410}
411
412impl fmt::Display for TimerError {
419 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
420 write!(fmt, "{}", Error::description(self))
421 }
422}
423
424impl Error for TimerError {
425 fn description(&self) -> &str {
426 match *self {
427 TimerError::TooLong => "requested timeout too long",
428 TimerError::NoCapacity => "timer out of capacity",
429 }
430 }
431}
432
433impl<T> fmt::Display for TimeoutError<T> {
434 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
435 write!(fmt, "{}", Error::description(self))
436 }
437}
438
439impl<T> fmt::Debug for TimeoutError<T> {
440 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
441 write!(fmt, "{}", Error::description(self))
442 }
443}
444
445impl<T> Error for TimeoutError<T> {
446 fn description(&self) -> &str {
447 use self::TimerError::*;
448 use self::TimeoutError::*;
449
450 match *self {
451 Timer(_, TooLong) => "requested timeout too long",
452 Timer(_, NoCapacity) => "timer out of capacity",
453 TimedOut(_) => "the future timed out",
454 }
455 }
456}
457
458impl<T> From<TimeoutError<T>> for io::Error {
459 fn from(src: TimeoutError<T>) -> io::Error {
460 use self::TimerError::*;
461 use self::TimeoutError::*;
462
463 match src {
464 Timer(_, TooLong) => io::Error::new(io::ErrorKind::InvalidInput, "requested timeout too long"),
465 Timer(_, NoCapacity) => io::Error::new(io::ErrorKind::Other, "timer out of capacity"),
466 TimedOut(_) => io::Error::new(io::ErrorKind::TimedOut, "the future timed out"),
467 }
468 }
469}
470
471impl From<TimerError> for io::Error {
472 fn from(src: TimerError) -> io::Error {
473 io::Error::new(io::ErrorKind::Other, src)
474 }
475}
476
477impl From<TimerError> for () {
478 fn from(_: TimerError) -> () {
479 }
480}
481
482impl<T> From<TimeoutError<T>> for () {
483 fn from(_: TimeoutError<T>) -> () {
484 }
485}