1use super::options::BatchingOptions;
16use crate::generated::gapic_dataplane::client::Publisher as GapicPublisher;
17use crate::publisher::worker::BundledMessage;
18use crate::publisher::worker::ToWorker;
19use crate::publisher::worker::Worker;
20use std::time::Duration;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::oneshot;
23
24const MAX_DELAY: Duration = Duration::from_secs(60 * 60 * 24); const MAX_MESSAGES: u32 = 1000;
28const MAX_BYTES: u32 = 1e7 as u32; #[derive(Debug, Clone)]
46pub struct Publisher {
47 #[allow(dead_code)]
48 pub(crate) batching_options: BatchingOptions,
49 tx: UnboundedSender<ToWorker>,
50}
51
52impl Publisher {
53 pub fn publish(&self, msg: crate::model::PubsubMessage) -> crate::model_ext::PublishHandle {
63 let (tx, rx) = tokio::sync::oneshot::channel();
64
65 if self
68 .tx
69 .send(ToWorker::Publish(BundledMessage { msg, tx }))
70 .is_err()
71 {
72 }
74 crate::model_ext::PublishHandle { rx }
75 }
76
77 pub async fn flush(&self) {
114 let (tx, rx) = oneshot::channel();
115 if self.tx.send(ToWorker::Flush(tx)).is_err() {
116 }
118 rx.await
119 .expect("the client library should not release the sender");
120 }
121}
122
123#[derive(Clone, Debug)]
139pub struct PublisherBuilder {
140 pub(crate) inner: GapicPublisher,
141 topic: String,
142 batching_options: BatchingOptions,
143}
144
145impl PublisherBuilder {
146 pub(crate) fn new(client: GapicPublisher, topic: String) -> Self {
148 Self {
149 inner: client,
150 topic,
151 batching_options: BatchingOptions::default(),
152 }
153 }
154
155 pub fn set_message_count_threshold(mut self, threshold: u32) -> PublisherBuilder {
171 self.batching_options = self.batching_options.set_message_count_threshold(threshold);
172 self
173 }
174
175 pub fn set_byte_threshold(mut self, threshold: u32) -> PublisherBuilder {
189 self.batching_options = self.batching_options.set_byte_threshold(threshold);
190 self
191 }
192
193 pub fn set_delay_threshold(mut self, threshold: Duration) -> PublisherBuilder {
209 self.batching_options = self.batching_options.set_delay_threshold(threshold);
210 self
211 }
212
213 pub fn build(self) -> Publisher {
219 let batching_options = BatchingOptions::new()
221 .set_delay_threshold(
222 self.batching_options
223 .delay_threshold
224 .clamp(Duration::ZERO, MAX_DELAY),
225 )
226 .set_message_count_threshold(
227 self.batching_options
228 .message_count_threshold
229 .clamp(0, MAX_MESSAGES),
230 )
231 .set_byte_threshold(self.batching_options.byte_threshold.clamp(0, MAX_BYTES));
232
233 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
234 let worker = Worker::new(self.topic, self.inner, batching_options.clone(), rx);
239 tokio::spawn(worker.run());
240
241 Publisher {
242 batching_options,
243 tx,
244 }
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251 use crate::{client::Client, publisher::options::BatchingOptions};
252 use crate::{
253 generated::gapic_dataplane::client::Publisher as GapicPublisher,
254 model::{PublishResponse, PubsubMessage},
255 };
256 use mockall::Sequence;
257
258 mockall::mock! {
259 #[derive(Debug)]
260 GapicPublisher {}
261 impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
262 async fn publish(&self, req: crate::model::PublishRequest, _options: gax::options::RequestOptions) -> gax::Result<gax::response::Response<crate::model::PublishResponse>>;
263 }
264 }
265
266 mockall::mock! {
273 #[derive(Debug)]
274 GapicPublisherWithFuture {}
275 impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
276 fn publish(&self, req: crate::model::PublishRequest, _options: gax::options::RequestOptions) -> impl Future<Output=gax::Result<gax::response::Response<crate::model::PublishResponse>>> + Send;
277 }
278 }
279
280 #[tokio::test]
281 async fn test_worker_success() {
282 let mut mock = MockGapicPublisher::new();
283 mock.expect_publish()
284 .returning({
285 |r, _| {
286 assert_eq!(r.topic, "my-topic");
287 assert_eq!(r.messages.len(), 1);
288 let id = String::from_utf8(r.messages[0].data.to_vec()).unwrap();
289 Ok(gax::response::Response::from(
290 PublishResponse::new().set_message_ids(vec![id]),
291 ))
292 }
293 })
294 .times(2);
295
296 let client = GapicPublisher::from_stub(mock);
297 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
298 .set_message_count_threshold(1_u32)
299 .build();
300
301 let messages = vec![
302 PubsubMessage::new().set_data("hello".to_string()),
303 PubsubMessage::new().set_data("world".to_string()),
304 ];
305 let mut handles = Vec::new();
306 for msg in messages {
307 let handle = publisher.publish(msg.clone());
308 handles.push((msg, handle));
309 }
310
311 for (id, rx) in handles.into_iter() {
312 let got = rx.await.expect("expected message id");
313 let id = String::from_utf8(id.data.to_vec()).unwrap();
314 assert_eq!(got, id);
315 }
316 }
317
318 #[tokio::test(start_paused = true)]
319 async fn test_drop_publisher() {
320 let mut mock = MockGapicPublisher::new();
323 mock.expect_publish().return_once({
324 |r, _| {
325 assert_eq!(r.topic, "my-topic");
326 let ids = r
327 .messages
328 .iter()
329 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
330 Ok(gax::response::Response::from(
331 PublishResponse::new().set_message_ids(ids),
332 ))
333 }
334 });
335 let client = GapicPublisher::from_stub(mock);
336 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
337 .set_message_count_threshold(1000_u32)
338 .set_delay_threshold(Duration::from_secs(60))
339 .build();
340
341 let start = tokio::time::Instant::now();
342 let messages = vec![
343 PubsubMessage::new().set_data("hello".to_string()),
344 PubsubMessage::new().set_data("world".to_string()),
345 ];
346 let mut handles = Vec::new();
347 for msg in messages {
348 let handle = publisher.publish(msg.clone());
349 handles.push((msg, handle));
350 }
351 drop(publisher); for (id, rx) in handles.into_iter() {
354 let got = rx.await.expect("expected message id");
355 let id = String::from_utf8(id.data.to_vec()).unwrap();
356 assert_eq!(got, id);
357 assert_eq!(start.elapsed(), Duration::ZERO);
358 }
359 }
360
361 #[tokio::test]
362 async fn test_worker_error() {
363 let mut mock = MockGapicPublisher::new();
364 mock.expect_publish()
365 .returning({
366 |r, _| {
367 assert_eq!(r.topic, "my-topic");
368 assert_eq!(r.messages.len(), 1);
369 Err(gax::error::Error::io("io error"))
370 }
371 })
372 .times(2);
373
374 let client = GapicPublisher::from_stub(mock);
375 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
376 .set_message_count_threshold(1_u32)
377 .build();
378
379 let messages = vec![
380 PubsubMessage::new().set_data("hello".to_string()),
381 PubsubMessage::new().set_data("world".to_string()),
382 ];
383
384 let mut handles = Vec::new();
385 for msg in messages {
386 let handle = publisher.publish(msg.clone());
387 handles.push(handle);
388 }
389
390 for rx in handles.into_iter() {
391 let got = rx.await;
392 assert!(got.is_err());
393 }
394 }
395
396 #[tokio::test(start_paused = true)]
397 async fn test_worker_flush() {
398 let mut mock = MockGapicPublisher::new();
399 mock.expect_publish().returning({
400 |r, _| {
401 assert_eq!(r.topic, "my-topic");
402 let ids = r
403 .messages
404 .iter()
405 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
406 Ok(gax::response::Response::from(
407 PublishResponse::new().set_message_ids(ids),
408 ))
409 }
410 });
411
412 let client = GapicPublisher::from_stub(mock);
413 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
414 .set_message_count_threshold(1000_u32)
416 .set_delay_threshold(Duration::from_secs(60))
417 .build();
418
419 let start = tokio::time::Instant::now();
420 let messages = vec![
421 PubsubMessage::new().set_data("hello".to_string()),
422 PubsubMessage::new().set_data("world".to_string()),
423 ];
424 let mut handles = Vec::new();
425 for msg in messages {
426 let handle = publisher.publish(msg.clone());
427 handles.push((msg, handle));
428 }
429
430 publisher.flush().await;
431 assert_eq!(start.elapsed(), Duration::ZERO);
432
433 let post = publisher.publish(PubsubMessage::new().set_data("after".to_string()));
434 for (id, rx) in handles.into_iter() {
435 let got = rx.await.expect("expected message id");
436 let id = String::from_utf8(id.data.to_vec()).unwrap();
437 assert_eq!(got, id);
438 assert_eq!(start.elapsed(), Duration::ZERO);
439 }
440
441 let got = post.await.expect("expected message id");
444 assert_eq!(got, "after");
445 assert_eq!(start.elapsed(), Duration::from_secs(60));
446 }
447
448 #[tokio::test(start_paused = true)]
450 async fn test_worker_drop_handles() {
451 let mut mock = MockGapicPublisher::new();
452 mock.expect_publish().return_once({
453 move |r, _| {
454 assert_eq!(r.topic, "my-topic");
455 let ids = r
456 .messages
457 .iter()
458 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
459 assert_eq!(ids.len(), 2);
460 let ids = ids.collect::<Vec<_>>();
461 assert_eq!(ids.clone(), vec!["hello", "world"]);
462 Ok(gax::response::Response::from(
463 PublishResponse::new().set_message_ids(ids),
464 ))
465 }
466 });
467
468 let client = GapicPublisher::from_stub(mock);
469 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
470 .set_message_count_threshold(1000_u32)
472 .set_delay_threshold(Duration::from_secs(60))
473 .build();
474
475 let start = tokio::time::Instant::now();
476 let messages = vec![
477 PubsubMessage::new().set_data("hello".to_string()),
478 PubsubMessage::new().set_data("world".to_string()),
479 ];
480 for msg in messages {
481 publisher.publish(msg.clone());
482 }
483
484 publisher.flush().await;
485 assert_eq!(start.elapsed(), Duration::ZERO);
486 }
487
488 #[tokio::test(start_paused = true)]
489 async fn test_empty_flush() {
490 let mock = MockGapicPublisher::new();
491
492 let client = GapicPublisher::from_stub(mock);
493 let publisher = PublisherBuilder::new(client, "my-topic".to_string()).build();
494
495 let start = tokio::time::Instant::now();
496 publisher.flush().await;
497 assert_eq!(start.elapsed(), Duration::ZERO);
498 }
499
500 #[tokio::test]
501 async fn test_batching_send_on_message_count_threshold_success() {
502 let mut mock = MockGapicPublisher::new();
504 mock.expect_publish().return_once({
505 |r, _| {
506 assert_eq!(r.topic, "my-topic");
507 assert_eq!(r.messages.len(), 2);
508 let ids = r
509 .messages
510 .iter()
511 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
512 Ok(gax::response::Response::from(
513 PublishResponse::new().set_message_ids(ids),
514 ))
515 }
516 });
517
518 let client = GapicPublisher::from_stub(mock);
519 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
520 .set_message_count_threshold(2_u32)
521 .set_byte_threshold(MAX_BYTES)
522 .set_delay_threshold(std::time::Duration::MAX)
523 .build();
524
525 let messages = vec![
526 PubsubMessage::new().set_data("hello".to_string()),
527 PubsubMessage::new().set_data("world".to_string()),
528 ];
529 let mut handles = Vec::new();
530 for msg in messages {
531 let handle = publisher.publish(msg.clone());
532 handles.push((msg, handle));
533 }
534
535 for (id, rx) in handles.into_iter() {
536 let got = rx.await.expect("expected message id");
537 let id = String::from_utf8(id.data.to_vec()).unwrap();
538 assert_eq!(got, id);
539 }
540 }
541
542 #[tokio::test]
543 async fn test_batching_send_on_message_count_threshold_error() {
544 let mut mock = MockGapicPublisher::new();
546 mock.expect_publish().return_once({
547 |r, _| {
548 assert_eq!(r.topic, "my-topic");
549 assert_eq!(r.messages.len(), 2);
550 Err(gax::error::Error::io("io error"))
551 }
552 });
553
554 let client = GapicPublisher::from_stub(mock);
555 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
556 .set_message_count_threshold(2_u32)
557 .set_byte_threshold(MAX_BYTES)
558 .set_delay_threshold(std::time::Duration::MAX)
559 .build();
560
561 let messages = vec![
562 PubsubMessage::new().set_data("hello".to_string()),
563 PubsubMessage::new().set_data("world".to_string()),
564 ];
565 let mut handles = Vec::new();
566 for msg in messages {
567 let handle = publisher.publish(msg.clone());
568 handles.push(handle);
569 }
570
571 for rx in handles.into_iter() {
572 let got = rx.await;
573 assert!(got.is_err());
574 }
575 }
576
577 #[tokio::test]
578 async fn test_batching_send_on_byte_threshold() {
579 let mut mock = MockGapicPublisher::new();
581 mock.expect_publish().return_once({
582 |r, _| {
583 assert_eq!(r.topic, "my-topic");
584 assert_eq!(r.messages.len(), 2);
585 let ids = r
586 .messages
587 .iter()
588 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
589 Ok(gax::response::Response::from(
590 PublishResponse::new().set_message_ids(ids),
591 ))
592 }
593 });
594
595 let client = GapicPublisher::from_stub(mock);
596 let byte_threshold = "my-topic".len() + "hello".len() + 1;
598 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
599 .set_message_count_threshold(MAX_MESSAGES)
600 .set_byte_threshold(byte_threshold as u32)
601 .set_delay_threshold(std::time::Duration::MAX)
602 .build();
603
604 let messages = vec![
605 PubsubMessage::new().set_data("hello".to_string()),
606 PubsubMessage::new().set_data("world".to_string()),
607 ];
608 let mut handles = Vec::new();
609 for msg in messages {
610 let handle = publisher.publish(msg.clone());
611 handles.push((msg, handle));
612 }
613
614 for (id, rx) in handles.into_iter() {
615 let got = rx.await.expect("expected message id");
616 let id = String::from_utf8(id.data.to_vec()).unwrap();
617 assert_eq!(got, id);
618 }
619 }
620
621 #[tokio::test(start_paused = true)]
622 async fn test_batching_send_on_delay_threshold() {
623 let mut mock = MockGapicPublisher::new();
624 mock.expect_publish().returning({
625 |r, _| {
626 assert_eq!(r.topic, "my-topic");
627 let ids = r
628 .messages
629 .iter()
630 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
631 Ok(gax::response::Response::from(
632 PublishResponse::new().set_message_ids(ids),
633 ))
634 }
635 });
636
637 let client = GapicPublisher::from_stub(mock);
638 let delay = std::time::Duration::from_millis(10);
639 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
640 .set_message_count_threshold(u32::MAX)
641 .set_byte_threshold(MAX_BYTES)
642 .set_delay_threshold(delay)
643 .build();
644
645 for _ in 0..3 {
647 let start = tokio::time::Instant::now();
648 let messages = vec![
649 PubsubMessage::new().set_data("hello 0".to_string()),
650 PubsubMessage::new().set_data("hello 1".to_string()),
651 PubsubMessage::new()
652 .set_data("hello 2".to_string())
653 .set_ordering_key("ordering key 1"),
654 PubsubMessage::new()
655 .set_data("hello 3".to_string())
656 .set_ordering_key("ordering key 2"),
657 ];
658 let mut handles = Vec::new();
659 for msg in messages {
660 let handle = publisher.publish(msg.clone());
661 handles.push((msg, handle));
662 }
663
664 for (id, rx) in handles.into_iter() {
665 let got = rx.await.expect("expected message id");
666 let id = String::from_utf8(id.data.to_vec()).unwrap();
667 assert_eq!(got, id);
668 assert_eq!(
669 start.elapsed(),
670 delay,
671 "batch of messages should have sent after {:?}",
672 delay
673 )
674 }
675 }
676 }
677
678 #[tokio::test(start_paused = true)]
679 #[allow(clippy::get_first)]
680 async fn test_batching_on_ordering_key() {
681 let mut mock = MockGapicPublisher::new();
683 mock.expect_publish().returning({
684 |r, _| {
685 assert_eq!(r.topic, "my-topic");
686 assert_eq!(r.messages.len(), 2);
687 assert_eq!(
688 r.messages.get(0).unwrap().ordering_key,
689 r.messages.get(1).unwrap().ordering_key
690 );
691 let ids = r
692 .messages
693 .iter()
694 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
695 Ok(gax::response::Response::from(
696 PublishResponse::new().set_message_ids(ids),
697 ))
698 }
699 });
700
701 let client = GapicPublisher::from_stub(mock);
702 let message_count_threshold = 2_u32;
704 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
705 .set_message_count_threshold(message_count_threshold)
706 .set_byte_threshold(MAX_BYTES)
707 .set_delay_threshold(std::time::Duration::MAX)
708 .build();
709
710 let num_ordering_keys = 3;
711 let mut messages = Vec::new();
712 for i in 0..(2 * message_count_threshold * num_ordering_keys) {
716 messages.push(
717 PubsubMessage::new()
718 .set_data(format!("test message {}", i))
719 .set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
720 );
721 }
722 let mut handles = Vec::new();
723 for msg in messages {
724 let handle = publisher.publish(msg.clone());
725 handles.push((msg, handle));
726 }
727
728 for (id, rx) in handles.into_iter() {
729 let got = rx.await.expect("expected message id");
730 let id = String::from_utf8(id.data.to_vec()).unwrap();
731 assert_eq!(got, id);
732 }
733 }
734
735 #[tokio::test(start_paused = true)]
736 #[allow(clippy::get_first)]
737 async fn test_batching_empty_ordering_key() {
738 let mut mock = MockGapicPublisher::new();
740 mock.expect_publish().returning({
741 |r, _| {
742 assert_eq!(r.topic, "my-topic");
743 assert_eq!(r.messages.len(), 2);
744 assert_eq!(
745 r.messages.get(0).unwrap().ordering_key,
746 r.messages.get(1).unwrap().ordering_key
747 );
748 let ids = r
749 .messages
750 .iter()
751 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
752 Ok(gax::response::Response::from(
753 PublishResponse::new().set_message_ids(ids),
754 ))
755 }
756 });
757
758 let client = GapicPublisher::from_stub(mock);
759 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
761 .set_message_count_threshold(2_u32)
762 .set_byte_threshold(MAX_BYTES)
763 .set_delay_threshold(std::time::Duration::MAX)
764 .build();
765
766 let messages = vec![
767 PubsubMessage::new().set_data("hello 1".to_string()),
768 PubsubMessage::new()
769 .set_data("hello 2".to_string())
770 .set_ordering_key(""),
771 PubsubMessage::new()
772 .set_data("hello 3".to_string())
773 .set_ordering_key("ordering key :1"),
774 PubsubMessage::new()
775 .set_data("hello 4".to_string())
776 .set_ordering_key("ordering key :1"),
777 ];
778
779 let mut handles = Vec::new();
780 for msg in messages {
781 let handle = publisher.publish(msg.clone());
782 handles.push((msg, handle));
783 }
784
785 for (id, rx) in handles.into_iter() {
786 let got = rx.await.expect("expected message id");
787 let id = String::from_utf8(id.data.to_vec()).unwrap();
788 assert_eq!(got, id);
789 }
790 }
791
792 #[tokio::test(start_paused = true)]
793 #[allow(clippy::get_first)]
794 async fn test_ordering_key_only_one_outstanding_batch() {
795 let mut seq = Sequence::new();
799 let mut mock = MockGapicPublisherWithFuture::new();
800 mock.expect_publish()
801 .times(1)
802 .in_sequence(&mut seq)
803 .returning({
804 |r, _| {
805 Box::pin(async move {
806 tokio::time::sleep(Duration::from_millis(10)).await;
807 assert_eq!(r.topic, "my-topic");
808 assert_eq!(r.messages.len(), 1);
809 let ids = r
810 .messages
811 .iter()
812 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
813 Ok(gax::response::Response::from(
814 PublishResponse::new().set_message_ids(ids),
815 ))
816 })
817 }
818 });
819
820 mock.expect_publish()
821 .times(1)
822 .in_sequence(&mut seq)
823 .returning({
824 |r, _| {
825 Box::pin(async move {
826 assert_eq!(r.topic, "my-topic");
827 assert_eq!(r.messages.len(), 1);
828 let ids = r
829 .messages
830 .iter()
831 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
832 Ok(gax::response::Response::from(
833 PublishResponse::new().set_message_ids(ids),
834 ))
835 })
836 }
837 });
838
839 let client = GapicPublisher::from_stub(mock);
840 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
842 .set_message_count_threshold(1_u32)
843 .set_byte_threshold(MAX_BYTES)
844 .set_delay_threshold(std::time::Duration::MAX)
845 .build();
846
847 let messages = [
848 PubsubMessage::new()
849 .set_data("hello 1".to_string())
850 .set_ordering_key("ordering key"),
851 PubsubMessage::new()
852 .set_data("hello 2".to_string())
853 .set_ordering_key("ordering key"),
854 ];
855
856 let start = tokio::time::Instant::now();
857 let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
858 let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
859 assert_eq!(msg2_handle.await.expect("expected message id"), "hello 2");
860 assert_eq!(
861 start.elapsed(),
862 Duration::from_millis(10),
863 "the second batch of messages should have sent after the first which is has been delayed by {:?}",
864 Duration::from_millis(10)
865 );
866 assert_eq!(msg1_handle.await.expect("expected message id"), "hello 1");
868 }
869
870 #[tokio::test(start_paused = true)]
871 #[allow(clippy::get_first)]
872 async fn test_empty_ordering_key_concurrent_batches() {
873 let mut seq = Sequence::new();
878 let mut mock = MockGapicPublisherWithFuture::new();
879 mock.expect_publish()
880 .times(1)
881 .in_sequence(&mut seq)
882 .returning({
883 |r, _| {
884 Box::pin(async move {
885 tokio::time::sleep(Duration::from_millis(10)).await;
886 assert_eq!(r.topic, "my-topic");
887 assert_eq!(r.messages.len(), 1);
888 let ids = r
889 .messages
890 .iter()
891 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
892 Ok(gax::response::Response::from(
893 PublishResponse::new().set_message_ids(ids),
894 ))
895 })
896 }
897 });
898
899 mock.expect_publish()
900 .times(1)
901 .in_sequence(&mut seq)
902 .returning({
903 |r, _| {
904 Box::pin(async move {
905 assert_eq!(r.topic, "my-topic");
906 assert_eq!(r.messages.len(), 1);
907 let ids = r
908 .messages
909 .iter()
910 .map(|m| String::from_utf8(m.data.to_vec()).unwrap());
911 Ok(gax::response::Response::from(
912 PublishResponse::new().set_message_ids(ids),
913 ))
914 })
915 }
916 });
917
918 let client = GapicPublisher::from_stub(mock);
919 let publisher = PublisherBuilder::new(client, "my-topic".to_string())
921 .set_message_count_threshold(1_u32)
922 .set_byte_threshold(MAX_BYTES)
923 .set_delay_threshold(std::time::Duration::MAX)
924 .build();
925
926 let messages = [
927 PubsubMessage::new()
928 .set_data("hello 1".to_string())
929 .set_ordering_key(""),
930 PubsubMessage::new()
931 .set_data("hello 2".to_string())
932 .set_ordering_key(""),
933 ];
934
935 let start = tokio::time::Instant::now();
936 let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
937 let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
938 assert_eq!(msg2_handle.await.expect("expected message id"), "hello 2");
939 assert_eq!(
940 start.elapsed(),
941 Duration::from_millis(0),
942 "the second batch of messages should have sent without any delay"
943 );
944 assert_eq!(msg1_handle.await.expect("expected message id"), "hello 1");
946 }
947
948 #[tokio::test]
949 async fn builder() -> anyhow::Result<()> {
950 let client = Client::builder().build().await?;
951 let builder = client.publisher("projects/my-project/topics/my-topic".to_string());
952 let publisher = builder.set_message_count_threshold(1_u32).build();
953 assert_eq!(publisher.batching_options.message_count_threshold, 1_u32);
954 Ok(())
955 }
956
957 #[tokio::test]
958 async fn default_batching() -> anyhow::Result<()> {
959 let client = Client::builder().build().await?;
960 let publisher = client
961 .publisher("projects/my-project/topics/my-topic".to_string())
962 .build();
963
964 assert_eq!(
965 publisher.batching_options.message_count_threshold,
966 BatchingOptions::default().message_count_threshold
967 );
968 assert_eq!(
969 publisher.batching_options.byte_threshold,
970 BatchingOptions::default().byte_threshold
971 );
972 assert_eq!(
973 publisher.batching_options.delay_threshold,
974 BatchingOptions::default().delay_threshold
975 );
976 Ok(())
977 }
978
979 #[tokio::test]
980 async fn test_builder_clamping() -> anyhow::Result<()> {
981 let oversized_options = BatchingOptions::new()
983 .set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
984 .set_message_count_threshold(MAX_MESSAGES + 1)
985 .set_byte_threshold(MAX_BYTES + 1);
986
987 let client = Client::builder().build().await?;
988 let publisher = client
989 .publisher("projects/my-project/topics/my-topic".to_string())
990 .set_delay_threshold(oversized_options.delay_threshold)
991 .set_message_count_threshold(oversized_options.message_count_threshold)
992 .set_byte_threshold(oversized_options.byte_threshold)
993 .build();
994 let got = publisher.batching_options;
995
996 assert_eq!(got.delay_threshold, MAX_DELAY);
997 assert_eq!(got.message_count_threshold, MAX_MESSAGES);
998 assert_eq!(got.byte_threshold, MAX_BYTES);
999
1000 let normal_options = BatchingOptions::new()
1002 .set_delay_threshold(Duration::from_secs(10))
1003 .set_message_count_threshold(10_u32)
1004 .set_byte_threshold(100_u32);
1005
1006 let publisher = client
1007 .publisher("projects/my-project/topics/my-topic".to_string())
1008 .set_delay_threshold(normal_options.delay_threshold)
1009 .set_message_count_threshold(normal_options.message_count_threshold)
1010 .set_byte_threshold(normal_options.byte_threshold)
1011 .build();
1012 let got = publisher.batching_options;
1013
1014 assert_eq!(got.delay_threshold, normal_options.delay_threshold);
1015 assert_eq!(
1016 got.message_count_threshold,
1017 normal_options.message_count_threshold
1018 );
1019 assert_eq!(got.byte_threshold, normal_options.byte_threshold);
1020
1021 Ok(())
1022 }
1023}