another_rxrust/operators/
on_error_resume_next.rs

1use crate::internals::{
2  function_wrapper::FunctionWrapper, stream_controller::StreamController,
3};
4use crate::prelude::*;
5
6#[derive(Clone)]
7pub struct OnErrorResumeNext<'a, Item>
8where
9  Item: Clone + Send + Sync,
10{
11  resume_f: FunctionWrapper<'a, RxError, Observable<'a, Item>>,
12}
13
14impl<'a, Item> OnErrorResumeNext<'a, Item>
15where
16  Item: Clone + Send + Sync,
17{
18  pub fn new<F>(f: F) -> OnErrorResumeNext<'a, Item>
19  where
20    F: Fn(RxError) -> Observable<'a, Item> + Send + Sync + 'a,
21  {
22    OnErrorResumeNext { resume_f: FunctionWrapper::new(f) }
23  }
24  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
25    let f = self.resume_f.clone();
26
27    Observable::<Item>::create(move |s| {
28      let f = f.clone();
29
30      let sctl = StreamController::new(s);
31      let sctl_next = sctl.clone();
32      let sctl_error = sctl.clone();
33      let sctl_complete = sctl.clone();
34
35      source.inner_subscribe(sctl.new_observer(
36        move |_, x| {
37          sctl_next.sink_next(x);
38        },
39        move |serial, e| {
40          sctl_error.upstream_abort_observe(&serial);
41
42          let sctl_error_next = sctl_error.clone();
43          let sctl_error_error = sctl_error.clone();
44          let sctl_error_complete = sctl_error.clone();
45
46          f.call(e).inner_subscribe(sctl_error.new_observer(
47            move |_, xx| {
48              sctl_error_next.sink_next(xx);
49            },
50            move |_, ee| {
51              sctl_error_error.sink_error(ee);
52            },
53            move |serial| {
54              sctl_error_complete.sink_complete(&serial);
55            },
56          ));
57        },
58        move |serial| {
59          sctl_complete.sink_complete(&serial);
60        },
61      ));
62    })
63  }
64}
65
66impl<'a, Item> Observable<'a, Item>
67where
68  Item: Clone + Send + Sync,
69{
70  pub fn on_error_resume_next<F>(&self, f: F) -> Observable<'a, Item>
71  where
72    F: Fn(RxError) -> Observable<'a, Item> + Send + Sync + 'a,
73  {
74    OnErrorResumeNext::new(f).execute(self.clone())
75  }
76}
77
// Smoke tests: they print the emitted events rather than asserting on them.
#[cfg(test)]
mod tset {
  use crate::prelude::*;

  /// Source fails immediately; the operator should switch to `just(1)`.
  #[test]
  fn basic() {
    let failing = observables::error(RxError::from_error("ERR!"));
    let resumed = failing.on_error_resume_next(|_e| observables::just(1));
    resumed.subscribe(
      print_next_fmt!("{}"),
      print_error_as!(&str),
      print_complete!(),
    );
  }

  /// Source emits normally and never errors.
  #[test]
  fn just() {
    let healthy = observables::just(1);
    let resumed = healthy.on_error_resume_next(|_e| observables::just(1));
    resumed.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }
}
103}