1use super::upload_source::Seek;
16use super::*;
17
18impl<T> UploadObject<T>
19where
20 T: StreamingSource + Seek + Send + Sync + 'static,
21 <T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
22 <T as Seek>::Error: std::error::Error + Send + Sync + 'static,
23{
24 pub async fn send_unbuffered(self) -> Result<Object> {
38 let idempotent =
41 self.spec.if_generation_match.is_some() || self.spec.if_metageneration_match.is_some();
42 gax::retry_loop_internal::retry_loop(
43 async |_| self.single_shot_attempt().await,
45 async |duration| tokio::time::sleep(duration).await,
46 idempotent,
47 self.options.retry_throttler.clone(),
48 self.options.retry_policy.clone(),
49 self.options.backoff_policy.clone(),
50 )
51 .await
52 }
53
54 async fn single_shot_attempt(&self) -> Result<Object> {
55 let builder = self.single_shot_builder().await?;
57 let response = builder.send().await.map_err(Error::io)?;
58 if !response.status().is_success() {
59 return gaxi::http::to_http_error(response).await;
60 }
61 let response = response.json::<v1::Object>().await.map_err(Error::io)?;
62
63 Ok(Object::from(response))
64 }
65
66 async fn single_shot_builder(&self) -> Result<reqwest::RequestBuilder> {
67 use crate::upload_source::Seek;
68 let payload = self.payload.clone();
69 payload.lock().await.seek(0).await.map_err(Error::ser)?;
70
71 let bucket = &self.resource().bucket;
72 let bucket_id = bucket.strip_prefix("projects/_/buckets/").ok_or_else(|| {
73 Error::binding(format!(
74 "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
75 ))
76 })?;
77 let object = &self.resource().name;
78 let builder = self
79 .inner
80 .client
81 .request(
82 reqwest::Method::POST,
83 format!("{}/upload/storage/v1/b/{bucket_id}/o", &self.inner.endpoint),
84 )
85 .query(&[("uploadType", "multipart")])
86 .query(&[("name", enc(object))])
87 .header(
88 "x-goog-api-client",
89 reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
90 );
91
92 let builder = apply_customer_supplied_encryption_headers(builder, &self.params);
93 let builder = self.inner.apply_auth_headers(builder).await?;
94
95 let stream = Box::pin(unfold(Some(payload), move |state| async move {
96 if let Some(payload) = state {
97 let mut guard = payload.lock().await;
98 if let Some(next) = guard.next().await {
99 drop(guard);
100 return Some((next, Some(payload)));
101 }
102 }
103 None
104 }));
105 let metadata = reqwest::multipart::Part::text(v1::insert_body(self.resource()).to_string())
106 .mime_str("application/json; charset=UTF-8")
107 .map_err(Error::ser)?;
108 let form = reqwest::multipart::Form::new()
109 .part("metadata", metadata)
110 .part(
111 "media",
112 reqwest::multipart::Part::stream(reqwest::Body::wrap_stream(stream)),
113 );
114 let builder = builder.header(
115 "content-type",
116 format!("multipart/related; boundary={}", form.boundary()),
117 );
118 Ok(builder.body(reqwest::Body::wrap_stream(form.into_stream())))
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::super::client::tests::{create_key_helper, test_builder, test_inner_client};
125 use super::*;
126 use crate::upload_source::tests::VecStream;
127 use gax::retry_policy::RetryPolicyExt;
128 use http_body_util::BodyExt;
129 use httptest::{Expectation, Server, matchers::*, responders::*};
130 use serde_json::{Value, json};
131
132 type Result = anyhow::Result<()>;
133
134 #[tokio::test]
135 async fn send_unbuffered_normal() -> Result {
136 let payload = response_body().to_string();
137 let server = Server::run();
138 server.expect(
139 Expectation::matching(all_of![
140 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
141 request::query(url_decoded(contains(("name", "test-object")))),
142 request::query(url_decoded(contains(("uploadType", "multipart")))),
143 ])
144 .respond_with(
145 status_code(200)
146 .append_header("content-type", "application/json")
147 .body(payload),
148 ),
149 );
150
151 let client = Storage::builder()
152 .with_endpoint(format!("http://{}", server.addr()))
153 .with_credentials(auth::credentials::testing::test_credentials())
154 .build()
155 .await?;
156 let response = client
157 .upload_object("projects/_/buckets/test-bucket", "test-object", "")
158 .send_unbuffered()
159 .await?;
160 assert_eq!(response.name, "test-object");
161 assert_eq!(response.bucket, "projects/_/buckets/test-bucket");
162 assert_eq!(
163 response.metadata.get("is-test-object").map(String::as_str),
164 Some("true")
165 );
166
167 Ok(())
168 }
169
170 #[tokio::test]
171 async fn send_unbuffered_not_found() -> Result {
172 let server = Server::run();
173 server.expect(
174 Expectation::matching(all_of![
175 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
176 request::query(url_decoded(contains(("name", "test-object")))),
177 request::query(url_decoded(contains(("uploadType", "multipart")))),
178 ])
179 .respond_with(status_code(404).body("NOT FOUND")),
180 );
181
182 let client = Storage::builder()
183 .with_endpoint(format!("http://{}", server.addr()))
184 .with_credentials(auth::credentials::testing::test_credentials())
185 .build()
186 .await?;
187 let err = client
188 .upload_object("projects/_/buckets/test-bucket", "test-object", "")
189 .send_unbuffered()
190 .await
191 .expect_err("expected a not found error");
192 assert_eq!(err.http_status_code(), Some(404));
193
194 Ok(())
195 }
196
197 async fn parse_multipart_body(
198 mut request: reqwest::Request,
199 ) -> anyhow::Result<(bytes::Bytes, bytes::Bytes)> {
200 let boundary = request
201 .headers()
202 .get("content-type")
203 .and_then(|h| h.to_str().ok())
204 .and_then(|h| h.strip_prefix("multipart/related; boundary="))
205 .expect("request should include content-type")
206 .to_string();
207 let stream = request
208 .body_mut()
209 .take()
210 .expect("request should have a body")
211 .into_data_stream();
212 let mut multipart = multer::Multipart::new(stream, boundary);
213 let Some(m) = multipart.next_field().await? else {
214 return Err(anyhow::Error::msg("missing metadata field"));
215 };
216 let metadata = m.bytes().await?;
217
218 let Some(p) = multipart.next_field().await? else {
219 return Err(anyhow::Error::msg("missing payload field"));
220 };
221 let payload = p.bytes().await?;
222
223 assert!(
224 multipart.next_field().await?.is_none(),
225 "unexpected extra fields"
226 );
227
228 Ok((metadata, payload))
229 }
230
231 fn response_body() -> Value {
232 json!({
233 "name": "test-object",
234 "bucket": "test-bucket",
235 "metadata": {
236 "is-test-object": "true",
237 }
238 })
239 }
240
241 #[tokio::test]
242 async fn upload_object_bytes() -> Result {
243 let inner = test_inner_client(test_builder());
244 let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
245 .single_shot_builder()
246 .await?
247 .build()?;
248
249 assert_eq!(request.method(), reqwest::Method::POST);
250 assert_eq!(
251 request.url().as_str(),
252 "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=multipart&name=object"
253 );
254 let (_metadata, contents) = parse_multipart_body(request).await?;
255 assert_eq!(contents, "hello");
256 Ok(())
257 }
258
259 #[tokio::test]
260 async fn upload_object_metadata() -> Result {
261 let inner = test_inner_client(test_builder());
262 let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
263 .with_metadata([("k0", "v0"), ("k1", "v1")])
264 .single_shot_builder()
265 .await?
266 .build()?;
267
268 assert_eq!(request.method(), reqwest::Method::POST);
269 assert_eq!(
270 request.url().as_str(),
271 "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=multipart&name=object"
272 );
273 let (metadata, contents) = parse_multipart_body(request).await?;
274 assert_eq!(contents, "hello");
275 let object = serde_json::from_slice::<Value>(&metadata)?;
276 assert_eq!(object, json!({"metadata": {"k0": "v0", "k1": "v1"}}));
277 Ok(())
278 }
279
280 #[tokio::test]
281 async fn upload_object_stream() -> Result {
282 let stream = VecStream::new(
283 [
284 "the ", "quick ", "brown ", "fox ", "jumps ", "over ", "the ", "lazy ", "dog",
285 ]
286 .map(|x| bytes::Bytes::from_static(x.as_bytes()))
287 .to_vec(),
288 );
289 let inner = test_inner_client(test_builder());
290 let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", stream)
291 .single_shot_builder()
292 .await?
293 .build()?;
294
295 assert_eq!(request.method(), reqwest::Method::POST);
296 assert_eq!(
297 request.url().as_str(),
298 "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=multipart&name=object"
299 );
300 let (_metadata, contents) = parse_multipart_body(request).await?;
301 assert_eq!(contents, "the quick brown fox jumps over the lazy dog");
302 Ok(())
303 }
304
305 #[tokio::test]
306 async fn upload_object_error_credentials() -> Result {
307 let inner = test_inner_client(
308 test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
309 );
310 let _ = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
311 .single_shot_builder()
312 .await
313 .inspect_err(|e| assert!(e.is_authentication()))
314 .expect_err("invalid credentials should err");
315 Ok(())
316 }
317
318 #[tokio::test]
319 async fn upload_object_bad_bucket() -> Result {
320 let inner = test_inner_client(test_builder());
321 UploadObject::new(inner, "malformed", "object", "hello")
322 .single_shot_builder()
323 .await
324 .expect_err("malformed bucket string should error");
325 Ok(())
326 }
327
328 #[tokio::test]
329 async fn upload_object_headers() -> Result {
330 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
332
333 let inner = test_inner_client(test_builder());
334 let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
335 .with_key(KeyAes256::new(&key)?)
336 .single_shot_builder()
337 .await?
338 .build()?;
339
340 assert_eq!(request.method(), reqwest::Method::POST);
341 assert_eq!(
342 request.url().as_str(),
343 "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=multipart&name=object"
344 );
345
346 let want = vec![
347 ("x-goog-encryption-algorithm", "AES256".to_string()),
348 ("x-goog-encryption-key", key_base64),
349 ("x-goog-encryption-key-sha256", key_sha256_base64),
350 ];
351
352 for (name, value) in want {
353 assert_eq!(
354 request.headers().get(name).unwrap().as_bytes(),
355 bytes::Bytes::from(value)
356 );
357 }
358 Ok(())
359 }
360
361 #[tokio::test]
362 async fn single_shot_retry_transient_not_idempotent() -> Result {
363 let server = Server::run();
364 let matching = || {
365 Expectation::matching(all_of![
366 request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
367 request::query(url_decoded(contains(("name", "object")))),
368 request::query(url_decoded(contains(("uploadType", "multipart")))),
369 ])
370 };
371 server.expect(
372 matching()
373 .times(1)
374 .respond_with(cycle![status_code(503).body("try-again"),]),
375 );
376
377 let inner =
378 test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
379 let err = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
380 .send_unbuffered()
381 .await
382 .expect_err("expected error as request is not idempotent");
383 assert_eq!(err.http_status_code(), Some(503), "{err:?}");
384
385 Ok(())
386 }
387
388 #[tokio::test]
389 async fn single_shot_retry_transient_failures_then_success() -> Result {
390 let server = Server::run();
391 let matching = || {
392 Expectation::matching(all_of![
393 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
394 request::query(url_decoded(contains(("name", "test-object")))),
395 request::query(url_decoded(contains(("uploadType", "multipart")))),
396 ])
397 };
398 server.expect(matching().times(3).respond_with(cycle![
399 status_code(503).body("try-again"),
400 status_code(503).body("try-again"),
401 json_encoded(response_body()).append_header("content-type", "application/json"),
402 ]));
403
404 let inner =
405 test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
406 let got = UploadObject::new(
407 inner,
408 "projects/_/buckets/test-bucket",
409 "test-object",
410 "hello",
411 )
412 .with_if_generation_match(0)
413 .send_unbuffered()
414 .await?;
415 let want = Object::from(serde_json::from_value::<v1::Object>(response_body())?);
416 assert_eq!(got, want);
417
418 Ok(())
419 }
420
421 #[tokio::test]
422 async fn single_shot_retry_transient_failures_then_permanent() -> Result {
423 let server = Server::run();
424 let matching = || {
425 Expectation::matching(all_of![
426 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
427 request::query(url_decoded(contains(("name", "test-object")))),
428 request::query(url_decoded(contains(("uploadType", "multipart")))),
429 ])
430 };
431 server.expect(matching().times(3).respond_with(cycle![
432 status_code(503).body("try-again"),
433 status_code(503).body("try-again"),
434 status_code(403).body("uh-oh"),
435 ]));
436
437 let inner =
438 test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
439 let err = UploadObject::new(
440 inner,
441 "projects/_/buckets/test-bucket",
442 "test-object",
443 "hello",
444 )
445 .with_if_generation_match(0)
446 .send_unbuffered()
447 .await
448 .expect_err("expected permanent error");
449 assert_eq!(err.http_status_code(), Some(403), "{err:?}");
450
451 Ok(())
452 }
453
454 #[tokio::test]
455 async fn single_shot_retry_transient_failures_exhausted() -> Result {
456 let server = Server::run();
457 let matching = || {
458 Expectation::matching(all_of![
459 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
460 request::query(url_decoded(contains(("name", "test-object")))),
461 request::query(url_decoded(contains(("uploadType", "multipart")))),
462 ])
463 };
464 server.expect(
465 matching()
466 .times(3)
467 .respond_with(status_code(503).body("try-again")),
468 );
469
470 let inner =
471 test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
472 let err = UploadObject::new(
473 inner,
474 "projects/_/buckets/test-bucket",
475 "test-object",
476 "hello",
477 )
478 .with_if_generation_match(0)
479 .with_retry_policy(crate::retry_policy::RecommendedPolicy.with_attempt_limit(3))
480 .send_unbuffered()
481 .await
482 .expect_err("expected permanent error");
483 assert_eq!(err.http_status_code(), Some(503), "{err:?}");
484
485 Ok(())
486 }
487}