rxrust/observable/
observable_comp.rs

1use crate::prelude::*;
2
3#[derive(Clone)]
4pub struct ObserverComp<N, C, Item> {
5  next: N,
6  complete: C,
7  is_stopped: bool,
8  _marker: TypeHint<*const Item>,
9}
10
11impl<N, C, Item> Observer for ObserverComp<N, C, Item>
12where
13  C: FnMut(),
14  N: FnMut(Item),
15{
16  type Item = Item;
17  type Err = ();
18  #[inline]
19  fn next(&mut self, value: Item) { (self.next)(value); }
20  #[inline]
21  fn error(&mut self, _err: ()) { self.is_stopped = true; }
22  fn complete(&mut self) {
23    (self.complete)();
24    self.is_stopped = true;
25  }
26  fn is_stopped(&self) -> bool { self.is_stopped }
27}
28
29pub trait SubscribeComplete<'a, N, C> {
30  /// A type implementing [`SubscriptionLike`]
31  type Unsub: SubscriptionLike;
32
33  /// Invokes an execution of an Observable and registers Observer handlers for
34  /// notifications it will emit.
35  fn subscribe_complete(
36    self,
37    next: N,
38    complete: C,
39  ) -> SubscriptionWrapper<Self::Unsub>;
40}
41
42impl<'a, S, N, C> SubscribeComplete<'a, N, C> for S
43where
44  S: LocalObservable<'a, Err = ()>,
45  C: FnMut() + 'a,
46  N: FnMut(S::Item) + 'a,
47  S::Item: 'a,
48{
49  type Unsub = S::Unsub;
50  fn subscribe_complete(
51    self,
52    next: N,
53    complete: C,
54  ) -> SubscriptionWrapper<Self::Unsub>
55  where
56    Self: Sized,
57    S::Item: 'a,
58  {
59    let unsub = self.actual_subscribe(Subscriber::local(ObserverComp {
60      next,
61      complete,
62      is_stopped: false,
63      _marker: TypeHint::new(),
64    }));
65    SubscriptionWrapper(unsub)
66  }
67}
68
69impl<'a, S, N, C> SubscribeComplete<'a, N, C> for Shared<S>
70where
71  S: SharedObservable<Err = ()>,
72  C: FnMut() + Send + Sync + 'static,
73  N: FnMut(S::Item) + Send + Sync + 'static,
74  S::Item: 'static,
75{
76  type Unsub = S::Unsub;
77  fn subscribe_complete(
78    self,
79    next: N,
80    complete: C,
81  ) -> SubscriptionWrapper<Self::Unsub>
82  where
83    Self: Sized,
84  {
85    let unsub = self.0.actual_subscribe(Subscriber::shared(ObserverComp {
86      next,
87      complete,
88      is_stopped: false,
89      _marker: TypeHint::new(),
90    }));
91    SubscriptionWrapper(unsub)
92  }
93}
94
/// Expects that once the guard returned by `unsubscribe_when_dropped` has
/// gone out of scope, an emission on the subject no longer reaches the
/// `next` callback, so `times` stays at zero.
#[test]
fn raii() {
  let mut times = 0;
  {
    let mut subject = LocalSubject::new();
    {
      // Bind the guard to a named variable. `let _ = ...` does not bind the
      // value, so the guard would be dropped at the end of that statement
      // rather than at the end of this scope, and the scope-exit path the
      // test is about would never run.
      let _guard = subject
        .clone()
        .subscribe_complete(|_| times += 1, || {})
        .unsubscribe_when_dropped();
    } // <-- guard is dropped here!
    subject.next(());
  }
  assert_eq!(times, 0);
}