google_cloud_storage/storage/
open_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Result;
16use crate::model_ext::{KeyAes256, OpenObjectRequest, ReadRange};
17use crate::object_descriptor::ObjectDescriptor;
18use crate::read_object::ReadObjectResponse;
19use crate::read_resume_policy::ReadResumePolicy;
20use crate::request_options::RequestOptions;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// A request builder for [Storage::open_object][crate::client::Storage::open_object].
25///
26/// # Example
27/// ```
28/// use google_cloud_storage::client::Storage;
29/// # use google_cloud_storage::builder::storage::OpenObject;
30/// async fn sample(client: &Storage) -> anyhow::Result<()> {
31///     let builder: OpenObject = client
32///         .open_object("projects/_/buckets/my-bucket", "my-object");
33///     let descriptor = builder
34///         .set_generation(123)
35///         .send()
36///         .await?;
37///     println!("object metadata={:?}", descriptor.object());
38///     // Use `descriptor` to read data from `my-object`.
39///     Ok(())
40/// }
41/// ```
42#[derive(Clone, Debug)]
43pub struct OpenObject<S = crate::storage::transport::Storage> {
44    stub: Arc<S>,
45    request: OpenObjectRequest,
46    options: RequestOptions,
47}
48
49impl<S> OpenObject<S>
50where
51    S: crate::storage::stub::Storage + 'static,
52{
53    /// Sends the request, returning a new object descriptor.
54    ///
55    /// Example:
56    /// ```ignore
57    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
58    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
59    /// let open = client
60    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
61    ///     .send()
62    ///     .await?;
63    /// println!("object metadata={:?}", open.object());
64    /// # Ok(()) }
65    /// ```
66    pub async fn send(self) -> Result<ObjectDescriptor> {
67        let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;
68        Ok(descriptor)
69    }
70
71    /// Sends the request, returning a new object descriptor and reader.
72    ///
73    /// Example:
74    /// ```ignore
75    /// # use google_cloud_storage::client::Storage;
76    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
77    /// use google_cloud_storage::model_ext::ReadRange;
78    /// let (descriptor, mut reader) = client
79    ///     .open_object("projects/_/buckets/my-bucket", "my-object.parquet")
80    ///     .send_and_read(ReadRange::tail(32))
81    ///     .await?;
82    /// println!("object metadata={:?}", descriptor.object());
83    /// let data = reader.next().await.transpose()?;
84    /// # Ok(()) }
85    /// ```
86    ///
87    /// This method allows applications to open an object and issue a read
88    /// request in the same RPC, which is typically faster than opening an
89    /// object and then issuing a `read_range()` call. This may be useful when
90    /// opening objects that have metadata information in a footer or header.
91    pub async fn send_and_read(
92        mut self,
93        range: ReadRange,
94    ) -> Result<(ObjectDescriptor, ReadObjectResponse)> {
95        self.request.ranges.push(range);
96        let (descriptor, mut readers) = self.stub.open_object(self.request, self.options).await?;
97        if readers.len() == 1 {
98            return Ok((descriptor, readers.pop().unwrap()));
99        }
100        // Even if the service returns multiple read ranges, with different ids,
101        // the code in the library will return an error and close the stream.
102        unreachable!("the stub cannot create more readers")
103    }
104}
105
106impl<S> OpenObject<S> {
107    pub(crate) fn new(
108        bucket: String,
109        object: String,
110        stub: Arc<S>,
111        options: RequestOptions,
112    ) -> Self {
113        let request = OpenObjectRequest::default()
114            .set_bucket(bucket)
115            .set_object(object);
116        Self {
117            request,
118            options,
119            stub,
120        }
121    }
122
123    /// If present, selects a specific revision of this object (as
124    /// opposed to the latest version, the default).
125    ///
126    /// # Example
127    /// ```
128    /// # use google_cloud_storage::client::Storage;
129    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
130    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
131    /// let response = client
132    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
133    ///     .set_generation(123456)
134    ///     .send()
135    ///     .await?;
136    /// # Ok(()) }
137    /// ```
138    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
139        self.request = self.request.set_generation(v.into());
140        self
141    }
142
143    /// Makes the operation conditional on whether the object's current generation
144    /// matches the given value.
145    ///
146    /// # Example
147    /// ```
148    /// # use google_cloud_storage::client::Storage;
149    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
150    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
151    /// let response = client
152    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
153    ///     .set_if_generation_match(123456)
154    ///     .send()
155    ///     .await?;
156    /// # Ok(()) }
157    /// ```
158    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
159    where
160        T: Into<i64>,
161    {
162        self.request = self.request.set_if_generation_match(v.into());
163        self
164    }
165
166    /// Makes the operation conditional on whether the object's live generation
167    /// does not match the given value. If no live object exists, the precondition
168    /// fails.
169    ///
170    /// # Example
171    /// ```
172    /// # use google_cloud_storage::client::Storage;
173    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
174    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
175    /// let response = client
176    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
177    ///     .set_if_generation_not_match(123456)
178    ///     .send()
179    ///     .await?;
180    /// # Ok(()) }
181    /// ```
182    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
183    where
184        T: Into<i64>,
185    {
186        self.request = self.request.set_if_generation_not_match(v.into());
187        self
188    }
189
190    /// Makes the operation conditional on whether the object's current
191    /// metageneration matches the given value.
192    ///
193    /// # Example
194    /// ```
195    /// # use google_cloud_storage::client::Storage;
196    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
197    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
198    /// let response = client
199    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
200    ///     .set_if_metageneration_match(123456)
201    ///     .send()
202    ///     .await?;
203    /// # Ok(()) }
204    /// ```
205    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
206    where
207        T: Into<i64>,
208    {
209        self.request = self.request.set_if_metageneration_match(v.into());
210        self
211    }
212
213    /// Makes the operation conditional on whether the object's current
214    /// metageneration does not match the given value.
215    ///
216    /// # Example
217    /// ```
218    /// # use google_cloud_storage::client::Storage;
219    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
220    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
221    /// let response = client
222    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
223    ///     .set_if_metageneration_not_match(123456)
224    ///     .send()
225    ///     .await?;
226    /// # Ok(()) }
227    /// ```
228    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
229    where
230        T: Into<i64>,
231    {
232        self.request = self.request.set_if_metageneration_not_match(v.into());
233        self
234    }
235
236    /// The encryption key used with the Customer-Supplied Encryption Keys
237    /// feature. In raw bytes format (not base64-encoded).
238    ///
239    /// Example:
240    /// ```
241    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
242    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
243    /// let key: &[u8] = &[97; 32];
244    /// let response = client
245    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
246    ///     .set_key(KeyAes256::new(key)?)
247    ///     .send()
248    ///     .await?;
249    /// println!("response details={response:?}");
250    /// # Ok(()) }
251    /// ```
252    pub fn set_key(mut self, v: KeyAes256) -> Self {
253        self.request = self
254            .request
255            .set_common_object_request_params(crate::model::CommonObjectRequestParams::from(v));
256        self
257    }
258
259    /// The retry policy used for this request.
260    ///
261    /// # Example
262    /// ```
263    /// # use google_cloud_storage::client::Storage;
264    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
265    /// use google_cloud_storage::retry_policy::RetryableErrors;
266    /// use std::time::Duration;
267    /// use gax::retry_policy::RetryPolicyExt;
268    /// let response = client
269    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
270    ///     .with_retry_policy(
271    ///         RetryableErrors
272    ///             .with_attempt_limit(5)
273    ///             .with_time_limit(Duration::from_secs(10)),
274    ///     )
275    ///     .send()
276    ///     .await?;
277    /// println!("response details={response:?}");
278    /// # Ok(()) }
279    /// ```
280    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
281        self.options.retry_policy = v.into().into();
282        self
283    }
284
285    /// The backoff policy used for this request.
286    ///
287    /// # Example
288    /// ```
289    /// # use google_cloud_storage::client::Storage;
290    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
291    /// use std::time::Duration;
292    /// use gax::exponential_backoff::ExponentialBackoff;
293    /// let response = client
294    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
295    ///     .with_backoff_policy(ExponentialBackoff::default())
296    ///     .send()
297    ///     .await?;
298    /// println!("response details={response:?}");
299    /// # Ok(()) }
300    /// ```
301    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
302        mut self,
303        v: V,
304    ) -> Self {
305        self.options.backoff_policy = v.into().into();
306        self
307    }
308
309    /// The retry throttler used for this request.
310    ///
311    /// Most of the time you want to use the same throttler for all the requests
312    /// in a client, and even the same throttler for many clients. Rarely it
313    /// may be necessary to use an custom throttler for some subset of the
314    /// requests.
315    ///
316    /// # Example
317    /// ```
318    /// # use google_cloud_storage::client::Storage;
319    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
320    /// let response = client
321    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
322    ///     .with_retry_throttler(adhoc_throttler())
323    ///     .send()
324    ///     .await?;
325    /// println!("response details={response:?}");
326    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
327    ///     # panic!();
328    /// }
329    /// # Ok(()) }
330    /// ```
331    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
332        mut self,
333        v: V,
334    ) -> Self {
335        self.options.retry_throttler = v.into().into();
336        self
337    }
338
339    /// Configure the resume policy for read requests.
340    ///
341    /// The Cloud Storage client library can automatically resume a read that is
342    /// interrupted by a transient error. Applications may want to limit the
343    /// number of read attempts, or may wish to expand the type of errors
344    /// treated as retryable.
345    ///
346    /// # Example
347    /// ```
348    /// # use google_cloud_storage::client::Storage;
349    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
350    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
351    /// let response = client
352    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
353    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
354    ///     .send()
355    ///     .await?;
356    /// # Ok(()) }
357    /// ```
358    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
359    where
360        V: ReadResumePolicy + 'static,
361    {
362        self.options.set_read_resume_policy(std::sync::Arc::new(v));
363        self
364    }
365
366    /// Configure per-attempt timeout.
367    ///
368    /// # Example
369    /// ```
370    /// # use google_cloud_storage::client::Storage;
371    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
372    /// use std::time::Duration;
373    /// let response = client
374    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
375    ///     .with_attempt_timeout(Duration::from_secs(120))
376    ///     .send()
377    ///     .await?;
378    /// # Ok(()) }
379    /// ```
380    ///
381    /// The Cloud Storage client library times out `open_object()` attempts by
382    /// default (with a 60s timeout). Applications may want to set a different
383    /// value depending on how they are deployed.
384    ///
385    /// Note that the per-attempt timeout is subject to the overall retry loop
386    /// time limits (if any). The effective timeout for each attempt is the
387    /// smallest of (a) the per-attempt timeout, and (b) the remaining time in
388    /// the retry loop.
389    pub fn with_attempt_timeout(mut self, v: Duration) -> Self {
390        self.options.set_bidi_attempt_timeout(v);
391        self
392    }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::Storage;
    use crate::model::{CommonObjectRequestParams, Object};
    use crate::model_ext::tests::create_key_helper;
    use anyhow::Result;
    use gax::retry_policy::NeverRetry;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use http::HeaderValue;
    use static_assertions::assert_impl_all;
    use storage_grpc_mock::google::storage::v2::{
        BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
        ReadRange as ProtoRange,
    };
    use storage_grpc_mock::{MockStorage, start};

    // Verify the `OpenObject` builder, and the future returned by `send()`,
    // meet the usual `Clone`, `Debug`, `Send`, `Sync`, and `'static`
    // requirements.
    #[tokio::test]
    async fn traits() -> Result<()> {
        assert_impl_all!(OpenObject: Clone, std::fmt::Debug);
        assert_impl_all!(OpenObject: Send, Sync);

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        fn need_send<T: Send>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let open = client.open_object("projects/_/buckets/test-bucket", "test-object");
        need_static(&open);

        let fut = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send();
        need_send(&fut);
        need_static(&fut);
        Ok(())
    }

    // Open an object against the mock service and verify the metadata in the
    // first response is exposed by the descriptor.
    #[tokio::test]
    async fn open_object_normal() -> Result<()> {
        const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";

        let (tx, rx) = tokio::sync::mpsc::channel::<tonic::Result<BidiReadObjectResponse>>(1);
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: BUCKET_NAME.to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                size: 42,
                ..ProtoObject::default()
            }),
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(tonic::Response::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let descriptor = client
            .open_object(BUCKET_NAME, "test-object")
            .send()
            .await?;

        let got = descriptor.object();
        let want = Object::new()
            .set_bucket(BUCKET_NAME)
            .set_name("test-object")
            .set_generation(123456)
            .set_size(42);
        assert_eq!(got, want);

        Ok(())
    }

    // Verify the generation and precondition setters update the request.
    #[tokio::test]
    async fn attributes() -> Result<()> {
        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options,
        )
        .set_generation(123)
        .set_if_generation_match(234)
        .set_if_generation_not_match(345)
        .set_if_metageneration_match(456)
        .set_if_metageneration_not_match(567);
        let want = OpenObjectRequest::default()
            .set_bucket("bucket")
            .set_object("object")
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        assert_eq!(builder.request, want);
        Ok(())
    }

    // Verify `set_key()` stores the key in the common object request params.
    #[tokio::test]
    async fn csek() -> Result<()> {
        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options,
        );

        let (raw_key, _, _, _) = create_key_helper();
        let key = KeyAes256::new(&raw_key)?;
        let builder = builder.set_key(key.clone());
        let want = OpenObjectRequest::default()
            .set_bucket("bucket")
            .set_object("object")
            .set_common_object_request_params(CommonObjectRequestParams::from(key));
        assert_eq!(builder.request, want);
        Ok(())
    }

    // Verify each `with_*()` method updates the corresponding request option.
    #[tokio::test]
    async fn request_options() -> Result<()> {
        use crate::read_resume_policy::NeverResume;
        use gax::exponential_backoff::ExponentialBackoffBuilder;
        use gax::retry_policy::Aip194Strict;
        use gax::retry_throttler::CircuitBreaker;

        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options.clone(),
        )
        .with_backoff_policy(
            ExponentialBackoffBuilder::default()
                .with_scaling(4.0)
                .build()
                .expect("exponential backoff builds"),
        )
        .with_retry_policy(Aip194Strict)
        .with_retry_throttler(CircuitBreaker::default())
        .with_read_resume_policy(NeverResume)
        .with_attempt_timeout(Duration::from_secs(120));

        let got = builder.options;
        assert!(
            format!("{:?}", got.backoff_policy).contains("ExponentialBackoff"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_policy).contains("Aip194Strict"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_throttler).contains("CircuitBreaker"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.read_resume_policy()).contains("NeverResume"),
            "{got:?}"
        );
        assert_eq!(
            got.bidi_attempt_timeout,
            Duration::from_secs(120),
            "{got:?}"
        );

        Ok(())
    }

    // Verify `send()` returns the object metadata and the response headers.
    #[tokio::test]
    async fn send() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<tonic::Result<BidiReadObjectResponse>>(1);
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: "projects/_/buckets/test-bucket".to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                ..ProtoObject::default()
            }),
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(tonic::Response::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;

        let descriptor = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let want = Object::new()
            .set_bucket("projects/_/buckets/test-bucket")
            .set_name("test-object")
            .set_generation(123456);
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );
        Ok(())
    }

    // Verify `send_and_read()` returns the descriptor and a reader that
    // yields the data included in the first response.
    #[tokio::test]
    async fn send_and_read() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<tonic::Result<BidiReadObjectResponse>>(1);
        let payload: Vec<u8> = (0..32).collect();
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: "projects/_/buckets/test-bucket".to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                ..ProtoObject::default()
            }),
            object_data_ranges: vec![ObjectRangeData {
                read_range: Some(ProtoRange {
                    read_id: 0_i64,
                    ..ProtoRange::default()
                }),
                range_end: true,
                checksummed_data: Some(ChecksummedData {
                    content: payload.clone(),
                    crc32c: None,
                }),
            }],
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(tonic::Response::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;

        let (descriptor, mut reader) = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send_and_read(ReadRange::tail(32))
            .await?;
        let want = Object::new()
            .set_bucket("projects/_/buckets/test-bucket")
            .set_name("test-object")
            .set_generation(123456);
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );

        let mut got_payload = Vec::new();
        while let Some(chunk) = reader.next().await.transpose()? {
            got_payload.extend_from_slice(&chunk);
        }
        assert_eq!(got_payload, payload);
        Ok(())
    }

    // Verify the per-attempt timeout expires when the service never sends
    // the first response.
    #[tokio::test(start_paused = true)]
    async fn timeout() -> Result<()> {
        // Keep `_tx` alive so the response stream stays open, but silent.
        let (_tx, rx) = tokio::sync::mpsc::channel::<tonic::Result<BidiReadObjectResponse>>(1);

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(tonic::Response::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .with_retry_policy(NeverRetry)
            .build()
            .await?;

        // This will timeout because we never send the initial message over `_tx`.
        let target = Duration::from_secs(120);
        let began = tokio::time::Instant::now();
        let err = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .with_attempt_timeout(target)
            .send()
            .await
            .unwrap_err();
        assert!(err.is_timeout(), "{err:?}");
        assert_eq!(began.elapsed(), target);

        Ok(())
    }

    // A stub relying on the trait's default methods. The tests that use it
    // only inspect the builder's fields and never send a request.
    #[derive(Debug)]
    struct StorageStub;
    impl crate::stub::Storage for StorageStub {}
}