google_cloud_pubsub/publisher/
publisher.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::options::BatchingOptions;
16use crate::generated::gapic_dataplane::client::Publisher as GapicPublisher;
17use crate::publisher::worker::BundledMessage;
18use crate::publisher::worker::ToWorker;
19use crate::publisher::worker::Worker;
20use std::time::Duration;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::oneshot;
23
// Upper bound applied to the configured batching delay: 1 day.
const MAX_DELAY: Duration = Duration::from_secs(60 * 60 * 24);
// These limits come from https://cloud.google.com/pubsub/docs/batch-messaging#quotas_and_limits_on_batch_messaging.
// Client libraries are expected to enforce these limits on batch sizing.
// Maximum number of messages batched into a single `Publish` call.
const MAX_MESSAGES: u32 = 1000;
// Maximum batch size in bytes (10 MB). Spelled as an integer literal rather
// than `1e7 as u32` so the constant does not rely on a float-to-int cast.
const MAX_BYTES: u32 = 10_000_000;
29
30/// A `Publisher` sends messages to a specific topic. It manages message batching
31/// and sending in a background task.
32///
33/// Publishers are created via a [`Client`](crate::client::Client).
34///
35/// ```
36/// # async fn sample() -> anyhow::Result<()> {
37/// # use google_cloud_pubsub::*;
38/// # use client::Client;
39/// # use model::PubsubMessage;
40/// let client = Client::builder().build().await?;
41/// let publisher = client.publisher("projects/my-project/topics/my-topic").build();
42/// let message_id = publisher.publish(PubsubMessage::new().set_data("Hello, World"));
43/// # Ok(()) }
44/// ```
45#[derive(Debug, Clone)]
46pub struct Publisher {
47    #[allow(dead_code)]
48    pub(crate) batching_options: BatchingOptions,
49    tx: UnboundedSender<ToWorker>,
50}
51
52impl Publisher {
53    /// Publishes a message to the topic.
54    ///
55    /// ```
56    /// # use google_cloud_pubsub::client::Publisher;
57    /// # async fn sample(publisher: Publisher) -> anyhow::Result<()> {
58    /// # use google_cloud_pubsub::model::PubsubMessage;
59    /// let message_id = publisher.publish(PubsubMessage::new().set_data("Hello, World")).await?;
60    /// # Ok(()) }
61    /// ```
62    pub fn publish(&self, msg: crate::model::PubsubMessage) -> crate::model_ext::PublishHandle {
63        let (tx, rx) = tokio::sync::oneshot::channel();
64
65        // If this fails, the worker is gone, which indicates something bad has happened.
66        // The PublishHandle will automatically receive an error when `tx` is dropped.
67        if self
68            .tx
69            .send(ToWorker::Publish(BundledMessage { msg, tx }))
70            .is_err()
71        {
72            // `tx` is dropped here if the send errors.
73        }
74        crate::model_ext::PublishHandle { rx }
75    }
76
77    /// Flushes all outstanding messages.
78    ///
79    /// This method sends any messages that have been published but not yet sent,
80    /// regardless of the configured batching options (`delay_threshold`, etc.).
81    ///
82    /// This method is `async` and returns only after all publish attempts for the
83    /// messages in the snapshot have completed. A "completed" attempt means the
84    /// message has either been successfully sent, or has failed permanently after
85    /// exhausting any applicable retry policies.
86    ///
87    /// After flush()` returns, the final result of each individual publish
88    /// operation (i.e., a success with a message ID or a terminal error) will
89    /// be available on its corresponding [PublishHandle](crate::model_ext::PublishHandle).
90    ///
91    /// Messages published after `flush()` is called will be buffered for a
92    /// subsequent batch and are not included in this flush operation.
93    ///
94    /// # Example
95    ///
96    /// ```
97    /// # use google_cloud_pubsub::model::PubsubMessage;
98    /// # async fn sample(publisher: google_cloud_pubsub::client::Publisher) -> anyhow::Result<()> {
99    /// // Publish some messages. They will be buffered according to batching options.
100    /// let handle1 = publisher.publish(PubsubMessage::new().set_data("foo".to_string()));
101    /// let handle2 = publisher.publish(PubsubMessage::new().set_data("bar".to_string()));
102    ///
103    /// // Flush ensures that these messages are sent immediately and waits for
104    /// // the send to complete.
105    /// publisher.flush().await;
106    ///
107    /// // The results for handle1 and handle2 are available.
108    /// let id1 = handle1.await?;
109    /// let id2 = handle2.await?;
110    /// # Ok(())
111    /// # }
112    /// ```
113    pub async fn flush(&self) {
114        let (tx, rx) = oneshot::channel();
115        if self.tx.send(ToWorker::Flush(tx)).is_err() {
116            // `tx` is dropped here if the send errors.
117        }
118        rx.await
119            .expect("the client library should not release the sender");
120    }
121}
122
123/// Creates `Publisher`s.
124///
125/// Publishers are created via a [`Client`][crate::client::Client].
126///
127/// # Example
128///
129/// ```
130/// # async fn sample() -> anyhow::Result<()> {
131/// # use google_cloud_pubsub::*;
132/// # use builder::publisher::ClientBuilder;
133/// # use client::Client;
134/// let client = Client::builder().build().await?;
135/// let publisher = client.publisher("projects/my-project/topics/topic").build();
136/// # Ok(()) }
137/// ```
138#[derive(Clone, Debug)]
139pub struct PublisherBuilder {
140    pub(crate) inner: GapicPublisher,
141    topic: String,
142    batching_options: BatchingOptions,
143}
144
145impl PublisherBuilder {
146    /// Creates a new Pub/Sub publisher builder for topic.
147    pub(crate) fn new(client: GapicPublisher, topic: String) -> Self {
148        Self {
149            inner: client,
150            topic,
151            batching_options: BatchingOptions::default(),
152        }
153    }
154
155    /// Sets the maximum number of messages to be batched together for a single `Publish` call.
156    /// When this number is reached, the batch is sent.
157    ///
158    /// Setting this to `1` disables batching by message count.
159    ///
160    /// # Example
161    /// ```
162    /// # use google_cloud_pubsub::client::Client;
163    /// # async fn sample() -> anyhow::Result<()> {
164    /// # let client = Client::builder().build().await?;
165    /// let publisher = client.publisher("projects/my-project/topics/my-topic")
166    ///     .set_message_count_threshold(100)
167    ///     .build();
168    /// # Ok(()) }
169    /// ```
170    pub fn set_message_count_threshold(mut self, threshold: u32) -> PublisherBuilder {
171        self.batching_options = self.batching_options.set_message_count_threshold(threshold);
172        self
173    }
174
175    /// Sets the byte threshold for batching in a single `Publish` call.
176    /// When this many bytes are accumulated, the batch is sent.
177    ///
178    /// # Example
179    /// ```
180    /// # use google_cloud_pubsub::client::Client;
181    /// # async fn sample() -> anyhow::Result<()> {
182    /// # let client = Client::builder().build().await?;
183    /// let publisher = client.publisher("projects/my-project/topics/my-topic")
184    ///     .set_byte_threshold(100)
185    ///     .build();
186    /// # Ok(()) }
187    /// ```
188    pub fn set_byte_threshold(mut self, threshold: u32) -> PublisherBuilder {
189        self.batching_options = self.batching_options.set_byte_threshold(threshold);
190        self
191    }
192
193    /// Sets the maximum amount of time the publisher will wait before sending a
194    /// batch. When this delay is reached, the current batch is sent, regardless
195    /// of the number of messages or total byte size.
196    ///
197    /// # Example
198    /// ```
199    /// # use google_cloud_pubsub::client::Client;
200    /// # use std::time::Duration;
201    /// # async fn sample() -> anyhow::Result<()> {
202    /// # let client = Client::builder().build().await?;
203    /// let publisher = client.publisher("projects/my-project/topics/my-topic")
204    ///     .set_delay_threshold(Duration::from_millis(50))
205    ///     .build();
206    /// # Ok(()) }
207    /// ```
208    pub fn set_delay_threshold(mut self, threshold: Duration) -> PublisherBuilder {
209        self.batching_options = self.batching_options.set_delay_threshold(threshold);
210        self
211    }
212
213    /// Creates a new [`Publisher`] from the builder's configuration.
214    // This method starts a background task to manage the batching
215    // and sending of messages. The returned `Publisher` is a
216    // lightweight handle for sending messages to that background task
217    // over a channel.
218    pub fn build(self) -> Publisher {
219        // Enforce limits by clamping the user-provided options.
220        let batching_options = BatchingOptions::new()
221            .set_delay_threshold(
222                self.batching_options
223                    .delay_threshold
224                    .clamp(Duration::ZERO, MAX_DELAY),
225            )
226            .set_message_count_threshold(
227                self.batching_options
228                    .message_count_threshold
229                    .clamp(0, MAX_MESSAGES),
230            )
231            .set_byte_threshold(self.batching_options.byte_threshold.clamp(0, MAX_BYTES));
232
233        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
234        // Create the batching worker that will run in the background.
235        // We don't need to keep track of a handle to the worker.
236        // Dropping the Publisher will drop the only sender to the channel.
237        // This will cause worker.run() to read None from the channel and close.
238        let worker = Worker::new(self.topic, self.inner, batching_options.clone(), rx);
239        tokio::spawn(worker.run());
240
241        Publisher {
242            batching_options,
243            tx,
244        }
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use crate::{client::Client, publisher::options::BatchingOptions};
252    use crate::{
253        generated::gapic_dataplane::client::Publisher as GapicPublisher,
254        model::{PublishResponse, PubsubMessage},
255    };
256    use mockall::Sequence;
257
258    mockall::mock! {
259        #[derive(Debug)]
260        GapicPublisher {}
261        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
262            async fn publish(&self, req: crate::model::PublishRequest, _options: gax::options::RequestOptions) -> gax::Result<gax::response::Response<crate::model::PublishResponse>>;
263        }
264    }
265
266    // Similar to GapicPublisher but returns impl Future instead.
267    // This is useful for mocking a response with delays/timeouts.
268    // See https://github.com/asomers/mockall/issues/189 for more
269    // detail on why this is needed.
270    // While this can used inplace of GapicPublisher, it makes the
271    // normal usage without async closure much more cumbersome.
272    mockall::mock! {
273        #[derive(Debug)]
274        GapicPublisherWithFuture {}
275        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
276            fn publish(&self, req: crate::model::PublishRequest, _options: gax::options::RequestOptions) -> impl Future<Output=gax::Result<gax::response::Response<crate::model::PublishResponse>>> + Send;
277        }
278    }
279
280    #[tokio::test]
281    async fn test_worker_success() {
282        let mut mock = MockGapicPublisher::new();
283        mock.expect_publish()
284            .returning({
285                |r, _| {
286                    assert_eq!(r.topic, "my-topic");
287                    assert_eq!(r.messages.len(), 1);
288                    let id = String::from_utf8(r.messages[0].data.to_vec()).unwrap();
289                    Ok(gax::response::Response::from(
290                        PublishResponse::new().set_message_ids(vec![id]),
291                    ))
292                }
293            })
294            .times(2);
295
296        let client = GapicPublisher::from_stub(mock);
297        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
298            .set_message_count_threshold(1_u32)
299            .build();
300
301        let messages = vec![
302            PubsubMessage::new().set_data("hello".to_string()),
303            PubsubMessage::new().set_data("world".to_string()),
304        ];
305        let mut handles = Vec::new();
306        for msg in messages {
307            let handle = publisher.publish(msg.clone());
308            handles.push((msg, handle));
309        }
310
311        for (id, rx) in handles.into_iter() {
312            let got = rx.await.expect("expected message id");
313            let id = String::from_utf8(id.data.to_vec()).unwrap();
314            assert_eq!(got, id);
315        }
316    }
317
318    #[tokio::test(start_paused = true)]
319    async fn test_drop_publisher() {
320        // If we hold on to the handles returned from the publisher, it should
321        // be safe to drop the publisher and .await on the handles.
322        let mut mock = MockGapicPublisher::new();
323        mock.expect_publish().return_once({
324            |r, _| {
325                assert_eq!(r.topic, "my-topic");
326                let ids = r
327                    .messages
328                    .iter()
329                    .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
330                Ok(gax::response::Response::from(
331                    PublishResponse::new().set_message_ids(ids),
332                ))
333            }
334        });
335        let client = GapicPublisher::from_stub(mock);
336        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
337            .set_message_count_threshold(1000_u32)
338            .set_delay_threshold(Duration::from_secs(60))
339            .build();
340
341        let start = tokio::time::Instant::now();
342        let messages = vec![
343            PubsubMessage::new().set_data("hello".to_string()),
344            PubsubMessage::new().set_data("world".to_string()),
345        ];
346        let mut handles = Vec::new();
347        for msg in messages {
348            let handle = publisher.publish(msg.clone());
349            handles.push((msg, handle));
350        }
351        drop(publisher); // This should trigger the batch to send, no delay.
352
353        for (id, rx) in handles.into_iter() {
354            let got = rx.await.expect("expected message id");
355            let id = String::from_utf8(id.data.to_vec()).unwrap();
356            assert_eq!(got, id);
357            assert_eq!(start.elapsed(), Duration::ZERO);
358        }
359    }
360
361    #[tokio::test]
362    async fn test_worker_error() {
363        let mut mock = MockGapicPublisher::new();
364        mock.expect_publish()
365            .returning({
366                |r, _| {
367                    assert_eq!(r.topic, "my-topic");
368                    assert_eq!(r.messages.len(), 1);
369                    Err(gax::error::Error::io("io error"))
370                }
371            })
372            .times(2);
373
374        let client = GapicPublisher::from_stub(mock);
375        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
376            .set_message_count_threshold(1_u32)
377            .build();
378
379        let messages = vec![
380            PubsubMessage::new().set_data("hello".to_string()),
381            PubsubMessage::new().set_data("world".to_string()),
382        ];
383
384        let mut handles = Vec::new();
385        for msg in messages {
386            let handle = publisher.publish(msg.clone());
387            handles.push(handle);
388        }
389
390        for rx in handles.into_iter() {
391            let got = rx.await;
392            assert!(got.is_err());
393        }
394    }
395
396    #[tokio::test(start_paused = true)]
397    async fn test_worker_flush() {
398        let mut mock = MockGapicPublisher::new();
399        mock.expect_publish().returning({
400            |r, _| {
401                assert_eq!(r.topic, "my-topic");
402                let ids = r
403                    .messages
404                    .iter()
405                    .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
406                Ok(gax::response::Response::from(
407                    PublishResponse::new().set_message_ids(ids),
408                ))
409            }
410        });
411
412        let client = GapicPublisher::from_stub(mock);
413        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
414            // Set a long delay.
415            .set_message_count_threshold(1000_u32)
416            .set_delay_threshold(Duration::from_secs(60))
417            .build();
418
419        let start = tokio::time::Instant::now();
420        let messages = vec![
421            PubsubMessage::new().set_data("hello".to_string()),
422            PubsubMessage::new().set_data("world".to_string()),
423        ];
424        let mut handles = Vec::new();
425        for msg in messages {
426            let handle = publisher.publish(msg.clone());
427            handles.push((msg, handle));
428        }
429
430        publisher.flush().await;
431        assert_eq!(start.elapsed(), Duration::ZERO);
432
433        let post = publisher.publish(PubsubMessage::new().set_data("after".to_string()));
434        for (id, rx) in handles.into_iter() {
435            let got = rx.await.expect("expected message id");
436            let id = String::from_utf8(id.data.to_vec()).unwrap();
437            assert_eq!(got, id);
438            assert_eq!(start.elapsed(), Duration::ZERO);
439        }
440
441        // The last message is only sent after the next timeout
442        // (worker does not continue to flush).
443        let got = post.await.expect("expected message id");
444        assert_eq!(got, "after");
445        assert_eq!(start.elapsed(), Duration::from_secs(60));
446    }
447
448    // User's should be able to drop handles and the messages will still send.
449    #[tokio::test(start_paused = true)]
450    async fn test_worker_drop_handles() {
451        let mut mock = MockGapicPublisher::new();
452        mock.expect_publish().return_once({
453            move |r, _| {
454                assert_eq!(r.topic, "my-topic");
455                let ids = r
456                    .messages
457                    .iter()
458                    .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
459                assert_eq!(ids.len(), 2);
460                let ids = ids.collect::<Vec<_>>();
461                assert_eq!(ids.clone(), vec!["hello", "world"]);
462                Ok(gax::response::Response::from(
463                    PublishResponse::new().set_message_ids(ids),
464                ))
465            }
466        });
467
468        let client = GapicPublisher::from_stub(mock);
469        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
470            // Set a long delay.
471            .set_message_count_threshold(1000_u32)
472            .set_delay_threshold(Duration::from_secs(60))
473            .build();
474
475        let start = tokio::time::Instant::now();
476        let messages = vec![
477            PubsubMessage::new().set_data("hello".to_string()),
478            PubsubMessage::new().set_data("world".to_string()),
479        ];
480        for msg in messages {
481            publisher.publish(msg.clone());
482        }
483
484        publisher.flush().await;
485        assert_eq!(start.elapsed(), Duration::ZERO);
486    }
487
488    #[tokio::test(start_paused = true)]
489    async fn test_empty_flush() {
490        let mock = MockGapicPublisher::new();
491
492        let client = GapicPublisher::from_stub(mock);
493        let publisher = PublisherBuilder::new(client, "my-topic".to_string()).build();
494
495        let start = tokio::time::Instant::now();
496        publisher.flush().await;
497        assert_eq!(start.elapsed(), Duration::ZERO);
498    }
499
500    #[tokio::test]
501    async fn test_batching_send_on_message_count_threshold_success() {
502        // Make sure all messages in a batch receive the correct message ID.
503        let mut mock = MockGapicPublisher::new();
504        mock.expect_publish().return_once({
505            |r, _| {
506                assert_eq!(r.topic, "my-topic");
507                assert_eq!(r.messages.len(), 2);
508                let ids = r
509                    .messages
510                    .iter()
511                    .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
512                Ok(gax::response::Response::from(
513                    PublishResponse::new().set_message_ids(ids),
514                ))
515            }
516        });
517
518        let client = GapicPublisher::from_stub(mock);
519        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
520            .set_message_count_threshold(2_u32)
521            .set_byte_threshold(MAX_BYTES)
522            .set_delay_threshold(std::time::Duration::MAX)
523            .build();
524
525        let messages = vec![
526            PubsubMessage::new().set_data("hello".to_string()),
527            PubsubMessage::new().set_data("world".to_string()),
528        ];
529        let mut handles = Vec::new();
530        for msg in messages {
531            let handle = publisher.publish(msg.clone());
532            handles.push((msg, handle));
533        }
534
535        for (id, rx) in handles.into_iter() {
536            let got = rx.await.expect("expected message id");
537            let id = String::from_utf8(id.data.to_vec()).unwrap();
538            assert_eq!(got, id);
539        }
540    }
541
542    #[tokio::test]
543    async fn test_batching_send_on_message_count_threshold_error() {
544        // Make sure all messages in a batch receive an error.
545        let mut mock = MockGapicPublisher::new();
546        mock.expect_publish().return_once({
547            |r, _| {
548                assert_eq!(r.topic, "my-topic");
549                assert_eq!(r.messages.len(), 2);
550                Err(gax::error::Error::io("io error"))
551            }
552        });
553
554        let client = GapicPublisher::from_stub(mock);
555        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
556            .set_message_count_threshold(2_u32)
557            .set_byte_threshold(MAX_BYTES)
558            .set_delay_threshold(std::time::Duration::MAX)
559            .build();
560
561        let messages = vec![
562            PubsubMessage::new().set_data("hello".to_string()),
563            PubsubMessage::new().set_data("world".to_string()),
564        ];
565        let mut handles = Vec::new();
566        for msg in messages {
567            let handle = publisher.publish(msg.clone());
568            handles.push(handle);
569        }
570
571        for rx in handles.into_iter() {
572            let got = rx.await;
573            assert!(got.is_err());
574        }
575    }
576
577    #[tokio::test]
578    async fn test_batching_send_on_byte_threshold() {
579        // Make sure all messages in a batch receive the correct message ID.
580        let mut mock = MockGapicPublisher::new();
581        mock.expect_publish().return_once({
582            |r, _| {
583                assert_eq!(r.topic, "my-topic");
584                assert_eq!(r.messages.len(), 2);
585                let ids = r
586                    .messages
587                    .iter()
588                    .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
589                Ok(gax::response::Response::from(
590                    PublishResponse::new().set_message_ids(ids),
591                ))
592            }
593        });
594
595        let client = GapicPublisher::from_stub(mock);
596        // Ensure that the first message does not pass the threshold.
597        let byte_threshold = "my-topic".len() + "hello".len() + 1;
598        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
599            .set_message_count_threshold(MAX_MESSAGES)
600            .set_byte_threshold(byte_threshold as u32)
601            .set_delay_threshold(std::time::Duration::MAX)
602            .build();
603
604        let messages = vec![
605            PubsubMessage::new().set_data("hello".to_string()),
606            PubsubMessage::new().set_data("world".to_string()),
607        ];
608        let mut handles = Vec::new();
609        for msg in messages {
610            let handle = publisher.publish(msg.clone());
611            handles.push((msg, handle));
612        }
613
614        for (id, rx) in handles.into_iter() {
615            let got = rx.await.expect("expected message id");
616            let id = String::from_utf8(id.data.to_vec()).unwrap();
617            assert_eq!(got, id);
618        }
619    }
620
621    #[tokio::test(start_paused = true)]
622    async fn test_batching_send_on_delay_threshold() {
623        let mut mock = MockGapicPublisher::new();
624        mock.expect_publish().returning({
625            |r, _| {
626                assert_eq!(r.topic, "my-topic");
627                let ids = r
628                    .messages
629                    .iter()
630                    .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
631                Ok(gax::response::Response::from(
632                    PublishResponse::new().set_message_ids(ids),
633                ))
634            }
635        });
636
637        let client = GapicPublisher::from_stub(mock);
638        let delay = std::time::Duration::from_millis(10);
639        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
640            .set_message_count_threshold(u32::MAX)
641            .set_byte_threshold(MAX_BYTES)
642            .set_delay_threshold(delay)
643            .build();
644
645        // Test that messages send after delay.
646        for _ in 0..3 {
647            let start = tokio::time::Instant::now();
648            let messages = vec![
649                PubsubMessage::new().set_data("hello 0".to_string()),
650                PubsubMessage::new().set_data("hello 1".to_string()),
651                PubsubMessage::new()
652                    .set_data("hello 2".to_string())
653                    .set_ordering_key("ordering key 1"),
654                PubsubMessage::new()
655                    .set_data("hello 3".to_string())
656                    .set_ordering_key("ordering key 2"),
657            ];
658            let mut handles = Vec::new();
659            for msg in messages {
660                let handle = publisher.publish(msg.clone());
661                handles.push((msg, handle));
662            }
663
664            for (id, rx) in handles.into_iter() {
665                let got = rx.await.expect("expected message id");
666                let id = String::from_utf8(id.data.to_vec()).unwrap();
667                assert_eq!(got, id);
668                assert_eq!(
669                    start.elapsed(),
670                    delay,
671                    "batch of messages should have sent after {:?}",
672                    delay
673                )
674            }
675        }
676    }
677
678    #[tokio::test(start_paused = true)]
679    #[allow(clippy::get_first)]
680    async fn test_batching_on_ordering_key() {
681        // Publish messages with different ordering key and validate that they are in different batches.
682        let mut mock = MockGapicPublisher::new();
683        mock.expect_publish().returning({
684            |r, _| {
685                assert_eq!(r.topic, "my-topic");
686                assert_eq!(r.messages.len(), 2);
687                assert_eq!(
688                    r.messages.get(0).unwrap().ordering_key,
689                    r.messages.get(1).unwrap().ordering_key
690                );
691                let ids = r
692                    .messages
693                    .iter()
694                    .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
695                Ok(gax::response::Response::from(
696                    PublishResponse::new().set_message_ids(ids),
697                ))
698            }
699        });
700
701        let client = GapicPublisher::from_stub(mock);
702        // Use a low message count to trigger batch sends.
703        let message_count_threshold = 2_u32;
704        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
705            .set_message_count_threshold(message_count_threshold)
706            .set_byte_threshold(MAX_BYTES)
707            .set_delay_threshold(std::time::Duration::MAX)
708            .build();
709
710        let num_ordering_keys = 3;
711        let mut messages = Vec::new();
712        // We want the number of messages to be a multiple of num_ordering_keys
713        // and message_count_threshold. Otherwise, the final batch of each
714        // ordering key may fail the message len assertion.
715        for i in 0..(2 * message_count_threshold * num_ordering_keys) {
716            messages.push(
717                PubsubMessage::new()
718                    .set_data(format!("test message {}", i))
719                    .set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
720            );
721        }
722        let mut handles = Vec::new();
723        for msg in messages {
724            let handle = publisher.publish(msg.clone());
725            handles.push((msg, handle));
726        }
727
728        for (id, rx) in handles.into_iter() {
729            let got = rx.await.expect("expected message id");
730            let id = String::from_utf8(id.data.to_vec()).unwrap();
731            assert_eq!(got, id);
732        }
733    }
734
735    #[tokio::test(start_paused = true)]
736    #[allow(clippy::get_first)]
737    async fn test_batching_empty_ordering_key() {
738        // Publish messages with different ordering key and validate that they are in different batches.
739        let mut mock = MockGapicPublisher::new();
740        mock.expect_publish().returning({
741            |r, _| {
742                assert_eq!(r.topic, "my-topic");
743                assert_eq!(r.messages.len(), 2);
744                assert_eq!(
745                    r.messages.get(0).unwrap().ordering_key,
746                    r.messages.get(1).unwrap().ordering_key
747                );
748                let ids = r
749                    .messages
750                    .iter()
751                    .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
752                Ok(gax::response::Response::from(
753                    PublishResponse::new().set_message_ids(ids),
754                ))
755            }
756        });
757
758        let client = GapicPublisher::from_stub(mock);
759        // Use a low message count to trigger batch sends.
760        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
761            .set_message_count_threshold(2_u32)
762            .set_byte_threshold(MAX_BYTES)
763            .set_delay_threshold(std::time::Duration::MAX)
764            .build();
765
766        let messages = vec![
767            PubsubMessage::new().set_data("hello 1".to_string()),
768            PubsubMessage::new()
769                .set_data("hello 2".to_string())
770                .set_ordering_key(""),
771            PubsubMessage::new()
772                .set_data("hello 3".to_string())
773                .set_ordering_key("ordering key :1"),
774            PubsubMessage::new()
775                .set_data("hello 4".to_string())
776                .set_ordering_key("ordering key :1"),
777        ];
778
779        let mut handles = Vec::new();
780        for msg in messages {
781            let handle = publisher.publish(msg.clone());
782            handles.push((msg, handle));
783        }
784
785        for (id, rx) in handles.into_iter() {
786            let got = rx.await.expect("expected message id");
787            let id = String::from_utf8(id.data.to_vec()).unwrap();
788            assert_eq!(got, id);
789        }
790    }
791
792    #[tokio::test(start_paused = true)]
793    #[allow(clippy::get_first)]
794    async fn test_ordering_key_only_one_outstanding_batch() {
795        // Verify that Publisher must only have 1 outstanding batch inflight at a time.
796        // This is done by validating that the 2 expected publish calls are done in sequence
797        // with a sleep delay in the first Publish reply.
798        let mut seq = Sequence::new();
799        let mut mock = MockGapicPublisherWithFuture::new();
800        mock.expect_publish()
801            .times(1)
802            .in_sequence(&mut seq)
803            .returning({
804                |r, _| {
805                    Box::pin(async move {
806                        tokio::time::sleep(Duration::from_millis(10)).await;
807                        assert_eq!(r.topic, "my-topic");
808                        assert_eq!(r.messages.len(), 1);
809                        let ids = r
810                            .messages
811                            .iter()
812                            .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
813                        Ok(gax::response::Response::from(
814                            PublishResponse::new().set_message_ids(ids),
815                        ))
816                    })
817                }
818            });
819
820        mock.expect_publish()
821            .times(1)
822            .in_sequence(&mut seq)
823            .returning({
824                |r, _| {
825                    Box::pin(async move {
826                        assert_eq!(r.topic, "my-topic");
827                        assert_eq!(r.messages.len(), 1);
828                        let ids = r
829                            .messages
830                            .iter()
831                            .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
832                        Ok(gax::response::Response::from(
833                            PublishResponse::new().set_message_ids(ids),
834                        ))
835                    })
836                }
837            });
838
839        let client = GapicPublisher::from_stub(mock);
840        // Use a low message count to trigger batch sends.
841        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
842            .set_message_count_threshold(1_u32)
843            .set_byte_threshold(MAX_BYTES)
844            .set_delay_threshold(std::time::Duration::MAX)
845            .build();
846
847        let messages = [
848            PubsubMessage::new()
849                .set_data("hello 1".to_string())
850                .set_ordering_key("ordering key"),
851            PubsubMessage::new()
852                .set_data("hello 2".to_string())
853                .set_ordering_key("ordering key"),
854        ];
855
856        let start = tokio::time::Instant::now();
857        let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
858        let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
859        assert_eq!(msg2_handle.await.expect("expected message id"), "hello 2");
860        assert_eq!(
861            start.elapsed(),
862            Duration::from_millis(10),
863            "the second batch of messages should have sent after the first which is has been delayed by {:?}",
864            Duration::from_millis(10)
865        );
866        // Also validate the content of the first publish.
867        assert_eq!(msg1_handle.await.expect("expected message id"), "hello 1");
868    }
869
870    #[tokio::test(start_paused = true)]
871    #[allow(clippy::get_first)]
872    async fn test_empty_ordering_key_concurrent_batches() {
873        // Verify that for empty ordering key, the Publisher will send multiple batches without
874        // awaiting for the results.
875        // This is done by adding a delay in the first Publish reply and validating that
876        // the second batch does not await for the first batch.
877        let mut seq = Sequence::new();
878        let mut mock = MockGapicPublisherWithFuture::new();
879        mock.expect_publish()
880            .times(1)
881            .in_sequence(&mut seq)
882            .returning({
883                |r, _| {
884                    Box::pin(async move {
885                        tokio::time::sleep(Duration::from_millis(10)).await;
886                        assert_eq!(r.topic, "my-topic");
887                        assert_eq!(r.messages.len(), 1);
888                        let ids = r
889                            .messages
890                            .iter()
891                            .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
892                        Ok(gax::response::Response::from(
893                            PublishResponse::new().set_message_ids(ids),
894                        ))
895                    })
896                }
897            });
898
899        mock.expect_publish()
900            .times(1)
901            .in_sequence(&mut seq)
902            .returning({
903                |r, _| {
904                    Box::pin(async move {
905                        assert_eq!(r.topic, "my-topic");
906                        assert_eq!(r.messages.len(), 1);
907                        let ids = r
908                            .messages
909                            .iter()
910                            .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
911                        Ok(gax::response::Response::from(
912                            PublishResponse::new().set_message_ids(ids),
913                        ))
914                    })
915                }
916            });
917
918        let client = GapicPublisher::from_stub(mock);
919        // Use a low message count to trigger batch sends.
920        let publisher = PublisherBuilder::new(client, "my-topic".to_string())
921            .set_message_count_threshold(1_u32)
922            .set_byte_threshold(MAX_BYTES)
923            .set_delay_threshold(std::time::Duration::MAX)
924            .build();
925
926        let messages = [
927            PubsubMessage::new()
928                .set_data("hello 1".to_string())
929                .set_ordering_key(""),
930            PubsubMessage::new()
931                .set_data("hello 2".to_string())
932                .set_ordering_key(""),
933        ];
934
935        let start = tokio::time::Instant::now();
936        let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
937        let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
938        assert_eq!(msg2_handle.await.expect("expected message id"), "hello 2");
939        assert_eq!(
940            start.elapsed(),
941            Duration::from_millis(0),
942            "the second batch of messages should have sent without any delay"
943        );
944        // Also validate the content of the first publish.
945        assert_eq!(msg1_handle.await.expect("expected message id"), "hello 1");
946    }
947
948    #[tokio::test]
949    async fn builder() -> anyhow::Result<()> {
950        let client = Client::builder().build().await?;
951        let builder = client.publisher("projects/my-project/topics/my-topic".to_string());
952        let publisher = builder.set_message_count_threshold(1_u32).build();
953        assert_eq!(publisher.batching_options.message_count_threshold, 1_u32);
954        Ok(())
955    }
956
957    #[tokio::test]
958    async fn default_batching() -> anyhow::Result<()> {
959        let client = Client::builder().build().await?;
960        let publisher = client
961            .publisher("projects/my-project/topics/my-topic".to_string())
962            .build();
963
964        assert_eq!(
965            publisher.batching_options.message_count_threshold,
966            BatchingOptions::default().message_count_threshold
967        );
968        assert_eq!(
969            publisher.batching_options.byte_threshold,
970            BatchingOptions::default().byte_threshold
971        );
972        assert_eq!(
973            publisher.batching_options.delay_threshold,
974            BatchingOptions::default().delay_threshold
975        );
976        Ok(())
977    }
978
979    #[tokio::test]
980    async fn test_builder_clamping() -> anyhow::Result<()> {
981        // Test values that are too high and should be clamped.
982        let oversized_options = BatchingOptions::new()
983            .set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
984            .set_message_count_threshold(MAX_MESSAGES + 1)
985            .set_byte_threshold(MAX_BYTES + 1);
986
987        let client = Client::builder().build().await?;
988        let publisher = client
989            .publisher("projects/my-project/topics/my-topic".to_string())
990            .set_delay_threshold(oversized_options.delay_threshold)
991            .set_message_count_threshold(oversized_options.message_count_threshold)
992            .set_byte_threshold(oversized_options.byte_threshold)
993            .build();
994        let got = publisher.batching_options;
995
996        assert_eq!(got.delay_threshold, MAX_DELAY);
997        assert_eq!(got.message_count_threshold, MAX_MESSAGES);
998        assert_eq!(got.byte_threshold, MAX_BYTES);
999
1000        // Test values that are within limits and should not be changed.
1001        let normal_options = BatchingOptions::new()
1002            .set_delay_threshold(Duration::from_secs(10))
1003            .set_message_count_threshold(10_u32)
1004            .set_byte_threshold(100_u32);
1005
1006        let publisher = client
1007            .publisher("projects/my-project/topics/my-topic".to_string())
1008            .set_delay_threshold(normal_options.delay_threshold)
1009            .set_message_count_threshold(normal_options.message_count_threshold)
1010            .set_byte_threshold(normal_options.byte_threshold)
1011            .build();
1012        let got = publisher.batching_options;
1013
1014        assert_eq!(got.delay_threshold, normal_options.delay_threshold);
1015        assert_eq!(
1016            got.message_count_threshold,
1017            normal_options.message_count_threshold
1018        );
1019        assert_eq!(got.byte_threshold, normal_options.byte_threshold);
1020
1021        Ok(())
1022    }
1023}