Skip to main content

google_cloud_storage/storage/
open_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Result;
16use crate::model_ext::{KeyAes256, OpenObjectRequest, ReadRange};
17use crate::object_descriptor::ObjectDescriptor;
18use crate::read_object::ReadObjectResponse;
19use crate::read_resume_policy::ReadResumePolicy;
20use crate::request_options::RequestOptions;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// A request builder for [Storage::open_object][crate::client::Storage::open_object].
25///
26/// # Example
27/// ```
28/// use google_cloud_storage::client::Storage;
29/// # use google_cloud_storage::builder::storage::OpenObject;
30/// async fn sample(client: &Storage) -> anyhow::Result<()> {
31///     let builder: OpenObject = client
32///         .open_object("projects/_/buckets/my-bucket", "my-object");
33///     let descriptor = builder
34///         .set_generation(123)
35///         .send()
36///         .await?;
37///     println!("object metadata={:?}", descriptor.object());
38///     // Use `descriptor` to read data from `my-object`.
39///     Ok(())
40/// }
41/// ```
42#[derive(Clone, Debug)]
43pub struct OpenObject<S = crate::storage::transport::Storage> {
44    stub: Arc<S>,
45    request: OpenObjectRequest,
46    options: RequestOptions,
47}
48
49impl<S> OpenObject<S>
50where
51    S: crate::storage::stub::Storage + 'static,
52{
53    /// Sends the request, returning a new object descriptor.
54    ///
55    /// Example:
56    /// ```ignore
57    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
58    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
59    /// let open = client
60    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
61    ///     .send()
62    ///     .await?;
63    /// println!("object metadata={:?}", open.object());
64    /// # Ok(()) }
65    /// ```
66    pub async fn send(self) -> Result<ObjectDescriptor> {
67        let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;
68        Ok(descriptor)
69    }
70
71    /// Sends the request, returning a new object descriptor and reader.
72    ///
73    /// Example:
74    /// ```ignore
75    /// # use google_cloud_storage::client::Storage;
76    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
77    /// use google_cloud_storage::model_ext::ReadRange;
78    /// let (descriptor, mut reader) = client
79    ///     .open_object("projects/_/buckets/my-bucket", "my-object.parquet")
80    ///     .send_and_read(ReadRange::tail(32))
81    ///     .await?;
82    /// println!("object metadata={:?}", descriptor.object());
83    /// let data = reader.next().await.transpose()?;
84    /// # Ok(()) }
85    /// ```
86    ///
87    /// This method allows applications to open an object and issue a read
88    /// request in the same RPC, which is typically faster than opening an
89    /// object and then issuing a `read_range()` call. This may be useful when
90    /// opening objects that have metadata information in a footer or header.
91    pub async fn send_and_read(
92        mut self,
93        range: ReadRange,
94    ) -> Result<(ObjectDescriptor, ReadObjectResponse)> {
95        self.request.ranges.push(range);
96        let (descriptor, mut readers) = self.stub.open_object(self.request, self.options).await?;
97        if readers.len() == 1 {
98            return Ok((descriptor, readers.pop().unwrap()));
99        }
100        // Even if the service returns multiple read ranges, with different ids,
101        // the code in the library will return an error and close the stream.
102        unreachable!("the stub cannot create more readers")
103    }
104}
105
106impl<S> OpenObject<S> {
107    pub(crate) fn new(
108        bucket: String,
109        object: String,
110        stub: Arc<S>,
111        options: RequestOptions,
112    ) -> Self {
113        let request = OpenObjectRequest::default()
114            .set_bucket(bucket)
115            .set_object(object);
116        Self {
117            request,
118            options,
119            stub,
120        }
121    }
122
123    /// If present, selects a specific revision of this object (as
124    /// opposed to the latest version, the default).
125    ///
126    /// # Example
127    /// ```
128    /// # use google_cloud_storage::client::Storage;
129    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
130    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
131    /// let response = client
132    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
133    ///     .set_generation(123456)
134    ///     .send()
135    ///     .await?;
136    /// # Ok(()) }
137    /// ```
138    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
139        self.request = self.request.set_generation(v.into());
140        self
141    }
142
143    /// Makes the operation conditional on whether the object's current generation
144    /// matches the given value.
145    ///
146    /// # Example
147    /// ```
148    /// # use google_cloud_storage::client::Storage;
149    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
150    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
151    /// let response = client
152    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
153    ///     .set_if_generation_match(123456)
154    ///     .send()
155    ///     .await?;
156    /// # Ok(()) }
157    /// ```
158    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
159    where
160        T: Into<i64>,
161    {
162        self.request = self.request.set_if_generation_match(v.into());
163        self
164    }
165
166    /// Makes the operation conditional on whether the object's live generation
167    /// does not match the given value. If no live object exists, the precondition
168    /// fails.
169    ///
170    /// # Example
171    /// ```
172    /// # use google_cloud_storage::client::Storage;
173    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
174    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
175    /// let response = client
176    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
177    ///     .set_if_generation_not_match(123456)
178    ///     .send()
179    ///     .await?;
180    /// # Ok(()) }
181    /// ```
182    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
183    where
184        T: Into<i64>,
185    {
186        self.request = self.request.set_if_generation_not_match(v.into());
187        self
188    }
189
190    /// Makes the operation conditional on whether the object's current
191    /// metageneration matches the given value.
192    ///
193    /// # Example
194    /// ```
195    /// # use google_cloud_storage::client::Storage;
196    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
197    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
198    /// let response = client
199    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
200    ///     .set_if_metageneration_match(123456)
201    ///     .send()
202    ///     .await?;
203    /// # Ok(()) }
204    /// ```
205    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
206    where
207        T: Into<i64>,
208    {
209        self.request = self.request.set_if_metageneration_match(v.into());
210        self
211    }
212
213    /// Makes the operation conditional on whether the object's current
214    /// metageneration does not match the given value.
215    ///
216    /// # Example
217    /// ```
218    /// # use google_cloud_storage::client::Storage;
219    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
220    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
221    /// let response = client
222    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
223    ///     .set_if_metageneration_not_match(123456)
224    ///     .send()
225    ///     .await?;
226    /// # Ok(()) }
227    /// ```
228    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
229    where
230        T: Into<i64>,
231    {
232        self.request = self.request.set_if_metageneration_not_match(v.into());
233        self
234    }
235
236    /// The encryption key used with the Customer-Supplied Encryption Keys
237    /// feature. In raw bytes format (not base64-encoded).
238    ///
239    /// Example:
240    /// ```
241    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
242    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
243    /// let key: &[u8] = &[97; 32];
244    /// let response = client
245    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
246    ///     .set_key(KeyAes256::new(key)?)
247    ///     .send()
248    ///     .await?;
249    /// println!("response details={response:?}");
250    /// # Ok(()) }
251    /// ```
252    pub fn set_key(mut self, v: KeyAes256) -> Self {
253        self.request = self
254            .request
255            .set_common_object_request_params(crate::model::CommonObjectRequestParams::from(v));
256        self
257    }
258
259    /// The retry policy used for this request.
260    ///
261    /// # Example
262    /// ```
263    /// # use google_cloud_storage::client::Storage;
264    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
265    /// use google_cloud_storage::retry_policy::RetryableErrors;
266    /// use std::time::Duration;
267    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
268    /// let response = client
269    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
270    ///     .with_retry_policy(
271    ///         RetryableErrors
272    ///             .with_attempt_limit(5)
273    ///             .with_time_limit(Duration::from_secs(10)),
274    ///     )
275    ///     .send()
276    ///     .await?;
277    /// println!("response details={response:?}");
278    /// # Ok(()) }
279    /// ```
280    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
281        mut self,
282        v: V,
283    ) -> Self {
284        self.options.retry_policy = v.into().into();
285        self
286    }
287
288    /// The backoff policy used for this request.
289    ///
290    /// # Example
291    /// ```
292    /// # use google_cloud_storage::client::Storage;
293    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
294    /// use std::time::Duration;
295    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
296    /// let response = client
297    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
298    ///     .with_backoff_policy(ExponentialBackoff::default())
299    ///     .send()
300    ///     .await?;
301    /// println!("response details={response:?}");
302    /// # Ok(()) }
303    /// ```
304    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
305        mut self,
306        v: V,
307    ) -> Self {
308        self.options.backoff_policy = v.into().into();
309        self
310    }
311
312    /// The retry throttler used for this request.
313    ///
314    /// Most of the time you want to use the same throttler for all the requests
315    /// in a client, and even the same throttler for many clients. Rarely it
316    /// may be necessary to use an custom throttler for some subset of the
317    /// requests.
318    ///
319    /// # Example
320    /// ```
321    /// # use google_cloud_storage::client::Storage;
322    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
323    /// let response = client
324    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
325    ///     .with_retry_throttler(adhoc_throttler())
326    ///     .send()
327    ///     .await?;
328    /// println!("response details={response:?}");
329    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
330    ///     # panic!();
331    /// }
332    /// # Ok(()) }
333    /// ```
334    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
335        mut self,
336        v: V,
337    ) -> Self {
338        self.options.retry_throttler = v.into().into();
339        self
340    }
341
342    /// Configure the resume policy for read requests.
343    ///
344    /// The Cloud Storage client library can automatically resume a read that is
345    /// interrupted by a transient error. Applications may want to limit the
346    /// number of read attempts, or may wish to expand the type of errors
347    /// treated as retryable.
348    ///
349    /// # Example
350    /// ```
351    /// # use google_cloud_storage::client::Storage;
352    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
353    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
354    /// let response = client
355    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
356    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
357    ///     .send()
358    ///     .await?;
359    /// # Ok(()) }
360    /// ```
361    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
362    where
363        V: ReadResumePolicy + 'static,
364    {
365        self.options.set_read_resume_policy(std::sync::Arc::new(v));
366        self
367    }
368
369    /// Configure per-attempt timeout.
370    ///
371    /// # Example
372    /// ```
373    /// # use google_cloud_storage::client::Storage;
374    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
375    /// use std::time::Duration;
376    /// let response = client
377    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
378    ///     .with_attempt_timeout(Duration::from_secs(120))
379    ///     .send()
380    ///     .await?;
381    /// # Ok(()) }
382    /// ```
383    ///
384    /// The Cloud Storage client library times out `open_object()` attempts by
385    /// default (with a 60s timeout). Applications may want to set a different
386    /// value depending on how they are deployed.
387    ///
388    /// Note that the per-attempt timeout is subject to the overall retry loop
389    /// time limits (if any). The effective timeout for each attempt is the
390    /// smallest of (a) the per-attempt timeout, and (b) the remaining time in
391    /// the retry loop.
392    pub fn with_attempt_timeout(mut self, v: Duration) -> Self {
393        self.options.set_bidi_attempt_timeout(v);
394        self
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401    use crate::client::Storage;
402    use crate::model::{CommonObjectRequestParams, Object};
403    use crate::model_ext::tests::create_key_helper;
404    use anyhow::Result;
405    use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
406    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
407    use google_cloud_gax::retry_policy::NeverRetry;
408    use http::HeaderValue;
409    use static_assertions::assert_impl_all;
410    use storage_grpc_mock::google::storage::v2::{
411        BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
412        ReadRange as ProtoRange,
413    };
414    use storage_grpc_mock::{MockStorage, start};
415
416    // Verify `open_object()` meets normal Send, Sync, requirements.
417    #[tokio::test]
418    async fn traits() -> Result<()> {
419        assert_impl_all!(OpenObject: Clone, std::fmt::Debug);
420        assert_impl_all!(OpenObject: Send, Sync);
421
422        let client = Storage::builder()
423            .with_credentials(Anonymous::new().build())
424            .build()
425            .await?;
426
427        fn need_send<T: Send>(_val: &T) {}
428        fn need_static<T: 'static>(_val: &T) {}
429
430        let open = client.open_object("projects/_/buckets/test-bucket", "test-object");
431        need_static(&open);
432
433        let fut = client
434            .open_object("projects/_/buckets/test-bucket", "test-object")
435            .send();
436        need_send(&fut);
437        need_static(&fut);
438        Ok(())
439    }
440
441    #[tokio::test]
442    async fn open_object_normal() -> Result<()> {
443        const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";
444
445        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
446        let initial = BidiReadObjectResponse {
447            metadata: Some(ProtoObject {
448                bucket: BUCKET_NAME.to_string(),
449                name: "test-object".to_string(),
450                generation: 123456,
451                size: 42,
452                ..ProtoObject::default()
453            }),
454            ..BidiReadObjectResponse::default()
455        };
456        tx.send(Ok(initial.clone())).await?;
457
458        let mut mock = MockStorage::new();
459        mock.expect_bidi_read_object()
460            .return_once(|_| Ok(TonicResponse::from(rx)));
461        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
462
463        let client = Storage::builder()
464            .with_endpoint(endpoint)
465            .with_credentials(Anonymous::new().build())
466            .build()
467            .await?;
468        let descriptor = client
469            .open_object(BUCKET_NAME, "test-object")
470            .send()
471            .await?;
472
473        let got = descriptor.object();
474        let want = Object::new()
475            .set_bucket(BUCKET_NAME)
476            .set_name("test-object")
477            .set_generation(123456)
478            .set_size(42);
479        assert_eq!(got, want);
480
481        Ok(())
482    }
483
484    #[tokio::test]
485    async fn attributes() -> Result<()> {
486        let options = RequestOptions::new();
487        let builder = OpenObject::new(
488            "bucket".to_string(),
489            "object".to_string(),
490            Arc::new(StorageStub),
491            options,
492        )
493        .set_generation(123)
494        .set_if_generation_match(234)
495        .set_if_generation_not_match(345)
496        .set_if_metageneration_match(456)
497        .set_if_metageneration_not_match(567);
498        let want = OpenObjectRequest::default()
499            .set_bucket("bucket")
500            .set_object("object")
501            .set_generation(123)
502            .set_if_generation_match(234)
503            .set_if_generation_not_match(345)
504            .set_if_metageneration_match(456)
505            .set_if_metageneration_not_match(567);
506        assert_eq!(builder.request, want);
507        Ok(())
508    }
509
510    #[tokio::test]
511    async fn csek() -> Result<()> {
512        let options = RequestOptions::new();
513        let builder = OpenObject::new(
514            "bucket".to_string(),
515            "object".to_string(),
516            Arc::new(StorageStub),
517            options,
518        );
519
520        let (raw_key, _, _, _) = create_key_helper();
521        let key = KeyAes256::new(&raw_key)?;
522        let builder = builder.set_key(key.clone());
523        let want = OpenObjectRequest::default()
524            .set_bucket("bucket")
525            .set_object("object")
526            .set_common_object_request_params(CommonObjectRequestParams::from(key));
527        assert_eq!(builder.request, want);
528        Ok(())
529    }
530
531    #[tokio::test]
532    async fn request_options() -> Result<()> {
533        use crate::read_resume_policy::NeverResume;
534        use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
535        use google_cloud_gax::retry_policy::Aip194Strict;
536        use google_cloud_gax::retry_throttler::CircuitBreaker;
537
538        let options = RequestOptions::new();
539        let builder = OpenObject::new(
540            "bucket".to_string(),
541            "object".to_string(),
542            Arc::new(StorageStub),
543            options.clone(),
544        )
545        .with_backoff_policy(
546            ExponentialBackoffBuilder::default()
547                .with_scaling(4.0)
548                .build()
549                .expect("expontial backoff builds"),
550        )
551        .with_retry_policy(Aip194Strict)
552        .with_retry_throttler(CircuitBreaker::default())
553        .with_read_resume_policy(NeverResume)
554        .with_attempt_timeout(Duration::from_secs(120));
555
556        let got = builder.options;
557        assert!(
558            format!("{:?}", got.backoff_policy).contains("ExponentialBackoff"),
559            "{got:?}"
560        );
561        assert!(
562            format!("{:?}", got.retry_policy).contains("Aip194Strict"),
563            "{got:?}"
564        );
565        assert!(
566            format!("{:?}", got.retry_throttler).contains("CircuitBreaker"),
567            "{got:?}"
568        );
569        assert!(
570            format!("{:?}", got.read_resume_policy()).contains("NeverResume"),
571            "{got:?}"
572        );
573        assert_eq!(
574            got.bidi_attempt_timeout,
575            Duration::from_secs(120),
576            "{got:?}"
577        );
578
579        Ok(())
580    }
581
582    #[tokio::test]
583    async fn send() -> anyhow::Result<()> {
584        use storage_grpc_mock::google::storage::v2::{
585            BidiReadObjectResponse, Object as ProtoObject,
586        };
587        use storage_grpc_mock::{MockStorage, start};
588
589        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
590        let initial = BidiReadObjectResponse {
591            metadata: Some(ProtoObject {
592                bucket: "projects/_/buckets/test-bucket".to_string(),
593                name: "test-object".to_string(),
594                generation: 123456,
595                ..ProtoObject::default()
596            }),
597            ..BidiReadObjectResponse::default()
598        };
599        tx.send(Ok(initial.clone())).await?;
600
601        let mut mock = MockStorage::new();
602        mock.expect_bidi_read_object()
603            .return_once(|_| Ok(TonicResponse::from(rx)));
604        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
605
606        let client = Storage::builder()
607            .with_credentials(Anonymous::new().build())
608            .with_endpoint(endpoint.clone())
609            .build()
610            .await?;
611
612        let descriptor = client
613            .open_object("projects/_/buckets/test-bucket", "test-object")
614            .send()
615            .await?;
616        let want = Object::new()
617            .set_bucket("projects/_/buckets/test-bucket")
618            .set_name("test-object")
619            .set_generation(123456);
620        assert_eq!(descriptor.object(), want, "{descriptor:?}");
621        assert_eq!(
622            descriptor.headers().get("content-type"),
623            Some(&HeaderValue::from_static("application/grpc")),
624            "headers={:?}",
625            descriptor.headers()
626        );
627        Ok(())
628    }
629
630    #[tokio::test]
631    async fn send_and_read() -> anyhow::Result<()> {
632        use storage_grpc_mock::{MockStorage, start};
633
634        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
635        let payload = Vec::from_iter((0..32).map(|i| i as u8));
636        let initial = BidiReadObjectResponse {
637            metadata: Some(ProtoObject {
638                bucket: "projects/_/buckets/test-bucket".to_string(),
639                name: "test-object".to_string(),
640                generation: 123456,
641                ..ProtoObject::default()
642            }),
643            object_data_ranges: vec![ObjectRangeData {
644                read_range: Some(ProtoRange {
645                    read_id: 0_i64,
646                    ..ProtoRange::default()
647                }),
648                range_end: true,
649                checksummed_data: Some(ChecksummedData {
650                    content: payload.clone(),
651                    crc32c: None,
652                }),
653            }],
654            ..BidiReadObjectResponse::default()
655        };
656        tx.send(Ok(initial.clone())).await?;
657
658        let mut mock = MockStorage::new();
659        mock.expect_bidi_read_object()
660            .return_once(|_| Ok(TonicResponse::from(rx)));
661        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
662
663        let client = Storage::builder()
664            .with_credentials(Anonymous::new().build())
665            .with_endpoint(endpoint.clone())
666            .build()
667            .await?;
668
669        let (descriptor, mut reader) = client
670            .open_object("projects/_/buckets/test-bucket", "test-object")
671            .send_and_read(ReadRange::tail(32))
672            .await?;
673        let want = Object::new()
674            .set_bucket("projects/_/buckets/test-bucket")
675            .set_name("test-object")
676            .set_generation(123456);
677        assert_eq!(descriptor.object(), want, "{descriptor:?}");
678        assert_eq!(
679            descriptor.headers().get("content-type"),
680            Some(&HeaderValue::from_static("application/grpc")),
681            "headers={:?}",
682            descriptor.headers()
683        );
684
685        let mut got_payload = Vec::new();
686        while let Some(chunk) = reader.next().await.transpose()? {
687            got_payload.extend_from_slice(&chunk);
688        }
689        assert_eq!(got_payload, payload);
690        Ok(())
691    }
692
693    #[tokio::test(start_paused = true)]
694    async fn timeout() -> anyhow::Result<()> {
695        use storage_grpc_mock::google::storage::v2::BidiReadObjectResponse;
696        use storage_grpc_mock::{MockStorage, start};
697
698        let (_tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
699
700        let mut mock = MockStorage::new();
701        mock.expect_bidi_read_object()
702            .return_once(|_| Ok(TonicResponse::from(rx)));
703        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
704
705        let client = Storage::builder()
706            .with_credentials(Anonymous::new().build())
707            .with_endpoint(endpoint.clone())
708            .with_retry_policy(NeverRetry)
709            .build()
710            .await?;
711
712        // This will timeout because we never send the initial message over `_tx`.
713        let target = Duration::from_secs(120);
714        let start = tokio::time::Instant::now();
715        let err = client
716            .open_object("projects/_/buckets/test-bucket", "test-object")
717            .with_attempt_timeout(target)
718            .send()
719            .await
720            .unwrap_err();
721        assert!(err.is_timeout(), "{err:?}");
722        assert_eq!(start.elapsed(), target);
723
724        Ok(())
725    }
726
727    #[derive(Debug)]
728    struct StorageStub;
729    impl crate::stub::Storage for StorageStub {}
730}