rx_rust/operators/error_handling/
catch.rs1use crate::disposable::subscription::Subscription;
2use crate::safe_lock_option;
3use crate::utils::types::{Mutable, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7 utils::types::MarkerType,
8};
9use educe::Educe;
10use std::marker::PhantomData;
11
12#[derive(Educe)]
39#[educe(Debug, Clone)]
40pub struct Catch<E0, OE, F> {
41 source: OE,
42 callback: F,
43 _marker: MarkerType<E0>,
44}
45
46impl<E0, OE, F> Catch<E0, OE, F> {
47 pub fn new<'or, 'sub, T, E, OE1>(source: OE, callback: F) -> Self
48 where
49 OE: Observable<'or, 'sub, T, E0>,
50 OE1: Observable<'or, 'sub, T, E>,
51 F: FnOnce(E0) -> OE1,
52 {
53 Self {
54 source,
55 callback,
56 _marker: PhantomData,
57 }
58 }
59}
60
61impl<'or, 'sub, T, E0, E, OE, OE1, F> Observable<'or, 'sub, T, E> for Catch<E0, OE, F>
62where
63 E: 'or,
64 OE: Observable<'or, 'sub, T, E0>,
65 OE1: Observable<'or, 'sub, T, E>,
66 F: FnOnce(E0) -> OE1 + NecessarySendSync + 'or,
67 'sub: 'or,
68{
69 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
70 let sub = Shared::new(Mutable::new(None));
71 let onserver = CatchObserver {
72 observer,
73 callback: self.callback,
74 sub: sub.clone(),
75 _marker: PhantomData,
76 };
77 self.source.subscribe(onserver) + sub
78 }
79}
80
81struct CatchObserver<'sub, E, OR, F> {
82 observer: OR,
83 callback: F,
84 sub: Shared<Mutable<Option<Subscription<'sub>>>>,
85 _marker: MarkerType<E>,
86}
87
88impl<'or, 'sub, T, E0, E, OR, OE1, F> Observer<T, E0> for CatchObserver<'sub, E, OR, F>
89where
90 OR: Observer<T, E> + NecessarySendSync + 'or,
91 OE1: Observable<'or, 'sub, T, E>,
92 F: FnOnce(E0) -> OE1,
93{
94 fn on_next(&mut self, value: T) {
95 self.observer.on_next(value);
96 }
97
98 fn on_termination(self, termination: Termination<E0>) {
99 match termination {
100 Termination::Completed => self.observer.on_termination(Termination::Completed),
101 Termination::Error(error) => {
102 let observable = (self.callback)(error);
103 let sub = observable.subscribe(self.observer);
104 safe_lock_option!(replace: self.sub, sub);
105 }
106 }
107 }
108}