1use {interval, Interval, Builder, wheel};
2use worker::Worker;
3use wheel::{Token, Wheel};
4
5use futures::{Future, Stream, Async, Poll};
6use futures::task::{self, Task};
7
8use std::fmt;
9use std::error::Error;
10use std::time::{Duration, Instant};
11
12#[derive(Clone)]
14pub struct Timer {
15 worker: Worker,
16}
17
18#[must_use = "futures do nothing unless polled"]
20#[derive(Debug)]
21pub struct Sleep {
22 timer: Timer,
23 when: Instant,
24 handle: Option<(Task, Token)>,
25}
26
27#[must_use = "futures do nothing unless polled"]
29#[derive(Debug)]
30pub struct Timeout<T> {
31 future: Option<T>,
32 sleep: Sleep,
33}
34
35#[derive(Debug)]
37pub struct TimeoutStream<T> {
38 stream: Option<T>,
39 duration: Duration,
40 sleep: Sleep,
41}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum TimerError {
46 TooLong,
48 NoCapacity,
50}
51
52#[derive(Clone)]
54pub enum TimeoutError<T, E> {
55 Timer(T, TimerError),
57 TimedOut(T),
59 Inner(E),
61}
62
63pub fn build(builder: Builder) -> Timer {
64 let wheel = Wheel::new(&builder);
65 let worker = Worker::spawn(wheel, builder);
66
67 Timer { worker: worker }
68}
69
70impl Timer {
77 pub fn sleep(&self, duration: Duration) -> Sleep {
79 Sleep::new(self.clone(), duration)
80 }
81
82 pub fn timeout<F>(&self, future: F, duration: Duration) -> Timeout<F>
88 where F: Future
89 {
90 Timeout {
91 future: Some(future),
92 sleep: self.sleep(duration),
93 }
94 }
95
96 pub fn timeout_stream<T>(&self, stream: T, duration: Duration) -> TimeoutStream<T>
103 where T: Stream
104 {
105 TimeoutStream {
106 stream: Some(stream),
107 duration: duration,
108 sleep: self.sleep(duration),
109 }
110 }
111
112 pub fn interval(&self, dur: Duration) -> Interval {
115 interval::new(self.sleep(dur), dur)
116 }
117
118 pub fn interval_at(&self, at: Instant, dur: Duration) -> Interval {
121 let now = Instant::now();
122
123 let sleep = if at > now {
124 self.sleep(at - now)
125 } else {
126 self.sleep(Duration::from_millis(0))
127 };
128
129 interval::new(sleep, dur)
130 }
131}
132
133impl Default for Timer {
134 fn default() -> Timer {
135 wheel().build()
136 }
137}
138
139impl fmt::Debug for Timer {
140 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
141 write!(fmt, "Timer")
142 }
143}
144
145impl Sleep {
152 fn new(timer: Timer, duration: Duration) -> Sleep {
154 Sleep {
155 timer: timer,
156 when: Instant::now() + duration,
157 handle: None,
158 }
159 }
160
161 pub fn is_expired(&self) -> bool {
169 Instant::now() >= self.when - *self.timer.worker.tolerance()
170 }
171
172 pub fn remaining(&self) -> Duration {
174 let now = Instant::now();
175
176 if now >= self.when {
177 Duration::from_millis(0)
178 } else {
179 self.when - now
180 }
181 }
182
183 pub fn timer(&self) -> &Timer {
185 &self.timer
186 }
187}
188
189impl Future for Sleep {
190 type Item = ();
191 type Error = TimerError;
192
193 fn poll(&mut self) -> Poll<(), TimerError> {
194 if self.is_expired() {
195 return Ok(Async::Ready(()));
196 }
197
198 let handle = match self.handle {
203 None => {
204 if (self.when - Instant::now()) > *self.timer.worker.max_timeout() {
208 return Err(TimerError::TooLong);
209 }
210
211 let task = task::current();
213
214 match self.timer.worker.set_timeout(self.when, task.clone()) {
215 Ok(token) => {
216 (task, token)
217 }
218 Err(task) => {
219 task.notify();
221 return Ok(Async::NotReady);
222 }
223 }
224 }
225 Some((ref task, token)) => {
226 if task.will_notify_current() {
227 return Ok(Async::NotReady);
230 }
231
232 let task = task::current();
233
234 match self.timer.worker.move_timeout(token, self.when, task.clone()) {
237 Ok(_) => (task, token),
238 Err(task) => {
239 task.notify();
241 return Ok(Async::NotReady);
242 }
243 }
244 }
245 };
246
247 self.handle = Some(handle);
249
250 Ok(Async::NotReady)
251 }
252}
253
254impl Drop for Sleep {
255 fn drop(&mut self) {
256 if let Some((_, token)) = self.handle {
257 self.timer.worker.cancel_timeout(token, self.when);
258 }
259 }
260}
261
262impl<T> Timeout<T> {
269 pub fn get_ref(&self) -> &T {
275 self.future.as_ref().expect("the future has already been consumed")
276 }
277
278 pub fn get_mut(&mut self) -> &mut T {
284 self.future.as_mut().expect("the future has already been consumed")
285 }
286
287 pub fn into_inner(self) -> T {
293 self.future.expect("the future has already been consumed")
294 }
295}
296
297impl<F, E> Future for Timeout<F>
298 where F: Future<Error = E>
299{
300 type Item = F::Item;
301 type Error = TimeoutError<F, E>;
302
303 fn poll(&mut self) -> Poll<F::Item, TimeoutError<F, E>> {
304 match self.future {
306 Some(ref mut f) => {
307 match f.poll() {
308 Ok(Async::NotReady) => {}
309 Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
310 Err(e) => return Err(TimeoutError::Inner(e)),
311 }
312 }
313 None => panic!("cannot call poll once value is consumed"),
314 }
315
316 match self.sleep.poll() {
318 Ok(Async::NotReady) => Ok(Async::NotReady),
319 Ok(Async::Ready(_)) => {
320 let f = self.future.take().unwrap();
322 Err(TimeoutError::TimedOut(f).into())
323 }
324 Err(e) => {
325 let f = self.future.take().unwrap();
327 Err(TimeoutError::Timer(f, e).into())
328 }
329 }
330 }
331}
332
333impl<T> TimeoutStream<T> {
340 pub fn get_ref(&self) -> &T {
346 self.stream.as_ref().expect("the stream has already been consumed")
347 }
348
349 pub fn get_mut(&mut self) -> &mut T {
355 self.stream.as_mut().expect("the stream has already been consumed")
356 }
357
358 pub fn into_inner(self) -> T {
364 self.stream.expect("the stream has already been consumed")
365 }
366}
367
368impl<T, E> Stream for TimeoutStream<T>
369 where T: Stream<Error = E>
370{
371 type Item = T::Item;
372 type Error = TimeoutError<T, E>;
373
374 fn poll(&mut self) -> Poll<Option<T::Item>, TimeoutError<T, E>> {
375 match self.stream {
377 Some(ref mut s) => {
378 match s.poll() {
379 Ok(Async::NotReady) => {}
380 Ok(Async::Ready(Some(v))) => {
381 self.sleep = Sleep::new(self.sleep.timer.clone(), self.duration);
383
384 return Ok(Async::Ready(Some(v)));
386 },
387 Ok(Async::Ready(None)) => {
388 return Ok(Async::Ready(None));
389 },
390 Err(e) => return Err(TimeoutError::Inner(e)),
391 }
392 }
393 None => panic!("cannot call poll once value is consumed"),
394 }
395
396 match self.sleep.poll() {
398 Ok(Async::NotReady) => Ok(Async::NotReady),
399 Ok(Async::Ready(_)) => {
400 let s = self.stream.take().unwrap();
402 Err(TimeoutError::TimedOut(s).into())
403 }
404 Err(e) => {
405 let s = self.stream.take().unwrap();
407 Err(TimeoutError::Timer(s, e).into())
408 }
409 }
410 }
411}
412
413impl fmt::Display for TimerError {
420 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
421 write!(fmt, "{}", Error::description(self))
422 }
423}
424
425impl Error for TimerError {
426 fn description(&self) -> &str {
427 match *self {
428 TimerError::TooLong => "requested timeout too long",
429 TimerError::NoCapacity => "timer out of capacity",
430 }
431 }
432}
433
434impl<T, E: Error> fmt::Display for TimeoutError<T, E> {
435 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
436 write!(fmt, "{}", Error::description(self))
437 }
438}
439
440impl<T, E: Error> fmt::Debug for TimeoutError<T, E> {
441 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
442 write!(fmt, "{}", Error::description(self))
443 }
444}
445
446impl<T, E: Error> Error for TimeoutError<T, E> {
447 fn description(&self) -> &str {
448 use self::TimerError::*;
449 use self::TimeoutError::*;
450
451 match *self {
452 Timer(_, TooLong) => "requested timeout too long",
453 Timer(_, NoCapacity) => "timer out of capacity",
454 TimedOut(_) => "the future timed out",
455 Inner(_) => "inner error",
456 }
457 }
458}