// google_cloud_storage/storage/open_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Result;
16use crate::model_ext::{KeyAes256, OpenObjectRequest, ReadRange};
17use crate::object_descriptor::ObjectDescriptor;
18use crate::read_object::ReadObjectResponse;
19use crate::read_resume_policy::ReadResumePolicy;
20use crate::request_options::RequestOptions;
21use std::sync::Arc;
22use std::time::Duration;
23
/// A request builder for [Storage::open_object][crate::client::Storage::open_object].
///
/// # Example
/// ```
/// use google_cloud_storage::client::Storage;
/// # use google_cloud_storage::builder::storage::OpenObject;
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     let builder: OpenObject = client
///         .open_object("projects/_/buckets/my-bucket", "my-object");
///     let descriptor = builder
///         .set_generation(123)
///         .send()
///         .await?;
///     println!("object metadata={:?}", descriptor.object());
///     // Use `descriptor` to read data from `my-object`.
///     Ok(())
/// }
/// ```
#[derive(Clone, Debug)]
pub struct OpenObject<S = crate::storage::transport::Storage> {
    // The transport (or test stub) that ultimately issues the open request.
    stub: Arc<S>,
    // Accumulates the bucket, object, preconditions, and read ranges set via
    // the builder methods below.
    request: OpenObjectRequest,
    // Per-request overrides for retry, backoff, throttling, resume, and
    // timeout policies.
    options: RequestOptions,
}
48
49impl<S> OpenObject<S>
50where
51 S: crate::storage::stub::Storage + 'static,
52{
53 /// Sends the request, returning a new object descriptor.
54 ///
55 /// Example:
56 /// ```ignore
57 /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
58 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
59 /// let open = client
60 /// .open_object("projects/_/buckets/my-bucket", "my-object")
61 /// .send()
62 /// .await?;
63 /// println!("object metadata={:?}", open.object());
64 /// # Ok(()) }
65 /// ```
66 pub async fn send(self) -> Result<ObjectDescriptor> {
67 let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;
68 Ok(descriptor)
69 }
70
71 /// Sends the request, returning a new object descriptor and reader.
72 ///
73 /// Example:
74 /// ```ignore
75 /// # use google_cloud_storage::client::Storage;
76 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
77 /// use google_cloud_storage::model_ext::ReadRange;
78 /// let (descriptor, mut reader) = client
79 /// .open_object("projects/_/buckets/my-bucket", "my-object.parquet")
80 /// .send_and_read(ReadRange::tail(32))
81 /// .await?;
82 /// println!("object metadata={:?}", descriptor.object());
83 /// let data = reader.next().await.transpose()?;
84 /// # Ok(()) }
85 /// ```
86 ///
87 /// This method allows applications to open an object and issue a read
88 /// request in the same RPC, which is typically faster than opening an
89 /// object and then issuing a `read_range()` call. This may be useful when
90 /// opening objects that have metadata information in a footer or header.
91 pub async fn send_and_read(
92 mut self,
93 range: ReadRange,
94 ) -> Result<(ObjectDescriptor, ReadObjectResponse)> {
95 self.request.ranges.push(range);
96 let (descriptor, mut readers) = self.stub.open_object(self.request, self.options).await?;
97 if readers.len() == 1 {
98 return Ok((descriptor, readers.pop().unwrap()));
99 }
100 // Even if the service returns multiple read ranges, with different ids,
101 // the code in the library will return an error and close the stream.
102 unreachable!("the stub cannot create more readers")
103 }
104}
105
impl<S> OpenObject<S> {
    /// Creates a new builder targeting `object` in `bucket`.
    ///
    /// Called from `Storage::open_object()`, which supplies the stub and the
    /// client-level request options.
    pub(crate) fn new(
        bucket: String,
        object: String,
        stub: Arc<S>,
        options: RequestOptions,
    ) -> Self {
        let request = OpenObjectRequest::default()
            .set_bucket(bucket)
            .set_object(object);
        Self {
            request,
            options,
            stub,
        }
    }

    /// If present, selects a specific revision of this object (as
    /// opposed to the latest version, the default).
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_generation(123456)
    ///     .send()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
        self.request = self.request.set_generation(v.into());
        self
    }

    /// Makes the operation conditional on whether the object's current generation
    /// matches the given value.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_if_generation_match(123456)
    ///     .send()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
    where
        T: Into<i64>,
    {
        self.request = self.request.set_if_generation_match(v.into());
        self
    }

    /// Makes the operation conditional on whether the object's live generation
    /// does not match the given value. If no live object exists, the precondition
    /// fails.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_if_generation_not_match(123456)
    ///     .send()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
    where
        T: Into<i64>,
    {
        self.request = self.request.set_if_generation_not_match(v.into());
        self
    }

    /// Makes the operation conditional on whether the object's current
    /// metageneration matches the given value.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_if_metageneration_match(123456)
    ///     .send()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
    where
        T: Into<i64>,
    {
        self.request = self.request.set_if_metageneration_match(v.into());
        self
    }

    /// Makes the operation conditional on whether the object's current
    /// metageneration does not match the given value.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_if_metageneration_not_match(123456)
    ///     .send()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
    where
        T: Into<i64>,
    {
        self.request = self.request.set_if_metageneration_not_match(v.into());
        self
    }

    /// The encryption key used with the Customer-Supplied Encryption Keys
    /// feature. In raw bytes format (not base64-encoded).
    ///
    /// Example:
    /// ```
    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let key: &[u8] = &[97; 32];
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_key(KeyAes256::new(key)?)
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn set_key(mut self, v: KeyAes256) -> Self {
        self.request = self
            .request
            .set_common_object_request_params(crate::model::CommonObjectRequestParams::from(v));
        self
    }

    /// The retry policy used for this request.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use google_cloud_storage::retry_policy::RetryableErrors;
    /// use std::time::Duration;
    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .with_retry_policy(
    ///         RetryableErrors
    ///             .with_attempt_limit(5)
    ///             .with_time_limit(Duration::from_secs(10)),
    ///     )
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.retry_policy = v.into().into();
        self
    }

    /// The backoff policy used for this request.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use std::time::Duration;
    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .with_backoff_policy(ExponentialBackoff::default())
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.backoff_policy = v.into().into();
        self
    }

    /// The retry throttler used for this request.
    ///
    /// Most of the time you want to use the same throttler for all the requests
    /// in a client, and even the same throttler for many clients. Rarely it
    /// may be necessary to use a custom throttler for some subset of the
    /// requests.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .with_retry_throttler(adhoc_throttler())
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
    ///     # panic!();
    /// }
    /// # Ok(()) }
    /// ```
    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.options.retry_throttler = v.into().into();
        self
    }

    /// Configure the resume policy for read requests.
    ///
    /// The Cloud Storage client library can automatically resume a read that is
    /// interrupted by a transient error. Applications may want to limit the
    /// number of read attempts, or may wish to expand the type of errors
    /// treated as retryable.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
    ///     .send()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
    where
        V: ReadResumePolicy + 'static,
    {
        self.options.set_read_resume_policy(std::sync::Arc::new(v));
        self
    }

    /// Configure per-attempt timeout.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// use std::time::Duration;
    /// let response = client
    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
    ///     .with_attempt_timeout(Duration::from_secs(120))
    ///     .send()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    ///
    /// The Cloud Storage client library times out `open_object()` attempts by
    /// default (with a 60s timeout). Applications may want to set a different
    /// value depending on how they are deployed.
    ///
    /// Note that the per-attempt timeout is subject to the overall retry loop
    /// time limits (if any). The effective timeout for each attempt is the
    /// smallest of (a) the per-attempt timeout, and (b) the remaining time in
    /// the retry loop.
    pub fn with_attempt_timeout(mut self, v: Duration) -> Self {
        self.options.set_bidi_attempt_timeout(v);
        self
    }
}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::Storage;
    use crate::model::{CommonObjectRequestParams, Object};
    use crate::model_ext::tests::create_key_helper;
    use anyhow::Result;
    use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use google_cloud_gax::retry_policy::NeverRetry;
    use http::HeaderValue;
    use static_assertions::assert_impl_all;
    use storage_grpc_mock::google::storage::v2::{
        BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
        ReadRange as ProtoRange,
    };
    use storage_grpc_mock::{MockStorage, start};

    // Verify `open_object()` meets normal Send, Sync, requirements.
    #[tokio::test]
    async fn traits() -> Result<()> {
        assert_impl_all!(OpenObject: Clone, std::fmt::Debug);
        assert_impl_all!(OpenObject: Send, Sync);

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        // Helper functions whose bounds force the compiler to prove the
        // builder and its future satisfy `Send` / `'static`.
        fn need_send<T: Send>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let open = client.open_object("projects/_/buckets/test-bucket", "test-object");
        need_static(&open);

        let fut = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send();
        need_send(&fut);
        need_static(&fut);
        Ok(())
    }

    // `send()` should surface the object metadata carried by the initial
    // `BidiReadObjectResponse` from the (mocked) service.
    #[tokio::test]
    async fn open_object_normal() -> Result<()> {
        const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";

        // Queue the initial response before the client connects; the mock
        // streams whatever is in the channel.
        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: BUCKET_NAME.to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                size: 42,
                ..ProtoObject::default()
            }),
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial.clone())).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let descriptor = client
            .open_object(BUCKET_NAME, "test-object")
            .send()
            .await?;

        let got = descriptor.object();
        let want = Object::new()
            .set_bucket(BUCKET_NAME)
            .set_name("test-object")
            .set_generation(123456)
            .set_size(42);
        assert_eq!(got, want);

        Ok(())
    }

    // Each setter should forward its value into the underlying request.
    #[tokio::test]
    async fn attributes() -> Result<()> {
        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options,
        )
        .set_generation(123)
        .set_if_generation_match(234)
        .set_if_generation_not_match(345)
        .set_if_metageneration_match(456)
        .set_if_metageneration_not_match(567);
        let want = OpenObjectRequest::default()
            .set_bucket("bucket")
            .set_object("object")
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        assert_eq!(builder.request, want);
        Ok(())
    }

    // `set_key()` should populate `common_object_request_params` with the
    // customer-supplied encryption key.
    #[tokio::test]
    async fn csek() -> Result<()> {
        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options,
        );

        let (raw_key, _, _, _) = create_key_helper();
        let key = KeyAes256::new(&raw_key)?;
        let builder = builder.set_key(key.clone());
        let want = OpenObjectRequest::default()
            .set_bucket("bucket")
            .set_object("object")
            .set_common_object_request_params(CommonObjectRequestParams::from(key));
        assert_eq!(builder.request, want);
        Ok(())
    }

    // The `with_*` methods should override the corresponding request options.
    // The policies do not implement `PartialEq`, so compare debug output.
    #[tokio::test]
    async fn request_options() -> Result<()> {
        use crate::read_resume_policy::NeverResume;
        use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
        use google_cloud_gax::retry_policy::Aip194Strict;
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options.clone(),
        )
        .with_backoff_policy(
            ExponentialBackoffBuilder::default()
                .with_scaling(4.0)
                .build()
                .expect("expontial backoff builds"),
        )
        .with_retry_policy(Aip194Strict)
        .with_retry_throttler(CircuitBreaker::default())
        .with_read_resume_policy(NeverResume)
        .with_attempt_timeout(Duration::from_secs(120));

        let got = builder.options;
        assert!(
            format!("{:?}", got.backoff_policy).contains("ExponentialBackoff"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_policy).contains("Aip194Strict"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_throttler).contains("CircuitBreaker"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.read_resume_policy()).contains("NeverResume"),
            "{got:?}"
        );
        assert_eq!(
            got.bidi_attempt_timeout,
            Duration::from_secs(120),
            "{got:?}"
        );

        Ok(())
    }

    // End-to-end `send()` over the mock gRPC server: metadata and response
    // headers should both be exposed on the descriptor.
    #[tokio::test]
    async fn send() -> anyhow::Result<()> {
        use storage_grpc_mock::google::storage::v2::{
            BidiReadObjectResponse, Object as ProtoObject,
        };
        use storage_grpc_mock::{MockStorage, start};

        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: "projects/_/buckets/test-bucket".to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                ..ProtoObject::default()
            }),
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial.clone())).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint.clone())
            .build()
            .await?;

        let descriptor = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let want = Object::new()
            .set_bucket("projects/_/buckets/test-bucket")
            .set_name("test-object")
            .set_generation(123456);
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );
        Ok(())
    }

    // `send_and_read()` should return both the descriptor and a reader that
    // yields the range data from the same initial response.
    #[tokio::test]
    async fn send_and_read() -> anyhow::Result<()> {
        use storage_grpc_mock::{MockStorage, start};

        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
        let payload = Vec::from_iter((0..32).map(|i| i as u8));
        // The initial response carries both the metadata and a complete
        // (range_end=true) data range for read_id 0.
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: "projects/_/buckets/test-bucket".to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                ..ProtoObject::default()
            }),
            object_data_ranges: vec![ObjectRangeData {
                read_range: Some(ProtoRange {
                    read_id: 0_i64,
                    ..ProtoRange::default()
                }),
                range_end: true,
                checksummed_data: Some(ChecksummedData {
                    content: payload.clone(),
                    crc32c: None,
                }),
            }],
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial.clone())).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint.clone())
            .build()
            .await?;

        let (descriptor, mut reader) = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send_and_read(ReadRange::tail(32))
            .await?;
        let want = Object::new()
            .set_bucket("projects/_/buckets/test-bucket")
            .set_name("test-object")
            .set_generation(123456);
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );

        // Drain the reader and verify it reproduces the mocked payload.
        let mut got_payload = Vec::new();
        while let Some(chunk) = reader.next().await.transpose()? {
            got_payload.extend_from_slice(&chunk);
        }
        assert_eq!(got_payload, payload);
        Ok(())
    }

    // With paused time, the attempt timeout should fire after exactly the
    // configured duration when the server never responds.
    #[tokio::test(start_paused = true)]
    async fn timeout() -> anyhow::Result<()> {
        use storage_grpc_mock::google::storage::v2::BidiReadObjectResponse;
        use storage_grpc_mock::{MockStorage, start};

        let (_tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint.clone())
            .with_retry_policy(NeverRetry)
            .build()
            .await?;

        // This will timeout because we never send the initial message over `_tx`.
        let target = Duration::from_secs(120);
        let start = tokio::time::Instant::now();
        let err = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .with_attempt_timeout(target)
            .send()
            .await
            .unwrap_err();
        assert!(err.is_timeout(), "{err:?}");
        assert_eq!(start.elapsed(), target);

        Ok(())
    }

    // A do-nothing stub for tests that only inspect builder state and never
    // actually send a request.
    #[derive(Debug)]
    struct StorageStub;
    impl crate::stub::Storage for StorageStub {}
}