1use super::tracing::{TracingObjectDescriptor, TracingResponse};
16use crate::Result;
17use crate::model::{Object, ReadObjectRequest};
18use crate::model_ext::WriteObjectRequest;
19use crate::read_object::ReadObjectResponse;
20use crate::storage::client::StorageInner;
21use crate::storage::info::INSTRUMENTATION;
22use crate::storage::perform_upload::PerformUpload;
23use crate::storage::read_object::Reader;
24use crate::storage::request_options::RequestOptions;
25use crate::storage::streaming_source::{Seek, StreamingSource};
26use crate::{
27 model_ext::OpenObjectRequest, object_descriptor::ObjectDescriptor,
28 storage::bidi::connector::Connector, storage::bidi::transport::ObjectDescriptorTransport,
29};
30use gaxi::observability::{ClientRequestAttributes, DurationMetric, RequestRecorder};
31use std::sync::Arc;
32
/// Implementation of `super::stub::Storage` used by the storage client.
///
/// Every operation exists in two flavors on this type: a `*_plain` method that
/// performs the work and a `*_tracing` method that wraps the plain one with
/// spans and client request signals. The `tracing` flag selects between them.
#[derive(Clone, Debug)]
pub struct Storage {
    /// Shared client state; cloned into readers and uploads, and the source of
    /// the gRPC client used by `open_object`.
    inner: Arc<StorageInner>,
    /// When `true`, the stub methods dispatch to the `*_tracing` wrappers.
    tracing: bool,
    /// Duration metric handed to `gaxi::client_request_signals!` by the
    /// `*_tracing` wrappers.
    metric: DurationMetric,
}
54
55impl Storage {
56 pub(crate) fn new(inner: Arc<StorageInner>, tracing: bool) -> Arc<Self> {
57 let metric = DurationMetric::new(&INSTRUMENTATION);
58 Arc::new(Self {
59 inner,
60 tracing,
61 metric,
62 })
63 }
64
65 async fn read_object_plain(
66 &self,
67 request: ReadObjectRequest,
68 options: RequestOptions,
69 ) -> Result<ReadObjectResponse> {
70 let reader = Reader {
71 inner: self.inner.clone(),
72 request,
73 options,
74 };
75 reader.response().await
76 }
77
78 #[tracing::instrument(name = "read_object", level = tracing::Level::DEBUG, ret, err(Debug))]
79 async fn read_object_tracing(
80 &self,
81 request: ReadObjectRequest,
82 options: RequestOptions,
83 ) -> Result<ReadObjectResponse> {
84 let resource_name = format!("//storage.googleapis.com/{}", request.bucket);
85 let (span, pending) = gaxi::client_request_signals!(
86 metric: self.metric.clone(),
87 info: *INSTRUMENTATION,
88 method: "client::Storage::read_object",
89 async {
90 if let Some(recorder) = RequestRecorder::current() {
91 recorder.on_client_request(
92 ClientRequestAttributes::default()
93 .set_url_template("/storage/v1/b/{bucket}/o/{object}")
94 .set_resource_name(resource_name),
95 );
96 }
97 self.read_object_plain(request, options).await
98 });
99
100 let response = pending.await?;
101 let inner = TracingResponse::new(response.into_parts(), span);
102 Ok(ReadObjectResponse::new(Box::new(inner)))
103 }
104
105 async fn write_object_buffered_plain<P>(
106 &self,
107 payload: P,
108 request: WriteObjectRequest,
109 options: RequestOptions,
110 ) -> Result<Object>
111 where
112 P: StreamingSource + Send + Sync + 'static,
113 {
114 PerformUpload::new(
115 payload,
116 self.inner.clone(),
117 request.spec,
118 request.params,
119 options,
120 )
121 .send()
122 .await
123 }
124
125 #[tracing::instrument(name = "write_object_buffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
126 async fn write_object_buffered_tracing<P>(
127 &self,
128 payload: P,
129 request: WriteObjectRequest,
130 options: RequestOptions,
131 ) -> Result<Object>
132 where
133 P: StreamingSource + Send + Sync + 'static,
134 {
135 let resource_name = format!(
136 "//storage.googleapis.com/{}",
137 request
138 .spec
139 .resource
140 .as_ref()
141 .map(|r| r.bucket.as_str())
142 .unwrap_or_default()
143 );
144 let (_span, pending) = gaxi::client_request_signals!(
145 metric: self.metric.clone(),
146 info: *INSTRUMENTATION,
147 method: "client::Storage::write_object",
148 async {
149 if let Some(recorder) = RequestRecorder::current() {
150 recorder.on_client_request(
151 ClientRequestAttributes::default()
152 .set_url_template("/upload/storage/v1/b/{bucket}/o")
153 .set_resource_name(resource_name),
154 );
155 }
156 self.write_object_buffered_plain(payload, request, options).await
157 }
158 );
159 pending.await
160 }
161
162 async fn write_object_unbuffered_plain<P>(
163 &self,
164 payload: P,
165 request: WriteObjectRequest,
166 options: RequestOptions,
167 ) -> Result<Object>
168 where
169 P: StreamingSource + Seek + Send + Sync + 'static,
170 {
171 PerformUpload::new(
172 payload,
173 self.inner.clone(),
174 request.spec,
175 request.params,
176 options,
177 )
178 .send_unbuffered()
179 .await
180 }
181
182 #[tracing::instrument(name = "write_object_unbuffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
183 async fn write_object_unbuffered_tracing<P>(
184 &self,
185 payload: P,
186 request: WriteObjectRequest,
187 options: RequestOptions,
188 ) -> Result<Object>
189 where
190 P: StreamingSource + Seek + Send + Sync + 'static,
191 {
192 let resource_name = format!(
193 "//storage.googleapis.com/{}",
194 request
195 .spec
196 .resource
197 .as_ref()
198 .map(|r| r.bucket.as_str())
199 .unwrap_or_default()
200 );
201 let (_span, pending) = gaxi::client_request_signals!(
202 metric: self.metric.clone(),
203 info: *INSTRUMENTATION,
204 method: "client::Storage::write_object",
205 async {
206 if let Some(recorder) = RequestRecorder::current() {
207 recorder.on_client_request(
208 ClientRequestAttributes::default()
209 .set_url_template("/upload/storage/v1/b/{bucket}/o")
210 .set_resource_name(resource_name),
211 );
212 }
213 self.write_object_unbuffered_plain(payload, request, options).await
214 }
215 );
216 pending.await
217 }
218
219 async fn open_object_plain(
220 &self,
221 request: OpenObjectRequest,
222 options: RequestOptions,
223 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
224 let (spec, ranges) = request.into_parts();
225 let connector = Connector::new(spec, options, self.inner.grpc.clone());
226 let (transport, readers) = ObjectDescriptorTransport::new(connector, ranges).await?;
227 Ok((ObjectDescriptor::new(transport), readers))
228 }
229
230 #[tracing::instrument(name = "open_object", level = tracing::Level::DEBUG, ret, err(Debug))]
231 async fn open_object_tracing(
232 &self,
233 request: OpenObjectRequest,
234 options: RequestOptions,
235 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
236 let resource_name = format!("//storage.googleapis.com/{}", request.bucket);
237 let (span, pending) = gaxi::client_request_signals!(
238 metric: self.metric.clone(),
239 info: *INSTRUMENTATION,
240 method: "client::Storage::open_object",
241 async {
242 if let Some(recorder) = RequestRecorder::current() {
243 recorder.on_client_request(
244 ClientRequestAttributes::default()
245 .set_rpc_method("google.storage.v2.Storage/BidiStreamingRead")
246 .set_url_template("/upload/storage/v1/b/{bucket}/o")
247 .set_resource_name(resource_name),
248 );
249 }
250 self.open_object_plain(request, options).await
251 }
252 );
253 let (descriptor, readers) = pending.await?;
254 let descriptor =
255 ObjectDescriptor::new(TracingObjectDescriptor::new(descriptor.into_parts()));
256 let readers = readers
257 .into_iter()
258 .map(|r| {
259 let inner = r.into_parts();
260 ReadObjectResponse::new(Box::new(TracingResponse::new(inner, span.clone())))
261 })
262 .collect::<Vec<_>>();
263 Ok((descriptor, readers))
264 }
265}
266
267impl super::stub::Storage for Storage {
268 async fn read_object(
270 &self,
271 req: ReadObjectRequest,
272 options: RequestOptions,
273 ) -> Result<ReadObjectResponse> {
274 if self.tracing {
275 return self.read_object_tracing(req, options).await;
276 }
277 self.read_object_plain(req, options).await
278 }
279
280 async fn write_object_buffered<P>(
282 &self,
283 payload: P,
284 req: WriteObjectRequest,
285 options: RequestOptions,
286 ) -> Result<Object>
287 where
288 P: StreamingSource + Send + Sync + 'static,
289 {
290 if self.tracing {
291 return self
292 .write_object_buffered_tracing(payload, req, options)
293 .await;
294 }
295 self.write_object_buffered_plain(payload, req, options)
296 .await
297 }
298
299 async fn write_object_unbuffered<P>(
301 &self,
302 payload: P,
303 req: WriteObjectRequest,
304 options: RequestOptions,
305 ) -> Result<Object>
306 where
307 P: StreamingSource + Seek + Send + Sync + 'static,
308 {
309 if self.tracing {
310 return self
311 .write_object_unbuffered_tracing(payload, req, options)
312 .await;
313 }
314 self.write_object_unbuffered_plain(payload, req, options)
315 .await
316 }
317
318 async fn open_object(
319 &self,
320 request: OpenObjectRequest,
321 options: RequestOptions,
322 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
323 if self.tracing {
324 return self.open_object_tracing(request, options).await;
325 }
326 self.open_object_plain(request, options).await
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::{Storage, StorageInner};
333 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
334 use google_cloud_test_utils::test_layer::AttributeValue;
335 use google_cloud_test_utils::test_layer::{CapturedSpan, TestLayer};
336 use httptest::{Expectation, Server, matchers::*, responders::status_code};
337 use pretty_assertions::assert_eq;
338 use std::collections::BTreeMap;
339 use std::sync::Arc;
340
341 impl Storage {
342 pub(crate) fn new_test(inner: Arc<StorageInner>) -> Arc<Self> {
343 Self::new(inner, false)
344 }
345 }
346
347 #[tokio::test]
348 async fn read_object() -> anyhow::Result<()> {
349 let guard = TestLayer::initialize();
350
351 let server = Server::run();
352 server.expect(
353 Expectation::matching(all_of![
354 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
355 request::query(url_decoded(contains(("alt", "media")))),
356 ])
357 .respond_with(status_code(404)),
358 );
359
360 let client = crate::client::Storage::builder()
361 .with_endpoint(format!("http://{}", server.addr()))
362 .with_credentials(Anonymous::new().build())
363 .with_tracing()
364 .build()
365 .await?;
366 let response = client
367 .read_object("projects/_/buckets/test-bucket", "test-object")
368 .send()
369 .await;
370 assert!(
371 matches!(response, Err(ref e) if e.is_transport()),
372 "{response:?}"
373 );
374
375 let captured = TestLayer::capture(&guard);
376 check_debug_log(&captured, "read_object");
377
378 client_request_span(&captured, "read_object", "404", "http");
379
380 Ok(())
381 }
382
383 #[tokio::test]
384 async fn read_object_success() -> anyhow::Result<()> {
385 let guard = TestLayer::initialize();
386
387 let body = (0..100_000)
388 .map(|i| format!("{i:08} {:1000}", ""))
389 .collect::<Vec<_>>()
390 .join("\n");
391 let server = Server::run();
392 server.expect(
393 Expectation::matching(all_of![
394 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
395 request::query(url_decoded(contains(("alt", "media")))),
396 ])
397 .respond_with(
398 status_code(200)
399 .body(body.clone())
400 .append_header("x-goog-generation", 123456),
401 ),
402 );
403
404 let client = crate::client::Storage::builder()
405 .with_endpoint(format!("http://{}", server.addr()))
406 .with_credentials(Anonymous::new().build())
407 .with_tracing()
408 .build()
409 .await?;
410 let mut got = Vec::new();
411 let mut response = client
412 .read_object("projects/_/buckets/test-bucket", "test-object")
413 .send()
414 .await?;
415 let object = response.object();
416 assert_eq!(object.generation, 123456, "{object:?}");
417 while let Some(b) = response.next().await.transpose()? {
418 got.push(b);
419 }
420
421 let captured = TestLayer::capture(&guard);
422 let span = captured
423 .iter()
424 .find(|s| s.name == "client_request")
425 .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
426 assert_eq!(span.events, got.len() + 1, "{span:?}");
428
429 Ok(())
430 }
431
432 #[tokio::test]
433 async fn write_object_buffered() -> anyhow::Result<()> {
434 let guard = TestLayer::initialize();
435
436 let server = Server::run();
437 server.expect(
438 Expectation::matching(all_of![
439 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
440 request::query(url_decoded(contains(("uploadType", "multipart")))),
441 ])
442 .respond_with(status_code(404)),
443 );
444
445 let client = crate::client::Storage::builder()
446 .with_endpoint(format!("http://{}", server.addr()))
447 .with_credentials(Anonymous::new().build())
448 .with_tracing()
449 .build()
450 .await?;
451 let response = client
452 .write_object("projects/_/buckets/test-bucket", "test-object", "payload")
453 .send_buffered()
454 .await;
455 assert!(
456 matches!(response, Err(ref e) if e.is_transport()),
457 "{response:?}"
458 );
459
460 let captured = TestLayer::capture(&guard);
461 check_debug_log(&captured, "write_object_buffered");
462
463 client_request_span(&captured, "write_object", "404", "http");
464
465 Ok(())
466 }
467
468 #[tokio::test]
469 async fn write_object_unbuffered() -> anyhow::Result<()> {
470 let guard = TestLayer::initialize();
471
472 let server = Server::run();
473 server.expect(
474 Expectation::matching(all_of![
475 request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
476 request::query(url_decoded(contains(("uploadType", "multipart")))),
477 ])
478 .respond_with(status_code(404)),
479 );
480
481 let client = crate::client::Storage::builder()
482 .with_endpoint(format!("http://{}", server.addr()))
483 .with_credentials(Anonymous::new().build())
484 .with_tracing()
485 .build()
486 .await?;
487 let response = client
488 .write_object("projects/_/buckets/test-bucket", "test-object", "payload")
489 .send_unbuffered()
490 .await;
491 assert!(
492 matches!(response, Err(ref e) if e.is_transport()),
493 "{response:?}"
494 );
495
496 let captured = TestLayer::capture(&guard);
497 check_debug_log(&captured, "write_object_unbuffered");
498
499 client_request_span(&captured, "write_object", "404", "http");
500
501 Ok(())
502 }
503
504 #[tokio::test]
505 async fn open_object() -> anyhow::Result<()> {
506 use gaxi::grpc::tonic::Status as TonicStatus;
507 use google_cloud_gax::error::rpc::Code;
508 use storage_grpc_mock::{MockStorage, start};
509
510 let guard = TestLayer::initialize();
511
512 let mut mock = MockStorage::new();
513 mock.expect_bidi_read_object()
514 .return_once(|_| Err(TonicStatus::not_found("not here")));
515 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
516
517 let client = crate::client::Storage::builder()
518 .with_credentials(Anonymous::new().build())
519 .with_endpoint(endpoint.clone())
520 .with_tracing()
521 .build()
522 .await?;
523 let response = client
524 .open_object("projects/_/buckets/test-bucket", "test-object")
525 .send()
526 .await;
527 assert!(
528 matches!(response, Err(ref e) if e.status().is_some_and(|s| s.code == Code::NotFound)),
529 "{response:?}"
530 );
531
532 let captured = TestLayer::capture(&guard);
533 check_debug_log(&captured, "open_object");
534
535 client_request_span(&captured, "open_object", "NOT_FOUND", "grpc");
536 Ok(())
537 }
538
539 #[tokio::test]
540 #[ignore = "flaky test, see #5290"]
541 async fn open_object_success() -> anyhow::Result<()> {
542 use crate::model_ext::ReadRange;
544 use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
545 use storage_grpc_mock::google::storage::v2::{
546 BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
547 ReadRange as ProtoRange,
548 };
549 use storage_grpc_mock::{MockStorage, start};
550 const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";
551 const OBJECT_NAME: &str = "test-object";
552 const BIND_ADDRESS: &str = "0.0.0.0:0";
553 const PAYLOAD: &str = "the quick brown fox jumps over the lazy dog";
554
555 let guard = TestLayer::initialize();
556
557 let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(10);
558 let response = BidiReadObjectResponse {
559 metadata: Some(ProtoObject {
560 bucket: BUCKET_NAME.to_string(),
561 name: OBJECT_NAME.to_string(),
562 generation: 123456,
563 ..ProtoObject::default()
564 }),
565 object_data_ranges: vec![ObjectRangeData {
566 read_range: Some(ProtoRange {
567 read_id: 0_i64,
568 ..ProtoRange::default()
569 }),
570 range_end: true,
571 checksummed_data: Some(ChecksummedData {
572 content: PAYLOAD.as_bytes().to_vec(),
573 crc32c: None,
574 }),
575 }],
576 ..BidiReadObjectResponse::default()
577 };
578 tx.send(Ok(response.clone())).await?;
580 tx.send(Ok(response.clone())).await?;
583 tx.send(Ok(response.clone())).await?;
584
585 let mut mock = MockStorage::new();
586 mock.expect_bidi_read_object()
587 .return_once(|_| Ok(TonicResponse::from(rx)));
588 let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
589
590 let client = crate::client::Storage::builder()
591 .with_credentials(Anonymous::new().build())
592 .with_endpoint(endpoint.clone())
593 .with_tracing()
594 .build()
595 .await?;
596 let (descriptor, _reader0) = client
597 .open_object(BUCKET_NAME, OBJECT_NAME)
598 .send_and_read(ReadRange::all())
599 .await?;
600 let _reader1 = descriptor.read_range(ReadRange::offset(5)).await;
601 let _reader2 = descriptor.read_range(ReadRange::segment(10, 10)).await;
602 let _reader3 = descriptor.read_range(ReadRange::tail(15)).await;
603
604 let captured = TestLayer::capture(&guard);
605 let _span = captured
606 .iter()
607 .find(|s| s.name == "client_request")
608 .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
609
610 let range_spans = captured
611 .iter()
612 .filter(|s| s.name == "read_range")
613 .collect::<Vec<_>>();
614
615 let _span_reader1 = range_spans
616 .clone()
617 .into_iter()
618 .find(|s| {
619 s.attributes
620 .get("read_range.start")
621 .and_then(|v| v.as_i64())
622 == Some(5)
623 })
624 .unwrap_or_else(|| {
625 panic!("missing `read_range` span for ReadRange::offset(5): {range_spans:#?}")
626 });
627
628 let _span_reader2 = range_spans
629 .clone()
630 .into_iter()
631 .find(|s| {
632 s.attributes
633 .get("read_range.start")
634 .and_then(|v| v.as_i64())
635 == Some(10)
636 && s.attributes
637 .get("read_range.limit")
638 .and_then(|v| v.as_i64())
639 == Some(10)
640 })
641 .unwrap_or_else(|| {
642 panic!("missing `read_range` span for ReadRange::segment(10, 10): {range_spans:#?}")
643 });
644
645 let _span_reader3 = range_spans
646 .clone()
647 .into_iter()
648 .find(|s| {
649 s.attributes
650 .get("read_range.start")
651 .and_then(|v| v.as_i64())
652 == Some(-15)
653 })
654 .unwrap_or_else(|| {
655 panic!("missing `read_range` span for ReadRange::tail(15): {range_spans:#?}")
656 });
657 Ok(())
658 }
659
660 #[track_caller]
661 fn check_debug_log(captured: &Vec<CapturedSpan>, method: &'static str) {
662 let span = captured
663 .iter()
664 .find(|s| s.name == method)
665 .unwrap_or_else(|| panic!("missing `{method}` span in capture: {captured:#?}"));
666
667 let got = BTreeMap::from_iter(span.attributes.clone());
668 let want = ["self", "options", "request"];
669 let missing = want
670 .iter()
671 .filter(|k| !got.contains_key(**k))
672 .collect::<Vec<_>>();
673 assert!(
674 missing.is_empty(),
675 "missing = {missing:?}\ngot = {:?}\nwant = {want:?}\nfull = {got:#?}",
676 got.keys().collect::<Vec<_>>(),
677 );
678 }
679
680 #[track_caller]
681 fn client_request_span(
682 captured: &Vec<CapturedSpan>,
683 method: &'static str,
684 error_type: &'static str,
685 rpc_system: &'static str,
686 ) {
687 let expected_attributes: [(&str, &str); 6] = [
688 ("otel.kind", "Internal"),
689 ("rpc.system.name", rpc_system),
690 ("otel.status_code", "ERROR"),
691 ("gcp.client.service", "storage"),
692 ("gcp.client.repo", "googleapis/google-cloud-rust"),
693 ("gcp.client.artifact", "google-cloud-storage"),
694 ];
695 let span = captured
696 .iter()
697 .find(|s| s.name == "client_request")
698 .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
699 let got = BTreeMap::from_iter(span.attributes.clone());
700 let want = BTreeMap::<String, AttributeValue>::from_iter(
703 expected_attributes
704 .iter()
705 .map(|(k, v)| (k.to_string(), AttributeValue::from(*v)))
706 .chain(
707 [
708 (
709 "otel.name",
710 format!("google_cloud_storage::client::Storage::{method}").into(),
711 ),
712 ("error.type", error_type.into()),
713 ]
714 .map(|(k, v)| (k.to_string(), v)),
715 ),
716 );
717 let mismatch = want
718 .iter()
719 .filter(|(k, v)| !got.get(k.as_str()).is_some_and(|g| g == *v))
720 .collect::<Vec<_>>();
721 assert!(
722 mismatch.is_empty(),
723 "mismatch = {mismatch:?}\ngot = {got:?}\nwant = {want:?}"
724 );
725 }
726}