google_cloud_storage/storage/upload_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::client::*;
16use super::*;
17use futures::stream::unfold;
18use std::collections::VecDeque;
19use std::sync::Arc;
20use tokio::sync::Mutex;
21
22mod buffered;
23mod unbuffered;
24
25/// A request builder for uploads without rewind.
26pub struct UploadObject<T> {
27 inner: std::sync::Arc<StorageInner>,
28 spec: crate::model::WriteObjectSpec,
29 params: Option<crate::model::CommonObjectRequestParams>,
30 // We need `Arc<Mutex<>>` because this is re-used in retryable uploads.
31 payload: Arc<Mutex<InsertPayload<T>>>,
32 options: super::request_options::RequestOptions,
33}
34
35impl<T> UploadObject<T> {
36 /// Set a [request precondition] on the object generation to match.
37 ///
38 /// With this precondition the request fails if the current object
39 /// generation matches the provided value. A common value is `0`, which
40 /// prevents uploads from succeeding if the object already exists.
41 ///
42 /// # Example
43 /// ```
44 /// # use google_cloud_storage::client::Storage;
45 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
46 /// let response = client
47 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
48 /// .with_if_generation_match(0)
49 /// .send()
50 /// .await?;
51 /// println!("response details={response:?}");
52 /// # Ok(()) }
53 /// ```
54 ///
55 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
56 pub fn with_if_generation_match<V>(mut self, v: V) -> Self
57 where
58 V: Into<i64>,
59 {
60 self.spec.if_generation_match = Some(v.into());
61 self
62 }
63
64 /// Set a [request precondition] on the object generation to match.
65 ///
66 /// With this precondition the request fails if the current object
67 /// generation does not match the provided value.
68 ///
69 /// # Example
70 /// ```
71 /// # use google_cloud_storage::client::Storage;
72 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
73 /// let response = client
74 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
75 /// .with_if_generation_not_match(0)
76 /// .send()
77 /// .await?;
78 /// println!("response details={response:?}");
79 /// # Ok(()) }
80 /// ```
81 ///
82 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
83 pub fn with_if_generation_not_match<V>(mut self, v: V) -> Self
84 where
85 V: Into<i64>,
86 {
87 self.spec.if_generation_not_match = Some(v.into());
88 self
89 }
90
91 /// Set a [request precondition] on the object meta generation.
92 ///
93 /// With this precondition the request fails if the current object metadata
94 /// generation does not match the provided value. This may be useful to
95 /// prevent changes when the metageneration is known.
96 ///
97 /// # Example
98 /// ```
99 /// # use google_cloud_storage::client::Storage;
100 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
101 /// let response = client
102 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
103 /// .with_if_metageneration_match(1234)
104 /// .send()
105 /// .await?;
106 /// println!("response details={response:?}");
107 /// # Ok(()) }
108 /// ```
109 ///
110 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
111 pub fn with_if_metageneration_match<V>(mut self, v: V) -> Self
112 where
113 V: Into<i64>,
114 {
115 self.spec.if_metageneration_match = Some(v.into());
116 self
117 }
118
119 /// Set a [request precondition] on the object meta-generation.
120 ///
121 /// With this precondition the request fails if the current object metadata
122 /// generation matches the provided value. This is rarely useful in uploads,
123 /// it is more commonly used on downloads to prevent downloads if the value
124 /// is already cached.
125 ///
126 /// # Example
127 /// ```
128 /// # use google_cloud_storage::client::Storage;
129 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
130 /// let response = client
131 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
132 /// .with_if_metageneration_not_match(1234)
133 /// .send()
134 /// .await?;
135 /// println!("response details={response:?}");
136 /// # Ok(()) }
137 /// ```
138 ///
139 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
140 pub fn with_if_metageneration_not_match<V>(mut self, v: V) -> Self
141 where
142 V: Into<i64>,
143 {
144 self.spec.if_metageneration_not_match = Some(v.into());
145 self
146 }
147
148 /// Sets the ACL for the new object.
149 ///
150 /// # Example
151 /// ```
152 /// # use google_cloud_storage::client::Storage;
153 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
154 /// # use google_cloud_storage::model::ObjectAccessControl;
155 /// let response = client
156 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
157 /// .with_acl([ObjectAccessControl::new().set_entity("allAuthenticatedUsers").set_role("READER")])
158 /// .send()
159 /// .await?;
160 /// println!("response details={response:?}");
161 /// # Ok(()) }
162 /// ```
163 pub fn with_acl<I, V>(mut self, v: I) -> Self
164 where
165 I: IntoIterator<Item = V>,
166 V: Into<crate::model::ObjectAccessControl>,
167 {
168 self.mut_resource().acl = v.into_iter().map(|a| a.into()).collect();
169 self
170 }
171
172 /// Sets the [cache control] for the new object.
173 ///
174 /// This can be used to control caching in [public objects].
175 ///
176 /// # Example
177 /// ```
178 /// # use google_cloud_storage::client::Storage;
179 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
180 /// let response = client
181 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
182 /// .with_cache_control("public; max-age=7200")
183 /// .send()
184 /// .await?;
185 /// println!("response details={response:?}");
186 /// # Ok(()) }
187 /// ```
188 ///
189 /// [public objects]: https://cloud.google.com/storage/docs/access-control/making-data-public
190 /// [cache control]: https://datatracker.ietf.org/doc/html/rfc7234#section-5.2
191 pub fn with_cache_control<V: Into<String>>(mut self, v: V) -> Self {
192 self.mut_resource().cache_control = v.into();
193 self
194 }
195
196 /// Sets the [content disposition] for the new object.
197 ///
198 /// Google Cloud Storage can serve content directly to web browsers. This
199 /// attribute sets the `Content-Disposition` header, which may change how
200 /// the browser displays the contents.
201 ///
202 /// # Example
203 /// ```
204 /// # use google_cloud_storage::client::Storage;
205 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
206 /// let response = client
207 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
208 /// .with_content_disposition("inline")
209 /// .send()
210 /// .await?;
211 /// println!("response details={response:?}");
212 /// # Ok(()) }
213 /// ```
214 ///
215 /// [content disposition]: https://datatracker.ietf.org/doc/html/rfc6266
216 pub fn with_content_disposition<V: Into<String>>(mut self, v: V) -> Self {
217 self.mut_resource().content_disposition = v.into();
218 self
219 }
220
221 /// Sets the [content encoding] for the object data.
222 ///
223 /// This can be used to upload compressed data and enable [transcoding] of
224 /// the data during downloads.
225 ///
226 /// # Example
227 /// ```
228 /// # use google_cloud_storage::client::Storage;
229 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
230 /// use flate2::write::GzEncoder;
231 /// use std::io::Write;
232 /// let mut e = GzEncoder::new(Vec::new(), flate2::Compression::default());
233 /// e.write_all(b"hello world");
234 /// let response = client
235 /// .upload_object("projects/_/buckets/my-bucket", "my-object", bytes::Bytes::from_owner(e.finish()?))
236 /// .with_content_encoding("gzip")
237 /// .send()
238 /// .await?;
239 /// println!("response details={response:?}");
240 /// # Ok(()) }
241 /// ```
242 ///
243 /// [transcoding]: https://cloud.google.com/storage/docs/transcoding
244 /// [content encoding]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.2
245 pub fn with_content_encoding<V: Into<String>>(mut self, v: V) -> Self {
246 self.mut_resource().content_encoding = v.into();
247 self
248 }
249
250 /// Sets the [content language] for the new object.
251 ///
252 /// Google Cloud Storage can serve content directly to web browsers. This
253 /// attribute sets the `Content-Language` header, which may change how the
254 /// browser displays the contents.
255 ///
256 /// # Example
257 /// ```
258 /// # use google_cloud_storage::client::Storage;
259 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
260 /// let response = client
261 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
262 /// .with_content_language("en")
263 /// .send()
264 /// .await?;
265 /// println!("response details={response:?}");
266 /// # Ok(()) }
267 /// ```
268 ///
269 /// [content language]: https://cloud.google.com/storage/docs/metadata#content-language
270 pub fn with_content_language<V: Into<String>>(mut self, v: V) -> Self {
271 self.mut_resource().content_language = v.into();
272 self
273 }
274
275 /// Sets the [content type] for the new object.
276 ///
277 /// Google Cloud Storage can serve content directly to web browsers. This
278 /// attribute sets the `Content-Type` header, which may change how the
279 /// browser interprets the contents.
280 ///
281 /// # Example
282 /// ```
283 /// # use google_cloud_storage::client::Storage;
284 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
285 /// let response = client
286 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
287 /// .with_content_type("text/plain")
288 /// .send()
289 /// .await?;
290 /// println!("response details={response:?}");
291 /// # Ok(()) }
292 /// ```
293 ///
294 /// [content type]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.5
295 pub fn with_content_type<V: Into<String>>(mut self, v: V) -> Self {
296 self.mut_resource().content_type = v.into();
297 self
298 }
299
300 /// Sets the [custom time] for the new object.
301 ///
302 /// This field is typically set in order to use the [DaysSinceCustomTime]
303 /// condition in Object Lifecycle Management.
304 ///
305 /// # Example
306 /// ```
307 /// # use google_cloud_storage::client::Storage;
308 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
309 /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
310 /// let response = client
311 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
312 /// .with_custom_time(time)
313 /// .send()
314 /// .await?;
315 /// println!("response details={response:?}");
316 /// # Ok(()) }
317 /// ```
318 ///
319 /// [DaysSinceCustomTime]: https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
320 /// [custom time]: https://cloud.google.com/storage/docs/metadata#custom-time
321 pub fn with_custom_time<V: Into<wkt::Timestamp>>(mut self, v: V) -> Self {
322 self.mut_resource().custom_time = Some(v.into());
323 self
324 }
325
326 /// Sets the [event based hold] flag for the new object.
327 ///
328 /// This field is typically set in order to prevent objects from being
329 /// deleted or modified.
330 ///
331 /// # Example
332 /// ```
333 /// # use google_cloud_storage::client::Storage;
334 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
335 /// let response = client
336 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
337 /// .with_event_based_hold(true)
338 /// .send()
339 /// .await?;
340 /// println!("response details={response:?}");
341 /// # Ok(()) }
342 /// ```
343 ///
344 /// [event based hold]: https://cloud.google.com/storage/docs/object-holds
345 pub fn with_event_based_hold<V: Into<bool>>(mut self, v: V) -> Self {
346 self.mut_resource().event_based_hold = Some(v.into());
347 self
348 }
349
350 /// Sets the [custom metadata] for the new object.
351 ///
352 /// This field is typically set to annotate the object with
353 /// application-specific metadata.
354 ///
355 /// # Example
356 /// ```
357 /// # use google_cloud_storage::client::Storage;
358 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
359 /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
360 /// let response = client
361 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
362 /// .with_metadata([("test-only", "true"), ("environment", "qa")])
363 /// .send()
364 /// .await?;
365 /// println!("response details={response:?}");
366 /// # Ok(()) }
367 /// ```
368 ///
369 /// [custom metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
370 pub fn with_metadata<I, K, V>(mut self, i: I) -> Self
371 where
372 I: IntoIterator<Item = (K, V)>,
373 K: Into<String>,
374 V: Into<String>,
375 {
376 self.mut_resource().metadata = i.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
377 self
378 }
379
380 /// Sets the [retention configuration] for the new object.
381 ///
382 /// # Example
383 /// ```
384 /// # use google_cloud_storage::client::Storage;
385 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
386 /// # use google_cloud_storage::model::object::{Retention, retention};
387 /// let response = client
388 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
389 /// .with_retention(
390 /// Retention::new()
391 /// .set_mode(retention::Mode::Locked)
392 /// .set_retain_until_time(wkt::Timestamp::try_from("2035-01-01T00:00:00Z")?))
393 /// .send()
394 /// .await?;
395 /// println!("response details={response:?}");
396 /// # Ok(()) }
397 /// ```
398 ///
399 /// [retention configuration]: https://cloud.google.com/storage/docs/metadata#retention-config
400 pub fn with_retention<V>(mut self, v: V) -> Self
401 where
402 V: Into<crate::model::object::Retention>,
403 {
404 self.mut_resource().retention = Some(v.into());
405 self
406 }
407
408 /// Sets the [storage class] for the new object.
409 ///
410 /// # Example
411 /// ```
412 /// # use google_cloud_storage::client::Storage;
413 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
414 /// let response = client
415 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
416 /// .with_storage_class("ARCHIVE")
417 /// .send()
418 /// .await?;
419 /// println!("response details={response:?}");
420 /// # Ok(()) }
421 /// ```
422 ///
423 /// [storage class]: https://cloud.google.com/storage/docs/storage-classes
424 pub fn with_storage_class<V>(mut self, v: V) -> Self
425 where
426 V: Into<String>,
427 {
428 self.mut_resource().storage_class = v.into();
429 self
430 }
431
432 /// Sets the [temporary hold] flag for the new object.
433 ///
434 /// This field is typically set in order to prevent objects from being
435 /// deleted or modified.
436 ///
437 /// # Example
438 /// ```
439 /// # use google_cloud_storage::client::Storage;
440 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
441 /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
442 /// let response = client
443 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
444 /// .with_temporary_hold(true)
445 /// .send()
446 /// .await?;
447 /// println!("response details={response:?}");
448 /// # Ok(()) }
449 /// ```
450 ///
451 /// [temporary hold]: https://cloud.google.com/storage/docs/object-holds
452 pub fn with_temporary_hold<V: Into<bool>>(mut self, v: V) -> Self {
453 self.mut_resource().temporary_hold = v.into();
454 self
455 }
456
457 /// Sets the resource name of the [Customer-managed encryption key] for this
458 /// object.
459 ///
460 /// The service imposes a number of restrictions on the keys used to encrypt
461 /// Google Cloud Storage objects. Read the documentation in full before
462 /// trying to use customer-managed encryption keys. In particular, verify
463 /// the service has the necessary permissions, and the key is in a
464 /// compatible location.
465 ///
466 /// # Example
467 /// ```
468 /// # use google_cloud_storage::client::Storage;
469 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
470 /// let response = client
471 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
472 /// .with_kms_key("projects/test-project/locations/us-central1/keyRings/test-ring/cryptoKeys/test-key")
473 /// .send()
474 /// .await?;
475 /// println!("response details={response:?}");
476 /// # Ok(()) }
477 /// ```
478 ///
479 /// [Customer-managed encryption key]: https://cloud.google.com/storage/docs/encryption/customer-managed-keys
480 pub fn with_kms_key<V>(mut self, v: V) -> Self
481 where
482 V: Into<String>,
483 {
484 self.mut_resource().kms_key = v.into();
485 self
486 }
487
488 /// Configure this object to use one of the [predefined ACLs].
489 ///
490 /// # Example
491 /// ```
492 /// # use google_cloud_storage::client::Storage;
493 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
494 /// let response = client
495 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
496 /// .with_predefined_acl("private")
497 /// .send()
498 /// .await?;
499 /// println!("response details={response:?}");
500 /// # Ok(()) }
501 /// ```
502 ///
503 /// [predefined ACLs]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
504 pub fn with_predefined_acl<V>(mut self, v: V) -> Self
505 where
506 V: Into<String>,
507 {
508 self.spec.predefined_acl = v.into();
509 self
510 }
511
512 /// The encryption key used with the Customer-Supplied Encryption Keys
513 /// feature. In raw bytes format (not base64-encoded).
514 ///
515 /// # Example
516 /// ```
517 /// # use google_cloud_storage::client::Storage;
518 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
519 /// # use google_cloud_storage::client::KeyAes256;
520 /// let key: &[u8] = &[97; 32];
521 /// let response = client
522 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
523 /// .with_key(KeyAes256::new(key)?)
524 /// .send()
525 /// .await?;
526 /// println!("response details={response:?}");
527 /// # Ok(()) }
528 /// ```
529 pub fn with_key(mut self, v: KeyAes256) -> Self {
530 self.params = Some(v.into());
531 self
532 }
533
534 // TODO(#2050) - this should be automatically computed?
535 #[allow(dead_code)]
536 fn with_crc32c<V>(mut self, v: V) -> Self
537 where
538 V: Into<u32>,
539 {
540 let mut checksum = self.mut_resource().checksums.take().unwrap_or_default();
541 checksum.crc32c = Some(v.into());
542 self.mut_resource().checksums = Some(checksum);
543 self
544 }
545
546 // TODO(#2050) - this should be automatically computed?
547 #[allow(dead_code)]
548 fn with_md5_hash<I, V>(mut self, i: I) -> Self
549 where
550 I: IntoIterator<Item = V>,
551 V: Into<u8>,
552 {
553 let mut checksum = self.mut_resource().checksums.take().unwrap_or_default();
554 checksum.md5_hash = i.into_iter().map(|v| v.into()).collect();
555 // TODO(#2050) - should we return an error (or panic?) if the size is wrong?
556 self.mut_resource().checksums = Some(checksum);
557 self
558 }
559
560 /// The retry policy used for this request.
561 ///
562 /// # Example
563 /// ```
564 /// # use google_cloud_storage::client::Storage;
565 /// # use google_cloud_storage::retry_policy::RecommendedPolicy;
566 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
567 /// use std::time::Duration;
568 /// use gax::retry_policy::RetryPolicyExt;
569 /// let response = client
570 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
571 /// .with_retry_policy(RecommendedPolicy
572 /// .with_attempt_limit(5)
573 /// .with_time_limit(Duration::from_secs(10)),
574 /// )
575 /// .send()
576 /// .await?;
577 /// println!("response details={response:?}");
578 /// # Ok(()) }
579 /// ```
580 pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
581 self.options.retry_policy = v.into().into();
582 self
583 }
584
585 /// The backoff policy used for this request.
586 ///
587 /// # Example
588 /// ```
589 /// # use google_cloud_storage::client::Storage;
590 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
591 /// use std::time::Duration;
592 /// use gax::exponential_backoff::ExponentialBackoff;
593 /// let response = client
594 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
595 /// .with_backoff_policy(ExponentialBackoff::default())
596 /// .send()
597 /// .await?;
598 /// println!("response details={response:?}");
599 /// # Ok(()) }
600 /// ```
601 pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
602 mut self,
603 v: V,
604 ) -> Self {
605 self.options.backoff_policy = v.into().into();
606 self
607 }
608
609 /// The retry throttler used for this request.
610 ///
611 /// Most of the time you want to use the same throttler for all the requests
612 /// in a client, and even the same throttler for many clients. Rarely it
613 /// maybe be necessary to use an ad-hoc throttler for some subset of the
614 /// requests.
615 ///
616 /// # Example
617 /// ```
618 /// # use google_cloud_storage::client::Storage;
619 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
620 /// let response = client
621 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
622 /// .with_retry_throttler(adhoc_throttler())
623 /// .send()
624 /// .await?;
625 /// println!("response details={response:?}");
626 /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
627 /// # panic!();
628 /// }
629 /// # Ok(()) }
630 /// ```
631 pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
632 mut self,
633 v: V,
634 ) -> Self {
635 self.options.retry_throttler = v.into().into();
636 self
637 }
638
639 /// Sets the payload size threshold to switch from single-shot to resumable uploads.
640 ///
641 /// # Example
642 /// ```
643 /// # use google_cloud_storage::client::Storage;
644 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
645 /// let response = client
646 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
647 /// .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
648 /// .send()
649 /// .await?;
650 /// println!("response details={response:?}");
651 /// # Ok(()) }
652 /// ```
653 ///
654 /// The client library can perform uploads using [single-shot] or
655 /// [resumable] uploads. For small objects, single-shot uploads offer better
656 /// performance, as they require a single HTTP transfer. For larger objects,
657 /// the additional request latency is not significant, and resumable uploads
658 /// offer better recovery on errors.
659 ///
660 /// The library automatically selects resumable uploads when the payload is
661 /// equal to or larger than this option. For smaller uploads the client
662 /// library uses single-shot uploads.
663 ///
664 /// The exact threshold depends on where the application is deployed and
665 /// destination bucket location with respect to where the application is
666 /// running. The library defaults should work well in most cases, but some
667 /// applications may benefit from fine-tuning.
668 ///
669 /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
670 /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
671 pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
672 self.options.resumable_upload_threshold = v.into();
673 self
674 }
675
676 /// Changes the buffer size for some resumable uploads.
677 ///
678 /// # Example
679 /// ```
680 /// # use google_cloud_storage::client::Storage;
681 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
682 /// let response = client
683 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
684 /// .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
685 /// .send()
686 /// .await?;
687 /// println!("response details={response:?}");
688 /// # Ok(()) }
689 /// ```
690 ///
691 /// When performing [resumable uploads] from sources without [Seek] the
692 /// client library needs to buffer data in memory until it is persisted by
693 /// the service. Otherwise the data would be lost if the upload fails.
694 /// Applications may want to tune this buffer size:
695 ///
696 /// - Use smaller buffer sizes to support more concurrent uploads in the
697 /// same application.
698 /// - Use larger buffer sizes for better throughput. Sending many small
699 /// buffers stalls the upload until the client receives a successful
700 /// response from the service.
701 ///
702 /// Keep in mind that there are diminishing returns on using larger buffers.
703 ///
704 /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
705 /// [Seek]: crate::upload_source::Seek
706 pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
707 self.options.resumable_upload_buffer_size = v.into();
708 self
709 }
710
711 fn mut_resource(&mut self) -> &mut crate::model::Object {
712 self.spec
713 .resource
714 .as_mut()
715 .expect("resource field initialized in `new()`")
716 }
717
718 fn resource(&self) -> &crate::model::Object {
719 self.spec
720 .resource
721 .as_ref()
722 .expect("resource field initialized in `new()`")
723 }
724
725 pub(crate) fn new<B, O, P>(
726 inner: std::sync::Arc<StorageInner>,
727 bucket: B,
728 object: O,
729 payload: P,
730 ) -> Self
731 where
732 B: Into<String>,
733 O: Into<String>,
734 P: Into<InsertPayload<T>>,
735 {
736 let options = inner.options.clone();
737 let resource = crate::model::Object::new()
738 .set_bucket(bucket)
739 .set_name(object);
740 UploadObject {
741 inner,
742 spec: crate::model::WriteObjectSpec::new().set_resource(resource),
743 params: None,
744 payload: Arc::new(Mutex::new(payload.into())),
745 options,
746 }
747 }
748
749 async fn start_resumable_upload(&self) -> Result<String>
750 where
751 T: Send + Sync + 'static,
752 {
753 let id = gax::retry_loop_internal::retry_loop(
754 // TODO(#2044) - we need to apply any timeouts here.
755 async |_| self.start_resumable_upload_attempt().await,
756 async |duration| tokio::time::sleep(duration).await,
757 // Creating a resumable upload is always idempotent. There are no
758 // **observable** side-effects if executed multiple times. Any extra
759 // sessions created in the retry loop are simply lost and eventually
760 // garbage collected.
761 true,
762 self.options.retry_throttler.clone(),
763 self.options.retry_policy.clone(),
764 self.options.backoff_policy.clone(),
765 )
766 .await?;
767 Ok(id)
768 }
769
770 async fn start_resumable_upload_attempt(&self) -> Result<String> {
771 let builder = self.start_resumable_upload_request().await?;
772 let response = builder.send().await.map_err(Error::io)?;
773 self::handle_start_resumable_upload_response(response).await
774 }
775
776 async fn start_resumable_upload_request(&self) -> Result<reqwest::RequestBuilder> {
777 let bucket = &self.resource().bucket;
778 let bucket_id = bucket.strip_prefix("projects/_/buckets/").ok_or_else(|| {
779 Error::binding(format!(
780 "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
781 ))
782 })?;
783 let object = &self.resource().name;
784 let builder = self
785 .inner
786 .client
787 .request(
788 reqwest::Method::POST,
789 format!("{}/upload/storage/v1/b/{bucket_id}/o", &self.inner.endpoint),
790 )
791 .query(&[("uploadType", "resumable")])
792 .query(&[("name", enc(object))])
793 .header("content-type", "application/json")
794 .header(
795 "x-goog-api-client",
796 reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
797 );
798
799 let builder = self.apply_preconditions(builder);
800 let builder = apply_customer_supplied_encryption_headers(builder, &self.params);
801 let builder = self.inner.apply_auth_headers(builder).await?;
802 let builder = builder.json(&v1::insert_body(self.resource()));
803 Ok(builder)
804 }
805
806 fn apply_preconditions(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
807 let builder = self
808 .spec
809 .if_generation_match
810 .iter()
811 .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
812 let builder = self
813 .spec
814 .if_generation_not_match
815 .iter()
816 .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
817 let builder = self
818 .spec
819 .if_metageneration_match
820 .iter()
821 .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
822 let builder = self
823 .spec
824 .if_metageneration_not_match
825 .iter()
826 .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
827
828 [
829 ("kmsKeyName", self.resource().kms_key.as_str()),
830 ("predefinedAcl", self.spec.predefined_acl.as_str()),
831 ]
832 .into_iter()
833 .fold(
834 builder,
835 |b, (k, v)| if v.is_empty() { b } else { b.query(&[(k, v)]) },
836 )
837 }
838}
839
840async fn handle_start_resumable_upload_response(response: reqwest::Response) -> Result<String> {
841 if !response.status().is_success() {
842 return gaxi::http::to_http_error(response).await;
843 }
844 let location = response
845 .headers()
846 .get("Location")
847 .ok_or_else(|| Error::deser("missing Location header in start resumable upload"))?;
848 location.to_str().map_err(Error::deser).map(str::to_string)
849}
850
851#[cfg(test)]
852mod tests {
853 use super::client::tests::{create_key_helper, test_builder, test_inner_client};
854 use super::*;
855 use crate::model::WriteObjectSpec;
856 use gax::retry_policy::RetryPolicyExt;
857 use gax::retry_result::RetryResult;
858 use httptest::{Expectation, Server, matchers::*, responders::status_code};
859 use serde_json::{Value, json};
860 use std::collections::BTreeMap;
861 use std::time::Duration;
862
863 type Result = anyhow::Result<()>;
864
865 #[test]
866 fn upload_object_unbuffered_metadata() -> Result {
867 use crate::model::ObjectAccessControl;
868 let inner = test_inner_client(test_builder());
869 let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
870 .with_if_generation_match(10)
871 .with_if_generation_not_match(20)
872 .with_if_metageneration_match(30)
873 .with_if_metageneration_not_match(40)
874 .with_predefined_acl("private")
875 .with_acl([ObjectAccessControl::new()
876 .set_entity("allAuthenticatedUsers")
877 .set_role("READER")])
878 .with_cache_control("public; max-age=7200")
879 .with_content_disposition("inline")
880 .with_content_encoding("gzip")
881 .with_content_language("en")
882 .with_content_type("text/plain")
883 .with_crc32c(crc32c::crc32c(b""))
884 .with_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
885 .with_event_based_hold(true)
886 .with_md5_hash(md5::compute(b"").0)
887 .with_metadata([("k0", "v0"), ("k1", "v1")])
888 .with_retention(
889 crate::model::object::Retention::new()
890 .set_mode(crate::model::object::retention::Mode::Locked)
891 .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
892 )
893 .with_storage_class("ARCHIVE")
894 .with_temporary_hold(true)
895 .with_kms_key("test-key");
896
897 let resource = request.spec.resource.take().unwrap();
898 let request = request;
899 assert_eq!(
900 &request.spec,
901 &WriteObjectSpec::new()
902 .set_if_generation_match(10)
903 .set_if_generation_not_match(20)
904 .set_if_metageneration_match(30)
905 .set_if_metageneration_not_match(40)
906 .set_predefined_acl("private")
907 );
908
909 assert_eq!(
910 resource,
911 Object::new()
912 .set_name("object")
913 .set_bucket("projects/_/buckets/bucket")
914 .set_acl([ObjectAccessControl::new()
915 .set_entity("allAuthenticatedUsers")
916 .set_role("READER")])
917 .set_cache_control("public; max-age=7200")
918 .set_content_disposition("inline")
919 .set_content_encoding("gzip")
920 .set_content_language("en")
921 .set_content_type("text/plain")
922 .set_checksums(
923 crate::model::ObjectChecksums::new()
924 .set_crc32c(crc32c::crc32c(b""))
925 .set_md5_hash(bytes::Bytes::from_iter(md5::compute(b"").0))
926 )
927 .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
928 .set_event_based_hold(true)
929 .set_metadata([("k0", "v0"), ("k1", "v1")])
930 .set_retention(
931 crate::model::object::Retention::new()
932 .set_mode("LOCKED")
933 .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?)
934 )
935 .set_storage_class("ARCHIVE")
936 .set_temporary_hold(true)
937 .set_kms_key("test-key")
938 );
939
940 Ok(())
941 }
942
943 #[test]
944 fn upload_object_options() {
945 let inner = test_inner_client(
946 test_builder()
947 .with_resumable_upload_threshold(123_usize)
948 .with_resumable_upload_buffer_size(234_usize),
949 );
950 let request = UploadObject::new(inner.clone(), "projects/_/buckets/bucket", "object", "");
951 assert_eq!(request.options.resumable_upload_threshold, 123);
952 assert_eq!(request.options.resumable_upload_buffer_size, 234);
953
954 let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
955 .with_resumable_upload_threshold(345_usize)
956 .with_resumable_upload_buffer_size(456_usize);
957 assert_eq!(request.options.resumable_upload_threshold, 345);
958 assert_eq!(request.options.resumable_upload_buffer_size, 456);
959 }
960
961 #[tokio::test]
962 async fn start_resumable_upload() -> Result {
963 let inner = test_inner_client(test_builder());
964 let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
965 .start_resumable_upload_request()
966 .await?
967 .build()?;
968
969 assert_eq!(request.method(), reqwest::Method::POST);
970 assert_eq!(
971 request.url().as_str(),
972 "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=resumable&name=object"
973 );
974 let body = request.body_mut().take().unwrap();
975 let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
976 let json = serde_json::from_slice::<Value>(&contents)?;
977 assert_eq!(json, json!({}));
978 Ok(())
979 }
980
981 #[tokio::test]
982 async fn start_resumable_upload_headers() -> Result {
983 // Make a 32-byte key.
984 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
985
986 let inner = test_inner_client(test_builder());
987 let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
988 .with_key(KeyAes256::new(&key)?)
989 .start_resumable_upload_request()
990 .await?
991 .build()?;
992
993 assert_eq!(request.method(), reqwest::Method::POST);
994 assert_eq!(
995 request.url().as_str(),
996 "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=resumable&name=object"
997 );
998
999 let want = vec![
1000 ("x-goog-encryption-algorithm", "AES256".to_string()),
1001 ("x-goog-encryption-key", key_base64),
1002 ("x-goog-encryption-key-sha256", key_sha256_base64),
1003 ];
1004
1005 for (name, value) in want {
1006 assert_eq!(
1007 request.headers().get(name).unwrap().as_bytes(),
1008 bytes::Bytes::from(value)
1009 );
1010 }
1011 Ok(())
1012 }
1013
1014 #[tokio::test]
1015 async fn start_resumable_upload_bad_bucket() -> Result {
1016 let inner = test_inner_client(test_builder());
1017 UploadObject::new(inner, "malformed", "object", "hello")
1018 .start_resumable_upload_request()
1019 .await
1020 .expect_err("malformed bucket string should error");
1021 Ok(())
1022 }
1023
1024 #[tokio::test]
1025 async fn start_resumable_upload_metadata_in_request() -> Result {
1026 use crate::model::ObjectAccessControl;
1027 let inner = test_inner_client(test_builder());
1028 let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
1029 .with_if_generation_match(10)
1030 .with_if_generation_not_match(20)
1031 .with_if_metageneration_match(30)
1032 .with_if_metageneration_not_match(40)
1033 .with_predefined_acl("private")
1034 .with_acl([ObjectAccessControl::new()
1035 .set_entity("allAuthenticatedUsers")
1036 .set_role("READER")])
1037 .with_cache_control("public; max-age=7200")
1038 .with_content_disposition("inline")
1039 .with_content_encoding("gzip")
1040 .with_content_language("en")
1041 .with_content_type("text/plain")
1042 .with_crc32c(crc32c::crc32c(b""))
1043 .with_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
1044 .with_event_based_hold(true)
1045 .with_md5_hash(md5::compute(b"").0)
1046 .with_metadata([("k0", "v0"), ("k1", "v1")])
1047 .with_retention(
1048 crate::model::object::Retention::new()
1049 .set_mode(crate::model::object::retention::Mode::Locked)
1050 .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
1051 )
1052 .with_storage_class("ARCHIVE")
1053 .with_temporary_hold(true)
1054 .with_kms_key("test-key")
1055 .start_resumable_upload_request()
1056 .await?
1057 .build()?;
1058
1059 assert_eq!(request.method(), reqwest::Method::POST);
1060 let want_pairs: BTreeMap<String, String> = [
1061 ("uploadType", "resumable"),
1062 ("name", "object"),
1063 ("ifGenerationMatch", "10"),
1064 ("ifGenerationNotMatch", "20"),
1065 ("ifMetagenerationMatch", "30"),
1066 ("ifMetagenerationNotMatch", "40"),
1067 ("kmsKeyName", "test-key"),
1068 ("predefinedAcl", "private"),
1069 ]
1070 .iter()
1071 .map(|(k, v)| (k.to_string(), v.to_string()))
1072 .collect();
1073 let query_pairs: BTreeMap<String, String> = request
1074 .url()
1075 .query_pairs()
1076 .map(|param| (param.0.to_string(), param.1.to_string()))
1077 .collect();
1078 assert_eq!(query_pairs, want_pairs);
1079
1080 let body = request.body_mut().take().unwrap();
1081 let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
1082 let json = serde_json::from_slice::<Value>(&contents)?;
1083 assert_eq!(
1084 json,
1085 json!({
1086 "acl": [{"entity": "allAuthenticatedUsers", "role": "READER"}],
1087 "cacheControl": "public; max-age=7200",
1088 "contentDisposition": "inline",
1089 "contentEncoding": "gzip",
1090 "contentLanguage": "en",
1091 "contentType": "text/plain",
1092 "crc32c": "AAAAAA==",
1093 "customTime": "2025-07-07T18:11:00Z",
1094 "eventBasedHold": true,
1095 "md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
1096 "metadata": {"k0": "v0", "k1": "v1"},
1097 "retention": {"mode": "LOCKED", "retainUntilTime": "2035-07-07T18:14:00Z"},
1098 "storageClass": "ARCHIVE",
1099 "temporaryHold": true,
1100 })
1101 );
1102 Ok(())
1103 }
1104
1105 #[tokio::test]
1106 async fn start_resumable_upload_credentials() -> Result {
1107 let inner = test_inner_client(
1108 test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
1109 );
1110 let _ = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
1111 .start_resumable_upload_request()
1112 .await
1113 .inspect_err(|e| assert!(e.is_authentication()))
1114 .expect_err("invalid credentials should err");
1115 Ok(())
1116 }
1117
1118 #[tokio::test]
1119 async fn start_resumable_upload_immediate_success() -> Result {
1120 let server = Server::run();
1121 let session = server.url("/upload/session/test-only-001");
1122 let want = session.to_string();
1123 server.expect(
1124 Expectation::matching(all_of![
1125 request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
1126 request::query(url_decoded(contains(("name", "object")))),
1127 request::query(url_decoded(contains(("uploadType", "resumable")))),
1128 ])
1129 .respond_with(
1130 status_code(200)
1131 .append_header("location", session.to_string())
1132 .body(""),
1133 ),
1134 );
1135
1136 let inner =
1137 test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
1138 let got = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
1139 .start_resumable_upload()
1140 .await?;
1141 assert_eq!(got, want);
1142
1143 Ok(())
1144 }
1145
1146 #[tokio::test]
1147 async fn start_resumable_upload_immediate_error() -> Result {
1148 let server = Server::run();
1149 server.expect(
1150 Expectation::matching(all_of![
1151 request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
1152 request::query(url_decoded(contains(("name", "object")))),
1153 request::query(url_decoded(contains(("uploadType", "resumable")))),
1154 ])
1155 .respond_with(status_code(403).body("uh-oh")),
1156 );
1157
1158 let inner =
1159 test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
1160 let err = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
1161 .start_resumable_upload()
1162 .await
1163 .expect_err("request should fail");
1164 assert_eq!(err.http_status_code(), Some(403), "{err:?}");
1165
1166 Ok(())
1167 }
1168
1169 #[tokio::test]
1170 async fn start_resumable_upload_retry_transient_failures_then_success() -> Result {
1171 use httptest::responders::cycle;
1172 let server = Server::run();
1173 let session = server.url("/upload/session/test-only-001");
1174 let want = session.to_string();
1175 let matching = || {
1176 Expectation::matching(all_of![
1177 request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
1178 request::query(url_decoded(contains(("name", "object")))),
1179 request::query(url_decoded(contains(("uploadType", "resumable")))),
1180 ])
1181 };
1182 server.expect(matching().times(3).respond_with(cycle![
1183 status_code(503).body("try-again"),
1184 status_code(503).body("try-again"),
1185 status_code(200).append_header("location", session.to_string()),
1186 ]));
1187
1188 let inner =
1189 test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
1190 let got = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
1191 .start_resumable_upload()
1192 .await?;
1193 assert_eq!(got, want);
1194
1195 Ok(())
1196 }
1197
1198 // Verify the retry options are used and that exhausted policies result in
1199 // errors.
1200 #[tokio::test]
1201 async fn start_resumable_upload_request_retry_options() -> Result {
1202 let server = Server::run();
1203 let matching = || {
1204 Expectation::matching(all_of![
1205 request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
1206 request::query(url_decoded(contains(("name", "object")))),
1207 request::query(url_decoded(contains(("uploadType", "resumable")))),
1208 ])
1209 };
1210 server.expect(
1211 matching()
1212 .times(3)
1213 .respond_with(status_code(503).body("try-again")),
1214 );
1215
1216 let inner =
1217 test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
1218 let mut retry = MockRetryPolicy::new();
1219 retry
1220 .expect_on_error()
1221 .times(1..)
1222 .returning(|_, _, _, e| RetryResult::Continue(e));
1223
1224 let mut backoff = MockBackoffPolicy::new();
1225 backoff
1226 .expect_on_failure()
1227 .times(1..)
1228 .return_const(Duration::from_micros(1));
1229
1230 let mut throttler = MockRetryThrottler::new();
1231 throttler
1232 .expect_throttle_retry_attempt()
1233 .times(1..)
1234 .return_const(false);
1235 throttler
1236 .expect_on_retry_failure()
1237 .times(1..)
1238 .return_const(());
1239 throttler.expect_on_success().never().return_const(());
1240
1241 let err = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
1242 .with_retry_policy(retry.with_attempt_limit(3))
1243 .with_backoff_policy(backoff)
1244 .with_retry_throttler(throttler)
1245 .start_resumable_upload()
1246 .await
1247 .expect_err("request should fail after 3 retry attempts");
1248 assert_eq!(err.http_status_code(), Some(503), "{err:?}");
1249
1250 Ok(())
1251 }
1252
1253 // Verify the client retry options are used and that exhausted policies
1254 // result in errors.
1255 #[tokio::test]
1256 async fn start_resumable_upload_client_retry_options() -> Result {
1257 let server = Server::run();
1258 let matching = || {
1259 Expectation::matching(all_of![
1260 request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
1261 request::query(url_decoded(contains(("name", "object")))),
1262 request::query(url_decoded(contains(("uploadType", "resumable")))),
1263 ])
1264 };
1265 server.expect(
1266 matching()
1267 .times(3)
1268 .respond_with(status_code(503).body("try-again")),
1269 );
1270
1271 let mut retry = MockRetryPolicy::new();
1272 retry
1273 .expect_on_error()
1274 .times(1..)
1275 .returning(|_, _, _, e| RetryResult::Continue(e));
1276
1277 let mut backoff = MockBackoffPolicy::new();
1278 backoff
1279 .expect_on_failure()
1280 .times(1..)
1281 .return_const(Duration::from_micros(1));
1282
1283 let mut throttler = MockRetryThrottler::new();
1284 throttler
1285 .expect_throttle_retry_attempt()
1286 .times(1..)
1287 .return_const(false);
1288 throttler
1289 .expect_on_retry_failure()
1290 .times(1..)
1291 .return_const(());
1292 throttler.expect_on_success().never().return_const(());
1293
1294 let inner = test_inner_client(
1295 test_builder()
1296 .with_endpoint(format!("http://{}", server.addr()))
1297 .with_retry_policy(retry.with_attempt_limit(3))
1298 .with_backoff_policy(backoff)
1299 .with_retry_throttler(throttler),
1300 );
1301 let err = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
1302 .start_resumable_upload()
1303 .await
1304 .expect_err("request should fail after 3 retry attempts");
1305 assert_eq!(err.http_status_code(), Some(503), "{err:?}");
1306
1307 Ok(())
1308 }
1309
    // Mock implementation of `gax::retry_throttler::RetryThrottler`; tests in
    // this module create it with `MockRetryThrottler::new()` and set
    // expectations on each of the three trait methods.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryThrottler {}

        impl gax::retry_throttler::RetryThrottler for RetryThrottler {
            fn throttle_retry_attempt(&self) -> bool;
            fn on_retry_failure(&mut self, flow: &RetryResult);
            fn on_success(&mut self);
        }
    }
1320
    // Mock implementation of `gax::retry_policy::RetryPolicy`; tests in this
    // module create it with `MockRetryPolicy::new()` and script `on_error()`
    // to decide whether the retry loop continues.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryPolicy {}

        impl gax::retry_policy::RetryPolicy for RetryPolicy {
            fn on_error(&self, loop_start: std::time::Instant, attempt_count: u32, idempotent: bool, error: gax::error::Error) -> RetryResult;
        }
    }
1329
    // Mock implementation of `gax::backoff_policy::BackoffPolicy`; tests in
    // this module create it with `MockBackoffPolicy::new()` and have
    // `on_failure()` return a very short delay.
    mockall::mock! {
        #[derive(Debug)]
        pub BackoffPolicy {}

        impl gax::backoff_policy::BackoffPolicy for BackoffPolicy {
            fn on_failure(&self, loop_start: std::time::Instant, attempt_count: u32) -> std::time::Duration;
        }
    }
1338}