1use super::tracing::{TracingObjectDescriptor, TracingResponse};
16use crate::Result;
17use crate::model::{Object, ReadObjectRequest};
18use crate::model_ext::WriteObjectRequest;
19use crate::read_object::ReadObjectResponse;
20use crate::storage::client::StorageInner;
21use crate::storage::info::INSTRUMENTATION;
22use crate::storage::perform_upload::PerformUpload;
23use crate::storage::read_object::Reader;
24use crate::storage::request_options::RequestOptions;
25use crate::storage::streaming_source::{Seek, StreamingSource};
26use crate::{
27 model_ext::OpenObjectRequest, object_descriptor::ObjectDescriptor,
28 storage::bidi::connector::Connector, storage::bidi::transport::ObjectDescriptorTransport,
29};
30use gaxi::observability::{ClientRequestAttributes, DurationMetric, RequestRecorder};
31use std::sync::Arc;
32
33#[derive(Clone, Debug)]
49pub struct Storage {
50 inner: Arc<StorageInner>,
51 tracing: bool,
52 metric: DurationMetric,
53}
54
55impl Storage {
56 #[cfg(test)]
57 pub(crate) fn new_test(inner: Arc<StorageInner>) -> Arc<Self> {
58 Self::new(inner, false)
59 }
60
61 pub(crate) fn new(inner: Arc<StorageInner>, tracing: bool) -> Arc<Self> {
62 let metric = DurationMetric::new(&INSTRUMENTATION);
63 Arc::new(Self {
64 inner,
65 tracing,
66 metric,
67 })
68 }
69
70 async fn read_object_plain(
71 &self,
72 request: ReadObjectRequest,
73 options: RequestOptions,
74 ) -> Result<ReadObjectResponse> {
75 let reader = Reader {
76 inner: self.inner.clone(),
77 request,
78 options,
79 };
80 reader.response().await
81 }
82
83 #[tracing::instrument(name = "read_object", level = tracing::Level::DEBUG, ret, err(Debug))]
84 async fn read_object_tracing(
85 &self,
86 request: ReadObjectRequest,
87 options: RequestOptions,
88 ) -> Result<ReadObjectResponse> {
89 let resource_name = format!("//storage.googleapis.com/{}", request.bucket);
90 let (span, pending) = gaxi::client_request_signals!(
91 metric: self.metric.clone(),
92 info: *INSTRUMENTATION,
93 method: "client::Storage::read_object",
94 async {
95 if let Some(recorder) = RequestRecorder::current() {
96 recorder.on_client_request(
97 ClientRequestAttributes::default()
98 .set_rpc_method("google.storage.v2.Storage/ReadObject")
99 .set_url_template("/storage/v1/b/{bucket}/o/{object}")
100 .set_resource_name(resource_name),
101 );
102 }
103 self.read_object_plain(request, options).await
104 });
105
106 let response = pending.await?;
107 let inner = TracingResponse::new(response.into_parts(), span);
108 Ok(ReadObjectResponse::new(Box::new(inner)))
109 }
110
111 async fn write_object_buffered_plain<P>(
112 &self,
113 payload: P,
114 request: WriteObjectRequest,
115 options: RequestOptions,
116 ) -> Result<Object>
117 where
118 P: StreamingSource + Send + Sync + 'static,
119 {
120 PerformUpload::new(
121 payload,
122 self.inner.clone(),
123 request.spec,
124 request.params,
125 options,
126 )
127 .send()
128 .await
129 }
130
131 #[tracing::instrument(name = "write_object_buffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
132 async fn write_object_buffered_tracing<P>(
133 &self,
134 payload: P,
135 request: WriteObjectRequest,
136 options: RequestOptions,
137 ) -> Result<Object>
138 where
139 P: StreamingSource + Send + Sync + 'static,
140 {
141 let resource_name = format!(
142 "//storage.googleapis.com/{}",
143 request
144 .spec
145 .resource
146 .as_ref()
147 .map(|r| r.bucket.as_str())
148 .unwrap_or_default()
149 );
150 let (_span, pending) = gaxi::client_request_signals!(
151 metric: self.metric.clone(),
152 info: *INSTRUMENTATION,
153 method: "client::Storage::write_object",
154 async {
155 if let Some(recorder) = RequestRecorder::current() {
156 recorder.on_client_request(
157 ClientRequestAttributes::default()
158 .set_rpc_method("google.storage.v2.Storage/WriteObject")
159 .set_url_template("/upload/storage/v1/b/{bucket}/o")
160 .set_resource_name(resource_name),
161 );
162 }
163 self.write_object_buffered_plain(payload, request, options).await
164 }
165 );
166 pending.await
167 }
168
169 async fn write_object_unbuffered_plain<P>(
170 &self,
171 payload: P,
172 request: WriteObjectRequest,
173 options: RequestOptions,
174 ) -> Result<Object>
175 where
176 P: StreamingSource + Seek + Send + Sync + 'static,
177 {
178 PerformUpload::new(
179 payload,
180 self.inner.clone(),
181 request.spec,
182 request.params,
183 options,
184 )
185 .send_unbuffered()
186 .await
187 }
188
189 #[tracing::instrument(name = "write_object_unbuffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
190 async fn write_object_unbuffered_tracing<P>(
191 &self,
192 payload: P,
193 request: WriteObjectRequest,
194 options: RequestOptions,
195 ) -> Result<Object>
196 where
197 P: StreamingSource + Seek + Send + Sync + 'static,
198 {
199 let resource_name = format!(
200 "//storage.googleapis.com/{}",
201 request
202 .spec
203 .resource
204 .as_ref()
205 .map(|r| r.bucket.as_str())
206 .unwrap_or_default()
207 );
208 let (_span, pending) = gaxi::client_request_signals!(
209 metric: self.metric.clone(),
210 info: *INSTRUMENTATION,
211 method: "client::Storage::write_object",
212 async {
213 if let Some(recorder) = RequestRecorder::current() {
214 recorder.on_client_request(
215 ClientRequestAttributes::default()
216 .set_rpc_method("google.storage.v2.Storage/WriteObject")
217 .set_url_template("/upload/storage/v1/b/{bucket}/o")
218 .set_resource_name(resource_name),
219 );
220 }
221 self.write_object_unbuffered_plain(payload, request, options).await
222 }
223 );
224 pending.await
225 }
226
227 async fn open_object_plain(
228 &self,
229 request: OpenObjectRequest,
230 options: RequestOptions,
231 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
232 let (spec, ranges) = request.into_parts();
233 let connector = Connector::new(spec, options, self.inner.grpc.clone());
234 let (transport, readers) = ObjectDescriptorTransport::new(connector, ranges).await?;
235 Ok((ObjectDescriptor::new(transport), readers))
236 }
237
238 #[tracing::instrument(name = "open_object", level = tracing::Level::DEBUG, ret, err(Debug))]
239 async fn open_object_tracing(
240 &self,
241 request: OpenObjectRequest,
242 options: RequestOptions,
243 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
244 let resource_name = format!("//storage.googleapis.com/{}", request.bucket);
245 let (span, pending) = gaxi::client_request_signals!(
246 metric: self.metric.clone(),
247 info: *INSTRUMENTATION,
248 method: "client::Storage::open_object",
249 async {
250 if let Some(recorder) = RequestRecorder::current() {
251 recorder.on_client_request(
252 ClientRequestAttributes::default()
253 .set_rpc_method("google.storage.v2.Storage/BidiStreamingRead")
254 .set_url_template("/upload/storage/v1/b/{bucket}/o")
255 .set_resource_name(resource_name),
256 );
257 }
258 self.open_object_plain(request, options).await
259 }
260 );
261 let (descriptor, readers) = pending.await?;
262 let descriptor =
263 ObjectDescriptor::new(TracingObjectDescriptor::new(descriptor.into_parts()));
264 let readers = readers
265 .into_iter()
266 .map(|r| {
267 let inner = r.into_parts();
268 ReadObjectResponse::new(Box::new(TracingResponse::new(inner, span.clone())))
269 })
270 .collect::<Vec<_>>();
271 Ok((descriptor, readers))
272 }
273}
274
275impl super::stub::Storage for Storage {
276 async fn read_object(
278 &self,
279 req: ReadObjectRequest,
280 options: RequestOptions,
281 ) -> Result<ReadObjectResponse> {
282 if self.tracing {
283 return self.read_object_tracing(req, options).await;
284 }
285 self.read_object_plain(req, options).await
286 }
287
288 async fn write_object_buffered<P>(
290 &self,
291 payload: P,
292 req: WriteObjectRequest,
293 options: RequestOptions,
294 ) -> Result<Object>
295 where
296 P: StreamingSource + Send + Sync + 'static,
297 {
298 if self.tracing {
299 return self
300 .write_object_buffered_tracing(payload, req, options)
301 .await;
302 }
303 self.write_object_buffered_plain(payload, req, options)
304 .await
305 }
306
307 async fn write_object_unbuffered<P>(
309 &self,
310 payload: P,
311 req: WriteObjectRequest,
312 options: RequestOptions,
313 ) -> Result<Object>
314 where
315 P: StreamingSource + Seek + Send + Sync + 'static,
316 {
317 if self.tracing {
318 return self
319 .write_object_unbuffered_tracing(payload, req, options)
320 .await;
321 }
322 self.write_object_unbuffered_plain(payload, req, options)
323 .await
324 }
325
326 async fn open_object(
327 &self,
328 request: OpenObjectRequest,
329 options: RequestOptions,
330 ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
331 if self.tracing {
332 return self.open_object_tracing(request, options).await;
333 }
334 self.open_object_plain(request, options).await
335 }
336}
337
#[cfg(test)]
mod tests {
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use google_cloud_test_utils::test_layer::AttributeValue;
    use google_cloud_test_utils::test_layer::{CapturedSpan, TestLayer};
    use httptest::{Expectation, Server, matchers::*, responders::status_code};
    use pretty_assertions::assert_eq;
    use std::collections::BTreeMap;

    const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";
    const OBJECT_NAME: &str = "test-object";
    const BIND_ADDRESS: &str = "0.0.0.0:0";

    /// Builds a client with anonymous credentials and tracing enabled,
    /// pointing at `endpoint`.
    async fn traced_client(endpoint: String) -> anyhow::Result<crate::client::Storage> {
        let client = crate::client::Storage::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .build()
            .await?;
        Ok(client)
    }

    /// A failed download produces the debug span and an error-annotated
    /// `client_request` span.
    #[tokio::test]
    async fn read_object() -> anyhow::Result<()> {
        let guard = TestLayer::initialize();

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(status_code(404)),
        );

        let client = traced_client(format!("http://{}", server.addr())).await?;
        let response = client.read_object(BUCKET_NAME, OBJECT_NAME).send().await;
        assert!(
            matches!(response, Err(ref e) if e.is_transport()),
            "{response:?}"
        );

        let captured = TestLayer::capture(&guard);
        check_debug_log(&captured, "read_object");
        client_request_span(&captured, "read_object", "404", "http");

        Ok(())
    }

    /// A successful download records one more event on the `client_request`
    /// span than the number of chunks read from the response.
    #[tokio::test]
    async fn read_object_success() -> anyhow::Result<()> {
        let guard = TestLayer::initialize();

        let body = (0..100_000)
            .map(|i| format!("{i:08} {:1000}", ""))
            .collect::<Vec<_>>()
            .join("\n");
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .body(body.clone())
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = traced_client(format!("http://{}", server.addr())).await?;
        let mut response = client.read_object(BUCKET_NAME, OBJECT_NAME).send().await?;
        let object = response.object();
        assert_eq!(object.generation, 123456, "{object:?}");

        let mut got = Vec::new();
        while let Some(chunk) = response.next().await.transpose()? {
            got.push(chunk);
        }

        let captured = TestLayer::capture(&guard);
        let span = find_span(&captured, "client_request");
        assert_eq!(span.events, got.len() + 1, "{span:?}");

        Ok(())
    }

    /// A failed buffered upload produces the debug span and an
    /// error-annotated `client_request` span.
    #[tokio::test]
    async fn write_object_buffered() -> anyhow::Result<()> {
        let guard = TestLayer::initialize();

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
            .respond_with(status_code(404)),
        );

        let client = traced_client(format!("http://{}", server.addr())).await?;
        let response = client
            .write_object(BUCKET_NAME, OBJECT_NAME, "payload")
            .send_buffered()
            .await;
        assert!(
            matches!(response, Err(ref e) if e.is_transport()),
            "{response:?}"
        );

        let captured = TestLayer::capture(&guard);
        check_debug_log(&captured, "write_object_buffered");
        client_request_span(&captured, "write_object", "404", "http");

        Ok(())
    }

    /// A failed unbuffered upload produces the debug span and an
    /// error-annotated `client_request` span.
    #[tokio::test]
    async fn write_object_unbuffered() -> anyhow::Result<()> {
        let guard = TestLayer::initialize();

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
            .respond_with(status_code(404)),
        );

        let client = traced_client(format!("http://{}", server.addr())).await?;
        let response = client
            .write_object(BUCKET_NAME, OBJECT_NAME, "payload")
            .send_unbuffered()
            .await;
        assert!(
            matches!(response, Err(ref e) if e.is_transport()),
            "{response:?}"
        );

        let captured = TestLayer::capture(&guard);
        check_debug_log(&captured, "write_object_unbuffered");
        client_request_span(&captured, "write_object", "404", "http");

        Ok(())
    }

    /// A `NOT_FOUND` from the gRPC mock produces the debug span and an
    /// error-annotated `client_request` span.
    #[tokio::test]
    async fn open_object() -> anyhow::Result<()> {
        use gaxi::grpc::tonic::Status as TonicStatus;
        use google_cloud_gax::error::rpc::Code;
        use storage_grpc_mock::{MockStorage, start};

        let guard = TestLayer::initialize();

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Err(TonicStatus::not_found("not here")));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = traced_client(endpoint).await?;
        let response = client.open_object(BUCKET_NAME, OBJECT_NAME).send().await;
        assert!(
            matches!(response, Err(ref e) if e.status().is_some_and(|s| s.code == Code::NotFound)),
            "{response:?}"
        );

        let captured = TestLayer::capture(&guard);
        check_debug_log(&captured, "open_object");
        client_request_span(&captured, "open_object", "NOT_FOUND", "grpc");
        Ok(())
    }

    /// Opening an object and requesting more ranges produces a
    /// `client_request` span and one `read_range` span per extra range.
    #[tokio::test]
    #[ignore = "flaky test, see #5290"]
    async fn open_object_success() -> anyhow::Result<()> {
        use crate::model_ext::ReadRange;
        use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
        use storage_grpc_mock::google::storage::v2::{
            BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
            ReadRange as ProtoRange,
        };
        use storage_grpc_mock::{MockStorage, start};
        const PAYLOAD: &str = "the quick brown fox jumps over the lazy dog";

        let guard = TestLayer::initialize();

        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(10);
        let response = BidiReadObjectResponse {
            metadata: Some(ProtoObject {
                bucket: BUCKET_NAME.to_string(),
                name: OBJECT_NAME.to_string(),
                generation: 123456,
                ..ProtoObject::default()
            }),
            object_data_ranges: vec![ObjectRangeData {
                read_range: Some(ProtoRange {
                    read_id: 0_i64,
                    ..ProtoRange::default()
                }),
                range_end: true,
                checksummed_data: Some(ChecksummedData {
                    content: PAYLOAD.as_bytes().to_vec(),
                    crc32c: None,
                }),
            }],
            ..BidiReadObjectResponse::default()
        };
        // Queue three copies of the same response for the mock stream. `tx`
        // stays alive until the end of the test, so the channel is not closed
        // while the client is reading.
        for _ in 0..3 {
            tx.send(Ok(response.clone())).await?;
        }

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Ok(TonicResponse::from(rx)));
        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;

        let client = traced_client(endpoint).await?;
        let (descriptor, _reader0) = client
            .open_object(BUCKET_NAME, OBJECT_NAME)
            .send_and_read(ReadRange::all())
            .await?;
        let _reader1 = descriptor.read_range(ReadRange::offset(5)).await;
        let _reader2 = descriptor.read_range(ReadRange::segment(10, 10)).await;
        let _reader3 = descriptor.read_range(ReadRange::tail(15)).await;

        let captured = TestLayer::capture(&guard);
        let _span = find_span(&captured, "client_request");

        let range_spans = captured
            .iter()
            .filter(|s| s.name == "read_range")
            .collect::<Vec<_>>();

        let _span_reader1 = range_spans
            .iter()
            .copied()
            .find(|s| {
                s.attributes
                    .get("read_range.start")
                    .and_then(|v| v.as_i64())
                    == Some(5)
            })
            .unwrap_or_else(|| {
                panic!("missing `read_range` span for ReadRange::offset(5): {range_spans:#?}")
            });

        let _span_reader2 = range_spans
            .iter()
            .copied()
            .find(|s| {
                s.attributes
                    .get("read_range.start")
                    .and_then(|v| v.as_i64())
                    == Some(10)
                    && s.attributes
                        .get("read_range.limit")
                        .and_then(|v| v.as_i64())
                        == Some(10)
            })
            .unwrap_or_else(|| {
                panic!("missing `read_range` span for ReadRange::segment(10, 10): {range_spans:#?}")
            });

        let _span_reader3 = range_spans
            .iter()
            .copied()
            .find(|s| {
                s.attributes
                    .get("read_range.start")
                    .and_then(|v| v.as_i64())
                    == Some(-15)
            })
            .unwrap_or_else(|| {
                panic!("missing `read_range` span for ReadRange::tail(15): {range_spans:#?}")
            });
        Ok(())
    }

    /// Returns the first captured span called `name`, panicking with the full
    /// capture when there is none.
    #[track_caller]
    fn find_span<'a>(captured: &'a [CapturedSpan], name: &str) -> &'a CapturedSpan {
        captured
            .iter()
            .find(|s| s.name == name)
            .unwrap_or_else(|| panic!("missing `{name}` span in capture: {captured:#?}"))
    }

    /// Verifies that the span named `method` recorded the `self`, `options`,
    /// and `request` attributes.
    #[track_caller]
    fn check_debug_log(captured: &[CapturedSpan], method: &'static str) {
        let span = find_span(captured, method);

        let got = BTreeMap::from_iter(span.attributes.clone());
        let want = ["self", "options", "request"];
        let missing = want
            .iter()
            .copied()
            .filter(|k| !got.contains_key(*k))
            .collect::<Vec<_>>();
        assert!(
            missing.is_empty(),
            "missing = {missing:?}\ngot = {:?}\nwant = {want:?}\nfull = {got:#?}",
            got.keys().collect::<Vec<_>>(),
        );
    }

    /// Verifies the attributes of the `client_request` span for a failed
    /// request.
    ///
    /// Only the expected attributes are compared; any additional attributes
    /// on the captured span are ignored.
    #[track_caller]
    fn client_request_span(
        captured: &[CapturedSpan],
        method: &'static str,
        error_type: &'static str,
        rpc_system: &'static str,
    ) {
        let expected_attributes: [(&str, &str); 6] = [
            ("otel.kind", "Internal"),
            ("rpc.system.name", rpc_system),
            ("otel.status_code", "ERROR"),
            ("gcp.client.service", "storage"),
            ("gcp.client.repo", "googleapis/google-cloud-rust"),
            ("gcp.client.artifact", "google-cloud-storage"),
        ];
        let span = find_span(captured, "client_request");
        let got = BTreeMap::from_iter(span.attributes.clone());

        let mut want = expected_attributes
            .iter()
            .map(|&(k, v)| (k.to_string(), AttributeValue::from(v)))
            .collect::<BTreeMap<String, AttributeValue>>();
        want.insert(
            "otel.name".to_string(),
            format!("google_cloud_storage::client::Storage::{method}").into(),
        );
        want.insert("error.type".to_string(), error_type.into());

        let mismatch = want
            .iter()
            .filter(|(k, v)| got.get(k.as_str()) != Some(*v))
            .collect::<Vec<_>>();
        assert!(
            mismatch.is_empty(),
            "mismatch = {mismatch:?}\ngot = {got:?}\nwant = {want:?}"
        );
    }
}