google_cloud_storage/storage/write_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Contains the request builder for [write_object()] and related types.
16//!
17//! [write_object()]: crate::storage::client::Storage::write_object()
18
19use super::streaming_source::{Seek, StreamingSource};
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::storage::checksum::details::update as checksum_update;
23use crate::storage::checksum::details::{Checksum, Md5};
24use crate::storage::request_options::RequestOptions;
25
26/// A request builder for object writes.
27///
28/// # Example: hello world
29/// ```
30/// use google_cloud_storage::client::Storage;
31/// async fn sample(client: &Storage) -> anyhow::Result<()> {
32/// let response = client
33/// .write_object("projects/_/buckets/my-bucket", "hello", "Hello World!")
34/// .send_unbuffered()
35/// .await?;
36/// println!("response details={response:?}");
37/// Ok(())
38/// }
39/// ```
40///
41/// # Example: upload a file
42/// ```
43/// use google_cloud_storage::client::Storage;
44/// async fn sample(client: &Storage) -> anyhow::Result<()> {
45/// let payload = tokio::fs::File::open("my-data").await?;
46/// let response = client
47/// .write_object("projects/_/buckets/my-bucket", "my-object", payload)
48/// .send_unbuffered()
49/// .await?;
50/// println!("response details={response:?}");
51/// Ok(())
52/// }
53/// ```
54///
55/// # Example: create a new object from a custom data source
56/// ```
57/// use google_cloud_storage::{client::Storage, streaming_source::StreamingSource};
58/// struct DataSource;
59/// impl StreamingSource for DataSource {
60/// type Error = std::io::Error;
61/// async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
62/// # panic!();
63/// }
64/// }
65///
66/// async fn sample(client: &Storage) -> anyhow::Result<()> {
67/// let response = client
68/// .write_object("projects/_/buckets/my-bucket", "my-object", DataSource)
69/// .send_buffered()
70/// .await?;
71/// println!("response details={response:?}");
72/// Ok(())
73/// }
74/// ```
pub struct WriteObject<T, S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    // Implementation of the storage stub trait used to issue the request;
    // defaults to the production transport.
    stub: std::sync::Arc<S>,
    // The request accumulated by this builder: object spec, resource
    // metadata, preconditions, and per-request parameters.
    pub(crate) request: crate::model_ext::WriteObjectRequest,
    // The source of the object data to upload.
    pub(crate) payload: Payload<T>,
    // Per-request options: retry/backoff policies, checksum configuration,
    // buffer sizes, quota project, etc.
    pub(crate) options: RequestOptions,
}
84
85impl<T, S> WriteObject<T, S>
86where
87 S: crate::storage::stub::Storage + 'static,
88{
    /// Set a [request precondition] on the object generation to match.
    ///
    /// With this precondition the request fails unless the current object
    /// generation matches the provided value. A common value is `0`, which
    /// prevents writes from succeeding if the object already exists.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .set_if_generation_match(0)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn set_if_generation_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.request.spec.if_generation_match = Some(v.into());
        self
    }
116
    /// Set a [request precondition] on the object generation to not match.
    ///
    /// With this precondition the request fails if the current object
    /// generation matches the provided value.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .set_if_generation_not_match(0)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn set_if_generation_not_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.request.spec.if_generation_not_match = Some(v.into());
        self
    }
143
144 /// Set a [request precondition] on the object meta generation.
145 ///
146 /// With this precondition the request fails if the current object metadata
147 /// generation does not match the provided value. This may be useful to
148 /// prevent changes when the metageneration is known.
149 ///
150 /// # Example
151 /// ```
152 /// # use google_cloud_storage::client::Storage;
153 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
154 /// let response = client
155 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
156 /// .set_if_metageneration_match(1234)
157 /// .send_buffered()
158 /// .await?;
159 /// println!("response details={response:?}");
160 /// # Ok(()) }
161 /// ```
162 ///
163 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
164 pub fn set_if_metageneration_match<V>(mut self, v: V) -> Self
165 where
166 V: Into<i64>,
167 {
168 self.request.spec.if_metageneration_match = Some(v.into());
169 self
170 }
171
172 /// Set a [request precondition] on the object meta-generation.
173 ///
174 /// With this precondition the request fails if the current object metadata
175 /// generation matches the provided value. This is rarely useful in uploads,
176 /// it is more commonly used on reads to prevent a large response if the
177 /// data is already cached.
178 ///
179 /// # Example
180 /// ```
181 /// # use google_cloud_storage::client::Storage;
182 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
183 /// let response = client
184 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
185 /// .set_if_metageneration_not_match(1234)
186 /// .send_buffered()
187 /// .await?;
188 /// println!("response details={response:?}");
189 /// # Ok(()) }
190 /// ```
191 ///
192 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
193 pub fn set_if_metageneration_not_match<V>(mut self, v: V) -> Self
194 where
195 V: Into<i64>,
196 {
197 self.request.spec.if_metageneration_not_match = Some(v.into());
198 self
199 }
200
201 /// Sets the ACL for the new object.
202 ///
203 /// # Example
204 /// ```
205 /// # use google_cloud_storage::client::Storage;
206 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
207 /// # use google_cloud_storage::model::ObjectAccessControl;
208 /// let response = client
209 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
210 /// .set_acl([ObjectAccessControl::new().set_entity("allAuthenticatedUsers").set_role("READER")])
211 /// .send_buffered()
212 /// .await?;
213 /// println!("response details={response:?}");
214 /// # Ok(()) }
215 /// ```
216 pub fn set_acl<I, V>(mut self, v: I) -> Self
217 where
218 I: IntoIterator<Item = V>,
219 V: Into<crate::model::ObjectAccessControl>,
220 {
221 self.mut_resource().acl = v.into_iter().map(|a| a.into()).collect();
222 self
223 }
224
    /// Sets the [cache control] for the new object.
    ///
    /// This can be used to control caching in [public objects].
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     // Cache-Control directives are separated by commas (RFC 7234).
    ///     .set_cache_control("public, max-age=7200")
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [public objects]: https://cloud.google.com/storage/docs/access-control/making-data-public
    /// [cache control]: https://datatracker.ietf.org/doc/html/rfc7234#section-5.2
    pub fn set_cache_control<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().cache_control = v.into();
        self
    }
248
249 /// Sets the [content disposition] for the new object.
250 ///
251 /// Google Cloud Storage can serve content directly to web browsers. This
252 /// attribute sets the `Content-Disposition` header, which may change how
253 /// the browser displays the contents.
254 ///
255 /// # Example
256 /// ```
257 /// # use google_cloud_storage::client::Storage;
258 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
259 /// let response = client
260 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
261 /// .set_content_disposition("inline")
262 /// .send_buffered()
263 /// .await?;
264 /// println!("response details={response:?}");
265 /// # Ok(()) }
266 /// ```
267 ///
268 /// [content disposition]: https://datatracker.ietf.org/doc/html/rfc6266
269 pub fn set_content_disposition<V: Into<String>>(mut self, v: V) -> Self {
270 self.mut_resource().content_disposition = v.into();
271 self
272 }
273
    /// Sets the [content encoding] for the object data.
    ///
    /// This can be used to upload compressed data and enable [transcoding] of
    /// the data during reads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use flate2::write::GzEncoder;
    /// use std::io::Write;
    /// let mut e = GzEncoder::new(Vec::new(), flate2::Compression::default());
    /// e.write_all(b"hello world")?;
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", bytes::Bytes::from_owner(e.finish()?))
    ///     .set_content_encoding("gzip")
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [transcoding]: https://cloud.google.com/storage/docs/transcoding
    /// [content encoding]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.2
    pub fn set_content_encoding<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_encoding = v.into();
        self
    }
302
303 /// Sets the [content language] for the new object.
304 ///
305 /// Google Cloud Storage can serve content directly to web browsers. This
306 /// attribute sets the `Content-Language` header, which may change how the
307 /// browser displays the contents.
308 ///
309 /// # Example
310 /// ```
311 /// # use google_cloud_storage::client::Storage;
312 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
313 /// let response = client
314 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
315 /// .set_content_language("en")
316 /// .send_buffered()
317 /// .await?;
318 /// println!("response details={response:?}");
319 /// # Ok(()) }
320 /// ```
321 ///
322 /// [content language]: https://cloud.google.com/storage/docs/metadata#content-language
323 pub fn set_content_language<V: Into<String>>(mut self, v: V) -> Self {
324 self.mut_resource().content_language = v.into();
325 self
326 }
327
328 /// Sets the [content type] for the new object.
329 ///
330 /// Google Cloud Storage can serve content directly to web browsers. This
331 /// attribute sets the `Content-Type` header, which may change how the
332 /// browser interprets the contents.
333 ///
334 /// # Example
335 /// ```
336 /// # use google_cloud_storage::client::Storage;
337 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
338 /// let response = client
339 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
340 /// .set_content_type("text/plain")
341 /// .send_buffered()
342 /// .await?;
343 /// println!("response details={response:?}");
344 /// # Ok(()) }
345 /// ```
346 ///
347 /// [content type]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.5
348 pub fn set_content_type<V: Into<String>>(mut self, v: V) -> Self {
349 self.mut_resource().content_type = v.into();
350 self
351 }
352
353 /// Sets the [custom time] for the new object.
354 ///
355 /// This field is typically set in order to use the [DaysSinceCustomTime]
356 /// condition in Object Lifecycle Management.
357 ///
358 /// # Example
359 /// ```
360 /// # use google_cloud_storage::client::Storage;
361 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
362 /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
363 /// let response = client
364 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
365 /// .set_custom_time(time)
366 /// .send_buffered()
367 /// .await?;
368 /// println!("response details={response:?}");
369 /// # Ok(()) }
370 /// ```
371 ///
372 /// [DaysSinceCustomTime]: https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
373 /// [custom time]: https://cloud.google.com/storage/docs/metadata#custom-time
374 pub fn set_custom_time<V: Into<wkt::Timestamp>>(mut self, v: V) -> Self {
375 self.mut_resource().custom_time = Some(v.into());
376 self
377 }
378
379 /// Sets the [event based hold] flag for the new object.
380 ///
381 /// This field is typically set in order to prevent objects from being
382 /// deleted or modified.
383 ///
384 /// # Example
385 /// ```
386 /// # use google_cloud_storage::client::Storage;
387 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
388 /// let response = client
389 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
390 /// .set_event_based_hold(true)
391 /// .send_buffered()
392 /// .await?;
393 /// println!("response details={response:?}");
394 /// # Ok(()) }
395 /// ```
396 ///
397 /// [event based hold]: https://cloud.google.com/storage/docs/object-holds
398 pub fn set_event_based_hold<V: Into<bool>>(mut self, v: V) -> Self {
399 self.mut_resource().event_based_hold = Some(v.into());
400 self
401 }
402
    /// Sets the [custom metadata] for the new object.
    ///
    /// This field is typically set to annotate the object with
    /// application-specific metadata.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .set_metadata([("test-only", "true"), ("environment", "qa")])
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [custom metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
    pub fn set_metadata<I, K, V>(mut self, i: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.mut_resource().metadata = i.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
        self
    }
432
433 /// Sets the [retention configuration] for the new object.
434 ///
435 /// # Example
436 /// ```
437 /// # use google_cloud_storage::client::Storage;
438 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
439 /// # use google_cloud_storage::model::object::{Retention, retention};
440 /// let response = client
441 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
442 /// .set_retention(
443 /// Retention::new()
444 /// .set_mode(retention::Mode::Locked)
445 /// .set_retain_until_time(wkt::Timestamp::try_from("2035-01-01T00:00:00Z")?))
446 /// .send_buffered()
447 /// .await?;
448 /// println!("response details={response:?}");
449 /// # Ok(()) }
450 /// ```
451 ///
452 /// [retention configuration]: https://cloud.google.com/storage/docs/metadata#retention-config
453 pub fn set_retention<V>(mut self, v: V) -> Self
454 where
455 V: Into<crate::model::object::Retention>,
456 {
457 self.mut_resource().retention = Some(v.into());
458 self
459 }
460
461 /// Sets the [storage class] for the new object.
462 ///
463 /// # Example
464 /// ```
465 /// # use google_cloud_storage::client::Storage;
466 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
467 /// let response = client
468 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
469 /// .set_storage_class("ARCHIVE")
470 /// .send_buffered()
471 /// .await?;
472 /// println!("response details={response:?}");
473 /// # Ok(()) }
474 /// ```
475 ///
476 /// [storage class]: https://cloud.google.com/storage/docs/storage-classes
477 pub fn set_storage_class<V>(mut self, v: V) -> Self
478 where
479 V: Into<String>,
480 {
481 self.mut_resource().storage_class = v.into();
482 self
483 }
484
    /// Sets the [temporary hold] flag for the new object.
    ///
    /// This field is typically set in order to prevent objects from being
    /// deleted or modified.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .set_temporary_hold(true)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [temporary hold]: https://cloud.google.com/storage/docs/object-holds
    pub fn set_temporary_hold<V: Into<bool>>(mut self, v: V) -> Self {
        self.mut_resource().temporary_hold = v.into();
        self
    }
509
510 /// Sets the resource name of the [Customer-managed encryption key] for this
511 /// object.
512 ///
513 /// The service imposes a number of restrictions on the keys used to encrypt
514 /// Google Cloud Storage objects. Read the documentation in full before
515 /// trying to use customer-managed encryption keys. In particular, verify
516 /// the service has the necessary permissions, and the key is in a
517 /// compatible location.
518 ///
519 /// # Example
520 /// ```
521 /// # use google_cloud_storage::client::Storage;
522 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
523 /// let response = client
524 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
525 /// .set_kms_key("projects/test-project/locations/us-central1/keyRings/test-ring/cryptoKeys/test-key")
526 /// .send_buffered()
527 /// .await?;
528 /// println!("response details={response:?}");
529 /// # Ok(()) }
530 /// ```
531 ///
532 /// [Customer-managed encryption key]: https://cloud.google.com/storage/docs/encryption/customer-managed-keys
533 pub fn set_kms_key<V>(mut self, v: V) -> Self
534 where
535 V: Into<String>,
536 {
537 self.mut_resource().kms_key = v.into();
538 self
539 }
540
541 /// Configure this object to use one of the [predefined ACLs].
542 ///
543 /// # Example
544 /// ```
545 /// # use google_cloud_storage::client::Storage;
546 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
547 /// let response = client
548 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
549 /// .set_predefined_acl("private")
550 /// .send_buffered()
551 /// .await?;
552 /// println!("response details={response:?}");
553 /// # Ok(()) }
554 /// ```
555 ///
556 /// [predefined ACLs]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
557 pub fn set_predefined_acl<V>(mut self, v: V) -> Self
558 where
559 V: Into<String>,
560 {
561 self.request.spec.predefined_acl = v.into();
562 self
563 }
564
565 /// The encryption key used with the Customer-Supplied Encryption Keys
566 /// feature. In raw bytes format (not base64-encoded).
567 ///
568 /// # Example
569 /// ```
570 /// # use google_cloud_storage::client::Storage;
571 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
572 /// # use google_cloud_storage::model_ext::KeyAes256;
573 /// let key: &[u8] = &[97; 32];
574 /// let response = client
575 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
576 /// .set_key(KeyAes256::new(key)?)
577 /// .send_buffered()
578 /// .await?;
579 /// println!("response details={response:?}");
580 /// # Ok(()) }
581 /// ```
582 pub fn set_key(mut self, v: KeyAes256) -> Self {
583 self.request.params = Some(v.into());
584 self
585 }
586
587 /// Sets the object custom contexts.
588 ///
589 /// # Example
590 /// ```
591 /// # use google_cloud_storage::client::Storage;
592 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
593 /// # use google_cloud_storage::model::{ObjectContexts, ObjectCustomContextPayload};
594 /// let response = client
595 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
596 /// .set_contexts(
597 /// ObjectContexts::new().set_custom([
598 /// ("example", ObjectCustomContextPayload::new().set_value("true")),
599 /// ])
600 /// )
601 /// .send_buffered()
602 /// .await?;
603 /// println!("response details={response:?}");
604 /// # Ok(()) }
605 /// ```
606 pub fn set_contexts<V>(mut self, v: V) -> Self
607 where
608 V: Into<crate::model::ObjectContexts>,
609 {
610 self.mut_resource().contexts = Some(v.into());
611 self
612 }
613
    /// Configure the idempotency for this upload.
    ///
    /// By default, the client library treats single-shot uploads without
    /// preconditions, as non-idempotent. If the destination bucket is
    /// configured with [object versioning] then the operation may succeed
    /// multiple times with observable side-effects. With object versioning and
    /// a [lifecycle] policy limiting the number of versions, uploading the same
    /// data multiple times may result in data loss.
    ///
    /// The client library cannot efficiently determine if these conditions
    /// apply to your upload. If they do, or your application can tolerate
    /// multiple versions of the same data for other reasons, consider using
    /// `with_idempotency(true)`.
    ///
    /// The client library treats resumable uploads as idempotent, regardless of
    /// the value in this option. Such uploads can succeed at most once.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_idempotency(true)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [lifecycle]: https://cloud.google.com/storage/docs/lifecycle
    /// [object versioning]: https://cloud.google.com/storage/docs/object-versioning
    pub fn with_idempotency(mut self, v: bool) -> Self {
        self.options.idempotency = Some(v);
        self
    }
652
653 /// The retry policy used for this request.
654 ///
655 /// # Example
656 /// ```
657 /// # use google_cloud_storage::client::Storage;
658 /// # use google_cloud_storage::retry_policy::RetryableErrors;
659 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
660 /// use std::time::Duration;
661 /// use google_cloud_gax::retry_policy::RetryPolicyExt;
662 /// let response = client
663 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
664 /// .with_retry_policy(
665 /// RetryableErrors
666 /// .with_attempt_limit(5)
667 /// .with_time_limit(Duration::from_secs(90)),
668 /// )
669 /// .send_buffered()
670 /// .await?;
671 /// println!("response details={response:?}");
672 /// # Ok(()) }
673 /// ```
674 pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
675 mut self,
676 v: V,
677 ) -> Self {
678 self.options.retry_policy = v.into().into();
679 self
680 }
681
682 /// The backoff policy used for this request.
683 ///
684 /// # Example
685 /// ```
686 /// # use google_cloud_storage::client::Storage;
687 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
688 /// use std::time::Duration;
689 /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
690 /// let response = client
691 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
692 /// .with_backoff_policy(ExponentialBackoff::default())
693 /// .send_buffered()
694 /// .await?;
695 /// println!("response details={response:?}");
696 /// # Ok(()) }
697 /// ```
698 pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
699 mut self,
700 v: V,
701 ) -> Self {
702 self.options.backoff_policy = v.into().into();
703 self
704 }
705
    /// The retry throttler used for this request.
    ///
    /// Most of the time you want to use the same throttler for all the requests
    /// in a client, and even the same throttler for many clients. Rarely it
    /// may be necessary to use a custom throttler for some subset of the
    /// requests.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_retry_throttler(adhoc_throttler())
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
    /// # panic!();
    /// }
    /// # Ok(()) }
    /// ```
    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.retry_throttler = v.into().into();
        self
    }
735
736 /// Sets the payload size threshold to switch from single-shot to resumable uploads.
737 ///
738 /// # Example
739 /// ```
740 /// # use google_cloud_storage::client::Storage;
741 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
742 /// let response = client
743 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
744 /// .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
745 /// .send_buffered()
746 /// .await?;
747 /// println!("response details={response:?}");
748 /// # Ok(()) }
749 /// ```
750 ///
751 /// The client library can perform uploads using [single-shot] or
752 /// [resumable] uploads. For small objects, single-shot uploads offer better
753 /// performance, as they require a single HTTP transfer. For larger objects,
754 /// the additional request latency is not significant, and resumable uploads
755 /// offer better recovery on errors.
756 ///
757 /// The library automatically selects resumable uploads when the payload is
758 /// equal to or larger than this option. For smaller uploads the client
759 /// library uses single-shot uploads.
760 ///
761 /// The exact threshold depends on where the application is deployed and
762 /// destination bucket location with respect to where the application is
763 /// running. The library defaults should work well in most cases, but some
764 /// applications may benefit from fine-tuning.
765 ///
766 /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
767 /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
768 pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
769 self.options.set_resumable_upload_threshold(v.into());
770 self
771 }
772
773 /// Changes the buffer size for some resumable uploads.
774 ///
775 /// # Example
776 /// ```
777 /// # use google_cloud_storage::client::Storage;
778 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
779 /// let response = client
780 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
781 /// .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
782 /// .send_buffered()
783 /// .await?;
784 /// println!("response details={response:?}");
785 /// # Ok(()) }
786 /// ```
787 ///
788 /// When performing [resumable uploads] from sources without [Seek] the
789 /// client library needs to buffer data in memory until it is persisted by
790 /// the service. Otherwise the data would be lost if the upload fails.
791 /// Applications may want to tune this buffer size:
792 ///
793 /// - Use smaller buffer sizes to support more concurrent uploads in the
794 /// same application.
795 /// - Use larger buffer sizes for better throughput. Sending many small
796 /// buffers stalls the upload until the client receives a successful
797 /// response from the service.
798 ///
799 /// Keep in mind that there are diminishing returns on using larger buffers.
800 ///
801 /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
802 /// [Seek]: crate::streaming_source::Seek
803 pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
804 self.options.set_resumable_upload_buffer_size(v.into());
805 self
806 }
807
808 /// Sets the `User-Agent` header for this request.
809 ///
810 /// # Example
811 /// ```
812 /// # use google_cloud_storage::client::Storage;
813 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
814 /// let mut response = client
815 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
816 /// .with_user_agent("my-app/1.0.0")
817 /// .send_buffered()
818 /// .await?;
819 /// println!("response details={response:?}");
820 /// # Ok(()) }
821 /// ```
822 pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
823 self.options.user_agent = Some(user_agent.into());
824 self
825 }
826
827 /// Sets the project that will be billed for this request.
828 ///
829 /// Required for [Requester Pays] buckets. The value overrides any
830 /// `quota_project_id` configured on the credentials; the credential-level
831 /// header is suppressed for this RPC.
832 ///
833 /// # Example
834 /// ```
835 /// # use google_cloud_storage::client::Storage;
836 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
837 /// let response = client
838 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello")
839 /// .with_quota_project("my-billing-project")
840 /// .send_buffered()
841 /// .await?;
842 /// # Ok(()) }
843 /// ```
844 ///
845 /// [Requester Pays]: https://cloud.google.com/storage/docs/requester-pays
846 pub fn with_quota_project(mut self, project: impl Into<String>) -> Self {
847 self.options.set_quota_project(project);
848 self
849 }
850
    /// Returns a mutable reference to the object resource being written.
    ///
    /// # Panics
    /// Panics if `request.spec.resource` is `None`. The `expect` message
    /// indicates the field is initialized by the builder's constructor, so
    /// this is an internal invariant rather than a user-visible error.
    fn mut_resource(&mut self) -> &mut crate::model::Object {
        self.request
            .spec
            .resource
            .as_mut()
            .expect("resource field initialized in `new()`")
    }
858
859 fn set_crc32c<V: Into<u32>>(mut self, v: V) -> Self {
860 let checksum = self.mut_resource().checksums.get_or_insert_default();
861 checksum.crc32c = Some(v.into());
862 self
863 }
864
865 /// Sets the MD5 hash for the object being written.
866 pub fn set_md5_hash<I, V>(mut self, i: I) -> Self
867 where
868 I: IntoIterator<Item = V>,
869 V: Into<u8>,
870 {
871 let checksum = self.mut_resource().checksums.get_or_insert_default();
872 checksum.md5_hash = i.into_iter().map(|v| v.into()).collect();
873 self
874 }
875
876 /// Provide a precomputed value for the CRC32C checksum.
877 ///
878 /// # Example
879 /// ```
880 /// # use google_cloud_storage::client::Storage;
881 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
882 /// use crc32c::crc32c;
883 /// let response = client
884 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
885 /// .with_known_crc32c(crc32c(b"hello world"))
886 /// .send_buffered()
887 /// .await?;
888 /// println!("response details={response:?}");
889 /// # Ok(()) }
890 /// ```
891 ///
892 /// In some applications, the payload's CRC32C checksum is already known.
893 /// For example, the application may be reading the data from another blob
894 /// storage system.
895 ///
896 /// In such cases, it is safer to pass the known CRC32C of the payload to
897 /// [Cloud Storage], and more efficient to skip the computation in the
898 /// client library.
899 ///
900 /// Note that once you provide a CRC32C value to this builder you cannot
901 /// use [compute_md5()] to also have the library compute the checksums.
902 ///
903 /// [compute_md5()]: WriteObject::compute_md5
904 pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> Self {
905 let mut this = self;
906 this.options.checksum.crc32c = None;
907 this.set_crc32c(v)
908 }
909
910 /// Provide a precomputed value for the MD5 hash.
911 ///
912 /// # Example
913 /// ```
914 /// # use google_cloud_storage::client::Storage;
915 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
916 /// use md5::compute;
917 /// let hash = md5::compute(b"hello world");
918 /// let response = client
919 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
920 /// .with_known_md5_hash(bytes::Bytes::from_owner(hash.0))
921 /// .send_buffered()
922 /// .await?;
923 /// println!("response details={response:?}");
924 /// # Ok(()) }
925 /// ```
926 ///
927 /// In some applications, the payload's MD5 hash is already known. For
928 /// example, the application may be reading the data from another blob
929 /// storage system.
930 ///
931 /// In such cases, it is safer to pass the known MD5 of the payload to
932 /// [Cloud Storage], and more efficient to skip the computation in the
933 /// client library.
934 ///
935 /// Note that once you provide a MD5 value to this builder you cannot
936 /// use [compute_md5()] to also have the library compute the checksums.
937 ///
938 /// [compute_md5()]: WriteObject::compute_md5
939 pub fn with_known_md5_hash<I, V>(self, i: I) -> Self
940 where
941 I: IntoIterator<Item = V>,
942 V: Into<u8>,
943 {
944 let mut this = self;
945 this.options.checksum.md5_hash = None;
946 this.set_md5_hash(i)
947 }
948
949 /// Enables computation of MD5 hashes.
950 ///
951 /// # Example
952 /// ```
953 /// # use google_cloud_storage::client::Storage;
954 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
955 /// let payload = tokio::fs::File::open("my-data").await?;
956 /// let response = client
957 /// .write_object("projects/_/buckets/my-bucket", "my-object", payload)
958 /// .compute_md5()
959 /// .send_buffered()
960 /// .await?;
961 /// println!("response details={response:?}");
962 /// # Ok(()) }
963 /// ```
964 ///
965 /// See [precompute_checksums][WriteObject::precompute_checksums] for more
966 /// details on how checksums are used by the client library and their
967 /// limitations.
968 pub fn compute_md5(self) -> Self {
969 let mut this = self;
970 this.options.checksum.md5_hash = Some(Md5::default());
971 this
972 }
973
974 pub(crate) fn new<B, O, P>(
975 stub: std::sync::Arc<S>,
976 bucket: B,
977 object: O,
978 payload: P,
979 options: RequestOptions,
980 ) -> Self
981 where
982 B: Into<String>,
983 O: Into<String>,
984 P: Into<Payload<T>>,
985 {
986 let resource = crate::model::Object::new()
987 .set_bucket(bucket)
988 .set_name(object);
989 WriteObject {
990 stub,
991 request: crate::model_ext::WriteObjectRequest {
992 spec: crate::model::WriteObjectSpec::new().set_resource(resource),
993 params: None,
994 },
995 payload: payload.into(),
996 options,
997 }
998 }
999}
1000
impl<T, S> WriteObject<T, S>
where
    T: StreamingSource + Seek + Send + Sync + 'static,
    <T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
    <T as Seek>::Error: std::error::Error + Send + Sync + 'static,
    S: crate::storage::stub::Storage + 'static,
{
    /// A simple upload from a buffer.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .send_unbuffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub async fn send_unbuffered(self) -> Result<Object> {
        self.stub
            .write_object_unbuffered(self.payload, self.request, self.options)
            .await
    }

    /// Precompute the payload checksums before uploading the data.
    ///
    /// If the checksums are known when the upload starts, the client library
    /// can include the checksums with the upload request, and the service can
    /// reject the upload if the payload and the checksums do not match.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let payload = tokio::fs::File::open("my-data").await?;
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", payload)
    ///     .precompute_checksums()
    ///     .await?
    ///     .send_unbuffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// Precomputing the checksums can be expensive if the data source is slow
    /// to read. Therefore, the client library does not precompute the checksums
    /// by default. The client library compares the checksums computed by the
    /// service against its own checksums. If they do not match, the client
    /// library returns an error. However, the service has already created the
    /// object with the (likely incorrect) data.
    ///
    /// The client library currently uses the [JSON API], it is not possible to
    /// send the checksums at the end of the upload with this API.
    ///
    /// [JSON API]: https://cloud.google.com/storage/docs/json_api
    pub async fn precompute_checksums(mut self) -> Result<Self> {
        // Rewind the source, then consume it once to feed the checksum
        // engines configured in the request options.
        let mut offset = 0_u64;
        self.payload.seek(offset).await.map_err(Error::ser)?;
        while let Some(n) = self.payload.next().await.transpose().map_err(Error::ser)? {
            self.options.checksum.update(offset, &n);
            offset += n.len() as u64;
        }
        // Rewind again so the subsequent upload re-reads the payload from the
        // start.
        self.payload.seek(0_u64).await.map_err(Error::ser)?;
        let computed = self.options.checksum.finalize();
        // Fold the computed values into the checksums already recorded on the
        // object resource (known values take precedence; see the tests).
        let current = self.mut_resource().checksums.get_or_insert_default();
        checksum_update(current, computed);
        // Clear the engines so the upload path does not compute the checksums
        // a second time; they are already recorded on the resource.
        self.options.checksum = Checksum {
            crc32c: None,
            md5_hash: None,
        };
        Ok(self)
    }
}
1077
impl<T, S> WriteObject<T, S>
where
    T: StreamingSource + Send + Sync + 'static,
    T::Error: std::error::Error + Send + Sync + 'static,
    S: crate::storage::stub::Storage + 'static,
{
    /// Upload an object from a streaming source without rewinds.
    ///
    /// If the data source does **not** implement [Seek] the client library must
    /// buffer data sent to the service until the service confirms it has
    /// persisted the data. This requires more memory in the client, and when
    /// the buffer grows too large, may require stalling the writer until the
    /// service can persist the data.
    ///
    /// Use this function for data sources where it is expensive or impossible
    /// to restart the data source. This function is also useful when it is hard
    /// or impossible to predict the number of bytes emitted by a stream, even
    /// if restarting the stream is not too expensive.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub async fn send_buffered(self) -> crate::Result<Object> {
        // Delegate to the stub; buffering and flow control are handled by the
        // stub implementation.
        self.stub
            .write_object_buffered(self.payload, self.request, self.options)
            .await
    }
}
1114
// We need `Debug` to use `expect_err()` in `Result<WriteObject, ...>`.
//
// This is a manual implementation because the payload type `T` is not
// required to implement `Debug`, so `#[derive(Debug)]` is not possible.
impl<T, S> std::fmt::Debug for WriteObject<T, S>
where
    S: crate::storage::stub::Storage + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("WriteObject")
            .field("stub", &self.stub)
            .field("request", &self.request)
            // skip payload, as it is not `Debug`
            .field("options", &self.options)
            .finish()
    }
}
1129
#[cfg(test)]
mod tests {
    use super::client::tests::{test_builder, test_inner_client};
    use super::*;
    use crate::client::Storage;
    use crate::model::{
        CommonObjectRequestParams, ObjectChecksums, ObjectContexts, ObjectCustomContextPayload,
        WriteObjectSpec,
    };
    use crate::storage::checksum::details::{Crc32c, Md5};
    use crate::streaming_source::tests::MockSeekSource;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use httptest::{Expectation, Server, matchers::*, responders::status_code};
    use std::error::Error as _;
    use std::io::{Error as IoError, ErrorKind};

    type Result = anyhow::Result<()>;

    // Verify `write_object()` can be used with a source that implements
    // `StreamingSource` **and** `Seek`
    #[tokio::test]
    async fn test_upload_streaming_source_and_seek() -> Result {
        struct Source;
        impl crate::streaming_source::StreamingSource for Source {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
                None
            }
        }
        impl crate::streaming_source::Seek for Source {
            type Error = std::io::Error;
            async fn seek(&mut self, _offset: u64) -> std::result::Result<(), Self::Error> {
                Ok(())
            }
        }

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let _ = client.write_object("projects/_/buckets/test-bucket", "test-object", Source);
        Ok(())
    }

    // Verify `write_object()` can be used with a source that **only**
    // implements `StreamingSource`.
    #[tokio::test]
    async fn test_upload_only_streaming_source() -> Result {
        struct Source;
        impl crate::streaming_source::StreamingSource for Source {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
                None
            }
        }

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let _ = client.write_object("projects/_/buckets/test-bucket", "test-object", Source);
        Ok(())
    }

    // Verify `write_object()` meets normal Send, Sync, requirements.
    #[tokio::test]
    async fn test_upload_is_send_and_static() -> Result {
        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        fn need_send<T: Send>(_val: &T) {}
        fn need_sync<T: Sync>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let upload = client.write_object("projects/_/buckets/test-bucket", "test-object", "");
        need_send(&upload);
        need_sync(&upload);
        need_static(&upload);

        let upload = client
            .write_object("projects/_/buckets/test-bucket", "test-object", "")
            .send_unbuffered();
        need_send(&upload);
        need_static(&upload);

        let upload = client
            .write_object("projects/_/buckets/test-bucket", "test-object", "")
            .send_buffered();
        need_send(&upload);
        need_static(&upload);

        Ok(())
    }

    // Verify every builder setter lands in the expected field of the request,
    // the common parameters, or the object resource.
    #[tokio::test]
    async fn write_object_metadata() -> Result {
        use crate::model::ObjectAccessControl;
        let inner = test_inner_client(test_builder()).await;
        let options = inner.options.clone();
        let stub = crate::storage::transport::Storage::new_test(inner);
        let key = KeyAes256::new(&[0x42; 32]).expect("hard-coded key is not an error");
        let mut builder =
            WriteObject::new(stub, "projects/_/buckets/bucket", "object", "", options)
                .set_if_generation_match(10)
                .set_if_generation_not_match(20)
                .set_if_metageneration_match(30)
                .set_if_metageneration_not_match(40)
                .set_predefined_acl("private")
                .set_acl([ObjectAccessControl::new()
                    .set_entity("allAuthenticatedUsers")
                    .set_role("READER")])
                .set_cache_control("public; max-age=7200")
                .set_content_disposition("inline")
                .set_content_encoding("gzip")
                .set_content_language("en")
                .set_content_type("text/plain")
                .set_contexts(ObjectContexts::new().set_custom([(
                    "context-key",
                    ObjectCustomContextPayload::new().set_value("context-value"),
                )]))
                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
                .set_event_based_hold(true)
                .set_key(key.clone())
                .set_metadata([("k0", "v0"), ("k1", "v1")])
                .set_retention(
                    crate::model::object::Retention::new()
                        .set_mode(crate::model::object::retention::Mode::Locked)
                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
                )
                .set_storage_class("ARCHIVE")
                .set_temporary_hold(true)
                .set_kms_key("test-key")
                .with_known_crc32c(crc32c::crc32c(b""))
                .with_known_md5_hash(md5::compute(b"").0);

        // Take the resource out so the spec can be compared without it.
        let resource = builder.request.spec.resource.take().unwrap();
        let builder = builder;
        assert_eq!(
            &builder.request.spec,
            &WriteObjectSpec::new()
                .set_if_generation_match(10)
                .set_if_generation_not_match(20)
                .set_if_metageneration_match(30)
                .set_if_metageneration_not_match(40)
                .set_predefined_acl("private")
        );

        assert_eq!(
            &builder.request.params,
            &Some(CommonObjectRequestParams::from(key))
        );

        assert_eq!(
            resource,
            Object::new()
                .set_name("object")
                .set_bucket("projects/_/buckets/bucket")
                .set_acl([ObjectAccessControl::new()
                    .set_entity("allAuthenticatedUsers")
                    .set_role("READER")])
                .set_cache_control("public; max-age=7200")
                .set_content_disposition("inline")
                .set_content_encoding("gzip")
                .set_content_language("en")
                .set_content_type("text/plain")
                .set_contexts(ObjectContexts::new().set_custom([(
                    "context-key",
                    ObjectCustomContextPayload::new().set_value("context-value"),
                )]))
                .set_checksums(
                    crate::model::ObjectChecksums::new()
                        .set_crc32c(crc32c::crc32c(b""))
                        .set_md5_hash(bytes::Bytes::from_iter(md5::compute(b"").0))
                )
                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
                .set_event_based_hold(true)
                .set_metadata([("k0", "v0"), ("k1", "v1")])
                .set_retention(
                    crate::model::object::Retention::new()
                        .set_mode("LOCKED")
                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?)
                )
                .set_storage_class("ARCHIVE")
                .set_temporary_hold(true)
                .set_kms_key("test-key")
        );

        Ok(())
    }

    // Verify client-level options are inherited and per-request overrides win.
    #[tokio::test]
    async fn upload_object_options() {
        let inner = test_inner_client(
            test_builder()
                .with_resumable_upload_threshold(123_usize)
                .with_resumable_upload_buffer_size(234_usize),
        )
        .await;
        let options = inner.options.clone();
        let stub = crate::storage::transport::Storage::new_test(inner);
        let request = WriteObject::new(
            stub.clone(),
            "projects/_/buckets/bucket",
            "object",
            "",
            options.clone(),
        );
        assert_eq!(request.options.resumable_upload_threshold(), 123);
        assert_eq!(request.options.resumable_upload_buffer_size(), 234);
        assert_eq!(request.options.user_agent, None);

        let user_agent = "quick_foxes_lazy_dogs/1.0.0";
        let request = WriteObject::new(stub, "projects/_/buckets/bucket", "object", "", options)
            .with_resumable_upload_threshold(345_usize)
            .with_resumable_upload_buffer_size(456_usize)
            .with_user_agent(user_agent);
        assert_eq!(request.options.resumable_upload_threshold(), 345);
        assert_eq!(request.options.resumable_upload_buffer_size(), 456);
        assert_eq!(request.options.user_agent.as_deref(), Some(user_agent));
    }

    const QUICK: &str = "the quick brown fox jumps over the lazy dog";
    const VEXING: &str = "how vexingly quick daft zebras jump";

    // Run `QUICK` through the given checksum engine and return the result.
    fn quick_checksum(mut engine: Checksum) -> ObjectChecksums {
        engine.update(0, &bytes::Bytes::from_static(QUICK.as_bytes()));
        engine.finalize()
    }

    // Drain a streaming source into a single byte vector.
    async fn collect<S: StreamingSource>(mut stream: S) -> anyhow::Result<Vec<u8>> {
        let mut collected = Vec::new();
        while let Some(b) = stream.next().await.transpose()? {
            collected.extend_from_slice(&b);
        }
        Ok(collected)
    }

    #[tokio::test]
    async fn checksum_default() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", QUICK)
            .precompute_checksums()
            .await?;
        // By default only CRC32C is computed, not MD5.
        let want = quick_checksum(Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: None,
        });
        assert_eq!(
            upload.request.spec.resource.and_then(|r| r.checksums),
            Some(want)
        );
        // The payload must be rewound and fully readable after precomputing.
        let collected = collect(upload.payload).await?;
        assert_eq!(collected, QUICK.as_bytes());
        Ok(())
    }

    #[tokio::test]
    async fn checksum_md5_and_crc32c() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .precompute_checksums()
            .await?;
        let want = quick_checksum(Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: Some(Md5::default()),
        });
        assert_eq!(
            upload.request.spec.resource.and_then(|r| r.checksums),
            Some(want)
        );
        Ok(())
    }

    #[tokio::test]
    async fn checksum_precomputed() -> Result {
        let mut engine = Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: Some(Md5::default()),
        };
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in
        // with_known_crc32c() and with_known_md5_hash() is respected.
        assert_eq!(
            upload.request.spec.resource.and_then(|r| r.checksums),
            Some(ck)
        );

        Ok(())
    }

    #[tokio::test]
    async fn checksum_crc32c_known_md5_computed() -> Result {
        let mut engine = Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: Some(Md5::default()),
        };
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .with_known_crc32c(ck.crc32c.unwrap())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = quick_checksum(Checksum {
            crc32c: None,
            md5_hash: Some(Md5::default()),
        })
        .set_crc32c(ck.crc32c.unwrap());
        assert_eq!(
            upload.request.spec.resource.and_then(|r| r.checksums),
            Some(want)
        );

        Ok(())
    }

    #[tokio::test]
    async fn checksum_mixed_then_precomputed() -> Result {
        let mut engine = Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: Some(Md5::default()),
        };
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", QUICK)
            .with_known_md5_hash(ck.md5_hash.clone())
            .with_known_crc32c(ck.crc32c.unwrap())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(
            upload.request.spec.resource.and_then(|r| r.checksums),
            Some(want)
        );

        Ok(())
    }

    #[tokio::test]
    async fn checksum_full_computed_then_md5_precomputed() -> Result {
        let mut engine = Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: Some(Md5::default()),
        };
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = quick_checksum(Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: None,
        })
        .set_md5_hash(ck.md5_hash.clone());
        assert_eq!(
            upload.request.spec.resource.and_then(|r| r.checksums),
            Some(want)
        );

        Ok(())
    }

    #[tokio::test]
    async fn checksum_known_crc32_then_computed_md5() -> Result {
        let mut engine = Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: Some(Md5::default()),
        };
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .compute_md5()
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(
            upload.request.spec.resource.and_then(|r| r.checksums),
            Some(want)
        );

        Ok(())
    }

    #[tokio::test]
    async fn checksum_known_crc32_then_known_md5() -> Result {
        let mut engine = Checksum {
            crc32c: Some(Crc32c::default()),
            md5_hash: Some(Md5::default()),
        };
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(
            upload.request.spec.resource.and_then(|r| r.checksums),
            Some(want)
        );

        Ok(())
    }

    #[tokio::test]
    async fn precompute_checksums_seek_error() -> Result {
        let mut source = MockSeekSource::new();
        source
            .expect_seek()
            .once()
            .returning(|_| Err(IoError::new(ErrorKind::Deadlock, "test-only")));

        let client = test_builder().build().await?;
        let err = client
            .write_object("my-bucket", "my-object", source)
            .precompute_checksums()
            .await
            .expect_err("seek() returns an error");
        assert!(err.is_serialization(), "{err:?}");
        assert!(
            err.source()
                .and_then(|e| e.downcast_ref::<IoError>())
                .is_some(),
            "{err:?}"
        );

        Ok(())
    }

    #[tokio::test]
    async fn precompute_checksums_next_error() -> Result {
        let mut source = MockSeekSource::new();
        source.expect_seek().returning(|_| Ok(()));
        let mut seq = mockall::Sequence::new();
        source
            .expect_next()
            .times(3)
            .in_sequence(&mut seq)
            .returning(|| Some(Ok(bytes::Bytes::new())));
        source
            .expect_next()
            .once()
            .in_sequence(&mut seq)
            .returning(|| Some(Err(IoError::new(ErrorKind::BrokenPipe, "test-only"))));

        let client = test_builder().build().await?;
        let err = client
            .write_object("my-bucket", "my-object", source)
            .precompute_checksums()
            .await
            .expect_err("next() returns an error");
        assert!(err.is_serialization(), "{err:?}");
        assert!(
            err.source()
                .and_then(|e| e.downcast_ref::<IoError>())
                .is_some(),
            "{err:?}"
        );

        Ok(())
    }

    #[tokio::test]
    async fn write_object_with_user_agent() -> Result {
        use http::header::USER_AGENT;

        let user_agent = "quicker_foxes_lazier_dogs/1.2.3";
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::headers(contains((USER_AGENT.as_str(), user_agent))),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
            .times(1)
            .respond_with(status_code(200).body("{}")),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let _ = client
            .write_object(
                "projects/_/buckets/test-bucket",
                "test-object",
                "hello world",
            )
            .with_user_agent(user_agent)
            .send_unbuffered()
            .await?;

        Ok(())
    }

    #[tokio::test]
    async fn write_object_with_quota_project() -> Result {
        const PROJECT_NAME: &str = "project_lazy_dog";
        let server = Server::run();
        let session = server.url("/upload/session/test-only-001");
        let path = session.path().to_string();

        // Both the session-creation request and the data upload must carry the
        // `x-goog-user-project` header.
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
                request::query(url_decoded(contains(("uploadType", "resumable")))),
            ])
            .times(1)
            .respond_with(status_code(200).append_header("location", session.to_string())),
        );

        server.expect(
            Expectation::matching(all_of![
                request::method_path("PUT", path),
                request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
            ])
            .times(1)
            .respond_with(
                status_code(200)
                    .append_header(http::header::CONTENT_TYPE, "application/json")
                    .body(serde_json::to_string(&crate::model::Object::new()).unwrap()),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let _ = client
            .write_object(
                "projects/_/buckets/test-bucket",
                "test-object",
                "hello world",
            )
            .with_quota_project(PROJECT_NAME)
            .with_resumable_upload_threshold(0_usize)
            .send_unbuffered()
            .await?;

        Ok(())
    }

    #[tokio::test]
    async fn debug() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .write_object("my-bucket", "my-object", "")
            .precompute_checksums()
            .await;

        let fmt = format!("{upload:?}");
        ["WriteObject", "inner", "spec", "options", "checksum"]
            .into_iter()
            .for_each(|text| {
                assert!(fmt.contains(text), "expected {text} in {fmt}");
            });
        Ok(())
    }
}