rxrust/
subscription.rs

1use crate::prelude::*;
2use smallvec::SmallVec;
3use std::{
4  any::Any,
5  cell::RefCell,
6  fmt::{Debug, Formatter},
7  rc::Rc,
8  sync::{Arc, Mutex},
9};
10
/// Handle returned from `Observable.subscribe(Subscriber)` that allows the
/// caller to unsubscribe.
pub trait SubscriptionLike {
  /// Deregisters from a stream before it has finished delivering all of its
  /// events (i.e. before onCompleted is called).
  fn unsubscribe(&mut self);

  /// Reports whether this subscription is closed.
  fn is_closed(&self) -> bool;
}
20
21impl Debug for Box<dyn SubscriptionLike> {
22  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
23    f.debug_struct("Box<dyn SubscriptionLike>")
24      .field("is_closed", &self.is_closed())
25      .finish()
26  }
27}
28
29#[derive(Clone, Debug, Default)]
30pub struct LocalSubscription(Rc<RefCell<Inner<Box<dyn SubscriptionLike>>>>);
31
32impl LocalSubscription {
33  pub fn add<S: SubscriptionLike + 'static>(&self, subscription: S) {
34    if !self.is_same(&subscription) {
35      self.0.borrow_mut().add(Box::new(subscription))
36    }
37  }
38
39  fn is_same(&self, other: &dyn Any) -> bool {
40    if let Some(other) = other.downcast_ref::<Self>() {
41      Rc::ptr_eq(&self.0, &other.0)
42    } else {
43      false
44    }
45  }
46}
47
48impl TearDownSize for LocalSubscription {
49  fn teardown_size(&self) -> usize { self.0.borrow().teardown.len() }
50}
51
52pub trait TearDownSize: SubscriptionLike {
53  fn teardown_size(&self) -> usize;
54}
55
56impl SubscriptionLike for LocalSubscription {
57  #[inline]
58  fn unsubscribe(&mut self) { self.0.unsubscribe() }
59  #[inline]
60  fn is_closed(&self) -> bool { self.0.is_closed() }
61}
62
63#[derive(Clone, Debug, Default)]
64pub struct SharedSubscription(
65  Arc<Mutex<Inner<Box<dyn SubscriptionLike + Send + Sync>>>>,
66);
67
68impl SharedSubscription {
69  pub fn add<S: SubscriptionLike + Send + Sync + 'static>(
70    &self,
71    subscription: S,
72  ) {
73    if !self.is_same(&subscription) {
74      self.0.lock().unwrap().add(Box::new(subscription));
75    }
76  }
77
78  fn is_same(&self, other: &dyn Any) -> bool {
79    if let Some(other) = other.downcast_ref::<Self>() {
80      Arc::ptr_eq(&self.0, &other.0)
81    } else {
82      false
83    }
84  }
85}
86
87impl TearDownSize for SharedSubscription {
88  fn teardown_size(&self) -> usize { self.0.lock().unwrap().teardown.len() }
89}
90
91impl SubscriptionLike for SharedSubscription {
92  #[inline]
93  fn unsubscribe(&mut self) { self.0.unsubscribe(); }
94  #[inline]
95  fn is_closed(&self) -> bool { self.0.is_closed() }
96}
97
98pub trait Publisher: Observer + SubscriptionLike {
99  #[inline]
100  fn is_finished(&self) -> bool { self.is_closed() || self.is_stopped() }
101}
102
103impl<T> Publisher for T where T: Observer + SubscriptionLike {}
104
105struct Inner<T> {
106  closed: bool,
107  teardown: SmallVec<[T; 1]>,
108}
109
110impl<T> Debug for Inner<T> {
111  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
112    f.debug_struct("Inner")
113      .field("closed", &self.closed)
114      .field("teardown_count", &self.teardown.len())
115      .finish()
116  }
117}
118
119impl<T: SubscriptionLike> SubscriptionLike for Inner<T> {
120  #[inline(always)]
121  fn is_closed(&self) -> bool { self.closed }
122
123  fn unsubscribe(&mut self) {
124    if !self.closed {
125      self.closed = true;
126      for v in &mut self.teardown {
127        v.unsubscribe();
128      }
129    }
130  }
131}
132
133impl<T: SubscriptionLike> Inner<T> {
134  fn add(&mut self, mut v: T) {
135    if self.closed {
136      v.unsubscribe();
137    } else {
138      self.teardown.retain(|v| !v.is_closed());
139      self.teardown.push(v);
140    }
141  }
142}
143
144impl<T> Default for Inner<T> {
145  fn default() -> Self {
146    Inner {
147      closed: false,
148      teardown: SmallVec::new(),
149    }
150  }
151}
152
153impl<T> SubscriptionLike for Arc<Mutex<T>>
154where
155  T: SubscriptionLike,
156{
157  #[inline]
158  fn unsubscribe(&mut self) { self.lock().unwrap().unsubscribe() }
159
160  #[inline]
161  fn is_closed(&self) -> bool { self.lock().unwrap().is_closed() }
162}
163
164impl<T> SubscriptionLike for Rc<RefCell<T>>
165where
166  T: SubscriptionLike,
167{
168  #[inline]
169  fn unsubscribe(&mut self) { self.borrow_mut().unsubscribe() }
170
171  #[inline]
172  fn is_closed(&self) -> bool { self.borrow().is_closed() }
173}
174
175impl<T: ?Sized> SubscriptionLike for Box<T>
176where
177  T: SubscriptionLike,
178{
179  #[inline]
180  fn unsubscribe(&mut self) {
181    let s = &mut **self;
182    s.unsubscribe()
183  }
184
185  #[inline]
186  fn is_closed(&self) -> bool {
187    let s = &**self;
188    s.is_closed()
189  }
190}
191
192/// Wrapper around a subscription which provides the
193/// `unsubscribe_when_dropped()` method.
194pub struct SubscriptionWrapper<T: SubscriptionLike>(pub(crate) T);
195
196impl<T: SubscriptionLike> SubscriptionWrapper<T> {
197  /// Activates "RAII" behavior for this subscription. That means
198  /// `unsubscribe()` will be called automatically as soon as the returned
199  /// value goes out of scope.
200  ///
201  /// **Attention:** If you don't assign the return value to a variable,
202  /// `unsubscribe()` is called immediately, which is probably not what you
203  /// want!
204  pub fn unsubscribe_when_dropped(self) -> SubscriptionGuard<T> {
205    SubscriptionGuard(self.0)
206  }
207
208  /// Consumes this wrapper and returns the underlying subscription.
209  pub fn into_inner(self) -> T { self.0 }
210}
211
212impl<T: SubscriptionLike> SubscriptionLike for SubscriptionWrapper<T> {
213  #[inline]
214  fn is_closed(&self) -> bool { self.0.is_closed() }
215  #[inline]
216  fn unsubscribe(&mut self) { self.0.unsubscribe() }
217}
218
219/// An RAII implementation of a "scoped subscribed" of a subscription.
220/// When this structure is dropped (falls out of scope), the subscription will
221/// be unsubscribed.
222///
223/// Implements the [must_use](
224/// https://doc.rust-lang.org/reference/attributes/diagnostics.html
225/// #the-must_use-attribute)
226/// attribute
227///
228/// If you want to drop it immediately, wrap it in its own scope
229#[derive(Debug)]
230#[must_use]
231pub struct SubscriptionGuard<T: SubscriptionLike>(pub(crate) T);
232
233impl<T: SubscriptionLike> SubscriptionGuard<T> {
234  /// Wraps an existing subscription with a guard to enable RAII behavior for
235  /// it.
236  pub fn new(subscription: T) -> SubscriptionGuard<T> {
237    SubscriptionGuard(subscription)
238  }
239}
240
241impl<T: SubscriptionLike> Drop for SubscriptionGuard<T> {
242  #[inline]
243  fn drop(&mut self) { self.0.unsubscribe() }
244}
245
#[cfg(test)]
mod test {
  use super::*;

  /// Adding grows the teardown list; a child that closed in the meantime is
  /// pruned by the next `add`.
  #[test]
  fn add_remove_for_local() {
    let local = LocalSubscription::default();
    let mut l1 = LocalSubscription::default();
    let l2 = LocalSubscription::default();
    let l3 = LocalSubscription::default();
    local.add(l1.clone());
    assert_eq!(local.teardown_size(), 1);
    local.add(l2);
    assert_eq!(local.teardown_size(), 2);

    // Closing a child does not shrink the list by itself ...
    l1.unsubscribe();
    assert_eq!(local.teardown_size(), 2);
    // ... the closed entry is dropped when the next child is added.
    local.add(l3);
    assert_eq!(local.teardown_size(), 2);
  }

  #[test]
  fn local_ignores_adding_itself() {
    let local = LocalSubscription::default();
    local.add(local.clone());
    assert_eq!(local.teardown_size(), 0);
  }

  #[test]
  fn local_unsubscribe_closes_children() {
    let mut local = LocalSubscription::default();
    let child = LocalSubscription::default();
    local.add(child.clone());
    local.unsubscribe();
    assert!(local.is_closed());
    assert!(child.is_closed());
  }

  #[test]
  fn add_to_closed_local_unsubscribes_child() {
    let mut local = LocalSubscription::default();
    local.unsubscribe();
    let child = LocalSubscription::default();
    local.add(child.clone());
    assert!(child.is_closed());
    assert_eq!(local.teardown_size(), 0);
  }

  /// Adding grows the teardown list; a child that closed in the meantime is
  /// pruned by the next `add`.
  #[test]
  fn add_remove_for_shared() {
    let shared = SharedSubscription::default();
    let mut s1 = SharedSubscription::default();
    let s2 = SharedSubscription::default();
    let s3 = SharedSubscription::default();
    shared.add(s1.clone());
    assert_eq!(shared.teardown_size(), 1);
    shared.add(s2);
    assert_eq!(shared.teardown_size(), 2);

    // Closing a child does not shrink the list by itself ...
    s1.unsubscribe();
    assert_eq!(shared.teardown_size(), 2);
    // ... the closed entry is dropped when the next child is added.
    shared.add(s3);
    assert_eq!(shared.teardown_size(), 2);
  }

  #[test]
  fn shared_ignores_adding_itself() {
    let shared = SharedSubscription::default();
    shared.add(shared.clone());
    assert_eq!(shared.teardown_size(), 0);
  }

  #[test]
  fn shared_unsubscribe_closes_children() {
    let mut shared = SharedSubscription::default();
    let child = SharedSubscription::default();
    shared.add(child.clone());
    shared.unsubscribe();
    assert!(shared.is_closed());
    assert!(child.is_closed());
  }

  #[test]
  fn add_to_closed_shared_unsubscribes_child() {
    let mut shared = SharedSubscription::default();
    shared.unsubscribe();
    let child = SharedSubscription::default();
    shared.add(child.clone());
    assert!(child.is_closed());
    assert_eq!(shared.teardown_size(), 0);
  }
}