rucron/
handler.rs

1use crate::async_rt::spawn;
2use crate::{
3    error::RucronError, locker::Locker, metric::MetricType, unlock_and_record, METRIC_STORAGE,
4};
5
6use async_trait::async_trait;
7use chrono::{DateTime, Local};
8use futures::future::Future;
9use http::Extensions;
10use std::{
11    any::type_name,
12    error::Error,
13    fmt::Debug,
14    marker::PhantomData,
15    ops::{Deref, DerefMut},
16    sync::Arc,
17};
18
19/// The trait is the task to be run in fact, the task must be asynchronous.
20///
21/// The function with no parameters and less than 16 parameters has implemented the trait.
22///
23/// The `Scheduler` will call this function when a job is `runnable`.
24///
25#[async_trait]
26pub trait Executor<T>: Send + Sized + 'static {
27    async fn call(self, args: &ArgStorage, name: String);
28}
29
30/// The trait is similar to `Executor`, the task must be synchronous.
31pub trait SyncExecutor<T> {
32    fn call(self, args: &ArgStorage, name: String);
33}
34
35fn handle_result(res: Result<(), Box<dyn Error>>, name: &str, start: DateTime<Local>) {
36    res.map_err(|e| RucronError::RunTimeError(e.to_string()))
37        .map_or_else(
38            |e| {
39                log::error!("{}", e);
40                METRIC_STORAGE
41                    .get(name)
42                    .unwrap()
43                    .add_failure(MetricType::Error);
44            },
45            |_| {
46                METRIC_STORAGE.get(name).unwrap().swap_time_and_add_runs(
47                    Local::now().signed_duration_since(start).num_seconds() as usize,
48                );
49            },
50        );
51}
52
53#[async_trait]
54impl<F, Fut> Executor<()> for F
55where
56    F: Fn() -> Fut + Send + Sync + 'static,
57    Fut: Future<Output = Result<(), Box<dyn Error>>> + Send + 'static,
58{
59    async fn call(self, _args: &ArgStorage, name: String) {
60        let start = Local::now();
61        spawn(async move {
62            handle_result(self().await, &name, start);
63        });
64    }
65}
66
67impl<Func> SyncExecutor<()> for Func
68where
69    Func: Fn() -> Result<(), Box<dyn Error>> + Send + Sync + 'static,
70{
71    fn call(self, _args: &ArgStorage, name: String) {
72        let start = Local::now();
73        rayon::spawn(move || {
74            handle_result(self(), &name, start);
75        });
76    }
77}
78
79/// `Scheduler` mangages all jobs by this trait. When a job is runnable,
80///
81/// the `Schedluler` find recursively the job by name and parse arguments the job need from `args`.
82#[async_trait]
83pub trait JobHandler: Send + Sized + 'static {
84    async fn call(self, args: Arc<ArgStorage>, name: String);
85    fn name(&self) -> String;
86}
87
88/// Implement the trait to parse or get arguments from [`ArgStorage`].
89///
90/// The [`Scheduler`](super::Scheduler) will call `parse_args` and pass arguments to `job` when run job.
91/// # Examples
92///
93/// ```
94/// use rucron::{Scheduler, EmptyTask, execute, ArgStorage, ParseArgs};
95/// use async_trait::async_trait;
96/// use std::error::Error;
97///
98///
99/// #[derive(Clone)]
100/// struct Person {
101///     age: i32,
102/// }
103///
104/// #[async_trait]
105/// impl ParseArgs for Person {
106///     type Err = std::io::Error;
107///     fn parse_args(args: &ArgStorage) -> Result<Self, Self::Err> {
108///         return Ok(args.get::<Person>().unwrap().clone());
109///     }
110/// }
111/// async fn say_age(p: Person) -> Result<(), Box<dyn Error>>  {
112///     println!("I am {} years old", p.age);
113///     Ok(())
114/// }
115///
116/// #[tokio::main]
117/// async fn main(){
118///     let mut sch = Scheduler::<EmptyTask, ()>::new(2, 10);
119///     let mut storage = ArgStorage::new();
120///     storage.insert(Person { age: 7 });
121///     sch.set_arg_storage(storage);
122///     let sch = sch.every(2).second().immediately_run().todo(execute(say_age)).await;
123///     assert!(sch.is_scheduled("say_age"));
124/// }
125/// ```
126pub trait ParseArgs: Sized + Clone {
127    type Err: Error;
128
129    fn parse_args(args: &ArgStorage) -> Result<Self, Self::Err>;
130}
131
132macro_rules! impl_executor {
133    ( $($ty:ident),* $(,)? ) => {
134        #[async_trait]
135        #[allow(non_snake_case)]
136        impl<F, Fut, $($ty,)*> Executor<($($ty,)*)> for F
137        where
138            F: Fn($($ty,)*) -> Fut + Clone + Send + Sync + 'static,
139            Fut: Future<Output = Result<(), Box<dyn Error>>> + Send,
140            $($ty: ParseArgs + Send + 'static,)*
141        {
142            async fn call(self, args:&ArgStorage, name: String) {
143                let start = Local::now();
144                $(
145                    let $ty = match $ty::parse_args(args) {
146                        Ok(value) => value,
147                        Err(e) => {
148                            handle_result(Err(Box::new(e)), &name, start);
149                            return;
150                        },
151                    };
152                )*
153                spawn(async move {
154                    handle_result(self($($ty,)*).await, &name, start);
155                });
156            }
157        }
158    };
159}
160
161macro_rules! impl_sync_executor {
162    ( $($ty:ident),* $(,)? ) => {
163        #[allow(non_snake_case)]
164        impl<F, $($ty,)*> SyncExecutor<($($ty,)*)> for F
165        where
166            F: Fn($($ty,)*) -> Result<(), Box<dyn Error>> + Clone + Send + Sync + 'static,
167            $($ty: ParseArgs + Send + 'static,)*
168        {
169            fn call(self, args:&ArgStorage, name: String) {
170                let start = Local::now();
171                $(
172                    let $ty = match $ty::parse_args(args) {
173                        Ok(value) => value,
174                        Err(e) => {
175                            handle_result(Err(Box::new(e)), &name, start);
176                            return;
177                        },
178                    };
179                )*
180                rayon::spawn(move ||{
181                    handle_result(self($($ty,)*), &name, start);
182                });
183            }
184        }
185    };
186}
187
188impl_executor!(T1);
189impl_executor!(T1, T2);
190impl_executor!(T1, T2, T3);
191impl_executor!(T1, T2, T3, T4);
192impl_executor!(T1, T2, T3, T4, T5);
193impl_executor!(T1, T2, T3, T4, T5, T6);
194impl_executor!(T1, T2, T3, T4, T5, T6, T7);
195impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8);
196impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
197impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
198impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
199impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
200impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
201impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
202impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
203impl_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
204
205impl_sync_executor!(T1);
206impl_sync_executor!(T1, T2);
207impl_sync_executor!(T1, T2, T3);
208impl_sync_executor!(T1, T2, T3, T4);
209impl_sync_executor!(T1, T2, T3, T4, T5);
210impl_sync_executor!(T1, T2, T3, T4, T5, T6);
211impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7);
212impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8);
213impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
214impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
215impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
216impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
217impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
218impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
219impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
220impl_sync_executor!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
221
222/// `Task` stores runtime parameters of a job which name is `name`.
223#[derive(Debug, Clone)]
224pub struct Task<T, TH, L> {
225    pub(crate) name: String,
226    pub(crate) task: T,
227    pub(crate) fallback: TH,
228    pub(crate) locker: Option<L>,
229    pub(crate) need_lock: bool,
230    pub(crate) n_threads: u8,
231}
232
233#[async_trait]
234impl<T, TH, L> JobHandler for Task<T, TH, L>
235where
236    T: JobHandler + Send + Sync + Clone,
237    TH: JobHandler + Send + Sync + 'static,
238    L: Locker + 'static + Send + Sync + Clone,
239{
240    async fn call(self, args: Arc<ArgStorage>, name: String) {
241        if self.name == name {
242            if self.need_lock && self.locker.is_some() {
243                let locker = self.locker.clone();
244                match locker {
245                    Some(locker) => match locker.lock(&name[..], args.clone()) {
246                        Ok(b) if b => {
247                            log::debug!(
248                                "[DEBUG] Spawns a new asynchronous task to run: {}",
249                                &name[..]
250                            );
251                            JobHandler::call(self.task, args.clone(), name.clone()).await;
252                            log::debug!("[DEBUG] Had finished running: {}", &name[..]);
253                            unlock_and_record(locker, &name[..], args);
254                        }
255                        Ok(b) if !b => {
256                            METRIC_STORAGE
257                                .get(&self.name())
258                                .unwrap()
259                                .add_failure(MetricType::Lock);
260                        }
261                        Ok(_) => {
262                            unreachable!("unreachable!")
263                        }
264                        Err(e) => {
265                            log::error!("{}", e);
266                            METRIC_STORAGE
267                                .get(&self.name())
268                                .unwrap()
269                                .add_failure(MetricType::Error);
270                        }
271                    },
272                    _ => {}
273                };
274            } else if !self.need_lock {
275                log::debug!(
276                    "[DEBUG] Spawns a new asynchronous task to run: {}",
277                    &name[..]
278                );
279                for _ in (0..self.n_threads).into_iter() {
280                    let name_copy = name.clone();
281                    let task_copy = self.task.clone();
282                    let args_copy = args.clone();
283                    JobHandler::call(task_copy, args_copy, name_copy.clone()).await;
284                    log::debug!("[DEBUG] Had finished running: {}", name_copy);
285                }
286            };
287        } else {
288            JobHandler::call(self.fallback, args, name).await;
289        }
290    }
291    #[inline(always)]
292    fn name(&self) -> String {
293        self.name.clone()
294    }
295}
296
297/// [`ExecutorWrapper`] wraps the `Executor` and stores it's name.
298#[derive(Clone)]
299pub struct ExecutorWrapper<E, T> {
300    executor: E,
301    executor_name: String,
302    _marker: PhantomData<T>,
303}
304
305/// [`SyncExecutorWrapper`] wraps the `SyncExecutor` and stores it's name.
306#[derive(Clone)]
307pub struct SyncExecutorWrapper<E, T> {
308    executor: E,
309    executor_name: String,
310    _marker: PhantomData<T>,
311}
312
313/// Create a `ExecutorWrapper` and add this job to the `Scheduler`.
314///
315/// - `executor` is the job to be run.
316///
317/// # Panics
318///
319/// Panics if cann't parse name of `E` by `type_name`.
320///
321/// # Examples
322///
323/// ```
324/// use rucron::{execute, Scheduler, EmptyTask};
325/// use std::error::Error;
326/// use std::sync::Arc;
327///
328/// async fn foo() -> Result<(), Box<dyn Error>> {
329///     println!("{}", "foo");
330///     Ok(())
331/// }
332/// #[tokio::main]
333/// async fn main(){
334///     let sch = Scheduler::<EmptyTask, ()>::new(1, 10);
335///     sch.every(2).second().todo(execute(foo)).await;
336/// }
337/// ```
338pub fn execute<E, T>(executor: E) -> ExecutorWrapper<E, T> {
339    let tname = type_name::<E>();
340    let tokens: Vec<&str> = tname.split("::").collect();
341    let name = match (*tokens).get(tokens.len() - 1) {
342        None => panic!("Invalid name: {:?}", tokens),
343        Some(s) => (*s).into(),
344    };
345    ExecutorWrapper {
346        executor,
347        executor_name: name,
348        _marker: PhantomData,
349    }
350}
351
352impl<E, T> From<ExecutorWrapper<E, T>> for SyncExecutorWrapper<E, T> {
353    fn from(executor: ExecutorWrapper<E, T>) -> Self {
354        SyncExecutorWrapper {
355            executor: executor.executor,
356            executor_name: executor.executor_name,
357            _marker: executor._marker,
358        }
359    }
360}
361
362/// Create a `SyncExecutorWrapper` and add this job to the `Scheduler`.
363/// It is similar to `execute`, but the `executor` needs to be implemented `SyncExecutor`.
364/// If you want to run expensive CPU-bound tasks, please utilize this method to add tasks.
365/// In fact, it uses `rayon` to spawn thread to run the tasks.For more details see:
366/// [rayon](https://github.com/rayon-rs/rayon) and [blocking task](https://ryhl.io/blog/async-what-is-blocking).
367///
368/// # Panics
369///
370/// Panics if cann't parse name of `E` by `type_name`.
371///
372/// # Examples
373///
374/// ```
375/// use rucron::{sync_execute, Scheduler, EmptyTask};
376/// use std::{error::Error, sync::Arc, thread::sleep, time::Duration};
377///
378///
379/// fn foo() -> Result<(), Box<dyn Error>> {
380///     sleep(Duration::from_secs(2));
381///     println!("{}", "foo");
382///     Ok(())
383/// }
384/// #[tokio::main]
385/// async fn main(){
386///     let sch = Scheduler::<EmptyTask, ()>::new(1, 10);
387///     sch.every(2).second().todo(sync_execute(foo)).await;
388/// }
389/// ```
390pub fn sync_execute<E, T>(executor: E) -> SyncExecutorWrapper<E, T> {
391    SyncExecutorWrapper::from(execute(executor))
392}
393
394#[async_trait]
395impl<E, T> JobHandler for ExecutorWrapper<E, T>
396where
397    E: Executor<T> + Send + 'static + Sync + Clone,
398    T: Send + 'static + Sync,
399{
400    async fn call(self, args: Arc<ArgStorage>, name: String) {
401        let exe = self.executor.clone();
402        Executor::call(exe, &*args, name).await;
403    }
404
405    #[inline(always)]
406    fn name(&self) -> String {
407        self.executor_name.clone()
408    }
409}
410
411#[async_trait]
412impl<E, T> JobHandler for SyncExecutorWrapper<E, T>
413where
414    E: SyncExecutor<T> + Send + 'static + Sync + Clone,
415    T: Send + 'static + Sync,
416{
417    async fn call(self, args: Arc<ArgStorage>, name: String) {
418        let exe = self.executor.clone();
419        SyncExecutor::call(exe, &*args, name);
420    }
421
422    #[inline(always)]
423    fn name(&self) -> String {
424        self.executor_name.clone()
425    }
426}
427
428/// The storage  stores all the arguments that `jobs` needed.
429///
430/// It uses the extensions to store arguments, see the [docs] for more details.
431///
432/// [docs]: https://docs.rs/http/latest/http/struct.Extensions.html
433///
434#[derive(Debug)]
435pub struct ArgStorage(Extensions);
436
437impl ArgStorage {
438    pub fn new() -> Self {
439        ArgStorage(Extensions::new())
440    }
441}
442
443impl Deref for ArgStorage {
444    type Target = Extensions;
445
446    fn deref(&self) -> &Self::Target {
447        &self.0
448    }
449}
450
451impl DerefMut for ArgStorage {
452    fn deref_mut(&mut self) -> &mut Self::Target {
453        &mut self.0
454    }
455}