Skip to main content

google_cloud_pubsub/subscriber/
shutdown_token.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::future::{BoxFuture, Shared};
16use tokio_util::sync::CancellationToken;
17
18/// A token to signal and await shutdown of a stream.
19///
20/// # Example
21/// ```
22/// use google_cloud_pubsub::subscriber::MessageStream;
23/// async fn sample(stream: MessageStream) {
24///   // Get a shutdown token for the stream.
25///   let token = stream.shutdown_token();
26///
27///   // Signal and await a shutdown of the stream.
28///   token.shutdown().await;
29/// }
30/// ```
31#[derive(Clone, Debug)]
32pub struct ShutdownToken {
33    pub(super) inner: CancellationToken,
34    pub(super) fut: Shared<BoxFuture<'static, ()>>,
35}
36
37impl ShutdownToken {
38    /// Signal and await a stream shutdown.
39    ///
40    /// Applications should call this to ensure all pending ack/nack RPCs have
41    /// time to complete before a process exits.
42    ///
43    /// See [`Subscribe::set_shutdown_behavior`][setter] to configure the exact
44    /// behavior on shutdown.
45    ///
46    /// [setter]: crate::builder::subscriber::Subscribe::set_shutdown_behavior
47    pub async fn shutdown(&self) {
48        self.inner.cancel();
49        self.fut.clone().await
50    }
51}
52
#[cfg(test)]
mod tests {
    use super::*;
    use futures::FutureExt;
    use tokio::sync::oneshot::channel;

    impl ShutdownToken {
        /// Awaits the shared completion future without cancelling `inner`.
        pub(crate) async fn wait_for_shutdown(&self) {
            let completion = self.fut.clone();
            completion.await
        }
    }

    /// Reports whether the token's shared future has already produced its output.
    fn resolved(token: &ShutdownToken) -> bool {
        token.fut.peek().is_some()
    }

    #[tokio::test(start_paused = true)]
    async fn shutdown() -> anyhow::Result<()> {
        // The receiving half of a oneshot channel stands in for the stream's
        // completion: the shared future stays pending until the test sends.
        let (sender, receiver) = channel();
        let token = ShutdownToken {
            inner: CancellationToken::new(),
            fut: receiver.map(|_| ()).boxed().shared(),
        };
        assert!(!token.inner.is_cancelled(), "{token:?}");
        assert!(!resolved(&token), "future should be pending");

        // A clone starts out in the same state as the original.
        let token_clone = token.clone();
        assert!(!token_clone.inner.is_cancelled(), "{token_clone:?}");
        assert!(!resolved(&token_clone), "future should be pending");

        // Shut down through the clone on a separate task; the task stays
        // blocked until the future is satisfied below.
        let task = tokio::spawn(async move {
            token_clone.shutdown().await;
            assert!(token_clone.inner.is_cancelled(), "{token_clone:?}");
            assert!(resolved(&token_clone), "future should be satisfied");
        });
        // Give the spawned task a chance to run up to its await point.
        tokio::task::yield_now().await;

        // The cancellation issued through the clone is visible on the original.
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(!resolved(&token), "future should be pending");

        // Satisfy the future
        let _ = sender.send(());
        task.await?;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(resolved(&token), "future should be satisfied");

        // A second shutdown is a no-op.
        token.shutdown().await;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(resolved(&token), "future should be satisfied");

        Ok(())
    }
}