drift_rs/
async_utils.rs

1//! utils for async functions
2
3use std::time::Duration;
4
5use futures_util::{
6    future::{ready, BoxFuture},
7    Future, FutureExt,
8};
9use tokio::task::JoinHandle;
10
11use self::retry_policy::TaskRetryPolicy;
12
13pub mod retry_policy {
14    //! retry policies for async tasks
15    use super::*;
16
17    /// Defines whether an async task should be retried or not
18    pub trait TaskRetryPolicy: Send + Sync + 'static {
19        /// called pre-retry, returns whether retry should proceed or not
20        fn check(&mut self, _attempts: u32) -> BoxFuture<bool>;
21    }
22    /// Create a new fail fast policy
23    pub fn never() -> FailFast {
24        FailFast {}
25    }
26
27    /// Create a new exponential backoff policy
28    pub fn exponential_backoff(max_attempts: u32) -> ExponentialBackoff {
29        ExponentialBackoff { max_attempts }
30    }
31
32    /// Create a new never ending retry policy
33    pub fn forever(delay_s: u32) -> InfiniteRetry {
34        InfiniteRetry { delay_s }
35    }
36
37    /// TaskFails on first retry
38    pub struct FailFast;
39
40    impl TaskRetryPolicy for FailFast {
41        fn check(&mut self, _attempts: u32) -> BoxFuture<bool> {
42            ready(false).boxed()
43        }
44    }
45
46    /// Exponential back-off policy up to `max_attempts`
47    pub struct ExponentialBackoff {
48        max_attempts: u32,
49    }
50
51    impl TaskRetryPolicy for ExponentialBackoff {
52        fn check(&mut self, attempts: u32) -> BoxFuture<bool> {
53            async move {
54                if attempts > self.max_attempts {
55                    false
56                } else {
57                    tokio::time::sleep(Duration::from_secs(2_u64.pow(attempts))).await;
58                    true
59                }
60            }
61            .boxed()
62        }
63    }
64
65    /// A policy that retries a task indefinitely, with constant delay between successive retries
66    pub struct InfiniteRetry {
67        delay_s: u32,
68    }
69
70    impl TaskRetryPolicy for InfiniteRetry {
71        fn check(&mut self, _attempts: u32) -> BoxFuture<bool> {
72            async move {
73                tokio::time::sleep(Duration::from_secs(self.delay_s as u64)).await;
74                true
75            }
76            .boxed()
77        }
78    }
79}
80/// Spawns a new tokio task with udf retry behaviour
81///
82/// - `task_fn` generator function for the task future
83///
84/// ```example
85/// let task_gen = move || {
86///     async move {
87///         1 + 1
88///     }
89/// };
90///
91/// spawn_retry_task(task_gen, FailFast {})
92/// ```
93pub fn spawn_retry_task<F, G>(task_fn: G, mut retry_policy: impl TaskRetryPolicy) -> JoinHandle<()>
94where
95    F: Future + Send + 'static,
96    F::Output: Send + 'static,
97    G: Fn() -> F + Send + Sync + 'static,
98{
99    tokio::spawn(async move {
100        let mut attempts = 0;
101        loop {
102            task_fn().await;
103            if !retry_policy.check(attempts).await {
104                panic!("task reached retry limit");
105            }
106            attempts += 1;
107        }
108    })
109}