rxrust/
subscriber.rs

1use crate::prelude::*;
2
3/// Implements the Observer trait and Subscription trait. While the Observer is
4/// the public API for consuming the values of an Observable, all Observers get
5/// converted to a Subscriber, in order to provide Subscription capabilities.
6#[derive(Clone)]
7pub struct Subscriber<O, U> {
8  pub(crate) observer: O,
9  pub(crate) subscription: U,
10}
11
12impl<O> Subscriber<O, LocalSubscription> {
13  pub fn local(observer: O) -> Self {
14    Subscriber {
15      observer,
16      subscription: LocalSubscription::default(),
17    }
18  }
19}
20
21impl<O> Subscriber<O, SharedSubscription> {
22  pub fn shared(observer: O) -> Self {
23    Subscriber {
24      observer,
25      subscription: SharedSubscription::default(),
26    }
27  }
28}
29
30impl<Item, Err, O, U> Observer for Subscriber<O, U>
31where
32  O: Observer<Item = Item, Err = Err>,
33  U: SubscriptionLike,
34{
35  type Item = Item;
36  type Err = Err;
37  fn next(&mut self, v: Item) {
38    if !self.is_finished() {
39      self.observer.next(v)
40    }
41  }
42
43  fn error(&mut self, err: Err) {
44    if !self.is_finished() {
45      self.observer.error(err);
46    }
47  }
48
49  fn complete(&mut self) {
50    if !self.is_finished() {
51      self.observer.complete();
52    }
53  }
54
55  #[inline]
56  fn is_stopped(&self) -> bool { self.observer.is_stopped() }
57}
58
59impl<O, U: SubscriptionLike> SubscriptionLike for Subscriber<O, U> {
60  #[inline]
61  fn is_closed(&self) -> bool { self.subscription.is_closed() }
62  #[inline]
63  fn unsubscribe(&mut self) { self.subscription.unsubscribe() }
64}
65
66#[cfg(test)]
67mod test {
68  use crate::prelude::*;
69  use std::sync::{Arc, Mutex};
70
71  #[test]
72  fn shared_next_complete() {
73    let (next, _, complete, mut subscriber) = shared_subscriber_creator();
74    subscriber.next(1);
75    subscriber.next(2);
76    subscriber.complete();
77    subscriber.next(3);
78    subscriber.next(4);
79    assert_eq!(*next.lock().unwrap(), 2);
80    assert_eq!(*complete.lock().unwrap(), 1);
81  }
82
83  #[test]
84  fn shared_err_complete() {
85    let (next, error, _, mut subscriber) = shared_subscriber_creator();
86    subscriber.next(1);
87    subscriber.next(2);
88    subscriber.error(());
89    subscriber.next(3);
90    subscriber.next(4);
91
92    assert_eq!(*next.lock().unwrap(), 2);
93    assert_eq!(*error.lock().unwrap(), 1);
94  }
95
96  type SubscriberInfo<O> =
97    (Arc<Mutex<i32>>, Arc<Mutex<i32>>, Arc<Mutex<i32>>, O);
98  fn shared_subscriber_creator()
99  -> SubscriberInfo<impl Observer<Item = i32, Err = ()>> {
100    let next = Arc::new(Mutex::new(0));
101    let err = Arc::new(Mutex::new(0));
102    let complete = Arc::new(Mutex::new(0));
103
104    (
105      next.clone(),
106      err.clone(),
107      complete.clone(),
108      Subscriber::shared(ObserverAll::new(
109        move |_| *next.lock().unwrap() += 1,
110        move |_| *err.lock().unwrap() += 1,
111        move || *complete.lock().unwrap() += 1,
112      )),
113    )
114  }
115
116  #[test]
117  fn next_and_complete() {
118    let mut next = 0;
119    let mut complete = 0;
120
121    let mut subscriber = Subscriber::local(ObserverAll::new(
122      |_: &_| next += 1,
123      |_: &i32| {},
124      || complete += 1,
125    ));
126
127    subscriber.next(&1);
128    subscriber.next(&2);
129    subscriber.complete();
130    subscriber.next(&3);
131    subscriber.next(&4);
132    assert_eq!(next, 2);
133    assert_eq!(complete, 1);
134  }
135
136  #[test]
137  fn next_and_error() {
138    let mut next = 0;
139    let mut err = 0;
140
141    let mut subscriber =
142      Subscriber::local(ObserverErr::new(|_: &_| next += 1, |_: &()| err += 1));
143
144    subscriber.next(&1);
145    subscriber.next(&2);
146    subscriber.error(&());
147    subscriber.next(&3);
148    subscriber.next(&4);
149
150    assert_eq!(next, 2);
151    assert_eq!(err, 1);
152  }
153}