google_cloud_storage/storage/
upload_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::client::*;
16use super::*;
17use futures::stream::unfold;
18use std::collections::VecDeque;
19use std::sync::Arc;
20use tokio::sync::Mutex;
21
22mod buffered;
23mod unbuffered;
24
25/// A request builder for uploads without rewind.
/// A request builder for uploads without rewind.
pub struct UploadObject<T> {
    // Shared client state: HTTP client, endpoint, and auth headers.
    inner: std::sync::Arc<StorageInner>,
    // Accumulates the object resource and the request preconditions.
    spec: crate::model::WriteObjectSpec,
    // Customer-supplied encryption key parameters, when set via `with_key()`.
    params: Option<crate::model::CommonObjectRequestParams>,
    // We need `Arc<Mutex<>>` because this is re-used in retryable uploads.
    payload: Arc<Mutex<InsertPayload<T>>>,
    // Per-request retry, backoff, throttling, and upload tuning options.
    options: super::request_options::RequestOptions,
}
34
35impl<T> UploadObject<T> {
    /// Set a [request precondition] on the object generation to match.
    ///
    /// With this precondition the request fails if the current object
    /// generation does not match the provided value. A common value is `0`,
    /// which prevents uploads from succeeding if the object already exists.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_generation_match(0)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_generation_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_generation_match = Some(v.into());
        self
    }
63
    /// Set a [request precondition] on the object generation to not match.
    ///
    /// With this precondition the request fails if the current object
    /// generation matches the provided value.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_generation_not_match(0)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_generation_not_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_generation_not_match = Some(v.into());
        self
    }
90
91    /// Set a [request precondition] on the object meta generation.
92    ///
93    /// With this precondition the request fails if the current object metadata
94    /// generation does not match the provided value. This may be useful to
95    /// prevent changes when the metageneration is known.
96    ///
97    /// # Example
98    /// ```
99    /// # use google_cloud_storage::client::Storage;
100    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
101    /// let response = client
102    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
103    ///     .with_if_metageneration_match(1234)
104    ///     .send()
105    ///     .await?;
106    /// println!("response details={response:?}");
107    /// # Ok(()) }
108    /// ```
109    ///
110    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
111    pub fn with_if_metageneration_match<V>(mut self, v: V) -> Self
112    where
113        V: Into<i64>,
114    {
115        self.spec.if_metageneration_match = Some(v.into());
116        self
117    }
118
119    /// Set a [request precondition] on the object meta-generation.
120    ///
121    /// With this precondition the request fails if the current object metadata
122    /// generation matches the provided value. This is rarely useful in uploads,
123    /// it is more commonly used on downloads to prevent downloads if the value
124    /// is already cached.
125    ///
126    /// # Example
127    /// ```
128    /// # use google_cloud_storage::client::Storage;
129    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
130    /// let response = client
131    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
132    ///     .with_if_metageneration_not_match(1234)
133    ///     .send()
134    ///     .await?;
135    /// println!("response details={response:?}");
136    /// # Ok(()) }
137    /// ```
138    ///
139    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
140    pub fn with_if_metageneration_not_match<V>(mut self, v: V) -> Self
141    where
142        V: Into<i64>,
143    {
144        self.spec.if_metageneration_not_match = Some(v.into());
145        self
146    }
147
148    /// Sets the ACL for the new object.
149    ///
150    /// # Example
151    /// ```
152    /// # use google_cloud_storage::client::Storage;
153    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
154    /// # use google_cloud_storage::model::ObjectAccessControl;
155    /// let response = client
156    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
157    ///     .with_acl([ObjectAccessControl::new().set_entity("allAuthenticatedUsers").set_role("READER")])
158    ///     .send()
159    ///     .await?;
160    /// println!("response details={response:?}");
161    /// # Ok(()) }
162    /// ```
163    pub fn with_acl<I, V>(mut self, v: I) -> Self
164    where
165        I: IntoIterator<Item = V>,
166        V: Into<crate::model::ObjectAccessControl>,
167    {
168        self.mut_resource().acl = v.into_iter().map(|a| a.into()).collect();
169        self
170    }
171
    /// Sets the [cache control] for the new object.
    ///
    /// This can be used to control caching in [public objects].
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_cache_control("public, max-age=7200")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [public objects]: https://cloud.google.com/storage/docs/access-control/making-data-public
    /// [cache control]: https://datatracker.ietf.org/doc/html/rfc7234#section-5.2
    pub fn with_cache_control<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().cache_control = v.into();
        self
    }
195
196    /// Sets the [content disposition] for the new object.
197    ///
198    /// Google Cloud Storage can serve content directly to web browsers. This
199    /// attribute sets the `Content-Disposition` header, which may change how
200    /// the browser displays the contents.
201    ///
202    /// # Example
203    /// ```
204    /// # use google_cloud_storage::client::Storage;
205    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
206    /// let response = client
207    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
208    ///     .with_content_disposition("inline")
209    ///     .send()
210    ///     .await?;
211    /// println!("response details={response:?}");
212    /// # Ok(()) }
213    /// ```
214    ///
215    /// [content disposition]: https://datatracker.ietf.org/doc/html/rfc6266
216    pub fn with_content_disposition<V: Into<String>>(mut self, v: V) -> Self {
217        self.mut_resource().content_disposition = v.into();
218        self
219    }
220
    /// Sets the [content encoding] for the object data.
    ///
    /// This can be used to upload compressed data and enable [transcoding] of
    /// the data during downloads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use flate2::write::GzEncoder;
    /// use std::io::Write;
    /// let mut e = GzEncoder::new(Vec::new(), flate2::Compression::default());
    /// e.write_all(b"hello world")?;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", bytes::Bytes::from_owner(e.finish()?))
    ///     .with_content_encoding("gzip")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [transcoding]: https://cloud.google.com/storage/docs/transcoding
    /// [content encoding]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.2
    pub fn with_content_encoding<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_encoding = v.into();
        self
    }
249
250    /// Sets the [content language] for the new object.
251    ///
252    /// Google Cloud Storage can serve content directly to web browsers. This
253    /// attribute sets the `Content-Language` header, which may change how the
254    /// browser displays the contents.
255    ///
256    /// # Example
257    /// ```
258    /// # use google_cloud_storage::client::Storage;
259    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
260    /// let response = client
261    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
262    ///     .with_content_language("en")
263    ///     .send()
264    ///     .await?;
265    /// println!("response details={response:?}");
266    /// # Ok(()) }
267    /// ```
268    ///
269    /// [content language]: https://cloud.google.com/storage/docs/metadata#content-language
270    pub fn with_content_language<V: Into<String>>(mut self, v: V) -> Self {
271        self.mut_resource().content_language = v.into();
272        self
273    }
274
275    /// Sets the [content type] for the new object.
276    ///
277    /// Google Cloud Storage can serve content directly to web browsers. This
278    /// attribute sets the `Content-Type` header, which may change how the
279    /// browser interprets the contents.
280    ///
281    /// # Example
282    /// ```
283    /// # use google_cloud_storage::client::Storage;
284    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
285    /// let response = client
286    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
287    ///     .with_content_type("text/plain")
288    ///     .send()
289    ///     .await?;
290    /// println!("response details={response:?}");
291    /// # Ok(()) }
292    /// ```
293    ///
294    /// [content type]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.5
295    pub fn with_content_type<V: Into<String>>(mut self, v: V) -> Self {
296        self.mut_resource().content_type = v.into();
297        self
298    }
299
300    /// Sets the [custom time] for the new object.
301    ///
302    /// This field is typically set in order to use the [DaysSinceCustomTime]
303    /// condition in Object Lifecycle Management.
304    ///
305    /// # Example
306    /// ```
307    /// # use google_cloud_storage::client::Storage;
308    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
309    /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
310    /// let response = client
311    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
312    ///     .with_custom_time(time)
313    ///     .send()
314    ///     .await?;
315    /// println!("response details={response:?}");
316    /// # Ok(()) }
317    /// ```
318    ///
319    /// [DaysSinceCustomTime]: https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
320    /// [custom time]: https://cloud.google.com/storage/docs/metadata#custom-time
321    pub fn with_custom_time<V: Into<wkt::Timestamp>>(mut self, v: V) -> Self {
322        self.mut_resource().custom_time = Some(v.into());
323        self
324    }
325
326    /// Sets the [event based hold] flag for the new object.
327    ///
328    /// This field is typically set in order to prevent objects from being
329    /// deleted or modified.
330    ///
331    /// # Example
332    /// ```
333    /// # use google_cloud_storage::client::Storage;
334    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
335    /// let response = client
336    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
337    ///     .with_event_based_hold(true)
338    ///     .send()
339    ///     .await?;
340    /// println!("response details={response:?}");
341    /// # Ok(()) }
342    /// ```
343    ///
344    /// [event based hold]: https://cloud.google.com/storage/docs/object-holds
345    pub fn with_event_based_hold<V: Into<bool>>(mut self, v: V) -> Self {
346        self.mut_resource().event_based_hold = Some(v.into());
347        self
348    }
349
    /// Sets the [custom metadata] for the new object.
    ///
    /// This field is typically set to annotate the object with
    /// application-specific metadata.
    ///
    /// Note that the provided pairs replace any previously configured
    /// metadata.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_metadata([("test-only", "true"), ("environment", "qa")])
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [custom metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
    pub fn with_metadata<I, K, V>(mut self, i: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.mut_resource().metadata = i.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
        self
    }
379
380    /// Sets the [retention configuration] for the new object.
381    ///
382    /// # Example
383    /// ```
384    /// # use google_cloud_storage::client::Storage;
385    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
386    /// # use google_cloud_storage::model::object::{Retention, retention};
387    /// let response = client
388    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
389    ///     .with_retention(
390    ///         Retention::new()
391    ///             .set_mode(retention::Mode::Locked)
392    ///             .set_retain_until_time(wkt::Timestamp::try_from("2035-01-01T00:00:00Z")?))
393    ///     .send()
394    ///     .await?;
395    /// println!("response details={response:?}");
396    /// # Ok(()) }
397    /// ```
398    ///
399    /// [retention configuration]: https://cloud.google.com/storage/docs/metadata#retention-config
400    pub fn with_retention<V>(mut self, v: V) -> Self
401    where
402        V: Into<crate::model::object::Retention>,
403    {
404        self.mut_resource().retention = Some(v.into());
405        self
406    }
407
408    /// Sets the [storage class] for the new object.
409    ///
410    /// # Example
411    /// ```
412    /// # use google_cloud_storage::client::Storage;
413    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
414    /// let response = client
415    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
416    ///     .with_storage_class("ARCHIVE")
417    ///     .send()
418    ///     .await?;
419    /// println!("response details={response:?}");
420    /// # Ok(()) }
421    /// ```
422    ///
423    /// [storage class]: https://cloud.google.com/storage/docs/storage-classes
424    pub fn with_storage_class<V>(mut self, v: V) -> Self
425    where
426        V: Into<String>,
427    {
428        self.mut_resource().storage_class = v.into();
429        self
430    }
431
    /// Sets the [temporary hold] flag for the new object.
    ///
    /// This field is typically set in order to prevent objects from being
    /// deleted or modified.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_temporary_hold(true)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [temporary hold]: https://cloud.google.com/storage/docs/object-holds
    pub fn with_temporary_hold<V: Into<bool>>(mut self, v: V) -> Self {
        self.mut_resource().temporary_hold = v.into();
        self
    }
456
457    /// Sets the resource name of the [Customer-managed encryption key] for this
458    /// object.
459    ///
460    /// The service imposes a number of restrictions on the keys used to encrypt
461    /// Google Cloud Storage objects. Read the documentation in full before
462    /// trying to use customer-managed encryption keys. In particular, verify
463    /// the service has the necessary permissions, and the key is in a
464    /// compatible location.
465    ///
466    /// # Example
467    /// ```
468    /// # use google_cloud_storage::client::Storage;
469    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
470    /// let response = client
471    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
472    ///     .with_kms_key("projects/test-project/locations/us-central1/keyRings/test-ring/cryptoKeys/test-key")
473    ///     .send()
474    ///     .await?;
475    /// println!("response details={response:?}");
476    /// # Ok(()) }
477    /// ```
478    ///
479    /// [Customer-managed encryption key]: https://cloud.google.com/storage/docs/encryption/customer-managed-keys
480    pub fn with_kms_key<V>(mut self, v: V) -> Self
481    where
482        V: Into<String>,
483    {
484        self.mut_resource().kms_key = v.into();
485        self
486    }
487
488    /// Configure this object to use one of the [predefined ACLs].
489    ///
490    /// # Example
491    /// ```
492    /// # use google_cloud_storage::client::Storage;
493    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
494    /// let response = client
495    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
496    ///     .with_predefined_acl("private")
497    ///     .send()
498    ///     .await?;
499    /// println!("response details={response:?}");
500    /// # Ok(()) }
501    /// ```
502    ///
503    /// [predefined ACLs]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
504    pub fn with_predefined_acl<V>(mut self, v: V) -> Self
505    where
506        V: Into<String>,
507    {
508        self.spec.predefined_acl = v.into();
509        self
510    }
511
512    /// The encryption key used with the Customer-Supplied Encryption Keys
513    /// feature. In raw bytes format (not base64-encoded).
514    ///
515    /// # Example
516    /// ```
517    /// # use google_cloud_storage::client::Storage;
518    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
519    /// # use google_cloud_storage::client::KeyAes256;
520    /// let key: &[u8] = &[97; 32];
521    /// let response = client
522    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
523    ///     .with_key(KeyAes256::new(key)?)
524    ///     .send()
525    ///     .await?;
526    /// println!("response details={response:?}");
527    /// # Ok(()) }
528    /// ```
529    pub fn with_key(mut self, v: KeyAes256) -> Self {
530        self.params = Some(v.into());
531        self
532    }
533
534    // TODO(#2050) - this should be automatically computed?
535    #[allow(dead_code)]
536    fn with_crc32c<V>(mut self, v: V) -> Self
537    where
538        V: Into<u32>,
539    {
540        let mut checksum = self.mut_resource().checksums.take().unwrap_or_default();
541        checksum.crc32c = Some(v.into());
542        self.mut_resource().checksums = Some(checksum);
543        self
544    }
545
546    // TODO(#2050) - this should be automatically computed?
547    #[allow(dead_code)]
548    fn with_md5_hash<I, V>(mut self, i: I) -> Self
549    where
550        I: IntoIterator<Item = V>,
551        V: Into<u8>,
552    {
553        let mut checksum = self.mut_resource().checksums.take().unwrap_or_default();
554        checksum.md5_hash = i.into_iter().map(|v| v.into()).collect();
555        // TODO(#2050) - should we return an error (or panic?) if the size is wrong?
556        self.mut_resource().checksums = Some(checksum);
557        self
558    }
559
560    /// The retry policy used for this request.
561    ///
562    /// # Example
563    /// ```
564    /// # use google_cloud_storage::client::Storage;
565    /// # use google_cloud_storage::retry_policy::RecommendedPolicy;
566    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
567    /// use std::time::Duration;
568    /// use gax::retry_policy::RetryPolicyExt;
569    /// let response = client
570    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
571    ///     .with_retry_policy(RecommendedPolicy
572    ///         .with_attempt_limit(5)
573    ///         .with_time_limit(Duration::from_secs(10)),
574    ///     )
575    ///     .send()
576    ///     .await?;
577    /// println!("response details={response:?}");
578    /// # Ok(()) }
579    /// ```
580    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
581        self.options.retry_policy = v.into().into();
582        self
583    }
584
585    /// The backoff policy used for this request.
586    ///
587    /// # Example
588    /// ```
589    /// # use google_cloud_storage::client::Storage;
590    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
591    /// use std::time::Duration;
592    /// use gax::exponential_backoff::ExponentialBackoff;
593    /// let response = client
594    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
595    ///     .with_backoff_policy(ExponentialBackoff::default())
596    ///     .send()
597    ///     .await?;
598    /// println!("response details={response:?}");
599    /// # Ok(()) }
600    /// ```
601    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
602        mut self,
603        v: V,
604    ) -> Self {
605        self.options.backoff_policy = v.into().into();
606        self
607    }
608
609    /// The retry throttler used for this request.
610    ///
611    /// Most of the time you want to use the same throttler for all the requests
612    /// in a client, and even the same throttler for many clients. Rarely it
613    /// maybe be necessary to use an ad-hoc throttler for some subset of the
614    /// requests.
615    ///
616    /// # Example
617    /// ```
618    /// # use google_cloud_storage::client::Storage;
619    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
620    /// let response = client
621    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
622    ///     .with_retry_throttler(adhoc_throttler())
623    ///     .send()
624    ///     .await?;
625    /// println!("response details={response:?}");
626    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
627    ///     # panic!();
628    /// }
629    /// # Ok(()) }
630    /// ```
631    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
632        mut self,
633        v: V,
634    ) -> Self {
635        self.options.retry_throttler = v.into().into();
636        self
637    }
638
639    /// Sets the payload size threshold to switch from single-shot to resumable uploads.
640    ///
641    /// # Example
642    /// ```
643    /// # use google_cloud_storage::client::Storage;
644    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
645    /// let response = client
646    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
647    ///     .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
648    ///     .send()
649    ///     .await?;
650    /// println!("response details={response:?}");
651    /// # Ok(()) }
652    /// ```
653    ///
654    /// The client library can perform uploads using [single-shot] or
655    /// [resumable] uploads. For small objects, single-shot uploads offer better
656    /// performance, as they require a single HTTP transfer. For larger objects,
657    /// the additional request latency is not significant, and resumable uploads
658    /// offer better recovery on errors.
659    ///
660    /// The library automatically selects resumable uploads when the payload is
661    /// equal to or larger than this option. For smaller uploads the client
662    /// library uses single-shot uploads.
663    ///
664    /// The exact threshold depends on where the application is deployed and
665    /// destination bucket location with respect to where the application is
666    /// running. The library defaults should work well in most cases, but some
667    /// applications may benefit from fine-tuning.
668    ///
669    /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
670    /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
671    pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
672        self.options.resumable_upload_threshold = v.into();
673        self
674    }
675
676    /// Changes the buffer size for some resumable uploads.
677    ///
678    /// # Example
679    /// ```
680    /// # use google_cloud_storage::client::Storage;
681    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
682    /// let response = client
683    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
684    ///     .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
685    ///     .send()
686    ///     .await?;
687    /// println!("response details={response:?}");
688    /// # Ok(()) }
689    /// ```
690    ///
691    /// When performing [resumable uploads] from sources without [Seek] the
692    /// client library needs to buffer data in memory until it is persisted by
693    /// the service. Otherwise the data would be lost if the upload fails.
694    /// Applications may want to tune this buffer size:
695    ///
696    /// - Use smaller buffer sizes to support more concurrent uploads in the
697    ///   same application.
698    /// - Use larger buffer sizes for better throughput. Sending many small
699    ///   buffers stalls the upload until the client receives a successful
700    ///   response from the service.
701    ///
702    /// Keep in mind that there are diminishing returns on using larger buffers.
703    ///
704    /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
705    /// [Seek]: crate::upload_source::Seek
706    pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
707        self.options.resumable_upload_buffer_size = v.into();
708        self
709    }
710
711    fn mut_resource(&mut self) -> &mut crate::model::Object {
712        self.spec
713            .resource
714            .as_mut()
715            .expect("resource field initialized in `new()`")
716    }
717
718    fn resource(&self) -> &crate::model::Object {
719        self.spec
720            .resource
721            .as_ref()
722            .expect("resource field initialized in `new()`")
723    }
724
725    pub(crate) fn new<B, O, P>(
726        inner: std::sync::Arc<StorageInner>,
727        bucket: B,
728        object: O,
729        payload: P,
730    ) -> Self
731    where
732        B: Into<String>,
733        O: Into<String>,
734        P: Into<InsertPayload<T>>,
735    {
736        let options = inner.options.clone();
737        let resource = crate::model::Object::new()
738            .set_bucket(bucket)
739            .set_name(object);
740        UploadObject {
741            inner,
742            spec: crate::model::WriteObjectSpec::new().set_resource(resource),
743            params: None,
744            payload: Arc::new(Mutex::new(payload.into())),
745            options,
746        }
747    }
748
749    async fn start_resumable_upload(&self) -> Result<String>
750    where
751        T: Send + Sync + 'static,
752    {
753        let id = gax::retry_loop_internal::retry_loop(
754            // TODO(#2044) - we need to apply any timeouts here.
755            async |_| self.start_resumable_upload_attempt().await,
756            async |duration| tokio::time::sleep(duration).await,
757            // Creating a resumable upload is always idempotent. There are no
758            // **observable** side-effects if executed multiple times. Any extra
759            // sessions created in the retry loop are simply lost and eventually
760            // garbage collected.
761            true,
762            self.options.retry_throttler.clone(),
763            self.options.retry_policy.clone(),
764            self.options.backoff_policy.clone(),
765        )
766        .await?;
767        Ok(id)
768    }
769
770    async fn start_resumable_upload_attempt(&self) -> Result<String> {
771        let builder = self.start_resumable_upload_request().await?;
772        let response = builder.send().await.map_err(Error::io)?;
773        self::handle_start_resumable_upload_response(response).await
774    }
775
    /// Builds the HTTP request that starts a resumable upload session.
    ///
    /// Issues a `POST` against the JSON API upload endpoint with
    /// `uploadType=resumable`, attaching the preconditions, the
    /// customer-supplied encryption headers, the auth headers, and the object
    /// resource as the JSON request body.
    async fn start_resumable_upload_request(&self) -> Result<reqwest::RequestBuilder> {
        let bucket = &self.resource().bucket;
        // The model stores the full resource name (`projects/_/buckets/{id}`),
        // but the URL path only takes the bucket id.
        let bucket_id = bucket.strip_prefix("projects/_/buckets/").ok_or_else(|| {
            Error::binding(format!(
                "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
            ))
        })?;
        let object = &self.resource().name;
        let builder = self
            .inner
            .client
            .request(
                reqwest::Method::POST,
                format!("{}/upload/storage/v1/b/{bucket_id}/o", &self.inner.endpoint),
            )
            .query(&[("uploadType", "resumable")])
            // The object name may contain characters that must be percent-encoded.
            .query(&[("name", enc(object))])
            .header("content-type", "application/json")
            .header(
                "x-goog-api-client",
                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
            );

        let builder = self.apply_preconditions(builder);
        let builder = apply_customer_supplied_encryption_headers(builder, &self.params);
        let builder = self.inner.apply_auth_headers(builder).await?;
        let builder = builder.json(&v1::insert_body(self.resource()));
        Ok(builder)
    }
805
806    fn apply_preconditions(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
807        let builder = self
808            .spec
809            .if_generation_match
810            .iter()
811            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
812        let builder = self
813            .spec
814            .if_generation_not_match
815            .iter()
816            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
817        let builder = self
818            .spec
819            .if_metageneration_match
820            .iter()
821            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
822        let builder = self
823            .spec
824            .if_metageneration_not_match
825            .iter()
826            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
827
828        [
829            ("kmsKeyName", self.resource().kms_key.as_str()),
830            ("predefinedAcl", self.spec.predefined_acl.as_str()),
831        ]
832        .into_iter()
833        .fold(
834            builder,
835            |b, (k, v)| if v.is_empty() { b } else { b.query(&[(k, v)]) },
836        )
837    }
838}
839
840async fn handle_start_resumable_upload_response(response: reqwest::Response) -> Result<String> {
841    if !response.status().is_success() {
842        return gaxi::http::to_http_error(response).await;
843    }
844    let location = response
845        .headers()
846        .get("Location")
847        .ok_or_else(|| Error::deser("missing Location header in start resumable upload"))?;
848    location.to_str().map_err(Error::deser).map(str::to_string)
849}
850
851#[cfg(test)]
852mod tests {
853    use super::client::tests::{create_key_helper, test_builder, test_inner_client};
854    use super::*;
855    use crate::model::WriteObjectSpec;
856    use gax::retry_policy::RetryPolicyExt;
857    use gax::retry_result::RetryResult;
858    use httptest::{Expectation, Server, matchers::*, responders::status_code};
859    use serde_json::{Value, json};
860    use std::collections::BTreeMap;
861    use std::time::Duration;
862
    // Shorthand so every test can use `?` and return `anyhow::Result<()>`.
    type Result = anyhow::Result<()>;
864
    // Verify that every builder setter is reflected either in the write
    // object spec (preconditions, predefined ACL) or in the object resource
    // (metadata) sent to the service.
    #[test]
    fn upload_object_unbuffered_metadata() -> Result {
        use crate::model::ObjectAccessControl;
        let inner = test_inner_client(test_builder());
        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_if_generation_match(10)
            .with_if_generation_not_match(20)
            .with_if_metageneration_match(30)
            .with_if_metageneration_not_match(40)
            .with_predefined_acl("private")
            .with_acl([ObjectAccessControl::new()
                .set_entity("allAuthenticatedUsers")
                .set_role("READER")])
            .with_cache_control("public; max-age=7200")
            .with_content_disposition("inline")
            .with_content_encoding("gzip")
            .with_content_language("en")
            .with_content_type("text/plain")
            .with_crc32c(crc32c::crc32c(b""))
            .with_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
            .with_event_based_hold(true)
            .with_md5_hash(md5::compute(b"").0)
            .with_metadata([("k0", "v0"), ("k1", "v1")])
            .with_retention(
                crate::model::object::Retention::new()
                    .set_mode(crate::model::object::retention::Mode::Locked)
                    .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
            )
            .with_storage_class("ARCHIVE")
            .with_temporary_hold(true)
            .with_kms_key("test-key");

        // Pull the resource out so the spec and the resource can be compared
        // independently below.
        let resource = request.spec.resource.take().unwrap();
        let request = request;
        // The preconditions and the predefined ACL land in the spec.
        assert_eq!(
            &request.spec,
            &WriteObjectSpec::new()
                .set_if_generation_match(10)
                .set_if_generation_not_match(20)
                .set_if_metageneration_match(30)
                .set_if_metageneration_not_match(40)
                .set_predefined_acl("private")
        );

        // All object metadata lands in the resource.
        assert_eq!(
            resource,
            Object::new()
                .set_name("object")
                .set_bucket("projects/_/buckets/bucket")
                .set_acl([ObjectAccessControl::new()
                    .set_entity("allAuthenticatedUsers")
                    .set_role("READER")])
                .set_cache_control("public; max-age=7200")
                .set_content_disposition("inline")
                .set_content_encoding("gzip")
                .set_content_language("en")
                .set_content_type("text/plain")
                .set_checksums(
                    crate::model::ObjectChecksums::new()
                        .set_crc32c(crc32c::crc32c(b""))
                        .set_md5_hash(bytes::Bytes::from_iter(md5::compute(b"").0))
                )
                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
                .set_event_based_hold(true)
                .set_metadata([("k0", "v0"), ("k1", "v1")])
                .set_retention(
                    crate::model::object::Retention::new()
                        .set_mode("LOCKED")
                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?)
                )
                .set_storage_class("ARCHIVE")
                .set_temporary_hold(true)
                .set_kms_key("test-key")
        );

        Ok(())
    }
942
943    #[test]
944    fn upload_object_options() {
945        let inner = test_inner_client(
946            test_builder()
947                .with_resumable_upload_threshold(123_usize)
948                .with_resumable_upload_buffer_size(234_usize),
949        );
950        let request = UploadObject::new(inner.clone(), "projects/_/buckets/bucket", "object", "");
951        assert_eq!(request.options.resumable_upload_threshold, 123);
952        assert_eq!(request.options.resumable_upload_buffer_size, 234);
953
954        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
955            .with_resumable_upload_threshold(345_usize)
956            .with_resumable_upload_buffer_size(456_usize);
957        assert_eq!(request.options.resumable_upload_threshold, 345);
958        assert_eq!(request.options.resumable_upload_buffer_size, 456);
959    }
960
    // Verify the shape of the request that starts a resumable upload:
    // method, URL, query parameters, and (empty) JSON body.
    #[tokio::test]
    async fn start_resumable_upload() -> Result {
        let inner = test_inner_client(test_builder());
        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .start_resumable_upload_request()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::POST);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=resumable&name=object"
        );
        // No metadata fields were set, so the body is an empty JSON object.
        let body = request.body_mut().take().unwrap();
        let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
        let json = serde_json::from_slice::<Value>(&contents)?;
        assert_eq!(json, json!({}));
        Ok(())
    }
980
    // Verify that a customer-supplied encryption key is sent as the
    // `x-goog-encryption-*` request headers.
    #[tokio::test]
    async fn start_resumable_upload_headers() -> Result {
        // Make a 32-byte key.
        let (key, key_base64, _, key_sha256_base64) = create_key_helper();

        let inner = test_inner_client(test_builder());
        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .with_key(KeyAes256::new(&key)?)
            .start_resumable_upload_request()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::POST);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=resumable&name=object"
        );

        // The key and its SHA-256 digest are sent base64-encoded.
        let want = vec![
            ("x-goog-encryption-algorithm", "AES256".to_string()),
            ("x-goog-encryption-key", key_base64),
            ("x-goog-encryption-key-sha256", key_sha256_base64),
        ];

        for (name, value) in want {
            assert_eq!(
                request.headers().get(name).unwrap().as_bytes(),
                bytes::Bytes::from(value)
            );
        }
        Ok(())
    }
1013
1014    #[tokio::test]
1015    async fn start_resumable_upload_bad_bucket() -> Result {
1016        let inner = test_inner_client(test_builder());
1017        UploadObject::new(inner, "malformed", "object", "hello")
1018            .start_resumable_upload_request()
1019            .await
1020            .expect_err("malformed bucket string should error");
1021        Ok(())
1022    }
1023
    // Verify that the preconditions become query parameters and the object
    // metadata becomes the JSON body of the request that starts the upload.
    #[tokio::test]
    async fn start_resumable_upload_metadata_in_request() -> Result {
        use crate::model::ObjectAccessControl;
        let inner = test_inner_client(test_builder());
        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_if_generation_match(10)
            .with_if_generation_not_match(20)
            .with_if_metageneration_match(30)
            .with_if_metageneration_not_match(40)
            .with_predefined_acl("private")
            .with_acl([ObjectAccessControl::new()
                .set_entity("allAuthenticatedUsers")
                .set_role("READER")])
            .with_cache_control("public; max-age=7200")
            .with_content_disposition("inline")
            .with_content_encoding("gzip")
            .with_content_language("en")
            .with_content_type("text/plain")
            .with_crc32c(crc32c::crc32c(b""))
            .with_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
            .with_event_based_hold(true)
            .with_md5_hash(md5::compute(b"").0)
            .with_metadata([("k0", "v0"), ("k1", "v1")])
            .with_retention(
                crate::model::object::Retention::new()
                    .set_mode(crate::model::object::retention::Mode::Locked)
                    .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
            )
            .with_storage_class("ARCHIVE")
            .with_temporary_hold(true)
            .with_kms_key("test-key")
            .start_resumable_upload_request()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::POST);
        // Compare the query parameters as maps, so the comparison does not
        // depend on their order in the URL.
        let want_pairs: BTreeMap<String, String> = [
            ("uploadType", "resumable"),
            ("name", "object"),
            ("ifGenerationMatch", "10"),
            ("ifGenerationNotMatch", "20"),
            ("ifMetagenerationMatch", "30"),
            ("ifMetagenerationNotMatch", "40"),
            ("kmsKeyName", "test-key"),
            ("predefinedAcl", "private"),
        ]
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
        let query_pairs: BTreeMap<String, String> = request
            .url()
            .query_pairs()
            .map(|param| (param.0.to_string(), param.1.to_string()))
            .collect();
        assert_eq!(query_pairs, want_pairs);

        // The remaining metadata is serialized into the JSON body.
        let body = request.body_mut().take().unwrap();
        let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
        let json = serde_json::from_slice::<Value>(&contents)?;
        assert_eq!(
            json,
            json!({
                "acl": [{"entity": "allAuthenticatedUsers", "role": "READER"}],
                "cacheControl": "public; max-age=7200",
                "contentDisposition": "inline",
                "contentEncoding": "gzip",
                "contentLanguage": "en",
                "contentType": "text/plain",
                "crc32c": "AAAAAA==",
                "customTime": "2025-07-07T18:11:00Z",
                "eventBasedHold": true,
                "md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
                "metadata": {"k0": "v0", "k1": "v1"},
                "retention": {"mode": "LOCKED", "retainUntilTime": "2035-07-07T18:14:00Z"},
                "storageClass": "ARCHIVE",
                "temporaryHold": true,
            })
        );
        Ok(())
    }
1104
1105    #[tokio::test]
1106    async fn start_resumable_upload_credentials() -> Result {
1107        let inner = test_inner_client(
1108            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
1109        );
1110        let _ = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
1111            .start_resumable_upload_request()
1112            .await
1113            .inspect_err(|e| assert!(e.is_authentication()))
1114            .expect_err("invalid credentials should err");
1115        Ok(())
1116    }
1117
    // On success the service returns the session URL in the `Location`
    // header; verify it is returned to the application on the first attempt.
    #[tokio::test]
    async fn start_resumable_upload_immediate_success() -> Result {
        let server = Server::run();
        let session = server.url("/upload/session/test-only-001");
        let want = session.to_string();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
                request::query(url_decoded(contains(("name", "object")))),
                request::query(url_decoded(contains(("uploadType", "resumable")))),
            ])
            .respond_with(
                status_code(200)
                    .append_header("location", session.to_string())
                    .body(""),
            ),
        );

        let inner =
            test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
        let got = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .start_resumable_upload()
            .await?;
        assert_eq!(got, want);

        Ok(())
    }
1145
    // Permanent errors, such as `403 Forbidden`, are returned to the
    // application without retries and carry the HTTP status code.
    #[tokio::test]
    async fn start_resumable_upload_immediate_error() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
                request::query(url_decoded(contains(("name", "object")))),
                request::query(url_decoded(contains(("uploadType", "resumable")))),
            ])
            .respond_with(status_code(403).body("uh-oh")),
        );

        let inner =
            test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
        let err = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .start_resumable_upload()
            .await
            .expect_err("request should fail");
        assert_eq!(err.http_status_code(), Some(403), "{err:?}");

        Ok(())
    }
1168
    // Transient errors (503) are retried until the request succeeds.
    #[tokio::test]
    async fn start_resumable_upload_retry_transient_failures_then_success() -> Result {
        use httptest::responders::cycle;
        let server = Server::run();
        let session = server.url("/upload/session/test-only-001");
        let want = session.to_string();
        let matching = || {
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
                request::query(url_decoded(contains(("name", "object")))),
                request::query(url_decoded(contains(("uploadType", "resumable")))),
            ])
        };
        // Fail twice with a retryable status, then succeed on the third try.
        server.expect(matching().times(3).respond_with(cycle![
            status_code(503).body("try-again"),
            status_code(503).body("try-again"),
            status_code(200).append_header("location", session.to_string()),
        ]));

        let inner =
            test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
        let got = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .start_resumable_upload()
            .await?;
        assert_eq!(got, want);

        Ok(())
    }
1197
1198    // Verify the retry options are used and that exhausted policies result in
1199    // errors.
    #[tokio::test]
    async fn start_resumable_upload_request_retry_options() -> Result {
        let server = Server::run();
        let matching = || {
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
                request::query(url_decoded(contains(("name", "object")))),
                request::query(url_decoded(contains(("uploadType", "resumable")))),
            ])
        };
        // Always respond with a retryable error; the attempt limit below
        // stops the loop.
        server.expect(
            matching()
                .times(3)
                .respond_with(status_code(503).body("try-again")),
        );

        let inner =
            test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
        // The mocks verify each policy is consulted on every failure.
        let mut retry = MockRetryPolicy::new();
        retry
            .expect_on_error()
            .times(1..)
            .returning(|_, _, _, e| RetryResult::Continue(e));

        let mut backoff = MockBackoffPolicy::new();
        backoff
            .expect_on_failure()
            .times(1..)
            .return_const(Duration::from_micros(1));

        let mut throttler = MockRetryThrottler::new();
        throttler
            .expect_throttle_retry_attempt()
            .times(1..)
            .return_const(false);
        throttler
            .expect_on_retry_failure()
            .times(1..)
            .return_const(());
        // No request succeeds, so `on_success` must never be called.
        throttler.expect_on_success().never().return_const(());

        let err = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .with_retry_policy(retry.with_attempt_limit(3))
            .with_backoff_policy(backoff)
            .with_retry_throttler(throttler)
            .start_resumable_upload()
            .await
            .expect_err("request should fail after 3 retry attempts");
        assert_eq!(err.http_status_code(), Some(503), "{err:?}");

        Ok(())
    }
1252
1253    // Verify the client retry options are used and that exhausted policies
1254    // result in errors.
    #[tokio::test]
    async fn start_resumable_upload_client_retry_options() -> Result {
        let server = Server::run();
        let matching = || {
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
                request::query(url_decoded(contains(("name", "object")))),
                request::query(url_decoded(contains(("uploadType", "resumable")))),
            ])
        };
        // Always respond with a retryable error; the attempt limit below
        // stops the loop.
        server.expect(
            matching()
                .times(3)
                .respond_with(status_code(503).body("try-again")),
        );

        // The mocks verify each policy is consulted on every failure.
        let mut retry = MockRetryPolicy::new();
        retry
            .expect_on_error()
            .times(1..)
            .returning(|_, _, _, e| RetryResult::Continue(e));

        let mut backoff = MockBackoffPolicy::new();
        backoff
            .expect_on_failure()
            .times(1..)
            .return_const(Duration::from_micros(1));

        let mut throttler = MockRetryThrottler::new();
        throttler
            .expect_throttle_retry_attempt()
            .times(1..)
            .return_const(false);
        throttler
            .expect_on_retry_failure()
            .times(1..)
            .return_const(());
        // No request succeeds, so `on_success` must never be called.
        throttler.expect_on_success().never().return_const(());

        // Unlike the test above, the policies are set on the client and the
        // request builder inherits them.
        let inner = test_inner_client(
            test_builder()
                .with_endpoint(format!("http://{}", server.addr()))
                .with_retry_policy(retry.with_attempt_limit(3))
                .with_backoff_policy(backoff)
                .with_retry_throttler(throttler),
        );
        let err = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
            .start_resumable_upload()
            .await
            .expect_err("request should fail after 3 retry attempts");
        assert_eq!(err.http_status_code(), Some(503), "{err:?}");

        Ok(())
    }
1309
    // Mock retry throttler, used to verify the upload consults the
    // configured throttler on each attempt.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryThrottler {}

        impl gax::retry_throttler::RetryThrottler for RetryThrottler {
            fn throttle_retry_attempt(&self) -> bool;
            fn on_retry_failure(&mut self, flow: &RetryResult);
            fn on_success(&mut self);
        }
    }
1320
    // Mock retry policy, used to verify the upload consults the configured
    // policy on each error.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryPolicy {}

        impl gax::retry_policy::RetryPolicy for RetryPolicy {
            fn on_error(&self, loop_start: std::time::Instant, attempt_count: u32, idempotent: bool, error: gax::error::Error) -> RetryResult;
        }
    }
1329
    // Mock backoff policy, used to verify the upload consults the configured
    // policy between attempts (and to keep the test fast).
    mockall::mock! {
        #[derive(Debug)]
        pub BackoffPolicy {}

        impl gax::backoff_policy::BackoffPolicy for BackoffPolicy {
            fn on_failure(&self, loop_start: std::time::Instant, attempt_count: u32) -> std::time::Duration;
        }
    }
1338}