use std::io::{self, Read, Seek, Cursor, Write, SeekFrom};
use std;
use std::fmt::{self, Display};
use std::str::FromStr;
use std::thread::sleep_ms;
use mime::{Mime, TopLevel, SubLevel, Attr, Value};
use oauth2::{TokenType, Retry, self};
use hyper;
use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization, Header,
HeaderFormat};
use hyper::http::LINE_ENDING;
use hyper::method::Method;
use hyper::status::StatusCode;
use serde;
pub trait Hub {}
pub trait MethodsBuilder {}
pub trait CallBuilder {}
pub trait Resource {}
pub trait ResponseResult {}
pub trait RequestValue {}
pub trait UnusedType {}
pub trait Part {}
pub trait NestedType {}
pub trait ReadSeek: Seek + Read {}
impl<T: Seek + Read> ReadSeek for T {}
pub trait ToParts {
fn to_parts(&self) -> String;
}
#[derive(Deserialize)]
pub struct JsonServerError {
pub error: String,
pub error_description: Option<String>
}
#[derive(Copy, Clone)]
pub struct DummyNetworkStream;
impl Read for DummyNetworkStream {
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
Ok(0)
}
}
impl Write for DummyNetworkStream {
fn write(&mut self, _: &[u8]) -> io::Result<usize> {
Ok(0)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl hyper::net::NetworkStream for DummyNetworkStream {
fn peer_addr(&mut self) -> io::Result<std::net::SocketAddr> {
Ok("127.0.0.1:1337".parse().unwrap())
}
}
pub trait Delegate {
fn begin(&mut self, MethodInfo) {}
fn http_error(&mut self, &hyper::HttpError) -> Retry {
Retry::Abort
}
fn api_key(&mut self) -> Option<String> {
None
}
fn token(&mut self) -> Option<oauth2::Token> {
None
}
fn upload_url(&mut self) -> Option<String> {
None
}
fn store_upload_url(&mut self, url: &str) {
let _ = url;
}
fn response_json_decode_error(&mut self, json_encoded_value: &str, json_decode_error: &serde::json::Error) {
let _ = json_encoded_value;
let _ = json_decode_error;
}
fn http_failure(&mut self, _: &hyper::client::Response, Option<JsonServerError>) -> Retry {
Retry::Abort
}
fn pre_request(&mut self) { }
fn chunk_size(&mut self) -> u64 {
1 << 23
}
fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
let _ = chunk;
false
}
fn finished(&mut self, is_success: bool) {
let _ = is_success;
}
}
#[derive(Default)]
pub struct DefaultDelegate;
impl Delegate for DefaultDelegate {}
pub enum Error {
HttpError(hyper::HttpError),
UploadSizeLimitExceeded(u64, u64),
MissingAPIKey,
MissingToken,
Cancelled,
FieldClash(&'static str),
JsonDecodeError(serde::json::Error),
Failure(hyper::client::Response),
}
pub type Result<T> = std::result::Result<T, Error>;
pub struct MethodInfo {
pub id: &'static str,
pub http_method: Method,
}
const BOUNDARY: &'static str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d";
#[derive(Default)]
pub struct MultiPartReader<'a> {
raw_parts: Vec<(Headers, &'a mut Read)>,
current_part: Option<(Cursor<Vec<u8>>, &'a mut Read)>,
last_part_boundary: Option<Cursor<Vec<u8>>>,
}
impl<'a> MultiPartReader<'a> {
pub fn reserve_exact(&mut self, cap: usize) {
self.raw_parts.reserve_exact(cap);
}
pub fn add_part(&mut self, reader: &'a mut Read, size: u64, mime_type: Mime) -> &mut MultiPartReader<'a> {
let mut headers = Headers::new();
headers.set(ContentType(mime_type));
headers.set(ContentLength(size));
self.raw_parts.push((headers, reader));
self
}
pub fn mime_type(&self) -> Mime {
Mime(
TopLevel::Multipart,
SubLevel::Ext("Related".to_string()),
vec![(Attr::Ext("boundary".to_string()), Value::Ext(BOUNDARY.to_string()))],
)
}
fn is_depleted(&self) -> bool {
self.raw_parts.len() == 0 && self.current_part.is_none() && self.last_part_boundary.is_none()
}
fn is_last_part(&self) -> bool {
self.raw_parts.len() == 0 && self.current_part.is_some()
}
}
impl<'a> Read for MultiPartReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match (self.raw_parts.len(),
self.current_part.is_none(),
self.last_part_boundary.is_none()) {
(_, _, false) => {
let br = self.last_part_boundary.as_mut().unwrap().read(buf).unwrap_or(0);
if br < buf.len() {
self.last_part_boundary = None;
}
return Ok(br)
},
(0, true, true) => return Ok(0),
(n, true, _) if n > 0 => {
let (headers, reader) = self.raw_parts.remove(0);
let mut c = Cursor::new(Vec::<u8>::new());
write!(&mut c, "{}--{}{}{}{}", LINE_ENDING, BOUNDARY, LINE_ENDING,
headers, LINE_ENDING).unwrap();
c.seek(SeekFrom::Start(0)).unwrap();
self.current_part = Some((c, reader));
}
_ => {},
}
let (hb, rr) = {
let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap();
let b = c.read(buf).unwrap_or(0);
(b, reader.read(&mut buf[b..]))
};
match rr {
Ok(bytes_read) => {
if hb < buf.len() && bytes_read == 0 {
if self.is_last_part() {
self.last_part_boundary = Some(Cursor::new(
format!("{}--{}", LINE_ENDING, BOUNDARY).into_bytes()))
}
self.current_part = None;
}
let mut total_bytes_read = hb + bytes_read;
while total_bytes_read < buf.len() && !self.is_depleted() {
match self.read(&mut buf[total_bytes_read ..]) {
Ok(br) => total_bytes_read += br,
Err(err) => return Err(err),
}
}
Ok(total_bytes_read)
}
Err(err) => {
self.current_part = None;
self.last_part_boundary = None;
self.raw_parts.clear();
Err(err)
}
}
}
}
header!{
#[doc="The `X-Upload-Content-Type` header."]
(XUploadContentType, "X-Upload-Content-Type") => [Mime]
}
#[derive(Clone, PartialEq, Debug)]
pub struct Chunk {
pub first: u64,
pub last: u64
}
impl fmt::Display for Chunk {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}-{}", self.first, self.last).ok();
Ok(())
}
}
impl FromStr for Chunk {
type Err = &'static str;
fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
let parts: Vec<&str> = s.split('-').collect();
if parts.len() != 2 {
return Err("Expected two parts: %i-%i")
}
Ok(
Chunk {
first: match FromStr::from_str(parts[0]) {
Ok(d) => d,
_ => return Err("Couldn't parse 'first' as digit")
},
last: match FromStr::from_str(parts[1]) {
Ok(d) => d,
_ => return Err("Couldn't parse 'last' as digit")
}
}
)
}
}
#[derive(Clone, PartialEq, Debug)]
pub struct ContentRange {
pub range: Option<Chunk>,
pub total_length: u64,
}
impl Header for ContentRange {
fn header_name() -> &'static str {
"Content-Range"
}
fn parse_header(_: &[Vec<u8>]) -> Option<ContentRange> {
None
}
}
impl HeaderFormat for ContentRange {
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
try!(fmt.write_str("bytes "));
match self.range {
Some(ref c) => try!(c.fmt(fmt)),
None => try!(fmt.write_str("*"))
}
write!(fmt, "/{}", self.total_length).ok();
Ok(())
}
}
#[derive(Clone, PartialEq, Debug)]
pub struct RangeResponseHeader(pub Chunk);
impl Header for RangeResponseHeader {
fn header_name() -> &'static str {
"Range"
}
fn parse_header(raw: &[Vec<u8>]) -> Option<RangeResponseHeader> {
if let [ref v] = raw {
if let Ok(s) = std::str::from_utf8(v) {
const PREFIX: &'static str = "bytes=";
if s.starts_with(PREFIX) {
if let Ok(c) = <Chunk as FromStr>::from_str(&s[PREFIX.len()..]) {
return Some(RangeResponseHeader(c))
}
}
}
}
None
}
}
impl HeaderFormat for RangeResponseHeader {
fn fmt_header(&self, _: &mut fmt::Formatter) -> fmt::Result {
Err(fmt::Error)
}
}
pub struct ResumableUploadHelper<'a, A: 'a> {
pub client: &'a mut hyper::client::Client,
pub delegate: &'a mut Delegate,
pub start_at: Option<u64>,
pub auth: &'a mut A,
pub user_agent: &'a str,
pub auth_header: Authorization<oauth2::Scheme>,
pub url: &'a str,
pub reader: &'a mut ReadSeek,
pub media_type: Mime,
pub content_length: u64
}
impl<'a, A> ResumableUploadHelper<'a, A>
where A: oauth2::GetToken {
fn query_transfer_status(&mut self) -> std::result::Result<u64, hyper::HttpResult<hyper::client::Response>> {
loop {
match self.client.post(self.url)
.header(UserAgent(self.user_agent.to_string()))
.header(ContentRange { range: None, total_length: self.content_length })
.header(self.auth_header.clone())
.send() {
Ok(r) => {
let headers = r.headers.clone();
let h: &RangeResponseHeader = match headers.get() {
Some(hh) if r.status == StatusCode::PermanentRedirect => hh,
None|Some(_) => {
if let Retry::After(d) = self.delegate.http_failure(&r, None) {
sleep_ms(d.num_milliseconds() as u32);
continue;
}
return Err(Ok(r))
}
};
return Ok(h.0.last)
}
Err(err) => {
if let Retry::After(d) = self.delegate.http_error(&err) {
sleep_ms(d.num_milliseconds() as u32);
continue;
}
return Err(Err(err))
}
}
}
}
pub fn upload(&mut self) -> Option<hyper::HttpResult<hyper::client::Response>> {
let mut start = match self.start_at {
Some(s) => s,
None => match self.query_transfer_status() {
Ok(s) => s,
Err(result) => return Some(result)
}
};
const MIN_CHUNK_SIZE: u64 = 1 << 18;
let chunk_size = match self.delegate.chunk_size() {
cs if cs > MIN_CHUNK_SIZE => cs,
_ => MIN_CHUNK_SIZE
};
self.reader.seek(SeekFrom::Start(start)).unwrap();
loop {
let request_size = match self.content_length - start {
rs if rs > chunk_size => chunk_size,
rs => rs
};
let mut section_reader = self.reader.take(request_size);
let range_header = ContentRange {
range: Some(Chunk {first: start, last: start + request_size - 1}),
total_length: self.content_length
};
start += request_size;
if self.delegate.cancel_chunk_upload(&range_header) {
return None
}
match self.client.post(self.url)
.header(range_header)
.header(ContentType(self.media_type.clone()))
.header(UserAgent(self.user_agent.to_string()))
.body(&mut section_reader)
.send() {
Ok(mut res) => {
if res.status == StatusCode::PermanentRedirect {
continue
}
if !res.status.is_success() {
let mut json_err = String::new();
res.read_to_string(&mut json_err).unwrap();
if let Retry::After(d) = self.delegate.http_failure(&res, serde::json::from_str(&json_err).ok()) {
sleep_ms(d.num_milliseconds() as u32);
continue;
}
}
return Some(Ok(res))
},
Err(err) => {
if let Retry::After(d) = self.delegate.http_error(&err) {
sleep_ms(d.num_milliseconds() as u32);
continue;
}
return Some(Err(err))
}
}
}
}
}