google_cloud_pubsub/subscriber/
shutdown_token.rs1use futures::future::{BoxFuture, Shared};
16use tokio_util::sync::CancellationToken;
17
18#[derive(Clone, Debug)]
32pub struct ShutdownToken {
33 pub(super) inner: CancellationToken,
34 pub(super) fut: Shared<BoxFuture<'static, ()>>,
35}
36
37impl ShutdownToken {
38 pub async fn shutdown(&self) {
48 self.inner.cancel();
49 self.fut.clone().await
50 }
51
52 #[cfg(test)]
53 pub(super) async fn wait_for_shutdown(&self) {
54 self.fut.clone().await
55 }
56}
57
#[cfg(test)]
mod tests {
    use super::*;
    use futures::FutureExt;
    use tokio::sync::oneshot::channel;

    /// Verifies that `shutdown()` cancels immediately but only returns once
    /// the shared future resolves, and that clones observe the same state.
    #[tokio::test(start_paused = true)]
    async fn shutdown() -> anyhow::Result<()> {
        // The oneshot receiver drives the shared future: it stays pending
        // until the sender fires.
        let (done_tx, done_rx) = channel();
        let token = ShutdownToken {
            inner: CancellationToken::new(),
            fut: done_rx.map(|_| ()).boxed().shared(),
        };
        assert!(!token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_none(), "future should be pending");

        let worker_token = token.clone();
        assert!(!worker_token.inner.is_cancelled(), "{worker_token:?}");
        assert!(worker_token.fut.peek().is_none(), "future should be pending");

        // Shut down through the clone on a separate task; it blocks until the
        // shared future resolves.
        let worker = async move {
            worker_token.shutdown().await;
            assert!(worker_token.inner.is_cancelled(), "{worker_token:?}");
            assert!(
                worker_token.fut.peek().is_some(),
                "future should be satisfied"
            );
        };
        let handle = tokio::spawn(worker);
        // Give the spawned task a chance to run up to its await point.
        tokio::task::yield_now().await;

        // Cancellation is already visible through the original token, but the
        // shutdown itself has not completed yet.
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_none(), "future should be pending");

        // Resolve the shared future, which unblocks the spawned task.
        let _ = done_tx.send(());
        handle.await?;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_some(), "future should be satisfied");

        // A second shutdown on an already-finished token returns right away
        // and leaves the state unchanged.
        token.shutdown().await;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_some(), "future should be satisfied");

        Ok(())
    }
}