hojicha_runtime/subscription.rs
1//! Stream subscription support for async event sources
2//!
3//! This module provides the `Subscription` type for managing subscriptions to
4//! async streams (like WebSocket connections, file watchers, or timers). Subscriptions
5//! automatically forward stream items as messages to your program's event loop.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use futures::stream;
11//! use std::time::Duration;
12//!
13//! // Subscribe to a stream of periodic events
14//! let stream = stream::repeat(MyMsg::Tick)
15//! .throttle(Duration::from_secs(1));
16//!
17//! let subscription = program.subscribe(stream);
18//!
19//! // The stream will send MyMsg::Tick every second
20//! // Until the subscription is dropped or cancelled
21//!
22//! // Cancel when done
23//! subscription.cancel().await;
24//! ```
25//!
26//! # Automatic Cleanup
27//!
28//! Subscriptions are automatically cancelled when dropped, ensuring no resource
29//! leaks from forgotten stream subscriptions.
30
31use tokio::task::JoinHandle;
32use tokio_util::sync::CancellationToken;
33
34/// A handle to a running stream subscription
35pub struct Subscription {
36 handle: JoinHandle<()>,
37 cancel_token: CancellationToken,
38}
39
40impl Subscription {
41 /// Create a new subscription with the given task handle and cancellation token
42 pub(crate) fn new(handle: JoinHandle<()>, cancel_token: CancellationToken) -> Self {
43 Self {
44 handle,
45 cancel_token,
46 }
47 }
48
49 /// Cancel the subscription
50 ///
51 /// This will stop the stream from sending more events to the program.
52 pub fn cancel(&self) {
53 self.cancel_token.cancel();
54 }
55
56 /// Check if the subscription is still active
57 pub fn is_active(&self) -> bool {
58 !self.handle.is_finished() && !self.cancel_token.is_cancelled()
59 }
60
61 /// Check if the subscription has completed
62 pub fn is_finished(&self) -> bool {
63 self.handle.is_finished()
64 }
65}
66
67impl Drop for Subscription {
68 fn drop(&mut self) {
69 // Cancel the subscription when dropped
70 self.cancel_token.cancel();
71 }
72}
73
74#[cfg(test)]
75mod tests {
76 use super::*;
77 use std::time::Duration;
78
79 #[tokio::test]
80 async fn test_subscription_cancel() {
81 let token = CancellationToken::new();
82 let token_clone = token.clone();
83
84 let handle = tokio::spawn(async move {
85 loop {
86 if token_clone.is_cancelled() {
87 break;
88 }
89 tokio::time::sleep(Duration::from_millis(10)).await;
90 }
91 });
92
93 let subscription = Subscription::new(handle, token.clone());
94 assert!(subscription.is_active());
95
96 subscription.cancel();
97 tokio::time::sleep(Duration::from_millis(50)).await;
98
99 assert!(!subscription.is_active());
100 }
101
102 #[tokio::test]
103 async fn test_subscription_drop_cancels() {
104 let token = CancellationToken::new();
105 let token_clone = token.clone();
106
107 let handle = tokio::spawn(async move {
108 loop {
109 if token_clone.is_cancelled() {
110 break;
111 }
112 tokio::time::sleep(Duration::from_millis(10)).await;
113 }
114 });
115
116 {
117 let _subscription = Subscription::new(handle, token.clone());
118 // Subscription dropped here
119 }
120
121 tokio::time::sleep(Duration::from_millis(50)).await;
122 assert!(token.is_cancelled());
123 }
124}