google_cloud_storage/storage/upload_object.rs

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Contains the request builder for [upload_object()] and related types.
//!
//! [upload_object()]: crate::storage::client::Storage::upload_object()

use super::client::*;
use super::perform_upload::PerformUpload;
use super::upload_source::{Seek, StreamingSource};
use super::*;
use crate::storage::checksum::{ChecksumEngine, Crc32c, Known, KnownCrc32c, KnownMd5, Md5};

/// A request builder for object uploads.
///
/// # Example: hello world
/// ```
/// use google_cloud_storage::client::Storage;
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     let response = client
///         .upload_object("projects/_/buckets/my-bucket", "hello", "Hello World!")
///         .send_unbuffered()
///         .await?;
///     println!("response details={response:?}");
///     Ok(())
/// }
/// ```
///
/// # Example: upload a file
/// ```
/// use google_cloud_storage::client::Storage;
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     let payload = tokio::fs::File::open("my-data").await?;
///     let response = client
///         .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
///         .send_unbuffered()
///         .await?;
///     println!("response details={response:?}");
///     Ok(())
/// }
/// ```
///
/// # Example: upload a custom data source
/// ```
/// use google_cloud_storage::{client::Storage, upload_source::StreamingSource};
/// struct DataSource;
/// impl StreamingSource for DataSource {
///     type Error = std::io::Error;
///     async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
///         # panic!();
///     }
/// }
///
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     let response = client
///         .upload_object("projects/_/buckets/my-bucket", "my-object", DataSource)
///         .send()
///         .await?;
///     println!("response details={response:?}");
///     Ok(())
/// }
/// ```
pub struct UploadObject<T, C = Crc32c> {
    inner: std::sync::Arc<StorageInner>,
    spec: crate::model::WriteObjectSpec,
    params: Option<crate::model::CommonObjectRequestParams>,
    payload: Payload<T>,
    options: super::request_options::RequestOptions,
    checksum: C,
}

impl<T, C> UploadObject<T, C> {
    /// Set a [request precondition] on the object generation to match.
    ///
    /// With this precondition the request fails if the current object
    /// generation does not match the provided value. A common value is `0`,
    /// which prevents uploads from succeeding if the object already exists.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_generation_match(0)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_generation_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_generation_match = Some(v.into());
        self
    }

    /// Set a [request precondition] on the object generation to not match.
    ///
    /// With this precondition the request fails if the current object
    /// generation matches the provided value.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_generation_not_match(0)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_generation_not_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_generation_not_match = Some(v.into());
        self
    }

    /// Set a [request precondition] on the object meta generation.
    ///
    /// With this precondition the request fails if the current object metadata
    /// generation does not match the provided value. This may be useful to
    /// prevent changes when the metageneration is known.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_metageneration_match(1234)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_metageneration_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_metageneration_match = Some(v.into());
        self
    }

    /// Set a [request precondition] on the object meta-generation.
    ///
    /// With this precondition the request fails if the current object metadata
    /// generation matches the provided value. This is rarely useful in
    /// uploads; it is more commonly used on downloads, to avoid downloading
    /// data that is already cached.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_metageneration_not_match(1234)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_metageneration_not_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_metageneration_not_match = Some(v.into());
        self
    }

    /// Sets the ACL for the new object.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// # use google_cloud_storage::model::ObjectAccessControl;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_acl([ObjectAccessControl::new().set_entity("allAuthenticatedUsers").set_role("READER")])
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_acl<I, V>(mut self, v: I) -> Self
    where
        I: IntoIterator<Item = V>,
        V: Into<crate::model::ObjectAccessControl>,
    {
        self.mut_resource().acl = v.into_iter().map(|a| a.into()).collect();
        self
    }

    /// Sets the [cache control] for the new object.
    ///
    /// This can be used to control caching in [public objects].
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_cache_control("public, max-age=7200")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [public objects]: https://cloud.google.com/storage/docs/access-control/making-data-public
    /// [cache control]: https://datatracker.ietf.org/doc/html/rfc7234#section-5.2
    pub fn with_cache_control<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().cache_control = v.into();
        self
    }

    /// Sets the [content disposition] for the new object.
    ///
    /// Google Cloud Storage can serve content directly to web browsers. This
    /// attribute sets the `Content-Disposition` header, which may change how
    /// the browser displays the contents.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_content_disposition("inline")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [content disposition]: https://datatracker.ietf.org/doc/html/rfc6266
    pub fn with_content_disposition<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_disposition = v.into();
        self
    }

    /// Sets the [content encoding] for the object data.
    ///
    /// This can be used to upload compressed data and enable [transcoding] of
    /// the data during downloads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use flate2::write::GzEncoder;
    /// use std::io::Write;
    /// let mut e = GzEncoder::new(Vec::new(), flate2::Compression::default());
    /// e.write_all(b"hello world")?;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", bytes::Bytes::from_owner(e.finish()?))
    ///     .with_content_encoding("gzip")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [transcoding]: https://cloud.google.com/storage/docs/transcoding
    /// [content encoding]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.2
    pub fn with_content_encoding<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_encoding = v.into();
        self
    }

    /// Sets the [content language] for the new object.
    ///
    /// Google Cloud Storage can serve content directly to web browsers. This
    /// attribute sets the `Content-Language` header, which may change how the
    /// browser displays the contents.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_content_language("en")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [content language]: https://cloud.google.com/storage/docs/metadata#content-language
    pub fn with_content_language<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_language = v.into();
        self
    }

    /// Sets the [content type] for the new object.
    ///
    /// Google Cloud Storage can serve content directly to web browsers. This
    /// attribute sets the `Content-Type` header, which may change how the
    /// browser interprets the contents.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_content_type("text/plain")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [content type]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.5
    pub fn with_content_type<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_type = v.into();
        self
    }

    /// Sets the [custom time] for the new object.
    ///
    /// This field is typically set in order to use the [DaysSinceCustomTime]
    /// condition in Object Lifecycle Management.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_custom_time(time)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [DaysSinceCustomTime]: https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
    /// [custom time]: https://cloud.google.com/storage/docs/metadata#custom-time
    pub fn with_custom_time<V: Into<wkt::Timestamp>>(mut self, v: V) -> Self {
        self.mut_resource().custom_time = Some(v.into());
        self
    }

    /// Sets the [event based hold] flag for the new object.
    ///
    /// This field is typically set in order to prevent objects from being
    /// deleted or modified.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_event_based_hold(true)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [event based hold]: https://cloud.google.com/storage/docs/object-holds
    pub fn with_event_based_hold<V: Into<bool>>(mut self, v: V) -> Self {
        self.mut_resource().event_based_hold = Some(v.into());
        self
    }

    /// Sets the [custom metadata] for the new object.
    ///
    /// This field is typically set to annotate the object with
    /// application-specific metadata.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_metadata([("test-only", "true"), ("environment", "qa")])
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [custom metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
    pub fn with_metadata<I, K, V>(mut self, i: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.mut_resource().metadata = i.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
        self
    }

    /// Sets the [retention configuration] for the new object.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// # use google_cloud_storage::model::object::{Retention, retention};
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_retention(
    ///         Retention::new()
    ///             .set_mode(retention::Mode::Locked)
    ///             .set_retain_until_time(wkt::Timestamp::try_from("2035-01-01T00:00:00Z")?))
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [retention configuration]: https://cloud.google.com/storage/docs/metadata#retention-config
    pub fn with_retention<V>(mut self, v: V) -> Self
    where
        V: Into<crate::model::object::Retention>,
    {
        self.mut_resource().retention = Some(v.into());
        self
    }

    /// Sets the [storage class] for the new object.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_storage_class("ARCHIVE")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [storage class]: https://cloud.google.com/storage/docs/storage-classes
    pub fn with_storage_class<V>(mut self, v: V) -> Self
    where
        V: Into<String>,
    {
        self.mut_resource().storage_class = v.into();
        self
    }

    /// Sets the [temporary hold] flag for the new object.
    ///
    /// This field is typically set in order to prevent objects from being
    /// deleted or modified.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_temporary_hold(true)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [temporary hold]: https://cloud.google.com/storage/docs/object-holds
    pub fn with_temporary_hold<V: Into<bool>>(mut self, v: V) -> Self {
        self.mut_resource().temporary_hold = v.into();
        self
    }

    /// Sets the resource name of the [Customer-managed encryption key] for this
    /// object.
    ///
    /// The service imposes a number of restrictions on the keys used to encrypt
    /// Google Cloud Storage objects. Read the documentation in full before
    /// trying to use customer-managed encryption keys. In particular, verify
    /// the service has the necessary permissions, and the key is in a
    /// compatible location.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_kms_key("projects/test-project/locations/us-central1/keyRings/test-ring/cryptoKeys/test-key")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [Customer-managed encryption key]: https://cloud.google.com/storage/docs/encryption/customer-managed-keys
    pub fn with_kms_key<V>(mut self, v: V) -> Self
    where
        V: Into<String>,
    {
        self.mut_resource().kms_key = v.into();
        self
    }

    /// Configure this object to use one of the [predefined ACLs].
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_predefined_acl("private")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [predefined ACLs]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
    pub fn with_predefined_acl<V>(mut self, v: V) -> Self
    where
        V: Into<String>,
    {
        self.spec.predefined_acl = v.into();
        self
    }

    /// Sets the encryption key used with the Customer-Supplied Encryption Keys
    /// feature. The key is provided in raw bytes (not base64-encoded).
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// # use google_cloud_storage::client::KeyAes256;
    /// let key: &[u8] = &[97; 32];
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_key(KeyAes256::new(key)?)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_key(mut self, v: KeyAes256) -> Self {
        self.params = Some(v.into());
        self
    }

    /// Configure the idempotency for this upload.
    ///
    /// By default, the client library treats single-shot uploads without
    /// preconditions as non-idempotent. If the destination bucket is
    /// configured with [object versioning] then the operation may succeed
    /// multiple times with observable side-effects. With object versioning and
    /// a [lifecycle] policy limiting the number of versions, uploading the same
    /// data multiple times may result in data loss.
    ///
    /// The client library cannot efficiently determine if these conditions
    /// apply to your upload. If they do, or your application can tolerate
    /// multiple versions of the same data for other reasons, consider using
    /// `with_idempotency(true)`.
    ///
    /// The client library treats resumable uploads as idempotent, regardless of
    /// the value in this option. Such uploads can succeed at most once.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_idempotency(true)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [lifecycle]: https://cloud.google.com/storage/docs/lifecycle
    /// [object versioning]: https://cloud.google.com/storage/docs/object-versioning
    pub fn with_idempotency(mut self, v: bool) -> Self {
        self.options.idempotency = Some(v);
        self
    }

    /// The retry policy used for this request.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # use google_cloud_storage::retry_policy::RecommendedPolicy;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use std::time::Duration;
    /// use gax::retry_policy::RetryPolicyExt;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_retry_policy(RecommendedPolicy
    ///         .with_attempt_limit(5)
    ///         .with_time_limit(Duration::from_secs(10)),
    ///     )
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
        self.options.retry_policy = v.into().into();
        self
    }

    /// The backoff policy used for this request.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use gax::exponential_backoff::ExponentialBackoff;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_backoff_policy(ExponentialBackoff::default())
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.backoff_policy = v.into().into();
        self
    }

    /// The retry throttler used for this request.
    ///
    /// Most of the time you want to use the same throttler for all the requests
    /// in a client, and even the same throttler for many clients. Rarely, it
    /// may be necessary to use a custom throttler for some subset of the
    /// requests.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_retry_throttler(adhoc_throttler())
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
    ///     # panic!();
    /// }
    /// # Ok(()) }
    /// ```
    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.retry_throttler = v.into().into();
        self
    }

    /// Sets the payload size threshold to switch from single-shot to resumable uploads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// The client library can perform uploads using [single-shot] or
    /// [resumable] uploads. For small objects, single-shot uploads offer better
    /// performance, as they require a single HTTP transfer. For larger objects,
    /// the additional request latency is not significant, and resumable uploads
    /// offer better recovery on errors.
    ///
    /// The library automatically selects resumable uploads when the payload is
    /// equal to or larger than this threshold. For smaller payloads the client
    /// library uses single-shot uploads.
    ///
    /// The optimal threshold depends on where the application is deployed and
    /// on the location of the destination bucket relative to the application.
    /// The library defaults should work well in most cases, but some
    /// applications may benefit from fine-tuning.
    ///
    /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
    /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
    pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
        self.options.resumable_upload_threshold = v.into();
        self
    }

    /// Changes the buffer size for some resumable uploads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// When performing [resumable uploads] from sources that do not implement
    /// [Seek], the client library needs to buffer data in memory until it is
    /// persisted by the service. Otherwise the data would be lost if the
    /// upload fails.
    /// Applications may want to tune this buffer size:
    ///
    /// - Use smaller buffer sizes to support more concurrent uploads in the
    ///   same application.
    /// - Use larger buffer sizes for better throughput. Sending many small
    ///   buffers stalls the upload until the client receives a successful
    ///   response from the service.
    ///
    /// Keep in mind that there are diminishing returns on using larger buffers.
    ///
    /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
    /// [Seek]: crate::upload_source::Seek
    pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
        self.options.resumable_upload_buffer_size = v.into();
        self
    }

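    // Returns the object resource embedded in the write spec. `new()` always
    // initializes this field, so the `expect()` cannot panic.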
    fn mut_resource(&mut self) -> &mut crate::model::Object {
        self.spec
            .resource
            .as_mut()
            .expect("resource field initialized in `new()`")
    }

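    // Combines the builder state into the type that performs the actual
    // upload.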
    pub(crate) fn build(self) -> PerformUpload<C, Payload<T>> {
        PerformUpload::new(
            self.checksum,
            self.payload,
            self.inner,
            self.spec,
            self.params,
            self.options,
        )
    }

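    // Replaces the checksum engine while keeping all other builder settings.
    // Used by the `with_known_*()`, `compute_md5()`, and
    // `precompute_checksums()` implementations to change the checksum type
    // parameter.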
    fn switch_checksum<F, U>(self, new: F) -> UploadObject<T, U>
    where
        F: FnOnce(C) -> U,
    {
        UploadObject {
            payload: self.payload,
            inner: self.inner,
            spec: self.spec,
            params: self.params,
            options: self.options,
            checksum: new(self.checksum),
        }
    }

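    // Records a precomputed CRC32C value on the object resource.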
    fn set_crc32c<V: Into<u32>>(mut self, v: V) -> Self {
        let checksum = self.mut_resource().checksums.get_or_insert_default();
        checksum.crc32c = Some(v.into());
        self
    }

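    // Records a precomputed MD5 hash on the object resource; used by the
    // `with_known_md5_hash()` implementations.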
    pub fn set_md5_hash<I, V>(mut self, i: I) -> Self
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let checksum = self.mut_resource().checksums.get_or_insert_default();
        checksum.md5_hash = i.into_iter().map(|v| v.into()).collect();
        self
    }
}

impl<T> UploadObject<T, Crc32c> {
    /// Provide a precomputed value for the CRC32C checksum.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use crc32c::crc32c;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_known_crc32c(crc32c(b"hello world"))
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// In some applications, the payload's CRC32C checksum is already known.
    /// For example, the application may be downloading the data from another
    /// blob storage system.
    ///
    /// In such cases, it is safer to pass the known CRC32C of the payload to
    /// [Cloud Storage], and more efficient to skip the computation in the
    /// client library.
    ///
    /// Note that once you provide a CRC32C value to this builder, the client
    /// library no longer computes the CRC32C checksum itself. You can still
    /// use [compute_md5()] to also have the library compute the MD5 hash.
    ///
    /// [compute_md5()]: UploadObject::compute_md5
    /// [Cloud Storage]: https://cloud.google.com/storage
    pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, KnownCrc32c> {
        let this = self.switch_checksum(|_| KnownCrc32c);
        this.set_crc32c(v)
    }

    /// Provide a precomputed value for the MD5 hash.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let hash = md5::compute(b"hello world");
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_known_md5_hash(bytes::Bytes::from_owner(hash.0))
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// In some applications, the payload's MD5 hash is already known. For
    /// example, the application may be downloading the data from another blob
    /// storage system.
    ///
    /// In such cases, it is safer to pass the known MD5 of the payload to
    /// [Cloud Storage], and more efficient to skip the computation in the
    /// client library.
    ///
    /// Note that once you provide an MD5 value to this builder you cannot
    /// use [compute_md5()] to also have the library compute the checksums.
    ///
    /// [compute_md5()]: UploadObject::compute_md5
    /// [Cloud Storage]: https://cloud.google.com/storage
    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Crc32c<KnownMd5>>
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let this = self.switch_checksum(|_| Crc32c::from_inner(KnownMd5));
        this.set_md5_hash(i)
    }

    /// Enables computation of MD5 hashes.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let payload = tokio::fs::File::open("my-data").await?;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
    ///     .compute_md5()
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// See [precompute_checksums][UploadObject::precompute_checksums] for more
    /// details on how checksums are used by the client library and their
    /// limitations.
    pub fn compute_md5(self) -> UploadObject<T, Md5<Crc32c>> {
        self.switch_checksum(Md5::from_inner)
    }
}

impl<T> UploadObject<T, Crc32c<KnownMd5>> {
    /// See [UploadObject<T, Crc32c>::with_known_crc32c].
    pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, Known> {
        let this = self.switch_checksum(|_| Known);
        this.set_crc32c(v)
    }
}

impl<T> UploadObject<T, Md5<Crc32c>> {
    /// See [UploadObject<T, Crc32c>::with_known_crc32c].
    pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, Md5<KnownCrc32c>> {
        let this = self.switch_checksum(|_| Md5::from_inner(KnownCrc32c));
        this.set_crc32c(v)
    }

    /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Crc32c<KnownMd5>>
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let this = self.switch_checksum(|_| Crc32c::from_inner(KnownMd5));
        this.set_md5_hash(i)
    }
}

impl<T> UploadObject<T, Md5<KnownCrc32c>> {
    /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Known>
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let this = self.switch_checksum(|_| Known);
        this.set_md5_hash(i)
    }
}

impl<T> UploadObject<T, KnownCrc32c> {
    /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Known>
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let this = self.switch_checksum(|_| Known);
        this.set_md5_hash(i)
    }

    /// See [UploadObject<T, Crc32c>::compute_md5()].
    pub fn compute_md5(self) -> UploadObject<T, Md5<KnownCrc32c>> {
        self.switch_checksum(Md5::from_inner)
    }
}

impl<T> UploadObject<T> {
    pub(crate) fn new<B, O, P>(
        inner: std::sync::Arc<StorageInner>,
        bucket: B,
        object: O,
        payload: P,
    ) -> Self
    where
        B: Into<String>,
        O: Into<String>,
        P: Into<Payload<T>>,
    {
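        // New builders inherit the client-wide request options and use the
        // default CRC32C checksum engine.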
        let options = inner.options.clone();
        let resource = crate::model::Object::new()
            .set_bucket(bucket)
            .set_name(object);
        UploadObject {
            inner,
            spec: crate::model::WriteObjectSpec::new().set_resource(resource),
            params: None,
            payload: payload.into(),
            options,
            checksum: Crc32c::default(),
        }
    }
}

impl<T, C> UploadObject<T, C>
where
    C: ChecksumEngine + Send + Sync + 'static,
    T: StreamingSource + Seek + Send + Sync + 'static,
    <T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
    <T as Seek>::Error: std::error::Error + Send + Sync + 'static,
{
    /// Uploads the object without buffering the payload in memory.
    ///
    /// This requires a payload that implements [Seek], so the client library
    /// can rewind the data source and resend the data if the upload needs to
    /// be retried or resumed.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .send_unbuffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub async fn send_unbuffered(self) -> Result<Object> {
        self.build().send_unbuffered().await
    }

    /// Precompute the payload checksums before uploading the data.
    ///
    /// If the checksums are known when the upload starts, the client library
    /// can include the checksums with the upload request, and the service can
    /// reject the upload if the payload and the checksums do not match.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let payload = tokio::fs::File::open("my-data").await?;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
    ///     .precompute_checksums()
    ///     .await?
    ///     .send_unbuffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// Precomputing the checksums can be expensive if the data source is slow
    /// to read. Therefore, the client library does not precompute the checksums
    /// by default. The client library compares the checksums computed by the
    /// service against its own checksums. If they do not match, the client
    /// library returns an error. However, the service has already created the
    /// object with the (likely incorrect) data.
    ///
    /// The client library currently uses the [JSON API]; with this API it is
    /// not possible to send the checksums at the end of the upload.
    ///
    /// [JSON API]: https://cloud.google.com/storage/docs/json_api
    pub async fn precompute_checksums(mut self) -> Result<UploadObject<T, Known>>
    where
        C: ChecksumEngine + Send + Sync + 'static,
    {
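        // Rewind the source, stream it once through the checksum engine, then
        // rewind again so the actual upload starts from the beginning.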
        let mut offset = 0_u64;
        self.payload.seek(offset).await.map_err(Error::ser)?;
        while let Some(n) = self.payload.next().await.transpose().map_err(Error::ser)? {
            self.checksum.update(offset, &n);
            offset += n.len() as u64;
        }
        self.payload.seek(0_u64).await.map_err(Error::ser)?;
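        // Record the computed checksums on the object resource so they are
        // sent with the upload request.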
        let computed = self.checksum.finalize();
        let current = self.mut_resource().checksums.get_or_insert_default();
        crate::storage::checksum::update(current, computed);
        Ok(self.switch_checksum(|_| Known))
    }
}

impl<T, C> UploadObject<T, C>
where
    C: ChecksumEngine + Send + Sync + 'static,
    T: StreamingSource + Send + Sync + 'static,
    T::Error: std::error::Error + Send + Sync + 'static,
{
    /// Upload an object from a streaming source without rewinds.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub async fn send(self) -> crate::Result<Object> {
        self.build().send().await
    }
}

// We need `Debug` to use `expect_err()` in `Result<UploadObject, ...>`.
impl<T, C> std::fmt::Debug for UploadObject<T, C>
where
    C: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadObject")
            .field("inner", &self.inner)
            .field("spec", &self.spec)
            .field("params", &self.params)
            // skip payload, as it is not `Debug`
            .field("options", &self.options)
            .field("checksum", &self.checksum)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::client::tests::{test_builder, test_inner_client};
    use super::*;
    use crate::model::{ObjectChecksums, WriteObjectSpec};
    use crate::upload_source::tests::MockSeekSource;
    use std::error::Error as _;
    use std::io::{Error as IoError, ErrorKind};

    type Result = anyhow::Result<()>;

    // Verify `upload_object()` can be used with a source that implements `StreamingSource` **and** `Seek`
    #[tokio::test]
    async fn test_upload_streaming_source_and_seek() -> Result {
        struct Source;
        impl crate::upload_source::StreamingSource for Source {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
                None
            }
        }
        impl crate::upload_source::Seek for Source {
            type Error = std::io::Error;
            async fn seek(&mut self, _offset: u64) -> std::result::Result<(), Self::Error> {
                Ok(())
            }
        }

        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let _ = client.upload_object("projects/_/buckets/test-bucket", "test-object", Source);
        Ok(())
    }

    // Verify `upload_object()` can be used with a source that **only** implements `StreamingSource`.
    #[tokio::test]
    async fn test_upload_only_streaming_source() -> Result {
        struct Source;
        impl crate::upload_source::StreamingSource for Source {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
                None
            }
        }

        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let _ = client.upload_object("projects/_/buckets/test-bucket", "test-object", Source);
        Ok(())
    }

    // Verify `upload_object()` meets the usual `Send`, `Sync`, and `'static` requirements.
    #[tokio::test]
    async fn test_upload_is_send_and_static() -> Result {
        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;

        fn need_send<T: Send>(_val: &T) {}
        fn need_sync<T: Sync>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let upload = client.upload_object("projects/_/buckets/test-bucket", "test-object", "");
        need_send(&upload);
        need_sync(&upload);
        need_static(&upload);

        let upload = client
            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
            .send_unbuffered();
        need_send(&upload);
        need_static(&upload);

        let upload = client
            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
            .send();
        need_send(&upload);
        need_static(&upload);

        Ok(())
    }

    #[test]
    fn upload_object_unbuffered_metadata() -> Result {
        use crate::model::ObjectAccessControl;
        let inner = test_inner_client(test_builder());
        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_if_generation_match(10)
            .with_if_generation_not_match(20)
            .with_if_metageneration_match(30)
            .with_if_metageneration_not_match(40)
            .with_predefined_acl("private")
            .with_acl([ObjectAccessControl::new()
                .set_entity("allAuthenticatedUsers")
                .set_role("READER")])
            .with_cache_control("public; max-age=7200")
            .with_content_disposition("inline")
            .with_content_encoding("gzip")
            .with_content_language("en")
            .with_content_type("text/plain")
            .with_known_crc32c(crc32c::crc32c(b""))
            .with_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
            .with_event_based_hold(true)
            .with_known_md5_hash(md5::compute(b"").0)
            .with_metadata([("k0", "v0"), ("k1", "v1")])
            .with_retention(
                crate::model::object::Retention::new()
                    .set_mode(crate::model::object::retention::Mode::Locked)
                    .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
            )
            .with_storage_class("ARCHIVE")
            .with_temporary_hold(true)
            .with_kms_key("test-key");

        let resource = request.spec.resource.take().unwrap();
        let request = request;
        assert_eq!(
            &request.spec,
            &WriteObjectSpec::new()
                .set_if_generation_match(10)
                .set_if_generation_not_match(20)
                .set_if_metageneration_match(30)
                .set_if_metageneration_not_match(40)
                .set_predefined_acl("private")
        );

        assert_eq!(
            resource,
            Object::new()
                .set_name("object")
                .set_bucket("projects/_/buckets/bucket")
                .set_acl([ObjectAccessControl::new()
                    .set_entity("allAuthenticatedUsers")
                    .set_role("READER")])
                .set_cache_control("public; max-age=7200")
                .set_content_disposition("inline")
                .set_content_encoding("gzip")
                .set_content_language("en")
                .set_content_type("text/plain")
                .set_checksums(
                    crate::model::ObjectChecksums::new()
                        .set_crc32c(crc32c::crc32c(b""))
                        .set_md5_hash(bytes::Bytes::from_iter(md5::compute(b"").0))
                )
                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
                .set_event_based_hold(true)
                .set_metadata([("k0", "v0"), ("k1", "v1")])
                .set_retention(
                    crate::model::object::Retention::new()
                        .set_mode("LOCKED")
                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?)
                )
                .set_storage_class("ARCHIVE")
                .set_temporary_hold(true)
                .set_kms_key("test-key")
        );

        Ok(())
    }

    #[test]
    fn upload_object_options() {
        let inner = test_inner_client(
            test_builder()
                .with_resumable_upload_threshold(123_usize)
                .with_resumable_upload_buffer_size(234_usize),
        );
        let request = UploadObject::new(inner.clone(), "projects/_/buckets/bucket", "object", "");
        assert_eq!(request.options.resumable_upload_threshold, 123);
        assert_eq!(request.options.resumable_upload_buffer_size, 234);

        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_resumable_upload_threshold(345_usize)
            .with_resumable_upload_buffer_size(456_usize);
        assert_eq!(request.options.resumable_upload_threshold, 345);
        assert_eq!(request.options.resumable_upload_buffer_size, 456);
    }

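    // A minimal sketch, not part of the original test suite: verify that
    // `with_idempotency()` is recorded in the request options, mirroring the
    // pattern used by `upload_object_options()` above.
    #[test]
    fn upload_object_idempotency() {
        let inner = test_inner_client(test_builder());
        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_idempotency(true);
        assert_eq!(request.options.idempotency, Some(true));
    }
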
    const QUICK: &str = "the quick brown fox jumps over the lazy dog";
    const VEXING: &str = "how vexingly quick daft zebras jump";

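    // Runs the given checksum engine over the `QUICK` payload and returns the
    // finalized checksums.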
    fn quick_checksum<E: ChecksumEngine>(mut engine: E) -> ObjectChecksums {
        engine.update(0, &bytes::Bytes::from_static(QUICK.as_bytes()));
        engine.finalize()
    }

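    // Drains a streaming source into a single buffer so tests can compare
    // payload contents.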
    async fn collect<S: StreamingSource>(mut stream: S) -> anyhow::Result<Vec<u8>> {
        let mut collected = Vec::new();
        while let Some(b) = stream.next().await.transpose()? {
            collected.extend_from_slice(&b);
        }
        Ok(collected)
    }

    #[tokio::test]
    async fn checksum_default() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .precompute_checksums()
            .await?;
        let want = quick_checksum(Crc32c::default());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
        let collected = collect(upload.payload).await?;
        assert_eq!(collected, QUICK.as_bytes());
        Ok(())
    }

    #[tokio::test]
    async fn checksum_md5_and_crc32c() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .precompute_checksums()
            .await?;
        let want = quick_checksum(Crc32c::from_inner(Md5::default()));
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
        Ok(())
    }

    #[tokio::test]
    async fn checksum_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in
        // with_known_crc32c() and with_known_md5_hash() is respected.
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(ck));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_crc32c_known_md5_computed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .with_known_crc32c(ck.crc32c.unwrap())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = quick_checksum(Md5::default()).set_crc32c(ck.crc32c.unwrap());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_mixed_then_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_md5_hash(ck.md5_hash.clone())
            .with_known_crc32c(ck.crc32c.unwrap())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_full_computed_then_md5_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = quick_checksum(Crc32c::default()).set_md5_hash(ck.md5_hash.clone());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_known_crc32_then_computed_md5() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .compute_md5()
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_known_crc32_then_known_md5() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn precompute_checksums_seek_error() -> Result {
        let mut source = MockSeekSource::new();
        source
            .expect_seek()
            .once()
            .returning(|_| Err(IoError::new(ErrorKind::Deadlock, "test-only")));

        let client = test_builder().build().await?;
        let err = client
            .upload_object("my-bucket", "my-object", source)
            .precompute_checksums()
            .await
            .expect_err("seek() returns an error");
        assert!(err.is_serialization(), "{err:?}");
        assert!(
            err.source()
                .and_then(|e| e.downcast_ref::<IoError>())
                .is_some(),
            "{err:?}"
        );

        Ok(())
    }

    #[tokio::test]
    async fn precompute_checksums_next_error() -> Result {
        let mut source = MockSeekSource::new();
        source.expect_seek().returning(|_| Ok(()));
        let mut seq = mockall::Sequence::new();
        source
            .expect_next()
            .times(3)
            .in_sequence(&mut seq)
            .returning(|| Some(Ok(bytes::Bytes::new())));
        source
            .expect_next()
            .once()
            .in_sequence(&mut seq)
            .returning(|| Some(Err(IoError::new(ErrorKind::BrokenPipe, "test-only"))));

        let client = test_builder().build().await?;
        let err = client
            .upload_object("my-bucket", "my-object", source)
            .precompute_checksums()
            .await
            .expect_err("next() returns an error");
        assert!(err.is_serialization(), "{err:?}");
        assert!(
            err.source()
                .and_then(|e| e.downcast_ref::<IoError>())
                .is_some(),
            "{err:?}"
        );

        Ok(())
    }

    #[tokio::test]
    async fn debug() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", "")
            .precompute_checksums()
            .await;

        let fmt = format!("{upload:?}");
        ["UploadObject", "inner", "spec", "options", "checksum"]
            .into_iter()
            .for_each(|text| {
                assert!(fmt.contains(text), "expected {text} in {fmt}");
            });
        Ok(())
    }
}