google_cloud_storage/storage/upload_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Contains the request builder for [upload_object()] and related types.
16//!
17//! [upload_object()]: crate::storage::client::Storage::upload_object()
18
19use super::client::*;
20use super::perform_upload::PerformUpload;
21use super::upload_source::{Seek, StreamingSource};
22use super::*;
23use crate::storage::checksum::{
24    ChecksumEngine,
25    details::{Crc32c, Known, KnownCrc32c, KnownMd5, Md5, update as checksum_update},
26};
27
28/// A request builder for object uploads.
29///
30/// # Example: hello world
31/// ```
32/// use google_cloud_storage::client::Storage;
33/// async fn sample(client: &Storage) -> anyhow::Result<()> {
34///     let response = client
35///         .upload_object("projects/_/buckets/my-bucket", "hello", "Hello World!")
36///         .send_unbuffered()
37///         .await?;
38///     println!("response details={response:?}");
39///     Ok(())
40/// }
41/// ```
42///
43/// # Example: upload a file
44/// ```
45/// use google_cloud_storage::client::Storage;
46/// async fn sample(client: &Storage) -> anyhow::Result<()> {
47///     let payload = tokio::fs::File::open("my-data").await?;
48///     let response = client
49///         .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
50///         .send_unbuffered()
51///         .await?;
52///     println!("response details={response:?}");
53///     Ok(())
54/// }
55/// ```
56///
57/// # Example: upload a custom data source
58/// ```
59/// use google_cloud_storage::{client::Storage, upload_source::StreamingSource};
60/// struct DataSource;
61/// impl StreamingSource for DataSource {
62///     type Error = std::io::Error;
63///     async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
64///         # panic!();
65///     }
66/// }
67///
68/// async fn sample(client: &Storage) -> anyhow::Result<()> {
69///     let response = client
70///         .upload_object("projects/_/buckets/my-bucket", "my-object", DataSource)
71///         .send_buffered()
72///         .await?;
73///     println!("response details={response:?}");
74///     Ok(())
75/// }
76/// ```
77pub struct UploadObject<T, C = Crc32c> {
78    inner: std::sync::Arc<StorageInner>,
79    spec: crate::model::WriteObjectSpec,
80    params: Option<crate::model::CommonObjectRequestParams>,
81    payload: Payload<T>,
82    options: super::request_options::RequestOptions,
83    checksum: C,
84}
85
86impl<T, C> UploadObject<T, C> {
87    /// Set a [request precondition] on the object generation to match.
88    ///
    /// With this precondition the request fails if the current object
    /// generation does not match the provided value. A common value is `0`,
    /// which prevents uploads from succeeding if the object already exists.
92    ///
93    /// # Example
94    /// ```
95    /// # use google_cloud_storage::client::Storage;
96    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
97    /// let response = client
98    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
99    ///     .with_if_generation_match(0)
100    ///     .send_buffered()
101    ///     .await?;
102    /// println!("response details={response:?}");
103    /// # Ok(()) }
104    /// ```
105    ///
106    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
107    pub fn with_if_generation_match<V>(mut self, v: V) -> Self
108    where
109        V: Into<i64>,
110    {
111        self.spec.if_generation_match = Some(v.into());
112        self
113    }
114
115    /// Set a [request precondition] on the object generation to match.
116    ///
    /// With this precondition the request fails if the current object
    /// generation matches the provided value.
119    ///
120    /// # Example
121    /// ```
122    /// # use google_cloud_storage::client::Storage;
123    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
124    /// let response = client
125    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
126    ///     .with_if_generation_not_match(0)
127    ///     .send_buffered()
128    ///     .await?;
129    /// println!("response details={response:?}");
130    /// # Ok(()) }
131    /// ```
132    ///
133    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
134    pub fn with_if_generation_not_match<V>(mut self, v: V) -> Self
135    where
136        V: Into<i64>,
137    {
138        self.spec.if_generation_not_match = Some(v.into());
139        self
140    }
141
142    /// Set a [request precondition] on the object meta generation.
143    ///
144    /// With this precondition the request fails if the current object metadata
145    /// generation does not match the provided value. This may be useful to
146    /// prevent changes when the metageneration is known.
147    ///
148    /// # Example
149    /// ```
150    /// # use google_cloud_storage::client::Storage;
151    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
152    /// let response = client
153    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
154    ///     .with_if_metageneration_match(1234)
155    ///     .send_buffered()
156    ///     .await?;
157    /// println!("response details={response:?}");
158    /// # Ok(()) }
159    /// ```
160    ///
161    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
162    pub fn with_if_metageneration_match<V>(mut self, v: V) -> Self
163    where
164        V: Into<i64>,
165    {
166        self.spec.if_metageneration_match = Some(v.into());
167        self
168    }
169
170    /// Set a [request precondition] on the object meta-generation.
171    ///
172    /// With this precondition the request fails if the current object metadata
    /// generation matches the provided value. This is rarely useful in uploads;
    /// it is more commonly used in downloads, to skip the download when the
    /// cached value is already current.
176    ///
177    /// # Example
178    /// ```
179    /// # use google_cloud_storage::client::Storage;
180    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
181    /// let response = client
182    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
183    ///     .with_if_metageneration_not_match(1234)
184    ///     .send_buffered()
185    ///     .await?;
186    /// println!("response details={response:?}");
187    /// # Ok(()) }
188    /// ```
189    ///
190    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
191    pub fn with_if_metageneration_not_match<V>(mut self, v: V) -> Self
192    where
193        V: Into<i64>,
194    {
195        self.spec.if_metageneration_not_match = Some(v.into());
196        self
197    }
198
199    /// Sets the ACL for the new object.
200    ///
201    /// # Example
202    /// ```
203    /// # use google_cloud_storage::client::Storage;
204    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
205    /// # use google_cloud_storage::model::ObjectAccessControl;
206    /// let response = client
207    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
208    ///     .with_acl([ObjectAccessControl::new().set_entity("allAuthenticatedUsers").set_role("READER")])
209    ///     .send_buffered()
210    ///     .await?;
211    /// println!("response details={response:?}");
212    /// # Ok(()) }
213    /// ```
214    pub fn with_acl<I, V>(mut self, v: I) -> Self
215    where
216        I: IntoIterator<Item = V>,
217        V: Into<crate::model::ObjectAccessControl>,
218    {
219        self.mut_resource().acl = v.into_iter().map(|a| a.into()).collect();
220        self
221    }
222
223    /// Sets the [cache control] for the new object.
224    ///
225    /// This can be used to control caching in [public objects].
226    ///
227    /// # Example
228    /// ```
229    /// # use google_cloud_storage::client::Storage;
230    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
231    /// let response = client
232    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_cache_control("public, max-age=7200")
234    ///     .send_buffered()
235    ///     .await?;
236    /// println!("response details={response:?}");
237    /// # Ok(()) }
238    /// ```
239    ///
240    /// [public objects]: https://cloud.google.com/storage/docs/access-control/making-data-public
241    /// [cache control]: https://datatracker.ietf.org/doc/html/rfc7234#section-5.2
242    pub fn with_cache_control<V: Into<String>>(mut self, v: V) -> Self {
243        self.mut_resource().cache_control = v.into();
244        self
245    }
246
247    /// Sets the [content disposition] for the new object.
248    ///
249    /// Google Cloud Storage can serve content directly to web browsers. This
250    /// attribute sets the `Content-Disposition` header, which may change how
251    /// the browser displays the contents.
252    ///
253    /// # Example
254    /// ```
255    /// # use google_cloud_storage::client::Storage;
256    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
257    /// let response = client
258    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
259    ///     .with_content_disposition("inline")
260    ///     .send_buffered()
261    ///     .await?;
262    /// println!("response details={response:?}");
263    /// # Ok(()) }
264    /// ```
265    ///
266    /// [content disposition]: https://datatracker.ietf.org/doc/html/rfc6266
267    pub fn with_content_disposition<V: Into<String>>(mut self, v: V) -> Self {
268        self.mut_resource().content_disposition = v.into();
269        self
270    }
271
272    /// Sets the [content encoding] for the object data.
273    ///
274    /// This can be used to upload compressed data and enable [transcoding] of
275    /// the data during downloads.
276    ///
277    /// # Example
278    /// ```
279    /// # use google_cloud_storage::client::Storage;
280    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
281    /// use flate2::write::GzEncoder;
282    /// use std::io::Write;
283    /// let mut e = GzEncoder::new(Vec::new(), flate2::Compression::default());
    /// e.write_all(b"hello world")?;
285    /// let response = client
286    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", bytes::Bytes::from_owner(e.finish()?))
287    ///     .with_content_encoding("gzip")
288    ///     .send_buffered()
289    ///     .await?;
290    /// println!("response details={response:?}");
291    /// # Ok(()) }
292    /// ```
293    ///
294    /// [transcoding]: https://cloud.google.com/storage/docs/transcoding
295    /// [content encoding]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.2
296    pub fn with_content_encoding<V: Into<String>>(mut self, v: V) -> Self {
297        self.mut_resource().content_encoding = v.into();
298        self
299    }
300
301    /// Sets the [content language] for the new object.
302    ///
303    /// Google Cloud Storage can serve content directly to web browsers. This
304    /// attribute sets the `Content-Language` header, which may change how the
305    /// browser displays the contents.
306    ///
307    /// # Example
308    /// ```
309    /// # use google_cloud_storage::client::Storage;
310    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
311    /// let response = client
312    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
313    ///     .with_content_language("en")
314    ///     .send_buffered()
315    ///     .await?;
316    /// println!("response details={response:?}");
317    /// # Ok(()) }
318    /// ```
319    ///
320    /// [content language]: https://cloud.google.com/storage/docs/metadata#content-language
321    pub fn with_content_language<V: Into<String>>(mut self, v: V) -> Self {
322        self.mut_resource().content_language = v.into();
323        self
324    }
325
326    /// Sets the [content type] for the new object.
327    ///
328    /// Google Cloud Storage can serve content directly to web browsers. This
329    /// attribute sets the `Content-Type` header, which may change how the
330    /// browser interprets the contents.
331    ///
332    /// # Example
333    /// ```
334    /// # use google_cloud_storage::client::Storage;
335    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
336    /// let response = client
337    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
338    ///     .with_content_type("text/plain")
339    ///     .send_buffered()
340    ///     .await?;
341    /// println!("response details={response:?}");
342    /// # Ok(()) }
343    /// ```
344    ///
345    /// [content type]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.5
346    pub fn with_content_type<V: Into<String>>(mut self, v: V) -> Self {
347        self.mut_resource().content_type = v.into();
348        self
349    }
350
351    /// Sets the [custom time] for the new object.
352    ///
353    /// This field is typically set in order to use the [DaysSinceCustomTime]
354    /// condition in Object Lifecycle Management.
355    ///
356    /// # Example
357    /// ```
358    /// # use google_cloud_storage::client::Storage;
359    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
360    /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
361    /// let response = client
362    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
363    ///     .with_custom_time(time)
364    ///     .send_buffered()
365    ///     .await?;
366    /// println!("response details={response:?}");
367    /// # Ok(()) }
368    /// ```
369    ///
370    /// [DaysSinceCustomTime]: https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
371    /// [custom time]: https://cloud.google.com/storage/docs/metadata#custom-time
372    pub fn with_custom_time<V: Into<wkt::Timestamp>>(mut self, v: V) -> Self {
373        self.mut_resource().custom_time = Some(v.into());
374        self
375    }
376
377    /// Sets the [event based hold] flag for the new object.
378    ///
379    /// This field is typically set in order to prevent objects from being
380    /// deleted or modified.
381    ///
382    /// # Example
383    /// ```
384    /// # use google_cloud_storage::client::Storage;
385    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
386    /// let response = client
387    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
388    ///     .with_event_based_hold(true)
389    ///     .send_buffered()
390    ///     .await?;
391    /// println!("response details={response:?}");
392    /// # Ok(()) }
393    /// ```
394    ///
395    /// [event based hold]: https://cloud.google.com/storage/docs/object-holds
396    pub fn with_event_based_hold<V: Into<bool>>(mut self, v: V) -> Self {
397        self.mut_resource().event_based_hold = Some(v.into());
398        self
399    }
400
401    /// Sets the [custom metadata] for the new object.
402    ///
403    /// This field is typically set to annotate the object with
404    /// application-specific metadata.
405    ///
406    /// # Example
407    /// ```
408    /// # use google_cloud_storage::client::Storage;
409    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
411    /// let response = client
412    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
413    ///     .with_metadata([("test-only", "true"), ("environment", "qa")])
414    ///     .send_buffered()
415    ///     .await?;
416    /// println!("response details={response:?}");
417    /// # Ok(()) }
418    /// ```
419    ///
420    /// [custom metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
421    pub fn with_metadata<I, K, V>(mut self, i: I) -> Self
422    where
423        I: IntoIterator<Item = (K, V)>,
424        K: Into<String>,
425        V: Into<String>,
426    {
427        self.mut_resource().metadata = i.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
428        self
429    }
430
431    /// Sets the [retention configuration] for the new object.
432    ///
433    /// # Example
434    /// ```
435    /// # use google_cloud_storage::client::Storage;
436    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
437    /// # use google_cloud_storage::model::object::{Retention, retention};
438    /// let response = client
439    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
440    ///     .with_retention(
441    ///         Retention::new()
442    ///             .set_mode(retention::Mode::Locked)
443    ///             .set_retain_until_time(wkt::Timestamp::try_from("2035-01-01T00:00:00Z")?))
444    ///     .send_buffered()
445    ///     .await?;
446    /// println!("response details={response:?}");
447    /// # Ok(()) }
448    /// ```
449    ///
450    /// [retention configuration]: https://cloud.google.com/storage/docs/metadata#retention-config
451    pub fn with_retention<V>(mut self, v: V) -> Self
452    where
453        V: Into<crate::model::object::Retention>,
454    {
455        self.mut_resource().retention = Some(v.into());
456        self
457    }
458
459    /// Sets the [storage class] for the new object.
460    ///
461    /// # Example
462    /// ```
463    /// # use google_cloud_storage::client::Storage;
464    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
465    /// let response = client
466    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
467    ///     .with_storage_class("ARCHIVE")
468    ///     .send_buffered()
469    ///     .await?;
470    /// println!("response details={response:?}");
471    /// # Ok(()) }
472    /// ```
473    ///
474    /// [storage class]: https://cloud.google.com/storage/docs/storage-classes
475    pub fn with_storage_class<V>(mut self, v: V) -> Self
476    where
477        V: Into<String>,
478    {
479        self.mut_resource().storage_class = v.into();
480        self
481    }
482
483    /// Sets the [temporary hold] flag for the new object.
484    ///
485    /// This field is typically set in order to prevent objects from being
486    /// deleted or modified.
487    ///
488    /// # Example
489    /// ```
490    /// # use google_cloud_storage::client::Storage;
491    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
493    /// let response = client
494    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
495    ///     .with_temporary_hold(true)
496    ///     .send_buffered()
497    ///     .await?;
498    /// println!("response details={response:?}");
499    /// # Ok(()) }
500    /// ```
501    ///
502    /// [temporary hold]: https://cloud.google.com/storage/docs/object-holds
503    pub fn with_temporary_hold<V: Into<bool>>(mut self, v: V) -> Self {
504        self.mut_resource().temporary_hold = v.into();
505        self
506    }
507
508    /// Sets the resource name of the [Customer-managed encryption key] for this
509    /// object.
510    ///
511    /// The service imposes a number of restrictions on the keys used to encrypt
512    /// Google Cloud Storage objects. Read the documentation in full before
513    /// trying to use customer-managed encryption keys. In particular, verify
514    /// the service has the necessary permissions, and the key is in a
515    /// compatible location.
516    ///
517    /// # Example
518    /// ```
519    /// # use google_cloud_storage::client::Storage;
520    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
521    /// let response = client
522    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
523    ///     .with_kms_key("projects/test-project/locations/us-central1/keyRings/test-ring/cryptoKeys/test-key")
524    ///     .send_buffered()
525    ///     .await?;
526    /// println!("response details={response:?}");
527    /// # Ok(()) }
528    /// ```
529    ///
530    /// [Customer-managed encryption key]: https://cloud.google.com/storage/docs/encryption/customer-managed-keys
531    pub fn with_kms_key<V>(mut self, v: V) -> Self
532    where
533        V: Into<String>,
534    {
535        self.mut_resource().kms_key = v.into();
536        self
537    }
538
539    /// Configure this object to use one of the [predefined ACLs].
540    ///
541    /// # Example
542    /// ```
543    /// # use google_cloud_storage::client::Storage;
544    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
545    /// let response = client
546    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
547    ///     .with_predefined_acl("private")
548    ///     .send_buffered()
549    ///     .await?;
550    /// println!("response details={response:?}");
551    /// # Ok(()) }
552    /// ```
553    ///
554    /// [predefined ACLs]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
555    pub fn with_predefined_acl<V>(mut self, v: V) -> Self
556    where
557        V: Into<String>,
558    {
559        self.spec.predefined_acl = v.into();
560        self
561    }
562
    /// Sets the encryption key used with the Customer-Supplied Encryption Keys
    /// feature. The key is provided as raw bytes (not base64-encoded).
565    ///
566    /// # Example
567    /// ```
568    /// # use google_cloud_storage::client::Storage;
569    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
570    /// # use google_cloud_storage::builder::storage::KeyAes256;
571    /// let key: &[u8] = &[97; 32];
572    /// let response = client
573    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
574    ///     .with_key(KeyAes256::new(key)?)
575    ///     .send_buffered()
576    ///     .await?;
577    /// println!("response details={response:?}");
578    /// # Ok(()) }
579    /// ```
580    pub fn with_key(mut self, v: KeyAes256) -> Self {
581        self.params = Some(v.into());
582        self
583    }
584
585    /// Configure the idempotency for this upload.
586    ///
    /// By default, the client library treats single-shot uploads without
    /// preconditions as non-idempotent. If the destination bucket is
589    /// configured with [object versioning] then the operation may succeed
590    /// multiple times with observable side-effects. With object versioning and
591    /// a [lifecycle] policy limiting the number of versions, uploading the same
592    /// data multiple times may result in data loss.
593    ///
594    /// The client library cannot efficiently determine if these conditions
    /// apply to your upload. If they do not, or if your application can
    /// tolerate multiple versions of the same data for other reasons, consider
    /// using `with_idempotency(true)`.
598    ///
599    /// The client library treats resumable uploads as idempotent, regardless of
600    /// the value in this option. Such uploads can succeed at most once.
601    ///
602    /// # Example
603    /// ```
604    /// # use google_cloud_storage::client::Storage;
606    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
609    /// let response = client
610    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
611    ///     .with_idempotency(true)
612    ///     .send_buffered()
613    ///     .await?;
614    /// println!("response details={response:?}");
615    /// # Ok(()) }
616    /// ```
617    ///
618    /// [lifecycle]: https://cloud.google.com/storage/docs/lifecycle
619    /// [object versioning]: https://cloud.google.com/storage/docs/object-versioning
620    pub fn with_idempotency(mut self, v: bool) -> Self {
621        self.options.idempotency = Some(v);
622        self
623    }
624
625    /// The retry policy used for this request.
626    ///
627    /// # Example
628    /// ```
629    /// # use google_cloud_storage::client::Storage;
630    /// # use google_cloud_storage::retry_policy::RecommendedPolicy;
631    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
632    /// use std::time::Duration;
633    /// use gax::retry_policy::RetryPolicyExt;
634    /// let response = client
635    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
636    ///     .with_retry_policy(RecommendedPolicy
637    ///         .with_attempt_limit(5)
638    ///         .with_time_limit(Duration::from_secs(10)),
639    ///     )
640    ///     .send_buffered()
641    ///     .await?;
642    /// println!("response details={response:?}");
643    /// # Ok(()) }
644    /// ```
645    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
646        self.options.retry_policy = v.into().into();
647        self
648    }
649
650    /// The backoff policy used for this request.
651    ///
652    /// # Example
653    /// ```
654    /// # use google_cloud_storage::client::Storage;
655    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
656    /// use std::time::Duration;
657    /// use gax::exponential_backoff::ExponentialBackoff;
658    /// let response = client
659    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
660    ///     .with_backoff_policy(ExponentialBackoff::default())
661    ///     .send_buffered()
662    ///     .await?;
663    /// println!("response details={response:?}");
664    /// # Ok(()) }
665    /// ```
666    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
667        mut self,
668        v: V,
669    ) -> Self {
670        self.options.backoff_policy = v.into().into();
671        self
672    }
673
674    /// The retry throttler used for this request.
675    ///
    /// Most of the time you want to use the same throttler for all the requests
    /// in a client, and even the same throttler for many clients. Rarely, it
    /// may be necessary to use a custom throttler for some subset of the
    /// requests.
680    ///
681    /// # Example
682    /// ```
683    /// # use google_cloud_storage::client::Storage;
684    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
685    /// let response = client
686    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
687    ///     .with_retry_throttler(adhoc_throttler())
688    ///     .send_buffered()
689    ///     .await?;
690    /// println!("response details={response:?}");
691    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
692    ///     # panic!();
693    /// }
694    /// # Ok(()) }
695    /// ```
696    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
697        mut self,
698        v: V,
699    ) -> Self {
700        self.options.retry_throttler = v.into().into();
701        self
702    }
703
704    /// Sets the payload size threshold to switch from single-shot to resumable uploads.
705    ///
706    /// # Example
707    /// ```
708    /// # use google_cloud_storage::client::Storage;
709    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
710    /// let response = client
711    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
712    ///     .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
713    ///     .send_buffered()
714    ///     .await?;
715    /// println!("response details={response:?}");
716    /// # Ok(()) }
717    /// ```
718    ///
719    /// The client library can perform uploads using [single-shot] or
720    /// [resumable] uploads. For small objects, single-shot uploads offer better
721    /// performance, as they require a single HTTP transfer. For larger objects,
722    /// the additional request latency is not significant, and resumable uploads
723    /// offer better recovery on errors.
724    ///
725    /// The library automatically selects resumable uploads when the payload is
726    /// equal to or larger than this option. For smaller uploads the client
727    /// library uses single-shot uploads.
728    ///
    /// The best threshold depends on where the application is deployed and on
    /// the location of the destination bucket relative to the application. The
    /// library defaults should work well in most cases, but some
732    /// applications may benefit from fine-tuning.
733    ///
734    /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
735    /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
736    pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
737        self.options.resumable_upload_threshold = v.into();
738        self
739    }
740
741    /// Changes the buffer size for some resumable uploads.
742    ///
743    /// # Example
744    /// ```
745    /// # use google_cloud_storage::client::Storage;
746    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
747    /// let response = client
748    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
749    ///     .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
750    ///     .send_buffered()
751    ///     .await?;
752    /// println!("response details={response:?}");
753    /// # Ok(()) }
754    /// ```
755    ///
756    /// When performing [resumable uploads] from sources without [Seek] the
757    /// client library needs to buffer data in memory until it is persisted by
758    /// the service. Otherwise the data would be lost if the upload fails.
759    /// Applications may want to tune this buffer size:
760    ///
761    /// - Use smaller buffer sizes to support more concurrent uploads in the
762    ///   same application.
763    /// - Use larger buffer sizes for better throughput. Sending many small
764    ///   buffers stalls the upload until the client receives a successful
765    ///   response from the service.
766    ///
767    /// Keep in mind that there are diminishing returns on using larger buffers.
768    ///
769    /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
770    /// [Seek]: crate::upload_source::Seek
771    pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
772        self.options.resumable_upload_buffer_size = v.into();
773        self
774    }
775
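    // Returns the object resource embedded in the write spec. `new()` always
    // initializes the resource, so the `expect()` cannot fail.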
776    fn mut_resource(&mut self) -> &mut crate::model::Object {
777        self.spec
778            .resource
779            .as_mut()
780            .expect("resource field initialized in `new()`")
781    }
782
783    pub(crate) fn build(self) -> PerformUpload<C, Payload<T>> {
784        PerformUpload::new(
785            self.checksum,
786            self.payload,
787            self.inner,
788            self.spec,
789            self.params,
790            self.options,
791        )
792    }
793
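    // Rebuilds the builder with a different checksum engine, preserving every
    // other setting. The `with_known_*()` and `compute_md5()` methods use this
    // to move between checksum states.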
794    fn switch_checksum<F, U>(self, new: F) -> UploadObject<T, U>
795    where
796        F: FnOnce(C) -> U,
797    {
798        UploadObject {
799            payload: self.payload,
800            inner: self.inner,
801            spec: self.spec,
802            params: self.params,
803            options: self.options,
804            checksum: new(self.checksum),
805        }
806    }
807
    // Records an application-provided CRC32C value in the object resource.
    // Used by the `with_known_crc32c()` builders.
    fn set_crc32c<V: Into<u32>>(mut self, v: V) -> Self {
809        let checksum = self.mut_resource().checksums.get_or_insert_default();
810        checksum.crc32c = Some(v.into());
811        self
812    }
813
    // Records an application-provided MD5 hash in the object resource.
    // Used by the `with_known_md5_hash()` builders.
    fn set_md5_hash<I, V>(mut self, i: I) -> Self
815    where
816        I: IntoIterator<Item = V>,
817        V: Into<u8>,
818    {
819        let checksum = self.mut_resource().checksums.get_or_insert_default();
820        checksum.md5_hash = i.into_iter().map(|v| v.into()).collect();
821        self
822    }
823}
824
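// The `C` type parameter tracks how each checksum is obtained: computed by the
// client library (`Crc32c`, `Md5`), supplied by the application (`KnownCrc32c`,
// `KnownMd5`, `Known`), or a mix of both (e.g. `Md5<KnownCrc32c>`). The impl
// blocks below only expose `with_known_*()` and `compute_md5()` in the states
// where they are still meaningful, so conflicting combinations do not compile.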
825impl<T> UploadObject<T, Crc32c> {
826    /// Provide a precomputed value for the CRC32C checksum.
827    ///
828    /// # Example
829    /// ```
830    /// # use google_cloud_storage::client::Storage;
831    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
832    /// use crc32c::crc32c;
833    /// let response = client
834    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
835    ///     .with_known_crc32c(crc32c(b"hello world"))
836    ///     .send_buffered()
837    ///     .await?;
838    /// println!("response details={response:?}");
839    /// # Ok(()) }
840    /// ```
841    ///
842    /// In some applications, the payload's CRC32C checksum is already known.
843    /// For example, the application may be downloading the data from another
844    /// blob storage system.
845    ///
846    /// In such cases, it is safer to pass the known CRC32C of the payload to
847    /// [Cloud Storage], and more efficient to skip the computation in the
848    /// client library.
849    ///
    /// Note that once you provide a CRC32C value to this builder the client
    /// library no longer computes the CRC32C checksum. You can still use
    /// [compute_md5()] to have the library compute the MD5 hash.
852    ///
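    /// For example, you can provide a known CRC32C value and still have the
    /// library compute the MD5 hash:
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use crc32c::crc32c;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_known_crc32c(crc32c(b"hello world"))
    ///     .compute_md5()
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///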
    /// [Cloud Storage]: https://cloud.google.com/storage
    /// [compute_md5()]: UploadObject::compute_md5
854    pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, KnownCrc32c> {
855        let this = self.switch_checksum(|_| KnownCrc32c);
856        this.set_crc32c(v)
857    }
858
859    /// Provide a precomputed value for the MD5 hash.
860    ///
861    /// # Example
862    /// ```
863    /// # use google_cloud_storage::client::Storage;
864    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let hash = md5::compute(b"hello world");
867    /// let response = client
868    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
869    ///     .with_known_md5_hash(bytes::Bytes::from_owner(hash.0))
870    ///     .send_buffered()
871    ///     .await?;
872    /// println!("response details={response:?}");
873    /// # Ok(()) }
874    /// ```
875    ///
876    /// In some applications, the payload's MD5 hash is already known. For
877    /// example, the application may be downloading the data from another blob
878    /// storage system.
879    ///
880    /// In such cases, it is safer to pass the known MD5 of the payload to
881    /// [Cloud Storage], and more efficient to skip the computation in the
882    /// client library.
883    ///
    /// Note that once you provide an MD5 value to this builder you cannot
    /// use [compute_md5()] to also have the library compute the MD5 hash.
886    ///
    /// [Cloud Storage]: https://cloud.google.com/storage
    /// [compute_md5()]: UploadObject::compute_md5
888    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Crc32c<KnownMd5>>
889    where
890        I: IntoIterator<Item = V>,
891        V: Into<u8>,
892    {
893        let this = self.switch_checksum(|_| Crc32c::from_inner(KnownMd5));
894        this.set_md5_hash(i)
895    }
896
897    /// Enables computation of MD5 hashes.
898    ///
899    /// # Example
900    /// ```
901    /// # use google_cloud_storage::client::Storage;
902    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
903    /// let payload = tokio::fs::File::open("my-data").await?;
904    /// let response = client
905    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
906    ///     .compute_md5()
907    ///     .send_buffered()
908    ///     .await?;
909    /// println!("response details={response:?}");
910    /// # Ok(()) }
911    /// ```
912    ///
913    /// See [precompute_checksums][UploadObject::precompute_checksums] for more
914    /// details on how checksums are used by the client library and their
915    /// limitations.
916    pub fn compute_md5(self) -> UploadObject<T, Md5<Crc32c>> {
917        self.switch_checksum(Md5::from_inner)
918    }
919}
920
921impl<T> UploadObject<T, Crc32c<KnownMd5>> {
922    /// See [UploadObject<T, Crc32c>::with_known_crc32c].
923    pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, Known> {
924        let this = self.switch_checksum(|_| Known);
925        this.set_crc32c(v)
926    }
927}
928
929impl<T> UploadObject<T, Md5<Crc32c>> {
930    /// See [UploadObject<T, Crc32c>::with_known_crc32c].
931    pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, Md5<KnownCrc32c>> {
932        let this = self.switch_checksum(|_| Md5::from_inner(KnownCrc32c));
933        this.set_crc32c(v)
934    }
935
936    /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
937    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Crc32c<KnownMd5>>
938    where
939        I: IntoIterator<Item = V>,
940        V: Into<u8>,
941    {
942        let this = self.switch_checksum(|_| Crc32c::from_inner(KnownMd5));
943        this.set_md5_hash(i)
944    }
945}
946
947impl<T> UploadObject<T, Md5<KnownCrc32c>> {
948    /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
949    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Known>
950    where
951        I: IntoIterator<Item = V>,
952        V: Into<u8>,
953    {
954        let this = self.switch_checksum(|_| Known);
955        this.set_md5_hash(i)
956    }
957}
958
959impl<T> UploadObject<T, KnownCrc32c> {
960    /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
961    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Known>
962    where
963        I: IntoIterator<Item = V>,
964        V: Into<u8>,
965    {
966        let this = self.switch_checksum(|_| Known);
967        this.set_md5_hash(i)
968    }
969
970    /// See [UploadObject<T, Crc32c>::compute_md5()].
971    pub fn compute_md5(self) -> UploadObject<T, Md5<KnownCrc32c>> {
972        self.switch_checksum(Md5::from_inner)
973    }
974}
975
976impl<T> UploadObject<T> {
977    pub(crate) fn new<B, O, P>(
978        inner: std::sync::Arc<StorageInner>,
979        bucket: B,
980        object: O,
981        payload: P,
982    ) -> Self
983    where
984        B: Into<String>,
985        O: Into<String>,
986        P: Into<Payload<T>>,
987    {
988        let options = inner.options.clone();
989        let resource = crate::model::Object::new()
990            .set_bucket(bucket)
991            .set_name(object);
992        UploadObject {
993            inner,
994            spec: crate::model::WriteObjectSpec::new().set_resource(resource),
995            params: None,
996            payload: payload.into(),
997            options,
998            checksum: Crc32c::default(),
999        }
1000    }
1001}
1002
1003impl<T, C> UploadObject<T, C>
1004where
1005    C: ChecksumEngine + Send + Sync + 'static,
1006    T: StreamingSource + Seek + Send + Sync + 'static,
1007    <T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
1008    <T as Seek>::Error: std::error::Error + Send + Sync + 'static,
1009{
    /// Upload an object from a source that supports rewinds, without buffering
    /// the data in the client library.
1011    ///
1012    /// # Example
1013    /// ```
1014    /// # use google_cloud_storage::client::Storage;
1015    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1016    /// let response = client
1017    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
1018    ///     .send_unbuffered()
1019    ///     .await?;
1020    /// println!("response details={response:?}");
1021    /// # Ok(()) }
1022    /// ```
1023    pub async fn send_unbuffered(self) -> Result<Object> {
1024        self.build().send_unbuffered().await
1025    }
1026
1027    /// Precompute the payload checksums before uploading the data.
1028    ///
1029    /// If the checksums are known when the upload starts, the client library
1030    /// can include the checksums with the upload request, and the service can
1031    /// reject the upload if the payload and the checksums do not match.
1032    ///
1033    /// # Example
1034    /// ```
1035    /// # use google_cloud_storage::client::Storage;
1036    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1037    /// let payload = tokio::fs::File::open("my-data").await?;
1038    /// let response = client
1039    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
1040    ///     .precompute_checksums()
1041    ///     .await?
1042    ///     .send_unbuffered()
1043    ///     .await?;
1044    /// println!("response details={response:?}");
1045    /// # Ok(()) }
1046    /// ```
1047    ///
1048    /// Precomputing the checksums can be expensive if the data source is slow
1049    /// to read. Therefore, the client library does not precompute the checksums
1050    /// by default. The client library compares the checksums computed by the
1051    /// service against its own checksums. If they do not match, the client
1052    /// library returns an error. However, the service has already created the
1053    /// object with the (likely incorrect) data.
1054    ///
    /// The client library currently uses the [JSON API]; with this API it is
    /// not possible to send the checksums at the end of the upload.
1057    ///
1058    /// [JSON API]: https://cloud.google.com/storage/docs/json_api
1059    pub async fn precompute_checksums(mut self) -> Result<UploadObject<T, Known>>
1060    where
1061        C: ChecksumEngine + Send + Sync + 'static,
1062    {
1063        let mut offset = 0_u64;
1064        self.payload.seek(offset).await.map_err(Error::ser)?;
1065        while let Some(n) = self.payload.next().await.transpose().map_err(Error::ser)? {
1066            self.checksum.update(offset, &n);
1067            offset += n.len() as u64;
1068        }
1069        self.payload.seek(0_u64).await.map_err(Error::ser)?;
1070        let computed = self.checksum.finalize();
1071        let current = self.mut_resource().checksums.get_or_insert_default();
1072        checksum_update(current, computed);
1073        Ok(self.switch_checksum(|_| Known))
1074    }
1075}
1076
1077impl<T, C> UploadObject<T, C>
1078where
1079    C: ChecksumEngine + Send + Sync + 'static,
1080    T: StreamingSource + Send + Sync + 'static,
1081    T::Error: std::error::Error + Send + Sync + 'static,
1082{
1083    /// Upload an object from a streaming source without rewinds.
1084    ///
1085    /// # Example
1086    /// ```
1087    /// # use google_cloud_storage::client::Storage;
1088    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1089    /// let response = client
1090    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
1091    ///     .send_buffered()
1092    ///     .await?;
1093    /// println!("response details={response:?}");
1094    /// # Ok(()) }
1095    /// ```
1096    pub async fn send_buffered(self) -> crate::Result<Object> {
1097        self.build().send().await
1098    }
1099}
1100
1101// We need `Debug` to use `expect_err()` in `Result<UploadObject, ...>`.
1102impl<T, C> std::fmt::Debug for UploadObject<T, C>
1103where
1104    C: std::fmt::Debug,
1105{
1106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1107        f.debug_struct("UploadObject")
1108            .field("inner", &self.inner)
1109            .field("spec", &self.spec)
1110            .field("params", &self.params)
1111            // skip payload, as it is not `Debug`
1112            .field("options", &self.options)
1113            .field("checksum", &self.checksum)
1114            .finish()
1115    }
1116}
1117
1118#[cfg(test)]
1119mod tests {
1120    use super::client::tests::{test_builder, test_inner_client};
1121    use super::*;
1122    use crate::model::{ObjectChecksums, WriteObjectSpec};
1123    use crate::upload_source::tests::MockSeekSource;
1124    use std::error::Error as _;
1125    use std::io::{Error as IoError, ErrorKind};
1126
1127    type Result = anyhow::Result<()>;
1128
1129    // Verify `upload_object()` can be used with a source that implements `StreamingSource` **and** `Seek`
1130    #[tokio::test]
1131    async fn test_upload_streaming_source_and_seek() -> Result {
1132        struct Source;
1133        impl crate::upload_source::StreamingSource for Source {
1134            type Error = std::io::Error;
1135            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
1136                None
1137            }
1138        }
1139        impl crate::upload_source::Seek for Source {
1140            type Error = std::io::Error;
1141            async fn seek(&mut self, _offset: u64) -> std::result::Result<(), Self::Error> {
1142                Ok(())
1143            }
1144        }
1145
1146        let client = Storage::builder()
1147            .with_credentials(auth::credentials::testing::test_credentials())
1148            .build()
1149            .await?;
1150        let _ = client.upload_object("projects/_/buckets/test-bucket", "test-object", Source);
1151        Ok(())
1152    }
1153
1154    // Verify `upload_object()` can be used with a source that **only** implements `StreamingSource`.
1155    #[tokio::test]
1156    async fn test_upload_only_streaming_source() -> Result {
1157        struct Source;
1158        impl crate::upload_source::StreamingSource for Source {
1159            type Error = std::io::Error;
1160            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
1161                None
1162            }
1163        }
1164
1165        let client = Storage::builder()
1166            .with_credentials(auth::credentials::testing::test_credentials())
1167            .build()
1168            .await?;
1169        let _ = client.upload_object("projects/_/buckets/test-bucket", "test-object", Source);
1170        Ok(())
1171    }
1172
    // Verify `upload_object()` and its futures meet the usual `Send`, `Sync`,
    // and `'static` requirements.
1174    #[tokio::test]
1175    async fn test_upload_is_send_and_static() -> Result {
1176        let client = Storage::builder()
1177            .with_credentials(auth::credentials::testing::test_credentials())
1178            .build()
1179            .await?;
1180
1181        fn need_send<T: Send>(_val: &T) {}
1182        fn need_sync<T: Sync>(_val: &T) {}
1183        fn need_static<T: 'static>(_val: &T) {}
1184
1185        let upload = client.upload_object("projects/_/buckets/test-bucket", "test-object", "");
1186        need_send(&upload);
1187        need_sync(&upload);
1188        need_static(&upload);
1189
1190        let upload = client
1191            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
1192            .send_unbuffered();
1193        need_send(&upload);
1194        need_static(&upload);
1195
1196        let upload = client
1197            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
1198            .send_buffered();
1199        need_send(&upload);
1200        need_static(&upload);
1201
1202        Ok(())
1203    }
1204
1205    #[test]
1206    fn upload_object_unbuffered_metadata() -> Result {
1207        use crate::model::ObjectAccessControl;
1208        let inner = test_inner_client(test_builder());
1209        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
1210            .with_if_generation_match(10)
1211            .with_if_generation_not_match(20)
1212            .with_if_metageneration_match(30)
1213            .with_if_metageneration_not_match(40)
1214            .with_predefined_acl("private")
1215            .with_acl([ObjectAccessControl::new()
1216                .set_entity("allAuthenticatedUsers")
1217                .set_role("READER")])
            .with_cache_control("public, max-age=7200")
1219            .with_content_disposition("inline")
1220            .with_content_encoding("gzip")
1221            .with_content_language("en")
1222            .with_content_type("text/plain")
1223            .with_known_crc32c(crc32c::crc32c(b""))
1224            .with_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
1225            .with_event_based_hold(true)
1226            .with_known_md5_hash(md5::compute(b"").0)
1227            .with_metadata([("k0", "v0"), ("k1", "v1")])
1228            .with_retention(
1229                crate::model::object::Retention::new()
1230                    .set_mode(crate::model::object::retention::Mode::Locked)
1231                    .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
1232            )
1233            .with_storage_class("ARCHIVE")
1234            .with_temporary_hold(true)
1235            .with_kms_key("test-key");
1236
1237        let resource = request.spec.resource.take().unwrap();
1238        let request = request;
1239        assert_eq!(
1240            &request.spec,
1241            &WriteObjectSpec::new()
1242                .set_if_generation_match(10)
1243                .set_if_generation_not_match(20)
1244                .set_if_metageneration_match(30)
1245                .set_if_metageneration_not_match(40)
1246                .set_predefined_acl("private")
1247        );
1248
1249        assert_eq!(
1250            resource,
1251            Object::new()
1252                .set_name("object")
1253                .set_bucket("projects/_/buckets/bucket")
1254                .set_acl([ObjectAccessControl::new()
1255                    .set_entity("allAuthenticatedUsers")
1256                    .set_role("READER")])
                .set_cache_control("public, max-age=7200")
1258                .set_content_disposition("inline")
1259                .set_content_encoding("gzip")
1260                .set_content_language("en")
1261                .set_content_type("text/plain")
1262                .set_checksums(
1263                    crate::model::ObjectChecksums::new()
1264                        .set_crc32c(crc32c::crc32c(b""))
1265                        .set_md5_hash(bytes::Bytes::from_iter(md5::compute(b"").0))
1266                )
1267                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
1268                .set_event_based_hold(true)
1269                .set_metadata([("k0", "v0"), ("k1", "v1")])
1270                .set_retention(
1271                    crate::model::object::Retention::new()
1272                        .set_mode("LOCKED")
1273                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?)
1274                )
1275                .set_storage_class("ARCHIVE")
1276                .set_temporary_hold(true)
1277                .set_kms_key("test-key")
1278        );
1279
1280        Ok(())
1281    }
1282
1283    #[test]
1284    fn upload_object_options() {
1285        let inner = test_inner_client(
1286            test_builder()
1287                .with_resumable_upload_threshold(123_usize)
1288                .with_resumable_upload_buffer_size(234_usize),
1289        );
1290        let request = UploadObject::new(inner.clone(), "projects/_/buckets/bucket", "object", "");
1291        assert_eq!(request.options.resumable_upload_threshold, 123);
1292        assert_eq!(request.options.resumable_upload_buffer_size, 234);
1293
1294        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
1295            .with_resumable_upload_threshold(345_usize)
1296            .with_resumable_upload_buffer_size(456_usize);
1297        assert_eq!(request.options.resumable_upload_threshold, 345);
1298        assert_eq!(request.options.resumable_upload_buffer_size, 456);
1299    }
1300
1301    const QUICK: &str = "the quick brown fox jumps over the lazy dog";
1302    const VEXING: &str = "how vexingly quick daft zebras jump";
1303
1304    fn quick_checksum<E: ChecksumEngine>(mut engine: E) -> ObjectChecksums {
1305        engine.update(0, &bytes::Bytes::from_static(QUICK.as_bytes()));
1306        engine.finalize()
1307    }
1308
1309    async fn collect<S: StreamingSource>(mut stream: S) -> anyhow::Result<Vec<u8>> {
1310        let mut collected = Vec::new();
1311        while let Some(b) = stream.next().await.transpose()? {
1312            collected.extend_from_slice(&b);
1313        }
1314        Ok(collected)
1315    }
1316
1317    #[tokio::test]
1318    async fn checksum_default() -> Result {
1319        let client = test_builder().build().await?;
1320        let upload = client
1321            .upload_object("my-bucket", "my-object", QUICK)
1322            .precompute_checksums()
1323            .await?;
1324        let want = quick_checksum(Crc32c::default());
1325        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
1326        let collected = collect(upload.payload).await?;
1327        assert_eq!(collected, QUICK.as_bytes());
1328        Ok(())
1329    }
1330
1331    #[tokio::test]
1332    async fn checksum_md5_and_crc32c() -> Result {
1333        let client = test_builder().build().await?;
1334        let upload = client
1335            .upload_object("my-bucket", "my-object", QUICK)
1336            .compute_md5()
1337            .precompute_checksums()
1338            .await?;
1339        let want = quick_checksum(Crc32c::from_inner(Md5::default()));
1340        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
1341        Ok(())
1342    }
1343
1344    #[tokio::test]
1345    async fn checksum_precomputed() -> Result {
1346        let mut engine = Crc32c::from_inner(Md5::default());
1347        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1348        let ck = engine.finalize();
1349
1350        let client = test_builder().build().await?;
1351        let upload = client
1352            .upload_object("my-bucket", "my-object", QUICK)
1353            .with_known_crc32c(ck.crc32c.unwrap())
1354            .with_known_md5_hash(ck.md5_hash.clone())
1355            .precompute_checksums()
1356            .await?;
1357        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known_crc32c()
        // and with_known_md5_hash() is respected.
1360        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(ck));
1361
1362        Ok(())
1363    }
1364
1365    #[tokio::test]
1366    async fn checksum_crc32c_known_md5_computed() -> Result {
1367        let mut engine = Crc32c::from_inner(Md5::default());
1368        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1369        let ck = engine.finalize();
1370
1371        let client = test_builder().build().await?;
1372        let upload = client
1373            .upload_object("my-bucket", "my-object", QUICK)
1374            .compute_md5()
1375            .with_known_crc32c(ck.crc32c.unwrap())
1376            .precompute_checksums()
1377            .await?;
1378        // Note that the checksums do not match the data. This is intentional,
1379        // we are trying to verify that whatever is provided in with_known*()
1380        // is respected.
1381        let want = quick_checksum(Md5::default()).set_crc32c(ck.crc32c.unwrap());
1382        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
1383
1384        Ok(())
1385    }
1386
1387    #[tokio::test]
1388    async fn checksum_mixed_then_precomputed() -> Result {
1389        let mut engine = Crc32c::from_inner(Md5::default());
1390        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1391        let ck = engine.finalize();
1392
1393        let client = test_builder().build().await?;
1394        let upload = client
1395            .upload_object("my-bucket", "my-object", QUICK)
1396            .with_known_md5_hash(ck.md5_hash.clone())
1397            .with_known_crc32c(ck.crc32c.unwrap())
1398            .precompute_checksums()
1399            .await?;
1400        // Note that the checksums do not match the data. This is intentional,
1401        // we are trying to verify that whatever is provided in with_known*()
1402        // is respected.
1403        let want = ck.clone();
1404        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
1405
1406        Ok(())
1407    }
1408
1409    #[tokio::test]
1410    async fn checksum_full_computed_then_md5_precomputed() -> Result {
1411        let mut engine = Crc32c::from_inner(Md5::default());
1412        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1413        let ck = engine.finalize();
1414
1415        let client = test_builder().build().await?;
1416        let upload = client
1417            .upload_object("my-bucket", "my-object", QUICK)
1418            .compute_md5()
1419            .with_known_md5_hash(ck.md5_hash.clone())
1420            .precompute_checksums()
1421            .await?;
1422        // Note that the checksums do not match the data. This is intentional,
1423        // we are trying to verify that whatever is provided in with_known*()
1424        // is respected.
1425        let want = quick_checksum(Crc32c::default()).set_md5_hash(ck.md5_hash.clone());
1426        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
1427
1428        Ok(())
1429    }
1430
1431    #[tokio::test]
1432    async fn checksum_known_crc32_then_computed_md5() -> Result {
1433        let mut engine = Crc32c::from_inner(Md5::default());
1434        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1435        let ck = engine.finalize();
1436
1437        let client = test_builder().build().await?;
1438        let upload = client
1439            .upload_object("my-bucket", "my-object", QUICK)
1440            .with_known_crc32c(ck.crc32c.unwrap())
1441            .compute_md5()
1442            .with_known_md5_hash(ck.md5_hash.clone())
1443            .precompute_checksums()
1444            .await?;
1445        // Note that the checksums do not match the data. This is intentional,
1446        // we are trying to verify that whatever is provided in with_known*()
1447        // is respected.
1448        let want = ck.clone();
1449        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
1450
1451        Ok(())
1452    }
1453
1454    #[tokio::test]
1455    async fn checksum_known_crc32_then_known_md5() -> Result {
1456        let mut engine = Crc32c::from_inner(Md5::default());
1457        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1458        let ck = engine.finalize();
1459
1460        let client = test_builder().build().await?;
1461        let upload = client
1462            .upload_object("my-bucket", "my-object", QUICK)
1463            .with_known_crc32c(ck.crc32c.unwrap())
1464            .with_known_md5_hash(ck.md5_hash.clone())
1465            .precompute_checksums()
1466            .await?;
1467        // Note that the checksums do not match the data. This is intentional,
1468        // we are trying to verify that whatever is provided in with_known*()
1469        // is respected.
1470        let want = ck.clone();
1471        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
1472
1473        Ok(())
1474    }
1475
1476    #[tokio::test]
1477    async fn precompute_checksums_seek_error() -> Result {
1478        let mut source = MockSeekSource::new();
1479        source
1480            .expect_seek()
1481            .once()
1482            .returning(|_| Err(IoError::new(ErrorKind::Deadlock, "test-only")));
1483
1484        let client = test_builder().build().await?;
1485        let err = client
1486            .upload_object("my-bucket", "my-object", source)
1487            .precompute_checksums()
1488            .await
1489            .expect_err("seek() returns an error");
1490        assert!(err.is_serialization(), "{err:?}");
1491        assert!(
1492            err.source()
1493                .and_then(|e| e.downcast_ref::<IoError>())
1494                .is_some(),
1495            "{err:?}"
1496        );
1497
1498        Ok(())
1499    }
1500
1501    #[tokio::test]
1502    async fn precompute_checksums_next_error() -> Result {
1503        let mut source = MockSeekSource::new();
1504        source.expect_seek().returning(|_| Ok(()));
1505        let mut seq = mockall::Sequence::new();
1506        source
1507            .expect_next()
1508            .times(3)
1509            .in_sequence(&mut seq)
1510            .returning(|| Some(Ok(bytes::Bytes::new())));
1511        source
1512            .expect_next()
1513            .once()
1514            .in_sequence(&mut seq)
1515            .returning(|| Some(Err(IoError::new(ErrorKind::BrokenPipe, "test-only"))));
1516
1517        let client = test_builder().build().await?;
1518        let err = client
1519            .upload_object("my-bucket", "my-object", source)
1520            .precompute_checksums()
1521            .await
            .expect_err("next() returns an error");
1523        assert!(err.is_serialization(), "{err:?}");
1524        assert!(
1525            err.source()
1526                .and_then(|e| e.downcast_ref::<IoError>())
1527                .is_some(),
1528            "{err:?}"
1529        );
1530
1531        Ok(())
1532    }
1533
1534    #[tokio::test]
1535    async fn debug() -> Result {
1536        let client = test_builder().build().await?;
1537        let upload = client
1538            .upload_object("my-bucket", "my-object", "")
1539            .precompute_checksums()
1540            .await;
1541
1542        let fmt = format!("{upload:?}");
1543        ["UploadObject", "inner", "spec", "options", "checksum"]
1544            .into_iter()
1545            .for_each(|text| {
1546                assert!(fmt.contains(text), "expected {text} in {fmt}");
1547            });
1548        Ok(())
1549    }
1550}