google_cloud_storage/storage/open_object.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Result;
16use crate::model_ext::{KeyAes256, OpenObjectRequest, ReadRange};
17use crate::object_descriptor::ObjectDescriptor;
18use crate::read_object::ReadObjectResponse;
19use crate::read_resume_policy::ReadResumePolicy;
20use crate::request_options::RequestOptions;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// A request builder for [Storage::open_object][crate::client::Storage::open_object].
25///
26/// # Example
27/// ```
28/// use google_cloud_storage::client::Storage;
29/// # use google_cloud_storage::builder::storage::OpenObject;
30/// async fn sample(client: &Storage) -> anyhow::Result<()> {
31/// let builder: OpenObject = client
32/// .open_object("projects/_/buckets/my-bucket", "my-object");
33/// let descriptor = builder
34/// .set_generation(123)
35/// .send()
36/// .await?;
37/// println!("object metadata={:?}", descriptor.object());
38/// // Use `descriptor` to read data from `my-object`.
39/// Ok(())
40/// }
41/// ```
42#[derive(Clone, Debug)]
43pub struct OpenObject<S = crate::storage::transport::Storage> {
44 stub: Arc<S>,
45 request: OpenObjectRequest,
46 options: RequestOptions,
47}
48
49impl<S> OpenObject<S>
50where
51 S: crate::storage::stub::Storage + 'static,
52{
53 /// Sends the request, returning a new object descriptor.
54 ///
55 /// Example:
56 /// ```ignore
57 /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
58 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
59 /// let open = client
60 /// .open_object("projects/_/buckets/my-bucket", "my-object")
61 /// .send()
62 /// .await?;
63 /// println!("object metadata={:?}", open.object());
64 /// # Ok(()) }
65 /// ```
66 pub async fn send(self) -> Result<ObjectDescriptor> {
67 let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;
68 Ok(descriptor)
69 }
70
71 /// Sends the request, returning a new object descriptor and reader.
72 ///
73 /// Example:
74 /// ```ignore
75 /// # use google_cloud_storage::client::Storage;
76 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
77 /// use google_cloud_storage::model_ext::ReadRange;
78 /// let (descriptor, mut reader) = client
79 /// .open_object("projects/_/buckets/my-bucket", "my-object.parquet")
80 /// .send_and_read(ReadRange::tail(32))
81 /// .await?;
82 /// println!("object metadata={:?}", descriptor.object());
83 /// let data = reader.next().await.transpose()?;
84 /// # Ok(()) }
85 /// ```
86 ///
87 /// This method allows applications to open an object and issue a read
88 /// request in the same RPC, which is typically faster than opening an
89 /// object and then issuing a `read_range()` call. This may be useful when
90 /// opening objects that have metadata information in a footer or header.
91 pub async fn send_and_read(
92 mut self,
93 range: ReadRange,
94 ) -> Result<(ObjectDescriptor, ReadObjectResponse)> {
95 self.request.ranges.push(range);
96 let (descriptor, mut readers) = self.stub.open_object(self.request, self.options).await?;
97 if readers.len() == 1 {
98 return Ok((descriptor, readers.pop().unwrap()));
99 }
100 // Even if the service returns multiple read ranges, with different ids,
101 // the code in the library will return an error and close the stream.
102 unreachable!("the stub cannot create more readers")
103 }
104}
105
106impl<S> OpenObject<S> {
107 pub(crate) fn new<B, O>(
108 stub: std::sync::Arc<S>,
109 bucket: B,
110 object: O,
111 options: RequestOptions,
112 ) -> Self
113 where
114 B: Into<String>,
115 O: Into<String>,
116 {
117 let request = OpenObjectRequest::default()
118 .set_bucket(bucket)
119 .set_object(object);
120 Self {
121 request,
122 options,
123 stub,
124 }
125 }
126
127 /// If present, selects a specific revision of this object (as
128 /// opposed to the latest version, the default).
129 ///
130 /// # Example
131 /// ```
132 /// # use google_cloud_storage::client::Storage;
133 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
134 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
135 /// let response = client
136 /// .open_object("projects/_/buckets/my-bucket", "my-object")
137 /// .set_generation(123456)
138 /// .send()
139 /// .await?;
140 /// # Ok(()) }
141 /// ```
142 pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
143 self.request = self.request.set_generation(v.into());
144 self
145 }
146
147 /// Makes the operation conditional on whether the object's current generation
148 /// matches the given value.
149 ///
150 /// # Example
151 /// ```
152 /// # use google_cloud_storage::client::Storage;
153 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
154 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
155 /// let response = client
156 /// .open_object("projects/_/buckets/my-bucket", "my-object")
157 /// .set_if_generation_match(123456)
158 /// .send()
159 /// .await?;
160 /// # Ok(()) }
161 /// ```
162 pub fn set_if_generation_match<T>(mut self, v: T) -> Self
163 where
164 T: Into<i64>,
165 {
166 self.request = self.request.set_if_generation_match(v.into());
167 self
168 }
169
170 /// Makes the operation conditional on whether the object's live generation
171 /// does not match the given value. If no live object exists, the precondition
172 /// fails.
173 ///
174 /// # Example
175 /// ```
176 /// # use google_cloud_storage::client::Storage;
177 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
178 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
179 /// let response = client
180 /// .open_object("projects/_/buckets/my-bucket", "my-object")
181 /// .set_if_generation_not_match(123456)
182 /// .send()
183 /// .await?;
184 /// # Ok(()) }
185 /// ```
186 pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
187 where
188 T: Into<i64>,
189 {
190 self.request = self.request.set_if_generation_not_match(v.into());
191 self
192 }
193
194 /// Makes the operation conditional on whether the object's current
195 /// metageneration matches the given value.
196 ///
197 /// # Example
198 /// ```
199 /// # use google_cloud_storage::client::Storage;
200 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
201 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
202 /// let response = client
203 /// .open_object("projects/_/buckets/my-bucket", "my-object")
204 /// .set_if_metageneration_match(123456)
205 /// .send()
206 /// .await?;
207 /// # Ok(()) }
208 /// ```
209 pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
210 where
211 T: Into<i64>,
212 {
213 self.request = self.request.set_if_metageneration_match(v.into());
214 self
215 }
216
217 /// Makes the operation conditional on whether the object's current
218 /// metageneration does not match the given value.
219 ///
220 /// # Example
221 /// ```
222 /// # use google_cloud_storage::client::Storage;
223 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
224 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
225 /// let response = client
226 /// .open_object("projects/_/buckets/my-bucket", "my-object")
227 /// .set_if_metageneration_not_match(123456)
228 /// .send()
229 /// .await?;
230 /// # Ok(()) }
231 /// ```
232 pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
233 where
234 T: Into<i64>,
235 {
236 self.request = self.request.set_if_metageneration_not_match(v.into());
237 self
238 }
239
240 /// The encryption key used with the Customer-Supplied Encryption Keys
241 /// feature. In raw bytes format (not base64-encoded).
242 ///
243 /// Example:
244 /// ```
245 /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
246 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
247 /// let key: &[u8] = &[97; 32];
248 /// let response = client
249 /// .open_object("projects/_/buckets/my-bucket", "my-object")
250 /// .set_key(KeyAes256::new(key)?)
251 /// .send()
252 /// .await?;
253 /// println!("response details={response:?}");
254 /// # Ok(()) }
255 /// ```
256 pub fn set_key(mut self, v: KeyAes256) -> Self {
257 self.request = self
258 .request
259 .set_common_object_request_params(crate::model::CommonObjectRequestParams::from(v));
260 self
261 }
262
263 /// The retry policy used for this request.
264 ///
265 /// # Example
266 /// ```
267 /// # use google_cloud_storage::client::Storage;
268 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
269 /// use google_cloud_storage::retry_policy::RetryableErrors;
270 /// use std::time::Duration;
271 /// use google_cloud_gax::retry_policy::RetryPolicyExt;
272 /// let response = client
273 /// .open_object("projects/_/buckets/my-bucket", "my-object")
274 /// .with_retry_policy(
275 /// RetryableErrors
276 /// .with_attempt_limit(5)
277 /// .with_time_limit(Duration::from_secs(10)),
278 /// )
279 /// .send()
280 /// .await?;
281 /// println!("response details={response:?}");
282 /// # Ok(()) }
283 /// ```
284 pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
285 mut self,
286 v: V,
287 ) -> Self {
288 self.options.retry_policy = v.into().into();
289 self
290 }
291
292 /// The backoff policy used for this request.
293 ///
294 /// # Example
295 /// ```
296 /// # use google_cloud_storage::client::Storage;
297 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
298 /// use std::time::Duration;
299 /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
300 /// let response = client
301 /// .open_object("projects/_/buckets/my-bucket", "my-object")
302 /// .with_backoff_policy(ExponentialBackoff::default())
303 /// .send()
304 /// .await?;
305 /// println!("response details={response:?}");
306 /// # Ok(()) }
307 /// ```
308 pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
309 mut self,
310 v: V,
311 ) -> Self {
312 self.options.backoff_policy = v.into().into();
313 self
314 }
315
316 /// The retry throttler used for this request.
317 ///
318 /// Most of the time you want to use the same throttler for all the requests
319 /// in a client, and even the same throttler for many clients. Rarely it
320 /// may be necessary to use an custom throttler for some subset of the
321 /// requests.
322 ///
323 /// # Example
324 /// ```
325 /// # use google_cloud_storage::client::Storage;
326 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
327 /// let response = client
328 /// .open_object("projects/_/buckets/my-bucket", "my-object")
329 /// .with_retry_throttler(adhoc_throttler())
330 /// .send()
331 /// .await?;
332 /// println!("response details={response:?}");
333 /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
334 /// # panic!();
335 /// }
336 /// # Ok(()) }
337 /// ```
338 pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
339 mut self,
340 v: V,
341 ) -> Self {
342 self.options.retry_throttler = v.into().into();
343 self
344 }
345
346 /// Configure the resume policy for read requests.
347 ///
348 /// The Cloud Storage client library can automatically resume a read that is
349 /// interrupted by a transient error. Applications may want to limit the
350 /// number of read attempts, or may wish to expand the type of errors
351 /// treated as retryable.
352 ///
353 /// # Example
354 /// ```
355 /// # use google_cloud_storage::client::Storage;
356 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
357 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
358 /// let response = client
359 /// .open_object("projects/_/buckets/my-bucket", "my-object")
360 /// .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
361 /// .send()
362 /// .await?;
363 /// # Ok(()) }
364 /// ```
365 pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
366 where
367 V: ReadResumePolicy + 'static,
368 {
369 self.options.set_read_resume_policy(std::sync::Arc::new(v));
370 self
371 }
372
373 /// Configure per-attempt timeout.
374 ///
375 /// # Example
376 /// ```
377 /// # use google_cloud_storage::client::Storage;
378 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
379 /// use std::time::Duration;
380 /// let response = client
381 /// .open_object("projects/_/buckets/my-bucket", "my-object")
382 /// .with_attempt_timeout(Duration::from_secs(120))
383 /// .send()
384 /// .await?;
385 /// # Ok(()) }
386 /// ```
387 ///
388 /// The Cloud Storage client library times out `open_object()` attempts by
389 /// default (with a 60s timeout). Applications may want to set a different
390 /// value depending on how they are deployed.
391 ///
392 /// Note that the per-attempt timeout is subject to the overall retry loop
393 /// time limits (if any). The effective timeout for each attempt is the
394 /// smallest of (a) the per-attempt timeout, and (b) the remaining time in
395 /// the retry loop.
396 pub fn with_attempt_timeout(mut self, v: Duration) -> Self {
397 self.options.set_bidi_attempt_timeout(v);
398 self
399 }
400
401 /// Sets the `User-Agent` header for this request.
402 ///
403 /// # Example
404 /// ```
405 /// # use google_cloud_storage::client::Storage;
406 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
407 /// let mut response = client
408 /// .open_object("projects/_/buckets/my-bucket", "my-object")
409 /// .with_user_agent("my-app/1.0.0")
410 /// .send()
411 /// .await?;
412 /// println!("response details={response:?}");
413 /// # Ok(()) }
414 /// ```
415 pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
416 self.options.user_agent = Some(user_agent.into());
417 self
418 }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::Storage;
    use crate::model::{CommonObjectRequestParams, Object};
    use crate::model_ext::tests::create_key_helper;
    use anyhow::Result;
    use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use google_cloud_gax::retry_policy::NeverRetry;
    use http::HeaderValue;
    use static_assertions::assert_impl_all;
    use storage_grpc_mock::google::storage::v2::{
        BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
        ReadRange as ProtoRange,
    };
    use storage_grpc_mock::{MockStorage, start};

    const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";
    const OBJECT_NAME: &str = "test-object";
    const USER_AGENT: &str = "quick_foxes_lazy_dogs/1.2.3";
    const BIND_ADDRESS: &str = "0.0.0.0:0";

    /// The item type streamed by the mock `bidi_read_object()` handler.
    type StreamItem = TonicResult<BidiReadObjectResponse>;

    /// The object metadata most tests return in the first response.
    fn proto_metadata() -> ProtoObject {
        ProtoObject {
            bucket: BUCKET_NAME.to_string(),
            name: OBJECT_NAME.to_string(),
            generation: 123456,
            ..ProtoObject::default()
        }
    }

    /// The client-side object matching `proto_metadata()`.
    fn expected_object() -> Object {
        Object::new()
            .set_bucket(BUCKET_NAME)
            .set_name(OBJECT_NAME)
            .set_generation(123456)
    }

    /// Creates a response channel with `first` already queued.
    ///
    /// The sender is returned so callers can hold it for the duration of the
    /// test, as dropping it would close the channel.
    async fn primed_channel(
        first: BidiReadObjectResponse,
    ) -> Result<(
        tokio::sync::mpsc::Sender<StreamItem>,
        tokio::sync::mpsc::Receiver<StreamItem>,
    )> {
        let (tx, rx) = tokio::sync::mpsc::channel::<StreamItem>(1);
        tx.send(Ok(first)).await?;
        Ok((tx, rx))
    }

    /// Creates a client with anonymous credentials pointing at `endpoint`.
    async fn connect(endpoint: impl Into<String>) -> Result<Storage> {
        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .build()
            .await?;
        Ok(client)
    }

    /// Creates a builder backed by the do-nothing `StorageStub`.
    fn stub_builder() -> OpenObject<StorageStub> {
        OpenObject::new(
            Arc::new(StorageStub),
            BUCKET_NAME.to_string(),
            OBJECT_NAME.to_string(),
            RequestOptions::new(),
        )
    }

    // Verify `open_object()` meets normal Send, Sync, requirements.
    #[tokio::test]
    async fn traits() -> Result<()> {
        assert_impl_all!(OpenObject: Clone, std::fmt::Debug, Send, Sync);

        fn assert_send<T: Send>(_: &T) {}
        fn assert_static<T: 'static>(_: &T) {}

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        let builder = client.open_object(BUCKET_NAME, OBJECT_NAME);
        assert_static(&builder);

        let pending = client.open_object(BUCKET_NAME, OBJECT_NAME).send();
        assert_send(&pending);
        assert_static(&pending);
        Ok(())
    }

    #[tokio::test]
    async fn open_object_normal() -> Result<()> {
        let first = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                size: 42,
                ..proto_metadata()
            }),
            ..BidiReadObjectResponse::default()
        };
        let (_tx, rx) = primed_channel(first).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = connect(endpoint).await?;
        let descriptor = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await?;

        let got = descriptor.object();
        let want = expected_object().set_size(42);
        assert_eq!(got, want);

        Ok(())
    }

    #[tokio::test]
    async fn attributes() -> Result<()> {
        let builder = stub_builder()
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        let want = OpenObjectRequest::default()
            .set_bucket(BUCKET_NAME)
            .set_object(OBJECT_NAME)
            .set_generation(123)
            .set_if_generation_match(234)
            .set_if_generation_not_match(345)
            .set_if_metageneration_match(456)
            .set_if_metageneration_not_match(567);
        assert_eq!(builder.request, want);
        Ok(())
    }

    #[tokio::test]
    async fn csek() -> Result<()> {
        let (raw_key, _, _, _) = create_key_helper();
        let key = KeyAes256::new(&raw_key)?;

        let builder = stub_builder().set_key(key.clone());
        let want = OpenObjectRequest::default()
            .set_bucket(BUCKET_NAME)
            .set_object(OBJECT_NAME)
            .set_common_object_request_params(CommonObjectRequestParams::from(key));
        assert_eq!(builder.request, want);
        Ok(())
    }

    #[tokio::test]
    async fn request_options() -> Result<()> {
        use crate::read_resume_policy::NeverResume;
        use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
        use google_cloud_gax::retry_policy::Aip194Strict;
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let backoff = ExponentialBackoffBuilder::default()
            .with_scaling(4.0)
            .build()
            .expect("expontial backoff builds");
        let builder = stub_builder()
            .with_backoff_policy(backoff)
            .with_retry_policy(Aip194Strict)
            .with_retry_throttler(CircuitBreaker::default())
            .with_read_resume_policy(NeverResume)
            .with_attempt_timeout(Duration::from_secs(120))
            .with_user_agent(USER_AGENT);

        // The policies are trait objects, so compare their debug output.
        let got = builder.options;
        assert!(
            format!("{:?}", got.backoff_policy).contains("ExponentialBackoff"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_policy).contains("Aip194Strict"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.retry_throttler).contains("CircuitBreaker"),
            "{got:?}"
        );
        assert!(
            format!("{:?}", got.read_resume_policy()).contains("NeverResume"),
            "{got:?}"
        );
        assert_eq!(
            got.bidi_attempt_timeout,
            Duration::from_secs(120),
            "{got:?}"
        );
        assert_eq!(got.user_agent.as_deref(), Some(USER_AGENT), "{got:?}");

        Ok(())
    }

    #[tokio::test]
    async fn send() -> anyhow::Result<()> {
        let first = BidiReadObjectResponse {
            metadata: Some(proto_metadata()),
            ..BidiReadObjectResponse::default()
        };
        let (_tx, rx) = primed_channel(first).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = connect(endpoint).await?;

        let descriptor = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await?;
        let want = expected_object();
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );
        Ok(())
    }

    #[tokio::test]
    async fn send_and_read() -> anyhow::Result<()> {
        let payload: Vec<u8> = (0..32_u8).collect();
        let range_data = ObjectRangeData {
            read_range: Some(ProtoRange {
                read_id: 0_i64,
                ..ProtoRange::default()
            }),
            range_end: true,
            checksummed_data: Some(ChecksummedData {
                content: payload.clone(),
                crc32c: None,
            }),
        };
        let first = BidiReadObjectResponse {
            metadata: Some(proto_metadata()),
            object_data_ranges: vec![range_data],
            ..BidiReadObjectResponse::default()
        };
        let (_tx, rx) = primed_channel(first).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = connect(endpoint).await?;

        let (descriptor, mut reader) = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .send_and_read(ReadRange::tail(32))
            .await?;
        let want = expected_object();
        assert_eq!(descriptor.object(), want, "{descriptor:?}");
        assert_eq!(
            descriptor.headers().get("content-type"),
            Some(&HeaderValue::from_static("application/grpc")),
            "headers={:?}",
            descriptor.headers()
        );

        let mut received = Vec::new();
        while let Some(chunk) = reader.next().await.transpose()? {
            received.extend_from_slice(&chunk);
        }
        assert_eq!(received, payload);
        Ok(())
    }

    #[tokio::test(start_paused = true)]
    async fn timeout() -> anyhow::Result<()> {
        let (_tx, rx) = tokio::sync::mpsc::channel::<StreamItem>(1);

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint)
            .with_retry_policy(NeverRetry)
            .build()
            .await?;

        // This will timeout because we never send the initial message over `_tx`.
        let target = Duration::from_secs(120);
        let began = tokio::time::Instant::now();
        let result = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .with_attempt_timeout(target)
            .send()
            .await;
        let err = result.unwrap_err();
        assert!(err.is_timeout(), "{err:?}");
        assert_eq!(began.elapsed(), target);

        Ok(())
    }

    #[tokio::test]
    async fn user_agent() -> anyhow::Result<()> {
        let first = BidiReadObjectResponse {
            metadata: Some(proto_metadata()),
            ..BidiReadObjectResponse::default()
        };
        let (_tx, rx) = primed_channel(first).await?;

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object().return_once(|request| {
            let header = request
                .metadata()
                .get(http::header::USER_AGENT.as_str())
                .and_then(|v| v.to_str().ok())
                .expect("user-agent should be set");

            // The header may carry other space-separated product tokens.
            let found = header.split(' ').any(|token| token == USER_AGENT);
            assert!(found, "{header:?}");

            Ok(TonicResponse::from(rx))
        });
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = connect(endpoint).await?;

        let _descriptor = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .with_user_agent(USER_AGENT)
            .send()
            .await?;
        Ok(())
    }

    /// A stub relying entirely on the trait's default methods; used by tests
    /// that only inspect the builder's state.
    #[derive(Debug)]
    struct StorageStub;
    impl crate::stub::Storage for StorageStub {}
}