reproto-server 0.1.1

ReProto Repository Server
use errors::*;
use errors::ErrorKind::*;
use flate2::FlateReadExt;
use futures::future::{Future, ok};
use futures_cpupool::CpuPool;
use hyper::{self, Method, StatusCode};
use hyper::header::{ContentEncoding, ContentLength, ContentType, Encoding, Headers};
use hyper::mime;
use hyper::server::{Request, Response, Service};
use io;
use reproto_repository::{Checksum, FileObjects, Objects, to_checksum};
use std::fs::File;
use std::io::{Seek, SeekFrom};
use std::io::Read;
use std::sync::{Arc, Mutex};
use tempfile;

/// Presumably the message reported when an upload's computed checksum differs from the
/// requested one. NOTE(review): no use of this constant is visible in this view — confirm.
const CHECKSUM_MISMATCH: &'static str = "checksum mismatch";
/// Message carried by the `BadRequest` error raised when a path segment cannot be parsed
/// as a `Checksum`.
const BAD_OBJECT_ID: &'static str = "bad object id";

/// Read everything `reader` yields into a newly allocated byte vector.
///
/// `reader` is any value that can be borrowed as a `Read` trait object through `AsMut`.
/// A failure from `read_to_end` is propagated to the caller via `?`.
///
/// NOTE(review): the end of this function is not visible in this view; presumably it
/// returns `Ok(content)` — confirm.
fn read_contents<'a, R: AsMut<Read + 'a>>(mut reader: R) -> Result<Vec<u8>> {
    let mut content = Vec::new();
    reader.as_mut().read_to_end(&mut content)?;

/// State shared by the request handlers of the repository server.
pub struct ReprotoService {
    /// Largest `Content-Length` accepted by `put_objects`; larger uploads are rejected
    /// with the bad-request message "file too large".
    pub max_file_size: u64,
    /// Pool whose `spawn_fn` runs the upload processing, and which is also handed to
    /// `io::stream_to_file`.
    pub pool: Arc<CpuPool>,
    /// Object storage shared between requests; it is locked before `get_object` is
    /// called on it.
    pub objects: Arc<Mutex<FileObjects>>,

/// Function pointer that turns a borrowed `File` into a boxed reader over it.
/// `put_uploaded_object` calls it on the uploaded temporary file.
type EncodingFn = fn(&File) -> Result<Box<Read>>;

impl ReprotoService {
    /// Reader factory with the same signature as `gzip_encoding`.
    ///
    /// NOTE(review): the body is not visible in this view; presumably it returns a reader
    /// over the file without any decoding — confirm.
    fn no_encoding(input: &File) -> Result<Box<Read>> {

    /// Open a gzip-decoding reader over `input`.
    ///
    /// The file handle is duplicated with `try_clone` and wrapped with `gz_decode`.
    /// A `gz_decode` failure is annotated with "failed to open gz file"; a `try_clone`
    /// failure is propagated as-is.
    fn gzip_encoding(input: &File) -> Result<Box<Read>> {
        Ok(Box::new(input.try_clone()?.gz_decode().chain_err(|| "failed to open gz file")?))

    /// Choose the reader factory that matches the request's `Content-Encoding` header.
    ///
    /// Returns `Self::gzip_encoding` when the header lists `Encoding::Gzip`.
    ///
    /// NOTE(review): the fall-through return is not visible in this view; presumably it is
    /// `Self::no_encoding` — confirm.
    fn pick_encoding(headers: &Headers) -> fn(&File) -> Result<Box<Read>> {
        if let Some(h) = headers.get::<ContentEncoding>() {
            // The header holds a list of encodings; gzip anywhere in that list selects the gzip decoder.
            if h.0.iter().find(|encoding| **encoding == Encoding::Gzip).is_some() {
                return Self::gzip_encoding;


    /// Build the response returned when a route does not match, the object id segment
    /// is missing, or the requested object does not exist.
    ///
    /// NOTE(review): the body is not visible in this view; presumably the status is
    /// 404 Not Found — confirm.
    fn not_found() -> Response {

    /// Handle a request to fetch an object by id.
    ///
    /// `path` holds the path segments that remain after the route name; its first item is
    /// taken as the object id.
    ///
    /// Returns an immediately-ready not-found response when there is no id segment, and
    /// `Err(BadRequest(BAD_OBJECT_ID))` when the id cannot be parsed as a `Checksum`.
    /// Inside the spawned closure a poisoned lock becomes `PoisonError`, and a missing
    /// object yields the not-found response.
    fn get_objects<'a, I>(&self, path: I) -> Result<Box<Future<Item = Response, Error = Error>>>
        where I: IntoIterator<Item = &'a str>
        // Only the first remaining segment is consulted.
        let id = if let Some(id) = path.into_iter().next() {
        } else {
            return Ok(ok(Self::not_found()).boxed());

        // Cloned so the handle can be moved into the closure below.
        let objects = self.objects.clone();

        let checksum = Checksum::from_str(id).map_err(|_| BadRequest(BAD_OBJECT_ID))?;

        // No async I/O is available for the object lookup, so it runs through `spawn_fn`.
        // NOTE(review): the receiver of `.spawn_fn` is not visible in this view; presumably `self.pool`.
            .spawn_fn(move || {
                let result = objects.lock().map_err(|_| PoisonError)?.get_object(&checksum)?;

                let object = match result {
                    Some(object) => object,
                    None => return Ok(Self::not_found()),

                // NOTE(review): the argument to `read_contents` is cut off in this view.
                let bytes = read_contents(;

                    // Content-Length is the number of bytes read above.
                    .with_header(ContentLength(bytes.len() as u64))

    /// Put the uploaded object into the object repository.
    ///
    /// * `body` - future resolving to the file that holds the uploaded data.
    /// * `checksum` - checksum the upload is expected to have; also passed to `put_object`.
    /// * `encoding` - reader factory applied to the uploaded file before it is read.
    ///
    /// Once `body` resolves, the work runs inside `pool.spawn_fn`. The file is read twice
    /// through `encoding`: once to compute the checksum, once to store the object.
    /// A checksum mismatch is logged and answered with an early response rather than an
    /// error.
    fn put_uploaded_object<F>(&self,
                              body: F,
                              checksum: Checksum,
                              encoding: EncodingFn)
                              -> Box<Future<Item = Response, Error = Error>>
        where F: 'static + Future<Item = File, Error = Error>
        // Clones are moved into the closures below.
        let pool = self.pool.clone();
        let objects = self.objects.clone();

        let upload = body.and_then(move |mut tmp| {
            pool.spawn_fn(move || {
                // First pass: read the decoded upload to compute its checksum.
                let mut checksum_read = encoding(&tmp)?;

                let actual =
                    to_checksum(&mut checksum_read).chain_err(|| "failed to calculate checksum")?;

                if actual != checksum {
                    info!("{} != {}", actual, checksum);

                    // NOTE(review): the rest of this response is not visible in this view;
                    // presumably it carries CHECKSUM_MISMATCH — confirm.
                    return Ok(Response::new()

                info!("Uploading object: {}", checksum);

                // Second pass: a fresh reader over the same file for storing the object.
                // NOTE(review): no rewind of `tmp` is visible between the two passes;
                // presumably a seek to the start sits on a line not shown here — confirm.
                let mut read = encoding(&tmp)?;

                    // A poisoned lock is reported as `PoisonError`.
                    .map_err(|_| PoisonError)?
                    .put_object(&checksum, &mut read, false)
                    .chain_err(|| "failed to put object")?;



    /// Handle a request to upload an object under the id given in the path.
    ///
    /// `path` holds the path segments that remain after the route name; its first item is
    /// taken as the object id.
    ///
    /// Returns an immediately-ready not-found response when there is no id segment.
    /// Fails with `BadRequest` when the id is not a valid `Checksum`, when the
    /// `Content-Length` header is missing, or when it exceeds `max_file_size`.
    /// Otherwise the request body is streamed into a temporary file and passed to
    /// `put_uploaded_object`.
    fn put_objects<'a, I>(&self,
                          req: Request,
                          path: I)
                          -> Result<Box<Future<Item = Response, Error = Error>>>
        where I: IntoIterator<Item = &'a str>
        // Only the first remaining segment is consulted.
        let id = if let Some(id) = path.into_iter().next() {
        } else {
            return Ok(ok(Self::not_found()).boxed());

        let checksum = Checksum::from_str(id).map_err(|_| BadRequest(BAD_OBJECT_ID))?;

        // The declared length is required and is checked before any body data is consumed.
        // NOTE(review): this checks only the client-declared header; whether
        // `io::stream_to_file` also bounds the bytes actually received is not visible here.
        if let Some(len) = req.headers().get::<ContentLength>() {
            if len.0 > self.max_file_size {
                return Err(BadRequest("file too large").into());
        } else {
            return Err(BadRequest("missing content-length").into());

        // Chosen from the headers before `req.body()` consumes the request.
        let encoding = Self::pick_encoding(req.headers());

        info!("Creating temporary file");
        let body = io::stream_to_file(tempfile::tempfile()?, self.pool.clone(), req.body());
        Ok(self.put_uploaded_object(body, checksum, encoding))

    /// Route a request on its HTTP method and first path segment.
    ///
    /// `GET objects/...` goes to `get_objects` and `PUT objects/...` to `put_objects`,
    /// each receiving the remaining segments. Any other combination gets the not-found
    /// response.
    fn inner_call<'a, I>(&self,
                         req: Request,
                         path: I)
                         -> Result<Box<Future<Item = Response, Error = Error>>>
        where I: IntoIterator<Item = &'a str>
        let mut it = path.into_iter();

        // NOTE(review): the expression that yields `part` is cut off in this view;
        // presumably `it.next()` — confirm.
        if let Some(part) = {
            match (req.method(), part) {
                // `it` has advanced past the route name, so handlers see only what follows it.
                (&Method::Get, "objects") => return self.get_objects(it),
                (&Method::Put, "objects") => return self.put_objects(req, it),
                _ => return Ok(ok(Self::not_found()).boxed()),


    /// Convert an application error into the HTTP response sent to the client.
    ///
    /// A `BadRequest` error produces a response whose `Content-Length` is the length of
    /// its message. Every other error is logged, together with its causes and backtrace
    /// when present, and answered with `StatusCode::InternalServerError`.
    fn handle_error(e: Error) -> Response {
        match *e.kind() {
            BadRequest(ref message) => {
                // NOTE(review): only the Content-Length header is visible in this view;
                // the status and body of this response are on lines not shown here.
                return Response::new()
                    .with_header(ContentLength(message.len() as u64))
            _ => {
                error!("{}", e);

                // Log the remaining chain. Presumably the first item of `iter()` is `e`
                // itself (already logged above), hence `skip(1)` — confirm against error-chain.
                for e in e.iter().skip(1) {
                    error!("caused by: {}", e);

                if let Some(backtrace) = e.backtrace() {
                    error!("{:?}", backtrace);

                return Response::new().with_status(StatusCode::InternalServerError);

/// hyper `Service` implementation: the entry point for every incoming request.
impl Service for ReprotoService {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Response, Error = hyper::Error>>;

    /// Split the request path into segments, dispatch through `inner_call`, and turn any
    /// application error into a response via `handle_error`.
    fn call(&self, req: Request) -> Self::Future {
        // Copy the path into an owned String: the segments borrow from it while `req`
        // itself is moved into `inner_call`.
        let full_path = String::from(req.path());

        // skip(1) drops the first split item; for a path starting with '/' that item is
        // the empty string before the leading slash.
        let path = full_path.split('/').skip(1);

        // Two error paths are handled: `unwrap_or_else` covers an `Err` returned directly
        // by `inner_call`, `or_else` covers an error produced later by the future.
        Box::new(self.inner_call(req, path)
            .unwrap_or_else(|e| ok(Self::handle_error(e)).boxed())
            .or_else(|e| ok(Self::handle_error(e))))