another_rxrust/operators/
retry.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4
5#[derive(Clone)]
6pub struct Retry<Item>
7where
8  Item: Clone + Send + Sync,
9{
10  count: usize,
11  _item: PhantomData<Item>,
12}
13
14impl<'a, Item> Retry<Item>
15where
16  Item: Clone + Send + Sync,
17{
18  pub fn new(count: usize) -> Retry<Item> {
19    Retry { count, _item: PhantomData }
20  }
21
22  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
23    let count = self.count;
24
25    Observable::<Item>::create(move |s| {
26      fn do_subscribe<'a, Item>(
27        n: usize,
28        max_retry: usize,
29        source: Observable<'a, Item>,
30        sctl: StreamController<'a, Item>,
31      ) where
32        Item: Clone + Send + Sync,
33      {
34        let sctl_next = sctl.clone();
35        let sctl_error = sctl.clone();
36        let sctl_complete = sctl.clone();
37        let source_error = source.clone();
38        source.inner_subscribe(sctl.new_observer(
39          move |_, x: Item| {
40            sctl_next.sink_next(x);
41          },
42          move |serial, e| {
43            if max_retry == 0 || n < max_retry {
44              sctl_error.upstream_abort_observe(&serial);
45              do_subscribe(
46                n + 1,
47                max_retry,
48                source_error.clone(),
49                sctl_error.clone(),
50              );
51            } else {
52              sctl_error.sink_error(e);
53            }
54          },
55          move |serial| sctl_complete.sink_complete(&serial),
56        ));
57      }
58
59      let sctl = StreamController::new(s);
60      do_subscribe(1, count, source.clone(), sctl.clone());
61    })
62  }
63}
64
65impl<'a, Item> Observable<'a, Item>
66where
67  Item: Clone + Send + Sync,
68{
69  pub fn retry(&self, max_retry: usize) -> Observable<'a, Item> {
70    Retry::new(max_retry).execute(self.clone())
71  }
72}
73
74#[cfg(test)]
75mod test {
76  use crate::prelude::*;
77  use std::sync::{Arc, RwLock};
78
79  #[test]
80  fn basic() {
81    let counter = Arc::new(RwLock::new(0));
82    let counter_ob = Arc::clone(&counter);
83    let o = Observable::create(move |s| {
84      let c = *counter_ob.read().unwrap();
85      println!("#{}", c + 1);
86      s.next(c * 100 + 0);
87      s.next(c * 100 + 1);
88      *counter_ob.write().unwrap() += 1;
89      if c < 5 {
90        s.error(RxError::from_error("ERR!"));
91      } else {
92        s.complete();
93      }
94    });
95
96    o.retry(0).subscribe(
97      print_next_fmt!("{}"),
98      print_error_as!(&str),
99      print_complete!(),
100    );
101
102    *counter.write().unwrap() = 0;
103    o.retry(3).subscribe(
104      print_next_fmt!("{}"),
105      print_error_as!(&str),
106      print_complete!(),
107    );
108  }
109}