lightspeed_scheduler/
lib.rs

1use crate::error::SchedulerError;
2use crate::job::{Job, JobScheduler};
3use crate::scheduler::{Scheduler, TryToScheduler};
4use arc_swap::ArcSwap;
5use chrono::Utc;
6use chrono_tz::{Tz, UTC};
7use log::*;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12use tokio::task::JoinHandle;
13
14pub mod error;
15pub mod job;
16pub mod scheduler;
17
/// Cloneable handle to a job executor.
///
/// The handle only wraps an [`Arc`] around the shared executor state, so every
/// clone operates on the same job list and the same running flag.
#[derive(Clone)]
pub struct JobExecutor {
    // Shared state; cloning the handle clones this `Arc`, not the state itself.
    executor: Arc<JobExecutorInternal>,
}
22
/// State shared by every [`JobExecutor`] handle.
struct JobExecutorInternal {
    /// Pause between two consecutive checks for pending jobs; can be swapped at runtime.
    sleep_between_checks: ArcSwap<Duration>,
    /// Set to `true` by `run` and back to `false` by `stop`; the polling loop exits once it reads `false`.
    running: AtomicBool,
    /// Time zone passed to each `JobScheduler` when a job is added.
    /// `None` is the value used by `JobExecutor::new_with_local_tz`.
    timezone: Option<Tz>,
    /// Registered jobs, each wrapped together with its schedule.
    jobs: RwLock<Vec<Arc<JobScheduler>>>,
}
29
30impl JobExecutorInternal {
31    /*
32        /// Returns true if the JobExecutor contains no jobs.
33        pub async fn is_empty(&self) -> bool {
34            let jobs = self.jobs.read().await;
35            jobs.is_empty()
36        }
37
38        /// Returns the number of jobs in the JobExecutor.
39        pub async fn len(&self) -> usize {
40            let jobs = self.jobs.read().await;
41            jobs.len()
42        }
43
44        /// Clear the JobExecutor, removing all jobs.
45        pub async fn clear(&mut self) {
46            let mut jobs = self.jobs.write().await;
47            jobs.clear()
48        }
49
50        /// Returns true if there is at least one job pending.
51        pub async fn is_pending_job(&self) -> bool {
52            let jobs = self.jobs.read().await;
53            for job_scheduler in jobs.iter() {
54                if job_scheduler.is_pending().await {
55                    return true;
56                }
57            }
58            false
59        }
60    */
61    /// Returns true if the Job Executor is running
62    fn is_running(&self) -> bool {
63        self.running.load(Ordering::SeqCst)
64    }
65
66    /// Returns true if there is at least one job running.
67    async fn is_running_job(&self) -> bool {
68        let jobs = self.jobs.read().await;
69        for job_scheduler in jobs.iter() {
70            if job_scheduler.job.is_running().await {
71                return true;
72            }
73        }
74        false
75    }
76
77    /// Run pending jobs in the JobExecutor.
78    async fn run_pending_jobs(&self) {
79        trace!("Check pending jobs");
80        let jobs = self.jobs.read().await;
81        for job_scheduler in jobs.iter() {
82            //println!("check JOB IS PENDING: {}", job.is_pending());
83            if job_scheduler.is_pending().await {
84                //println!("JOB IS RUNNING? {}", is_running);
85                if !job_scheduler.job.is_running().await {
86                    let job_clone = job_scheduler.clone();
87
88                    let timestamp = Utc::now().timestamp();
89                    let group = job_clone.job.group().to_owned();
90                    let name = job_clone.job.name().to_owned();
91
92                    let fut = instrument(timestamp, group.clone(), name.clone(), async move {
93                        info!("Start execution of Job [{}/{}]", group, name);
94                        let start = std::time::Instant::now();
95                        let result = job_clone.run().await;
96
97                        let duration = start.elapsed();
98
99                        let mills = duration.subsec_millis();
100                        let duration_secs = duration.as_secs();
101                        let seconds = duration_secs % 60;
102                        let minutes = (duration_secs / 60) % 60;
103                        let hours = (duration_secs / 60) / 60;
104                        let duration_fmt = format!(
105                            "{hours:02} hour(s), {minutes:02} minute(s), {seconds:02} second(s) and {mills:03} millis"
106                        );
107
108                        match result {
109                            Ok(()) => {
110                                info!(
111                                    "Execution of Job [{}/{}] completed successfully in {}",
112                                    group, name, duration_fmt
113                                );
114                            }
115                            Err(err) => {
116                                error!(
117                                    "Execution of Job [{}/{}] completed with errors in {}. Err: {:?}",
118                                    group, name, duration_fmt, err
119                                );
120                            }
121                        }
122                    });
123
124                    tokio::spawn(fut);
125                } else {
126                    debug!(
127                        "Job [{}/{}] is pending but already running. It will not be executed.",
128                        job_scheduler.job.group(),
129                        job_scheduler.job.name()
130                    )
131                }
132            }
133        }
134    }
135
136    /// Adds a job to the JobExecutor.
137    async fn add_job_with_scheduler<S: Into<Scheduler>>(&self, schedule: S, job: Job) {
138        info!("Add job to scheduler. Group [{}] - Name [{}]", job.group(), job.name());
139        let mut jobs = self.jobs.write().await;
140        jobs.push(Arc::new(JobScheduler::new(schedule.into(), self.timezone, job)));
141    }
142}
143
144impl JobExecutor {
145    /// Creates a new Executor that uses the Local time zone for the execution times evaluation.
146    /// For example, the cron expressions will refer to the Local time zone.
147    pub fn new_with_local_tz() -> JobExecutor {
148        Self::new_with_tz(None)
149    }
150
151    /// Creates a new Executor that uses the UTC time zone for the execution times evaluation.
152    /// For example, the cron expressions will refer to the UTC time zone.
153    pub fn new_with_utc_tz() -> JobExecutor {
154        Self::new_with_tz(Some(UTC))
155    }
156
157    /// Creates a new Executor that uses a custom time zone for the execution times evaluation.
158    /// For example, the cron expressions will refer to the specified time zone.
159    pub fn new_with_tz(timezone: Option<Tz>) -> JobExecutor {
160        JobExecutor {
161            executor: Arc::new(JobExecutorInternal {
162                sleep_between_checks: ArcSwap::new(Arc::new(Duration::new(1, 0))),
163                running: AtomicBool::new(false),
164                timezone,
165                jobs: RwLock::new(vec![]),
166            }),
167        }
168    }
169
170    /// Adds a job to the JobExecutor.
171    pub async fn add_job(&self, schedule: &dyn TryToScheduler, job: Job) -> Result<(), SchedulerError> {
172        self.add_job_with_scheduler(schedule.to_scheduler()?, job).await;
173        Ok(())
174    }
175
176    /// Adds a job to the JobExecutor.
177    pub async fn add_job_with_multi_schedule(
178        &self,
179        schedule: &[&dyn TryToScheduler],
180        job: Job,
181    ) -> Result<(), SchedulerError> {
182        self.add_job_with_scheduler(schedule.to_scheduler()?, job).await;
183        Ok(())
184    }
185
186    /// Adds a job to the JobExecutor.
187    pub async fn add_job_with_scheduler<S: Into<Scheduler>>(&self, schedule: S, job: Job) {
188        self.executor.add_job_with_scheduler(schedule, job).await
189    }
190
191    /// Starts the JobExecutor
192    pub async fn run(&self) -> Result<JoinHandle<()>, SchedulerError> {
193        let was_running = self.executor.running.swap(true, Ordering::SeqCst);
194        if !was_running {
195            let executor = self.executor.clone();
196            Ok(tokio::spawn(async move {
197                info!("Starting the job executor");
198                while executor.is_running() {
199                    executor.run_pending_jobs().await;
200                    tokio::time::sleep(*executor.sleep_between_checks.load().as_ref()).await;
201                }
202                info!("Job executor stopped");
203            }))
204        } else {
205            warn!("The JobExecutor is already running.");
206            Err(SchedulerError::JobExecutionStateError { message: "The JobExecutor is already running.".to_owned() })
207        }
208    }
209
210    /// Stops the JobExecutor
211    pub async fn stop(&self, graceful: bool) -> Result<(), SchedulerError> {
212        let was_running = self.executor.running.swap(false, Ordering::SeqCst);
213        if was_running {
214            info!("Stopping the job executor");
215            if graceful {
216                info!("Wait for all Jobs to complete");
217                while self.executor.is_running_job().await {
218                    tokio::time::sleep(*self.executor.sleep_between_checks.load().as_ref()).await;
219                }
220                info!("All Jobs completed");
221            }
222            Ok(())
223        } else {
224            warn!("The JobExecutor is not running.");
225            Err(SchedulerError::JobExecutionStateError { message: "The JobExecutor is not running.".to_owned() })
226        }
227    }
228
229    /// Sets the sleep time between checks for pending Jobs.
230    /// The default is 1 second.
231    pub fn set_sleep_between_checks(&self, sleep: Duration) {
232        self.executor.sleep_between_checks.store(Arc::new(sleep));
233    }
234}
235
/// Attaches a `run_pending` tracing span to `fut`.
///
/// The span is created at error level and records the job's `group`, `name`
/// and the `timestamp` at which the execution was triggered.
#[cfg(feature = "tracing")]
fn instrument<F: std::future::Future<Output = ()>>(
    timestamp: i64,
    group: String,
    name: String,
    fut: F,
) -> impl std::future::Future<Output = ()> {
    use tracing_futures::Instrument;
    fut.instrument(tracing::error_span!("run_pending", group, name, timestamp))
}
247
/// Fallback used when the `tracing` feature is disabled.
///
/// Has the same signature as the tracing-enabled variant so call sites do not
/// need any conditional compilation; the metadata arguments are ignored and
/// `fut` is handed back untouched.
#[cfg(not(feature = "tracing"))]
fn instrument<F: std::future::Future<Output = ()>>(
    _timestamp: i64,
    _group: String,
    _name: String,
    fut: F,
) -> impl std::future::Future<Output = ()> {
    fut
}
257
#[cfg(test)]
pub mod test {

    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};
    use tokio::sync::mpsc::{channel, Sender};

    /// Builds a job `g/n` that, on every execution, signals `started`, prints
    /// `"<label> - started"`, increments `counter` and then sleeps for one second.
    ///
    /// The one second sleep keeps the job "running" long enough for the tests to
    /// observe that the executor neither re-runs it nor blocks on it.
    fn slow_counting_job(label: &'static str, counter: Arc<AtomicUsize>, started: Sender<&'static str>) -> Job {
        Job::new("g", "n", None, move || {
            let counter = counter.clone();
            let started = started.clone();
            Box::pin(async move {
                started.send("").await.unwrap();
                println!("{label} - started");
                counter.fetch_add(1, Ordering::SeqCst);
                tokio::time::sleep(Duration::new(1, 0)).await;
                Ok(())
            })
        })
    }

    /// A job that is still running must not be started a second time, even if it is pending again.
    #[tokio::test]
    async fn should_not_run_an_already_running_job() {
        let executor = JobExecutor::new_with_utc_tz();

        let count = Arc::new(AtomicUsize::new(0));
        let (tx, mut rx) = channel(1000);

        executor.add_job(&Duration::new(0, 1), slow_counting_job("job", count.clone(), tx)).await.unwrap();

        for i in 0..100 {
            println!("run_pending {i}");
            executor.executor.run_pending_jobs().await;
            tokio::time::sleep(Duration::new(0, 2)).await;
        }

        println!("run_pending completed");
        rx.recv().await.unwrap();

        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    /// Long-running jobs are spawned, so checking for pending jobs must return quickly.
    #[tokio::test]
    async fn a_running_job_should_not_block_the_executor() {
        let executor = JobExecutor::new_with_local_tz();

        let (tx, mut rx) = channel(959898);

        let count_1 = Arc::new(AtomicUsize::new(0));
        executor
            .add_job_with_multi_schedule(
                &[&Duration::new(0, 1)],
                slow_counting_job("job 1", count_1.clone(), tx.clone()),
            )
            .await
            .unwrap();

        let count_2 = Arc::new(AtomicUsize::new(0));
        executor
            .add_job(&Duration::new(0, 1), slow_counting_job("job 2", count_2.clone(), tx.clone()))
            .await
            .unwrap();

        let count_3 = Arc::new(AtomicUsize::new(0));
        executor
            .add_job(&Duration::new(0, 1), slow_counting_job("job 3", count_3.clone(), tx.clone()))
            .await
            .unwrap();

        // Monotonic clock: unaffected by wall-clock adjustments while the loop runs.
        let started_at = Instant::now();
        for i in 0..100 {
            println!("run_pending {i}");
            executor.executor.run_pending_jobs().await;
            tokio::time::sleep(Duration::new(0, 1_000_000)).await;
        }
        let elapsed = started_at.elapsed();

        // 100 iterations of a 1ms sleep take at least 100ms; staying under one second
        // shows the loop never waited for the one second jobs.
        assert!(elapsed >= Duration::from_millis(100));
        assert!(elapsed < Duration::from_millis(1000));

        rx.recv().await.unwrap();

        assert_eq!(count_1.load(Ordering::SeqCst), 1);
        assert_eq!(count_2.load(Ordering::SeqCst), 1);
        assert_eq!(count_3.load(Ordering::SeqCst), 1);
    }

    /// A graceful stop must wait until every started job has completed.
    #[tokio::test]
    async fn should_gracefully_shutdown_the_job_executor() {
        let executor = JobExecutor::new_with_utc_tz();

        let count = Arc::new(AtomicUsize::new(0));

        let tasks = 100;

        for _i in 0..tasks {
            let count_clone = count.clone();
            executor
                .add_job(
                    &Duration::new(0, 1),
                    Job::new("g", "n", None, move || {
                        let count_clone = count_clone.clone();
                        Box::pin(async move {
                            tokio::time::sleep(Duration::new(1, 0)).await;
                            println!("job - started");
                            count_clone.fetch_add(1, Ordering::SeqCst);
                            Ok(())
                        })
                    }),
                )
                .await
                .unwrap();
        }

        executor.set_sleep_between_checks(Duration::from_millis(10));

        executor.run().await.unwrap();

        // Wait until at least one job has actually been started before stopping.
        while !executor.executor.is_running_job().await {
            tokio::time::sleep(Duration::from_nanos(1)).await;
        }

        executor.stop(true).await.unwrap();

        assert_eq!(count.load(Ordering::SeqCst), tasks);
    }

    #[tokio::test]
    async fn start_should_fail_if_already_running() {
        let executor = JobExecutor::new_with_utc_tz();
        assert!(executor.run().await.is_ok());
        assert!(executor.run().await.is_err());
        assert!(executor.stop(false).await.is_ok());
    }

    #[tokio::test]
    async fn stop_should_fail_if_not_running() {
        let executor = JobExecutor::new_with_utc_tz();
        assert!(executor.stop(false).await.is_err());
        assert!(executor.run().await.is_ok());
        assert!(executor.stop(false).await.is_ok());
        assert!(executor.stop(false).await.is_err());
    }

    #[tokio::test]
    async fn should_add_with_explicit_scheduler() {
        let executor = JobExecutor::new_with_utc_tz();
        executor
            .add_job_with_scheduler(Scheduler::Never, Job::new("g", "n", None, move || Box::pin(async { Ok(()) })))
            .await;
    }

    #[tokio::test]
    async fn should_register_a_schedule_by_vec() {
        let executor = JobExecutor::new_with_utc_tz();
        executor
            .add_job(&vec!["0 1 * * * * *"], Job::new("g", "n", None, move || Box::pin(async { Ok(()) })))
            .await
            .unwrap();
        executor
            .add_job(
                &vec!["0 1 * * * * *".to_owned(), "0 1 * * * * *".to_owned()],
                Job::new("g", "n", None, move || Box::pin(async { Ok(()) })),
            )
            .await
            .unwrap();
    }
}