1use std::time::Duration;
4
5use futures_util::{
6 future::{ready, BoxFuture},
7 Future, FutureExt,
8};
9use tokio::task::JoinHandle;
10
11use self::retry_policy::TaskRetryPolicy;
12
13pub mod retry_policy {
14 use super::*;
16
17 pub trait TaskRetryPolicy: Send + Sync + 'static {
19 fn check(&mut self, _attempts: u32) -> BoxFuture<bool>;
21 }
22 pub fn never() -> FailFast {
24 FailFast {}
25 }
26
27 pub fn exponential_backoff(max_attempts: u32) -> ExponentialBackoff {
29 ExponentialBackoff { max_attempts }
30 }
31
32 pub fn forever(delay_s: u32) -> InfiniteRetry {
34 InfiniteRetry { delay_s }
35 }
36
37 pub struct FailFast;
39
40 impl TaskRetryPolicy for FailFast {
41 fn check(&mut self, _attempts: u32) -> BoxFuture<bool> {
42 ready(false).boxed()
43 }
44 }
45
46 pub struct ExponentialBackoff {
48 max_attempts: u32,
49 }
50
51 impl TaskRetryPolicy for ExponentialBackoff {
52 fn check(&mut self, attempts: u32) -> BoxFuture<bool> {
53 async move {
54 if attempts > self.max_attempts {
55 false
56 } else {
57 tokio::time::sleep(Duration::from_secs(2_u64.pow(attempts))).await;
58 true
59 }
60 }
61 .boxed()
62 }
63 }
64
65 pub struct InfiniteRetry {
67 delay_s: u32,
68 }
69
70 impl TaskRetryPolicy for InfiniteRetry {
71 fn check(&mut self, _attempts: u32) -> BoxFuture<bool> {
72 async move {
73 tokio::time::sleep(Duration::from_secs(self.delay_s as u64)).await;
74 true
75 }
76 .boxed()
77 }
78 }
79}
80pub fn spawn_retry_task<F, G>(task_fn: G, mut retry_policy: impl TaskRetryPolicy) -> JoinHandle<()>
94where
95 F: Future + Send + 'static,
96 F::Output: Send + 'static,
97 G: Fn() -> F + Send + Sync + 'static,
98{
99 tokio::spawn(async move {
100 let mut attempts = 0;
101 loop {
102 task_fn().await;
103 if !retry_policy.check(attempts).await {
104 panic!("task reached retry limit");
105 }
106 attempts += 1;
107 }
108 })
109}