rxrust/subscription.rs
1//! Subscription trait and implementations
2//!
3//! This module contains the Subscription trait for managing the lifecycle
4//! of subscriptions.
5
6pub mod boxed;
7pub mod dynamic;
8pub mod either;
9pub mod source_with_dynamic;
10pub mod source_with_handle;
11pub mod tuple;
12
13pub use boxed::{BoxedSubscription, BoxedSubscriptionSend, IntoBoxedSubscription};
14pub use dynamic::DynamicSubscriptions;
15pub use either::EitherSubscription;
16pub use source_with_dynamic::SourceWithDynamicSubs;
17pub use source_with_handle::SourceWithHandle;
18pub use tuple::TupleSubscription;
19
20/// A functional adapter that turns a closure into a `Subscription`.
21///
22/// This struct allows users to define teardown/unsubscription logic using a
23/// simple closure, without needing to define a custom struct that implements
24/// the `Subscription` trait. It is particularly useful when working with the
25/// [`create`](crate::observable::ObservableFactory::create) operator, where you
26/// often need to return a cleanup action.
27///
28/// # Why is this needed?
29///
30/// Rust's orphan rules and type inference limitations prevent us from directly
31/// implementing `Subscription` for all `FnOnce()` closures globally.
32/// `ClosureSubscription` serves as an explicit wrapper to tell the compiler:
33/// "Treat this closure as a Subscription".
34///
35/// # Zero-Cost Abstraction
36///
37/// This is a newtype wrapper that compiles down to a direct function call.
38/// There is no runtime overhead compared to calling the closure directly.
39///
40/// # Example
41///
42/// ```rust
43/// use std::convert::Infallible;
44///
45/// use rxrust::{prelude::*, subscription::ClosureSubscription};
46///
47/// Local::create::<(), Infallible, _, _>(|_emitter| {
48/// println!("Subscribed");
49///
50/// // Return a closure wrapped in ClosureSubscription as the teardown logic
51/// ClosureSubscription(move || {
52/// println!("Unsubscribed - cleaning up resources");
53/// })
54/// });
55/// ```
56pub struct ClosureSubscription<F>(pub F);
57
58impl<F> Subscription for ClosureSubscription<F>
59where
60 F: FnOnce(),
61{
62 fn unsubscribe(self) { (self.0)() }
63
64 fn is_closed(&self) -> bool { false }
65}
66
67/// An RAII implementation of a "scoped subscribed" of a subscription.
68/// When this structure is dropped (falls out of scope), the subscription will
69/// be unsubscribed.
70///
71/// Implements the [must_use](
72/// https://doc.rust-lang.org/reference/attributes/diagnostics.html
73/// #the-must_use-attribute)
74/// attribute
75///
76/// If you want to drop it immediately, wrap it in its own scope
77#[must_use]
78pub struct SubscriptionGuard<T: Subscription>(pub(crate) Option<T>);
79
80impl<T: Subscription> SubscriptionGuard<T> {
81 /// Wraps an existing subscription with a guard to enable RAII behavior for
82 /// it.
83 pub fn new(subscription: T) -> SubscriptionGuard<T> { SubscriptionGuard(Some(subscription)) }
84}
85
86impl<T: Subscription> Drop for SubscriptionGuard<T> {
87 fn drop(&mut self) {
88 if let Some(u) = self.0.take() {
89 u.unsubscribe()
90 }
91 }
92}
93
94/// Subscription trait for managing observable subscriptions
95///
96/// Provides methods to cancel a subscription and check its status.
97/// Uses move semantics for `unsubscribe` to match the terminal nature of the
98/// operation, consistent with Observer's `error(self)` and `complete(self)`
99/// methods.
100pub trait Subscription {
101 /// Cancel the subscription (terminal operation, consumes self)
102 fn unsubscribe(self);
103
104 /// Check if the subscription is closed (completed or unsubscribed)
105 fn is_closed(&self) -> bool;
106
107 /// Activates "RAII" behavior for this subscription. That means
108 /// `unsubscribe()` will be called automatically as soon as the returned
109 /// value goes out of scope.
110 ///
111 /// **Attention:** If you don't assign the return value to a variable,
112 /// `unsubscribe()` is called immediately, which is probably not what you
113 /// want!
114 fn unsubscribe_when_dropped(self) -> SubscriptionGuard<Self>
115 where
116 Self: Sized,
117 {
118 SubscriptionGuard::new(self)
119 }
120}
121
122/// Unit subscription - always closed, does nothing
123impl Subscription for () {
124 fn unsubscribe(self) {}
125
126 fn is_closed(&self) -> bool { true }
127}
128
129/// Option subscription - None is closed, Some delegates to inner
130impl<P: Subscription> Subscription for Option<P> {
131 fn unsubscribe(self) {
132 if let Some(inner) = self {
133 inner.unsubscribe()
134 }
135 }
136
137 fn is_closed(&self) -> bool {
138 match self {
139 Some(inner) => inner.is_closed(),
140 None => true,
141 }
142 }
143}
144
145use crate::context::{MutArc, MutRc, RcDeref, RcDerefMut};
146
147impl<P: Subscription> Subscription for MutArc<Option<P>> {
148 fn unsubscribe(self) {
149 let Some(inner) = self.rc_deref_mut().take() else {
150 return;
151 };
152 inner.unsubscribe()
153 }
154
155 fn is_closed(&self) -> bool { self.rc_deref().is_none() }
156}
157
158impl<P: Subscription> Subscription for MutRc<Option<P>> {
159 fn unsubscribe(self) {
160 let Some(inner) = self.rc_deref_mut().take() else {
161 return;
162 };
163 inner.unsubscribe()
164 }
165
166 fn is_closed(&self) -> bool { self.rc_deref().is_none() }
167}
168
169#[cfg(test)]
170mod test {
171 use std::{cell::RefCell, rc::Rc};
172
173 use super::*;
174
175 #[rxrust_macro::test]
176 fn test_subscription_guard_drop() {
177 let unsubscribed = Rc::new(RefCell::new(false));
178 let unsubscribed_clone = unsubscribed.clone();
179
180 struct TestSubscription {
181 is_closed: bool,
182 unsubscribed_flag: Rc<RefCell<bool>>,
183 }
184
185 impl Subscription for TestSubscription {
186 fn unsubscribe(mut self) {
187 *self.unsubscribed_flag.borrow_mut() = true;
188 self.is_closed = true;
189 }
190
191 fn is_closed(&self) -> bool { self.is_closed }
192 }
193
194 let sub = TestSubscription { is_closed: false, unsubscribed_flag: unsubscribed_clone };
195
196 assert!(!*unsubscribed.borrow());
197 {
198 let _guard = sub.unsubscribe_when_dropped();
199 assert!(!*unsubscribed.borrow());
200 }
201 // When _guard goes out of scope, unsubscribe should be called
202 assert!(*unsubscribed.borrow());
203 }
204}