1use super::options::BatchingOptions;
16use crate::publisher::actor::BundledMessage;
17use crate::publisher::actor::ToDispatcher;
18use crate::publisher::builder::PublisherBuilder;
19
20use tokio::sync::mpsc::UnboundedSender;
21use tokio::sync::oneshot;
22
23pub use super::base_publisher::BasePublisher;
24
/// A cloneable handle used to publish messages to a topic.
///
/// The handle performs no requests itself: every operation is turned into a
/// `ToDispatcher` command and pushed onto an unbounded channel. Cloning the
/// handle clones the channel sender, so all clones feed the same channel.
#[derive(Debug, Clone)]
pub struct Publisher {
    // Batching configuration this publisher was built with. Outside of test
    // builds nothing reads the field, hence the expected `dead_code` lint.
    #[cfg_attr(not(test), expect(dead_code))]
    pub(crate) batching_options: BatchingOptions,
    // Sending half of the command channel consumed by the dispatcher.
    pub(crate) tx: UnboundedSender<ToDispatcher>,
}
74
75impl Publisher {
76 pub fn builder<T: Into<String>>(topic: T) -> PublisherBuilder {
88 PublisherBuilder::new(topic.into())
89 }
90
91 #[must_use = "ignoring the publish result may lead to undetected delivery failures"]
105 pub fn publish(&self, msg: crate::model::Message) -> crate::publisher::PublishFuture {
106 let (tx, rx) = tokio::sync::oneshot::channel();
107
108 if self
112 .tx
113 .send(ToDispatcher::Publish(BundledMessage { msg, tx }))
114 .is_err()
115 {
116 }
118 crate::publisher::PublishFuture { rx }
119 }
120
121 pub async fn flush(&self) {
146 let (tx, rx) = oneshot::channel();
147 if self.tx.send(ToDispatcher::Flush(tx)).is_ok() {
148 let _ = rx.await;
149 }
150 }
151
152 pub fn resume_publish<T: std::convert::Into<std::string::String>>(&self, ordering_key: T) {
170 let _ = self
171 .tx
172 .send(ToDispatcher::ResumePublish(ordering_key.into()));
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use crate::publisher::builder::PublisherPartialBuilder;
180 use crate::publisher::client::BasePublisher;
181 use crate::publisher::constants::*;
182 use crate::publisher::options::BatchingOptions;
183 use crate::{
184 generated::gapic_dataplane::client::Publisher as GapicPublisher,
185 model::{Message, PublishResponse},
186 };
187 use google_cloud_test_macros::tokio_test_no_panics;
188 use mockall::Sequence;
189 use rand::{RngExt, distr::Alphanumeric};
190 use std::error::Error;
191 use std::time::Duration;
192
    // Topic name handed to the publishers built on top of the mocks; the mock
    // expectations match requests against it.
    static TOPIC: &str = "my-topic";
194
    // Mock of the generated publisher stub. `publish` is declared as an
    // `async fn`, so expectations return the finished result directly.
    mockall::mock! {
        #[derive(Debug)]
        GapicPublisher {}
        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
            async fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> crate::Result<crate::Response<crate::model::PublishResponse>>;
        }
    }
202
    // Variant of the stub mock whose `publish` returns a future instead of a
    // ready value. The tests that use it return boxed async blocks, which lets
    // an expectation sleep before producing its response.
    mockall::mock! {
        #[derive(Debug)]
        GapicPublisherWithFuture {}
        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
            fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> impl Future<Output=crate::Result<crate::Response<crate::model::PublishResponse>>> + Send;
        }
    }
216
217 fn publish_ok(
218 req: crate::model::PublishRequest,
219 _options: crate::RequestOptions,
220 ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
221 let ids = req
222 .messages
223 .iter()
224 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
225 Ok(crate::Response::from(
226 PublishResponse::new().set_message_ids(ids),
227 ))
228 }
229
230 fn publish_err(
231 _req: crate::model::PublishRequest,
232 _options: crate::RequestOptions,
233 ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
234 Err(crate::Error::service(
235 google_cloud_gax::error::rpc::Status::default()
236 .set_code(google_cloud_gax::error::rpc::Code::Unknown)
237 .set_message("unknown error has occurred"),
238 ))
239 }
240
241 #[track_caller]
242 fn assert_publish_err(got_err: crate::error::PublishError) {
243 assert!(
244 matches!(got_err, crate::error::PublishError::Rpc(_)),
245 "{got_err:?}"
246 );
247 let source = got_err
248 .source()
249 .and_then(|e| e.downcast_ref::<std::sync::Arc<crate::Error>>())
250 .expect("send error should contain a source");
251 assert!(source.status().is_some(), "{got_err:?}");
252 assert_eq!(
253 source.status().unwrap().code,
254 google_cloud_gax::error::rpc::Code::Unknown,
255 "{got_err:?}"
256 );
257 }
258
259 fn generate_random_data() -> String {
260 rand::rng()
261 .sample_iter(&Alphanumeric)
262 .take(16)
263 .map(char::from)
264 .collect()
265 }
266
    // For every listed ordering key: publishes one message carrying random
    // data, awaits the result, and asserts it equals the data that was sent
    // (the id `publish_ok` derives from the payload).
    //
    // The expansion uses `.await` and `?`, so it can only be used inside an
    // async function that returns a `Result`.
    macro_rules! assert_publishing_is_ok {
        ($publisher:ident, $($ordering_key:expr),+) => {
            $(
                let msg = generate_random_data();
                let got = $publisher
                    .publish(
                        Message::new()
                            .set_ordering_key($ordering_key)
                            .set_data(msg.clone()),
                    )
                    .await;
                assert_eq!(got?, msg);
            )+
        };
    }
282
    // For every listed ordering key: publishes one message carrying random
    // data and asserts the publish fails with
    // `PublishError::OrderingKeyPaused`.
    //
    // The expansion uses `.await`, so it can only be used in async code.
    macro_rules! assert_publishing_is_paused {
        ($publisher:ident, $($ordering_key:expr),+) => {
            $(
                let got_err = $publisher
                    .publish(
                        Message::new()
                            .set_ordering_key($ordering_key)
                            .set_data(generate_random_data()),
                    )
                    .await;
                assert!(
                    matches!(got_err, Err(crate::error::PublishError::OrderingKeyPaused)),
                    "{got_err:?}"
                );
            )+
        };
    }
300
301 #[tokio_test_no_panics]
302 async fn publisher_publish_successfully() -> anyhow::Result<()> {
303 let mut mock = MockGapicPublisher::new();
304 mock.expect_publish()
305 .times(2)
306 .withf(|req, _o| req.topic == TOPIC)
307 .returning(publish_ok);
308
309 let client = GapicPublisher::from_stub(mock);
310 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
311 .set_message_count_threshold(1_u32)
312 .build();
313
314 let messages = [
315 Message::new().set_data("hello"),
316 Message::new().set_data("world"),
317 ];
318 let mut handles = Vec::new();
319 for msg in messages {
320 let handle = publisher.publish(msg.clone());
321 handles.push((msg, handle));
322 }
323
324 for (id, rx) in handles.into_iter() {
325 let got = rx.await?;
326 let id = String::from_utf8(id.data.to_vec())?;
327 assert_eq!(got, id);
328 }
329
330 Ok(())
331 }
332
333 #[tokio_test_no_panics]
334 async fn publisher_publish_successfully_with_arc() -> anyhow::Result<()> {
335 let mut mock = MockGapicPublisher::new();
336 mock.expect_publish()
337 .times(2)
338 .withf(|req, _o| req.topic == TOPIC)
339 .returning(publish_ok);
340
341 let mock_arc = std::sync::Arc::new(mock);
342 let client = GapicPublisher::from_stub::<MockGapicPublisher>(mock_arc);
343 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
344 .set_message_count_threshold(1_u32)
345 .build();
346
347 let messages = [
348 Message::new().set_data("hello"),
349 Message::new().set_data("world"),
350 ];
351 let mut handles = Vec::new();
352 for msg in messages {
353 let handle = publisher.publish(msg.clone());
354 handles.push((msg, handle));
355 }
356
357 for (id, rx) in handles.into_iter() {
358 let got = rx.await?;
359 let id = String::from_utf8(id.data.to_vec())?;
360 assert_eq!(got, id);
361 }
362
363 Ok(())
364 }
365
366 #[tokio::test]
367 async fn publisher_publish_large_message() -> anyhow::Result<()> {
368 let mut mock = MockGapicPublisher::new();
369 mock.expect_publish()
370 .withf(|req, _o| req.topic == TOPIC)
371 .returning(publish_ok);
372
373 let client = GapicPublisher::from_stub(mock);
374 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
375 .set_byte_threshold(1_u32)
376 .build();
377 assert_publishing_is_ok!(publisher, "");
378 assert_publishing_is_ok!(publisher, "key");
379
380 Ok(())
381 }
382
383 #[tokio::test(start_paused = true)]
384 async fn worker_handles_forced_shutdown_gracefully() -> anyhow::Result<()> {
385 let mock = MockGapicPublisher::new();
386
387 let client = GapicPublisher::from_stub(mock);
388 let (publisher, background_task_handle) =
389 PublisherPartialBuilder::new(client, TOPIC.to_string())
390 .set_message_count_threshold(100_u32)
391 .build_return_handle();
392
393 let messages = [
394 Message::new().set_data("hello"),
395 Message::new().set_data("world"),
396 ];
397 let mut handles = Vec::new();
398 for msg in messages {
399 let handle = publisher.publish(msg);
400 handles.push(handle);
401 }
402
403 background_task_handle.abort();
404
405 for rx in handles.into_iter() {
406 rx.await
407 .expect_err("expected error when background task canceled");
408 }
409
410 Ok(())
411 }
412
413 #[tokio_test_no_panics(start_paused = true)]
414 async fn dropping_publisher_flushes_pending_messages() -> anyhow::Result<()> {
415 let mut mock = MockGapicPublisher::new();
418 mock.expect_publish()
419 .withf(|req, _o| req.topic == TOPIC)
420 .times(2)
421 .returning(publish_ok);
422
423 let client = GapicPublisher::from_stub(mock);
424 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
425 .set_message_count_threshold(1000_u32)
426 .set_delay_threshold(Duration::from_secs(60))
427 .build();
428
429 let start = tokio::time::Instant::now();
430 let messages = [
431 Message::new().set_data("hello"),
432 Message::new().set_data("world"),
433 Message::new().set_data("hello").set_ordering_key("key"),
434 Message::new().set_data("world").set_ordering_key("key"),
435 ];
436 let mut handles = Vec::new();
437 for msg in messages {
438 let handle = publisher.publish(msg.clone());
439 handles.push((msg, handle));
440 }
441 drop(publisher); for (id, rx) in handles.into_iter() {
444 let got = rx.await?;
445 let id = String::from_utf8(id.data.to_vec())?;
446 assert_eq!(got, id);
447 assert_eq!(start.elapsed(), Duration::ZERO);
448 }
449
450 Ok(())
451 }
452
453 #[tokio_test_no_panics]
454 async fn publisher_handles_publish_errors() -> anyhow::Result<()> {
455 let mut mock = MockGapicPublisher::new();
456 mock.expect_publish()
457 .times(2)
458 .withf(|req, _o| req.topic == TOPIC)
459 .returning(publish_err);
460
461 let client = GapicPublisher::from_stub(mock);
462 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
463 .set_message_count_threshold(1_u32)
464 .build();
465
466 let messages = [
467 Message::new().set_data("hello"),
468 Message::new().set_data("world"),
469 ];
470
471 let mut handles = Vec::new();
472 for msg in messages {
473 let handle = publisher.publish(msg.clone());
474 handles.push(handle);
475 }
476
477 for rx in handles.into_iter() {
478 let got = rx.await;
479 assert!(got.is_err(), "{got:?}");
480 }
481
482 Ok(())
483 }
484
485 #[tokio_test_no_panics(start_paused = true)]
486 async fn flush_sends_pending_messages_immediately() -> anyhow::Result<()> {
487 let mut mock = MockGapicPublisher::new();
488 mock.expect_publish()
489 .withf(|req, _o| req.topic == TOPIC)
490 .returning(publish_ok);
491
492 let client = GapicPublisher::from_stub(mock);
493 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
494 .set_message_count_threshold(1000_u32)
496 .set_delay_threshold(Duration::from_secs(60))
497 .build();
498
499 let start = tokio::time::Instant::now();
500 let messages = [
501 Message::new().set_data("hello"),
502 Message::new().set_data("world"),
503 ];
504 let mut handles = Vec::new();
505 for msg in messages {
506 let handle = publisher.publish(msg.clone());
507 handles.push((msg, handle));
508 }
509
510 publisher.flush().await;
511 assert_eq!(start.elapsed(), Duration::ZERO);
512
513 let post = publisher.publish(Message::new().set_data("after"));
514 for (id, rx) in handles.into_iter() {
515 let got = rx.await?;
516 let id = String::from_utf8(id.data.to_vec())?;
517 assert_eq!(got, id);
518 assert_eq!(start.elapsed(), Duration::ZERO);
519 }
520
521 let got = post.await?;
524 assert_eq!(got, "after");
525 assert_eq!(start.elapsed(), Duration::from_secs(60));
526
527 Ok(())
528 }
529
530 #[tokio_test_no_panics(start_paused = true)]
531 async fn dropping_handles_does_not_prevent_publishing() -> anyhow::Result<()> {
533 let mut mock = MockGapicPublisher::new();
534 mock.expect_publish()
535 .withf(|r, _| {
536 r.messages.len() == 2
537 && r.messages[0].data == "hello"
538 && r.messages[1].data == "world"
539 })
540 .return_once(publish_ok);
541
542 let client = GapicPublisher::from_stub(mock);
543 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
544 .set_message_count_threshold(1000_u32)
546 .set_delay_threshold(Duration::from_secs(60))
547 .build();
548
549 let start = tokio::time::Instant::now();
550 let messages = [
551 Message::new().set_data("hello"),
552 Message::new().set_data("world"),
553 ];
554 for msg in messages {
555 let handle = publisher.publish(msg.clone());
556 drop(handle);
557 }
558
559 publisher.flush().await;
560 assert_eq!(start.elapsed(), Duration::ZERO);
561
562 Ok(())
563 }
564
565 #[tokio::test(start_paused = true)]
566 async fn flush_with_no_messages_is_noop() -> anyhow::Result<()> {
567 let mock = MockGapicPublisher::new();
568
569 let client = GapicPublisher::from_stub(mock);
570 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
571
572 let start = tokio::time::Instant::now();
573 publisher.flush().await;
574 assert_eq!(start.elapsed(), Duration::ZERO);
575
576 Ok(())
577 }
578
579 #[tokio_test_no_panics]
580 async fn batch_sends_on_message_count_threshold_success() -> anyhow::Result<()> {
581 let mut mock = MockGapicPublisher::new();
583 mock.expect_publish()
584 .withf(|r, _| r.messages.len() == 2)
585 .return_once(publish_ok);
586
587 let client = GapicPublisher::from_stub(mock);
588 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
589 .set_message_count_threshold(2_u32)
590 .set_byte_threshold(MAX_BYTES)
591 .set_delay_threshold(std::time::Duration::MAX)
592 .build();
593
594 let messages = [
595 Message::new().set_data("hello"),
596 Message::new().set_data("world"),
597 ];
598 let mut handles = Vec::new();
599 for msg in messages {
600 let handle = publisher.publish(msg.clone());
601 handles.push((msg, handle));
602 }
603
604 for (id, rx) in handles.into_iter() {
605 let got = rx.await?;
606 let id = String::from_utf8(id.data.to_vec())?;
607 assert_eq!(got, id);
608 }
609
610 Ok(())
611 }
612
613 #[tokio_test_no_panics]
614 async fn batch_sends_on_message_count_threshold_error() -> anyhow::Result<()> {
615 let mut mock = MockGapicPublisher::new();
617 mock.expect_publish()
618 .withf(|r, _| r.messages.len() == 2)
619 .return_once(publish_err);
620
621 let client = GapicPublisher::from_stub(mock);
622 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
623 .set_message_count_threshold(2_u32)
624 .set_byte_threshold(MAX_BYTES)
625 .set_delay_threshold(std::time::Duration::MAX)
626 .build();
627
628 let messages = [
629 Message::new().set_data("hello"),
630 Message::new().set_data("world"),
631 ];
632 let mut handles = Vec::new();
633 for msg in messages {
634 let handle = publisher.publish(msg.clone());
635 handles.push(handle);
636 }
637
638 for rx in handles.into_iter() {
639 let got = rx.await;
640 assert!(got.is_err(), "{got:?}");
641 }
642
643 Ok(())
644 }
645
    // Checks that the byte threshold alone can trigger a send, once without
    // and once with an ordering key. The stub must see exactly two requests,
    // each holding a single message.
    #[tokio_test_no_panics(start_paused = true)]
    async fn batch_sends_on_byte_threshold() -> anyhow::Result<()> {
        let mut mock = MockGapicPublisher::new();
        mock.expect_publish()
            .withf(|r, _| r.messages.len() == 1)
            .times(2)
            .returning(publish_ok);

        let client = GapicPublisher::from_stub(mock);
        // NOTE(review): presumably sized so that one "hello" message (plus
        // topic and key) fits in a batch but a second message does not;
        // confirm against how the batch size is computed.
        let byte_threshold = TOPIC.len() + "hello".len() + "key".len() + 1;
        let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
            .set_message_count_threshold(MAX_MESSAGES)
            .set_byte_threshold(byte_threshold as u32)
            .set_delay_threshold(std::time::Duration::MAX)
            .build();

        let handle = publisher.publish(Message::new().set_data("hello"));
        // The second message is only published to push the batch over the
        // threshold; its own result is never awaited.
        let _handle = publisher.publish(Message::new().set_data("world"));
        assert_eq!(handle.await?, "hello");

        // Same scenario again, this time with an ordering key.
        let handle = publisher.publish(Message::new().set_data("hello").set_ordering_key("key"));
        let _handle = publisher.publish(Message::new().set_data("world").set_ordering_key("key"));
        assert_eq!(handle.await?, "hello");

        Ok(())
    }
678
679 #[tokio_test_no_panics(start_paused = true)]
680 async fn batch_sends_on_delay_threshold() -> anyhow::Result<()> {
681 let mut mock = MockGapicPublisher::new();
682 mock.expect_publish()
683 .withf(|req, _| req.topic == TOPIC)
684 .returning(publish_ok);
685
686 let client = GapicPublisher::from_stub(mock);
687 let delay = std::time::Duration::from_millis(10);
688 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
689 .set_message_count_threshold(u32::MAX)
690 .set_byte_threshold(MAX_BYTES)
691 .set_delay_threshold(delay)
692 .build();
693
694 for _ in 0..3 {
696 let start = tokio::time::Instant::now();
697 let messages = [
698 Message::new().set_data("hello 0"),
699 Message::new().set_data("hello 1"),
700 Message::new()
701 .set_data("hello 2")
702 .set_ordering_key("ordering key 1"),
703 Message::new()
704 .set_data("hello 3")
705 .set_ordering_key("ordering key 2"),
706 ];
707 let mut handles = Vec::new();
708 for msg in messages {
709 let handle = publisher.publish(msg.clone());
710 handles.push((msg, handle));
711 }
712
713 for (id, rx) in handles.into_iter() {
714 let got = rx.await?;
715 let id = String::from_utf8(id.data.to_vec())?;
716 assert_eq!(got, id);
717 assert_eq!(
718 start.elapsed(),
719 delay,
720 "batch of messages should have sent after {:?}",
721 delay
722 )
723 }
724 }
725
726 Ok(())
727 }
728
729 #[tokio::test(start_paused = true)]
730 #[allow(clippy::get_first)]
731 async fn batching_separates_by_ordering_key() -> anyhow::Result<()> {
732 let mut mock = MockGapicPublisher::new();
734 mock.expect_publish()
735 .withf(|r, _| {
736 r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
737 })
738 .returning(publish_ok);
739
740 let client = GapicPublisher::from_stub(mock);
741 let message_count_threshold = 2_u32;
743 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
744 .set_message_count_threshold(message_count_threshold)
745 .set_byte_threshold(MAX_BYTES)
746 .set_delay_threshold(std::time::Duration::MAX)
747 .build();
748
749 let num_ordering_keys = 3;
750 let mut messages = Vec::new();
751 for i in 0..(2 * message_count_threshold * num_ordering_keys) {
755 messages.push(
756 Message::new()
757 .set_data(format!("test message {}", i))
758 .set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
759 );
760 }
761 let mut handles = Vec::new();
762 for msg in messages {
763 let handle = publisher.publish(msg.clone());
764 handles.push((msg, handle));
765 }
766
767 for (id, rx) in handles.into_iter() {
768 let got = rx.await?;
769 let id = String::from_utf8(id.data.to_vec())?;
770 assert_eq!(got, id);
771 }
772
773 Ok(())
774 }
775
776 #[tokio_test_no_panics(start_paused = true)]
777 #[allow(clippy::get_first)]
778 async fn batching_handles_empty_ordering_key() -> anyhow::Result<()> {
779 let mut mock = MockGapicPublisher::new();
781 mock.expect_publish()
782 .withf(|r, _| {
783 r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
784 })
785 .returning(publish_ok);
786
787 let client = GapicPublisher::from_stub(mock);
788 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
790 .set_message_count_threshold(2_u32)
791 .set_byte_threshold(MAX_BYTES)
792 .set_delay_threshold(std::time::Duration::MAX)
793 .build();
794
795 let messages = [
796 Message::new().set_data("hello 1"),
797 Message::new().set_data("hello 2").set_ordering_key(""),
798 Message::new()
799 .set_data("hello 3")
800 .set_ordering_key("ordering key :1"),
801 Message::new()
802 .set_data("hello 4")
803 .set_ordering_key("ordering key :1"),
804 ];
805
806 let mut handles = Vec::new();
807 for msg in messages {
808 let handle = publisher.publish(msg.clone());
809 handles.push((msg, handle));
810 }
811
812 for (id, rx) in handles.into_iter() {
813 let got = rx.await?;
814 let id = String::from_utf8(id.data.to_vec())?;
815 assert_eq!(got, id);
816 }
817
818 Ok(())
819 }
820
821 #[tokio_test_no_panics(start_paused = true)]
822 #[allow(clippy::get_first)]
823 async fn ordering_key_limits_to_one_outstanding_batch() -> anyhow::Result<()> {
824 let mut seq = Sequence::new();
828 let mut mock = MockGapicPublisherWithFuture::new();
829 mock.expect_publish()
830 .times(1)
831 .in_sequence(&mut seq)
832 .withf(|r, _| r.messages.len() == 1)
833 .returning({
834 |r, o| {
835 Box::pin(async move {
836 tokio::time::sleep(Duration::from_millis(10)).await;
837 publish_ok(r, o)
838 })
839 }
840 });
841
842 mock.expect_publish()
843 .times(1)
844 .in_sequence(&mut seq)
845 .withf(|r, _| r.messages.len() == 1)
846 .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
847
848 let client = GapicPublisher::from_stub(mock);
849 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
851 .set_message_count_threshold(1_u32)
852 .set_byte_threshold(MAX_BYTES)
853 .set_delay_threshold(std::time::Duration::MAX)
854 .build();
855
856 let messages = [
857 Message::new()
858 .set_data("hello 1")
859 .set_ordering_key("ordering key"),
860 Message::new()
861 .set_data("hello 2")
862 .set_ordering_key("ordering key"),
863 ];
864
865 let start = tokio::time::Instant::now();
866 let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
867 let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
868 assert_eq!(msg2_handle.await?, "hello 2");
869 assert_eq!(
870 start.elapsed(),
871 Duration::from_millis(10),
872 "the second batch of messages should have sent after the first which is has been delayed by {:?}",
873 Duration::from_millis(10)
874 );
875 assert_eq!(msg1_handle.await?, "hello 1");
877
878 Ok(())
879 }
880
881 #[tokio_test_no_panics(start_paused = true)]
882 #[allow(clippy::get_first)]
883 async fn empty_ordering_key_allows_concurrent_batches() -> anyhow::Result<()> {
884 let mut seq = Sequence::new();
889 let mut mock = MockGapicPublisherWithFuture::new();
890 mock.expect_publish()
891 .times(1)
892 .in_sequence(&mut seq)
893 .withf(|r, _| r.messages.len() == 1)
894 .returning(|r, o| {
895 Box::pin(async move {
896 tokio::time::sleep(Duration::from_millis(10)).await;
897 publish_ok(r, o)
898 })
899 });
900
901 mock.expect_publish()
902 .times(1)
903 .in_sequence(&mut seq)
904 .withf(|r, _| r.topic == TOPIC && r.messages.len() == 1)
905 .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
906
907 let client = GapicPublisher::from_stub(mock);
908 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
910 .set_message_count_threshold(1_u32)
911 .set_byte_threshold(MAX_BYTES)
912 .set_delay_threshold(std::time::Duration::MAX)
913 .build();
914
915 let messages = [
916 Message::new().set_data("hello 1").set_ordering_key(""),
917 Message::new().set_data("hello 2").set_ordering_key(""),
918 ];
919
920 let start = tokio::time::Instant::now();
921 let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
922 let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
923 assert_eq!(msg2_handle.await?, "hello 2");
924 assert_eq!(
925 start.elapsed(),
926 Duration::from_millis(0),
927 "the second batch of messages should have sent without any delay"
928 );
929 assert_eq!(msg1_handle.await?, "hello 1");
931
932 Ok(())
933 }
934
935 #[tokio_test_no_panics(start_paused = true)]
936 async fn ordering_key_error_pauses_publisher() -> anyhow::Result<()> {
937 let mut seq = Sequence::new();
939 let mut mock = MockGapicPublisher::new();
940 mock.expect_publish()
941 .withf(|req, _o| req.topic == TOPIC)
942 .times(1)
943 .in_sequence(&mut seq)
944 .returning(publish_err);
945
946 mock.expect_publish()
947 .withf(|req, _o| req.topic == TOPIC)
948 .times(2)
949 .in_sequence(&mut seq)
950 .returning(publish_ok);
951
952 let client = GapicPublisher::from_stub(mock);
953 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
954 .set_message_count_threshold(1_u32)
955 .build();
956
957 let key = "ordering_key";
958 let msg_0_handle =
959 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
960 let msg_1_handle =
962 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
963
964 let mut got_err = msg_0_handle.await.unwrap_err();
966 assert_publish_err(got_err);
967
968 got_err = msg_1_handle.await.unwrap_err();
970 assert!(
971 matches!(got_err, crate::error::PublishError::OrderingKeyPaused),
972 "{got_err:?}"
973 );
974
975 for _ in 0..3 {
977 assert_publishing_is_paused!(publisher, key);
978 }
979
980 assert_publishing_is_ok!(publisher, "", "without_error");
982
983 Ok(())
984 }
985
986 #[tokio_test_no_panics(start_paused = true)]
987 async fn batch_error_pauses_ordering_key() -> anyhow::Result<()> {
988 let mut mock = MockGapicPublisher::new();
990 mock.expect_publish()
991 .times(1)
992 .withf(|r, _| r.topic == TOPIC && r.messages.len() == 2)
993 .returning(publish_err);
994
995 let client = GapicPublisher::from_stub(mock);
996 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
997 .set_message_count_threshold(2_u32)
998 .build();
999
1000 let key = "ordering_key";
1001 let msg_0_handle =
1003 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1004 let msg_1_handle =
1005 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
1006
1007 let mut got_err = msg_0_handle.await.unwrap_err();
1009 assert_publish_err(got_err);
1010 got_err = msg_1_handle.await.unwrap_err();
1011 assert_publish_err(got_err);
1012
1013 assert_publishing_is_paused!(publisher, key);
1015
1016 Ok(())
1017 }
1018
1019 #[tokio_test_no_panics(start_paused = true)]
1020 async fn flush_on_paused_ordering_key_returns_error() -> anyhow::Result<()> {
1021 let mut seq = Sequence::new();
1023 let mut mock = MockGapicPublisher::new();
1024 mock.expect_publish()
1025 .withf(|req, _o| req.topic == TOPIC)
1026 .times(1)
1027 .in_sequence(&mut seq)
1028 .returning(publish_err);
1029
1030 mock.expect_publish()
1031 .withf(|req, _o| req.topic == TOPIC)
1032 .times(2)
1033 .in_sequence(&mut seq)
1034 .returning(publish_ok);
1035
1036 let client = GapicPublisher::from_stub(mock);
1037 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1038
1039 let key = "ordering_key";
1040 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1042 publisher.flush().await;
1043 let got_err = handle.await.unwrap_err();
1045 assert_publish_err(got_err);
1046
1047 assert_publishing_is_paused!(publisher, key);
1049
1050 assert_publishing_is_ok!(publisher, "", "without_error");
1052
1053 Ok(())
1054 }
1055
1056 #[tokio_test_no_panics(start_paused = true)]
1057 async fn resuming_non_paused_ordering_key_is_noop() -> anyhow::Result<()> {
1058 let mut mock = MockGapicPublisher::new();
1059 mock.expect_publish()
1060 .withf(|req, _o| req.topic == TOPIC)
1061 .times(4)
1062 .returning(publish_ok);
1063
1064 let client = GapicPublisher::from_stub(mock);
1065 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1066
1067 publisher.resume_publish("");
1069 assert_publishing_is_ok!(publisher, "");
1070
1071 publisher.resume_publish("");
1073 assert_publishing_is_ok!(publisher, "");
1074
1075 let key = "without_error";
1077 publisher.resume_publish(key);
1078 assert_publishing_is_ok!(publisher, key);
1079
1080 publisher.resume_publish(key);
1082 assert_publishing_is_ok!(publisher, key);
1083
1084 Ok(())
1085 }
1086
1087 #[tokio_test_no_panics(start_paused = true)]
1088 async fn resuming_paused_ordering_key_allows_publishing() -> anyhow::Result<()> {
1089 let mut seq = Sequence::new();
1090 let mut mock = MockGapicPublisher::new();
1091 mock.expect_publish()
1092 .withf(|req, _o| req.topic == TOPIC)
1093 .times(1)
1094 .in_sequence(&mut seq)
1095 .returning(publish_err);
1096
1097 mock.expect_publish()
1098 .withf(|req, _o| req.topic == TOPIC)
1099 .times(3)
1100 .in_sequence(&mut seq)
1101 .returning(publish_ok);
1102
1103 let client = GapicPublisher::from_stub(mock);
1104 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1105
1106 let key = "ordering_key";
1107 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1109 let got_err = handle.await.unwrap_err();
1111 assert_publish_err(got_err);
1112
1113 assert_publishing_is_paused!(publisher, key);
1115
1116 publisher.resume_publish(key);
1118 assert_publishing_is_ok!(publisher, key);
1119
1120 assert_publishing_is_ok!(publisher, "", "without_error");
1122
1123 Ok(())
1124 }
1125
1126 #[tokio_test_no_panics(start_paused = true)]
1127 async fn resuming_ordering_key_twice_is_safe() -> anyhow::Result<()> {
1128 let mut seq = Sequence::new();
1130 let mut mock = MockGapicPublisher::new();
1131 mock.expect_publish()
1132 .withf(|req, _o| req.topic == TOPIC)
1133 .in_sequence(&mut seq)
1134 .times(1)
1135 .returning(publish_err);
1136
1137 mock.expect_publish()
1138 .withf(|req, _o| req.topic == TOPIC)
1139 .in_sequence(&mut seq)
1140 .return_once(publish_ok);
1141
1142 let client = GapicPublisher::from_stub(mock);
1143 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1144
1145 let key = "ordering_key";
1146 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1148 publisher.flush().await;
1149 let got_err = handle.await.unwrap_err();
1151 assert_publish_err(got_err);
1152
1153 assert_publishing_is_paused!(publisher, key);
1155
1156 publisher.resume_publish(key);
1158 publisher.resume_publish(key);
1159 assert_publishing_is_ok!(publisher, key);
1160
1161 Ok(())
1162 }
1163
1164 #[tokio_test_no_panics(start_paused = true)]
1165 async fn resuming_one_ordering_key_does_not_resume_others() -> anyhow::Result<()> {
1166 let mut seq = Sequence::new();
1168 let mut mock = MockGapicPublisher::new();
1169 mock.expect_publish()
1170 .withf(|req, _o| req.topic == TOPIC)
1171 .times(2)
1172 .in_sequence(&mut seq)
1173 .returning(publish_err);
1174
1175 mock.expect_publish()
1176 .withf(|req, _o| req.topic == TOPIC)
1177 .times(1)
1178 .in_sequence(&mut seq)
1179 .returning(publish_ok);
1180
1181 let client = GapicPublisher::from_stub(mock);
1182 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1183
1184 let key_0 = "ordering_key_0";
1185 let key_1 = "ordering_key_1";
1186 let handle_0 = publisher.publish(Message::new().set_ordering_key(key_0).set_data("msg 0"));
1188 let handle_1 = publisher.publish(Message::new().set_ordering_key(key_1).set_data("msg 1"));
1189 publisher.flush().await;
1190 let mut got_err = handle_0.await.unwrap_err();
1191 assert_publish_err(got_err);
1192 got_err = handle_1.await.unwrap_err();
1193 assert_publish_err(got_err);
1194
1195 assert_publishing_is_paused!(publisher, key_0, key_1);
1197
1198 publisher.resume_publish(key_0);
1200
1201 assert_publishing_is_ok!(publisher, key_0);
1203
1204 assert_publishing_is_paused!(publisher, key_1);
1206
1207 Ok(())
1208 }
1209
1210 #[tokio::test]
1211 async fn publisher_builder_clamps_batching_options() -> anyhow::Result<()> {
1212 let oversized_options = BatchingOptions::new()
1214 .set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
1215 .set_message_count_threshold(MAX_MESSAGES + 1)
1216 .set_byte_threshold(MAX_BYTES + 1);
1217
1218 let publishers = vec![
1219 BasePublisher::builder()
1220 .build()
1221 .await?
1222 .publisher("projects/my-project/topics/my-topic")
1223 .set_delay_threshold(oversized_options.delay_threshold)
1224 .set_message_count_threshold(oversized_options.message_count_threshold)
1225 .set_byte_threshold(oversized_options.byte_threshold)
1226 .build(),
1227 Publisher::builder("projects/my-project/topics/my-topic".to_string())
1228 .set_delay_threshold(oversized_options.delay_threshold)
1229 .set_message_count_threshold(oversized_options.message_count_threshold)
1230 .set_byte_threshold(oversized_options.byte_threshold)
1231 .build()
1232 .await?,
1233 ];
1234
1235 for publisher in publishers {
1236 let got = publisher.batching_options;
1237 assert_eq!(got.delay_threshold, MAX_DELAY);
1238 assert_eq!(got.message_count_threshold, MAX_MESSAGES);
1239 assert_eq!(got.byte_threshold, MAX_BYTES);
1240 }
1241
1242 let normal_options = BatchingOptions::new()
1244 .set_delay_threshold(Duration::from_secs(10))
1245 .set_message_count_threshold(10_u32)
1246 .set_byte_threshold(100_u32);
1247
1248 let publishers = vec![
1249 BasePublisher::builder()
1250 .build()
1251 .await?
1252 .publisher("projects/my-project/topics/my-topic")
1253 .set_delay_threshold(normal_options.delay_threshold)
1254 .set_message_count_threshold(normal_options.message_count_threshold)
1255 .set_byte_threshold(normal_options.byte_threshold)
1256 .build(),
1257 Publisher::builder("projects/my-project/topics/my-topic".to_string())
1258 .set_delay_threshold(normal_options.delay_threshold)
1259 .set_message_count_threshold(normal_options.message_count_threshold)
1260 .set_byte_threshold(normal_options.byte_threshold)
1261 .build()
1262 .await?,
1263 ];
1264
1265 for publisher in publishers {
1266 let got = publisher.batching_options;
1267
1268 assert_eq!(got.delay_threshold, normal_options.delay_threshold);
1269 assert_eq!(
1270 got.message_count_threshold,
1271 normal_options.message_count_threshold
1272 );
1273 assert_eq!(got.byte_threshold, normal_options.byte_threshold);
1274 }
1275 Ok(())
1276 }
1277}