Skip to main content

google_cloud_pubsub/publisher/
client_builder.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::base_publisher::BasePublisher;
16use gaxi::options::ClientConfig;
17use google_cloud_gax::backoff_policy::BackoffPolicyArg;
18use google_cloud_gax::client_builder::Result as BuilderResult;
19use google_cloud_gax::retry_policy::RetryPolicyArg;
20use google_cloud_gax::retry_throttler::RetryThrottlerArg;
21
22/// A builder for [`BasePublisher`].
23///
24/// # Example
25/// ```
26/// # use google_cloud_pubsub::client::BasePublisher;
27/// # async fn sample() -> anyhow::Result<()> {
28/// let builder = BasePublisher::builder();
29/// let client = builder
30///     .with_endpoint("https://pubsub.googleapis.com")
31///     .build()
32///     .await?;
33/// # Ok(()) }
34/// ```
35#[derive(Clone, Debug)]
36pub struct BasePublisherBuilder {
37    pub(super) config: ClientConfig,
38}
39
40impl BasePublisherBuilder {
41    pub(super) fn new() -> Self {
42        let mut config = ClientConfig::default();
43        config.backoff_policy = Some(std::sync::Arc::new(
44            super::backoff_policy::default_backoff_policy(),
45        ));
46        config.retry_policy = Some(std::sync::Arc::new(
47            super::retry_policy::default_retry_policy(),
48        ));
49        Self { config }
50    }
51
52    /// Creates a new client.
53    ///
54    /// # Example
55    /// ```
56    /// # use google_cloud_pubsub::client::BasePublisher;
57    /// # async fn sample() -> anyhow::Result<()> {
58    /// let client = BasePublisher::builder().build().await?;
59    /// # Ok(()) }
60    /// ```
61    pub async fn build(self) -> BuilderResult<BasePublisher> {
62        BasePublisher::new(self).await
63    }
64
65    /// Sets the endpoint.
66    ///
67    /// # Example
68    /// ```
69    /// # use google_cloud_pubsub::client::BasePublisher;
70    /// # async fn sample() -> anyhow::Result<()> {
71    /// let client = BasePublisher::builder()
72    ///     .with_endpoint("https://private.googleapis.com")
73    ///     .build()
74    ///     .await?;
75    /// # Ok(()) }
76    /// ```
77    pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
78        self.config.endpoint = Some(v.into());
79        self
80    }
81
82    /// Enables tracing.
83    ///
84    /// The client libraries can be dynamically instrumented with the Tokio
85    /// [tracing] framework. Setting this flag enables this instrumentation.
86    ///
87    /// # Example
88    /// ```
89    /// # use google_cloud_pubsub::client::BasePublisher;
90    /// # async fn sample() -> anyhow::Result<()> {
91    /// let client = BasePublisher::builder()
92    ///     .with_tracing()
93    ///     .build()
94    ///     .await?;
95    /// # Ok(()) }
96    /// ```
97    ///
98    /// [tracing]: https://docs.rs/tracing/latest/tracing/
99    pub fn with_tracing(mut self) -> Self {
100        self.config.tracing = true;
101        self
102    }
103
104    /// Configure the authentication credentials.
105    ///
106    /// Most Google Cloud services require authentication, though some services
107    /// allow for anonymous access, and some services provide emulators where
108    /// no authentication is required. More information about valid credentials
109    /// types can be found in the [google-cloud-auth] crate documentation.
110    ///
111    /// # Example
112    /// ```
113    /// # use google_cloud_pubsub::client::BasePublisher;
114    /// # async fn sample() -> anyhow::Result<()> {
115    /// use google_cloud_auth::credentials::mds;
116    /// let client = BasePublisher::builder()
117    ///     .with_credentials(
118    ///         mds::Builder::default()
119    ///             .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
120    ///             .build()?)
121    ///     .build()
122    ///     .await?;
123    /// # Ok(()) }
124    /// ```
125    ///
126    /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
127    pub fn with_credentials<V: Into<gaxi::options::Credentials>>(mut self, v: V) -> Self {
128        self.config.cred = Some(v.into());
129        self
130    }
131
132    /// Configure the retry policy.
133    ///
134    /// The client libraries can automatically retry operations that fail. The
135    /// retry policy controls what errors are considered retryable, sets limits
136    /// on the number of attempts or the time trying to make attempts.
137    ///
138    /// # Example
139    /// ```
140    /// # use google_cloud_pubsub::client::BasePublisher;
141    /// # async fn sample() -> anyhow::Result<()> {
142    /// use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
143    /// let client = BasePublisher::builder()
144    ///     .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
145    ///     .build()
146    ///     .await?;
147    /// # Ok(()) };
148    /// ```
149    pub fn with_retry_policy<V: Into<RetryPolicyArg>>(mut self, v: V) -> Self {
150        self.config.retry_policy = Some(v.into().into());
151        self
152    }
153
154    /// Configure the retry backoff policy.
155    ///
156    /// The client libraries can automatically retry operations that fail. The
157    /// backoff policy controls how long to wait in between retry attempts.
158    ///
159    /// # Example
160    /// ```
161    /// # use google_cloud_pubsub::client::BasePublisher;
162    /// # async fn sample() -> anyhow::Result<()> {
163    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
164    /// use std::time::Duration;
165    /// let policy = ExponentialBackoff::default();
166    /// let client = BasePublisher::builder()
167    ///     .with_backoff_policy(policy)
168    ///     .build()
169    ///     .await?;
170    /// # Ok(()) }
171    /// ```
172    pub fn with_backoff_policy<V: Into<BackoffPolicyArg>>(mut self, v: V) -> Self {
173        self.config.backoff_policy = Some(v.into().into());
174        self
175    }
176
177    /// Configure the retry throttler.
178    ///
179    /// Advanced applications may want to configure a retry throttler to
180    /// [Address Cascading Failures] and when [Handling Overload] conditions.
181    /// The client libraries throttle their retry loop, using a policy to
182    /// control the throttling algorithm. Use this method to fine tune or
183    /// customize the default retry throttler.
184    ///
185    /// [Handling Overload]: https://sre.google/sre-book/handling-overload/
186    /// [Address Cascading Failures]: https://sre.google/sre-book/addressing-cascading-failures/
187    ///
188    /// # Example
189    /// ```
190    /// # use google_cloud_pubsub::client::BasePublisher;
191    /// # async fn sample() -> anyhow::Result<()> {
192    /// use google_cloud_gax::retry_throttler::AdaptiveThrottler;
193    /// let client = BasePublisher::builder()
194    ///     .with_retry_throttler(AdaptiveThrottler::default())
195    ///     .build()
196    ///     .await?;
197    /// # Ok(()) };
198    /// ```
199    pub fn with_retry_throttler<V: Into<RetryThrottlerArg>>(mut self, v: V) -> Self {
200        self.config.retry_throttler = v.into().into();
201        self
202    }
203
204    /// Configure the number of gRPC subchannels.
205    ///
206    /// # Example
207    /// ```
208    /// # use google_cloud_pubsub::client::BasePublisher;
209    /// # async fn sample() -> anyhow::Result<()> {
210    /// let client = BasePublisher::builder()
211    ///     .with_grpc_subchannel_count(4)
212    ///     .build()
213    ///     .await?;
214    /// # Ok(()) }
215    /// ```
216    ///
217    /// gRPC-based clients may exhibit high latency if many requests need to be
218    /// demuxed over a single HTTP/2 connection (often called a *subchannel* in
219    /// gRPC).
220    ///
221    /// Consider using more subchannels if your application makes many
222    /// concurrent requests. Consider using fewer subchannels if your
223    /// application needs the file descriptors for other purposes.
224    pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
225        self.config.grpc_subchannel_count = Some(v);
226        self
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;

    /// A freshly created builder leaves the endpoint, credentials, tracing and
    /// subchannel count unset, while carrying the default throttler, backoff
    /// policy and retry policy.
    #[test]
    fn defaults() -> anyhow::Result<()> {
        let builder = BasePublisherBuilder::new();
        let config = &builder.config;
        // Render the configuration once; several checks below inspect the
        // `Debug` output because the policies are stored as trait objects.
        let rendered = format!("{config:?}");

        assert!(config.endpoint.is_none(), "{builder:?}");
        assert!(config.cred.is_none(), "{builder:?}");
        assert!(!config.tracing);
        assert!(rendered.contains("AdaptiveThrottler"), "{config:?}");
        assert!(config.backoff_policy.is_some(), "{builder:?}");
        for expected in ["initial_delay: 100ms", "maximum_delay: 60s", "scaling: 4.0"] {
            assert!(rendered.contains(expected), "actual: {rendered}");
        }
        assert!(config.retry_policy.is_some(), "{builder:?}");
        assert!(config.grpc_subchannel_count.is_none(), "{builder:?}");

        Ok(())
    }

    /// Every setter records its argument in the builder's configuration.
    #[tokio::test]
    async fn setters() -> anyhow::Result<()> {
        use google_cloud_gax::exponential_backoff::ExponentialBackoff;
        use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let builder = BasePublisherBuilder::new()
            .with_endpoint("test-endpoint.com")
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
            .with_backoff_policy(ExponentialBackoff::default())
            .with_retry_throttler(CircuitBreaker::default())
            .with_grpc_subchannel_count(16);
        let config = &builder.config;

        assert_eq!(config.endpoint.as_deref(), Some("test-endpoint.com"));
        assert!(config.cred.is_some(), "{builder:?}");
        assert!(config.tracing);
        assert!(
            format!("{config:?}").contains("CircuitBreaker"),
            "{config:?}"
        );
        assert!(config.retry_policy.is_some(), "{builder:?}");
        assert!(config.backoff_policy.is_some(), "{builder:?}");
        assert_eq!(config.grpc_subchannel_count, Some(16));

        Ok(())
    }
}