Skip to main content

google_cloud_pubsub/publisher/
client.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::options::BatchingOptions;
16use crate::publisher::actor::BundledMessage;
17use crate::publisher::actor::ToDispatcher;
18use crate::publisher::builder::PublisherBuilder;
19
20use tokio::sync::mpsc::UnboundedSender;
21use tokio::sync::oneshot;
22
23pub use super::base_publisher::BasePublisher;
24
25/// A Publisher client for the [Cloud Pub/Sub] API.
26///
27/// A `Publisher` sends messages to a specific topic. It manages message batching
28/// and sending in a background task.
29///
30/// ```
31/// # async fn sample() -> anyhow::Result<()> {
32/// # use google_cloud_pubsub::*;
33/// # use google_cloud_pubsub::client::Publisher;
34/// # use model::Message;
35/// let publisher = Publisher::builder("projects/my-project/topics/my-topic").build().await?;
36/// let message_id_future = publisher.publish(Message::new().set_data("Hello, World"));
37/// # Ok(()) }
38/// ```
39///
40/// # Configuration
41///
42/// To configure `Publisher` use the `with_*` methods in the type returned
43/// by [builder()][Publisher::builder]. The default configuration should
44/// work for most applications. Common configuration changes include
45///
46/// * [with_endpoint()]: by default this client uses the global default endpoint
47///   (`https://pubsub.googleapis.com`). Applications using regional
48///   endpoints or running in restricted networks (e.g. a network configured
49//    with [Private Google Access with VPC Service Controls]) may want to
50///   override this default.
51/// * [with_credentials()]: by default this client uses
52///   [Application Default Credentials]. Applications using custom
53///   authentication may need to override this default.
54///
55/// # Pooling and Cloning
56///
57/// `Publisher` holds a connection pool internally, it is advised to
58/// create one and then reuse it. You do not need to wrap `Publisher` in
59/// an [Rc](std::rc::Rc) or [Arc](std::sync::Arc) to reuse it.
60///
61/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
62/// [with_endpoint()]: crate::builder::publisher::PublisherBuilder::with_endpoint
63/// [with_credentials()]: crate::builder::publisher::PublisherBuilder::with_credentials
64/// [Private Google Access with VPC Service Controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
65/// [Application Default Credentials]: https://cloud.google.com/docs/authentication#adc
66#[derive(Debug, Clone)]
67pub struct Publisher {
68    #[allow(dead_code)]
69    pub(crate) batching_options: BatchingOptions,
70    pub(crate) tx: UnboundedSender<ToDispatcher>,
71}
72
73impl Publisher {
74    /// Returns a builder for [Publisher].
75    ///
76    /// # Example
77    ///
78    /// ```
79    /// # async fn sample() -> anyhow::Result<()> {
80    /// # use google_cloud_pubsub::*;
81    /// # use google_cloud_pubsub::client::Publisher;
82    /// let publisher = Publisher::builder("projects/my-project/topics/topic").build().await?;
83    /// # Ok(()) }
84    /// ```
85    pub fn builder<T: Into<String>>(topic: T) -> PublisherBuilder {
86        PublisherBuilder::new(topic.into())
87    }
88
89    /// Publishes a message to the topic.
90    ///
91    /// When this method encounters a non-recoverable error publishing for an ordering key,
92    /// it will pause publishing on all new messages on that ordering key. Any outstanding
93    /// messages that have not yet been published may return an error.
94    ///
95    /// ```
96    /// # use google_cloud_pubsub::client::Publisher;
97    /// # async fn sample(publisher: Publisher) -> anyhow::Result<()> {
98    /// # use google_cloud_pubsub::model::Message;
99    /// let message_id = publisher.publish(Message::new().set_data("Hello, World")).await?;
100    /// # Ok(()) }
101    /// ```
102    #[must_use = "ignoring the publish result may lead to undetected delivery failures"]
103    pub fn publish(&self, msg: crate::model::Message) -> crate::publisher::PublishFuture {
104        let (tx, rx) = tokio::sync::oneshot::channel();
105
106        // If this fails, the Dispatcher is gone, which indicates it has been dropped,
107        // possibly due to the background task being stopped by the runtime.
108        // The PublishFuture will automatically receive an error when `tx` is dropped.
109        if self
110            .tx
111            .send(ToDispatcher::Publish(BundledMessage { msg, tx }))
112            .is_err()
113        {
114            // `tx` is dropped here if the send errors.
115        }
116        crate::publisher::PublishFuture { rx }
117    }
118
119    /// Flushes all buffered messages across all ordering keys, sending them immediately.
120    ///
121    /// ```
122    /// # use google_cloud_pubsub::model::Message;
123    /// # async fn sample(publisher: google_cloud_pubsub::client::Publisher) -> anyhow::Result<()> {
124    /// let _handle = publisher.publish(Message::new().set_data("event"));
125    /// // Ensures the message above is sent without needing to track its future.
126    /// publisher.flush().await;
127    /// # Ok(())
128    /// # }
129    /// ```
130    ///
131    /// This method bypasses configured batching delays and returns only after all
132    /// messages buffered at the time of the call have reached a terminal state
133    /// (success or permanent failure).
134    ///
135    /// ### Recommendations
136    ///
137    /// *   For most use cases, we recommend you `.await`
138    ///     the [`PublishFuture`][crate::publisher::PublishFuture] returned by
139    ///     [`publish`][Self::publish] to retrieve message IDs and handle
140    ///     specific errors.
141    /// *   Use `flush()` as a convenience during application shutdown to
142    ///     ensure the client attempts to send all outstanding data.
143    pub async fn flush(&self) {
144        let (tx, rx) = oneshot::channel();
145        if self.tx.send(ToDispatcher::Flush(tx)).is_ok() {
146            let _ = rx.await;
147        }
148    }
149
150    /// Resume accepting publish for a paused ordering key.
151    ///
152    /// Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.
153    /// If the ordering key is not currently paused, this function is a no-op.
154    ///
155    /// # Example
156    ///
157    /// ```
158    /// # use google_cloud_pubsub::model::Message;
159    /// # async fn sample(publisher: google_cloud_pubsub::client::Publisher) -> anyhow::Result<()> {
160    /// if let Err(_) = publisher.publish(Message::new().set_data("foo").set_ordering_key("bar")).await {
161    ///     // Error handling code can go here.
162    ///     publisher.resume_publish("bar");
163    /// }
164    /// # Ok(())
165    /// # }
166    /// ```
167    pub fn resume_publish<T: std::convert::Into<std::string::String>>(&self, ordering_key: T) {
168        let _ = self
169            .tx
170            .send(ToDispatcher::ResumePublish(ordering_key.into()));
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use super::*;
177    use crate::publisher::builder::PublisherPartialBuilder;
178    use crate::publisher::client::BasePublisher;
179    use crate::publisher::constants::*;
180    use crate::publisher::options::BatchingOptions;
181    use crate::{
182        generated::gapic_dataplane::client::Publisher as GapicPublisher,
183        model::{Message, PublishResponse},
184    };
185    use google_cloud_test_macros::tokio_test_no_panics;
186    use mockall::Sequence;
187    use rand::{RngExt, distr::Alphanumeric};
188    use std::error::Error;
189    use std::time::Duration;
190
191    static TOPIC: &str = "my-topic";
192
193    mockall::mock! {
194        #[derive(Debug)]
195        GapicPublisher {}
196        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
197            async fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> crate::Result<crate::Response<crate::model::PublishResponse>>;
198        }
199    }
200
201    // Similar to GapicPublisher but returns impl Future instead.
202    // This is useful for mocking a response with delays/timeouts.
203    // See https://github.com/asomers/mockall/issues/189 for more
204    // detail on why this is needed.
205    // While this can used inplace of GapicPublisher, it makes the
206    // normal usage without async closure much more cumbersome.
207    mockall::mock! {
208        #[derive(Debug)]
209        GapicPublisherWithFuture {}
210        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
211            fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> impl Future<Output=crate::Result<crate::Response<crate::model::PublishResponse>>> + Send;
212        }
213    }
214
215    fn publish_ok(
216        req: crate::model::PublishRequest,
217        _options: crate::RequestOptions,
218    ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
219        let ids = req
220            .messages
221            .iter()
222            .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
223        Ok(crate::Response::from(
224            PublishResponse::new().set_message_ids(ids),
225        ))
226    }
227
228    fn publish_err(
229        _req: crate::model::PublishRequest,
230        _options: crate::RequestOptions,
231    ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
232        Err(crate::Error::service(
233            google_cloud_gax::error::rpc::Status::default()
234                .set_code(google_cloud_gax::error::rpc::Code::Unknown)
235                .set_message("unknown error has occurred"),
236        ))
237    }
238
239    #[track_caller]
240    fn assert_publish_err(got_err: crate::error::PublishError) {
241        assert!(
242            matches!(got_err, crate::error::PublishError::Rpc(_)),
243            "{got_err:?}"
244        );
245        let source = got_err
246            .source()
247            .and_then(|e| e.downcast_ref::<std::sync::Arc<crate::Error>>())
248            .expect("send error should contain a source");
249        assert!(source.status().is_some(), "{got_err:?}");
250        assert_eq!(
251            source.status().unwrap().code,
252            google_cloud_gax::error::rpc::Code::Unknown,
253            "{got_err:?}"
254        );
255    }
256
257    fn generate_random_data() -> String {
258        rand::rng()
259            .sample_iter(&Alphanumeric)
260            .take(16)
261            .map(char::from)
262            .collect()
263    }
264
265    macro_rules! assert_publishing_is_ok {
266        ($publisher:ident, $($ordering_key:expr),+) => {
267            $(
268                let msg = generate_random_data();
269                let got = $publisher
270                    .publish(
271                        Message::new()
272                            .set_ordering_key($ordering_key)
273                            .set_data(msg.clone()),
274                    )
275                    .await;
276                assert_eq!(got?, msg);
277            )+
278        };
279    }
280
281    macro_rules! assert_publishing_is_paused {
282        ($publisher:ident, $($ordering_key:expr),+) => {
283            $(
284                let got_err = $publisher
285                    .publish(
286                        Message::new()
287                            .set_ordering_key($ordering_key)
288                            .set_data(generate_random_data()),
289                    )
290                    .await;
291                assert!(
292                    matches!(got_err, Err(crate::error::PublishError::OrderingKeyPaused)),
293                    "{got_err:?}"
294                );
295            )+
296        };
297    }
298
299    #[tokio_test_no_panics]
300    async fn publisher_publish_successfully() -> anyhow::Result<()> {
301        let mut mock = MockGapicPublisher::new();
302        mock.expect_publish()
303            .times(2)
304            .withf(|req, _o| req.topic == TOPIC)
305            .returning(publish_ok);
306
307        let client = GapicPublisher::from_stub(mock);
308        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
309            .set_message_count_threshold(1_u32)
310            .build();
311
312        let messages = [
313            Message::new().set_data("hello"),
314            Message::new().set_data("world"),
315        ];
316        let mut handles = Vec::new();
317        for msg in messages {
318            let handle = publisher.publish(msg.clone());
319            handles.push((msg, handle));
320        }
321
322        for (id, rx) in handles.into_iter() {
323            let got = rx.await?;
324            let id = String::from_utf8(id.data.to_vec())?;
325            assert_eq!(got, id);
326        }
327
328        Ok(())
329    }
330
331    #[tokio::test]
332    async fn publisher_publish_large_message() -> anyhow::Result<()> {
333        let mut mock = MockGapicPublisher::new();
334        mock.expect_publish()
335            .withf(|req, _o| req.topic == TOPIC)
336            .returning(publish_ok);
337
338        let client = GapicPublisher::from_stub(mock);
339        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
340            .set_byte_threshold(1_u32)
341            .build();
342        assert_publishing_is_ok!(publisher, "");
343        assert_publishing_is_ok!(publisher, "key");
344
345        Ok(())
346    }
347
348    #[tokio::test(start_paused = true)]
349    async fn worker_handles_forced_shutdown_gracefully() -> anyhow::Result<()> {
350        let mock = MockGapicPublisher::new();
351
352        let client = GapicPublisher::from_stub(mock);
353        let (publisher, background_task_handle) =
354            PublisherPartialBuilder::new(client, TOPIC.to_string())
355                .set_message_count_threshold(100_u32)
356                .build_return_handle();
357
358        let messages = [
359            Message::new().set_data("hello"),
360            Message::new().set_data("world"),
361        ];
362        let mut handles = Vec::new();
363        for msg in messages {
364            let handle = publisher.publish(msg);
365            handles.push(handle);
366        }
367
368        background_task_handle.abort();
369
370        for rx in handles.into_iter() {
371            rx.await
372                .expect_err("expected error when background task canceled");
373        }
374
375        Ok(())
376    }
377
378    #[tokio_test_no_panics(start_paused = true)]
379    async fn dropping_publisher_flushes_pending_messages() -> anyhow::Result<()> {
380        // If we hold on to the handles returned from the publisher, it should
381        // be safe to drop the publisher and .await on the handles.
382        let mut mock = MockGapicPublisher::new();
383        mock.expect_publish()
384            .withf(|req, _o| req.topic == TOPIC)
385            .times(2)
386            .returning(publish_ok);
387
388        let client = GapicPublisher::from_stub(mock);
389        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
390            .set_message_count_threshold(1000_u32)
391            .set_delay_threshold(Duration::from_secs(60))
392            .build();
393
394        let start = tokio::time::Instant::now();
395        let messages = [
396            Message::new().set_data("hello"),
397            Message::new().set_data("world"),
398            Message::new().set_data("hello").set_ordering_key("key"),
399            Message::new().set_data("world").set_ordering_key("key"),
400        ];
401        let mut handles = Vec::new();
402        for msg in messages {
403            let handle = publisher.publish(msg.clone());
404            handles.push((msg, handle));
405        }
406        drop(publisher); // This should trigger the publisher to send all pending messages.
407
408        for (id, rx) in handles.into_iter() {
409            let got = rx.await?;
410            let id = String::from_utf8(id.data.to_vec())?;
411            assert_eq!(got, id);
412            assert_eq!(start.elapsed(), Duration::ZERO);
413        }
414
415        Ok(())
416    }
417
418    #[tokio_test_no_panics]
419    async fn publisher_handles_publish_errors() -> anyhow::Result<()> {
420        let mut mock = MockGapicPublisher::new();
421        mock.expect_publish()
422            .times(2)
423            .withf(|req, _o| req.topic == TOPIC)
424            .returning(publish_err);
425
426        let client = GapicPublisher::from_stub(mock);
427        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
428            .set_message_count_threshold(1_u32)
429            .build();
430
431        let messages = [
432            Message::new().set_data("hello"),
433            Message::new().set_data("world"),
434        ];
435
436        let mut handles = Vec::new();
437        for msg in messages {
438            let handle = publisher.publish(msg.clone());
439            handles.push(handle);
440        }
441
442        for rx in handles.into_iter() {
443            let got = rx.await;
444            assert!(got.is_err(), "{got:?}");
445        }
446
447        Ok(())
448    }
449
450    #[tokio_test_no_panics(start_paused = true)]
451    async fn flush_sends_pending_messages_immediately() -> anyhow::Result<()> {
452        let mut mock = MockGapicPublisher::new();
453        mock.expect_publish()
454            .withf(|req, _o| req.topic == TOPIC)
455            .returning(publish_ok);
456
457        let client = GapicPublisher::from_stub(mock);
458        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
459            // Set a long delay.
460            .set_message_count_threshold(1000_u32)
461            .set_delay_threshold(Duration::from_secs(60))
462            .build();
463
464        let start = tokio::time::Instant::now();
465        let messages = [
466            Message::new().set_data("hello"),
467            Message::new().set_data("world"),
468        ];
469        let mut handles = Vec::new();
470        for msg in messages {
471            let handle = publisher.publish(msg.clone());
472            handles.push((msg, handle));
473        }
474
475        publisher.flush().await;
476        assert_eq!(start.elapsed(), Duration::ZERO);
477
478        let post = publisher.publish(Message::new().set_data("after"));
479        for (id, rx) in handles.into_iter() {
480            let got = rx.await?;
481            let id = String::from_utf8(id.data.to_vec())?;
482            assert_eq!(got, id);
483            assert_eq!(start.elapsed(), Duration::ZERO);
484        }
485
486        // Validate that the post message is only sent after the next timeout.
487        // I.e., the Publisher does not continuously flush new messages.
488        let got = post.await?;
489        assert_eq!(got, "after");
490        assert_eq!(start.elapsed(), Duration::from_secs(60));
491
492        Ok(())
493    }
494
495    #[tokio_test_no_panics(start_paused = true)]
496    // Users should be able to drop handles and the messages will still send.
497    async fn dropping_handles_does_not_prevent_publishing() -> anyhow::Result<()> {
498        let mut mock = MockGapicPublisher::new();
499        mock.expect_publish()
500            .withf(|r, _| {
501                r.messages.len() == 2
502                    && r.messages[0].data == "hello"
503                    && r.messages[1].data == "world"
504            })
505            .return_once(publish_ok);
506
507        let client = GapicPublisher::from_stub(mock);
508        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
509            // Set a long delay.
510            .set_message_count_threshold(1000_u32)
511            .set_delay_threshold(Duration::from_secs(60))
512            .build();
513
514        let start = tokio::time::Instant::now();
515        let messages = [
516            Message::new().set_data("hello"),
517            Message::new().set_data("world"),
518        ];
519        for msg in messages {
520            let handle = publisher.publish(msg.clone());
521            drop(handle);
522        }
523
524        publisher.flush().await;
525        assert_eq!(start.elapsed(), Duration::ZERO);
526
527        Ok(())
528    }
529
530    #[tokio::test(start_paused = true)]
531    async fn flush_with_no_messages_is_noop() -> anyhow::Result<()> {
532        let mock = MockGapicPublisher::new();
533
534        let client = GapicPublisher::from_stub(mock);
535        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
536
537        let start = tokio::time::Instant::now();
538        publisher.flush().await;
539        assert_eq!(start.elapsed(), Duration::ZERO);
540
541        Ok(())
542    }
543
544    #[tokio_test_no_panics]
545    async fn batch_sends_on_message_count_threshold_success() -> anyhow::Result<()> {
546        // Make sure all messages in a batch receive the correct message ID.
547        let mut mock = MockGapicPublisher::new();
548        mock.expect_publish()
549            .withf(|r, _| r.messages.len() == 2)
550            .return_once(publish_ok);
551
552        let client = GapicPublisher::from_stub(mock);
553        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
554            .set_message_count_threshold(2_u32)
555            .set_byte_threshold(MAX_BYTES)
556            .set_delay_threshold(std::time::Duration::MAX)
557            .build();
558
559        let messages = [
560            Message::new().set_data("hello"),
561            Message::new().set_data("world"),
562        ];
563        let mut handles = Vec::new();
564        for msg in messages {
565            let handle = publisher.publish(msg.clone());
566            handles.push((msg, handle));
567        }
568
569        for (id, rx) in handles.into_iter() {
570            let got = rx.await?;
571            let id = String::from_utf8(id.data.to_vec())?;
572            assert_eq!(got, id);
573        }
574
575        Ok(())
576    }
577
578    #[tokio_test_no_panics]
579    async fn batch_sends_on_message_count_threshold_error() -> anyhow::Result<()> {
580        // Make sure all messages in a batch receive an error.
581        let mut mock = MockGapicPublisher::new();
582        mock.expect_publish()
583            .withf(|r, _| r.messages.len() == 2)
584            .return_once(publish_err);
585
586        let client = GapicPublisher::from_stub(mock);
587        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
588            .set_message_count_threshold(2_u32)
589            .set_byte_threshold(MAX_BYTES)
590            .set_delay_threshold(std::time::Duration::MAX)
591            .build();
592
593        let messages = [
594            Message::new().set_data("hello"),
595            Message::new().set_data("world"),
596        ];
597        let mut handles = Vec::new();
598        for msg in messages {
599            let handle = publisher.publish(msg.clone());
600            handles.push(handle);
601        }
602
603        for rx in handles.into_iter() {
604            let got = rx.await;
605            assert!(got.is_err(), "{got:?}");
606        }
607
608        Ok(())
609    }
610
611    #[tokio_test_no_panics(start_paused = true)]
612    async fn batch_sends_on_byte_threshold() -> anyhow::Result<()> {
613        // Make sure all messages in a batch receive the correct message ID.
614        let mut mock = MockGapicPublisher::new();
615        mock.expect_publish()
616            .withf(|r, _| r.messages.len() == 1)
617            .times(2)
618            .returning(publish_ok);
619
620        let client = GapicPublisher::from_stub(mock);
621        // Ensure that the first message does not pass the threshold.
622        let byte_threshold = TOPIC.len() + "hello".len() + "key".len() + 1;
623        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
624            .set_message_count_threshold(MAX_MESSAGES)
625            .set_byte_threshold(byte_threshold as u32)
626            .set_delay_threshold(std::time::Duration::MAX)
627            .build();
628
629        // Validate without ordering key.
630        let handle = publisher.publish(Message::new().set_data("hello"));
631        // Publish a second message to trigger send on threshold.
632        let _handle = publisher.publish(Message::new().set_data("world"));
633        assert_eq!(handle.await?, "hello");
634
635        // Validate with ordering key.
636        let handle = publisher.publish(Message::new().set_data("hello").set_ordering_key("key"));
637        // Publish a second message to trigger send on threshold.
638        let _handle = publisher.publish(Message::new().set_data("world").set_ordering_key("key"));
639        assert_eq!(handle.await?, "hello");
640
641        Ok(())
642    }
643
644    #[tokio_test_no_panics(start_paused = true)]
645    async fn batch_sends_on_delay_threshold() -> anyhow::Result<()> {
646        let mut mock = MockGapicPublisher::new();
647        mock.expect_publish()
648            .withf(|req, _| req.topic == TOPIC)
649            .returning(publish_ok);
650
651        let client = GapicPublisher::from_stub(mock);
652        let delay = std::time::Duration::from_millis(10);
653        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
654            .set_message_count_threshold(u32::MAX)
655            .set_byte_threshold(MAX_BYTES)
656            .set_delay_threshold(delay)
657            .build();
658
659        // Test that messages send after delay.
660        for _ in 0..3 {
661            let start = tokio::time::Instant::now();
662            let messages = [
663                Message::new().set_data("hello 0"),
664                Message::new().set_data("hello 1"),
665                Message::new()
666                    .set_data("hello 2")
667                    .set_ordering_key("ordering key 1"),
668                Message::new()
669                    .set_data("hello 3")
670                    .set_ordering_key("ordering key 2"),
671            ];
672            let mut handles = Vec::new();
673            for msg in messages {
674                let handle = publisher.publish(msg.clone());
675                handles.push((msg, handle));
676            }
677
678            for (id, rx) in handles.into_iter() {
679                let got = rx.await?;
680                let id = String::from_utf8(id.data.to_vec())?;
681                assert_eq!(got, id);
682                assert_eq!(
683                    start.elapsed(),
684                    delay,
685                    "batch of messages should have sent after {:?}",
686                    delay
687                )
688            }
689        }
690
691        Ok(())
692    }
693
694    #[tokio::test(start_paused = true)]
695    #[allow(clippy::get_first)]
696    async fn batching_separates_by_ordering_key() -> anyhow::Result<()> {
697        // Publish messages with different ordering key and validate that they are in different batches.
698        let mut mock = MockGapicPublisher::new();
699        mock.expect_publish()
700            .withf(|r, _| {
701                r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
702            })
703            .returning(publish_ok);
704
705        let client = GapicPublisher::from_stub(mock);
706        // Use a low message count to trigger batch sends.
707        let message_count_threshold = 2_u32;
708        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
709            .set_message_count_threshold(message_count_threshold)
710            .set_byte_threshold(MAX_BYTES)
711            .set_delay_threshold(std::time::Duration::MAX)
712            .build();
713
714        let num_ordering_keys = 3;
715        let mut messages = Vec::new();
716        // We want the number of messages to be a multiple of num_ordering_keys
717        // and message_count_threshold. Otherwise, the final batch of each
718        // ordering key may fail the message len assertion.
719        for i in 0..(2 * message_count_threshold * num_ordering_keys) {
720            messages.push(
721                Message::new()
722                    .set_data(format!("test message {}", i))
723                    .set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
724            );
725        }
726        let mut handles = Vec::new();
727        for msg in messages {
728            let handle = publisher.publish(msg.clone());
729            handles.push((msg, handle));
730        }
731
732        for (id, rx) in handles.into_iter() {
733            let got = rx.await?;
734            let id = String::from_utf8(id.data.to_vec())?;
735            assert_eq!(got, id);
736        }
737
738        Ok(())
739    }
740
741    #[tokio_test_no_panics(start_paused = true)]
742    #[allow(clippy::get_first)]
743    async fn batching_handles_empty_ordering_key() -> anyhow::Result<()> {
744        // Publish messages with different ordering key and validate that they are in different batches.
745        let mut mock = MockGapicPublisher::new();
746        mock.expect_publish()
747            .withf(|r, _| {
748                r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
749            })
750            .returning(publish_ok);
751
752        let client = GapicPublisher::from_stub(mock);
753        // Use a low message count to trigger batch sends.
754        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
755            .set_message_count_threshold(2_u32)
756            .set_byte_threshold(MAX_BYTES)
757            .set_delay_threshold(std::time::Duration::MAX)
758            .build();
759
760        let messages = [
761            Message::new().set_data("hello 1"),
762            Message::new().set_data("hello 2").set_ordering_key(""),
763            Message::new()
764                .set_data("hello 3")
765                .set_ordering_key("ordering key :1"),
766            Message::new()
767                .set_data("hello 4")
768                .set_ordering_key("ordering key :1"),
769        ];
770
771        let mut handles = Vec::new();
772        for msg in messages {
773            let handle = publisher.publish(msg.clone());
774            handles.push((msg, handle));
775        }
776
777        for (id, rx) in handles.into_iter() {
778            let got = rx.await?;
779            let id = String::from_utf8(id.data.to_vec())?;
780            assert_eq!(got, id);
781        }
782
783        Ok(())
784    }
785
786    #[tokio_test_no_panics(start_paused = true)]
787    #[allow(clippy::get_first)]
788    async fn ordering_key_limits_to_one_outstanding_batch() -> anyhow::Result<()> {
789        // Verify that Publisher must only have 1 outstanding batch inflight at a time.
790        // This is done by validating that the 2 expected publish calls are done in sequence
791        // with a sleep delay in the first Publish reply.
792        let mut seq = Sequence::new();
793        let mut mock = MockGapicPublisherWithFuture::new();
794        mock.expect_publish()
795            .times(1)
796            .in_sequence(&mut seq)
797            .withf(|r, _| r.messages.len() == 1)
798            .returning({
799                |r, o| {
800                    Box::pin(async move {
801                        tokio::time::sleep(Duration::from_millis(10)).await;
802                        publish_ok(r, o)
803                    })
804                }
805            });
806
807        mock.expect_publish()
808            .times(1)
809            .in_sequence(&mut seq)
810            .withf(|r, _| r.messages.len() == 1)
811            .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
812
813        let client = GapicPublisher::from_stub(mock);
814        // Use a low message count to trigger batch sends.
815        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
816            .set_message_count_threshold(1_u32)
817            .set_byte_threshold(MAX_BYTES)
818            .set_delay_threshold(std::time::Duration::MAX)
819            .build();
820
821        let messages = [
822            Message::new()
823                .set_data("hello 1")
824                .set_ordering_key("ordering key"),
825            Message::new()
826                .set_data("hello 2")
827                .set_ordering_key("ordering key"),
828        ];
829
830        let start = tokio::time::Instant::now();
831        let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
832        let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
833        assert_eq!(msg2_handle.await?, "hello 2");
834        assert_eq!(
835            start.elapsed(),
836            Duration::from_millis(10),
837            "the second batch of messages should have sent after the first which is has been delayed by {:?}",
838            Duration::from_millis(10)
839        );
840        // Also validate the content of the first publish.
841        assert_eq!(msg1_handle.await?, "hello 1");
842
843        Ok(())
844    }
845
846    #[tokio_test_no_panics(start_paused = true)]
847    #[allow(clippy::get_first)]
848    async fn empty_ordering_key_allows_concurrent_batches() -> anyhow::Result<()> {
849        // Verify that for empty ordering key, the Publisher will send multiple batches without
850        // awaiting for the results.
851        // This is done by adding a delay in the first Publish reply and validating that
852        // the second batch does not await for the first batch.
853        let mut seq = Sequence::new();
854        let mut mock = MockGapicPublisherWithFuture::new();
855        mock.expect_publish()
856            .times(1)
857            .in_sequence(&mut seq)
858            .withf(|r, _| r.messages.len() == 1)
859            .returning(|r, o| {
860                Box::pin(async move {
861                    tokio::time::sleep(Duration::from_millis(10)).await;
862                    publish_ok(r, o)
863                })
864            });
865
866        mock.expect_publish()
867            .times(1)
868            .in_sequence(&mut seq)
869            .withf(|r, _| r.topic == TOPIC && r.messages.len() == 1)
870            .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
871
872        let client = GapicPublisher::from_stub(mock);
873        // Use a low message count to trigger batch sends.
874        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
875            .set_message_count_threshold(1_u32)
876            .set_byte_threshold(MAX_BYTES)
877            .set_delay_threshold(std::time::Duration::MAX)
878            .build();
879
880        let messages = [
881            Message::new().set_data("hello 1").set_ordering_key(""),
882            Message::new().set_data("hello 2").set_ordering_key(""),
883        ];
884
885        let start = tokio::time::Instant::now();
886        let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
887        let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
888        assert_eq!(msg2_handle.await?, "hello 2");
889        assert_eq!(
890            start.elapsed(),
891            Duration::from_millis(0),
892            "the second batch of messages should have sent without any delay"
893        );
894        // Also validate the content of the first publish.
895        assert_eq!(msg1_handle.await?, "hello 1");
896
897        Ok(())
898    }
899
900    #[tokio_test_no_panics(start_paused = true)]
901    async fn ordering_key_error_pauses_publisher() -> anyhow::Result<()> {
902        // Verify that a Publish send error will pause the publisher for an ordering key.
903        let mut seq = Sequence::new();
904        let mut mock = MockGapicPublisher::new();
905        mock.expect_publish()
906            .withf(|req, _o| req.topic == TOPIC)
907            .times(1)
908            .in_sequence(&mut seq)
909            .returning(publish_err);
910
911        mock.expect_publish()
912            .withf(|req, _o| req.topic == TOPIC)
913            .times(2)
914            .in_sequence(&mut seq)
915            .returning(publish_ok);
916
917        let client = GapicPublisher::from_stub(mock);
918        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
919            .set_message_count_threshold(1_u32)
920            .build();
921
922        let key = "ordering_key";
923        let msg_0_handle =
924            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
925        // Publish an additional message so that there are pending messages.
926        let msg_1_handle =
927            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
928
929        // Assert the error is caused by the Publish send operation.
930        let mut got_err = msg_0_handle.await.unwrap_err();
931        assert_publish_err(got_err);
932
933        // Assert that the pending message error is caused by the Publisher being paused.
934        got_err = msg_1_handle.await.unwrap_err();
935        assert!(
936            matches!(got_err, crate::error::PublishError::OrderingKeyPaused),
937            "{got_err:?}"
938        );
939
940        // Assert that new publish messages return errors because the Publisher is paused.
941        for _ in 0..3 {
942            assert_publishing_is_paused!(publisher, key);
943        }
944
945        // Verify that the other ordering keys are not paused.
946        assert_publishing_is_ok!(publisher, "", "without_error");
947
948        Ok(())
949    }
950
951    #[tokio_test_no_panics(start_paused = true)]
952    async fn batch_error_pauses_ordering_key() -> anyhow::Result<()> {
953        // Verify that all messages in the same batch receives the Send error for that batch.
954        let mut mock = MockGapicPublisher::new();
955        mock.expect_publish()
956            .times(1)
957            .withf(|r, _| r.topic == TOPIC && r.messages.len() == 2)
958            .returning(publish_err);
959
960        let client = GapicPublisher::from_stub(mock);
961        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
962            .set_message_count_threshold(2_u32)
963            .build();
964
965        let key = "ordering_key";
966        // Publish 2 messages so they are in the same batch.
967        let msg_0_handle =
968            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
969        let msg_1_handle =
970            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
971
972        // Validate that they both receives the Send error.
973        let mut got_err = msg_0_handle.await.unwrap_err();
974        assert_publish_err(got_err);
975        got_err = msg_1_handle.await.unwrap_err();
976        assert_publish_err(got_err);
977
978        // Assert that new publish messages returns an error because the Publisher is paused.
979        assert_publishing_is_paused!(publisher, key);
980
981        Ok(())
982    }
983
984    #[tokio_test_no_panics(start_paused = true)]
985    async fn flush_on_paused_ordering_key_returns_error() -> anyhow::Result<()> {
986        // Verify that Flush on a paused ordering key returns an error.
987        let mut seq = Sequence::new();
988        let mut mock = MockGapicPublisher::new();
989        mock.expect_publish()
990            .withf(|req, _o| req.topic == TOPIC)
991            .times(1)
992            .in_sequence(&mut seq)
993            .returning(publish_err);
994
995        mock.expect_publish()
996            .withf(|req, _o| req.topic == TOPIC)
997            .times(2)
998            .in_sequence(&mut seq)
999            .returning(publish_ok);
1000
1001        let client = GapicPublisher::from_stub(mock);
1002        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1003
1004        let key = "ordering_key";
1005        // Cause an ordering key to be paused.
1006        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1007        publisher.flush().await;
1008        // Assert the error is caused by the Publish send operation.
1009        let got_err = handle.await.unwrap_err();
1010        assert_publish_err(got_err);
1011
1012        // Validate that new Publish on the paused ordering key will result in an error.
1013        assert_publishing_is_paused!(publisher, key);
1014
1015        // Verify that the other ordering keys are not paused.
1016        assert_publishing_is_ok!(publisher, "", "without_error");
1017
1018        Ok(())
1019    }
1020
1021    #[tokio_test_no_panics(start_paused = true)]
1022    async fn resuming_non_paused_ordering_key_is_noop() -> anyhow::Result<()> {
1023        let mut mock = MockGapicPublisher::new();
1024        mock.expect_publish()
1025            .withf(|req, _o| req.topic == TOPIC)
1026            .times(4)
1027            .returning(publish_ok);
1028
1029        let client = GapicPublisher::from_stub(mock);
1030        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1031
1032        // Test resume and publish for empty ordering key.
1033        publisher.resume_publish("");
1034        assert_publishing_is_ok!(publisher, "");
1035
1036        // Test resume and publish after the BatchActor has been created for the empty ordering key.
1037        publisher.resume_publish("");
1038        assert_publishing_is_ok!(publisher, "");
1039
1040        // Test resume and publish before the BatchActor has been created.
1041        let key = "without_error";
1042        publisher.resume_publish(key);
1043        assert_publishing_is_ok!(publisher, key);
1044
1045        // Test resume and publish after the BatchActor has been created.
1046        publisher.resume_publish(key);
1047        assert_publishing_is_ok!(publisher, key);
1048
1049        Ok(())
1050    }
1051
1052    #[tokio_test_no_panics(start_paused = true)]
1053    async fn resuming_paused_ordering_key_allows_publishing() -> anyhow::Result<()> {
1054        let mut seq = Sequence::new();
1055        let mut mock = MockGapicPublisher::new();
1056        mock.expect_publish()
1057            .withf(|req, _o| req.topic == TOPIC)
1058            .times(1)
1059            .in_sequence(&mut seq)
1060            .returning(publish_err);
1061
1062        mock.expect_publish()
1063            .withf(|req, _o| req.topic == TOPIC)
1064            .times(3)
1065            .in_sequence(&mut seq)
1066            .returning(publish_ok);
1067
1068        let client = GapicPublisher::from_stub(mock);
1069        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1070
1071        let key = "ordering_key";
1072        // Cause an ordering key to be paused.
1073        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1074        // Assert the error is caused by the Publish send operation.
1075        let got_err = handle.await.unwrap_err();
1076        assert_publish_err(got_err);
1077
1078        // Validate that new Publish on the paused ordering key will result in an error.
1079        assert_publishing_is_paused!(publisher, key);
1080
1081        // Resume and validate the key is no longer paused.
1082        publisher.resume_publish(key);
1083        assert_publishing_is_ok!(publisher, key);
1084
1085        // Verify that the other ordering keys continue to work as expected.
1086        assert_publishing_is_ok!(publisher, "", "without_error");
1087
1088        Ok(())
1089    }
1090
1091    #[tokio_test_no_panics(start_paused = true)]
1092    async fn resuming_ordering_key_twice_is_safe() -> anyhow::Result<()> {
1093        // Validate that resuming twice sequentially does not have bad side effects.
1094        let mut seq = Sequence::new();
1095        let mut mock = MockGapicPublisher::new();
1096        mock.expect_publish()
1097            .withf(|req, _o| req.topic == TOPIC)
1098            .in_sequence(&mut seq)
1099            .times(1)
1100            .returning(publish_err);
1101
1102        mock.expect_publish()
1103            .withf(|req, _o| req.topic == TOPIC)
1104            .in_sequence(&mut seq)
1105            .return_once(publish_ok);
1106
1107        let client = GapicPublisher::from_stub(mock);
1108        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1109
1110        let key = "ordering_key";
1111        // Cause an ordering key to be paused.
1112        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1113        publisher.flush().await;
1114        // Assert the error is caused by the Publish send operation.
1115        let got_err = handle.await.unwrap_err();
1116        assert_publish_err(got_err);
1117
1118        // Validate that new Publish on the paused ordering key will result in an error.
1119        assert_publishing_is_paused!(publisher, key);
1120
1121        // Resume twice on the paused ordering key.
1122        publisher.resume_publish(key);
1123        publisher.resume_publish(key);
1124        assert_publishing_is_ok!(publisher, key);
1125
1126        Ok(())
1127    }
1128
1129    #[tokio_test_no_panics(start_paused = true)]
1130    async fn resuming_one_ordering_key_does_not_resume_others() -> anyhow::Result<()> {
1131        // Validate that resume_publish only resumes the paused ordering key .
1132        let mut seq = Sequence::new();
1133        let mut mock = MockGapicPublisher::new();
1134        mock.expect_publish()
1135            .withf(|req, _o| req.topic == TOPIC)
1136            .times(2)
1137            .in_sequence(&mut seq)
1138            .returning(publish_err);
1139
1140        mock.expect_publish()
1141            .withf(|req, _o| req.topic == TOPIC)
1142            .times(1)
1143            .in_sequence(&mut seq)
1144            .returning(publish_ok);
1145
1146        let client = GapicPublisher::from_stub(mock);
1147        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1148
1149        let key_0 = "ordering_key_0";
1150        let key_1 = "ordering_key_1";
1151        // Cause both ordering keys to pause.
1152        let handle_0 = publisher.publish(Message::new().set_ordering_key(key_0).set_data("msg 0"));
1153        let handle_1 = publisher.publish(Message::new().set_ordering_key(key_1).set_data("msg 1"));
1154        publisher.flush().await;
1155        let mut got_err = handle_0.await.unwrap_err();
1156        assert_publish_err(got_err);
1157        got_err = handle_1.await.unwrap_err();
1158        assert_publish_err(got_err);
1159
1160        // Assert that both ordering keys are paused.
1161        assert_publishing_is_paused!(publisher, key_0, key_1);
1162
1163        // Resume on one of the ordering key.
1164        publisher.resume_publish(key_0);
1165
1166        // Validate that only the correct ordering key is resumed.
1167        assert_publishing_is_ok!(publisher, key_0);
1168
1169        // Validate the other ordering key is still paused.
1170        assert_publishing_is_paused!(publisher, key_1);
1171
1172        Ok(())
1173    }
1174
1175    #[tokio::test]
1176    async fn publisher_builder_clamps_batching_options() -> anyhow::Result<()> {
1177        // Test values that are too high and should be clamped.
1178        let oversized_options = BatchingOptions::new()
1179            .set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
1180            .set_message_count_threshold(MAX_MESSAGES + 1)
1181            .set_byte_threshold(MAX_BYTES + 1);
1182
1183        let publishers = vec![
1184            BasePublisher::builder()
1185                .build()
1186                .await?
1187                .publisher("projects/my-project/topics/my-topic")
1188                .set_delay_threshold(oversized_options.delay_threshold)
1189                .set_message_count_threshold(oversized_options.message_count_threshold)
1190                .set_byte_threshold(oversized_options.byte_threshold)
1191                .build(),
1192            Publisher::builder("projects/my-project/topics/my-topic".to_string())
1193                .set_delay_threshold(oversized_options.delay_threshold)
1194                .set_message_count_threshold(oversized_options.message_count_threshold)
1195                .set_byte_threshold(oversized_options.byte_threshold)
1196                .build()
1197                .await?,
1198        ];
1199
1200        for publisher in publishers {
1201            let got = publisher.batching_options;
1202            assert_eq!(got.delay_threshold, MAX_DELAY);
1203            assert_eq!(got.message_count_threshold, MAX_MESSAGES);
1204            assert_eq!(got.byte_threshold, MAX_BYTES);
1205        }
1206
1207        // Test values that are within limits and should not be changed.
1208        let normal_options = BatchingOptions::new()
1209            .set_delay_threshold(Duration::from_secs(10))
1210            .set_message_count_threshold(10_u32)
1211            .set_byte_threshold(100_u32);
1212
1213        let publishers = vec![
1214            BasePublisher::builder()
1215                .build()
1216                .await?
1217                .publisher("projects/my-project/topics/my-topic")
1218                .set_delay_threshold(normal_options.delay_threshold)
1219                .set_message_count_threshold(normal_options.message_count_threshold)
1220                .set_byte_threshold(normal_options.byte_threshold)
1221                .build(),
1222            Publisher::builder("projects/my-project/topics/my-topic".to_string())
1223                .set_delay_threshold(normal_options.delay_threshold)
1224                .set_message_count_threshold(normal_options.message_count_threshold)
1225                .set_byte_threshold(normal_options.byte_threshold)
1226                .build()
1227                .await?,
1228        ];
1229
1230        for publisher in publishers {
1231            let got = publisher.batching_options;
1232
1233            assert_eq!(got.delay_threshold, normal_options.delay_threshold);
1234            assert_eq!(
1235                got.message_count_threshold,
1236                normal_options.message_count_threshold
1237            );
1238            assert_eq!(got.byte_threshold, normal_options.byte_threshold);
1239        }
1240        Ok(())
1241    }
1242}