Skip to main content

google_cloud_pubsub/publisher/
client.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::options::BatchingOptions;
16use crate::publisher::actor::BundledMessage;
17use crate::publisher::actor::ToDispatcher;
18use crate::publisher::builder::PublisherBuilder;
19
20use tokio::sync::mpsc::UnboundedSender;
21use tokio::sync::oneshot;
22
23pub use super::base_publisher::BasePublisher;
24
/// A Publisher client for the [Cloud Pub/Sub] API.
///
/// A `Publisher` sends messages to a specific topic. It manages message batching
/// and sending in a background task.
///
/// ```
/// # async fn sample() -> anyhow::Result<()> {
/// # use google_cloud_pubsub::*;
/// # use google_cloud_pubsub::client::Publisher;
/// # use model::Message;
/// let publisher = Publisher::builder("projects/my-project/topics/my-topic").build().await?;
/// let message_id_future = publisher.publish(Message::new().set_data("Hello, World"));
/// # Ok(()) }
/// ```
///
/// # Configuration
///
/// To configure `Publisher` use the `with_*` methods in the type returned
/// by [builder()][Publisher::builder]. The default configuration should
/// work for most applications. Common configuration changes include
///
/// * [with_endpoint()]: by default this client uses the global default endpoint
///   (`https://pubsub.googleapis.com`). Applications using regional
///   endpoints or running in restricted networks (e.g. a network configured
///   with [Private Google Access with VPC Service Controls]) may want to
///   override this default.
/// * [with_credentials()]: by default this client uses
///   [Application Default Credentials]. Applications using custom
///   authentication may need to override this default.
///
/// [Cloud Pub/Sub]: https://cloud.google.com/pubsub
/// [with_endpoint()]: crate::builder::publisher::PublisherBuilder::with_endpoint
/// [with_credentials()]: crate::builder::publisher::PublisherBuilder::with_credentials
/// [Private Google Access with VPC Service Controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
/// [Application Default Credentials]: https://cloud.google.com/docs/authentication#adc
///
/// # Pooling and Cloning
///
/// `Publisher` holds a connection pool internally, it is advised to
/// create one and then reuse it. You do not need to wrap `Publisher` in
/// an [Rc](std::rc::Rc) or [Arc](std::sync::Arc) to reuse it.
#[derive(Debug, Clone)]
pub struct Publisher {
    // Batching configuration carried alongside the sender.
    // NOTE(review): no method in this file reads this field, which is presumably
    // why `dead_code` is allowed here; confirm whether it is still needed.
    #[allow(dead_code)]
    pub(crate) batching_options: BatchingOptions,
    // Sending half of the unbounded channel to the dispatcher. Every public
    // method (`publish`, `flush`, `resume_publish`) is implemented by sending
    // a `ToDispatcher` command through it.
    pub(crate) tx: UnboundedSender<ToDispatcher>,
}
71
72impl Publisher {
73    /// Returns a builder for [Publisher].
74    ///
75    /// # Example
76    ///
77    /// ```
78    /// # async fn sample() -> anyhow::Result<()> {
79    /// # use google_cloud_pubsub::*;
80    /// # use google_cloud_pubsub::client::Publisher;
81    /// let publisher = Publisher::builder("projects/my-project/topics/topic").build().await?;
82    /// # Ok(()) }
83    /// ```
84    pub fn builder<T: Into<String>>(topic: T) -> PublisherBuilder {
85        PublisherBuilder::new(topic.into())
86    }
87
88    /// Publishes a message to the topic.
89    ///
90    /// When this method encounters a non-recoverable error publishing for an ordering key,
91    /// it will pause publishing on all new messages on that ordering key. Any outstanding
92    /// messages that have not yet been published may return an error.
93    ///
94    /// ```
95    /// # use google_cloud_pubsub::client::Publisher;
96    /// # async fn sample(publisher: Publisher) -> anyhow::Result<()> {
97    /// # use google_cloud_pubsub::model::Message;
98    /// let message_id = publisher.publish(Message::new().set_data("Hello, World")).await?;
99    /// # Ok(()) }
100    /// ```
101    #[must_use = "ignoring the publish result may lead to undetected delivery failures"]
102    pub fn publish(&self, msg: crate::model::Message) -> crate::publisher::PublishFuture {
103        let (tx, rx) = tokio::sync::oneshot::channel();
104
105        // If this fails, the Dispatcher is gone, which indicates it has been dropped,
106        // possibly due to the background task being stopped by the runtime.
107        // The PublishFuture will automatically receive an error when `tx` is dropped.
108        if self
109            .tx
110            .send(ToDispatcher::Publish(BundledMessage { msg, tx }))
111            .is_err()
112        {
113            // `tx` is dropped here if the send errors.
114        }
115        crate::publisher::PublishFuture { rx }
116    }
117
118    /// Flushes all buffered messages across all ordering keys, sending them immediately.
119    ///
120    /// ```
121    /// # use google_cloud_pubsub::model::Message;
122    /// # async fn sample(publisher: google_cloud_pubsub::client::Publisher) -> anyhow::Result<()> {
123    /// let _handle = publisher.publish(Message::new().set_data("event"));
124    /// // Ensures the message above is sent without needing to track its future.
125    /// publisher.flush().await;
126    /// # Ok(())
127    /// # }
128    /// ```
129    ///
130    /// This method bypasses configured batching delays and returns only after all
131    /// messages buffered at the time of the call have reached a terminal state
132    /// (success or permanent failure).
133    ///
134    /// ### Recommendations
135    ///
136    /// *   For most use cases, we recommend you `.await`
137    ///     the [`PublishFuture`][crate::publisher::PublishFuture] returned by
138    ///     [`publish`][Self::publish] to retrieve message IDs and handle
139    ///     specific errors.
140    /// *   Use `flush()` as a convenience during application shutdown to
141    ///     ensure the client attempts to send all outstanding data.
142    pub async fn flush(&self) {
143        let (tx, rx) = oneshot::channel();
144        if self.tx.send(ToDispatcher::Flush(tx)).is_ok() {
145            let _ = rx.await;
146        }
147    }
148
149    /// Resume accepting publish for a paused ordering key.
150    ///
151    /// Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.
152    /// If the ordering key is not currently paused, this function is a no-op.
153    ///
154    /// # Example
155    ///
156    /// ```
157    /// # use google_cloud_pubsub::model::Message;
158    /// # async fn sample(publisher: google_cloud_pubsub::client::Publisher) -> anyhow::Result<()> {
159    /// if let Err(_) = publisher.publish(Message::new().set_data("foo").set_ordering_key("bar")).await {
160    ///     // Error handling code can go here.
161    ///     publisher.resume_publish("bar");
162    /// }
163    /// # Ok(())
164    /// # }
165    /// ```
166    pub fn resume_publish<T: std::convert::Into<std::string::String>>(&self, ordering_key: T) {
167        let _ = self
168            .tx
169            .send(ToDispatcher::ResumePublish(ordering_key.into()));
170    }
171}
172
173#[cfg(test)]
174mod tests {
175    use super::*;
176    use crate::publisher::builder::PublisherPartialBuilder;
177    use crate::publisher::client::BasePublisher;
178    use crate::publisher::constants::*;
179    use crate::publisher::options::BatchingOptions;
180    use crate::{
181        generated::gapic_dataplane::client::Publisher as GapicPublisher,
182        model::{Message, PublishResponse},
183    };
184    use google_cloud_test_macros::tokio_test_no_panics;
185    use mockall::Sequence;
186    use rand::{RngExt, distr::Alphanumeric};
187    use std::error::Error;
188    use std::time::Duration;
189
    // Topic name passed to every publisher built in these tests; mock
    // expectations compare `req.topic` against it.
    static TOPIC: &str = "my-topic";
191
    // Generates `MockGapicPublisher`, a mock implementation of the generated
    // Publisher stub trait. Only `publish` is mocked, and it is declared as an
    // `async fn`, so expectations return the result value directly.
    mockall::mock! {
        #[derive(Debug)]
        GapicPublisher {}
        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
            async fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> crate::Result<crate::Response<crate::model::PublishResponse>>;
        }
    }
199
    // Similar to GapicPublisher but `publish` returns `impl Future` instead.
    // This is useful for mocking a response with delays/timeouts, because the
    // expectation can return a future that sleeps before resolving.
    // See https://github.com/asomers/mockall/issues/189 for more
    // detail on why this is needed.
    // While this can be used in place of GapicPublisher, it makes the
    // normal usage without an async closure much more cumbersome.
    mockall::mock! {
        #[derive(Debug)]
        GapicPublisherWithFuture {}
        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
            fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> impl Future<Output=crate::Result<crate::Response<crate::model::PublishResponse>>> + Send;
        }
    }
213
214    fn publish_ok(
215        req: crate::model::PublishRequest,
216        _options: crate::RequestOptions,
217    ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
218        let ids = req
219            .messages
220            .iter()
221            .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
222        Ok(crate::Response::from(
223            PublishResponse::new().set_message_ids(ids),
224        ))
225    }
226
227    fn publish_err(
228        _req: crate::model::PublishRequest,
229        _options: crate::RequestOptions,
230    ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
231        Err(crate::Error::service(
232            google_cloud_gax::error::rpc::Status::default()
233                .set_code(google_cloud_gax::error::rpc::Code::Unknown)
234                .set_message("unknown error has occurred"),
235        ))
236    }
237
238    #[track_caller]
239    fn assert_publish_err(got_err: crate::error::PublishError) {
240        assert!(
241            matches!(got_err, crate::error::PublishError::Rpc(_)),
242            "{got_err:?}"
243        );
244        let source = got_err
245            .source()
246            .and_then(|e| e.downcast_ref::<std::sync::Arc<crate::Error>>())
247            .expect("send error should contain a source");
248        assert!(source.status().is_some(), "{got_err:?}");
249        assert_eq!(
250            source.status().unwrap().code,
251            google_cloud_gax::error::rpc::Code::Unknown,
252            "{got_err:?}"
253        );
254    }
255
256    fn generate_random_data() -> String {
257        rand::rng()
258            .sample_iter(&Alphanumeric)
259            .take(16)
260            .map(char::from)
261            .collect()
262    }
263
    // For each given ordering key: publishes one message with random data,
    // awaits the result, and asserts the returned message ID equals the data
    // that was sent (the `publish_ok` stub echoes the payload back as the ID).
    //
    // The expansion uses `.await` and `?`, so the macro must be invoked inside
    // an `async` function that returns a `Result`.
    macro_rules! assert_publishing_is_ok {
        ($publisher:ident, $($ordering_key:expr),+) => {
            $(
                let msg = generate_random_data();
                let got = $publisher
                    .publish(
                        Message::new()
                            .set_ordering_key($ordering_key)
                            .set_data(msg.clone()),
                    )
                    .await;
                // `?` propagates a publish error to the calling test.
                assert_eq!(got?, msg);
            )+
        };
    }
279
    // For each given ordering key: publishes one message with random data and
    // asserts that the result is `Err(PublishError::OrderingKeyPaused)`.
    //
    // The expansion uses `.await`, so the macro must be invoked inside an
    // `async` context.
    macro_rules! assert_publishing_is_paused {
        ($publisher:ident, $($ordering_key:expr),+) => {
            $(
                let got_err = $publisher
                    .publish(
                        Message::new()
                            .set_ordering_key($ordering_key)
                            .set_data(generate_random_data()),
                    )
                    .await;
                assert!(
                    matches!(got_err, Err(crate::error::PublishError::OrderingKeyPaused)),
                    "{got_err:?}"
                );
            )+
        };
    }
297
298    #[tokio_test_no_panics]
299    async fn publisher_publish_successfully() -> anyhow::Result<()> {
300        let mut mock = MockGapicPublisher::new();
301        mock.expect_publish()
302            .times(2)
303            .withf(|req, _o| req.topic == TOPIC)
304            .returning(publish_ok);
305
306        let client = GapicPublisher::from_stub(mock);
307        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
308            .set_message_count_threshold(1_u32)
309            .build();
310
311        let messages = [
312            Message::new().set_data("hello"),
313            Message::new().set_data("world"),
314        ];
315        let mut handles = Vec::new();
316        for msg in messages {
317            let handle = publisher.publish(msg.clone());
318            handles.push((msg, handle));
319        }
320
321        for (id, rx) in handles.into_iter() {
322            let got = rx.await?;
323            let id = String::from_utf8(id.data.to_vec())?;
324            assert_eq!(got, id);
325        }
326
327        Ok(())
328    }
329
330    #[tokio::test]
331    async fn publisher_publish_large_message() -> anyhow::Result<()> {
332        let mut mock = MockGapicPublisher::new();
333        mock.expect_publish()
334            .withf(|req, _o| req.topic == TOPIC)
335            .returning(publish_ok);
336
337        let client = GapicPublisher::from_stub(mock);
338        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
339            .set_byte_threshold(1_u32)
340            .build();
341        assert_publishing_is_ok!(publisher, "");
342        assert_publishing_is_ok!(publisher, "key");
343
344        Ok(())
345    }
346
347    #[tokio::test(start_paused = true)]
348    async fn worker_handles_forced_shutdown_gracefully() -> anyhow::Result<()> {
349        let mock = MockGapicPublisher::new();
350
351        let client = GapicPublisher::from_stub(mock);
352        let (publisher, background_task_handle) =
353            PublisherPartialBuilder::new(client, TOPIC.to_string())
354                .set_message_count_threshold(100_u32)
355                .build_return_handle();
356
357        let messages = [
358            Message::new().set_data("hello"),
359            Message::new().set_data("world"),
360        ];
361        let mut handles = Vec::new();
362        for msg in messages {
363            let handle = publisher.publish(msg);
364            handles.push(handle);
365        }
366
367        background_task_handle.abort();
368
369        for rx in handles.into_iter() {
370            rx.await
371                .expect_err("expected error when background task canceled");
372        }
373
374        Ok(())
375    }
376
377    #[tokio_test_no_panics(start_paused = true)]
378    async fn dropping_publisher_flushes_pending_messages() -> anyhow::Result<()> {
379        // If we hold on to the handles returned from the publisher, it should
380        // be safe to drop the publisher and .await on the handles.
381        let mut mock = MockGapicPublisher::new();
382        mock.expect_publish()
383            .withf(|req, _o| req.topic == TOPIC)
384            .times(2)
385            .returning(publish_ok);
386
387        let client = GapicPublisher::from_stub(mock);
388        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
389            .set_message_count_threshold(1000_u32)
390            .set_delay_threshold(Duration::from_secs(60))
391            .build();
392
393        let start = tokio::time::Instant::now();
394        let messages = [
395            Message::new().set_data("hello"),
396            Message::new().set_data("world"),
397            Message::new().set_data("hello").set_ordering_key("key"),
398            Message::new().set_data("world").set_ordering_key("key"),
399        ];
400        let mut handles = Vec::new();
401        for msg in messages {
402            let handle = publisher.publish(msg.clone());
403            handles.push((msg, handle));
404        }
405        drop(publisher); // This should trigger the publisher to send all pending messages.
406
407        for (id, rx) in handles.into_iter() {
408            let got = rx.await?;
409            let id = String::from_utf8(id.data.to_vec())?;
410            assert_eq!(got, id);
411            assert_eq!(start.elapsed(), Duration::ZERO);
412        }
413
414        Ok(())
415    }
416
417    #[tokio_test_no_panics]
418    async fn publisher_handles_publish_errors() -> anyhow::Result<()> {
419        let mut mock = MockGapicPublisher::new();
420        mock.expect_publish()
421            .times(2)
422            .withf(|req, _o| req.topic == TOPIC)
423            .returning(publish_err);
424
425        let client = GapicPublisher::from_stub(mock);
426        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
427            .set_message_count_threshold(1_u32)
428            .build();
429
430        let messages = [
431            Message::new().set_data("hello"),
432            Message::new().set_data("world"),
433        ];
434
435        let mut handles = Vec::new();
436        for msg in messages {
437            let handle = publisher.publish(msg.clone());
438            handles.push(handle);
439        }
440
441        for rx in handles.into_iter() {
442            let got = rx.await;
443            assert!(got.is_err(), "{got:?}");
444        }
445
446        Ok(())
447    }
448
    // Verifies that `flush()` sends the currently buffered messages without
    // waiting for the delay threshold, and that a message published after the
    // flush goes back to normal batching. Runs on a paused tokio clock, so
    // `start.elapsed()` only changes when a timer is waited on.
    #[tokio_test_no_panics(start_paused = true)]
    async fn flush_sends_pending_messages_immediately() -> anyhow::Result<()> {
        let mut mock = MockGapicPublisher::new();
        mock.expect_publish()
            .withf(|req, _o| req.topic == TOPIC)
            .returning(publish_ok);

        let client = GapicPublisher::from_stub(mock);
        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
            // Set a long delay.
            .set_message_count_threshold(1000_u32)
            .set_delay_threshold(Duration::from_secs(60))
            .build();

        let start = tokio::time::Instant::now();
        let messages = [
            Message::new().set_data("hello"),
            Message::new().set_data("world"),
        ];
        let mut handles = Vec::new();
        for msg in messages {
            let handle = publisher.publish(msg.clone());
            handles.push((msg, handle));
        }

        // Neither threshold has been reached, so only the flush can send these.
        publisher.flush().await;
        assert_eq!(start.elapsed(), Duration::ZERO);

        // Published after the flush completed; its result is awaited last.
        let post = publisher.publish(Message::new().set_data("after"));
        for (id, rx) in handles.into_iter() {
            let got = rx.await?;
            let id = String::from_utf8(id.data.to_vec())?;
            assert_eq!(got, id);
            assert_eq!(start.elapsed(), Duration::ZERO);
        }

        // Validate that the post message is only sent after the next timeout.
        // I.e., the Publisher does not continuously flush new messages.
        let got = post.await?;
        assert_eq!(got, "after");
        assert_eq!(start.elapsed(), Duration::from_secs(60));

        Ok(())
    }
493
494    #[tokio_test_no_panics(start_paused = true)]
495    // Users should be able to drop handles and the messages will still send.
496    async fn dropping_handles_does_not_prevent_publishing() -> anyhow::Result<()> {
497        let mut mock = MockGapicPublisher::new();
498        mock.expect_publish()
499            .withf(|r, _| {
500                r.messages.len() == 2
501                    && r.messages[0].data == "hello"
502                    && r.messages[1].data == "world"
503            })
504            .return_once(publish_ok);
505
506        let client = GapicPublisher::from_stub(mock);
507        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
508            // Set a long delay.
509            .set_message_count_threshold(1000_u32)
510            .set_delay_threshold(Duration::from_secs(60))
511            .build();
512
513        let start = tokio::time::Instant::now();
514        let messages = [
515            Message::new().set_data("hello"),
516            Message::new().set_data("world"),
517        ];
518        for msg in messages {
519            let handle = publisher.publish(msg.clone());
520            drop(handle);
521        }
522
523        publisher.flush().await;
524        assert_eq!(start.elapsed(), Duration::ZERO);
525
526        Ok(())
527    }
528
529    #[tokio::test(start_paused = true)]
530    async fn flush_with_no_messages_is_noop() -> anyhow::Result<()> {
531        let mock = MockGapicPublisher::new();
532
533        let client = GapicPublisher::from_stub(mock);
534        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
535
536        let start = tokio::time::Instant::now();
537        publisher.flush().await;
538        assert_eq!(start.elapsed(), Duration::ZERO);
539
540        Ok(())
541    }
542
543    #[tokio_test_no_panics]
544    async fn batch_sends_on_message_count_threshold_success() -> anyhow::Result<()> {
545        // Make sure all messages in a batch receive the correct message ID.
546        let mut mock = MockGapicPublisher::new();
547        mock.expect_publish()
548            .withf(|r, _| r.messages.len() == 2)
549            .return_once(publish_ok);
550
551        let client = GapicPublisher::from_stub(mock);
552        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
553            .set_message_count_threshold(2_u32)
554            .set_byte_threshold(MAX_BYTES)
555            .set_delay_threshold(std::time::Duration::MAX)
556            .build();
557
558        let messages = [
559            Message::new().set_data("hello"),
560            Message::new().set_data("world"),
561        ];
562        let mut handles = Vec::new();
563        for msg in messages {
564            let handle = publisher.publish(msg.clone());
565            handles.push((msg, handle));
566        }
567
568        for (id, rx) in handles.into_iter() {
569            let got = rx.await?;
570            let id = String::from_utf8(id.data.to_vec())?;
571            assert_eq!(got, id);
572        }
573
574        Ok(())
575    }
576
577    #[tokio_test_no_panics]
578    async fn batch_sends_on_message_count_threshold_error() -> anyhow::Result<()> {
579        // Make sure all messages in a batch receive an error.
580        let mut mock = MockGapicPublisher::new();
581        mock.expect_publish()
582            .withf(|r, _| r.messages.len() == 2)
583            .return_once(publish_err);
584
585        let client = GapicPublisher::from_stub(mock);
586        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
587            .set_message_count_threshold(2_u32)
588            .set_byte_threshold(MAX_BYTES)
589            .set_delay_threshold(std::time::Duration::MAX)
590            .build();
591
592        let messages = [
593            Message::new().set_data("hello"),
594            Message::new().set_data("world"),
595        ];
596        let mut handles = Vec::new();
597        for msg in messages {
598            let handle = publisher.publish(msg.clone());
599            handles.push(handle);
600        }
601
602        for rx in handles.into_iter() {
603            let got = rx.await;
604            assert!(got.is_err(), "{got:?}");
605        }
606
607        Ok(())
608    }
609
610    #[tokio_test_no_panics(start_paused = true)]
611    async fn batch_sends_on_byte_threshold() -> anyhow::Result<()> {
612        // Make sure all messages in a batch receive the correct message ID.
613        let mut mock = MockGapicPublisher::new();
614        mock.expect_publish()
615            .withf(|r, _| r.messages.len() == 1)
616            .times(2)
617            .returning(publish_ok);
618
619        let client = GapicPublisher::from_stub(mock);
620        // Ensure that the first message does not pass the threshold.
621        let byte_threshold = TOPIC.len() + "hello".len() + "key".len() + 1;
622        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
623            .set_message_count_threshold(MAX_MESSAGES)
624            .set_byte_threshold(byte_threshold as u32)
625            .set_delay_threshold(std::time::Duration::MAX)
626            .build();
627
628        // Validate without ordering key.
629        let handle = publisher.publish(Message::new().set_data("hello"));
630        // Publish a second message to trigger send on threshold.
631        let _handle = publisher.publish(Message::new().set_data("world"));
632        assert_eq!(handle.await?, "hello");
633
634        // Validate with ordering key.
635        let handle = publisher.publish(Message::new().set_data("hello").set_ordering_key("key"));
636        // Publish a second message to trigger send on threshold.
637        let _handle = publisher.publish(Message::new().set_data("world").set_ordering_key("key"));
638        assert_eq!(handle.await?, "hello");
639
640        Ok(())
641    }
642
643    #[tokio_test_no_panics(start_paused = true)]
644    async fn batch_sends_on_delay_threshold() -> anyhow::Result<()> {
645        let mut mock = MockGapicPublisher::new();
646        mock.expect_publish()
647            .withf(|req, _| req.topic == TOPIC)
648            .returning(publish_ok);
649
650        let client = GapicPublisher::from_stub(mock);
651        let delay = std::time::Duration::from_millis(10);
652        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
653            .set_message_count_threshold(u32::MAX)
654            .set_byte_threshold(MAX_BYTES)
655            .set_delay_threshold(delay)
656            .build();
657
658        // Test that messages send after delay.
659        for _ in 0..3 {
660            let start = tokio::time::Instant::now();
661            let messages = [
662                Message::new().set_data("hello 0"),
663                Message::new().set_data("hello 1"),
664                Message::new()
665                    .set_data("hello 2")
666                    .set_ordering_key("ordering key 1"),
667                Message::new()
668                    .set_data("hello 3")
669                    .set_ordering_key("ordering key 2"),
670            ];
671            let mut handles = Vec::new();
672            for msg in messages {
673                let handle = publisher.publish(msg.clone());
674                handles.push((msg, handle));
675            }
676
677            for (id, rx) in handles.into_iter() {
678                let got = rx.await?;
679                let id = String::from_utf8(id.data.to_vec())?;
680                assert_eq!(got, id);
681                assert_eq!(
682                    start.elapsed(),
683                    delay,
684                    "batch of messages should have sent after {:?}",
685                    delay
686                )
687            }
688        }
689
690        Ok(())
691    }
692
693    #[tokio::test(start_paused = true)]
694    #[allow(clippy::get_first)]
695    async fn batching_separates_by_ordering_key() -> anyhow::Result<()> {
696        // Publish messages with different ordering key and validate that they are in different batches.
697        let mut mock = MockGapicPublisher::new();
698        mock.expect_publish()
699            .withf(|r, _| {
700                r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
701            })
702            .returning(publish_ok);
703
704        let client = GapicPublisher::from_stub(mock);
705        // Use a low message count to trigger batch sends.
706        let message_count_threshold = 2_u32;
707        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
708            .set_message_count_threshold(message_count_threshold)
709            .set_byte_threshold(MAX_BYTES)
710            .set_delay_threshold(std::time::Duration::MAX)
711            .build();
712
713        let num_ordering_keys = 3;
714        let mut messages = Vec::new();
715        // We want the number of messages to be a multiple of num_ordering_keys
716        // and message_count_threshold. Otherwise, the final batch of each
717        // ordering key may fail the message len assertion.
718        for i in 0..(2 * message_count_threshold * num_ordering_keys) {
719            messages.push(
720                Message::new()
721                    .set_data(format!("test message {}", i))
722                    .set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
723            );
724        }
725        let mut handles = Vec::new();
726        for msg in messages {
727            let handle = publisher.publish(msg.clone());
728            handles.push((msg, handle));
729        }
730
731        for (id, rx) in handles.into_iter() {
732            let got = rx.await?;
733            let id = String::from_utf8(id.data.to_vec())?;
734            assert_eq!(got, id);
735        }
736
737        Ok(())
738    }
739
740    #[tokio_test_no_panics(start_paused = true)]
741    #[allow(clippy::get_first)]
742    async fn batching_handles_empty_ordering_key() -> anyhow::Result<()> {
743        // Publish messages with different ordering key and validate that they are in different batches.
744        let mut mock = MockGapicPublisher::new();
745        mock.expect_publish()
746            .withf(|r, _| {
747                r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
748            })
749            .returning(publish_ok);
750
751        let client = GapicPublisher::from_stub(mock);
752        // Use a low message count to trigger batch sends.
753        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
754            .set_message_count_threshold(2_u32)
755            .set_byte_threshold(MAX_BYTES)
756            .set_delay_threshold(std::time::Duration::MAX)
757            .build();
758
759        let messages = [
760            Message::new().set_data("hello 1"),
761            Message::new().set_data("hello 2").set_ordering_key(""),
762            Message::new()
763                .set_data("hello 3")
764                .set_ordering_key("ordering key :1"),
765            Message::new()
766                .set_data("hello 4")
767                .set_ordering_key("ordering key :1"),
768        ];
769
770        let mut handles = Vec::new();
771        for msg in messages {
772            let handle = publisher.publish(msg.clone());
773            handles.push((msg, handle));
774        }
775
776        for (id, rx) in handles.into_iter() {
777            let got = rx.await?;
778            let id = String::from_utf8(id.data.to_vec())?;
779            assert_eq!(got, id);
780        }
781
782        Ok(())
783    }
784
    #[tokio_test_no_panics(start_paused = true)]
    #[allow(clippy::get_first)]
    async fn ordering_key_limits_to_one_outstanding_batch() -> anyhow::Result<()> {
        // Verify that Publisher must only have 1 outstanding batch inflight at a time.
        // This is done by validating that the 2 expected publish calls are done in sequence
        // with a sleep delay in the first Publish reply.
        let mut seq = Sequence::new();
        // The future-returning mock is used so the first reply can sleep on the
        // (paused) tokio clock before resolving.
        let mut mock = MockGapicPublisherWithFuture::new();
        // First expected call: replies successfully, but only after 10ms.
        mock.expect_publish()
            .times(1)
            .in_sequence(&mut seq)
            .withf(|r, _| r.messages.len() == 1)
            .returning({
                |r, o| {
                    Box::pin(async move {
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        publish_ok(r, o)
                    })
                }
            });

        // Second expected call: replies successfully right away.
        mock.expect_publish()
            .times(1)
            .in_sequence(&mut seq)
            .withf(|r, _| r.messages.len() == 1)
            .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));

        let client = GapicPublisher::from_stub(mock);
        // Use a low message count to trigger batch sends.
        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
            .set_message_count_threshold(1_u32)
            .set_byte_threshold(MAX_BYTES)
            .set_delay_threshold(std::time::Duration::MAX)
            .build();

        // Two messages on the same ordering key; with a threshold of 1 each
        // one forms its own batch.
        let messages = [
            Message::new()
                .set_data("hello 1")
                .set_ordering_key("ordering key"),
            Message::new()
                .set_data("hello 2")
                .set_ordering_key("ordering key"),
        ];

        let start = tokio::time::Instant::now();
        let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
        let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
        // The second reply itself has no delay, so it can only arrive at the
        // 10ms mark if the second batch waited for the first one to finish.
        assert_eq!(msg2_handle.await?, "hello 2");
        assert_eq!(
            start.elapsed(),
            Duration::from_millis(10),
            "the second batch of messages should have sent after the first which is has been delayed by {:?}",
            Duration::from_millis(10)
        );
        // Also validate the content of the first publish.
        assert_eq!(msg1_handle.await?, "hello 1");

        Ok(())
    }
844
845    #[tokio_test_no_panics(start_paused = true)]
846    #[allow(clippy::get_first)]
847    async fn empty_ordering_key_allows_concurrent_batches() -> anyhow::Result<()> {
848        // Verify that for empty ordering key, the Publisher will send multiple batches without
849        // awaiting for the results.
850        // This is done by adding a delay in the first Publish reply and validating that
851        // the second batch does not await for the first batch.
852        let mut seq = Sequence::new();
853        let mut mock = MockGapicPublisherWithFuture::new();
854        mock.expect_publish()
855            .times(1)
856            .in_sequence(&mut seq)
857            .withf(|r, _| r.messages.len() == 1)
858            .returning(|r, o| {
859                Box::pin(async move {
860                    tokio::time::sleep(Duration::from_millis(10)).await;
861                    publish_ok(r, o)
862                })
863            });
864
865        mock.expect_publish()
866            .times(1)
867            .in_sequence(&mut seq)
868            .withf(|r, _| r.topic == TOPIC && r.messages.len() == 1)
869            .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
870
871        let client = GapicPublisher::from_stub(mock);
872        // Use a low message count to trigger batch sends.
873        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
874            .set_message_count_threshold(1_u32)
875            .set_byte_threshold(MAX_BYTES)
876            .set_delay_threshold(std::time::Duration::MAX)
877            .build();
878
879        let messages = [
880            Message::new().set_data("hello 1").set_ordering_key(""),
881            Message::new().set_data("hello 2").set_ordering_key(""),
882        ];
883
884        let start = tokio::time::Instant::now();
885        let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
886        let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
887        assert_eq!(msg2_handle.await?, "hello 2");
888        assert_eq!(
889            start.elapsed(),
890            Duration::from_millis(0),
891            "the second batch of messages should have sent without any delay"
892        );
893        // Also validate the content of the first publish.
894        assert_eq!(msg1_handle.await?, "hello 1");
895
896        Ok(())
897    }
898
899    #[tokio_test_no_panics(start_paused = true)]
900    async fn ordering_key_error_pauses_publisher() -> anyhow::Result<()> {
901        // Verify that a Publish send error will pause the publisher for an ordering key.
902        let mut seq = Sequence::new();
903        let mut mock = MockGapicPublisher::new();
904        mock.expect_publish()
905            .withf(|req, _o| req.topic == TOPIC)
906            .times(1)
907            .in_sequence(&mut seq)
908            .returning(publish_err);
909
910        mock.expect_publish()
911            .withf(|req, _o| req.topic == TOPIC)
912            .times(2)
913            .in_sequence(&mut seq)
914            .returning(publish_ok);
915
916        let client = GapicPublisher::from_stub(mock);
917        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
918            .set_message_count_threshold(1_u32)
919            .build();
920
921        let key = "ordering_key";
922        let msg_0_handle =
923            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
924        // Publish an additional message so that there are pending messages.
925        let msg_1_handle =
926            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
927
928        // Assert the error is caused by the Publish send operation.
929        let mut got_err = msg_0_handle.await.unwrap_err();
930        assert_publish_err(got_err);
931
932        // Assert that the pending message error is caused by the Publisher being paused.
933        got_err = msg_1_handle.await.unwrap_err();
934        assert!(
935            matches!(got_err, crate::error::PublishError::OrderingKeyPaused),
936            "{got_err:?}"
937        );
938
939        // Assert that new publish messages return errors because the Publisher is paused.
940        for _ in 0..3 {
941            assert_publishing_is_paused!(publisher, key);
942        }
943
944        // Verify that the other ordering keys are not paused.
945        assert_publishing_is_ok!(publisher, "", "without_error");
946
947        Ok(())
948    }
949
950    #[tokio_test_no_panics(start_paused = true)]
951    async fn batch_error_pauses_ordering_key() -> anyhow::Result<()> {
952        // Verify that all messages in the same batch receives the Send error for that batch.
953        let mut mock = MockGapicPublisher::new();
954        mock.expect_publish()
955            .times(1)
956            .withf(|r, _| r.topic == TOPIC && r.messages.len() == 2)
957            .returning(publish_err);
958
959        let client = GapicPublisher::from_stub(mock);
960        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
961            .set_message_count_threshold(2_u32)
962            .build();
963
964        let key = "ordering_key";
965        // Publish 2 messages so they are in the same batch.
966        let msg_0_handle =
967            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
968        let msg_1_handle =
969            publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
970
971        // Validate that they both receives the Send error.
972        let mut got_err = msg_0_handle.await.unwrap_err();
973        assert_publish_err(got_err);
974        got_err = msg_1_handle.await.unwrap_err();
975        assert_publish_err(got_err);
976
977        // Assert that new publish messages returns an error because the Publisher is paused.
978        assert_publishing_is_paused!(publisher, key);
979
980        Ok(())
981    }
982
983    #[tokio_test_no_panics(start_paused = true)]
984    async fn flush_on_paused_ordering_key_returns_error() -> anyhow::Result<()> {
985        // Verify that Flush on a paused ordering key returns an error.
986        let mut seq = Sequence::new();
987        let mut mock = MockGapicPublisher::new();
988        mock.expect_publish()
989            .withf(|req, _o| req.topic == TOPIC)
990            .times(1)
991            .in_sequence(&mut seq)
992            .returning(publish_err);
993
994        mock.expect_publish()
995            .withf(|req, _o| req.topic == TOPIC)
996            .times(2)
997            .in_sequence(&mut seq)
998            .returning(publish_ok);
999
1000        let client = GapicPublisher::from_stub(mock);
1001        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1002
1003        let key = "ordering_key";
1004        // Cause an ordering key to be paused.
1005        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1006        publisher.flush().await;
1007        // Assert the error is caused by the Publish send operation.
1008        let got_err = handle.await.unwrap_err();
1009        assert_publish_err(got_err);
1010
1011        // Validate that new Publish on the paused ordering key will result in an error.
1012        assert_publishing_is_paused!(publisher, key);
1013
1014        // Verify that the other ordering keys are not paused.
1015        assert_publishing_is_ok!(publisher, "", "without_error");
1016
1017        Ok(())
1018    }
1019
1020    #[tokio_test_no_panics(start_paused = true)]
1021    async fn resuming_non_paused_ordering_key_is_noop() -> anyhow::Result<()> {
1022        let mut mock = MockGapicPublisher::new();
1023        mock.expect_publish()
1024            .withf(|req, _o| req.topic == TOPIC)
1025            .times(4)
1026            .returning(publish_ok);
1027
1028        let client = GapicPublisher::from_stub(mock);
1029        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1030
1031        // Test resume and publish for empty ordering key.
1032        publisher.resume_publish("");
1033        assert_publishing_is_ok!(publisher, "");
1034
1035        // Test resume and publish after the BatchActor has been created for the empty ordering key.
1036        publisher.resume_publish("");
1037        assert_publishing_is_ok!(publisher, "");
1038
1039        // Test resume and publish before the BatchActor has been created.
1040        let key = "without_error";
1041        publisher.resume_publish(key);
1042        assert_publishing_is_ok!(publisher, key);
1043
1044        // Test resume and publish after the BatchActor has been created.
1045        publisher.resume_publish(key);
1046        assert_publishing_is_ok!(publisher, key);
1047
1048        Ok(())
1049    }
1050
1051    #[tokio_test_no_panics(start_paused = true)]
1052    async fn resuming_paused_ordering_key_allows_publishing() -> anyhow::Result<()> {
1053        let mut seq = Sequence::new();
1054        let mut mock = MockGapicPublisher::new();
1055        mock.expect_publish()
1056            .withf(|req, _o| req.topic == TOPIC)
1057            .times(1)
1058            .in_sequence(&mut seq)
1059            .returning(publish_err);
1060
1061        mock.expect_publish()
1062            .withf(|req, _o| req.topic == TOPIC)
1063            .times(3)
1064            .in_sequence(&mut seq)
1065            .returning(publish_ok);
1066
1067        let client = GapicPublisher::from_stub(mock);
1068        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1069
1070        let key = "ordering_key";
1071        // Cause an ordering key to be paused.
1072        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1073        // Assert the error is caused by the Publish send operation.
1074        let got_err = handle.await.unwrap_err();
1075        assert_publish_err(got_err);
1076
1077        // Validate that new Publish on the paused ordering key will result in an error.
1078        assert_publishing_is_paused!(publisher, key);
1079
1080        // Resume and validate the key is no longer paused.
1081        publisher.resume_publish(key);
1082        assert_publishing_is_ok!(publisher, key);
1083
1084        // Verify that the other ordering keys continue to work as expected.
1085        assert_publishing_is_ok!(publisher, "", "without_error");
1086
1087        Ok(())
1088    }
1089
1090    #[tokio_test_no_panics(start_paused = true)]
1091    async fn resuming_ordering_key_twice_is_safe() -> anyhow::Result<()> {
1092        // Validate that resuming twice sequentially does not have bad side effects.
1093        let mut seq = Sequence::new();
1094        let mut mock = MockGapicPublisher::new();
1095        mock.expect_publish()
1096            .withf(|req, _o| req.topic == TOPIC)
1097            .in_sequence(&mut seq)
1098            .times(1)
1099            .returning(publish_err);
1100
1101        mock.expect_publish()
1102            .withf(|req, _o| req.topic == TOPIC)
1103            .in_sequence(&mut seq)
1104            .return_once(publish_ok);
1105
1106        let client = GapicPublisher::from_stub(mock);
1107        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1108
1109        let key = "ordering_key";
1110        // Cause an ordering key to be paused.
1111        let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1112        publisher.flush().await;
1113        // Assert the error is caused by the Publish send operation.
1114        let got_err = handle.await.unwrap_err();
1115        assert_publish_err(got_err);
1116
1117        // Validate that new Publish on the paused ordering key will result in an error.
1118        assert_publishing_is_paused!(publisher, key);
1119
1120        // Resume twice on the paused ordering key.
1121        publisher.resume_publish(key);
1122        publisher.resume_publish(key);
1123        assert_publishing_is_ok!(publisher, key);
1124
1125        Ok(())
1126    }
1127
1128    #[tokio_test_no_panics(start_paused = true)]
1129    async fn resuming_one_ordering_key_does_not_resume_others() -> anyhow::Result<()> {
1130        // Validate that resume_publish only resumes the paused ordering key .
1131        let mut seq = Sequence::new();
1132        let mut mock = MockGapicPublisher::new();
1133        mock.expect_publish()
1134            .withf(|req, _o| req.topic == TOPIC)
1135            .times(2)
1136            .in_sequence(&mut seq)
1137            .returning(publish_err);
1138
1139        mock.expect_publish()
1140            .withf(|req, _o| req.topic == TOPIC)
1141            .times(1)
1142            .in_sequence(&mut seq)
1143            .returning(publish_ok);
1144
1145        let client = GapicPublisher::from_stub(mock);
1146        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1147
1148        let key_0 = "ordering_key_0";
1149        let key_1 = "ordering_key_1";
1150        // Cause both ordering keys to pause.
1151        let handle_0 = publisher.publish(Message::new().set_ordering_key(key_0).set_data("msg 0"));
1152        let handle_1 = publisher.publish(Message::new().set_ordering_key(key_1).set_data("msg 1"));
1153        publisher.flush().await;
1154        let mut got_err = handle_0.await.unwrap_err();
1155        assert_publish_err(got_err);
1156        got_err = handle_1.await.unwrap_err();
1157        assert_publish_err(got_err);
1158
1159        // Assert that both ordering keys are paused.
1160        assert_publishing_is_paused!(publisher, key_0, key_1);
1161
1162        // Resume on one of the ordering key.
1163        publisher.resume_publish(key_0);
1164
1165        // Validate that only the correct ordering key is resumed.
1166        assert_publishing_is_ok!(publisher, key_0);
1167
1168        // Validate the other ordering key is still paused.
1169        assert_publishing_is_paused!(publisher, key_1);
1170
1171        Ok(())
1172    }
1173
1174    #[tokio::test]
1175    async fn publisher_builder_clamps_batching_options() -> anyhow::Result<()> {
1176        // Test values that are too high and should be clamped.
1177        let oversized_options = BatchingOptions::new()
1178            .set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
1179            .set_message_count_threshold(MAX_MESSAGES + 1)
1180            .set_byte_threshold(MAX_BYTES + 1);
1181
1182        let publishers = vec![
1183            BasePublisher::builder()
1184                .build()
1185                .await?
1186                .publisher("projects/my-project/topics/my-topic")
1187                .set_delay_threshold(oversized_options.delay_threshold)
1188                .set_message_count_threshold(oversized_options.message_count_threshold)
1189                .set_byte_threshold(oversized_options.byte_threshold)
1190                .build(),
1191            Publisher::builder("projects/my-project/topics/my-topic".to_string())
1192                .set_delay_threshold(oversized_options.delay_threshold)
1193                .set_message_count_threshold(oversized_options.message_count_threshold)
1194                .set_byte_threshold(oversized_options.byte_threshold)
1195                .build()
1196                .await?,
1197        ];
1198
1199        for publisher in publishers {
1200            let got = publisher.batching_options;
1201            assert_eq!(got.delay_threshold, MAX_DELAY);
1202            assert_eq!(got.message_count_threshold, MAX_MESSAGES);
1203            assert_eq!(got.byte_threshold, MAX_BYTES);
1204        }
1205
1206        // Test values that are within limits and should not be changed.
1207        let normal_options = BatchingOptions::new()
1208            .set_delay_threshold(Duration::from_secs(10))
1209            .set_message_count_threshold(10_u32)
1210            .set_byte_threshold(100_u32);
1211
1212        let publishers = vec![
1213            BasePublisher::builder()
1214                .build()
1215                .await?
1216                .publisher("projects/my-project/topics/my-topic")
1217                .set_delay_threshold(normal_options.delay_threshold)
1218                .set_message_count_threshold(normal_options.message_count_threshold)
1219                .set_byte_threshold(normal_options.byte_threshold)
1220                .build(),
1221            Publisher::builder("projects/my-project/topics/my-topic".to_string())
1222                .set_delay_threshold(normal_options.delay_threshold)
1223                .set_message_count_threshold(normal_options.message_count_threshold)
1224                .set_byte_threshold(normal_options.byte_threshold)
1225                .build()
1226                .await?,
1227        ];
1228
1229        for publisher in publishers {
1230            let got = publisher.batching_options;
1231
1232            assert_eq!(got.delay_threshold, normal_options.delay_threshold);
1233            assert_eq!(
1234                got.message_count_threshold,
1235                normal_options.message_count_threshold
1236            );
1237            assert_eq!(got.byte_threshold, normal_options.byte_threshold);
1238        }
1239        Ok(())
1240    }
1241}