1use crate::error::SchedulerError;
2use crate::job::{Job, JobScheduler};
3use crate::scheduler::{Scheduler, TryToScheduler};
4use arc_swap::ArcSwap;
5use chrono::Utc;
6use chrono_tz::{Tz, UTC};
7use log::*;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12use tokio::task::JoinHandle;
13
14pub mod error;
15pub mod job;
16pub mod scheduler;
17
18#[derive(Clone)]
19pub struct JobExecutor {
20 executor: Arc<JobExecutorInternal>,
21}
22
23struct JobExecutorInternal {
24 sleep_between_checks: ArcSwap<Duration>,
25 running: AtomicBool,
26 timezone: Option<Tz>,
27 jobs: RwLock<Vec<Arc<JobScheduler>>>,
28}
29
30impl JobExecutorInternal {
31 fn is_running(&self) -> bool {
63 self.running.load(Ordering::SeqCst)
64 }
65
66 async fn is_running_job(&self) -> bool {
68 let jobs = self.jobs.read().await;
69 for job_scheduler in jobs.iter() {
70 if job_scheduler.job.is_running().await {
71 return true;
72 }
73 }
74 false
75 }
76
77 async fn run_pending_jobs(&self) {
79 trace!("Check pending jobs");
80 let jobs = self.jobs.read().await;
81 for job_scheduler in jobs.iter() {
82 if job_scheduler.is_pending().await {
84 if !job_scheduler.job.is_running().await {
86 let job_clone = job_scheduler.clone();
87
88 let timestamp = Utc::now().timestamp();
89 let group = job_clone.job.group().to_owned();
90 let name = job_clone.job.name().to_owned();
91
92 let fut = instrument(timestamp, group.clone(), name.clone(), async move {
93 info!("Start execution of Job [{}/{}]", group, name);
94 let start = std::time::Instant::now();
95 let result = job_clone.run().await;
96
97 let duration = start.elapsed();
98
99 let mills = duration.subsec_millis();
100 let duration_secs = duration.as_secs();
101 let seconds = duration_secs % 60;
102 let minutes = (duration_secs / 60) % 60;
103 let hours = (duration_secs / 60) / 60;
104 let duration_fmt = format!(
105 "{hours:02} hour(s), {minutes:02} minute(s), {seconds:02} second(s) and {mills:03} millis"
106 );
107
108 match result {
109 Ok(()) => {
110 info!(
111 "Execution of Job [{}/{}] completed successfully in {}",
112 group, name, duration_fmt
113 );
114 }
115 Err(err) => {
116 error!(
117 "Execution of Job [{}/{}] completed with errors in {}. Err: {:?}",
118 group, name, duration_fmt, err
119 );
120 }
121 }
122 });
123
124 tokio::spawn(fut);
125 } else {
126 debug!(
127 "Job [{}/{}] is pending but already running. It will not be executed.",
128 job_scheduler.job.group(),
129 job_scheduler.job.name()
130 )
131 }
132 }
133 }
134 }
135
136 async fn add_job_with_scheduler<S: Into<Scheduler>>(&self, schedule: S, job: Job) {
138 info!("Add job to scheduler. Group [{}] - Name [{}]", job.group(), job.name());
139 let mut jobs = self.jobs.write().await;
140 jobs.push(Arc::new(JobScheduler::new(schedule.into(), self.timezone, job)));
141 }
142}
143
144impl JobExecutor {
145 pub fn new_with_local_tz() -> JobExecutor {
148 Self::new_with_tz(None)
149 }
150
151 pub fn new_with_utc_tz() -> JobExecutor {
154 Self::new_with_tz(Some(UTC))
155 }
156
157 pub fn new_with_tz(timezone: Option<Tz>) -> JobExecutor {
160 JobExecutor {
161 executor: Arc::new(JobExecutorInternal {
162 sleep_between_checks: ArcSwap::new(Arc::new(Duration::new(1, 0))),
163 running: AtomicBool::new(false),
164 timezone,
165 jobs: RwLock::new(vec![]),
166 }),
167 }
168 }
169
170 pub async fn add_job(&self, schedule: &dyn TryToScheduler, job: Job) -> Result<(), SchedulerError> {
172 self.add_job_with_scheduler(schedule.to_scheduler()?, job).await;
173 Ok(())
174 }
175
176 pub async fn add_job_with_multi_schedule(
178 &self,
179 schedule: &[&dyn TryToScheduler],
180 job: Job,
181 ) -> Result<(), SchedulerError> {
182 self.add_job_with_scheduler(schedule.to_scheduler()?, job).await;
183 Ok(())
184 }
185
186 pub async fn add_job_with_scheduler<S: Into<Scheduler>>(&self, schedule: S, job: Job) {
188 self.executor.add_job_with_scheduler(schedule, job).await
189 }
190
191 pub async fn run(&self) -> Result<JoinHandle<()>, SchedulerError> {
193 let was_running = self.executor.running.swap(true, Ordering::SeqCst);
194 if !was_running {
195 let executor = self.executor.clone();
196 Ok(tokio::spawn(async move {
197 info!("Starting the job executor");
198 while executor.is_running() {
199 executor.run_pending_jobs().await;
200 tokio::time::sleep(*executor.sleep_between_checks.load().as_ref()).await;
201 }
202 info!("Job executor stopped");
203 }))
204 } else {
205 warn!("The JobExecutor is already running.");
206 Err(SchedulerError::JobExecutionStateError { message: "The JobExecutor is already running.".to_owned() })
207 }
208 }
209
210 pub async fn stop(&self, graceful: bool) -> Result<(), SchedulerError> {
212 let was_running = self.executor.running.swap(false, Ordering::SeqCst);
213 if was_running {
214 info!("Stopping the job executor");
215 if graceful {
216 info!("Wait for all Jobs to complete");
217 while self.executor.is_running_job().await {
218 tokio::time::sleep(*self.executor.sleep_between_checks.load().as_ref()).await;
219 }
220 info!("All Jobs completed");
221 }
222 Ok(())
223 } else {
224 warn!("The JobExecutor is not running.");
225 Err(SchedulerError::JobExecutionStateError { message: "The JobExecutor is not running.".to_owned() })
226 }
227 }
228
229 pub fn set_sleep_between_checks(&self, sleep: Duration) {
232 self.executor.sleep_between_checks.store(Arc::new(sleep));
233 }
234}
235
236#[cfg(feature = "tracing")]
237fn instrument<F: std::future::Future<Output = ()>>(
238 timestamp: i64,
239 group: String,
240 name: String,
241 fut: F,
242) -> impl std::future::Future<Output = ()> {
243 use tracing_futures::Instrument;
244 let span = tracing::error_span!("run_pending", group, name, timestamp);
245 fut.instrument(span)
246}
247
248#[cfg(not(feature = "tracing"))]
249fn instrument<F: std::future::Future<Output = ()>>(
250 _timestamp: i64,
251 _group: String,
252 _name: String,
253 fut: F,
254) -> impl std::future::Future<Output = ()> {
255 fut
256}
257
258#[cfg(test)]
259pub mod test {
260
261 use super::*;
262 use chrono::Utc;
263 use std::sync::atomic::{AtomicUsize, Ordering};
264 use std::time::Duration;
265 use tokio::sync::mpsc::channel;
266
267 #[tokio::test]
268 async fn should_not_run_an_already_running_job() {
269 let executor = JobExecutor::new_with_utc_tz();
270
271 let count = Arc::new(AtomicUsize::new(0));
272 let count_clone = count.clone();
273
274 let (tx, mut rx) = channel(1000);
275
276 executor
277 .add_job(
278 &Duration::new(0, 1),
279 Job::new("g", "n", None, move || {
280 let count_clone = count_clone.clone();
281 let tx = tx.clone();
282 Box::pin(async move {
283 tx.send("").await.unwrap();
284 println!("job - started");
285 count_clone.fetch_add(1, Ordering::SeqCst);
286 tokio::time::sleep(Duration::new(1, 0)).await;
287 Ok(())
288 })
289 }),
290 )
291 .await
292 .unwrap();
293
294 for i in 0..100 {
295 println!("run_pending {i}");
296 executor.executor.run_pending_jobs().await;
297 tokio::time::sleep(Duration::new(0, 2)).await;
298 }
299
300 println!("run_pending completed");
301 rx.recv().await.unwrap();
302
303 assert_eq!(count.load(Ordering::Relaxed), 1);
304 }
305
306 #[tokio::test]
307 async fn a_running_job_should_not_block_the_executor() {
308 let executor = JobExecutor::new_with_local_tz();
309
310 let (tx, mut rx) = channel(959898);
311
312 let count_1 = Arc::new(AtomicUsize::new(0));
313 let count_1_clone = count_1.clone();
314 let tx_1 = tx.clone();
315 executor
316 .add_job_with_multi_schedule(
317 &[&Duration::new(0, 1)],
318 Job::new("g", "n", None, move || {
319 let count_1_clone = count_1_clone.clone();
320 let tx_1 = tx_1.clone();
321 Box::pin(async move {
322 tx_1.send("").await.unwrap();
323 println!("job 1 - started");
324 count_1_clone.fetch_add(1, Ordering::SeqCst);
325 tokio::time::sleep(Duration::new(1, 0)).await;
326 Ok(())
327 })
328 }),
329 )
330 .await
331 .unwrap();
332
333 let count_2 = Arc::new(AtomicUsize::new(0));
334 let count_2_clone = count_2.clone();
335 let tx_2 = tx.clone();
336 executor
337 .add_job(
338 &Duration::new(0, 1),
339 Job::new("g", "n", None, move || {
340 let count_2_clone = count_2_clone.clone();
341 let tx_2 = tx_2.clone();
342 Box::pin(async move {
343 tx_2.send("").await.unwrap();
344 println!("job 2 - started");
345 count_2_clone.fetch_add(1, Ordering::SeqCst);
346 tokio::time::sleep(Duration::new(1, 0)).await;
347 Ok(())
348 })
349 }),
350 )
351 .await
352 .unwrap();
353
354 let count_3 = Arc::new(AtomicUsize::new(0));
355 let count_3_clone = count_3.clone();
356 let tx_3 = tx.clone();
357 executor
358 .add_job(
359 &Duration::new(0, 1),
360 Job::new("g", "n", None, move || {
361 let count_3_clone = count_3_clone.clone();
362 let tx_3 = tx_3.clone();
363 Box::pin(async move {
364 tx_3.send("").await.unwrap();
365 println!("job 3 - started");
366 count_3_clone.fetch_add(1, Ordering::SeqCst);
367 tokio::time::sleep(Duration::new(1, 0)).await;
368 Ok(())
369 })
370 }),
371 )
372 .await
373 .unwrap();
374
375 let before_millis = Utc::now().timestamp_millis();
376 for i in 0..100 {
377 println!("run_pending {i}");
378 executor.executor.run_pending_jobs().await;
379 tokio::time::sleep(Duration::new(0, 1_000_000)).await;
380 }
381 let after_millis = Utc::now().timestamp_millis();
382
383 assert!((after_millis - before_millis) >= 100);
384 assert!((after_millis - before_millis) < 1000);
385
386 rx.recv().await.unwrap();
387
388 assert_eq!(count_1.load(Ordering::SeqCst), 1);
389 assert_eq!(count_2.load(Ordering::SeqCst), 1);
390 assert_eq!(count_3.load(Ordering::SeqCst), 1);
391 }
392
393 #[tokio::test]
394 async fn should_gracefully_shutdown_the_job_executor() {
395 let executor = JobExecutor::new_with_utc_tz();
396
397 let count = Arc::new(AtomicUsize::new(0));
398
399 let tasks = 100;
400
401 for _i in 0..tasks {
402 let count_clone = count.clone();
403 executor
404 .add_job(
405 &Duration::new(0, 1),
406 Job::new("g", "n", None, move || {
407 let count_clone = count_clone.clone();
408 Box::pin(async move {
409 tokio::time::sleep(Duration::new(1, 0)).await;
410 println!("job - started");
411 count_clone.fetch_add(1, Ordering::SeqCst);
412 Ok(())
413 })
414 }),
415 )
416 .await
417 .unwrap();
418 }
419
420 executor.set_sleep_between_checks(Duration::from_millis(10));
421
422 executor.run().await.unwrap();
423
424 loop {
425 if executor.executor.is_running_job().await {
426 break;
427 }
428 tokio::time::sleep(Duration::from_nanos(1)).await;
429 }
430
431 executor.stop(true).await.unwrap();
432
433 assert_eq!(count.load(Ordering::Relaxed), tasks);
434 }
435
436 #[tokio::test]
437 async fn start_should_fail_if_already_running() {
438 let executor = JobExecutor::new_with_utc_tz();
439 assert!(executor.run().await.is_ok());
440 assert!(executor.run().await.is_err());
441 assert!(executor.stop(false).await.is_ok());
442 }
443
444 #[tokio::test]
445 async fn stop_should_fail_if_not_running() {
446 let executor = JobExecutor::new_with_utc_tz();
447 assert!(executor.stop(false).await.is_err());
448 assert!(executor.run().await.is_ok());
449 assert!(executor.stop(false).await.is_ok());
450 assert!(executor.stop(false).await.is_err());
451 }
452
453 #[tokio::test]
454 async fn should_add_with_explicit_scheduler() {
455 let executor = JobExecutor::new_with_utc_tz();
456 executor
457 .add_job_with_scheduler(Scheduler::Never, Job::new("g", "n", None, move || Box::pin(async { Ok(()) })))
458 .await;
459 }
460
461 #[tokio::test]
462 async fn should_register_a_schedule_by_vec() {
463 let executor = JobExecutor::new_with_utc_tz();
464 executor
465 .add_job(&vec!["0 1 * * * * *"], Job::new("g", "n", None, move || Box::pin(async { Ok(()) })))
466 .await
467 .unwrap();
468 executor
469 .add_job(
470 &vec!["0 1 * * * * *".to_owned(), "0 1 * * * * *".to_owned()],
471 Job::new("g", "n", None, move || Box::pin(async { Ok(()) })),
472 )
473 .await
474 .unwrap();
475 }
476}