Skip to main content

google_cloud_pubsub/publisher/
builder.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use super::constants::*;
15use super::options::BatchingOptions;
16use crate::client::Publisher;
17use crate::generated::gapic_dataplane::client::Publisher as GapicPublisher;
18use crate::publisher::actor::Dispatcher;
19use crate::publisher::base_publisher::BasePublisher;
20use google_cloud_gax::{
21    backoff_policy::BackoffPolicyArg, retry_policy::RetryPolicyArg,
22    retry_throttler::RetryThrottlerArg,
23};
24use std::time::Duration;
25
26pub use super::base_publisher::BasePublisherBuilder;
27
28/// A builder for a [`Publisher`].
29#[derive(Clone, Debug)]
30pub struct PublisherBuilder {
31    topic: String,
32    batching_options: BatchingOptions,
33    base_builder: BasePublisherBuilder,
34}
35
36impl PublisherBuilder {
37    pub(crate) fn new(topic: String) -> Self {
38        Self {
39            topic,
40            batching_options: BatchingOptions::default(),
41            base_builder: BasePublisher::builder(),
42        }
43    }
44
45    /// Creates a new [`Publisher`] from the builder's configuration.
46    pub async fn build(self) -> crate::ClientBuilderResult<Publisher> {
47        let base_publisher = self.base_builder.build().await?;
48        let publisher = base_publisher
49            .publisher(&self.topic)
50            .set_message_count_threshold(self.batching_options.message_count_threshold)
51            .set_byte_threshold(self.batching_options.byte_threshold)
52            .set_delay_threshold(self.batching_options.delay_threshold)
53            .build();
54        Ok(publisher)
55    }
56
57    /// Sets the message count threshold for batching.
58    ///
59    /// The publisher will send a batch of messages when the number of messages
60    /// in the batch reaches this threshold.
61    ///
62    /// # Example
63    ///
64    /// ```
65    /// # use google_cloud_pubsub::client::Publisher;
66    /// # async fn sample() -> anyhow::Result<()> {
67    /// let publisher = Publisher::builder("projects/my-project/topics/my-topic")
68    ///     .set_message_count_threshold(100)
69    ///     .build()
70    ///     .await?;
71    /// # Ok(()) }
72    /// ```
73    pub fn set_message_count_threshold(mut self, threshold: u32) -> PublisherBuilder {
74        self.batching_options = self.batching_options.set_message_count_threshold(threshold);
75        self
76    }
77
78    /// Sets the byte threshold for batching.
79    ///
80    /// The publisher will send a batch of messages when the total size of the
81    /// messages in the batch reaches this threshold.
82    ///
83    /// # Example
84    ///
85    /// ```
86    /// # use google_cloud_pubsub::client::Publisher;
87    /// # async fn sample() -> anyhow::Result<()> {
88    /// let publisher = Publisher::builder("projects/my-project/topics/my-topic")
89    ///     .set_byte_threshold(1024) // 1 KiB
90    ///     .build()
91    ///     .await?;
92    /// # Ok(()) }
93    /// ```
94    pub fn set_byte_threshold(mut self, threshold: u32) -> PublisherBuilder {
95        self.batching_options = self.batching_options.set_byte_threshold(threshold);
96        self
97    }
98
99    /// Sets the delay threshold for batching.
100    ///
101    /// The publisher will wait a maximum of this amount of time before
102    /// sending a batch of messages.
103    ///
104    /// # Example
105    ///
106    /// ```
107    /// # use google_cloud_pubsub::client::Publisher;
108    /// # use std::time::Duration;
109    /// # async fn sample() -> anyhow::Result<()> {
110    /// let publisher = Publisher::builder("projects/my-project/topics/my-topic")
111    ///     .set_delay_threshold(Duration::from_millis(50))
112    ///     .build()
113    ///     .await?;
114    /// # Ok(()) }
115    /// ```
116    pub fn set_delay_threshold(mut self, threshold: Duration) -> PublisherBuilder {
117        self.batching_options = self.batching_options.set_delay_threshold(threshold);
118        self
119    }
120
121    /// Sets the endpoint.
122    ///
123    /// ```
124    /// # use google_cloud_pubsub::client::Publisher;
125    /// # async fn sample() -> anyhow::Result<()> {
126    /// let client = Publisher::builder("projects/my-project/topics/my-topic")
127    ///     .with_endpoint("http://private.googleapis.com")
128    ///     .build().await?;
129    /// # Ok(()) }
130    /// ```
131    pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
132        self.base_builder = self.base_builder.with_endpoint(v);
133        self
134    }
135
136    /// Enables tracing.
137    ///
138    /// The client libraries can be dynamically instrumented with the Tokio
139    /// [tracing] framework. Setting this flag enables this instrumentation.
140    ///
141    /// ```
142    /// # use google_cloud_pubsub::client::Publisher;
143    /// # async fn sample() -> anyhow::Result<()> {
144    /// let client = Publisher::builder("projects/my-project/topics/my-topic")
145    ///     .with_tracing()
146    ///     .build().await?;
147    /// # Ok(()) }
148    /// ```
149    ///
150    /// [tracing]: https://docs.rs/tracing/latest/tracing/
151    pub fn with_tracing(mut self) -> Self {
152        self.base_builder = self.base_builder.with_tracing();
153        self
154    }
155
156    /// Configure the authentication credentials.
157    ///
158    /// Most Google Cloud services require authentication, though some services
159    /// allow for anonymous access, and some services provide emulators where
160    /// no authentication is required. More information about valid credentials
161    /// types can be found in the [google-cloud-auth] crate documentation.
162    ///
163    /// ```
164    /// # use google_cloud_pubsub::client::Publisher;
165    /// # async fn sample() -> anyhow::Result<()> {
166    /// use google_cloud_auth::credentials::mds;
167    /// let client = Publisher::builder("projects/my-project/topics/my-topic")
168    ///     .with_credentials(
169    ///         mds::Builder::default()
170    ///             .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
171    ///             .build()?)
172    ///     .build().await?;
173    /// # Ok(()) }
174    /// ```
175    ///
176    /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
177    pub fn with_credentials<T: Into<gaxi::options::Credentials>>(mut self, v: T) -> Self {
178        self.base_builder = self.base_builder.with_credentials(v);
179        self
180    }
181
182    /// Configure the retry policy.
183    ///
184    /// The client libraries can automatically retry operations that fail. The
185    /// retry policy controls what errors are considered retryable, sets limits
186    /// on the number of attempts or the time trying to make attempts.
187    ///
188    /// ```
189    /// # use google_cloud_pubsub::client::Publisher;
190    /// # async fn sample() -> anyhow::Result<()> {
191    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
192    /// use google_cloud_pubsub::retry_policy::RetryableErrors;
193    /// let client = Publisher::builder("projects/my-project/topics/my-topic")
194    ///     .with_retry_policy(RetryableErrors.with_attempt_limit(3))
195    ///     .build().await?;
196    /// # Ok(()) };
197    /// ```
198    pub fn with_retry_policy<V: Into<RetryPolicyArg>>(mut self, v: V) -> Self {
199        self.base_builder = self.base_builder.with_retry_policy(v);
200        self
201    }
202
203    /// Configure the retry backoff policy.
204    ///
205    /// The client libraries can automatically retry operations that fail. The
206    /// backoff policy controls how long to wait in between retry attempts.
207    ///
208    /// ```
209    /// # use google_cloud_pubsub::client::Publisher;
210    /// # async fn sample() -> anyhow::Result<()> {
211    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
212    /// use std::time::Duration;
213    /// let policy = ExponentialBackoff::default();
214    /// let client = Publisher::builder("projects/my-project/topics/my-topic")
215    ///     .with_backoff_policy(policy)
216    ///     .build()
217    ///     .await?;
218    /// # Ok(()) }
219    /// ```
220    pub fn with_backoff_policy<V: Into<BackoffPolicyArg>>(mut self, v: V) -> Self {
221        self.base_builder = self.base_builder.with_backoff_policy(v);
222        self
223    }
224
225    /// Configure the retry throttler.
226    ///
227    /// Advanced applications may want to configure a retry throttler to
228    /// [Address Cascading Failures] and when [Handling Overload] conditions.
229    /// The client libraries throttle their retry loop, using a policy to
230    /// control the throttling algorithm. Use this method to fine tune or
231    /// customize the default retry throttler.
232    ///
233    /// [Handling Overload]: https://sre.google/sre-book/handling-overload/
234    /// [Address Cascading Failures]: https://sre.google/sre-book/addressing-cascading-failures/
235    ///
236    /// ```
237    /// # use google_cloud_pubsub::client::Publisher;
238    /// # async fn sample() -> anyhow::Result<()> {
239    /// use google_cloud_gax::retry_throttler::AdaptiveThrottler;
240    /// let client = Publisher::builder("projects/my-project/topics/my-topic")
241    ///     .with_retry_throttler(AdaptiveThrottler::default())
242    ///     .build().await?;
243    /// # Ok(()) };
244    /// ```
245    pub fn with_retry_throttler<V: Into<RetryThrottlerArg>>(mut self, v: V) -> Self {
246        self.base_builder = self.base_builder.with_retry_throttler(v);
247        self
248    }
249
250    /// Configure the number of gRPC subchannels.
251    ///
252    /// # Example
253    /// ```
254    /// # use google_cloud_pubsub::client::Publisher;
255    /// # async fn sample() -> anyhow::Result<()> {
256    /// let client = Publisher::builder("projects/my-project/topics/my-topic")
257    ///     .with_grpc_subchannel_count(4)
258    ///     .build()
259    ///     .await?;
260    /// # Ok(()) }
261    /// ```
262    ///
263    /// gRPC-based clients may exhibit high latency if many requests need to be
264    /// demuxed over a single HTTP/2 connection (often called a *subchannel* in
265    /// gRPC).
266    ///
267    /// Consider using more subchannels if your application makes many
268    /// concurrent requests. Consider using fewer subchannels if your
269    /// application needs the file descriptors for other purposes.
270    pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
271        self.base_builder = self.base_builder.with_grpc_subchannel_count(v);
272        self
273    }
274}
275
276/// Creates [`Publisher`]s with a preconfigured client.
277///
278/// # Example
279///
280/// ```
281/// # async fn sample() -> anyhow::Result<()> {
282/// # use google_cloud_pubsub::*;
283/// # use google_cloud_pubsub::client::BasePublisher;
284/// let client: BasePublisher = BasePublisher::builder().build().await?;
285/// let publisher = client.publisher("projects/my-project/topics/topic").build();
286/// # Ok(()) }
287/// ```
288#[derive(Clone, Debug)]
289pub struct PublisherPartialBuilder {
290    pub(crate) inner: GapicPublisher,
291    topic: String,
292    batching_options: BatchingOptions,
293}
294
295impl PublisherPartialBuilder {
296    /// Creates a new Pub/Sub publisher builder for topic.
297    pub(crate) fn new(client: GapicPublisher, topic: String) -> Self {
298        Self {
299            inner: client,
300            topic,
301            batching_options: BatchingOptions::default(),
302        }
303    }
304
305    /// Sets the message count threshold for batching.
306    ///
307    /// The publisher will send a batch of messages when the number of messages
308    /// in the batch reaches this threshold.
309    ///
310    /// # Example
311    ///
312    /// ```
313    /// # use google_cloud_pubsub::client::BasePublisher;
314    /// # async fn sample() -> anyhow::Result<()> {
315    /// # let client: BasePublisher = BasePublisher::builder().build().await?;
316    /// let publisher = client
317    ///     .publisher("projects/my-project/topics/my-topic")
318    ///     .set_message_count_threshold(100)
319    ///     .build();
320    /// # Ok(()) }
321    /// ```
322    pub fn set_message_count_threshold(mut self, threshold: u32) -> PublisherPartialBuilder {
323        self.batching_options = self.batching_options.set_message_count_threshold(threshold);
324        self
325    }
326
327    /// Sets the byte threshold for batching.
328    ///
329    /// The publisher will send a batch of messages when the total size of the
330    /// messages in the batch reaches this threshold.
331    ///
332    /// # Example
333    ///
334    /// ```
335    /// # use google_cloud_pubsub::client::BasePublisher;
336    /// # async fn sample() -> anyhow::Result<()> {
337    /// # let client: BasePublisher = BasePublisher::builder().build().await?;
338    /// let publisher = client
339    ///     .publisher("projects/my-project/topics/my-topic")
340    ///     .set_byte_threshold(1024) // 1 KiB
341    ///     .build();
342    /// # Ok(()) }
343    /// ```
344    pub fn set_byte_threshold(mut self, threshold: u32) -> PublisherPartialBuilder {
345        self.batching_options = self.batching_options.set_byte_threshold(threshold);
346        self
347    }
348
349    /// Sets the delay threshold for batching.
350    ///
351    /// The publisher will wait a maximum of this amount of time before
352    /// sending a batch of messages.
353    ///
354    /// # Example
355    ///
356    /// ```
357    /// # use google_cloud_pubsub::client::BasePublisher;
358    /// # use std::time::Duration;
359    /// # async fn sample() -> anyhow::Result<()> {
360    /// # let client: BasePublisher = BasePublisher::builder().build().await?;
361    /// let publisher = client
362    ///     .publisher("projects/my-project/topics/my-topic")
363    ///     .set_delay_threshold(Duration::from_millis(50))
364    ///     .build();
365    /// # Ok(()) }
366    /// ```
367    pub fn set_delay_threshold(mut self, threshold: Duration) -> PublisherPartialBuilder {
368        self.batching_options = self.batching_options.set_delay_threshold(threshold);
369        self
370    }
371
372    /// Creates a new [`Publisher`] from the builder's configuration.
373    pub fn build(self) -> Publisher {
374        self.build_return_handle().0
375    }
376
377    // This method starts a background task to manage the batching
378    // and sending of messages. The returned `Publisher` is a
379    // lightweight handle for sending messages to that background task
380    // over a channel.
381    //
382    // This also returns a handle to the background task, which can be
383    // used in testing to manage the task's lifecycle.
384    pub(crate) fn build_return_handle(self) -> (Publisher, tokio::task::JoinHandle<()>) {
385        // Enforce limits by clamping the user-provided options.
386        let batching_options = BatchingOptions::new()
387            .set_delay_threshold(
388                self.batching_options
389                    .delay_threshold
390                    .clamp(Duration::ZERO, MAX_DELAY),
391            )
392            .set_message_count_threshold(
393                self.batching_options
394                    .message_count_threshold
395                    .clamp(0, MAX_MESSAGES),
396            )
397            .set_byte_threshold(self.batching_options.byte_threshold.clamp(0, MAX_BYTES));
398
399        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
400        // Create the Dispatcher that will run in the background.
401        // We don't need to keep track of a handle to the dispatcher.
402        // Dropping the Publisher will drop the only sender to the channel.
403        // This will cause the dispatcher to gracefully exit.
404        let dispatcher = Dispatcher::new(self.topic, self.inner, batching_options.clone(), rx);
405        let handle = tokio::spawn(dispatcher.run());
406
407        (
408            Publisher {
409                batching_options,
410                tx,
411            },
412            handle,
413        )
414    }
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    // Topic name shared by every test in this module.
    const TOPIC: &str = "projects/my-project/topics/my-topic";

    // A threshold set on either builder must show up on the built publisher.
    #[tokio::test]
    async fn builder() -> anyhow::Result<()> {
        let base = BasePublisher::builder().build().await?;
        let from_base = base
            .publisher(TOPIC)
            .set_message_count_threshold(1_u32)
            .build();
        assert_eq!(from_base.batching_options.message_count_threshold, 1_u32);

        let direct = Publisher::builder(TOPIC)
            .set_message_count_threshold(1_u32)
            .build()
            .await?;
        assert_eq!(direct.batching_options.message_count_threshold, 1_u32);
        Ok(())
    }

    // Publishers created through `BasePublisher` and through
    // `Publisher::builder` must both start from the default batching options.
    #[tokio::test]
    async fn default_batching() -> anyhow::Result<()> {
        let from_base = BasePublisher::builder()
            .build()
            .await?
            .publisher(TOPIC)
            .build();
        let direct = Publisher::builder(TOPIC).build().await?;

        let expected = BatchingOptions::default();
        for publisher in [from_base, direct] {
            let actual = &publisher.batching_options;
            assert_eq!(
                actual.message_count_threshold,
                expected.message_count_threshold
            );
            assert_eq!(actual.byte_threshold, expected.byte_threshold);
            assert_eq!(actual.delay_threshold, expected.delay_threshold);
        }
        Ok(())
    }

    // Compares the parts of two client configurations that can be compared:
    // plain values directly, optional policies by presence only.
    fn assert_eq_client_config(
        publisher_cfg: &gaxi::options::ClientConfig,
        base_cfg: &gaxi::options::ClientConfig,
    ) {
        assert_eq!(publisher_cfg.endpoint, base_cfg.endpoint);
        assert_eq!(publisher_cfg.cred.is_some(), base_cfg.cred.is_some());
        assert_eq!(publisher_cfg.tracing, base_cfg.tracing);
        assert_eq!(
            publisher_cfg.retry_policy.is_some(),
            base_cfg.retry_policy.is_some()
        );
        assert_eq!(
            publisher_cfg.backoff_policy.is_some(),
            base_cfg.backoff_policy.is_some()
        );
        assert_eq!(
            publisher_cfg.grpc_subchannel_count,
            base_cfg.grpc_subchannel_count
        );
    }

    // An untouched `PublisherBuilder` carries the same client configuration
    // as an untouched `BasePublisherBuilder`.
    #[test]
    fn publisher_has_default_client_config() {
        let publisher_builder = Publisher::builder(TOPIC);
        let plain_builder = BasePublisher::builder();

        assert_eq_client_config(
            &publisher_builder.base_builder.config,
            &plain_builder.config,
        );
    }

    // Every `with_*` setter on `PublisherBuilder` must have the same effect
    // as the setter of the same name on `BasePublisherBuilder`.
    #[tokio::test]
    async fn publisher_builder_sets_client_config() -> anyhow::Result<()> {
        use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
        use google_cloud_gax::exponential_backoff::ExponentialBackoff;
        use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let throttler = CircuitBreaker::default();

        let publisher_builder = Publisher::builder(TOPIC)
            .with_endpoint("test-endpoint.com")
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
            .with_backoff_policy(ExponentialBackoff::default())
            .with_retry_throttler(throttler.clone())
            .with_grpc_subchannel_count(16);

        let plain_builder = BasePublisher::builder()
            .with_endpoint("test-endpoint.com")
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
            .with_backoff_policy(ExponentialBackoff::default())
            .with_retry_throttler(throttler)
            .with_grpc_subchannel_count(16);

        assert_eq_client_config(
            &publisher_builder.base_builder.config,
            &plain_builder.config,
        );

        Ok(())
    }
}
531}