Skip to main content

google_cloud_storage/storage/
open_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Result;
16use crate::model_ext::{KeyAes256, OpenObjectRequest, ReadRange};
17use crate::object_descriptor::ObjectDescriptor;
18use crate::read_object::ReadObjectResponse;
19use crate::read_resume_policy::ReadResumePolicy;
20use crate::request_options::RequestOptions;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// A request builder for [Storage::open_object][crate::client::Storage::open_object].
25///
26/// # Example
27/// ```
28/// use google_cloud_storage::client::Storage;
29/// # use google_cloud_storage::builder::storage::OpenObject;
30/// async fn sample(client: &Storage) -> anyhow::Result<()> {
31///     let builder: OpenObject = client
32///         .open_object("projects/_/buckets/my-bucket", "my-object");
33///     let descriptor = builder
34///         .set_generation(123)
35///         .send()
36///         .await?;
37///     println!("object metadata={:?}", descriptor.object());
38///     // Use `descriptor` to read data from `my-object`.
39///     Ok(())
40/// }
41/// ```
42#[derive(Clone, Debug)]
43pub struct OpenObject<S = crate::storage::transport::Storage> {
44    stub: Arc<S>,
45    request: OpenObjectRequest,
46    options: RequestOptions,
47}
48
49impl<S> OpenObject<S>
50where
51    S: crate::storage::stub::Storage + 'static,
52{
53    /// Sends the request, returning a new object descriptor.
54    ///
55    /// Example:
56    /// ```ignore
57    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
58    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
59    /// let open = client
60    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
61    ///     .send()
62    ///     .await?;
63    /// println!("object metadata={:?}", open.object());
64    /// # Ok(()) }
65    /// ```
66    pub async fn send(self) -> Result<ObjectDescriptor> {
67        let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;
68        Ok(descriptor)
69    }
70
71    /// Sends the request, returning a new object descriptor and reader.
72    ///
73    /// Example:
74    /// ```ignore
75    /// # use google_cloud_storage::client::Storage;
76    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
77    /// use google_cloud_storage::model_ext::ReadRange;
78    /// let (descriptor, mut reader) = client
79    ///     .open_object("projects/_/buckets/my-bucket", "my-object.parquet")
80    ///     .send_and_read(ReadRange::tail(32))
81    ///     .await?;
82    /// println!("object metadata={:?}", descriptor.object());
83    /// let data = reader.next().await.transpose()?;
84    /// # Ok(()) }
85    /// ```
86    ///
87    /// This method allows applications to open an object and issue a read
88    /// request in the same RPC, which is typically faster than opening an
89    /// object and then issuing a `read_range()` call. This may be useful when
90    /// opening objects that have metadata information in a footer or header.
91    pub async fn send_and_read(
92        mut self,
93        range: ReadRange,
94    ) -> Result<(ObjectDescriptor, ReadObjectResponse)> {
95        self.request.ranges.push(range);
96        let (descriptor, mut readers) = self.stub.open_object(self.request, self.options).await?;
97        if readers.len() == 1 {
98            return Ok((descriptor, readers.pop().unwrap()));
99        }
100        // Even if the service returns multiple read ranges, with different ids,
101        // the code in the library will return an error and close the stream.
102        unreachable!("the stub cannot create more readers")
103    }
104}
105
106impl<S> OpenObject<S> {
107    pub(crate) fn new<B, O>(
108        stub: std::sync::Arc<S>,
109        bucket: B,
110        object: O,
111        options: RequestOptions,
112    ) -> Self
113    where
114        B: Into<String>,
115        O: Into<String>,
116    {
117        let request = OpenObjectRequest::default()
118            .set_bucket(bucket)
119            .set_object(object);
120        Self {
121            request,
122            options,
123            stub,
124        }
125    }
126
127    /// If present, selects a specific revision of this object (as
128    /// opposed to the latest version, the default).
129    ///
130    /// # Example
131    /// ```
132    /// # use google_cloud_storage::client::Storage;
133    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
134    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
135    /// let response = client
136    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
137    ///     .set_generation(123456)
138    ///     .send()
139    ///     .await?;
140    /// # Ok(()) }
141    /// ```
142    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
143        self.request = self.request.set_generation(v.into());
144        self
145    }
146
147    /// Makes the operation conditional on whether the object's current generation
148    /// matches the given value.
149    ///
150    /// # Example
151    /// ```
152    /// # use google_cloud_storage::client::Storage;
153    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
154    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
155    /// let response = client
156    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
157    ///     .set_if_generation_match(123456)
158    ///     .send()
159    ///     .await?;
160    /// # Ok(()) }
161    /// ```
162    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
163    where
164        T: Into<i64>,
165    {
166        self.request = self.request.set_if_generation_match(v.into());
167        self
168    }
169
170    /// Makes the operation conditional on whether the object's live generation
171    /// does not match the given value. If no live object exists, the precondition
172    /// fails.
173    ///
174    /// # Example
175    /// ```
176    /// # use google_cloud_storage::client::Storage;
177    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
178    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
179    /// let response = client
180    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
181    ///     .set_if_generation_not_match(123456)
182    ///     .send()
183    ///     .await?;
184    /// # Ok(()) }
185    /// ```
186    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
187    where
188        T: Into<i64>,
189    {
190        self.request = self.request.set_if_generation_not_match(v.into());
191        self
192    }
193
194    /// Makes the operation conditional on whether the object's current
195    /// metageneration matches the given value.
196    ///
197    /// # Example
198    /// ```
199    /// # use google_cloud_storage::client::Storage;
200    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
201    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
202    /// let response = client
203    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
204    ///     .set_if_metageneration_match(123456)
205    ///     .send()
206    ///     .await?;
207    /// # Ok(()) }
208    /// ```
209    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
210    where
211        T: Into<i64>,
212    {
213        self.request = self.request.set_if_metageneration_match(v.into());
214        self
215    }
216
217    /// Makes the operation conditional on whether the object's current
218    /// metageneration does not match the given value.
219    ///
220    /// # Example
221    /// ```
222    /// # use google_cloud_storage::client::Storage;
223    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
224    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
225    /// let response = client
226    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
227    ///     .set_if_metageneration_not_match(123456)
228    ///     .send()
229    ///     .await?;
230    /// # Ok(()) }
231    /// ```
232    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
233    where
234        T: Into<i64>,
235    {
236        self.request = self.request.set_if_metageneration_not_match(v.into());
237        self
238    }
239
240    /// The encryption key used with the Customer-Supplied Encryption Keys
241    /// feature. In raw bytes format (not base64-encoded).
242    ///
243    /// Example:
244    /// ```
245    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
246    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
247    /// let key: &[u8] = &[97; 32];
248    /// let response = client
249    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
250    ///     .set_key(KeyAes256::new(key)?)
251    ///     .send()
252    ///     .await?;
253    /// println!("response details={response:?}");
254    /// # Ok(()) }
255    /// ```
256    pub fn set_key(mut self, v: KeyAes256) -> Self {
257        self.request = self
258            .request
259            .set_common_object_request_params(crate::model::CommonObjectRequestParams::from(v));
260        self
261    }
262
263    /// The retry policy used for this request.
264    ///
265    /// # Example
266    /// ```
267    /// # use google_cloud_storage::client::Storage;
268    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
269    /// use google_cloud_storage::retry_policy::RetryableErrors;
270    /// use std::time::Duration;
271    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
272    /// let response = client
273    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
274    ///     .with_retry_policy(
275    ///         RetryableErrors
276    ///             .with_attempt_limit(5)
277    ///             .with_time_limit(Duration::from_secs(10)),
278    ///     )
279    ///     .send()
280    ///     .await?;
281    /// println!("response details={response:?}");
282    /// # Ok(()) }
283    /// ```
284    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
285        mut self,
286        v: V,
287    ) -> Self {
288        self.options.retry_policy = v.into().into();
289        self
290    }
291
292    /// The backoff policy used for this request.
293    ///
294    /// # Example
295    /// ```
296    /// # use google_cloud_storage::client::Storage;
297    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
298    /// use std::time::Duration;
299    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
300    /// let response = client
301    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
302    ///     .with_backoff_policy(ExponentialBackoff::default())
303    ///     .send()
304    ///     .await?;
305    /// println!("response details={response:?}");
306    /// # Ok(()) }
307    /// ```
308    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
309        mut self,
310        v: V,
311    ) -> Self {
312        self.options.backoff_policy = v.into().into();
313        self
314    }
315
316    /// The retry throttler used for this request.
317    ///
318    /// Most of the time you want to use the same throttler for all the requests
319    /// in a client, and even the same throttler for many clients. Rarely it
320    /// may be necessary to use an custom throttler for some subset of the
321    /// requests.
322    ///
323    /// # Example
324    /// ```
325    /// # use google_cloud_storage::client::Storage;
326    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
327    /// let response = client
328    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
329    ///     .with_retry_throttler(adhoc_throttler())
330    ///     .send()
331    ///     .await?;
332    /// println!("response details={response:?}");
333    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
334    ///     # panic!();
335    /// }
336    /// # Ok(()) }
337    /// ```
338    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
339        mut self,
340        v: V,
341    ) -> Self {
342        self.options.retry_throttler = v.into().into();
343        self
344    }
345
346    /// Configure the resume policy for read requests.
347    ///
348    /// The Cloud Storage client library can automatically resume a read that is
349    /// interrupted by a transient error. Applications may want to limit the
350    /// number of read attempts, or may wish to expand the type of errors
351    /// treated as retryable.
352    ///
353    /// # Example
354    /// ```
355    /// # use google_cloud_storage::client::Storage;
356    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
357    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
358    /// let response = client
359    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
360    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
361    ///     .send()
362    ///     .await?;
363    /// # Ok(()) }
364    /// ```
365    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
366    where
367        V: ReadResumePolicy + 'static,
368    {
369        self.options.set_read_resume_policy(std::sync::Arc::new(v));
370        self
371    }
372
373    /// Configure per-attempt timeout.
374    ///
375    /// # Example
376    /// ```
377    /// # use google_cloud_storage::client::Storage;
378    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
379    /// use std::time::Duration;
380    /// let response = client
381    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
382    ///     .with_attempt_timeout(Duration::from_secs(120))
383    ///     .send()
384    ///     .await?;
385    /// # Ok(()) }
386    /// ```
387    ///
388    /// The Cloud Storage client library times out `open_object()` attempts by
389    /// default (with a 60s timeout). Applications may want to set a different
390    /// value depending on how they are deployed.
391    ///
392    /// Note that the per-attempt timeout is subject to the overall retry loop
393    /// time limits (if any). The effective timeout for each attempt is the
394    /// smallest of (a) the per-attempt timeout, and (b) the remaining time in
395    /// the retry loop.
396    pub fn with_attempt_timeout(mut self, v: Duration) -> Self {
397        self.options.set_bidi_attempt_timeout(v);
398        self
399    }
400
401    /// Sets the `User-Agent` header for this request.
402    ///
403    /// # Example
404    /// ```
405    /// # use google_cloud_storage::client::Storage;
406    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
407    /// let mut response = client
408    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
409    ///     .with_user_agent("my-app/1.0.0")
410    ///     .send()
411    ///     .await?;
412    /// println!("response details={response:?}");
413    /// # Ok(()) }
414    /// ```
415    pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
416        self.options.user_agent = Some(user_agent.into());
417        self
418    }
419
420    /// Sets the project that will be billed for this request.
421    ///
422    /// Required for [Requester Pays] buckets. The value overrides any
423    /// `quota_project_id` configured on the credentials; the credential-level
424    /// header is suppressed for this RPC.
425    ///
426    /// # Example
427    /// ```
428    /// # use google_cloud_storage::client::Storage;
429    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
430    /// let response = client
431    ///     .open_object("projects/_/buckets/my-bucket", "my-object")
432    ///     .with_quota_project("my-billing-project")
433    ///     .send()
434    ///     .await?;
435    /// # Ok(()) }
436    /// ```
437    ///
438    /// [Requester Pays]: https://cloud.google.com/storage/docs/requester-pays
439    pub fn with_quota_project(mut self, project: impl Into<String>) -> Self {
440        self.options.set_quota_project(project);
441        self
442    }
443}
444
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::Storage;
    use crate::model::{CommonObjectRequestParams, Object};
    use crate::model_ext::tests::create_key_helper;
    use anyhow::Result;
    use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use google_cloud_gax::retry_policy::NeverRetry;
    use http::HeaderValue;
    use static_assertions::assert_impl_all;
    use storage_grpc_mock::google::storage::v2::{
        BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
        ReadRange as ProtoRange,
    };
    use storage_grpc_mock::{MockStorage, start};

    const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";
    const OBJECT_NAME: &str = "test-object";
    const USER_AGENT: &str = "quick_foxes_lazy_dogs/1.2.3";
    const BIND_ADDRESS: &str = "0.0.0.0:0";
    const GENERATION: i64 = 123456;

    // Item type of the response stream served by the mock.
    type StreamItem = TonicResult<BidiReadObjectResponse>;

    // A stub relying only on the trait's default methods; enough for tests
    // that inspect the builder without sending anything.
    #[derive(Debug)]
    struct StorageStub;
    impl crate::stub::Storage for StorageStub {}

    // Builder wired to `StorageStub`, with default request options.
    fn stub_builder() -> OpenObject<StorageStub> {
        OpenObject::new(
            Arc::new(StorageStub),
            BUCKET_NAME.to_string(),
            OBJECT_NAME.to_string(),
            RequestOptions::new(),
        )
    }

    // Object metadata used by the mock responses.
    fn proto_object() -> ProtoObject {
        ProtoObject {
            bucket: BUCKET_NAME.to_string(),
            name: OBJECT_NAME.to_string(),
            generation: GENERATION,
            ..ProtoObject::default()
        }
    }

    // First message on the stream: carries only the object metadata.
    fn initial_response(metadata: ProtoObject) -> BidiReadObjectResponse {
        BidiReadObjectResponse {
            metadata: Some(metadata),
            ..BidiReadObjectResponse::default()
        }
    }

    // Client-side counterpart of `proto_object()`.
    fn want_object() -> Object {
        Object::new()
            .set_bucket(BUCKET_NAME)
            .set_name(OBJECT_NAME)
            .set_generation(GENERATION)
    }

    // Client with anonymous credentials pointing at the mock server.
    async fn connect(endpoint: impl Into<String>) -> Result<Storage> {
        let client = Storage::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        Ok(client)
    }

    // Verify `open_object()` meets normal Send, Sync, requirements.
    #[tokio::test]
    async fn traits() -> Result<()> {
        fn need_send<T: Send>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        assert_impl_all!(OpenObject: Clone, std::fmt::Debug, Send, Sync);

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        let builder = client.open_object(BUCKET_NAME, OBJECT_NAME);
        need_static(&builder);

        let pending = client.open_object(BUCKET_NAME, OBJECT_NAME).send();
        need_send(&pending);
        need_static(&pending);
        Ok(())
    }

    #[tokio::test]
    async fn open_object_normal() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<StreamItem>(1);
        let metadata = ProtoObject {
            size: 42,
            ..proto_object()
        };
        tx.send(Ok(initial_response(metadata))).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(move |_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
        let client = connect(endpoint).await?;

        let descriptor = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await?;
        assert_eq!(descriptor.object(), want_object().set_size(42));

        Ok(())
    }

    #[tokio::test]
    async fn attributes() -> Result<()> {
        let builder = stub_builder()
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        let want = OpenObjectRequest::default()
            .set_bucket(BUCKET_NAME)
            .set_object(OBJECT_NAME)
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        assert_eq!(builder.request, want);
        Ok(())
    }

    #[tokio::test]
    async fn csek() -> Result<()> {
        let (raw_key, _, _, _) = create_key_helper();
        let key = KeyAes256::new(&raw_key)?;

        let builder = stub_builder().set_key(key.clone());
        let want = OpenObjectRequest::default()
            .set_bucket(BUCKET_NAME)
            .set_object(OBJECT_NAME)
            .set_common_object_request_params(CommonObjectRequestParams::from(key));
        assert_eq!(builder.request, want);
        Ok(())
    }

    #[tokio::test]
    async fn request_options() -> Result<()> {
        use crate::read_resume_policy::NeverResume;
        use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
        use google_cloud_gax::retry_policy::Aip194Strict;
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let backoff = ExponentialBackoffBuilder::default()
            .with_scaling(4.0)
            .build()
            .expect("expontial backoff builds");
        let builder = stub_builder()
            .with_backoff_policy(backoff)
            .with_retry_policy(Aip194Strict)
            .with_retry_throttler(CircuitBreaker::default())
            .with_read_resume_policy(NeverResume)
            .with_attempt_timeout(Duration::from_secs(120))
            .with_user_agent(USER_AGENT);

        let got = builder.options;
        // The policies are only observable through their `Debug` output.
        let rendered = [
            (format!("{:?}", got.backoff_policy), "ExponentialBackoff"),
            (format!("{:?}", got.retry_policy), "Aip194Strict"),
            (format!("{:?}", got.retry_throttler), "CircuitBreaker"),
            (format!("{:?}", got.read_resume_policy()), "NeverResume"),
        ];
        for (debug, needle) in rendered {
            assert!(debug.contains(needle), "{got:?}");
        }
        assert_eq!(
            got.bidi_attempt_timeout,
            Duration::from_secs(120),
            "{got:?}"
        );
        assert_eq!(got.user_agent.as_deref(), Some(USER_AGENT), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn send() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<StreamItem>(1);
        tx.send(Ok(initial_response(proto_object()))).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(move |_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
        let client = connect(endpoint).await?;

        let descriptor = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await?;
        assert_eq!(descriptor.object(), want_object(), "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );
        Ok(())
    }

    #[tokio::test]
    async fn send_and_read() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<StreamItem>(1);
        let payload = (0..32_u8).collect::<Vec<u8>>();
        // The first message carries both the metadata and the full range.
        let range_data = ObjectRangeData {
            read_range: Some(ProtoRange {
                read_id: 0_i64,
                ..ProtoRange::default()
            }),
            range_end: true,
            checksummed_data: Some(ChecksummedData {
                content: payload.clone(),
                crc32c: None,
            }),
        };
        let initial = BidiReadObjectResponse {
            object_data_ranges: vec![range_data],
            ..initial_response(proto_object())
        };
        tx.send(Ok(initial)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(move |_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
        let client = connect(endpoint).await?;

        let (descriptor, mut reader) = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .send_and_read(ReadRange::tail(32))
            .await?;
        assert_eq!(descriptor.object(), want_object(), "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );

        let mut got_payload = Vec::new();
        while let Some(chunk) = reader.next().await.transpose()? {
            got_payload.extend_from_slice(&chunk);
        }
        assert_eq!(got_payload, payload);
        Ok(())
    }

    #[tokio::test(start_paused = true)]
    async fn timeout() -> Result<()> {
        // `_tx` stays alive so the stream remains open but silent.
        let (_tx, rx) = tokio::sync::mpsc::channel::<StreamItem>(1);

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(move |_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .with_retry_policy(NeverRetry)
            .build()
            .await?;

        // This will timeout because we never send the initial message over `_tx`.
        let target = Duration::from_secs(120);
        let began = tokio::time::Instant::now();
        let result = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .with_attempt_timeout(target)
            .send()
            .await;
        let err = result.unwrap_err();
        assert!(err.is_timeout(), "{err:?}");
        assert_eq!(began.elapsed(), target);

        Ok(())
    }

    #[tokio::test]
    async fn user_agent() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<StreamItem>(1);
        tx.send(Ok(initial_response(proto_object()))).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object().return_once(move |request| {
            let user_agent = request
                .metadata()
                .get(http::header::USER_AGENT.as_str())
                .and_then(|v| v.to_str().ok())
                .expect("user-agent should be set");

            // The custom value must appear as one space-separated token.
            let mut tokens = user_agent.split(' ');
            assert!(tokens.any(|token| token == USER_AGENT), "{user_agent:?}");

            Ok(TonicResponse::from(rx))
        });
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
        let client = connect(endpoint).await?;

        let _descriptor = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .with_user_agent(USER_AGENT)
            .send()
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn quota_project() -> Result<()> {
        const PROJECT_NAME: &str = "project_lazy_dog";
        let (tx, rx) = tokio::sync::mpsc::channel::<StreamItem>(1);
        tx.send(Ok(initial_response(proto_object()))).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object().return_once(move |request| {
            let user_project = request
                .metadata()
                .get("x-goog-user-project")
                .and_then(|v| v.to_str().ok())
                .expect("x-goog-user-project should be set");
            assert_eq!(user_project, PROJECT_NAME);
            Ok(TonicResponse::from(rx))
        });
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
        let client = connect(endpoint).await?;

        let _descriptor = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .with_quota_project(PROJECT_NAME)
            .send()
            .await?;
        Ok(())
    }
}