google_cloud_storage/storage/upload_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Contains the request builder for [upload_object()] and related types.
16//!
17//! [upload_object()]: crate::storage::client::Storage::upload_object()
18
19use super::client::*;
20use super::perform_upload::PerformUpload;
21use super::upload_source::{Seek, StreamingSource};
22use super::*;
23use crate::storage::checksum::{
24 ChecksumEngine,
25 details::{Crc32c, Known, KnownCrc32c, KnownMd5, Md5, update as checksum_update},
26};
27
28/// A request builder for object uploads.
29///
30/// # Example: hello world
31/// ```
32/// use google_cloud_storage::client::Storage;
33/// async fn sample(client: &Storage) -> anyhow::Result<()> {
34/// let response = client
35/// .upload_object("projects/_/buckets/my-bucket", "hello", "Hello World!")
36/// .send_unbuffered()
37/// .await?;
38/// println!("response details={response:?}");
39/// Ok(())
40/// }
41/// ```
42///
43/// # Example: upload a file
44/// ```
45/// use google_cloud_storage::client::Storage;
46/// async fn sample(client: &Storage) -> anyhow::Result<()> {
47/// let payload = tokio::fs::File::open("my-data").await?;
48/// let response = client
49/// .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
50/// .send_unbuffered()
51/// .await?;
52/// println!("response details={response:?}");
53/// Ok(())
54/// }
55/// ```
56///
57/// # Example: upload a custom data source
58/// ```
59/// use google_cloud_storage::{client::Storage, upload_source::StreamingSource};
60/// struct DataSource;
61/// impl StreamingSource for DataSource {
62/// type Error = std::io::Error;
63/// async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
64/// # panic!();
65/// }
66/// }
67///
68/// async fn sample(client: &Storage) -> anyhow::Result<()> {
69/// let response = client
70/// .upload_object("projects/_/buckets/my-bucket", "my-object", DataSource)
71/// .send_buffered()
72/// .await?;
73/// println!("response details={response:?}");
74/// Ok(())
75/// }
76/// ```
pub struct UploadObject<T, C = Crc32c> {
    // Shared client state (transport, credentials, endpoint).
    inner: std::sync::Arc<StorageInner>,
    // Destination resource and request preconditions for the write.
    spec: crate::model::WriteObjectSpec,
    // Customer-supplied encryption key parameters, if any.
    params: Option<crate::model::CommonObjectRequestParams>,
    // The data source for the upload.
    payload: Payload<T>,
    // Retry, backoff, and resumable-upload tuning options.
    options: super::request_options::RequestOptions,
    // Checksum engine; the `C` type parameter tracks (at compile time) which
    // checksums the library computes vs. which the caller supplied.
    checksum: C,
}
85
86impl<T, C> UploadObject<T, C> {
    /// Set a [request precondition] on the object generation to match.
    ///
    /// With this precondition the request fails if the current object
    /// generation does not match the provided value. A common value is `0`,
    /// which prevents uploads from succeeding if the object already exists.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_generation_match(0)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_generation_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_generation_match = Some(v.into());
        self
    }
114
    /// Set a [request precondition] on the object generation to **not** match.
    ///
    /// With this precondition the request fails if the current object
    /// generation matches the provided value.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_generation_not_match(0)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_generation_not_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_generation_not_match = Some(v.into());
        self
    }
141
142 /// Set a [request precondition] on the object meta generation.
143 ///
144 /// With this precondition the request fails if the current object metadata
145 /// generation does not match the provided value. This may be useful to
146 /// prevent changes when the metageneration is known.
147 ///
148 /// # Example
149 /// ```
150 /// # use google_cloud_storage::client::Storage;
151 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
152 /// let response = client
153 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
154 /// .with_if_metageneration_match(1234)
155 /// .send_buffered()
156 /// .await?;
157 /// println!("response details={response:?}");
158 /// # Ok(()) }
159 /// ```
160 ///
161 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
162 pub fn with_if_metageneration_match<V>(mut self, v: V) -> Self
163 where
164 V: Into<i64>,
165 {
166 self.spec.if_metageneration_match = Some(v.into());
167 self
168 }
169
170 /// Set a [request precondition] on the object meta-generation.
171 ///
172 /// With this precondition the request fails if the current object metadata
173 /// generation matches the provided value. This is rarely useful in uploads,
174 /// it is more commonly used on downloads to prevent downloads if the value
175 /// is already cached.
176 ///
177 /// # Example
178 /// ```
179 /// # use google_cloud_storage::client::Storage;
180 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
181 /// let response = client
182 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
183 /// .with_if_metageneration_not_match(1234)
184 /// .send_buffered()
185 /// .await?;
186 /// println!("response details={response:?}");
187 /// # Ok(()) }
188 /// ```
189 ///
190 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
191 pub fn with_if_metageneration_not_match<V>(mut self, v: V) -> Self
192 where
193 V: Into<i64>,
194 {
195 self.spec.if_metageneration_not_match = Some(v.into());
196 self
197 }
198
199 /// Sets the ACL for the new object.
200 ///
201 /// # Example
202 /// ```
203 /// # use google_cloud_storage::client::Storage;
204 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
205 /// # use google_cloud_storage::model::ObjectAccessControl;
206 /// let response = client
207 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
208 /// .with_acl([ObjectAccessControl::new().set_entity("allAuthenticatedUsers").set_role("READER")])
209 /// .send_buffered()
210 /// .await?;
211 /// println!("response details={response:?}");
212 /// # Ok(()) }
213 /// ```
214 pub fn with_acl<I, V>(mut self, v: I) -> Self
215 where
216 I: IntoIterator<Item = V>,
217 V: Into<crate::model::ObjectAccessControl>,
218 {
219 self.mut_resource().acl = v.into_iter().map(|a| a.into()).collect();
220 self
221 }
222
    /// Sets the [cache control] for the new object.
    ///
    /// This can be used to control caching in [public objects].
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     // Cache-Control directives are comma-separated (RFC 7234).
    ///     .with_cache_control("public, max-age=7200")
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [public objects]: https://cloud.google.com/storage/docs/access-control/making-data-public
    /// [cache control]: https://datatracker.ietf.org/doc/html/rfc7234#section-5.2
    pub fn with_cache_control<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().cache_control = v.into();
        self
    }
246
    /// Sets the [content disposition] for the new object.
    ///
    /// Google Cloud Storage can serve content directly to web browsers. This
    /// attribute sets the `Content-Disposition` header, which may change how
    /// the browser displays the contents (e.g. `inline` vs. `attachment`).
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_content_disposition("inline")
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [content disposition]: https://datatracker.ietf.org/doc/html/rfc6266
    pub fn with_content_disposition<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_disposition = v.into();
        self
    }
271
    /// Sets the [content encoding] for the object data.
    ///
    /// This can be used to upload compressed data and enable [transcoding] of
    /// the data during downloads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use flate2::write::GzEncoder;
    /// use std::io::Write;
    /// let mut e = GzEncoder::new(Vec::new(), flate2::Compression::default());
    /// e.write_all(b"hello world")?;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", bytes::Bytes::from_owner(e.finish()?))
    ///     .with_content_encoding("gzip")
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [transcoding]: https://cloud.google.com/storage/docs/transcoding
    /// [content encoding]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.2
    pub fn with_content_encoding<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_encoding = v.into();
        self
    }
300
301 /// Sets the [content language] for the new object.
302 ///
303 /// Google Cloud Storage can serve content directly to web browsers. This
304 /// attribute sets the `Content-Language` header, which may change how the
305 /// browser displays the contents.
306 ///
307 /// # Example
308 /// ```
309 /// # use google_cloud_storage::client::Storage;
310 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
311 /// let response = client
312 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
313 /// .with_content_language("en")
314 /// .send_buffered()
315 /// .await?;
316 /// println!("response details={response:?}");
317 /// # Ok(()) }
318 /// ```
319 ///
320 /// [content language]: https://cloud.google.com/storage/docs/metadata#content-language
321 pub fn with_content_language<V: Into<String>>(mut self, v: V) -> Self {
322 self.mut_resource().content_language = v.into();
323 self
324 }
325
326 /// Sets the [content type] for the new object.
327 ///
328 /// Google Cloud Storage can serve content directly to web browsers. This
329 /// attribute sets the `Content-Type` header, which may change how the
330 /// browser interprets the contents.
331 ///
332 /// # Example
333 /// ```
334 /// # use google_cloud_storage::client::Storage;
335 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
336 /// let response = client
337 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
338 /// .with_content_type("text/plain")
339 /// .send_buffered()
340 /// .await?;
341 /// println!("response details={response:?}");
342 /// # Ok(()) }
343 /// ```
344 ///
345 /// [content type]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.5
346 pub fn with_content_type<V: Into<String>>(mut self, v: V) -> Self {
347 self.mut_resource().content_type = v.into();
348 self
349 }
350
    /// Sets the [custom time] for the new object.
    ///
    /// This field is typically set in order to use the [DaysSinceCustomTime]
    /// condition in Object Lifecycle Management.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_custom_time(time)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [DaysSinceCustomTime]: https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
    /// [custom time]: https://cloud.google.com/storage/docs/metadata#custom-time
    pub fn with_custom_time<V: Into<wkt::Timestamp>>(mut self, v: V) -> Self {
        self.mut_resource().custom_time = Some(v.into());
        self
    }
376
377 /// Sets the [event based hold] flag for the new object.
378 ///
379 /// This field is typically set in order to prevent objects from being
380 /// deleted or modified.
381 ///
382 /// # Example
383 /// ```
384 /// # use google_cloud_storage::client::Storage;
385 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
386 /// let response = client
387 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
388 /// .with_event_based_hold(true)
389 /// .send_buffered()
390 /// .await?;
391 /// println!("response details={response:?}");
392 /// # Ok(()) }
393 /// ```
394 ///
395 /// [event based hold]: https://cloud.google.com/storage/docs/object-holds
396 pub fn with_event_based_hold<V: Into<bool>>(mut self, v: V) -> Self {
397 self.mut_resource().event_based_hold = Some(v.into());
398 self
399 }
400
    /// Sets the [custom metadata] for the new object.
    ///
    /// This field is typically set to annotate the object with
    /// application-specific metadata. Note that this replaces any metadata
    /// previously set on this builder.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_metadata([("test-only", "true"), ("environment", "qa")])
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [custom metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
    pub fn with_metadata<I, K, V>(mut self, i: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.mut_resource().metadata = i.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
        self
    }
430
    /// Sets the [retention configuration] for the new object.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// # use google_cloud_storage::model::object::{Retention, retention};
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_retention(
    ///         Retention::new()
    ///             .set_mode(retention::Mode::Locked)
    ///             .set_retain_until_time(wkt::Timestamp::try_from("2035-01-01T00:00:00Z")?))
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [retention configuration]: https://cloud.google.com/storage/docs/metadata#retention-config
    pub fn with_retention<V>(mut self, v: V) -> Self
    where
        V: Into<crate::model::object::Retention>,
    {
        self.mut_resource().retention = Some(v.into());
        self
    }
458
459 /// Sets the [storage class] for the new object.
460 ///
461 /// # Example
462 /// ```
463 /// # use google_cloud_storage::client::Storage;
464 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
465 /// let response = client
466 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
467 /// .with_storage_class("ARCHIVE")
468 /// .send_buffered()
469 /// .await?;
470 /// println!("response details={response:?}");
471 /// # Ok(()) }
472 /// ```
473 ///
474 /// [storage class]: https://cloud.google.com/storage/docs/storage-classes
475 pub fn with_storage_class<V>(mut self, v: V) -> Self
476 where
477 V: Into<String>,
478 {
479 self.mut_resource().storage_class = v.into();
480 self
481 }
482
    /// Sets the [temporary hold] flag for the new object.
    ///
    /// This field is typically set in order to prevent objects from being
    /// deleted or modified.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_temporary_hold(true)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [temporary hold]: https://cloud.google.com/storage/docs/object-holds
    pub fn with_temporary_hold<V: Into<bool>>(mut self, v: V) -> Self {
        self.mut_resource().temporary_hold = v.into();
        self
    }
507
508 /// Sets the resource name of the [Customer-managed encryption key] for this
509 /// object.
510 ///
511 /// The service imposes a number of restrictions on the keys used to encrypt
512 /// Google Cloud Storage objects. Read the documentation in full before
513 /// trying to use customer-managed encryption keys. In particular, verify
514 /// the service has the necessary permissions, and the key is in a
515 /// compatible location.
516 ///
517 /// # Example
518 /// ```
519 /// # use google_cloud_storage::client::Storage;
520 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
521 /// let response = client
522 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
523 /// .with_kms_key("projects/test-project/locations/us-central1/keyRings/test-ring/cryptoKeys/test-key")
524 /// .send_buffered()
525 /// .await?;
526 /// println!("response details={response:?}");
527 /// # Ok(()) }
528 /// ```
529 ///
530 /// [Customer-managed encryption key]: https://cloud.google.com/storage/docs/encryption/customer-managed-keys
531 pub fn with_kms_key<V>(mut self, v: V) -> Self
532 where
533 V: Into<String>,
534 {
535 self.mut_resource().kms_key = v.into();
536 self
537 }
538
539 /// Configure this object to use one of the [predefined ACLs].
540 ///
541 /// # Example
542 /// ```
543 /// # use google_cloud_storage::client::Storage;
544 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
545 /// let response = client
546 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
547 /// .with_predefined_acl("private")
548 /// .send_buffered()
549 /// .await?;
550 /// println!("response details={response:?}");
551 /// # Ok(()) }
552 /// ```
553 ///
554 /// [predefined ACLs]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
555 pub fn with_predefined_acl<V>(mut self, v: V) -> Self
556 where
557 V: Into<String>,
558 {
559 self.spec.predefined_acl = v.into();
560 self
561 }
562
    /// The encryption key used with the Customer-Supplied Encryption Keys
    /// feature. In raw bytes format (not base64-encoded).
    ///
    /// Note: this replaces any previously configured common request
    /// parameters on this builder.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// # use google_cloud_storage::builder::storage::KeyAes256;
    /// let key: &[u8] = &[97; 32];
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_key(KeyAes256::new(key)?)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_key(mut self, v: KeyAes256) -> Self {
        self.params = Some(v.into());
        self
    }
584
    /// Configure the idempotency for this upload.
    ///
    /// By default, the client library treats single-shot uploads without
    /// preconditions, as non-idempotent. If the destination bucket is
    /// configured with [object versioning] then the operation may succeed
    /// multiple times with observable side-effects. With object versioning and
    /// a [lifecycle] policy limiting the number of versions, uploading the same
    /// data multiple times may result in data loss.
    ///
    /// The client library cannot efficiently determine if these conditions
    /// apply to your upload. If they do, or your application can tolerate
    /// multiple versions of the same data for other reasons, consider using
    /// `with_idempotency(true)`.
    ///
    /// The client library treats resumable uploads as idempotent, regardless of
    /// the value in this option. Such uploads can succeed at most once.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_idempotency(true)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [lifecycle]: https://cloud.google.com/storage/docs/lifecycle
    /// [object versioning]: https://cloud.google.com/storage/docs/object-versioning
    pub fn with_idempotency(mut self, v: bool) -> Self {
        self.options.idempotency = Some(v);
        self
    }
624
    /// The retry policy used for this request.
    ///
    /// This overrides, for this request only, any retry policy configured on
    /// the client.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # use google_cloud_storage::retry_policy::RecommendedPolicy;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use std::time::Duration;
    /// use gax::retry_policy::RetryPolicyExt;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_retry_policy(RecommendedPolicy
    ///         .with_attempt_limit(5)
    ///         .with_time_limit(Duration::from_secs(10)),
    ///     )
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
        self.options.retry_policy = v.into().into();
        self
    }
649
650 /// The backoff policy used for this request.
651 ///
652 /// # Example
653 /// ```
654 /// # use google_cloud_storage::client::Storage;
655 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
656 /// use std::time::Duration;
657 /// use gax::exponential_backoff::ExponentialBackoff;
658 /// let response = client
659 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
660 /// .with_backoff_policy(ExponentialBackoff::default())
661 /// .send_buffered()
662 /// .await?;
663 /// println!("response details={response:?}");
664 /// # Ok(()) }
665 /// ```
666 pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
667 mut self,
668 v: V,
669 ) -> Self {
670 self.options.backoff_policy = v.into().into();
671 self
672 }
673
    /// The retry throttler used for this request.
    ///
    /// Most of the time you want to use the same throttler for all the requests
    /// in a client, and even the same throttler for many clients. Rarely it
    /// may be necessary to use a custom throttler for some subset of the
    /// requests.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_retry_throttler(adhoc_throttler())
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
    /// # panic!();
    /// }
    /// # Ok(()) }
    /// ```
    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.retry_throttler = v.into().into();
        self
    }
703
    /// Sets the payload size threshold to switch from single-shot to resumable uploads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// The client library can perform uploads using [single-shot] or
    /// [resumable] uploads. For small objects, single-shot uploads offer better
    /// performance, as they require a single HTTP transfer. For larger objects,
    /// the additional request latency is not significant, and resumable uploads
    /// offer better recovery on errors.
    ///
    /// The library automatically selects resumable uploads when the payload is
    /// equal to or larger than this option. For smaller uploads the client
    /// library uses single-shot uploads.
    ///
    /// The exact threshold depends on where the application is deployed and
    /// destination bucket location with respect to where the application is
    /// running. The library defaults should work well in most cases, but some
    /// applications may benefit from fine-tuning.
    ///
    /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
    /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
    pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
        self.options.resumable_upload_threshold = v.into();
        self
    }
740
    /// Changes the buffer size for some resumable uploads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// When performing [resumable uploads] from sources without [Seek] the
    /// client library needs to buffer data in memory until it is persisted by
    /// the service. Otherwise the data would be lost if the upload fails.
    /// Applications may want to tune this buffer size:
    ///
    /// - Use smaller buffer sizes to support more concurrent uploads in the
    ///   same application.
    /// - Use larger buffer sizes for better throughput. Sending many small
    ///   buffers stalls the upload until the client receives a successful
    ///   response from the service.
    ///
    /// Keep in mind that there are diminishing returns on using larger buffers.
    ///
    /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
    /// [Seek]: crate::upload_source::Seek
    pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
        self.options.resumable_upload_buffer_size = v.into();
        self
    }
775
776 fn mut_resource(&mut self) -> &mut crate::model::Object {
777 self.spec
778 .resource
779 .as_mut()
780 .expect("resource field initialized in `new()`")
781 }
782
    // Consumes the builder and assembles the `PerformUpload` state machine
    // that executes the upload (single-shot or resumable).
    pub(crate) fn build(self) -> PerformUpload<C, Payload<T>> {
        PerformUpload::new(
            self.checksum,
            self.payload,
            self.inner,
            self.spec,
            self.params,
            self.options,
        )
    }
793
794 fn switch_checksum<F, U>(self, new: F) -> UploadObject<T, U>
795 where
796 F: FnOnce(C) -> U,
797 {
798 UploadObject {
799 payload: self.payload,
800 inner: self.inner,
801 spec: self.spec,
802 params: self.params,
803 options: self.options,
804 checksum: new(self.checksum),
805 }
806 }
807
808 fn set_crc32c<V: Into<u32>>(mut self, v: V) -> Self {
809 let checksum = self.mut_resource().checksums.get_or_insert_default();
810 checksum.crc32c = Some(v.into());
811 self
812 }
813
    /// Records a caller-provided MD5 hash on the object resource.
    ///
    /// `i` yields the raw (not base64-encoded) bytes of the MD5 digest.
    ///
    /// NOTE(review): this helper is `pub`, unlike its sibling `set_crc32c`,
    /// and calling it directly bypasses the `C` type-state transition done by
    /// `with_known_md5_hash`. Confirm whether `pub` is intended; it looks
    /// like it should be private (narrowing it now would break the API).
    pub fn set_md5_hash<I, V>(mut self, i: I) -> Self
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let checksum = self.mut_resource().checksums.get_or_insert_default();
        checksum.md5_hash = i.into_iter().map(|v| v.into()).collect();
        self
    }
823}
824
825impl<T> UploadObject<T, Crc32c> {
    /// Provide a precomputed value for the CRC32C checksum.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use crc32c::crc32c;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_known_crc32c(crc32c(b"hello world"))
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// In some applications, the payload's CRC32C checksum is already known.
    /// For example, the application may be downloading the data from another
    /// blob storage system.
    ///
    /// In such cases, it is safer to pass the known CRC32C of the payload to
    /// [Cloud Storage], and more efficient to skip the computation in the
    /// client library.
    ///
    /// Note that once you provide a CRC32C value to this builder you cannot
    /// use [compute_md5()] to also have the library compute the checksums.
    ///
    /// [Cloud Storage]: https://cloud.google.com/storage
    /// [compute_md5()]: UploadObject::compute_md5
    pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, KnownCrc32c> {
        let this = self.switch_checksum(|_| KnownCrc32c);
        this.set_crc32c(v)
    }
858
    /// Provide a precomputed value for the MD5 hash.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use md5::compute;
    /// let hash = md5::compute(b"hello world");
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_known_md5_hash(bytes::Bytes::from_owner(hash.0))
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// In some applications, the payload's MD5 hash is already known. For
    /// example, the application may be downloading the data from another blob
    /// storage system.
    ///
    /// In such cases, it is safer to pass the known MD5 of the payload to
    /// [Cloud Storage], and more efficient to skip the computation in the
    /// client library.
    ///
    /// Note that once you provide a MD5 value to this builder you cannot
    /// use [compute_md5()] to also have the library compute the checksums.
    ///
    /// [Cloud Storage]: https://cloud.google.com/storage
    /// [compute_md5()]: UploadObject::compute_md5
    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Crc32c<KnownMd5>>
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let this = self.switch_checksum(|_| Crc32c::from_inner(KnownMd5));
        this.set_md5_hash(i)
    }
896
897 /// Enables computation of MD5 hashes.
898 ///
899 /// # Example
900 /// ```
901 /// # use google_cloud_storage::client::Storage;
902 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
903 /// let payload = tokio::fs::File::open("my-data").await?;
904 /// let response = client
905 /// .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
906 /// .compute_md5()
907 /// .send_buffered()
908 /// .await?;
909 /// println!("response details={response:?}");
910 /// # Ok(()) }
911 /// ```
912 ///
913 /// See [precompute_checksums][UploadObject::precompute_checksums] for more
914 /// details on how checksums are used by the client library and their
915 /// limitations.
916 pub fn compute_md5(self) -> UploadObject<T, Md5<Crc32c>> {
917 self.switch_checksum(Md5::from_inner)
918 }
919}
920
921impl<T> UploadObject<T, Crc32c<KnownMd5>> {
922 /// See [UploadObject<T, Crc32c>::with_known_crc32c].
923 pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, Known> {
924 let this = self.switch_checksum(|_| Known);
925 this.set_crc32c(v)
926 }
927}
928
929impl<T> UploadObject<T, Md5<Crc32c>> {
930 /// See [UploadObject<T, Crc32c>::with_known_crc32c].
931 pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, Md5<KnownCrc32c>> {
932 let this = self.switch_checksum(|_| Md5::from_inner(KnownCrc32c));
933 this.set_crc32c(v)
934 }
935
936 /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
937 pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Crc32c<KnownMd5>>
938 where
939 I: IntoIterator<Item = V>,
940 V: Into<u8>,
941 {
942 let this = self.switch_checksum(|_| Crc32c::from_inner(KnownMd5));
943 this.set_md5_hash(i)
944 }
945}
946
947impl<T> UploadObject<T, Md5<KnownCrc32c>> {
948 /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
949 pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Known>
950 where
951 I: IntoIterator<Item = V>,
952 V: Into<u8>,
953 {
954 let this = self.switch_checksum(|_| Known);
955 this.set_md5_hash(i)
956 }
957}
958
959impl<T> UploadObject<T, KnownCrc32c> {
960 /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
961 pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Known>
962 where
963 I: IntoIterator<Item = V>,
964 V: Into<u8>,
965 {
966 let this = self.switch_checksum(|_| Known);
967 this.set_md5_hash(i)
968 }
969
970 /// See [UploadObject<T, Crc32c>::compute_md5()].
971 pub fn compute_md5(self) -> UploadObject<T, Md5<KnownCrc32c>> {
972 self.switch_checksum(Md5::from_inner)
973 }
974}
975
976impl<T> UploadObject<T> {
977 pub(crate) fn new<B, O, P>(
978 inner: std::sync::Arc<StorageInner>,
979 bucket: B,
980 object: O,
981 payload: P,
982 ) -> Self
983 where
984 B: Into<String>,
985 O: Into<String>,
986 P: Into<Payload<T>>,
987 {
988 let options = inner.options.clone();
989 let resource = crate::model::Object::new()
990 .set_bucket(bucket)
991 .set_name(object);
992 UploadObject {
993 inner,
994 spec: crate::model::WriteObjectSpec::new().set_resource(resource),
995 params: None,
996 payload: payload.into(),
997 options,
998 checksum: Crc32c::default(),
999 }
1000 }
1001}
1002
1003impl<T, C> UploadObject<T, C>
1004where
1005 C: ChecksumEngine + Send + Sync + 'static,
1006 T: StreamingSource + Seek + Send + Sync + 'static,
1007 <T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
1008 <T as Seek>::Error: std::error::Error + Send + Sync + 'static,
1009{
1010 /// A simple upload from a buffer.
1011 ///
1012 /// # Example
1013 /// ```
1014 /// # use google_cloud_storage::client::Storage;
1015 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1016 /// let response = client
1017 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
1018 /// .send_unbuffered()
1019 /// .await?;
1020 /// println!("response details={response:?}");
1021 /// # Ok(()) }
1022 /// ```
1023 pub async fn send_unbuffered(self) -> Result<Object> {
1024 self.build().send_unbuffered().await
1025 }
1026
1027 /// Precompute the payload checksums before uploading the data.
1028 ///
1029 /// If the checksums are known when the upload starts, the client library
1030 /// can include the checksums with the upload request, and the service can
1031 /// reject the upload if the payload and the checksums do not match.
1032 ///
1033 /// # Example
1034 /// ```
1035 /// # use google_cloud_storage::client::Storage;
1036 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1037 /// let payload = tokio::fs::File::open("my-data").await?;
1038 /// let response = client
1039 /// .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
1040 /// .precompute_checksums()
1041 /// .await?
1042 /// .send_unbuffered()
1043 /// .await?;
1044 /// println!("response details={response:?}");
1045 /// # Ok(()) }
1046 /// ```
1047 ///
1048 /// Precomputing the checksums can be expensive if the data source is slow
1049 /// to read. Therefore, the client library does not precompute the checksums
1050 /// by default. The client library compares the checksums computed by the
1051 /// service against its own checksums. If they do not match, the client
1052 /// library returns an error. However, the service has already created the
1053 /// object with the (likely incorrect) data.
1054 ///
1055 /// The client library currently uses the [JSON API], it is not possible to
1056 /// send the checksums at the end of the upload with this API.
1057 ///
1058 /// [JSON API]: https://cloud.google.com/storage/docs/json_api
1059 pub async fn precompute_checksums(mut self) -> Result<UploadObject<T, Known>>
1060 where
1061 C: ChecksumEngine + Send + Sync + 'static,
1062 {
1063 let mut offset = 0_u64;
1064 self.payload.seek(offset).await.map_err(Error::ser)?;
1065 while let Some(n) = self.payload.next().await.transpose().map_err(Error::ser)? {
1066 self.checksum.update(offset, &n);
1067 offset += n.len() as u64;
1068 }
1069 self.payload.seek(0_u64).await.map_err(Error::ser)?;
1070 let computed = self.checksum.finalize();
1071 let current = self.mut_resource().checksums.get_or_insert_default();
1072 checksum_update(current, computed);
1073 Ok(self.switch_checksum(|_| Known))
1074 }
1075}
1076
1077impl<T, C> UploadObject<T, C>
1078where
1079 C: ChecksumEngine + Send + Sync + 'static,
1080 T: StreamingSource + Send + Sync + 'static,
1081 T::Error: std::error::Error + Send + Sync + 'static,
1082{
1083 /// Upload an object from a streaming source without rewinds.
1084 ///
1085 /// # Example
1086 /// ```
1087 /// # use google_cloud_storage::client::Storage;
1088 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1089 /// let response = client
1090 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
1091 /// .send_buffered()
1092 /// .await?;
1093 /// println!("response details={response:?}");
1094 /// # Ok(()) }
1095 /// ```
1096 pub async fn send_buffered(self) -> crate::Result<Object> {
1097 self.build().send().await
1098 }
1099}
1100
1101// We need `Debug` to use `expect_err()` in `Result<UploadObject, ...>`.
1102impl<T, C> std::fmt::Debug for UploadObject<T, C>
1103where
1104 C: std::fmt::Debug,
1105{
1106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1107 f.debug_struct("UploadObject")
1108 .field("inner", &self.inner)
1109 .field("spec", &self.spec)
1110 .field("params", &self.params)
1111 // skip payload, as it is not `Debug`
1112 .field("options", &self.options)
1113 .field("checksum", &self.checksum)
1114 .finish()
1115 }
1116}
1117
#[cfg(test)]
mod tests {
    use super::client::tests::{test_builder, test_inner_client};
    use super::*;
    use crate::model::{ObjectChecksums, WriteObjectSpec};
    use crate::upload_source::tests::MockSeekSource;
    use std::error::Error as _;
    use std::io::{Error as IoError, ErrorKind};

    type Result = anyhow::Result<()>;

    // Verify `upload_object()` can be used with a source that implements `StreamingSource` **and** `Seek`
    #[tokio::test]
    async fn test_upload_streaming_source_and_seek() -> Result {
        struct Source;
        impl crate::upload_source::StreamingSource for Source {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
                None
            }
        }
        impl crate::upload_source::Seek for Source {
            type Error = std::io::Error;
            async fn seek(&mut self, _offset: u64) -> std::result::Result<(), Self::Error> {
                Ok(())
            }
        }

        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let _ = client.upload_object("projects/_/buckets/test-bucket", "test-object", Source);
        Ok(())
    }

    // Verify `upload_object()` can be used with a source that **only** implements `StreamingSource`.
    #[tokio::test]
    async fn test_upload_only_streaming_source() -> Result {
        struct Source;
        impl crate::upload_source::StreamingSource for Source {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
                None
            }
        }

        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let _ = client.upload_object("projects/_/buckets/test-bucket", "test-object", Source);
        Ok(())
    }

    // Verify `upload_object()` meets normal Send, Sync, requirements.
    #[tokio::test]
    async fn test_upload_is_send_and_static() -> Result {
        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;

        fn need_send<T: Send>(_val: &T) {}
        fn need_sync<T: Sync>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let upload = client.upload_object("projects/_/buckets/test-bucket", "test-object", "");
        need_send(&upload);
        need_sync(&upload);
        need_static(&upload);

        let upload = client
            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
            .send_unbuffered();
        need_send(&upload);
        need_static(&upload);

        let upload = client
            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
            .send_buffered();
        need_send(&upload);
        need_static(&upload);

        Ok(())
    }

    // Verify every builder setter lands in either the write spec or the object resource.
    #[test]
    fn upload_object_unbuffered_metadata() -> Result {
        use crate::model::ObjectAccessControl;
        let inner = test_inner_client(test_builder());
        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_if_generation_match(10)
            .with_if_generation_not_match(20)
            .with_if_metageneration_match(30)
            .with_if_metageneration_not_match(40)
            .with_predefined_acl("private")
            .with_acl([ObjectAccessControl::new()
                .set_entity("allAuthenticatedUsers")
                .set_role("READER")])
            .with_cache_control("public; max-age=7200")
            .with_content_disposition("inline")
            .with_content_encoding("gzip")
            .with_content_language("en")
            .with_content_type("text/plain")
            .with_known_crc32c(crc32c::crc32c(b""))
            .with_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
            .with_event_based_hold(true)
            .with_known_md5_hash(md5::compute(b"").0)
            .with_metadata([("k0", "v0"), ("k1", "v1")])
            .with_retention(
                crate::model::object::Retention::new()
                    .set_mode(crate::model::object::retention::Mode::Locked)
                    .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
            )
            .with_storage_class("ARCHIVE")
            .with_temporary_hold(true)
            .with_kms_key("test-key");

        let resource = request.spec.resource.take().unwrap();
        let request = request;
        assert_eq!(
            &request.spec,
            &WriteObjectSpec::new()
                .set_if_generation_match(10)
                .set_if_generation_not_match(20)
                .set_if_metageneration_match(30)
                .set_if_metageneration_not_match(40)
                .set_predefined_acl("private")
        );

        assert_eq!(
            resource,
            Object::new()
                .set_name("object")
                .set_bucket("projects/_/buckets/bucket")
                .set_acl([ObjectAccessControl::new()
                    .set_entity("allAuthenticatedUsers")
                    .set_role("READER")])
                .set_cache_control("public; max-age=7200")
                .set_content_disposition("inline")
                .set_content_encoding("gzip")
                .set_content_language("en")
                .set_content_type("text/plain")
                .set_checksums(
                    crate::model::ObjectChecksums::new()
                        .set_crc32c(crc32c::crc32c(b""))
                        .set_md5_hash(bytes::Bytes::from_iter(md5::compute(b"").0))
                )
                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
                .set_event_based_hold(true)
                .set_metadata([("k0", "v0"), ("k1", "v1")])
                .set_retention(
                    crate::model::object::Retention::new()
                        .set_mode("LOCKED")
                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?)
                )
                .set_storage_class("ARCHIVE")
                .set_temporary_hold(true)
                .set_kms_key("test-key")
        );

        Ok(())
    }

    // Verify client-level options are captured and can be overridden per request.
    #[test]
    fn upload_object_options() {
        let inner = test_inner_client(
            test_builder()
                .with_resumable_upload_threshold(123_usize)
                .with_resumable_upload_buffer_size(234_usize),
        );
        let request = UploadObject::new(inner.clone(), "projects/_/buckets/bucket", "object", "");
        assert_eq!(request.options.resumable_upload_threshold, 123);
        assert_eq!(request.options.resumable_upload_buffer_size, 234);

        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_resumable_upload_threshold(345_usize)
            .with_resumable_upload_buffer_size(456_usize);
        assert_eq!(request.options.resumable_upload_threshold, 345);
        assert_eq!(request.options.resumable_upload_buffer_size, 456);
    }

    const QUICK: &str = "the quick brown fox jumps over the lazy dog";
    const VEXING: &str = "how vexingly quick daft zebras jump";

    // Helper: run QUICK through the given engine and return the checksums.
    fn quick_checksum<E: ChecksumEngine>(mut engine: E) -> ObjectChecksums {
        engine.update(0, &bytes::Bytes::from_static(QUICK.as_bytes()));
        engine.finalize()
    }

    // Helper: drain a streaming source into a byte vector.
    async fn collect<S: StreamingSource>(mut stream: S) -> anyhow::Result<Vec<u8>> {
        let mut collected = Vec::new();
        while let Some(b) = stream.next().await.transpose()? {
            collected.extend_from_slice(&b);
        }
        Ok(collected)
    }

    #[tokio::test]
    async fn checksum_default() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .precompute_checksums()
            .await?;
        let want = quick_checksum(Crc32c::default());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
        // The payload must be rewound so the upload reads it from the start.
        let collected = collect(upload.payload).await?;
        assert_eq!(collected, QUICK.as_bytes());
        Ok(())
    }

    #[tokio::test]
    async fn checksum_md5_and_crc32c() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .precompute_checksums()
            .await?;
        let want = quick_checksum(Crc32c::from_inner(Md5::default()));
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
        Ok(())
    }

    #[tokio::test]
    async fn checksum_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_crc32c()
        // and with_md5() is respected.
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(ck));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_crc32c_known_md5_computed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .with_known_crc32c(ck.crc32c.unwrap())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = quick_checksum(Md5::default()).set_crc32c(ck.crc32c.unwrap());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_mixed_then_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_md5_hash(ck.md5_hash.clone())
            .with_known_crc32c(ck.crc32c.unwrap())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_full_computed_then_md5_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = quick_checksum(Crc32c::default()).set_md5_hash(ck.md5_hash.clone());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_known_crc32_then_computed_md5() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .compute_md5()
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_known_crc32_then_known_md5() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn precompute_checksums_seek_error() -> Result {
        let mut source = MockSeekSource::new();
        source
            .expect_seek()
            .once()
            .returning(|_| Err(IoError::new(ErrorKind::Deadlock, "test-only")));

        let client = test_builder().build().await?;
        let err = client
            .upload_object("my-bucket", "my-object", source)
            .precompute_checksums()
            .await
            .expect_err("seek() returns an error");
        assert!(err.is_serialization(), "{err:?}");
        assert!(
            err.source()
                .and_then(|e| e.downcast_ref::<IoError>())
                .is_some(),
            "{err:?}"
        );

        Ok(())
    }

    #[tokio::test]
    async fn precompute_checksums_next_error() -> Result {
        let mut source = MockSeekSource::new();
        source.expect_seek().returning(|_| Ok(()));
        let mut seq = mockall::Sequence::new();
        source
            .expect_next()
            .times(3)
            .in_sequence(&mut seq)
            .returning(|| Some(Ok(bytes::Bytes::new())));
        source
            .expect_next()
            .once()
            .in_sequence(&mut seq)
            .returning(|| Some(Err(IoError::new(ErrorKind::BrokenPipe, "test-only"))));

        let client = test_builder().build().await?;
        let err = client
            .upload_object("my-bucket", "my-object", source)
            .precompute_checksums()
            .await
            // Fixed: this test arranges for `next()` (not `seek()`) to fail.
            .expect_err("next() returns an error");
        assert!(err.is_serialization(), "{err:?}");
        assert!(
            err.source()
                .and_then(|e| e.downcast_ref::<IoError>())
                .is_some(),
            "{err:?}"
        );

        Ok(())
    }

    #[tokio::test]
    async fn debug() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", "")
            .precompute_checksums()
            .await;

        let fmt = format!("{upload:?}");
        ["UploadObject", "inner", "spec", "options", "checksum"]
            .into_iter()
            .for_each(|text| {
                assert!(fmt.contains(text), "expected {text} in {fmt}");
            });
        Ok(())
    }
}