another_rxrust/operators/
on_error_resume_next.rs1use crate::internals::{
2 function_wrapper::FunctionWrapper, stream_controller::StreamController,
3};
4use crate::prelude::*;
5
6#[derive(Clone)]
7pub struct OnErrorResumeNext<'a, Item>
8where
9 Item: Clone + Send + Sync,
10{
11 resume_f: FunctionWrapper<'a, RxError, Observable<'a, Item>>,
12}
13
14impl<'a, Item> OnErrorResumeNext<'a, Item>
15where
16 Item: Clone + Send + Sync,
17{
18 pub fn new<F>(f: F) -> OnErrorResumeNext<'a, Item>
19 where
20 F: Fn(RxError) -> Observable<'a, Item> + Send + Sync + 'a,
21 {
22 OnErrorResumeNext { resume_f: FunctionWrapper::new(f) }
23 }
24 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
25 let f = self.resume_f.clone();
26
27 Observable::<Item>::create(move |s| {
28 let f = f.clone();
29
30 let sctl = StreamController::new(s);
31 let sctl_next = sctl.clone();
32 let sctl_error = sctl.clone();
33 let sctl_complete = sctl.clone();
34
35 source.inner_subscribe(sctl.new_observer(
36 move |_, x| {
37 sctl_next.sink_next(x);
38 },
39 move |serial, e| {
40 sctl_error.upstream_abort_observe(&serial);
41
42 let sctl_error_next = sctl_error.clone();
43 let sctl_error_error = sctl_error.clone();
44 let sctl_error_complete = sctl_error.clone();
45
46 f.call(e).inner_subscribe(sctl_error.new_observer(
47 move |_, xx| {
48 sctl_error_next.sink_next(xx);
49 },
50 move |_, ee| {
51 sctl_error_error.sink_error(ee);
52 },
53 move |serial| {
54 sctl_error_complete.sink_complete(&serial);
55 },
56 ));
57 },
58 move |serial| {
59 sctl_complete.sink_complete(&serial);
60 },
61 ));
62 })
63 }
64}
65
66impl<'a, Item> Observable<'a, Item>
67where
68 Item: Clone + Send + Sync,
69{
70 pub fn on_error_resume_next<F>(&self, f: F) -> Observable<'a, Item>
71 where
72 F: Fn(RxError) -> Observable<'a, Item> + Send + Sync + 'a,
73 {
74 OnErrorResumeNext::new(f).execute(self.clone())
75 }
76}
77
// NOTE(review): the module name `tset` looks like a typo of `test`; it is
// kept unchanged here so existing test paths stay the same.
#[cfg(test)]
mod tset {
    use crate::prelude::*;

    /// A source that fails with "ERR!" is resumed with `just(1)`.
    ///
    /// No assertions are made; the handlers come from the `print_*` macros.
    #[test]
    fn basic() {
        let resumed = observables::error(RxError::from_error("ERR!"))
            .on_error_resume_next(|_e| observables::just(1));

        resumed.subscribe(
            print_next_fmt!("{}"),
            print_error_as!(&str),
            print_complete!(),
        );
    }

    /// Applies the operator to a `just(1)` source and subscribes to it.
    ///
    /// No assertions are made; the handlers come from the `print_*` macros.
    #[test]
    fn just() {
        let resumed = observables::just(1).on_error_resume_next(|_e| observables::just(1));

        resumed.subscribe(
            print_next_fmt!("{}"),
            print_error!(),
            print_complete!(),
        );
    }
}