google_cloud_pubsub/subscriber/
shutdown_token.rs1use futures::future::{BoxFuture, Shared};
16use tokio_util::sync::CancellationToken;
17
18#[derive(Clone, Debug)]
32pub struct ShutdownToken {
33 pub(super) inner: CancellationToken,
34 pub(super) fut: Shared<BoxFuture<'static, ()>>,
35}
36
37impl ShutdownToken {
38 pub async fn shutdown(&self) {
48 self.inner.cancel();
49 self.fut.clone().await
50 }
51}
52
#[cfg(test)]
mod tests {
    use super::*;
    use futures::FutureExt;
    use tokio::sync::oneshot::channel;

    impl ShutdownToken {
        /// Test helper: waits for the shared future without cancelling the
        /// inner token.
        pub(crate) async fn wait_for_shutdown(&self) {
            let completion = self.fut.clone();
            completion.await
        }
    }

    /// Exercises `shutdown()` through two clones of the same token and checks
    /// the cancellation flag and the shared future at every stage.
    #[tokio::test(start_paused = true)]
    async fn shutdown() -> anyhow::Result<()> {
        // The shared future resolves once the oneshot channel is signalled.
        let (sender, receiver) = channel();
        let token = ShutdownToken {
            inner: CancellationToken::new(),
            fut: receiver.map(|_| ()).boxed().shared(),
        };

        // Freshly built: not cancelled, future unresolved.
        assert!(!token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_none(), "future should be pending");

        // A clone starts out in the same state.
        let token_clone = token.clone();
        assert!(!token_clone.inner.is_cancelled(), "{token_clone:?}");
        assert!(token_clone.fut.peek().is_none(), "future should be pending");

        // Request the shutdown from the clone in a background task.
        let background = tokio::spawn(async move {
            token_clone.shutdown().await;
            assert!(token_clone.inner.is_cancelled(), "{token_clone:?}");
            assert!(
                token_clone.fut.peek().is_some(),
                "future should be satisfied"
            );
        });
        // Yield so the background task gets a chance to run; the assertions
        // below expect it to have cancelled the token already.
        tokio::task::yield_now().await;

        // Cancellation is visible through the original handle, but the
        // shared future has not resolved yet.
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_none(), "future should be pending");

        // Resolve the shared future and let the background task finish.
        let _ = sender.send(());
        background.await?;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_some(), "future should be satisfied");

        // A second shutdown on an already shut-down token leaves the state
        // unchanged.
        token.shutdown().await;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_some(), "future should be satisfied");

        Ok(())
    }
}