use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
/// Handle to a single subscription.
///
/// Bundles the subscription's identifier with a cancellation token and a
/// watch receiver carrying a boolean completion flag. The methods in this
/// file cancel the token on request (and on drop) and treat a `true` flag
/// value as "done".
pub struct Subscription {
/// Identifier handed back by `id()`.
id: String,
/// Token cancelled by `cancel()`, `unsubscribe()` and `Drop`.
cancel: CancellationToken,
/// Completion flag; `true` is read as "done".
/// NOTE(review): presumably set by whichever task services this
/// subscription once it has shut down — confirm against the sender side.
done: watch::Receiver<bool>,
}
impl Subscription {
pub(crate) fn new(id: String, cancel: CancellationToken, done: watch::Receiver<bool>) -> Self {
Self { id, cancel, done }
}
pub async fn unsubscribe(&self) {
self.cancel.cancel();
let mut done = self.done.clone();
let _ = done.wait_for(|&v| v).await;
}
pub fn cancel(&self) {
self.cancel.cancel();
}
pub fn is_done(&self) -> bool {
*self.done.borrow()
}
pub async fn done(&self) {
let mut done = self.done.clone();
let _ = done.wait_for(|&v| v).await;
}
pub fn id(&self) -> &str {
&self.id
}
pub fn is_closed(&self) -> bool {
self.cancel.is_cancelled()
}
#[allow(dead_code)]
pub(crate) fn token(&self) -> CancellationToken {
self.cancel.clone()
}
}
/// Debug output shows the identifier plus the current cancellation and
/// completion flags instead of the raw token and receiver.
impl std::fmt::Debug for Subscription {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Snapshot both flags up front so the builder chain only formats values.
        let cancelled = self.cancel.is_cancelled();
        let finished = *self.done.borrow();

        let mut out = f.debug_struct("Subscription");
        out.field("id", &self.id);
        out.field("is_cancelled", &cancelled);
        out.field("is_done", &finished);
        out.finish()
    }
}
impl Drop for Subscription {
fn drop(&mut self) {
self.cancel.cancel();
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a subscription with the given id and returns it together with
    /// an outside handle on its token and the sender for its completion flag.
    fn fixture(id: &str) -> (Subscription, CancellationToken, watch::Sender<bool>) {
        let token = CancellationToken::new();
        let (done_tx, done_rx) = watch::channel(false);
        let sub = Subscription::new(id.to_string(), token.clone(), done_rx);
        (sub, token, done_tx)
    }

    /// `cancel()` flips the token immediately; `is_done()` follows the flag.
    #[tokio::test]
    async fn test_subscription_cancel() {
        let (sub, outer_token, done_tx) = fixture("test-1");

        assert_eq!(sub.id(), "test-1");
        assert!(!sub.is_closed());
        assert!(!sub.is_done());

        sub.cancel();
        assert!(sub.is_closed());
        assert!(outer_token.is_cancelled());

        let _ = done_tx.send(true);
        assert!(sub.is_done());
    }

    /// `unsubscribe()` cancels and then waits for the worker to report done.
    #[tokio::test]
    async fn test_subscription_unsubscribe() {
        let (sub, _, done_tx) = fixture("test-2");

        // Stand-in worker: reports completion once it sees the cancellation.
        let worker_token = sub.token();
        tokio::spawn(async move {
            worker_token.cancelled().await;
            let _ = done_tx.send(true);
        });

        sub.unsubscribe().await;
        assert!(sub.is_closed());
        assert!(sub.is_done());
    }

    /// Dropping the handle cancels the shared token.
    #[tokio::test]
    async fn test_subscription_drop_cancels() {
        let (sub, outer_token, _done_tx) = fixture("test-3");

        assert!(!outer_token.is_cancelled());
        drop(sub);
        assert!(outer_token.is_cancelled());
    }

    /// `done()` resolves only after the flag has been set to `true`.
    #[tokio::test]
    async fn test_subscription_done_waits() {
        let (sub, _, done_tx) = fixture("test-4");

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let _ = done_tx.send(true);
        });

        sub.done().await;
        assert!(sub.is_done());
    }
}