1use crate::async_rt::spawn;
2use crate::{
3 error::RucronError, locker::Locker, metric::MetricType, unlock_and_record, METRIC_STORAGE,
4};
5
6use async_trait::async_trait;
7use chrono::{DateTime, Local};
8use futures::future::Future;
9use http::Extensions;
10use std::{
11 any::type_name,
12 error::Error,
13 fmt::Debug,
14 marker::PhantomData,
15 ops::{Deref, DerefMut},
16 sync::Arc,
17};
18
/// An asynchronous job that can be invoked by the scheduler.
///
/// The type parameter `T` only tags the shape of the job's argument list: the
/// implementations in this file use `()` for zero-argument jobs and a tuple
/// `(T1, ..., Tn)` for jobs whose arguments implement [`ParseArgs`].
#[async_trait]
pub trait Executor<T>: Send + Sized + 'static {
    /// Runs the job.
    ///
    /// * `args` - storage the job's arguments are extracted from.
    /// * `name` - job name; the implementations in this file pass it to
    ///   `handle_result`, which uses it as the metrics key.
    async fn call(self, args: &ArgStorage, name: String);
}
29
/// A synchronous (blocking) job that can be invoked by the scheduler.
///
/// `T` tags the shape of the argument list in the same way as for
/// [`Executor`]: `()` for zero-argument jobs, a tuple of [`ParseArgs`] types
/// otherwise.
pub trait SyncExecutor<T> {
    /// Runs the job.
    ///
    /// * `args` - storage the job's arguments are extracted from.
    /// * `name` - job name; the implementations in this file pass it to
    ///   `handle_result`, which uses it as the metrics key.
    fn call(self, args: &ArgStorage, name: String);
}
34
35fn handle_result(res: Result<(), Box<dyn Error>>, name: &str, start: DateTime<Local>) {
36 res.map_err(|e| RucronError::RunTimeError(e.to_string()))
37 .map_or_else(
38 |e| {
39 log::error!("{}", e);
40 METRIC_STORAGE
41 .get(name)
42 .unwrap()
43 .add_failure(MetricType::Error);
44 },
45 |_| {
46 METRIC_STORAGE.get(name).unwrap().swap_time_and_add_runs(
47 Local::now().signed_duration_since(start).num_seconds() as usize,
48 );
49 },
50 );
51}
52
53#[async_trait]
54impl<F, Fut> Executor<()> for F
55where
56 F: Fn() -> Fut + Send + Sync + 'static,
57 Fut: Future<Output = Result<(), Box<dyn Error>>> + Send + 'static,
58{
59 async fn call(self, _args: &ArgStorage, name: String) {
60 let start = Local::now();
61 spawn(async move {
62 handle_result(self().await, &name, start);
63 });
64 }
65}
66
67impl<Func> SyncExecutor<()> for Func
68where
69 Func: Fn() -> Result<(), Box<dyn Error>> + Send + Sync + 'static,
70{
71 fn call(self, _args: &ArgStorage, name: String) {
72 let start = Local::now();
73 rayon::spawn(move || {
74 handle_result(self(), &name, start);
75 });
76 }
77}
78
/// A handler that can be asked to run the job called `name`.
///
/// Implemented in this file by [`Task`] (which compares `name` with its own
/// name and otherwise delegates to a fallback handler) and by the executor
/// wrappers.
#[async_trait]
pub trait JobHandler: Send + Sized + 'static {
    /// Handles a request to run the job `name`, with `args` as the shared
    /// argument storage.
    async fn call(self, args: Arc<ArgStorage>, name: String);
    /// Returns the name of the job this handler represents.
    fn name(&self) -> String;
}
87
/// Types that can be extracted from an [`ArgStorage`] and passed to a job as an
/// argument.
pub trait ParseArgs: Sized + Clone {
    /// Error returned when the value cannot be extracted.
    type Err: Error;

    /// Builds a value of this type from `args`.
    ///
    /// The executor implementations in this file call this once per job
    /// parameter; on `Err` the job is not run and the error is reported
    /// through `handle_result`.
    fn parse_args(args: &ArgStorage) -> Result<Self, Self::Err>;
}
131
// Generates `Executor<(A1, ..., An)>` for async callables whose parameters all
// implement `ParseArgs`.
//
// Each identifier passed to the macro is used twice: as the generic type
// parameter and as the local variable holding the parsed argument (hence
// `non_snake_case`).
macro_rules! impl_executor {
    ( $($arg:ident),* $(,)? ) => {
        #[async_trait]
        #[allow(non_snake_case)]
        impl<F, Fut, $($arg,)*> Executor<($($arg,)*)> for F
        where
            F: Fn($($arg,)*) -> Fut + Clone + Send + Sync + 'static,
            Fut: Future<Output = Result<(), Box<dyn Error>>> + Send,
            $($arg: ParseArgs + Send + 'static,)*
        {
            async fn call(self, args: &ArgStorage, name: String) {
                let started_at = Local::now();
                // Parse every argument up front; the first failure is reported
                // as a failed run and the job is never spawned.
                $(
                    let $arg = match <$arg as ParseArgs>::parse_args(args) {
                        Err(parse_err) => {
                            handle_result(Err(Box::new(parse_err)), &name, started_at);
                            return;
                        }
                        Ok(parsed) => parsed,
                    };
                )*
                // Fire and forget: the spawned task reports its own outcome.
                spawn(async move {
                    handle_result(self($($arg,)*).await, &name, started_at);
                });
            }
        }
    };
}
160
// Generates `SyncExecutor<(A1, ..., An)>` for blocking callables whose
// parameters all implement `ParseArgs`.
//
// Each identifier passed to the macro is used twice: as the generic type
// parameter and as the local variable holding the parsed argument (hence
// `non_snake_case`).
macro_rules! impl_sync_executor {
    ( $($arg:ident),* $(,)? ) => {
        #[allow(non_snake_case)]
        impl<F, $($arg,)*> SyncExecutor<($($arg,)*)> for F
        where
            F: Fn($($arg,)*) -> Result<(), Box<dyn Error>> + Clone + Send + Sync + 'static,
            $($arg: ParseArgs + Send + 'static,)*
        {
            fn call(self, args: &ArgStorage, name: String) {
                let started_at = Local::now();
                // Parse every argument up front; the first failure is reported
                // as a failed run and the job is never spawned.
                $(
                    let $arg = match <$arg as ParseArgs>::parse_args(args) {
                        Err(parse_err) => {
                            handle_result(Err(Box::new(parse_err)), &name, started_at);
                            return;
                        }
                        Ok(parsed) => parsed,
                    };
                )*
                // Fire and forget: the rayon closure reports its own outcome.
                rayon::spawn(move || {
                    handle_result(self($($arg,)*), &name, started_at);
                });
            }
        }
    };
}
187
// Implement `Executor` for async callables taking 1 to 16 `ParseArgs`
// arguments (the zero-argument case has its own hand-written impl).
impl_executor!(T1);
impl_executor!(T1, T2);
impl_executor!(T1, T2, T3);
impl_executor!(T1, T2, T3, T4);
impl_executor!(T1, T2, T3, T4, T5);
impl_executor!(T1, T2, T3, T4, T5, T6);
impl_executor!(T1, T2, T3, T4, T5, T6, T7);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
204
// Implement `SyncExecutor` for blocking callables taking 1 to 16 `ParseArgs`
// arguments (the zero-argument case has its own hand-written impl).
impl_sync_executor!(T1);
impl_sync_executor!(T1, T2);
impl_sync_executor!(T1, T2, T3);
impl_sync_executor!(T1, T2, T3, T4);
impl_sync_executor!(T1, T2, T3, T4, T5);
impl_sync_executor!(T1, T2, T3, T4, T5, T6);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
221
/// One link in the chain of job handlers: a named job plus the handler to try
/// when the requested name does not match.
#[derive(Debug, Clone)]
pub struct Task<T, TH, L> {
    /// Name this task answers to; compared with the requested job name.
    pub(crate) name: String,
    /// Handler that runs the job itself.
    pub(crate) task: T,
    /// Handler the request is forwarded to when the name does not match.
    pub(crate) fallback: TH,
    /// Lock implementation used when `need_lock` is set.
    pub(crate) locker: Option<L>,
    /// Whether a lock must be acquired through `locker` before running.
    pub(crate) need_lock: bool,
    /// Number of times `task` is invoked per request when no lock is needed.
    pub(crate) n_threads: u8,
}
232
233#[async_trait]
234impl<T, TH, L> JobHandler for Task<T, TH, L>
235where
236 T: JobHandler + Send + Sync + Clone,
237 TH: JobHandler + Send + Sync + 'static,
238 L: Locker + 'static + Send + Sync + Clone,
239{
240 async fn call(self, args: Arc<ArgStorage>, name: String) {
241 if self.name == name {
242 if self.need_lock && self.locker.is_some() {
243 let locker = self.locker.clone();
244 match locker {
245 Some(locker) => match locker.lock(&name[..], args.clone()) {
246 Ok(b) if b => {
247 log::debug!(
248 "[DEBUG] Spawns a new asynchronous task to run: {}",
249 &name[..]
250 );
251 JobHandler::call(self.task, args.clone(), name.clone()).await;
252 log::debug!("[DEBUG] Had finished running: {}", &name[..]);
253 unlock_and_record(locker, &name[..], args);
254 }
255 Ok(b) if !b => {
256 METRIC_STORAGE
257 .get(&self.name())
258 .unwrap()
259 .add_failure(MetricType::Lock);
260 }
261 Ok(_) => {
262 unreachable!("unreachable!")
263 }
264 Err(e) => {
265 log::error!("{}", e);
266 METRIC_STORAGE
267 .get(&self.name())
268 .unwrap()
269 .add_failure(MetricType::Error);
270 }
271 },
272 _ => {}
273 };
274 } else if !self.need_lock {
275 log::debug!(
276 "[DEBUG] Spawns a new asynchronous task to run: {}",
277 &name[..]
278 );
279 for _ in (0..self.n_threads).into_iter() {
280 let name_copy = name.clone();
281 let task_copy = self.task.clone();
282 let args_copy = args.clone();
283 JobHandler::call(task_copy, args_copy, name_copy.clone()).await;
284 log::debug!("[DEBUG] Had finished running: {}", name_copy);
285 }
286 };
287 } else {
288 JobHandler::call(self.fallback, args, name).await;
289 }
290 }
291 #[inline(always)]
292 fn name(&self) -> String {
293 self.name.clone()
294 }
295}
296
/// Pairs an asynchronous executor with the name derived from its type.
///
/// `T` is the executor's argument-list tag; it is carried only through
/// `PhantomData` and no value of type `T` is stored.
#[derive(Clone)]
pub struct ExecutorWrapper<E, T> {
    /// The wrapped job.
    executor: E,
    /// Name reported by `JobHandler::name`.
    executor_name: String,
    /// Ties the wrapper to the argument-list tag `T`.
    _marker: PhantomData<T>,
}
304
/// Pairs a synchronous executor with the name derived from its type.
///
/// `T` is the executor's argument-list tag; it is carried only through
/// `PhantomData` and no value of type `T` is stored.
#[derive(Clone)]
pub struct SyncExecutorWrapper<E, T> {
    /// The wrapped job.
    executor: E,
    /// Name reported by `JobHandler::name`.
    executor_name: String,
    /// Ties the wrapper to the argument-list tag `T`.
    _marker: PhantomData<T>,
}
312
313pub fn execute<E, T>(executor: E) -> ExecutorWrapper<E, T> {
339 let tname = type_name::<E>();
340 let tokens: Vec<&str> = tname.split("::").collect();
341 let name = match (*tokens).get(tokens.len() - 1) {
342 None => panic!("Invalid name: {:?}", tokens),
343 Some(s) => (*s).into(),
344 };
345 ExecutorWrapper {
346 executor,
347 executor_name: name,
348 _marker: PhantomData,
349 }
350}
351
352impl<E, T> From<ExecutorWrapper<E, T>> for SyncExecutorWrapper<E, T> {
353 fn from(executor: ExecutorWrapper<E, T>) -> Self {
354 SyncExecutorWrapper {
355 executor: executor.executor,
356 executor_name: executor.executor_name,
357 _marker: executor._marker,
358 }
359 }
360}
361
362pub fn sync_execute<E, T>(executor: E) -> SyncExecutorWrapper<E, T> {
391 SyncExecutorWrapper::from(execute(executor))
392}
393
394#[async_trait]
395impl<E, T> JobHandler for ExecutorWrapper<E, T>
396where
397 E: Executor<T> + Send + 'static + Sync + Clone,
398 T: Send + 'static + Sync,
399{
400 async fn call(self, args: Arc<ArgStorage>, name: String) {
401 let exe = self.executor.clone();
402 Executor::call(exe, &*args, name).await;
403 }
404
405 #[inline(always)]
406 fn name(&self) -> String {
407 self.executor_name.clone()
408 }
409}
410
411#[async_trait]
412impl<E, T> JobHandler for SyncExecutorWrapper<E, T>
413where
414 E: SyncExecutor<T> + Send + 'static + Sync + Clone,
415 T: Send + 'static + Sync,
416{
417 async fn call(self, args: Arc<ArgStorage>, name: String) {
418 let exe = self.executor.clone();
419 SyncExecutor::call(exe, &*args, name);
420 }
421
422 #[inline(always)]
423 fn name(&self) -> String {
424 self.executor_name.clone()
425 }
426}
427
/// Storage for the values that jobs receive as arguments.
///
/// A newtype around `http::Extensions`; the `Deref`/`DerefMut` impls in this
/// file expose the inner `Extensions` API directly.
#[derive(Debug)]
pub struct ArgStorage(Extensions);
436
437impl ArgStorage {
438 pub fn new() -> Self {
439 ArgStorage(Extensions::new())
440 }
441}
442
443impl Deref for ArgStorage {
444 type Target = Extensions;
445
446 fn deref(&self) -> &Self::Target {
447 &self.0
448 }
449}
450
451impl DerefMut for ArgStorage {
452 fn deref_mut(&mut self) -> &mut Self::Target {
453 &mut self.0
454 }
455}