1use super::builder::Subscribe;
16use super::handler::{AckResult, Action, AtLeastOnce, ExactlyOnce, Handler};
17use super::lease_loop::LeaseLoop;
18use super::lease_state::{AtLeastOnceInfo, ExactlyOnceInfo, LeaseInfo, LeaseOptions, NewMessage};
19use super::leaser::DefaultLeaser;
20use super::retry_policy::StreamRetryPolicy;
21use super::shutdown_token::ShutdownToken;
22use super::stream::Stream;
23use super::stub::TonicStreaming as _;
24use super::transport::Transport;
25use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
26use crate::model::Message;
27use crate::{Error, Result};
28use futures::FutureExt;
29use futures::future::{BoxFuture, Shared};
30use gaxi::grpc::from_status::to_gax_error;
31use gaxi::prost::FromProto as _;
32use google_cloud_gax::retry_result::RetryResult;
33use std::collections::VecDeque;
34use std::sync::Arc;
35use tokio::sync::mpsc::{UnboundedSender, WeakUnboundedSender, unbounded_channel};
36use tokio::sync::oneshot::Receiver;
37use tokio::time::Duration;
38use tokio_util::sync::{CancellationToken, DropGuard};
39
40#[derive(Debug)]
58pub struct MessageStream {
59 inner: MessageStreamImpl,
66
67 lease_loop: Shared<BoxFuture<'static, ()>>,
69
70 shutdown: CancellationToken,
72
73 _shutdown_guard: DropGuard,
78}
79
80#[derive(Debug)]
81pub struct MessageStreamImpl {
82 stub: Arc<Transport>,
84
85 initial_req: StreamingPullRequest,
87
88 stream: Option<StreamState>,
99
100 pool: VecDeque<(Message, HandlerInfo)>,
106
107 message_tx: WeakUnboundedSender<NewMessage>,
110
111 ack_tx: WeakUnboundedSender<Action>,
114
115 shutdown: CancellationToken,
117}
118
119#[allow(clippy::large_enum_variant)]
122#[derive(Debug)]
123enum StreamState {
124 Closed,
127 Active(Stream<Transport>),
129}
130
131impl MessageStream {
132 pub(super) fn new(builder: Subscribe) -> Self {
133 let stub = builder.inner;
134 let subscription = builder.subscription;
135
136 let (confirmed_tx, confirmed_rx) = unbounded_channel();
137 let leaser = DefaultLeaser::new(
138 stub.clone(),
139 confirmed_tx,
140 subscription.clone(),
141 builder.ack_deadline_seconds,
142 builder.grpc_subchannel_count,
143 );
144 let options = LeaseOptions {
145 max_lease: builder.max_lease,
146 max_lease_extension: Duration::from_secs(builder.ack_deadline_seconds as u64),
147 shutdown_behavior: builder.shutdown_behavior,
148 ..Default::default()
149 };
150 let LeaseLoop {
151 handle,
152 message_tx,
153 ack_tx,
154 } = LeaseLoop::new(leaser, confirmed_rx, options);
155 let lease_loop = handle.map(|_| ()).boxed().shared();
156
157 let weak_message_tx = message_tx.downgrade();
158 let weak_ack_tx = ack_tx.downgrade();
159
160 let shutdown = CancellationToken::new();
161 let shutdown_clone = shutdown.clone();
162 let _shutdown_guard = shutdown.clone().drop_guard();
163 tokio::spawn(async move {
164 shutdown_clone.cancelled().await;
169 drop(message_tx);
170 drop(ack_tx);
171 });
172
173 let initial_req = StreamingPullRequest {
174 subscription,
175 stream_ack_deadline_seconds: builder.ack_deadline_seconds,
176 max_outstanding_messages: builder.max_outstanding_messages,
177 max_outstanding_bytes: builder.max_outstanding_bytes,
178 client_id: builder.client_id,
179 protocol_version: 1,
182 ..Default::default()
183 };
184
185 let inner = MessageStreamImpl {
186 stub,
187 initial_req,
188 stream: None,
189 pool: VecDeque::new(),
190 message_tx: weak_message_tx,
191 ack_tx: weak_ack_tx,
192 shutdown: shutdown.clone(),
193 };
194 Self {
195 inner,
196 lease_loop,
197 shutdown,
198 _shutdown_guard,
199 }
200 }
201
202 pub async fn next(&mut self) -> Option<Result<(Message, Handler)>> {
224 let next = tokio::select! {
225 biased;
226 _ = self.shutdown.cancelled() => {
227 self.inner.close();
228 None
229 },
230 n = self.inner.next() => n,
231 };
232 next
233 }
234
235 #[cfg(feature = "unstable-stream")]
236 #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
237 pub fn into_stream(self) -> impl futures::Stream<Item = Result<(Message, Handler)>> + Unpin {
249 use futures::stream::unfold;
250 Box::pin(unfold(self, |mut stream| async move {
251 stream.next().await.map(|item| (item, stream))
252 }))
253 }
254
255 pub fn shutdown_token(&self) -> ShutdownToken {
278 ShutdownToken {
279 inner: self.shutdown.clone(),
280 fut: self.lease_loop.clone(),
282 }
283 }
284}
285
286impl MessageStreamImpl {
287 async fn next(&mut self) -> Option<Result<(Message, Handler)>> {
288 loop {
289 if let Some((m, hi)) = self.pool.pop_front() {
291 return Some(Ok((m, hi.into_handler(self.ack_tx.upgrade()?))));
292 }
293
294 if let Err(e) = self.populate_pool().await? {
301 match StreamRetryPolicy::on_midstream_error(e) {
303 RetryResult::Continue(_) => {
304 self.stream = None;
306 continue;
307 }
308 RetryResult::Permanent(e) | RetryResult::Exhausted(e) => {
309 self.close();
311 return Some(Err(e));
312 }
313 }
314 }
315 }
316 }
317
318 async fn open_stream(&mut self) -> Result<()> {
320 let stream = Stream::<Transport>::new(self.stub.clone(), self.initial_req.clone()).await?;
321 self.stream = Some(StreamState::Active(stream));
322 Ok(())
323 }
324
325 async fn next_response(&mut self) -> Option<Result<StreamingPullResponse>> {
332 if self.stream.is_none() {
333 if let Err(e) = self.open_stream().await {
335 return Some(Err(e));
336 }
337 }
338
339 let stream = match self.stream.as_mut()? {
340 StreamState::Closed => return None,
341 StreamState::Active(s) => s,
342 };
343 stream
344 .next_message()
345 .await
346 .map_err(to_gax_error)
347 .transpose()
348 }
349
350 async fn populate_pool(&mut self) -> Option<Result<()>> {
360 let resp = match self.next_response().await? {
362 Ok(resp) => resp,
363 Err(e) => return Some(Err(e)),
364 };
365
366 let exactly_once = resp
367 .subscription_properties
368 .is_some_and(|m| m.exactly_once_delivery_enabled);
369
370 for rm in resp.received_messages {
372 let Some(message) = rm.message else {
373 continue;
380 };
381
382 let (lease_info, handler_info) = if exactly_once {
383 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
384 (
385 LeaseInfo::ExactlyOnce(ExactlyOnceInfo::new(result_tx)),
386 HandlerInfo::ExactlyOnce {
387 ack_id: rm.ack_id.clone(),
388 result_rx,
389 },
390 )
391 } else {
392 (
393 LeaseInfo::AtLeastOnce(AtLeastOnceInfo::new()),
394 HandlerInfo::AtLeastOnce {
395 ack_id: rm.ack_id.clone(),
396 },
397 )
398 };
399
400 let _ = self.message_tx.upgrade()?.send(NewMessage {
401 ack_id: rm.ack_id,
402 lease_info,
403 });
404 let message = match message.cnv().map_err(Error::deser) {
405 Ok(message) => message,
406 Err(e) => return Some(Err(e)),
407 };
408 self.pool.push_back((message, handler_info));
409 }
410 Some(Ok(()))
411 }
412
413 fn close(&mut self) {
415 self.stream = Some(StreamState::Closed);
416 self.pool.clear();
417 self.shutdown.cancel();
418 }
419}
420
421#[derive(Debug)]
433enum HandlerInfo {
434 AtLeastOnce {
435 ack_id: String,
436 },
437 ExactlyOnce {
438 ack_id: String,
439 result_rx: Receiver<AckResult>,
440 },
441}
442
443impl HandlerInfo {
444 fn into_handler(self, ack_tx: UnboundedSender<Action>) -> Handler {
447 match self {
448 HandlerInfo::AtLeastOnce { ack_id } => {
449 Handler::AtLeastOnce(AtLeastOnce::new(ack_id, ack_tx))
450 }
451 HandlerInfo::ExactlyOnce { ack_id, result_rx } => {
452 Handler::ExactlyOnce(ExactlyOnce::new(ack_id, ack_tx, result_rx))
453 }
454 }
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use super::super::ShutdownBehavior;
461 use super::super::client::Subscriber;
462 use super::super::keepalive::KEEPALIVE_PERIOD;
463 use super::super::lease_state::tests::{test_id, test_ids};
464 use super::super::stream::{INITIAL_DELAY, MAXIMUM_DELAY};
465 use super::*;
466 use gaxi::grpc::tonic::{Response as TonicResponse, Status as TonicStatus};
467 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
468 use google_cloud_test_macros::tokio_test_no_panics;
469 use pubsub_grpc_mock::google::pubsub::v1;
470 use pubsub_grpc_mock::{MockSubscriber, start};
471 use test_case::test_case;
472 use tokio::sync::mpsc::{channel, unbounded_channel};
473 use tokio::task::{JoinHandle, JoinSet};
474 use tokio::time::{Duration, Instant};
475
476 fn sorted(mut v: Vec<String>) -> Vec<String> {
477 v.sort();
478 v
479 }
480
481 fn test_data(v: i32) -> bytes::Bytes {
482 bytes::Bytes::from(format!("data-{}", test_id(v)))
483 }
484
485 fn test_response(range: std::ops::Range<i32>) -> v1::StreamingPullResponse {
486 v1::StreamingPullResponse {
487 received_messages: range
488 .into_iter()
489 .map(|i| v1::ReceivedMessage {
490 ack_id: test_id(i),
491 message: Some(v1::PubsubMessage {
492 data: test_data(i).to_vec(),
493 ..Default::default()
494 }),
495 ..Default::default()
496 })
497 .collect(),
498 ..Default::default()
499 }
500 }
501
502 fn test_exactly_once_response(range: std::ops::Range<i32>) -> v1::StreamingPullResponse {
503 v1::StreamingPullResponse {
504 subscription_properties: Some(v1::streaming_pull_response::SubscriptionProperties {
505 exactly_once_delivery_enabled: true,
506 ..Default::default()
507 }),
508 received_messages: range
509 .into_iter()
510 .map(|i| v1::ReceivedMessage {
511 ack_id: test_id(i),
512 message: Some(v1::PubsubMessage {
513 data: test_data(i).to_vec(),
514 ..Default::default()
515 }),
516 ..Default::default()
517 })
518 .collect(),
519 ..Default::default()
520 }
521 }
522
523 async fn test_client(endpoint: String) -> anyhow::Result<Subscriber> {
524 Ok(Subscriber::builder()
525 .with_endpoint(endpoint)
526 .with_credentials(Anonymous::new().build())
527 .build()
528 .await?)
529 }
530
531 #[tokio_test_no_panics]
532 async fn error_starting_stream() -> anyhow::Result<()> {
533 let mut mock = MockSubscriber::new();
534 mock.expect_streaming_pull()
535 .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
536 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
537 let client = test_client(endpoint).await?;
538 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
539 let err = stream
540 .next()
541 .await
542 .expect("stream should not be empty")
543 .expect_err("the first streamed item should be an error");
544 assert!(err.status().is_some(), "{err:?}");
545 let status = err.status().unwrap();
546 assert_eq!(
547 status.code,
548 google_cloud_gax::error::rpc::Code::FailedPrecondition
549 );
550 assert_eq!(status.message, "fail");
551
552 Ok(())
553 }
554
555 #[tokio_test_no_panics]
556 async fn permanent_error_ends_stream() -> anyhow::Result<()> {
557 let mut mock = MockSubscriber::new();
558 mock.expect_streaming_pull()
559 .returning(|_| Err(TonicStatus::failed_precondition("fail")));
560 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
561 let client = test_client(endpoint).await?;
562 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
563 let next = stream.next().await;
564 assert!(
565 matches!(next, Some(Err(_))),
566 "expected permanent error, got {next:?}"
567 );
568
569 let next = stream.next().await;
570 assert!(next.is_none(), "expected end of stream, got {next:?}");
571
572 Ok(())
573 }
574
575 #[tokio_test_no_panics]
576 async fn initial_request() -> anyhow::Result<()> {
577 const MIB: i64 = 1024 * 1024;
578
579 let (recover_writes_tx, mut recover_writes_rx) = channel(1);
582
583 let mut mock = MockSubscriber::new();
584 mock.expect_streaming_pull().return_once(move |request| {
585 tokio::spawn(async move {
586 let mut request_rx = request.into_inner();
589 while let Some(request) = request_rx.recv().await {
590 recover_writes_tx
591 .send(request)
592 .await
593 .expect("forwarding writes always succeeds");
594 }
595 });
596 Err(TonicStatus::failed_precondition("fail"))
597 });
598
599 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
600 let client = test_client(endpoint).await?;
601 let _ = client
602 .subscribe("projects/p/subscriptions/s")
603 .set_max_lease_extension(Duration::from_secs(20))
604 .set_max_outstanding_messages(2000)
605 .set_max_outstanding_bytes(200 * MIB)
606 .build()
607 .next()
608 .await;
609
610 let initial_req = recover_writes_rx
611 .recv()
612 .await
613 .expect("should receive a request")?;
614 assert_eq!(initial_req.subscription, "projects/p/subscriptions/s");
615 assert_eq!(initial_req.stream_ack_deadline_seconds, 20);
616 assert_eq!(initial_req.max_outstanding_messages, 2000);
617 assert_eq!(initial_req.max_outstanding_bytes, 200 * MIB);
618 assert!(
619 !initial_req.client_id.is_empty(),
620 "initial request has empty client id: {initial_req:?}"
621 );
622 assert!(
623 initial_req.protocol_version >= 1,
624 "protocol_version={}",
625 initial_req.protocol_version
626 );
627
628 Ok(())
629 }
630
631 #[tokio_test_no_panics(start_paused = true)]
632 async fn basic_success() -> anyhow::Result<()> {
633 let (response_tx, response_rx) = channel(10);
634 let (ack_tx, mut ack_rx) = unbounded_channel();
635
636 let mut mock = MockSubscriber::new();
637 mock.expect_streaming_pull()
638 .return_once(|_| Ok(TonicResponse::from(response_rx)));
639 mock.expect_acknowledge().returning(move |r| {
640 ack_tx
641 .send(r.into_inner())
642 .expect("sending on channel always succeeds");
643 Ok(TonicResponse::from(()))
644 });
645 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
646 let client = test_client(endpoint).await?;
647 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
648
649 response_tx.send(Ok(test_response(1..2))).await?;
650 response_tx.send(Ok(test_response(2..4))).await?;
651 response_tx.send(Ok(test_response(4..7))).await?;
652 drop(response_tx);
653
654 for i in 1..7 {
655 let Some((m, h)) = stream.next().await.transpose()? else {
656 anyhow::bail!("expected message {i}/6")
657 };
658 assert_eq!(m.data, test_data(i));
659 assert_eq!(h.ack_id(), test_id(i));
660 h.ack();
661 }
662 let end = stream.next().await.transpose()?;
663 assert!(end.is_none(), "Received extra message: {end:?}");
664
665 stream.shutdown_token().shutdown().await;
667
668 let ack_req = ack_rx.try_recv()?;
670 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
671 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..7));
672
673 Ok(())
674 }
675
676 #[tokio_test_no_panics(start_paused = true)]
677 async fn basic_success_exactly_once() -> anyhow::Result<()> {
678 let (response_tx, response_rx) = channel(10);
679 let (ack_tx, mut ack_rx) = unbounded_channel();
680
681 let mut mock = MockSubscriber::new();
682 mock.expect_streaming_pull()
683 .return_once(|_| Ok(TonicResponse::from(response_rx)));
684 mock.expect_acknowledge().returning(move |r| {
685 ack_tx
686 .send(r.into_inner())
687 .expect("sending on channel always succeeds");
688 Ok(TonicResponse::from(()))
689 });
690 mock.expect_modify_ack_deadline()
691 .returning(|_| Ok(TonicResponse::from(())));
692 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
693 let client = test_client(endpoint).await?;
694 let mut stream = client
695 .subscribe("projects/p/subscriptions/s")
696 .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
697 .build();
698
699 response_tx
700 .send(Ok(test_exactly_once_response(1..2)))
701 .await?;
702 response_tx
703 .send(Ok(test_exactly_once_response(2..4)))
704 .await?;
705 response_tx
706 .send(Ok(test_exactly_once_response(4..7)))
707 .await?;
708 drop(response_tx);
709
710 let mut acks = JoinSet::new();
711 for i in 1..7 {
712 let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? else {
713 anyhow::bail!("expected message {i}/6")
714 };
715 assert_eq!(m.data, test_data(i));
716 assert_eq!(h.ack_id(), test_id(i));
717 acks.spawn(h.confirmed_ack());
718 }
719 let end = stream.next().await.transpose()?;
720 assert!(end.is_none(), "Received extra message: {end:?}");
721
722 stream.shutdown_token().shutdown().await;
724
725 while let Some(r) = acks.join_next().await {
727 r??;
728 }
729 let ack_req = ack_rx.try_recv()?;
730 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
731 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..7));
732
733 Ok(())
734 }
735
736 #[tokio_test_no_panics(start_paused = true)]
737 async fn basic_lease_management() -> anyhow::Result<()> {
738 let (response_tx, response_rx) = channel(10);
739 let (ack_tx, mut ack_rx) = unbounded_channel();
740 let (nack_tx, mut nack_rx) = unbounded_channel();
741 let (extend_tx, mut extend_rx) = unbounded_channel();
742
743 let mut mock = MockSubscriber::new();
744 mock.expect_streaming_pull()
745 .return_once(|_| Ok(TonicResponse::from(response_rx)));
746 mock.expect_acknowledge().returning(move |r| {
747 ack_tx
748 .send(r.into_inner())
749 .expect("sending on channel always succeeds");
750 Ok(TonicResponse::from(()))
751 });
752 mock.expect_modify_ack_deadline().returning(move |r| {
753 let r = r.into_inner();
754 if r.ack_deadline_seconds == 0 {
755 nack_tx.send(r).expect("sending on channel always succeeds");
756 } else {
757 extend_tx
758 .send(r)
759 .expect("sending on channel always succeeds");
760 }
761 Ok(TonicResponse::from(()))
762 });
763 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
764 let client = test_client(endpoint).await?;
765 let mut stream = client
766 .subscribe("projects/p/subscriptions/s")
767 .set_max_lease_extension(Duration::from_secs(10))
768 .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
769 .build();
770
771 response_tx.send(Ok(test_response(0..30))).await?;
772 drop(response_tx);
773
774 for i in 0..10 {
776 let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
777 anyhow::bail!("expected message {i}")
778 };
779 h.ack();
780 }
781 for i in 10..20 {
783 let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
784 anyhow::bail!("expected message {i}")
785 };
786 h.nack();
787 }
788 let mut hold = Vec::new();
790 for i in 20..30 {
791 let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
792 anyhow::bail!("expected message {i}")
793 };
794 hold.push(h);
795 }
796
797 tokio::time::advance(Duration::from_secs(10)).await;
800
801 stream.shutdown_token().shutdown().await;
803
804 let ack_req = ack_rx.try_recv()?;
806 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
807 assert_eq!(sorted(ack_req.ack_ids), test_ids(0..10));
808 assert!(ack_rx.is_empty(), "{ack_rx:?}");
809
810 let nack_req = nack_rx.try_recv()?;
812 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
813 assert_eq!(nack_req.ack_deadline_seconds, 0);
814 assert_eq!(sorted(nack_req.ack_ids), test_ids(10..20));
815
816 let nack_req = nack_rx.try_recv()?;
818 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
819 assert_eq!(nack_req.ack_deadline_seconds, 0);
820 assert_eq!(sorted(nack_req.ack_ids), test_ids(20..30));
821 assert!(nack_rx.is_empty(), "{nack_rx:?}");
822
823 let extend_req = extend_rx.try_recv()?;
825 assert_eq!(extend_req.subscription, "projects/p/subscriptions/s");
826 assert_eq!(extend_req.ack_deadline_seconds, 10);
827 assert_eq!(sorted(extend_req.ack_ids), test_ids(20..30));
828
829 Ok(())
830 }
831
832 #[tokio_test_no_panics(start_paused = true)]
833 async fn delayed_responses() -> anyhow::Result<()> {
834 let (response_tx, response_rx) = channel(10);
838 let handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
839 tokio::time::sleep(Duration::from_millis(20)).await;
840 response_tx.send(Ok(test_response(1..2))).await?;
841 Ok(())
842 });
843
844 let mut mock = MockSubscriber::new();
845 mock.expect_streaming_pull()
846 .return_once(|_| Ok(TonicResponse::from(response_rx)));
847 mock.expect_modify_ack_deadline()
848 .returning(|_| Ok(TonicResponse::from(())));
849 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
850 let client = test_client(endpoint).await?;
851 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
852 let (m, h) = stream
853 .next()
854 .await
855 .transpose()?
856 .expect("stream should wait for a message");
857 assert_eq!(m.data, test_data(1));
858 assert_eq!(h.ack_id(), test_id(1));
859
860 handle.await??;
861
862 Ok(())
863 }
864
865 #[tokio_test_no_panics]
866 async fn serves_messages_immediately() -> anyhow::Result<()> {
867 let (response_tx, response_rx) = channel(10);
872
873 let mut mock = MockSubscriber::new();
874 mock.expect_streaming_pull()
875 .return_once(|_| Ok(TonicResponse::from(response_rx)));
876 mock.expect_modify_ack_deadline()
877 .returning(|_| Ok(TonicResponse::from(())));
878 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
879 let client = test_client(endpoint).await?;
880 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
881
882 for i in 1..7 {
883 response_tx.send(Ok(test_response(i..i + 1))).await?;
884
885 let Some((m, h)) = stream.next().await.transpose()? else {
886 anyhow::bail!("expected message {i}/6")
887 };
888 assert_eq!(m.data, test_data(i));
889 assert_eq!(h.ack_id(), test_id(i));
890 }
891 drop(response_tx);
892 let end = stream.next().await.transpose()?;
893 assert!(end.is_none(), "Received extra message: {end:?}");
894
895 Ok(())
896 }
897
898 #[tokio_test_no_panics]
899 async fn handles_empty_response() -> anyhow::Result<()> {
900 let (response_tx, response_rx) = channel(10);
901
902 let mut mock = MockSubscriber::new();
903 mock.expect_streaming_pull()
904 .return_once(|_| Ok(TonicResponse::from(response_rx)));
905 mock.expect_modify_ack_deadline()
906 .returning(|_| Ok(TonicResponse::from(())));
907 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
908 let client = test_client(endpoint).await?;
909 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
910
911 response_tx.send(Ok(test_response(1..2))).await?;
912 response_tx.send(Ok(test_response(2..2))).await?;
914 response_tx.send(Ok(test_response(2..3))).await?;
915 drop(response_tx);
916
917 for i in 1..3 {
918 let Some((m, h)) = stream.next().await.transpose()? else {
919 anyhow::bail!("expected message {i}/2")
920 };
921 assert_eq!(m.data, test_data(i));
922 assert_eq!(h.ack_id(), test_id(i));
923 }
924 let end = stream.next().await.transpose()?;
925 assert!(end.is_none(), "Received extra message: {end:?}");
926
927 Ok(())
928 }
929
930 #[tokio_test_no_panics(start_paused = true)]
931 async fn handles_missing_message_field() -> anyhow::Result<()> {
932 let (response_tx, response_rx) = channel(10);
933 let (extend_tx, mut extend_rx) = unbounded_channel();
934
935 let bad = v1::StreamingPullResponse {
936 received_messages: vec![v1::ReceivedMessage {
937 ack_id: "ignored-ack-id".to_string(),
938 message: None,
939 ..Default::default()
940 }],
941 ..Default::default()
942 };
943
944 let mut mock = MockSubscriber::new();
945 mock.expect_streaming_pull()
946 .return_once(|_| Ok(TonicResponse::from(response_rx)));
947 mock.expect_modify_ack_deadline().returning(move |r| {
948 let r = r.into_inner();
949 if r.ack_deadline_seconds != 0 {
950 extend_tx
951 .send(r)
952 .expect("sending on channel always succeeds");
953 }
954 Ok(TonicResponse::from(()))
955 });
956 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
957 let client = test_client(endpoint).await?;
958 let mut stream = client
959 .subscribe("projects/p/subscriptions/s")
960 .set_max_lease_extension(Duration::from_secs(10))
961 .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
962 .build();
963
964 response_tx.send(Ok(test_response(1..4))).await?;
965 response_tx.send(Ok(bad)).await?;
967 response_tx.send(Ok(test_response(4..7))).await?;
968 drop(response_tx);
969
970 let mut handlers = Vec::new();
971 for i in 1..7 {
972 let Some((m, h)) = stream.next().await.transpose()? else {
973 anyhow::bail!("expected message {i}/6")
974 };
975 assert_eq!(m.data, test_data(i));
976 assert_eq!(h.ack_id(), test_id(i));
977 handlers.push(h);
978 }
979
980 tokio::time::advance(Duration::from_secs(10)).await;
983
984 stream.shutdown_token().shutdown().await;
986
987 let extend_req = extend_rx.try_recv()?;
989 assert_eq!(extend_req.subscription, "projects/p/subscriptions/s");
990 assert_eq!(extend_req.ack_deadline_seconds, 10);
991 assert_eq!(sorted(extend_req.ack_ids), test_ids(1..7));
993
994 Ok(())
995 }
996
997 #[tokio_test_no_panics]
998 async fn permanent_error_midstream() -> anyhow::Result<()> {
999 let (response_tx, response_rx) = channel(10);
1000
1001 let mut mock = MockSubscriber::new();
1002 mock.expect_streaming_pull()
1003 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1004 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1005 let client = test_client(endpoint).await?;
1006 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1007
1008 response_tx.send(Ok(test_response(1..4))).await?;
1009 response_tx
1010 .send(Err(TonicStatus::failed_precondition("fail")))
1011 .await?;
1012 drop(response_tx);
1013
1014 for i in 1..4 {
1015 let Some((m, h)) = stream.next().await.transpose()? else {
1016 anyhow::bail!("expected message {i}/3")
1017 };
1018 assert_eq!(m.data, test_data(i));
1019 assert_eq!(h.ack_id(), test_id(i));
1020 }
1021 let err = stream
1022 .next()
1023 .await
1024 .transpose()
1025 .expect_err("expected an error from stream");
1026 assert!(err.status().is_some(), "{err:?}");
1027 let status = err.status().unwrap();
1028 assert_eq!(
1029 status.code,
1030 google_cloud_gax::error::rpc::Code::FailedPrecondition
1031 );
1032 assert_eq!(status.message, "fail");
1033
1034 Ok(())
1035 }
1036
1037 #[tokio_test_no_panics(start_paused = true)]
1038 async fn keepalives() -> anyhow::Result<()> {
1039 let (recover_writes_tx, mut recover_writes_rx) = channel(1);
1042 let (response_tx, response_rx) = channel(10);
1043
1044 let mut mock = MockSubscriber::new();
1045 mock.expect_streaming_pull().return_once(move |request| {
1046 tokio::spawn(async move {
1047 let mut request_rx = request.into_inner();
1050 while let Some(request) = request_rx.recv().await {
1051 recover_writes_tx
1052 .send(request)
1053 .await
1054 .expect("forwarding writes always succeeds");
1055 }
1056 });
1057 Ok(TonicResponse::from(response_rx))
1058 });
1059 mock.expect_modify_ack_deadline()
1060 .returning(|_| Ok(TonicResponse::from(())));
1061
1062 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1063 let client = test_client(endpoint).await?;
1064 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1065 response_tx.send(Ok(test_response(1..4))).await?;
1066 let _ = stream.next().await;
1067
1068 let initial_req = recover_writes_rx
1069 .recv()
1070 .await
1071 .expect("should receive an initial request")?;
1072 assert_eq!(initial_req.subscription, "projects/p/subscriptions/s");
1073
1074 tokio::time::advance(KEEPALIVE_PERIOD).await;
1076 let keepalive_req = recover_writes_rx
1077 .recv()
1078 .await
1079 .expect("should receive a keepalive request")?;
1080 assert_eq!(keepalive_req, v1::StreamingPullRequest::default());
1081
1082 drop(stream);
1085
1086 tokio::time::advance(4 * KEEPALIVE_PERIOD).await;
1089 assert!(recover_writes_rx.is_empty(), "{recover_writes_rx:?}");
1090
1091 Ok(())
1092 }
1093
1094 #[tokio_test_no_panics]
1095 async fn client_id() -> anyhow::Result<()> {
1096 let (recover_writes_tx, mut recover_writes_rx) = channel(10);
1099 let recover_writes_tx = std::sync::Arc::new(tokio::sync::Mutex::new(recover_writes_tx));
1100
1101 let mut mock = MockSubscriber::new();
1102 mock.expect_streaming_pull()
1103 .times(3)
1104 .returning(move |request| {
1105 let tx = recover_writes_tx.clone();
1106 tokio::spawn(async move {
1107 let mut request_rx = request.into_inner();
1110 while let Some(request) = request_rx.recv().await {
1111 tx.lock()
1112 .await
1113 .send(request)
1114 .await
1115 .expect("forwarding writes always succeeds");
1116 }
1117 });
1118 Err(TonicStatus::failed_precondition("fail"))
1119 });
1120
1121 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1122
1123 let c1 = test_client(endpoint.clone()).await?;
1126 let _ = c1
1127 .subscribe("projects/p/subscriptions/s")
1128 .build()
1129 .next()
1130 .await;
1131 let req1 = recover_writes_rx
1132 .recv()
1133 .await
1134 .expect("should receive a request")?;
1135 let _ = c1
1136 .subscribe("projects/p/subscriptions/s")
1137 .build()
1138 .next()
1139 .await;
1140 let req2 = recover_writes_rx
1141 .recv()
1142 .await
1143 .expect("should receive a request")?;
1144 assert_eq!(req1.client_id, req2.client_id);
1145
1146 let c2 = test_client(endpoint).await?;
1149 let _ = c2
1150 .subscribe("projects/p/subscriptions/s")
1151 .build()
1152 .next()
1153 .await;
1154 let req3 = recover_writes_rx
1155 .recv()
1156 .await
1157 .expect("should receive a request")?;
1158 assert_ne!(req1.client_id, req3.client_id);
1159
1160 Ok(())
1161 }
1162
1163 #[tokio_test_no_panics(start_paused = true)]
1164 async fn no_immediate_message() -> anyhow::Result<()> {
1165 const TEST_TIMEOUT: Duration = Duration::from_secs(42);
1166
1167 let (_response_tx, response_rx) = channel(10);
1168
1169 let mut mock = MockSubscriber::new();
1170 mock.expect_streaming_pull()
1171 .return_once(move |_| Ok(TonicResponse::from(response_rx)));
1172
1173 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1174 let client = test_client(endpoint).await?;
1175 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1176
1177 let _ = tokio::time::timeout(TEST_TIMEOUT, stream.next())
1178 .await
1179 .expect_err("next() should never yield.");
1180
1181 Ok(())
1182 }
1183
1184 #[tokio_test_no_panics(start_paused = true)]
1185 async fn retry_transient_when_starting_stream() -> anyhow::Result<()> {
1186 const NUM_RETRIES: u32 = 20;
1190
1191 let start_time = Instant::now();
1192 let mut seq = mockall::Sequence::new();
1193 let mut mock = MockSubscriber::new();
1194
1195 mock.expect_streaming_pull()
1197 .times(NUM_RETRIES as usize)
1198 .in_sequence(&mut seq)
1199 .returning(|_| Err(TonicStatus::unavailable("try again")));
1200 mock.expect_streaming_pull()
1202 .times(1)
1203 .in_sequence(&mut seq)
1204 .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
1205 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1206 let client = test_client(endpoint).await?;
1207 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1208 let err = stream
1209 .next()
1210 .await
1211 .expect("stream should not be empty")
1212 .expect_err("the first streamed item should be an error");
1213 assert!(err.status().is_some(), "{err:?}");
1214 let status = err.status().unwrap();
1215 assert_eq!(
1216 status.code,
1217 google_cloud_gax::error::rpc::Code::FailedPrecondition
1218 );
1219 assert_eq!(status.message, "fail");
1220
1221 let elapsed = start_time.elapsed();
1222 assert!(
1223 elapsed <= MAXIMUM_DELAY * NUM_RETRIES,
1224 "elapsed={elapsed:?}"
1225 );
1226 assert!(
1227 elapsed >= INITIAL_DELAY * NUM_RETRIES,
1228 "elapsed={elapsed:?}"
1229 );
1230
1231 Ok(())
1232 }
1233
1234 #[tokio_test_no_panics(start_paused = true)]
1235 async fn resume_midstream_success() -> anyhow::Result<()> {
1236 let (response_tx_1, response_rx_1) = channel(10);
1237 let (response_tx_2, response_rx_2) = channel(10);
1238 let (response_tx_3, response_rx_3) = channel(10);
1239 let (ack_tx, mut ack_rx) = unbounded_channel();
1240
1241 let mut seq = mockall::Sequence::new();
1242 let mut mock = MockSubscriber::new();
1243 mock.expect_streaming_pull()
1244 .times(1)
1245 .in_sequence(&mut seq)
1246 .return_once(|_| Ok(TonicResponse::from(response_rx_1)));
1247 mock.expect_streaming_pull()
1248 .times(1)
1249 .in_sequence(&mut seq)
1250 .return_once(move |_| Ok(TonicResponse::from(response_rx_2)));
1251 mock.expect_streaming_pull()
1252 .times(1)
1253 .in_sequence(&mut seq)
1254 .return_once(|_| Ok(TonicResponse::from(response_rx_3)));
1255 mock.expect_acknowledge().times(1..).returning(move |r| {
1256 ack_tx
1257 .send(r.into_inner())
1258 .expect("sending on channel always succeeds");
1259 Ok(TonicResponse::from(()))
1260 });
1261 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1262 let client = test_client(endpoint).await?;
1263 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1264
1265 response_tx_1.send(Ok(test_response(0..10))).await?;
1266 response_tx_1.send(Ok(test_response(10..20))).await?;
1267 response_tx_1
1268 .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1269 .await?;
1270 drop(response_tx_1);
1271 response_tx_2.send(Ok(test_response(20..30))).await?;
1272 response_tx_2.send(Ok(test_response(30..40))).await?;
1273 response_tx_2
1274 .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1275 .await?;
1276 drop(response_tx_2);
1277 response_tx_3.send(Ok(test_response(40..50))).await?;
1278 drop(response_tx_3);
1279
1280 for i in 0..50 {
1281 let (m, h) = stream
1282 .next()
1283 .await
1284 .unwrap_or_else(|| panic!("expected message {}/50", i + 1))?;
1285 assert_eq!(m.data, test_data(i));
1286 h.ack();
1287 }
1288 let end = stream.next().await.transpose()?;
1289 assert!(end.is_none(), "Received extra message: {end:?}");
1290
1291 stream.shutdown_token().shutdown().await;
1293
1294 let mut got = Vec::new();
1296 while let Ok(ack_req) = ack_rx.try_recv() {
1297 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1298 got.extend(ack_req.ack_ids);
1299 }
1300 assert_eq!(sorted(got), test_ids(0..50));
1301
1302 Ok(())
1303 }
1304
1305 #[tokio_test_no_panics(start_paused = true)]
1306 async fn resume_midstream_hits_permanent_error() -> anyhow::Result<()> {
1307 let (response_tx, response_rx) = channel(10);
1308 let (ack_tx, mut ack_rx) = unbounded_channel();
1309
1310 let mut seq = mockall::Sequence::new();
1311 let mut mock = MockSubscriber::new();
1312 mock.expect_streaming_pull()
1314 .times(1)
1315 .in_sequence(&mut seq)
1316 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1317 mock.expect_streaming_pull()
1319 .times(3)
1320 .in_sequence(&mut seq)
1321 .returning(|_| Err(TonicStatus::unavailable("try again")));
1322 mock.expect_streaming_pull()
1324 .times(1)
1325 .in_sequence(&mut seq)
1326 .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
1327 mock.expect_acknowledge().times(1..).returning(move |r| {
1328 ack_tx
1329 .send(r.into_inner())
1330 .expect("sending on channel always succeeds");
1331 Ok(TonicResponse::from(()))
1332 });
1333 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1334 let client = test_client(endpoint).await?;
1335 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1336
1337 response_tx.send(Ok(test_response(0..10))).await?;
1338 response_tx.send(Ok(test_response(10..20))).await?;
1339 response_tx
1340 .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1341 .await?;
1342 drop(response_tx);
1343
1344 for i in 0..20 {
1345 let (m, h) = stream
1346 .next()
1347 .await
1348 .unwrap_or_else(|| panic!("expected message {}/20", i + 1))?;
1349 assert_eq!(m.data, test_data(i));
1350 h.ack();
1351 }
1352 let err = stream
1353 .next()
1354 .await
1355 .transpose()
1356 .expect_err("expected an error from stream");
1357 assert!(err.status().is_some(), "{err:?}");
1358 let status = err.status().unwrap();
1359 assert_eq!(
1360 status.code,
1361 google_cloud_gax::error::rpc::Code::FailedPrecondition
1362 );
1363 assert_eq!(status.message, "fail");
1364
1365 stream.shutdown_token().shutdown().await;
1367
1368 let mut got = Vec::new();
1370 while let Ok(ack_req) = ack_rx.try_recv() {
1371 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1372 got.extend(ack_req.ack_ids);
1373 }
1374 assert_eq!(sorted(got), test_ids(0..20));
1375
1376 Ok(())
1377 }
1378
1379 #[tokio_test_no_panics]
1380 async fn routing_header() -> anyhow::Result<()> {
1381 let mut mock = MockSubscriber::new();
1382
1383 mock.expect_streaming_pull().return_once(move |request| {
1384 let metadata = request.metadata();
1385 assert_eq!(
1386 metadata
1387 .get("x-goog-request-params")
1388 .expect("routing header missing"),
1389 "subscription=projects/p/subscriptions/s"
1390 );
1391 Err(TonicStatus::failed_precondition("ignored"))
1392 });
1393
1394 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1395 let client = test_client(endpoint).await?;
1396
1397 let _ = client
1398 .subscribe("projects/p/subscriptions/s")
1399 .build()
1400 .next()
1401 .await;
1402
1403 Ok(())
1404 }
1405
1406 #[cfg(feature = "unstable-stream")]
1407 #[tokio_test_no_panics(start_paused = true)]
1408 async fn into_stream() -> anyhow::Result<()> {
1409 use futures::TryStreamExt;
1410 let (response_tx, response_rx) = channel(10);
1411 let (ack_tx, mut ack_rx) = unbounded_channel();
1412
1413 let mut mock = MockSubscriber::new();
1414 mock.expect_streaming_pull()
1415 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1416 mock.expect_acknowledge().returning(move |r| {
1417 ack_tx
1418 .send(r.into_inner())
1419 .expect("sending on channel always succeeds");
1420 Ok(TonicResponse::from(()))
1421 });
1422
1423 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1424 let client = test_client(endpoint).await?;
1425
1426 let stream = client
1427 .subscribe("projects/p/subscriptions/s")
1428 .build()
1429 .into_stream();
1430
1431 response_tx.send(Ok(test_response(1..3))).await?;
1432 drop(response_tx);
1433
1434 let got: Vec<_> = stream
1435 .map_ok(|(m, h)| {
1436 h.ack();
1437 m.data
1438 })
1439 .try_collect()
1440 .await?;
1441 assert_eq!(got, vec![test_data(1), test_data(2)]);
1442
1443 let ack_req = ack_rx
1444 .recv()
1445 .await
1446 .expect("should receive acknowledgements");
1447 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1448 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..3));
1449
1450 Ok(())
1451 }
1452
1453 #[tokio_test_no_panics(start_paused = true)]
1454 async fn basic_lease_expiration() -> anyhow::Result<()> {
1455 const MAX_LEASE_EXTENSION: Duration = Duration::from_secs(10);
1456 const MAX_LEASE: Duration = Duration::from_secs(30);
1457 let start_time = Instant::now();
1462 let (response_tx, response_rx) = channel(10);
1463 let (extend_tx, mut extend_rx) = unbounded_channel();
1464
1465 let mut mock = MockSubscriber::new();
1466 mock.expect_streaming_pull()
1467 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1468 mock.expect_modify_ack_deadline().returning(move |r| {
1469 extend_tx
1470 .send(r.into_inner())
1471 .expect("sending on channel always succeeds");
1472 Ok(TonicResponse::from(()))
1473 });
1474 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1475 let client = test_client(endpoint).await?;
1476 let mut stream = client
1477 .subscribe("projects/p/subscriptions/s")
1478 .set_max_lease(MAX_LEASE)
1479 .set_max_lease_extension(MAX_LEASE_EXTENSION)
1480 .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1481 .build();
1482
1483 response_tx.send(Ok(test_response(0..1))).await?;
1484 drop(response_tx);
1485
1486 let (_m, _h) = stream
1487 .next()
1488 .await
1489 .expect("stream should yield a message")?;
1490
1491 let mut latest = None;
1494 for _ in 0..MAX_LEASE.as_secs() * 2 {
1495 while let Ok(r) = extend_rx.try_recv() {
1496 assert_ne!(r.ack_deadline_seconds, 0, "unexpectedly received a nack");
1497 latest = Some(start_time.elapsed());
1498 }
1499 tokio::time::advance(Duration::from_secs(1)).await;
1500 tokio::task::yield_now().await;
1501 }
1502
1503 let expected_range = (MAX_LEASE - MAX_LEASE_EXTENSION)..=MAX_LEASE;
1505 assert!(
1506 latest.is_some_and(|t| expected_range.contains(&t)),
1507 "{latest:?}"
1508 );
1509
1510 stream.shutdown_token().shutdown().await;
1512
1513 Ok(())
1514 }
1515
1516 #[tokio_test_no_panics(start_paused = true)]
1517 async fn shutdown_wait_for_processing() -> anyhow::Result<()> {
1518 let (response_tx, response_rx) = channel(10);
1519
1520 let mut mock = MockSubscriber::new();
1521 mock.expect_streaming_pull()
1522 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1523 mock.expect_acknowledge()
1524 .times(1)
1525 .returning(|_| Ok(TonicResponse::from(())));
1526 mock.expect_modify_ack_deadline()
1527 .returning(|_| Ok(TonicResponse::from(())));
1528 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1529 let client = test_client(endpoint).await?;
1530 let mut stream = client
1531 .subscribe("projects/p/subscriptions/s")
1532 .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
1533 .build();
1534
1535 response_tx.send(Ok(test_response(0..1))).await?;
1536 drop(response_tx);
1537
1538 let (_m, h) = stream
1539 .next()
1540 .await
1541 .expect("stream should yield a message")?;
1542
1543 tokio::spawn(async move {
1544 tokio::time::sleep(Duration::from_secs(5)).await;
1547 h.ack();
1548 });
1549
1550 stream.shutdown_token().shutdown().await;
1552
1553 Ok(())
1554 }
1555
1556 #[tokio_test_no_panics(start_paused = true)]
1557 async fn at_least_once_and_exactly_once() -> anyhow::Result<()> {
1558 let (response_tx, response_rx) = channel(10);
1559
1560 let mut mock = MockSubscriber::new();
1561 mock.expect_streaming_pull()
1562 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1563 mock.expect_modify_ack_deadline()
1564 .returning(|_| Ok(TonicResponse::from(())));
1565 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1566 let client = test_client(endpoint).await?;
1567 let mut stream = client
1568 .subscribe("projects/p/subscriptions/s")
1569 .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1570 .build();
1571
1572 response_tx.send(Ok(test_response(0..1))).await?;
1573 response_tx
1574 .send(Ok(test_exactly_once_response(1..2)))
1575 .await?;
1576 response_tx.send(Ok(test_response(2..3))).await?;
1577 response_tx
1578 .send(Ok(test_exactly_once_response(3..4)))
1579 .await?;
1580 drop(response_tx);
1581
1582 let (m, h) = stream.next().await.expect("should yield a message")?;
1583 assert_eq!(m.data, test_data(0));
1584 assert_eq!(h.ack_id(), test_id(0));
1585 assert!(matches!(h, Handler::AtLeastOnce(_)), "{h:?}");
1586
1587 let (m, h) = stream.next().await.expect("should yield a message")?;
1588 assert_eq!(m.data, test_data(1));
1589 assert_eq!(h.ack_id(), test_id(1));
1590 assert!(matches!(h, Handler::ExactlyOnce(_)), "{h:?}");
1591
1592 let (m, h) = stream.next().await.expect("should yield a message")?;
1593 assert_eq!(m.data, test_data(2));
1594 assert_eq!(h.ack_id(), test_id(2));
1595 assert!(matches!(h, Handler::AtLeastOnce(_)), "{h:?}");
1596
1597 let (m, h) = stream.next().await.expect("should yield a message")?;
1598 assert_eq!(m.data, test_data(3));
1599 assert_eq!(h.ack_id(), test_id(3));
1600 assert!(matches!(h, Handler::ExactlyOnce(_)), "{h:?}");
1601
1602 let end = stream.next().await.transpose()?;
1603 assert!(end.is_none(), "Received extra message: {end:?}");
1604
1605 stream.shutdown_token().shutdown().await;
1607
1608 Ok(())
1609 }
1610
1611 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1612 async fn cancel_before_open() -> anyhow::Result<()> {
1613 let mut mock = MockSubscriber::new();
1614 mock.expect_streaming_pull()
1615 .returning(|_| Err(TonicStatus::unavailable("try again")));
1616 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1617 let client = test_client(endpoint).await?;
1618 let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1619 let shutdown_token = stream.shutdown_token();
1620
1621 let next = tokio::spawn(async move { stream.next().await });
1622 shutdown_token.shutdown().await;
1623
1624 let end = next.await?;
1625 assert!(end.is_none(), "Shutdown should end the stream, got {end:?}");
1626
1627 Ok(())
1628 }
1629
1630 #[tokio_test_no_panics(start_paused = true)]
1631 async fn cancel_midstream() -> anyhow::Result<()> {
1632 let (response_tx, response_rx) = channel(10);
1633 let (ack_tx, mut ack_rx) = unbounded_channel();
1634 let (nack_tx, mut nack_rx) = unbounded_channel();
1635
1636 let mut mock = MockSubscriber::new();
1637 mock.expect_streaming_pull()
1638 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1639 mock.expect_acknowledge().times(1).returning(move |r| {
1640 ack_tx
1641 .send(r.into_inner())
1642 .expect("sending on channel always succeeds");
1643 Ok(TonicResponse::from(()))
1644 });
1645 mock.expect_modify_ack_deadline()
1646 .times(1)
1647 .returning(move |r| {
1648 nack_tx
1649 .send(r.into_inner())
1650 .expect("sending on channel always succeeds");
1651 Ok(TonicResponse::from(()))
1652 });
1653 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1654 let client = test_client(endpoint).await?;
1655 let mut stream = client
1656 .subscribe("projects/p/subscriptions/s")
1657 .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
1658 .build();
1659 let shutdown_token = stream.shutdown_token();
1660
1661 response_tx.send(Ok(test_response(1..10))).await?;
1662 for i in 1..6 {
1663 let Some((m, h)) = stream.next().await.transpose()? else {
1664 anyhow::bail!("expected message {i}/5")
1665 };
1666 assert_eq!(m.data, test_data(i));
1667 h.ack();
1668 }
1669 let shutdown = tokio::spawn(async move {
1670 shutdown_token.shutdown().await;
1671 });
1672 tokio::task::yield_now().await;
1673 let end = stream.next().await.transpose()?;
1674 assert!(end.is_none(), "Shutdown should end the stream, got {end:?}");
1675
1676 shutdown.await?;
1679
1680 let ack_req = ack_rx.try_recv()?;
1681 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1682 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1683
1684 let nack_req = nack_rx.try_recv()?;
1685 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1686 assert_eq!(nack_req.ack_deadline_seconds, 0);
1687 assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1688
1689 Ok(())
1690 }
1691
1692 #[test_case(ShutdownBehavior::NackImmediately)]
1693 #[test_case(ShutdownBehavior::WaitForProcessing)]
1694 #[tokio_test_no_panics(start_paused = true)]
1695 async fn shutdown_without_next(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
1696 let (response_tx, response_rx) = channel(10);
1697 let (ack_tx, mut ack_rx) = unbounded_channel();
1698 let (nack_tx, mut nack_rx) = unbounded_channel();
1699
1700 let mut mock = MockSubscriber::new();
1701 mock.expect_streaming_pull()
1702 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1703 mock.expect_acknowledge().times(1).returning(move |r| {
1704 ack_tx
1705 .send(r.into_inner())
1706 .expect("sending on channel always succeeds");
1707 Ok(TonicResponse::from(()))
1708 });
1709 mock.expect_modify_ack_deadline()
1710 .times(1)
1711 .returning(move |r| {
1712 nack_tx
1713 .send(r.into_inner())
1714 .expect("sending on channel always succeeds");
1715 Ok(TonicResponse::from(()))
1716 });
1717 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1718 let client = test_client(endpoint).await?;
1719 let mut stream = client
1720 .subscribe("projects/p/subscriptions/s")
1721 .set_shutdown_behavior(shutdown_behavior)
1722 .build();
1723 let shutdown_token = stream.shutdown_token();
1724
1725 response_tx.send(Ok(test_response(1..10))).await?;
1726 for i in 1..6 {
1727 let Some((m, h)) = stream.next().await.transpose()? else {
1728 anyhow::bail!("expected message {i}/5")
1729 };
1730 assert_eq!(m.data, test_data(i));
1731 h.ack();
1732 }
1733 shutdown_token.shutdown().await;
1736
1737 let ack_req = ack_rx.try_recv()?;
1738 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1739 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1740
1741 let nack_req = nack_rx.try_recv()?;
1742 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1743 assert_eq!(nack_req.ack_deadline_seconds, 0);
1744 assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1745
1746 Ok(())
1747 }
1748
1749 #[test_case(ShutdownBehavior::NackImmediately)]
1750 #[test_case(ShutdownBehavior::WaitForProcessing)]
1751 #[tokio_test_no_panics(start_paused = true)]
1752 async fn stream_error_initiates_shutdown(
1753 shutdown_behavior: ShutdownBehavior,
1754 ) -> anyhow::Result<()> {
1755 let (response_tx, response_rx) = channel(10);
1756 let (ack_tx, mut ack_rx) = unbounded_channel();
1757
1758 let mut mock = MockSubscriber::new();
1759 mock.expect_streaming_pull()
1760 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1761 mock.expect_acknowledge().times(1).returning(move |r| {
1762 ack_tx
1763 .send(r.into_inner())
1764 .expect("sending on channel always succeeds");
1765 Ok(TonicResponse::from(()))
1766 });
1767 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1768 let client = test_client(endpoint).await?;
1769 let mut stream = client
1770 .subscribe("projects/p/subscriptions/s")
1771 .set_shutdown_behavior(shutdown_behavior)
1772 .build();
1773 let shutdown_token = stream.shutdown_token();
1774
1775 response_tx.send(Ok(test_response(0..1))).await?;
1776 response_tx
1777 .send(Err(TonicStatus::failed_precondition("fail")))
1778 .await?;
1779 drop(response_tx);
1780
1781 let (m, h) = stream.next().await.expect("should yield a message")?;
1782 assert_eq!(m.data, test_data(0));
1783 h.ack();
1784
1785 let err = stream.next().await.expect("should yield an error");
1786 assert!(err.is_err(), "{err:?}");
1787
1788 shutdown_token.wait_for_shutdown().await;
1791
1792 let ack_req = ack_rx.try_recv()?;
1793 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1794 assert_eq!(ack_req.ack_ids, test_ids(0..1));
1795
1796 Ok(())
1797 }
1798
1799 #[test_case(ShutdownBehavior::NackImmediately)]
1800 #[test_case(ShutdownBehavior::WaitForProcessing)]
1801 #[tokio_test_no_panics(start_paused = true)]
1802 async fn drop_cancels(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
1803 let (response_tx, response_rx) = channel(10);
1804 let (ack_tx, mut ack_rx) = unbounded_channel();
1805 let (nack_tx, mut nack_rx) = unbounded_channel();
1806
1807 let mut mock = MockSubscriber::new();
1808 mock.expect_streaming_pull()
1809 .return_once(|_| Ok(TonicResponse::from(response_rx)));
1810 mock.expect_acknowledge().times(1).returning(move |r| {
1811 ack_tx
1812 .send(r.into_inner())
1813 .expect("sending on channel always succeeds");
1814 Ok(TonicResponse::from(()))
1815 });
1816 mock.expect_modify_ack_deadline()
1817 .times(1)
1818 .returning(move |r| {
1819 nack_tx
1820 .send(r.into_inner())
1821 .expect("sending on channel always succeeds");
1822 Ok(TonicResponse::from(()))
1823 });
1824 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1825 let client = test_client(endpoint).await?;
1826 let mut stream = client
1827 .subscribe("projects/p/subscriptions/s")
1828 .set_shutdown_behavior(shutdown_behavior)
1829 .build();
1830 let shutdown_token = stream.shutdown_token();
1831
1832 response_tx.send(Ok(test_response(1..10))).await?;
1833 for i in 1..6 {
1834 let Some((m, h)) = stream.next().await.transpose()? else {
1835 anyhow::bail!("expected message {i}/5")
1836 };
1837 assert_eq!(m.data, test_data(i));
1838 h.ack();
1839 }
1840 drop(stream); shutdown_token.wait_for_shutdown().await;
1842
1843 let ack_req = ack_rx.try_recv()?;
1844 assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1845 assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1846
1847 let nack_req = nack_rx.try_recv()?;
1848 assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1849 assert_eq!(nack_req.ack_deadline_seconds, 0);
1850 assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1851
1852 Ok(())
1853 }
1854}