1use super::options::BatchingOptions;
16use crate::publisher::actor::BundledMessage;
17use crate::publisher::actor::ToDispatcher;
18use crate::publisher::builder::PublisherBuilder;
19
20use tokio::sync::mpsc::UnboundedSender;
21use tokio::sync::oneshot;
22
23pub use super::base_publisher::BasePublisher;
24
25#[derive(Debug, Clone)]
66pub struct Publisher {
67 #[allow(dead_code)]
68 pub(crate) batching_options: BatchingOptions,
69 pub(crate) tx: UnboundedSender<ToDispatcher>,
70}
71
72impl Publisher {
73 pub fn builder<T: Into<String>>(topic: T) -> PublisherBuilder {
85 PublisherBuilder::new(topic.into())
86 }
87
88 #[must_use = "ignoring the publish result may lead to undetected delivery failures"]
102 pub fn publish(&self, msg: crate::model::Message) -> crate::publisher::PublishFuture {
103 let (tx, rx) = tokio::sync::oneshot::channel();
104
105 if self
109 .tx
110 .send(ToDispatcher::Publish(BundledMessage { msg, tx }))
111 .is_err()
112 {
113 }
115 crate::publisher::PublishFuture { rx }
116 }
117
118 pub async fn flush(&self) {
143 let (tx, rx) = oneshot::channel();
144 if self.tx.send(ToDispatcher::Flush(tx)).is_ok() {
145 let _ = rx.await;
146 }
147 }
148
149 pub fn resume_publish<T: std::convert::Into<std::string::String>>(&self, ordering_key: T) {
167 let _ = self
168 .tx
169 .send(ToDispatcher::ResumePublish(ordering_key.into()));
170 }
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176 use crate::publisher::builder::PublisherPartialBuilder;
177 use crate::publisher::client::BasePublisher;
178 use crate::publisher::constants::*;
179 use crate::publisher::options::BatchingOptions;
180 use crate::{
181 generated::gapic_dataplane::client::Publisher as GapicPublisher,
182 model::{Message, PublishResponse},
183 };
184 use google_cloud_test_macros::tokio_test_no_panics;
185 use mockall::Sequence;
186 use rand::{RngExt, distr::Alphanumeric};
187 use std::error::Error;
188 use std::time::Duration;
189
190 static TOPIC: &str = "my-topic";
191
192 mockall::mock! {
193 #[derive(Debug)]
194 GapicPublisher {}
195 impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
196 async fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> crate::Result<crate::Response<crate::model::PublishResponse>>;
197 }
198 }
199
200 mockall::mock! {
207 #[derive(Debug)]
208 GapicPublisherWithFuture {}
209 impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
210 fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> impl Future<Output=crate::Result<crate::Response<crate::model::PublishResponse>>> + Send;
211 }
212 }
213
214 fn publish_ok(
215 req: crate::model::PublishRequest,
216 _options: crate::RequestOptions,
217 ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
218 let ids = req
219 .messages
220 .iter()
221 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
222 Ok(crate::Response::from(
223 PublishResponse::new().set_message_ids(ids),
224 ))
225 }
226
227 fn publish_err(
228 _req: crate::model::PublishRequest,
229 _options: crate::RequestOptions,
230 ) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
231 Err(crate::Error::service(
232 google_cloud_gax::error::rpc::Status::default()
233 .set_code(google_cloud_gax::error::rpc::Code::Unknown)
234 .set_message("unknown error has occurred"),
235 ))
236 }
237
238 #[track_caller]
239 fn assert_publish_err(got_err: crate::error::PublishError) {
240 assert!(
241 matches!(got_err, crate::error::PublishError::Rpc(_)),
242 "{got_err:?}"
243 );
244 let source = got_err
245 .source()
246 .and_then(|e| e.downcast_ref::<std::sync::Arc<crate::Error>>())
247 .expect("send error should contain a source");
248 assert!(source.status().is_some(), "{got_err:?}");
249 assert_eq!(
250 source.status().unwrap().code,
251 google_cloud_gax::error::rpc::Code::Unknown,
252 "{got_err:?}"
253 );
254 }
255
256 fn generate_random_data() -> String {
257 rand::rng()
258 .sample_iter(&Alphanumeric)
259 .take(16)
260 .map(char::from)
261 .collect()
262 }
263
264 macro_rules! assert_publishing_is_ok {
265 ($publisher:ident, $($ordering_key:expr),+) => {
266 $(
267 let msg = generate_random_data();
268 let got = $publisher
269 .publish(
270 Message::new()
271 .set_ordering_key($ordering_key)
272 .set_data(msg.clone()),
273 )
274 .await;
275 assert_eq!(got?, msg);
276 )+
277 };
278 }
279
280 macro_rules! assert_publishing_is_paused {
281 ($publisher:ident, $($ordering_key:expr),+) => {
282 $(
283 let got_err = $publisher
284 .publish(
285 Message::new()
286 .set_ordering_key($ordering_key)
287 .set_data(generate_random_data()),
288 )
289 .await;
290 assert!(
291 matches!(got_err, Err(crate::error::PublishError::OrderingKeyPaused)),
292 "{got_err:?}"
293 );
294 )+
295 };
296 }
297
298 #[tokio_test_no_panics]
299 async fn publisher_publish_successfully() -> anyhow::Result<()> {
300 let mut mock = MockGapicPublisher::new();
301 mock.expect_publish()
302 .times(2)
303 .withf(|req, _o| req.topic == TOPIC)
304 .returning(publish_ok);
305
306 let client = GapicPublisher::from_stub(mock);
307 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
308 .set_message_count_threshold(1_u32)
309 .build();
310
311 let messages = [
312 Message::new().set_data("hello"),
313 Message::new().set_data("world"),
314 ];
315 let mut handles = Vec::new();
316 for msg in messages {
317 let handle = publisher.publish(msg.clone());
318 handles.push((msg, handle));
319 }
320
321 for (id, rx) in handles.into_iter() {
322 let got = rx.await?;
323 let id = String::from_utf8(id.data.to_vec())?;
324 assert_eq!(got, id);
325 }
326
327 Ok(())
328 }
329
330 #[tokio::test]
331 async fn publisher_publish_large_message() -> anyhow::Result<()> {
332 let mut mock = MockGapicPublisher::new();
333 mock.expect_publish()
334 .withf(|req, _o| req.topic == TOPIC)
335 .returning(publish_ok);
336
337 let client = GapicPublisher::from_stub(mock);
338 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
339 .set_byte_threshold(1_u32)
340 .build();
341 assert_publishing_is_ok!(publisher, "");
342 assert_publishing_is_ok!(publisher, "key");
343
344 Ok(())
345 }
346
347 #[tokio::test(start_paused = true)]
348 async fn worker_handles_forced_shutdown_gracefully() -> anyhow::Result<()> {
349 let mock = MockGapicPublisher::new();
350
351 let client = GapicPublisher::from_stub(mock);
352 let (publisher, background_task_handle) =
353 PublisherPartialBuilder::new(client, TOPIC.to_string())
354 .set_message_count_threshold(100_u32)
355 .build_return_handle();
356
357 let messages = [
358 Message::new().set_data("hello"),
359 Message::new().set_data("world"),
360 ];
361 let mut handles = Vec::new();
362 for msg in messages {
363 let handle = publisher.publish(msg);
364 handles.push(handle);
365 }
366
367 background_task_handle.abort();
368
369 for rx in handles.into_iter() {
370 rx.await
371 .expect_err("expected error when background task canceled");
372 }
373
374 Ok(())
375 }
376
377 #[tokio_test_no_panics(start_paused = true)]
378 async fn dropping_publisher_flushes_pending_messages() -> anyhow::Result<()> {
379 let mut mock = MockGapicPublisher::new();
382 mock.expect_publish()
383 .withf(|req, _o| req.topic == TOPIC)
384 .times(2)
385 .returning(publish_ok);
386
387 let client = GapicPublisher::from_stub(mock);
388 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
389 .set_message_count_threshold(1000_u32)
390 .set_delay_threshold(Duration::from_secs(60))
391 .build();
392
393 let start = tokio::time::Instant::now();
394 let messages = [
395 Message::new().set_data("hello"),
396 Message::new().set_data("world"),
397 Message::new().set_data("hello").set_ordering_key("key"),
398 Message::new().set_data("world").set_ordering_key("key"),
399 ];
400 let mut handles = Vec::new();
401 for msg in messages {
402 let handle = publisher.publish(msg.clone());
403 handles.push((msg, handle));
404 }
405 drop(publisher); for (id, rx) in handles.into_iter() {
408 let got = rx.await?;
409 let id = String::from_utf8(id.data.to_vec())?;
410 assert_eq!(got, id);
411 assert_eq!(start.elapsed(), Duration::ZERO);
412 }
413
414 Ok(())
415 }
416
417 #[tokio_test_no_panics]
418 async fn publisher_handles_publish_errors() -> anyhow::Result<()> {
419 let mut mock = MockGapicPublisher::new();
420 mock.expect_publish()
421 .times(2)
422 .withf(|req, _o| req.topic == TOPIC)
423 .returning(publish_err);
424
425 let client = GapicPublisher::from_stub(mock);
426 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
427 .set_message_count_threshold(1_u32)
428 .build();
429
430 let messages = [
431 Message::new().set_data("hello"),
432 Message::new().set_data("world"),
433 ];
434
435 let mut handles = Vec::new();
436 for msg in messages {
437 let handle = publisher.publish(msg.clone());
438 handles.push(handle);
439 }
440
441 for rx in handles.into_iter() {
442 let got = rx.await;
443 assert!(got.is_err(), "{got:?}");
444 }
445
446 Ok(())
447 }
448
449 #[tokio_test_no_panics(start_paused = true)]
450 async fn flush_sends_pending_messages_immediately() -> anyhow::Result<()> {
451 let mut mock = MockGapicPublisher::new();
452 mock.expect_publish()
453 .withf(|req, _o| req.topic == TOPIC)
454 .returning(publish_ok);
455
456 let client = GapicPublisher::from_stub(mock);
457 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
458 .set_message_count_threshold(1000_u32)
460 .set_delay_threshold(Duration::from_secs(60))
461 .build();
462
463 let start = tokio::time::Instant::now();
464 let messages = [
465 Message::new().set_data("hello"),
466 Message::new().set_data("world"),
467 ];
468 let mut handles = Vec::new();
469 for msg in messages {
470 let handle = publisher.publish(msg.clone());
471 handles.push((msg, handle));
472 }
473
474 publisher.flush().await;
475 assert_eq!(start.elapsed(), Duration::ZERO);
476
477 let post = publisher.publish(Message::new().set_data("after"));
478 for (id, rx) in handles.into_iter() {
479 let got = rx.await?;
480 let id = String::from_utf8(id.data.to_vec())?;
481 assert_eq!(got, id);
482 assert_eq!(start.elapsed(), Duration::ZERO);
483 }
484
485 let got = post.await?;
488 assert_eq!(got, "after");
489 assert_eq!(start.elapsed(), Duration::from_secs(60));
490
491 Ok(())
492 }
493
494 #[tokio_test_no_panics(start_paused = true)]
495 async fn dropping_handles_does_not_prevent_publishing() -> anyhow::Result<()> {
497 let mut mock = MockGapicPublisher::new();
498 mock.expect_publish()
499 .withf(|r, _| {
500 r.messages.len() == 2
501 && r.messages[0].data == "hello"
502 && r.messages[1].data == "world"
503 })
504 .return_once(publish_ok);
505
506 let client = GapicPublisher::from_stub(mock);
507 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
508 .set_message_count_threshold(1000_u32)
510 .set_delay_threshold(Duration::from_secs(60))
511 .build();
512
513 let start = tokio::time::Instant::now();
514 let messages = [
515 Message::new().set_data("hello"),
516 Message::new().set_data("world"),
517 ];
518 for msg in messages {
519 let handle = publisher.publish(msg.clone());
520 drop(handle);
521 }
522
523 publisher.flush().await;
524 assert_eq!(start.elapsed(), Duration::ZERO);
525
526 Ok(())
527 }
528
529 #[tokio::test(start_paused = true)]
530 async fn flush_with_no_messages_is_noop() -> anyhow::Result<()> {
531 let mock = MockGapicPublisher::new();
532
533 let client = GapicPublisher::from_stub(mock);
534 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
535
536 let start = tokio::time::Instant::now();
537 publisher.flush().await;
538 assert_eq!(start.elapsed(), Duration::ZERO);
539
540 Ok(())
541 }
542
543 #[tokio_test_no_panics]
544 async fn batch_sends_on_message_count_threshold_success() -> anyhow::Result<()> {
545 let mut mock = MockGapicPublisher::new();
547 mock.expect_publish()
548 .withf(|r, _| r.messages.len() == 2)
549 .return_once(publish_ok);
550
551 let client = GapicPublisher::from_stub(mock);
552 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
553 .set_message_count_threshold(2_u32)
554 .set_byte_threshold(MAX_BYTES)
555 .set_delay_threshold(std::time::Duration::MAX)
556 .build();
557
558 let messages = [
559 Message::new().set_data("hello"),
560 Message::new().set_data("world"),
561 ];
562 let mut handles = Vec::new();
563 for msg in messages {
564 let handle = publisher.publish(msg.clone());
565 handles.push((msg, handle));
566 }
567
568 for (id, rx) in handles.into_iter() {
569 let got = rx.await?;
570 let id = String::from_utf8(id.data.to_vec())?;
571 assert_eq!(got, id);
572 }
573
574 Ok(())
575 }
576
577 #[tokio_test_no_panics]
578 async fn batch_sends_on_message_count_threshold_error() -> anyhow::Result<()> {
579 let mut mock = MockGapicPublisher::new();
581 mock.expect_publish()
582 .withf(|r, _| r.messages.len() == 2)
583 .return_once(publish_err);
584
585 let client = GapicPublisher::from_stub(mock);
586 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
587 .set_message_count_threshold(2_u32)
588 .set_byte_threshold(MAX_BYTES)
589 .set_delay_threshold(std::time::Duration::MAX)
590 .build();
591
592 let messages = [
593 Message::new().set_data("hello"),
594 Message::new().set_data("world"),
595 ];
596 let mut handles = Vec::new();
597 for msg in messages {
598 let handle = publisher.publish(msg.clone());
599 handles.push(handle);
600 }
601
602 for rx in handles.into_iter() {
603 let got = rx.await;
604 assert!(got.is_err(), "{got:?}");
605 }
606
607 Ok(())
608 }
609
610 #[tokio_test_no_panics(start_paused = true)]
611 async fn batch_sends_on_byte_threshold() -> anyhow::Result<()> {
612 let mut mock = MockGapicPublisher::new();
614 mock.expect_publish()
615 .withf(|r, _| r.messages.len() == 1)
616 .times(2)
617 .returning(publish_ok);
618
619 let client = GapicPublisher::from_stub(mock);
620 let byte_threshold = TOPIC.len() + "hello".len() + "key".len() + 1;
622 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
623 .set_message_count_threshold(MAX_MESSAGES)
624 .set_byte_threshold(byte_threshold as u32)
625 .set_delay_threshold(std::time::Duration::MAX)
626 .build();
627
628 let handle = publisher.publish(Message::new().set_data("hello"));
630 let _handle = publisher.publish(Message::new().set_data("world"));
632 assert_eq!(handle.await?, "hello");
633
634 let handle = publisher.publish(Message::new().set_data("hello").set_ordering_key("key"));
636 let _handle = publisher.publish(Message::new().set_data("world").set_ordering_key("key"));
638 assert_eq!(handle.await?, "hello");
639
640 Ok(())
641 }
642
643 #[tokio_test_no_panics(start_paused = true)]
644 async fn batch_sends_on_delay_threshold() -> anyhow::Result<()> {
645 let mut mock = MockGapicPublisher::new();
646 mock.expect_publish()
647 .withf(|req, _| req.topic == TOPIC)
648 .returning(publish_ok);
649
650 let client = GapicPublisher::from_stub(mock);
651 let delay = std::time::Duration::from_millis(10);
652 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
653 .set_message_count_threshold(u32::MAX)
654 .set_byte_threshold(MAX_BYTES)
655 .set_delay_threshold(delay)
656 .build();
657
658 for _ in 0..3 {
660 let start = tokio::time::Instant::now();
661 let messages = [
662 Message::new().set_data("hello 0"),
663 Message::new().set_data("hello 1"),
664 Message::new()
665 .set_data("hello 2")
666 .set_ordering_key("ordering key 1"),
667 Message::new()
668 .set_data("hello 3")
669 .set_ordering_key("ordering key 2"),
670 ];
671 let mut handles = Vec::new();
672 for msg in messages {
673 let handle = publisher.publish(msg.clone());
674 handles.push((msg, handle));
675 }
676
677 for (id, rx) in handles.into_iter() {
678 let got = rx.await?;
679 let id = String::from_utf8(id.data.to_vec())?;
680 assert_eq!(got, id);
681 assert_eq!(
682 start.elapsed(),
683 delay,
684 "batch of messages should have sent after {:?}",
685 delay
686 )
687 }
688 }
689
690 Ok(())
691 }
692
693 #[tokio::test(start_paused = true)]
694 #[allow(clippy::get_first)]
695 async fn batching_separates_by_ordering_key() -> anyhow::Result<()> {
696 let mut mock = MockGapicPublisher::new();
698 mock.expect_publish()
699 .withf(|r, _| {
700 r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
701 })
702 .returning(publish_ok);
703
704 let client = GapicPublisher::from_stub(mock);
705 let message_count_threshold = 2_u32;
707 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
708 .set_message_count_threshold(message_count_threshold)
709 .set_byte_threshold(MAX_BYTES)
710 .set_delay_threshold(std::time::Duration::MAX)
711 .build();
712
713 let num_ordering_keys = 3;
714 let mut messages = Vec::new();
715 for i in 0..(2 * message_count_threshold * num_ordering_keys) {
719 messages.push(
720 Message::new()
721 .set_data(format!("test message {}", i))
722 .set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
723 );
724 }
725 let mut handles = Vec::new();
726 for msg in messages {
727 let handle = publisher.publish(msg.clone());
728 handles.push((msg, handle));
729 }
730
731 for (id, rx) in handles.into_iter() {
732 let got = rx.await?;
733 let id = String::from_utf8(id.data.to_vec())?;
734 assert_eq!(got, id);
735 }
736
737 Ok(())
738 }
739
740 #[tokio_test_no_panics(start_paused = true)]
741 #[allow(clippy::get_first)]
742 async fn batching_handles_empty_ordering_key() -> anyhow::Result<()> {
743 let mut mock = MockGapicPublisher::new();
745 mock.expect_publish()
746 .withf(|r, _| {
747 r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
748 })
749 .returning(publish_ok);
750
751 let client = GapicPublisher::from_stub(mock);
752 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
754 .set_message_count_threshold(2_u32)
755 .set_byte_threshold(MAX_BYTES)
756 .set_delay_threshold(std::time::Duration::MAX)
757 .build();
758
759 let messages = [
760 Message::new().set_data("hello 1"),
761 Message::new().set_data("hello 2").set_ordering_key(""),
762 Message::new()
763 .set_data("hello 3")
764 .set_ordering_key("ordering key :1"),
765 Message::new()
766 .set_data("hello 4")
767 .set_ordering_key("ordering key :1"),
768 ];
769
770 let mut handles = Vec::new();
771 for msg in messages {
772 let handle = publisher.publish(msg.clone());
773 handles.push((msg, handle));
774 }
775
776 for (id, rx) in handles.into_iter() {
777 let got = rx.await?;
778 let id = String::from_utf8(id.data.to_vec())?;
779 assert_eq!(got, id);
780 }
781
782 Ok(())
783 }
784
785 #[tokio_test_no_panics(start_paused = true)]
786 #[allow(clippy::get_first)]
787 async fn ordering_key_limits_to_one_outstanding_batch() -> anyhow::Result<()> {
788 let mut seq = Sequence::new();
792 let mut mock = MockGapicPublisherWithFuture::new();
793 mock.expect_publish()
794 .times(1)
795 .in_sequence(&mut seq)
796 .withf(|r, _| r.messages.len() == 1)
797 .returning({
798 |r, o| {
799 Box::pin(async move {
800 tokio::time::sleep(Duration::from_millis(10)).await;
801 publish_ok(r, o)
802 })
803 }
804 });
805
806 mock.expect_publish()
807 .times(1)
808 .in_sequence(&mut seq)
809 .withf(|r, _| r.messages.len() == 1)
810 .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
811
812 let client = GapicPublisher::from_stub(mock);
813 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
815 .set_message_count_threshold(1_u32)
816 .set_byte_threshold(MAX_BYTES)
817 .set_delay_threshold(std::time::Duration::MAX)
818 .build();
819
820 let messages = [
821 Message::new()
822 .set_data("hello 1")
823 .set_ordering_key("ordering key"),
824 Message::new()
825 .set_data("hello 2")
826 .set_ordering_key("ordering key"),
827 ];
828
829 let start = tokio::time::Instant::now();
830 let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
831 let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
832 assert_eq!(msg2_handle.await?, "hello 2");
833 assert_eq!(
834 start.elapsed(),
835 Duration::from_millis(10),
836 "the second batch of messages should have sent after the first which is has been delayed by {:?}",
837 Duration::from_millis(10)
838 );
839 assert_eq!(msg1_handle.await?, "hello 1");
841
842 Ok(())
843 }
844
845 #[tokio_test_no_panics(start_paused = true)]
846 #[allow(clippy::get_first)]
847 async fn empty_ordering_key_allows_concurrent_batches() -> anyhow::Result<()> {
848 let mut seq = Sequence::new();
853 let mut mock = MockGapicPublisherWithFuture::new();
854 mock.expect_publish()
855 .times(1)
856 .in_sequence(&mut seq)
857 .withf(|r, _| r.messages.len() == 1)
858 .returning(|r, o| {
859 Box::pin(async move {
860 tokio::time::sleep(Duration::from_millis(10)).await;
861 publish_ok(r, o)
862 })
863 });
864
865 mock.expect_publish()
866 .times(1)
867 .in_sequence(&mut seq)
868 .withf(|r, _| r.topic == TOPIC && r.messages.len() == 1)
869 .returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
870
871 let client = GapicPublisher::from_stub(mock);
872 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
874 .set_message_count_threshold(1_u32)
875 .set_byte_threshold(MAX_BYTES)
876 .set_delay_threshold(std::time::Duration::MAX)
877 .build();
878
879 let messages = [
880 Message::new().set_data("hello 1").set_ordering_key(""),
881 Message::new().set_data("hello 2").set_ordering_key(""),
882 ];
883
884 let start = tokio::time::Instant::now();
885 let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
886 let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
887 assert_eq!(msg2_handle.await?, "hello 2");
888 assert_eq!(
889 start.elapsed(),
890 Duration::from_millis(0),
891 "the second batch of messages should have sent without any delay"
892 );
893 assert_eq!(msg1_handle.await?, "hello 1");
895
896 Ok(())
897 }
898
899 #[tokio_test_no_panics(start_paused = true)]
900 async fn ordering_key_error_pauses_publisher() -> anyhow::Result<()> {
901 let mut seq = Sequence::new();
903 let mut mock = MockGapicPublisher::new();
904 mock.expect_publish()
905 .withf(|req, _o| req.topic == TOPIC)
906 .times(1)
907 .in_sequence(&mut seq)
908 .returning(publish_err);
909
910 mock.expect_publish()
911 .withf(|req, _o| req.topic == TOPIC)
912 .times(2)
913 .in_sequence(&mut seq)
914 .returning(publish_ok);
915
916 let client = GapicPublisher::from_stub(mock);
917 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
918 .set_message_count_threshold(1_u32)
919 .build();
920
921 let key = "ordering_key";
922 let msg_0_handle =
923 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
924 let msg_1_handle =
926 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
927
928 let mut got_err = msg_0_handle.await.unwrap_err();
930 assert_publish_err(got_err);
931
932 got_err = msg_1_handle.await.unwrap_err();
934 assert!(
935 matches!(got_err, crate::error::PublishError::OrderingKeyPaused),
936 "{got_err:?}"
937 );
938
939 for _ in 0..3 {
941 assert_publishing_is_paused!(publisher, key);
942 }
943
944 assert_publishing_is_ok!(publisher, "", "without_error");
946
947 Ok(())
948 }
949
950 #[tokio_test_no_panics(start_paused = true)]
951 async fn batch_error_pauses_ordering_key() -> anyhow::Result<()> {
952 let mut mock = MockGapicPublisher::new();
954 mock.expect_publish()
955 .times(1)
956 .withf(|r, _| r.topic == TOPIC && r.messages.len() == 2)
957 .returning(publish_err);
958
959 let client = GapicPublisher::from_stub(mock);
960 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
961 .set_message_count_threshold(2_u32)
962 .build();
963
964 let key = "ordering_key";
965 let msg_0_handle =
967 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
968 let msg_1_handle =
969 publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
970
971 let mut got_err = msg_0_handle.await.unwrap_err();
973 assert_publish_err(got_err);
974 got_err = msg_1_handle.await.unwrap_err();
975 assert_publish_err(got_err);
976
977 assert_publishing_is_paused!(publisher, key);
979
980 Ok(())
981 }
982
983 #[tokio_test_no_panics(start_paused = true)]
984 async fn flush_on_paused_ordering_key_returns_error() -> anyhow::Result<()> {
985 let mut seq = Sequence::new();
987 let mut mock = MockGapicPublisher::new();
988 mock.expect_publish()
989 .withf(|req, _o| req.topic == TOPIC)
990 .times(1)
991 .in_sequence(&mut seq)
992 .returning(publish_err);
993
994 mock.expect_publish()
995 .withf(|req, _o| req.topic == TOPIC)
996 .times(2)
997 .in_sequence(&mut seq)
998 .returning(publish_ok);
999
1000 let client = GapicPublisher::from_stub(mock);
1001 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1002
1003 let key = "ordering_key";
1004 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1006 publisher.flush().await;
1007 let got_err = handle.await.unwrap_err();
1009 assert_publish_err(got_err);
1010
1011 assert_publishing_is_paused!(publisher, key);
1013
1014 assert_publishing_is_ok!(publisher, "", "without_error");
1016
1017 Ok(())
1018 }
1019
1020 #[tokio_test_no_panics(start_paused = true)]
1021 async fn resuming_non_paused_ordering_key_is_noop() -> anyhow::Result<()> {
1022 let mut mock = MockGapicPublisher::new();
1023 mock.expect_publish()
1024 .withf(|req, _o| req.topic == TOPIC)
1025 .times(4)
1026 .returning(publish_ok);
1027
1028 let client = GapicPublisher::from_stub(mock);
1029 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1030
1031 publisher.resume_publish("");
1033 assert_publishing_is_ok!(publisher, "");
1034
1035 publisher.resume_publish("");
1037 assert_publishing_is_ok!(publisher, "");
1038
1039 let key = "without_error";
1041 publisher.resume_publish(key);
1042 assert_publishing_is_ok!(publisher, key);
1043
1044 publisher.resume_publish(key);
1046 assert_publishing_is_ok!(publisher, key);
1047
1048 Ok(())
1049 }
1050
1051 #[tokio_test_no_panics(start_paused = true)]
1052 async fn resuming_paused_ordering_key_allows_publishing() -> anyhow::Result<()> {
1053 let mut seq = Sequence::new();
1054 let mut mock = MockGapicPublisher::new();
1055 mock.expect_publish()
1056 .withf(|req, _o| req.topic == TOPIC)
1057 .times(1)
1058 .in_sequence(&mut seq)
1059 .returning(publish_err);
1060
1061 mock.expect_publish()
1062 .withf(|req, _o| req.topic == TOPIC)
1063 .times(3)
1064 .in_sequence(&mut seq)
1065 .returning(publish_ok);
1066
1067 let client = GapicPublisher::from_stub(mock);
1068 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1069
1070 let key = "ordering_key";
1071 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1073 let got_err = handle.await.unwrap_err();
1075 assert_publish_err(got_err);
1076
1077 assert_publishing_is_paused!(publisher, key);
1079
1080 publisher.resume_publish(key);
1082 assert_publishing_is_ok!(publisher, key);
1083
1084 assert_publishing_is_ok!(publisher, "", "without_error");
1086
1087 Ok(())
1088 }
1089
1090 #[tokio_test_no_panics(start_paused = true)]
1091 async fn resuming_ordering_key_twice_is_safe() -> anyhow::Result<()> {
1092 let mut seq = Sequence::new();
1094 let mut mock = MockGapicPublisher::new();
1095 mock.expect_publish()
1096 .withf(|req, _o| req.topic == TOPIC)
1097 .in_sequence(&mut seq)
1098 .times(1)
1099 .returning(publish_err);
1100
1101 mock.expect_publish()
1102 .withf(|req, _o| req.topic == TOPIC)
1103 .in_sequence(&mut seq)
1104 .return_once(publish_ok);
1105
1106 let client = GapicPublisher::from_stub(mock);
1107 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1108
1109 let key = "ordering_key";
1110 let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
1112 publisher.flush().await;
1113 let got_err = handle.await.unwrap_err();
1115 assert_publish_err(got_err);
1116
1117 assert_publishing_is_paused!(publisher, key);
1119
1120 publisher.resume_publish(key);
1122 publisher.resume_publish(key);
1123 assert_publishing_is_ok!(publisher, key);
1124
1125 Ok(())
1126 }
1127
1128 #[tokio_test_no_panics(start_paused = true)]
1129 async fn resuming_one_ordering_key_does_not_resume_others() -> anyhow::Result<()> {
1130 let mut seq = Sequence::new();
1132 let mut mock = MockGapicPublisher::new();
1133 mock.expect_publish()
1134 .withf(|req, _o| req.topic == TOPIC)
1135 .times(2)
1136 .in_sequence(&mut seq)
1137 .returning(publish_err);
1138
1139 mock.expect_publish()
1140 .withf(|req, _o| req.topic == TOPIC)
1141 .times(1)
1142 .in_sequence(&mut seq)
1143 .returning(publish_ok);
1144
1145 let client = GapicPublisher::from_stub(mock);
1146 let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
1147
1148 let key_0 = "ordering_key_0";
1149 let key_1 = "ordering_key_1";
1150 let handle_0 = publisher.publish(Message::new().set_ordering_key(key_0).set_data("msg 0"));
1152 let handle_1 = publisher.publish(Message::new().set_ordering_key(key_1).set_data("msg 1"));
1153 publisher.flush().await;
1154 let mut got_err = handle_0.await.unwrap_err();
1155 assert_publish_err(got_err);
1156 got_err = handle_1.await.unwrap_err();
1157 assert_publish_err(got_err);
1158
1159 assert_publishing_is_paused!(publisher, key_0, key_1);
1161
1162 publisher.resume_publish(key_0);
1164
1165 assert_publishing_is_ok!(publisher, key_0);
1167
1168 assert_publishing_is_paused!(publisher, key_1);
1170
1171 Ok(())
1172 }
1173
1174 #[tokio::test]
1175 async fn publisher_builder_clamps_batching_options() -> anyhow::Result<()> {
1176 let oversized_options = BatchingOptions::new()
1178 .set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
1179 .set_message_count_threshold(MAX_MESSAGES + 1)
1180 .set_byte_threshold(MAX_BYTES + 1);
1181
1182 let publishers = vec![
1183 BasePublisher::builder()
1184 .build()
1185 .await?
1186 .publisher("projects/my-project/topics/my-topic")
1187 .set_delay_threshold(oversized_options.delay_threshold)
1188 .set_message_count_threshold(oversized_options.message_count_threshold)
1189 .set_byte_threshold(oversized_options.byte_threshold)
1190 .build(),
1191 Publisher::builder("projects/my-project/topics/my-topic".to_string())
1192 .set_delay_threshold(oversized_options.delay_threshold)
1193 .set_message_count_threshold(oversized_options.message_count_threshold)
1194 .set_byte_threshold(oversized_options.byte_threshold)
1195 .build()
1196 .await?,
1197 ];
1198
1199 for publisher in publishers {
1200 let got = publisher.batching_options;
1201 assert_eq!(got.delay_threshold, MAX_DELAY);
1202 assert_eq!(got.message_count_threshold, MAX_MESSAGES);
1203 assert_eq!(got.byte_threshold, MAX_BYTES);
1204 }
1205
1206 let normal_options = BatchingOptions::new()
1208 .set_delay_threshold(Duration::from_secs(10))
1209 .set_message_count_threshold(10_u32)
1210 .set_byte_threshold(100_u32);
1211
1212 let publishers = vec![
1213 BasePublisher::builder()
1214 .build()
1215 .await?
1216 .publisher("projects/my-project/topics/my-topic")
1217 .set_delay_threshold(normal_options.delay_threshold)
1218 .set_message_count_threshold(normal_options.message_count_threshold)
1219 .set_byte_threshold(normal_options.byte_threshold)
1220 .build(),
1221 Publisher::builder("projects/my-project/topics/my-topic".to_string())
1222 .set_delay_threshold(normal_options.delay_threshold)
1223 .set_message_count_threshold(normal_options.message_count_threshold)
1224 .set_byte_threshold(normal_options.byte_threshold)
1225 .build()
1226 .await?,
1227 ];
1228
1229 for publisher in publishers {
1230 let got = publisher.batching_options;
1231
1232 assert_eq!(got.delay_threshold, normal_options.delay_threshold);
1233 assert_eq!(
1234 got.message_count_threshold,
1235 normal_options.message_count_threshold
1236 );
1237 assert_eq!(got.byte_threshold, normal_options.byte_threshold);
1238 }
1239 Ok(())
1240 }
1241}