Skip to main content

taskbot_rs/
lib.rs

1mod retrix;
2
3use retrix::retry_async;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::task;
7
8pub struct Task<E> {
9    pub name: String,
10    pub interval: Duration,
11    pub max_retries: u32,
12    pub retry_base_delay: Duration,
13    pub timeout: Duration,
14    pub action: Arc<dyn Fn() -> task::JoinHandle<Result<(), E>> + Send + Sync>,
15    pub retry_errors: Vec<E>,
16    pub logger: Option<Arc<dyn Fn(&str) + Send + Sync>>,
17    pub on_success: Option<Arc<dyn Fn() + Send + Sync>>,
18    pub on_failure: Option<Arc<dyn Fn() + Send + Sync>>,
19}
20
21pub struct TaskBuilder<E> {
22    name: String,
23    action: Arc<dyn Fn() -> task::JoinHandle<Result<(), E>> + Send + Sync>,
24    interval: Duration,
25    max_retries: u32,
26    retry_base_delay: Duration,
27    timeout: Duration,
28    retry_errors: Vec<E>,
29    logger: Option<Arc<dyn Fn(&str) + Send + Sync>>,
30    on_success: Option<Arc<dyn Fn() + Send + Sync>>,
31    on_failure: Option<Arc<dyn Fn() + Send + Sync>>,
32}
33
34impl<E: Clone + PartialEq + Send + 'static> TaskBuilder<E> {
35    pub fn new<F>(name: &str, action: F) -> Self
36    where
37        F: Fn() -> task::JoinHandle<Result<(), E>> + Send + Sync + 'static,
38    {
39        Self {
40            name: name.to_string(),
41            action: Arc::new(action),
42            interval: Duration::from_secs(5),
43            max_retries: 3,
44            retry_base_delay: Duration::from_millis(100),
45            timeout: Duration::from_secs(10),
46            retry_errors: vec![],
47            logger: None,
48            on_success: None,
49            on_failure: None,
50        }
51    }
52
53    pub fn interval(mut self, interval: Duration) -> Self {
54        self.interval = interval;
55        self
56    }
57
58    pub fn max_retries(mut self, retries: u32) -> Self {
59        self.max_retries = retries;
60        self
61    }
62
63    pub fn retry_base_delay(mut self, delay: Duration) -> Self {
64        self.retry_base_delay = delay;
65        self
66    }
67
68    pub fn timeout(mut self, timeout: Duration) -> Self {
69        self.timeout = timeout;
70        self
71    }
72
73    pub fn retry_errors(mut self, errors: Vec<E>) -> Self {
74        self.retry_errors = errors;
75        self
76    }
77
78    pub fn logger<FN>(mut self, logger: FN) -> Self
79    where
80        FN: Fn(&str) + Send + Sync + 'static,
81    {
82        self.logger = Some(Arc::new(logger));
83        self
84    }
85
86    pub fn on_success<FN>(mut self, callback: FN) -> Self
87    where
88        FN: Fn() + Send + Sync + 'static,
89    {
90        self.on_success = Some(Arc::new(callback));
91        self
92    }
93
94    pub fn on_failure<FN>(mut self, callback: FN) -> Self
95    where
96        FN: Fn() + Send + Sync + 'static,
97    {
98        self.on_failure = Some(Arc::new(callback));
99        self
100    }
101
102    pub fn build(self) -> Task<E> {
103        Task {
104            name: self.name,
105            action: self.action,
106            interval: self.interval,
107            max_retries: self.max_retries,
108            retry_base_delay: self.retry_base_delay,
109            timeout: self.timeout,
110            retry_errors: self.retry_errors,
111            logger: self.logger,
112            on_success: self.on_success,
113            on_failure: self.on_failure,
114        }
115    }
116}
117
118pub struct Scheduler<E> {
119    tasks: Vec<Task<E>>,
120}
121
122impl<E: Clone + PartialEq + Send + Default + 'static> Scheduler<E> {
123    pub fn new() -> Self {
124        Self { tasks: vec![] }
125    }
126
127    pub fn add(&mut self, task: Task<E>) {
128        self.tasks.push(task);
129    }
130
131    pub async fn run_once(&self) {
132        for task in &self.tasks {
133            let t = task.clone_task();
134            let log = |msg: &str| {
135                if let Some(logger) = &t.logger {
136                    (logger)(msg);
137                }
138            };
139
140            let action_closure = t.action.clone();
141            let result = retrix::retry_async(t.max_retries, t.retry_base_delay, || {
142                let handle = (action_closure)();
143                let timeout = t.timeout;
144                Box::pin(async move {
145                    let res = tokio::time::timeout(timeout, handle).await;
146                    match res {
147                        Ok(join_res) => match join_res {
148                            Ok(inner_res) => inner_res,
149                            Err(join_err) => {
150                                eprintln!("Task spawn failed: {}", join_err);
151                                Err(E::default())
152                            }
153                        },
154                        Err(_) => {
155                            eprintln!("Task timed out after {:?}", timeout);
156                            Err(E::default())
157                        }
158                    }
159                })
160            })
161            .await;
162
163            match result {
164                Ok(_) => {
165                    if let Some(cb) = &t.on_success {
166                        (cb)();
167                    }
168                }
169                Err(_) => {
170                    if let Some(cb) = &t.on_failure {
171                        (cb)();
172                    }
173                }
174            }
175
176            log(&format!("Task {} executed/retried", t.name));
177        }
178    }
179
180    pub async fn run(&self) {
181        for task in &self.tasks {
182            let t = task.clone_task();
183            tokio::spawn(async move {
184                loop {
185                    let log = |msg: &str| {
186                        if let Some(logger) = &t.logger {
187                            (logger)(msg);
188                        }
189                    };
190
191                    let action_closure = t.action.clone();
192
193                    let result = retry_async(t.max_retries, t.retry_base_delay, || {
194                        let handle = (action_closure)();
195                        let timeout = t.timeout;
196                        Box::pin(async move {
197                            let res = tokio::time::timeout(timeout, handle).await;
198                            match res {
199                                Ok(join_res) => match join_res {
200                                    Ok(inner_res) => inner_res,
201                                    Err(join_err) => {
202                                        eprintln!("Task spawn failed: {}", join_err);
203                                        Ok(())
204                                    }
205                                },
206                                Err(_) => {
207                                    eprintln!("Task timed out after {:?}", timeout);
208                                    Ok(())
209                                }
210                            }
211                        })
212                    })
213                    .await;
214
215                    match result {
216                        Ok(_) => {
217                            if let Some(cb) = &t.on_success {
218                                (cb)();
219                            }
220                        }
221                        Err(_) => {
222                            if let Some(cb) = &t.on_failure {
223                                (cb)();
224                            }
225                        }
226                    }
227
228                    log(&format!("Task {} executed/retried", t.name));
229                    tokio::time::sleep(t.interval).await;
230                }
231            });
232        }
233    }
234}
235
236impl<E: Clone + PartialEq + Send + 'static> Task<E> {
237    fn clone_task(&self) -> Self {
238        Self {
239            name: self.name.clone(),
240            interval: self.interval,
241            max_retries: self.max_retries,
242            retry_base_delay: self.retry_base_delay,
243            timeout: self.timeout,
244            action: self.action.clone(),
245            retry_errors: self.retry_errors.clone(),
246            logger: self.logger.clone(),
247            on_success: self.on_success.clone(),
248            on_failure: self.on_failure.clone(),
249        }
250    }
251}