rxrust/
subject.rs

1use crate::prelude::*;
2use std::fmt::{Debug, Formatter};
3mod local_subject;
4pub use local_subject::*;
5
6mod shared_subject;
7pub use shared_subject::*;
8use std::cell::RefCell;
9use std::rc::Rc;
10use std::sync::{Arc, Mutex};
11
12#[derive(Default, Clone)]
13pub struct Subject<V, S> {
14  pub(crate) observers: SubjectObserver<V>,
15  pub(crate) subscription: S,
16}
17
18impl<O, U: SubscriptionLike> SubscriptionLike for Subject<O, U> {
19  #[inline]
20  fn unsubscribe(&mut self) { self.subscription.unsubscribe(); }
21
22  #[inline]
23  fn is_closed(&self) -> bool { self.subscription.is_closed() }
24}
25
26macro_rules! impl_observer {
27  () => {
28    #[inline]
29    fn next(&mut self, value: Item) { self.observers.next(value) }
30
31    #[inline]
32    fn error(&mut self, err: Err) { self.observers.error(err) }
33
34    #[inline]
35    fn complete(&mut self) { self.observers.complete() }
36
37    #[inline]
38    fn is_stopped(&self) -> bool { self.observers.is_stopped() }
39  };
40}
41
42impl<Item, Err, U, O> Observer for Subject<Arc<Mutex<Vec<O>>>, U>
43where
44  O: Observer<Item = Item, Err = Err> + SubscriptionLike,
45  Item: Clone,
46  Err: Clone,
47{
48  type Item = Item;
49  type Err = Err;
50
51  impl_observer!();
52}
53
54impl<Item, Err, U, O> Observer for Subject<Rc<RefCell<Vec<O>>>, U>
55where
56  O: Observer<Item = Item, Err = Err> + SubscriptionLike,
57  Item: Clone,
58  Err: Clone,
59{
60  type Item = Item;
61  type Err = Err;
62
63  impl_observer!();
64}
65
66impl<Item, Err, U, O> Observer for Subject<Box<Vec<O>>, U>
67where
68  O: Observer<Item = Item, Err = Err> + SubscriptionLike,
69  Item: Clone,
70  Err: Clone,
71{
72  type Item = Item;
73  type Err = Err;
74  impl_observer!();
75}
76
77#[derive(Default, Clone)]
78pub(crate) struct SubjectObserver<V> {
79  pub(crate) observers: V,
80  is_stopped: bool,
81}
82
83impl<Item, Err, O> Observer for SubjectObserver<Arc<Mutex<Vec<O>>>>
84where
85  O: Publisher<Item = Item, Err = Err>,
86  Item: Clone,
87  Err: Clone,
88{
89  type Item = Item;
90  type Err = Err;
91  fn next(&mut self, value: Item) {
92    {
93      let mut vec = self.observers.lock().unwrap();
94      let not_done: Vec<O> = vec
95        .drain(..)
96        .map(|mut o| {
97          o.next(value.clone());
98          o
99        })
100        .filter(|o| !o.is_finished())
101        .collect();
102      for p in not_done {
103        vec.push(p);
104      }
105    }
106  }
107
108  fn error(&mut self, err: Err) {
109    let mut observers = self.observers.lock().unwrap();
110    observers
111      .iter_mut()
112      .for_each(|subscriber| subscriber.error(err.clone()));
113    observers.clear();
114    self.is_stopped = true;
115  }
116
117  fn complete(&mut self) {
118    let mut observers = self.observers.lock().unwrap();
119    observers
120      .iter_mut()
121      .for_each(|subscriber| subscriber.complete());
122    observers.clear();
123    self.is_stopped = true;
124  }
125
126  #[inline]
127  fn is_stopped(&self) -> bool { self.is_stopped }
128}
129
130impl<Item, Err, O> Observer for SubjectObserver<Rc<RefCell<Vec<O>>>>
131where
132  O: Publisher<Item = Item, Err = Err>,
133  Item: Clone,
134  Err: Clone,
135{
136  type Item = Item;
137  type Err = Err;
138  fn next(&mut self, value: Item) {
139    {
140      let mut vec = self.observers.borrow_mut();
141      let not_done: Vec<O> = vec
142        .drain(..)
143        .map(|mut o| {
144          o.next(value.clone());
145          o
146        })
147        .filter(|o| !o.is_finished())
148        .collect();
149      for p in not_done {
150        vec.push(p);
151      }
152    }
153  }
154
155  fn error(&mut self, err: Err) {
156    let mut observers = self.observers.borrow_mut();
157    observers
158      .iter_mut()
159      .for_each(|subscriber| subscriber.error(err.clone()));
160    observers.clear();
161    self.is_stopped = true;
162  }
163
164  fn complete(&mut self) {
165    let mut observers = self.observers.borrow_mut();
166    observers
167      .iter_mut()
168      .for_each(|subscriber| subscriber.complete());
169    observers.clear();
170    self.is_stopped = true;
171  }
172
173  #[inline]
174  fn is_stopped(&self) -> bool { self.is_stopped }
175}
176
177impl<Item, Err, O> Observer for SubjectObserver<Box<Vec<O>>>
178where
179  O: Publisher<Item = Item, Err = Err>,
180  Item: Clone,
181  Err: Clone,
182{
183  type Item = Item;
184  type Err = Err;
185  fn next(&mut self, value: Item) {
186    {
187      let vec = &mut self.observers;
188      let not_done: Vec<O> = vec
189        .drain(..)
190        .map(|mut o| {
191          o.next(value.clone());
192          o
193        })
194        .filter(|o| !o.is_finished())
195        .collect();
196      for p in not_done {
197        vec.push(p);
198      }
199    }
200  }
201
202  fn error(&mut self, err: Err) {
203    let observers = &mut self.observers;
204    observers
205      .iter_mut()
206      .for_each(|subscriber| subscriber.error(err.clone()));
207    observers.clear();
208    self.is_stopped = true;
209  }
210
211  fn complete(&mut self) {
212    let observers = &mut self.observers;
213    observers
214      .iter_mut()
215      .for_each(|subscriber| subscriber.complete());
216    observers.clear();
217    self.is_stopped = true;
218  }
219
220  #[inline]
221  fn is_stopped(&self) -> bool { self.is_stopped }
222}
223impl<O, S> Debug for Subject<Arc<Mutex<Vec<O>>>, S> {
224  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
225    f.debug_struct("LocalSubject")
226      .field(
227        "observer_count",
228        &self.observers.observers.lock().unwrap().len(),
229      )
230      .finish()
231  }
232}
233impl<O, S> Debug for Subject<Rc<RefCell<Vec<O>>>, S> {
234  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
235    f.debug_struct("LocalSubject")
236      .field(
237        "observer_count",
238        &self.observers.observers.borrow_mut().len(),
239      )
240      .finish()
241  }
242}
243
244impl<O, S> Debug for Subject<Box<Vec<O>>, S> {
245  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
246    f.debug_struct("LocalSubject")
247      .field("observer_count", &self.observers.observers.len())
248      .finish()
249  }
250}
251#[cfg(test)]
252mod test {
253  use super::*;
254  use futures::executor::ThreadPool;
255  use std::time::{Duration, Instant};
256
257  #[test]
258  fn base_data_flow() {
259    let mut i = 0;
260    {
261      let mut broadcast = LocalSubject::new();
262      broadcast.clone().subscribe(|v| i = v * 2);
263      broadcast.next(1);
264    }
265    assert_eq!(i, 2);
266  }
267
268  #[test]
269  #[should_panic]
270  fn error() {
271    let mut broadcast = LocalSubject::new();
272    broadcast
273      .clone()
274      .subscribe_err(|_: i32| {}, |e: _| panic!("{}", e));
275    broadcast.next(1);
276
277    broadcast.error(&"should panic!");
278  }
279
280  #[test]
281  fn unsubscribe() {
282    let mut i = 0;
283
284    {
285      let mut subject = LocalSubject::new();
286      subject.clone().subscribe(|v| i = v).unsubscribe();
287      subject.next(100);
288    }
289
290    assert_eq!(i, 0);
291  }
292
293  #[test]
294  fn empty_local_subject_can_convert_into_shared() {
295    let pool = ThreadPool::new().unwrap();
296    use std::sync::{Arc, Mutex};
297    let value = Arc::new(Mutex::new(0));
298    let c_v = value.clone();
299    let subject = SharedSubject::new();
300    let mut subject_c = subject.clone();
301    let stamp = Instant::now();
302    pool.schedule(
303      move |_| {
304        subject_c.next(100);
305        subject_c.complete();
306      },
307      Some(Duration::from_millis(25)),
308      (),
309    );
310    subject
311      .clone()
312      .into_shared()
313      .observe_on(pool)
314      .into_shared()
315      .subscribe_blocking(move |v: i32| {
316        *value.lock().unwrap() = v;
317      });
318    assert!(stamp.elapsed() > Duration::from_millis(25));
319    assert_eq!(*c_v.lock().unwrap(), 100);
320  }
321
322  #[test]
323  fn subject_subscribe_subject() {
324    let mut local = LocalSubject::new();
325    let local2 = LocalSubject::new();
326    local.clone().actual_subscribe(Subscriber {
327      observer: local2.observers,
328      subscription: local2.subscription,
329    });
330    local.next(1);
331    local.error(2);
332  }
333}