use super::client::{StorageInner, apply_customer_supplied_encryption_headers};
use crate::model::Object;
use crate::retry_policy::ContinueOn308;
use crate::storage::checksum::details::ChecksummedSource;
use crate::storage::client::info::X_GOOG_API_CLIENT_HEADER;
use crate::storage::v1;
use crate::streaming_source::{IterSource, Seek, SizeHint, StreamingSource};
use crate::{Error, Result};
use std::sync::Arc;
use tokio::sync::Mutex;
mod buffered;
mod unbuffered;
/// State shared by the steps of a single object upload.
///
/// `S` is the type of the payload source. The methods visible in this file
/// start a resumable upload session and query the status of an existing one.
pub struct PerformUpload<S> {
/// The payload, wrapped in a `ChecksummedSource` and shared behind an async
/// mutex.
payload: Arc<Mutex<ChecksummedSource<S>>>,
/// Shared client state: provides the HTTP client, the service endpoint, and
/// the authentication headers used when building requests.
inner: Arc<StorageInner>,
/// Destination object metadata (`resource`), preconditions, and predefined
/// ACL for the upload.
spec: crate::model::WriteObjectSpec,
/// Optional common request parameters; passed to
/// `apply_customer_supplied_encryption_headers` when starting an upload.
params: Option<crate::model::CommonObjectRequestParams>,
/// Per-request options; `new()` reads `checksum` from here to configure the
/// payload wrapper.
options: super::request_options::RequestOptions,
}
impl<S> PerformUpload<S> {
pub(crate) fn new(
payload: S,
inner: Arc<StorageInner>,
spec: crate::model::WriteObjectSpec,
params: Option<crate::model::CommonObjectRequestParams>,
options: super::request_options::RequestOptions,
) -> Self {
let checksum = options.checksum.clone();
Self {
payload: Arc::new(Mutex::new(ChecksummedSource::new(checksum, payload))),
inner,
spec,
params,
options,
}
}
fn resource(&self) -> &crate::model::Object {
self.spec
.resource
.as_ref()
.expect("resource field initialized in `new()`")
}
async fn start_resumable_upload_attempt(&self) -> Result<String> {
let builder = self.start_resumable_upload_request().await?;
let response = builder.send().await.map_err(Error::io)?;
self::handle_start_resumable_upload_response(response).await
}
async fn start_resumable_upload_request(&self) -> Result<reqwest::RequestBuilder> {
let bucket = &self.resource().bucket;
let bucket_id = bucket.strip_prefix("projects/_/buckets/").ok_or_else(|| {
Error::binding(format!(
"malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
))
})?;
let object = &self.resource().name;
let builder = self
.inner
.client
.request(
reqwest::Method::POST,
format!("{}/upload/storage/v1/b/{bucket_id}/o", &self.inner.endpoint),
)
.query(&[("uploadType", "resumable")])
.query(&[("name", object)])
.header("content-type", "application/json")
.header(
"x-goog-api-client",
reqwest::header::HeaderValue::from_static(&X_GOOG_API_CLIENT_HEADER),
);
let builder = self.apply_preconditions(builder);
let builder = apply_customer_supplied_encryption_headers(builder, &self.params);
let builder = self.inner.apply_auth_headers(builder).await?;
let builder = builder.json(&v1::insert_body(self.resource()));
Ok(builder)
}
async fn query_resumable_upload_attempt(
&self,
upload_url: &str,
) -> Result<ResumableUploadStatus> {
let builder = self
.inner
.client
.request(reqwest::Method::PUT, upload_url)
.header("content-type", "application/octet-stream")
.header("Content-Range", "bytes */*")
.header("content-length", 0)
.header(
"x-goog-api-client",
reqwest::header::HeaderValue::from_static(&X_GOOG_API_CLIENT_HEADER),
);
let builder = self.inner.apply_auth_headers(builder).await?;
let response = builder.send().await.map_err(Error::io)?;
self::query_resumable_upload_handle_response(response).await
}
fn apply_preconditions(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
let builder = self
.spec
.if_generation_match
.iter()
.fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
let builder = self
.spec
.if_generation_not_match
.iter()
.fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
let builder = self
.spec
.if_metageneration_match
.iter()
.fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
let builder = self
.spec
.if_metageneration_not_match
.iter()
.fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
[
("kmsKeyName", self.resource().kms_key.as_str()),
("predefinedAcl", self.spec.predefined_acl.as_str()),
]
.into_iter()
.fold(
builder,
|b, (k, v)| if v.is_empty() { b } else { b.query(&[(k, v)]) },
)
}
}
async fn handle_start_resumable_upload_response(response: reqwest::Response) -> Result<String> {
if !response.status().is_success() {
return gaxi::http::to_http_error(response).await;
}
let location = response
.headers()
.get("Location")
.ok_or_else(|| Error::deser("missing Location header in start resumable upload"))?;
location.to_str().map_err(Error::deser).map(str::to_string)
}
async fn query_resumable_upload_handle_response(
response: reqwest::Response,
) -> Result<ResumableUploadStatus> {
if response.status() == RESUME_INCOMPLETE {
return self::parse_range(response).await;
}
let object = handle_object_response(response).await?;
Ok(ResumableUploadStatus::Finalized(Box::new(object)))
}
async fn handle_object_response(response: reqwest::Response) -> Result<Object> {
if !response.status().is_success() {
return gaxi::http::to_http_error(response).await;
}
let response = response.json::<v1::Object>().await.map_err(Error::deser)?;
Ok(Object::from(response))
}
/// Computes how many bytes are persisted from the `Range` header of an
/// "upload incomplete" response.
///
/// * No `Range` header: nothing is persisted, returns `Partial(0)`.
/// * `Range: bytes=0-N`: the end is inclusive, returns `Partial(N + 1)`. This
///   includes `bytes=0-0`, which denotes exactly one persisted byte.
///
/// # Errors
///
/// Returns the error built by `gaxi::http::to_http_error` when the header is
/// present but cannot be parsed by `parse_range_end`, or when `N + 1` does
/// not fit in a `u64`.
async fn parse_range(response: reqwest::Response) -> Result<ResumableUploadStatus> {
    // Check for the header here: `parse_range_end` reports a missing header as
    // `Some(0)`, which is indistinguishable from a parsed `bytes=0-0`.
    if response.headers().get("range").is_none() {
        return Ok(ResumableUploadStatus::Partial(0));
    }
    // Convert the inclusive end offset into a size, rejecting values that
    // would overflow instead of panicking or wrapping around.
    let persisted_size =
        self::parse_range_end(response.headers()).and_then(|end| end.checked_add(1));
    match persisted_size {
        Some(size) => Ok(ResumableUploadStatus::Partial(size)),
        None => gaxi::http::to_http_error(response).await,
    }
}
/// Result of querying the status of a resumable upload.
#[derive(Debug, PartialEq)]
enum ResumableUploadStatus {
/// The response was not "resume incomplete" and decoded successfully as an
/// object; holds that object.
Finalized(Box<Object>),
/// The response had the `RESUME_INCOMPLETE` status; holds the persisted
/// size computed by `parse_range`.
Partial(u64),
}
/// Parses the end offset out of a `range` header of the form `bytes=0-<end>`.
///
/// Returns `Some(0)` when the header is absent, `Some(end)` when it parses,
/// and `None` when the header is present but does not start with `bytes=0-`
/// or its remainder is not a valid `u64`.
///
/// Note that an absent header and `bytes=0-0` both yield `Some(0)`.
fn parse_range_end(headers: &reqwest::header::HeaderMap) -> Option<u64> {
    match headers.get("range") {
        None => Some(0_u64),
        Some(range) => {
            // Only ranges that start at offset zero are accepted.
            let digits = range.as_bytes().strip_prefix(b"bytes=0-")?;
            std::str::from_utf8(digits).ok()?.parse::<u64>().ok()
        }
    }
}
/// Status code that marks an upload as not yet finalized.
///
/// This is `PERMANENT_REDIRECT` (HTTP 308); responses with this status are
/// parsed for their persisted byte range instead of an object payload.
const RESUME_INCOMPLETE: reqwest::StatusCode = reqwest::StatusCode::PERMANENT_REDIRECT;
#[cfg(test)]
mod tests;