rusty_cron_scheduler/
lib.rs

1use core::time;
2use std::{collections::HashMap, sync::{Arc, Mutex}, thread};
3use uuid::Uuid;
4use chrono::{prelude::*, Duration, TimeDelta};
5use rusty_cron::Cron;
6
7#[derive(Clone)]
8struct Task {
9    task_id: Uuid,
10    time_to_exec: DateTime<Utc>,
11    cron: String,
12    function_to_exec: fn(),
13    execution_threshold_millis: Option<u64>,
14}
15impl Task{
16    pub fn new(task_id: Uuid, time_to_exec: DateTime<Utc>, cron: String, function_to_exec: fn(), execution_threshold_millis: Option<u64>) -> Task{
17        Task {task_id,  time_to_exec, cron, function_to_exec, execution_threshold_millis}
18    }
19}
20
21// This struct allows for the configuration of the scheduler,
22// it's optional to the creation of a scheduler and will
23// use threshold 250 and wait 1000 by default
24pub struct SchedulerConfigOptions {
25    scheduler_wait_millis: u64,
26    execution_threshold_millis: u64,
27}
28impl SchedulerConfigOptions {
29    pub fn new(execution_threshold_millis: u64, scheduler_wait_millis: u64) -> SchedulerConfigOptions{
30        SchedulerConfigOptions{execution_threshold_millis,scheduler_wait_millis }
31    }
32}
33
34pub struct Scheduler{
35    tasks: Arc<Mutex<Vec<Task>>>,
36    config: Arc<SchedulerConfigOptions>,
37    is_started: bool
38}
39impl Scheduler{
40    pub fn new(config: Option<SchedulerConfigOptions>) -> Scheduler{
41        let config_options: Arc<SchedulerConfigOptions>;
42        match config{
43            Some(n) => config_options = Arc::new(n),
44            None => config_options = Arc::new(SchedulerConfigOptions::new(250,1000))
45        }
46        
47        let scheduler: Scheduler = Scheduler {tasks: Arc::new(Mutex::new(Vec::new())), config: config_options, is_started: false};
48        return scheduler;
49    }
50
51    pub fn startup(&mut self){
52        if !self.is_started {
53            let tasks: Arc<Mutex<Vec<Task>>> = self.tasks.clone();
54            let config: Arc<SchedulerConfigOptions> = self.config.clone();
55            std::thread::spawn(move || {
56                let mut running_tasks: HashMap<Uuid, thread::JoinHandle<()>> = HashMap::new();
57                loop {
58                    let now: DateTime<Utc> = chrono::Utc::now();
59
60                    // To avoid concurrency problems we lock the mutex that protects the tasks array
61                    let mut guard = (&*tasks).lock().unwrap();
62                    for task in &mut *guard {
63                        // If current task is on running map and has finished, clear from map and get next exec
64                        if running_tasks.contains_key(&task.task_id) && running_tasks.get(&task.task_id).unwrap().is_finished() {
65                            running_tasks.remove(&task.task_id);
66                            let cloned_now = now.clone();
67                            //Old time to exec +1 second to avoid executing the same time frame, instead cron will return next execution
68                            let old_time_exec = task.time_to_exec.checked_add_signed(TimeDelta::seconds(1)); 
69                            let time = cloned_now.checked_add_signed(Duration::milliseconds(Cron::parse_time(&task.cron, old_time_exec).unwrap())).unwrap();
70                            task.time_to_exec = time;
71                        }
72                        
73                        let task_threshold: u64;
74
75                        match task.execution_threshold_millis {
76                            Some(n) => task_threshold = n,
77                            None => task_threshold = config.execution_threshold_millis
78                        }
79
80                        // If either execution should've happened in the past or execution is in the next few millis
81                        // execute function of task
82                        // Not "else if" because when the task has ended we want to know if it needs to execute now or within threshold
83                        if !running_tasks.contains_key(&task.task_id) && 
84                            ((task.time_to_exec - now).num_milliseconds() < task_threshold.try_into().unwrap())  
85                        {
86                            let task_to_run = task.clone();
87                            let join_handle = thread::spawn(move || {
88                                (task_to_run.function_to_exec)();
89                            });
90
91                            running_tasks.insert(task.task_id, join_handle);
92                        }
93                    }
94
95                    thread::sleep(time::Duration::from_millis(config.scheduler_wait_millis));
96                }
97            });
98
99            self.is_started = true;
100        }
101    }
102
103    pub fn add_task(&mut self, cron: &str, function_to_exec: fn(), execution_threshold_millis: Option<u64>) -> Result<Uuid,String>{
104
105        let time_result = Cron::parse_time(cron, None);
106
107        let time_to_exec: i64;
108        match time_result {
109            Ok(n) => time_to_exec = n,
110            Err(e) => return Err(e)
111        }
112        let uuid = Uuid::new_v4();
113
114        let now: DateTime<Utc> = chrono::Utc::now();
115        let time_to_exec = now.checked_add_signed(Duration::milliseconds(time_to_exec)).unwrap();
116        let guard = &mut *self.tasks.lock().unwrap();
117        guard.push(Task::new(uuid, time_to_exec, cron.to_owned(), function_to_exec, execution_threshold_millis));
118
119        return Ok(uuid);
120    }
121
122    pub fn remove_task(&mut self, task_id: Uuid){
123        let guard = &mut *self.tasks.lock().unwrap();
124        guard.retain(|task| task.task_id != task_id);
125    }
126}