1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::marker::PhantomData;

use futures::{future::BoxFuture, FutureExt};
use tower::retry::Policy;

/// Retry policy: either retry failed requests selected by an error filter, or never retry.
///
/// `T` and `U` only appear inside the [`PhantomData`] marker of the [`Never`](Self::Never)
/// variant; no value of either type is stored in the policy.
#[derive(Debug)]
pub enum RetryPolicy<T, U, F> {
    /// Retrying is enabled for the errors accepted by `f`.
    On {
        /// Error filter: called with a reference to the error; a retry is only attempted
        /// when it returns `true`.
        f: F,
        /// Number of retries performed so far; used as the exponent of the backoff delay.
        times: usize,
    },
    /// Never retry. The marker only ties the variant to `T` and `U`.
    Never(PhantomData<fn() -> (T, U)>),
}

impl<T, U, F: Clone> Clone for RetryPolicy<T, U, F> {
    fn clone(&self) -> Self {
        match self {
            Self::Never(_) => Self::Never(PhantomData),
            Self::On { f, times } => Self::On {
                f: f.clone(),
                times: *times,
            },
        }
    }
}

/// The policy is [`Copy`] whenever its error filter `F` is; `T` and `U` carry no bound.
impl<T, U, F> Copy for RetryPolicy<T, U, F> where F: Copy {}

impl<T, U, F> Default for RetryPolicy<T, U, F> {
    fn default() -> Self {
        Self::never()
    }
}

impl<T, U, F> RetryPolicy<T, U, F> {
    /// Creates a policy that never retries.
    pub fn never() -> Self {
        RetryPolicy::Never(PhantomData)
    }

    /// Switches to a retrying policy that uses `f` as its error filter.
    ///
    /// `self` is consumed and its state discarded: the returned policy is always
    /// [`RetryPolicy::On`] with the retry counter reset to zero.
    pub fn retry_on<E, F2>(self, f: F2) -> RetryPolicy<T, U, F2>
    where
        F2: Fn(&E) -> bool + Send + 'static + Clone,
    {
        // A fresh policy has not retried anything yet.
        let times = 0;
        RetryPolicy::On { f, times }
    }
}

/// [`Policy`] implementation that retries failed requests with exponential backoff.
///
/// Only the [`RetryPolicy::On`] variant ever retries, and only for errors accepted by its
/// filter `f`. The delay before a retry doubles with every attempt (1s, 2s, 4s, ...) and is
/// capped at 128 seconds.
impl<T, U, E, F> Policy<T, U, E> for RetryPolicy<T, U, F>
where
    T: 'static + Clone,
    U: 'static,
    F: Fn(&E) -> bool,
    F: Send + 'static + Clone,
{
    /// Completes once the backoff delay has elapsed and yields the policy to use for the
    /// next attempt.
    type Future = BoxFuture<'static, Self>;

    /// Decides whether the request that produced `result` should be retried.
    ///
    /// Returns `None` (no retry) when the policy is [`RetryPolicy::Never`], when `result` is
    /// `Ok`, or when the error filter rejects the error. Otherwise returns a future that
    /// sleeps for the backoff delay and then resolves to a policy whose retry counter has
    /// been incremented.
    fn retry(&self, _req: &T, result: Result<&U, &E>) -> Option<Self::Future> {
        // Largest exponent used for the backoff: `1 << 7` is the 128-second cap.
        const MAX_BACKOFF_SHIFT: usize = 7;

        match (self, result) {
            (Self::On { f, times }, Err(err)) => {
                if !f(err) {
                    tracing::trace!("retry given up;");
                    return None;
                }

                let attempt = *times;
                // Clamp the exponent rather than the shifted value: shifting a `u64` by 64
                // or more overflows (panic in debug builds, wrap-around back to a short
                // delay in release builds), which capping the result afterwards cannot
                // prevent.
                let secs = 1u64 << attempt.min(MAX_BACKOFF_SHIFT);
                tracing::trace!("retry in {secs}s;");

                let next = Self::On {
                    f: f.clone(),
                    // Saturate so the counter itself can never overflow either.
                    times: attempt.saturating_add(1),
                };
                let fut = async move {
                    tokio::time::sleep(std::time::Duration::from_secs(secs)).await;
                    next
                }
                .boxed();
                Some(fut)
            }
            // Successful responses and the `Never` policy are not retried.
            (Self::On { .. }, Ok(_)) | (Self::Never(_), _) => None,
        }
    }

    /// Always returns a clone of the request, regardless of the policy variant.
    fn clone_request(&self, req: &T) -> Option<T> {
        Some(req.clone())
    }
}