another_rxrust/operators/
retry_when.rs1use std::marker::PhantomData;
2
3use crate::internals::function_wrapper::*;
4use crate::internals::stream_controller::*;
5use crate::prelude::*;
6
7#[derive(Clone)]
8pub struct RetryWhen<'a, Item>
9where
10 Item: Clone + Send + Sync,
11{
12 predicate_f: FunctionWrapper<'a, RxError, bool>,
13 _item: PhantomData<Item>,
14}
15
16impl<'a, Item> RetryWhen<'a, Item>
17where
18 Item: Clone + Send + Sync,
19{
20 pub fn new<F>(f: F) -> RetryWhen<'a, Item>
21 where
22 F: Fn(RxError) -> bool + Send + Sync + 'a,
23 {
24 RetryWhen {
25 predicate_f: FunctionWrapper::new(f),
26 _item: PhantomData,
27 }
28 }
29
30 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
31 let f = self.predicate_f.clone();
32
33 Observable::<Item>::create(move |s| {
34 fn do_subscribe<'a, Item>(
35 predicate: FunctionWrapper<'a, RxError, bool>,
36 source: Observable<'a, Item>,
37 sctl: StreamController<'a, Item>,
38 ) where
39 Item: Clone + Send + Sync,
40 {
41 let sctl_next = sctl.clone();
42 let sctl_error = sctl.clone();
43 let sctl_complete = sctl.clone();
44 let source_error = source.clone();
45 source.inner_subscribe(sctl.new_observer(
46 move |_, x: Item| {
47 sctl_next.sink_next(x);
48 },
49 move |serial, e| {
50 if predicate.call(e.clone()) {
51 sctl_error.upstream_abort_observe(&serial);
52 do_subscribe(
53 predicate.clone(),
54 source_error.clone(),
55 sctl_error.clone(),
56 );
57 } else {
58 sctl_error.sink_error(e);
59 }
60 },
61 move |serial| sctl_complete.sink_complete(&serial),
62 ));
63 }
64
65 let sctl = StreamController::new(s);
66 do_subscribe(f.clone(), source.clone(), sctl.clone());
67 })
68 }
69}
70
71impl<'a, Item> Observable<'a, Item>
72where
73 Item: Clone + Send + Sync,
74{
75 pub fn retry_when<F>(&self, f: F) -> Observable<'a, Item>
76 where
77 F: Fn(RxError) -> bool + Send + Sync + 'a,
78 {
79 RetryWhen::new(f).execute(self.clone())
80 }
81}
82
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use std::sync::{Arc, RwLock};

    /// Smoke test: a source that emits two items per subscription and then
    /// errors (completing only once the shared counter has reached 5), with a
    /// retry predicate that accepts errors while the counter is below 2.
    /// Results are printed rather than asserted.
    #[test]
    fn basic() {
        let attempts = Arc::new(RwLock::new(0));
        let attempts_in_source = Arc::clone(&attempts);

        let source = Observable::create(move |s| {
            // Snapshot of the counter at the start of this subscription.
            let n = *attempts_in_source.read().unwrap();
            println!("#{}", n + 1);

            let base = n * 100;
            s.next(base);
            s.next(base + 1);

            // Incremented before signalling the error so the retry predicate
            // observes the updated count.
            *attempts_in_source.write().unwrap() += 1;

            if n >= 5 {
                s.complete();
            } else {
                s.error(RxError::from_error("ERR!"));
            }
        });

        let retried = source.retry_when(|e| {
            println!("retry_when {:?}", e.downcast_ref::<&str>());
            *attempts.read().unwrap() < 2
        });

        retried.subscribe(
            print_next_fmt!("{}"),
            print_error_as!(&str),
            print_complete!(),
        );
    }
}
118}