another_rxrust/
observer.rs

1use crate::internals::function_wrapper::*;
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct Observer<'a, T>
7where
8  T: Clone + Send + Sync,
9{
10  fn_next: FunctionWrapper<'a, T, ()>,
11  fn_error: FunctionWrapper<'a, RxError, ()>,
12  fn_complete: FunctionWrapper<'a, (), ()>,
13  fn_on_unsubscribe: Arc<RwLock<Option<FunctionWrapper<'a, (), ()>>>>,
14}
15
16impl<'a, T> Observer<'a, T>
17where
18  T: Clone + Send + Sync,
19{
20  pub fn new<Next, Error, Complete>(
21    next: Next,
22    error: Error,
23    complete: Complete,
24  ) -> Observer<'a, T>
25  where
26    Next: Fn(T) + Send + Sync + 'a,
27    Error: Fn(RxError) + Send + Sync + 'a,
28    Complete: Fn() -> () + Send + Sync + 'a,
29  {
30    Observer::<T> {
31      fn_next: FunctionWrapper::new(next),
32      fn_error: FunctionWrapper::new(error),
33      fn_complete: FunctionWrapper::new(move |_| complete()),
34      fn_on_unsubscribe: Arc::new(RwLock::new(None)),
35    }
36  }
37  pub fn next(&self, x: T) {
38    self.fn_next.call_if_available(x);
39  }
40  pub fn error(&self, x: RxError) {
41    self.fn_error.call_and_clear_if_available(x);
42  }
43  pub fn complete(&self) {
44    self.fn_complete.call_and_clear_if_available(());
45  }
46  pub fn unsubscribe(&self) {
47    self.fn_next.clear();
48    self.fn_error.clear();
49    self.fn_complete.clear();
50    if let Some(f) = &*self.fn_on_unsubscribe.read().unwrap() {
51      f.call(());
52    }
53    *self.fn_on_unsubscribe.write().unwrap() = None;
54  }
55  pub fn is_subscribed(&self) -> bool {
56    self.fn_next.exists() && self.fn_error.exists() && self.fn_complete.exists()
57  }
58  pub(crate) fn set_on_unsubscribe<F>(&self, f: F)
59  where
60    F: Fn() -> () + Send + Sync + 'a,
61  {
62    *self.fn_on_unsubscribe.write().unwrap() =
63      Some(FunctionWrapper::new(move |_| f()));
64  }
65}
66
// Smoke tests: they drive the observer through its public API and rely on the
// printing macros from the prelude; success means nothing panicked.
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::thread;

  // Emits two values, then an error, then completion.
  #[test]
  fn basic() {
    let ob = Observer::new(
      print_next_fmt!("{}"),
      print_error_as!(&str),
      print_complete!(),
    );
    ob.next(1);
    ob.next(2);
    ob.error(RxError::from_error("ERR!"));
    ob.complete();
  }

  // Same sequence as `basic`, but the `next` callback captures a local.
  #[test]
  fn basic_with_capture() {
    let gain = 100;
    let ob = Observer::new(
      move |x| println!("next {}", x + gain),
      print_error_as!(&str),
      print_complete!(),
    );
    ob.next(1);
    ob.next(2);
    ob.error(RxError::from_error("ERR!"));
    ob.complete();
  }

  // Emits two values and then unsubscribes.
  #[test]
  fn close() {
    let ob = Observer::new(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    ob.next(1);
    ob.next(2);
    ob.unsubscribe();
  }

  // Moves two clones of one observer into separate threads and drives both.
  #[test]
  fn clone_into_thread() {
    let ob = Observer::new(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    let a = ob.clone();
    let b = ob.clone();
    let th_a = thread::spawn(move || {
      for n in 0..100 {
        a.next(n);
      }
      a.complete();
    });
    let th_b = thread::spawn(move || {
      for n in 0..10 {
        b.next(100 + n);
      }
      b.complete();
    });

    // `join` returns `Err` when the worker panicked; surface that as a test
    // failure instead of discarding it.
    th_a.join().expect("worker thread a panicked");
    th_b.join().expect("worker thread b panicked");
  }
}