Skip to main content

google_cloud_pubsub/publisher/
client_builder.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::base_publisher::BasePublisher;
16use gaxi::options::ClientConfig;
17use google_cloud_gax::backoff_policy::BackoffPolicyArg;
18use google_cloud_gax::client_builder::Result as BuilderResult;
19use google_cloud_gax::retry_policy::RetryPolicyArg;
20use google_cloud_gax::retry_throttler::RetryThrottlerArg;
21
22/// A builder for [`BasePublisher`].
23///
24/// # Example
25/// ```
26/// # use google_cloud_pubsub::client::BasePublisher;
27/// # async fn sample() -> anyhow::Result<()> {
28/// let builder = BasePublisher::builder();
29/// let client = builder
30///     .with_endpoint("https://pubsub.googleapis.com")
31///     .build()
32///     .await?;
33/// # Ok(()) }
34/// ```
35#[derive(Clone, Debug)]
36pub struct BasePublisherBuilder {
37    pub(super) config: ClientConfig,
38}
39
40impl BasePublisherBuilder {
41    pub(super) fn new() -> Self {
42        let mut config = ClientConfig::default();
43        config.backoff_policy = Some(std::sync::Arc::new(
44            super::backoff_policy::default_backoff_policy(),
45        ));
46        config.retry_policy = Some(std::sync::Arc::new(
47            super::retry_policy::default_retry_policy(),
48        ));
49        Self { config }
50    }
51
52    /// Creates a new client.
53    ///
54    /// # Example
55    /// ```
56    /// # use google_cloud_pubsub::client::BasePublisher;
57    /// # async fn sample() -> anyhow::Result<()> {
58    /// let client = BasePublisher::builder().build().await?;
59    /// # Ok(()) }
60    /// ```
61    pub async fn build(self) -> BuilderResult<BasePublisher> {
62        BasePublisher::new(self).await
63    }
64
65    /// Sets the endpoint.
66    ///
67    /// # Example
68    /// ```
69    /// # use google_cloud_pubsub::client::BasePublisher;
70    /// # async fn sample() -> anyhow::Result<()> {
71    /// let client = BasePublisher::builder()
72    ///     .with_endpoint("https://private.googleapis.com")
73    ///     .build()
74    ///     .await?;
75    /// # Ok(()) }
76    /// ```
77    pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
78        self.config.endpoint = Some(v.into());
79        self
80    }
81
82    /// Enables tracing.
83    ///
84    /// The client libraries can be dynamically instrumented with the Tokio
85    /// [tracing] framework. Setting this flag enables this instrumentation.
86    ///
87    /// # Example
88    /// ```
89    /// # use google_cloud_pubsub::client::BasePublisher;
90    /// # async fn sample() -> anyhow::Result<()> {
91    /// let client = BasePublisher::builder()
92    ///     .with_tracing()
93    ///     .build()
94    ///     .await?;
95    /// # Ok(()) }
96    /// ```
97    ///
98    /// [tracing]: https://docs.rs/tracing/latest/tracing/
99    pub fn with_tracing(mut self) -> Self {
100        self.config.tracing = true;
101        self
102    }
103
104    /// Configure the authentication credentials.
105    ///
106    /// Most Google Cloud services require authentication, though some services
107    /// allow for anonymous access, and some services provide emulators where
108    /// no authentication is required. More information about valid credentials
109    /// types can be found in the [google-cloud-auth] crate documentation.
110    ///
111    /// # Example
112    /// ```
113    /// # use google_cloud_pubsub::client::BasePublisher;
114    /// # async fn sample() -> anyhow::Result<()> {
115    /// use google_cloud_auth::credentials::mds;
116    /// let client = BasePublisher::builder()
117    ///     .with_credentials(
118    ///         mds::Builder::default()
119    ///             .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
120    ///             .build()?)
121    ///     .build()
122    ///     .await?;
123    /// # Ok(()) }
124    /// ```
125    ///
126    /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
127    pub fn with_credentials<V: Into<gaxi::options::Credentials>>(mut self, v: V) -> Self {
128        self.config.cred = Some(v.into());
129        self
130    }
131
132    /// Configure the retry policy.
133    ///
134    /// The client libraries can automatically retry operations that fail. The
135    /// retry policy controls what errors are considered retryable, sets limits
136    /// on the number of attempts or the time trying to make attempts.
137    ///
138    /// # Example
139    /// ```
140    /// # use google_cloud_pubsub::client::BasePublisher;
141    /// # async fn sample() -> anyhow::Result<()> {
142    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
143    /// use google_cloud_pubsub::retry_policy::RetryableErrors;
144    /// let client = BasePublisher::builder()
145    ///     .with_retry_policy(RetryableErrors.with_attempt_limit(3))
146    ///     .build()
147    ///     .await?;
148    /// # Ok(()) };
149    /// ```
150    pub fn with_retry_policy<V: Into<RetryPolicyArg>>(mut self, v: V) -> Self {
151        self.config.retry_policy = Some(v.into().into());
152        self
153    }
154
155    /// Configure the retry backoff policy.
156    ///
157    /// The client libraries can automatically retry operations that fail. The
158    /// backoff policy controls how long to wait in between retry attempts.
159    ///
160    /// # Example
161    /// ```
162    /// # use google_cloud_pubsub::client::BasePublisher;
163    /// # async fn sample() -> anyhow::Result<()> {
164    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
165    /// use std::time::Duration;
166    /// let policy = ExponentialBackoff::default();
167    /// let client = BasePublisher::builder()
168    ///     .with_backoff_policy(policy)
169    ///     .build()
170    ///     .await?;
171    /// # Ok(()) }
172    /// ```
173    pub fn with_backoff_policy<V: Into<BackoffPolicyArg>>(mut self, v: V) -> Self {
174        self.config.backoff_policy = Some(v.into().into());
175        self
176    }
177
178    /// Configure the retry throttler.
179    ///
180    /// Advanced applications may want to configure a retry throttler to
181    /// [Address Cascading Failures] and when [Handling Overload] conditions.
182    /// The client libraries throttle their retry loop, using a policy to
183    /// control the throttling algorithm. Use this method to fine tune or
184    /// customize the default retry throttler.
185    ///
186    /// [Handling Overload]: https://sre.google/sre-book/handling-overload/
187    /// [Address Cascading Failures]: https://sre.google/sre-book/addressing-cascading-failures/
188    ///
189    /// # Example
190    /// ```
191    /// # use google_cloud_pubsub::client::BasePublisher;
192    /// # async fn sample() -> anyhow::Result<()> {
193    /// use google_cloud_gax::retry_throttler::AdaptiveThrottler;
194    /// let client = BasePublisher::builder()
195    ///     .with_retry_throttler(AdaptiveThrottler::default())
196    ///     .build()
197    ///     .await?;
198    /// # Ok(()) };
199    /// ```
200    pub fn with_retry_throttler<V: Into<RetryThrottlerArg>>(mut self, v: V) -> Self {
201        self.config.retry_throttler = v.into().into();
202        self
203    }
204
205    /// Configure the number of gRPC subchannels.
206    ///
207    /// # Example
208    /// ```
209    /// # use google_cloud_pubsub::client::BasePublisher;
210    /// # async fn sample() -> anyhow::Result<()> {
211    /// let client = BasePublisher::builder()
212    ///     .with_grpc_subchannel_count(4)
213    ///     .build()
214    ///     .await?;
215    /// # Ok(()) }
216    /// ```
217    ///
218    /// gRPC-based clients may exhibit high latency if many requests need to be
219    /// demuxed over a single HTTP/2 connection (often called a *subchannel* in
220    /// gRPC).
221    ///
222    /// Consider using more subchannels if your application makes many
223    /// concurrent requests. Consider using fewer subchannels if your
224    /// application needs the file descriptors for other purposes.
225    pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
226        self.config.grpc_subchannel_count = Some(v);
227        self
228    }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;

    // Checks the settings carried by a freshly created builder.
    #[test]
    fn defaults() -> anyhow::Result<()> {
        let builder = BasePublisherBuilder::new();
        let config = &builder.config;
        let rendered = format!("{config:?}");

        assert!(config.endpoint.is_none(), "{builder:?}");
        assert!(config.cred.is_none(), "{builder:?}");
        assert!(!config.tracing);
        assert!(rendered.contains("AdaptiveThrottler"), "{rendered}");
        assert!(config.backoff_policy.is_some(), "{builder:?}");
        // The backoff parameters are checked through the `Debug` output.
        for expected in ["initial_delay: 100ms", "maximum_delay: 60s", "scaling: 4.0"] {
            assert!(rendered.contains(expected), "actual: {rendered}");
        }
        assert!(config.retry_policy.is_some(), "{builder:?}");
        assert!(config.grpc_subchannel_count.is_none(), "{builder:?}");

        Ok(())
    }

    // Checks that each `with_*` setter stores its value in the configuration.
    #[tokio::test]
    async fn setters() -> anyhow::Result<()> {
        use google_cloud_gax::exponential_backoff::ExponentialBackoff;
        use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let builder = BasePublisherBuilder::new()
            .with_endpoint("test-endpoint.com")
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
            .with_backoff_policy(ExponentialBackoff::default())
            .with_retry_throttler(CircuitBreaker::default())
            .with_grpc_subchannel_count(16);
        let config = &builder.config;
        let rendered = format!("{config:?}");

        assert_eq!(config.endpoint.as_deref(), Some("test-endpoint.com"));
        assert!(config.cred.is_some(), "{builder:?}");
        assert!(config.tracing);
        assert!(rendered.contains("CircuitBreaker"), "{rendered}");
        assert!(config.retry_policy.is_some(), "{builder:?}");
        assert!(config.backoff_policy.is_some(), "{builder:?}");
        assert_eq!(config.grpc_subchannel_count, Some(16));

        Ok(())
    }
}