google_cloud_pubsub/publisher/client_builder.rs
1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::base_publisher::BasePublisher;
16use gaxi::options::ClientConfig;
17use google_cloud_gax::backoff_policy::BackoffPolicyArg;
18use google_cloud_gax::client_builder::Result as BuilderResult;
19use google_cloud_gax::retry_policy::RetryPolicyArg;
20use google_cloud_gax::retry_throttler::RetryThrottlerArg;
21
22/// A builder for [`BasePublisher`].
23///
24/// # Example
25/// ```
26/// # use google_cloud_pubsub::client::BasePublisher;
27/// # async fn sample() -> anyhow::Result<()> {
28/// let builder = BasePublisher::builder();
29/// let client = builder
30/// .with_endpoint("https://pubsub.googleapis.com")
31/// .build()
32/// .await?;
33/// # Ok(()) }
34/// ```
35#[derive(Clone, Debug)]
36pub struct BasePublisherBuilder {
37 pub(super) config: ClientConfig,
38}
39
40impl BasePublisherBuilder {
41 pub(super) fn new() -> Self {
42 let mut config = ClientConfig::default();
43 config.backoff_policy = Some(std::sync::Arc::new(
44 super::backoff_policy::default_backoff_policy(),
45 ));
46 config.retry_policy = Some(std::sync::Arc::new(
47 super::retry_policy::default_retry_policy(),
48 ));
49 Self { config }
50 }
51
52 /// Creates a new client.
53 ///
54 /// # Example
55 /// ```
56 /// # use google_cloud_pubsub::client::BasePublisher;
57 /// # async fn sample() -> anyhow::Result<()> {
58 /// let client = BasePublisher::builder().build().await?;
59 /// # Ok(()) }
60 /// ```
61 pub async fn build(self) -> BuilderResult<BasePublisher> {
62 BasePublisher::new(self).await
63 }
64
65 /// Sets the endpoint.
66 ///
67 /// # Example
68 /// ```
69 /// # use google_cloud_pubsub::client::BasePublisher;
70 /// # async fn sample() -> anyhow::Result<()> {
71 /// let client = BasePublisher::builder()
72 /// .with_endpoint("https://private.googleapis.com")
73 /// .build()
74 /// .await?;
75 /// # Ok(()) }
76 /// ```
77 pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
78 self.config.endpoint = Some(v.into());
79 self
80 }
81
82 /// Enables tracing.
83 ///
84 /// The client libraries can be dynamically instrumented with the Tokio
85 /// [tracing] framework. Setting this flag enables this instrumentation.
86 ///
87 /// # Example
88 /// ```
89 /// # use google_cloud_pubsub::client::BasePublisher;
90 /// # async fn sample() -> anyhow::Result<()> {
91 /// let client = BasePublisher::builder()
92 /// .with_tracing()
93 /// .build()
94 /// .await?;
95 /// # Ok(()) }
96 /// ```
97 ///
98 /// [tracing]: https://docs.rs/tracing/latest/tracing/
99 pub fn with_tracing(mut self) -> Self {
100 self.config.tracing = true;
101 self
102 }
103
104 /// Configure the authentication credentials.
105 ///
106 /// Most Google Cloud services require authentication, though some services
107 /// allow for anonymous access, and some services provide emulators where
108 /// no authentication is required. More information about valid credentials
109 /// types can be found in the [google-cloud-auth] crate documentation.
110 ///
111 /// # Example
112 /// ```
113 /// # use google_cloud_pubsub::client::BasePublisher;
114 /// # async fn sample() -> anyhow::Result<()> {
115 /// use google_cloud_auth::credentials::mds;
116 /// let client = BasePublisher::builder()
117 /// .with_credentials(
118 /// mds::Builder::default()
119 /// .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
120 /// .build()?)
121 /// .build()
122 /// .await?;
123 /// # Ok(()) }
124 /// ```
125 ///
126 /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
127 pub fn with_credentials<V: Into<gaxi::options::Credentials>>(mut self, v: V) -> Self {
128 self.config.cred = Some(v.into());
129 self
130 }
131
132 /// Configure the retry policy.
133 ///
134 /// The client libraries can automatically retry operations that fail. The
135 /// retry policy controls what errors are considered retryable, sets limits
136 /// on the number of attempts or the time trying to make attempts.
137 ///
138 /// # Example
139 /// ```
140 /// # use google_cloud_pubsub::client::BasePublisher;
141 /// # async fn sample() -> anyhow::Result<()> {
142 /// use google_cloud_gax::retry_policy::RetryPolicyExt;
143 /// use google_cloud_pubsub::retry_policy::RetryableErrors;
144 /// let client = BasePublisher::builder()
145 /// .with_retry_policy(RetryableErrors.with_attempt_limit(3))
146 /// .build()
147 /// .await?;
148 /// # Ok(()) };
149 /// ```
150 pub fn with_retry_policy<V: Into<RetryPolicyArg>>(mut self, v: V) -> Self {
151 self.config.retry_policy = Some(v.into().into());
152 self
153 }
154
155 /// Configure the retry backoff policy.
156 ///
157 /// The client libraries can automatically retry operations that fail. The
158 /// backoff policy controls how long to wait in between retry attempts.
159 ///
160 /// # Example
161 /// ```
162 /// # use google_cloud_pubsub::client::BasePublisher;
163 /// # async fn sample() -> anyhow::Result<()> {
164 /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
165 /// use std::time::Duration;
166 /// let policy = ExponentialBackoff::default();
167 /// let client = BasePublisher::builder()
168 /// .with_backoff_policy(policy)
169 /// .build()
170 /// .await?;
171 /// # Ok(()) }
172 /// ```
173 pub fn with_backoff_policy<V: Into<BackoffPolicyArg>>(mut self, v: V) -> Self {
174 self.config.backoff_policy = Some(v.into().into());
175 self
176 }
177
178 /// Configure the retry throttler.
179 ///
180 /// Advanced applications may want to configure a retry throttler to
181 /// [Address Cascading Failures] and when [Handling Overload] conditions.
182 /// The client libraries throttle their retry loop, using a policy to
183 /// control the throttling algorithm. Use this method to fine tune or
184 /// customize the default retry throttler.
185 ///
186 /// [Handling Overload]: https://sre.google/sre-book/handling-overload/
187 /// [Address Cascading Failures]: https://sre.google/sre-book/addressing-cascading-failures/
188 ///
189 /// # Example
190 /// ```
191 /// # use google_cloud_pubsub::client::BasePublisher;
192 /// # async fn sample() -> anyhow::Result<()> {
193 /// use google_cloud_gax::retry_throttler::AdaptiveThrottler;
194 /// let client = BasePublisher::builder()
195 /// .with_retry_throttler(AdaptiveThrottler::default())
196 /// .build()
197 /// .await?;
198 /// # Ok(()) };
199 /// ```
200 pub fn with_retry_throttler<V: Into<RetryThrottlerArg>>(mut self, v: V) -> Self {
201 self.config.retry_throttler = v.into().into();
202 self
203 }
204
205 /// Configure the number of gRPC subchannels.
206 ///
207 /// # Example
208 /// ```
209 /// # use google_cloud_pubsub::client::BasePublisher;
210 /// # async fn sample() -> anyhow::Result<()> {
211 /// let client = BasePublisher::builder()
212 /// .with_grpc_subchannel_count(4)
213 /// .build()
214 /// .await?;
215 /// # Ok(()) }
216 /// ```
217 ///
218 /// gRPC-based clients may exhibit high latency if many requests need to be
219 /// demuxed over a single HTTP/2 connection (often called a *subchannel* in
220 /// gRPC).
221 ///
222 /// Consider using more subchannels if your application makes many
223 /// concurrent requests. Consider using fewer subchannels if your
224 /// application needs the file descriptors for other purposes.
225 pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
226 self.config.grpc_subchannel_count = Some(v);
227 self
228 }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;

    /// A freshly created builder leaves the user-facing options unset and
    /// carries the publisher's default throttler, backoff and retry policies.
    #[test]
    fn defaults() -> anyhow::Result<()> {
        let builder = BasePublisherBuilder::new();
        // The policies are inspected through the `Debug` output of the config.
        let debug_str = format!("{:?}", &builder.config);

        assert!(builder.config.endpoint.is_none(), "{builder:?}");
        assert!(builder.config.cred.is_none(), "{builder:?}");
        assert!(!builder.config.tracing);
        assert!(
            debug_str.contains("AdaptiveThrottler"),
            "{:?}",
            builder.config
        );

        // Default backoff policy: present, with the expected parameters.
        assert!(builder.config.backoff_policy.is_some(), "{builder:?}");
        for expected in ["initial_delay: 100ms", "maximum_delay: 60s", "scaling: 4.0"] {
            assert!(debug_str.contains(expected), "actual: {debug_str}");
        }

        assert!(builder.config.retry_policy.is_some(), "{builder:?}");
        assert!(
            builder.config.grpc_subchannel_count.is_none(),
            "{builder:?}"
        );

        Ok(())
    }

    /// Each `with_*` setter is reflected in the stored configuration.
    #[tokio::test]
    async fn setters() -> anyhow::Result<()> {
        use google_cloud_gax::exponential_backoff::ExponentialBackoff;
        use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let builder = BasePublisherBuilder::new()
            .with_endpoint("test-endpoint.com")
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
            .with_backoff_policy(ExponentialBackoff::default())
            .with_retry_throttler(CircuitBreaker::default())
            .with_grpc_subchannel_count(16);
        let debug_str = format!("{:?}", &builder.config);

        assert_eq!(
            builder.config.endpoint,
            Some(String::from("test-endpoint.com"))
        );
        assert!(builder.config.cred.is_some(), "{builder:?}");
        assert!(builder.config.tracing);
        assert!(
            debug_str.contains("CircuitBreaker"),
            "{:?}",
            builder.config
        );
        // NOTE(review): `new()` already installs both policies, so these two
        // checks would pass even if the setters were no-ops. Consider
        // asserting on the policy contents instead.
        assert!(builder.config.retry_policy.is_some(), "{builder:?}");
        assert!(builder.config.backoff_policy.is_some(), "{builder:?}");
        assert_eq!(builder.config.grpc_subchannel_count, Some(16));

        Ok(())
    }
}