1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::time::Duration;
use futures::{Future, Poll, Async};
use futures::sync::oneshot::spawn;
use tokio::runtime::Runtime;
use super::credential::CredentialsError;
use super::client::{TimeoutFuture, SignAndDispatchError};
use super::request::{HttpResponse, HttpDispatchError};
lazy_static! {
static ref FALLBACK_RUNTIME: Runtime = Runtime::new().unwrap();
}
pub struct RusotoFuture<T, E> {
state: Option<RusotoFutureState<T, E>>
}
pub fn new<T, E>(
future: Box<TimeoutFuture<Item=HttpResponse, Error=SignAndDispatchError> + Send>,
handler: fn(HttpResponse) -> Box<Future<Item=T, Error=E> + Send>
) -> RusotoFuture<T, E>
{
RusotoFuture { state: Some(RusotoFutureState::SignAndDispatch { future, handler }) }
}
impl<T, E> RusotoFuture<T, E> {
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.set_timeout(timeout);
self
}
pub fn set_timeout(&mut self, timeout: Duration) {
if let Some(RusotoFutureState::SignAndDispatch { ref mut future, .. }) = self.state {
future.set_timeout(timeout);
}
}
pub fn clear_timeout(&mut self) {
if let Some(RusotoFutureState::SignAndDispatch { ref mut future, .. }) = self.state {
future.clear_timeout();
}
}
pub fn sync(self) -> Result<T, E>
where T: Send + 'static,
E: From<CredentialsError> + From<HttpDispatchError> + Send + 'static
{
spawn(self, &FALLBACK_RUNTIME.executor()).wait()
}
}
impl<T, E> Future for RusotoFuture<T, E>
where E: From<CredentialsError> + From<HttpDispatchError>
{
type Item = T;
type Error = E;
fn poll(&mut self) -> Poll<T, E> {
match self.state.take().unwrap() {
RusotoFutureState::SignAndDispatch { mut future, handler } => {
match future.poll() {
Err(SignAndDispatchError::Credentials(err)) => Err(err.into()),
Err(SignAndDispatchError::Dispatch(err)) => Err(err.into()),
Ok(Async::Ready(response)) => {
self.state = Some(RusotoFutureState::RunningResponseHandler(handler(response)));
self.poll()
},
Ok(Async::NotReady) => {
self.state = Some(RusotoFutureState::SignAndDispatch { future, handler });
Ok(Async::NotReady)
}
}
},
RusotoFutureState::RunningResponseHandler(mut future) => {
match future.poll()? {
Async::Ready(value) => Ok(Async::Ready(value)),
Async::NotReady => {
self.state = Some(RusotoFutureState::RunningResponseHandler(future));
Ok(Async::NotReady)
}
}
}
}
}
}
enum RusotoFutureState<T, E> {
SignAndDispatch {
future: Box<TimeoutFuture<Item=HttpResponse, Error=SignAndDispatchError> + Send>,
handler: fn(HttpResponse) -> Box<Future<Item=T, Error=E> + Send>
},
RunningResponseHandler(Box<Future<Item=T, Error=E> + Send>)
}
#[test]
fn rusoto_future_is_send() {
fn is_send<T: Send>() {}
is_send::<RusotoFuture<(), ()>>();
}