1use crate::prelude::*;
2use std::fmt::{Debug, Formatter};
3mod local_subject;
4pub use local_subject::*;
5
6mod shared_subject;
7pub use shared_subject::*;
8use std::cell::RefCell;
9use std::rc::Rc;
10use std::sync::{Arc, Mutex};
11
12#[derive(Default, Clone)]
13pub struct Subject<V, S> {
14 pub(crate) observers: SubjectObserver<V>,
15 pub(crate) subscription: S,
16}
17
18impl<O, U: SubscriptionLike> SubscriptionLike for Subject<O, U> {
19 #[inline]
20 fn unsubscribe(&mut self) { self.subscription.unsubscribe(); }
21
22 #[inline]
23 fn is_closed(&self) -> bool { self.subscription.is_closed() }
24}
25
26macro_rules! impl_observer {
27 () => {
28 #[inline]
29 fn next(&mut self, value: Item) { self.observers.next(value) }
30
31 #[inline]
32 fn error(&mut self, err: Err) { self.observers.error(err) }
33
34 #[inline]
35 fn complete(&mut self) { self.observers.complete() }
36
37 #[inline]
38 fn is_stopped(&self) -> bool { self.observers.is_stopped() }
39 };
40}
41
42impl<Item, Err, U, O> Observer for Subject<Arc<Mutex<Vec<O>>>, U>
43where
44 O: Observer<Item = Item, Err = Err> + SubscriptionLike,
45 Item: Clone,
46 Err: Clone,
47{
48 type Item = Item;
49 type Err = Err;
50
51 impl_observer!();
52}
53
54impl<Item, Err, U, O> Observer for Subject<Rc<RefCell<Vec<O>>>, U>
55where
56 O: Observer<Item = Item, Err = Err> + SubscriptionLike,
57 Item: Clone,
58 Err: Clone,
59{
60 type Item = Item;
61 type Err = Err;
62
63 impl_observer!();
64}
65
66impl<Item, Err, U, O> Observer for Subject<Box<Vec<O>>, U>
67where
68 O: Observer<Item = Item, Err = Err> + SubscriptionLike,
69 Item: Clone,
70 Err: Clone,
71{
72 type Item = Item;
73 type Err = Err;
74 impl_observer!();
75}
76
77#[derive(Default, Clone)]
78pub(crate) struct SubjectObserver<V> {
79 pub(crate) observers: V,
80 is_stopped: bool,
81}
82
83impl<Item, Err, O> Observer for SubjectObserver<Arc<Mutex<Vec<O>>>>
84where
85 O: Publisher<Item = Item, Err = Err>,
86 Item: Clone,
87 Err: Clone,
88{
89 type Item = Item;
90 type Err = Err;
91 fn next(&mut self, value: Item) {
92 {
93 let mut vec = self.observers.lock().unwrap();
94 let not_done: Vec<O> = vec
95 .drain(..)
96 .map(|mut o| {
97 o.next(value.clone());
98 o
99 })
100 .filter(|o| !o.is_finished())
101 .collect();
102 for p in not_done {
103 vec.push(p);
104 }
105 }
106 }
107
108 fn error(&mut self, err: Err) {
109 let mut observers = self.observers.lock().unwrap();
110 observers
111 .iter_mut()
112 .for_each(|subscriber| subscriber.error(err.clone()));
113 observers.clear();
114 self.is_stopped = true;
115 }
116
117 fn complete(&mut self) {
118 let mut observers = self.observers.lock().unwrap();
119 observers
120 .iter_mut()
121 .for_each(|subscriber| subscriber.complete());
122 observers.clear();
123 self.is_stopped = true;
124 }
125
126 #[inline]
127 fn is_stopped(&self) -> bool { self.is_stopped }
128}
129
130impl<Item, Err, O> Observer for SubjectObserver<Rc<RefCell<Vec<O>>>>
131where
132 O: Publisher<Item = Item, Err = Err>,
133 Item: Clone,
134 Err: Clone,
135{
136 type Item = Item;
137 type Err = Err;
138 fn next(&mut self, value: Item) {
139 {
140 let mut vec = self.observers.borrow_mut();
141 let not_done: Vec<O> = vec
142 .drain(..)
143 .map(|mut o| {
144 o.next(value.clone());
145 o
146 })
147 .filter(|o| !o.is_finished())
148 .collect();
149 for p in not_done {
150 vec.push(p);
151 }
152 }
153 }
154
155 fn error(&mut self, err: Err) {
156 let mut observers = self.observers.borrow_mut();
157 observers
158 .iter_mut()
159 .for_each(|subscriber| subscriber.error(err.clone()));
160 observers.clear();
161 self.is_stopped = true;
162 }
163
164 fn complete(&mut self) {
165 let mut observers = self.observers.borrow_mut();
166 observers
167 .iter_mut()
168 .for_each(|subscriber| subscriber.complete());
169 observers.clear();
170 self.is_stopped = true;
171 }
172
173 #[inline]
174 fn is_stopped(&self) -> bool { self.is_stopped }
175}
176
177impl<Item, Err, O> Observer for SubjectObserver<Box<Vec<O>>>
178where
179 O: Publisher<Item = Item, Err = Err>,
180 Item: Clone,
181 Err: Clone,
182{
183 type Item = Item;
184 type Err = Err;
185 fn next(&mut self, value: Item) {
186 {
187 let vec = &mut self.observers;
188 let not_done: Vec<O> = vec
189 .drain(..)
190 .map(|mut o| {
191 o.next(value.clone());
192 o
193 })
194 .filter(|o| !o.is_finished())
195 .collect();
196 for p in not_done {
197 vec.push(p);
198 }
199 }
200 }
201
202 fn error(&mut self, err: Err) {
203 let observers = &mut self.observers;
204 observers
205 .iter_mut()
206 .for_each(|subscriber| subscriber.error(err.clone()));
207 observers.clear();
208 self.is_stopped = true;
209 }
210
211 fn complete(&mut self) {
212 let observers = &mut self.observers;
213 observers
214 .iter_mut()
215 .for_each(|subscriber| subscriber.complete());
216 observers.clear();
217 self.is_stopped = true;
218 }
219
220 #[inline]
221 fn is_stopped(&self) -> bool { self.is_stopped }
222}
223impl<O, S> Debug for Subject<Arc<Mutex<Vec<O>>>, S> {
224 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
225 f.debug_struct("LocalSubject")
226 .field(
227 "observer_count",
228 &self.observers.observers.lock().unwrap().len(),
229 )
230 .finish()
231 }
232}
233impl<O, S> Debug for Subject<Rc<RefCell<Vec<O>>>, S> {
234 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
235 f.debug_struct("LocalSubject")
236 .field(
237 "observer_count",
238 &self.observers.observers.borrow_mut().len(),
239 )
240 .finish()
241 }
242}
243
244impl<O, S> Debug for Subject<Box<Vec<O>>, S> {
245 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
246 f.debug_struct("LocalSubject")
247 .field("observer_count", &self.observers.observers.len())
248 .finish()
249 }
250}
251#[cfg(test)]
252mod test {
253 use super::*;
254 use futures::executor::ThreadPool;
255 use std::time::{Duration, Instant};
256
257 #[test]
258 fn base_data_flow() {
259 let mut i = 0;
260 {
261 let mut broadcast = LocalSubject::new();
262 broadcast.clone().subscribe(|v| i = v * 2);
263 broadcast.next(1);
264 }
265 assert_eq!(i, 2);
266 }
267
268 #[test]
269 #[should_panic]
270 fn error() {
271 let mut broadcast = LocalSubject::new();
272 broadcast
273 .clone()
274 .subscribe_err(|_: i32| {}, |e: _| panic!("{}", e));
275 broadcast.next(1);
276
277 broadcast.error(&"should panic!");
278 }
279
280 #[test]
281 fn unsubscribe() {
282 let mut i = 0;
283
284 {
285 let mut subject = LocalSubject::new();
286 subject.clone().subscribe(|v| i = v).unsubscribe();
287 subject.next(100);
288 }
289
290 assert_eq!(i, 0);
291 }
292
293 #[test]
294 fn empty_local_subject_can_convert_into_shared() {
295 let pool = ThreadPool::new().unwrap();
296 use std::sync::{Arc, Mutex};
297 let value = Arc::new(Mutex::new(0));
298 let c_v = value.clone();
299 let subject = SharedSubject::new();
300 let mut subject_c = subject.clone();
301 let stamp = Instant::now();
302 pool.schedule(
303 move |_| {
304 subject_c.next(100);
305 subject_c.complete();
306 },
307 Some(Duration::from_millis(25)),
308 (),
309 );
310 subject
311 .clone()
312 .into_shared()
313 .observe_on(pool)
314 .into_shared()
315 .subscribe_blocking(move |v: i32| {
316 *value.lock().unwrap() = v;
317 });
318 assert!(stamp.elapsed() > Duration::from_millis(25));
319 assert_eq!(*c_v.lock().unwrap(), 100);
320 }
321
322 #[test]
323 fn subject_subscribe_subject() {
324 let mut local = LocalSubject::new();
325 let local2 = LocalSubject::new();
326 local.clone().actual_subscribe(Subscriber {
327 observer: local2.observers,
328 subscription: local2.subscription,
329 });
330 local.next(1);
331 local.error(2);
332 }
333}