croner_scheduler/
lib.rs

1pub mod threadpool;
2
3use chrono::{DateTime, TimeZone, Timelike, Utc};
4
5use croner::Cron;
6
7use crate::threadpool::ThreadPool;
8
9use std::sync::{
10    atomic::{AtomicUsize, Ordering},
11    Arc, Mutex,
12};
13
/// Worker-thread count used for a scheduler's pool when the caller does not
/// pick a size explicitly.
const DEFAULT_THREADPOOL_SIZE: usize = 10;
15
16pub struct CronScheduler<C, F>
17where
18    C: Clone + Send + Sync + 'static,
19    F: FnMut(Option<&C>) + Send + 'static,
20{
21    cron: Cron,
22    task: ScheduledTask,
23    context: Option<Arc<Mutex<ScheduledTaskContext<C>>>>,
24    thread_pool: ThreadPool,
25    callback: Option<F>,
26}
27
/// Lifecycle state of a scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SchedulerState {
    /// Ticks are evaluated and may trigger the task.
    Running,
    /// Temporarily suspended; expected to be resumed later.
    Paused,
    /// Not started (the initial state) or finished.
    Stopped,
}
34
/// Outcome of a single scheduler tick.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerResult {
    /// Indicates that the scheduler/main loop should stop when used in a
    /// tick-driven loop.
    Dead = 0,

    /// Nothing happened on this tick; keep the scheduler/main loop alive.
    NoOp = 1,

    /// The task was handed to the thread pool on this tick.
    TaskTriggered,

    /// Reserved for reporting a saturated thread pool (the default pool size
    /// is 10).
    // ToDo: Exhaustion check not implemented
    ThreadPoolExhausted,
}
48
49struct ScheduledTask {
50    scheduler_state: SchedulerState,
51    last_start: Option<DateTime<Utc>>,
52    max_executions: Option<usize>,
53    _executions: usize,
54    shared_state: Arc<Mutex<SharedTaskState>>,
55}
56
57struct SharedTaskState {
58    last_finish: Option<DateTime<Utc>>,
59    active_task_count: AtomicUsize,
60}
61
/// Holder for the optional user-supplied context value of type `C`.
///
/// The value is kept behind an `Option` so a holder can exist without any
/// context attached.
pub struct ScheduledTaskContext<C: Send + Sync + 'static> {
    /// The user context, if one has been provided.
    context: Option<C>,
}
68
69impl<C, F> CronScheduler<C, F>
70where
71    C: Clone + Send + Sync + 'static, // Added Clone here
72    F: FnMut(Option<&C>) + Clone + Send + 'static,
73{
74    pub fn new(cron: Cron) -> Self {
75        CronScheduler {
76            cron,
77            task: ScheduledTask {
78                scheduler_state: SchedulerState::Stopped,
79                last_start: None,
80                max_executions: None,
81                _executions: 0,
82                shared_state: Arc::new(Mutex::new(SharedTaskState {
83                    active_task_count: AtomicUsize::new(0),
84                    last_finish: None,
85                })),
86            },
87            context: None,
88            thread_pool: ThreadPool::new(DEFAULT_THREADPOOL_SIZE), // By default, allow 10 concurrent tasks per scheduler
89            callback: None,
90        }
91    }
92
93    // Will return false if there is no further work to be done
94    pub fn tick<Tz: TimeZone>(&mut self, now: DateTime<Tz>) -> SchedulerResult {
95        // Check if the scheduler is stopped
96        if self.task.scheduler_state == SchedulerState::Stopped {
97            return SchedulerResult::Dead;
98        }
99
100        // Check if the scheduler is busy
101        if self.is_busy() && self.thread_pool.max_count() == 1 {
102            // Skip this run
103            return SchedulerResult::NoOp;
104        }
105
106        // Check if we are past the expected run time
107        if let Some(last_run) = self.task.last_start {
108            let last_run_tz = last_run.with_timezone(&now.timezone());
109            if now.with_nanosecond(0) <= last_run_tz.with_nanosecond(0) {
110                return SchedulerResult::NoOp; // Already run in this second
111            }
112        }
113
114        // Check if it is time to run
115        if let Some(next_run_time) = self.next_run_from(now.clone()) {
116            if now < next_run_time {
117                return SchedulerResult::NoOp; // Not time to trigger yet
118            }
119        } else {
120            return SchedulerResult::Dead; // If there's no next run time, don't proceed
121        }
122
123        let thread_pool = &self.thread_pool;
124        let shared_state_clone = self.task.shared_state.clone();
125        let callback_clone = self.callback.clone();
126        let context_clone = self.context.clone(); // Clone the optional context
127
128        self.task.last_start = Some(Utc::now());
129
130        thread_pool.execute(move || {
131            if let Some(mut callback) = callback_clone {
132                // Temporarily unlock the mutex to increment running tasks
133                {
134                    shared_state_clone
135                        .lock()
136                        .unwrap()
137                        .active_task_count
138                        .fetch_add(1, Ordering::SeqCst);
139                }
140
141                // Clone the context data if it exists, and pass it as an owned value
142                let context_clone =
143                    context_clone.and_then(|context| context.lock().unwrap().context.clone());
144
145                // Pass the cloned context (now owned) to the callback
146                callback(context_clone.as_ref());
147
148                // Update task state
149                let mut state = shared_state_clone.lock().unwrap();
150                state.last_finish = Some(Utc::now());
151                state.active_task_count.fetch_sub(1, Ordering::SeqCst);
152            }
153        });
154
155        SchedulerResult::TaskTriggered
156    }
157
158    pub fn start(&mut self, callback: F) {
159        self.task.scheduler_state = SchedulerState::Running;
160        self.callback = Some(callback); // Wrap in Arc and Mutex
161    }
162
163    pub fn pause(&mut self) {
164        self.task.scheduler_state = SchedulerState::Paused;
165    }
166
167    pub fn with_context(&mut self, context: C) {
168        self.context = Some(Arc::new(Mutex::new(ScheduledTaskContext {
169            context: Some(context),
170        })));
171    }
172
173    pub fn resume(&mut self) {
174        self.task.scheduler_state = SchedulerState::Running;
175    }
176
177    pub fn with_max_executions(mut self, max: usize) -> Self {
178        self.task.max_executions = Some(max);
179        self
180    }
181
182    pub fn with_threadpool_size(mut self, size: usize) -> Self {
183        self.thread_pool = ThreadPool::new(size);
184        self
185    }
186
187    pub fn next_run_from<Tz: TimeZone>(&self, from: DateTime<Tz>) -> Option<DateTime<Tz>> {
188        self.cron.find_next_occurrence(&from, true).ok()
189    }
190
191    pub fn next_run_after<Tz: TimeZone>(&self, after: DateTime<Tz>) -> Option<DateTime<Tz>> {
192        self.cron.find_next_occurrence(&after, false).ok()
193    }
194
195    pub fn next_runs<Tz: TimeZone>(&self, after: DateTime<Tz>, count: usize) -> Vec<DateTime<Tz>> {
196        let mut runs = Vec::new();
197        for next_time in self.cron.iter_after(after).take(count) {
198            runs.push(next_time.clone());
199        }
200        runs
201    }
202
203    // Returns previous runtime, or current run time if a task is running
204    pub fn previous_or_current_run<Tz: TimeZone>(&self, timezone: Tz) -> Option<DateTime<Tz>> {
205        self.task.last_start.map(|dt| dt.with_timezone(&timezone))
206    }
207
208    pub fn is_running(&self) -> bool {
209        matches!(self.task.scheduler_state, SchedulerState::Running)
210    }
211
212    pub fn is_stopped(&self) -> bool {
213        matches!(self.task.scheduler_state, SchedulerState::Stopped)
214    }
215
216    pub fn is_busy(&self) -> bool {
217        self.task
218            .shared_state
219            .lock()
220            .unwrap()
221            .active_task_count
222            .load(Ordering::SeqCst)
223            > 0
224    }
225}