Skip to main content

google_cloud_pubsub/publisher/
client.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::options::BatchingOptions;
16use crate::publisher::actor::BundledMessage;
17use crate::publisher::actor::ToDispatcher;
18use crate::publisher::builder::PublisherBuilder;
19
20use tokio::sync::mpsc::UnboundedSender;
21use tokio::sync::oneshot;
22
23pub use super::base_publisher::BasePublisher;
24
25/// A Publisher client for the [Cloud Pub/Sub] API.
26///
27/// A `Publisher` sends messages to a specific topic. It manages message batching
28/// and sending in a background task.
29///
30/// ```
31/// # async fn sample() -> anyhow::Result<()> {
32/// # use google_cloud_pubsub::*;
33/// # use google_cloud_pubsub::client::Publisher;
34/// # use model::Message;
35/// let publisher = Publisher::builder("projects/my-project/topics/my-topic").build().await?;
36/// let message_id_future = publisher.publish(Message::new().set_data("Hello, World"));
37/// # Ok(()) }
38/// ```
39///
40/// # Configuration
41///
42/// To configure `Publisher` use the `with_*` methods in the type returned
43/// by [builder()][Publisher::builder]. The default configuration should
44/// work for most applications. Common configuration changes include
45///
46/// * [with_endpoint()]: by default this client uses the global default endpoint
47///   (`https://pubsub.googleapis.com`). Applications using regional
48///   endpoints or running in restricted networks (e.g. a network configured
49//    with [Private Google Access with VPC Service Controls]) may want to
50///   override this default.
51/// * [with_credentials()]: by default this client uses
52///   [Application Default Credentials]. Applications using custom
53///   authentication may need to override this default.
54///
55/// # Pooling and Cloning
56///
57/// `Publisher` holds a connection pool internally, it is advised to
58/// create one and then reuse it. You do not need to wrap `Publisher` in
59/// an [Rc](std::rc::Rc) or [Arc](std::sync::Arc) to reuse it.
60///
61/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
62/// [with_endpoint()]: crate::builder::publisher::PublisherBuilder::with_endpoint
63/// [with_credentials()]: crate::builder::publisher::PublisherBuilder::with_credentials
64/// [Private Google Access with VPC Service Controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
65/// [Application Default Credentials]: https://cloud.google.com/docs/authentication#adc
66#[derive(Debug, Clone)]
67pub struct Publisher {
68    // A copy of the batching options are stored in the Publisher for testing
69    // purposes and also to include in the Debug output.
70    #[cfg_attr(not(test), expect(dead_code))]
71    pub(crate) batching_options: BatchingOptions,
72    pub(crate) tx: UnboundedSender<ToDispatcher>,
73}
74
75impl Publisher {
76    /// Returns a builder for [Publisher].
77    ///
78    /// # Example
79    ///
80    /// ```
81    /// # async fn sample() -> anyhow::Result<()> {
82    /// # use google_cloud_pubsub::*;
83    /// # use google_cloud_pubsub::client::Publisher;
84    /// let publisher = Publisher::builder("projects/my-project/topics/topic").build().await?;
85    /// # Ok(()) }
86    /// ```
87    pub fn builder<T: Into<String>>(topic: T) -> PublisherBuilder {
88        PublisherBuilder::new(topic.into())
89    }
90
91    /// Publishes a message to the topic.
92    ///
93    /// When this method encounters a non-recoverable error publishing for an ordering key,
94    /// it will pause publishing on all new messages on that ordering key. Any outstanding
95    /// messages that have not yet been published may return an error.
96    ///
97    /// ```
98    /// # use google_cloud_pubsub::client::Publisher;
99    /// # async fn sample(publisher: Publisher) -> anyhow::Result<()> {
100    /// # use google_cloud_pubsub::model::Message;
101    /// let message_id = publisher.publish(Message::new().set_data("Hello, World")).await?;
102    /// # Ok(()) }
103    /// ```
104    #[must_use = "ignoring the publish result may lead to undetected delivery failures"]
105    pub fn publish(&self, msg: crate::model::Message) -> crate::publisher::PublishFuture {
106        let (tx, rx) = tokio::sync::oneshot::channel();
107
108        // If this fails, the Dispatcher is gone, which indicates it has been dropped,
109        // possibly due to the background task being stopped by the runtime.
110        // The PublishFuture will automatically receive an error when `tx` is dropped.
111        if self
112            .tx
113            .send(ToDispatcher::Publish(BundledMessage { msg, tx }))
114            .is_err()
115        {
116            // `tx` is dropped here if the send errors.
117        }
118        crate::publisher::PublishFuture { rx }
119    }
120
121    /// Flushes all buffered messages across all ordering keys, sending them immediately.
122    ///
123    /// ```
124    /// # use google_cloud_pubsub::model::Message;
125    /// # async fn sample(publisher: google_cloud_pubsub::client::Publisher) -> anyhow::Result<()> {
126    /// let _handle = publisher.publish(Message::new().set_data("event"));
127    /// // Ensures the message above is sent without needing to track its future.
128    /// publisher.flush().await;
129    /// # Ok(())
130    /// # }
131    /// ```
132    ///
133    /// This method bypasses configured batching delays and returns only after all
134    /// messages buffered at the time of the call have reached a terminal state
135    /// (success or permanent failure).
136    ///
137    /// ### Recommendations
138    ///
139    /// *   For most use cases, we recommend you `.await`
140    ///     the [`PublishFuture`][crate::publisher::PublishFuture] returned by
141    ///     [`publish`][Self::publish] to retrieve message IDs and handle
142    ///     specific errors.
143    /// *   Use `flush()` as a convenience during application shutdown to
144    ///     ensure the client attempts to send all outstanding data.
145    pub async fn flush(&self) {
146        let (tx, rx) = oneshot::channel();
147        if self.tx.send(ToDispatcher::Flush(tx)).is_ok() {
148            let _ = rx.await;
149        }
150    }
151
152    /// Resume accepting publish for a paused ordering key.
153    ///
154    /// Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.
155    /// If the ordering key is not currently paused, this function is a no-op.
156    ///
157    /// # Example
158    ///
159    /// ```
160    /// # use google_cloud_pubsub::model::Message;
161    /// # async fn sample(publisher: google_cloud_pubsub::client::Publisher) -> anyhow::Result<()> {
162    /// if let Err(_) = publisher.publish(Message::new().set_data("foo").set_ordering_key("bar")).await {
163    ///     // Error handling code can go here.
164    ///     publisher.resume_publish("bar");
165    /// }
166    /// # Ok(())
167    /// # }
168    /// ```
169    pub fn resume_publish<T: std::convert::Into<std::string::String>>(&self, ordering_key: T) {
170        let _ = self
171            .tx
172            .send(ToDispatcher::ResumePublish(ordering_key.into()));
173    }
174}
175
176#[cfg(test)]
177mod tests {
178    use super::*;
179    use crate::publisher::builder::PublisherPartialBuilder;
180    use crate::publisher::client::BasePublisher;
181    use crate::publisher::constants::*;
182    use crate::publisher::options::BatchingOptions;
183    use crate::{
184        generated::gapic_dataplane::client::Publisher as GapicPublisher,
185        model::{Message, PublishResponse},
186    };
187    use google_cloud_test_macros::tokio_test_no_panics;
188    use mockall::Sequence;
189    use rand::{RngExt, distr::Alphanumeric};
190    use std::error::Error;
191    use std::time::Duration;
192
193    static TOPIC: &str = "my-topic";
194
195    mockall::mock! {
196        #[derive(Debug)]
197        GapicPublisher {}
198        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
199            async fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> crate::Result<crate::Response<crate::model::PublishResponse>>;
200        }
201    }
202
203    // Similar to GapicPublisher but returns impl Future instead.
204    // This is useful for mocking a response with delays/timeouts.
205    // See https://github.com/asomers/mockall/issues/189 for more
206    // detail on why this is needed.
207    // While this can used inplace of GapicPublisher, it makes the
208    // normal usage without async closure much more cumbersome.
209    mockall::mock! {
210        #[derive(Debug)]
211        GapicPublisherWithFuture {}
212        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
213            fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> impl Future<Output=crate::Result<crate::Response<crate::model::PublishResponse>>> + Send;
214        }
215    }
216
217    fn publish_ok(
218        req: crate::model::PublishRequest,
219        _options: crate::RequestOptions,
220    ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
221        let ids = req
222            .messages
223            .iter()
224            .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
225        Ok(crate::Response::from(
226            PublishResponse::new().set_message_ids(ids),
227        ))
228    }
229
230    fn publish_err(
231        _req: crate::model::PublishRequest,
232        _options: crate::RequestOptions,
233    ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
234        Err(crate::Error::service(
235            google_cloud_gax::error::rpc::Status::default()
236                .set_code(google_cloud_gax::error::rpc::Code::Unknown)
237                .set_message("unknown error has occurred"),
238        ))
239    }
240
241    #[track_caller]
242    fn assert_publish_err(got_err: crate::error::PublishError) {
243        assert!(
244            matches!(got_err, crate::error::PublishError::Rpc(_)),
245            "{got_err:?}"
246        );
247        let source = got_err
248            .source()
249            .and_then(|e| e.downcast_ref::<std::sync::Arc<crate::Error>>())
250            .expect("send error should contain a source");
251        assert!(source.status().is_some(), "{got_err:?}");
252        assert_eq!(
253            source.status().unwrap().code,
254            google_cloud_gax::error::rpc::Code::Unknown,
255            "{got_err:?}"
256        );
257    }
258
259    fn generate_random_data() -> String {
260        rand::rng()
261            .sample_iter(&Alphanumeric)
262            .take(16)
263            .map(char::from)
264            .collect()
265    }
266
// For each ordering key, publishes a message with a random payload and asserts
// that the returned message ID equals that payload. Must be used inside an
// async function whose return type supports `?` on the publish error.
macro_rules! assert_publishing_is_ok {
    ($publisher:ident, $($ordering_key:expr),+) => {
        $(
            let payload = generate_random_data();
            let message = Message::new()
                .set_ordering_key($ordering_key)
                .set_data(payload.clone());
            let message_id = $publisher.publish(message).await?;
            assert_eq!(message_id, payload);
        )+
    };
}
282
// For each ordering key, publishes a message with a random payload and asserts
// that the publish fails with `PublishError::OrderingKeyPaused`.
macro_rules! assert_publishing_is_paused {
    ($publisher:ident, $($ordering_key:expr),+) => {
        $(
            let message = Message::new()
                .set_ordering_key($ordering_key)
                .set_data(generate_random_data());
            let outcome = $publisher.publish(message).await;
            assert!(
                matches!(outcome, Err(crate::error::PublishError::OrderingKeyPaused)),
                "{outcome:?}"
            );
        )+
    };
}
300
301    #[tokio_test_no_panics]
302    async fn publisher_publish_successfully() -> anyhow::Result<()> {
303        let mut mock = MockGapicPublisher::new();
304        mock.expect_publish()
305            .times(2)
306            .withf(|req, _o| req.topic == TOPIC)
307            .returning(publish_ok);
308
309        let client = GapicPublisher::from_stub(mock);
310        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
311            .set_message_count_threshold(1_u32)
312            .build();
313
314        let messages = [
315            Message::new().set_data("hello"),
316            Message::new().set_data("world"),
317        ];
318        let mut handles = Vec::new();
319        for msg in messages {
320            let handle = publisher.publish(msg.clone());
321            handles.push((msg, handle));
322        }
323
324        for (id, rx) in handles.into_iter() {
325            let got = rx.await?;
326            let id = String::from_utf8(id.data.to_vec())?;
327            assert_eq!(got, id);
328        }
329
330        Ok(())
331    }
332
333    #[tokio_test_no_panics]
334    async fn publisher_publish_successfully_with_arc() -> anyhow::Result<()> {
335        let mut mock = MockGapicPublisher::new();
336        mock.expect_publish()
337            .times(2)
338            .withf(|req, _o| req.topic == TOPIC)
339            .returning(publish_ok);
340
341        let mock_arc = std::sync::Arc::new(mock);
342        let client = GapicPublisher::from_stub::<MockGapicPublisher>(mock_arc);
343        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
344            .set_message_count_threshold(1_u32)
345            .build();
346
347        let messages = [
348            Message::new().set_data("hello"),
349            Message::new().set_data("world"),
350        ];
351        let mut handles = Vec::new();
352        for msg in messages {
353            let handle = publisher.publish(msg.clone());
354            handles.push((msg, handle));
355        }
356
357        for (id, rx) in handles.into_iter() {
358            let got = rx.await?;
359            let id = String::from_utf8(id.data.to_vec())?;
360            assert_eq!(got, id);
361        }
362
363        Ok(())
364    }
365
366    #[tokio::test]
367    async fn publisher_publish_large_message() -> anyhow::Result<()> {
368        let mut mock = MockGapicPublisher::new();
369        mock.expect_publish()
370            .withf(|req, _o| req.topic == TOPIC)
371            .returning(publish_ok);
372
373        let client = GapicPublisher::from_stub(mock);
374        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
375            .set_byte_threshold(1_u32)
376            .build();
377        assert_publishing_is_ok!(publisher, "");
378        assert_publishing_is_ok!(publisher, "key");
379
380        Ok(())
381    }
382
383    #[tokio::test(start_paused = true)]
384    async fn worker_handles_forced_shutdown_gracefully() -> anyhow::Result<()> {
385        let mock = MockGapicPublisher::new();
386
387        let client = GapicPublisher::from_stub(mock);
388        let (publisher, background_task_handle) =
389            PublisherPartialBuilder::new(client, TOPIC.to_string())
390                .set_message_count_threshold(100_u32)
391                .build_return_handle();
392
393        let messages = [
394            Message::new().set_data("hello"),
395            Message::new().set_data("world"),
396        ];
397        let mut handles = Vec::new();
398        for msg in messages {
399            let handle = publisher.publish(msg);
400            handles.push(handle);
401        }
402
403        background_task_handle.abort();
404
405        for rx in handles.into_iter() {
406            rx.await
407                .expect_err("expected error when background task canceled");
408        }
409
410        Ok(())
411    }
412
413    #[tokio_test_no_panics(start_paused = true)]
414    async fn dropping_publisher_flushes_pending_messages() -> anyhow::Result<()> {
415        // If we hold on to the handles returned from the publisher, it should
416        // be safe to drop the publisher and .await on the handles.
417        let mut mock = MockGapicPublisher::new();
418        mock.expect_publish()
419            .withf(|req, _o| req.topic == TOPIC)
420            .times(2)
421            .returning(publish_ok);
422
423        let client = GapicPublisher::from_stub(mock);
424        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
425            .set_message_count_threshold(1000_u32)
426            .set_delay_threshold(Duration::from_secs(60))
427            .build();
428
429        let start = tokio::time::Instant::now();
430        let messages = [
431            Message::new().set_data("hello"),
432            Message::new().set_data("world"),
433            Message::new().set_data("hello").set_ordering_key("key"),
434            Message::new().set_data("world").set_ordering_key("key"),
435        ];
436        let mut handles = Vec::new();
437        for msg in messages {
438            let handle = publisher.publish(msg.clone());
439            handles.push((msg, handle));
440        }
441        drop(publisher); // This should trigger the publisher to send all pending messages.
442
443        for (id, rx) in handles.into_iter() {
444            let got = rx.await?;
445            let id = String::from_utf8(id.data.to_vec())?;
446            assert_eq!(got, id);
447            assert_eq!(start.elapsed(), Duration::ZERO);
448        }
449
450        Ok(())
451    }
452
453    #[tokio_test_no_panics]
454    async fn publisher_handles_publish_errors() -> anyhow::Result<()> {
455        let mut mock = MockGapicPublisher::new();
456        mock.expect_publish()
457            .times(2)
458            .withf(|req, _o| req.topic == TOPIC)
459            .returning(publish_err);
460
461        let client = GapicPublisher::from_stub(mock);
462        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
463            .set_message_count_threshold(1_u32)
464            .build();
465
466        let messages = [
467            Message::new().set_data("hello"),
468            Message::new().set_data("world"),
469        ];
470
471        let mut handles = Vec::new();
472        for msg in messages {
473            let handle = publisher.publish(msg.clone());
474            handles.push(handle);
475        }
476
477        for rx in handles.into_iter() {
478            let got = rx.await;
479            assert!(got.is_err(), "{got:?}");
480        }
481
482        Ok(())
483    }
484
485    #[tokio_test_no_panics(start_paused = true)]
486    async fn flush_sends_pending_messages_immediately() -> anyhow::Result<()> {
487        let mut mock = MockGapicPublisher::new();
488        mock.expect_publish()
489            .withf(|req, _o| req.topic == TOPIC)
490            .returning(publish_ok);
491
492        let client = GapicPublisher::from_stub(mock);
493        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
494            // Set a long delay.
495            .set_message_count_threshold(1000_u32)
496            .set_delay_threshold(Duration::from_secs(60))
497            .build();
498
499        let start = tokio::time::Instant::now();
500        let messages = [
501            Message::new().set_data("hello"),
502            Message::new().set_data("world"),
503        ];
504        let mut handles = Vec::new();
505        for msg in messages {
506            let handle = publisher.publish(msg.clone());
507            handles.push((msg, handle));
508        }
509
510        publisher.flush().await;
511        assert_eq!(start.elapsed(), Duration::ZERO);
512
513        let post = publisher.publish(Message::new().set_data("after"));
514        for (id, rx) in handles.into_iter() {
515            let got = rx.await?;
516            let id = String::from_utf8(id.data.to_vec())?;
517            assert_eq!(got, id);
518            assert_eq!(start.elapsed(), Duration::ZERO);
519        }
520
521        // Validate that the post message is only sent after the next timeout.
522        // I.e., the Publisher does not continuously flush new messages.
523        let got = post.await?;
524        assert_eq!(got, "after");
525        assert_eq!(start.elapsed(), Duration::from_secs(60));
526
527        Ok(())
528    }
529
530    #[tokio_test_no_panics(start_paused = true)]
531    // Users should be able to drop handles and the messages will still send.
532    async fn dropping_handles_does_not_prevent_publishing() -> anyhow::Result<()> {
533        let mut mock = MockGapicPublisher::new();
534        mock.expect_publish()
535            .withf(|r, _| {
536                r.messages.len() == 2
537                    && r.messages[0].data == "hello"
538                    && r.messages[1].data == "world"
539            })
540            .return_once(publish_ok);
541
542        let client = GapicPublisher::from_stub(mock);
543        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
544            // Set a long delay.
545            .set_message_count_threshold(1000_u32)
546            .set_delay_threshold(Duration::from_secs(60))
547            .build();
548
549        let start = tokio::time::Instant::now();
550        let messages = [
551            Message::new().set_data("hello"),
552            Message::new().set_data("world"),
553        ];
554        for msg in messages {
555            let handle = publisher.publish(msg.clone());
556            drop(handle);
557        }
558
559        publisher.flush().await;
560        assert_eq!(start.elapsed(), Duration::ZERO);
561
562        Ok(())
563    }
564
565    #[tokio::test(start_paused = true)]
566    async fn flush_with_no_messages_is_noop() -> anyhow::Result<()> {
567        let mock = MockGapicPublisher::new();
568
569        let client = GapicPublisher::from_stub(mock);
570        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
571
572        let start = tokio::time::Instant::now();
573        publisher.flush().await;
574        assert_eq!(start.elapsed(), Duration::ZERO);
575
576        Ok(())
577    }
578
579    #[tokio_test_no_panics]
580    async fn batch_sends_on_message_count_threshold_success() -> anyhow::Result<()> {
581        // Make sure all messages in a batch receive the correct message ID.
582        let mut mock = MockGapicPublisher::new();
583        mock.expect_publish()
584            .withf(|r, _| r.messages.len() == 2)
585            .return_once(publish_ok);
586
587        let client = GapicPublisher::from_stub(mock);
588        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
589            .set_message_count_threshold(2_u32)
590            .set_byte_threshold(MAX_BYTES)
591            .set_delay_threshold(std::time::Duration::MAX)
592            .build();
593
594        let messages = [
595            Message::new().set_data("hello"),
596            Message::new().set_data("world"),
597        ];
598        let mut handles = Vec::new();
599        for msg in messages {
600            let handle = publisher.publish(msg.clone());
601            handles.push((msg, handle));
602        }
603
604        for (id, rx) in handles.into_iter() {
605            let got = rx.await?;
606            let id = String::from_utf8(id.data.to_vec())?;
607            assert_eq!(got, id);
608        }
609
610        Ok(())
611    }
612
613    #[tokio_test_no_panics]
614    async fn batch_sends_on_message_count_threshold_error() -> anyhow::Result<()> {
615        // Make sure all messages in a batch receive an error.
616        let mut mock = MockGapicPublisher::new();
617        mock.expect_publish()
618            .withf(|r, _| r.messages.len() == 2)
619            .return_once(publish_err);
620
621        let client = GapicPublisher::from_stub(mock);
622        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
623            .set_message_count_threshold(2_u32)
624            .set_byte_threshold(MAX_BYTES)
625            .set_delay_threshold(std::time::Duration::MAX)
626            .build();
627
628        let messages = [
629            Message::new().set_data("hello"),
630            Message::new().set_data("world"),
631        ];
632        let mut handles = Vec::new();
633        for msg in messages {
634            let handle = publisher.publish(msg.clone());
635            handles.push(handle);
636        }
637
638        for rx in handles.into_iter() {
639            let got = rx.await;
640            assert!(got.is_err(), "{got:?}");
641        }
642
643        Ok(())
644    }
645
646    #[tokio_test_no_panics(start_paused = true)]
647    async fn batch_sends_on_byte_threshold() -> anyhow::Result<()> {
648        // Make sure all messages in a batch receive the correct message ID.
649        let mut mock = MockGapicPublisher::new();
650        mock.expect_publish()
651            .withf(|r, _| r.messages.len() == 1)
652            .times(2)
653            .returning(publish_ok);
654
655        let client = GapicPublisher::from_stub(mock);
656        // Ensure that the first message does not pass the threshold.
657        let byte_threshold = TOPIC.len() + "hello".len() + "key".len() + 1;
658        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
659            .set_message_count_threshold(MAX_MESSAGES)
660            .set_byte_threshold(byte_threshold as u32)
661            .set_delay_threshold(std::time::Duration::MAX)
662            .build();
663
664        // Validate without ordering key.
665        let handle = publisher.publish(Message::new().set_data("hello"));
666        // Publish a second message to trigger send on threshold.
667        let _handle = publisher.publish(Message::new().set_data("world"));
668        assert_eq!(handle.await?, "hello");
669
670        // Validate with ordering key.
671        let handle = publisher.publish(Message::new().set_data("hello").set_ordering_key("key"));
672        // Publish a second message to trigger send on threshold.
673        let _handle = publisher.publish(Message::new().set_data("world").set_ordering_key("key"));
674        assert_eq!(handle.await?, "hello");
675
676        Ok(())
677    }
678
679    #[tokio_test_no_panics(start_paused = true)]
680    async fn batch_sends_on_delay_threshold() -> anyhow::Result<()> {
681        let mut mock = MockGapicPublisher::new();
682        mock.expect_publish()
683            .withf(|req, _| req.topic == TOPIC)
684            .returning(publish_ok);
685
686        let client = GapicPublisher::from_stub(mock);
687        let delay = std::time::Duration::from_millis(10);
688        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
689            .set_message_count_threshold(u32::MAX)
690            .set_byte_threshold(MAX_BYTES)
691            .set_delay_threshold(delay)
692            .build();
693
694        // Test that messages send after delay.
695        for _ in 0..3 {
696            let start = tokio::time::Instant::now();
697            let messages = [
698                Message::new().set_data("hello 0"),
699                Message::new().set_data("hello 1"),
700                Message::new()
701                    .set_data("hello 2")
702                    .set_ordering_key("ordering key 1"),
703                Message::new()
704                    .set_data("hello 3")
705                    .set_ordering_key("ordering key 2"),
706            ];
707            let mut handles = Vec::new();
708            for msg in messages {
709                let handle = publisher.publish(msg.clone());
710                handles.push((msg, handle));
711            }
712
713            for (id, rx) in handles.into_iter() {
714                let got = rx.await?;
715                let id = String::from_utf8(id.data.to_vec())?;
716                assert_eq!(got, id);
717                assert_eq!(
718                    start.elapsed(),
719                    delay,
720                    "batch of messages should have sent after {:?}",
721                    delay
722                )
723            }
724        }
725
726        Ok(())
727    }
728
729    #[tokio::test(start_paused = true)]
730    #[allow(clippy::get_first)]
731    async fn batching_separates_by_ordering_key() -> anyhow::Result<()> {
732        // Publish messages with different ordering key and validate that they are in different batches.
733        let mut mock = MockGapicPublisher::new();
734        mock.expect_publish()
735            .withf(|r, _| {
736                r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
737            })
738            .returning(publish_ok);
739
740        let client = GapicPublisher::from_stub(mock);
741        // Use a low message count to trigger batch sends.
742        let message_count_threshold = 2_u32;
743        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
744            .set_message_count_threshold(message_count_threshold)
745            .set_byte_threshold(MAX_BYTES)
746            .set_delay_threshold(std::time::Duration::MAX)
747            .build();
748
749        let num_ordering_keys = 3;
750        let mut messages = Vec::new();
751        // We want the number of messages to be a multiple of num_ordering_keys
752        // and message_count_threshold. Otherwise, the final batch of each
753        // ordering key may fail the message len assertion.
754        for i in 0..(2 * message_count_threshold * num_ordering_keys) {
755            messages.push(
756                Message::new()
757                    .set_data(format!("test message {}", i))
758                    .set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
759            );
760        }
761        let mut handles = Vec::new();
762        for msg in messages {
763            let handle = publisher.publish(msg.clone());
764            handles.push((msg, handle));
765        }
766
767        for (id, rx) in handles.into_iter() {
768            let got = rx.await?;
769            let id = String::from_utf8(id.data.to_vec())?;
770            assert_eq!(got, id);
771        }
772
773        Ok(())
774    }
775
776    #[tokio_test_no_panics(start_paused = true)]
777    #[allow(clippy::get_first)]
778    async fn batching_handles_empty_ordering_key() -> anyhow::Result<()> {
779        // Publish messages with different ordering key and validate that they are in different batches.
780        let mut mock = MockGapicPublisher::new();
781        mock.expect_publish()
782            .withf(|r, _| {
783                r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
784            })
785            .returning(publish_ok);
786
787        let client = GapicPublisher::from_stub(mock);
788        // Use a low message count to trigger batch sends.
789        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
790            .set_message_count_threshold(2_u32)
791            .set_byte_threshold(MAX_BYTES)
792            .set_delay_threshold(std::time::Duration::MAX)
793            .build();
794
795        let messages = [
796            Message::new().set_data("hello 1"),
797            Message::new().set_data("hello 2").set_ordering_key(""),
798            Message::new()
799                .set_data("hello 3")
800                .set_ordering_key("ordering key :1"),
801            Message::new()
802                .set_data("hello 4")
803                .set_ordering_key("ordering key :1"),
804        ];
805
806        let mut handles = Vec::new();
807        for msg in messages {
808            let handle = publisher.publish(msg.clone());
809            handles.push((msg, handle));
810        }
811
812        for (id, rx) in handles.into_iter() {
813            let got = rx.await?;
814            let id = String::from_utf8(id.data.to_vec())?;
815            assert_eq!(got, id);
816        }
817
818        Ok(())
819    }
820
821    #[tokio_test_no_panics(start_paused = true)]
822    #[allow(clippy::get_first)]
823    async fn ordering_key_limits_to_one_outstanding_batch() -> anyhow::Result<()> {
824        // Verify that Publisher must only have 1 outstanding batch inflight at a time.
825        // This is done by validating that the 2 expected publish calls are done in sequence
826        // with a sleep delay in the first Publish reply.
827        let mut seq = Sequence::new();
828        let mut mock = MockGapicPublisherWithFuture::new();
829        mock.expect_publish()
830            .times(1)
831            .in_sequence(&mut seq)
832            .withf(|r, _| r.messages.len() == 1)
833            .returning({
834                |r, o| {
835                    Box::pin(async move {
836                        tokio::time::sleep(Duration::from_millis(10)).await;
837                        publish_ok(r, o)
838                    })
839                }
840            });
841
842        mock.expect_publish()
843            .times(1)
844            .in_sequence(&mut seq)
845            .withf(|r, _| r.messages.len() == 1)
846            .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
847
848        let client = GapicPublisher::from_stub(mock);
849        // Use a low message count to trigger batch sends.
850        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
851            .set_message_count_threshold(1_u32)
852            .set_byte_threshold(MAX_BYTES)
853            .set_delay_threshold(std::time::Duration::MAX)
854            .build();
855
856        let messages = [
857            Message::new()
858                .set_data("hello 1")
859                .set_ordering_key("ordering key"),
860            Message::new()
861                .set_data("hello 2")
862                .set_ordering_key("ordering key"),
863        ];
864
865        let start = tokio::time::Instant::now();
866        let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
867        let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
868        assert_eq!(msg2_handle.await?, "hello 2");
869        assert_eq!(
870            start.elapsed(),
871            Duration::from_millis(10),
872            "the second batch of messages should have sent after the first which is has been delayed by {:?}",
873            Duration::from_millis(10)
874        );
875        // Also validate the content of the first publish.
876        assert_eq!(msg1_handle.await?, "hello 1");
877
878        Ok(())
879    }
880
881    #[tokio_test_no_panics(start_paused = true)]
882    #[allow(clippy::get_first)]
883    async fn empty_ordering_key_allows_concurrent_batches() -> anyhow::Result<()> {
884        // Verify that for empty ordering key, the Publisher will send multiple batches without
885        // awaiting for the results.
886        // This is done by adding a delay in the first Publish reply and validating that
887        // the second batch does not await for the first batch.
888        let mut seq = Sequence::new();
889        let mut mock = MockGapicPublisherWithFuture::new();
890        mock.expect_publish()
891            .times(1)
892            .in_sequence(&mut seq)
893            .withf(|r, _| r.messages.len() == 1)
894            .returning(|r, o| {
895                Box::pin(async move {
896                    tokio::time::sleep(Duration::from_millis(10)).await;
897                    publish_ok(r, o)
898                })
899            });
900
901        mock.expect_publish()
902            .times(1)
903            .in_sequence(&mut seq)
904            .withf(|r, _| r.topic == TOPIC && r.messages.len() == 1)
905            .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
906
907        let client = GapicPublisher::from_stub(mock);
908        // Use a low message count to trigger batch sends.
909        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
910            .set_message_count_threshold(1_u32)
911            .set_byte_threshold(MAX_BYTES)
912            .set_delay_threshold(std::time::Duration::MAX)
913            .build();
914
915        let messages = [
916            Message::new().set_data("hello 1").set_ordering_key(""),
917            Message::new().set_data("hello 2").set_ordering_key(""),
918        ];
919
920        let start = tokio::time::Instant::now();
921        let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
922        let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
923        assert_eq!(msg2_handle.await?, "hello 2");
924        assert_eq!(
925            start.elapsed(),
926            Duration::from_millis(0),
927            "the second batch of messages should have sent without any delay"
928        );
929        // Also validate the content of the first publish.
930        assert_eq!(msg1_handle.await?, "hello 1");
931
932        Ok(())
933    }
934
935    #[tokio_test_no_panics(start_paused = true)]
936    async fn ordering_key_error_pauses_publisher() -> anyhow::Result<()> {
937        // Verify that a Publish send error will pause the publisher for an ordering key.
938        let mut seq = Sequence::new();
939        let mut mock = MockGapicPublisher::new();
940        mock.expect_publish()
941            .withf(|req, _o| req.topic == TOPIC)
942            .times(1)
943            .in_sequence(&mut seq)
944            .returning(publish_err);
945
946        mock.expect_publish()
947            .withf(|req, _o| req.topic == TOPIC)
948            .times(2)
949            .in_sequence(&mut seq)
950            .returning(publish_ok);
951
952        let client = GapicPublisher::from_stub(mock);
953        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
954            .set_message_count_threshold(1_u32)
955            .build();
956
957        let key = "ordering_key";
958        let msg_0_handle =
959            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
960        // Publish an additional message so that there are pending messages.
961        let msg_1_handle =
962            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
963
964        // Assert the error is caused by the Publish send operation.
965        let mut got_err = msg_0_handle.await.unwrap_err();
966        assert_publish_err(got_err);
967
968        // Assert that the pending message error is caused by the Publisher being paused.
969        got_err = msg_1_handle.await.unwrap_err();
970        assert!(
971            matches!(got_err, crate::error::PublishError::OrderingKeyPaused),
972            "{got_err:?}"
973        );
974
975        // Assert that new publish messages return errors because the Publisher is paused.
976        for _ in 0..3 {
977            assert_publishing_is_paused!(publisher, key);
978        }
979
980        // Verify that the other ordering keys are not paused.
981        assert_publishing_is_ok!(publisher, "", "without_error");
982
983        Ok(())
984    }
985
986    #[tokio_test_no_panics(start_paused = true)]
987    async fn batch_error_pauses_ordering_key() -> anyhow::Result<()> {
988        // Verify that all messages in the same batch receives the Send error for that batch.
989        let mut mock = MockGapicPublisher::new();
990        mock.expect_publish()
991            .times(1)
992            .withf(|r, _| r.topic == TOPIC && r.messages.len() == 2)
993            .returning(publish_err);
994
995        let client = GapicPublisher::from_stub(mock);
996        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
997            .set_message_count_threshold(2_u32)
998            .build();
999
1000        let key = "ordering_key";
1001        // Publish 2 messages so they are in the same batch.
1002        let msg_0_handle =
1003            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1004        let msg_1_handle =
1005            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
1006
1007        // Validate that they both receives the Send error.
1008        let mut got_err = msg_0_handle.await.unwrap_err();
1009        assert_publish_err(got_err);
1010        got_err = msg_1_handle.await.unwrap_err();
1011        assert_publish_err(got_err);
1012
1013        // Assert that new publish messages returns an error because the Publisher is paused.
1014        assert_publishing_is_paused!(publisher, key);
1015
1016        Ok(())
1017    }
1018
1019    #[tokio_test_no_panics(start_paused = true)]
1020    async fn flush_on_paused_ordering_key_returns_error() -> anyhow::Result<()> {
1021        // Verify that Flush on a paused ordering key returns an error.
1022        let mut seq = Sequence::new();
1023        let mut mock = MockGapicPublisher::new();
1024        mock.expect_publish()
1025            .withf(|req, _o| req.topic == TOPIC)
1026            .times(1)
1027            .in_sequence(&mut seq)
1028            .returning(publish_err);
1029
1030        mock.expect_publish()
1031            .withf(|req, _o| req.topic == TOPIC)
1032            .times(2)
1033            .in_sequence(&mut seq)
1034            .returning(publish_ok);
1035
1036        let client = GapicPublisher::from_stub(mock);
1037        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1038
1039        let key = "ordering_key";
1040        // Cause an ordering key to be paused.
1041        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1042        publisher.flush().await;
1043        // Assert the error is caused by the Publish send operation.
1044        let got_err = handle.await.unwrap_err();
1045        assert_publish_err(got_err);
1046
1047        // Validate that new Publish on the paused ordering key will result in an error.
1048        assert_publishing_is_paused!(publisher, key);
1049
1050        // Verify that the other ordering keys are not paused.
1051        assert_publishing_is_ok!(publisher, "", "without_error");
1052
1053        Ok(())
1054    }
1055
1056    #[tokio_test_no_panics(start_paused = true)]
1057    async fn resuming_non_paused_ordering_key_is_noop() -> anyhow::Result<()> {
1058        let mut mock = MockGapicPublisher::new();
1059        mock.expect_publish()
1060            .withf(|req, _o| req.topic == TOPIC)
1061            .times(4)
1062            .returning(publish_ok);
1063
1064        let client = GapicPublisher::from_stub(mock);
1065        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1066
1067        // Test resume and publish for empty ordering key.
1068        publisher.resume_publish("");
1069        assert_publishing_is_ok!(publisher, "");
1070
1071        // Test resume and publish after the BatchActor has been created for the empty ordering key.
1072        publisher.resume_publish("");
1073        assert_publishing_is_ok!(publisher, "");
1074
1075        // Test resume and publish before the BatchActor has been created.
1076        let key = "without_error";
1077        publisher.resume_publish(key);
1078        assert_publishing_is_ok!(publisher, key);
1079
1080        // Test resume and publish after the BatchActor has been created.
1081        publisher.resume_publish(key);
1082        assert_publishing_is_ok!(publisher, key);
1083
1084        Ok(())
1085    }
1086
1087    #[tokio_test_no_panics(start_paused = true)]
1088    async fn resuming_paused_ordering_key_allows_publishing() -> anyhow::Result<()> {
1089        let mut seq = Sequence::new();
1090        let mut mock = MockGapicPublisher::new();
1091        mock.expect_publish()
1092            .withf(|req, _o| req.topic == TOPIC)
1093            .times(1)
1094            .in_sequence(&mut seq)
1095            .returning(publish_err);
1096
1097        mock.expect_publish()
1098            .withf(|req, _o| req.topic == TOPIC)
1099            .times(3)
1100            .in_sequence(&mut seq)
1101            .returning(publish_ok);
1102
1103        let client = GapicPublisher::from_stub(mock);
1104        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1105
1106        let key = "ordering_key";
1107        // Cause an ordering key to be paused.
1108        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1109        // Assert the error is caused by the Publish send operation.
1110        let got_err = handle.await.unwrap_err();
1111        assert_publish_err(got_err);
1112
1113        // Validate that new Publish on the paused ordering key will result in an error.
1114        assert_publishing_is_paused!(publisher, key);
1115
1116        // Resume and validate the key is no longer paused.
1117        publisher.resume_publish(key);
1118        assert_publishing_is_ok!(publisher, key);
1119
1120        // Verify that the other ordering keys continue to work as expected.
1121        assert_publishing_is_ok!(publisher, "", "without_error");
1122
1123        Ok(())
1124    }
1125
1126    #[tokio_test_no_panics(start_paused = true)]
1127    async fn resuming_ordering_key_twice_is_safe() -> anyhow::Result<()> {
1128        // Validate that resuming twice sequentially does not have bad side effects.
1129        let mut seq = Sequence::new();
1130        let mut mock = MockGapicPublisher::new();
1131        mock.expect_publish()
1132            .withf(|req, _o| req.topic == TOPIC)
1133            .in_sequence(&mut seq)
1134            .times(1)
1135            .returning(publish_err);
1136
1137        mock.expect_publish()
1138            .withf(|req, _o| req.topic == TOPIC)
1139            .in_sequence(&mut seq)
1140            .return_once(publish_ok);
1141
1142        let client = GapicPublisher::from_stub(mock);
1143        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1144
1145        let key = "ordering_key";
1146        // Cause an ordering key to be paused.
1147        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1148        publisher.flush().await;
1149        // Assert the error is caused by the Publish send operation.
1150        let got_err = handle.await.unwrap_err();
1151        assert_publish_err(got_err);
1152
1153        // Validate that new Publish on the paused ordering key will result in an error.
1154        assert_publishing_is_paused!(publisher, key);
1155
1156        // Resume twice on the paused ordering key.
1157        publisher.resume_publish(key);
1158        publisher.resume_publish(key);
1159        assert_publishing_is_ok!(publisher, key);
1160
1161        Ok(())
1162    }
1163
1164    #[tokio_test_no_panics(start_paused = true)]
1165    async fn resuming_one_ordering_key_does_not_resume_others() -> anyhow::Result<()> {
1166        // Validate that resume_publish only resumes the paused ordering key .
1167        let mut seq = Sequence::new();
1168        let mut mock = MockGapicPublisher::new();
1169        mock.expect_publish()
1170            .withf(|req, _o| req.topic == TOPIC)
1171            .times(2)
1172            .in_sequence(&mut seq)
1173            .returning(publish_err);
1174
1175        mock.expect_publish()
1176            .withf(|req, _o| req.topic == TOPIC)
1177            .times(1)
1178            .in_sequence(&mut seq)
1179            .returning(publish_ok);
1180
1181        let client = GapicPublisher::from_stub(mock);
1182        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1183
1184        let key_0 = "ordering_key_0";
1185        let key_1 = "ordering_key_1";
1186        // Cause both ordering keys to pause.
1187        let handle_0 = publisher.publish(Message::new().set_ordering_key(key_0).set_data("msg 0"));
1188        let handle_1 = publisher.publish(Message::new().set_ordering_key(key_1).set_data("msg 1"));
1189        publisher.flush().await;
1190        let mut got_err = handle_0.await.unwrap_err();
1191        assert_publish_err(got_err);
1192        got_err = handle_1.await.unwrap_err();
1193        assert_publish_err(got_err);
1194
1195        // Assert that both ordering keys are paused.
1196        assert_publishing_is_paused!(publisher, key_0, key_1);
1197
1198        // Resume on one of the ordering key.
1199        publisher.resume_publish(key_0);
1200
1201        // Validate that only the correct ordering key is resumed.
1202        assert_publishing_is_ok!(publisher, key_0);
1203
1204        // Validate the other ordering key is still paused.
1205        assert_publishing_is_paused!(publisher, key_1);
1206
1207        Ok(())
1208    }
1209
1210    #[tokio::test]
1211    async fn publisher_builder_clamps_batching_options() -> anyhow::Result<()> {
1212        // Test values that are too high and should be clamped.
1213        let oversized_options = BatchingOptions::new()
1214            .set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
1215            .set_message_count_threshold(MAX_MESSAGES + 1)
1216            .set_byte_threshold(MAX_BYTES + 1);
1217
1218        let publishers = vec![
1219            BasePublisher::builder()
1220                .build()
1221                .await?
1222                .publisher("projects/my-project/topics/my-topic")
1223                .set_delay_threshold(oversized_options.delay_threshold)
1224                .set_message_count_threshold(oversized_options.message_count_threshold)
1225                .set_byte_threshold(oversized_options.byte_threshold)
1226                .build(),
1227            Publisher::builder("projects/my-project/topics/my-topic".to_string())
1228                .set_delay_threshold(oversized_options.delay_threshold)
1229                .set_message_count_threshold(oversized_options.message_count_threshold)
1230                .set_byte_threshold(oversized_options.byte_threshold)
1231                .build()
1232                .await?,
1233        ];
1234
1235        for publisher in publishers {
1236            let got = publisher.batching_options;
1237            assert_eq!(got.delay_threshold, MAX_DELAY);
1238            assert_eq!(got.message_count_threshold, MAX_MESSAGES);
1239            assert_eq!(got.byte_threshold, MAX_BYTES);
1240        }
1241
1242        // Test values that are within limits and should not be changed.
1243        let normal_options = BatchingOptions::new()
1244            .set_delay_threshold(Duration::from_secs(10))
1245            .set_message_count_threshold(10_u32)
1246            .set_byte_threshold(100_u32);
1247
1248        let publishers = vec![
1249            BasePublisher::builder()
1250                .build()
1251                .await?
1252                .publisher("projects/my-project/topics/my-topic")
1253                .set_delay_threshold(normal_options.delay_threshold)
1254                .set_message_count_threshold(normal_options.message_count_threshold)
1255                .set_byte_threshold(normal_options.byte_threshold)
1256                .build(),
1257            Publisher::builder("projects/my-project/topics/my-topic".to_string())
1258                .set_delay_threshold(normal_options.delay_threshold)
1259                .set_message_count_threshold(normal_options.message_count_threshold)
1260                .set_byte_threshold(normal_options.byte_threshold)
1261                .build()
1262                .await?,
1263        ];
1264
1265        for publisher in publishers {
1266            let got = publisher.batching_options;
1267
1268            assert_eq!(got.delay_threshold, normal_options.delay_threshold);
1269            assert_eq!(
1270                got.message_count_threshold,
1271                normal_options.message_count_threshold
1272            );
1273            assert_eq!(got.byte_threshold, normal_options.byte_threshold);
1274        }
1275        Ok(())
1276    }
1277}