google_cloud_storage/storage/upload_object/
unbuffered.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::upload_source::Seek;
16use super::*;
17
18impl<T> UploadObject<T>
19where
20    T: StreamingSource + Seek + Send + Sync + 'static,
21    <T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
22    <T as Seek>::Error: std::error::Error + Send + Sync + 'static,
23{
24    /// A simple upload from a buffer.
25    ///
26    /// # Example
27    /// ```
28    /// # use google_cloud_storage::client::Storage;
29    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
30    /// let response = client
31    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
32    ///     .send_unbuffered()
33    ///     .await?;
34    /// println!("response details={response:?}");
35    /// # Ok(()) }
36    /// ```
37    pub async fn send_unbuffered(self) -> Result<Object> {
38        // TODO(#2056) - make idempotency configurable.
39        // Single shot uploads are idempotent only if they have pre-conditions.
40        let idempotent =
41            self.spec.if_generation_match.is_some() || self.spec.if_metageneration_match.is_some();
42        gax::retry_loop_internal::retry_loop(
43            // TODO(#2044) - we need to apply any timeouts here.
44            async |_| self.single_shot_attempt().await,
45            async |duration| tokio::time::sleep(duration).await,
46            idempotent,
47            self.options.retry_throttler.clone(),
48            self.options.retry_policy.clone(),
49            self.options.backoff_policy.clone(),
50        )
51        .await
52    }
53
54    async fn single_shot_attempt(&self) -> Result<Object> {
55        // TODO(#2634) - use resumable uploads for large payloads.
56        let builder = self.single_shot_builder().await?;
57        let response = builder.send().await.map_err(Error::io)?;
58        if !response.status().is_success() {
59            return gaxi::http::to_http_error(response).await;
60        }
61        let response = response.json::<v1::Object>().await.map_err(Error::io)?;
62
63        Ok(Object::from(response))
64    }
65
66    async fn single_shot_builder(&self) -> Result<reqwest::RequestBuilder> {
67        use crate::upload_source::Seek;
68        let payload = self.payload.clone();
69        payload.lock().await.seek(0).await.map_err(Error::ser)?;
70
71        let bucket = &self.resource().bucket;
72        let bucket_id = bucket.strip_prefix("projects/_/buckets/").ok_or_else(|| {
73            Error::binding(format!(
74                "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
75            ))
76        })?;
77        let object = &self.resource().name;
78        let builder = self
79            .inner
80            .client
81            .request(
82                reqwest::Method::POST,
83                format!("{}/upload/storage/v1/b/{bucket_id}/o", &self.inner.endpoint),
84            )
85            .query(&[("uploadType", "multipart")])
86            .query(&[("name", enc(object))])
87            .header(
88                "x-goog-api-client",
89                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
90            );
91
92        let builder = apply_customer_supplied_encryption_headers(builder, &self.params);
93        let builder = self.inner.apply_auth_headers(builder).await?;
94
95        let stream = Box::pin(unfold(Some(payload), move |state| async move {
96            if let Some(payload) = state {
97                let mut guard = payload.lock().await;
98                if let Some(next) = guard.next().await {
99                    drop(guard);
100                    return Some((next, Some(payload)));
101                }
102            }
103            None
104        }));
105        let metadata = reqwest::multipart::Part::text(v1::insert_body(self.resource()).to_string())
106            .mime_str("application/json; charset=UTF-8")
107            .map_err(Error::ser)?;
108        let form = reqwest::multipart::Form::new()
109            .part("metadata", metadata)
110            .part(
111                "media",
112                reqwest::multipart::Part::stream(reqwest::Body::wrap_stream(stream)),
113            );
114        let builder = builder.header(
115            "content-type",
116            format!("multipart/related; boundary={}", form.boundary()),
117        );
118        Ok(builder.body(reqwest::Body::wrap_stream(form.into_stream())))
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::super::client::tests::{create_key_helper, test_builder, test_inner_client};
    use super::*;
    use crate::upload_source::tests::VecStream;
    use gax::retry_policy::RetryPolicyExt;
    use http_body_util::BodyExt;
    use httptest::{Expectation, Server, matchers::*, responders::*};
    use serde_json::{Value, json};

    type Result = anyhow::Result<()>;

    /// The URL expected by the `single_shot_builder()` tests, which all upload
    /// `object` into `projects/_/buckets/bucket`.
    const EXPECTED_UPLOAD_URL: &str = "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=multipart&name=object";

    /// A successful response is decoded into an `Object`.
    #[tokio::test]
    async fn send_unbuffered_normal() -> Result {
        let payload = response_body().to_string();
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("name", "test-object")))),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
            .respond_with(
                status_code(200)
                    .append_header("content-type", "application/json")
                    .body(payload),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let response = client
            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
            .send_unbuffered()
            .await?;
        assert_eq!(response.name, "test-object");
        assert_eq!(response.bucket, "projects/_/buckets/test-bucket");
        assert_eq!(
            response.metadata.get("is-test-object").map(String::as_str),
            Some("true")
        );

        Ok(())
    }

    /// A 404 response is surfaced as an error carrying the HTTP status code.
    #[tokio::test]
    async fn send_unbuffered_not_found() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("name", "test-object")))),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
            .respond_with(status_code(404).body("NOT FOUND")),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let err = client
            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
            .send_unbuffered()
            .await
            .expect_err("expected a not found error");
        assert_eq!(err.http_status_code(), Some(404));

        Ok(())
    }

    /// Splits the multipart body of `request` into its `(metadata, payload)`
    /// parts.
    ///
    /// Panics if the request has no `multipart/related` content type, has no
    /// body, or contains more than two fields. Returns an error if either
    /// field is missing or cannot be read.
    async fn parse_multipart_body(
        mut request: reqwest::Request,
    ) -> anyhow::Result<(bytes::Bytes, bytes::Bytes)> {
        // The boundary is whatever follows the fixed content-type prefix.
        let boundary = request
            .headers()
            .get("content-type")
            .and_then(|h| h.to_str().ok())
            .and_then(|h| h.strip_prefix("multipart/related; boundary="))
            .expect("request should include content-type")
            .to_string();
        let stream = request
            .body_mut()
            .take()
            .expect("request should have a body")
            .into_data_stream();
        let mut multipart = multer::Multipart::new(stream, boundary);
        let Some(m) = multipart.next_field().await? else {
            return Err(anyhow::Error::msg("missing metadata field"));
        };
        let metadata = m.bytes().await?;

        let Some(p) = multipart.next_field().await? else {
            return Err(anyhow::Error::msg("missing payload field"));
        };
        let payload = p.bytes().await?;

        assert!(
            multipart.next_field().await?.is_none(),
            "unexpected extra fields"
        );

        Ok((metadata, payload))
    }

    /// The JSON object returned by the fake server on successful uploads.
    fn response_body() -> Value {
        json!({
            "name": "test-object",
            "bucket": "test-bucket",
            "metadata": {
                "is-test-object": "true",
            }
        })
    }

    /// A byte payload is sent verbatim as the media part.
    #[tokio::test]
    async fn upload_object_bytes() -> Result {
        let inner = test_inner_client(test_builder());
        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .single_shot_builder()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::POST);
        assert_eq!(request.url().as_str(), EXPECTED_UPLOAD_URL);
        let (_metadata, contents) = parse_multipart_body(request).await?;
        assert_eq!(contents, "hello");
        Ok(())
    }

    /// Custom metadata is serialized into the JSON metadata part.
    #[tokio::test]
    async fn upload_object_metadata() -> Result {
        let inner = test_inner_client(test_builder());
        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .with_metadata([("k0", "v0"), ("k1", "v1")])
            .single_shot_builder()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::POST);
        assert_eq!(request.url().as_str(), EXPECTED_UPLOAD_URL);
        let (metadata, contents) = parse_multipart_body(request).await?;
        assert_eq!(contents, "hello");
        let object = serde_json::from_slice::<Value>(&metadata)?;
        assert_eq!(object, json!({"metadata": {"k0": "v0", "k1": "v1"}}));
        Ok(())
    }

    /// A payload delivered in several chunks is concatenated in order.
    #[tokio::test]
    async fn upload_object_stream() -> Result {
        let stream = VecStream::new(
            [
                "the ", "quick ", "brown ", "fox ", "jumps ", "over ", "the ", "lazy ", "dog",
            ]
            .map(|x| bytes::Bytes::from_static(x.as_bytes()))
            .to_vec(),
        );
        let inner = test_inner_client(test_builder());
        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", stream)
            .single_shot_builder()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::POST);
        assert_eq!(request.url().as_str(), EXPECTED_UPLOAD_URL);
        let (_metadata, contents) = parse_multipart_body(request).await?;
        assert_eq!(contents, "the quick brown fox jumps over the lazy dog");
        Ok(())
    }

    /// Failing credentials produce an authentication error while building the
    /// request.
    #[tokio::test]
    async fn upload_object_error_credentials() -> Result {
        let inner = test_inner_client(
            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
        );
        let _ = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .single_shot_builder()
            .await
            .inspect_err(|e| assert!(e.is_authentication()))
            .expect_err("invalid credentials should err");
        Ok(())
    }

    /// A bucket name without the `projects/_/buckets/` prefix is rejected.
    #[tokio::test]
    async fn upload_object_bad_bucket() -> Result {
        let inner = test_inner_client(test_builder());
        UploadObject::new(inner, "malformed", "object", "hello")
            .single_shot_builder()
            .await
            .expect_err("malformed bucket string should error");
        Ok(())
    }

    /// A customer-supplied encryption key is sent via the `x-goog-encryption-*`
    /// headers.
    #[tokio::test]
    async fn upload_object_headers() -> Result {
        // Make a 32-byte key.
        let (key, key_base64, _, key_sha256_base64) = create_key_helper();

        let inner = test_inner_client(test_builder());
        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .with_key(KeyAes256::new(&key)?)
            .single_shot_builder()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::POST);
        assert_eq!(request.url().as_str(), EXPECTED_UPLOAD_URL);

        let want = vec![
            ("x-goog-encryption-algorithm", "AES256".to_string()),
            ("x-goog-encryption-key", key_base64),
            ("x-goog-encryption-key-sha256", key_sha256_base64),
        ];

        for (name, value) in want {
            assert_eq!(
                request.headers().get(name).unwrap().as_bytes(),
                bytes::Bytes::from(value)
            );
        }
        Ok(())
    }

    /// Without pre-conditions the upload is not idempotent: the server sees
    /// exactly one request and the transient 503 is returned to the caller.
    #[tokio::test]
    async fn single_shot_retry_transient_not_idempotent() -> Result {
        let server = Server::run();
        let matching = || {
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
                request::query(url_decoded(contains(("name", "object")))),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
        };
        server.expect(
            matching()
                .times(1)
                .respond_with(cycle![status_code(503).body("try-again"),]),
        );

        let inner =
            test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
        let err = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .send_unbuffered()
            .await
            .expect_err("expected error as request is not idempotent");
        assert_eq!(err.http_status_code(), Some(503), "{err:?}");

        Ok(())
    }

    /// With a generation pre-condition, two transient 503s are retried and the
    /// third (successful) response is returned.
    #[tokio::test]
    async fn single_shot_retry_transient_failures_then_success() -> Result {
        let server = Server::run();
        let matching = || {
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("name", "test-object")))),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
        };
        server.expect(matching().times(3).respond_with(cycle![
            status_code(503).body("try-again"),
            status_code(503).body("try-again"),
            json_encoded(response_body()).append_header("content-type", "application/json"),
        ]));

        let inner =
            test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
        let got = UploadObject::new(
            inner,
            "projects/_/buckets/test-bucket",
            "test-object",
            "hello",
        )
        .with_if_generation_match(0)
        .send_unbuffered()
        .await?;
        let want = Object::from(serde_json::from_value::<v1::Object>(response_body())?);
        assert_eq!(got, want);

        Ok(())
    }

    /// With a generation pre-condition, two transient 503s are retried and the
    /// 403 that follows is returned to the caller.
    #[tokio::test]
    async fn single_shot_retry_transient_failures_then_permanent() -> Result {
        let server = Server::run();
        let matching = || {
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("name", "test-object")))),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
        };
        server.expect(matching().times(3).respond_with(cycle![
            status_code(503).body("try-again"),
            status_code(503).body("try-again"),
            status_code(403).body("uh-oh"),
        ]));

        let inner =
            test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
        let err = UploadObject::new(
            inner,
            "projects/_/buckets/test-bucket",
            "test-object",
            "hello",
        )
        .with_if_generation_match(0)
        .send_unbuffered()
        .await
        .expect_err("expected permanent error");
        assert_eq!(err.http_status_code(), Some(403), "{err:?}");

        Ok(())
    }

    /// With an attempt limit of three and a server that always answers 503,
    /// the server sees exactly three requests and the 503 is returned.
    #[tokio::test]
    async fn single_shot_retry_transient_failures_exhausted() -> Result {
        let server = Server::run();
        let matching = || {
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("name", "test-object")))),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
        };
        server.expect(
            matching()
                .times(3)
                .respond_with(status_code(503).body("try-again")),
        );

        let inner =
            test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
        let err = UploadObject::new(
            inner,
            "projects/_/buckets/test-bucket",
            "test-object",
            "hello",
        )
        .with_if_generation_match(0)
        .with_retry_policy(crate::retry_policy::RecommendedPolicy.with_attempt_limit(3))
        .send_unbuffered()
        .await
        .expect_err("expected an error after exhausting the retry attempts");
        assert_eq!(err.http_status_code(), Some(503), "{err:?}");

        Ok(())
    }
}