rx_rust/operators/error_handling/
retry.rs1use crate::disposable::subscription::Subscription;
2use crate::safe_lock_option;
3use crate::utils::types::{Mutable, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9
10#[derive(Educe)]
11#[educe(Debug, Clone, PartialEq, Eq)]
12pub enum RetryAction<E, OE1> {
13 Retry(OE1),
14 Stop(E),
15}
16
17#[derive(Educe)]
44#[educe(Debug, Clone)]
45pub struct Retry<OE, F> {
46 source: OE,
47 callback: F,
48}
49
50impl<OE, F> Retry<OE, F> {
51 pub fn new<'or, 'sub, T, E, OE1>(source: OE, callback: F) -> Self
52 where
53 OE: Observable<'or, 'sub, T, E>,
54 OE1: Observable<'or, 'sub, T, E>,
55 F: FnMut(E) -> RetryAction<E, OE1>,
56 {
57 Self { source, callback }
58 }
59}
60
61impl<'or, 'sub, T, E, OE, OE1, F> Observable<'or, 'sub, T, E> for Retry<OE, F>
62where
63 OE: Observable<'or, 'sub, T, E>,
64 OE1: Observable<'or, 'sub, T, E>,
65 F: FnMut(E) -> RetryAction<E, OE1> + NecessarySendSync + 'or,
66 'sub: 'or,
67{
68 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
69 let sub = Shared::new(Mutable::new(None));
70 let onserver = RetryObserver {
71 observer,
72 callback: self.callback,
73 sub: sub.clone(),
74 };
75 self.source.subscribe(onserver) + sub
76 }
77}
78
79struct RetryObserver<'sub, OR, F> {
80 observer: OR,
81 callback: F,
82 sub: Shared<Mutable<Option<Subscription<'sub>>>>,
83}
84
85impl<'or, 'sub, T, E, OR, OE1, F> Observer<T, E> for RetryObserver<'sub, OR, F>
86where
87 OR: Observer<T, E> + NecessarySendSync + 'or,
88 OE1: Observable<'or, 'sub, T, E>,
89 F: FnMut(E) -> RetryAction<E, OE1> + NecessarySendSync + 'or,
90 'sub: 'or,
91{
92 fn on_next(&mut self, value: T) {
93 self.observer.on_next(value);
94 }
95
96 fn on_termination(mut self, termination: Termination<E>) {
97 match termination {
98 Termination::Completed => self.observer.on_termination(Termination::Completed),
99 Termination::Error(error) => {
100 let action = (self.callback)(error);
101 match action {
102 RetryAction::Retry(observable) => {
103 let sub = self.sub.clone();
104 safe_lock_option!(replace: sub, observable.subscribe(self));
105 }
106 RetryAction::Stop(error) => {
107 self.observer.on_termination(Termination::Error(error))
108 }
109 }
110 }
111 }
112 }
113}