1use crate::{error::Error, job::Job, scheduler::Scheduler, Queue, Result};
2use log::{error, info};
3use std::env;
4use std::{
5 panic::{self, AssertUnwindSafe},
6 sync::atomic::{AtomicBool, Ordering},
7 time::{Duration, Instant},
8};
9
10pub struct Worker {
11 queue: Queue,
12 should_exit: AtomicBool,
13 scheduler: Option<Scheduler>,
14}
15
16impl Worker {
17 pub fn new(queue_name: &str, redis_connection_string: &str) -> Result<Self> {
18 Ok(Self {
19 queue: Queue::new(queue_name, redis_connection_string)?,
20 should_exit: AtomicBool::new(false),
21 scheduler: None,
22 })
23 }
24
25 pub fn new_from_env() -> Result<Self> {
26 Self::new(
27 &env::var("QUEUE_NAME").expect("QUEUE_NAME should be set"),
28 &env::var("REDIS_URL").expect("REDIS_URL should be set"),
29 )
30 }
31
32 fn maybe_process_job(&mut self, timeout: Duration) -> Result<Option<Job>> {
33 let maybe_job = self.queue.dequeue_executable_job(timeout)?;
34 match maybe_job {
35 Some(job) => {
36 let job_result = panic::catch_unwind(AssertUnwindSafe(|| job.executable.execute()));
37 if let Ok(ret) = job_result {
38 if let Err(e) = ret {
39 Err(Error::JobExecutionError {
40 job_name: job.name,
41 error: e.to_string(),
42 })
43 } else {
44 Ok(Some(job))
45 }
46 } else {
47 Err(Error::JobExecutionError {
48 job_name: job.name,
49 error: "Panic".to_string(),
50 })
51 }
52 }
53 None => Ok(None),
54 }
55 }
56
57 pub fn run(&mut self) -> Result<()> {
58 let mut now = Instant::now();
59 let mut first = true;
60 while !self.should_exit.load(Ordering::SeqCst) {
61 if let Some(scheduler) = &mut self.scheduler {
62 if first || now.elapsed() > Duration::from_secs(60) {
63 scheduler.tick()?;
64 first = false;
65 now = Instant::now();
66 }
67 }
68 match self.maybe_process_job(Duration::from_secs(10)) {
69 Ok(None) => continue,
70 Ok(Some(job)) => {
71 info!("Successfully executed {}.", job.name);
72 self.queue.delete_job_data(&job.name)?;
73 if let Some(scheduler) = &mut self.scheduler {
74 scheduler.job_succeeded(&job.name)?;
75 }
76 }
77 Err(Error::JobExecutionError { job_name, error }) => {
78 error!("Failed to execute job {} with error: {}.", job_name, error);
79 self.queue.delete_job_data(&job_name)?;
80 }
81 Err(e) => return Err(e),
82 }
83 }
84 Ok(())
85 }
86
87 pub fn request_shutdown(&self) {
88 self.should_exit.store(true, Ordering::SeqCst);
89 }
90
91 pub fn create_scheduler(&self) -> Result<Scheduler> {
92 Scheduler::new(self.queue.name(), self.queue.get_client_clone())
93 }
94
95 pub fn use_scheduler(&mut self, scheduler: Scheduler) {
96 self.scheduler = Some(scheduler);
97 }
98}