another_rxrust/subjects/
async_subject.rs

1use crate::prelude::*;
2use std::sync::Arc;
3
4#[derive(Clone)]
5pub struct AsyncSubject<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  subject: Arc<subject::Subject<'a, Item>>,
10}
11
12impl<'a, Item> AsyncSubject<'a, Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn new() -> AsyncSubject<'a, Item> {
17    AsyncSubject {
18      subject: Arc::new(subjects::Subject::new()),
19    }
20  }
21
22  pub fn next(&self, item: Item) {
23    self.subject.next(item);
24  }
25  pub fn error(&self, err: RxError) {
26    self.subject.error(err);
27  }
28  pub fn complete(&self) {
29    self.subject.complete();
30  }
31  pub fn observable(&self) -> Observable<'a, Item> {
32    self.subject.observable().take_last(1).clone()
33  }
34}
35
#[cfg(test)]
mod tests {
  use crate::prelude::*;

  // These are smoke tests: the observers only print what they receive, so a
  // test passes as long as nothing panics. Inspect the output with
  // `cargo test -- --nocapture`.

  /// Subscribes first, then pushes three items followed by completion.
  #[test]
  fn basic() {
    let sbj = subjects::AsyncSubject::new();

    sbj.observable().subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );

    sbj.next(1);
    sbj.next(2);
    sbj.next(3);
    sbj.complete();
  }

  /// Subscribes first, then pushes two items followed by an error.
  #[test]
  fn error() {
    let sbj = subjects::AsyncSubject::new();

    sbj.observable().subscribe(
      print_next_fmt!("{}"),
      print_error_as!(&str),
      print_complete!(),
    );

    sbj.next(1);
    sbj.next(2);
    sbj.error(RxError::from_error("ERR!"));
  }
}