use crate::authenticator_delegate::{AuthenticatorDelegate, Retry};
use crate::refresh::RefreshFlow;
use crate::storage::{hash_scopes, DiskTokenStorage, MemoryStorage, TokenStorage};
use crate::types::{ApplicationSecret, GetToken, RefreshResult, RequestError, Token};
use futures::{future, prelude::*};
use tokio_timer;
use std::error::Error;
use std::io;
use std::sync::{Arc, Mutex};
/// Token provider that pairs an inner token source with a token store and a delegate.
///
/// The inner source and the store are each kept behind an `Arc<Mutex<_>>` so that handles to
/// them can be cloned into the futures built when a token is requested.
pub struct Authenticator<T, S, AD, C>
where
    T: GetToken,
    S: TokenStorage,
    AD: AuthenticatorDelegate,
    C: hyper::client::connect::Connect,
{
    /// HTTP client handed to the refresh flow.
    client: hyper::Client<C>,
    /// Wrapped token source that is asked for a token when the store has none.
    inner: Arc<Mutex<T>>,
    /// Storage for previously obtained tokens.
    store: Arc<Mutex<S>>,
    /// Receiver of notifications about refresh and storage failures.
    delegate: AD,
}
impl<T, AD, C> Authenticator<T, MemoryStorage, AD, C>
where
    T: GetToken,
    AD: AuthenticatorDelegate,
    C: hyper::client::connect::Connect,
{
    /// Builds an authenticator whose tokens are kept in a freshly created `MemoryStorage`.
    ///
    /// * `client` - HTTP client stored in the authenticator.
    /// * `inner` - token source to wrap.
    /// * `delegate` - delegate stored in the authenticator.
    pub fn new(client: hyper::Client<C>, inner: T, delegate: AD) -> Self {
        let inner = Arc::new(Mutex::new(inner));
        let store = Arc::new(Mutex::new(MemoryStorage::new()));
        Self {
            client,
            inner,
            store,
            delegate,
        }
    }
}
impl<T, AD, C> Authenticator<T, DiskTokenStorage, AD, C>
where
    T: GetToken,
    AD: AuthenticatorDelegate,
    C: hyper::client::connect::Connect,
{
    /// Builds an authenticator whose tokens are kept in a `DiskTokenStorage` created from
    /// `token_storage_path`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `DiskTokenStorage::new` if the storage cannot be set up.
    pub fn new_disk<P: AsRef<str>>(
        client: hyper::Client<C>,
        inner: T,
        delegate: AD,
        token_storage_path: P,
    ) -> io::Result<Authenticator<T, DiskTokenStorage, AD, C>> {
        let inner = Arc::new(Mutex::new(inner));
        let disk_storage = DiskTokenStorage::new(token_storage_path)?;
        Ok(Self {
            client,
            inner,
            store: Arc::new(Mutex::new(disk_storage)),
            delegate,
        })
    }
}
impl<
GT: 'static + GetToken + Send,
S: 'static + TokenStorage + Send,
AD: 'static + AuthenticatorDelegate + Send,
C: 'static + hyper::client::connect::Connect + Clone + Send,
> GetToken for Authenticator<GT, S, AD, C>
{
fn api_key(&mut self) -> Option<String> {
self.inner.lock().unwrap().api_key()
}
fn application_secret(&self) -> ApplicationSecret {
self.inner.lock().unwrap().application_secret()
}
fn token<I, T>(
&mut self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
where
T: Into<String>,
I: IntoIterator<Item = T>,
{
let (scope_key, scopes) = hash_scopes(scopes);
let store = self.store.clone();
let mut delegate = self.delegate.clone();
let client = self.client.clone();
let appsecret = self.inner.lock().unwrap().application_secret();
let gettoken = self.inner.clone();
let loopfn = move |()| -> Box<
dyn Future<Item = future::Loop<Token, ()>, Error = RequestError> + Send,
> {
match store.lock().unwrap().get(
scope_key.clone(),
&scopes.iter().map(|s| s.as_str()).collect(),
) {
Ok(Some(t)) => {
if !t.expired() {
return Box::new(Ok(future::Loop::Break(t)).into_future());
}
let refresh_token = t.refresh_token.clone();
let mut delegate = delegate.clone();
let store = store.clone();
let scopes = scopes.clone();
let refresh_fut = RefreshFlow::refresh_token(
client.clone(),
appsecret.clone(),
refresh_token,
)
.and_then(move |rr| -> Box<dyn Future<Item=future::Loop<Token, ()>, Error=RequestError> + Send> {
match rr {
RefreshResult::Error(ref e) => {
delegate.token_refresh_failed(
format!("{}", e.description().to_string()),
&Some("the request has likely timed out".to_string()),
);
Box::new(Err(RequestError::Refresh(rr)).into_future())
}
RefreshResult::RefreshError(ref s, ref ss) => {
delegate.token_refresh_failed(
format!("{} {}", s, ss.clone().map(|s| format!("({})", s)).unwrap_or("".to_string())),
&Some("the refresh token is likely invalid and your authorization has been revoked".to_string()),
);
Box::new(Err(RequestError::Refresh(rr)).into_future())
}
RefreshResult::Success(t) => {
if let Err(e) = store.lock().unwrap().set(scope_key, &scopes.iter().map(|s| s.as_str()).collect(), Some(t.clone())) {
match delegate.token_storage_failure(true, &e) {
Retry::Skip => Box::new(Ok(future::Loop::Break(t)).into_future()),
Retry::Abort => Box::new(Err(RequestError::Cache(Box::new(e))).into_future()),
Retry::After(d) => Box::new(
tokio_timer::sleep(d)
.then(|_| Ok(future::Loop::Continue(()))),
)
as Box<
dyn Future<
Item = future::Loop<Token, ()>,
Error = RequestError> + Send>,
}
} else {
Box::new(Ok(future::Loop::Break(t)).into_future())
}
},
}
});
Box::new(refresh_fut)
}
Ok(None) => {
let store = store.clone();
let scopes = scopes.clone();
let mut delegate = delegate.clone();
Box::new(
gettoken
.lock()
.unwrap()
.token(scopes.clone())
.and_then(move |t| {
if let Err(e) = store.lock().unwrap().set(
scope_key,
&scopes.iter().map(|s| s.as_str()).collect(),
Some(t.clone()),
) {
match delegate.token_storage_failure(true, &e) {
Retry::Skip => {
Box::new(Ok(future::Loop::Break(t)).into_future())
}
Retry::Abort => Box::new(
Err(RequestError::Cache(Box::new(e))).into_future(),
),
Retry::After(d) => Box::new(
tokio_timer::sleep(d)
.then(|_| Ok(future::Loop::Continue(()))),
)
as Box<
dyn Future<
Item = future::Loop<Token, ()>,
Error = RequestError,
> + Send,
>,
}
} else {
Box::new(Ok(future::Loop::Break(t)).into_future())
}
}),
)
}
Err(err) => match delegate.token_storage_failure(false, &err) {
Retry::Abort | Retry::Skip => {
return Box::new(Err(RequestError::Cache(Box::new(err))).into_future())
}
Retry::After(d) => {
return Box::new(
tokio_timer::sleep(d).then(|_| Ok(future::Loop::Continue(()))),
)
}
},
}
};
Box::new(future::loop_fn((), loopfn))
}
}