google_cloud_storage/storage/open_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Result;
16use crate::model_ext::{KeyAes256, OpenObjectRequest, ReadRange};
17use crate::object_descriptor::ObjectDescriptor;
18use crate::read_object::ReadObjectResponse;
19use crate::read_resume_policy::ReadResumePolicy;
20use crate::request_options::RequestOptions;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// A request builder for [Storage::open_object][crate::client::Storage::open_object].
25///
26/// # Example
27/// ```
28/// use google_cloud_storage::client::Storage;
29/// # use google_cloud_storage::builder::storage::OpenObject;
30/// async fn sample(client: &Storage) -> anyhow::Result<()> {
31/// let builder: OpenObject = client
32/// .open_object("projects/_/buckets/my-bucket", "my-object");
33/// let descriptor = builder
34/// .set_generation(123)
35/// .send()
36/// .await?;
37/// println!("object metadata={:?}", descriptor.object());
38/// // Use `descriptor` to read data from `my-object`.
39/// Ok(())
40/// }
41/// ```
42#[derive(Clone, Debug)]
43pub struct OpenObject<S = crate::storage::transport::Storage> {
44 stub: Arc<S>,
45 request: OpenObjectRequest,
46 options: RequestOptions,
47}
48
49impl<S> OpenObject<S>
50where
51 S: crate::storage::stub::Storage + 'static,
52{
53 /// Sends the request, returning a new object descriptor.
54 ///
55 /// Example:
56 /// ```ignore
57 /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
58 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
59 /// let open = client
60 /// .open_object("projects/_/buckets/my-bucket", "my-object")
61 /// .send()
62 /// .await?;
63 /// println!("object metadata={:?}", open.object());
64 /// # Ok(()) }
65 /// ```
66 pub async fn send(self) -> Result<ObjectDescriptor> {
67 let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;
68 Ok(descriptor)
69 }
70
71 /// Sends the request, returning a new object descriptor and reader.
72 ///
73 /// Example:
74 /// ```ignore
75 /// # use google_cloud_storage::client::Storage;
76 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
77 /// use google_cloud_storage::model_ext::ReadRange;
78 /// let (descriptor, mut reader) = client
79 /// .open_object("projects/_/buckets/my-bucket", "my-object.parquet")
80 /// .send_and_read(ReadRange::tail(32))
81 /// .await?;
82 /// println!("object metadata={:?}", descriptor.object());
83 /// let data = reader.next().await.transpose()?;
84 /// # Ok(()) }
85 /// ```
86 ///
87 /// This method allows applications to open an object and issue a read
88 /// request in the same RPC, which is typically faster than opening an
89 /// object and then issuing a `read_range()` call. This may be useful when
90 /// opening objects that have metadata information in a footer or header.
91 pub async fn send_and_read(
92 mut self,
93 range: ReadRange,
94 ) -> Result<(ObjectDescriptor, ReadObjectResponse)> {
95 self.request.ranges.push(range);
96 let (descriptor, mut readers) = self.stub.open_object(self.request, self.options).await?;
97 if readers.len() == 1 {
98 return Ok((descriptor, readers.pop().unwrap()));
99 }
100 // Even if the service returns multiple read ranges, with different ids,
101 // the code in the library will return an error and close the stream.
102 unreachable!("the stub cannot create more readers")
103 }
104}
105
106impl<S> OpenObject<S> {
107 pub(crate) fn new<B, O>(
108 stub: std::sync::Arc<S>,
109 bucket: B,
110 object: O,
111 options: RequestOptions,
112 ) -> Self
113 where
114 B: Into<String>,
115 O: Into<String>,
116 {
117 let request = OpenObjectRequest::default()
118 .set_bucket(bucket)
119 .set_object(object);
120 Self {
121 request,
122 options,
123 stub,
124 }
125 }
126
127 /// If present, selects a specific revision of this object (as
128 /// opposed to the latest version, the default).
129 ///
130 /// # Example
131 /// ```
132 /// # use google_cloud_storage::client::Storage;
133 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
134 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
135 /// let response = client
136 /// .open_object("projects/_/buckets/my-bucket", "my-object")
137 /// .set_generation(123456)
138 /// .send()
139 /// .await?;
140 /// # Ok(()) }
141 /// ```
142 pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
143 self.request = self.request.set_generation(v.into());
144 self
145 }
146
147 /// Makes the operation conditional on whether the object's current generation
148 /// matches the given value.
149 ///
150 /// # Example
151 /// ```
152 /// # use google_cloud_storage::client::Storage;
153 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
154 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
155 /// let response = client
156 /// .open_object("projects/_/buckets/my-bucket", "my-object")
157 /// .set_if_generation_match(123456)
158 /// .send()
159 /// .await?;
160 /// # Ok(()) }
161 /// ```
162 pub fn set_if_generation_match<T>(mut self, v: T) -> Self
163 where
164 T: Into<i64>,
165 {
166 self.request = self.request.set_if_generation_match(v.into());
167 self
168 }
169
170 /// Makes the operation conditional on whether the object's live generation
171 /// does not match the given value. If no live object exists, the precondition
172 /// fails.
173 ///
174 /// # Example
175 /// ```
176 /// # use google_cloud_storage::client::Storage;
177 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
178 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
179 /// let response = client
180 /// .open_object("projects/_/buckets/my-bucket", "my-object")
181 /// .set_if_generation_not_match(123456)
182 /// .send()
183 /// .await?;
184 /// # Ok(()) }
185 /// ```
186 pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
187 where
188 T: Into<i64>,
189 {
190 self.request = self.request.set_if_generation_not_match(v.into());
191 self
192 }
193
194 /// Makes the operation conditional on whether the object's current
195 /// metageneration matches the given value.
196 ///
197 /// # Example
198 /// ```
199 /// # use google_cloud_storage::client::Storage;
200 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
201 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
202 /// let response = client
203 /// .open_object("projects/_/buckets/my-bucket", "my-object")
204 /// .set_if_metageneration_match(123456)
205 /// .send()
206 /// .await?;
207 /// # Ok(()) }
208 /// ```
209 pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
210 where
211 T: Into<i64>,
212 {
213 self.request = self.request.set_if_metageneration_match(v.into());
214 self
215 }
216
217 /// Makes the operation conditional on whether the object's current
218 /// metageneration does not match the given value.
219 ///
220 /// # Example
221 /// ```
222 /// # use google_cloud_storage::client::Storage;
223 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
224 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
225 /// let response = client
226 /// .open_object("projects/_/buckets/my-bucket", "my-object")
227 /// .set_if_metageneration_not_match(123456)
228 /// .send()
229 /// .await?;
230 /// # Ok(()) }
231 /// ```
232 pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
233 where
234 T: Into<i64>,
235 {
236 self.request = self.request.set_if_metageneration_not_match(v.into());
237 self
238 }
239
240 /// The encryption key used with the Customer-Supplied Encryption Keys
241 /// feature. In raw bytes format (not base64-encoded).
242 ///
243 /// Example:
244 /// ```
245 /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
246 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
247 /// let key: &[u8] = &[97; 32];
248 /// let response = client
249 /// .open_object("projects/_/buckets/my-bucket", "my-object")
250 /// .set_key(KeyAes256::new(key)?)
251 /// .send()
252 /// .await?;
253 /// println!("response details={response:?}");
254 /// # Ok(()) }
255 /// ```
256 pub fn set_key(mut self, v: KeyAes256) -> Self {
257 self.request = self
258 .request
259 .set_common_object_request_params(crate::model::CommonObjectRequestParams::from(v));
260 self
261 }
262
263 /// The retry policy used for this request.
264 ///
265 /// # Example
266 /// ```
267 /// # use google_cloud_storage::client::Storage;
268 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
269 /// use google_cloud_storage::retry_policy::RetryableErrors;
270 /// use std::time::Duration;
271 /// use google_cloud_gax::retry_policy::RetryPolicyExt;
272 /// let response = client
273 /// .open_object("projects/_/buckets/my-bucket", "my-object")
274 /// .with_retry_policy(
275 /// RetryableErrors
276 /// .with_attempt_limit(5)
277 /// .with_time_limit(Duration::from_secs(10)),
278 /// )
279 /// .send()
280 /// .await?;
281 /// println!("response details={response:?}");
282 /// # Ok(()) }
283 /// ```
284 pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
285 mut self,
286 v: V,
287 ) -> Self {
288 self.options.retry_policy = v.into().into();
289 self
290 }
291
292 /// The backoff policy used for this request.
293 ///
294 /// # Example
295 /// ```
296 /// # use google_cloud_storage::client::Storage;
297 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
298 /// use std::time::Duration;
299 /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
300 /// let response = client
301 /// .open_object("projects/_/buckets/my-bucket", "my-object")
302 /// .with_backoff_policy(ExponentialBackoff::default())
303 /// .send()
304 /// .await?;
305 /// println!("response details={response:?}");
306 /// # Ok(()) }
307 /// ```
308 pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
309 mut self,
310 v: V,
311 ) -> Self {
312 self.options.backoff_policy = v.into().into();
313 self
314 }
315
316 /// The retry throttler used for this request.
317 ///
318 /// Most of the time you want to use the same throttler for all the requests
319 /// in a client, and even the same throttler for many clients. Rarely it
320 /// may be necessary to use an custom throttler for some subset of the
321 /// requests.
322 ///
323 /// # Example
324 /// ```
325 /// # use google_cloud_storage::client::Storage;
326 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
327 /// let response = client
328 /// .open_object("projects/_/buckets/my-bucket", "my-object")
329 /// .with_retry_throttler(adhoc_throttler())
330 /// .send()
331 /// .await?;
332 /// println!("response details={response:?}");
333 /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
334 /// # panic!();
335 /// }
336 /// # Ok(()) }
337 /// ```
338 pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
339 mut self,
340 v: V,
341 ) -> Self {
342 self.options.retry_throttler = v.into().into();
343 self
344 }
345
346 /// Configure the resume policy for read requests.
347 ///
348 /// The Cloud Storage client library can automatically resume a read that is
349 /// interrupted by a transient error. Applications may want to limit the
350 /// number of read attempts, or may wish to expand the type of errors
351 /// treated as retryable.
352 ///
353 /// # Example
354 /// ```
355 /// # use google_cloud_storage::client::Storage;
356 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
357 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
358 /// let response = client
359 /// .open_object("projects/_/buckets/my-bucket", "my-object")
360 /// .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
361 /// .send()
362 /// .await?;
363 /// # Ok(()) }
364 /// ```
365 pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
366 where
367 V: ReadResumePolicy + 'static,
368 {
369 self.options.set_read_resume_policy(std::sync::Arc::new(v));
370 self
371 }
372
373 /// Configure per-attempt timeout.
374 ///
375 /// # Example
376 /// ```
377 /// # use google_cloud_storage::client::Storage;
378 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
379 /// use std::time::Duration;
380 /// let response = client
381 /// .open_object("projects/_/buckets/my-bucket", "my-object")
382 /// .with_attempt_timeout(Duration::from_secs(120))
383 /// .send()
384 /// .await?;
385 /// # Ok(()) }
386 /// ```
387 ///
388 /// The Cloud Storage client library times out `open_object()` attempts by
389 /// default (with a 60s timeout). Applications may want to set a different
390 /// value depending on how they are deployed.
391 ///
392 /// Note that the per-attempt timeout is subject to the overall retry loop
393 /// time limits (if any). The effective timeout for each attempt is the
394 /// smallest of (a) the per-attempt timeout, and (b) the remaining time in
395 /// the retry loop.
396 pub fn with_attempt_timeout(mut self, v: Duration) -> Self {
397 self.options.set_bidi_attempt_timeout(v);
398 self
399 }
400
401 /// Sets the `User-Agent` header for this request.
402 ///
403 /// # Example
404 /// ```
405 /// # use google_cloud_storage::client::Storage;
406 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
407 /// let mut response = client
408 /// .open_object("projects/_/buckets/my-bucket", "my-object")
409 /// .with_user_agent("my-app/1.0.0")
410 /// .send()
411 /// .await?;
412 /// println!("response details={response:?}");
413 /// # Ok(()) }
414 /// ```
415 pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
416 self.options.user_agent = Some(user_agent.into());
417 self
418 }
419
420 /// Sets the project that will be billed for this request.
421 ///
422 /// Required for [Requester Pays] buckets. The value overrides any
423 /// `quota_project_id` configured on the credentials; the credential-level
424 /// header is suppressed for this RPC.
425 ///
426 /// # Example
427 /// ```
428 /// # use google_cloud_storage::client::Storage;
429 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
430 /// let response = client
431 /// .open_object("projects/_/buckets/my-bucket", "my-object")
432 /// .with_quota_project("my-billing-project")
433 /// .send()
434 /// .await?;
435 /// # Ok(()) }
436 /// ```
437 ///
438 /// [Requester Pays]: https://cloud.google.com/storage/docs/requester-pays
439 pub fn with_quota_project(mut self, project: impl Into<String>) -> Self {
440 self.options.set_quota_project(project);
441 self
442 }
443}
444
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::Storage;
    use crate::model::{CommonObjectRequestParams, Object};
    use crate::model_ext::tests::create_key_helper;
    use anyhow::Result;
    use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use google_cloud_gax::retry_policy::NeverRetry;
    use http::HeaderValue;
    use static_assertions::assert_impl_all;
    use storage_grpc_mock::google::storage::v2::{
        BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
        ReadRange as ProtoRange,
    };
    use storage_grpc_mock::{MockStorage, start};

    const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";
    const OBJECT_NAME: &str = "test-object";
    const USER_AGENT: &str = "quick_foxes_lazy_dogs/1.2.3";
    const BIND_ADDRESS: &str = "0.0.0.0:0";

    /// The object metadata that the mock responses in these tests start from.
    fn proto_object() -> ProtoObject {
        ProtoObject {
            bucket: BUCKET_NAME.to_string(),
            name: OBJECT_NAME.to_string(),
            generation: 123456,
            ..ProtoObject::default()
        }
    }

    /// Wraps `metadata` in an otherwise default response message.
    fn response_with(metadata: ProtoObject) -> BidiReadObjectResponse {
        BidiReadObjectResponse {
            metadata: Some(metadata),
            ..BidiReadObjectResponse::default()
        }
    }

    /// The client-side `Object` corresponding to `proto_object()`.
    fn expected_object() -> Object {
        Object::new()
            .set_bucket(BUCKET_NAME)
            .set_name(OBJECT_NAME)
            .set_generation(123456)
    }

    /// A builder over `StorageStub` with fresh request options.
    fn stub_builder() -> OpenObject<StorageStub> {
        OpenObject::new(
            Arc::new(StorageStub),
            BUCKET_NAME.to_string(),
            OBJECT_NAME.to_string(),
            RequestOptions::new(),
        )
    }

    // Verify `open_object()` meets normal Send, Sync, requirements.
    #[tokio::test]
    async fn traits() -> Result<()> {
        assert_impl_all!(OpenObject: Clone, std::fmt::Debug, Send, Sync);

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        fn need_send<T: Send>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let builder = client.open_object(BUCKET_NAME, OBJECT_NAME);
        need_static(&builder);

        let pending = client.open_object(BUCKET_NAME, OBJECT_NAME).send();
        need_send(&pending);
        need_static(&pending);
        Ok(())
    }

    #[tokio::test]
    async fn open_object_normal() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
        let first = response_with(ProtoObject {
            size: 42,
            ..proto_object()
        });
        tx.send(Ok(first)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = Storage::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let descriptor = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await?;

        let got = descriptor.object();
        let want = expected_object().set_size(42);
        assert_eq!(got, want);

        Ok(())
    }

    #[tokio::test]
    async fn attributes() -> Result<()> {
        let builder = stub_builder()
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        let want = OpenObjectRequest::default()
            .set_bucket(BUCKET_NAME)
            .set_object(OBJECT_NAME)
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        assert_eq!(builder.request, want);
        Ok(())
    }

    #[tokio::test]
    async fn csek() -> Result<()> {
        let (raw_key, _, _, _) = create_key_helper();
        let key = KeyAes256::new(&raw_key)?;
        let builder = stub_builder().set_key(key.clone());
        let want = OpenObjectRequest::default()
            .set_bucket(BUCKET_NAME)
            .set_object(OBJECT_NAME)
            .set_common_object_request_params(CommonObjectRequestParams::from(key));
        assert_eq!(builder.request, want);
        Ok(())
    }

    #[tokio::test]
    async fn request_options() -> Result<()> {
        use crate::read_resume_policy::NeverResume;
        use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
        use google_cloud_gax::retry_policy::Aip194Strict;
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let backoff = ExponentialBackoffBuilder::default()
            .with_scaling(4.0)
            .build()
            .expect("expontial backoff builds");
        let builder = stub_builder()
            .with_backoff_policy(backoff)
            .with_retry_policy(Aip194Strict)
            .with_retry_throttler(CircuitBreaker::default())
            .with_read_resume_policy(NeverResume)
            .with_attempt_timeout(Duration::from_secs(120))
            .with_user_agent(USER_AGENT);

        // The policies are checked through their `Debug` output.
        let got = builder.options;
        assert!(
            format!("{:?}", got.backoff_policy).contains("ExponentialBackoff"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_policy).contains("Aip194Strict"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_throttler).contains("CircuitBreaker"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.read_resume_policy()).contains("NeverResume"),
            "{got:?}"
        );
        assert_eq!(
            got.bidi_attempt_timeout,
            Duration::from_secs(120),
            "{got:?}"
        );
        assert_eq!(got.user_agent.as_deref(), Some(USER_AGENT), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn send() -> anyhow::Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
        tx.send(Ok(response_with(proto_object()))).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;

        let descriptor = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await?;
        let want = expected_object();
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );
        Ok(())
    }

    #[tokio::test]
    async fn send_and_read() -> anyhow::Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
        let payload: Vec<u8> = (0..32_u8).collect();
        // The first message carries both the metadata and the complete range.
        let first = BidiReadObjectResponse {
            object_data_ranges: vec![ObjectRangeData {
                read_range: Some(ProtoRange {
                    read_id: 0_i64,
                    ..ProtoRange::default()
                }),
                range_end: true,
                checksummed_data: Some(ChecksummedData {
                    content: payload.clone(),
                    crc32c: None,
                }),
            }],
            ..response_with(proto_object())
        };
        tx.send(Ok(first)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;

        let (descriptor, mut reader) = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .send_and_read(ReadRange::tail(32))
            .await?;
        let want = expected_object();
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );

        let mut received = Vec::new();
        while let Some(chunk) = reader.next().await.transpose()? {
            received.extend_from_slice(&chunk);
        }
        assert_eq!(received, payload);
        Ok(())
    }

    #[tokio::test(start_paused = true)]
    async fn timeout() -> anyhow::Result<()> {
        let (_tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .with_retry_policy(NeverRetry)
            .build()
            .await?;

        // This will timeout because we never send the initial message over `_tx`.
        let target = Duration::from_secs(120);
        let began = tokio::time::Instant::now();
        let err = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .with_attempt_timeout(target)
            .send()
            .await
            .unwrap_err();
        assert!(err.is_timeout(), "{err:?}");
        assert_eq!(began.elapsed(), target);

        Ok(())
    }

    #[tokio::test]
    async fn user_agent() -> anyhow::Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
        tx.send(Ok(response_with(proto_object()))).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object().return_once(|request| {
            let header = request
                .metadata()
                .get(http::header::USER_AGENT.as_str())
                .and_then(|v| v.to_str().ok())
                .expect("user-agent should be set");

            // The configured value must appear as one space-separated token.
            let found = header.split(' ').any(|s| s == USER_AGENT);
            assert!(found, "{header:?}");

            Ok(TonicResponse::from(rx))
        });
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;

        let _descriptor = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .with_user_agent(USER_AGENT)
            .send()
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn quota_project() -> anyhow::Result<()> {
        const PROJECT_NAME: &str = "project_lazy_dog";
        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
        tx.send(Ok(response_with(proto_object()))).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object().return_once(|request| {
            let billed = request
                .metadata()
                .get("x-goog-user-project")
                .and_then(|v| v.to_str().ok())
                .expect("x-goog-user-project should be set");
            assert_eq!(billed, PROJECT_NAME);
            Ok(TonicResponse::from(rx))
        });
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;

        let _descriptor = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .with_quota_project(PROJECT_NAME)
            .send()
            .await?;
        Ok(())
    }

    // A stub that relies entirely on the trait's provided methods; the
    // builder-only tests never send a request through it.
    #[derive(Debug)]
    struct StorageStub;
    impl crate::stub::Storage for StorageStub {}
}