another_rxrust/operators/
retry.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4
5#[derive(Clone)]
6pub struct Retry<Item>
7where
8 Item: Clone + Send + Sync,
9{
10 count: usize,
11 _item: PhantomData<Item>,
12}
13
14impl<'a, Item> Retry<Item>
15where
16 Item: Clone + Send + Sync,
17{
18 pub fn new(count: usize) -> Retry<Item> {
19 Retry { count, _item: PhantomData }
20 }
21
22 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
23 let count = self.count;
24
25 Observable::<Item>::create(move |s| {
26 fn do_subscribe<'a, Item>(
27 n: usize,
28 max_retry: usize,
29 source: Observable<'a, Item>,
30 sctl: StreamController<'a, Item>,
31 ) where
32 Item: Clone + Send + Sync,
33 {
34 let sctl_next = sctl.clone();
35 let sctl_error = sctl.clone();
36 let sctl_complete = sctl.clone();
37 let source_error = source.clone();
38 source.inner_subscribe(sctl.new_observer(
39 move |_, x: Item| {
40 sctl_next.sink_next(x);
41 },
42 move |serial, e| {
43 if max_retry == 0 || n < max_retry {
44 sctl_error.upstream_abort_observe(&serial);
45 do_subscribe(
46 n + 1,
47 max_retry,
48 source_error.clone(),
49 sctl_error.clone(),
50 );
51 } else {
52 sctl_error.sink_error(e);
53 }
54 },
55 move |serial| sctl_complete.sink_complete(&serial),
56 ));
57 }
58
59 let sctl = StreamController::new(s);
60 do_subscribe(1, count, source.clone(), sctl.clone());
61 })
62 }
63}
64
65impl<'a, Item> Observable<'a, Item>
66where
67 Item: Clone + Send + Sync,
68{
69 pub fn retry(&self, max_retry: usize) -> Observable<'a, Item> {
70 Retry::new(max_retry).execute(self.clone())
71 }
72}
73
74#[cfg(test)]
75mod test {
76 use crate::prelude::*;
77 use std::sync::{Arc, RwLock};
78
79 #[test]
80 fn basic() {
81 let counter = Arc::new(RwLock::new(0));
82 let counter_ob = Arc::clone(&counter);
83 let o = Observable::create(move |s| {
84 let c = *counter_ob.read().unwrap();
85 println!("#{}", c + 1);
86 s.next(c * 100 + 0);
87 s.next(c * 100 + 1);
88 *counter_ob.write().unwrap() += 1;
89 if c < 5 {
90 s.error(RxError::from_error("ERR!"));
91 } else {
92 s.complete();
93 }
94 });
95
96 o.retry(0).subscribe(
97 print_next_fmt!("{}"),
98 print_error_as!(&str),
99 print_complete!(),
100 );
101
102 *counter.write().unwrap() = 0;
103 o.retry(3).subscribe(
104 print_next_fmt!("{}"),
105 print_error_as!(&str),
106 print_complete!(),
107 );
108 }
109}