use super::{
ContinueOn308, Error, Object, PerformUpload, Result, ResumableUploadStatus, Seek, SizeHint,
StreamingSource, X_GOOG_API_CLIENT_HEADER, apply_customer_supplied_encryption_headers,
handle_object_response, v1,
};
use futures::stream::unfold;
use std::sync::Arc;
/// Unbuffered uploads for [`PerformUpload`].
///
/// The payload is streamed into the request body as it is read from the
/// source, and every attempt first repositions the source with [`Seek`].
impl<S> PerformUpload<S>
where
    S: StreamingSource + Seek + Send + Sync + 'static,
    <S as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
    <S as Seek>::Error: std::error::Error + Send + Sync + 'static,
{
    /// Uploads the payload, picking between a single-shot and a resumable upload.
    ///
    /// A single-shot upload is used only when the payload's size hint has an
    /// upper bound strictly below `options.resumable_upload_threshold`. With no
    /// upper bound, or one at or above the threshold, a resumable upload is used.
    ///
    /// # Errors
    ///
    /// Fails if the size hint cannot be obtained, or with whatever error the
    /// selected upload path reports.
    pub(crate) async fn send_unbuffered(self) -> Result<Object> {
        // NOTE(review): size-hint failures are mapped with `Error::deser`, while
        // seek failures on the same payload use `Error::ser` — confirm intended.
        let hint = self
            .payload
            .lock()
            .await
            .size_hint()
            .await
            .map_err(Error::deser)?;
        let threshold = self.options.resumable_upload_threshold as u64;
        let fits_single_shot = matches!(hint.upper(), Some(max) if max < threshold);
        if fits_single_shot {
            self.send_unbuffered_single_shot(hint).await
        } else {
            self.send_unbuffered_resumable(hint).await
        }
    }

    /// Runs the resumable upload inside the retry loop.
    ///
    /// The upload URL is kept across attempts, so an attempt that follows a
    /// failure can continue the same upload instead of starting a new one.
    async fn send_unbuffered_resumable(self, hint: SizeHint) -> Result<Object> {
        // Clone the policies first: the attempt closure below takes ownership of `self`.
        let backoff = self.options.backoff_policy.clone();
        let throttler = self.options.retry_throttler.clone();
        // Presumably `ContinueOn308` makes HTTP 308 responses retryable — TODO confirm.
        let retry = Arc::new(ContinueOn308::new(self.options.retry_policy.clone()));
        let mut session_url = None;
        gax::retry_loop_internal::retry_loop(
            async move |_| self.resumable_attempt(&mut session_url, hint.clone()).await,
            async |delay| tokio::time::sleep(delay).await,
            // Same position as the `idempotent` flag in the single-shot path:
            // resumable attempts are always treated as idempotent.
            true,
            throttler,
            retry,
            backoff,
        )
        .await
    }

    /// Performs one attempt of a resumable upload.
    ///
    /// When `url` is empty, a new upload is started and its URL is stored in
    /// `url`. Otherwise the existing upload is queried first: if it is already
    /// finalized the object is returned immediately, else the attempt resumes
    /// from the offset the query reported.
    ///
    /// # Errors
    ///
    /// Fails if starting or querying the upload fails, if authentication
    /// headers cannot be applied, if the payload cannot be repositioned, if the
    /// request cannot be sent, or if the response is rejected.
    async fn resumable_attempt(&self, url: &mut Option<String>, hint: SizeHint) -> Result<Object> {
        let (offset, url_ref) = if let Some(existing) = url.as_deref() {
            let status = self.query_resumable_upload_attempt(existing).await?;
            match status {
                ResumableUploadStatus::Partial(persisted) => (persisted, existing),
                ResumableUploadStatus::Finalized(object) => return Ok(*object),
            }
        } else {
            let created = self.start_resumable_upload_attempt().await?;
            (0_u64, url.insert(created).as_str())
        };

        // Unknown total: open-ended range. Zero total: no byte range at all.
        // Known total: everything from `offset` through the last byte.
        let range = match hint.exact() {
            None => format!("bytes {offset}-*/*"),
            Some(0) => String::from("bytes */0"),
            Some(total) => format!("bytes {offset}-{}/{total}", total - 1),
        };

        let request = self.inner.client.request(reqwest::Method::PUT, url_ref);
        let request = request
            .header("content-type", "application/octet-stream")
            .header("Content-Range", range)
            .header(
                "x-goog-api-client",
                reqwest::header::HeaderValue::from_static(&X_GOOG_API_CLIENT_HEADER),
            );
        let request = apply_customer_supplied_encryption_headers(request, &self.params);
        let request = self.inner.apply_auth_headers(request).await?;

        // Position the payload at the first byte this attempt has to send.
        self.payload
            .lock()
            .await
            .seek(offset)
            .await
            .map_err(Error::ser)?;
        let body = self.payload_to_body().await?;

        let response = request.body(body).send().await.map_err(Self::send_err)?;
        let object = handle_object_response(response).await?;
        self.validate_response_object(object).await
    }

    /// Runs the single-shot upload inside the retry loop.
    ///
    /// Unless `options.idempotency` overrides it, the upload is considered
    /// idempotent only when the request carries an `if_generation_match` or an
    /// `if_metageneration_match` precondition.
    pub(super) async fn send_unbuffered_single_shot(self, hint: SizeHint) -> Result<Object> {
        let has_precondition =
            self.spec.if_generation_match.is_some() || self.spec.if_metageneration_match.is_some();
        let idempotent = self.options.idempotency.unwrap_or(has_precondition);
        // Clone the policies first: the attempt closure below takes ownership of `self`.
        let backoff = self.options.backoff_policy.clone();
        let retry = self.options.retry_policy.clone();
        let throttler = self.options.retry_throttler.clone();
        gax::retry_loop_internal::retry_loop(
            async move |_| self.single_shot_attempt(hint.clone()).await,
            async |delay| tokio::time::sleep(delay).await,
            idempotent,
            throttler,
            retry,
            backoff,
        )
        .await
    }

    /// Performs one attempt of a single-shot upload: build, send, parse, validate.
    async fn single_shot_attempt(&self, hint: SizeHint) -> Result<Object> {
        let request = self.single_shot_builder(hint).await?;
        let response = request.send().await.map_err(Self::send_err)?;
        let object = handle_object_response(response).await?;
        self.validate_response_object(object).await
    }

    /// Builds the `multipart/related` request for a single-shot upload.
    ///
    /// The form has two parts: `metadata`, the JSON produced by
    /// `v1::insert_body`, and `media`, the payload streamed from its start.
    /// The media part carries an explicit length when the size hint is exact.
    ///
    /// # Errors
    ///
    /// Fails if the bucket name lacks the `projects/_/buckets/` prefix, if
    /// authentication headers cannot be applied, if the metadata part cannot
    /// be created, or if the payload cannot be rewound.
    async fn single_shot_builder(&self, hint: SizeHint) -> Result<reqwest::RequestBuilder> {
        let bucket = &self.resource().bucket;
        let Some(bucket_id) = bucket.strip_prefix("projects/_/buckets/") else {
            return Err(Error::binding(format!(
                "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
            )));
        };
        let object = &self.resource().name;
        let url = format!("{}/upload/storage/v1/b/{bucket_id}/o", &self.inner.endpoint);

        let request = self
            .inner
            .client
            .request(reqwest::Method::POST, url)
            .query(&[("uploadType", "multipart")])
            .query(&[("name", object)])
            .header(
                "x-goog-api-client",
                reqwest::header::HeaderValue::from_static(&X_GOOG_API_CLIENT_HEADER),
            );
        let request = self.apply_preconditions(request);
        let request = apply_customer_supplied_encryption_headers(request, &self.params);
        let request = self.inner.apply_auth_headers(request).await?;

        let metadata = reqwest::multipart::Part::text(v1::insert_body(self.resource()).to_string())
            .mime_str("application/json; charset=UTF-8")
            .map_err(Error::ser)?;

        // Every attempt sends the payload from the beginning.
        self.payload
            .lock()
            .await
            .seek(0)
            .await
            .map_err(Error::ser)?;
        let body = self.payload_to_body().await?;
        let media = match hint.exact() {
            Some(len) => reqwest::multipart::Part::stream_with_length(body, len),
            None => reqwest::multipart::Part::stream(body),
        };
        let form = reqwest::multipart::Form::new()
            .part("metadata", metadata)
            .part("media", media);

        // The content type is set by hand, reusing the boundary the form generated.
        let content_type = format!("multipart/related; boundary={}", form.boundary());
        let request = request.header("content-type", content_type);
        Ok(request.body(reqwest::Body::wrap_stream(form.into_stream())))
    }

    /// Wraps the shared payload in a streaming request body.
    ///
    /// Each item of the stream is the next value yielded by the payload; the
    /// stream ends when the payload yields `None`. The payload lock is taken
    /// per item and released before the item is handed to the stream.
    async fn payload_to_body(&self) -> Result<reqwest::Body> {
        let chunks = unfold(self.payload.clone(), |source| async move {
            // The guard is a temporary: it is dropped at the end of this statement.
            let item = source.lock().await.next().await;
            item.map(|chunk| (chunk, source))
        });
        Ok(reqwest::Body::wrap_stream(Box::pin(chunks)))
    }
}
// Out-of-line test modules; each is compiled only when `cfg(test)` is active.
#[cfg(test)]
mod resumable_tests;
#[cfg(test)]
mod single_shot_tests;