Skip to main content

google_cloud_storage/storage/
transport.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(google_cloud_unstable_tracing)]
16use super::tracing::TracingResponse;
17use crate::Result;
18use crate::model::{Object, ReadObjectRequest};
19use crate::model_ext::WriteObjectRequest;
20use crate::read_object::ReadObjectResponse;
21use crate::storage::client::StorageInner;
22#[cfg(google_cloud_unstable_tracing)]
23use crate::storage::info::INSTRUMENTATION;
24use crate::storage::perform_upload::PerformUpload;
25use crate::storage::read_object::Reader;
26use crate::storage::request_options::RequestOptions;
27use crate::storage::streaming_source::{Seek, StreamingSource};
28use crate::{
29    model_ext::OpenObjectRequest, object_descriptor::ObjectDescriptor,
30    storage::bidi::connector::Connector, storage::bidi::transport::ObjectDescriptorTransport,
31};
32#[cfg(google_cloud_unstable_tracing)]
33use gaxi::observability::ResultExt;
34use std::sync::Arc;
35#[cfg(google_cloud_unstable_tracing)]
36use tracing::Instrument;
37
38/// An implementation of [`stub::Storage`][crate::storage::stub::Storage] that
39/// interacts with the Cloud Storage service.
40///
41/// This is the default implementation of a
42/// [`client::Storage<T>`][crate::storage::client::Storage].
43///
44/// ## Example
45///
46/// ```
47/// # async fn sample() -> anyhow::Result<()> {
48/// use google_cloud_storage::client::Storage;
49/// use google_cloud_storage::stub::DefaultStorage;
50/// let client: Storage<DefaultStorage> = Storage::builder().build().await?;
51/// # Ok(()) }
52/// ```
53#[derive(Clone, Debug)]
54pub struct Storage {
55    inner: Arc<StorageInner>,
56    tracing: bool,
57}
58
59impl Storage {
60    #[cfg(test)]
61    pub(crate) fn new_test(inner: Arc<StorageInner>) -> Arc<Self> {
62        Self::new(inner, false)
63    }
64
65    pub(crate) fn new(inner: Arc<StorageInner>, tracing: bool) -> Arc<Self> {
66        Arc::new(Self { inner, tracing })
67    }
68
69    async fn read_object_plain(
70        &self,
71        request: ReadObjectRequest,
72        options: RequestOptions,
73    ) -> Result<ReadObjectResponse> {
74        let reader = Reader {
75            inner: self.inner.clone(),
76            request,
77            options,
78        };
79        reader.response().await
80    }
81
82    #[tracing::instrument(name = "read_object", level = tracing::Level::DEBUG, ret, err(Debug))]
83    async fn read_object_tracing(
84        &self,
85        request: ReadObjectRequest,
86        options: RequestOptions,
87    ) -> Result<ReadObjectResponse> {
88        #[cfg(google_cloud_unstable_tracing)]
89        {
90            let span =
91                gaxi::client_request_span!("client::Storage", "read_object", &INSTRUMENTATION);
92            let response = self
93                .read_object_plain(request, options)
94                .instrument(span.clone())
95                .await
96                .record_in_span(&span)?;
97            let inner = TracingResponse::new(response.into_parts(), span);
98            let response = ReadObjectResponse::new(Box::new(inner));
99            Ok(response)
100        }
101        #[cfg(not(google_cloud_unstable_tracing))]
102        self.read_object_plain(request, options).await
103    }
104
105    async fn write_object_buffered_plain<P>(
106        &self,
107        payload: P,
108        request: WriteObjectRequest,
109        options: RequestOptions,
110    ) -> Result<Object>
111    where
112        P: StreamingSource + Send + Sync + 'static,
113    {
114        PerformUpload::new(
115            payload,
116            self.inner.clone(),
117            request.spec,
118            request.params,
119            options,
120        )
121        .send()
122        .await
123    }
124
125    #[tracing::instrument(name = "write_object_buffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
126    async fn write_object_buffered_tracing<P>(
127        &self,
128        payload: P,
129        request: WriteObjectRequest,
130        options: RequestOptions,
131    ) -> Result<Object>
132    where
133        P: StreamingSource + Send + Sync + 'static,
134    {
135        #[cfg(google_cloud_unstable_tracing)]
136        {
137            let span =
138                gaxi::client_request_span!("client::Storage", "write_object", &INSTRUMENTATION);
139            self.write_object_buffered_plain(payload, request, options)
140                .instrument(span.clone())
141                .await
142                .record_in_span(&span)
143        }
144        #[cfg(not(google_cloud_unstable_tracing))]
145        self.write_object_buffered_plain(payload, request, options)
146            .await
147    }
148
149    async fn write_object_unbuffered_plain<P>(
150        &self,
151        payload: P,
152        request: WriteObjectRequest,
153        options: RequestOptions,
154    ) -> Result<Object>
155    where
156        P: StreamingSource + Seek + Send + Sync + 'static,
157    {
158        PerformUpload::new(
159            payload,
160            self.inner.clone(),
161            request.spec,
162            request.params,
163            options,
164        )
165        .send_unbuffered()
166        .await
167    }
168
169    #[tracing::instrument(name = "write_object_unbuffered", level = tracing::Level::DEBUG, ret, err(Debug), skip(payload))]
170    async fn write_object_unbuffered_tracing<P>(
171        &self,
172        payload: P,
173        request: WriteObjectRequest,
174        options: RequestOptions,
175    ) -> Result<Object>
176    where
177        P: StreamingSource + Seek + Send + Sync + 'static,
178    {
179        #[cfg(google_cloud_unstable_tracing)]
180        {
181            let span =
182                gaxi::client_request_span!("client::Storage", "write_object", &INSTRUMENTATION);
183            self.write_object_unbuffered_plain(payload, request, options)
184                .instrument(span.clone())
185                .await
186                .record_in_span(&span)
187        }
188        #[cfg(not(google_cloud_unstable_tracing))]
189        self.write_object_unbuffered_plain(payload, request, options)
190            .await
191    }
192
193    async fn open_object_plain(
194        &self,
195        request: OpenObjectRequest,
196        options: RequestOptions,
197    ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
198        let (spec, ranges) = request.into_parts();
199        let connector = Connector::new(spec, options, self.inner.grpc.clone());
200        let (transport, readers) = ObjectDescriptorTransport::new(connector, ranges).await?;
201        Ok((ObjectDescriptor::new(transport), readers))
202    }
203
204    #[tracing::instrument(name = "open_object", level = tracing::Level::DEBUG, ret, err(Debug))]
205    async fn open_object_tracing(
206        &self,
207        request: OpenObjectRequest,
208        options: RequestOptions,
209    ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
210        #[cfg(google_cloud_unstable_tracing)]
211        {
212            let span =
213                gaxi::client_request_span!("client::Storage", "open_object", &INSTRUMENTATION);
214            // TODO(#3178) - wrap descriptor and responses with tracing decorators.
215            self.open_object_plain(request, options)
216                .instrument(span.clone())
217                .await
218                .record_in_span(&span)
219        }
220        #[cfg(not(google_cloud_unstable_tracing))]
221        self.open_object_plain(request, options).await
222    }
223}
224
225impl super::stub::Storage for Storage {
226    /// Implements [crate::client::Storage::read_object].
227    async fn read_object(
228        &self,
229        req: ReadObjectRequest,
230        options: RequestOptions,
231    ) -> Result<ReadObjectResponse> {
232        if self.tracing {
233            return self.read_object_tracing(req, options).await;
234        }
235        self.read_object_plain(req, options).await
236    }
237
238    /// Implements [crate::client::Storage::write_object].
239    async fn write_object_buffered<P>(
240        &self,
241        payload: P,
242        req: WriteObjectRequest,
243        options: RequestOptions,
244    ) -> Result<Object>
245    where
246        P: StreamingSource + Send + Sync + 'static,
247    {
248        if self.tracing {
249            return self
250                .write_object_buffered_tracing(payload, req, options)
251                .await;
252        }
253        self.write_object_buffered_plain(payload, req, options)
254            .await
255    }
256
257    /// Implements [crate::client::Storage::write_object].
258    async fn write_object_unbuffered<P>(
259        &self,
260        payload: P,
261        req: WriteObjectRequest,
262        options: RequestOptions,
263    ) -> Result<Object>
264    where
265        P: StreamingSource + Seek + Send + Sync + 'static,
266    {
267        if self.tracing {
268            return self
269                .write_object_unbuffered_tracing(payload, req, options)
270                .await;
271        }
272        self.write_object_unbuffered_plain(payload, req, options)
273            .await
274    }
275
276    async fn open_object(
277        &self,
278        request: OpenObjectRequest,
279        options: RequestOptions,
280    ) -> Result<(ObjectDescriptor, Vec<ReadObjectResponse>)> {
281        if self.tracing {
282            return self.open_object_tracing(request, options).await;
283        }
284        self.open_object_plain(request, options).await
285    }
286}
287
#[cfg(test)]
mod tests {
    //! Verify that a client built `with_tracing()` emits the expected spans
    //! for each operation of the default transport.

    #[cfg(google_cloud_unstable_tracing)]
    use gaxi::observability::attributes::{
        GCP_CLIENT_LANGUAGE_RUST, OTEL_KIND_INTERNAL, RPC_SYSTEM_HTTP, keys::*,
    };
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    #[cfg(google_cloud_unstable_tracing)]
    use google_cloud_test_utils::test_layer::AttributeValue;
    use google_cloud_test_utils::test_layer::{CapturedSpan, TestLayer};
    use httptest::{Expectation, Server, matchers::*, responders::status_code};
    use std::collections::BTreeMap;

    /// A failed download (HTTP 404) still produces the `read_object` span.
    #[tokio::test]
    async fn read_object() -> anyhow::Result<()> {
        let guard = TestLayer::initialize();

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(status_code(404)),
        );

        let client = crate::client::Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .build()
            .await?;
        let response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await;
        assert!(
            matches!(response, Err(ref e) if e.is_transport()),
            "{response:?}"
        );

        let captured = TestLayer::capture(&guard);
        check_debug_log(&captured, "read_object");

        #[cfg(google_cloud_unstable_tracing)]
        client_request_span(&captured, "read_object", "404");

        Ok(())
    }

    /// A successful download records one span event per received chunk, plus
    /// one for the end of the stream.
    #[cfg(google_cloud_unstable_tracing)]
    #[tokio::test]
    async fn read_object_success() -> anyhow::Result<()> {
        let guard = TestLayer::initialize();

        // Build a large payload from many padded lines.
        let body = (0..100_000)
            .map(|i| format!("{i:08} {:1000}", ""))
            .collect::<Vec<_>>()
            .join("\n");
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .body(body.clone())
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = crate::client::Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .build()
            .await?;
        let mut got = Vec::new();
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let object = response.object();
        assert_eq!(object.generation, 123456, "{object:?}");
        while let Some(b) = response.next().await.transpose()? {
            got.push(b);
        }

        let captured = TestLayer::capture(&guard);
        let span = captured
            .iter()
            .find(|s| s.name == "client_request")
            .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
        // The span counts one more event: the EOF
        assert_eq!(span.events, got.len() + 1, "{span:?}");

        Ok(())
    }

    /// A failed buffered upload (HTTP 404) still produces the
    /// `write_object_buffered` span.
    #[tokio::test]
    async fn write_object_buffered() -> anyhow::Result<()> {
        let guard = TestLayer::initialize();

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
            .respond_with(status_code(404)),
        );

        let client = crate::client::Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .build()
            .await?;
        let response = client
            .write_object("projects/_/buckets/test-bucket", "test-object", "payload")
            .send_buffered()
            .await;
        assert!(
            matches!(response, Err(ref e) if e.is_transport()),
            "{response:?}"
        );

        let captured = TestLayer::capture(&guard);
        check_debug_log(&captured, "write_object_buffered");

        // Both upload flavors report `write_object` in the client request span.
        #[cfg(google_cloud_unstable_tracing)]
        client_request_span(&captured, "write_object", "404");

        Ok(())
    }

    /// A failed unbuffered upload (HTTP 404) still produces the
    /// `write_object_unbuffered` span.
    #[tokio::test]
    async fn write_object_unbuffered() -> anyhow::Result<()> {
        let guard = TestLayer::initialize();

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
                request::query(url_decoded(contains(("uploadType", "multipart")))),
            ])
            .respond_with(status_code(404)),
        );

        let client = crate::client::Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .build()
            .await?;
        let response = client
            .write_object("projects/_/buckets/test-bucket", "test-object", "payload")
            .send_unbuffered()
            .await;
        assert!(
            matches!(response, Err(ref e) if e.is_transport()),
            "{response:?}"
        );

        let captured = TestLayer::capture(&guard);
        check_debug_log(&captured, "write_object_unbuffered");

        // Both upload flavors report `write_object` in the client request span.
        #[cfg(google_cloud_unstable_tracing)]
        client_request_span(&captured, "write_object", "404");

        Ok(())
    }

    /// A failed open (gRPC `NOT_FOUND`) still produces the `open_object` span.
    #[tokio::test]
    async fn open_object() -> anyhow::Result<()> {
        use gaxi::grpc::tonic::Status as TonicStatus;
        use google_cloud_gax::error::rpc::Code;
        use storage_grpc_mock::{MockStorage, start};

        let guard = TestLayer::initialize();

        let mut mock = MockStorage::new();
        mock.expect_bidi_read_object()
            .return_once(|_| Err(TonicStatus::not_found("not here")));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = crate::client::Storage::builder()
            .with_credentials(Anonymous::new().build())
            .with_endpoint(endpoint.clone())
            .with_tracing()
            .build()
            .await?;
        let response = client
            .open_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await;
        assert!(
            matches!(response, Err(ref e) if e.status().is_some_and(|s| s.code == Code::NotFound)),
            "{response:?}"
        );

        let captured = TestLayer::capture(&guard);
        check_debug_log(&captured, "open_object");

        #[cfg(google_cloud_unstable_tracing)]
        client_request_span(&captured, "open_object", "NOT_FOUND");
        Ok(())
    }

    /// Asserts that `captured` contains a span named `method` and that the
    /// span carries the `self`, `options` and `request` attributes.
    ///
    /// Panics (at the caller's location) if the span or an attribute is
    /// missing.
    #[track_caller]
    fn check_debug_log(captured: &[CapturedSpan], method: &'static str) {
        let span = captured
            .iter()
            .find(|s| s.name == method)
            .unwrap_or_else(|| panic!("missing `{method}` span in capture: {captured:#?}"));

        let got = BTreeMap::from_iter(span.attributes.clone());
        let want = ["self", "options", "request"];
        let missing = want
            .iter()
            .filter(|k| !got.contains_key(**k))
            .collect::<Vec<_>>();
        assert!(
            missing.is_empty(),
            "missing = {missing:?}\ngot  = {:?}\nwant = {want:?}\nfull = {got:#?}",
            got.keys().collect::<Vec<_>>(),
        );
    }

    /// Asserts that `captured` contains a `client_request` span whose
    /// attributes include the expected values for `method` and `error_type`.
    ///
    /// Only a subset of the span attributes is compared. Panics (at the
    /// caller's location) if the span is missing or an attribute differs.
    #[cfg(google_cloud_unstable_tracing)]
    #[track_caller]
    fn client_request_span(
        captured: &[CapturedSpan],
        method: &'static str,
        error_type: &'static str,
    ) {
        const EXPECTED_ATTRIBUTES: [(&str, &str); 8] = [
            (OTEL_KIND, OTEL_KIND_INTERNAL),
            (RPC_SYSTEM, RPC_SYSTEM_HTTP),
            (RPC_SERVICE, "storage"),
            (OTEL_STATUS_CODE, "ERROR"),
            (GCP_CLIENT_SERVICE, "storage"),
            (GCP_CLIENT_REPO, "googleapis/google-cloud-rust"),
            (GCP_CLIENT_ARTIFACT, "google-cloud-storage"),
            (GCP_CLIENT_LANGUAGE, GCP_CLIENT_LANGUAGE_RUST),
        ];
        let span = captured
            .iter()
            .find(|s| s.name == "client_request")
            .unwrap_or_else(|| panic!("missing `client_request` span in capture: {captured:#?}"));
        let got = BTreeMap::from_iter(span.attributes.clone());
        // This is a subset of the fields, but good enough to catch most
        // mistakes. Recall that we use a macro, which is already tested.
        let want = BTreeMap::<String, AttributeValue>::from_iter(
            EXPECTED_ATTRIBUTES
                .iter()
                .map(|(k, v)| (k.to_string(), AttributeValue::from(*v)))
                .chain(
                    [
                        ("gax.client.span", true.into()),
                        (
                            OTEL_NAME,
                            format!("google_cloud_storage::client::Storage::{method}").into(),
                        ),
                        (RPC_METHOD, method.into()),
                        (ERROR_TYPE, error_type.into()),
                    ]
                    .map(|(k, v)| (k.to_string(), v)),
                ),
        );
        // Collect every expected entry that is absent or has a different value.
        let mismatch = want
            .iter()
            .filter(|(k, v)| !got.get(k.as_str()).is_some_and(|g| g == *v))
            .collect::<Vec<_>>();
        assert!(
            mismatch.is_empty(),
            "mismatch = {mismatch:?}\ngot      = {got:?}\nwant     = {want:?}"
        );
    }
}