another_rxrust/subjects/
behavior_subject.rs1use crate::prelude::*;
2use std::sync::{Arc, RwLock};
3
4#[derive(Clone)]
5pub struct BehaviorSubject<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 subject: Arc<subject::Subject<'a, Item>>,
10 last_item: Arc<RwLock<Option<Item>>>,
11 last_error: Arc<RwLock<Option<RxError>>>,
12}
13
14impl<'a, Item> BehaviorSubject<'a, Item>
15where
16 Item: Clone + Send + Sync,
17{
18 pub fn new(initial: Item) -> BehaviorSubject<'a, Item> {
19 BehaviorSubject {
20 subject: Arc::new(subjects::Subject::new()),
21 last_item: Arc::new(RwLock::new(Some(initial))),
22 last_error: Arc::new(RwLock::new(None)),
23 }
24 }
25
26 pub fn next(&self, item: Item) {
27 *self.last_item.write().unwrap() = Some(item.clone());
28 self.subject.next(item);
29 }
30 pub fn error(&self, err: RxError) {
31 *self.last_error.write().unwrap() = Some(err.clone());
32 self.subject.error(err);
33 }
34 pub fn complete(&self) {
35 *self.last_item.write().unwrap() = None;
36 self.subject.complete();
37 }
38 pub fn observable(&self) -> Observable<'a, Item> {
39 let last_item = Arc::clone(&self.last_item);
40 let last_error = Arc::clone(&self.last_error);
41 let subject = Arc::clone(&self.subject);
42
43 Observable::create(move |s| {
44 {
45 let last_item = &*last_item.read().unwrap();
46 let last_error = &*last_error.read().unwrap();
47
48 if let Some(err) = last_error {
49 s.error(err.clone());
50 return;
51 }
52 if let Some(item) = last_item {
53 s.next(item.clone());
54 } else {
55 s.complete();
56 return;
57 }
58 }
59
60 let sbsc = Arc::new(RwLock::new(None::<Subscription>));
61 {
62 let sbsc = Arc::clone(&sbsc);
63 s.set_on_unsubscribe(move || {
64 if let Some(sbsc) = &*sbsc.read().unwrap() {
65 sbsc.unsubscribe();
66 }
67 });
68 }
69
70 let s_next = s.clone();
71 let s_error = s.clone();
72 let s_complete = s.clone();
73 *sbsc.write().unwrap() = Some(subject.observable().subscribe(
74 move |x| s_next.next(x),
75 move |e| s_error.error(e),
76 move || {
77 s_complete.complete();
78 },
79 ));
80 })
81 }
82}
83
84#[cfg(test)]
85mod tset {
86 use crate::prelude::*;
87 use std::{thread, time};
88
89 #[test]
90 fn basic() {
91 let sbj = subjects::BehaviorSubject::<i32>::new(100);
92
93 println!("start #1");
94 sbj.observable().subscribe(
95 |x| println!("#1 next {}", x),
96 |e| println!("#1 error {:?}", e),
97 || println!("#1 complete"),
98 );
99
100 sbj.next(1);
101 sbj.next(2);
102
103 println!("start #2");
104 sbj.observable().subscribe(
105 |x| println!("#2 next {}", x),
106 |e| println!("#2 error {:?}", e),
107 || println!("#2 complete"),
108 );
109
110 sbj.next(3);
111 sbj.complete();
112
113 println!("start #3");
114 sbj.observable().subscribe(
115 |x| println!("#3 next {}", x),
116 |e| println!("#3 error {:?}", e),
117 || println!("#3 complete"),
118 );
119 }
120
121 #[test]
122 fn double() {
123 let sbj = subjects::BehaviorSubject::<i32>::new(100);
124
125 println!("start #1");
126 let sbsc1 = sbj.observable().subscribe(
127 |x| println!("#1 next {}", x),
128 |e| {
129 println!(
130 "#1 error {:?}",
131 e.downcast_ref::<&str>()
132 )
133 },
134 || println!("#1 complete"),
135 );
136
137 sbj.next(1);
138 sbj.next(2);
139 sbj.next(3);
140
141 println!("start #2");
142 sbj.observable().subscribe(
143 |x| println!("#2 next {}", x),
144 |e| {
145 println!(
146 "#2 error {:?}",
147 e.downcast_ref::<&str>()
148 )
149 },
150 || println!("#2 complete"),
151 );
152
153 sbj.next(4);
154 sbj.next(5);
155 sbj.next(6);
156
157 sbsc1.unsubscribe();
158
159 sbj.next(7);
160 sbj.next(8);
161 sbj.next(9);
162
163 sbj.error(RxError::from_error("ERR!"));
164
165 println!("start #3");
166 sbj.observable().subscribe(
167 |x| println!("#3 next {}", x),
168 |e| {
169 println!(
170 "#3 error {:?}",
171 e.downcast_ref::<&str>()
172 )
173 },
174 || println!("#3 complete"),
175 );
176 }
177
178 #[test]
179 fn thread() {
180 let sbj = subjects::BehaviorSubject::<i32>::new(100);
181
182 let sbj_thread = sbj.clone();
183 let th = thread::spawn(move || {
184 for n in 0..10 {
185 thread::sleep(time::Duration::from_millis(100));
186 sbj_thread.next(n);
187 }
188 sbj_thread.complete();
189 });
190
191 println!("start #1");
192 let sbsc1 = sbj.observable().subscribe(
193 |x| println!("#1 next {}", x),
194 |e| println!("#1 error {:?}", e),
195 || println!("#1 complete"),
196 );
197
198 thread::sleep(time::Duration::from_millis(300));
199
200 println!("start #2");
201 sbj.observable().subscribe(
202 |x| println!("#2 next {}", x),
203 |e| println!("#2 error {:?}", e),
204 || println!("#2 complete"),
205 );
206
207 thread::sleep(time::Duration::from_millis(300));
208
209 println!("unsbscribe #1");
210 sbsc1.unsubscribe();
211
212 th.join().ok();
213 }
214}