rx_rust/operators/error_handling/
retry.rs1use crate::disposable::subscription::Subscription;
2use crate::safe_lock_option;
3use crate::utils::types::{Mutable, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9
10#[derive(Educe)]
11#[educe(Debug, Clone, PartialEq, Eq)]
12pub enum RetryAction<E, OE1> {
13 Retry(OE1),
14 Stop(E),
15}
16
17#[derive(Educe)]
44#[educe(Debug, Clone)]
45pub struct Retry<OE, F> {
46 source: OE,
47 callback: F,
48}
49
50impl<OE, F> Retry<OE, F> {
51 pub fn new<'or, 'sub, T, E, OE1>(source: OE, callback: F) -> Self
52 where
53 OE: Observable<'or, 'sub, T, E>,
54 OE1: Observable<'or, 'sub, T, E>,
55 F: FnMut(E) -> RetryAction<E, OE1>,
56 {
57 Self { source, callback }
58 }
59}
60
61impl<'or, 'sub, T, E, OE, OE1, F> Observable<'or, 'sub, T, E> for Retry<OE, F>
62where
63 OE: Observable<'or, 'sub, T, E>,
64 OE1: Observable<'or, 'sub, T, E>,
65 F: FnMut(E) -> RetryAction<E, OE1> + NecessarySendSync + 'or,
66 'sub: 'or,
67{
68 fn subscribe(
69 self,
70 observer: impl Observer<T, E> + NecessarySendSync + 'or,
71 ) -> Subscription<'sub> {
72 let sub = Shared::new(Mutable::new(None));
73 let onserver = RetryObserver {
74 observer,
75 callback: self.callback,
76 sub: sub.clone(),
77 };
78 self.source.subscribe(onserver) + sub
79 }
80}
81
82struct RetryObserver<'sub, OR, F> {
83 observer: OR,
84 callback: F,
85 sub: Shared<Mutable<Option<Subscription<'sub>>>>,
86}
87
88impl<'or, 'sub, T, E, OR, OE1, F> Observer<T, E> for RetryObserver<'sub, OR, F>
89where
90 OR: Observer<T, E> + NecessarySendSync + 'or,
91 OE1: Observable<'or, 'sub, T, E>,
92 F: FnMut(E) -> RetryAction<E, OE1> + NecessarySendSync + 'or,
93 'sub: 'or,
94{
95 fn on_next(&mut self, value: T) {
96 self.observer.on_next(value);
97 }
98
99 fn on_termination(mut self, termination: Termination<E>) {
100 match termination {
101 Termination::Completed => self.observer.on_termination(Termination::Completed),
102 Termination::Error(error) => {
103 let action = (self.callback)(error);
104 match action {
105 RetryAction::Retry(observable) => {
106 let sub = self.sub.clone();
107 safe_lock_option!(replace: sub, observable.subscribe(self));
108 }
109 RetryAction::Stop(error) => {
110 self.observer.on_termination(Termination::Error(error))
111 }
112 }
113 }
114 }
115 }
116}