another_rxrust/subjects/
async_subject.rs1use crate::prelude::*;
2use std::sync::Arc;
3
4#[derive(Clone)]
5pub struct AsyncSubject<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 subject: Arc<subject::Subject<'a, Item>>,
10}
11
12impl<'a, Item> AsyncSubject<'a, Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn new() -> AsyncSubject<'a, Item> {
17 AsyncSubject {
18 subject: Arc::new(subjects::Subject::new()),
19 }
20 }
21
22 pub fn next(&self, item: Item) {
23 self.subject.next(item);
24 }
25 pub fn error(&self, err: RxError) {
26 self.subject.error(err);
27 }
28 pub fn complete(&self) {
29 self.subject.complete();
30 }
31 pub fn observable(&self) -> Observable<'a, Item> {
32 self.subject.observable().take_last(1).clone()
33 }
34}
35
36#[cfg(test)]
37mod tset {
38 use crate::prelude::*;
39
40 #[test]
41 fn basic() {
42 let sbj = subjects::AsyncSubject::new();
43
44 sbj.observable().subscribe(
45 print_next_fmt!("{}"),
46 print_error!(),
47 print_complete!(),
48 );
49
50 sbj.next(1);
51 sbj.next(2);
52 sbj.next(3);
53 sbj.complete();
54 }
55
56 #[test]
57 fn error() {
58 let sbj = subjects::AsyncSubject::new();
59
60 sbj.observable().subscribe(
61 print_next_fmt!("{}"),
62 print_error_as!(&str),
63 print_complete!(),
64 );
65
66 sbj.next(1);
67 sbj.next(2);
68 sbj.error(RxError::from_error("ERR!"));
69 }
70}