extern crate hex;
use std::io::{Read, Seek};
use error::{BlobError, BackendError, CrateDBError};
use dbcluster::DBCluster;
use backend::{Backend, BackendResult};
use dbcluster::{Loadbalancing, EndpointType};
use common::sha1_digest;
use sql::{QueryRunner, Nothing as NoParams};
use row::ByIndex;
use self::hex::FromHex;
#[derive(Debug, Clone, PartialEq)]
pub struct BlobRef {
pub sha1: Vec<u8>,
pub table: String,
}
pub trait BlobContainer {
fn list<TBL: Into<String>>(&self, table: TBL) -> Result<Vec<BlobRef>, BlobError>;
fn put<TBL: Into<String>, B: Read + Seek>(&self,
table: TBL,
blob: &mut B)
-> Result<BlobRef, BlobError>;
fn delete(&self, blob: BlobRef) -> Result<(), BlobError>;
fn get(&self, blob: &BlobRef) -> Result<Box<Read>, BlobError>;
}
impl<T: Backend + Sized> BlobContainer for DBCluster<T> {
fn put<TBL: Into<String>, B: Read + Seek>(&self,
table: TBL,
blob: &mut B)
-> Result<BlobRef, BlobError> {
match sha1_digest(blob) {
Ok(sha1) => {
let url = self.get_endpoint(EndpointType::Blob);
let table = table.into();
match self.backend
.upload_blob(url, &table, &sha1, blob)
.map_err(BlobError::Transport) {
Ok(status) => {
match status {
BackendResult::Ok => {
Ok(BlobRef {
table: table,
sha1: sha1,
})
}
BackendResult::NotFound => {
Err(BlobError::Action(CrateDBError::new("Could not upload BLOB. Not found.",
"404")))
}
BackendResult::NotAuthorized => {
Err(BlobError::Action(CrateDBError::new("Could not upload BLOB: Not authorized.",
"403")))
}
BackendResult::Timeout => {
Err(BlobError::Action(CrateDBError::new("Could not upload BLOB. Timed out.",
"408")))
}
BackendResult::Error => {
Err(BlobError::Action(CrateDBError::new("Could not upload BLOB. Server error.",
"500")))
}
}
}
Err(e) => Err(e),
}
}
Err(io) => Err(BlobError::Transport(BackendError::from_io(io))),
}
}
fn delete(&self, blob: BlobRef) -> Result<(), BlobError> {
let url = self.get_endpoint(EndpointType::Blob);
match self.backend
.delete_blob(url, &blob.table, &blob.sha1)
.map_err(BlobError::Transport) {
Ok(status) => {
match status {
BackendResult::Ok => Ok(()),
BackendResult::NotFound => {
Err(BlobError::Action(CrateDBError::new("Could not delete BLOB. Not found.",
"404")))
}
BackendResult::NotAuthorized => {
Err(BlobError::Action(CrateDBError::new("Could not delete BLOB: Not authorized.",
"403")))
}
BackendResult::Timeout => {
Err(BlobError::Action(CrateDBError::new("Could not delete BLOB. Timed out.",
"408")))
}
BackendResult::Error => {
Err(BlobError::Action(CrateDBError::new("Could not delete BLOB. Server error.",
"500")))
}
}
}
Err(e) => Err(e),
}
}
fn get(&self, blob: &BlobRef) -> Result<Box<Read>, BlobError> {
let url = self.get_endpoint(EndpointType::Blob);
match self.backend
.fetch_blob(url, &blob.table, &blob.sha1)
.map_err(BlobError::Transport) {
Ok((status, content)) => {
match status {
BackendResult::Ok => Ok(content),
BackendResult::NotFound => {
Err(BlobError::Action(CrateDBError::new("Could not fetch BLOB. Not found.",
"404")))
}
BackendResult::NotAuthorized => {
Err(BlobError::Action(CrateDBError::new("Could not fetch BLOB: Not authorized.",
"403")))
}
BackendResult::Timeout => {
Err(BlobError::Action(CrateDBError::new("Could not fetch BLOB. Timed out.",
"408")))
}
BackendResult::Error => {
Err(BlobError::Action(CrateDBError::new("Could not fetch BLOB. Server error.",
"500")))
}
}
}
Err(e) => Err(e),
}
}
fn list<TBL: Into<String>>(&self, table: TBL) -> Result<Vec<BlobRef>, BlobError> {
let table_name = table.into();
match self.query(format!("select digest from blob.{}", table_name),
None::<Box<NoParams>>) {
Ok((_, rows)) => {
let mut blob_refs = Vec::with_capacity(rows.len());
for row in rows {
if let Some(digest_str) = row.as_string(0) {
if let Ok(digest) = Vec::from_hex(digest_str) {
blob_refs.push(BlobRef {
sha1: digest,
table: table_name.clone(),
});
}
}
}
Ok(blob_refs)
}
Err(e) => Err(BlobError::Action(e)),
}
}
}