1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::time::Duration;

use futures::{Future, Poll, Async};
use futures::sync::oneshot::spawn;
use tokio::runtime::Runtime;

use super::credential::CredentialsError;
use super::client::{TimeoutFuture, SignAndDispatchError};
use super::request::{HttpResponse, HttpDispatchError};

lazy_static! {
    static ref FALLBACK_RUNTIME: Runtime = Runtime::new().unwrap();
}

/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> {
    state: Option<RusotoFutureState<T, E>>
}

pub fn new<T, E>(
        future: Box<TimeoutFuture<Item=HttpResponse, Error=SignAndDispatchError> + Send>,
        handler: fn(HttpResponse) -> Box<Future<Item=T, Error=E> + Send>
    ) -> RusotoFuture<T, E>
{
    RusotoFuture { state: Some(RusotoFutureState::SignAndDispatch { future, handler }) }
}

impl<T, E> RusotoFuture<T, E> {
    /// Set the timeout on the future to the provided duration.
    ///
    /// Unlike `set_timeout` this method can be easily chained:
    ///
    /// ```rust,ignore
    /// let future = s3.head_object(...)
    ///     .with_timeout(Duration::from_secs(10));
    /// ```
    ///
    /// This is only guaranteed to take effect when called before the future
    /// is polled for the first time.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.set_timeout(timeout);
        self
    }

    /// Set the timeout on the future to the provided duration.
    ///
    /// This is only guaranteed to take effect when called before the future
    /// is polled for the first time.
    pub fn set_timeout(&mut self, timeout: Duration) {
        if let Some(RusotoFutureState::SignAndDispatch { ref mut future, .. }) = self.state {
            future.set_timeout(timeout);
        }
    }

    /// Clear the timeout on the future.
    ///
    /// This is only guaranteed to take effect when called before the future
    /// is polled for the first time.
    pub fn clear_timeout(&mut self) {
        if let Some(RusotoFutureState::SignAndDispatch { ref mut future, .. }) = self.state {
            future.clear_timeout();
        }
    }

    /// Blocks the current thread until the future has resolved.
    ///
    /// This is meant to provide a simple way for non-async consumers
    /// to work with rusoto.
    pub fn sync(self) -> Result<T, E>
        where T: Send + 'static,
              E: From<CredentialsError> + From<HttpDispatchError> + Send + 'static
    {
        spawn(self, &FALLBACK_RUNTIME.executor()).wait()
    }
}

impl<T, E> Future for RusotoFuture<T, E>
    where E: From<CredentialsError> + From<HttpDispatchError>
{
    type Item = T;
    type Error = E;

    fn poll(&mut self) -> Poll<T, E> {
        match self.state.take().unwrap() {
            RusotoFutureState::SignAndDispatch { mut future, handler } => {
                match future.poll() {
                    Err(SignAndDispatchError::Credentials(err)) => Err(err.into()),
                    Err(SignAndDispatchError::Dispatch(err)) => Err(err.into()),
                    Ok(Async::Ready(response)) => {
                        self.state = Some(RusotoFutureState::RunningResponseHandler(handler(response)));
                        self.poll()
                    },
                    Ok(Async::NotReady) => {
                        self.state = Some(RusotoFutureState::SignAndDispatch { future, handler });
                        Ok(Async::NotReady)
                    }
                }
            },
            RusotoFutureState::RunningResponseHandler(mut future) => {
                match future.poll()? {
                    Async::Ready(value) => Ok(Async::Ready(value)),
                    Async::NotReady => {
                        self.state = Some(RusotoFutureState::RunningResponseHandler(future));
                        Ok(Async::NotReady)
                    }
                }
            }
        }
    }
}

enum RusotoFutureState<T, E> {
    SignAndDispatch {
        future: Box<TimeoutFuture<Item=HttpResponse, Error=SignAndDispatchError> + Send>,
        handler: fn(HttpResponse) -> Box<Future<Item=T, Error=E> + Send>
    },
    RunningResponseHandler(Box<Future<Item=T, Error=E> + Send>)
}

#[test]
fn rusoto_future_is_send() {
    fn is_send<T: Send>() {}

    is_send::<RusotoFuture<(), ()>>();
}