Skip to main content

google_cloud_pubsub/subscriber/
shutdown_token.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::future::{BoxFuture, Shared};
16use tokio_util::sync::CancellationToken;
17
18/// A token to signal and await shutdown of a stream.
19///
20/// # Example
21/// ```
22/// use google_cloud_pubsub::subscriber::MessageStream;
23/// async fn sample(stream: MessageStream) {
24///   // Get a shutdown token for the stream.
25///   let token = stream.shutdown_token();
26///
27///   // Signal and await a shutdown of the stream.
28///   token.shutdown().await;
29/// }
30/// ```
31#[derive(Clone, Debug)]
32pub struct ShutdownToken {
33    pub(super) inner: CancellationToken,
34    pub(super) fut: Shared<BoxFuture<'static, ()>>,
35}
36
37impl ShutdownToken {
38    /// Signal and await a stream shutdown.
39    ///
40    /// Applications should call this to ensure all pending ack/nack RPCs have
41    /// time to complete before a process exits.
42    ///
43    /// See [`Subscribe::set_shutdown_behavior`][setter] to configure the exact
44    /// behavior on shutdown.
45    ///
46    /// [setter]: crate::builder::subscriber::Subscribe::set_shutdown_behavior
47    pub async fn shutdown(&self) {
48        self.inner.cancel();
49        self.fut.clone().await
50    }
51
52    #[cfg(test)]
53    pub(super) async fn wait_for_shutdown(&self) {
54        self.fut.clone().await
55    }
56}
57
58#[cfg(test)]
59mod tests {
60    use super::*;
61    use futures::FutureExt;
62    use tokio::sync::oneshot::channel;
63
64    #[tokio::test(start_paused = true)]
65    async fn shutdown() -> anyhow::Result<()> {
66        let (tx, rx) = channel();
67        let fut = rx.map(|_| ()).boxed().shared();
68
69        let token = ShutdownToken {
70            inner: CancellationToken::new(),
71            fut,
72        };
73        assert!(!token.inner.is_cancelled(), "{token:?}");
74        assert!(token.fut.peek().is_none(), "future should be pending");
75
76        let token_clone = token.clone();
77        assert!(!token_clone.inner.is_cancelled(), "{token_clone:?}");
78        assert!(token_clone.fut.peek().is_none(), "future should be pending");
79
80        let handle = tokio::spawn(async move {
81            token_clone.shutdown().await;
82            assert!(token_clone.inner.is_cancelled(), "{token_clone:?}");
83            assert!(
84                token_clone.fut.peek().is_some(),
85                "future should be satisfied"
86            );
87        });
88        tokio::task::yield_now().await;
89
90        assert!(token.inner.is_cancelled(), "{token:?}");
91        assert!(token.fut.peek().is_none(), "future should be pending");
92
93        // Satisfy the future
94        let _ = tx.send(());
95        handle.await?;
96        assert!(token.inner.is_cancelled(), "{token:?}");
97        assert!(token.fut.peek().is_some(), "future should be satisfied");
98
99        // A second shutdown is a no-op.
100        token.shutdown().await;
101        assert!(token.inner.is_cancelled(), "{token:?}");
102        assert!(token.fut.peek().is_some(), "future should be satisfied");
103
104        Ok(())
105    }
106}