another_rxrust/utils/
something.rs

1use crate::prelude::*;
2
3#[derive(Clone)]
4pub struct Something<T>
5where
6  T: Clone + Send + Sync,
7{
8  value: Result<T, RxError>,
9}
10
11impl<T> Something<T>
12where
13  T: Clone + Send + Sync,
14{
15  pub fn success(x: T) -> Something<T> {
16    Something { value: Ok(x) }
17  }
18
19  pub fn error(e: RxError) -> Something<T> {
20    Something { value: Err(e) }
21  }
22
23  pub fn proceed<'a>(self) -> Observable<'a, T> {
24    match self.value {
25      Ok(x) => observables::just(x),
26      Err(e) => observables::error(e),
27    }
28  }
29}
30
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::sync::{Arc, RwLock};

  /// Demonstrates carrying an error past `retry()` by wrapping it in
  /// `utils::Something`, then unwrapping it again with `proceed()`.
  #[test]
  fn basic() {
    // Counts how many times the source has been subscribed.
    let counter = Arc::new(RwLock::new(0));

    // On each subscription: bump the counter, emit its new value, then fail
    // with an error that carries the same value.
    let source = Observable::create(|subscriber| {
      let current = {
        let mut guard = counter.write().unwrap();
        *guard += 1;
        *guard
      };
      subscriber.next(current);
      subscriber.error(RxError::from_error(current));
    });

    source
      .map(|item| utils::Something::success(item))
      .on_error_resume_next(|err| {
        let attempt = *err.downcast_ref::<i32>().unwrap();
        if attempt <= 5 {
          // trigger `retry()`
          observables::error(err)
        } else {
          // pass `retry()` and emit an error
          observables::just(utils::Something::error(err))
        }
      })
      // NOTE(review): `retry(0)` presumably retries without an upper bound;
      // confirm against the operator's documentation.
      .retry(0)
      .flat_map(|wrapped| wrapped.proceed())
      .subscribe(
        print_next_fmt!("{}"),
        print_error_as!(i32),
        print_complete!(),
      );
  }
}
62}