use futures::future::{BoxFuture, Shared};
use tokio_util::sync::CancellationToken;
/// Handle used to request a shutdown and then wait for it to finish.
///
/// The handle pairs a cancellation signal with a shared future. Clones are
/// backed by the same signal and the same shared future, so a shutdown
/// requested through one clone is visible through every other clone.
#[derive(Debug, Clone)]
pub struct ShutdownToken {
    /// Cancellation signal; [`ShutdownToken::shutdown`] cancels it.
    pub(super) inner: CancellationToken,
    /// Shared future that [`ShutdownToken::shutdown`] awaits after cancelling.
    ///
    /// NOTE(review): presumably resolves once the work observing `inner` has
    /// finished shutting down; it is constructed by the parent module, so
    /// confirm the exact completion condition there.
    pub(super) fut: Shared<BoxFuture<'static, ()>>,
}
impl ShutdownToken {
    /// Requests shutdown and waits until the shared future has resolved.
    ///
    /// The inner cancellation token is cancelled first; afterwards a clone of
    /// the shared future is awaited. Each caller awaits its own clone, so this
    /// method may be called from several clones of the token, and calling it
    /// again after the shared future has already resolved simply returns.
    ///
    /// As with any `async fn`, nothing happens (including the cancellation)
    /// until the returned future is polled.
    pub async fn shutdown(&self) {
        let Self { inner, fut } = self;
        inner.cancel();
        fut.clone().await;
    }

    /// Waits for the shared future to resolve without cancelling the token.
    ///
    /// Compiled only in test builds.
    #[cfg(test)]
    pub(super) async fn wait_for_shutdown(&self) {
        let completion = Shared::clone(&self.fut);
        completion.await;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::FutureExt;
    use tokio::sync::oneshot::channel;

    // Walks a `ShutdownToken` and one clone of it through a full shutdown:
    // initial state, cancellation via the clone, completion of the shared
    // future, and a second `shutdown` call after completion.
    #[tokio::test(start_paused = true)]
    async fn shutdown() -> anyhow::Result<()> {
        // The shared future is driven by a oneshot channel so the test decides
        // when it resolves; `map(|_| ())` discards the receive result.
        let (tx, rx) = channel();
        let fut = rx.map(|_| ()).boxed().shared();
        let token = ShutdownToken {
            inner: CancellationToken::new(),
            fut,
        };
        // Fresh token: not cancelled, shared future has no output yet.
        assert!(!token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_none(), "future should be pending");
        // A clone starts out in the same state as the original.
        let token_clone = token.clone();
        assert!(!token_clone.inner.is_cancelled(), "{token_clone:?}");
        assert!(token_clone.fut.peek().is_none(), "future should be pending");
        // Request shutdown through the clone on a separate task; `shutdown`
        // only returns once the shared future has resolved, so the assertions
        // inside the task run after completion.
        let handle = tokio::spawn(async move {
            token_clone.shutdown().await;
            assert!(token_clone.inner.is_cancelled(), "{token_clone:?}");
            assert!(
                token_clone.fut.peek().is_some(),
                "future should be satisfied"
            );
        });
        // Yield so the spawned task gets a chance to run. The assertions below
        // expect it to have cancelled the token (observed through the original)
        // while the shared future is still pending because nothing was sent yet.
        // NOTE(review): relies on the test runtime running the spawned task
        // during this single yield; confirm for the runtime flavor in use.
        tokio::task::yield_now().await;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_none(), "future should be pending");
        // Fire the oneshot so the shared future can resolve; the send result
        // is deliberately discarded.
        let _ = tx.send(());
        // Wait for the spawned task; `?` propagates a join error (for example
        // from a failed assertion inside the task) and fails the test.
        handle.await?;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_some(), "future should be satisfied");
        // A second shutdown on the already-completed token must return and
        // leave the observable state unchanged.
        token.shutdown().await;
        assert!(token.inner.is_cancelled(), "{token:?}");
        assert!(token.fut.peek().is_some(), "future should be satisfied");
        Ok(())
    }
}