another_rxrust/operators/
retry_when.rs

1use std::marker::PhantomData;
2
3use crate::internals::function_wrapper::*;
4use crate::internals::stream_controller::*;
5use crate::prelude::*;
6
7#[derive(Clone)]
8pub struct RetryWhen<'a, Item>
9where
10  Item: Clone + Send + Sync,
11{
12  predicate_f: FunctionWrapper<'a, RxError, bool>,
13  _item: PhantomData<Item>,
14}
15
16impl<'a, Item> RetryWhen<'a, Item>
17where
18  Item: Clone + Send + Sync,
19{
20  pub fn new<F>(f: F) -> RetryWhen<'a, Item>
21  where
22    F: Fn(RxError) -> bool + Send + Sync + 'a,
23  {
24    RetryWhen {
25      predicate_f: FunctionWrapper::new(f),
26      _item: PhantomData,
27    }
28  }
29
30  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
31    let f = self.predicate_f.clone();
32
33    Observable::<Item>::create(move |s| {
34      fn do_subscribe<'a, Item>(
35        predicate: FunctionWrapper<'a, RxError, bool>,
36        source: Observable<'a, Item>,
37        sctl: StreamController<'a, Item>,
38      ) where
39        Item: Clone + Send + Sync,
40      {
41        let sctl_next = sctl.clone();
42        let sctl_error = sctl.clone();
43        let sctl_complete = sctl.clone();
44        let source_error = source.clone();
45        source.inner_subscribe(sctl.new_observer(
46          move |_, x: Item| {
47            sctl_next.sink_next(x);
48          },
49          move |serial, e| {
50            if predicate.call(e.clone()) {
51              sctl_error.upstream_abort_observe(&serial);
52              do_subscribe(
53                predicate.clone(),
54                source_error.clone(),
55                sctl_error.clone(),
56              );
57            } else {
58              sctl_error.sink_error(e);
59            }
60          },
61          move |serial| sctl_complete.sink_complete(&serial),
62        ));
63      }
64
65      let sctl = StreamController::new(s);
66      do_subscribe(f.clone(), source.clone(), sctl.clone());
67    })
68  }
69}
70
71impl<'a, Item> Observable<'a, Item>
72where
73  Item: Clone + Send + Sync,
74{
75  pub fn retry_when<F>(&self, f: F) -> Observable<'a, Item>
76  where
77    F: Fn(RxError) -> bool + Send + Sync + 'a,
78  {
79    RetryWhen::new(f).execute(self.clone())
80  }
81}
82
83#[cfg(test)]
84mod test {
85  use crate::prelude::*;
86  use std::sync::{Arc, RwLock};
87
88  #[test]
89  fn basic() {
90    let counter = Arc::new(RwLock::new(0));
91    let counter_ob = Arc::clone(&counter);
92    let o = Observable::create(move |s| {
93      let c = *counter_ob.read().unwrap();
94      println!("#{}", c + 1);
95      s.next(c * 100 + 0);
96      s.next(c * 100 + 1);
97      *counter_ob.write().unwrap() += 1;
98      if c < 5 {
99        s.error(RxError::from_error("ERR!"));
100      } else {
101        s.complete();
102      }
103    });
104
105    o.retry_when(|e| {
106      println!(
107        "retry_when {:?}",
108        e.downcast_ref::<&str>()
109      );
110      *counter.read().unwrap() < 2
111    })
112    .subscribe(
113      print_next_fmt!("{}"),
114      print_error_as!(&str),
115      print_complete!(),
116    );
117  }
118}