Skip to main content

google_cloud_pubsub/subscriber/
message_stream.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::builder::Subscribe;
16use super::handler::{AckResult, Action, AtLeastOnce, ExactlyOnce, Handler};
17use super::lease_loop::LeaseLoop;
18use super::lease_state::{AtLeastOnceInfo, ExactlyOnceInfo, LeaseInfo, LeaseOptions, NewMessage};
19use super::leaser::DefaultLeaser;
20use super::retry_policy::StreamRetryPolicy;
21use super::shutdown_token::ShutdownToken;
22use super::stream::Stream;
23use super::stub::TonicStreaming as _;
24use super::transport::Transport;
25use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
26use crate::model::Message;
27use crate::{Error, Result};
28use futures::FutureExt;
29use futures::future::{BoxFuture, Shared};
30use gaxi::grpc::from_status::to_gax_error;
31use gaxi::prost::FromProto as _;
32use google_cloud_gax::retry_result::RetryResult;
33use std::collections::VecDeque;
34use std::sync::Arc;
35use tokio::sync::mpsc::{UnboundedSender, WeakUnboundedSender, unbounded_channel};
36use tokio::sync::oneshot::Receiver;
37use tokio::time::Duration;
38use tokio_util::sync::{CancellationToken, DropGuard};
39
40/// Represents an open subscribe stream.
41///
42/// This is a stream-like struct for serving messages to an application.
43///
44/// # Example
45/// ```
46/// # use google_cloud_pubsub::client::Subscriber;
47/// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
48/// let mut stream = client
49///     .subscribe("projects/my-project/subscriptions/my-subscription")
50///     .build();
51/// while let Some((m, h)) = stream.next().await.transpose()? {
52///     println!("Received message m={m:?}");
53///     h.ack();
54/// }
55/// # Ok(()) }
56/// ```
57#[derive(Debug)]
58pub struct MessageStream {
59    /// Implementation of the `MessageStream`.
60    ///
61    /// To avoid atomic increments in the critical path, we separate the
62    /// shutdown token from the rest of the struct. This way we can hold a
63    /// mutable reference to `self.inner`, and a reference to `self.shutdown` at
64    /// the same time.
65    inner: MessageStreamImpl,
66
67    /// This future is ready when the lease loop shutdown completes.
68    lease_loop: Shared<BoxFuture<'static, ()>>,
69
70    /// A token that can detect a shutdown from the application.
71    shutdown: CancellationToken,
72
73    /// Signal a shutdown if the application drops this struct.
74    ///
75    /// This field is intentionally unused; it exists solely to trigger a
76    /// shutdown signal via its `Drop` implementation.
77    _shutdown_guard: DropGuard,
78}
79
80#[derive(Debug)]
81pub struct MessageStreamImpl {
82    /// The stub implementing this struct.
83    stub: Arc<Transport>,
84
85    /// The initial request used to start a stream.
86    initial_req: StreamingPullRequest,
87
88    /// The bidirectional stream.
89    ///
90    /// We choose to lazy-initialize the stream when the application asks for a
91    /// message because tonic will not yield the stream to us until the first
92    /// response is available.[^1]
93    ///
94    /// The usability of the `MessageStream` API would suffer if creating an instance
95    /// of `MessageStream` is blocked on the first message being available.
96    ///
97    /// [^1]: <https://github.com/hyperium/tonic/issues/515>
98    stream: Option<StreamState>,
99
100    /// Applications ask for messages one at a time. Individual stream responses
101    /// can contain multiple messages. We use `pool` to hold the extra messages
102    /// while we wait to serve them to applications.
103    ///
104    /// A FIFO queue is necessary to preserve ordering.
105    pool: VecDeque<(Message, HandlerInfo)>,
106
107    /// A sender for sending new messages from the stream into the lease
108    /// management task.
109    message_tx: WeakUnboundedSender<NewMessage>,
110
111    /// A sender for forwarding acks/nacks from the application to the lease
112    /// management task. Each `Handler` holds a clone of this.
113    ack_tx: WeakUnboundedSender<Action>,
114
115    /// A token that can initiate shutdown after a stream error.
116    shutdown: CancellationToken,
117}
118
119// We would rather always allocate enough space to hold the stream on the stack
120// than add a layer of indirection by `Box`ing it.
121#[allow(clippy::large_enum_variant)]
122#[derive(Debug)]
123enum StreamState {
124    /// The stream was cancelled or failed with a permanent error. It should not
125    /// be re-opened.
126    Closed,
127    /// The stream is active.
128    Active(Stream<Transport>),
129}
130
131impl MessageStream {
132    pub(super) fn new(builder: Subscribe) -> Self {
133        let stub = builder.inner;
134        let subscription = builder.subscription;
135
136        let (confirmed_tx, confirmed_rx) = unbounded_channel();
137        let leaser = DefaultLeaser::new(
138            stub.clone(),
139            confirmed_tx,
140            subscription.clone(),
141            builder.ack_deadline_seconds,
142            builder.grpc_subchannel_count,
143        );
144        let options = LeaseOptions {
145            max_lease: builder.max_lease,
146            max_lease_extension: Duration::from_secs(builder.ack_deadline_seconds as u64),
147            shutdown_behavior: builder.shutdown_behavior,
148            ..Default::default()
149        };
150        let LeaseLoop {
151            handle,
152            message_tx,
153            ack_tx,
154        } = LeaseLoop::new(leaser, confirmed_rx, options);
155        let lease_loop = handle.map(|_| ()).boxed().shared();
156
157        let weak_message_tx = message_tx.downgrade();
158        let weak_ack_tx = ack_tx.downgrade();
159
160        let shutdown = CancellationToken::new();
161        let shutdown_clone = shutdown.clone();
162        let _shutdown_guard = shutdown.clone().drop_guard();
163        tokio::spawn(async move {
164            // Hold the strong senders for the channels, dropping them when an
165            // application signals a shutdown. This lets us begin the shutdown
166            // procedure without requiring the application to `drop(stream)` or
167            // call `stream.next()`.
168            shutdown_clone.cancelled().await;
169            drop(message_tx);
170            drop(ack_tx);
171        });
172
173        let initial_req = StreamingPullRequest {
174            subscription,
175            stream_ack_deadline_seconds: builder.ack_deadline_seconds,
176            max_outstanding_messages: builder.max_outstanding_messages,
177            max_outstanding_bytes: builder.max_outstanding_bytes,
178            client_id: builder.client_id,
179            // `protocol_version == 1` means we support receiving heartbeats
180            // (empty `StreamingPullResponse`s) from the server.
181            protocol_version: 1,
182            ..Default::default()
183        };
184
185        let inner = MessageStreamImpl {
186            stub,
187            initial_req,
188            stream: None,
189            pool: VecDeque::new(),
190            message_tx: weak_message_tx,
191            ack_tx: weak_ack_tx,
192            shutdown: shutdown.clone(),
193        };
194        Self {
195            inner,
196            lease_loop,
197            shutdown,
198            _shutdown_guard,
199        }
200    }
201
202    /// Returns the next message received on this subscription.
203    ///
204    /// # Example
205    /// ```
206    /// # use google_cloud_pubsub::subscriber::MessageStream;
207    /// # async fn sample(mut stream: MessageStream) -> anyhow::Result<()> {
208    /// while let Some((m, h)) = stream.next().await.transpose()? {
209    ///     println!("Received message m={m:?}");
210    ///     h.ack();
211    /// }
212    /// # Ok(()) }
213    /// ```
214    ///
215    /// Returns the message data along with a [Handler] to acknowledge (ack) the
216    /// message.
217    ///
218    /// If the underlying stream encounters a permanent error, an `Error` is
219    /// returned instead.
220    ///
221    /// `None` represents the end of a stream, but in practice, the stream stays
222    /// open until it is cancelled or encounters a permanent error.
223    pub async fn next(&mut self) -> Option<Result<(Message, Handler)>> {
224        let next = tokio::select! {
225            biased;
226            _ = self.shutdown.cancelled() => {
227                self.inner.close();
228                None
229            },
230            n = self.inner.next() => n,
231        };
232        next
233    }
234
235    #[cfg(feature = "unstable-stream")]
236    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
237    /// Converts the `MessageStream` to a [`futures::Stream`].
238    ///
239    /// # Example
240    /// ```
241    /// # use google_cloud_pubsub::subscriber::MessageStream;
242    /// # async fn sample(stream: MessageStream) -> anyhow::Result<()> {
243    /// use futures::TryStreamExt;
244    /// let mut stream = stream.into_stream();
245    /// while let Some((m, h)) = stream.try_next().await? { /* ... */ }
246    /// # Ok(()) }
247    /// ```
248    pub fn into_stream(self) -> impl futures::Stream<Item = Result<(Message, Handler)>> + Unpin {
249        use futures::stream::unfold;
250        Box::pin(unfold(self, |mut stream| async move {
251            stream.next().await.map(|item| (item, stream))
252        }))
253    }
254
255    /// Returns a shutdown token for the stream.
256    ///
257    /// # Example
258    /// ```
259    /// # use google_cloud_pubsub::subscriber::MessageStream;
260    /// # async fn sample(mut stream: MessageStream) {
261    /// // Get a shutdown token for the stream.
262    /// let shutdown_token = stream.shutdown_token();
263    ///
264    /// // Signal and await a shutdown of the stream.
265    /// shutdown_token.shutdown().await;
266    ///
267    /// // The stream stops yielding messages after a cancel.
268    /// assert!(stream.next().await.is_none());
269    /// # }
270    /// ```
271    ///
272    /// Use this token to signal and/or await shutdown of the stream.
273    ///
274    /// Awaiting a stream shutdown gives the subscriber time to flush its
275    /// pending acknowledgements, and schedule other messages for redelivery to
276    /// another client as soon as possible.
277    pub fn shutdown_token(&self) -> ShutdownToken {
278        ShutdownToken {
279            inner: self.shutdown.clone(),
280            // This future is ready when the lease loop shutdown completes.
281            fut: self.lease_loop.clone(),
282        }
283    }
284}
285
286impl MessageStreamImpl {
287    async fn next(&mut self) -> Option<Result<(Message, Handler)>> {
288        loop {
289            // Serve a message if we have one ready.
290            if let Some((m, hi)) = self.pool.pop_front() {
291                return Some(Ok((m, hi.into_handler(self.ack_tx.upgrade()?))));
292            }
293
294            // Otherwise, read the next response from the stream, which will
295            // likely populate the message pool.
296            //
297            // Note that a successful read does not necessarily mean there is a
298            // message in the pool. The server occasionally sends heartbeats
299            // (responses with an empty message list). Hence the loop.
300            if let Err(e) = self.populate_pool().await? {
301                // Handle errors opening or reading from the stream.
302                match StreamRetryPolicy::on_midstream_error(e) {
303                    RetryResult::Continue(_) => {
304                        // The stream failed with a transient error. Reset the stream.
305                        self.stream = None;
306                        continue;
307                    }
308                    RetryResult::Permanent(e) | RetryResult::Exhausted(e) => {
309                        // The stream failed with a permanent error. Return the error.
310                        self.close();
311                        return Some(Err(e));
312                    }
313                }
314            }
315        }
316    }
317
318    /// Make a new attempt to open the underlying gRPC stream.
319    async fn open_stream(&mut self) -> Result<()> {
320        let stream = Stream::<Transport>::new(self.stub.clone(), self.initial_req.clone()).await?;
321        self.stream = Some(StreamState::Active(stream));
322        Ok(())
323    }
324
325    /// Reads the next response from the stream.
326    ///
327    /// If necessary, this method will open a new stream.
328    ///
329    /// If we receive an error either opening or reading from the stream, we
330    /// return it.
331    async fn next_response(&mut self) -> Option<Result<StreamingPullResponse>> {
332        if self.stream.is_none() {
333            // Open the stream, if necessary.
334            if let Err(e) = self.open_stream().await {
335                return Some(Err(e));
336            }
337        }
338
339        let stream = match self.stream.as_mut()? {
340            StreamState::Closed => return None,
341            StreamState::Active(s) => s,
342        };
343        stream
344            .next_message()
345            .await
346            .map_err(to_gax_error)
347            .transpose()
348    }
349
350    /// Populate the message pool by reading from the stream.
351    ///
352    /// Read the next response from the stream. If necessary, this method will
353    /// open a new stream.
354    ///
355    /// If we receive a response, we store the messages in `self.pool` and
356    /// forward the ack IDs to the lease management task.
357    ///
358    /// If we receive an error reading from the stream, we return it.
359    async fn populate_pool(&mut self) -> Option<Result<()>> {
360        // Read the next response from the stream.
361        let resp = match self.next_response().await? {
362            Ok(resp) => resp,
363            Err(e) => return Some(Err(e)),
364        };
365
366        let exactly_once = resp
367            .subscription_properties
368            .is_some_and(|m| m.exactly_once_delivery_enabled);
369
370        // Process the received messages in the response.
371        for rm in resp.received_messages {
372            let Some(message) = rm.message else {
373                // The message field should always be present. If not, the proto
374                // message was corrupted while in transit, or there is a bug in
375                // the service.
376                //
377                // The client can just ignore an ack ID without an associated
378                // message.
379                continue;
380            };
381
382            let (lease_info, handler_info) = if exactly_once {
383                let (result_tx, result_rx) = tokio::sync::oneshot::channel();
384                (
385                    LeaseInfo::ExactlyOnce(ExactlyOnceInfo::new(result_tx)),
386                    HandlerInfo::ExactlyOnce {
387                        ack_id: rm.ack_id.clone(),
388                        result_rx,
389                    },
390                )
391            } else {
392                (
393                    LeaseInfo::AtLeastOnce(AtLeastOnceInfo::new()),
394                    HandlerInfo::AtLeastOnce {
395                        ack_id: rm.ack_id.clone(),
396                    },
397                )
398            };
399
400            let _ = self.message_tx.upgrade()?.send(NewMessage {
401                ack_id: rm.ack_id,
402                lease_info,
403            });
404            let message = match message.cnv().map_err(Error::deser) {
405                Ok(message) => message,
406                Err(e) => return Some(Err(e)),
407            };
408            self.pool.push_back((message, handler_info));
409        }
410        Some(Ok(()))
411    }
412
413    // Permanently close the stream.
414    fn close(&mut self) {
415        self.stream = Some(StreamState::Closed);
416        self.pool.clear();
417        self.shutdown.cancel();
418    }
419}
420
421/// A `Handler` without its action `Sender`.
422///
423/// We only want to create strong `Sender`s for `Handler`s that we yield to the
424/// application.
425///
426/// Note that the application should be able to signal a shutdown without
427/// dropping the `MessageStream` or calling `MessageStream::next()`.
428///
429/// In these cases, the items in the `MessageStream::pool` are not cleared. So,
430/// if we hold a strong `Sender` in the `pool`, we would never initiate a
431/// shutdown when configured to `WaitForProcessing`.
432#[derive(Debug)]
433enum HandlerInfo {
434    AtLeastOnce {
435        ack_id: String,
436    },
437    ExactlyOnce {
438        ack_id: String,
439        result_rx: Receiver<AckResult>,
440    },
441}
442
443impl HandlerInfo {
444    /// Convert this type to a `Handler`, by adding its action `Sender` before
445    /// serving it to the application.
446    fn into_handler(self, ack_tx: UnboundedSender<Action>) -> Handler {
447        match self {
448            HandlerInfo::AtLeastOnce { ack_id } => {
449                Handler::AtLeastOnce(AtLeastOnce::new(ack_id, ack_tx))
450            }
451            HandlerInfo::ExactlyOnce { ack_id, result_rx } => {
452                Handler::ExactlyOnce(ExactlyOnce::new(ack_id, ack_tx, result_rx))
453            }
454        }
455    }
456}
457
458#[cfg(test)]
459mod tests {
460    use super::super::ShutdownBehavior;
461    use super::super::client::Subscriber;
462    use super::super::keepalive::KEEPALIVE_PERIOD;
463    use super::super::lease_state::tests::{test_id, test_ids};
464    use super::super::stream::{INITIAL_DELAY, MAXIMUM_DELAY};
465    use super::*;
466    use gaxi::grpc::tonic::{Response as TonicResponse, Status as TonicStatus};
467    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
468    use google_cloud_test_macros::tokio_test_no_panics;
469    use pubsub_grpc_mock::google::pubsub::v1;
470    use pubsub_grpc_mock::{MockSubscriber, start};
471    use test_case::test_case;
472    use tokio::sync::mpsc::{channel, unbounded_channel};
473    use tokio::task::{JoinHandle, JoinSet};
474    use tokio::time::{Duration, Instant};
475
476    fn sorted(mut v: Vec<String>) -> Vec<String> {
477        v.sort();
478        v
479    }
480
481    fn test_data(v: i32) -> bytes::Bytes {
482        bytes::Bytes::from(format!("data-{}", test_id(v)))
483    }
484
485    fn test_response(range: std::ops::Range<i32>) -> v1::StreamingPullResponse {
486        v1::StreamingPullResponse {
487            received_messages: range
488                .into_iter()
489                .map(|i| v1::ReceivedMessage {
490                    ack_id: test_id(i),
491                    message: Some(v1::PubsubMessage {
492                        data: test_data(i).to_vec(),
493                        ..Default::default()
494                    }),
495                    ..Default::default()
496                })
497                .collect(),
498            ..Default::default()
499        }
500    }
501
502    fn test_exactly_once_response(range: std::ops::Range<i32>) -> v1::StreamingPullResponse {
503        v1::StreamingPullResponse {
504            subscription_properties: Some(v1::streaming_pull_response::SubscriptionProperties {
505                exactly_once_delivery_enabled: true,
506                ..Default::default()
507            }),
508            received_messages: range
509                .into_iter()
510                .map(|i| v1::ReceivedMessage {
511                    ack_id: test_id(i),
512                    message: Some(v1::PubsubMessage {
513                        data: test_data(i).to_vec(),
514                        ..Default::default()
515                    }),
516                    ..Default::default()
517                })
518                .collect(),
519            ..Default::default()
520        }
521    }
522
523    async fn test_client(endpoint: String) -> anyhow::Result<Subscriber> {
524        Ok(Subscriber::builder()
525            .with_endpoint(endpoint)
526            .with_credentials(Anonymous::new().build())
527            .build()
528            .await?)
529    }
530
531    #[tokio_test_no_panics]
532    async fn error_starting_stream() -> anyhow::Result<()> {
533        let mut mock = MockSubscriber::new();
534        mock.expect_streaming_pull()
535            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
536        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
537        let client = test_client(endpoint).await?;
538        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
539        let err = stream
540            .next()
541            .await
542            .expect("stream should not be empty")
543            .expect_err("the first streamed item should be an error");
544        assert!(err.status().is_some(), "{err:?}");
545        let status = err.status().unwrap();
546        assert_eq!(
547            status.code,
548            google_cloud_gax::error::rpc::Code::FailedPrecondition
549        );
550        assert_eq!(status.message, "fail");
551
552        Ok(())
553    }
554
555    #[tokio_test_no_panics]
556    async fn permanent_error_ends_stream() -> anyhow::Result<()> {
557        let mut mock = MockSubscriber::new();
558        mock.expect_streaming_pull()
559            .returning(|_| Err(TonicStatus::failed_precondition("fail")));
560        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
561        let client = test_client(endpoint).await?;
562        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
563        let next = stream.next().await;
564        assert!(
565            matches!(next, Some(Err(_))),
566            "expected permanent error, got {next:?}"
567        );
568
569        let next = stream.next().await;
570        assert!(next.is_none(), "expected end of stream, got {next:?}");
571
572        Ok(())
573    }
574
575    #[tokio_test_no_panics]
576    async fn initial_request() -> anyhow::Result<()> {
577        const MIB: i64 = 1024 * 1024;
578
579        // We use this channel to surface writes (requests) from outside our
580        // mock expectation.
581        let (recover_writes_tx, mut recover_writes_rx) = channel(1);
582
583        let mut mock = MockSubscriber::new();
584        mock.expect_streaming_pull().return_once(move |request| {
585            tokio::spawn(async move {
586                // Note that this task stays alive as long as we hold
587                // `recover_writes_rx`.
588                let mut request_rx = request.into_inner();
589                while let Some(request) = request_rx.recv().await {
590                    recover_writes_tx
591                        .send(request)
592                        .await
593                        .expect("forwarding writes always succeeds");
594                }
595            });
596            Err(TonicStatus::failed_precondition("fail"))
597        });
598
599        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
600        let client = test_client(endpoint).await?;
601        let _ = client
602            .subscribe("projects/p/subscriptions/s")
603            .set_max_lease_extension(Duration::from_secs(20))
604            .set_max_outstanding_messages(2000)
605            .set_max_outstanding_bytes(200 * MIB)
606            .build()
607            .next()
608            .await;
609
610        let initial_req = recover_writes_rx
611            .recv()
612            .await
613            .expect("should receive a request")?;
614        assert_eq!(initial_req.subscription, "projects/p/subscriptions/s");
615        assert_eq!(initial_req.stream_ack_deadline_seconds, 20);
616        assert_eq!(initial_req.max_outstanding_messages, 2000);
617        assert_eq!(initial_req.max_outstanding_bytes, 200 * MIB);
618        assert!(
619            !initial_req.client_id.is_empty(),
620            "initial request has empty client id: {initial_req:?}"
621        );
622        assert!(
623            initial_req.protocol_version >= 1,
624            "protocol_version={}",
625            initial_req.protocol_version
626        );
627
628        Ok(())
629    }
630
631    #[tokio_test_no_panics(start_paused = true)]
632    async fn basic_success() -> anyhow::Result<()> {
633        let (response_tx, response_rx) = channel(10);
634        let (ack_tx, mut ack_rx) = unbounded_channel();
635
636        let mut mock = MockSubscriber::new();
637        mock.expect_streaming_pull()
638            .return_once(|_| Ok(TonicResponse::from(response_rx)));
639        mock.expect_acknowledge().returning(move |r| {
640            ack_tx
641                .send(r.into_inner())
642                .expect("sending on channel always succeeds");
643            Ok(TonicResponse::from(()))
644        });
645        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
646        let client = test_client(endpoint).await?;
647        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
648
649        response_tx.send(Ok(test_response(1..2))).await?;
650        response_tx.send(Ok(test_response(2..4))).await?;
651        response_tx.send(Ok(test_response(4..7))).await?;
652        drop(response_tx);
653
654        for i in 1..7 {
655            let Some((m, h)) = stream.next().await.transpose()? else {
656                anyhow::bail!("expected message {i}/6")
657            };
658            assert_eq!(m.data, test_data(i));
659            assert_eq!(h.ack_id(), test_id(i));
660            h.ack();
661        }
662        let end = stream.next().await.transpose()?;
663        assert!(end.is_none(), "Received extra message: {end:?}");
664
665        // Wait for the stream to join its background tasks.
666        stream.shutdown_token().shutdown().await;
667
668        // Verify the acks went through.
669        let ack_req = ack_rx.try_recv()?;
670        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
671        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..7));
672
673        Ok(())
674    }
675
676    #[tokio_test_no_panics(start_paused = true)]
677    async fn basic_success_exactly_once() -> anyhow::Result<()> {
678        let (response_tx, response_rx) = channel(10);
679        let (ack_tx, mut ack_rx) = unbounded_channel();
680
681        let mut mock = MockSubscriber::new();
682        mock.expect_streaming_pull()
683            .return_once(|_| Ok(TonicResponse::from(response_rx)));
684        mock.expect_acknowledge().returning(move |r| {
685            ack_tx
686                .send(r.into_inner())
687                .expect("sending on channel always succeeds");
688            Ok(TonicResponse::from(()))
689        });
690        mock.expect_modify_ack_deadline()
691            .returning(|_| Ok(TonicResponse::from(())));
692        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
693        let client = test_client(endpoint).await?;
694        let mut stream = client
695            .subscribe("projects/p/subscriptions/s")
696            .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
697            .build();
698
699        response_tx
700            .send(Ok(test_exactly_once_response(1..2)))
701            .await?;
702        response_tx
703            .send(Ok(test_exactly_once_response(2..4)))
704            .await?;
705        response_tx
706            .send(Ok(test_exactly_once_response(4..7)))
707            .await?;
708        drop(response_tx);
709
710        let mut acks = JoinSet::new();
711        for i in 1..7 {
712            let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? else {
713                anyhow::bail!("expected message {i}/6")
714            };
715            assert_eq!(m.data, test_data(i));
716            assert_eq!(h.ack_id(), test_id(i));
717            acks.spawn(h.confirmed_ack());
718        }
719        let end = stream.next().await.transpose()?;
720        assert!(end.is_none(), "Received extra message: {end:?}");
721
722        // Wait for the stream to join its background tasks.
723        stream.shutdown_token().shutdown().await;
724
725        // Verify the acks went through.
726        while let Some(r) = acks.join_next().await {
727            r??;
728        }
729        let ack_req = ack_rx.try_recv()?;
730        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
731        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..7));
732
733        Ok(())
734    }
735
736    #[tokio_test_no_panics(start_paused = true)]
737    async fn basic_lease_management() -> anyhow::Result<()> {
738        let (response_tx, response_rx) = channel(10);
739        let (ack_tx, mut ack_rx) = unbounded_channel();
740        let (nack_tx, mut nack_rx) = unbounded_channel();
741        let (extend_tx, mut extend_rx) = unbounded_channel();
742
743        let mut mock = MockSubscriber::new();
744        mock.expect_streaming_pull()
745            .return_once(|_| Ok(TonicResponse::from(response_rx)));
746        mock.expect_acknowledge().returning(move |r| {
747            ack_tx
748                .send(r.into_inner())
749                .expect("sending on channel always succeeds");
750            Ok(TonicResponse::from(()))
751        });
752        mock.expect_modify_ack_deadline().returning(move |r| {
753            let r = r.into_inner();
754            if r.ack_deadline_seconds == 0 {
755                nack_tx.send(r).expect("sending on channel always succeeds");
756            } else {
757                extend_tx
758                    .send(r)
759                    .expect("sending on channel always succeeds");
760            }
761            Ok(TonicResponse::from(()))
762        });
763        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
764        let client = test_client(endpoint).await?;
765        let mut stream = client
766            .subscribe("projects/p/subscriptions/s")
767            .set_max_lease_extension(Duration::from_secs(10))
768            .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
769            .build();
770
771        response_tx.send(Ok(test_response(0..30))).await?;
772        drop(response_tx);
773
774        // Ack some messages
775        for i in 0..10 {
776            let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
777                anyhow::bail!("expected message {i}")
778            };
779            h.ack();
780        }
781        // Nack some messages
782        for i in 10..20 {
783            let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
784                anyhow::bail!("expected message {i}")
785            };
786            h.nack();
787        }
788        // Take a long time to process some messages
789        let mut hold = Vec::new();
790        for i in 20..30 {
791            let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
792                anyhow::bail!("expected message {i}")
793            };
794            hold.push(h);
795        }
796
797        // Advance the clock 10s, which is the stream ack deadline. In this
798        // time, we should attempt at least one lease extension RPC.
799        tokio::time::advance(Duration::from_secs(10)).await;
800
801        // Close the stream, to make sure pending operations complete.
802        stream.shutdown_token().shutdown().await;
803
804        // Verify the acks went through.
805        let ack_req = ack_rx.try_recv()?;
806        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
807        assert_eq!(sorted(ack_req.ack_ids), test_ids(0..10));
808        assert!(ack_rx.is_empty(), "{ack_rx:?}");
809
810        // Verify the initial nacks went through.
811        let nack_req = nack_rx.try_recv()?;
812        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
813        assert_eq!(nack_req.ack_deadline_seconds, 0);
814        assert_eq!(sorted(nack_req.ack_ids), test_ids(10..20));
815
816        // Verify that we nack the leftover messages when the stream shuts down.
817        let nack_req = nack_rx.try_recv()?;
818        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
819        assert_eq!(nack_req.ack_deadline_seconds, 0);
820        assert_eq!(sorted(nack_req.ack_ids), test_ids(20..30));
821        assert!(nack_rx.is_empty(), "{nack_rx:?}");
822
823        // Verify at least one lease extension attempt was made.
824        let extend_req = extend_rx.try_recv()?;
825        assert_eq!(extend_req.subscription, "projects/p/subscriptions/s");
826        assert_eq!(extend_req.ack_deadline_seconds, 10);
827        assert_eq!(sorted(extend_req.ack_ids), test_ids(20..30));
828
829        Ok(())
830    }
831
832    #[tokio_test_no_panics(start_paused = true)]
833    async fn delayed_responses() -> anyhow::Result<()> {
834        // In this test, we verify the case where an application asks for a
835        // message, but a response is not immediately available on the stream.
836
837        let (response_tx, response_rx) = channel(10);
838        let handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
839            tokio::time::sleep(Duration::from_millis(20)).await;
840            response_tx.send(Ok(test_response(1..2))).await?;
841            Ok(())
842        });
843
844        let mut mock = MockSubscriber::new();
845        mock.expect_streaming_pull()
846            .return_once(|_| Ok(TonicResponse::from(response_rx)));
847        mock.expect_modify_ack_deadline()
848            .returning(|_| Ok(TonicResponse::from(())));
849        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
850        let client = test_client(endpoint).await?;
851        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
852        let (m, h) = stream
853            .next()
854            .await
855            .transpose()?
856            .expect("stream should wait for a message");
857        assert_eq!(m.data, test_data(1));
858        assert_eq!(h.ack_id(), test_id(1));
859
860        handle.await??;
861
862        Ok(())
863    }
864
865    #[tokio_test_no_panics]
866    async fn serves_messages_immediately() -> anyhow::Result<()> {
867        // This test verifies we do not do something crazy like draining the
868        // stream (which would never end) before serving messages to the
869        // application.
870
871        let (response_tx, response_rx) = channel(10);
872
873        let mut mock = MockSubscriber::new();
874        mock.expect_streaming_pull()
875            .return_once(|_| Ok(TonicResponse::from(response_rx)));
876        mock.expect_modify_ack_deadline()
877            .returning(|_| Ok(TonicResponse::from(())));
878        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
879        let client = test_client(endpoint).await?;
880        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
881
882        for i in 1..7 {
883            response_tx.send(Ok(test_response(i..i + 1))).await?;
884
885            let Some((m, h)) = stream.next().await.transpose()? else {
886                anyhow::bail!("expected message {i}/6")
887            };
888            assert_eq!(m.data, test_data(i));
889            assert_eq!(h.ack_id(), test_id(i));
890        }
891        drop(response_tx);
892        let end = stream.next().await.transpose()?;
893        assert!(end.is_none(), "Received extra message: {end:?}");
894
895        Ok(())
896    }
897
898    #[tokio_test_no_panics]
899    async fn handles_empty_response() -> anyhow::Result<()> {
900        let (response_tx, response_rx) = channel(10);
901
902        let mut mock = MockSubscriber::new();
903        mock.expect_streaming_pull()
904            .return_once(|_| Ok(TonicResponse::from(response_rx)));
905        mock.expect_modify_ack_deadline()
906            .returning(|_| Ok(TonicResponse::from(())));
907        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
908        let client = test_client(endpoint).await?;
909        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
910
911        response_tx.send(Ok(test_response(1..2))).await?;
912        // See if we can handle an empty range
913        response_tx.send(Ok(test_response(2..2))).await?;
914        response_tx.send(Ok(test_response(2..3))).await?;
915        drop(response_tx);
916
917        for i in 1..3 {
918            let Some((m, h)) = stream.next().await.transpose()? else {
919                anyhow::bail!("expected message {i}/2")
920            };
921            assert_eq!(m.data, test_data(i));
922            assert_eq!(h.ack_id(), test_id(i));
923        }
924        let end = stream.next().await.transpose()?;
925        assert!(end.is_none(), "Received extra message: {end:?}");
926
927        Ok(())
928    }
929
930    #[tokio_test_no_panics(start_paused = true)]
931    async fn handles_missing_message_field() -> anyhow::Result<()> {
932        let (response_tx, response_rx) = channel(10);
933        let (extend_tx, mut extend_rx) = unbounded_channel();
934
935        let bad = v1::StreamingPullResponse {
936            received_messages: vec![v1::ReceivedMessage {
937                ack_id: "ignored-ack-id".to_string(),
938                message: None,
939                ..Default::default()
940            }],
941            ..Default::default()
942        };
943
944        let mut mock = MockSubscriber::new();
945        mock.expect_streaming_pull()
946            .return_once(|_| Ok(TonicResponse::from(response_rx)));
947        mock.expect_modify_ack_deadline().returning(move |r| {
948            let r = r.into_inner();
949            if r.ack_deadline_seconds != 0 {
950                extend_tx
951                    .send(r)
952                    .expect("sending on channel always succeeds");
953            }
954            Ok(TonicResponse::from(()))
955        });
956        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
957        let client = test_client(endpoint).await?;
958        let mut stream = client
959            .subscribe("projects/p/subscriptions/s")
960            .set_max_lease_extension(Duration::from_secs(10))
961            .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
962            .build();
963
964        response_tx.send(Ok(test_response(1..4))).await?;
965        // See if we can handle an empty range
966        response_tx.send(Ok(bad)).await?;
967        response_tx.send(Ok(test_response(4..7))).await?;
968        drop(response_tx);
969
970        let mut handlers = Vec::new();
971        for i in 1..7 {
972            let Some((m, h)) = stream.next().await.transpose()? else {
973                anyhow::bail!("expected message {i}/6")
974            };
975            assert_eq!(m.data, test_data(i));
976            assert_eq!(h.ack_id(), test_id(i));
977            handlers.push(h);
978        }
979
980        // Advance the clock 10s, which is the stream ack deadline. In this
981        // time, we should attempt at least one lease extension RPC.
982        tokio::time::advance(Duration::from_secs(10)).await;
983
984        // Close the stream, to make sure pending operations complete.
985        stream.shutdown_token().shutdown().await;
986
987        // Verify at least one lease extension attempt was made.
988        let extend_req = extend_rx.try_recv()?;
989        assert_eq!(extend_req.subscription, "projects/p/subscriptions/s");
990        assert_eq!(extend_req.ack_deadline_seconds, 10);
991        // Note that we do not expect to see "ignored-ack-id".
992        assert_eq!(sorted(extend_req.ack_ids), test_ids(1..7));
993
994        Ok(())
995    }
996
997    #[tokio_test_no_panics]
998    async fn permanent_error_midstream() -> anyhow::Result<()> {
999        let (response_tx, response_rx) = channel(10);
1000
1001        let mut mock = MockSubscriber::new();
1002        mock.expect_streaming_pull()
1003            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1004        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1005        let client = test_client(endpoint).await?;
1006        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1007
1008        response_tx.send(Ok(test_response(1..4))).await?;
1009        response_tx
1010            .send(Err(TonicStatus::failed_precondition("fail")))
1011            .await?;
1012        drop(response_tx);
1013
1014        for i in 1..4 {
1015            let Some((m, h)) = stream.next().await.transpose()? else {
1016                anyhow::bail!("expected message {i}/3")
1017            };
1018            assert_eq!(m.data, test_data(i));
1019            assert_eq!(h.ack_id(), test_id(i));
1020        }
1021        let err = stream
1022            .next()
1023            .await
1024            .transpose()
1025            .expect_err("expected an error from stream");
1026        assert!(err.status().is_some(), "{err:?}");
1027        let status = err.status().unwrap();
1028        assert_eq!(
1029            status.code,
1030            google_cloud_gax::error::rpc::Code::FailedPrecondition
1031        );
1032        assert_eq!(status.message, "fail");
1033
1034        Ok(())
1035    }
1036
1037    #[tokio_test_no_panics(start_paused = true)]
1038    async fn keepalives() -> anyhow::Result<()> {
1039        // We use this channel to surface writes (requests) from outside our
1040        // mock expectation.
1041        let (recover_writes_tx, mut recover_writes_rx) = channel(1);
1042        let (response_tx, response_rx) = channel(10);
1043
1044        let mut mock = MockSubscriber::new();
1045        mock.expect_streaming_pull().return_once(move |request| {
1046            tokio::spawn(async move {
1047                // Note that this task stays alive as long as we hold
1048                // `recover_writes_rx`.
1049                let mut request_rx = request.into_inner();
1050                while let Some(request) = request_rx.recv().await {
1051                    recover_writes_tx
1052                        .send(request)
1053                        .await
1054                        .expect("forwarding writes always succeeds");
1055                }
1056            });
1057            Ok(TonicResponse::from(response_rx))
1058        });
1059        mock.expect_modify_ack_deadline()
1060            .returning(|_| Ok(TonicResponse::from(())));
1061
1062        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1063        let client = test_client(endpoint).await?;
1064        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1065        response_tx.send(Ok(test_response(1..4))).await?;
1066        let _ = stream.next().await;
1067
1068        let initial_req = recover_writes_rx
1069            .recv()
1070            .await
1071            .expect("should receive an initial request")?;
1072        assert_eq!(initial_req.subscription, "projects/p/subscriptions/s");
1073
1074        // Verify that we receive at least one keepalive request on the stream.
1075        tokio::time::advance(KEEPALIVE_PERIOD).await;
1076        let keepalive_req = recover_writes_rx
1077            .recv()
1078            .await
1079            .expect("should receive a keepalive request")?;
1080        assert_eq!(keepalive_req, v1::StreamingPullRequest::default());
1081
1082        // Drop the stream, which should signal a shutdown of the keepalive
1083        // task.
1084        drop(stream);
1085
1086        // Advance the time far enough to expect a keepalive ping, if the
1087        // keepalive task was still running.
1088        tokio::time::advance(4 * KEEPALIVE_PERIOD).await;
1089        assert!(recover_writes_rx.is_empty(), "{recover_writes_rx:?}");
1090
1091        Ok(())
1092    }
1093
1094    #[tokio_test_no_panics]
1095    async fn client_id() -> anyhow::Result<()> {
1096        // We use this channel to surface writes (requests) from outside our
1097        // mock expectation.
1098        let (recover_writes_tx, mut recover_writes_rx) = channel(10);
1099        let recover_writes_tx = std::sync::Arc::new(tokio::sync::Mutex::new(recover_writes_tx));
1100
1101        let mut mock = MockSubscriber::new();
1102        mock.expect_streaming_pull()
1103            .times(3)
1104            .returning(move |request| {
1105                let tx = recover_writes_tx.clone();
1106                tokio::spawn(async move {
1107                    // Note that this task stays alive as long as we hold
1108                    // `recover_writes_rx`.
1109                    let mut request_rx = request.into_inner();
1110                    while let Some(request) = request_rx.recv().await {
1111                        tx.lock()
1112                            .await
1113                            .send(request)
1114                            .await
1115                            .expect("forwarding writes always succeeds");
1116                    }
1117                });
1118                Err(TonicStatus::failed_precondition("fail"))
1119            });
1120
1121        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1122
1123        // Make two requests with the same client. The requests should have the
1124        // same client ID.
1125        let c1 = test_client(endpoint.clone()).await?;
1126        let _ = c1
1127            .subscribe("projects/p/subscriptions/s")
1128            .build()
1129            .next()
1130            .await;
1131        let req1 = recover_writes_rx
1132            .recv()
1133            .await
1134            .expect("should receive a request")?;
1135        let _ = c1
1136            .subscribe("projects/p/subscriptions/s")
1137            .build()
1138            .next()
1139            .await;
1140        let req2 = recover_writes_rx
1141            .recv()
1142            .await
1143            .expect("should receive a request")?;
1144        assert_eq!(req1.client_id, req2.client_id);
1145
1146        // Make a third request with a different client. This request should
1147        // have a different client ID.
1148        let c2 = test_client(endpoint).await?;
1149        let _ = c2
1150            .subscribe("projects/p/subscriptions/s")
1151            .build()
1152            .next()
1153            .await;
1154        let req3 = recover_writes_rx
1155            .recv()
1156            .await
1157            .expect("should receive a request")?;
1158        assert_ne!(req1.client_id, req3.client_id);
1159
1160        Ok(())
1161    }
1162
1163    #[tokio_test_no_panics(start_paused = true)]
1164    async fn no_immediate_message() -> anyhow::Result<()> {
1165        const TEST_TIMEOUT: Duration = Duration::from_secs(42);
1166
1167        let (_response_tx, response_rx) = channel(10);
1168
1169        let mut mock = MockSubscriber::new();
1170        mock.expect_streaming_pull()
1171            .return_once(move |_| Ok(TonicResponse::from(response_rx)));
1172
1173        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1174        let client = test_client(endpoint).await?;
1175        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1176
1177        let _ = tokio::time::timeout(TEST_TIMEOUT, stream.next())
1178            .await
1179            .expect_err("next() should never yield.");
1180
1181        Ok(())
1182    }
1183
1184    #[tokio_test_no_panics(start_paused = true)]
1185    async fn retry_transient_when_starting_stream() -> anyhow::Result<()> {
1186        // The policy should retry forever. Our default retry policies have an
1187        // attempt limit of 10. So we arbitrarily pick a number greater than 10
1188        // for this test.
1189        const NUM_RETRIES: u32 = 20;
1190
1191        let start_time = Instant::now();
1192        let mut seq = mockall::Sequence::new();
1193        let mut mock = MockSubscriber::new();
1194
1195        // Simulate N transient errors
1196        mock.expect_streaming_pull()
1197            .times(NUM_RETRIES as usize)
1198            .in_sequence(&mut seq)
1199            .returning(|_| Err(TonicStatus::unavailable("try again")));
1200        // Simulate a permanent error. Otherwise, we would retry forever.
1201        mock.expect_streaming_pull()
1202            .times(1)
1203            .in_sequence(&mut seq)
1204            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
1205        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1206        let client = test_client(endpoint).await?;
1207        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1208        let err = stream
1209            .next()
1210            .await
1211            .expect("stream should not be empty")
1212            .expect_err("the first streamed item should be an error");
1213        assert!(err.status().is_some(), "{err:?}");
1214        let status = err.status().unwrap();
1215        assert_eq!(
1216            status.code,
1217            google_cloud_gax::error::rpc::Code::FailedPrecondition
1218        );
1219        assert_eq!(status.message, "fail");
1220
1221        let elapsed = start_time.elapsed();
1222        assert!(
1223            elapsed <= MAXIMUM_DELAY * NUM_RETRIES,
1224            "elapsed={elapsed:?}"
1225        );
1226        assert!(
1227            elapsed >= INITIAL_DELAY * NUM_RETRIES,
1228            "elapsed={elapsed:?}"
1229        );
1230
1231        Ok(())
1232    }
1233
1234    #[tokio_test_no_panics(start_paused = true)]
1235    async fn resume_midstream_success() -> anyhow::Result<()> {
1236        let (response_tx_1, response_rx_1) = channel(10);
1237        let (response_tx_2, response_rx_2) = channel(10);
1238        let (response_tx_3, response_rx_3) = channel(10);
1239        let (ack_tx, mut ack_rx) = unbounded_channel();
1240
1241        let mut seq = mockall::Sequence::new();
1242        let mut mock = MockSubscriber::new();
1243        mock.expect_streaming_pull()
1244            .times(1)
1245            .in_sequence(&mut seq)
1246            .return_once(|_| Ok(TonicResponse::from(response_rx_1)));
1247        mock.expect_streaming_pull()
1248            .times(1)
1249            .in_sequence(&mut seq)
1250            .return_once(move |_| Ok(TonicResponse::from(response_rx_2)));
1251        mock.expect_streaming_pull()
1252            .times(1)
1253            .in_sequence(&mut seq)
1254            .return_once(|_| Ok(TonicResponse::from(response_rx_3)));
1255        mock.expect_acknowledge().times(1..).returning(move |r| {
1256            ack_tx
1257                .send(r.into_inner())
1258                .expect("sending on channel always succeeds");
1259            Ok(TonicResponse::from(()))
1260        });
1261        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1262        let client = test_client(endpoint).await?;
1263        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1264
1265        response_tx_1.send(Ok(test_response(0..10))).await?;
1266        response_tx_1.send(Ok(test_response(10..20))).await?;
1267        response_tx_1
1268            .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1269            .await?;
1270        drop(response_tx_1);
1271        response_tx_2.send(Ok(test_response(20..30))).await?;
1272        response_tx_2.send(Ok(test_response(30..40))).await?;
1273        response_tx_2
1274            .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1275            .await?;
1276        drop(response_tx_2);
1277        response_tx_3.send(Ok(test_response(40..50))).await?;
1278        drop(response_tx_3);
1279
1280        for i in 0..50 {
1281            let (m, h) = stream
1282                .next()
1283                .await
1284                .unwrap_or_else(|| panic!("expected message {}/50", i + 1))?;
1285            assert_eq!(m.data, test_data(i));
1286            h.ack();
1287        }
1288        let end = stream.next().await.transpose()?;
1289        assert!(end.is_none(), "Received extra message: {end:?}");
1290
1291        // Wait for the stream to join its background tasks.
1292        stream.shutdown_token().shutdown().await;
1293
1294        // Verify the acks went through.
1295        let mut got = Vec::new();
1296        while let Ok(ack_req) = ack_rx.try_recv() {
1297            assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1298            got.extend(ack_req.ack_ids);
1299        }
1300        assert_eq!(sorted(got), test_ids(0..50));
1301
1302        Ok(())
1303    }
1304
1305    #[tokio_test_no_panics(start_paused = true)]
1306    async fn resume_midstream_hits_permanent_error() -> anyhow::Result<()> {
1307        let (response_tx, response_rx) = channel(10);
1308        let (ack_tx, mut ack_rx) = unbounded_channel();
1309
1310        let mut seq = mockall::Sequence::new();
1311        let mut mock = MockSubscriber::new();
1312        // Start a successful stream, which will eventually disconnect.
1313        mock.expect_streaming_pull()
1314            .times(1)
1315            .in_sequence(&mut seq)
1316            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1317        // Simulate transient errors attempting to resume the stream.
1318        mock.expect_streaming_pull()
1319            .times(3)
1320            .in_sequence(&mut seq)
1321            .returning(|_| Err(TonicStatus::unavailable("try again")));
1322        // Simulate a permanent error attempting to resume the stream.
1323        mock.expect_streaming_pull()
1324            .times(1)
1325            .in_sequence(&mut seq)
1326            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
1327        mock.expect_acknowledge().times(1..).returning(move |r| {
1328            ack_tx
1329                .send(r.into_inner())
1330                .expect("sending on channel always succeeds");
1331            Ok(TonicResponse::from(()))
1332        });
1333        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1334        let client = test_client(endpoint).await?;
1335        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1336
1337        response_tx.send(Ok(test_response(0..10))).await?;
1338        response_tx.send(Ok(test_response(10..20))).await?;
1339        response_tx
1340            .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1341            .await?;
1342        drop(response_tx);
1343
1344        for i in 0..20 {
1345            let (m, h) = stream
1346                .next()
1347                .await
1348                .unwrap_or_else(|| panic!("expected message {}/20", i + 1))?;
1349            assert_eq!(m.data, test_data(i));
1350            h.ack();
1351        }
1352        let err = stream
1353            .next()
1354            .await
1355            .transpose()
1356            .expect_err("expected an error from stream");
1357        assert!(err.status().is_some(), "{err:?}");
1358        let status = err.status().unwrap();
1359        assert_eq!(
1360            status.code,
1361            google_cloud_gax::error::rpc::Code::FailedPrecondition
1362        );
1363        assert_eq!(status.message, "fail");
1364
1365        // Wait for the stream to join its background tasks.
1366        stream.shutdown_token().shutdown().await;
1367
1368        // Verify the acks went through.
1369        let mut got = Vec::new();
1370        while let Ok(ack_req) = ack_rx.try_recv() {
1371            assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1372            got.extend(ack_req.ack_ids);
1373        }
1374        assert_eq!(sorted(got), test_ids(0..20));
1375
1376        Ok(())
1377    }
1378
1379    #[tokio_test_no_panics]
1380    async fn routing_header() -> anyhow::Result<()> {
1381        let mut mock = MockSubscriber::new();
1382
1383        mock.expect_streaming_pull().return_once(move |request| {
1384            let metadata = request.metadata();
1385            assert_eq!(
1386                metadata
1387                    .get("x-goog-request-params")
1388                    .expect("routing header missing"),
1389                "subscription=projects/p/subscriptions/s"
1390            );
1391            Err(TonicStatus::failed_precondition("ignored"))
1392        });
1393
1394        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1395        let client = test_client(endpoint).await?;
1396
1397        let _ = client
1398            .subscribe("projects/p/subscriptions/s")
1399            .build()
1400            .next()
1401            .await;
1402
1403        Ok(())
1404    }
1405
1406    #[cfg(feature = "unstable-stream")]
1407    #[tokio_test_no_panics(start_paused = true)]
1408    async fn into_stream() -> anyhow::Result<()> {
1409        use futures::TryStreamExt;
1410        let (response_tx, response_rx) = channel(10);
1411        let (ack_tx, mut ack_rx) = unbounded_channel();
1412
1413        let mut mock = MockSubscriber::new();
1414        mock.expect_streaming_pull()
1415            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1416        mock.expect_acknowledge().returning(move |r| {
1417            ack_tx
1418                .send(r.into_inner())
1419                .expect("sending on channel always succeeds");
1420            Ok(TonicResponse::from(()))
1421        });
1422
1423        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1424        let client = test_client(endpoint).await?;
1425
1426        let stream = client
1427            .subscribe("projects/p/subscriptions/s")
1428            .build()
1429            .into_stream();
1430
1431        response_tx.send(Ok(test_response(1..3))).await?;
1432        drop(response_tx);
1433
1434        let got: Vec<_> = stream
1435            .map_ok(|(m, h)| {
1436                h.ack();
1437                m.data
1438            })
1439            .try_collect()
1440            .await?;
1441        assert_eq!(got, vec![test_data(1), test_data(2)]);
1442
1443        let ack_req = ack_rx
1444            .recv()
1445            .await
1446            .expect("should receive acknowledgements");
1447        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1448        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..3));
1449
1450        Ok(())
1451    }
1452
1453    #[tokio_test_no_panics(start_paused = true)]
1454    async fn basic_lease_expiration() -> anyhow::Result<()> {
1455        const MAX_LEASE_EXTENSION: Duration = Duration::from_secs(10);
1456        const MAX_LEASE: Duration = Duration::from_secs(30);
1457        // We configure a max lease for this test (30s) that differs from the
1458        // default (600s) to verify that an application's configuration
1459        // overrides the default.
1460
1461        let start_time = Instant::now();
1462        let (response_tx, response_rx) = channel(10);
1463        let (extend_tx, mut extend_rx) = unbounded_channel();
1464
1465        let mut mock = MockSubscriber::new();
1466        mock.expect_streaming_pull()
1467            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1468        mock.expect_modify_ack_deadline().returning(move |r| {
1469            extend_tx
1470                .send(r.into_inner())
1471                .expect("sending on channel always succeeds");
1472            Ok(TonicResponse::from(()))
1473        });
1474        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1475        let client = test_client(endpoint).await?;
1476        let mut stream = client
1477            .subscribe("projects/p/subscriptions/s")
1478            .set_max_lease(MAX_LEASE)
1479            .set_max_lease_extension(MAX_LEASE_EXTENSION)
1480            .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1481            .build();
1482
1483        response_tx.send(Ok(test_response(0..1))).await?;
1484        drop(response_tx);
1485
1486        let (_m, _h) = stream
1487            .next()
1488            .await
1489            .expect("stream should yield a message")?;
1490
1491        // Advance the clock well past the expected message expiration,
1492        // recording the time at which we sent the last lease extension.
1493        let mut latest = None;
1494        for _ in 0..MAX_LEASE.as_secs() * 2 {
1495            while let Ok(r) = extend_rx.try_recv() {
1496                assert_ne!(r.ack_deadline_seconds, 0, "unexpectedly received a nack");
1497                latest = Some(start_time.elapsed());
1498            }
1499            tokio::time::advance(Duration::from_secs(1)).await;
1500            tokio::task::yield_now().await;
1501        }
1502
1503        // Verify when we stop sending lease extensions.
1504        let expected_range = (MAX_LEASE - MAX_LEASE_EXTENSION)..=MAX_LEASE;
1505        assert!(
1506            latest.is_some_and(|t| expected_range.contains(&t)),
1507            "{latest:?}"
1508        );
1509
1510        // Close the stream, to make sure pending operations complete.
1511        stream.shutdown_token().shutdown().await;
1512
1513        Ok(())
1514    }
1515
1516    #[tokio_test_no_panics(start_paused = true)]
1517    async fn shutdown_wait_for_processing() -> anyhow::Result<()> {
1518        let (response_tx, response_rx) = channel(10);
1519
1520        let mut mock = MockSubscriber::new();
1521        mock.expect_streaming_pull()
1522            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1523        mock.expect_acknowledge()
1524            .times(1)
1525            .returning(|_| Ok(TonicResponse::from(())));
1526        mock.expect_modify_ack_deadline()
1527            .returning(|_| Ok(TonicResponse::from(())));
1528        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1529        let client = test_client(endpoint).await?;
1530        let mut stream = client
1531            .subscribe("projects/p/subscriptions/s")
1532            .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
1533            .build();
1534
1535        response_tx.send(Ok(test_response(0..1))).await?;
1536        drop(response_tx);
1537
1538        let (_m, h) = stream
1539            .next()
1540            .await
1541            .expect("stream should yield a message")?;
1542
1543        tokio::spawn(async move {
1544            // Delay the ack until after the shutdown is signaled. It should
1545            // still go through.
1546            tokio::time::sleep(Duration::from_secs(5)).await;
1547            h.ack();
1548        });
1549
1550        // Close the stream, to make sure pending operations complete.
1551        stream.shutdown_token().shutdown().await;
1552
1553        Ok(())
1554    }
1555
1556    #[tokio_test_no_panics(start_paused = true)]
1557    async fn at_least_once_and_exactly_once() -> anyhow::Result<()> {
1558        let (response_tx, response_rx) = channel(10);
1559
1560        let mut mock = MockSubscriber::new();
1561        mock.expect_streaming_pull()
1562            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1563        mock.expect_modify_ack_deadline()
1564            .returning(|_| Ok(TonicResponse::from(())));
1565        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1566        let client = test_client(endpoint).await?;
1567        let mut stream = client
1568            .subscribe("projects/p/subscriptions/s")
1569            .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1570            .build();
1571
1572        response_tx.send(Ok(test_response(0..1))).await?;
1573        response_tx
1574            .send(Ok(test_exactly_once_response(1..2)))
1575            .await?;
1576        response_tx.send(Ok(test_response(2..3))).await?;
1577        response_tx
1578            .send(Ok(test_exactly_once_response(3..4)))
1579            .await?;
1580        drop(response_tx);
1581
1582        let (m, h) = stream.next().await.expect("should yield a message")?;
1583        assert_eq!(m.data, test_data(0));
1584        assert_eq!(h.ack_id(), test_id(0));
1585        assert!(matches!(h, Handler::AtLeastOnce(_)), "{h:?}");
1586
1587        let (m, h) = stream.next().await.expect("should yield a message")?;
1588        assert_eq!(m.data, test_data(1));
1589        assert_eq!(h.ack_id(), test_id(1));
1590        assert!(matches!(h, Handler::ExactlyOnce(_)), "{h:?}");
1591
1592        let (m, h) = stream.next().await.expect("should yield a message")?;
1593        assert_eq!(m.data, test_data(2));
1594        assert_eq!(h.ack_id(), test_id(2));
1595        assert!(matches!(h, Handler::AtLeastOnce(_)), "{h:?}");
1596
1597        let (m, h) = stream.next().await.expect("should yield a message")?;
1598        assert_eq!(m.data, test_data(3));
1599        assert_eq!(h.ack_id(), test_id(3));
1600        assert!(matches!(h, Handler::ExactlyOnce(_)), "{h:?}");
1601
1602        let end = stream.next().await.transpose()?;
1603        assert!(end.is_none(), "Received extra message: {end:?}");
1604
1605        // Wait for the stream to join its background tasks.
1606        stream.shutdown_token().shutdown().await;
1607
1608        Ok(())
1609    }
1610
1611    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1612    async fn cancel_before_open() -> anyhow::Result<()> {
1613        let mut mock = MockSubscriber::new();
1614        mock.expect_streaming_pull()
1615            .returning(|_| Err(TonicStatus::unavailable("try again")));
1616        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1617        let client = test_client(endpoint).await?;
1618        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1619        let shutdown_token = stream.shutdown_token();
1620
1621        let next = tokio::spawn(async move { stream.next().await });
1622        shutdown_token.shutdown().await;
1623
1624        let end = next.await?;
1625        assert!(end.is_none(), "Shutdown should end the stream, got {end:?}");
1626
1627        Ok(())
1628    }
1629
1630    #[tokio_test_no_panics(start_paused = true)]
1631    async fn cancel_midstream() -> anyhow::Result<()> {
1632        let (response_tx, response_rx) = channel(10);
1633        let (ack_tx, mut ack_rx) = unbounded_channel();
1634        let (nack_tx, mut nack_rx) = unbounded_channel();
1635
1636        let mut mock = MockSubscriber::new();
1637        mock.expect_streaming_pull()
1638            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1639        mock.expect_acknowledge().times(1).returning(move |r| {
1640            ack_tx
1641                .send(r.into_inner())
1642                .expect("sending on channel always succeeds");
1643            Ok(TonicResponse::from(()))
1644        });
1645        mock.expect_modify_ack_deadline()
1646            .times(1)
1647            .returning(move |r| {
1648                nack_tx
1649                    .send(r.into_inner())
1650                    .expect("sending on channel always succeeds");
1651                Ok(TonicResponse::from(()))
1652            });
1653        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1654        let client = test_client(endpoint).await?;
1655        let mut stream = client
1656            .subscribe("projects/p/subscriptions/s")
1657            .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
1658            .build();
1659        let shutdown_token = stream.shutdown_token();
1660
1661        response_tx.send(Ok(test_response(1..10))).await?;
1662        for i in 1..6 {
1663            let Some((m, h)) = stream.next().await.transpose()? else {
1664                anyhow::bail!("expected message {i}/5")
1665            };
1666            assert_eq!(m.data, test_data(i));
1667            h.ack();
1668        }
1669        let shutdown = tokio::spawn(async move {
1670            shutdown_token.shutdown().await;
1671        });
1672        tokio::task::yield_now().await;
1673        let end = stream.next().await.transpose()?;
1674        assert!(end.is_none(), "Shutdown should end the stream, got {end:?}");
1675
1676        // Verify that we drop the messages and handles in the pool that we have
1677        // not returned to the application yet.
1678        shutdown.await?;
1679
1680        let ack_req = ack_rx.try_recv()?;
1681        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1682        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1683
1684        let nack_req = nack_rx.try_recv()?;
1685        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1686        assert_eq!(nack_req.ack_deadline_seconds, 0);
1687        assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1688
1689        Ok(())
1690    }
1691
1692    #[test_case(ShutdownBehavior::NackImmediately)]
1693    #[test_case(ShutdownBehavior::WaitForProcessing)]
1694    #[tokio_test_no_panics(start_paused = true)]
1695    async fn shutdown_without_next(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
1696        let (response_tx, response_rx) = channel(10);
1697        let (ack_tx, mut ack_rx) = unbounded_channel();
1698        let (nack_tx, mut nack_rx) = unbounded_channel();
1699
1700        let mut mock = MockSubscriber::new();
1701        mock.expect_streaming_pull()
1702            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1703        mock.expect_acknowledge().times(1).returning(move |r| {
1704            ack_tx
1705                .send(r.into_inner())
1706                .expect("sending on channel always succeeds");
1707            Ok(TonicResponse::from(()))
1708        });
1709        mock.expect_modify_ack_deadline()
1710            .times(1)
1711            .returning(move |r| {
1712                nack_tx
1713                    .send(r.into_inner())
1714                    .expect("sending on channel always succeeds");
1715                Ok(TonicResponse::from(()))
1716            });
1717        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1718        let client = test_client(endpoint).await?;
1719        let mut stream = client
1720            .subscribe("projects/p/subscriptions/s")
1721            .set_shutdown_behavior(shutdown_behavior)
1722            .build();
1723        let shutdown_token = stream.shutdown_token();
1724
1725        response_tx.send(Ok(test_response(1..10))).await?;
1726        for i in 1..6 {
1727            let Some((m, h)) = stream.next().await.transpose()? else {
1728                anyhow::bail!("expected message {i}/5")
1729            };
1730            assert_eq!(m.data, test_data(i));
1731            h.ack();
1732        }
1733        // Note that the application does not have to call `stream.next()`, or
1734        // `drop(stream)` to begin the shutdown procedure after a cancel.
1735        shutdown_token.shutdown().await;
1736
1737        let ack_req = ack_rx.try_recv()?;
1738        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1739        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1740
1741        let nack_req = nack_rx.try_recv()?;
1742        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1743        assert_eq!(nack_req.ack_deadline_seconds, 0);
1744        assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1745
1746        Ok(())
1747    }
1748
1749    #[test_case(ShutdownBehavior::NackImmediately)]
1750    #[test_case(ShutdownBehavior::WaitForProcessing)]
1751    #[tokio_test_no_panics(start_paused = true)]
1752    async fn stream_error_initiates_shutdown(
1753        shutdown_behavior: ShutdownBehavior,
1754    ) -> anyhow::Result<()> {
1755        let (response_tx, response_rx) = channel(10);
1756        let (ack_tx, mut ack_rx) = unbounded_channel();
1757
1758        let mut mock = MockSubscriber::new();
1759        mock.expect_streaming_pull()
1760            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1761        mock.expect_acknowledge().times(1).returning(move |r| {
1762            ack_tx
1763                .send(r.into_inner())
1764                .expect("sending on channel always succeeds");
1765            Ok(TonicResponse::from(()))
1766        });
1767        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1768        let client = test_client(endpoint).await?;
1769        let mut stream = client
1770            .subscribe("projects/p/subscriptions/s")
1771            .set_shutdown_behavior(shutdown_behavior)
1772            .build();
1773        let shutdown_token = stream.shutdown_token();
1774
1775        response_tx.send(Ok(test_response(0..1))).await?;
1776        response_tx
1777            .send(Err(TonicStatus::failed_precondition("fail")))
1778            .await?;
1779        drop(response_tx);
1780
1781        let (m, h) = stream.next().await.expect("should yield a message")?;
1782        assert_eq!(m.data, test_data(0));
1783        h.ack();
1784
1785        let err = stream.next().await.expect("should yield an error");
1786        assert!(err.is_err(), "{err:?}");
1787
1788        // Note that the application does not have to initiate a shutdown after
1789        // a permanent error.
1790        shutdown_token.wait_for_shutdown().await;
1791
1792        let ack_req = ack_rx.try_recv()?;
1793        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1794        assert_eq!(ack_req.ack_ids, test_ids(0..1));
1795
1796        Ok(())
1797    }
1798
1799    #[test_case(ShutdownBehavior::NackImmediately)]
1800    #[test_case(ShutdownBehavior::WaitForProcessing)]
1801    #[tokio_test_no_panics(start_paused = true)]
1802    async fn drop_cancels(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
1803        let (response_tx, response_rx) = channel(10);
1804        let (ack_tx, mut ack_rx) = unbounded_channel();
1805        let (nack_tx, mut nack_rx) = unbounded_channel();
1806
1807        let mut mock = MockSubscriber::new();
1808        mock.expect_streaming_pull()
1809            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1810        mock.expect_acknowledge().times(1).returning(move |r| {
1811            ack_tx
1812                .send(r.into_inner())
1813                .expect("sending on channel always succeeds");
1814            Ok(TonicResponse::from(()))
1815        });
1816        mock.expect_modify_ack_deadline()
1817            .times(1)
1818            .returning(move |r| {
1819                nack_tx
1820                    .send(r.into_inner())
1821                    .expect("sending on channel always succeeds");
1822                Ok(TonicResponse::from(()))
1823            });
1824        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1825        let client = test_client(endpoint).await?;
1826        let mut stream = client
1827            .subscribe("projects/p/subscriptions/s")
1828            .set_shutdown_behavior(shutdown_behavior)
1829            .build();
1830        let shutdown_token = stream.shutdown_token();
1831
1832        response_tx.send(Ok(test_response(1..10))).await?;
1833        for i in 1..6 {
1834            let Some((m, h)) = stream.next().await.transpose()? else {
1835                anyhow::bail!("expected message {i}/5")
1836            };
1837            assert_eq!(m.data, test_data(i));
1838            h.ack();
1839        }
1840        drop(stream); // Equivalent to cancelling the `ShutdownToken`
1841        shutdown_token.wait_for_shutdown().await;
1842
1843        let ack_req = ack_rx.try_recv()?;
1844        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1845        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1846
1847        let nack_req = nack_rx.try_recv()?;
1848        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1849        assert_eq!(nack_req.ack_deadline_seconds, 0);
1850        assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1851
1852        Ok(())
1853    }
1854}