use super::*;
use crate::objects::{Metadata, Object};
/// Newtype around the `http::Uri` that identifies a resumable upload session.
///
/// In this file it is built from the `Location` header of an
/// [`InitResumableInsertResponse`] and used as the request URI by
/// `Object::resumable_append` and `Object::resumable_cancel`.
#[derive(Clone)]
pub struct ResumableSession(pub http::Uri);
impl From<ResumableSession> for http::Uri {
fn from(rs: ResumableSession) -> Self {
rs.0
}
}
/// Parsed response that carries the session URI of a resumable upload.
///
/// NOTE(review): presumably the reply to the request built by
/// `Object::resumable_insert_init` — confirm against callers.
pub struct InitResumableInsertResponse {
/// Session taken from the response's `Location` header.
pub resumable_session: ResumableSession,
}
/// Borrowed-slice bodies: nothing is overridden, so the `ApiResponse` trait's
/// provided methods are used as-is.
impl ApiResponse<&[u8]> for InitResumableInsertResponse {}
/// `bytes::Bytes` bodies: nothing is overridden, so the `ApiResponse` trait's
/// provided methods are used as-is.
impl ApiResponse<bytes::Bytes> for InitResumableInsertResponse {}
impl<B> TryFrom<http::Response<B>> for InitResumableInsertResponse
where
B: AsRef<[u8]>,
{
type Error = Error;
fn try_from(response: http::Response<B>) -> Result<Self, Self::Error> {
let (parts, _body) = response.into_parts();
match parts.headers.get(http::header::LOCATION) {
Some(session_uri) => match session_uri.to_str() {
Ok(session_uri) => Ok(Self {
resumable_session: ResumableSession(session_uri.parse()?),
}),
Err(_err) => Err(Error::OpaqueHeaderValue(session_uri.clone())),
},
None => Err(Error::UnknownHeader(http::header::LOCATION)),
}
}
}
/// Payload of a [`ResumableInsertResponse`].
pub enum ResumableInsertResponseMetadata {
/// Produced for a `308 Permanent Redirect` response: the number parsed from
/// the end of the `Range` header, plus one.
PartialSize(u64),
/// Produced for any other parsed response: the `Metadata` deserialized from
/// the JSON body.
Complete(Box<Metadata>),
}
/// Parsed response to a resumable upload request.
pub struct ResumableInsertResponse {
/// Either the partial size reported so far or the final object metadata.
pub metadata: ResumableInsertResponseMetadata,
}
impl ResumableInsertResponse {
fn try_from_response<B: AsRef<[u8]>>(
response: http::response::Response<B>,
) -> Result<Self, Error> {
let status = response.status();
if status.eq(&http::StatusCode::PERMANENT_REDIRECT)
|| status.eq(&http::StatusCode::OK)
|| status.eq(&http::StatusCode::CREATED)
{
Self::try_from(response)
} else {
if let Some(ct) = response
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|ct| ct.to_str().ok())
{
if ct.starts_with("text/plain") && !response.body().as_ref().is_empty() {
if let Ok(message) = std::str::from_utf8(response.body().as_ref()) {
let api_err = error::ApiError {
code: status.into(),
message: message.to_owned(),
errors: vec![],
};
return Err(Error::Api(api_err));
}
}
}
Err(Error::from(response.status()))
}
}
}
impl ApiResponse<&[u8]> for ResumableInsertResponse {
fn try_from_parts(response: http::response::Response<&[u8]>) -> Result<Self, Error> {
Self::try_from_response(response)
}
}
impl ApiResponse<bytes::Bytes> for ResumableInsertResponse {
fn try_from_parts(response: http::response::Response<bytes::Bytes>) -> Result<Self, Error> {
Self::try_from_response(response)
}
}
impl<B> TryFrom<http::Response<B>> for ResumableInsertResponse
where
B: AsRef<[u8]>,
{
type Error = Error;
fn try_from(response: http::Response<B>) -> Result<Self, Self::Error> {
if response.status().eq(&http::StatusCode::PERMANENT_REDIRECT) {
let (parts, _body) = response.into_parts();
let end_pos = match parts.headers.get(http::header::RANGE) {
Some(range_val) => match range_val.to_str() {
Ok(range) => match range.split('-').last() {
Some(pos) => {
let pos = pos.parse::<u64>();
match pos {
Ok(pos) => Ok(pos),
Err(_err) => Err(Error::OpaqueHeaderValue(range_val.clone())),
}
}
None => Err(Error::UnknownHeader(http::header::RANGE)),
},
Err(_err) => Err(Error::OpaqueHeaderValue(range_val.clone())),
},
None => Err(Error::UnknownHeader(http::header::RANGE)),
}?;
Ok(Self {
metadata: ResumableInsertResponseMetadata::PartialSize(end_pos + 1),
})
} else {
let (_parts, body) = response.into_parts();
let metadata = Box::new(serde_json::from_slice(body.as_ref())?);
Ok(Self {
metadata: ResumableInsertResponseMetadata::Complete(metadata),
})
}
}
}
/// Unit marker for a parsed cancel response; it carries no data.
///
/// Through the `ApiResponse` implementations in this file it is only produced
/// for responses whose status code is `499`.
pub struct CancelResumableInsertResponse;
impl CancelResumableInsertResponse {
fn try_from_response<B: AsRef<[u8]>>(
response: http::response::Response<B>,
) -> Result<Self, Error> {
if response.status().as_u16() == 499 {
Self::try_from(response)
} else {
Err(Error::from(response.status()))
}
}
}
impl ApiResponse<&[u8]> for CancelResumableInsertResponse {
fn try_from_parts(response: http::response::Response<&[u8]>) -> Result<Self, Error> {
Self::try_from_response(response)
}
}
impl ApiResponse<bytes::Bytes> for CancelResumableInsertResponse {
fn try_from_parts(response: http::response::Response<bytes::Bytes>) -> Result<Self, Error> {
Self::try_from_response(response)
}
}
impl<B> TryFrom<http::Response<B>> for CancelResumableInsertResponse
where
B: AsRef<[u8]>,
{
type Error = Error;
fn try_from(_response: http::Response<B>) -> Result<Self, Self::Error> {
Ok(Self)
}
}
impl Object {
pub fn resumable_insert_init<'a, OID>(
&self,
id: &OID,
content_type: Option<&str>,
) -> Result<http::Request<()>, Error>
where
OID: ObjectIdentifier<'a> + ?Sized,
{
let uri = format!(
"https://{}/upload/storage/v1/b/{}/o?uploadType=resumable&name={}",
self.authority.as_str(),
percent_encoding::percent_encode(id.bucket().as_ref(), crate::util::PATH_ENCODE_SET,),
percent_encoding::percent_encode(id.object().as_ref(), crate::util::QUERY_ENCODE_SET,),
);
let req_builder = http::Request::builder()
.header(http::header::CONTENT_LENGTH, 0u64)
.header(
http::header::HeaderName::from_static("x-upload-content-type"),
http::header::HeaderValue::from_str(
content_type.unwrap_or("application/octet-stream"),
)
.map_err(http::Error::from)?,
);
Ok(req_builder.method("POST").uri(uri).body(())?)
}
pub fn resumable_cancel(session: ResumableSession) -> Result<http::Request<()>, Error> {
let req_builder = http::Request::builder().header(http::header::CONTENT_LENGTH, 0u64);
Ok(req_builder.method("DELETE").uri(session).body(())?)
}
pub fn resumable_append<B>(
session: ResumableSession,
content: B,
length: u64,
) -> Result<http::Request<B>, Error> {
let req_builder = http::Request::builder().header(http::header::CONTENT_LENGTH, length);
Ok(req_builder.method("PUT").uri(session).body(content)?)
}
}