google_cloud_storage/storage/upload_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Contains the request builder for [upload_object()] and related types.
16//!
17//! [upload_object()]: crate::storage::client::Storage::upload_object()
18
19use super::client::*;
20use super::perform_upload::PerformUpload;
21use super::upload_source::{Seek, StreamingSource};
22use super::*;
23use crate::storage::checksum::{ChecksumEngine, Crc32c, Known, KnownCrc32c, KnownMd5, Md5};
24
25/// A request builder for object uploads.
26///
27/// # Example: hello world
28/// ```
29/// use google_cloud_storage::client::Storage;
30/// async fn sample(client: &Storage) -> anyhow::Result<()> {
31/// let response = client
32/// .upload_object("projects/_/buckets/my-bucket", "hello", "Hello World!")
33/// .send_unbuffered()
34/// .await?;
35/// println!("response details={response:?}");
36/// Ok(())
37/// }
38/// ```
39///
40/// # Example: upload a file
41/// ```
42/// use google_cloud_storage::client::Storage;
43/// async fn sample(client: &Storage) -> anyhow::Result<()> {
44/// let payload = tokio::fs::File::open("my-data").await?;
45/// let response = client
46/// .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
47/// .send_unbuffered()
48/// .await?;
49/// println!("response details={response:?}");
50/// Ok(())
51/// }
52/// ```
53///
54/// # Example: upload a custom data source
55/// ```
56/// use google_cloud_storage::{client::Storage, upload_source::StreamingSource};
57/// struct DataSource;
58/// impl StreamingSource for DataSource {
59/// type Error = std::io::Error;
60/// async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
61/// # panic!();
62/// }
63/// }
64///
65/// async fn sample(client: &Storage) -> anyhow::Result<()> {
66/// let response = client
67/// .upload_object("projects/_/buckets/my-bucket", "my-object", DataSource)
68/// .send()
69/// .await?;
70/// println!("response details={response:?}");
71/// Ok(())
72/// }
73/// ```
pub struct UploadObject<T, C = Crc32c> {
    // Shared client internals (transport, credentials, endpoint).
    inner: std::sync::Arc<StorageInner>,
    // Describes the destination object and any request preconditions. Its
    // `resource` field is expected to be initialized by the constructor;
    // see `mut_resource()`.
    spec: crate::model::WriteObjectSpec,
    // Optional parameters shared by object requests, e.g. the
    // customer-supplied encryption key set via `with_key()`.
    params: Option<crate::model::CommonObjectRequestParams>,
    // The data source for the upload.
    payload: Payload<T>,
    // Per-request retry, backoff, idempotency, and resumable-upload options.
    options: super::request_options::RequestOptions,
    // The checksum engine. The `C` type parameter tracks (at compile time)
    // which checksums the library computes vs. which the caller supplied;
    // see `switch_checksum()` and the `with_known_*` methods.
    checksum: C,
}
82
83impl<T, C> UploadObject<T, C> {
    /// Set a [request precondition] on the object generation to match.
    ///
    /// With this precondition the request fails unless the current object
    /// generation matches the provided value. A common value is `0`, which
    /// prevents uploads from succeeding if the object already exists.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_generation_match(0)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_generation_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_generation_match = Some(v.into());
        self
    }
111
    /// Set a [request precondition] on the object generation to not match.
    ///
    /// With this precondition the request fails if the current object
    /// generation matches the provided value.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_if_generation_not_match(0)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
    pub fn with_if_generation_not_match<V>(mut self, v: V) -> Self
    where
        V: Into<i64>,
    {
        self.spec.if_generation_not_match = Some(v.into());
        self
    }
138
139 /// Set a [request precondition] on the object meta generation.
140 ///
141 /// With this precondition the request fails if the current object metadata
142 /// generation does not match the provided value. This may be useful to
143 /// prevent changes when the metageneration is known.
144 ///
145 /// # Example
146 /// ```
147 /// # use google_cloud_storage::client::Storage;
148 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
149 /// let response = client
150 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
151 /// .with_if_metageneration_match(1234)
152 /// .send()
153 /// .await?;
154 /// println!("response details={response:?}");
155 /// # Ok(()) }
156 /// ```
157 ///
158 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
159 pub fn with_if_metageneration_match<V>(mut self, v: V) -> Self
160 where
161 V: Into<i64>,
162 {
163 self.spec.if_metageneration_match = Some(v.into());
164 self
165 }
166
167 /// Set a [request precondition] on the object meta-generation.
168 ///
169 /// With this precondition the request fails if the current object metadata
170 /// generation matches the provided value. This is rarely useful in uploads,
171 /// it is more commonly used on downloads to prevent downloads if the value
172 /// is already cached.
173 ///
174 /// # Example
175 /// ```
176 /// # use google_cloud_storage::client::Storage;
177 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
178 /// let response = client
179 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
180 /// .with_if_metageneration_not_match(1234)
181 /// .send()
182 /// .await?;
183 /// println!("response details={response:?}");
184 /// # Ok(()) }
185 /// ```
186 ///
187 /// [request precondition]: https://cloud.google.com/storage/docs/request-preconditions
188 pub fn with_if_metageneration_not_match<V>(mut self, v: V) -> Self
189 where
190 V: Into<i64>,
191 {
192 self.spec.if_metageneration_not_match = Some(v.into());
193 self
194 }
195
196 /// Sets the ACL for the new object.
197 ///
198 /// # Example
199 /// ```
200 /// # use google_cloud_storage::client::Storage;
201 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
202 /// # use google_cloud_storage::model::ObjectAccessControl;
203 /// let response = client
204 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
205 /// .with_acl([ObjectAccessControl::new().set_entity("allAuthenticatedUsers").set_role("READER")])
206 /// .send()
207 /// .await?;
208 /// println!("response details={response:?}");
209 /// # Ok(()) }
210 /// ```
211 pub fn with_acl<I, V>(mut self, v: I) -> Self
212 where
213 I: IntoIterator<Item = V>,
214 V: Into<crate::model::ObjectAccessControl>,
215 {
216 self.mut_resource().acl = v.into_iter().map(|a| a.into()).collect();
217 self
218 }
219
    /// Sets the [cache control] for the new object.
    ///
    /// This can be used to control caching in [public objects].
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_cache_control("public, max-age=7200")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [public objects]: https://cloud.google.com/storage/docs/access-control/making-data-public
    /// [cache control]: https://datatracker.ietf.org/doc/html/rfc7234#section-5.2
    pub fn with_cache_control<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().cache_control = v.into();
        self
    }
243
244 /// Sets the [content disposition] for the new object.
245 ///
246 /// Google Cloud Storage can serve content directly to web browsers. This
247 /// attribute sets the `Content-Disposition` header, which may change how
248 /// the browser displays the contents.
249 ///
250 /// # Example
251 /// ```
252 /// # use google_cloud_storage::client::Storage;
253 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
254 /// let response = client
255 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
256 /// .with_content_disposition("inline")
257 /// .send()
258 /// .await?;
259 /// println!("response details={response:?}");
260 /// # Ok(()) }
261 /// ```
262 ///
263 /// [content disposition]: https://datatracker.ietf.org/doc/html/rfc6266
264 pub fn with_content_disposition<V: Into<String>>(mut self, v: V) -> Self {
265 self.mut_resource().content_disposition = v.into();
266 self
267 }
268
    /// Sets the [content encoding] for the object data.
    ///
    /// This can be used to upload compressed data and enable [transcoding] of
    /// the data during downloads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use flate2::write::GzEncoder;
    /// use std::io::Write;
    /// let mut e = GzEncoder::new(Vec::new(), flate2::Compression::default());
    /// e.write_all(b"hello world")?;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", bytes::Bytes::from_owner(e.finish()?))
    ///     .with_content_encoding("gzip")
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [transcoding]: https://cloud.google.com/storage/docs/transcoding
    /// [content encoding]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.2
    pub fn with_content_encoding<V: Into<String>>(mut self, v: V) -> Self {
        self.mut_resource().content_encoding = v.into();
        self
    }
297
298 /// Sets the [content language] for the new object.
299 ///
300 /// Google Cloud Storage can serve content directly to web browsers. This
301 /// attribute sets the `Content-Language` header, which may change how the
302 /// browser displays the contents.
303 ///
304 /// # Example
305 /// ```
306 /// # use google_cloud_storage::client::Storage;
307 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
308 /// let response = client
309 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
310 /// .with_content_language("en")
311 /// .send()
312 /// .await?;
313 /// println!("response details={response:?}");
314 /// # Ok(()) }
315 /// ```
316 ///
317 /// [content language]: https://cloud.google.com/storage/docs/metadata#content-language
318 pub fn with_content_language<V: Into<String>>(mut self, v: V) -> Self {
319 self.mut_resource().content_language = v.into();
320 self
321 }
322
323 /// Sets the [content type] for the new object.
324 ///
325 /// Google Cloud Storage can serve content directly to web browsers. This
326 /// attribute sets the `Content-Type` header, which may change how the
327 /// browser interprets the contents.
328 ///
329 /// # Example
330 /// ```
331 /// # use google_cloud_storage::client::Storage;
332 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
333 /// let response = client
334 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
335 /// .with_content_type("text/plain")
336 /// .send()
337 /// .await?;
338 /// println!("response details={response:?}");
339 /// # Ok(()) }
340 /// ```
341 ///
342 /// [content type]: https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.5
343 pub fn with_content_type<V: Into<String>>(mut self, v: V) -> Self {
344 self.mut_resource().content_type = v.into();
345 self
346 }
347
348 /// Sets the [custom time] for the new object.
349 ///
350 /// This field is typically set in order to use the [DaysSinceCustomTime]
351 /// condition in Object Lifecycle Management.
352 ///
353 /// # Example
354 /// ```
355 /// # use google_cloud_storage::client::Storage;
356 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
357 /// let time = wkt::Timestamp::try_from("2025-07-07T18:30:00Z")?;
358 /// let response = client
359 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
360 /// .with_custom_time(time)
361 /// .send()
362 /// .await?;
363 /// println!("response details={response:?}");
364 /// # Ok(()) }
365 /// ```
366 ///
367 /// [DaysSinceCustomTime]: https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
368 /// [custom time]: https://cloud.google.com/storage/docs/metadata#custom-time
369 pub fn with_custom_time<V: Into<wkt::Timestamp>>(mut self, v: V) -> Self {
370 self.mut_resource().custom_time = Some(v.into());
371 self
372 }
373
374 /// Sets the [event based hold] flag for the new object.
375 ///
376 /// This field is typically set in order to prevent objects from being
377 /// deleted or modified.
378 ///
379 /// # Example
380 /// ```
381 /// # use google_cloud_storage::client::Storage;
382 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
383 /// let response = client
384 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
385 /// .with_event_based_hold(true)
386 /// .send()
387 /// .await?;
388 /// println!("response details={response:?}");
389 /// # Ok(()) }
390 /// ```
391 ///
392 /// [event based hold]: https://cloud.google.com/storage/docs/object-holds
393 pub fn with_event_based_hold<V: Into<bool>>(mut self, v: V) -> Self {
394 self.mut_resource().event_based_hold = Some(v.into());
395 self
396 }
397
    /// Sets the [custom metadata] for the new object.
    ///
    /// This field is typically set to annotate the object with
    /// application-specific metadata.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_metadata([("test-only", "true"), ("environment", "qa")])
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [custom metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
    pub fn with_metadata<I, K, V>(mut self, i: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.mut_resource().metadata = i.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
        self
    }
427
428 /// Sets the [retention configuration] for the new object.
429 ///
430 /// # Example
431 /// ```
432 /// # use google_cloud_storage::client::Storage;
433 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
434 /// # use google_cloud_storage::model::object::{Retention, retention};
435 /// let response = client
436 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
437 /// .with_retention(
438 /// Retention::new()
439 /// .set_mode(retention::Mode::Locked)
440 /// .set_retain_until_time(wkt::Timestamp::try_from("2035-01-01T00:00:00Z")?))
441 /// .send()
442 /// .await?;
443 /// println!("response details={response:?}");
444 /// # Ok(()) }
445 /// ```
446 ///
447 /// [retention configuration]: https://cloud.google.com/storage/docs/metadata#retention-config
448 pub fn with_retention<V>(mut self, v: V) -> Self
449 where
450 V: Into<crate::model::object::Retention>,
451 {
452 self.mut_resource().retention = Some(v.into());
453 self
454 }
455
456 /// Sets the [storage class] for the new object.
457 ///
458 /// # Example
459 /// ```
460 /// # use google_cloud_storage::client::Storage;
461 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
462 /// let response = client
463 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
464 /// .with_storage_class("ARCHIVE")
465 /// .send()
466 /// .await?;
467 /// println!("response details={response:?}");
468 /// # Ok(()) }
469 /// ```
470 ///
471 /// [storage class]: https://cloud.google.com/storage/docs/storage-classes
472 pub fn with_storage_class<V>(mut self, v: V) -> Self
473 where
474 V: Into<String>,
475 {
476 self.mut_resource().storage_class = v.into();
477 self
478 }
479
    /// Sets the [temporary hold] flag for the new object.
    ///
    /// This field is typically set in order to prevent objects from being
    /// deleted or modified.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_temporary_hold(true)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [temporary hold]: https://cloud.google.com/storage/docs/object-holds
    pub fn with_temporary_hold<V: Into<bool>>(mut self, v: V) -> Self {
        self.mut_resource().temporary_hold = v.into();
        self
    }
504
505 /// Sets the resource name of the [Customer-managed encryption key] for this
506 /// object.
507 ///
508 /// The service imposes a number of restrictions on the keys used to encrypt
509 /// Google Cloud Storage objects. Read the documentation in full before
510 /// trying to use customer-managed encryption keys. In particular, verify
511 /// the service has the necessary permissions, and the key is in a
512 /// compatible location.
513 ///
514 /// # Example
515 /// ```
516 /// # use google_cloud_storage::client::Storage;
517 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
518 /// let response = client
519 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
520 /// .with_kms_key("projects/test-project/locations/us-central1/keyRings/test-ring/cryptoKeys/test-key")
521 /// .send()
522 /// .await?;
523 /// println!("response details={response:?}");
524 /// # Ok(()) }
525 /// ```
526 ///
527 /// [Customer-managed encryption key]: https://cloud.google.com/storage/docs/encryption/customer-managed-keys
528 pub fn with_kms_key<V>(mut self, v: V) -> Self
529 where
530 V: Into<String>,
531 {
532 self.mut_resource().kms_key = v.into();
533 self
534 }
535
536 /// Configure this object to use one of the [predefined ACLs].
537 ///
538 /// # Example
539 /// ```
540 /// # use google_cloud_storage::client::Storage;
541 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
542 /// let response = client
543 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
544 /// .with_predefined_acl("private")
545 /// .send()
546 /// .await?;
547 /// println!("response details={response:?}");
548 /// # Ok(()) }
549 /// ```
550 ///
551 /// [predefined ACLs]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
552 pub fn with_predefined_acl<V>(mut self, v: V) -> Self
553 where
554 V: Into<String>,
555 {
556 self.spec.predefined_acl = v.into();
557 self
558 }
559
560 /// The encryption key used with the Customer-Supplied Encryption Keys
561 /// feature. In raw bytes format (not base64-encoded).
562 ///
563 /// # Example
564 /// ```
565 /// # use google_cloud_storage::client::Storage;
566 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
567 /// # use google_cloud_storage::client::KeyAes256;
568 /// let key: &[u8] = &[97; 32];
569 /// let response = client
570 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
571 /// .with_key(KeyAes256::new(key)?)
572 /// .send()
573 /// .await?;
574 /// println!("response details={response:?}");
575 /// # Ok(()) }
576 /// ```
577 pub fn with_key(mut self, v: KeyAes256) -> Self {
578 self.params = Some(v.into());
579 self
580 }
581
    /// Configure the idempotency for this upload.
    ///
    /// By default, the client library treats single-shot uploads without
    /// preconditions, as non-idempotent. If the destination bucket is
    /// configured with [object versioning] then the operation may succeed
    /// multiple times with observable side-effects. With object versioning and
    /// a [lifecycle] policy limiting the number of versions, uploading the same
    /// data multiple times may result in data loss.
    ///
    /// The client library cannot efficiently determine if these conditions
    /// apply to your upload. If they do, or your application can tolerate
    /// multiple versions of the same data for other reasons, consider using
    /// `with_idempotency(true)`.
    ///
    /// The client library treats resumable uploads as idempotent, regardless of
    /// the value in this option. Such uploads can succeed at most once.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_idempotency(true)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// [lifecycle]: https://cloud.google.com/storage/docs/lifecycle
    /// [object versioning]: https://cloud.google.com/storage/docs/object-versioning
    pub fn with_idempotency(mut self, v: bool) -> Self {
        self.options.idempotency = Some(v);
        self
    }
621
622 /// The retry policy used for this request.
623 ///
624 /// # Example
625 /// ```
626 /// # use google_cloud_storage::client::Storage;
627 /// # use google_cloud_storage::retry_policy::RecommendedPolicy;
628 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
629 /// use std::time::Duration;
630 /// use gax::retry_policy::RetryPolicyExt;
631 /// let response = client
632 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
633 /// .with_retry_policy(RecommendedPolicy
634 /// .with_attempt_limit(5)
635 /// .with_time_limit(Duration::from_secs(10)),
636 /// )
637 /// .send()
638 /// .await?;
639 /// println!("response details={response:?}");
640 /// # Ok(()) }
641 /// ```
642 pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
643 self.options.retry_policy = v.into().into();
644 self
645 }
646
    /// The backoff policy used for this request.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use gax::exponential_backoff::ExponentialBackoff;
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_backoff_policy(ExponentialBackoff::default())
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.backoff_policy = v.into().into();
        self
    }
670
    /// The retry throttler used for this request.
    ///
    /// Most of the time you want to use the same throttler for all the requests
    /// in a client, and even the same throttler for many clients. Rarely it
    /// may be necessary to use a custom throttler for some subset of the
    /// requests.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_retry_throttler(adhoc_throttler())
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
    /// # panic!();
    /// }
    /// # Ok(()) }
    /// ```
    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.retry_throttler = v.into().into();
        self
    }
700
701 /// Sets the payload size threshold to switch from single-shot to resumable uploads.
702 ///
703 /// # Example
704 /// ```
705 /// # use google_cloud_storage::client::Storage;
706 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
707 /// let response = client
708 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
709 /// .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
710 /// .send()
711 /// .await?;
712 /// println!("response details={response:?}");
713 /// # Ok(()) }
714 /// ```
715 ///
716 /// The client library can perform uploads using [single-shot] or
717 /// [resumable] uploads. For small objects, single-shot uploads offer better
718 /// performance, as they require a single HTTP transfer. For larger objects,
719 /// the additional request latency is not significant, and resumable uploads
720 /// offer better recovery on errors.
721 ///
722 /// The library automatically selects resumable uploads when the payload is
723 /// equal to or larger than this option. For smaller uploads the client
724 /// library uses single-shot uploads.
725 ///
726 /// The exact threshold depends on where the application is deployed and
727 /// destination bucket location with respect to where the application is
728 /// running. The library defaults should work well in most cases, but some
729 /// applications may benefit from fine-tuning.
730 ///
731 /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
732 /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
733 pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
734 self.options.resumable_upload_threshold = v.into();
735 self
736 }
737
738 /// Changes the buffer size for some resumable uploads.
739 ///
740 /// # Example
741 /// ```
742 /// # use google_cloud_storage::client::Storage;
743 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
744 /// let response = client
745 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
746 /// .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
747 /// .send()
748 /// .await?;
749 /// println!("response details={response:?}");
750 /// # Ok(()) }
751 /// ```
752 ///
753 /// When performing [resumable uploads] from sources without [Seek] the
754 /// client library needs to buffer data in memory until it is persisted by
755 /// the service. Otherwise the data would be lost if the upload fails.
756 /// Applications may want to tune this buffer size:
757 ///
758 /// - Use smaller buffer sizes to support more concurrent uploads in the
759 /// same application.
760 /// - Use larger buffer sizes for better throughput. Sending many small
761 /// buffers stalls the upload until the client receives a successful
762 /// response from the service.
763 ///
764 /// Keep in mind that there are diminishing returns on using larger buffers.
765 ///
766 /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
767 /// [Seek]: crate::upload_source::Seek
768 pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
769 self.options.resumable_upload_buffer_size = v.into();
770 self
771 }
772
    /// Returns a mutable reference to the object resource being uploaded.
    ///
    /// # Panics
    ///
    /// Panics if `spec.resource` is `None`. The constructor is expected to
    /// always initialize this field, so a panic here indicates a bug in this
    /// crate rather than a user error.
    fn mut_resource(&mut self) -> &mut crate::model::Object {
        self.spec
            .resource
            .as_mut()
            .expect("resource field initialized in `new()`")
    }
779
    /// Consumes the builder and assembles the `PerformUpload` value that
    /// carries out the upload.
    pub(crate) fn build(self) -> PerformUpload<C, Payload<T>> {
        PerformUpload::new(
            self.checksum,
            self.payload,
            self.inner,
            self.spec,
            self.params,
            self.options,
        )
    }
790
    /// Rebuilds the request with a different checksum engine.
    ///
    /// `new` receives the current engine and returns its replacement; all
    /// other builder state is moved over unchanged. This powers the
    /// type-state transitions in `compute_md5()` and the `with_known_*`
    /// methods.
    fn switch_checksum<F, U>(self, new: F) -> UploadObject<T, U>
    where
        F: FnOnce(C) -> U,
    {
        UploadObject {
            payload: self.payload,
            inner: self.inner,
            spec: self.spec,
            params: self.params,
            options: self.options,
            checksum: new(self.checksum),
        }
    }
804
    /// Records a precomputed CRC32C value in the object's checksums,
    /// creating the checksums message if absent.
    ///
    /// Callers are expected to first switch to a checksum engine that does
    /// not also compute CRC32C; see `with_known_crc32c()`.
    fn set_crc32c<V: Into<u32>>(mut self, v: V) -> Self {
        let checksum = self.mut_resource().checksums.get_or_insert_default();
        checksum.crc32c = Some(v.into());
        self
    }
810
    /// Records a precomputed MD5 hash (raw bytes, not base64) in the
    /// object's checksums, creating the checksums message if absent.
    ///
    /// Callers are expected to first switch to a checksum engine that does
    /// not also compute MD5; see `with_known_md5_hash()`.
    ///
    /// NOTE(review): this method is `pub` while the analogous `set_crc32c()`
    /// is private; presumably it should also be private — confirm no
    /// external callers depend on it before changing the visibility.
    pub fn set_md5_hash<I, V>(mut self, i: I) -> Self
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let checksum = self.mut_resource().checksums.get_or_insert_default();
        checksum.md5_hash = i.into_iter().map(|v| v.into()).collect();
        self
    }
820}
821
822impl<T> UploadObject<T, Crc32c> {
823 /// Provide a precomputed value for the CRC32C checksum.
824 ///
825 /// # Example
826 /// ```
827 /// # use google_cloud_storage::client::Storage;
828 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
829 /// use crc32c::crc32c;
830 /// let response = client
831 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
832 /// .with_known_crc32c(crc32c(b"hello world"))
833 /// .send()
834 /// .await?;
835 /// println!("response details={response:?}");
836 /// # Ok(()) }
837 /// ```
838 ///
839 /// In some applications, the payload's CRC32C checksum is already known.
840 /// For example, the application may be downloading the data from another
841 /// blob storage system.
842 ///
843 /// In such cases, it is safer to pass the known CRC32C of the payload to
844 /// [Cloud Storage], and more efficient to skip the computation in the
845 /// client library.
846 ///
847 /// Note that once you provide a CRC32C value to this builder you cannot
848 /// use [compute_md5()] to also have the library compute the checksums.
849 ///
850 /// [compute_md5()]: UploadObject::compute_md5
851 pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, KnownCrc32c> {
852 let this = self.switch_checksum(|_| KnownCrc32c);
853 this.set_crc32c(v)
854 }
855
    /// Provide a precomputed value for the MD5 hash.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let hash = md5::compute(b"hello world");
    /// let response = client
    ///     .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .with_known_md5_hash(bytes::Bytes::from_owner(hash.0))
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// In some applications, the payload's MD5 hash is already known. For
    /// example, the application may be downloading the data from another blob
    /// storage system.
    ///
    /// In such cases, it is safer to pass the known MD5 of the payload to
    /// [Cloud Storage], and more efficient to skip the computation in the
    /// client library.
    ///
    /// Note that once you provide a MD5 value to this builder you cannot
    /// use [compute_md5()] to also have the library compute the checksums.
    ///
    /// [Cloud Storage]: https://cloud.google.com/storage
    /// [compute_md5()]: UploadObject::compute_md5
    pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Crc32c<KnownMd5>>
    where
        I: IntoIterator<Item = V>,
        V: Into<u8>,
    {
        let this = self.switch_checksum(|_| Crc32c::from_inner(KnownMd5));
        this.set_md5_hash(i)
    }
893
894 /// Enables computation of MD5 hashes.
895 ///
896 /// # Example
897 /// ```
898 /// # use google_cloud_storage::client::Storage;
899 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
900 /// let payload = tokio::fs::File::open("my-data").await?;
901 /// let response = client
902 /// .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
903 /// .compute_md5()
904 /// .send()
905 /// .await?;
906 /// println!("response details={response:?}");
907 /// # Ok(()) }
908 /// ```
909 ///
910 /// See [precompute_checksums][UploadObject::precompute_checksums] for more
911 /// details on how checksums are used by the client library and their
912 /// limitations.
913 pub fn compute_md5(self) -> UploadObject<T, Md5<Crc32c>> {
914 self.switch_checksum(Md5::from_inner)
915 }
916}
917
918impl<T> UploadObject<T, Crc32c<KnownMd5>> {
919 /// See [UploadObject<T, Crc32c>::with_known_crc32c].
920 pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, Known> {
921 let this = self.switch_checksum(|_| Known);
922 this.set_crc32c(v)
923 }
924}
925
926impl<T> UploadObject<T, Md5<Crc32c>> {
927 /// See [UploadObject<T, Crc32c>::with_known_crc32c].
928 pub fn with_known_crc32c<V: Into<u32>>(self, v: V) -> UploadObject<T, Md5<KnownCrc32c>> {
929 let this = self.switch_checksum(|_| Md5::from_inner(KnownCrc32c));
930 this.set_crc32c(v)
931 }
932
933 /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
934 pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Crc32c<KnownMd5>>
935 where
936 I: IntoIterator<Item = V>,
937 V: Into<u8>,
938 {
939 let this = self.switch_checksum(|_| Crc32c::from_inner(KnownMd5));
940 this.set_md5_hash(i)
941 }
942}
943
944impl<T> UploadObject<T, Md5<KnownCrc32c>> {
945 /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
946 pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Known>
947 where
948 I: IntoIterator<Item = V>,
949 V: Into<u8>,
950 {
951 let this = self.switch_checksum(|_| Known);
952 this.set_md5_hash(i)
953 }
954}
955
956impl<T> UploadObject<T, KnownCrc32c> {
957 /// See [UploadObject<T, Crc32c>::with_known_md5_hash].
958 pub fn with_known_md5_hash<I, V>(self, i: I) -> UploadObject<T, Known>
959 where
960 I: IntoIterator<Item = V>,
961 V: Into<u8>,
962 {
963 let this = self.switch_checksum(|_| Known);
964 this.set_md5_hash(i)
965 }
966
967 /// See [UploadObject<T, Crc32c>::compute_md5()].
968 pub fn compute_md5(self) -> UploadObject<T, Md5<KnownCrc32c>> {
969 self.switch_checksum(Md5::from_inner)
970 }
971}
972
973impl<T> UploadObject<T> {
974 pub(crate) fn new<B, O, P>(
975 inner: std::sync::Arc<StorageInner>,
976 bucket: B,
977 object: O,
978 payload: P,
979 ) -> Self
980 where
981 B: Into<String>,
982 O: Into<String>,
983 P: Into<Payload<T>>,
984 {
985 let options = inner.options.clone();
986 let resource = crate::model::Object::new()
987 .set_bucket(bucket)
988 .set_name(object);
989 UploadObject {
990 inner,
991 spec: crate::model::WriteObjectSpec::new().set_resource(resource),
992 params: None,
993 payload: payload.into(),
994 options,
995 checksum: Crc32c::default(),
996 }
997 }
998}
999
1000impl<T, C> UploadObject<T, C>
1001where
1002 C: ChecksumEngine + Send + Sync + 'static,
1003 T: StreamingSource + Seek + Send + Sync + 'static,
1004 <T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
1005 <T as Seek>::Error: std::error::Error + Send + Sync + 'static,
1006{
1007 /// A simple upload from a buffer.
1008 ///
1009 /// # Example
1010 /// ```
1011 /// # use google_cloud_storage::client::Storage;
1012 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1013 /// let response = client
1014 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
1015 /// .send_unbuffered()
1016 /// .await?;
1017 /// println!("response details={response:?}");
1018 /// # Ok(()) }
1019 /// ```
1020 pub async fn send_unbuffered(self) -> Result<Object> {
1021 self.build().send_unbuffered().await
1022 }
1023
1024 /// Precompute the payload checksums before uploading the data.
1025 ///
1026 /// If the checksums are known when the upload starts, the client library
1027 /// can include the checksums with the upload request, and the service can
1028 /// reject the upload if the payload and the checksums do not match.
1029 ///
1030 /// # Example
1031 /// ```
1032 /// # use google_cloud_storage::client::Storage;
1033 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1034 /// let payload = tokio::fs::File::open("my-data").await?;
1035 /// let response = client
1036 /// .upload_object("projects/_/buckets/my-bucket", "my-object", payload)
1037 /// .precompute_checksums()
1038 /// .await?
1039 /// .send_unbuffered()
1040 /// .await?;
1041 /// println!("response details={response:?}");
1042 /// # Ok(()) }
1043 /// ```
1044 ///
1045 /// Precomputing the checksums can be expensive if the data source is slow
1046 /// to read. Therefore, the client library does not precompute the checksums
1047 /// by default. The client library compares the checksums computed by the
1048 /// service against its own checksums. If they do not match, the client
1049 /// library returns an error. However, the service has already created the
1050 /// object with the (likely incorrect) data.
1051 ///
1052 /// The client library currently uses the [JSON API], it is not possible to
1053 /// send the checksums at the end of the upload with this API.
1054 ///
1055 /// [JSON API]: https://cloud.google.com/storage/docs/json_api
1056 pub async fn precompute_checksums(mut self) -> Result<UploadObject<T, Known>>
1057 where
1058 C: ChecksumEngine + Send + Sync + 'static,
1059 {
1060 let mut offset = 0_u64;
1061 self.payload.seek(offset).await.map_err(Error::ser)?;
1062 while let Some(n) = self.payload.next().await.transpose().map_err(Error::ser)? {
1063 self.checksum.update(offset, &n);
1064 offset += n.len() as u64;
1065 }
1066 self.payload.seek(0_u64).await.map_err(Error::ser)?;
1067 let computed = self.checksum.finalize();
1068 let current = self.mut_resource().checksums.get_or_insert_default();
1069 crate::storage::checksum::update(current, computed);
1070 Ok(self.switch_checksum(|_| Known))
1071 }
1072}
1073
1074impl<T, C> UploadObject<T, C>
1075where
1076 C: ChecksumEngine + Send + Sync + 'static,
1077 T: StreamingSource + Send + Sync + 'static,
1078 T::Error: std::error::Error + Send + Sync + 'static,
1079{
1080 /// Upload an object from a streaming source without rewinds.
1081 ///
1082 /// # Example
1083 /// ```
1084 /// # use google_cloud_storage::client::Storage;
1085 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
1086 /// let response = client
1087 /// .upload_object("projects/_/buckets/my-bucket", "my-object", "hello world")
1088 /// .send()
1089 /// .await?;
1090 /// println!("response details={response:?}");
1091 /// # Ok(()) }
1092 /// ```
1093 pub async fn send(self) -> crate::Result<Object> {
1094 self.build().send().await
1095 }
1096}
1097
1098// We need `Debug` to use `expect_err()` in `Result<UploadObject, ...>`.
1099impl<T, C> std::fmt::Debug for UploadObject<T, C>
1100where
1101 C: std::fmt::Debug,
1102{
1103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1104 f.debug_struct("UploadObject")
1105 .field("inner", &self.inner)
1106 .field("spec", &self.spec)
1107 .field("params", &self.params)
1108 // skip payload, as it is not `Debug`
1109 .field("options", &self.options)
1110 .field("checksum", &self.checksum)
1111 .finish()
1112 }
1113}
1114
#[cfg(test)]
mod tests {
    use super::client::tests::{test_builder, test_inner_client};
    use super::*;
    use crate::model::{ObjectChecksums, WriteObjectSpec};
    use crate::upload_source::tests::MockSeekSource;
    use std::error::Error as _;
    use std::io::{Error as IoError, ErrorKind};

    // Shorthand: every test returns `anyhow::Result<()>`.
    type Result = anyhow::Result<()>;

    // Verify `upload_object()` can be used with a source that implements `StreamingSource` **and** `Seek`
    #[tokio::test]
    async fn test_upload_streaming_source_and_seek() -> Result {
        struct Source;
        impl crate::upload_source::StreamingSource for Source {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
                None
            }
        }
        impl crate::upload_source::Seek for Source {
            type Error = std::io::Error;
            async fn seek(&mut self, _offset: u64) -> std::result::Result<(), Self::Error> {
                Ok(())
            }
        }

        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let _ = client.upload_object("projects/_/buckets/test-bucket", "test-object", Source);
        Ok(())
    }

    // Verify `upload_object()` can be used with a source that **only** implements `StreamingSource`.
    #[tokio::test]
    async fn test_upload_only_streaming_source() -> Result {
        struct Source;
        impl crate::upload_source::StreamingSource for Source {
            type Error = std::io::Error;
            async fn next(&mut self) -> Option<std::result::Result<bytes::Bytes, Self::Error>> {
                None
            }
        }

        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let _ = client.upload_object("projects/_/buckets/test-bucket", "test-object", Source);
        Ok(())
    }

    // Verify `upload_object()` meets normal Send, Sync, requirements.
    #[tokio::test]
    async fn test_upload_is_send_and_static() -> Result {
        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;

        // Compile-time probes: each call only type-checks if the bound holds.
        fn need_send<T: Send>(_val: &T) {}
        fn need_sync<T: Sync>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let upload = client.upload_object("projects/_/buckets/test-bucket", "test-object", "");
        need_send(&upload);
        need_sync(&upload);
        need_static(&upload);

        let upload = client
            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
            .send_unbuffered();
        need_send(&upload);
        need_static(&upload);

        let upload = client
            .upload_object("projects/_/buckets/test-bucket", "test-object", "")
            .send();
        need_send(&upload);
        need_static(&upload);

        Ok(())
    }

    // Verify that every `with_*()` setter lands in the right place: request
    // preconditions on the `WriteObjectSpec`, metadata on the `Object` resource.
    #[test]
    fn upload_object_unbuffered_metadata() -> Result {
        use crate::model::ObjectAccessControl;
        let inner = test_inner_client(test_builder());
        let mut request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_if_generation_match(10)
            .with_if_generation_not_match(20)
            .with_if_metageneration_match(30)
            .with_if_metageneration_not_match(40)
            .with_predefined_acl("private")
            .with_acl([ObjectAccessControl::new()
                .set_entity("allAuthenticatedUsers")
                .set_role("READER")])
            .with_cache_control("public; max-age=7200")
            .with_content_disposition("inline")
            .with_content_encoding("gzip")
            .with_content_language("en")
            .with_content_type("text/plain")
            .with_known_crc32c(crc32c::crc32c(b""))
            .with_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
            .with_event_based_hold(true)
            .with_known_md5_hash(md5::compute(b"").0)
            .with_metadata([("k0", "v0"), ("k1", "v1")])
            .with_retention(
                crate::model::object::Retention::new()
                    .set_mode(crate::model::object::retention::Mode::Locked)
                    .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
            )
            .with_storage_class("ARCHIVE")
            .with_temporary_hold(true)
            .with_kms_key("test-key");

        let resource = request.spec.resource.take().unwrap();
        let request = request;
        assert_eq!(
            &request.spec,
            &WriteObjectSpec::new()
                .set_if_generation_match(10)
                .set_if_generation_not_match(20)
                .set_if_metageneration_match(30)
                .set_if_metageneration_not_match(40)
                .set_predefined_acl("private")
        );

        assert_eq!(
            resource,
            Object::new()
                .set_name("object")
                .set_bucket("projects/_/buckets/bucket")
                .set_acl([ObjectAccessControl::new()
                    .set_entity("allAuthenticatedUsers")
                    .set_role("READER")])
                .set_cache_control("public; max-age=7200")
                .set_content_disposition("inline")
                .set_content_encoding("gzip")
                .set_content_language("en")
                .set_content_type("text/plain")
                .set_checksums(
                    crate::model::ObjectChecksums::new()
                        .set_crc32c(crc32c::crc32c(b""))
                        .set_md5_hash(bytes::Bytes::from_iter(md5::compute(b"").0))
                )
                .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
                .set_event_based_hold(true)
                .set_metadata([("k0", "v0"), ("k1", "v1")])
                .set_retention(
                    crate::model::object::Retention::new()
                        .set_mode("LOCKED")
                        .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?)
                )
                .set_storage_class("ARCHIVE")
                .set_temporary_hold(true)
                .set_kms_key("test-key")
        );

        Ok(())
    }

    // Verify resumable-upload options flow from the client builder and can be
    // overridden per request.
    #[test]
    fn upload_object_options() {
        let inner = test_inner_client(
            test_builder()
                .with_resumable_upload_threshold(123_usize)
                .with_resumable_upload_buffer_size(234_usize),
        );
        let request = UploadObject::new(inner.clone(), "projects/_/buckets/bucket", "object", "");
        assert_eq!(request.options.resumable_upload_threshold, 123);
        assert_eq!(request.options.resumable_upload_buffer_size, 234);

        let request = UploadObject::new(inner, "projects/_/buckets/bucket", "object", "")
            .with_resumable_upload_threshold(345_usize)
            .with_resumable_upload_buffer_size(456_usize);
        assert_eq!(request.options.resumable_upload_threshold, 345);
        assert_eq!(request.options.resumable_upload_buffer_size, 456);
    }

    // Payloads used by the checksum tests. `VEXING` produces checksums that
    // deliberately do NOT match `QUICK`, to show known values are respected.
    const QUICK: &str = "the quick brown fox jumps over the lazy dog";
    const VEXING: &str = "how vexingly quick daft zebras jump";

    // Checksums of `QUICK` as computed by the given engine.
    fn quick_checksum<E: ChecksumEngine>(mut engine: E) -> ObjectChecksums {
        engine.update(0, &bytes::Bytes::from_static(QUICK.as_bytes()));
        engine.finalize()
    }

    // Drain a streaming source into a single buffer.
    async fn collect<S: StreamingSource>(mut stream: S) -> anyhow::Result<Vec<u8>> {
        let mut collected = Vec::new();
        while let Some(b) = stream.next().await.transpose()? {
            collected.extend_from_slice(&b);
        }
        Ok(collected)
    }

    #[tokio::test]
    async fn checksum_default() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .precompute_checksums()
            .await?;
        let want = quick_checksum(Crc32c::default());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
        // The payload must be rewound so the upload re-reads it from the top.
        let collected = collect(upload.payload).await?;
        assert_eq!(collected, QUICK.as_bytes());
        Ok(())
    }

    #[tokio::test]
    async fn checksum_md5_and_crc32c() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .precompute_checksums()
            .await?;
        let want = quick_checksum(Crc32c::from_inner(Md5::default()));
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));
        Ok(())
    }

    #[tokio::test]
    async fn checksum_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_crc32c()
        // and with_md5() is respected.
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(ck));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_crc32c_known_md5_computed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .with_known_crc32c(ck.crc32c.unwrap())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = quick_checksum(Md5::default()).set_crc32c(ck.crc32c.unwrap());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_mixed_then_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_md5_hash(ck.md5_hash.clone())
            .with_known_crc32c(ck.crc32c.unwrap())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_full_computed_then_md5_precomputed() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .compute_md5()
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = quick_checksum(Crc32c::default()).set_md5_hash(ck.md5_hash.clone());
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_known_crc32_then_computed_md5() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .compute_md5()
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    #[tokio::test]
    async fn checksum_known_crc32_then_known_md5() -> Result {
        let mut engine = Crc32c::from_inner(Md5::default());
        engine.update(0, &bytes::Bytes::from_static(VEXING.as_bytes()));
        let ck = engine.finalize();

        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", QUICK)
            .with_known_crc32c(ck.crc32c.unwrap())
            .with_known_md5_hash(ck.md5_hash.clone())
            .precompute_checksums()
            .await?;
        // Note that the checksums do not match the data. This is intentional,
        // we are trying to verify that whatever is provided in with_known*()
        // is respected.
        let want = ck.clone();
        assert_eq!(upload.spec.resource.and_then(|r| r.checksums), Some(want));

        Ok(())
    }

    // Errors from the initial `seek()` must surface as serialization errors
    // with the underlying `std::io::Error` as their source.
    #[tokio::test]
    async fn precompute_checksums_seek_error() -> Result {
        let mut source = MockSeekSource::new();
        source
            .expect_seek()
            .once()
            .returning(|_| Err(IoError::new(ErrorKind::Deadlock, "test-only")));

        let client = test_builder().build().await?;
        let err = client
            .upload_object("my-bucket", "my-object", source)
            .precompute_checksums()
            .await
            .expect_err("seek() returns an error");
        assert!(err.is_serialization(), "{err:?}");
        assert!(
            err.source()
                .and_then(|e| e.downcast_ref::<IoError>())
                .is_some(),
            "{err:?}"
        );

        Ok(())
    }

    // Errors from `next()` while hashing must surface the same way.
    #[tokio::test]
    async fn precompute_checksums_next_error() -> Result {
        let mut source = MockSeekSource::new();
        source.expect_seek().returning(|_| Ok(()));
        let mut seq = mockall::Sequence::new();
        source
            .expect_next()
            .times(3)
            .in_sequence(&mut seq)
            .returning(|| Some(Ok(bytes::Bytes::new())));
        source
            .expect_next()
            .once()
            .in_sequence(&mut seq)
            .returning(|| Some(Err(IoError::new(ErrorKind::BrokenPipe, "test-only"))));

        let client = test_builder().build().await?;
        let err = client
            .upload_object("my-bucket", "my-object", source)
            .precompute_checksums()
            .await
            // NOTE(review): the message says seek() but this case exercises
            // next() — consider updating the expectation message.
            .expect_err("seek() returns an error");
        assert!(err.is_serialization(), "{err:?}");
        assert!(
            err.source()
                .and_then(|e| e.downcast_ref::<IoError>())
                .is_some(),
            "{err:?}"
        );

        Ok(())
    }

    // The `Debug` impl must include every field except the payload.
    #[tokio::test]
    async fn debug() -> Result {
        let client = test_builder().build().await?;
        let upload = client
            .upload_object("my-bucket", "my-object", "")
            .precompute_checksums()
            .await;

        let fmt = format!("{upload:?}");
        ["UploadObject", "inner", "spec", "options", "checksum"]
            .into_iter()
            .for_each(|text| {
                assert!(fmt.contains(text), "expected {text} in {fmt}");
            });
        Ok(())
    }
}