google_cloud_storage/storage/client.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::request_options::RequestOptions;
16use crate::builder::storage::ReadObject;
17use crate::builder::storage::WriteObject;
18use crate::read_resume_policy::ReadResumePolicy;
19use crate::storage::bidi::OpenObject;
20use crate::storage::common_options::CommonOptions;
21use crate::streaming_source::Payload;
22use base64::Engine;
23use base64::prelude::BASE64_STANDARD;
24use gaxi::http::HttpRequestBuilder;
25use gaxi::options::{ClientConfig, Credentials};
26use google_cloud_auth::credentials::Builder as CredentialsBuilder;
27use google_cloud_gax::client_builder::{Error as BuilderError, Result as BuilderResult};
28use std::sync::Arc;
29
30/// Implements a client for the Cloud Storage API.
31///
32/// # Example
33/// ```
34/// # async fn sample() -> anyhow::Result<()> {
35/// # use google_cloud_storage::client::Storage;
36/// let client = Storage::builder().build().await?;
37/// // use `client` to make requests to Cloud Storage.
38/// # Ok(()) }
39/// ```
40///
41/// # Configuration
42///
43/// To configure `Storage` use the `with_*` methods in the type returned
44/// by [builder()][Storage::builder]. The default configuration should
45/// work for most applications. Common configuration changes include
46///
47/// * [with_endpoint()]: by default this client uses the global default endpoint
48/// (`https://storage.googleapis.com`). Applications using regional
49/// endpoints or running in restricted networks (e.g. a network configured
50/// with [Private Google Access with VPC Service Controls]) may want to
51/// override this default.
52/// * [with_credentials()]: by default this client uses
53/// [Application Default Credentials]. Applications using custom
54/// authentication may need to override this default.
55///
56/// # Pooling and Cloning
57///
/// `Storage` holds a connection pool internally, so it is advisable to
/// create one client and reuse it. You do not need to wrap `Storage` in
/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
/// internally.
62///
63/// # Service Description
64///
65/// The Cloud Storage API allows applications to read and write data through
66/// the abstractions of buckets and objects. For a description of these
67/// abstractions please see <https://cloud.google.com/storage/docs>.
68///
69/// Resources are named as follows:
70///
71/// - Projects are referred to as they are defined by the Resource Manager API,
72/// using strings like `projects/123456` or `projects/my-string-id`.
73///
74/// - Buckets are named using string names of the form:
75/// `projects/{project}/buckets/{bucket}`
76/// For globally unique buckets, `_` may be substituted for the project.
77///
78/// - Objects are uniquely identified by their name along with the name of the
79/// bucket they belong to, as separate strings in this API. For example:
80/// ```no_rust
81/// bucket = "projects/_/buckets/my-bucket"
82/// object = "my-object/with/a/folder-like/name"
83/// ```
84/// Note that object names can contain `/` characters, which are treated as
85/// any other character (no special directory semantics).
86///
87/// [with_endpoint()]: ClientBuilder::with_endpoint
88/// [with_credentials()]: ClientBuilder::with_credentials
89/// [Private Google Access with VPC Service Controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
90/// [Application Default Credentials]: https://cloud.google.com/docs/authentication#adc
91#[derive(Clone, Debug)]
92pub struct Storage<S = crate::stub::DefaultStorage>
93where
94 S: crate::stub::Storage + 'static,
95{
96 stub: std::sync::Arc<S>,
97 options: RequestOptions,
98}
99
100#[derive(Clone, Debug)]
101pub(crate) struct StorageInner {
102 pub client: gaxi::http::ReqwestClient,
103 pub options: RequestOptions,
104 pub grpc: gaxi::grpc::Client,
105}
106
107impl Storage {
108 /// Returns a builder for [Storage].
109 ///
110 /// # Example
111 /// ```
112 /// # use google_cloud_storage::client::Storage;
113 /// # async fn sample() -> anyhow::Result<()> {
114 /// let client = Storage::builder().build().await?;
115 /// # Ok(()) }
116 /// ```
117 pub fn builder() -> ClientBuilder {
118 ClientBuilder::new()
119 }
120}
121
122impl<S> Storage<S>
123where
124 S: crate::storage::stub::Storage + 'static,
125{
126 /// Creates a new client from the provided stub.
127 ///
128 /// The most common case for calling this function is in tests mocking the
129 /// client's behavior.
130 pub fn from_stub(stub: impl Into<std::sync::Arc<S>>) -> Self {
131 Self {
132 stub: stub.into(),
133 options: RequestOptions::new(),
134 }
135 }
136
137 /// Write an object with data from any data source.
138 ///
139 /// # Example
140 /// ```
141 /// # use google_cloud_storage::client::Storage;
142 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
143 /// let response = client
144 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
145 /// .send_buffered()
146 /// .await?;
147 /// println!("response details={response:?}");
148 /// # Ok(()) }
149 /// ```
150 ///
151 /// # Example
152 /// ```
153 /// # use google_cloud_storage::client::Storage;
154 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
155 /// let response = client
156 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
157 /// .send_unbuffered()
158 /// .await?;
159 /// println!("response details={response:?}");
160 /// # Ok(()) }
161 /// ```
162 ///
163 /// You can use many different types as the payload. For example, a string,
164 /// a [bytes::Bytes], a [tokio::fs::File], or a custom type that implements
165 /// the [StreamingSource] trait.
166 ///
167 /// If your data source also implements [Seek], prefer [send_unbuffered()]
168 /// to start the write. Otherwise use [send_buffered()].
169 ///
170 /// # Parameters
171 /// * `bucket` - the bucket name containing the object. In
172 /// `projects/_/buckets/{bucket_id}` format.
173 /// * `object` - the object name.
174 /// * `payload` - the object data.
175 ///
176 /// [Seek]: crate::streaming_source::Seek
177 /// [StreamingSource]: crate::streaming_source::StreamingSource
178 /// [send_buffered()]: crate::builder::storage::WriteObject::send_buffered
179 /// [send_unbuffered()]: crate::builder::storage::WriteObject::send_unbuffered
180 pub fn write_object<B, O, T, P>(&self, bucket: B, object: O, payload: T) -> WriteObject<P, S>
181 where
182 B: Into<String>,
183 O: Into<String>,
184 T: Into<Payload<P>>,
185 {
186 WriteObject::new(
187 self.stub.clone(),
188 bucket,
189 object,
190 payload,
191 self.options.clone(),
192 )
193 }
194
195 /// Reads the contents of an object.
196 ///
197 /// # Example
198 /// ```
199 /// # use google_cloud_storage::client::Storage;
200 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
201 /// let mut resp = client
202 /// .read_object("projects/_/buckets/my-bucket", "my-object")
203 /// .send()
204 /// .await?;
205 /// let mut contents = Vec::new();
206 /// while let Some(chunk) = resp.next().await.transpose()? {
207 /// contents.extend_from_slice(&chunk);
208 /// }
209 /// println!("object contents={:?}", bytes::Bytes::from_owner(contents));
210 /// # Ok(()) }
211 /// ```
212 ///
213 /// # Parameters
214 /// * `bucket` - the bucket name containing the object. In
215 /// `projects/_/buckets/{bucket_id}` format.
216 /// * `object` - the object name.
217 pub fn read_object<B, O>(&self, bucket: B, object: O) -> ReadObject<S>
218 where
219 B: Into<String>,
220 O: Into<String>,
221 {
222 ReadObject::new(self.stub.clone(), bucket, object, self.options.clone())
223 }
224
225 /// Opens an object to read its contents using concurrent ranged reads.
226 ///
227 /// # Example
228 /// ```
229 /// # use google_cloud_storage::client::Storage;
230 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
231 /// use google_cloud_storage::model_ext::ReadRange;
232 /// let descriptor = client
233 /// .open_object("projects/_/buckets/my-bucket", "my-object")
234 /// .send()
235 /// .await?;
236 /// // Print the object metadata
237 /// println!("metadata = {:?}", descriptor.object());
238 /// // Read 2000 bytes starting at offset 1000.
239 /// let mut reader = descriptor.read_range(ReadRange::segment(1000, 2000)).await;
240 /// let mut contents = Vec::new();
241 /// while let Some(chunk) = reader.next().await.transpose()? {
242 /// contents.extend_from_slice(&chunk);
243 /// }
244 /// println!("range contents={:?}", bytes::Bytes::from_owner(contents));
245 /// // `descriptor` can be used to read more ranges, concurrently if needed.
246 /// # Ok(()) }
247 /// ```
248 ///
249 /// # Example: open and read in a single RPC
250 /// ```
251 /// # use google_cloud_storage::client::Storage;
252 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
253 /// use google_cloud_storage::model_ext::ReadRange;
254 /// let (descriptor, mut reader) = client
255 /// .open_object("projects/_/buckets/my-bucket", "my-object")
256 /// .send_and_read(ReadRange::segment(1000, 2000))
257 /// .await?;
258 /// // `descriptor` can be used to read more ranges.
259 /// # Ok(()) }
260 /// ```
261 ///
262 /// <div class="warning">
263 /// The APIs used by this method are only enabled for some projects and
264 /// buckets. Contact your account team to enable this API.
265 /// </div>
266 ///
267 /// # Parameters
268 /// * `bucket` - the bucket name containing the object. In
269 /// `projects/_/buckets/{bucket_id}` format.
270 /// * `object` - the object name.
271 pub fn open_object<B, O>(&self, bucket: B, object: O) -> OpenObject<S>
272 where
273 B: Into<String>,
274 O: Into<String>,
275 {
276 OpenObject::new(self.stub.clone(), bucket, object, self.options.clone())
277 }
278}
279
280impl Storage {
281 pub(crate) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
282 let tracing = builder.config.tracing;
283 let inner = StorageInner::from_parts(builder).await?;
284 let options = inner.options.clone();
285 let stub = crate::storage::transport::Storage::new(Arc::new(inner), tracing);
286 Ok(Self { stub, options })
287 }
288}
289
290impl StorageInner {
291 /// Builds a client assuming `config.cred` and `config.endpoint` are initialized, panics otherwise.
292 pub(self) fn new(
293 client: gaxi::http::ReqwestClient,
294 options: RequestOptions,
295 grpc: gaxi::grpc::Client,
296 ) -> Self {
297 Self {
298 client,
299 options,
300 grpc,
301 }
302 }
303
304 pub(self) async fn from_parts(builder: ClientBuilder) -> BuilderResult<Self> {
305 let (mut config, options) = builder.into_parts()?;
306 config.disable_automatic_decompression = true;
307 config.disable_follow_redirects = true;
308
309 let client = gaxi::http::ReqwestClient::new(config.clone(), super::DEFAULT_HOST).await?;
310 let client = if gaxi::options::tracing_enabled(&config) {
311 client.with_instrumentation(&super::info::INSTRUMENTATION)
312 } else {
313 client
314 };
315 let grpc = if gaxi::options::tracing_enabled(&config) {
316 gaxi::grpc::Client::new_with_instrumentation(
317 config,
318 super::DEFAULT_HOST,
319 &super::info::INSTRUMENTATION,
320 )
321 .await?
322 } else {
323 gaxi::grpc::Client::new(config, super::DEFAULT_HOST).await?
324 };
325
326 let inner = StorageInner::new(client, options, grpc);
327 Ok(inner)
328 }
329}
330
331/// A builder for [Storage].
332///
333/// ```
334/// # use google_cloud_storage::client::Storage;
335/// # async fn sample() -> anyhow::Result<()> {
336/// let builder = Storage::builder();
337/// let client = builder
338/// .with_endpoint("https://storage.googleapis.com")
339/// .build()
340/// .await?;
341/// # Ok(()) }
342/// ```
343pub struct ClientBuilder {
344 // Common options for all clients (generated or not).
345 pub(crate) config: ClientConfig,
346 // Specific options for the storage client. `RequestOptions` also requires
347 // these, it makes sense to share them.
348 common_options: CommonOptions,
349}
350
351impl ClientBuilder {
352 pub(crate) fn new() -> Self {
353 let mut config = ClientConfig::default();
354 config.retry_policy = Some(Arc::new(crate::retry_policy::storage_default()));
355 config.backoff_policy = Some(Arc::new(crate::backoff_policy::default()));
356 {
357 let count = std::thread::available_parallelism().ok();
358 config.grpc_subchannel_count = Some(count.map(|x| x.get()).unwrap_or(1));
359 }
360 let common_options = CommonOptions::new();
361 Self {
362 config,
363 common_options,
364 }
365 }
366
367 /// Creates a new client.
368 ///
369 /// # Example
370 /// ```
371 /// # use google_cloud_storage::client::Storage;
372 /// # async fn sample() -> anyhow::Result<()> {
373 /// let client = Storage::builder().build().await?;
374 /// # Ok(()) }
375 /// ```
376 pub async fn build(self) -> BuilderResult<Storage> {
377 Storage::new(self).await
378 }
379
380 /// Sets the endpoint.
381 ///
382 /// # Example
383 /// ```
384 /// # use google_cloud_storage::client::Storage;
385 /// # async fn sample() -> anyhow::Result<()> {
386 /// let client = Storage::builder()
387 /// .with_endpoint("https://private.googleapis.com")
388 /// .build()
389 /// .await?;
390 /// # Ok(()) }
391 /// ```
392 pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
393 self.config.endpoint = Some(v.into());
394 self
395 }
396
397 /// Configure the universe domain.
398 ///
399 /// The universe domain is the default service domain for a given cloud universe.
400 /// The default value is "googleapis.com".
401 ///
402 /// # Example
403 /// ```
404 /// # use google_cloud_storage::client::Storage;
405 /// # async fn sample() -> anyhow::Result<()> {
406 /// let client = Storage::builder()
407 /// .with_universe_domain("googleapis.com")
408 /// .build()
409 /// .await?;
410 /// # Ok(()) }
411 /// ```
412 pub fn with_universe_domain<V: Into<String>>(mut self, v: V) -> Self {
413 self.config.universe_domain = Some(v.into());
414 self
415 }
416
417 /// Configures the authentication credentials.
418 ///
419 /// Google Cloud Storage requires authentication for most buckets. Use this
420 /// method to change the credentials used by the client. More information
421 /// about valid credentials types can be found in the [google-cloud-auth]
422 /// crate documentation.
423 ///
424 /// # Example
425 /// ```
426 /// # use google_cloud_storage::client::Storage;
427 /// # async fn sample() -> anyhow::Result<()> {
428 /// use google_cloud_auth::credentials::mds;
429 /// let client = Storage::builder()
430 /// .with_credentials(
431 /// mds::Builder::default()
432 /// .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
433 /// .build()?)
434 /// .build()
435 /// .await?;
436 /// # Ok(()) }
437 /// ```
438 ///
439 /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
440 pub fn with_credentials<V: Into<Credentials>>(mut self, v: V) -> Self {
441 self.config.cred = Some(v.into());
442 self
443 }
444
445 /// Configure the retry policy.
446 ///
447 /// The client libraries can automatically retry operations that fail. The
448 /// retry policy controls what errors are considered retryable, sets limits
449 /// on the number of attempts or the time trying to make attempts.
450 ///
451 /// # Example
452 /// ```
453 /// # use google_cloud_storage::client::Storage;
454 /// # async fn sample() -> anyhow::Result<()> {
455 /// use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
456 /// let client = Storage::builder()
457 /// .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
458 /// .build()
459 /// .await?;
460 /// # Ok(()) }
461 /// ```
462 pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
463 mut self,
464 v: V,
465 ) -> Self {
466 self.config.retry_policy = Some(v.into().into());
467 self
468 }
469
470 /// Configure the retry backoff policy.
471 ///
472 /// The client libraries can automatically retry operations that fail. The
473 /// backoff policy controls how long to wait in between retry attempts.
474 ///
475 /// # Example
476 /// ```
477 /// # use google_cloud_storage::client::Storage;
478 /// # async fn sample() -> anyhow::Result<()> {
479 /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
480 /// use std::time::Duration;
481 /// let policy = ExponentialBackoff::default();
482 /// let client = Storage::builder()
483 /// .with_backoff_policy(policy)
484 /// .build()
485 /// .await?;
486 /// # Ok(()) }
487 /// ```
488 pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
489 mut self,
490 v: V,
491 ) -> Self {
492 self.config.backoff_policy = Some(v.into().into());
493 self
494 }
495
496 /// Configure the retry throttler.
497 ///
498 /// Advanced applications may want to configure a retry throttler to
499 /// [Address Cascading Failures] and when [Handling Overload] conditions.
500 /// The client libraries throttle their retry loop, using a policy to
501 /// control the throttling algorithm. Use this method to fine tune or
    /// customize the default retry throttler.
503 ///
504 /// [Handling Overload]: https://sre.google/sre-book/handling-overload/
505 /// [Address Cascading Failures]: https://sre.google/sre-book/addressing-cascading-failures/
506 ///
507 /// # Example
508 /// ```
509 /// # use google_cloud_storage::client::Storage;
510 /// # async fn sample() -> anyhow::Result<()> {
511 /// use google_cloud_gax::retry_throttler::AdaptiveThrottler;
512 /// let client = Storage::builder()
513 /// .with_retry_throttler(AdaptiveThrottler::default())
514 /// .build()
515 /// .await?;
516 /// # Ok(()) }
517 /// ```
518 pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
519 mut self,
520 v: V,
521 ) -> Self {
522 self.config.retry_throttler = v.into().into();
523 self
524 }
525
526 /// Sets the payload size threshold to switch from single-shot to resumable uploads.
527 ///
528 /// # Example
529 /// ```
530 /// # use google_cloud_storage::client::Storage;
531 /// # async fn sample() -> anyhow::Result<()> {
532 /// let client = Storage::builder()
533 /// .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
534 /// .build()
535 /// .await?;
536 /// let response = client
537 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
538 /// .send_buffered()
539 /// .await?;
540 /// println!("response details={response:?}");
541 /// # Ok(()) }
542 /// ```
543 ///
544 /// The client library can write objects using [single-shot] or [resumable]
545 /// uploads. For small objects, single-shot uploads offer better
546 /// performance, as they require a single HTTP transfer. For larger objects,
547 /// the additional request latency is not significant, and resumable uploads
548 /// offer better recovery on errors.
549 ///
550 /// The library automatically selects resumable uploads when the payload is
551 /// equal to or larger than this option. For smaller writes the client
552 /// library uses single-shot uploads.
553 ///
554 /// The exact threshold depends on where the application is deployed and
555 /// destination bucket location with respect to where the application is
556 /// running. The library defaults should work well in most cases, but some
557 /// applications may benefit from fine-tuning.
558 ///
559 /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
560 /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
561 pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
562 self.common_options.resumable_upload_threshold = v.into();
563 self
564 }
565
566 /// Changes the buffer size for some resumable uploads.
567 ///
568 /// # Example
569 /// ```
570 /// # use google_cloud_storage::client::Storage;
571 /// # async fn sample() -> anyhow::Result<()> {
572 /// let client = Storage::builder()
573 /// .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
574 /// .build()
575 /// .await?;
576 /// let response = client
577 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
578 /// .send_buffered()
579 /// .await?;
580 /// println!("response details={response:?}");
581 /// # Ok(()) }
582 /// ```
583 ///
584 /// When performing [resumable uploads] from sources without [Seek] the
585 /// client library needs to buffer data in memory until it is persisted by
586 /// the service. Otherwise the data would be lost if the upload is
587 /// interrupted. Applications may want to tune this buffer size:
588 ///
589 /// - Use smaller buffer sizes to support more concurrent writes in the
590 /// same application.
591 /// - Use larger buffer sizes for better throughput. Sending many small
592 /// buffers stalls the writer until the client receives a successful
593 /// response from the service.
594 ///
595 /// Keep in mind that there are diminishing returns on using larger buffers.
596 ///
597 /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
598 /// [Seek]: crate::streaming_source::Seek
599 pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
600 self.common_options.resumable_upload_buffer_size = v.into();
601 self
602 }
603
604 /// Configure the resume policy for object reads.
605 ///
606 /// The Cloud Storage client library can automatically resume a read request
607 /// that is interrupted by a transient error. Applications may want to
608 /// limit the number of read attempts, or may wish to expand the type
609 /// of errors treated as retryable.
610 ///
611 /// # Example
612 /// ```
613 /// # use google_cloud_storage::client::Storage;
614 /// # async fn sample() -> anyhow::Result<()> {
615 /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
616 /// let client = Storage::builder()
617 /// .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
618 /// .build()
619 /// .await?;
620 /// # Ok(()) }
621 /// ```
622 pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
623 where
624 V: ReadResumePolicy + 'static,
625 {
626 self.common_options.read_resume_policy = Arc::new(v);
627 self
628 }
629
630 /// Configure the number of subchannels used by the client.
631 ///
632 /// # Example
633 /// ```
634 /// # use google_cloud_storage::client::Storage;
635 /// # async fn sample() -> anyhow::Result<()> {
636 /// // By default the client uses `count` subchannels.
637 /// let count = std::thread::available_parallelism()?.get();
638 /// let client = Storage::builder()
639 /// .with_grpc_subchannel_count(std::cmp::max(1, count / 2))
640 /// .build()
641 /// .await?;
642 /// # Ok(()) }
643 /// ```
644 ///
645 /// gRPC-based clients may exhibit high latency if many requests need to be
646 /// demuxed over a single HTTP/2 connection (often called a *subchannel* in gRPC).
647 /// Consider using more subchannels if your application makes many
648 /// concurrent requests. Consider using fewer subchannels if your
649 /// application needs the file descriptors for other purposes.
650 ///
651 /// Keep in mind that Google Cloud limits the number of concurrent RPCs in
652 /// a single connection to about 100.
653 pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
654 self.config.grpc_subchannel_count = Some(v);
655 self
656 }
657
658 /// Enables observability signals for the client.
659 ///
660 /// # Example
661 /// ```
662 /// # use google_cloud_storage::client::Storage;
663 /// # async fn sample() -> anyhow::Result<()> {
664 /// let client = Storage::builder()
665 /// .with_tracing()
666 /// .build()
667 /// .await?;
668 /// // For observing traces and logs, you must also enable a tracing subscriber in your `main` function,
669 /// // for example:
670 /// // tracing_subscriber::fmt::init();
671 /// // For observing metrics, you must also install an OpenTelemetry meter provider in your `main` function,
672 /// // for example:
673 /// // opentelemetry::global::set_meter_provider(provider.clone());
674 /// # Ok(()) }
675 /// ```
676 ///
677 /// <div class="warning">
678 ///
679 /// Observability signals at any level may contain sensitive data such as resource names (bucket
680 /// and object names), full URLs, and error messages.
681 ///
682 /// Before configuring subscribers or exporters for traces and logs, review the contents of the
683 /// spans and consult the [tracing] framework documentation to set up filters and formatters to
684 /// prevent leaking sensitive information, depending on your intended use case.
685 ///
686 /// [OpenTelemetry Semantic Conventions]: https://opentelemetry.io/docs/concepts/semantic-conventions/
687 /// [tracing]: https://docs.rs/tracing/latest/tracing/
688 ///
689 /// </div>
690 ///
691 /// The libraries are instrumented to generate the following signals:
692 ///
693 /// 1. `INFO` spans for each logical client request. Typically a single method call in the client
694 /// struct gets such a span.
695 /// 1. A histogram metric measuring the elapsed time for each logical client request.
    /// 1. `WARN` logs for each logical client request that fails.
    /// 1. `INFO` spans for each low-level RPC attempt. Typically a single method in the client
    ///    struct gets one such span, but there may be more if the library had to retry the RPC.
699 /// 1. `DEBUG` logs for each low-level attempt that fails.
700 ///
701 /// These spans and logs follow [OpenTelemetry Semantic Conventions] with additional Google
    /// Cloud attributes. Both the spans and the logs should be suitable for production
703 /// monitoring.
704 ///
705 /// The libraries also have `DEBUG` spans for each request, these include the full request body,
706 /// and the full response body for successful requests, and the full error message, with
707 /// details, for failed requests. Consider the contents of these requests and responses before
708 /// enabling them in production environments, as the request or responses may include sensitive
709 /// data. These `DEBUG` spans use the `google_cloud_storage::tracing` as their target and the
710 /// method name as the span name. You can use the name and/or target to set up your filters.
711 ///
712 /// # More information
713 ///
714 /// The [Enable logging] guide shows you how to initialize a subscriber to
715 /// log events to the console.
716 ///
717 /// [Enable logging]: https://docs.cloud.google.com/rust/enable-logging
718 /// [tracing]: https://docs.rs/tracing
719 pub fn with_tracing(mut self) -> Self {
720 self.config.tracing = true;
721 self
722 }
723
724 pub(crate) fn apply_default_credentials(&mut self) -> BuilderResult<()> {
725 if self.config.cred.is_some() {
726 return Ok(());
727 };
728 let default = CredentialsBuilder::default()
729 .build()
730 .map_err(BuilderError::cred)?;
731 self.config.cred = Some(default);
732 Ok(())
733 }
734
735 pub(crate) fn apply_default_endpoint(&mut self) -> BuilderResult<()> {
736 let _ = self
737 .config
738 .endpoint
739 .get_or_insert_with(|| super::DEFAULT_HOST.to_string());
740 Ok(())
741 }
742
743 // Breaks the builder into its parts, with defaults applied.
744 pub(crate) fn into_parts(
745 mut self,
746 ) -> google_cloud_gax::client_builder::Result<(ClientConfig, RequestOptions)> {
747 self.apply_default_credentials()?;
748 self.apply_default_endpoint()?;
749 let request_options =
750 RequestOptions::new_with_client_config(&self.config, self.common_options);
751 Ok((self.config, request_options))
752 }
753}
754
755/// The set of characters that are percent encoded.
756///
757/// This set is defined at https://cloud.google.com/storage/docs/request-endpoints#encoding:
758///
759/// Encode the following characters when they appear in either the object name
760/// or query string of a request URL:
761/// !, #, $, &, ', (, ), *, +, ,, /, :, ;, =, ?, @, [, ], and space characters.
762pub(crate) const ENCODED_CHARS: percent_encoding::AsciiSet = percent_encoding::CONTROLS
763 .add(b'!')
764 .add(b'#')
765 .add(b'$')
766 .add(b'&')
767 .add(b'\'')
768 .add(b'(')
769 .add(b')')
770 .add(b'*')
771 .add(b'+')
772 .add(b',')
773 .add(b'/')
774 .add(b':')
775 .add(b';')
776 .add(b'=')
777 .add(b'?')
778 .add(b'@')
779 .add(b'[')
780 .add(b']')
781 .add(b' ');
782
783/// Percent encode a string.
784///
785/// To ensure compatibility certain characters need to be encoded when they appear
786/// in either the object name or query string of a request URL.
787pub(crate) fn enc(value: &str) -> String {
788 percent_encoding::utf8_percent_encode(value, &ENCODED_CHARS).to_string()
789}
790
791pub(crate) fn apply_customer_supplied_encryption_headers(
792 builder: HttpRequestBuilder,
793 common_object_request_params: &Option<crate::model::CommonObjectRequestParams>,
794) -> HttpRequestBuilder {
795 common_object_request_params.iter().fold(builder, |b, v| {
796 b.header(
797 "x-goog-encryption-algorithm",
798 v.encryption_algorithm.clone(),
799 )
800 .header(
801 "x-goog-encryption-key",
802 BASE64_STANDARD.encode(v.encryption_key_bytes.clone()),
803 )
804 .header(
805 "x-goog-encryption-key-sha256",
806 BASE64_STANDARD.encode(v.encryption_key_sha256_bytes.clone()),
807 )
808 })
809}
810
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use google_cloud_gax::retry_result::RetryResult;
    use google_cloud_gax::retry_state::RetryState;
    use std::{sync::Arc, time::Duration};

    /// A new builder has retry and backoff policies and at least one subchannel.
    #[test]
    fn default_settings() {
        let config = ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .config;
        assert!(config.retry_policy.is_some(), "{config:?}");
        assert!(config.backoff_policy.is_some(), "{config:?}");
        assert!(
            matches!(config.grpc_subchannel_count, Some(count) if count >= 1),
            "{config:?}"
        );
    }

    /// The subchannel count set by the application is stored in the config.
    #[test]
    fn subchannel_count() {
        let config = ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .with_grpc_subchannel_count(42)
            .config;
        assert_eq!(config.grpc_subchannel_count, Some(42), "{config:?}");
    }

    /// The universe domain set by the application is stored in the config.
    #[test]
    fn universe_domain() {
        let config = ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .with_universe_domain("my-universe.com")
            .config;
        assert_eq!(
            config.universe_domain.as_deref(),
            Some("my-universe.com"),
            "{config:?}"
        );
    }

    // A stub that relies on the trait defaults for every method.
    #[derive(Debug)]
    struct DummyStorage;

    impl crate::storage::stub::Storage for DummyStorage {}

    /// `from_stub()` compiles with a bare stub and with an `Arc`-wrapped stub.
    #[test]
    fn from_stub_accepts_both_raw_and_arc() {
        let _client = Storage::<DummyStorage>::from_stub(DummyStorage);

        let shared = std::sync::Arc::new(DummyStorage);
        let _client_arc = Storage::<DummyStorage>::from_stub(shared);
    }

    /// Two clients can be created from the same `Arc`-wrapped stub.
    #[test]
    fn from_stub_allows_sharing_stub() {
        let shared = std::sync::Arc::new(DummyStorage);

        let _client1 = Storage::<DummyStorage>::from_stub(shared.clone());
        let _client2 = Storage::<DummyStorage>::from_stub(shared);
    }

    /// Returns a builder with anonymous credentials, a fixed endpoint and
    /// universe domain, and a backoff policy with millisecond delays.
    pub(crate) fn test_builder() -> ClientBuilder {
        let backoff = google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder::new()
            .with_initial_delay(Duration::from_millis(1))
            .with_maximum_delay(Duration::from_millis(2))
            .build()
            .expect("hard coded policy should build correctly");
        ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .with_endpoint("http://private.googleapis.com")
            .with_universe_domain("googleapis.com")
            .with_backoff_policy(backoff)
    }

    /// This is used by the request builder tests.
    pub(crate) async fn test_inner_client(builder: ClientBuilder) -> Arc<StorageInner> {
        StorageInner::from_parts(builder)
            .await
            .map(Arc::new)
            .expect("creating an test inner client succeeds")
    }

    // Mock of the gax retry throttler trait.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryThrottler {}

        impl google_cloud_gax::retry_throttler::RetryThrottler for RetryThrottler {
            fn throttle_retry_attempt(&self) -> bool;
            fn on_retry_failure(&mut self, flow: &RetryResult);
            fn on_success(&mut self);
        }
    }

    // Mock of the gax retry policy trait.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryPolicy {}

        impl google_cloud_gax::retry_policy::RetryPolicy for RetryPolicy {
            fn on_error(&self, state: &RetryState, error: google_cloud_gax::error::Error) -> RetryResult;
        }
    }

    // Mock of the gax backoff policy trait.
    mockall::mock! {
        #[derive(Debug)]
        pub BackoffPolicy {}

        impl google_cloud_gax::backoff_policy::BackoffPolicy for BackoffPolicy {
            fn on_failure(&self, state: &RetryState) -> std::time::Duration;
        }
    }

    // Mock of the storage read resume policy trait.
    mockall::mock! {
        #[derive(Debug)]
        pub ReadResumePolicy {}

        impl crate::read_resume_policy::ReadResumePolicy for ReadResumePolicy {
            fn on_error(&self, query: &crate::read_resume_policy::ResumeQuery, error: google_cloud_gax::error::Error) -> crate::read_resume_policy::ResumeResult;
        }
    }
}
939}