google_cloud_storage/storage/open_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Result;
16use crate::model_ext::{KeyAes256, OpenObjectRequest, ReadRange};
17use crate::object_descriptor::ObjectDescriptor;
18use crate::read_object::ReadObjectResponse;
19use crate::read_resume_policy::ReadResumePolicy;
20use crate::request_options::RequestOptions;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// A request builder for [Storage::open_object][crate::client::Storage::open_object].
25///
26/// # Example
27/// ```
28/// use google_cloud_storage::client::Storage;
29/// # use google_cloud_storage::builder::storage::OpenObject;
30/// async fn sample(client: &Storage) -> anyhow::Result<()> {
31/// let builder: OpenObject = client
32/// .open_object("projects/_/buckets/my-bucket", "my-object");
33/// let descriptor = builder
34/// .set_generation(123)
35/// .send()
36/// .await?;
37/// println!("object metadata={:?}", descriptor.object());
38/// // Use `descriptor` to read data from `my-object`.
39/// Ok(())
40/// }
41/// ```
42#[derive(Clone, Debug)]
43pub struct OpenObject<S = crate::storage::transport::Storage> {
44 stub: Arc<S>,
45 request: OpenObjectRequest,
46 options: RequestOptions,
47}
48
49impl<S> OpenObject<S>
50where
51 S: crate::storage::stub::Storage + 'static,
52{
53 /// Sends the request, returning a new object descriptor.
54 ///
55 /// Example:
56 /// ```ignore
57 /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
58 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
59 /// let open = client
60 /// .open_object("projects/_/buckets/my-bucket", "my-object")
61 /// .send()
62 /// .await?;
63 /// println!("object metadata={:?}", open.object());
64 /// # Ok(()) }
65 /// ```
66 pub async fn send(self) -> Result<ObjectDescriptor> {
67 let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;
68 Ok(descriptor)
69 }
70
71 /// Sends the request, returning a new object descriptor and reader.
72 ///
73 /// Example:
74 /// ```ignore
75 /// # use google_cloud_storage::client::Storage;
76 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
77 /// use google_cloud_storage::model_ext::ReadRange;
78 /// let (descriptor, mut reader) = client
79 /// .open_object("projects/_/buckets/my-bucket", "my-object.parquet")
80 /// .send_and_read(ReadRange::tail(32))
81 /// .await?;
82 /// println!("object metadata={:?}", descriptor.object());
83 /// let data = reader.next().await.transpose()?;
84 /// # Ok(()) }
85 /// ```
86 ///
87 /// This method allows applications to open an object and issue a read
88 /// request in the same RPC, which is typically faster than opening an
89 /// object and then issuing a `read_range()` call. This may be useful when
90 /// opening objects that have metadata information in a footer or header.
91 pub async fn send_and_read(
92 mut self,
93 range: ReadRange,
94 ) -> Result<(ObjectDescriptor, ReadObjectResponse)> {
95 self.request.ranges.push(range);
96 let (descriptor, mut readers) = self.stub.open_object(self.request, self.options).await?;
97 if readers.len() == 1 {
98 return Ok((descriptor, readers.pop().unwrap()));
99 }
100 // Even if the service returns multiple read ranges, with different ids,
101 // the code in the library will return an error and close the stream.
102 unreachable!("the stub cannot create more readers")
103 }
104}
105
106impl<S> OpenObject<S> {
107 pub(crate) fn new(
108 bucket: String,
109 object: String,
110 stub: Arc<S>,
111 options: RequestOptions,
112 ) -> Self {
113 let request = OpenObjectRequest::default()
114 .set_bucket(bucket)
115 .set_object(object);
116 Self {
117 request,
118 options,
119 stub,
120 }
121 }
122
123 /// If present, selects a specific revision of this object (as
124 /// opposed to the latest version, the default).
125 ///
126 /// # Example
127 /// ```
128 /// # use google_cloud_storage::client::Storage;
129 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
130 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
131 /// let response = client
132 /// .open_object("projects/_/buckets/my-bucket", "my-object")
133 /// .set_generation(123456)
134 /// .send()
135 /// .await?;
136 /// # Ok(()) }
137 /// ```
138 pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
139 self.request = self.request.set_generation(v.into());
140 self
141 }
142
143 /// Makes the operation conditional on whether the object's current generation
144 /// matches the given value.
145 ///
146 /// # Example
147 /// ```
148 /// # use google_cloud_storage::client::Storage;
149 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
150 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
151 /// let response = client
152 /// .open_object("projects/_/buckets/my-bucket", "my-object")
153 /// .set_if_generation_match(123456)
154 /// .send()
155 /// .await?;
156 /// # Ok(()) }
157 /// ```
158 pub fn set_if_generation_match<T>(mut self, v: T) -> Self
159 where
160 T: Into<i64>,
161 {
162 self.request = self.request.set_if_generation_match(v.into());
163 self
164 }
165
166 /// Makes the operation conditional on whether the object's live generation
167 /// does not match the given value. If no live object exists, the precondition
168 /// fails.
169 ///
170 /// # Example
171 /// ```
172 /// # use google_cloud_storage::client::Storage;
173 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
174 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
175 /// let response = client
176 /// .open_object("projects/_/buckets/my-bucket", "my-object")
177 /// .set_if_generation_not_match(123456)
178 /// .send()
179 /// .await?;
180 /// # Ok(()) }
181 /// ```
182 pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
183 where
184 T: Into<i64>,
185 {
186 self.request = self.request.set_if_generation_not_match(v.into());
187 self
188 }
189
190 /// Makes the operation conditional on whether the object's current
191 /// metageneration matches the given value.
192 ///
193 /// # Example
194 /// ```
195 /// # use google_cloud_storage::client::Storage;
196 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
197 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
198 /// let response = client
199 /// .open_object("projects/_/buckets/my-bucket", "my-object")
200 /// .set_if_metageneration_match(123456)
201 /// .send()
202 /// .await?;
203 /// # Ok(()) }
204 /// ```
205 pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
206 where
207 T: Into<i64>,
208 {
209 self.request = self.request.set_if_metageneration_match(v.into());
210 self
211 }
212
213 /// Makes the operation conditional on whether the object's current
214 /// metageneration does not match the given value.
215 ///
216 /// # Example
217 /// ```
218 /// # use google_cloud_storage::client::Storage;
219 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
220 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
221 /// let response = client
222 /// .open_object("projects/_/buckets/my-bucket", "my-object")
223 /// .set_if_metageneration_not_match(123456)
224 /// .send()
225 /// .await?;
226 /// # Ok(()) }
227 /// ```
228 pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
229 where
230 T: Into<i64>,
231 {
232 self.request = self.request.set_if_metageneration_not_match(v.into());
233 self
234 }
235
236 /// The encryption key used with the Customer-Supplied Encryption Keys
237 /// feature. In raw bytes format (not base64-encoded).
238 ///
239 /// Example:
240 /// ```
241 /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
242 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
243 /// let key: &[u8] = &[97; 32];
244 /// let response = client
245 /// .open_object("projects/_/buckets/my-bucket", "my-object")
246 /// .set_key(KeyAes256::new(key)?)
247 /// .send()
248 /// .await?;
249 /// println!("response details={response:?}");
250 /// # Ok(()) }
251 /// ```
252 pub fn set_key(mut self, v: KeyAes256) -> Self {
253 self.request = self
254 .request
255 .set_common_object_request_params(crate::model::CommonObjectRequestParams::from(v));
256 self
257 }
258
259 /// The retry policy used for this request.
260 ///
261 /// # Example
262 /// ```
263 /// # use google_cloud_storage::client::Storage;
264 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
265 /// use google_cloud_storage::retry_policy::RetryableErrors;
266 /// use std::time::Duration;
267 /// use gax::retry_policy::RetryPolicyExt;
268 /// let response = client
269 /// .open_object("projects/_/buckets/my-bucket", "my-object")
270 /// .with_retry_policy(
271 /// RetryableErrors
272 /// .with_attempt_limit(5)
273 /// .with_time_limit(Duration::from_secs(10)),
274 /// )
275 /// .send()
276 /// .await?;
277 /// println!("response details={response:?}");
278 /// # Ok(()) }
279 /// ```
280 pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
281 self.options.retry_policy = v.into().into();
282 self
283 }
284
285 /// The backoff policy used for this request.
286 ///
287 /// # Example
288 /// ```
289 /// # use google_cloud_storage::client::Storage;
290 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
291 /// use std::time::Duration;
292 /// use gax::exponential_backoff::ExponentialBackoff;
293 /// let response = client
294 /// .open_object("projects/_/buckets/my-bucket", "my-object")
295 /// .with_backoff_policy(ExponentialBackoff::default())
296 /// .send()
297 /// .await?;
298 /// println!("response details={response:?}");
299 /// # Ok(()) }
300 /// ```
301 pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
302 mut self,
303 v: V,
304 ) -> Self {
305 self.options.backoff_policy = v.into().into();
306 self
307 }
308
309 /// The retry throttler used for this request.
310 ///
311 /// Most of the time you want to use the same throttler for all the requests
312 /// in a client, and even the same throttler for many clients. Rarely it
313 /// may be necessary to use an custom throttler for some subset of the
314 /// requests.
315 ///
316 /// # Example
317 /// ```
318 /// # use google_cloud_storage::client::Storage;
319 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
320 /// let response = client
321 /// .open_object("projects/_/buckets/my-bucket", "my-object")
322 /// .with_retry_throttler(adhoc_throttler())
323 /// .send()
324 /// .await?;
325 /// println!("response details={response:?}");
326 /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
327 /// # panic!();
328 /// }
329 /// # Ok(()) }
330 /// ```
331 pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
332 mut self,
333 v: V,
334 ) -> Self {
335 self.options.retry_throttler = v.into().into();
336 self
337 }
338
339 /// Configure the resume policy for read requests.
340 ///
341 /// The Cloud Storage client library can automatically resume a read that is
342 /// interrupted by a transient error. Applications may want to limit the
343 /// number of read attempts, or may wish to expand the type of errors
344 /// treated as retryable.
345 ///
346 /// # Example
347 /// ```
348 /// # use google_cloud_storage::client::Storage;
349 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
350 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
351 /// let response = client
352 /// .open_object("projects/_/buckets/my-bucket", "my-object")
353 /// .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
354 /// .send()
355 /// .await?;
356 /// # Ok(()) }
357 /// ```
358 pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
359 where
360 V: ReadResumePolicy + 'static,
361 {
362 self.options.set_read_resume_policy(std::sync::Arc::new(v));
363 self
364 }
365
366 /// Configure per-attempt timeout.
367 ///
368 /// # Example
369 /// ```
370 /// # use google_cloud_storage::client::Storage;
371 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
372 /// use std::time::Duration;
373 /// let response = client
374 /// .open_object("projects/_/buckets/my-bucket", "my-object")
375 /// .with_attempt_timeout(Duration::from_secs(120))
376 /// .send()
377 /// .await?;
378 /// # Ok(()) }
379 /// ```
380 ///
381 /// The Cloud Storage client library times out `open_object()` attempts by
382 /// default (with a 60s timeout). Applications may want to set a different
383 /// value depending on how they are deployed.
384 ///
385 /// Note that the per-attempt timeout is subject to the overall retry loop
386 /// time limits (if any). The effective timeout for each attempt is the
387 /// smallest of (a) the per-attempt timeout, and (b) the remaining time in
388 /// the retry loop.
389 pub fn with_attempt_timeout(mut self, v: Duration) -> Self {
390 self.options.set_bidi_attempt_timeout(v);
391 self
392 }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::Storage;
    use crate::model::{CommonObjectRequestParams, Object};
    use crate::model_ext::tests::create_key_helper;
    use anyhow::Result;
    use gax::retry_policy::NeverRetry;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use http::HeaderValue;
    use static_assertions::assert_impl_all;
    use storage_grpc_mock::google::storage::v2::{
        BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
        ReadRange as ProtoRange,
    };
    use storage_grpc_mock::{MockStorage, start};

    // Verify the `open_object()` builder and the future returned by `send()`
    // meet the usual `Clone`, `Debug`, `Send`, `Sync`, and `'static`
    // requirements.
    #[tokio::test]
    async fn traits() -> Result<()> {
        assert_impl_all!(OpenObject: Clone, std::fmt::Debug);
        assert_impl_all!(OpenObject: Send, Sync);

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        fn need_send<T: Send>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let open = client.open_object("projects/_/buckets/test-bucket", "test-object");
        need_static(&open);

        let fut = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send();
        need_send(&fut);
        need_static(&fut);
        Ok(())
    }

    // Verify the descriptor reflects the metadata in the initial response.
    #[tokio::test]
    async fn open_object_normal() -> Result<()> {
        const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";

        let (tx, rx) = tokio::sync::mpsc::channel::<tonic::Result<BidiReadObjectResponse>>(1);
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: BUCKET_NAME.to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                size: 42,
                ..ProtoObject::default()
            }),
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(tonic::Response::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let descriptor = client
            .open_object(BUCKET_NAME, "test-object")
            .send()
            .await?;

        let got = descriptor.object();
        let want = Object::new()
            .set_bucket(BUCKET_NAME)
            .set_name("test-object")
            .set_generation(123456)
            .set_size(42);
        assert_eq!(got, want);

        Ok(())
    }

    // Verify each `set_*()` method updates the corresponding request field.
    #[tokio::test]
    async fn attributes() -> Result<()> {
        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options,
        )
        .set_generation(123)
        .set_if_generation_match(234)
        .set_if_generation_not_match(345)
        .set_if_metageneration_match(456)
        .set_if_metageneration_not_match(567);
        let want = OpenObjectRequest::default()
            .set_bucket("bucket")
            .set_object("object")
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        assert_eq!(builder.request, want);
        Ok(())
    }

    // Verify `set_key()` populates the common object request parameters.
    #[tokio::test]
    async fn csek() -> Result<()> {
        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options,
        );

        let (raw_key, _, _, _) = create_key_helper();
        let key = KeyAes256::new(&raw_key)?;
        let builder = builder.set_key(key.clone());
        let want = OpenObjectRequest::default()
            .set_bucket("bucket")
            .set_object("object")
            .set_common_object_request_params(CommonObjectRequestParams::from(key));
        assert_eq!(builder.request, want);
        Ok(())
    }

    // Verify each `with_*()` method updates the corresponding request option.
    #[tokio::test]
    async fn request_options() -> Result<()> {
        use crate::read_resume_policy::NeverResume;
        use gax::exponential_backoff::ExponentialBackoffBuilder;
        use gax::retry_policy::Aip194Strict;
        use gax::retry_throttler::CircuitBreaker;

        let options = RequestOptions::new();
        let builder = OpenObject::new(
            "bucket".to_string(),
            "object".to_string(),
            Arc::new(StorageStub),
            options,
        )
        .with_backoff_policy(
            ExponentialBackoffBuilder::default()
                .with_scaling(4.0)
                .build()
                .expect("expontial backoff builds"),
        )
        .with_retry_policy(Aip194Strict)
        .with_retry_throttler(CircuitBreaker::default())
        .with_read_resume_policy(NeverResume)
        .with_attempt_timeout(Duration::from_secs(120));

        // The policies are inspected via their debug representation.
        let got = builder.options;
        assert!(
            format!("{:?}", got.backoff_policy).contains("ExponentialBackoff"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_policy).contains("Aip194Strict"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_throttler).contains("CircuitBreaker"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.read_resume_policy()).contains("NeverResume"),
            "{got:?}"
        );
        assert_eq!(
            got.bidi_attempt_timeout,
            Duration::from_secs(120),
            "{got:?}"
        );

        Ok(())
    }

    // Verify `send()` returns a descriptor with the object metadata and the
    // response headers.
    #[tokio::test]
    async fn send() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<tonic::Result<BidiReadObjectResponse>>(1);
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: "projects/_/buckets/test-bucket".to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                ..ProtoObject::default()
            }),
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(tonic::Response::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;

        let descriptor = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let want = Object::new()
            .set_bucket("projects/_/buckets/test-bucket")
            .set_name("test-object")
            .set_generation(123456);
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );
        Ok(())
    }

    // Verify `send_and_read()` returns the descriptor and a reader yielding
    // the data included in the initial response.
    #[tokio::test]
    async fn send_and_read() -> Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel::<tonic::Result<BidiReadObjectResponse>>(1);
        let payload = Vec::from_iter((0..32).map(|i| i as u8));
        let initial = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: "projects/_/buckets/test-bucket".to_string(),
                name: "test-object".to_string(),
                generation: 123456,
                ..ProtoObject::default()
            }),
            object_data_ranges: vec![ObjectRangeData {
                read_range: Some(ProtoRange {
                    read_id: 0_i64,
                    ..ProtoRange::default()
                }),
                range_end: true,
                checksummed_data: Some(ChecksummedData {
                    content: payload.clone(),
                    crc32c: None,
                }),
            }],
            ..BidiReadObjectResponse::default()
        };
        tx.send(Ok(initial)).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(tonic::Response::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;

        let (descriptor, mut reader) = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send_and_read(ReadRange::tail(32))
            .await?;
        let want = Object::new()
            .set_bucket("projects/_/buckets/test-bucket")
            .set_name("test-object")
            .set_generation(123456);
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );

        let mut got_payload = Vec::new();
        while let Some(chunk) = reader.next().await.transpose()? {
            got_payload.extend_from_slice(&chunk);
        }
        assert_eq!(got_payload, payload);
        Ok(())
    }

    // Verify the per-attempt timeout is applied when the service never sends
    // the initial response.
    #[tokio::test(start_paused = true)]
    async fn timeout() -> Result<()> {
        // `_tx` stays in scope so the stream remains open, but nothing is
        // ever sent through it.
        let (_tx, rx) = tokio::sync::mpsc::channel::<tonic::Result<BidiReadObjectResponse>>(1);

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(tonic::Response::from(rx)));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .with_retry_policy(NeverRetry)
            .build()
            .await?;

        // This will timeout because we never send the initial message over `_tx`.
        let target = Duration::from_secs(120);
        let began = tokio::time::Instant::now();
        let err = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .with_attempt_timeout(target)
            .send()
            .await
            .unwrap_err();
        assert!(err.is_timeout(), "{err:?}");
        assert_eq!(began.elapsed(), target);

        Ok(())
    }

    // A stub relying on the trait's default methods; the builder tests above
    // never send a request through it.
    #[derive(Debug)]
    struct StorageStub;
    impl crate::stub::Storage for StorageStub {}
}