another_rxrust/
observer.rs1use crate::internals::function_wrapper::*;
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct Observer<'a, T>
7where
8 T: Clone + Send + Sync,
9{
10 fn_next: FunctionWrapper<'a, T, ()>,
11 fn_error: FunctionWrapper<'a, RxError, ()>,
12 fn_complete: FunctionWrapper<'a, (), ()>,
13 fn_on_unsubscribe: Arc<RwLock<Option<FunctionWrapper<'a, (), ()>>>>,
14}
15
16impl<'a, T> Observer<'a, T>
17where
18 T: Clone + Send + Sync,
19{
20 pub fn new<Next, Error, Complete>(
21 next: Next,
22 error: Error,
23 complete: Complete,
24 ) -> Observer<'a, T>
25 where
26 Next: Fn(T) + Send + Sync + 'a,
27 Error: Fn(RxError) + Send + Sync + 'a,
28 Complete: Fn() -> () + Send + Sync + 'a,
29 {
30 Observer::<T> {
31 fn_next: FunctionWrapper::new(next),
32 fn_error: FunctionWrapper::new(error),
33 fn_complete: FunctionWrapper::new(move |_| complete()),
34 fn_on_unsubscribe: Arc::new(RwLock::new(None)),
35 }
36 }
37 pub fn next(&self, x: T) {
38 self.fn_next.call_if_available(x);
39 }
40 pub fn error(&self, x: RxError) {
41 self.fn_error.call_and_clear_if_available(x);
42 }
43 pub fn complete(&self) {
44 self.fn_complete.call_and_clear_if_available(());
45 }
46 pub fn unsubscribe(&self) {
47 self.fn_next.clear();
48 self.fn_error.clear();
49 self.fn_complete.clear();
50 if let Some(f) = &*self.fn_on_unsubscribe.read().unwrap() {
51 f.call(());
52 }
53 *self.fn_on_unsubscribe.write().unwrap() = None;
54 }
55 pub fn is_subscribed(&self) -> bool {
56 self.fn_next.exists() && self.fn_error.exists() && self.fn_complete.exists()
57 }
58 pub(crate) fn set_on_unsubscribe<F>(&self, f: F)
59 where
60 F: Fn() -> () + Send + Sync + 'a,
61 {
62 *self.fn_on_unsubscribe.write().unwrap() =
63 Some(FunctionWrapper::new(move |_| f()));
64 }
65}
66
67#[cfg(test)]
68mod test {
69 use crate::prelude::*;
70 use std::thread;
71
72 #[test]
73 fn basic() {
74 let ob = Observer::new(
75 print_next_fmt!("{}"),
76 print_error_as!(&str),
77 print_complete!(),
78 );
79 ob.next(1);
80 ob.next(2);
81 ob.error(RxError::from_error("ERR!"));
82 ob.complete();
83 }
84
85 #[test]
86 fn basic_with_capture() {
87 let gain = 100;
88 let ob = Observer::new(
89 move |x| println!("next {}", x + gain),
90 print_error_as!(&str),
91 print_complete!(),
92 );
93 ob.next(1);
94 ob.next(2);
95 ob.error(RxError::from_error("ERR!"));
96 ob.complete();
97 }
98
99 #[test]
100 fn close() {
101 let ob = Observer::new(
102 print_next_fmt!("{}"),
103 print_error!(),
104 print_complete!(),
105 );
106 ob.next(1);
107 ob.next(2);
108 ob.unsubscribe();
109 }
110
111 #[test]
112 fn clone_into_thread() {
113 let ob = Observer::new(
114 print_next_fmt!("{}"),
115 print_error!(),
116 print_complete!(),
117 );
118 let a = ob.clone();
119 let b = ob.clone();
120 let th_a = thread::spawn(move || {
121 for n in 0..100 {
122 a.next(n);
123 }
124 a.complete();
125 });
126 let th_b = thread::spawn(move || {
127 for n in 0..10 {
128 b.next(100 + n);
129 }
130 b.complete();
131 });
132
133 th_a.join().ok();
134 th_b.join().ok();
135 }
136}