1use super::builder::Subscribe;
16use super::handler::{AckResult, Action, AtLeastOnce, ExactlyOnce, Handler};
17use super::lease_loop::LeaseLoop;
18use super::lease_state::{AtLeastOnceInfo, ExactlyOnceInfo, LeaseInfo, LeaseOptions, NewMessage};
19use super::leaser::DefaultLeaser;
20use super::retry_policy::StreamRetryPolicy;
21use super::shutdown_token::ShutdownToken;
22use super::stream::Stream;
23use super::stub::TonicStreaming as _;
24use super::transport::Transport;
25use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
26use crate::model::Message;
27use crate::{Error, Result};
28use futures::FutureExt;
29use futures::future::{BoxFuture, Shared};
30use gaxi::grpc::from_status::to_gax_error;
31use gaxi::prost::FromProto as _;
32use google_cloud_gax::retry_result::RetryResult;
33use std::collections::VecDeque;
34use std::sync::Arc;
35use tokio::sync::mpsc::{UnboundedSender, WeakUnboundedSender, unbounded_channel};
36use tokio::sync::oneshot::Receiver;
37use tokio::time::Duration;
38use tokio_util::sync::{CancellationToken, DropGuard};
39
40#[derive(Debug)]
58pub struct MessageStream {
59 inner: MessageStreamImpl,
66
67 lease_loop: Shared<BoxFuture<'static, ()>>,
69
70 shutdown: CancellationToken,
72
73 _shutdown_guard: DropGuard,
78}
79
80#[derive(Debug)]
81pub struct MessageStreamImpl {
82 stub: Arc<Transport>,
84
85 initial_req: StreamingPullRequest,
87
88 stream: Option<StreamState>,
99
100 pool: VecDeque<(Message, HandlerInfo)>,
106
107 message_tx: WeakUnboundedSender<NewMessage>,
110
111 ack_tx: WeakUnboundedSender<Action>,
114
115 shutdown: CancellationToken,
117}
118
119#[allow(clippy::large_enum_variant)]
122#[derive(Debug)]
123enum StreamState {
124 Closed,
127 Active(Stream<Transport>),
129}
130
131impl MessageStream {
132 pub(super) fn new(builder: Subscribe) -> Self {
133 let stub = builder.inner;
134 let subscription = builder.subscription;
135
136 let (confirmed_tx, confirmed_rx) = unbounded_channel();
137 let (eo_extend_tx, eo_extend_rx) = unbounded_channel();
138 let leaser = DefaultLeaser::new(
139 stub.clone(),
140 confirmed_tx,
141 eo_extend_tx,
142 subscription.clone(),
143 builder.ack_deadline_seconds,
144 builder.grpc_subchannel_count,
145 );
146 let options = LeaseOptions {
147 max_lease: builder.max_lease,
148 max_lease_extension: Duration::from_secs(builder.ack_deadline_seconds as u64),
149 shutdown_behavior: builder.shutdown_behavior,
150 ..Default::default()
151 };
152 let LeaseLoop {
153 handle,
154 message_tx,
155 ack_tx,
156 cancel: shutdown,
157 } = LeaseLoop::new(leaser, confirmed_rx, eo_extend_rx, options);
158 let lease_loop = handle.map(|_| ()).boxed().shared();
159 let _shutdown_guard = shutdown.clone().drop_guard();
160
161 let initial_req = StreamingPullRequest {
162 subscription,
163 stream_ack_deadline_seconds: builder.ack_deadline_seconds,
164 max_outstanding_messages: builder.max_outstanding_messages,
165 max_outstanding_bytes: builder.max_outstanding_bytes,
166 client_id: builder.client_id,
167 protocol_version: 1,
170 ..Default::default()
171 };
172
173 let inner = MessageStreamImpl {
174 stub,
175 initial_req,
176 stream: None,
177 pool: VecDeque::new(),
178 message_tx,
179 ack_tx,
180 shutdown: shutdown.clone(),
181 };
182 Self {
183 inner,
184 lease_loop,
185 shutdown,
186 _shutdown_guard,
187 }
188 }
189
190 pub async fn next(&mut self) -> Option<Result<(Message, Handler)>> {
212 let next = tokio::select! {
213 biased;
214 _ = self.shutdown.cancelled() => {
215 self.inner.close();
216 None
217 },
218 n = self.inner.next() => n,
219 };
220 next
221 }
222
223 #[cfg(feature = "unstable-stream")]
224 #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
225 pub fn into_stream(self) -> impl futures::Stream<Item = Result<(Message, Handler)>> + Unpin {
237 use futures::stream::unfold;
238 Box::pin(unfold(self, |mut stream| async move {
239 stream.next().await.map(|item| (item, stream))
240 }))
241 }
242
243 pub fn shutdown_token(&self) -> ShutdownToken {
266 ShutdownToken {
267 inner: self.shutdown.clone(),
268 fut: self.lease_loop.clone(),
270 }
271 }
272}
273
274impl MessageStreamImpl {
275 async fn next(&mut self) -> Option<Result<(Message, Handler)>> {
276 loop {
277 if let Some((m, hi)) = self.pool.pop_front() {
279 return Some(Ok((m, hi.into_handler(self.ack_tx.upgrade()?))));
280 }
281
282 if let Err(e) = self.populate_pool().await? {
289 match StreamRetryPolicy::on_midstream_error(e) {
291 RetryResult::Continue(_) => {
292 self.stream = None;
294 continue;
295 }
296 RetryResult::Permanent(e) | RetryResult::Exhausted(e) => {
297 self.close();
299 return Some(Err(e));
300 }
301 }
302 }
303 }
304 }
305
306 async fn open_stream(&mut self) -> Result<()> {
308 let stream = Stream::<Transport>::new(self.stub.clone(), self.initial_req.clone()).await?;
309 self.stream = Some(StreamState::Active(stream));
310 Ok(())
311 }
312
313 async fn next_response(&mut self) -> Option<Result<StreamingPullResponse>> {
320 if self.stream.is_none() {
321 if let Err(e) = self.open_stream().await {
323 return Some(Err(e));
324 }
325 }
326
327 let stream = match self.stream.as_mut()? {
328 StreamState::Closed => return None,
329 StreamState::Active(s) => s,
330 };
331 stream
332 .next_message()
333 .await
334 .map_err(to_gax_error)
335 .transpose()
336 }
337
338 async fn populate_pool(&mut self) -> Option<Result<()>> {
348 let resp = match self.next_response().await? {
350 Ok(resp) => resp,
351 Err(e) => return Some(Err(e)),
352 };
353
354 let exactly_once = resp
355 .subscription_properties
356 .is_some_and(|m| m.exactly_once_delivery_enabled);
357
358 for rm in resp.received_messages {
360 let Some(message) = rm.message else {
361 continue;
368 };
369
370 let delivery_attempt = (rm.delivery_attempt > 0).then_some(rm.delivery_attempt);
371
372 let (lease_info, handler_info) = if exactly_once {
373 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
374 (
375 LeaseInfo::ExactlyOnce(ExactlyOnceInfo::new(result_tx)),
376 HandlerInfo::ExactlyOnce {
377 ack_id: rm.ack_id.clone(),
378 result_rx,
379 delivery_attempt,
380 },
381 )
382 } else {
383 (
384 LeaseInfo::AtLeastOnce(AtLeastOnceInfo::new()),
385 HandlerInfo::AtLeastOnce {
386 ack_id: rm.ack_id.clone(),
387 delivery_attempt,
388 },
389 )
390 };
391
392 let _ = self.message_tx.upgrade()?.send(NewMessage {
393 ack_id: rm.ack_id,
394 lease_info,
395 });
396 let message = match message.cnv().map_err(Error::deser) {
397 Ok(message) => message,
398 Err(e) => return Some(Err(e)),
399 };
400 self.pool.push_back((message, handler_info));
401 }
402 Some(Ok(()))
403 }
404
405 fn close(&mut self) {
407 self.stream = Some(StreamState::Closed);
408 self.pool.clear();
409 self.shutdown.cancel();
410 }
411}
412
413#[derive(Debug)]
425enum HandlerInfo {
426 AtLeastOnce {
427 ack_id: String,
428 delivery_attempt: Option<i32>,
429 },
430 ExactlyOnce {
431 ack_id: String,
432 result_rx: Receiver<AckResult>,
433 delivery_attempt: Option<i32>,
434 },
435}
436
437impl HandlerInfo {
438 fn into_handler(self, ack_tx: UnboundedSender<Action>) -> Handler {
441 match self {
442 HandlerInfo::AtLeastOnce {
443 ack_id,
444 delivery_attempt,
445 } => Handler::AtLeastOnce(AtLeastOnce::new(ack_id, ack_tx, delivery_attempt)),
446 HandlerInfo::ExactlyOnce {
447 ack_id,
448 result_rx,
449 delivery_attempt,
450 } => Handler::ExactlyOnce(ExactlyOnce::new(
451 ack_id,
452 ack_tx,
453 result_rx,
454 delivery_attempt,
455 )),
456 }
457 }
458}
459
460#[cfg(test)]
461mod tests {
462 use super::super::ShutdownBehavior;
463 use super::super::client::Subscriber;
464 use super::super::keepalive::KEEPALIVE_PERIOD;
465 use super::super::lease_state::tests::{test_id, test_ids};
466 use super::super::stream::{INITIAL_DELAY, MAXIMUM_DELAY};
467 use super::*;
468 use gaxi::grpc::tonic::{Response as TonicResponse, Status as TonicStatus};
469 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
470 use google_cloud_test_macros::tokio_test_no_panics;
471 use pubsub_grpc_mock::google::pubsub::v1;
472 use pubsub_grpc_mock::{MockSubscriber, start};
473 use test_case::test_case;
474 use tokio::sync::mpsc::{channel, unbounded_channel};
475 use tokio::task::{JoinHandle, JoinSet};
476 use tokio::time::{Duration, Instant};
477
478 fn sorted(mut v: Vec<String>) -> Vec<String> {
479 v.sort();
480 v
481 }
482
483 fn test_data(v: i32) -> bytes::Bytes {
484 bytes::Bytes::from(format!("data-{}", test_id(v)))
485 }
486
487 fn test_response(range: std::ops::Range<i32>) -> v1::StreamingPullResponse {
488 v1::StreamingPullResponse {
489 received_messages: range
490 .into_iter()
491 .map(|i| v1::ReceivedMessage {
492 ack_id: test_id(i),
493 message: Some(v1::PubsubMessage {
494 data: test_data(i).to_vec(),
495 ..Default::default()
496 }),
497 ..Default::default()
498 })
499 .collect(),
500 ..Default::default()
501 }
502 }
503
504 fn test_exactly_once_response(range: std::ops::Range<i32>) -> v1::StreamingPullResponse {
505 v1::StreamingPullResponse {
506 subscription_properties: Some(v1::streaming_pull_response::SubscriptionProperties {
507 exactly_once_delivery_enabled: true,
508 ..Default::default()
509 }),
510 received_messages: range
511 .into_iter()
512 .map(|i| v1::ReceivedMessage {
513 ack_id: test_id(i),
514 message: Some(v1::PubsubMessage {
515 data: test_data(i).to_vec(),
516 ..Default::default()
517 }),
518 ..Default::default()
519 })
520 .collect(),
521 ..Default::default()
522 }
523 }
524
525 async fn test_client(endpoint: String) -> anyhow::Result<Subscriber> {
526 Ok(Subscriber::builder()
527 .with_endpoint(endpoint)
528 .with_credentials(Anonymous::new().build())
529 .build()
530 .await?)
531 }
532
533 #[tokio_test_no_panics]
534 async fn error_starting_stream() -> anyhow::Result<()> {
535 let mut mock = MockSubscriber::new();
536 mock.expect_streaming_pull()
537 .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
538 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
539 let client = test_client(endpoint).await?;
540 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
541 let err = stream
542 .next()
543 .await
544 .expect("stream should not be empty")
545 .expect_err("the first streamed item should be an error");
546 assert!(err.status().is_some(), "{err:?}");
547 let status = err.status().unwrap();
548 assert_eq!(
549 status.code,
550 google_cloud_gax::error::rpc::Code::FailedPrecondition
551 );
552 assert_eq!(status.message, "fail");
553
554 Ok(())
555 }
556
557 #[tokio_test_no_panics]
558 async fn permanent_error_ends_stream() -> anyhow::Result<()> {
559 let mut mock = MockSubscriber::new();
560 mock.expect_streaming_pull()
561 .returning(|_| Err(TonicStatus::failed_precondition("fail")));
562 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
563 let client = test_client(endpoint).await?;
564 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
565 let next = stream.next().await;
566 assert!(
567 matches!(next, Some(Err(_))),
568 "expected permanent error, got {next:?}"
569 );
570
571 let next = stream.next().await;
572 assert!(next.is_none(), "expected end of stream, got {next:?}");
573
574 Ok(())
575 }
576
577 #[tokio_test_no_panics]
578 async fn initial_request() -> anyhow::Result<()> {
579 const MIB: i64 = 1024 * 1024;
580
581 let (recover_writes_tx, mut recover_writes_rx) = channel(1);
584
585 let mut mock = MockSubscriber::new();
586 mock.expect_streaming_pull().return_once(move |request| {
587 tokio::spawn(async move {
588 let mut request_rx = request.into_inner();
591 while let Some(request) = request_rx.recv().await {
592 recover_writes_tx
593 .send(request)
594 .await
595 .expect("forwarding writes always succeeds");
596 }
597 });
598 Err(TonicStatus::failed_precondition("fail"))
599 });
600
601 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
602 let client = test_client(endpoint).await?;
603 let _ = client
604 .subscribe("projects/p/subscriptions/s")
605 .set_max_lease_extension(Duration::from_secs(20))
606 .set_max_outstanding_messages(2000)
607 .set_max_outstanding_bytes(200 * MIB)
608 .build()
609 .next()
610 .await;
611
612 let initial_req = recover_writes_rx
613 .recv()
614 .await
615 .expect("should receive a request")?;
616 assert_eq!(initial_req.subscription, "projects/p/subscriptions/s");
617 assert_eq!(initial_req.stream_ack_deadline_seconds, 20);
618 assert_eq!(initial_req.max_outstanding_messages, 2000);
619 assert_eq!(initial_req.max_outstanding_bytes, 200 * MIB);
620 assert!(
621 !initial_req.client_id.is_empty(),
622 "initial request has empty client id: {initial_req:?}"
623 );
624 assert!(
625 initial_req.protocol_version >= 1,
626 "protocol_version={}",
627 initial_req.protocol_version
628 );
629
630 Ok(())
631 }
632
633 #[tokio_test_no_panics(start_paused = true)]
634 async fn basic_success() -> anyhow::Result<()> {
635 let (response_tx, response_rx) = channel(10);
636 let (ack_tx, mut ack_rx) = unbounded_channel();
637
638 let mut mock = MockSubscriber::new();
639 mock.expect_streaming_pull()
640 .return_once(|_| Ok(TonicResponse::from(response_rx)));
641 mock.expect_acknowledge().returning(move |r| {
642 ack_tx
643 .send(r.into_inner())
644 .expect("sending on channel always succeeds");
645 Ok(TonicResponse::from(()))
646 });
647 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
648 let client = test_client(endpoint).await?;
649 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
650
651 response_tx.send(Ok(test_response(1..2))).await?;
652 response_tx.send(Ok(test_response(2..4))).await?;
653 response_tx.send(Ok(test_response(4..7))).await?;
654 drop(response_tx);
655
656 for i in 1..7 {
657 let Some((m, h)) = stream.next().await.transpose()? else {
658 anyhow::bail!("expected message {i}/6")
659 };
660 assert_eq!(m.data, test_data(i));
661 assert_eq!(h.ack_id(), test_id(i));
662 h.ack();
663 }
664 let end = stream.next().await.transpose()?;
665 assert!(end.is_none(), "Received extra message: {end:?}");
666
667 stream.shutdown_token().shutdown().await;
669
670 let ack_req = ack_rx.try_recv()?;
672 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
673 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..7));
674
675 Ok(())
676 }
677
678 #[test_case(0, None, false; "at_least_once_zero_maps_to_none")]
679 #[test_case(5, Some(5), false; "at_least_once_positive_maps_to_some")]
680 #[test_case(-1, None, false; "at_least_once_negative_maps_to_none")]
681 #[test_case(1, Some(1), false; "at_least_once_one_maps_to_some")]
682 #[test_case(i32::MAX, Some(i32::MAX), false; "at_least_once_max_maps_to_some")]
683 #[test_case(0, None, true; "exactly_once_zero_maps_to_none")]
684 #[test_case(5, Some(5), true; "exactly_once_positive_maps_to_some")]
685 #[test_case(-1, None, true; "exactly_once_negative_maps_to_none")]
686 #[test_case(1, Some(1), true; "exactly_once_one_maps_to_some")]
687 #[test_case(i32::MAX, Some(i32::MAX), true; "exactly_once_max_maps_to_some")]
688 #[tokio_test_no_panics(start_paused = true)]
689 async fn delivery_attempt_mapping(
690 input: i32,
691 expected: Option<i32>,
692 exactly_once: bool,
693 ) -> anyhow::Result<()> {
694 let (response_tx, response_rx) = channel(10);
695
696 let mut mock = MockSubscriber::new();
697 mock.expect_streaming_pull()
698 .return_once(|_| Ok(TonicResponse::from(response_rx)));
699
700 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
701 let client = test_client(endpoint).await?;
702 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
703
704 let resp = v1::StreamingPullResponse {
705 subscription_properties: Some(v1::streaming_pull_response::SubscriptionProperties {
706 exactly_once_delivery_enabled: exactly_once,
707 ..Default::default()
708 }),
709 received_messages: vec![v1::ReceivedMessage {
710 ack_id: test_id(0),
711 message: Some(v1::PubsubMessage {
712 data: test_data(0).to_vec(),
713 ..Default::default()
714 }),
715 delivery_attempt: input,
716 }],
717 ..Default::default()
718 };
719
720 response_tx.send(Ok(resp)).await?;
721 drop(response_tx);
722
723 let Some((_, h)) = stream.next().await.transpose()? else {
724 anyhow::bail!("expected message")
725 };
726 assert_eq!(h.delivery_attempt(), expected);
727
728 Ok(())
729 }
730
731 #[tokio_test_no_panics(start_paused = true)]
732 async fn basic_success_exactly_once() -> anyhow::Result<()> {
733 let (response_tx, response_rx) = channel(10);
734 let (ack_tx, mut ack_rx) = unbounded_channel();
735
736 let mut mock = MockSubscriber::new();
737 mock.expect_streaming_pull()
738 .return_once(|_| Ok(TonicResponse::from(response_rx)));
739 mock.expect_acknowledge().returning(move |r| {
740 ack_tx
741 .send(r.into_inner())
742 .expect("sending on channel always succeeds");
743 Ok(TonicResponse::from(()))
744 });
745 mock.expect_modify_ack_deadline()
746 .returning(|_| Ok(TonicResponse::from(())));
747 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
748 let client = test_client(endpoint).await?;
749 let mut stream = client
750 .subscribe("projects/p/subscriptions/s")
751 .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
752 .build();
753
754 response_tx
755 .send(Ok(test_exactly_once_response(1..2)))
756 .await?;
757 response_tx
758 .send(Ok(test_exactly_once_response(2..4)))
759 .await?;
760 response_tx
761 .send(Ok(test_exactly_once_response(4..7)))
762 .await?;
763 drop(response_tx);
764
765 let mut acks = JoinSet::new();
766 for i in 1..7 {
767 let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? else {
768 anyhow::bail!("expected message {i}/6")
769 };
770 assert_eq!(m.data, test_data(i));
771 assert_eq!(h.ack_id(), test_id(i));
772 acks.spawn(h.confirmed_ack());
773 }
774 let end = stream.next().await.transpose()?;
775 assert!(end.is_none(), "Received extra message: {end:?}");
776
777 stream.shutdown_token().shutdown().await;
779
780 while let Some(r) = acks.join_next().await {
782 r??;
783 }
784 let ack_req = ack_rx.try_recv()?;
785 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
786 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..7));
787
788 Ok(())
789 }
790
791 #[tokio_test_no_panics(start_paused = true)]
792 async fn basic_lease_management() -> anyhow::Result<()> {
793 let (response_tx, response_rx) = channel(10);
794 let (ack_tx, mut ack_rx) = unbounded_channel();
795 let (nack_tx, mut nack_rx) = unbounded_channel();
796 let (extend_tx, mut extend_rx) = unbounded_channel();
797
798 let mut mock = MockSubscriber::new();
799 mock.expect_streaming_pull()
800 .return_once(|_| Ok(TonicResponse::from(response_rx)));
801 mock.expect_acknowledge().returning(move |r| {
802 ack_tx
803 .send(r.into_inner())
804 .expect("sending on channel always succeeds");
805 Ok(TonicResponse::from(()))
806 });
807 mock.expect_modify_ack_deadline().returning(move |r| {
808 let r = r.into_inner();
809 if r.ack_deadline_seconds == 0 {
810 nack_tx.send(r).expect("sending on channel always succeeds");
811 } else {
812 extend_tx
813 .send(r)
814 .expect("sending on channel always succeeds");
815 }
816 Ok(TonicResponse::from(()))
817 });
818 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
819 let client = test_client(endpoint).await?;
820 let mut stream = client
821 .subscribe("projects/p/subscriptions/s")
822 .set_max_lease_extension(Duration::from_secs(10))
823 .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
824 .build();
825
826 response_tx.send(Ok(test_response(0..30))).await?;
827 drop(response_tx);
828
829 for i in 0..10 {
831 let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
832 anyhow::bail!("expected message {i}")
833 };
834 h.ack();
835 }
836 for i in 10..20 {
838 let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
839 anyhow::bail!("expected message {i}")
840 };
841 h.nack();
842 }
843 let mut hold = Vec::new();
845 for i in 20..30 {
846 let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
847 anyhow::bail!("expected message {i}")
848 };
849 hold.push(h);
850 }
851
852 tokio::time::advance(Duration::from_secs(10)).await;
855
856 stream.shutdown_token().shutdown().await;
858
859 let ack_req = ack_rx.try_recv()?;
861 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
862 assert_eq!(sorted(ack_req.ack_ids), test_ids(0..10));
863 assert!(ack_rx.is_empty(), "{ack_rx:?}");
864
865 let nack_req = nack_rx.try_recv()?;
867 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
868 assert_eq!(nack_req.ack_deadline_seconds, 0);
869 assert_eq!(sorted(nack_req.ack_ids), test_ids(10..20));
870
871 let nack_req = nack_rx.try_recv()?;
873 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
874 assert_eq!(nack_req.ack_deadline_seconds, 0);
875 assert_eq!(sorted(nack_req.ack_ids), test_ids(20..30));
876 assert!(nack_rx.is_empty(), "{nack_rx:?}");
877
878 let extend_req = extend_rx.try_recv()?;
880 assert_eq!(extend_req.subscription, "projects/p/subscriptions/s");
881 assert_eq!(extend_req.ack_deadline_seconds, 10);
882 assert_eq!(sorted(extend_req.ack_ids), test_ids(20..30));
883
884 Ok(())
885 }
886
887 #[tokio_test_no_panics(start_paused = true)]
888 async fn delayed_responses() -> anyhow::Result<()> {
889 let (response_tx, response_rx) = channel(10);
893 let handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
894 tokio::time::sleep(Duration::from_millis(20)).await;
895 response_tx.send(Ok(test_response(1..2))).await?;
896 Ok(())
897 });
898
899 let mut mock = MockSubscriber::new();
900 mock.expect_streaming_pull()
901 .return_once(|_| Ok(TonicResponse::from(response_rx)));
902 mock.expect_modify_ack_deadline()
903 .returning(|_| Ok(TonicResponse::from(())));
904 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
905 let client = test_client(endpoint).await?;
906 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
907 let (m, h) = stream
908 .next()
909 .await
910 .transpose()?
911 .expect("stream should wait for a message");
912 assert_eq!(m.data, test_data(1));
913 assert_eq!(h.ack_id(), test_id(1));
914
915 handle.await??;
916
917 Ok(())
918 }
919
920 #[tokio_test_no_panics]
921 async fn serves_messages_immediately() -> anyhow::Result<()> {
922 let (response_tx, response_rx) = channel(10);
927
928 let mut mock = MockSubscriber::new();
929 mock.expect_streaming_pull()
930 .return_once(|_| Ok(TonicResponse::from(response_rx)));
931 mock.expect_modify_ack_deadline()
932 .returning(|_| Ok(TonicResponse::from(())));
933 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
934 let client = test_client(endpoint).await?;
935 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
936
937 for i in 1..7 {
938 response_tx.send(Ok(test_response(i..i + 1))).await?;
939
940 let Some((m, h)) = stream.next().await.transpose()? else {
941 anyhow::bail!("expected message {i}/6")
942 };
943 assert_eq!(m.data, test_data(i));
944 assert_eq!(h.ack_id(), test_id(i));
945 }
946 drop(response_tx);
947 let end = stream.next().await.transpose()?;
948 assert!(end.is_none(), "Received extra message: {end:?}");
949
950 Ok(())
951 }
952
953 #[tokio_test_no_panics]
954 async fn handles_empty_response() -> anyhow::Result<()> {
955 let (response_tx, response_rx) = channel(10);
956
957 let mut mock = MockSubscriber::new();
958 mock.expect_streaming_pull()
959 .return_once(|_| Ok(TonicResponse::from(response_rx)));
960 mock.expect_modify_ack_deadline()
961 .returning(|_| Ok(TonicResponse::from(())));
962 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
963 let client = test_client(endpoint).await?;
964 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
965
966 response_tx.send(Ok(test_response(1..2))).await?;
967 response_tx.send(Ok(test_response(2..2))).await?;
969 response_tx.send(Ok(test_response(2..3))).await?;
970 drop(response_tx);
971
972 for i in 1..3 {
973 let Some((m, h)) = stream.next().await.transpose()? else {
974 anyhow::bail!("expected message {i}/2")
975 };
976 assert_eq!(m.data, test_data(i));
977 assert_eq!(h.ack_id(), test_id(i));
978 }
979 let end = stream.next().await.transpose()?;
980 assert!(end.is_none(), "Received extra message: {end:?}");
981
982 Ok(())
983 }
984
985 #[tokio_test_no_panics(start_paused = true)]
986 async fn handles_missing_message_field() -> anyhow::Result<()> {
987 let (response_tx, response_rx) = channel(10);
988 let (extend_tx, mut extend_rx) = unbounded_channel();
989
990 let bad = v1::StreamingPullResponse {
991 received_messages: vec![v1::ReceivedMessage {
992 ack_id: "ignored-ack-id".to_string(),
993 message: None,
994 ..Default::default()
995 }],
996 ..Default::default()
997 };
998
999 let mut mock = MockSubscriber::new();
1000 mock.expect_streaming_pull()
1001 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1002 mock.expect_modify_ack_deadline().returning(move |r| {
1003 let r = r.into_inner();
1004 if r.ack_deadline_seconds != 0 {
1005 extend_tx
1006 .send(r)
1007 .expect("sending on channel always succeeds");
1008 }
1009 Ok(TonicResponse::from(()))
1010 });
1011 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1012 let client = test_client(endpoint).await?;
1013 let mut stream = client
1014 .subscribe("projects/p/subscriptions/s")
1015 .set_max_lease_extension(Duration::from_secs(10))
1016 .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1017 .build();
1018
1019 response_tx.send(Ok(test_response(1..4))).await?;
1020 response_tx.send(Ok(bad)).await?;
1022 response_tx.send(Ok(test_response(4..7))).await?;
1023 drop(response_tx);
1024
1025 let mut handlers = Vec::new();
1026 for i in 1..7 {
1027 let Some((m, h)) = stream.next().await.transpose()? else {
1028 anyhow::bail!("expected message {i}/6")
1029 };
1030 assert_eq!(m.data, test_data(i));
1031 assert_eq!(h.ack_id(), test_id(i));
1032 handlers.push(h);
1033 }
1034
1035 tokio::time::advance(Duration::from_secs(10)).await;
1038
1039 stream.shutdown_token().shutdown().await;
1041
1042 let extend_req = extend_rx.try_recv()?;
1044 assert_eq!(extend_req.subscription, "projects/p/subscriptions/s");
1045 assert_eq!(extend_req.ack_deadline_seconds, 10);
1046 assert_eq!(sorted(extend_req.ack_ids), test_ids(1..7));
1048
1049 Ok(())
1050 }
1051
1052 #[tokio_test_no_panics]
1053 async fn permanent_error_midstream() -> anyhow::Result<()> {
1054 let (response_tx, response_rx) = channel(10);
1055
1056 let mut mock = MockSubscriber::new();
1057 mock.expect_streaming_pull()
1058 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1059 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1060 let client = test_client(endpoint).await?;
1061 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1062
1063 response_tx.send(Ok(test_response(1..4))).await?;
1064 response_tx
1065 .send(Err(TonicStatus::failed_precondition("fail")))
1066 .await?;
1067 drop(response_tx);
1068
1069 for i in 1..4 {
1070 let Some((m, h)) = stream.next().await.transpose()? else {
1071 anyhow::bail!("expected message {i}/3")
1072 };
1073 assert_eq!(m.data, test_data(i));
1074 assert_eq!(h.ack_id(), test_id(i));
1075 }
1076 let err = stream
1077 .next()
1078 .await
1079 .transpose()
1080 .expect_err("expected an error from stream");
1081 assert!(err.status().is_some(), "{err:?}");
1082 let status = err.status().unwrap();
1083 assert_eq!(
1084 status.code,
1085 google_cloud_gax::error::rpc::Code::FailedPrecondition
1086 );
1087 assert_eq!(status.message, "fail");
1088
1089 Ok(())
1090 }
1091
1092 #[tokio_test_no_panics(start_paused = true)]
1093 async fn keepalives() -> anyhow::Result<()> {
1094 let (recover_writes_tx, mut recover_writes_rx) = channel(1);
1097 let (response_tx, response_rx) = channel(10);
1098
1099 let mut mock = MockSubscriber::new();
1100 mock.expect_streaming_pull().return_once(move |request| {
1101 tokio::spawn(async move {
1102 let mut request_rx = request.into_inner();
1105 while let Some(request) = request_rx.recv().await {
1106 recover_writes_tx
1107 .send(request)
1108 .await
1109 .expect("forwarding writes always succeeds");
1110 }
1111 });
1112 Ok(TonicResponse::from(response_rx))
1113 });
1114 mock.expect_modify_ack_deadline()
1115 .returning(|_| Ok(TonicResponse::from(())));
1116
1117 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1118 let client = test_client(endpoint).await?;
1119 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1120 response_tx.send(Ok(test_response(1..4))).await?;
1121 let _ = stream.next().await;
1122
1123 let initial_req = recover_writes_rx
1124 .recv()
1125 .await
1126 .expect("should receive an initial request")?;
1127 assert_eq!(initial_req.subscription, "projects/p/subscriptions/s");
1128
1129 tokio::time::advance(KEEPALIVE_PERIOD).await;
1131 let keepalive_req = recover_writes_rx
1132 .recv()
1133 .await
1134 .expect("should receive a keepalive request")?;
1135 assert_eq!(keepalive_req, v1::StreamingPullRequest::default());
1136
1137 drop(stream);
1140
1141 tokio::time::advance(4 * KEEPALIVE_PERIOD).await;
1144 assert!(recover_writes_rx.is_empty(), "{recover_writes_rx:?}");
1145
1146 Ok(())
1147 }
1148
1149 #[tokio_test_no_panics]
1150 async fn client_id() -> anyhow::Result<()> {
1151 let (recover_writes_tx, mut recover_writes_rx) = channel(10);
1154 let recover_writes_tx = std::sync::Arc::new(tokio::sync::Mutex::new(recover_writes_tx));
1155
1156 let mut mock = MockSubscriber::new();
1157 mock.expect_streaming_pull()
1158 .times(3)
1159 .returning(move |request| {
1160 let tx = recover_writes_tx.clone();
1161 tokio::spawn(async move {
1162 let mut request_rx = request.into_inner();
1165 while let Some(request) = request_rx.recv().await {
1166 tx.lock()
1167 .await
1168 .send(request)
1169 .await
1170 .expect("forwarding writes always succeeds");
1171 }
1172 });
1173 Err(TonicStatus::failed_precondition("fail"))
1174 });
1175
1176 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1177
1178 let c1 = test_client(endpoint.clone()).await?;
1181 let _ = c1
1182 .subscribe("projects/p/subscriptions/s")
1183 .build()
1184 .next()
1185 .await;
1186 let req1 = recover_writes_rx
1187 .recv()
1188 .await
1189 .expect("should receive a request")?;
1190 let _ = c1
1191 .subscribe("projects/p/subscriptions/s")
1192 .build()
1193 .next()
1194 .await;
1195 let req2 = recover_writes_rx
1196 .recv()
1197 .await
1198 .expect("should receive a request")?;
1199 assert_eq!(req1.client_id, req2.client_id);
1200
1201 let c2 = test_client(endpoint).await?;
1204 let _ = c2
1205 .subscribe("projects/p/subscriptions/s")
1206 .build()
1207 .next()
1208 .await;
1209 let req3 = recover_writes_rx
1210 .recv()
1211 .await
1212 .expect("should receive a request")?;
1213 assert_ne!(req1.client_id, req3.client_id);
1214
1215 Ok(())
1216 }
1217
1218 #[tokio_test_no_panics(start_paused = true)]
1219 async fn no_immediate_message() -> anyhow::Result<()> {
1220 const TEST_TIMEOUT: Duration = Duration::from_secs(42);
1221
1222 let (_response_tx, response_rx) = channel(10);
1223
1224 let mut mock = MockSubscriber::new();
1225 mock.expect_streaming_pull()
1226 .return_once(move |_| Ok(TonicResponse::from(response_rx)));
1227
1228 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1229 let client = test_client(endpoint).await?;
1230 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1231
1232 let _ = tokio::time::timeout(TEST_TIMEOUT, stream.next())
1233 .await
1234 .expect_err("next() should never yield.");
1235
1236 Ok(())
1237 }
1238
1239 #[tokio_test_no_panics(start_paused = true)]
1240 async fn retry_transient_when_starting_stream() -> anyhow::Result<()> {
1241 const NUM_RETRIES: u32 = 20;
1245
1246 let start_time = Instant::now();
1247 let mut seq = mockall::Sequence::new();
1248 let mut mock = MockSubscriber::new();
1249
1250 mock.expect_streaming_pull()
1252 .times(NUM_RETRIES as usize)
1253 .in_sequence(&mut seq)
1254 .returning(|_| Err(TonicStatus::unavailable("try again")));
1255 mock.expect_streaming_pull()
1257 .times(1)
1258 .in_sequence(&mut seq)
1259 .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
1260 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1261 let client = test_client(endpoint).await?;
1262 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1263 let err = stream
1264 .next()
1265 .await
1266 .expect("stream should not be empty")
1267 .expect_err("the first streamed item should be an error");
1268 assert!(err.status().is_some(), "{err:?}");
1269 let status = err.status().unwrap();
1270 assert_eq!(
1271 status.code,
1272 google_cloud_gax::error::rpc::Code::FailedPrecondition
1273 );
1274 assert_eq!(status.message, "fail");
1275
1276 let elapsed = start_time.elapsed();
1277 assert!(
1278 elapsed <= MAXIMUM_DELAY * NUM_RETRIES,
1279 "elapsed={elapsed:?}"
1280 );
1281 assert!(
1282 elapsed >= INITIAL_DELAY * NUM_RETRIES,
1283 "elapsed={elapsed:?}"
1284 );
1285
1286 Ok(())
1287 }
1288
1289 #[tokio_test_no_panics(start_paused = true)]
1290 async fn resume_midstream_success() -> anyhow::Result<()> {
1291 let (response_tx_1, response_rx_1) = channel(10);
1292 let (response_tx_2, response_rx_2) = channel(10);
1293 let (response_tx_3, response_rx_3) = channel(10);
1294 let (ack_tx, mut ack_rx) = unbounded_channel();
1295
1296 let mut seq = mockall::Sequence::new();
1297 let mut mock = MockSubscriber::new();
1298 mock.expect_streaming_pull()
1299 .times(1)
1300 .in_sequence(&mut seq)
1301 .return_once(|_| Ok(TonicResponse::from(response_rx_1)));
1302 mock.expect_streaming_pull()
1303 .times(1)
1304 .in_sequence(&mut seq)
1305 .return_once(move |_| Ok(TonicResponse::from(response_rx_2)));
1306 mock.expect_streaming_pull()
1307 .times(1)
1308 .in_sequence(&mut seq)
1309 .return_once(|_| Ok(TonicResponse::from(response_rx_3)));
1310 mock.expect_acknowledge().times(1..).returning(move |r| {
1311 ack_tx
1312 .send(r.into_inner())
1313 .expect("sending on channel always succeeds");
1314 Ok(TonicResponse::from(()))
1315 });
1316 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1317 let client = test_client(endpoint).await?;
1318 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1319
1320 response_tx_1.send(Ok(test_response(0..10))).await?;
1321 response_tx_1.send(Ok(test_response(10..20))).await?;
1322 response_tx_1
1323 .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1324 .await?;
1325 drop(response_tx_1);
1326 response_tx_2.send(Ok(test_response(20..30))).await?;
1327 response_tx_2.send(Ok(test_response(30..40))).await?;
1328 response_tx_2
1329 .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1330 .await?;
1331 drop(response_tx_2);
1332 response_tx_3.send(Ok(test_response(40..50))).await?;
1333 drop(response_tx_3);
1334
1335 for i in 0..50 {
1336 let (m, h) = stream
1337 .next()
1338 .await
1339 .unwrap_or_else(|| panic!("expected message {}/50", i + 1))?;
1340 assert_eq!(m.data, test_data(i));
1341 h.ack();
1342 }
1343 let end = stream.next().await.transpose()?;
1344 assert!(end.is_none(), "Received extra message: {end:?}");
1345
1346 stream.shutdown_token().shutdown().await;
1348
1349 let mut got = Vec::new();
1351 while let Ok(ack_req) = ack_rx.try_recv() {
1352 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1353 got.extend(ack_req.ack_ids);
1354 }
1355 assert_eq!(sorted(got), test_ids(0..50));
1356
1357 Ok(())
1358 }
1359
1360 #[tokio_test_no_panics(start_paused = true)]
1361 async fn resume_midstream_hits_permanent_error() -> anyhow::Result<()> {
1362 let (response_tx, response_rx) = channel(10);
1363 let (ack_tx, mut ack_rx) = unbounded_channel();
1364
1365 let mut seq = mockall::Sequence::new();
1366 let mut mock = MockSubscriber::new();
1367 mock.expect_streaming_pull()
1369 .times(1)
1370 .in_sequence(&mut seq)
1371 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1372 mock.expect_streaming_pull()
1374 .times(3)
1375 .in_sequence(&mut seq)
1376 .returning(|_| Err(TonicStatus::unavailable("try again")));
1377 mock.expect_streaming_pull()
1379 .times(1)
1380 .in_sequence(&mut seq)
1381 .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
1382 mock.expect_acknowledge().times(1..).returning(move |r| {
1383 ack_tx
1384 .send(r.into_inner())
1385 .expect("sending on channel always succeeds");
1386 Ok(TonicResponse::from(()))
1387 });
1388 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1389 let client = test_client(endpoint).await?;
1390 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1391
1392 response_tx.send(Ok(test_response(0..10))).await?;
1393 response_tx.send(Ok(test_response(10..20))).await?;
1394 response_tx
1395 .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1396 .await?;
1397 drop(response_tx);
1398
1399 for i in 0..20 {
1400 let (m, h) = stream
1401 .next()
1402 .await
1403 .unwrap_or_else(|| panic!("expected message {}/20", i + 1))?;
1404 assert_eq!(m.data, test_data(i));
1405 h.ack();
1406 }
1407 let err = stream
1408 .next()
1409 .await
1410 .transpose()
1411 .expect_err("expected an error from stream");
1412 assert!(err.status().is_some(), "{err:?}");
1413 let status = err.status().unwrap();
1414 assert_eq!(
1415 status.code,
1416 google_cloud_gax::error::rpc::Code::FailedPrecondition
1417 );
1418 assert_eq!(status.message, "fail");
1419
1420 stream.shutdown_token().shutdown().await;
1422
1423 let mut got = Vec::new();
1425 while let Ok(ack_req) = ack_rx.try_recv() {
1426 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1427 got.extend(ack_req.ack_ids);
1428 }
1429 assert_eq!(sorted(got), test_ids(0..20));
1430
1431 Ok(())
1432 }
1433
1434 #[tokio_test_no_panics]
1435 async fn routing_header() -> anyhow::Result<()> {
1436 let mut mock = MockSubscriber::new();
1437
1438 mock.expect_streaming_pull().return_once(move |request| {
1439 let metadata = request.metadata();
1440 assert_eq!(
1441 metadata
1442 .get("x-goog-request-params")
1443 .expect("routing header missing"),
1444 "subscription=projects/p/subscriptions/s"
1445 );
1446 Err(TonicStatus::failed_precondition("ignored"))
1447 });
1448
1449 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1450 let client = test_client(endpoint).await?;
1451
1452 let _ = client
1453 .subscribe("projects/p/subscriptions/s")
1454 .build()
1455 .next()
1456 .await;
1457
1458 Ok(())
1459 }
1460
1461 #[cfg(feature = "unstable-stream")]
1462 #[tokio_test_no_panics(start_paused = true)]
1463 async fn into_stream() -> anyhow::Result<()> {
1464 use futures::TryStreamExt;
1465 let (response_tx, response_rx) = channel(10);
1466 let (ack_tx, mut ack_rx) = unbounded_channel();
1467
1468 let mut mock = MockSubscriber::new();
1469 mock.expect_streaming_pull()
1470 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1471 mock.expect_acknowledge().returning(move |r| {
1472 ack_tx
1473 .send(r.into_inner())
1474 .expect("sending on channel always succeeds");
1475 Ok(TonicResponse::from(()))
1476 });
1477
1478 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1479 let client = test_client(endpoint).await?;
1480
1481 let stream = client
1482 .subscribe("projects/p/subscriptions/s")
1483 .build()
1484 .into_stream();
1485
1486 response_tx.send(Ok(test_response(1..3))).await?;
1487 drop(response_tx);
1488
1489 let got: Vec<_> = stream
1490 .map_ok(|(m, h)| {
1491 h.ack();
1492 m.data
1493 })
1494 .try_collect()
1495 .await?;
1496 assert_eq!(got, vec![test_data(1), test_data(2)]);
1497
1498 let ack_req = ack_rx
1499 .recv()
1500 .await
1501 .expect("should receive acknowledgements");
1502 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1503 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..3));
1504
1505 Ok(())
1506 }
1507
1508 #[tokio_test_no_panics(start_paused = true)]
1509 async fn basic_lease_expiration() -> anyhow::Result<()> {
1510 const MAX_LEASE_EXTENSION: Duration = Duration::from_secs(10);
1511 const MAX_LEASE: Duration = Duration::from_secs(30);
1512 let (response_tx, response_rx) = channel(10);
1517 let (extend_tx, mut extend_rx) = unbounded_channel();
1518
1519 let mut mock = MockSubscriber::new();
1520 mock.expect_streaming_pull()
1521 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1522 mock.expect_modify_ack_deadline().returning(move |r| {
1523 extend_tx
1524 .send(r.into_inner())
1525 .expect("sending on channel always succeeds");
1526 Ok(TonicResponse::from(()))
1527 });
1528 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1529 let client = test_client(endpoint).await?;
1530 let mut stream = client
1531 .subscribe("projects/p/subscriptions/s")
1532 .set_max_lease(MAX_LEASE)
1533 .set_max_lease_extension(MAX_LEASE_EXTENSION)
1534 .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1535 .build();
1536
1537 response_tx.send(Ok(test_response(0..1))).await?;
1538 drop(response_tx);
1539
1540 let (_m, _h) = stream
1541 .next()
1542 .await
1543 .expect("stream should yield a message")?;
1544
1545 let start_time = Instant::now();
1548 let mut latest = None;
1549 for _ in 0..MAX_LEASE.as_secs() * 2 {
1550 while let Ok(r) = extend_rx.try_recv() {
1551 assert_ne!(r.ack_deadline_seconds, 0, "unexpectedly received a nack");
1552 latest = Some(start_time.elapsed());
1553 }
1554 tokio::time::advance(Duration::from_secs(1)).await;
1555 tokio::task::yield_now().await;
1556 }
1557
1558 let expected_range = (MAX_LEASE - MAX_LEASE_EXTENSION)..=MAX_LEASE;
1560 assert!(
1561 latest.is_some_and(|t| expected_range.contains(&t)),
1562 "{latest:?}"
1563 );
1564
1565 stream.shutdown_token().shutdown().await;
1567
1568 Ok(())
1569 }
1570
1571 #[tokio_test_no_panics(start_paused = true)]
1572 async fn shutdown_wait_for_processing() -> anyhow::Result<()> {
1573 let (response_tx, response_rx) = channel(10);
1574
1575 let mut mock = MockSubscriber::new();
1576 mock.expect_streaming_pull()
1577 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1578 mock.expect_acknowledge()
1579 .times(1)
1580 .returning(|_| Ok(TonicResponse::from(())));
1581 mock.expect_modify_ack_deadline()
1582 .returning(|_| Ok(TonicResponse::from(())));
1583 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1584 let client = test_client(endpoint).await?;
1585 let mut stream = client
1586 .subscribe("projects/p/subscriptions/s")
1587 .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
1588 .build();
1589
1590 response_tx.send(Ok(test_response(0..1))).await?;
1591 drop(response_tx);
1592
1593 let (_m, h) = stream
1594 .next()
1595 .await
1596 .expect("stream should yield a message")?;
1597
1598 tokio::spawn(async move {
1599 tokio::time::sleep(Duration::from_secs(5)).await;
1602 h.ack();
1603 });
1604
1605 stream.shutdown_token().shutdown().await;
1607
1608 Ok(())
1609 }
1610
1611 #[tokio_test_no_panics(start_paused = true)]
1612 async fn at_least_once_and_exactly_once() -> anyhow::Result<()> {
1613 let (response_tx, response_rx) = channel(10);
1614
1615 let mut mock = MockSubscriber::new();
1616 mock.expect_streaming_pull()
1617 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1618 mock.expect_modify_ack_deadline()
1619 .returning(|_| Ok(TonicResponse::from(())));
1620 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1621 let client = test_client(endpoint).await?;
1622 let mut stream = client
1623 .subscribe("projects/p/subscriptions/s")
1624 .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1625 .build();
1626
1627 response_tx.send(Ok(test_response(0..1))).await?;
1628 response_tx
1629 .send(Ok(test_exactly_once_response(1..2)))
1630 .await?;
1631 response_tx.send(Ok(test_response(2..3))).await?;
1632 response_tx
1633 .send(Ok(test_exactly_once_response(3..4)))
1634 .await?;
1635 drop(response_tx);
1636
1637 let (m, h) = stream.next().await.expect("should yield a message")?;
1638 assert_eq!(m.data, test_data(0));
1639 assert_eq!(h.ack_id(), test_id(0));
1640 assert!(matches!(h, Handler::AtLeastOnce(_)), "{h:?}");
1641
1642 let (m, h) = stream.next().await.expect("should yield a message")?;
1643 assert_eq!(m.data, test_data(1));
1644 assert_eq!(h.ack_id(), test_id(1));
1645 assert!(matches!(h, Handler::ExactlyOnce(_)), "{h:?}");
1646
1647 let (m, h) = stream.next().await.expect("should yield a message")?;
1648 assert_eq!(m.data, test_data(2));
1649 assert_eq!(h.ack_id(), test_id(2));
1650 assert!(matches!(h, Handler::AtLeastOnce(_)), "{h:?}");
1651
1652 let (m, h) = stream.next().await.expect("should yield a message")?;
1653 assert_eq!(m.data, test_data(3));
1654 assert_eq!(h.ack_id(), test_id(3));
1655 assert!(matches!(h, Handler::ExactlyOnce(_)), "{h:?}");
1656
1657 let end = stream.next().await.transpose()?;
1658 assert!(end.is_none(), "Received extra message: {end:?}");
1659
1660 stream.shutdown_token().shutdown().await;
1662
1663 Ok(())
1664 }
1665
1666 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1667 async fn cancel_before_open() -> anyhow::Result<()> {
1668 let mut mock = MockSubscriber::new();
1669 mock.expect_streaming_pull()
1670 .returning(|_| Err(TonicStatus::unavailable("try again")));
1671 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1672 let client = test_client(endpoint).await?;
1673 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1674 let shutdown_token = stream.shutdown_token();
1675
1676 let next = tokio::spawn(async move { stream.next().await });
1677 shutdown_token.shutdown().await;
1678
1679 let end = next.await?;
1680 assert!(end.is_none(), "Shutdown should end the stream, got {end:?}");
1681
1682 Ok(())
1683 }
1684
1685 #[tokio_test_no_panics(start_paused = true)]
1686 async fn cancel_midstream() -> anyhow::Result<()> {
1687 let (response_tx, response_rx) = channel(10);
1688 let (ack_tx, mut ack_rx) = unbounded_channel();
1689 let (nack_tx, mut nack_rx) = unbounded_channel();
1690
1691 let mut mock = MockSubscriber::new();
1692 mock.expect_streaming_pull()
1693 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1694 mock.expect_acknowledge().times(1).returning(move |r| {
1695 ack_tx
1696 .send(r.into_inner())
1697 .expect("sending on channel always succeeds");
1698 Ok(TonicResponse::from(()))
1699 });
1700 mock.expect_modify_ack_deadline()
1701 .times(1)
1702 .returning(move |r| {
1703 nack_tx
1704 .send(r.into_inner())
1705 .expect("sending on channel always succeeds");
1706 Ok(TonicResponse::from(()))
1707 });
1708 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1709 let client = test_client(endpoint).await?;
1710 let mut stream = client
1711 .subscribe("projects/p/subscriptions/s")
1712 .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
1713 .build();
1714 let shutdown_token = stream.shutdown_token();
1715
1716 response_tx.send(Ok(test_response(1..10))).await?;
1717 for i in 1..6 {
1718 let Some((m, h)) = stream.next().await.transpose()? else {
1719 anyhow::bail!("expected message {i}/5")
1720 };
1721 assert_eq!(m.data, test_data(i));
1722 h.ack();
1723 }
1724 let shutdown = tokio::spawn(async move {
1725 shutdown_token.shutdown().await;
1726 });
1727 tokio::task::yield_now().await;
1728 let end = stream.next().await.transpose()?;
1729 assert!(end.is_none(), "Shutdown should end the stream, got {end:?}");
1730
1731 shutdown.await?;
1734
1735 let ack_req = ack_rx.try_recv()?;
1736 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1737 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1738
1739 let nack_req = nack_rx.try_recv()?;
1740 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1741 assert_eq!(nack_req.ack_deadline_seconds, 0);
1742 assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1743
1744 Ok(())
1745 }
1746
1747 #[test_case(ShutdownBehavior::NackImmediately)]
1748 #[test_case(ShutdownBehavior::WaitForProcessing)]
1749 #[tokio_test_no_panics(start_paused = true)]
1750 async fn shutdown_without_next(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
1751 let (response_tx, response_rx) = channel(10);
1752 let (ack_tx, mut ack_rx) = unbounded_channel();
1753 let (nack_tx, mut nack_rx) = unbounded_channel();
1754
1755 let mut mock = MockSubscriber::new();
1756 mock.expect_streaming_pull()
1757 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1758 mock.expect_acknowledge().times(1).returning(move |r| {
1759 ack_tx
1760 .send(r.into_inner())
1761 .expect("sending on channel always succeeds");
1762 Ok(TonicResponse::from(()))
1763 });
1764 mock.expect_modify_ack_deadline()
1765 .times(1)
1766 .returning(move |r| {
1767 nack_tx
1768 .send(r.into_inner())
1769 .expect("sending on channel always succeeds");
1770 Ok(TonicResponse::from(()))
1771 });
1772 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1773 let client = test_client(endpoint).await?;
1774 let mut stream = client
1775 .subscribe("projects/p/subscriptions/s")
1776 .set_shutdown_behavior(shutdown_behavior)
1777 .build();
1778 let shutdown_token = stream.shutdown_token();
1779
1780 response_tx.send(Ok(test_response(1..10))).await?;
1781 for i in 1..6 {
1782 let Some((m, h)) = stream.next().await.transpose()? else {
1783 anyhow::bail!("expected message {i}/5")
1784 };
1785 assert_eq!(m.data, test_data(i));
1786 h.ack();
1787 }
1788 shutdown_token.shutdown().await;
1791
1792 let ack_req = ack_rx.try_recv()?;
1793 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1794 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1795
1796 let nack_req = nack_rx.try_recv()?;
1797 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1798 assert_eq!(nack_req.ack_deadline_seconds, 0);
1799 assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1800
1801 Ok(())
1802 }
1803
1804 #[test_case(ShutdownBehavior::NackImmediately)]
1805 #[test_case(ShutdownBehavior::WaitForProcessing)]
1806 #[tokio_test_no_panics(start_paused = true)]
1807 async fn stream_error_initiates_shutdown(
1808 shutdown_behavior: ShutdownBehavior,
1809 ) -> anyhow::Result<()> {
1810 let (response_tx, response_rx) = channel(10);
1811 let (ack_tx, mut ack_rx) = unbounded_channel();
1812
1813 let mut mock = MockSubscriber::new();
1814 mock.expect_streaming_pull()
1815 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1816 mock.expect_acknowledge().times(1).returning(move |r| {
1817 ack_tx
1818 .send(r.into_inner())
1819 .expect("sending on channel always succeeds");
1820 Ok(TonicResponse::from(()))
1821 });
1822 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1823 let client = test_client(endpoint).await?;
1824 let mut stream = client
1825 .subscribe("projects/p/subscriptions/s")
1826 .set_shutdown_behavior(shutdown_behavior)
1827 .build();
1828 let shutdown_token = stream.shutdown_token();
1829
1830 response_tx.send(Ok(test_response(0..1))).await?;
1831 response_tx
1832 .send(Err(TonicStatus::failed_precondition("fail")))
1833 .await?;
1834 drop(response_tx);
1835
1836 let (m, h) = stream.next().await.expect("should yield a message")?;
1837 assert_eq!(m.data, test_data(0));
1838 h.ack();
1839
1840 let err = stream.next().await.expect("should yield an error");
1841 assert!(err.is_err(), "{err:?}");
1842
1843 shutdown_token.wait_for_shutdown().await;
1846
1847 let ack_req = ack_rx.try_recv()?;
1848 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1849 assert_eq!(ack_req.ack_ids, test_ids(0..1));
1850
1851 Ok(())
1852 }
1853
1854 #[test_case(ShutdownBehavior::NackImmediately)]
1855 #[test_case(ShutdownBehavior::WaitForProcessing)]
1856 #[tokio_test_no_panics(start_paused = true)]
1857 async fn drop_cancels(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
1858 let (response_tx, response_rx) = channel(10);
1859 let (ack_tx, mut ack_rx) = unbounded_channel();
1860 let (nack_tx, mut nack_rx) = unbounded_channel();
1861
1862 let mut mock = MockSubscriber::new();
1863 mock.expect_streaming_pull()
1864 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1865 mock.expect_acknowledge().times(1).returning(move |r| {
1866 ack_tx
1867 .send(r.into_inner())
1868 .expect("sending on channel always succeeds");
1869 Ok(TonicResponse::from(()))
1870 });
1871 mock.expect_modify_ack_deadline()
1872 .times(1)
1873 .returning(move |r| {
1874 nack_tx
1875 .send(r.into_inner())
1876 .expect("sending on channel always succeeds");
1877 Ok(TonicResponse::from(()))
1878 });
1879 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1880 let client = test_client(endpoint).await?;
1881 let mut stream = client
1882 .subscribe("projects/p/subscriptions/s")
1883 .set_shutdown_behavior(shutdown_behavior)
1884 .build();
1885 let shutdown_token = stream.shutdown_token();
1886
1887 response_tx.send(Ok(test_response(1..10))).await?;
1888 for i in 1..6 {
1889 let Some((m, h)) = stream.next().await.transpose()? else {
1890 anyhow::bail!("expected message {i}/5")
1891 };
1892 assert_eq!(m.data, test_data(i));
1893 h.ack();
1894 }
1895 drop(stream); shutdown_token.wait_for_shutdown().await;
1897
1898 let ack_req = ack_rx.try_recv()?;
1899 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1900 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1901
1902 let nack_req = nack_rx.try_recv()?;
1903 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1904 assert_eq!(nack_req.ack_deadline_seconds, 0);
1905 assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1906
1907 Ok(())
1908 }
1909}