use cache::{
Cache,
CacheRead,
CacheWrite,
CacheWriteFuture,
CacheWriteWriter,
Storage,
};
use futures::{self,Future};
use simples3::{
AutoRefreshingProviderSync,
Bucket,
ChainProvider,
DefaultCredentialsProviderSync,
ProfileProvider,
ProvideAwsCredentials,
Ssl,
};
use std::env;
use std::io::{
self,
Error,
ErrorKind,
};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
/// Cache storage backed by an S3 bucket.
///
/// Both fields are held behind `Arc`, so a clone shares the same bucket
/// handle and credential provider as the value it was cloned from.
#[derive(Clone)]
pub struct S3Cache {
/// Handle to the bucket that cache entries are fetched from and uploaded to.
bucket: Arc<Bucket>,
/// Credential provider consulted when uploading an entry. `None` when the
/// provider could not be constructed in `S3Cache::new`; uploads then fail
/// with an error.
provider: Option<Arc<DefaultCredentialsProviderSync>>,
}
impl S3Cache {
pub fn new(bucket: &str) -> io::Result<S3Cache> {
let home = try!(env::home_dir().ok_or(Error::new(ErrorKind::Other, "Couldn't find home directory")));
let profile_providers = vec![
ProfileProvider::with_configuration(home.join(".aws").join("credentials"), "default"),
ProfileProvider::with_configuration(home.join(".boto"), "Credentials"),
];
let provider = AutoRefreshingProviderSync::with_mutex(ChainProvider::with_profile_providers(profile_providers)).ok().map(Arc::new);
let bucket = Arc::new(Bucket::new(bucket, Ssl::No));
Ok(S3Cache {
bucket: bucket,
provider: provider,
})
}
}
/// Builds the object path used for `key` inside the bucket.
///
/// Each of the first three characters of `key` becomes its own path level,
/// followed by the full key: `"abcdef"` maps to `"a/b/c/abcdef"`.
///
/// Keys shorter than three characters get one level per character they have
/// (`"ab"` maps to `"a/b/ab"`, `""` maps to `""`). The prefix is taken per
/// character rather than per byte, so short or non-ASCII keys do not panic.
fn normalize_key(key: &str) -> String {
    // Capacity is only a hint: three one-byte levels plus three separators
    // covers the common ASCII case without reallocating.
    let mut normalized = String::with_capacity(key.len() + 6);
    for level in key.chars().take(3) {
        normalized.push(level);
        normalized.push('/');
    }
    normalized.push_str(key);
    normalized
}
impl Storage for S3Cache {
fn get(&self, key: &str) -> Cache {
let key = normalize_key(key);
match self.bucket.get(&key) {
Ok(data) => {
CacheRead::from(io::Cursor::new(data))
.map(Cache::Hit)
.unwrap_or_else(Cache::Error)
}
Err(e) => {
warn!("Got AWS error: {:?}", e);
Cache::Miss
}
}
}
fn start_put(&self, _key: &str) -> io::Result<CacheWrite> {
Ok(CacheWrite::new(io::Cursor::new(vec!())))
}
fn finish_put(&self, key: &str, entry: CacheWrite) -> CacheWriteFuture {
let (complete, promise) = futures::oneshot();
let this = self.clone();
let key = key.to_owned();
thread::spawn(move || {
let start = Instant::now();
complete.complete(entry.finish()
.and_then(|writer| {
match writer {
CacheWriteWriter::File(_) => Err(Error::new(ErrorKind::Other, "Bad CacheWrite?")),
CacheWriteWriter::Cursor(c) => {
this.provider
.as_ref()
.ok_or(Error::new(ErrorKind::Other, "No AWS credential provider available!"))
.and_then(|provider| provider.credentials().or(Err(Error::new(ErrorKind::Other, "Couldn't get AWS credentials!"))))
.and_then(|credentials| {
let data = c.into_inner();
let key = normalize_key(&key);
this.bucket.put(&key, &data, &credentials)
.map_err(|e| Error::new(ErrorKind::Other, format!("Error putting cache entry to S3: {:?}", e)))
})
.map(|_| start.elapsed())
}
}
}).map_err(|e| format!("{}", e)));
});
promise.boxed()
}
fn get_location(&self) -> String {
format!("S3, bucket: {}", self.bucket)
}
}