1use futures::Stream;
6use std::pin::Pin;
7use tokio::sync::mpsc;
8
9pub type MessageStream<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
11
12pub struct SubscriptionHandle {
14 cancel_tx: mpsc::UnboundedSender<()>,
15}
16
17impl SubscriptionHandle {
18 pub(crate) fn new(cancel_tx: mpsc::UnboundedSender<()>) -> Self {
20 Self { cancel_tx }
21 }
22
23 pub fn unsubscribe(&self) {
25 let _ = self.cancel_tx.send(());
26 }
27
28 pub fn is_active(&self) -> bool {
30 !self.cancel_tx.is_closed()
31 }
32}
33
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh handle reports itself active, and `unsubscribe` delivers a
    /// cancellation signal to the receiving half of the channel.
    #[tokio::test]
    async fn test_subscription_handle() {
        let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
        let subscription = SubscriptionHandle::new(cancel_tx);

        // The receiver is still alive, so the handle must be active.
        assert!(subscription.is_active());

        subscription.unsubscribe();

        // The cancellation signal must arrive on the receiving side.
        let signal = cancel_rx.recv().await;
        assert_eq!(signal, Some(()));
    }
}