Skip to main content

doitlater/
worker.rs

1use crate::{error::Error, job::Job, scheduler::Scheduler, Queue, Result};
2use log::{error, info};
3use std::env;
4use std::{
5    panic::{self, AssertUnwindSafe},
6    sync::atomic::{AtomicBool, Ordering},
7    time::{Duration, Instant},
8};
9
10pub struct Worker {
11    queue: Queue,
12    should_exit: AtomicBool,
13    scheduler: Option<Scheduler>,
14}
15
16impl Worker {
17    pub fn new(queue_name: &str, redis_connection_string: &str) -> Result<Self> {
18        Ok(Self {
19            queue: Queue::new(queue_name, redis_connection_string)?,
20            should_exit: AtomicBool::new(false),
21            scheduler: None,
22        })
23    }
24
25    pub fn new_from_env() -> Result<Self> {
26        Self::new(
27            &env::var("QUEUE_NAME").expect("QUEUE_NAME should be set"),
28            &env::var("REDIS_URL").expect("REDIS_URL should be set"),
29        )
30    }
31
32    fn maybe_process_job(&mut self, timeout: Duration) -> Result<Option<Job>> {
33        let maybe_job = self.queue.dequeue_executable_job(timeout)?;
34        match maybe_job {
35            Some(job) => {
36                let job_result = panic::catch_unwind(AssertUnwindSafe(|| job.executable.execute()));
37                if let Ok(ret) = job_result {
38                    if let Err(e) = ret {
39                        Err(Error::JobExecutionError {
40                            job_name: job.name,
41                            error: e.to_string(),
42                        })
43                    } else {
44                        Ok(Some(job))
45                    }
46                } else {
47                    Err(Error::JobExecutionError {
48                        job_name: job.name,
49                        error: "Panic".to_string(),
50                    })
51                }
52            }
53            None => Ok(None),
54        }
55    }
56
57    pub fn run(&mut self) -> Result<()> {
58        let mut now = Instant::now();
59        let mut first = true;
60        while !self.should_exit.load(Ordering::SeqCst) {
61            if let Some(scheduler) = &mut self.scheduler {
62                if first || now.elapsed() > Duration::from_secs(60) {
63                    scheduler.tick()?;
64                    first = false;
65                    now = Instant::now();
66                }
67            }
68            match self.maybe_process_job(Duration::from_secs(10)) {
69                Ok(None) => continue,
70                Ok(Some(job)) => {
71                    info!("Successfully executed {}.", job.name);
72                    self.queue.delete_job_data(&job.name)?;
73                    if let Some(scheduler) = &mut self.scheduler {
74                        scheduler.job_succeeded(&job.name)?;
75                    }
76                }
77                Err(Error::JobExecutionError { job_name, error }) => {
78                    error!("Failed to execute job {} with error: {}.", job_name, error);
79                    self.queue.delete_job_data(&job_name)?;
80                }
81                Err(e) => return Err(e),
82            }
83        }
84        Ok(())
85    }
86
87    pub fn request_shutdown(&self) {
88        self.should_exit.store(true, Ordering::SeqCst);
89    }
90
91    pub fn create_scheduler(&self) -> Result<Scheduler> {
92        Scheduler::new(self.queue.name(), self.queue.get_client_clone())
93    }
94
95    pub fn use_scheduler(&mut self, scheduler: Scheduler) {
96        self.scheduler = Some(scheduler);
97    }
98}