1pub mod auth;
2pub mod field_mask;
3pub mod serde;
4pub mod url;
5
6pub use auth::{GetToken, NoToken};
7pub use field_mask::FieldMask;
8
9use std::io::{Cursor, Read, Seek, SeekFrom, Write};
10use std::str::FromStr;
11use std::time::Duration;
12
13use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT};
14use hyper::Method;
15use hyper::StatusCode;
16use mime::Mime;
17use tokio::time::sleep;
18
19const LINE_ENDING: &str = "\r\n";
20
21pub type Body = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;
23
24pub type Response = hyper::Response<Body>;
26
27pub type Client<C> = hyper_util::client::legacy::Client<C, Body>;
29
30pub trait Connector:
32 hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
33{
34}
35
36impl<T> Connector for T where
37 T: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
38{
39}
40
41pub enum Retry {
42 Abort,
44 After(Duration),
46}
47
48#[derive(PartialEq, Eq)]
49pub enum UploadProtocol {
50 Simple,
51 Resumable,
52}
53
54pub trait Hub {}
58
59pub trait MethodsBuilder {}
61
62pub trait CallBuilder {}
64
65pub trait Resource {}
68
69pub trait ResponseResult {}
71
72pub trait RequestValue {}
74
75pub trait UnusedType {}
78
79pub trait Part {}
82
83pub trait NestedType {}
86
87pub trait ReadSeek: Seek + Read + Send {}
89impl<T: Seek + Read + Send> ReadSeek for T {}
90
91pub trait ToParts {
93 fn to_parts(&self) -> String;
94}
95
96pub trait Delegate: Send {
102 fn begin(&mut self, _info: MethodInfo) {}
109
110 fn http_error(&mut self, _err: &hyper_util::client::legacy::Error) -> Retry {
118 Retry::Abort
119 }
120
121 fn api_key(&mut self) -> Option<String> {
125 None
126 }
127
128 fn token(
134 &mut self,
135 e: Box<dyn std::error::Error + Send + Sync>,
136 ) -> std::result::Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
137 Err(e)
138 }
139
140 fn upload_url(&mut self) -> Option<String> {
148 None
149 }
150
151 fn store_upload_url(&mut self, url: Option<&str>) {
158 let _ = url;
159 }
160
161 fn response_json_decode_error(
170 &mut self,
171 json_encoded_value: &str,
172 json_decode_error: &serde_json::Error,
173 ) {
174 let _ = json_encoded_value;
175 let _ = json_decode_error;
176 }
177
178 fn http_failure(&mut self, _: &Response, _err: Option<&serde_json::Value>) -> Retry {
187 Retry::Abort
188 }
189
190 fn pre_request(&mut self) {}
194
195 fn chunk_size(&mut self) -> u64 {
199 1 << 23
200 }
201
202 fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
207 let _ = chunk;
208 false
209 }
210
211 fn finished(&mut self, is_success: bool) {
220 let _ = is_success;
221 }
222}
223
224#[derive(Default)]
227pub struct DefaultDelegate;
228
229impl Delegate for DefaultDelegate {}
230
231#[derive(Debug)]
232pub enum Error {
233 HttpError(hyper_util::client::legacy::Error),
235
236 UploadSizeLimitExceeded(u64, u64),
239
240 BadRequest(serde_json::Value),
243
244 MissingAPIKey,
247
248 MissingToken(Box<dyn std::error::Error + Send + Sync>),
250
251 Cancelled,
253
254 FieldClash(&'static str),
256
257 JsonDecodeError(String, serde_json::Error),
260
261 Failure(Response),
263
264 Io(std::io::Error),
266}
267
268impl std::fmt::Display for Error {
269 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
270 match self {
271 Error::Io(err) => err.fmt(f),
272 Error::HttpError(err) => err.fmt(f),
273 Error::UploadSizeLimitExceeded(resource_size, max_size) => writeln!(
274 f,
275 "The media size {resource_size} exceeds the maximum allowed upload size of {max_size}"
276 ),
277 Error::MissingAPIKey => {
278 writeln!(
279 f,
280 "The application's API key was not found in the configuration"
281 )?;
282 writeln!(
283 f,
284 "It is used as there are no Scopes defined for this method."
285 )
286 }
287 Error::BadRequest(message) => writeln!(f, "Bad Request: {message}"),
288 Error::MissingToken(e) => writeln!(f, "Token retrieval failed: {e}"),
289 Error::Cancelled => writeln!(f, "Operation cancelled by delegate"),
290 Error::FieldClash(field) => writeln!(
291 f,
292 "The custom parameter '{field}' is already provided natively by the CallBuilder."
293 ),
294 Error::JsonDecodeError(json_str, err) => writeln!(f, "{err}: {json_str}"),
295 Error::Failure(response) => {
296 writeln!(f, "Http status indicates failure: {response:?}")
297 }
298 }
299 }
300}
301
302impl std::error::Error for Error {
303 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
304 match *self {
305 Error::HttpError(ref err) => err.source(),
306 Error::JsonDecodeError(_, ref err) => err.source(),
307 _ => None,
308 }
309 }
310}
311
312impl From<std::io::Error> for Error {
313 fn from(err: std::io::Error) -> Self {
314 Error::Io(err)
315 }
316}
317
318pub type Result<T> = std::result::Result<T, Error>;
320
321pub struct MethodInfo {
323 pub id: &'static str,
324 pub http_method: Method,
325}
326
327const BOUNDARY: &str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d";
328
329#[derive(Default)]
334pub struct MultiPartReader<'a> {
335 raw_parts: Vec<(HeaderMap, &'a mut (dyn Read + Send))>,
336 current_part: Option<(Cursor<Vec<u8>>, &'a mut (dyn Read + Send))>,
337 last_part_boundary: Option<Cursor<Vec<u8>>>,
338}
339
340impl<'a> MultiPartReader<'a> {
341 pub fn mime_type() -> Mime {
345 Mime::from_str(&format!("multipart/related;boundary={BOUNDARY}")).expect("valid mimetype")
346 }
347
348 pub fn reserve_exact(&mut self, cap: usize) {
350 self.raw_parts.reserve_exact(cap);
351 }
352
353 pub fn add_part(
365 &mut self,
366 reader: &'a mut (dyn Read + Send),
367 size: u64,
368 mime_type: Mime,
369 ) -> &mut MultiPartReader<'a> {
370 let mut headers = HeaderMap::new();
371 headers.insert(
372 CONTENT_TYPE,
373 hyper::header::HeaderValue::from_str(mime_type.as_ref()).unwrap(),
374 );
375 headers.insert(CONTENT_LENGTH, size.into());
376 self.raw_parts.push((headers, reader));
377 self
378 }
379
380 fn is_depleted(&self) -> bool {
382 self.raw_parts.is_empty()
383 && self.current_part.is_none()
384 && self.last_part_boundary.is_none()
385 }
386
387 fn is_last_part(&self) -> bool {
389 self.raw_parts.is_empty() && self.current_part.is_some()
390 }
391}
392
393impl Read for MultiPartReader<'_> {
394 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
395 match (
396 self.raw_parts.len(),
397 self.current_part.is_none(),
398 self.last_part_boundary.is_none(),
399 ) {
400 (_, _, false) => {
401 let br = self
402 .last_part_boundary
403 .as_mut()
404 .unwrap()
405 .read(buf)
406 .unwrap_or(0);
407 if br < buf.len() {
408 self.last_part_boundary = None;
409 }
410 return Ok(br);
411 }
412 (0, true, true) => return Ok(0),
413 (n, true, _) if n > 0 => {
414 use std::fmt::Write as _;
415 let (headers, reader) = self.raw_parts.remove(0);
416
417 let mut encoded_headers = String::new();
418 for (k, v) in &headers {
419 if !encoded_headers.is_empty() {
420 encoded_headers.push_str(LINE_ENDING);
421 }
422
423 write!(encoded_headers, "{}: {}", k, v.to_str().unwrap())
424 .map_err(std::io::Error::other)?;
425 }
426
427 let mut c = Cursor::new(Vec::<u8>::new());
428 (write!(
431 &mut c,
432 "{LINE_ENDING}--{BOUNDARY}{LINE_ENDING}{encoded_headers}{LINE_ENDING}{LINE_ENDING}"
433 ))?;
434 c.rewind()?;
435 self.current_part = Some((c, reader));
436 }
437 _ => {}
438 }
439
440 let (hb, rr) = {
442 let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap();
443 let b = c.read(buf).unwrap_or(0);
444 (b, reader.read(&mut buf[b..]))
445 };
446
447 match rr {
448 Ok(bytes_read) => {
449 if hb < buf.len() && bytes_read == 0 {
450 if self.is_last_part() {
451 self.last_part_boundary = Some(Cursor::new(
454 format!("{LINE_ENDING}--{BOUNDARY}--{LINE_ENDING}").into_bytes(),
455 ))
456 }
457 self.current_part = None;
459 }
460 let mut total_bytes_read = hb + bytes_read;
461 while total_bytes_read < buf.len() && !self.is_depleted() {
462 match self.read(&mut buf[total_bytes_read..]) {
463 Ok(br) => total_bytes_read += br,
464 Err(err) => return Err(err),
465 }
466 }
467 Ok(total_bytes_read)
468 }
469 Err(err) => {
470 self.current_part = None;
472 self.last_part_boundary = None;
473 self.raw_parts.clear();
474 Err(err)
475 }
476 }
477 }
478}
479
480#[derive(PartialEq, Eq, Debug, Clone)]
485pub struct XUploadContentType(pub Mime);
486
487impl std::ops::Deref for XUploadContentType {
488 type Target = Mime;
489 fn deref(&self) -> &Mime {
490 &self.0
491 }
492}
493impl std::ops::DerefMut for XUploadContentType {
494 fn deref_mut(&mut self) -> &mut Mime {
495 &mut self.0
496 }
497}
498impl std::fmt::Display for XUploadContentType {
499 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
500 std::fmt::Display::fmt(&**self, f)
501 }
502}
503
504#[derive(Clone, PartialEq, Eq, Debug)]
505pub struct Chunk {
506 pub first: u64,
507 pub last: u64,
508}
509
510impl std::fmt::Display for Chunk {
511 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
512 (write!(fmt, "{}-{}", self.first, self.last)).ok();
513 Ok(())
514 }
515}
516
517impl FromStr for Chunk {
518 type Err = &'static str;
519
520 fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
522 let parts: Vec<&str> = s.split('-').collect();
523 if parts.len() != 2 {
524 return Err("Expected two parts: %i-%i");
525 }
526 Ok(Chunk {
527 first: match FromStr::from_str(parts[0]) {
528 Ok(d) => d,
529 _ => return Err("Couldn't parse 'first' as digit"),
530 },
531 last: match FromStr::from_str(parts[1]) {
532 Ok(d) => d,
533 _ => return Err("Couldn't parse 'last' as digit"),
534 },
535 })
536 }
537}
538
539#[derive(Clone, PartialEq, Eq, Debug)]
541pub struct ContentRange {
542 pub range: Option<Chunk>,
543 pub total_length: u64,
544}
545
546impl ContentRange {
547 pub fn header_value(&self) -> String {
548 format!(
549 "bytes {}/{}",
550 match self.range {
551 Some(ref c) => format!("{c}"),
552 None => "*".to_string(),
553 },
554 self.total_length
555 )
556 }
557}
558
559#[derive(Clone, PartialEq, Eq, Debug)]
560pub struct RangeResponseHeader(pub Chunk);
561
562impl RangeResponseHeader {
563 fn from_bytes(raw: &[u8]) -> Self {
564 if !raw.is_empty() {
565 if let Ok(s) = std::str::from_utf8(raw) {
566 const PREFIX: &str = "bytes ";
567 if let Some(stripped) = s.strip_prefix(PREFIX) {
568 if let Ok(c) = <Chunk as FromStr>::from_str(stripped) {
569 return RangeResponseHeader(c);
570 }
571 }
572 }
573 }
574
575 panic!("Unable to parse Range header {raw:?}")
576 }
577}
578
579pub struct ResumableUploadHelper<'a, A: 'a, C>
581where
582 C: Connector,
583{
584 pub client: &'a Client<C>,
585 pub delegate: &'a mut dyn Delegate,
586 pub start_at: Option<u64>,
587 pub auth: &'a A,
588 pub user_agent: &'a str,
589 pub auth_header: String,
590 pub url: &'a str,
591 pub reader: &'a mut dyn ReadSeek,
592 pub media_type: Mime,
593 pub content_length: u64,
594}
595
596impl<A, C> ResumableUploadHelper<'_, A, C>
597where
598 C: Connector,
599{
600 async fn query_transfer_status(
601 &mut self,
602 ) -> std::result::Result<u64, std::result::Result<Response, hyper_util::client::legacy::Error>>
603 {
604 loop {
605 match self
606 .client
607 .request(
608 hyper::Request::builder()
609 .method(hyper::Method::POST)
610 .uri(self.url)
611 .header(USER_AGENT, self.user_agent.to_string())
612 .header(
613 "Content-Range",
614 ContentRange {
615 range: None,
616 total_length: self.content_length,
617 }
618 .header_value(),
619 )
620 .header(AUTHORIZATION, self.auth_header.clone())
621 .body(to_body::<String>(None))
622 .unwrap(),
623 )
624 .await
625 {
626 Ok(r) => {
627 let headers = r.headers().clone();
629 let h: RangeResponseHeader = match headers.get("Range") {
630 Some(hh) if r.status() == StatusCode::PERMANENT_REDIRECT => {
631 RangeResponseHeader::from_bytes(hh.as_bytes())
632 }
633 None | Some(_) => {
634 let (parts, body) = r.into_parts();
635 let body = to_body(to_bytes(body).await);
636 let response = Response::from_parts(parts, body);
637 if let Retry::After(d) = self.delegate.http_failure(&response, None) {
638 sleep(d).await;
639 continue;
640 }
641 return Err(Ok(response));
642 }
643 };
644 return Ok(h.0.last);
645 }
646 Err(err) => {
647 if let Retry::After(d) = self.delegate.http_error(&err) {
648 sleep(d).await;
649 continue;
650 }
651 return Err(Err(err));
652 }
653 }
654 }
655 }
656
657 pub async fn upload(
661 &mut self,
662 ) -> Option<std::result::Result<Response, hyper_util::client::legacy::Error>> {
663 let mut start = match self.start_at {
664 Some(s) => s,
665 None => match self.query_transfer_status().await {
666 Ok(s) => s,
667 Err(result) => return Some(result),
668 },
669 };
670
671 const MIN_CHUNK_SIZE: u64 = 1 << 18;
672 let chunk_size = match self.delegate.chunk_size() {
673 cs if cs > MIN_CHUNK_SIZE => cs,
674 _ => MIN_CHUNK_SIZE,
675 };
676
677 loop {
678 self.reader.seek(SeekFrom::Start(start)).unwrap();
679
680 let request_size = match self.content_length - start {
681 rs if rs > chunk_size => chunk_size,
682 rs => rs,
683 };
684
685 let mut section_reader = self.reader.take(request_size);
686 let mut bytes = vec![];
687 section_reader.read_to_end(&mut bytes).unwrap();
688 let range_header = ContentRange {
689 range: Some(Chunk {
690 first: start,
691 last: start + request_size - 1,
692 }),
693 total_length: self.content_length,
694 };
695 if self.delegate.cancel_chunk_upload(&range_header) {
696 return None;
697 }
698 match self
699 .client
700 .request(
701 hyper::Request::builder()
702 .uri(self.url)
703 .method(hyper::Method::POST)
704 .header("Content-Range", range_header.header_value())
705 .header(CONTENT_TYPE, format!("{}", self.media_type))
706 .header(USER_AGENT, self.user_agent.to_string())
707 .body(to_body(bytes.into()))
708 .unwrap(),
709 )
710 .await
711 {
712 Ok(response) => {
713 start += request_size;
714
715 if response.status() == StatusCode::PERMANENT_REDIRECT {
716 continue;
717 }
718
719 let (parts, body) = response.into_parts();
720 let success = parts.status.is_success();
721 let bytes = to_bytes(body).await.unwrap_or_default();
722 let error = if !success {
723 serde_json::from_str(&to_string(&bytes)).ok()
724 } else {
725 None
726 };
727 let response = to_response(parts, bytes.into());
728
729 if !success {
730 if let Retry::After(d) =
731 self.delegate.http_failure(&response, error.as_ref())
732 {
733 sleep(d).await;
734 continue;
735 }
736 }
737 return Some(Ok(response));
738 }
739 Err(err) => {
740 if let Retry::After(d) = self.delegate.http_error(&err) {
741 sleep(d).await;
742 continue;
743 }
744 return Some(Err(err));
745 }
746 }
747 }
748 }
749}
750
751pub fn remove_json_null_values(value: &mut serde_json::value::Value) {
753 match value {
754 serde_json::value::Value::Object(map) => {
755 map.retain(|_, value| !value.is_null());
756 map.values_mut().for_each(remove_json_null_values);
757 }
758 serde_json::value::Value::Array(arr) => {
759 arr.retain(|value| !value.is_null());
760 arr.iter_mut().for_each(remove_json_null_values);
761 }
762 _ => {}
763 }
764}
765
766#[doc(hidden)]
767pub fn to_body<T>(bytes: Option<T>) -> Body
768where
769 T: Into<hyper::body::Bytes>,
770{
771 use http_body_util::BodyExt;
772
773 fn falliable(_: std::convert::Infallible) -> hyper::Error {
774 unreachable!()
775 }
776
777 let bytes = bytes.map(Into::into).unwrap_or_default();
778 Body::new(http_body_util::Full::from(bytes).map_err(falliable))
779}
780
781#[doc(hidden)]
782pub async fn to_bytes<T>(body: T) -> Option<hyper::body::Bytes>
783where
784 T: hyper::body::Body,
785{
786 use http_body_util::BodyExt;
787 body.collect().await.ok().map(|value| value.to_bytes())
788}
789
790#[doc(hidden)]
791pub fn to_string(bytes: &hyper::body::Bytes) -> std::borrow::Cow<'_, str> {
792 String::from_utf8_lossy(bytes)
793}
794
795#[doc(hidden)]
796pub fn to_response<T>(parts: http::response::Parts, body: Option<T>) -> Response
797where
798 T: Into<hyper::body::Bytes>,
799{
800 Response::from_parts(parts, to_body(body))
801}
802
803#[cfg(test)]
804mod tests {
805 use std::default::Default;
806 use std::str::FromStr;
807
808 use ::serde::{Deserialize, Serialize};
809
810 use super::*;
811
812 #[test]
813 fn serde() {
814 #[derive(Default, Serialize, Deserialize)]
815 struct Foo {
816 opt: Option<String>,
817 req: u32,
818 opt_vec: Option<Vec<String>>,
819 vec: Vec<String>,
820 }
821
822 let f: Foo = Default::default();
823 serde_json::to_string(&f).unwrap(); let j = "{\"opt\":null,\"req\":0,\"vec\":[]}";
826 let _f: Foo = serde_json::from_str(j).unwrap();
827
828 #[derive(Default, Serialize, Deserialize)]
833 struct Bar {
834 #[serde(rename = "snooSnoo")]
835 snoo_snoo: String,
836 }
837 serde_json::to_string(&<Bar as Default>::default()).unwrap();
838
839 let j = "{\"snooSnoo\":\"foo\"}";
840 let b: Bar = serde_json::from_str(j).unwrap();
841 assert_eq!(b.snoo_snoo, "foo");
842
843 }
852
853 #[test]
854 fn byte_range_from_str() {
855 assert_eq!(
856 <Chunk as FromStr>::from_str("2-42"),
857 Ok(Chunk { first: 2, last: 42 })
858 )
859 }
860
861 #[test]
862 fn dyn_delegate_is_send() {
863 fn with_send(_x: impl Send) {}
864
865 let mut dd = DefaultDelegate;
866 let dlg: &mut dyn Delegate = &mut dd;
867 with_send(dlg);
868 }
869
870 #[test]
871 fn test_mime() {
872 let mime = MultiPartReader::mime_type();
873
874 assert_eq!(mime::MULTIPART, mime.type_());
875 assert_eq!("related", mime.subtype());
876 assert_eq!(
877 Some(BOUNDARY),
878 mime.get_param("boundary").map(|x| x.as_str())
879 );
880 }
881}