1use crate::prelude::*;
2
/// Pairs an observer with a subscription handle.
///
/// The `Observer` implementation for this type forwards notifications to
/// `observer`, and the `SubscriptionLike` implementation delegates to
/// `subscription`.
///
/// `Clone` and `Debug` are derived, so each is available only when both `O`
/// and `U` implement the corresponding trait.
#[derive(Clone, Debug)]
pub struct Subscriber<O, U> {
    /// Observer that receives the forwarded notifications.
    pub(crate) observer: O,
    /// Subscription handle queried by `is_closed` and released by
    /// `unsubscribe`.
    pub(crate) subscription: U,
}
11
12impl<O> Subscriber<O, LocalSubscription> {
13 pub fn local(observer: O) -> Self {
14 Subscriber {
15 observer,
16 subscription: LocalSubscription::default(),
17 }
18 }
19}
20
21impl<O> Subscriber<O, SharedSubscription> {
22 pub fn shared(observer: O) -> Self {
23 Subscriber {
24 observer,
25 subscription: SharedSubscription::default(),
26 }
27 }
28}
29
30impl<Item, Err, O, U> Observer for Subscriber<O, U>
31where
32 O: Observer<Item = Item, Err = Err>,
33 U: SubscriptionLike,
34{
35 type Item = Item;
36 type Err = Err;
37 fn next(&mut self, v: Item) {
38 if !self.is_finished() {
39 self.observer.next(v)
40 }
41 }
42
43 fn error(&mut self, err: Err) {
44 if !self.is_finished() {
45 self.observer.error(err);
46 }
47 }
48
49 fn complete(&mut self) {
50 if !self.is_finished() {
51 self.observer.complete();
52 }
53 }
54
55 #[inline]
56 fn is_stopped(&self) -> bool { self.observer.is_stopped() }
57}
58
59impl<O, U: SubscriptionLike> SubscriptionLike for Subscriber<O, U> {
60 #[inline]
61 fn is_closed(&self) -> bool { self.subscription.is_closed() }
62 #[inline]
63 fn unsubscribe(&mut self) { self.subscription.unsubscribe() }
64}
65
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use std::sync::{Arc, Mutex};

    /// Call counters for `next`, `error` and `complete` (in that order),
    /// followed by the subscriber under test.
    type SubscriberInfo<O> =
        (Arc<Mutex<i32>>, Arc<Mutex<i32>>, Arc<Mutex<i32>>, O);

    /// Builds a shared subscriber whose three callbacks each bump their own
    /// counter, and hands the counters back alongside it.
    fn shared_subscriber_creator(
    ) -> SubscriberInfo<impl Observer<Item = i32, Err = ()>> {
        let next_count = Arc::new(Mutex::new(0));
        let error_count = Arc::new(Mutex::new(0));
        let complete_count = Arc::new(Mutex::new(0));

        let subscriber = {
            // The callbacks own their own handles to the counters.
            let on_next = Arc::clone(&next_count);
            let on_error = Arc::clone(&error_count);
            let on_complete = Arc::clone(&complete_count);
            Subscriber::shared(ObserverAll::new(
                move |_| *on_next.lock().unwrap() += 1,
                move |_| *on_error.lock().unwrap() += 1,
                move || *on_complete.lock().unwrap() += 1,
            ))
        };

        (next_count, error_count, complete_count, subscriber)
    }

    #[test]
    fn shared_next_complete() {
        let (next_count, _, complete_count, mut subscriber) =
            shared_subscriber_creator();

        subscriber.next(1);
        subscriber.next(2);
        subscriber.complete();
        // The assertions below expect these post-completion values to be
        // dropped.
        subscriber.next(3);
        subscriber.next(4);

        let delivered = *next_count.lock().unwrap();
        assert_eq!(delivered, 2);
        let completed = *complete_count.lock().unwrap();
        assert_eq!(completed, 1);
    }

    #[test]
    fn shared_err_complete() {
        let (next_count, error_count, _, mut subscriber) =
            shared_subscriber_creator();

        subscriber.next(1);
        subscriber.next(2);
        subscriber.error(());
        // The assertions below expect these post-error values to be dropped.
        subscriber.next(3);
        subscriber.next(4);

        let delivered = *next_count.lock().unwrap();
        assert_eq!(delivered, 2);
        let failed = *error_count.lock().unwrap();
        assert_eq!(failed, 1);
    }

    #[test]
    fn next_and_complete() {
        let mut next_calls = 0;
        let mut complete_calls = 0;

        let mut subscriber = Subscriber::local(ObserverAll::new(
            |_: &_| next_calls += 1,
            |_: &i32| {},
            || complete_calls += 1,
        ));

        subscriber.next(&1);
        subscriber.next(&2);
        subscriber.complete();
        // Sent after completion; expected not to be counted.
        subscriber.next(&3);
        subscriber.next(&4);

        assert_eq!(next_calls, 2);
        assert_eq!(complete_calls, 1);
    }

    #[test]
    fn next_and_error() {
        let mut next_calls = 0;
        let mut error_calls = 0;

        let mut subscriber = Subscriber::local(ObserverErr::new(
            |_: &_| next_calls += 1,
            |_: &()| error_calls += 1,
        ));

        subscriber.next(&1);
        subscriber.next(&2);
        subscriber.error(&());
        // Sent after the error; expected not to be counted.
        subscriber.next(&3);
        subscriber.next(&4);

        assert_eq!(next_calls, 2);
        assert_eq!(error_calls, 1);
    }
}