use std::cell::RefCell;
use std::fmt;
use std::io;
use std::rc::Rc;
use std::time;
use cache::{
Cache,
CacheRead,
CacheWrite,
Storage,
};
use chrono;
use futures::future::Shared;
use futures::{future, Async, Future, Stream};
use hyper;
use hyper::header::{Authorization, Bearer, ContentType, ContentLength};
use hyper::Method;
use hyper::client::{Client, HttpConnector, Request};
use hyper_tls::HttpsConnector;
use jwt;
use openssl;
use serde_json;
use tokio_core::reactor::Handle;
use url::form_urlencoded;
use url::percent_encoding::{percent_encode, PATH_SEGMENT_ENCODE_SET, QUERY_ENCODE_SET};
use errors::*;
type HyperClient = Client<HttpsConnector<HttpConnector>>;
struct Bucket {
name: String,
client: HyperClient,
}
impl fmt::Display for Bucket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Bucket(name={})", self.name)
}
}
impl Bucket {
pub fn new(name: String, handle: &Handle) -> Result<Bucket> {
let client = Client::configure()
.connector(HttpsConnector::new(1, handle)?)
.build(handle);
Ok(Bucket { name, client })
}
fn get(&self, key: &str, cred_provider: &Option<GCSCredentialProvider>) -> SFuture<Vec<u8>> {
let url = format!("https://www.googleapis.com/download/storage/v1/b/{}/o/{}?alt=media",
percent_encode(self.name.as_bytes(), PATH_SEGMENT_ENCODE_SET),
percent_encode(key.as_bytes(), PATH_SEGMENT_ENCODE_SET));
let client = self.client.clone();
let creds_opt_future = if let &Some(ref cred_provider) = cred_provider {
future::Either::A(cred_provider.credentials(&self.client).map(Some))
} else {
future::Either::B(future::ok(None))
};
Box::new(creds_opt_future.and_then(move |creds_opt| {
let mut request = Request::new(Method::Get, url.parse().unwrap());
if let Some(creds) = creds_opt {
request.headers_mut()
.set(Authorization(Bearer { token: creds.token }));
}
client.request(request).chain_err(move || {
format!("failed GET: {}", url)
}).and_then(|res| {
if res.status().is_success() {
Ok(res.body())
} else {
Err(ErrorKind::BadHTTPStatus(res.status().clone()).into())
}
}).and_then(|body| {
body.fold(Vec::new(), |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, hyper::Error>(body)
}).chain_err(|| {
"failed to read HTTP body"
})
})
}))
}
fn put(&self, key: &str, content: Vec<u8>, cred_provider: &Option<GCSCredentialProvider>) -> SFuture<()> {
let url = format!("https://www.googleapis.com/upload/storage/v1/b/{}/o?name={}&uploadType=media",
percent_encode(self.name.as_bytes(), PATH_SEGMENT_ENCODE_SET),
percent_encode(key.as_bytes(), QUERY_ENCODE_SET));
let client = self.client.clone();
let creds_opt_future = if let &Some(ref cred_provider) = cred_provider {
future::Either::A(cred_provider.credentials(&self.client).map(Some))
} else {
future::Either::B(future::ok(None))
};
Box::new(creds_opt_future.and_then(move |creds_opt| {
let mut request = Request::new(Method::Post, url.parse().unwrap());
{
let headers = request.headers_mut();
if let Some(creds) = creds_opt {
headers.set(Authorization(Bearer { token: creds.token }));
}
headers.set(ContentType::octet_stream());
headers.set(ContentLength(content.len() as u64));
}
request.set_body(content);
client.request(request).then(|result| {
match result {
Ok(res) => {
if res.status().is_success() {
trace!("PUT succeeded");
Ok(())
} else {
trace!("PUT failed with HTTP status: {}", res.status());
Err(ErrorKind::BadHTTPStatus(res.status().clone()).into())
}
}
Err(e) => {
trace!("PUT failed with error: {:?}", e);
Err(e.into())
}
}
})
}))
}
}
pub struct GCSCredentialProvider {
rw_mode: RWMode,
sa_key: ServiceAccountKey,
cached_credentials: RefCell<Option<Shared<SFuture<GCSCredential>>>>,
}
#[derive(Debug, Deserialize)]
pub struct ServiceAccountKey {
private_key: String,
client_email: String,
}
#[derive(Serialize)]
struct JwtClaims {
#[serde(rename = "iss")]
issuer: String,
scope: String,
#[serde(rename = "aud")]
audience: String,
#[serde(rename = "exp")]
expiration: i64,
#[serde(rename = "iat")]
issued_at: i64,
}
#[derive(Deserialize)]
struct TokenMsg {
access_token: String,
}
#[derive(Copy, Clone)]
pub enum RWMode {
ReadOnly,
ReadWrite,
}
#[derive(Clone)]
pub struct GCSCredential {
token: String,
expiration_time: chrono::DateTime<chrono::UTC>,
}
impl GCSCredentialProvider {
pub fn new(rw_mode: RWMode, sa_key: ServiceAccountKey) -> Self {
GCSCredentialProvider {
rw_mode,
sa_key,
cached_credentials: RefCell::new(None),
}
}
fn auth_request_jwt(&self, expire_at: &chrono::DateTime<chrono::UTC>) -> Result<String> {
let scope = (match self.rw_mode {
RWMode::ReadOnly => "https://www.googleapis.com/auth/devstorage.readonly",
RWMode::ReadWrite => "https://www.googleapis.com/auth/devstorage.read_write",
}).to_owned();
let jwt_claims = JwtClaims {
issuer: self.sa_key.client_email.clone(),
scope: scope,
audience: "https://www.googleapis.com/oauth2/v4/token".to_owned(),
expiration: expire_at.timestamp(),
issued_at: chrono::UTC::now().timestamp(),
};
let binary_key = openssl::rsa::Rsa::private_key_from_pem(
self.sa_key.private_key.as_bytes()
)?.private_key_to_der()?;
let auth_request_jwt = jwt::encode(
&jwt::Header::new(jwt::Algorithm::RS256),
&jwt_claims,
&binary_key,
)?;
Ok(auth_request_jwt)
}
fn request_new_token(&self, client: &HyperClient) -> SFuture<GCSCredential> {
let client = client.clone();
let expires_at = chrono::UTC::now() + chrono::Duration::minutes(59);
let auth_jwt = self.auth_request_jwt(&expires_at);
Box::new(future::result(auth_jwt).and_then(move |auth_jwt| {
let url = "https://www.googleapis.com/oauth2/v4/token";
let params = form_urlencoded::Serializer::new(String::new())
.append_pair("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer")
.append_pair("assertion", &auth_jwt)
.finish();
let mut request = Request::new(Method::Post, url.parse().unwrap());
{
let headers = request.headers_mut();
headers.set(ContentType::form_url_encoded());
headers.set(ContentLength(params.len() as u64));
}
request.set_body(params);
client.request(request).map_err(Into::into)
}).and_then(move |res| {
if res.status().is_success() {
Ok(res.body())
} else {
Err(ErrorKind::BadHTTPStatus(res.status().clone()).into())
}
}).and_then(move |body| {
body.fold(Vec::new(), |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, hyper::Error>(body)
}).chain_err(|| {
"failed to read HTTP body"
})
}).and_then(move |body| {
let body_str = String::from_utf8(body)?;
let token_msg: TokenMsg = serde_json::from_str(&body_str)?;
Ok(GCSCredential {
token: token_msg.access_token,
expiration_time: expires_at,
})
}))
}
pub fn credentials(&self, client: &HyperClient) -> SFuture<GCSCredential> {
let mut future_opt = self.cached_credentials.borrow_mut();
let needs_refresh = match Option::as_mut(&mut future_opt).map(|f| f.poll()) {
None => true,
Some(Ok(Async::Ready(ref creds))) => creds.expiration_time < chrono::UTC::now(),
_ => false
};
if needs_refresh {
let credentials = self.request_new_token(client);
*future_opt = Some(credentials.shared());
};
Box::new(Option::as_mut(&mut future_opt).unwrap().clone().then(|result| {
match result {
Ok(e) => Ok((*e).clone()),
Err(e) => Err(e.to_string().into()),
}
}))
}
}
pub struct GCSCache {
bucket: Rc<Bucket>,
credential_provider: Option<GCSCredentialProvider>,
rw_mode: RWMode,
}
impl GCSCache {
pub fn new(bucket: String,
credential_provider: Option<GCSCredentialProvider>,
rw_mode: RWMode,
handle: &Handle) -> Result<GCSCache>
{
Ok(GCSCache {
bucket: Rc::new(Bucket::new(bucket, handle)?),
rw_mode: rw_mode,
credential_provider: credential_provider,
})
}
}
impl Storage for GCSCache {
fn get(&self, key: &str) -> SFuture<Cache> {
Box::new(self.bucket.get(&key, &self.credential_provider).then(|result| {
match result {
Ok(data) => {
let hit = CacheRead::from(io::Cursor::new(data))?;
Ok(Cache::Hit(hit))
}
Err(e) => {
warn!("Got GCS error: {:?}", e);
Ok(Cache::Miss)
}
}
}))
}
fn put(&self, key: &str, entry: CacheWrite) -> SFuture<time::Duration> {
if let RWMode::ReadOnly = self.rw_mode {
return Box::new(future::ok(time::Duration::new(0, 0)));
}
let start = time::Instant::now();
let data = match entry.finish() {
Ok(data) => data,
Err(e) => return Box::new(future::err(e.into())),
};
let bucket = self.bucket.clone();
let response = bucket.put(&key, data, &self.credential_provider).chain_err(|| {
"failed to put cache entry in GCS"
});
Box::new(response.map(move |_| start.elapsed()))
}
fn location(&self) -> String {
format!("GCS, bucket: {}", self.bucket)
}
fn current_size(&self) -> Option<u64> { None }
fn max_size(&self) -> Option<u64> { None }
}