synap_sdk/
reactive.rs

//! Reactive streaming primitives.
//!
//! Provides base types for reactive message/event consumption.
4
5use futures::Stream;
6use std::pin::Pin;
7use tokio::sync::mpsc;
8
9/// A stream of messages
10pub type MessageStream<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
11
12/// Subscription handle for controlling message streams
13pub struct SubscriptionHandle {
14    cancel_tx: mpsc::UnboundedSender<()>,
15}
16
17impl SubscriptionHandle {
18    /// Create a new subscription handle
19    pub(crate) fn new(cancel_tx: mpsc::UnboundedSender<()>) -> Self {
20        Self { cancel_tx }
21    }
22
23    /// Unsubscribe from the stream
24    pub fn unsubscribe(&self) {
25        let _ = self.cancel_tx.send(());
26    }
27
28    /// Check if subscription is still active
29    pub fn is_active(&self) -> bool {
30        !self.cancel_tx.is_closed()
31    }
32}
33
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh handle reports active, and `unsubscribe` delivers a cancel
    /// signal to the receiving half of the channel.
    #[tokio::test]
    async fn test_subscription_handle() {
        let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
        let subscription = SubscriptionHandle::new(cancel_tx);

        assert!(subscription.is_active());

        subscription.unsubscribe();
        let signal = cancel_rx.recv().await;
        assert!(signal.is_some());
    }
}