hojicha_runtime/
subscription.rs

1//! Stream subscription support for async event sources
2//!
3//! This module provides the `Subscription` type for managing subscriptions to
4//! async streams (like WebSocket connections, file watchers, or timers). Subscriptions
5//! automatically forward stream items as messages to your program's event loop.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use futures::stream;
11//! use std::time::Duration;
12//!
13//! // Subscribe to a stream of periodic events
14//! let stream = stream::repeat(MyMsg::Tick)
15//!     .throttle(Duration::from_secs(1));
16//!
17//! let subscription = program.subscribe(stream);
18//!
19//! // The stream will send MyMsg::Tick every second
20//! // Until the subscription is dropped or cancelled
21//!
22//! // Cancel when done
23//! subscription.cancel().await;
24//! ```
25//!
26//! # Automatic Cleanup
27//!
28//! Subscriptions are automatically cancelled when dropped, ensuring no resource
29//! leaks from forgotten stream subscriptions.
30
31use tokio::task::JoinHandle;
32use tokio_util::sync::CancellationToken;
33
34/// A handle to a running stream subscription
35pub struct Subscription {
36    handle: JoinHandle<()>,
37    cancel_token: CancellationToken,
38}
39
40impl Subscription {
41    /// Create a new subscription with the given task handle and cancellation token
42    pub(crate) fn new(handle: JoinHandle<()>, cancel_token: CancellationToken) -> Self {
43        Self {
44            handle,
45            cancel_token,
46        }
47    }
48
49    /// Cancel the subscription
50    ///
51    /// This will stop the stream from sending more events to the program.
52    pub fn cancel(&self) {
53        self.cancel_token.cancel();
54    }
55
56    /// Check if the subscription is still active
57    pub fn is_active(&self) -> bool {
58        !self.handle.is_finished() && !self.cancel_token.is_cancelled()
59    }
60
61    /// Check if the subscription has completed
62    pub fn is_finished(&self) -> bool {
63        self.handle.is_finished()
64    }
65}
66
67impl Drop for Subscription {
68    fn drop(&mut self) {
69        // Cancel the subscription when dropped
70        self.cancel_token.cancel();
71    }
72}
73
74#[cfg(test)]
75mod tests {
76    use super::*;
77    use std::time::Duration;
78
79    #[tokio::test]
80    async fn test_subscription_cancel() {
81        let token = CancellationToken::new();
82        let token_clone = token.clone();
83
84        let handle = tokio::spawn(async move {
85            loop {
86                if token_clone.is_cancelled() {
87                    break;
88                }
89                tokio::time::sleep(Duration::from_millis(10)).await;
90            }
91        });
92
93        let subscription = Subscription::new(handle, token.clone());
94        assert!(subscription.is_active());
95
96        subscription.cancel();
97        tokio::time::sleep(Duration::from_millis(50)).await;
98
99        assert!(!subscription.is_active());
100    }
101
102    #[tokio::test]
103    async fn test_subscription_drop_cancels() {
104        let token = CancellationToken::new();
105        let token_clone = token.clone();
106
107        let handle = tokio::spawn(async move {
108            loop {
109                if token_clone.is_cancelled() {
110                    break;
111                }
112                tokio::time::sleep(Duration::from_millis(10)).await;
113            }
114        });
115
116        {
117            let _subscription = Subscription::new(handle, token.clone());
118            // Subscription dropped here
119        }
120
121        tokio::time::sleep(Duration::from_millis(50)).await;
122        assert!(token.is_cancelled());
123    }
124}