rxrust/subject/
local_subject.rs1use crate::prelude::*;
2use std::cell::RefCell;
3use std::rc::Rc;
4
5type RcPublishers<P> = Rc<RefCell<Vec<Box<P>>>>;
6type _LocalSubject<P> = Subject<RcPublishers<P>, LocalSubscription>;
7type _LocalBehaviorSubject<P, Item> =
8 BehaviorSubject<RcPublishers<P>, LocalSubscription, Item>;
9
10pub type LocalSubject<'a, Item, Err> =
11 _LocalSubject<dyn Publisher<Item = Item, Err = Err> + 'a>;
12
13pub type LocalSubjectRef<'a, Item, Err> =
14 _LocalSubject<dyn Publisher<Item = &'a Item, Err = Err> + 'a>;
15
16pub type LocalSubjectErrRef<'a, Item, Err> =
17 _LocalSubject<dyn Publisher<Item = Item, Err = &'a Err> + 'a>;
18
19pub type LocalSubjectRefAll<'a, Item, Err> =
20 _LocalSubject<dyn Publisher<Item = &'a Item, Err = &'a Err> + 'a>;
21
22pub type LocalBehaviorSubject<'a, Item, Err> =
23 _LocalBehaviorSubject<dyn Publisher<Item = Item, Err = Err> + 'a, Item>;
24
25pub type LocalBehaviorSubjectRef<'a, Item, Err> =
26 _LocalBehaviorSubject<dyn Publisher<Item = &'a Item, Err = Err> + 'a, Item>;
27
28pub type LocalBehaviorSubjectErrRef<'a, Item, Err> =
29 _LocalBehaviorSubject<dyn Publisher<Item = Item, Err = &'a Err> + 'a, Item>;
30
31pub type LocalBehaviorSubjectRefAll<'a, Item, Err> = _LocalBehaviorSubject<
32 dyn Publisher<Item = &'a Item, Err = &'a Err> + 'a,
33 Item,
34>;
35
36impl<'a, Item, Err> LocalSubject<'a, Item, Err> {
37 #[inline]
38 pub fn new() -> Self
39 where
40 Self: Default,
41 {
42 Self::default()
43 }
44 #[inline]
45 pub fn subscribed_size(&self) -> usize {
46 self.observers.observers.borrow().len()
47 }
48}
49
50impl<'a, Item, Err> Observable for LocalSubject<'a, Item, Err> {
51 type Item = Item;
52 type Err = Err;
53}
54
55impl<'a, Item, Err> LocalObservable<'a> for LocalSubject<'a, Item, Err> {
56 type Unsub = LocalSubscription;
57 fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
58 self,
59 subscriber: Subscriber<O, LocalSubscription>,
60 ) -> LocalSubscription {
61 let subscription = subscriber.subscription.clone();
62 self.subscription.add(subscription.clone());
63 self
64 .observers
65 .observers
66 .borrow_mut()
67 .push(Box::new(subscriber));
68 subscription
69 }
70}
71
72impl<'a, Item, Err> LocalBehaviorSubject<'a, Item, Err> {
73 #[inline]
74 pub fn new(value: Item) -> Self
75 where
76 Self: Default,
77 {
78 LocalBehaviorSubject {
79 observers: Default::default(),
80 subscription: Default::default(),
81 value,
82 }
83 }
84 #[inline]
85 pub fn subscribed_size(&self) -> usize {
86 self.observers.observers.borrow().len()
87 }
88}
89
90impl<'a, Item, Err> Observable for LocalBehaviorSubject<'a, Item, Err> {
91 type Item = Item;
92 type Err = Err;
93}
94
95impl<'a, Item, Err> LocalObservable<'a>
96 for LocalBehaviorSubject<'a, Item, Err>
97{
98 type Unsub = LocalSubscription;
99 fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
100 self,
101 subscriber: Subscriber<O, LocalSubscription>,
102 ) -> LocalSubscription {
103 let subscription = subscriber.subscription.clone();
104 self.subscription.add(subscription.clone());
105
106 self
107 .observers
108 .observers
109 .borrow_mut()
110 .push(Box::new(subscriber));
111
112 if !subscription.is_closed() {
113 self
114 .observers
115 .observers
116 .borrow_mut()
117 .last_mut()
118 .unwrap()
119 .next(self.value);
120 }
121
122 subscription
123 }
124}
125
126#[cfg(test)]
127mod test {
128 use crate::prelude::*;
129
130 #[test]
131 fn smoke() {
132 let mut test_code = 1;
133 {
134 let mut subject = LocalSubject::new();
135 subject.clone().subscribe(|v| {
136 test_code = v;
137 });
138 subject.next(2);
139
140 assert_eq!(subject.subscribed_size(), 1);
141 }
142 assert_eq!(test_code, 2);
143 }
144
145 #[test]
146 fn emit_ref() {
147 let mut check = 0;
148
149 {
150 let mut subject = LocalSubjectRef::new();
151 subject.clone().subscribe(|v| {
152 check = *v;
153 });
154 subject.next(&1);
155 }
156 assert_eq!(check, 1);
157
158 {
159 let mut subject = LocalSubjectErrRef::new();
160 subject
161 .clone()
162 .subscribe_err(|_: ()| {}, |err| check = *err);
163 subject.error(&2);
164 }
165 assert_eq!(check, 2);
166
167 {
168 let mut subject = LocalSubjectRefAll::new();
169 subject.clone().subscribe_err(|v| check = *v, |_: &()| {});
170 subject.next(&1);
171 }
172 assert_eq!(check, 1);
173
174 {
175 let mut subject = LocalSubjectRefAll::new();
176 subject
177 .clone()
178 .subscribe_err(|_: &()| {}, |err| check = *err);
179 subject.error(&2);
180 }
181 assert_eq!(check, 2);
182 }
183}