1pub mod threadpool;
2
3use chrono::{DateTime, TimeZone, Timelike, Utc};
4
5use croner::Cron;
6
7use crate::threadpool::ThreadPool;
8
9use std::sync::{
10 atomic::{AtomicUsize, Ordering},
11 Arc, Mutex,
12};
13
/// Number of worker threads given to the thread pool created by `CronScheduler::new`.
const DEFAULT_THREADPOOL_SIZE: usize = 10;
15
/// A scheduler that runs a user callback on a thread pool according to a cron pattern.
///
/// `C` is the type of the optional context value handed to the callback by reference;
/// `F` is the callback type.
pub struct CronScheduler<C, F>
where
    C: Clone + Send + Sync + 'static,
    F: FnMut(Option<&C>) + Send + 'static,
{
    /// Cron pattern used to compute run times.
    cron: Cron,
    /// Scheduler state and run bookkeeping.
    task: ScheduledTask,
    /// Optional context; its inner value is cloned for each callback invocation.
    context: Option<Arc<Mutex<ScheduledTaskContext<C>>>>,
    /// Pool on which callbacks are executed.
    thread_pool: ThreadPool,
    /// Callback installed by `start`; `None` until then.
    callback: Option<F>,
}
27
/// Lifecycle state of a scheduler.
#[derive(PartialEq)]
enum SchedulerState {
    /// Set by `start` and `resume`.
    Running,
    /// Set by `pause`.
    Paused,
    /// Initial state of a scheduler created with `new`.
    Stopped,
}
34
/// Outcome of a single scheduler tick.
///
/// The explicit discriminants (`Dead = 0`, `NoOp = 1`) are kept so numeric casts stay stable;
/// the remaining variants continue the sequence (`2`, `3`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerResult {
    /// The scheduler is stopped, or the cron pattern has no further occurrence.
    Dead = 0,

    /// The tick did not dispatch anything.
    NoOp = 1,

    /// The callback was handed to the thread pool for execution.
    TaskTriggered,

    /// Intended to signal that the thread pool cannot accept more work.
    /// NOTE(review): no code path visible in this file returns this variant — confirm usage.
    ThreadPoolExhausted,
}
48
/// Bookkeeping owned by the scheduler for its single scheduled task.
struct ScheduledTask {
    /// Current lifecycle state.
    scheduler_state: SchedulerState,
    /// Time recorded by `tick` when the task was last dispatched; `None` before the first run.
    last_start: Option<DateTime<Utc>>,
    /// Optional execution cap configured through `with_max_executions`.
    max_executions: Option<usize>,
    /// Execution counter; initialised to 0 by `new`.
    _executions: usize,
    /// State shared with the worker closures running on the thread pool.
    shared_state: Arc<Mutex<SharedTaskState>>,
}
56
/// State shared between the scheduler and the worker closures it dispatches.
struct SharedTaskState {
    /// Time at which the most recent callback invocation returned.
    last_finish: Option<DateTime<Utc>>,
    /// Number of callback invocations currently in progress; read by `is_busy`.
    active_task_count: AtomicUsize,
}
61
/// Holder for the optional user context that is passed to the callback.
pub struct ScheduledTaskContext<C>
where
    C: Send + Sync + 'static,
{
    /// The user-supplied value; cloned before each callback invocation.
    context: Option<C>,
}
68
69impl<C, F> CronScheduler<C, F>
70where
71 C: Clone + Send + Sync + 'static, F: FnMut(Option<&C>) + Clone + Send + 'static,
73{
74 pub fn new(cron: Cron) -> Self {
75 CronScheduler {
76 cron,
77 task: ScheduledTask {
78 scheduler_state: SchedulerState::Stopped,
79 last_start: None,
80 max_executions: None,
81 _executions: 0,
82 shared_state: Arc::new(Mutex::new(SharedTaskState {
83 active_task_count: AtomicUsize::new(0),
84 last_finish: None,
85 })),
86 },
87 context: None,
88 thread_pool: ThreadPool::new(DEFAULT_THREADPOOL_SIZE), callback: None,
90 }
91 }
92
93 pub fn tick<Tz: TimeZone>(&mut self, now: DateTime<Tz>) -> SchedulerResult {
95 if self.task.scheduler_state == SchedulerState::Stopped {
97 return SchedulerResult::Dead;
98 }
99
100 if self.is_busy() && self.thread_pool.max_count() == 1 {
102 return SchedulerResult::NoOp;
104 }
105
106 if let Some(last_run) = self.task.last_start {
108 let last_run_tz = last_run.with_timezone(&now.timezone());
109 if now.with_nanosecond(0) <= last_run_tz.with_nanosecond(0) {
110 return SchedulerResult::NoOp; }
112 }
113
114 if let Some(next_run_time) = self.next_run_from(now.clone()) {
116 if now < next_run_time {
117 return SchedulerResult::NoOp; }
119 } else {
120 return SchedulerResult::Dead; }
122
123 let thread_pool = &self.thread_pool;
124 let shared_state_clone = self.task.shared_state.clone();
125 let callback_clone = self.callback.clone();
126 let context_clone = self.context.clone(); self.task.last_start = Some(Utc::now());
129
130 thread_pool.execute(move || {
131 if let Some(mut callback) = callback_clone {
132 {
134 shared_state_clone
135 .lock()
136 .unwrap()
137 .active_task_count
138 .fetch_add(1, Ordering::SeqCst);
139 }
140
141 let context_clone =
143 context_clone.and_then(|context| context.lock().unwrap().context.clone());
144
145 callback(context_clone.as_ref());
147
148 let mut state = shared_state_clone.lock().unwrap();
150 state.last_finish = Some(Utc::now());
151 state.active_task_count.fetch_sub(1, Ordering::SeqCst);
152 }
153 });
154
155 SchedulerResult::TaskTriggered
156 }
157
158 pub fn start(&mut self, callback: F) {
159 self.task.scheduler_state = SchedulerState::Running;
160 self.callback = Some(callback); }
162
163 pub fn pause(&mut self) {
164 self.task.scheduler_state = SchedulerState::Paused;
165 }
166
167 pub fn with_context(&mut self, context: C) {
168 self.context = Some(Arc::new(Mutex::new(ScheduledTaskContext {
169 context: Some(context),
170 })));
171 }
172
173 pub fn resume(&mut self) {
174 self.task.scheduler_state = SchedulerState::Running;
175 }
176
177 pub fn with_max_executions(mut self, max: usize) -> Self {
178 self.task.max_executions = Some(max);
179 self
180 }
181
182 pub fn with_threadpool_size(mut self, size: usize) -> Self {
183 self.thread_pool = ThreadPool::new(size);
184 self
185 }
186
187 pub fn next_run_from<Tz: TimeZone>(&self, from: DateTime<Tz>) -> Option<DateTime<Tz>> {
188 self.cron.find_next_occurrence(&from, true).ok()
189 }
190
191 pub fn next_run_after<Tz: TimeZone>(&self, after: DateTime<Tz>) -> Option<DateTime<Tz>> {
192 self.cron.find_next_occurrence(&after, false).ok()
193 }
194
195 pub fn next_runs<Tz: TimeZone>(&self, after: DateTime<Tz>, count: usize) -> Vec<DateTime<Tz>> {
196 let mut runs = Vec::new();
197 for next_time in self.cron.iter_after(after).take(count) {
198 runs.push(next_time.clone());
199 }
200 runs
201 }
202
203 pub fn previous_or_current_run<Tz: TimeZone>(&self, timezone: Tz) -> Option<DateTime<Tz>> {
205 self.task.last_start.map(|dt| dt.with_timezone(&timezone))
206 }
207
208 pub fn is_running(&self) -> bool {
209 matches!(self.task.scheduler_state, SchedulerState::Running)
210 }
211
212 pub fn is_stopped(&self) -> bool {
213 matches!(self.task.scheduler_state, SchedulerState::Stopped)
214 }
215
216 pub fn is_busy(&self) -> bool {
217 self.task
218 .shared_state
219 .lock()
220 .unwrap()
221 .active_task_count
222 .load(Ordering::SeqCst)
223 > 0
224 }
225}