pub mod auth;
pub mod field_mask;
pub mod serde;
pub mod url;
pub use auth::{GetToken, NoToken};
pub use field_mask::FieldMask;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::str::FromStr;
use std::time::Duration;
use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT};
use hyper::Method;
use hyper::StatusCode;
use mime::Mime;
use tokio::time::sleep;
/// Line terminator written between multipart boundaries, headers and part data.
const LINE_ENDING: &str = "\r\n";
/// Boxed HTTP body yielding `Bytes` chunks, with `hyper::Error` as its error type.
pub type Body = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;
/// HTTP response whose payload is a [`Body`].
pub type Response = hyper::Response<Body>;
/// Legacy `hyper_util` client over connector `C` that sends [`Body`] request payloads.
pub type Client<C> = hyper_util::client::legacy::Client<C, Body>;
/// Shorthand for the bounds a connector must satisfy:
/// `Connect + Clone + Send + Sync + 'static`.
pub trait Connector:
hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
{
}
/// Blanket implementation: every type meeting the bounds is a [`Connector`].
impl<T> Connector for T where
T: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
{
}
/// Decision returned by the [`Delegate`] error callbacks.
pub enum Retry {
/// Do not retry; the failure is handed back to the caller.
Abort,
/// Sleep for the given duration, then repeat the request.
After(Duration),
}
/// Names the available upload protocols. Where each variant is selected is not
/// visible in this file.
#[derive(PartialEq, Eq)]
pub enum UploadProtocol {
/// The simple upload protocol.
Simple,
/// The resumable upload protocol (see [`ResumableUploadHelper`]).
Resumable,
}
// Empty marker traits. None of them declares a method; their meaning comes from the
// types that implement them, which live outside this file.
/// Marker trait with no methods.
pub trait Hub {}
/// Marker trait with no methods.
pub trait MethodsBuilder {}
/// Marker trait with no methods.
pub trait CallBuilder {}
/// Marker trait with no methods.
pub trait Resource {}
/// Marker trait with no methods.
pub trait ResponseResult {}
/// Marker trait with no methods.
pub trait RequestValue {}
/// Marker trait with no methods.
pub trait UnusedType {}
/// Marker trait with no methods.
pub trait Part {}
/// Marker trait with no methods.
pub trait NestedType {}
/// Combination of `Seek + Read + Send`, usable as a single trait object.
pub trait ReadSeek: Seek + Read + Send {}
/// Blanket implementation for every type that is `Seek + Read + Send`.
impl<T: Seek + Read + Send> ReadSeek for T {}
/// Types that can describe themselves as a "parts" string.
pub trait ToParts {
/// Returns the parts string for `self`. The expected format is defined by the
/// implementors, which are not part of this file.
fn to_parts(&self) -> String;
}
/// Callback interface that lets calling code observe and steer requests.
///
/// Every method has a default implementation, so implementors override only what they
/// need. Within this file the callbacks are invoked by [`ResumableUploadHelper`]; other
/// call sites are presumably in generated API code outside this file.
pub trait Delegate: Send {
/// Receives the [`MethodInfo`] of a call. The default does nothing.
fn begin(&mut self, _info: MethodInfo) {}
/// Called when the HTTP client itself returns an error.
///
/// Returning [`Retry::After`] makes the upload helper sleep for that duration and
/// repeat the request; the default, [`Retry::Abort`], surfaces the error.
fn http_error(&mut self, _err: &hyper_util::client::legacy::Error) -> Retry {
Retry::Abort
}
/// Supplies an API key. The default supplies none.
fn api_key(&mut self) -> Option<String> {
None
}
/// Receives a token-related error `e` and may substitute a token for it.
///
/// The default passes the error straight back as `Err(e)`.
fn token(
&mut self,
e: Box<dyn std::error::Error + Send + Sync>,
) -> std::result::Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
Err(e)
}
/// Supplies a previously stored upload URL. The default supplies none.
fn upload_url(&mut self) -> Option<String> {
None
}
/// Receives an upload URL to remember (or `None`). The default ignores it.
fn store_upload_url(&mut self, url: Option<&str>) {
let _ = url;
}
/// Receives the JSON text and the error of a failed response decode.
/// The default ignores both.
fn response_json_decode_error(
&mut self,
json_encoded_value: &str,
json_decode_error: &serde_json::Error,
) {
let _ = json_encoded_value;
let _ = json_decode_error;
}
/// Called when a response arrived but is treated as a failure; the second argument
/// is the response body parsed as JSON, when that was possible.
///
/// Returning [`Retry::After`] makes the upload helper sleep and try again; the
/// default is [`Retry::Abort`].
fn http_failure(&mut self, _: &Response, _err: Option<&serde_json::Value>) -> Retry {
Retry::Abort
}
/// Hook with no arguments; the default does nothing. Presumably invoked right
/// before a request is sent (call sites are outside this file).
fn pre_request(&mut self) {}
/// Preferred chunk size in bytes for resumable uploads; default `1 << 23` (8 MiB).
///
/// The upload helper raises smaller values to its minimum of `1 << 18` bytes.
fn chunk_size(&mut self) -> u64 {
1 << 23
}
/// Asked before each chunk is sent; returning `true` makes the upload helper stop
/// and return `None`. The default never cancels.
fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
let _ = chunk;
false
}
/// Receives a success flag. The default ignores it. Presumably invoked once a call
/// has completed (call sites are outside this file).
fn finished(&mut self, is_success: bool) {
let _ = is_success;
}
}
/// [`Delegate`] that overrides nothing and therefore uses every default behavior.
#[derive(Default)]
pub struct DefaultDelegate;
impl Delegate for DefaultDelegate {}
/// Error type of this module (see the [`Result`] alias).
#[derive(Debug)]
pub enum Error {
/// The HTTP client failed to carry out the request.
HttpError(hyper_util::client::legacy::Error),
/// The media size (first value) exceeds the maximum allowed upload size (second value).
UploadSizeLimitExceeded(u64, u64),
/// A bad request, described by the attached JSON value.
BadRequest(serde_json::Value),
/// The application's API key was not found in the configuration.
MissingAPIKey,
/// Token retrieval failed with the wrapped error.
MissingToken(Box<dyn std::error::Error + Send + Sync>),
/// The operation was cancelled by the delegate.
Cancelled,
/// A custom parameter name clashes with one the call builder already provides.
FieldClash(&'static str),
/// JSON decoding failed; holds the JSON text and the decode error.
JsonDecodeError(String, serde_json::Error),
/// The HTTP status indicated failure; holds the response.
Failure(Response),
/// An I/O error.
Io(std::io::Error),
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Error::Io(err) => err.fmt(f),
Error::HttpError(err) => err.fmt(f),
Error::UploadSizeLimitExceeded(resource_size, max_size) => writeln!(
f,
"The media size {resource_size} exceeds the maximum allowed upload size of {max_size}"
),
Error::MissingAPIKey => {
writeln!(
f,
"The application's API key was not found in the configuration"
)?;
writeln!(
f,
"It is used as there are no Scopes defined for this method."
)
}
Error::BadRequest(message) => writeln!(f, "Bad Request: {message}"),
Error::MissingToken(e) => writeln!(f, "Token retrieval failed: {e}"),
Error::Cancelled => writeln!(f, "Operation cancelled by delegate"),
Error::FieldClash(field) => writeln!(
f,
"The custom parameter '{field}' is already provided natively by the CallBuilder."
),
Error::JsonDecodeError(json_str, err) => writeln!(f, "{err}: {json_str}"),
Error::Failure(response) => {
writeln!(f, "Http status indicates failure: {response:?}")
}
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match *self {
Error::HttpError(ref err) => err.source(),
Error::JsonDecodeError(_, ref err) => err.source(),
_ => None,
}
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Error::Io(err)
}
}
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Description of a method, as passed to [`Delegate::begin`].
pub struct MethodInfo {
/// Static identifier of the method.
pub id: &'static str,
/// HTTP method associated with it.
pub http_method: Method,
}
/// Fixed boundary token used by [`MultiPartReader`] to separate parts.
const BOUNDARY: &str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d";
/// Reader that presents several parts as one `multipart/related` byte stream.
#[derive(Default)]
pub struct MultiPartReader<'a> {
/// Parts not yet started, each with its headers and its data source.
raw_parts: Vec<(HeaderMap, &'a mut (dyn Read + Send))>,
/// Part currently being emitted: its serialized boundary/header preamble and its
/// data source.
current_part: Option<(Cursor<Vec<u8>>, &'a mut (dyn Read + Send))>,
/// Closing boundary that still has to be emitted after the final part, if any.
last_part_boundary: Option<Cursor<Vec<u8>>>,
}
impl<'a> MultiPartReader<'a> {
    /// Returns the `multipart/related` mime type carrying this module's boundary.
    pub fn mime_type() -> Mime {
        let raw = format!("multipart/related;boundary={BOUNDARY}");
        raw.parse::<Mime>().expect("valid mimetype")
    }

    /// Reserves room for exactly `cap` additional queued parts.
    pub fn reserve_exact(&mut self, cap: usize) {
        self.raw_parts.reserve_exact(cap);
    }

    /// Queues a part whose data comes from `reader`.
    ///
    /// `size` is emitted as the part's `Content-Length` and `mime_type` as its
    /// `Content-Type`. Returns `self` so calls can be chained.
    ///
    /// # Panics
    ///
    /// Panics if `mime_type` cannot be represented as a header value.
    pub fn add_part(
        &mut self,
        reader: &'a mut (dyn Read + Send),
        size: u64,
        mime_type: Mime,
    ) -> &mut MultiPartReader<'a> {
        let content_type = hyper::header::HeaderValue::from_str(mime_type.as_ref()).unwrap();
        let content_length = hyper::header::HeaderValue::from(size);

        let mut part_headers = HeaderMap::new();
        part_headers.insert(CONTENT_TYPE, content_type);
        part_headers.insert(CONTENT_LENGTH, content_length);

        self.raw_parts.push((part_headers, reader));
        self
    }

    /// True once there is no queued part, no active part and no pending closing boundary.
    fn is_depleted(&self) -> bool {
        let nothing_queued = self.raw_parts.is_empty();
        let nothing_active = self.current_part.is_none();
        let no_closing_boundary = self.last_part_boundary.is_none();
        nothing_queued && nothing_active && no_closing_boundary
    }

    /// True while a part is active and no further parts are queued behind it.
    fn is_last_part(&self) -> bool {
        self.current_part.is_some() && self.raw_parts.is_empty()
    }
}
/// Streams the queued parts as one multipart body: for each part a boundary line plus
/// its headers, then the part's own bytes, and after the final part the closing boundary.
impl Read for MultiPartReader<'_> {
/// Fills `buf` with as much of the multipart stream as is available and returns the
/// number of bytes written; `Ok(0)` once nothing is left to emit.
///
/// An error from a part's reader is returned as-is and discards all remaining state.
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// Dispatch on (queued part count, no active part?, no closing boundary pending?).
match (
self.raw_parts.len(),
self.current_part.is_none(),
self.last_part_boundary.is_none(),
) {
// The closing boundary is pending: emit from it and nothing else.
(_, _, false) => {
let br = self
.last_part_boundary
.as_mut()
.unwrap()
.read(buf)
.unwrap_or(0);
// A short read means the boundary cursor is exhausted, so drop it.
if br < buf.len() {
self.last_part_boundary = None;
}
return Ok(br);
}
// Nothing queued, nothing active, nothing pending: end of stream.
(0, true, true) => return Ok(0),
// No active part but at least one queued: promote the first queued part.
(n, true, _) if n > 0 => {
use std::fmt::Write as _;
let (headers, reader) = self.raw_parts.remove(0);
// Serialize the headers as `name: value` lines joined by LINE_ENDING.
let mut encoded_headers = String::new();
for (k, v) in &headers {
if !encoded_headers.is_empty() {
encoded_headers.push_str(LINE_ENDING);
}
write!(encoded_headers, "{}: {}", k, v.to_str().unwrap())
.map_err(std::io::Error::other)?;
}
// Preamble: boundary line, headers, then the blank line that precedes the data.
let mut c = Cursor::new(Vec::<u8>::new());
(write!(
&mut c,
"{LINE_ENDING}--{BOUNDARY}{LINE_ENDING}{encoded_headers}{LINE_ENDING}{LINE_ENDING}"
))?;
// Rewind so the preamble is read back from its first byte.
c.rewind()?;
self.current_part = Some((c, reader));
}
_ => {}
}
// Every path reaching this point has an active part. Emit what is left of its
// preamble first, then fill the remainder of `buf` from the part's reader.
let (hb, rr) = {
let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap();
let b = c.read(buf).unwrap_or(0);
(b, reader.read(&mut buf[b..]))
};
match rr {
Ok(bytes_read) => {
// The reader was offered space and produced nothing: this part is finished.
if hb < buf.len() && bytes_read == 0 {
// After the final part, schedule the closing `--BOUNDARY--` line.
if self.is_last_part() {
self.last_part_boundary = Some(Cursor::new(
format!("{LINE_ENDING}--{BOUNDARY}--{LINE_ENDING}").into_bytes(),
))
}
self.current_part = None;
}
let mut total_bytes_read = hb + bytes_read;
// Keep filling the rest of `buf` (recursively) until it is full or the
// stream has nothing more to give.
while total_bytes_read < buf.len() && !self.is_depleted() {
match self.read(&mut buf[total_bytes_read..]) {
Ok(br) => total_bytes_read += br,
Err(err) => return Err(err),
}
}
Ok(total_bytes_read)
}
Err(err) => {
// Drop all state; with everything cleared, later reads return Ok(0).
self.current_part = None;
self.last_part_boundary = None;
self.raw_parts.clear();
Err(err)
}
}
}
}
/// Newtype around [`Mime`] that dereferences to, and displays as, the wrapped value.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct XUploadContentType(pub Mime);

/// Shared access to the wrapped [`Mime`].
impl std::ops::Deref for XUploadContentType {
    type Target = Mime;
    fn deref(&self) -> &Mime {
        let Self(inner) = self;
        inner
    }
}

/// Mutable access to the wrapped [`Mime`].
impl std::ops::DerefMut for XUploadContentType {
    fn deref_mut(&mut self) -> &mut Mime {
        let Self(inner) = self;
        inner
    }
}

/// Displays exactly as the wrapped [`Mime`] does.
impl std::fmt::Display for XUploadContentType {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// A byte range given by its `first` and `last` positions, written as `first-last`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Chunk {
    /// Position of the first byte of the range.
    pub first: u64,
    /// Position of the last byte of the range.
    pub last: u64,
}

/// Formats the range as `first-last`.
impl std::fmt::Display for Chunk {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Hand the formatter's result back to the caller: a failing sink must surface
        // as an error instead of being silently reported as success.
        write!(fmt, "{}-{}", self.first, self.last)
    }
}

/// Parses the `first-last` form produced by `Display`.
impl FromStr for Chunk {
    type Err = &'static str;

    /// # Errors
    ///
    /// Returns a static message when `s` does not consist of exactly two
    /// `-`-separated parts, or when either part is not a valid `u64`.
    fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
        // Pull at most three pieces from the iterator; exactly two must exist.
        // This avoids collecting into a temporary Vec just to count the parts.
        let mut pieces = s.split('-');
        let (Some(first), Some(last), None) = (pieces.next(), pieces.next(), pieces.next()) else {
            return Err("Expected two parts: %i-%i");
        };
        let first: u64 = first
            .parse()
            .map_err(|_| "Couldn't parse 'first' as digit")?;
        let last: u64 = last
            .parse()
            .map_err(|_| "Couldn't parse 'last' as digit")?;
        Ok(Chunk { first, last })
    }
}
/// Value of a `Content-Range` header: an optional byte range plus the total length.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct ContentRange {
    /// The range being described, or `None` to render `*` in its place.
    pub range: Option<Chunk>,
    /// Total length, rendered after the slash.
    pub total_length: u64,
}

impl ContentRange {
    /// Renders the header value as `bytes <first>-<last>/<total>`, or
    /// `bytes */<total>` when no range is set.
    pub fn header_value(&self) -> String {
        let span = self
            .range
            .as_ref()
            .map_or_else(|| "*".to_string(), |chunk| chunk.to_string());
        format!("bytes {}/{}", span, self.total_length)
    }
}
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct RangeResponseHeader(pub Chunk);
impl RangeResponseHeader {
fn from_bytes(raw: &[u8]) -> Self {
if !raw.is_empty() {
if let Ok(s) = std::str::from_utf8(raw) {
const PREFIX: &str = "bytes ";
if let Some(stripped) = s.strip_prefix(PREFIX) {
if let Ok(c) = <Chunk as FromStr>::from_str(stripped) {
return RangeResponseHeader(c);
}
}
}
}
panic!("Unable to parse Range header {raw:?}")
}
}
/// Everything needed to send a payload to an upload URL in chunks.
pub struct ResumableUploadHelper<'a, A: 'a, C>
where
C: Connector,
{
/// HTTP client used for every request.
pub client: &'a Client<C>,
/// Delegate consulted for chunk size, cancellation and retry decisions.
pub delegate: &'a mut dyn Delegate,
/// Byte offset to start from; when `None` the server is asked for the transfer status.
pub start_at: Option<u64>,
/// Authenticator handle; not used by the methods visible in this file.
pub auth: &'a A,
/// Value sent as the `User-Agent` header.
pub user_agent: &'a str,
/// Value sent as the `Authorization` header of the status query.
pub auth_header: String,
/// URL every request is sent to.
pub url: &'a str,
/// Source of the bytes to upload; it is seeked to each chunk's start offset.
pub reader: &'a mut dyn ReadSeek,
/// Value sent as the `Content-Type` of each chunk.
pub media_type: Mime,
/// Total number of bytes to upload.
pub content_length: u64,
}
impl<A, C> ResumableUploadHelper<'_, A, C>
where
    C: Connector,
{
    /// Asks the server how much of the upload it already holds.
    ///
    /// Sends a body-less request with `Content-Range: bytes */<content_length>`. A 308
    /// response carrying a `Range` header yields `Ok` with that range's `last` value.
    /// Any other response goes to [`Delegate::http_failure`] and a transport error to
    /// [`Delegate::http_error`]; both may request a retry after a delay. Without a
    /// retry the response or error is returned as the `Err` value.
    ///
    /// # Panics
    ///
    /// Panics if the `Range` header of a 308 response cannot be parsed.
    // NOTE(review): the `last` position of the reported range is returned and used
    // as-is for the next start offset; confirm whether `last + 1` was intended.
    async fn query_transfer_status(
        &mut self,
    ) -> std::result::Result<u64, std::result::Result<Response, hyper_util::client::legacy::Error>>
    {
        loop {
            match self
                .client
                .request(
                    hyper::Request::builder()
                        .method(hyper::Method::POST)
                        .uri(self.url)
                        .header(USER_AGENT, self.user_agent.to_string())
                        .header(
                            "Content-Range",
                            ContentRange {
                                range: None,
                                total_length: self.content_length,
                            }
                            .header_value(),
                        )
                        .header(AUTHORIZATION, self.auth_header.clone())
                        .body(to_body::<String>(None))
                        .unwrap(),
                )
                .await
            {
                Ok(r) => {
                    let headers = r.headers().clone();
                    let h: RangeResponseHeader = match headers.get("Range") {
                        Some(hh) if r.status() == StatusCode::PERMANENT_REDIRECT => {
                            RangeResponseHeader::from_bytes(hh.as_bytes())
                        }
                        None | Some(_) => {
                            // Buffer the body so the delegate and the caller both get
                            // a complete response.
                            let (parts, body) = r.into_parts();
                            let body = to_body(to_bytes(body).await);
                            let response = Response::from_parts(parts, body);
                            if let Retry::After(d) = self.delegate.http_failure(&response, None) {
                                sleep(d).await;
                                continue;
                            }
                            return Err(Ok(response));
                        }
                    };
                    return Ok(h.0.last);
                }
                Err(err) => {
                    if let Retry::After(d) = self.delegate.http_error(&err) {
                        sleep(d).await;
                        continue;
                    }
                    return Err(Err(err));
                }
            }
        }
    }

    /// Uploads the payload chunk by chunk.
    ///
    /// Starts at `start_at`, or at the offset reported by the server when `start_at`
    /// is `None`. Each chunk is at most `Delegate::chunk_size()` bytes, which is raised
    /// to a minimum of `1 << 18`. A 308 response means the chunk was taken and more
    /// data is expected, so the next chunk is sent.
    ///
    /// Returns `None` if the delegate cancels a chunk, `Some(Ok(response))` for the
    /// first non-308 response that is not retried, and `Some(Err(_))` for a transport
    /// error that is not retried.
    ///
    /// When the delegate requests a retry, the *same* chunk is sent again: the offset
    /// only advances once the server has acknowledged a chunk with 308.
    ///
    /// # Panics
    ///
    /// Panics if seeking or reading `reader` fails.
    pub async fn upload(
        &mut self,
    ) -> Option<std::result::Result<Response, hyper_util::client::legacy::Error>> {
        let mut start = match self.start_at {
            Some(s) => s,
            None => match self.query_transfer_status().await {
                Ok(s) => s,
                Err(result) => return Some(result),
            },
        };

        const MIN_CHUNK_SIZE: u64 = 1 << 18;
        let chunk_size = self.delegate.chunk_size().max(MIN_CHUNK_SIZE);

        loop {
            // Re-seek on every iteration so that a retried chunk is read again from
            // the same offset.
            self.reader.seek(SeekFrom::Start(start)).unwrap();

            let request_size = (self.content_length - start).min(chunk_size);
            let mut section_reader = self.reader.take(request_size);
            let mut bytes = vec![];
            section_reader.read_to_end(&mut bytes).unwrap();

            let range_header = ContentRange {
                range: Some(Chunk {
                    first: start,
                    last: start + request_size - 1,
                }),
                total_length: self.content_length,
            };
            if self.delegate.cancel_chunk_upload(&range_header) {
                return None;
            }

            match self
                .client
                .request(
                    hyper::Request::builder()
                        .uri(self.url)
                        .method(hyper::Method::POST)
                        .header("Content-Range", range_header.header_value())
                        .header(CONTENT_TYPE, format!("{}", self.media_type))
                        .header(USER_AGENT, self.user_agent.to_string())
                        .body(to_body(bytes.into()))
                        .unwrap(),
                )
                .await
            {
                Ok(response) => {
                    if response.status() == StatusCode::PERMANENT_REDIRECT {
                        // Chunk acknowledged: only now move on to the next one.
                        start += request_size;
                        continue;
                    }
                    let (parts, body) = response.into_parts();
                    let success = parts.status.is_success();
                    let bytes = to_bytes(body).await.unwrap_or_default();
                    // On failure, try to decode the body as JSON for the delegate.
                    let error = if !success {
                        serde_json::from_str(&to_string(&bytes)).ok()
                    } else {
                        None
                    };
                    let response = to_response(parts, bytes.into());
                    if !success {
                        if let Retry::After(d) =
                            self.delegate.http_failure(&response, error.as_ref())
                        {
                            // `start` is unchanged, so the failed chunk is re-sent.
                            sleep(d).await;
                            continue;
                        }
                    }
                    return Some(Ok(response));
                }
                Err(err) => {
                    if let Retry::After(d) = self.delegate.http_error(&err) {
                        sleep(d).await;
                        continue;
                    }
                    return Some(Err(err));
                }
            }
        }
    }
}
/// Recursively strips `null` entries from JSON objects and `null` elements from JSON
/// arrays, in place. Scalars are left untouched.
pub fn remove_json_null_values(value: &mut serde_json::value::Value) {
    use serde_json::value::Value;

    match value {
        Value::Object(fields) => {
            // Drop null members first, then descend into whatever remains.
            fields.retain(|_, member| !member.is_null());
            for member in fields.values_mut() {
                remove_json_null_values(member);
            }
        }
        Value::Array(items) => {
            items.retain(|item| !item.is_null());
            for item in items.iter_mut() {
                remove_json_null_values(item);
            }
        }
        _ => {}
    }
}
#[doc(hidden)]
pub fn to_body<T>(bytes: Option<T>) -> Body
where
T: Into<hyper::body::Bytes>,
{
use http_body_util::BodyExt;
fn falliable(_: std::convert::Infallible) -> hyper::Error {
unreachable!()
}
let bytes = bytes.map(Into::into).unwrap_or_default();
Body::new(http_body_util::Full::from(bytes).map_err(falliable))
}
/// Collects an entire body into contiguous bytes; `None` if collecting fails.
#[doc(hidden)]
pub async fn to_bytes<T>(body: T) -> Option<hyper::body::Bytes>
where
    T: hyper::body::Body,
{
    use http_body_util::BodyExt;

    match body.collect().await {
        Ok(collected) => Some(collected.to_bytes()),
        Err(_) => None,
    }
}
/// Interprets `bytes` as UTF-8 text, replacing invalid sequences with U+FFFD.
#[doc(hidden)]
pub fn to_string(bytes: &hyper::body::Bytes) -> std::borrow::Cow<'_, str> {
    let raw: &[u8] = &bytes[..];
    String::from_utf8_lossy(raw)
}
/// Rebuilds a [`Response`] from its head and an optional, already-buffered payload.
#[doc(hidden)]
pub fn to_response<T>(parts: http::response::Parts, body: Option<T>) -> Response
where
    T: Into<hyper::body::Bytes>,
{
    let payload = to_body(body);
    Response::from_parts(parts, payload)
}
// Unit tests for the helpers in this module.
#[cfg(test)]
mod tests {
use std::default::Default;
use std::str::FromStr;
use ::serde::{Deserialize, Serialize};
use super::*;
// Checks serde derive behavior relied upon here: optional fields may be absent or
// null when deserializing, and `rename` maps a camelCase key to a snake_case field.
#[test]
fn serde() {
#[derive(Default, Serialize, Deserialize)]
struct Foo {
opt: Option<String>,
req: u32,
opt_vec: Option<Vec<String>>,
vec: Vec<String>,
}
let f: Foo = Default::default();
serde_json::to_string(&f).unwrap();
// `opt_vec` is missing from the input and `opt` is null; both must be accepted.
let j = "{\"opt\":null,\"req\":0,\"vec\":[]}";
let _f: Foo = serde_json::from_str(j).unwrap();
#[derive(Default, Serialize, Deserialize)]
struct Bar {
#[serde(rename = "snooSnoo")]
snoo_snoo: String,
}
serde_json::to_string(&<Bar as Default>::default()).unwrap();
let j = "{\"snooSnoo\":\"foo\"}";
let b: Bar = serde_json::from_str(j).unwrap();
assert_eq!(b.snoo_snoo, "foo");
}
// `Chunk` parses the `first-last` form.
#[test]
fn byte_range_from_str() {
assert_eq!(
<Chunk as FromStr>::from_str("2-42"),
Ok(Chunk { first: 2, last: 42 })
)
}
// Compile-time check: a `&mut dyn Delegate` can be passed where `Send` is required.
#[test]
fn dyn_delegate_is_send() {
fn with_send(_x: impl Send) {}
let mut dd = DefaultDelegate;
let dlg: &mut dyn Delegate = &mut dd;
with_send(dlg);
}
// The multipart mime type is `multipart/related` and carries the module's boundary.
#[test]
fn test_mime() {
let mime = MultiPartReader::mime_type();
assert_eq!(mime::MULTIPART, mime.type_());
assert_eq!("related", mime.subtype());
assert_eq!(
Some(BOUNDARY),
mime.get_param("boundary").map(|x| x.as_str())
);
}
}