Skip to main content

google_cloud_storage/storage/
open_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Result;
16use crate::model_ext::{KeyAes256, OpenObjectRequest, ReadRange};
17use crate::object_descriptor::ObjectDescriptor;
18use crate::read_object::ReadObjectResponse;
19use crate::read_resume_policy::ReadResumePolicy;
20use crate::request_options::RequestOptions;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// A request builder for [Storage::open_object][crate::client::Storage::open_object].
25///
26/// # Example
27/// ```
28/// use google_cloud_storage::client::Storage;
29/// # use google_cloud_storage::builder::storage::OpenObject;
30/// async fn sample(client: &Storage) -> anyhow::Result<()> {
31///     let builder: OpenObject = client
32///         .open_object("projects/_/buckets/my-bucket", "my-object");
33///     let descriptor = builder
34///         .set_generation(123)
35///         .send()
36///         .await?;
37///     println!("object metadata={:?}", descriptor.object());
38///     // Use `descriptor` to read data from `my-object`.
39///     Ok(())
40/// }
41/// ```
42#[derive(Clone, Debug)]
43pub struct OpenObject<S = crate::storage::transport::Storage> {
44    stub: Arc<S>,
45    request: OpenObjectRequest,
46    options: RequestOptions,
47}
48
49impl<S> OpenObject<S>
50where
51    S: crate::storage::stub::Storage + 'static,
52{
53    /// Sends the request, returning a new object descriptor.
54    ///
55    /// Example:
56    /// ```ignore
57    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
58    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
59    /// let open = client
60    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
61    ///     .send()
62    ///     .await?;
63    /// println!("object metadata={:?}", open.object());
64    /// # Ok(()) }
65    /// ```
66    pub async fn send(self) -> Result<ObjectDescriptor> {
67        let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;
68        Ok(descriptor)
69    }
70
71    /// Sends the request, returning a new object descriptor and reader.
72    ///
73    /// Example:
74    /// ```ignore
75    /// # use google_cloud_storage::client::Storage;
76    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
77    /// use google_cloud_storage::model_ext::ReadRange;
78    /// let (descriptor, mut reader) = client
79    ///     .open_object("projects/_/buckets/my-bucket", "my-object.parquet")
80    ///     .send_and_read(ReadRange::tail(32))
81    ///     .await?;
82    /// println!("object metadata={:?}", descriptor.object());
83    /// let data = reader.next().await.transpose()?;
84    /// # Ok(()) }
85    /// ```
86    ///
87    /// This method allows applications to open an object and issue a read
88    /// request in the same RPC, which is typically faster than opening an
89    /// object and then issuing a `read_range()` call. This may be useful when
90    /// opening objects that have metadata information in a footer or header.
91    pub async fn send_and_read(
92        mut self,
93        range: ReadRange,
94    ) -> Result<(ObjectDescriptor, ReadObjectResponse)> {
95        self.request.ranges.push(range);
96        let (descriptor, mut readers) = self.stub.open_object(self.request, self.options).await?;
97        if readers.len() == 1 {
98            return Ok((descriptor, readers.pop().unwrap()));
99        }
100        // Even if the service returns multiple read ranges, with different ids,
101        // the code in the library will return an error and close the stream.
102        unreachable!("the stub cannot create more readers")
103    }
104}
105
106impl<S> OpenObject<S> {
107    pub(crate) fn new<B, O>(
108        stub: std::sync::Arc<S>,
109        bucket: B,
110        object: O,
111        options: RequestOptions,
112    ) -> Self
113    where
114        B: Into<String>,
115        O: Into<String>,
116    {
117        let request = OpenObjectRequest::default()
118            .set_bucket(bucket)
119            .set_object(object);
120        Self {
121            request,
122            options,
123            stub,
124        }
125    }
126
127    /// If present, selects a specific revision of this object (as
128    /// opposed to the latest version, the default).
129    ///
130    /// # Example
131    /// ```
132    /// # use google_cloud_storage::client::Storage;
133    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
134    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
135    /// let response = client
136    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
137    ///     .set_generation(123456)
138    ///     .send()
139    ///     .await?;
140    /// # Ok(()) }
141    /// ```
142    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
143        self.request = self.request.set_generation(v.into());
144        self
145    }
146
147    /// Makes the operation conditional on whether the object's current generation
148    /// matches the given value.
149    ///
150    /// # Example
151    /// ```
152    /// # use google_cloud_storage::client::Storage;
153    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
154    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
155    /// let response = client
156    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
157    ///     .set_if_generation_match(123456)
158    ///     .send()
159    ///     .await?;
160    /// # Ok(()) }
161    /// ```
162    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
163    where
164        T: Into<i64>,
165    {
166        self.request = self.request.set_if_generation_match(v.into());
167        self
168    }
169
170    /// Makes the operation conditional on whether the object's live generation
171    /// does not match the given value. If no live object exists, the precondition
172    /// fails.
173    ///
174    /// # Example
175    /// ```
176    /// # use google_cloud_storage::client::Storage;
177    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
178    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
179    /// let response = client
180    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
181    ///     .set_if_generation_not_match(123456)
182    ///     .send()
183    ///     .await?;
184    /// # Ok(()) }
185    /// ```
186    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
187    where
188        T: Into<i64>,
189    {
190        self.request = self.request.set_if_generation_not_match(v.into());
191        self
192    }
193
194    /// Makes the operation conditional on whether the object's current
195    /// metageneration matches the given value.
196    ///
197    /// # Example
198    /// ```
199    /// # use google_cloud_storage::client::Storage;
200    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
201    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
202    /// let response = client
203    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
204    ///     .set_if_metageneration_match(123456)
205    ///     .send()
206    ///     .await?;
207    /// # Ok(()) }
208    /// ```
209    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
210    where
211        T: Into<i64>,
212    {
213        self.request = self.request.set_if_metageneration_match(v.into());
214        self
215    }
216
217    /// Makes the operation conditional on whether the object's current
218    /// metageneration does not match the given value.
219    ///
220    /// # Example
221    /// ```
222    /// # use google_cloud_storage::client::Storage;
223    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
224    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
225    /// let response = client
226    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
227    ///     .set_if_metageneration_not_match(123456)
228    ///     .send()
229    ///     .await?;
230    /// # Ok(()) }
231    /// ```
232    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
233    where
234        T: Into<i64>,
235    {
236        self.request = self.request.set_if_metageneration_not_match(v.into());
237        self
238    }
239
240    /// The encryption key used with the Customer-Supplied Encryption Keys
241    /// feature. In raw bytes format (not base64-encoded).
242    ///
243    /// Example:
244    /// ```
245    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
246    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
247    /// let key: &[u8] = &[97; 32];
248    /// let response = client
249    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
250    ///     .set_key(KeyAes256::new(key)?)
251    ///     .send()
252    ///     .await?;
253    /// println!("response details={response:?}");
254    /// # Ok(()) }
255    /// ```
256    pub fn set_key(mut self, v: KeyAes256) -> Self {
257        self.request = self
258            .request
259            .set_common_object_request_params(crate::model::CommonObjectRequestParams::from(v));
260        self
261    }
262
263    /// The retry policy used for this request.
264    ///
265    /// # Example
266    /// ```
267    /// # use google_cloud_storage::client::Storage;
268    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
269    /// use google_cloud_storage::retry_policy::RetryableErrors;
270    /// use std::time::Duration;
271    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
272    /// let response = client
273    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
274    ///     .with_retry_policy(
275    ///         RetryableErrors
276    ///             .with_attempt_limit(5)
277    ///             .with_time_limit(Duration::from_secs(10)),
278    ///     )
279    ///     .send()
280    ///     .await?;
281    /// println!("response details={response:?}");
282    /// # Ok(()) }
283    /// ```
284    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
285        mut self,
286        v: V,
287    ) -> Self {
288        self.options.retry_policy = v.into().into();
289        self
290    }
291
292    /// The backoff policy used for this request.
293    ///
294    /// # Example
295    /// ```
296    /// # use google_cloud_storage::client::Storage;
297    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
298    /// use std::time::Duration;
299    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
300    /// let response = client
301    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
302    ///     .with_backoff_policy(ExponentialBackoff::default())
303    ///     .send()
304    ///     .await?;
305    /// println!("response details={response:?}");
306    /// # Ok(()) }
307    /// ```
308    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
309        mut self,
310        v: V,
311    ) -> Self {
312        self.options.backoff_policy = v.into().into();
313        self
314    }
315
316    /// The retry throttler used for this request.
317    ///
318    /// Most of the time you want to use the same throttler for all the requests
319    /// in a client, and even the same throttler for many clients. Rarely it
320    /// may be necessary to use an custom throttler for some subset of the
321    /// requests.
322    ///
323    /// # Example
324    /// ```
325    /// # use google_cloud_storage::client::Storage;
326    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
327    /// let response = client
328    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
329    ///     .with_retry_throttler(adhoc_throttler())
330    ///     .send()
331    ///     .await?;
332    /// println!("response details={response:?}");
333    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
334    ///     # panic!();
335    /// }
336    /// # Ok(()) }
337    /// ```
338    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
339        mut self,
340        v: V,
341    ) -> Self {
342        self.options.retry_throttler = v.into().into();
343        self
344    }
345
346    /// Configure the resume policy for read requests.
347    ///
348    /// The Cloud Storage client library can automatically resume a read that is
349    /// interrupted by a transient error. Applications may want to limit the
350    /// number of read attempts, or may wish to expand the type of errors
351    /// treated as retryable.
352    ///
353    /// # Example
354    /// ```
355    /// # use google_cloud_storage::client::Storage;
356    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
357    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
358    /// let response = client
359    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
360    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
361    ///     .send()
362    ///     .await?;
363    /// # Ok(()) }
364    /// ```
365    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
366    where
367        V: ReadResumePolicy + 'static,
368    {
369        self.options.set_read_resume_policy(std::sync::Arc::new(v));
370        self
371    }
372
373    /// Configure per-attempt timeout.
374    ///
375    /// # Example
376    /// ```
377    /// # use google_cloud_storage::client::Storage;
378    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
379    /// use std::time::Duration;
380    /// let response = client
381    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
382    ///     .with_attempt_timeout(Duration::from_secs(120))
383    ///     .send()
384    ///     .await?;
385    /// # Ok(()) }
386    /// ```
387    ///
388    /// The Cloud Storage client library times out `open_object()` attempts by
389    /// default (with a 60s timeout). Applications may want to set a different
390    /// value depending on how they are deployed.
391    ///
392    /// Note that the per-attempt timeout is subject to the overall retry loop
393    /// time limits (if any). The effective timeout for each attempt is the
394    /// smallest of (a) the per-attempt timeout, and (b) the remaining time in
395    /// the retry loop.
396    pub fn with_attempt_timeout(mut self, v: Duration) -> Self {
397        self.options.set_bidi_attempt_timeout(v);
398        self
399    }
400
401    /// Sets the `User-Agent` header for this request.
402    ///
403    /// # Example
404    /// ```
405    /// # use google_cloud_storage::client::Storage;
406    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
407    /// let mut response = client
408    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
409    ///     .with_user_agent("my-app/1.0.0")
410    ///     .send()
411    ///     .await?;
412    /// println!("response details={response:?}");
413    /// # Ok(()) }
414    /// ```
415    pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
416        self.options.user_agent = Some(user_agent.into());
417        self
418    }
419}
420
421#[cfg(test)]
422mod tests {
423    use super::*;
424    use crate::client::Storage;
425    use crate::model::{CommonObjectRequestParams, Object};
426    use crate::model_ext::tests::create_key_helper;
427    use anyhow::Result;
428    use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
429    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
430    use google_cloud_gax::retry_policy::NeverRetry;
431    use http::HeaderValue;
432    use static_assertions::assert_impl_all;
433    use storage_grpc_mock::google::storage::v2::{
434        BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
435        ReadRange as ProtoRange,
436    };
437    use storage_grpc_mock::{MockStorage, start};
438
439    const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";
440    const OBJECT_NAME: &str = "test-object";
441    const USER_AGENT: &str = "quick_foxes_lazy_dogs/1.2.3";
442    const BIND_ADDRESS: &str = "0.0.0.0:0";
443
444    // Verify `open_object()` meets normal Send, Sync, requirements.
445    #[tokio::test]
446    async fn traits() -> Result<()> {
447        assert_impl_all!(OpenObject: Clone, std::fmt::Debug, Send, Sync);
448
449        let client = Storage::builder()
450            .with_credentials(Anonymous::new().build())
451            .build()
452            .await?;
453
454        fn need_send<T: Send>(_val: &T) {}
455        fn need_static<T: 'static>(_val: &T) {}
456
457        let open = client.open_object(BUCKET_NAME, OBJECT_NAME);
458        need_static(&open);
459
460        let fut = client.open_object(BUCKET_NAME, OBJECT_NAME).send();
461        need_send(&fut);
462        need_static(&fut);
463        Ok(())
464    }
465
466    #[tokio::test]
467    async fn open_object_normal() -> Result<()> {
468        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
469        let initial = BidiReadObjectResponse {
470            metadata: Some(ProtoObject {
471                bucket: BUCKET_NAME.to_string(),
472                name: OBJECT_NAME.to_string(),
473                generation: 123456,
474                size: 42,
475                ..ProtoObject::default()
476            }),
477            ..BidiReadObjectResponse::default()
478        };
479        tx.send(Ok(initial)).await?;
480
481        let mut mock = MockStorage::new();
482        mock.expect_bidi_read_object()
483            .return_once(|_| Ok(TonicResponse::from(rx)));
484        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
485
486        let client = Storage::builder()
487            .with_endpoint(endpoint)
488            .with_credentials(Anonymous::new().build())
489            .build()
490            .await?;
491        let descriptor = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await?;
492
493        let got = descriptor.object();
494        let want = Object::new()
495            .set_bucket(BUCKET_NAME)
496            .set_name(OBJECT_NAME)
497            .set_generation(123456)
498            .set_size(42);
499        assert_eq!(got, want);
500
501        Ok(())
502    }
503
504    #[tokio::test]
505    async fn attributes() -> Result<()> {
506        let options = RequestOptions::new();
507        let builder = OpenObject::new(
508            Arc::new(StorageStub),
509            BUCKET_NAME.to_string(),
510            OBJECT_NAME.to_string(),
511            options,
512        )
513        .set_generation(123)
514        .set_if_generation_match(234)
515        .set_if_generation_not_match(345)
516        .set_if_metageneration_match(456)
517        .set_if_metageneration_not_match(567);
518        let want = OpenObjectRequest::default()
519            .set_bucket(BUCKET_NAME)
520            .set_object(OBJECT_NAME)
521            .set_generation(123)
522            .set_if_generation_match(234)
523            .set_if_generation_not_match(345)
524            .set_if_metageneration_match(456)
525            .set_if_metageneration_not_match(567);
526        assert_eq!(builder.request, want);
527        Ok(())
528    }
529
530    #[tokio::test]
531    async fn csek() -> Result<()> {
532        let options = RequestOptions::new();
533        let builder = OpenObject::new(
534            Arc::new(StorageStub),
535            BUCKET_NAME.to_string(),
536            OBJECT_NAME.to_string(),
537            options,
538        );
539
540        let (raw_key, _, _, _) = create_key_helper();
541        let key = KeyAes256::new(&raw_key)?;
542        let builder = builder.set_key(key.clone());
543        let want = OpenObjectRequest::default()
544            .set_bucket(BUCKET_NAME)
545            .set_object(OBJECT_NAME)
546            .set_common_object_request_params(CommonObjectRequestParams::from(key));
547        assert_eq!(builder.request, want);
548        Ok(())
549    }
550
551    #[tokio::test]
552    async fn request_options() -> Result<()> {
553        use crate::read_resume_policy::NeverResume;
554        use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
555        use google_cloud_gax::retry_policy::Aip194Strict;
556        use google_cloud_gax::retry_throttler::CircuitBreaker;
557
558        let options = RequestOptions::new();
559        let builder = OpenObject::new(
560            Arc::new(StorageStub),
561            BUCKET_NAME.to_string(),
562            OBJECT_NAME.to_string(),
563            options,
564        )
565        .with_backoff_policy(
566            ExponentialBackoffBuilder::default()
567                .with_scaling(4.0)
568                .build()
569                .expect("expontial backoff builds"),
570        )
571        .with_retry_policy(Aip194Strict)
572        .with_retry_throttler(CircuitBreaker::default())
573        .with_read_resume_policy(NeverResume)
574        .with_attempt_timeout(Duration::from_secs(120))
575        .with_user_agent(USER_AGENT);
576
577        let got = builder.options;
578        assert!(
579            format!("{:?}", got.backoff_policy).contains("ExponentialBackoff"),
580            "{got:?}"
581        );
582        assert!(
583            format!("{:?}", got.retry_policy).contains("Aip194Strict"),
584            "{got:?}"
585        );
586        assert!(
587            format!("{:?}", got.retry_throttler).contains("CircuitBreaker"),
588            "{got:?}"
589        );
590        assert!(
591            format!("{:?}", got.read_resume_policy()).contains("NeverResume"),
592            "{got:?}"
593        );
594        assert_eq!(
595            got.bidi_attempt_timeout,
596            Duration::from_secs(120),
597            "{got:?}"
598        );
599        assert_eq!(got.user_agent.as_deref(), Some(USER_AGENT), "{got:?}");
600
601        Ok(())
602    }
603
604    #[tokio::test]
605    async fn send() -> anyhow::Result<()> {
606        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
607        let initial = BidiReadObjectResponse {
608            metadata: Some(ProtoObject {
609                bucket: BUCKET_NAME.to_string(),
610                name: OBJECT_NAME.to_string(),
611                generation: 123456,
612                ..ProtoObject::default()
613            }),
614            ..BidiReadObjectResponse::default()
615        };
616        tx.send(Ok(initial)).await?;
617
618        let mut mock = MockStorage::new();
619        mock.expect_bidi_read_object()
620            .return_once(|_| Ok(TonicResponse::from(rx)));
621        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
622
623        let client = Storage::builder()
624            .with_credentials(Anonymous::new().build())
625            .with_endpoint(endpoint)
626            .build()
627            .await?;
628
629        let descriptor = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await?;
630        let want = Object::new()
631            .set_bucket(BUCKET_NAME)
632            .set_name(OBJECT_NAME)
633            .set_generation(123456);
634        assert_eq!(descriptor.object(), want, "{descriptor:?}");
635        assert_eq!(
636            descriptor.headers().get("content-type"),
637            Some(&HeaderValue::from_static("application/grpc")),
638            "headers={:?}",
639            descriptor.headers()
640        );
641        Ok(())
642    }
643
644    #[tokio::test]
645    async fn send_and_read() -> anyhow::Result<()> {
646        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
647        let payload = Vec::from_iter((0..32).map(|i| i as u8));
648        let initial = BidiReadObjectResponse {
649            metadata: Some(ProtoObject {
650                bucket: BUCKET_NAME.to_string(),
651                name: OBJECT_NAME.to_string(),
652                generation: 123456,
653                ..ProtoObject::default()
654            }),
655            object_data_ranges: vec![ObjectRangeData {
656                read_range: Some(ProtoRange {
657                    read_id: 0_i64,
658                    ..ProtoRange::default()
659                }),
660                range_end: true,
661                checksummed_data: Some(ChecksummedData {
662                    content: payload.clone(),
663                    crc32c: None,
664                }),
665            }],
666            ..BidiReadObjectResponse::default()
667        };
668        tx.send(Ok(initial)).await?;
669
670        let mut mock = MockStorage::new();
671        mock.expect_bidi_read_object()
672            .return_once(|_| Ok(TonicResponse::from(rx)));
673        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
674
675        let client = Storage::builder()
676            .with_credentials(Anonymous::new().build())
677            .with_endpoint(endpoint)
678            .build()
679            .await?;
680
681        let (descriptor, mut reader) = client
682            .open_object(BUCKET_NAME, OBJECT_NAME)
683            .send_and_read(ReadRange::tail(32))
684            .await?;
685        let want = Object::new()
686            .set_bucket(BUCKET_NAME)
687            .set_name(OBJECT_NAME)
688            .set_generation(123456);
689        assert_eq!(descriptor.object(), want, "{descriptor:?}");
690        assert_eq!(
691            descriptor.headers().get("content-type"),
692            Some(&HeaderValue::from_static("application/grpc")),
693            "headers={:?}",
694            descriptor.headers()
695        );
696
697        let mut got_payload = Vec::new();
698        while let Some(chunk) = reader.next().await.transpose()? {
699            got_payload.extend_from_slice(&chunk);
700        }
701        assert_eq!(got_payload, payload);
702        Ok(())
703    }
704
705    #[tokio::test(start_paused = true)]
706    async fn timeout() -> anyhow::Result<()> {
707        let (_tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
708
709        let mut mock = MockStorage::new();
710        mock.expect_bidi_read_object()
711            .return_once(|_| Ok(TonicResponse::from(rx)));
712        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
713
714        let client = Storage::builder()
715            .with_credentials(Anonymous::new().build())
716            .with_endpoint(endpoint)
717            .with_retry_policy(NeverRetry)
718            .build()
719            .await?;
720
721        // This will timeout because we never send the initial message over `_tx`.
722        let target = Duration::from_secs(120);
723        let start = tokio::time::Instant::now();
724        let err = client
725            .open_object(BUCKET_NAME, OBJECT_NAME)
726            .with_attempt_timeout(target)
727            .send()
728            .await
729            .unwrap_err();
730        assert!(err.is_timeout(), "{err:?}");
731        assert_eq!(start.elapsed(), target);
732
733        Ok(())
734    }
735
736    #[tokio::test]
737    async fn user_agent() -> anyhow::Result<()> {
738        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(1);
739        let initial = BidiReadObjectResponse {
740            metadata: Some(ProtoObject {
741                bucket: BUCKET_NAME.to_string(),
742                name: OBJECT_NAME.to_string(),
743                generation: 123456,
744                ..ProtoObject::default()
745            }),
746            ..BidiReadObjectResponse::default()
747        };
748        tx.send(Ok(initial)).await?;
749
750        let mut mock = MockStorage::new();
751        mock.expect_bidi_read_object().return_once(|request| {
752            let metadata = request.metadata();
753            let user_agent = metadata
754                .get(http::header::USER_AGENT.as_str())
755                .and_then(|v| v.to_str().ok())
756                .expect("user-agent should be set");
757
758            let got = user_agent.split(' ').any(|s| s == USER_AGENT);
759            assert!(got, "{user_agent:?}");
760
761            Ok(TonicResponse::from(rx))
762        });
763        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
764
765        let client = Storage::builder()
766            .with_credentials(Anonymous::new().build())
767            .with_endpoint(endpoint)
768            .build()
769            .await?;
770
771        let _descriptor = client
772            .open_object(BUCKET_NAME, OBJECT_NAME)
773            .with_user_agent(USER_AGENT)
774            .send()
775            .await?;
776        Ok(())
777    }
778
779    #[derive(Debug)]
780    struct StorageStub;
781    impl crate::stub::Storage for StorageStub {}
782}