1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
pub mod timer {
use futures::{Async, Future, Poll};
use std::sync::mpsc::RecvError;
use std::time;
use fiber::{self, Context};
use io::poll;
pub trait TimerExt: Sized + Future {
fn timeout_after(self, duration: time::Duration) -> TimeoutAfter<Self> {
TimeoutAfter {
future: self,
timeout: timeout(duration),
}
}
}
impl<T: Future> TimerExt for T {}
pub struct TimeoutAfter<T> {
future: T,
timeout: Timeout,
}
impl<T: Future> Future for TimeoutAfter<T> {
type Item = T::Item;
type Error = Option<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(value) = self.future.poll().map_err(Some)? {
Ok(Async::Ready(value))
} else if let Ok(Async::NotReady) = self.timeout.poll() {
Ok(Async::NotReady)
} else {
Err(None)
}
}
}
#[derive(Debug)]
pub struct Timeout {
start: time::Instant,
duration: time::Duration,
inner: Option<poll::poller::Timeout>,
}
pub fn timeout(delay_from_now: time::Duration) -> Timeout {
Timeout {
start: time::Instant::now(),
duration: delay_from_now,
inner: None,
}
}
impl Future for Timeout {
type Item = ();
type Error = RecvError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut inner) = self.inner {
inner.poll()
} else {
let duration = self.duration;
let elapsed = self.start.elapsed();
if elapsed >= duration {
return Ok(Async::Ready(()));
}
let set_timeout = |mut c: Context| {
let rest = duration - elapsed;
poll::poller::set_timeout(c.poller(), rest)
};
if let Some(inner) = fiber::with_current_context(set_timeout) {
self.inner = Some(inner);
self.poll()
} else {
Ok(Async::NotReady)
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use futures::{self, Async, Future};
use std::time::Duration;
#[test]
fn it_works() {
let mut timeout = timeout(Duration::from_secs(0));
assert_eq!(timeout.poll(), Ok(Async::Ready(())));
}
#[test]
fn timeout_after_works() {
let mut future = futures::empty::<(), ()>().timeout_after(Duration::from_secs(0));
assert_eq!(future.poll(), Err(None));
let mut future = futures::finished::<(), ()>(()).timeout_after(Duration::from_secs(1));
assert_eq!(future.poll(), Ok(Async::Ready(())));
let mut future = futures::failed::<(), ()>(()).timeout_after(Duration::from_secs(1));
assert_eq!(future.poll(), Err(Some(())));
}
}
}