rxrust/observable/
observable_next.rs

1use crate::prelude::*;
2
3#[derive(Clone)]
4pub struct ObserverN<N, Item> {
5  next: N,
6  is_stopped: bool,
7  _marker: TypeHint<*const Item>,
8}
9
10impl<Item, N> Observer for ObserverN<N, Item>
11where
12  N: FnMut(Item),
13{
14  type Item = Item;
15  type Err = ();
16  #[inline]
17  fn next(&mut self, value: Self::Item) { (self.next)(value); }
18  #[inline]
19  fn error(&mut self, _err: ()) { self.is_stopped = true; }
20  #[inline]
21  fn complete(&mut self) { self.is_stopped = true; }
22  fn is_stopped(&self) -> bool { self.is_stopped }
23}
24
25pub trait SubscribeNext<'a, N> {
26  /// A type implementing [`SubscriptionLike`]
27  type Unsub: SubscriptionLike;
28
29  /// Invokes an execution of an Observable and registers Observer handlers for
30  /// notifications it will emit.
31  fn subscribe(self, next: N) -> SubscriptionWrapper<Self::Unsub>;
32}
33
34impl<'a, S, N> SubscribeNext<'a, N> for S
35where
36  S: LocalObservable<'a, Err = ()>,
37  N: FnMut(S::Item) + 'a,
38  S::Item: 'a,
39{
40  type Unsub = S::Unsub;
41  fn subscribe(self, next: N) -> SubscriptionWrapper<Self::Unsub> {
42    let unsub = self.actual_subscribe(Subscriber::local(ObserverN {
43      next,
44      is_stopped: false,
45      _marker: TypeHint::new(),
46    }));
47    SubscriptionWrapper(unsub)
48  }
49}
50
51impl<'a, S, N> SubscribeNext<'a, N> for Shared<S>
52where
53  S: SharedObservable<Err = ()>,
54  N: FnMut(S::Item) + Send + Sync + 'static,
55  S::Item: 'static,
56{
57  type Unsub = S::Unsub;
58  fn subscribe(self, next: N) -> SubscriptionWrapper<Self::Unsub> {
59    let unsub = self.0.actual_subscribe(Subscriber::shared(ObserverN {
60      next,
61      is_stopped: false,
62      _marker: TypeHint::new(),
63    }));
64    SubscriptionWrapper(unsub)
65  }
66}
67
/// Checks that a subscription turned into a guard with
/// `unsubscribe_when_dropped` receives no values once that guard has gone
/// out of scope.
#[test]
fn raii() {
  let mut times = 0;
  {
    let mut subject = LocalSubject::new();
    {
      // Bind the guard to a named variable: `let _ = ...` does not bind, so
      // the guard would be dropped at the end of the statement instead of at
      // the end of this scope.
      let _guard = subject
        .clone()
        .subscribe(|_| {
          times += 1;
        })
        .unsubscribe_when_dropped();
    } // <-- guard is dropped here!
    // The value is emitted only after the guard is gone, so the callback
    // must not run.
    subject.next(());
  }
  assert_eq!(times, 0);
}