Skip to main content

google_cloud_pubsub/subscriber/
message_stream.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::builder::Subscribe;
16use super::handler::{AckResult, Action, AtLeastOnce, ExactlyOnce, Handler};
17use super::lease_loop::LeaseLoop;
18use super::lease_state::{AtLeastOnceInfo, ExactlyOnceInfo, LeaseInfo, LeaseOptions, NewMessage};
19use super::leaser::DefaultLeaser;
20use super::retry_policy::StreamRetryPolicy;
21use super::shutdown_token::ShutdownToken;
22use super::stream::Stream;
23use super::stub::TonicStreaming as _;
24use super::transport::Transport;
25use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
26use crate::model::Message;
27use crate::{Error, Result};
28use futures::FutureExt;
29use futures::future::{BoxFuture, Shared};
30use gaxi::grpc::from_status::to_gax_error;
31use gaxi::prost::FromProto as _;
32use google_cloud_gax::retry_result::RetryResult;
33use std::collections::VecDeque;
34use std::sync::Arc;
35use tokio::sync::mpsc::{UnboundedSender, WeakUnboundedSender, unbounded_channel};
36use tokio::sync::oneshot::Receiver;
37use tokio::time::Duration;
38use tokio_util::sync::{CancellationToken, DropGuard};
39
40/// Represents an open subscribe stream.
41///
42/// This is a stream-like struct for serving messages to an application.
43///
44/// # Example
45/// ```
46/// # use google_cloud_pubsub::client::Subscriber;
47/// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
48/// let mut stream = client
49///     .subscribe("projects/my-project/subscriptions/my-subscription")
50///     .build();
51/// while let Some((m, h)) = stream.next().await.transpose()? {
52///     println!("Received message m={m:?}");
53///     h.ack();
54/// }
55/// # Ok(()) }
56/// ```
57#[derive(Debug)]
58pub struct MessageStream {
59    /// Implementation of the `MessageStream`.
60    ///
61    /// To avoid atomic increments in the critical path, we separate the
62    /// shutdown token from the rest of the struct. This way we can hold a
63    /// mutable reference to `self.inner`, and a reference to `self.shutdown` at
64    /// the same time.
65    inner: MessageStreamImpl,
66
67    /// This future is ready when the lease loop shutdown completes.
68    lease_loop: Shared<BoxFuture<'static, ()>>,
69
70    /// A token that can detect a shutdown from the application.
71    shutdown: CancellationToken,
72
73    /// Signal a shutdown if the application drops this struct.
74    ///
75    /// This field is intentionally unused; it exists solely to trigger a
76    /// shutdown signal via its `Drop` implementation.
77    _shutdown_guard: DropGuard,
78}
79
80#[derive(Debug)]
81pub struct MessageStreamImpl {
82    /// The stub implementing this struct.
83    stub: Arc<Transport>,
84
85    /// The initial request used to start a stream.
86    initial_req: StreamingPullRequest,
87
88    /// The bidirectional stream.
89    ///
90    /// We choose to lazy-initialize the stream when the application asks for a
91    /// message because tonic will not yield the stream to us until the first
92    /// response is available.[^1]
93    ///
94    /// The usability of the `MessageStream` API would suffer if creating an instance
95    /// of `MessageStream` is blocked on the first message being available.
96    ///
97    /// [^1]: <https://github.com/hyperium/tonic/issues/515>
98    stream: Option<StreamState>,
99
100    /// Applications ask for messages one at a time. Individual stream responses
101    /// can contain multiple messages. We use `pool` to hold the extra messages
102    /// while we wait to serve them to applications.
103    ///
104    /// A FIFO queue is necessary to preserve ordering.
105    pool: VecDeque<(Message, HandlerInfo)>,
106
107    /// A sender for sending new messages from the stream into the lease
108    /// management task.
109    message_tx: WeakUnboundedSender<NewMessage>,
110
111    /// A sender for forwarding acks/nacks from the application to the lease
112    /// management task. Each `Handler` holds a clone of this.
113    ack_tx: WeakUnboundedSender<Action>,
114
115    /// A token that can initiate shutdown after a stream error.
116    shutdown: CancellationToken,
117}
118
119// We would rather always allocate enough space to hold the stream on the stack
120// than add a layer of indirection by `Box`ing it.
121#[allow(clippy::large_enum_variant)]
122#[derive(Debug)]
123enum StreamState {
124    /// The stream was cancelled or failed with a permanent error. It should not
125    /// be re-opened.
126    Closed,
127    /// The stream is active.
128    Active(Stream<Transport>),
129}
130
131impl MessageStream {
132    pub(super) fn new(builder: Subscribe) -> Self {
133        let stub = builder.inner;
134        let subscription = builder.subscription;
135
136        let (confirmed_tx, confirmed_rx) = unbounded_channel();
137        let (eo_extend_tx, eo_extend_rx) = unbounded_channel();
138        let leaser = DefaultLeaser::new(
139            stub.clone(),
140            confirmed_tx,
141            eo_extend_tx,
142            subscription.clone(),
143            builder.ack_deadline_seconds,
144            builder.grpc_subchannel_count,
145        );
146        let options = LeaseOptions {
147            max_lease: builder.max_lease,
148            max_lease_extension: Duration::from_secs(builder.ack_deadline_seconds as u64),
149            shutdown_behavior: builder.shutdown_behavior,
150            ..Default::default()
151        };
152        let LeaseLoop {
153            handle,
154            message_tx,
155            ack_tx,
156            cancel: shutdown,
157        } = LeaseLoop::new(leaser, confirmed_rx, eo_extend_rx, options);
158        let lease_loop = handle.map(|_| ()).boxed().shared();
159        let _shutdown_guard = shutdown.clone().drop_guard();
160
161        let initial_req = StreamingPullRequest {
162            subscription,
163            stream_ack_deadline_seconds: builder.ack_deadline_seconds,
164            max_outstanding_messages: builder.max_outstanding_messages,
165            max_outstanding_bytes: builder.max_outstanding_bytes,
166            client_id: builder.client_id,
167            // `protocol_version == 1` means we support receiving heartbeats
168            // (empty `StreamingPullResponse`s) from the server.
169            protocol_version: 1,
170            ..Default::default()
171        };
172
173        let inner = MessageStreamImpl {
174            stub,
175            initial_req,
176            stream: None,
177            pool: VecDeque::new(),
178            message_tx,
179            ack_tx,
180            shutdown: shutdown.clone(),
181        };
182        Self {
183            inner,
184            lease_loop,
185            shutdown,
186            _shutdown_guard,
187        }
188    }
189
190    /// Returns the next message received on this subscription.
191    ///
192    /// # Example
193    /// ```
194    /// # use google_cloud_pubsub::subscriber::MessageStream;
195    /// # async fn sample(mut stream: MessageStream) -> anyhow::Result<()> {
196    /// while let Some((m, h)) = stream.next().await.transpose()? {
197    ///     println!("Received message m={m:?}");
198    ///     h.ack();
199    /// }
200    /// # Ok(()) }
201    /// ```
202    ///
203    /// Returns the message data along with a [Handler] to acknowledge (ack) the
204    /// message.
205    ///
206    /// If the underlying stream encounters a permanent error, an `Error` is
207    /// returned instead.
208    ///
209    /// `None` represents the end of a stream, but in practice, the stream stays
210    /// open until it is cancelled or encounters a permanent error.
211    pub async fn next(&mut self) -> Option<Result<(Message, Handler)>> {
212        let next = tokio::select! {
213            biased;
214            _ = self.shutdown.cancelled() => {
215                self.inner.close();
216                None
217            },
218            n = self.inner.next() => n,
219        };
220        next
221    }
222
223    #[cfg(feature = "unstable-stream")]
224    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
225    /// Converts the `MessageStream` to a [`futures::Stream`].
226    ///
227    /// # Example
228    /// ```
229    /// # use google_cloud_pubsub::subscriber::MessageStream;
230    /// # async fn sample(stream: MessageStream) -> anyhow::Result<()> {
231    /// use futures::TryStreamExt;
232    /// let mut stream = stream.into_stream();
233    /// while let Some((m, h)) = stream.try_next().await? { /* ... */ }
234    /// # Ok(()) }
235    /// ```
236    pub fn into_stream(self) -> impl futures::Stream<Item = Result<(Message, Handler)>> + Unpin {
237        use futures::stream::unfold;
238        Box::pin(unfold(self, |mut stream| async move {
239            stream.next().await.map(|item| (item, stream))
240        }))
241    }
242
243    /// Returns a shutdown token for the stream.
244    ///
245    /// # Example
246    /// ```
247    /// # use google_cloud_pubsub::subscriber::MessageStream;
248    /// # async fn sample(mut stream: MessageStream) {
249    /// // Get a shutdown token for the stream.
250    /// let shutdown_token = stream.shutdown_token();
251    ///
252    /// // Signal and await a shutdown of the stream.
253    /// shutdown_token.shutdown().await;
254    ///
255    /// // The stream stops yielding messages after shutdown is signaled.
256    /// assert!(stream.next().await.is_none());
257    /// # }
258    /// ```
259    ///
260    /// Use this token to signal and/or await shutdown of the stream.
261    ///
262    /// Awaiting a stream shutdown gives the subscriber time to flush its
263    /// pending acknowledgements, and schedule other messages for redelivery to
264    /// another client as soon as possible.
265    pub fn shutdown_token(&self) -> ShutdownToken {
266        ShutdownToken {
267            inner: self.shutdown.clone(),
268            // This future is ready when the lease loop shutdown completes.
269            fut: self.lease_loop.clone(),
270        }
271    }
272}
273
274impl MessageStreamImpl {
275    async fn next(&mut self) -> Option<Result<(Message, Handler)>> {
276        loop {
277            // Serve a message if we have one ready.
278            if let Some((m, hi)) = self.pool.pop_front() {
279                return Some(Ok((m, hi.into_handler(self.ack_tx.upgrade()?))));
280            }
281
282            // Otherwise, read the next response from the stream, which will
283            // likely populate the message pool.
284            //
285            // Note that a successful read does not necessarily mean there is a
286            // message in the pool. The server occasionally sends heartbeats
287            // (responses with an empty message list). Hence the loop.
288            if let Err(e) = self.populate_pool().await? {
289                // Handle errors opening or reading from the stream.
290                match StreamRetryPolicy::on_midstream_error(e) {
291                    RetryResult::Continue(_) => {
292                        // The stream failed with a transient error. Reset the stream.
293                        self.stream = None;
294                        continue;
295                    }
296                    RetryResult::Permanent(e) | RetryResult::Exhausted(e) => {
297                        // The stream failed with a permanent error. Return the error.
298                        self.close();
299                        return Some(Err(e));
300                    }
301                }
302            }
303        }
304    }
305
306    /// Make a new attempt to open the underlying gRPC stream.
307    async fn open_stream(&mut self) -> Result<()> {
308        let stream = Stream::<Transport>::new(self.stub.clone(), self.initial_req.clone()).await?;
309        self.stream = Some(StreamState::Active(stream));
310        Ok(())
311    }
312
313    /// Reads the next response from the stream.
314    ///
315    /// If necessary, this method will open a new stream.
316    ///
317    /// If we receive an error either opening or reading from the stream, we
318    /// return it.
319    async fn next_response(&mut self) -> Option<Result<StreamingPullResponse>> {
320        if self.stream.is_none() {
321            // Open the stream, if necessary.
322            if let Err(e) = self.open_stream().await {
323                return Some(Err(e));
324            }
325        }
326
327        let stream = match self.stream.as_mut()? {
328            StreamState::Closed => return None,
329            StreamState::Active(s) => s,
330        };
331        stream
332            .next_message()
333            .await
334            .map_err(to_gax_error)
335            .transpose()
336    }
337
338    /// Populate the message pool by reading from the stream.
339    ///
340    /// Read the next response from the stream. If necessary, this method will
341    /// open a new stream.
342    ///
343    /// If we receive a response, we store the messages in `self.pool` and
344    /// forward the ack IDs to the lease management task.
345    ///
346    /// If we receive an error reading from the stream, we return it.
347    async fn populate_pool(&mut self) -> Option<Result<()>> {
348        // Read the next response from the stream.
349        let resp = match self.next_response().await? {
350            Ok(resp) => resp,
351            Err(e) => return Some(Err(e)),
352        };
353
354        let exactly_once = resp
355            .subscription_properties
356            .is_some_and(|m| m.exactly_once_delivery_enabled);
357
358        // Process the received messages in the response.
359        for rm in resp.received_messages {
360            let Some(message) = rm.message else {
361                // The message field should always be present. If not, the proto
362                // message was corrupted while in transit, or there is a bug in
363                // the service.
364                //
365                // The client can just ignore an ack ID without an associated
366                // message.
367                continue;
368            };
369
370            let delivery_attempt = (rm.delivery_attempt > 0).then_some(rm.delivery_attempt);
371
372            let (lease_info, handler_info) = if exactly_once {
373                let (result_tx, result_rx) = tokio::sync::oneshot::channel();
374                (
375                    LeaseInfo::ExactlyOnce(ExactlyOnceInfo::new(result_tx)),
376                    HandlerInfo::ExactlyOnce {
377                        ack_id: rm.ack_id.clone(),
378                        result_rx,
379                        delivery_attempt,
380                    },
381                )
382            } else {
383                (
384                    LeaseInfo::AtLeastOnce(AtLeastOnceInfo::new()),
385                    HandlerInfo::AtLeastOnce {
386                        ack_id: rm.ack_id.clone(),
387                        delivery_attempt,
388                    },
389                )
390            };
391
392            let _ = self.message_tx.upgrade()?.send(NewMessage {
393                ack_id: rm.ack_id,
394                lease_info,
395            });
396            let message = match message.cnv().map_err(Error::deser) {
397                Ok(message) => message,
398                Err(e) => return Some(Err(e)),
399            };
400            self.pool.push_back((message, handler_info));
401        }
402        Some(Ok(()))
403    }
404
405    // Permanently close the stream.
406    fn close(&mut self) {
407        self.stream = Some(StreamState::Closed);
408        self.pool.clear();
409        self.shutdown.cancel();
410    }
411}
412
413/// A `Handler` without its action `Sender`.
414///
415/// We only want to create strong `Sender`s for `Handler`s that we yield to the
416/// application.
417///
418/// Note that the application should be able to signal a shutdown without
419/// dropping the `MessageStream` or calling `MessageStream::next()`.
420///
421/// In these cases, the items in the `MessageStream::pool` are not cleared. So,
422/// if we hold a strong `Sender` in the `pool`, we would never initiate a
423/// shutdown when configured to `WaitForProcessing`.
424#[derive(Debug)]
425enum HandlerInfo {
426    AtLeastOnce {
427        ack_id: String,
428        delivery_attempt: Option<i32>,
429    },
430    ExactlyOnce {
431        ack_id: String,
432        result_rx: Receiver<AckResult>,
433        delivery_attempt: Option<i32>,
434    },
435}
436
437impl HandlerInfo {
438    /// Convert this type to a `Handler`, by adding its action `Sender` before
439    /// serving it to the application.
440    fn into_handler(self, ack_tx: UnboundedSender<Action>) -> Handler {
441        match self {
442            HandlerInfo::AtLeastOnce {
443                ack_id,
444                delivery_attempt,
445            } => Handler::AtLeastOnce(AtLeastOnce::new(ack_id, ack_tx, delivery_attempt)),
446            HandlerInfo::ExactlyOnce {
447                ack_id,
448                result_rx,
449                delivery_attempt,
450            } => Handler::ExactlyOnce(ExactlyOnce::new(
451                ack_id,
452                ack_tx,
453                result_rx,
454                delivery_attempt,
455            )),
456        }
457    }
458}
459
460#[cfg(test)]
461mod tests {
462    use super::super::ShutdownBehavior;
463    use super::super::client::Subscriber;
464    use super::super::keepalive::KEEPALIVE_PERIOD;
465    use super::super::lease_state::tests::{test_id, test_ids};
466    use super::super::stream::{INITIAL_DELAY, MAXIMUM_DELAY};
467    use super::*;
468    use gaxi::grpc::tonic::{Response as TonicResponse, Status as TonicStatus};
469    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
470    use google_cloud_test_macros::tokio_test_no_panics;
471    use pubsub_grpc_mock::google::pubsub::v1;
472    use pubsub_grpc_mock::{MockSubscriber, start};
473    use test_case::test_case;
474    use tokio::sync::mpsc::{channel, unbounded_channel};
475    use tokio::task::{JoinHandle, JoinSet};
476    use tokio::time::{Duration, Instant};
477
478    fn sorted(mut v: Vec<String>) -> Vec<String> {
479        v.sort();
480        v
481    }
482
483    fn test_data(v: i32) -> bytes::Bytes {
484        bytes::Bytes::from(format!("data-{}", test_id(v)))
485    }
486
487    fn test_response(range: std::ops::Range<i32>) -> v1::StreamingPullResponse {
488        v1::StreamingPullResponse {
489            received_messages: range
490                .into_iter()
491                .map(|i| v1::ReceivedMessage {
492                    ack_id: test_id(i),
493                    message: Some(v1::PubsubMessage {
494                        data: test_data(i).to_vec(),
495                        ..Default::default()
496                    }),
497                    ..Default::default()
498                })
499                .collect(),
500            ..Default::default()
501        }
502    }
503
504    fn test_exactly_once_response(range: std::ops::Range<i32>) -> v1::StreamingPullResponse {
505        v1::StreamingPullResponse {
506            subscription_properties: Some(v1::streaming_pull_response::SubscriptionProperties {
507                exactly_once_delivery_enabled: true,
508                ..Default::default()
509            }),
510            received_messages: range
511                .into_iter()
512                .map(|i| v1::ReceivedMessage {
513                    ack_id: test_id(i),
514                    message: Some(v1::PubsubMessage {
515                        data: test_data(i).to_vec(),
516                        ..Default::default()
517                    }),
518                    ..Default::default()
519                })
520                .collect(),
521            ..Default::default()
522        }
523    }
524
525    async fn test_client(endpoint: String) -> anyhow::Result<Subscriber> {
526        Ok(Subscriber::builder()
527            .with_endpoint(endpoint)
528            .with_credentials(Anonymous::new().build())
529            .build()
530            .await?)
531    }
532
533    #[tokio_test_no_panics]
534    async fn error_starting_stream() -> anyhow::Result<()> {
535        let mut mock = MockSubscriber::new();
536        mock.expect_streaming_pull()
537            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
538        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
539        let client = test_client(endpoint).await?;
540        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
541        let err = stream
542            .next()
543            .await
544            .expect("stream should not be empty")
545            .expect_err("the first streamed item should be an error");
546        assert!(err.status().is_some(), "{err:?}");
547        let status = err.status().unwrap();
548        assert_eq!(
549            status.code,
550            google_cloud_gax::error::rpc::Code::FailedPrecondition
551        );
552        assert_eq!(status.message, "fail");
553
554        Ok(())
555    }
556
557    #[tokio_test_no_panics]
558    async fn permanent_error_ends_stream() -> anyhow::Result<()> {
559        let mut mock = MockSubscriber::new();
560        mock.expect_streaming_pull()
561            .returning(|_| Err(TonicStatus::failed_precondition("fail")));
562        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
563        let client = test_client(endpoint).await?;
564        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
565        let next = stream.next().await;
566        assert!(
567            matches!(next, Some(Err(_))),
568            "expected permanent error, got {next:?}"
569        );
570
571        let next = stream.next().await;
572        assert!(next.is_none(), "expected end of stream, got {next:?}");
573
574        Ok(())
575    }
576
577    #[tokio_test_no_panics]
578    async fn initial_request() -> anyhow::Result<()> {
579        const MIB: i64 = 1024 * 1024;
580
581        // We use this channel to surface writes (requests) from outside our
582        // mock expectation.
583        let (recover_writes_tx, mut recover_writes_rx) = channel(1);
584
585        let mut mock = MockSubscriber::new();
586        mock.expect_streaming_pull().return_once(move |request| {
587            tokio::spawn(async move {
588                // Note that this task stays alive as long as we hold
589                // `recover_writes_rx`.
590                let mut request_rx = request.into_inner();
591                while let Some(request) = request_rx.recv().await {
592                    recover_writes_tx
593                        .send(request)
594                        .await
595                        .expect("forwarding writes always succeeds");
596                }
597            });
598            Err(TonicStatus::failed_precondition("fail"))
599        });
600
601        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
602        let client = test_client(endpoint).await?;
603        let _ = client
604            .subscribe("projects/p/subscriptions/s")
605            .set_max_lease_extension(Duration::from_secs(20))
606            .set_max_outstanding_messages(2000)
607            .set_max_outstanding_bytes(200 * MIB)
608            .build()
609            .next()
610            .await;
611
612        let initial_req = recover_writes_rx
613            .recv()
614            .await
615            .expect("should receive a request")?;
616        assert_eq!(initial_req.subscription, "projects/p/subscriptions/s");
617        assert_eq!(initial_req.stream_ack_deadline_seconds, 20);
618        assert_eq!(initial_req.max_outstanding_messages, 2000);
619        assert_eq!(initial_req.max_outstanding_bytes, 200 * MIB);
620        assert!(
621            !initial_req.client_id.is_empty(),
622            "initial request has empty client id: {initial_req:?}"
623        );
624        assert!(
625            initial_req.protocol_version >= 1,
626            "protocol_version={}",
627            initial_req.protocol_version
628        );
629
630        Ok(())
631    }
632
633    #[tokio_test_no_panics(start_paused = true)]
634    async fn basic_success() -> anyhow::Result<()> {
635        let (response_tx, response_rx) = channel(10);
636        let (ack_tx, mut ack_rx) = unbounded_channel();
637
638        let mut mock = MockSubscriber::new();
639        mock.expect_streaming_pull()
640            .return_once(|_| Ok(TonicResponse::from(response_rx)));
641        mock.expect_acknowledge().returning(move |r| {
642            ack_tx
643                .send(r.into_inner())
644                .expect("sending on channel always succeeds");
645            Ok(TonicResponse::from(()))
646        });
647        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
648        let client = test_client(endpoint).await?;
649        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
650
651        response_tx.send(Ok(test_response(1..2))).await?;
652        response_tx.send(Ok(test_response(2..4))).await?;
653        response_tx.send(Ok(test_response(4..7))).await?;
654        drop(response_tx);
655
656        for i in 1..7 {
657            let Some((m, h)) = stream.next().await.transpose()? else {
658                anyhow::bail!("expected message {i}/6")
659            };
660            assert_eq!(m.data, test_data(i));
661            assert_eq!(h.ack_id(), test_id(i));
662            h.ack();
663        }
664        let end = stream.next().await.transpose()?;
665        assert!(end.is_none(), "Received extra message: {end:?}");
666
667        // Wait for the stream to join its background tasks.
668        stream.shutdown_token().shutdown().await;
669
670        // Verify the acks went through.
671        let ack_req = ack_rx.try_recv()?;
672        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
673        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..7));
674
675        Ok(())
676    }
677
678    #[test_case(0, None, false; "at_least_once_zero_maps_to_none")]
679    #[test_case(5, Some(5), false; "at_least_once_positive_maps_to_some")]
680    #[test_case(-1, None, false; "at_least_once_negative_maps_to_none")]
681    #[test_case(1, Some(1), false; "at_least_once_one_maps_to_some")]
682    #[test_case(i32::MAX, Some(i32::MAX), false; "at_least_once_max_maps_to_some")]
683    #[test_case(0, None, true; "exactly_once_zero_maps_to_none")]
684    #[test_case(5, Some(5), true; "exactly_once_positive_maps_to_some")]
685    #[test_case(-1, None, true; "exactly_once_negative_maps_to_none")]
686    #[test_case(1, Some(1), true; "exactly_once_one_maps_to_some")]
687    #[test_case(i32::MAX, Some(i32::MAX), true; "exactly_once_max_maps_to_some")]
688    #[tokio_test_no_panics(start_paused = true)]
689    async fn delivery_attempt_mapping(
690        input: i32,
691        expected: Option<i32>,
692        exactly_once: bool,
693    ) -> anyhow::Result<()> {
694        let (response_tx, response_rx) = channel(10);
695
696        let mut mock = MockSubscriber::new();
697        mock.expect_streaming_pull()
698            .return_once(|_| Ok(TonicResponse::from(response_rx)));
699
700        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
701        let client = test_client(endpoint).await?;
702        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
703
704        let resp = v1::StreamingPullResponse {
705            subscription_properties: Some(v1::streaming_pull_response::SubscriptionProperties {
706                exactly_once_delivery_enabled: exactly_once,
707                ..Default::default()
708            }),
709            received_messages: vec![v1::ReceivedMessage {
710                ack_id: test_id(0),
711                message: Some(v1::PubsubMessage {
712                    data: test_data(0).to_vec(),
713                    ..Default::default()
714                }),
715                delivery_attempt: input,
716            }],
717            ..Default::default()
718        };
719
720        response_tx.send(Ok(resp)).await?;
721        drop(response_tx);
722
723        let Some((_, h)) = stream.next().await.transpose()? else {
724            anyhow::bail!("expected message")
725        };
726        assert_eq!(h.delivery_attempt(), expected);
727
728        Ok(())
729    }
730
731    #[tokio_test_no_panics(start_paused = true)]
732    async fn basic_success_exactly_once() -> anyhow::Result<()> {
733        let (response_tx, response_rx) = channel(10);
734        let (ack_tx, mut ack_rx) = unbounded_channel();
735
736        let mut mock = MockSubscriber::new();
737        mock.expect_streaming_pull()
738            .return_once(|_| Ok(TonicResponse::from(response_rx)));
739        mock.expect_acknowledge().returning(move |r| {
740            ack_tx
741                .send(r.into_inner())
742                .expect("sending on channel always succeeds");
743            Ok(TonicResponse::from(()))
744        });
745        mock.expect_modify_ack_deadline()
746            .returning(|_| Ok(TonicResponse::from(())));
747        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
748        let client = test_client(endpoint).await?;
749        let mut stream = client
750            .subscribe("projects/p/subscriptions/s")
751            .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
752            .build();
753
754        response_tx
755            .send(Ok(test_exactly_once_response(1..2)))
756            .await?;
757        response_tx
758            .send(Ok(test_exactly_once_response(2..4)))
759            .await?;
760        response_tx
761            .send(Ok(test_exactly_once_response(4..7)))
762            .await?;
763        drop(response_tx);
764
765        let mut acks = JoinSet::new();
766        for i in 1..7 {
767            let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? else {
768                anyhow::bail!("expected message {i}/6")
769            };
770            assert_eq!(m.data, test_data(i));
771            assert_eq!(h.ack_id(), test_id(i));
772            acks.spawn(h.confirmed_ack());
773        }
774        let end = stream.next().await.transpose()?;
775        assert!(end.is_none(), "Received extra message: {end:?}");
776
777        // Wait for the stream to join its background tasks.
778        stream.shutdown_token().shutdown().await;
779
780        // Verify the acks went through.
781        while let Some(r) = acks.join_next().await {
782            r??;
783        }
784        let ack_req = ack_rx.try_recv()?;
785        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
786        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..7));
787
788        Ok(())
789    }
790
791    #[tokio_test_no_panics(start_paused = true)]
792    async fn basic_lease_management() -> anyhow::Result<()> {
793        let (response_tx, response_rx) = channel(10);
794        let (ack_tx, mut ack_rx) = unbounded_channel();
795        let (nack_tx, mut nack_rx) = unbounded_channel();
796        let (extend_tx, mut extend_rx) = unbounded_channel();
797
798        let mut mock = MockSubscriber::new();
799        mock.expect_streaming_pull()
800            .return_once(|_| Ok(TonicResponse::from(response_rx)));
801        mock.expect_acknowledge().returning(move |r| {
802            ack_tx
803                .send(r.into_inner())
804                .expect("sending on channel always succeeds");
805            Ok(TonicResponse::from(()))
806        });
807        mock.expect_modify_ack_deadline().returning(move |r| {
808            let r = r.into_inner();
809            if r.ack_deadline_seconds == 0 {
810                nack_tx.send(r).expect("sending on channel always succeeds");
811            } else {
812                extend_tx
813                    .send(r)
814                    .expect("sending on channel always succeeds");
815            }
816            Ok(TonicResponse::from(()))
817        });
818        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
819        let client = test_client(endpoint).await?;
820        let mut stream = client
821            .subscribe("projects/p/subscriptions/s")
822            .set_max_lease_extension(Duration::from_secs(10))
823            .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
824            .build();
825
826        response_tx.send(Ok(test_response(0..30))).await?;
827        drop(response_tx);
828
829        // Ack some messages
830        for i in 0..10 {
831            let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
832                anyhow::bail!("expected message {i}")
833            };
834            h.ack();
835        }
836        // Nack some messages
837        for i in 10..20 {
838            let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
839                anyhow::bail!("expected message {i}")
840            };
841            h.nack();
842        }
843        // Take a long time to process some messages
844        let mut hold = Vec::new();
845        for i in 20..30 {
846            let Some((_, Handler::AtLeastOnce(h))) = stream.next().await.transpose()? else {
847                anyhow::bail!("expected message {i}")
848            };
849            hold.push(h);
850        }
851
852        // Advance the clock 10s, which is the stream ack deadline. In this
853        // time, we should attempt at least one lease extension RPC.
854        tokio::time::advance(Duration::from_secs(10)).await;
855
856        // Close the stream, to make sure pending operations complete.
857        stream.shutdown_token().shutdown().await;
858
859        // Verify the acks went through.
860        let ack_req = ack_rx.try_recv()?;
861        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
862        assert_eq!(sorted(ack_req.ack_ids), test_ids(0..10));
863        assert!(ack_rx.is_empty(), "{ack_rx:?}");
864
865        // Verify the initial nacks went through.
866        let nack_req = nack_rx.try_recv()?;
867        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
868        assert_eq!(nack_req.ack_deadline_seconds, 0);
869        assert_eq!(sorted(nack_req.ack_ids), test_ids(10..20));
870
871        // Verify that we nack the leftover messages when the stream shuts down.
872        let nack_req = nack_rx.try_recv()?;
873        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
874        assert_eq!(nack_req.ack_deadline_seconds, 0);
875        assert_eq!(sorted(nack_req.ack_ids), test_ids(20..30));
876        assert!(nack_rx.is_empty(), "{nack_rx:?}");
877
878        // Verify at least one lease extension attempt was made.
879        let extend_req = extend_rx.try_recv()?;
880        assert_eq!(extend_req.subscription, "projects/p/subscriptions/s");
881        assert_eq!(extend_req.ack_deadline_seconds, 10);
882        assert_eq!(sorted(extend_req.ack_ids), test_ids(20..30));
883
884        Ok(())
885    }
886
887    #[tokio_test_no_panics(start_paused = true)]
888    async fn delayed_responses() -> anyhow::Result<()> {
889        // In this test, we verify the case where an application asks for a
890        // message, but a response is not immediately available on the stream.
891
892        let (response_tx, response_rx) = channel(10);
893        let handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
894            tokio::time::sleep(Duration::from_millis(20)).await;
895            response_tx.send(Ok(test_response(1..2))).await?;
896            Ok(())
897        });
898
899        let mut mock = MockSubscriber::new();
900        mock.expect_streaming_pull()
901            .return_once(|_| Ok(TonicResponse::from(response_rx)));
902        mock.expect_modify_ack_deadline()
903            .returning(|_| Ok(TonicResponse::from(())));
904        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
905        let client = test_client(endpoint).await?;
906        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
907        let (m, h) = stream
908            .next()
909            .await
910            .transpose()?
911            .expect("stream should wait for a message");
912        assert_eq!(m.data, test_data(1));
913        assert_eq!(h.ack_id(), test_id(1));
914
915        handle.await??;
916
917        Ok(())
918    }
919
920    #[tokio_test_no_panics]
921    async fn serves_messages_immediately() -> anyhow::Result<()> {
922        // This test verifies we do not do something crazy like draining the
923        // stream (which would never end) before serving messages to the
924        // application.
925
926        let (response_tx, response_rx) = channel(10);
927
928        let mut mock = MockSubscriber::new();
929        mock.expect_streaming_pull()
930            .return_once(|_| Ok(TonicResponse::from(response_rx)));
931        mock.expect_modify_ack_deadline()
932            .returning(|_| Ok(TonicResponse::from(())));
933        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
934        let client = test_client(endpoint).await?;
935        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
936
937        for i in 1..7 {
938            response_tx.send(Ok(test_response(i..i + 1))).await?;
939
940            let Some((m, h)) = stream.next().await.transpose()? else {
941                anyhow::bail!("expected message {i}/6")
942            };
943            assert_eq!(m.data, test_data(i));
944            assert_eq!(h.ack_id(), test_id(i));
945        }
946        drop(response_tx);
947        let end = stream.next().await.transpose()?;
948        assert!(end.is_none(), "Received extra message: {end:?}");
949
950        Ok(())
951    }
952
953    #[tokio_test_no_panics]
954    async fn handles_empty_response() -> anyhow::Result<()> {
955        let (response_tx, response_rx) = channel(10);
956
957        let mut mock = MockSubscriber::new();
958        mock.expect_streaming_pull()
959            .return_once(|_| Ok(TonicResponse::from(response_rx)));
960        mock.expect_modify_ack_deadline()
961            .returning(|_| Ok(TonicResponse::from(())));
962        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
963        let client = test_client(endpoint).await?;
964        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
965
966        response_tx.send(Ok(test_response(1..2))).await?;
967        // See if we can handle an empty range
968        response_tx.send(Ok(test_response(2..2))).await?;
969        response_tx.send(Ok(test_response(2..3))).await?;
970        drop(response_tx);
971
972        for i in 1..3 {
973            let Some((m, h)) = stream.next().await.transpose()? else {
974                anyhow::bail!("expected message {i}/2")
975            };
976            assert_eq!(m.data, test_data(i));
977            assert_eq!(h.ack_id(), test_id(i));
978        }
979        let end = stream.next().await.transpose()?;
980        assert!(end.is_none(), "Received extra message: {end:?}");
981
982        Ok(())
983    }
984
985    #[tokio_test_no_panics(start_paused = true)]
986    async fn handles_missing_message_field() -> anyhow::Result<()> {
987        let (response_tx, response_rx) = channel(10);
988        let (extend_tx, mut extend_rx) = unbounded_channel();
989
990        let bad = v1::StreamingPullResponse {
991            received_messages: vec![v1::ReceivedMessage {
992                ack_id: "ignored-ack-id".to_string(),
993                message: None,
994                ..Default::default()
995            }],
996            ..Default::default()
997        };
998
999        let mut mock = MockSubscriber::new();
1000        mock.expect_streaming_pull()
1001            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1002        mock.expect_modify_ack_deadline().returning(move |r| {
1003            let r = r.into_inner();
1004            if r.ack_deadline_seconds != 0 {
1005                extend_tx
1006                    .send(r)
1007                    .expect("sending on channel always succeeds");
1008            }
1009            Ok(TonicResponse::from(()))
1010        });
1011        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1012        let client = test_client(endpoint).await?;
1013        let mut stream = client
1014            .subscribe("projects/p/subscriptions/s")
1015            .set_max_lease_extension(Duration::from_secs(10))
1016            .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1017            .build();
1018
1019        response_tx.send(Ok(test_response(1..4))).await?;
1020        // See if we can handle an empty range
1021        response_tx.send(Ok(bad)).await?;
1022        response_tx.send(Ok(test_response(4..7))).await?;
1023        drop(response_tx);
1024
1025        let mut handlers = Vec::new();
1026        for i in 1..7 {
1027            let Some((m, h)) = stream.next().await.transpose()? else {
1028                anyhow::bail!("expected message {i}/6")
1029            };
1030            assert_eq!(m.data, test_data(i));
1031            assert_eq!(h.ack_id(), test_id(i));
1032            handlers.push(h);
1033        }
1034
1035        // Advance the clock 10s, which is the stream ack deadline. In this
1036        // time, we should attempt at least one lease extension RPC.
1037        tokio::time::advance(Duration::from_secs(10)).await;
1038
1039        // Close the stream, to make sure pending operations complete.
1040        stream.shutdown_token().shutdown().await;
1041
1042        // Verify at least one lease extension attempt was made.
1043        let extend_req = extend_rx.try_recv()?;
1044        assert_eq!(extend_req.subscription, "projects/p/subscriptions/s");
1045        assert_eq!(extend_req.ack_deadline_seconds, 10);
1046        // Note that we do not expect to see "ignored-ack-id".
1047        assert_eq!(sorted(extend_req.ack_ids), test_ids(1..7));
1048
1049        Ok(())
1050    }
1051
1052    #[tokio_test_no_panics]
1053    async fn permanent_error_midstream() -> anyhow::Result<()> {
1054        let (response_tx, response_rx) = channel(10);
1055
1056        let mut mock = MockSubscriber::new();
1057        mock.expect_streaming_pull()
1058            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1059        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1060        let client = test_client(endpoint).await?;
1061        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1062
1063        response_tx.send(Ok(test_response(1..4))).await?;
1064        response_tx
1065            .send(Err(TonicStatus::failed_precondition("fail")))
1066            .await?;
1067        drop(response_tx);
1068
1069        for i in 1..4 {
1070            let Some((m, h)) = stream.next().await.transpose()? else {
1071                anyhow::bail!("expected message {i}/3")
1072            };
1073            assert_eq!(m.data, test_data(i));
1074            assert_eq!(h.ack_id(), test_id(i));
1075        }
1076        let err = stream
1077            .next()
1078            .await
1079            .transpose()
1080            .expect_err("expected an error from stream");
1081        assert!(err.status().is_some(), "{err:?}");
1082        let status = err.status().unwrap();
1083        assert_eq!(
1084            status.code,
1085            google_cloud_gax::error::rpc::Code::FailedPrecondition
1086        );
1087        assert_eq!(status.message, "fail");
1088
1089        Ok(())
1090    }
1091
1092    #[tokio_test_no_panics(start_paused = true)]
1093    async fn keepalives() -> anyhow::Result<()> {
1094        // We use this channel to surface writes (requests) from outside our
1095        // mock expectation.
1096        let (recover_writes_tx, mut recover_writes_rx) = channel(1);
1097        let (response_tx, response_rx) = channel(10);
1098
1099        let mut mock = MockSubscriber::new();
1100        mock.expect_streaming_pull().return_once(move |request| {
1101            tokio::spawn(async move {
1102                // Note that this task stays alive as long as we hold
1103                // `recover_writes_rx`.
1104                let mut request_rx = request.into_inner();
1105                while let Some(request) = request_rx.recv().await {
1106                    recover_writes_tx
1107                        .send(request)
1108                        .await
1109                        .expect("forwarding writes always succeeds");
1110                }
1111            });
1112            Ok(TonicResponse::from(response_rx))
1113        });
1114        mock.expect_modify_ack_deadline()
1115            .returning(|_| Ok(TonicResponse::from(())));
1116
1117        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1118        let client = test_client(endpoint).await?;
1119        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1120        response_tx.send(Ok(test_response(1..4))).await?;
1121        let _ = stream.next().await;
1122
1123        let initial_req = recover_writes_rx
1124            .recv()
1125            .await
1126            .expect("should receive an initial request")?;
1127        assert_eq!(initial_req.subscription, "projects/p/subscriptions/s");
1128
1129        // Verify that we receive at least one keepalive request on the stream.
1130        tokio::time::advance(KEEPALIVE_PERIOD).await;
1131        let keepalive_req = recover_writes_rx
1132            .recv()
1133            .await
1134            .expect("should receive a keepalive request")?;
1135        assert_eq!(keepalive_req, v1::StreamingPullRequest::default());
1136
1137        // Drop the stream, which should signal a shutdown of the keepalive
1138        // task.
1139        drop(stream);
1140
1141        // Advance the time far enough to expect a keepalive ping, if the
1142        // keepalive task was still running.
1143        tokio::time::advance(4 * KEEPALIVE_PERIOD).await;
1144        assert!(recover_writes_rx.is_empty(), "{recover_writes_rx:?}");
1145
1146        Ok(())
1147    }
1148
1149    #[tokio_test_no_panics]
1150    async fn client_id() -> anyhow::Result<()> {
1151        // We use this channel to surface writes (requests) from outside our
1152        // mock expectation.
1153        let (recover_writes_tx, mut recover_writes_rx) = channel(10);
1154        let recover_writes_tx = std::sync::Arc::new(tokio::sync::Mutex::new(recover_writes_tx));
1155
1156        let mut mock = MockSubscriber::new();
1157        mock.expect_streaming_pull()
1158            .times(3)
1159            .returning(move |request| {
1160                let tx = recover_writes_tx.clone();
1161                tokio::spawn(async move {
1162                    // Note that this task stays alive as long as we hold
1163                    // `recover_writes_rx`.
1164                    let mut request_rx = request.into_inner();
1165                    while let Some(request) = request_rx.recv().await {
1166                        tx.lock()
1167                            .await
1168                            .send(request)
1169                            .await
1170                            .expect("forwarding writes always succeeds");
1171                    }
1172                });
1173                Err(TonicStatus::failed_precondition("fail"))
1174            });
1175
1176        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1177
1178        // Make two requests with the same client. The requests should have the
1179        // same client ID.
1180        let c1 = test_client(endpoint.clone()).await?;
1181        let _ = c1
1182            .subscribe("projects/p/subscriptions/s")
1183            .build()
1184            .next()
1185            .await;
1186        let req1 = recover_writes_rx
1187            .recv()
1188            .await
1189            .expect("should receive a request")?;
1190        let _ = c1
1191            .subscribe("projects/p/subscriptions/s")
1192            .build()
1193            .next()
1194            .await;
1195        let req2 = recover_writes_rx
1196            .recv()
1197            .await
1198            .expect("should receive a request")?;
1199        assert_eq!(req1.client_id, req2.client_id);
1200
1201        // Make a third request with a different client. This request should
1202        // have a different client ID.
1203        let c2 = test_client(endpoint).await?;
1204        let _ = c2
1205            .subscribe("projects/p/subscriptions/s")
1206            .build()
1207            .next()
1208            .await;
1209        let req3 = recover_writes_rx
1210            .recv()
1211            .await
1212            .expect("should receive a request")?;
1213        assert_ne!(req1.client_id, req3.client_id);
1214
1215        Ok(())
1216    }
1217
1218    #[tokio_test_no_panics(start_paused = true)]
1219    async fn no_immediate_message() -> anyhow::Result<()> {
1220        const TEST_TIMEOUT: Duration = Duration::from_secs(42);
1221
1222        let (_response_tx, response_rx) = channel(10);
1223
1224        let mut mock = MockSubscriber::new();
1225        mock.expect_streaming_pull()
1226            .return_once(move |_| Ok(TonicResponse::from(response_rx)));
1227
1228        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1229        let client = test_client(endpoint).await?;
1230        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1231
1232        let _ = tokio::time::timeout(TEST_TIMEOUT, stream.next())
1233            .await
1234            .expect_err("next() should never yield.");
1235
1236        Ok(())
1237    }
1238
1239    #[tokio_test_no_panics(start_paused = true)]
1240    async fn retry_transient_when_starting_stream() -> anyhow::Result<()> {
1241        // The policy should retry forever. Our default retry policies have an
1242        // attempt limit of 10. So we arbitrarily pick a number greater than 10
1243        // for this test.
1244        const NUM_RETRIES: u32 = 20;
1245
1246        let start_time = Instant::now();
1247        let mut seq = mockall::Sequence::new();
1248        let mut mock = MockSubscriber::new();
1249
1250        // Simulate N transient errors
1251        mock.expect_streaming_pull()
1252            .times(NUM_RETRIES as usize)
1253            .in_sequence(&mut seq)
1254            .returning(|_| Err(TonicStatus::unavailable("try again")));
1255        // Simulate a permanent error. Otherwise, we would retry forever.
1256        mock.expect_streaming_pull()
1257            .times(1)
1258            .in_sequence(&mut seq)
1259            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
1260        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1261        let client = test_client(endpoint).await?;
1262        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1263        let err = stream
1264            .next()
1265            .await
1266            .expect("stream should not be empty")
1267            .expect_err("the first streamed item should be an error");
1268        assert!(err.status().is_some(), "{err:?}");
1269        let status = err.status().unwrap();
1270        assert_eq!(
1271            status.code,
1272            google_cloud_gax::error::rpc::Code::FailedPrecondition
1273        );
1274        assert_eq!(status.message, "fail");
1275
1276        let elapsed = start_time.elapsed();
1277        assert!(
1278            elapsed <= MAXIMUM_DELAY * NUM_RETRIES,
1279            "elapsed={elapsed:?}"
1280        );
1281        assert!(
1282            elapsed >= INITIAL_DELAY * NUM_RETRIES,
1283            "elapsed={elapsed:?}"
1284        );
1285
1286        Ok(())
1287    }
1288
1289    #[tokio_test_no_panics(start_paused = true)]
1290    async fn resume_midstream_success() -> anyhow::Result<()> {
1291        let (response_tx_1, response_rx_1) = channel(10);
1292        let (response_tx_2, response_rx_2) = channel(10);
1293        let (response_tx_3, response_rx_3) = channel(10);
1294        let (ack_tx, mut ack_rx) = unbounded_channel();
1295
1296        let mut seq = mockall::Sequence::new();
1297        let mut mock = MockSubscriber::new();
1298        mock.expect_streaming_pull()
1299            .times(1)
1300            .in_sequence(&mut seq)
1301            .return_once(|_| Ok(TonicResponse::from(response_rx_1)));
1302        mock.expect_streaming_pull()
1303            .times(1)
1304            .in_sequence(&mut seq)
1305            .return_once(move |_| Ok(TonicResponse::from(response_rx_2)));
1306        mock.expect_streaming_pull()
1307            .times(1)
1308            .in_sequence(&mut seq)
1309            .return_once(|_| Ok(TonicResponse::from(response_rx_3)));
1310        mock.expect_acknowledge().times(1..).returning(move |r| {
1311            ack_tx
1312                .send(r.into_inner())
1313                .expect("sending on channel always succeeds");
1314            Ok(TonicResponse::from(()))
1315        });
1316        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1317        let client = test_client(endpoint).await?;
1318        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1319
1320        response_tx_1.send(Ok(test_response(0..10))).await?;
1321        response_tx_1.send(Ok(test_response(10..20))).await?;
1322        response_tx_1
1323            .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1324            .await?;
1325        drop(response_tx_1);
1326        response_tx_2.send(Ok(test_response(20..30))).await?;
1327        response_tx_2.send(Ok(test_response(30..40))).await?;
1328        response_tx_2
1329            .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1330            .await?;
1331        drop(response_tx_2);
1332        response_tx_3.send(Ok(test_response(40..50))).await?;
1333        drop(response_tx_3);
1334
1335        for i in 0..50 {
1336            let (m, h) = stream
1337                .next()
1338                .await
1339                .unwrap_or_else(|| panic!("expected message {}/50", i + 1))?;
1340            assert_eq!(m.data, test_data(i));
1341            h.ack();
1342        }
1343        let end = stream.next().await.transpose()?;
1344        assert!(end.is_none(), "Received extra message: {end:?}");
1345
1346        // Wait for the stream to join its background tasks.
1347        stream.shutdown_token().shutdown().await;
1348
1349        // Verify the acks went through.
1350        let mut got = Vec::new();
1351        while let Ok(ack_req) = ack_rx.try_recv() {
1352            assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1353            got.extend(ack_req.ack_ids);
1354        }
1355        assert_eq!(sorted(got), test_ids(0..50));
1356
1357        Ok(())
1358    }
1359
1360    #[tokio_test_no_panics(start_paused = true)]
1361    async fn resume_midstream_hits_permanent_error() -> anyhow::Result<()> {
1362        let (response_tx, response_rx) = channel(10);
1363        let (ack_tx, mut ack_rx) = unbounded_channel();
1364
1365        let mut seq = mockall::Sequence::new();
1366        let mut mock = MockSubscriber::new();
1367        // Start a successful stream, which will eventually disconnect.
1368        mock.expect_streaming_pull()
1369            .times(1)
1370            .in_sequence(&mut seq)
1371            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1372        // Simulate transient errors attempting to resume the stream.
1373        mock.expect_streaming_pull()
1374            .times(3)
1375            .in_sequence(&mut seq)
1376            .returning(|_| Err(TonicStatus::unavailable("try again")));
1377        // Simulate a permanent error attempting to resume the stream.
1378        mock.expect_streaming_pull()
1379            .times(1)
1380            .in_sequence(&mut seq)
1381            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
1382        mock.expect_acknowledge().times(1..).returning(move |r| {
1383            ack_tx
1384                .send(r.into_inner())
1385                .expect("sending on channel always succeeds");
1386            Ok(TonicResponse::from(()))
1387        });
1388        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1389        let client = test_client(endpoint).await?;
1390        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1391
1392        response_tx.send(Ok(test_response(0..10))).await?;
1393        response_tx.send(Ok(test_response(10..20))).await?;
1394        response_tx
1395            .send(Err(TonicStatus::unavailable("GFE disconnect. try again")))
1396            .await?;
1397        drop(response_tx);
1398
1399        for i in 0..20 {
1400            let (m, h) = stream
1401                .next()
1402                .await
1403                .unwrap_or_else(|| panic!("expected message {}/20", i + 1))?;
1404            assert_eq!(m.data, test_data(i));
1405            h.ack();
1406        }
1407        let err = stream
1408            .next()
1409            .await
1410            .transpose()
1411            .expect_err("expected an error from stream");
1412        assert!(err.status().is_some(), "{err:?}");
1413        let status = err.status().unwrap();
1414        assert_eq!(
1415            status.code,
1416            google_cloud_gax::error::rpc::Code::FailedPrecondition
1417        );
1418        assert_eq!(status.message, "fail");
1419
1420        // Wait for the stream to join its background tasks.
1421        stream.shutdown_token().shutdown().await;
1422
1423        // Verify the acks went through.
1424        let mut got = Vec::new();
1425        while let Ok(ack_req) = ack_rx.try_recv() {
1426            assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1427            got.extend(ack_req.ack_ids);
1428        }
1429        assert_eq!(sorted(got), test_ids(0..20));
1430
1431        Ok(())
1432    }
1433
1434    #[tokio_test_no_panics]
1435    async fn routing_header() -> anyhow::Result<()> {
1436        let mut mock = MockSubscriber::new();
1437
1438        mock.expect_streaming_pull().return_once(move |request| {
1439            let metadata = request.metadata();
1440            assert_eq!(
1441                metadata
1442                    .get("x-goog-request-params")
1443                    .expect("routing header missing"),
1444                "subscription=projects/p/subscriptions/s"
1445            );
1446            Err(TonicStatus::failed_precondition("ignored"))
1447        });
1448
1449        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1450        let client = test_client(endpoint).await?;
1451
1452        let _ = client
1453            .subscribe("projects/p/subscriptions/s")
1454            .build()
1455            .next()
1456            .await;
1457
1458        Ok(())
1459    }
1460
1461    #[cfg(feature = "unstable-stream")]
1462    #[tokio_test_no_panics(start_paused = true)]
1463    async fn into_stream() -> anyhow::Result<()> {
1464        use futures::TryStreamExt;
1465        let (response_tx, response_rx) = channel(10);
1466        let (ack_tx, mut ack_rx) = unbounded_channel();
1467
1468        let mut mock = MockSubscriber::new();
1469        mock.expect_streaming_pull()
1470            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1471        mock.expect_acknowledge().returning(move |r| {
1472            ack_tx
1473                .send(r.into_inner())
1474                .expect("sending on channel always succeeds");
1475            Ok(TonicResponse::from(()))
1476        });
1477
1478        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1479        let client = test_client(endpoint).await?;
1480
1481        let stream = client
1482            .subscribe("projects/p/subscriptions/s")
1483            .build()
1484            .into_stream();
1485
1486        response_tx.send(Ok(test_response(1..3))).await?;
1487        drop(response_tx);
1488
1489        let got: Vec<_> = stream
1490            .map_ok(|(m, h)| {
1491                h.ack();
1492                m.data
1493            })
1494            .try_collect()
1495            .await?;
1496        assert_eq!(got, vec![test_data(1), test_data(2)]);
1497
1498        let ack_req = ack_rx
1499            .recv()
1500            .await
1501            .expect("should receive acknowledgements");
1502        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1503        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..3));
1504
1505        Ok(())
1506    }
1507
1508    #[tokio_test_no_panics(start_paused = true)]
1509    async fn basic_lease_expiration() -> anyhow::Result<()> {
1510        const MAX_LEASE_EXTENSION: Duration = Duration::from_secs(10);
1511        const MAX_LEASE: Duration = Duration::from_secs(30);
1512        // We configure a max lease for this test (30s) that differs from the
1513        // default (600s) to verify that an application's configuration
1514        // overrides the default.
1515
1516        let (response_tx, response_rx) = channel(10);
1517        let (extend_tx, mut extend_rx) = unbounded_channel();
1518
1519        let mut mock = MockSubscriber::new();
1520        mock.expect_streaming_pull()
1521            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1522        mock.expect_modify_ack_deadline().returning(move |r| {
1523            extend_tx
1524                .send(r.into_inner())
1525                .expect("sending on channel always succeeds");
1526            Ok(TonicResponse::from(()))
1527        });
1528        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1529        let client = test_client(endpoint).await?;
1530        let mut stream = client
1531            .subscribe("projects/p/subscriptions/s")
1532            .set_max_lease(MAX_LEASE)
1533            .set_max_lease_extension(MAX_LEASE_EXTENSION)
1534            .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1535            .build();
1536
1537        response_tx.send(Ok(test_response(0..1))).await?;
1538        drop(response_tx);
1539
1540        let (_m, _h) = stream
1541            .next()
1542            .await
1543            .expect("stream should yield a message")?;
1544
1545        // Advance the clock well past the expected message expiration,
1546        // recording the time at which we sent the last lease extension.
1547        let start_time = Instant::now();
1548        let mut latest = None;
1549        for _ in 0..MAX_LEASE.as_secs() * 2 {
1550            while let Ok(r) = extend_rx.try_recv() {
1551                assert_ne!(r.ack_deadline_seconds, 0, "unexpectedly received a nack");
1552                latest = Some(start_time.elapsed());
1553            }
1554            tokio::time::advance(Duration::from_secs(1)).await;
1555            tokio::task::yield_now().await;
1556        }
1557
1558        // Verify when we stop sending lease extensions.
1559        let expected_range = (MAX_LEASE - MAX_LEASE_EXTENSION)..=MAX_LEASE;
1560        assert!(
1561            latest.is_some_and(|t| expected_range.contains(&t)),
1562            "{latest:?}"
1563        );
1564
1565        // Close the stream, to make sure pending operations complete.
1566        stream.shutdown_token().shutdown().await;
1567
1568        Ok(())
1569    }
1570
1571    #[tokio_test_no_panics(start_paused = true)]
1572    async fn shutdown_wait_for_processing() -> anyhow::Result<()> {
1573        let (response_tx, response_rx) = channel(10);
1574
1575        let mut mock = MockSubscriber::new();
1576        mock.expect_streaming_pull()
1577            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1578        mock.expect_acknowledge()
1579            .times(1)
1580            .returning(|_| Ok(TonicResponse::from(())));
1581        mock.expect_modify_ack_deadline()
1582            .returning(|_| Ok(TonicResponse::from(())));
1583        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1584        let client = test_client(endpoint).await?;
1585        let mut stream = client
1586            .subscribe("projects/p/subscriptions/s")
1587            .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
1588            .build();
1589
1590        response_tx.send(Ok(test_response(0..1))).await?;
1591        drop(response_tx);
1592
1593        let (_m, h) = stream
1594            .next()
1595            .await
1596            .expect("stream should yield a message")?;
1597
1598        tokio::spawn(async move {
1599            // Delay the ack until after the shutdown is signaled. It should
1600            // still go through.
1601            tokio::time::sleep(Duration::from_secs(5)).await;
1602            h.ack();
1603        });
1604
1605        // Close the stream, to make sure pending operations complete.
1606        stream.shutdown_token().shutdown().await;
1607
1608        Ok(())
1609    }
1610
1611    #[tokio_test_no_panics(start_paused = true)]
1612    async fn at_least_once_and_exactly_once() -> anyhow::Result<()> {
1613        let (response_tx, response_rx) = channel(10);
1614
1615        let mut mock = MockSubscriber::new();
1616        mock.expect_streaming_pull()
1617            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1618        mock.expect_modify_ack_deadline()
1619            .returning(|_| Ok(TonicResponse::from(())));
1620        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1621        let client = test_client(endpoint).await?;
1622        let mut stream = client
1623            .subscribe("projects/p/subscriptions/s")
1624            .set_shutdown_behavior(ShutdownBehavior::NackImmediately)
1625            .build();
1626
1627        response_tx.send(Ok(test_response(0..1))).await?;
1628        response_tx
1629            .send(Ok(test_exactly_once_response(1..2)))
1630            .await?;
1631        response_tx.send(Ok(test_response(2..3))).await?;
1632        response_tx
1633            .send(Ok(test_exactly_once_response(3..4)))
1634            .await?;
1635        drop(response_tx);
1636
1637        let (m, h) = stream.next().await.expect("should yield a message")?;
1638        assert_eq!(m.data, test_data(0));
1639        assert_eq!(h.ack_id(), test_id(0));
1640        assert!(matches!(h, Handler::AtLeastOnce(_)), "{h:?}");
1641
1642        let (m, h) = stream.next().await.expect("should yield a message")?;
1643        assert_eq!(m.data, test_data(1));
1644        assert_eq!(h.ack_id(), test_id(1));
1645        assert!(matches!(h, Handler::ExactlyOnce(_)), "{h:?}");
1646
1647        let (m, h) = stream.next().await.expect("should yield a message")?;
1648        assert_eq!(m.data, test_data(2));
1649        assert_eq!(h.ack_id(), test_id(2));
1650        assert!(matches!(h, Handler::AtLeastOnce(_)), "{h:?}");
1651
1652        let (m, h) = stream.next().await.expect("should yield a message")?;
1653        assert_eq!(m.data, test_data(3));
1654        assert_eq!(h.ack_id(), test_id(3));
1655        assert!(matches!(h, Handler::ExactlyOnce(_)), "{h:?}");
1656
1657        let end = stream.next().await.transpose()?;
1658        assert!(end.is_none(), "Received extra message: {end:?}");
1659
1660        // Wait for the stream to join its background tasks.
1661        stream.shutdown_token().shutdown().await;
1662
1663        Ok(())
1664    }
1665
1666    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1667    async fn cancel_before_open() -> anyhow::Result<()> {
1668        let mut mock = MockSubscriber::new();
1669        mock.expect_streaming_pull()
1670            .returning(|_| Err(TonicStatus::unavailable("try again")));
1671        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1672        let client = test_client(endpoint).await?;
1673        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
1674        let shutdown_token = stream.shutdown_token();
1675
1676        let next = tokio::spawn(async move { stream.next().await });
1677        shutdown_token.shutdown().await;
1678
1679        let end = next.await?;
1680        assert!(end.is_none(), "Shutdown should end the stream, got {end:?}");
1681
1682        Ok(())
1683    }
1684
1685    #[tokio_test_no_panics(start_paused = true)]
1686    async fn cancel_midstream() -> anyhow::Result<()> {
1687        let (response_tx, response_rx) = channel(10);
1688        let (ack_tx, mut ack_rx) = unbounded_channel();
1689        let (nack_tx, mut nack_rx) = unbounded_channel();
1690
1691        let mut mock = MockSubscriber::new();
1692        mock.expect_streaming_pull()
1693            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1694        mock.expect_acknowledge().times(1).returning(move |r| {
1695            ack_tx
1696                .send(r.into_inner())
1697                .expect("sending on channel always succeeds");
1698            Ok(TonicResponse::from(()))
1699        });
1700        mock.expect_modify_ack_deadline()
1701            .times(1)
1702            .returning(move |r| {
1703                nack_tx
1704                    .send(r.into_inner())
1705                    .expect("sending on channel always succeeds");
1706                Ok(TonicResponse::from(()))
1707            });
1708        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1709        let client = test_client(endpoint).await?;
1710        let mut stream = client
1711            .subscribe("projects/p/subscriptions/s")
1712            .set_shutdown_behavior(ShutdownBehavior::WaitForProcessing)
1713            .build();
1714        let shutdown_token = stream.shutdown_token();
1715
1716        response_tx.send(Ok(test_response(1..10))).await?;
1717        for i in 1..6 {
1718            let Some((m, h)) = stream.next().await.transpose()? else {
1719                anyhow::bail!("expected message {i}/5")
1720            };
1721            assert_eq!(m.data, test_data(i));
1722            h.ack();
1723        }
1724        let shutdown = tokio::spawn(async move {
1725            shutdown_token.shutdown().await;
1726        });
1727        tokio::task::yield_now().await;
1728        let end = stream.next().await.transpose()?;
1729        assert!(end.is_none(), "Shutdown should end the stream, got {end:?}");
1730
1731        // Verify that we drop the messages and handles in the pool that we have
1732        // not returned to the application yet.
1733        shutdown.await?;
1734
1735        let ack_req = ack_rx.try_recv()?;
1736        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1737        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1738
1739        let nack_req = nack_rx.try_recv()?;
1740        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1741        assert_eq!(nack_req.ack_deadline_seconds, 0);
1742        assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1743
1744        Ok(())
1745    }
1746
1747    #[test_case(ShutdownBehavior::NackImmediately)]
1748    #[test_case(ShutdownBehavior::WaitForProcessing)]
1749    #[tokio_test_no_panics(start_paused = true)]
1750    async fn shutdown_without_next(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
1751        let (response_tx, response_rx) = channel(10);
1752        let (ack_tx, mut ack_rx) = unbounded_channel();
1753        let (nack_tx, mut nack_rx) = unbounded_channel();
1754
1755        let mut mock = MockSubscriber::new();
1756        mock.expect_streaming_pull()
1757            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1758        mock.expect_acknowledge().times(1).returning(move |r| {
1759            ack_tx
1760                .send(r.into_inner())
1761                .expect("sending on channel always succeeds");
1762            Ok(TonicResponse::from(()))
1763        });
1764        mock.expect_modify_ack_deadline()
1765            .times(1)
1766            .returning(move |r| {
1767                nack_tx
1768                    .send(r.into_inner())
1769                    .expect("sending on channel always succeeds");
1770                Ok(TonicResponse::from(()))
1771            });
1772        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1773        let client = test_client(endpoint).await?;
1774        let mut stream = client
1775            .subscribe("projects/p/subscriptions/s")
1776            .set_shutdown_behavior(shutdown_behavior)
1777            .build();
1778        let shutdown_token = stream.shutdown_token();
1779
1780        response_tx.send(Ok(test_response(1..10))).await?;
1781        for i in 1..6 {
1782            let Some((m, h)) = stream.next().await.transpose()? else {
1783                anyhow::bail!("expected message {i}/5")
1784            };
1785            assert_eq!(m.data, test_data(i));
1786            h.ack();
1787        }
1788        // Note that the application does not have to call `stream.next()`, or
1789        // `drop(stream)` to begin the shutdown procedure after a cancel.
1790        shutdown_token.shutdown().await;
1791
1792        let ack_req = ack_rx.try_recv()?;
1793        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1794        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1795
1796        let nack_req = nack_rx.try_recv()?;
1797        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1798        assert_eq!(nack_req.ack_deadline_seconds, 0);
1799        assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1800
1801        Ok(())
1802    }
1803
1804    #[test_case(ShutdownBehavior::NackImmediately)]
1805    #[test_case(ShutdownBehavior::WaitForProcessing)]
1806    #[tokio_test_no_panics(start_paused = true)]
1807    async fn stream_error_initiates_shutdown(
1808        shutdown_behavior: ShutdownBehavior,
1809    ) -> anyhow::Result<()> {
1810        let (response_tx, response_rx) = channel(10);
1811        let (ack_tx, mut ack_rx) = unbounded_channel();
1812
1813        let mut mock = MockSubscriber::new();
1814        mock.expect_streaming_pull()
1815            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1816        mock.expect_acknowledge().times(1).returning(move |r| {
1817            ack_tx
1818                .send(r.into_inner())
1819                .expect("sending on channel always succeeds");
1820            Ok(TonicResponse::from(()))
1821        });
1822        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1823        let client = test_client(endpoint).await?;
1824        let mut stream = client
1825            .subscribe("projects/p/subscriptions/s")
1826            .set_shutdown_behavior(shutdown_behavior)
1827            .build();
1828        let shutdown_token = stream.shutdown_token();
1829
1830        response_tx.send(Ok(test_response(0..1))).await?;
1831        response_tx
1832            .send(Err(TonicStatus::failed_precondition("fail")))
1833            .await?;
1834        drop(response_tx);
1835
1836        let (m, h) = stream.next().await.expect("should yield a message")?;
1837        assert_eq!(m.data, test_data(0));
1838        h.ack();
1839
1840        let err = stream.next().await.expect("should yield an error");
1841        assert!(err.is_err(), "{err:?}");
1842
1843        // Note that the application does not have to initiate a shutdown after
1844        // a permanent error.
1845        shutdown_token.wait_for_shutdown().await;
1846
1847        let ack_req = ack_rx.try_recv()?;
1848        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1849        assert_eq!(ack_req.ack_ids, test_ids(0..1));
1850
1851        Ok(())
1852    }
1853
1854    #[test_case(ShutdownBehavior::NackImmediately)]
1855    #[test_case(ShutdownBehavior::WaitForProcessing)]
1856    #[tokio_test_no_panics(start_paused = true)]
1857    async fn drop_cancels(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
1858        let (response_tx, response_rx) = channel(10);
1859        let (ack_tx, mut ack_rx) = unbounded_channel();
1860        let (nack_tx, mut nack_rx) = unbounded_channel();
1861
1862        let mut mock = MockSubscriber::new();
1863        mock.expect_streaming_pull()
1864            .return_once(|_| Ok(TonicResponse::from(response_rx)));
1865        mock.expect_acknowledge().times(1).returning(move |r| {
1866            ack_tx
1867                .send(r.into_inner())
1868                .expect("sending on channel always succeeds");
1869            Ok(TonicResponse::from(()))
1870        });
1871        mock.expect_modify_ack_deadline()
1872            .times(1)
1873            .returning(move |r| {
1874                nack_tx
1875                    .send(r.into_inner())
1876                    .expect("sending on channel always succeeds");
1877                Ok(TonicResponse::from(()))
1878            });
1879        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
1880        let client = test_client(endpoint).await?;
1881        let mut stream = client
1882            .subscribe("projects/p/subscriptions/s")
1883            .set_shutdown_behavior(shutdown_behavior)
1884            .build();
1885        let shutdown_token = stream.shutdown_token();
1886
1887        response_tx.send(Ok(test_response(1..10))).await?;
1888        for i in 1..6 {
1889            let Some((m, h)) = stream.next().await.transpose()? else {
1890                anyhow::bail!("expected message {i}/5")
1891            };
1892            assert_eq!(m.data, test_data(i));
1893            h.ack();
1894        }
1895        drop(stream); // Equivalent to cancelling the `ShutdownToken`
1896        shutdown_token.wait_for_shutdown().await;
1897
1898        let ack_req = ack_rx.try_recv()?;
1899        assert_eq!(ack_req.subscription, "projects/p/subscriptions/s");
1900        assert_eq!(sorted(ack_req.ack_ids), test_ids(1..6));
1901
1902        let nack_req = nack_rx.try_recv()?;
1903        assert_eq!(nack_req.subscription, "projects/p/subscriptions/s");
1904        assert_eq!(nack_req.ack_deadline_seconds, 0);
1905        assert_eq!(sorted(nack_req.ack_ids), test_ids(6..10));
1906
1907        Ok(())
1908    }
1909}