use super::client::{StorageInner, apply_customer_supplied_encryption_headers};
use crate::model::Object;
use crate::retry_policy::ContinueOn308;
use crate::storage::checksum::details::ChecksummedSource;
use crate::storage::info::X_GOOG_API_CLIENT_HEADER;
use crate::storage::v1;
use crate::streaming_source::{IterSource, Seek, SizeHint, StreamingSource};
use crate::{Error, Result};
use gaxi::attempt_info::AttemptInfo;
use gaxi::http::HttpRequestBuilder;
use gaxi::http::reqwest::{HeaderMap, HeaderValue, Method, Response, StatusCode};
use google_cloud_gax::options::internal::{PathTemplate, RequestOptionsExt, ResourceName};
use std::sync::Arc;
use tokio::sync::Mutex;
mod buffered;
mod unbuffered;
/// Holds the state needed to upload a single object.
///
/// The methods defined in this file only cover starting a resumable upload
/// session and querying its status.
/// NOTE(review): the `buffered` and `unbuffered` submodules presumably add the
/// code that actually sends the payload — confirm.
pub struct PerformUpload<S> {
/// The data to upload. `new()` wraps the caller's source in a
/// `ChecksummedSource` and places it behind an `Arc<Mutex<_>>`.
payload: Arc<Mutex<ChecksummedSource<S>>>,
/// The shared client state; used here to create HTTP request builders.
inner: Arc<StorageInner>,
/// The object resource, preconditions, and predefined ACL for the upload.
spec: crate::model::WriteObjectSpec,
/// Optional parameters, passed to
/// `apply_customer_supplied_encryption_headers()` when starting an upload.
params: Option<crate::model::CommonObjectRequestParams>,
/// Per-request options; supplies the checksum configuration and the gax
/// options used to send each request.
options: super::request_options::RequestOptions,
}
impl<S> PerformUpload<S> {
pub(crate) fn new(
payload: S,
inner: Arc<StorageInner>,
spec: crate::model::WriteObjectSpec,
params: Option<crate::model::CommonObjectRequestParams>,
options: super::request_options::RequestOptions,
) -> Self {
let checksum = options.checksum.clone();
Self {
payload: Arc::new(Mutex::new(ChecksummedSource::new(checksum, payload))),
inner,
spec,
params,
options,
}
}
fn resource(&self) -> &crate::model::Object {
self.spec
.resource
.as_ref()
.expect("resource field initialized in `new()`")
}
async fn start_resumable_upload_attempt(&self, attempt_count: u32) -> Result<String> {
let builder = self.start_resumable_upload_request().await?;
let options = self.options.gax();
let options = options
.insert_extension(PathTemplate("/upload/storage/v1/b/{bucket}/o"))
.insert_extension(ResourceName(format!(
"//storage.googleapis.com/{}",
self.resource().bucket
)));
let response = builder
.send(options, AttemptInfo::new(attempt_count))
.await
.map_err(Error::io)?;
self::handle_start_resumable_upload_response(response).await
}
async fn start_resumable_upload_request(&self) -> Result<HttpRequestBuilder> {
let bucket = &self.resource().bucket;
let bucket_id = bucket.strip_prefix("projects/_/buckets/").ok_or_else(|| {
Error::binding(format!(
"malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
))
})?;
let object = &self.resource().name;
let builder = self
.inner
.client
.http_builder(Method::POST, &format!("/upload/storage/v1/b/{bucket_id}/o"))
.query("uploadType", "resumable")
.query("name", object)
.header("content-type", "application/json")
.header(
"x-goog-api-client",
HeaderValue::from_static(&X_GOOG_API_CLIENT_HEADER),
);
let builder = self.apply_preconditions(builder);
let builder = apply_customer_supplied_encryption_headers(builder, &self.params);
let builder = builder.body(v1::insert_body(self.resource()).to_string());
Ok(builder)
}
async fn query_resumable_upload_attempt(
&self,
upload_url: &str,
attempt_count: u32,
) -> Result<ResumableUploadStatus> {
let options = self
.options
.gax()
.insert_extension(PathTemplate("/upload/storage/v1/b/{bucket}/o"))
.insert_extension(ResourceName(format!(
"//storage.googleapis.com/{}",
self.resource().bucket
)));
let builder = self
.inner
.client
.http_builder_with_url(Method::PUT, upload_url, crate::storage::DEFAULT_HOST)?
.header("content-type", "application/octet-stream")
.header("Content-Range", "bytes */*")
.header("content-length", 0)
.header(
"x-goog-api-client",
HeaderValue::from_static(&X_GOOG_API_CLIENT_HEADER),
);
let response = builder
.send(options, AttemptInfo::new(attempt_count))
.await
.map_err(Error::io)?;
self::query_resumable_upload_handle_response(response).await
}
fn apply_preconditions(&self, builder: HttpRequestBuilder) -> HttpRequestBuilder {
let builder = self
.spec
.if_generation_match
.iter()
.fold(builder, |b, v| b.query("ifGenerationMatch", v));
let builder = self
.spec
.if_generation_not_match
.iter()
.fold(builder, |b, v| b.query("ifGenerationNotMatch", v));
let builder = self
.spec
.if_metageneration_match
.iter()
.fold(builder, |b, v| b.query("ifMetagenerationMatch", v));
let builder = self
.spec
.if_metageneration_not_match
.iter()
.fold(builder, |b, v| b.query("ifMetagenerationNotMatch", v));
[
("kmsKeyName", self.resource().kms_key.as_str()),
("predefinedAcl", self.spec.predefined_acl.as_str()),
]
.into_iter()
.fold(
builder,
|b, (k, v)| if v.is_empty() { b } else { b.query(k, v) },
)
}
}
async fn handle_start_resumable_upload_response(response: Response) -> Result<String> {
if !response.status().is_success() {
return gaxi::http::to_http_error(response).await;
}
let location = response
.headers()
.get("Location")
.ok_or_else(|| Error::deser("missing Location header in start resumable upload"))?;
location.to_str().map_err(Error::deser).map(str::to_string)
}
async fn query_resumable_upload_handle_response(
response: Response,
) -> Result<ResumableUploadStatus> {
if response.status() == RESUME_INCOMPLETE {
return self::parse_range(response).await;
}
let object = handle_object_response(response).await?;
Ok(ResumableUploadStatus::Finalized(Box::new(object)))
}
async fn handle_object_response(response: Response) -> Result<Object> {
if !response.status().is_success() {
return gaxi::http::to_http_error(response).await;
}
let response = response.json::<v1::Object>().await.map_err(Error::deser)?;
Ok(Object::from(response))
}
async fn parse_range(response: Response) -> Result<ResumableUploadStatus> {
let Some(end) = self::parse_range_end(response.headers()) else {
return gaxi::http::to_http_error(response).await;
};
let persisted_size = match end {
0 => 0,
e => e + 1,
};
Ok(ResumableUploadStatus::Partial(persisted_size))
}
/// The result of querying the status of a resumable upload.
#[derive(Debug, PartialEq)]
enum ResumableUploadStatus {
/// The service returned an object resource in a successful response.
Finalized(Box<Object>),
/// The service reported the upload as incomplete; holds the persisted size
/// in bytes, as computed by `parse_range()`.
Partial(u64),
}
/// Returns the end of the range in the `range` header.
///
/// - Returns `Some(0)` if the header is missing.
/// - Returns `None` if the value does not start with `bytes=0-`, or if the
///   rest of the value is not a valid `u64`.
/// - Otherwise returns the number following the `bytes=0-` prefix.
fn parse_range_end(headers: &HeaderMap) -> Option<u64> {
    match headers.get("range") {
        None => Some(0_u64),
        Some(value) => {
            // Only ranges starting at byte 0 are accepted.
            let digits = value.as_bytes().strip_prefix(b"bytes=0-")?;
            std::str::from_utf8(digits).ok()?.parse::<u64>().ok()
        }
    }
}
/// The status code that `query_resumable_upload_handle_response()` treats as
/// "the upload is not finalized yet": `StatusCode::PERMANENT_REDIRECT`.
const RESUME_INCOMPLETE: StatusCode = StatusCode::PERMANENT_REDIRECT;
#[cfg(test)]
mod tests;