1use super::options::BatchingOptions;
16use crate::publisher::actor::BundledMessage;
17use crate::publisher::actor::ToDispatcher;
18use crate::publisher::builder::PublisherBuilder;
19
20use tokio::sync::mpsc::UnboundedSender;
21use tokio::sync::oneshot;
22
23pub use super::base_publisher::BasePublisher;
24
/// A handle used to publish messages.
///
/// As the methods in this file show, the handle does not talk to the service
/// itself: every operation is encoded as a `ToDispatcher` command and sent
/// over `tx`. The type derives `Clone`; a cloned handle holds a clone of the
/// same channel sender.
#[derive(Debug, Clone)]
pub struct Publisher {
    /// The batching configuration this publisher was built with.
    // Within this file the field is only read by the unit tests, hence the
    // lint allowance below.
    #[allow(dead_code)]
    pub(crate) batching_options: BatchingOptions,
    /// Sending half of the unbounded channel that carries `ToDispatcher`
    /// commands.
    pub(crate) tx: UnboundedSender<ToDispatcher>,
}
72
73impl Publisher {
74 pub fn builder<T: Into<String>>(topic: T) -> PublisherBuilder {
86 PublisherBuilder::new(topic.into())
87 }
88
89 #[must_use = "ignoring the publish result may lead to undetected delivery failures"]
103 pub fn publish(&self, msg: crate::model::Message) -> crate::publisher::PublishFuture {
104 let (tx, rx) = tokio::sync::oneshot::channel();
105
106 if self
110 .tx
111 .send(ToDispatcher::Publish(BundledMessage { msg, tx }))
112 .is_err()
113 {
114 }
116 crate::publisher::PublishFuture { rx }
117 }
118
119 pub async fn flush(&self) {
144 let (tx, rx) = oneshot::channel();
145 if self.tx.send(ToDispatcher::Flush(tx)).is_ok() {
146 let _ = rx.await;
147 }
148 }
149
150 pub fn resume_publish<T: std::convert::Into<std::string::String>>(&self, ordering_key: T) {
168 let _ = self
169 .tx
170 .send(ToDispatcher::ResumePublish(ordering_key.into()));
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177 use crate::publisher::builder::PublisherPartialBuilder;
178 use crate::publisher::client::BasePublisher;
179 use crate::publisher::constants::*;
180 use crate::publisher::options::BatchingOptions;
181 use crate::{
182 generated::gapic_dataplane::client::Publisher as GapicPublisher,
183 model::{Message, PublishResponse},
184 };
185 use google_cloud_test_macros::tokio_test_no_panics;
186 use mockall::Sequence;
187 use rand::{RngExt, distr::Alphanumeric};
188 use std::error::Error;
189 use std::time::Duration;
190
191 static TOPIC: &str = "my-topic";
192
    // Mock of the generated `Publisher` stub whose `publish` method is
    // declared as an `async fn`. The tests in this module configure it with
    // plain functions that return the final `Result` directly, such as
    // `publish_ok` and `publish_err`.
    mockall::mock! {
        #[derive(Debug)]
        GapicPublisher {}
        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
            async fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> crate::Result<crate::Response<crate::model::PublishResponse>>;
        }
    }
200
    // Variant of the mock above whose `publish` method is declared as
    // returning `impl Future` rather than as an `async fn`. The tests that use
    // it return a boxed `async move` block from `returning`, which lets an
    // expectation await `tokio::time::sleep` before answering.
    mockall::mock! {
        #[derive(Debug)]
        GapicPublisherWithFuture {}
        impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
            fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> impl Future<Output=crate::Result<crate::Response<crate::model::PublishResponse>>> + Send;
        }
    }
214
215 fn publish_ok(
216 req: crate::model::PublishRequest,
217 _options: crate::RequestOptions,
218 ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
219 let ids = req
220 .messages
221 .iter()
222 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
223 Ok(crate::Response::from(
224 PublishResponse::new().set_message_ids(ids),
225 ))
226 }
227
228 fn publish_err(
229 _req: crate::model::PublishRequest,
230 _options: crate::RequestOptions,
231 ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
232 Err(crate::Error::service(
233 google_cloud_gax::error::rpc::Status::default()
234 .set_code(google_cloud_gax::error::rpc::Code::Unknown)
235 .set_message("unknown error has occurred"),
236 ))
237 }
238
239 #[track_caller]
240 fn assert_publish_err(got_err: crate::error::PublishError) {
241 assert!(
242 matches!(got_err, crate::error::PublishError::Rpc(_)),
243 "{got_err:?}"
244 );
245 let source = got_err
246 .source()
247 .and_then(|e| e.downcast_ref::<std::sync::Arc<crate::Error>>())
248 .expect("send error should contain a source");
249 assert!(source.status().is_some(), "{got_err:?}");
250 assert_eq!(
251 source.status().unwrap().code,
252 google_cloud_gax::error::rpc::Code::Unknown,
253 "{got_err:?}"
254 );
255 }
256
257 fn generate_random_data() -> String {
258 rand::rng()
259 .sample_iter(&Alphanumeric)
260 .take(16)
261 .map(char::from)
262 .collect()
263 }
264
    // For each ordering key given, publishes one message with random data on
    // `$publisher`, awaits the result and asserts that the returned id equals
    // the data that was sent (the contract of the `publish_ok` mock handler).
    //
    // The expansion uses `.await` and `?`, so the macro can only be invoked
    // inside an `async` function that returns a compatible `Result`.
    macro_rules! assert_publishing_is_ok {
        ($publisher:ident, $($ordering_key:expr),+) => {
            $(
                let msg = generate_random_data();
                let got = $publisher
                    .publish(
                        Message::new()
                            .set_ordering_key($ordering_key)
                            .set_data(msg.clone()),
                    )
                    .await;
                assert_eq!(got?, msg);
            )+
        };
    }
280
    // For each ordering key given, publishes one message with random data on
    // `$publisher` and asserts that the awaited result is
    // `Err(PublishError::OrderingKeyPaused)`.
    //
    // The expansion uses `.await`, so the macro can only be invoked inside an
    // `async` context.
    macro_rules! assert_publishing_is_paused {
        ($publisher:ident, $($ordering_key:expr),+) => {
            $(
                let got_err = $publisher
                    .publish(
                        Message::new()
                            .set_ordering_key($ordering_key)
                            .set_data(generate_random_data()),
                    )
                    .await;
                assert!(
                    matches!(got_err, Err(crate::error::PublishError::OrderingKeyPaused)),
                    "{got_err:?}"
                );
            )+
        };
    }
298
299 #[tokio_test_no_panics]
300 async fn publisher_publish_successfully() -> anyhow::Result<()> {
301 let mut mock = MockGapicPublisher::new();
302 mock.expect_publish()
303 .times(2)
304 .withf(|req, _o| req.topic == TOPIC)
305 .returning(publish_ok);
306
307 let client = GapicPublisher::from_stub(mock);
308 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
309 .set_message_count_threshold(1_u32)
310 .build();
311
312 let messages = [
313 Message::new().set_data("hello"),
314 Message::new().set_data("world"),
315 ];
316 let mut handles = Vec::new();
317 for msg in messages {
318 let handle = publisher.publish(msg.clone());
319 handles.push((msg, handle));
320 }
321
322 for (id, rx) in handles.into_iter() {
323 let got = rx.await?;
324 let id = String::from_utf8(id.data.to_vec())?;
325 assert_eq!(got, id);
326 }
327
328 Ok(())
329 }
330
331 #[tokio::test]
332 async fn publisher_publish_large_message() -> anyhow::Result<()> {
333 let mut mock = MockGapicPublisher::new();
334 mock.expect_publish()
335 .withf(|req, _o| req.topic == TOPIC)
336 .returning(publish_ok);
337
338 let client = GapicPublisher::from_stub(mock);
339 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
340 .set_byte_threshold(1_u32)
341 .build();
342 assert_publishing_is_ok!(publisher, "");
343 assert_publishing_is_ok!(publisher, "key");
344
345 Ok(())
346 }
347
348 #[tokio::test(start_paused = true)]
349 async fn worker_handles_forced_shutdown_gracefully() -> anyhow::Result<()> {
350 let mock = MockGapicPublisher::new();
351
352 let client = GapicPublisher::from_stub(mock);
353 let (publisher, background_task_handle) =
354 PublisherPartialBuilder::new(client, TOPIC.to_string())
355 .set_message_count_threshold(100_u32)
356 .build_return_handle();
357
358 let messages = [
359 Message::new().set_data("hello"),
360 Message::new().set_data("world"),
361 ];
362 let mut handles = Vec::new();
363 for msg in messages {
364 let handle = publisher.publish(msg);
365 handles.push(handle);
366 }
367
368 background_task_handle.abort();
369
370 for rx in handles.into_iter() {
371 rx.await
372 .expect_err("expected error when background task canceled");
373 }
374
375 Ok(())
376 }
377
378 #[tokio_test_no_panics(start_paused = true)]
379 async fn dropping_publisher_flushes_pending_messages() -> anyhow::Result<()> {
380 let mut mock = MockGapicPublisher::new();
383 mock.expect_publish()
384 .withf(|req, _o| req.topic == TOPIC)
385 .times(2)
386 .returning(publish_ok);
387
388 let client = GapicPublisher::from_stub(mock);
389 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
390 .set_message_count_threshold(1000_u32)
391 .set_delay_threshold(Duration::from_secs(60))
392 .build();
393
394 let start = tokio::time::Instant::now();
395 let messages = [
396 Message::new().set_data("hello"),
397 Message::new().set_data("world"),
398 Message::new().set_data("hello").set_ordering_key("key"),
399 Message::new().set_data("world").set_ordering_key("key"),
400 ];
401 let mut handles = Vec::new();
402 for msg in messages {
403 let handle = publisher.publish(msg.clone());
404 handles.push((msg, handle));
405 }
406 drop(publisher); for (id, rx) in handles.into_iter() {
409 let got = rx.await?;
410 let id = String::from_utf8(id.data.to_vec())?;
411 assert_eq!(got, id);
412 assert_eq!(start.elapsed(), Duration::ZERO);
413 }
414
415 Ok(())
416 }
417
418 #[tokio_test_no_panics]
419 async fn publisher_handles_publish_errors() -> anyhow::Result<()> {
420 let mut mock = MockGapicPublisher::new();
421 mock.expect_publish()
422 .times(2)
423 .withf(|req, _o| req.topic == TOPIC)
424 .returning(publish_err);
425
426 let client = GapicPublisher::from_stub(mock);
427 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
428 .set_message_count_threshold(1_u32)
429 .build();
430
431 let messages = [
432 Message::new().set_data("hello"),
433 Message::new().set_data("world"),
434 ];
435
436 let mut handles = Vec::new();
437 for msg in messages {
438 let handle = publisher.publish(msg.clone());
439 handles.push(handle);
440 }
441
442 for rx in handles.into_iter() {
443 let got = rx.await;
444 assert!(got.is_err(), "{got:?}");
445 }
446
447 Ok(())
448 }
449
450 #[tokio_test_no_panics(start_paused = true)]
451 async fn flush_sends_pending_messages_immediately() -> anyhow::Result<()> {
452 let mut mock = MockGapicPublisher::new();
453 mock.expect_publish()
454 .withf(|req, _o| req.topic == TOPIC)
455 .returning(publish_ok);
456
457 let client = GapicPublisher::from_stub(mock);
458 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
459 .set_message_count_threshold(1000_u32)
461 .set_delay_threshold(Duration::from_secs(60))
462 .build();
463
464 let start = tokio::time::Instant::now();
465 let messages = [
466 Message::new().set_data("hello"),
467 Message::new().set_data("world"),
468 ];
469 let mut handles = Vec::new();
470 for msg in messages {
471 let handle = publisher.publish(msg.clone());
472 handles.push((msg, handle));
473 }
474
475 publisher.flush().await;
476 assert_eq!(start.elapsed(), Duration::ZERO);
477
478 let post = publisher.publish(Message::new().set_data("after"));
479 for (id, rx) in handles.into_iter() {
480 let got = rx.await?;
481 let id = String::from_utf8(id.data.to_vec())?;
482 assert_eq!(got, id);
483 assert_eq!(start.elapsed(), Duration::ZERO);
484 }
485
486 let got = post.await?;
489 assert_eq!(got, "after");
490 assert_eq!(start.elapsed(), Duration::from_secs(60));
491
492 Ok(())
493 }
494
495 #[tokio_test_no_panics(start_paused = true)]
496 async fn dropping_handles_does_not_prevent_publishing() -> anyhow::Result<()> {
498 let mut mock = MockGapicPublisher::new();
499 mock.expect_publish()
500 .withf(|r, _| {
501 r.messages.len() == 2
502 && r.messages[0].data == "hello"
503 && r.messages[1].data == "world"
504 })
505 .return_once(publish_ok);
506
507 let client = GapicPublisher::from_stub(mock);
508 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
509 .set_message_count_threshold(1000_u32)
511 .set_delay_threshold(Duration::from_secs(60))
512 .build();
513
514 let start = tokio::time::Instant::now();
515 let messages = [
516 Message::new().set_data("hello"),
517 Message::new().set_data("world"),
518 ];
519 for msg in messages {
520 let handle = publisher.publish(msg.clone());
521 drop(handle);
522 }
523
524 publisher.flush().await;
525 assert_eq!(start.elapsed(), Duration::ZERO);
526
527 Ok(())
528 }
529
530 #[tokio::test(start_paused = true)]
531 async fn flush_with_no_messages_is_noop() -> anyhow::Result<()> {
532 let mock = MockGapicPublisher::new();
533
534 let client = GapicPublisher::from_stub(mock);
535 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
536
537 let start = tokio::time::Instant::now();
538 publisher.flush().await;
539 assert_eq!(start.elapsed(), Duration::ZERO);
540
541 Ok(())
542 }
543
544 #[tokio_test_no_panics]
545 async fn batch_sends_on_message_count_threshold_success() -> anyhow::Result<()> {
546 let mut mock = MockGapicPublisher::new();
548 mock.expect_publish()
549 .withf(|r, _| r.messages.len() == 2)
550 .return_once(publish_ok);
551
552 let client = GapicPublisher::from_stub(mock);
553 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
554 .set_message_count_threshold(2_u32)
555 .set_byte_threshold(MAX_BYTES)
556 .set_delay_threshold(std::time::Duration::MAX)
557 .build();
558
559 let messages = [
560 Message::new().set_data("hello"),
561 Message::new().set_data("world"),
562 ];
563 let mut handles = Vec::new();
564 for msg in messages {
565 let handle = publisher.publish(msg.clone());
566 handles.push((msg, handle));
567 }
568
569 for (id, rx) in handles.into_iter() {
570 let got = rx.await?;
571 let id = String::from_utf8(id.data.to_vec())?;
572 assert_eq!(got, id);
573 }
574
575 Ok(())
576 }
577
578 #[tokio_test_no_panics]
579 async fn batch_sends_on_message_count_threshold_error() -> anyhow::Result<()> {
580 let mut mock = MockGapicPublisher::new();
582 mock.expect_publish()
583 .withf(|r, _| r.messages.len() == 2)
584 .return_once(publish_err);
585
586 let client = GapicPublisher::from_stub(mock);
587 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
588 .set_message_count_threshold(2_u32)
589 .set_byte_threshold(MAX_BYTES)
590 .set_delay_threshold(std::time::Duration::MAX)
591 .build();
592
593 let messages = [
594 Message::new().set_data("hello"),
595 Message::new().set_data("world"),
596 ];
597 let mut handles = Vec::new();
598 for msg in messages {
599 let handle = publisher.publish(msg.clone());
600 handles.push(handle);
601 }
602
603 for rx in handles.into_iter() {
604 let got = rx.await;
605 assert!(got.is_err(), "{got:?}");
606 }
607
608 Ok(())
609 }
610
611 #[tokio_test_no_panics(start_paused = true)]
612 async fn batch_sends_on_byte_threshold() -> anyhow::Result<()> {
613 let mut mock = MockGapicPublisher::new();
615 mock.expect_publish()
616 .withf(|r, _| r.messages.len() == 1)
617 .times(2)
618 .returning(publish_ok);
619
620 let client = GapicPublisher::from_stub(mock);
621 let byte_threshold = TOPIC.len() + "hello".len() + "key".len() + 1;
623 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
624 .set_message_count_threshold(MAX_MESSAGES)
625 .set_byte_threshold(byte_threshold as u32)
626 .set_delay_threshold(std::time::Duration::MAX)
627 .build();
628
629 let handle = publisher.publish(Message::new().set_data("hello"));
631 let _handle = publisher.publish(Message::new().set_data("world"));
633 assert_eq!(handle.await?, "hello");
634
635 let handle = publisher.publish(Message::new().set_data("hello").set_ordering_key("key"));
637 let _handle = publisher.publish(Message::new().set_data("world").set_ordering_key("key"));
639 assert_eq!(handle.await?, "hello");
640
641 Ok(())
642 }
643
644 #[tokio_test_no_panics(start_paused = true)]
645 async fn batch_sends_on_delay_threshold() -> anyhow::Result<()> {
646 let mut mock = MockGapicPublisher::new();
647 mock.expect_publish()
648 .withf(|req, _| req.topic == TOPIC)
649 .returning(publish_ok);
650
651 let client = GapicPublisher::from_stub(mock);
652 let delay = std::time::Duration::from_millis(10);
653 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
654 .set_message_count_threshold(u32::MAX)
655 .set_byte_threshold(MAX_BYTES)
656 .set_delay_threshold(delay)
657 .build();
658
659 for _ in 0..3 {
661 let start = tokio::time::Instant::now();
662 let messages = [
663 Message::new().set_data("hello 0"),
664 Message::new().set_data("hello 1"),
665 Message::new()
666 .set_data("hello 2")
667 .set_ordering_key("ordering key 1"),
668 Message::new()
669 .set_data("hello 3")
670 .set_ordering_key("ordering key 2"),
671 ];
672 let mut handles = Vec::new();
673 for msg in messages {
674 let handle = publisher.publish(msg.clone());
675 handles.push((msg, handle));
676 }
677
678 for (id, rx) in handles.into_iter() {
679 let got = rx.await?;
680 let id = String::from_utf8(id.data.to_vec())?;
681 assert_eq!(got, id);
682 assert_eq!(
683 start.elapsed(),
684 delay,
685 "batch of messages should have sent after {:?}",
686 delay
687 )
688 }
689 }
690
691 Ok(())
692 }
693
694 #[tokio::test(start_paused = true)]
695 #[allow(clippy::get_first)]
696 async fn batching_separates_by_ordering_key() -> anyhow::Result<()> {
697 let mut mock = MockGapicPublisher::new();
699 mock.expect_publish()
700 .withf(|r, _| {
701 r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
702 })
703 .returning(publish_ok);
704
705 let client = GapicPublisher::from_stub(mock);
706 let message_count_threshold = 2_u32;
708 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
709 .set_message_count_threshold(message_count_threshold)
710 .set_byte_threshold(MAX_BYTES)
711 .set_delay_threshold(std::time::Duration::MAX)
712 .build();
713
714 let num_ordering_keys = 3;
715 let mut messages = Vec::new();
716 for i in 0..(2 * message_count_threshold * num_ordering_keys) {
720 messages.push(
721 Message::new()
722 .set_data(format!("test message {}", i))
723 .set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
724 );
725 }
726 let mut handles = Vec::new();
727 for msg in messages {
728 let handle = publisher.publish(msg.clone());
729 handles.push((msg, handle));
730 }
731
732 for (id, rx) in handles.into_iter() {
733 let got = rx.await?;
734 let id = String::from_utf8(id.data.to_vec())?;
735 assert_eq!(got, id);
736 }
737
738 Ok(())
739 }
740
741 #[tokio_test_no_panics(start_paused = true)]
742 #[allow(clippy::get_first)]
743 async fn batching_handles_empty_ordering_key() -> anyhow::Result<()> {
744 let mut mock = MockGapicPublisher::new();
746 mock.expect_publish()
747 .withf(|r, _| {
748 r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
749 })
750 .returning(publish_ok);
751
752 let client = GapicPublisher::from_stub(mock);
753 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
755 .set_message_count_threshold(2_u32)
756 .set_byte_threshold(MAX_BYTES)
757 .set_delay_threshold(std::time::Duration::MAX)
758 .build();
759
760 let messages = [
761 Message::new().set_data("hello 1"),
762 Message::new().set_data("hello 2").set_ordering_key(""),
763 Message::new()
764 .set_data("hello 3")
765 .set_ordering_key("ordering key :1"),
766 Message::new()
767 .set_data("hello 4")
768 .set_ordering_key("ordering key :1"),
769 ];
770
771 let mut handles = Vec::new();
772 for msg in messages {
773 let handle = publisher.publish(msg.clone());
774 handles.push((msg, handle));
775 }
776
777 for (id, rx) in handles.into_iter() {
778 let got = rx.await?;
779 let id = String::from_utf8(id.data.to_vec())?;
780 assert_eq!(got, id);
781 }
782
783 Ok(())
784 }
785
786 #[tokio_test_no_panics(start_paused = true)]
787 #[allow(clippy::get_first)]
788 async fn ordering_key_limits_to_one_outstanding_batch() -> anyhow::Result<()> {
789 let mut seq = Sequence::new();
793 let mut mock = MockGapicPublisherWithFuture::new();
794 mock.expect_publish()
795 .times(1)
796 .in_sequence(&mut seq)
797 .withf(|r, _| r.messages.len() == 1)
798 .returning({
799 |r, o| {
800 Box::pin(async move {
801 tokio::time::sleep(Duration::from_millis(10)).await;
802 publish_ok(r, o)
803 })
804 }
805 });
806
807 mock.expect_publish()
808 .times(1)
809 .in_sequence(&mut seq)
810 .withf(|r, _| r.messages.len() == 1)
811 .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
812
813 let client = GapicPublisher::from_stub(mock);
814 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
816 .set_message_count_threshold(1_u32)
817 .set_byte_threshold(MAX_BYTES)
818 .set_delay_threshold(std::time::Duration::MAX)
819 .build();
820
821 let messages = [
822 Message::new()
823 .set_data("hello 1")
824 .set_ordering_key("ordering key"),
825 Message::new()
826 .set_data("hello 2")
827 .set_ordering_key("ordering key"),
828 ];
829
830 let start = tokio::time::Instant::now();
831 let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
832 let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
833 assert_eq!(msg2_handle.await?, "hello 2");
834 assert_eq!(
835 start.elapsed(),
836 Duration::from_millis(10),
837 "the second batch of messages should have sent after the first which is has been delayed by {:?}",
838 Duration::from_millis(10)
839 );
840 assert_eq!(msg1_handle.await?, "hello 1");
842
843 Ok(())
844 }
845
846 #[tokio_test_no_panics(start_paused = true)]
847 #[allow(clippy::get_first)]
848 async fn empty_ordering_key_allows_concurrent_batches() -> anyhow::Result<()> {
849 let mut seq = Sequence::new();
854 let mut mock = MockGapicPublisherWithFuture::new();
855 mock.expect_publish()
856 .times(1)
857 .in_sequence(&mut seq)
858 .withf(|r, _| r.messages.len() == 1)
859 .returning(|r, o| {
860 Box::pin(async move {
861 tokio::time::sleep(Duration::from_millis(10)).await;
862 publish_ok(r, o)
863 })
864 });
865
866 mock.expect_publish()
867 .times(1)
868 .in_sequence(&mut seq)
869 .withf(|r, _| r.topic == TOPIC && r.messages.len() == 1)
870 .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
871
872 let client = GapicPublisher::from_stub(mock);
873 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
875 .set_message_count_threshold(1_u32)
876 .set_byte_threshold(MAX_BYTES)
877 .set_delay_threshold(std::time::Duration::MAX)
878 .build();
879
880 let messages = [
881 Message::new().set_data("hello 1").set_ordering_key(""),
882 Message::new().set_data("hello 2").set_ordering_key(""),
883 ];
884
885 let start = tokio::time::Instant::now();
886 let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
887 let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
888 assert_eq!(msg2_handle.await?, "hello 2");
889 assert_eq!(
890 start.elapsed(),
891 Duration::from_millis(0),
892 "the second batch of messages should have sent without any delay"
893 );
894 assert_eq!(msg1_handle.await?, "hello 1");
896
897 Ok(())
898 }
899
900 #[tokio_test_no_panics(start_paused = true)]
901 async fn ordering_key_error_pauses_publisher() -> anyhow::Result<()> {
902 let mut seq = Sequence::new();
904 let mut mock = MockGapicPublisher::new();
905 mock.expect_publish()
906 .withf(|req, _o| req.topic == TOPIC)
907 .times(1)
908 .in_sequence(&mut seq)
909 .returning(publish_err);
910
911 mock.expect_publish()
912 .withf(|req, _o| req.topic == TOPIC)
913 .times(2)
914 .in_sequence(&mut seq)
915 .returning(publish_ok);
916
917 let client = GapicPublisher::from_stub(mock);
918 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
919 .set_message_count_threshold(1_u32)
920 .build();
921
922 let key = "ordering_key";
923 let msg_0_handle =
924 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
925 let msg_1_handle =
927 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
928
929 let mut got_err = msg_0_handle.await.unwrap_err();
931 assert_publish_err(got_err);
932
933 got_err = msg_1_handle.await.unwrap_err();
935 assert!(
936 matches!(got_err, crate::error::PublishError::OrderingKeyPaused),
937 "{got_err:?}"
938 );
939
940 for _ in 0..3 {
942 assert_publishing_is_paused!(publisher, key);
943 }
944
945 assert_publishing_is_ok!(publisher, "", "without_error");
947
948 Ok(())
949 }
950
951 #[tokio_test_no_panics(start_paused = true)]
952 async fn batch_error_pauses_ordering_key() -> anyhow::Result<()> {
953 let mut mock = MockGapicPublisher::new();
955 mock.expect_publish()
956 .times(1)
957 .withf(|r, _| r.topic == TOPIC && r.messages.len() == 2)
958 .returning(publish_err);
959
960 let client = GapicPublisher::from_stub(mock);
961 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
962 .set_message_count_threshold(2_u32)
963 .build();
964
965 let key = "ordering_key";
966 let msg_0_handle =
968 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
969 let msg_1_handle =
970 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
971
972 let mut got_err = msg_0_handle.await.unwrap_err();
974 assert_publish_err(got_err);
975 got_err = msg_1_handle.await.unwrap_err();
976 assert_publish_err(got_err);
977
978 assert_publishing_is_paused!(publisher, key);
980
981 Ok(())
982 }
983
984 #[tokio_test_no_panics(start_paused = true)]
985 async fn flush_on_paused_ordering_key_returns_error() -> anyhow::Result<()> {
986 let mut seq = Sequence::new();
988 let mut mock = MockGapicPublisher::new();
989 mock.expect_publish()
990 .withf(|req, _o| req.topic == TOPIC)
991 .times(1)
992 .in_sequence(&mut seq)
993 .returning(publish_err);
994
995 mock.expect_publish()
996 .withf(|req, _o| req.topic == TOPIC)
997 .times(2)
998 .in_sequence(&mut seq)
999 .returning(publish_ok);
1000
1001 let client = GapicPublisher::from_stub(mock);
1002 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1003
1004 let key = "ordering_key";
1005 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1007 publisher.flush().await;
1008 let got_err = handle.await.unwrap_err();
1010 assert_publish_err(got_err);
1011
1012 assert_publishing_is_paused!(publisher, key);
1014
1015 assert_publishing_is_ok!(publisher, "", "without_error");
1017
1018 Ok(())
1019 }
1020
1021 #[tokio_test_no_panics(start_paused = true)]
1022 async fn resuming_non_paused_ordering_key_is_noop() -> anyhow::Result<()> {
1023 let mut mock = MockGapicPublisher::new();
1024 mock.expect_publish()
1025 .withf(|req, _o| req.topic == TOPIC)
1026 .times(4)
1027 .returning(publish_ok);
1028
1029 let client = GapicPublisher::from_stub(mock);
1030 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1031
1032 publisher.resume_publish("");
1034 assert_publishing_is_ok!(publisher, "");
1035
1036 publisher.resume_publish("");
1038 assert_publishing_is_ok!(publisher, "");
1039
1040 let key = "without_error";
1042 publisher.resume_publish(key);
1043 assert_publishing_is_ok!(publisher, key);
1044
1045 publisher.resume_publish(key);
1047 assert_publishing_is_ok!(publisher, key);
1048
1049 Ok(())
1050 }
1051
1052 #[tokio_test_no_panics(start_paused = true)]
1053 async fn resuming_paused_ordering_key_allows_publishing() -> anyhow::Result<()> {
1054 let mut seq = Sequence::new();
1055 let mut mock = MockGapicPublisher::new();
1056 mock.expect_publish()
1057 .withf(|req, _o| req.topic == TOPIC)
1058 .times(1)
1059 .in_sequence(&mut seq)
1060 .returning(publish_err);
1061
1062 mock.expect_publish()
1063 .withf(|req, _o| req.topic == TOPIC)
1064 .times(3)
1065 .in_sequence(&mut seq)
1066 .returning(publish_ok);
1067
1068 let client = GapicPublisher::from_stub(mock);
1069 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1070
1071 let key = "ordering_key";
1072 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1074 let got_err = handle.await.unwrap_err();
1076 assert_publish_err(got_err);
1077
1078 assert_publishing_is_paused!(publisher, key);
1080
1081 publisher.resume_publish(key);
1083 assert_publishing_is_ok!(publisher, key);
1084
1085 assert_publishing_is_ok!(publisher, "", "without_error");
1087
1088 Ok(())
1089 }
1090
1091 #[tokio_test_no_panics(start_paused = true)]
1092 async fn resuming_ordering_key_twice_is_safe() -> anyhow::Result<()> {
1093 let mut seq = Sequence::new();
1095 let mut mock = MockGapicPublisher::new();
1096 mock.expect_publish()
1097 .withf(|req, _o| req.topic == TOPIC)
1098 .in_sequence(&mut seq)
1099 .times(1)
1100 .returning(publish_err);
1101
1102 mock.expect_publish()
1103 .withf(|req, _o| req.topic == TOPIC)
1104 .in_sequence(&mut seq)
1105 .return_once(publish_ok);
1106
1107 let client = GapicPublisher::from_stub(mock);
1108 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1109
1110 let key = "ordering_key";
1111 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1113 publisher.flush().await;
1114 let got_err = handle.await.unwrap_err();
1116 assert_publish_err(got_err);
1117
1118 assert_publishing_is_paused!(publisher, key);
1120
1121 publisher.resume_publish(key);
1123 publisher.resume_publish(key);
1124 assert_publishing_is_ok!(publisher, key);
1125
1126 Ok(())
1127 }
1128
1129 #[tokio_test_no_panics(start_paused = true)]
1130 async fn resuming_one_ordering_key_does_not_resume_others() -> anyhow::Result<()> {
1131 let mut seq = Sequence::new();
1133 let mut mock = MockGapicPublisher::new();
1134 mock.expect_publish()
1135 .withf(|req, _o| req.topic == TOPIC)
1136 .times(2)
1137 .in_sequence(&mut seq)
1138 .returning(publish_err);
1139
1140 mock.expect_publish()
1141 .withf(|req, _o| req.topic == TOPIC)
1142 .times(1)
1143 .in_sequence(&mut seq)
1144 .returning(publish_ok);
1145
1146 let client = GapicPublisher::from_stub(mock);
1147 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1148
1149 let key_0 = "ordering_key_0";
1150 let key_1 = "ordering_key_1";
1151 let handle_0 = publisher.publish(Message::new().set_ordering_key(key_0).set_data("msg 0"));
1153 let handle_1 = publisher.publish(Message::new().set_ordering_key(key_1).set_data("msg 1"));
1154 publisher.flush().await;
1155 let mut got_err = handle_0.await.unwrap_err();
1156 assert_publish_err(got_err);
1157 got_err = handle_1.await.unwrap_err();
1158 assert_publish_err(got_err);
1159
1160 assert_publishing_is_paused!(publisher, key_0, key_1);
1162
1163 publisher.resume_publish(key_0);
1165
1166 assert_publishing_is_ok!(publisher, key_0);
1168
1169 assert_publishing_is_paused!(publisher, key_1);
1171
1172 Ok(())
1173 }
1174
1175 #[tokio::test]
1176 async fn publisher_builder_clamps_batching_options() -> anyhow::Result<()> {
1177 let oversized_options = BatchingOptions::new()
1179 .set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
1180 .set_message_count_threshold(MAX_MESSAGES + 1)
1181 .set_byte_threshold(MAX_BYTES + 1);
1182
1183 let publishers = vec![
1184 BasePublisher::builder()
1185 .build()
1186 .await?
1187 .publisher("projects/my-project/topics/my-topic")
1188 .set_delay_threshold(oversized_options.delay_threshold)
1189 .set_message_count_threshold(oversized_options.message_count_threshold)
1190 .set_byte_threshold(oversized_options.byte_threshold)
1191 .build(),
1192 Publisher::builder("projects/my-project/topics/my-topic".to_string())
1193 .set_delay_threshold(oversized_options.delay_threshold)
1194 .set_message_count_threshold(oversized_options.message_count_threshold)
1195 .set_byte_threshold(oversized_options.byte_threshold)
1196 .build()
1197 .await?,
1198 ];
1199
1200 for publisher in publishers {
1201 let got = publisher.batching_options;
1202 assert_eq!(got.delay_threshold, MAX_DELAY);
1203 assert_eq!(got.message_count_threshold, MAX_MESSAGES);
1204 assert_eq!(got.byte_threshold, MAX_BYTES);
1205 }
1206
1207 let normal_options = BatchingOptions::new()
1209 .set_delay_threshold(Duration::from_secs(10))
1210 .set_message_count_threshold(10_u32)
1211 .set_byte_threshold(100_u32);
1212
1213 let publishers = vec![
1214 BasePublisher::builder()
1215 .build()
1216 .await?
1217 .publisher("projects/my-project/topics/my-topic")
1218 .set_delay_threshold(normal_options.delay_threshold)
1219 .set_message_count_threshold(normal_options.message_count_threshold)
1220 .set_byte_threshold(normal_options.byte_threshold)
1221 .build(),
1222 Publisher::builder("projects/my-project/topics/my-topic".to_string())
1223 .set_delay_threshold(normal_options.delay_threshold)
1224 .set_message_count_threshold(normal_options.message_count_threshold)
1225 .set_byte_threshold(normal_options.byte_threshold)
1226 .build()
1227 .await?,
1228 ];
1229
1230 for publisher in publishers {
1231 let got = publisher.batching_options;
1232
1233 assert_eq!(got.delay_threshold, normal_options.delay_threshold);
1234 assert_eq!(
1235 got.message_count_threshold,
1236 normal_options.message_count_threshold
1237 );
1238 assert_eq!(got.byte_threshold, normal_options.byte_threshold);
1239 }
1240 Ok(())
1241 }
1242}