rxrust/
subscription.rs

1//! Subscription trait and implementations
2//!
3//! This module contains the Subscription trait for managing the lifecycle
4//! of subscriptions.
5
6pub mod boxed;
7pub mod dynamic;
8pub mod either;
9pub mod source_with_dynamic;
10pub mod source_with_handle;
11pub mod tuple;
12
13pub use boxed::{BoxedSubscription, BoxedSubscriptionSend, IntoBoxedSubscription};
14pub use dynamic::DynamicSubscriptions;
15pub use either::EitherSubscription;
16pub use source_with_dynamic::SourceWithDynamicSubs;
17pub use source_with_handle::SourceWithHandle;
18pub use tuple::TupleSubscription;
19
20/// A functional adapter that turns a closure into a `Subscription`.
21///
22/// This struct allows users to define teardown/unsubscription logic using a
23/// simple closure, without needing to define a custom struct that implements
24/// the `Subscription` trait. It is particularly useful when working with the
25/// [`create`](crate::observable::ObservableFactory::create) operator, where you
26/// often need to return a cleanup action.
27///
28/// # Why is this needed?
29///
30/// Rust's orphan rules and type inference limitations prevent us from directly
31/// implementing `Subscription` for all `FnOnce()` closures globally.
32/// `ClosureSubscription` serves as an explicit wrapper to tell the compiler:
33/// "Treat this closure as a Subscription".
34///
35/// # Zero-Cost Abstraction
36///
37/// This is a newtype wrapper that compiles down to a direct function call.
38/// There is no runtime overhead compared to calling the closure directly.
39///
40/// # Example
41///
42/// ```rust
43/// use std::convert::Infallible;
44///
45/// use rxrust::{prelude::*, subscription::ClosureSubscription};
46///
47/// Local::create::<(), Infallible, _, _>(|_emitter| {
48///   println!("Subscribed");
49///
50///   // Return a closure wrapped in ClosureSubscription as the teardown logic
51///   ClosureSubscription(move || {
52///     println!("Unsubscribed - cleaning up resources");
53///   })
54/// });
55/// ```
56pub struct ClosureSubscription<F>(pub F);
57
58impl<F> Subscription for ClosureSubscription<F>
59where
60  F: FnOnce(),
61{
62  fn unsubscribe(self) { (self.0)() }
63
64  fn is_closed(&self) -> bool { false }
65}
66
67/// An RAII implementation of a "scoped subscribed" of a subscription.
68/// When this structure is dropped (falls out of scope), the subscription will
69/// be unsubscribed.
70///
71/// Implements the [must_use](
72/// https://doc.rust-lang.org/reference/attributes/diagnostics.html
73/// #the-must_use-attribute)
74/// attribute
75///
76/// If you want to drop it immediately, wrap it in its own scope
77#[must_use]
78pub struct SubscriptionGuard<T: Subscription>(pub(crate) Option<T>);
79
80impl<T: Subscription> SubscriptionGuard<T> {
81  /// Wraps an existing subscription with a guard to enable RAII behavior for
82  /// it.
83  pub fn new(subscription: T) -> SubscriptionGuard<T> { SubscriptionGuard(Some(subscription)) }
84}
85
86impl<T: Subscription> Drop for SubscriptionGuard<T> {
87  fn drop(&mut self) {
88    if let Some(u) = self.0.take() {
89      u.unsubscribe()
90    }
91  }
92}
93
94/// Subscription trait for managing observable subscriptions
95///
96/// Provides methods to cancel a subscription and check its status.
97/// Uses move semantics for `unsubscribe` to match the terminal nature of the
98/// operation, consistent with Observer's `error(self)` and `complete(self)`
99/// methods.
100pub trait Subscription {
101  /// Cancel the subscription (terminal operation, consumes self)
102  fn unsubscribe(self);
103
104  /// Check if the subscription is closed (completed or unsubscribed)
105  fn is_closed(&self) -> bool;
106
107  /// Activates "RAII" behavior for this subscription. That means
108  /// `unsubscribe()` will be called automatically as soon as the returned
109  /// value goes out of scope.
110  ///
111  /// **Attention:** If you don't assign the return value to a variable,
112  /// `unsubscribe()` is called immediately, which is probably not what you
113  /// want!
114  fn unsubscribe_when_dropped(self) -> SubscriptionGuard<Self>
115  where
116    Self: Sized,
117  {
118    SubscriptionGuard::new(self)
119  }
120}
121
122/// Unit subscription - always closed, does nothing
123impl Subscription for () {
124  fn unsubscribe(self) {}
125
126  fn is_closed(&self) -> bool { true }
127}
128
129/// Option subscription - None is closed, Some delegates to inner
130impl<P: Subscription> Subscription for Option<P> {
131  fn unsubscribe(self) {
132    if let Some(inner) = self {
133      inner.unsubscribe()
134    }
135  }
136
137  fn is_closed(&self) -> bool {
138    match self {
139      Some(inner) => inner.is_closed(),
140      None => true,
141    }
142  }
143}
144
145use crate::context::{MutArc, MutRc, RcDeref, RcDerefMut};
146
147impl<P: Subscription> Subscription for MutArc<Option<P>> {
148  fn unsubscribe(self) {
149    let Some(inner) = self.rc_deref_mut().take() else {
150      return;
151    };
152    inner.unsubscribe()
153  }
154
155  fn is_closed(&self) -> bool { self.rc_deref().is_none() }
156}
157
158impl<P: Subscription> Subscription for MutRc<Option<P>> {
159  fn unsubscribe(self) {
160    let Some(inner) = self.rc_deref_mut().take() else {
161      return;
162    };
163    inner.unsubscribe()
164  }
165
166  fn is_closed(&self) -> bool { self.rc_deref().is_none() }
167}
168
169#[cfg(test)]
170mod test {
171  use std::{cell::RefCell, rc::Rc};
172
173  use super::*;
174
175  #[rxrust_macro::test]
176  fn test_subscription_guard_drop() {
177    let unsubscribed = Rc::new(RefCell::new(false));
178    let unsubscribed_clone = unsubscribed.clone();
179
180    struct TestSubscription {
181      is_closed: bool,
182      unsubscribed_flag: Rc<RefCell<bool>>,
183    }
184
185    impl Subscription for TestSubscription {
186      fn unsubscribe(mut self) {
187        *self.unsubscribed_flag.borrow_mut() = true;
188        self.is_closed = true;
189      }
190
191      fn is_closed(&self) -> bool { self.is_closed }
192    }
193
194    let sub = TestSubscription { is_closed: false, unsubscribed_flag: unsubscribed_clone };
195
196    assert!(!*unsubscribed.borrow());
197    {
198      let _guard = sub.unsubscribe_when_dropped();
199      assert!(!*unsubscribed.borrow());
200    }
201    // When _guard goes out of scope, unsubscribe should be called
202    assert!(*unsubscribed.borrow());
203  }
204}