google_cloud_pubsub/publisher/client_builder.rs
1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::base_publisher::BasePublisher;
16use gaxi::options::ClientConfig;
17use google_cloud_gax::backoff_policy::BackoffPolicyArg;
18use google_cloud_gax::client_builder::Result as BuilderResult;
19use google_cloud_gax::retry_policy::RetryPolicyArg;
20use google_cloud_gax::retry_throttler::RetryThrottlerArg;
21
22/// A builder for [`BasePublisher`].
23///
24/// # Example
25/// ```
26/// # use google_cloud_pubsub::client::BasePublisher;
27/// # async fn sample() -> anyhow::Result<()> {
28/// let builder = BasePublisher::builder();
29/// let client = builder
30/// .with_endpoint("https://pubsub.googleapis.com")
31/// .build()
32/// .await?;
33/// # Ok(()) }
34/// ```
35#[derive(Clone, Debug)]
36pub struct BasePublisherBuilder {
37 pub(super) config: ClientConfig,
38}
39
40impl BasePublisherBuilder {
41 pub(super) fn new() -> Self {
42 let mut config = ClientConfig::default();
43 config.backoff_policy = Some(std::sync::Arc::new(
44 super::backoff_policy::default_backoff_policy(),
45 ));
46 config.retry_policy = Some(std::sync::Arc::new(
47 super::retry_policy::default_retry_policy(),
48 ));
49 Self { config }
50 }
51
52 /// Creates a new client.
53 ///
54 /// # Example
55 /// ```
56 /// # use google_cloud_pubsub::client::BasePublisher;
57 /// # async fn sample() -> anyhow::Result<()> {
58 /// let client = BasePublisher::builder().build().await?;
59 /// # Ok(()) }
60 /// ```
61 pub async fn build(self) -> BuilderResult<BasePublisher> {
62 BasePublisher::new(self).await
63 }
64
65 /// Sets the endpoint.
66 ///
67 /// # Example
68 /// ```
69 /// # use google_cloud_pubsub::client::BasePublisher;
70 /// # async fn sample() -> anyhow::Result<()> {
71 /// let client = BasePublisher::builder()
72 /// .with_endpoint("https://private.googleapis.com")
73 /// .build()
74 /// .await?;
75 /// # Ok(()) }
76 /// ```
77 pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
78 self.config.endpoint = Some(v.into());
79 self
80 }
81
82 /// Enables tracing.
83 ///
84 /// The client libraries can be dynamically instrumented with the Tokio
85 /// [tracing] framework. Setting this flag enables this instrumentation.
86 ///
87 /// # Example
88 /// ```
89 /// # use google_cloud_pubsub::client::BasePublisher;
90 /// # async fn sample() -> anyhow::Result<()> {
91 /// let client = BasePublisher::builder()
92 /// .with_tracing()
93 /// .build()
94 /// .await?;
95 /// # Ok(()) }
96 /// ```
97 ///
98 /// [tracing]: https://docs.rs/tracing/latest/tracing/
99 pub fn with_tracing(mut self) -> Self {
100 self.config.tracing = true;
101 self
102 }
103
104 /// Configure the authentication credentials.
105 ///
106 /// Most Google Cloud services require authentication, though some services
107 /// allow for anonymous access, and some services provide emulators where
108 /// no authentication is required. More information about valid credentials
109 /// types can be found in the [google-cloud-auth] crate documentation.
110 ///
111 /// # Example
112 /// ```
113 /// # use google_cloud_pubsub::client::BasePublisher;
114 /// # async fn sample() -> anyhow::Result<()> {
115 /// use google_cloud_auth::credentials::mds;
116 /// let client = BasePublisher::builder()
117 /// .with_credentials(
118 /// mds::Builder::default()
119 /// .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
120 /// .build()?)
121 /// .build()
122 /// .await?;
123 /// # Ok(()) }
124 /// ```
125 ///
126 /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
127 pub fn with_credentials<V: Into<gaxi::options::Credentials>>(mut self, v: V) -> Self {
128 self.config.cred = Some(v.into());
129 self
130 }
131
132 /// Configure the retry policy.
133 ///
134 /// The client libraries can automatically retry operations that fail. The
135 /// retry policy controls what errors are considered retryable, sets limits
136 /// on the number of attempts or the time trying to make attempts.
137 ///
138 /// # Example
139 /// ```
140 /// # use google_cloud_pubsub::client::BasePublisher;
141 /// # async fn sample() -> anyhow::Result<()> {
142 /// use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
143 /// let client = BasePublisher::builder()
144 /// .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
145 /// .build()
146 /// .await?;
147 /// # Ok(()) };
148 /// ```
149 pub fn with_retry_policy<V: Into<RetryPolicyArg>>(mut self, v: V) -> Self {
150 self.config.retry_policy = Some(v.into().into());
151 self
152 }
153
154 /// Configure the retry backoff policy.
155 ///
156 /// The client libraries can automatically retry operations that fail. The
157 /// backoff policy controls how long to wait in between retry attempts.
158 ///
159 /// # Example
160 /// ```
161 /// # use google_cloud_pubsub::client::BasePublisher;
162 /// # async fn sample() -> anyhow::Result<()> {
163 /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
164 /// use std::time::Duration;
165 /// let policy = ExponentialBackoff::default();
166 /// let client = BasePublisher::builder()
167 /// .with_backoff_policy(policy)
168 /// .build()
169 /// .await?;
170 /// # Ok(()) }
171 /// ```
172 pub fn with_backoff_policy<V: Into<BackoffPolicyArg>>(mut self, v: V) -> Self {
173 self.config.backoff_policy = Some(v.into().into());
174 self
175 }
176
177 /// Configure the retry throttler.
178 ///
179 /// Advanced applications may want to configure a retry throttler to
180 /// [Address Cascading Failures] and when [Handling Overload] conditions.
181 /// The client libraries throttle their retry loop, using a policy to
182 /// control the throttling algorithm. Use this method to fine tune or
183 /// customize the default retry throttler.
184 ///
185 /// [Handling Overload]: https://sre.google/sre-book/handling-overload/
186 /// [Address Cascading Failures]: https://sre.google/sre-book/addressing-cascading-failures/
187 ///
188 /// # Example
189 /// ```
190 /// # use google_cloud_pubsub::client::BasePublisher;
191 /// # async fn sample() -> anyhow::Result<()> {
192 /// use google_cloud_gax::retry_throttler::AdaptiveThrottler;
193 /// let client = BasePublisher::builder()
194 /// .with_retry_throttler(AdaptiveThrottler::default())
195 /// .build()
196 /// .await?;
197 /// # Ok(()) };
198 /// ```
199 pub fn with_retry_throttler<V: Into<RetryThrottlerArg>>(mut self, v: V) -> Self {
200 self.config.retry_throttler = v.into().into();
201 self
202 }
203
204 /// Configure the number of gRPC subchannels.
205 ///
206 /// # Example
207 /// ```
208 /// # use google_cloud_pubsub::client::BasePublisher;
209 /// # async fn sample() -> anyhow::Result<()> {
210 /// let client = BasePublisher::builder()
211 /// .with_grpc_subchannel_count(4)
212 /// .build()
213 /// .await?;
214 /// # Ok(()) }
215 /// ```
216 ///
217 /// gRPC-based clients may exhibit high latency if many requests need to be
218 /// demuxed over a single HTTP/2 connection (often called a *subchannel* in
219 /// gRPC).
220 ///
221 /// Consider using more subchannels if your application makes many
222 /// concurrent requests. Consider using fewer subchannels if your
223 /// application needs the file descriptors for other purposes.
224 pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
225 self.config.grpc_subchannel_count = Some(v);
226 self
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
234
235 #[test]
236 fn defaults() -> anyhow::Result<()> {
237 let builder = BasePublisherBuilder::new();
238 assert!(builder.config.endpoint.is_none(), "{builder:?}");
239 assert!(builder.config.cred.is_none(), "{builder:?}");
240 assert!(!builder.config.tracing);
241 assert!(
242 format!("{:?}", &builder.config).contains("AdaptiveThrottler"),
243 "{:?}",
244 builder.config
245 );
246 assert!(builder.config.backoff_policy.is_some(), "{builder:?}");
247 let debug_str = format!("{:?}", &builder.config);
248 assert!(
249 debug_str.contains("initial_delay: 100ms"),
250 "actual: {debug_str}"
251 );
252 assert!(
253 debug_str.contains("maximum_delay: 60s"),
254 "actual: {debug_str}"
255 );
256 assert!(debug_str.contains("scaling: 4.0"), "actual: {debug_str}");
257 assert!(builder.config.retry_policy.is_some(), "{builder:?}");
258 assert!(
259 builder.config.grpc_subchannel_count.is_none(),
260 "{builder:?}"
261 );
262
263 Ok(())
264 }
265
266 #[tokio::test]
267 async fn setters() -> anyhow::Result<()> {
268 use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
269 let builder = BasePublisherBuilder::new()
270 .with_endpoint("test-endpoint.com")
271 .with_credentials(Anonymous::new().build())
272 .with_tracing()
273 .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
274 .with_backoff_policy(
275 google_cloud_gax::exponential_backoff::ExponentialBackoff::default(),
276 )
277 .with_retry_throttler(google_cloud_gax::retry_throttler::CircuitBreaker::default())
278 .with_grpc_subchannel_count(16);
279 assert_eq!(
280 builder.config.endpoint,
281 Some("test-endpoint.com".to_string())
282 );
283 assert!(builder.config.cred.is_some(), "{builder:?}");
284 assert!(builder.config.tracing);
285 assert!(
286 format!("{:?}", &builder.config).contains("CircuitBreaker"),
287 "{:?}",
288 builder.config
289 );
290 assert!(builder.config.retry_policy.is_some(), "{builder:?}");
291 assert!(builder.config.backoff_policy.is_some(), "{builder:?}");
292 assert_eq!(builder.config.grpc_subchannel_count, Some(16));
293
294 Ok(())
295 }
296}