rusty_cron_scheduler/
lib.rs1use core::time;
2use std::{collections::HashMap, sync::{Arc, Mutex}, thread};
3use uuid::Uuid;
4use chrono::{prelude::*, Duration, TimeDelta};
5use rusty_cron::Cron;
6
7#[derive(Clone)]
8struct Task {
9 task_id: Uuid,
10 time_to_exec: DateTime<Utc>,
11 cron: String,
12 function_to_exec: fn(),
13 execution_threshold_millis: Option<u64>,
14}
15impl Task{
16 pub fn new(task_id: Uuid, time_to_exec: DateTime<Utc>, cron: String, function_to_exec: fn(), execution_threshold_millis: Option<u64>) -> Task{
17 Task {task_id, time_to_exec, cron, function_to_exec, execution_threshold_millis}
18 }
19}
20
21pub struct SchedulerConfigOptions {
25 scheduler_wait_millis: u64,
26 execution_threshold_millis: u64,
27}
28impl SchedulerConfigOptions {
29 pub fn new(execution_threshold_millis: u64, scheduler_wait_millis: u64) -> SchedulerConfigOptions{
30 SchedulerConfigOptions{execution_threshold_millis,scheduler_wait_millis }
31 }
32}
33
34pub struct Scheduler{
35 tasks: Arc<Mutex<Vec<Task>>>,
36 config: Arc<SchedulerConfigOptions>,
37 is_started: bool
38}
39impl Scheduler{
40 pub fn new(config: Option<SchedulerConfigOptions>) -> Scheduler{
41 let config_options: Arc<SchedulerConfigOptions>;
42 match config{
43 Some(n) => config_options = Arc::new(n),
44 None => config_options = Arc::new(SchedulerConfigOptions::new(250,1000))
45 }
46
47 let scheduler: Scheduler = Scheduler {tasks: Arc::new(Mutex::new(Vec::new())), config: config_options, is_started: false};
48 return scheduler;
49 }
50
51 pub fn startup(&mut self){
52 if !self.is_started {
53 let tasks: Arc<Mutex<Vec<Task>>> = self.tasks.clone();
54 let config: Arc<SchedulerConfigOptions> = self.config.clone();
55 std::thread::spawn(move || {
56 let mut running_tasks: HashMap<Uuid, thread::JoinHandle<()>> = HashMap::new();
57 loop {
58 let now: DateTime<Utc> = chrono::Utc::now();
59
60 let mut guard = (&*tasks).lock().unwrap();
62 for task in &mut *guard {
63 if running_tasks.contains_key(&task.task_id) && running_tasks.get(&task.task_id).unwrap().is_finished() {
65 running_tasks.remove(&task.task_id);
66 let cloned_now = now.clone();
67 let old_time_exec = task.time_to_exec.checked_add_signed(TimeDelta::seconds(1));
69 let time = cloned_now.checked_add_signed(Duration::milliseconds(Cron::parse_time(&task.cron, old_time_exec).unwrap())).unwrap();
70 task.time_to_exec = time;
71 }
72
73 let task_threshold: u64;
74
75 match task.execution_threshold_millis {
76 Some(n) => task_threshold = n,
77 None => task_threshold = config.execution_threshold_millis
78 }
79
80 if !running_tasks.contains_key(&task.task_id) &&
84 ((task.time_to_exec - now).num_milliseconds() < task_threshold.try_into().unwrap())
85 {
86 let task_to_run = task.clone();
87 let join_handle = thread::spawn(move || {
88 (task_to_run.function_to_exec)();
89 });
90
91 running_tasks.insert(task.task_id, join_handle);
92 }
93 }
94
95 thread::sleep(time::Duration::from_millis(config.scheduler_wait_millis));
96 }
97 });
98
99 self.is_started = true;
100 }
101 }
102
103 pub fn add_task(&mut self, cron: &str, function_to_exec: fn(), execution_threshold_millis: Option<u64>) -> Result<Uuid,String>{
104
105 let time_result = Cron::parse_time(cron, None);
106
107 let time_to_exec: i64;
108 match time_result {
109 Ok(n) => time_to_exec = n,
110 Err(e) => return Err(e)
111 }
112 let uuid = Uuid::new_v4();
113
114 let now: DateTime<Utc> = chrono::Utc::now();
115 let time_to_exec = now.checked_add_signed(Duration::milliseconds(time_to_exec)).unwrap();
116 let guard = &mut *self.tasks.lock().unwrap();
117 guard.push(Task::new(uuid, time_to_exec, cron.to_owned(), function_to_exec, execution_threshold_millis));
118
119 return Ok(uuid);
120 }
121
122 pub fn remove_task(&mut self, task_id: Uuid){
123 let guard = &mut *self.tasks.lock().unwrap();
124 guard.retain(|task| task.task_id != task_id);
125 }
126}