1use std::error;
4use std::fmt::{self, Display};
5use std::io::{self, Cursor, Read, Seek, SeekFrom, Write};
6use std::str::FromStr;
7use std::thread::sleep;
8use std::time::Duration;
9
10use itertools::Itertools;
11
12use hyper::body::Buf;
13use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT};
14use hyper::Method;
15use hyper::StatusCode;
16
17use mime::{Attr, Mime, SubLevel, TopLevel, Value};
18
19use serde_json as json;
20
/// CR LF sequence written around the boundary and header lines of the
/// multipart payload produced by `MultiPartReader`.
const LINE_ENDING: &str = "\r\n";
22
/// Decision returned by the failure hooks of `Delegate`.
///
/// The upload helper in this file inspects it after a failed request:
/// `After` makes it sleep and repeat the request, anything else ends the loop.
pub enum Retry {
    /// Give up; the failure is handed back to the caller.
    Abort,
    /// Pause for the given duration, then try the request again.
    After(Duration),
}
29
/// Marker trait without methods or associated items.
///
/// NOTE(review): the name suggests it tags the central hub type of a client
/// library; no implementor is visible in this file — confirm.
pub trait Hub {}
34
/// Marker trait without methods or associated items.
///
/// NOTE(review): presumably tags types that hand out call builders — no
/// implementor is visible in this file, confirm.
pub trait MethodsBuilder {}
37
/// Marker trait without methods or associated items.
///
/// The `Error::FieldClash` message in this file refers to "the CallBuilder";
/// NOTE(review): presumably this tags the per-call builder types — confirm.
pub trait CallBuilder {}
40
/// Marker trait without methods or associated items.
///
/// NOTE(review): presumably tags types that model an API resource — no
/// implementor is visible in this file, confirm.
pub trait Resource {}
44
/// Marker trait without methods or associated items.
///
/// NOTE(review): presumably tags types returned as the result of a call — no
/// implementor is visible in this file, confirm.
pub trait ResponseResult {}
47
/// Marker trait without methods or associated items.
///
/// NOTE(review): presumably tags types sent as a request payload — no
/// implementor is visible in this file, confirm.
pub trait RequestValue {}
50
/// Marker trait without methods or associated items.
///
/// NOTE(review): presumably tags types that no call references — no
/// implementor is visible in this file, confirm.
pub trait UnusedType {}
54
/// Marker trait without methods or associated items.
///
/// NOTE(review): presumably tags types that are a component of a larger
/// request or response value — no implementor is visible here, confirm.
pub trait Part {}
58
/// Marker trait without methods or associated items.
///
/// NOTE(review): presumably tags types that only occur nested inside another
/// type — no implementor is visible in this file, confirm.
pub trait NestedType {}
62
/// Shorthand for the bound `Seek + Read + Send`, so that it can be named as a
/// single trait object (`dyn ReadSeek`).
pub trait ReadSeek: Seek + Read + Send {}

/// Every type that is `Seek + Read + Send` is automatically a `ReadSeek`.
impl<T> ReadSeek for T where T: Seek + Read + Send {}
66
/// Implemented by values that can describe their "parts" as a single string.
pub trait ToParts {
    /// Returns the parts of `self` rendered as a `String`.
    ///
    /// The exact format is chosen by the implementor; nothing in this file
    /// constrains it.
    fn to_parts(&self) -> String;
}
71
72#[derive(Deserialize)]
74pub struct JsonServerError {
75 pub error: String,
76 pub error_description: Option<String>,
77}
78
79#[derive(Deserialize, Serialize, Debug)]
82pub struct ErrorResponse {
83 pub error: ServerError,
84}
85
86#[derive(Deserialize, Serialize, Debug)]
87pub struct ServerError {
88 pub errors: Vec<ServerMessage>,
89 pub code: u16,
90 pub message: String,
91}
92
93#[derive(Deserialize, Serialize, Debug)]
94pub struct ServerMessage {
95 pub domain: String,
96 pub reason: String,
97 pub message: String,
98 #[serde(rename = "locationType")]
99 pub location_type: Option<String>,
100 pub location: Option<String>,
101}
102
103pub trait Delegate: Send {
109 fn begin(&mut self, _info: MethodInfo) {}
116
117 fn http_error(&mut self, _err: &hyper::Error) -> Retry {
124 Retry::Abort
125 }
126
127 fn api_key(&mut self) -> Option<String> {
131 None
132 }
133
134 fn token(&mut self, err: &oauth2::Error) -> Option<oauth2::AccessToken> {
140 let _ = err;
141 None
142 }
143
144 fn upload_url(&mut self) -> Option<String> {
152 None
153 }
154
155 fn store_upload_url(&mut self, url: Option<&str>) {
162 let _ = url;
163 }
164
165 fn response_json_decode_error(
174 &mut self,
175 json_encoded_value: &str,
176 json_decode_error: &json::Error,
177 ) {
178 let _ = json_encoded_value;
179 let _ = json_decode_error;
180 }
181
182 fn http_failure(
191 &mut self,
192 _: &hyper::Response<hyper::body::Body>,
193 _err: Option<JsonServerError>,
194 _: Option<ServerError>,
195 ) -> Retry {
196 Retry::Abort
197 }
198
199 fn pre_request(&mut self) {}
203
204 fn chunk_size(&mut self) -> u64 {
208 1 << 23
209 }
210
211 fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
216 let _ = chunk;
217 false
218 }
219
220 fn finished(&mut self, is_success: bool) {
229 let _ = is_success;
230 }
231}
232
233#[derive(Default)]
236pub struct DefaultDelegate;
237
238impl Delegate for DefaultDelegate {}
239
240#[derive(Debug)]
241pub enum Error {
242 HttpError(hyper::Error),
244
245 UploadSizeLimitExceeded(u64, u64),
248
249 BadRequest(ErrorResponse),
252
253 MissingAPIKey,
256
257 MissingToken(oauth2::Error),
259
260 Cancelled,
262
263 FieldClash(&'static str),
265
266 JsonDecodeError(String, json::Error),
269
270 Failure(hyper::Response<hyper::body::Body>),
272
273 Io(std::io::Error),
275}
276
277impl Display for Error {
278 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
279 match *self {
280 Error::Io(ref err) => err.fmt(f),
281 Error::HttpError(ref err) => err.fmt(f),
282 Error::UploadSizeLimitExceeded(ref resource_size, ref max_size) => writeln!(
283 f,
284 "The media size {} exceeds the maximum allowed upload size of {}",
285 resource_size, max_size
286 ),
287 Error::MissingAPIKey => {
288 (writeln!(
289 f,
290 "The application's API key was not found in the configuration"
291 ))
292 .ok();
293 writeln!(
294 f,
295 "It is used as there are no Scopes defined for this method."
296 )
297 }
298 Error::BadRequest(ref err) => {
299 writeln!(f, "Bad Request ({}): {}", err.error.code, err.error.message)?;
300 for err in err.error.errors.iter() {
301 writeln!(
302 f,
303 " {}: {}, {}{}",
304 err.domain,
305 err.message,
306 err.reason,
307 match err.location {
308 Some(ref loc) => format!("@{}", loc),
309 None => String::new(),
310 }
311 )?;
312 }
313 Ok(())
314 }
315 Error::MissingToken(ref err) => {
316 writeln!(f, "Token retrieval failed with error: {}", err)
317 }
318 Error::Cancelled => writeln!(f, "Operation cancelled by delegate"),
319 Error::FieldClash(field) => writeln!(
320 f,
321 "The custom parameter '{}' is already provided natively by the CallBuilder.",
322 field
323 ),
324 Error::JsonDecodeError(ref json_str, ref err) => writeln!(f, "{}: {}", err, json_str),
325 Error::Failure(ref response) => {
326 writeln!(f, "Http status indicates failure: {:?}", response)
327 }
328 }
329 }
330}
331
332impl error::Error for Error {
333 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
334 match *self {
335 Error::HttpError(ref err) => err.source(),
336 Error::JsonDecodeError(_, ref err) => err.source(),
337 _ => None,
338 }
339 }
340}
341
342impl From<std::io::Error> for Error {
343 fn from(err: std::io::Error) -> Self {
344 Error::Io(err)
345 }
346}
347
348pub type Result<T> = std::result::Result<T, Error>;
350
351pub struct MethodInfo {
353 pub id: &'static str,
354 pub http_method: Method,
355}
356
/// Fixed boundary token that separates the parts of the multipart payload
/// produced by `MultiPartReader`; also reported by its `mime_type()`.
const BOUNDARY: &str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d";
358
359#[derive(Default)]
364pub struct MultiPartReader<'a> {
365 raw_parts: Vec<(HeaderMap, &'a mut (dyn Read + Send))>,
366 current_part: Option<(Cursor<Vec<u8>>, &'a mut (dyn Read + Send))>,
367 last_part_boundary: Option<Cursor<Vec<u8>>>,
368}
369
370impl<'a> MultiPartReader<'a> {
371 pub fn reserve_exact(&mut self, cap: usize) {
373 self.raw_parts.reserve_exact(cap);
374 }
375
376 pub fn add_part(
388 &mut self,
389 reader: &'a mut (dyn Read + Send),
390 size: u64,
391 mime_type: Mime,
392 ) -> &mut MultiPartReader<'a> {
393 let mut headers = HeaderMap::new();
394 headers.insert(
395 CONTENT_TYPE,
396 hyper::header::HeaderValue::from_str(&format!("{}", mime_type)).unwrap(),
397 );
398 headers.insert(CONTENT_LENGTH, size.into());
399 self.raw_parts.push((headers, reader));
400 self
401 }
402
403 pub fn mime_type(&self) -> Mime {
406 Mime(
407 TopLevel::Multipart,
408 SubLevel::Ext("related".to_string()),
409 vec![(
410 Attr::Ext("boundary".to_string()),
411 Value::Ext(BOUNDARY.to_string()),
412 )],
413 )
414 }
415
416 fn is_depleted(&self) -> bool {
418 self.raw_parts.is_empty()
419 && self.current_part.is_none()
420 && self.last_part_boundary.is_none()
421 }
422
423 fn is_last_part(&self) -> bool {
425 self.raw_parts.is_empty() && self.current_part.is_some()
426 }
427}
428
429impl<'a> Read for MultiPartReader<'a> {
430 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
431 match (
432 self.raw_parts.len(),
433 self.current_part.is_none(),
434 self.last_part_boundary.is_none(),
435 ) {
436 (_, _, false) => {
437 let br = self
438 .last_part_boundary
439 .as_mut()
440 .unwrap()
441 .read(buf)
442 .unwrap_or(0);
443 if br < buf.len() {
444 self.last_part_boundary = None;
445 }
446 return Ok(br);
447 }
448 (0, true, true) => return Ok(0),
449 (n, true, _) if n > 0 => {
450 let (headers, reader) = self.raw_parts.remove(0);
451 let mut c = Cursor::new(Vec::<u8>::new());
452 (write!(
455 &mut c,
456 "{}--{}{}{}{}{}",
457 LINE_ENDING,
458 BOUNDARY,
459 LINE_ENDING,
460 headers
461 .iter()
462 .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap()))
463 .join(LINE_ENDING),
464 LINE_ENDING,
465 LINE_ENDING,
466 ))
467 .unwrap();
468 c.seek(SeekFrom::Start(0)).unwrap();
469 self.current_part = Some((c, reader));
470 }
471 _ => {}
472 }
473
474 let (hb, rr) = {
476 let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap();
477 let b = c.read(buf).unwrap_or(0);
478 (b, reader.read(&mut buf[b..]))
479 };
480
481 match rr {
482 Ok(bytes_read) => {
483 if hb < buf.len() && bytes_read == 0 {
484 if self.is_last_part() {
485 self.last_part_boundary = Some(Cursor::new(
488 format!("{}--{}--{}", LINE_ENDING, BOUNDARY, LINE_ENDING).into_bytes(),
489 ))
490 }
491 self.current_part = None;
493 }
494 let mut total_bytes_read = hb + bytes_read;
495 while total_bytes_read < buf.len() && !self.is_depleted() {
496 match self.read(&mut buf[total_bytes_read..]) {
497 Ok(br) => total_bytes_read += br,
498 Err(err) => return Err(err),
499 }
500 }
501 Ok(total_bytes_read)
502 }
503 Err(err) => {
504 self.current_part = None;
506 self.last_part_boundary = None;
507 self.raw_parts.clear();
508 Err(err)
509 }
510 }
511 }
512}
513
514#[derive(PartialEq, Debug, Clone)]
519pub struct XUploadContentType(pub Mime);
520
521impl ::std::ops::Deref for XUploadContentType {
522 type Target = Mime;
523 fn deref(&self) -> &Mime {
524 &self.0
525 }
526}
527impl ::std::ops::DerefMut for XUploadContentType {
528 fn deref_mut(&mut self) -> &mut Mime {
529 &mut self.0
530 }
531}
532impl Display for XUploadContentType {
533 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
534 fmt::Display::fmt(&**self, f)
535 }
536}
537
/// A byte range given by its first and last position.
///
/// The upload code in this file treats `last` as inclusive: it sets it to
/// `first + size - 1`.
#[derive(Debug, Clone, PartialEq)]
pub struct Chunk {
    /// Position of the first byte.
    pub first: u64,
    /// Position of the last byte.
    pub last: u64,
}
543
544impl fmt::Display for Chunk {
545 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
546 (write!(fmt, "{}-{}", self.first, self.last)).ok();
547 Ok(())
548 }
549}
550
551impl FromStr for Chunk {
552 type Err = &'static str;
553
554 fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
556 let parts: Vec<&str> = s.split('-').collect();
557 if parts.len() != 2 {
558 return Err("Expected two parts: %i-%i");
559 }
560 Ok(Chunk {
561 first: match FromStr::from_str(parts[0]) {
562 Ok(d) => d,
563 _ => return Err("Couldn't parse 'first' as digit"),
564 },
565 last: match FromStr::from_str(parts[1]) {
566 Ok(d) => d,
567 _ => return Err("Couldn't parse 'last' as digit"),
568 },
569 })
570 }
571}
572
573#[derive(Clone, PartialEq, Debug)]
575pub struct ContentRange {
576 pub range: Option<Chunk>,
577 pub total_length: u64,
578}
579
580impl ContentRange {
581 pub fn header_value(&self) -> String {
582 format!(
583 "bytes {}/{}",
584 match self.range {
585 Some(ref c) => format!("{}", c),
586 None => "*".to_string(),
587 },
588 self.total_length
589 )
590 }
591}
592
593#[derive(Clone, PartialEq, Debug)]
594pub struct RangeResponseHeader(pub Chunk);
595
596impl RangeResponseHeader {
597 fn from_bytes(raw: &[u8]) -> Self {
598 if !raw.is_empty() {
599 if let Ok(s) = std::str::from_utf8(raw) {
600 const PREFIX: &str = "bytes ";
601 if let Some(stripped) = s.strip_prefix(PREFIX) {
602 if let Ok(c) = <Chunk as FromStr>::from_str(&stripped) {
603 return RangeResponseHeader(c);
604 }
605 }
606 }
607 }
608
609 panic!("Unable to parse Range header {:?}", raw)
610 }
611}
612
613pub struct ResumableUploadHelper<'a, A: 'a> {
615 pub client: &'a hyper::client::Client<
616 hyper_rustls::HttpsConnector<hyper::client::connect::HttpConnector>,
617 hyper::body::Body,
618 >,
619 pub delegate: &'a mut dyn Delegate,
620 pub start_at: Option<u64>,
621 pub auth: &'a A,
622 pub user_agent: &'a str,
623 pub auth_header: String,
624 pub url: &'a str,
625 pub reader: &'a mut dyn ReadSeek,
626 pub media_type: Mime,
627 pub content_length: u64,
628}
629
630impl<'a, A> ResumableUploadHelper<'a, A> {
631 async fn query_transfer_status(
632 &mut self,
633 ) -> std::result::Result<u64, hyper::Result<hyper::Response<hyper::body::Body>>> {
634 loop {
635 match self
636 .client
637 .request(
638 hyper::Request::builder()
639 .method(hyper::Method::POST)
640 .uri(self.url)
641 .header(USER_AGENT, self.user_agent.to_string())
642 .header(
643 "Content-Range",
644 ContentRange {
645 range: None,
646 total_length: self.content_length,
647 }
648 .header_value(),
649 )
650 .header(AUTHORIZATION, self.auth_header.clone())
651 .body(hyper::body::Body::empty())
652 .unwrap(),
653 )
654 .await
655 {
656 Ok(r) => {
657 let headers = r.headers().clone();
659 let h: RangeResponseHeader = match headers.get("Range") {
660 Some(hh) if r.status() == StatusCode::PERMANENT_REDIRECT => {
661 RangeResponseHeader::from_bytes(hh.as_bytes())
662 }
663 None | Some(_) => {
664 if let Retry::After(d) = self.delegate.http_failure(&r, None, None) {
665 sleep(d);
666 continue;
667 }
668 return Err(Ok(r));
669 }
670 };
671 return Ok(h.0.last);
672 }
673 Err(err) => {
674 if let Retry::After(d) = self.delegate.http_error(&err) {
675 sleep(d);
676 continue;
677 }
678 return Err(Err(err));
679 }
680 }
681 }
682 }
683
684 pub async fn upload(&mut self) -> Option<hyper::Result<hyper::Response<hyper::body::Body>>> {
688 let mut start = match self.start_at {
689 Some(s) => s,
690 None => match self.query_transfer_status().await {
691 Ok(s) => s,
692 Err(result) => return Some(result),
693 },
694 };
695
696 const MIN_CHUNK_SIZE: u64 = 1 << 18;
697 let chunk_size = match self.delegate.chunk_size() {
698 cs if cs > MIN_CHUNK_SIZE => cs,
699 _ => MIN_CHUNK_SIZE,
700 };
701
702 loop {
703 self.reader.seek(SeekFrom::Start(start)).unwrap();
704
705 let request_size = match self.content_length - start {
706 rs if rs > chunk_size => chunk_size,
707 rs => rs,
708 };
709
710 let mut section_reader = self.reader.take(request_size);
711 let mut req_bytes = vec![];
712 section_reader.read_to_end(&mut req_bytes).unwrap();
713 let range_header = ContentRange {
714 range: Some(Chunk {
715 first: start,
716 last: start + request_size - 1,
717 }),
718 total_length: self.content_length,
719 };
720 if self.delegate.cancel_chunk_upload(&range_header) {
721 return None;
722 }
723 let res = self
724 .client
725 .request(
726 hyper::Request::builder()
727 .uri(self.url)
728 .method(hyper::Method::POST)
729 .header("Content-Range", range_header.header_value())
730 .header(CONTENT_TYPE, format!("{}", self.media_type))
731 .header(USER_AGENT, self.user_agent.to_string())
732 .body(hyper::body::Body::from(req_bytes))
733 .unwrap(),
734 )
735 .await;
736 match res {
737 Ok(res) => {
738 start += request_size;
739
740 if res.status() == StatusCode::PERMANENT_REDIRECT {
741 continue;
742 }
743
744 let (res_parts, res_body) = res.into_parts();
745 let res_body_string: String = String::from_utf8(
746 hyper::body::to_bytes(res_body)
747 .await
748 .unwrap()
749 .into_iter()
750 .collect(),
751 )
752 .unwrap();
753 let reconstructed_result =
754 hyper::Response::from_parts(res_parts, res_body_string.clone().into());
755
756 if !reconstructed_result.status().is_success() {
757 if let Retry::After(d) = self.delegate.http_failure(
758 &reconstructed_result,
759 json::from_str(&res_body_string).ok(),
760 json::from_str(&res_body_string).ok(),
761 ) {
762 sleep(d);
763 continue;
764 }
765 }
766 return Some(Ok(reconstructed_result));
767 }
768 Err(err) => {
769 if let Retry::After(d) = self.delegate.http_error(&err) {
770 sleep(d);
771 continue;
772 }
773 return Some(Err(err));
774 }
775 }
776 }
777 }
778}
779
780pub fn remove_json_null_values(value: &mut json::value::Value) {
783 match *value {
784 json::value::Value::Object(ref mut map) => {
785 let mut for_removal = Vec::new();
786
787 for (key, mut value) in map.iter_mut() {
788 if value.is_null() {
789 for_removal.push(key.clone());
790 } else {
791 remove_json_null_values(&mut value);
792 }
793 }
794
795 for key in &for_removal {
796 map.remove(key);
797 }
798 }
799 json::value::Value::Array(ref mut arr) => {
800 let mut i = 0;
801 while i < arr.len() {
802 if arr[i].is_null() {
803 arr.remove(i);
804 } else {
805 remove_json_null_values(&mut arr[i]);
806 i += 1;
807 }
808 }
809 }
810 _ => {}
811 }
812}
813
814pub async fn get_body_as_string(res_body: &mut hyper::Body) -> String {
816 let res_body_buf = hyper::body::to_bytes(res_body).await.unwrap();
817 let res_body_string = String::from_utf8_lossy(&res_body_buf);
818 res_body_string.to_string()
819}