1mod retrix;
2
3use retrix::retry_async;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::task;
7
8pub struct Task<E> {
9 pub name: String,
10 pub interval: Duration,
11 pub max_retries: u32,
12 pub retry_base_delay: Duration,
13 pub timeout: Duration,
14 pub action: Arc<dyn Fn() -> task::JoinHandle<Result<(), E>> + Send + Sync>,
15 pub retry_errors: Vec<E>,
16 pub logger: Option<Arc<dyn Fn(&str) + Send + Sync>>,
17 pub on_success: Option<Arc<dyn Fn() + Send + Sync>>,
18 pub on_failure: Option<Arc<dyn Fn() + Send + Sync>>,
19}
20
21pub struct TaskBuilder<E> {
22 name: String,
23 action: Arc<dyn Fn() -> task::JoinHandle<Result<(), E>> + Send + Sync>,
24 interval: Duration,
25 max_retries: u32,
26 retry_base_delay: Duration,
27 timeout: Duration,
28 retry_errors: Vec<E>,
29 logger: Option<Arc<dyn Fn(&str) + Send + Sync>>,
30 on_success: Option<Arc<dyn Fn() + Send + Sync>>,
31 on_failure: Option<Arc<dyn Fn() + Send + Sync>>,
32}
33
34impl<E: Clone + PartialEq + Send + 'static> TaskBuilder<E> {
35 pub fn new<F>(name: &str, action: F) -> Self
36 where
37 F: Fn() -> task::JoinHandle<Result<(), E>> + Send + Sync + 'static,
38 {
39 Self {
40 name: name.to_string(),
41 action: Arc::new(action),
42 interval: Duration::from_secs(5),
43 max_retries: 3,
44 retry_base_delay: Duration::from_millis(100),
45 timeout: Duration::from_secs(10),
46 retry_errors: vec![],
47 logger: None,
48 on_success: None,
49 on_failure: None,
50 }
51 }
52
53 pub fn interval(mut self, interval: Duration) -> Self {
54 self.interval = interval;
55 self
56 }
57
58 pub fn max_retries(mut self, retries: u32) -> Self {
59 self.max_retries = retries;
60 self
61 }
62
63 pub fn retry_base_delay(mut self, delay: Duration) -> Self {
64 self.retry_base_delay = delay;
65 self
66 }
67
68 pub fn timeout(mut self, timeout: Duration) -> Self {
69 self.timeout = timeout;
70 self
71 }
72
73 pub fn retry_errors(mut self, errors: Vec<E>) -> Self {
74 self.retry_errors = errors;
75 self
76 }
77
78 pub fn logger<FN>(mut self, logger: FN) -> Self
79 where
80 FN: Fn(&str) + Send + Sync + 'static,
81 {
82 self.logger = Some(Arc::new(logger));
83 self
84 }
85
86 pub fn on_success<FN>(mut self, callback: FN) -> Self
87 where
88 FN: Fn() + Send + Sync + 'static,
89 {
90 self.on_success = Some(Arc::new(callback));
91 self
92 }
93
94 pub fn on_failure<FN>(mut self, callback: FN) -> Self
95 where
96 FN: Fn() + Send + Sync + 'static,
97 {
98 self.on_failure = Some(Arc::new(callback));
99 self
100 }
101
102 pub fn build(self) -> Task<E> {
103 Task {
104 name: self.name,
105 action: self.action,
106 interval: self.interval,
107 max_retries: self.max_retries,
108 retry_base_delay: self.retry_base_delay,
109 timeout: self.timeout,
110 retry_errors: self.retry_errors,
111 logger: self.logger,
112 on_success: self.on_success,
113 on_failure: self.on_failure,
114 }
115 }
116}
117
118pub struct Scheduler<E> {
119 tasks: Vec<Task<E>>,
120}
121
122impl<E: Clone + PartialEq + Send + Default + 'static> Scheduler<E> {
123 pub fn new() -> Self {
124 Self { tasks: vec![] }
125 }
126
127 pub fn add(&mut self, task: Task<E>) {
128 self.tasks.push(task);
129 }
130
131 pub async fn run_once(&self) {
132 for task in &self.tasks {
133 let t = task.clone_task();
134 let log = |msg: &str| {
135 if let Some(logger) = &t.logger {
136 (logger)(msg);
137 }
138 };
139
140 let action_closure = t.action.clone();
141 let result = retrix::retry_async(t.max_retries, t.retry_base_delay, || {
142 let handle = (action_closure)();
143 let timeout = t.timeout;
144 Box::pin(async move {
145 let res = tokio::time::timeout(timeout, handle).await;
146 match res {
147 Ok(join_res) => match join_res {
148 Ok(inner_res) => inner_res,
149 Err(join_err) => {
150 eprintln!("Task spawn failed: {}", join_err);
151 Err(E::default())
152 }
153 },
154 Err(_) => {
155 eprintln!("Task timed out after {:?}", timeout);
156 Err(E::default())
157 }
158 }
159 })
160 })
161 .await;
162
163 match result {
164 Ok(_) => {
165 if let Some(cb) = &t.on_success {
166 (cb)();
167 }
168 }
169 Err(_) => {
170 if let Some(cb) = &t.on_failure {
171 (cb)();
172 }
173 }
174 }
175
176 log(&format!("Task {} executed/retried", t.name));
177 }
178 }
179
180 pub async fn run(&self) {
181 for task in &self.tasks {
182 let t = task.clone_task();
183 tokio::spawn(async move {
184 loop {
185 let log = |msg: &str| {
186 if let Some(logger) = &t.logger {
187 (logger)(msg);
188 }
189 };
190
191 let action_closure = t.action.clone();
192
193 let result = retry_async(t.max_retries, t.retry_base_delay, || {
194 let handle = (action_closure)();
195 let timeout = t.timeout;
196 Box::pin(async move {
197 let res = tokio::time::timeout(timeout, handle).await;
198 match res {
199 Ok(join_res) => match join_res {
200 Ok(inner_res) => inner_res,
201 Err(join_err) => {
202 eprintln!("Task spawn failed: {}", join_err);
203 Ok(())
204 }
205 },
206 Err(_) => {
207 eprintln!("Task timed out after {:?}", timeout);
208 Ok(())
209 }
210 }
211 })
212 })
213 .await;
214
215 match result {
216 Ok(_) => {
217 if let Some(cb) = &t.on_success {
218 (cb)();
219 }
220 }
221 Err(_) => {
222 if let Some(cb) = &t.on_failure {
223 (cb)();
224 }
225 }
226 }
227
228 log(&format!("Task {} executed/retried", t.name));
229 tokio::time::sleep(t.interval).await;
230 }
231 });
232 }
233 }
234}
235
236impl<E: Clone + PartialEq + Send + 'static> Task<E> {
237 fn clone_task(&self) -> Self {
238 Self {
239 name: self.name.clone(),
240 interval: self.interval,
241 max_retries: self.max_retries,
242 retry_base_delay: self.retry_base_delay,
243 timeout: self.timeout,
244 action: self.action.clone(),
245 retry_errors: self.retry_errors.clone(),
246 logger: self.logger.clone(),
247 on_success: self.on_success.clone(),
248 on_failure: self.on_failure.clone(),
249 }
250 }
251}