use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
pub struct Subscription {
handle: JoinHandle<()>,
cancel_token: CancellationToken,
}
impl Subscription {
pub(crate) fn new(handle: JoinHandle<()>, cancel_token: CancellationToken) -> Self {
Self {
handle,
cancel_token,
}
}
pub fn cancel(&self) {
self.cancel_token.cancel();
}
pub fn is_active(&self) -> bool {
!self.handle.is_finished() && !self.cancel_token.is_cancelled()
}
pub fn is_finished(&self) -> bool {
self.handle.is_finished()
}
}
impl Drop for Subscription {
fn drop(&mut self) {
self.cancel_token.cancel();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[tokio::test]
async fn test_subscription_cancel() {
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move {
loop {
if token_clone.is_cancelled() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
let subscription = Subscription::new(handle, token.clone());
assert!(subscription.is_active());
subscription.cancel();
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(!subscription.is_active());
}
#[tokio::test]
async fn test_subscription_drop_cancels() {
let token = CancellationToken::new();
let token_clone = token.clone();
let handle = tokio::spawn(async move {
loop {
if token_clone.is_cancelled() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
{
let _subscription = Subscription::new(handle, token.clone());
}
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(token.is_cancelled());
}
}