another_rxrust/subjects/
behavior_subject.rs

1use crate::prelude::*;
2use std::sync::{Arc, RwLock};
3
4#[derive(Clone)]
5pub struct BehaviorSubject<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  subject: Arc<subject::Subject<'a, Item>>,
10  last_item: Arc<RwLock<Option<Item>>>,
11  last_error: Arc<RwLock<Option<RxError>>>,
12}
13
14impl<'a, Item> BehaviorSubject<'a, Item>
15where
16  Item: Clone + Send + Sync,
17{
18  pub fn new(initial: Item) -> BehaviorSubject<'a, Item> {
19    BehaviorSubject {
20      subject: Arc::new(subjects::Subject::new()),
21      last_item: Arc::new(RwLock::new(Some(initial))),
22      last_error: Arc::new(RwLock::new(None)),
23    }
24  }
25
26  pub fn next(&self, item: Item) {
27    *self.last_item.write().unwrap() = Some(item.clone());
28    self.subject.next(item);
29  }
30  pub fn error(&self, err: RxError) {
31    *self.last_error.write().unwrap() = Some(err.clone());
32    self.subject.error(err);
33  }
34  pub fn complete(&self) {
35    *self.last_item.write().unwrap() = None;
36    self.subject.complete();
37  }
38  pub fn observable(&self) -> Observable<'a, Item> {
39    let last_item = Arc::clone(&self.last_item);
40    let last_error = Arc::clone(&self.last_error);
41    let subject = Arc::clone(&self.subject);
42
43    Observable::create(move |s| {
44      {
45        let last_item = &*last_item.read().unwrap();
46        let last_error = &*last_error.read().unwrap();
47
48        if let Some(err) = last_error {
49          s.error(err.clone());
50          return;
51        }
52        if let Some(item) = last_item {
53          s.next(item.clone());
54        } else {
55          s.complete();
56          return;
57        }
58      }
59
60      let sbsc = Arc::new(RwLock::new(None::<Subscription>));
61      {
62        let sbsc = Arc::clone(&sbsc);
63        s.set_on_unsubscribe(move || {
64          if let Some(sbsc) = &*sbsc.read().unwrap() {
65            sbsc.unsubscribe();
66          }
67        });
68      }
69
70      let s_next = s.clone();
71      let s_error = s.clone();
72      let s_complete = s.clone();
73      *sbsc.write().unwrap() = Some(subject.observable().subscribe(
74        move |x| s_next.next(x),
75        move |e| s_error.error(e),
76        move || {
77          s_complete.complete();
78        },
79      ));
80    })
81  }
82}
83
84#[cfg(test)]
85mod tset {
86  use crate::prelude::*;
87  use std::{thread, time};
88
89  #[test]
90  fn basic() {
91    let sbj = subjects::BehaviorSubject::<i32>::new(100);
92
93    println!("start #1");
94    sbj.observable().subscribe(
95      |x| println!("#1 next {}", x),
96      |e| println!("#1 error {:?}", e),
97      || println!("#1 complete"),
98    );
99
100    sbj.next(1);
101    sbj.next(2);
102
103    println!("start #2");
104    sbj.observable().subscribe(
105      |x| println!("#2 next {}", x),
106      |e| println!("#2 error {:?}", e),
107      || println!("#2 complete"),
108    );
109
110    sbj.next(3);
111    sbj.complete();
112
113    println!("start #3");
114    sbj.observable().subscribe(
115      |x| println!("#3 next {}", x),
116      |e| println!("#3 error {:?}", e),
117      || println!("#3 complete"),
118    );
119  }
120
121  #[test]
122  fn double() {
123    let sbj = subjects::BehaviorSubject::<i32>::new(100);
124
125    println!("start #1");
126    let sbsc1 = sbj.observable().subscribe(
127      |x| println!("#1 next {}", x),
128      |e| {
129        println!(
130          "#1 error {:?}",
131          e.downcast_ref::<&str>()
132        )
133      },
134      || println!("#1 complete"),
135    );
136
137    sbj.next(1);
138    sbj.next(2);
139    sbj.next(3);
140
141    println!("start #2");
142    sbj.observable().subscribe(
143      |x| println!("#2 next {}", x),
144      |e| {
145        println!(
146          "#2 error {:?}",
147          e.downcast_ref::<&str>()
148        )
149      },
150      || println!("#2 complete"),
151    );
152
153    sbj.next(4);
154    sbj.next(5);
155    sbj.next(6);
156
157    sbsc1.unsubscribe();
158
159    sbj.next(7);
160    sbj.next(8);
161    sbj.next(9);
162
163    sbj.error(RxError::from_error("ERR!"));
164
165    println!("start #3");
166    sbj.observable().subscribe(
167      |x| println!("#3 next {}", x),
168      |e| {
169        println!(
170          "#3 error {:?}",
171          e.downcast_ref::<&str>()
172        )
173      },
174      || println!("#3 complete"),
175    );
176  }
177
178  #[test]
179  fn thread() {
180    let sbj = subjects::BehaviorSubject::<i32>::new(100);
181
182    let sbj_thread = sbj.clone();
183    let th = thread::spawn(move || {
184      for n in 0..10 {
185        thread::sleep(time::Duration::from_millis(100));
186        sbj_thread.next(n);
187      }
188      sbj_thread.complete();
189    });
190
191    println!("start #1");
192    let sbsc1 = sbj.observable().subscribe(
193      |x| println!("#1 next {}", x),
194      |e| println!("#1 error {:?}", e),
195      || println!("#1 complete"),
196    );
197
198    thread::sleep(time::Duration::from_millis(300));
199
200    println!("start #2");
201    sbj.observable().subscribe(
202      |x| println!("#2 next {}", x),
203      |e| println!("#2 error {:?}", e),
204      || println!("#2 complete"),
205    );
206
207    thread::sleep(time::Duration::from_millis(300));
208
209    println!("unsbscribe #1");
210    sbsc1.unsubscribe();
211
212    th.join().ok();
213  }
214}