// google_cloud_storage/storage/client.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::request_options::RequestOptions;
16use crate::builder::storage::ReadObject;
17use crate::builder::storage::WriteObject;
18use crate::read_resume_policy::ReadResumePolicy;
19use crate::storage::bidi::OpenObject;
20use crate::storage::common_options::CommonOptions;
21use crate::streaming_source::Payload;
22use base64::Engine;
23use base64::prelude::BASE64_STANDARD;
24use gaxi::http::HttpRequestBuilder;
25use gaxi::options::{ClientConfig, Credentials};
26use google_cloud_auth::credentials::Builder as CredentialsBuilder;
27use google_cloud_gax::client_builder::{Error as BuilderError, Result as BuilderResult};
28use std::sync::Arc;
29
30/// Implements a client for the Cloud Storage API.
31///
32/// # Example
33/// ```
34/// # async fn sample() -> anyhow::Result<()> {
35/// # use google_cloud_storage::client::Storage;
36/// let client = Storage::builder().build().await?;
37/// // use `client` to make requests to Cloud Storage.
38/// # Ok(()) }
39/// ```
40///
41/// # Configuration
42///
43/// To configure `Storage` use the `with_*` methods in the type returned
44/// by [builder()][Storage::builder]. The default configuration should
45/// work for most applications. Common configuration changes include
46///
47/// * [with_endpoint()]: by default this client uses the global default endpoint
48/// (`https://storage.googleapis.com`). Applications using regional
49/// endpoints or running in restricted networks (e.g. a network configured
50/// with [Private Google Access with VPC Service Controls]) may want to
51/// override this default.
52/// * [with_credentials()]: by default this client uses
53/// [Application Default Credentials]. Applications using custom
54/// authentication may need to override this default.
55///
56/// # Pooling and Cloning
57///
58/// `Storage` holds a connection pool internally, it is advised to
59/// create one and then reuse it. You do not need to wrap `Storage` in
60/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
61/// internally.
62///
63/// # Service Description
64///
65/// The Cloud Storage API allows applications to read and write data through
66/// the abstractions of buckets and objects. For a description of these
67/// abstractions please see <https://cloud.google.com/storage/docs>.
68///
69/// Resources are named as follows:
70///
71/// - Projects are referred to as they are defined by the Resource Manager API,
72/// using strings like `projects/123456` or `projects/my-string-id`.
73///
74/// - Buckets are named using string names of the form:
75/// `projects/{project}/buckets/{bucket}`
76/// For globally unique buckets, `_` may be substituted for the project.
77///
78/// - Objects are uniquely identified by their name along with the name of the
79/// bucket they belong to, as separate strings in this API. For example:
80/// ```no_rust
81/// bucket = "projects/_/buckets/my-bucket"
82/// object = "my-object/with/a/folder-like/name"
83/// ```
84/// Note that object names can contain `/` characters, which are treated as
85/// any other character (no special directory semantics).
86///
87/// [with_endpoint()]: ClientBuilder::with_endpoint
88/// [with_credentials()]: ClientBuilder::with_credentials
89/// [Private Google Access with VPC Service Controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
90/// [Application Default Credentials]: https://cloud.google.com/docs/authentication#adc
#[derive(Clone, Debug)]
pub struct Storage<S = crate::stub::DefaultStorage>
where
    S: crate::stub::Storage + 'static,
{
    // The transport stub implementing each operation. Held in an `Arc` so
    // cloning `Storage` shares the same underlying stub (and its pools).
    stub: std::sync::Arc<S>,
    // Default request options; each request builder receives a clone.
    options: RequestOptions,
}
99
/// Transport-level state shared by the default stub implementation.
#[derive(Clone, Debug)]
pub(crate) struct StorageInner {
    // HTTP transport for the JSON API.
    pub client: gaxi::http::ReqwestClient,
    // Default request options, derived from the builder in `from_parts()`.
    pub options: RequestOptions,
    // gRPC transport, created alongside the HTTP client in `from_parts()`.
    pub grpc: gaxi::grpc::Client,
}
106
impl Storage {
    /// Returns a builder for [Storage].
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// let client = Storage::builder().build().await?;
    /// # Ok(()) }
    /// ```
    pub fn builder() -> ClientBuilder {
        // `ClientBuilder::new()` is `pub(crate)`; this is the public entry point.
        ClientBuilder::new()
    }
}
121
122impl<S> Storage<S>
123where
124 S: crate::storage::stub::Storage + 'static,
125{
126 /// Creates a new client from the provided stub.
127 ///
128 /// The most common case for calling this function is in tests mocking the
129 /// client's behavior.
130 pub fn from_stub(stub: S) -> Self
131 where
132 S: super::stub::Storage + 'static,
133 {
134 Self {
135 stub: std::sync::Arc::new(stub),
136 options: RequestOptions::new(),
137 }
138 }
139
140 /// Write an object with data from any data source.
141 ///
142 /// # Example
143 /// ```
144 /// # use google_cloud_storage::client::Storage;
145 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
146 /// let response = client
147 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
148 /// .send_buffered()
149 /// .await?;
150 /// println!("response details={response:?}");
151 /// # Ok(()) }
152 /// ```
153 ///
154 /// # Example
155 /// ```
156 /// # use google_cloud_storage::client::Storage;
157 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
158 /// let response = client
159 /// .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
160 /// .send_unbuffered()
161 /// .await?;
162 /// println!("response details={response:?}");
163 /// # Ok(()) }
164 /// ```
165 ///
166 /// You can use many different types as the payload. For example, a string,
167 /// a [bytes::Bytes], a [tokio::fs::File], or a custom type that implements
168 /// the [StreamingSource] trait.
169 ///
170 /// If your data source also implements [Seek], prefer [send_unbuffered()]
171 /// to start the write. Otherwise use [send_buffered()].
172 ///
173 /// # Parameters
174 /// * `bucket` - the bucket name containing the object. In
175 /// `projects/_/buckets/{bucket_id}` format.
176 /// * `object` - the object name.
177 /// * `payload` - the object data.
178 ///
179 /// [Seek]: crate::streaming_source::Seek
180 /// [StreamingSource]: crate::streaming_source::StreamingSource
181 /// [send_buffered()]: crate::builder::storage::WriteObject::send_buffered
182 /// [send_unbuffered()]: crate::builder::storage::WriteObject::send_unbuffered
183 pub fn write_object<B, O, T, P>(&self, bucket: B, object: O, payload: T) -> WriteObject<P, S>
184 where
185 B: Into<String>,
186 O: Into<String>,
187 T: Into<Payload<P>>,
188 {
189 WriteObject::new(
190 self.stub.clone(),
191 bucket,
192 object,
193 payload,
194 self.options.clone(),
195 )
196 }
197
198 /// Reads the contents of an object.
199 ///
200 /// # Example
201 /// ```
202 /// # use google_cloud_storage::client::Storage;
203 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
204 /// let mut resp = client
205 /// .read_object("projects/_/buckets/my-bucket", "my-object")
206 /// .send()
207 /// .await?;
208 /// let mut contents = Vec::new();
209 /// while let Some(chunk) = resp.next().await.transpose()? {
210 /// contents.extend_from_slice(&chunk);
211 /// }
212 /// println!("object contents={:?}", bytes::Bytes::from_owner(contents));
213 /// # Ok(()) }
214 /// ```
215 ///
216 /// # Parameters
217 /// * `bucket` - the bucket name containing the object. In
218 /// `projects/_/buckets/{bucket_id}` format.
219 /// * `object` - the object name.
220 pub fn read_object<B, O>(&self, bucket: B, object: O) -> ReadObject<S>
221 where
222 B: Into<String>,
223 O: Into<String>,
224 {
225 ReadObject::new(self.stub.clone(), bucket, object, self.options.clone())
226 }
227
228 /// Opens an object to read its contents using concurrent ranged reads.
229 ///
230 /// # Example
231 /// ```
232 /// # use google_cloud_storage::client::Storage;
233 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
234 /// use google_cloud_storage::model_ext::ReadRange;
235 /// let descriptor = client
236 /// .open_object("projects/_/buckets/my-bucket", "my-object")
237 /// .send()
238 /// .await?;
239 /// // Print the object metadata
240 /// println!("metadata = {:?}", descriptor.object());
241 /// // Read 2000 bytes starting at offset 1000.
242 /// let mut reader = descriptor.read_range(ReadRange::segment(1000, 2000)).await;
243 /// let mut contents = Vec::new();
244 /// while let Some(chunk) = reader.next().await.transpose()? {
245 /// contents.extend_from_slice(&chunk);
246 /// }
247 /// println!("range contents={:?}", bytes::Bytes::from_owner(contents));
248 /// // `descriptor` can be used to read more ranges, concurrently if needed.
249 /// # Ok(()) }
250 /// ```
251 ///
252 /// # Example: open and read in a single RPC
253 /// ```
254 /// # use google_cloud_storage::client::Storage;
255 /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
256 /// use google_cloud_storage::model_ext::ReadRange;
257 /// let (descriptor, mut reader) = client
258 /// .open_object("projects/_/buckets/my-bucket", "my-object")
259 /// .send_and_read(ReadRange::segment(1000, 2000))
260 /// .await?;
261 /// // `descriptor` can be used to read more ranges.
262 /// # Ok(()) }
263 /// ```
264 ///
265 /// <div class="warning">
266 /// The APIs used by this method are only enabled for some projects and
267 /// buckets. Contact your account team to enable this API.
268 /// </div>
269 ///
270 /// # Parameters
271 /// * `bucket` - the bucket name containing the object. In
272 /// `projects/_/buckets/{bucket_id}` format.
273 /// * `object` - the object name.
274 pub fn open_object<B, O>(&self, bucket: B, object: O) -> OpenObject<S>
275 where
276 B: Into<String>,
277 O: Into<String>,
278 {
279 OpenObject::new(self.stub.clone(), bucket, object, self.options.clone())
280 }
281}
282
283impl Storage {
284 pub(crate) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
285 let tracing = builder.config.tracing;
286 let inner = StorageInner::from_parts(builder).await?;
287 let options = inner.options.clone();
288 let stub = crate::storage::transport::Storage::new(Arc::new(inner), tracing);
289 Ok(Self { stub, options })
290 }
291}
292
293impl StorageInner {
294 /// Builds a client assuming `config.cred` and `config.endpoint` are initialized, panics otherwise.
295 pub(self) fn new(
296 client: gaxi::http::ReqwestClient,
297 options: RequestOptions,
298 grpc: gaxi::grpc::Client,
299 ) -> Self {
300 Self {
301 client,
302 options,
303 grpc,
304 }
305 }
306
307 pub(self) async fn from_parts(builder: ClientBuilder) -> BuilderResult<Self> {
308 let (mut config, options) = builder.into_parts()?;
309 config.disable_automatic_decompression = true;
310 config.disable_follow_redirects = true;
311
312 let client = gaxi::http::ReqwestClient::new(config.clone(), super::DEFAULT_HOST).await?;
313 let client = if gaxi::options::tracing_enabled(&config) {
314 client.with_instrumentation(&super::info::INSTRUMENTATION)
315 } else {
316 client
317 };
318 let grpc = if gaxi::options::tracing_enabled(&config) {
319 gaxi::grpc::Client::new_with_instrumentation(
320 config,
321 super::DEFAULT_HOST,
322 &super::info::INSTRUMENTATION,
323 )
324 .await?
325 } else {
326 gaxi::grpc::Client::new(config, super::DEFAULT_HOST).await?
327 };
328
329 let inner = StorageInner::new(client, options, grpc);
330 Ok(inner)
331 }
332}
333
334/// A builder for [Storage].
335///
336/// ```
337/// # use google_cloud_storage::client::Storage;
338/// # async fn sample() -> anyhow::Result<()> {
339/// let builder = Storage::builder();
340/// let client = builder
341/// .with_endpoint("https://storage.googleapis.com")
342/// .build()
343/// .await?;
344/// # Ok(()) }
345/// ```
pub struct ClientBuilder {
    // Common options for all clients (generated or not).
    pub(crate) config: ClientConfig,
    // Specific options for the storage client. `RequestOptions` also requires
    // these, so it makes sense to share them.
    common_options: CommonOptions,
}
353
impl ClientBuilder {
    /// Creates a builder with the storage-specific defaults installed.
    pub(crate) fn new() -> Self {
        let mut config = ClientConfig::default();
        config.retry_policy = Some(Arc::new(crate::retry_policy::storage_default()));
        config.backoff_policy = Some(Arc::new(crate::backoff_policy::default()));
        {
            // Default to one gRPC subchannel per available core; fall back to
            // a single subchannel when parallelism cannot be determined.
            let count = std::thread::available_parallelism().ok();
            config.grpc_subchannel_count = Some(count.map(|x| x.get()).unwrap_or(1));
        }
        let common_options = CommonOptions::new();
        Self {
            config,
            common_options,
        }
    }

    /// Creates a new client.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// let client = Storage::builder().build().await?;
    /// # Ok(()) }
    /// ```
    pub async fn build(self) -> BuilderResult<Storage> {
        Storage::new(self).await
    }

    /// Sets the endpoint.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// let client = Storage::builder()
    ///     .with_endpoint("https://private.googleapis.com")
    ///     .build()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
        self.config.endpoint = Some(v.into());
        self
    }

    /// Configures the authentication credentials.
    ///
    /// Google Cloud Storage requires authentication for most buckets. Use this
    /// method to change the credentials used by the client. More information
    /// about valid credentials types can be found in the [google-cloud-auth]
    /// crate documentation.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// use google_cloud_auth::credentials::mds;
    /// let client = Storage::builder()
    ///     .with_credentials(
    ///         mds::Builder::default()
    ///             .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
    ///             .build()?)
    ///     .build()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    ///
    /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
    pub fn with_credentials<V: Into<Credentials>>(mut self, v: V) -> Self {
        self.config.cred = Some(v.into());
        self
    }

    /// Configure the retry policy.
    ///
    /// The client libraries can automatically retry operations that fail. The
    /// retry policy controls what errors are considered retryable, sets limits
    /// on the number of attempts or the time trying to make attempts.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
    /// let client = Storage::builder()
    ///     .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
    ///     .build()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.config.retry_policy = Some(v.into().into());
        self
    }

    /// Configure the retry backoff policy.
    ///
    /// The client libraries can automatically retry operations that fail. The
    /// backoff policy controls how long to wait in between retry attempts.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
    /// use std::time::Duration;
    /// let policy = ExponentialBackoff::default();
    /// let client = Storage::builder()
    ///     .with_backoff_policy(policy)
    ///     .build()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.config.backoff_policy = Some(v.into().into());
        self
    }

    /// Configure the retry throttler.
    ///
    /// Advanced applications may want to configure a retry throttler to
    /// [Address Cascading Failures] and when [Handling Overload] conditions.
    /// The client libraries throttle their retry loop, using a policy to
    /// control the throttling algorithm. Use this method to fine tune or
    /// customize the default retry throttler.
    ///
    /// [Handling Overload]: https://sre.google/sre-book/handling-overload/
    /// [Address Cascading Failures]: https://sre.google/sre-book/addressing-cascading-failures/
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// use google_cloud_gax::retry_throttler::AdaptiveThrottler;
    /// let client = Storage::builder()
    ///     .with_retry_throttler(AdaptiveThrottler::default())
    ///     .build()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.config.retry_throttler = v.into().into();
        self
    }

    /// Sets the payload size threshold to switch from single-shot to resumable uploads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// let client = Storage::builder()
    ///     .with_resumable_upload_threshold(0_usize) // Forces a resumable upload.
    ///     .build()
    ///     .await?;
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// The client library can write objects using [single-shot] or [resumable]
    /// uploads. For small objects, single-shot uploads offer better
    /// performance, as they require a single HTTP transfer. For larger objects,
    /// the additional request latency is not significant, and resumable uploads
    /// offer better recovery on errors.
    ///
    /// The library automatically selects resumable uploads when the payload is
    /// equal to or larger than this option. For smaller writes the client
    /// library uses single-shot uploads.
    ///
    /// The exact threshold depends on where the application is deployed and
    /// destination bucket location with respect to where the application is
    /// running. The library defaults should work well in most cases, but some
    /// applications may benefit from fine-tuning.
    ///
    /// [single-shot]: https://cloud.google.com/storage/docs/uploading-objects
    /// [resumable]: https://cloud.google.com/storage/docs/resumable-uploads
    pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
        self.common_options.resumable_upload_threshold = v.into();
        self
    }

    /// Changes the buffer size for some resumable uploads.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// let client = Storage::builder()
    ///     .with_resumable_upload_buffer_size(32 * 1024 * 1024_usize)
    ///     .build()
    ///     .await?;
    /// let response = client
    ///     .write_object("projects/_/buckets/my-bucket", "my-object", "hello world")
    ///     .send_buffered()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// When performing [resumable uploads] from sources without [Seek] the
    /// client library needs to buffer data in memory until it is persisted by
    /// the service. Otherwise the data would be lost if the upload is
    /// interrupted. Applications may want to tune this buffer size:
    ///
    /// - Use smaller buffer sizes to support more concurrent writes in the
    ///   same application.
    /// - Use larger buffer sizes for better throughput. Sending many small
    ///   buffers stalls the writer until the client receives a successful
    ///   response from the service.
    ///
    /// Keep in mind that there are diminishing returns on using larger buffers.
    ///
    /// [resumable uploads]: https://cloud.google.com/storage/docs/resumable-uploads
    /// [Seek]: crate::streaming_source::Seek
    pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
        self.common_options.resumable_upload_buffer_size = v.into();
        self
    }

    /// Configure the resume policy for object reads.
    ///
    /// The Cloud Storage client library can automatically resume a read request
    /// that is interrupted by a transient error. Applications may want to
    /// limit the number of read attempts, or may wish to expand the type
    /// of errors treated as retryable.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
    /// let client = Storage::builder()
    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
    ///     .build()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
    where
        V: ReadResumePolicy + 'static,
    {
        self.common_options.read_resume_policy = Arc::new(v);
        self
    }

    /// Configure the number of subchannels used by the client.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// // By default the client uses `count` subchannels.
    /// let count = std::thread::available_parallelism()?.get();
    /// let client = Storage::builder()
    ///     .with_grpc_subchannel_count(std::cmp::max(1, count / 2))
    ///     .build()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    ///
    /// gRPC-based clients may exhibit high latency if many requests need to be
    /// demuxed over a single HTTP/2 connection (often called a *subchannel* in gRPC).
    /// Consider using more subchannels if your application makes many
    /// concurrent requests. Consider using fewer subchannels if your
    /// application needs the file descriptors for other purposes.
    ///
    /// Keep in mind that Google Cloud limits the number of concurrent RPCs in
    /// a single connection to about 100.
    pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
        self.config.grpc_subchannel_count = Some(v);
        self
    }

    /// Enables observability signals for the client.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # async fn sample() -> anyhow::Result<()> {
    /// let client = Storage::builder()
    ///     .with_tracing()
    ///     .build()
    ///     .await?;
    /// // For observing traces and logs, you must also enable a tracing subscriber in your `main` function,
    /// // for example:
    /// //   tracing_subscriber::fmt::init();
    /// // For observing metrics, you must also install an OpenTelemetry meter provider in your `main` function,
    /// // for example:
    /// //   opentelemetry::global::set_meter_provider(provider.clone());
    /// # Ok(()) }
    /// ```
    ///
    /// <div class="warning">
    ///
    /// Observability signals at any level may contain sensitive data such as resource names (bucket
    /// and object names), full URLs, and error messages.
    ///
    /// Before configuring subscribers or exporters for traces and logs, review the contents of the
    /// spans and consult the [tracing] framework documentation to set up filters and formatters to
    /// prevent leaking sensitive information, depending on your intended use case.
    ///
    /// [OpenTelemetry Semantic Conventions]: https://opentelemetry.io/docs/concepts/semantic-conventions/
    /// [tracing]: https://docs.rs/tracing/latest/tracing/
    ///
    /// </div>
    ///
    /// The libraries are instrumented to generate the following signals:
    ///
    /// 1. `INFO` spans for each logical client request. Typically a single method call in the client
    ///    struct gets such a span.
    /// 1. A histogram metric measuring the elapsed time for each logical client request.
    /// 1. `WARN` logs for each logical client request that fails.
    /// 1. `INFO` spans for each low-level RPC attempt. Typically a single method in the client
    ///    struct gets one such span, but there may be more if the library had to retry the RPC.
    /// 1. `DEBUG` logs for each low-level attempt that fails.
    ///
    /// These spans and logs follow [OpenTelemetry Semantic Conventions] with additional Google
    /// Cloud attributes. Both the spans and logs should be suitable for production
    /// monitoring.
    ///
    /// The libraries also have `DEBUG` spans for each request, these include the full request body,
    /// and the full response body for successful requests, and the full error message, with
    /// details, for failed requests. Consider the contents of these requests and responses before
    /// enabling them in production environments, as the request or responses may include sensitive
    /// data. These `DEBUG` spans use the `google_cloud_storage::tracing` as their target and the
    /// method name as the span name. You can use the name and/or target to set up your filters.
    ///
    /// # More information
    ///
    /// The [Enable logging] guide shows you how to initialize a subscriber to
    /// log events to the console.
    ///
    /// [Enable logging]: https://docs.cloud.google.com/rust/enable-logging
    pub fn with_tracing(mut self) -> Self {
        self.config.tracing = true;
        self
    }

    /// Builds default credentials when the application did not supply its own.
    pub(crate) fn apply_default_credentials(&mut self) -> BuilderResult<()> {
        if self.config.cred.is_some() {
            return Ok(());
        };
        let default = CredentialsBuilder::default()
            .build()
            .map_err(BuilderError::cred)?;
        self.config.cred = Some(default);
        Ok(())
    }

    /// Falls back to the global default endpoint when none was configured.
    pub(crate) fn apply_default_endpoint(&mut self) -> BuilderResult<()> {
        let _ = self
            .config
            .endpoint
            .get_or_insert_with(|| super::DEFAULT_HOST.to_string());
        Ok(())
    }

    // Breaks the builder into its parts, with defaults applied.
    pub(crate) fn into_parts(
        mut self,
    ) -> google_cloud_gax::client_builder::Result<(ClientConfig, RequestOptions)> {
        self.apply_default_credentials()?;
        self.apply_default_endpoint()?;
        let request_options =
            RequestOptions::new_with_client_config(&self.config, self.common_options);
        Ok((self.config, request_options))
    }
}
737
/// The set of characters that are percent encoded.
///
/// This set is defined at <https://cloud.google.com/storage/docs/request-endpoints#encoding>:
///
/// Encode the following characters when they appear in either the object name
/// or query string of a request URL:
/// !, #, $, &, ', (, ), *, +, ,, /, :, ;, =, ?, @, [, ], and space characters.
pub(crate) const ENCODED_CHARS: percent_encoding::AsciiSet = percent_encoding::CONTROLS
    .add(b'!')
    .add(b'#')
    .add(b'$')
    .add(b'&')
    .add(b'\'')
    .add(b'(')
    .add(b')')
    .add(b'*')
    .add(b'+')
    .add(b',')
    .add(b'/')
    .add(b':')
    .add(b';')
    .add(b'=')
    .add(b'?')
    .add(b'@')
    .add(b'[')
    .add(b']')
    .add(b' ');
765
766/// Percent encode a string.
767///
768/// To ensure compatibility certain characters need to be encoded when they appear
769/// in either the object name or query string of a request URL.
770pub(crate) fn enc(value: &str) -> String {
771 percent_encoding::utf8_percent_encode(value, &ENCODED_CHARS).to_string()
772}
773
774pub(crate) fn apply_customer_supplied_encryption_headers(
775 builder: HttpRequestBuilder,
776 common_object_request_params: &Option<crate::model::CommonObjectRequestParams>,
777) -> HttpRequestBuilder {
778 common_object_request_params.iter().fold(builder, |b, v| {
779 b.header(
780 "x-goog-encryption-algorithm",
781 v.encryption_algorithm.clone(),
782 )
783 .header(
784 "x-goog-encryption-key",
785 BASE64_STANDARD.encode(v.encryption_key_bytes.clone()),
786 )
787 .header(
788 "x-goog-encryption-key-sha256",
789 BASE64_STANDARD.encode(v.encryption_key_sha256_bytes.clone()),
790 )
791 })
792}
793
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use google_cloud_gax::retry_result::RetryResult;
    use google_cloud_gax::retry_state::RetryState;
    use std::{sync::Arc, time::Duration};

    // `ClientBuilder::new()` must install retry and backoff policies, and a
    // subchannel count of at least one.
    #[test]
    fn default_settings() {
        let builder = ClientBuilder::new().with_credentials(Anonymous::new().build());
        let config = builder.config;
        assert!(config.retry_policy.is_some(), "{config:?}");
        assert!(config.backoff_policy.is_some(), "{config:?}");
        {
            assert!(
                config.grpc_subchannel_count.is_some_and(|v| v >= 1),
                "{config:?}"
            );
        }
    }

    // `with_grpc_subchannel_count()` must override the computed default.
    #[test]
    fn subchannel_count() {
        let builder = ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .with_grpc_subchannel_count(42);
        let config = builder.config;
        assert!(
            config.grpc_subchannel_count.is_some_and(|v| v == 42),
            "{config:?}"
        );
    }

    // Returns a builder suitable for unit tests: anonymous credentials, a
    // fixed endpoint, and millisecond-scale backoff so tests do not sleep.
    pub(crate) fn test_builder() -> ClientBuilder {
        ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .with_endpoint("http://private.googleapis.com")
            .with_backoff_policy(
                google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder::new()
                    .with_initial_delay(Duration::from_millis(1))
                    .with_maximum_delay(Duration::from_millis(2))
                    .build()
                    .expect("hard coded policy should build correctly"),
            )
    }

    /// This is used by the request builder tests.
    pub(crate) async fn test_inner_client(builder: ClientBuilder) -> Arc<StorageInner> {
        let inner = StorageInner::from_parts(builder)
            .await
            .expect("creating an test inner client succeeds");
        Arc::new(inner)
    }

    // Mock implementations of the retry/backoff/resume traits, used by tests
    // elsewhere in the crate.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryThrottler {}

        impl google_cloud_gax::retry_throttler::RetryThrottler for RetryThrottler {
            fn throttle_retry_attempt(&self) -> bool;
            fn on_retry_failure(&mut self, flow: &RetryResult);
            fn on_success(&mut self);
        }
    }

    mockall::mock! {
        #[derive(Debug)]
        pub RetryPolicy {}

        impl google_cloud_gax::retry_policy::RetryPolicy for RetryPolicy {
            fn on_error(&self, state: &RetryState, error: google_cloud_gax::error::Error) -> RetryResult;
        }
    }

    mockall::mock! {
        #[derive(Debug)]
        pub BackoffPolicy {}

        impl google_cloud_gax::backoff_policy::BackoffPolicy for BackoffPolicy {
            fn on_failure(&self, state: &RetryState) -> std::time::Duration;
        }
    }

    mockall::mock! {
        #[derive(Debug)]
        pub ReadResumePolicy {}

        impl crate::read_resume_policy::ReadResumePolicy for ReadResumePolicy {
            fn on_error(&self, query: &crate::read_resume_policy::ResumeQuery, error: google_cloud_gax::error::Error) -> crate::read_resume_policy::ResumeResult;
        }
    }
}
886}