rxrust/subject/
local_subject.rs

1use crate::prelude::*;
2use std::cell::RefCell;
3use std::rc::Rc;
4
5type RcPublishers<P> = Rc<RefCell<Vec<Box<P>>>>;
6type _LocalSubject<P> = Subject<RcPublishers<P>, LocalSubscription>;
7type _LocalBehaviorSubject<P, Item> =
8  BehaviorSubject<RcPublishers<P>, LocalSubscription, Item>;
9
10pub type LocalSubject<'a, Item, Err> =
11  _LocalSubject<dyn Publisher<Item = Item, Err = Err> + 'a>;
12
13pub type LocalSubjectRef<'a, Item, Err> =
14  _LocalSubject<dyn Publisher<Item = &'a Item, Err = Err> + 'a>;
15
16pub type LocalSubjectErrRef<'a, Item, Err> =
17  _LocalSubject<dyn Publisher<Item = Item, Err = &'a Err> + 'a>;
18
19pub type LocalSubjectRefAll<'a, Item, Err> =
20  _LocalSubject<dyn Publisher<Item = &'a Item, Err = &'a Err> + 'a>;
21
22pub type LocalBehaviorSubject<'a, Item, Err> =
23  _LocalBehaviorSubject<dyn Publisher<Item = Item, Err = Err> + 'a, Item>;
24
25pub type LocalBehaviorSubjectRef<'a, Item, Err> =
26  _LocalBehaviorSubject<dyn Publisher<Item = &'a Item, Err = Err> + 'a, Item>;
27
28pub type LocalBehaviorSubjectErrRef<'a, Item, Err> =
29  _LocalBehaviorSubject<dyn Publisher<Item = Item, Err = &'a Err> + 'a, Item>;
30
31pub type LocalBehaviorSubjectRefAll<'a, Item, Err> = _LocalBehaviorSubject<
32  dyn Publisher<Item = &'a Item, Err = &'a Err> + 'a,
33  Item,
34>;
35
36impl<'a, Item, Err> LocalSubject<'a, Item, Err> {
37  #[inline]
38  pub fn new() -> Self
39  where
40    Self: Default,
41  {
42    Self::default()
43  }
44  #[inline]
45  pub fn subscribed_size(&self) -> usize {
46    self.observers.observers.borrow().len()
47  }
48}
49
50impl<'a, Item, Err> Observable for LocalSubject<'a, Item, Err> {
51  type Item = Item;
52  type Err = Err;
53}
54
55impl<'a, Item, Err> LocalObservable<'a> for LocalSubject<'a, Item, Err> {
56  type Unsub = LocalSubscription;
57  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
58    self,
59    subscriber: Subscriber<O, LocalSubscription>,
60  ) -> LocalSubscription {
61    let subscription = subscriber.subscription.clone();
62    self.subscription.add(subscription.clone());
63    self
64      .observers
65      .observers
66      .borrow_mut()
67      .push(Box::new(subscriber));
68    subscription
69  }
70}
71
72impl<'a, Item, Err> LocalBehaviorSubject<'a, Item, Err> {
73  #[inline]
74  pub fn new(value: Item) -> Self
75  where
76    Self: Default,
77  {
78    LocalBehaviorSubject {
79      observers: Default::default(),
80      subscription: Default::default(),
81      value,
82    }
83  }
84  #[inline]
85  pub fn subscribed_size(&self) -> usize {
86    self.observers.observers.borrow().len()
87  }
88}
89
90impl<'a, Item, Err> Observable for LocalBehaviorSubject<'a, Item, Err> {
91  type Item = Item;
92  type Err = Err;
93}
94
95impl<'a, Item, Err> LocalObservable<'a>
96  for LocalBehaviorSubject<'a, Item, Err>
97{
98  type Unsub = LocalSubscription;
99  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
100    self,
101    subscriber: Subscriber<O, LocalSubscription>,
102  ) -> LocalSubscription {
103    let subscription = subscriber.subscription.clone();
104    self.subscription.add(subscription.clone());
105
106    self
107      .observers
108      .observers
109      .borrow_mut()
110      .push(Box::new(subscriber));
111
112    if !subscription.is_closed() {
113      self
114        .observers
115        .observers
116        .borrow_mut()
117        .last_mut()
118        .unwrap()
119        .next(self.value);
120    }
121
122    subscription
123  }
124}
125
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// A value pushed through the subject reaches the subscribed closure, and
  /// the subject reports one subscriber.
  #[test]
  fn smoke() {
    let mut received = 1;
    {
      let mut source = LocalSubject::new();
      source.clone().subscribe(|v| received = v);
      source.next(2);

      assert_eq!(source.subscribed_size(), 1);
    }
    // The inner scope has ended, so `received` can be read again.
    assert_eq!(received, 2);
  }

  /// Exercises the by-reference subject aliases for items, errors, and both.
  #[test]
  fn emit_ref() {
    let mut seen = 0;

    // Items delivered by reference.
    {
      let mut source = LocalSubjectRef::new();
      source.clone().subscribe(|v| {
        seen = *v;
      });
      source.next(&1);
    }
    assert_eq!(seen, 1);

    // Errors delivered by reference.
    {
      let mut source = LocalSubjectErrRef::new();
      source
        .clone()
        .subscribe_err(|_: ()| {}, |err| seen = *err);
      source.error(&2);
    }
    assert_eq!(seen, 2);

    // Both by reference: item path.
    {
      let mut source = LocalSubjectRefAll::new();
      source.clone().subscribe_err(|v| seen = *v, |_: &()| {});
      source.next(&1);
    }
    assert_eq!(seen, 1);

    // Both by reference: error path.
    {
      let mut source = LocalSubjectRefAll::new();
      source
        .clone()
        .subscribe_err(|_: &()| {}, |err| seen = *err);
      source.error(&2);
    }
    assert_eq!(seen, 2);
  }
}