1use std::io::{self, Read, Seek, Cursor, Write, SeekFrom};
4use std;
5use std::fmt::{self, Display};
6use std::str::FromStr;
7use std::error;
8use std::thread::sleep;
9use std::time::Duration;
10
11use mime::{Mime, TopLevel, SubLevel, Attr, Value};
12use oauth2::{TokenType, Retry, self};
13use hyper;
14use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization, Header,
15 HeaderFormat, Bearer};
16use hyper::http::h1::LINE_ENDING;
17use hyper::method::Method;
18use hyper::status::StatusCode;
19
20use serde_json as json;
21
// The traits below declare no methods; they only tag types. What each tag is
// meant to convey is inferred from its name alone, since no implementor is
// visible here — TODO confirm against the types that implement them.

/// Marker trait for a hub type (presumably the API entry point — verify).
pub trait Hub {}

/// Marker trait for a methods builder (presumably groups related calls — verify).
pub trait MethodsBuilder {}

/// Marker trait for a call builder (presumably configures a single call — verify).
pub trait CallBuilder {}

/// Marker trait for a resource type (presumably usable as request and response — verify).
pub trait Resource {}

/// Marker trait for a type returned as the result of a call (verify).
pub trait ResponseResult {}

/// Marker trait for a type sent as the value of a request (verify).
pub trait RequestValue {}

/// Marker trait for a type that is declared but not used by any call (verify).
pub trait UnusedType {}

/// Marker trait for a type that only appears as part of another type (verify).
pub trait Part {}

/// Marker trait for a type that is nested inside another type (verify).
pub trait NestedType {}
54
/// `Seek` and `Read` combined under one name, so that a single trait object
/// can offer both capabilities.
pub trait ReadSeek: Seek + Read {}

/// Blanket implementation: every type that is seekable and readable is a `ReadSeek`.
impl<T> ReadSeek for T where T: Seek + Read {}
58
/// Implemented by types that can describe their "parts" as a single string.
///
/// NOTE(review): the expected format of that string is not defined in this
/// file — confirm against the implementors.
pub trait ToParts {
    /// Returns the parts of `self` rendered as an owned string.
    fn to_parts(&self) -> String;
}
63
64#[derive(Deserialize)]
66pub struct JsonServerError {
67 pub error: String,
68 pub error_description: Option<String>
69}
70
71#[derive(Deserialize, Serialize, Debug)]
74pub struct ErrorResponse {
75 error: ServerError,
76}
77
78#[derive(Deserialize, Serialize, Debug)]
79pub struct ServerError {
80 errors: Vec<ServerMessage>,
81 code: u16,
82 message: String,
83}
84
85#[derive(Deserialize, Serialize, Debug)]
86pub struct ServerMessage {
87 domain: String,
88 reason: String,
89 message: String,
90 #[serde(rename="locationType")]
91 location_type: Option<String>,
92 location: Option<String>
93}
94
/// Stateless stand-in for a network stream: it never yields data and never
/// accepts any.
#[derive(Copy, Clone)]
pub struct DummyNetworkStream;

impl Read for DummyNetworkStream {
    /// Always reports zero bytes read; the buffer is left untouched.
    fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
        Ok(0)
    }
}

impl Write for DummyNetworkStream {
    /// Always reports zero bytes written; the data is dropped.
    ///
    /// NOTE(review): returning `Ok(0)` for a non-empty buffer makes
    /// `Write::write_all` fail with `WriteZero` — presumably acceptable for a
    /// placeholder stream, confirm with the callers.
    fn write(&mut self, _data: &[u8]) -> io::Result<usize> {
        Ok(0)
    }

    /// Nothing is ever buffered, so flushing trivially succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
113
114impl hyper::net::NetworkStream for DummyNetworkStream {
115 fn peer_addr(&mut self) -> io::Result<std::net::SocketAddr> {
116 Ok("127.0.0.1:1337".parse().unwrap())
117 }
118
119 fn set_read_timeout(&self, _dur: Option<Duration>) -> io::Result<()> {
120 Ok(())
121 }
122
123 fn set_write_timeout(&self, _dur: Option<Duration>) -> io::Result<()> {
124 Ok(())
125 }
126}
127
128
129pub trait Delegate {
135
136 fn begin(&mut self, MethodInfo) {}
143
144 fn http_error(&mut self, &hyper::Error) -> Retry {
151 Retry::Abort
152 }
153
154 fn api_key(&mut self) -> Option<String> {
158 None
159 }
160
161 fn token(&mut self, err: &error::Error) -> Option<oauth2::Token> {
167 let _ = err;
168 None
169 }
170
171 fn upload_url(&mut self) -> Option<String> {
179 None
180 }
181
182 fn store_upload_url(&mut self, url: Option<&str>) {
189 let _ = url;
190 }
191
192 fn response_json_decode_error(&mut self, json_encoded_value: &str, json_decode_error: &json::Error) {
201 let _ = json_encoded_value;
202 let _ = json_decode_error;
203 }
204
205 fn http_failure(&mut self, _: &hyper::client::Response, Option<JsonServerError>, _: Option<ServerError>) -> Retry {
214 Retry::Abort
215 }
216
217 fn pre_request(&mut self) { }
221
222 fn chunk_size(&mut self) -> u64 {
226 1 << 23
227 }
228
229 fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
234 let _ = chunk;
235 false
236 }
237
238 fn finished(&mut self, is_success: bool) {
247 let _ = is_success;
248 }
249}
250
251#[derive(Default)]
254pub struct DefaultDelegate;
255
256impl Delegate for DefaultDelegate {}
257
258
259#[derive(Debug)]
260pub enum Error {
261 HttpError(hyper::Error),
263
264 UploadSizeLimitExceeded(u64, u64),
267
268 BadRequest(ErrorResponse),
271
272 MissingAPIKey,
275
276 MissingToken(Box<error::Error>),
278
279 Cancelled,
281
282 FieldClash(&'static str),
284
285 JsonDecodeError(String, json::Error),
288
289 Failure(hyper::client::Response),
291}
292
293
294impl Display for Error {
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296 match *self {
297 Error::HttpError(ref err) => err.fmt(f),
298 Error::UploadSizeLimitExceeded(ref resource_size, ref max_size) =>
299 writeln!(f, "The media size {} exceeds the maximum allowed upload size of {}"
300 , resource_size, max_size),
301 Error::MissingAPIKey => {
302 (writeln!(f, "The application's API key was not found in the configuration")).ok();
303 writeln!(f, "It is used as there are no Scopes defined for this method.")
304 },
305 Error::BadRequest(ref err) => {
306 try!(writeln!(f, "Bad Request ({}): {}", err.error.code, err.error.message));
307 for err in err.error.errors.iter() {
308 try!(writeln!(f, " {}: {}, {}{}",
309 err.domain,
310 err.message,
311 err.reason,
312 match &err.location {
313 &Some(ref loc) => format!("@{}", loc),
314 &None => String::new(),
315 }));
316 }
317 Ok(())
318 },
319 Error::MissingToken(ref err) =>
320 writeln!(f, "Token retrieval failed with error: {}", err),
321 Error::Cancelled =>
322 writeln!(f, "Operation cancelled by delegate"),
323 Error::FieldClash(field) =>
324 writeln!(f, "The custom parameter '{}' is already provided natively by the CallBuilder.", field),
325 Error::JsonDecodeError(ref json_str, ref err)
326 => writeln!(f, "{}: {}", err, json_str),
327 Error::Failure(ref response) =>
328 writeln!(f, "Http status indicates failure: {:?}", response),
329 }
330 }
331}
332
333impl error::Error for Error {
334 fn description(&self) -> &str {
335 match *self {
336 Error::HttpError(ref err) => err.description(),
337 Error::JsonDecodeError(_, ref err) => err.description(),
338 _ => "NO DESCRIPTION POSSIBLE - use `Display.fmt()` instead"
339 }
340 }
341
342 fn cause(&self) -> Option<&error::Error> {
343 match *self {
344 Error::HttpError(ref err) => err.cause(),
345 Error::JsonDecodeError(_, ref err) => err.cause(),
346 _ => None
347 }
348 }
349}
350
351pub type Result<T> = std::result::Result<T, Error>;
353
354pub struct MethodInfo {
356 pub id: &'static str,
357 pub http_method: Method,
358}
359
360const BOUNDARY: &'static str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d";
361
362#[derive(Default)]
367pub struct MultiPartReader<'a> {
368 raw_parts: Vec<(Headers, &'a mut Read)>,
369 current_part: Option<(Cursor<Vec<u8>>, &'a mut Read)>,
370 last_part_boundary: Option<Cursor<Vec<u8>>>,
371}
372
373impl<'a> MultiPartReader<'a> {
374
375 pub fn reserve_exact(&mut self, cap: usize) {
377 self.raw_parts.reserve_exact(cap);
378 }
379
380 pub fn add_part(&mut self, reader: &'a mut Read, size: u64, mime_type: Mime) -> &mut MultiPartReader<'a> {
392 let mut headers = Headers::new();
393 headers.set(ContentType(mime_type));
394 headers.set(ContentLength(size));
395 self.raw_parts.push((headers, reader));
396 self
397 }
398
399 pub fn mime_type(&self) -> Mime {
402 Mime(
403 TopLevel::Multipart,
404 SubLevel::Ext("Related".to_string()),
405 vec![(Attr::Ext("boundary".to_string()), Value::Ext(BOUNDARY.to_string()))],
406 )
407 }
408
409 fn is_depleted(&self) -> bool {
411 self.raw_parts.len() == 0 && self.current_part.is_none() && self.last_part_boundary.is_none()
412 }
413
414 fn is_last_part(&self) -> bool {
416 self.raw_parts.len() == 0 && self.current_part.is_some()
417 }
418}
419
420impl<'a> Read for MultiPartReader<'a> {
421 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
422 match (self.raw_parts.len(),
423 self.current_part.is_none(),
424 self.last_part_boundary.is_none()) {
425 (_, _, false) => {
426 let br = self.last_part_boundary.as_mut().unwrap().read(buf).unwrap_or(0);
427 if br < buf.len() {
428 self.last_part_boundary = None;
429 }
430 return Ok(br)
431 },
432 (0, true, true) => return Ok(0),
433 (n, true, _) if n > 0 => {
434 let (headers, reader) = self.raw_parts.remove(0);
435 let mut c = Cursor::new(Vec::<u8>::new());
436 (write!(&mut c, "{}--{}{}{}{}", LINE_ENDING, BOUNDARY, LINE_ENDING,
437 headers, LINE_ENDING)).unwrap();
438 c.seek(SeekFrom::Start(0)).unwrap();
439 self.current_part = Some((c, reader));
440 }
441 _ => {},
442 }
443
444 let (hb, rr) = {
446 let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap();
447 let b = c.read(buf).unwrap_or(0);
448 (b, reader.read(&mut buf[b..]))
449 };
450
451 match rr {
452 Ok(bytes_read) => {
453 if hb < buf.len() && bytes_read == 0 {
454 if self.is_last_part() {
455 self.last_part_boundary = Some(Cursor::new(
458 format!("{}--{}--", LINE_ENDING, BOUNDARY).into_bytes()))
459 }
460 self.current_part = None;
462 }
463 let mut total_bytes_read = hb + bytes_read;
464 while total_bytes_read < buf.len() && !self.is_depleted() {
465 match self.read(&mut buf[total_bytes_read ..]) {
466 Ok(br) => total_bytes_read += br,
467 Err(err) => return Err(err),
468 }
469 }
470 Ok(total_bytes_read)
471 }
472 Err(err) => {
473 self.current_part = None;
475 self.last_part_boundary = None;
476 self.raw_parts.clear();
477 Err(err)
478 }
479 }
480 }
481}
482
483#[derive(PartialEq, Debug, Clone)]
488pub struct XUploadContentType(pub Mime);
489
490impl ::std::ops::Deref for XUploadContentType {
491 type Target = Mime;
492 fn deref<'a>(&'a self) -> &'a Mime { &self.0 }
493}
494impl ::std::ops::DerefMut for XUploadContentType {
495 fn deref_mut<'a>(&'a mut self) -> &'a mut Mime { &mut self.0 }
496}
497impl Header for XUploadContentType {
498 fn header_name() -> &'static str { "X-Upload-Content-Type" }
499 fn parse_header(raw: &[Vec<u8>]) -> hyper::error::Result<Self> {
500 hyper::header::parsing::from_one_raw_str(raw).map(XUploadContentType)
501 }
502}
503impl HeaderFormat for XUploadContentType {
504 fn fmt_header(&self, f: &mut fmt::Formatter) -> fmt::Result {
505 Display::fmt(&**self, f)
506 }
507}
508impl Display for XUploadContentType {
509 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
510 fmt::Display::fmt(&**self, f)
511 }
512}
513
/// A byte range written as `first-last`.
///
/// The resumable upload helper builds chunks with `last` being the final byte
/// that is part of the range (inclusive).
#[derive(Clone, PartialEq, Debug)]
pub struct Chunk {
    /// Offset of the first byte of the range.
    pub first: u64,
    /// Offset of the last byte of the range.
    pub last: u64,
}

impl fmt::Display for Chunk {
    /// Writes `first-last`; any error reported by the formatter is returned
    /// to the caller instead of being discarded.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "{}-{}", self.first, self.last)
    }
}

impl FromStr for Chunk {
    type Err = &'static str;

    /// Parses `first-last`, where both sides are unsigned decimal numbers.
    ///
    /// # Errors
    /// Fails when `s` does not consist of exactly two `-`-separated parts, or
    /// when either part is not a valid `u64`.
    fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
        // Pull at most three pieces; exactly two must exist.
        let mut pieces = s.split('-');
        let (first, last) = match (pieces.next(), pieces.next(), pieces.next()) {
            (Some(first), Some(last), None) => (first, last),
            _ => return Err("Expected two parts: %i-%i"),
        };
        Ok(Chunk {
            first: first.parse().map_err(|_| "Couldn't parse 'first' as digit")?,
            last: last.parse().map_err(|_| "Couldn't parse 'last' as digit")?,
        })
    }
}
550
551#[derive(Clone, PartialEq, Debug)]
553pub struct ContentRange {
554 pub range: Option<Chunk>,
555 pub total_length: u64,
556}
557
558impl Header for ContentRange {
559 fn header_name() -> &'static str {
560 "Content-Range"
561 }
562
563 fn parse_header(_: &[Vec<u8>]) -> hyper::error::Result<Self> {
565 Err(hyper::error::Error::Method)
566 }
567}
568
569
570impl HeaderFormat for ContentRange {
571 fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
572 try!(fmt.write_str("bytes "));
573 match self.range {
574 Some(ref c) => try!(c.fmt(fmt)),
575 None => try!(fmt.write_str("*"))
576 }
577 (write!(fmt, "/{}", self.total_length)).ok();
578 Ok(())
579 }
580}
581
582#[derive(Clone, PartialEq, Debug)]
583pub struct RangeResponseHeader(pub Chunk);
584
585impl Header for RangeResponseHeader {
586 fn header_name() -> &'static str {
587 "Range"
588 }
589
590 fn parse_header(raw: &[Vec<u8>]) -> hyper::error::Result<Self> {
591 if raw.len() > 0 {
592 let v = &raw[0];
593 if let Ok(s) = std::str::from_utf8(v) {
594 const PREFIX: &'static str = "bytes ";
595 if s.starts_with(PREFIX) {
596 if let Ok(c) = <Chunk as FromStr>::from_str(&s[PREFIX.len()..]) {
597 return Ok(RangeResponseHeader(c))
598 }
599 }
600 }
601 }
602 Err(hyper::error::Error::Method)
603 }
604}
605
606impl HeaderFormat for RangeResponseHeader {
607 fn fmt_header(&self, _: &mut fmt::Formatter) -> fmt::Result {
609 Err(fmt::Error)
610 }
611}
612
613pub struct ResumableUploadHelper<'a, A: 'a> {
615 pub client: &'a mut hyper::client::Client,
616 pub delegate: &'a mut Delegate,
617 pub start_at: Option<u64>,
618 pub auth: &'a mut A,
619 pub user_agent: &'a str,
620 pub auth_header: Authorization<Bearer>,
621 pub url: &'a str,
622 pub reader: &'a mut ReadSeek,
623 pub media_type: Mime,
624 pub content_length: u64
625}
626
627impl<'a, A> ResumableUploadHelper<'a, A>
628 where A: oauth2::GetToken {
629
630 fn query_transfer_status(&mut self) -> std::result::Result<u64, hyper::Result<hyper::client::Response>> {
631 loop {
632 match self.client.post(self.url)
633 .header(UserAgent(self.user_agent.to_string()))
634 .header(ContentRange { range: None, total_length: self.content_length })
635 .header(self.auth_header.clone())
636 .send() {
637 Ok(r) => {
638 let headers = r.headers.clone();
640 let h: &RangeResponseHeader = match headers.get() {
641 Some(hh) if r.status == StatusCode::PermanentRedirect => hh,
642 None|Some(_) => {
643 if let Retry::After(d) = self.delegate.http_failure(&r, None, None) {
644 sleep(d);
645 continue;
646 }
647 return Err(Ok(r))
648 }
649 };
650 return Ok(h.0.last)
651 }
652 Err(err) => {
653 if let Retry::After(d) = self.delegate.http_error(&err) {
654 sleep(d);
655 continue;
656 }
657 return Err(Err(err))
658 }
659 }
660 }
661 }
662
663 pub fn upload(&mut self) -> Option<hyper::Result<hyper::client::Response>> {
667 let mut start = match self.start_at {
668 Some(s) => s,
669 None => match self.query_transfer_status() {
670 Ok(s) => s,
671 Err(result) => return Some(result)
672 }
673 };
674
675 const MIN_CHUNK_SIZE: u64 = 1 << 18;
676 let chunk_size = match self.delegate.chunk_size() {
677 cs if cs > MIN_CHUNK_SIZE => cs,
678 _ => MIN_CHUNK_SIZE
679 };
680
681 self.reader.seek(SeekFrom::Start(start)).unwrap();
682 loop {
683 let request_size = match self.content_length - start {
684 rs if rs > chunk_size => chunk_size,
685 rs => rs
686 };
687
688 let mut section_reader = self.reader.take(request_size);
689 let range_header = ContentRange {
690 range: Some(Chunk {first: start, last: start + request_size - 1}),
691 total_length: self.content_length
692 };
693 start += request_size;
694 if self.delegate.cancel_chunk_upload(&range_header) {
695 return None
696 }
697 let res = self.client.post(self.url)
698 .header(range_header)
699 .header(ContentType(self.media_type.clone()))
700 .header(UserAgent(self.user_agent.to_string()))
701 .body(&mut section_reader)
702 .send();
703 match res {
704 Ok(mut res) => {
705 if res.status == StatusCode::PermanentRedirect {
706 continue
707 }
708 if !res.status.is_success() {
709 let mut json_err = String::new();
710 res.read_to_string(&mut json_err).unwrap();
711 if let Retry::After(d) = self.delegate.http_failure(&res,
712 json::from_str(&json_err).ok(),
713 json::from_str(&json_err).ok()) {
714 sleep(d);
715 continue;
716 }
717 }
718 return Some(Ok(res))
719 },
720 Err(err) => {
721 if let Retry::After(d) = self.delegate.http_error(&err) {
722 sleep(d);
723 continue;
724 }
725 return Some(Err(err))
726 }
727 }
728 }
729 }
730}
731
732pub fn remove_json_null_values(value: &mut json::value::Value) {
735 match *value {
736 json::value::Value::Object(ref mut map) => {
737 let mut for_removal = Vec::new();
738
739 for (key, mut value) in map.iter_mut() {
740 if value.is_null() {
741 for_removal.push(key.clone());
742 } else {
743 remove_json_null_values(&mut value);
744 }
745 }
746
747 for key in &for_removal {
748 map.remove(key);
749 }
750 }
751 _ => {}
752 }
753}