1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//! ## Load objects from a remote repository over HTTP

extern crate futures;
extern crate hyper;
extern crate hyper_rustls;
extern crate reproto_core as core;
extern crate reproto_repository as repository;
extern crate tokio_core;
extern crate url;

use core::Source;
use core::errors::{Error, Result};
use futures::future::{err, ok};
use futures::{Future, Stream};
use hyper::header::ContentLength;
use hyper::{Client, Method, Request, StatusCode};
use repository::{CachedObjects, Checksum, HexSlice, Objects, ObjectsConfig};
use std::io::Read;
use std::time::Duration;
use tokio_core::reactor::Core;
use url::Url;

pub struct HttpObjects {
    url: Url,
    core: Core,
}

impl HttpObjects {
    pub fn new(url: Url, core: Core) -> HttpObjects {
        HttpObjects {
            url: url,
            core: core,
        }
    }

    fn checksum_url(&self, checksum: &Checksum) -> Result<hyper::Uri> {
        let url = self.url.join(HexSlice::new(checksum).to_string().as_ref())?;

        let url = url.to_string()
            .parse::<hyper::Uri>()
            .map_err(|e| format!("Failed to parse URL: {}: {}", e, url))?;

        Ok(url)
    }

    fn handle_request(
        &mut self,
        request: Request,
    ) -> Box<Future<Item = (Vec<u8>, StatusCode), Error = Error>> {
        let handle = self.core.handle();

        let client = Client::configure()
            .connector(hyper_rustls::HttpsConnector::new(4, &handle))
            .build(&handle);

        let body_and_status = client
            .request(request)
            .map_err::<_, Error>(|e| format!("Request to repository failed: {}", e).into())
            .and_then(|res| {
                let status = res.status().clone();

                res.body()
                    .map_err::<Error, _>(|e| format!("Failed to perform request: {}", e).into())
                    .fold(Vec::new(), |mut out: Vec<u8>, chunk| {
                        out.extend(chunk.as_ref());
                        ok::<_, Error>(out)
                    })
                    .map(move |body| (body, status))
            });

        Box::new(body_and_status)
    }
}

impl Objects for HttpObjects {
    fn put_object(&mut self, checksum: &Checksum, source: &mut Read, _force: bool) -> Result<bool> {
        let mut buffer = Vec::new();
        source.read_to_end(&mut buffer)?;

        let url = self.checksum_url(checksum)?;

        let mut request = Request::new(Method::Put, url);
        request
            .headers_mut()
            .set(ContentLength(buffer.len() as u64));
        request.set_body(buffer);

        let work = self.handle_request(request).and_then(|(body, status)| {
            if !status.is_success() {
                if let Ok(body) = String::from_utf8(body) {
                    return err(format!("bad response: {}: {}", status, body).into());
                }

                return err(format!("bad response: {}", status).into());
            }

            ok(())
        });

        self.core.run(work)?;

        // TODO: use status code to determine if the upload resulted in changes or not.
        Ok(true)
    }

    fn get_object(&mut self, checksum: &Checksum) -> Result<Option<Source>> {
        let url = self.checksum_url(checksum)?;
        let name = url.to_string();

        let request = Request::new(Method::Get, url);

        let work = self.handle_request(request).and_then(|(body, status)| {
            if status.is_success() {
                return ok(Some(body));
            }

            if status == StatusCode::NotFound {
                return ok(None);
            }

            if let Ok(body) = String::from_utf8(body) {
                return err(format!("bad response: {}: {}", status, body).into());
            }

            return err(format!("bad response: {}", status).into());
        });

        let out = self.core.run(work)?;
        Ok(out.map(|out| Source::bytes(name, out)))
    }
}

/// Load objects from an HTTP url.
pub fn objects_from_url(config: ObjectsConfig, url: &Url) -> Result<Box<Objects>> {
    let core = Core::new()?;

    let http_objects = HttpObjects::new(url.clone(), core);

    if let Some(cache_home) = config.cache_home {
        let missing_cache_time = config
            .missing_cache_time
            .unwrap_or_else(|| Duration::new(60, 0));

        return Ok(Box::new(CachedObjects::new(
            cache_home,
            missing_cache_time,
            http_objects,
        )));
    }

    Ok(Box::new(http_objects))
}