google_cloud_pubsub/publisher/builder.rs
1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use super::constants::*;
15use super::options::BatchingOptions;
16use crate::client::Publisher;
17use crate::generated::gapic_dataplane::client::Publisher as GapicPublisher;
18use crate::publisher::actor::Dispatcher;
19use crate::publisher::base_publisher::BasePublisher;
20use google_cloud_gax::{
21 backoff_policy::BackoffPolicyArg, retry_policy::RetryPolicyArg,
22 retry_throttler::RetryThrottlerArg,
23};
24use std::time::Duration;
25
26pub use super::base_publisher::BasePublisherBuilder;
27
/// A builder for a [`Publisher`].
///
/// Collects the topic name, the batching thresholds, and the client
/// configuration, and creates the publisher when `build()` is called.
#[derive(Clone, Debug)]
pub struct PublisherBuilder {
    // Topic name handed to the base publisher when the publisher is built.
    topic: String,
    // Batching thresholds collected by the `set_*_threshold` methods and
    // forwarded one by one in `build()`.
    batching_options: BatchingOptions,
    // Receives every `with_*` client option; `build()` awaits its own
    // `build()` to obtain the base publisher.
    base_builder: BasePublisherBuilder,
}
35
36impl PublisherBuilder {
37 pub(crate) fn new(topic: String) -> Self {
38 Self {
39 topic,
40 batching_options: BatchingOptions::default(),
41 base_builder: BasePublisher::builder(),
42 }
43 }
44
45 /// Creates a new [`Publisher`] from the builder's configuration.
46 pub async fn build(self) -> crate::ClientBuilderResult<Publisher> {
47 let base_publisher = self.base_builder.build().await?;
48 let publisher = base_publisher
49 .publisher(&self.topic)
50 .set_message_count_threshold(self.batching_options.message_count_threshold)
51 .set_byte_threshold(self.batching_options.byte_threshold)
52 .set_delay_threshold(self.batching_options.delay_threshold)
53 .build();
54 Ok(publisher)
55 }
56
57 /// Sets the message count threshold for batching.
58 ///
59 /// The publisher will send a batch of messages when the number of messages
60 /// in the batch reaches this threshold.
61 ///
62 /// # Example
63 ///
64 /// ```
65 /// # use google_cloud_pubsub::client::Publisher;
66 /// # async fn sample() -> anyhow::Result<()> {
67 /// let publisher = Publisher::builder("projects/my-project/topics/my-topic")
68 /// .set_message_count_threshold(100)
69 /// .build()
70 /// .await?;
71 /// # Ok(()) }
72 /// ```
73 pub fn set_message_count_threshold(mut self, threshold: u32) -> PublisherBuilder {
74 self.batching_options = self.batching_options.set_message_count_threshold(threshold);
75 self
76 }
77
78 /// Sets the byte threshold for batching.
79 ///
80 /// The publisher will send a batch of messages when the total size of the
81 /// messages in the batch reaches this threshold.
82 ///
83 /// # Example
84 ///
85 /// ```
86 /// # use google_cloud_pubsub::client::Publisher;
87 /// # async fn sample() -> anyhow::Result<()> {
88 /// let publisher = Publisher::builder("projects/my-project/topics/my-topic")
89 /// .set_byte_threshold(1024) // 1 KiB
90 /// .build()
91 /// .await?;
92 /// # Ok(()) }
93 /// ```
94 pub fn set_byte_threshold(mut self, threshold: u32) -> PublisherBuilder {
95 self.batching_options = self.batching_options.set_byte_threshold(threshold);
96 self
97 }
98
99 /// Sets the delay threshold for batching.
100 ///
101 /// The publisher will wait a maximum of this amount of time before
102 /// sending a batch of messages.
103 ///
104 /// # Example
105 ///
106 /// ```
107 /// # use google_cloud_pubsub::client::Publisher;
108 /// # use std::time::Duration;
109 /// # async fn sample() -> anyhow::Result<()> {
110 /// let publisher = Publisher::builder("projects/my-project/topics/my-topic")
111 /// .set_delay_threshold(Duration::from_millis(50))
112 /// .build()
113 /// .await?;
114 /// # Ok(()) }
115 /// ```
116 pub fn set_delay_threshold(mut self, threshold: Duration) -> PublisherBuilder {
117 self.batching_options = self.batching_options.set_delay_threshold(threshold);
118 self
119 }
120
121 /// Sets the endpoint.
122 ///
123 /// ```
124 /// # use google_cloud_pubsub::client::Publisher;
125 /// # async fn sample() -> anyhow::Result<()> {
126 /// let client = Publisher::builder("projects/my-project/topics/my-topic")
127 /// .with_endpoint("http://private.googleapis.com")
128 /// .build().await?;
129 /// # Ok(()) }
130 /// ```
131 pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
132 self.base_builder = self.base_builder.with_endpoint(v);
133 self
134 }
135
136 /// Enables tracing.
137 ///
138 /// The client libraries can be dynamically instrumented with the Tokio
139 /// [tracing] framework. Setting this flag enables this instrumentation.
140 ///
141 /// ```
142 /// # use google_cloud_pubsub::client::Publisher;
143 /// # async fn sample() -> anyhow::Result<()> {
144 /// let client = Publisher::builder("projects/my-project/topics/my-topic")
145 /// .with_tracing()
146 /// .build().await?;
147 /// # Ok(()) }
148 /// ```
149 ///
150 /// [tracing]: https://docs.rs/tracing/latest/tracing/
151 pub fn with_tracing(mut self) -> Self {
152 self.base_builder = self.base_builder.with_tracing();
153 self
154 }
155
156 /// Configure the authentication credentials.
157 ///
158 /// Most Google Cloud services require authentication, though some services
159 /// allow for anonymous access, and some services provide emulators where
160 /// no authentication is required. More information about valid credentials
161 /// types can be found in the [google-cloud-auth] crate documentation.
162 ///
163 /// ```
164 /// # use google_cloud_pubsub::client::Publisher;
165 /// # async fn sample() -> anyhow::Result<()> {
166 /// use google_cloud_auth::credentials::mds;
167 /// let client = Publisher::builder("projects/my-project/topics/my-topic")
168 /// .with_credentials(
169 /// mds::Builder::default()
170 /// .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
171 /// .build()?)
172 /// .build().await?;
173 /// # Ok(()) }
174 /// ```
175 ///
176 /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
177 pub fn with_credentials<T: Into<gaxi::options::Credentials>>(mut self, v: T) -> Self {
178 self.base_builder = self.base_builder.with_credentials(v);
179 self
180 }
181
182 /// Configure the retry policy.
183 ///
184 /// The client libraries can automatically retry operations that fail. The
185 /// retry policy controls what errors are considered retryable, sets limits
186 /// on the number of attempts or the time trying to make attempts.
187 ///
188 /// ```
189 /// # use google_cloud_pubsub::client::Publisher;
190 /// # async fn sample() -> anyhow::Result<()> {
191 /// use google_cloud_gax::retry_policy::RetryPolicyExt;
192 /// use google_cloud_pubsub::retry_policy::RetryableErrors;
193 /// let client = Publisher::builder("projects/my-project/topics/my-topic")
194 /// .with_retry_policy(RetryableErrors.with_attempt_limit(3))
195 /// .build().await?;
196 /// # Ok(()) };
197 /// ```
198 pub fn with_retry_policy<V: Into<RetryPolicyArg>>(mut self, v: V) -> Self {
199 self.base_builder = self.base_builder.with_retry_policy(v);
200 self
201 }
202
203 /// Configure the retry backoff policy.
204 ///
205 /// The client libraries can automatically retry operations that fail. The
206 /// backoff policy controls how long to wait in between retry attempts.
207 ///
208 /// ```
209 /// # use google_cloud_pubsub::client::Publisher;
210 /// # async fn sample() -> anyhow::Result<()> {
211 /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
212 /// use std::time::Duration;
213 /// let policy = ExponentialBackoff::default();
214 /// let client = Publisher::builder("projects/my-project/topics/my-topic")
215 /// .with_backoff_policy(policy)
216 /// .build()
217 /// .await?;
218 /// # Ok(()) }
219 /// ```
220 pub fn with_backoff_policy<V: Into<BackoffPolicyArg>>(mut self, v: V) -> Self {
221 self.base_builder = self.base_builder.with_backoff_policy(v);
222 self
223 }
224
225 /// Configure the retry throttler.
226 ///
227 /// Advanced applications may want to configure a retry throttler to
228 /// [Address Cascading Failures] and when [Handling Overload] conditions.
229 /// The client libraries throttle their retry loop, using a policy to
230 /// control the throttling algorithm. Use this method to fine tune or
231 /// customize the default retry throttler.
232 ///
233 /// [Handling Overload]: https://sre.google/sre-book/handling-overload/
234 /// [Address Cascading Failures]: https://sre.google/sre-book/addressing-cascading-failures/
235 ///
236 /// ```
237 /// # use google_cloud_pubsub::client::Publisher;
238 /// # async fn sample() -> anyhow::Result<()> {
239 /// use google_cloud_gax::retry_throttler::AdaptiveThrottler;
240 /// let client = Publisher::builder("projects/my-project/topics/my-topic")
241 /// .with_retry_throttler(AdaptiveThrottler::default())
242 /// .build().await?;
243 /// # Ok(()) };
244 /// ```
245 pub fn with_retry_throttler<V: Into<RetryThrottlerArg>>(mut self, v: V) -> Self {
246 self.base_builder = self.base_builder.with_retry_throttler(v);
247 self
248 }
249
250 /// Configure the number of gRPC subchannels.
251 ///
252 /// # Example
253 /// ```
254 /// # use google_cloud_pubsub::client::Publisher;
255 /// # async fn sample() -> anyhow::Result<()> {
256 /// let client = Publisher::builder("projects/my-project/topics/my-topic")
257 /// .with_grpc_subchannel_count(4)
258 /// .build()
259 /// .await?;
260 /// # Ok(()) }
261 /// ```
262 ///
263 /// gRPC-based clients may exhibit high latency if many requests need to be
264 /// demuxed over a single HTTP/2 connection (often called a *subchannel* in
265 /// gRPC).
266 ///
267 /// Consider using more subchannels if your application makes many
268 /// concurrent requests. Consider using fewer subchannels if your
269 /// application needs the file descriptors for other purposes.
270 pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
271 self.base_builder = self.base_builder.with_grpc_subchannel_count(v);
272 self
273 }
274}
275
/// Creates [`Publisher`]s with a preconfigured client.
///
/// The builder carries the client, the topic name, and the batching
/// thresholds to apply when the publisher is built.
///
/// # Example
///
/// ```
/// # async fn sample() -> anyhow::Result<()> {
/// # use google_cloud_pubsub::*;
/// # use google_cloud_pubsub::client::BasePublisher;
/// let client: BasePublisher = BasePublisher::builder().build().await?;
/// let publisher = client.publisher("projects/my-project/topics/topic").build();
/// # Ok(()) }
/// ```
#[derive(Clone, Debug)]
pub struct PublisherPartialBuilder {
    // Generated client handed to the background dispatcher at build time.
    pub(crate) inner: GapicPublisher,
    // Topic name handed to the background dispatcher at build time.
    topic: String,
    // Requested batching thresholds; they are clamped to the supported
    // limits when the publisher is built.
    batching_options: BatchingOptions,
}
294
295impl PublisherPartialBuilder {
296 /// Creates a new Pub/Sub publisher builder for topic.
297 pub(crate) fn new(client: GapicPublisher, topic: String) -> Self {
298 Self {
299 inner: client,
300 topic,
301 batching_options: BatchingOptions::default(),
302 }
303 }
304
305 /// Sets the message count threshold for batching.
306 ///
307 /// The publisher will send a batch of messages when the number of messages
308 /// in the batch reaches this threshold.
309 ///
310 /// # Example
311 ///
312 /// ```
313 /// # use google_cloud_pubsub::client::BasePublisher;
314 /// # async fn sample() -> anyhow::Result<()> {
315 /// # let client: BasePublisher = BasePublisher::builder().build().await?;
316 /// let publisher = client
317 /// .publisher("projects/my-project/topics/my-topic")
318 /// .set_message_count_threshold(100)
319 /// .build();
320 /// # Ok(()) }
321 /// ```
322 pub fn set_message_count_threshold(mut self, threshold: u32) -> PublisherPartialBuilder {
323 self.batching_options = self.batching_options.set_message_count_threshold(threshold);
324 self
325 }
326
327 /// Sets the byte threshold for batching.
328 ///
329 /// The publisher will send a batch of messages when the total size of the
330 /// messages in the batch reaches this threshold.
331 ///
332 /// # Example
333 ///
334 /// ```
335 /// # use google_cloud_pubsub::client::BasePublisher;
336 /// # async fn sample() -> anyhow::Result<()> {
337 /// # let client: BasePublisher = BasePublisher::builder().build().await?;
338 /// let publisher = client
339 /// .publisher("projects/my-project/topics/my-topic")
340 /// .set_byte_threshold(1024) // 1 KiB
341 /// .build();
342 /// # Ok(()) }
343 /// ```
344 pub fn set_byte_threshold(mut self, threshold: u32) -> PublisherPartialBuilder {
345 self.batching_options = self.batching_options.set_byte_threshold(threshold);
346 self
347 }
348
349 /// Sets the delay threshold for batching.
350 ///
351 /// The publisher will wait a maximum of this amount of time before
352 /// sending a batch of messages.
353 ///
354 /// # Example
355 ///
356 /// ```
357 /// # use google_cloud_pubsub::client::BasePublisher;
358 /// # use std::time::Duration;
359 /// # async fn sample() -> anyhow::Result<()> {
360 /// # let client: BasePublisher = BasePublisher::builder().build().await?;
361 /// let publisher = client
362 /// .publisher("projects/my-project/topics/my-topic")
363 /// .set_delay_threshold(Duration::from_millis(50))
364 /// .build();
365 /// # Ok(()) }
366 /// ```
367 pub fn set_delay_threshold(mut self, threshold: Duration) -> PublisherPartialBuilder {
368 self.batching_options = self.batching_options.set_delay_threshold(threshold);
369 self
370 }
371
372 /// Creates a new [`Publisher`] from the builder's configuration.
373 pub fn build(self) -> Publisher {
374 self.build_return_handle().0
375 }
376
377 // This method starts a background task to manage the batching
378 // and sending of messages. The returned `Publisher` is a
379 // lightweight handle for sending messages to that background task
380 // over a channel.
381 //
382 // This also returns a handle to the background task, which can be
383 // used in testing to manage the task's lifecycle.
384 pub(crate) fn build_return_handle(self) -> (Publisher, tokio::task::JoinHandle<()>) {
385 // Enforce limits by clamping the user-provided options.
386 let batching_options = BatchingOptions::new()
387 .set_delay_threshold(
388 self.batching_options
389 .delay_threshold
390 .clamp(Duration::ZERO, MAX_DELAY),
391 )
392 .set_message_count_threshold(
393 self.batching_options
394 .message_count_threshold
395 .clamp(0, MAX_MESSAGES),
396 )
397 .set_byte_threshold(self.batching_options.byte_threshold.clamp(0, MAX_BYTES));
398
399 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
400 // Create the Dispatcher that will run in the background.
401 // We don't need to keep track of a handle to the dispatcher.
402 // Dropping the Publisher will drop the only sender to the channel.
403 // This will cause the dispatcher to gracefully exit.
404 let dispatcher = Dispatcher::new(self.topic, self.inner, batching_options.clone(), rx);
405 let handle = tokio::spawn(dispatcher.run());
406
407 (
408 Publisher {
409 batching_options,
410 tx,
411 },
412 handle,
413 )
414 }
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    // Topic name shared by every test in this module.
    const TOPIC: &str = "projects/my-project/topics/my-topic";

    // A threshold set on either kind of builder must reach the publisher.
    #[tokio::test]
    async fn builder() -> anyhow::Result<()> {
        // Through a shared `BasePublisher`.
        let base: BasePublisher = BasePublisher::builder().build().await?;
        let via_base = base
            .publisher(TOPIC)
            .set_message_count_threshold(1_u32)
            .build();
        assert_eq!(via_base.batching_options.message_count_threshold, 1_u32);

        // Through the standalone `Publisher` builder.
        let direct = Publisher::builder(TOPIC)
            .set_message_count_threshold(1_u32)
            .build()
            .await?;
        assert_eq!(direct.batching_options.message_count_threshold, 1_u32);
        Ok(())
    }

    // Both construction paths must end up with the default batching options.
    #[tokio::test]
    async fn default_batching() -> anyhow::Result<()> {
        let from_base = BasePublisher::builder()
            .build()
            .await?
            .publisher(TOPIC)
            .build();
        let from_builder = Publisher::builder(TOPIC).build().await?;
        let defaults = BatchingOptions::default();

        for publisher in [from_base, from_builder] {
            let actual = &publisher.batching_options;
            assert_eq!(
                actual.message_count_threshold,
                defaults.message_count_threshold
            );
            assert_eq!(actual.byte_threshold, defaults.byte_threshold);
            assert_eq!(actual.delay_threshold, defaults.delay_threshold);
        }
        Ok(())
    }

    // Compares the client settings that can be compared directly; the
    // optional ones are only checked for presence.
    fn assert_eq_client_config(
        actual: &gaxi::options::ClientConfig,
        expected: &gaxi::options::ClientConfig,
    ) {
        assert_eq!(actual.endpoint, expected.endpoint);
        assert_eq!(actual.cred.is_some(), expected.cred.is_some());
        assert_eq!(actual.tracing, expected.tracing);
        assert_eq!(
            actual.retry_policy.is_some(),
            expected.retry_policy.is_some()
        );
        assert_eq!(
            actual.backoff_policy.is_some(),
            expected.backoff_policy.is_some()
        );
        assert_eq!(
            actual.grpc_subchannel_count,
            expected.grpc_subchannel_count
        );
    }

    // An untouched `PublisherBuilder` carries the same client configuration
    // as an untouched `BasePublisher` builder.
    #[test]
    fn publisher_has_default_client_config() {
        let publisher_builder = Publisher::builder(TOPIC);
        let base_builder = BasePublisher::builder();

        assert_eq_client_config(
            &publisher_builder.base_builder.config,
            &base_builder.config,
        );
    }

    // Every `with_*` option set on `PublisherBuilder` must land in the
    // wrapped builder exactly as if it had been set there directly.
    #[tokio::test]
    async fn publisher_builder_sets_client_config() -> anyhow::Result<()> {
        use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
        use google_cloud_gax::exponential_backoff::ExponentialBackoff;
        use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let throttler = CircuitBreaker::default();
        let publisher_builder = Publisher::builder(TOPIC)
            .with_endpoint("test-endpoint.com")
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
            .with_backoff_policy(ExponentialBackoff::default())
            .with_retry_throttler(throttler.clone())
            .with_grpc_subchannel_count(16);
        let base_builder = BasePublisher::builder()
            .with_endpoint("test-endpoint.com")
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
            .with_backoff_policy(ExponentialBackoff::default())
            .with_retry_throttler(throttler)
            .with_grpc_subchannel_count(16);

        assert_eq_client_config(
            &publisher_builder.base_builder.config,
            &base_builder.config,
        );

        Ok(())
    }
}