Skip to main content

google_cloud_storage/storage/
transport.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::tracing::{TracingObjectDescriptor, TracingResponse};
16use crate::Result;
17use crate::model::{Object, ReadObjectRequest};
18use crate::model_ext::WriteObjectRequest;
19use crate::read_object::ReadObjectResponse;
20use crate::storage::client::StorageInner;
21use crate::storage::info::INSTRUMENTATION;
22use crate::storage::perform_upload::PerformUpload;
23use crate::storage::read_object::Reader;
24use crate::storage::request_options::RequestOptions;
25use crate::storage::streaming_source::{Seek, StreamingSource};
26use crate::{
27    model_ext::OpenObjectRequest, object_descriptor::ObjectDescriptor,
28    storage::bidi::connector::Connector, storage::bidi::transport::ObjectDescriptorTransport,
29};
30use gaxi::observability::{ClientRequestAttributes, DurationMetric, RequestRecorder};
31use std::sync::Arc;
32
/// An implementation of [`stub::Storage`][crate::storage::stub::Storage] that
/// interacts with the Cloud Storage service.
///
/// This is the default implementation of a
/// [`client::Storage<T>`][crate::storage::client::Storage].
///
/// Every operation has a plain code path and an instrumented code path; the
/// `tracing` flag captured at construction time selects between them.
///
/// ## Example
///
/// ```
/// # async fn sample() -> anyhow::Result<()> {
/// use google_cloud_storage::client::Storage;
/// use google_cloud_storage::stub::DefaultStorage;
/// let client: Storage<DefaultStorage> = Storage::builder().build().await?;
/// # Ok(()) }
/// ```
#[derive(Clone, Debug)]
pub struct Storage {
    // Shared client state handed to the readers, uploaders and connectors
    // created by each operation.
    inner: Arc<StorageInner>,
    // When `true`, requests are routed through the `*_tracing` methods.
    tracing: bool,
    // Duration metric passed to `gaxi::client_request_signals!` for each
    // instrumented request.
    metric: DurationMetric,
}
54
55impl Storage {
56    pub(crate) fn new(inner: Arc<StorageInner>, tracing: bool) -> Arc<Self> {
57        let metric = DurationMetric::new(&INSTRUMENTATION);
58        Arc::new(Self {
59            inner,
60            tracing,
61            metric,
62        })
63    }
64
65    async fn read_object_plain(
66        &self,
67        request: ReadObjectRequest,
68        options: RequestOptions,
69    ) -> Result<ReadObjectResponse> {
70        let reader = Reader {
71            inner: self.inner.clone(),
72            request,
73            options,
74        };
75        reader.response().await
76    }
77
78    #[tracing::instrument(name = "read_object", level = tracing::Level::DEBUG, ret, err(Debug))]
79    async fn read_object_tracing(
80        &self,
81        request: ReadObjectRequest,
82        options: RequestOptions,
83    ) -> Result<ReadObjectResponse> {
84        let resource_name = format!("//storage.googleapis.com/{}", request.bucket);
85        let (span, pending) = gaxi::client_request_signals!(
86        metric: self.metric.clone(),
87        info: *INSTRUMENTATION,
88        method: "client::Storage::read_object",
89        async {
90            if let Some(recorder) = RequestRecorder::current() {
91                recorder.on_client_request(
92                    ClientRequestAttributes::default()
93                        .set_url_template("/storage/v1/b/{bucket}/o/{object}")
94                        .set_resource_name(resource_name),
95                );
96            }
97            self.read_object_plain(request, options).await
98        });
99
100        let response = pending.await?;
101        let inner = TracingResponse::new(response.into_parts(), span);
102        Ok(ReadObjectResponse::new(Box::new(inner)))
103    }
104
105    async fn write_object_buffered_plain<P>(
106        &self,
107        payload: P,
108        request: WriteObjectRequest,
109        options: RequestOptions,
110    ) -> Result<Object>
111    where
112        P: StreamingSource + Send + Sync + 'static,
113    {
114        PerformUpload::new(
115            payload,
116            self.inner.clone(),
117            request.spec,
118            request.params,
119            options,
120        )
121        .send()
122        .await
123    }
124
125    #[tracing::instrument(name = "write_object_buffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
126    async fn write_object_buffered_tracing<P>(
127        &self,
128        payload: P,
129        request: WriteObjectRequest,
130        options: RequestOptions,
131    ) -> Result<Object>
132    where
133        P: StreamingSource + Send + Sync + 'static,
134    {
135        let resource_name = format!(
136            "//storage.googleapis.com/{}",
137            request
138                .spec
139                .resource
140                .as_ref()
141                .map(|r| r.bucket.as_str())
142                .unwrap_or_default()
143        );
144        let (_span, pending) = gaxi::client_request_signals!(
145            metric: self.metric.clone(),
146            info: *INSTRUMENTATION,
147            method: "client::Storage::write_object",
148            async {
149                if let Some(recorder) = RequestRecorder::current() {
150                    recorder.on_client_request(
151                        ClientRequestAttributes::default()
152                            .set_url_template("/upload/storage/v1/b/{bucket}/o")
153                            .set_resource_name(resource_name),
154                    );
155                }
156                self.write_object_buffered_plain(payload, request, options).await
157            }
158        );
159        pending.await
160    }
161
162    async fn write_object_unbuffered_plain<P>(
163        &self,
164        payload: P,
165        request: WriteObjectRequest,
166        options: RequestOptions,
167    ) -> Result<Object>
168    where
169        P: StreamingSource + Seek + Send + Sync + 'static,
170    {
171        PerformUpload::new(
172            payload,
173            self.inner.clone(),
174            request.spec,
175            request.params,
176            options,
177        )
178        .send_unbuffered()
179        .await
180    }
181
182    #[tracing::instrument(name = "write_object_unbuffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
183    async fn write_object_unbuffered_tracing<P>(
184        &self,
185        payload: P,
186        request: WriteObjectRequest,
187        options: RequestOptions,
188    ) -> Result<Object>
189    where
190        P: StreamingSource + Seek + Send + Sync + 'static,
191    {
192        let resource_name = format!(
193            "//storage.googleapis.com/{}",
194            request
195                .spec
196                .resource
197                .as_ref()
198                .map(|r| r.bucket.as_str())
199                .unwrap_or_default()
200        );
201        let (_span, pending) = gaxi::client_request_signals!(
202            metric: self.metric.clone(),
203            info: *INSTRUMENTATION,
204            method: "client::Storage::write_object",
205            async {
206                if let Some(recorder) = RequestRecorder::current() {
207                    recorder.on_client_request(
208                        ClientRequestAttributes::default()
209                            .set_url_template("/upload/storage/v1/b/{bucket}/o")
210                            .set_resource_name(resource_name),
211                    );
212                }
213                self.write_object_unbuffered_plain(payload, request, options).await
214            }
215        );
216        pending.await
217    }
218
219    async fn open_object_plain(
220        &self,
221        request: OpenObjectRequest,
222        options: RequestOptions,
223    ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
224        let (spec, ranges) = request.into_parts();
225        let connector = Connector::new(spec, options, self.inner.grpc.clone());
226        let (transport, readers) = ObjectDescriptorTransport::new(connector, ranges).await?;
227        Ok((ObjectDescriptor::new(transport), readers))
228    }
229
230    #[tracing::instrument(name = "open_object", level = tracing::Level::DEBUG, ret, err(Debug))]
231    async fn open_object_tracing(
232        &self,
233        request: OpenObjectRequest,
234        options: RequestOptions,
235    ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
236        let resource_name = format!("//storage.googleapis.com/{}", request.bucket);
237        let (span, pending) = gaxi::client_request_signals!(
238            metric: self.metric.clone(),
239            info: *INSTRUMENTATION,
240            method: "client::Storage::open_object",
241            async {
242                if let Some(recorder) = RequestRecorder::current() {
243                    recorder.on_client_request(
244                        ClientRequestAttributes::default()
245                            .set_rpc_method("google.storage.v2.Storage/BidiStreamingRead")
246                            .set_url_template("/upload/storage/v1/b/{bucket}/o")
247                            .set_resource_name(resource_name),
248                    );
249                }
250                self.open_object_plain(request, options).await
251            }
252        );
253        let (descriptor, readers) = pending.await?;
254        let descriptor =
255            ObjectDescriptor::new(TracingObjectDescriptor::new(descriptor.into_parts()));
256        let readers = readers
257            .into_iter()
258            .map(|r| {
259                let inner = r.into_parts();
260                ReadObjectResponse::new(Box::new(TracingResponse::new(inner, span.clone())))
261            })
262            .collect::<Vec<_>>();
263        Ok((descriptor, readers))
264    }
265}
266
267impl super::stub::Storage for Storage {
268    /// Implements [crate::client::Storage::read_object].
269    async fn read_object(
270        &self,
271        req: ReadObjectRequest,
272        options: RequestOptions,
273    ) -> Result<ReadObjectResponse> {
274        if self.tracing {
275            return self.read_object_tracing(req, options).await;
276        }
277        self.read_object_plain(req, options).await
278    }
279
280    /// Implements [crate::client::Storage::write_object].
281    async fn write_object_buffered<P>(
282        &self,
283        payload: P,
284        req: WriteObjectRequest,
285        options: RequestOptions,
286    ) -> Result<Object>
287    where
288        P: StreamingSource + Send + Sync + 'static,
289    {
290        if self.tracing {
291            return self
292                .write_object_buffered_tracing(payload, req, options)
293                .await;
294        }
295        self.write_object_buffered_plain(payload, req, options)
296            .await
297    }
298
299    /// Implements [crate::client::Storage::write_object].
300    async fn write_object_unbuffered<P>(
301        &self,
302        payload: P,
303        req: WriteObjectRequest,
304        options: RequestOptions,
305    ) -> Result<Object>
306    where
307        P: StreamingSource + Seek + Send + Sync + 'static,
308    {
309        if self.tracing {
310            return self
311                .write_object_unbuffered_tracing(payload, req, options)
312                .await;
313        }
314        self.write_object_unbuffered_plain(payload, req, options)
315            .await
316    }
317
318    async fn open_object(
319        &self,
320        request: OpenObjectRequest,
321        options: RequestOptions,
322    ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
323        if self.tracing {
324            return self.open_object_tracing(request, options).await;
325        }
326        self.open_object_plain(request, options).await
327    }
328}
329
330#[cfg(test)]
331mod tests {
332    use super::{Storage, StorageInner};
333    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
334    use google_cloud_test_utils::test_layer::AttributeValue;
335    use google_cloud_test_utils::test_layer::{CapturedSpan, TestLayer};
336    use httptest::{Expectation, Server, matchers::*, responders::status_code};
337    use pretty_assertions::assert_eq;
338    use std::collections::BTreeMap;
339    use std::sync::Arc;
340
    impl Storage {
        /// Test-only constructor: builds a `Storage` stub over `inner` with
        /// tracing disabled, so calls take the plain code paths.
        pub(crate) fn new_test(inner: Arc<StorageInner>) -> Arc<Self> {
            Self::new(inner, false)
        }
    }
346
347    #[tokio::test]
348    async fn read_object() -> anyhow::Result<()> {
349        let guard = TestLayer::initialize();
350
351        let server = Server::run();
352        server.expect(
353            Expectation::matching(all_of![
354                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
355                request::query(url_decoded(contains(("alt", "media")))),
356            ])
357            .respond_with(status_code(404)),
358        );
359
360        let client = crate::client::Storage::builder()
361            .with_endpoint(format!("http://{}", server.addr()))
362            .with_credentials(Anonymous::new().build())
363            .with_tracing()
364            .build()
365            .await?;
366        let response = client
367            .read_object("projects/_/buckets/test-bucket", "test-object")
368            .send()
369            .await;
370        assert!(
371            matches!(response, Err(ref e) if e.is_transport()),
372            "{response:?}"
373        );
374
375        let captured = TestLayer::capture(&guard);
376        check_debug_log(&captured, "read_object");
377
378        client_request_span(&captured, "read_object", "404", "http");
379
380        Ok(())
381    }
382
383    #[tokio::test]
384    async fn read_object_success() -> anyhow::Result<()> {
385        let guard = TestLayer::initialize();
386
387        let body = (0..100_000)
388            .map(|i| format!("{i:08} {:1000}", ""))
389            .collect::<Vec<_>>()
390            .join("\n");
391        let server = Server::run();
392        server.expect(
393            Expectation::matching(all_of![
394                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
395                request::query(url_decoded(contains(("alt", "media")))),
396            ])
397            .respond_with(
398                status_code(200)
399                    .body(body.clone())
400                    .append_header("x-goog-generation", 123456),
401            ),
402        );
403
404        let client = crate::client::Storage::builder()
405            .with_endpoint(format!("http://{}", server.addr()))
406            .with_credentials(Anonymous::new().build())
407            .with_tracing()
408            .build()
409            .await?;
410        let mut got = Vec::new();
411        let mut response = client
412            .read_object("projects/_/buckets/test-bucket", "test-object")
413            .send()
414            .await?;
415        let object = response.object();
416        assert_eq!(object.generation, 123456, "{object:?}");
417        while let Some(b) = response.next().await.transpose()? {
418            got.push(b);
419        }
420
421        let captured = TestLayer::capture(&guard);
422        let span = captured
423            .iter()
424            .find(|s| s.name == "client_request")
425            .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
426        // The span counts one more event: the EOF
427        assert_eq!(span.events, got.len() + 1, "{span:?}");
428
429        Ok(())
430    }
431
432    #[tokio::test]
433    async fn write_object_buffered() -> anyhow::Result<()> {
434        let guard = TestLayer::initialize();
435
436        let server = Server::run();
437        server.expect(
438            Expectation::matching(all_of![
439                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
440                request::query(url_decoded(contains(("uploadType", "multipart")))),
441            ])
442            .respond_with(status_code(404)),
443        );
444
445        let client = crate::client::Storage::builder()
446            .with_endpoint(format!("http://{}", server.addr()))
447            .with_credentials(Anonymous::new().build())
448            .with_tracing()
449            .build()
450            .await?;
451        let response = client
452            .write_object("projects/_/buckets/test-bucket", "test-object", "payload")
453            .send_buffered()
454            .await;
455        assert!(
456            matches!(response, Err(ref e) if e.is_transport()),
457            "{response:?}"
458        );
459
460        let captured = TestLayer::capture(&guard);
461        check_debug_log(&captured, "write_object_buffered");
462
463        client_request_span(&captured, "write_object", "404", "http");
464
465        Ok(())
466    }
467
468    #[tokio::test]
469    async fn write_object_unbuffered() -> anyhow::Result<()> {
470        let guard = TestLayer::initialize();
471
472        let server = Server::run();
473        server.expect(
474            Expectation::matching(all_of![
475                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
476                request::query(url_decoded(contains(("uploadType", "multipart")))),
477            ])
478            .respond_with(status_code(404)),
479        );
480
481        let client = crate::client::Storage::builder()
482            .with_endpoint(format!("http://{}", server.addr()))
483            .with_credentials(Anonymous::new().build())
484            .with_tracing()
485            .build()
486            .await?;
487        let response = client
488            .write_object("projects/_/buckets/test-bucket", "test-object", "payload")
489            .send_unbuffered()
490            .await;
491        assert!(
492            matches!(response, Err(ref e) if e.is_transport()),
493            "{response:?}"
494        );
495
496        let captured = TestLayer::capture(&guard);
497        check_debug_log(&captured, "write_object_unbuffered");
498
499        client_request_span(&captured, "write_object", "404", "http");
500
501        Ok(())
502    }
503
504    #[tokio::test]
505    async fn open_object() -> anyhow::Result<()> {
506        use gaxi::grpc::tonic::Status as TonicStatus;
507        use google_cloud_gax::error::rpc::Code;
508        use storage_grpc_mock::{MockStorage, start};
509
510        let guard = TestLayer::initialize();
511
512        let mut mock = MockStorage::new();
513        mock.expect_bidi_read_object()
514            .return_once(|_| Err(TonicStatus::not_found("not here")));
515        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
516
517        let client = crate::client::Storage::builder()
518            .with_credentials(Anonymous::new().build())
519            .with_endpoint(endpoint.clone())
520            .with_tracing()
521            .build()
522            .await?;
523        let response = client
524            .open_object("projects/_/buckets/test-bucket", "test-object")
525            .send()
526            .await;
527        assert!(
528            matches!(response, Err(ref e) if e.status().is_some_and(|s| s.code == Code::NotFound)),
529            "{response:?}"
530        );
531
532        let captured = TestLayer::capture(&guard);
533        check_debug_log(&captured, "open_object");
534
535        client_request_span(&captured, "open_object", "NOT_FOUND", "grpc");
536        Ok(())
537    }
538
539    #[tokio::test]
540    #[ignore = "flaky test, see #5290"]
541    async fn open_object_success() -> anyhow::Result<()> {
542        // TODO(#4772) - Move these `use` declarations and constants once the tracing APIs are stable.
543        use crate::model_ext::ReadRange;
544        use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
545        use storage_grpc_mock::google::storage::v2::{
546            BidiReadObjectResponse, ChecksummedData, Object as ProtoObject, ObjectRangeData,
547            ReadRange as ProtoRange,
548        };
549        use storage_grpc_mock::{MockStorage, start};
550        const BUCKET_NAME: &str = "projects/_/buckets/test-bucket";
551        const OBJECT_NAME: &str = "test-object";
552        const BIND_ADDRESS: &str = "0.0.0.0:0";
553        const PAYLOAD: &str = "the quick brown fox jumps over the lazy dog";
554
555        let guard = TestLayer::initialize();
556
557        let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(10);
558        let response = BidiReadObjectResponse {
559            metadata: Some(ProtoObject {
560                bucket: BUCKET_NAME.to_string(),
561                name: OBJECT_NAME.to_string(),
562                generation: 123456,
563                ..ProtoObject::default()
564            }),
565            object_data_ranges: vec![ObjectRangeData {
566                read_range: Some(ProtoRange {
567                    read_id: 0_i64,
568                    ..ProtoRange::default()
569                }),
570                range_end: true,
571                checksummed_data: Some(ChecksummedData {
572                    content: PAYLOAD.as_bytes().to_vec(),
573                    crc32c: None,
574                }),
575            }],
576            ..BidiReadObjectResponse::default()
577        };
578        // This is the initial response.
579        tx.send(Ok(response.clone())).await?;
580        // These simulate the calls to ObjectDescriptor::read_range(). The data is wrong, but this
581        // test is about the spans.
582        tx.send(Ok(response.clone())).await?;
583        tx.send(Ok(response.clone())).await?;
584
585        let mut mock = MockStorage::new();
586        mock.expect_bidi_read_object()
587            .return_once(|_| Ok(TonicResponse::from(rx)));
588        let (endpoint, _server) = start(BIND_ADDRESS, mock).await?;
589
590        let client = crate::client::Storage::builder()
591            .with_credentials(Anonymous::new().build())
592            .with_endpoint(endpoint.clone())
593            .with_tracing()
594            .build()
595            .await?;
596        let (descriptor, _reader0) = client
597            .open_object(BUCKET_NAME, OBJECT_NAME)
598            .send_and_read(ReadRange::all())
599            .await?;
600        let _reader1 = descriptor.read_range(ReadRange::offset(5)).await;
601        let _reader2 = descriptor.read_range(ReadRange::segment(10, 10)).await;
602        let _reader3 = descriptor.read_range(ReadRange::tail(15)).await;
603
604        let captured = TestLayer::capture(&guard);
605        let _span = captured
606            .iter()
607            .find(|s| s.name == "client_request")
608            .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
609
610        let range_spans = captured
611            .iter()
612            .filter(|s| s.name == "read_range")
613            .collect::<Vec<_>>();
614
615        let _span_reader1 = range_spans
616            .clone()
617            .into_iter()
618            .find(|s| {
619                s.attributes
620                    .get("read_range.start")
621                    .and_then(|v| v.as_i64())
622                    == Some(5)
623            })
624            .unwrap_or_else(|| {
625                panic!("missing `read_range` span for ReadRange::offset(5): {range_spans:#?}")
626            });
627
628        let _span_reader2 = range_spans
629            .clone()
630            .into_iter()
631            .find(|s| {
632                s.attributes
633                    .get("read_range.start")
634                    .and_then(|v| v.as_i64())
635                    == Some(10)
636                    && s.attributes
637                        .get("read_range.limit")
638                        .and_then(|v| v.as_i64())
639                        == Some(10)
640            })
641            .unwrap_or_else(|| {
642                panic!("missing `read_range` span for ReadRange::segment(10, 10): {range_spans:#?}")
643            });
644
645        let _span_reader3 = range_spans
646            .clone()
647            .into_iter()
648            .find(|s| {
649                s.attributes
650                    .get("read_range.start")
651                    .and_then(|v| v.as_i64())
652                    == Some(-15)
653            })
654            .unwrap_or_else(|| {
655                panic!("missing `read_range` span for ReadRange::tail(15): {range_spans:#?}")
656            });
657        Ok(())
658    }
659
660    #[track_caller]
661    fn check_debug_log(captured: &Vec<CapturedSpan>, method: &'static str) {
662        let span = captured
663            .iter()
664            .find(|s| s.name == method)
665            .unwrap_or_else(|| panic!("missing `{method}` span in capture: {captured:#?}"));
666
667        let got = BTreeMap::from_iter(span.attributes.clone());
668        let want = ["self", "options", "request"];
669        let missing = want
670            .iter()
671            .filter(|k| !got.contains_key(**k))
672            .collect::<Vec<_>>();
673        assert!(
674            missing.is_empty(),
675            "missing = {missing:?}\ngot  = {:?}\nwant = {want:?}\nfull = {got:#?}",
676            got.keys().collect::<Vec<_>>(),
677        );
678    }
679
680    #[track_caller]
681    fn client_request_span(
682        captured: &Vec<CapturedSpan>,
683        method: &'static str,
684        error_type: &'static str,
685        rpc_system: &'static str,
686    ) {
687        let expected_attributes: [(&str, &str); 6] = [
688            ("otel.kind", "Internal"),
689            ("rpc.system.name", rpc_system),
690            ("otel.status_code", "ERROR"),
691            ("gcp.client.service", "storage"),
692            ("gcp.client.repo", "googleapis/google-cloud-rust"),
693            ("gcp.client.artifact", "google-cloud-storage"),
694        ];
695        let span = captured
696            .iter()
697            .find(|s| s.name == "client_request")
698            .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
699        let got = BTreeMap::from_iter(span.attributes.clone());
700        // This is a subset of the fields, but good enough to catch most
701        // mistakes. Recall that we use a macro, which is already tested.
702        let want = BTreeMap::<String, AttributeValue>::from_iter(
703            expected_attributes
704                .iter()
705                .map(|(k, v)| (k.to_string(), AttributeValue::from(*v)))
706                .chain(
707                    [
708                        (
709                            "otel.name",
710                            format!("google_cloud_storage::client::Storage::{method}").into(),
711                        ),
712                        ("error.type", error_type.into()),
713                    ]
714                    .map(|(k, v)| (k.to_string(), v)),
715                ),
716        );
717        let mismatch = want
718            .iter()
719            .filter(|(k, v)| !got.get(k.as_str()).is_some_and(|g| g == *v))
720            .collect::<Vec<_>>();
721        assert!(
722            mismatch.is_empty(),
723            "mismatch = {mismatch:?}\ngot      = {got:?}\nwant     = {want:?}"
724        );
725    }
726}