1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use crate::prelude::*;

/// Wrapper that stores either a payload of type `T` or an `RxError`.
///
/// Internally this is a plain `Result<T, RxError>`; instances are built with
/// `Something::success` / `Something::error` and unwrapped again with
/// `Something::proceed`.
#[derive(Clone)]
pub struct Something<T: Clone + Send + Sync> {
  /// `Ok` carries the wrapped item, `Err` carries the captured error.
  value: Result<T, RxError>,
}

impl<T> Something<T>
where
  T: Clone + Send + Sync,
{
  pub fn success(x: T) -> Something<T> {
    Something { value: Ok(x) }
  }

  pub fn error(e: RxError) -> Something<T> {
    Something { value: Err(e) }
  }

  pub fn proceed<'a>(self) -> Observable<'a, T> {
    match self.value {
      Ok(x) => observables::just(x),
      Err(e) => observables::error(e),
    }
  }
}

#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::sync::{Arc, RwLock};

  /// Smoke test wiring `Something` together with `on_error_resume_next`,
  /// `retry` and `flat_map`. It only prints what the subscriber receives;
  /// nothing is asserted.
  #[test]
  fn basic() {
    // Shared counter, incremented each time the `create` closure runs.
    let counter = Arc::new(RwLock::new(0));
    let source = Observable::create(|subscriber| {
      *counter.write().unwrap() += 1;
      let current = *counter.read().unwrap();
      // Emit the current count, then fail with that same count as the error payload.
      subscriber.next(current);
      subscriber.error(RxError::from_error(current));
    });

    // Wrap every item in `Something` and decide per error whether it is
    // re-raised or smuggled downstream as a regular item.
    let guarded = source
      .map(|x| utils::Something::success(x))
      .on_error_resume_next(|e| {
        let code = *e.downcast_ref::<i32>().unwrap();
        if code > 5 {
          observables::just(utils::Something::error(e)) // pass `retry()` and emit an error
        } else {
          observables::error(e) // trigger `retry()`
        }
      });

    // Unwrap each `Something` again after `retry`, then print every event.
    guarded
      .retry(0)
      .flat_map(|wrapped| wrapped.proceed())
      .subscribe(
        print_next_fmt!("{}"),
        print_error_as!(i32),
        print_complete!(),
      );
  }
}