use futures::future::{FusedFuture, FutureExt};
#[derive(Debug, Clone)]
pub struct CancellationToken {
inner: tokio_util::sync::CancellationToken,
}
impl CancellationToken {
#[must_use]
pub fn new() -> Self {
Self {
inner: tokio_util::sync::CancellationToken::new(),
}
}
#[must_use]
pub fn child_token(&self) -> Self {
Self {
inner: self.inner.child_token(),
}
}
pub fn cancel(&self) {
self.inner.cancel();
}
#[must_use]
pub fn is_cancelled(&self) -> bool {
self.inner.is_cancelled()
}
#[must_use]
pub fn cancelled(&self) -> impl FusedFuture<Output = ()> + '_ {
self.inner.cancelled().fuse()
}
}
impl Default for CancellationToken {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Builder, GenericRuntime as _};
#[test_log::test]
fn test_cancelled_future_completes_immediately_when_already_cancelled() {
let runtime = crate::tokio::runtime::build_runtime(&Builder::new()).unwrap();
let token = CancellationToken::new();
token.cancel();
runtime.block_on(async {
token.cancelled().await;
});
runtime.wait().unwrap();
}
#[cfg(feature = "time")]
#[test_log::test]
fn test_cancelled_future_works_in_select() {
let runtime = crate::tokio::runtime::build_runtime(&Builder::new()).unwrap();
let token = CancellationToken::new();
let token_clone = token.clone();
runtime.block_on(async {
let result = tokio::select! {
() = token.cancelled() => "cancelled",
() = async {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
token_clone.cancel();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
} => "timeout",
};
assert_eq!(result, "cancelled");
});
runtime.wait().unwrap();
}
#[test_log::test]
fn test_child_token_cancellation_propagation() {
let parent = CancellationToken::new();
let child = parent.child_token();
assert!(!parent.is_cancelled());
assert!(!child.is_cancelled());
parent.cancel();
assert!(parent.is_cancelled());
assert!(child.is_cancelled());
}
}