Skip to main content

google_cloud_storage/storage/
write_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Contains the request builder for [write_object()] and related types.
16//!
17//! [write_object()]: crate::storage::client::Storage::write_object()
18
19use super::streaming_source::{Seek, StreamingSource};
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::storage::checksum::details::update as checksum_update;
23use crate::storage::checksum::details::{Checksum, Md5};
24use crate::storage::request_options::RequestOptions;
25
/// A request builder for object writes.
///
/// The builder methods consume `self` and return the updated builder, so
/// calls can be chained before the request is sent.
///
/// # Example: hello world
/// ```
/// use google_cloud_storage::client::Storage;
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     let response = client
///         .write_object("projects/_/buckets/my-bucket", "hello", "Hello World!")
///         .send_unbuffered()
///         .await?;
///     println!("response details={response:?}");
///     Ok(())
/// }
/// ```
///
/// # Example: upload a file
/// ```
/// use google_cloud_storage::client::Storage;
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     let payload = tokio::fs::File::open("my-data").await?;
///     let response = client
///         .write_object("projects/_/buckets/my-bucket", "my-object", payload)
///         .send_unbuffered()
///         .await?;
///     println!("response details={response:?}");
///     Ok(())
/// }
/// ```
///
/// # Example: create a new object from a custom data source
/// ```
/// use google_cloud_storage::{client::Storage, streaming_source::StreamingSource};
/// struct DataSource;
/// impl StreamingSource for DataSource {
///     type Error = std::io::Error;
///     async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
///         # panic!();
///     }
/// }
///
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     let response = client
///         .write_object("projects/_/buckets/my-bucket", "my-object", DataSource)
///         .send_buffered()
///         .await?;
///     println!("response details={response:?}");
///     Ok(())
/// }
/// ```
pub struct WriteObject<T, S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    /// Shared, reference-counted handle to the stub of type `S`.
    stub: std::sync::Arc<S>,
    /// The request under construction; the `set_*` methods write into it.
    pub(crate) request: crate::model_ext::WriteObjectRequest,
    /// The `Payload` wrapper around the caller-provided data of type `T`.
    pub(crate) payload: Payload<T>,
    /// Per-request options; the `with_*` methods write into it.
    pub(crate) options: RequestOptions,
}
84
85impl<T, S> WriteObject<T, S>
86where
87    S: crate::storage::stub::Storage + 'static,
88{
89    /// Set a [request precondition] on the object generation to match.
90    ///
91    /// With this precondition the request fails if the current object
92    /// generation matches the provided value. A common value is `0`, which
93    /// prevents writes from succeeding if the object already exists.
94    ///
95    /// # Example
96    /// ```
97    /// # use google_cloud_storage::client::Storage;
98    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
99    /// let response = client
100    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
101    ///     .set_if_generation_match(0)
102    ///     .send_buffered()
103    ///     .await?;
104    /// println!("response details={response:?}");
105    /// # Ok(()) }
106    /// ```
107    ///
108    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
109    pub fn set_if_generation_match<V>(mut self, v: V) -> Self
110    where
111        V: Into<i64>,
112    {
113        self.request.spec.if_generation_match = Some(v.into());
114        self
115    }
116
117    /// Set a [request precondition] on the object generation to match.
118    ///
119    /// With this precondition the request fails if the current object
120    /// generation does not match the provided value.
121    ///
122    /// # Example
123    /// ```
124    /// # use google_cloud_storage::client::Storage;
125    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
126    /// let response = client
127    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
128    ///     .set_if_generation_not_match(0)
129    ///     .send_buffered()
130    ///     .await?;
131    /// println!("response details={response:?}");
132    /// # Ok(()) }
133    /// ```
134    ///
135    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
136    pub fn set_if_generation_not_match<V>(mut self, v: V) -> Self
137    where
138        V: Into<i64>,
139    {
140        self.request.spec.if_generation_not_match = Some(v.into());
141        self
142    }
143
144    /// Set a [request precondition] on the object meta generation.
145    ///
146    /// With this precondition the request fails if the current object metadata
147    /// generation does not match the provided value. This may be useful to
148    /// prevent changes when the metageneration is known.
149    ///
150    /// # Example
151    /// ```
152    /// # use google_cloud_storage::client::Storage;
153    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
154    /// let response = client
155    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
156    ///     .set_if_metageneration_match(1234)
157    ///     .send_buffered()
158    ///     .await?;
159    /// println!("response details={response:?}");
160    /// # Ok(()) }
161    /// ```
162    ///
163    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
164    pub fn set_if_metageneration_match<V>(mut self, v: V) -> Self
165    where
166        V: Into<i64>,
167    {
168        self.request.spec.if_metageneration_match = Some(v.into());
169        self
170    }
171
172    /// Set a [request precondition] on the object meta-generation.
173    ///
174    /// With this precondition the request fails if the current object metadata
175    /// generation matches the provided value. This is rarely useful in uploads,
176    /// it is more commonly used on reads to prevent a large response if the
177    /// data is already cached.
178    ///
179    /// # Example
180    /// ```
181    /// # use google_cloud_storage::client::Storage;
182    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
183    /// let response = client
184    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
185    ///     .set_if_metageneration_not_match(1234)
186    ///     .send_buffered()
187    ///     .await?;
188    /// println!("response details={response:?}");
189    /// # Ok(()) }
190    /// ```
191    ///
192    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
193    pub fn set_if_metageneration_not_match<V>(mut self, v: V) -> Self
194    where
195        V: Into<i64>,
196    {
197        self.request.spec.if_metageneration_not_match = Some(v.into());
198        self
199    }
200
201    /// Sets the ACL for the new object.
202    ///
203    /// # Example
204    /// ```
205    /// # use google_cloud_storage::client::Storage;
206    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
207    /// # use google_cloud_storage::model::ObjectAccessControl;
208    /// let response = client
209    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
210    ///     .set_acl([ObjectAccessControl::new().set_entity("allAuthenticatedUsers").set_role("READER")])
211    ///     .send_buffered()
212    ///     .await?;
213    /// println!("response details={response:?}");
214    /// # Ok(()) }
215    /// ```
216    pub fn set_acl<I, V>(mut self, v: I) -> Self
217    where
218        I: IntoIterator<Item = V>,
219        V: Into<crate::model::ObjectAccessControl>,
220    {
221        self.mut_resource().acl = v.into_iter().map(|a| a.into()).collect();
222        self
223    }
224
225    /// Sets the [cache control] for the new object.
226    ///
227    /// This can be used to control caching in [public objects].
228    ///
229    /// # Example
230    /// ```
231    /// # use google_cloud_storage::client::Storage;
232    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
233    /// let response = client
234    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
235    ///     .set_cache_control("public; max-age=7200")
236    ///     .send_buffered()
237    ///     .await?;
238    /// println!("response details={response:?}");
239    /// # Ok(()) }
240    /// ```
241    ///
242    /// [public objects]: https://cloud.google.com/storage/docs/access-control/making-data-public
243    /// [cache control]: https://datatracker.ietf.org/doc/html/rfc7234#section-5.2
244    pub fn set_cache_control<V: Into<String>>(mut self, v: V) -> Self {
245        self.mut_resource().cache_control = v.into();
246        self
247    }
248
249    /// Sets the [content disposition] for the new object.
250    ///
251    /// Google Cloud Storage can serve content directly to web browsers. This
252    /// attribute sets the `Content-Disposition` header, which may change how
253    /// the browser displays the contents.
254    ///
255    /// # Example
256    /// ```
257    /// # use google_cloud_storage::client::Storage;
258    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
259    /// let response = client
260    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
261    ///     .set_content_disposition("inline")
262    ///     .send_buffered()
263    ///     .await?;
264    /// println!("response details={response:?}");
265    /// # Ok(()) }
266    /// ```
267    ///
268    /// [content disposition]: https://datatracker.ietf.org/doc/html/rfc6266
269    pub fn set_content_disposition<V: Into<String>>(mut self, v: V) -> Self {
270        self.mut_resource().content_disposition = v.into();
271        self
272    }
273
274    /// Sets the [content encoding] for the object data.
275    ///
276    /// This can be used to upload compressed data and enable [transcoding] of
277    /// the data during reads.
278    ///
279    /// # Example
280    /// ```
281    /// # use google_cloud_storage::client::Storage;
282    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
283    /// use flate2::write::GzEncoder;
284    /// use std::io::Write;
285    /// let mut e = GzEncoder::new(Vec::new(), flate2::Compression::default());
286    /// e.write_all(b"hello world");
287    /// let response = client
288    ///     .write_object("projects/_/buckets/my-bucket", "my-object", bytes::Bytes::from_owner(e.finish()?))
289    ///     .set_content_encoding("gzip")
290    ///     .send_buffered()
291    ///     .await?;
292    /// println!("response details={response:?}");
293    /// # Ok(()) }
294    /// ```
295    ///
296    /// [transcoding]: https://cloud.google.com/storage/docs/transcoding
297    /// [content encoding]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.2
298    pub fn set_content_encoding<V: Into<String>>(mut self, v: V) -> Self {
299        self.mut_resource().content_encoding = v.into();
300        self
301    }
302
303    /// Sets the [content language] for the new object.
304    ///
305    /// Google Cloud Storage can serve content directly to web browsers. This
306    /// attribute sets the `Content-Language` header, which may change how the
307    /// browser displays the contents.
308    ///
309    /// # Example
310    /// ```
311    /// # use google_cloud_storage::client::Storage;
312    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
313    /// let response = client
314    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
315    ///     .set_content_language("en")
316    ///     .send_buffered()
317    ///     .await?;
318    /// println!("response details={response:?}");
319    /// # Ok(()) }
320    /// ```
321    ///
322    /// [content language]: https://cloud.google.com/storage/docs/metadata#content-language
323    pub fn set_content_language<V: Into<String>>(mut self, v: V) -> Self {
324        self.mut_resource().content_language = v.into();
325        self
326    }
327
328    /// Sets the [content type] for the new object.
329    ///
330    /// Google Cloud Storage can serve content directly to web browsers. This
331    /// attribute sets the `Content-Type` header, which may change how the
332    /// browser interprets the contents.
333    ///
334    /// # Example
335    /// ```
336    /// # use google_cloud_storage::client::Storage;
337    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
338    /// let response = client
339    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
340    ///     .set_content_type("text/plain")
341    ///     .send_buffered()
342    ///     .await?;
343    /// println!("response details={response:?}");
344    /// # Ok(()) }
345    /// ```
346    ///
347    /// [content type]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.5
348    pub fn set_content_type<V: Into<String>>(mut self, v: V) -> Self {
349        self.mut_resource().content_type = v.into();
350        self
351    }
352
353    /// Sets the [custom time] for the new object.
354    ///
355    /// This field is typically set in order to use the [DaysSinceCustomTime]
356    /// condition in Object Lifecycle Management.
357    ///
358    /// # Example
359    /// ```
360    /// # use google_cloud_storage::client::Storage;
361    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
362    /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
363    /// let response = client
364    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
365    ///     .set_custom_time(time)
366    ///     .send_buffered()
367    ///     .await?;
368    /// println!("response details={response:?}");
369    /// # Ok(()) }
370    /// ```
371    ///
372    /// [DaysSinceCustomTime]: https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
373    /// [custom time]: https://cloud.google.com/storage/docs/metadata#custom-time
374    pub fn set_custom_time<V: Into<wkt::Timestamp>>(mut self, v: V) -> Self {
375        self.mut_resource().custom_time = Some(v.into());
376        self
377    }
378
379    /// Sets the [event based hold] flag for the new object.
380    ///
381    /// This field is typically set in order to prevent objects from being
382    /// deleted or modified.
383    ///
384    /// # Example
385    /// ```
386    /// # use google_cloud_storage::client::Storage;
387    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
388    /// let response = client
389    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
390    ///     .set_event_based_hold(true)
391    ///     .send_buffered()
392    ///     .await?;
393    /// println!("response details={response:?}");
394    /// # Ok(()) }
395    /// ```
396    ///
397    /// [event based hold]: https://cloud.google.com/storage/docs/object-holds
398    pub fn set_event_based_hold<V: Into<bool>>(mut self, v: V) -> Self {
399        self.mut_resource().event_based_hold = Some(v.into());
400        self
401    }
402
403    /// Sets the [custom metadata] for the new object.
404    ///
405    /// This field is typically set to annotate the object with
406    /// application-specific metadata.
407    ///
408    /// # Example
409    /// ```
410    /// # use google_cloud_storage::client::Storage;
411    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
412    /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
413    /// let response = client
414    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
415    ///     .set_metadata([("test-only", "true"), ("environment", "qa")])
416    ///     .send_buffered()
417    ///     .await?;
418    /// println!("response details={response:?}");
419    /// # Ok(()) }
420    /// ```
421    ///
422    /// [custom metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
423    pub fn set_metadata<I, K, V>(mut self, i: I) -> Self
424    where
425        I: IntoIterator<Item = (K, V)>,
426        K: Into<String>,
427        V: Into<String>,
428    {
429        self.mut_resource().metadata = i.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
430        self
431    }
432
433    /// Sets the [retention configuration] for the new object.
434    ///
435    /// # Example
436    /// ```
437    /// # use google_cloud_storage::client::Storage;
438    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
439    /// # use google_cloud_storage::model::object::{Retention, retention};
440    /// let response = client
441    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
442    ///     .set_retention(
443    ///         Retention::new()
444    ///             .set_mode(retention::Mode::Locked)
445    ///             .set_retain_until_time(wkt::Timestamp::try_from("2035-01-01T00:00:00Z")?))
446    ///     .send_buffered()
447    ///     .await?;
448    /// println!("response details={response:?}");
449    /// # Ok(()) }
450    /// ```
451    ///
452    /// [retention configuration]: https://cloud.google.com/storage/docs/metadata#retention-config
453    pub fn set_retention<V>(mut self, v: V) -> Self
454    where
455        V: Into<crate::model::object::Retention>,
456    {
457        self.mut_resource().retention = Some(v.into());
458        self
459    }
460
461    /// Sets the [storage class] for the new object.
462    ///
463    /// # Example
464    /// ```
465    /// # use google_cloud_storage::client::Storage;
466    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
467    /// let response = client
468    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
469    ///     .set_storage_class("ARCHIVE")
470    ///     .send_buffered()
471    ///     .await?;
472    /// println!("response details={response:?}");
473    /// # Ok(()) }
474    /// ```
475    ///
476    /// [storage class]: https://cloud.google.com/storage/docs/storage-classes
477    pub fn set_storage_class<V>(mut self, v: V) -> Self
478    where
479        V: Into<String>,
480    {
481        self.mut_resource().storage_class = v.into();
482        self
483    }
484
485    /// Sets the [temporary hold] flag for the new object.
486    ///
487    /// This field is typically set in order to prevent objects from being
488    /// deleted or modified.
489    ///
490    /// # Example
491    /// ```
492    /// # use google_cloud_storage::client::Storage;
493    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
494    /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
495    /// let response = client
496    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
497    ///     .set_temporary_hold(true)
498    ///     .send_buffered()
499    ///     .await?;
500    /// println!("response details={response:?}");
501    /// # Ok(()) }
502    /// ```
503    ///
504    /// [temporary hold]: https://cloud.google.com/storage/docs/object-holds
505    pub fn set_temporary_hold<V: Into<bool>>(mut self, v: V) -> Self {
506        self.mut_resource().temporary_hold = v.into();
507        self
508    }
509
510    /// Sets the resource name of the [Customer-managed encryption key] for this
511    /// object.
512    ///
513    /// The service imposes a number of restrictions on the keys used to encrypt
514    /// Google Cloud Storage objects. Read the documentation in full before
515    /// trying to use customer-managed encryption keys. In particular, verify
516    /// the service has the necessary permissions, and the key is in a
517    /// compatible location.
518    ///
519    /// # Example
520    /// ```
521    /// # use google_cloud_storage::client::Storage;
522    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
523    /// let response = client
524    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
525    ///     .set_kms_key("projects/test-project/locations/us-central1/keyRings/test-ring/cryptoKeys/test-key")
526    ///     .send_buffered()
527    ///     .await?;
528    /// println!("response details={response:?}");
529    /// # Ok(()) }
530    /// ```
531    ///
532    /// [Customer-managed encryption key]: https://cloud.google.com/storage/docs/encryption/customer-managed-keys
533    pub fn set_kms_key<V>(mut self, v: V) -> Self
534    where
535        V: Into<String>,
536    {
537        self.mut_resource().kms_key = v.into();
538        self
539    }
540
541    /// Configure this object to use one of the [predefined ACLs].
542    ///
543    /// # Example
544    /// ```
545    /// # use google_cloud_storage::client::Storage;
546    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
547    /// let response = client
548    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
549    ///     .set_predefined_acl("private")
550    ///     .send_buffered()
551    ///     .await?;
552    /// println!("response details={response:?}");
553    /// # Ok(()) }
554    /// ```
555    ///
556    /// [predefined ACLs]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
557    pub fn set_predefined_acl<V>(mut self, v: V) -> Self
558    where
559        V: Into<String>,
560    {
561        self.request.spec.predefined_acl = v.into();
562        self
563    }
564
565    /// The encryption key used with the Customer-Supplied Encryption Keys
566    /// feature. In raw bytes format (not base64-encoded).
567    ///
568    /// # Example
569    /// ```
570    /// # use google_cloud_storage::client::Storage;
571    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
572    /// # use google_cloud_storage::model_ext::KeyAes256;
573    /// let key: &[u8] = &[97; 32];
574    /// let response = client
575    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
576    ///     .set_key(KeyAes256::new(key)?)
577    ///     .send_buffered()
578    ///     .await?;
579    /// println!("response details={response:?}");
580    /// # Ok(()) }
581    /// ```
582    pub fn set_key(mut self, v: KeyAes256) -> Self {
583        self.request.params = Some(v.into());
584        self
585    }
586
587    /// Sets the object custom contexts.
588    ///
589    /// # Example
590    /// ```
591    /// # use google_cloud_storage::client::Storage;
592    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
593    /// # use google_cloud_storage::model::{ObjectContexts, ObjectCustomContextPayload};
594    /// let response = client
595    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
596    ///     .set_contexts(
597    ///         ObjectContexts::new().set_custom([
598    ///             ("example", ObjectCustomContextPayload::new().set_value("true")),
599    ///         ])
600    ///     )
601    ///     .send_buffered()
602    ///     .await?;
603    /// println!("response details={response:?}");
604    /// # Ok(()) }
605    /// ```
606    pub fn set_contexts<V>(mut self, v: V) -> Self
607    where
608        V: Into<crate::model::ObjectContexts>,
609    {
610        self.mut_resource().contexts = Some(v.into());
611        self
612    }
613
614    /// Configure the idempotency for this upload.
615    ///
616    /// By default, the client library treats single-shot uploads without
617    /// preconditions, as non-idempotent. If the destination bucket is
618    /// configured with [object versioning] then the operation may succeed
619    /// multiple times with observable side-effects. With object versioning and
620    /// a [lifecycle] policy limiting the number of versions, uploading the same
621    /// data multiple times may result in data loss.
622    ///
623    /// The client library cannot efficiently determine if these conditions
624    /// apply to your upload. If they do, or your application can tolerate
625    /// multiple versions of the same data for other reasons, consider using
626    /// `with_idempotency(true)`.
627    ///
628    /// The client library treats resumable uploads as idempotent, regardless of
629    /// the value in this option. Such uploads can succeed at most once.
630    ///
631    /// # Example
632    /// ```
633    /// # use google_cloud_storage::client::Storage;
634    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
635    /// use std::time::Duration;
636    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
637    /// let response = client
638    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
639    ///     .with_idempotency(true)
640    ///     .send_buffered()
641    ///     .await?;
642    /// println!("response details={response:?}");
643    /// # Ok(()) }
644    /// ```
645    ///
646    /// [lifecycle]: https://cloud.google.com/storage/docs/lifecycle
647    /// [object versioning]: https://cloud.google.com/storage/docs/object-versioning
648    pub fn with_idempotency(mut self, v: bool) -> Self {
649        self.options.idempotency = Some(v);
650        self
651    }
652
653    /// The retry policy used for this request.
654    ///
655    /// # Example
656    /// ```
657    /// # use google_cloud_storage::client::Storage;
658    /// # use google_cloud_storage::retry_policy::RetryableErrors;
659    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
660    /// use std::time::Duration;
661    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
662    /// let response = client
663    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
664    ///     .with_retry_policy(
665    ///         RetryableErrors
666    ///             .with_attempt_limit(5)
667    ///             .with_time_limit(Duration::from_secs(90)),
668    ///     )
669    ///     .send_buffered()
670    ///     .await?;
671    /// println!("response details={response:?}");
672    /// # Ok(()) }
673    /// ```
674    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
675        mut self,
676        v: V,
677    ) -> Self {
678        self.options.retry_policy = v.into().into();
679        self
680    }
681
682    /// The backoff policy used for this request.
683    ///
684    /// # Example
685    /// ```
686    /// # use google_cloud_storage::client::Storage;
687    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
688    /// use std::time::Duration;
689    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
690    /// let response = client
691    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
692    ///     .with_backoff_policy(ExponentialBackoff::default())
693    ///     .send_buffered()
694    ///     .await?;
695    /// println!("response details={response:?}");
696    /// # Ok(()) }
697    /// ```
698    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
699        mut self,
700        v: V,
701    ) -> Self {
702        self.options.backoff_policy = v.into().into();
703        self
704    }
705
706    /// The retry throttler used for this request.
707    ///
708    /// Most of the time you want to use the same throttler for all the requests
709    /// in a client, and even the same throttler for many clients. Rarely it
710    /// may be necessary to use an custom throttler for some subset of the
711    /// requests.
712    ///
713    /// # Example
714    /// ```
715    /// # use google_cloud_storage::client::Storage;
716    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
717    /// let response = client
718    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
719    ///     .with_retry_throttler(adhoc_throttler())
720    ///     .send_buffered()
721    ///     .await?;
722    /// println!("response details={response:?}");
723    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
724    ///     # panic!();
725    /// }
726    /// # Ok(()) }
727    /// ```
728    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
729        mut self,
730        v: V,
731    ) -> Self {
732        self.options.retry_throttler = v.into().into();
733        self
734    }
735
736    /// Sets the payload size threshold to switch from single-shot to resumable uploads.
737    ///
738    /// # Example
739    /// ```
740    /// # use google_cloud_storage::client::Storage;
741    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
742    /// let response = client
743    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
744    ///     .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
745    ///     .send_buffered()
746    ///     .await?;
747    /// println!("response details={response:?}");
748    /// # Ok(()) }
749    /// ```
750    ///
751    /// The client library can perform uploads using [single-shot] or
752    /// [resumable] uploads. For small objects, single-shot uploads offer better
753    /// performance, as they require a single HTTP transfer. For larger objects,
754    /// the additional request latency is not significant, and resumable uploads
755    /// offer better recovery on errors.
756    ///
757    /// The library automatically selects resumable uploads when the payload is
758    /// equal to or larger than this option. For smaller uploads the client
759    /// library uses single-shot uploads.
760    ///
761    /// The exact threshold depends on where the application is deployed and
762    /// destination bucket location with respect to where the application is
763    /// running. The library defaults should work well in most cases, but some
764    /// applications may benefit from fine-tuning.
765    ///
766    /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
767    /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
768    pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
769        self.options.set_resumable_upload_threshold(v.into());
770        self
771    }
772
773    /// Changes the buffer size for some resumable uploads.
774    ///
775    /// # Example
776    /// ```
777    /// # use google_cloud_storage::client::Storage;
778    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
779    /// let response = client
780    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
781    ///     .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
782    ///     .send_buffered()
783    ///     .await?;
784    /// println!("response details={response:?}");
785    /// # Ok(()) }
786    /// ```
787    ///
788    /// When performing [resumable uploads] from sources without [Seek] the
789    /// client library needs to buffer data in memory until it is persisted by
790    /// the service. Otherwise the data would be lost if the upload fails.
791    /// Applications may want to tune this buffer size:
792    ///
793    /// - Use smaller buffer sizes to support more concurrent uploads in the
794    ///   same application.
795    /// - Use larger buffer sizes for better throughput. Sending many small
796    ///   buffers stalls the upload until the client receives a successful
797    ///   response from the service.
798    ///
799    /// Keep in mind that there are diminishing returns on using larger buffers.
800    ///
801    /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
802    /// [Seek]: crate::streaming_source::Seek
803    pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
804        self.options.set_resumable_upload_buffer_size(v.into());
805        self
806    }
807
808    /// Sets the `User-Agent` header for this request.
809    ///
810    /// # Example
811    /// ```
812    /// # use google_cloud_storage::client::Storage;
813    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
814    /// let mut response = client
815    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
816    ///     .with_user_agent("my-app/1.0.0")
817    ///     .send_buffered()
818    ///     .await?;
819    /// println!("response details={response:?}");
820    /// # Ok(()) }
821    /// ```
822    pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
823        self.options.user_agent = Some(user_agent.into());
824        self
825    }
826
827    /// Sets the project that will be billed for this request.
828    ///
829    /// Required for [Requester Pays] buckets. The value overrides any
830    /// `quota_project_id` configured on the credentials; the credential-level
831    /// header is suppressed for this RPC.
832    ///
833    /// # Example
834    /// ```
835    /// # use google_cloud_storage::client::Storage;
836    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
837    /// let response = client
838    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello")
839    ///     .with_quota_project("my-billing-project")
840    ///     .send_buffered()
841    ///     .await?;
842    /// # Ok(()) }
843    /// ```
844    ///
845    /// [Requester Pays]: https://cloud.google.com/storage/docs/requester-pays
846    pub fn with_quota_project(mut self, project: impl Into<String>) -> Self {
847        self.options.set_quota_project(project);
848        self
849    }
850
851    fn mut_resource(&mut self) -> &mut crate::model::Object {
852        self.request
853            .spec
854            .resource
855            .as_mut()
856            .expect("resource field initialized in `new()`")
857    }
858
859    fn set_crc32c<V: Into<u32>>(mut self, v: V) -> Self {
860        let checksum = self.mut_resource().checksums.get_or_insert_default();
861        checksum.crc32c = Some(v.into());
862        self
863    }
864
865    /// Sets the MD5 hash for the object being written.
866    pub fn set_md5_hash<I, V>(mut self, i: I) -> Self
867    where
868        I: IntoIterator<Item = V>,
869        V: Into<u8>,
870    {
871        let checksum = self.mut_resource().checksums.get_or_insert_default();
872        checksum.md5_hash = i.into_iter().map(|v| v.into()).collect();
873        self
874    }
875
876    /// Provide a precomputed value for the CRC32C checksum.
877    ///
878    /// # Example
879    /// ```
880    /// # use google_cloud_storage::client::Storage;
881    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
882    /// use crc32c::crc32c;
883    /// let response = client
884    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
885    ///     .with_known_crc32c(crc32c(b"hello world"))
886    ///     .send_buffered()
887    ///     .await?;
888    /// println!("response details={response:?}");
889    /// # Ok(()) }
890    /// ```
891    ///
892    /// In some applications, the payload's CRC32C checksum is already known.
893    /// For example, the application may be reading the data from another blob
894    /// storage system.
895    ///
896    /// In such cases, it is safer to pass the known CRC32C of the payload to
897    /// [Cloud Storage], and more efficient to skip the computation in the
898    /// client library.
899    ///
900    /// Note that once you provide a CRC32C value to this builder you cannot
901    /// use [compute_md5()] to also have the library compute the checksums.
902    ///
903    /// [compute_md5()]: WriteObject::compute_md5
904    pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> Self {
905        let mut this = self;
906        this.options.checksum.crc32c = None;
907        this.set_crc32c(v)
908    }
909
910    /// Provide a precomputed value for the MD5 hash.
911    ///
912    /// # Example
913    /// ```
914    /// # use google_cloud_storage::client::Storage;
915    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
916    /// use md5::compute;
917    /// let hash = md5::compute(b"hello world");
918    /// let response = client
919    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
920    ///     .with_known_md5_hash(bytes::Bytes::from_owner(hash.0))
921    ///     .send_buffered()
922    ///     .await?;
923    /// println!("response details={response:?}");
924    /// # Ok(()) }
925    /// ```
926    ///
927    /// In some applications, the payload's MD5 hash is already known. For
928    /// example, the application may be reading the data from another blob
929    /// storage system.
930    ///
931    /// In such cases, it is safer to pass the known MD5 of the payload to
932    /// [Cloud Storage], and more efficient to skip the computation in the
933    /// client library.
934    ///
935    /// Note that once you provide a MD5 value to this builder you cannot
936    /// use [compute_md5()] to also have the library compute the checksums.
937    ///
938    /// [compute_md5()]: WriteObject::compute_md5
939    pub fn with_known_md5_hash<I, V>(self, i: I) -> Self
940    where
941        I: IntoIterator<Item = V>,
942        V: Into<u8>,
943    {
944        let mut this = self;
945        this.options.checksum.md5_hash = None;
946        this.set_md5_hash(i)
947    }
948
949    /// Enables computation of MD5 hashes.
950    ///
951    /// # Example
952    /// ```
953    /// # use google_cloud_storage::client::Storage;
954    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
955    /// let payload = tokio::fs::File::open("my-data").await?;
956    /// let response = client
957    ///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
958    ///     .compute_md5()
959    ///     .send_buffered()
960    ///     .await?;
961    /// println!("response details={response:?}");
962    /// # Ok(()) }
963    /// ```
964    ///
965    /// See [precompute_checksums][WriteObject::precompute_checksums] for more
966    /// details on how checksums are used by the client library and their
967    /// limitations.
968    pub fn compute_md5(self) -> Self {
969        let mut this = self;
970        this.options.checksum.md5_hash = Some(Md5::default());
971        this
972    }
973
974    pub(crate) fn new<B, O, P>(
975        stub: std::sync::Arc<S>,
976        bucket: B,
977        object: O,
978        payload: P,
979        options: RequestOptions,
980    ) -> Self
981    where
982        B: Into<String>,
983        O: Into<String>,
984        P: Into<Payload<T>>,
985    {
986        let resource = crate::model::Object::new()
987            .set_bucket(bucket)
988            .set_name(object);
989        WriteObject {
990            stub,
991            request: crate::model_ext::WriteObjectRequest {
992                spec: crate::model::WriteObjectSpec::new().set_resource(resource),
993                params: None,
994            },
995            payload: payload.into(),
996            options,
997        }
998    }
999}
1000
1001impl<T, S> WriteObject<T, S>
1002where
1003    T: StreamingSource + Seek + Send + Sync + 'static,
1004    <T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
1005    <T as Seek>::Error: std::error::Error + Send + Sync + 'static,
1006    S: crate::storage::stub::Storage + 'static,
1007{
1008    /// A simple upload from a buffer.
1009    ///
1010    /// # Example
1011    /// ```
1012    /// # use google_cloud_storage::client::Storage;
1013    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1014    /// let response = client
1015    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
1016    ///     .send_unbuffered()
1017    ///     .await?;
1018    /// println!("response details={response:?}");
1019    /// # Ok(()) }
1020    /// ```
1021    pub async fn send_unbuffered(self) -> Result<Object> {
1022        self.stub
1023            .write_object_unbuffered(self.payload, self.request, self.options)
1024            .await
1025    }
1026
1027    /// Precompute the payload checksums before uploading the data.
1028    ///
1029    /// If the checksums are known when the upload starts, the client library
1030    /// can include the checksums with the upload request, and the service can
1031    /// reject the upload if the payload and the checksums do not match.
1032    ///
1033    /// # Example
1034    /// ```
1035    /// # use google_cloud_storage::client::Storage;
1036    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1037    /// let payload = tokio::fs::File::open("my-data").await?;
1038    /// let response = client
1039    ///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
1040    ///     .precompute_checksums()
1041    ///     .await?
1042    ///     .send_unbuffered()
1043    ///     .await?;
1044    /// println!("response details={response:?}");
1045    /// # Ok(()) }
1046    /// ```
1047    ///
1048    /// Precomputing the checksums can be expensive if the data source is slow
1049    /// to read. Therefore, the client library does not precompute the checksums
1050    /// by default. The client library compares the checksums computed by the
1051    /// service against its own checksums. If they do not match, the client
1052    /// library returns an error. However, the service has already created the
1053    /// object with the (likely incorrect) data.
1054    ///
1055    /// The client library currently uses the [JSON API], it is not possible to
1056    /// send the checksums at the end of the upload with this API.
1057    ///
1058    /// [JSON API]: https://cloud.google.com/storage/docs/json_api
1059    pub async fn precompute_checksums(mut self) -> Result<Self> {
1060        let mut offset = 0_u64;
1061        self.payload.seek(offset).await.map_err(Error::ser)?;
1062        while let Some(n) = self.payload.next().await.transpose().map_err(Error::ser)? {
1063            self.options.checksum.update(offset, &n);
1064            offset += n.len() as u64;
1065        }
1066        self.payload.seek(0_u64).await.map_err(Error::ser)?;
1067        let computed = self.options.checksum.finalize();
1068        let current = self.mut_resource().checksums.get_or_insert_default();
1069        checksum_update(current, computed);
1070        self.options.checksum = Checksum {
1071            crc32c: None,
1072            md5_hash: None,
1073        };
1074        Ok(self)
1075    }
1076}
1077
1078impl<T, S> WriteObject<T, S>
1079where
1080    T: StreamingSource + Send + Sync + 'static,
1081    T::Error: std::error::Error + Send + Sync + 'static,
1082    S: crate::storage::stub::Storage + 'static,
1083{
1084    /// Upload an object from a streaming source without rewinds.
1085    ///
1086    /// If the data source does **not** implement [Seek] the client library must
1087    /// buffer data sent to the service until the service confirms it has
1088    /// persisted the data. This requires more memory in the client, and when
1089    /// the buffer grows too large, may require stalling the writer until the
1090    /// service can persist the data.
1091    ///
1092    /// Use this function for data sources where it is expensive or impossible
1093    /// to restart the data source. This function is also useful when it is hard
1094    /// or impossible to predict the number of bytes emitted by a stream, even
1095    /// if restarting the stream is not too expensive.
1096    ///
1097    /// # Example
1098    /// ```
1099    /// # use google_cloud_storage::client::Storage;
1100    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1101    /// let response = client
1102    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
1103    ///     .send_buffered()
1104    ///     .await?;
1105    /// println!("response details={response:?}");
1106    /// # Ok(()) }
1107    /// ```
1108    pub async fn send_buffered(self) -> crate::Result<Object> {
1109        self.stub
1110            .write_object_buffered(self.payload, self.request, self.options)
1111            .await
1112    }
1113}
1114
1115// We need `Debug` to use `expect_err()` in `Result<WriteObject, ...>`.
1116impl<T, S> std::fmt::Debug for WriteObject<T, S>
1117where
1118    S: crate::storage::stub::Storage + 'static,
1119{
1120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1121        f.debug_struct("WriteObject")
1122            .field("stub", &self.stub)
1123            .field("request", &self.request)
1124            // skip payload, as it is not `Debug`
1125            .field("options", &self.options)
1126            .finish()
1127    }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132    use super::client::tests::{test_builder, test_inner_client};
1133    use super::*;
1134    use crate::client::Storage;
1135    use crate::model::{
1136        CommonObjectRequestParams, ObjectChecksums, ObjectContexts, ObjectCustomContextPayload,
1137        WriteObjectSpec,
1138    };
1139    use crate::storage::checksum::details::{Crc32c, Md5};
1140    use crate::streaming_source::tests::MockSeekSource;
1141    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
1142    use httptest::{Expectation, Server, matchers::*, responders::status_code};
1143    use std::error::Error as _;
1144    use std::io::{Error as IoError, ErrorKind};
1145
1146    type Result = anyhow::Result<()>;
1147
1148    // Verify `write_object()` can be used with a source that implements
1149    // `StreamingSource` **and** `Seek`
1150    #[tokio::test]
1151    async fn test_upload_streaming_source_and_seek() -> Result {
1152        struct Source;
1153        impl crate::streaming_source::StreamingSource for Source {
1154            type Error = std::io::Error;
1155            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
1156                None
1157            }
1158        }
1159        impl crate::streaming_source::Seek for Source {
1160            type Error = std::io::Error;
1161            async fn seek(&mut self, _offset: u64) -> std::result::Result<(), Self::Error> {
1162                Ok(())
1163            }
1164        }
1165
1166        let client = Storage::builder()
1167            .with_credentials(Anonymous::new().build())
1168            .build()
1169            .await?;
1170        let _ = client.write_object("projects/_/buckets/test-bucket", "test-object", Source);
1171        Ok(())
1172    }
1173
1174    // Verify `write_object()` can be used with a source that **only**
1175    // implements `StreamingSource`.
1176    #[tokio::test]
1177    async fn test_upload_only_streaming_source() -> Result {
1178        struct Source;
1179        impl crate::streaming_source::StreamingSource for Source {
1180            type Error = std::io::Error;
1181            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
1182                None
1183            }
1184        }
1185
1186        let client = Storage::builder()
1187            .with_credentials(Anonymous::new().build())
1188            .build()
1189            .await?;
1190        let _ = client.write_object("projects/_/buckets/test-bucket", "test-object", Source);
1191        Ok(())
1192    }
1193
1194    // Verify `write_object()` meets normal Send, Sync, requirements.
1195    #[tokio::test]
1196    async fn test_upload_is_send_and_static() -> Result {
1197        let client = Storage::builder()
1198            .with_credentials(Anonymous::new().build())
1199            .build()
1200            .await?;
1201
1202        fn need_send<T: Send>(_val: &T) {}
1203        fn need_sync<T: Sync>(_val: &T) {}
1204        fn need_static<T: 'static>(_val: &T) {}
1205
1206        let upload = client.write_object("projects/_/buckets/test-bucket", "test-object", "");
1207        need_send(&upload);
1208        need_sync(&upload);
1209        need_static(&upload);
1210
1211        let upload = client
1212            .write_object("projects/_/buckets/test-bucket", "test-object", "")
1213            .send_unbuffered();
1214        need_send(&upload);
1215        need_static(&upload);
1216
1217        let upload = client
1218            .write_object("projects/_/buckets/test-bucket", "test-object", "")
1219            .send_buffered();
1220        need_send(&upload);
1221        need_static(&upload);
1222
1223        Ok(())
1224    }
1225
1226    #[tokio::test]
1227    async fn write_object_metadata() -> Result {
1228        use crate::model::ObjectAccessControl;
1229        let inner = test_inner_client(test_builder()).await;
1230        let options = inner.options.clone();
1231        let stub = crate::storage::transport::Storage::new_test(inner);
1232        let key = KeyAes256::new(&[0x42; 32]).expect("hard-coded key is not an error");
1233        let mut builder =
1234            WriteObject::new(stub, "projects/_/buckets/bucket", "object", "", options)
1235                .set_if_generation_match(10)
1236                .set_if_generation_not_match(20)
1237                .set_if_metageneration_match(30)
1238                .set_if_metageneration_not_match(40)
1239                .set_predefined_acl("private")
1240                .set_acl([ObjectAccessControl::new()
1241                    .set_entity("allAuthenticatedUsers")
1242                    .set_role("READER")])
1243                .set_cache_control("public; max-age=7200")
1244                .set_content_disposition("inline")
1245                .set_content_encoding("gzip")
1246                .set_content_language("en")
1247                .set_content_type("text/plain")
1248                .set_contexts(ObjectContexts::new().set_custom([(
1249                    "context-key",
1250                    ObjectCustomContextPayload::new().set_value("context-value"),
1251                )]))
1252                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
1253                .set_event_based_hold(true)
1254                .set_key(key.clone())
1255                .set_metadata([("k0", "v0"), ("k1", "v1")])
1256                .set_retention(
1257                    crate::model::object::Retention::new()
1258                        .set_mode(crate::model::object::retention::Mode::Locked)
1259                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
1260                )
1261                .set_storage_class("ARCHIVE")
1262                .set_temporary_hold(true)
1263                .set_kms_key("test-key")
1264                .with_known_crc32c(crc32c::crc32c(b""))
1265                .with_known_md5_hash(md5::compute(b"").0);
1266
1267        let resource = builder.request.spec.resource.take().unwrap();
1268        let builder = builder;
1269        assert_eq!(
1270            &builder.request.spec,
1271            &WriteObjectSpec::new()
1272                .set_if_generation_match(10)
1273                .set_if_generation_not_match(20)
1274                .set_if_metageneration_match(30)
1275                .set_if_metageneration_not_match(40)
1276                .set_predefined_acl("private")
1277        );
1278
1279        assert_eq!(
1280            &builder.request.params,
1281            &Some(CommonObjectRequestParams::from(key))
1282        );
1283
1284        assert_eq!(
1285            resource,
1286            Object::new()
1287                .set_name("object")
1288                .set_bucket("projects/_/buckets/bucket")
1289                .set_acl([ObjectAccessControl::new()
1290                    .set_entity("allAuthenticatedUsers")
1291                    .set_role("READER")])
1292                .set_cache_control("public; max-age=7200")
1293                .set_content_disposition("inline")
1294                .set_content_encoding("gzip")
1295                .set_content_language("en")
1296                .set_content_type("text/plain")
1297                .set_contexts(ObjectContexts::new().set_custom([(
1298                    "context-key",
1299                    ObjectCustomContextPayload::new().set_value("context-value"),
1300                )]))
1301                .set_checksums(
1302                    crate::model::ObjectChecksums::new()
1303                        .set_crc32c(crc32c::crc32c(b""))
1304                        .set_md5_hash(bytes::Bytes::from_iter(md5::compute(b"").0))
1305                )
1306                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
1307                .set_event_based_hold(true)
1308                .set_metadata([("k0", "v0"), ("k1", "v1")])
1309                .set_retention(
1310                    crate::model::object::Retention::new()
1311                        .set_mode("LOCKED")
1312                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?)
1313                )
1314                .set_storage_class("ARCHIVE")
1315                .set_temporary_hold(true)
1316                .set_kms_key("test-key")
1317        );
1318
1319        Ok(())
1320    }
1321
1322    #[tokio::test]
1323    async fn upload_object_options() {
1324        let inner = test_inner_client(
1325            test_builder()
1326                .with_resumable_upload_threshold(123_usize)
1327                .with_resumable_upload_buffer_size(234_usize),
1328        )
1329        .await;
1330        let options = inner.options.clone();
1331        let stub = crate::storage::transport::Storage::new_test(inner);
1332        let request = WriteObject::new(
1333            stub.clone(),
1334            "projects/_/buckets/bucket",
1335            "object",
1336            "",
1337            options.clone(),
1338        );
1339        assert_eq!(request.options.resumable_upload_threshold(), 123);
1340        assert_eq!(request.options.resumable_upload_buffer_size(), 234);
1341        assert_eq!(request.options.user_agent, None);
1342
1343        let user_agent = "quick_foxes_lazy_dogs/1.0.0";
1344        let request = WriteObject::new(stub, "projects/_/buckets/bucket", "object", "", options)
1345            .with_resumable_upload_threshold(345_usize)
1346            .with_resumable_upload_buffer_size(456_usize)
1347            .with_user_agent(user_agent);
1348        assert_eq!(request.options.resumable_upload_threshold(), 345);
1349        assert_eq!(request.options.resumable_upload_buffer_size(), 456);
1350        assert_eq!(request.options.user_agent.as_deref(), Some(user_agent));
1351    }
1352
1353    const QUICK: &str = "the quick brown fox jumps over the lazy dog";
1354    const VEXING: &str = "how vexingly quick daft zebras jump";
1355
1356    fn quick_checksum(mut engine: Checksum) -> ObjectChecksums {
1357        engine.update(0, &bytes::Bytes::from_static(QUICK.as_bytes()));
1358        engine.finalize()
1359    }
1360
1361    async fn collect<S: StreamingSource>(mut stream: S) -> anyhow::Result<Vec<u8>> {
1362        let mut collected = Vec::new();
1363        while let Some(b) = stream.next().await.transpose()? {
1364            collected.extend_from_slice(&b);
1365        }
1366        Ok(collected)
1367    }
1368
1369    #[tokio::test]
1370    async fn checksum_default() -> Result {
1371        let client = test_builder().build().await?;
1372        let upload = client
1373            .write_object("my-bucket", "my-object", QUICK)
1374            .precompute_checksums()
1375            .await?;
1376        let want = quick_checksum(Checksum {
1377            crc32c: Some(Crc32c::default()),
1378            md5_hash: None,
1379        });
1380        assert_eq!(
1381            upload.request.spec.resource.and_then(|r| r.checksums),
1382            Some(want)
1383        );
1384        let collected = collect(upload.payload).await?;
1385        assert_eq!(collected, QUICK.as_bytes());
1386        Ok(())
1387    }
1388
1389    #[tokio::test]
1390    async fn checksum_md5_and_crc32c() -> Result {
1391        let client = test_builder().build().await?;
1392        let upload = client
1393            .write_object("my-bucket", "my-object", QUICK)
1394            .compute_md5()
1395            .precompute_checksums()
1396            .await?;
1397        let want = quick_checksum(Checksum {
1398            crc32c: Some(Crc32c::default()),
1399            md5_hash: Some(Md5::default()),
1400        });
1401        assert_eq!(
1402            upload.request.spec.resource.and_then(|r| r.checksums),
1403            Some(want)
1404        );
1405        Ok(())
1406    }
1407
1408    #[tokio::test]
1409    async fn checksum_precomputed() -> Result {
1410        let mut engine = Checksum {
1411            crc32c: Some(Crc32c::default()),
1412            md5_hash: Some(Md5::default()),
1413        };
1414        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1415        let ck = engine.finalize();
1416
1417        let client = test_builder().build().await?;
1418        let upload = client
1419            .write_object("my-bucket", "my-object", QUICK)
1420            .with_known_crc32c(ck.crc32c.unwrap())
1421            .with_known_md5_hash(ck.md5_hash.clone())
1422            .precompute_checksums()
1423            .await?;
1424        // Note that the checksums do not match the data. This is intentional,
1425        // we are trying to verify that whatever is provided in with_crc32c()
1426        // and with_md5() is respected.
1427        assert_eq!(
1428            upload.request.spec.resource.and_then(|r| r.checksums),
1429            Some(ck)
1430        );
1431
1432        Ok(())
1433    }
1434
1435    #[tokio::test]
1436    async fn checksum_crc32c_known_md5_computed() -> Result {
1437        let mut engine = Checksum {
1438            crc32c: Some(Crc32c::default()),
1439            md5_hash: Some(Md5::default()),
1440        };
1441        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1442        let ck = engine.finalize();
1443
1444        let client = test_builder().build().await?;
1445        let upload = client
1446            .write_object("my-bucket", "my-object", QUICK)
1447            .compute_md5()
1448            .with_known_crc32c(ck.crc32c.unwrap())
1449            .precompute_checksums()
1450            .await?;
1451        // Note that the checksums do not match the data. This is intentional,
1452        // we are trying to verify that whatever is provided in with_known*()
1453        // is respected.
1454        let want = quick_checksum(Checksum {
1455            crc32c: None,
1456            md5_hash: Some(Md5::default()),
1457        })
1458        .set_crc32c(ck.crc32c.unwrap());
1459        assert_eq!(
1460            upload.request.spec.resource.and_then(|r| r.checksums),
1461            Some(want)
1462        );
1463
1464        Ok(())
1465    }
1466
1467    #[tokio::test]
1468    async fn checksum_mixed_then_precomputed() -> Result {
1469        let mut engine = Checksum {
1470            crc32c: Some(Crc32c::default()),
1471            md5_hash: Some(Md5::default()),
1472        };
1473        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1474        let ck = engine.finalize();
1475
1476        let client = test_builder().build().await?;
1477        let upload = client
1478            .write_object("my-bucket", "my-object", QUICK)
1479            .with_known_md5_hash(ck.md5_hash.clone())
1480            .with_known_crc32c(ck.crc32c.unwrap())
1481            .precompute_checksums()
1482            .await?;
1483        // Note that the checksums do not match the data. This is intentional,
1484        // we are trying to verify that whatever is provided in with_known*()
1485        // is respected.
1486        let want = ck.clone();
1487        assert_eq!(
1488            upload.request.spec.resource.and_then(|r| r.checksums),
1489            Some(want)
1490        );
1491
1492        Ok(())
1493    }
1494
1495    #[tokio::test]
1496    async fn checksum_full_computed_then_md5_precomputed() -> Result {
1497        let mut engine = Checksum {
1498            crc32c: Some(Crc32c::default()),
1499            md5_hash: Some(Md5::default()),
1500        };
1501        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1502        let ck = engine.finalize();
1503
1504        let client = test_builder().build().await?;
1505        let upload = client
1506            .write_object("my-bucket", "my-object", QUICK)
1507            .compute_md5()
1508            .with_known_md5_hash(ck.md5_hash.clone())
1509            .precompute_checksums()
1510            .await?;
1511        // Note that the checksums do not match the data. This is intentional,
1512        // we are trying to verify that whatever is provided in with_known*()
1513        // is respected.
1514        let want = quick_checksum(Checksum {
1515            crc32c: Some(Crc32c::default()),
1516            md5_hash: None,
1517        })
1518        .set_md5_hash(ck.md5_hash.clone());
1519        assert_eq!(
1520            upload.request.spec.resource.and_then(|r| r.checksums),
1521            Some(want)
1522        );
1523
1524        Ok(())
1525    }
1526
1527    #[tokio::test]
1528    async fn checksum_known_crc32_then_computed_md5() -> Result {
1529        let mut engine = Checksum {
1530            crc32c: Some(Crc32c::default()),
1531            md5_hash: Some(Md5::default()),
1532        };
1533        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1534        let ck = engine.finalize();
1535
1536        let client = test_builder().build().await?;
1537        let upload = client
1538            .write_object("my-bucket", "my-object", QUICK)
1539            .with_known_crc32c(ck.crc32c.unwrap())
1540            .compute_md5()
1541            .with_known_md5_hash(ck.md5_hash.clone())
1542            .precompute_checksums()
1543            .await?;
1544        // Note that the checksums do not match the data. This is intentional,
1545        // we are trying to verify that whatever is provided in with_known*()
1546        // is respected.
1547        let want = ck.clone();
1548        assert_eq!(
1549            upload.request.spec.resource.and_then(|r| r.checksums),
1550            Some(want)
1551        );
1552
1553        Ok(())
1554    }
1555
1556    #[tokio::test]
1557    async fn checksum_known_crc32_then_known_md5() -> Result {
1558        let mut engine = Checksum {
1559            crc32c: Some(Crc32c::default()),
1560            md5_hash: Some(Md5::default()),
1561        };
1562        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
1563        let ck = engine.finalize();
1564
1565        let client = test_builder().build().await?;
1566        let upload = client
1567            .write_object("my-bucket", "my-object", QUICK)
1568            .with_known_crc32c(ck.crc32c.unwrap())
1569            .with_known_md5_hash(ck.md5_hash.clone())
1570            .precompute_checksums()
1571            .await?;
1572        // Note that the checksums do not match the data. This is intentional,
1573        // we are trying to verify that whatever is provided in with_known*()
1574        // is respected.
1575        let want = ck.clone();
1576        assert_eq!(
1577            upload.request.spec.resource.and_then(|r| r.checksums),
1578            Some(want)
1579        );
1580
1581        Ok(())
1582    }
1583
1584    #[tokio::test]
1585    async fn precompute_checksums_seek_error() -> Result {
1586        let mut source = MockSeekSource::new();
1587        source
1588            .expect_seek()
1589            .once()
1590            .returning(|_| Err(IoError::new(ErrorKind::Deadlock, "test-only")));
1591
1592        let client = test_builder().build().await?;
1593        let err = client
1594            .write_object("my-bucket", "my-object", source)
1595            .precompute_checksums()
1596            .await
1597            .expect_err("seek() returns an error");
1598        assert!(err.is_serialization(), "{err:?}");
1599        assert!(
1600            err.source()
1601                .and_then(|e| e.downcast_ref::<IoError>())
1602                .is_some(),
1603            "{err:?}"
1604        );
1605
1606        Ok(())
1607    }
1608
1609    #[tokio::test]
1610    async fn precompute_checksums_next_error() -> Result {
1611        let mut source = MockSeekSource::new();
1612        source.expect_seek().returning(|_| Ok(()));
1613        let mut seq = mockall::Sequence::new();
1614        source
1615            .expect_next()
1616            .times(3)
1617            .in_sequence(&mut seq)
1618            .returning(|| Some(Ok(bytes::Bytes::new())));
1619        source
1620            .expect_next()
1621            .once()
1622            .in_sequence(&mut seq)
1623            .returning(|| Some(Err(IoError::new(ErrorKind::BrokenPipe, "test-only"))));
1624
1625        let client = test_builder().build().await?;
1626        let err = client
1627            .write_object("my-bucket", "my-object", source)
1628            .precompute_checksums()
1629            .await
1630            .expect_err("seek() returns an error");
1631        assert!(err.is_serialization(), "{err:?}");
1632        assert!(
1633            err.source()
1634                .and_then(|e| e.downcast_ref::<IoError>())
1635                .is_some(),
1636            "{err:?}"
1637        );
1638
1639        Ok(())
1640    }
1641
1642    #[tokio::test]
1643    async fn write_object_with_user_agent() -> Result {
1644        use http::header::USER_AGENT;
1645
1646        let user_agent = "quicker_foxes_lazier_dogs/1.2.3";
1647        let server = Server::run();
1648        server.expect(
1649            Expectation::matching(all_of![
1650                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
1651                request::headers(contains((USER_AGENT.as_str(), user_agent))),
1652                request::query(url_decoded(contains(("uploadType", "multipart")))),
1653            ])
1654            .times(1)
1655            .respond_with(status_code(200).body("{}")),
1656        );
1657
1658        let client = Storage::builder()
1659            .with_endpoint(format!("http://{}", server.addr()))
1660            .with_credentials(Anonymous::new().build())
1661            .build()
1662            .await?;
1663        let _ = client
1664            .write_object(
1665                "projects/_/buckets/test-bucket",
1666                "test-object",
1667                "hello world",
1668            )
1669            .with_user_agent(user_agent)
1670            .send_unbuffered()
1671            .await?;
1672
1673        Ok(())
1674    }
1675
1676    #[tokio::test]
1677    async fn write_object_with_quota_project() -> Result {
1678        const PROJECT_NAME: &str = "project_lazy_dog";
1679        let server = Server::run();
1680        let session = server.url("/upload/session/test-only-001");
1681        let path = session.path().to_string();
1682
1683        server.expect(
1684            Expectation::matching(all_of![
1685                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
1686                request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
1687                request::query(url_decoded(contains(("uploadType", "resumable")))),
1688            ])
1689            .times(1)
1690            .respond_with(status_code(200).append_header("location", session.to_string())),
1691        );
1692
1693        server.expect(
1694            Expectation::matching(all_of![
1695                request::method_path("PUT", path),
1696                request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
1697            ])
1698            .times(1)
1699            .respond_with(
1700                status_code(200)
1701                    .append_header(http::header::CONTENT_TYPE, "application/json")
1702                    .body(serde_json::to_string(&crate::model::Object::new()).unwrap()),
1703            ),
1704        );
1705
1706        let client = Storage::builder()
1707            .with_endpoint(format!("http://{}", server.addr()))
1708            .with_credentials(Anonymous::new().build())
1709            .build()
1710            .await?;
1711        let _ = client
1712            .write_object(
1713                "projects/_/buckets/test-bucket",
1714                "test-object",
1715                "hello world",
1716            )
1717            .with_quota_project(PROJECT_NAME)
1718            .with_resumable_upload_threshold(0_usize)
1719            .send_unbuffered()
1720            .await?;
1721
1722        Ok(())
1723    }
1724
1725    #[tokio::test]
1726    async fn debug() -> Result {
1727        let client = test_builder().build().await?;
1728        let upload = client
1729            .write_object("my-bucket", "my-object", "")
1730            .precompute_checksums()
1731            .await;
1732
1733        let fmt = format!("{upload:?}");
1734        ["WriteObject", "inner", "spec", "options", "checksum"]
1735            .into_iter()
1736            .for_each(|text| {
1737                assert!(fmt.contains(text), "expected {text} in {fmt}");
1738            });
1739        Ok(())
1740    }
1741}